perf(cluster): use multithreads to optimize sendSnapshotByRawKV #3299
base: unstable
Conversation
Pull request overview
This PR introduces multi-threaded parallelism to optimize slot migration snapshot transmission in Kvrocks cluster operations. The implementation allows configuring the degree of parallelism through a new configuration parameter, defaulting to the number of CPU cores when set to 0.
Key Changes:
- Added `migrate-slots-send-snapshots-parallelism` configuration parameter to control parallel snapshot sending
- Refactored `sendSnapshotByRawKV()` to distribute slot ranges across multiple threads (see the sketch after this list)
- Changed rate limiting from per-BatchSender to a shared global RateLimiter for coordinated throughput control
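As a rough illustration of the shape of the change, here is a minimal sketch of the fan-out described above. `migrateSlotRange` and the `parallelism_` member are hypothetical stand-ins, not the PR's exact code:

```cpp
#include <algorithm>
#include <future>
#include <vector>

Status SlotMigrator::sendSnapshotParallel(int start_slot, int end_slot) {
  int total = end_slot - start_slot + 1;
  int parallelism = std::min(parallelism_, total);  // never more threads than slots
  int slots_per_thread = total / parallelism;
  int remain_slots = total % parallelism;  // spread the remainder across the first threads

  std::vector<std::future<Status>> results;
  int cur_start = start_slot;
  for (int i = 0; i < parallelism; i++) {
    int count = slots_per_thread + (i < remain_slots ? 1 : 0);
    int cur_end = cur_start + count - 1;
    // Each worker opens its own connection and iterates its own sub-range;
    // all workers share one global rate limiter for coordinated throughput.
    results.emplace_back(std::async(std::launch::async, [this, i, cur_start, cur_end]() -> Status {
      return migrateSlotRange(i, cur_start, cur_end);  // hypothetical per-range worker
    }));
    cur_start = cur_end + 1;
  }
  for (auto &r : results) {
    if (auto s = r.get(); !s.IsOK()) return s;  // propagate the first failing thread's status
  }
  return Status::OK();
}
```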
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 17 comments.
| File | Description |
|---|---|
| src/config/config.h | Adds new configuration field for migration snapshot parallelism |
| src/config/config.cc | Implements configuration field with auto-detection of hardware concurrency when set to 0 |
| src/cluster/slot_migrate.h | Adds parallelism setter, connection creation method, slot range migration method, and global rate limiter |
| src/cluster/slot_migrate.cc | Implements parallel snapshot migration logic, creates per-thread connections, and refactors rate limiting to use shared limiter |
| src/cluster/batch_sender.h | Changes constructor to accept shared rate limiter instead of creating per-instance limiters |
| src/cluster/batch_sender.cc | Updates rate limiting logic to use shared rate limiter and removes dynamic rate update method |
| kvrocks.conf | Documents the new parallelism configuration parameter |
Comments suppressed due to low confidence (1)
src/cluster/slot_migrate.cc:1360
- The storage_ pointer is accessed by multiple threads without synchronization. While storage_ methods like DefaultScanOptions(), GetCFHandle(), and IsSlotIdEncoded() are likely thread-safe for reading, and the slot_snapshot ensures a consistent view, this should be verified and documented. Consider adding a comment indicating that storage_ access is thread-safe for read operations.
```cpp
rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
read_options.snapshot = slot_snapshot_;
rocksdb::Slice prefix_slice(prefix);
rocksdb::Slice upper_bound_slice(upper_bound);
read_options.iterate_lower_bound = &prefix_slice;
read_options.iterate_upper_bound = &upper_bound_slice;
auto no_txn_ctx = engine::Context::NoTransactionContext(storage_);
engine::DBIterator iter(no_txn_ctx, read_options);
BatchSender batch_sender(fd, migrate_batch_size_bytes_, global_rate_limiter_);
for (iter.Seek(prefix); iter.Valid(); iter.Next()) {
  auto key_slot_id = ExtractSlotId(iter.Key());
  if (!sub.Contains(key_slot_id)) {
    break;
  }
  auto redis_type = iter.Type();
  std::string log_data;
  if (redis_type == RedisType::kRedisList) {
    redis::WriteBatchLogData batch_log_data(redis_type, {std::to_string(RedisCommand::kRedisCmdRPush)});
    log_data = batch_log_data.Encode();
  } else {
    redis::WriteBatchLogData batch_log_data(redis_type);
    log_data = batch_log_data.Encode();
  }
  batch_sender.SetPrefixLogData(log_data);

  GET_OR_RET(batch_sender.Put(storage_->GetCFHandle(ColumnFamilyID::Metadata), iter.Key(), iter.Value()));
  if (batch_sender.IsFull()) {
    GET_OR_RET(sendMigrationBatch(&batch_sender));
  }

  auto subkey_iter = iter.GetSubKeyIterator();
  if (!subkey_iter) {
    continue;
  }

  for (subkey_iter->Seek(); subkey_iter->Valid(); subkey_iter->Next()) {
    GET_OR_RET(batch_sender.Put(subkey_iter->ColumnFamilyHandle(), subkey_iter->Key(), subkey_iter->Value()));
    if (batch_sender.IsFull()) {
      GET_OR_RET(sendMigrationBatch(&batch_sender));
    }

    if (redis_type == RedisType::kRedisZSet) {
      InternalKey internal_key(subkey_iter->Key(), storage_->IsSlotIdEncoded());
      auto score_key = subkey_iter->Value().ToString();
      score_key.append(subkey_iter->UserKey().ToString());
      auto score_key_bytes =
          InternalKey(iter.Key(), score_key, internal_key.GetVersion(), storage_->IsSlotIdEncoded()).Encode();
      GET_OR_RET(batch_sender.Put(storage_->GetCFHandle(ColumnFamilyID::SecondarySubkey), score_key_bytes, Slice()));
```
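One way to address the suppressed comment is to document the invariant right at the read site. A sketch of such a comment (wording and placement illustrative):

```cpp
// NOTE: storage_ is only read here (DefaultScanOptions, GetCFHandle, and
// IsSlotIdEncoded are read-only), and slot_snapshot_ pins an immutable
// RocksDB snapshot taken before the worker threads start, so concurrent
// access from multiple migration threads is safe by design.
rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
read_options.snapshot = slot_snapshot_;
```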
```cpp
auto s = authOnDstNode(*fd, pass);
if (!s.IsOK()) {
  error("failed to authenticate on destination node error: {}", s.Msg());
  return -1;
```
Copilot AI commented on Dec 19, 2025
The error handling returns -1 but doesn't close the file descriptor that was successfully created by SockConnect. When authentication fails, the established connection is leaked. The file descriptor from SockConnect should be closed before returning on authentication failure.
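A minimal sketch of the fix, assuming `close()` from `<unistd.h>` is available and that `*fd` yields the raw socket descriptor returned by `SockConnect`:

```cpp
#include <unistd.h>  // for close()

std::string pass = srv_->GetConfig()->requirepass;
if (!pass.empty()) {
  auto s = authOnDstNode(*fd, pass);
  if (!s.IsOK()) {
    error("failed to authenticate on destination node error: {}", s.Msg());
    close(*fd);  // release the socket established by SockConnect before bailing out
    return -1;
  }
}
```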
```
# The parallelism of slot migration passing SST files
#
# Default: the number of Kvrocks node cores
```
Copilot AI commented on Dec 19, 2025
The comment is incomplete - it should be "Default: 0 (which uses the number of Kvrocks node cores)" to clarify that 0 is a valid value that triggers automatic detection, not just stating what the automatic value would be.
Suggested change:
```diff
-# Default: the number of Kvrocks node cores
+# Default: 0 (which uses the number of Kvrocks node cores)
```
```cpp
Status SlotMigrator::sendMigrationBatch(BatchSender *batch) {
  // user may dynamically change some configs, apply it when send data
  batch->SetMaxBytes(migrate_batch_size_bytes_);
```
Copilot AI commented on Dec 19, 2025
The removal of the SetBytesPerSecond call means that dynamic changes to migrate_batch_bytes_per_sec_ during migration will not be applied to the global rate limiter. Since the rate limiter is now created once in the constructor and shared globally, updates to the config value won't take effect until the SlotMigrator is recreated. Consider whether dynamic rate limit updates should still be supported and if so, add thread-safe updating of the global rate limiter.
Suggested change:
```diff
 batch->SetMaxBytes(migrate_batch_size_bytes_);
+batch->SetBytesPerSecond(migrate_batch_bytes_per_sec_);
```
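If dynamic updates should remain supported, RocksDB's limiter can be retuned in place: `rocksdb::RateLimiter::SetBytesPerSecond()` is documented as safe to call while other threads are consuming tokens. A sketch, assuming `global_rate_limiter_` is a shared `rocksdb::RateLimiter` and `migrate_batch_bytes_per_sec_` is read atomically; `batch->Send()` stands in for the rest of the original function body:

```cpp
#include <rocksdb/rate_limiter.h>

Status SlotMigrator::sendMigrationBatch(BatchSender *batch) {
  // user may dynamically change some configs, apply them when sending data
  batch->SetMaxBytes(migrate_batch_size_bytes_);
  uint64_t bps = migrate_batch_bytes_per_sec_;  // assumed to be an atomic read
  if (bps > 0) {
    // SetBytesPerSecond is thread-safe, so the shared limiter can be
    // retuned even while worker threads are mid-transfer.
    global_rate_limiter_->SetBytesPerSecond(static_cast<int64_t>(bps));
  }
  return batch->Send();  // hypothetical stand-in for the remaining logic
}
```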
```cpp
  return Status::OK();
}
```
Copilot AI commented on Dec 19, 2025
The dst_ip_ and dst_port_ member variables are accessed by multiple threads without synchronization. While these are set before the parallel migration begins and are not modified during migration, they should be documented as thread-safe or protected. Consider making them const or adding documentation that they must not be modified during parallel operations.
Suggested change:
```diff
+// NOTE: dst_ip_ and dst_port_ are configured before any parallel migration begins
+// and are not modified during migration. They must not be mutated while parallel
+// operations are in progress, so concurrent reads from multiple threads here are
+// considered thread-safe by design.
```
```cpp
auto upper_bound = ComposeSlotKeyUpperBound(namespace_, end_slot);

rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
read_options.snapshot = slot_snapshot_;
```
Copilot AI commented on Dec 19, 2025
The slot_snapshot_ member variable is accessed by multiple threads without synchronization. While RocksDB snapshots are immutable and thread-safe to read from, the pointer itself should be properly synchronized or documented as being set before parallel access begins. Verify that slot_snapshot_ is fully initialized and won't change during the parallel migration phase.
```cpp
int count = slots_per_thread + (i < remain_slots ? 1 : 0);
int cur_end = cur_start + count - 1;

results.emplace_back(std::async(std::launch::async, [=]() -> Status {
```
Copilot AI commented on Dec 19, 2025
The lambda captures variables by value (using `[=]`), including the loop variable `i`. `i` is only used in the error message on line 1281, but that alone makes capturing it worthwhile. The other captured variables (`cur_start`, `cur_end`) are correctly captured by value, since they change in each iteration. This is correct as written, but consider an explicit capture list for better code clarity, as in the sketch below.
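An explicit capture list might look like this sketch, where `migrateSlotRange` is a hypothetical per-range worker:

```cpp
// Capture exactly what the worker needs: `this` for member access, and the
// per-iteration values i, cur_start, cur_end by value (they change each loop).
results.emplace_back(std::async(std::launch::async, [this, i, cur_start, cur_end]() -> Status {
  int fd = createConnectToDstNode();
  if (fd < 0) {
    return {Status::NotOK, fmt::format("failed to connect the destination node in thread[{}]", i)};
  }
  return migrateSlotRange(fd, cur_start, cur_end);  // hypothetical worker helper
}));
```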
```cpp
int SlotMigrator::createConnectToDstNode() {
  // Connect to the destination node
  auto fd = util::SockConnect(dst_ip_, dst_port_);
  if (!fd.IsOK()) {
    error("failed to connect to the node error: {}", fd.Msg());
    return -1;
  }

  std::string pass = srv_->GetConfig()->requirepass;
  if (!pass.empty()) {
    auto s = authOnDstNode(*fd, pass);
    if (!s.IsOK()) {
      error("failed to authenticate on destination node error: {}", s.Msg());
      return -1;
    }
  }
  return *fd;
```
Copilot AI commented on Dec 19, 2025
The function signature returns an int that can be negative to indicate failure, but the return type should more clearly express this. Consider using a StatusOr or Result type pattern instead of returning a raw int where negative values mean error. This would make the API more consistent with the rest of the codebase which uses Status objects.
Suggested change:
```diff
-int SlotMigrator::createConnectToDstNode() {
+Status SlotMigrator::createConnectToDstNode(int *out_fd) {
   // Connect to the destination node
   auto fd = util::SockConnect(dst_ip_, dst_port_);
   if (!fd.IsOK()) {
-    error("failed to connect to the node error: {}", fd.Msg());
-    return -1;
+    auto msg = fmt::format("failed to connect to the node error: {}", fd.Msg());
+    error("{}", msg);
+    return {Status::NotOK, msg};
   }
   std::string pass = srv_->GetConfig()->requirepass;
   if (!pass.empty()) {
     auto s = authOnDstNode(*fd, pass);
     if (!s.IsOK()) {
-      error("failed to authenticate on destination node error: {}", s.Msg());
-      return -1;
+      auto msg = fmt::format("failed to authenticate on destination node error: {}", s.Msg());
+      error("{}", msg);
+      return {Status::NotOK, msg};
     }
   }
-  return *fd;
+  *out_fd = *fd;
+  return Status::OK();
```
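As an alternative to the out-parameter above, a `StatusOr<int>`-returning sketch may sit closer to the codebase's conventions (assuming the codebase's `StatusOr` type; the `close()` call also folds in the fd-leak fix from the earlier comment):

```cpp
#include <unistd.h>  // for close()

StatusOr<int> SlotMigrator::createConnectToDstNode() {
  // Connect to the destination node
  auto fd = util::SockConnect(dst_ip_, dst_port_);
  if (!fd.IsOK()) {
    return {Status::NotOK, fmt::format("failed to connect to the node error: {}", fd.Msg())};
  }
  std::string pass = srv_->GetConfig()->requirepass;
  if (!pass.empty()) {
    auto s = authOnDstNode(*fd, pass);
    if (!s.IsOK()) {
      close(*fd);  // don't leak the socket on auth failure
      return {Status::NotOK, fmt::format("failed to authenticate on destination node error: {}", s.Msg())};
    }
  }
  return *fd;  // callers would unwrap via GET_OR_RET(createConnectToDstNode())
}
```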
```cpp
GET_OR_RET(sendMigrationBatch(&batch_sender));

auto elapsed = util::GetTimeStampMS() - start_ts;
info(
    "[migrate] Succeed to migrate snapshot range, slot(s): {}, elapsed: {} ms, sent: {} bytes, rate: {:.2f} kb/s, "
    "batches: {}, entries: {}",
    slot_range.String(), elapsed, batch_sender.GetSentBytes(), batch_sender.GetRate(start_ts),
    batch_sender.GetSentBatchesNum(), batch_sender.GetEntriesNum());

return Status::OK();
```
Copilot AI commented on Dec 19, 2025
The logging information about migration progress (bytes sent, rate, batches, entries) has been removed from the individual thread migrations. This makes debugging and monitoring parallel migrations more difficult, as there's no per-thread visibility. Consider adding aggregate logging or at least debug-level logs for each thread's progress to aid troubleshooting.
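If per-thread visibility is wanted, a debug-level line per worker might look like the following sketch; `debug` is assumed to be a debug-level counterpart of the `info`/`error` helpers used above, and `thread_idx` is a hypothetical index captured by the worker lambda:

```cpp
// Per-thread progress log, emitted when a worker finishes its sub-range.
debug(
    "[migrate] thread[{}] finished snapshot range, slot(s): {}, sent: {} bytes, "
    "batches: {}, entries: {}",
    thread_idx, slot_range.String(), batch_sender.GetSentBytes(),
    batch_sender.GetSentBatchesNum(), batch_sender.GetEntriesNum());
```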
```cpp
void SetMigrateSlotsSendSnapshotsParallelism(int value) {
  if (value > 0) migrate_slots_send_snapshots_parallelism_ = value;
}
```
Copilot AI commented on Dec 19, 2025
The validation only checks if the value is greater than 0, but the configuration allows 0 as a valid value (minimum is 0 in IntField). This inconsistency means if a user explicitly sets the value to 0, it will be accepted in configuration but ignored here. Consider aligning the validation with the configuration constraints or handling 0 explicitly as a special case in the setter.
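One way to align the setter with the config's `IntField` range is to treat an explicit 0 as auto-detect rather than silently ignoring it. A sketch; the fallback to 1 when the core count is unknown is an assumption, not the PR's code:

```cpp
#include <thread>

void SetMigrateSlotsSendSnapshotsParallelism(int value) {
  if (value > 0) {
    migrate_slots_send_snapshots_parallelism_ = value;
  } else if (value == 0) {
    // 0 is valid in the config (IntField minimum is 0): fall back to core count.
    unsigned n = std::thread::hardware_concurrency();  // may return 0 if unknown
    migrate_slots_send_snapshots_parallelism_ = n > 0 ? static_cast<int>(n) : 1;
  }
  // Negative values are rejected by the config layer, so no branch is needed.
}
```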
```cpp
results.emplace_back(std::async(std::launch::async, [=]() -> Status {
  int fd = createConnectToDstNode();
  if (fd < 0) {
    return {Status::NotOK, fmt::format("failed to connect the destination node in thread[{}]", i)};
  }
```
Copilot AI commented on Dec 19, 2025
Each parallel thread creates its own connection to the destination node via createConnectToDstNode(), but there's no mechanism to ensure these connections don't overwhelm the destination node. Consider adding configuration or documentation about the impact of parallel connections, or implementing connection pooling/throttling to prevent resource exhaustion on the destination.
Use the Kvrocks configuration to set the parallelism of sendSnapshotByRawKV.