Skip to content

Commit a887571

Browse files
mdrago98Matthew Dragopatkivikramtaybin
authored
Fixed filter not acking filtered out messages. (#208)
* Fixed filter not acking filtered out messages. * Removed debug print from test. * Added Cython implementation for the filter fix. Co-authored-by: Matthew Drago <matthew.drago@gig.com> Co-authored-by: Vikram Patki <54442035+patkivikram@users.noreply.github.com> Co-authored-by: Taybin Rutkin <taybin@users.noreply.github.com>
1 parent 49574b2 commit a887571

File tree

3 files changed

+21
-2
lines changed

3 files changed

+21
-2
lines changed

faust/_cython/streams.pyx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,8 @@ cdef class StreamIterator:
109109
object consumer
110110
consumer = self.consumer
111111
last_stream_to_ack = False
112-
if do_ack and event is not None:
112+
# if do_ack and event is not None:
113+
if event is not None and (do_ack or event.value is self._skipped_value):
113114
message = event.message
114115
if not message.acked:
115116
refcount = message.refcount

faust/streams.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1113,6 +1113,8 @@ async def _py_aiter(self) -> AsyncIterator[T_co]:
11131113
value = await _maybe_async(processor(value))
11141114
value = await on_merge(value)
11151115
except Skip:
1116+
# We want to ack the filtered message
1117+
# otherwise the lag would increase
11161118
value = skipped_value
11171119

11181120
try:
@@ -1121,7 +1123,9 @@ async def _py_aiter(self) -> AsyncIterator[T_co]:
11211123
yield value
11221124
finally:
11231125
self.current_event = None
1124-
if do_ack and event is not None:
1126+
# We want to ack the filtered out message
1127+
# otherwise the lag would increase
1128+
if event is not None and (do_ack or value is skipped_value):
11251129
# This inlines self.ack
11261130
last_stream_to_ack = event.ack()
11271131
message = event.message
@@ -1130,6 +1134,7 @@ async def _py_aiter(self) -> AsyncIterator[T_co]:
11301134
on_stream_event_out(tp, offset, self, event, sensor_state)
11311135
if last_stream_to_ack:
11321136
on_message_out(tp, offset, message)
1137+
11331138
except StopAsyncIteration:
11341139
# We are not allowed to propagate StopAsyncIteration in __aiter__
11351140
# (if we do, it'll be converted to RuntimeError by CPython).

tests/functional/test_streams.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,19 @@ async def myfilter(value):
222222
assert i == 3
223223

224224

225+
@pytest.mark.asyncio
226+
async def test_stream_filter_acks_filtered_out_messages(app, event_loop):
227+
"""
228+
Test the filter function acknowledges the filtered out
229+
messages regardless of the ack setting.
230+
"""
231+
values = [1000, 3000, 99, 5000, 3, 9999]
232+
async with app.stream(values).filter(lambda x: x > 1000) as stream:
233+
async for event in stream.events():
234+
assert event.value > 1000
235+
assert len(app.consumer.unacked) == 0
236+
237+
225238
@pytest.mark.asyncio
226239
async def test_events(app):
227240
async with new_stream(app) as stream:

0 commit comments

Comments
 (0)