From 2ee14b484d00432145d4f9a6773fbd31f92921d7 Mon Sep 17 00:00:00 2001 From: JPeer264 Date: Fri, 7 Nov 2025 16:26:00 +0100 Subject: [PATCH 1/2] fix(core): add a PromiseBuffer for incoming events on the client --- packages/core/src/client.ts | 36 ++++++++++---- packages/core/test/lib/client.test.ts | 65 ++++++++++++++++++++++++- packages/core/test/mocks/integration.ts | 10 ++++ 3 files changed, 101 insertions(+), 10 deletions(-) diff --git a/packages/core/src/client.ts b/packages/core/src/client.ts index c3ff126732f8..6a74513e6426 100644 --- a/packages/core/src/client.ts +++ b/packages/core/src/client.ts @@ -11,6 +11,7 @@ import { _INTERNAL_flushMetricsBuffer } from './metrics/internal'; import type { Scope } from './scope'; import { updateSession } from './session'; import { getDynamicSamplingContextFromScope } from './tracing/dynamicSamplingContext'; +import { DEFAULT_TRANSPORT_BUFFER_SIZE } from './transports/base'; import type { Breadcrumb, BreadcrumbHint, FetchBreadcrumbHint, XhrBreadcrumbHint } from './types-hoist/breadcrumb'; import type { CheckIn, MonitorConfig } from './types-hoist/checkin'; import type { EventDropReason, Outcome } from './types-hoist/clientreport'; @@ -43,6 +44,7 @@ import { merge } from './utils/merge'; import { checkOrSetAlreadyCaught, uuid4 } from './utils/misc'; import { parseSampleRate } from './utils/parseSampleRate'; import { prepareEvent } from './utils/prepareEvent'; +import { type PromiseBuffer, makePromiseBuffer, SENTRY_BUFFER_FULL_ERROR } from './utils/promisebuffer'; import { reparentChildSpans, shouldIgnoreSpan } from './utils/should-ignore-span'; import { showSpanDropWarning } from './utils/spanUtils'; import { rejectedSyncPromise } from './utils/syncpromise'; @@ -194,6 +196,8 @@ export abstract class Client { // eslint-disable-next-line @typescript-eslint/ban-types private _hooks: Record>; + private _promiseBuffer: PromiseBuffer; + /** * Initializes this client instance. * @@ -206,6 +210,7 @@ export abstract class Client { this._outcomes = {}; this._hooks = {}; this._eventProcessors = []; + this._promiseBuffer = makePromiseBuffer(options.transportOptions?.bufferSize ?? DEFAULT_TRANSPORT_BUFFER_SIZE); if (options.dsn) { this._dsn = makeDsn(options.dsn); @@ -268,9 +273,11 @@ export abstract class Client { }; this._process( - this.eventFromException(exception, hintWithEventId).then(event => - this._captureEvent(event, hintWithEventId, scope), - ), + () => + this.eventFromException(exception, hintWithEventId) + .then(event => this._captureEvent(event, hintWithEventId, scope)) + .then(res => res), + 'error', ); return hintWithEventId.event_id; @@ -293,12 +300,15 @@ export abstract class Client { }; const eventMessage = isParameterizedString(message) ? message : String(message); - - const promisedEvent = isPrimitive(message) + const isMessage = isPrimitive(message); + const promisedEvent = isMessage ? this.eventFromMessage(eventMessage, level, hintWithEventId) : this.eventFromException(message, hintWithEventId); - this._process(promisedEvent.then(event => this._captureEvent(event, hintWithEventId, currentScope))); + this._process( + () => promisedEvent.then(event => this._captureEvent(event, hintWithEventId, currentScope)), + isMessage ? 'unknown' : 'error', + ); return hintWithEventId.event_id; } @@ -325,9 +335,11 @@ export abstract class Client { const sdkProcessingMetadata = event.sdkProcessingMetadata || {}; const capturedSpanScope: Scope | undefined = sdkProcessingMetadata.capturedSpanScope; const capturedSpanIsolationScope: Scope | undefined = sdkProcessingMetadata.capturedSpanIsolationScope; + const dataCategory = event.type === 'replay_event' ? 'replay' : (event.type ?? 'unknown'); this._process( - this._captureEvent(event, hintWithEventId, capturedSpanScope || currentScope, capturedSpanIsolationScope), + () => this._captureEvent(event, hintWithEventId, capturedSpanScope || currentScope, capturedSpanIsolationScope), + dataCategory, ); return hintWithEventId.event_id; @@ -1312,15 +1324,21 @@ export abstract class Client { /** * Occupies the client with processing and event */ - protected _process(promise: PromiseLike): void { + protected _process(taskProducer: () => PromiseLike, dataCategory: DataCategory): void { this._numProcessing++; - void promise.then( + + void this._promiseBuffer.add(taskProducer).then( value => { this._numProcessing--; return value; }, reason => { this._numProcessing--; + + if (reason === SENTRY_BUFFER_FULL_ERROR) { + this.recordDroppedEvent('queue_overflow', dataCategory); + } + return reason; }, ); diff --git a/packages/core/test/lib/client.test.ts b/packages/core/test/lib/client.test.ts index db25793ccf7b..39b4d34678bb 100644 --- a/packages/core/test/lib/client.test.ts +++ b/packages/core/test/lib/client.test.ts @@ -14,6 +14,7 @@ import { import * as integrationModule from '../../src/integration'; import { _INTERNAL_captureLog } from '../../src/logs/internal'; import { _INTERNAL_captureMetric } from '../../src/metrics/internal'; +import { DEFAULT_TRANSPORT_BUFFER_SIZE } from '../../src/transports/base'; import type { Envelope } from '../../src/types-hoist/envelope'; import type { ErrorEvent, Event, TransactionEvent } from '../../src/types-hoist/event'; import type { SpanJSON } from '../../src/types-hoist/span'; @@ -22,7 +23,7 @@ import * as miscModule from '../../src/utils/misc'; import * as stringModule from '../../src/utils/string'; import * as timeModule from '../../src/utils/time'; import { getDefaultTestClientOptions, TestClient } from '../mocks/client'; -import { AdHocIntegration, TestIntegration } from '../mocks/integration'; +import { AdHocIntegration, AsyncTestIntegration, TestIntegration } from '../mocks/integration'; import { makeFakeTransport } from '../mocks/transport'; import { clearGlobalScope } from '../testutils'; @@ -2806,4 +2807,66 @@ describe('Client', () => { expect(sendEnvelopeSpy).toHaveBeenCalledTimes(1); }); }); + + describe('promise buffer usage', () => { + it('respects the default value of the buffer size', async () => { + const options = getDefaultTestClientOptions({ dsn: PUBLIC_DSN }); + const client = new TestClient(options); + + client.addIntegration(new AsyncTestIntegration()); + + Array.from({ length: DEFAULT_TRANSPORT_BUFFER_SIZE + 1 }).forEach(() => { + client.captureException(new Error('ʕノ•ᴥ•ʔノ ︵ ┻━┻')); + }); + + expect(client._clearOutcomes()).toEqual([{ reason: 'queue_overflow', category: 'error', quantity: 1 }]); + }); + + it('records queue_overflow when promise buffer is full', async () => { + const options = getDefaultTestClientOptions({ dsn: PUBLIC_DSN, transportOptions: { bufferSize: 1 } }); + const client = new TestClient(options); + + client.addIntegration(new AsyncTestIntegration()); + + client.captureException(new Error('first')); + client.captureException(new Error('second')); + client.captureException(new Error('third')); + + expect(client._clearOutcomes()).toEqual([{ reason: 'queue_overflow', category: 'error', quantity: 2 }]); + }); + + it('records different types of dropped events', async () => { + const options = getDefaultTestClientOptions({ dsn: PUBLIC_DSN, transportOptions: { bufferSize: 1 } }); + const client = new TestClient(options); + + client.addIntegration(new AsyncTestIntegration()); + + client.captureException(new Error('first')); // error + client.captureException(new Error('second')); // error + client.captureMessage('third'); // unknown + client.captureEvent({ message: 'fourth' }); // unknown + client.captureEvent({ message: 'fifth', type: 'replay_event' }); // replay + client.captureEvent({ message: 'sixth', type: 'transaction' }); // transaction + + expect(client._clearOutcomes()).toEqual([ + { reason: 'queue_overflow', category: 'error', quantity: 1 }, + { reason: 'queue_overflow', category: 'unknown', quantity: 2 }, + { reason: 'queue_overflow', category: 'replay', quantity: 1 }, + { reason: 'queue_overflow', category: 'transaction', quantity: 1 }, + ]); + }); + + it('should skip the promise buffer with sync integrations', async () => { + const options = getDefaultTestClientOptions({ dsn: PUBLIC_DSN, transportOptions: { bufferSize: 1 } }); + const client = new TestClient(options); + + client.addIntegration(new TestIntegration()); + + client.captureException(new Error('first')); + client.captureException(new Error('second')); + client.captureException(new Error('third')); + + expect(client._clearOutcomes()).toEqual([]); + }); + }); }); diff --git a/packages/core/test/mocks/integration.ts b/packages/core/test/mocks/integration.ts index 72a18dabe7b3..f5fc5682265a 100644 --- a/packages/core/test/mocks/integration.ts +++ b/packages/core/test/mocks/integration.ts @@ -24,6 +24,16 @@ export class TestIntegration implements Integration { } } +export class AsyncTestIntegration implements Integration { + public static id: string = 'AsyncTestIntegration'; + + public name: string = 'AsyncTestIntegration'; + + processEvent(event: Event): Event | null | PromiseLike { + return new Promise(resolve => setTimeout(() => resolve(event), 1)); + } +} + export class AddAttachmentTestIntegration implements Integration { public static id: string = 'AddAttachmentTestIntegration'; From 7381a49ac34964d637f56625b2bf48617820b29d Mon Sep 17 00:00:00 2001 From: JPeer264 Date: Fri, 7 Nov 2025 17:02:19 +0100 Subject: [PATCH 2/2] fixup! fix(core): add a PromiseBuffer for incoming events on the client --- packages/core/src/client.ts | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/packages/core/src/client.ts b/packages/core/src/client.ts index 6a74513e6426..df7444f04231 100644 --- a/packages/core/src/client.ts +++ b/packages/core/src/client.ts @@ -18,7 +18,7 @@ import type { EventDropReason, Outcome } from './types-hoist/clientreport'; import type { DataCategory } from './types-hoist/datacategory'; import type { DsnComponents } from './types-hoist/dsn'; import type { DynamicSamplingContext, Envelope } from './types-hoist/envelope'; -import type { ErrorEvent, Event, EventHint, TransactionEvent } from './types-hoist/event'; +import type { ErrorEvent, Event, EventHint, EventType, TransactionEvent } from './types-hoist/event'; import type { EventProcessor } from './types-hoist/eventprocessor'; import type { FeedbackEvent } from './types-hoist/feedback'; import type { Integration } from './types-hoist/integration'; @@ -335,7 +335,7 @@ export abstract class Client { const sdkProcessingMetadata = event.sdkProcessingMetadata || {}; const capturedSpanScope: Scope | undefined = sdkProcessingMetadata.capturedSpanScope; const capturedSpanIsolationScope: Scope | undefined = sdkProcessingMetadata.capturedSpanIsolationScope; - const dataCategory = event.type === 'replay_event' ? 'replay' : (event.type ?? 'unknown'); + const dataCategory = getDataCategoryByType(event.type); this._process( () => this._captureEvent(event, hintWithEventId, capturedSpanScope || currentScope, capturedSpanIsolationScope), @@ -1241,7 +1241,7 @@ export abstract class Client { ); } - const dataCategory = (eventType === 'replay_event' ? 'replay' : eventType) satisfies DataCategory; + const dataCategory = getDataCategoryByType(event.type); return this._prepareEvent(event, hint, currentScope, isolationScope) .then(prepared => { @@ -1403,6 +1403,10 @@ export abstract class Client { ): PromiseLike; } +function getDataCategoryByType(type: EventType | 'replay_event' | undefined): DataCategory { + return type === 'replay_event' ? 'replay' : type || 'error'; +} + /** * Verifies that return value of configured `beforeSend` or `beforeSendTransaction` is of expected type, and returns the value if so. */