Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use arrow_ipc::CompressionType;
use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties};
use crate::error::_config_err;
use crate::format::{ExplainAnalyzeLevel, ExplainFormat};
use crate::parquet_config::DFParquetWriterVersion;
use crate::parquet_config::{DFParquetWriterVersion, DFTimeUnit};
use crate::parsers::CompressionTypeVariant;
use crate::utils::get_available_parallelism;
use crate::{DataFusionError, Result};
Expand Down Expand Up @@ -732,13 +732,13 @@ config_namespace! {
/// BLOB instead.
pub binary_as_string: bool, default = false

/// (reading) If true, parquet reader will read columns of
/// (reading) If set, parquet reader will read columns of
/// physical type int96 as originating from a different resolution
/// than nanosecond. This is useful for reading data from systems like Spark
/// which stores microsecond resolution timestamps in an int96 allowing it
/// to write values with a larger date range than 64-bit timestamps with
/// nanosecond resolution.
pub coerce_int96: Option<String>, transform = str::to_lowercase, default = None
pub coerce_int96: DFTimeUnit, default = DFTimeUnit::default()

/// (reading) Use any available bloom filters when reading parquet files
pub bloom_filter_on_read: bool, default = true
Expand Down Expand Up @@ -3508,4 +3508,29 @@ mod tests {
"Invalid or Unsupported Configuration: Invalid parquet writer version: 3.0. Expected one of: 1.0, 2.0"
);
}
#[cfg(feature = "parquet")]
#[test]
fn test_parquet_coerce_int96_validation() {
    use crate::{config::ConfigOptions, parquet_config::DFTimeUnit};

    let mut options = ConfigOptions::default();

    // A supported unit string is accepted and stored as the parsed enum variant.
    options
        .set("datafusion.execution.parquet.coerce_int96", "ns")
        .unwrap();
    assert_eq!(
        options.execution.parquet.coerce_int96,
        DFTimeUnit::Nanosecond
    );

    // An unsupported unit is rejected as soon as `SET` is applied,
    // rather than failing later at read time.
    let error = options
        .set("datafusion.execution.parquet.coerce_int96", "invalid")
        .unwrap_err();
    assert_eq!(
        error.to_string(),
        "Invalid or Unsupported Configuration: Invalid parquet coerce_int96: invalid. Expected one of: ns, us, ms, s"
    );
}
}
6 changes: 3 additions & 3 deletions datafusion/common/src/file_options/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ mod tests {
#[cfg(feature = "parquet_encryption")]
use crate::config::ConfigFileEncryptionProperties;
use crate::config::{ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions};
use crate::parquet_config::DFParquetWriterVersion;
use crate::parquet_config::{DFParquetWriterVersion, DFTimeUnit};
use parquet::basic::Compression;
use parquet::file::properties::{
BloomFilterProperties, DEFAULT_BLOOM_FILTER_FPP, DEFAULT_BLOOM_FILTER_NDV,
Expand Down Expand Up @@ -458,7 +458,7 @@ mod tests {
schema_force_view_types: defaults.schema_force_view_types,
binary_as_string: defaults.binary_as_string,
skip_arrow_metadata: defaults.skip_arrow_metadata,
coerce_int96: None,
coerce_int96: DFTimeUnit::default(),
max_predicate_cache_size: defaults.max_predicate_cache_size,
}
}
Expand Down Expand Up @@ -573,7 +573,7 @@ mod tests {
schema_force_view_types: global_options_defaults.schema_force_view_types,
binary_as_string: global_options_defaults.binary_as_string,
skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
coerce_int96: None,
coerce_int96: DFTimeUnit::default(),
},
column_specific_options,
key_value_metadata,
Expand Down
82 changes: 82 additions & 0 deletions datafusion/common/src/parquet_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,85 @@ impl From<parquet::file::properties::WriterVersion> for DFParquetWriterVersion {
}
}
}

