Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 65 additions & 46 deletions src/duckdb/src/execution/operator/persistent/physical_delete.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ namespace duckdb {

// Constructs the physical DELETE operator.
// - physical_plan, types, estimated_cardinality: forwarded to the PhysicalOperator base together with
//   PhysicalOperatorType::DELETE_OPERATOR (types is moved into the base).
// - tableref / table: references to the catalog entry and the data table, stored as-is.
// - bound_constraints: moved into the operator.
// - row_id_index: index of the row-id vector in the input chunk (Sink reads chunk.data[row_id_index]).
// - return_chunk: stored as-is; Sink uses it to decide whether all table columns are fetched.
// - return_columns: moved into the operator. When non-empty, Sink reads table column i from
//   chunk.data[return_columns[i]] instead of fetching it by row ID; when empty, Sink falls back to
//   fetching by row ID.
// NOTE(review): this listing appears to be a rendered diff without +/- markers, so the tail of the
// parameter list and the tail of the initializer list each show up twice (once without and once with
// return_columns). Presumably the variant carrying return_columns is the current one - confirm against
// the real file.
PhysicalDelete::PhysicalDelete(PhysicalPlan &physical_plan, vector<LogicalType> types, TableCatalogEntry &tableref,
DataTable &table, vector<unique_ptr<BoundConstraint>> bound_constraints,
idx_t row_id_index, idx_t estimated_cardinality, bool return_chunk)
idx_t row_id_index, idx_t estimated_cardinality, bool return_chunk,
vector<idx_t> return_columns)
: PhysicalOperator(physical_plan, PhysicalOperatorType::DELETE_OPERATOR, std::move(types), estimated_cardinality),
tableref(tableref), table(table), bound_constraints(std::move(bound_constraints)), row_id_index(row_id_index),
return_chunk(return_chunk) {
return_chunk(return_chunk), return_columns(std::move(return_columns)) {
}
//===--------------------------------------------------------------------===//
// Sink
Expand Down Expand Up @@ -64,7 +65,6 @@ SinkResultType PhysicalDelete::Sink(ExecutionContext &context, DataChunk &chunk,
auto &g_state = input.global_state.Cast<DeleteGlobalState>();
auto &l_state = input.local_state.Cast<DeleteLocalState>();

auto &transaction = DuckTransaction::Get(context.client, table.db);
auto &row_ids = chunk.data[row_id_index];

lock_guard<mutex> delete_guard(g_state.delete_lock);
Expand All @@ -74,59 +74,78 @@ SinkResultType PhysicalDelete::Sink(ExecutionContext &context, DataChunk &chunk,
}

auto types = table.GetTypes();
auto to_be_fetched = vector<bool>(types.size(), return_chunk);
vector<StorageIndex> column_ids;
vector<LogicalType> column_types;
if (return_chunk) {
// Fetch all columns.
column_types = types;
l_state.delete_chunk.Reset();
row_ids.Flatten(chunk.size());

// Check if we can use columns from the input chunk (passed through from the scan)
// instead of fetching them by row ID
bool use_input_columns = !return_columns.empty();

if (use_input_columns) {
// Use columns from the input chunk - they were passed through from the scan
for (idx_t i = 0; i < table.ColumnCount(); i++) {
column_ids.emplace_back(i);
D_ASSERT(return_columns[i] != DConstants::INVALID_INDEX);
l_state.delete_chunk.data[i].Reference(chunk.data[return_columns[i]]);
}

l_state.delete_chunk.SetCardinality(chunk.size());
} else {
// Fetch only the required columns for updating the delete indexes.
auto &local_storage = LocalStorage::Get(context.client, table.db);
auto storage = local_storage.GetStorage(table);
unordered_set<column_t> indexed_column_id_set;
storage->delete_indexes.Scan([&](Index &index) {
if (!index.IsBound() || !index.IsUnique()) {
// Fall back to fetching columns by row ID
// This path is used when:
// - Table has generated columns (can't be scanned, must be computed)
// - Unique indexes exist but no RETURNING (need indexed columns for delete tracking)
// - MERGE INTO operations (optimization not implemented there yet)
auto &transaction = DuckTransaction::Get(context.client, table.db);
auto to_be_fetched = vector<bool>(types.size(), return_chunk);
vector<StorageIndex> column_ids;
vector<LogicalType> column_types;

if (return_chunk) {
// Fetch all columns.
column_types = types;
for (idx_t i = 0; i < table.ColumnCount(); i++) {
column_ids.emplace_back(i);
}
} else {
// Fetch only the required columns for updating the delete indexes.
auto &local_storage = LocalStorage::Get(context.client, table.db);
auto storage = local_storage.GetStorage(table);
unordered_set<column_t> indexed_column_id_set;
storage->delete_indexes.Scan([&](Index &index) {
if (!index.IsBound() || !index.IsUnique()) {
return false;
}
auto &set = index.GetColumnIdSet();
indexed_column_id_set.insert(set.begin(), set.end());
return false;
});
for (auto &col : indexed_column_id_set) {
column_ids.emplace_back(col);
}
sort(column_ids.begin(), column_ids.end());
for (auto &col : column_ids) {
auto i = col.GetPrimaryIndex();
to_be_fetched[i] = true;
column_types.push_back(types[i]);
}
auto &set = index.GetColumnIdSet();
indexed_column_id_set.insert(set.begin(), set.end());
return false;
});
for (auto &col : indexed_column_id_set) {
column_ids.emplace_back(col);
}
sort(column_ids.begin(), column_ids.end());
for (auto &col : column_ids) {
auto i = col.GetPrimaryIndex();
to_be_fetched[i] = true;
column_types.push_back(types[i]);
}
}

l_state.delete_chunk.Reset();
row_ids.Flatten(chunk.size());
// Fetch the to-be-deleted chunk.
DataChunk fetch_chunk;
fetch_chunk.Initialize(Allocator::Get(context.client), column_types, chunk.size());
auto fetch_state = ColumnFetchState();
table.Fetch(transaction, fetch_chunk, column_ids, row_ids, chunk.size(), fetch_state);

// Fetch the to-be-deleted chunk.
DataChunk fetch_chunk;
fetch_chunk.Initialize(Allocator::Get(context.client), column_types, chunk.size());
auto fetch_state = ColumnFetchState();
table.Fetch(transaction, fetch_chunk, column_ids, row_ids, chunk.size(), fetch_state);

// Reference the necessary columns of the fetch_chunk.
idx_t fetch_idx = 0;
for (idx_t i = 0; i < table.ColumnCount(); i++) {
if (to_be_fetched[i]) {
l_state.delete_chunk.data[i].Reference(fetch_chunk.data[fetch_idx++]);
continue;
// Reference the necessary columns of the fetch_chunk.
idx_t fetch_idx = 0;
for (idx_t i = 0; i < table.ColumnCount(); i++) {
if (to_be_fetched[i]) {
l_state.delete_chunk.data[i].Reference(fetch_chunk.data[fetch_idx++]);
continue;
}
l_state.delete_chunk.data[i].Reference(Value(types[i]));
}
l_state.delete_chunk.data[i].Reference(Value(types[i]));
l_state.delete_chunk.SetCardinality(fetch_chunk);
}
l_state.delete_chunk.SetCardinality(fetch_chunk);

// Append the deleted row IDs to the delete indexes.
// If we only delete local row IDs, then the delete_chunk is empty.
Expand Down
3 changes: 2 additions & 1 deletion src/duckdb/src/execution/physical_plan/plan_delete.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ PhysicalOperator &DuckCatalog::PlanDelete(ClientContext &context, PhysicalPlanGe
// Get the row_id column index.
auto &bound_ref = op.expressions[0]->Cast<BoundReferenceExpression>();
auto &del = planner.Make<PhysicalDelete>(op.types, op.table, op.table.GetStorage(), std::move(op.bound_constraints),
bound_ref.index, op.estimated_cardinality, op.return_chunk);
bound_ref.index, op.estimated_cardinality, op.return_chunk,
std::move(op.return_columns));
del.children.push_back(plan);
return del;
}
Expand Down
6 changes: 3 additions & 3 deletions src/duckdb/src/execution/physical_plan/plan_merge_into.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ unique_ptr<MergeIntoOperator> PlanMergeIntoAction(ClientContext &context, Logica
break;
}
case MergeActionType::MERGE_DELETE: {
result->op =
planner.Make<PhysicalDelete>(std::move(return_types), op.table, op.table.GetStorage(),
std::move(bound_constraints), op.row_id_start, cardinality, op.return_chunk);
result->op = planner.Make<PhysicalDelete>(std::move(return_types), op.table, op.table.GetStorage(),
std::move(bound_constraints), op.row_id_start, cardinality,
op.return_chunk, vector<idx_t>());
break;
}
case MergeActionType::MERGE_INSERT: {
Expand Down
6 changes: 3 additions & 3 deletions src/duckdb/src/function/table/version/pragma_version.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#ifndef DUCKDB_PATCH_VERSION
#define DUCKDB_PATCH_VERSION "0-dev5469"
#define DUCKDB_PATCH_VERSION "0-dev5476"
#endif
#ifndef DUCKDB_MINOR_VERSION
#define DUCKDB_MINOR_VERSION 5
Expand All @@ -8,10 +8,10 @@
#define DUCKDB_MAJOR_VERSION 1
#endif
#ifndef DUCKDB_VERSION
#define DUCKDB_VERSION "v1.5.0-dev5469"
#define DUCKDB_VERSION "v1.5.0-dev5476"
#endif
#ifndef DUCKDB_SOURCE_ID
#define DUCKDB_SOURCE_ID "a83f5f359d"
#define DUCKDB_SOURCE_ID "1c62e11b82"
#endif
#include "duckdb/function/table/system_functions.hpp"
#include "duckdb/main/database.hpp"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ class PhysicalDelete : public PhysicalOperator {
public:
PhysicalDelete(PhysicalPlan &physical_plan, vector<LogicalType> types, TableCatalogEntry &tableref,
DataTable &table, vector<unique_ptr<BoundConstraint>> bound_constraints, idx_t row_id_index,
idx_t estimated_cardinality, bool return_chunk);
idx_t estimated_cardinality, bool return_chunk, vector<idx_t> return_columns);

TableCatalogEntry &tableref;
DataTable &table;
vector<unique_ptr<BoundConstraint>> bound_constraints;
idx_t row_id_index;
bool return_chunk;
vector<idx_t> return_columns;

public:
// Source interface
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class LogicalDelete : public LogicalOperator {
TableCatalogEntry &table;
idx_t table_index;
bool return_chunk;
vector<idx_t> return_columns;
vector<unique_ptr<BoundConstraint>> bound_constraints;

public:
Expand Down
3 changes: 2 additions & 1 deletion src/duckdb/src/main/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,8 @@ idx_t DBConfig::ParseMemoryLimit(const string &arg) {

if (!error.empty()) {
if (error == "Memory cannot be negative") {
result = -1;
// mapping negative memory values to infinite
return NumericLimits<idx_t>::Maximum();
} else {
throw ParserException(error);
}
Expand Down
28 changes: 28 additions & 0 deletions src/duckdb/src/planner/binder/statement/bind_delete.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,34 @@ BoundStatement Binder::Bind(DeleteStatement &stmt) {
// create the delete node
auto del = make_uniq<LogicalDelete>(table, GenerateTableIndex());
del->bound_constraints = BindConstraints(table);

// RETURNING pass-through: if the statement has a RETURNING list, make the scan emit every table
// column so the values can be passed through to the delete operator instead of being fetched by
// row ID in PhysicalDelete.
// The optimization is skipped when the table has generated columns, as they need to be computed
// rather than scanned; del->return_columns then stays empty.
if (!stmt.returning_list.empty() && !table.HasGeneratedColumns()) {
auto &column_ids = get.GetColumnIds();
auto column_count = table.GetColumns().LogicalColumnCount();

// return_columns[i] records the position within the scan's column list (used later as the input
// chunk index) at which table column i is produced. Every slot starts as INVALID_INDEX, meaning
// "not produced by the scan yet".
del->return_columns.resize(column_count, DConstants::INVALID_INDEX);
for (idx_t chunk_idx = 0; chunk_idx < column_ids.size(); chunk_idx++) {
auto &col_id = column_ids[chunk_idx];
// Only entries that are not virtual columns and whose primary index is inside the table's
// logical column range are recorded; if a column occurs more than once, the last position wins.
if (!col_id.IsVirtualColumn() && col_id.GetPrimaryIndex() < column_count) {
del->return_columns[col_id.GetPrimaryIndex()] = chunk_idx;
}
}

// Add any missing columns to the scan.
// The position is taken from column_ids.size() *before* AddColumnId is called.
// NOTE(review): this relies on AddColumnId appending to the same list that column_ids refers
// to, so that the recorded position is the slot of the newly added column - confirm.
for (idx_t col_idx = 0; col_idx < column_count; col_idx++) {
if (del->return_columns[col_idx] == DConstants::INVALID_INDEX) {
del->return_columns[col_idx] = column_ids.size();
get.AddColumnId(col_idx);
}
}
}

del->AddChild(std::move(root));

// bind the row id columns and add them to the projection list
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ void LogicalDelete::Serialize(Serializer &serializer) const {
serializer.WritePropertyWithDefault<idx_t>(201, "table_index", table_index);
serializer.WritePropertyWithDefault<bool>(202, "return_chunk", return_chunk);
serializer.WritePropertyWithDefault<vector<unique_ptr<Expression>>>(203, "expressions", expressions);
serializer.WritePropertyWithDefault<vector<idx_t>>(204, "return_columns", return_columns);
}

unique_ptr<LogicalOperator> LogicalDelete::Deserialize(Deserializer &deserializer) {
Expand All @@ -416,6 +417,7 @@ unique_ptr<LogicalOperator> LogicalDelete::Deserialize(Deserializer &deserialize
deserializer.ReadPropertyWithDefault<idx_t>(201, "table_index", result->table_index);
deserializer.ReadPropertyWithDefault<bool>(202, "return_chunk", result->return_chunk);
deserializer.ReadPropertyWithDefault<vector<unique_ptr<Expression>>>(203, "expressions", result->expressions);
deserializer.ReadPropertyWithDefault<vector<idx_t>>(204, "return_columns", result->return_columns);
return std::move(result);
}

Expand Down