Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/duckdb/extension/parquet/column_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -534,10 +534,10 @@ ColumnWriter::CreateWriterRecursive(ClientContext &context, ParquetWriter &write
template <>
struct NumericLimits<float_na_equal> {
static constexpr float Minimum() {
return std::numeric_limits<float>::lowest();
return NumericLimits<float>::Minimum();
};
static constexpr float Maximum() {
return std::numeric_limits<float>::max();
return NumericLimits<float>::Maximum();
};
static constexpr bool IsSigned() {
return std::is_signed<float>::value;
Expand All @@ -550,10 +550,10 @@ struct NumericLimits<float_na_equal> {
template <>
struct NumericLimits<double_na_equal> {
static constexpr double Minimum() {
return std::numeric_limits<double>::lowest();
return NumericLimits<double>::Minimum();
};
static constexpr double Maximum() {
return std::numeric_limits<double>::max();
return NumericLimits<double>::Maximum();
};
static constexpr bool IsSigned() {
return std::is_signed<double>::value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ class StandardColumnWriter : public PrimitiveColumnWriter {
public:
//! Creates the write state for this column within the given row group.
//! The state is constructed with the current number of columns in the row group (taken before
//! RegisterToRowGroup is called) and starts out with a dictionary encoding:
//! PLAIN_DICTIONARY when the writer reports ParquetVersion::V1, RLE_DICTIONARY otherwise.
unique_ptr<ColumnWriterState> InitializeWriteState(duckdb_parquet::RowGroup &row_group) override {
	auto result = make_uniq<StandardColumnWriterState<SRC, TGT, OP>>(writer, row_group, row_group.columns.size());
	// Single, version-dependent assignment; no unconditional default is set first, so there is no dead store.
	result->encoding = writer.GetParquetVersion() == ParquetVersion::V1 ? duckdb_parquet::Encoding::PLAIN_DICTIONARY
	                                                                     : duckdb_parquet::Encoding::RLE_DICTIONARY;
	RegisterToRowGroup(row_group);
	// The local holds the derived state type while the function returns the base type; the explicit move is
	// kept exactly as in the original.
	return std::move(result);
}
Expand All @@ -150,6 +151,8 @@ class StandardColumnWriter : public PrimitiveColumnWriter {
}
page_state.dbp_encoder.FinishWrite(temp_writer);
break;
case duckdb_parquet::Encoding::PLAIN_DICTIONARY:
// PLAIN_DICTIONARY can be treated the same as RLE_DICTIONARY
case duckdb_parquet::Encoding::RLE_DICTIONARY:
D_ASSERT(page_state.dict_bit_width != 0);
if (!page_state.dict_written_value) {
Expand Down Expand Up @@ -265,7 +268,8 @@ class StandardColumnWriter : public PrimitiveColumnWriter {

//! Returns true when the column state uses a dictionary encoding.
//! Both RLE_DICTIONARY and PLAIN_DICTIONARY count: InitializeWriteState selects PLAIN_DICTIONARY for
//! ParquetVersion::V1, so testing RLE_DICTIONARY alone would report "no dictionary" for V1 files.
bool HasDictionary(PrimitiveColumnWriterState &state_p) override {
	auto &state = state_p.Cast<StandardColumnWriterState<SRC, TGT, OP>>();
	return state.encoding == duckdb_parquet::Encoding::RLE_DICTIONARY ||
	       state.encoding == duckdb_parquet::Encoding::PLAIN_DICTIONARY;
}

idx_t DictionarySize(PrimitiveColumnWriterState &state_p) override {
Expand All @@ -285,7 +289,8 @@ class StandardColumnWriter : public PrimitiveColumnWriter {

void FlushDictionary(PrimitiveColumnWriterState &state_p, ColumnWriterStatistics *stats) override {
auto &state = state_p.Cast<StandardColumnWriterState<SRC, TGT, OP>>();
D_ASSERT(state.encoding == duckdb_parquet::Encoding::RLE_DICTIONARY);
D_ASSERT(state.encoding == duckdb_parquet::Encoding::RLE_DICTIONARY ||
state.encoding == duckdb_parquet::Encoding::PLAIN_DICTIONARY);

if (writer.EnableBloomFilters()) {
state.bloom_filter =
Expand All @@ -310,7 +315,8 @@ class StandardColumnWriter : public PrimitiveColumnWriter {
idx_t GetRowSize(const Vector &vector, const idx_t index,
const PrimitiveColumnWriterState &state_p) const override {
auto &state = state_p.Cast<StandardColumnWriterState<SRC, TGT, OP>>();
if (state.encoding == duckdb_parquet::Encoding::RLE_DICTIONARY) {
if (state.encoding == duckdb_parquet::Encoding::RLE_DICTIONARY ||
state.encoding == duckdb_parquet::Encoding::PLAIN_DICTIONARY) {
return (state.key_bit_width + 7) / 8;
} else {
return OP::template GetRowSize<SRC, TGT>(vector, index);
Expand All @@ -328,6 +334,8 @@ class StandardColumnWriter : public PrimitiveColumnWriter {
const auto *data_ptr = FlatVector::GetData<SRC>(input_column);

switch (page_state.encoding) {
case duckdb_parquet::Encoding::PLAIN_DICTIONARY:
// PLAIN_DICTIONARY can be treated the same as RLE_DICTIONARY
case duckdb_parquet::Encoding::RLE_DICTIONARY: {
idx_t r = chunk_start;
if (!page_state.dict_written_value) {
Expand Down
23 changes: 23 additions & 0 deletions src/duckdb/src/common/local_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1283,6 +1283,29 @@ bool LocalFileSystem::OnDiskFile(FileHandle &handle) {
return true;
}

//! Returns an opaque tag describing the current state of the file behind `handle`.
//! On Windows this is always the empty string. Elsewhere it is a 32-byte binary string made of four
//! 64-bit fields taken from fstat(): st_dev, st_ino, st_size and the modification time converted via
//! Timestamp::FromEpochSeconds.
//! Throws an IOException when fstat() fails.
string LocalFileSystem::GetVersionTag(FileHandle &handle) {
	// TODO: Fix using FileSystem::Stats for v1.5, which should also fix it for Windows
#ifdef _WIN32
	return "";
#else
	const int fd = handle.Cast<UnixFileHandle>().fd;
	struct stat s;
	if (fstat(fd, &s) == -1) {
		// Capture errno right away: it is needed twice below, and the string construction in between
		// is allowed to modify it.
		const int fstat_errno = errno;
		throw IOException("Failed to get file size for file \"%s\": %s", {{"errno", std::to_string(fstat_errno)}},
		                  handle.path, strerror(fstat_errno));
	}

	// dev/ino should be enough, but to guard against in-place writes we also add file size and modification time
	uint64_t version_tag[4];
	Store(NumericCast<uint64_t>(s.st_dev), data_ptr_cast(&version_tag[0]));
	Store(NumericCast<uint64_t>(s.st_ino), data_ptr_cast(&version_tag[1]));
	Store(NumericCast<uint64_t>(s.st_size), data_ptr_cast(&version_tag[2]));
	Store(Timestamp::FromEpochSeconds(s.st_mtime).value, data_ptr_cast(&version_tag[3]));

	// The tag is raw bytes, not text: construct the string with an explicit length so embedded zero bytes survive.
	return string(char_ptr_cast(version_tag), sizeof(uint64_t) * 4);
#endif
}

void LocalFileSystem::Seek(FileHandle &handle, idx_t location) {
if (!CanSeek()) {
throw IOException("Cannot seek in files of this type");
Expand Down
4 changes: 3 additions & 1 deletion src/duckdb/src/execution/index/art/base_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ void Node4::DeleteChild(ART &art, Node &node, Node &parent, const uint8_t byte,

auto prev_node4_status = node.GetGateStatus();
Node::FreeNode(art, node);
Prefix::Concat(art, parent, node, child, remaining_byte, prev_node4_status);
// Propagate both the prev_node_4 status and the general gate status (if the gate was earlier on),
// since the concatenation logic depends on both.
Prefix::Concat(art, parent, node, child, remaining_byte, prev_node4_status, status);
}

void Node4::ShrinkNode16(ART &art, Node &node4, Node &node16) {
Expand Down
13 changes: 5 additions & 8 deletions src/duckdb/src/execution/index/art/prefix.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ void Prefix::New(ART &art, reference<Node> &ref, const ARTKey &key, const idx_t
}
}

void Prefix::Concat(ART &art, Node &parent, Node &node4, const Node child, uint8_t byte,
const GateStatus node4_status) {
void Prefix::Concat(ART &art, Node &parent, Node &node4, const Node child, uint8_t byte, const GateStatus node4_status,
const GateStatus status) {
// We have four situations from which we enter here:
// 1: PREFIX (parent) - Node4 (prev_node4) - PREFIX (child) - INLINED_LEAF, or
// 2: PREFIX (parent) - Node4 (prev_node4) - INLINED_LEAF (child), or
Expand All @@ -90,10 +90,7 @@ void Prefix::Concat(ART &art, Node &parent, Node &node4, const Node child, uint8
ConcatChildIsGate(art, parent, node4, child, byte);
return;
}

auto inside_gate = parent.GetGateStatus() == GateStatus::GATE_SET;
ConcatInternal(art, parent, node4, child, byte, inside_gate);
return;
ConcatInternal(art, parent, node4, child, byte, status);
}

void Prefix::Reduce(ART &art, Node &node, const idx_t pos) {
Expand Down Expand Up @@ -286,9 +283,9 @@ Prefix Prefix::GetTail(ART &art, const Node &node) {
}

void Prefix::ConcatInternal(ART &art, Node &parent, Node &node4, const Node child, uint8_t byte,
const bool inside_gate) {
const GateStatus status) {
if (child.GetType() == NType::LEAF_INLINED) {
if (inside_gate) {
if (status == GateStatus::GATE_SET) {
if (parent.GetType() == NType::PREFIX) {
// The parent only contained the Node4, so we can now inline 'all the way up',
// and the gate is no longer nested.
Expand Down
93 changes: 68 additions & 25 deletions src/duckdb/src/execution/index/bound_index.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#include "duckdb/execution/index/bound_index.hpp"

#include "duckdb/common/array.hpp"
#include "duckdb/common/radix.hpp"
#include "duckdb/common/serializer/serializer.hpp"
#include "duckdb/planner/expression/bound_columnref_expression.hpp"
#include "duckdb/planner/expression/bound_reference_expression.hpp"
#include "duckdb/planner/expression_iterator.hpp"
#include "duckdb/storage/table/append_state.hpp"
#include "duckdb/common/types/selection_vector.hpp"

namespace duckdb {

Expand Down Expand Up @@ -154,39 +156,80 @@ string BoundIndex::AppendRowError(DataChunk &input, idx_t index) {
return error;
}

void BoundIndex::ApplyBufferedReplays(const vector<LogicalType> &table_types,
vector<BufferedIndexData> &buffered_replays,
namespace {

//! Scan position for one replay type, used by BoundIndex::ApplyBufferedReplays.
//! One instance is kept per replay type so that a scan can be resumed across several replay ranges.
struct BufferedReplayState {
//! The buffered data being scanned; null until the scan has been initialized.
optional_ptr<ColumnDataCollection> buffer = nullptr;
//! Scan state passed to InitializeScan/Scan on the buffer.
ColumnDataScanState scan_state;
//! The chunk most recently produced by scanning the buffer.
DataChunk current_chunk;
//! Set to true once buffer, scan_state and current_chunk have been initialized.
bool scan_initialized = false;
};
} // namespace

void BoundIndex::ApplyBufferedReplays(const vector<LogicalType> &table_types, BufferedIndexReplays &buffered_replays,
const vector<StorageIndex> &mapped_column_ids) {
for (auto &replay : buffered_replays) {
ColumnDataScanState state;
auto &buffered_data = *replay.data;
buffered_data.InitializeScan(state);

DataChunk scan_chunk;
buffered_data.InitializeScanChunk(scan_chunk);
DataChunk table_chunk;
table_chunk.InitializeEmpty(table_types);

while (buffered_data.Scan(state, scan_chunk)) {
for (idx_t i = 0; i < scan_chunk.ColumnCount() - 1; i++) {
auto col_id = mapped_column_ids[i].GetPrimaryIndex();
table_chunk.data[col_id].Reference(scan_chunk.data[i]);
if (!buffered_replays.HasBufferedReplays()) {
return;
}

// We have two replay states: one for inserts and one for deletes. These are indexed into using the
// replay_type. Both scans are interleaved, so the state maintains the position of each scan.
array<BufferedReplayState, 2> replay_states;
DataChunk table_chunk;
table_chunk.InitializeEmpty(table_types);

for (const auto &replay_range : buffered_replays.ranges) {
const auto type_idx = static_cast<idx_t>(replay_range.type);
auto &state = replay_states[type_idx];

// Initialize the scan state if necessary. Take ownership of buffered operations, since we won't need
// them after replaying anyways.
if (!state.scan_initialized) {
state.buffer = buffered_replays.GetBuffer(replay_range.type);
state.buffer->InitializeScan(state.scan_state);
state.buffer->InitializeScanChunk(state.current_chunk);
state.scan_initialized = true;
}

idx_t current_row = replay_range.start;
while (current_row < replay_range.end) {
// Scan the next DataChunk from the ColumnDataCollection buffer if the current row is on or after
// that chunk's starting row index.
if (current_row >= state.scan_state.next_row_index) {
if (!state.buffer->Scan(state.scan_state, state.current_chunk)) {
throw InternalException("Buffered index data exhausted during replay");
}
}
table_chunk.SetCardinality(scan_chunk.size());

switch (replay.type) {
case BufferedIndexReplay::INSERT_ENTRY: {
IndexAppendInfo index_append_info(IndexAppendMode::INSERT_DUPLICATES, nullptr);
auto error = Append(table_chunk, scan_chunk.data.back(), index_append_info);
// We need to process the remaining rows in the current chunk, which is the minimum of the available
// rows in the chunk and the remaining rows in the current range.
const auto offset_in_chunk = current_row - state.scan_state.current_row_index;
const auto available_in_chunk = state.current_chunk.size() - offset_in_chunk;
// [start, end) in ReplayRange is [inclusive, exclusive).
const auto range_remaining = replay_range.end - current_row;
const auto rows_to_process = MinValue<idx_t>(available_in_chunk, range_remaining);

SelectionVector sel(offset_in_chunk, rows_to_process);

for (idx_t col_idx = 0; col_idx < state.current_chunk.ColumnCount() - 1; col_idx++) {
const auto col_id = mapped_column_ids[col_idx].GetPrimaryIndex();
table_chunk.data[col_id].Reference(state.current_chunk.data[col_idx]);
table_chunk.data[col_id].Slice(sel, rows_to_process);
}
table_chunk.SetCardinality(rows_to_process);
Vector row_ids(state.current_chunk.data.back(), sel, rows_to_process);

if (replay_range.type == BufferedIndexReplay::INSERT_ENTRY) {
IndexAppendInfo append_info(IndexAppendMode::INSERT_DUPLICATES, nullptr);
const auto error = Append(table_chunk, row_ids, append_info);
if (error.HasError()) {
throw InternalException("error while applying buffered appends: " + error.Message());
}
current_row += rows_to_process;
continue;
}
case BufferedIndexReplay::DEL_ENTRY: {
Delete(table_chunk, scan_chunk.data.back());
}
}
Delete(table_chunk, row_ids);
current_row += rows_to_process;
}
}
}
Expand Down
31 changes: 21 additions & 10 deletions src/duckdb/src/execution/index/unbound_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@

namespace duckdb {

BufferedIndexData::BufferedIndexData(BufferedIndexReplay replay_type, unique_ptr<ColumnDataCollection> data_p)
: type(replay_type), data(std::move(data_p)) {
}

UnboundIndex::UnboundIndex(unique_ptr<CreateInfo> create_info, IndexStorageInfo storage_info_p,
TableIOManager &table_io_manager, AttachedDatabase &db)
: Index(create_info->Cast<CreateIndexInfo>().column_ids, table_io_manager, db), create_info(std::move(create_info)),
Expand Down Expand Up @@ -40,32 +36,47 @@ void UnboundIndex::CommitDrop() {
}

void UnboundIndex::BufferChunk(DataChunk &index_column_chunk, Vector &row_ids,
const vector<StorageIndex> &mapped_column_ids_p, BufferedIndexReplay replay_type) {
const vector<StorageIndex> &mapped_column_ids_p, const BufferedIndexReplay replay_type) {
D_ASSERT(!column_ids.empty());
auto types = index_column_chunk.GetTypes(); // column types
types.push_back(LogicalType::ROW_TYPE);

auto &allocator = Allocator::Get(db);

BufferedIndexData buffered_data(replay_type, make_uniq<ColumnDataCollection>(allocator, types));

//! First time we are buffering data, canonical column_id mapping is stored.
//! This should be a sorted list of all the physical offsets of Indexed columns on this table.
if (mapped_column_ids.empty()) {
mapped_column_ids = mapped_column_ids_p;
}
D_ASSERT(mapped_column_ids == mapped_column_ids_p);

// Combined chunk has all the indexed columns and rowids.
// combined_chunk has all the indexed columns according to mapped_column_ids ordering, as well as a rowid column.
DataChunk combined_chunk;
combined_chunk.InitializeEmpty(types);
for (idx_t i = 0; i < index_column_chunk.ColumnCount(); i++) {
combined_chunk.data[i].Reference(index_column_chunk.data[i]);
}
combined_chunk.data.back().Reference(row_ids);
combined_chunk.SetCardinality(index_column_chunk.size());
buffered_data.data->Append(combined_chunk);
buffered_replays.emplace_back(std::move(buffered_data));

auto &buffer = buffered_replays.GetBuffer(replay_type);
if (buffer == nullptr) {
buffer = make_uniq<ColumnDataCollection>(allocator, types);
}
// The starting index of the buffer range is the size of the buffer.
const idx_t start = buffer->Count();
const idx_t end = start + combined_chunk.size();
auto &ranges = buffered_replays.ranges;

if (ranges.empty() || ranges.back().type != replay_type) {
// If there are no buffered ranges, or the replay types don't match, append a new range.
ranges.emplace_back(replay_type, start, end);
buffer->Append(combined_chunk);
return;
}
// Otherwise merge the range with the previous one.
ranges.back().end = end;
buffer->Append(combined_chunk);
}

} // namespace duckdb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ BaseScanner::BaseScanner(shared_ptr<CSVBufferManager> buffer_manager_p, shared_p
}
}

//! Debug helper: forwards to the Print() of the scanner's state machine.
void BaseScanner::Print() const {
	auto &machine = *state_machine;
	machine.Print();
}

string BaseScanner::RemoveSeparator(const char *value_ptr, const idx_t size, char thousands_separator) {
string result;
result.reserve(size);
Expand Down
Loading