feat: Implement stream iterator #18
#[cfg(feature = "stream")]
/// Flattened stream of **owned** `(K, V)` items.
///
/// Locks one shard at a time, snapshots (clones) its entries into a `Vec`, drops the lock,
/// then yields items. This allows concurrent writes to other shards.
///
/// # Example
/// ```
/// use tokio::runtime::Runtime;
/// use futures::{pin_mut, StreamExt};
/// use std::sync::Arc;
/// use whirlwind::ShardMap;
///
/// let rt = Runtime::new().unwrap();
/// let map = Arc::new(ShardMap::new());
/// rt.block_on(async {
///     map.insert(1, "a".to_string()).await;
///     map.insert(2, "b".to_string()).await;
///
///     let s = map.stream_owned();
///     pin_mut!(s);
///
///     let mut items = Vec::new();
///     while let Some((k, v)) = s.next().await {
///         items.push((k, v));
///     }
///     items.sort_by_key(|(k, _)| *k);
///     assert_eq!(items, vec![(1, "a".into()), (2, "b".into())]);
/// });
/// ```
pub fn stream_owned(&self) -> impl Stream<Item = (K, V)> + '_
where
    K: Clone,
    V: Clone,
{
    let shard_stream = self.stream_shards();

    stream! {
        pin_mut!(shard_stream);

        while let Some(shard) = shard_stream.next().await {
            let items: Vec<(K, V)> =
                shard.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
            drop(shard);
            for item in items {
                yield item;
            }
        }
    }
}
I might actually remove stream_owned. It’s not particularly ergonomic or idiomatic in practice, mainly because it requires pin_mut! just to iterate. I think it would take some rejigging of the internals to get this working without pin_mut!, and even then would probably need to return a BoxedStream.
Now that we have entries (and keys / values) implemented without locking all shards, stream_owned feels a bit redundant and outside the core responsibility of Whirlwind.
If someone really needs to iterate efficiently, they can always use stream_shards.
In that case I think we could also remove the stream feature.
@willothy what do you think?
As mentioned in #17, your preferred strategy is to use a Stream-based approach so that we don’t need to hold read locks on all shards simultaneously. This PR is a best-effort implementation of that approach.
In summary:
- stream_owned streams cloned (K, V) items by acquiring a read lock on one shard at a time, cloning its contents, and then releasing the lock before yielding. This allows concurrent writes to other shards to continue while iteration proceeds.
- stream_shards is a best-effort attempt to provide a borrowed stream of entries (&K, &V) without cloning. Achieving fully borrowed iteration across asynchronous boundaries would require deeper changes to whirlwind internals (for example, storing shard data in Arc or using a custom pinned stream type), which could impact performance and complexity. For now, this implementation yields ShardRead guards one shard at a time, so by-reference iteration is possible per-shard, but not across shards concurrently.
- Implements keys, values and entries for K and V that implement Clone.
I have put all of this behind a stream feature as it doesn't seem to be an extremely in-demand feature.
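For readers who want to see the locking pattern in isolation, here is a minimal synchronous sketch of the same idea using only the standard library. It is an analogy, not whirlwind's implementation: `SketchShardMap`, `read_shards`, and `iter_owned` are hypothetical names, and `std::sync::RwLock` stands in for whirlwind's async shard lock. `read_shards` mirrors the per-shard guard approach of stream_shards, and `iter_owned` mirrors the clone-then-release approach of stream_owned.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::{RwLock, RwLockReadGuard};

/// Hypothetical stand-in for a sharded map; not whirlwind's actual type.
pub struct SketchShardMap<K, V> {
    shards: Vec<RwLock<HashMap<K, V>>>,
}

impl<K: Hash + Eq, V> SketchShardMap<K, V> {
    pub fn new(shard_count: usize) -> Self {
        assert!(shard_count > 0, "need at least one shard");
        let shards = (0..shard_count)
            .map(|_| RwLock::new(HashMap::new()))
            .collect();
        Self { shards }
    }

    /// Picks a shard by hashing the key.
    fn shard_index(&self, key: &K) -> usize {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        (hasher.finish() as usize) % self.shards.len()
    }

    pub fn insert(&self, key: K, value: V) -> Option<V> {
        let idx = self.shard_index(&key);
        self.shards[idx].write().unwrap().insert(key, value)
    }

    /// By-reference access: lazily yields one read guard per shard.
    /// A shard is only locked once the iterator reaches it.
    pub fn read_shards(
        &self,
    ) -> impl Iterator<Item = RwLockReadGuard<'_, HashMap<K, V>>> + '_ {
        self.shards.iter().map(|shard| shard.read().unwrap())
    }

    /// Owned iteration: clone one shard's entries, release its lock,
    /// and only then yield the cloned items.
    pub fn iter_owned(&self) -> impl Iterator<Item = (K, V)> + '_
    where
        K: Clone,
        V: Clone,
    {
        self.read_shards().flat_map(|guard| {
            let items: Vec<(K, V)> =
                guard.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
            drop(guard); // lock released before any item is yielded
            items
        })
    }
}
```

Because no guard is alive while an item is being handed to the caller, the caller can write to the map between calls to `next()` without deadlocking. The cost is one clone per entry and a snapshot that may be stale by the time it is consumed, which is the same trade-off described for stream_owned above.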