/// Time unit options for Parquet INT96 timestamp coercion.
///
/// This enum validates time unit values at configuration time,
/// ensuring only supported units ("ns", "us", "ms", "s") can be set
/// via `SET` commands or proto deserialization.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum DFTimeUnit {
    /// Nanosecond resolution ("ns"); the default.
    #[default]
    Nanosecond,
    /// Microsecond resolution ("us").
    Microsecond,
    /// Millisecond resolution ("ms").
    Millisecond,
    /// Second resolution ("s").
    Second,
}

impl FromStr for DFTimeUnit {
    type Err = DataFusionError;

    /// Parses a case-insensitive time-unit abbreviation.
    ///
    /// Accepts exactly "ns", "us", "ms", or "s" (any letter case);
    /// any other input produces a configuration error naming the
    /// offending value and the valid options.
    fn from_str(s: &str) -> Result<Self> {
        let normalized = s.to_lowercase();
        match normalized.as_str() {
            "ns" => Ok(Self::Nanosecond),
            "us" => Ok(Self::Microsecond),
            "ms" => Ok(Self::Millisecond),
            "s" => Ok(Self::Second),
            unknown => Err(DataFusionError::Configuration(format!(
                "Invalid parquet coerce_int96: {unknown}. Expected one of: ns, us, ms, s"
            ))),
        }
    }
}

impl Display for DFTimeUnit {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let s = match self {
DFTimeUnit::Nanosecond => "ns",
DFTimeUnit::Microsecond => "us",
DFTimeUnit::Millisecond => "ms",
DFTimeUnit::Second => "s",
};
write!(f, "{s}")
}
}

impl ConfigField for DFTimeUnit {
    // Presented as an optional leaf value in config listings,
    // matching how other scalar config fields are visited.
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
        v.some(key, self, description)
    }

    // Delegates to the `FromStr` impl so invalid values error at SET time.
    fn set(&mut self, _key: &str, value: &str) -> Result<()> {
        *self = value.parse()?;
        Ok(())
    }
}

/// Convert `DFTimeUnit` to parquet crate's `arrow::datatypes::TimeUnit`
///
/// This conversion is infallible since `DFTimeUnit` only contains
/// valid values that have been validated at configuration time.
#[cfg(feature = "parquet")]
impl From<DFTimeUnit> for arrow::datatypes::TimeUnit {
    fn from(value: DFTimeUnit) -> Self {
        use arrow::datatypes::TimeUnit;
        match value {
            DFTimeUnit::Nanosecond => TimeUnit::Nanosecond,
            DFTimeUnit::Microsecond => TimeUnit::Microsecond,
            DFTimeUnit::Millisecond => TimeUnit::Millisecond,
            DFTimeUnit::Second => TimeUnit::Second,
        }
    }
}

