diff --git a/src/config/constants.js b/src/config/constants.js index aba2c63..825dcd4 100644 --- a/src/config/constants.js +++ b/src/config/constants.js @@ -25,6 +25,10 @@ const BROADCAST_THROTTLE = 1000; const DIAGNOSTICS_INTERVAL = 10000; const PORT = process.env.PORT || 3000; +// Rate Limiting: Semi effective mitigation to limit abuse from malicious peers / Sybil attack leveraging weak PoW. 100 new peers per 5 seconds per connection. +const RATE_LIMIT_WINDOW = 5000; +const RATE_LIMIT_MAX_NEW_PEERS = 1000; + module.exports = { TOPIC_NAME, TOPIC, @@ -39,4 +43,6 @@ module.exports = { BROADCAST_THROTTLE, DIAGNOSTICS_INTERVAL, PORT, + RATE_LIMIT_WINDOW, + RATE_LIMIT_MAX_NEW_PEERS, }; diff --git a/src/p2p/messaging.js b/src/p2p/messaging.js index 67967c6..8d7d552 100644 --- a/src/p2p/messaging.js +++ b/src/p2p/messaging.js @@ -1,5 +1,5 @@ const { verifyPoW, verifySignature, createPublicKey } = require("../core/security"); -const { MAX_RELAY_HOPS } = require("../config/constants"); +const { MAX_RELAY_HOPS, RATE_LIMIT_WINDOW, RATE_LIMIT_MAX_NEW_PEERS } = require("../config/constants"); const { BloomFilterManager } = require("../state/bloom"); class MessageHandler { @@ -62,10 +62,21 @@ class MessageHandler { if (wasNew) { this.diagnostics.increment("newPeersAdded"); this.broadcastCallback(); + if (hops === 0) { + const now = Date.now(); + if (!sourceSocket.rateLimiter || now > sourceSocket.rateLimiter.resetTime) { + sourceSocket.rateLimiter = { count: 0, resetTime: now + RATE_LIMIT_WINDOW }; + } + if (++sourceSocket.rateLimiter.count >= RATE_LIMIT_MAX_NEW_PEERS) { + this.diagnostics.increment("rateLimitedConnections"); + sourceSocket.destroy(); + return; + } + } } // Only relay if we haven't already relayed this message (bloom filter check) - if (hops < MAX_RELAY_HOPS && !this.bloomFilter.hasRelayed(id, seq)) { + if (hops >= 0 && hops < MAX_RELAY_HOPS && !this.bloomFilter.hasRelayed(id, seq)) { this.bloomFilter.markRelayed(id, seq); this.diagnostics.increment("heartbeatsRelayed"); this.relayCallback({ ...msg, hops: hops + 1 }, sourceSocket); @@ -97,7 +108,7 @@ class MessageHandler { this.broadcastCallback(); // Use id:leave as key for LEAVE messages - if (hops < MAX_RELAY_HOPS && !this.bloomFilter.hasRelayed(id, "leave")) { + if (hops >= 0 && hops < MAX_RELAY_HOPS && !this.bloomFilter.hasRelayed(id, "leave")) { this.bloomFilter.markRelayed(id, "leave"); this.relayCallback({ ...msg, hops: hops + 1 }, sourceSocket); } @@ -116,15 +127,20 @@ const validateMessage = (msg) => { const allowedFields = ['type', 'id', 'seq', 'hops', 'nonce', 'sig']; const fields = Object.keys(msg); return fields.every(f => allowedFields.includes(f)) && - msg.id && typeof msg.seq === 'number' && - typeof msg.hops === 'number' && msg.nonce && msg.sig; + typeof msg.id === 'string' && msg.id.length > 0 && + typeof msg.seq === 'number' && Number.isInteger(msg.seq) && msg.seq >= 0 && + typeof msg.hops === 'number' && Number.isInteger(msg.hops) && msg.hops >= 0 && + typeof msg.nonce === 'number' && Number.isInteger(msg.nonce) && msg.nonce >= 0 && + typeof msg.sig === 'string' && msg.sig.length > 0; } if (msg.type === "LEAVE") { const allowedFields = ['type', 'id', 'hops', 'sig']; const fields = Object.keys(msg); return fields.every(f => allowedFields.includes(f)) && - msg.id && typeof msg.hops === 'number' && msg.sig; + typeof msg.id === 'string' && msg.id.length > 0 && + typeof msg.hops === 'number' && Number.isInteger(msg.hops) && msg.hops >= 0 && + typeof msg.sig === 'string' && msg.sig.length > 0; } return false; diff --git a/src/state/diagnostics.js b/src/state/diagnostics.js index 659e406..6fcfabb 100644 --- a/src/state/diagnostics.js +++ b/src/state/diagnostics.js @@ -12,6 +12,7 @@ class DiagnosticsManager { bytesReceived: 0, bytesRelayed: 0, leaveMessages: 0, + rateLimitedConnections: 0, }; this.interval = null;