Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/duckdb/extension/parquet/parquet_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,9 @@ void ParquetMetaDataOperator::BindSchema<ParquetMetadataOperatorType::FILE_META_

names.emplace_back("footer_size");
return_types.emplace_back(LogicalType::UBIGINT);

names.emplace_back("column_orders");
return_types.emplace_back(LogicalType::LIST(LogicalType::VARCHAR));
}

idx_t ParquetFileMetadataProcessor::TotalRowCount(ParquetReader &reader) {
Expand Down Expand Up @@ -739,6 +742,17 @@ void ParquetFileMetadataProcessor::ReadRow(vector<reference<Vector>> &output, id
output[7].get().SetValue(output_idx, Value::UBIGINT(reader.GetHandle().GetFileSize()));
// footer_size
output[8].get().SetValue(output_idx, Value::UBIGINT(reader.metadata->footer_size));
// column_orders
Value column_orders_value;
if (meta_data->__isset.column_orders) {
vector<Value> column_orders;
column_orders.reserve(meta_data->column_orders.size());
for (auto &column_order : meta_data->column_orders) {
column_orders.push_back(Value(ConvertParquetElementToString(column_order)));
}
column_orders_value = Value::LIST(LogicalType::VARCHAR, column_orders);
}
output[9].get().SetValue(output_idx, column_orders_value);
}

//===--------------------------------------------------------------------===//
Expand Down
5 changes: 5 additions & 0 deletions src/duckdb/extension/parquet/parquet_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,11 @@ ParquetWriter::ParquetWriter(ClientContext &context, FileSystem &fs, string file
file_meta_data.created_by =
StringUtil::Format("DuckDB version %s (build %s)", DuckDB::LibraryVersion(), DuckDB::SourceID());

duckdb_parquet::ColumnOrder column_order;
column_order.__set_TYPE_ORDER(duckdb_parquet::TypeDefinedOrder());
file_meta_data.column_orders.resize(column_names.size(), column_order);
file_meta_data.__isset.column_orders = true;

for (auto &kv_pair : kv_metadata) {
duckdb_parquet::KeyValue kv;
kv.__set_key(kv_pair.first);
Expand Down
21 changes: 21 additions & 0 deletions src/duckdb/src/common/enum_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
#include "duckdb/execution/index/unbound_index.hpp"
#include "duckdb/execution/operator/csv_scanner/csv_option.hpp"
#include "duckdb/execution/operator/csv_scanner/csv_state.hpp"
#include "duckdb/execution/physical_operator.hpp"
#include "duckdb/execution/physical_table_scan_enum.hpp"
#include "duckdb/execution/reservoir_sample.hpp"
#include "duckdb/function/aggregate_state.hpp"
Expand Down Expand Up @@ -3218,6 +3219,26 @@ OnEntryNotFound EnumUtil::FromString<OnEntryNotFound>(const char *value) {
return static_cast<OnEntryNotFound>(StringUtil::StringToEnum(GetOnEntryNotFoundValues(), 2, "OnEntryNotFound", value));
}

//! Returns the static enum-value <-> string-name mapping for OperatorCachingMode,
//! used by the ToChars/FromString specializations below (4 entries).
const StringUtil::EnumStringLiteral *GetOperatorCachingModeValues() {
	static constexpr StringUtil::EnumStringLiteral values[] {
		{ static_cast<uint32_t>(OperatorCachingMode::NONE), "NONE" },
		{ static_cast<uint32_t>(OperatorCachingMode::PARTITIONED), "PARTITIONED" },
		{ static_cast<uint32_t>(OperatorCachingMode::ORDERED), "ORDERED" },
		{ static_cast<uint32_t>(OperatorCachingMode::UNORDERED), "UNORDERED" }
	};
	return values;
}

template<>
const char* EnumUtil::ToChars<OperatorCachingMode>(OperatorCachingMode value) {
	// Map the enum value to its string name via the shared literal table (4 entries).
	const auto literals = GetOperatorCachingModeValues();
	const auto raw_value = static_cast<uint32_t>(value);
	return StringUtil::EnumToString(literals, 4, "OperatorCachingMode", raw_value);
}

template<>
OperatorCachingMode EnumUtil::FromString<OperatorCachingMode>(const char *value) {
	// Parse the string name back into the enum via the shared literal table (4 entries).
	const auto literals = GetOperatorCachingModeValues();
	const auto raw_value = StringUtil::StringToEnum(literals, 4, "OperatorCachingMode", value);
	return static_cast<OperatorCachingMode>(raw_value);
}

const StringUtil::EnumStringLiteral *GetOperatorFinalResultTypeValues() {
static constexpr StringUtil::EnumStringLiteral values[] {
{ static_cast<uint32_t>(OperatorFinalResultType::FINISHED), "FINISHED" },
Expand Down
6 changes: 6 additions & 0 deletions src/duckdb/src/common/file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,12 @@ bool FileSystem::TryRemoveFile(const string &filename, optional_ptr<FileOpener>
return false;
}

void FileSystem::RemoveFiles(const vector<string> &filenames, optional_ptr<FileOpener> opener) {
for (const auto &filename : filenames) {
TryRemoveFile(filename, opener);
}
}

void FileSystem::FileSync(FileHandle &handle) {
throw NotImplementedException("%s: FileSync is not implemented!", GetName());
}
Expand Down
2 changes: 1 addition & 1 deletion src/duckdb/src/common/sort/sort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ SinkFinalizeType Sort::Finalize(ClientContext &context, OperatorSinkFinalizeInpu
gstate.total_count += sorted_run->Count();
maximum_run_count = MaxValue(maximum_run_count, sorted_run->Count());
}
if (gstate.num_threads == 1 || context.config.verify_parallelism) {
if (context.config.verify_parallelism) {
gstate.partition_size = STANDARD_VECTOR_SIZE;
} else {
gstate.partition_size = MinValue<idx_t>(gstate.total_count, DEFAULT_ROW_GROUP_SIZE);
Expand Down
5 changes: 5 additions & 0 deletions src/duckdb/src/common/sort/sorted_run.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ void SortedRunScanState::Scan(const SortedRun &sorted_run, const Vector &sort_ke
}
}

//! Release the buffer handles held by this scan state so the pinned blocks can be freed.
void SortedRunScanState::Clear() {
	// Drop the pins on both the row-data and the heap blocks of the payload scan.
	payload_state.pin_state.row_handles.clear();
	payload_state.pin_state.heap_handles.clear();
}

template <class SORT_KEY, class PHYSICAL_TYPE>
void TemplatedGetKeyAndPayload(SORT_KEY *const *const sort_keys, SORT_KEY *temp_keys, const idx_t &count,
DataChunk &key, data_ptr_t *const payload_ptrs) {
Expand Down
25 changes: 21 additions & 4 deletions src/duckdb/src/common/sort/sorted_run_merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ class SortedRunMergerLocalState : public LocalSourceState {
bool TaskFinished() const;
//! Do the work this thread has been assigned
SourceResultType ExecuteTask(SortedRunMergerGlobalState &gstate, optional_ptr<DataChunk> chunk);
//! Clear outstanding allocations
void Clear();

private:
//! Computes upper partition boundaries using K-way Merge Path
Expand Down Expand Up @@ -315,6 +317,13 @@ SortedRunMergerLocalState::SortedRunMergerLocalState(SortedRunMergerGlobalState
}
}

//! Clear this thread's outstanding allocations once it is done producing output,
//! so memory is reclaimed before the merger as a whole finishes.
void SortedRunMergerLocalState::Clear() {
	in_memory_states.clear();
	external_states.clear();
	merged_partition.Reset();
	sorted_run_scan_state.Clear();
}

bool SortedRunMergerLocalState::TaskFinished() const {
switch (task) {
case SortedRunMergerTask::COMPUTE_BOUNDARIES:
Expand Down Expand Up @@ -856,7 +865,13 @@ SourceResultType SortedRunMerger::GetData(ExecutionContext &, DataChunk &chunk,
}
}

return chunk.size() == 0 ? SourceResultType::FINISHED : SourceResultType::HAVE_MORE_OUTPUT;
if (chunk.size() != 0) {
return SourceResultType::HAVE_MORE_OUTPUT;
}

// Done
lstate.Clear();
return SourceResultType::FINISHED;
}

OperatorPartitionData SortedRunMerger::GetPartitionData(ExecutionContext &, DataChunk &, GlobalSourceState &,
Expand Down Expand Up @@ -890,6 +905,7 @@ SourceResultType SortedRunMerger::MaterializeSortedRun(ExecutionContext &, Opera
break;
}
}
lstate.Clear(); // Done

// The thread that completes the materialization returns FINISHED, all other threads return HAVE_MORE_OUTPUT
return res;
Expand All @@ -904,11 +920,12 @@ unique_ptr<SortedRun> SortedRunMerger::GetSortedRun(GlobalSourceState &global_st
}
auto &target = *gstate.materialized_partitions[0];
for (idx_t i = 1; i < gstate.materialized_partitions.size(); i++) {
auto &source = *gstate.materialized_partitions[i];
target.key_data->Combine(*source.key_data);
auto &source = gstate.materialized_partitions[i];
target.key_data->Combine(*source->key_data);
if (target.payload_data) {
target.payload_data->Combine(*source.payload_data);
target.payload_data->Combine(*source->payload_data);
}
source.reset();
}
auto res = std::move(gstate.materialized_partitions[0]);
gstate.materialized_partitions.clear();
Expand Down
2 changes: 1 addition & 1 deletion src/duckdb/src/common/types/data_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ void DataChunk::Reference(DataChunk &chunk) {
}

void DataChunk::Move(DataChunk &chunk) {
SetCardinality(chunk);
SetCapacity(chunk);
SetCardinality(chunk);
data = std::move(chunk.data);
vector_caches = std::move(chunk.vector_caches);

Expand Down
16 changes: 8 additions & 8 deletions src/duckdb/src/common/types/row/partitioned_tuple_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ void PartitionedTupleData::AppendUnified(PartitionedTupleDataAppendState &state,
auto &partition = *partitions[partition_index.GetIndex()];
auto &partition_pin_state = state.partition_pin_states[partition_index.GetIndex()];

const auto size_before = partition.SizeInBytes();
const auto size_before = partition.data_size;
partition.AppendUnified(partition_pin_state, state.chunk_state, input, append_sel, actual_append_count);
data_size += partition.SizeInBytes() - size_before;
data_size += partition.data_size - size_before;
} else {
// Compute the heap sizes for the whole chunk
if (!layout.AllConstant()) {
Expand Down Expand Up @@ -103,9 +103,9 @@ void PartitionedTupleData::Append(PartitionedTupleDataAppendState &state, TupleD

state.chunk_state.heap_sizes.Reference(input.heap_sizes);

const auto size_before = partition.SizeInBytes();
const auto size_before = partition.data_size;
partition.Build(partition_pin_state, state.chunk_state, 0, append_count);
data_size += partition.SizeInBytes() - size_before;
data_size += partition.data_size - size_before;

partition.CopyRows(state.chunk_state, input, *FlatVector::IncrementalSelectionVector(), append_count);
} else {
Expand Down Expand Up @@ -224,9 +224,9 @@ void PartitionedTupleData::BuildBufferSpace(PartitionedTupleDataAppendState &sta
const auto partition_offset = partition_entry.offset - partition_length;

// Build out the buffer space for this partition
const auto size_before = partition.SizeInBytes();
const auto size_before = partition.data_size;
partition.Build(partition_pin_state, state.chunk_state, partition_offset, partition_length);
data_size += partition.SizeInBytes() - size_before;
data_size += partition.data_size - size_before;
}
}

Expand Down Expand Up @@ -337,7 +337,7 @@ idx_t PartitionedTupleData::Count() const {
}

idx_t PartitionedTupleData::SizeInBytes() const {
	// Report tuple/heap data size plus the bookkeeping memory held by the STL allocator.
	const auto allocator_bytes = stl_allocator->AllocationSize();
	return allocator_bytes + data_size;
}

idx_t PartitionedTupleData::PartitionCount() const {
Expand All @@ -361,7 +361,7 @@ void PartitionedTupleData::Verify() const {
for (auto &partition : partitions) {
partition->Verify();
total_count += partition->Count();
total_size += partition->SizeInBytes();
total_size += partition->data_size;
}
D_ASSERT(total_count == this->count);
D_ASSERT(total_size == this->data_size);
Expand Down
2 changes: 1 addition & 1 deletion src/duckdb/src/common/types/row/tuple_data_collection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ idx_t TupleDataCollection::ChunkCount() const {
}

idx_t TupleDataCollection::SizeInBytes() const {
	// Report tuple/heap data size plus the bookkeeping memory held by the STL allocator.
	const auto allocator_bytes = stl_allocator->AllocationSize();
	return allocator_bytes + data_size;
}

void TupleDataCollection::Unpin() {
Expand Down
11 changes: 11 additions & 0 deletions src/duckdb/src/common/virtual_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,17 @@ bool VirtualFileSystem::TryRemoveFile(const string &filename, optional_ptr<FileO
return FindFileSystem(filename).TryRemoveFile(filename, opener);
}

void VirtualFileSystem::RemoveFiles(const vector<string> &filenames, optional_ptr<FileOpener> opener) {
reference_map_t<FileSystem, vector<string>> files_by_fs;
for (const auto &filename : filenames) {
auto &fs = FindFileSystem(filename);
files_by_fs[fs].push_back(filename);
}
for (auto &entry : files_by_fs) {
entry.first.get().RemoveFiles(entry.second, opener);
}
}

string VirtualFileSystem::PathSeparator(const string &path) {
return FindFileSystem(path).PathSeparator(path);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,18 @@ using vector_of_value_map_t = unordered_map<vector<Value>, T, VectorOfValuesHash

class CopyToFunctionGlobalState : public GlobalSinkState {
public:
explicit CopyToFunctionGlobalState(ClientContext &context)
: initialized(false), rows_copied(0), last_file_offset(0),
explicit CopyToFunctionGlobalState(ClientContext &context_p)
: context(context_p), finalized(false), initialized(false), rows_copied(0), last_file_offset(0),
file_write_lock_if_rotating(make_uniq<StorageLock>()) {
max_open_files = DBConfig::GetSetting<PartitionedWriteMaxOpenFilesSetting>(context);
}
~CopyToFunctionGlobalState() override;

ClientContext &context;
//! Whether the copy was successfully finalized
bool finalized;
//! The list of files created by this operator
vector<string> created_files;

StorageLock lock;
atomic<bool> initialized;
Expand All @@ -79,6 +86,7 @@ class CopyToFunctionGlobalState : public GlobalSinkState {
return;
}
// initialize writing to the file
created_files.push_back(op.file_path);
global_state = op.function.copy_to_initialize_global(context, *op.bind_data, op.file_path);
if (op.function.initialize_operator) {
op.function.initialize_operator(*global_state, op);
Expand Down Expand Up @@ -198,6 +206,7 @@ class CopyToFunctionGlobalState : public GlobalSinkState {
full_path = op.filename_pattern.CreateFilename(fs, hive_path, op.file_extension, offset);
}
}
created_files.push_back(full_path);
optional_ptr<CopyToFileInfo> written_file_info;
if (op.return_type != CopyFunctionReturnType::CHANGED_ROWS) {
written_file_info = AddFile(*global_lock, full_path, op.return_type);
Expand Down Expand Up @@ -244,6 +253,17 @@ class CopyToFunctionGlobalState : public GlobalSinkState {
idx_t global_offset = 0;
};

//! If the query failed before Finalize ran, best-effort delete any partially
//! written output files so a failed COPY does not leave garbage behind.
CopyToFunctionGlobalState::~CopyToFunctionGlobalState() {
	// Nothing to clean up if we never started writing, or the copy completed successfully.
	if (!initialized || finalized || created_files.empty()) {
		return;
	}
	// Reaching here means the query failed before Finalize was called.
	// Destructors must never throw (this one likely runs during exception unwinding),
	// so swallow any filesystem errors from the cleanup.
	try {
		auto &fs = FileSystem::GetFileSystem(context);
		// Batched removal: the virtual filesystem groups the paths per sub-filesystem.
		fs.RemoveFiles(created_files);
	} catch (...) { // NOLINT(bugprone-empty-catch): best-effort cleanup only
	}
}

string PhysicalCopyToFile::GetTrimmedPath(ClientContext &context) const {
auto &fs = FileSystem::GetFileSystem(context);
string trimmed_path = file_path;
Expand Down Expand Up @@ -356,6 +376,7 @@ unique_ptr<GlobalFunctionData> PhysicalCopyToFile::CreateFileState(ClientContext
idx_t this_file_offset = g.last_file_offset++;
auto &fs = FileSystem::GetFileSystem(context);
string output_path(filename_pattern.CreateFilename(fs, file_path, file_extension, this_file_offset));
g.created_files.push_back(output_path);
optional_ptr<CopyToFileInfo> written_file_info;
if (return_type != CopyFunctionReturnType::CHANGED_ROWS) {
written_file_info = g.AddFile(global_lock, output_path, return_type);
Expand Down Expand Up @@ -412,9 +433,7 @@ void CheckDirectory(FileSystem &fs, const string &file_path, CopyOverwriteMode o
return;
}
if (overwrite_mode == CopyOverwriteMode::COPY_OVERWRITE) {
for (auto &file : file_list) {
fs.RemoveFile(file);
}
fs.RemoveFiles(file_list);
} else {
throw IOException("Directory \"%s\" is not empty! Enable OVERWRITE option to overwrite files", file_path);
}
Expand Down Expand Up @@ -639,7 +658,10 @@ SinkFinalizeType PhysicalCopyToFile::FinalizeInternal(ClientContext &context, Gl

SinkFinalizeType PhysicalCopyToFile::Finalize(Pipeline &pipeline, Event &event, ClientContext &context,
                                              OperatorSinkFinalizeInput &input) const {
	// Run the actual finalization first; we only flag the global state as finalized
	// afterwards, so the state's destructor knows whether to clean up partial files
	// if FinalizeInternal throws.
	const auto finalize_result = FinalizeInternal(context, input.global_state);
	input.global_state.Cast<CopyToFunctionGlobalState>().finalized = true;
	return finalize_result;
}

//===--------------------------------------------------------------------===//
Expand Down
Loading