Commit 60e4c09

feat: Introduce python bindings row-based append API (#142)
1 parent b1b56a6 commit 60e4c09

File tree: 4 files changed (+283, -18 lines)

.gitignore

Lines changed: 10 additions & 1 deletion
@@ -17,4 +17,13 @@ Cargo.lock
 # and can be added to the global gitignore or merged into this file. For a more nuclear
 # option (not recommended) you can uncomment the following to ignore the entire idea folder.
 .idea/
-.vscode/
+.vscode/
+
+# Python
+__pycache__/
+*.py[cod]
+*$py.class
+*.so
+*.egg-info/
+dist/
+build/

bindings/python/example/example.py

Lines changed: 12 additions & 2 deletions
@@ -118,11 +118,21 @@ async def main():
     append_writer.write_arrow_batch(pa_record_batch)
     print("Successfully wrote PyArrow RecordBatch")
 
-    # Test 3: Write Pandas DataFrame
+    # Test 3: Append single rows
+    print("\n--- Testing single row append ---")
+    # Dict input
+    await append_writer.append({"id": 8, "name": "Helen", "score": 93.5, "age": 26})
+    print("Successfully appended row (dict)")
+
+    # List input
+    await append_writer.append([9, "Ivan", 90.0, 31])
+    print("Successfully appended row (list)")
+
+    # Test 4: Write Pandas DataFrame
     print("\n--- Testing Pandas DataFrame write ---")
     df = pd.DataFrame(
         {
-            "id": [6, 7],
+            "id": [10, 11],
             "name": ["Frank", "Grace"],
             "score": [89.3, 94.7],
             "age": [29, 27],
bindings/python/fluss/__init__.pyi

Lines changed: 26 additions & 0 deletions
@@ -68,6 +68,32 @@ class FlussTable:
     def __repr__(self) -> str: ...
 
 class AppendWriter:
+    async def append(self, row: dict | list | tuple) -> None:
+        """Append a single row to the table.
+
+        Args:
+            row: Dictionary mapping field names to values, or
+                list/tuple of values in schema order.
+
+        Supported Types:
+            Currently supports primitive types only:
+            - Boolean, TinyInt, SmallInt, Int, BigInt (integers)
+            - Float, Double (floating point)
+            - String, Char (text)
+            - Bytes, Binary (binary data)
+            - Null values
+
+            Temporal types (Date, Timestamp, Decimal) are not yet supported.
+
+        Example:
+            await writer.append({'id': 1, 'name': 'Alice', 'score': 95.5})
+            await writer.append([1, 'Alice', 95.5])
+
+        Note:
+            For high-throughput bulk loading, prefer write_arrow_batch().
+            Use flush() to ensure all queued records are sent and acknowledged.
+        """
+        ...
     def write_arrow(self, table: pa.Table) -> None: ...
     def write_arrow_batch(self, batch: pa.RecordBatch) -> None: ...
     def write_pandas(self, df: pd.DataFrame) -> None: ...
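
The stub documents the happy path; the conversion layer added in table.rs (next file) is deliberately strict about row shape and value types. Below is a sketch of the failure modes, assuming the four-column schema (id, name, score, age) from example.py, with the expected error texts paraphrased from the format! strings in the Rust diff.

async def show_strict_validation(writer) -> None:
    # Accepted: dict keyed by schema field names, in any key order.
    await writer.append({"age": 2, "id": 1, "name": "A", "score": 1.0})

    # Each of these raises FlussError at call time, before any I/O:
    bad_rows = [
        {"id": 1, "oops": "A"},  # Unknown field 'oops'. Expected fields: ...
        {"id": 1},               # Missing field: name
        [1, "A", 1.0],           # Expected 4 values, got 3
        [True, "A", 1.0, 2],     # Expected int for Int column, got bool.
        "not a row",             # Row must be a dict, list, or tuple; got str
    ]
    for row in bad_rows:
        try:
            await writer.append(row)
        except Exception as exc:  # fluss.FlussError in practice
            print(f"rejected: {exc}")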

bindings/python/src/table.rs

Lines changed: 235 additions & 15 deletions
@@ -49,15 +49,15 @@ impl FlussTable {
         let table_info = self.table_info.clone();
 
         future_into_py(py, async move {
-            let fluss_table = fcore::client::FlussTable::new(&conn, metadata, table_info);
+            let fluss_table = fcore::client::FlussTable::new(&conn, metadata, table_info.clone());
 
             let table_append = fluss_table
                 .new_append()
                 .map_err(|e| FlussError::new_err(e.to_string()))?;
 
             let rust_writer = table_append.create_writer();
 
-            let py_writer = AppendWriter::from_core(rust_writer);
+            let py_writer = AppendWriter::from_core(rust_writer, table_info);
 
             Python::attach(|py| Py::new(py, py_writer))
         })
@@ -193,13 +193,14 @@ impl FlussTable {
 /// Writer for appending data to a Fluss table
 #[pyclass]
 pub struct AppendWriter {
-    inner: fcore::client::AppendWriter,
+    inner: Arc<fcore::client::AppendWriter>,
+    table_info: fcore::metadata::TableInfo,
 }
 
 #[pymethods]
 impl AppendWriter {
     /// Write Arrow table data
-    pub fn write_arrow(&mut self, py: Python, table: Py<PyAny>) -> PyResult<()> {
+    pub fn write_arrow(&self, py: Python, table: Py<PyAny>) -> PyResult<()> {
         // Convert Arrow Table to batches and write each batch
         let batches = table.call_method0(py, "to_batches")?;
         let batch_list: Vec<Py<PyAny>> = batches.extract(py)?;
@@ -211,22 +212,40 @@ impl AppendWriter {
     }
 
     /// Write Arrow batch data
-    pub fn write_arrow_batch(&mut self, py: Python, batch: Py<PyAny>) -> PyResult<()> {
+    pub fn write_arrow_batch(&self, py: Python, batch: Py<PyAny>) -> PyResult<()> {
         // This shares the underlying Arrow buffers without copying data
         let batch_bound = batch.bind(py);
         let rust_batch: RecordBatch = FromPyArrow::from_pyarrow_bound(batch_bound)
             .map_err(|e| FlussError::new_err(format!("Failed to convert RecordBatch: {e}")))?;
 
+        let inner = self.inner.clone();
         // Release the GIL before blocking on async operation
         let result = py.detach(|| {
-            TOKIO_RUNTIME.block_on(async { self.inner.append_arrow_batch(rust_batch).await })
+            TOKIO_RUNTIME.block_on(async { inner.append_arrow_batch(rust_batch).await })
         });
 
         result.map_err(|e| FlussError::new_err(e.to_string()))
     }
 
+    /// Append a single row to the table
+    pub fn append<'py>(
+        &self,
+        py: Python<'py>,
+        row: &Bound<'py, PyAny>,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        let generic_row = python_to_generic_row(row, &self.table_info)?;
+        let inner = self.inner.clone();
+
+        future_into_py(py, async move {
+            inner
+                .append(generic_row)
+                .await
+                .map_err(|e| FlussError::new_err(e.to_string()))
+        })
+    }
+
     /// Write Pandas DataFrame data
-    pub fn write_pandas(&mut self, py: Python, df: Py<PyAny>) -> PyResult<()> {
+    pub fn write_pandas(&self, py: Python, df: Py<PyAny>) -> PyResult<()> {
         // Import pyarrow module
         let pyarrow = py.import("pyarrow")?;
 
@@ -241,12 +260,16 @@ impl AppendWriter {
     }
 
     /// Flush any pending data
-    pub fn flush(&mut self) -> PyResult<()> {
-        TOKIO_RUNTIME.block_on(async {
-            self.inner
-                .flush()
-                .await
-                .map_err(|e| FlussError::new_err(e.to_string()))
+    pub fn flush(&self, py: Python) -> PyResult<()> {
+        let inner = self.inner.clone();
+        // Release the GIL before blocking on I/O
+        py.detach(|| {
+            TOKIO_RUNTIME.block_on(async {
+                inner
+                    .flush()
+                    .await
+                    .map_err(|e| FlussError::new_err(e.to_string()))
+            })
         })
     }
 
@@ -257,8 +280,205 @@ impl AppendWriter {
 
 impl AppendWriter {
     /// Create a AppendWriter from a core append writer
-    pub fn from_core(append: fcore::client::AppendWriter) -> Self {
-        Self { inner: append }
+    pub fn from_core(
+        append: fcore::client::AppendWriter,
+        table_info: fcore::metadata::TableInfo,
+    ) -> Self {
+        Self {
+            inner: Arc::new(append),
+            table_info,
+        }
+    }
+}
+
+/// Represents different input shapes for a row
+#[derive(FromPyObject)]
+enum RowInput<'py> {
+    Dict(Bound<'py, pyo3::types::PyDict>),
+    Tuple(Bound<'py, pyo3::types::PyTuple>),
+    List(Bound<'py, pyo3::types::PyList>),
+}
+
+/// Helper function to process sequence types (list/tuple) into datums
+fn process_sequence_to_datums<'a, I>(
+    values: I,
+    len: usize,
+    fields: &[fcore::metadata::DataField],
+) -> PyResult<Vec<fcore::row::Datum<'static>>>
+where
+    I: Iterator<Item = Bound<'a, PyAny>>,
+{
+    if len != fields.len() {
+        return Err(FlussError::new_err(format!(
+            "Expected {} values, got {}",
+            fields.len(),
+            len
+        )));
+    }
+
+    let mut datums = Vec::with_capacity(fields.len());
+    for (i, (field, value)) in fields.iter().zip(values).enumerate() {
+        datums.push(
+            python_value_to_datum(&value, field.data_type()).map_err(|e| {
+                FlussError::new_err(format!("Field '{}' (index {}): {}", field.name(), i, e))
+            })?,
+        );
+    }
+    Ok(datums)
+}
+
+/// Convert Python row (dict/list/tuple) to GenericRow based on schema
+fn python_to_generic_row(
+    row: &Bound<PyAny>,
+    table_info: &fcore::metadata::TableInfo,
+) -> PyResult<fcore::row::GenericRow<'static>> {
+    // Extract with user-friendly error message
+    let row_input: RowInput = row.extract().map_err(|_| {
+        let type_name = row
+            .get_type()
+            .name()
+            .map(|n| n.to_string())
+            .unwrap_or_else(|_| "unknown".to_string());
+        FlussError::new_err(format!(
+            "Row must be a dict, list, or tuple; got {}",
+            type_name
+        ))
+    })?;
+    let schema = table_info.row_type();
+    let fields = schema.fields();
+
+    let datums = match row_input {
+        RowInput::Dict(dict) => {
+            // Strict: reject unknown keys (and also reject non-str keys nicely)
+            for (k, _) in dict.iter() {
+                let key_str = k.extract::<&str>().map_err(|_| {
+                    let key_type = k
+                        .get_type()
+                        .name()
+                        .map(|n| n.to_string())
+                        .unwrap_or_else(|_| "unknown".to_string());
+                    FlussError::new_err(format!("Row dict keys must be strings; got {}", key_type))
+                })?;
+
+                if fields.iter().all(|f| f.name() != key_str) {
+                    let expected = fields
+                        .iter()
+                        .map(|f| f.name())
+                        .collect::<Vec<_>>()
+                        .join(", ");
+                    return Err(FlussError::new_err(format!(
+                        "Unknown field '{}'. Expected fields: {}",
+                        key_str, expected
+                    )));
+                }
+            }
+
+            let mut datums = Vec::with_capacity(fields.len());
+            for field in fields {
+                let value = dict.get_item(field.name())?.ok_or_else(|| {
+                    FlussError::new_err(format!("Missing field: {}", field.name()))
+                })?;
+                datums.push(
+                    python_value_to_datum(&value, field.data_type()).map_err(|e| {
+                        FlussError::new_err(format!("Field '{}': {}", field.name(), e))
+                    })?,
+                );
+            }
+            datums
+        }
+
+        RowInput::List(list) => process_sequence_to_datums(list.iter(), list.len(), fields)?,
+
+        RowInput::Tuple(tuple) => process_sequence_to_datums(tuple.iter(), tuple.len(), fields)?,
+    };
+
+    Ok(fcore::row::GenericRow { values: datums })
+}
+
+/// Convert Python value to Datum based on data type
+fn python_value_to_datum(
+    value: &Bound<PyAny>,
+    data_type: &fcore::metadata::DataType,
+) -> PyResult<fcore::row::Datum<'static>> {
+    use fcore::row::{Datum, F32, F64};
+
+    if value.is_none() {
+        return Ok(Datum::Null);
+    }
+
+    match data_type {
+        fcore::metadata::DataType::Boolean(_) => {
+            let v: bool = value.extract()?;
+            Ok(Datum::Bool(v))
+        }
+        fcore::metadata::DataType::TinyInt(_) => {
+            // Strict type checking: reject bool for int columns
+            if value.is_instance_of::<pyo3::types::PyBool>() {
+                return Err(FlussError::new_err(
+                    "Expected int for TinyInt column, got bool. Use 0 or 1 explicitly.".to_string(),
+                ));
+            }
+            let v: i8 = value.extract()?;
+            Ok(Datum::Int8(v))
+        }
+        fcore::metadata::DataType::SmallInt(_) => {
+            if value.is_instance_of::<pyo3::types::PyBool>() {
+                return Err(FlussError::new_err(
+                    "Expected int for SmallInt column, got bool. Use 0 or 1 explicitly."
+                        .to_string(),
+                ));
+            }
+            let v: i16 = value.extract()?;
+            Ok(Datum::Int16(v))
+        }
+        fcore::metadata::DataType::Int(_) => {
+            if value.is_instance_of::<pyo3::types::PyBool>() {
+                return Err(FlussError::new_err(
+                    "Expected int for Int column, got bool. Use 0 or 1 explicitly.".to_string(),
+                ));
+            }
+            let v: i32 = value.extract()?;
+            Ok(Datum::Int32(v))
+        }
+        fcore::metadata::DataType::BigInt(_) => {
+            if value.is_instance_of::<pyo3::types::PyBool>() {
+                return Err(FlussError::new_err(
+                    "Expected int for BigInt column, got bool. Use 0 or 1 explicitly.".to_string(),
+                ));
+            }
+            let v: i64 = value.extract()?;
+            Ok(Datum::Int64(v))
+        }
+        fcore::metadata::DataType::Float(_) => {
+            let v: f32 = value.extract()?;
+            Ok(Datum::Float32(F32::from(v)))
+        }
+        fcore::metadata::DataType::Double(_) => {
+            let v: f64 = value.extract()?;
+            Ok(Datum::Float64(F64::from(v)))
+        }
+        fcore::metadata::DataType::String(_) | fcore::metadata::DataType::Char(_) => {
+            let v: String = value.extract()?;
+            Ok(v.into())
+        }
+        fcore::metadata::DataType::Bytes(_) | fcore::metadata::DataType::Binary(_) => {
+            // Efficient extraction: downcast to specific type and use bulk copy.
+            // PyBytes::as_bytes() and PyByteArray::to_vec() are O(n) bulk copies of the underlying data.
+            if let Ok(bytes) = value.downcast::<pyo3::types::PyBytes>() {
+                Ok(bytes.as_bytes().to_vec().into())
+            } else if let Ok(bytearray) = value.downcast::<pyo3::types::PyByteArray>() {
+                Ok(bytearray.to_vec().into())
+            } else {
+                Err(FlussError::new_err(format!(
+                    "Expected bytes or bytearray, got {}",
+                    value.get_type().name()?
+                )))
+            }
+        }
+        _ => Err(FlussError::new_err(format!(
+            "Unsupported data type for row-level operations: {:?}",
+            data_type
+        ))),
     }
 }
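
A design note on the threading model in this file: append() hands back a real awaitable via future_into_py, while write_arrow_batch() and the reworked flush() stay synchronous and instead release the GIL (py.detach) around TOKIO_RUNTIME.block_on, so other Python threads make progress during the network round trip. Putting the core writer behind an Arc is what lets every method clone a cheap handle into its closure and take &self instead of &mut self. Below is a hedged sketch of the resulting calling pattern; the concurrent gather variant assumes the shared writer tolerates overlapping in-flight appends, which the Arc-based sharing suggests but this diff does not demonstrate.

import asyncio

async def load_rows(writer, rows):
    # Conservative pattern: await each row append in turn.
    for row in rows:
        await writer.append(row)

    # Possible concurrent variant (see the hedge in the note above):
    # await asyncio.gather(*(writer.append(r) for r in rows))

    # Synchronous, but the GIL is released while it blocks on acknowledgement.
    writer.flush()

rows = [{"id": i, "name": f"user{i}", "score": float(i), "age": 20 + i % 30}
        for i in range(100)]
# asyncio.run(load_rows(writer, rows))  # given an AppendWriter from the table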
