From a7230247a7b62e78183afb3451673afc18a71a95 Mon Sep 17 00:00:00 2001 From: Nicolas Hallaert Date: Wed, 25 Jun 2025 13:25:13 +0200 Subject: [PATCH 1/2] feat(./class/service/events.service.ts): getUnresolvedEvents fn --- src/class/service/events.service.ts | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/src/class/service/events.service.ts b/src/class/service/events.service.ts index d2540f0d..b1f4782d 100644 --- a/src/class/service/events.service.ts +++ b/src/class/service/events.service.ts @@ -114,6 +114,27 @@ export class EventsService extends EventEmitter { }; } + async getUnresolvedEvents() { + const dispatcherTransactions = await this.dispatcherTransactionStore.getTransactions(); + const backupDispatcherTransactions = await this.backupDispatcherTransactionStore.getTransactions(); + + const mappedBackupDispatcherTransactions = [...backupDispatcherTransactions.values()].map((backupDispatcherTransaction) => { + return { + ...backupDispatcherTransaction, + isBackupTransaction: true + }; + }); + + const events = []; + for (const dispatcherTransaction of [...dispatcherTransactions.values(), ...mappedBackupDispatcherTransactions]) { + if (dispatcherTransaction.name !== "PING" && !dispatcherTransaction.redisMetadata.resolved) { + events.push(dispatcherTransaction); + } + } + + return events; + } + async getIncomerReceivedEvents(opts: GetIncomerReceivedEventsOptions): Promise { const { incomerId } = opts; From ec7a99162361a35bf14fb5658b46f047f743829b Mon Sep 17 00:00:00 2001 From: Nicolas Hallaert Date: Mon, 7 Jul 2025 19:14:50 +0200 Subject: [PATCH 2/2] test(): update ut --- test/UT/class/service/events.test.ts | 112 ++++++++++++++++++++++++++- 1 file changed, 111 insertions(+), 1 deletion(-) diff --git a/test/UT/class/service/events.test.ts b/test/UT/class/service/events.test.ts index a2abd58b..b31367a0 100644 --- a/test/UT/class/service/events.test.ts +++ b/test/UT/class/service/events.test.ts @@ -211,7 +211,7 @@ describe("EventsService", () => { }) }); - describe("GetEventById", () => { + describe("getEventById", () => { const incomer = new Incomer({ redis, subscriber, @@ -433,4 +433,114 @@ describe("EventsService", () => { } }); }); + + describe("getUnresolvedEvents", () => { + const incomer = new Incomer({ + redis, + subscriber, + name: "foo", + eventsCast: [], + eventsSubscribe: [], + eventCallback: eventCallBackHandler, + dispatcherInactivityOptions: { + publishInterval: kPingInterval, + maxPingInterval: kPingInterval + }, + externalsInitialized: true + }); + + const dispatcher = new Dispatcher({ + redis, + name: "foo", + subscriber, + logger, + pingInterval: kPingInterval, + checkLastActivityInterval: 5_000, + checkTransactionInterval: 120_000, + eventsValidation: { + eventsValidationFn: eventsValidationFn, + customValidationCbFn: validate as any + }, + idleTime: 5_000, + incomerUUID: incomer.baseUUID + }); + + const secondIncomer = new Incomer({ + redis, + subscriber, + name: "foo", + eventsCast: [...Object.keys(AVAILABLE_EVENTS)], + eventsSubscribe: [...Object.values(AVAILABLE_EVENTS)], + eventCallback: eventCallBackHandler, + dispatcherInactivityOptions: { + publishInterval: kPingInterval, + maxPingInterval: kPingInterval + }, + externalsInitialized: true + }); + + const secondDispatcher = new Dispatcher({ + redis, + name: "foo", + subscriber, + logger, + pingInterval: kPingInterval, + checkLastActivityInterval: 5_000, + checkTransactionInterval: 120_000, + eventsValidation: { + eventsValidationFn: eventsValidationFn, + customValidationCbFn: validate as any + }, + idleTime: 5_000, + incomerUUID: secondIncomer.baseUUID + }); + + before(async() => { + await redis.flushall(); + + await dispatcher.initialize(); + await incomer.initialize(); + + await secondDispatcher.initialize(); + await secondIncomer.initialize(); + }); + + after(async() => { + await secondIncomer.close(); + await secondDispatcher.close(); + await incomer.close(); + await dispatcher.close(); + }); + + test("You should retrieved data about the unresolved events", async() => { + const eventId = await secondIncomer.publish({ + name: "connector", + scope: { + schemaId: 1 + }, + operation: "CREATE", + data: { + id: "1", + code: "foo" + }, + metadata: { + agent: "nodejs", + origin: { + endpoint: "test", + method: "POST" + }, + createdAt: Date.now() + } + }); + + await timers.setTimeout(500); + + const unresolvedEvents = await secondDispatcher.eventsService.getUnresolvedEvents(); + + for (const unresolvedEvent of unresolvedEvents.values()) { + console.log("FOOOOO", unresolvedEvent); + assert.equal(unresolvedEvent.redisMetadata.resolved, false); + } + }); + }); });