diff --git a/lib/Socket.ts b/lib/Socket.ts index 37bd498b..6f377b89 100644 --- a/lib/Socket.ts +++ b/lib/Socket.ts @@ -1,54 +1,70 @@ import { BITSOCKET_URL, WS_URL } from "./BITBOX" import { SocketConfig } from "./interfaces/BITBOXInterfaces" +import io from "socket.io-client" +import EventSource from "eventsource" + +enum SocketType { + Uninitialized, + SocketIO, + BitSocket +} -const io: any = require("socket.io-client") export class Socket { socket: any websocketURL: string bitsocketURL: string - constructor(config: SocketConfig = {}) { - if (config.wsURL) { - // default to passed in wsURL - this.websocketURL = config.wsURL - } else if (config.restURL) { - // 2nd option deprecated restURL - this.websocketURL = config.restURL - } else { - // fallback to WS_URL - this.websocketURL = WS_URL - } - - if (config.bitsocketURL) { - this.bitsocketURL = config.bitsocketURL - } else { - this.bitsocketURL = BITSOCKET_URL - } + socketType: SocketType = SocketType.Uninitialized + constructor(config: SocketConfig = {}) { + // Order of preference: passed in wsURL, deprecated restURL, fallback WS_URL + this.websocketURL = config.wsURL || config.restURL || WS_URL + // Similar for BitSocket case + this.bitsocketURL = config.bitsocketURL || BITSOCKET_URL + // Execute callback (immediate, synchronous and unconditional) if (config.callback) config.callback() + // Note that we can't set socketType in constructor as config may contain + // both socket.io and BitSocket URLs, so we need to wait for listen() before + // we know which type it will be. } public listen(query: string, cb: Function): void { if (query === "blocks" || query === "transactions") { - this.socket = io(this.websocketURL, { transports: ["websocket"] }) - this.socket.emit(query) - - if (query === "blocks") this.socket.on("blocks", (msg: any) => cb(msg)) - else if (query === "transactions") - this.socket.on("transactions", (msg: any) => cb(msg)) - } else { - let EventSource = require("eventsource") - let b64 = Buffer.from(JSON.stringify(query)).toString("base64") - this.socket = new EventSource(`${this.bitsocketURL}/s/${b64}`) - this.socket.onmessage = (msg: any) => { - cb(msg.data) + // socket.io case + switch (this.socketType) { + case SocketType.Uninitialized: + this.socketType = SocketType.SocketIO + this.socket = io(this.websocketURL, { transports: ["websocket"] }) + case SocketType.SocketIO: + // Send server the event name of interest. At time of writing this is + // not used by the server but is left in so that server-side filtering + // is an option in the future. + this.socket.emit(query) + this.socket.on(query, (msg: any) => cb(msg)) + break; + case SocketType.BitSocket: + throw new Error("Query type not possible on a BitSocket connection.") + } + } else { + // BitSocket case + switch (this.socketType) { + case SocketType.Uninitialized: + this.socketType = SocketType.BitSocket + let b64 = Buffer.from(JSON.stringify(query)).toString("base64") + this.socket = new EventSource(`${this.bitsocketURL}/s/${b64}`) + this.socket.onmessage = (msg: any) => { + cb(msg.data) + } + break; + case SocketType.BitSocket: + throw new Error("Only one BitSocket query can be run at a time.") + case SocketType.SocketIO: + throw new Error("Query type not possible on a SocketIO connection.") } } } public close(cb?: Function): void { this.socket.close() - if (cb) { - cb() - } + if (cb) cb() } } diff --git a/package.json b/package.json index f63adb84..0a671fbd 100644 --- a/package.json +++ b/package.json @@ -34,6 +34,7 @@ "@bitcoin-dot-com/bitcoincashjs2-lib": "^4.1.0", "@types/bigi": "^1.4.2", "@types/bip39": "^2.4.2", + "@types/eventsource": "^1.1.2", "@types/randombytes": "^2.0.0", "@types/socket.io": "^2.1.2", "@types/socket.io-client": "^1.4.32", diff --git a/test/e2e/count-connections-issue-173/countConnectionsBitSocket.js b/test/e2e/count-connections-issue-173/countConnectionsBitSocket.js new file mode 100644 index 00000000..9ede17e2 --- /dev/null +++ b/test/e2e/count-connections-issue-173/countConnectionsBitSocket.js @@ -0,0 +1,62 @@ +const BITBOX = require("../../../lib/BITBOX").BITBOX; +const { exec } = require('child_process'); + +const bitbox = new BITBOX(); +const socket = new bitbox.Socket(); + +function countSockets(stage) { + return new Promise((resolve, reject) => { + // Call the lsof system command for outgoing internet connections. + exec(`lsof -i -n -P | grep ${process.pid}`, (err, stdout, stderr) => { + // Print list of open connections allowing a visual count to be done. + console.log(`Outbound connections from this node process ${stage}:\n${stdout}`); + resolve(); + }); + }); +} + +function sleep(ms) { + return new Promise(resolve => setTimeout(resolve, ms)); +} + +(async () => { + + await countSockets("before calling listen"); + + // First call to listen() which should create new connection. + socket.listen({"v": 3, "q": {"find": {}}}, + (message) => { + console.log("Callback from first query invoked."); + }); + + // Second call to listen() which should share connection with first call. + // Use try catch in case this throws one of our new errors. + try { + socket.listen({"v": 3, "q": {"find": {}}}, + (message) => { + console.log("Callback from first query invoked."); + }); + } catch(error) { + console.log(`ERROR: ${error.message}`); + } + + // listen doesn't return a promise so wait 100ms for connections to establish. + await sleep(100); + + await countSockets("after calling listen twice"); + + // now close the socket + socket.close(); + + // callback from close() is short-circuited so give it 100ms to clean up. + await sleep(100); + + // check if any zombie connections remaining + await countSockets("after calling close (zombie connections)"); + + // exit process + process.exit(); + +})().catch((error)=> {console.log(`ERROR: ${error.message}`)}); + + diff --git a/test/e2e/count-connections-issue-173/countConnectionsMixedBitSocketFirst.js b/test/e2e/count-connections-issue-173/countConnectionsMixedBitSocketFirst.js new file mode 100644 index 00000000..585d2015 --- /dev/null +++ b/test/e2e/count-connections-issue-173/countConnectionsMixedBitSocketFirst.js @@ -0,0 +1,62 @@ +const BITBOX = require("../../../lib/BITBOX").BITBOX; +const { exec } = require('child_process'); + +const bitbox = new BITBOX(); +const socket = new bitbox.Socket(); + +function countSockets(stage) { + return new Promise((resolve, reject) => { + // Call the lsof system command for outgoing internet connections. + exec(`lsof -i -n -P | grep ${process.pid}`, (err, stdout, stderr) => { + // Print list of open connections allowing a visual count to be done. + console.log(`Outbound connections from this node process ${stage}:\n${stdout}`); + resolve(); + }); + }); +} + +function sleep(ms) { + return new Promise(resolve => setTimeout(resolve, ms)); +} + +(async () => { + + await countSockets("before calling listen"); + + // First call to listen() which should create new BitSocket connection. + socket.listen({"v": 3, "q": {"find": {}}}, + (message) => { + console.log("Callback from first query invoked."); + }); + + // Second call to listen() which needs a socket.io connection and so can't + // share the exisiting connection. Use try catch in case this throws one of + // our new errors. + try { + socket.listen("blocks", (message) => { + console.log("Received a block."); + }); + } catch(error) { + console.log(`ERROR: ${error.message}`); + } + + // listen doesn't return a promise so wait 100ms for connections to establish. + await sleep(100); + + await countSockets("after calling listen twice"); + + // now close the socket + socket.close(); + + // callback from close() is short-circuited so give it 100ms to clean up. + await sleep(100); + + // check if any zombie connections remaining + await countSockets("after calling close (zombie connections)"); + + // exit process + process.exit(); + +})().catch((error)=> {console.log(`ERROR: ${error.message}`)}); + + diff --git a/test/e2e/count-connections-issue-173/countConnectionsMixedSocketIOFirst.js b/test/e2e/count-connections-issue-173/countConnectionsMixedSocketIOFirst.js new file mode 100644 index 00000000..24166ebc --- /dev/null +++ b/test/e2e/count-connections-issue-173/countConnectionsMixedSocketIOFirst.js @@ -0,0 +1,61 @@ +const BITBOX = require("../../../lib/BITBOX").BITBOX; +const { exec } = require('child_process'); + +const bitbox = new BITBOX(); +const socket = new bitbox.Socket(); + +function countSockets(stage) { + return new Promise((resolve, reject) => { + // Call the lsof system command for outgoing internet connections. + exec(`lsof -i -n -P | grep ${process.pid}`, (err, stdout, stderr) => { + // Print list of open connections allowing a visual count to be done. + console.log(`Outbound connections from this node process ${stage}:\n${stdout}`); + resolve(); + }); + }); +} + +function sleep(ms) { + return new Promise(resolve => setTimeout(resolve, ms)); +} + +(async () => { + + await countSockets("before calling listen"); + + // First call to listen() which should create new connection. + socket.listen("transactions", (message) => { + console.log("Received a transaction."); + }); + + // Second call to listen() which should share connection with first call. + // Use try catch in case this throws one of our new errors. + try { + socket.listen({"v": 3, "q": {"find": {}}}, + (message) => { + console.log("Callback from first query invoked."); + }); + } catch(error) { + console.log(`ERROR: ${error.message}`); + } + + // listen doesn't return a promise so wait 100ms for connections to establish. + await sleep(100); + + await countSockets("after calling listen twice"); + + // now close the socket + socket.close(); + + // callback from close() is short-circuited so give it 100ms to clean up. + await sleep(100); + + // check if any zombie connections remaining + await countSockets("after calling close (zombie connections)"); + + // exit process + process.exit(); + +})().catch((error)=> {console.log(`ERROR: ${error.message}`)}); + + diff --git a/test/e2e/count-connections-issue-173/countConnectionsSocketIO.js b/test/e2e/count-connections-issue-173/countConnectionsSocketIO.js new file mode 100644 index 00000000..18828cab --- /dev/null +++ b/test/e2e/count-connections-issue-173/countConnectionsSocketIO.js @@ -0,0 +1,60 @@ +const BITBOX = require("../../../lib/BITBOX").BITBOX; +const { exec } = require('child_process'); + +const bitbox = new BITBOX(); +const socket = new bitbox.Socket(); + +function countSockets(stage) { + return new Promise((resolve, reject) => { + // Call the lsof system command for outgoing internet connections. + exec(`lsof -i -n -P | grep ${process.pid}`, (err, stdout, stderr) => { + // Print list of open connections allowing a visual count to be done. + console.log(`Outbound connections from this node process ${stage}:\n${stdout}`); + resolve(); + }); + }); +} + +function sleep(ms) { + return new Promise(resolve => setTimeout(resolve, ms)); +} + +(async () => { + + await countSockets("before calling listen"); + + // First call to listen() which should create new connection. + socket.listen("transactions", (message) => { + console.log("Received a transaction."); + }); + + // Second call to listen() which should share connection with first call. + // Use try catch in case this throws one of our new errors. + try { + socket.listen("blocks", (message) => { + console.log("Received a block."); + }); + } catch(error) { + console.log(`ERROR: ${error.message}`); + } + + // listen doesn't return a promise so wait 100ms for connections to establish. + await sleep(100); + + await countSockets("after calling listen twice"); + + // now close the socket + socket.close(); + + // callback from close() is short-circuited so give it 100ms to clean up. + await sleep(100); + + // check if any zombie connections remaining + await countSockets("after calling close (zombie connections)"); + + // exit process + process.exit(); + +})().catch((error)=> {console.log(`ERROR: ${error.message}`)}); + + diff --git a/yarn.lock b/yarn.lock index 7e247055..87d3caf8 100644 --- a/yarn.lock +++ b/yarn.lock @@ -330,6 +330,11 @@ resolved "https://registry.yarnpkg.com/@types/events/-/events-3.0.0.tgz#2862f3f58a9a7f7c3e78d79f130dd4d71c25c2a7" integrity sha512-EaObqwIvayI5a8dCzhFrjKzVwKLxjoG9T6Ppd5CEo07LRKfQ8Yokw54r5+Wq7FaBQ+yXRvQAYPrHwya1/UFt9g== +"@types/eventsource@^1.1.2": + version "1.1.2" + resolved "https://registry.yarnpkg.com/@types/eventsource/-/eventsource-1.1.2.tgz#079ab4213e844e56f7384aec620e1163dab692b3" + integrity sha512-4AKWJ6tvEU4fk0770oAK4Z0lQUuSnc5ljHTcYZhQtdP7XMDKKvegGUC6xGD8+4+F+svZKAzlxbKnuGWfgMtgVA== + "@types/glob@^7.1.1": version "7.1.1" resolved "https://registry.yarnpkg.com/@types/glob/-/glob-7.1.1.tgz#aa59a1c6e3fbc421e07ccd31a944c30eba521575"