5 changes: 5 additions & 0 deletions datafusion/proto/proto/datafusion.proto
@@ -751,6 +751,7 @@ message PhysicalPlanNode {
    MemoryScanExecNode memory_scan = 35;
    AsyncFuncExecNode async_func = 36;
    BufferExecNode buffer = 37;
    ArrowScanExecNode arrow_scan = 38;
  }
}

@@ -1106,6 +1107,10 @@ message AvroScanExecNode {
  FileScanExecConf base_conf = 1;
}

message ArrowScanExecNode {
  FileScanExecConf base_conf = 1;
}

message MemoryScanExecNode {
  repeated bytes partitions = 1;
  datafusion_common.Schema schema = 2;
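The new node follows the established pattern for file scans (`AvroScanExecNode` just above it): an Arrow IPC file scan is fully described by its `FileScanExecConf`, so the message carries no format-specific fields. The `arrow_scan = 38` entry takes the next free tag in the `PhysicalPlanNode` oneof, after `buffer = 37`.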
106 changes: 106 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default.

9 changes: 8 additions & 1 deletion datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default.

42 changes: 42 additions & 0 deletions datafusion/proto/src/physical_plan/mod.rs
@@ -34,6 +34,7 @@ use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::sink::DataSinkExec;
use datafusion_datasource::source::{DataSource, DataSourceExec};
use datafusion_datasource_arrow::source::ArrowSource;
#[cfg(feature = "avro")]
use datafusion_datasource_avro::source::AvroSource;
use datafusion_datasource_csv::file_format::CsvSink;
@@ -199,6 +200,9 @@ impl protobuf::PhysicalPlanNode {
            PhysicalPlanType::MemoryScan(scan) => {
                self.try_into_memory_scan_physical_plan(scan, ctx, codec, proto_converter)
            }
            PhysicalPlanType::ArrowScan(scan) => {
                self.try_into_arrow_scan_physical_plan(scan, ctx, codec, proto_converter)
            }
            PhysicalPlanType::CoalesceBatches(coalesce_batches) => self
                .try_into_coalesce_batches_physical_plan(
                    coalesce_batches,
@@ -774,6 +778,27 @@ impl protobuf::PhysicalPlanNode {
        Ok(DataSourceExec::from_data_source(scan_conf))
    }

    fn try_into_arrow_scan_physical_plan(
        &self,
        scan: &protobuf::ArrowScanExecNode,
        ctx: &TaskContext,
        codec: &dyn PhysicalExtensionCodec,
        proto_converter: &dyn PhysicalProtoConverterExtension,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let base_conf = scan.base_conf.as_ref().ok_or_else(|| {
            internal_datafusion_err!("base_conf in ArrowScanExecNode is missing.")
        })?;
        let table_schema = parse_table_schema_from_proto(base_conf)?;
        let scan_conf = parse_protobuf_file_scan_config(
            base_conf,
            ctx,
            codec,
            proto_converter,
            Arc::new(ArrowSource::new_file_source(table_schema)),
        )?;
        Ok(DataSourceExec::from_data_source(scan_conf))
    }

    #[cfg_attr(not(feature = "parquet"), expect(unused_variables))]
    fn try_into_parquet_scan_physical_plan(
        &self,
@@ -2867,6 +2892,23 @@ impl protobuf::PhysicalPlanNode {
            }
        }

        if let Some(scan_conf) = data_source.as_any().downcast_ref::<FileScanConfig>() {
            let source = scan_conf.file_source();
            if let Some(_arrow_source) = source.as_any().downcast_ref::<ArrowSource>() {
                return Ok(Some(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::ArrowScan(
                        protobuf::ArrowScanExecNode {
                            base_conf: Some(serialize_file_scan_config(
                                scan_conf,
                                codec,
                                proto_converter,
                            )?),
                        },
                    )),
                }));
            }
        }

        #[cfg(feature = "parquet")]
        if let Some((maybe_parquet, conf)) =
            data_source_exec.downcast_to_file_source::<ParquetSource>()
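The two halves above are symmetric: `try_into_arrow_scan_physical_plan` rebuilds an `ArrowSource` from the decoded `FileScanExecConf`, while the serialization path recognizes an `ArrowSource`-backed `FileScanConfig` by downcasting before emitting the same node. Note that, unlike the Avro source (gated behind `#[cfg(feature = "avro")]`), the `datafusion_datasource_arrow` import at the top of the file is unconditional, so neither code path needs a feature flag.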
28 changes: 26 additions & 2 deletions datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -36,8 +36,8 @@ use datafusion::datasource::listing::{
};
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
-    FileGroup, FileOutputMode, FileScanConfigBuilder, FileSinkConfig, ParquetSource,
-    wrap_partition_type_in_dict, wrap_partition_value_in_dict,
+    ArrowSource, FileGroup, FileOutputMode, FileScanConfigBuilder, FileSinkConfig,
+    ParquetSource, wrap_partition_type_in_dict, wrap_partition_value_in_dict,
};
use datafusion::datasource::sink::DataSinkExec;
use datafusion::datasource::source::DataSourceExec;
@@ -929,6 +929,30 @@ fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> {
    roundtrip_test(DataSourceExec::from_data_source(scan_config))
}

#[test]
fn roundtrip_arrow_scan() -> Result<()> {
    let file_schema =
        Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));

    let table_schema = TableSchema::new(file_schema.clone(), vec![]);
    let file_source = Arc::new(ArrowSource::new_file_source(table_schema));

    let scan_config =
        FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
            .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new(
                "/path/to/file.arrow".to_string(),
                1024,
            )])])
            .with_statistics(Statistics {
                num_rows: Precision::Inexact(100),
                total_byte_size: Precision::Inexact(1024),
                column_statistics: Statistics::unknown_column(&file_schema),
            })
            .build();

    roundtrip_test(DataSourceExec::from_data_source(scan_config))
}

#[tokio::test]
async fn roundtrip_parquet_exec_with_table_partition_cols() -> Result<()> {
    let mut file_group =
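For context, here is what the new node enables end to end: a minimal sketch that serializes an Arrow scan to protobuf bytes and decodes it back, reusing the plan construction from `roundtrip_arrow_scan` above. The `datafusion_proto::bytes` helpers and the exact import path of `TableSchema` are assumptions about the surrounding API, not part of this diff.

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{ArrowSource, FileScanConfigBuilder};
use datafusion::datasource::source::DataSourceExec;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;
// Assumed path; TableSchema may be re-exported elsewhere in your version.
use datafusion_datasource::TableSchema;
// Assumed helper signatures from the datafusion-proto bytes module.
use datafusion_proto::bytes::{physical_plan_from_bytes, physical_plan_to_bytes};

fn main() -> Result<()> {
    // Single-column Arrow IPC scan, as in the roundtrip_arrow_scan test.
    let file_schema =
        Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
    let table_schema = TableSchema::new(file_schema, vec![]);
    let source = Arc::new(ArrowSource::new_file_source(table_schema));
    let plan = DataSourceExec::from_data_source(
        FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source)
            .build(),
    );

    // With this change the built-in protobuf codec can express the Arrow
    // scan; previously such a plan required a custom extension codec.
    let bytes = physical_plan_to_bytes(Arc::clone(&plan))?;
    let ctx = SessionContext::new();
    let decoded = physical_plan_from_bytes(&bytes, &ctx)?;
    assert_eq!(format!("{plan:?}"), format!("{decoded:?}"));
    Ok(())
}
```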