Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 22 additions & 53 deletions fastpool/src/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ use mea::semaphore::Semaphore;
use crate::ManageObject;
use crate::ObjectStatus;
use crate::QueueStrategy;
use crate::RetainResult;
use crate::mutex::Mutex;
use crate::retain_spec;

/// The configuration of [`Pool`].
#[derive(Clone, Copy, Debug)]
Expand Down Expand Up @@ -117,16 +119,6 @@ impl PoolConfig {
}
}

/// The result returned by [`Pool::retain`].
#[derive(Debug)]
#[non_exhaustive]
pub struct RetainResult<T> {
/// The number of retained objects.
pub retained: usize,
/// The objects removed from the pool.
pub removed: Vec<T>,
}

/// The current pool status.
///
/// See [`Pool::status`].
Expand Down Expand Up @@ -288,51 +280,12 @@ impl<M: ManageObject> Pool<M> {
/// ```
pub fn retain(
&self,
mut f: impl FnMut(&mut M::Object, ObjectStatus) -> bool,
f: impl FnMut(&mut M::Object, ObjectStatus) -> bool,
) -> RetainResult<M::Object> {
let mut slots = self.slots.lock();

let len = slots.deque.len();
let mut idx = 0;
let mut cur = 0;

// Stage 1: All values are retained.
while cur < len {
let state = &mut slots.deque[cur];
if !f(&mut state.o, state.status) {
cur += 1;
break;
}
cur += 1;
idx += 1;
}

// Stage 2: Swap retained value into current idx.
while cur < len {
let state = &mut slots.deque[cur];
if !f(&mut state.o, state.status) {
cur += 1;
continue;
}

slots.deque.swap(idx, cur);
cur += 1;
idx += 1;
}

// Stage 3: Truncate all values after idx.
let removed = if cur != idx {
let removed = slots.deque.split_off(idx);
slots.current_size -= removed.len();
removed.into_iter().map(|state| state.o).collect()
} else {
Vec::new()
};

RetainResult {
retained: idx,
removed,
}
let result = retain_spec::do_vec_deque_retain(&mut slots.deque, f);
slots.current_size -= result.removed.len();
result
}

/// Returns the current status of the pool.
Expand Down Expand Up @@ -525,3 +478,19 @@ struct ObjectState<T> {
o: T,
status: ObjectStatus,
}

impl<T> retain_spec::SealedState for ObjectState<T> {
type Object = T;

fn status(&self) -> ObjectStatus {
self.status
}

fn mut_object(&mut self) -> &mut Self::Object {
&mut self.o
}

fn take_object(self) -> Self::Object {
self.o
}
}
2 changes: 2 additions & 0 deletions fastpool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,11 @@
pub use common::ManageObject;
pub use common::ObjectStatus;
pub use common::QueueStrategy;
pub use retain_spec::RetainResult;

mod common;
mod mutex;
mod retain_spec;

pub mod bounded;
pub mod unbounded;
83 changes: 83 additions & 0 deletions fastpool/src/retain_spec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2025 FastLabs Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::VecDeque;

use crate::ObjectStatus;

/// The result returned by `Pool::retain`.
#[derive(Debug)]
#[non_exhaustive]
pub struct RetainResult<T> {
/// The number of retained objects.
pub retained: usize,
/// The objects removed from the pool.
pub removed: Vec<T>,
}

pub(crate) trait SealedState {
type Object;

fn status(&self) -> ObjectStatus;
fn mut_object(&mut self) -> &mut Self::Object;
fn take_object(self) -> Self::Object;
}

pub(crate) fn do_vec_deque_retain<T, State: SealedState<Object = T>>(
deque: &mut VecDeque<State>,
mut f: impl FnMut(&mut T, ObjectStatus) -> bool,
) -> RetainResult<T> {
let len = deque.len();
let mut idx = 0;
let mut cur = 0;

// Stage 1: All values are retained.
while cur < len {
let state = &mut deque[cur];
let status = state.status();
if !f(state.mut_object(), status) {
cur += 1;
break;
}
cur += 1;
idx += 1;
}

// Stage 2: Swap retained value into current idx.
while cur < len {
let state = &mut deque[cur];
let status = state.status();
if !f(state.mut_object(), status) {
cur += 1;
continue;
}

deque.swap(idx, cur);
cur += 1;
idx += 1;
}

// Stage 3: Truncate all values after idx.
let removed = if cur != idx {
let removed = deque.split_off(idx);
removed.into_iter().map(State::take_object).collect()
} else {
Vec::new()
};

RetainResult {
retained: idx,
removed,
}
}
52 changes: 52 additions & 0 deletions fastpool/src/unbounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,9 @@ use std::sync::Weak;
use crate::ManageObject;
use crate::ObjectStatus;
use crate::QueueStrategy;
use crate::RetainResult;
use crate::mutex::Mutex;
use crate::retain_spec;

/// The configuration of [`Pool`].
#[derive(Clone, Copy, Debug)]
Expand Down Expand Up @@ -388,6 +390,40 @@ impl<T, M: ManageObject<Object = T>> Pool<T, M> {
}
}

/// Retains only the objects that pass the given predicate.
///
/// This function blocks the entire pool. Therefore, the given function should not block.
///
/// The following example starts a background task that runs every 30 seconds and removes
/// objects from the pool that have not been used for more than one minute. The task will
/// terminate if the pool is dropped.
///
/// ```rust,ignore
/// let interval = Duration::from_secs(30);
/// let max_age = Duration::from_secs(60);
///
/// let weak_pool = Arc::downgrade(&pool);
/// tokio::spawn(async move {
/// loop {
/// tokio::time::sleep(interval).await;
/// if let Some(pool) = weak_pool.upgrade() {
/// pool.retain(|_, status| status.last_used().elapsed() < max_age);
/// } else {
/// break;
/// }
/// }
/// });
/// ```
pub fn retain(
&self,
f: impl FnMut(&mut M::Object, ObjectStatus) -> bool,
) -> RetainResult<M::Object> {
let mut slots = self.slots.lock();
let result = retain_spec::do_vec_deque_retain(&mut slots.deque, f);
slots.current_size -= result.removed.len();
result
}

/// Returns the current status of the pool.
///
/// The status returned by the pool is not guaranteed to be consistent.
Expand Down Expand Up @@ -537,3 +573,19 @@ struct ObjectState<T> {
o: T,
status: ObjectStatus,
}

impl<T> retain_spec::SealedState for ObjectState<T> {
type Object = T;

fn status(&self) -> ObjectStatus {
self.status
}

fn mut_object(&mut self) -> &mut Self::Object {
&mut self.o
}

fn take_object(self) -> Self::Object {
self.o
}
}