Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions src/duckdb/extension/parquet/column_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ void ColumnWriter::HandleDefineLevels(ColumnWriterState &state, ColumnWriterStat
//===--------------------------------------------------------------------===//

ParquetColumnSchema ColumnWriter::FillParquetSchema(vector<duckdb_parquet::SchemaElement> &schemas,
const LogicalType &type, const string &name,
const LogicalType &type, const string &name, bool allow_geometry,
optional_ptr<const ChildFieldIDs> field_ids, idx_t max_repeat,
idx_t max_define, bool can_have_nulls) {
auto null_type = can_have_nulls ? FieldRepetitionType::OPTIONAL : FieldRepetitionType::REQUIRED;
Expand Down Expand Up @@ -285,7 +285,8 @@ ParquetColumnSchema ColumnWriter::FillParquetSchema(vector<duckdb_parquet::Schem
struct_column.children.reserve(child_types.size());
for (auto &child_type : child_types) {
struct_column.children.emplace_back(FillParquetSchema(schemas, child_type.second, child_type.first,
child_field_ids, max_repeat, max_define + 1));
allow_geometry, child_field_ids, max_repeat,
max_define + 1));
}
return struct_column;
}
Expand Down Expand Up @@ -321,8 +322,8 @@ ParquetColumnSchema ColumnWriter::FillParquetSchema(vector<duckdb_parquet::Schem
schemas.push_back(std::move(repeated_element));

ParquetColumnSchema list_column(name, type, max_define, max_repeat, schema_idx, 0);
list_column.children.push_back(
FillParquetSchema(schemas, child_type, "element", child_field_ids, max_repeat + 1, max_define + 2));
list_column.children.push_back(FillParquetSchema(schemas, child_type, "element", allow_geometry,
child_field_ids, max_repeat + 1, max_define + 2));
return list_column;
}
if (type.id() == LogicalTypeId::MAP) {
Expand Down Expand Up @@ -369,8 +370,8 @@ ParquetColumnSchema ColumnWriter::FillParquetSchema(vector<duckdb_parquet::Schem
for (idx_t i = 0; i < 2; i++) {
// key needs to be marked as REQUIRED
bool is_key = i == 0;
auto child_schema = FillParquetSchema(schemas, kv_types[i], kv_names[i], child_field_ids, max_repeat + 1,
max_define + 2, !is_key);
auto child_schema = FillParquetSchema(schemas, kv_types[i], kv_names[i], allow_geometry, child_field_ids,
max_repeat + 1, max_define + 2, !is_key);

map_column.children.push_back(std::move(child_schema));
}
Expand All @@ -388,7 +389,7 @@ ParquetColumnSchema ColumnWriter::FillParquetSchema(vector<duckdb_parquet::Schem
schema_element.__isset.field_id = true;
schema_element.field_id = field_id->field_id;
}
ParquetWriter::SetSchemaProperties(type, schema_element);
ParquetWriter::SetSchemaProperties(type, schema_element, allow_geometry);
schemas.push_back(std::move(schema_element));
return ParquetColumnSchema(name, type, max_define, max_repeat, schema_idx, 0);
}
Expand Down
27 changes: 21 additions & 6 deletions src/duckdb/extension/parquet/geo_parquet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,17 +208,19 @@ unique_ptr<GeoParquetFileMetadata> GeoParquetFileMetadata::TryRead(const duckdb_
throw InvalidInputException("Geoparquet metadata is not an object");
}

auto result = make_uniq<GeoParquetFileMetadata>();
// We don't actually care about the version for now, as we only support V1+native
auto result = make_uniq<GeoParquetFileMetadata>(GeoParquetVersion::BOTH);

// Check and parse the version
const auto version_val = yyjson_obj_get(root, "version");
if (!yyjson_is_str(version_val)) {
throw InvalidInputException("Geoparquet metadata does not have a version");
}
result->version = yyjson_get_str(version_val);
if (StringUtil::StartsWith(result->version, "2")) {
// Guard against a breaking future 2.0 version
throw InvalidInputException("Geoparquet version %s is not supported", result->version);

auto version = yyjson_get_str(version_val);
if (StringUtil::StartsWith(version, "3")) {
// Guard against a breaking future 3.0 version
throw InvalidInputException("Geoparquet version %s is not supported", version);
}

// Check and parse the geometry columns
Expand Down Expand Up @@ -344,7 +346,20 @@ void GeoParquetFileMetadata::Write(duckdb_parquet::FileMetaData &file_meta_data)
yyjson_mut_doc_set_root(doc, root);

// Add the version
yyjson_mut_obj_add_strncpy(doc, root, "version", version.c_str(), version.size());
switch (version) {
case GeoParquetVersion::V1:
case GeoParquetVersion::BOTH:
yyjson_mut_obj_add_strcpy(doc, root, "version", "1.0.0");
break;
case GeoParquetVersion::V2:
yyjson_mut_obj_add_strcpy(doc, root, "version", "2.0.0");
break;
case GeoParquetVersion::NONE:
default:
// Should never happen, we should not be writing anything
yyjson_mut_doc_free(doc);
throw InternalException("GeoParquetVersion::NONE should not write metadata");
}

// Add the primary column
yyjson_mut_obj_add_strncpy(doc, root, "primary_column", primary_geometry_column.c_str(),
Expand Down
2 changes: 1 addition & 1 deletion src/duckdb/extension/parquet/include/column_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class ColumnWriter {
}

static ParquetColumnSchema FillParquetSchema(vector<duckdb_parquet::SchemaElement> &schemas,
const LogicalType &type, const string &name,
const LogicalType &type, const string &name, bool allow_geometry,
optional_ptr<const ChildFieldIDs> field_ids, idx_t max_repeat = 0,
idx_t max_define = 1, bool can_have_nulls = true);
//! Create the column writer for a specific type recursively
Expand Down
29 changes: 28 additions & 1 deletion src/duckdb/extension/parquet/include/geo_parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,31 @@ enum class GeoParquetColumnEncoding : uint8_t {
MULTIPOLYGON,
};

// Controls which flavor of GeoParquet metadata (if any) is written for spatial columns.
enum class GeoParquetVersion : uint8_t {
	// Write GeoParquet 1.0 metadata.
	// GeoParquet 1.0 has the widest support among readers and writers.
	// Spatial columns are stored as plain blobs without the Parquet-native geometry logical type
	// (see the writer: V1 suppresses the logical type on WKB columns).
	V1,

	// Write GeoParquet 2.0.
	// The GeoParquet 2.0 option is identical to GeoParquet 1.0 except the underlying storage
	// of spatial columns is Parquet native geometry, where the Parquet writer will include
	// native statistics according to the underlying Parquet options. Compared to 'BOTH', this will
	// actually write the metadata as containing GeoParquet version 2.0.0.
	// However, V2 isn't standardized yet, so this option is still a bit experimental.
	V2,

	// Write GeoParquet 1.0 metadata, with native Parquet geometry types.
	// This is a bit of a hold-over option for compatibility with systems that
	// reject GeoParquet 2.0 metadata, but can read Parquet native geometry types as they simply ignore the extra
	// logical type. DuckDB v1.4.0 falls into this category.
	BOTH,

	// Do not write GeoParquet metadata.
	// This option suppresses GeoParquet metadata; however, spatial types will still be written as
	// Parquet native Geometry/Geography.
	NONE,
};

struct GeoParquetColumnMetadata {
// The encoding of the geometry column
GeoParquetColumnEncoding geometry_encoding;
Expand All @@ -215,6 +240,8 @@ struct GeoParquetColumnMetadata {

class GeoParquetFileMetadata {
public:
GeoParquetFileMetadata(GeoParquetVersion geo_parquet_version) : version(geo_parquet_version) {
}
void AddGeoParquetStats(const string &column_name, const LogicalType &type, const GeometryStats &stats);
void Write(duckdb_parquet::FileMetaData &file_meta_data);

Expand All @@ -234,8 +261,8 @@ class GeoParquetFileMetadata {

private:
mutex write_lock;
string version = "1.1.0";
unordered_map<string, GeoParquetColumnMetadata> geometry_columns;
GeoParquetVersion version;
};

} // namespace duckdb
9 changes: 7 additions & 2 deletions src/duckdb/extension/parquet/include/parquet_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class ParquetWriter {
shared_ptr<ParquetEncryptionConfig> encryption_config, optional_idx dictionary_size_limit,
idx_t string_dictionary_page_size_limit, bool enable_bloom_filters,
double bloom_filter_false_positive_ratio, int64_t compression_level, bool debug_use_openssl,
ParquetVersion parquet_version);
ParquetVersion parquet_version, GeoParquetVersion geoparquet_version);
~ParquetWriter();

public:
Expand All @@ -95,7 +95,8 @@ class ParquetWriter {
void Finalize();

static duckdb_parquet::Type::type DuckDBTypeToParquetType(const LogicalType &duckdb_type);
static void SetSchemaProperties(const LogicalType &duckdb_type, duckdb_parquet::SchemaElement &schema_ele);
static void SetSchemaProperties(const LogicalType &duckdb_type, duckdb_parquet::SchemaElement &schema_ele,
bool allow_geometry);

ClientContext &GetContext() {
return context;
Expand Down Expand Up @@ -139,6 +140,9 @@ class ParquetWriter {
ParquetVersion GetParquetVersion() const {
return parquet_version;
}
GeoParquetVersion GetGeoParquetVersion() const {
return geoparquet_version;
}
const string &GetFileName() const {
return file_name;
}
Expand Down Expand Up @@ -175,6 +179,7 @@ class ParquetWriter {
bool debug_use_openssl;
shared_ptr<EncryptionUtil> encryption_util;
ParquetVersion parquet_version;
GeoParquetVersion geoparquet_version;
vector<ParquetColumnSchema> column_schemas;

unique_ptr<BufferedFileWriter> writer;
Expand Down
57 changes: 56 additions & 1 deletion src/duckdb/extension/parquet/parquet_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,9 @@ struct ParquetWriteBindData : public TableFunctionData {

//! Which encodings to include when writing
ParquetVersion parquet_version = ParquetVersion::V1;

//! Which geo-parquet version to use when writing
GeoParquetVersion geoparquet_version = GeoParquetVersion::V1;
};

struct ParquetWriteGlobalState : public GlobalFunctionData {
Expand Down Expand Up @@ -291,6 +294,7 @@ static void ParquetListCopyOptions(ClientContext &context, CopyOptionsInput &inp
copy_options["binary_as_string"] = CopyOption(LogicalType::BOOLEAN, CopyOptionMode::READ_ONLY);
copy_options["file_row_number"] = CopyOption(LogicalType::BOOLEAN, CopyOptionMode::READ_ONLY);
copy_options["can_have_nan"] = CopyOption(LogicalType::BOOLEAN, CopyOptionMode::READ_ONLY);
copy_options["geoparquet_version"] = CopyOption(LogicalType::VARCHAR, CopyOptionMode::WRITE_ONLY);
}

static unique_ptr<FunctionData> ParquetWriteBind(ClientContext &context, CopyFunctionBindInput &input,
Expand Down Expand Up @@ -426,6 +430,19 @@ static unique_ptr<FunctionData> ParquetWriteBind(ClientContext &context, CopyFun
} else {
throw BinderException("Expected parquet_version 'V1' or 'V2'");
}
} else if (loption == "geoparquet_version") {
const auto roption = StringUtil::Upper(option.second[0].ToString());
if (roption == "NONE") {
bind_data->geoparquet_version = GeoParquetVersion::NONE;
} else if (roption == "V1") {
bind_data->geoparquet_version = GeoParquetVersion::V1;
} else if (roption == "V2") {
bind_data->geoparquet_version = GeoParquetVersion::V2;
} else if (roption == "BOTH") {
bind_data->geoparquet_version = GeoParquetVersion::BOTH;
} else {
throw BinderException("Expected geoparquet_version 'NONE', 'V1' or 'BOTH'");
}
} else {
throw InternalException("Unrecognized option for PARQUET: %s", option.first.c_str());
}
Expand Down Expand Up @@ -457,7 +474,8 @@ static unique_ptr<GlobalFunctionData> ParquetWriteInitializeGlobal(ClientContext
parquet_bind.field_ids.Copy(), parquet_bind.kv_metadata, parquet_bind.encryption_config,
parquet_bind.dictionary_size_limit, parquet_bind.string_dictionary_page_size_limit,
parquet_bind.enable_bloom_filters, parquet_bind.bloom_filter_false_positive_ratio,
parquet_bind.compression_level, parquet_bind.debug_use_openssl, parquet_bind.parquet_version);
parquet_bind.compression_level, parquet_bind.debug_use_openssl, parquet_bind.parquet_version,
parquet_bind.geoparquet_version);
return std::move(global_state);
}

Expand Down Expand Up @@ -626,6 +644,39 @@ ParquetVersion EnumUtil::FromString<ParquetVersion>(const char *value) {
throw NotImplementedException(StringUtil::Format("Enum value: '%s' not implemented", value));
}

template <>
const char *EnumUtil::ToChars<GeoParquetVersion>(GeoParquetVersion value) {
	// Map each GeoParquetVersion enumerator to its canonical upper-case name.
	if (value == GeoParquetVersion::NONE) {
		return "NONE";
	}
	if (value == GeoParquetVersion::V1) {
		return "V1";
	}
	if (value == GeoParquetVersion::V2) {
		return "V2";
	}
	if (value == GeoParquetVersion::BOTH) {
		return "BOTH";
	}
	// Unknown enumerator: mirror the other EnumUtil specializations in this file
	throw NotImplementedException(StringUtil::Format("Enum value: '%s' not implemented", value));
}

template <>
GeoParquetVersion EnumUtil::FromString<GeoParquetVersion>(const char *value) {
	// Resolve the canonical upper-case name of a GeoParquetVersion enumerator.
	struct Entry {
		const char *name;
		GeoParquetVersion version;
	};
	static constexpr Entry MAPPING[] = {
	    {"NONE", GeoParquetVersion::NONE},
	    {"V1", GeoParquetVersion::V1},
	    {"V2", GeoParquetVersion::V2},
	    {"BOTH", GeoParquetVersion::BOTH},
	};
	for (const auto &entry : MAPPING) {
		if (StringUtil::Equals(value, entry.name)) {
			return entry.version;
		}
	}
	// Unknown name: mirror the other EnumUtil specializations in this file
	throw NotImplementedException(StringUtil::Format("Enum value: '%s' not implemented", value));
}

static optional_idx SerializeCompressionLevel(const int64_t compression_level) {
return compression_level < 0 ? NumericLimits<idx_t>::Maximum() - NumericCast<idx_t>(AbsValue(compression_level))
: NumericCast<idx_t>(compression_level);
Expand Down Expand Up @@ -679,6 +730,8 @@ static void ParquetCopySerialize(Serializer &serializer, const FunctionData &bin
serializer.WritePropertyWithDefault(115, "string_dictionary_page_size_limit",
bind_data.string_dictionary_page_size_limit,
default_value.string_dictionary_page_size_limit);
serializer.WritePropertyWithDefault(116, "geoparquet_version", bind_data.geoparquet_version,
default_value.geoparquet_version);
}

static unique_ptr<FunctionData> ParquetCopyDeserialize(Deserializer &deserializer, CopyFunction &function) {
Expand Down Expand Up @@ -711,6 +764,8 @@ static unique_ptr<FunctionData> ParquetCopyDeserialize(Deserializer &deserialize
deserializer.ReadPropertyWithExplicitDefault(114, "parquet_version", default_value.parquet_version);
data->string_dictionary_page_size_limit = deserializer.ReadPropertyWithExplicitDefault(
115, "string_dictionary_page_size_limit", default_value.string_dictionary_page_size_limit);
data->geoparquet_version =
deserializer.ReadPropertyWithExplicitDefault(116, "geoparquet_version", default_value.geoparquet_version);

return std::move(data);
}
Expand Down
23 changes: 15 additions & 8 deletions src/duckdb/extension/parquet/parquet_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,16 @@ Type::type ParquetWriter::DuckDBTypeToParquetType(const LogicalType &duckdb_type
throw NotImplementedException("Unimplemented type for Parquet \"%s\"", duckdb_type.ToString());
}

void ParquetWriter::SetSchemaProperties(const LogicalType &duckdb_type, duckdb_parquet::SchemaElement &schema_ele) {
void ParquetWriter::SetSchemaProperties(const LogicalType &duckdb_type, duckdb_parquet::SchemaElement &schema_ele,
bool allow_geometry) {
if (duckdb_type.IsJSONType()) {
schema_ele.converted_type = ConvertedType::JSON;
schema_ele.__isset.converted_type = true;
schema_ele.__isset.logicalType = true;
schema_ele.logicalType.__set_JSON(duckdb_parquet::JsonType());
return;
}
if (duckdb_type.GetAlias() == "WKB_BLOB") {
if (duckdb_type.GetAlias() == "WKB_BLOB" && allow_geometry) {
schema_ele.__isset.logicalType = true;
schema_ele.logicalType.__isset.GEOMETRY = true;
// TODO: Set CRS in the future
Expand Down Expand Up @@ -356,14 +357,16 @@ ParquetWriter::ParquetWriter(ClientContext &context, FileSystem &fs, string file
shared_ptr<ParquetEncryptionConfig> encryption_config_p,
optional_idx dictionary_size_limit_p, idx_t string_dictionary_page_size_limit_p,
bool enable_bloom_filters_p, double bloom_filter_false_positive_ratio_p,
int64_t compression_level_p, bool debug_use_openssl_p, ParquetVersion parquet_version)
int64_t compression_level_p, bool debug_use_openssl_p, ParquetVersion parquet_version,
GeoParquetVersion geoparquet_version)
: context(context), file_name(std::move(file_name_p)), sql_types(std::move(types_p)),
column_names(std::move(names_p)), codec(codec), field_ids(std::move(field_ids_p)),
encryption_config(std::move(encryption_config_p)), dictionary_size_limit(dictionary_size_limit_p),
string_dictionary_page_size_limit(string_dictionary_page_size_limit_p),
enable_bloom_filters(enable_bloom_filters_p),
bloom_filter_false_positive_ratio(bloom_filter_false_positive_ratio_p), compression_level(compression_level_p),
debug_use_openssl(debug_use_openssl_p), parquet_version(parquet_version), total_written(0), num_row_groups(0) {
debug_use_openssl(debug_use_openssl_p), parquet_version(parquet_version), geoparquet_version(geoparquet_version),
total_written(0), num_row_groups(0) {

// initialize the file writer
writer = make_uniq<BufferedFileWriter>(fs, file_name.c_str(),
Expand Down Expand Up @@ -416,10 +419,13 @@ ParquetWriter::ParquetWriter(ClientContext &context, FileSystem &fs, string file
auto &unique_names = column_names;
VerifyUniqueNames(unique_names);

// V1 GeoParquet stores geometries as blobs, no logical type
auto allow_geometry = geoparquet_version != GeoParquetVersion::V1;

// construct the child schemas
for (idx_t i = 0; i < sql_types.size(); i++) {
auto child_schema =
ColumnWriter::FillParquetSchema(file_meta_data.schema, sql_types[i], unique_names[i], &field_ids);
auto child_schema = ColumnWriter::FillParquetSchema(file_meta_data.schema, sql_types[i], unique_names[i],
allow_geometry, &field_ids);
column_schemas.push_back(std::move(child_schema));
}
// now construct the writers based on the schemas
Expand Down Expand Up @@ -975,7 +981,8 @@ void ParquetWriter::Finalize() {
}

// Add geoparquet metadata to the file metadata
if (geoparquet_data && GeoParquetFileMetadata::IsGeoParquetConversionEnabled(context)) {
if (geoparquet_data && GeoParquetFileMetadata::IsGeoParquetConversionEnabled(context) &&
geoparquet_version != GeoParquetVersion::NONE) {
geoparquet_data->Write(file_meta_data);
}

Expand Down Expand Up @@ -1005,7 +1012,7 @@ void ParquetWriter::Finalize() {

GeoParquetFileMetadata &ParquetWriter::GetGeoParquetData() {
if (!geoparquet_data) {
geoparquet_data = make_uniq<GeoParquetFileMetadata>();
geoparquet_data = make_uniq<GeoParquetFileMetadata>(geoparquet_version);
}
return *geoparquet_data;
}
Expand Down
Loading