@klaatu01 klaatu01 commented Oct 25, 2025

As mentioned in #17, your preferred strategy is to use a Stream-based approach so that we don’t need to hold read locks on all shards simultaneously. This PR is a best-effort implementation of that approach.

stream_owned streams cloned (K, V) items by acquiring a read lock on one shard at a time, cloning its contents, and then releasing the lock before yielding. This allows concurrent writes to other shards to continue while iteration proceeds.
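To make the locking pattern concrete, here is a minimal synchronous sketch of the same idea using only the standard library. `ToyShardMap`, `shard_index`, and `iter_owned` are hypothetical names invented for illustration; this is not whirlwind's implementation or API, and it uses a blocking `std::sync::RwLock` and an `Iterator` in place of async locks and a `Stream`.

```rust
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::RwLock;

/// Hypothetical stand-in for a sharded map (assumption: not whirlwind's type).
pub struct ToyShardMap<K, V> {
    shards: Vec<RwLock<HashMap<K, V>>>,
}

impl<K: Hash + Eq + Clone, V: Clone> ToyShardMap<K, V> {
    pub fn new(shard_count: usize) -> Self {
        assert!(shard_count > 0, "need at least one shard");
        Self {
            shards: (0..shard_count).map(|_| RwLock::new(HashMap::new())).collect(),
        }
    }

    /// Picks a shard by hashing the key (illustrative only).
    fn shard_index(&self, key: &K) -> usize {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::Hasher;
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        (hasher.finish() as usize) % self.shards.len()
    }

    /// Takes a write lock on a single shard only.
    pub fn insert(&self, key: K, value: V) -> Option<V> {
        let idx = self.shard_index(&key);
        self.shards[idx].write().unwrap().insert(key, value)
    }

    /// Lazily yields owned pairs. At most one shard is read-locked at a time,
    /// and only while that shard's entries are being cloned.
    pub fn iter_owned(&self) -> impl Iterator<Item = (K, V)> + '_ {
        self.shards.iter().flat_map(|shard| {
            let guard = shard.read().unwrap();
            let snapshot: Vec<(K, V)> =
                guard.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
            drop(guard); // release the lock before any item is yielded
            snapshot
        })
    }
}
```

Because the guard is dropped before the snapshot is handed to the caller, a writer can insert into any shard, including the one just read, while iteration is paused between items. The trade-off is that the result is not a consistent snapshot of the whole map.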

stream_shards is a best-effort attempt to provide a borrowed stream of entries (&K, &V) without cloning. Achieving fully borrowed iteration across asynchronous boundaries would require deeper changes to whirlwind internals (for example, storing shard data in Arc or using a custom pinned stream type), which could impact performance and complexity. For now, this implementation yields ShardRead guards one shard at a time, so by-reference iteration is possible per-shard, but not across shards concurrently.
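As a rough illustration of per-shard borrowed iteration, the sketch below yields one read guard per shard and reads entries through it without cloning. It is a synchronous, standard-library analogue under stated assumptions: `ToyShards`, `from_maps`, `iter_shards`, and `total_value_len` are hypothetical names, and `RwLockReadGuard` merely stands in for the ShardRead guard mentioned above.

```rust
use std::collections::HashMap;
use std::sync::{RwLock, RwLockReadGuard};

/// Hypothetical list of independently locked shards (not whirlwind's type).
pub struct ToyShards<K, V> {
    shards: Vec<RwLock<HashMap<K, V>>>,
}

impl<K, V> ToyShards<K, V> {
    /// Builds the structure from pre-filled shard maps (illustrative helper).
    pub fn from_maps(maps: Vec<HashMap<K, V>>) -> Self {
        Self {
            shards: maps.into_iter().map(RwLock::new).collect(),
        }
    }

    /// Yields one read guard per shard. Each lock is taken only when the
    /// iterator advances to that shard.
    pub fn iter_shards(
        &self,
    ) -> impl Iterator<Item = RwLockReadGuard<'_, HashMap<K, V>>> + '_ {
        self.shards.iter().map(|shard| shard.read().unwrap())
    }
}

/// Sums the lengths of all values by reference; nothing is cloned.
pub fn total_value_len(map: &ToyShards<u32, String>) -> usize {
    let mut total = 0;
    for guard in map.iter_shards() {
        // The `&K` / `&V` borrows are tied to `guard`, so they cannot
        // outlive the current shard's lock.
        total += guard.values().map(|v| v.len()).sum::<usize>();
        // `guard` is dropped here, releasing this shard before the next.
    }
    total
}
```

The borrow checker enforces the limitation described above: references obtained from one guard cannot be carried over to the next shard, which is why a flattened stream of `(&K, &V)` across shards would need structural changes.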

This PR also implements keys, values, and entries for maps whose K and V implement Clone, again without locking all shards at once.

In summary:

  • Avoids locking all shards at once
  • Supports concurrent writes during iteration
  • stream_owned still clones keys and values to keep lifetimes sound
  • Borrowed streaming remains limited to one shard at a time without structural changes

I have put all of this behind a stream Cargo feature, since it does not seem to be in high demand.

@klaatu01 klaatu01 marked this pull request as ready for review October 25, 2025 15:14
@klaatu01 klaatu01 changed the title from "Implement stream iterator" to "feat: Implement stream iterator" Oct 25, 2025
Comment on lines +509 to +558
#[cfg(feature = "stream")]
/// Flattened stream of **owned** `(K, V)` items.
///
/// Locks one shard at a time, snapshots (clones) its entries into a `Vec`, drops the lock,
/// then yields items. This allows concurrent writes to other shards.
///
/// # Example
/// ```
/// use tokio::runtime::Runtime;
/// use futures::{pin_mut, StreamExt};
/// use std::sync::Arc;
/// use whirlwind::ShardMap;
///
/// let rt = Runtime::new().unwrap();
/// let map = Arc::new(ShardMap::new());
/// rt.block_on(async {
///     map.insert(1, "a".to_string()).await;
///     map.insert(2, "b".to_string()).await;
///
///     let s = map.stream_owned();
///     pin_mut!(s);
///
///     let mut items = Vec::new();
///     while let Some((k, v)) = s.next().await {
///         items.push((k, v));
///     }
///     items.sort_by_key(|(k, _)| *k);
///     assert_eq!(items, vec![(1, "a".into()), (2, "b".into())]);
/// });
/// ```
pub fn stream_owned(&self) -> impl Stream<Item = (K, V)> + '_
where
    K: Clone,
    V: Clone,
{
    let shard_stream = self.stream_shards();

    stream! {
        pin_mut!(shard_stream);

        while let Some(shard) = shard_stream.next().await {
            let items: Vec<(K, V)> =
                shard.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
            drop(shard);
            for item in items {
                yield item;
            }
        }
    }
}

@klaatu01 klaatu01 Oct 27, 2025

I might actually remove stream_owned. It’s not particularly ergonomic or idiomatic in practice, mainly because callers need pin_mut! just to iterate. I think it would take some rejigging of the internals to get this working without pin_mut!, and even then it would probably need to return a boxed stream (for example, a BoxStream).

Now that we have entries (and keys / values) implemented without locking all shards, stream_owned feels a bit redundant and outside the core responsibility of Whirlwind.

If someone really needs to iterate efficiently, they can always use stream_shards.

In that case I think we could also remove the stream feature.

@willothy what do you think?
