Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 24 additions & 18 deletions src/duckdb/src/common/adbc/adbc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1161,12 +1161,21 @@ AdbcStatusCode StatementSetOption(struct AdbcStatement *statement, const char *k
return ADBC_STATUS_INVALID_ARGUMENT;
}

//! Turn an optional C-string pattern into a safely quoted SQL LIKE filter.
//! A null input means "no filter", which maps to the match-everything pattern '%'.
std::string createFilter(const char *input) {
	if (!input) {
		// No pattern supplied by the caller: match all values.
		return "'%'";
	}
	// Quote (and escape embedded quotes in) the pattern so it can be spliced
	// into a query string without breaking out of the literal.
	return duckdb::KeywordHelper::WriteQuoted(input, '\'');
}

AdbcStatusCode ConnectionGetObjects(struct AdbcConnection *connection, int depth, const char *catalog,
const char *db_schema, const char *table_name, const char **table_type,
const char *column_name, struct ArrowArrayStream *out, struct AdbcError *error) {
std::string catalog_filter = catalog ? catalog : "%";
std::string db_schema_filter = db_schema ? db_schema : "%";
std::string table_name_filter = table_name ? table_name : "%";
std::string catalog_filter = createFilter(catalog);
std::string db_schema_filter = createFilter(db_schema);
std::string table_name_filter = createFilter(table_name);
std::string column_name_filter = createFilter(column_name);
std::string table_type_condition = "";
if (table_type && table_type[0]) {
table_type_condition = " AND table_type IN (";
Expand All @@ -1182,13 +1191,10 @@ AdbcStatusCode ConnectionGetObjects(struct AdbcConnection *connection, int depth
if (i > 0) {
table_type_condition += ", ";
}
table_type_condition += "'";
table_type_condition += table_type[i];
table_type_condition += "'";
table_type_condition += createFilter(table_type[i]);
}
table_type_condition += ")";
}
std::string column_name_filter = column_name ? column_name : "%";

std::string query;
switch (depth) {
Expand Down Expand Up @@ -1233,7 +1239,7 @@ AdbcStatusCode ConnectionGetObjects(struct AdbcConnection *connection, int depth
)[] catalog_db_schemas
FROM
information_schema.schemata
WHERE catalog_name LIKE '%s'
WHERE catalog_name LIKE %s
GROUP BY catalog_name
)",
catalog_filter);
Expand All @@ -1246,7 +1252,7 @@ AdbcStatusCode ConnectionGetObjects(struct AdbcConnection *connection, int depth
catalog_name,
schema_name,
FROM information_schema.schemata
WHERE schema_name LIKE '%s'
WHERE schema_name LIKE %s
)

