Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/transport/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "sqd-network-transport"
license = "AGPL-3.0-or-later"
version = "3.0.0"
version = "3.1.0"
edition = "2021"

[dependencies]
Expand Down
2 changes: 1 addition & 1 deletion crates/transport/src/actors/logs_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ pub struct LogsCollectorBehaviour {

impl LogsCollectorBehaviour {
pub fn new(mut base: BaseBehaviour) -> Self {
base.keep_all_connections_alive();
base.maintain_worker_connections();
Self { base: base.into() }
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/transport/src/actors/pings_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub struct PingsCollectorBehaviour {

impl PingsCollectorBehaviour {
pub fn new(mut base: BaseBehaviour, _config: PingsCollectorConfig) -> Wrapped<Self> {
base.keep_all_connections_alive();
base.maintain_worker_connections();
Self { base: base.into() }.into()
}
}
Expand Down
7 changes: 6 additions & 1 deletion crates/transport/src/actors/portal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,11 @@ impl PortalTransportHandle {
}
}

#[cfg(feature = "metrics")]
pub fn active_connections(&self) -> u32 {
crate::metrics::ACTIVE_CONNECTIONS.get()
}

pub async fn send_logs(&self, logs: Vec<QueryFinished>) {
let listeners = self.log_listeners.lock().clone();
self.publish_portal_logs(logs, &listeners).await;
Expand Down Expand Up @@ -285,7 +290,7 @@ pub struct PortalBehaviour {

impl PortalBehaviour {
pub fn new(mut base: BaseBehaviour, _config: PortalConfig) -> Wrapped<Self> {
base.keep_all_connections_alive();
base.maintain_worker_connections();
Self {
base: base.into(),
provider_query: Default::default(),
Expand Down
2 changes: 1 addition & 1 deletion crates/transport/src/actors/sql_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ pub struct SQLClientBehaviour {

impl SQLClientBehaviour {
pub fn new(mut base: BaseBehaviour) -> Self {
base.keep_all_connections_alive();
base.maintain_worker_connections();
Self { base: base.into() }
}
}
Expand Down
89 changes: 53 additions & 36 deletions crates/transport/src/behaviour/base.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::{
collections::{HashMap, HashSet, VecDeque},
num::NonZeroUsize,
sync::Arc,
task::{Context, Poll},
time::Duration,
vec,
Expand All @@ -16,13 +15,12 @@ use libp2p::{
}
};
use libp2p_swarm_derive::NetworkBehaviour;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};

use sqd_contract_client::{Client as ContractClient, NetworkNodes};

#[cfg(feature = "metrics")]
use crate::metrics::{ACTIVE_CONNECTIONS, ONGOING_QUERIES};
use crate::metrics::{ACTIVE_CONNECTIONS, ONGOING_LOOKUPS};
use crate::{
behaviour::{
addr_cache::AddressCache,
Expand All @@ -39,7 +37,7 @@ use crate::{
#[cfg(feature = "pubsub")]
use crate::{protocol::MAX_PUBSUB_MSG_SIZE, PubsubBehaviour, PubsubMsg};

use super::stream_client::{self, ClientBehaviour, ClientConfig, StreamClientHandle};
use super::stream_client::{ClientBehaviour, ClientConfig, StreamClientHandle};

#[derive(NetworkBehaviour)]
pub struct InnerBehaviour {
Expand Down Expand Up @@ -70,6 +68,8 @@ pub struct BaseConfig {
pub max_pubsub_msg_size: usize,
/// Maximum number of peers to keep in the address cache (default: 1024)
pub addr_cache_size: NonZeroUsize,
/// Maximum number of concurrent Kademlia lookups for worker discovery (default: 20)
pub max_concurrent_lookups: usize,
}

impl BaseConfig {
Expand All @@ -83,6 +83,7 @@ impl BaseConfig {
let max_pubsub_msg_size = parse_env_var("MAX_PUBSUB_MSG_SIZE", MAX_PUBSUB_MSG_SIZE);
let addr_cache_size = NonZeroUsize::new(parse_env_var("ADDR_CACHE_SIZE", 1024))
.expect("addr_cache_size should be > 0");
let max_concurrent_lookups = parse_env_var("MAX_CONCURRENT_LOOKUPS", 20);
Self {
onchain_update_interval,
autonat_timeout,
Expand All @@ -91,6 +92,7 @@ impl BaseConfig {
#[cfg(feature = "pubsub")]
max_pubsub_msg_size,
addr_cache_size,
max_concurrent_lookups,
}
}
}
Expand All @@ -99,10 +101,13 @@ pub struct BaseBehaviour {
inner: InnerBehaviour,
keypair: Keypair,
pending_events: VecDeque<TToSwarm<Self>>,
ongoing_queries: BiHashMap<PeerId, QueryId>,
ongoing_lookups: BiHashMap<PeerId, QueryId>,
outbound_conns: HashMap<PeerId, u32>,
registered_workers: Arc<RwLock<HashSet<PeerId>>>,
registered_workers: HashSet<PeerId>,
whitelist_initialized: bool,
maintain_worker_connections: bool,
pending_lookups: VecDeque<PeerId>,
max_concurrent_lookups: usize,
}

#[allow(dead_code)]
Expand Down Expand Up @@ -171,10 +176,13 @@ impl BaseBehaviour {
inner,
keypair: keypair.clone(),
pending_events: Default::default(),
ongoing_queries: Default::default(),
ongoing_lookups: Default::default(),
outbound_conns: Default::default(),
registered_workers: Arc::new(RwLock::new(Default::default())),
registered_workers: Default::default(),
whitelist_initialized: false,
maintain_worker_connections: false,
pending_lookups: Default::default(),
max_concurrent_lookups: config.max_concurrent_lookups,
}
}

Expand All @@ -187,8 +195,9 @@ impl BaseBehaviour {
self.inner.kademlia.set_mode(Some(kad::Mode::Server));
}

pub fn keep_all_connections_alive(&mut self) {
pub fn maintain_worker_connections(&mut self) {
self.inner.keep_alive.keep_all_connections_alive();
self.maintain_worker_connections = true;
}

pub fn request_handle(
Expand All @@ -204,25 +213,25 @@ impl BaseBehaviour {
}

pub fn find_and_dial(&mut self, peer_id: PeerId) {
if self.ongoing_queries.contains_left(&peer_id) {
if self.inner.address_cache.contains(&peer_id) {
log::debug!("Dialing peer {peer_id} using cached address");
self.pending_events
.push_back(ToSwarm::Dial { opts: peer_id.into() });
} else if self.ongoing_lookups.contains_left(&peer_id) {
log::debug!("Query for peer {peer_id} already ongoing");
} else {
log::debug!("Starting query for peer {peer_id}");
let query_id = self.inner.kademlia.get_closest_peers(peer_id);
self.ongoing_queries.insert(peer_id, query_id);
self.ongoing_lookups.insert(peer_id, query_id);
#[cfg(feature = "metrics")]
ONGOING_QUERIES.inc();
ONGOING_LOOKUPS.inc();
}
}

pub fn outbound_conn_exists(&self, peer_id: &PeerId) -> bool {
self.outbound_conns.get(peer_id).is_some_and(|x| *x > 0)
}

pub fn can_be_dialed(&self, peer_id: &PeerId) -> bool {
self.outbound_conn_exists(peer_id) || self.inner.address_cache.contains(peer_id)
}

pub fn allow_peer(&mut self, peer_id: PeerId) {
self.inner.whitelist.allow_peer(peer_id);
}
Expand Down Expand Up @@ -270,7 +279,6 @@ impl BehaviourWrapper for BaseBehaviour {
None
}
InnerBehaviourEvent::Whitelist(nodes) => self.on_nodes_update(nodes),
InnerBehaviourEvent::Stream(ev) => self.on_stream_event(ev),
_ => None,
}
}
Expand All @@ -280,6 +288,15 @@ impl BehaviourWrapper for BaseBehaviour {
return Poll::Ready(Some(ev));
}

while self.ongoing_lookups.len() < self.max_concurrent_lookups {
let Some(peer_id) = self.pending_lookups.pop_front() else {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably it's better to move condition into while to make code more understandable.

break;
};
if !self.outbound_conn_exists(&peer_id) {
self.find_and_dial(peer_id);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment about find_and_dial also managing ongoing_lookups would fit here

}
}

Poll::Pending
}
}
Expand Down Expand Up @@ -309,6 +326,13 @@ impl BaseBehaviour {
Some(x) => *x -= 1,
None => log::error!("Closed connection not established before"),
}
if self.maintain_worker_connections
&& !self.outbound_conn_exists(&peer_id)
&& self.registered_workers.contains(&peer_id)
{
log::debug!("Worker {peer_id} disconnected, scheduling reconnection");
self.pending_lookups.push_back(peer_id);
}
None
}

Expand Down Expand Up @@ -371,7 +395,7 @@ impl BaseBehaviour {
return None;
};

let peer_id = self.ongoing_queries.get_by_right(&query_id)?.to_owned();
let peer_id = self.ongoing_lookups.get_by_right(&query_id)?.to_owned();
let peer_info = match result {
Ok(GetClosestPeersOk { peers, .. })
| Err(GetClosestPeersError::Timeout { peers, .. }) => {
Expand All @@ -383,9 +407,9 @@ impl BaseBehaviour {
// Query finished
if query_finished {
log::debug!("Query for peer {peer_id} finished.");
self.ongoing_queries.remove_by_right(&query_id);
self.ongoing_lookups.remove_by_right(&query_id);
#[cfg(feature = "metrics")]
ONGOING_QUERIES.dec();
ONGOING_LOOKUPS.dec();
}

if let Some(peer_info) = peer_info {
Expand Down Expand Up @@ -425,24 +449,9 @@ impl BaseBehaviour {
None
}

// TODO: consider capturing all dial requests, not only from the stream behaviour
fn on_stream_event(&mut self, ev: stream_client::Event) -> Option<TToSwarm<Self>> {
// When trying to dial an unknown peer, try to find it on DHT first
let stream_client::Event::Dial(opts) = ev;
match opts.get_peer_id() {
Some(peer_id) if !self.can_be_dialed(&peer_id) => {
// The ConnectionId will not correspond to the requested one,
// but it's not used by the stream behaviour anyway
self.find_and_dial(peer_id);
None
}
_ => Some(ToSwarm::Dial { opts }),
}
}

fn on_nodes_update(&mut self, nodes: NetworkNodes) -> Option<TToSwarm<Self>> {
log::debug!("Updating registered workers");
*self.registered_workers.write() = nodes.workers;
self.registered_workers = nodes.workers;

if !self.whitelist_initialized {
self.whitelist_initialized = true;
Expand All @@ -458,6 +467,14 @@ impl BaseBehaviour {
}
}

if self.maintain_worker_connections {
for peer_id in &self.registered_workers {
if !self.outbound_conn_exists(peer_id) {
self.pending_lookups.push_back(*peer_id);
}
}
}

None
}
}
25 changes: 2 additions & 23 deletions crates/transport/src/behaviour/stream_client.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
use std::{collections::HashMap, sync::Arc, time::Duration};

use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use libp2p::{
swarm::{dial_opts::DialOpts, NetworkBehaviour, THandlerInEvent, ToSwarm},
PeerId, StreamProtocol,
};
use libp2p::{PeerId, StreamProtocol};
use libp2p_stream::OpenStreamError;

use crate::{util::StreamWithPayload, BehaviourWrapper};

use super::wrapped::TToSwarm;

#[derive(Debug, Clone, Copy)]
pub struct ClientConfig {
/// The maximum number of open substreams per peer (default: 3)
Expand Down Expand Up @@ -165,29 +160,13 @@ impl ClientBehaviour {
}
}

#[derive(Debug)]
pub enum Event {
Dial(DialOpts),
}

impl BehaviourWrapper for ClientBehaviour {
type Event = Event;
type Event = ();
type Inner = libp2p_stream::Behaviour;

fn inner(&mut self) -> &mut Self::Inner {
&mut self.inner
}

fn on_inner_command(
&mut self,
ev: ToSwarm<<Self::Inner as NetworkBehaviour>::ToSwarm, THandlerInEvent<Self::Inner>>,
) -> impl IntoIterator<Item = TToSwarm<Self>> {
match ev {
// Capture an attempt to dial a peer to be handled by the BaseBehaviour
ToSwarm::Dial { opts } => Some(ToSwarm::GenerateEvent(Event::Dial(opts))),
e => Some(e.map_out(|()| unreachable!("Stream behaviour doesn't produce events"))),
}
}
}

impl From<OpenStreamError> for RequestError {
Expand Down
4 changes: 2 additions & 2 deletions crates/transport/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tokio::sync::OnceCell;

lazy_static! {
pub static ref ACTIVE_CONNECTIONS: Gauge<u32, AtomicU32> = Default::default();
pub static ref ONGOING_QUERIES: Gauge<u32, AtomicU32> = Default::default();
pub static ref ONGOING_LOOKUPS: Gauge<u32, AtomicU32> = Default::default();
pub static ref QUEUE_SIZE: Family<Vec<(&'static str, &'static str)>, Gauge<u32, AtomicU32>> =
Default::default();
pub static ref DROPPED: Family<Vec<(&'static str, &'static str)>, Counter<u64, AtomicU64>> =
Expand All @@ -33,7 +33,7 @@ pub fn register_metrics(registry: &mut Registry) {
registry.register(
"ongoing_queries",
"The number of ongoing kademlia DHT queries",
ONGOING_QUERIES.clone(),
ONGOING_LOOKUPS.clone(),
);
registry.register(
"queue_size",
Expand Down