diff --git a/Cargo.lock b/Cargo.lock index ee0af9ad2..a437f0d1c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7652,6 +7652,7 @@ dependencies = [ "parquet", "rand 0.9.2", "tokio", + "tracing", ] [[package]] diff --git a/crates/core/parquet/Cargo.toml b/crates/core/parquet/Cargo.toml index f790ef228..c42d4cdf5 100644 --- a/crates/core/parquet/Cargo.toml +++ b/crates/core/parquet/Cargo.toml @@ -13,7 +13,7 @@ datasets-common = { path = "../datasets-common" } futures.workspace = true metadata-db = { path = "../metadata-db" } object_store.workspace = true -parquet-ext = { path = "../../parquet-ext", features = ["async", "zstd"] } +parquet-ext = { path = "../../parquet-ext", features = ["async", "zstd", "tracing"] } serde.workspace = true serde_json.workspace = true thiserror.workspace = true diff --git a/crates/core/parquet/src/writer.rs b/crates/core/parquet/src/writer.rs index 46fb0512b..11f3442a5 100644 --- a/crates/core/parquet/src/writer.rs +++ b/crates/core/parquet/src/writer.rs @@ -13,6 +13,7 @@ use metadata_db::{ }; use object_store::{ObjectMeta, buffered::BufWriter}; use parquet_ext::arrow::async_writer::AsyncArrowWriter; +use tracing::Instrument; use url::Url; use crate::{ @@ -77,6 +78,7 @@ impl ParquetFileWriter { } /// Appends a record batch, flushing the row group when it approaches the configured size limit. + #[tracing::instrument(skip_all, fields(rows = batch.num_rows(), table = %self.table_ref_compact))] pub async fn write(&mut self, batch: &RecordBatch) -> Result<(), ParquetError> { self.rows_written += batch.num_rows(); self.writer.write(batch).await?; @@ -104,10 +106,14 @@ impl ParquetFileWriter { parent_ids: Vec, generation: Generation, ) -> Result { - self.writer - .flush() - .await - .map_err(ParquetFileWriterCloseError::Flush)?; + async { + self.writer + .flush() + .await + .map_err(ParquetFileWriterCloseError::Flush) + } + .instrument(tracing::info_span!("flush_remaining")) + .await?; let parquet_meta = ParquetMeta { table: self.table_name.to_string(), @@ -136,11 +142,14 @@ impl ParquetFileWriter { KeyValue::new(GENERATION_METADATA_KEY.to_string(), generation.to_string()); self.writer.append_key_value_metadata(generation_metadata); - let meta = self - .writer - .close() - .await - .map_err(ParquetFileWriterCloseError::Close)?; + let meta = async { + self.writer + .close() + .await + .map_err(ParquetFileWriterCloseError::Close) + } + .instrument(tracing::info_span!("finalize_parquet")) + .await?; let object_meta = self .store diff --git a/crates/core/worker-datasets-raw/src/job_impl/ranges.rs b/crates/core/worker-datasets-raw/src/job_impl/ranges.rs index 18ba48df8..0bc59471f 100644 --- a/crates/core/worker-datasets-raw/src/job_impl/ranges.rs +++ b/crates/core/worker-datasets-raw/src/job_impl/ranges.rs @@ -510,10 +510,10 @@ impl MaterializePartition { let mut stream = std::pin::pin!(stream); let mut prev_block_num = None; - while let Some(dataset_rows) = stream - .try_next() - .await - .map_err(RunRangeError::read_stream)? + while let Some(dataset_rows) = + async { stream.try_next().await.map_err(RunRangeError::read_stream) } + .instrument(tracing::info_span!("fetch_block")) + .await? { let cur_block_num = dataset_rows.block_num(); if let Some(prev) = prev_block_num diff --git a/crates/parquet-ext/Cargo.toml b/crates/parquet-ext/Cargo.toml index 2ae180549..949d67826 100644 --- a/crates/parquet-ext/Cargo.toml +++ b/crates/parquet-ext/Cargo.toml @@ -57,6 +57,8 @@ serde = ["parquet/serde"] snap = ["parquet/snap"] # Tokio async runtime with object store and Arrow support tokio = ["async", "object_store", "parquet/arrow"] +# Tracing instrumentation for pipeline operations +tracing = ["dep:tracing"] # Zstandard compression codec zstd = ["parquet/zstd"] @@ -69,6 +71,7 @@ futures = { workspace = true, features = ["std"], optional = true } parking_lot.workspace = true parquet = { version = "57", features = ["arrow"] } tokio = { workspace = true, optional = true } +tracing = { workspace = true, optional = true } [dev-dependencies] arrow = { workspace = true, features = ["test_utils"] } diff --git a/crates/parquet-ext/src/writer/pipeline/mod.rs b/crates/parquet-ext/src/writer/pipeline/mod.rs index acaba801c..9c4a840ab 100644 --- a/crates/parquet-ext/src/writer/pipeline/mod.rs +++ b/crates/parquet-ext/src/writer/pipeline/mod.rs @@ -62,6 +62,7 @@ where self.writer_executor.append_key_value_metadata(kv); } + #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))] pub async fn close_async(mut self) -> Result { self.encoder_executor.flush_async(true).await?; let metadata = self.writer_executor.close_async().await?; diff --git a/crates/parquet-ext/src/writer/pipeline/writer/mod.rs b/crates/parquet-ext/src/writer/pipeline/writer/mod.rs index 3f14a9519..04882413a 100644 --- a/crates/parquet-ext/src/writer/pipeline/writer/mod.rs +++ b/crates/parquet-ext/src/writer/pipeline/writer/mod.rs @@ -71,6 +71,7 @@ where } /// Blocking close: receive all remaining WriteJobs until Finalize, then finalize. + #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))] pub async fn close_async(mut self) -> Result { loop { match self @@ -105,6 +106,7 @@ where self.key_value_metadata.push(kv); } + #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))] async fn write_row_group(&mut self, pending: PendingRowGroup) -> Result<()> { let PendingRowGroup { chunks, @@ -176,6 +178,7 @@ where Ok(()) } + #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))] async fn finalize(&mut self) -> Result { self.flush_pending_encoders().await?; self.flush_key_value_metadata();