GH-3562: Add AckMode.RECORD_FILTERED#4108
GH-3562: Add AckMode.RECORD_FILTERED#4108Chaedie wants to merge 12 commits intospring-projects:mainfrom
Conversation
Signed-off-by: Chaedong Im <chaedong.im.dev@gmail.com>
Signed-off-by: Chaedong Im <chaedong.im.dev@gmail.com>
Signed-off-by: Chaedong Im <chaedong.im.dev@gmail.com>
Signed-off-by: Chaedong Im <chaedong.im.dev@gmail.com>
Signed-off-by: Chaedong Im <chaedong.im.dev@gmail.com>
Signed-off-by: Chaedong Im <chaedong.im.dev@gmail.com>
Signed-off-by: Chaedong Im <chaedong.im.dev@gmail.com>
dc6bc25 to
c0bb203
Compare
Signed-off-by: Chaedong Im <chaedong.im.dev@gmail.com>
sobychacko
left a comment
There was a problem hiding this comment.
You don't need to use public modifier on test classes or methods any longer.
| * @author Chaedong Im | ||
| * @see AckModeRecordWithFilteringTest | ||
| */ | ||
| public class AckModeRecordFilteredTest { |
There was a problem hiding this comment.
You can drop public modifier from tests.
There was a problem hiding this comment.
I removed it. Thanks 😊
|
|
||
| @SuppressWarnings({"unchecked", "deprecation"}) | ||
| @Test | ||
| public void testRecordFilteredModeOnlyCommitsProcessedRecords() throws InterruptedException { |
There was a problem hiding this comment.
No public modifier needed.
There was a problem hiding this comment.
I removed it. Thanks 😊
|
|
||
| @SuppressWarnings({"unchecked", "deprecation"}) | ||
| @Test | ||
| public void testRecordFilteredModeWithAllRecordsFiltered() throws InterruptedException { |
There was a problem hiding this comment.
no public modifier needed.
There was a problem hiding this comment.
I removed it. Thanks 😊
| * @author Chaedong Im | ||
| * @see AckModeRecordFilteredTest | ||
| */ | ||
| public class AckModeRecordWithFilteringTest { |
There was a problem hiding this comment.
No public modifier needed.
There was a problem hiding this comment.
I removed it. Thanks 😊
| */ | ||
| RECORD, | ||
|
|
||
| /** |
There was a problem hiding this comment.
need to add author tag to the class.
There was a problem hiding this comment.
Oh, I missed this class. I added author tag. Thanks 😊
Signed-off-by: Chaedong Im <chaedong.im.dev@gmail.com>
…MessageListenerAdapter Signed-off-by: Chaedong Im <chaedong.im.dev@gmail.com>
Signed-off-by: Chaedong Im <chaedong.im.dev@gmail.com>
Issue: #3562
Summary
Add a new AckMode
AckMode.RECORD_FILTERED.It will commits offsets only for records that are not filtered by
RecordFilterStrategy.Problem
When RecordFilterStrategy is used with
AckMode.RECORD, the container commits the offset for every record, including those filtered out (never delivered to the listener).This causes unnecessary synchronous commits and can advance offsets for records that were effectively skipped.
Implementation
ContainerProperties.AckMode.RECORD_FILTEREDKafkaMessageListenerContainer: per-record commit path skips filtered recordsFilteringAware+FilteringMessageListenerAdapterto expose filter resultsTests
I wasn’t fully confident about my code, so I hesitated to open this PR for a few weeks. 😅
Please feel free to review it — thank you!