Skip to content

Commit 2d8bdb9

Browse files
ekerstens and Eric Kerstens authored
Stop fetching when flow stops (#253)
* Stop fetching when flow stops
* Stop fetching when flow stops
* Change order to keep interface the same and fix test cases
* Fix test case
* try/finally for not_waiting_next_records

Co-authored-by: Eric Kerstens <ekerstens@expediagroup.com>
1 parent a0e9a31 commit 2d8bdb9

File tree

4 files changed

+53
-26
lines changed

4 files changed

+53
-26
lines changed

faust/app/base.py

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1681,6 +1681,9 @@ async def _on_partitions_revoked(self, revoked: Set[TP]) -> None:
16811681
T(self.flow_control.suspend)()
16821682
on_timeout.info("consumer.pause_partitions")
16831683
T(consumer.pause_partitions)(assignment)
1684+
on_timeout.info("consumer.wait_for_stopped_flow")
1685+
await T(consumer.wait_for_stopped_flow)()
1686+
16841687
# Every agent instance has an incoming buffer of messages
16851688
# (a asyncio.Queue) -- we clear those to make sure
16861689
# agents will not start processing them.

faust/tables/recovery.py

Lines changed: 6 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -309,21 +309,21 @@ async def _resume_streams(self, generation_id: int = 0) -> None:
309309
self.log.warning("Recovery rebalancing again")
310310
return
311311
if assignment:
312+
self.log.dev("Resume stream partitions")
312313
consumer.resume_partitions(
313314
{tp for tp in assignment if not self._is_changelog_tp(tp)}
314315
)
315316
self.log.info("Seek stream partitions to committed offsets.")
316317
await self._wait(
317318
consumer.perform_seek(), timeout=self.app.conf.broker_request_timeout
318319
)
319-
self.log.dev("Resume stream partitions")
320320
else:
321321
self.log.info("Resuming streams with empty assignment")
322322
self.completed.set()
323323
# Resume partitions and start fetching.
324324
self.log.info("Resuming flow...")
325-
consumer.resume_flow()
326325
app.flow_control.resume()
326+
consumer.resume_flow()
327327
# finally make sure the fetcher is running.
328328
await cast(_App, app)._fetcher.maybe_start()
329329
self.tables.on_actives_ready()
@@ -440,9 +440,9 @@ async def _restart_recovery(self) -> None:
440440
T(consumer.resume_partitions)(active_tps)
441441
# Resume partitions and start fetching.
442442
self.log.info("Resuming flow...")
443+
T(self.app.flow_control.resume)()
443444
T(consumer.resume_flow)()
444445
await T(cast(_App, self.app)._fetcher.maybe_start)()
445-
T(self.app.flow_control.resume)()
446446

447447
# Wait for actives to be up to date.
448448
# This signal will be set by _slurp_changelogs
@@ -467,8 +467,8 @@ async def _restart_recovery(self) -> None:
467467
T(consumer.pause_partitions)(active_tps)
468468
else:
469469
self.log.info("Resuming flow...")
470-
T(consumer.resume_flow)()
471470
T(self.app.flow_control.resume)()
471+
T(consumer.resume_flow)()
472472
self._set_recovery_ended()
473473
self.log.info("Recovery complete")
474474
if span:
@@ -519,8 +519,8 @@ async def _restart_recovery(self) -> None:
519519
self.app._span_add_default_tags(span)
520520
self.log.dev("Resume standby partitions")
521521
T(consumer.resume_partitions)(standby_tps)
522-
T(consumer.resume_flow)()
523522
T(self.app.flow_control.resume)()
523+
T(consumer.resume_flow)()
524524

525525
# Pause all our topic partitions,
526526
# to make sure we don't fetch any more records from them.
@@ -625,8 +625,8 @@ async def on_recovery_completed(self, generation_id: int = 0) -> None:
625625
)
626626
self.completed.set()
627627
self.log.dev("Resume stream partitions")
628-
consumer.resume_flow()
629628
self.app.flow_control.resume()
629+
consumer.resume_flow()
630630
# finally make sure the fetcher is running.
631631
await cast(_App, self.app)._fetcher.maybe_start()
632632
self.tables.on_actives_ready()

faust/transport/consumer.py

Lines changed: 43 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -436,6 +436,7 @@ class Consumer(Service, ConsumerT):
436436
flow_active: bool = True
437437
can_resume_flow: Event
438438
suspend_flow: Event
439+
not_waiting_next_records: Event
439440

