Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 76 additions & 20 deletions packages/auth-foundation/src/utils/TaskBridge.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { BroadcastChannelLike } from '../types/index.ts';
import { shortID } from '../crypto/index.ts';
import { AuthSdkError } from '../errors/AuthSdkError.ts';

/** @useDeclaredType */
type TypeMap = Record<string, any>;
Expand Down Expand Up @@ -86,8 +87,9 @@ export abstract class TaskBridge<M extends TypeMap, R extends TypeMap> {
});
this.#pending.set(request.id, request);

let abortHandler: () => void;
const result = (new Promise<R[K]>((resolve, reject) => {
const setTimeoutTimer = () => {
const resetTimeoutTimer = () => {
// `options.timeout` set to `null` disables the timeout mechanism
if (options.timeout === null) {
return;
Expand All @@ -97,20 +99,21 @@ export abstract class TaskBridge<M extends TypeMap, R extends TypeMap> {
clearTimeout(timeoutId);
}
// TODO: error type
timeoutId = setTimeout(() => reject(new Error('timeout')), options.timeout ?? 5000);
timeoutId = setTimeout(() => reject(
new TaskBridge.TimeoutError('timeout')
), options.timeout ?? 5000);
};
// sets timeout timer
setTimeoutTimer();
resetTimeoutTimer();

// forces the pending promise to reject, so resources clean up if the request is aborted
request.signal.addEventListener('abort', () => {
reject(new DOMException('Aborted', 'AbortError'));
});
abortHandler = () => reject(new DOMException('Aborted', 'AbortError'));
request.signal.addEventListener('abort', abortHandler);

// This channel is meant for the Receiver to send the results (aka `HandlerMessage<M>` messages)
// ignore all Requestor events received (aka `RequestorMessage`)
responseChannel.onmessage = (event) => {
if ('action' in event.data) {
if (request.signal.aborted || 'action' in event.data) {
return; // ignore message
}

Expand All @@ -126,8 +129,11 @@ export abstract class TaskBridge<M extends TypeMap, R extends TypeMap> {

case 'PENDING':
// defer the timeout timer when a heartbeat is received (host is still working)
setTimeoutTimer();
resetTimeoutTimer();

break;
case 'ABORTED':
request.abort('Host Aborted');
break;
}
};
Expand All @@ -141,15 +147,16 @@ export abstract class TaskBridge<M extends TypeMap, R extends TypeMap> {
}
requestChannel.close();
responseChannel.close();
request.signal.removeEventListener('abort', abortHandler);
this.#pending.delete(request.id);
});

// TODO: review
const cancel = () => {
const abort = () => {
responseChannel.postMessage({ action: 'CANCEL', __v: TaskBridge.BridgeVersion });
request.controller.abort('cancel');
};

return { result, cancel };
return { result, abort };
}

subscribe<K extends keyof M & keyof R>(handler: TaskBridge.TaskHandler<M, R>) {
Expand Down Expand Up @@ -182,8 +189,6 @@ export abstract class TaskBridge<M extends TypeMap, R extends TypeMap> {
// event type is now `RequestorMessage`
switch (event.data.action) {
case 'CANCEL':
// TODO: probably don't need to reply, just cancel action, if possible
// responseChannel.postMessage({ status: 'CANCELED' });
message.abort('cancel');
break;
}
Expand All @@ -198,8 +203,19 @@ export abstract class TaskBridge<M extends TypeMap, R extends TypeMap> {
);
}
catch (err) {
if (err instanceof DOMException && err.name === 'AbortError') {
return null;
if (err instanceof DOMException) {
if (err.name === 'AbortError') {
// task was aborted, do nothing
return null;
}

if (err.name === 'InvalidStateError') {
// this is error is thrown if a `.postMessage` is attempted after the channel is closed
// this can happen when the `handler` function attempts to `reply()` after `.close()`
// is called. Ignore the error, the `AbortSignal` is provided to the `handler` for
// if needed
return null;
}
}

if (err instanceof Error) {
Expand All @@ -213,11 +229,24 @@ export abstract class TaskBridge<M extends TypeMap, R extends TypeMap> {
};
}

/**
* Returns the number of pending tasks
*/
get pending (): number {
return this.#pending.size;
}

close () {
this.#channel?.close();
for (const message of this.#pending.values()) {
message.abort();
message.channel.close();
this.clearMessage(message.id);
}
this.#pending.clear();
if (this.#heartbeatInt !== null) {
clearInterval(this.#heartbeatInt);
this.#heartbeatInt = null;
}
}
}
Expand All @@ -229,7 +258,7 @@ export namespace TaskBridge {
/**
* Possible `status` values indicating the process of an orchestrated request
*/
export type TaskStatus = 'PENDING' | 'SUCCESS' | 'FAILED';
export type TaskStatus = 'PENDING' | 'SUCCESS' | 'FAILED' | 'ABORTED';

export type BridgeVersions = 1 | 2;

Expand All @@ -247,6 +276,9 @@ export namespace TaskBridge {
} | {
status: 'PENDING'
__v: BridgeVersions;
} | {
status: 'ABORTED'
__v: BridgeVersions;
}

/**
Expand Down Expand Up @@ -309,16 +341,22 @@ export namespace TaskBridge {
}

reply (data: S, status: TaskBridge.TaskStatus): void;
reply (status: 'PENDING'): void;
reply (data: S | 'PENDING', status: TaskBridge.TaskStatus = 'SUCCESS') {
const fn = this.replyFn ?? this.channel.postMessage;
reply (status: 'PENDING' | 'ABORTED'): void;
reply (data: S | 'PENDING' | 'ABORTED', status: TaskBridge.TaskStatus = 'SUCCESS') {
const fn = this.replyFn ?? this.channel.postMessage.bind(this.channel);

if (data === 'PENDING' || status === 'PENDING') {
// only send `PENDING` heartbeats when using <= v2 of the TaskBridge payload structure
if (this.__v === 2) {
fn({ status: 'PENDING', __v: this.__v } satisfies HandlerMessage<S>);
}
}
else if (data === 'ABORTED' || status === 'ABORTED') {
// only send `PENDING` heartbeats when using <= v2 of the TaskBridge payload structure
if (this.__v === 2) {
fn({ status: 'ABORTED', __v: this.__v } satisfies HandlerMessage<S>);
}
}
else {
// TODO: remove this condition - OKTA-1053515
if (this.__v < 2) {
Expand All @@ -332,6 +370,7 @@ export namespace TaskBridge {
}

abort (...args: Parameters<AbortController['abort']>) {
this.reply('ABORTED');
return this.controller.abort(...args);
}

Expand All @@ -345,14 +384,15 @@ export namespace TaskBridge {
*/
export type TaskOptions = {
timeout?: number | null;
signal?: AbortSignal;
};

/**
* @internal
*/
export type TaskResponse<R extends TypeMap> = {
result: Promise<R>;
cancel: () => void;
abort: () => void;
};

/**
Expand All @@ -364,4 +404,20 @@ export namespace TaskBridge {
options?: { signal: AbortSignal }
) => any;

/**
* @group Errors
*/
export class TimeoutError extends AuthSdkError {
#timeout: boolean = false;

constructor (...args: ConstructorParameters<typeof AuthSdkError>) {
const [message, ...rest] = args;
super(message ?? 'timeout', ...rest);
this.#timeout = true;
}

get timeout (): boolean {
return this.#timeout;
}
}
}
Loading