diff --git a/packages/utils/src/lib/create-runner-files.ts b/packages/utils/src/lib/create-runner-files.ts index 5cb402580..8a8495555 100644 --- a/packages/utils/src/lib/create-runner-files.ts +++ b/packages/utils/src/lib/create-runner-files.ts @@ -1,8 +1,8 @@ import { writeFile } from 'node:fs/promises'; import path from 'node:path'; -import { threadId } from 'node:worker_threads'; import type { RunnerFilesPaths } from '@code-pushup/models'; import { ensureDirectoryExists, pluginWorkDir } from './file-system.js'; +import { getUniqueProcessThreadId } from './process-id.js'; /** * Function to create timestamp nested plugin runner files for config and output. @@ -14,9 +14,7 @@ export async function createRunnerFiles( pluginSlug: string, configJSON: string, ): Promise { - // Use timestamp + process ID + threadId - // This prevents race conditions when running the same plugin for multiple projects in parallel - const uniqueId = `${(performance.timeOrigin + performance.now()) * 10}-${process.pid}-${threadId}`; + const uniqueId = getUniqueProcessThreadId(); const runnerWorkDir = path.join(pluginWorkDir(pluginSlug), uniqueId); const runnerConfigPath = path.join(runnerWorkDir, 'plugin-config.json'); const runnerOutputPath = path.join(runnerWorkDir, 'runner-output.json'); diff --git a/packages/utils/src/lib/errors.ts b/packages/utils/src/lib/errors.ts index 3ce467bfd..709c0555a 100644 --- a/packages/utils/src/lib/errors.ts +++ b/packages/utils/src/lib/errors.ts @@ -30,3 +30,21 @@ export function stringifyError( } return JSON.stringify(error); } + +/** + * Extends an error with a new message and keeps the original as the cause. + * This helps to keep the stacktrace intact and enables better debugging. + * @param error - The error to extend + * @param message - The new message to add to the error + * @returns A new error with the extended message and the original as cause + */ +export function extendError( + error: unknown, + message: string, + { appendMessage = false } = {}, +) { + const errorMessage = appendMessage + ? `${message}\n${stringifyError(error)}` + : message; + return new Error(errorMessage, { cause: error }); +} diff --git a/packages/utils/src/lib/profiler/constants.ts b/packages/utils/src/lib/profiler/constants.ts index 0cb78b7ae..54c19e15e 100644 --- a/packages/utils/src/lib/profiler/constants.ts +++ b/packages/utils/src/lib/profiler/constants.ts @@ -31,3 +31,9 @@ export const SHARDED_WAL_COORDINATOR_ID_ENV_VAR = * Used as the base name for sharded WAL files (e.g., "trace" in "trace.json"). */ export const PROFILER_PERSIST_BASENAME = 'trace'; + +/** + * Name for current measure. + * Used as the name for the sharded folder. + */ +export const PROFILER_MEASURE_NAME = 'CP_PROFILER_MEASURE_NAME'; diff --git a/packages/utils/src/lib/profiler/profiler-node.ts b/packages/utils/src/lib/profiler/profiler-node.ts index 3ab87b642..7efd7a63a 100644 --- a/packages/utils/src/lib/profiler/profiler-node.ts +++ b/packages/utils/src/lib/profiler/profiler-node.ts @@ -16,11 +16,8 @@ import type { ActionTrackEntryPayload, MarkerPayload, } from '../user-timing-extensibility-api.type.js'; -import { - type AppendableSink, - WriteAheadLogFile, - getShardedPath, -} from '../wal.js'; +import { getShardedPath } from '../wal-sharded.js'; +import { type AppendableSink, WriteAheadLogFile } from '../wal.js'; import { PROFILER_DEBUG_ENV_VAR, PROFILER_ENABLED_ENV_VAR, diff --git a/packages/utils/src/lib/wal-sharded.int.test.ts b/packages/utils/src/lib/wal-sharded.int.test.ts new file mode 100644 index 000000000..6b708163f --- /dev/null +++ b/packages/utils/src/lib/wal-sharded.int.test.ts @@ -0,0 +1,259 @@ +import fs from 'node:fs'; +import path from 'node:path'; +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { SHARDED_WAL_COORDINATOR_ID_ENV_VAR } from './profiler/constants.js'; +import { ShardedWal } from './wal-sharded.js'; +import { type WalFormat, type WalRecord, stringCodec } from './wal.js'; + +describe('ShardedWal Integration', () => { + const testDir = path.join( + process.cwd(), + 'tmp', + 'int', + 'utils', + 'wal-sharded', + ); + const makeMockFormat = ( + overrides: Partial>, + ): WalFormat => { + const { + baseName = 'wal', + walExtension = '.log', + finalExtension = '.json', + codec = stringCodec(), + finalizer = records => `${JSON.stringify(records)}\n`, + } = overrides; + + return { + baseName, + walExtension, + finalExtension, + codec, + finalizer, + }; + }; + let shardedWal: ShardedWal; + + beforeEach(() => { + if (fs.existsSync(testDir)) { + fs.rmSync(testDir, { recursive: true, force: true }); + } + fs.mkdirSync(testDir, { recursive: true }); + }); + + afterEach(() => { + if (shardedWal) { + shardedWal.cleanupIfCoordinator(); + } + if (fs.existsSync(testDir)) { + fs.rmSync(testDir, { recursive: true, force: true }); + } + }); + + it('should create and finalize shards correctly', () => { + shardedWal = new ShardedWal({ + debug: false, + dir: testDir, + format: makeMockFormat({ + baseName: 'trace', + }), + coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR, + groupId: 'create-finalize', + }); + + const shard1 = shardedWal.shard(); + shard1.open(); + shard1.append('record1'); + shard1.append('record2'); + shard1.close(); + + const shard2 = shardedWal.shard(); + shard2.open(); + shard2.append('record3'); + shard2.close(); + + shardedWal.finalize(); + + const finalFile = path.join( + testDir, + shardedWal.groupId, + `trace.create-finalize.json`, + ); + expect(fs.existsSync(finalFile)).toBeTrue(); + + const content = fs.readFileSync(finalFile, 'utf8'); + const records = JSON.parse(content.trim()); + expect(records).toEqual(['record1', 'record2', 'record3']); + }); + + it('should merge multiple shards correctly', () => { + shardedWal = new ShardedWal({ + debug: false, + dir: testDir, + format: makeMockFormat({ + baseName: 'merged', + }), + coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR, + groupId: 'merge-shards', + }); + + // eslint-disable-next-line functional/no-loop-statements + for (let i = 1; i <= 5; i++) { + const shard = shardedWal.shard(); + shard.open(); + shard.append(`record-from-shard-${i}`); + shard.close(); + } + + shardedWal.finalize(); + + const finalFile = path.join( + testDir, + shardedWal.groupId, + `merged.merge-shards.json`, + ); + const content = fs.readFileSync(finalFile, 'utf8'); + const records = JSON.parse(content.trim()); + expect(records).toHaveLength(5); + expect(records[0]).toBe('record-from-shard-1'); + expect(records[4]).toBe('record-from-shard-5'); + }); + + it('should handle invalid entries during if debug true', () => { + shardedWal = new ShardedWal({ + debug: true, + dir: testDir, + format: makeMockFormat({ + baseName: 'test', + }), + coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR, + groupId: 'invalid-entries', + }); + + const shard = shardedWal.shard(); + shard.open(); + shard.append('valid1'); + shard.append('invalid'); + shard.append('valid2'); + shard.close(); + + shardedWal.finalize(); + // When debug is true, lastRecover should contain recovery results + expect(shardedWal.stats.lastRecover).toHaveLength(1); + expect(shardedWal.stats.lastRecover[0]).toMatchObject({ + file: expect.stringContaining('test.'), + result: expect.objectContaining({ + records: expect.arrayContaining(['valid1', 'invalid', 'valid2']), + errors: [], + partialTail: null, + }), + }); + + const finalFile = path.join( + testDir, + shardedWal.groupId, + `test.invalid-entries.json`, + ); + const content = fs.readFileSync(finalFile, 'utf8'); + const records = JSON.parse(content.trim()); + expect(records).toEqual(['valid1', 'invalid', 'valid2']); + }); + + it('should cleanup shard files after finalization', () => { + shardedWal = new ShardedWal({ + debug: false, + dir: testDir, + format: makeMockFormat({ + baseName: 'cleanup-test', + }), + coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR, + groupId: 'cleanup-test', + }); + + const shard1 = shardedWal.shard(); + shard1.open(); + shard1.append('record1'); + shard1.close(); + + const shard2 = shardedWal.shard(); + shard2.open(); + shard2.append('record2'); + shard2.close(); + + shardedWal.finalize(); + + const finalFile = path.join( + testDir, + shardedWal.groupId, + `cleanup-test.cleanup-test.json`, + ); + expect(fs.existsSync(finalFile)).toBeTrue(); + + shardedWal.cleanupIfCoordinator(); + + const groupDir = path.join(testDir, shardedWal.groupId); + const files = fs.readdirSync(groupDir); + expect(files).not.toContain(expect.stringMatching(/cleanup-test.*\.log$/)); + expect(files).toContain(`cleanup-test.cleanup-test.json`); + }); + + it('should use custom options in finalizer', () => { + shardedWal = new ShardedWal({ + debug: false, + dir: testDir, + format: makeMockFormat({ + baseName: 'custom', + finalizer: (records, opt) => + `${JSON.stringify({ records, metadata: opt })}\n`, + }), + coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR, + groupId: 'custom-finalizer', + }); + + const shard = shardedWal.shard(); + shard.open(); + shard.append('record1'); + shard.close(); + + shardedWal.finalize({ version: '2.0', timestamp: Date.now() }); + + const finalFile = path.join( + testDir, + shardedWal.groupId, + `custom.custom-finalizer.json`, + ); + const content = fs.readFileSync(finalFile, 'utf8'); + const result = JSON.parse(content.trim()); + expect(result.records).toEqual(['record1']); + expect(result.metadata).toEqual({ + version: '2.0', + timestamp: expect.any(Number), + }); + }); + + it('should handle empty shards correctly', () => { + shardedWal = new ShardedWal({ + debug: false, + dir: testDir, + format: makeMockFormat({ + baseName: 'empty', + }), + coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR, + groupId: 'empty-shards', + }); + + const groupDir = path.join(testDir, shardedWal.groupId); + fs.mkdirSync(groupDir, { recursive: true }); + + shardedWal.finalize(); + + const finalFile = path.join( + testDir, + shardedWal.groupId, + `empty.${shardedWal.groupId}.json`, + ); + expect(fs.existsSync(finalFile)).toBeTrue(); + const content = fs.readFileSync(finalFile, 'utf8'); + expect(content.trim()).toBe('[]'); + }); +}); diff --git a/packages/utils/src/lib/wal-sharded.ts b/packages/utils/src/lib/wal-sharded.ts new file mode 100644 index 000000000..499ddab62 --- /dev/null +++ b/packages/utils/src/lib/wal-sharded.ts @@ -0,0 +1,474 @@ +import * as fs from 'node:fs'; +import path from 'node:path'; +import process from 'node:process'; +import { threadId } from 'node:worker_threads'; +import { extendError } from './errors.js'; +import { + type Counter, + getUniqueInstanceId, + getUniqueTimeId, +} from './process-id.js'; +import { + type InvalidEntry, + type RecoverResult, + type WalFormat, + type WalRecord, + WriteAheadLogFile, + ensureDirectoryExistsSync, + filterValidRecords, +} from './wal.js'; + +/** + * Validates that a groupId is safe to use as a single path segment. + * Rejects path traversal attempts and path separators to prevent writing outside intended directory. + * + * @param groupId - The groupId to validate + * @throws Error if groupId contains unsafe characters or path traversal sequences + */ +function validateGroupId(groupId: string): void { + // Reject empty or whitespace-only groupIds + if (!groupId || groupId.trim().length === 0) { + throw new Error('groupId cannot be empty or whitespace-only'); + } + + // Reject path separators (both forward and backward slashes) + if (groupId.includes('/') || groupId.includes('\\')) { + throw new Error('groupId cannot contain path separators'); + } + + // Reject relative path components + if (groupId === '..' || groupId === '.') { + throw new Error('groupId cannot be "." or ".."'); + } + + // Reject null bytes which can be used to bypass validation + if (groupId.includes('\0')) { + throw new Error('groupId cannot contain null bytes'); + } + + // Validate that the resolved path stays within the intended directory + // This catches cases where the path library normalizes to a parent directory + const normalized = path.normalize(groupId); + if (normalized !== groupId || normalized.startsWith('..')) { + throw new Error( + `groupId normalization resulted in unsafe path: ${normalized}`, + ); + } +} + +// eslint-disable-next-line functional/no-let +let shardCount = 0; + +/** + * Counter for generating sequential shard IDs. + * Encapsulates the shard count increment logic. + */ +export const ShardedWalCounter: Counter = { + next() { + return ++shardCount; + }, +}; + +/** + * Generates a unique readable instance ID. + * This ID uniquely identifies a shard/file per process/thread combination with a human-readable timestamp. + * Format: readable-timestamp.pid.threadId.counter + * Example: "20240101-120000-000.12345.1.1" + * + * @returns A unique ID string with readable timestamp, process ID, thread ID, and counter + */ +export function getShardId(): string { + return `${getUniqueTimeId()}.${process.pid}.${threadId}.${ShardedWalCounter.next()}`; +} + +/** + * Generates a path to a shard file using human-readable IDs. + * Both groupId and shardId are already in readable date format. + * + * Example with groupId "20240101-120000-000" and shardId "20240101-120000-000.12345.1.1": + * Full path: /base/20240101-120000-000/trace.20240101-120000-000.12345.1.1.log + * + * @param opt.dir - The directory to store the shard file + * @param opt.format - The WalFormat to use for the shard file + * @param opt.groupId - The human-readable group ID (yyyymmdd-hhmmss-ms format) + * @param opt.shardId - The human-readable shard ID (readable-timestamp.pid.threadId.count format) + * @returns The path to the shard file + */ +export function getShardedPath(opt: { + dir?: string; + format: WalFormat; + groupId: string; + shardId: string; +}): string { + const { dir = '', format, groupId, shardId } = opt; + const { baseName, walExtension } = format; + + return path.join(dir, groupId, `${baseName}.${shardId}${walExtension}`); +} + +/** + * Sharded Write-Ahead Log manager for coordinating multiple WAL shards. + * Handles distributed logging across multiple processes/files with atomic finalization. + */ + +export class ShardedWal { + static instanceCount = 0; + + readonly #id: string = getUniqueInstanceId({ + next() { + return ++ShardedWal.instanceCount; + }, + }); + readonly groupId = getUniqueTimeId(); + readonly #debug: boolean = false; + readonly #format: WalFormat; + readonly #dir: string = process.cwd(); + readonly #coordinatorIdEnvVar: string; + #state: 'active' | 'finalized' | 'cleaned' = 'active'; + #lastRecovery: { + file: string; + result: RecoverResult>; + }[] = []; + #createdShardFiles: string[] = []; + + /** + * Initialize the origin PID environment variable if not already set. + * This must be done as early as possible before any user code runs. + * Sets envVarName to the current process ID if not already defined. + * + * @param envVarName - Environment variable name for storing coordinator ID + * @param profilerID - The profiler ID to set as coordinator + */ + static setCoordinatorProcess(envVarName: string, profilerID: string): void { + if (!process.env[envVarName]) { + process.env[envVarName] = profilerID; + } + } + + /** + * Determines if this process is the leader WAL process using the origin PID heuristic. + * + * The leader is the process that first enabled profiling (the one that set CP_PROFILER_ORIGIN_PID). + * All descendant processes inherit the environment but have different PIDs. + * + * @param envVarName - Environment variable name for storing coordinator ID + * @param profilerID - The profiler ID to check + * @returns true if this is the leader WAL process, false otherwise + */ + static isCoordinatorProcess(envVarName: string, profilerID: string): boolean { + return process.env[envVarName] === profilerID; + } + + /** + * Create a sharded WAL manager. + * + * @param opt.dir - Base directory to store shard files (defaults to process.cwd()) + * @param opt.format - WAL format configuration + * @param opt.groupId - Group ID for sharding (defaults to generated group ID) + * @param opt.coordinatorIdEnvVar - Environment variable name for storing coordinator ID (defaults to CP_SHARDED_WAL_COORDINATOR_ID) + * @param opt.autoCoordinator - Whether to auto-set the coordinator ID on construction (defaults to true) + * @param opt.measureNameEnvVar - Environment variable name for coordinating groupId across processes (optional) + */ + constructor(opt: { + debug?: boolean; + dir?: string; + format: WalFormat; + groupId?: string; + coordinatorIdEnvVar: string; + autoCoordinator?: boolean; + measureNameEnvVar?: string; + }) { + const { + dir, + format, + debug, + groupId, + coordinatorIdEnvVar, + autoCoordinator = true, + measureNameEnvVar, + } = opt; + + if (debug != null) { + this.#debug = debug; + } + + // Determine groupId: use provided, then env var, or generate + // eslint-disable-next-line functional/no-let + let resolvedGroupId: string; + if (groupId != null) { + // User explicitly provided groupId - use it (even if empty, validation will catch it) + resolvedGroupId = groupId; + } else if (measureNameEnvVar && process.env[measureNameEnvVar] != null) { + // Env var is set (by coordinator or previous process) - use it + resolvedGroupId = process.env[measureNameEnvVar]; + } else if (measureNameEnvVar) { + // Env var not set - we're likely the first/coordinator, generate and set it + resolvedGroupId = getUniqueTimeId(); + + process.env[measureNameEnvVar] = resolvedGroupId; + } else { + // No measureNameEnvVar provided - generate unique one (backward compatible) + resolvedGroupId = getUniqueTimeId(); + } + + // Validate groupId for path safety before using it + validateGroupId(resolvedGroupId); + + this.groupId = resolvedGroupId; + + if (dir) { + this.#dir = dir; + } + this.#format = format; + this.#coordinatorIdEnvVar = coordinatorIdEnvVar; + + if (autoCoordinator) { + ShardedWal.setCoordinatorProcess(this.#coordinatorIdEnvVar, this.#id); + } + } + + /** + * Gets the unique instance ID for this ShardedWal. + * + * @returns The unique instance ID + */ + get id(): string { + return this.#id; + } + + /** + * Is this instance the coordinator? + * + * Coordinator status is determined from the coordinatorIdEnvVar environment variable. + * The coordinator handles finalization and cleanup of shard files. + * Checks dynamically to allow coordinator to be set after construction. + * + * @returns true if this instance is the coordinator, false otherwise + */ + isCoordinator(): boolean { + return ShardedWal.isCoordinatorProcess(this.#coordinatorIdEnvVar, this.#id); + } + + /** + * Asserts that the WAL is in 'active' state. + * Throws an error if the WAL has been finalized or cleaned. + * + * @throws Error if WAL is not in 'active' state + */ + private assertActive(): void { + if (this.#state !== 'active') { + throw new Error(`WAL is ${this.#state}, cannot modify`); + } + } + + /** + * Gets the current lifecycle state of the WAL. + * + * @returns Current lifecycle state: 'active', 'finalized', or 'cleaned' + */ + getState(): 'active' | 'finalized' | 'cleaned' { + return this.#state; + } + + /** + * Checks if the WAL has been finalized. + * + * @returns true if WAL is in 'finalized' state, false otherwise + */ + isFinalized(): boolean { + return this.#state === 'finalized'; + } + + /** + * Checks if the WAL has been cleaned. + * + * @returns true if WAL is in 'cleaned' state, false otherwise + */ + isCleaned(): boolean { + return this.#state === 'cleaned'; + } + + /** + * Generates a filename for a shard file using a shard ID. + * Both groupId and shardId are already in readable date format. + * + * Example with baseName "trace" and shardId "20240101-120000-000.12345.1.1": + * Filename: trace.20240101-120000-000.12345.1.1.log + * + * @param shardId - The human-readable shard ID (readable-timestamp.pid.threadId.count format) + * @returns The filename for the shard file + */ + getShardedFileName(shardId: string) { + const { baseName, walExtension } = this.#format; + return `${baseName}.${shardId}${walExtension}`; + } + + /** + * Generates a filename for the final merged output file. + * Uses the groupId as the identifier in the final filename. + * + * Example with baseName "trace" and groupId "20240101-120000-000": + * Filename: trace.20240101-120000-000.json + * + * Example with baseName "trace" and groupId "measureName": + * Filename: trace.measureName.json + * + * @returns The filename for the final merged output file + */ + getFinalFilePath() { + const groupIdDir = path.join(this.#dir, this.groupId); + const { baseName, finalExtension } = this.#format; + + return path.join( + groupIdDir, + `${baseName}.${this.groupId}${finalExtension}`, + ); + } + + shard() { + this.assertActive(); + const filePath = path.join( + this.#dir, + this.groupId, + this.getShardedFileName(getShardId()), + ); + this.#createdShardFiles.push(filePath); + return new WriteAheadLogFile({ + file: filePath, + codec: this.#format.codec, + }); + } + + /** Get all shard file paths matching this WAL's base name */ + private shardFiles() { + if (!fs.existsSync(this.#dir)) { + return []; + } + + const groupDir = path.join(this.#dir, this.groupId); + // create dir if not existing + ensureDirectoryExistsSync(groupDir); + + return fs + .readdirSync(groupDir) + .filter(entry => entry.endsWith(this.#format.walExtension)) + .filter(entry => entry.startsWith(`${this.#format.baseName}`)) + .map(entry => path.join(groupDir, entry)); + } + + /** Get shard file paths created by this instance */ + private getCreatedShardFiles() { + return this.#createdShardFiles.filter(f => fs.existsSync(f)); + } + + /** + * Finalize all shards by merging them into a single output file. + * Recovers all records from all shards, validates no errors, and writes merged result. + * Idempotent: returns early if already finalized or cleaned. + * @throws Error if custom finalizer method throws + */ + finalize(opt?: Record) { + if (this.#state !== 'active') { + return; + } + + // Ensure base directory exists before calling shardFiles() + ensureDirectoryExistsSync(this.#dir); + + const fileRecoveries = this.shardFiles().map(f => ({ + file: f, + result: new WriteAheadLogFile({ + file: f, + codec: this.#format.codec, + }).recover(), + })); + + const records = fileRecoveries.flatMap(({ result }) => result.records); + + if (this.#debug) { + this.#lastRecovery = fileRecoveries; + } + + ensureDirectoryExistsSync(path.dirname(this.getFinalFilePath())); + + try { + fs.writeFileSync( + this.getFinalFilePath(), + this.#format.finalizer(filterValidRecords(records), opt), + ); + } catch (error) { + throw extendError( + error, + 'Could not finalize sharded wal. Finalizer method in format throws.', + { appendMessage: true }, + ); + } + + this.#state = 'finalized'; + } + + /** + * Cleanup shard files by removing them from disk. + * Coordinator-only: throws error if not coordinator to prevent race conditions. + * Idempotent: returns early if already cleaned. + */ + cleanup() { + if (!this.isCoordinator()) { + throw new Error('cleanup() can only be called by coordinator'); + } + + if (this.#state === 'cleaned') { + return; + } + + this.shardFiles() + .filter(f => fs.existsSync(f)) + .forEach(f => { + fs.unlinkSync(f); + }); + + this.#state = 'cleaned'; + } + + get stats() { + // When finalized, count all shard files from filesystem (for multi-process scenarios) + // Otherwise, count only files created by this instance + const shardFileCount = + this.#state === 'finalized' || this.#state === 'cleaned' + ? this.shardFiles().length + : this.getCreatedShardFiles().length; + const shardFilesList = + this.#state === 'finalized' || this.#state === 'cleaned' + ? this.shardFiles() + : this.getCreatedShardFiles(); + + return { + lastRecover: this.#lastRecovery, + state: this.#state, + groupId: this.groupId, + shardCount: this.getCreatedShardFiles().length, + isCoordinator: this.isCoordinator(), + isFinalized: this.isFinalized(), + isCleaned: this.isCleaned(), + finalFilePath: this.getFinalFilePath(), + shardFileCount, + shardFiles: shardFilesList, + }; + } + + finalizeIfCoordinator(opt?: Record) { + if (this.isCoordinator()) { + this.finalize(opt); + } + } + + /** + * Cleanup shard files if this instance is the coordinator. + * Safe to call from any process - only coordinator will execute cleanup. + */ + cleanupIfCoordinator() { + if (this.isCoordinator()) { + this.cleanup(); + } + } +} diff --git a/packages/utils/src/lib/wal-sharded.unit.test.ts b/packages/utils/src/lib/wal-sharded.unit.test.ts new file mode 100644 index 000000000..d188b3c68 --- /dev/null +++ b/packages/utils/src/lib/wal-sharded.unit.test.ts @@ -0,0 +1,585 @@ +import { vol } from 'memfs'; +import { beforeEach, describe, expect, it } from 'vitest'; +import { MEMFS_VOLUME, osAgnosticPath } from '@code-pushup/test-utils'; +import { getUniqueInstanceId } from './process-id.js'; +import { + PROFILER_MEASURE_NAME, + SHARDED_WAL_COORDINATOR_ID_ENV_VAR, +} from './profiler/constants.js'; +import { ShardedWal } from './wal-sharded.js'; +import { + type WalFormat, + type WalRecord, + WriteAheadLogFile, + parseWalFormat, + stringCodec, +} from './wal.js'; + +const read = (p: string) => vol.readFileSync(p, 'utf8') as string; + +const getShardedWal = (overrides?: { + dir?: string; + format?: Partial; + measureNameEnvVar?: string; + autoCoordinator?: boolean; + groupId?: string; +}) => { + const { format, ...rest } = overrides ?? {}; + return new ShardedWal({ + debug: false, + dir: '/test/shards', + format: parseWalFormat({ + baseName: 'test-wal', + ...format, + }), + coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR, + ...rest, + }); +}; + +describe('ShardedWal', () => { + beforeEach(() => { + vol.reset(); + vol.fromJSON({}, MEMFS_VOLUME); + // Clear coordinator env var for fresh state + // eslint-disable-next-line functional/immutable-data, @typescript-eslint/no-dynamic-delete + delete process.env[SHARDED_WAL_COORDINATOR_ID_ENV_VAR]; + // Clear measure name env var to avoid test pollution + // eslint-disable-next-line functional/immutable-data + delete process.env[PROFILER_MEASURE_NAME]; + }); + + describe('initialization', () => { + it('should create instance with directory and format', () => { + const sw = getShardedWal(); + expect(sw).toBeInstanceOf(ShardedWal); + }); + + it('should expose a stable id via getter', () => { + const sw = getShardedWal(); + const firstId = sw.id; + expect(sw.id).toBe(firstId); + }); + + it('should use groupId from env var when measureNameEnvVar is set', () => { + // eslint-disable-next-line functional/immutable-data + process.env[PROFILER_MEASURE_NAME] = 'from-env'; + const sw = getShardedWal({ + measureNameEnvVar: PROFILER_MEASURE_NAME, + }); + expect(sw.groupId).toBe('from-env'); + expect(process.env.CP_PROFILER_MEASURE_NAME).toBe('from-env'); + }); + + it('should set env var when measureNameEnvVar is provided and unset', () => { + // eslint-disable-next-line functional/immutable-data + delete process.env.CP_PROFILER_MEASURE_NAME; + const sw = getShardedWal({ + measureNameEnvVar: PROFILER_MEASURE_NAME, + }); + expect(process.env.CP_PROFILER_MEASURE_NAME).toBe(sw.groupId); + }); + }); + + describe('path traversal validation', () => { + it('should reject groupId with forward slashes', () => { + expect(() => getShardedWal({ groupId: '../etc/passwd' })).toThrow( + 'groupId cannot contain path separators', + ); + }); + + it('should reject groupId with backward slashes', () => { + expect(() => + getShardedWal({ groupId: String.raw`..\windows\system32` }), + ).toThrow('groupId cannot contain path separators'); + }); + + it('should reject groupId with parent directory reference', () => { + expect(() => getShardedWal({ groupId: '..' })).toThrow( + 'groupId cannot be "." or ".."', + ); + }); + + it('should reject groupId with current directory reference', () => { + expect(() => getShardedWal({ groupId: '.' })).toThrow( + 'groupId cannot be "." or ".."', + ); + }); + + it('should reject groupId with null bytes', () => { + expect(() => getShardedWal({ groupId: 'test\0malicious' })).toThrow( + 'groupId cannot contain null bytes', + ); + }); + + it('should reject empty groupId', () => { + expect(() => getShardedWal({ groupId: '' })).toThrow( + 'groupId cannot be empty or whitespace-only', + ); + }); + + it('should reject whitespace-only groupId', () => { + expect(() => getShardedWal({ groupId: ' ' })).toThrow( + 'groupId cannot be empty or whitespace-only', + ); + }); + + it('should accept safe alphanumeric groupId', () => { + const sw = getShardedWal({ groupId: 'safe-group-123' }); + expect(sw.groupId).toBe('safe-group-123'); + }); + + it('should accept groupId with underscores and hyphens', () => { + const sw = getShardedWal({ groupId: 'test_group-name' }); + expect(sw.groupId).toBe('test_group-name'); + }); + + it('should reject groupId from env var with path traversal', () => { + // eslint-disable-next-line functional/immutable-data + process.env.CP_PROFILER_MEASURE_NAME = '../malicious'; + expect(() => + getShardedWal({ + measureNameEnvVar: PROFILER_MEASURE_NAME, + }), + ).toThrow('groupId cannot contain path separators'); + }); + }); + + describe('shard management', () => { + it('should create shard with correct file path', () => { + const sw = getShardedWal({ + format: { baseName: 'trace', walExtension: '.log' }, + }); + const shard = sw.shard(); + expect(shard).toBeInstanceOf(WriteAheadLogFile); + // Shard files use getShardId() format (timestamp.pid.threadId.counter) + // The groupId is auto-generated and used in the shard path + // Normalize path before regex matching to handle OS-specific separators + expect(osAgnosticPath(shard.getPath())).toMatch( + /^\/shards\/\d{8}-\d{6}-\d{3}\/trace\.\d{8}-\d{6}-\d{3}(?:\.\d+){3}\.log$/, + ); + expect(shard.getPath()).toEndWithPath('.log'); + }); + + it('should create shard with default shardId when no argument provided', () => { + const sw = getShardedWal({ + format: { baseName: 'trace', walExtension: '.log' }, + }); + const shard = sw.shard(); + expect(shard.getPath()).toStartWithPath( + '/shards/20231114-221320-000/trace.20231114-221320-000.10001', + ); + expect(shard.getPath()).toEndWithPath('.log'); + }); + }); + + describe('file operations', () => { + it('should list no shard files when directory does not exist', () => { + const sw = getShardedWal({ dir: '/nonexistent' }); + const files = (sw as any).shardFiles(); + expect(files).toEqual([]); + }); + + it('should list no shard files when directory is empty', () => { + const sw = getShardedWal({ dir: '/empty' }); + vol.mkdirSync('/empty/20231114-221320-000', { recursive: true }); + const files = (sw as any).shardFiles(); + expect(files).toEqual([]); + }); + + it('should list shard files matching extension', () => { + vol.fromJSON({ + '/shards/20231114-221320-000/trace.19700101-000820-001.1.log': + 'content1', + '/shards/20231114-221320-000/trace.19700101-000820-002.2.log': + 'content2', + '/shards/other.txt': 'not a shard', + }); + + const sw = getShardedWal({ + dir: '/shards', + format: { baseName: 'trace', walExtension: '.log' }, + }); + const files = (sw as any).shardFiles(); + + expect(files).toHaveLength(2); + expect(files).toEqual( + expect.arrayContaining([ + expect.pathToMatch( + '/shards/20231114-221320-000/trace.19700101-000820-001.1.log', + ), + expect.pathToMatch( + '/shards/20231114-221320-000/trace.19700101-000820-002.2.log', + ), + ]), + ); + }); + }); + + describe('finalization', () => { + it('should finalize empty shards to empty result', () => { + const sw = getShardedWal({ + dir: '/shards', + format: { + baseName: 'final', + finalExtension: '.json', + finalizer: (records: WalRecord) => `${JSON.stringify(records)}\n`, + }, + }); + + vol.mkdirSync('/shards/20231114-221320-000', { recursive: true }); + sw.finalize(); + + expect( + read('/shards/20231114-221320-000/final.20231114-221320-000.json'), + ).toBe('[]\n'); + }); + + it('should finalize multiple shards into single file', () => { + vol.fromJSON({ + '/shards/20231114-221320-000/merged.20240101-120000-001.1.log': + 'record1\n', + '/shards/20231114-221320-000/merged.20240101-120000-002.2.log': + 'record2\n', + }); + + const sw = getShardedWal({ + dir: '/shards', + format: { + baseName: 'merged', + walExtension: '.log', + finalExtension: '.json', + finalizer: (records: WalRecord) => `${JSON.stringify(records)}\n`, + }, + }); + + sw.finalize(); + + const result = JSON.parse( + read( + '/shards/20231114-221320-000/merged.20231114-221320-000.json', + ).trim(), + ); + expect(result).toEqual(['record1', 'record2']); + }); + + it('should handle invalid entries during finalize', () => { + vol.fromJSON({ + '/shards/20231114-221320-000/final.20240101-120000-001.1.log': + 'valid\n', + '/shards/20231114-221320-000/final.20240101-120000-002.2.log': + 'invalid\n', + }); + + const sw = getShardedWal({ + dir: '/shards', + format: { + baseName: 'final', + walExtension: '.log', + finalExtension: '.json', + codec: stringCodec(), + finalizer: (records: WalRecord) => `${JSON.stringify(records)}\n`, + }, + }); + + sw.finalize(); + + const result = JSON.parse( + read( + '/shards/20231114-221320-000/final.20231114-221320-000.json', + ).trim(), + ); + expect(result).toHaveLength(2); + expect(result[0]).toBe('valid'); + expect(result[1]).toBe('invalid'); + }); + + it('should use custom options in finalizer', () => { + vol.fromJSON({ + '/shards/20231114-221320-000/final.20231114-221320-000.10001.2.1.log': + 'record1\n', + }); + + const sw = getShardedWal({ + dir: '/shards', + format: { + baseName: 'final', + walExtension: '.log', + finalExtension: '.json', + finalizer: (records: WalRecord, opt) => + `${JSON.stringify({ records, meta: opt })}\n`, + }, + }); + + sw.finalize({ version: '1.0', compressed: true }); + + const result = JSON.parse( + read('/shards/20231114-221320-000/final.20231114-221320-000.json'), + ); + expect(result.records).toEqual(['record1']); + expect(result.meta).toEqual({ version: '1.0', compressed: true }); + }); + }); + + describe('cleanup', () => { + it('should throw error when cleanup is called by non-coordinator', () => { + vol.fromJSON({ + '/shards/20231114-221320-000/test.20231114-221320-000.10001.2.1.log': + 'content1', + }); + const sw = getShardedWal({ + dir: '/shards', + format: { baseName: 'test', walExtension: '.log' }, + autoCoordinator: false, + }); + + // Instance won't be coordinator, so cleanup() should throw + expect(() => sw.cleanup()).toThrow( + 'cleanup() can only be called by coordinator', + ); + }); + + it('should handle cleanupIfCoordinator when not coordinator', () => { + vol.fromJSON({ + '/shards/20231114-221320-000/test.20231114-221320-000.10001.2.1.log': + 'content1', + }); + + const sw = getShardedWal({ + dir: '/shards', + format: { baseName: 'test', walExtension: '.log' }, + autoCoordinator: false, + }); + + // cleanupIfCoordinator should be no-op when not coordinator + sw.cleanupIfCoordinator(); + + // Files should still exist + expect(vol.toJSON()).not.toStrictEqual({}); + expect(sw.getState()).toBe('active'); + }); + + it('should handle cleanup when some shard files do not exist', () => { + vol.fromJSON({ + '/shards/20231114-221320-000/test.20231114-221320-000.10001.2.1.log': + 'content1', + }); + + const sw = getShardedWal({ + dir: '/shards', + format: { baseName: 'test', walExtension: '.log' }, + }); + + vol.unlinkSync( + '/shards/20231114-221320-000/test.20231114-221320-000.10001.2.1.log', + ); + + // cleanupIfCoordinator won't throw even if files don't exist + expect(() => sw.cleanupIfCoordinator()).not.toThrow(); + }); + + it('should ignore directory removal failures during cleanup', () => { + vol.fromJSON({ + '/shards/20231114-221320-000/test.20231114-221320-000.10001.2.1.log': + 'content1', + '/shards/20231114-221320-000/keep.txt': 'keep', + }); + + const sw = getShardedWal({ + dir: '/shards', + format: { baseName: 'test', walExtension: '.log' }, + }); + + expect(() => sw.cleanup()).not.toThrow(); + expect( + vol.readFileSync('/shards/20231114-221320-000/keep.txt', 'utf8'), + ).toBe('keep'); + }); + }); + + describe('lifecycle state', () => { + it('throws with appended finalizer error when finalize fails', () => { + const sw = getShardedWal({ + dir: '/shards', + format: { + baseName: 'test', + finalExtension: '.json', + finalizer: () => { + throw new Error('finalizer boom'); + }, + }, + }); + + expect(() => sw.finalize()).toThrow( + /Could not finalize sharded wal\. Finalizer method in format throws\./, + ); + expect(() => sw.finalize()).toThrow(/finalizer boom/); + expect(sw.getState()).toBe('active'); + }); + + it('should start in active state', () => { + const sw = getShardedWal(); + expect(sw.getState()).toBe('active'); + expect(sw.isFinalized()).toBeFalse(); + expect(sw.isCleaned()).toBeFalse(); + }); + + it('should transition to finalized state after finalize', () => { + vol.mkdirSync('/shards/20231114-221320-000', { recursive: true }); + const sw = getShardedWal({ + dir: '/shards', + format: { + baseName: 'test', + finalExtension: '.json', + finalizer: (records: WalRecord) => `${JSON.stringify(records)}\n`, + }, + }); + + sw.finalize(); + + expect(sw.getState()).toBe('finalized'); + expect(sw.isFinalized()).toBeTrue(); + expect(sw.isCleaned()).toBeFalse(); + }); + + it('should transition to cleaned state after cleanup (when coordinator)', () => { + vol.fromJSON({ + '/shards/20231114-221320-000/test.20231114-221320-000.10001.2.1.log': + 'content1', + }); + + const sw = getShardedWal({ + dir: '/shards', + format: { baseName: 'test', walExtension: '.log' }, + }); + + sw.cleanupIfCoordinator(); + + const state = sw.getState(); + expect(['active', 'cleaned']).toContain(state); + }); + + it('should make cleanup idempotent for coordinator', () => { + vol.fromJSON({ + '/shards/20231114-221320-000/test.20231114-221320-000.10001.2.1.log': + 'content1', + }); + + const sw = getShardedWal({ + dir: '/shards', + format: { baseName: 'test', walExtension: '.log' }, + }); + + sw.cleanup(); + expect(sw.getState()).toBe('cleaned'); + + expect(() => sw.cleanup()).not.toThrow(); + expect(sw.getState()).toBe('cleaned'); + }); + + it('should prevent shard creation after finalize', () => { + vol.mkdirSync('/shards/20231114-221320-000', { recursive: true }); + const sw = getShardedWal({ + dir: '/shards', + format: { + baseName: 'test', + finalExtension: '.json', + finalizer: (records: WalRecord) => `${JSON.stringify(records)}\n`, + }, + }); + + sw.finalize(); + + expect(() => sw.shard()).toThrow('WAL is finalized, cannot modify'); + }); + + it('should prevent shard creation after cleanup', () => { + vol.fromJSON({ + '/shards/20231114-221320-000/test.20231114-221320-000.10001.2.1.log': + 'content1', + }); + + // Generate the instance ID that will be used by the constructor + // The constructor increments ShardedWal.instanceCount, so we need to + // generate the ID using the value that will be used (current + 1) + // without actually modifying ShardedWal.instanceCount + const nextCount = ShardedWal.instanceCount + 1; + const instanceId = getUniqueInstanceId({ + next() { + return nextCount; + }, + }); + + // Set coordinator BEFORE creating instance + ShardedWal.setCoordinatorProcess( + SHARDED_WAL_COORDINATOR_ID_ENV_VAR, + instanceId, + ); + + const sw = getShardedWal({ + dir: '/shards', + format: { baseName: 'test', walExtension: '.log' }, + }); + + sw.cleanupIfCoordinator(); + + expect(() => sw.shard()).toThrow('WAL is cleaned, cannot modify'); + }); + + it('should make finalize idempotent', () => { + vol.mkdirSync('/shards/20231114-221320-000', { recursive: true }); + const sw = getShardedWal({ + dir: '/shards', + format: { + baseName: 'test', + finalExtension: '.json', + finalizer: (records: WalRecord) => `${JSON.stringify(records)}\n`, + }, + }); + + sw.finalize(); + expect(sw.getState()).toBe('finalized'); + + // Call again - should not throw and should remain finalized + sw.finalize(); + expect(sw.getState()).toBe('finalized'); + }); + + it('should prevent finalize after cleanup', () => { + // Generate the instance ID that will be used by the constructor + // The constructor increments ShardedWal.instanceCount, so we need to + // generate the ID using the value that will be used (current + 1) + // without actually modifying ShardedWal.instanceCount + const nextCount = ShardedWal.instanceCount + 1; + const instanceId = getUniqueInstanceId({ + next() { + return nextCount; + }, + }); + + // Set coordinator BEFORE creating instance + ShardedWal.setCoordinatorProcess( + SHARDED_WAL_COORDINATOR_ID_ENV_VAR, + instanceId, + ); + + const sw = getShardedWal({ + dir: '/shards', + format: { + baseName: 'test', + walExtension: '.log', + finalExtension: '.json', + finalizer: records => `${JSON.stringify(records)}\n`, + }, + }); + + expect(sw.stats.shardFiles).toHaveLength(0); + sw.shard(); + expect(sw.stats.shardFiles).toHaveLength(0); + + sw.cleanupIfCoordinator(); + expect(sw.getState()).toBe('cleaned'); + expect(sw.stats.shardFiles).toHaveLength(0); + }); + }); +}); diff --git a/packages/utils/src/lib/wal.int.test.ts b/packages/utils/src/lib/wal.int.test.ts new file mode 100644 index 000000000..81c71709b --- /dev/null +++ b/packages/utils/src/lib/wal.int.test.ts @@ -0,0 +1,127 @@ +import fs from 'node:fs/promises'; +import path from 'node:path'; +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { + type Codec, + type WalRecord, + WriteAheadLogFile, + stringCodec, +} from './wal.js'; + +describe('WriteAheadLogFile Integration', () => { + const testDir = path.join(process.cwd(), 'tmp', 'int', 'utils', 'wal'); + let walFile: WriteAheadLogFile; + + beforeEach(async () => { + // Clean up test directory + await fs.rm(testDir, { recursive: true, force: true }); + await fs.mkdir(testDir, { recursive: true }); + }); + + afterEach(async () => { + if (walFile && !walFile.isClosed()) { + walFile.close(); + } + await fs.rm(testDir, { recursive: true, force: true }); + }); + + it('should recover from file with partial write', async () => { + const filePath = path.join(testDir, 'partial.log'); + walFile = new WriteAheadLogFile({ file: filePath, codec: stringCodec() }); + + walFile.open(); + walFile.append('complete1'); + walFile.append('complete2'); + walFile.close(); + + // Simulate partial write by appending incomplete line + await fs.appendFile(filePath, '"partial'); + + const recovered = walFile.recover(); + expect(recovered.records).toEqual(['complete1', 'complete2']); + expect(recovered.partialTail).toBe('"partial'); + }); + + it('should repack file removing invalid entries', () => { + const filePath = path.join(testDir, 'repack.log'); + const tolerantCodec: Codec = { + encode: v => (typeof v === 'string' ? v : JSON.stringify(v)), + decode: (s: string) => { + if (s === 'invalid') throw new Error('Invalid record'); + return s; + }, + }; + + walFile = new WriteAheadLogFile({ file: filePath, codec: tolerantCodec }); + walFile.open(); + walFile.append('valid1'); + walFile.append('invalid'); + walFile.append('valid2'); + walFile.close(); + + walFile.repack(); + + const recovered = walFile.recover(); + expect(recovered.records).toEqual(['valid1', 'valid2']); + }); + + it('should handle error recovery scenarios', () => { + const filePath = path.join(testDir, 'errors.log'); + const failingCodec: Codec = { + encode: v => (typeof v === 'string' ? v : JSON.stringify(v)), + decode: (s: string) => { + if (s === 'bad') throw new Error('Bad record'); + return s; + }, + }; + + walFile = new WriteAheadLogFile({ file: filePath, codec: failingCodec }); + walFile.open(); + walFile.append('good'); + walFile.append('bad'); + walFile.append('good'); + walFile.close(); + + const recovered = walFile.recover(); + expect(recovered.records).toEqual([ + 'good', + { __invalid: true, raw: 'bad' }, + 'good', + ]); + expect(recovered.errors).toEqual([]); + }); + + it('should handle object records correctly', () => { + const filePath = path.join(testDir, 'objects.log'); + walFile = new WriteAheadLogFile({ + file: filePath, + codec: stringCodec(), + }); + + walFile.open(); + walFile.append({ id: 1, name: 'test1' }); + walFile.append({ id: 2, name: 'test2' }); + walFile.close(); + + const recovered = walFile.recover(); + expect(recovered.records).toEqual([ + { id: 1, name: 'test1' }, + { id: 2, name: 'test2' }, + ]); + }); + + it('should perform complete write/recover cycle', () => { + const filePath = path.join(testDir, 'test.log'); + walFile = new WriteAheadLogFile({ file: filePath, codec: stringCodec() }); + + walFile.open(); + walFile.append('record1'); + walFile.append('record2'); + walFile.close(); + + const recovered = walFile.recover(); + expect(recovered.records).toEqual(['record1', 'record2']); + expect(recovered.errors).toEqual([]); + expect(recovered.partialTail).toBeNull(); + }); +}); diff --git a/packages/utils/src/lib/wal.ts b/packages/utils/src/lib/wal.ts index 2d3cd2e09..22173f5b1 100644 --- a/packages/utils/src/lib/wal.ts +++ b/packages/utils/src/lib/wal.ts @@ -1,13 +1,5 @@ -/* eslint-disable max-lines */ import * as fs from 'node:fs'; import path from 'node:path'; -import process from 'node:process'; -import { threadId } from 'node:worker_threads'; -import { - type Counter, - getUniqueInstanceId, - getUniqueTimeId, -} from './process-id.js'; /** * Codec for encoding/decoding values to/from strings for WAL storage. @@ -150,7 +142,9 @@ export function recoverFromContent( * Write-Ahead Log implementation for crash-safe append-only logging. * Provides atomic operations for writing, recovering, and repacking log entries. */ -export class WriteAheadLogFile implements AppendableSink { +export class WriteAheadLogFile + implements AppendableSink +{ #fd: number | null = null; readonly #file: string; readonly #decode: Codec>['decode']; @@ -268,11 +262,13 @@ export class WriteAheadLogFile implements AppendableSink { } } +export type WalRecord = object | string; + /** * Format descriptor that binds codec and file extension together. * Prevents misconfiguration by keeping related concerns in one object. */ -export type WalFormat = { +export type WalFormat = { /** Base name for the WAL (e.g., "trace") */ baseName: string; /** Shard file extension (e.g., ".jsonl") */ @@ -314,7 +310,7 @@ export const stringCodec = < * @param format - Partial WalFormat configuration * @returns Parsed WalFormat with defaults filled in */ -export function parseWalFormat( +export function parseWalFormat( format: Partial>, ): WalFormat { const { @@ -348,232 +344,12 @@ export function parseWalFormat( } /** - * Determines if this process is the leader WAL process using the origin PID heuristic. - * - * The leader is the process that first enabled profiling (the one that set CP_PROFILER_ORIGIN_PID). - * All descendant processes inherit the environment but have different PIDs. - * - * @returns true if this is the leader WAL process, false otherwise - */ -export function isCoordinatorProcess( - envVarName: string, - profilerID: string, -): boolean { - return process.env[envVarName] === profilerID; -} - -/** - * Initialize the origin PID environment variable if not already set. - * This must be done as early as possible before any user code runs. - * Sets envVarName to the current process ID if not already defined. - */ -export function setCoordinatorProcess( - envVarName: string, - profilerID: string, -): void { - if (!process.env[envVarName]) { - // eslint-disable-next-line functional/immutable-data - process.env[envVarName] = profilerID; - } -} - -/** - * Simple counter implementation for generating sequential IDs. - */ -const shardCounter: Counter = (() => { - // eslint-disable-next-line functional/no-let - let count = 0; - return { next: () => ++count }; -})(); - -/** - * Generates a unique sharded WAL ID based on performance time origin, process ID, thread ID, and instance count. - */ -function getShardedWalId() { - // eslint-disable-next-line functional/immutable-data - return `${Math.round(performance.timeOrigin)}.${process.pid}.${threadId}.${++ShardedWal.instanceCount}`; -} - -/** + * NOTE: this helper is only used in this file. The rest of the repo avoids sync methods so it is not reusable. * Ensures a directory exists, creating it recursively if necessary using sync methods. * @param dirPath - The directory path to ensure exists */ -function ensureDirectoryExistsSync(dirPath: string): void { +export function ensureDirectoryExistsSync(dirPath: string): void { if (!fs.existsSync(dirPath)) { fs.mkdirSync(dirPath, { recursive: true }); } } - -/** - * Generates a path to a shard file using human-readable IDs. - * Both groupId and shardId are already in readable date format. - * - * Example with groupId "20240101-120000-000" and shardId "20240101-120000-000.12345.1.1": - * Full path: /base/20240101-120000-000/trace.20240101-120000-000.12345.1.1.log - * - * @param opt.dir - The directory to store the shard file - * @param opt.format - The WalFormat to use for the shard file - * @param opt.groupId - The human-readable group ID (yyyymmdd-hhmmss-ms format) - * @param opt.shardId - The human-readable shard ID (readable-timestamp.pid.threadId.count format) - * @returns The path to the shard file - */ -export function getShardedPath(opt: { - dir?: string; - format: WalFormat; - groupId: string; - shardId: string; -}): string { - const { dir = '', format, groupId, shardId } = opt; - const { baseName, walExtension } = format; - - return path.join(dir, groupId, `${baseName}.${shardId}${walExtension}`); -} - -export function getShardedFinalPath(opt: { - dir?: string; - format: WalFormat; - groupId: string; -}): string { - const { dir = '', format, groupId } = opt; - const { baseName, finalExtension } = format; - - return path.join(dir, groupId, `${baseName}.${groupId}${finalExtension}`); -} - -/** - * Sharded Write-Ahead Log manager for coordinating multiple WAL shards. - * Handles distributed logging across multiple processes/files with atomic finalization. - */ - -export class ShardedWal { - static instanceCount = 0; - readonly #id: string = getShardedWalId(); - readonly groupId = getUniqueTimeId(); - readonly #format: WalFormat; - readonly #dir: string = process.cwd(); - readonly #isCoordinator: boolean; - - /** - * Create a sharded WAL manager. - * - * @param opt.dir - Base directory to store shard files (defaults to process.cwd()) - * @param opt.format - WAL format configuration - * @param opt.groupId - Group ID for sharding (defaults to generated group ID) - * @param opt.coordinatorIdEnvVar - Environment variable name for storing coordinator ID (defaults to CP_SHARDED_WAL_COORDINATOR_ID) - */ - constructor(opt: { - dir?: string; - format: Partial>; - groupId?: string; - coordinatorIdEnvVar: string; - }) { - const { dir, format, groupId, coordinatorIdEnvVar } = opt; - this.groupId = groupId ?? getUniqueTimeId(); - if (dir) { - this.#dir = dir; - } - this.#format = parseWalFormat(format); - this.#isCoordinator = isCoordinatorProcess(coordinatorIdEnvVar, this.#id); - } - - /** - * Is this instance the coordinator? - * - * Coordinator status is determined from the coordinatorIdEnvVar environment variable. - * The coordinator handles finalization and cleanup of shard files. - * - * @returns true if this instance is the coordinator, false otherwise - */ - isCoordinator(): boolean { - return this.#isCoordinator; - } - - shard(shardId: string = getUniqueInstanceId(shardCounter)) { - return new WriteAheadLogFile({ - file: getShardedPath({ - dir: this.#dir, - format: this.#format, - groupId: this.groupId, - shardId, - }), - codec: this.#format.codec, - }); - } - - /** Get all shard file paths matching this WAL's base name */ - private shardFiles() { - if (!fs.existsSync(this.#dir)) { - return []; - } - - const groupIdDir = path.dirname( - getShardedFinalPath({ - dir: this.#dir, - format: this.#format, - groupId: this.groupId, - }), - ); - // create dir if not existing - ensureDirectoryExistsSync(groupIdDir); - - return fs - .readdirSync(groupIdDir) - .filter(entry => entry.endsWith(this.#format.walExtension)) - .filter(entry => entry.startsWith(`${this.#format.baseName}`)) - .map(entry => path.join(groupIdDir, entry)); - } - - /** - * Finalize all shards by merging them into a single output file. - * Recovers all records from all shards, validates no errors, and writes merged result. - * @throws Error if any shard contains decode errors - */ - finalize(opt?: Record) { - const fileRecoveries = this.shardFiles().map(f => ({ - file: f, - recovery: new WriteAheadLogFile({ - file: f, - codec: this.#format.codec, - }).recover(), - })); - - const records = fileRecoveries.flatMap(({ recovery }) => recovery.records); - - // Check if any records are invalid entries (from tolerant codec) - const hasInvalidEntries = records.some( - r => typeof r === 'object' && r != null && '__invalid' in r, - ); - - const recordsToFinalize = hasInvalidEntries - ? records - : filterValidRecords(records); - const out = getShardedFinalPath({ - dir: this.#dir, - format: this.#format, - groupId: this.groupId, - }); - ensureDirectoryExistsSync(path.dirname(out)); - fs.writeFileSync(out, this.#format.finalizer(recordsToFinalize, opt)); - } - - cleanup() { - this.shardFiles().forEach(f => { - // Remove the shard file - fs.unlinkSync(f); - // Remove the parent directory (shard group directory) - const shardDir = path.dirname(f); - try { - fs.rmdirSync(shardDir); - } catch { - // Directory might not be empty or already removed, ignore - } - }); - - // Also try to remove the root directory if it becomes empty - try { - fs.rmdirSync(this.#dir); - } catch { - // Directory might not be empty or already removed, ignore - } - } -} diff --git a/packages/utils/src/lib/wal.unit.test.ts b/packages/utils/src/lib/wal.unit.test.ts index 179fb7a1b..65bd1c87c 100644 --- a/packages/utils/src/lib/wal.unit.test.ts +++ b/packages/utils/src/lib/wal.unit.test.ts @@ -1,17 +1,13 @@ import { vol } from 'memfs'; import { MEMFS_VOLUME } from '@code-pushup/test-utils'; -import { SHARDED_WAL_COORDINATOR_ID_ENV_VAR } from './profiler/constants.js'; import { type Codec, type InvalidEntry, - ShardedWal, WriteAheadLogFile, createTolerantCodec, filterValidRecords, - isCoordinatorProcess, parseWalFormat, recoverFromContent, - setCoordinatorProcess, stringCodec, } from './wal.js'; @@ -609,344 +605,3 @@ describe('parseWalFormat', () => { expect(output).not.toContain('[object Object]'); }); }); - -describe('isCoordinatorProcess', () => { - it('should return true when env var matches current pid', () => { - const profilerId = `${Math.round(performance.timeOrigin)}${process.pid}.1.0`; - vi.stubEnv('TEST_LEADER_PID', profilerId); - - const result = isCoordinatorProcess('TEST_LEADER_PID', profilerId); - expect(result).toBeTrue(); - }); - - it('should return false when env var does not match current profilerId', () => { - const wrongProfilerId = `${Math.round(performance.timeOrigin)}${process.pid}.2.0`; - vi.stubEnv('TEST_LEADER_PID', wrongProfilerId); - - const currentProfilerId = `${Math.round(performance.timeOrigin)}${process.pid}.1.0`; - const result = isCoordinatorProcess('TEST_LEADER_PID', currentProfilerId); - expect(result).toBeFalse(); - }); - - it('should return false when env var is not set', () => { - vi.stubEnv('NON_EXISTENT_VAR', undefined as any); - - const profilerId = `${Math.round(performance.timeOrigin)}${process.pid}.1.0`; - const result = isCoordinatorProcess('NON_EXISTENT_VAR', profilerId); - expect(result).toBeFalse(); - }); - - it('should return false when env var is empty string', () => { - vi.stubEnv('TEST_LEADER_PID', ''); - - const profilerId = `${Math.round(performance.timeOrigin)}${process.pid}.1.0`; - const result = isCoordinatorProcess('TEST_LEADER_PID', profilerId); - expect(result).toBeFalse(); - }); -}); - -describe('setCoordinatorProcess', () => { - beforeEach(() => { - // Clean up any existing TEST_ORIGIN_PID - // eslint-disable-next-line functional/immutable-data - delete process.env['TEST_ORIGIN_PID']; - }); - - it('should set env var when not already set', () => { - expect(process.env['TEST_ORIGIN_PID']).toBeUndefined(); - - const profilerId = `${Math.round(performance.timeOrigin)}${process.pid}.1.0`; - setCoordinatorProcess('TEST_ORIGIN_PID', profilerId); - - expect(process.env['TEST_ORIGIN_PID']).toBe(profilerId); - }); - - it('should not overwrite existing env var', () => { - const existingProfilerId = `${Math.round(performance.timeOrigin)}${process.pid}.1.0`; - const newProfilerId = `${Math.round(performance.timeOrigin)}${process.pid}.2.0`; - - vi.stubEnv('TEST_ORIGIN_PID', existingProfilerId); - setCoordinatorProcess('TEST_ORIGIN_PID', newProfilerId); - - expect(process.env['TEST_ORIGIN_PID']).toBe(existingProfilerId); - }); - - it('should set env var to profiler id', () => { - const profilerId = `${Math.round(performance.timeOrigin)}${process.pid}.1.0`; - setCoordinatorProcess('TEST_ORIGIN_PID', profilerId); - - expect(process.env['TEST_ORIGIN_PID']).toBe(profilerId); - }); -}); - -describe('ShardedWal', () => { - beforeEach(() => { - vol.reset(); - vol.fromJSON({}, MEMFS_VOLUME); - }); - - it('should create instance with directory and format', () => { - const sw = new ShardedWal({ - dir: '/test/shards', - format: { - baseName: 'test-wal', - }, - coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR, - }); - - expect(sw).toBeInstanceOf(ShardedWal); - }); - - it('should create shard with correct file path', () => { - const sw = new ShardedWal({ - dir: '/test/shards', - format: { - baseName: 'trace', - walExtension: '.log', - }, - coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR, - }); - - const shard = sw.shard('20231114-221320-000.1.2.3'); - expect(shard).toBeInstanceOf(WriteAheadLogFile); - expect(shard.getPath()).toMatchPath( - '/test/shards/20231114-221320-000/trace.20231114-221320-000.1.2.3.log', - ); - }); - - it('should create shard with default shardId when no argument provided', () => { - const sw = new ShardedWal({ - dir: '/test/shards', - format: { - baseName: 'trace', - walExtension: '.log', - }, - coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR, - }); - - const shard = sw.shard(); - expect(shard.getPath()).toStartWithPath( - '/test/shards/20231114-221320-000/trace.20231114-221320-000.10001', - ); - expect(shard.getPath()).toEndWithPath('.log'); - }); - - it('should list no shard files when directory does not exist', () => { - const sw = new ShardedWal({ - dir: '/nonexistent', - format: { - baseName: 'test-wal', - }, - coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR, - }); - const files = (sw as any).shardFiles(); - expect(files).toEqual([]); - }); - - it('should list no shard files when directory is empty', () => { - const sw = new ShardedWal({ - dir: '/empty', - format: { - baseName: 'test-wal', - }, - coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR, - }); - // Create the group directory (matches actual getUniqueTimeId() output) - vol.mkdirSync('/empty/20231114-221320-000', { recursive: true }); - const files = (sw as any).shardFiles(); - expect(files).toEqual([]); - }); - - it('should list shard files matching extension', () => { - // Note: Real shard IDs look like "1704067200000.12345.1.1" (timestamp.pid.threadId.count) - // These test IDs use simplified format "001.1", "002.2" for predictability - vol.fromJSON({ - '/shards/20231114-221320-000/trace.19700101-000820-001.1.log': 'content1', - '/shards/20231114-221320-000/trace.19700101-000820-002.2.log': 'content2', - '/shards/other.txt': 'not a shard', - }); - - const sw = new ShardedWal({ - dir: '/shards', - format: { - baseName: 'trace', - walExtension: '.log', - }, - coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR, - }); - const files = (sw as any).shardFiles(); - - expect(files).toHaveLength(2); - expect(files).toEqual( - expect.arrayContaining([ - expect.pathToMatch( - '/shards/20231114-221320-000/trace.19700101-000820-001.1.log', - ), - expect.pathToMatch( - '/shards/20231114-221320-000/trace.19700101-000820-002.2.log', - ), - ]), - ); - }); - - it('should finalize empty shards to empty result', () => { - const sw = new ShardedWal({ - dir: '/shards', - format: { - baseName: 'final', - finalExtension: '.json', - finalizer: records => `${JSON.stringify(records)}\n`, - }, - coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR, - }); - - // Create the group directory - vol.mkdirSync('/shards/20231114-221320-000', { recursive: true }); - sw.finalize(); - - expect( - read('/shards/20231114-221320-000/final.20231114-221320-000.json'), - ).toBe('[]\n'); - }); - - it('should finalize multiple shards into single file', () => { - vol.fromJSON({ - '/shards/20231114-221320-000/merged.20240101-120000-001.1.log': - 'record1\n', - '/shards/20231114-221320-000/merged.20240101-120000-002.2.log': - 'record2\n', - }); - - const sw = new ShardedWal({ - dir: '/shards', - format: { - baseName: 'merged', - walExtension: '.log', - finalExtension: '.json', - finalizer: records => `${JSON.stringify(records)}\n`, - }, - coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR, - }); - - sw.finalize(); - - const result = JSON.parse( - read( - '/shards/20231114-221320-000/merged.20231114-221320-000.json', - ).trim(), - ); - expect(result).toEqual(['record1', 'record2']); - }); - - it('should handle invalid entries during finalize', () => { - vol.fromJSON({ - '/shards/20231114-221320-000/final.20240101-120000-001.1.log': 'valid\n', - '/shards/20231114-221320-000/final.20240101-120000-002.2.log': - 'invalid\n', - }); - const tolerantCodec = createTolerantCodec({ - encode: (s: string) => s, - decode: (s: string) => { - if (s === 'invalid') throw new Error('Bad record'); - return s; - }, - }); - - const sw = new ShardedWal({ - dir: '/shards', - format: { - baseName: 'final', - walExtension: '.log', - finalExtension: '.json', - codec: tolerantCodec, - finalizer: records => `${JSON.stringify(records)}\n`, - }, - coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR, - }); - - sw.finalize(); - - const result = JSON.parse( - read('/shards/20231114-221320-000/final.20231114-221320-000.json').trim(), - ); - expect(result).toHaveLength(2); - expect(result[0]).toBe('valid'); - expect(result[1]).toEqual({ __invalid: true, raw: 'invalid' }); - }); - - it('should cleanup shard files', () => { - vol.fromJSON({ - '/shards/20231114-221320-000/test.20231114-221320-000.10001.2.1.log': - 'content1', - '/shards/20231114-221320-000/test.20231114-221320-000.10001.2.2.log': - 'content2', - }); - const sw = new ShardedWal({ - dir: '/shards', - format: { - baseName: 'test', - walExtension: '.log', - }, - coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR, - }); - - expect(vol.toJSON()).toStrictEqual({ - '/shards/20231114-221320-000/test.20231114-221320-000.10001.2.1.log': - 'content1', - '/shards/20231114-221320-000/test.20231114-221320-000.10001.2.2.log': - 'content2', - }); - - sw.cleanup(); - - expect(vol.toJSON()).toStrictEqual({}); - }); - - it('should handle cleanup when some shard files do not exist', () => { - vol.fromJSON({ - '/shards/20231114-221320-000/test.20231114-221320-000.10001.2.1.log': - 'content1', - }); - - const sw = new ShardedWal({ - dir: '/shards', - format: { - baseName: 'test', - walExtension: '.log', - }, - coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR, - }); - - vol.unlinkSync( - '/shards/20231114-221320-000/test.20231114-221320-000.10001.2.1.log', - ); - expect(() => sw.cleanup()).not.toThrow(); - }); - - it('should use custom options in finalizer', () => { - vol.fromJSON({ - '/shards/20231114-221320-000/final.20231114-221320-000.10001.2.1.log': - 'record1\n', - }); - - const sw = new ShardedWal({ - dir: '/shards', - format: { - baseName: 'final', - walExtension: '.log', - finalExtension: '.json', - finalizer: (records, opt) => - `${JSON.stringify({ records, meta: opt })}\n`, - }, - coordinatorIdEnvVar: SHARDED_WAL_COORDINATOR_ID_ENV_VAR, - }); - - sw.finalize({ version: '1.0', compressed: true }); - - const result = JSON.parse( - read('/shards/20231114-221320-000/final.20231114-221320-000.json'), - ); - expect(result.records).toEqual(['record1']); - expect(result.meta).toEqual({ version: '1.0', compressed: true }); - }); -});