Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/duckdb/src/common/error_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,17 @@ const ExceptionType &ErrorData::Type() const {
return this->type;
}

void ErrorData::Merge(const ErrorData &other) {
if (!other.HasError()) {
return;
}
if (!HasError()) {
*this = other;
return;
}
final_message += "\n\n" + other.Message();
}

bool ErrorData::operator==(const ErrorData &other) const {
if (initialized != other.initialized) {
return false;
Expand Down
6 changes: 3 additions & 3 deletions src/duckdb/src/function/table/version/pragma_version.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#ifndef DUCKDB_PATCH_VERSION
#define DUCKDB_PATCH_VERSION "3-dev137"
#define DUCKDB_PATCH_VERSION "3-dev144"
#endif
#ifndef DUCKDB_MINOR_VERSION
#define DUCKDB_MINOR_VERSION 3
Expand All @@ -8,10 +8,10 @@
#define DUCKDB_MAJOR_VERSION 1
#endif
#ifndef DUCKDB_VERSION
#define DUCKDB_VERSION "v1.3.3-dev137"
#define DUCKDB_VERSION "v1.3.3-dev144"
#endif
#ifndef DUCKDB_SOURCE_ID
#define DUCKDB_SOURCE_ID "3ac7e19ac4"
#define DUCKDB_SOURCE_ID "67cbce34e1"
#endif
#include "duckdb/function/table/system_functions.hpp"
#include "duckdb/main/database.hpp"
Expand Down
3 changes: 2 additions & 1 deletion src/duckdb/src/include/duckdb/common/error_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class ErrorData {
//! From std::exception
DUCKDB_API ErrorData(const std::exception &ex); // NOLINT: allow implicit construction from exception
//! From a raw string and exception type
DUCKDB_API explicit ErrorData(ExceptionType type, const string &raw_message);
DUCKDB_API ErrorData(ExceptionType type, const string &raw_message);
//! From a raw string
DUCKDB_API explicit ErrorData(const string &raw_message);

Expand All @@ -38,6 +38,7 @@ class ErrorData {
DUCKDB_API const string &RawMessage() const {
return raw_message;
}
DUCKDB_API void Merge(const ErrorData &other);
DUCKDB_API bool operator==(const ErrorData &other) const;

inline bool HasError() const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ struct DictFSSTCompressionState : public CompressionState {
DictionaryAppendState TryEncode();

bool CompressInternal(UnifiedVectorFormat &vector_format, const string_t &str, bool is_null,
EncodedInput &encoded_input, const idx_t i, idx_t count);
EncodedInput &encoded_input, const idx_t i, idx_t count, bool fail_on_no_space);
void Compress(Vector &scan_vector, idx_t count);
void FinalizeCompress();
void Flush(bool final);
Expand Down
71 changes: 47 additions & 24 deletions src/duckdb/src/storage/compression/dict_fsst/compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,12 +305,14 @@ static inline bool RequiresHigherBitWidth(bitpacking_width_t bitwidth, uint32_t
}

template <DictionaryAppendState APPEND_STATE>
static inline bool AddLookup(DictFSSTCompressionState &state, idx_t lookup, const bool recalculate_indices_space) {
static inline bool AddLookup(DictFSSTCompressionState &state, idx_t lookup, const bool recalculate_indices_space,
bool fail_on_no_space) {
D_ASSERT(lookup != DConstants::INVALID_INDEX);

//! This string exists in the dictionary
idx_t new_dictionary_indices_space = state.dictionary_indices_space;
if (APPEND_STATE != DictionaryAppendState::ENCODED_ALL_UNIQUE && recalculate_indices_space) {
auto get_bitpacking_size = APPEND_STATE != DictionaryAppendState::ENCODED_ALL_UNIQUE && recalculate_indices_space;
if (get_bitpacking_size) {
new_dictionary_indices_space =
BitpackingPrimitives::GetRequiredSize(state.tuple_count + 1, state.dictionary_indices_width);
}
Expand All @@ -336,6 +338,12 @@ static inline bool AddLookup(DictFSSTCompressionState &state, idx_t lookup, cons
available_space -= FSST_SYMBOL_TABLE_SIZE;
}
if (required_space > available_space) {
if (fail_on_no_space) {
throw FatalException("AddLookup in DictFSST failed: required: %d, available: %d, indices: %d, bitpacking: "
"%b, dict offset: %d, str length: %d",
required_space, available_space, new_dictionary_indices_space, get_bitpacking_size,
state.dictionary_offset, state.string_lengths_space);
}
return false;
}

Expand All @@ -349,7 +357,7 @@ static inline bool AddLookup(DictFSSTCompressionState &state, idx_t lookup, cons

template <DictionaryAppendState APPEND_STATE>
static inline bool AddToDictionary(DictFSSTCompressionState &state, const string_t &str,
const bool recalculate_indices_space) {
const bool recalculate_indices_space, bool fail_on_no_space) {
uint32_t str_len = UnsafeNumericCast<uint32_t>(str.GetSize());
if (APPEND_STATE == DictionaryAppendState::ENCODED) {
//! We delay encoding of new entries.
Expand Down Expand Up @@ -413,6 +421,12 @@ static inline bool AddToDictionary(DictFSSTCompressionState &state, const string
available_space -= FSST_SYMBOL_TABLE_SIZE;
}
if (required_space > available_space) {
if (fail_on_no_space) {
throw FatalException("AddToDictionary in DictFSST failed: required: %d, available: %d, dict offset + "
"str_len: %d, new str length: %d, new dict indices: %d",
required_space, available_space, state.dictionary_offset + str_len,
new_string_lengths_space, new_dictionary_indices_space);
}
return false;
}

Expand Down Expand Up @@ -461,7 +475,8 @@ static inline bool AddToDictionary(DictFSSTCompressionState &state, const string
}

bool DictFSSTCompressionState::CompressInternal(UnifiedVectorFormat &vector_format, const string_t &str, bool is_null,
EncodedInput &encoded_input, const idx_t i, idx_t count) {
EncodedInput &encoded_input, const idx_t i, idx_t count,
bool fail_on_no_space) {
auto strings = UnifiedVectorFormat::GetData<string_t>(vector_format);
idx_t lookup = DConstants::INVALID_INDEX;

Expand All @@ -484,17 +499,21 @@ bool DictFSSTCompressionState::CompressInternal(UnifiedVectorFormat &vector_form
case DictionaryAppendState::REGULAR: {
if (append_state == DictionaryAppendState::REGULAR) {
if (lookup != DConstants::INVALID_INDEX) {
return AddLookup<DictionaryAppendState::REGULAR>(*this, lookup, recalculate_indices_space);
return AddLookup<DictionaryAppendState::REGULAR>(*this, lookup, recalculate_indices_space,
fail_on_no_space);
} else {
//! This string does not exist in the dictionary, add it
return AddToDictionary<DictionaryAppendState::REGULAR>(*this, str, recalculate_indices_space);
return AddToDictionary<DictionaryAppendState::REGULAR>(*this, str, recalculate_indices_space,
fail_on_no_space);
}
} else {
if (lookup != DConstants::INVALID_INDEX) {
return AddLookup<DictionaryAppendState::NOT_ENCODED>(*this, lookup, recalculate_indices_space);
return AddLookup<DictionaryAppendState::NOT_ENCODED>(*this, lookup, recalculate_indices_space,
fail_on_no_space);
} else {
//! This string does not exist in the dictionary, add it
return AddToDictionary<DictionaryAppendState::NOT_ENCODED>(*this, str, recalculate_indices_space);
return AddToDictionary<DictionaryAppendState::NOT_ENCODED>(*this, str, recalculate_indices_space,
fail_on_no_space);
}
}
}
Expand All @@ -505,10 +524,12 @@ bool DictFSSTCompressionState::CompressInternal(UnifiedVectorFormat &vector_form

bool fits;
if (lookup != DConstants::INVALID_INDEX) {
fits = AddLookup<DictionaryAppendState::ENCODED>(*this, lookup, recalculate_indices_space);
fits =
AddLookup<DictionaryAppendState::ENCODED>(*this, lookup, recalculate_indices_space, fail_on_no_space);
} else {
//! Not in the dictionary, add it
fits = AddToDictionary<DictionaryAppendState::ENCODED>(*this, str, recalculate_indices_space);
fits = AddToDictionary<DictionaryAppendState::ENCODED>(*this, str, recalculate_indices_space,
fail_on_no_space);
}
if (fits) {
return fits;
Expand All @@ -523,10 +544,12 @@ bool DictFSSTCompressionState::CompressInternal(UnifiedVectorFormat &vector_form
// we flush these and try again to see if the size went down enough
FlushEncodingBuffer();
if (lookup != DConstants::INVALID_INDEX) {
return AddLookup<DictionaryAppendState::ENCODED>(*this, lookup, recalculate_indices_space);
return AddLookup<DictionaryAppendState::ENCODED>(*this, lookup, recalculate_indices_space,
fail_on_no_space);
} else {
//! Not in the dictionary, add it
return AddToDictionary<DictionaryAppendState::ENCODED>(*this, str, recalculate_indices_space);
return AddToDictionary<DictionaryAppendState::ENCODED>(*this, str, recalculate_indices_space,
fail_on_no_space);
}
}
case DictionaryAppendState::ENCODED_ALL_UNIQUE: {
Expand All @@ -535,8 +558,7 @@ bool DictFSSTCompressionState::CompressInternal(UnifiedVectorFormat &vector_form

#ifdef DEBUG
auto temp_decoder = alloca(sizeof(duckdb_fsst_decoder_t));
duckdb_fsst_import((duckdb_fsst_decoder_t *)temp_decoder, fsst_serialized_symbol_table.get());

duckdb_fsst_import(reinterpret_cast<duckdb_fsst_decoder_t *>(temp_decoder), fsst_serialized_symbol_table.get());
vector<unsigned char> decompress_buffer;
#endif

Expand Down Expand Up @@ -589,12 +611,12 @@ bool DictFSSTCompressionState::CompressInternal(UnifiedVectorFormat &vector_form
//! Verify that we can decompress the string
auto &uncompressed_str = strings[encoded_input.offset + j];
decompress_buffer.resize(uncompressed_str.GetSize() + 1 + 100);
auto decoded_std_string =
FSSTPrimitives::DecompressValue((void *)temp_decoder, (const char *)compressed_ptrs[j],
(idx_t)compressed_sizes[j], decompress_buffer);
auto decoded_std_string = FSSTPrimitives::DecompressValue(
(void *)temp_decoder, reinterpret_cast<const char *>(compressed_ptrs[j]),
(idx_t)compressed_sizes[j], decompress_buffer);

D_ASSERT(decoded_std_string.size() == uncompressed_str.GetSize());
string_t decompressed_string((const char *)decompress_buffer.data(),
string_t decompressed_string(reinterpret_cast<const char *>(decompress_buffer.data()),
UnsafeNumericCast<uint32_t>(uncompressed_str.GetSize()));
D_ASSERT(decompressed_string == uncompressed_str);
#endif
Expand All @@ -615,14 +637,15 @@ bool DictFSSTCompressionState::CompressInternal(UnifiedVectorFormat &vector_form
compressed_string.GetSize(), decompress_buffer);

D_ASSERT(decoded_std_string.size() == uncompressed_string.GetSize());
string_t decompressed_string((const char *)decompress_buffer.data(),
string_t decompressed_string(reinterpret_cast<const char *>(decompress_buffer.data()),
UnsafeNumericCast<uint32_t>(uncompressed_string.GetSize()));
D_ASSERT(decompressed_string == uncompressed_string);
}

#endif
auto &string = encoded_input.data[i - encoded_input.offset];
return AddToDictionary<DictionaryAppendState::ENCODED_ALL_UNIQUE>(*this, string, recalculate_indices_space);
return AddToDictionary<DictionaryAppendState::ENCODED_ALL_UNIQUE>(*this, string, recalculate_indices_space,
fail_on_no_space);
}
};
throw InternalException("Unreachable");
Expand Down Expand Up @@ -820,22 +843,22 @@ void DictFSSTCompressionState::Compress(Vector &scan_vector, idx_t count) {
auto &str = strings[idx];
auto is_null = !vector_format.validity.RowIsValid(idx);
do {
if (CompressInternal(vector_format, str, is_null, encoded_input, i, count)) {
if (CompressInternal(vector_format, str, is_null, encoded_input, i, count, false)) {
break;
}

if (append_state == DictionaryAppendState::REGULAR) {
append_state = TryEncode();
D_ASSERT(append_state != DictionaryAppendState::REGULAR);
if (CompressInternal(vector_format, str, is_null, encoded_input, i, count)) {
if (CompressInternal(vector_format, str, is_null, encoded_input, i, count, false)) {
break;
}
}
Flush(false);
encoded_input.data.clear();
encoded_input.offset = 0;
if (!CompressInternal(vector_format, str, is_null, encoded_input, i, count)) {
throw FatalException("Compressing directly after Flush doesn't fit");
if (!CompressInternal(vector_format, str, is_null, encoded_input, i, count, true)) {
throw FatalException("Compressing directly after Flush doesn't fit - expected to throw earlier!");
}
} while (false);
if (!is_null) {
Expand Down
8 changes: 7 additions & 1 deletion src/duckdb/src/transaction/duck_transaction_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,14 @@ ErrorData DuckTransactionManager::CommitTransaction(ClientContext &context, Tran
options.action = CheckpointAction::ALWAYS_CHECKPOINT;
options.type = checkpoint_decision.type;
auto &storage_manager = db.GetStorageManager();
storage_manager.CreateCheckpoint(context, options);

try {
storage_manager.CreateCheckpoint(context, options);
} catch (std::exception &ex) {
error.Merge(ErrorData(ex));
}
}

return error;
}

Expand Down
32 changes: 23 additions & 9 deletions src/duckdb/src/transaction/meta_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,34 +115,48 @@ ErrorData MetaTransaction::Commit() {
if (entry == transactions.end()) {
throw InternalException("Could not find transaction corresponding to database in MetaTransaction");
}

#ifdef DEBUG
auto already_committed = committed_tx.insert(db).second == false;
if (already_committed) {
throw InternalException("All databases inside all_transactions should be unique, invariant broken!");
}
#endif

auto &transaction_manager = db.GetTransactionManager();
auto &transaction = entry->second.get();
if (!error.HasError()) {
// commit
error = transaction_manager.CommitTransaction(context, transaction);
} else {
// we have encountered an error previously - roll back subsequent entries
transaction_manager.RollbackTransaction(transaction);
try {
if (!error.HasError()) {
// Commit the transaction.
error = transaction_manager.CommitTransaction(context, transaction);
} else {
// Rollback due to previous error.
transaction_manager.RollbackTransaction(transaction);
}
} catch (std::exception &ex) {
error.Merge(ErrorData(ex));
}
}
return error;
}

void MetaTransaction::Rollback() {
// rollback transactions in reverse order
// Rollback all transactions in reverse order.
ErrorData error;
for (idx_t i = all_transactions.size(); i > 0; i--) {
auto &db = all_transactions[i - 1].get();
auto &transaction_manager = db.GetTransactionManager();
auto entry = transactions.find(db);
D_ASSERT(entry != transactions.end());
auto &transaction = entry->second.get();
transaction_manager.RollbackTransaction(transaction);
try {
auto &transaction = entry->second.get();
transaction_manager.RollbackTransaction(transaction);
} catch (std::exception &ex) {
error.Merge(ErrorData(ex));
}
}
if (error.HasError()) {
error.Throw();
}
}

Expand Down
Loading