Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/core/parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ datasets-common = { path = "../datasets-common" }
futures.workspace = true
metadata-db = { path = "../metadata-db" }
object_store.workspace = true
parquet-ext = { path = "../../parquet-ext", features = ["async", "zstd"] }
parquet-ext = { path = "../../parquet-ext", features = ["async", "zstd", "tracing"] }
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
Expand Down
27 changes: 18 additions & 9 deletions crates/core/parquet/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use metadata_db::{
};
use object_store::{ObjectMeta, buffered::BufWriter};
use parquet_ext::arrow::async_writer::AsyncArrowWriter;
use tracing::Instrument;
use url::Url;

use crate::{
Expand Down Expand Up @@ -77,6 +78,7 @@ impl ParquetFileWriter {
}

/// Appends a record batch, flushing the row group when it approaches the configured size limit.
#[tracing::instrument(skip_all, fields(rows = batch.num_rows(), table = %self.table_ref_compact))]
pub async fn write(&mut self, batch: &RecordBatch) -> Result<(), ParquetError> {
self.rows_written += batch.num_rows();
self.writer.write(batch).await?;
Expand Down Expand Up @@ -104,10 +106,14 @@ impl ParquetFileWriter {
parent_ids: Vec<FileId>,
generation: Generation,
) -> Result<ParquetFileWriterOutput, ParquetFileWriterCloseError> {
self.writer
.flush()
.await
.map_err(ParquetFileWriterCloseError::Flush)?;
async {
self.writer
.flush()
.await
.map_err(ParquetFileWriterCloseError::Flush)
}
.instrument(tracing::info_span!("flush_remaining"))
.await?;

let parquet_meta = ParquetMeta {
table: self.table_name.to_string(),
Expand Down Expand Up @@ -136,11 +142,14 @@ impl ParquetFileWriter {
KeyValue::new(GENERATION_METADATA_KEY.to_string(), generation.to_string());
self.writer.append_key_value_metadata(generation_metadata);

let meta = self
.writer
.close()
.await
.map_err(ParquetFileWriterCloseError::Close)?;
let meta = async {
self.writer
.close()
.await
.map_err(ParquetFileWriterCloseError::Close)
}
.instrument(tracing::info_span!("finalize_parquet"))
.await?;

let object_meta = self
.store
Expand Down
8 changes: 4 additions & 4 deletions crates/core/worker-datasets-raw/src/job_impl/ranges.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,10 +510,10 @@ impl<S: BlockStreamer> MaterializePartition<S> {

let mut stream = std::pin::pin!(stream);
let mut prev_block_num = None;
while let Some(dataset_rows) = stream
.try_next()
.await
.map_err(RunRangeError::read_stream)?
while let Some(dataset_rows) =
async { stream.try_next().await.map_err(RunRangeError::read_stream) }
.instrument(tracing::info_span!("fetch_block"))
.await?
{
let cur_block_num = dataset_rows.block_num();
if let Some(prev) = prev_block_num
Expand Down
3 changes: 3 additions & 0 deletions crates/parquet-ext/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ serde = ["parquet/serde"]
snap = ["parquet/snap"]
# Tokio async runtime with object store and Arrow support
tokio = ["async", "object_store", "parquet/arrow"]
# Tracing instrumentation for pipeline operations
tracing = ["dep:tracing"]
# Zstandard compression codec
zstd = ["parquet/zstd"]

Expand All @@ -69,6 +71,7 @@ futures = { workspace = true, features = ["std"], optional = true }
parking_lot.workspace = true
parquet = { version = "57", features = ["arrow"] }
tokio = { workspace = true, optional = true }
tracing = { workspace = true, optional = true }

[dev-dependencies]
arrow = { workspace = true, features = ["test_utils"] }
Expand Down
1 change: 1 addition & 0 deletions crates/parquet-ext/src/writer/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ where
self.writer_executor.append_key_value_metadata(kv);
}

#[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
pub async fn close_async(mut self) -> Result<ParquetMetaData> {
self.encoder_executor.flush_async(true).await?;
let metadata = self.writer_executor.close_async().await?;
Expand Down
3 changes: 3 additions & 0 deletions crates/parquet-ext/src/writer/pipeline/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ where
}

/// Async close: receive all remaining WriteJobs until Finalize, then finalize.
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
pub async fn close_async(mut self) -> Result<ParquetMetaData> {
loop {
match self
Expand Down Expand Up @@ -105,6 +106,7 @@ where
self.key_value_metadata.push(kv);
}

#[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
async fn write_row_group(&mut self, pending: PendingRowGroup) -> Result<()> {
let PendingRowGroup {
chunks,
Expand Down Expand Up @@ -176,6 +178,7 @@ where
Ok(())
}

#[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
async fn finalize(&mut self) -> Result<ParquetMetaData> {
self.flush_pending_encoders().await?;
self.flush_key_value_metadata();
Expand Down
Loading