SELECT
Expand Down Expand Up @@ -1289,7 +1295,7 @@ AdbcStatusCode ConnectionGetObjects(struct AdbcConnection *connection, int depth
information_schema.schemata
LEFT JOIN db_schemas dbs
USING (catalog_name, schema_name)
WHERE catalog_name LIKE '%s'
WHERE catalog_name LIKE %s
GROUP BY catalog_name
)",
db_schema_filter, catalog_filter);
Expand Down Expand Up @@ -1333,7 +1339,7 @@ AdbcStatusCode ConnectionGetObjects(struct AdbcConnection *connection, int depth
)[],
}) db_schema_tables
FROM information_schema.tables
WHERE table_name LIKE '%s'%s
WHERE table_name LIKE %s%s
GROUP BY table_catalog, table_schema
),
db_schemas AS (
Expand All @@ -1344,7 +1350,7 @@ AdbcStatusCode ConnectionGetObjects(struct AdbcConnection *connection, int depth
FROM information_schema.schemata
LEFT JOIN tables
USING (catalog_name, schema_name)
WHERE schema_name LIKE '%s'
WHERE schema_name LIKE %s
)

SELECT
Expand All @@ -1357,7 +1363,7 @@ AdbcStatusCode ConnectionGetObjects(struct AdbcConnection *connection, int depth
information_schema.schemata
LEFT JOIN db_schemas dbs
USING (catalog_name, schema_name)
WHERE catalog_name LIKE '%s'
WHERE catalog_name LIKE %s
GROUP BY catalog_name
)",
table_name_filter, table_type_condition, db_schema_filter, catalog_filter);
Expand Down Expand Up @@ -1392,7 +1398,7 @@ AdbcStatusCode ConnectionGetObjects(struct AdbcConnection *connection, int depth
xdbc_is_generatedcolumn: NULL::BOOLEAN,
}) table_columns
FROM information_schema.columns
WHERE column_name LIKE '%s'
WHERE column_name LIKE %s
GROUP BY table_catalog, table_schema, table_name
),
constraints AS (
Expand Down Expand Up @@ -1421,7 +1427,7 @@ AdbcStatusCode ConnectionGetObjects(struct AdbcConnection *connection, int depth
constraint_column_names,
list_filter(
constraint_column_names,
lambda name: name LIKE '%s'
lambda name: name LIKE %s
)
)
GROUP BY database_name, schema_name, table_name
Expand All @@ -1441,7 +1447,7 @@ AdbcStatusCode ConnectionGetObjects(struct AdbcConnection *connection, int depth
USING (table_catalog, table_schema, table_name)
LEFT JOIN constraints
USING (table_catalog, table_schema, table_name)
WHERE table_name LIKE '%s'%s
WHERE table_name LIKE %s%s
GROUP BY table_catalog, table_schema
),
db_schemas AS (
Expand All @@ -1452,7 +1458,7 @@ AdbcStatusCode ConnectionGetObjects(struct AdbcConnection *connection, int depth
FROM information_schema.schemata
LEFT JOIN tables
USING (catalog_name, schema_name)
WHERE schema_name LIKE '%s'
WHERE schema_name LIKE %s
)

SELECT
Expand All @@ -1465,7 +1471,7 @@ AdbcStatusCode ConnectionGetObjects(struct AdbcConnection *connection, int depth
information_schema.schemata
LEFT JOIN db_schemas dbs
USING (catalog_name, schema_name)
WHERE catalog_name LIKE '%s'
WHERE catalog_name LIKE %s
GROUP BY catalog_name
)",
column_name_filter, column_name_filter, table_name_filter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ PhysicalStreamingWindow::PhysicalStreamingWindow(PhysicalPlan &physical_plan, ve

class StreamingWindowGlobalState : public GlobalOperatorState {
public:
StreamingWindowGlobalState() : row_number(1) {
}
explicit StreamingWindowGlobalState(ClientContext &client);

//! The next row number.
std::atomic<int64_t> row_number;
//! The single local state
unique_ptr<OperatorState> local_state;
};

class StreamingWindowState : public OperatorState {
Expand Down Expand Up @@ -340,6 +341,10 @@ class StreamingWindowState : public OperatorState {
DataChunk shifted;
};

//! Global state construction: ROW_NUMBER starts at 1, and the single shared
//! local state is created eagerly so every execution path can use it.
//! (Member-init order follows declaration order: row_number, then local_state.)
StreamingWindowGlobalState::StreamingWindowGlobalState(ClientContext &client)
    : row_number(1), local_state(make_uniq<StreamingWindowState>(client)) {
}

bool PhysicalStreamingWindow::IsStreamingFunction(ClientContext &context, unique_ptr<Expression> &expr) {
auto &wexpr = expr->Cast<BoundWindowExpression>();
if (!wexpr.partitions.empty() || !wexpr.orders.empty() || wexpr.ignore_nulls || !wexpr.arg_orders.empty() ||
Expand Down Expand Up @@ -374,12 +379,8 @@ bool PhysicalStreamingWindow::IsStreamingFunction(ClientContext &context, unique
}
}

unique_ptr<GlobalOperatorState> PhysicalStreamingWindow::GetGlobalOperatorState(ClientContext &context) const {
return make_uniq<StreamingWindowGlobalState>();
}

unique_ptr<OperatorState> PhysicalStreamingWindow::GetOperatorState(ExecutionContext &context) const {
return make_uniq<StreamingWindowState>(context.client);
unique_ptr<GlobalOperatorState> PhysicalStreamingWindow::GetGlobalOperatorState(ClientContext &client) const {
	// The global state owns the single shared operator-local state as well.
	auto gstate = make_uniq<StreamingWindowGlobalState>(client);
	return gstate;
}

void StreamingWindowState::AggregateState::Execute(ExecutionContext &context, DataChunk &input, Vector &result) {
Expand Down Expand Up @@ -486,9 +487,9 @@ void StreamingWindowState::AggregateState::Execute(ExecutionContext &context, Da
}

void PhysicalStreamingWindow::ExecuteFunctions(ExecutionContext &context, DataChunk &output, DataChunk &delayed,
GlobalOperatorState &gstate_p, OperatorState &state_p) const {
GlobalOperatorState &gstate_p) const {
auto &gstate = gstate_p.Cast<StreamingWindowGlobalState>();
auto &state = state_p.Cast<StreamingWindowState>();
auto &state = gstate.local_state->Cast<StreamingWindowState>();

// Compute window functions
const idx_t count = output.size();
Expand Down Expand Up @@ -530,9 +531,9 @@ void PhysicalStreamingWindow::ExecuteFunctions(ExecutionContext &context, DataCh
}

void PhysicalStreamingWindow::ExecuteInput(ExecutionContext &context, DataChunk &delayed, DataChunk &input,
DataChunk &output, GlobalOperatorState &gstate_p,
OperatorState &state_p) const {
auto &state = state_p.Cast<StreamingWindowState>();
DataChunk &output, GlobalOperatorState &gstate_p) const {
auto &gstate = gstate_p.Cast<StreamingWindowGlobalState>();
auto &state = gstate.local_state->Cast<StreamingWindowState>();

// Put payload columns in place
for (idx_t col_idx = 0; col_idx < input.data.size(); col_idx++) {
Expand All @@ -548,13 +549,13 @@ void PhysicalStreamingWindow::ExecuteInput(ExecutionContext &context, DataChunk
}
output.SetCardinality(count);

ExecuteFunctions(context, output, state.delayed, gstate_p, state_p);
ExecuteFunctions(context, output, state.delayed, gstate_p);
}

void PhysicalStreamingWindow::ExecuteShifted(ExecutionContext &context, DataChunk &delayed, DataChunk &input,
DataChunk &output, GlobalOperatorState &gstate_p,
OperatorState &state_p) const {
auto &state = state_p.Cast<StreamingWindowState>();
DataChunk &output, GlobalOperatorState &gstate_p) const {
auto &gstate = gstate_p.Cast<StreamingWindowGlobalState>();
auto &state = gstate.local_state->Cast<StreamingWindowState>();
auto &shifted = state.shifted;

idx_t out = output.size();
Expand All @@ -576,25 +577,25 @@ void PhysicalStreamingWindow::ExecuteShifted(ExecutionContext &context, DataChun
}
delayed.SetCardinality(delay - out + in);

ExecuteFunctions(context, output, delayed, gstate_p, state_p);
ExecuteFunctions(context, output, delayed, gstate_p);
}

void PhysicalStreamingWindow::ExecuteDelayed(ExecutionContext &context, DataChunk &delayed, DataChunk &input,
DataChunk &output, GlobalOperatorState &gstate_p,
OperatorState &state_p) const {
DataChunk &output, GlobalOperatorState &gstate_p) const {
// Put payload columns in place
for (idx_t col_idx = 0; col_idx < delayed.data.size(); col_idx++) {
output.data[col_idx].Reference(delayed.data[col_idx]);
}
idx_t count = delayed.size();
output.SetCardinality(count);

ExecuteFunctions(context, output, input, gstate_p, state_p);
ExecuteFunctions(context, output, input, gstate_p);
}

OperatorResultType PhysicalStreamingWindow::Execute(ExecutionContext &context, DataChunk &input, DataChunk &output,
GlobalOperatorState &gstate_p, OperatorState &state_p) const {
auto &state = state_p.Cast<StreamingWindowState>();
GlobalOperatorState &gstate_p, OperatorState &) const {
auto &gstate = gstate_p.Cast<StreamingWindowGlobalState>();
auto &state = gstate.local_state->Cast<StreamingWindowState>();
if (!state.initialized) {
state.Initialize(context.client, input, select_list);
}
Expand All @@ -615,27 +616,27 @@ OperatorResultType PhysicalStreamingWindow::Execute(ExecutionContext &context, D
// If we can't consume all of the delayed values,
// we need to split them instead of referencing them all
output.SetCardinality(input.size());
ExecuteShifted(context, delayed, input, output, gstate_p, state_p);
ExecuteShifted(context, delayed, input, output, gstate_p);
// We delayed the unused input so ask for more
return OperatorResultType::NEED_MORE_INPUT;
} else if (delayed.size()) {
// We have enough delayed rows so flush them
ExecuteDelayed(context, delayed, input, output, gstate_p, state_p);
ExecuteDelayed(context, delayed, input, output, gstate_p);
// Defer resetting delayed as it may be referenced.
delayed.SetCardinality(0);
// Come back to process the input
return OperatorResultType::HAVE_MORE_OUTPUT;
} else {
// No delayed rows, so emit what we can and delay the rest.
ExecuteInput(context, delayed, input, output, gstate_p, state_p);
ExecuteInput(context, delayed, input, output, gstate_p);
return OperatorResultType::NEED_MORE_INPUT;
}
}

OperatorFinalizeResultType PhysicalStreamingWindow::FinalExecute(ExecutionContext &context, DataChunk &output,
GlobalOperatorState &gstate_p,
OperatorState &state_p) const {
auto &state = state_p.Cast<StreamingWindowState>();
GlobalOperatorState &gstate_p, OperatorState &) const {
auto &gstate = gstate_p.Cast<StreamingWindowGlobalState>();
auto &state = gstate.local_state->Cast<StreamingWindowState>();

if (state.initialized && state.lead_count) {
auto &delayed = state.delayed;
Expand All @@ -646,10 +647,10 @@ OperatorFinalizeResultType PhysicalStreamingWindow::FinalExecute(ExecutionContex
if (output.GetCapacity() < delayed.size()) {
// More than one output buffer was delayed, so shift in what we can
output.SetCardinality(output.GetCapacity());
ExecuteShifted(context, delayed, input, output, gstate_p, state_p);
ExecuteShifted(context, delayed, input, output, gstate_p);
return OperatorFinalizeResultType::HAVE_MORE_OUTPUT;
}
ExecuteDelayed(context, delayed, input, output, gstate_p, state_p);
ExecuteDelayed(context, delayed, input, output, gstate_p);
}

return OperatorFinalizeResultType::FINISHED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,10 +473,17 @@ SourceResultType PhysicalMergeInto::GetData(ExecutionContext &context, DataChunk
// no action to scan from
continue;
}
// found a good one
break;
}
if (lstate.index < actions.size()) {
auto &action = *actions[lstate.index];

auto &child_gstate = *gstate.global_states[lstate.index];
auto &child_lstate = *lstate.local_states[lstate.index];
OperatorSourceInput source_input {child_gstate, child_lstate, input.interrupt_state};

lstate.scan_chunk.Reset();
auto result = action.op->GetData(context, lstate.scan_chunk, source_input);
if (lstate.scan_chunk.size() > 0) {
// construct the result chunk
Expand Down Expand Up @@ -505,9 +512,13 @@ SourceResultType PhysicalMergeInto::GetData(ExecutionContext &context, DataChunk

if (result != SourceResultType::FINISHED) {
return result;
}
if (chunk.size() != 0) {
return SourceResultType::HAVE_MORE_OUTPUT;
} else {
lstate.index++;
if (lstate.index < actions.size()) {
return SourceResultType::HAVE_MORE_OUTPUT;
} else {
return SourceResultType::FINISHED;
}
}
}
return SourceResultType::FINISHED;
Expand Down
6 changes: 3 additions & 3 deletions src/duckdb/src/function/table/version/pragma_version.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#ifndef DUCKDB_PATCH_VERSION
#define DUCKDB_PATCH_VERSION "4-dev34"
#define DUCKDB_PATCH_VERSION "4-dev55"
#endif
#ifndef DUCKDB_MINOR_VERSION
#define DUCKDB_MINOR_VERSION 4
Expand All @@ -8,10 +8,10 @@
#define DUCKDB_MAJOR_VERSION 1
#endif
#ifndef DUCKDB_VERSION
#define DUCKDB_VERSION "v1.4.4-dev34"
#define DUCKDB_VERSION "v1.4.4-dev55"
#endif
#ifndef DUCKDB_SOURCE_ID
#define DUCKDB_SOURCE_ID "6e4e3391db"
#define DUCKDB_SOURCE_ID "7f217607e4"
#endif
#include "duckdb/function/table/system_functions.hpp"
#include "duckdb/main/database.hpp"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ class PhysicalStreamingWindow : public PhysicalOperator {

public:
unique_ptr<GlobalOperatorState> GetGlobalOperatorState(ClientContext &context) const override;
unique_ptr<OperatorState> GetOperatorState(ExecutionContext &context) const override;

OperatorResultType Execute(ExecutionContext &context, DataChunk &input, DataChunk &chunk,
GlobalOperatorState &gstate, OperatorState &state) const override;
Expand All @@ -50,13 +49,13 @@ class PhysicalStreamingWindow : public PhysicalOperator {

private:
void ExecuteFunctions(ExecutionContext &context, DataChunk &chunk, DataChunk &delayed,
GlobalOperatorState &gstate_p, OperatorState &state_p) const;
GlobalOperatorState &gstate_p) const;
void ExecuteInput(ExecutionContext &context, DataChunk &delayed, DataChunk &input, DataChunk &chunk,
GlobalOperatorState &gstate, OperatorState &state) const;
GlobalOperatorState &gstate) const;
void ExecuteDelayed(ExecutionContext &context, DataChunk &delayed, DataChunk &input, DataChunk &chunk,
GlobalOperatorState &gstate, OperatorState &state) const;
GlobalOperatorState &gstate) const;
void ExecuteShifted(ExecutionContext &context, DataChunk &delayed, DataChunk &input, DataChunk &chunk,
GlobalOperatorState &gstate, OperatorState &state) const;
GlobalOperatorState &gstate) const;
};

} // namespace duckdb