Skip to content

Commit dd683c0

Browse files
committed
Use in-process event emitter instead of Redis pub/sub
1 parent 2dd6aa2 commit dd683c0

File tree

4 files changed

+38
-75
lines changed

4 files changed

+38
-75
lines changed

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
"cookie": "^1.0.1",
3939
"cookie-parser": "^1.4.4",
4040
"cors": "^2.8.5",
41+
"emittery": "^1.2.0",
4142
"escape-string-regexp": "^5.0.0",
4243
"explain-error": "^1.0.4",
4344
"express": "^5.0.0",

src/SocketServer.js

Lines changed: 16 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import { promisify } from 'node:util';
22
import lodash from 'lodash';
3-
import sjson from 'secure-json-parse';
43
import { WebSocketServer } from 'ws';
54
import Ajv from 'ajv';
65
import { stdSerializers } from 'pino';
@@ -96,8 +95,6 @@ class SocketServer {
9695

9796
#logger;
9897

99-
#redisSubscription;
100-
10198
#wss;
10299

103100
#closing = false;
@@ -134,6 +131,8 @@ class SocketServer {
134131
*/
135132
#serverActions;
136133

134+
#unsubscribe;
135+
137136
/**
138137
* Create a socket server.
139138
*
@@ -157,7 +156,6 @@ class SocketServer {
157156
req: stdSerializers.req,
158157
},
159158
});
160-
this.#redisSubscription = uw.redis.duplicate();
161159

162160
this.options = {
163161
/** @type {(_socket: import('ws').WebSocket | undefined, err: Error) => void} */
@@ -176,11 +174,8 @@ class SocketServer {
176174
port: options.server ? undefined : options.port,
177175
});
178176

179-
uw.use(() => this.#redisSubscription.subscribe('uwave'));
180-
this.#redisSubscription.on('message', (channel, command) => {
181-
// this returns a promise, but we don't handle the error case:
182-
// there is not much we can do, so just let node.js crash w/ an unhandled rejection
183-
this.onServerMessage(channel, command);
177+
this.#unsubscribe = uw.events.onAny((command, data) => {
178+
this.#onServerMessage(command, data);
184179
});
185180

186181
this.#wss.on('error', (error) => {
@@ -678,33 +673,20 @@ class SocketServer {
678673
}
679674

