Skip to content

Conversation

@luoyuxia
Copy link
Contributor

@luoyuxia luoyuxia commented Jan 18, 2026

Purpose

Linked issue: close #170

Brief change log

Tests

API and Format

Documentation

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This pull request introduces support for key-value (KV) write operations by adding KvWriteBatch and PutKvRequest structures. The PR refactors the WriteRecord API to support different write formats (append vs upsert operations) and updates error handling in the KV record batch builder.

Changes:

  • Adds new KvWriteBatch for batching KV write operations with compacted row support
  • Introduces PutKvRequest RPC message and adds PutKv API key (1016)
  • Refactors WriteRecord to support multiple constructor patterns (for_append, for_append_record_batch, for_upsert) with distinct write formats

Reviewed changes

Copilot reviewed 15 out of 15 changed files in this pull request and generated 8 comments.

Show a summary per file
File Description
crates/fluss/src/rpc/message/put_kv.rs New file implementing PutKvRequest RPC message for KV write operations
crates/fluss/src/rpc/message/mod.rs Declares put_kv module (but doesn't export it)
crates/fluss/src/rpc/api_key.rs Adds PutKv API key with value 1016 and its conversions
crates/fluss/src/client/write/batch.rs Adds KvWriteBatch implementation with compacted row support and makes WriteBatch.build() mutable
crates/fluss/src/client/write/mod.rs Refactors WriteRecord with private fields and new constructor methods for different write patterns
crates/fluss/src/client/write/sender.rs Changes batch handling from references to owned values for mutability
crates/fluss/src/record/kv/kv_record_batch_builder.rs Changes error handling from io::Error to custom Error type
crates/fluss/src/record/arrow.rs Updates to use Record enum through record() accessor method
crates/fluss/src/proto/fluss_api.proto Adds PutKvRequest and related protobuf message definitions
crates/fluss/build.rs Configures prost to generate Bytes type for PutKv records field
crates/fluss/src/client/write/accumulator.rs Updates tests to use new WriteRecord::for_append constructor
crates/fluss/src/client/table/*.rs Updates writer implementations to pass schema_id and use new constructors
bindings/python/src/table.rs Converts string formatting to inline format strings (unrelated style change)

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@luoyuxia luoyuxia force-pushed the introduce-kv-write-batch branch 2 times, most recently from e13a2c5 to eda6470 Compare January 18, 2026 08:46
@luoyuxia luoyuxia force-pushed the introduce-kv-write-batch branch from eda6470 to 036d39b Compare January 18, 2026 08:49
@luoyuxia
Copy link
Contributor Author

@leekeiabstraction Please take a look when you get time

Copy link
Contributor

@leekeiabstraction leekeiabstraction left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the PR. Left minor comments, PTAL.

ack: i16,
max_request_timeout_ms: i32,
ready_batches: &[&ReadyWriteBatch],
ready_batches: &mut [ReadyWriteBatch],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need mut here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we need it to be mut here since I have changed the build method of MemoryLogRecordsArrowBuilder to use &mut self to align with KvRecordBatchBuilder . KvRecordBatchBuilder rely much on the mut reference.

impl KvWriteBatch {
#[allow(clippy::too_many_arguments)]
pub fn new(
batch_id: i64,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious on why we need to introduce batch_id here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will use batch_id to lookup KvWriteBatch by batch_id in WriterClient


// only valid for primary key table
key: Option<&'a [u8]>,
target_columns: Option<&'a [usize]>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We can have this in a wrapper structure for CompactedRow, this way the complexities isn't exposed to WriteRecords that do not support key / target columns.

struct CompactedRecord<'a> {
   key: &'a [u8],
   bucket_key: &'a [u8],
   target_columns: &'a [usize],
   row: Option<CompactedRow<'a>>
}
pub enum Record<'a> {
    Generic(GenericRow<'a>),
    RecordBatch(Arc<RecordBatch>),
    Compacted(CompactedRecord<'a>),
}

Copy link
Contributor Author

@luoyuxia luoyuxia Jan 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good sugestion.
But Consider in the future we will also have CompactedRow as the log format, I will split the enum into LogWriteRecord and KvWriteRecod

@luoyuxia luoyuxia force-pushed the introduce-kv-write-batch branch from 091009e to 47a0cac Compare January 18, 2026 10:37
@luoyuxia
Copy link
Contributor Author

@leekeiabstraction Thanks for your review..Comments are addressed. PTAL

Copy link
Contributor

@leekeiabstraction leekeiabstraction left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TY for the quick turnaround, left one more small comment

schema_id: i32,
bucket_key: &'a [u8],
key: &'a [u8],
target_columns: &'a [usize],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also wrap target_columns with Option here? If not, do we need to parse empty array into None below?

Copy link
Contributor

@leekeiabstraction leekeiabstraction left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approving, as the last comment I had can be addressed in my PR

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Introduce KvWriteBatch, Pb/PutKvRequest/Response and their translations

2 participants