-
Notifications
You must be signed in to change notification settings - Fork 3
Reliable clients #199
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
kalabukdima
wants to merge
5
commits into
main
Choose a base branch
from
kalabukdima/reliable-clients
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Reliable clients #199
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
52612c4
Rename ongoing_queries to ongoing_lookups
kalabukdima 3d9d9b1
Don't wait for Kad lookups before connecting to the peer
kalabukdima b8f3120
Expose the number of active connections for the readiness check
kalabukdima b54dfb4
Bump minor version
kalabukdima 724e5db
Don't run kademlia lookup for already found peers
kalabukdima File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,6 @@ | ||
| use std::{ | ||
| collections::{HashMap, HashSet, VecDeque}, | ||
| num::NonZeroUsize, | ||
| sync::Arc, | ||
| task::{Context, Poll}, | ||
| time::Duration, | ||
| vec, | ||
|
|
@@ -16,13 +15,12 @@ use libp2p::{ | |
| } | ||
| }; | ||
| use libp2p_swarm_derive::NetworkBehaviour; | ||
| use parking_lot::RwLock; | ||
| use serde::{Deserialize, Serialize}; | ||
|
|
||
| use sqd_contract_client::{Client as ContractClient, NetworkNodes}; | ||
|
|
||
| #[cfg(feature = "metrics")] | ||
| use crate::metrics::{ACTIVE_CONNECTIONS, ONGOING_QUERIES}; | ||
| use crate::metrics::{ACTIVE_CONNECTIONS, ONGOING_LOOKUPS}; | ||
| use crate::{ | ||
| behaviour::{ | ||
| addr_cache::AddressCache, | ||
|
|
@@ -39,7 +37,7 @@ use crate::{ | |
| #[cfg(feature = "pubsub")] | ||
| use crate::{protocol::MAX_PUBSUB_MSG_SIZE, PubsubBehaviour, PubsubMsg}; | ||
|
|
||
| use super::stream_client::{self, ClientBehaviour, ClientConfig, StreamClientHandle}; | ||
| use super::stream_client::{ClientBehaviour, ClientConfig, StreamClientHandle}; | ||
|
|
||
| #[derive(NetworkBehaviour)] | ||
| pub struct InnerBehaviour { | ||
|
|
@@ -70,6 +68,8 @@ pub struct BaseConfig { | |
| pub max_pubsub_msg_size: usize, | ||
| /// Maximum number of peers to keep in the address cache (default: 1024) | ||
| pub addr_cache_size: NonZeroUsize, | ||
| /// Maximum number of concurrent Kademlia lookups for worker discovery (default: 20) | ||
| pub max_concurrent_lookups: usize, | ||
| } | ||
|
|
||
| impl BaseConfig { | ||
|
|
@@ -83,6 +83,7 @@ impl BaseConfig { | |
| let max_pubsub_msg_size = parse_env_var("MAX_PUBSUB_MSG_SIZE", MAX_PUBSUB_MSG_SIZE); | ||
| let addr_cache_size = NonZeroUsize::new(parse_env_var("ADDR_CACHE_SIZE", 1024)) | ||
| .expect("addr_cache_size should be > 0"); | ||
| let max_concurrent_lookups = parse_env_var("MAX_CONCURRENT_LOOKUPS", 20); | ||
| Self { | ||
| onchain_update_interval, | ||
| autonat_timeout, | ||
|
|
@@ -91,6 +92,7 @@ impl BaseConfig { | |
| #[cfg(feature = "pubsub")] | ||
| max_pubsub_msg_size, | ||
| addr_cache_size, | ||
| max_concurrent_lookups, | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -99,10 +101,13 @@ pub struct BaseBehaviour { | |
| inner: InnerBehaviour, | ||
| keypair: Keypair, | ||
| pending_events: VecDeque<TToSwarm<Self>>, | ||
| ongoing_queries: BiHashMap<PeerId, QueryId>, | ||
| ongoing_lookups: BiHashMap<PeerId, QueryId>, | ||
| outbound_conns: HashMap<PeerId, u32>, | ||
| registered_workers: Arc<RwLock<HashSet<PeerId>>>, | ||
| registered_workers: HashSet<PeerId>, | ||
| whitelist_initialized: bool, | ||
| maintain_worker_connections: bool, | ||
| pending_lookups: VecDeque<PeerId>, | ||
| max_concurrent_lookups: usize, | ||
| } | ||
|
|
||
| #[allow(dead_code)] | ||
|
|
@@ -171,10 +176,13 @@ impl BaseBehaviour { | |
| inner, | ||
| keypair: keypair.clone(), | ||
| pending_events: Default::default(), | ||
| ongoing_queries: Default::default(), | ||
| ongoing_lookups: Default::default(), | ||
| outbound_conns: Default::default(), | ||
| registered_workers: Arc::new(RwLock::new(Default::default())), | ||
| registered_workers: Default::default(), | ||
| whitelist_initialized: false, | ||
| maintain_worker_connections: false, | ||
| pending_lookups: Default::default(), | ||
| max_concurrent_lookups: config.max_concurrent_lookups, | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -187,8 +195,9 @@ impl BaseBehaviour { | |
| self.inner.kademlia.set_mode(Some(kad::Mode::Server)); | ||
| } | ||
|
|
||
| pub fn keep_all_connections_alive(&mut self) { | ||
| pub fn maintain_worker_connections(&mut self) { | ||
| self.inner.keep_alive.keep_all_connections_alive(); | ||
| self.maintain_worker_connections = true; | ||
| } | ||
|
|
||
| pub fn request_handle( | ||
|
|
@@ -204,25 +213,25 @@ impl BaseBehaviour { | |
| } | ||
|
|
||
| pub fn find_and_dial(&mut self, peer_id: PeerId) { | ||
| if self.ongoing_queries.contains_left(&peer_id) { | ||
| if self.inner.address_cache.contains(&peer_id) { | ||
| log::debug!("Dialing peer {peer_id} using cached address"); | ||
| self.pending_events | ||
| .push_back(ToSwarm::Dial { opts: peer_id.into() }); | ||
| } else if self.ongoing_lookups.contains_left(&peer_id) { | ||
| log::debug!("Query for peer {peer_id} already ongoing"); | ||
| } else { | ||
| log::debug!("Starting query for peer {peer_id}"); | ||
| let query_id = self.inner.kademlia.get_closest_peers(peer_id); | ||
| self.ongoing_queries.insert(peer_id, query_id); | ||
| self.ongoing_lookups.insert(peer_id, query_id); | ||
| #[cfg(feature = "metrics")] | ||
| ONGOING_QUERIES.inc(); | ||
| ONGOING_LOOKUPS.inc(); | ||
| } | ||
| } | ||
|
|
||
| pub fn outbound_conn_exists(&self, peer_id: &PeerId) -> bool { | ||
| self.outbound_conns.get(peer_id).is_some_and(|x| *x > 0) | ||
| } | ||
|
|
||
| pub fn can_be_dialed(&self, peer_id: &PeerId) -> bool { | ||
| self.outbound_conn_exists(peer_id) || self.inner.address_cache.contains(peer_id) | ||
| } | ||
|
|
||
| pub fn allow_peer(&mut self, peer_id: PeerId) { | ||
| self.inner.whitelist.allow_peer(peer_id); | ||
| } | ||
|
|
@@ -270,7 +279,6 @@ impl BehaviourWrapper for BaseBehaviour { | |
| None | ||
| } | ||
| InnerBehaviourEvent::Whitelist(nodes) => self.on_nodes_update(nodes), | ||
| InnerBehaviourEvent::Stream(ev) => self.on_stream_event(ev), | ||
| _ => None, | ||
| } | ||
| } | ||
|
|
@@ -280,6 +288,15 @@ impl BehaviourWrapper for BaseBehaviour { | |
| return Poll::Ready(Some(ev)); | ||
| } | ||
|
|
||
| while self.ongoing_lookups.len() < self.max_concurrent_lookups { | ||
| let Some(peer_id) = self.pending_lookups.pop_front() else { | ||
| break; | ||
| }; | ||
| if !self.outbound_conn_exists(&peer_id) { | ||
| self.find_and_dial(peer_id); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Review comment: a comment noting that `find_and_dial` also manages `ongoing_lookups` would fit here. |
||
| } | ||
| } | ||
|
|
||
| Poll::Pending | ||
| } | ||
| } | ||
|
|
@@ -309,6 +326,13 @@ impl BaseBehaviour { | |
| Some(x) => *x -= 1, | ||
| None => log::error!("Closed connection not established before"), | ||
| } | ||
| if self.maintain_worker_connections | ||
| && !self.outbound_conn_exists(&peer_id) | ||
| && self.registered_workers.contains(&peer_id) | ||
| { | ||
| log::debug!("Worker {peer_id} disconnected, scheduling reconnection"); | ||
| self.pending_lookups.push_back(peer_id); | ||
| } | ||
| None | ||
| } | ||
|
|
||
|
|
@@ -371,7 +395,7 @@ impl BaseBehaviour { | |
| return None; | ||
| }; | ||
|
|
||
| let peer_id = self.ongoing_queries.get_by_right(&query_id)?.to_owned(); | ||
| let peer_id = self.ongoing_lookups.get_by_right(&query_id)?.to_owned(); | ||
| let peer_info = match result { | ||
| Ok(GetClosestPeersOk { peers, .. }) | ||
| | Err(GetClosestPeersError::Timeout { peers, .. }) => { | ||
|
|
@@ -383,9 +407,9 @@ impl BaseBehaviour { | |
| // Query finished | ||
| if query_finished { | ||
| log::debug!("Query for peer {peer_id} finished."); | ||
| self.ongoing_queries.remove_by_right(&query_id); | ||
| self.ongoing_lookups.remove_by_right(&query_id); | ||
| #[cfg(feature = "metrics")] | ||
| ONGOING_QUERIES.dec(); | ||
| ONGOING_LOOKUPS.dec(); | ||
| } | ||
|
|
||
| if let Some(peer_info) = peer_info { | ||
|
|
@@ -425,24 +449,9 @@ impl BaseBehaviour { | |
| None | ||
| } | ||
|
|
||
| // TODO: consider capturing all dial requests, not only from the stream behaviour | ||
| fn on_stream_event(&mut self, ev: stream_client::Event) -> Option<TToSwarm<Self>> { | ||
| // When trying to dial an unknown peer, try to find it on DHT first | ||
| let stream_client::Event::Dial(opts) = ev; | ||
| match opts.get_peer_id() { | ||
| Some(peer_id) if !self.can_be_dialed(&peer_id) => { | ||
| // The ConnectionId will not correspond to the requested one, | ||
| // but it's not used by the stream behaviour anyway | ||
| self.find_and_dial(peer_id); | ||
| None | ||
| } | ||
| _ => Some(ToSwarm::Dial { opts }), | ||
| } | ||
| } | ||
|
|
||
| fn on_nodes_update(&mut self, nodes: NetworkNodes) -> Option<TToSwarm<Self>> { | ||
| log::debug!("Updating registered workers"); | ||
| *self.registered_workers.write() = nodes.workers; | ||
| self.registered_workers = nodes.workers; | ||
|
|
||
| if !self.whitelist_initialized { | ||
| self.whitelist_initialized = true; | ||
|
|
@@ -458,6 +467,14 @@ impl BaseBehaviour { | |
| } | ||
| } | ||
|
|
||
| if self.maintain_worker_connections { | ||
| for peer_id in &self.registered_workers { | ||
| if !self.outbound_conn_exists(peer_id) { | ||
| self.pending_lookups.push_back(*peer_id); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| None | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's probably better to move the condition into the `while` to make the code more understandable.