Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ set(DUCKDB_SRC_FILES
src/duckdb/ub_src_function.cpp
src/duckdb/ub_src_function_cast.cpp
src/duckdb/ub_src_function_cast_union.cpp
src/duckdb/ub_src_function_cast_variant.cpp
src/duckdb/ub_src_function_pragma.cpp
src/duckdb/ub_src_function_scalar_compressed_materialization.cpp
src/duckdb/ub_src_function_scalar.cpp
Expand All @@ -171,6 +172,7 @@ set(DUCKDB_SRC_FILES
src/duckdb/ub_src_function_scalar_string_regexp.cpp
src/duckdb/ub_src_function_scalar_struct.cpp
src/duckdb/ub_src_function_scalar_system.cpp
src/duckdb/ub_src_function_scalar_variant.cpp
src/duckdb/ub_src_function_table_arrow.cpp
src/duckdb/ub_src_function_table.cpp
src/duckdb/ub_src_function_table_system.cpp
Expand Down
8 changes: 5 additions & 3 deletions src/duckdb/extension/core_functions/scalar/string/instr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ static unique_ptr<BaseStatistics> InStrPropagateStats(ClientContext &context, Fu
}

ScalarFunction InstrFun::GetFunction() {
return ScalarFunction({LogicalType::VARCHAR, LogicalType::VARCHAR}, LogicalType::BIGINT,
ScalarFunction::BinaryFunction<string_t, string_t, int64_t, InstrOperator>, nullptr, nullptr,
InStrPropagateStats);
auto function = ScalarFunction({LogicalType::VARCHAR, LogicalType::VARCHAR}, LogicalType::BIGINT,
ScalarFunction::BinaryFunction<string_t, string_t, int64_t, InstrOperator>, nullptr,
nullptr, InStrPropagateStats);
function.collation_handling = FunctionCollationHandling::PUSH_COMBINABLE_COLLATIONS;
return function;
}

} // namespace duckdb
12 changes: 10 additions & 2 deletions src/duckdb/extension/json/include/json_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -353,11 +353,19 @@ struct JSONCommon {

template <>
inline char *JSONCommon::WriteVal(yyjson_val *val, yyjson_alc *alc, idx_t &len) {
return yyjson_val_write_opts(val, JSONCommon::WRITE_FLAG, alc, reinterpret_cast<size_t *>(&len), nullptr);
size_t len_size_t;
// yyjson_val_write_opts must not throw
auto ret = yyjson_val_write_opts(val, JSONCommon::WRITE_FLAG, alc, &len_size_t, nullptr);
len = len_size_t;
return ret;
}
template <>
inline char *JSONCommon::WriteVal(yyjson_mut_val *val, yyjson_alc *alc, idx_t &len) {
return yyjson_mut_val_write_opts(val, JSONCommon::WRITE_FLAG, alc, reinterpret_cast<size_t *>(&len), nullptr);
size_t len_size_t;
// yyjson_mut_val_write_opts must not throw
auto ret = yyjson_mut_val_write_opts(val, JSONCommon::WRITE_FLAG, alc, &len_size_t, nullptr);
len = len_size_t;
return ret;
}

struct yyjson_doc_deleter {
Expand Down
11 changes: 8 additions & 3 deletions src/duckdb/extension/json/json_functions/copy_json.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ static void ThrowJSONCopyParameterException(const string &loption) {
}

static BoundStatement CopyToJSONPlan(Binder &binder, CopyStatement &stmt) {
static const unordered_set<string> SUPPORTED_BASE_OPTIONS {
"compression", "encoding", "use_tmp_file", "overwrite_or_ignore", "overwrite", "append", "filename_pattern",
"file_extension", "per_thread_output", "file_size_bytes",
// "partition_by", unsupported
"return_files", "preserve_order", "return_stats", "write_partition_columns", "write_empty_file",
"hive_file_pattern"};

auto stmt_copy = stmt.Copy();
auto &copy = stmt_copy->Cast<CopyStatement>();
auto &copied_info = *copy.info;
Expand Down Expand Up @@ -48,9 +55,7 @@ static BoundStatement CopyToJSONPlan(Binder &binder, CopyStatement &stmt) {
csv_copy_options["suffix"] = {"\n]\n"};
csv_copy_options["new_line"] = {",\n\t"};
}
} else if (loption == "compression" || loption == "encoding" || loption == "per_thread_output" ||
loption == "file_size_bytes" || loption == "use_tmp_file" || loption == "overwrite_or_ignore" ||
loption == "filename_pattern" || loption == "file_extension") {
} else if (SUPPORTED_BASE_OPTIONS.find(loption) != SUPPORTED_BASE_OPTIONS.end()) {
// We support these base options
csv_copy_options.insert(kv);
} else {
Expand Down
1 change: 1 addition & 0 deletions src/duckdb/extension/json/json_functions/json_create.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,7 @@ static void CreateValues(const StructNames &names, yyjson_mut_doc *doc, yyjson_m
case LogicalTypeId::ANY:
case LogicalTypeId::USER:
case LogicalTypeId::TEMPLATE:
case LogicalTypeId::VARIANT:
case LogicalTypeId::CHAR:
case LogicalTypeId::STRING_LITERAL:
case LogicalTypeId::INTEGER_LITERAL:
Expand Down
6 changes: 3 additions & 3 deletions src/duckdb/extension/json/json_functions/json_pretty.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ namespace duckdb {
//! Pretty Print a given JSON Document
string_t PrettyPrint(yyjson_val *val, yyjson_alc *alc, Vector &, ValidityMask &, idx_t) {
D_ASSERT(alc);
idx_t len;
auto data =
yyjson_val_write_opts(val, JSONCommon::WRITE_PRETTY_FLAG, alc, reinterpret_cast<size_t *>(&len), nullptr);
size_t len_size_t;
auto data = yyjson_val_write_opts(val, JSONCommon::WRITE_PRETTY_FLAG, alc, &len_size_t, nullptr);
idx_t len = len_size_t;
return string_t(data, len);
}

Expand Down
10 changes: 6 additions & 4 deletions src/duckdb/extension/json/json_functions/json_serialize_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,11 @@ static void JsonSerializePlanFunction(DataChunk &args, ExpressionState &state, V
yyjson_mut_obj_add_false(doc, result_obj, "error");
yyjson_mut_obj_add_val(doc, result_obj, "plans", plans_arr);

idx_t len;
size_t len_size_t;
auto data = yyjson_mut_val_write_opts(result_obj,
info.format ? JSONCommon::WRITE_PRETTY_FLAG : JSONCommon::WRITE_FLAG,
alc, reinterpret_cast<size_t *>(&len), nullptr);
alc, &len_size_t, nullptr);
idx_t len = len_size_t;
if (data == nullptr) {
throw SerializationException(
"Failed to serialize json, perhaps the query contains invalid utf8 characters?");
Expand All @@ -185,10 +186,11 @@ static void JsonSerializePlanFunction(DataChunk &args, ExpressionState &state, V
yyjson_mut_obj_add_strcpy(doc, result_obj, entry.first.c_str(), entry.second.c_str());
}

idx_t len;
size_t len_size_t;
auto data = yyjson_mut_val_write_opts(result_obj,
info.format ? JSONCommon::WRITE_PRETTY_FLAG : JSONCommon::WRITE_FLAG,
alc, reinterpret_cast<size_t *>(&len), nullptr);
alc, &len_size_t, nullptr);
idx_t len = len_size_t;
return StringVector::AddString(result, data, len);
}
});
Expand Down
10 changes: 6 additions & 4 deletions src/duckdb/extension/json/json_functions/json_serialize_sql.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,11 @@ static void JsonSerializeFunction(DataChunk &args, ExpressionState &state, Vecto

yyjson_mut_obj_add_false(doc, result_obj, "error");
yyjson_mut_obj_add_val(doc, result_obj, "statements", statements_arr);
idx_t len;
size_t len_size_t;
auto data = yyjson_mut_val_write_opts(result_obj,
info.format ? JSONCommon::WRITE_PRETTY_FLAG : JSONCommon::WRITE_FLAG,
alc, reinterpret_cast<size_t *>(&len), nullptr);
alc, &len_size_t, nullptr);
idx_t len = len_size_t;
if (data == nullptr) {
throw SerializationException(
"Failed to serialize json, perhaps the query contains invalid utf8 characters?");
Expand All @@ -135,10 +136,11 @@ static void JsonSerializeFunction(DataChunk &args, ExpressionState &state, Vecto
yyjson_mut_obj_add_strcpy(doc, result_obj, entry.first.c_str(), entry.second.c_str());
}

idx_t len;
size_t len_size_t;
auto data = yyjson_mut_val_write_opts(result_obj,
info.format ? JSONCommon::WRITE_PRETTY_FLAG : JSONCommon::WRITE_FLAG,
alc, reinterpret_cast<size_t *>(&len), nullptr);
alc, &len_size_t, nullptr);
idx_t len = len_size_t;
return StringVector::AddString(result, data, len);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,11 @@ static void InitializeLocalState(JSONTableInOutLocalState &lstate, DataChunk &in
// Parse path, default to root if not given
Value path_value("$");
if (input.data.size() > 1) {
path_value = ConstantVector::GetData<string_t>(input.data[1])[0];
auto &path_vector = input.data[1];
if (ConstantVector::IsNull(path_vector)) {
return;
}
path_value = ConstantVector::GetData<string_t>(path_vector)[0];
}

if (JSONReadFunctionData::CheckPath(path_value, lstate.path, lstate.len) == JSONCommon::JSONPathType::WILDCARD) {
Expand Down
48 changes: 31 additions & 17 deletions src/duckdb/extension/parquet/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ void ColumnReader::InitializeRead(idx_t row_group_idx_p, const vector<ColumnChun
D_ASSERT(chunk->__isset.meta_data);

if (chunk->__isset.file_path) {
throw std::runtime_error("Only inlined data files are supported (no references)");
throw InvalidInputException("Failed to read file \"%s\": Only inlined data files are supported (no references)",
Reader().GetFileName());
}

// ugh. sometimes there is an extra offset for the dict. sometimes it's wrong.
Expand Down Expand Up @@ -252,7 +253,7 @@ void ColumnReader::PrepareRead(optional_ptr<const TableFilter> filter, optional_
}
// some basic sanity check
if (page_hdr.compressed_page_size < 0 || page_hdr.uncompressed_page_size < 0) {
throw std::runtime_error("Page sizes can't be < 0");
throw InvalidInputException("Failed to read file \"%s\": Page sizes can't be < 0", Reader().GetFileName());
}

if (PageIsFilteredOut(page_hdr)) {
Expand All @@ -273,7 +274,8 @@ void ColumnReader::PrepareRead(optional_ptr<const TableFilter> filter, optional_
PreparePage(page_hdr);
auto dictionary_size = page_hdr.dictionary_page_header.num_values;
if (dictionary_size < 0) {
throw std::runtime_error("Invalid dictionary page header (num_values < 0)");
throw InvalidInputException("Failed to read file \"%s\": Invalid dictionary page header (num_values < 0)",
Reader().GetFileName());
}
dictionary_decoder.InitializeDictionary(dictionary_size, filter, filter_state, HasDefines());
break;
Expand All @@ -297,7 +299,7 @@ void ColumnReader::PreparePageV2(PageHeader &page_hdr) {
}
if (chunk->meta_data.codec == CompressionCodec::UNCOMPRESSED) {
if (page_hdr.compressed_page_size != page_hdr.uncompressed_page_size) {
throw std::runtime_error("Page size mismatch");
throw InvalidInputException("Failed to read file \"%s\": Page size mismatch", Reader().GetFileName());
}
uncompressed = true;
}
Expand All @@ -310,8 +312,10 @@ void ColumnReader::PreparePageV2(PageHeader &page_hdr) {
auto uncompressed_bytes = page_hdr.data_page_header_v2.repetition_levels_byte_length +
page_hdr.data_page_header_v2.definition_levels_byte_length;
if (uncompressed_bytes > page_hdr.uncompressed_page_size) {
throw std::runtime_error("Page header inconsistency, uncompressed_page_size needs to be larger than "
"repetition_levels_byte_length + definition_levels_byte_length");
throw InvalidInputException(
"Failed to read file \"%s\": header inconsistency, uncompressed_page_size needs to be larger than "
"repetition_levels_byte_length + definition_levels_byte_length",
Reader().GetFileName());
}
reader.ReadData(*protocol, block->ptr, uncompressed_bytes);

Expand Down Expand Up @@ -368,7 +372,8 @@ void ColumnReader::DecompressInternal(CompressionCodec::type codec, const_data_p
duckdb_lz4::LZ4_decompress_safe(const_char_ptr_cast(src), char_ptr_cast(dst),
UnsafeNumericCast<int32_t>(src_size), UnsafeNumericCast<int32_t>(dst_size));
if (res != NumericCast<int>(dst_size)) {
throw std::runtime_error("LZ4 decompression failure");
throw InvalidInputException("Failed to read file \"%s\": LZ4 decompression failure",
Reader().GetFileName());
}
break;
}
Expand All @@ -377,22 +382,27 @@ void ColumnReader::DecompressInternal(CompressionCodec::type codec, const_data_p
size_t uncompressed_size = 0;
auto res = duckdb_snappy::GetUncompressedLength(const_char_ptr_cast(src), src_size, &uncompressed_size);
if (!res) {
throw std::runtime_error("Snappy decompression failure");
throw InvalidInputException("Failed to read file \"%s\": Snappy decompression failure",
Reader().GetFileName());
}
if (uncompressed_size != dst_size) {
throw std::runtime_error("Snappy decompression failure: Uncompressed data size mismatch");
throw InvalidInputException(
"Failed to read file \"%s\": Snappy decompression failure: Uncompressed data size mismatch",
Reader().GetFileName());
}
}
auto res = duckdb_snappy::RawUncompress(const_char_ptr_cast(src), src_size, char_ptr_cast(dst));
if (!res) {
throw std::runtime_error("Snappy decompression failure");
throw InvalidInputException("Failed to read file \"%s\": Snappy decompression failure",
Reader().GetFileName());
}
break;
}
case CompressionCodec::ZSTD: {
auto res = duckdb_zstd::ZSTD_decompress(dst, dst_size, src, src_size);
if (duckdb_zstd::ZSTD_isError(res) || res != dst_size) {
throw std::runtime_error("ZSTD Decompression failure");
throw InvalidInputException("Failed to read file \"%s\": ZSTD Decompression failure",
Reader().GetFileName());
}
break;
}
Expand All @@ -405,7 +415,8 @@ void ColumnReader::DecompressInternal(CompressionCodec::type codec, const_data_p
auto res = duckdb_brotli::BrotliDecoderDecompressStream(state, &src_size_size_t, &src, &dst_size_size_t, &dst,
&total_out);
if (res != duckdb_brotli::BROTLI_DECODER_RESULT_SUCCESS) {
throw std::runtime_error("Brotli Decompression failure");
throw InvalidInputException("Failed to read file \"%s\": Brotli Decompression failure",
Reader().GetFileName());
}
duckdb_brotli::BrotliDecoderDestroyInstance(state);
break;
Expand All @@ -414,18 +425,21 @@ void ColumnReader::DecompressInternal(CompressionCodec::type codec, const_data_p
default: {
duckdb::stringstream codec_name;
codec_name << codec;
throw std::runtime_error("Unsupported compression codec \"" + codec_name.str() +
"\". Supported options are uncompressed, brotli, gzip, lz4_raw, snappy or zstd");
throw InvalidInputException("Failed to read file \"%s\": Unsupported compression codec \"%s\". Supported "
"options are uncompressed, brotli, gzip, lz4_raw, snappy or zstd",
Reader().GetFileName(), codec_name.str());
}
}
}

void ColumnReader::PrepareDataPage(PageHeader &page_hdr) {
if (page_hdr.type == PageType::DATA_PAGE && !page_hdr.__isset.data_page_header) {
throw std::runtime_error("Missing data page header from data page");
throw InvalidInputException("Failed to read file \"%s\": Missing data page header from data page",
Reader().GetFileName());
}
if (page_hdr.type == PageType::DATA_PAGE_V2 && !page_hdr.__isset.data_page_header_v2) {
throw std::runtime_error("Missing data page header from data page v2");
throw InvalidInputException("Failed to read file \"%s\": Missing data page header from data page v2",
Reader().GetFileName());
}

bool is_v1 = page_hdr.type == PageType::DATA_PAGE;
Expand Down Expand Up @@ -492,7 +506,7 @@ void ColumnReader::PrepareDataPage(PageHeader &page_hdr) {
break;

default:
throw std::runtime_error("Unsupported page encoding");
throw InvalidInputException("Failed to read file \"%s\": Unsupported page encoding", Reader().GetFileName());
}
}

Expand Down
72 changes: 16 additions & 56 deletions src/duckdb/extension/parquet/column_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,18 @@ bool ColumnWriterStatistics::MaxIsExact() {
return true;
}

bool ColumnWriterStatistics::HasGeoStats() {
return false;
}

optional_ptr<GeometryStats> ColumnWriterStatistics::GetGeoStats() {
return nullptr;
}

void ColumnWriterStatistics::WriteGeoStats(duckdb_parquet::GeospatialStatistics &stats) {
D_ASSERT(false); // this should never be called
}

//===--------------------------------------------------------------------===//
// ColumnWriter
//===--------------------------------------------------------------------===//
Expand Down Expand Up @@ -220,59 +232,6 @@ void ColumnWriter::HandleDefineLevels(ColumnWriterState &state, ColumnWriterStat
}
}

//===--------------------------------------------------------------------===//
// WKB Column Writer
//===--------------------------------------------------------------------===//
// Used to store the metadata for a WKB-encoded geometry column when writing
// GeoParquet files.
class WKBColumnWriterState final : public StandardColumnWriterState<string_t, string_t, ParquetStringOperator> {
public:
WKBColumnWriterState(ParquetWriter &writer, duckdb_parquet::RowGroup &row_group, idx_t col_idx)
: StandardColumnWriterState(writer, row_group, col_idx), geo_data(), geo_data_writer(writer.GetContext()) {
}

GeoParquetColumnMetadata geo_data;
GeoParquetColumnMetadataWriter geo_data_writer;
};

class WKBColumnWriter final : public StandardColumnWriter<string_t, string_t, ParquetStringOperator> {
public:
WKBColumnWriter(ParquetWriter &writer, const ParquetColumnSchema &column_schema, vector<string> schema_path_p,
bool can_have_nulls, string name)
: StandardColumnWriter(writer, column_schema, std::move(schema_path_p), can_have_nulls),
column_name(std::move(name)) {

this->writer.GetGeoParquetData().RegisterGeometryColumn(column_name);
}

unique_ptr<ColumnWriterState> InitializeWriteState(duckdb_parquet::RowGroup &row_group) override {
auto result = make_uniq<WKBColumnWriterState>(writer, row_group, row_group.columns.size());
result->encoding = Encoding::RLE_DICTIONARY;
RegisterToRowGroup(row_group);
return std::move(result);
}

void Write(ColumnWriterState &state, Vector &vector, idx_t count) override {
StandardColumnWriter::Write(state, vector, count);

auto &geo_state = state.Cast<WKBColumnWriterState>();
geo_state.geo_data_writer.Update(geo_state.geo_data, vector, count);
}

void FinalizeWrite(ColumnWriterState &state) override {
StandardColumnWriter::FinalizeWrite(state);

// Add the geodata object to the writer
const auto &geo_state = state.Cast<WKBColumnWriterState>();

// Merge this state's geo column data with the writer's geo column data
writer.GetGeoParquetData().FlushColumnMeta(column_name, geo_state.geo_data);
}

private:
string column_name;
};

//===--------------------------------------------------------------------===//
// Create Column Writer
//===--------------------------------------------------------------------===//
Expand Down Expand Up @@ -470,9 +429,10 @@ ColumnWriter::CreateWriterRecursive(ClientContext &context, ParquetWriter &write
make_uniq<StructColumnWriter>(writer, schema, path_in_schema, std::move(child_writers), can_have_nulls);
return make_uniq<ListColumnWriter>(writer, schema, path_in_schema, std::move(struct_writer), can_have_nulls);
}
if (type.id() == LogicalTypeId::BLOB && type.GetAlias() == "WKB_BLOB" &&
GeoParquetFileMetadata::IsGeoParquetConversionEnabled(context)) {
return make_uniq<WKBColumnWriter>(writer, schema, std::move(path_in_schema), can_have_nulls, schema.name);

if (type.id() == LogicalTypeId::BLOB && type.GetAlias() == "WKB_BLOB") {
return make_uniq<StandardColumnWriter<string_t, string_t, ParquetGeometryOperator>>(
writer, schema, std::move(path_in_schema), can_have_nulls);
}

switch (type.id()) {
Expand Down
Loading
Loading