Skip to content

Commit a44fb6b

Browse files
nicor88patkivikram
andauthored
Take with timestamp (#242)
* add full_take * remove processor remove * try to add kafka_timestamp * tweaks * add * add extra param * add offset * remove offset in this pr * fix formatting * add functional tests * add another unit test and edge case handling * change buffer type... Co-authored-by: Vikram Patki <54442035+patkivikram@users.noreply.github.com>
1 parent 8d8971a commit a44fb6b

File tree

2 files changed

+216
-0
lines changed

2 files changed

+216
-0
lines changed

faust/streams.py

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,105 @@ async def add_to_buffer(value: T) -> T:
392392
self.enable_acks = stream_enable_acks
393393
self._processors.remove(add_to_buffer)
394394

395+
async def take_with_timestamp(
396+
self, max_: int, within: Seconds, timestamp_field_name: str
397+
) -> AsyncIterable[Sequence[T_co]]:
398+
"""Buffer n values at a time and yield a list of buffered values with the timestamp
399+
when the message was added to kafka.
400+
401+
Arguments:
402+
max_: Max number of messages to receive. When more than this
403+
number of messages are received within the specified number of
404+
seconds then we flush the buffer immediately.
405+
within: Timeout for when we give up waiting for another value,
406+
and process the values we have.
407+
Warning: If there's no timeout (i.e. `timeout=None`),
408+
the agent is likely to stall and block buffered events for an
409+
unreasonable length of time(!).
410+
timestamp_field_name: the name of the field containing kafka timestamp,
411+
that is going to be added to the value
412+
"""
413+
buffer: List[T_co] = []
414+
events: List[EventT] = []
415+
buffer_add = buffer.append
416+
event_add = events.append
417+
buffer_size = buffer.__len__
418+
buffer_full = asyncio.Event(loop=self.loop)
419+
buffer_consumed = asyncio.Event(loop=self.loop)
420+
timeout = want_seconds(within) if within else None
421+
stream_enable_acks: bool = self.enable_acks
422+
423+
buffer_consuming: Optional[asyncio.Future] = None
424+
425+
channel_it = aiter(self.channel)
426+
427+
# We add this processor to populate the buffer, and the stream
428+
# is passively consumed in the background (enable_passive below).
429+
async def add_to_buffer(value: T) -> T:
430+
try:
431+
# buffer_consuming is set when consuming buffer after timeout.
432+
nonlocal buffer_consuming
433+
if buffer_consuming is not None:
434+
try:
435+
await buffer_consuming
436+
finally:
437+
buffer_consuming = None
438+
event = self.current_event
439+
if isinstance(value, dict) and timestamp_field_name:
440+
value[timestamp_field_name] = event.message.timestamp
441+
buffer_add(value)
442+
if event is None:
443+
raise RuntimeError("Take buffer found current_event is None")
444+
event_add(event)
445+
if buffer_size() >= max_:
446+
# signal that the buffer is full and should be emptied.
447+
buffer_full.set()
448+
# strict wait for buffer to be consumed after buffer full.
449+
# If max is 1000, we are not allowed to return 1001 values.
450+
buffer_consumed.clear()
451+
await self.wait(buffer_consumed)
452+
except CancelledError: # pragma: no cover
453+
raise
454+
except Exception as exc:
455+
self.log.exception("Error adding to take buffer: %r", exc)
456+
await self.crash(exc)
457+
return value
458+
459+
# Disable acks to ensure this method acks manually
460+
# events only after they are consumed by the user
461+
self.enable_acks = False
462+
463+
self.add_processor(add_to_buffer)
464+
self._enable_passive(cast(ChannelT, channel_it))
465+
try:
466+
while not self.should_stop:
467+
# wait until buffer full, or timeout
468+
await self.wait_for_stopped(buffer_full, timeout=timeout)
469+
if buffer:
470+
# make sure background thread does not add new items to
471+
# buffer while we read.
472+
buffer_consuming = self.loop.create_future()
473+
try:
474+
yield list(buffer)
475+
finally:
476+
buffer.clear()
477+
for event in events:
478+
await self.ack(event)
479+
events.clear()
480+
# allow writing to buffer again
481+
notify(buffer_consuming)
482+
buffer_full.clear()
483+
buffer_consumed.set()
484+
else: # pragma: no cover
485+
pass
486+
else: # pragma: no cover
487+
pass
488+
489+
finally:
490+
# Restore last behaviour of "enable_acks"
491+
self.enable_acks = stream_enable_acks
492+
self._processors.remove(add_to_buffer)
493+
395494
def enumerate(self, start: int = 0) -> AsyncIterable[Tuple[int, T_co]]:
396495
"""Enumerate values received on this stream.
397496

tests/functional/test_streams.py

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -777,3 +777,120 @@ def current_event(self, event):
777777
assert isinstance(s._crash_reason, RuntimeError)
778778
print("RETURNING")
779779
assert s.enable_acks is True
780+
781+
782+
@pytest.mark.asyncio
783+
async def test_take_wit_timestamp(app):
784+
async with new_stream(app) as s:
785+
assert s.enable_acks is True
786+
await s.channel.send(value={"id": 1})
787+
event = None
788+
async for value in s.take_with_timestamp(
789+
1, within=1, timestamp_field_name="test_timestamp"
790+
):
791+
assert "test_timestamp" in value[0].keys()
792+
assert isinstance(value[0]["test_timestamp"], float)
793+
assert s.enable_acks is False
794+
event = mock_stream_event_ack(s)
795+
break
796+
797+
assert event
798+
# need one sleep on Python 3.6.0-3.6.6 + 3.7.0
799+
# need two sleeps on Python 3.6.7 + 3.7.1 :-/
800+
await asyncio.sleep(0)
801+
await asyncio.sleep(0)
802+
803+
if not event.ack.called:
804+
assert event.message.acked
805+
assert not event.message.refcount
806+
assert s.enable_acks is True
807+
808+
809+
@pytest.mark.asyncio
810+
async def test_take_wit_timestamp_wit_simple_value(app):
811+
async with new_stream(app) as s:
812+
assert s.enable_acks is True
813+
await s.channel.send(value=1)
814+
event = None
815+
async for value in s.take_with_timestamp(
816+
1, within=1, timestamp_field_name="test_timestamp"
817+
):
818+
assert value == [1]
819+
assert s.enable_acks is False
820+
event = mock_stream_event_ack(s)
821+
break
822+
823+
assert event
824+
# need one sleep on Python 3.6.0-3.6.6 + 3.7.0
825+
# need two sleeps on Python 3.6.7 + 3.7.1 :-/
826+
await asyncio.sleep(0)
827+
await asyncio.sleep(0)
828+
829+
if not event.ack.called:
830+
assert event.message.acked
831+
assert not event.message.refcount
832+
assert s.enable_acks is True
833+
834+
835+
@pytest.mark.asyncio
836+
async def test_take_wit_timestamp_without_timestamp_field(app):
837+
async with new_stream(app) as s:
838+
assert s.enable_acks is True
839+
await s.channel.send(value=1)
840+
event = None
841+
async for value in s.take_with_timestamp(
842+
1, within=1, timestamp_field_name=None
843+
):
844+
assert value == [1]
845+
assert s.enable_acks is False
846+
event = mock_stream_event_ack(s)
847+
break
848+
849+
assert event
850+
# need one sleep on Python 3.6.0-3.6.6 + 3.7.0
851+
# need two sleeps on Python 3.6.7 + 3.7.1 :-/
852+
await asyncio.sleep(0)
853+
await asyncio.sleep(0)
854+
855+
if not event.ack.called:
856+
assert event.message.acked
857+
assert not event.message.refcount
858+
assert s.enable_acks is True
859+
860+
861+
@pytest.mark.asyncio
862+
async def test_take_wit_timestamp__5(app, loop):
863+
s = new_stream(app)
864+
async with s:
865+
assert s.enable_acks is True
866+
for i in range(5):
867+
await s.channel.send(value={"id": i})
868+
869+
event = None
870+
buffer_processor = s.take_with_timestamp(
871+
5, within=10.0, timestamp_field_name="test_timestamp"
872+
)
873+
async for batch in buffer_processor:
874+
assert len(batch) == 5
875+
assert all("test_timestamp" in _m.keys() for _m in batch)
876+
assert s.enable_acks is False
877+
878+
event = mock_stream_event_ack(s)
879+
break
880+
881+
try:
882+
await buffer_processor.athrow(asyncio.CancelledError())
883+
except asyncio.CancelledError:
884+
pass
885+
886+
assert event
887+
# need one sleep on Python 3.6.0-3.6.6 + 3.7.0
888+
# need two sleeps on Python 3.6.7 + 3.7.1 :-/
889+
await asyncio.sleep(0) # needed for some reason
890+
await asyncio.sleep(0) # needed for some reason
891+
await asyncio.sleep(0) # needed for some reason
892+
893+
if not event.ack.called:
894+
assert event.message.acked
895+
assert not event.message.refcount
896+
assert s.enable_acks is True

0 commit comments

Comments
 (0)