Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ struct QuantileSortTree {
QuantileSortTree(AggregateInputData &aggr_input_data, const WindowPartitionInput &partition) {
// TODO: Two pass parallel sorting using Build
auto &inputs = *partition.inputs;
auto &interrupt = partition.interrupt_state;
ColumnDataScanState scan;
DataChunk sort;
inputs.InitializeScan(scan, partition.column_ids);
Expand Down Expand Up @@ -338,12 +339,12 @@ struct QuantileSortTree {
filter_sel[filtered++] = i;
}
}
local_state.Sink(partition.context, sort, row_idx, filter_sel, filtered);
local_state.Sink(partition.context, sort, row_idx, filter_sel, filtered, interrupt);
} else {
local_state.Sink(partition.context, sort, row_idx, nullptr, 0);
local_state.Sink(partition.context, sort, row_idx, nullptr, 0, interrupt);
}
}
local_state.Finalize(partition.context);
local_state.Finalize(partition.context, interrupt);
}

inline idx_t SelectNth(const SubFrames &frames, size_t n) const {
Expand Down
38 changes: 19 additions & 19 deletions src/duckdb/src/execution/operator/aggregate/physical_window.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ class WindowLocalSourceState : public LocalSourceState {
//! Assign the next task
bool TryAssignTask();
//! Execute a step in the current task
void ExecuteTask(ExecutionContext &context, DataChunk &chunk);
void ExecuteTask(ExecutionContext &context, DataChunk &chunk, InterruptState &interrupt);

//! The shared source state
WindowGlobalSourceState &gsource;
Expand All @@ -667,9 +667,9 @@ class WindowLocalSourceState : public LocalSourceState {
DataChunk output_chunk;

protected:
void Sink(ExecutionContext &context);
void Finalize(ExecutionContext &context);
void GetData(ExecutionContext &context, DataChunk &chunk);
void Sink(ExecutionContext &context, InterruptState &interrupt);
void Finalize(ExecutionContext &context, InterruptState &interrupt);
void GetData(ExecutionContext &context, DataChunk &chunk, InterruptState &interrupt);

//! Storage and evaluation for the fully materialised data
unique_ptr<WindowBuilder> builder;
Expand Down Expand Up @@ -711,7 +711,7 @@ WindowHashGroup::ExecutorGlobalStates &WindowHashGroup::Initialize(ClientContext
return gestates;
}

void WindowLocalSourceState::Sink(ExecutionContext &context) {
void WindowLocalSourceState::Sink(ExecutionContext &context, InterruptState &interrupt) {
D_ASSERT(task);
D_ASSERT(task->stage == WindowGroupStage::SINK);

Expand Down Expand Up @@ -765,15 +765,15 @@ void WindowLocalSourceState::Sink(ExecutionContext &context) {
}

for (idx_t w = 0; w < executors.size(); ++w) {
executors[w]->Sink(context, sink_chunk, coll_chunk, input_idx, *gestates[w], *local_states[w]);
executors[w]->Sink(context, sink_chunk, coll_chunk, input_idx, *gestates[w], *local_states[w], interrupt);
}

window_hash_group->sunk += input_chunk.size();
}
scanner.reset();
}

void WindowLocalSourceState::Finalize(ExecutionContext &context) {
void WindowLocalSourceState::Finalize(ExecutionContext &context, InterruptState &interrupt) {
D_ASSERT(task);
D_ASSERT(task->stage == WindowGroupStage::FINALIZE);

Expand All @@ -790,7 +790,7 @@ void WindowLocalSourceState::Finalize(ExecutionContext &context) {
auto &gestates = window_hash_group->gestates;
auto &local_states = window_hash_group->thread_states.at(task->thread_idx);
for (idx_t w = 0; w < executors.size(); ++w) {
executors[w]->Finalize(context, *gestates[w], *local_states[w], window_hash_group->collection);
executors[w]->Finalize(context, *gestates[w], *local_states[w], window_hash_group->collection, interrupt);
}

// Mark this range as done
Expand Down Expand Up @@ -898,7 +898,7 @@ bool WindowLocalSourceState::TryAssignTask() {
return gsource.TryNextTask(task, task_local);
}

void WindowLocalSourceState::ExecuteTask(ExecutionContext &context, DataChunk &result) {
void WindowLocalSourceState::ExecuteTask(ExecutionContext &context, DataChunk &result, InterruptState &interrupt) {
auto &gsink = gsource.gsink;

// Update the hash group
Expand All @@ -907,16 +907,16 @@ void WindowLocalSourceState::ExecuteTask(ExecutionContext &context, DataChunk &r
// Process the new state
switch (task->stage) {
case WindowGroupStage::SINK:
Sink(context);
Sink(context, interrupt);
D_ASSERT(TaskFinished());
break;
case WindowGroupStage::FINALIZE:
Finalize(context);
Finalize(context, interrupt);
D_ASSERT(TaskFinished());
break;
case WindowGroupStage::GETDATA:
D_ASSERT(!TaskFinished());
GetData(context, result);
GetData(context, result, interrupt);
break;
default:
throw InternalException("Invalid window source state.");
Expand All @@ -928,7 +928,7 @@ void WindowLocalSourceState::ExecuteTask(ExecutionContext &context, DataChunk &r
}
}

void WindowLocalSourceState::GetData(ExecutionContext &context, DataChunk &result) {
void WindowLocalSourceState::GetData(ExecutionContext &context, DataChunk &result, InterruptState &interrupt) {
D_ASSERT(window_hash_group->GetStage() == WindowGroupStage::GETDATA);

window_hash_group->UpdateScanner(scanner, task->begin_idx);
Expand All @@ -953,7 +953,7 @@ void WindowLocalSourceState::GetData(ExecutionContext &context, DataChunk &resul
eval_chunk.Reset();
eval_exec.Execute(input_chunk, eval_chunk);
}
executor.Evaluate(context, position, eval_chunk, result, lstate, gstate);
executor.Evaluate(context, position, eval_chunk, result, lstate, gstate, interrupt);
}
output_chunk.SetCardinality(input_chunk);
output_chunk.Verify();
Expand Down Expand Up @@ -1036,14 +1036,14 @@ OperatorPartitionData PhysicalWindow::GetPartitionData(ExecutionContext &context
}

SourceResultType PhysicalWindow::GetData(ExecutionContext &context, DataChunk &chunk,
OperatorSourceInput &input) const {
auto &gsource = input.global_state.Cast<WindowGlobalSourceState>();
auto &lsource = input.local_state.Cast<WindowLocalSourceState>();
OperatorSourceInput &source) const {
auto &gsource = source.global_state.Cast<WindowGlobalSourceState>();
auto &lsource = source.local_state.Cast<WindowLocalSourceState>();

while (gsource.HasUnfinishedTasks() && chunk.size() == 0) {
if (!lsource.TaskFinished() || lsource.TryAssignTask()) {
try {
lsource.ExecuteTask(context, chunk);
lsource.ExecuteTask(context, chunk, source.interrupt_state);
} catch (...) {
gsource.stopped = true;
throw;
Expand All @@ -1057,7 +1057,7 @@ SourceResultType PhysicalWindow::GetData(ExecutionContext &context, DataChunk &c
} else {
// there are more tasks available, but we can't execute them yet
// block the source
return gsource.BlockSource(guard, input.interrupt_state);
return gsource.BlockSource(guard, source.interrupt_state);
}
}
}
Expand Down
44 changes: 22 additions & 22 deletions src/duckdb/src/execution/sample/base_reservoir_sample.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,6 @@

namespace duckdb {

//! Estimate the minimum weight in the reservoir given how many tuples have been seen.
//! The constants come from a curve fit (https://mycurvefit.com) of observed (tuples seen,
//! min weight) pairs; the first few inputs use the exact fitted points and everything
//! else falls back to the fitted exponential curve approaching 0.99.
double BaseReservoirSampling::GetMinWeightFromTuplesSeen(idx_t rows_seen_total) {
	// Large inputs: fitted exponential curve.
	if (rows_seen_total > 3) {
		return 0.99 - 0.355 * std::exp(-0.07 * static_cast<double>(rows_seen_total));
	}
	// Small inputs (0..3): exact fitted data points.
	static constexpr double FITTED_POINTS[] = {0.0, 0.000161, 0.530136, 0.693454};
	return FITTED_POINTS[rows_seen_total];
}

BaseReservoirSampling::BaseReservoirSampling(int64_t seed) : random(seed) {
next_index_to_sample = 0;
min_weight_threshold = 0;
Expand Down Expand Up @@ -73,7 +55,7 @@ void BaseReservoirSampling::SetNextEntry() {
//! since all our weights are 1 (uniform sampling), we can just determine the amount of elements to skip
min_weight_threshold = t_w;
min_weighted_entry_index = min_key.second;
next_index_to_sample = MaxValue<idx_t>(1, idx_t(round(x_w)));
next_index_to_sample = idx_t(ceil(x_w));
num_entries_to_skip_b4_next_sample = 0;
}

Expand Down Expand Up @@ -118,15 +100,33 @@ void BaseReservoirSampling::UpdateMinWeightThreshold() {
min_weight_threshold = 1;
}

// Produce the k largest order statistics of n independent Uniform(0, 1) draws in O(k) time.
// Two standard facts make this possible:
// 1. The maximum U(n) of n Uniform(0, 1) samples follows a Beta(n, 1) distribution.
// 2. The ratio U(n-i) / U(n-i+1) of consecutive order statistics follows Beta(n-i, 1).
// So each next-largest value is the previous one scaled by an independent Beta draw,
// sampled via inverse transform: U^(1/(n-i)) for U ~ Uniform(0, 1).
// (See: https://www.math.ntu.edu.tw/~hchen/teaching/LargeSample/notes/noteorder.pdf)
static vector<double> GenerateTopKFromUniform(ReservoirRNG &random, idx_t n, idx_t k) {
	vector<double> result;
	result.reserve(k);
	double upper_bound = 1.0;
	for (idx_t i = 0; i < k; i++) {
		// Scale down by a Beta(n - i, 1) sample to get the next order statistic.
		upper_bound *= std::pow(random.NextRandom(), 1.0 / static_cast<double>(n - i));
		result.push_back(upper_bound);
	}
	return result;
}

void BaseReservoirSampling::FillWeights(SelectionVector &sel, idx_t &sel_size) {
if (!reservoir_weights.empty()) {
return;
}
D_ASSERT(reservoir_weights.empty());
auto num_entries_seen_normalized = num_entries_seen_total / FIXED_SAMPLE_SIZE;
auto min_weight = GetMinWeightFromTuplesSeen(num_entries_seen_normalized);
auto weights = GenerateTopKFromUniform(random, num_entries_seen_total, sel_size);
std::shuffle(weights.begin(), weights.end(), random);
for (idx_t i = 0; i < sel_size; i++) {
auto weight = random.NextRandom(min_weight, 1);
auto weight = weights[i];
reservoir_weights.emplace(-weight, i);
}
D_ASSERT(reservoir_weights.size() <= sel_size);
Expand Down
6 changes: 3 additions & 3 deletions src/duckdb/src/function/table/version/pragma_version.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#ifndef DUCKDB_PATCH_VERSION
#define DUCKDB_PATCH_VERSION "0-dev3028"
#define DUCKDB_PATCH_VERSION "0-dev3047"
#endif
#ifndef DUCKDB_MINOR_VERSION
#define DUCKDB_MINOR_VERSION 4
Expand All @@ -8,10 +8,10 @@
#define DUCKDB_MAJOR_VERSION 1
#endif
#ifndef DUCKDB_VERSION
#define DUCKDB_VERSION "v1.4.0-dev3028"
#define DUCKDB_VERSION "v1.4.0-dev3047"
#endif
#ifndef DUCKDB_SOURCE_ID
#define DUCKDB_SOURCE_ID "a8206a211f"
#define DUCKDB_SOURCE_ID "129b1fe55e"
#endif
#include "duckdb/function/table/system_functions.hpp"
#include "duckdb/main/database.hpp"
Expand Down
17 changes: 9 additions & 8 deletions src/duckdb/src/function/window/window_aggregate_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ WindowAggregateExecutor::GetLocalState(ExecutionContext &context, const WindowEx

void WindowAggregateExecutor::Sink(ExecutionContext &context, DataChunk &sink_chunk, DataChunk &coll_chunk,
const idx_t input_idx, WindowExecutorGlobalState &gstate,
WindowExecutorLocalState &lstate) const {
WindowExecutorLocalState &lstate, InterruptState &interrupt) const {
auto &gastate = gstate.Cast<WindowAggregateExecutorGlobalState>();
auto &lastate = lstate.Cast<WindowAggregateExecutorLocalState>();
auto &filter_sel = lastate.filter_sel;
Expand All @@ -147,9 +147,9 @@ void WindowAggregateExecutor::Sink(ExecutionContext &context, DataChunk &sink_ch
D_ASSERT(aggregator);
auto &gestate = *gastate.gsink;
auto &lestate = *lastate.aggregator_state;
aggregator->Sink(context, gestate, lestate, sink_chunk, coll_chunk, input_idx, filtering, filtered);
aggregator->Sink(context, gestate, lestate, sink_chunk, coll_chunk, input_idx, filtering, filtered, interrupt);

WindowExecutor::Sink(context, sink_chunk, coll_chunk, input_idx, gstate, lstate);
WindowExecutor::Sink(context, sink_chunk, coll_chunk, input_idx, gstate, lstate, interrupt);
}

static void ApplyWindowStats(const WindowBoundary &boundary, FrameDelta &delta, BaseStatistics *base, bool is_start) {
Expand Down Expand Up @@ -216,8 +216,9 @@ static void ApplyWindowStats(const WindowBoundary &boundary, FrameDelta &delta,
}

void WindowAggregateExecutor::Finalize(ExecutionContext &context, WindowExecutorGlobalState &gstate,
WindowExecutorLocalState &lstate, CollectionPtr collection) const {
WindowExecutor::Finalize(context, gstate, lstate, collection);
WindowExecutorLocalState &lstate, CollectionPtr collection,
InterruptState &interrupt) const {
WindowExecutor::Finalize(context, gstate, lstate, collection, interrupt);

auto &gastate = gstate.Cast<WindowAggregateExecutorGlobalState>();
auto &gsink = gastate.gsink;
Expand All @@ -239,20 +240,20 @@ void WindowAggregateExecutor::Finalize(ExecutionContext &context, WindowExecutor
ApplyWindowStats(wexpr.end, stats[1], base, false);

auto &lastate = lstate.Cast<WindowAggregateExecutorLocalState>();
aggregator->Finalize(context, *gsink, *lastate.aggregator_state, collection, stats);
aggregator->Finalize(context, *gsink, *lastate.aggregator_state, collection, stats, interrupt);
}

void WindowAggregateExecutor::EvaluateInternal(ExecutionContext &context, WindowExecutorGlobalState &gstate,
WindowExecutorLocalState &lstate, DataChunk &eval_chunk, Vector &result,
idx_t count, idx_t row_idx) const {
idx_t count, idx_t row_idx, InterruptState &interrupt) const {
auto &gastate = gstate.Cast<WindowAggregateExecutorGlobalState>();
auto &lastate = lstate.Cast<WindowAggregateExecutorLocalState>();
auto &gsink = gastate.gsink;
D_ASSERT(aggregator);

auto &agg_state = *lastate.aggregator_state;

aggregator->Evaluate(context, *gsink, agg_state, lastate.bounds, result, count, row_idx);
aggregator->Evaluate(context, *gsink, agg_state, lastate.bounds, result, count, row_idx, interrupt);
}

} // namespace duckdb
4 changes: 2 additions & 2 deletions src/duckdb/src/function/window/window_aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ void WindowAggregatorLocalState::Sink(ExecutionContext &context, WindowAggregato

void WindowAggregator::Sink(ExecutionContext &context, WindowAggregatorState &gstate, WindowAggregatorState &lstate,
DataChunk &sink_chunk, DataChunk &coll_chunk, idx_t input_idx,
optional_ptr<SelectionVector> filter_sel, idx_t filtered) {
optional_ptr<SelectionVector> filter_sel, idx_t filtered, InterruptState &interrupt) {
auto &gastate = gstate.Cast<WindowAggregatorGlobalState>();
auto &lastate = lstate.Cast<WindowAggregatorLocalState>();
lastate.Sink(context, gastate, sink_chunk, coll_chunk, input_idx);
Expand Down Expand Up @@ -80,7 +80,7 @@ void WindowAggregatorLocalState::Finalize(ExecutionContext &context, WindowAggre
}

void WindowAggregator::Finalize(ExecutionContext &context, WindowAggregatorState &gstate, WindowAggregatorState &lstate,
CollectionPtr collection, const FrameStats &stats) {
CollectionPtr collection, const FrameStats &stats, InterruptState &interrupt) {
auto &gasink = gstate.Cast<WindowAggregatorGlobalState>();
auto &lastate = lstate.Cast<WindowAggregatorLocalState>();
lastate.Finalize(context, gasink, collection);
Expand Down
7 changes: 4 additions & 3 deletions src/duckdb/src/function/window/window_constant_aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ unique_ptr<WindowAggregatorState> WindowConstantAggregator::GetGlobalState(Clien

void WindowConstantAggregator::Sink(ExecutionContext &context, WindowAggregatorState &gsink,
WindowAggregatorState &lstate, DataChunk &sink_chunk, DataChunk &coll_chunk,
idx_t input_idx, optional_ptr<SelectionVector> filter_sel, idx_t filtered) {
idx_t input_idx, optional_ptr<SelectionVector> filter_sel, idx_t filtered,
InterruptState &interrupt) {
auto &lastate = lstate.Cast<WindowConstantAggregatorLocalState>();

lastate.Sink(context, sink_chunk, coll_chunk, input_idx, filter_sel, filtered);
Expand Down Expand Up @@ -300,7 +301,7 @@ void WindowConstantAggregatorLocalState::Sink(ExecutionContext &context, DataChu

void WindowConstantAggregator::Finalize(ExecutionContext &context, WindowAggregatorState &gstate,
WindowAggregatorState &lstate, CollectionPtr collection,
const FrameStats &stats) {
const FrameStats &stats, InterruptState &interrupt) {
auto &gastate = gstate.Cast<WindowConstantAggregatorGlobalState>();
auto &lastate = lstate.Cast<WindowConstantAggregatorLocalState>();

Expand All @@ -320,7 +321,7 @@ unique_ptr<WindowAggregatorState> WindowConstantAggregator::GetLocalState(const

void WindowConstantAggregator::Evaluate(ExecutionContext &context, const WindowAggregatorState &gsink,
WindowAggregatorState &lstate, const DataChunk &bounds, Vector &result,
idx_t count, idx_t row_idx) const {
idx_t count, idx_t row_idx, InterruptState &interrupt) const {
auto &gasink = gsink.Cast<WindowConstantAggregatorGlobalState>();
const auto &partition_offsets = gasink.partition_offsets;
const auto &results = *gasink.results;
Expand Down
Loading
Loading