680675
/**
681-
* Handle command messages coming in from Redis.
676+
* Handle command messages coming in from elsewhere in the app.
682677
* Some commands are intended to broadcast immediately to all connected
683678
* clients, but others require special action.
684679
*
685-
* @param {string} channel
686-
* @param {string} rawCommand
687-
* @returns {Promise<void>}
688-
* @private
680+
* @template {keyof import('./redisMessages.js').ServerActionParameters} K
681+
* @param {K} command
682+
* @param {import('./redisMessages.js').ServerActionParameters[K]} data
689683
*/
690-
async onServerMessage(channel, rawCommand) {
691-
/**
692-
* @type {{ command: string, data: import('type-fest').JsonValue }|undefined}
693-
*/
694-
const json = sjson.safeParse(rawCommand);
695-
if (!json) {
696-
return;
697-
}
698-
const { command, data } = json;
684+
#onServerMessage(command, data) {
685+
this.#logger.trace({ channel: command, command, data }, 'server message');
699686

700-
this.#logger.trace({ channel, command, data }, 'server message');
701-
702-
if (has(this.#serverActions, command)) {
703-
const action = this.#serverActions[command];
704-
if (action !== undefined) { // the types for `ServerActions` allow undefined, so...
705-
// @ts-expect-error TS2345 `data` is validated
706-
action(data);
707-
}
687+
const action = this.#serverActions[command];
688+
if (action !== undefined) {
689+
action(data);
708690
}
709691
}
710692

@@ -714,9 +696,10 @@ class SocketServer {
714696
* @returns {Promise<void>}
715697
*/
716698
async destroy() {
717-
clearInterval(this.#pinger);
718-
719699
this.#closing = true;
700+
701+
this.#unsubscribe();
702+
clearInterval(this.#pinger);
720703
clearInterval(this.#guestCountInterval);
721704

722705
for (const connection of this.#connections) {
@@ -725,7 +708,6 @@ class SocketServer {
725708

726709
const closeWsServer = promisify(this.#wss.close.bind(this.#wss));
727710
await closeWsServer();
728-
await this.#redisSubscription.quit();
729711
}
730712

731713
/**

src/Uwave.js

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import waitlist from './plugins/waitlist.js';
2121
import passport from './plugins/passport.js';
2222
import migrations from './plugins/migrations.js';
2323
import { SqliteDateColumnsPlugin, connect as connectSqlite } from './utils/sqlite.js';
24+
import Emittery from 'emittery';
2425

2526
const DEFAULT_SQLITE_PATH = './uwave.sqlite';
2627
const DEFAULT_REDIS_URL = 'redis://localhost:6379';
@@ -125,6 +126,11 @@ class UwaveServer extends EventEmitter {
125126
// @ts-expect-error TS2564 Definitely assigned in a plugin
126127
socketServer;
127128

129+
/** @type {Emittery<import('./redisMessages.js').ServerActionParameters>} */
130+
events = new Emittery({
131+
debug: { name: 'u-wave-core' },
132+
});
133+
128134
/**
129135
* @type {Map<string, Source>}
130136
*/
@@ -294,7 +300,7 @@ class UwaveServer extends EventEmitter {
294300
* @param {import('./redisMessages.js').ServerActionParameters[CommandName]} data
295301
*/
296302
publish(command, data) {
297-
return this.redis.publish('uwave', JSON.stringify({ command, data }));
303+
return this.events.emit(command, data);
298304
}
299305

300306
async listen() {

src/plugins/configStore.js

Lines changed: 14 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import EventEmitter from 'node:events';
44
import Ajv from 'ajv/dist/2019.js';
55
import formats from 'ajv-formats';
66
import jsonMergePatch from 'json-merge-patch';
7-
import sjson from 'secure-json-parse';
87
import ValidationError from '../errors/ValidationError.js';
98
import { sql } from 'kysely';
109
import { fromJson, json, jsonb } from '../utils/sqlite.js';
@@ -29,12 +28,12 @@ class ConfigStore {
2928

3029
#logger;
3130

32-
#subscriber;
33-
3431
#ajv;
3532

3633
#emitter = new EventEmitter();
3734

35+
#unsubscribe;
36+
3837
/** @type {Map<string, import('ajv').ValidateFunction<unknown>>} */
3938
#validators = new Map();
4039

@@ -44,7 +43,6 @@ class ConfigStore {
4443
constructor(uw) {
4544
this.#uw = uw;
4645
this.#logger = uw.logger.child({ ns: 'uwave:config' });
47-
this.#subscriber = uw.redis.duplicate();
4846
this.#ajv = new Ajv({
4947
useDefaults: true,
5048
// Allow unknown keywords (`uw:xyz`)
@@ -59,40 +57,16 @@ class ConfigStore {
5957
fs.readFileSync(new URL('../schemas/definitions.json', import.meta.url), 'utf8'),
6058
));
6159

62-
this.#subscriber.on('message', (_channel, command) => {
63-
this.#onServerMessage(command);
64-
});
65-
66-
uw.use(async () => this.#subscriber.subscribe('uwave'));
67-
}
68-
69-
/**
70-
* @param {string} rawCommand
71-
*/
72-
async #onServerMessage(rawCommand) {
73-
/**
74-
* @type {undefined|{
75-
* command: string,
76-
* data: import('../redisMessages.js').ServerActionParameters['configStore:update'],
77-
* }}
78-
*/
79-
const json = sjson.safeParse(rawCommand);
80-
if (!json) {
81-
return;
82-
}
83-
const { command, data } = json;
84-
if (command !== CONFIG_UPDATE_MESSAGE) {
85-
return;
86-
}
87-
88-
this.#logger.trace({ command, data }, 'handle config update');
60+
this.#unsubscribe = uw.events.on(CONFIG_UPDATE_MESSAGE, async (data) => {
61+
this.#logger.trace({ data }, 'handle config update');
8962

90-
try {
91-
const updatedSettings = await this.get(data.key);
92-
this.#emitter.emit(data.key, updatedSettings, data.user, data.patch);
93-
} catch (error) {
94-
this.#logger.error({ err: error }, 'could not retrieve settings after update');
95-
}
63+
try {
64+
const updatedSettings = await this.get(data.key);
65+
this.#emitter.emit(data.key, updatedSettings, data.user, data.patch);
66+
} catch (error) {
67+
this.#logger.error({ err: error }, 'could not retrieve settings after update');
68+
}
69+
});
9670
}
9771

9872
/**
@@ -258,8 +232,8 @@ class ConfigStore {
258232
};
259233
}
260234

261-
async destroy() {
262-
await this.#subscriber.quit();
235+
destroy() {
236+
this.#unsubscribe();
263237
}
264238
}
265239

@@ -268,7 +242,7 @@ class ConfigStore {
268242
*/
269243
async function configStorePlugin(uw) {
270244
uw.config = new ConfigStore(uw);
271-
uw.onClose(() => uw.config.destroy());
245+
uw.onClose(async () => uw.config.destroy());
272246
}
273247

274248
export default configStorePlugin;

0 commit comments

Comments
 (0)