Skip to content

Commit f4808d8

Browse files
feat: Introduce KVReadContext and read path wiring (#174)
1 parent 9c41e2c commit f4808d8

File tree

9 files changed

+674
-181
lines changed

9 files changed

+674
-181
lines changed

crates/fluss/src/record/kv/kv_record.rs

Lines changed: 89 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,16 @@
2727
use bytes::{BufMut, Bytes, BytesMut};
2828
use std::io;
2929

30+
use crate::row::RowDecoder;
31+
use crate::row::compacted::CompactedRow;
3032
use crate::util::varint::{
3133
read_unsigned_varint_bytes, size_of_unsigned_varint, write_unsigned_varint_buf,
3234
};
3335

3436
/// Length field size in bytes
3537
pub const LENGTH_LENGTH: usize = 4;
3638

37-
/// A key-value record.
39+
/// A key-value record containing raw key and value bytes.
3840
///
3941
/// The schema is:
4042
/// - Length => Int32
@@ -43,34 +45,39 @@ pub const LENGTH_LENGTH: usize = 4;
4345
/// - Value => bytes (BinaryRow, written directly without length prefix)
4446
///
4547
/// When the value is None (deletion), no Value bytes are present.
46-
// Reference implementation:
47-
// https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/record/KvRecord.java
48+
///
49+
/// This struct stores only raw bytes. To decode the value into a typed row,
50+
/// use the `row()` method with a RowDecoder (typically obtained from the iterator).
51+
///
52+
/// Reference implementation:
53+
/// https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/record/KvRecord.java
4854
#[derive(Debug, Clone)]
4955
pub struct KvRecord {
5056
key: Bytes,
51-
value: Option<Bytes>,
57+
value_bytes: Option<Bytes>,
5258
size_in_bytes: usize,
5359
}
5460

5561
impl KvRecord {
56-
/// Create a new KvRecord with the given key and optional value.
57-
pub fn new(key: Bytes, value: Option<Bytes>) -> Self {
58-
let size_in_bytes = Self::size_of(&key, value.as_deref());
59-
Self {
60-
key,
61-
value,
62-
size_in_bytes,
63-
}
64-
}
65-
6662
/// Get the key bytes.
6763
pub fn key(&self) -> &Bytes {
6864
&self.key
6965
}
7066

71-
/// Get the value bytes (None indicates a deletion).
72-
pub fn value(&self) -> Option<&Bytes> {
73-
self.value.as_ref()
67+
/// Get the raw value bytes (for testing).
68+
#[cfg(test)]
69+
pub(crate) fn value_bytes(&self) -> Option<&Bytes> {
70+
self.value_bytes.as_ref()
71+
}
72+
73+
/// Decode the value bytes into a typed row using the provided decoder.
74+
/// This creates a lightweight CompactedRow view over the raw bytes.
75+
/// Actual field parsing is lazy (on first access).
76+
pub fn row<'a>(&'a self, decoder: &dyn RowDecoder) -> Option<CompactedRow<'a>> {
77+
self.value_bytes.as_ref().map(|bytes| {
78+
// Decode on-demand - CompactedRow<'a> lifetime tied to &'a self
79+
decoder.decode(bytes.as_ref())
80+
})
7481
}
7582

7683
/// Calculate the total size of the record when serialized (including length prefix).
@@ -121,8 +128,7 @@ impl KvRecord {
121128
/// Read a KV record from bytes at the given position.
122129
///
123130
/// Returns the KvRecord and the number of bytes consumed.
124-
///
125-
/// TODO: Connect KvReadContext and return CompactedRow records.
131+
/// The record contains only raw bytes; use `row()` with a RowDecoder to decode the value.
126132
pub fn read_from(bytes: &Bytes, position: usize) -> io::Result<(Self, usize)> {
127133
if bytes.len() < position.saturating_add(LENGTH_LENGTH) {
128134
return Err(io::Error::new(
@@ -183,11 +189,10 @@ impl KvRecord {
183189
let key = bytes.slice(current_offset..key_end);
184190
current_offset = key_end;
185191

186-
// Read value bytes directly
187-
let value = if current_offset < record_end {
192+
// Read value bytes directly (don't decode yet - will decode on-demand)
193+
let value_bytes = if current_offset < record_end {
188194
// Value is present: all remaining bytes are the value
189-
let value_bytes = bytes.slice(current_offset..record_end);
190-
Some(value_bytes)
195+
Some(bytes.slice(current_offset..record_end))
191196
} else {
192197
// No remaining bytes: this is a deletion record
193198
None
@@ -196,7 +201,7 @@ impl KvRecord {
196201
Ok((
197202
Self {
198203
key,
199-
value,
204+
value_bytes,
200205
size_in_bytes: total_size,
201206
},
202207
total_size,
@@ -207,37 +212,37 @@ impl KvRecord {
207212
pub fn get_size_in_bytes(&self) -> usize {
208213
self.size_in_bytes
209214
}
215+
216+
/// Check if this is a deletion record (no value).
217+
pub fn is_deletion(&self) -> bool {
218+
self.value_bytes.is_none()
219+
}
210220
}
211221

212222
#[cfg(test)]
213223
mod tests {
214224
use super::*;
215225

216226
#[test]
217-
fn test_kv_record_size_calculation() {
227+
fn test_kv_record_basic_operations() {
218228
let key = b"test_key";
219229
let value = b"test_value";
220230

221-
// With value (no value length varint)
231+
// Test size calculation with value
222232
let size_with_value = KvRecord::size_of(key, Some(value));
223233
assert_eq!(
224234
size_with_value,
225235
LENGTH_LENGTH + size_of_unsigned_varint(key.len() as u32) + key.len() + value.len()
226236
);
227237

228-
// Without value
238+
// Test size calculation without value (deletion)
229239
let size_without_value = KvRecord::size_of(key, None);
230240
assert_eq!(
231241
size_without_value,
232242
LENGTH_LENGTH + size_of_unsigned_varint(key.len() as u32) + key.len()
233243
);
234-
}
235-
236-
#[test]
237-
fn test_kv_record_write_read_round_trip() {
238-
let key = b"my_key";
239-
let value = b"my_value_data";
240244

245+
// Test write/read round trip with value
241246
let mut buf = BytesMut::new();
242247
let written = KvRecord::write_to_buf(&mut buf, key, Some(value)).unwrap();
243248

@@ -246,40 +251,70 @@ mod tests {
246251

247252
assert_eq!(written, read_size);
248253
assert_eq!(record.key().as_ref(), key);
249-
assert_eq!(record.value().unwrap().as_ref(), value);
254+
assert_eq!(record.value_bytes().unwrap().as_ref(), value);
250255
assert_eq!(record.get_size_in_bytes(), written);
251-
}
252-
253-
#[test]
254-
fn test_kv_record_deletion() {
255-
let key = b"delete_me";
256+
assert!(!record.is_deletion());
256257

257-
// Write deletion record (no value)
258+
// Test deletion record (no value)
259+
let delete_key = b"delete_me";
258260
let mut buf = BytesMut::new();
259-
let written = KvRecord::write_to_buf(&mut buf, key, None).unwrap();
261+
let written = KvRecord::write_to_buf(&mut buf, delete_key, None).unwrap();
260262

261263
let bytes = buf.freeze();
262264
let (record, read_size) = KvRecord::read_from(&bytes, 0).unwrap();
263265

264266
assert_eq!(written, read_size);
265-
assert_eq!(record.key().as_ref(), key);
266-
assert!(record.value().is_none());
267+
assert_eq!(record.key().as_ref(), delete_key);
268+
assert!(record.is_deletion());
269+
assert!(record.value_bytes().is_none());
267270
}
268271

269272
#[test]
270-
fn test_kv_record_with_large_key() {
271-
let key = vec![0u8; 1024];
272-
let value = vec![1u8; 4096];
273+
fn test_kv_record_multiple_records() {
274+
// Test multiple regular-sized records in buffer
275+
let records = vec![
276+
(b"key1".as_slice(), Some(b"value1".as_slice())),
277+
(b"key2".as_slice(), None), // Deletion
278+
(b"key3".as_slice(), Some(b"value3".as_slice())),
279+
];
273280

274281
let mut buf = BytesMut::new();
275-
let written = KvRecord::write_to_buf(&mut buf, &key, Some(&value)).unwrap();
282+
for (key, value) in &records {
283+
KvRecord::write_to_buf(&mut buf, key, *value).unwrap();
284+
}
285+
286+
let bytes = buf.freeze();
287+
let mut offset = 0;
288+
for (expected_key, expected_value) in &records {
289+
let (record, size) = KvRecord::read_from(&bytes, offset).unwrap();
290+
assert_eq!(record.key().as_ref(), *expected_key);
291+
match expected_value {
292+
Some(v) => {
293+
assert_eq!(record.value_bytes().unwrap().as_ref(), *v);
294+
assert!(!record.is_deletion());
295+
}
296+
None => {
297+
assert!(record.is_deletion());
298+
assert!(record.value_bytes().is_none());
299+
}
300+
}
301+
offset += size;
302+
}
303+
assert_eq!(offset, bytes.len());
304+
305+
// Test large keys and values
306+
let large_key = vec![0u8; 1024];
307+
let large_value = vec![1u8; 4096];
308+
309+
let mut buf = BytesMut::new();
310+
let written = KvRecord::write_to_buf(&mut buf, &large_key, Some(&large_value)).unwrap();
276311

277312
let bytes = buf.freeze();
278313
let (record, read_size) = KvRecord::read_from(&bytes, 0).unwrap();
279314

280315
assert_eq!(written, read_size);
281-
assert_eq!(record.key().len(), key.len());
282-
assert_eq!(record.value().unwrap().len(), value.len());
316+
assert_eq!(record.key().len(), large_key.len());
317+
assert_eq!(record.value_bytes().unwrap().len(), large_value.len());
283318
}
284319

285320
#[test]
@@ -291,7 +326,9 @@ mod tests {
291326
let bytes = buf.freeze();
292327
let result = KvRecord::read_from(&bytes, 0);
293328
assert!(result.is_err());
294-
assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidData);
329+
if let Err(e) = result {
330+
assert_eq!(e.kind(), io::ErrorKind::InvalidData);
331+
}
295332

296333
// Test overflow length
297334
let mut buf = BytesMut::new();
@@ -307,33 +344,8 @@ mod tests {
307344
let bytes = buf.freeze();
308345
let result = KvRecord::read_from(&bytes, 0);
309346
assert!(result.is_err());
310-
assert_eq!(result.unwrap_err().kind(), io::ErrorKind::UnexpectedEof);
311-
}
312-
313-
#[test]
314-
fn test_multiple_records_in_buffer() {
315-
let records = vec![
316-
(b"key1".as_slice(), Some(b"value1".as_slice())),
317-
(b"key2".as_slice(), None),
318-
(b"key3".as_slice(), Some(b"value3".as_slice())),
319-
];
320-
321-
let mut buf = BytesMut::new();
322-
for (key, value) in &records {
323-
KvRecord::write_to_buf(&mut buf, key, *value).unwrap();
347+
if let Err(e) = result {
348+
assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof);
324349
}
325-
326-
let bytes = buf.freeze();
327-
let mut offset = 0;
328-
for (expected_key, expected_value) in &records {
329-
let (record, size) = KvRecord::read_from(&bytes, offset).unwrap();
330-
assert_eq!(record.key().as_ref(), *expected_key);
331-
match expected_value {
332-
Some(v) => assert_eq!(record.value().unwrap().as_ref(), *v),
333-
None => assert!(record.value().is_none()),
334-
}
335-
offset += size;
336-
}
337-
assert_eq!(offset, bytes.len());
338350
}
339351
}

0 commit comments

Comments
 (0)