diff --git a/src/duckdb/extension/core_functions/function_list.cpp b/src/duckdb/extension/core_functions/function_list.cpp index f34b59188..a8ba52658 100644 --- a/src/duckdb/extension/core_functions/function_list.cpp +++ b/src/duckdb/extension/core_functions/function_list.cpp @@ -344,6 +344,7 @@ static const StaticFunctionDefinition core_functions[] = { DUCKDB_AGGREGATE_FUNCTION_SET(StringAggFun), DUCKDB_SCALAR_FUNCTION_ALIAS(StrposFun), DUCKDB_SCALAR_FUNCTION(StructInsertFun), + DUCKDB_SCALAR_FUNCTION(StructUpdateFun), DUCKDB_AGGREGATE_FUNCTION_SET(SumFun), DUCKDB_AGGREGATE_FUNCTION_SET(SumNoOverflowFun), DUCKDB_AGGREGATE_FUNCTION_ALIAS(SumkahanFun), diff --git a/src/duckdb/extension/core_functions/include/core_functions/scalar/struct_functions.hpp b/src/duckdb/extension/core_functions/include/core_functions/scalar/struct_functions.hpp index e409eaf1c..86c3188fe 100644 --- a/src/duckdb/extension/core_functions/include/core_functions/scalar/struct_functions.hpp +++ b/src/duckdb/extension/core_functions/include/core_functions/scalar/struct_functions.hpp @@ -25,4 +25,14 @@ struct StructInsertFun { static ScalarFunction GetFunction(); }; +struct StructUpdateFun { + static constexpr const char *Name = "struct_update"; + static constexpr const char *Parameters = "struct,any"; + static constexpr const char *Description = "Changes field(s)/value(s) to an existing STRUCT with the argument values. 
The entry name(s) will be the bound variable name(s)"; + static constexpr const char *Example = "struct_update({'a': 1}, a := 2)"; + static constexpr const char *Categories = ""; + + static ScalarFunction GetFunction(); +}; + } // namespace duckdb diff --git a/src/duckdb/extension/core_functions/scalar/struct/struct_update.cpp b/src/duckdb/extension/core_functions/scalar/struct/struct_update.cpp new file mode 100644 index 000000000..e83c9b884 --- /dev/null +++ b/src/duckdb/extension/core_functions/scalar/struct/struct_update.cpp @@ -0,0 +1,161 @@ +#include "core_functions/scalar/struct_functions.hpp" +#include "duckdb/planner/expression/bound_function_expression.hpp" +#include "duckdb/common/string_util.hpp" +#include "duckdb/parser/expression/bound_expression.hpp" +#include "duckdb/function/scalar/nested_functions.hpp" +#include "duckdb/common/case_insensitive_map.hpp" +#include "duckdb/storage/statistics/struct_stats.hpp" +#include "duckdb/planner/expression_binder.hpp" + +namespace duckdb { + +static void StructUpdateFunction(DataChunk &args, ExpressionState &state, Vector &result) { + auto &starting_vec = args.data[0]; + starting_vec.Verify(args.size()); + + auto &starting_child_entries = StructVector::GetEntries(starting_vec); + auto &result_child_entries = StructVector::GetEntries(result); + + auto &starting_types = StructType::GetChildTypes(starting_vec.GetType()); + + auto &func_args = state.expr.Cast().children; + auto new_entries = case_insensitive_tree_t(); + auto is_new_field = vector(args.ColumnCount(), true); + + for (idx_t arg_idx = 1; arg_idx < func_args.size(); arg_idx++) { + auto &new_child = func_args[arg_idx]; + new_entries.emplace(new_child->alias, arg_idx); + } + + // Assign the original child entries to the STRUCT. 
+ for (idx_t field_idx = 0; field_idx < starting_child_entries.size(); field_idx++) { + auto &starting_child = starting_child_entries[field_idx]; + auto update = new_entries.find(starting_types[field_idx].first.c_str()); + + if (update == new_entries.end()) { + // No update present, copy from source + result_child_entries[field_idx]->Reference(*starting_child); + } else { + // We found a replacement of the same name to update + auto arg_idx = update->second; + result_child_entries[field_idx]->Reference(args.data[arg_idx]); + is_new_field[arg_idx] = false; + } + } + + // Assign the new (not updated) children to the end of the result vector. + for (idx_t arg_idx = 1, field_idx = starting_child_entries.size(); arg_idx < args.ColumnCount(); arg_idx++) { + if (is_new_field[arg_idx]) { + result_child_entries[field_idx++]->Reference(args.data[arg_idx]); + } + } + + result.Verify(args.size()); + if (args.AllConstant()) { + result.SetVectorType(VectorType::CONSTANT_VECTOR); + } +} + +static unique_ptr StructUpdateBind(ClientContext &context, ScalarFunction &bound_function, + vector> &arguments) { + if (arguments.empty()) { + throw InvalidInputException("Missing required arguments for struct_update function."); + } + if (LogicalTypeId::STRUCT != arguments[0]->return_type.id()) { + throw InvalidInputException("The first argument to struct_update must be a STRUCT"); + } + if (arguments.size() < 2) { + throw InvalidInputException("Can't update nothing into a STRUCT"); + } + + child_list_t new_children; + auto &existing_children = StructType::GetChildTypes(arguments[0]->return_type); + + auto incomming_children = case_insensitive_tree_t(); + auto is_new_field = vector(arguments.size(), true); + + // Validate incomming arguments and record names + for (idx_t arg_idx = 1; arg_idx < arguments.size(); arg_idx++) { + auto &child = arguments[arg_idx]; + if (child->alias.empty()) { + throw BinderException("Need named argument for struct update, e.g., a := b"); + } else if 
(incomming_children.find(child->alias) != incomming_children.end()) { + throw InvalidInputException("Duplicate named argument provided for %s", child->alias.c_str()); + } + incomming_children.emplace(child->alias, arg_idx); + } + + for (idx_t field_idx = 0; field_idx < existing_children.size(); field_idx++) { + auto &existing_child = existing_children[field_idx]; + auto update = incomming_children.find(existing_child.first); + if (update == incomming_children.end()) { + // No update provided for the named value + new_children.push_back(make_pair(existing_child.first, existing_child.second)); + } else { + // Update the struct with the new data of the same name + auto arg_idx = update->second; + auto &new_child = arguments[arg_idx]; + new_children.push_back(make_pair(new_child->alias, new_child->return_type)); + is_new_field[arg_idx] = false; + } + } + + // Loop through the additional arguments (name/value pairs) + for (idx_t arg_idx = 1; arg_idx < arguments.size(); arg_idx++) { + if (is_new_field[arg_idx]) { + auto &child = arguments[arg_idx]; + new_children.push_back(make_pair(child->alias, child->return_type)); + } + } + + bound_function.return_type = LogicalType::STRUCT(new_children); + return make_uniq(bound_function.return_type); +} + +unique_ptr StructUpdateStats(ClientContext &context, FunctionStatisticsInput &input) { + auto &child_stats = input.child_stats; + auto &expr = input.expr; + + auto incomming_children = case_insensitive_tree_t(); + auto is_new_field = vector(expr.children.size(), true); + auto new_stats = StructStats::CreateUnknown(expr.return_type); + + for (idx_t arg_idx = 1; arg_idx < expr.children.size(); arg_idx++) { + auto &new_child = expr.children[arg_idx]; + incomming_children.emplace(new_child->alias, arg_idx); + } + + auto existing_type = child_stats[0].GetType(); + auto existing_count = StructType::GetChildCount(existing_type); + auto existing_stats = StructStats::GetChildStats(child_stats[0]); + for (idx_t field_idx = 0; field_idx < 
existing_count; field_idx++) { + auto &existing_child = existing_stats[field_idx]; + auto update = incomming_children.find(StructType::GetChildName(existing_type, field_idx)); + if (update == incomming_children.end()) { + StructStats::SetChildStats(new_stats, field_idx, existing_child); + } else { + auto arg_idx = update->second; + StructStats::SetChildStats(new_stats, field_idx, child_stats[arg_idx]); + is_new_field[arg_idx] = false; + } + } + + for (idx_t arg_idx = 1, field_idx = existing_count; arg_idx < expr.children.size(); arg_idx++) { + if (is_new_field[arg_idx]) { + StructStats::SetChildStats(new_stats, field_idx++, child_stats[arg_idx]); + } + } + + return new_stats.ToUnique(); +} + +ScalarFunction StructUpdateFun::GetFunction() { + ScalarFunction fun({}, LogicalTypeId::STRUCT, StructUpdateFunction, StructUpdateBind, nullptr, StructUpdateStats); + fun.null_handling = FunctionNullHandling::SPECIAL_HANDLING; + fun.varargs = LogicalType::ANY; + fun.serialize = VariableReturnBindData::Serialize; + fun.deserialize = VariableReturnBindData::Deserialize; + return fun; +} + +} // namespace duckdb diff --git a/src/duckdb/extension/parquet/include/column_writer.hpp b/src/duckdb/extension/parquet/include/column_writer.hpp index 1a889c209..6452af8c1 100644 --- a/src/duckdb/extension/parquet/include/column_writer.hpp +++ b/src/duckdb/extension/parquet/include/column_writer.hpp @@ -119,7 +119,8 @@ class ColumnWriter { throw NotImplementedException("Writer does not need analysis"); } - virtual void Prepare(ColumnWriterState &state, ColumnWriterState *parent, Vector &vector, idx_t count) = 0; + virtual void Prepare(ColumnWriterState &state, ColumnWriterState *parent, Vector &vector, idx_t count, + bool vector_can_span_multiple_pages) = 0; virtual void BeginWrite(ColumnWriterState &state) = 0; virtual void Write(ColumnWriterState &state, Vector &vector, idx_t count) = 0; diff --git a/src/duckdb/extension/parquet/include/writer/array_column_writer.hpp 
b/src/duckdb/extension/parquet/include/writer/array_column_writer.hpp index 6e68bb7a0..630bfd17f 100644 --- a/src/duckdb/extension/parquet/include/writer/array_column_writer.hpp +++ b/src/duckdb/extension/parquet/include/writer/array_column_writer.hpp @@ -22,7 +22,8 @@ class ArrayColumnWriter : public ListColumnWriter { public: void Analyze(ColumnWriterState &state, ColumnWriterState *parent, Vector &vector, idx_t count) override; - void Prepare(ColumnWriterState &state, ColumnWriterState *parent, Vector &vector, idx_t count) override; + void Prepare(ColumnWriterState &state, ColumnWriterState *parent, Vector &vector, idx_t count, + bool vector_can_span_multiple_pages) override; void Write(ColumnWriterState &state, Vector &vector, idx_t count) override; }; diff --git a/src/duckdb/extension/parquet/include/writer/list_column_writer.hpp b/src/duckdb/extension/parquet/include/writer/list_column_writer.hpp index f04bfb209..f1070b0f1 100644 --- a/src/duckdb/extension/parquet/include/writer/list_column_writer.hpp +++ b/src/duckdb/extension/parquet/include/writer/list_column_writer.hpp @@ -40,7 +40,8 @@ class ListColumnWriter : public ColumnWriter { bool HasAnalyze() override; void Analyze(ColumnWriterState &state, ColumnWriterState *parent, Vector &vector, idx_t count) override; void FinalizeAnalyze(ColumnWriterState &state) override; - void Prepare(ColumnWriterState &state, ColumnWriterState *parent, Vector &vector, idx_t count) override; + void Prepare(ColumnWriterState &state, ColumnWriterState *parent, Vector &vector, idx_t count, + bool vector_can_span_multiple_pages) override; void BeginWrite(ColumnWriterState &state) override; void Write(ColumnWriterState &state, Vector &vector, idx_t count) override; diff --git a/src/duckdb/extension/parquet/include/writer/primitive_column_writer.hpp b/src/duckdb/extension/parquet/include/writer/primitive_column_writer.hpp index 01be024ea..28b217692 100644 --- 
a/src/duckdb/extension/parquet/include/writer/primitive_column_writer.hpp +++ b/src/duckdb/extension/parquet/include/writer/primitive_column_writer.hpp @@ -70,7 +70,8 @@ class PrimitiveColumnWriter : public ColumnWriter { public: unique_ptr InitializeWriteState(duckdb_parquet::RowGroup &row_group) override; - void Prepare(ColumnWriterState &state, ColumnWriterState *parent, Vector &vector, idx_t count) override; + void Prepare(ColumnWriterState &state, ColumnWriterState *parent, Vector &vector, idx_t count, + bool vector_can_span_multiple_pages) override; void BeginWrite(ColumnWriterState &state) override; void Write(ColumnWriterState &state, Vector &vector, idx_t count) override; void FinalizeWrite(ColumnWriterState &state) override; diff --git a/src/duckdb/extension/parquet/include/writer/struct_column_writer.hpp b/src/duckdb/extension/parquet/include/writer/struct_column_writer.hpp index 5b61e2ddb..8927c391b 100644 --- a/src/duckdb/extension/parquet/include/writer/struct_column_writer.hpp +++ b/src/duckdb/extension/parquet/include/writer/struct_column_writer.hpp @@ -28,7 +28,8 @@ class StructColumnWriter : public ColumnWriter { bool HasAnalyze() override; void Analyze(ColumnWriterState &state, ColumnWriterState *parent, Vector &vector, idx_t count) override; void FinalizeAnalyze(ColumnWriterState &state) override; - void Prepare(ColumnWriterState &state, ColumnWriterState *parent, Vector &vector, idx_t count) override; + void Prepare(ColumnWriterState &state, ColumnWriterState *parent, Vector &vector, idx_t count, + bool vector_can_span_multiple_pages) override; void BeginWrite(ColumnWriterState &state) override; void Write(ColumnWriterState &state, Vector &vector, idx_t count) override; diff --git a/src/duckdb/extension/parquet/include/zstd_file_system.hpp b/src/duckdb/extension/parquet/include/zstd_file_system.hpp index 15a2e5887..d50dd992b 100644 --- a/src/duckdb/extension/parquet/include/zstd_file_system.hpp +++ 
b/src/duckdb/extension/parquet/include/zstd_file_system.hpp @@ -15,7 +15,7 @@ namespace duckdb { class ZStdFileSystem : public CompressedFileSystem { public: - unique_ptr OpenCompressedFile(unique_ptr handle, bool write) override; + unique_ptr OpenCompressedFile(QueryContext context, unique_ptr handle, bool write) override; std::string GetName() const override { return "ZStdFileSystem"; diff --git a/src/duckdb/extension/parquet/parquet_writer.cpp b/src/duckdb/extension/parquet/parquet_writer.cpp index 205a7d05c..4819a2274 100644 --- a/src/duckdb/extension/parquet/parquet_writer.cpp +++ b/src/duckdb/extension/parquet/parquet_writer.cpp @@ -468,7 +468,7 @@ void ParquetWriter::PrepareRowGroup(ColumnDataCollection &buffer, PreparedRowGro for (auto &chunk : buffer.Chunks({column_ids})) { for (idx_t i = 0; i < next; i++) { - col_writers[i].get().Prepare(*write_states[i], nullptr, chunk.data[i], chunk.size()); + col_writers[i].get().Prepare(*write_states[i], nullptr, chunk.data[i], chunk.size(), true); } } diff --git a/src/duckdb/extension/parquet/writer/array_column_writer.cpp b/src/duckdb/extension/parquet/writer/array_column_writer.cpp index 6d3c5954a..024dbe819 100644 --- a/src/duckdb/extension/parquet/writer/array_column_writer.cpp +++ b/src/duckdb/extension/parquet/writer/array_column_writer.cpp @@ -9,7 +9,8 @@ void ArrayColumnWriter::Analyze(ColumnWriterState &state_p, ColumnWriterState *p child_writer->Analyze(*state.child_state, &state_p, array_child, array_size * count); } -void ArrayColumnWriter::Prepare(ColumnWriterState &state_p, ColumnWriterState *parent, Vector &vector, idx_t count) { +void ArrayColumnWriter::Prepare(ColumnWriterState &state_p, ColumnWriterState *parent, Vector &vector, idx_t count, + bool vector_can_span_multiple_pages) { auto &state = state_p.Cast(); auto array_size = ArrayType::GetSize(vector.GetType()); @@ -66,7 +67,9 @@ void ArrayColumnWriter::Prepare(ColumnWriterState &state_p, ColumnWriterState *p state.parent_index += vcount; auto 
&array_child = ArrayVector::GetEntry(vector); - child_writer->Prepare(*state.child_state, &state_p, array_child, count * array_size); + // The elements of a single array should not span multiple Parquet pages + // So, we force the entire vector to fit on a single page by setting "vector_can_span_multiple_pages=false" + child_writer->Prepare(*state.child_state, &state_p, array_child, count * array_size, false); } void ArrayColumnWriter::Write(ColumnWriterState &state_p, Vector &vector, idx_t count) { diff --git a/src/duckdb/extension/parquet/writer/list_column_writer.cpp b/src/duckdb/extension/parquet/writer/list_column_writer.cpp index 85b30003a..8fba00c23 100644 --- a/src/duckdb/extension/parquet/writer/list_column_writer.cpp +++ b/src/duckdb/extension/parquet/writer/list_column_writer.cpp @@ -57,7 +57,8 @@ static idx_t GetConsecutiveChildList(Vector &list, Vector &result, idx_t offset, return total_length; } -void ListColumnWriter::Prepare(ColumnWriterState &state_p, ColumnWriterState *parent, Vector &vector, idx_t count) { +void ListColumnWriter::Prepare(ColumnWriterState &state_p, ColumnWriterState *parent, Vector &vector, idx_t count, + bool vector_can_span_multiple_pages) { auto &state = state_p.Cast(); auto list_data = FlatVector::GetData(vector); @@ -111,7 +112,9 @@ void ListColumnWriter::Prepare(ColumnWriterState &state_p, ColumnWriterState *pa auto &list_child = ListVector::GetEntry(vector); Vector child_list(list_child); auto child_length = GetConsecutiveChildList(vector, child_list, 0, count); - child_writer->Prepare(*state.child_state, &state_p, child_list, child_length); + // The elements of a single list should not span multiple Parquet pages + // So, we force the entire vector to fit on a single page by setting "vector_can_span_multiple_pages=false" + child_writer->Prepare(*state.child_state, &state_p, child_list, child_length, false); } void ListColumnWriter::BeginWrite(ColumnWriterState &state_p) { diff --git 
a/src/duckdb/extension/parquet/writer/primitive_column_writer.cpp b/src/duckdb/extension/parquet/writer/primitive_column_writer.cpp index 0878fa33c..d4087fd3d 100644 --- a/src/duckdb/extension/parquet/writer/primitive_column_writer.cpp +++ b/src/duckdb/extension/parquet/writer/primitive_column_writer.cpp @@ -36,8 +36,8 @@ unique_ptr PrimitiveColumnWriter::InitializePageState(Pri void PrimitiveColumnWriter::FlushPageState(WriteStream &temp_writer, ColumnWriterPageState *state) { } -void PrimitiveColumnWriter::Prepare(ColumnWriterState &state_p, ColumnWriterState *parent, Vector &vector, - idx_t count) { +void PrimitiveColumnWriter::Prepare(ColumnWriterState &state_p, ColumnWriterState *parent, Vector &vector, idx_t count, + bool vector_can_span_multiple_pages) { auto &state = state_p.Cast(); auto &col_chunk = state.row_group.columns[state.col_idx]; @@ -70,6 +70,10 @@ void PrimitiveColumnWriter::Prepare(ColumnWriterState &state_p, ColumnWriterStat if (validity.RowIsValid(vector_index)) { page_info.estimated_page_size += GetRowSize(vector, vector_index, state); if (page_info.estimated_page_size >= MAX_UNCOMPRESSED_PAGE_SIZE) { + if (!vector_can_span_multiple_pages && i != 0) { + // Vector is not allowed to span multiple pages, and we already started writing it + continue; + } PageInformation new_info; new_info.offset = page_info.offset + page_info.row_count; state.page_info.push_back(new_info); diff --git a/src/duckdb/extension/parquet/writer/struct_column_writer.cpp b/src/duckdb/extension/parquet/writer/struct_column_writer.cpp index 9b3c35acb..c70c35ba2 100644 --- a/src/duckdb/extension/parquet/writer/struct_column_writer.cpp +++ b/src/duckdb/extension/parquet/writer/struct_column_writer.cpp @@ -55,7 +55,8 @@ void StructColumnWriter::FinalizeAnalyze(ColumnWriterState &state_p) { } } -void StructColumnWriter::Prepare(ColumnWriterState &state_p, ColumnWriterState *parent, Vector &vector, idx_t count) { +void StructColumnWriter::Prepare(ColumnWriterState &state_p, 
ColumnWriterState *parent, Vector &vector, idx_t count, + bool vector_can_span_multiple_pages) { auto &state = state_p.Cast(); auto &validity = FlatVector::Validity(vector); @@ -69,7 +70,8 @@ void StructColumnWriter::Prepare(ColumnWriterState &state_p, ColumnWriterState * HandleDefineLevels(state_p, parent, validity, count, PARQUET_DEFINE_VALID, MaxDefine() - 1); auto &child_vectors = StructVector::GetEntries(vector); for (idx_t child_idx = 0; child_idx < child_writers.size(); child_idx++) { - child_writers[child_idx]->Prepare(*state.child_states[child_idx], &state_p, *child_vectors[child_idx], count); + child_writers[child_idx]->Prepare(*state.child_states[child_idx], &state_p, *child_vectors[child_idx], count, + vector_can_span_multiple_pages); } } diff --git a/src/duckdb/extension/parquet/zstd_file_system.cpp b/src/duckdb/extension/parquet/zstd_file_system.cpp index 7204f3607..3bddf8661 100644 --- a/src/duckdb/extension/parquet/zstd_file_system.cpp +++ b/src/duckdb/extension/parquet/zstd_file_system.cpp @@ -13,7 +13,7 @@ struct ZstdStreamWrapper : public StreamWrapper { bool writing = false; public: - void Initialize(CompressedFile &file, bool write) override; + void Initialize(QueryContext context, CompressedFile &file, bool write) override; bool Read(StreamData &stream_data) override; void Write(CompressedFile &file, StreamData &stream_data, data_ptr_t buffer, int64_t nr_bytes) override; @@ -32,7 +32,7 @@ ZstdStreamWrapper::~ZstdStreamWrapper() { } } -void ZstdStreamWrapper::Initialize(CompressedFile &file, bool write) { +void ZstdStreamWrapper::Initialize(QueryContext context, CompressedFile &file, bool write) { Close(); this->file = &file; this->writing = write; @@ -156,9 +156,9 @@ void ZstdStreamWrapper::Close() { class ZStdFile : public CompressedFile { public: - ZStdFile(unique_ptr child_handle_p, const string &path, bool write) + ZStdFile(QueryContext context, unique_ptr child_handle_p, const string &path, bool write) : CompressedFile(zstd_fs, 
std::move(child_handle_p), path) { - Initialize(write); + Initialize(context, write); } FileCompressionType GetFileCompressionType() override { @@ -168,9 +168,10 @@ class ZStdFile : public CompressedFile { ZStdFileSystem zstd_fs; }; -unique_ptr ZStdFileSystem::OpenCompressedFile(unique_ptr handle, bool write) { +unique_ptr ZStdFileSystem::OpenCompressedFile(QueryContext context, unique_ptr handle, + bool write) { auto path = handle->path; - return make_uniq(std::move(handle), path, write); + return make_uniq(context, std::move(handle), path, write); } unique_ptr ZStdFileSystem::CreateStream() { diff --git a/src/duckdb/src/common/box_renderer.cpp b/src/duckdb/src/common/box_renderer.cpp index 82938257c..e33abd5e3 100644 --- a/src/duckdb/src/common/box_renderer.cpp +++ b/src/duckdb/src/common/box_renderer.cpp @@ -1006,7 +1006,7 @@ void BoxRenderer::Render(ClientContext &context, const vector &names, co top_rows = rows_to_render / 2 + (rows_to_render % 2 != 0 ? 1 : 0); bottom_rows = rows_to_render - top_rows; } - auto row_count_str = to_string(row_count) + " rows"; + auto row_count_str = FormatNumber(to_string(row_count)) + " rows"; bool has_limited_rows = config.limit > 0 && row_count == config.limit; if (has_limited_rows) { row_count_str = "? 
rows"; @@ -1016,9 +1016,9 @@ void BoxRenderer::Render(ClientContext &context, const vector &names, co if (has_hidden_rows) { shown_str = "("; if (has_limited_rows) { - shown_str += ">" + to_string(config.limit - 1) + " rows, "; + shown_str += ">" + FormatNumber(to_string(config.limit - 1)) + " rows, "; } - shown_str += to_string(top_rows + bottom_rows) + " shown)"; + shown_str += FormatNumber(to_string(top_rows + bottom_rows)) + " shown)"; } auto minimum_row_length = MaxValue(row_count_str.size(), shown_str.size()) + 4; diff --git a/src/duckdb/src/common/compressed_file_system.cpp b/src/duckdb/src/common/compressed_file_system.cpp index d222bf13d..bdbced81a 100644 --- a/src/duckdb/src/common/compressed_file_system.cpp +++ b/src/duckdb/src/common/compressed_file_system.cpp @@ -1,5 +1,6 @@ #include "duckdb/common/compressed_file_system.hpp" #include "duckdb/common/numeric_utils.hpp" +#include "duckdb/main/client_context.hpp" namespace duckdb { @@ -18,7 +19,7 @@ CompressedFile::~CompressedFile() { } } -void CompressedFile::Initialize(bool write) { +void CompressedFile::Initialize(QueryContext context, bool write) { Close(); this->write = write; @@ -34,14 +35,14 @@ void CompressedFile::Initialize(bool write) { current_position = 0; stream_wrapper = compressed_fs.CreateStream(); - stream_wrapper->Initialize(*this, write); + stream_wrapper->Initialize(context, *this, write); } idx_t CompressedFile::GetProgress() { return current_position; } -int64_t CompressedFile::ReadData(void *buffer, int64_t remaining) { +int64_t CompressedFile::ReadData(QueryContext context, void *buffer, int64_t remaining) { idx_t total_read = 0; while (true) { // first check if there are input bytes available in the output buffers @@ -78,7 +79,7 @@ int64_t CompressedFile::ReadData(void *buffer, int64_t remaining) { memmove(stream_data.in_buff.get(), stream_data.in_buff_start, UnsafeNumericCast(bufrem)); stream_data.in_buff_start = stream_data.in_buff.get(); // refill the rest of input buffer - 
auto sz = child_handle->Read(stream_data.in_buff_start + bufrem, + auto sz = child_handle->Read(context, stream_data.in_buff_start + bufrem, stream_data.in_buf_size - UnsafeNumericCast(bufrem)); stream_data.in_buff_end = stream_data.in_buff_start + bufrem + sz; if (sz <= 0) { @@ -92,7 +93,7 @@ int64_t CompressedFile::ReadData(void *buffer, int64_t remaining) { // empty input buffer: refill from the start stream_data.in_buff_start = stream_data.in_buff.get(); stream_data.in_buff_end = stream_data.in_buff_start; - auto sz = child_handle->Read(stream_data.in_buff.get(), stream_data.in_buf_size); + auto sz = child_handle->Read(context, stream_data.in_buff.get(), stream_data.in_buf_size); if (sz <= 0) { stream_wrapper.reset(); break; @@ -131,7 +132,7 @@ void CompressedFile::Close() { int64_t CompressedFileSystem::Read(FileHandle &handle, void *buffer, int64_t nr_bytes) { auto &compressed_file = handle.Cast(); - return compressed_file.ReadData(buffer, nr_bytes); + return compressed_file.ReadData(QueryContext(), buffer, nr_bytes); } int64_t CompressedFileSystem::Write(FileHandle &handle, void *buffer, int64_t nr_bytes) { @@ -142,7 +143,7 @@ int64_t CompressedFileSystem::Write(FileHandle &handle, void *buffer, int64_t nr void CompressedFileSystem::Reset(FileHandle &handle) { auto &compressed_file = handle.Cast(); compressed_file.child_handle->Reset(); - compressed_file.Initialize(compressed_file.write); + compressed_file.Initialize(QueryContext(), compressed_file.write); } int64_t CompressedFileSystem::GetFileSize(FileHandle &handle) { diff --git a/src/duckdb/src/common/file_system.cpp b/src/duckdb/src/common/file_system.cpp index a24d92206..a163378e3 100644 --- a/src/duckdb/src/common/file_system.cpp +++ b/src/duckdb/src/common/file_system.cpp @@ -62,6 +62,7 @@ constexpr FileOpenFlags FileFlags::FILE_FLAGS_NULL_IF_NOT_EXISTS; constexpr FileOpenFlags FileFlags::FILE_FLAGS_PARALLEL_ACCESS; constexpr FileOpenFlags FileFlags::FILE_FLAGS_EXCLUSIVE_CREATE; constexpr 
FileOpenFlags FileFlags::FILE_FLAGS_NULL_IF_EXISTS; +constexpr FileOpenFlags FileFlags::FILE_FLAGS_MULTI_CLIENT_ACCESS; void FileOpenFlags::Verify() { #ifdef DEBUG @@ -672,7 +673,7 @@ bool FileSystem::IsManuallySet() { return false; } -unique_ptr FileSystem::OpenCompressedFile(unique_ptr handle, bool write) { +unique_ptr FileSystem::OpenCompressedFile(QueryContext context, unique_ptr handle, bool write) { throw NotImplementedException("%s: OpenCompressedFile is not implemented!", GetName()); } @@ -692,6 +693,11 @@ int64_t FileHandle::Read(void *buffer, idx_t nr_bytes) { return file_system.Read(*this, buffer, UnsafeNumericCast(nr_bytes)); } +int64_t FileHandle::Read(QueryContext context, void *buffer, idx_t nr_bytes) { + // FIXME: Add profiling. + return file_system.Read(*this, buffer, UnsafeNumericCast(nr_bytes)); +} + bool FileHandle::Trim(idx_t offset_bytes, idx_t length_bytes) { return file_system.Trim(*this, offset_bytes, length_bytes); } @@ -752,6 +758,20 @@ string FileHandle::ReadLine() { } } +string FileHandle::ReadLine(QueryContext context) { + string result; + char buffer[1]; + while (true) { + auto tuples_read = UnsafeNumericCast(Read(context, buffer, 1)); + if (tuples_read == 0 || buffer[0] == '\n') { + return result; + } + if (buffer[0] != '\r') { + result += buffer[0]; + } + } +} + bool FileHandle::OnDiskFile() { return file_system.OnDiskFile(*this); } diff --git a/src/duckdb/src/common/gzip_file_system.cpp b/src/duckdb/src/common/gzip_file_system.cpp index ffb07eb5c..92b4e10d2 100644 --- a/src/duckdb/src/common/gzip_file_system.cpp +++ b/src/duckdb/src/common/gzip_file_system.cpp @@ -7,6 +7,7 @@ #include "miniz_wrapper.hpp" #include "duckdb/common/limits.hpp" +#include "duckdb/main/client_context.hpp" namespace duckdb { @@ -53,10 +54,10 @@ namespace duckdb { */ -static idx_t GZipConsumeString(FileHandle &input) { +static idx_t GZipConsumeString(QueryContext context, FileHandle &input) { idx_t size = 1; // terminator char buffer[1]; - while 
(input.Read(buffer, 1) == 1) { + while (input.Read(context, buffer, 1) == 1) { if (buffer[0] == '\0') { break; } @@ -75,7 +76,7 @@ struct MiniZStreamWrapper : public StreamWrapper { idx_t total_size; public: - void Initialize(CompressedFile &file, bool write) override; + void Initialize(QueryContext context, CompressedFile &file, bool write) override; bool Read(StreamData &stream_data) override; void Write(CompressedFile &file, StreamData &stream_data, data_ptr_t buffer, int64_t nr_bytes) override; @@ -96,7 +97,7 @@ MiniZStreamWrapper::~MiniZStreamWrapper() { } } -void MiniZStreamWrapper::Initialize(CompressedFile &file, bool write) { +void MiniZStreamWrapper::Initialize(QueryContext context, CompressedFile &file, bool write) { Close(); this->file = &file; mz_stream_ptr = make_uniq(); @@ -119,20 +120,20 @@ void MiniZStreamWrapper::Initialize(CompressedFile &file, bool write) { } } else { idx_t data_start = GZIP_HEADER_MINSIZE; - auto read_count = file.child_handle->Read(gzip_hdr, GZIP_HEADER_MINSIZE); + auto read_count = file.child_handle->Read(context, gzip_hdr, GZIP_HEADER_MINSIZE); GZipFileSystem::VerifyGZIPHeader(gzip_hdr, NumericCast(read_count), &file); // Skip over the extra field if necessary if (gzip_hdr[3] & GZIP_FLAG_EXTRA) { uint8_t gzip_xlen[2]; file.child_handle->Seek(data_start); - file.child_handle->Read(gzip_xlen, 2); + file.child_handle->Read(context, gzip_xlen, 2); auto xlen = NumericCast((uint8_t)gzip_xlen[0] | (uint8_t)gzip_xlen[1] << 8); data_start += xlen + 2; } // Skip over the file name if necessary if (gzip_hdr[3] & GZIP_FLAG_NAME) { file.child_handle->Seek(data_start); - data_start += GZipConsumeString(*file.child_handle); + data_start += GZipConsumeString(context, *file.child_handle); } file.child_handle->Seek(data_start); // stream is now set to beginning of payload data @@ -296,9 +297,9 @@ void MiniZStreamWrapper::Close() { class GZipFile : public CompressedFile { public: - GZipFile(unique_ptr child_handle_p, const string &path, bool 
write) + GZipFile(QueryContext context, unique_ptr child_handle_p, const string &path, bool write) : CompressedFile(gzip_fs, std::move(child_handle_p), path) { - Initialize(write); + Initialize(context, write); } FileCompressionType GetFileCompressionType() override { return FileCompressionType::GZIP; @@ -407,9 +408,10 @@ string GZipFileSystem::UncompressGZIPString(const char *data, idx_t size) { return decompressed; } -unique_ptr GZipFileSystem::OpenCompressedFile(unique_ptr handle, bool write) { +unique_ptr GZipFileSystem::OpenCompressedFile(QueryContext context, unique_ptr handle, + bool write) { auto path = handle->path; - return make_uniq(std::move(handle), path, write); + return make_uniq(context, std::move(handle), path, write); } unique_ptr GZipFileSystem::CreateStream() { diff --git a/src/duckdb/src/common/local_file_system.cpp b/src/duckdb/src/common/local_file_system.cpp index 1f246d241..8733e0162 100644 --- a/src/duckdb/src/common/local_file_system.cpp +++ b/src/duckdb/src/common/local_file_system.cpp @@ -238,7 +238,7 @@ static string AdditionalProcessInfo(FileSystem &fs, pid_t pid) { try { auto cmdline_file = fs.OpenFile(StringUtil::Format("/proc/%d/cmdline", pid), FileFlags::FILE_FLAGS_READ); - auto cmdline = cmdline_file->ReadLine(); + auto cmdline = cmdline_file->ReadLine(QueryContext()); process_name = basename(const_cast(cmdline.c_str())); // NOLINT: old C API does not take const } catch (std::exception &) { // ignore @@ -258,7 +258,7 @@ static string AdditionalProcessInfo(FileSystem &fs, pid_t pid) { // try to find out who created that process try { auto loginuid_file = fs.OpenFile(StringUtil::Format("/proc/%d/loginuid", pid), FileFlags::FILE_FLAGS_READ); - auto uid = std::stoi(loginuid_file->ReadLine()); + auto uid = std::stoi(loginuid_file->ReadLine(QueryContext())); auto pw = getpwuid(uid); if (pw) { process_owner = pw->pw_name; diff --git a/src/duckdb/src/common/pipe_file_system.cpp b/src/duckdb/src/common/pipe_file_system.cpp index 
3345e4987..dc9b7d108 100644 --- a/src/duckdb/src/common/pipe_file_system.cpp +++ b/src/duckdb/src/common/pipe_file_system.cpp @@ -3,13 +3,15 @@ #include "duckdb/common/file_system.hpp" #include "duckdb/common/helper.hpp" #include "duckdb/common/numeric_utils.hpp" +#include "duckdb/main/client_context.hpp" namespace duckdb { + class PipeFile : public FileHandle { public: - explicit PipeFile(unique_ptr child_handle_p) + explicit PipeFile(QueryContext context_p, unique_ptr child_handle_p) : FileHandle(pipe_fs, child_handle_p->path, child_handle_p->GetFlags()), - child_handle(std::move(child_handle_p)) { + child_handle(std::move(child_handle_p)), context(context_p) { } PipeFileSystem pipe_fs; @@ -21,10 +23,13 @@ class PipeFile : public FileHandle { void Close() override { } + +private: + QueryContext context; }; int64_t PipeFile::ReadChunk(void *buffer, int64_t nr_bytes) { - return child_handle->Read(buffer, UnsafeNumericCast(nr_bytes)); + return child_handle->Read(context, buffer, UnsafeNumericCast(nr_bytes)); } int64_t PipeFile::WriteChunk(void *buffer, int64_t nr_bytes) { return child_handle->Write(buffer, UnsafeNumericCast(nr_bytes)); @@ -51,8 +56,8 @@ int64_t PipeFileSystem::GetFileSize(FileHandle &handle) { void PipeFileSystem::FileSync(FileHandle &handle) { } -unique_ptr PipeFileSystem::OpenPipe(unique_ptr handle) { - return make_uniq(std::move(handle)); +unique_ptr PipeFileSystem::OpenPipe(QueryContext context, unique_ptr handle) { + return make_uniq(context, std::move(handle)); } } // namespace duckdb diff --git a/src/duckdb/src/common/types/vector.cpp b/src/duckdb/src/common/types/vector.cpp index 3beae89eb..cad5da707 100644 --- a/src/duckdb/src/common/types/vector.cpp +++ b/src/duckdb/src/common/types/vector.cpp @@ -95,7 +95,8 @@ Vector::Vector(const Value &value) : type(value.type()) { Vector::Vector(Vector &&other) noexcept : vector_type(other.vector_type), type(std::move(other.type)), data(other.data), - validity(std::move(other.validity)), 
buffer(std::move(other.buffer)), auxiliary(std::move(other.auxiliary)) { + validity(std::move(other.validity)), buffer(std::move(other.buffer)), auxiliary(std::move(other.auxiliary)), + cached_hashes(std::move(other.cached_hashes)) { } void Vector::Reference(const Value &value) { @@ -168,6 +169,7 @@ void Vector::Reinterpret(const Vector &other) { auxiliary = make_shared_ptr(std::move(new_vector)); } else { AssignSharedPointer(auxiliary, other.auxiliary); + AssignSharedPointer(cached_hashes, other.cached_hashes); } data = other.data; validity = other.validity; @@ -272,6 +274,7 @@ void Vector::Slice(const SelectionVector &sel, idx_t count) { vector_type = VectorType::DICTIONARY_VECTOR; buffer = std::move(dict_buffer); auxiliary = std::move(child_ref); + cached_hashes.reset(); } void Vector::Dictionary(idx_t dictionary_size, const SelectionVector &sel, idx_t count) { @@ -281,7 +284,12 @@ void Vector::Dictionary(idx_t dictionary_size, const SelectionVector &sel, idx_t } } -void Vector::Dictionary(const Vector &dict, idx_t dictionary_size, const SelectionVector &sel, idx_t count) { +void Vector::Dictionary(Vector &dict, idx_t dictionary_size, const SelectionVector &sel, idx_t count) { + if (DictionaryVector::CanCacheHashes(dict.GetType()) && !dict.cached_hashes) { + // Create an empty hash vector for this dictionary, potentially to be used for caching hashes later + // This needs to happen here, as we need to add "cached_hashes" to the original input Vector "dict" + dict.cached_hashes = make_buffer(Vector(LogicalType::HASH, false, false, 0)); + } Reference(dict); Dictionary(dictionary_size, sel, count); } @@ -1888,6 +1896,22 @@ void Vector::DebugShuffleNestedVector(Vector &vector, idx_t count) { } } +//===--------------------------------------------------------------------===// +// DictionaryVector +//===--------------------------------------------------------------------===// +const Vector &DictionaryVector::GetCachedHashes(Vector &input) { + 
D_ASSERT(CanCacheHashes(input)); + auto &dictionary = Child(input); + auto &dictionary_hashes = dictionary.cached_hashes->Cast().data; + if (!dictionary_hashes.data) { + // Uninitialized: hash the dictionary + const auto dictionary_count = DictionarySize(input).GetIndex(); + dictionary_hashes.Initialize(false, dictionary_count); + VectorOperations::Hash(dictionary, dictionary_hashes, dictionary_count); + } + return dictionary.cached_hashes->Cast().data; +} + //===--------------------------------------------------------------------===// // FlatVector //===--------------------------------------------------------------------===// diff --git a/src/duckdb/src/common/vector_operations/vector_hash.cpp b/src/duckdb/src/common/vector_operations/vector_hash.cpp index 83197e28b..062ee1719 100644 --- a/src/duckdb/src/common/vector_operations/vector_hash.cpp +++ b/src/duckdb/src/common/vector_operations/vector_hash.cpp @@ -22,38 +22,53 @@ struct HashOp { } }; +struct CachedHashOp { + template + static hash_t Operation(T) { + throw InternalException("CachedHashOp::Operation called on non-hash input"); + } +}; + +template <> +hash_t CachedHashOp::Operation(hash_t input) { + return input; +} + hash_t CombineHashScalar(hash_t a, hash_t b) { a ^= a >> 32; a *= 0xd6e8feb86659fd93U; return a ^ b; } -template +template void TightLoopHash(const T *__restrict ldata, hash_t *__restrict result_data, const SelectionVector *rsel, idx_t count, const SelectionVector *__restrict sel_vector, const ValidityMask &mask) { if (!mask.AllValid()) { for (idx_t i = 0; i < count; i++) { auto ridx = HAS_RSEL ? rsel->get_index_unsafe(i) : i; auto idx = HAS_SEL_VECTOR ? sel_vector->get_index_unsafe(ridx) : ridx; - result_data[ridx] = HashOp::Operation(ldata[idx], !mask.RowIsValidUnsafe(idx)); + result_data[ridx] = INPUT_IS_ALREADY_HASH ? CachedHashOp::Operation(ldata[idx]) + : HashOp::Operation(ldata[idx], !mask.RowIsValidUnsafe(idx)); } } else { for (idx_t i = 0; i < count; i++) { auto ridx = HAS_RSEL ? 
rsel->get_index_unsafe(i) : i; auto idx = HAS_SEL_VECTOR ? sel_vector->get_index_unsafe(ridx) : ridx; - result_data[ridx] = duckdb::Hash(ldata[idx]); + result_data[ridx] = + INPUT_IS_ALREADY_HASH ? CachedHashOp::Operation(ldata[idx]) : duckdb::Hash(ldata[idx]); } } } -template +template void TemplatedLoopHash(Vector &input, Vector &result, const SelectionVector *rsel, idx_t count) { if (input.GetVectorType() == VectorType::CONSTANT_VECTOR) { result.SetVectorType(VectorType::CONSTANT_VECTOR); auto ldata = ConstantVector::GetData(input); auto result_data = ConstantVector::GetData(result); - *result_data = HashOp::Operation(*ldata, ConstantVector::IsNull(input)); + *result_data = INPUT_IS_ALREADY_HASH ? CachedHashOp::Operation(*ldata) + : HashOp::Operation(*ldata, ConstantVector::IsNull(input)); } else { result.SetVectorType(VectorType::FLAT_VECTOR); @@ -61,13 +76,13 @@ void TemplatedLoopHash(Vector &input, Vector &result, const SelectionVector *rse input.ToUnifiedFormat(count, idata); if (idata.sel->IsSet()) { - TightLoopHash(UnifiedVectorFormat::GetData(idata), - FlatVector::GetData(result), rsel, count, idata.sel, - idata.validity); + TightLoopHash(UnifiedVectorFormat::GetData(idata), + FlatVector::GetData(result), rsel, count, + idata.sel, idata.validity); } else { - TightLoopHash(UnifiedVectorFormat::GetData(idata), - FlatVector::GetData(result), rsel, count, idata.sel, - idata.validity); + TightLoopHash(UnifiedVectorFormat::GetData(idata), + FlatVector::GetData(result), rsel, count, + idata.sel, idata.validity); } } } @@ -324,7 +339,7 @@ void HashTypeSwitch(Vector &input, Vector &result, const SelectionVector *rsel, } } -template +template void TightLoopCombineHashConstant(const T *__restrict ldata, hash_t constant_hash, hash_t *__restrict hash_data, const SelectionVector *rsel, idx_t count, const SelectionVector *__restrict sel_vector, ValidityMask &mask) { @@ -332,20 +347,21 @@ void TightLoopCombineHashConstant(const T *__restrict ldata, hash_t constant_has 
for (idx_t i = 0; i < count; i++) { auto ridx = HAS_RSEL ? rsel->get_index(i) : i; auto idx = sel_vector->get_index(ridx); - auto other_hash = HashOp::Operation(ldata[idx], !mask.RowIsValid(idx)); + auto other_hash = INPUT_IS_ALREADY_HASH ? CachedHashOp::Operation(ldata[idx]) + : HashOp::Operation(ldata[idx], !mask.RowIsValid(idx)); hash_data[ridx] = CombineHashScalar(constant_hash, other_hash); } } else { for (idx_t i = 0; i < count; i++) { auto ridx = HAS_RSEL ? rsel->get_index(i) : i; auto idx = sel_vector->get_index(ridx); - auto other_hash = duckdb::Hash(ldata[idx]); + auto other_hash = INPUT_IS_ALREADY_HASH ? CachedHashOp::Operation(ldata[idx]) : duckdb::Hash(ldata[idx]); hash_data[ridx] = CombineHashScalar(constant_hash, other_hash); } } } -template +template static inline void TightLoopCombineHash(const T *__restrict ldata, hash_t *__restrict const hash_data, const SelectionVector *__restrict const rsel, const idx_t count, const SelectionVector *__restrict const sel_vector, const ValidityMask &mask) { @@ -353,26 +369,28 @@ static inline void TightLoopCombineHash(const T *__restrict ldata, hash_t *__res for (idx_t i = 0; i < count; i++) { auto ridx = HAS_RSEL ? rsel->get_index_unsafe(i) : i; auto idx = HAS_SEL ? sel_vector->get_index_unsafe(ridx) : ridx; - auto other_hash = HashOp::Operation(ldata[idx], !mask.RowIsValidUnsafe(idx)); + auto other_hash = INPUT_IS_ALREADY_HASH ? CachedHashOp::Operation(ldata[idx]) + : HashOp::Operation(ldata[idx], !mask.RowIsValid(idx)); hash_data[ridx] = CombineHashScalar(hash_data[ridx], other_hash); } } else { for (idx_t i = 0; i < count; i++) { auto ridx = HAS_RSEL ? rsel->get_index_unsafe(i) : i; auto idx = HAS_SEL ? sel_vector->get_index_unsafe(ridx) : ridx; - auto other_hash = duckdb::Hash(ldata[idx]); + auto other_hash = INPUT_IS_ALREADY_HASH ? 
CachedHashOp::Operation(ldata[idx]) : duckdb::Hash(ldata[idx]); hash_data[ridx] = CombineHashScalar(hash_data[ridx], other_hash); } } } -template +template void TemplatedLoopCombineHash(Vector &input, Vector &hashes, const SelectionVector *rsel, idx_t count) { if (input.GetVectorType() == VectorType::CONSTANT_VECTOR && hashes.GetVectorType() == VectorType::CONSTANT_VECTOR) { auto ldata = ConstantVector::GetData(input); auto hash_data = ConstantVector::GetData(hashes); - auto other_hash = HashOp::Operation(*ldata, ConstantVector::IsNull(input)); + auto other_hash = INPUT_IS_ALREADY_HASH ? CachedHashOp::Operation(*ldata) + : HashOp::Operation(*ldata, ConstantVector::IsNull(input)); *hash_data = CombineHashScalar(*hash_data, other_hash); } else { UnifiedVectorFormat idata; @@ -382,19 +400,19 @@ void TemplatedLoopCombineHash(Vector &input, Vector &hashes, const SelectionVect auto constant_hash = *ConstantVector::GetData(hashes); // now re-initialize the hashes vector to an empty flat vector hashes.SetVectorType(VectorType::FLAT_VECTOR); - TightLoopCombineHashConstant(UnifiedVectorFormat::GetData(idata), constant_hash, - FlatVector::GetData(hashes), rsel, count, idata.sel, - idata.validity); + TightLoopCombineHashConstant( + UnifiedVectorFormat::GetData(idata), constant_hash, FlatVector::GetData(hashes), rsel, count, + idata.sel, idata.validity); } else { D_ASSERT(hashes.GetVectorType() == VectorType::FLAT_VECTOR); if (idata.sel->IsSet()) { - TightLoopCombineHash(UnifiedVectorFormat::GetData(idata), - FlatVector::GetData(hashes), rsel, count, idata.sel, - idata.validity); + TightLoopCombineHash(UnifiedVectorFormat::GetData(idata), + FlatVector::GetData(hashes), + rsel, count, idata.sel, idata.validity); } else { - TightLoopCombineHash(UnifiedVectorFormat::GetData(idata), - FlatVector::GetData(hashes), rsel, count, idata.sel, - idata.validity); + TightLoopCombineHash(UnifiedVectorFormat::GetData(idata), + FlatVector::GetData(hashes), + rsel, count, idata.sel, 
idata.validity); } } } @@ -464,19 +482,39 @@ void CombineHashTypeSwitch(Vector &hashes, Vector &input, const SelectionVector } // namespace void VectorOperations::Hash(Vector &input, Vector &result, idx_t count) { - HashTypeSwitch(input, result, nullptr, count); + if (input.GetVectorType() == VectorType::DICTIONARY_VECTOR && DictionaryVector::CanCacheHashes(input)) { + VectorOperations::Copy(DictionaryVector::GetCachedHashes(input), result, DictionaryVector::SelVector(input), + count, 0, 0); + } else { + HashTypeSwitch(input, result, nullptr, count); + } } void VectorOperations::Hash(Vector &input, Vector &result, const SelectionVector &sel, idx_t count) { - HashTypeSwitch(input, result, &sel, count); + if (input.GetVectorType() == VectorType::DICTIONARY_VECTOR && DictionaryVector::CanCacheHashes(input)) { + Vector input_hashes(DictionaryVector::GetCachedHashes(input), DictionaryVector::SelVector(input), count); + TemplatedLoopHash(input_hashes, result, &sel, count); + } else { + HashTypeSwitch(input, result, &sel, count); + } } void VectorOperations::CombineHash(Vector &hashes, Vector &input, idx_t count) { - CombineHashTypeSwitch(hashes, input, nullptr, count); + if (input.GetVectorType() == VectorType::DICTIONARY_VECTOR && DictionaryVector::CanCacheHashes(input)) { + Vector input_hashes(DictionaryVector::GetCachedHashes(input), DictionaryVector::SelVector(input), count); + TemplatedLoopCombineHash(input_hashes, hashes, nullptr, count); + } else { + CombineHashTypeSwitch(hashes, input, nullptr, count); + } } void VectorOperations::CombineHash(Vector &hashes, Vector &input, const SelectionVector &rsel, idx_t count) { - CombineHashTypeSwitch(hashes, input, &rsel, count); + if (input.GetVectorType() == VectorType::DICTIONARY_VECTOR && DictionaryVector::CanCacheHashes(input)) { + Vector input_hashes(DictionaryVector::GetCachedHashes(input), DictionaryVector::SelVector(input), count); + TemplatedLoopCombineHash(input_hashes, hashes, &rsel, count); + } else { + 
CombineHashTypeSwitch(hashes, input, &rsel, count); + } } } // namespace duckdb diff --git a/src/duckdb/src/common/virtual_file_system.cpp b/src/duckdb/src/common/virtual_file_system.cpp index 472248850..559e140f5 100644 --- a/src/duckdb/src/common/virtual_file_system.cpp +++ b/src/duckdb/src/common/virtual_file_system.cpp @@ -1,7 +1,10 @@ #include "duckdb/common/virtual_file_system.hpp" + +#include "duckdb/common/file_opener.hpp" #include "duckdb/common/gzip_file_system.hpp" #include "duckdb/common/pipe_file_system.hpp" #include "duckdb/common/string_util.hpp" +#include "duckdb/main/client_context.hpp" namespace duckdb { @@ -36,8 +39,10 @@ unique_ptr VirtualFileSystem::OpenFileExtended(const OpenFileInfo &f if (!file_handle) { return nullptr; } + + const auto context = !flags.MultiClientAccess() ? FileOpener::TryGetClientContext(opener) : QueryContext(); if (file_handle->GetType() == FileType::FILE_TYPE_FIFO) { - file_handle = PipeFileSystem::OpenPipe(std::move(file_handle)); + file_handle = PipeFileSystem::OpenPipe(context, std::move(file_handle)); } else if (compression != FileCompressionType::UNCOMPRESSED) { auto entry = compressed_fs.find(compression); if (entry == compressed_fs.end()) { @@ -49,7 +54,7 @@ unique_ptr VirtualFileSystem::OpenFileExtended(const OpenFileInfo &f throw NotImplementedException( "Attempting to open a compressed file, but the compression type is not supported"); } - file_handle = entry->second->OpenCompressedFile(std::move(file_handle), flags.OpenForWriting()); + file_handle = entry->second->OpenCompressedFile(context, std::move(file_handle), flags.OpenForWriting()); } return file_handle; } diff --git a/src/duckdb/src/execution/index/art/node256.cpp b/src/duckdb/src/execution/index/art/node256.cpp index f5ff96643..84cf40989 100644 --- a/src/duckdb/src/execution/index/art/node256.cpp +++ b/src/duckdb/src/execution/index/art/node256.cpp @@ -20,7 +20,7 @@ void Node256::DeleteChild(ART &art, Node &node, const uint8_t byte) { 
Node::FreeTree(art, n.children[byte]); n.count--; - if (n.count >= SHRINK_THRESHOLD) { + if (n.count > SHRINK_THRESHOLD) { return; } } diff --git a/src/duckdb/src/execution/operator/csv_scanner/buffer_manager/csv_buffer.cpp b/src/duckdb/src/execution/operator/csv_scanner/buffer_manager/csv_buffer.cpp index bcb8306e6..5c09dd0d6 100644 --- a/src/duckdb/src/execution/operator/csv_scanner/buffer_manager/csv_buffer.cpp +++ b/src/duckdb/src/execution/operator/csv_scanner/buffer_manager/csv_buffer.cpp @@ -1,5 +1,6 @@ #include "duckdb/execution/operator/csv_scanner/csv_buffer.hpp" #include "duckdb/common/string_util.hpp" +#include "duckdb/main/client_context.hpp" namespace duckdb { diff --git a/src/duckdb/src/execution/operator/csv_scanner/buffer_manager/csv_file_handle.cpp b/src/duckdb/src/execution/operator/csv_scanner/buffer_manager/csv_file_handle.cpp index 5c39deab4..2d4b5388f 100644 --- a/src/duckdb/src/execution/operator/csv_scanner/buffer_manager/csv_file_handle.cpp +++ b/src/duckdb/src/execution/operator/csv_scanner/buffer_manager/csv_file_handle.cpp @@ -7,10 +7,10 @@ namespace duckdb { -CSVFileHandle::CSVFileHandle(ClientContext &context, unique_ptr file_handle_p, const OpenFileInfo &file_p, +CSVFileHandle::CSVFileHandle(ClientContext &context_p, unique_ptr file_handle_p, const OpenFileInfo &file_p, const CSVReaderOptions &options) - : compression_type(options.compression), file_handle(std::move(file_handle_p)), - encoder(context, options.encoding, options.buffer_size_option.GetValue()), file(file_p) { + : compression_type(options.compression), context(context_p), file_handle(std::move(file_handle_p)), + encoder(context_p, options.encoding, options.buffer_size_option.GetValue()), file(file_p) { can_seek = file_handle->CanSeek(); on_disk_file = file_handle->OnDiskFile(); file_size = file_handle->GetFileSize(); @@ -80,7 +80,7 @@ idx_t CSVFileHandle::Read(void *buffer, idx_t nr_bytes) { // if this is a plain file source OR we can seek we are not caching anything 
idx_t bytes_read = 0; if (encoder.encoding_name == "utf-8") { - bytes_read = static_cast(file_handle->Read(buffer, nr_bytes)); + bytes_read = static_cast(file_handle->Read(context, buffer, nr_bytes)); } else { bytes_read = encoder.Encode(*file_handle, static_cast(buffer), nr_bytes); } diff --git a/src/duckdb/src/execution/operator/csv_scanner/encode/csv_encoder.cpp b/src/duckdb/src/execution/operator/csv_scanner/encode/csv_encoder.cpp index 17fe9837c..1a06cbf0f 100644 --- a/src/duckdb/src/execution/operator/csv_scanner/encode/csv_encoder.cpp +++ b/src/duckdb/src/execution/operator/csv_scanner/encode/csv_encoder.cpp @@ -1,4 +1,5 @@ #include "duckdb/execution/operator/csv_scanner/encode/csv_encoder.hpp" + #include "duckdb/common/exception.hpp" #include "duckdb/main/config.hpp" #include "duckdb/function/encoding_function.hpp" @@ -37,14 +38,14 @@ void CSVEncoderBuffer::Reset() { actual_encoded_buffer_size = 0; } -CSVEncoder::CSVEncoder(ClientContext &context, const string &encoding_name_to_find, idx_t buffer_size) - : pass_on_byte(0) { - auto &config = DBConfig::GetConfig(context); +CSVEncoder::CSVEncoder(ClientContext &context_p, const string &encoding_name_to_find, idx_t buffer_size) + : context(context_p), pass_on_byte(0) { + auto &config = DBConfig::GetConfig(context_p); encoding_name = StringUtil::Lower(encoding_name_to_find); auto function = config.GetEncodeFunction(encoding_name_to_find); if (!function) { // Maybe we can try to auto-load from our encodings extension, if this somehow fails, we just error. 
- if (Catalog::TryAutoLoad(context, "encodings")) { + if (Catalog::TryAutoLoad(context_p, "encodings")) { // If it successfully loaded, we can try to get our function again function = config.GetEncodeFunction(encoding_name_to_find); } @@ -54,7 +55,7 @@ CSVEncoder::CSVEncoder(ClientContext &context, const string &encoding_name_to_fi auto loaded_encodings = config.GetLoadedEncodedFunctions(); std::ostringstream error; error << "The CSV Reader does not support the encoding: \"" << encoding_name_to_find << "\"\n"; - if (!context.db->ExtensionIsLoaded("encodings")) { + if (!context_p.db->ExtensionIsLoaded("encodings")) { error << "It is possible that the encoding exists in the encodings extension. You can try \"INSTALL " "encodings; LOAD encodings\"" << "\n"; @@ -117,14 +118,14 @@ idx_t CSVEncoder::Encode(FileHandle &file_handle_input, char *output_buffer, con encoded_buffer.Ptr()[pass_on_buffer.size()] = pass_on_byte; } auto actual_encoded_bytes = static_cast( - file_handle_input.Read(encoded_buffer.Ptr() + pass_on_buffer.size() + has_pass_on_byte, + file_handle_input.Read(context, encoded_buffer.Ptr() + pass_on_buffer.size() + has_pass_on_byte, encoded_buffer.GetCapacity() - pass_on_buffer.size() - has_pass_on_byte)); encoded_buffer.SetSize(actual_encoded_bytes + pass_on_buffer.size() + has_pass_on_byte); if (actual_encoded_bytes < encoded_buffer.GetCapacity() - pass_on_buffer.size()) { encoded_buffer.last_buffer = true; has_pass_on_byte = false; } else { - auto bytes_read = static_cast(file_handle_input.Read(&pass_on_byte, 1)); + auto bytes_read = static_cast(file_handle_input.Read(context, &pass_on_byte, 1)); if (bytes_read == 0) { encoded_buffer.last_buffer = true; has_pass_on_byte = false; diff --git a/src/duckdb/src/function/cast_rules.cpp b/src/duckdb/src/function/cast_rules.cpp index 5ab775eea..531572ca0 100644 --- a/src/duckdb/src/function/cast_rules.cpp +++ b/src/duckdb/src/function/cast_rules.cpp @@ -360,6 +360,10 @@ int64_t CastRules::ImplicitCast(const 
LogicalType &from, const LogicalType &to) // NULL expression can be cast to anything return TargetTypeCost(to); } + if (from.id() == LogicalTypeId::ANY && to.IsTemplated()) { + // This can happen when changing a function from using ANY to using TEMPLATE. + return TargetTypeCost(to); + } if (from.id() == LogicalTypeId::UNKNOWN) { // parameter expression can be cast to anything for no cost return 0; diff --git a/src/duckdb/src/function/table/read_file.cpp b/src/duckdb/src/function/table/read_file.cpp index 6be5618e5..1a4fb1da2 100644 --- a/src/duckdb/src/function/table/read_file.cpp +++ b/src/duckdb/src/function/table/read_file.cpp @@ -180,7 +180,7 @@ static void ReadFileExecute(ClientContext &context, TableFunctionInput &input, D } else { // Local file: non-caching read actually_read = NumericCast(file_handle->GetFileHandle().Read( - content_string_ptr, UnsafeNumericCast(bytes_to_read))); + QueryContext(context), content_string_ptr, UnsafeNumericCast(bytes_to_read))); } if (actually_read == 0) { diff --git a/src/duckdb/src/function/table/version/pragma_version.cpp b/src/duckdb/src/function/table/version/pragma_version.cpp index 976b568b1..ab430d63c 100644 --- a/src/duckdb/src/function/table/version/pragma_version.cpp +++ b/src/duckdb/src/function/table/version/pragma_version.cpp @@ -1,5 +1,5 @@ #ifndef DUCKDB_PATCH_VERSION -#define DUCKDB_PATCH_VERSION "0-dev2825" +#define DUCKDB_PATCH_VERSION "0-dev2912" #endif #ifndef DUCKDB_MINOR_VERSION #define DUCKDB_MINOR_VERSION 4 @@ -8,10 +8,10 @@ #define DUCKDB_MAJOR_VERSION 1 #endif #ifndef DUCKDB_VERSION -#define DUCKDB_VERSION "v1.4.0-dev2825" +#define DUCKDB_VERSION "v1.4.0-dev2912" #endif #ifndef DUCKDB_SOURCE_ID -#define DUCKDB_SOURCE_ID "3483d12aab" +#define DUCKDB_SOURCE_ID "2ed9bf887f" #endif #include "duckdb/function/table/system_functions.hpp" #include "duckdb/main/database.hpp" diff --git a/src/duckdb/src/include/duckdb/common/compressed_file_system.hpp 
b/src/duckdb/src/include/duckdb/common/compressed_file_system.hpp index ff6cffab8..3ebc3834a 100644 --- a/src/duckdb/src/include/duckdb/common/compressed_file_system.hpp +++ b/src/duckdb/src/include/duckdb/common/compressed_file_system.hpp @@ -32,7 +32,7 @@ struct StreamData { struct StreamWrapper { DUCKDB_API virtual ~StreamWrapper(); - DUCKDB_API virtual void Initialize(CompressedFile &file, bool write) = 0; + DUCKDB_API virtual void Initialize(QueryContext context, CompressedFile &file, bool write) = 0; DUCKDB_API virtual bool Read(StreamData &stream_data) = 0; DUCKDB_API virtual void Write(CompressedFile &file, StreamData &stream_data, data_ptr_t buffer, int64_t nr_bytes) = 0; @@ -70,8 +70,8 @@ class CompressedFile : public FileHandle { StreamData stream_data; public: - DUCKDB_API void Initialize(bool write); - DUCKDB_API int64_t ReadData(void *buffer, int64_t nr_bytes); + DUCKDB_API void Initialize(QueryContext context, bool write); + DUCKDB_API int64_t ReadData(QueryContext context, void *buffer, int64_t nr_bytes); DUCKDB_API int64_t WriteData(data_ptr_t buffer, int64_t nr_bytes); DUCKDB_API void Close() override; diff --git a/src/duckdb/src/include/duckdb/common/file_open_flags.hpp b/src/duckdb/src/include/duckdb/common/file_open_flags.hpp index b469e338c..4dd14af80 100644 --- a/src/duckdb/src/include/duckdb/common/file_open_flags.hpp +++ b/src/duckdb/src/include/duckdb/common/file_open_flags.hpp @@ -28,6 +28,7 @@ class FileOpenFlags { static constexpr idx_t FILE_FLAGS_PARALLEL_ACCESS = idx_t(1 << 8); static constexpr idx_t FILE_FLAGS_EXCLUSIVE_CREATE = idx_t(1 << 9); static constexpr idx_t FILE_FLAGS_NULL_IF_EXISTS = idx_t(1 << 10); + static constexpr idx_t FILE_FLAGS_MULTI_CLIENT_ACCESS = idx_t(1 << 11); public: FileOpenFlags() = default; @@ -107,6 +108,9 @@ class FileOpenFlags { inline bool ReturnNullIfExists() const { return flags & FILE_FLAGS_NULL_IF_EXISTS; } + inline bool MultiClientAccess() const { + return flags & FILE_FLAGS_MULTI_CLIENT_ACCESS; + } 
inline idx_t GetFlagsInternal() const { return flags; } @@ -145,6 +149,9 @@ class FileFlags { FileOpenFlags(FileOpenFlags::FILE_FLAGS_EXCLUSIVE_CREATE); //! Return NULL if the file exist instead of throwing an error static constexpr FileOpenFlags FILE_FLAGS_NULL_IF_EXISTS = FileOpenFlags(FileOpenFlags::FILE_FLAGS_NULL_IF_EXISTS); + //! Multiple clients may access the file at the same time + static constexpr FileOpenFlags FILE_FLAGS_MULTI_CLIENT_ACCESS = + FileOpenFlags(FileOpenFlags::FILE_FLAGS_MULTI_CLIENT_ACCESS); }; } // namespace duckdb diff --git a/src/duckdb/src/include/duckdb/common/file_system.hpp b/src/duckdb/src/include/duckdb/common/file_system.hpp index 97e7691d2..3489c5875 100644 --- a/src/duckdb/src/include/duckdb/common/file_system.hpp +++ b/src/duckdb/src/include/duckdb/common/file_system.hpp @@ -64,6 +64,7 @@ struct FileHandle { // Read at [nr_bytes] bytes into [buffer], and return the bytes actually read. // File offset will be changed, which advances for number of bytes read. DUCKDB_API int64_t Read(void *buffer, idx_t nr_bytes); + DUCKDB_API int64_t Read(QueryContext context, void *buffer, idx_t nr_bytes); DUCKDB_API int64_t Write(void *buffer, idx_t nr_bytes); // Read at [nr_bytes] bytes into [buffer]. // File offset will not be changed. @@ -76,6 +77,7 @@ struct FileHandle { DUCKDB_API void Sync(); DUCKDB_API void Truncate(int64_t new_size); DUCKDB_API string ReadLine(); + DUCKDB_API string ReadLine(QueryContext context); DUCKDB_API bool Trim(idx_t offset_bytes, idx_t length_bytes); DUCKDB_API virtual idx_t GetProgress(); DUCKDB_API virtual FileCompressionType GetFileCompressionType(); @@ -263,7 +265,8 @@ class FileSystem { //! in a file on-disk are much cheaper than e.g. 
random reads in a file over the network DUCKDB_API virtual bool OnDiskFile(FileHandle &handle); - DUCKDB_API virtual unique_ptr OpenCompressedFile(unique_ptr handle, bool write); + DUCKDB_API virtual unique_ptr OpenCompressedFile(QueryContext context, unique_ptr handle, + bool write); //! Create a LocalFileSystem. DUCKDB_API static unique_ptr CreateLocal(); diff --git a/src/duckdb/src/include/duckdb/common/gzip_file_system.hpp b/src/duckdb/src/include/duckdb/common/gzip_file_system.hpp index fffe726e8..6e69c0475 100644 --- a/src/duckdb/src/include/duckdb/common/gzip_file_system.hpp +++ b/src/duckdb/src/include/duckdb/common/gzip_file_system.hpp @@ -17,7 +17,7 @@ class GZipFileSystem : public CompressedFileSystem { static constexpr const idx_t BUFFER_SIZE = 1u << 15; public: - unique_ptr OpenCompressedFile(unique_ptr handle, bool write) override; + unique_ptr OpenCompressedFile(QueryContext context, unique_ptr handle, bool write) override; std::string GetName() const override { return "GZipFileSystem"; diff --git a/src/duckdb/src/include/duckdb/common/pipe_file_system.hpp b/src/duckdb/src/include/duckdb/common/pipe_file_system.hpp index a84433d21..5fb6ba1fd 100644 --- a/src/duckdb/src/include/duckdb/common/pipe_file_system.hpp +++ b/src/duckdb/src/include/duckdb/common/pipe_file_system.hpp @@ -14,7 +14,7 @@ namespace duckdb { class PipeFileSystem : public FileSystem { public: - static unique_ptr OpenPipe(unique_ptr handle); + static unique_ptr OpenPipe(QueryContext context, unique_ptr handle); int64_t Read(FileHandle &handle, void *buffer, int64_t nr_bytes) override; int64_t Write(FileHandle &handle, void *buffer, int64_t nr_bytes) override; diff --git a/src/duckdb/src/include/duckdb/common/types/vector.hpp b/src/duckdb/src/include/duckdb/common/types/vector.hpp index ff3028f27..6cde5fea1 100644 --- a/src/duckdb/src/include/duckdb/common/types/vector.hpp +++ b/src/duckdb/src/include/duckdb/common/types/vector.hpp @@ -169,7 +169,7 @@ class Vector { //! 
Turn this vector into a dictionary vector DUCKDB_API void Dictionary(idx_t dictionary_size, const SelectionVector &sel, idx_t count); //! Creates a reference to a dictionary of the other vector - DUCKDB_API void Dictionary(const Vector &dict, idx_t dictionary_size, const SelectionVector &sel, idx_t count); + DUCKDB_API void Dictionary(Vector &dict, idx_t dictionary_size, const SelectionVector &sel, idx_t count); //! Creates the data of this vector with the specified type. Any data that //! is currently in the vector is destroyed. @@ -280,6 +280,9 @@ class Vector { //! The buffer holding auxiliary data of the vector //! e.g. a string vector uses this to store strings buffer_ptr auxiliary; + //! The buffer holding precomputed hashes of the data in the vector + //! used for caching hashes of string dictionaries + buffer_ptr cached_hashes; }; //! The DictionaryBuffer holds a selection vector @@ -390,6 +393,13 @@ struct DictionaryVector { VerifyDictionary(vector); vector.buffer->Cast().SetDictionaryId(std::move(new_id)); } + static inline bool CanCacheHashes(const LogicalType &type) { + return type.InternalType() == PhysicalType::VARCHAR; + } + static inline bool CanCacheHashes(const Vector &vector) { + return DictionarySize(vector).IsValid() && CanCacheHashes(vector.GetType()); + } + static const Vector &GetCachedHashes(Vector &input); }; struct FlatVector { diff --git a/src/duckdb/src/include/duckdb/execution/index/art/base_leaf.hpp b/src/duckdb/src/include/duckdb/execution/index/art/base_leaf.hpp index a5aa56ce9..508ec487f 100644 --- a/src/duckdb/src/include/duckdb/execution/index/art/base_leaf.hpp +++ b/src/duckdb/src/include/duckdb/execution/index/art/base_leaf.hpp @@ -109,6 +109,7 @@ class Node15Leaf : public BaseLeaf<15, NType::NODE_15_LEAF> { private: static void GrowNode7Leaf(ART &art, Node &node15_leaf, Node &node7_leaf); + //! We shrink at <= Node48::SHRINK_THRESHOLD. 
static void ShrinkNode256Leaf(ART &art, Node &node15_leaf, Node &node256_leaf); }; diff --git a/src/duckdb/src/include/duckdb/execution/index/art/base_node.hpp b/src/duckdb/src/include/duckdb/execution/index/art/base_node.hpp index f06977e9a..17f47b29c 100644 --- a/src/duckdb/src/include/duckdb/execution/index/art/base_node.hpp +++ b/src/duckdb/src/include/duckdb/execution/index/art/base_node.hpp @@ -152,6 +152,7 @@ class Node16 : public BaseNode<16, NType::NODE_16> { private: static void GrowNode4(ART &art, Node &node16, Node &node4); + //! We shrink at < Node48::SHRINK_THRESHOLD. static void ShrinkNode48(ART &art, Node &node16, Node &node48); }; diff --git a/src/duckdb/src/include/duckdb/execution/index/art/node48.hpp b/src/duckdb/src/include/duckdb/execution/index/art/node48.hpp index 6fee57ffd..c03e658f7 100644 --- a/src/duckdb/src/include/duckdb/execution/index/art/node48.hpp +++ b/src/duckdb/src/include/duckdb/execution/index/art/node48.hpp @@ -126,6 +126,7 @@ class Node48 { private: static void GrowNode16(ART &art, Node &node48, Node &node16); + //! We shrink at <= Node256::SHRINK_THRESHOLD. 
static void ShrinkNode256(ART &art, Node &node48, Node &node256); }; } // namespace duckdb diff --git a/src/duckdb/src/include/duckdb/execution/operator/csv_scanner/csv_file_handle.hpp b/src/duckdb/src/include/duckdb/execution/operator/csv_scanner/csv_file_handle.hpp index 7370be584..acf4950ee 100644 --- a/src/duckdb/src/include/duckdb/execution/operator/csv_scanner/csv_file_handle.hpp +++ b/src/duckdb/src/include/duckdb/execution/operator/csv_scanner/csv_file_handle.hpp @@ -13,6 +13,8 @@ #include "duckdb/common/helper.hpp" #include "duckdb/common/allocator.hpp" #include "duckdb/execution/operator/csv_scanner/encode/csv_encoder.hpp" +#include "duckdb/main/client_context.hpp" + namespace duckdb { class Allocator; class FileSystem; @@ -51,6 +53,7 @@ class CSVFileHandle { double GetProgress() const; private: + QueryContext context; unique_ptr file_handle; CSVEncoder encoder; const OpenFileInfo file; diff --git a/src/duckdb/src/include/duckdb/execution/operator/csv_scanner/encode/csv_encoder.hpp b/src/duckdb/src/include/duckdb/execution/operator/csv_scanner/encode/csv_encoder.hpp index 0fdafef5f..caab637a3 100644 --- a/src/duckdb/src/include/duckdb/execution/operator/csv_scanner/encode/csv_encoder.hpp +++ b/src/duckdb/src/include/duckdb/execution/operator/csv_scanner/encode/csv_encoder.hpp @@ -11,6 +11,7 @@ #include "duckdb/common/typedefs.hpp" #include "duckdb/common/file_system.hpp" #include "duckdb/function/encoding_function.hpp" +#include "duckdb/main/client_context.hpp" namespace duckdb { @@ -54,6 +55,8 @@ class CSVEncoder { string encoding_name; private: + QueryContext context; + //! The actual encoded buffer CSVEncoderBuffer encoded_buffer; //! 
Potential remaining bytes diff --git a/src/duckdb/src/include/duckdb/function/function_serialization.hpp b/src/duckdb/src/include/duckdb/function/function_serialization.hpp index e0a2c58c9..d24472eff 100644 --- a/src/duckdb/src/include/duckdb/function/function_serialization.hpp +++ b/src/duckdb/src/include/duckdb/function/function_serialization.hpp @@ -57,7 +57,8 @@ class FunctionSerializer { } template - static pair DeserializeBase(Deserializer &deserializer, CatalogType catalog_type) { + static pair DeserializeBase(Deserializer &deserializer, CatalogType catalog_type, + optional_ptr>> children = nullptr) { auto &context = deserializer.Get(); auto name = deserializer.ReadProperty(500, "name"); auto arguments = deserializer.ReadProperty>(501, "arguments"); @@ -70,6 +71,17 @@ class FunctionSerializer { if (schema_name.empty()) { schema_name = DEFAULT_SCHEMA; } + + if (arguments.empty() && original_arguments.empty() && children && !children->empty()) { + // The function is specified as having no arguments, but somehow expressions were passed anyway + // Assume this is a "varargs" function and use the types of the expressions as the arguments + // This can happen when we change a function that used to take varargs, to no longer do so. 
+ arguments.reserve(children->size()); + for (auto &child : *children) { + arguments.push_back(child->return_type); + } + } + auto function = DeserializeFunction(context, catalog_type, catalog_name, schema_name, name, arguments, original_arguments); auto has_serialize = deserializer.ReadProperty(503, "has_serialize"); @@ -133,7 +145,7 @@ class FunctionSerializer { vector> &children, LogicalType return_type) { // NOLINT: clang-tidy bug auto &context = deserializer.Get(); - auto entry = DeserializeBase(deserializer, catalog_type); + auto entry = DeserializeBase(deserializer, catalog_type, children); auto &function = entry.first; auto has_serialize = entry.second; diff --git a/src/duckdb/src/include/duckdb/main/database_path_and_type.hpp b/src/duckdb/src/include/duckdb/main/database_path_and_type.hpp index cbf2a56f3..dff1808ef 100644 --- a/src/duckdb/src/include/duckdb/main/database_path_and_type.hpp +++ b/src/duckdb/src/include/duckdb/main/database_path_and_type.hpp @@ -13,11 +13,13 @@ namespace duckdb { +class QueryContext; + struct DBPathAndType { //! Parse database extension type and rest of path from combined form (type:path) static void ExtractExtensionPrefix(string &path, string &db_type); //! Check the magic bytes of a file and set the database type based on that - static void CheckMagicBytes(FileSystem &fs, string &path, string &db_type); + static void CheckMagicBytes(QueryContext context, FileSystem &fs, string &path, string &db_type); //! 
Run ExtractExtensionPrefix followed by CheckMagicBytes static void ResolveDatabaseType(FileSystem &fs, string &path, string &db_type); diff --git a/src/duckdb/src/include/duckdb/main/extension_entries.hpp b/src/duckdb/src/include/duckdb/main/extension_entries.hpp index 34588351a..0979ca9b3 100644 --- a/src/duckdb/src/include/duckdb/main/extension_entries.hpp +++ b/src/duckdb/src/include/duckdb/main/extension_entries.hpp @@ -700,6 +700,7 @@ static constexpr ExtensionFunctionEntry EXTENSION_FUNCTIONS[] = { {"string_agg", "core_functions", CatalogType::AGGREGATE_FUNCTION_ENTRY}, {"strpos", "core_functions", CatalogType::SCALAR_FUNCTION_ENTRY}, {"struct_insert", "core_functions", CatalogType::SCALAR_FUNCTION_ENTRY}, + {"struct_update", "core_functions", CatalogType::SCALAR_FUNCTION_ENTRY}, {"sum", "core_functions", CatalogType::AGGREGATE_FUNCTION_ENTRY}, {"sum_no_overflow", "core_functions", CatalogType::AGGREGATE_FUNCTION_ENTRY}, {"sumkahan", "core_functions", CatalogType::AGGREGATE_FUNCTION_ENTRY}, diff --git a/src/duckdb/src/include/duckdb/optimizer/build_probe_side_optimizer.hpp b/src/duckdb/src/include/duckdb/optimizer/build_probe_side_optimizer.hpp index e60938745..d013074f6 100644 --- a/src/duckdb/src/include/duckdb/optimizer/build_probe_side_optimizer.hpp +++ b/src/duckdb/src/include/duckdb/optimizer/build_probe_side_optimizer.hpp @@ -35,7 +35,7 @@ class BuildProbeSideOptimizer : LogicalOperatorVisitor { void VisitExpression(unique_ptr *expression) override {}; private: - void TryFlipJoinChildren(LogicalOperator &op) const; + bool TryFlipJoinChildren(LogicalOperator &op) const; static idx_t ChildHasJoins(LogicalOperator &op); static BuildSize GetBuildSizes(const LogicalOperator &op, idx_t lhs_cardinality, idx_t rhs_cardinality); diff --git a/src/duckdb/src/include/duckdb/optimizer/filter_pushdown.hpp b/src/duckdb/src/include/duckdb/optimizer/filter_pushdown.hpp index 8a2bd63a2..c2fc87a52 100644 --- a/src/duckdb/src/include/duckdb/optimizer/filter_pushdown.hpp 
+++ b/src/duckdb/src/include/duckdb/optimizer/filter_pushdown.hpp @@ -74,6 +74,9 @@ class FilterPushdown { unique_ptr PushdownLeftJoin(unique_ptr op, unordered_set &left_bindings, unordered_set &right_bindings); + // Pushdown an outer join + unique_ptr PushdownOuterJoin(unique_ptr op, unordered_set &left_bindings, + unordered_set &right_bindings); unique_ptr PushdownSemiAntiJoin(unique_ptr op); // Pushdown a mark join unique_ptr PushdownMarkJoin(unique_ptr op, unordered_set &left_bindings, diff --git a/src/duckdb/src/include/duckdb/planner/expression_iterator.hpp b/src/duckdb/src/include/duckdb/planner/expression_iterator.hpp index e40ab22cc..5b2e2e8a8 100644 --- a/src/duckdb/src/include/duckdb/planner/expression_iterator.hpp +++ b/src/duckdb/src/include/duckdb/planner/expression_iterator.hpp @@ -27,6 +27,8 @@ class ExpressionIterator { static void EnumerateExpression(unique_ptr &expr, const std::function &callback); + static void EnumerateExpression(unique_ptr &expr, + const std::function &child)> &callback); static void VisitExpressionClass(const Expression &expr, ExpressionClass expr_class, const std::function &callback); diff --git a/src/duckdb/src/include/duckdb/storage/magic_bytes.hpp b/src/duckdb/src/include/duckdb/storage/magic_bytes.hpp index 7e3a57881..4df1c8b8e 100644 --- a/src/duckdb/src/include/duckdb/storage/magic_bytes.hpp +++ b/src/duckdb/src/include/duckdb/storage/magic_bytes.hpp @@ -12,6 +12,7 @@ namespace duckdb { class FileSystem; +class QueryContext; enum class DataFileType : uint8_t { FILE_DOES_NOT_EXIST, // file does not exist @@ -23,7 +24,7 @@ enum class DataFileType : uint8_t { class MagicBytes { public: - static DataFileType CheckMagicBytes(FileSystem &fs, const string &path); + static DataFileType CheckMagicBytes(QueryContext context, FileSystem &fs, const string &path); }; } // namespace duckdb diff --git a/src/duckdb/src/main/database_manager.cpp b/src/duckdb/src/main/database_manager.cpp index f3b1599a2..d8d6f1f1e 100644 --- 
a/src/duckdb/src/main/database_manager.cpp +++ b/src/duckdb/src/main/database_manager.cpp @@ -187,7 +187,7 @@ void DatabaseManager::GetDatabaseType(ClientContext &context, AttachInfo &info, CheckPathConflict(context, info.path); auto &fs = FileSystem::GetFileSystem(context); - DBPathAndType::CheckMagicBytes(fs, info.path, options.db_type); + DBPathAndType::CheckMagicBytes(QueryContext(context), fs, info.path, options.db_type); } if (options.db_type.empty()) { diff --git a/src/duckdb/src/main/database_path_and_type.cpp b/src/duckdb/src/main/database_path_and_type.cpp index 3cae9b54f..5f0d87ac5 100644 --- a/src/duckdb/src/main/database_path_and_type.cpp +++ b/src/duckdb/src/main/database_path_and_type.cpp @@ -3,6 +3,7 @@ #include "duckdb/main/extension_helper.hpp" #include "duckdb/storage/magic_bytes.hpp" #include "duckdb/function/replacement_scan.hpp" +#include "duckdb/main/client_context.hpp" namespace duckdb { @@ -15,9 +16,9 @@ void DBPathAndType::ExtractExtensionPrefix(string &path, string &db_type) { } } -void DBPathAndType::CheckMagicBytes(FileSystem &fs, string &path, string &db_type) { +void DBPathAndType::CheckMagicBytes(QueryContext context, FileSystem &fs, string &path, string &db_type) { // if there isn't - check the magic bytes of the file (if any) - auto file_type = MagicBytes::CheckMagicBytes(fs, path); + auto file_type = MagicBytes::CheckMagicBytes(context, fs, path); db_type = string(); switch (file_type) { case DataFileType::SQLITE_FILE: @@ -50,7 +51,7 @@ void DBPathAndType::ResolveDatabaseType(FileSystem &fs, string &path, string &db return; } // check database type by reading the magic bytes of a file - DBPathAndType::CheckMagicBytes(fs, path, db_type); + DBPathAndType::CheckMagicBytes(QueryContext(), fs, path, db_type); } } // namespace duckdb diff --git a/src/duckdb/src/main/extension/extension_install.cpp b/src/duckdb/src/main/extension/extension_install.cpp index ca02a979c..5be3dded1 100644 --- 
a/src/duckdb/src/main/extension/extension_install.cpp +++ b/src/duckdb/src/main/extension/extension_install.cpp @@ -169,7 +169,7 @@ static unsafe_unique_array ReadExtensionFileFromDisk(FileSystem &fs, con auto source_file = fs.OpenFile(path, FileFlags::FILE_FLAGS_READ); file_size = source_file->GetFileSize(); auto in_buffer = make_unsafe_uniq_array(file_size); - source_file->Read(in_buffer.get(), file_size); + source_file->Read(QueryContext(), in_buffer.get(), file_size); source_file->Close(); return in_buffer; } diff --git a/src/duckdb/src/optimizer/build_probe_side_optimizer.cpp b/src/duckdb/src/optimizer/build_probe_side_optimizer.cpp index 2eea10358..b5c422133 100644 --- a/src/duckdb/src/optimizer/build_probe_side_optimizer.cpp +++ b/src/duckdb/src/optimizer/build_probe_side_optimizer.cpp @@ -157,7 +157,7 @@ idx_t BuildProbeSideOptimizer::ChildHasJoins(LogicalOperator &op) { return ChildHasJoins(*op.children[0]); } -void BuildProbeSideOptimizer::TryFlipJoinChildren(LogicalOperator &op) const { +bool BuildProbeSideOptimizer::TryFlipJoinChildren(LogicalOperator &op) const { auto &left_child = *op.children[0]; auto &right_child = *op.children[1]; const auto lhs_cardinality = left_child.has_estimated_cardinality ? 
left_child.estimated_cardinality @@ -209,6 +209,7 @@ void BuildProbeSideOptimizer::TryFlipJoinChildren(LogicalOperator &op) const { if (swap) { FlipChildren(op); } + return swap; } void BuildProbeSideOptimizer::VisitOperator(LogicalOperator &op) { @@ -217,8 +218,7 @@ void BuildProbeSideOptimizer::VisitOperator(LogicalOperator &op) { case LogicalOperatorType::LOGICAL_DELIM_JOIN: { auto &join = op.Cast(); if (HasInverseJoinType(join.join_type)) { - FlipChildren(join); - join.delim_flipped = true; + join.delim_flipped = TryFlipJoinChildren(join); } break; } diff --git a/src/duckdb/src/optimizer/filter_pushdown.cpp b/src/duckdb/src/optimizer/filter_pushdown.cpp index 461d79767..dcb79fe60 100644 --- a/src/duckdb/src/optimizer/filter_pushdown.cpp +++ b/src/duckdb/src/optimizer/filter_pushdown.cpp @@ -160,6 +160,9 @@ unique_ptr FilterPushdown::PushdownJoin(unique_ptr result; switch (join.join_type) { + case JoinType::OUTER: + result = PushdownOuterJoin(std::move(op), left_bindings, right_bindings); + break; case JoinType::INNER: // AsOf joins can't push anything into the RHS, so treat it as a left join if (op->type == LogicalOperatorType::LOGICAL_ASOF_JOIN) { diff --git a/src/duckdb/src/optimizer/pushdown/pushdown_outer_join.cpp b/src/duckdb/src/optimizer/pushdown/pushdown_outer_join.cpp new file mode 100644 index 000000000..3d81c68c1 --- /dev/null +++ b/src/duckdb/src/optimizer/pushdown/pushdown_outer_join.cpp @@ -0,0 +1,201 @@ +#include "duckdb/optimizer/filter_pushdown.hpp" +#include "duckdb/parser/expression_map.hpp" +#include "duckdb/planner/expression/bound_comparison_expression.hpp" +#include "duckdb/planner/expression/bound_constant_expression.hpp" +#include "duckdb/planner/expression_iterator.hpp" +#include "duckdb/planner/operator/logical_comparison_join.hpp" + +namespace duckdb { + +using Filter = FilterPushdown::Filter; + +//! A representation of a coalesce expression that removes unnecessary usages of +//! 
`coalesce` in its arguments when such usages don't affect the final value of +//! the expression. +//! E.g. `coalesce(a, coalesce(b, c))` is equivalent to `coalesce(a, b, c)` so +//! that similarly, with some abuse of notation, +//! `FlattenedCoalesce::Of({a, coalesce(b, c)}) == FlattenedCoalesce::Of({a, b, c})` +struct FlattenedCoalesce { +public: + vector<reference<Expression>> args; + + bool operator==(const FlattenedCoalesce &other) const { + if (args.size() != other.args.size()) { + return false; + } + + for (idx_t i = 0; i < args.size(); i++) { + if (!args[i].get().Equals(other.args[i].get())) { + return false; + } + } + + return true; + } + + static FlattenedCoalesce Of(const vector<reference<Expression>> &expressions) { + vector<reference<Expression>> args {}; + for (auto expr : expressions) { + EnumerateFlattenedCoalesceArgs(expr, [&](Expression &arg) { args.push_back(arg); }); + } + return {args}; + } + +private: + static void EnumerateFlattenedCoalesceArgs(Expression &expr, const std::function<void(Expression &)> &callback) { + if (expr.GetExpressionType() == ExpressionType::OPERATOR_COALESCE) { + ExpressionIterator::EnumerateChildren( + expr, [&](Expression &arg) { EnumerateFlattenedCoalesceArgs(arg, callback); }); + } else { + callback(expr); + } + } +}; + +struct FlattenedCoalesceHash { + hash_t operator()(const FlattenedCoalesce &coalesce) const { + hash_t hash = 0; + for (auto arg : coalesce.args) { + hash = CombineHash(hash, arg.get().Hash()); + } + return hash; + } +}; + +//! Replace all occurrences of `exprs_to_replace` in `expr` with `replacement_expr` +static unique_ptr<Expression> ReplaceIn(unique_ptr<Expression> expr, const expression_set_t &exprs_to_replace, + const Expression &replacement_expr) { + ExpressionIterator::EnumerateExpression(expr, [&](unique_ptr<Expression> &sub_expr) { + if (exprs_to_replace.find(*sub_expr) != exprs_to_replace.end()) { + sub_expr = replacement_expr.Copy(); + } + }); + + return std::move(expr); +} + +//! True if replacing all the `args` expressions occurring in `expr` with a +//! fixed constant would make the `expr` a scalar value. 
+static bool ExprIsFunctionOnlyOf(const Expression &expr, const expression_set_t &args) { + auto expr_to_check = expr.Copy(); + + ExpressionIterator::EnumerateExpression(expr_to_check, [&](unique_ptr<Expression> &sub_expr) { + if (args.find(*sub_expr) != args.end()) { + auto null_value = make_uniq<BoundConstantExpression>(Value(sub_expr->return_type)); + sub_expr = std::move(null_value); + } + }); + + return expr_to_check->IsScalar(); +} + +//! Whenever a filter is of the form `P(coalesce(l, r))` or `P(coalesce(r, l))` +//! where `P` is some predicate that depends only on `coalesce(l, r)` and there +//! is a join condition of the form `l = r` where `l` and `r` are join keys for +//! the left and right table respectively, then pushdown `P(l)` to the left +//! table, `P(r)` to the right table, and remove the original filter. +static bool +PushDownFiltersOnCoalescedEqualJoinKeys(vector<unique_ptr<Filter>> &filters, + const vector<JoinCondition> &join_conditions, + const std::function<void(unique_ptr<Expression> filter)> &pushdown_left, + const std::function<void(unique_ptr<Expression> filter)> &pushdown_right) { + // Generate set of all possible coalesced join keys expressions to later + // discover filters on such expressions which are candidates for pushdown + unordered_map<FlattenedCoalesce, reference<const JoinCondition>, FlattenedCoalesceHash> + join_cond_by_coalesced_join_keys; + + for (auto &cond : join_conditions) { + if (cond.comparison == ExpressionType::COMPARE_EQUAL) { + auto left = std::ref(*cond.left); + auto right = std::ref(*cond.right); + auto coalesce_left_right = FlattenedCoalesce::Of({left, right}); + auto coalesce_right_left = FlattenedCoalesce::Of({right, left}); + join_cond_by_coalesced_join_keys.emplace(coalesce_left_right, std::ref(cond)); + join_cond_by_coalesced_join_keys.emplace(coalesce_right_left, std::ref(cond)); + } + } + + if (join_cond_by_coalesced_join_keys.empty()) { + return false; + } + + bool has_applied_pushdown = false; + for (idx_t i = 0; i < filters.size(); i++) { + auto &filter = filters[i]->filter; + if (filter->IsVolatile() || filter->CanThrow()) { + continue; + } + + // occurrences of equivalent 
coalesce expressions on the same join keys + // which need to be replaced if the filter is to be pushed down + expression_set_t coalesce_exprs_to_replace; + const JoinCondition *join_cond_ptr = nullptr; + bool many_non_equivalent_coalesce_exprs = false; + + ExpressionIterator::EnumerateExpression(filter, [&](Expression &sub_expr) { + if (many_non_equivalent_coalesce_exprs || + sub_expr.GetExpressionType() != ExpressionType::OPERATOR_COALESCE) { + return; + } + + auto sub_expr_flattened_coalesce = FlattenedCoalesce::Of({sub_expr}); + auto join_cond_it = join_cond_by_coalesced_join_keys.find(sub_expr_flattened_coalesce); + if (join_cond_it == join_cond_by_coalesced_join_keys.end()) { + return; + } + + auto new_join_cond_ptr = &join_cond_it->second.get(); + if (join_cond_ptr && new_join_cond_ptr != join_cond_ptr) { + many_non_equivalent_coalesce_exprs = true; + return; + } + + join_cond_ptr = new_join_cond_ptr; + coalesce_exprs_to_replace.insert(sub_expr); + }); + + if (coalesce_exprs_to_replace.empty() || many_non_equivalent_coalesce_exprs || + !ExprIsFunctionOnlyOf(*filter, coalesce_exprs_to_replace)) { + continue; + } + + auto left_filter = ReplaceIn(filter->Copy(), coalesce_exprs_to_replace, *join_cond_ptr->left); + auto right_filter = ReplaceIn(filter->Copy(), coalesce_exprs_to_replace, *join_cond_ptr->right); + pushdown_left(std::move(left_filter)); + pushdown_right(std::move(right_filter)); + filters.erase_at(i); + has_applied_pushdown = true; + i--; + } + + return has_applied_pushdown; +} + +unique_ptr<LogicalOperator> FilterPushdown::PushdownOuterJoin(unique_ptr<LogicalOperator> op, + unordered_set<idx_t> &left_bindings, + unordered_set<idx_t> &right_bindings) { + + if (op->type != LogicalOperatorType::LOGICAL_COMPARISON_JOIN) { + return FinishPushdown(std::move(op)); + } + + auto &join = op->Cast<LogicalComparisonJoin>(); + D_ASSERT(join.join_type == JoinType::OUTER); + + FilterPushdown left_pushdown(optimizer, convert_mark_joins), right_pushdown(optimizer, convert_mark_joins); + auto has_applied_pushdown = 
PushDownFiltersOnCoalescedEqualJoinKeys( + filters, join.conditions, [&](unique_ptr<Expression> filter) { left_pushdown.AddFilter(std::move(filter)); }, + [&](unique_ptr<Expression> filter) { right_pushdown.AddFilter(std::move(filter)); }); + + if (!has_applied_pushdown) { + return FinishPushdown(std::move(op)); + } + + left_pushdown.GenerateFilters(); + right_pushdown.GenerateFilters(); + op->children[0] = left_pushdown.Rewrite(std::move(op->children[0])); + op->children[1] = right_pushdown.Rewrite(std::move(op->children[1])); + return PushFinalFilters(std::move(op)); +} + +} // namespace duckdb diff --git a/src/duckdb/src/parallel/thread_context.cpp b/src/duckdb/src/parallel/thread_context.cpp index eb3d619ea..2f04f616b 100644 --- a/src/duckdb/src/parallel/thread_context.cpp +++ b/src/duckdb/src/parallel/thread_context.cpp @@ -21,14 +21,6 @@ ThreadContext::ThreadContext(ClientContext &context) : profiler(context) { } log_context.thread_id = TaskScheduler::GetEstimatedCPUId(); - if (context.transaction.HasActiveTransaction()) { - auto query_id = context.transaction.GetActiveQuery(); - if (query_id == DConstants::INVALID_INDEX) { - log_context.transaction_id = optional_idx(); - } else { - log_context.transaction_id = query_id; - } - } logger = LogManager::Get(context).CreateLogger(log_context, true); } diff --git a/src/duckdb/src/planner/expression_iterator.cpp b/src/duckdb/src/planner/expression_iterator.cpp index 2c6ef4ae7..8f3140e17 100644 --- a/src/duckdb/src/planner/expression_iterator.cpp +++ b/src/duckdb/src/planner/expression_iterator.cpp @@ -152,6 +152,16 @@ void ExpressionIterator::EnumerateExpression(unique_ptr<Expression> &expr, [&](unique_ptr<Expression> &child) { EnumerateExpression(child, callback); }); } +void ExpressionIterator::EnumerateExpression(unique_ptr<Expression> &expr, + const std::function<void(unique_ptr<Expression> &child)> &callback) { + if (!expr) { + return; + } + callback(expr); + ExpressionIterator::EnumerateChildren(*expr, + [&](unique_ptr<Expression> &child) { EnumerateExpression(child, callback); }); +} + void 
ExpressionIterator::VisitExpressionClass(const Expression &expr, ExpressionClass expr_class, const std::function &callback) { if (expr.GetExpressionClass() == expr_class) { diff --git a/src/duckdb/src/storage/caching_file_system.cpp b/src/duckdb/src/storage/caching_file_system.cpp index 19a032708..3de905228 100644 --- a/src/duckdb/src/storage/caching_file_system.cpp +++ b/src/duckdb/src/storage/caching_file_system.cpp @@ -124,7 +124,7 @@ BufferHandle CachingFileHandle::Read(data_ptr_t &buffer, idx_t &nr_bytes) { if (!external_file_cache.IsEnabled() || !CanSeek()) { result = external_file_cache.GetBufferManager().Allocate(MemoryTag::EXTERNAL_FILE_CACHE, nr_bytes); buffer = result.Ptr(); - nr_bytes = NumericCast(GetFileHandle().Read(buffer, nr_bytes)); + nr_bytes = NumericCast(GetFileHandle().Read(context, buffer, nr_bytes)); position += NumericCast(nr_bytes); return result; } @@ -142,7 +142,7 @@ BufferHandle CachingFileHandle::Read(data_ptr_t &buffer, idx_t &nr_bytes) { buffer = result.Ptr(); GetFileHandle().Seek(position); - nr_bytes = NumericCast(GetFileHandle().Read(buffer, nr_bytes)); + nr_bytes = NumericCast(GetFileHandle().Read(context, buffer, nr_bytes)); auto new_file_range = make_shared_ptr(result.GetBlockHandle(), nr_bytes, position, version_tag); result = TryInsertFileRange(result, buffer, nr_bytes, position, new_file_range); diff --git a/src/duckdb/src/storage/magic_bytes.cpp b/src/duckdb/src/storage/magic_bytes.cpp index a4f32896f..51aea584a 100644 --- a/src/duckdb/src/storage/magic_bytes.cpp +++ b/src/duckdb/src/storage/magic_bytes.cpp @@ -1,10 +1,11 @@ #include "duckdb/storage/magic_bytes.hpp" +#include "duckdb/main/client_context.hpp" #include "duckdb/common/local_file_system.hpp" #include "duckdb/storage/storage_info.hpp" namespace duckdb { -DataFileType MagicBytes::CheckMagicBytes(FileSystem &fs, const string &path) { +DataFileType MagicBytes::CheckMagicBytes(QueryContext context, FileSystem &fs, const string &path) { if (path.empty() || path == 
IN_MEMORY_PATH) { return DataFileType::DUCKDB_FILE; } @@ -16,7 +17,7 @@ DataFileType MagicBytes::CheckMagicBytes(FileSystem &fs, const string &path) { constexpr const idx_t MAGIC_BYTES_READ_SIZE = 16; char buffer[MAGIC_BYTES_READ_SIZE] = {}; - handle->Read(buffer, MAGIC_BYTES_READ_SIZE); + handle->Read(context, buffer, MAGIC_BYTES_READ_SIZE); if (memcmp(buffer, "SQLite format 3\0", 16) == 0) { return DataFileType::SQLITE_FILE; } diff --git a/src/duckdb/src/storage/single_file_block_manager.cpp b/src/duckdb/src/storage/single_file_block_manager.cpp index cfcbd43a7..8626125ce 100644 --- a/src/duckdb/src/storage/single_file_block_manager.cpp +++ b/src/duckdb/src/storage/single_file_block_manager.cpp @@ -261,6 +261,7 @@ FileOpenFlags SingleFileBlockManager::GetFileFlags(bool create_new) const { } // database files can be read from in parallel result |= FileFlags::FILE_FLAGS_PARALLEL_ACCESS; + result |= FileFlags::FILE_FLAGS_MULTI_CLIENT_ACCESS; return result; } diff --git a/src/duckdb/src/storage/write_ahead_log.cpp b/src/duckdb/src/storage/write_ahead_log.cpp index 69045bb49..a913f1fa6 100644 --- a/src/duckdb/src/storage/write_ahead_log.cpp +++ b/src/duckdb/src/storage/write_ahead_log.cpp @@ -47,9 +47,10 @@ BufferedFileWriter &WriteAheadLog::Initialize() { } lock_guard lock(wal_lock); if (!writer) { - writer = make_uniq(FileSystem::Get(database), wal_path, - FileFlags::FILE_FLAGS_WRITE | FileFlags::FILE_FLAGS_FILE_CREATE | - FileFlags::FILE_FLAGS_APPEND); + writer = + make_uniq(FileSystem::Get(database), wal_path, + FileFlags::FILE_FLAGS_WRITE | FileFlags::FILE_FLAGS_FILE_CREATE | + FileFlags::FILE_FLAGS_APPEND | FileFlags::FILE_FLAGS_MULTI_CLIENT_ACCESS); if (init_state == WALInitState::UNINITIALIZED_REQUIRES_TRUNCATE) { writer->Truncate(wal_size); } diff --git a/src/duckdb/ub_extension_core_functions_scalar_struct.cpp b/src/duckdb/ub_extension_core_functions_scalar_struct.cpp index 8d52b2f9c..46d3f649f 100644 --- 
a/src/duckdb/ub_extension_core_functions_scalar_struct.cpp +++ b/src/duckdb/ub_extension_core_functions_scalar_struct.cpp @@ -1,2 +1,4 @@ #include "extension/core_functions/scalar/struct/struct_insert.cpp" +#include "extension/core_functions/scalar/struct/struct_update.cpp" + diff --git a/src/duckdb/ub_src_optimizer_pushdown.cpp b/src/duckdb/ub_src_optimizer_pushdown.cpp index 26f3d9c43..3fcb5727e 100644 --- a/src/duckdb/ub_src_optimizer_pushdown.cpp +++ b/src/duckdb/ub_src_optimizer_pushdown.cpp @@ -16,6 +16,8 @@ #include "src/optimizer/pushdown/pushdown_mark_join.cpp" +#include "src/optimizer/pushdown/pushdown_outer_join.cpp" + #include "src/optimizer/pushdown/pushdown_projection.cpp" #include "src/optimizer/pushdown/pushdown_semi_anti_join.cpp"