-
Notifications
You must be signed in to change notification settings - Fork 21
feat: introduce KvWriteBatch and PutKvRequest #176
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This pull request introduces support for key-value (KV) write operations by adding KvWriteBatch and PutKvRequest structures. The PR refactors the WriteRecord API to support different write formats (append vs upsert operations) and updates error handling in the KV record batch builder.
Changes:
- Adds new
KvWriteBatchfor batching KV write operations with compacted row support - Introduces
PutKvRequestRPC message and adds PutKv API key (1016) - Refactors
WriteRecordto support multiple constructor patterns (for_append,for_append_record_batch,for_upsert) with distinct write formats
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| crates/fluss/src/rpc/message/put_kv.rs | New file implementing PutKvRequest RPC message for KV write operations |
| crates/fluss/src/rpc/message/mod.rs | Declares put_kv module (but doesn't export it) |
| crates/fluss/src/rpc/api_key.rs | Adds PutKv API key with value 1016 and its conversions |
| crates/fluss/src/client/write/batch.rs | Adds KvWriteBatch implementation with compacted row support and makes WriteBatch.build() mutable |
| crates/fluss/src/client/write/mod.rs | Refactors WriteRecord with private fields and new constructor methods for different write patterns |
| crates/fluss/src/client/write/sender.rs | Changes batch handling from references to owned values for mutability |
| crates/fluss/src/record/kv/kv_record_batch_builder.rs | Changes error handling from io::Error to custom Error type |
| crates/fluss/src/record/arrow.rs | Updates to use Record enum through record() accessor method |
| crates/fluss/src/proto/fluss_api.proto | Adds PutKvRequest and related protobuf message definitions |
| crates/fluss/build.rs | Configures prost to generate Bytes type for PutKv records field |
| crates/fluss/src/client/write/accumulator.rs | Updates tests to use new WriteRecord::for_append constructor |
| crates/fluss/src/client/table/*.rs | Updates writer implementations to pass schema_id and use new constructors |
| bindings/python/src/table.rs | Converts string formatting to inline format strings (unrelated style change) |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
e13a2c5 to
eda6470
Compare
eda6470 to
036d39b
Compare
|
@leekeiabstraction Please take a look when you get time |
leekeiabstraction
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the PR. Left minor comments, PTAL.
| ack: i16, | ||
| max_request_timeout_ms: i32, | ||
| ready_batches: &[&ReadyWriteBatch], | ||
| ready_batches: &mut [ReadyWriteBatch], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need mut here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we need it to be mut here since I have changed the build method of MemoryLogRecordsArrowBuilder to use &mut self to align with KvRecordBatchBuilder . KvRecordBatchBuilder rely much on the mut reference.
| impl KvWriteBatch { | ||
| #[allow(clippy::too_many_arguments)] | ||
| pub fn new( | ||
| batch_id: i64, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curious on why we need to introduce batch_id here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will use batch_id to lookup KvWriteBatch by batch_id in WriterClient
crates/fluss/src/client/write/mod.rs
Outdated
|
|
||
| // only valid for primary key table | ||
| key: Option<&'a [u8]>, | ||
| target_columns: Option<&'a [usize]>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: We can have this in a wrapper structure for CompactedRow, this way the complexities isn't exposed to WriteRecords that do not support key / target columns.
struct CompactedRecord<'a> {
key: &'a [u8],
bucket_key: &'a [u8],
target_columns: &'a [usize],
row: Option<CompactedRow<'a>>
}
pub enum Record<'a> {
Generic(GenericRow<'a>),
RecordBatch(Arc<RecordBatch>),
Compacted(CompactedRecord<'a>),
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good sugestion.
But Consider in the future we will also have CompactedRow as the log format, I will split the enum into LogWriteRecord and KvWriteRecod
091009e to
47a0cac
Compare
|
@leekeiabstraction Thanks for your review..Comments are addressed. PTAL |
leekeiabstraction
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TY for the quick turnaround, left one more small comment
| schema_id: i32, | ||
| bucket_key: &'a [u8], | ||
| key: &'a [u8], | ||
| target_columns: &'a [usize], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we also wrap target_columns with Option here? If not, do we need to parse empty array into None below?
leekeiabstraction
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Approving, as the last comment I had can be addressed in my PR
Purpose
Linked issue: close #170
Brief change log
Tests
API and Format
Documentation