diff --git a/src/duckdb/extension/core_functions/include/core_functions/aggregate/quantile_sort_tree.hpp b/src/duckdb/extension/core_functions/include/core_functions/aggregate/quantile_sort_tree.hpp index 1762ed90f..2c796b2e1 100644 --- a/src/duckdb/extension/core_functions/include/core_functions/aggregate/quantile_sort_tree.hpp +++ b/src/duckdb/extension/core_functions/include/core_functions/aggregate/quantile_sort_tree.hpp @@ -306,6 +306,7 @@ struct QuantileSortTree { QuantileSortTree(AggregateInputData &aggr_input_data, const WindowPartitionInput &partition) { // TODO: Two pass parallel sorting using Build auto &inputs = *partition.inputs; + auto &interrupt = partition.interrupt_state; ColumnDataScanState scan; DataChunk sort; inputs.InitializeScan(scan, partition.column_ids); @@ -338,12 +339,12 @@ struct QuantileSortTree { filter_sel[filtered++] = i; } } - local_state.Sink(partition.context, sort, row_idx, filter_sel, filtered); + local_state.Sink(partition.context, sort, row_idx, filter_sel, filtered, interrupt); } else { - local_state.Sink(partition.context, sort, row_idx, nullptr, 0); + local_state.Sink(partition.context, sort, row_idx, nullptr, 0, interrupt); } } - local_state.Finalize(partition.context); + local_state.Finalize(partition.context, interrupt); } inline idx_t SelectNth(const SubFrames &frames, size_t n) const { diff --git a/src/duckdb/src/execution/operator/aggregate/physical_window.cpp b/src/duckdb/src/execution/operator/aggregate/physical_window.cpp index a86ce6ae4..34a1b2bca 100644 --- a/src/duckdb/src/execution/operator/aggregate/physical_window.cpp +++ b/src/duckdb/src/execution/operator/aggregate/physical_window.cpp @@ -649,7 +649,7 @@ class WindowLocalSourceState : public LocalSourceState { //! Assign the next task bool TryAssignTask(); //! Execute a step in the current task - void ExecuteTask(ExecutionContext &context, DataChunk &chunk); + void ExecuteTask(ExecutionContext &context, DataChunk &chunk, InterruptState &interrupt); //! The shared source state WindowGlobalSourceState &gsource; @@ -667,9 +667,9 @@ class WindowLocalSourceState : public LocalSourceState { DataChunk output_chunk; protected: - void Sink(ExecutionContext &context); - void Finalize(ExecutionContext &context); - void GetData(ExecutionContext &context, DataChunk &chunk); + void Sink(ExecutionContext &context, InterruptState &interrupt); + void Finalize(ExecutionContext &context, InterruptState &interrupt); + void GetData(ExecutionContext &context, DataChunk &chunk, InterruptState &interrupt); //! Storage and evaluation for the fully materialised data unique_ptr builder; @@ -711,7 +711,7 @@ WindowHashGroup::ExecutorGlobalStates &WindowHashGroup::Initialize(ClientContext return gestates; } -void WindowLocalSourceState::Sink(ExecutionContext &context) { +void WindowLocalSourceState::Sink(ExecutionContext &context, InterruptState &interrupt) { D_ASSERT(task); D_ASSERT(task->stage == WindowGroupStage::SINK); @@ -765,7 +765,7 @@ void WindowLocalSourceState::Sink(ExecutionContext &context) { } for (idx_t w = 0; w < executors.size(); ++w) { - executors[w]->Sink(context, sink_chunk, coll_chunk, input_idx, *gestates[w], *local_states[w]); + executors[w]->Sink(context, sink_chunk, coll_chunk, input_idx, *gestates[w], *local_states[w], interrupt); } window_hash_group->sunk += input_chunk.size(); @@ -773,7 +773,7 @@ void WindowLocalSourceState::Sink(ExecutionContext &context) { scanner.reset(); } -void WindowLocalSourceState::Finalize(ExecutionContext &context) { +void WindowLocalSourceState::Finalize(ExecutionContext &context, InterruptState &interrupt) { D_ASSERT(task); D_ASSERT(task->stage == WindowGroupStage::FINALIZE); @@ -790,7 +790,7 @@ void WindowLocalSourceState::Finalize(ExecutionContext &context) { auto &gestates = window_hash_group->gestates; auto &local_states = window_hash_group->thread_states.at(task->thread_idx); for (idx_t w = 0; w < executors.size(); ++w) { - executors[w]->Finalize(context, *gestates[w], *local_states[w], window_hash_group->collection); + executors[w]->Finalize(context, *gestates[w], *local_states[w], window_hash_group->collection, interrupt); } // Mark this range as done @@ -898,7 +898,7 @@ bool WindowLocalSourceState::TryAssignTask() { return gsource.TryNextTask(task, task_local); } -void WindowLocalSourceState::ExecuteTask(ExecutionContext &context, DataChunk &result) { +void WindowLocalSourceState::ExecuteTask(ExecutionContext &context, DataChunk &result, InterruptState &interrupt) { auto &gsink = gsource.gsink; // Update the hash group @@ -907,16 +907,16 @@ void WindowLocalSourceState::ExecuteTask(ExecutionContext &context, DataChunk &r // Process the new state switch (task->stage) { case WindowGroupStage::SINK: - Sink(context); + Sink(context, interrupt); D_ASSERT(TaskFinished()); break; case WindowGroupStage::FINALIZE: - Finalize(context); + Finalize(context, interrupt); D_ASSERT(TaskFinished()); break; case WindowGroupStage::GETDATA: D_ASSERT(!TaskFinished()); - GetData(context, result); + GetData(context, result, interrupt); break; default: throw InternalException("Invalid window source state."); @@ -928,7 +928,7 @@ void WindowLocalSourceState::ExecuteTask(ExecutionContext &context, DataChunk &r } } -void WindowLocalSourceState::GetData(ExecutionContext &context, DataChunk &result) { +void WindowLocalSourceState::GetData(ExecutionContext &context, DataChunk &result, InterruptState &interrupt) { D_ASSERT(window_hash_group->GetStage() == WindowGroupStage::GETDATA); window_hash_group->UpdateScanner(scanner, task->begin_idx); @@ -953,7 +953,7 @@ void WindowLocalSourceState::GetData(ExecutionContext &context, DataChunk &resul eval_chunk.Reset(); eval_exec.Execute(input_chunk, eval_chunk); } - executor.Evaluate(context, position, eval_chunk, result, lstate, gstate); + executor.Evaluate(context, position, eval_chunk, result, lstate, gstate, interrupt); } output_chunk.SetCardinality(input_chunk); output_chunk.Verify(); @@ -1036,14 +1036,14 @@ OperatorPartitionData PhysicalWindow::GetPartitionData(ExecutionContext &context } SourceResultType PhysicalWindow::GetData(ExecutionContext &context, DataChunk &chunk, - OperatorSourceInput &input) const { - auto &gsource = input.global_state.Cast(); - auto &lsource = input.local_state.Cast(); + OperatorSourceInput &source) const { + auto &gsource = source.global_state.Cast(); + auto &lsource = source.local_state.Cast(); while (gsource.HasUnfinishedTasks() && chunk.size() == 0) { if (!lsource.TaskFinished() || lsource.TryAssignTask()) { try { - lsource.ExecuteTask(context, chunk); + lsource.ExecuteTask(context, chunk, source.interrupt_state); } catch (...) { gsource.stopped = true; throw; @@ -1057,7 +1057,7 @@ SourceResultType PhysicalWindow::GetData(ExecutionContext &context, DataChunk &c } else { // there are more tasks available, but we can't execute them yet // block the source - return gsource.BlockSource(guard, input.interrupt_state); + return gsource.BlockSource(guard, source.interrupt_state); } } } diff --git a/src/duckdb/src/execution/sample/base_reservoir_sample.cpp b/src/duckdb/src/execution/sample/base_reservoir_sample.cpp index 0f0fcdf7a..35de6d54f 100644 --- a/src/duckdb/src/execution/sample/base_reservoir_sample.cpp +++ b/src/duckdb/src/execution/sample/base_reservoir_sample.cpp @@ -3,24 +3,6 @@ namespace duckdb { -double BaseReservoirSampling::GetMinWeightFromTuplesSeen(idx_t rows_seen_total) { - // this function was obtained using https://mycurvefit.com. Inputting multiple x, y values into - // The - switch (rows_seen_total) { - case 0: - return 0; - case 1: - return 0.000161; - case 2: - return 0.530136; - case 3: - return 0.693454; - default: { - return (0.99 - 0.355 * std::exp(-0.07 * static_cast(rows_seen_total))); - } - } -} - BaseReservoirSampling::BaseReservoirSampling(int64_t seed) : random(seed) { next_index_to_sample = 0; min_weight_threshold = 0; @@ -73,7 +55,7 @@ void BaseReservoirSampling::SetNextEntry() { //! since all our weights are 1 (uniform sampling), we can just determine the amount of elements to skip min_weight_threshold = t_w; min_weighted_entry_index = min_key.second; - next_index_to_sample = MaxValue(1, idx_t(round(x_w))); + next_index_to_sample = idx_t(ceil(x_w)); num_entries_to_skip_b4_next_sample = 0; } @@ -118,15 +100,33 @@ void BaseReservoirSampling::UpdateMinWeightThreshold() { min_weight_threshold = 1; } +// Generate top k order statistics from n Uniform(0, 1) samples in O(k) time +// This method leverages two key properties of order statistics from a Uniform(0, 1) distribution: +// 1. The maximum of n independent Uniform(0, 1) samples, denoted as U(n), follows a Beta(n, 1) distribution. +// 2. U(n-i) / U(n-i+1) ~ Beta(n-i, 1) +// So we can use a recursive approach to generate the top k order statistics +// (See: https://www.math.ntu.edu.tw/~hchen/teaching/LargeSample/notes/noteorder.pdf) +static vector GenerateTopKFromUniform(ReservoirRNG &random, idx_t n, idx_t k) { + vector top_k_values(k); + double current_bound = 1.0; + for (idx_t i = 0; i < k; i++) { + // generate a sample from Beta(n - i, 1) + double beta = std::pow(random.NextRandom(), 1.0 / double(n - i)); + current_bound *= beta; + top_k_values[i] = current_bound; + } + return top_k_values; +} + void BaseReservoirSampling::FillWeights(SelectionVector &sel, idx_t &sel_size) { if (!reservoir_weights.empty()) { return; } D_ASSERT(reservoir_weights.empty()); - auto num_entries_seen_normalized = num_entries_seen_total / FIXED_SAMPLE_SIZE; - auto min_weight = GetMinWeightFromTuplesSeen(num_entries_seen_normalized); + auto weights = GenerateTopKFromUniform(random, num_entries_seen_total, sel_size); + std::shuffle(weights.begin(), weights.end(), random); for (idx_t i = 0; i < sel_size; i++) { - auto weight = random.NextRandom(min_weight, 1); + auto weight = weights[i]; reservoir_weights.emplace(-weight, i); } D_ASSERT(reservoir_weights.size() <= sel_size); diff --git a/src/duckdb/src/function/table/version/pragma_version.cpp b/src/duckdb/src/function/table/version/pragma_version.cpp index 63d81b173..cc14d55a6 100644 --- a/src/duckdb/src/function/table/version/pragma_version.cpp +++ b/src/duckdb/src/function/table/version/pragma_version.cpp @@ -1,5 +1,5 @@ #ifndef DUCKDB_PATCH_VERSION -#define DUCKDB_PATCH_VERSION "0-dev3028" +#define DUCKDB_PATCH_VERSION "0-dev3047" #endif #ifndef DUCKDB_MINOR_VERSION #define DUCKDB_MINOR_VERSION 4 @@ -8,10 +8,10 @@ #define DUCKDB_MAJOR_VERSION 1 #endif #ifndef DUCKDB_VERSION -#define DUCKDB_VERSION "v1.4.0-dev3028" +#define DUCKDB_VERSION "v1.4.0-dev3047" #endif #ifndef DUCKDB_SOURCE_ID -#define DUCKDB_SOURCE_ID "a8206a211f" +#define DUCKDB_SOURCE_ID "129b1fe55e" #endif #include "duckdb/function/table/system_functions.hpp" #include "duckdb/main/database.hpp" diff --git a/src/duckdb/src/function/window/window_aggregate_function.cpp b/src/duckdb/src/function/window/window_aggregate_function.cpp index 20a512d9d..b8b7f4aa0 100644 --- a/src/duckdb/src/function/window/window_aggregate_function.cpp +++ b/src/duckdb/src/function/window/window_aggregate_function.cpp @@ -131,7 +131,7 @@ WindowAggregateExecutor::GetLocalState(ExecutionContext &context, const WindowEx void WindowAggregateExecutor::Sink(ExecutionContext &context, DataChunk &sink_chunk, DataChunk &coll_chunk, const idx_t input_idx, WindowExecutorGlobalState &gstate, - WindowExecutorLocalState &lstate) const { + WindowExecutorLocalState &lstate, InterruptState &interrupt) const { auto &gastate = gstate.Cast(); auto &lastate = lstate.Cast(); auto &filter_sel = lastate.filter_sel; @@ -147,9 +147,9 @@ void WindowAggregateExecutor::Sink(ExecutionContext &context, DataChunk &sink_ch D_ASSERT(aggregator); auto &gestate = *gastate.gsink; auto &lestate = *lastate.aggregator_state; - aggregator->Sink(context, gestate, lestate, sink_chunk, coll_chunk, input_idx, filtering, filtered); + aggregator->Sink(context, gestate, lestate, sink_chunk, coll_chunk, input_idx, filtering, filtered, interrupt); - WindowExecutor::Sink(context, sink_chunk, coll_chunk, input_idx, gstate, lstate); + WindowExecutor::Sink(context, sink_chunk, coll_chunk, input_idx, gstate, lstate, interrupt); } static void ApplyWindowStats(const WindowBoundary &boundary, FrameDelta &delta, BaseStatistics *base, bool is_start) { @@ -216,8 +216,9 @@ static void ApplyWindowStats(const WindowBoundary &boundary, FrameDelta &delta, } void WindowAggregateExecutor::Finalize(ExecutionContext &context, WindowExecutorGlobalState &gstate, - WindowExecutorLocalState &lstate, CollectionPtr collection) const { - WindowExecutor::Finalize(context, gstate, lstate, collection); + WindowExecutorLocalState &lstate, CollectionPtr collection, + InterruptState &interrupt) const { + WindowExecutor::Finalize(context, gstate, lstate, collection, interrupt); auto &gastate = gstate.Cast(); auto &gsink = gastate.gsink; @@ -239,12 +240,12 @@ void WindowAggregateExecutor::Finalize(ExecutionContext &context, WindowExecutor ApplyWindowStats(wexpr.end, stats[1], base, false); auto &lastate = lstate.Cast(); - aggregator->Finalize(context, *gsink, *lastate.aggregator_state, collection, stats); + aggregator->Finalize(context, *gsink, *lastate.aggregator_state, collection, stats, interrupt); } void WindowAggregateExecutor::EvaluateInternal(ExecutionContext &context, WindowExecutorGlobalState &gstate, WindowExecutorLocalState &lstate, DataChunk &eval_chunk, Vector &result, - idx_t count, idx_t row_idx) const { + idx_t count, idx_t row_idx, InterruptState &interrupt) const { auto &gastate = gstate.Cast(); auto &lastate = lstate.Cast(); auto &gsink = gastate.gsink; @@ -252,7 +253,7 @@ void WindowAggregateExecutor::EvaluateInternal(ExecutionContext &context, Window auto &agg_state = *lastate.aggregator_state; - aggregator->Evaluate(context, *gsink, agg_state, lastate.bounds, result, count, row_idx); + aggregator->Evaluate(context, *gsink, agg_state, lastate.bounds, result, count, row_idx, interrupt); } } // namespace duckdb diff --git a/src/duckdb/src/function/window/window_aggregator.cpp b/src/duckdb/src/function/window/window_aggregator.cpp index 10a3d53a8..bb2ada29f 100644 --- a/src/duckdb/src/function/window/window_aggregator.cpp +++ b/src/duckdb/src/function/window/window_aggregator.cpp @@ -42,7 +42,7 @@ void WindowAggregatorLocalState::Sink(ExecutionContext &context, WindowAggregato void WindowAggregator::Sink(ExecutionContext &context, WindowAggregatorState &gstate, WindowAggregatorState &lstate, DataChunk &sink_chunk, DataChunk &coll_chunk, idx_t input_idx, - optional_ptr filter_sel, idx_t filtered) { + optional_ptr filter_sel, idx_t filtered, InterruptState &interrupt) { auto &gastate = gstate.Cast(); auto &lastate = lstate.Cast(); lastate.Sink(context, gastate, sink_chunk, coll_chunk, input_idx); @@ -80,7 +80,7 @@ void WindowAggregatorLocalState::Finalize(ExecutionContext &context, WindowAggre } void WindowAggregator::Finalize(ExecutionContext &context, WindowAggregatorState &gstate, WindowAggregatorState &lstate, - CollectionPtr collection, const FrameStats &stats) { + CollectionPtr collection, const FrameStats &stats, InterruptState &interrupt) { auto &gasink = gstate.Cast(); auto &lastate = lstate.Cast(); lastate.Finalize(context, gasink, collection); diff --git a/src/duckdb/src/function/window/window_constant_aggregator.cpp b/src/duckdb/src/function/window/window_constant_aggregator.cpp index ca51b5be2..94c77b37e 100644 --- a/src/duckdb/src/function/window/window_constant_aggregator.cpp +++ b/src/duckdb/src/function/window/window_constant_aggregator.cpp @@ -208,7 +208,8 @@ unique_ptr WindowConstantAggregator::GetGlobalState(Clien void WindowConstantAggregator::Sink(ExecutionContext &context, WindowAggregatorState &gsink, WindowAggregatorState &lstate, DataChunk &sink_chunk, DataChunk &coll_chunk, - idx_t input_idx, optional_ptr filter_sel, idx_t filtered) { + idx_t input_idx, optional_ptr filter_sel, idx_t filtered, + InterruptState &interrupt) { auto &lastate = lstate.Cast(); lastate.Sink(context, sink_chunk, coll_chunk, input_idx, filter_sel, filtered); @@ -300,7 +301,7 @@ void WindowConstantAggregatorLocalState::Sink(ExecutionContext &context, DataChu void WindowConstantAggregator::Finalize(ExecutionContext &context, WindowAggregatorState &gstate, WindowAggregatorState &lstate, CollectionPtr collection, - const FrameStats &stats) { + const FrameStats &stats, InterruptState &interrupt) { auto &gastate = gstate.Cast(); auto &lastate = lstate.Cast(); @@ -320,7 +321,7 @@ unique_ptr WindowConstantAggregator::GetLocalState(const void WindowConstantAggregator::Evaluate(ExecutionContext &context, const WindowAggregatorState &gsink, WindowAggregatorState &lstate, const DataChunk &bounds, Vector &result, - idx_t count, idx_t row_idx) const { + idx_t count, idx_t row_idx, InterruptState &interrupt) const { auto &gasink = gsink.Cast(); const auto &partition_offsets = gasink.partition_offsets; const auto &results = *gasink.results; diff --git a/src/duckdb/src/function/window/window_custom_aggregator.cpp b/src/duckdb/src/function/window/window_custom_aggregator.cpp index 70f288969..3fb3b4b70 100644 --- a/src/duckdb/src/function/window/window_custom_aggregator.cpp +++ b/src/duckdb/src/function/window/window_custom_aggregator.cpp @@ -49,20 +49,24 @@ class WindowCustomAggregatorLocalState : public WindowAggregatorLocalState { class WindowCustomAggregatorGlobalState : public WindowAggregatorGlobalState { public: - explicit WindowCustomAggregatorGlobalState(ClientContext &context, const WindowCustomAggregator &aggregator, - idx_t group_count) - : WindowAggregatorGlobalState(context, aggregator, group_count), context(context) { + using CollectionPtr = optional_ptr; + + WindowCustomAggregatorGlobalState(ClientContext &client, const WindowCustomAggregator &aggregator, + idx_t group_count) + : WindowAggregatorGlobalState(client, aggregator, group_count) { gcstate = make_uniq(aggr, aggregator.exclude_mode); } - //! Buffer manager for paging custom accelerator data - ClientContext &context; //! Traditional packed filter mask for API ValidityMask filter_packed; //! Data pointer that contains a single local state, used for global custom window execution state unique_ptr gcstate; - //! Partition description for custom window APIs - unique_ptr partition_input; + //! The argument data + CollectionPtr collection; + //! Column global validity flags + vector all_valids; + //! Frame statistics + FrameStats stats; }; WindowCustomAggregatorLocalState::WindowCustomAggregatorLocalState(const AggregateObject &aggr, @@ -88,8 +92,8 @@ unique_ptr WindowCustomAggregator::GetGlobalState(ClientC } void WindowCustomAggregator::Finalize(ExecutionContext &context, WindowAggregatorState &gstate, - WindowAggregatorState &lstate, CollectionPtr collection, - const FrameStats &stats) { + WindowAggregatorState &lstate, CollectionPtr collection, const FrameStats &stats, + InterruptState &interrupt) { // Single threaded Finalize for now auto &gcsink = gstate.Cast(); lock_guard gestate_guard(gcsink.lock); @@ -97,11 +101,12 @@ void WindowCustomAggregator::Finalize(ExecutionContext &context, WindowAggregato return; } - WindowAggregator::Finalize(context, gstate, lstate, collection, stats); + WindowAggregator::Finalize(context, gstate, lstate, collection, stats, interrupt); + gcsink.collection = collection; auto inputs = collection->inputs.get(); const auto count = collection->size(); - vector all_valids; + auto &all_valids = gcsink.all_valids; for (auto col_idx : child_idx) { all_valids.push_back(collection->all_valids[col_idx]); } @@ -109,14 +114,12 @@ void WindowCustomAggregator::Finalize(ExecutionContext &context, WindowAggregato auto &filter_packed = gcsink.filter_packed; filter_mask.Pack(filter_packed, filter_mask.Capacity()); - gcsink.partition_input = - make_uniq(context, inputs, count, child_idx, all_valids, filter_packed, stats); - if (aggr.function.window_init) { auto &gcstate = *gcsink.gcstate; + WindowPartitionInput partition(context, inputs, count, child_idx, all_valids, filter_packed, stats, interrupt); AggregateInputData aggr_input_data(aggr.GetFunctionData(), gcstate.allocator); - aggr.function.window_init(aggr_input_data, *gcsink.partition_input, gcstate.state.data()); + aggr.function.window_init(aggr_input_data, partition, gcstate.state.data()); } ++gcsink.finalized; @@ -128,7 +131,7 @@ unique_ptr WindowCustomAggregator::GetLocalState(const Wi void WindowCustomAggregator::Evaluate(ExecutionContext &context, const WindowAggregatorState &gsink, WindowAggregatorState &lstate, const DataChunk &bounds, Vector &result, - idx_t count, idx_t row_idx) const { + idx_t count, idx_t row_idx, InterruptState &interrupt) const { auto &lcstate = lstate.Cast(); auto &frames = lcstate.frames; const_data_ptr_t gstate_p = nullptr; @@ -137,11 +140,17 @@ void WindowCustomAggregator::Evaluate(ExecutionContext &context, const WindowAgg gstate_p = gcsink.gcstate->state.data(); } + auto collection = gcsink.collection; + auto inputs = collection->inputs.get(); + auto &all_valids = gcsink.all_valids; + auto &filter_packed = gcsink.filter_packed; + auto &stats = gcsink.stats; + WindowPartitionInput partition(context, inputs, collection->size(), child_idx, all_valids, filter_packed, stats, + interrupt); EvaluateSubFrames(bounds, exclude_mode, count, row_idx, frames, [&](idx_t i) { // Extract the range AggregateInputData aggr_input_data(aggr.GetFunctionData(), lstate.allocator); - aggr.function.window(aggr_input_data, *gcsink.partition_input, gstate_p, lcstate.state.data(), frames, result, - i); + aggr.function.window(aggr_input_data, partition, gstate_p, lcstate.state.data(), frames, result, i); }); } diff --git a/src/duckdb/src/function/window/window_distinct_aggregator.cpp b/src/duckdb/src/function/window/window_distinct_aggregator.cpp index a868335e7..71814ea0b 100644 --- a/src/duckdb/src/function/window/window_distinct_aggregator.cpp +++ b/src/duckdb/src/function/window/window_distinct_aggregator.cpp @@ -187,7 +187,7 @@ class WindowDistinctAggregatorLocalState : public WindowAggregatorLocalState { } void Sink(ExecutionContext &context, DataChunk &sink_chunk, DataChunk &coll_chunk, idx_t input_idx, - optional_ptr filter_sel, idx_t filtered); + optional_ptr filter_sel, idx_t filtered, InterruptState &interrupt); void Finalize(ExecutionContext &context, WindowAggregatorGlobalState &gastate, CollectionPtr collection) override; void Sorted(); void ExecuteTask(ExecutionContext &context, WindowDistinctAggregatorGlobalState &gdstate); @@ -248,16 +248,17 @@ unique_ptr WindowDistinctAggregator::GetGlobalState(Clien void WindowDistinctAggregator::Sink(ExecutionContext &context, WindowAggregatorState &gsink, WindowAggregatorState &lstate, DataChunk &sink_chunk, DataChunk &coll_chunk, - idx_t input_idx, optional_ptr filter_sel, idx_t filtered) { - WindowAggregator::Sink(context, gsink, lstate, sink_chunk, coll_chunk, input_idx, filter_sel, filtered); + idx_t input_idx, optional_ptr filter_sel, idx_t filtered, + InterruptState &interrupt) { + WindowAggregator::Sink(context, gsink, lstate, sink_chunk, coll_chunk, input_idx, filter_sel, filtered, interrupt); auto &ldstate = lstate.Cast(); - ldstate.Sink(context, sink_chunk, coll_chunk, input_idx, filter_sel, filtered); + ldstate.Sink(context, sink_chunk, coll_chunk, input_idx, filter_sel, filtered, interrupt); } void WindowDistinctAggregatorLocalState::Sink(ExecutionContext &context, DataChunk &sink_chunk, DataChunk &coll_chunk, - idx_t input_idx, optional_ptr filter_sel, - idx_t filtered) { + idx_t input_idx, optional_ptr filter_sel, idx_t filtered, + InterruptState &interrupt) { // 3: for i ← 0 to in.size do // 4: sorted[i] ← (in[i], i) const auto count = sink_chunk.size(); @@ -283,8 +284,7 @@ void WindowDistinctAggregatorLocalState::Sink(ExecutionContext &context, DataChu local_sink = gdstate.InitializeLocalSort(context); } - InterruptState interrupt_state; - OperatorSinkInput sink {*gdstate.global_sink, *local_sink, interrupt_state}; + OperatorSinkInput sink {*gdstate.global_sink, *local_sink, interrupt}; gdstate.sort->Sink(context, sort_chunk, sink); } @@ -390,7 +390,7 @@ bool WindowDistinctAggregatorGlobalState::TryPrepareNextStage(WindowDistinctAggr void WindowDistinctAggregator::Finalize(ExecutionContext &context, WindowAggregatorState &gsink, WindowAggregatorState &lstate, CollectionPtr collection, - const FrameStats &stats) { + const FrameStats &stats, InterruptState &interrupt) { auto &gdsink = gsink.Cast(); auto &ldstate = lstate.Cast(); ldstate.Finalize(context, gdsink, collection); @@ -700,7 +700,7 @@ unique_ptr WindowDistinctAggregator::GetLocalState(const void WindowDistinctAggregator::Evaluate(ExecutionContext &context, const WindowAggregatorState &gsink, WindowAggregatorState &lstate, const DataChunk &bounds, Vector &result, - idx_t count, idx_t row_idx) const { + idx_t count, idx_t row_idx, InterruptState &interrupt) const { const auto &gdstate = gsink.Cast(); auto &ldstate = lstate.Cast(); diff --git a/src/duckdb/src/function/window/window_executor.cpp b/src/duckdb/src/function/window/window_executor.cpp index 90d1b6569..2d2e256b4 100644 --- a/src/duckdb/src/function/window/window_executor.cpp +++ b/src/duckdb/src/function/window/window_executor.cpp @@ -48,12 +48,13 @@ bool WindowExecutor::IgnoreNulls() const { } void WindowExecutor::Evaluate(ExecutionContext &context, idx_t row_idx, DataChunk &eval_chunk, Vector &result, - WindowExecutorLocalState &lstate, WindowExecutorGlobalState &gstate) const { + WindowExecutorLocalState &lstate, WindowExecutorGlobalState &gstate, + InterruptState &interrupt) const { auto &lbstate = lstate.Cast(); lbstate.UpdateBounds(gstate, row_idx, eval_chunk, lstate.range_cursor); const auto count = eval_chunk.size(); - EvaluateInternal(context, gstate, lstate, eval_chunk, result, count, row_idx); + EvaluateInternal(context, gstate, lstate, eval_chunk, result, count, row_idx, interrupt); result.Verify(count); } @@ -72,11 +73,11 @@ WindowExecutorLocalState::WindowExecutorLocalState(ExecutionContext &context, co } void WindowExecutorLocalState::Sink(ExecutionContext &context, WindowExecutorGlobalState &gstate, DataChunk &sink_chunk, - DataChunk &coll_chunk, idx_t input_idx) { + DataChunk &coll_chunk, idx_t input_idx, InterruptState &interrupt) { } void WindowExecutorLocalState::Finalize(ExecutionContext &context, WindowExecutorGlobalState &gstate, - CollectionPtr collection) { + CollectionPtr collection, InterruptState &interrupt) { const auto range_idx = gstate.executor.range_idx; if (range_idx != DConstants::INVALID_INDEX) { range_cursor = make_uniq(*collection, range_idx); @@ -95,14 +96,15 @@ unique_ptr WindowExecutor::GetLocalState(ExecutionCont } void WindowExecutor::Sink(ExecutionContext &context, DataChunk &sink_chunk, DataChunk &coll_chunk, - const idx_t input_idx, WindowExecutorGlobalState &gstate, - WindowExecutorLocalState &lstate) const { - lstate.Sink(context, gstate, sink_chunk, coll_chunk, input_idx); + const idx_t input_idx, WindowExecutorGlobalState &gstate, WindowExecutorLocalState &lstate, + InterruptState &interrupt) const { + lstate.Sink(context, gstate, sink_chunk, coll_chunk, input_idx, interrupt); } void WindowExecutor::Finalize(ExecutionContext &context, WindowExecutorGlobalState &gstate, - WindowExecutorLocalState &lstate, CollectionPtr collection) const { - lstate.Finalize(context, gstate, collection); + WindowExecutorLocalState &lstate, CollectionPtr collection, + InterruptState &interrupt) const { + lstate.Finalize(context, gstate, collection, interrupt); } } // namespace duckdb diff --git a/src/duckdb/src/function/window/window_merge_sort_tree.cpp b/src/duckdb/src/function/window/window_merge_sort_tree.cpp index 9ffd269bb..6af3d0e5b 100644 --- a/src/duckdb/src/function/window/window_merge_sort_tree.cpp +++ b/src/duckdb/src/function/window/window_merge_sort_tree.cpp @@ -65,7 +65,8 @@ WindowMergeSortTreeLocalState::WindowMergeSortTreeLocalState(ExecutionContext &c } void WindowMergeSortTreeLocalState::Sink(ExecutionContext &context, DataChunk &chunk, const idx_t row_idx, - optional_ptr filter_sel, idx_t filtered) { + optional_ptr filter_sel, idx_t filtered, + InterruptState &interrupt) { // Sequence the payload column sort_chunk.Reset(); auto &indices = sort_chunk.data.back(); @@ -83,25 +84,22 @@ void WindowMergeSortTreeLocalState::Sink(ExecutionContext &context, DataChunk &c sort_chunk.Slice(*filter_sel, filtered); } - InterruptState interrupt; OperatorSinkInput sink {*window_tree.global_sink, *local_sink, interrupt}; window_tree.sort->Sink(context, sort_chunk, sink); } -void WindowMergeSortTreeLocalState::ExecuteSortTask(ExecutionContext &context) { +void WindowMergeSortTreeLocalState::ExecuteSortTask(ExecutionContext &context, InterruptState &interrupt) { PostIncrement> on_completed(window_tree.tasks_completed); switch (build_stage) { case WindowMergeSortStage::COMBINE: { auto &local_sink = *window_tree.local_sinks[build_task]; - InterruptState interrupt_state; - OperatorSinkCombineInput combine {*window_tree.global_sink, local_sink, interrupt_state}; + OperatorSinkCombineInput combine {*window_tree.global_sink, local_sink, interrupt}; window_tree.sort->Combine(context, combine); break; } case WindowMergeSortStage::FINALIZE: { auto &sort = *window_tree.sort; - InterruptState interrupt; OperatorSinkFinalizeInput finalize {*window_tree.global_sink, interrupt}; sort.Finalize(context.client, finalize); auto sort_global = sort.GetGlobalSourceState(context.client, *window_tree.global_sink); @@ -201,11 +199,11 @@ bool WindowMergeSortTree::TryPrepareSortStage(WindowMergeSortTreeLocalState &lst return true; } -void WindowMergeSortTreeLocalState::Finalize(ExecutionContext &context) { +void WindowMergeSortTreeLocalState::Finalize(ExecutionContext &context, InterruptState &interrupt) { // Sort, merge and build the tree in parallel while (window_tree.build_stage.load() != WindowMergeSortStage::FINISHED) { if (window_tree.TryPrepareSortStage(*this)) { - ExecuteSortTask(context); + ExecuteSortTask(context, interrupt); } else { std::this_thread::yield(); } diff --git a/src/duckdb/src/function/window/window_naive_aggregator.cpp b/src/duckdb/src/function/window/window_naive_aggregator.cpp index 35ba22cbd..2201f19cf 100644 --- a/src/duckdb/src/function/window/window_naive_aggregator.cpp +++ b/src/duckdb/src/function/window/window_naive_aggregator.cpp @@ -53,7 +53,7 @@ class WindowNaiveLocalState : public WindowAggregatorLocalState { void Finalize(ExecutionContext &context, WindowAggregatorGlobalState &gastate, CollectionPtr collection) override; void Evaluate(ExecutionContext &context, const WindowAggregatorGlobalState &gsink, const DataChunk &bounds, - Vector &result, idx_t count, idx_t row_idx); + Vector &result, idx_t count, idx_t row_idx, InterruptState &interrupt); protected: //! Flush the accumulated intermediate states into the result states @@ -220,7 +220,8 @@ bool WindowNaiveLocalState::KeyEqual(const idx_t &lidx, const idx_t &ridx) { } void WindowNaiveLocalState::Evaluate(ExecutionContext &context, const WindowAggregatorGlobalState &gsink, - const DataChunk &bounds, Vector &result, idx_t count, idx_t row_idx) { + const DataChunk &bounds, Vector &result, idx_t count, idx_t row_idx, + InterruptState &interrupt) { const auto &aggr = gsink.aggr; auto &filter_mask = gsink.filter_mask; const auto types = cursor->chunk.GetTypes(); @@ -243,7 +244,6 @@ void WindowNaiveLocalState::Evaluate(ExecutionContext &context, const WindowAggr if (arg_orderer) { auto global_sink = sort->GetGlobalSinkState(context.client); auto local_sink = sort->GetLocalSinkState(context); - InterruptState interrupt; OperatorSinkInput sink {*global_sink, *local_sink, interrupt}; idx_t orderby_count = 0; @@ -366,10 +366,10 @@ unique_ptr WindowNaiveAggregator::GetLocalState(const Win void WindowNaiveAggregator::Evaluate(ExecutionContext &context, const WindowAggregatorState &gsink, WindowAggregatorState &lstate, const DataChunk &bounds, Vector &result, - idx_t count, idx_t row_idx) const { + idx_t count, idx_t row_idx, InterruptState &interrupt) const { const auto &gnstate = gsink.Cast(); auto &lnstate = lstate.Cast(); - lnstate.Evaluate(context, gnstate, bounds, result, count, row_idx); + lnstate.Evaluate(context, gnstate, bounds, result, count, row_idx, interrupt); } } // namespace duckdb diff --git a/src/duckdb/src/function/window/window_rank_function.cpp b/src/duckdb/src/function/window/window_rank_function.cpp index dbd2f98a4..9de1aab12 100644 --- a/src/duckdb/src/function/window/window_rank_function.cpp +++ b/src/duckdb/src/function/window/window_rank_function.cpp @@ -49,9 +49,10 @@ class WindowPeerLocalState : public WindowExecutorBoundsLocalState { //! Accumulate the secondary sort values void Sink(ExecutionContext &context, WindowExecutorGlobalState &gstate, DataChunk &sink_chunk, - DataChunk &coll_chunk, idx_t input_idx) override; + DataChunk &coll_chunk, idx_t input_idx, InterruptState &interrupt) override; //! Finish the sinking and prepare to scan - void Finalize(ExecutionContext &context, WindowExecutorGlobalState &gstate, CollectionPtr collection) override; + void Finalize(ExecutionContext &context, WindowExecutorGlobalState &gstate, CollectionPtr collection, + InterruptState &interrupt) override; void NextRank(idx_t partition_begin, idx_t peer_begin, idx_t row_idx); @@ -66,22 +67,22 @@ class WindowPeerLocalState : public WindowExecutorBoundsLocalState { }; void WindowPeerLocalState::Sink(ExecutionContext &context, WindowExecutorGlobalState &gstate, DataChunk &sink_chunk, - DataChunk &coll_chunk, idx_t input_idx) { - WindowExecutorBoundsLocalState::Sink(context, gstate, sink_chunk, coll_chunk, input_idx); + DataChunk &coll_chunk, idx_t input_idx, InterruptState &interrupt) { + WindowExecutorBoundsLocalState::Sink(context, gstate, sink_chunk, coll_chunk, input_idx, interrupt); if (local_tree) { auto &local_tokens = local_tree->Cast(); - local_tokens.Sink(context, sink_chunk, input_idx, nullptr, 0); + local_tokens.Sink(context, sink_chunk, input_idx, nullptr, 0, interrupt); } } void WindowPeerLocalState::Finalize(ExecutionContext &context, WindowExecutorGlobalState &gstate, - CollectionPtr collection) { - WindowExecutorBoundsLocalState::Finalize(context, gstate, collection); + CollectionPtr collection, InterruptState &interrupt) { + WindowExecutorBoundsLocalState::Finalize(context, gstate, collection, interrupt); if (local_tree) { auto &local_tokens = local_tree->Cast(); - local_tokens.Finalize(context); + local_tokens.Finalize(context, interrupt); local_tokens.window_tree.Build(); } } @@ -131,7 +132,7 @@ WindowRankExecutor::WindowRankExecutor(BoundWindowExpression &wexpr, WindowShare void WindowRankExecutor::EvaluateInternal(ExecutionContext &context, WindowExecutorGlobalState &gstate, WindowExecutorLocalState &lstate, DataChunk &eval_chunk, Vector &result, - idx_t count, idx_t row_idx) const { + idx_t count, idx_t row_idx, InterruptState &interrupt) const { auto &gpeer = gstate.Cast(); auto &lpeer = lstate.Cast(); auto rdata = FlatVector::GetData(result); @@ -175,7 +176,7 @@ WindowDenseRankExecutor::WindowDenseRankExecutor(BoundWindowExpression &wexpr, W void WindowDenseRankExecutor::EvaluateInternal(ExecutionContext &context, WindowExecutorGlobalState &gstate, WindowExecutorLocalState &lstate, DataChunk &eval_chunk, Vector &result, - idx_t count, idx_t row_idx) const { + idx_t count, idx_t row_idx, InterruptState &interrupt) const { auto &lpeer = lstate.Cast(); auto &order_mask = gstate.order_mask; @@ -242,7 +243,8 @@ static inline double PercentRank(const idx_t begin, const idx_t end, const uint6 void WindowPercentRankExecutor::EvaluateInternal(ExecutionContext &context, WindowExecutorGlobalState &gstate, WindowExecutorLocalState &lstate, DataChunk &eval_chunk, - Vector &result, idx_t count, idx_t row_idx) const { + Vector &result, idx_t count, idx_t row_idx, + InterruptState &interrupt) const { auto &gpeer = gstate.Cast(); auto &lpeer = lstate.Cast(); auto rdata = FlatVector::GetData(result); @@ -295,7 +297,7 @@ static inline double CumeDist(const idx_t begin, const idx_t end, const idx_t pe void WindowCumeDistExecutor::EvaluateInternal(ExecutionContext &context, WindowExecutorGlobalState &gstate, WindowExecutorLocalState &lstate, DataChunk &eval_chunk, Vector &result, - idx_t count, idx_t row_idx) const { + idx_t count, idx_t row_idx, InterruptState &interrupt) const { auto &gpeer = gstate.Cast(); auto &lpeer = lstate.Cast(); auto rdata = FlatVector::GetData(result); diff --git a/src/duckdb/src/function/window/window_rownumber_function.cpp b/src/duckdb/src/function/window/window_rownumber_function.cpp index de52242b5..347b1a7dc 100644 --- a/src/duckdb/src/function/window/window_rownumber_function.cpp +++ b/src/duckdb/src/function/window/window_rownumber_function.cpp @@ -56,9 +56,10 @@ class WindowRowNumberLocalState : public WindowExecutorBoundsLocalState { //! Accumulate the secondary sort values void Sink(ExecutionContext &context, WindowExecutorGlobalState &gstate, DataChunk &sink_chunk, - DataChunk &coll_chunk, idx_t input_idx) override; + DataChunk &coll_chunk, idx_t input_idx, InterruptState &interrupt) override; //! Finish the sinking and prepare to scan - void Finalize(ExecutionContext &context, WindowExecutorGlobalState &gstate, CollectionPtr collection) override; + void Finalize(ExecutionContext &context, WindowExecutorGlobalState &gstate, CollectionPtr collection, + InterruptState &interrupt) override; //! The corresponding global peer state const WindowRowNumberGlobalState &grstate; @@ -67,22 +68,23 @@ class WindowRowNumberLocalState : public WindowExecutorBoundsLocalState { }; void WindowRowNumberLocalState::Sink(ExecutionContext &context, WindowExecutorGlobalState &gstate, - DataChunk &sink_chunk, DataChunk &coll_chunk, idx_t input_idx) { - WindowExecutorBoundsLocalState::Sink(context, gstate, sink_chunk, coll_chunk, input_idx); + DataChunk &sink_chunk, DataChunk &coll_chunk, idx_t input_idx, + InterruptState &interrupt) { + WindowExecutorBoundsLocalState::Sink(context, gstate, sink_chunk, coll_chunk, input_idx, interrupt); if (local_tree) { auto &local_tokens = local_tree->Cast(); - local_tokens.Sink(context, sink_chunk, input_idx, nullptr, 0); + local_tokens.Sink(context, sink_chunk, input_idx, nullptr, 0, interrupt); } } void WindowRowNumberLocalState::Finalize(ExecutionContext &context, WindowExecutorGlobalState &gstate, - CollectionPtr collection) { - WindowExecutorBoundsLocalState::Finalize(context, gstate, collection); + CollectionPtr collection, InterruptState &interrupt) { + WindowExecutorBoundsLocalState::Finalize(context, gstate, collection, interrupt); if (local_tree) { auto &local_tokens = local_tree->Cast(); - local_tokens.Finalize(context); + local_tokens.Finalize(context, interrupt); local_tokens.window_tree.Build(); } } @@ -112,7 +114,7 @@ WindowRowNumberExecutor::GetLocalState(ExecutionContext &context, const WindowEx void WindowRowNumberExecutor::EvaluateInternal(ExecutionContext &context, WindowExecutorGlobalState &gstate, WindowExecutorLocalState &lstate, DataChunk &eval_chunk, Vector &result, - idx_t count, idx_t row_idx) const { + idx_t count, idx_t row_idx, InterruptState &interrupt) const { auto &grstate = gstate.Cast(); auto &lrstate = lstate.Cast(); auto rdata = FlatVector::GetData(result); @@ -151,7 +153,7 @@ WindowNtileExecutor::WindowNtileExecutor(BoundWindowExpression &wexpr, WindowSha void WindowNtileExecutor::EvaluateInternal(ExecutionContext &context, WindowExecutorGlobalState &gstate, WindowExecutorLocalState &lstate, DataChunk &eval_chunk, Vector &result, - idx_t count, idx_t row_idx) const { + idx_t count, idx_t row_idx, InterruptState &interrupt) const { auto &grstate = gstate.Cast(); auto &lrstate = lstate.Cast(); auto partition_begin = FlatVector::GetData(lrstate.bounds.data[PARTITION_BEGIN]); diff --git a/src/duckdb/src/function/window/window_segment_tree.cpp b/src/duckdb/src/function/window/window_segment_tree.cpp index 31d4f8597..38fb851e1 100644 --- a/src/duckdb/src/function/window/window_segment_tree.cpp +++ b/src/duckdb/src/function/window/window_segment_tree.cpp @@ -146,8 +146,8 @@ class WindowSegmentTreeLocalState : public WindowAggregatorLocalState { }; void WindowSegmentTree::Finalize(ExecutionContext &context, WindowAggregatorState &gsink, WindowAggregatorState &lstate, - CollectionPtr collection, const FrameStats &stats) { - WindowAggregator::Finalize(context, gsink, lstate, collection, stats); + CollectionPtr collection, const FrameStats &stats, InterruptState &interrupt) { + WindowAggregator::Finalize(context, gsink, lstate, collection, stats, interrupt); auto &gasink = gsink.Cast(); ++gasink.finalized; @@ -393,7 +393,7 @@ void WindowSegmentTreeLocalState::Finalize(ExecutionContext &context, WindowAggr void WindowSegmentTree::Evaluate(ExecutionContext &context, const WindowAggregatorState &gsink, WindowAggregatorState &lstate, const DataChunk &bounds, Vector &result, idx_t count, - idx_t row_idx) const { + idx_t row_idx, InterruptState &interrupt) const { const auto >state = gsink.Cast(); auto <state = lstate.Cast(); ltstate.Evaluate(context, gtstate, bounds, result, count, row_idx); diff --git a/src/duckdb/src/function/window/window_value_function.cpp b/src/duckdb/src/function/window/window_value_function.cpp index 9e7f0f514..a4bb0695d 100644 --- a/src/duckdb/src/function/window/window_value_function.cpp +++ b/src/duckdb/src/function/window/window_value_function.cpp @@ -70,9 +70,10 @@ class WindowValueLocalState : public WindowExecutorBoundsLocalState { //! Accumulate the secondary sort values void Sink(ExecutionContext &context, WindowExecutorGlobalState &gstate, DataChunk &sink_chunk, - DataChunk &coll_chunk, idx_t input_idx) override; + DataChunk &coll_chunk, idx_t input_idx, InterruptState &interrupt) override; //! Finish the sinking and prepare to scan - void Finalize(ExecutionContext &context, WindowExecutorGlobalState &gstate, CollectionPtr collection) override; + void Finalize(ExecutionContext &context, WindowExecutorGlobalState &gstate, CollectionPtr collection, + InterruptState &interrupt) override; //! The corresponding global value state const WindowValueGlobalState &gvstate; @@ -88,8 +89,8 @@ class WindowValueLocalState : public WindowExecutorBoundsLocalState { }; void WindowValueLocalState::Sink(ExecutionContext &context, WindowExecutorGlobalState &gstate, DataChunk &sink_chunk, - DataChunk &coll_chunk, idx_t input_idx) { - WindowExecutorBoundsLocalState::Sink(context, gstate, sink_chunk, coll_chunk, input_idx); + DataChunk &coll_chunk, idx_t input_idx, InterruptState &interrupt) { + WindowExecutorBoundsLocalState::Sink(context, gstate, sink_chunk, coll_chunk, input_idx, interrupt); if (local_value) { idx_t filtered = 0; @@ -114,17 +115,17 @@ void WindowValueLocalState::Sink(ExecutionContext &context, WindowExecutorGlobal } auto &value_state = local_value->Cast(); - value_state.Sink(context, sink_chunk, input_idx, filter_sel, filtered); + value_state.Sink(context, sink_chunk, input_idx, filter_sel, filtered, interrupt); } } void WindowValueLocalState::Finalize(ExecutionContext &context, WindowExecutorGlobalState &gstate, - CollectionPtr collection) { - WindowExecutorBoundsLocalState::Finalize(context, gstate, collection); + CollectionPtr collection, InterruptState &interrupt) { + WindowExecutorBoundsLocalState::Finalize(context, gstate, collection, interrupt); if (local_value) { auto &value_state = local_value->Cast(); - value_state.Finalize(context); + value_state.Finalize(context, interrupt); value_state.index_tree.Build(); } @@ -165,11 +166,12 @@ unique_ptr WindowValueExecutor::GetGlobalState(Client } void WindowValueExecutor::Finalize(ExecutionContext &context, WindowExecutorGlobalState &gstate, - WindowExecutorLocalState &lstate, CollectionPtr collection) const { + WindowExecutorLocalState &lstate, CollectionPtr collection, + InterruptState &interrupt) const { auto &gvstate = gstate.Cast(); gvstate.Finalize(collection); - WindowExecutor::Finalize(context, gstate, lstate, collection); + WindowExecutor::Finalize(context, gstate, lstate, collection, interrupt); } unique_ptr WindowValueExecutor::GetLocalState(ExecutionContext &context, @@ -245,34 +247,35 @@ class WindowLeadLagLocalState : public WindowValueLocalState { //! Accumulate the secondary sort values void Sink(ExecutionContext &context, WindowExecutorGlobalState &gstate, DataChunk &sink_chunk, - DataChunk &coll_chunk, idx_t input_idx) override; + DataChunk &coll_chunk, idx_t input_idx, InterruptState &interrupt) override; //! Finish the sinking and prepare to scan - void Finalize(ExecutionContext &context, WindowExecutorGlobalState &gstate, CollectionPtr collection) override; + void Finalize(ExecutionContext &context, WindowExecutorGlobalState &gstate, CollectionPtr collection, + InterruptState &interrupt) override; //! The optional sorting state for the secondary sort row mapping unique_ptr local_row; }; void WindowLeadLagLocalState::Sink(ExecutionContext &context, WindowExecutorGlobalState &gstate, DataChunk &sink_chunk, - DataChunk &coll_chunk, idx_t input_idx) { - WindowValueLocalState::Sink(context, gstate, sink_chunk, coll_chunk, input_idx); + DataChunk &coll_chunk, idx_t input_idx, InterruptState &interrupt) { + WindowValueLocalState::Sink(context, gstate, sink_chunk, coll_chunk, input_idx, interrupt); if (local_row) { idx_t filtered = 0; optional_ptr filter_sel; auto &row_state = local_row->Cast(); - row_state.Sink(context, sink_chunk, input_idx, filter_sel, filtered); + row_state.Sink(context, sink_chunk, input_idx, filter_sel, filtered, interrupt); } } void WindowLeadLagLocalState::Finalize(ExecutionContext &context, WindowExecutorGlobalState &gstate, - CollectionPtr collection) { - WindowValueLocalState::Finalize(context, gstate, collection); + CollectionPtr collection, InterruptState &interrupt) { + WindowValueLocalState::Finalize(context, gstate, collection, interrupt); if (local_row) { auto &row_state = local_row->Cast(); - row_state.Finalize(context); + row_state.Finalize(context, interrupt); row_state.window_tree.Build(); } } @@ -299,7 +302,7 @@ WindowLeadLagExecutor::GetLocalState(ExecutionContext &context, const WindowExec void WindowLeadLagExecutor::EvaluateInternal(ExecutionContext &context, WindowExecutorGlobalState &gstate, WindowExecutorLocalState &lstate, DataChunk &eval_chunk, Vector &result, - idx_t count, idx_t row_idx) const { + idx_t count, idx_t row_idx, InterruptState &interrupt) const { auto &glstate = gstate.Cast(); auto &llstate = lstate.Cast(); auto &cursor = *llstate.cursor; @@ -456,7 +459,7 @@ WindowFirstValueExecutor::WindowFirstValueExecutor(BoundWindowExpression &wexpr, void WindowFirstValueExecutor::EvaluateInternal(ExecutionContext &context, WindowExecutorGlobalState &gstate, WindowExecutorLocalState &lstate, DataChunk &eval_chunk, Vector &result, - idx_t count, idx_t row_idx) const { + idx_t count, idx_t row_idx, InterruptState &interrupt) const { auto &gvstate = gstate.Cast(); auto &lvstate = lstate.Cast(); auto &cursor = *lvstate.cursor; @@ -506,7 +509,7 @@ WindowLastValueExecutor::WindowLastValueExecutor(BoundWindowExpression &wexpr, W void WindowLastValueExecutor::EvaluateInternal(ExecutionContext &context, WindowExecutorGlobalState &gstate, WindowExecutorLocalState &lstate, DataChunk &eval_chunk, Vector &result, - idx_t count, idx_t row_idx) const { + idx_t count, idx_t row_idx, InterruptState &interrupt) const { auto &gvstate = gstate.Cast(); auto &lvstate = lstate.Cast(); auto &cursor = *lvstate.cursor; @@ -566,7 +569,7 @@ WindowNthValueExecutor::WindowNthValueExecutor(BoundWindowExpression &wexpr, Win void WindowNthValueExecutor::EvaluateInternal(ExecutionContext &context, WindowExecutorGlobalState &gstate, WindowExecutorLocalState &lstate, DataChunk &eval_chunk, Vector &result, - idx_t count, idx_t row_idx) const { + idx_t count, idx_t row_idx, InterruptState &interrupt) const { auto &gvstate = gstate.Cast(); auto &lvstate = lstate.Cast(); auto &cursor = *lvstate.cursor; @@ -896,15 +899,16 @@ class WindowFillLocalState : public WindowLeadLagLocalState { } //! Finish the sinking and prepare to scan - void Finalize(ExecutionContext &context, WindowExecutorGlobalState &gstate, CollectionPtr collection) override; + void Finalize(ExecutionContext &context, WindowExecutorGlobalState &gstate, CollectionPtr collection, + InterruptState &interrupt) override; //! Cursor for the secondary sort values unique_ptr order_cursor; }; void WindowFillLocalState::Finalize(ExecutionContext &context, WindowExecutorGlobalState &gstate, - CollectionPtr collection) { - WindowLeadLagLocalState::Finalize(context, gstate, collection); + CollectionPtr collection, InterruptState &interrupt) { + WindowLeadLagLocalState::Finalize(context, gstate, collection, interrupt); // Prepare to scan auto &gfstate = gvstate.Cast(); @@ -928,7 +932,7 @@ unique_ptr WindowFillExecutor::GetLocalState(Execution void WindowFillExecutor::EvaluateInternal(ExecutionContext &context, WindowExecutorGlobalState &gstate, WindowExecutorLocalState &lstate, DataChunk &, Vector &result, idx_t count, - idx_t row_idx) const { + idx_t row_idx, InterruptState &interrupt) const { auto &lfstate = lstate.Cast(); auto &cursor = *lfstate.cursor; diff --git a/src/duckdb/src/include/duckdb/execution/reservoir_sample.hpp b/src/duckdb/src/include/duckdb/execution/reservoir_sample.hpp index 811c3faca..5050808cf 100644 --- a/src/duckdb/src/include/duckdb/execution/reservoir_sample.hpp +++ b/src/duckdb/src/include/duckdb/execution/reservoir_sample.hpp @@ -88,7 +88,6 @@ class BaseReservoirSampling { void Serialize(Serializer &serializer) const; static unique_ptr Deserialize(Deserializer &deserializer); - static double GetMinWeightFromTuplesSeen(idx_t rows_seen_total); // static unordered_map tuples_to_min_weight_map; // Blocking sample is a virtual class. It should be allowed to see the weights and // of tuples in the sample. The blocking sample can then easily maintain statisitcal properties diff --git a/src/duckdb/src/include/duckdb/function/aggregate_function.hpp b/src/duckdb/src/include/duckdb/function/aggregate_function.hpp index 799d6c3b1..97aea3d58 100644 --- a/src/duckdb/src/include/duckdb/function/aggregate_function.hpp +++ b/src/duckdb/src/include/duckdb/function/aggregate_function.hpp @@ -17,6 +17,7 @@ namespace duckdb { class BufferManager; +class InterruptState; //! A half-open range of frame boundary values _relative to the current row_ //! This is why they are signed values. @@ -35,19 +36,20 @@ using FrameStats = array; //! but the row count will still be valid class ColumnDataCollection; struct WindowPartitionInput { - WindowPartitionInput(ExecutionContext &context, const ColumnDataCollection *inputs, idx_t count, - vector &column_ids, vector &all_valid, const ValidityMask &filter_mask, - const FrameStats &stats) + WindowPartitionInput(ExecutionContext &context, const ColumnDataCollection *inputs, const idx_t count, + const vector &column_ids, const vector &all_valid, + const ValidityMask &filter_mask, const FrameStats &stats, InterruptState &interrupt_state) : context(context), inputs(inputs), count(count), column_ids(column_ids), all_valid(all_valid), - filter_mask(filter_mask), stats(stats) { + filter_mask(filter_mask), stats(stats), interrupt_state(interrupt_state) { } ExecutionContext &context; const ColumnDataCollection *inputs; - idx_t count; - vector column_ids; - vector all_valid; + const idx_t count; + const vector column_ids; + const vector &all_valid; const ValidityMask &filter_mask; const FrameStats stats; + InterruptState &interrupt_state; }; //! The type used for sizing hashed aggregate function states diff --git a/src/duckdb/src/include/duckdb/function/window/window_aggregate_function.hpp b/src/duckdb/src/include/duckdb/function/window/window_aggregate_function.hpp index df6ec5745..766dc61ef 100644 --- a/src/duckdb/src/include/duckdb/function/window/window_aggregate_function.hpp +++ b/src/duckdb/src/include/duckdb/function/window/window_aggregate_function.hpp @@ -20,9 +20,10 @@ class WindowAggregateExecutor : public WindowExecutor { WindowAggregationMode mode); void Sink(ExecutionContext &context, DataChunk &sink_chunk, DataChunk &coll_chunk, const idx_t input_idx, - WindowExecutorGlobalState &gstate, WindowExecutorLocalState &lstate) const override; + WindowExecutorGlobalState &gstate, WindowExecutorLocalState &lstate, + InterruptState &interrupt) const override; void Finalize(ExecutionContext &context, WindowExecutorGlobalState &gstate, WindowExecutorLocalState &lstate, - CollectionPtr collection) const override; + CollectionPtr collection, InterruptState &interrupt) const override; unique_ptr GetGlobalState(ClientContext &client, const idx_t payload_count, const ValidityMask &partition_mask, @@ -41,7 +42,7 @@ class WindowAggregateExecutor : public WindowExecutor { protected: void EvaluateInternal(ExecutionContext &context, WindowExecutorGlobalState &gstate, WindowExecutorLocalState &lstate, DataChunk &eval_chunk, Vector &result, idx_t count, - idx_t row_idx) const override; + idx_t row_idx, InterruptState &interrupt) const override; }; } // namespace duckdb diff --git a/src/duckdb/src/include/duckdb/function/window/window_aggregator.hpp b/src/duckdb/src/include/duckdb/function/window/window_aggregator.hpp index 82fd00bd9..2edb8745e 100644 --- a/src/duckdb/src/include/duckdb/function/window/window_aggregator.hpp +++ b/src/duckdb/src/include/duckdb/function/window/window_aggregator.hpp @@ -117,13 +117,14 @@ class WindowAggregator { // Build virtual void Sink(ExecutionContext &context, WindowAggregatorState &gstate, WindowAggregatorState &lstate, DataChunk &sink_chunk, DataChunk &coll_chunk, idx_t input_idx, - optional_ptr filter_sel, idx_t filtered); + optional_ptr filter_sel, idx_t filtered, InterruptState &interrupt); virtual void Finalize(ExecutionContext &context, WindowAggregatorState &gstate, WindowAggregatorState &lstate, - CollectionPtr collection, const FrameStats &stats); + CollectionPtr collection, const FrameStats &stats, InterruptState &interrupt); // Probe virtual void Evaluate(ExecutionContext &context, const WindowAggregatorState &gsink, WindowAggregatorState &lstate, - const DataChunk &bounds, Vector &result, idx_t count, idx_t row_idx) const = 0; + const DataChunk &bounds, Vector &result, idx_t count, idx_t row_idx, + InterruptState &interrupt) const = 0; //! The window function const BoundWindowExpression &wexpr; diff --git a/src/duckdb/src/include/duckdb/function/window/window_constant_aggregator.hpp b/src/duckdb/src/include/duckdb/function/window/window_constant_aggregator.hpp index f15572d9c..a81075b62 100644 --- a/src/duckdb/src/include/duckdb/function/window/window_constant_aggregator.hpp +++ b/src/duckdb/src/include/duckdb/function/window/window_constant_aggregator.hpp @@ -26,13 +26,14 @@ class WindowConstantAggregator : public WindowAggregator { const ValidityMask &partition_mask) const override; void Sink(ExecutionContext &context, WindowAggregatorState &gstate, WindowAggregatorState &lstate, DataChunk &sink_chunk, DataChunk &coll_chunk, idx_t input_idx, optional_ptr filter_sel, - idx_t filtered) override; + idx_t filtered, InterruptState &interrupt) override; void Finalize(ExecutionContext &context, WindowAggregatorState &gstate, WindowAggregatorState &lstate, - CollectionPtr collection, const FrameStats &stats) override; + CollectionPtr collection, const FrameStats &stats, InterruptState &interrupt) override; unique_ptr GetLocalState(const WindowAggregatorState &gstate) const override; void Evaluate(ExecutionContext &context, const WindowAggregatorState &gsink, WindowAggregatorState &lstate, - const DataChunk &bounds, Vector &result, idx_t count, idx_t row_idx) const override; + const DataChunk &bounds, Vector &result, idx_t count, idx_t row_idx, + InterruptState &interrupt) const override; }; } // namespace duckdb diff --git a/src/duckdb/src/include/duckdb/function/window/window_custom_aggregator.hpp b/src/duckdb/src/include/duckdb/function/window/window_custom_aggregator.hpp index cbee084fd..a86e1fe94 100644 --- a/src/duckdb/src/include/duckdb/function/window/window_custom_aggregator.hpp +++ b/src/duckdb/src/include/duckdb/function/window/window_custom_aggregator.hpp @@ -22,11 +22,12 @@ class WindowCustomAggregator : public WindowAggregator { unique_ptr GetGlobalState(ClientContext &context, idx_t group_count, const ValidityMask &partition_mask) const override; void Finalize(ExecutionContext &context, WindowAggregatorState &gstate, WindowAggregatorState &lstate, - CollectionPtr collection, const FrameStats &stats) override; + CollectionPtr collection, const FrameStats &stats, InterruptState &interrupt) override; unique_ptr GetLocalState(const WindowAggregatorState &gstate) const override; void Evaluate(ExecutionContext &context, const WindowAggregatorState &gsink, WindowAggregatorState &lstate, - const DataChunk &bounds, Vector &result, idx_t count, idx_t row_idx) const override; + const DataChunk &bounds, Vector &result, idx_t count, idx_t row_idx, + InterruptState &interrupt) const override; }; } // namespace duckdb diff --git a/src/duckdb/src/include/duckdb/function/window/window_distinct_aggregator.hpp b/src/duckdb/src/include/duckdb/function/window/window_distinct_aggregator.hpp index 839150e52..a2a19929a 100644 --- a/src/duckdb/src/include/duckdb/function/window/window_distinct_aggregator.hpp +++ b/src/duckdb/src/include/duckdb/function/window/window_distinct_aggregator.hpp @@ -24,14 +24,15 @@ class WindowDistinctAggregator : public WindowAggregator { const ValidityMask &partition_mask) const override; void Sink(ExecutionContext &context, WindowAggregatorState &gsink, WindowAggregatorState &lstate, DataChunk &sink_chunk, DataChunk &coll_chunk, idx_t input_idx, optional_ptr filter_sel, - idx_t filtered) override; + idx_t filtered, InterruptState &interrupt) override; void Finalize(ExecutionContext &context, WindowAggregatorState &gstate, WindowAggregatorState &lstate, - CollectionPtr collection, const FrameStats &stats) override; + CollectionPtr collection, const FrameStats &stats, InterruptState &interrupt) override; // Evaluate unique_ptr GetLocalState(const WindowAggregatorState &gstate) const override; void Evaluate(ExecutionContext &context, const WindowAggregatorState &gsink, WindowAggregatorState &lstate, - const DataChunk &bounds, Vector &result, idx_t count, idx_t row_idx) const override; + const DataChunk &bounds, Vector &result, idx_t count, idx_t row_idx, + InterruptState &interrupt) const override; //! Context for sorting ClientContext &context; diff --git a/src/duckdb/src/include/duckdb/function/window/window_executor.hpp b/src/duckdb/src/include/duckdb/function/window/window_executor.hpp index 1489b4c0a..1459f2d19 100644 --- a/src/duckdb/src/include/duckdb/function/window/window_executor.hpp +++ b/src/duckdb/src/include/duckdb/function/window/window_executor.hpp @@ -14,6 +14,7 @@ namespace duckdb { class WindowCollection; +class InterruptState; struct WindowSharedExpressions; @@ -60,8 +61,9 @@ class WindowExecutorLocalState : public WindowExecutorState { WindowExecutorLocalState(ExecutionContext &context, const WindowExecutorGlobalState &gstate); virtual void Sink(ExecutionContext &context, WindowExecutorGlobalState &gstate, DataChunk &sink_chunk, - DataChunk &coll_chunk, idx_t input_idx); - virtual void Finalize(ExecutionContext &context, WindowExecutorGlobalState &gstate, CollectionPtr collection); + DataChunk &coll_chunk, idx_t input_idx, InterruptState &interrupt); + virtual void Finalize(ExecutionContext &context, WindowExecutorGlobalState &gstate, CollectionPtr collection, + InterruptState &interrupt); //! The state used for reading the range collection unique_ptr range_cursor; @@ -100,13 +102,14 @@ class WindowExecutor { const WindowExecutorGlobalState &gstate) const; virtual void Sink(ExecutionContext &context, DataChunk &sink_chunk, DataChunk &coll_chunk, const idx_t input_idx, - WindowExecutorGlobalState &gstate, WindowExecutorLocalState &lstate) const; + WindowExecutorGlobalState &gstate, WindowExecutorLocalState &lstate, + InterruptState &interrupt) const; virtual void Finalize(ExecutionContext &context, WindowExecutorGlobalState &gstate, - WindowExecutorLocalState &lstate, CollectionPtr collection) const; + WindowExecutorLocalState &lstate, CollectionPtr collection, InterruptState &interrupt) const; void Evaluate(ExecutionContext &context, idx_t row_idx, DataChunk &eval_chunk, Vector &result, - WindowExecutorLocalState &lstate, WindowExecutorGlobalState &gstate) const; + WindowExecutorLocalState &lstate, WindowExecutorGlobalState &gstate, InterruptState &interrupt) const; // The function const BoundWindowExpression &wexpr; @@ -122,7 +125,7 @@ class WindowExecutor { protected: virtual void EvaluateInternal(ExecutionContext &context, WindowExecutorGlobalState &gstate, WindowExecutorLocalState &lstate, DataChunk &eval_chunk, Vector &result, idx_t count, - idx_t row_idx) const = 0; + idx_t row_idx, InterruptState &interrupt) const = 0; }; } // namespace duckdb diff --git a/src/duckdb/src/include/duckdb/function/window/window_merge_sort_tree.hpp b/src/duckdb/src/include/duckdb/function/window/window_merge_sort_tree.hpp index faa0f30b7..a79ce50a9 100644 --- a/src/duckdb/src/include/duckdb/function/window/window_merge_sort_tree.hpp +++ b/src/duckdb/src/include/duckdb/function/window/window_merge_sort_tree.hpp @@ -26,9 +26,9 @@ class WindowMergeSortTreeLocalState : public WindowAggregatorState { //! Add a chunk to the local sort void Sink(ExecutionContext &context, DataChunk &chunk, const idx_t row_idx, - optional_ptr filter_sel, idx_t filtered); + optional_ptr filter_sel, idx_t filtered, InterruptState &interrupt); //! Sort the data - void Finalize(ExecutionContext &context); + void Finalize(ExecutionContext &context, InterruptState &interrupt); //! Process sorted leaf data virtual void BuildLeaves() = 0; @@ -44,7 +44,7 @@ class WindowMergeSortTreeLocalState : public WindowAggregatorState { idx_t build_task; private: - void ExecuteSortTask(ExecutionContext &context); + void ExecuteSortTask(ExecutionContext &context, InterruptState &interrupt); }; class WindowMergeSortTree { diff --git a/src/duckdb/src/include/duckdb/function/window/window_naive_aggregator.hpp b/src/duckdb/src/include/duckdb/function/window/window_naive_aggregator.hpp index 0af8d691c..26dc3a2fd 100644 --- a/src/duckdb/src/include/duckdb/function/window/window_naive_aggregator.hpp +++ b/src/duckdb/src/include/duckdb/function/window/window_naive_aggregator.hpp @@ -22,7 +22,8 @@ class WindowNaiveAggregator : public WindowAggregator { unique_ptr GetLocalState(const WindowAggregatorState &gstate) const override; void Evaluate(ExecutionContext &context, const WindowAggregatorState &gsink, WindowAggregatorState &lstate, - const DataChunk &bounds, Vector &result, idx_t count, idx_t row_idx) const override; + const DataChunk &bounds, Vector &result, idx_t count, idx_t row_idx, + InterruptState &interrupt) const override; //! The parent executor const WindowAggregateExecutor &executor; diff --git a/src/duckdb/src/include/duckdb/function/window/window_rank_function.hpp b/src/duckdb/src/include/duckdb/function/window/window_rank_function.hpp index 7127fd04d..ebed62dc0 100644 --- a/src/duckdb/src/include/duckdb/function/window/window_rank_function.hpp +++ b/src/duckdb/src/include/duckdb/function/window/window_rank_function.hpp @@ -33,7 +33,7 @@ class WindowRankExecutor : public WindowPeerExecutor { protected: void EvaluateInternal(ExecutionContext &context, WindowExecutorGlobalState &gstate, WindowExecutorLocalState &lstate, DataChunk &eval_chunk, Vector &result, idx_t count, - idx_t row_idx) const override; + idx_t row_idx, InterruptState &interrupt) const override; }; class WindowDenseRankExecutor : public WindowPeerExecutor { @@ -43,7 +43,7 @@ class WindowDenseRankExecutor : public WindowPeerExecutor { protected: void EvaluateInternal(ExecutionContext &context, WindowExecutorGlobalState &gstate, WindowExecutorLocalState &lstate, DataChunk &eval_chunk, Vector &result, idx_t count, - idx_t row_idx) const override; + idx_t row_idx, InterruptState &interrupt) const override; }; class WindowPercentRankExecutor : public WindowPeerExecutor { @@ -53,7 +53,7 @@ class WindowPercentRankExecutor : public WindowPeerExecutor { protected: void EvaluateInternal(ExecutionContext &context, WindowExecutorGlobalState &gstate, WindowExecutorLocalState &lstate, DataChunk &eval_chunk, Vector &result, idx_t count, - idx_t row_idx) const override; + idx_t row_idx, InterruptState &interrupt) const override; }; class WindowCumeDistExecutor : public WindowPeerExecutor { @@ -63,7 +63,7 @@ class WindowCumeDistExecutor : public WindowPeerExecutor { protected: void EvaluateInternal(ExecutionContext &context, WindowExecutorGlobalState &gstate, WindowExecutorLocalState &lstate, DataChunk &eval_chunk, Vector &result, idx_t count, - idx_t row_idx) const override; + idx_t row_idx, InterruptState &interrupt) const override; }; } // namespace duckdb diff --git a/src/duckdb/src/include/duckdb/function/window/window_rownumber_function.hpp b/src/duckdb/src/include/duckdb/function/window/window_rownumber_function.hpp index c387143ca..f548a6e30 100644 --- a/src/duckdb/src/include/duckdb/function/window/window_rownumber_function.hpp +++ b/src/duckdb/src/include/duckdb/function/window/window_rownumber_function.hpp @@ -30,7 +30,7 @@ class WindowRowNumberExecutor : public WindowExecutor { protected: void EvaluateInternal(ExecutionContext &context, WindowExecutorGlobalState &gstate, WindowExecutorLocalState &lstate, DataChunk &eval_chunk, Vector &result, idx_t count, - idx_t row_idx) const override; + idx_t row_idx, InterruptState &interrupt) const override; }; // NTILE is just scaled ROW_NUMBER @@ -41,7 +41,7 @@ class WindowNtileExecutor : public WindowRowNumberExecutor { protected: void EvaluateInternal(ExecutionContext &context, WindowExecutorGlobalState &gstate, WindowExecutorLocalState &lstate, DataChunk &eval_chunk, Vector &result, idx_t count, - idx_t row_idx) const override; + idx_t row_idx, InterruptState &interrupt) const override; }; } // namespace duckdb diff --git a/src/duckdb/src/include/duckdb/function/window/window_segment_tree.hpp b/src/duckdb/src/include/duckdb/function/window/window_segment_tree.hpp index c571533b6..bad3aebb2 100644 --- a/src/duckdb/src/include/duckdb/function/window/window_segment_tree.hpp +++ b/src/duckdb/src/include/duckdb/function/window/window_segment_tree.hpp @@ -22,10 +22,11 @@ class WindowSegmentTree : public WindowAggregator { const ValidityMask &partition_mask) const override; unique_ptr GetLocalState(const WindowAggregatorState &gstate) const override; void Finalize(ExecutionContext &context, WindowAggregatorState &gstate, WindowAggregatorState &lstate, - CollectionPtr collection, const FrameStats &stats) override; + CollectionPtr collection, const FrameStats &stats, InterruptState &interrupt) override; void Evaluate(ExecutionContext &context, const WindowAggregatorState &gstate, WindowAggregatorState &lstate, - const DataChunk &bounds, Vector &result, idx_t count, idx_t row_idx) const override; + const DataChunk &bounds, Vector &result, idx_t count, idx_t row_idx, + InterruptState &interrupt) const override; }; } // namespace duckdb diff --git a/src/duckdb/src/include/duckdb/function/window/window_value_function.hpp b/src/duckdb/src/include/duckdb/function/window/window_value_function.hpp index 6076ebdd0..de581a915 100644 --- a/src/duckdb/src/include/duckdb/function/window/window_value_function.hpp +++ b/src/duckdb/src/include/duckdb/function/window/window_value_function.hpp @@ -18,7 +18,7 @@ class WindowValueExecutor : public WindowExecutor { WindowValueExecutor(BoundWindowExpression &wexpr, WindowSharedExpressions &shared); void Finalize(ExecutionContext &context, WindowExecutorGlobalState &gstate, WindowExecutorLocalState &lstate, - CollectionPtr collection) const override; + CollectionPtr collection, InterruptState &interrupt) const override; unique_ptr GetGlobalState(ClientContext &client, const idx_t payload_count, const ValidityMask &partition_mask, @@ -51,7 +51,7 @@ class WindowLeadLagExecutor : public WindowValueExecutor { protected: void EvaluateInternal(ExecutionContext &context, WindowExecutorGlobalState &gstate, WindowExecutorLocalState &lstate, DataChunk &eval_chunk, Vector &result, idx_t count, - idx_t row_idx) const override; + idx_t row_idx, InterruptState &interrupt) const override; }; class WindowFirstValueExecutor : public WindowValueExecutor { @@ -61,7 +61,7 @@ class WindowFirstValueExecutor : public WindowValueExecutor { protected: void EvaluateInternal(ExecutionContext &context, WindowExecutorGlobalState &gstate, WindowExecutorLocalState &lstate, DataChunk &eval_chunk, Vector &result, idx_t count, - idx_t row_idx) const override; + idx_t row_idx, InterruptState &interrupt) const override; }; class WindowLastValueExecutor : public WindowValueExecutor { @@ -71,7 +71,7 @@ class WindowLastValueExecutor : public WindowValueExecutor { protected: void EvaluateInternal(ExecutionContext &context, WindowExecutorGlobalState &gstate, WindowExecutorLocalState &lstate, DataChunk &eval_chunk, Vector &result, idx_t count, - idx_t row_idx) const override; + idx_t row_idx, InterruptState &interrupt) const override; }; class WindowNthValueExecutor : public WindowValueExecutor { @@ -81,7 +81,7 @@ class WindowNthValueExecutor : public WindowValueExecutor { protected: void EvaluateInternal(ExecutionContext &context, WindowExecutorGlobalState &gstate, WindowExecutorLocalState &lstate, DataChunk &eval_chunk, Vector &result, idx_t count, - idx_t row_idx) const override; + idx_t row_idx, InterruptState &interrupt) const override; }; class WindowFillExecutor : public WindowValueExecutor { @@ -105,7 +105,7 @@ class WindowFillExecutor : public WindowValueExecutor { protected: void EvaluateInternal(ExecutionContext &context, WindowExecutorGlobalState &gstate, WindowExecutorLocalState &lstate, DataChunk &eval_chunk, Vector &result, idx_t count, - idx_t row_idx) const override; + idx_t row_idx, InterruptState &interrupt) const override; }; } // namespace duckdb