Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions src/duckdb/src/execution/operator/aggregate/physical_window.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ class WindowHashGroup {
public:
using HashGroupPtr = unique_ptr<HashedSortGroup>;
using OrderMasks = HashedSortGroup::OrderMasks;
using ExecutorGlobalStatePtr = unique_ptr<WindowExecutorGlobalState>;
using ExecutorGlobalStatePtr = unique_ptr<GlobalSinkState>;
using ExecutorGlobalStates = vector<ExecutorGlobalStatePtr>;
using ExecutorLocalStatePtr = unique_ptr<WindowExecutorLocalState>;
using ExecutorLocalStatePtr = unique_ptr<LocalSinkState>;
using ExecutorLocalStates = vector<ExecutorLocalStatePtr>;
using ThreadLocalStates = vector<ExecutorLocalStates>;
using Task = WindowSourceTask;
Expand Down Expand Up @@ -765,7 +765,8 @@ void WindowLocalSourceState::Sink(ExecutionContext &context, InterruptState &int
}

for (idx_t w = 0; w < executors.size(); ++w) {
executors[w]->Sink(context, sink_chunk, coll_chunk, input_idx, *gestates[w], *local_states[w], interrupt);
OperatorSinkInput sink {*gestates[w], *local_states[w], interrupt};
executors[w]->Sink(context, sink_chunk, coll_chunk, input_idx, sink);
}

window_hash_group->sunk += input_chunk.size();
Expand All @@ -790,7 +791,8 @@ void WindowLocalSourceState::Finalize(ExecutionContext &context, InterruptState
auto &gestates = window_hash_group->gestates;
auto &local_states = window_hash_group->thread_states.at(task->thread_idx);
for (idx_t w = 0; w < executors.size(); ++w) {
executors[w]->Finalize(context, *gestates[w], *local_states[w], window_hash_group->collection, interrupt);
OperatorSinkInput sink {*gestates[w], *local_states[w], interrupt};
executors[w]->Finalize(context, window_hash_group->collection, sink);
}

// Mark this range as done
Expand Down Expand Up @@ -944,16 +946,15 @@ void WindowLocalSourceState::GetData(ExecutionContext &context, DataChunk &resul
output_chunk.Reset();
for (idx_t expr_idx = 0; expr_idx < executors.size(); ++expr_idx) {
auto &executor = *executors[expr_idx];
auto &gstate = *gestates[expr_idx];
auto &lstate = *local_states[expr_idx];
auto &result = output_chunk.data[expr_idx];
if (eval_chunk.data.empty()) {
eval_chunk.SetCardinality(input_chunk);
} else {
eval_chunk.Reset();
eval_exec.Execute(input_chunk, eval_chunk);
}
executor.Evaluate(context, position, eval_chunk, result, lstate, gstate, interrupt);
OperatorSinkInput sink {*gestates[expr_idx], *local_states[expr_idx], interrupt};
executor.Evaluate(context, position, eval_chunk, result, sink);
}
output_chunk.SetCardinality(input_chunk);
output_chunk.Verify();
Expand Down
6 changes: 3 additions & 3 deletions src/duckdb/src/function/table/version/pragma_version.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#ifndef DUCKDB_PATCH_VERSION
#define DUCKDB_PATCH_VERSION "0-dev3047"
#define DUCKDB_PATCH_VERSION "0-dev3109"
#endif
#ifndef DUCKDB_MINOR_VERSION
#define DUCKDB_MINOR_VERSION 4
Expand All @@ -8,10 +8,10 @@
#define DUCKDB_MAJOR_VERSION 1
#endif
#ifndef DUCKDB_VERSION
#define DUCKDB_VERSION "v1.4.0-dev3047"
#define DUCKDB_VERSION "v1.4.0-dev3109"
#endif
#ifndef DUCKDB_SOURCE_ID
#define DUCKDB_SOURCE_ID "129b1fe55e"
#define DUCKDB_SOURCE_ID "d229d97f40"
#endif
#include "duckdb/function/table/system_functions.hpp"
#include "duckdb/main/database.hpp"
Expand Down
64 changes: 30 additions & 34 deletions src/duckdb/src/function/window/window_aggregate_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class WindowAggregateExecutorGlobalState : public WindowExecutorGlobalState {
const ValidityMask &order_mask);

// aggregate global state
unique_ptr<WindowAggregatorState> gsink;
unique_ptr<GlobalSinkState> gsink;

// the filter reference expression.
const Expression *filter_ref;
Expand Down Expand Up @@ -91,21 +91,21 @@ WindowAggregateExecutorGlobalState::WindowAggregateExecutorGlobalState(ClientCon
gsink = executor.aggregator->GetGlobalState(client, group_count, partition_mask);
}

unique_ptr<WindowExecutorGlobalState> WindowAggregateExecutor::GetGlobalState(ClientContext &client,
const idx_t payload_count,
const ValidityMask &partition_mask,
const ValidityMask &order_mask) const {
unique_ptr<GlobalSinkState> WindowAggregateExecutor::GetGlobalState(ClientContext &client, const idx_t payload_count,
const ValidityMask &partition_mask,
const ValidityMask &order_mask) const {
return make_uniq<WindowAggregateExecutorGlobalState>(client, *this, payload_count, partition_mask, order_mask);
}

class WindowAggregateExecutorLocalState : public WindowExecutorBoundsLocalState {
public:
WindowAggregateExecutorLocalState(ExecutionContext &context, const WindowExecutorGlobalState &gstate,
WindowAggregateExecutorLocalState(ExecutionContext &context, const GlobalSinkState &gstate,
const WindowAggregator &aggregator)
: WindowExecutorBoundsLocalState(context, gstate), filter_executor(gstate.client) {
: WindowExecutorBoundsLocalState(context, gstate.Cast<WindowAggregateExecutorGlobalState>()),
filter_executor(context.client) {

auto &gastate = gstate.Cast<WindowAggregateExecutorGlobalState>();
aggregator_state = aggregator.GetLocalState(*gastate.gsink);
aggregator_state = aggregator.GetLocalState(context, *gastate.gsink);

// evaluate the FILTER clause and stuff it into a large mask for compactness and reuse
auto filter_ref = gastate.filter_ref;
Expand All @@ -117,23 +117,22 @@ class WindowAggregateExecutorLocalState : public WindowExecutorBoundsLocalState

public:
// state of aggregator
unique_ptr<WindowAggregatorState> aggregator_state;
unique_ptr<LocalSinkState> aggregator_state;
//! Executor for any filter clause
ExpressionExecutor filter_executor;
//! Result of filtering
SelectionVector filter_sel;
};

unique_ptr<WindowExecutorLocalState>
WindowAggregateExecutor::GetLocalState(ExecutionContext &context, const WindowExecutorGlobalState &gstate) const {
unique_ptr<LocalSinkState> WindowAggregateExecutor::GetLocalState(ExecutionContext &context,
const GlobalSinkState &gstate) const {
return make_uniq<WindowAggregateExecutorLocalState>(context, gstate, *aggregator);
}

void WindowAggregateExecutor::Sink(ExecutionContext &context, DataChunk &sink_chunk, DataChunk &coll_chunk,
const idx_t input_idx, WindowExecutorGlobalState &gstate,
WindowExecutorLocalState &lstate, InterruptState &interrupt) const {
auto &gastate = gstate.Cast<WindowAggregateExecutorGlobalState>();
auto &lastate = lstate.Cast<WindowAggregateExecutorLocalState>();
const idx_t input_idx, OperatorSinkInput &sink) const {
auto &gastate = sink.global_state.Cast<WindowAggregateExecutorGlobalState>();
auto &lastate = sink.local_state.Cast<WindowAggregateExecutorLocalState>();
auto &filter_sel = lastate.filter_sel;
auto &filter_executor = lastate.filter_executor;

Expand All @@ -145,11 +144,10 @@ void WindowAggregateExecutor::Sink(ExecutionContext &context, DataChunk &sink_ch
}

D_ASSERT(aggregator);
auto &gestate = *gastate.gsink;
auto &lestate = *lastate.aggregator_state;
aggregator->Sink(context, gestate, lestate, sink_chunk, coll_chunk, input_idx, filtering, filtered, interrupt);
OperatorSinkInput asink {*gastate.gsink, *lastate.aggregator_state, sink.interrupt_state};
aggregator->Sink(context, sink_chunk, coll_chunk, input_idx, filtering, filtered, asink);

WindowExecutor::Sink(context, sink_chunk, coll_chunk, input_idx, gstate, lstate, interrupt);
WindowExecutor::Sink(context, sink_chunk, coll_chunk, input_idx, sink);
}

static void ApplyWindowStats(const WindowBoundary &boundary, FrameDelta &delta, BaseStatistics *base, bool is_start) {
Expand Down Expand Up @@ -215,12 +213,11 @@ static void ApplyWindowStats(const WindowBoundary &boundary, FrameDelta &delta,
}
}

void WindowAggregateExecutor::Finalize(ExecutionContext &context, WindowExecutorGlobalState &gstate,
WindowExecutorLocalState &lstate, CollectionPtr collection,
InterruptState &interrupt) const {
WindowExecutor::Finalize(context, gstate, lstate, collection, interrupt);
void WindowAggregateExecutor::Finalize(ExecutionContext &context, CollectionPtr collection,
OperatorSinkInput &sink) const {
WindowExecutor::Finalize(context, collection, sink);

auto &gastate = gstate.Cast<WindowAggregateExecutorGlobalState>();
auto &gastate = sink.global_state.Cast<WindowAggregateExecutorGlobalState>();
auto &gsink = gastate.gsink;
D_ASSERT(aggregator);

Expand All @@ -239,21 +236,20 @@ void WindowAggregateExecutor::Finalize(ExecutionContext &context, WindowExecutor
base = wexpr.expr_stats.empty() ? nullptr : wexpr.expr_stats[1].get();
ApplyWindowStats(wexpr.end, stats[1], base, false);

auto &lastate = lstate.Cast<WindowAggregateExecutorLocalState>();
aggregator->Finalize(context, *gsink, *lastate.aggregator_state, collection, stats, interrupt);
auto &lastate = sink.local_state.Cast<WindowAggregateExecutorLocalState>();
OperatorSinkInput asink {*gsink, *lastate.aggregator_state, sink.interrupt_state};
aggregator->Finalize(context, collection, stats, asink);
}

void WindowAggregateExecutor::EvaluateInternal(ExecutionContext &context, WindowExecutorGlobalState &gstate,
WindowExecutorLocalState &lstate, DataChunk &eval_chunk, Vector &result,
idx_t count, idx_t row_idx, InterruptState &interrupt) const {
auto &gastate = gstate.Cast<WindowAggregateExecutorGlobalState>();
auto &lastate = lstate.Cast<WindowAggregateExecutorLocalState>();
void WindowAggregateExecutor::EvaluateInternal(ExecutionContext &context, DataChunk &eval_chunk, Vector &result,
idx_t count, idx_t row_idx, OperatorSinkInput &sink) const {
auto &gastate = sink.global_state.Cast<WindowAggregateExecutorGlobalState>();
auto &lastate = sink.local_state.Cast<WindowAggregateExecutorLocalState>();
auto &gsink = gastate.gsink;
D_ASSERT(aggregator);

auto &agg_state = *lastate.aggregator_state;

aggregator->Evaluate(context, *gsink, agg_state, lastate.bounds, result, count, row_idx, interrupt);
OperatorSinkInput asink {*gsink, *lastate.aggregator_state, sink.interrupt_state};
aggregator->Evaluate(context, lastate.bounds, result, count, row_idx, asink);
}

} // namespace duckdb
41 changes: 27 additions & 14 deletions src/duckdb/src/function/window/window_aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ namespace duckdb {
//===--------------------------------------------------------------------===//
// WindowAggregator
//===--------------------------------------------------------------------===//
WindowAggregatorState::WindowAggregatorState() : allocator(Allocator::DefaultAllocator()) {
}

WindowAggregator::WindowAggregator(const BoundWindowExpression &wexpr)
: wexpr(wexpr), aggr(wexpr), result_type(wexpr.return_type), state_size(aggr.function.state_size(aggr.function)),
exclude_mode(wexpr.exclude_clause) {
Expand All @@ -31,20 +28,36 @@ WindowAggregator::WindowAggregator(const BoundWindowExpression &wexpr, WindowSha
WindowAggregator::~WindowAggregator() {
}

unique_ptr<WindowAggregatorState> WindowAggregator::GetGlobalState(ClientContext &context, idx_t group_count,
const ValidityMask &) const {
WindowAggregatorGlobalState::WindowAggregatorGlobalState(ClientContext &client, const WindowAggregator &aggregator_p,
idx_t group_count)
: client(client), allocator(Allocator::DefaultAllocator()), aggregator(aggregator_p), aggr(aggregator.wexpr),
locals(0), finalized(0) {

if (aggr.filter) {
// Start with all invalid and set the ones that pass
filter_mask.Initialize(group_count, false);
} else {
filter_mask.InitializeEmpty(group_count);
}
}

unique_ptr<GlobalSinkState> WindowAggregator::GetGlobalState(ClientContext &context, idx_t group_count,
const ValidityMask &) const {
return make_uniq<WindowAggregatorGlobalState>(context, *this, group_count);
}

WindowAggregatorLocalState::WindowAggregatorLocalState(ExecutionContext &context)
: allocator(Allocator::DefaultAllocator()) {
}

void WindowAggregatorLocalState::Sink(ExecutionContext &context, WindowAggregatorGlobalState &gastate,
DataChunk &sink_chunk, DataChunk &coll_chunk, idx_t input_idx) {
}

void WindowAggregator::Sink(ExecutionContext &context, WindowAggregatorState &gstate, WindowAggregatorState &lstate,
DataChunk &sink_chunk, DataChunk &coll_chunk, idx_t input_idx,
optional_ptr<SelectionVector> filter_sel, idx_t filtered, InterruptState &interrupt) {
auto &gastate = gstate.Cast<WindowAggregatorGlobalState>();
auto &lastate = lstate.Cast<WindowAggregatorLocalState>();
void WindowAggregator::Sink(ExecutionContext &context, DataChunk &sink_chunk, DataChunk &coll_chunk, idx_t input_idx,
optional_ptr<SelectionVector> filter_sel, idx_t filtered, OperatorSinkInput &sink) {
auto &gastate = sink.global_state.Cast<WindowAggregatorGlobalState>();
auto &lastate = sink.local_state.Cast<WindowAggregatorLocalState>();
lastate.Sink(context, gastate, sink_chunk, coll_chunk, input_idx);
if (filter_sel) {
auto &filter_mask = gastate.filter_mask;
Expand Down Expand Up @@ -79,10 +92,10 @@ void WindowAggregatorLocalState::Finalize(ExecutionContext &context, WindowAggre
}
}

void WindowAggregator::Finalize(ExecutionContext &context, WindowAggregatorState &gstate, WindowAggregatorState &lstate,
CollectionPtr collection, const FrameStats &stats, InterruptState &interrupt) {
auto &gasink = gstate.Cast<WindowAggregatorGlobalState>();
auto &lastate = lstate.Cast<WindowAggregatorLocalState>();
void WindowAggregator::Finalize(ExecutionContext &context, CollectionPtr collection, const FrameStats &stats,
OperatorSinkInput &sink) {
auto &gasink = sink.global_state.Cast<WindowAggregatorGlobalState>();
auto &lastate = sink.local_state.Cast<WindowAggregatorLocalState>();
lastate.Finalize(context, gasink, collection);
}

Expand Down
41 changes: 20 additions & 21 deletions src/duckdb/src/function/window/window_constant_aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ WindowConstantAggregatorGlobalState::WindowConstantAggregatorGlobalState(ClientC
//===--------------------------------------------------------------------===//
class WindowConstantAggregatorLocalState : public WindowAggregatorLocalState {
public:
explicit WindowConstantAggregatorLocalState(const WindowConstantAggregatorGlobalState &gstate);
WindowConstantAggregatorLocalState(ExecutionContext &context, const WindowConstantAggregatorGlobalState &gstate);
~WindowConstantAggregatorLocalState() override {
}

Expand All @@ -103,8 +103,9 @@ class WindowConstantAggregatorLocalState : public WindowAggregatorLocalState {
};

WindowConstantAggregatorLocalState::WindowConstantAggregatorLocalState(
const WindowConstantAggregatorGlobalState &gstate)
: gstate(gstate), statep(Value::POINTER(0)), statef(gstate.statef.aggr), partition(0) {
ExecutionContext &context, const WindowConstantAggregatorGlobalState &gstate)
: WindowAggregatorLocalState(context), gstate(gstate), statep(Value::POINTER(0)), statef(gstate.statef.aggr),
partition(0) {
matches.Initialize();

// Start the aggregates
Expand Down Expand Up @@ -201,16 +202,15 @@ WindowConstantAggregator::WindowConstantAggregator(BoundWindowExpression &wexpr,
}
}

unique_ptr<WindowAggregatorState> WindowConstantAggregator::GetGlobalState(ClientContext &context, idx_t group_count,
const ValidityMask &partition_mask) const {
unique_ptr<GlobalSinkState> WindowConstantAggregator::GetGlobalState(ClientContext &context, idx_t group_count,
const ValidityMask &partition_mask) const {
return make_uniq<WindowConstantAggregatorGlobalState>(context, *this, group_count, partition_mask);
}

void WindowConstantAggregator::Sink(ExecutionContext &context, WindowAggregatorState &gsink,
WindowAggregatorState &lstate, DataChunk &sink_chunk, DataChunk &coll_chunk,
void WindowConstantAggregator::Sink(ExecutionContext &context, DataChunk &sink_chunk, DataChunk &coll_chunk,
idx_t input_idx, optional_ptr<SelectionVector> filter_sel, idx_t filtered,
InterruptState &interrupt) {
auto &lastate = lstate.Cast<WindowConstantAggregatorLocalState>();
OperatorSinkInput &sink) {
auto &lastate = sink.local_state.Cast<WindowConstantAggregatorLocalState>();

lastate.Sink(context, sink_chunk, coll_chunk, input_idx, filter_sel, filtered);
}
Expand Down Expand Up @@ -299,11 +299,10 @@ void WindowConstantAggregatorLocalState::Sink(ExecutionContext &context, DataChu
}
}

void WindowConstantAggregator::Finalize(ExecutionContext &context, WindowAggregatorState &gstate,
WindowAggregatorState &lstate, CollectionPtr collection,
const FrameStats &stats, InterruptState &interrupt) {
auto &gastate = gstate.Cast<WindowConstantAggregatorGlobalState>();
auto &lastate = lstate.Cast<WindowConstantAggregatorLocalState>();
void WindowConstantAggregator::Finalize(ExecutionContext &context, CollectionPtr collection, const FrameStats &stats,
OperatorSinkInput &sink) {
auto &gastate = sink.global_state.Cast<WindowConstantAggregatorGlobalState>();
auto &lastate = sink.local_state.Cast<WindowConstantAggregatorLocalState>();

// Single-threaded combine
lock_guard<mutex> finalize_guard(gastate.lock);
Expand All @@ -315,20 +314,20 @@ void WindowConstantAggregator::Finalize(ExecutionContext &context, WindowAggrega
}
}

unique_ptr<WindowAggregatorState> WindowConstantAggregator::GetLocalState(const WindowAggregatorState &gstate) const {
return make_uniq<WindowConstantAggregatorLocalState>(gstate.Cast<WindowConstantAggregatorGlobalState>());
unique_ptr<LocalSinkState> WindowConstantAggregator::GetLocalState(ExecutionContext &context,
const GlobalSinkState &gstate) const {
return make_uniq<WindowConstantAggregatorLocalState>(context, gstate.Cast<WindowConstantAggregatorGlobalState>());
}

void WindowConstantAggregator::Evaluate(ExecutionContext &context, const WindowAggregatorState &gsink,
WindowAggregatorState &lstate, const DataChunk &bounds, Vector &result,
idx_t count, idx_t row_idx, InterruptState &interrupt) const {
auto &gasink = gsink.Cast<WindowConstantAggregatorGlobalState>();
void WindowConstantAggregator::Evaluate(ExecutionContext &context, const DataChunk &bounds, Vector &result, idx_t count,
idx_t row_idx, OperatorSinkInput &sink) const {
auto &gasink = sink.global_state.Cast<WindowConstantAggregatorGlobalState>();
const auto &partition_offsets = gasink.partition_offsets;
const auto &results = *gasink.results;

auto begins = FlatVector::GetData<const idx_t>(bounds.data[FRAME_BEGIN]);
// Chunk up the constants and copy them one at a time
auto &lcstate = lstate.Cast<WindowConstantAggregatorLocalState>();
auto &lcstate = sink.local_state.Cast<WindowConstantAggregatorLocalState>();
idx_t matched = 0;
idx_t target_offset = 0;
for (idx_t i = 0; i < count; ++i) {
Expand Down
Loading
Loading