Skip to content

Commit 96ae461

Browse files
Store maybe-missed socket messages in SQLite (#727)
1 parent 78131da commit 96ae461

File tree

7 files changed

+164
-64
lines changed

7 files changed

+164
-64
lines changed

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
"u-wave-announce": "=0.6.0-alpha.1",
7575
"u-wave-source-soundcloud": "^2.0.2",
7676
"u-wave-source-youtube": "^2.0.0",
77+
"ulid": "^3.0.2",
7778
"ultron": "^1.1.1",
7879
"umzug": "^3.1.0",
7980
"ws": "^8.0.0",

src/SocketServer.js

Lines changed: 62 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ import GuestConnection from './sockets/GuestConnection.js';
1111
import AuthedConnection from './sockets/AuthedConnection.js';
1212
import LostConnection from './sockets/LostConnection.js';
1313
import { serializeUser } from './utils/serialize.js';
14+
import { ulid, encodeTime } from 'ulid';
15+
import { jsonb } from './utils/sqlite.js';
16+
import { subMinutes } from 'date-fns';
1417

1518
const { isEmpty } = lodash;
1619

@@ -472,7 +475,7 @@ class SocketServer {
472475
.selectAll()
473476
.execute();
474477
disconnectedUsers.forEach((user) => {
475-
this.add(this.createLostConnection(user, 'TODO: Actual session ID!!'));
478+
this.add(this.createLostConnection(user, 'TODO: Actual session ID!!', null));
476479
});
477480
}
478481

@@ -536,15 +539,15 @@ class SocketServer {
536539
connection.on('close', () => {
537540
this.remove(connection);
538541
});
539-
connection.on('authenticate', async (user, sessionID) => {
542+
connection.on('authenticate', async (user, sessionID, lastEventID) => {
540543
const isReconnect = await connection.isReconnect(sessionID);
541-
this.#logger.info({ userId: user.id, isReconnect }, 'authenticated socket');
544+
this.#logger.info({ userId: user.id, isReconnect, lastEventID }, 'authenticated socket');
542545
if (isReconnect) {
543546
const previousConnection = this.getLostConnection(sessionID);
544547
if (previousConnection) this.remove(previousConnection);
545548
}
546549

547-
this.replace(connection, this.createAuthedConnection(socket, user, sessionID));
550+
this.replace(connection, this.createAuthedConnection(socket, user, sessionID, lastEventID));
548551

549552
if (!isReconnect) {
550553
this.#uw.publish('user:join', { userID: user.id });
@@ -559,18 +562,19 @@ class SocketServer {
559562
* @param {import('ws').WebSocket} socket
560563
* @param {User} user
561564
* @param {string} sessionID
565+
* @param {string|null} lastEventID
562566
* @returns {AuthedConnection}
563567
* @private
564568
*/
565-
createAuthedConnection(socket, user, sessionID) {
566-
const connection = new AuthedConnection(this.#uw, socket, user, sessionID);
567-
connection.on('close', ({ banned }) => {
569+
createAuthedConnection(socket, user, sessionID, lastEventID) {
570+
const connection = new AuthedConnection(this.#uw, socket, user, sessionID, lastEventID);
571+
connection.on('close', ({ banned, lastEventID }) => {
568572
if (banned) {
569573
this.#logger.info({ userId: user.id }, 'removing connection after ban');
570574
disconnectUser(this.#uw, user.id);
571575
} else if (!this.#closing) {
572576
this.#logger.info({ userId: user.id }, 'lost connection');
573-
this.add(this.createLostConnection(user, sessionID));
577+
this.add(this.createLostConnection(user, sessionID, lastEventID));
574578
}
575579
this.remove(connection);
576580
});
@@ -603,11 +607,18 @@ class SocketServer {
603607
*
604608
* @param {User} user
605609
* @param {string} sessionID
610+
* @param {string|null} lastEventID
606611
* @returns {LostConnection}
607612
* @private
608613
*/
609-
createLostConnection(user, sessionID) {
610-
const connection = new LostConnection(this.#uw, user, sessionID, this.options.timeout);
614+
createLostConnection(user, sessionID, lastEventID) {
615+
const connection = new LostConnection(
616+
this.#uw,
617+
user,
618+
sessionID,
619+
lastEventID,
620+
this.options.timeout,
621+
);
611622
connection.on('close', () => {
612623
this.#logger.info({ userId: user.id }, 'user left');
613624
this.remove(connection);
@@ -732,6 +743,40 @@ class SocketServer {
732743
this.#connections.forEach((connection) => {
733744
connection.ping();
734745
});
746+
747+
this.#cleanupMessageQueue().catch((err) => {
748+
this.#logger.error({ err }, 'failed to clean up socket message queue');
749+
});
750+
}
751+
752+
async #cleanupMessageQueue() {
753+
const oldestID = encodeTime(subMinutes(new Date(), 10).getTime());
754+
755+
await this.#uw.db.deleteFrom('socketMessageQueue')
756+
.where('id', '<', oldestID)
757+
.execute();
758+
}
759+
760+
/**
761+
* Broadcast a command to all connected clients.
762+
*
763+
* @param {string} command Command name.
764+
* @param {import('type-fest').JsonValue} data Command data.
765+
* @param {import('./schema.js').UserID | null} targetUserID
766+
*/
767+
#recordMessage(command, data, targetUserID = null) {
768+
const id = ulid();
769+
770+
this.#uw.db.insertInto('socketMessageQueue')
771+
.values({
772+
id,
773+
command,
774+
data: jsonb(data),
775+
targetUserID,
776+
})
777+
.execute();
778+
779+
return id;
735780
}
736781

737782
/**
@@ -741,7 +786,10 @@ class SocketServer {
741786
* @param {import('type-fest').JsonValue} data Command data.
742787
*/
743788
broadcast(command, data) {
789+
const id = this.#recordMessage(command, data);
790+
744791
this.#logger.trace({
792+
id,
745793
command,
746794
data,
747795
to: this.#connections.map((connection) => (
@@ -750,23 +798,24 @@ class SocketServer {
750798
}, 'broadcast');
751799

752800
this.#connections.forEach((connection) => {
753-
connection.send(command, data);
801+
connection.send(id, command, data);
754802
});
755803
}
756804

757805
/**
758806
* Send a command to a single user.
759807
*
760-
* @param {User|string} user User or user ID to send the command to.
808+
* @param {User|import('./schema.js').UserID} user User or user ID to send the command to.
761809
* @param {string} command Command name.
762810
* @param {import('type-fest').JsonValue} data Command data.
763811
*/
764812
sendTo(user, command, data) {
765813
const userID = typeof user === 'object' ? user.id : user;
814+
const id = this.#recordMessage(command, data, userID);
766815

767816
this.#connections.forEach((connection) => {
768817
if ('user' in connection && connection.user.id === userID) {
769-
connection.send(command, data);
818+
connection.send(id, command, data);
770819
}
771820
});
772821
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
'use strict';
2+
3+
const { sql } = require('kysely');
4+
5+
/**
6+
* @param {import('umzug').MigrationParams<import('../Uwave').default>} params
7+
*/
8+
async function up({ context: uw }) {
9+
const { db } = uw;
10+
11+
await db.schema.createTable('socket_message_queue')
12+
// This contains a ULID which also encodes the timestamp.
13+
.addColumn('id', 'text', (col) => col.primaryKey())
14+
.addColumn('target_user_id', 'uuid', (col) => col.references('users.id'))
15+
.addColumn('command', 'text', (col) => col.notNull())
16+
.addColumn('data', 'jsonb', (col) => col.notNull().defaultTo(sql`(jsonb('null'))`))
17+
.execute();
18+
}
19+
20+
/**
21+
* @param {import('umzug').MigrationParams<import('../Uwave').default>} params
22+
*/
23+
async function down({ context: uw }) {
24+
const { db } = uw;
25+
26+
await db.schema.dropTable('socket_message_queue').execute();
27+
}
28+
29+
module.exports = { up, down };

src/schema.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,13 @@ export interface MigrationTable {
163163
name: string,
164164
}
165165

166+
export interface SocketMessageTable {
167+
id: string,
168+
targetUserID: UserID | null,
169+
command: string,
170+
data: JSONB<JsonValue>,
171+
}
172+
166173
export interface Database {
167174
configuration: ConfigurationTable,
168175
keyval: KeyvalTable,
@@ -179,6 +186,7 @@ export interface Database {
179186
playlistItems: PlaylistItemTable,
180187
historyEntries: HistoryEntryTable,
181188
feedback: FeedbackTable,
189+
socketMessageQueue: SocketMessageTable,
182190
}
183191

184192
export type Kysely = KyselyBase<Database>;

src/sockets/AuthedConnection.js

Lines changed: 41 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ import EventEmitter from 'node:events';
22
import Ultron from 'ultron';
33
import WebSocket from 'ws';
44
import sjson from 'secure-json-parse';
5+
import { ulid } from 'ulid';
6+
import { fromJson, json } from '../utils/sqlite.js';
57

68
const PING_TIMEOUT = 5_000;
79
const DEAD_TIMEOUT = 30_000;
@@ -13,13 +15,19 @@ class AuthedConnection extends EventEmitter {
1315

1416
#lastMessage = Date.now();
1517

18+
// Ideally, the client should actually be responsible for this,
19+
// because the server only knows if something was *sent*, not if it was received.
20+
/** @type {string|null} */
21+
#lastEventID = null;
22+
1623
/**
1724
* @param {import('../Uwave.js').default} uw
1825
* @param {import('ws').WebSocket} socket
1926
* @param {import('../schema.js').User} user
2027
* @param {string} sessionID
28+
* @param {string|null} lastEventID
2129
*/
22-
constructor(uw, socket, user, sessionID) {
30+
constructor(uw, socket, user, sessionID, lastEventID) {
2331
super();
2432
this.uw = uw;
2533
this.socket = socket;
@@ -31,7 +39,10 @@ class AuthedConnection extends EventEmitter {
3139
});
3240

3341
this.#events.on('close', () => {
34-
this.emit('close', { banned: this.banned });
42+
this.emit('close', {
43+
banned: this.banned,
44+
lastEventID: this.#lastEventID,
45+
});
3546
});
3647
this.#events.on('message', (raw) => {
3748
this.#onMessage(raw);
@@ -40,7 +51,9 @@ class AuthedConnection extends EventEmitter {
4051
this.#onPong();
4152
});
4253

43-
this.sendWaiting();
54+
this.#sendWaiting(lastEventID).catch((err) => {
55+
this.#logger.error({ err }, 'failed to send waiting messages on reconnect');
56+
});
4457
}
4558

4659
/**
@@ -50,29 +63,31 @@ class AuthedConnection extends EventEmitter {
5063
return `http-api:disconnected:${this.sessionID}`;
5164
}
5265

53-
/**
54-
* @private
55-
*/
56-
get messagesKey() {
57-
return `http-api:disconnected:${this.sessionID}:messages`;
58-
}
59-
60-
/**
61-
* @private
62-
*/
63-
async sendWaiting() {
64-
const wasDisconnected = await this.uw.redis.exists(this.key);
65-
if (!wasDisconnected) {
66+
/** @param {string|null} clientLastEventID */
67+
async #sendWaiting(clientLastEventID) {
68+
// Legacy clients may not send a last event ID.
69+
const lastEventID = clientLastEventID ?? await this.uw.redis.get(this.key);
70+
if (!lastEventID) {
6671
return;
6772
}
68-
/** @type {string[]} */
69-
const messages = await this.uw.redis.lrange(this.messagesKey, 0, -1);
73+
74+
const messages = await this.uw.db.selectFrom('socketMessageQueue')
75+
.select([
76+
'id',
77+
'command',
78+
(eb) => json(eb.ref('data')).as('data'),
79+
])
80+
.where('id', '>', lastEventID)
81+
.where((eb) => eb.or([
82+
eb('targetUserID', 'is', null),
83+
eb('targetUserID', '=', this.user.id),
84+
]))
85+
.execute();
86+
7087
this.#logger.info({ count: messages.length }, 'queued messages');
7188
messages.forEach((message) => {
72-
const { command, data } = sjson.parse(message);
73-
this.send(command, data);
89+
this.send(message.id, message.command, fromJson(message.data));
7490
});
75-
await this.uw.redis.del(this.key, this.messagesKey);
7691
}
7792

7893
/**
@@ -91,12 +106,14 @@ class AuthedConnection extends EventEmitter {
91106
}
92107

93108
/**
109+
* @param {string} id
94110
* @param {string} command
95111
* @param {import('type-fest').JsonValue} data
96112
*/
97-
send(command, data) {
98-
this.socket.send(JSON.stringify({ command, data }));
113+
send(id, command, data) {
114+
this.socket.send(JSON.stringify({ id, command, data }));
99115
this.#lastMessage = Date.now();
116+
this.#lastEventID = id;
100117
}
101118

102119
#timeSinceLastMessage() {
@@ -119,7 +136,7 @@ class AuthedConnection extends EventEmitter {
119136
ban() {
120137
this.#logger.info('ban');
121138
this.banned = true;
122-
this.send('error', 'You have been banned');
139+
this.send(ulid(), 'error', 'You have been banned');
123140
this.socket.close(4001, 'ban');
124141
}
125142

0 commit comments

Comments
 (0)