Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions src/duckdb/extension/icu/icu-timezone.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,16 +185,20 @@ struct ICUFromNaiveTimestamp : public ICUDateFunc {
}
}

static void AddCast(CastFunctionSet &casts, const LogicalType &source, const LogicalType &target) {
const auto implicit_cost = CastRules::ImplicitCast(source, target);
casts.RegisterCastFunction(source, target, BindCastFromNaive, implicit_cost);
}

static void AddCasts(DatabaseInstance &db) {
auto &config = DBConfig::GetConfig(db);
auto &casts = config.GetCastFunctions();

const auto implicit_cost = CastRules::ImplicitCast(LogicalType::TIMESTAMP, LogicalType::TIMESTAMP_TZ);
casts.RegisterCastFunction(LogicalType::TIMESTAMP, LogicalType::TIMESTAMP_TZ, BindCastFromNaive, implicit_cost);
casts.RegisterCastFunction(LogicalType::TIMESTAMP_MS, LogicalType::TIMESTAMP_TZ, BindCastFromNaive);
casts.RegisterCastFunction(LogicalType::TIMESTAMP_NS, LogicalType::TIMESTAMP_TZ, BindCastFromNaive);
casts.RegisterCastFunction(LogicalType::TIMESTAMP_S, LogicalType::TIMESTAMP_TZ, BindCastFromNaive);
casts.RegisterCastFunction(LogicalType::DATE, LogicalType::TIMESTAMP_TZ, BindCastFromNaive);
AddCast(casts, LogicalType::TIMESTAMP, LogicalType::TIMESTAMP_TZ);
AddCast(casts, LogicalType::TIMESTAMP_MS, LogicalType::TIMESTAMP_TZ);
AddCast(casts, LogicalType::TIMESTAMP_NS, LogicalType::TIMESTAMP_TZ);
AddCast(casts, LogicalType::TIMESTAMP_S, LogicalType::TIMESTAMP_TZ);
AddCast(casts, LogicalType::DATE, LogicalType::TIMESTAMP_TZ);
}
};

Expand Down
2 changes: 2 additions & 0 deletions src/duckdb/extension/parquet/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,8 @@ unique_ptr<ColumnReader> CreateDecimalReader(ParquetReader &reader, const Parque
return make_uniq<TemplatedColumnReader<int32_t, TemplatedParquetValueConversion<T>>>(reader, schema);
case PhysicalType::INT64:
return make_uniq<TemplatedColumnReader<int64_t, TemplatedParquetValueConversion<T>>>(reader, schema);
case PhysicalType::INT128:
return make_uniq<TemplatedColumnReader<hugeint_t, TemplatedParquetValueConversion<T>>>(reader, schema);
default:
throw NotImplementedException("Unimplemented internal type for CreateDecimalReader");
}
Expand Down
9 changes: 7 additions & 2 deletions src/duckdb/extension/parquet/include/parquet_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,9 @@ class ParquetWriter {
vector<string> names, duckdb_parquet::CompressionCodec::type codec, ChildFieldIDs field_ids,
const vector<pair<string, string>> &kv_metadata,
shared_ptr<ParquetEncryptionConfig> encryption_config, idx_t dictionary_size_limit,
idx_t string_dictionary_page_size_limit, double bloom_filter_false_positive_ratio,
int64_t compression_level, bool debug_use_openssl, ParquetVersion parquet_version);
idx_t string_dictionary_page_size_limit, bool enable_bloom_filters,
double bloom_filter_false_positive_ratio, int64_t compression_level, bool debug_use_openssl,
ParquetVersion parquet_version);
~ParquetWriter();

