Skip to content

Commit 35d6885

Browse files
ekerstensEric Kerstens
andauthored
Support waiting for fetcher to finish (#263)
Co-authored-by: Eric Kerstens <ekerstens@expediagroup.com>
1 parent e963706 commit 35d6885

File tree

2 files changed

+43
-23
lines changed

2 files changed

+43
-23
lines changed

faust/transport/consumer.py

Lines changed: 43 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,7 @@ class Consumer(Service, ConsumerT):
436436
flow_active: bool = True
437437
can_resume_flow: Event
438438
suspend_flow: Event
439+
not_waiting_next_records: Event
439440

440441
def __init__(
441442
self,
@@ -477,6 +478,8 @@ def __init__(
477478
self.randomly_assigned_topics = set()
478479
self.can_resume_flow = Event()
479480
self.suspend_flow = Event()
481+
self.not_waiting_next_records = Event()
482+
self.not_waiting_next_records.set()
480483
self._reset_state()
481484
super().__init__(loop=loop or self.transport.loop, **kwargs)
482485
self.transactions = self.transport.create_transaction_manager(
@@ -500,6 +503,7 @@ def _reset_state(self) -> None:
500503
self._buffered_partitions = set()
501504
self.can_resume_flow.clear()
502505
self.suspend_flow.clear()
506+
self.not_waiting_next_records.set()
503507
self.flow_active = True
504508
self._time_start = monotonic()
505509

@@ -587,6 +591,18 @@ def resume_flow(self) -> None:
587591
self.can_resume_flow.set()
588592
self.suspend_flow.clear()
589593

594+
async def wait_for_stopped_flow(self) -> None:
595+
"""Wait until the consumer is not waiting on any newly fetched records.
596+
597+
Useful for scenarios where the consumer needs to be stopped to change the
598+
position of the fetcher to something other than the committed offset. There is a
599+
chance that getmany forces a seek to the committed offsets if the fetcher
600+
returns while the consumer is stopped. This can be prevented by waiting for the
601+
fetcher to finish (by default every second).
602+
"""
603+
if not self.not_waiting_next_records.is_set():
604+
await self.not_waiting_next_records.wait()
605+
590606
def pause_partitions(self, tps: Iterable[TP]) -> None:
591607
"""Pause fetching from partitions."""
592608
tpset = ensure_TPset(tps)
@@ -745,28 +761,33 @@ async def _wait_next_records(
745761
if not self.flow_active:
746762
await self.wait(self.can_resume_flow)
747763

748-
# Implementation for the Fetcher service.
749-
is_client_only = self.app.client_only
750-
751-
active_partitions: Optional[Set[TP]]
752-
if is_client_only:
753-
active_partitions = None
754-
else:
755-
active_partitions = self._get_active_partitions()
756-
757-
records: RecordMap = {}
758-
if is_client_only or active_partitions:
759-
# Fetch records only if active partitions to avoid the risk of
760-
# fetching all partitions in the beginning when none of the
761-
# partitions is paused/resumed.
762-
records = await self._getmany(
763-
active_partitions=active_partitions,
764-
timeout=timeout,
765-
)
766-
else:
767-
# We should still release to the event loop
768-
await self.sleep(1)
769-
return records, active_partitions
764+
try:
765+
# Set signal that _wait_next_records is waiting on the fetcher service.
766+
self.not_waiting_next_records.set()
767+
# Implementation for the Fetcher service.
768+
is_client_only = self.app.client_only
769+
770+
active_partitions: Optional[Set[TP]]
771+
if is_client_only:
772+
active_partitions = None
773+
else:
774+
active_partitions = self._get_active_partitions()
775+
776+
records: RecordMap = {}
777+
if is_client_only or active_partitions:
778+
# Fetch records only if active partitions to avoid the risk of
779+
# fetching all partitions in the beginning when none of the
780+
# partitions is paused/resumed.
781+
records = await self._getmany(
782+
active_partitions=active_partitions,
783+
timeout=timeout,
784+
)
785+
else:
786+
# We should still release to the event loop
787+
await self.sleep(1)
788+
return records, active_partitions
789+
finally:
790+
self.not_waiting_next_records.set()
770791

771792
@abc.abstractmethod
772793
def _to_message(self, tp: TP, record: Any) -> ConsumerMessage:

tests/unit/transport/test_consumer.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -544,7 +544,6 @@ def to_message(tp, record):
544544
assert not consumer.should_stop
545545
consumer.flow_active = False
546546
consumer.can_resume_flow.set()
547-
# Test is hanging here
548547
assert [a async for a in consumer.getmany(1.0)] == []
549548
assert not consumer.should_stop
550549
consumer.flow_active = True

0 commit comments

Comments
 (0)