From 5879e5fb2a5406ed837af3ca6422738a898fcdd0 Mon Sep 17 00:00:00 2001 From: Ihor Diachenko Date: Wed, 11 Feb 2026 18:57:23 +0200 Subject: [PATCH 01/17] Initial implementation --- Cargo.lock | 4 ++ libs/gl-client/src/persist.rs | 81 +++++++++++++++++++++++++++- libs/gl-client/src/signer/mod.rs | 23 +++++--- libs/gl-plugin/src/node/mod.rs | 51 ++++++++++++++---- libs/proto/glclient/greenlight.proto | 5 +- 5 files changed, 143 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 50d8abb3a..0114fee6a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1578,6 +1578,10 @@ dependencies = [ "which", ] +[[package]] +name = "gl-signerutil" +version = "0.1.0" + [[package]] name = "gl-util" version = "0.1.0" diff --git a/libs/gl-client/src/persist.rs b/libs/gl-client/src/persist.rs index 169470bde..12d2510e4 100644 --- a/libs/gl-client/src/persist.rs +++ b/libs/gl-client/src/persist.rs @@ -205,6 +205,14 @@ impl State { } } + pub fn len(&self) -> usize { + self.values.len() + } + + pub fn values_size_bytes(&self) -> usize { + self.values.values().map(|v: &(u64, serde_json::Value)| serde_json::to_vec(&v.1).unwrap().len()).sum() + } + /// Take another `State` and attempt to update ourselves with any /// entry that is newer than ours. This may fail if the other /// state includes states that are older than our own. @@ -257,6 +265,75 @@ impl State { }) .collect()) } + + /// Return a `State` containing only entries that are newer in `other` than in `self`. + /// This is useful for sending compact state diffs. 
+ pub fn diff_state(&self, other: &State) -> State { + let mut values = BTreeMap::new(); + for (key, (newver, newval)) in other.values.iter() { + match self.values.get(key) { + None => { + values.insert(key.clone(), (*newver, newval.clone())); + } + Some((oldver, _)) if oldver < newver => { + values.insert(key.clone(), (*newver, newval.clone())); + } + _ => {} + } + } + State { values } + } + + pub fn sketch(&self) -> StateSketch { + StateSketch::from_state(self) + } +} + +#[derive(Clone, Serialize, Deserialize, Debug, Default)] +pub struct StateSketch { + versions: BTreeMap<String, u64>, +} + +impl StateSketch { + pub fn new() -> Self { + Self::default() + } + + pub fn from_state(state: &State) -> Self { + let mut sketch = Self::new(); + sketch.apply_state(state); + sketch + } + + /// Apply versions from `state` without clearing existing entries. + pub fn apply_state(&mut self, state: &State) { + for (key, (ver, _)) in state.values.iter() { + self.versions.insert(key.clone(), *ver); + } + } + + /// Replace all versions with those from `state`. + pub fn reset_from_state(&mut self, state: &State) { + self.versions.clear(); + self.apply_state(state); + } + + /// Build a `State` containing entries newer than those recorded in the sketch. 
+ pub fn diff_state(&self, state: &State) -> State { + let mut values = BTreeMap::new(); + for (key, (newver, newval)) in state.values.iter() { + match self.versions.get(key) { + None => { + values.insert(key.clone(), (*newver, newval.clone())); + } + Some(oldver) if oldver < newver => { + values.insert(key.clone(), (*newver, newval.clone())); + } + _ => {} + } + } + State { values } + } } impl Into> for State { @@ -476,7 +553,7 @@ impl Persist for MemoryPersister { let state = self.state.lock().unwrap(); let key = hex::encode(node_id.serialize()); let key = format!("{ALLOWLIST_PREFIX}/{key}"); - + // If allowlist doesn't exist (e.g., node created before VLS 0.14), default to empty let allowlist: Vec = match state.values.get(&key) { Some(value) => serde_json::from_value(value.1.clone()).unwrap_or_default(), @@ -521,7 +598,7 @@ impl Persist for MemoryPersister { use lightning_signer::node::Allowable; let network = lightning_signer::bitcoin::Network::from_str(&node.network) .map_err(|e| Error::Internal(format!("Invalid network: {}", e)))?; - + let allowlist: Vec = allowlist_strings .into_iter() .filter_map(|s| { diff --git a/libs/gl-client/src/signer/mod.rs b/libs/gl-client/src/signer/mod.rs index daa858e01..d0cf9f3bf 100644 --- a/libs/gl-client/src/signer/mod.rs +++ b/libs/gl-client/src/signer/mod.rs @@ -487,7 +487,7 @@ impl Signer { debug!("Processing request {:?}", req); let diff: crate::persist::State = req.signer_state.clone().into(); - let prestate = { + let (prestate_sketch, prestate_log) = { debug!("Updating local signer state with state from node"); let mut state = self.state.lock().map_err(|e| { Error::Other(anyhow!("Failed to acquire state lock: {:?}", e)) @@ -496,7 +496,9 @@ impl Signer { Error::Other(anyhow!("Failed to merge signer state: {:?}", e)) })?; trace!("Processing request {}", hex::encode(&req.raw)); - state.clone() + let log_state = serde_json::to_string(&*state) + .unwrap_or_else(|_| "".to_string()); + (state.sketch(), log_state) }; // The 
first two bytes represent the message type. Check that @@ -527,10 +529,7 @@ impl Signer { let msg = vls_protocol::msgs::from_vec(req.raw.clone()).map_err(|e| Error::Protocol(e))?; log::debug!("Handling message {:?}", msg); - log::trace!( - "Signer state {}", - serde_json::to_string(&prestate).unwrap_or_else(|_| "".to_string()) - ); + log::trace!("Signer state {}", prestate_log); if let Err(e) = self.authenticate_request(&msg, &ctxrequests) { report::Reporter::report(crate::pb::scheduler::SignerRejection { @@ -628,7 +627,17 @@ impl Signer { let state = self.state.lock().map_err(|e| { Error::Other(anyhow!("Failed to acquire state lock for serialization: {:?}", e)) })?; - state.clone().into() + let diff_state = prestate_sketch.diff_state(&state); + let diff_entries: Vec = diff_state.into(); + let diff_value_bytes: usize = diff_entries.iter().map(|e| e.value.len()).sum(); + let diff_key_bytes: usize = diff_entries.iter().map(|e| e.key.len()).sum(); + trace!( + "Signer state diff entries={}, value_bytes={}, key_bytes={}", + diff_entries.len(), + diff_value_bytes, + diff_key_bytes + ); + diff_entries }; Ok(HsmResponse { raw: response.as_vec(), diff --git a/libs/gl-plugin/src/node/mod.rs b/libs/gl-plugin/src/node/mod.rs index 2fa131714..aa5e942be 100644 --- a/libs/gl-plugin/src/node/mod.rs +++ b/libs/gl-plugin/src/node/mod.rs @@ -7,7 +7,7 @@ use anyhow::{Context, Error, Result}; use base64::{engine::general_purpose, Engine as _}; use bytes::BufMut; use cln_rpc::Notification; -use gl_client::persist::State; +use gl_client::persist::{State, StateSketch}; use governor::{ clock::MonotonicClock, state::direct::NotKeyed, state::InMemoryState, Quota, RateLimiter, }; @@ -328,8 +328,11 @@ impl Node for PluginNodeServer { tokio::spawn(async move { trace!("hsmd hsm_id={} request processor started", hsm_id); + let mut last_sent_sketch = StateSketch::new(); { + // TODO: Check this edge case + // We start by immediately injecting a // vls_protocol::Message::GetHeartbeat. 
This serves two // purposes: already send the initial snapshot of the @@ -339,9 +342,10 @@ impl Node for PluginNodeServer { // presumably time-critical messages, do not have to carry // the large state with them. - let state = signer_state.lock().await.clone(); - let state: Vec = state.into(); - let state: Vec = state + let state_snapshot = signer_state.lock().await.clone(); + let state_entries: Vec = + state_snapshot.clone().into(); + let state_entries: Vec = state_entries .into_iter() .map(|s| pb::SignerStateEntry { key: s.key, @@ -349,14 +353,22 @@ impl Node for PluginNodeServer { value: s.value, }) .collect(); - + let state_value_bytes: usize = state_entries.iter().map(|e| e.value.len()).sum(); + let state_key_bytes: usize = state_entries.iter().map(|e| e.key.len()).sum(); + trace!( + "Signer state heartbeat to hsm_id={} entries={}, value_bytes={}, key_bytes={}", + hsm_id, + state_entries.len(), + state_value_bytes, + state_key_bytes + ); let msg = vls_protocol::msgs::GetHeartbeat {}; use vls_protocol::msgs::SerBolt; let req = crate::pb::HsmRequest { // Notice that the request_counter starts at 1000, to // avoid collisions. request_id: 0, - signer_state: state, + signer_state: state_entries, raw: msg.as_vec(), requests: vec![], // No pending requests yet, nothing to authorize. context: None, @@ -364,6 +376,8 @@ impl Node for PluginNodeServer { if let Err(e) = tx.send(Ok(req)).await { log::warn!("Failed to send heartbeat message to signer: {}", e); + } else { + last_sent_sketch.reset_from_state(&state_snapshot); } } @@ -384,11 +398,14 @@ impl Node for PluginNodeServer { hsm_id ); - let state = signer_state.lock().await.clone(); - let state: Vec = state.into(); + + // Send only the changes since the last time we sent state to this signer. 
+ let state_snapshot = signer_state.lock().await.clone(); + let diff_state = last_sent_sketch.diff_state(&state_snapshot); + let diff_entries: Vec = diff_state.clone().into(); // TODO Consolidate protos in `gl-client` and `gl-plugin`, then remove this map. - let state: Vec = state + let diff_entries: Vec = diff_entries .into_iter() .map(|s| pb::SignerStateEntry { key: s.key, @@ -397,7 +414,18 @@ impl Node for PluginNodeServer { }) .collect(); - req.request.signer_state = state.into(); + let diff_value_bytes: usize = diff_entries.iter().map(|e| e.value.len()).sum(); + let diff_key_bytes: usize = diff_entries.iter().map(|e| e.key.len()).sum(); + trace!( + "Signer state diff to hsm_id={} request_id={} entries={}, value_bytes={}, key_bytes={}", + hsm_id, + req.request.request_id, + diff_entries.len(), + diff_value_bytes, + diff_key_bytes + ); + + req.request.signer_state = diff_entries; req.request.requests = ctx.snapshot().await.into_iter().map(|r| r.into()).collect(); let serialized_configure_request = SERIALIZED_CONFIGURE_REQUEST.lock().await; @@ -424,6 +452,7 @@ impl Node for PluginNodeServer { warn!("Error streaming request {:?} to hsm_id={}", e, hsm_id); break; } + last_sent_sketch.apply_state(&diff_state); } info!("Signer hsm_id={} exited", hsm_id); SIGNER_COUNT.fetch_sub(1, Ordering::SeqCst); @@ -446,6 +475,8 @@ impl Node for PluginNodeServer { } eprintln!("WIRE: signer -> plugin: {:?}", req); + // Merge diff entries returned by signer. + // Create a state from the key-value-version tuples. Need to // convert here, since `pb` is duplicated in the two different // crates. diff --git a/libs/proto/glclient/greenlight.proto b/libs/proto/glclient/greenlight.proto index 63dd182c0..11a524551 100644 --- a/libs/proto/glclient/greenlight.proto +++ b/libs/proto/glclient/greenlight.proto @@ -75,6 +75,7 @@ message HsmResponse { // A list of updated key-value-version tuples that is to be // merged into the state tracked by the plugin. 
repeated SignerStateEntry signer_state = 5; + // TODO: make it diff-aware? // If the signer reported an error, and did therefore not include // `raw`, this is the stringified error, so we can print it in the @@ -179,8 +180,8 @@ message NodeConfig { // The `GlConfig` is used to pass greenlight-specific startup parameters -// to the node. The `gl-plugin` will look for a serialized config object in -// the node's datastore to load these values from. Please refer to the +// to the node. The `gl-plugin` will look for a serialized config object in +// the node's datastore to load these values from. Please refer to the // individual fields to learn what they do. message GlConfig { string close_to_addr = 1; From 7c9c5e7597e6bb4327bc69874e87201fe406c73a Mon Sep 17 00:00:00 2001 From: Ihor Diachenko Date: Wed, 11 Feb 2026 18:59:05 +0200 Subject: [PATCH 02/17] Clean up --- libs/gl-client/src/persist.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/libs/gl-client/src/persist.rs b/libs/gl-client/src/persist.rs index 12d2510e4..e28a2a560 100644 --- a/libs/gl-client/src/persist.rs +++ b/libs/gl-client/src/persist.rs @@ -209,10 +209,6 @@ impl State { self.values.len() } - pub fn values_size_bytes(&self) -> usize { - self.values.values().map(|v: &(u64, serde_json::Value)| serde_json::to_vec(&v.1).unwrap().len()).sum() - } - /// Take another `State` and attempt to update ourselves with any /// entry that is newer than ours. This may fail if the other /// state includes states that are older than our own. 
From 222ca16884944dfcef8183971ab5e88d6fef334f Mon Sep 17 00:00:00 2001 From: Ihor Diachenko Date: Wed, 11 Feb 2026 19:17:14 +0200 Subject: [PATCH 03/17] Added unit tests --- Cargo.lock | 4 -- libs/gl-client/src/persist.rs | 115 +++++++++++++++++++++++++++++++-- libs/gl-plugin/src/node/mod.rs | 2 +- 3 files changed, 110 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0114fee6a..50d8abb3a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1578,10 +1578,6 @@ dependencies = [ "which", ] -[[package]] -name = "gl-signerutil" -version = "0.1.0" - [[package]] name = "gl-util" version = "0.1.0" diff --git a/libs/gl-client/src/persist.rs b/libs/gl-client/src/persist.rs index e28a2a560..20bf8ee2e 100644 --- a/libs/gl-client/src/persist.rs +++ b/libs/gl-client/src/persist.rs @@ -308,12 +308,6 @@ impl StateSketch { } } - /// Replace all versions with those from `state`. - pub fn reset_from_state(&mut self, state: &State) { - self.versions.clear(); - self.apply_state(state); - } - /// Build a `State` containing entries newer than those recorded in the sketch. 
pub fn diff_state(&self, state: &State) -> State { let mut values = BTreeMap::new(); @@ -645,3 +639,112 @@ impl Persist for MemoryPersister { [0u8; 16] } } + +#[cfg(test)] +mod tests { + use super::{State, StateSketch}; + use serde_json::json; + use std::collections::BTreeMap; + + fn mk_state(entries: Vec<(&str, u64, serde_json::Value)>) -> State { + let mut values = BTreeMap::new(); + for (key, version, value) in entries { + values.insert(key.to_string(), (version, value)); + } + State { values } + } + + #[test] + fn diff_state_only_includes_new_or_newer_entries() { + let old = mk_state(vec![ + ("k1", 1, json!({"v": 1})), + ("k2", 2, json!({"v": 2})), + ("k3", 3, json!({"v": 3})), + ]); + let new = mk_state(vec![ + // unchanged version, changed value should still be ignored + ("k1", 1, json!({"v": 999})), + // newer version should be included + ("k2", 3, json!({"v": 22})), + // older version should be ignored + ("k3", 2, json!({"v": 33})), + // brand new key should be included + ("k4", 0, json!({"v": 4})), + ]); + + let diff = old.diff_state(&new); + + assert_eq!(diff.values.len(), 2); + assert_eq!(diff.values.get("k2").unwrap().0, 3); + assert_eq!(diff.values.get("k4").unwrap().0, 0); + assert!(diff.values.get("k1").is_none()); + assert!(diff.values.get("k3").is_none()); + } + + #[test] + fn sate_diff_with_empty_old_state_includes_all_entries() { + let old = State::new(); + let new = mk_state(vec![ + ("k1", 1, json!({"v": 1})), + ("k2", 2, json!({"v": 2})), + ]); + + let diff = old.diff_state(&new); + + assert_eq!(diff.values.len(), 2); + assert_eq!(diff.values.get("k1").unwrap().0, 1); + assert_eq!(diff.values.get("k2").unwrap().0, 2); + } + + #[test] + fn sketch_diff_matches_state_diff() { + let old = mk_state(vec![ + ("a", 5, json!(1)), + ("b", 2, json!(2)), + ("c", 7, json!(3)), + ]); + let new = mk_state(vec![ + ("a", 5, json!(10)), + ("b", 3, json!(20)), + ("c", 6, json!(30)), + ("d", 1, json!(40)), + ]); + + let state_diff = old.diff_state(&new); + let 
sketch_diff = old.sketch().diff_state(&new); + + assert_eq!(state_diff.values, sketch_diff.values); + } + + #[test] + fn sketch_diff_with_empty_sketch_includes_all_entries() { + let state = mk_state(vec![ + ("a", 1, json!(1)), + ("b", 2, json!(2)), + ]); + let sketch = StateSketch::new(); + + let diff = sketch.diff_state(&state); + + assert_eq!(diff.values.len(), 2); + assert_eq!(diff.values.get("a").unwrap().0, 1); + assert_eq!(diff.values.get("b").unwrap().0, 2); + } + + #[test] + fn sketch_apply_follow_version_updates() { + let base = mk_state(vec![("a", 1, json!(1)), ("b", 2, json!(2))]); + let mut sketch = StateSketch::new(); + sketch.apply_state(&base); + + let next = mk_state(vec![("a", 2, json!(10)), ("b", 2, json!(20)), ("c", 0, json!(30))]); + let first_diff = sketch.diff_state(&next); + assert_eq!(first_diff.values.len(), 2); + assert!(first_diff.values.get("a").is_some()); + assert!(first_diff.values.get("c").is_some()); + + sketch.apply_state(&first_diff); + let second_diff = sketch.diff_state(&next); + assert!(second_diff.values.is_empty()); + } +} diff --git a/libs/gl-plugin/src/node/mod.rs b/libs/gl-plugin/src/node/mod.rs index aa5e942be..b3899d73d 100644 --- a/libs/gl-plugin/src/node/mod.rs +++ b/libs/gl-plugin/src/node/mod.rs @@ -377,7 +377,7 @@ impl Node for PluginNodeServer { if let Err(e) = tx.send(Ok(req)).await { log::warn!("Failed to send heartbeat message to signer: {}", e); } else { - last_sent_sketch.reset_from_state(&state_snapshot); + last_sent_sketch = state_snapshot.sketch(); } } From ec0d695f412e70b33a54afa548343e2ba84f29be Mon Sep 17 00:00:00 2001 From: Ihor Diachenko Date: Fri, 13 Feb 2026 01:26:11 +0200 Subject: [PATCH 04/17] Added tombstone logic --- libs/gl-client/src/persist.rs | 285 ++++++++++++++++++++++++++++++---- 1 file changed, 257 insertions(+), 28 deletions(-) diff --git a/libs/gl-client/src/persist.rs b/libs/gl-client/src/persist.rs index 20bf8ee2e..e029fdb94 100644 --- a/libs/gl-client/src/persist.rs +++ 
b/libs/gl-client/src/persist.rs @@ -20,6 +20,7 @@ const NODE_STATE_PREFIX: &str = "nodestates"; const CHANNEL_PREFIX: &str = "channels"; const ALLOWLIST_PREFIX: &str = "allowlists"; const TRACKER_PREFIX: &str = "trackers"; +const TOMBSTONE_PREFIX: &str = "tombs"; #[derive(Clone, Serialize, Deserialize)] pub struct State { @@ -27,6 +28,14 @@ pub struct State { } impl State { + fn put_tombstone(&mut self, live_key: &str) { + let tombstone_key = Self::tombstone_key(live_key); + let version = self.next_version_for_live_key(live_key); + self.values + .insert(tombstone_key, (version, serde_json::Value::Null)); + self.values.remove(live_key); + } + fn insert_node( &mut self, key: &str, @@ -35,17 +44,14 @@ impl State { ) -> Result<(), Error> { let node_key = format!("{NODE_PREFIX}/{key}"); let state_key = format!("{NODE_STATE_PREFIX}/{key}"); - assert!(!self.values.contains_key(&node_key), "inserting node twice"); - assert!( - !self.values.contains_key(&state_key), - "inserting node_state twice" - ); + let node_version = self.next_version_for_live_key(&node_key); + let state_version = self.next_version_for_live_key(&state_key); self.values - .insert(node_key, (0u64, serde_json::to_value(node_entry).unwrap())); + .insert(node_key, (node_version, serde_json::to_value(node_entry).unwrap())); self.values.insert( state_key, - (0u64, serde_json::to_value(node_state_entry).unwrap()), + (state_version, serde_json::to_value(node_state_entry).unwrap()), ); Ok(()) } @@ -71,9 +77,23 @@ impl State { fn delete_node(&mut self, key: &str) -> Result<(), Error> { let node_key = format!("{NODE_PREFIX}/{key}"); let state_key = format!("{NODE_STATE_PREFIX}/{key}"); + let allowlist_key = format!("{ALLOWLIST_PREFIX}/{key}"); + let tracker_key = format!("{TRACKER_PREFIX}/{key}"); + let channel_prefix = format!("{CHANNEL_PREFIX}/{key}"); + let channel_keys: Vec<String> = self + .values + .keys() + .filter(|k| k.starts_with(&channel_prefix)) + .cloned() + .collect(); - self.values.remove(&node_key); - 
self.values.remove(&state_key); + self.put_tombstone(&node_key); + self.put_tombstone(&state_key); + self.put_tombstone(&allowlist_key); + self.put_tombstone(&tracker_key); + for channel_key in channel_keys { + self.put_tombstone(&channel_key); + } Ok(()) } @@ -83,14 +103,15 @@ impl State { channel_entry: vls_persist::model::ChannelEntry, ) -> Result<(), Error> { let key = format!("{CHANNEL_PREFIX}/{key}"); - assert!(!self.values.contains_key(&key)); + let version = self.next_version_for_live_key(&key); self.values - .insert(key, (0u64, serde_json::to_value(channel_entry).unwrap())); + .insert(key, (version, serde_json::to_value(channel_entry).unwrap())); Ok(()) } fn delete_channel(&mut self, key: &str) { - self.values.remove(key); + let live_key = format!("{CHANNEL_PREFIX}/{key}"); + self.put_tombstone(&live_key); } fn update_channel( @@ -113,7 +134,13 @@ impl State { key: &str, ) -> Result { let key = format!("{CHANNEL_PREFIX}/{key}"); - let value = self.values.get(&key).unwrap(); + if self.is_tombstoned(&key) { + return Err(Error::Internal(format!("channel {} has been deleted", key))); + } + let value = self + .values + .get(&key) + .ok_or_else(|| Error::Internal(format!("missing channel state for key {}", key)))?; let entry: vls_persist::model::ChannelEntry = serde_json::from_value(value.1.clone()).unwrap(); Ok(entry.into()) @@ -135,6 +162,7 @@ impl State { .values .iter() .filter(|(k, _)| k.starts_with(&prefix)) + .filter(|(k, _)| !self.is_tombstoned(k)) .map(|(k, v)| { let key = k.split('/').last().unwrap(); let key = vls_persist::model::NodeChannelId(hex::decode(&key).unwrap()); @@ -153,12 +181,12 @@ impl State { ) -> Result<(), Error> { let key = hex::encode(node_id.serialize()); let key = format!("{TRACKER_PREFIX}/{key}"); - assert!(!self.values.contains_key(&key)); + let version = self.next_version_for_live_key(&key); let tracker: vls_persist::model::ChainTrackerEntry = tracker.into(); self.values - .insert(key, (0u64, 
serde_json::to_value(tracker).unwrap())); + .insert(key, (version, serde_json::to_value(tracker).unwrap())); Ok(()) } @@ -166,6 +194,47 @@ impl State { self.values.clear(); Ok(()) } + + fn tombstone_key(live_key: &str) -> String { + format!("{TOMBSTONE_PREFIX}/{live_key}") + } + + fn tombstone_target_key(key: &str) -> Option<&str> { + key.strip_prefix(TOMBSTONE_PREFIX) + .and_then(|rest| rest.strip_prefix('/')) + } + + fn is_tombstone_key(key: &str) -> bool { + Self::tombstone_target_key(key).is_some() + } + + fn tombstone_version_for_live_key(&self, live_key: &str) -> Option<u64> { + let tombstone_key = Self::tombstone_key(live_key); + self.values.get(&tombstone_key).map(|v| v.0) + } + + fn next_version_for_live_key(&self, live_key: &str) -> u64 { + let live_ver = self.values.get(live_key).map(|v| v.0); + let tombstone_ver = self.tombstone_version_for_live_key(live_key); + live_ver + .into_iter() + .chain(tombstone_ver) + .max() + .map(|v| v.saturating_add(1)) + .unwrap_or(0) + } + + fn is_tombstoned(&self, live_key: &str) -> bool { + let live_ver = self.values.get(live_key).map(|v| v.0); + let tombstone_ver = self.tombstone_version_for_live_key(live_key); + + match (live_ver, tombstone_ver) { + (_, None) => false, + (None, Some(_)) => true, + (Some(live), Some(tomb)) => tomb >= live, + } + } + } #[derive(Debug)] @@ -215,6 +284,53 @@ impl State { pub fn merge(&mut self, other: &State) -> anyhow::Result<Vec<(String, Option<u64>, u64)>> { let mut res = Vec::new(); for (key, (newver, newval)) in other.values.iter() { + if Self::is_tombstone_key(key) { + let local = self.values.get_mut(key); + match local { + None => { + trace!("Insert new tombstone key {}: version={}", key, newver); + res.push((key.to_owned(), None, *newver)); + self.values.insert(key.clone(), (*newver, newval.clone())); + } + Some(v) => { + if v.0 == *newver { + // Idempotent re-application, but still enforce live-key cleanup below. 
+ } else if v.0 > *newver { + warn!("Ignoring outdated tombstone version newver={}, we have oldver={}: newval={:?} vs oldval={:?}", newver, v.0, serde_json::to_string(newval), serde_json::to_string(&v.1)); + // Keep the newer local tombstone and still enforce live-key cleanup below. + } else { + trace!( + "Updating tombstone key {}: version={} => version={}", + key, + v.0, + *newver + ); + res.push((key.to_owned(), Some(v.0), *newver)); + *v = (*newver, newval.clone()); + } + } + } + + if let Some(live_key) = Self::tombstone_target_key(key) { + if self.is_tombstoned(live_key) { + self.values.remove(live_key); + } + } + continue; + } + + if self + .tombstone_version_for_live_key(key) + .map(|tomb| tomb >= *newver) + .unwrap_or(false) + { + trace!( + "Ignoring live key {} version={} because a tombstone is newer or equal", + key, newver + ); + continue; + } + let local = self.values.get_mut(key); match local { @@ -504,8 +620,19 @@ impl Persist for MemoryPersister { let key = format!("{TRACKER_PREFIX}/{key}"); let state = self.state.lock().unwrap(); + if state.is_tombstoned(&key) { + return Err(Error::Internal(format!("tracker {} has been deleted", key))); + } let v: vls_persist::model::ChainTrackerEntry = - serde_json::from_value(state.values.get(&key).unwrap().1.clone()).unwrap(); + serde_json::from_value( + state + .values + .get(&key) + .ok_or_else(|| Error::Internal(format!("missing tracker state {}", key)))? 
+ .1 + .clone(), + ) + .unwrap(); Ok(v.into_tracker(node_id, validator_factory)) } @@ -531,9 +658,10 @@ impl Persist for MemoryPersister { *v = (v.0 + 1, serde_json::to_value(allowlist).unwrap()); } None => { + let version = state.next_version_for_live_key(&key); state .values - .insert(key, (0u64, serde_json::to_value(allowlist).unwrap())); + .insert(key, (version, serde_json::to_value(allowlist).unwrap())); } } Ok(()) @@ -543,6 +671,9 @@ impl Persist for MemoryPersister { let state = self.state.lock().unwrap(); let key = hex::encode(node_id.serialize()); let key = format!("{ALLOWLIST_PREFIX}/{key}"); + if state.is_tombstoned(&key) { + return Ok(Vec::new()); + } // If allowlist doesn't exist (e.g., node created before VLS 0.14), default to empty let allowlist: Vec = match state.values.get(&key) { @@ -562,9 +693,9 @@ impl Persist for MemoryPersister { let node_ids: Vec<&str> = state .values .keys() - .map(|k| k.split('/')) - .filter(|k| k.clone().next().unwrap() == NODE_PREFIX) - .map(|k| k.clone().last().unwrap()) + .filter(|k| k.starts_with(&format!("{NODE_PREFIX}/"))) + .filter(|k| !state.is_tombstoned(k)) + .filter_map(|k| k.split('/').last()) .collect(); let mut res = Vec::new(); @@ -573,15 +704,27 @@ impl Persist for MemoryPersister { let state_key = format!("{NODE_STATE_PREFIX}/{node_id}"); let allowlist_key = format!("{ALLOWLIST_PREFIX}/{node_id}"); - let node: vls_persist::model::NodeEntry = - serde_json::from_value(state.values.get(&node_key).unwrap().1.clone()).unwrap(); - let state_e: vls_persist::model::NodeStateEntry = - serde_json::from_value(state.values.get(&state_key).unwrap().1.clone()).unwrap(); + if state.is_tombstoned(&node_key) || state.is_tombstoned(&state_key) { + continue; + } + + let node: vls_persist::model::NodeEntry = match state.values.get(&node_key) { + Some(value) => serde_json::from_value(value.1.clone()).unwrap(), + None => continue, + }; + let state_e: vls_persist::model::NodeStateEntry = match state.values.get(&state_key) { + 
Some(value) => serde_json::from_value(value.1.clone()).unwrap(), + None => continue, + }; // Load allowlist, defaulting to empty if not found (for nodes created before VLS 0.14) - let allowlist_strings: Vec = match state.values.get(&allowlist_key) { - Some(value) => serde_json::from_value(value.1.clone()).unwrap_or_default(), - None => Vec::new(), + let allowlist_strings: Vec = if state.is_tombstoned(&allowlist_key) { + Vec::new() + } else { + match state.values.get(&allowlist_key) { + Some(value) => serde_json::from_value(value.1.clone()).unwrap_or_default(), + None => Vec::new(), + } }; // Parse allowlist strings into Allowable objects @@ -642,7 +785,10 @@ impl Persist for MemoryPersister { #[cfg(test)] mod tests { - use super::{State, StateSketch}; + use super::{ + State, StateSketch, ALLOWLIST_PREFIX, CHANNEL_PREFIX, NODE_PREFIX, NODE_STATE_PREFIX, + TOMBSTONE_PREFIX, TRACKER_PREFIX, + }; use serde_json::json; use std::collections::BTreeMap; @@ -747,4 +893,87 @@ mod tests { let second_diff = sketch.diff_state(&next); assert!(second_diff.values.is_empty()); } + + #[test] + fn merge_tombstone_deletes_older_live_entry() { + let live_key = format!("{CHANNEL_PREFIX}/abc"); + let tombstone_key = format!("{TOMBSTONE_PREFIX}/{live_key}"); + let mut state = mk_state(vec![(live_key.as_str(), 2, json!({"v": 1}))]); + let incoming = mk_state(vec![(tombstone_key.as_str(), 3, serde_json::Value::Null)]); + + state.merge(&incoming).unwrap(); + + assert!(state.values.get(&live_key).is_none()); + assert_eq!(state.values.get(&tombstone_key).unwrap().0, 3); + } + + #[test] + fn merge_ignores_live_entry_if_tombstone_is_newer_or_equal() { + let live_key = format!("{CHANNEL_PREFIX}/abc"); + let tombstone_key = format!("{TOMBSTONE_PREFIX}/{live_key}"); + let mut state = mk_state(vec![(tombstone_key.as_str(), 5, serde_json::Value::Null)]); + let incoming = mk_state(vec![(live_key.as_str(), 4, json!({"v": 1}))]); + + state.merge(&incoming).unwrap(); + + 
assert!(state.values.get(&live_key).is_none()); + assert_eq!(state.values.get(&tombstone_key).unwrap().0, 5); + } + + #[test] + fn merge_accepts_live_entry_if_newer_than_tombstone() { + let live_key = format!("{CHANNEL_PREFIX}/abc"); + let tombstone_key = format!("{TOMBSTONE_PREFIX}/{live_key}"); + let mut state = mk_state(vec![(tombstone_key.as_str(), 5, serde_json::Value::Null)]); + let incoming = mk_state(vec![(live_key.as_str(), 6, json!({"v": 2}))]); + + state.merge(&incoming).unwrap(); + + assert_eq!(state.values.get(&live_key).unwrap().0, 6); + assert_eq!(state.values.get(&tombstone_key).unwrap().0, 5); + } + + #[test] + fn delete_channel_creates_tombstone_and_bumps_version() { + let live_key = format!("{CHANNEL_PREFIX}/abc"); + let tombstone_key = format!("{TOMBSTONE_PREFIX}/{live_key}"); + let mut state = mk_state(vec![(live_key.as_str(), 7, json!({"v": 1}))]); + + state.delete_channel("abc"); + + assert!(state.values.get(&live_key).is_none()); + assert_eq!(state.values.get(&tombstone_key).unwrap().0, 8); + } + + #[test] + fn delete_node_creates_tombstones_for_node_related_keys() { + let node_id = "deadbeef"; + let node_key = format!("{NODE_PREFIX}/{node_id}"); + let node_state_key = format!("{NODE_STATE_PREFIX}/{node_id}"); + let allowlist_key = format!("{ALLOWLIST_PREFIX}/{node_id}"); + let tracker_key = format!("{TRACKER_PREFIX}/{node_id}"); + let channel_key = format!("{CHANNEL_PREFIX}/{node_id}cafebabe"); + + let mut state = mk_state(vec![ + (node_key.as_str(), 1, json!({"n": 1})), + (node_state_key.as_str(), 2, json!({"s": 1})), + (allowlist_key.as_str(), 3, json!(["127.0.0.1"])), + (tracker_key.as_str(), 4, json!({"t": 1})), + (channel_key.as_str(), 5, json!({"c": 1})), + ]); + + state.delete_node(node_id).unwrap(); + + for (live_key, old_version) in vec![ + (node_key, 1u64), + (node_state_key, 2u64), + (allowlist_key, 3u64), + (tracker_key, 4u64), + (channel_key, 5u64), + ] { + assert!(state.values.get(&live_key).is_none()); + let tombstone_key = 
format!("{TOMBSTONE_PREFIX}/{live_key}"); + assert_eq!(state.values.get(&tombstone_key).unwrap().0, old_version + 1); + } + } } From 16d41fe5375ce4660eb8e1d6c44650011eeeccdc Mon Sep 17 00:00:00 2001 From: Ihor Diachenko Date: Fri, 13 Feb 2026 01:35:21 +0200 Subject: [PATCH 05/17] Added full re-sync on conflict detection --- libs/gl-client/src/persist.rs | 59 ++++++++++++++++++++----- libs/gl-client/src/signer/mod.rs | 47 ++++++++++++++------ libs/gl-plugin/src/node/mod.rs | 76 ++++++++++++++++++++++++-------- 3 files changed, 138 insertions(+), 44 deletions(-) diff --git a/libs/gl-client/src/persist.rs b/libs/gl-client/src/persist.rs index e029fdb94..155ec69be 100644 --- a/libs/gl-client/src/persist.rs +++ b/libs/gl-client/src/persist.rs @@ -244,6 +244,18 @@ pub struct StateChange { new: (u64, serde_json::Value), } +#[derive(Debug, Default)] +pub struct SafeMergeResult { + pub changes: Vec<(String, Option, u64)>, + pub conflict_count: usize, +} + +impl SafeMergeResult { + pub fn has_conflicts(&self) -> bool { + self.conflict_count > 0 + } +} + use core::fmt::Display; impl Display for StateChange { @@ -278,18 +290,19 @@ impl State { self.values.len() } - /// Take another `State` and attempt to update ourselves with any - /// entry that is newer than ours. This may fail if the other - /// state includes states that are older than our own. - pub fn merge(&mut self, other: &State) -> anyhow::Result, u64)>> { - let mut res = Vec::new(); + /// Merge incoming state and track potential version conflicts. + /// + /// A conflict means the incoming state is stale or incompatible with local + /// tombstone knowledge. Callers may use this signal to trigger a full sync. 
+ pub fn safe_merge(&mut self, other: &State) -> anyhow::Result { + let mut res = SafeMergeResult::default(); for (key, (newver, newval)) in other.values.iter() { if Self::is_tombstone_key(key) { let local = self.values.get_mut(key); match local { None => { trace!("Insert new tombstone key {}: version={}", key, newver); - res.push((key.to_owned(), None, *newver)); + res.changes.push((key.to_owned(), None, *newver)); self.values.insert(key.clone(), (*newver, newval.clone())); } Some(v) => { @@ -298,6 +311,7 @@ impl State { } else if v.0 > *newver { warn!("Ignoring outdated tombstone version newver={}, we have oldver={}: newval={:?} vs oldval={:?}", newver, v.0, serde_json::to_string(newval), serde_json::to_string(&v.1)); // Keep the newer local tombstone and still enforce live-key cleanup below. + res.conflict_count += 1; } else { trace!( "Updating tombstone key {}: version={} => version={}", @@ -305,7 +319,7 @@ impl State { v.0, *newver ); - res.push((key.to_owned(), Some(v.0), *newver)); + res.changes.push((key.to_owned(), Some(v.0), *newver)); *v = (*newver, newval.clone()); } } @@ -328,6 +342,7 @@ impl State { "Ignoring live key {} version={} because a tombstone is newer or equal", key, newver ); + res.conflict_count += 1; continue; } @@ -336,7 +351,7 @@ impl State { match local { None => { trace!("Insert new key {}: version={}", key, newver); - res.push((key.to_owned(), None, *newver)); + res.changes.push((key.to_owned(), None, *newver)); self.values.insert(key.clone(), (*newver, newval.clone())); } Some(v) => { @@ -344,6 +359,7 @@ impl State { continue; } else if v.0 > *newver { warn!("Ignoring outdated state version newver={}, we have oldver={}: newval={:?} vs oldval={:?}", newver, v.0, serde_json::to_string(newval), serde_json::to_string(&v.1)); + res.conflict_count += 1; continue; } else { trace!( @@ -352,7 +368,7 @@ impl State { v.0, *newver ); - res.push((key.to_owned(), Some(v.0), *newver)); + res.changes.push((key.to_owned(), Some(v.0), *newver)); *v = 
(*newver, newval.clone()); } } @@ -361,6 +377,11 @@ impl State { Ok(res) } + /// Backward-compatible merge API. + pub fn merge(&mut self, other: &State) -> anyhow::Result, u64)>> { + Ok(self.safe_merge(other)?.changes) + } + pub fn diff(&self, other: &State) -> anyhow::Result> { Ok(other .values @@ -901,10 +922,11 @@ mod tests { let mut state = mk_state(vec![(live_key.as_str(), 2, json!({"v": 1}))]); let incoming = mk_state(vec![(tombstone_key.as_str(), 3, serde_json::Value::Null)]); - state.merge(&incoming).unwrap(); + let res = state.safe_merge(&incoming).unwrap(); assert!(state.values.get(&live_key).is_none()); assert_eq!(state.values.get(&tombstone_key).unwrap().0, 3); + assert_eq!(res.conflict_count, 0); } #[test] @@ -914,10 +936,11 @@ mod tests { let mut state = mk_state(vec![(tombstone_key.as_str(), 5, serde_json::Value::Null)]); let incoming = mk_state(vec![(live_key.as_str(), 4, json!({"v": 1}))]); - state.merge(&incoming).unwrap(); + let res = state.safe_merge(&incoming).unwrap(); assert!(state.values.get(&live_key).is_none()); assert_eq!(state.values.get(&tombstone_key).unwrap().0, 5); + assert_eq!(res.conflict_count, 1); } #[test] @@ -927,10 +950,22 @@ mod tests { let mut state = mk_state(vec![(tombstone_key.as_str(), 5, serde_json::Value::Null)]); let incoming = mk_state(vec![(live_key.as_str(), 6, json!({"v": 2}))]); - state.merge(&incoming).unwrap(); + let res = state.safe_merge(&incoming).unwrap(); assert_eq!(state.values.get(&live_key).unwrap().0, 6); assert_eq!(state.values.get(&tombstone_key).unwrap().0, 5); + assert_eq!(res.conflict_count, 0); + } + + #[test] + fn safe_merge_reports_conflict_for_outdated_live_version() { + let mut state = mk_state(vec![("k1", 5, json!({"v": 5}))]); + let incoming = mk_state(vec![("k1", 4, json!({"v": 4}))]); + + let res = state.safe_merge(&incoming).unwrap(); + + assert_eq!(res.conflict_count, 1); + assert_eq!(state.values.get("k1").unwrap().0, 5); } #[test] diff --git a/libs/gl-client/src/signer/mod.rs 
b/libs/gl-client/src/signer/mod.rs index d0cf9f3bf..85941bb1c 100644 --- a/libs/gl-client/src/signer/mod.rs +++ b/libs/gl-client/src/signer/mod.rs @@ -487,18 +487,24 @@ impl Signer { debug!("Processing request {:?}", req); let diff: crate::persist::State = req.signer_state.clone().into(); - let (prestate_sketch, prestate_log) = { + let (prestate_sketch, prestate_log, force_full_sync) = { debug!("Updating local signer state with state from node"); let mut state = self.state.lock().map_err(|e| { Error::Other(anyhow!("Failed to acquire state lock: {:?}", e)) })?; - state.merge(&diff).map_err(|e| { + let merge_res = state.safe_merge(&diff).map_err(|e| { Error::Other(anyhow!("Failed to merge signer state: {:?}", e)) })?; + if merge_res.has_conflicts() { + warn!( + "State merge conflict detected (count={}), forcing full signer state sync", + merge_res.conflict_count + ); + } trace!("Processing request {}", hex::encode(&req.raw)); let log_state = serde_json::to_string(&*state) .unwrap_or_else(|_| "".to_string()); - (state.sketch(), log_state) + (state.sketch(), log_state, merge_res.has_conflicts()) }; // The first two bytes represent the message type. 
Check that @@ -627,17 +633,30 @@ impl Signer { let state = self.state.lock().map_err(|e| { Error::Other(anyhow!("Failed to acquire state lock for serialization: {:?}", e)) })?; - let diff_state = prestate_sketch.diff_state(&state); - let diff_entries: Vec = diff_state.into(); - let diff_value_bytes: usize = diff_entries.iter().map(|e| e.value.len()).sum(); - let diff_key_bytes: usize = diff_entries.iter().map(|e| e.key.len()).sum(); - trace!( - "Signer state diff entries={}, value_bytes={}, key_bytes={}", - diff_entries.len(), - diff_value_bytes, - diff_key_bytes - ); - diff_entries + if force_full_sync { + let full_entries: Vec = state.clone().into(); + let full_value_bytes: usize = full_entries.iter().map(|e| e.value.len()).sum(); + let full_key_bytes: usize = full_entries.iter().map(|e| e.key.len()).sum(); + trace!( + "Signer state full sync entries={}, value_bytes={}, key_bytes={}", + full_entries.len(), + full_value_bytes, + full_key_bytes + ); + full_entries + } else { + let diff_state = prestate_sketch.diff_state(&state); + let diff_entries: Vec = diff_state.into(); + let diff_value_bytes: usize = diff_entries.iter().map(|e| e.value.len()).sum(); + let diff_key_bytes: usize = diff_entries.iter().map(|e| e.key.len()).sum(); + trace!( + "Signer state diff entries={}, value_bytes={}, key_bytes={}", + diff_entries.len(), + diff_value_bytes, + diff_key_bytes + ); + diff_entries + } }; Ok(HsmResponse { raw: response.as_vec(), diff --git a/libs/gl-plugin/src/node/mod.rs b/libs/gl-plugin/src/node/mod.rs index b3899d73d..0026c9c98 100644 --- a/libs/gl-plugin/src/node/mod.rs +++ b/libs/gl-plugin/src/node/mod.rs @@ -16,7 +16,7 @@ use log::{debug, error, info, trace, warn}; use std::path::{Path, PathBuf}; use std::sync::atomic::AtomicBool; use std::sync::{ - atomic::{AtomicUsize, Ordering}, + atomic::{AtomicU64, AtomicUsize, Ordering}, Arc, }; use std::time::Duration; @@ -84,6 +84,7 @@ pub struct PluginNodeServer { rpc_path: PathBuf, events: 
tokio::sync::broadcast::Sender, signer_state: Arc>, + state_sync_epoch: Arc, grpc_binding: String, signer_state_store: Arc>>, pub ctx: crate::context::Context, @@ -128,6 +129,7 @@ impl PluginNodeServer { events, rpc_path: rpc_path.clone(), signer_state: Arc::new(Mutex::new(signer_state)), + state_sync_epoch: Arc::new(AtomicU64::new(0)), signer_state_store: Arc::new(Mutex::new(signer_state_store)), grpc_binding: config.node_grpc_binding, notifications, @@ -324,11 +326,13 @@ impl Node for PluginNodeServer { let (tx, rx) = mpsc::channel(10); let mut stream = self.stage.mystream().await; let signer_state = self.signer_state.clone(); + let state_sync_epoch = self.state_sync_epoch.clone(); let ctx = self.ctx.clone(); tokio::spawn(async move { trace!("hsmd hsm_id={} request processor started", hsm_id); let mut last_sent_sketch = StateSketch::new(); + let mut last_seen_sync_epoch = state_sync_epoch.load(Ordering::SeqCst); { // TODO: Check this edge case @@ -378,6 +382,7 @@ impl Node for PluginNodeServer { log::warn!("Failed to send heartbeat message to signer: {}", e); } else { last_sent_sketch = state_snapshot.sketch(); + last_seen_sync_epoch = state_sync_epoch.load(Ordering::SeqCst); } } @@ -399,13 +404,31 @@ impl Node for PluginNodeServer { ); - // Send only the changes since the last time we sent state to this signer. 
let state_snapshot = signer_state.lock().await.clone(); - let diff_state = last_sent_sketch.diff_state(&state_snapshot); - let diff_entries: Vec = diff_state.clone().into(); + let current_sync_epoch = state_sync_epoch.load(Ordering::SeqCst); + let force_full_sync = current_sync_epoch != last_seen_sync_epoch; + if force_full_sync { + last_seen_sync_epoch = current_sync_epoch; + } + + let outgoing_entries: Vec = if force_full_sync { + trace!( + "Forcing full signer state sync to hsm_id={} request_id={}", + hsm_id, + req.request.request_id + ); + last_sent_sketch = state_snapshot.sketch(); + state_snapshot.clone().into() + } else { + // Send only the changes since the last time we sent state to this signer. + let diff_state = last_sent_sketch.diff_state(&state_snapshot); + let diff_entries: Vec = diff_state.clone().into(); + last_sent_sketch.apply_state(&diff_state); + diff_entries + }; // TODO Consolidate protos in `gl-client` and `gl-plugin`, then remove this map. - let diff_entries: Vec = diff_entries + let outgoing_entries: Vec = outgoing_entries .into_iter() .map(|s| pb::SignerStateEntry { key: s.key, @@ -414,18 +437,29 @@ impl Node for PluginNodeServer { }) .collect(); - let diff_value_bytes: usize = diff_entries.iter().map(|e| e.value.len()).sum(); - let diff_key_bytes: usize = diff_entries.iter().map(|e| e.key.len()).sum(); - trace!( - "Signer state diff to hsm_id={} request_id={} entries={}, value_bytes={}, key_bytes={}", - hsm_id, - req.request.request_id, - diff_entries.len(), - diff_value_bytes, - diff_key_bytes - ); + let value_bytes: usize = outgoing_entries.iter().map(|e| e.value.len()).sum(); + let key_bytes: usize = outgoing_entries.iter().map(|e| e.key.len()).sum(); + if force_full_sync { + trace!( + "Signer state full sync to hsm_id={} request_id={} entries={}, value_bytes={}, key_bytes={}", + hsm_id, + req.request.request_id, + outgoing_entries.len(), + value_bytes, + key_bytes + ); + } else { + trace!( + "Signer state diff to hsm_id={} 
request_id={} entries={}, value_bytes={}, key_bytes={}", + hsm_id, + req.request.request_id, + outgoing_entries.len(), + value_bytes, + key_bytes + ); + } - req.request.signer_state = diff_entries; + req.request.signer_state = outgoing_entries; req.request.requests = ctx.snapshot().await.into_iter().map(|r| r.into()).collect(); let serialized_configure_request = SERIALIZED_CONFIGURE_REQUEST.lock().await; @@ -452,7 +486,6 @@ impl Node for PluginNodeServer { warn!("Error streaming request {:?} to hsm_id={}", e, hsm_id); break; } - last_sent_sketch.apply_state(&diff_state); } info!("Signer hsm_id={} exited", hsm_id); SIGNER_COUNT.fetch_sub(1, Ordering::SeqCst); @@ -493,12 +526,19 @@ impl Node for PluginNodeServer { // Apply state changes to the in-memory state let mut state = self.signer_state.lock().await; - state.merge(&new_state).map_err(|e| { + let merge_res = state.safe_merge(&new_state).map_err(|e| { Status::new( Code::Internal, format!("Error updating internal state: {e}"), ) })?; + if merge_res.has_conflicts() { + warn!( + "State merge conflict detected (count={}), forcing next full sync", + merge_res.conflict_count + ); + self.state_sync_epoch.fetch_add(1, Ordering::SeqCst); + } // Send changes to the signer_state_store for persistence let store = self.signer_state_store.lock().await; From f75e43f7e703b0e11b9932c11a3c3eab7a5dd12b Mon Sep 17 00:00:00 2001 From: Ihor Diachenko Date: Mon, 16 Feb 2026 13:34:00 +0200 Subject: [PATCH 06/17] Tracelog savings over the wire --- libs/gl-client/src/lib.rs | 1 + libs/gl-client/src/metrics.rs | 43 ++++++++++++++++++++++++++++++++ libs/gl-client/src/signer/mod.rs | 26 +++++++++++-------- libs/gl-plugin/src/node/mod.rs | 33 ++++++++++++++---------- 4 files changed, 80 insertions(+), 23 deletions(-) create mode 100644 libs/gl-client/src/metrics.rs diff --git a/libs/gl-client/src/lib.rs b/libs/gl-client/src/lib.rs index e17c825c2..80508bed8 100644 --- a/libs/gl-client/src/lib.rs +++ b/libs/gl-client/src/lib.rs @@ -29,6 +29,7 
@@ pub mod scheduler; pub mod signer; pub mod persist; +pub mod metrics; pub mod lnurl; diff --git a/libs/gl-client/src/metrics.rs b/libs/gl-client/src/metrics.rs new file mode 100644 index 000000000..453db8cfa --- /dev/null +++ b/libs/gl-client/src/metrics.rs @@ -0,0 +1,43 @@ +use prost::Message; + +const HSM_REQUEST_SIGNER_STATE_FIELD_NUMBER: u32 = 4; +const HSM_RESPONSE_SIGNER_STATE_FIELD_NUMBER: u32 = 5; + +fn protobuf_varint_len(mut value: usize) -> usize { + let mut len = 1; + while value >= 0x80 { + value >>= 7; + len += 1; + } + len +} + +fn signer_state_wire_bytes(entries: &[crate::pb::SignerStateEntry], field_number: u32) -> usize { + let field_key = ((field_number << 3) | 2) as usize; // wire type 2 = length-delimited + let field_key_len = protobuf_varint_len(field_key); + entries + .iter() + .map(|entry| { + let entry_len = entry.encoded_len(); + field_key_len + protobuf_varint_len(entry_len) + entry_len + }) + .sum() +} + +pub fn signer_state_request_wire_bytes(entries: &[crate::pb::SignerStateEntry]) -> usize { + signer_state_wire_bytes(entries, HSM_REQUEST_SIGNER_STATE_FIELD_NUMBER) +} + +pub fn signer_state_response_wire_bytes(entries: &[crate::pb::SignerStateEntry]) -> usize { + signer_state_wire_bytes(entries, HSM_RESPONSE_SIGNER_STATE_FIELD_NUMBER) +} + +pub fn savings_percent(full_wire_bytes: usize, diff_wire_bytes: usize) -> usize { + if full_wire_bytes == 0 { + return 0; + } + full_wire_bytes + .saturating_sub(diff_wire_bytes) + .saturating_mul(100) + / full_wire_bytes +} diff --git a/libs/gl-client/src/signer/mod.rs b/libs/gl-client/src/signer/mod.rs index 85941bb1c..f9c22f725 100644 --- a/libs/gl-client/src/signer/mod.rs +++ b/libs/gl-client/src/signer/mod.rs @@ -10,6 +10,9 @@ use crate::pb::PendingRequest; use crate::pb::{node_client::NodeClient, Empty, HsmRequest, HsmRequestContext, HsmResponse}; use crate::runes; use crate::signer::resolve::Resolver; +use crate::metrics::{ + signer_state_response_wire_bytes, savings_percent, +}; use 
crate::tls::TlsConfig; use crate::{node, node::Client}; use anyhow::{anyhow, Result}; @@ -635,25 +638,28 @@ impl Signer { })?; if force_full_sync { let full_entries: Vec = state.clone().into(); - let full_value_bytes: usize = full_entries.iter().map(|e| e.value.len()).sum(); - let full_key_bytes: usize = full_entries.iter().map(|e| e.key.len()).sum(); + let full_wire_bytes = signer_state_response_wire_bytes(&full_entries); trace!( - "Signer state full sync entries={}, value_bytes={}, key_bytes={}", + "Signer state full sync entries={}, wire_bytes={}", full_entries.len(), - full_value_bytes, - full_key_bytes + full_wire_bytes ); full_entries } else { + let full_wire_bytes = { + let full_entries: Vec = state.clone().into(); + signer_state_response_wire_bytes(&full_entries) + }; let diff_state = prestate_sketch.diff_state(&state); let diff_entries: Vec = diff_state.into(); - let diff_value_bytes: usize = diff_entries.iter().map(|e| e.value.len()).sum(); - let diff_key_bytes: usize = diff_entries.iter().map(|e| e.key.len()).sum(); + let diff_wire_bytes = signer_state_response_wire_bytes(&diff_entries); + let saved_percent = savings_percent(full_wire_bytes, diff_wire_bytes); trace!( - "Signer state diff entries={}, value_bytes={}, key_bytes={}", + "Signer state diff entries={}, wire_bytes={}, full_wire_bytes={}, saved {}% bandwidth syncing the state", diff_entries.len(), - diff_value_bytes, - diff_key_bytes + diff_wire_bytes, + full_wire_bytes, + saved_percent ); diff_entries } diff --git a/libs/gl-plugin/src/node/mod.rs b/libs/gl-plugin/src/node/mod.rs index 0026c9c98..0a8dd02d9 100644 --- a/libs/gl-plugin/src/node/mod.rs +++ b/libs/gl-plugin/src/node/mod.rs @@ -8,6 +8,9 @@ use base64::{engine::general_purpose, Engine as _}; use bytes::BufMut; use cln_rpc::Notification; use gl_client::persist::{State, StateSketch}; +use gl_client::metrics::{ + signer_state_request_wire_bytes, savings_percent, +}; use governor::{ clock::MonotonicClock, state::direct::NotKeyed, 
state::InMemoryState, Quota, RateLimiter, }; @@ -349,6 +352,7 @@ impl Node for PluginNodeServer { let state_snapshot = signer_state.lock().await.clone(); let state_entries: Vec = state_snapshot.clone().into(); + let state_wire_bytes = signer_state_request_wire_bytes(&state_entries); let state_entries: Vec = state_entries .into_iter() .map(|s| pb::SignerStateEntry { @@ -357,14 +361,11 @@ impl Node for PluginNodeServer { value: s.value, }) .collect(); - let state_value_bytes: usize = state_entries.iter().map(|e| e.value.len()).sum(); - let state_key_bytes: usize = state_entries.iter().map(|e| e.key.len()).sum(); trace!( - "Signer state heartbeat to hsm_id={} entries={}, value_bytes={}, key_bytes={}", + "Signer state heartbeat to hsm_id={} entries={}, wire_bytes={}", hsm_id, state_entries.len(), - state_value_bytes, - state_key_bytes + state_wire_bytes ); let msg = vls_protocol::msgs::GetHeartbeat {}; use vls_protocol::msgs::SerBolt; @@ -411,6 +412,7 @@ impl Node for PluginNodeServer { last_seen_sync_epoch = current_sync_epoch; } + let mut full_wire_bytes: Option = None; let outgoing_entries: Vec = if force_full_sync { trace!( "Forcing full signer state sync to hsm_id={} request_id={}", @@ -421,11 +423,15 @@ impl Node for PluginNodeServer { state_snapshot.clone().into() } else { // Send only the changes since the last time we sent state to this signer. + let full_entries: Vec = + state_snapshot.clone().into(); + full_wire_bytes = Some(signer_state_request_wire_bytes(&full_entries)); let diff_state = last_sent_sketch.diff_state(&state_snapshot); let diff_entries: Vec = diff_state.clone().into(); last_sent_sketch.apply_state(&diff_state); diff_entries }; + let outgoing_wire_bytes = signer_state_request_wire_bytes(&outgoing_entries); // TODO Consolidate protos in `gl-client` and `gl-plugin`, then remove this map. 
let outgoing_entries: Vec = outgoing_entries @@ -437,25 +443,26 @@ impl Node for PluginNodeServer { }) .collect(); - let value_bytes: usize = outgoing_entries.iter().map(|e| e.value.len()).sum(); - let key_bytes: usize = outgoing_entries.iter().map(|e| e.key.len()).sum(); if force_full_sync { trace!( - "Signer state full sync to hsm_id={} request_id={} entries={}, value_bytes={}, key_bytes={}", + "Signer state full sync to hsm_id={} request_id={} entries={}, wire_bytes={}", hsm_id, req.request.request_id, outgoing_entries.len(), - value_bytes, - key_bytes + outgoing_wire_bytes ); } else { + let full_wire_bytes = full_wire_bytes.unwrap_or(outgoing_wire_bytes); + let saved_percent = + savings_percent(full_wire_bytes, outgoing_wire_bytes); trace!( - "Signer state diff to hsm_id={} request_id={} entries={}, value_bytes={}, key_bytes={}", + "Signer state diff to hsm_id={} request_id={} entries={}, wire_bytes={}, full_wire_bytes={}, saved {}% bandwidth syncing the state", hsm_id, req.request.request_id, outgoing_entries.len(), - value_bytes, - key_bytes + outgoing_wire_bytes, + full_wire_bytes, + saved_percent ); } From 51cf7b5e8d603f1d21185f3fb130d39ee4bc1ad4 Mon Sep 17 00:00:00 2001 From: Ihor Diachenko Date: Mon, 16 Feb 2026 13:38:38 +0200 Subject: [PATCH 07/17] Some refactoring --- libs/gl-client/src/persist.rs | 38 +++++++++++++++++------------------ 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/libs/gl-client/src/persist.rs b/libs/gl-client/src/persist.rs index 155ec69be..c4770192f 100644 --- a/libs/gl-client/src/persist.rs +++ b/libs/gl-client/src/persist.rs @@ -28,14 +28,6 @@ pub struct State { } impl State { - fn put_tombstone(&mut self, live_key: &str) { - let tombstone_key = Self::tombstone_key(live_key); - let version = self.next_version_for_live_key(live_key); - self.values - .insert(tombstone_key, (version, serde_json::Value::Null)); - self.values.remove(live_key); - } - fn insert_node( &mut self, key: &str, @@ -44,8 +36,8 @@ impl State { ) 
-> Result<(), Error> { let node_key = format!("{NODE_PREFIX}/{key}"); let state_key = format!("{NODE_STATE_PREFIX}/{key}"); - let node_version = self.next_version_for_live_key(&node_key); - let state_version = self.next_version_for_live_key(&state_key); + let node_version = self.next_version(&node_key); + let state_version = self.next_version(&state_key); self.values .insert(node_key, (node_version, serde_json::to_value(node_entry).unwrap())); @@ -103,7 +95,7 @@ impl State { channel_entry: vls_persist::model::ChannelEntry, ) -> Result<(), Error> { let key = format!("{CHANNEL_PREFIX}/{key}"); - let version = self.next_version_for_live_key(&key); + let version = self.next_version(&key); self.values .insert(key, (version, serde_json::to_value(channel_entry).unwrap())); Ok(()) @@ -181,7 +173,7 @@ impl State { ) -> Result<(), Error> { let key = hex::encode(node_id.serialize()); let key = format!("{TRACKER_PREFIX}/{key}"); - let version = self.next_version_for_live_key(&key); + let version = self.next_version(&key); let tracker: vls_persist::model::ChainTrackerEntry = tracker.into(); @@ -195,7 +187,15 @@ impl State { Ok(()) } - fn tombstone_key(live_key: &str) -> String { + fn put_tombstone(&mut self, live_key: &str) { + let tombstone_key = Self::tombstone_key(live_key); + let version = self.next_version(live_key); + self.values + .insert(tombstone_key, (version, serde_json::Value::Null)); + self.values.remove(live_key); + } + + fn tombstone_key(live_key: &str) -> String { format!("{TOMBSTONE_PREFIX}/{live_key}") } @@ -208,14 +208,14 @@ impl State { Self::tombstone_target_key(key).is_some() } - fn tombstone_version_for_live_key(&self, live_key: &str) -> Option { + fn tombstone_version(&self, live_key: &str) -> Option { let tombstone_key = Self::tombstone_key(live_key); self.values.get(&tombstone_key).map(|v| v.0) } - fn next_version_for_live_key(&self, live_key: &str) -> u64 { + fn next_version(&self, live_key: &str) -> u64 { let live_ver = 
self.values.get(live_key).map(|v| v.0); - let tombstone_ver = self.tombstone_version_for_live_key(live_key); + let tombstone_ver = self.tombstone_version(live_key); live_ver .into_iter() .chain(tombstone_ver) @@ -226,7 +226,7 @@ impl State { fn is_tombstoned(&self, live_key: &str) -> bool { let live_ver = self.values.get(live_key).map(|v| v.0); - let tombstone_ver = self.tombstone_version_for_live_key(live_key); + let tombstone_ver = self.tombstone_version(live_key); match (live_ver, tombstone_ver) { (_, None) => false, @@ -334,7 +334,7 @@ impl State { } if self - .tombstone_version_for_live_key(key) + .tombstone_version(key) .map(|tomb| tomb >= *newver) .unwrap_or(false) { @@ -679,7 +679,7 @@ impl Persist for MemoryPersister { *v = (v.0 + 1, serde_json::to_value(allowlist).unwrap()); } None => { - let version = state.next_version_for_live_key(&key); + let version = state.next_version(&key); state .values .insert(key, (version, serde_json::to_value(allowlist).unwrap())); From 0c398673ce7835f29435c597a11e6ec318069517 Mon Sep 17 00:00:00 2001 From: Ihor Diachenko Date: Mon, 16 Feb 2026 14:07:17 +0200 Subject: [PATCH 08/17] removed todos --- libs/gl-plugin/src/node/mod.rs | 2 -- libs/proto/glclient/greenlight.proto | 3 +-- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/libs/gl-plugin/src/node/mod.rs b/libs/gl-plugin/src/node/mod.rs index 0a8dd02d9..1abbeb27b 100644 --- a/libs/gl-plugin/src/node/mod.rs +++ b/libs/gl-plugin/src/node/mod.rs @@ -338,8 +338,6 @@ impl Node for PluginNodeServer { let mut last_seen_sync_epoch = state_sync_epoch.load(Ordering::SeqCst); { - // TODO: Check this edge case - // We start by immediately injecting a // vls_protocol::Message::GetHeartbeat. 
This serves two // purposes: already send the initial snapshot of the diff --git a/libs/proto/glclient/greenlight.proto b/libs/proto/glclient/greenlight.proto index 11a524551..76ea1ef12 100644 --- a/libs/proto/glclient/greenlight.proto +++ b/libs/proto/glclient/greenlight.proto @@ -75,7 +75,6 @@ message HsmResponse { // A list of updated key-value-version tuples that is to be // merged into the state tracked by the plugin. repeated SignerStateEntry signer_state = 5; - // TODO: make it diff-aware? // If the signer reported an error, and did therefore not include // `raw`, this is the stringified error, so we can print it in the @@ -233,7 +232,7 @@ message LspInvoiceRequest { // Optional: for discounts/API keys string token = 2; // len=0 => None // Pass-through of cln invoice rpc params - uint64 amount_msat = 3; // 0 => Any + uint64 amount_msat = 3; // 0 => Any string description = 4; string label = 5; } From 88a323f7a11ffef92701162c5aa8788e0435a966 Mon Sep 17 00:00:00 2001 From: Ihor Diachenko Date: Mon, 16 Feb 2026 14:23:56 +0200 Subject: [PATCH 09/17] Readable loop --- libs/gl-plugin/src/node/mod.rs | 64 ++++++++++++++++++++-------------- 1 file changed, 37 insertions(+), 27 deletions(-) diff --git a/libs/gl-plugin/src/node/mod.rs b/libs/gl-plugin/src/node/mod.rs index 1abbeb27b..342e5fb20 100644 --- a/libs/gl-plugin/src/node/mod.rs +++ b/libs/gl-plugin/src/node/mod.rs @@ -336,6 +336,10 @@ impl Node for PluginNodeServer { trace!("hsmd hsm_id={} request processor started", hsm_id); let mut last_sent_sketch = StateSketch::new(); let mut last_seen_sync_epoch = state_sync_epoch.load(Ordering::SeqCst); + enum SyncMode { + Full, + Diff { full_wire_bytes: usize }, + } { // We start by immediately injecting a @@ -410,26 +414,31 @@ impl Node for PluginNodeServer { last_seen_sync_epoch = current_sync_epoch; } - let mut full_wire_bytes: Option = None; - let outgoing_entries: Vec = if force_full_sync { + let (outgoing_entries, sync_mode): ( + Vec, + SyncMode, + ) = if 
force_full_sync { trace!( "Forcing full signer state sync to hsm_id={} request_id={}", hsm_id, req.request.request_id ); last_sent_sketch = state_snapshot.sketch(); - state_snapshot.clone().into() + (state_snapshot.clone().into(), SyncMode::Full) } else { // Send only the changes since the last time we sent state to this signer. let full_entries: Vec = state_snapshot.clone().into(); - full_wire_bytes = Some(signer_state_request_wire_bytes(&full_entries)); + let full_wire_bytes = signer_state_request_wire_bytes(&full_entries); let diff_state = last_sent_sketch.diff_state(&state_snapshot); - let diff_entries: Vec = diff_state.clone().into(); + let diff_entries: Vec = + diff_state.clone().into(); last_sent_sketch.apply_state(&diff_state); - diff_entries + (diff_entries, SyncMode::Diff { full_wire_bytes }) }; + let outgoing_wire_bytes = signer_state_request_wire_bytes(&outgoing_entries); + let outgoing_entry_count = outgoing_entries.len(); // TODO Consolidate protos in `gl-client` and `gl-plugin`, then remove this map. 
let outgoing_entries: Vec = outgoing_entries @@ -441,27 +450,28 @@ impl Node for PluginNodeServer { }) .collect(); - if force_full_sync { - trace!( - "Signer state full sync to hsm_id={} request_id={} entries={}, wire_bytes={}", - hsm_id, - req.request.request_id, - outgoing_entries.len(), - outgoing_wire_bytes - ); - } else { - let full_wire_bytes = full_wire_bytes.unwrap_or(outgoing_wire_bytes); - let saved_percent = - savings_percent(full_wire_bytes, outgoing_wire_bytes); - trace!( - "Signer state diff to hsm_id={} request_id={} entries={}, wire_bytes={}, full_wire_bytes={}, saved {}% bandwidth syncing the state", - hsm_id, - req.request.request_id, - outgoing_entries.len(), - outgoing_wire_bytes, - full_wire_bytes, - saved_percent - ); + match sync_mode { + SyncMode::Full => { + trace!( + "Signer state full sync to hsm_id={} request_id={} entries={}, wire_bytes={}", + hsm_id, + req.request.request_id, + outgoing_entry_count, + outgoing_wire_bytes + ); + } + SyncMode::Diff { full_wire_bytes } => { + let saved_percent = savings_percent(full_wire_bytes, outgoing_wire_bytes); + trace!( + "Signer state diff to hsm_id={} request_id={} entries={}, wire_bytes={}, full_wire_bytes={}, saved {}% bandwidth syncing the state", + hsm_id, + req.request.request_id, + outgoing_entry_count, + outgoing_wire_bytes, + full_wire_bytes, + saved_percent + ); + } } req.request.signer_state = outgoing_entries; From b882ffc8e543400a3b735784c8d099a3ede77f23 Mon Sep 17 00:00:00 2001 From: Ihor Diachenko Date: Mon, 16 Feb 2026 14:41:30 +0200 Subject: [PATCH 10/17] Added todo --- libs/gl-client/src/persist.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/libs/gl-client/src/persist.rs b/libs/gl-client/src/persist.rs index c4770192f..fae3eb72d 100644 --- a/libs/gl-client/src/persist.rs +++ b/libs/gl-client/src/persist.rs @@ -22,6 +22,10 @@ const ALLOWLIST_PREFIX: &str = "allowlists"; const TRACKER_PREFIX: &str = "trackers"; const TOMBSTONE_PREFIX: &str = "tombs"; +/** + * TODO: 
consider tombstone garbage collection strategy if we expect a large number of deletes. + */ + #[derive(Clone, Serialize, Deserialize)] pub struct State { values: BTreeMap, From 7a2a4586e09d640614128065404c990dce49b62c Mon Sep 17 00:00:00 2001 From: Ihor Diachenko Date: Thu, 19 Feb 2026 01:50:33 +0200 Subject: [PATCH 11/17] Return on serialization error --- libs/gl-client/src/signer/mod.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/libs/gl-client/src/signer/mod.rs b/libs/gl-client/src/signer/mod.rs index f9c22f725..942bb4086 100644 --- a/libs/gl-client/src/signer/mod.rs +++ b/libs/gl-client/src/signer/mod.rs @@ -505,8 +505,11 @@ impl Signer { ); } trace!("Processing request {}", hex::encode(&req.raw)); - let log_state = serde_json::to_string(&*state) - .unwrap_or_else(|_| "".to_string()); + let log_state = serde_json::to_string(&*state).map_err(|e| { + Error::Other(anyhow!("Failed to serialize signer state for logging: {:?}", e)) + })?; + + (state.sketch(), log_state, merge_res.has_conflicts()) }; From cf64afa9cf833a079056487fc28b5da6f7a88191 Mon Sep 17 00:00:00 2001 From: Ihor Diachenko Date: Thu, 19 Feb 2026 02:05:24 +0200 Subject: [PATCH 12/17] Refactored persist tests --- libs/gl-client/src/persist.rs | 55 ++++++++++++++++++++++------------- 1 file changed, 34 insertions(+), 21 deletions(-) diff --git a/libs/gl-client/src/persist.rs b/libs/gl-client/src/persist.rs index fae3eb72d..65012f095 100644 --- a/libs/gl-client/src/persist.rs +++ b/libs/gl-client/src/persist.rs @@ -825,6 +825,18 @@ mod tests { State { values } } + fn assert_entry(state: &State, key: &str, expected_version: u64, expected_value: serde_json::Value) { + let (actual_version, actual_value) = state.values.get(key).unwrap_or_else(|| { + panic!("expected state to include key: {key}") + }); + assert_eq!(*actual_version, expected_version); + assert_eq!(actual_value, &expected_value); + } + + fn assert_entry_absent(state: &State, key: &str) { + 
assert!(state.values.get(key).is_none(), "expected state to omit key {key}"); + } + #[test] fn diff_state_only_includes_new_or_newer_entries() { let old = mk_state(vec![ @@ -846,10 +858,10 @@ mod tests { let diff = old.diff_state(&new); assert_eq!(diff.values.len(), 2); - assert_eq!(diff.values.get("k2").unwrap().0, 3); - assert_eq!(diff.values.get("k4").unwrap().0, 0); - assert!(diff.values.get("k1").is_none()); - assert!(diff.values.get("k3").is_none()); + assert_entry(&diff, "k2", 3, json!({"v": 22})); + assert_entry(&diff, "k4", 0, json!({"v": 4})); + assert_entry_absent(&diff, "k1"); + assert_entry_absent(&diff, "k3"); } #[test] @@ -863,8 +875,8 @@ mod tests { let diff = old.diff_state(&new); assert_eq!(diff.values.len(), 2); - assert_eq!(diff.values.get("k1").unwrap().0, 1); - assert_eq!(diff.values.get("k2").unwrap().0, 2); + assert_entry(&diff, "k1", 1, json!({"v": 1})); + assert_entry(&diff, "k2", 2, json!({"v": 2})); } #[test] @@ -898,8 +910,8 @@ mod tests { let diff = sketch.diff_state(&state); assert_eq!(diff.values.len(), 2); - assert_eq!(diff.values.get("a").unwrap().0, 1); - assert_eq!(diff.values.get("b").unwrap().0, 2); + assert_entry(&diff, "a", 1, json!(1)); + assert_entry(&diff, "b", 2, json!(2)); } #[test] @@ -911,8 +923,9 @@ mod tests { let next = mk_state(vec![("a", 2, json!(10)), ("b", 2, json!(20)), ("c", 0, json!(30))]); let first_diff = sketch.diff_state(&next); assert_eq!(first_diff.values.len(), 2); - assert!(first_diff.values.get("a").is_some()); - assert!(first_diff.values.get("c").is_some()); + assert_entry(&first_diff, "a", 2, json!(10)); + assert_entry(&first_diff, "c", 0, json!(30)); + assert_entry_absent(&first_diff, "b"); sketch.apply_state(&first_diff); let second_diff = sketch.diff_state(&next); @@ -928,8 +941,8 @@ mod tests { let res = state.safe_merge(&incoming).unwrap(); - assert!(state.values.get(&live_key).is_none()); - assert_eq!(state.values.get(&tombstone_key).unwrap().0, 3); + assert_entry_absent(&state, &live_key); + 
assert_entry(&state, &tombstone_key, 3, serde_json::Value::Null); assert_eq!(res.conflict_count, 0); } @@ -942,8 +955,8 @@ mod tests { let res = state.safe_merge(&incoming).unwrap(); - assert!(state.values.get(&live_key).is_none()); - assert_eq!(state.values.get(&tombstone_key).unwrap().0, 5); + assert_entry_absent(&state, &live_key); + assert_entry(&state, &tombstone_key, 5, serde_json::Value::Null); assert_eq!(res.conflict_count, 1); } @@ -956,8 +969,8 @@ mod tests { let res = state.safe_merge(&incoming).unwrap(); - assert_eq!(state.values.get(&live_key).unwrap().0, 6); - assert_eq!(state.values.get(&tombstone_key).unwrap().0, 5); + assert_entry(&state, &live_key, 6, json!({"v": 2})); + assert_entry(&state, &tombstone_key, 5, serde_json::Value::Null); assert_eq!(res.conflict_count, 0); } @@ -969,7 +982,7 @@ mod tests { let res = state.safe_merge(&incoming).unwrap(); assert_eq!(res.conflict_count, 1); - assert_eq!(state.values.get("k1").unwrap().0, 5); + assert_entry(&state, "k1", 5, json!({"v": 5})); } #[test] @@ -980,8 +993,8 @@ mod tests { state.delete_channel("abc"); - assert!(state.values.get(&live_key).is_none()); - assert_eq!(state.values.get(&tombstone_key).unwrap().0, 8); + assert_entry_absent(&state, &live_key); + assert_entry(&state, &tombstone_key, 8, serde_json::Value::Null); } #[test] @@ -1010,9 +1023,9 @@ mod tests { (tracker_key, 4u64), (channel_key, 5u64), ] { - assert!(state.values.get(&live_key).is_none()); + assert_entry_absent(&state, &live_key); let tombstone_key = format!("{TOMBSTONE_PREFIX}/{live_key}"); - assert_eq!(state.values.get(&tombstone_key).unwrap().0, old_version + 1); + assert_entry(&state, &tombstone_key, old_version + 1, serde_json::Value::Null); } } } From fd1e9ea30f6626d4477f985d48bf9e741f118462 Mon Sep 17 00:00:00 2001 From: Ihor Diachenko Date: Thu, 19 Feb 2026 03:06:23 +0200 Subject: [PATCH 13/17] Simplified the tombstone logic --- libs/gl-client/src/persist.rs | 201 +++++++++++++--------------------- 1 file changed, 77 
insertions(+), 124 deletions(-) diff --git a/libs/gl-client/src/persist.rs b/libs/gl-client/src/persist.rs index 65012f095..1a4277a89 100644 --- a/libs/gl-client/src/persist.rs +++ b/libs/gl-client/src/persist.rs @@ -20,11 +20,7 @@ const NODE_STATE_PREFIX: &str = "nodestates"; const CHANNEL_PREFIX: &str = "channels"; const ALLOWLIST_PREFIX: &str = "allowlists"; const TRACKER_PREFIX: &str = "trackers"; -const TOMBSTONE_PREFIX: &str = "tombs"; - -/** - * TODO: consider tombstone garbage collection strategy if we expect a large number of deletes. - */ +const TOMBSTONE_VERSION: u64 = u64::MAX; #[derive(Clone, Serialize, Deserialize)] pub struct State { @@ -40,6 +36,8 @@ impl State { ) -> Result<(), Error> { let node_key = format!("{NODE_PREFIX}/{key}"); let state_key = format!("{NODE_STATE_PREFIX}/{key}"); + self.ensure_not_tombstone(&node_key)?; + self.ensure_not_tombstone(&state_key)?; let node_version = self.next_version(&node_key); let state_version = self.next_version(&state_key); @@ -62,6 +60,7 @@ impl State { serde_json::to_string(&node_state).unwrap() ); let key = format!("{NODE_STATE_PREFIX}/{key}"); + self.ensure_not_tombstone(&key)?; let v = self .values .get_mut(&key) @@ -99,6 +98,7 @@ impl State { channel_entry: vls_persist::model::ChannelEntry, ) -> Result<(), Error> { let key = format!("{CHANNEL_PREFIX}/{key}"); + self.ensure_not_tombstone(&key)?; let version = self.next_version(&key); self.values .insert(key, (version, serde_json::to_value(channel_entry).unwrap())); @@ -117,6 +117,7 @@ impl State { ) -> Result<(), Error> { trace!("Updating channel {key}"); let key = format!("{CHANNEL_PREFIX}/{key}"); + self.ensure_not_tombstone(&key)?; let v = self .values .get_mut(&key) @@ -130,7 +131,7 @@ impl State { key: &str, ) -> Result { let key = format!("{CHANNEL_PREFIX}/{key}"); - if self.is_tombstoned(&key) { + if self.is_tombstone(&key) { return Err(Error::Internal(format!("channel {} has been deleted", key))); } let value = self @@ -158,7 +159,7 @@ impl 
State { .values .iter() .filter(|(k, _)| k.starts_with(&prefix)) - .filter(|(k, _)| !self.is_tombstoned(k)) + .filter(|(k, _)| !self.is_tombstone(k)) .map(|(k, v)| { let key = k.split('/').last().unwrap(); let key = vls_persist::model::NodeChannelId(hex::decode(&key).unwrap()); @@ -177,6 +178,7 @@ impl State { ) -> Result<(), Error> { let key = hex::encode(node_id.serialize()); let key = format!("{TRACKER_PREFIX}/{key}"); + self.ensure_not_tombstone(&key)?; let version = self.next_version(&key); let tracker: vls_persist::model::ChainTrackerEntry = tracker.into(); @@ -192,51 +194,29 @@ impl State { } fn put_tombstone(&mut self, live_key: &str) { - let tombstone_key = Self::tombstone_key(live_key); - let version = self.next_version(live_key); self.values - .insert(tombstone_key, (version, serde_json::Value::Null)); - self.values.remove(live_key); - } - - fn tombstone_key(live_key: &str) -> String { - format!("{TOMBSTONE_PREFIX}/{live_key}") - } - - fn tombstone_target_key(key: &str) -> Option<&str> { - key.strip_prefix(TOMBSTONE_PREFIX) - .and_then(|rest| rest.strip_prefix('/')) - } - - fn is_tombstone_key(key: &str) -> bool { - Self::tombstone_target_key(key).is_some() - } - - fn tombstone_version(&self, live_key: &str) -> Option { - let tombstone_key = Self::tombstone_key(live_key); - self.values.get(&tombstone_key).map(|v| v.0) + .insert(live_key.to_owned(), (TOMBSTONE_VERSION, serde_json::Value::Null)); } fn next_version(&self, live_key: &str) -> u64 { - let live_ver = self.values.get(live_key).map(|v| v.0); - let tombstone_ver = self.tombstone_version(live_key); - live_ver - .into_iter() - .chain(tombstone_ver) - .max() - .map(|v| v.saturating_add(1)) + self.values + .get(live_key) + .map(|v| v.0.saturating_add(1)) .unwrap_or(0) } - fn is_tombstoned(&self, live_key: &str) -> bool { - let live_ver = self.values.get(live_key).map(|v| v.0); - let tombstone_ver = self.tombstone_version(live_key); + fn is_tombstone(&self, live_key: &str) -> bool { + self.values + 
.get(live_key) + .map(|v| v.0 == TOMBSTONE_VERSION) + .unwrap_or(false) + } - match (live_ver, tombstone_ver) { - (_, None) => false, - (None, Some(_)) => true, - (Some(live), Some(tomb)) => tomb >= live, + fn ensure_not_tombstone(&self, key: &str) -> Result<(), Error> { + if self.is_tombstone(key) { + return Err(Error::Internal(format!("key {} has been deleted", key))); } + Ok(()) } } @@ -301,64 +281,38 @@ impl State { pub fn safe_merge(&mut self, other: &State) -> anyhow::Result { let mut res = SafeMergeResult::default(); for (key, (newver, newval)) in other.values.iter() { - if Self::is_tombstone_key(key) { - let local = self.values.get_mut(key); - match local { - None => { - trace!("Insert new tombstone key {}: version={}", key, newver); - res.changes.push((key.to_owned(), None, *newver)); - self.values.insert(key.clone(), (*newver, newval.clone())); - } - Some(v) => { - if v.0 == *newver { - // Idempotent re-application, but still enforce live-key cleanup below. - } else if v.0 > *newver { - warn!("Ignoring outdated tombstone version newver={}, we have oldver={}: newval={:?} vs oldval={:?}", newver, v.0, serde_json::to_string(newval), serde_json::to_string(&v.1)); - // Keep the newer local tombstone and still enforce live-key cleanup below. 
- res.conflict_count += 1; - } else { - trace!( - "Updating tombstone key {}: version={} => version={}", - key, - v.0, - *newver - ); - res.changes.push((key.to_owned(), Some(v.0), *newver)); - *v = (*newver, newval.clone()); - } - } - } - - if let Some(live_key) = Self::tombstone_target_key(key) { - if self.is_tombstoned(live_key) { - self.values.remove(live_key); - } - } - continue; - } - - if self - .tombstone_version(key) - .map(|tomb| tomb >= *newver) - .unwrap_or(false) - { - trace!( - "Ignoring live key {} version={} because a tombstone is newer or equal", - key, newver - ); - res.conflict_count += 1; - continue; - } - - let local = self.values.get_mut(key); - - match local { + let incoming_is_tombstone = *newver == TOMBSTONE_VERSION; + match self.values.get_mut(key) { None => { trace!("Insert new key {}: version={}", key, newver); res.changes.push((key.to_owned(), None, *newver)); - self.values.insert(key.clone(), (*newver, newval.clone())); + let value = if incoming_is_tombstone { + serde_json::Value::Null + } else { + newval.clone() + }; + self.values.insert(key.clone(), (*newver, value)); } Some(v) => { + if incoming_is_tombstone { + if v.0 == TOMBSTONE_VERSION { + continue; + } + trace!("Tombstoning key {}: version={} => version={}", key, v.0, newver); + res.changes.push((key.to_owned(), Some(v.0), *newver)); + *v = (*newver, serde_json::Value::Null); + continue; + } + + if v.0 == TOMBSTONE_VERSION { + trace!( + "Ignoring live key {} version={} because local key is tombstoned", + key, newver + ); + res.conflict_count += 1; + continue; + } + if v.0 == *newver { continue; } else if v.0 > *newver { @@ -624,6 +578,7 @@ impl Persist for MemoryPersister { let key = format!("{TRACKER_PREFIX}/{key}"); let mut state = self.state.lock().unwrap(); + state.ensure_not_tombstone(&key)?; let v = state.values.get_mut(&key).unwrap(); let tracker: vls_persist::model::ChainTrackerEntry = tracker.into(); *v = (v.0 + 1, serde_json::to_value(tracker).unwrap()); @@ -645,7 
+600,7 @@ impl Persist for MemoryPersister { let key = format!("{TRACKER_PREFIX}/{key}"); let state = self.state.lock().unwrap(); - if state.is_tombstoned(&key) { + if state.is_tombstone(&key) { return Err(Error::Internal(format!("tracker {} has been deleted", key))); } let v: vls_persist::model::ChainTrackerEntry = @@ -678,6 +633,7 @@ impl Persist for MemoryPersister { let key = format!("{ALLOWLIST_PREFIX}/{key}"); let mut state = self.state.lock().unwrap(); + state.ensure_not_tombstone(&key)?; match state.values.get_mut(&key) { Some(v) => { *v = (v.0 + 1, serde_json::to_value(allowlist).unwrap()); @@ -696,7 +652,7 @@ impl Persist for MemoryPersister { let state = self.state.lock().unwrap(); let key = hex::encode(node_id.serialize()); let key = format!("{ALLOWLIST_PREFIX}/{key}"); - if state.is_tombstoned(&key) { + if state.is_tombstone(&key) { return Ok(Vec::new()); } @@ -719,7 +675,7 @@ impl Persist for MemoryPersister { .values .keys() .filter(|k| k.starts_with(&format!("{NODE_PREFIX}/"))) - .filter(|k| !state.is_tombstoned(k)) + .filter(|k| !state.is_tombstone(k)) .filter_map(|k| k.split('/').last()) .collect(); @@ -729,7 +685,7 @@ impl Persist for MemoryPersister { let state_key = format!("{NODE_STATE_PREFIX}/{node_id}"); let allowlist_key = format!("{ALLOWLIST_PREFIX}/{node_id}"); - if state.is_tombstoned(&node_key) || state.is_tombstoned(&state_key) { + if state.is_tombstone(&node_key) || state.is_tombstone(&state_key) { continue; } @@ -743,7 +699,7 @@ impl Persist for MemoryPersister { }; // Load allowlist, defaulting to empty if not found (for nodes created before VLS 0.14) - let allowlist_strings: Vec = if state.is_tombstoned(&allowlist_key) { + let allowlist_strings: Vec = if state.is_tombstone(&allowlist_key) { Vec::new() } else { match state.values.get(&allowlist_key) { @@ -810,9 +766,11 @@ impl Persist for MemoryPersister { #[cfg(test)] mod tests { + use crate::persist::TOMBSTONE_VERSION; + use super::{ State, StateSketch, ALLOWLIST_PREFIX, 
CHANNEL_PREFIX, NODE_PREFIX, NODE_STATE_PREFIX, - TOMBSTONE_PREFIX, TRACKER_PREFIX, + TRACKER_PREFIX, }; use serde_json::json; use std::collections::BTreeMap; @@ -837,6 +795,10 @@ mod tests { assert!(state.values.get(key).is_none(), "expected state to omit key {key}"); } + fn assert_tombstone(state: &State, key: &str) { + assert_entry(state, key, TOMBSTONE_VERSION, serde_json::Value::Null); + } + #[test] fn diff_state_only_includes_new_or_newer_entries() { let old = mk_state(vec![ @@ -935,43 +897,37 @@ mod tests { #[test] fn merge_tombstone_deletes_older_live_entry() { let live_key = format!("{CHANNEL_PREFIX}/abc"); - let tombstone_key = format!("{TOMBSTONE_PREFIX}/{live_key}"); let mut state = mk_state(vec![(live_key.as_str(), 2, json!({"v": 1}))]); - let incoming = mk_state(vec![(tombstone_key.as_str(), 3, serde_json::Value::Null)]); + let incoming = mk_state(vec![(live_key.as_str(), u64::MAX, serde_json::Value::Null)]); let res = state.safe_merge(&incoming).unwrap(); - assert_entry_absent(&state, &live_key); - assert_entry(&state, &tombstone_key, 3, serde_json::Value::Null); + assert_tombstone(&state, &live_key); assert_eq!(res.conflict_count, 0); } #[test] - fn merge_ignores_live_entry_if_tombstone_is_newer_or_equal() { + fn merge_ignores_live_entry_if_key_is_tombstone() { let live_key = format!("{CHANNEL_PREFIX}/abc"); - let tombstone_key = format!("{TOMBSTONE_PREFIX}/{live_key}"); - let mut state = mk_state(vec![(tombstone_key.as_str(), 5, serde_json::Value::Null)]); + let mut state = mk_state(vec![(live_key.as_str(), u64::MAX, serde_json::Value::Null)]); let incoming = mk_state(vec![(live_key.as_str(), 4, json!({"v": 1}))]); let res = state.safe_merge(&incoming).unwrap(); - assert_entry_absent(&state, &live_key); - assert_entry(&state, &tombstone_key, 5, serde_json::Value::Null); + assert_tombstone(&state, &live_key); assert_eq!(res.conflict_count, 1); } #[test] - fn merge_accepts_live_entry_if_newer_than_tombstone() { + fn 
merge_ignores_newer_live_entry_if_key_is_tombstone() { let live_key = format!("{CHANNEL_PREFIX}/abc"); - let tombstone_key = format!("{TOMBSTONE_PREFIX}/{live_key}"); - let mut state = mk_state(vec![(tombstone_key.as_str(), 5, serde_json::Value::Null)]); + let mut state = mk_state(vec![(live_key.as_str(), u64::MAX, serde_json::Value::Null)]); let incoming = mk_state(vec![(live_key.as_str(), 6, json!({"v": 2}))]); let res = state.safe_merge(&incoming).unwrap(); - assert_entry(&state, &live_key, 6, json!({"v": 2})); - assert_entry(&state, &tombstone_key, 5, serde_json::Value::Null); - assert_eq!(res.conflict_count, 0); + assert_tombstone(&state, &live_key); + assert_eq!(res.conflict_count, 1); } #[test] @@ -986,15 +942,13 @@ mod tests { } #[test] - fn delete_channel_creates_tombstone_and_bumps_version() { + fn delete_channel_marks_channel_with_tombstone_version() { let live_key = format!("{CHANNEL_PREFIX}/abc"); - let tombstone_key = format!("{TOMBSTONE_PREFIX}/{live_key}"); let mut state = mk_state(vec![(live_key.as_str(), 7, json!({"v": 1}))]); state.delete_channel("abc"); - assert_entry_absent(&state, &live_key); - assert_entry(&state, &tombstone_key, 8, serde_json::Value::Null); + assert_tombstone(&state, &live_key); } #[test] @@ -1023,9 +977,8 @@ mod tests { (tracker_key, 4u64), (channel_key, 5u64), ] { - assert_entry_absent(&state, &live_key); - let tombstone_key = format!("{TOMBSTONE_PREFIX}/{live_key}"); - assert_entry(&state, &tombstone_key, old_version + 1, serde_json::Value::Null); + assert_tombstone(&state, &live_key); + assert!(old_version < u64::MAX); } } } From 6878fedc35167f7af45678c6bbee2e5ba0a50cb7 Mon Sep 17 00:00:00 2001 From: Ihor Diachenko Date: Thu, 19 Feb 2026 16:07:54 +0200 Subject: [PATCH 14/17] Omit tombstones on initial sync --- libs/gl-client/src/persist.rs | 25 +++++++++++++++++++++++++ libs/gl-plugin/src/node/mod.rs | 15 ++++++++++----- 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/libs/gl-client/src/persist.rs 
b/libs/gl-client/src/persist.rs index 1a4277a89..4cbfc8433 100644 --- a/libs/gl-client/src/persist.rs +++ b/libs/gl-client/src/persist.rs @@ -378,6 +378,17 @@ impl State { pub fn sketch(&self) -> StateSketch { StateSketch::from_state(self) } + + // Return a copy of the state with tombstoned entries omitted. + pub fn omit_tombstones(&self) -> State { + let values = self + .values + .iter() + .filter(|(_, (version, _))| *version != TOMBSTONE_VERSION) + .map(|(key, value)| (key.clone(), value.clone())) + .collect(); + State { values } + } } #[derive(Clone, Serialize, Deserialize, Debug, Default)] @@ -799,6 +810,20 @@ mod tests { assert_entry(state, key, TOMBSTONE_VERSION, serde_json::Value::Null); } + #[test] + fn omit_tombstones_omits_tombstoned_entries() { + let state = mk_state(vec![ + ("k1", 1, json!({"v": 1})), + ("k2", TOMBSTONE_VERSION, serde_json::Value::Null), + ]); + + let filtered = state.omit_tombstones(); + + assert_eq!(filtered.values.len(), 1); + assert_entry(&filtered, "k1", 1, json!({"v": 1})); + assert_entry_absent(&filtered, "k2"); + } + #[test] fn diff_state_only_includes_new_or_newer_entries() { let old = mk_state(vec![ diff --git a/libs/gl-plugin/src/node/mod.rs b/libs/gl-plugin/src/node/mod.rs index 342e5fb20..da66377c3 100644 --- a/libs/gl-plugin/src/node/mod.rs +++ b/libs/gl-plugin/src/node/mod.rs @@ -352,8 +352,9 @@ impl Node for PluginNodeServer { // the large state with them. 
let state_snapshot = signer_state.lock().await.clone(); - let state_entries: Vec = - state_snapshot.clone().into(); + let state_entries: Vec = state_snapshot + .full_state_without_tombstones() + .into(); let state_wire_bytes = signer_state_request_wire_bytes(&state_entries); let state_entries: Vec = state_entries .into_iter() @@ -424,11 +425,15 @@ impl Node for PluginNodeServer { req.request.request_id ); last_sent_sketch = state_snapshot.sketch(); - (state_snapshot.clone().into(), SyncMode::Full) + ( + state_snapshot.omit_tombstones().into(), + SyncMode::Full, + ) } else { // Send only the changes since the last time we sent state to this signer. - let full_entries: Vec = - state_snapshot.clone().into(); + let full_entries: Vec = state_snapshot + .omit_tombstones() + .into(); let full_wire_bytes = signer_state_request_wire_bytes(&full_entries); let diff_state = last_sent_sketch.diff_state(&state_snapshot); let diff_entries: Vec = From 7c2da8cd52307461c2f4cb3ca6d0521b616bf165 Mon Sep 17 00:00:00 2001 From: Ihor Diachenko Date: Thu, 19 Feb 2026 16:13:51 +0200 Subject: [PATCH 15/17] Fixed build --- libs/gl-plugin/src/node/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/gl-plugin/src/node/mod.rs b/libs/gl-plugin/src/node/mod.rs index da66377c3..604460df9 100644 --- a/libs/gl-plugin/src/node/mod.rs +++ b/libs/gl-plugin/src/node/mod.rs @@ -353,7 +353,7 @@ impl Node for PluginNodeServer { let state_snapshot = signer_state.lock().await.clone(); let state_entries: Vec = state_snapshot - .full_state_without_tombstones() + .omit_tombstones() .into(); let state_wire_bytes = signer_state_request_wire_bytes(&state_entries); let state_entries: Vec = state_entries From f03352b9fc8c403b9c6fbe0a5fd7c820955e25c8 Mon Sep 17 00:00:00 2001 From: Ihor Diachenko Date: Thu, 19 Feb 2026 16:39:30 +0200 Subject: [PATCH 16/17] Avoid resync on conflicts --- libs/gl-client/src/persist.rs | 21 +++----- libs/gl-client/src/signer/mod.rs | 51 +++++++----------- 
libs/gl-plugin/src/node/mod.rs | 91 ++++++++------------------------ 3 files changed, 50 insertions(+), 113 deletions(-) diff --git a/libs/gl-client/src/persist.rs b/libs/gl-client/src/persist.rs index 4cbfc8433..624fb75de 100644 --- a/libs/gl-client/src/persist.rs +++ b/libs/gl-client/src/persist.rs @@ -229,12 +229,12 @@ pub struct StateChange { } #[derive(Debug, Default)] -pub struct SafeMergeResult { +pub struct MergeResult { pub changes: Vec<(String, Option, u64)>, pub conflict_count: usize, } -impl SafeMergeResult { +impl MergeResult { pub fn has_conflicts(&self) -> bool { self.conflict_count > 0 } @@ -278,8 +278,8 @@ impl State { /// /// A conflict means the incoming state is stale or incompatible with local /// tombstone knowledge. Callers may use this signal to trigger a full sync. - pub fn safe_merge(&mut self, other: &State) -> anyhow::Result { - let mut res = SafeMergeResult::default(); + pub fn merge(&mut self, other: &State) -> anyhow::Result { + let mut res = MergeResult::default(); for (key, (newver, newval)) in other.values.iter() { let incoming_is_tombstone = *newver == TOMBSTONE_VERSION; match self.values.get_mut(key) { @@ -335,11 +335,6 @@ impl State { Ok(res) } - /// Backward-compatible merge API. 
- pub fn merge(&mut self, other: &State) -> anyhow::Result, u64)>> { - Ok(self.safe_merge(other)?.changes) - } - pub fn diff(&self, other: &State) -> anyhow::Result> { Ok(other .values @@ -925,7 +920,7 @@ mod tests { let mut state = mk_state(vec![(live_key.as_str(), 2, json!({"v": 1}))]); let incoming = mk_state(vec![(live_key.as_str(), u64::MAX, serde_json::Value::Null)]); - let res = state.safe_merge(&incoming).unwrap(); + let res = state.merge(&incoming).unwrap(); assert_tombstone(&state, &live_key); assert_eq!(res.conflict_count, 0); @@ -937,7 +932,7 @@ mod tests { let mut state = mk_state(vec![(live_key.as_str(), u64::MAX, serde_json::Value::Null)]); let incoming = mk_state(vec![(live_key.as_str(), 4, json!({"v": 1}))]); - let res = state.safe_merge(&incoming).unwrap(); + let res = state.merge(&incoming).unwrap(); assert_tombstone(&state, &live_key); assert_eq!(res.conflict_count, 1); @@ -949,7 +944,7 @@ mod tests { let mut state = mk_state(vec![(live_key.as_str(), u64::MAX, serde_json::Value::Null)]); let incoming = mk_state(vec![(live_key.as_str(), 6, json!({"v": 2}))]); - let res = state.safe_merge(&incoming).unwrap(); + let res = state.merge(&incoming).unwrap(); assert_tombstone(&state, &live_key); assert_eq!(res.conflict_count, 1); @@ -960,7 +955,7 @@ mod tests { let mut state = mk_state(vec![("k1", 5, json!({"v": 5}))]); let incoming = mk_state(vec![("k1", 4, json!({"v": 4}))]); - let res = state.safe_merge(&incoming).unwrap(); + let res = state.merge(&incoming).unwrap(); assert_eq!(res.conflict_count, 1); assert_entry(&state, "k1", 5, json!({"v": 5})); diff --git a/libs/gl-client/src/signer/mod.rs b/libs/gl-client/src/signer/mod.rs index 942bb4086..d5522f667 100644 --- a/libs/gl-client/src/signer/mod.rs +++ b/libs/gl-client/src/signer/mod.rs @@ -490,17 +490,17 @@ impl Signer { debug!("Processing request {:?}", req); let diff: crate::persist::State = req.signer_state.clone().into(); - let (prestate_sketch, prestate_log, force_full_sync) = { + let 
(prestate_sketch, prestate_log) = { debug!("Updating local signer state with state from node"); let mut state = self.state.lock().map_err(|e| { Error::Other(anyhow!("Failed to acquire state lock: {:?}", e)) })?; - let merge_res = state.safe_merge(&diff).map_err(|e| { + let merge_res = state.merge(&diff).map_err(|e| { Error::Other(anyhow!("Failed to merge signer state: {:?}", e)) })?; if merge_res.has_conflicts() { - warn!( - "State merge conflict detected (count={}), forcing full signer state sync", + debug!( + "State merge ignored stale versions (count={})", merge_res.conflict_count ); } @@ -510,7 +510,7 @@ impl Signer { })?; - (state.sketch(), log_state, merge_res.has_conflicts()) + (state.sketch(), log_state) }; // The first two bytes represent the message type. Check that @@ -639,33 +639,22 @@ impl Signer { let state = self.state.lock().map_err(|e| { Error::Other(anyhow!("Failed to acquire state lock for serialization: {:?}", e)) })?; - if force_full_sync { + let full_wire_bytes = { let full_entries: Vec = state.clone().into(); - let full_wire_bytes = signer_state_response_wire_bytes(&full_entries); - trace!( - "Signer state full sync entries={}, wire_bytes={}", - full_entries.len(), - full_wire_bytes - ); - full_entries - } else { - let full_wire_bytes = { - let full_entries: Vec = state.clone().into(); - signer_state_response_wire_bytes(&full_entries) - }; - let diff_state = prestate_sketch.diff_state(&state); - let diff_entries: Vec = diff_state.into(); - let diff_wire_bytes = signer_state_response_wire_bytes(&diff_entries); - let saved_percent = savings_percent(full_wire_bytes, diff_wire_bytes); - trace!( - "Signer state diff entries={}, wire_bytes={}, full_wire_bytes={}, saved {}% bandwidth syncing the state", - diff_entries.len(), - diff_wire_bytes, - full_wire_bytes, - saved_percent - ); - diff_entries - } + signer_state_response_wire_bytes(&full_entries) + }; + let diff_state = prestate_sketch.diff_state(&state); + let diff_entries: Vec = 
diff_state.into(); + let diff_wire_bytes = signer_state_response_wire_bytes(&diff_entries); + let saved_percent = savings_percent(full_wire_bytes, diff_wire_bytes); + trace!( + "Signer state diff entries={}, wire_bytes={}, full_wire_bytes={}, saved {}% bandwidth syncing the state", + diff_entries.len(), + diff_wire_bytes, + full_wire_bytes, + saved_percent + ); + diff_entries }; Ok(HsmResponse { raw: response.as_vec(), diff --git a/libs/gl-plugin/src/node/mod.rs b/libs/gl-plugin/src/node/mod.rs index 604460df9..4539b629e 100644 --- a/libs/gl-plugin/src/node/mod.rs +++ b/libs/gl-plugin/src/node/mod.rs @@ -19,7 +19,7 @@ use log::{debug, error, info, trace, warn}; use std::path::{Path, PathBuf}; use std::sync::atomic::AtomicBool; use std::sync::{ - atomic::{AtomicU64, AtomicUsize, Ordering}, + atomic::{AtomicUsize, Ordering}, Arc, }; use std::time::Duration; @@ -87,7 +87,6 @@ pub struct PluginNodeServer { rpc_path: PathBuf, events: tokio::sync::broadcast::Sender, signer_state: Arc>, - state_sync_epoch: Arc, grpc_binding: String, signer_state_store: Arc>>, pub ctx: crate::context::Context, @@ -132,7 +131,6 @@ impl PluginNodeServer { events, rpc_path: rpc_path.clone(), signer_state: Arc::new(Mutex::new(signer_state)), - state_sync_epoch: Arc::new(AtomicU64::new(0)), signer_state_store: Arc::new(Mutex::new(signer_state_store)), grpc_binding: config.node_grpc_binding, notifications, @@ -329,17 +327,11 @@ impl Node for PluginNodeServer { let (tx, rx) = mpsc::channel(10); let mut stream = self.stage.mystream().await; let signer_state = self.signer_state.clone(); - let state_sync_epoch = self.state_sync_epoch.clone(); let ctx = self.ctx.clone(); tokio::spawn(async move { trace!("hsmd hsm_id={} request processor started", hsm_id); let mut last_sent_sketch = StateSketch::new(); - let mut last_seen_sync_epoch = state_sync_epoch.load(Ordering::SeqCst); - enum SyncMode { - Full, - Diff { full_wire_bytes: usize }, - } { // We start by immediately injecting a @@ -386,7 +378,6 @@ 
impl Node for PluginNodeServer { log::warn!("Failed to send heartbeat message to signer: {}", e); } else { last_sent_sketch = state_snapshot.sketch(); - last_seen_sync_epoch = state_sync_epoch.load(Ordering::SeqCst); } } @@ -409,38 +400,14 @@ impl Node for PluginNodeServer { let state_snapshot = signer_state.lock().await.clone(); - let current_sync_epoch = state_sync_epoch.load(Ordering::SeqCst); - let force_full_sync = current_sync_epoch != last_seen_sync_epoch; - if force_full_sync { - last_seen_sync_epoch = current_sync_epoch; - } - - let (outgoing_entries, sync_mode): ( - Vec, - SyncMode, - ) = if force_full_sync { - trace!( - "Forcing full signer state sync to hsm_id={} request_id={}", - hsm_id, - req.request.request_id - ); - last_sent_sketch = state_snapshot.sketch(); - ( - state_snapshot.omit_tombstones().into(), - SyncMode::Full, - ) - } else { - // Send only the changes since the last time we sent state to this signer. - let full_entries: Vec = state_snapshot - .omit_tombstones() - .into(); - let full_wire_bytes = signer_state_request_wire_bytes(&full_entries); - let diff_state = last_sent_sketch.diff_state(&state_snapshot); - let diff_entries: Vec = - diff_state.clone().into(); - last_sent_sketch.apply_state(&diff_state); - (diff_entries, SyncMode::Diff { full_wire_bytes }) - }; + // Send only the changes since the last time we sent state to this signer. 
+ let full_entries: Vec = + state_snapshot.omit_tombstones().into(); + let full_wire_bytes = signer_state_request_wire_bytes(&full_entries); + let diff_state = last_sent_sketch.diff_state(&state_snapshot); + let outgoing_entries: Vec = + diff_state.clone().into(); + last_sent_sketch.apply_state(&diff_state); let outgoing_wire_bytes = signer_state_request_wire_bytes(&outgoing_entries); let outgoing_entry_count = outgoing_entries.len(); @@ -455,29 +422,16 @@ impl Node for PluginNodeServer { }) .collect(); - match sync_mode { - SyncMode::Full => { - trace!( - "Signer state full sync to hsm_id={} request_id={} entries={}, wire_bytes={}", - hsm_id, - req.request.request_id, - outgoing_entry_count, - outgoing_wire_bytes - ); - } - SyncMode::Diff { full_wire_bytes } => { - let saved_percent = savings_percent(full_wire_bytes, outgoing_wire_bytes); - trace!( - "Signer state diff to hsm_id={} request_id={} entries={}, wire_bytes={}, full_wire_bytes={}, saved {}% bandwidth syncing the state", - hsm_id, - req.request.request_id, - outgoing_entry_count, - outgoing_wire_bytes, - full_wire_bytes, - saved_percent - ); - } - } + let saved_percent = savings_percent(full_wire_bytes, outgoing_wire_bytes); + trace!( + "Signer state diff to hsm_id={} request_id={} entries={}, wire_bytes={}, full_wire_bytes={}, saved {}% bandwidth syncing the state", + hsm_id, + req.request.request_id, + outgoing_entry_count, + outgoing_wire_bytes, + full_wire_bytes, + saved_percent + ); req.request.signer_state = outgoing_entries; req.request.requests = ctx.snapshot().await.into_iter().map(|r| r.into()).collect(); @@ -546,18 +500,17 @@ impl Node for PluginNodeServer { // Apply state changes to the in-memory state let mut state = self.signer_state.lock().await; - let merge_res = state.safe_merge(&new_state).map_err(|e| { + let merge_res = state.merge(&new_state).map_err(|e| { Status::new( Code::Internal, format!("Error updating internal state: {e}"), ) })?; if merge_res.has_conflicts() { - warn!( - 
"State merge conflict detected (count={}), forcing next full sync", + debug!( + "State merge ignored stale versions (count={})", merge_res.conflict_count ); - self.state_sync_epoch.fetch_add(1, Ordering::SeqCst); } // Send changes to the signer_state_store for persistence From d920d02327e02942148fa8d2d7b23734eead64dd Mon Sep 17 00:00:00 2001 From: Ihor Diachenko Date: Thu, 19 Feb 2026 16:55:06 +0200 Subject: [PATCH 17/17] Minor tweaks --- libs/gl-plugin/src/node/mod.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/libs/gl-plugin/src/node/mod.rs b/libs/gl-plugin/src/node/mod.rs index 4539b629e..a9593e662 100644 --- a/libs/gl-plugin/src/node/mod.rs +++ b/libs/gl-plugin/src/node/mod.rs @@ -400,17 +400,18 @@ impl Node for PluginNodeServer { let state_snapshot = signer_state.lock().await.clone(); - // Send only the changes since the last time we sent state to this signer. + // Estimate the size of the full state to calculate the bandwidth savings of sending diffs let full_entries: Vec = state_snapshot.omit_tombstones().into(); let full_wire_bytes = signer_state_request_wire_bytes(&full_entries); + + // Send only the changes since the last time we sent state to this signer. let diff_state = last_sent_sketch.diff_state(&state_snapshot); let outgoing_entries: Vec = diff_state.clone().into(); - last_sent_sketch.apply_state(&diff_state); - let outgoing_wire_bytes = signer_state_request_wire_bytes(&outgoing_entries); let outgoing_entry_count = outgoing_entries.len(); + last_sent_sketch.apply_state(&diff_state); // TODO Consolidate protos in `gl-client` and `gl-plugin`, then remove this map. let outgoing_entries: Vec = outgoing_entries