public:
Expand Down Expand Up @@ -122,6 +123,9 @@ class ParquetWriter {
idx_t StringDictionaryPageSizeLimit() const {
return string_dictionary_page_size_limit;
}
double EnableBloomFilters() const {
return enable_bloom_filters;
}
double BloomFilterFalsePositiveRatio() const {
return bloom_filter_false_positive_ratio;
}
Expand Down Expand Up @@ -164,6 +168,7 @@ class ParquetWriter {
shared_ptr<ParquetEncryptionConfig> encryption_config;
idx_t dictionary_size_limit;
idx_t string_dictionary_page_size_limit;
bool enable_bloom_filters;
double bloom_filter_false_positive_ratio;
int64_t compression_level;
bool debug_use_openssl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,15 +284,19 @@ class StandardColumnWriter : public PrimitiveColumnWriter {
auto &state = state_p.Cast<StandardColumnWriterState<SRC, TGT, OP>>();
D_ASSERT(state.encoding == duckdb_parquet::Encoding::RLE_DICTIONARY);

state.bloom_filter =
make_uniq<ParquetBloomFilter>(state.dictionary.GetSize(), writer.BloomFilterFalsePositiveRatio());
if (writer.EnableBloomFilters()) {
state.bloom_filter =
make_uniq<ParquetBloomFilter>(state.dictionary.GetSize(), writer.BloomFilterFalsePositiveRatio());
}

state.dictionary.IterateValues([&](const SRC &src_value, const TGT &tgt_value) {
// update the statistics
OP::template HandleStats<SRC, TGT>(stats, tgt_value);
// update the bloom filter
auto hash = OP::template XXHash64<SRC, TGT>(tgt_value);
state.bloom_filter->FilterInsert(hash);
if (state.bloom_filter) {
// update the bloom filter
auto hash = OP::template XXHash64<SRC, TGT>(tgt_value);
state.bloom_filter->FilterInsert(hash);
}
});

// flush the dictionary page and add it to the to-be-written pages
Expand Down
7 changes: 5 additions & 2 deletions src/duckdb/extension/parquet/parquet_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ struct ParquetWriteBindData : public TableFunctionData {
//! This is huge but we grow it starting from 1 MB
idx_t string_dictionary_page_size_limit = PrimitiveColumnWriter::MAX_UNCOMPRESSED_DICT_PAGE_SIZE;

bool enable_bloom_filters = true;
//! What false positive rate are we willing to accept for bloom filters
double bloom_filter_false_positive_ratio = 0.01;

Expand Down Expand Up @@ -373,6 +374,8 @@ unique_ptr<FunctionData> ParquetWriteBind(ClientContext &context, CopyFunctionBi
PrimitiveColumnWriter::MAX_UNCOMPRESSED_DICT_PAGE_SIZE);
}
bind_data->string_dictionary_page_size_limit = val;
} else if (loption == "write_bloom_filter") {
bind_data->enable_bloom_filters = BooleanValue::Get(option.second[0].DefaultCastAs(LogicalType::BOOLEAN));
} else if (loption == "bloom_filter_false_positive_ratio") {
auto val = option.second[0].GetValue<double>();
if (val <= 0) {
Expand Down Expand Up @@ -436,8 +439,8 @@ unique_ptr<GlobalFunctionData> ParquetWriteInitializeGlobal(ClientContext &conte
context, fs, file_path, parquet_bind.sql_types, parquet_bind.column_names, parquet_bind.codec,
parquet_bind.field_ids.Copy(), parquet_bind.kv_metadata, parquet_bind.encryption_config,
parquet_bind.dictionary_size_limit, parquet_bind.string_dictionary_page_size_limit,
parquet_bind.bloom_filter_false_positive_ratio, parquet_bind.compression_level, parquet_bind.debug_use_openssl,
parquet_bind.parquet_version);
parquet_bind.enable_bloom_filters, parquet_bind.bloom_filter_false_positive_ratio,
parquet_bind.compression_level, parquet_bind.debug_use_openssl, parquet_bind.parquet_version);
return std::move(global_state);
}

Expand Down
6 changes: 4 additions & 2 deletions src/duckdb/extension/parquet/parquet_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,12 +345,14 @@ ParquetWriter::ParquetWriter(ClientContext &context, FileSystem &fs, string file
vector<string> names_p, CompressionCodec::type codec, ChildFieldIDs field_ids_p,
const vector<pair<string, string>> &kv_metadata,
shared_ptr<ParquetEncryptionConfig> encryption_config_p, idx_t dictionary_size_limit_p,
idx_t string_dictionary_page_size_limit_p, double bloom_filter_false_positive_ratio_p,
int64_t compression_level_p, bool debug_use_openssl_p, ParquetVersion parquet_version)
idx_t string_dictionary_page_size_limit_p, bool enable_bloom_filters_p,
double bloom_filter_false_positive_ratio_p, int64_t compression_level_p,
bool debug_use_openssl_p, ParquetVersion parquet_version)
: context(context), file_name(std::move(file_name_p)), sql_types(std::move(types_p)),
column_names(std::move(names_p)), codec(codec), field_ids(std::move(field_ids_p)),
encryption_config(std::move(encryption_config_p)), dictionary_size_limit(dictionary_size_limit_p),
string_dictionary_page_size_limit(string_dictionary_page_size_limit_p),
enable_bloom_filters(enable_bloom_filters_p),
bloom_filter_false_positive_ratio(bloom_filter_false_positive_ratio_p), compression_level(compression_level_p),
debug_use_openssl(debug_use_openssl_p), parquet_version(parquet_version), total_written(0), num_row_groups(0) {

Expand Down
2 changes: 1 addition & 1 deletion src/duckdb/src/catalog/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ CatalogException Catalog::CreateMissingEntryException(CatalogEntryRetriever &ret
break;
}
auto &catalog = database.get().GetCatalog();
auto current_schemas = catalog.GetAllSchemas(context);
auto current_schemas = catalog.GetSchemas(context);
for (auto &current_schema : current_schemas) {
if (unseen_schemas.size() >= max_schema_count) {
break;
Expand Down
6 changes: 4 additions & 2 deletions src/duckdb/src/common/arrow/arrow_type_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,9 @@ struct ArrowBool8 {
auto source_ptr = reinterpret_cast<bool *>(format.data);
auto result_ptr = reinterpret_cast<int8_t *>(FlatVector::GetData(result));
for (idx_t i = 0; i < count; i++) {
result_ptr[i] = static_cast<int8_t>(source_ptr[i]);
if (format.validity.RowIsValid(i)) {
result_ptr[i] = static_cast<int8_t>(source_ptr[i]);
}
}
}
};
Expand All @@ -380,7 +382,7 @@ void ArrowTypeExtensionSet::Initialize(const DBConfig &config) {

// Types that are 1:n
config.RegisterArrowExtension({"arrow.json", &ArrowJson::PopulateSchema, &ArrowJson::GetType,
make_shared_ptr<ArrowTypeExtensionData>(LogicalType::VARCHAR)});
make_shared_ptr<ArrowTypeExtensionData>(LogicalType::JSON())});

config.RegisterArrowExtension({"DuckDB", "bit", &ArrowBit::PopulateSchema, &ArrowBit::GetType,
make_shared_ptr<ArrowTypeExtensionData>(LogicalType::BIT), nullptr, nullptr});
Expand Down
7 changes: 7 additions & 0 deletions src/duckdb/src/common/error_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ string ErrorData::ConstructFinalMessage() const {
error += "\nThis error signals an assertion failure within DuckDB. This usually occurs due to "
"unexpected conditions or errors in the program's logic.\nFor more information, see "
"https://duckdb.org/docs/stable/dev/internal_errors";

// Ensure that we print the stack trace for internal exceptions.
auto entry = extra_info.find("stack_trace_pointers");
if (entry != extra_info.end()) {
auto stack_trace = StackTrace::ResolveStacktraceSymbols(entry->second);
error += "\n\nStack Trace:\n" + stack_trace;
}
}
return error;
}
Expand Down
3 changes: 3 additions & 0 deletions src/duckdb/src/common/operator/string_cast.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ duckdb::string_t StringFromTimestamp(timestamp_t input, Vector &vector) {
idx_t nano_length = 0;
if (picos) {
// If there are ps, we need all the µs
if (!time[3]) {
TimeToStringCast::FormatMicros(time[3], micro_buffer);
}
time_length = 15;
nano_length = 6;
nano_length -= NumericCast<idx_t>(TimeToStringCast::FormatMicros(picos, nano_buffer));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ idx_t CSVIterator::BytesPerThread(const CSVReaderOptions &reader_options) {
// If we are setting up the buffer size directly, we must make sure each thread will read the full buffer.
return max_row_size;
}
if (bytes_per_thread == 0) {
// Bytes per thread can never be zero, but it might happen if max_row_size = 0
// Not sure why a human being would do that...
return 1;
}
return bytes_per_thread;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1373,7 +1373,7 @@ void StringValueScanner::ProcessOverBufferValue() {
result.escaped = true;
}
if (states.IsComment()) {
result.comment = true;
result.SetComment(result, j);
}
if (states.IsInvalid()) {
result.InvalidState(result);
Expand Down Expand Up @@ -1435,7 +1435,7 @@ void StringValueScanner::ProcessOverBufferValue() {
result.SetQuoted(result, j);
}
if (states.IsComment()) {
result.comment = true;
result.SetComment(result, j);
}
if (states.IsEscaped() && result.state_machine.dialect_options.state_machine_options.escape != '\0') {
result.escaped = true;
Expand Down
12 changes: 10 additions & 2 deletions src/duckdb/src/execution/operator/join/physical_iejoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ SinkResultType PhysicalIEJoin::Sink(ExecutionContext &context, DataChunk &chunk,
auto &gstate = input.global_state.Cast<IEJoinGlobalState>();
auto &lstate = input.local_state.Cast<IEJoinLocalState>();

if (gstate.child == 0 && gstate.tables[1]->global_sort_state.sorted_blocks.empty() && EmptyResultIfRHSIsEmpty()) {
return SinkResultType::FINISHED;
}

gstate.Sink(chunk, lstate);

if (filter_pushdown && !gstate.skip_filter_pushdown) {
Expand Down Expand Up @@ -207,15 +211,19 @@ SinkFinalizeType PhysicalIEJoin::Finalize(Pipeline &pipeline, Event &event, Clie
// for FULL/LEFT/RIGHT OUTER JOIN, initialize found_match to false for every tuple
table.IntializeMatches();
}

SinkFinalizeType res;
if (gstate.child == 1 && global_sort_state.sorted_blocks.empty() && EmptyResultIfRHSIsEmpty()) {
// Empty input!
return SinkFinalizeType::NO_OUTPUT_POSSIBLE;
res = SinkFinalizeType::NO_OUTPUT_POSSIBLE;
} else {
res = SinkFinalizeType::READY;
}

// Move to the next input child
gstate.Finalize(pipeline, event);

return SinkFinalizeType::READY;
return res;
}

//===--------------------------------------------------------------------===//
Expand Down
9 changes: 7 additions & 2 deletions src/duckdb/src/execution/physical_plan/plan_asof_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,13 @@ PhysicalPlanGenerator::PlanAsOfLoopJoin(LogicalComparisonJoin &op, PhysicalOpera
asof_idx = i;
arg_min_max = "arg_min";
break;
default:
case ExpressionType::COMPARE_EQUAL:
case ExpressionType::COMPARE_NOTEQUAL:
case ExpressionType::COMPARE_DISTINCT_FROM:
break;
default:
// Unsupported NLJ comparison
return nullptr;
}
}

Expand Down Expand Up @@ -271,7 +276,7 @@ PhysicalOperator &PhysicalPlanGenerator::PlanAsOfJoin(LogicalComparisonJoin &op)

auto &config = ClientConfig::GetConfig(context);
if (!config.force_asof_iejoin) {
if (op.children[0]->has_estimated_cardinality && lhs_cardinality <= config.asof_loop_join_threshold) {
if (op.children[0]->has_estimated_cardinality && lhs_cardinality < config.asof_loop_join_threshold) {
auto result = PlanAsOfLoopJoin(op, left, right);
if (result) {
return *result;
Expand Down
2 changes: 1 addition & 1 deletion src/duckdb/src/function/function_list.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ static const StaticFunctionDefinition function[] = {
DUCKDB_SCALAR_FUNCTION_SET(InternalCompressIntegralUintegerFun),
DUCKDB_SCALAR_FUNCTION_SET(InternalCompressIntegralUsmallintFun),
DUCKDB_SCALAR_FUNCTION_SET(InternalCompressIntegralUtinyintFun),
DUCKDB_SCALAR_FUNCTION(InternalCompressStringHugeintFun),
DUCKDB_SCALAR_FUNCTION(InternalCompressStringUbigintFun),
DUCKDB_SCALAR_FUNCTION(InternalCompressStringUhugeintFun),
DUCKDB_SCALAR_FUNCTION(InternalCompressStringUintegerFun),
DUCKDB_SCALAR_FUNCTION(InternalCompressStringUsmallintFun),
DUCKDB_SCALAR_FUNCTION(InternalCompressStringUtinyintFun),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ static scalar_function_t GetStringCompressFunctionSwitch(const LogicalType &resu
return GetStringCompressFunction<uint32_t>(result_type);
case LogicalTypeId::UBIGINT:
return GetStringCompressFunction<uint64_t>(result_type);
case LogicalTypeId::HUGEINT:
return GetStringCompressFunction<hugeint_t>(result_type);
case LogicalTypeId::UHUGEINT:
return GetStringCompressFunction<uhugeint_t>(result_type);
default:
throw InternalException("Unexpected type in GetStringCompressFunctionSwitch");
}
Expand Down Expand Up @@ -189,8 +189,8 @@ static scalar_function_t GetStringDecompressFunctionSwitch(const LogicalType &in
return GetStringDecompressFunction<uint32_t>(input_type);
case LogicalTypeId::UBIGINT:
return GetStringDecompressFunction<uint64_t>(input_type);
case LogicalTypeId::HUGEINT:
return GetStringDecompressFunction<hugeint_t>(input_type);
case LogicalTypeId::UHUGEINT:
return GetStringDecompressFunction<uhugeint_t>(input_type);
default:
throw InternalException("Unexpected type in GetStringDecompressFunctionSwitch");
}
Expand Down Expand Up @@ -262,8 +262,8 @@ ScalarFunction InternalCompressStringUbigintFun::GetFunction() {
return CMStringCompressFun::GetFunction(LogicalType(LogicalTypeId::UBIGINT));
}

ScalarFunction InternalCompressStringHugeintFun::GetFunction() {
return CMStringCompressFun::GetFunction(LogicalType(LogicalTypeId::HUGEINT));
ScalarFunction InternalCompressStringUhugeintFun::GetFunction() {
return CMStringCompressFun::GetFunction(LogicalType(LogicalTypeId::UHUGEINT));
}

ScalarFunctionSet InternalDecompressStringFun::GetFunctions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const vector<LogicalType> CMUtils::IntegralTypes() {

const vector<LogicalType> CMUtils::StringTypes() {
return {LogicalType::UTINYINT, LogicalType::USMALLINT, LogicalType::UINTEGER, LogicalType::UBIGINT,
LogicalType::HUGEINT};
LogicalType::UHUGEINT};
}

// LCOV_EXCL_START
Expand Down
Loading
Loading