Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 26 additions & 13 deletions packages/adapters/cache/src/purchaseCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,22 @@ export class PurchaseCache {
*/
public async getPurchases(targetIds: string[]): Promise<PurchaseAction[]> {
const results = await this.store.hmget(this.dataKey, ...targetIds);
const purchases: PurchaseAction[] = [];

return results
.filter((result): result is string => result !== null)
.map((result) => {
for (const result of results) {
if (result === null) continue;
try {
const parsed = JSON.parse(result) as PurchaseAction;
return {
purchases.push({
...parsed,
transactionType: parsed.transactionType || TransactionSubmissionType.Onchain, // backwards compatability
};
});
});
} catch (parseError) {
console.error('Failed to parse purchase data, skipping corrupted entry:', parseError);
}
}

return purchases;
}

/**
Expand Down Expand Up @@ -101,14 +107,21 @@ export class PurchaseCache {
*/
public async getAllPurchases(): Promise<PurchaseAction[]> {
const all = await this.store.hgetall(this.dataKey);
const purchases: PurchaseAction[] = [];

return Object.values(all).map((result) => {
const parsed = JSON.parse(result) as PurchaseAction;
return {
...parsed,
transactionType: parsed.transactionType || TransactionSubmissionType.Onchain, // backwards compatability
};
});
for (const result of Object.values(all)) {
try {
const parsed = JSON.parse(result) as PurchaseAction;
purchases.push({
...parsed,
transactionType: parsed.transactionType || TransactionSubmissionType.Onchain, // backwards compatability
});
} catch (parseError) {
console.error('Failed to parse purchase data in getAllPurchases, skipping corrupted entry:', parseError);
}
}

return purchases;
}

/**
Expand Down
33 changes: 33 additions & 0 deletions packages/adapters/cache/test/purchaseCache.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,21 @@ describe('PurchaseCache', () => {
expect(result).toEqual([mockPurchaseAction]);
});

it('should handle corrupted JSON data gracefully', async () => {
const consoleSpy = jest.spyOn(console, 'error').mockImplementation();
mockRedis.hmget.mockResolvedValue(['invalid-json{', JSON.stringify(mockPurchaseAction)]);

const result = await cache.getPurchases(['corrupted', 'test-intent-1']);

expect(result).toHaveLength(1);
expect(result[0]).toEqual(mockPurchaseAction);
expect(consoleSpy).toHaveBeenCalledWith(
'Failed to parse purchase data, skipping corrupted entry:',
expect.any(SyntaxError)
);
consoleSpy.mockRestore();
});

it('should return an empty array if targetIds is empty', async () => {
// If targetIds is empty, hmget might be called with just the key,
// or the mock might need to handle ...targetIds spreading an empty array.
Expand Down Expand Up @@ -266,6 +281,24 @@ describe('PurchaseCache', () => {
expect(result).toEqual([]);
expect(mockRedis.hgetall).toHaveBeenCalledWith('purchases:data');
});

it('should handle corrupted JSON data gracefully', async () => {
const consoleSpy = jest.spyOn(console, 'error').mockImplementation();
mockRedis.hgetall.mockResolvedValue({
'corrupted': 'invalid-json{',
[mockPurchaseAction.target.intent_id]: JSON.stringify(mockPurchaseAction),
});

const result = await cache.getAllPurchases();

expect(result).toHaveLength(1);
expect(result[0]).toEqual(mockPurchaseAction);
expect(consoleSpy).toHaveBeenCalledWith(
'Failed to parse purchase data in getAllPurchases, skipping corrupted entry:',
expect.any(SyntaxError)
);
consoleSpy.mockRestore();
});
});

describe('setPause', () => {
Expand Down
29 changes: 21 additions & 8 deletions packages/handler/src/consumer/eventConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ export class EventConsumer {
private readonly activeCounts: Record<WebhookEventType, number> = {} as Record<WebhookEventType, number>;
// Debounce timers for pending work per event type (prevents unbounded async spawning)
private readonly pendingWorkTimers: Map<WebhookEventType, NodeJS.Timeout> = new Map();
// Tracks whether pending work processing is currently running for each event type (prevents race conditions)
private readonly pendingWorkInProgress: Set<WebhookEventType> = new Set();

constructor(
private readonly queue: EventQueue,
Expand Down Expand Up @@ -283,40 +285,51 @@ export class EventConsumer {
/**
* Schedule pending work for an event type with debouncing.
* Prevents unbounded async task spawning by coalescing rapid calls.
* Uses a lock to prevent race conditions between timer expiration and new scheduling.
*/
private schedulePendingWork(eventType: WebhookEventType): void {
// If there's already a pending timer for this event type, don't schedule another
if (this.pendingWorkTimers.has(eventType)) {
// If there's already a pending timer or work in progress for this event type, don't schedule another
if (this.pendingWorkTimers.has(eventType) || this.pendingWorkInProgress.has(eventType)) {
return;
}

// Schedule processing after a short debounce delay
const timer = setTimeout(() => {
// Delete timer reference first
this.pendingWorkTimers.delete(eventType);

// Only process if we're still running
if (!this.isProcessing) {
return;
}

this.processPendingEventsForType(eventType).catch((e) => {
this.logger.error('Error processing pending events after debounce', {
eventType,
error: jsonifyError(e),
// Mark work as in progress to prevent concurrent scheduling
this.pendingWorkInProgress.add(eventType);

this.processPendingEventsForType(eventType)
.catch((e) => {
this.logger.error('Error processing pending events after debounce', {
eventType,
error: jsonifyError(e),
});
})
.finally(() => {
// Clear the in-progress flag when done
this.pendingWorkInProgress.delete(eventType);
});
});
}, PENDING_WORK_DEBOUNCE_MS);

this.pendingWorkTimers.set(eventType, timer);
}

/**
* Clear all pending work timers (called during shutdown)
* Clear all pending work timers and in-progress flags (called during shutdown)
*/
private clearPendingWorkTimers(): void {
for (const timer of this.pendingWorkTimers.values()) {
clearTimeout(timer);
}
this.pendingWorkTimers.clear();
this.pendingWorkInProgress.clear();
}
}
3 changes: 3 additions & 0 deletions packages/handler/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,9 @@ async function runMaintenanceTasks(): Promise<void> {
// Clean up expired earmarks
await cleanupExpiredEarmarks(context);

// Clean up expired dead letter queue entries
await adapters.eventQueue.cleanupExpiredDeadLetterEntries();

// Cleanup expired regular rebalance operations
await cleanupExpiredRegularRebalanceOps(context);

Expand Down
126 changes: 111 additions & 15 deletions packages/handler/src/queue/eventQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ import Redis from 'ioredis';
import { Logger } from '@mark/logger';
import { WebhookEvent, WebhookEventType } from '@mark/core';

// Default TTL for dead letter queue entries (7 days in milliseconds)
const DEFAULT_DEAD_LETTER_TTL_MS = 7 * 24 * 60 * 60 * 1000;

export enum EventPriority {
HIGH = 'HIGH',
NORMAL = 'NORMAL',
Expand Down Expand Up @@ -47,12 +50,15 @@ export class EventQueue {
private readonly cursorKey = `${this.prefix}:backfill-cursor`;
private readonly metricsKey = `${this.prefix}:metrics`;
private readonly store: Redis;
private readonly deadLetterTtlMs: number;

constructor(
host: string,
port: number,
private readonly logger: Logger,
deadLetterTtlMs: number = DEFAULT_DEAD_LETTER_TTL_MS,
) {
this.deadLetterTtlMs = deadLetterTtlMs;
this.store = new Redis({
host,
port,
Expand All @@ -73,8 +79,23 @@ export class EventQueue {
* Event ID is stored in a sorted set, full event data is stored separately.
* If the event exists in the processing queue, it will be removed first (for retries).
* @returns true if the event was already in the pending or processing queue, false otherwise
* @throws Error if event validation fails
*/
async enqueueEvent(event: QueuedEvent, priority: EventPriority = EventPriority.NORMAL): Promise<boolean> {
// Input validation
if (!event.id || typeof event.id !== 'string' || event.id.trim() === '') {
throw new Error('Event ID must be a non-empty string');
}
if (typeof event.scheduledAt !== 'number' || !Number.isFinite(event.scheduledAt) || event.scheduledAt < 0) {
throw new Error('Event scheduledAt must be a non-negative finite number');
}
if (!Object.values(EventPriority).includes(priority)) {
throw new Error(`Invalid priority: ${priority}. Must be one of: ${Object.values(EventPriority).join(', ')}`);
}
if (!Object.values(WebhookEventType).includes(event.type)) {
throw new Error(`Invalid event type: ${event.type}`);
}

const eventWithPriority = { ...event, priority };
const pendingQueueKey = this.pendingQueueKeys[event.type];
const processingQueueKey = this.processingQueueKeys[event.type];
Expand Down Expand Up @@ -156,12 +177,23 @@ export class EventQueue {
const eventId = eventIds[i];
const eventData = eventDataArray[i];
if (eventData) {
// Parse event to get scheduledAt for proper ordering
const event: QueuedEvent = JSON.parse(eventData);
const score = event.scheduledAt;
// Remove from the processing queue and add to the pending queue
multi.zrem(processingQueueKey, eventId);
multi.zadd(pendingQueueKey, score, eventId);
try {
// Parse event to get scheduledAt for proper ordering
const event: QueuedEvent = JSON.parse(eventData);
const score = event.scheduledAt;
// Remove from the processing queue and add to the pending queue
multi.zrem(processingQueueKey, eventId);
multi.zadd(pendingQueueKey, score, eventId);
} catch (parseError) {
// Corrupted event data - remove from processing queue and delete data
this.logger.error('Failed to parse event data during moveProcessingToPending, removing corrupted event', {
eventId,
eventType,
error: parseError instanceof Error ? parseError.message : String(parseError),
});
multi.zrem(processingQueueKey, eventId);
multi.hdel(this.dataKey, eventId);
}
} else {
// Event data is missing, just remove from the processing queue
multi.zrem(processingQueueKey, eventId);
Expand All @@ -184,9 +216,20 @@ export class EventQueue {
* Dequeue multiple events for processing (FIFO)
* Checks the specified event type pending queue and returns up to count ready events
* Uses a Redis transaction for atomic batch operations
* @param eventType - The type of events to dequeue
* @param count - Maximum number of events to dequeue (must be between 1 and 1000)
*/
async dequeueEvents(eventType: WebhookEventType, count: number): Promise<QueuedEvent[]> {
if (count <= 0) {
// Input validation
if (!Number.isInteger(count) || count <= 0) {
return [];
}
if (count > 1000) {
this.logger.warn('Dequeue count exceeds maximum, capping at 1000', { requestedCount: count });
count = 1000;
}
if (!Object.values(WebhookEventType).includes(eventType)) {
this.logger.error('Invalid event type for dequeue', { eventType });
return [];
}

Expand All @@ -211,11 +254,21 @@ export class EventQueue {
const eventId = eventIds[i];
const eventData = eventDataArray[i];
if (eventData) {
const event: QueuedEvent = JSON.parse(eventData);
// Skip events scheduled for future processing (scheduledAt > now)
if (event.scheduledAt <= Date.now()) {
events.push(event);
validEventIds.push(eventId);
try {
const event: QueuedEvent = JSON.parse(eventData);
// Skip events scheduled for future processing (scheduledAt > now)
if (event.scheduledAt <= Date.now()) {
events.push(event);
validEventIds.push(eventId);
}
} catch (parseError) {
// Corrupted event data - mark for removal
this.logger.error('Failed to parse event data during dequeue, marking as orphaned', {
eventId,
eventType,
error: parseError instanceof Error ? parseError.message : String(parseError),
});
orphanedEventIds.push(eventId);
}
} else {
// Event data is missing, mark for removal
Expand All @@ -233,12 +286,13 @@ export class EventQueue {
multi.zadd(processingQueueKey, timestamp, eventId);
}

// Remove orphaned event IDs from the queue (missing data)
// Remove orphaned event IDs from the queue and clean up any corrupted data
if (orphanedEventIds.length > 0) {
for (const eventId of orphanedEventIds) {
multi.zrem(pendingQueueKey, eventId);
multi.hdel(this.dataKey, eventId);
}
this.logger.warn('Removed orphaned event IDs (missing data)', {
this.logger.warn('Removed orphaned event IDs', {
eventType,
orphanedCount: orphanedEventIds.length,
orphanedIds: orphanedEventIds,
Expand Down Expand Up @@ -286,6 +340,39 @@ export class EventQueue {
this.logger.warn('Event moved to dead letter queue', { eventId: event.id, error });
}

/**
* Clean up expired entries from the dead letter queue.
* Removes entries older than the configured TTL and their associated data.
* @returns Number of expired entries removed
*/
async cleanupExpiredDeadLetterEntries(): Promise<number> {
const cutoffTimestamp = Date.now() - this.deadLetterTtlMs;

// Get all event IDs with scores (timestamps) older than the cutoff
// Score range: 0 to cutoffTimestamp (entries added before the cutoff)
const expiredEventIds = await this.store.zrangebyscore(this.deadLetterQueueKey, 0, cutoffTimestamp);

if (expiredEventIds.length === 0) {
return 0;
}

// Remove expired entries from the sorted set and their data from the hash
const multi = this.store.multi();
for (const eventId of expiredEventIds) {
multi.zrem(this.deadLetterQueueKey, eventId);
multi.hdel(this.dataKey, eventId);
}
await multi.exec();

this.logger.info('Cleaned up expired dead letter queue entries', {
count: expiredEventIds.length,
cutoffTimestamp,
ttlMs: this.deadLetterTtlMs,
});

return expiredEventIds.length;
}

/**
* Get queue status
*/
Expand All @@ -306,7 +393,16 @@ export class EventQueue {

// Get the last processed timestamp from status
const statusData = await this.store.get(this.statusKey);
const status = statusData ? JSON.parse(statusData) : {};
let status: { lastProcessedAt?: number } = {};
if (statusData) {
try {
status = JSON.parse(statusData);
} catch (parseError) {
this.logger.error('Failed to parse queue status data', {
error: parseError instanceof Error ? parseError.message : String(parseError),
});
}
}

return {
queueLength: totalQueueLength,
Expand Down
Loading