Skip to content

Commit 6e5c301

Browse files
ekerstensEric Kerstens
andauthored
Remove wait_first and extra log (#240)
Co-authored-by: Eric Kerstens <ekerstens@expediagroup.com>
1 parent e67c8d7 commit 6e5c301

File tree

1 file changed

+1
-16
lines changed

1 file changed

+1
-16
lines changed

faust/transport/consumer.py

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -733,13 +733,6 @@ async def getmany(self, timeout: float) -> AsyncIterator[Tuple[TP, Message]]:
733733
# convert timestamp to seconds from int milliseconds.
734734
yield tp, to_message(tp, record)
735735

736-
async def _wait_suspend(self):
737-
"""Wrapper around self.suspend_flow.wait() with no return value.
738-
739-
This allows for easily
740-
"""
741-
await self.suspend_flow.wait()
742-
743736
async def _wait_next_records(
744737
self, timeout: float
745738
) -> Tuple[Optional[RecordMap], Optional[Set[TP]]]:
@@ -760,18 +753,10 @@ async def _wait_next_records(
760753
# Fetch records only if active partitions to avoid the risk of
761754
# fetching all partitions in the beginning when none of the
762755
# partitions is paused/resumed.
763-
_getmany = self._getmany(
756+
records = await self._getmany(
764757
active_partitions=active_partitions,
765758
timeout=timeout,
766759
)
767-
wait_results = await self.wait_first(
768-
_getmany,
769-
self.suspend_flow.wait(),
770-
)
771-
for coro, result in zip(wait_results.done, wait_results.results):
772-
if coro is _getmany:
773-
records = result
774-
break
775760
else:
776761
# We should still release to the event loop
777762
await self.sleep(1)

0 commit comments

Comments
 (0)