Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/class/service/events.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,27 @@ export class EventsService extends EventEmitter {
};
}

async getUnresolvedEvents() {
const dispatcherTransactions = await this.dispatcherTransactionStore.getTransactions();
const backupDispatcherTransactions = await this.backupDispatcherTransactionStore.getTransactions();

const mappedBackupDispatcherTransactions = [...backupDispatcherTransactions.values()].map((backupDispatcherTransaction) => {
return {
...backupDispatcherTransaction,
isBackupTransaction: true
};
});

const events = [];
for (const dispatcherTransaction of [...dispatcherTransactions.values(), ...mappedBackupDispatcherTransactions]) {
if (dispatcherTransaction.name !== "PING" && !dispatcherTransaction.redisMetadata.resolved) {
events.push(dispatcherTransaction);
}
}

return events;
}

async getIncomerReceivedEvents(opts: GetIncomerReceivedEventsOptions): Promise<GetIncomerReceivedEventsResponse[]> {
const { incomerId } = opts;

Expand Down
112 changes: 111 additions & 1 deletion test/UT/class/service/events.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ describe("EventsService", () => {
})
});

describe("GetEventById", () => {
describe("getEventById", () => {
const incomer = new Incomer({
redis,
subscriber,
Expand Down Expand Up @@ -433,4 +433,114 @@ describe("EventsService", () => {
}
});
});

describe("getUnresolvedEvents", () => {
const incomer = new Incomer({
redis,
subscriber,
name: "foo",
eventsCast: [],
eventsSubscribe: [],
eventCallback: eventCallBackHandler,
dispatcherInactivityOptions: {
publishInterval: kPingInterval,
maxPingInterval: kPingInterval
},
externalsInitialized: true
});

const dispatcher = new Dispatcher({
redis,
name: "foo",
subscriber,
logger,
pingInterval: kPingInterval,
checkLastActivityInterval: 5_000,
checkTransactionInterval: 120_000,
eventsValidation: {
eventsValidationFn: eventsValidationFn,
customValidationCbFn: validate as any
},
idleTime: 5_000,
incomerUUID: incomer.baseUUID
});

const secondIncomer = new Incomer({
redis,
subscriber,
name: "foo",
eventsCast: [...Object.keys(AVAILABLE_EVENTS)],
eventsSubscribe: [...Object.values(AVAILABLE_EVENTS)],
eventCallback: eventCallBackHandler,
dispatcherInactivityOptions: {
publishInterval: kPingInterval,
maxPingInterval: kPingInterval
},
externalsInitialized: true
});

const secondDispatcher = new Dispatcher({
redis,
name: "foo",
subscriber,
logger,
pingInterval: kPingInterval,
checkLastActivityInterval: 5_000,
checkTransactionInterval: 120_000,
eventsValidation: {
eventsValidationFn: eventsValidationFn,
customValidationCbFn: validate as any
},
idleTime: 5_000,
incomerUUID: secondIncomer.baseUUID
});

before(async() => {
await redis.flushall();

await dispatcher.initialize();
await incomer.initialize();

await secondDispatcher.initialize();
await secondIncomer.initialize();
});

after(async() => {
await secondIncomer.close();
await secondDispatcher.close();
await incomer.close();
await dispatcher.close();
});

test("You should retrieved data about the unresolved events", async() => {
const eventId = await secondIncomer.publish({
name: "connector",
scope: {
schemaId: 1
},
operation: "CREATE",
data: {
id: "1",
code: "foo"
},
metadata: {
agent: "nodejs",
origin: {
endpoint: "test",
method: "POST"
},
createdAt: Date.now()
}
});

await timers.setTimeout(500);

const unresolvedEvents = await secondDispatcher.eventsService.getUnresolvedEvents();

for (const unresolvedEvent of unresolvedEvents.values()) {
console.log("FOOOOO", unresolvedEvent);
assert.equal(unresolvedEvent.redisMetadata.resolved, false);
}
});
});
});
Loading