Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions rust/lance-index/src/scalar/inverted/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1372,8 +1372,7 @@ impl PostingListReader {
let batch = self.posting_batch(token_id, false).await?;
self.posting_list_from_batch(&batch, token_id)
})
.await
.map_err(|e| Error::io(e.to_string(), location!()))?
.await?
.as_ref()
.clone();

Expand Down
2 changes: 1 addition & 1 deletion rust/lance-index/src/scalar/ngram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ impl NGramPostingListReader {
)
.await?;
NGramPostingList::try_from_batch(batch, self.frag_reuse_index.clone())
}).await.map_err(|e| Error::io(e.to_string(), location!()))
}).await
}
}

Expand Down
17 changes: 12 additions & 5 deletions rust/lance-index/src/vector/ivf/shuffler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,11 @@ pub async fn shuffle_dataset(
.buffer_unordered(get_num_compute_intensive_cpus())
.map(|res| match res {
Ok(Ok(batch)) => Ok(batch),
Ok(Err(err)) => Err(Error::io(err.to_string(), location!())),
Err(err) => Err(Error::io(err.to_string(), location!())),
Ok(Err(err)) => Err(err),
Err(join_err) => Err(Error::Execution {
message: join_err.to_string(),
location: location!(),
}),
})
.boxed();

Expand Down Expand Up @@ -448,13 +451,17 @@ impl IvfShuffler {
let writer = object_store.create(&path).await?;

let mut data = Box::pin(data.peekable());
let schema = match data.as_mut().peek().await {
let schema = match data.as_mut().peek_mut().await {
Some(Ok(batch)) => batch.schema(),
Some(Err(err)) => {
return Err(Error::io(err.to_string(), location!()));
// Using Error::Stop as dummy value to take the error out.
return Err(std::mem::replace(err, Error::Stop));
}
None => {
return Err(Error::io("empty stream".to_string(), location!()));
return Err(Error::InvalidInput {
source: "data must not be empty".into(),
location: location!(),
})
}
};

Expand Down
4 changes: 1 addition & 3 deletions rust/lance/src/dataset/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4008,9 +4008,7 @@ impl Stream for DatasetRecordBatchStream {
let mut this = self.project();
let _guard = this.span.enter();
match this.exec_node.poll_next_unpin(cx) {
Poll::Ready(result) => {
Poll::Ready(result.map(|r| r.map_err(|e| Error::io(e.to_string(), location!()))))
}
Poll::Ready(result) => Poll::Ready(result.map(|r| Ok(r?))),
Poll::Pending => Poll::Pending,
}
}
Expand Down
2 changes: 1 addition & 1 deletion rust/lance/src/dataset/write/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ impl UpdateJob {
.map(|res| match res {
Ok(Ok(batch)) => Ok(batch),
Ok(Err(err)) => Err(err),
Err(e) => Err(DataFusionError::Execution(e.to_string())),
Err(e) => Err(DataFusionError::ExecutionJoin(Box::new(e))),
});
let stream = RecordBatchStreamAdapter::new(schema, stream);

Expand Down
4 changes: 2 additions & 2 deletions rust/lance/src/io/exec/knn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ impl ExecutionPlan for KNNVectorDistanceExec {
async move {
let batch = compute_distance(key, dt, &column, batch?)
.await
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
.map_err(|e| DataFusionError::External(Box::new(e)))?;

let distances = batch[DIST_COL].as_primitive::<Float32Type>();
let mask = BooleanArray::from_iter(
Expand All @@ -242,7 +242,7 @@ impl ExecutionPlan for KNNVectorDistanceExec {
.map(|v| Some(v.map(|v| !v.is_nan()).unwrap_or(false))),
);
arrow::compute::filter_record_batch(&batch, &mask)
.map_err(|e| DataFusionError::Execution(e.to_string()))
.map_err(|e| DataFusionError::ArrowError(Box::new(e), None))
}
})
.buffer_unordered(get_num_compute_intensive_cpus());
Expand Down
2 changes: 1 addition & 1 deletion rust/lance/src/io/exec/pushdown_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ impl FragmentScanner {
.map(|res| match res {
Ok(Ok(batch)) => Ok(batch),
Ok(Err(err)) => Err(err),
Err(err) => Err(DataFusionError::Execution(err.to_string())),
Err(join_err) => Err(DataFusionError::ExecutionJoin(Box::new(join_err))),
})
});

Expand Down