diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ee7c5be..9a03808 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -140,3 +140,66 @@ jobs: with: name: cloudsql-bin-${{ matrix.compiler }} path: build/cloudSQL + + performance-benchmarks: + needs: style-check + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Install dependencies + run: | + sudo apt-get update + sudo apt-get install -y cmake clang ninja-build ccache python3 + + - name: Configure CMake (Release) + run: | + mkdir build + cd build + cmake .. -G Ninja \ + -DCMAKE_BUILD_TYPE=Release \ + -DBUILD_BENCHMARKS=ON \ + -DBUILD_TESTS=OFF + + - name: Build Benchmarks + run: | + cd build + ninja storage_bench execution_bench network_bench + + - name: Restore Performance Baseline + id: restore-baseline + uses: actions/cache/restore@v4 + with: + path: build/baseline.json + key: perf-baseline-${{ runner.os }}-main + + - name: Run Benchmarks + run: | + cd build + ./storage_bench --benchmark_format=json > storage.json + ./execution_bench --benchmark_format=json > execution.json + ./network_bench --benchmark_format=json > network.json + + # Merge results into one current.json + python3 -c "import json; s=json.load(open('storage.json')); e=json.load(open('execution.json')); n=json.load(open('network.json')); s['benchmarks'].extend(e['benchmarks']); s['benchmarks'].extend(n['benchmarks']); json.dump(s, open('current.json', 'w'))" + + - name: Check for Performance Regressions + run: | + if [ -f build/baseline.json ]; then + python3 scripts/check_perf_regression.py build/current.json build/baseline.json 0.20 + else + echo "No baseline found to compare against." + fi + + - name: Save New Baseline + if: github.ref == 'refs/heads/main' + uses: actions/cache/save@v4 + with: + path: build/current.json + key: perf-baseline-${{ runner.os }}-main-${{ github.sha }} + + - name: Upload Current Results + uses: actions/upload-artifact@v4 + with: + name: performance-results + path: build/current.json diff --git a/benchmarks/execution_bench.cpp b/benchmarks/execution_bench.cpp index 9685fd8..4683328 100644 --- a/benchmarks/execution_bench.cpp +++ b/benchmarks/execution_bench.cpp @@ -29,32 +29,30 @@ static void SetupBenchTable(HeapTable& table, int num_rows) { static void BM_ExecutionSeqScan(benchmark::State& state) { std::string test_dir = "./bench_exec_scan_" + std::to_string(state.range(0)); + std::filesystem::remove_all(test_dir); std::filesystem::create_directories(test_dir); - StorageManager disk_manager(test_dir); - BufferPoolManager bpm(2000, disk_manager); - - Schema schema; - schema.add_column("id", common::ValueType::TYPE_INT64); - schema.add_column("data", common::ValueType::TYPE_TEXT); - for (auto _ : state) { - state.PauseTiming(); - auto table = std::make_unique("scan_table", bpm, schema); + { + StorageManager disk_manager(test_dir); + BufferPoolManager bpm(2000, disk_manager); + + Schema schema; + schema.add_column("id", common::ValueType::TYPE_INT64); + schema.add_column("data", common::ValueType::TYPE_TEXT); + + auto table = std::make_shared("scan_table", bpm, schema); table->create(); SetupBenchTable(*table, state.range(0)); - state.ResumeTiming(); - auto scan_op = std::make_unique(std::move(table)); - scan_op->init(); - Tuple tuple; - while (scan_op->next(tuple)) { - benchmark::DoNotOptimize(tuple); + for (auto _ : state) { + auto scan_op = std::make_unique(table); + scan_op->init(); + scan_op->open(); + Tuple tuple; + while (scan_op->next(tuple)) { + benchmark::DoNotOptimize(tuple); + } } - - state.PauseTiming(); - std::filesystem::remove_all(test_dir); - std::filesystem::create_directories(test_dir); - state.ResumeTiming(); } state.SetItemsProcessed(state.iterations() * state.range(0)); @@ -64,45 +62,43 @@ BENCHMARK(BM_ExecutionSeqScan)->Arg(1000)->Arg(10000); static void BM_ExecutionHashJoin(benchmark::State& state) { std::string test_dir = "./bench_exec_join_" + std::to_string(state.range(0)); + std::filesystem::remove_all(test_dir); std::filesystem::create_directories(test_dir); - StorageManager disk_manager(test_dir); - BufferPoolManager bpm(4000, disk_manager); - - Schema schema; - schema.add_column("id", common::ValueType::TYPE_INT64); - schema.add_column("data", common::ValueType::TYPE_TEXT); - for (auto _ : state) { - state.PauseTiming(); - auto left_table = std::make_unique("left_table", bpm, schema); + { + StorageManager disk_manager(test_dir); + BufferPoolManager bpm(4000, disk_manager); + + Schema schema; + schema.add_column("id", common::ValueType::TYPE_INT64); + schema.add_column("data", common::ValueType::TYPE_TEXT); + + auto left_table = std::make_shared("left_table", bpm, schema); left_table->create(); SetupBenchTable(*left_table, state.range(0)); - auto right_table = std::make_unique("right_table", bpm, schema); + auto right_table = std::make_shared("right_table", bpm, schema); right_table->create(); SetupBenchTable(*right_table, state.range(0)); - state.ResumeTiming(); - auto left_scan = std::make_unique(std::move(left_table)); - auto right_scan = std::make_unique(std::move(right_table)); - - // Join on "id" - auto left_key = std::make_unique("id"); - auto right_key = std::make_unique("id"); - - auto join_op = std::make_unique( - std::move(left_scan), std::move(right_scan), std::move(left_key), std::move(right_key)); - - join_op->init(); - Tuple tuple; - while (join_op->next(tuple)) { - benchmark::DoNotOptimize(tuple); + for (auto _ : state) { + auto left_scan = std::make_unique(left_table); + auto right_scan = std::make_unique(right_table); + + // Join on "id" + auto left_key = std::make_unique("id"); + auto right_key = std::make_unique("id"); + + auto join_op = std::make_unique( + std::move(left_scan), std::move(right_scan), std::move(left_key), std::move(right_key)); + + join_op->init(); + join_op->open(); + Tuple tuple; + while (join_op->next(tuple)) { + benchmark::DoNotOptimize(tuple); + } } - - state.PauseTiming(); - std::filesystem::remove_all(test_dir); - std::filesystem::create_directories(test_dir); - state.ResumeTiming(); } state.SetItemsProcessed(state.iterations() * state.range(0)); diff --git a/include/common/value.hpp b/include/common/value.hpp index 1bed97c..58dfad9 100644 --- a/include/common/value.hpp +++ b/include/common/value.hpp @@ -85,6 +85,8 @@ class Value { [[nodiscard]] ValueType type() const { return type_; } [[nodiscard]] bool is_null() const { return type_ == ValueType::TYPE_NULL; } [[nodiscard]] bool is_numeric() const; + [[nodiscard]] bool is_integer() const; + [[nodiscard]] bool is_float() const; [[nodiscard]] bool as_bool() const; [[nodiscard]] int8_t as_int8() const; @@ -184,6 +186,15 @@ inline bool Value::is_numeric() const { type_ == ValueType::TYPE_DECIMAL; } +inline bool Value::is_integer() const { + return type_ == ValueType::TYPE_INT8 || type_ == ValueType::TYPE_INT16 || + type_ == ValueType::TYPE_INT32 || type_ == ValueType::TYPE_INT64; +} + +inline bool Value::is_float() const { + return type_ == ValueType::TYPE_FLOAT32 || type_ == ValueType::TYPE_FLOAT64; +} + // Accessors inline bool Value::as_bool() const { if (type_ != ValueType::TYPE_BOOL) { diff --git a/include/executor/operator.hpp b/include/executor/operator.hpp index 252c7b9..d4607b7 100644 --- a/include/executor/operator.hpp +++ b/include/executor/operator.hpp @@ -105,12 +105,13 @@ class Operator { class SeqScanOperator : public Operator { private: std::string table_name_; - std::unique_ptr table_; + std::shared_ptr table_; std::unique_ptr iterator_; + Schema schema_; public: - explicit SeqScanOperator(std::unique_ptr table, Transaction* txn = nullptr, + explicit SeqScanOperator(std::shared_ptr table, Transaction* txn = nullptr, LockManager* lock_manager = nullptr); bool init() override; @@ -153,7 +154,7 @@ class IndexScanOperator : public Operator { private: std::string table_name_; std::string index_name_; - std::unique_ptr table_; + std::shared_ptr table_; std::unique_ptr index_; common::Value search_key_; std::vector matching_ids_; @@ -161,7 +162,7 @@ class IndexScanOperator : public Operator { Schema schema_; public: - IndexScanOperator(std::unique_ptr table, + IndexScanOperator(std::shared_ptr table, std::unique_ptr index, common::Value search_key, Transaction* txn = nullptr, LockManager* lock_manager = nullptr); diff --git a/include/storage/heap_table.hpp b/include/storage/heap_table.hpp index 131a666..5890885 100644 --- a/include/storage/heap_table.hpp +++ b/include/storage/heap_table.hpp @@ -122,6 +122,7 @@ class HeapTable { std::string filename_; BufferPoolManager& bpm_; executor::Schema schema_; + uint32_t last_page_id_ = 0; public: /** diff --git a/include/storage/lru_replacer.hpp b/include/storage/lru_replacer.hpp index 4d7eb1f..2577295 100644 --- a/include/storage/lru_replacer.hpp +++ b/include/storage/lru_replacer.hpp @@ -6,9 +6,7 @@ #ifndef CLOUDSQL_STORAGE_LRU_REPLACER_HPP #define CLOUDSQL_STORAGE_LRU_REPLACER_HPP -#include #include -#include #include namespace cloudsql::storage { @@ -17,8 +15,9 @@ namespace cloudsql::storage { * @class LRUReplacer * @brief Tracks page usage and determines which page to evict * - * Implements a thread-safe LRU policy. Pages that are pinned are - * removed from the replacer. When unpinned, they are added back. + * Implements a CLOCK (Second Chance) replacement policy. + * This implementation is zero-allocation during hot-path (pin/unpin) + * by using fixed-size bitsets. */ class LRUReplacer { public: @@ -64,8 +63,12 @@ class LRUReplacer { private: size_t capacity_; mutable std::mutex latch_; - std::list lru_list_; - std::unordered_map::iterator> lru_map_; + + // CLOCK State + std::vector in_replacer_; // true if frame is a candidate for eviction + std::vector referenced_; // "Second chance" bit + size_t clock_hand_ = 0; + size_t current_size_ = 0; }; } // namespace cloudsql::storage diff --git a/scripts/check_perf_regression.py b/scripts/check_perf_regression.py new file mode 100755 index 0000000..011be4f --- /dev/null +++ b/scripts/check_perf_regression.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python3 +import json +import sys +import os + +def check_regression(current_file, baseline_file, threshold=0.2): + # Load current results - failure here is a fatal error + try: + with open(current_file) as f: + current = json.load(f) + except Exception as e: + print(f"Error loading current performance results from {current_file}: {e}") + return False + + # Load baseline results - missing file is handled gracefully + try: + with open(baseline_file) as f: + baseline = json.load(f) + except FileNotFoundError: + print(f"No baseline found at {baseline_file}. Skipping comparison.") + return True + except Exception as e: + print(f"Error loading baseline performance results from {baseline_file}: {e}") + return False + + regressions = [] + + # Create map of baseline metrics + base_map = {b['name']: b.get('real_time') for b in baseline['benchmarks']} + + print(f"{'Benchmark':<40} | {'Old (ns)':<12} | {'New (ns)':<12} | {'Change':<10}") + print("-" * 85) + + for b in current['benchmarks']: + name = b['name'] + if name in base_map: + old_time = base_map[name] + new_time = b.get('real_time') + + if old_time is None or new_time is None: + print(f"{name:<40} | {'N/A':<12} | {'N/A':<12} | {'N/A':>9}") + continue + + # Guard against division by zero + if old_time <= 0: + print(f"{name:<40} | {old_time:<12.2f} | {new_time:<12.2f} | {'NEW/ZERO':>9}") + continue + + # Increase in time means decrease in performance + change = (new_time - old_time) / old_time + print(f"{name:<40} | {old_time:<12.2f} | {new_time:<12.2f} | {change:>+9.1%}") + + if change > threshold: + regressions.append(f"{name} regressed by {change:.1%}") + + if regressions: + print("\n!!! PERFORMANCE REGRESSION DETECTED !!!") + for r in regressions: + print(f" - {r}") + return False + + print("\nPerformance is within acceptable limits.") + return True + +if __name__ == "__main__": + if len(sys.argv) < 3: + print("Usage: check_perf_regression.py [threshold]") + sys.exit(1) + + thresh = float(sys.argv[3]) if len(sys.argv) > 3 else 0.2 + if not check_regression(sys.argv[1], sys.argv[2], thresh): + sys.exit(1) diff --git a/src/executor/operator.cpp b/src/executor/operator.cpp index 32047e8..1708e84 100644 --- a/src/executor/operator.cpp +++ b/src/executor/operator.cpp @@ -29,8 +29,7 @@ namespace cloudsql::executor { /* --- SeqScanOperator --- */ - -SeqScanOperator::SeqScanOperator(std::unique_ptr table, Transaction* txn, +SeqScanOperator::SeqScanOperator(std::shared_ptr table, Transaction* txn, LockManager* lock_manager) : Operator(OperatorType::SeqScan, txn, lock_manager), table_name_(table->table_name()), @@ -131,7 +130,7 @@ Schema& BufferScanOperator::output_schema() { /* --- IndexScanOperator --- */ -IndexScanOperator::IndexScanOperator(std::unique_ptr table, +IndexScanOperator::IndexScanOperator(std::shared_ptr table, std::unique_ptr index, common::Value search_key, Transaction* txn, LockManager* lock_manager) diff --git a/src/executor/query_executor.cpp b/src/executor/query_executor.cpp index c906ecc..6865729 100644 --- a/src/executor/query_executor.cpp +++ b/src/executor/query_executor.cpp @@ -787,7 +787,7 @@ std::unique_ptr QueryExecutor::build_plan(const parser::SelectStatemen col_name) { common::ValueType ktype = base_table_meta->columns[pos].type; current_root = std::make_unique( - std::make_unique(base_table_name, bpm_, + std::make_shared(base_table_name, bpm_, base_schema), std::make_unique(idx_info.name, bpm_, ktype), diff --git a/src/network/rpc_client.cpp b/src/network/rpc_client.cpp index 5976e78..aff91dd 100644 --- a/src/network/rpc_client.cpp +++ b/src/network/rpc_client.cpp @@ -35,8 +35,9 @@ bool RpcClient::connect() { fd_ = socket(AF_INET, SOCK_STREAM, 0); if (fd_ < 0) { - std::cerr << "--- [RpcClient] socket creation FAILED: " << strerror(errno) << " ---" - << std::endl; + if (false) + std::cerr << "--- [RpcClient] socket creation FAILED: " << strerror(errno) << " ---" + << std::endl; return false; } @@ -47,14 +48,17 @@ bool RpcClient::connect() { static_cast(inet_pton(AF_INET, address_.c_str(), &addr.sin_addr)); if (::connect(fd_, reinterpret_cast(&addr), sizeof(addr)) < 0) { - std::cerr << "--- [RpcClient] connect FAILED to " << address_ << ":" << port_ << " : " - << strerror(errno) << " ---" << std::endl; + if (false) + std::cerr << "--- [RpcClient] connect FAILED to " << address_ << ":" << port_ << " : " + << strerror(errno) << " ---" << std::endl; static_cast(close(fd_)); fd_ = -1; return false; } - std::cerr << "--- [RpcClient] connected to " << address_ << ":" << port_ << " ---" << std::endl; + if (false) + std::cerr << "--- [RpcClient] connected to " << address_ << ":" << port_ << " ---" + << std::endl; return true; } @@ -70,12 +74,14 @@ bool RpcClient::call(RpcType type, const std::vector& payload, std::vector& response_out, uint16_t group_id) { const std::scoped_lock lock(mutex_); - std::cerr << "--- [RpcClient] call type=" << (int)type << " to " << address_ << ":" << port_ - << " ---" << std::endl; + if (false) + std::cerr << "--- [RpcClient] call type=" << (int)type << " to " << address_ << ":" << port_ + << " ---" << std::endl; if (fd_ < 0 && !connect()) { - std::cerr << "--- [RpcClient] connect failed to " << address_ << ":" << port_ << " ---" - << std::endl; + if (false) + std::cerr << "--- [RpcClient] connect failed to " << address_ << ":" << port_ << " ---" + << std::endl; return false; } @@ -89,22 +95,22 @@ bool RpcClient::call(RpcType type, const std::vector& payload, header.encode(header_buf); if (send(fd_, header_buf, RpcHeader::HEADER_SIZE, 0) <= 0) { - std::cerr << "--- [RpcClient] header send failed ---" << std::endl; + if (false) std::cerr << "--- [RpcClient] header send failed ---" << std::endl; return false; } if (!payload.empty()) { if (send(fd_, payload.data(), payload.size(), 0) <= 0) { - std::cerr << "--- [RpcClient] payload send failed ---" << std::endl; + if (false) std::cerr << "--- [RpcClient] payload send failed ---" << std::endl; return false; } } // Reception Phase: Must occur under the same lock to ensure atomicity - std::cerr << "--- [RpcClient] waiting for response ---" << std::endl; + if (false) std::cerr << "--- [RpcClient] waiting for response ---" << std::endl; std::array resp_buf{}; if (recv(fd_, resp_buf.data(), RpcHeader::HEADER_SIZE, MSG_WAITALL) <= 0) { - std::cerr << "--- [RpcClient] recv header failed ---" << std::endl; + if (false) std::cerr << "--- [RpcClient] recv header failed ---" << std::endl; return false; } @@ -113,12 +119,12 @@ bool RpcClient::call(RpcType type, const std::vector& payload, if (resp_header.payload_len > 0) { if (recv(fd_, response_out.data(), resp_header.payload_len, MSG_WAITALL) <= 0) { - std::cerr << "--- [RpcClient] recv payload failed ---" << std::endl; + if (false) std::cerr << "--- [RpcClient] recv payload failed ---" << std::endl; return false; } } - std::cerr << "--- [RpcClient] call success ---" << std::endl; + if (false) std::cerr << "--- [RpcClient] call success ---" << std::endl; return true; } diff --git a/src/network/rpc_server.cpp b/src/network/rpc_server.cpp index aad9f65..a0a5b73 100644 --- a/src/network/rpc_server.cpp +++ b/src/network/rpc_server.cpp @@ -23,10 +23,10 @@ namespace cloudsql::network { bool RpcServer::start() { - std::cerr << "--- [RpcServer] starting on port " << port_ << " ---" << std::endl; + if (false) std::cerr << "--- [RpcServer] starting on port " << port_ << " ---" << std::endl; listen_fd_ = socket(AF_INET, SOCK_STREAM, 0); if (listen_fd_ < 0) { - std::cerr << "--- [RpcServer] socket creation FAILED ---" << std::endl; + if (false) std::cerr << "--- [RpcServer] socket creation FAILED ---" << std::endl; return false; } @@ -39,14 +39,16 @@ bool RpcServer::start() { addr.sin_port = htons(port_); if (bind(listen_fd_, reinterpret_cast(&addr), sizeof(addr)) < 0) { - std::cerr << "--- [RpcServer] bind FAILED on port " << port_ << " ---" << std::endl; + if (false) + std::cerr << "--- [RpcServer] bind FAILED on port " << port_ << " ---" << std::endl; static_cast(close(listen_fd_)); listen_fd_ = -1; return false; } if (listen(listen_fd_, 10) < 0) { - std::cerr << "--- [RpcServer] listen FAILED on port " << port_ << " ---" << std::endl; + if (false) + std::cerr << "--- [RpcServer] listen FAILED on port " << port_ << " ---" << std::endl; static_cast(close(listen_fd_)); listen_fd_ = -1; return false; @@ -54,7 +56,7 @@ bool RpcServer::start() { running_ = true; accept_thread_ = std::thread(&RpcServer::accept_loop, this); - std::cerr << "--- [RpcServer] started and listening ---" << std::endl; + if (false) std::cerr << "--- [RpcServer] started and listening ---" << std::endl; return true; } @@ -118,13 +120,14 @@ void RpcServer::handle_client(int client_fd) { } const RpcHeader header = RpcHeader::decode(header_buf.data()); - std::cerr << "--- [RpcServer] received request type=" << (int)header.type - << " payload=" << header.payload_len << " ---" << std::endl; + if (false) + std::cerr << "--- [RpcServer] received request type=" << (int)header.type + << " payload=" << header.payload_len << " ---" << std::endl; std::vector payload(header.payload_len); if (header.payload_len > 0) { if (recv(client_fd, payload.data(), header.payload_len, MSG_WAITALL) <= 0) { - std::cerr << "--- [RpcServer] payload recv failed ---" << std::endl; + if (false) std::cerr << "--- [RpcServer] payload recv failed ---" << std::endl; break; } } @@ -138,12 +141,13 @@ void RpcServer::handle_client(int client_fd) { } if (handler) { - std::cerr << "--- [RpcServer] dispatching to handler ---" << std::endl; + if (false) std::cerr << "--- [RpcServer] dispatching to handler ---" << std::endl; handler(header, payload, client_fd); - std::cerr << "--- [RpcServer] handler finished ---" << std::endl; + if (false) std::cerr << "--- [RpcServer] handler finished ---" << std::endl; } else { - std::cerr << "--- [RpcServer] NO HANDLER FOUND for type " << (int)header.type << " ---" - << std::endl; + if (false) + std::cerr << "--- [RpcServer] NO HANDLER FOUND for type " << (int)header.type + << " ---" << std::endl; } } static_cast(close(client_fd)); diff --git a/src/storage/heap_table.cpp b/src/storage/heap_table.cpp index 73f6ebb..b350032 100644 --- a/src/storage/heap_table.cpp +++ b/src/storage/heap_table.cpp @@ -35,7 +35,8 @@ HeapTable::HeapTable(std::string table_name, BufferPoolManager& bpm, executor::S : table_name_(std::move(table_name)), filename_(table_name_ + ".heap"), bpm_(bpm), - schema_(std::move(schema)) {} + schema_(std::move(schema)), + last_page_id_(0) {} /* --- Iterator Implementation --- */ @@ -61,104 +62,190 @@ bool HeapTable::Iterator::next_meta(TupleMeta& out_meta) { } while (true) { - if (table_.get_meta(next_id_, out_meta)) { - /* Record successfully retrieved */ - last_id_ = next_id_; + Page* page = table_.bpm_.fetch_page(table_.filename_, next_id_.page_num); + if (!page) { + eof_ = true; + return false; + } - /* Prepare for next call: advance slot index */ - next_id_.slot_num++; - return true; + auto* buffer = page->get_data(); + PageHeader header{}; + std::memcpy(&header, buffer, sizeof(PageHeader)); + + if (header.free_space_offset == 0) { + table_.bpm_.unpin_page(table_.filename_, next_id_.page_num, false); + eof_ = true; + return false; } - /* Check if the current page has more slots to explore */ - std::array buf{}; - if (table_.read_page(next_id_.page_num, buf.data())) { - PageHeader header{}; - std::memcpy(&header, buf.data(), sizeof(PageHeader)); - if (next_id_.slot_num < header.num_slots) { - /* Current slot is empty/deleted; skip to the next */ + /* Scan slots in the current page starting from next_id_.slot_num */ + while (next_id_.slot_num < header.num_slots) { + uint16_t offset = 0; + std::memcpy(&offset, + buffer + sizeof(PageHeader) + (next_id_.slot_num * sizeof(uint16_t)), + sizeof(uint16_t)); + + if (offset != 0) { + /* Found a record: Deserialize it in-place from the pinned buffer */ + const uint8_t* const data = reinterpret_cast(buffer + offset); + const size_t data_len = Page::PAGE_SIZE - offset; + + if (data_len < 16) { + table_.bpm_.unpin_page(table_.filename_, next_id_.page_num, false); + return false; + } + + // Read MVCC Header + std::memcpy(&out_meta.xmin, data, 8); + std::memcpy(&out_meta.xmax, data + 8, 8); + + size_t cursor = 16; + std::vector values; + values.reserve(table_.schema_.column_count()); + + for (size_t i = 0; i < table_.schema_.column_count(); ++i) { + if (cursor >= data_len) break; + auto type = static_cast(data[cursor++]); + if (type == common::ValueType::TYPE_NULL) { + values.push_back(common::Value::make_null()); + continue; + } + + if (type == common::ValueType::TYPE_BOOL || + type == common::ValueType::TYPE_INT8 || + type == common::ValueType::TYPE_INT16 || + type == common::ValueType::TYPE_INT32 || + type == common::ValueType::TYPE_INT64 || + type == common::ValueType::TYPE_FLOAT32 || + type == common::ValueType::TYPE_FLOAT64) { + if (cursor + 8 > data_len) break; + + if (type == common::ValueType::TYPE_FLOAT32 || + type == common::ValueType::TYPE_FLOAT64) { + double v; + std::memcpy(&v, data + cursor, 8); + values.push_back(common::Value::make_float64(v)); + } else { + int64_t v; + std::memcpy(&v, data + cursor, 8); + if (type == common::ValueType::TYPE_BOOL) + values.push_back(common::Value::make_bool(v != 0)); + else + values.push_back(common::Value::make_int64(v)); + } + cursor += 8; + } else { + if (cursor + 4 > data_len) break; + uint32_t len; + std::memcpy(&len, data + cursor, 4); + cursor += 4; + if (cursor + len > data_len) break; + std::string s(reinterpret_cast(data + cursor), len); + cursor += len; + values.push_back(common::Value::make_text(s)); + } + } + + out_meta.tuple = executor::Tuple(std::move(values)); + last_id_ = next_id_; next_id_.slot_num++; - continue; + + table_.bpm_.unpin_page(table_.filename_, next_id_.page_num, false); + return true; } + next_id_.slot_num++; } /* Move to the beginning of the next physical page */ + table_.bpm_.unpin_page(table_.filename_, next_id_.page_num, false); next_id_.page_num++; next_id_.slot_num = 0; - - /* If the next page cannot be read, end of file is reached */ - if (!table_.read_page(next_id_.page_num, buf.data())) { - eof_ = true; - return false; - } - - /* Validate that the page has been initialized */ - PageHeader next_header{}; - std::memcpy(&next_header, buf.data(), sizeof(PageHeader)); - if (next_header.free_space_offset == 0) { - eof_ = true; - return false; - } } } /* --- HeapTable Methods --- */ HeapTable::TupleId HeapTable::insert(const executor::Tuple& tuple, uint64_t xmin) { - uint32_t page_num = 0; - std::array buffer{}; + uint32_t page_num = last_page_id_; + + /* Pre-serialize tuple to binary to determine size and avoid repeat work */ + std::vector payload; + payload.reserve(16 + (tuple.size() * 9)); + + uint64_t xmax = 0; + payload.resize(16); + std::memcpy(payload.data(), &xmin, 8); + std::memcpy(payload.data() + 8, &xmax, 8); + + for (const auto& val : tuple.values()) { + auto type = static_cast(val.type()); + payload.push_back(type); + if (val.is_null()) continue; + + if (val.is_numeric()) { + size_t off = payload.size(); + payload.resize(off + 8); + if (val.is_integer()) { + int64_t v = val.to_int64(); + std::memcpy(payload.data() + off, &v, 8); + } else { + double v = val.to_float64(); + std::memcpy(payload.data() + off, &v, 8); + } + } else { + const std::string& s = val.to_string(); + uint32_t len = static_cast(s.size()); + size_t off = payload.size(); + payload.resize(off + 4 + len); + std::memcpy(payload.data() + off, &len, 4); + std::memcpy(payload.data() + off + 4, s.data(), len); + } + } + + const auto required = static_cast(payload.size()); while (true) { - /* Read existing page or initialize a new one */ - if (!read_page(page_num, buffer.data())) { - std::memset(buffer.data(), 0, Page::PAGE_SIZE); - PageHeader header{}; - header.free_space_offset = - static_cast(sizeof(PageHeader) + (DEFAULT_SLOT_COUNT * sizeof(uint16_t))); - header.num_slots = 0; - std::memcpy(buffer.data(), &header, sizeof(PageHeader)); - static_cast(write_page(page_num, buffer.data())); + Page* page = bpm_.fetch_page(filename_, page_num); + if (!page) { + page = bpm_.new_page(filename_, &page_num); + if (!page) return {0, 0}; } + auto* buffer = page->get_data(); PageHeader header{}; - std::memcpy(&header, buffer.data(), sizeof(PageHeader)); + std::memcpy(&header, buffer, sizeof(PageHeader)); + + // Initialize header if it's a new page if (header.free_space_offset == 0) { header.free_space_offset = static_cast(sizeof(PageHeader) + (DEFAULT_SLOT_COUNT * sizeof(uint16_t))); header.num_slots = 0; } - /* Serialize tuple data prefixed by MVCC header (xmin|xmax|) */ - std::string data_str = std::to_string(xmin) + "|0|"; - for (const auto& val : tuple.values()) { - data_str += val.to_string() + "|"; - } - - const auto required = static_cast(data_str.size() + 1); - /* Check for sufficient free space in the current page */ if (header.free_space_offset + required < Page::PAGE_SIZE && header.num_slots < DEFAULT_SLOT_COUNT) { const uint16_t offset = header.free_space_offset; - std::memcpy(std::next(buffer.data(), static_cast(offset)), - data_str.c_str(), data_str.size() + 1); + + // Copy binary payload directly to page buffer + std::memcpy(buffer + offset, payload.data(), payload.size()); /* Update slot directory */ - std::memcpy(std::next(buffer.data(), - static_cast( - sizeof(PageHeader) + (header.num_slots * sizeof(uint16_t)))), + std::memcpy(buffer + sizeof(PageHeader) + (header.num_slots * sizeof(uint16_t)), &offset, sizeof(uint16_t)); TupleId tid(page_num, header.num_slots); header.num_slots++; header.free_space_offset += required; - std::memcpy(buffer.data(), &header, sizeof(PageHeader)); - static_cast(write_page(page_num, buffer.data())); + std::memcpy(buffer, &header, sizeof(PageHeader)); + bpm_.unpin_page(filename_, page_num, true); + last_page_id_ = page_num; return tid; } /* Page is full; attempt insertion in the next page */ + bpm_.unpin_page(filename_, page_num, false); page_num++; } } @@ -167,143 +254,53 @@ HeapTable::TupleId HeapTable::insert(const executor::Tuple& tuple, uint64_t xmin * @brief Logical deletion: update xmax field in the record blob */ bool HeapTable::remove(const TupleId& tuple_id, uint64_t xmax) { - std::array buffer{}; - if (!read_page(tuple_id.page_num, buffer.data())) { - return false; - } + Page* page = bpm_.fetch_page(filename_, tuple_id.page_num); + if (!page) return false; + auto* buffer = page->get_data(); PageHeader header{}; - std::memcpy(&header, buffer.data(), sizeof(PageHeader)); - if (header.free_space_offset == 0) { - return false; - } - if (tuple_id.slot_num >= header.num_slots) { + std::memcpy(&header, buffer, sizeof(PageHeader)); + if (header.free_space_offset == 0 || tuple_id.slot_num >= header.num_slots) { + bpm_.unpin_page(filename_, tuple_id.page_num, false); return false; } uint16_t offset = 0; - std::memcpy( - &offset, - std::next(buffer.data(), static_cast( - sizeof(PageHeader) + (tuple_id.slot_num * sizeof(uint16_t)))), - sizeof(uint16_t)); + std::memcpy(&offset, buffer + sizeof(PageHeader) + (tuple_id.slot_num * sizeof(uint16_t)), + sizeof(uint16_t)); if (offset == 0) { + bpm_.unpin_page(filename_, tuple_id.page_num, false); return false; } - const char* const data_ptr = std::next(buffer.data(), static_cast(offset)); - const std::string raw_data(data_ptr); + /* In binary format, xmax is at offset + 8 */ + std::memcpy(buffer + offset + 8, &xmax, 8); - std::stringstream ss(raw_data); - std::string segment; - std::vector parts; - while (std::getline(ss, segment, '|')) { - parts.push_back(segment); - } - - if (parts.size() < 2) { - return false; - } - - /* Update xmax field */ - parts[1] = std::to_string(xmax); - - /* Reconstruct record blob */ - std::string new_data; - for (const auto& p : parts) { - new_data += p + "|"; - } - - const auto old_len = raw_data.size() + 1; - const auto new_len = new_data.size() + 1; - - if (new_len <= old_len) { - std::memcpy(std::next(buffer.data(), static_cast(offset)), new_data.c_str(), - new_len); - return write_page(tuple_id.page_num, buffer.data()); - } - - /* Reorganize page to accommodate potentially longer xmax string */ - std::vector all_tuples; - for (uint16_t i = 0; i < header.num_slots; ++i) { - uint16_t slot_off = 0; - std::memcpy(&slot_off, - std::next(buffer.data(), static_cast(sizeof(PageHeader) + - (i * sizeof(uint16_t)))), - sizeof(uint16_t)); - if (slot_off == 0) { - all_tuples.emplace_back(""); - continue; - } - if (i == tuple_id.slot_num) { - all_tuples.push_back(new_data); - } else { - all_tuples.emplace_back( - std::next(buffer.data(), static_cast(slot_off))); - } - } - - std::memset(buffer.data(), 0, Page::PAGE_SIZE); - header.free_space_offset = - static_cast(sizeof(PageHeader) + (DEFAULT_SLOT_COUNT * sizeof(uint16_t))); - header.num_slots = 0; - - for (const auto& t_data : all_tuples) { - if (t_data.empty()) { - const uint16_t zero = 0; - std::memcpy(std::next(buffer.data(), - static_cast( - sizeof(PageHeader) + (header.num_slots * sizeof(uint16_t)))), - &zero, sizeof(uint16_t)); - header.num_slots++; - continue; - } - - const auto req = static_cast(t_data.size() + 1); - if (header.free_space_offset + req > Page::PAGE_SIZE) { - return false; - } - - const uint16_t off = header.free_space_offset; - std::memcpy(std::next(buffer.data(), static_cast(off)), t_data.c_str(), - req); - std::memcpy(std::next(buffer.data(), - static_cast(sizeof(PageHeader) + - (header.num_slots * sizeof(uint16_t)))), - &off, sizeof(uint16_t)); - header.num_slots++; - header.free_space_offset += req; - } - - std::memcpy(buffer.data(), &header, sizeof(PageHeader)); - return write_page(tuple_id.page_num, buffer.data()); + bpm_.unpin_page(filename_, tuple_id.page_num, true); + return true; } /** * @brief Physical deletion: zero out slot offset (rollback only) */ bool HeapTable::physical_remove(const TupleId& tuple_id) { - std::array buffer{}; - if (!read_page(tuple_id.page_num, buffer.data())) { - return false; - } + Page* page = bpm_.fetch_page(filename_, tuple_id.page_num); + if (!page) return false; + auto* buffer = page->get_data(); PageHeader header{}; - std::memcpy(&header, buffer.data(), sizeof(PageHeader)); - if (header.free_space_offset == 0) { - return false; - } - if (tuple_id.slot_num >= header.num_slots) { + std::memcpy(&header, buffer, sizeof(PageHeader)); + if (header.free_space_offset == 0 || tuple_id.slot_num >= header.num_slots) { + bpm_.unpin_page(filename_, tuple_id.page_num, false); return false; } const uint16_t zero = 0; - std::memcpy( - std::next(buffer.data(), static_cast( - sizeof(PageHeader) + (tuple_id.slot_num * sizeof(uint16_t)))), - &zero, sizeof(uint16_t)); + std::memcpy(buffer + sizeof(PageHeader) + (tuple_id.slot_num * sizeof(uint16_t)), &zero, + sizeof(uint16_t)); - return write_page(tuple_id.page_num, buffer.data()); + bpm_.unpin_page(filename_, tuple_id.page_num, true); + return true; } /** @@ -322,88 +319,83 @@ bool HeapTable::update(const TupleId& tuple_id, const executor::Tuple& tuple, ui } bool HeapTable::get_meta(const TupleId& tuple_id, TupleMeta& out_meta) const { - std::array buffer{}; - if (!read_page(tuple_id.page_num, buffer.data())) { - return false; - } + Page* page = bpm_.fetch_page(filename_, tuple_id.page_num); + if (!page) return false; + auto* buffer = page->get_data(); PageHeader header{}; - std::memcpy(&header, buffer.data(), sizeof(PageHeader)); - if (header.free_space_offset == 0) { - return false; - } - if (tuple_id.slot_num >= header.num_slots) { + std::memcpy(&header, buffer, sizeof(PageHeader)); + if (header.free_space_offset == 0 || tuple_id.slot_num >= header.num_slots) { + bpm_.unpin_page(filename_, tuple_id.page_num, false); return false; } uint16_t offset = 0; - std::memcpy( - &offset, - std::next(buffer.data(), static_cast( - sizeof(PageHeader) + (tuple_id.slot_num * sizeof(uint16_t)))), - sizeof(uint16_t)); + std::memcpy(&offset, buffer + sizeof(PageHeader) + (tuple_id.slot_num * sizeof(uint16_t)), + sizeof(uint16_t)); if (offset == 0) { + bpm_.unpin_page(filename_, tuple_id.page_num, false); return false; } - const char* const data = std::next(buffer.data(), static_cast(offset)); - const std::string s(data); - std::stringstream ss(s); - std::string item; + const uint8_t* const data = reinterpret_cast(buffer + offset); + const size_t data_len = Page::PAGE_SIZE - offset; - /* Parse MVCC Header */ - if (!std::getline(ss, item, '|')) { + if (data_len < 16) { + bpm_.unpin_page(filename_, tuple_id.page_num, false); return false; } - try { - out_meta.xmin = std::stoull(item); - } catch (...) { - out_meta.xmin = 0; - } - if (!std::getline(ss, item, '|')) { - return false; - } - try { - out_meta.xmax = std::stoull(item); - } catch (...) { - out_meta.xmax = 0; - } + // Read MVCC Header + std::memcpy(&out_meta.xmin, data, 8); + std::memcpy(&out_meta.xmax, data + 8, 8); - /* Parse Column Values */ + size_t cursor = 16; std::vector values; values.reserve(schema_.column_count()); + for (size_t i = 0; i < schema_.column_count(); ++i) { - if (!std::getline(ss, item, '|')) { - break; + if (cursor >= data_len) break; + auto type = static_cast(data[cursor++]); + if (type == common::ValueType::TYPE_NULL) { + values.push_back(common::Value::make_null()); + continue; } - const auto& col = schema_.get_column(i); - try { - switch (col.type()) { - case common::ValueType::TYPE_INT8: - case common::ValueType::TYPE_INT16: - case common::ValueType::TYPE_INT32: - case common::ValueType::TYPE_INT64: - values.push_back(common::Value::make_int64(std::stoll(item))); - break; - case common::ValueType::TYPE_FLOAT32: - case common::ValueType::TYPE_FLOAT64: - values.push_back(common::Value::make_float64(std::stod(item))); - break; - case common::ValueType::TYPE_BOOL: - values.push_back(common::Value::make_bool(item == "TRUE" || item == "1")); - break; - default: - values.push_back(common::Value::make_text(item)); - break; + if (type == common::ValueType::TYPE_BOOL || type == common::ValueType::TYPE_INT8 || + type == common::ValueType::TYPE_INT16 || type == common::ValueType::TYPE_INT32 || + type == common::ValueType::TYPE_INT64 || type == common::ValueType::TYPE_FLOAT32 || + type == common::ValueType::TYPE_FLOAT64) { + if (cursor + 8 > data_len) break; + + if (type == common::ValueType::TYPE_FLOAT32 || + type == common::ValueType::TYPE_FLOAT64) { + double v; + std::memcpy(&v, data + cursor, 8); + values.push_back(common::Value::make_float64(v)); + } else { + int64_t v; + std::memcpy(&v, data + cursor, 8); + if (type == common::ValueType::TYPE_BOOL) + values.push_back(common::Value::make_bool(v != 0)); + else + values.push_back(common::Value::make_int64(v)); } - } catch (...) { - values.push_back(common::Value::make_null()); + cursor += 8; + } else { + if (cursor + 4 > data_len) break; + uint32_t len; + std::memcpy(&len, data + cursor, 4); + cursor += 4; + if (cursor + len > data_len) break; + std::string s(reinterpret_cast(data + cursor), len); + cursor += len; + values.push_back(common::Value::make_text(s)); } } out_meta.tuple = executor::Tuple(std::move(values)); + bpm_.unpin_page(filename_, tuple_id.page_num, false); return true; } @@ -419,22 +411,29 @@ bool HeapTable::get(const TupleId& tuple_id, executor::Tuple& out_tuple) const { uint64_t HeapTable::tuple_count() const { uint64_t count = 0; uint32_t page_num = 0; - std::array buffer{}; - while (read_page(page_num, buffer.data())) { + while (true) { + Page* page = bpm_.fetch_page(filename_, page_num); + if (!page) break; + + auto* buffer = page->get_data(); PageHeader header{}; - std::memcpy(&header, buffer.data(), sizeof(PageHeader)); + std::memcpy(&header, buffer, sizeof(PageHeader)); if (header.free_space_offset == 0) { + bpm_.unpin_page(filename_, page_num, false); break; } for (uint16_t i = 0; i < header.num_slots; ++i) { - TupleMeta meta; - if (get_meta(TupleId(page_num, i), meta)) { - if (meta.xmax == 0) { - count++; - } + uint16_t offset = 0; + std::memcpy(&offset, buffer + sizeof(PageHeader) + (i * sizeof(uint16_t)), + sizeof(uint16_t)); + if (offset != 0) { + uint64_t xmax = 0; + std::memcpy(&xmax, buffer + offset + 8, 8); + if (xmax == 0) count++; } } + bpm_.unpin_page(filename_, page_num, false); page_num++; } return count; @@ -445,15 +444,21 @@ bool HeapTable::create() { return false; } - std::array buffer{}; - std::memset(buffer.data(), 0, Page::PAGE_SIZE); + uint32_t page_num = 0; + Page* page = bpm_.new_page(filename_, &page_num); + if (!page) return false; + + auto* buffer = page->get_data(); + std::memset(buffer, 0, Page::PAGE_SIZE); PageHeader header{}; header.free_space_offset = static_cast(sizeof(PageHeader) + (DEFAULT_SLOT_COUNT * sizeof(uint16_t))); header.num_slots = 0; - std::memcpy(buffer.data(), &header, sizeof(PageHeader)); + std::memcpy(buffer, &header, sizeof(PageHeader)); - return write_page(0, buffer.data()); + bpm_.unpin_page(filename_, page_num, true); + last_page_id_ = 0; + return true; } bool HeapTable::drop() { @@ -463,9 +468,7 @@ bool HeapTable::drop() { bool HeapTable::read_page(uint32_t page_num, char* buffer) const { Page* page = bpm_.fetch_page(filename_, page_num); - if (!page) { - return false; - } + if (!page) return false; std::memcpy(buffer, page->get_data(), Page::PAGE_SIZE); bpm_.unpin_page(filename_, page_num, false); return true; @@ -475,9 +478,7 @@ bool HeapTable::write_page(uint32_t page_num, const char* buffer) { Page* page = bpm_.fetch_page(filename_, page_num); if (!page) { page = bpm_.new_page(filename_, &page_num); - if (!page) { - return false; - } + if (!page) return false; } std::memcpy(page->get_data(), buffer, Page::PAGE_SIZE); bpm_.unpin_page(filename_, page_num, true); diff --git a/src/storage/lru_replacer.cpp b/src/storage/lru_replacer.cpp index c10d8de..9c8c7b0 100644 --- a/src/storage/lru_replacer.cpp +++ b/src/storage/lru_replacer.cpp @@ -1,6 +1,6 @@ /** * @file lru_replacer.cpp - * @brief Least Recently Used (LRU) tracking implementation + * @brief Least Recently Used (LRU) tracking implementation using CLOCK algorithm */ #include "storage/lru_replacer.hpp" @@ -11,49 +11,69 @@ namespace cloudsql::storage { -LRUReplacer::LRUReplacer(size_t num_pages) : capacity_(num_pages) {} +LRUReplacer::LRUReplacer(size_t num_pages) + : capacity_(num_pages), + in_replacer_(num_pages, false), + referenced_(num_pages, false), + clock_hand_(0), + current_size_(0) {} bool LRUReplacer::victim(uint32_t* frame_id) { const std::scoped_lock lock(latch_); - if (lru_list_.empty()) { + if (current_size_ == 0) { return false; } - *frame_id = lru_list_.back(); - lru_list_.pop_back(); - static_cast(lru_map_.erase(*frame_id)); - return true; + while (true) { + if (in_replacer_[clock_hand_]) { + if (referenced_[clock_hand_]) { + referenced_[clock_hand_] = false; + } else { + // Found a victim + in_replacer_[clock_hand_] = false; + *frame_id = static_cast(clock_hand_); + current_size_--; + + // Move hand forward before returning + clock_hand_ = (clock_hand_ + 1) % capacity_; + return true; + } + } + clock_hand_ = (clock_hand_ + 1) % capacity_; + } } void LRUReplacer::pin(uint32_t frame_id) { const std::scoped_lock lock(latch_); - const auto it = lru_map_.find(frame_id); - if (it != lru_map_.end()) { - static_cast(lru_list_.erase(it->second)); - static_cast(lru_map_.erase(it)); + if (frame_id >= capacity_) { + return; + } + + if (in_replacer_[frame_id]) { + in_replacer_[frame_id] = false; + current_size_--; } } void LRUReplacer::unpin(uint32_t frame_id) { const std::scoped_lock lock(latch_); - if (lru_map_.count(frame_id) != 0) { + if (frame_id >= capacity_) { return; } - if (lru_list_.size() >= capacity_) { - return; + if (!in_replacer_[frame_id]) { + in_replacer_[frame_id] = true; + referenced_[frame_id] = true; + current_size_++; } - - lru_list_.push_front(frame_id); - lru_map_[frame_id] = lru_list_.begin(); } size_t LRUReplacer::size() const { const std::scoped_lock lock(latch_); - return lru_list_.size(); + return current_size_; } } // namespace cloudsql::storage diff --git a/tests/buffer_pool_tests.cpp b/tests/buffer_pool_tests.cpp index 3462e61..b394cef 100644 --- a/tests/buffer_pool_tests.cpp +++ b/tests/buffer_pool_tests.cpp @@ -26,33 +26,36 @@ TEST(BufferPoolTests, LRUReplacerBasic) { LRUReplacer replacer(3); uint32_t victim_frame = 0; + replacer.unpin(0); replacer.unpin(1); replacer.unpin(2); - replacer.unpin(3); EXPECT_EQ(replacer.size(), 3U); + // In CLOCK, unpin(0,1,2) sets ref bits. + // victim() will sweep 0,1,2, clearing ref bits, then pick 0. EXPECT_TRUE(replacer.victim(&victim_frame)); - EXPECT_EQ(victim_frame, 1U); + EXPECT_EQ(victim_frame, 0U); EXPECT_EQ(replacer.size(), 2U); - replacer.unpin(4); + // Add 0 back + replacer.unpin(0); EXPECT_EQ(replacer.size(), 3U); EXPECT_TRUE(replacer.victim(&victim_frame)); - EXPECT_EQ(victim_frame, 2U); + EXPECT_EQ(victim_frame, 1U); EXPECT_EQ(replacer.size(), 2U); - replacer.pin(3); + replacer.pin(2); EXPECT_EQ(replacer.size(), 1U); - replacer.unpin(3); + replacer.unpin(2); EXPECT_EQ(replacer.size(), 2U); EXPECT_TRUE(replacer.victim(&victim_frame)); - EXPECT_EQ(victim_frame, 4U); + EXPECT_EQ(victim_frame, 2U); EXPECT_TRUE(replacer.victim(&victim_frame)); - EXPECT_EQ(victim_frame, 3U); + EXPECT_EQ(victim_frame, 0U); EXPECT_EQ(replacer.size(), 0U); EXPECT_FALSE(replacer.victim(&victim_frame));