3 changes: 3 additions & 0 deletions crates/store/re_protos/proto/rerun/v1alpha1/cloud.proto
@@ -673,6 +673,9 @@ enum TableInsertMode {

// Overwrites all existing rows in the table with the new rows.
TABLE_INSERT_MODE_OVERWRITE = 2;

// Replaces rows that match on the table's index fields (those marked with the `rerun:is_table_index` metadata); rows with no match are inserted.
TABLE_INSERT_MODE_REPLACE = 3;
}

message WriteTableRequest {
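For orientation before the per-file changes: the three insert modes differ only in how the incoming rows are combined with the existing table. A minimal sketch at the Python SDK level, assuming a connected `client` (a `CatalogClient`) and a prepared `pyarrow.RecordBatch` named `rb`; the import path is an assumption:

from rerun.catalog import TableInsertMode  # import path assumed

# APPEND: adds the new rows to whatever is already in the table.
client.write_table("my_table", rb, TableInsertMode.APPEND)

# OVERWRITE: drops all existing rows, then writes the new ones.
client.write_table("my_table", rb, TableInsertMode.OVERWRITE)

# REPLACE: merges on the index columns -- rows whose index values match an
# existing row are updated in place, the rest are inserted.
client.write_table("my_table", rb, TableInsertMode.REPLACE)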
@@ -2341,6 +2341,7 @@ impl TryFrom<QueryTasksRequest> for crate::cloud::v1alpha1::QueryTasksRequest {
pub enum TableInsertMode {
Append,
Overwrite,
Replace,
}

impl Default for TableInsertMode {
@@ -2364,6 +2365,7 @@ impl From<crate::cloud::v1alpha1::TableInsertMode> for TableInsertMode {
match value {
cloud::TableInsertMode::Unspecified | cloud::TableInsertMode::Append => Self::Append,
cloud::TableInsertMode::Overwrite => Self::Overwrite,
cloud::TableInsertMode::Replace => Self::Replace,
}
}
}
@@ -2373,6 +2375,7 @@ impl From<TableInsertMode> for crate::cloud::v1alpha1::TableInsertMode {
match value {
TableInsertMode::Append => Self::Append,
TableInsertMode::Overwrite => Self::Overwrite,
TableInsertMode::Replace => Self::Replace,
}
}
}
4 changes: 4 additions & 0 deletions crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.rs

Large diffs are not rendered by default.

40 changes: 39 additions & 1 deletion crates/store/re_redap_tests/src/tests/write_table.rs
@@ -1,7 +1,10 @@
use crate::tests::common::RerunCloudServiceExt as _;
use crate::RecordBatchExt;
use crate::tests::common::{RerunCloudServiceExt as _, concat_record_batches};
use crate::utils::streaming::make_streaming_request;
use crate::utils::tables::create_simple_lance_dataset;
use arrow::array::RecordBatch;
use arrow::array::record_batch;
use arrow::datatypes as arrow_schema;
use futures::TryStreamExt as _;
use itertools::Itertools as _;
use re_protos::cloud::v1alpha1::TableInsertMode;
@@ -102,6 +105,8 @@ pub async fn write_table(service: impl RerunCloudService) {
let returned_batches = get_table_batches(&service, &entry).await;
let returned_rows: usize = returned_batches.iter().map(|batch| batch.num_rows()).sum();
assert_eq!(returned_rows, 2 * original_rows);
let combined = concat_record_batches(&returned_batches);
insta::assert_snapshot!("append_table", combined.format_snapshot(false));

let overwrite_batches = original_batches
.iter()
@@ -126,4 +131,37 @@ pub async fn write_table(service: impl RerunCloudService) {
let returned_batches = get_table_batches(&service, &entry).await;
let returned_rows: usize = returned_batches.iter().map(|batch| batch.num_rows()).sum();
assert_eq!(returned_rows, original_rows);
let combined = concat_record_batches(&returned_batches);
insta::assert_snapshot!("overwrite_table", combined.format_snapshot(false));

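// `int32_nullable` and `int64_not_nullable` are this table's index columns
// (see `create_example_record_batch`): two of the rows below match existing
// index values, the third (`-1`/`19`) does not.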
let replacement_batch = record_batch!(
("boolean_nullable", Boolean, [Some(false), Some(true), None]),
("int32_nullable", Int32, [Some(11), Some(-1), Some(12)]),
("int64_not_nullable", Int64, [18, 19, 20]),
("utf8_not_nullable", Utf8, ["xyz", "pqr", "stu"])
)
.expect("Unable to create record batch");

let replace_batches = vec![WriteTableRequest {
dataframe_part: Some(replacement_batch.into()),
insert_mode: TableInsertMode::Replace.into(),
}];

service
.write_table(
make_streaming_request(replace_batches)
.with_entry_id(entry.id)
.expect("Unable to set entry_id on write table"),
)
.await
.expect("Failed to write table in replace mode");

// Two of the replacement rows match existing index values and are updated in place.
let returned_batches = get_table_batches(&service, &entry).await;
let returned_rows: usize = returned_batches.iter().map(|batch| batch.num_rows()).sum();

// The third row has no matching index values, so it is inserted as a new row.
assert_eq!(returned_rows, original_rows + 1);
let combined = concat_record_batches(&returned_batches);
insta::assert_snapshot!("replace_rows", combined.format_snapshot(false));
}
49 changes: 43 additions & 6 deletions crates/store/re_redap_tests/src/utils/tables.rs
@@ -1,19 +1,30 @@
use crate::TempPath;
use arrow::array::{RecordBatch, RecordBatchIterator, record_batch};
use arrow::datatypes as arrow_schema;
use arrow::datatypes::Schema;
use std::collections::HashMap;
use std::sync::Arc;

fn create_example_record_batch() -> RecordBatch {
record_batch!(
fn create_example_record_batch(base: i32) -> RecordBatch {
let batch = record_batch!(
("boolean_nullable", Boolean, [Some(true), Some(false), None]),
("boolean_not_nullable", Boolean, [true, false, true]),
("int8_nullable", Int8, [Some(1), None, Some(2)]),
("int8_not_nullable", Int8, [3, 4, 5]),
("int16_nullable", Int16, [Some(1), None, Some(2)]),
("int16_not_nullable", Int16, [3, 4, 5]),
("int32_nullable", Int32, [Some(1), None, Some(2)]),
(
"int32_nullable",
Int32,
[Some(11 + base), None, Some(12 + base)]
),
("int32_not_nullable", Int32, [3, 4, 5]),
("int64_nullable", Int64, [Some(1), None, Some(2)]),
("int64_not_nullable", Int64, [3, 4, 5]),
(
"int64_not_nullable",
Int64,
[18 + base as i64, 19 + base as i64, 20 + base as i64]
),
("uint8_nullable", UInt8, [Some(1), None, Some(2)]),
("uint8_not_nullable", UInt8, [3, 4, 5]),
("uint16_nullable", UInt16, [Some(1), None, Some(2)]),
@@ -35,13 +46,39 @@ fn create_example_record_batch() -> RecordBatch {
),
("large_utf8_not_nullable", LargeUtf8, ["ghi", "jkl", "mno"])
)
.expect("Unable to create record batch")
.expect("Unable to create record batch");

// Mark the index columns by setting the `rerun:is_table_index` metadata on their fields
let schema = Schema::new(
batch
.schema()
.fields
.iter()
.map(|field| {
if field.name() == "int32_nullable" || field.name() == "int64_not_nullable" {
field.as_ref().clone().with_metadata(HashMap::from([(
"rerun:is_table_index".to_owned(),
"true".to_owned(),
)]))
} else {
field.as_ref().clone()
}
})
.collect::<Vec<_>>(),
);

batch
.with_schema(Arc::new(schema))
.expect("unable to create record batch")
}

pub async fn create_simple_lance_dataset() -> anyhow::Result<TempPath> {
let tmp_dir = tempfile::tempdir()?;

let batches = vec![create_example_record_batch(), create_example_record_batch()];
let batches = vec![
create_example_record_batch(0),
create_example_record_batch(100),
];
let schema = batches[0].schema();

let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);
2 changes: 1 addition & 1 deletion crates/store/re_server/Cargo.toml
@@ -25,7 +25,7 @@ default = []

# TODO(lancedb/lance#3073): lance depends on system protoc, so lance must be an opt-in dependency
## Enable reading in LanceDB files.
lance = ["dep:lance"]
lance = ["dep:lance", "re_redap_tests/lance"]

[dependencies]

1 change: 1 addition & 0 deletions crates/store/re_server/src/rerun_cloud.rs
@@ -779,6 +779,7 @@ impl RerunCloudService for RerunCloudHandler {
{
TableInsertMode::Append => InsertOp::Append,
TableInsertMode::Overwrite => InsertOp::Overwrite,
TableInsertMode::Replace => InsertOp::Replace,
};

table.write_table(rb, insert_op).await.map_err(|err| {
33 changes: 29 additions & 4 deletions crates/store/re_server/src/store/table.rs
@@ -6,12 +6,10 @@ use datafusion::{
error::DataFusionError, execution::SessionStateBuilder, logical_expr::dml::InsertOp,
};
use futures::StreamExt as _;

#[cfg(feature = "lance")]
use lance::{
Dataset as LanceDataset,
datafusion::LanceTableProvider,
dataset::{WriteMode, WriteParams},
dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode, WriteParams},
};

use re_log_types::EntryId;
@@ -170,7 +168,34 @@ impl Table {
.map_err(|err| DataFusionError::External(err.into()))?;
}
InsertOp::Replace => {
exec_err!("Invalid insert operation. Only append and overwrite are supported.")?;
let key_columns: Vec<_> = dataset
.schema()
.fields
.iter()
.filter_map(|field| {
if field
.metadata
.get("rerun:is_table_index")
.map(|v| v.to_lowercase() == "true")
== Some(true)
{
Some(field.name.clone())
} else {
None
}
})
.collect();

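// Merge on the index columns: rows whose key values match an existing row
// are updated in place, rows with no match are inserted. If no field is
// marked as an index, `key_columns` is empty and `try_new` is expected to
// return an error, so REPLACE requires at least one indexed column.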
let mut builder = MergeInsertBuilder::try_new(Arc::clone(dataset), key_columns)?;

let op = builder
.when_not_matched(WhenNotMatched::InsertAll)
.when_matched(WhenMatched::UpdateAll)
.try_build()?;

let (merge_dataset, _merge_stats) = op.execute_reader(reader).await?;

*dataset = merge_dataset;
}
InsertOp::Overwrite => {
params.mode = WriteMode::Overwrite;
13 changes: 8 additions & 5 deletions examples/python/server_tables/server_tables.py
@@ -41,9 +41,9 @@ def create_table(client: CatalogClient, directory: Path, table_name: str, schema
def create_status_log_table(client: CatalogClient, directory: Path) -> DataFrame:
"""Create the status log table."""
schema = pa.schema([
("rerun_partition_id", pa.utf8()),
("is_complete", pa.bool_()),
("update_time", pa.timestamp(unit="ms")),
pa.field("rerun_partition_id", pa.utf8()).with_metadata({"rerun:is_table_index": "true"}),
pa.field("is_complete", pa.bool_()),
pa.field("update_time", pa.timestamp(unit="ms")),
])
return create_table(client, directory, STATUS_LOG_TABLE_NAME, schema)

@@ -118,10 +118,13 @@ def process_partitions(client: CatalogClient, dataset: DatasetEntry, partition_l

df.write_table(RESULTS_TABLE_NAME)

client.append_to_table(
# This call replaces the existing rows, setting their completion status to `True`.
# If you instead want to keep every status update (for example, to measure how
# long your workflow takes), use `append_to_table` as in the previous write.
client.update_table(
STATUS_LOG_TABLE_NAME,
rerun_partition_id=partition_list,
is_complete=[True] * len(partition_list), # Add the `True` value to prevent this from processing again
is_complete=[True] * len(partition_list),
update_time=[datetime.now()] * len(partition_list),
)

1 change: 1 addition & 0 deletions rerun_py/rerun_bindings/rerun_bindings.pyi
@@ -1627,6 +1627,7 @@ class TableInsertMode:

APPEND: TableInsertMode
OVERWRITE: TableInsertMode
REPLACE: TableInsertMode

def __str__(self, /) -> str:
"""Return str(self)."""
12 changes: 11 additions & 1 deletion rerun_py/rerun_sdk/rerun/catalog.py
@@ -293,6 +293,16 @@ def append_to_table(self, table_name: str, **named_params: Any) -> None:
named `age` and cast the value `3` to the appropriate type.

"""
if not named_params:
return
self.write_python_objects_to_table(table_name, TableInsertMode.APPEND, **named_params)

def update_table(self, table_name: str, **named_params: Any) -> None:
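"""
Replace matching rows in a table, inserting the rest.

Columns marked with the `rerun:is_table_index` metadata act as merge keys:
rows whose key values match an existing row are updated in place, and rows
with no match are inserted as new rows.
"""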
if not named_params:
return
self.write_python_objects_to_table(table_name, TableInsertMode.REPLACE, **named_params)

def write_python_objects_to_table(self, table_name: str, insert_mode: TableInsertMode, **named_params: Any) -> None:
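"""Build a single `RecordBatch` from the named columns and write it to `table_name` using the given insert mode."""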
if not named_params:
return
params = named_params.items()
@@ -329,7 +339,7 @@ def append_to_table(self, table_name: str, **named_params: Any) -> None:
columns.append(pa.array([None] * expected_len, type=field.type))

rb = pa.RecordBatch.from_arrays(columns, schema=schema)
self.write_table(table_name, rb, TableInsertMode.APPEND)
self.write_table(table_name, rb, insert_mode)

def do_global_maintenance(self) -> None:
"""Perform maintenance tasks on the whole system."""
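To round off the Python-side changes, a minimal end-to-end sketch of `update_table` against an indexed table. This assumes a connected `client`; the table name and column names are hypothetical, and table creation (elided) follows the `server_tables.py` example above:

import pyarrow as pa

# An index column is declared through field metadata, as in server_tables.py.
schema = pa.schema([
    pa.field("job_id", pa.utf8()).with_metadata({"rerun:is_table_index": "true"}),
    pa.field("status", pa.utf8()),
])

# (Table creation elided -- see create_table() in server_tables.py above.)

# First write: one row per job.
client.append_to_table("jobs", job_id=["a", "b"], status=["pending", "pending"])

# REPLACE semantics: "a" matches an existing index value and is updated in
# place; "c" has no match and is inserted, leaving three rows in total.
client.update_table("jobs", job_id=["a", "c"], status=["done", "pending"])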
4 changes: 4 additions & 0 deletions rerun_py/src/catalog/table_entry.rs
@@ -146,13 +146,17 @@ pub enum PyTableInsertMode {

#[pyo3(name = "OVERWRITE")]
Overwrite = 2,

#[pyo3(name = "REPLACE")]
Replace = 3,
}

impl From<PyTableInsertMode> for TableInsertMode {
fn from(value: PyTableInsertMode) -> Self {
match value {
PyTableInsertMode::Append => Self::Append,
PyTableInsertMode::Overwrite => Self::Overwrite,
PyTableInsertMode::Replace => Self::Replace,
}
}
}