
Conversation

@hll1213181368
Contributor

Use a Kvrocks config option to set the parallelism of sendSnapshotByRawKV.

Copilot AI left a comment

Pull request overview

This PR introduces multi-threaded parallelism to optimize slot migration snapshot transmission in Kvrocks cluster operations. The implementation allows configuring the degree of parallelism through a new configuration parameter, defaulting to the number of CPU cores when set to 0.

Key Changes:

  • Added migrate-slots-send-snapshots-parallelism configuration parameter to control parallel snapshot sending
  • Refactored sendSnapshotByRawKV() to distribute slot ranges across multiple threads
  • Changed rate limiting from per-BatchSender to a shared global RateLimiter for coordinated throughput control

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 17 comments.

Summary per file:

  • src/config/config.h: Adds a new configuration field for migration snapshot parallelism
  • src/config/config.cc: Implements the configuration field with auto-detection of hardware concurrency when set to 0
  • src/cluster/slot_migrate.h: Adds a parallelism setter, a connection creation method, a slot range migration method, and a global rate limiter
  • src/cluster/slot_migrate.cc: Implements the parallel snapshot migration logic, creates per-thread connections, and refactors rate limiting to use the shared limiter
  • src/cluster/batch_sender.h: Changes the constructor to accept a shared rate limiter instead of creating per-instance limiters
  • src/cluster/batch_sender.cc: Updates the rate limiting logic to use the shared rate limiter and removes the dynamic rate update method
  • kvrocks.conf: Documents the new parallelism configuration parameter (see the example below)
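For illustration, a minimal kvrocks.conf entry for the new parameter might look like the following; the value 16 is arbitrary, and 0 keeps the auto-detection behavior:

# Send migration snapshots with 16 parallel workers.
# Set to 0 to fall back to the number of CPU cores.
migrate-slots-send-snapshots-parallelism 16
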
Comments suppressed due to low confidence (1)

src/cluster/slot_migrate.cc:1360

  • The storage_ pointer is accessed by multiple threads without synchronization. While storage_ methods like DefaultScanOptions(), GetCFHandle(), and IsSlotIdEncoded() are likely thread-safe for reading, and slot_snapshot_ ensures a consistent view, this should be verified and documented. Consider adding a comment indicating that storage_ access is thread-safe for read operations.
  rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
  read_options.snapshot = slot_snapshot_;
  rocksdb::Slice prefix_slice(prefix);
  rocksdb::Slice upper_bound_slice(upper_bound);
  read_options.iterate_lower_bound = &prefix_slice;
  read_options.iterate_upper_bound = &upper_bound_slice;
  auto no_txn_ctx = engine::Context::NoTransactionContext(storage_);
  engine::DBIterator iter(no_txn_ctx, read_options);

  BatchSender batch_sender(fd, migrate_batch_size_bytes_, global_rate_limiter_);

  for (iter.Seek(prefix); iter.Valid(); iter.Next()) {
    auto key_slot_id = ExtractSlotId(iter.Key());
    if (!sub.Contains(key_slot_id)) {
      break;
    }

    auto redis_type = iter.Type();
    std::string log_data;
    if (redis_type == RedisType::kRedisList) {
      redis::WriteBatchLogData batch_log_data(redis_type, {std::to_string(RedisCommand::kRedisCmdRPush)});
      log_data = batch_log_data.Encode();
    } else {
      redis::WriteBatchLogData batch_log_data(redis_type);
      log_data = batch_log_data.Encode();
    }
    batch_sender.SetPrefixLogData(log_data);

    GET_OR_RET(batch_sender.Put(storage_->GetCFHandle(ColumnFamilyID::Metadata), iter.Key(), iter.Value()));
    if (batch_sender.IsFull()) {
      GET_OR_RET(sendMigrationBatch(&batch_sender));
    }

    auto subkey_iter = iter.GetSubKeyIterator();
    if (!subkey_iter) {
      continue;
    }

    for (subkey_iter->Seek(); subkey_iter->Valid(); subkey_iter->Next()) {
      GET_OR_RET(batch_sender.Put(subkey_iter->ColumnFamilyHandle(), subkey_iter->Key(), subkey_iter->Value()));
      if (batch_sender.IsFull()) {
        GET_OR_RET(sendMigrationBatch(&batch_sender));
      }

      if (redis_type == RedisType::kRedisZSet) {
        InternalKey internal_key(subkey_iter->Key(), storage_->IsSlotIdEncoded());
        auto score_key = subkey_iter->Value().ToString();
        score_key.append(subkey_iter->UserKey().ToString());
        auto score_key_bytes =
            InternalKey(iter.Key(), score_key, internal_key.GetVersion(), storage_->IsSlotIdEncoded()).Encode();
        GET_OR_RET(batch_sender.Put(storage_->GetCFHandle(ColumnFamilyID::SecondarySubkey), score_key_bytes, Slice()));


Comment on lines +1384 to +1387
auto s = authOnDstNode(*fd, pass);
if (!s.IsOK()) {
  error("failed to authenticate on destination node error: {}", s.Msg());
  return -1;

Copilot AI Dec 19, 2025


The error handling returns -1 but doesn't close the file descriptor that was successfully created by SockConnect. When authentication fails, the established connection is leaked. The file descriptor from SockConnect should be closed before returning on authentication failure.
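A minimal sketch of the fix, assuming plain POSIX close(2) (from <unistd.h>) is the right way to release the socket returned by SockConnect:

std::string pass = srv_->GetConfig()->requirepass;
if (!pass.empty()) {
  auto s = authOnDstNode(*fd, pass);
  if (!s.IsOK()) {
    error("failed to authenticate on destination node error: {}", s.Msg());
    close(*fd);  // release the connection established by SockConnect before bailing out
    return -1;
  }
}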


# The parallelism of slot migration passing SST files
#
# Default: the number of Kvrocks node cores

Copilot AI Dec 19, 2025


The comment is incomplete; it should be "Default: 0 (which uses the number of Kvrocks node cores)" to clarify that 0 is a valid value that triggers automatic detection, not just to state what the automatic value would be.

Suggested change
- # Default: the number of Kvrocks node cores
+ # Default: 0 (which uses the number of Kvrocks node cores)


Status SlotMigrator::sendMigrationBatch(BatchSender *batch) {
  // users may dynamically change some configs; apply them when sending data
  batch->SetMaxBytes(migrate_batch_size_bytes_);

Copilot AI Dec 19, 2025


The removal of the SetBytesPerSecond call means that dynamic changes to migrate_batch_bytes_per_sec_ during migration will not be applied to the global rate limiter. Since the rate limiter is now created once in the constructor and shared globally, updates to the config value won't take effect until the SlotMigrator is recreated. Consider whether dynamic rate limit updates should still be supported and if so, add thread-safe updating of the global rate limiter.

Suggested change
  batch->SetMaxBytes(migrate_batch_size_bytes_);
+ batch->SetBytesPerSecond(migrate_batch_bytes_per_sec_);
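If dynamic updates should remain supported, one option is sketched below, assuming global_rate_limiter_ is a shared rocksdb::RateLimiter, whose SetBytesPerSecond can be called while other threads are acquiring tokens:

Status SlotMigrator::sendMigrationBatch(BatchSender *batch) {
  // Apply config changes on every send so all worker threads pick them up.
  batch->SetMaxBytes(migrate_batch_size_bytes_);
  if (migrate_batch_bytes_per_sec_ > 0) {
    // Refresh the shared limiter instead of a per-sender one; RocksDB's
    // generic rate limiter synchronizes this update internally.
    global_rate_limiter_->SetBytesPerSecond(migrate_batch_bytes_per_sec_);
  }
  // ... existing batch sending logic follows unchanged ...
  return Status::OK();
}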


return Status::OK();
}


Copilot AI Dec 19, 2025


The dst_ip_ and dst_port_ member variables are accessed by multiple threads without synchronization. While these are set before the parallel migration begins and are not modified during migration, they should be documented as thread-safe or protected. Consider making them const or adding documentation that they must not be modified during parallel operations.

Suggested change
+ // NOTE: dst_ip_ and dst_port_ are configured before any parallel migration begins
+ // and are not modified during migration. They must not be mutated while parallel
+ // operations are in progress, so concurrent reads from multiple threads here are
+ // considered thread-safe by design.
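An alternative to documentation alone, sketched here: copy the endpoint into const locals before spawning workers, so each lambda captures immutable values (the local names are illustrative):

// Snapshot the destination endpoint once, before any worker starts.
const std::string dst_ip = dst_ip_;
const auto dst_port = dst_port_;
// Each worker captures the copies by value, so a later mutation of the
// members cannot race with an in-flight migration.
results.emplace_back(std::async(std::launch::async, [dst_ip, dst_port]() -> Status {
  // ... connect to dst_ip:dst_port and migrate the assigned slot range ...
  return Status::OK();
}));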

auto upper_bound = ComposeSlotKeyUpperBound(namespace_, end_slot);

rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
read_options.snapshot = slot_snapshot_;

Copilot AI Dec 19, 2025


The slot_snapshot_ member variable is accessed by multiple threads without synchronization. While RocksDB snapshots are immutable and thread-safe to read from, the pointer itself should be properly synchronized or documented as being set before parallel access begins. Verify that slot_snapshot_ is fully initialized and won't change during the parallel migration phase.

int count = slots_per_thread + (i < remain_slots ? 1 : 0);
int cur_end = cur_start + count - 1;

results.emplace_back(std::async(std::launch::async, [=]() -> Status {

Copilot AI Dec 19, 2025


The lambda captures variables by value (using [=]), including the loop variable 'i', which is only used in the error message on line 1281. The other captured variables (cur_start, cur_end) must be captured by value since they change in each iteration. This is correct as written, but consider listing the captures explicitly for better code clarity.
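A sketch of the explicit capture list the comment suggests, assuming 'this' is needed for the member calls inside the lambda:

results.emplace_back(std::async(std::launch::async, [this, i, cur_start, cur_end]() -> Status {
  int fd = createConnectToDstNode();
  if (fd < 0) {
    return {Status::NotOK, fmt::format("failed to connect the destination node in thread[{}]", i)};
  }
  // ... migrate the [cur_start, cur_end] slot range over fd ...
  return Status::OK();
}));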

Comment on lines +1374 to +1390
int SlotMigrator::createConnectToDstNode() {
  // Connect to the destination node
  auto fd = util::SockConnect(dst_ip_, dst_port_);
  if (!fd.IsOK()) {
    error("failed to connect to the node error: {}", fd.Msg());
    return -1;
  }

  std::string pass = srv_->GetConfig()->requirepass;
  if (!pass.empty()) {
    auto s = authOnDstNode(*fd, pass);
    if (!s.IsOK()) {
      error("failed to authenticate on destination node error: {}", s.Msg());
      return -1;
    }
  }
  return *fd;

Copilot AI Dec 19, 2025


The function signature returns an int that can be negative to indicate failure, but the return type should more clearly express this. Consider using a StatusOr or Result type pattern instead of returning a raw int where negative values mean error. This would make the API more consistent with the rest of the codebase which uses Status objects.

Suggested change
- int SlotMigrator::createConnectToDstNode() {
-   // Connect to the destination node
-   auto fd = util::SockConnect(dst_ip_, dst_port_);
-   if (!fd.IsOK()) {
-     error("failed to connect to the node error: {}", fd.Msg());
-     return -1;
-   }
-   std::string pass = srv_->GetConfig()->requirepass;
-   if (!pass.empty()) {
-     auto s = authOnDstNode(*fd, pass);
-     if (!s.IsOK()) {
-       error("failed to authenticate on destination node error: {}", s.Msg());
-       return -1;
-     }
-   }
-   return *fd;
+ Status SlotMigrator::createConnectToDstNode(int *out_fd) {
+   // Connect to the destination node
+   auto fd = util::SockConnect(dst_ip_, dst_port_);
+   if (!fd.IsOK()) {
+     auto msg = fmt::format("failed to connect to the node error: {}", fd.Msg());
+     error("{}", msg);
+     return {Status::NotOK, msg};
+   }
+   std::string pass = srv_->GetConfig()->requirepass;
+   if (!pass.empty()) {
+     auto s = authOnDstNode(*fd, pass);
+     if (!s.IsOK()) {
+       auto msg = fmt::format("failed to authenticate on destination node error: {}", s.Msg());
+       error("{}", msg);
+       return {Status::NotOK, msg};
+     }
+   }
+   *out_fd = *fd;
+   return Status::OK();
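The StatusOr pattern mentioned above could instead look like this sketch, assuming the codebase's StatusOr<int> and the GET_OR_RET macro (both visible elsewhere in this diff) are usable here:

StatusOr<int> SlotMigrator::createConnectToDstNode() {
  // GET_OR_RET propagates the connect error to the caller automatically.
  int fd = GET_OR_RET(util::SockConnect(dst_ip_, dst_port_));
  std::string pass = srv_->GetConfig()->requirepass;
  if (!pass.empty()) {
    auto s = authOnDstNode(fd, pass);
    if (!s.IsOK()) {
      close(fd);  // avoid leaking the socket on auth failure (see the earlier comment)
      return {Status::NotOK, fmt::format("failed to authenticate on destination node error: {}", s.Msg())};
    }
  }
  return fd;
}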

Comment on lines 1369 to 1371
GET_OR_RET(sendMigrationBatch(&batch_sender));

auto elapsed = util::GetTimeStampMS() - start_ts;
info(
    "[migrate] Succeed to migrate snapshot range, slot(s): {}, elapsed: {} ms, sent: {} bytes, rate: {:.2f} kb/s, "
    "batches: {}, entries: {}",
    slot_range.String(), elapsed, batch_sender.GetSentBytes(), batch_sender.GetRate(start_ts),
    batch_sender.GetSentBatchesNum(), batch_sender.GetEntriesNum());

return Status::OK();

Copilot AI Dec 19, 2025


The logging information about migration progress (bytes sent, rate, batches, entries) has been removed from the individual thread migrations. This makes debugging and monitoring parallel migrations more difficult, as there's no per-thread visibility. Consider adding aggregate logging or at least debug-level logs for each thread's progress to aid troubleshooting.
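One way to restore visibility, sketched with shared atomics that each worker folds its BatchSender counters into after finishing its range (the counter names are illustrative):

#include <atomic>

std::atomic<uint64_t> total_bytes{0}, total_batches{0}, total_entries{0};

// Inside each worker, once its slot range completes:
total_bytes.fetch_add(batch_sender.GetSentBytes(), std::memory_order_relaxed);
total_batches.fetch_add(batch_sender.GetSentBatchesNum(), std::memory_order_relaxed);
total_entries.fetch_add(batch_sender.GetEntriesNum(), std::memory_order_relaxed);

// After all futures have been joined:
info("[migrate] Snapshot migration finished, sent: {} bytes, batches: {}, entries: {}",
     total_bytes.load(), total_batches.load(), total_entries.load());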

Comment on lines +103 to +105
void SetMigrateSlotsSendSnapshotsParallelism(int value) {
  if (value > 0) migrate_slots_send_snapshots_parallelism_ = value;
}

Copilot AI Dec 19, 2025


The validation only checks if the value is greater than 0, but the configuration allows 0 as a valid value (minimum is 0 in IntField). This inconsistency means if a user explicitly sets the value to 0, it will be accepted in configuration but ignored here. Consider aligning the validation with the configuration constraints or handling 0 explicitly as a special case in the setter.
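A sketch of handling 0 explicitly in the setter, assuming std::thread::hardware_concurrency() (from <thread>) matches the auto-detection the config side already performs:

void SetMigrateSlotsSendSnapshotsParallelism(int value) {
  if (value > 0) {
    migrate_slots_send_snapshots_parallelism_ = value;
  } else if (value == 0) {
    // Mirror the config's auto-detection instead of silently ignoring 0.
    migrate_slots_send_snapshots_parallelism_ = static_cast<int>(std::thread::hardware_concurrency());
  }
  // Negative values are already rejected by the IntField's minimum of 0.
}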

Comment on lines +1278 to +1282
results.emplace_back(std::async(std::launch::async, [=]() -> Status {
  int fd = createConnectToDstNode();
  if (fd < 0) {
    return {Status::NotOK, fmt::format("failed to connect the destination node in thread[{}]", i)};
  }

Copilot AI Dec 19, 2025


Each parallel thread creates its own connection to the destination node via createConnectToDstNode(), but there's no mechanism to ensure these connections don't overwhelm the destination node. Consider adding configuration or documentation about the impact of parallel connections, or implementing connection pooling/throttling to prevent resource exhaustion on the destination.
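A simple mitigation, sketched with an illustrative cap; the constant and its value are hypothetical, not part of this PR:

// Bound the number of concurrent migration connections regardless of the
// configured parallelism, so a large setting cannot flood the destination.
constexpr int kMaxMigrationConnections = 16;  // hypothetical cap
int parallelism = std::min(migrate_slots_send_snapshots_parallelism_, kMaxMigrationConnections);  // needs <algorithm>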
