From e896e0bc898e3a2851f556a177ac59a9d60b9d79 Mon Sep 17 00:00:00 2001 From: Jared Perreault Date: Thu, 5 Feb 2026 11:26:05 -0500 Subject: [PATCH 1/4] progress --- .../auth-foundation/src/utils/TaskBridge.ts | 29 +++++-- .../test/spec/utils/TaskBridge.spec.ts | 75 ++++++++++++++----- .../jest-helpers/browser/jest.environment.js | 2 + tooling/jest-helpers/browser/jest.setup.ts | 3 +- 4 files changed, 83 insertions(+), 26 deletions(-) diff --git a/packages/auth-foundation/src/utils/TaskBridge.ts b/packages/auth-foundation/src/utils/TaskBridge.ts index ae25432..592c214 100644 --- a/packages/auth-foundation/src/utils/TaskBridge.ts +++ b/packages/auth-foundation/src/utils/TaskBridge.ts @@ -86,6 +86,7 @@ export abstract class TaskBridge { }); this.#pending.set(request.id, request); + let abortHandler: () => void; const result = (new Promise((resolve, reject) => { const setTimeoutTimer = () => { // `options.timeout` set to `null` disables the timeout mechanism @@ -103,14 +104,16 @@ export abstract class TaskBridge { setTimeoutTimer(); // forces the pending promise to reject, so resources clean up if the request is aborted - request.signal.addEventListener('abort', () => { + abortHandler = () => { + console.log('thrown abort error') reject(new DOMException('Aborted', 'AbortError')); - }); + }; + request.signal.addEventListener('abort', abortHandler); // This channel is meant for the Receiver to send the results (aka `HandlerMessage` messages) // ignore all Requestor events received (aka `RequestorMessage`) responseChannel.onmessage = (event) => { - if ('action' in event.data) { + if (request.signal.aborted || 'action' in event.data) { return; // ignore message } @@ -141,20 +144,23 @@ export abstract class TaskBridge { } requestChannel.close(); responseChannel.close(); + request.signal.removeEventListener('abort', abortHandler); this.#pending.delete(request.id); }); // TODO: review - const cancel = () => { + const abort = () => { responseChannel.postMessage({ action: 'CANCEL', __v: TaskBridge.BridgeVersion }); + request.controller.abort('cancel'); }; - return { result, cancel }; + return { result, abort }; } subscribe(handler: TaskBridge.TaskHandler) { this.#channel = this.createBridgeChannel(); this.#channel.onmessage = async (evt, reply) => { + console.log('onmessage: ', evt.data, reply); const { requestId, __v, ...rest } = evt.data; if (!requestId) { @@ -173,6 +179,8 @@ export abstract class TaskBridge { this.pushMessage(message); responseChannel.onmessage = (event) => { + console.log('[response channel]', event.data); + // The Requestor may send a `RequestorMessage` (like `CANCEL`) to the Subscriber // ignore `HandlerMessage` messages - only the Requestor cares about those if ('status' in event.data) { @@ -184,20 +192,25 @@ export abstract class TaskBridge { case 'CANCEL': // TODO: probably don't need to reply, just cancel action, if possible // responseChannel.postMessage({ status: 'CANCELED' }); + console.log('received cancel') message.abort('cancel'); break; } }; try { + console.log('in try') message.reply('PENDING'); // send instantaneous `PENDING` message, essentially a "received" event + console.log('sent PENDING') await handler( evt.data, // message payload (response) => message.reply(response, 'SUCCESS'), // reply fn { signal: message.signal } // options ); + console.log('handler done') } catch (err) { + console.log('error thrown', err); if (err instanceof DOMException && err.name === 'AbortError') { return null; } @@ -207,6 +220,7 @@ export abstract class TaskBridge { } } finally { + console.log('finally') this.clearMessage(requestId); responseChannel.close(); } @@ -311,7 +325,7 @@ export namespace TaskBridge { reply (data: S, status: TaskBridge.TaskStatus): void; reply (status: 'PENDING'): void; reply (data: S | 'PENDING', status: TaskBridge.TaskStatus = 'SUCCESS') { - const fn = this.replyFn ?? this.channel.postMessage; + const fn = this.replyFn ?? this.channel.postMessage.bind(this.channel); if (data === 'PENDING' || status === 'PENDING') { // only send `PENDING` heartbeats when using <= v2 of the TaskBridge payload structure @@ -345,6 +359,7 @@ export namespace TaskBridge { */ export type TaskOptions = { timeout?: number | null; + signal?: AbortSignal; }; /** @@ -352,7 +367,7 @@ export namespace TaskBridge { */ export type TaskResponse = { result: Promise; - cancel: () => void; + abort: () => void; }; /** diff --git a/packages/auth-foundation/test/spec/utils/TaskBridge.spec.ts b/packages/auth-foundation/test/spec/utils/TaskBridge.spec.ts index f05e98e..2310afe 100644 --- a/packages/auth-foundation/test/spec/utils/TaskBridge.spec.ts +++ b/packages/auth-foundation/test/spec/utils/TaskBridge.spec.ts @@ -34,7 +34,7 @@ class TestChannel implements BroadcastChannelLike { get onmessage () { return this.#handler; } - + set onmessage (handler) { if (handler === null) { this.channel.onmessage = null; @@ -54,6 +54,7 @@ class TestChannel implements BroadcastChannelLike { } postMessage(message: M): void { + console.log('postMessage called', message) this.channel.postMessage(message); } @@ -62,19 +63,28 @@ class TestChannel implements BroadcastChannelLike { } } -class TestBus extends TaskBridge { +class TestBus extends TaskBridge { - protected createBridgeChannel (): TaskBridge.BridgeChannel { - return new TestChannel(this.name); - } + // protected createBridgeChannel (): TaskBridge.BridgeChannel { + // return new TestChannel(this.name); + // } - protected createTaskChannel(name: string): TaskBridge.TaskChannel { - return new TestChannel(name); + // protected createTaskChannel(name: string): TaskBridge.TaskChannel { + // return new TestChannel(name); + // } + + protected createBridgeChannel (): TaskBridge.BridgeChannel { + return new BroadcastChannel(this.name) as TaskBridge.BridgeChannel; + } + + protected createTaskChannel(name: string): TaskBridge.TaskChannel { + return new BroadcastChannel(name) as TaskBridge.TaskChannel; } } +const sleep = (ms) => new Promise(resolve => setTimeout(resolve, ms)); -describe.skip('TaskBridge', () => { +describe('TaskBridge', () => { let receiver: TaskBridge; let sender: TaskBridge; @@ -86,25 +96,54 @@ describe.skip('TaskBridge', () => { afterEach(() => { receiver.close(); sender.close(); + jest.clearAllTimers(); }); describe('test', () => { - it('sends and receives messages', async () => { - const channel = new BroadcastChannel('test'); - channel.onmessage = (event) => { - console.log('[monitor]: ', event.data); - }; + it('sends and receives messages between separate instances', async () => { + const response = { foo: '2', bar: '1' }; receiver.subscribe(async (message, reply) => { - console.log('handler called'); - reply({ foo: '2', bar: '1' }); + reply(response); }); const result = await sender.send({ foo: 1, bar: 2 }).result; - expect(result).toEqual({ bar: 'baz' }); - - channel.close(); + expect(result).toEqual(response); }); + + fit('can handle aborting pending tasks', async () => { + jest.useFakeTimers(); + expect.assertions(4); // ensures `catch` block is reached + + const abortListener = jest.fn(); + const handler = jest.fn().mockImplementation( async (message, reply, { signal }) => { + // TODO: why isn't this being called? + signal.addEventListener('abort', abortListener, { once: true }); + + await sleep(1000); // sleep to delay responding to the message, so the abort fires first + reply({ foo: '1', bar: '2' }); + }); + receiver.subscribe(handler); + + try { + const { result, abort } = sender.send({ foo: 1, bar: 2 }); + // flushes the promise queue, so the `receiver.subscribe` handler actually gets called + await jest.advanceTimersByTimeAsync(100); + abort(); + await jest.advanceTimersByTimeAsync(100); + await result; + } + catch (err) { + console.log(err); + expect(err).toBeInstanceOf(DOMException); + expect((err as Error).name).toEqual('AbortError'); + } + + expect(handler).toHaveBeenCalled(); + expect(abortListener).toHaveBeenCalled(); + + jest.useRealTimers(); + }, 100000); }); // xdescribe('', async () => { diff --git a/tooling/jest-helpers/browser/jest.environment.js b/tooling/jest-helpers/browser/jest.environment.js index f025a39..f06b89e 100644 --- a/tooling/jest-helpers/browser/jest.environment.js +++ b/tooling/jest-helpers/browser/jest.environment.js @@ -15,6 +15,8 @@ class CustomJSDomEnv extends JSDOMEnv { this.global.Request = Request; this.global.Response = Response; this.global.Headers = Headers; + this.global.BroadcastChannel = BroadcastChannel; + this.global.DOMException = DOMException; } } diff --git a/tooling/jest-helpers/browser/jest.setup.ts b/tooling/jest-helpers/browser/jest.setup.ts index 43e699c..a7bcbc8 100644 --- a/tooling/jest-helpers/browser/jest.setup.ts +++ b/tooling/jest-helpers/browser/jest.setup.ts @@ -25,7 +25,8 @@ class MockBroadcastChannel implements BroadcastChannel { global.TextEncoder = TextEncoder; global.TextDecoder = TextDecoder; -global.BroadcastChannel = MockBroadcastChannel; +// global.BroadcastChannel = MockBroadcastChannel; +// global.BroadcastChannel = BroadcastChannel; global.fetch = () => { throw new Error(` From dcba805c7693d9f902a3b35b6fa2c01495143030 Mon Sep 17 00:00:00 2001 From: Jared Perreault Date: Fri, 6 Feb 2026 14:55:34 -0500 Subject: [PATCH 2/4] progess --- .../auth-foundation/src/utils/TaskBridge.ts | 80 ++++++- .../auth-foundation/src/utils/asPromise.ts | 7 + .../test/spec/utils/TaskBridge.spec.ts | 204 ++++++++++++++++-- 3 files changed, 261 insertions(+), 30 deletions(-) create mode 100644 packages/auth-foundation/src/utils/asPromise.ts diff --git a/packages/auth-foundation/src/utils/TaskBridge.ts b/packages/auth-foundation/src/utils/TaskBridge.ts index 592c214..16e64be 100644 --- a/packages/auth-foundation/src/utils/TaskBridge.ts +++ b/packages/auth-foundation/src/utils/TaskBridge.ts @@ -1,5 +1,6 @@ import type { BroadcastChannelLike } from '../types/index.ts'; import { shortID } from '../crypto/index.ts'; +import { AuthSdkError } from '../errors/AuthSdkError.ts'; /** @useDeclaredType */ type TypeMap = Record; @@ -56,6 +57,7 @@ export abstract class TaskBridge { } protected pushMessage (message: TaskBridge.Task ) { + console.log('pushMessage called'); this.#pending.set(message.id, message); // if there is no active heartbeat, start one if (this.#heartbeatInt === null) { @@ -88,7 +90,9 @@ export abstract class TaskBridge { let abortHandler: () => void; const result = (new Promise((resolve, reject) => { - const setTimeoutTimer = () => { + const resetTimeoutTimer = () => { + console.log('reset called') + // `options.timeout` set to `null` disables the timeout mechanism if (options.timeout === null) { return; @@ -98,10 +102,12 @@ export abstract class TaskBridge { clearTimeout(timeoutId); } // TODO: error type - timeoutId = setTimeout(() => reject(new Error('timeout')), options.timeout ?? 5000); + timeoutId = setTimeout(() => reject( + new TaskBridge.TimeoutError('timeout') + ), options.timeout ?? 5000); }; // sets timeout timer - setTimeoutTimer(); + resetTimeoutTimer(); // forces the pending promise to reject, so resources clean up if the request is aborted abortHandler = () => { @@ -129,8 +135,13 @@ export abstract class TaskBridge { case 'PENDING': // defer the timeout timer when a heartbeat is received (host is still working) - setTimeoutTimer(); + resetTimeoutTimer(); + break; + case 'ABORTED': + // reject(new DOMException('Aborted', 'AbortError')); + console.log('here') + request.abort('Host Aborted'); break; } }; @@ -139,6 +150,7 @@ export abstract class TaskBridge { requestChannel.close(); })) .finally(() => { + console.log('in finally', timeoutId) if (timeoutId) { clearTimeout(timeoutId); } @@ -210,9 +222,19 @@ export abstract class TaskBridge { console.log('handler done') } catch (err) { - console.log('error thrown', err); - if (err instanceof DOMException && err.name === 'AbortError') { - return null; + if (err instanceof DOMException) { + if (err.name === 'AbortError') { + // task was aborted, do nothing + return null; + } + + if (err.name === 'InvalidStateError') { + // this is error is thrown if a `.postMessage` is attempted after the channel is closed + // this can happen when the `handler` function attempts to `reply()` after `.close()` + // is called. Ignore the error, the `AbortSignal` is provided to the `handler` for + // if needed + return null; + } } if (err instanceof Error) { @@ -227,12 +249,24 @@ export abstract class TaskBridge { }; } + /** + * Returns the number of pending tasks + */ + get pending (): number { + return this.#pending.size; + } + close () { this.#channel?.close(); for (const message of this.#pending.values()) { message.abort(); message.channel.close(); + this.clearMessage(message.id); } + // this.#pending.clear(); + // if (this.#heartbeatInt) { + // clearInterval(this.#heartbeatInt); + // } } } @@ -243,7 +277,7 @@ export namespace TaskBridge { /** * Possible `status` values indicating the process of an orchestrated request */ - export type TaskStatus = 'PENDING' | 'SUCCESS' | 'FAILED'; + export type TaskStatus = 'PENDING' | 'SUCCESS' | 'FAILED' | 'ABORTED'; export type BridgeVersions = 1 | 2; @@ -261,6 +295,9 @@ export namespace TaskBridge { } | { status: 'PENDING' __v: BridgeVersions; + } | { + status: 'ABORTED' + __v: BridgeVersions; } /** @@ -323,8 +360,8 @@ export namespace TaskBridge { } reply (data: S, status: TaskBridge.TaskStatus): void; - reply (status: 'PENDING'): void; - reply (data: S | 'PENDING', status: TaskBridge.TaskStatus = 'SUCCESS') { + reply (status: 'PENDING' | 'ABORTED'): void; + reply (data: S | 'PENDING' | 'ABORTED', status: TaskBridge.TaskStatus = 'SUCCESS') { const fn = this.replyFn ?? this.channel.postMessage.bind(this.channel); if (data === 'PENDING' || status === 'PENDING') { @@ -333,6 +370,12 @@ export namespace TaskBridge { fn({ status: 'PENDING', __v: this.__v } satisfies HandlerMessage); } } + else if (data === 'ABORTED' || status === 'ABORTED') { + // only send `PENDING` heartbeats when using <= v2 of the TaskBridge payload structure + if (this.__v === 2) { + fn({ status: 'ABORTED', __v: this.__v } satisfies HandlerMessage); + } + } else { // TODO: remove this condition - OKTA-1053515 if (this.__v < 2) { @@ -346,6 +389,7 @@ export namespace TaskBridge { } abort (...args: Parameters) { + this.reply('ABORTED'); return this.controller.abort(...args); } @@ -379,4 +423,20 @@ export namespace TaskBridge { options?: { signal: AbortSignal } ) => any; + /** + * @group Errors + */ + export class TimeoutError extends AuthSdkError { + #timeout: boolean = false; + + constructor (...args: ConstructorParameters) { + const [message, ...rest] = args; + super(message ?? 'timeout', ...rest); + this.#timeout = true; + } + + get timeout (): boolean { + return this.#timeout; + } + }; } diff --git a/packages/auth-foundation/src/utils/asPromise.ts b/packages/auth-foundation/src/utils/asPromise.ts new file mode 100644 index 0000000..6fb299b --- /dev/null +++ b/packages/auth-foundation/src/utils/asPromise.ts @@ -0,0 +1,7 @@ + +export function eventAsPromise (target: EventTarget, event: string, shouldThrow: boolean = false) { + return new Promise((resolve, reject) => { + const fn = shouldThrow ? reject : resolve; + target.addEventListener(event, fn); + }); +} diff --git a/packages/auth-foundation/test/spec/utils/TaskBridge.spec.ts b/packages/auth-foundation/test/spec/utils/TaskBridge.spec.ts index 2310afe..a869dc0 100644 --- a/packages/auth-foundation/test/spec/utils/TaskBridge.spec.ts +++ b/packages/auth-foundation/test/spec/utils/TaskBridge.spec.ts @@ -1,5 +1,6 @@ import { BroadcastChannelLike, JsonRecord } from 'src/types'; import { TaskBridge } from 'src/utils/TaskBridge.ts'; +import { eventAsPromise } from 'src/utils/asPromise.ts'; type TestRequest = { ADD: { @@ -94,6 +95,9 @@ describe('TaskBridge', () => { }); afterEach(() => { + expect(receiver.pending).toEqual(0); + expect(sender.pending).toEqual(0); + receiver.close(); sender.close(); jest.clearAllTimers(); @@ -101,49 +105,209 @@ describe('TaskBridge', () => { describe('test', () => { it('sends and receives messages between separate instances', async () => { + jest.useFakeTimers(); + const response = { foo: '2', bar: '1' }; receiver.subscribe(async (message, reply) => { reply(response); }); - const result = await sender.send({ foo: 1, bar: 2 }).result; - expect(result).toEqual(response); + const { result } = sender.send({ foo: 1, bar: 2 }); + await expect(result).resolves.toEqual(response); + expect(jest.getTimerCount()).toBe(0); + + jest.useRealTimers(); }); - fit('can handle aborting pending tasks', async () => { + it('can handle aborting pending tasks', async () => { jest.useFakeTimers(); - expect.assertions(4); // ensures `catch` block is reached const abortListener = jest.fn(); const handler = jest.fn().mockImplementation( async (message, reply, { signal }) => { - // TODO: why isn't this being called? signal.addEventListener('abort', abortListener, { once: true }); - await sleep(1000); // sleep to delay responding to the message, so the abort fires first + await sleep(50); // sleep to delay responding to the message, so the abort fires first reply({ foo: '1', bar: '2' }); }); receiver.subscribe(handler); - try { - const { result, abort } = sender.send({ foo: 1, bar: 2 }); - // flushes the promise queue, so the `receiver.subscribe` handler actually gets called - await jest.advanceTimersByTimeAsync(100); - abort(); - await jest.advanceTimersByTimeAsync(100); - await result; - } - catch (err) { - console.log(err); - expect(err).toBeInstanceOf(DOMException); - expect((err as Error).name).toEqual('AbortError'); - } + const { result, abort } = sender.send({ foo: 1, bar: 2 }); + + // flush microtasks to ensure subscribe abortHandler is set up + // await sleep(10); + await jest.advanceTimersByTimeAsync(10); + + abort(); + + await expect(result).rejects.toThrow(DOMException); + await expect(result).rejects.toThrow('Aborted'); + + // wait a bit more to ensure abort listener is called + // await sleep(100); + await jest.advanceTimersByTimeAsync(100); expect(handler).toHaveBeenCalled(); expect(abortListener).toHaveBeenCalled(); + expect(jest.getTimerCount()).toBe(0); + + jest.useRealTimers(); + }); + + it('will not timeout a pending request when host is available', async () => { + jest.useFakeTimers(); + + const response = { foo: '2', bar: '1' }; + const largeDelay = 10000; + + // clever way of capturing the requestId + let requestId; + const bc = new BroadcastChannel('test'); + bc.onmessage = (evt => { + console.log('[bc]', evt.data); + if (evt.data.requestId) { + requestId = evt.data.requestId; + } + }); + + receiver.subscribe(async (message, reply) => { + await sleep(largeDelay); // very long delay + reply(response); + }); + + const { result } = sender.send({ foo: 1, bar: 2 }); + // advance timers to send BroadcastChannel messages + await jest.advanceTimersByTimeAsync(100); + + // listen on "response channel" and count number of `PENDING` "pings" + let pendingCount = 0; + const channel = new BroadcastChannel(requestId); + channel.onmessage = (evt) => { + if (evt.data.status === 'PENDING') { + pendingCount++; + } + }; + + // advance the timers to the length of the delay, so response is finally returned + await jest.advanceTimersByTimeAsync(largeDelay); + + await expect(result).resolves.toEqual(response); + // expect a predictable number of 'PENDING' pings given the large delay + expect(pendingCount).toEqual(largeDelay / receiver.heartbeatInterval); + expect(jest.getTimerCount()).toBe(0); + + // cleanup + jest.useRealTimers(); + bc.close(); + channel.close(); + }); + + it('will timeout when host does not response within default timeout window', async () => { + expect.assertions(4); // ensures `result.catch()` is invoked + jest.useFakeTimers(); + + receiver.close(); + + const { result } = sender.send({ foo: 1, bar: 2 }); + + // use `.catch` to bind listener synchronously + const promise = result.catch(err => { + expect(err).toBeInstanceOf(TaskBridge.TimeoutError); + }); + + await jest.advanceTimersByTimeAsync(10000); + await promise; + + expect(jest.getTimerCount()).toBe(0); + + jest.useRealTimers(); + }); + + it('will timeout when host does not response within user defined timeout window', async () => { + expect.assertions(4); // ensures `result.catch()` is invoked + jest.useFakeTimers(); + + const largeTimeout = 10000; + + receiver.close(); + + const { result } = sender.send({ foo: 1, bar: 2 }, { timeout: largeTimeout - 100 }); + + // use `.catch` to bind listener synchronously + const promise = result.catch(err => { + expect(err).toBeInstanceOf(TaskBridge.TimeoutError); + }); + + await jest.advanceTimersByTimeAsync(10000); + await promise; + + expect(jest.getTimerCount()).toBe(0); + + jest.useRealTimers(); + }); + + it('will timeout when no host is avaiable', async () => { + expect.assertions(4); // ensures `result.catch()` is invoked + jest.useFakeTimers(); + + const timeout = 100; + + // NOTE: no `receiver.subscribe` call + + const { result } = sender.send({ foo: 1, bar: 2 }, { timeout }); + + // use `.catch` to bind listener synchronously + const promise = result.catch(err => { + expect(err).toBeInstanceOf(TaskBridge.TimeoutError); + }); + + await jest.advanceTimersByTimeAsync(timeout); + await promise; + + expect(jest.getTimerCount()).toBe(0); + + jest.useRealTimers(); + }); + + fit('will abort pending tasks when closed', async () => { + jest.useFakeTimers(); + + const abortListener = jest.fn(); + const handler = jest.fn().mockImplementation(async (message, reply, { signal }) => { + signal.addEventListener('abort', abortListener); + + // sleep to delay responding to the message, so the abort fires first + // await sleep(sender.heartbeatInterval * 10); + + await Promise.race([ + sleep(sender.heartbeatInterval * 10), + eventAsPromise(signal, 'abort', true), + ]); + + reply({ foo: '1', bar: '2' }); + }); + receiver.subscribe(handler); + + const promises = Promise.allSettled(Array.from({ length: 3 }, (_, i) => { + const { result } = sender.send({ foo: 1 + i, bar: 2 + i }, { timeout: null }); + return result; + })); + + // flush microtasks to ensure subscribe handler is set up + await jest.advanceTimersByTimeAsync(10); + + expect(handler).toHaveBeenCalledTimes(3); + + receiver.close(); + const result = await promises; + await jest.advanceTimersByTimeAsync(10); + + expect(result).toEqual(Array(3).fill({ status: 'rejected', reason: expect.any(DOMException) })); + expect(abortListener).toHaveBeenCalledTimes(3); + expect(jest.getTimerCount()).toBe(0); jest.useRealTimers(); - }, 100000); + }, 10000); }); // xdescribe('', async () => { From fcec803c2a1cf645e2f4d4608fad757123e51f12 Mon Sep 17 00:00:00 2001 From: Jared Perreault Date: Fri, 6 Feb 2026 19:13:32 -0500 Subject: [PATCH 3/4] added tests --- .../auth-foundation/src/utils/TaskBridge.ts | 31 +- .../auth-foundation/src/utils/asPromise.ts | 7 - .../test/spec/utils/TaskBridge.spec.ts | 422 +++++++++--------- 3 files changed, 217 insertions(+), 243 deletions(-) delete mode 100644 packages/auth-foundation/src/utils/asPromise.ts diff --git a/packages/auth-foundation/src/utils/TaskBridge.ts b/packages/auth-foundation/src/utils/TaskBridge.ts index 16e64be..4af4ebc 100644 --- a/packages/auth-foundation/src/utils/TaskBridge.ts +++ b/packages/auth-foundation/src/utils/TaskBridge.ts @@ -57,7 +57,6 @@ export abstract class TaskBridge { } protected pushMessage (message: TaskBridge.Task ) { - console.log('pushMessage called'); this.#pending.set(message.id, message); // if there is no active heartbeat, start one if (this.#heartbeatInt === null) { @@ -91,8 +90,6 @@ export abstract class TaskBridge { let abortHandler: () => void; const result = (new Promise((resolve, reject) => { const resetTimeoutTimer = () => { - console.log('reset called') - // `options.timeout` set to `null` disables the timeout mechanism if (options.timeout === null) { return; @@ -110,10 +107,7 @@ export abstract class TaskBridge { resetTimeoutTimer(); // forces the pending promise to reject, so resources clean up if the request is aborted - abortHandler = () => { - console.log('thrown abort error') - reject(new DOMException('Aborted', 'AbortError')); - }; + abortHandler = () => reject(new DOMException('Aborted', 'AbortError')); request.signal.addEventListener('abort', abortHandler); // This channel is meant for the Receiver to send the results (aka `HandlerMessage` messages) @@ -139,8 +133,6 @@ export abstract class TaskBridge { break; case 'ABORTED': - // reject(new DOMException('Aborted', 'AbortError')); - console.log('here') request.abort('Host Aborted'); break; } @@ -150,7 +142,6 @@ export abstract class TaskBridge { requestChannel.close(); })) .finally(() => { - console.log('in finally', timeoutId) if (timeoutId) { clearTimeout(timeoutId); } @@ -160,7 +151,6 @@ export abstract class TaskBridge { this.#pending.delete(request.id); }); - // TODO: review const abort = () => { responseChannel.postMessage({ action: 'CANCEL', __v: TaskBridge.BridgeVersion }); request.controller.abort('cancel'); @@ -172,7 +162,6 @@ export abstract class TaskBridge { subscribe(handler: TaskBridge.TaskHandler) { this.#channel = this.createBridgeChannel(); this.#channel.onmessage = async (evt, reply) => { - console.log('onmessage: ', evt.data, reply); const { requestId, __v, ...rest } = evt.data; if (!requestId) { @@ -191,8 +180,6 @@ export abstract class TaskBridge { this.pushMessage(message); responseChannel.onmessage = (event) => { - console.log('[response channel]', event.data); - // The Requestor may send a `RequestorMessage` (like `CANCEL`) to the Subscriber // ignore `HandlerMessage` messages - only the Requestor cares about those if ('status' in event.data) { @@ -202,24 +189,18 @@ export abstract class TaskBridge { // event type is now `RequestorMessage` switch (event.data.action) { case 'CANCEL': - // TODO: probably don't need to reply, just cancel action, if possible - // responseChannel.postMessage({ status: 'CANCELED' }); - console.log('received cancel') message.abort('cancel'); break; } }; try { - console.log('in try') message.reply('PENDING'); // send instantaneous `PENDING` message, essentially a "received" event - console.log('sent PENDING') await handler( evt.data, // message payload (response) => message.reply(response, 'SUCCESS'), // reply fn { signal: message.signal } // options ); - console.log('handler done') } catch (err) { if (err instanceof DOMException) { @@ -242,7 +223,6 @@ export abstract class TaskBridge { } } finally { - console.log('finally') this.clearMessage(requestId); responseChannel.close(); } @@ -263,10 +243,11 @@ export abstract class TaskBridge { message.channel.close(); this.clearMessage(message.id); } - // this.#pending.clear(); - // if (this.#heartbeatInt) { - // clearInterval(this.#heartbeatInt); - // } + this.#pending.clear(); + if (this.#heartbeatInt !== null) { + clearInterval(this.#heartbeatInt); + this.#heartbeatInt = null; + } } } diff --git a/packages/auth-foundation/src/utils/asPromise.ts b/packages/auth-foundation/src/utils/asPromise.ts deleted file mode 100644 index 6fb299b..0000000 --- a/packages/auth-foundation/src/utils/asPromise.ts +++ /dev/null @@ -1,7 +0,0 @@ - -export function eventAsPromise (target: EventTarget, event: string, shouldThrow: boolean = false) { - return new Promise((resolve, reject) => { - const fn = shouldThrow ? reject : resolve; - target.addEventListener(event, fn); - }); -} diff --git a/packages/auth-foundation/test/spec/utils/TaskBridge.spec.ts b/packages/auth-foundation/test/spec/utils/TaskBridge.spec.ts index a869dc0..22fc05d 100644 --- a/packages/auth-foundation/test/spec/utils/TaskBridge.spec.ts +++ b/packages/auth-foundation/test/spec/utils/TaskBridge.spec.ts @@ -1,6 +1,5 @@ -import { BroadcastChannelLike, JsonRecord } from 'src/types'; import { TaskBridge } from 'src/utils/TaskBridge.ts'; -import { eventAsPromise } from 'src/utils/asPromise.ts'; + type TestRequest = { ADD: { @@ -24,56 +23,7 @@ type TestResponse = { } }; -class TestChannel implements BroadcastChannelLike { - channel: BroadcastChannel; - #handler: BroadcastChannelLike['onmessage'] = null; - - constructor (public name: string) { - this.channel = new BroadcastChannel(name); - } - - get onmessage () { - return this.#handler; - } - - set onmessage (handler) { - if (handler === null) { - this.channel.onmessage = null; - this.#handler = null; - } - - console.log('handler set', handler); - - this.#handler = async (event) => { - console.log('got message', event.data); - // const reply = (response) => this.channel.postMessage(response); - // @ts-ignore - await handler(event.data); - }; - - this.channel.onmessage = this.#handler; - } - - postMessage(message: M): void { - console.log('postMessage called', message) - this.channel.postMessage(message); - } - - close () { - this.channel.close(); - } -} - class TestBus extends TaskBridge { - - // protected createBridgeChannel (): TaskBridge.BridgeChannel { - // return new TestChannel(this.name); - // } - - // protected createTaskChannel(name: string): TaskBridge.TaskChannel { - // return new TestChannel(name); - // } - protected createBridgeChannel (): TaskBridge.BridgeChannel { return new BroadcastChannel(this.name) as TaskBridge.BridgeChannel; } @@ -83,7 +33,9 @@ class TestBus extends TaskBridge { } } -const sleep = (ms) => new Promise(resolve => setTimeout(resolve, ms)); +const sleep = (ms: number) => new Promise(resolve => { + setTimeout(resolve, ms) +}); describe('TaskBridge', () => { let receiver: TaskBridge; @@ -103,218 +55,266 @@ describe('TaskBridge', () => { jest.clearAllTimers(); }); - describe('test', () => { - it('sends and receives messages between separate instances', async () => { - jest.useFakeTimers(); + it('sends and receives messages between separate instances', async () => { + jest.useFakeTimers(); - const response = { foo: '2', bar: '1' }; + const response = { foo: '2', bar: '1' }; - receiver.subscribe(async (message, reply) => { - reply(response); - }); - - const { result } = sender.send({ foo: 1, bar: 2 }); - await expect(result).resolves.toEqual(response); - expect(jest.getTimerCount()).toBe(0); + receiver.subscribe(async (message, reply) => { + reply(response); + }); + + const { result } = sender.send({ foo: 1, bar: 2 }); + await expect(result).resolves.toEqual(response); + expect(jest.getTimerCount()).toBe(0); - jest.useRealTimers(); + jest.useRealTimers(); + }); + + it('handles multiple tasks simultaneously', async () => { + jest.useFakeTimers(); + + const response = { foo: '2', bar: '1' }; + + receiver.subscribe(async (message, reply) => { + reply(response); }); - it('can handle aborting pending tasks', async () => { - jest.useFakeTimers(); + const promises = Promise.allSettled(Array.from({ length: 3 }, (_, i) => { + const { result } = sender.send({ foo: 1 + i, bar: 2 + i }); + return result; + })); - const abortListener = jest.fn(); - const handler = jest.fn().mockImplementation( async (message, reply, { signal }) => { - signal.addEventListener('abort', abortListener, { once: true }); + await expect(promises).resolves.toEqual([ + { status: 'fulfilled', value: { ...response} }, + { status: 'fulfilled', value: { ...response } }, + { status: 'fulfilled', value: { ...response } }, + ]); - await sleep(50); // sleep to delay responding to the message, so the abort fires first - reply({ foo: '1', bar: '2' }); - }); - receiver.subscribe(handler); + expect(jest.getTimerCount()).toBe(0); + + jest.useRealTimers(); + }); + + it('gracefully handles an error being thrown by the subscribe handler', async () => { + jest.useFakeTimers(); - const { result, abort } = sender.send({ foo: 1, bar: 2 }); + const handler = jest.fn().mockImplementation(async (message, reply) => { + throw new Error('test'); + }); + receiver.subscribe(handler); + + const { result } = sender.send({ foo: 1, bar: 2 }); + await expect(result).resolves.toEqual({ error: 'test' }); + expect(jest.getTimerCount()).toBe(0); + + jest.useRealTimers(); + }); + + it('can handle aborting pending tasks', async () => { + jest.useFakeTimers(); + + const abortListener = jest.fn(); + const handler = jest.fn().mockImplementation( async (message, reply, { signal }) => { + signal.addEventListener('abort', abortListener, { once: true }); + + await sleep(50); // sleep to delay responding to the message, so the abort fires first + reply({ foo: '1', bar: '2' }); + }); + receiver.subscribe(handler); - // flush microtasks to ensure subscribe abortHandler is set up - // await sleep(10); - await jest.advanceTimersByTimeAsync(10); + const { result, abort } = sender.send({ foo: 1, bar: 2 }); - abort(); + // flush microtasks to ensure subscribe abortHandler is set up + await jest.advanceTimersByTimeAsync(10); - await expect(result).rejects.toThrow(DOMException); - await expect(result).rejects.toThrow('Aborted'); + abort(); - // wait a bit more to ensure abort listener is called - // await sleep(100); - await jest.advanceTimersByTimeAsync(100); + await expect(result).rejects.toThrow(DOMException); + await expect(result).rejects.toThrow('Aborted'); - expect(handler).toHaveBeenCalled(); - expect(abortListener).toHaveBeenCalled(); - expect(jest.getTimerCount()).toBe(0); + // wait a bit more to ensure abort listener is called + await jest.advanceTimersByTimeAsync(100); - jest.useRealTimers(); + expect(handler).toHaveBeenCalled(); + expect(abortListener).toHaveBeenCalled(); + expect(jest.getTimerCount()).toBe(0); + + jest.useRealTimers(); + }); + + it('will not timeout a pending request when host is available', async () => { + jest.useFakeTimers(); + + const response = { foo: '2', bar: '1' }; + const largeDelay = 10000; + + // clever way of capturing the requestId + let requestId; + const bc = new BroadcastChannel('test'); + bc.onmessage = (evt => { + if (evt.data.requestId) { + requestId = evt.data.requestId; + } }); - it('will not timeout a pending request when host is available', async () => { - jest.useFakeTimers(); - - const response = { foo: '2', bar: '1' }; - const largeDelay = 10000; - - // clever way of capturing the requestId - let requestId; - const bc = new BroadcastChannel('test'); - bc.onmessage = (evt => { - console.log('[bc]', evt.data); - if (evt.data.requestId) { - requestId = evt.data.requestId; - } - }); - - receiver.subscribe(async (message, reply) => { - await sleep(largeDelay); // very long delay - reply(response); - }); - - const { result } = sender.send({ foo: 1, bar: 2 }); - // advance timers to send BroadcastChannel messages - await jest.advanceTimersByTimeAsync(100); - - // listen on "response channel" and count number of `PENDING` "pings" - let pendingCount = 0; - const channel = new BroadcastChannel(requestId); - channel.onmessage = (evt) => { - if (evt.data.status === 'PENDING') { - pendingCount++; - } - }; - - // advance the timers to the length of the delay, so response is finally returned - await jest.advanceTimersByTimeAsync(largeDelay); - - await expect(result).resolves.toEqual(response); - // expect a predictable number of 'PENDING' pings given the large delay - expect(pendingCount).toEqual(largeDelay / receiver.heartbeatInterval); - expect(jest.getTimerCount()).toBe(0); - - // cleanup - jest.useRealTimers(); - bc.close(); - channel.close(); + receiver.subscribe(async (message, reply) => { + await sleep(largeDelay); // very long delay + reply(response); }); - it('will timeout when host does not response within default timeout window', async () => { - expect.assertions(4); // ensures `result.catch()` is invoked - jest.useFakeTimers(); + const { result } = sender.send({ foo: 1, bar: 2 }); + // advance timers to send BroadcastChannel messages + await jest.advanceTimersByTimeAsync(100); + + // listen on "response channel" and count number of `PENDING` "pings" + let pendingCount = 0; + const channel = new BroadcastChannel(requestId); + channel.onmessage = (evt) => { + if (evt.data.status === 'PENDING') { + pendingCount++; + } + }; + + // advance the timers to the length of the delay, so response is finally returned + await jest.advanceTimersByTimeAsync(largeDelay); - receiver.close(); - - const { result } = sender.send({ foo: 1, bar: 2 }); + await expect(result).resolves.toEqual(response); + // expect a predictable number of 'PENDING' pings given the large delay + expect(pendingCount).toEqual(largeDelay / receiver.heartbeatInterval); + expect(jest.getTimerCount()).toBe(0); - // use `.catch` to bind listener synchronously - const promise = result.catch(err => { - expect(err).toBeInstanceOf(TaskBridge.TimeoutError); - }); + // cleanup + jest.useRealTimers(); + bc.close(); + channel.close(); + }); + + it('will timeout when host does not response within default timeout window', async () => { + expect.assertions(4); // ensures `result.catch()` is invoked + jest.useFakeTimers(); - await jest.advanceTimersByTimeAsync(10000); - await promise; + receiver.close(); - expect(jest.getTimerCount()).toBe(0); + const { result } = sender.send({ foo: 1, bar: 2 }); - jest.useRealTimers(); + // use `.catch` to bind listener synchronously + const promise = result.catch(err => { + expect(err).toBeInstanceOf(TaskBridge.TimeoutError); }); - it('will timeout when host does not response within user defined timeout window', async () => { - expect.assertions(4); // ensures `result.catch()` is invoked - jest.useFakeTimers(); + await jest.advanceTimersByTimeAsync(10000); + await promise; - const largeTimeout = 10000; + expect(jest.getTimerCount()).toBe(0); - receiver.close(); - - const { result } = sender.send({ foo: 1, bar: 2 }, { timeout: largeTimeout - 100 }); + jest.useRealTimers(); + }); + + it('will timeout when host does not response within user defined timeout window', async () => { + expect.assertions(4); // ensures `result.catch()` is invoked + jest.useFakeTimers(); - // use `.catch` to bind listener synchronously - const promise = result.catch(err => { - expect(err).toBeInstanceOf(TaskBridge.TimeoutError); - }); + const largeTimeout = 10000; - await jest.advanceTimersByTimeAsync(10000); - await promise; + receiver.close(); - expect(jest.getTimerCount()).toBe(0); + const { result } = sender.send({ foo: 1, bar: 2 }, { timeout: largeTimeout - 100 }); - jest.useRealTimers(); + // use `.catch` to bind listener synchronously + const promise = result.catch(err => { + expect(err).toBeInstanceOf(TaskBridge.TimeoutError); }); - it('will timeout when no host is avaiable', async () => { - expect.assertions(4); // ensures `result.catch()` is invoked - jest.useFakeTimers(); - - const timeout = 100; + await jest.advanceTimersByTimeAsync(10000); + await promise; - // NOTE: no `receiver.subscribe` call + expect(jest.getTimerCount()).toBe(0); - const { result } = sender.send({ foo: 1, bar: 2 }, { timeout }); + jest.useRealTimers(); + }); + + it('will timeout when no host is avaiable', async () => { + expect.assertions(4); // ensures `result.catch()` is invoked + jest.useFakeTimers(); - // use `.catch` to bind listener synchronously - const promise = result.catch(err => { - expect(err).toBeInstanceOf(TaskBridge.TimeoutError); - }); + const timeout = 100; - await jest.advanceTimersByTimeAsync(timeout); - await promise; + // NOTE: no `receiver.subscribe` call - expect(jest.getTimerCount()).toBe(0); + const { result } = sender.send({ foo: 1, bar: 2 }, { timeout }); - jest.useRealTimers(); + // use `.catch` to bind listener synchronously + const promise = result.catch(err => { + expect(err).toBeInstanceOf(TaskBridge.TimeoutError); }); - fit('will abort pending tasks when closed', async () => { - jest.useFakeTimers(); + await jest.advanceTimersByTimeAsync(timeout); + await promise; - const abortListener = jest.fn(); - const handler = jest.fn().mockImplementation(async (message, reply, { signal }) => { - signal.addEventListener('abort', abortListener); + expect(jest.getTimerCount()).toBe(0); - // sleep to delay responding to the message, so the abort fires first - // await sleep(sender.heartbeatInterval * 10); + jest.useRealTimers(); + }); + it('will abort pending tasks when closed', async () => { + jest.useFakeTimers(); + + const abortListener = jest.fn(); + const handler = jest.fn().mockImplementation(async (message, reply, { signal }) => { + // confirm the `signal` instance fires an `abort` event + signal.addEventListener('abort', abortListener); + + // returns Promise which rejects when event is fired + function rejectWhenFired (target: EventTarget, event: string) { + return new Promise((_, reject) => { + target.addEventListener(event, reject, { once: true }); + }); + } + + // track the timers set by `sleep()` within this test + let sleepTimeout; + function sleep (delay) { + return new Promise((resolve) => { + sleepTimeout = setTimeout(resolve, delay); + }); + } + + // sleep to delay responding to the message, so the abort fires first + try { await Promise.race([ - sleep(sender.heartbeatInterval * 10), - eventAsPromise(signal, 'abort', true), + sleep(sender.heartbeatInterval * 10), + rejectWhenFired(signal, 'abort'), ]); reply({ foo: '1', bar: '2' }); - }); - receiver.subscribe(handler); + } + finally { + // timeouts set via `sleep()` need to be cleared. Test requires no timers remain + clearTimeout(sleepTimeout); + } + }); + receiver.subscribe(handler); - const promises = Promise.allSettled(Array.from({ length: 3 }, (_, i) => { - const { result } = sender.send({ foo: 1 + i, bar: 2 + i }, { timeout: null }); - return result; - })); + const promises = Promise.allSettled(Array.from({ length: 3 }, (_, i) => { + const { result } = sender.send({ foo: 1 + i, bar: 2 + i }); + return result; + })); - // flush microtasks to ensure subscribe handler is set up - await jest.advanceTimersByTimeAsync(10); + // flush microtasks to ensure subscribe handler is set up + await jest.advanceTimersByTimeAsync(10); - expect(handler).toHaveBeenCalledTimes(3); - - receiver.close(); - const result = await promises; - await jest.advanceTimersByTimeAsync(10); + expect(handler).toHaveBeenCalledTimes(3); + + receiver.close(); + const result = await promises; + await jest.advanceTimersByTimeAsync(10); - expect(result).toEqual(Array(3).fill({ status: 'rejected', reason: expect.any(DOMException) })); - expect(abortListener).toHaveBeenCalledTimes(3); - expect(jest.getTimerCount()).toBe(0); + expect(result).toEqual(Array(3).fill({ status: 'rejected', reason: expect.any(DOMException) })); + expect(abortListener).toHaveBeenCalledTimes(3); + expect(jest.getTimerCount()).toBe(0); - jest.useRealTimers(); - }, 10000); + jest.useRealTimers(); }); - // xdescribe('', async () => { - - // }); - - // xdescribe('', async () => { - - // }); }); From 4bf86beb5b0d7b4a3112c1cfde8b2d90e82e77fd Mon Sep 17 00:00:00 2001 From: Jared Perreault Date: Mon, 9 Feb 2026 09:44:21 -0500 Subject: [PATCH 4/4] adds last test --- .../auth-foundation/src/utils/TaskBridge.ts | 2 +- .../test/spec/utils/TaskBridge.spec.ts | 37 ++++++++++++++++++- 2 files changed, 36 insertions(+), 3 deletions(-) diff --git a/packages/auth-foundation/src/utils/TaskBridge.ts b/packages/auth-foundation/src/utils/TaskBridge.ts index 4af4ebc..4755e29 100644 --- a/packages/auth-foundation/src/utils/TaskBridge.ts +++ b/packages/auth-foundation/src/utils/TaskBridge.ts @@ -419,5 +419,5 @@ export namespace TaskBridge { get timeout (): boolean { return this.#timeout; } - }; + } } diff --git a/packages/auth-foundation/test/spec/utils/TaskBridge.spec.ts b/packages/auth-foundation/test/spec/utils/TaskBridge.spec.ts index 22fc05d..a16d429 100644 --- a/packages/auth-foundation/test/spec/utils/TaskBridge.spec.ts +++ b/packages/auth-foundation/test/spec/utils/TaskBridge.spec.ts @@ -34,7 +34,7 @@ class TestBus extends TaskBridge { } const sleep = (ms: number) => new Promise(resolve => { - setTimeout(resolve, ms) + setTimeout(resolve, ms); }); describe('TaskBridge', () => { @@ -96,10 +96,43 @@ describe('TaskBridge', () => { jest.useRealTimers(); }); + it('handles a single task throwing gracefully', async () => { + jest.useFakeTimers(); + + const response = { foo: '2', bar: '1' }; + + let taskCount = 0; + receiver.subscribe(async (message, reply) => { + const isEven = taskCount === 0 || taskCount % 2 === 0; + taskCount += 1; + if (isEven) { + reply(response); + } + else { + throw new Error('test error'); + } + }); + + const promises = Promise.allSettled(Array.from({ length: 3 }, (_, i) => { + const { result } = sender.send({ foo: 1 + i, bar: 2 + i }); + return result; + })); + + await expect(promises).resolves.toEqual([ + { status: 'fulfilled', value: { ...response} }, + { status: 'fulfilled', value: { error: 'test error' } }, + { status: 'fulfilled', value: { ...response } }, + ]); + + expect(jest.getTimerCount()).toBe(0); + + jest.useRealTimers(); + }); + it('gracefully handles an error being thrown by the subscribe handler', async () => { jest.useFakeTimers(); - const handler = jest.fn().mockImplementation(async (message, reply) => { + const handler = jest.fn().mockImplementation(async () => { throw new Error('test'); }); receiver.subscribe(handler);