/// Convert parquet crate's `arrow::datatypes::TimeUnit` to `DFTimeUnit`
#[cfg(feature = "parquet")]
impl From<arrow::datatypes::TimeUnit> for DFTimeUnit {
    fn from(value: arrow::datatypes::TimeUnit) -> Self {
        use arrow::datatypes::TimeUnit;
        match value {
            TimeUnit::Nanosecond => Self::Nanosecond,
            TimeUnit::Microsecond => Self::Microsecond,
            TimeUnit::Millisecond => Self::Millisecond,
            TimeUnit::Second => Self::Second,
        }
    }
}
11 changes: 6 additions & 5 deletions datafusion/core/src/datasource/physical_plan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ mod tests {
use arrow_schema::{SchemaRef, TimeUnit};
use bytes::{BufMut, BytesMut};
use datafusion_common::config::TableParquetOptions;
use datafusion_common::parquet_config::DFTimeUnit;
use datafusion_common::test_util::{batches_to_sort_string, batches_to_string};
use datafusion_common::{Result, ScalarValue, assert_contains};
use datafusion_datasource::file_format::FileFormat;
Expand Down Expand Up @@ -1342,7 +1343,7 @@ mod tests {

let time_units_and_expected = vec![
(
None, // Same as "ns" time_unit
DFTimeUnit::default(), // default: "Nanosecond"
Arc::new(Int64Array::from(vec![
Some(1704141296123456000), // Reads as nanosecond fine (note 3 extra 0s)
Some(1704070800000000000), // Reads as nanosecond fine (note 3 extra 0s)
Expand All @@ -1353,7 +1354,7 @@ mod tests {
])),
),
(
Some("ns".to_string()),
DFTimeUnit::Millisecond,
Arc::new(Int64Array::from(vec![
Some(1704141296123456000),
Some(1704070800000000000),
Expand All @@ -1364,7 +1365,7 @@ mod tests {
])),
),
(
Some("us".to_string()),
DFTimeUnit::Microsecond,
Arc::new(Int64Array::from(vec![
Some(1704141296123456),
Some(1704070800000000),
Expand All @@ -1379,7 +1380,7 @@ mod tests {
for (time_unit, expected) in time_units_and_expected {
let parquet_exec = scan_format(
&state,
&ParquetFormat::default().with_coerce_int96(time_unit.clone()),
&ParquetFormat::default().with_coerce_int96(time_unit),
Some(schema.clone()),
&testdata,
filename,
Expand Down Expand Up @@ -1428,7 +1429,7 @@ mod tests {

let parquet_exec = scan_format(
&state,
&ParquetFormat::default().with_coerce_int96(Some("us".to_string())),
&ParquetFormat::default().with_coerce_int96(DFTimeUnit::Microsecond),
None,
testdata,
filename,
Expand Down
17 changes: 7 additions & 10 deletions datafusion/datasource-parquet/src/file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use std::{fmt, vec};

use arrow::array::RecordBatch;
use arrow::datatypes::{Fields, Schema, SchemaRef, TimeUnit};
use datafusion_common::parquet_config::DFTimeUnit;
use datafusion_datasource::TableSchema;
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
Expand Down Expand Up @@ -60,7 +61,7 @@ use datafusion_session::Session;

use crate::metadata::{DFParquetMetadata, lex_ordering_to_sorting_columns};
use crate::reader::CachedParquetFileReaderFactory;
use crate::source::{ParquetSource, parse_coerce_int96_string};
use crate::source::ParquetSource;
use async_trait::async_trait;
use bytes::Bytes;
use datafusion_datasource::source::DataSourceExec;
Expand Down Expand Up @@ -273,11 +274,11 @@ impl ParquetFormat {
self
}

pub fn coerce_int96(&self) -> Option<String> {
self.options.global.coerce_int96.clone()
pub fn coerce_int96(&self) -> DFTimeUnit {
self.options.global.coerce_int96
}

pub fn with_coerce_int96(mut self, time_unit: Option<String>) -> Self {
pub fn with_coerce_int96(mut self, time_unit: DFTimeUnit) -> Self {
self.options.global.coerce_int96 = time_unit;
self
}
Expand Down Expand Up @@ -364,11 +365,7 @@ impl FileFormat for ParquetFormat {
store: &Arc<dyn ObjectStore>,
objects: &[ObjectMeta],
) -> Result<SchemaRef> {
let coerce_int96 = match self.coerce_int96() {
Some(time_unit) => Some(parse_coerce_int96_string(time_unit.as_str())?),
None => None,
};

let coerce_int96 = self.coerce_int96();
let file_metadata_cache =
state.runtime_env().cache_manager.get_file_metadata_cache();

Expand All @@ -384,7 +381,7 @@ impl FileFormat for ParquetFormat {
.with_metadata_size_hint(self.metadata_size_hint())
.with_decryption_properties(file_decryption_properties)
.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
.with_coerce_int96(coerce_int96)
.with_coerce_int96(Some(coerce_int96.into()))
.fetch_schema_with_location()
.await?;
Ok::<_, DataFusionError>(result)
Expand Down
33 changes: 3 additions & 30 deletions datafusion/datasource-parquet/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,12 @@ use crate::row_filter::can_expr_be_pushed_down_with_schemas;
use datafusion_common::config::ConfigOptions;
#[cfg(feature = "parquet_encryption")]
use datafusion_common::config::EncryptionFactoryOptions;
use datafusion_datasource::as_file_source;
use datafusion_datasource::file_stream::FileOpener;

use arrow::datatypes::TimeUnit;
use datafusion_common::DataFusionError;
use datafusion_common::config::TableParquetOptions;
use datafusion_datasource::TableSchema;
use datafusion_datasource::as_file_source;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::file_stream::FileOpener;
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_expr::{EquivalenceProperties, conjunction};
use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
Expand Down Expand Up @@ -483,24 +480,6 @@ impl ParquetSource {
}
}

/// Parses datafusion.common.config.ParquetOptions.coerce_int96 String to a arrow_schema.datatype.TimeUnit
pub(crate) fn parse_coerce_int96_string(
str_setting: &str,
) -> datafusion_common::Result<TimeUnit> {
let str_setting_lower: &str = &str_setting.to_lowercase();

match str_setting_lower {
"ns" => Ok(TimeUnit::Nanosecond),
"us" => Ok(TimeUnit::Microsecond),
"ms" => Ok(TimeUnit::Millisecond),
"s" => Ok(TimeUnit::Second),
_ => Err(DataFusionError::Configuration(format!(
"Unknown or unsupported parquet coerce_int96: \
{str_setting}. Valid values are: ns, us, ms, and s."
))),
}
}

/// Allows easy conversion from ParquetSource to Arc&lt;dyn FileSource&gt;
impl From<ParquetSource> for Arc<dyn FileSource> {
fn from(source: ParquetSource) -> Self {
Expand Down Expand Up @@ -534,13 +513,7 @@ impl FileSource for ParquetSource {
.map(FileDecryptionProperties::from)
.map(Arc::new);

let coerce_int96 = self
.table_parquet_options
.global
.coerce_int96
.as_ref()
.map(|time_unit| parse_coerce_int96_string(time_unit.as_str()).unwrap());

let coerce_int96 = Some(self.table_parquet_options.global.coerce_int96.into());
let opener = Arc::new(ParquetOpener {
partition_index: partition,
projection: self.projection.clone(),
Expand Down
10 changes: 7 additions & 3 deletions datafusion/proto-common/src/from_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use std::collections::HashMap;
use std::convert::{TryFrom, TryInto};
use std::str::FromStr;
use std::sync::Arc;

use crate::common::proto_error;
Expand All @@ -35,6 +36,7 @@ use arrow::ipc::{
writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions},
};

use datafusion_common::parquet_config::DFTimeUnit;
use datafusion_common::{
Column, ColumnStatistics, Constraint, Constraints, DFSchema, DFSchemaRef,
DataFusionError, JoinSide, ScalarValue, Statistics, TableReference,
Expand Down Expand Up @@ -1083,9 +1085,11 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as usize,
schema_force_view_types: value.schema_force_view_types,
binary_as_string: value.binary_as_string,
coerce_int96: value.coerce_int96_opt.clone().map(|opt| match opt {
protobuf::parquet_options::CoerceInt96Opt::CoerceInt96(v) => Some(v),
}).unwrap_or(None),
coerce_int96: value.coerce_int96_opt.clone().and_then(|opt| match opt {
protobuf::parquet_options::CoerceInt96Opt::CoerceInt96(v) => {
DFTimeUnit::from_str(&v).ok()
}
}).unwrap_or_default(),
skip_arrow_metadata: value.skip_arrow_metadata,
max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt {
protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize),
Expand Down
6 changes: 5 additions & 1 deletion datafusion/proto-common/src/to_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,11 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
schema_force_view_types: value.schema_force_view_types,
binary_as_string: value.binary_as_string,
skip_arrow_metadata: value.skip_arrow_metadata,
coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96),
coerce_int96_opt: Some(
protobuf::parquet_options::CoerceInt96Opt::CoerceInt96(
value.coerce_int96.to_string(),
),
),
max_predicate_cache_size_opt: value.max_predicate_cache_size.map(|v| protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v as u64)),
})
}
Expand Down
Loading
Loading