From fab656ab6dbcc48cffcfe1cc3b612e5057a63c73 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 30 Dec 2025 16:12:10 -0800 Subject: [PATCH] fix: reduce verbosity of errors due to string conversion --- rust/lance-index/src/scalar/inverted/index.rs | 3 +-- rust/lance-index/src/scalar/ngram.rs | 2 +- rust/lance-index/src/vector/ivf/shuffler.rs | 17 ++++++++++++----- rust/lance/src/dataset/scanner.rs | 4 +--- rust/lance/src/dataset/write/update.rs | 2 +- rust/lance/src/io/exec/knn.rs | 4 ++-- rust/lance/src/io/exec/pushdown_scan.rs | 2 +- 7 files changed, 19 insertions(+), 15 deletions(-) diff --git a/rust/lance-index/src/scalar/inverted/index.rs b/rust/lance-index/src/scalar/inverted/index.rs index f9794079811..83882f8a48d 100644 --- a/rust/lance-index/src/scalar/inverted/index.rs +++ b/rust/lance-index/src/scalar/inverted/index.rs @@ -1372,8 +1372,7 @@ impl PostingListReader { let batch = self.posting_batch(token_id, false).await?; self.posting_list_from_batch(&batch, token_id) }) - .await - .map_err(|e| Error::io(e.to_string(), location!()))? + .await? .as_ref() .clone(); diff --git a/rust/lance-index/src/scalar/ngram.rs b/rust/lance-index/src/scalar/ngram.rs index 052d712b619..4d4c0bfeef2 100644 --- a/rust/lance-index/src/scalar/ngram.rs +++ b/rust/lance-index/src/scalar/ngram.rs @@ -240,7 +240,7 @@ impl NGramPostingListReader { ) .await?; NGramPostingList::try_from_batch(batch, self.frag_reuse_index.clone()) - }).await.map_err(|e| Error::io(e.to_string(), location!())) + }).await } } diff --git a/rust/lance-index/src/vector/ivf/shuffler.rs b/rust/lance-index/src/vector/ivf/shuffler.rs index e8e90f9c22b..4dc3678a876 100644 --- a/rust/lance-index/src/vector/ivf/shuffler.rs +++ b/rust/lance-index/src/vector/ivf/shuffler.rs @@ -322,8 +322,11 @@ pub async fn shuffle_dataset( .buffer_unordered(get_num_compute_intensive_cpus()) .map(|res| match res { Ok(Ok(batch)) => Ok(batch), - Ok(Err(err)) => Err(Error::io(err.to_string(), location!())), - Err(err) => Err(Error::io(err.to_string(), location!())), + Ok(Err(err)) => Err(err), + Err(join_err) => Err(Error::Execution { + message: join_err.to_string(), + location: location!(), + }), }) .boxed(); @@ -448,13 +451,17 @@ impl IvfShuffler { let writer = object_store.create(&path).await?; let mut data = Box::pin(data.peekable()); - let schema = match data.as_mut().peek().await { + let schema = match data.as_mut().peek_mut().await { Some(Ok(batch)) => batch.schema(), Some(Err(err)) => { - return Err(Error::io(err.to_string(), location!())); + // Using Error::Stop as dummy value to take the error out. + return Err(std::mem::replace(err, Error::Stop)); } None => { - return Err(Error::io("empty stream".to_string(), location!())); + return Err(Error::InvalidInput { + source: "data must not be empty".into(), + location: location!(), + }) } }; diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index fc5f636f5b1..756fd1b07d5 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -4008,9 +4008,7 @@ impl Stream for DatasetRecordBatchStream { let mut this = self.project(); let _guard = this.span.enter(); match this.exec_node.poll_next_unpin(cx) { - Poll::Ready(result) => { - Poll::Ready(result.map(|r| r.map_err(|e| Error::io(e.to_string(), location!())))) - } + Poll::Ready(result) => Poll::Ready(result.map(|r| Ok(r?))), Poll::Pending => Poll::Pending, } } diff --git a/rust/lance/src/dataset/write/update.rs b/rust/lance/src/dataset/write/update.rs index e301444ee48..1bd4ab78193 100644 --- a/rust/lance/src/dataset/write/update.rs +++ b/rust/lance/src/dataset/write/update.rs @@ -300,7 +300,7 @@ impl UpdateJob { .map(|res| match res { Ok(Ok(batch)) => Ok(batch), Ok(Err(err)) => Err(err), - Err(e) => Err(DataFusionError::Execution(e.to_string())), + Err(e) => Err(DataFusionError::ExecutionJoin(Box::new(e))), }); let stream = RecordBatchStreamAdapter::new(schema, stream); diff --git a/rust/lance/src/io/exec/knn.rs b/rust/lance/src/io/exec/knn.rs index 8c62541a519..32a253e7eb7 100644 --- a/rust/lance/src/io/exec/knn.rs +++ b/rust/lance/src/io/exec/knn.rs @@ -233,7 +233,7 @@ impl ExecutionPlan for KNNVectorDistanceExec { async move { let batch = compute_distance(key, dt, &column, batch?) .await - .map_err(|e| DataFusionError::Execution(e.to_string()))?; + .map_err(|e| DataFusionError::External(Box::new(e)))?; let distances = batch[DIST_COL].as_primitive::(); let mask = BooleanArray::from_iter( @@ -242,7 +242,7 @@ impl ExecutionPlan for KNNVectorDistanceExec { .map(|v| Some(v.map(|v| !v.is_nan()).unwrap_or(false))), ); arrow::compute::filter_record_batch(&batch, &mask) - .map_err(|e| DataFusionError::Execution(e.to_string())) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None)) } }) .buffer_unordered(get_num_compute_intensive_cpus()); diff --git a/rust/lance/src/io/exec/pushdown_scan.rs b/rust/lance/src/io/exec/pushdown_scan.rs index 6f07618655c..00d9806d95e 100644 --- a/rust/lance/src/io/exec/pushdown_scan.rs +++ b/rust/lance/src/io/exec/pushdown_scan.rs @@ -353,7 +353,7 @@ impl FragmentScanner { .map(|res| match res { Ok(Ok(batch)) => Ok(batch), Ok(Err(err)) => Err(err), - Err(err) => Err(DataFusionError::Execution(err.to_string())), + Err(join_err) => Err(DataFusionError::ExecutionJoin(Box::new(join_err))), }) });