
Commit 92f073c

Bootstrap and replication fixes (#259)
* bootstrap and replication fixes
* update the doc
* Post review fixes
1 parent 38fb84b commit 92f073c

14 files changed (+807, -73 lines changed)
Lines changed: 224 additions & 0 deletions
@@ -0,0 +1,224 @@
# P2P Replication + Routing Eligibility Changes (2026-01-13)

This document describes all changes in `https://github.com/LumeraProtocol/supernode/pull/259` that affect the P2P/Kademlia subsystem, including the replication/backlog fixes and routing-table eligibility gating.

## Context / Symptoms Observed

These changes were prompted by two production symptoms:

1. P2P metrics error (status/telemetry path):
   - Log: `failed to get p2p records count`
   - Error: `failed to get count of records: context deadline exceeded`
   - Typical attributes included `host=<supernode identity>`, `service=<ip:port>`

2. Replication producing huge key payloads (replication worker path):
   - Log: `replicate batching keys`
   - Example attributes:
     - `batch_size=5000`
     - `keys=15934902`
     - `batches=3187`
     - `correlation_id=supernode-start`
     - `module=p2p`
     - `rep-id=<peer supernode account>`
     - `rep-ip=<peer ip>`

Historically, replication had a bug that prevented progress when payloads became oversized. Batching fixed the request size, but it exposed a large backlog catch-up scenario in which a single replication cycle attempted to send millions of keys.

## High-Level Outcomes

The combined changes aim to:

- Make replication “catch-up” bounded and incremental (steady progress instead of huge one-shot attempts).
- Reduce replication complexity (avoid per-peer re-scans of the same key set).
- Advance replication cursors safely (avoid skipping keys when operating in bounded windows).
- Reduce noisy error logging for best-effort DB metrics under load.
- Gate routing table membership to chain-active supernodes (avoid non-active peers participating in routing and replication decisions).

## Change Inventory (Files Modified)

The following files are modified:

- `p2p/kademlia/bootstrap.go`
- `p2p/kademlia/dht.go`
- `p2p/kademlia/node_activity.go`
- `p2p/kademlia/redundant_data.go`
- `p2p/kademlia/replication.go`
- `p2p/kademlia/store.go`
- `p2p/kademlia/store/mem/mem.go`
- `p2p/kademlia/store/sqlite/replication.go`
- `p2p/kademlia/store/sqlite/sqlite.go`

## Detailed Changes

### 1) Routing table eligibility gating (chain-active only)

#### `p2p/kademlia/bootstrap.go`

**What changed**

- `loadBootstrapCandidatesFromChain(...)` now returns two values:
  1) `map[string]*Node` candidates keyed by `ip:port`
  2) `map[[32]byte]struct{}` allowlist of *active* supernode IDs, stored as `blake3(supernodeAccount)` keys
- The supernode account string is trimmed, validated non-empty, and used consistently as `node.ID`.
- `SyncBootstrapOnce(...)` now:
  - calls `setRoutingAllowlist(...)` with the active ID hash-set
  - calls `pruneIneligibleRoutingPeers(...)` to remove already-admitted peers that are now ineligible

**Why**

- Kademlia routing decisions (closest nodes) should only consider chain-active supernodes. Without this, postponed/disabled/stopped nodes can:
  - be admitted via inbound traffic
  - appear in `FindNodeResponse.Closest`
  - skew replication responsibility calculations

#### `p2p/kademlia/dht.go`

**What changed**

- Added an in-memory allowlist gate to `DHT`:
  - `routingAllow map[[32]byte]struct{}`
  - `routingAllowReady atomic.Bool`
  - `routingAllowCount atomic.Int64`
  - guarded by `routingAllowMu`
- New helpers:
  - `setRoutingAllowlist(ctx, allow)`:
    - refuses empty allowlists (to avoid locking out due to transient chain issues)
    - no-ops when `INTEGRATION_TEST=true`
  - `eligibleForRouting(node)`:
    - returns true when allowlist isn’t ready (bootstrap safety)
    - returns true when `INTEGRATION_TEST=true`
    - otherwise checks `blake3(node.ID)` membership in the allowlist
  - `filterEligibleNodes(nodes)` filters node slices returned by the network (FindNode/FindValue paths)
  - `pruneIneligibleRoutingPeers(ctx)` walks the current routing table and removes ineligible nodes
- Integrated eligibility checks into:
  - iterative lookup handling: `nl.AddNodes(s.filterEligibleNodes(v.Closest))`
  - response handling for `FindNode`/`StoreData`/`FindValue`
  - `addNode(...)`:
    - early nil/empty-ID returns
    - invalid IP rejection log level reduced to Debug
    - chain-state gating: rejects peers not in allowlist
  - `addKnownNodes(...)`:
    - skips nil/empty-ID nodes
    - skips nodes failing `eligibleForRouting(...)`

**Why**

- Prevent non-active peers from entering buckets, appearing in closest-sets, or being considered as replication targets.
- Keep performance acceptable by keeping gating as a fast memory lookup, updated on the bootstrap refresh cadence.

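A minimal sketch of the allowlist gate described above, under stated assumptions: `dhtGate`, `node`, and the lock type are illustrative stand-ins, the real methods also take `ctx` and honor `INTEGRATION_TEST=true`, and only `utils.Blake3Hash` is taken from the bootstrap diff further down.

```go
package kademlia

import (
	"sync"
	"sync/atomic"

	"github.com/LumeraProtocol/supernode/v2/pkg/utils"
)

// Sketch only: trimmed-down view of the fields listed above; the real DHT
// struct carries many more fields and the real helpers also short-circuit
// when INTEGRATION_TEST=true.
type dhtGate struct {
	routingAllowMu    sync.RWMutex
	routingAllow      map[[32]byte]struct{}
	routingAllowReady atomic.Bool
	routingAllowCount atomic.Int64
}

type node struct{ ID []byte }

func (g *dhtGate) setRoutingAllowlist(allow map[[32]byte]struct{}) {
	if len(allow) == 0 {
		return // refuse empty allowlists: a transient chain issue must not evict everyone
	}
	g.routingAllowMu.Lock()
	g.routingAllow = allow
	g.routingAllowMu.Unlock()
	g.routingAllowCount.Store(int64(len(allow)))
	g.routingAllowReady.Store(true)
}

func (g *dhtGate) eligibleForRouting(n *node) bool {
	if n == nil || len(n.ID) == 0 {
		return false
	}
	if !g.routingAllowReady.Load() {
		return true // bootstrap safety: allow everything before the first chain sync
	}
	h, err := utils.Blake3Hash(n.ID)
	if err != nil || len(h) != 32 {
		return false
	}
	var key [32]byte
	copy(key[:], h)
	g.routingAllowMu.RLock()
	defer g.routingAllowMu.RUnlock()
	_, ok := g.routingAllow[key]
	return ok
}

// filterEligibleNodes keeps only chain-active peers from a network response.
func (g *dhtGate) filterEligibleNodes(nodes []*node) []*node {
	out := make([]*node, 0, len(nodes))
	for _, n := range nodes {
		if g.eligibleForRouting(n) {
			out = append(out, n)
		}
	}
	return out
}
```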
#### `p2p/kademlia/node_activity.go`

**What changed**

- Node activity checks now skip pinging/promoting nodes that are not eligible for routing.
- If a node is currently marked Active but becomes ineligible:
  - it is removed from routing (`removeNode`)
  - replication info is updated to inactive via `store.UpdateIsActive(..., false, false)`
- `handlePingSuccess` now also refuses to “promote” an ineligible node to active/routing.

**Why**

- Avoid spending cycles on peers that the chain says should not participate.
- Avoid “re-activating” a peer solely due to network reachability when it’s ineligible by chain state.

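A minimal sketch of the activity-check flow described above; every identifier here (`checkPeerActivity`, `activityPeer`, the injected callbacks) is an illustrative stand-in rather than the real worker's API.

```go
package kademlia

import "context"

// Sketch only: the callbacks mirror the behavior described above
// (eligibleForRouting, removeNode, store.UpdateIsActive, ping/promote).
type activityPeer struct {
	ID     []byte
	Active bool
}

func checkPeerActivity(
	ctx context.Context,
	peers []activityPeer,
	eligible func(id []byte) bool,
	remove func(ctx context.Context, id []byte),
	markInactive func(ctx context.Context, id []byte),
	ping func(ctx context.Context, id []byte) bool,
	promote func(ctx context.Context, id []byte),
) {
	for _, p := range peers {
		if !eligible(p.ID) {
			if p.Active {
				remove(ctx, p.ID)       // drop from routing buckets
				markInactive(ctx, p.ID) // e.g. UpdateIsActive(..., false, false)
			}
			continue // never ping or promote ineligible peers
		}
		if ping(ctx, p.ID) {
			promote(ctx, p.ID) // handlePingSuccess-style promotion, eligible peers only
		}
	}
}
```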

### 2) Replication backlog + oversized payload mitigation (bounded windows + one-pass assignment)

#### Root-cause (before)

- Replication used a time window `[globalFrom, now]` and fetched *all* keys from sqlite in that window.
- For each peer, it then:
  - found the slice index for that peer’s `lastReplicatedAt` within the global key list, then
  - iterated over all remaining keys to filter “keys this peer should hold”.
- After batching was added, a peer could still be responsible for millions of keys, which became thousands of batch RPCs.
- Cursor update (`lastReplicatedAt`) happened only after all batches succeeded; a single failure in thousands of batches meant zero progress and repeated attempts next cycle.

#### `p2p/kademlia/store.go`

**What changed**

- Store interface method signature changed:
  - from: `GetKeysForReplication(ctx, from, to)`
  - to: `GetKeysForReplication(ctx, from, to, maxKeys int)` where `maxKeys <= 0` means “unlimited”.

**Why**

- Allows callers (replication worker) to bound the per-cycle key scan to a manageable size.

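A trimmed sketch of the changed method, assuming an illustrative `ReplicationKey` return type; the real `Store` interface has more methods and may use different key/return types.

```go
package kademlia

import (
	"context"
	"time"
)

// Sketch only: illustrates the signature change described above.
type ReplicationKey struct {
	Key       []byte
	CreatedAt time.Time
}

type Store interface {
	// GetKeysForReplication returns keys in the (from, to] window ordered by
	// (createdAt, key). maxKeys <= 0 means "unlimited"; a positive value
	// bounds the per-cycle scan so replication stays incremental.
	GetKeysForReplication(ctx context.Context, from, to time.Time, maxKeys int) []ReplicationKey
}
```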
#### `p2p/kademlia/store/sqlite/replication.go`

**What changed**

- Added support for bounded key scans:
  - Base query now orders deterministically: `ORDER BY createdAt ASC, key ASC`
  - When `maxKeys > 0`, a `LIMIT ?` is applied.
  - If the limit is hit, the query is extended to include all additional rows where `createdAt` equals the last row’s `createdAt` (by fetching `createdAt = boundAt AND key > boundKey`).
  - `ctx.Err()` is checked post-query; cancellation/deadline is treated as failure (returns nil).

**Why**

- Ordering by `(createdAt, key)` provides a stable cursor for a bounded window.
- Including “same `createdAt`” rows at the limit boundary prevents the replication cursor from skipping keys that share the same timestamp.

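A rough sketch of the bounded scan plus the same-`createdAt` extension, assuming a `data(key, createdAt)` schema and string keys; the real query text, types, and error handling (including the "return nil on context error" behavior) differ in detail.

```go
package sqlite

import (
	"context"
	"database/sql"
	"time"
)

// Sketch only: simplified row type and scanning helper.
type replicationKey struct {
	Key       string
	CreatedAt time.Time
}

func scanKeys(ctx context.Context, db *sql.DB, q string, args ...any) ([]replicationKey, error) {
	rows, err := db.QueryContext(ctx, q, args...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var out []replicationKey
	for rows.Next() {
		var k replicationKey
		if err := rows.Scan(&k.Key, &k.CreatedAt); err != nil {
			return nil, err
		}
		out = append(out, k)
	}
	return out, rows.Err()
}

// getKeysForReplication returns keys ordered by (createdAt, key), optionally
// bounded by maxKeys. The real implementation additionally checks ctx.Err()
// after the query and returns nil on cancellation/deadline.
func getKeysForReplication(ctx context.Context, db *sql.DB, from, to time.Time, maxKeys int) ([]replicationKey, error) {
	q := `SELECT key, createdAt FROM data WHERE createdAt > ? AND createdAt <= ? ORDER BY createdAt ASC, key ASC`
	args := []any{from, to}
	if maxKeys > 0 {
		q += ` LIMIT ?`
		args = append(args, maxKeys)
	}
	keys, err := scanKeys(ctx, db, q, args...)
	if err != nil {
		return nil, err
	}
	// If the LIMIT was hit, also fetch the remaining rows that share the last
	// row's createdAt, so a cursor set to that timestamp never skips keys.
	if maxKeys > 0 && len(keys) == maxKeys {
		last := keys[len(keys)-1]
		extra, err := scanKeys(ctx, db,
			`SELECT key, createdAt FROM data WHERE createdAt = ? AND key > ? ORDER BY key ASC`,
			last.CreatedAt, last.Key)
		if err != nil {
			return nil, err
		}
		keys = append(keys, extra...)
	}
	return keys, nil
}
```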
#### `p2p/kademlia/store/mem/mem.go`

**What changed**

- Updated `GetKeysForReplication` signature to match the interface (`maxKeys` unused).

#### `p2p/kademlia/replication.go`

**What changed**

- Introduced `replicateKeysScanMax = 200000` to bound the per-cycle DB scan.
- Replication now fetches:
  - `replicationKeys := store.GetKeysForReplication(globalFrom, to, replicateKeysScanMax)`
- Defines a stable replication “window end”:
  - `windowEnd = lastKey.CreatedAt` if keys exist, else `windowEnd = to`
- Replaces per-peer filtering loops with a one-pass assignment (sketched below):
  - Build `peerStart[peerID] = lastReplicatedAt (or historicStart)`
  - For each key:
    - compute `closestIDs := closestContactsWithIncludingNode(Alpha, key, ignores, self)`
    - for each closest ID that is an active peer, if `key.CreatedAt.After(peerStart[id])`, append the key to `assignedKeys[id]`
  - Each active peer then:
    - sends `assignedKeys[peerID]` in 5000-key batches (existing batching)
    - on full success, advances `lastReplicatedAt` to `windowEnd` (not `time.Now()`)
    - if no keys are assigned for that peer, still advances `lastReplicatedAt` to `windowEnd`
- `adjustNodeKeys` switched to the new `GetKeysForReplication` signature (unbounded).

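A minimal sketch of the one-pass assignment referenced above, with helpers injected as function parameters. All names (`replicateOnce`, `KeyRecord`, `Peer`, the `alpha` value) are illustrative assumptions, not the real worker's API.

```go
package replication

import (
	"context"
	"time"
)

// Sketch only: simplified types; the real DHT types and helpers differ.
type KeyRecord struct {
	Key       []byte
	CreatedAt time.Time
}

type Peer struct {
	ID               string
	LastReplicatedAt time.Time
}

const (
	replicateKeysScanMax = 200000
	batchSize            = 5000
	alpha                = 6 // illustrative; the real code uses the DHT's Alpha constant
)

// replicateOnce bounds the scan, assigns each key once to its closest active
// peers, and advances every peer's cursor to windowEnd (what was scanned).
func replicateOnce(
	ctx context.Context,
	peers []Peer,
	globalFrom, to time.Time,
	getKeys func(ctx context.Context, from, to time.Time, maxKeys int) []KeyRecord,
	closestIDs func(n int, key []byte) []string,
	sendBatch func(ctx context.Context, peerID string, keys [][]byte) error,
	setCursor func(ctx context.Context, peerID string, t time.Time) error,
) {
	keys := getKeys(ctx, globalFrom, to, replicateKeysScanMax)

	windowEnd := to
	if len(keys) > 0 {
		windowEnd = keys[len(keys)-1].CreatedAt
	}

	// peerStart: where each active peer's replication cursor currently sits.
	peerStart := make(map[string]time.Time, len(peers))
	for _, p := range peers {
		peerStart[p.ID] = p.LastReplicatedAt
	}

	// One pass over the scanned keys instead of one pass per peer.
	assigned := make(map[string][][]byte)
	for _, k := range keys {
		for _, id := range closestIDs(alpha, k.Key) {
			if start, ok := peerStart[id]; ok && k.CreatedAt.After(start) {
				assigned[id] = append(assigned[id], k.Key)
			}
		}
	}

	for _, p := range peers {
		ok := true
		for i := 0; i < len(assigned[p.ID]); i += batchSize {
			end := i + batchSize
			if end > len(assigned[p.ID]) {
				end = len(assigned[p.ID])
			}
			if err := sendBatch(ctx, p.ID, assigned[p.ID][i:end]); err != nil {
				ok = false // keep the old cursor; retry this window next cycle
				break
			}
		}
		if ok {
			_ = setCursor(ctx, p.ID, windowEnd) // advance to windowEnd, not time.Now()
		}
	}
}
```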
**Why**

- Bounding the scan turns replication into incremental progress rather than an unbounded backlog dump.
- `windowEnd` ensures cursor advancement is aligned to what was actually scanned/processed.
- One-pass assignment reduces CPU/memory pressure from O(peers * keys) scanning to O(keys * Alpha) assignment.

**Operational expectations**

- Nodes with large backlogs will now “catch up” over multiple intervals (e.g., 16M keys at 200k keys/cycle ≈ 80 cycles).
- Per-interval replication logs should show materially smaller `keys` and `batches` counts.

### 3) Redundant data cleanup worker compatibility

#### `p2p/kademlia/redundant_data.go`

- Updated the call to `GetKeysForReplication(..., 0)`, where `0` means “unlimited”, in the redundant-data cleanup path.

### 4) P2P DB stats logging noise reduction

#### `p2p/kademlia/store/sqlite/sqlite.go`

**What changed**

- When `Store.Count(ctx)` fails inside `Store.Stats(ctx)`:
  - If the error is `context.DeadlineExceeded` or `context.Canceled`, it logs at Info instead of Error.
  - Other errors remain Error.

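A small sketch of the intended log-level split, modeled on the `logtrace.Debug(ctx, msg, fields)` shape visible in the bootstrap diff below; `logtrace.Info`/`logtrace.Error` with the same signature and the helper name are assumptions.

```go
package sqlite

import (
	"context"
	"errors"

	"github.com/LumeraProtocol/supernode/v2/pkg/logtrace"
)

// logCountError is a sketch: expected context errors from the best-effort
// COUNT(*) are logged at Info, everything else stays at Error.
func logCountError(ctx context.Context, err error) {
	fields := logtrace.Fields{
		logtrace.FieldModule: "p2p",
		logtrace.FieldError:  err.Error(),
	}
	if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
		logtrace.Info(ctx, "failed to get p2p records count", fields)
		return
	}
	logtrace.Error(ctx, "failed to get p2p records count", fields)
}
```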
**Why**

- In production the count query (`SELECT COUNT(*) FROM data`) can legitimately time out under heavy DB load or when metrics collection has a short deadline.
- The count is typically a best-effort metric; logging it as ERROR creates noise and can mask real faults.

## Notes / Known Limitations

- Replication assignment uses the current routing table’s `closestContactsWithIncludingNode(...)`. If an otherwise active peer is not present in routing-table-derived closest sets, it may receive zero keys for a window and still have its `lastReplicatedAt` advanced to `windowEnd`. This behavior existed in the prior approach as well (advancing on “no closest keys”), but bounding makes it more visible as replication progresses window-by-window.
- `replicateKeysScanMax` is a constant. If you need environment-specific tuning, consider making it configurable (config file/env var), but this change intentionally keeps scope minimal; one possible shape is sketched below.
- The bounded sqlite query may return slightly more than `maxKeys` due to the “same `createdAt`” extension for correctness.
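
If configurability is ever wanted, a purely hypothetical shape for an environment-variable override (`P2P_REPLICATE_SCAN_MAX` is an invented name, not an existing setting):

```go
package replication

import (
	"os"
	"strconv"
)

// replicateKeysScanMaxFromEnv returns the default bound unless a positive
// override is supplied via the (hypothetical) P2P_REPLICATE_SCAN_MAX variable.
func replicateKeysScanMaxFromEnv() int {
	const def = 200000
	v := os.Getenv("P2P_REPLICATE_SCAN_MAX")
	if v == "" {
		return def
	}
	n, err := strconv.Atoi(v)
	if err != nil || n <= 0 {
		return def
	}
	return n
}
```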

p2p/kademlia/bootstrap.go

Lines changed: 31 additions & 9 deletions
@@ -4,7 +4,6 @@ import (
     "context"
     "fmt"
     "net"
-    "os"
     "strconv"
     "strings"
     "time"
@@ -13,6 +12,7 @@ import (
     "github.com/LumeraProtocol/supernode/v2/pkg/errors"
     "github.com/LumeraProtocol/supernode/v2/pkg/logtrace"
     ltc "github.com/LumeraProtocol/supernode/v2/pkg/net/credentials"
+    "github.com/LumeraProtocol/supernode/v2/pkg/utils"
 )
 
 const (
@@ -22,8 +22,7 @@ const (
 
 // seed a couple of obviously bad addrs (unless in integration tests)
 func (s *DHT) skipBadBootstrapAddrs() {
-    isTest := os.Getenv("INTEGRATION_TEST") == "true"
-    if isTest {
+    if integrationTestEnabled() {
         return
     }
     s.cache.Set(fmt.Sprintf("%s:%d", "127.0.0.1", s.options.Port), []byte("true"))
@@ -65,7 +64,7 @@ func (s *DHT) parseNode(extP2P string, selfAddr string) (*Node, error) {
     }
 
     // Hygiene: reject non-routables unless in integration tests
-    isTest := os.Getenv("INTEGRATION_TEST") == "true"
+    isTest := integrationTestEnabled()
     if parsed := net.ParseIP(ip); parsed != nil {
         if parsed.IsUnspecified() || parsed.IsLinkLocalUnicast() || parsed.IsLinkLocalMulticast() {
             return nil, errors.New("non-routable address")
@@ -111,12 +110,13 @@ func (s *DHT) setBootstrapNodesFromConfigVar(ctx context.Context, bootstrapNodes
 
 // loadBootstrapCandidatesFromChain returns active supernodes (by latest state)
 // mapped by "ip:port". No pings here.
-func (s *DHT) loadBootstrapCandidatesFromChain(ctx context.Context, selfAddress string) (map[string]*Node, error) {
+func (s *DHT) loadBootstrapCandidatesFromChain(ctx context.Context, selfAddress string) (map[string]*Node, map[[32]byte]struct{}, error) {
     resp, err := s.options.LumeraClient.SuperNode().ListSuperNodes(ctx)
     if err != nil {
-        return nil, fmt.Errorf("failed to list supernodes: %w", err)
+        return nil, nil, fmt.Errorf("failed to list supernodes: %w", err)
     }
 
+    activeIDs := make(map[[32]byte]struct{}, len(resp.Supernodes))
     mapNodes := make(map[string]*Node, len(resp.Supernodes))
     for _, sn := range resp.Supernodes {
         if len(sn.States) == 0 {
@@ -134,6 +134,23 @@ func (s *DHT) loadBootstrapCandidatesFromChain(ctx context.Context, selfAddress
             continue
         }
 
+        id := strings.TrimSpace(sn.SupernodeAccount)
+        if id == "" {
+            continue
+        }
+        h, err := utils.Blake3Hash([]byte(id))
+        if err != nil {
+            logtrace.Debug(ctx, "failed to compute Blake3 hash for supernode ID", logtrace.Fields{
+                logtrace.FieldModule: "p2p",
+                logtrace.FieldError:  err.Error(),
+                "supernode":          sn.SupernodeAccount,
+            })
+        } else if len(h) == 32 {
+            var key [32]byte
+            copy(key[:], h)
+            activeIDs[key] = struct{}{}
+        }
+
         // latest IP by height
         var latestIP string
         var maxHeight int64 = -1
@@ -170,10 +187,10 @@ func (s *DHT) loadBootstrapCandidatesFromChain(ctx context.Context, selfAddress
             })
             continue
         }
-        node.ID = []byte(sn.SupernodeAccount)
+        node.ID = []byte(id)
         mapNodes[full] = node
     }
-    return mapNodes, nil
+    return mapNodes, activeIDs, nil
 }
 
 // upsertBootstrapNode inserts/updates replication_info for the discovered node (Active=false).
@@ -248,11 +265,16 @@ func (s *DHT) SyncBootstrapOnce(ctx context.Context, bootstrapNodes string) erro
     }
     selfAddress := fmt.Sprintf("%s:%d", parseSupernodeAddress(supernodeAddr), s.options.Port)
 
-    cands, err := s.loadBootstrapCandidatesFromChain(ctx, selfAddress)
+    cands, activeIDs, err := s.loadBootstrapCandidatesFromChain(ctx, selfAddress)
     if err != nil {
         return err
     }
 
+    // Update eligibility gate from chain Active state and prune any peers that slipped in via
+    // inbound traffic before the last bootstrap refresh.
+    s.setRoutingAllowlist(ctx, activeIDs)
+    s.pruneIneligibleRoutingPeers(ctx)
+
     // Upsert candidates to replication_info
     seen := make(map[string]struct{}, len(cands))
     s.options.BootstrapNodes = s.options.BootstrapNodes[:0]