Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
"u-wave-announce": "=0.6.0-alpha.1",
"u-wave-source-soundcloud": "^2.0.2",
"u-wave-source-youtube": "^2.0.0",
"ulid": "^3.0.2",
"ultron": "^1.1.1",
"umzug": "^3.1.0",
"ws": "^8.0.0",
Expand Down
75 changes: 62 additions & 13 deletions src/SocketServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import GuestConnection from './sockets/GuestConnection.js';
import AuthedConnection from './sockets/AuthedConnection.js';
import LostConnection from './sockets/LostConnection.js';
import { serializeUser } from './utils/serialize.js';
import { ulid, encodeTime } from 'ulid';
import { jsonb } from './utils/sqlite.js';
import { subMinutes } from 'date-fns';

const { isEmpty } = lodash;

Expand Down Expand Up @@ -472,7 +475,7 @@ class SocketServer {
.selectAll()
.execute();
disconnectedUsers.forEach((user) => {
this.add(this.createLostConnection(user, 'TODO: Actual session ID!!'));
this.add(this.createLostConnection(user, 'TODO: Actual session ID!!', null));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might be a good time to solve this now(?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No it's not 😇

I think when all session state is in SQLite this will be feasible.

});
}

Expand Down Expand Up @@ -536,15 +539,15 @@ class SocketServer {
connection.on('close', () => {
this.remove(connection);
});
connection.on('authenticate', async (user, sessionID) => {
connection.on('authenticate', async (user, sessionID, lastEventID) => {
const isReconnect = await connection.isReconnect(sessionID);
this.#logger.info({ userId: user.id, isReconnect }, 'authenticated socket');
this.#logger.info({ userId: user.id, isReconnect, lastEventID }, 'authenticated socket');
if (isReconnect) {
const previousConnection = this.getLostConnection(sessionID);
if (previousConnection) this.remove(previousConnection);
}

this.replace(connection, this.createAuthedConnection(socket, user, sessionID));
this.replace(connection, this.createAuthedConnection(socket, user, sessionID, lastEventID));

if (!isReconnect) {
this.#uw.publish('user:join', { userID: user.id });
Expand All @@ -559,18 +562,19 @@ class SocketServer {
* @param {import('ws').WebSocket} socket
* @param {User} user
* @param {string} sessionID
* @param {string|null} lastEventID
* @returns {AuthedConnection}
* @private
*/
createAuthedConnection(socket, user, sessionID) {
const connection = new AuthedConnection(this.#uw, socket, user, sessionID);
connection.on('close', ({ banned }) => {
createAuthedConnection(socket, user, sessionID, lastEventID) {
const connection = new AuthedConnection(this.#uw, socket, user, sessionID, lastEventID);
connection.on('close', ({ banned, lastEventID }) => {
if (banned) {
this.#logger.info({ userId: user.id }, 'removing connection after ban');
disconnectUser(this.#uw, user.id);
} else if (!this.#closing) {
this.#logger.info({ userId: user.id }, 'lost connection');
this.add(this.createLostConnection(user, sessionID));
this.add(this.createLostConnection(user, sessionID, lastEventID));
}
this.remove(connection);
});
Expand Down Expand Up @@ -603,11 +607,18 @@ class SocketServer {
*
* @param {User} user
* @param {string} sessionID
* @param {string|null} lastEventID
* @returns {LostConnection}
* @private
*/
createLostConnection(user, sessionID) {
const connection = new LostConnection(this.#uw, user, sessionID, this.options.timeout);
createLostConnection(user, sessionID, lastEventID) {
const connection = new LostConnection(
this.#uw,
user,
sessionID,
lastEventID,
this.options.timeout,
);
connection.on('close', () => {
this.#logger.info({ userId: user.id }, 'user left');
this.remove(connection);
Expand Down Expand Up @@ -732,6 +743,40 @@ class SocketServer {
this.#connections.forEach((connection) => {
connection.ping();
});

this.#cleanupMessageQueue().catch((err) => {
this.#logger.error({ err }, 'failed to clean up socket message queue');
});
}

async #cleanupMessageQueue() {
const oldestID = encodeTime(subMinutes(new Date(), 10).getTime());

await this.#uw.db.deleteFrom('socketMessageQueue')
.where('id', '<', oldestID)
.execute();
}

/**
* Broadcast a command to all connected clients.
*
* @param {string} command Command name.
* @param {import('type-fest').JsonValue} data Command data.
* @param {import('./schema.js').UserID | null} targetUserID
*/
#recordMessage(command, data, targetUserID = null) {
const id = ulid();

this.#uw.db.insertInto('socketMessageQueue')
.values({
id,
command,
data: jsonb(data),
targetUserID,
})
.execute();

return id;
}

/**
Expand All @@ -741,7 +786,10 @@ class SocketServer {
* @param {import('type-fest').JsonValue} data Command data.
*/
broadcast(command, data) {
const id = this.#recordMessage(command, data);

this.#logger.trace({
id,
command,
data,
to: this.#connections.map((connection) => (
Expand All @@ -750,23 +798,24 @@ class SocketServer {
}, 'broadcast');

this.#connections.forEach((connection) => {
connection.send(command, data);
connection.send(id, command, data);
});
}

/**
* Send a command to a single user.
*
* @param {User|string} user User or user ID to send the command to.
* @param {User|import('./schema.js').UserID} user User or user ID to send the command to.
* @param {string} command Command name.
* @param {import('type-fest').JsonValue} data Command data.
*/
sendTo(user, command, data) {
const userID = typeof user === 'object' ? user.id : user;
const id = this.#recordMessage(command, data, userID);

this.#connections.forEach((connection) => {
if ('user' in connection && connection.user.id === userID) {
connection.send(command, data);
connection.send(id, command, data);
}
});
}
Expand Down
29 changes: 29 additions & 0 deletions src/migrations/006-socketMessageQueue.cjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
'use strict';

const { sql } = require('kysely');

/**
* @param {import('umzug').MigrationParams<import('../Uwave').default>} params
*/
async function up({ context: uw }) {
const { db } = uw;

await db.schema.createTable('socket_message_queue')
// This contains a ULID which also encodes the timestamp.
.addColumn('id', 'text', (col) => col.primaryKey())
.addColumn('target_user_id', 'uuid', (col) => col.references('users.id'))
.addColumn('command', 'text', (col) => col.notNull())
.addColumn('data', 'jsonb', (col) => col.notNull().defaultTo(sql`(jsonb('null'))`))
.execute();
}

/**
* @param {import('umzug').MigrationParams<import('../Uwave').default>} params
*/
async function down({ context: uw }) {
const { db } = uw;

await db.schema.dropTable('socket_message_queue').execute();
}

module.exports = { up, down };
8 changes: 8 additions & 0 deletions src/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,13 @@ export interface MigrationTable {
name: string,
}

export interface SocketMessageTable {
id: string,
targetUserID: UserID | null,
command: string,
data: JSONB<JsonValue>,
}

export interface Database {
configuration: ConfigurationTable,
keyval: KeyvalTable,
Expand All @@ -179,6 +186,7 @@ export interface Database {
playlistItems: PlaylistItemTable,
historyEntries: HistoryEntryTable,
feedback: FeedbackTable,
socketMessageQueue: SocketMessageTable,
}

export type Kysely = KyselyBase<Database>;
65 changes: 41 additions & 24 deletions src/sockets/AuthedConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import EventEmitter from 'node:events';
import Ultron from 'ultron';
import WebSocket from 'ws';
import sjson from 'secure-json-parse';
import { ulid } from 'ulid';
import { fromJson, json } from '../utils/sqlite.js';

const PING_TIMEOUT = 5_000;
const DEAD_TIMEOUT = 30_000;
Expand All @@ -13,13 +15,19 @@ class AuthedConnection extends EventEmitter {

#lastMessage = Date.now();

// Ideally, the client should actually be responsible for this,
// because the server only knows if something was *sent*, not if it was received.
/** @type {string|null} */
#lastEventID = null;

/**
* @param {import('../Uwave.js').default} uw
* @param {import('ws').WebSocket} socket
* @param {import('../schema.js').User} user
* @param {string} sessionID
* @param {string|null} lastEventID
*/
constructor(uw, socket, user, sessionID) {
constructor(uw, socket, user, sessionID, lastEventID) {
super();
this.uw = uw;
this.socket = socket;
Expand All @@ -31,7 +39,10 @@ class AuthedConnection extends EventEmitter {
});

this.#events.on('close', () => {
this.emit('close', { banned: this.banned });
this.emit('close', {
banned: this.banned,
lastEventID: this.#lastEventID,
});
});
this.#events.on('message', (raw) => {
this.#onMessage(raw);
Expand All @@ -40,7 +51,9 @@ class AuthedConnection extends EventEmitter {
this.#onPong();
});

this.sendWaiting();
this.#sendWaiting(lastEventID).catch((err) => {
this.#logger.error({ err }, 'failed to send waiting messages on reconnect');
});
}

/**
Expand All @@ -50,29 +63,31 @@ class AuthedConnection extends EventEmitter {
return `http-api:disconnected:${this.sessionID}`;
}

/**
* @private
*/
get messagesKey() {
return `http-api:disconnected:${this.sessionID}:messages`;
}

/**
* @private
*/
async sendWaiting() {
const wasDisconnected = await this.uw.redis.exists(this.key);
if (!wasDisconnected) {
/** @param {string|null} clientLastEventID */
async #sendWaiting(clientLastEventID) {
// Legacy clients may not send a last event ID.
const lastEventID = clientLastEventID ?? await this.uw.redis.get(this.key);
if (!lastEventID) {
return;
}
/** @type {string[]} */
const messages = await this.uw.redis.lrange(this.messagesKey, 0, -1);

const messages = await this.uw.db.selectFrom('socketMessageQueue')
.select([
'id',
'command',
(eb) => json(eb.ref('data')).as('data'),
])
.where('id', '>', lastEventID)
.where((eb) => eb.or([
eb('targetUserID', 'is', null),
eb('targetUserID', '=', this.user.id),
]))
.execute();

this.#logger.info({ count: messages.length }, 'queued messages');
messages.forEach((message) => {
const { command, data } = sjson.parse(message);
this.send(command, data);
this.send(message.id, message.command, fromJson(message.data));
});
await this.uw.redis.del(this.key, this.messagesKey);
}

/**
Expand All @@ -91,12 +106,14 @@ class AuthedConnection extends EventEmitter {
}

/**
* @param {string} id
* @param {string} command
* @param {import('type-fest').JsonValue} data
*/
send(command, data) {
this.socket.send(JSON.stringify({ command, data }));
send(id, command, data) {
this.socket.send(JSON.stringify({ id, command, data }));
this.#lastMessage = Date.now();
this.#lastEventID = id;
}

#timeSinceLastMessage() {
Expand All @@ -119,7 +136,7 @@ class AuthedConnection extends EventEmitter {
ban() {
this.#logger.info('ban');
this.banned = true;
this.send('error', 'You have been banned');
this.send(ulid(), 'error', 'You have been banned');
this.socket.close(4001, 'ban');
}

Expand Down
Loading