diff --git a/src/duckdb/src/common/adbc/adbc.cpp b/src/duckdb/src/common/adbc/adbc.cpp index 2b6535055..fcbfe191f 100644 --- a/src/duckdb/src/common/adbc/adbc.cpp +++ b/src/duckdb/src/common/adbc/adbc.cpp @@ -1161,12 +1161,21 @@ AdbcStatusCode StatementSetOption(struct AdbcStatement *statement, const char *k return ADBC_STATUS_INVALID_ARGUMENT; } +std::string createFilter(const char *input) { + if (input) { + auto quoted = duckdb::KeywordHelper::WriteQuoted(input, '\''); + return quoted; + } + return "'%'"; +} + AdbcStatusCode ConnectionGetObjects(struct AdbcConnection *connection, int depth, const char *catalog, const char *db_schema, const char *table_name, const char **table_type, const char *column_name, struct ArrowArrayStream *out, struct AdbcError *error) { - std::string catalog_filter = catalog ? catalog : "%"; - std::string db_schema_filter = db_schema ? db_schema : "%"; - std::string table_name_filter = table_name ? table_name : "%"; + std::string catalog_filter = createFilter(catalog); + std::string db_schema_filter = createFilter(db_schema); + std::string table_name_filter = createFilter(table_name); + std::string column_name_filter = createFilter(column_name); std::string table_type_condition = ""; if (table_type && table_type[0]) { table_type_condition = " AND table_type IN ("; @@ -1182,13 +1191,10 @@ AdbcStatusCode ConnectionGetObjects(struct AdbcConnection *connection, int depth if (i > 0) { table_type_condition += ", "; } - table_type_condition += "'"; - table_type_condition += table_type[i]; - table_type_condition += "'"; + table_type_condition += createFilter(table_type[i]); } table_type_condition += ")"; } - std::string column_name_filter = column_name ? column_name : "%"; std::string query; switch (depth) { @@ -1233,7 +1239,7 @@ AdbcStatusCode ConnectionGetObjects(struct AdbcConnection *connection, int depth )[] catalog_db_schemas FROM information_schema.schemata - WHERE catalog_name LIKE '%s' + WHERE catalog_name LIKE %s GROUP BY catalog_name )", catalog_filter); @@ -1246,7 +1252,7 @@ AdbcStatusCode ConnectionGetObjects(struct AdbcConnection *connection, int depth catalog_name, schema_name, FROM information_schema.schemata - WHERE schema_name LIKE '%s' + WHERE schema_name LIKE %s ) SELECT @@ -1289,7 +1295,7 @@ AdbcStatusCode ConnectionGetObjects(struct AdbcConnection *connection, int depth information_schema.schemata LEFT JOIN db_schemas dbs USING (catalog_name, schema_name) - WHERE catalog_name LIKE '%s' + WHERE catalog_name LIKE %s GROUP BY catalog_name )", db_schema_filter, catalog_filter); @@ -1333,7 +1339,7 @@ AdbcStatusCode ConnectionGetObjects(struct AdbcConnection *connection, int depth )[], }) db_schema_tables FROM information_schema.tables - WHERE table_name LIKE '%s'%s + WHERE table_name LIKE %s%s GROUP BY table_catalog, table_schema ), db_schemas AS ( @@ -1344,7 +1350,7 @@ AdbcStatusCode ConnectionGetObjects(struct AdbcConnection *connection, int depth FROM information_schema.schemata LEFT JOIN tables USING (catalog_name, schema_name) - WHERE schema_name LIKE '%s' + WHERE schema_name LIKE %s ) SELECT @@ -1357,7 +1363,7 @@ AdbcStatusCode ConnectionGetObjects(struct AdbcConnection *connection, int depth information_schema.schemata LEFT JOIN db_schemas dbs USING (catalog_name, schema_name) - WHERE catalog_name LIKE '%s' + WHERE catalog_name LIKE %s GROUP BY catalog_name )", table_name_filter, table_type_condition, db_schema_filter, catalog_filter); @@ -1392,7 +1398,7 @@ AdbcStatusCode ConnectionGetObjects(struct AdbcConnection *connection, int depth xdbc_is_generatedcolumn: NULL::BOOLEAN, }) table_columns FROM information_schema.columns - WHERE column_name LIKE '%s' + WHERE column_name LIKE %s GROUP BY table_catalog, table_schema, table_name ), constraints AS ( @@ -1421,7 +1427,7 @@ AdbcStatusCode ConnectionGetObjects(struct AdbcConnection *connection, int depth constraint_column_names, list_filter( constraint_column_names, - lambda name: name LIKE '%s' + lambda name: name LIKE %s ) ) GROUP BY database_name, schema_name, table_name @@ -1441,7 +1447,7 @@ AdbcStatusCode ConnectionGetObjects(struct AdbcConnection *connection, int depth USING (table_catalog, table_schema, table_name) LEFT JOIN constraints USING (table_catalog, table_schema, table_name) - WHERE table_name LIKE '%s'%s + WHERE table_name LIKE %s%s GROUP BY table_catalog, table_schema ), db_schemas AS ( @@ -1452,7 +1458,7 @@ AdbcStatusCode ConnectionGetObjects(struct AdbcConnection *connection, int depth FROM information_schema.schemata LEFT JOIN tables USING (catalog_name, schema_name) - WHERE schema_name LIKE '%s' + WHERE schema_name LIKE %s ) SELECT @@ -1465,7 +1471,7 @@ AdbcStatusCode ConnectionGetObjects(struct AdbcConnection *connection, int depth information_schema.schemata LEFT JOIN db_schemas dbs USING (catalog_name, schema_name) - WHERE catalog_name LIKE '%s' + WHERE catalog_name LIKE %s GROUP BY catalog_name )", column_name_filter, column_name_filter, table_name_filter, diff --git a/src/duckdb/src/execution/operator/aggregate/physical_streaming_window.cpp b/src/duckdb/src/execution/operator/aggregate/physical_streaming_window.cpp index 38ef17061..afd53e3f7 100644 --- a/src/duckdb/src/execution/operator/aggregate/physical_streaming_window.cpp +++ b/src/duckdb/src/execution/operator/aggregate/physical_streaming_window.cpp @@ -17,11 +17,12 @@ PhysicalStreamingWindow::PhysicalStreamingWindow(PhysicalPlan &physical_plan, ve class StreamingWindowGlobalState : public GlobalOperatorState { public: - StreamingWindowGlobalState() : row_number(1) { - } + explicit StreamingWindowGlobalState(ClientContext &client); //! The next row number. std::atomic row_number; + //! The single local state + unique_ptr local_state; }; class StreamingWindowState : public OperatorState { @@ -340,6 +341,10 @@ class StreamingWindowState : public OperatorState { DataChunk shifted; }; +StreamingWindowGlobalState::StreamingWindowGlobalState(ClientContext &client) : row_number(1) { + local_state = make_uniq(client); +} + bool PhysicalStreamingWindow::IsStreamingFunction(ClientContext &context, unique_ptr &expr) { auto &wexpr = expr->Cast(); if (!wexpr.partitions.empty() || !wexpr.orders.empty() || wexpr.ignore_nulls || !wexpr.arg_orders.empty() || @@ -374,12 +379,8 @@ bool PhysicalStreamingWindow::IsStreamingFunction(ClientContext &context, unique } } -unique_ptr PhysicalStreamingWindow::GetGlobalOperatorState(ClientContext &context) const { - return make_uniq(); -} - -unique_ptr PhysicalStreamingWindow::GetOperatorState(ExecutionContext &context) const { - return make_uniq(context.client); +unique_ptr PhysicalStreamingWindow::GetGlobalOperatorState(ClientContext &client) const { + return make_uniq(client); } void StreamingWindowState::AggregateState::Execute(ExecutionContext &context, DataChunk &input, Vector &result) { @@ -486,9 +487,9 @@ void StreamingWindowState::AggregateState::Execute(ExecutionContext &context, Da } void PhysicalStreamingWindow::ExecuteFunctions(ExecutionContext &context, DataChunk &output, DataChunk &delayed, - GlobalOperatorState &gstate_p, OperatorState &state_p) const { + GlobalOperatorState &gstate_p) const { auto &gstate = gstate_p.Cast(); - auto &state = state_p.Cast(); + auto &state = gstate.local_state->Cast(); // Compute window functions const idx_t count = output.size(); @@ -530,9 +531,9 @@ void PhysicalStreamingWindow::ExecuteFunctions(ExecutionContext &context, DataCh } void PhysicalStreamingWindow::ExecuteInput(ExecutionContext &context, DataChunk &delayed, DataChunk &input, - DataChunk &output, GlobalOperatorState &gstate_p, - OperatorState &state_p) const { - auto &state = state_p.Cast(); + DataChunk &output, GlobalOperatorState &gstate_p) const { + auto &gstate = gstate_p.Cast(); + auto &state = gstate.local_state->Cast(); // Put payload columns in place for (idx_t col_idx = 0; col_idx < input.data.size(); col_idx++) { @@ -548,13 +549,13 @@ void PhysicalStreamingWindow::ExecuteInput(ExecutionContext &context, DataChunk } output.SetCardinality(count); - ExecuteFunctions(context, output, state.delayed, gstate_p, state_p); + ExecuteFunctions(context, output, state.delayed, gstate_p); } void PhysicalStreamingWindow::ExecuteShifted(ExecutionContext &context, DataChunk &delayed, DataChunk &input, - DataChunk &output, GlobalOperatorState &gstate_p, - OperatorState &state_p) const { - auto &state = state_p.Cast(); + DataChunk &output, GlobalOperatorState &gstate_p) const { + auto &gstate = gstate_p.Cast(); + auto &state = gstate.local_state->Cast(); auto &shifted = state.shifted; idx_t out = output.size(); @@ -576,12 +577,11 @@ void PhysicalStreamingWindow::ExecuteShifted(ExecutionContext &context, DataChun } delayed.SetCardinality(delay - out + in); - ExecuteFunctions(context, output, delayed, gstate_p, state_p); + ExecuteFunctions(context, output, delayed, gstate_p); } void PhysicalStreamingWindow::ExecuteDelayed(ExecutionContext &context, DataChunk &delayed, DataChunk &input, - DataChunk &output, GlobalOperatorState &gstate_p, - OperatorState &state_p) const { + DataChunk &output, GlobalOperatorState &gstate_p) const { // Put payload columns in place for (idx_t col_idx = 0; col_idx < delayed.data.size(); col_idx++) { output.data[col_idx].Reference(delayed.data[col_idx]); @@ -589,12 +589,13 @@ void PhysicalStreamingWindow::ExecuteDelayed(ExecutionContext &context, DataChun idx_t count = delayed.size(); output.SetCardinality(count); - ExecuteFunctions(context, output, input, gstate_p, state_p); + ExecuteFunctions(context, output, input, gstate_p); } OperatorResultType PhysicalStreamingWindow::Execute(ExecutionContext &context, DataChunk &input, DataChunk &output, - GlobalOperatorState &gstate_p, OperatorState &state_p) const { - auto &state = state_p.Cast(); + GlobalOperatorState &gstate_p, OperatorState &) const { + auto &gstate = gstate_p.Cast(); + auto &state = gstate.local_state->Cast(); if (!state.initialized) { state.Initialize(context.client, input, select_list); } @@ -615,27 +616,27 @@ OperatorResultType PhysicalStreamingWindow::Execute(ExecutionContext &context, D // If we can't consume all of the delayed values, // we need to split them instead of referencing them all output.SetCardinality(input.size()); - ExecuteShifted(context, delayed, input, output, gstate_p, state_p); + ExecuteShifted(context, delayed, input, output, gstate_p); // We delayed the unused input so ask for more return OperatorResultType::NEED_MORE_INPUT; } else if (delayed.size()) { // We have enough delayed rows so flush them - ExecuteDelayed(context, delayed, input, output, gstate_p, state_p); + ExecuteDelayed(context, delayed, input, output, gstate_p); // Defer resetting delayed as it may be referenced. delayed.SetCardinality(0); // Come back to process the input return OperatorResultType::HAVE_MORE_OUTPUT; } else { // No delayed rows, so emit what we can and delay the rest. - ExecuteInput(context, delayed, input, output, gstate_p, state_p); + ExecuteInput(context, delayed, input, output, gstate_p); return OperatorResultType::NEED_MORE_INPUT; } } OperatorFinalizeResultType PhysicalStreamingWindow::FinalExecute(ExecutionContext &context, DataChunk &output, - GlobalOperatorState &gstate_p, - OperatorState &state_p) const { - auto &state = state_p.Cast(); + GlobalOperatorState &gstate_p, OperatorState &) const { + auto &gstate = gstate_p.Cast(); + auto &state = gstate.local_state->Cast(); if (state.initialized && state.lead_count) { auto &delayed = state.delayed; @@ -646,10 +647,10 @@ OperatorFinalizeResultType PhysicalStreamingWindow::FinalExecute(ExecutionContex if (output.GetCapacity() < delayed.size()) { // More than one output buffer was delayed, so shift in what we can output.SetCardinality(output.GetCapacity()); - ExecuteShifted(context, delayed, input, output, gstate_p, state_p); + ExecuteShifted(context, delayed, input, output, gstate_p); return OperatorFinalizeResultType::HAVE_MORE_OUTPUT; } - ExecuteDelayed(context, delayed, input, output, gstate_p, state_p); + ExecuteDelayed(context, delayed, input, output, gstate_p); } return OperatorFinalizeResultType::FINISHED; diff --git a/src/duckdb/src/execution/operator/persistent/physical_merge_into.cpp b/src/duckdb/src/execution/operator/persistent/physical_merge_into.cpp index 04a5f3dca..b8c7e23a2 100644 --- a/src/duckdb/src/execution/operator/persistent/physical_merge_into.cpp +++ b/src/duckdb/src/execution/operator/persistent/physical_merge_into.cpp @@ -473,10 +473,17 @@ SourceResultType PhysicalMergeInto::GetData(ExecutionContext &context, DataChunk // no action to scan from continue; } + // found a good one + break; + } + if (lstate.index < actions.size()) { + auto &action = *actions[lstate.index]; + auto &child_gstate = *gstate.global_states[lstate.index]; auto &child_lstate = *lstate.local_states[lstate.index]; OperatorSourceInput source_input {child_gstate, child_lstate, input.interrupt_state}; + lstate.scan_chunk.Reset(); auto result = action.op->GetData(context, lstate.scan_chunk, source_input); if (lstate.scan_chunk.size() > 0) { // construct the result chunk @@ -505,9 +512,13 @@ SourceResultType PhysicalMergeInto::GetData(ExecutionContext &context, DataChunk if (result != SourceResultType::FINISHED) { return result; - } - if (chunk.size() != 0) { - return SourceResultType::HAVE_MORE_OUTPUT; + } else { + lstate.index++; + if (lstate.index < actions.size()) { + return SourceResultType::HAVE_MORE_OUTPUT; + } else { + return SourceResultType::FINISHED; + } } } return SourceResultType::FINISHED; diff --git a/src/duckdb/src/function/table/version/pragma_version.cpp b/src/duckdb/src/function/table/version/pragma_version.cpp index 0e8af9cfa..2419b63bb 100644 --- a/src/duckdb/src/function/table/version/pragma_version.cpp +++ b/src/duckdb/src/function/table/version/pragma_version.cpp @@ -1,5 +1,5 @@ #ifndef DUCKDB_PATCH_VERSION -#define DUCKDB_PATCH_VERSION "4-dev34" +#define DUCKDB_PATCH_VERSION "4-dev55" #endif #ifndef DUCKDB_MINOR_VERSION #define DUCKDB_MINOR_VERSION 4 @@ -8,10 +8,10 @@ #define DUCKDB_MAJOR_VERSION 1 #endif #ifndef DUCKDB_VERSION -#define DUCKDB_VERSION "v1.4.4-dev34" +#define DUCKDB_VERSION "v1.4.4-dev55" #endif #ifndef DUCKDB_SOURCE_ID -#define DUCKDB_SOURCE_ID "6e4e3391db" +#define DUCKDB_SOURCE_ID "7f217607e4" #endif #include "duckdb/function/table/system_functions.hpp" #include "duckdb/main/database.hpp" diff --git a/src/duckdb/src/include/duckdb/execution/operator/aggregate/physical_streaming_window.hpp b/src/duckdb/src/include/duckdb/execution/operator/aggregate/physical_streaming_window.hpp index 52e32d3fd..dfca3fa5e 100644 --- a/src/duckdb/src/include/duckdb/execution/operator/aggregate/physical_streaming_window.hpp +++ b/src/duckdb/src/include/duckdb/execution/operator/aggregate/physical_streaming_window.hpp @@ -30,7 +30,6 @@ class PhysicalStreamingWindow : public PhysicalOperator { public: unique_ptr GetGlobalOperatorState(ClientContext &context) const override; - unique_ptr GetOperatorState(ExecutionContext &context) const override; OperatorResultType Execute(ExecutionContext &context, DataChunk &input, DataChunk &chunk, GlobalOperatorState &gstate, OperatorState &state) const override; @@ -50,13 +49,13 @@ class PhysicalStreamingWindow : public PhysicalOperator { private: void ExecuteFunctions(ExecutionContext &context, DataChunk &chunk, DataChunk &delayed, - GlobalOperatorState &gstate_p, OperatorState &state_p) const; + GlobalOperatorState &gstate_p) const; void ExecuteInput(ExecutionContext &context, DataChunk &delayed, DataChunk &input, DataChunk &chunk, - GlobalOperatorState &gstate, OperatorState &state) const; + GlobalOperatorState &gstate) const; void ExecuteDelayed(ExecutionContext &context, DataChunk &delayed, DataChunk &input, DataChunk &chunk, - GlobalOperatorState &gstate, OperatorState &state) const; + GlobalOperatorState &gstate) const; void ExecuteShifted(ExecutionContext &context, DataChunk &delayed, DataChunk &input, DataChunk &chunk, - GlobalOperatorState &gstate, OperatorState &state) const; + GlobalOperatorState &gstate) const; }; } // namespace duckdb