Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 86 additions & 27 deletions lib/internal/abort_controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,17 +85,16 @@ function lazyMessageChannel() {
}

const clearTimeoutRegistry = new SafeFinalizationRegistry(clearTimeout);
const dependantSignalsCleanupRegistry = new SafeFinalizationRegistry((signalWeakRef) => {
const signal = signalWeakRef.deref();
if (signal === undefined) {
return;
}
signal[kDependantSignals].forEach((ref) => {
if (ref.deref() === undefined) {
signal[kDependantSignals].delete(ref);
const dependantSignalsCleanupRegistry = new SafeFinalizationRegistry(
({ sourceSignalRef, dependantSignalRef, sourceSignalsCleanupToken }) => {
sourceSignalsCleanupRegistry.unregister(sourceSignalsCleanupToken);

const sourceSignal = sourceSignalRef.deref();
if (sourceSignal === undefined) {
return;
}
sourceSignal[kDependantSignals].delete(dependantSignalRef);
});
});

const gcPersistentSignals = new SafeSet();

Expand All @@ -117,6 +116,8 @@ const kCloneData = Symbol('kCloneData');
const kTimeout = Symbol('kTimeout');
const kMakeTransferable = Symbol('kMakeTransferable');
const kComposite = Symbol('kComposite');
const kFollowing = Symbol('kFollowing');
const kResultSignalWeakRef = Symbol('kResultSignalWeakRef');
const kSourceSignals = Symbol('kSourceSignals');
const kDependantSignals = Symbol('kDependantSignals');

Expand All @@ -136,6 +137,60 @@ function validateThisAbortSignal(obj) {
throw new ERR_INVALID_THIS('AbortSignal');
}

function refreshCompositeSignal(signal) {
if (!signal[kComposite] || signal[kAborted] || !signal[kSourceSignals]?.size) {
return;
}

for (const sourceSignalWeakRef of signal[kSourceSignals]) {
const sourceSignal = sourceSignalWeakRef.deref();
if (sourceSignal === undefined) {
signal[kSourceSignals].delete(sourceSignalWeakRef);
continue;
}

if (sourceSignal.aborted) {
abortSignal(signal, sourceSignal.reason);
return;
}
}
}

function followCompositeSignal(signal) {
if (signal[kFollowing] || signal[kAborted] || !signal[kSourceSignals]?.size) {
return;
}

const resultSignalWeakRef = signal[kResultSignalWeakRef] ??= new SafeWeakRef(signal);

for (const sourceSignalWeakRef of signal[kSourceSignals]) {
const sourceSignal = sourceSignalWeakRef.deref();
if (sourceSignal === undefined) {
signal[kSourceSignals].delete(sourceSignalWeakRef);
continue;
}

if (sourceSignal.aborted) {
abortSignal(signal, sourceSignal.reason);
return;
}

sourceSignal[kDependantSignals] ??= new SafeSet();
sourceSignal[kDependantSignals].add(resultSignalWeakRef);
dependantSignalsCleanupRegistry.register(signal, {
sourceSignalRef: sourceSignalWeakRef,
dependantSignalRef: resultSignalWeakRef,
sourceSignalsCleanupToken: sourceSignalWeakRef,
});
sourceSignalsCleanupRegistry.register(sourceSignal, {
sourceSignalRef: sourceSignalWeakRef,
composedSignalRef: resultSignalWeakRef,
}, sourceSignalWeakRef);
}

signal[kFollowing] = true;
}

// Because the AbortSignal timeout cannot be canceled, we don't want the
// presence of the timer alone to keep the AbortSignal from being garbage
// collected if it otherwise no longer accessible. We also don't want the
Expand All @@ -148,6 +203,7 @@ function setWeakAbortSignalTimeout(weakRef, delay) {
const timeout = setTimeout(() => {
const signal = weakRef.deref();
if (signal !== undefined) {
clearTimeoutRegistry.unregister(signal);
gcPersistentSignals.delete(signal);
abortSignal(
signal,
Expand Down Expand Up @@ -198,6 +254,7 @@ class AbortSignal extends EventTarget {
*/
get aborted() {
validateThisAbortSignal(this);
refreshCompositeSignal(this);
return !!this[kAborted];
}

Expand All @@ -206,11 +263,13 @@ class AbortSignal extends EventTarget {
*/
get reason() {
validateThisAbortSignal(this);
refreshCompositeSignal(this);
return this[kReason];
}

throwIfAborted() {
validateThisAbortSignal(this);
refreshCompositeSignal(this);
if (this[kAborted]) {
throw this[kReason];
}
Expand Down Expand Up @@ -241,7 +300,8 @@ class AbortSignal extends EventTarget {
signal[kTimeout] = true;
clearTimeoutRegistry.register(
signal,
setWeakAbortSignalTimeout(new SafeWeakRef(signal), delay));
setWeakAbortSignalTimeout(new SafeWeakRef(signal), delay),
signal);
return signal;
}

Expand All @@ -260,7 +320,6 @@ class AbortSignal extends EventTarget {
return resultSignal;
}

const resultSignalWeakRef = new SafeWeakRef(resultSignal);
resultSignal[kSourceSignals] = new SafeSet();

// Track if we have any timeout signals
Expand All @@ -283,51 +342,51 @@ class AbortSignal extends EventTarget {
return resultSignal;
}

signal[kDependantSignals] ??= new SafeSet();
if (!signal[kComposite]) {
const signalWeakRef = new SafeWeakRef(signal);
resultSignal[kSourceSignals].add(signalWeakRef);
signal[kDependantSignals].add(resultSignalWeakRef);
dependantSignalsCleanupRegistry.register(resultSignal, signalWeakRef);
sourceSignalsCleanupRegistry.register(signal, {
sourceSignalRef: signalWeakRef,
composedSignalRef: resultSignalWeakRef,
});
} else if (!signal[kSourceSignals]) {
continue;
} else {
refreshCompositeSignal(signal);
if (signal.aborted) {
abortSignal(resultSignal, signal.reason);
return resultSignal;
}
for (const sourceSignalWeakRef of signal[kSourceSignals]) {
const sourceSignal = sourceSignalWeakRef.deref();
if (!sourceSignal) {
continue;
}
assert(!sourceSignal.aborted);
assert(!sourceSignal[kComposite]);

if (sourceSignal.aborted) {
abortSignal(resultSignal, sourceSignal.reason);
return resultSignal;
}

if (resultSignal[kSourceSignals].has(sourceSignalWeakRef)) {
continue;
}
resultSignal[kSourceSignals].add(sourceSignalWeakRef);
sourceSignal[kDependantSignals].add(resultSignalWeakRef);
dependantSignalsCleanupRegistry.register(resultSignal, sourceSignalWeakRef);
sourceSignalsCleanupRegistry.register(signal, {
sourceSignalRef: sourceSignalWeakRef,
composedSignalRef: resultSignalWeakRef,
});
}
}
}

// If we have any timeout signals, add the composite signal to gcPersistentSignals
if (hasTimeoutSignals && resultSignal[kSourceSignals].size > 0) {
gcPersistentSignals.add(resultSignal);
resultSignal[kTimeout] = true;
}

return resultSignal;
}

[kNewListener](size, type, listener, once, capture, passive, weak) {
super[kNewListener](size, type, listener, once, capture, passive, weak);

if (this[kComposite] && type === 'abort' && !this.aborted && size === 1) {
followCompositeSignal(this);
}

const isTimeoutOrNonEmptyCompositeSignal = this[kTimeout] || (this[kComposite] && this[kSourceSignals]?.size);
if (isTimeoutOrNonEmptyCompositeSignal &&
type === 'abort' &&
Expand Down
31 changes: 30 additions & 1 deletion test/parallel/test-abortsignal-drop-settled-signals.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ function makeSubsequentCalls(limit, done, holdReferences = false) {
}

if (holdReferences) {
retainedSignals.push(AbortSignal.any([ac.signal]));
const signal = AbortSignal.any([ac.signal]);
signal.addEventListener('abort', handler);
retainedSignals.push(signal);
} else {
// Using a WeakRef to avoid retaining information that will interfere with the test
signalRef = new WeakRef(AbortSignal.any([ac.signal]));
Expand Down Expand Up @@ -119,6 +121,27 @@ describe('when there is a long-lived signal', () => {
done();
}, true);
});

it('does not keep retained dependent signals without listeners', (t, done) => {
const ac = new AbortController();
const retainedSignals = [];
const kDependantSignals = Object.getOwnPropertySymbols(ac.signal).find(
(s) => s.toString() === 'Symbol(kDependantSignals)'
);

function run(iteration) {
if (iteration > limit) {
t.assert.strictEqual(ac.signal[kDependantSignals]?.size ?? 0, 0);
done();
return;
}

retainedSignals.push(AbortSignal.any([ac.signal]));
setImmediate(() => run(iteration + 1));
}

run(1);
});
});

it('does not prevent source signal from being GCed if it is short-lived', (t, done) => {
Expand All @@ -134,10 +157,13 @@ it('does not prevent source signal from being GCed if it is short-lived', (t, do

it('drops settled dependent signals when signal is composite', (t, done) => {
const controllers = Array.from({ length: 2 }, () => new AbortController());
const handler = () => {};

// Using WeakRefs to avoid this test to retain information that will make the test fail
const composedSignal1 = new WeakRef(AbortSignal.any([controllers[0].signal]));
const composedSignalRef = new WeakRef(AbortSignal.any([composedSignal1.deref(), controllers[1].signal]));
composedSignal1.deref().addEventListener('abort', handler);
composedSignalRef.deref().addEventListener('abort', handler);

const kDependantSignals = Object.getOwnPropertySymbols(controllers[0].signal).find(
(s) => s.toString() === 'Symbol(kDependantSignals)'
Expand All @@ -147,6 +173,9 @@ it('drops settled dependent signals when signal is composite', (t, done) => {
t.assert.strictEqual(controllers[1].signal[kDependantSignals].size, 1);

setImmediate(mustCall(() => {
composedSignal1.deref()?.removeEventListener('abort', handler);
composedSignalRef.deref()?.removeEventListener('abort', handler);

globalThis.gc({ execution: 'async' }).then(async () => {
await gcUntil('all signals are GCed', () => {
const totalDependantSignals = Math.max(
Expand Down
Loading