440441
def __init__(
441442
self,
@@ -477,6 +478,8 @@ def __init__(
477478
self.randomly_assigned_topics = set()
478479
self.can_resume_flow = Event()
479480
self.suspend_flow = Event()
481+
self.not_waiting_next_records = Event()
482+
self.not_waiting_next_records.set()
480483
self._reset_state()
481484
super().__init__(loop=loop or self.transport.loop, **kwargs)
482485
self.transactions = self.transport.create_transaction_manager(
@@ -500,6 +503,7 @@ def _reset_state(self) -> None:
500503
self._buffered_partitions = set()
501504
self.can_resume_flow.clear()
502505
self.suspend_flow.clear()
506+
self.not_waiting_next_records.set()
503507
self.flow_active = True
504508
self._time_start = monotonic()
505509

@@ -581,6 +585,11 @@ def stop_flow(self) -> None:
581585
self.can_resume_flow.clear()
582586
self.suspend_flow.set()
583587

588+
async def wait_for_stopped_flow(self) -> None:
589+
"""Wait until the consumer is not waiting on any newly fetched records."""
590+
if not self.not_waiting_next_records.is_set():
591+
await self.not_waiting_next_records.wait()
592+
584593
def resume_flow(self) -> None:
585594
"""Allow consumer to process messages."""
586595
self.flow_active = True
@@ -704,6 +713,7 @@ async def getmany(self, timeout: float) -> AsyncIterator[Tuple[TP, Message]]:
704713
# has 1 partition, then t2 will end up being starved most of the time.
705714
#
706715
# We solve this by going round-robin through each topic.
716+
707717
records, active_partitions = await self._wait_next_records(timeout)
708718
generation_id = self.app.consumer_generation_id
709719
if records is None or self.should_stop:
@@ -739,28 +749,41 @@ async def _wait_next_records(
739749
if not self.flow_active:
740750
await self.wait(self.can_resume_flow)
741751
# Implementation for the Fetcher service.
752+
try:
753+
self.not_waiting_next_records.clear()
742754

743-
is_client_only = self.app.client_only
744-
745-
active_partitions: Optional[Set[TP]]
746-
if is_client_only:
747-
active_partitions = None
748-
else:
749-
active_partitions = self._get_active_partitions()
755+
is_client_only = self.app.client_only
750756

751-
records: RecordMap = {}
752-
if is_client_only or active_partitions:
753-
# Fetch records only if active partitions to avoid the risk of
754-
# fetching all partitions in the beginning when none of the
755-
# partitions is paused/resumed.
756-
records = await self._getmany(
757-
active_partitions=active_partitions,
758-
timeout=timeout,
759-
)
760-
else:
761-
# We should still release to the event loop
762-
await self.sleep(1)
763-
return records, active_partitions
757+
active_partitions: Optional[Set[TP]]
758+
if is_client_only:
759+
active_partitions = None
760+
else:
761+
active_partitions = self._get_active_partitions()
762+
763+
records: RecordMap = {}
764+
if is_client_only or active_partitions:
765+
# Fetch records only if active partitions to avoid the risk of
766+
# fetching all partitions in the beginning when none of the
767+
# partitions is paused/resumed.
768+
suspend_flow = self.suspend_flow.wait()
769+
getmany = self._getmany(
770+
active_partitions=active_partitions,
771+
timeout=timeout,
772+
)
773+
wait_results = await self.wait_first(getmany, suspend_flow)
774+
for coro, result in zip(wait_results.done, wait_results.results):
775+
# Ignore records fetched while flow was suspended
776+
if coro is suspend_flow:
777+
records = {}
778+
break
779+
if coro is getmany:
780+
records = result
781+
else:
782+
# We should still release to the event loop
783+
await self.sleep(1)
784+
return records, active_partitions
785+
finally:
786+
self.not_waiting_next_records.set()
764787

765788
@abc.abstractmethod
766789
def _to_message(self, tp: TP, record: Any) -> ConsumerMessage:

tests/unit/app/test_base.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -299,6 +299,7 @@ async def test_on_partitions_revoked(self, *, app):
299299
transactions=Mock(
300300
on_partitions_revoked=AsyncMock(),
301301
),
302+
wait_for_stopped_flow=AsyncMock(),
302303
)
303304
app.tables = Mock()
304305
app.flow_control = Mock()

0 commit comments

Comments
 (0)