Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/duckdb/src/function/table/version/pragma_version.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#ifndef DUCKDB_PATCH_VERSION
#define DUCKDB_PATCH_VERSION "5-dev13"
#define DUCKDB_PATCH_VERSION "5-dev17"
#endif
#ifndef DUCKDB_MINOR_VERSION
#define DUCKDB_MINOR_VERSION 4
Expand All @@ -8,10 +8,10 @@
#define DUCKDB_MAJOR_VERSION 1
#endif
#ifndef DUCKDB_VERSION
#define DUCKDB_VERSION "v1.4.5-dev13"
#define DUCKDB_VERSION "v1.4.5-dev17"
#endif
#ifndef DUCKDB_SOURCE_ID
#define DUCKDB_SOURCE_ID "32a088ad40"
#define DUCKDB_SOURCE_ID "7fba13fbe1"
#endif
#include "duckdb/function/table/system_functions.hpp"
#include "duckdb/main/database.hpp"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ class DuckTransactionManager : public TransactionManager {
//! Whether or not we can checkpoint
CheckpointDecision CanCheckpoint(DuckTransaction &transaction, unique_ptr<StorageLockKey> &checkpoint_lock,
const UndoBufferProperties &properties);
void CleanupTransactions();

private:
//! The current start timestamp used by transactions
Expand Down
2 changes: 1 addition & 1 deletion src/duckdb/src/storage/local_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,6 @@ void LocalStorage::Flush(DataTable &table, LocalTableStorage &storage, optional_

TableAppendState append_state;
table.AppendLock(append_state);
transaction.PushAppend(table, NumericCast<idx_t>(append_state.row_start), append_count);
if ((append_state.row_start == 0 || storage.row_groups->GetTotalRows() >= row_group_size) &&
storage.deleted_rows == 0) {
// table is currently empty OR we are bulk appending: move over the storage directly
Expand All @@ -615,6 +614,7 @@ void LocalStorage::Flush(DataTable &table, LocalTableStorage &storage, optional_
// append to the indexes and append to the base table
storage.AppendToIndexes(transaction, append_state, true);
}
transaction.PushAppend(table, NumericCast<idx_t>(append_state.row_start), append_count);

#ifdef DEBUG
// Verify that our index memory is stable.
Expand Down
7 changes: 2 additions & 5 deletions src/duckdb/src/storage/table/standard_column_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,8 @@ void StandardColumnData::Update(TransactionData transaction, DataTable &data_tab
Vector &update_vector, row_t *row_ids, idx_t update_count) {
ColumnScanState standard_state, validity_state;
Vector base_vector(type);
auto standard_fetch = FetchUpdateData(standard_state, row_ids, base_vector);
auto validity_fetch = validity.FetchUpdateData(validity_state, row_ids, base_vector);
if (standard_fetch != validity_fetch) {
throw InternalException("Unaligned fetch in validity and main column data for update");
}
FetchUpdateData(standard_state, row_ids, base_vector);
validity.FetchUpdateData(validity_state, row_ids, base_vector);

UpdateInternal(transaction, data_table, column_index, update_vector, row_ids, update_count, base_vector);
validity.UpdateInternal(transaction, data_table, column_index, update_vector, row_ids, update_count, base_vector);
Expand Down
49 changes: 21 additions & 28 deletions src/duckdb/src/transaction/duck_transaction_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,25 @@ transaction_t DuckTransactionManager::GetCommitTimestamp() {
return commit_ts;
}

void DuckTransactionManager::CleanupTransactions() {
lock_guard<mutex> c_lock(cleanup_lock);
while (true) {
unique_ptr<DuckCleanupInfo> top_cleanup_info;
{
lock_guard<mutex> q_lock(cleanup_queue_lock);
if (cleanup_queue.empty()) {
// all transactions have been cleaned up - done
return;
}
top_cleanup_info = std::move(cleanup_queue.front());
cleanup_queue.pop();
}
if (top_cleanup_info) {
top_cleanup_info->Cleanup();
}
}
}

ErrorData DuckTransactionManager::CommitTransaction(ClientContext &context, Transaction &transaction_p) {
auto &transaction = transaction_p.Cast<DuckTransaction>();
unique_lock<mutex> t_lock(transaction_lock);
Expand Down Expand Up @@ -327,20 +346,7 @@ ErrorData DuckTransactionManager::CommitTransaction(ClientContext &context, Tran
// as they (1) have been removed, or (2) exited old_transactions.
t_lock.unlock();

{
lock_guard<mutex> c_lock(cleanup_lock);
unique_ptr<DuckCleanupInfo> top_cleanup_info;
{
lock_guard<mutex> q_lock(cleanup_queue_lock);
if (!cleanup_queue.empty()) {
top_cleanup_info = std::move(cleanup_queue.front());
cleanup_queue.pop();
}
}
if (top_cleanup_info) {
top_cleanup_info->Cleanup();
}
}
CleanupTransactions();

// now perform a checkpoint if (1) we are able to checkpoint, and (2) the WAL has reached sufficient size to
// checkpoint
Expand Down Expand Up @@ -379,20 +385,7 @@ void DuckTransactionManager::RollbackTransaction(Transaction &transaction_p) {
}
}

{
lock_guard<mutex> c_lock(cleanup_lock);
unique_ptr<DuckCleanupInfo> top_cleanup_info;
{
lock_guard<mutex> q_lock(cleanup_queue_lock);
if (!cleanup_queue.empty()) {
top_cleanup_info = std::move(cleanup_queue.front());
cleanup_queue.pop();
}
}
if (top_cleanup_info) {
top_cleanup_info->Cleanup();
}
}
CleanupTransactions();

if (error.HasError()) {
throw FatalException("Failed to rollback transaction. Cannot continue operation.\nError: %s", error.Message());
Expand Down