Skip to content

Commit 00adae7

Browse files
s3563602123LeohuangLeoekerstens
authored
Faust commits the wrong offset in case of a gap in acks #312 (#313)
* Faust commits the wrong offset in case of a gap in acks #312 Faust commits the wrong offset in case of a gap in acks #312 Co-Authored-By: ekerstens <49325583+ekerstens@users.noreply.github.com> * kljlk jhkjhj * Revert "kljlk" This reverts commit 8b487fc. * update fix #313 Co-Authored-By: ekerstens <49325583+ekerstens@users.noreply.github.com> * update test case Co-Authored-By: ekerstens <49325583+ekerstens@users.noreply.github.com> Co-Authored-By: Leo <47164895+LeohuangLeo@users.noreply.github.com> Co-authored-by: Leo Huang <s3563602123@gmail.com> Co-authored-by: ekerstens <49325583+ekerstens@users.noreply.github.com> Co-authored-by: Leo <47164895+LeohuangLeo@users.noreply.github.com>
1 parent 26ff8fc commit 00adae7

File tree

2 files changed

+16
-0
lines changed

2 files changed

+16
-0
lines changed

faust/transport/consumer.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1097,6 +1097,18 @@ def _new_offset(self, tp: TP) -> Optional[int]:
10971097
acked.extend(gaps)
10981098
gap_for_tp[:gap_index] = []
10991099
acked.sort()
1100+
1101+
# We iterate over it until we handle gap in the head of acked queue
1102+
# then return the previous committed offset.
1103+
# For example if acked[tp] is:
1104+
# 34 35 36 37
1105+
# ^-- gap
1106+
# self._committed_offset[tp] is 31
1107+
# the return value will be None (the same as 31)
1108+
if self._committed_offset[tp]:
1109+
if min(acked) - self._committed_offset[tp] > 0:
1110+
return None
1111+
11001112
# Note: acked is always kept sorted.
11011113
# find first list of consecutive numbers
11021114
batch = next(consecutive_numbers(acked))

tests/unit/transport/test_consumer.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1079,9 +1079,13 @@ def test_new_offset(self, tp, acked, expected_offset, expected_acked, *, consume
10791079
(TP1, [1, 2, 3, 4, 5, 6, 7, 8, 10], [9], 11),
10801080
(TP1, [1, 2, 3, 4, 6, 7, 8, 10], [5], 9),
10811081
(TP1, [1, 3, 4, 6, 7, 8, 10], [2, 5, 9], 11),
1082+
(TP1, [3, 4], [], None),
1083+
(TP1, [3, 4], [2], None),
1084+
(TP1, [3, 4], [1, 2], 5),
10821085
],
10831086
)
10841087
def test_new_offset_with_gaps(self, tp, acked, gaps, expected_offset, *, consumer):
1088+
consumer._committed_offset[tp] = 1
10851089
consumer._acked[tp] = acked
10861090
consumer._gap[tp] = gaps
10871091
assert consumer._new_offset(tp) == expected_offset

0 commit comments

Comments
 (0)