-
Notifications
You must be signed in to change notification settings - Fork 488
[enhance] buffer storage #83
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
iaojnh
wants to merge
36
commits into
main
Choose a base branch
from
feat/buffer_storage_vec
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
36 commits
Select commit
Hold shift + click to select a range
b307c97
add buffer pool & spsc_queue
iaojnh a96e684
add buffer pool & open buffer storage ut
iaojnh 03e4dbc
modify buffer pool
iaojnh 7df2716
upd buffer pool
iaojnh 2c610cf
Merge branch 'main' into feat/buffer_storage_vec
iaojnh 11a0e47
clang format
iaojnh 55e6f1b
clang format
iaojnh b24d921
clang format
iaojnh 8916f90
clang format
iaojnh e3d014c
fix bugs
iaojnh 4694835
Merge branch 'main' into feat/buffer_storage_vec
iaojnh ed6a3f2
fix complie“
iaojnh 95b1c16
clang format
iaojnh d6db41d
fix ut
iaojnh 8dd8e48
fix: combined indexer should use key instead of index (#87)
chinaux 753cc0d
feat: support ai extension (#88)
Cuiyus b83cf52
fix(py): py with build-in package typing not typing_extensions (#99)
Cuiyus c79f0b0
minor: add installation instruction for node.js package (#103)
zhourrr 42fa524
feat(ci): macos ci with github-runner (#94)
Cuiyus 34e7ced
minor: add links to package repository
zhourrr a4f3de8
chore: add trend badge (#132)
feihongxu0824 d72a074
docs: fix repository URL in CONTRIBUTING.md (#139)
Junio243 39f0437
fix(docs): fix typo in README align attr and Python version in CONTRI…
cluster2600 e956192
ci: continuous benching (#110)
JalinWang 779c63d
docs: adjust join us in the readme. (#168)
Cuiyus a7c6aa1
chore: enable the conventional-pre-commit run sucess and update to la…
SYaoJun fc988e3
Upgrade GitHub Actions for Node 24 compatibility (#129)
salmanmkc 49e2d34
Upgrade GitHub Actions to latest versions (#130)
salmanmkc 1dfeda6
feat(ci): ci workflow with github-hosted runner (#171)
Cuiyus 1fef6e6
feat: add jina embeddings v5 support (#156)
hanxiao 573f20f
Merge branch 'main' into feat/buffer_storage_vec
iaojnh 98918f8
Merge branch 'main' into feat/buffer_storage_vec
iaojnh a472bc8
Merge branch 'main' into feat/buffer_storage_vec
iaojnh 09f7f45
fix buffer storage
iaojnh 2e42b6b
fix memory leak
iaojnh 173fc58
fix memory leak
iaojnh File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,243 @@ | ||
| #include <zvec/ailego/buffer/buffer_pool.h> | ||
| #include <zvec/core/framework/index_logger.h> | ||
|
|
||
| namespace zvec { | ||
| namespace ailego { | ||
|
|
||
| int LRUCache::init(size_t block_size) { | ||
| block_size_ = block_size; | ||
| for (size_t i = 0; i < CATCH_QUEUE_NUM; i++) { | ||
| queues_.push_back(ConcurrentQueue(block_size)); | ||
| } | ||
| return 0; | ||
| } | ||
|
|
||
| bool LRUCache::evict_single_block(BlockType &item) { | ||
| bool found = false; | ||
| for (size_t i = 0; i < CATCH_QUEUE_NUM; i++) { | ||
| found = queues_[i].try_dequeue(item); | ||
| if (found) { | ||
| break; | ||
| } | ||
| } | ||
| return found; | ||
| } | ||
|
|
||
| bool LRUCache::add_single_block(const LPMap *lp_map, const BlockType &block, | ||
| int block_type) { | ||
| bool ok = queues_[block_type].try_enqueue(block); | ||
| evict_queue_insertions_.fetch_add(1, std::memory_order_relaxed); | ||
| if (evict_queue_insertions_ % block_size_ == 0) { | ||
| this->clear_dead_node(lp_map); | ||
| } | ||
| return ok; | ||
| } | ||
|
|
||
| void LRUCache::clear_dead_node(const LPMap *lp_map) { | ||
| for (size_t i = 0; i < CATCH_QUEUE_NUM; i++) { | ||
| size_t clear_size = block_size_ * 2; | ||
| if (queues_[i].size_approx() < clear_size * 4) { | ||
| continue; | ||
| } | ||
| size_t clear_count = 0; | ||
| ConcurrentQueue tmp(block_size_); | ||
| BlockType item; | ||
| while (queues_[i].try_dequeue(item) && (clear_count++ < clear_size)) { | ||
| if (!lp_map->isDeadBlock(item)) { | ||
| tmp.try_enqueue(item); | ||
| } | ||
| } | ||
| while (tmp.try_dequeue(item)) { | ||
| if (!lp_map->isDeadBlock(item)) { | ||
| queues_[i].try_enqueue(item); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| void LPMap::init(size_t entry_num) { | ||
| if (entries_) { | ||
| delete[] entries_; | ||
| } | ||
| entry_num_ = entry_num; | ||
| entries_ = new Entry[entry_num_]; | ||
| for (size_t i = 0; i < entry_num_; i++) { | ||
| entries_[i].ref_count.store(std::numeric_limits<int>::min()); | ||
| entries_[i].load_count.store(0); | ||
| entries_[i].buffer = nullptr; | ||
| } | ||
| cache_.init(entry_num * 4); | ||
| } | ||
|
|
||
| char *LPMap::acquire_block(block_id_t block_id) { | ||
| assert(block_id < entry_num_); | ||
| Entry &entry = entries_[block_id]; | ||
| if (entry.ref_count.load(std::memory_order_relaxed) == 0) { | ||
| entry.load_count.fetch_add(1, std::memory_order_relaxed); | ||
| } | ||
| entry.ref_count.fetch_add(1, std::memory_order_relaxed); | ||
| if (entry.ref_count.load(std::memory_order_relaxed) < 0) { | ||
| return nullptr; | ||
| } | ||
| return entry.buffer; | ||
| } | ||
|
|
||
| void LPMap::release_block(block_id_t block_id) { | ||
| assert(block_id < entry_num_); | ||
| Entry &entry = entries_[block_id]; | ||
|
|
||
| if (entry.ref_count.fetch_sub(1, std::memory_order_release) == 1) { | ||
| std::atomic_thread_fence(std::memory_order_acquire); | ||
| LRUCache::BlockType block; | ||
| block.first = block_id; | ||
| block.second = entry.load_count.load(); | ||
| cache_.add_single_block(this, block, 0); | ||
| } | ||
| } | ||
|
|
||
| char *LPMap::evict_block(block_id_t block_id) { | ||
| assert(block_id < entry_num_); | ||
| Entry &entry = entries_[block_id]; | ||
| int expected = 0; | ||
| if (entry.ref_count.compare_exchange_strong( | ||
| expected, std::numeric_limits<int>::min())) { | ||
| char *buffer = entry.buffer; | ||
| entry.buffer = nullptr; | ||
| return buffer; | ||
| } else { | ||
| return nullptr; | ||
| } | ||
| } | ||
|
|
||
| char *LPMap::set_block_acquired(block_id_t block_id, char *buffer) { | ||
| assert(block_id < entry_num_); | ||
| Entry &entry = entries_[block_id]; | ||
| if (entry.ref_count.load(std::memory_order_relaxed) >= 0) { | ||
| entry.ref_count.fetch_add(1, std::memory_order_relaxed); | ||
| return entry.buffer; | ||
| } | ||
| entry.buffer = buffer; | ||
| entry.ref_count.store(1, std::memory_order_relaxed); | ||
| entry.load_count.fetch_add(1, std::memory_order_relaxed); | ||
| return buffer; | ||
| } | ||
|
|
||
| void LPMap::recycle(moodycamel::ConcurrentQueue<char *> &free_buffers) { | ||
| LRUCache::BlockType block; | ||
| do { | ||
| bool ok = cache_.evict_single_block(block); | ||
| if (!ok) { | ||
| return; | ||
| } | ||
| } while (isDeadBlock(block)); | ||
| char *buffer = evict_block(block.first); | ||
| if (buffer) { | ||
| free_buffers.try_enqueue(buffer); | ||
| } | ||
| } | ||
|
|
||
| VecBufferPool::VecBufferPool(const std::string &filename) { | ||
| fd_ = open(filename.c_str(), O_RDONLY); | ||
| if (fd_ < 0) { | ||
| throw std::runtime_error("Failed to open file: " + filename); | ||
| } | ||
| struct stat st; | ||
| if (fstat(fd_, &st) < 0) { | ||
| throw std::runtime_error("Failed to stat file: " + filename); | ||
| } | ||
| file_size_ = st.st_size; | ||
| } | ||
|
|
||
| int VecBufferPool::init(size_t pool_capacity, size_t block_size) { | ||
| pool_capacity_ = pool_capacity; | ||
| size_t buffer_num = pool_capacity_ / block_size + 10; | ||
| size_t block_num = file_size_ / block_size + 10; | ||
| lp_map_.init(block_num); | ||
| for (size_t i = 0; i < buffer_num; i++) { | ||
| char *buffer = (char *)aligned_alloc(64, block_size); | ||
| if (buffer != nullptr) { | ||
| free_buffers_.try_enqueue(buffer); | ||
| } else { | ||
| LOG_ERROR("aligned_alloc %zu(size: %zu) failed", i, block_size); | ||
| return -1; | ||
| } | ||
|
Comment on lines
+157
to
+163
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. memory leak when |
||
| } | ||
| LOG_DEBUG("Buffer pool num: %zu, entry num: %zu", buffer_num, | ||
| lp_map_.entry_num()); | ||
| return 0; | ||
| } | ||
|
|
||
| VecBufferPoolHandle VecBufferPool::get_handle() { | ||
| return VecBufferPoolHandle(*this); | ||
| } | ||
|
|
||
| char *VecBufferPool::acquire_buffer(block_id_t block_id, size_t offset, | ||
| size_t size, int retry) { | ||
| char *buffer = lp_map_.acquire_block(block_id); | ||
| if (buffer) { | ||
| return buffer; | ||
| } | ||
| { | ||
| bool found = free_buffers_.try_dequeue(buffer); | ||
| if (!found) { | ||
| for (int i = 0; i < retry; i++) { | ||
| lp_map_.recycle(free_buffers_); | ||
| found = free_buffers_.try_dequeue(buffer); | ||
| if (found) { | ||
| break; | ||
| } | ||
| } | ||
| } | ||
| if (!found) { | ||
| LOG_ERROR("Buffer pool failed to get free buffer"); | ||
| return nullptr; | ||
| } | ||
| } | ||
|
|
||
| ssize_t read_bytes = pread(fd_, buffer, size, offset); | ||
| if (read_bytes != static_cast<ssize_t>(size)) { | ||
| LOG_ERROR("Buffer pool failed to read file at offset: %zu", offset); | ||
| free_buffers_.try_enqueue(buffer); | ||
| return nullptr; | ||
| } | ||
iaojnh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| char *placed_buffer = nullptr; | ||
| { | ||
| std::lock_guard<std::mutex> lock(mutex_); | ||
| placed_buffer = lp_map_.set_block_acquired(block_id, buffer); | ||
| } | ||
| if (placed_buffer != buffer) { | ||
| // another thread has set the block | ||
| free_buffers_.try_enqueue(buffer); | ||
| } | ||
| return placed_buffer; | ||
| } | ||
|
|
||
| int VecBufferPool::get_meta(size_t offset, size_t length, char *buffer) { | ||
| ssize_t read_bytes = pread(fd_, buffer, length, offset); | ||
| if (read_bytes != static_cast<ssize_t>(length)) { | ||
| LOG_ERROR("Buffer pool failed to read file at offset: %zu", offset); | ||
| return -1; | ||
| } | ||
| return 0; | ||
| } | ||
|
|
||
| char *VecBufferPoolHandle::get_block(size_t offset, size_t size, | ||
| size_t block_id) { | ||
| char *buffer = pool.acquire_buffer(block_id, offset, size, 5); | ||
| return buffer; | ||
| } | ||
|
|
||
| int VecBufferPoolHandle::get_meta(size_t offset, size_t length, char *buffer) { | ||
| return pool.get_meta(offset, length, buffer); | ||
| } | ||
|
|
||
| void VecBufferPoolHandle::release_one(block_id_t block_id) { | ||
| pool.lp_map_.release_block(block_id); | ||
| } | ||
|
|
||
| void VecBufferPoolHandle::acquire_one(block_id_t block_id) { | ||
| pool.lp_map_.acquire_block(block_id); | ||
| } | ||
|
|
||
| } // namespace ailego | ||
| } // namespace zvec | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
race condition: ref_count is checked at line 75, incremented at 78, then checked again at 79 - another thread could evict the block between these operations, making
`entry.buffer` invalid by line 82