feat(stream): add support of the CLAIM option for the XREADGROUP command #3278
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
https://redis.io/docs/latest/commands/xreadgroup/

feat(stream): Add support for XREADGROUP CLAIM option
English / 中文
English
Overview
This PR implements the
CLAIMoption for theXREADGROUPcommand, enabling consumers to claim idle pending messages from other consumers in a consumer group. This feature is essential for handling failed or slow consumers in distributed stream processing scenarios.Motivation
In distributed stream processing with consumer groups, consumers may fail or become slow, causing messages to remain in their Pending Entries List (PEL) indefinitely. The
CLAIMoption allows other consumers to take over these idle messages, improving fault tolerance and ensuring timely message processing.Changes
Core Implementation
Command Parsing (
src/commands/cmd_stream.cc)CLAIM min-idle-timeparameter parsing with validationAddStreamEntriesToResponsefor conditional formattingStream Processing (
src/types/redis_stream.cc)Stream::RangeWithPendingexclude_startoptionData Structures (
src/types/redis_stream_base.h,src/types/redis_stream.h)StreamEntrywithidle_msanddelivery_countfieldsStreamReadGroupReadOptionsstruct for CLAIM parametersRangeWithPendingsignatureKey Features
[id, fields, idle_ms, delivery_count]Test Coverage
Comprehensive test suite in
tests/gocase/unit/type/stream/xreadgroup_test.go:All tests passing ✅
Usage Example
Redis Specification Compliance
This implementation follows the Redis protocol specification:
>(latest) ID and NOACK optionTesting
Performance
中文
概述
本PR为
XREADGROUP命令实现了CLAIM选项,使消费者能够从消费者组中的其他消费者那里认领空闲的待处理消息。此功能对于处理分布式流处理场景中失败或缓慢的消费者至关重要。动机
在使用消费者组进行分布式流处理时,消费者可能会失败或变慢,导致消息无限期地保留在其待处理条目列表(PEL)中。
CLAIM选项允许其他消费者接管这些空闲消息,提高容错性并确保及时处理消息。更改内容
核心实现
命令解析 (
src/commands/cmd_stream.cc)CLAIM min-idle-time参数解析和验证AddStreamEntriesToResponse以支持条件格式化流处理 (
src/types/redis_stream.cc)Stream::RangeWithPending中实现了完整的认领逻辑exclude_start选项数据结构 (
src/types/redis_stream_base.h,src/types/redis_stream.h)StreamEntry增加idle_ms和delivery_count字段StreamReadGroupReadOptions结构体用于传递CLAIM参数RangeWithPending函数签名关键特性
[id, fields, idle_ms, delivery_count]测试覆盖
tests/gocase/unit/type/stream/xreadgroup_test.go中的完整测试套件:所有测试通过 ✅
使用示例
Redis规范合规性
此实现遵循Redis协议规范:
>(latest) ID和NOACK选项兼容测试
性能