Skip to content

Commit 1695dd4

Browse files
committed
fix(core): add a PromiseBuffer for incoming events on the client
1 parent d4a2b2b commit 1695dd4

File tree

3 files changed

+101
-10
lines changed

3 files changed

+101
-10
lines changed

packages/core/src/client.ts

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import { _INTERNAL_flushMetricsBuffer } from './metrics/internal';
1111
import type { Scope } from './scope';
1212
import { updateSession } from './session';
1313
import { getDynamicSamplingContextFromScope } from './tracing/dynamicSamplingContext';
14+
import { DEFAULT_TRANSPORT_BUFFER_SIZE } from './transports/base';
1415
import type { Breadcrumb, BreadcrumbHint, FetchBreadcrumbHint, XhrBreadcrumbHint } from './types-hoist/breadcrumb';
1516
import type { CheckIn, MonitorConfig } from './types-hoist/checkin';
1617
import type { EventDropReason, Outcome } from './types-hoist/clientreport';
@@ -43,6 +44,7 @@ import { merge } from './utils/merge';
4344
import { checkOrSetAlreadyCaught, uuid4 } from './utils/misc';
4445
import { parseSampleRate } from './utils/parseSampleRate';
4546
import { prepareEvent } from './utils/prepareEvent';
47+
import { type PromiseBuffer, makePromiseBuffer, SENTRY_BUFFER_FULL_ERROR } from './utils/promisebuffer';
4648
import { reparentChildSpans, shouldIgnoreSpan } from './utils/should-ignore-span';
4749
import { showSpanDropWarning } from './utils/spanUtils';
4850
import { rejectedSyncPromise } from './utils/syncpromise';
@@ -194,6 +196,8 @@ export abstract class Client<O extends ClientOptions = ClientOptions> {
194196
// eslint-disable-next-line @typescript-eslint/ban-types
195197
private _hooks: Record<string, Set<Function>>;
196198

199+
private _promiseBuffer: PromiseBuffer<unknown>;
200+
197201
/**
198202
* Initializes this client instance.
199203
*
@@ -206,6 +210,7 @@ export abstract class Client<O extends ClientOptions = ClientOptions> {
206210
this._outcomes = {};
207211
this._hooks = {};
208212
this._eventProcessors = [];
213+
this._promiseBuffer = makePromiseBuffer(options.transportOptions?.bufferSize ?? DEFAULT_TRANSPORT_BUFFER_SIZE);
209214

210215
if (options.dsn) {
211216
this._dsn = makeDsn(options.dsn);
@@ -268,9 +273,11 @@ export abstract class Client<O extends ClientOptions = ClientOptions> {
268273
};
269274

270275
this._process(
271-
this.eventFromException(exception, hintWithEventId).then(event =>
272-
this._captureEvent(event, hintWithEventId, scope),
273-
),
276+
() =>
277+
this.eventFromException(exception, hintWithEventId)
278+
.then(event => this._captureEvent(event, hintWithEventId, scope))
279+
.then(res => res),
280+
'error',
274281
);
275282

276283
return hintWithEventId.event_id;
@@ -293,12 +300,15 @@ export abstract class Client<O extends ClientOptions = ClientOptions> {
293300
};
294301

295302
const eventMessage = isParameterizedString(message) ? message : String(message);
296-
297-
const promisedEvent = isPrimitive(message)
303+
const isMessage = isPrimitive(message);
304+
const promisedEvent = isMessage
298305
? this.eventFromMessage(eventMessage, level, hintWithEventId)
299306
: this.eventFromException(message, hintWithEventId);
300307

301-
this._process(promisedEvent.then(event => this._captureEvent(event, hintWithEventId, currentScope)));
308+
this._process(
309+
() => promisedEvent.then(event => this._captureEvent(event, hintWithEventId, currentScope)),
310+
isMessage ? 'unknown' : 'error',
311+
);
302312

303313
return hintWithEventId.event_id;
304314
}
@@ -325,9 +335,11 @@ export abstract class Client<O extends ClientOptions = ClientOptions> {
325335
const sdkProcessingMetadata = event.sdkProcessingMetadata || {};
326336
const capturedSpanScope: Scope | undefined = sdkProcessingMetadata.capturedSpanScope;
327337
const capturedSpanIsolationScope: Scope | undefined = sdkProcessingMetadata.capturedSpanIsolationScope;
338+
const dataCategory = event.type === 'replay_event' ? 'replay' : (event.type ?? 'unknown');
328339

329340
this._process(
330-
this._captureEvent(event, hintWithEventId, capturedSpanScope || currentScope, capturedSpanIsolationScope),
341+
() => this._captureEvent(event, hintWithEventId, capturedSpanScope || currentScope, capturedSpanIsolationScope),
342+
dataCategory,
331343
);
332344

333345
return hintWithEventId.event_id;
@@ -1312,15 +1324,21 @@ export abstract class Client<O extends ClientOptions = ClientOptions> {
13121324
/**
13131325
* Occupies the client with processing and event
13141326
*/
1315-
protected _process<T>(promise: PromiseLike<T>): void {
1327+
protected _process<T>(taskProducer: () => PromiseLike<T>, dataCategory: DataCategory): void {
13161328
this._numProcessing++;
1317-
void promise.then(
1329+
1330+
void this._promiseBuffer.add(taskProducer).then(
13181331
value => {
13191332
this._numProcessing--;
13201333
return value;
13211334
},
13221335
reason => {
13231336
this._numProcessing--;
1337+
1338+
if (reason === SENTRY_BUFFER_FULL_ERROR) {
1339+
this.recordDroppedEvent('queue_overflow', dataCategory);
1340+
}
1341+
13241342
return reason;
13251343
},
13261344
);

packages/core/test/lib/client.test.ts

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@ import * as miscModule from '../../src/utils/misc';
2222
import * as stringModule from '../../src/utils/string';
2323
import * as timeModule from '../../src/utils/time';
2424
import { getDefaultTestClientOptions, TestClient } from '../mocks/client';
25-
import { AdHocIntegration, TestIntegration } from '../mocks/integration';
25+
import { AdHocIntegration, AsyncTestIntegration, TestIntegration } from '../mocks/integration';
2626
import { makeFakeTransport } from '../mocks/transport';
2727
import { clearGlobalScope } from '../testutils';
28+
import { DEFAULT_TRANSPORT_BUFFER_SIZE } from '../../src/transports/base';
2829

2930
const PUBLIC_DSN = 'https://username@domain/123';
3031
// eslint-disable-next-line no-var
@@ -2806,4 +2807,66 @@ describe('Client', () => {
28062807
expect(sendEnvelopeSpy).toHaveBeenCalledTimes(1);
28072808
});
28082809
});
2810+
2811+
describe('promise buffer usage', () => {
2812+
it('respects the default value of the buffer size', async () => {
2813+
const options = getDefaultTestClientOptions({ dsn: PUBLIC_DSN });
2814+
const client = new TestClient(options);
2815+
2816+
client.addIntegration(new AsyncTestIntegration());
2817+
2818+
Array.from({ length: DEFAULT_TRANSPORT_BUFFER_SIZE + 1 }).forEach(() => {
2819+
client.captureException(new Error('ʕノ•ᴥ•ʔノ ︵ ┻━┻'));
2820+
});
2821+
2822+
expect(client._clearOutcomes()).toEqual([{ reason: 'queue_overflow', category: 'error', quantity: 1 }]);
2823+
});
2824+
2825+
it('records queue_overflow when promise buffer is full', async () => {
2826+
const options = getDefaultTestClientOptions({ dsn: PUBLIC_DSN, transportOptions: { bufferSize: 1 } });
2827+
const client = new TestClient(options);
2828+
2829+
client.addIntegration(new AsyncTestIntegration());
2830+
2831+
client.captureException(new Error('first'));
2832+
client.captureException(new Error('second'));
2833+
client.captureException(new Error('third'));
2834+
2835+
expect(client._clearOutcomes()).toEqual([{ reason: 'queue_overflow', category: 'error', quantity: 2 }]);
2836+
});
2837+
2838+
it('records different types of dropped events', async () => {
2839+
const options = getDefaultTestClientOptions({ dsn: PUBLIC_DSN, transportOptions: { bufferSize: 1 } });
2840+
const client = new TestClient(options);
2841+
2842+
client.addIntegration(new AsyncTestIntegration());
2843+
2844+
client.captureException(new Error('first')); // error
2845+
client.captureException(new Error('second')); // error
2846+
client.captureMessage('third'); // unknown
2847+
client.captureEvent({ message: 'fourth' }); // unknown
2848+
client.captureEvent({ message: 'fifth', type: 'replay_event' }); // replay
2849+
client.captureEvent({ message: 'sixth', type: 'transaction' }); // transaction
2850+
2851+
expect(client._clearOutcomes()).toEqual([
2852+
{ reason: 'queue_overflow', category: 'error', quantity: 1 },
2853+
{ reason: 'queue_overflow', category: 'unknown', quantity: 2 },
2854+
{ reason: 'queue_overflow', category: 'replay', quantity: 1 },
2855+
{ reason: 'queue_overflow', category: 'transaction', quantity: 1 },
2856+
]);
2857+
});
2858+
2859+
it('should skip the promise buffer with sync integrations', async () => {
2860+
const options = getDefaultTestClientOptions({ dsn: PUBLIC_DSN, transportOptions: { bufferSize: 1 } });
2861+
const client = new TestClient(options);
2862+
2863+
client.addIntegration(new TestIntegration());
2864+
2865+
client.captureException(new Error('first'));
2866+
client.captureException(new Error('second'));
2867+
client.captureException(new Error('third'));
2868+
2869+
expect(client._clearOutcomes()).toEqual([]);
2870+
});
2871+
});
28092872
});

packages/core/test/mocks/integration.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,16 @@ export class TestIntegration implements Integration {
2424
}
2525
}
2626

27+
export class AsyncTestIntegration implements Integration {
28+
public static id: string = 'AsyncTestIntegration';
29+
30+
public name: string = 'AsyncTestIntegration';
31+
32+
processEvent(event: Event): Event | null | PromiseLike<Event | null> {
33+
return new Promise(resolve => setTimeout(() => resolve(event), 1));
34+
}
35+
}
36+
2737
export class AddAttachmentTestIntegration implements Integration {
2838
public static id: string = 'AddAttachmentTestIntegration';
2939

0 commit comments

Comments
 (0)