Implement Request and Response Policy Based Routing in Cluster Mode #3422
* Add search module builders and tests * Add tests
Co-authored-by: Nedyalko Dyakov <[email protected]>
    }
    if result.cmd != nil && result.err == nil {
        // For MGET, extract individual values from the array result
        if strings.ToLower(cmd.Name()) == "mget" {
Do we actually need this special case?
cc @ofekshenawa
6e3b627 to 1b2eaa6
ndyakov
left a comment
Submitting partial review for the aggregators.
    // For MGET without policy, use keyed aggregator
    if cmdName == "mget" {
        return routing.NewDefaultAggregator(true)
    }
Since we are passing cmd.Name() to routing.NewResponseAggregator, this case can be handled there. If the policy is nil for mget, maybe NewResponseAggregator can accept a policy and check for nil as well.
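To make the suggestion concrete, here is a minimal sketch of a single factory that receives both the command name and a possibly nil policy, so the MGET fallback lives in one place. `ResponsePolicy`, `CommandPolicy`, and `aggregatorKind` are hypothetical names for illustration only, not the PR's actual types, and the function returns a label rather than a real aggregator to keep the example short.

```go
package main

import (
	"fmt"
	"strings"
)

// ResponsePolicy is a hypothetical stand-in for the PR's response policy type.
type ResponsePolicy int

const (
	RespAllSucceeded ResponsePolicy = iota
	RespOneSucceeded
)

// CommandPolicy is a hypothetical holder for a command's routing tips.
type CommandPolicy struct {
	Response ResponsePolicy
}

// aggregatorKind names the aggregator a factory would build.
// A real factory would return an aggregator value instead of a string.
func aggregatorKind(cmdName string, policy *CommandPolicy) string {
	if policy == nil {
		// No policy known: apply per-command defaults here, not at the call site.
		if strings.EqualFold(cmdName, "mget") {
			return "default-keyed"
		}
		return "default"
	}
	switch policy.Response {
	case RespAllSucceeded:
		return "all_succeeded"
	case RespOneSucceeded:
		return "one_succeeded"
	default:
		return "default"
	}
}

func main() {
	fmt.Println(aggregatorKind("MGET", nil))
	fmt.Println(aggregatorKind("ping", &CommandPolicy{Response: RespAllSucceeded}))
}
```

With this shape the caller no longer needs its own `if cmdName == "mget"` branch.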
ndyakov
left a comment
Submitting another partial review.
ndyakov
left a comment
Final part of initial review
Overview:
- Let's use atomics when possible.
- Left questions related to the node selection and setting of values.
Overall the design of the solution looks good, would have to do an additional pass over the test files once this review is addressed.
Thank you both @ofekshenawa and @htemelski-redis!
osscluster_router.go
Outdated
    if c.hasKeys(cmd) {
        // execute on key based shard
        return node.Client.Process(ctx, cmd)
    }
Do we know that this node serves the slot for the key?
Yes, the node should've been selected based on the slot osscluster.go:L1906
func (c *ClusterClient) cmdNode(
osscluster_router.go
Outdated
        // execute on key based shard
        return node.Client.Process(ctx, cmd)
    }
    return c.executeOnArbitraryShard(ctx, cmd)
Since it doesn't matter which node runs the command and a node is already selected, why not use it?
We have two different ways of picking an arbitrary shard: round robin or random.
Yes, I understand that, but a node has already been selected here, either because of a MOVED redirect or through normal key-based selection. Why do we have to reselect the node? Shouldn't the arbitrary-node selection be done outside, so that node selection happens only once and the node on line #52 is the one used for this command?
The re-selection is required so that keyless commands respect the configured ShardPicker strategy.
cmdNode() picks nodes by hash slot, which is correct for keyed commands but ignores the picker for keyless ones. Calling executeOnArbitraryShard() ensures keyless commands use the user's picker.
But if the command is keyless, what will be the node here? Won't it be picked based on the ShardPolicy? Will it be just a random node? Can't we pick the correct node there?
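For readers following this thread, the two picker strategies mentioned above can be sketched as follows. The `ShardPicker` interface shape (`Next(total int) int`) and both implementations are assumptions made for illustration; the PR's actual definitions may differ.

```go
package main

import (
	"fmt"
	"math/rand"
	"sync/atomic"
)

// ShardPicker is an assumed interface: given the shard count, return an index.
type ShardPicker interface {
	Next(total int) int
}

// RoundRobinPicker cycles through shard indexes using an atomic counter,
// so it is safe to share between goroutines.
type RoundRobinPicker struct {
	counter atomic.Uint32
}

func (p *RoundRobinPicker) Next(total int) int {
	if total <= 0 {
		return 0
	}
	// Add returns the new value; subtract 1 so the first pick is index 0.
	return int((p.counter.Add(1) - 1) % uint32(total))
}

// RandomPicker returns a uniformly random shard index.
type RandomPicker struct{}

func (RandomPicker) Next(total int) int {
	if total <= 0 {
		return 0
	}
	return rand.Intn(total)
}

func main() {
	var p ShardPicker = &RoundRobinPicker{}
	for i := 0; i < 4; i++ {
		fmt.Println(p.Next(3)) // prints 0, 1, 2, 0 on successive lines
	}
}
```

If node selection were done once up front, as the review asks, the picker would be consulted inside that single selection step for keyless commands instead of in a second call.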
727a799 to 14bd6e1
ndyakov
left a comment
Left some comments related to the aggregators.
ndyakov
left a comment
The aggregators look good, but there are some prints left in the code as well as a bunch of unanswered questions. Let's resolve them before merging this. cc @ofekshenawa, @htemelski-redis
    // AggMaxAggregator returns the maximum numeric value from all shards.
    type AggMaxAggregator struct {
        err atomic.Value
        res *util.AtomicMax
    }
General question: are these min and max aggregators only for ints?
That's a good question. The initial implementation was for ints only, but looking at the docs, we should support any numeric type.
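One way to cover non-integer replies while staying lock-free, in line with the "use atomics" feedback, is to store the float64 bit pattern in an atomic integer and update it with a compare-and-swap loop. The sketch below is illustrative only: `AtomicMaxFloat` is a hypothetical type, not the PR's `util.AtomicMax`.

```go
package main

import (
	"fmt"
	"math"
	"sync"
	"sync/atomic"
)

// AtomicMaxFloat is a hypothetical lock-free tracker of the largest float64 seen.
type AtomicMaxFloat struct {
	bits atomic.Uint64 // IEEE-754 bits of the current maximum
}

// NewAtomicMaxFloat starts at -Inf so any finite value replaces it.
func NewAtomicMaxFloat() *AtomicMaxFloat {
	m := &AtomicMaxFloat{}
	m.bits.Store(math.Float64bits(math.Inf(-1)))
	return m
}

// Observe records v if it is larger than the current maximum.
func (m *AtomicMaxFloat) Observe(v float64) {
	if math.IsNaN(v) {
		return // NaN compares false with everything; ignore it
	}
	for {
		old := m.bits.Load()
		if v <= math.Float64frombits(old) {
			return
		}
		// Retry if another goroutine changed the value in between.
		if m.bits.CompareAndSwap(old, math.Float64bits(v)) {
			return
		}
	}
}

// Value returns the current maximum.
func (m *AtomicMaxFloat) Value() float64 {
	return math.Float64frombits(m.bits.Load())
}

func main() {
	m := NewAtomicMaxFloat()
	var wg sync.WaitGroup
	for _, v := range []float64{3, 7.5, -2, 42} {
		wg.Add(1)
		go func(v float64) {
			defer wg.Done()
			m.Observe(v)
		}(v)
	}
	wg.Wait()
	fmt.Println(m.Value()) // 42
}
```

A min tracker would mirror this with +Inf as the start value and the comparison reversed.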
    defer func() {
        if r := recover(); r != nil {
            cmd.SetErr(fmt.Errorf("redis: failed to set command value: %v", r))
Why don't we return the error as well? As written, the function returns nil while the error is only set on the cmd.
Not sure how good a practice it is to modify the return value from within recover.
It is possible, precisely for scenarios like this one.
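The mechanism being referred to is Go's named result parameters: a deferred function can assign to a named result after a panic is recovered, and that value is what the caller receives. A minimal sketch, where `setValue` and `safeSetValue` are hypothetical helpers and not code from this PR:

```go
package main

import (
	"fmt"
)

// setValue is a hypothetical helper that may panic on a bad type assertion.
func setValue(dst *int64, v interface{}) {
	*dst = v.(int64) // panics if v is not an int64
}

// safeSetValue converts a panic into a returned error.
// The named result err is what lets the deferred function change the return value.
func safeSetValue(dst *int64, v interface{}) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("redis: failed to set command value: %v", r)
		}
	}()
	setValue(dst, v)
	return nil
}

func main() {
	var n int64
	fmt.Println(safeSetValue(&n, int64(5)), n)   // <nil> 5
	fmt.Println(safeSetValue(&n, "oops") != nil) // true
}
```

In the PR's context the deferred function could both call `cmd.SetErr` and assign the same error to the named result, so the two stay consistent.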
a4ac8df to 7181bcc
* feat: load the policy table in cluster client * Remove comments
07963c2 to cd74db0
… load-balance-search-commands-to-shards
…ub.com/ofekshenawa/go-redis into load-balance-search-commands-to-shards
Pull Request Overview
This PR adds support for explicit routing policies in Redis Cluster operations by introducing a command policy resolver system. The changes enable commands to be routed to specific shards based on their requirements (all shards, multiple shards, single shard) and aggregate responses accordingly.
- Implements a comprehensive routing and aggregation framework for cluster commands
- Adds Clone() methods to various command types to support concurrent execution
- Introduces configurable shard picker strategies (round-robin, random, static)
- Adds extensive test coverage for routing policies and command aggregation
Reviewed Changes
Copilot reviewed 30 out of 46 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| osscluster_router.go | New file implementing command routing logic and response aggregation |
| osscluster.go | Integration of command policy resolver and routing infrastructure |
| command_policy_resolver.go | New resolver for determining command routing policies |
| internal/routing/*.go | Core routing policy types and aggregation implementations |
| internal/util/atomic_*.go | Atomic min/max utilities for response aggregation |
| *_commands.go | Added cmdType fields and Clone() methods to command structs |
| osscluster_test.go | Extensive test coverage for routing policies and aggregation |
    }

    func (c *ClusterClient) cmdSlot(cmd Cmder, preferredRandomSlot int) int {
    func (c *ClusterClient) cmdSlot(cmd Cmder, prefferedSlot int) int {
Copilot
AI
Nov 9, 2025
Corrected spelling of 'preffered' to 'preferred'.
    }

    return cmdSlot(cmd, cmdFirstKeyPos(cmd), preferredRandomSlot)
    return cmdSlot(cmd, cmdFirstKeyPos(cmd), prefferedSlot)
Copilot
AI
Nov 9, 2025
Corrected spelling of 'preffered' to 'preferred'.
This PR adds request_policy and response_policy routing, driven by Redis COMMAND metadata, for commands issued through the OSS Cluster client.
Key Additions:
- Command Policy Loader: Parses and caches COMMAND metadata with routing/aggregation tips on first use.
- Routing Engine Enhancements: Implements support for all request policies: default(keyless), default(hashslot), all_shards, all_nodes, multi_shard, and special.
- Response Aggregator: Combines multi-shard replies based on response_policy (all_succeeded, one_succeeded, agg_sum, special, etc.). Includes custom handling for special commands like FT.CURSOR.
- Raw Command Support: Policies are enforced on Client.Do(ctx, args...).
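As a rough illustration of what response aggregation means for three of the policies listed above, the sketch below combines integer replies from several shards. It is a simplified model and not the PR's implementation: `shardReply`, `aggregate`, and `errShardDown` are hypothetical names, and real replies are not limited to integers.

```go
package main

import (
	"errors"
	"fmt"
)

// errShardDown is a hypothetical error used for the demo.
var errShardDown = errors.New("shard down")

// shardReply models one shard's answer: an integer value or an error.
type shardReply struct {
	val int64
	err error
}

// aggregate combines per-shard replies according to a response_policy name.
func aggregate(policy string, replies []shardReply) (int64, error) {
	switch policy {
	case "all_succeeded":
		// Succeeds only if no shard returned an error.
		if len(replies) == 0 {
			return 0, errors.New("no replies")
		}
		for _, r := range replies {
			if r.err != nil {
				return 0, r.err
			}
		}
		return replies[0].val, nil
	case "one_succeeded":
		// Succeeds if at least one shard answered without an error.
		lastErr := errors.New("no replies")
		for _, r := range replies {
			if r.err == nil {
				return r.val, nil
			}
			lastErr = r.err
		}
		return 0, lastErr
	case "agg_sum":
		// Sums the integer replies; any shard error fails the whole command.
		var sum int64
		for _, r := range replies {
			if r.err != nil {
				return 0, r.err
			}
			sum += r.val
		}
		return sum, nil
	default:
		return 0, fmt.Errorf("unsupported response policy %q", policy)
	}
}

func main() {
	replies := []shardReply{{val: 2}, {val: 5}, {err: errShardDown}}
	fmt.Println(aggregate("one_succeeded", replies)) // 2 <nil>
	fmt.Println(aggregate("agg_sum", replies[:2]))   // 7 <nil>
	fmt.Println(aggregate("all_succeeded", replies)) // 0 shard down
}
```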