Skip to content

Commit 1fc6293

Browse files
committed
ADD: loggers and event listeners
1 parent 4cb3c82 commit 1fc6293

File tree

10 files changed

+200
-260
lines changed

10 files changed

+200
-260
lines changed

lib/events/JobFailed.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
1-
import { EmitsEvent, Event } from '@squareboat/nest-events';
2-
import { events } from '../constants';
1+
import { EmitsEvent, Event } from "@squareboat/nest-events";
2+
import { events } from "../constants";
33

44
@Event(events.jobFailed)
5-
export class JobFailed extends EmitsEvent {}
5+
export class JobFailed extends EmitsEvent {
6+
constructor(public message: any, public job: any) {
7+
super();
8+
}
9+
}

lib/events/JobProcessed.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
1-
import { EmitsEvent, Event } from '@squareboat/nest-events';
2-
import { events } from '../constants';
1+
import { EmitsEvent, Event } from "@squareboat/nest-events";
2+
import { events } from "../constants";
33

44
@Event(events.jobProcessed)
5-
export class JobProcessed extends EmitsEvent {}
5+
export class JobProcessed extends EmitsEvent {
6+
constructor(public message: any, public job: any) {
7+
super();
8+
}
9+
}

lib/events/JobProcessing.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
1-
import { EmitsEvent, Event } from '@squareboat/nest-events';
2-
import { events } from '../constants';
1+
import { EmitsEvent, Event } from "@squareboat/nest-events";
2+
import { events } from "../constants";
33

44
@Event(events.jobProcessing)
5-
export class JobProcessing extends EmitsEvent {}
5+
export class JobProcessing extends EmitsEvent {
6+
constructor(public message: any, public job: any) {
7+
super();
8+
}
9+
}

lib/interfaces/options.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,5 @@ export interface ListenerOptions {
3232
connection?: string;
3333
queue?: string;
3434
schedulerInterval?: number;
35+
logger?: boolean;
3536
}

lib/jobrunner.ts

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,33 +6,73 @@ import {
66
import { ListenerOptions } from "./interfaces";
77
import { QueueMetadata } from "./metadata";
88
import { Dispatch } from "./queue";
9+
import { EmitEvent } from "@squareboat/nest-events";
10+
import { JobFailed, JobProcessed, JobProcessing } from "./events";
11+
import { ConsoleIO, Logger } from "@squareboat/nest-console";
912

1013
export class JobRunner {
14+
private consoleIo: ConsoleIO;
15+
1116
constructor(
1217
private options: ListenerOptions,
1318
private connection: QueueDriver
14-
) {}
19+
) {
20+
this.consoleIo = new ConsoleIO("", {});
21+
}
1522

1623
async run(job: DriverJob) {
1724
const message = this.fetchMessage(job);
1825
const { data } = message;
1926
try {
27+
this.log("info", `LOG [${message.job}] Job Processing...`);
2028
const targetJob = QueueMetadata.getJob(message.job);
2129
if (!targetJob || !targetJob.target) return;
22-
30+
const event = new JobProcessing(message, job);
31+
event.emit();
2332
await targetJob.target(data);
2433
await this.success(message, job);
34+
this.log("success", `LOG [${message.job}] Job Processed`);
2535
} catch (e) {
36+
const event = new JobFailed(message, job);
37+
event.emit();
2638
await this.retry(message, job);
39+
const errorMessage = (e as Error).message;
40+
this.log(
41+
"error",
42+
`LOG [${message.job}] Job Failed | Error: ${errorMessage}`
43+
);
2744
}
2845
}
2946

47+
log(level: string, msg: string): void {
48+
if (!this.options.logger) return;
49+
let logger = undefined;
50+
switch (level) {
51+
case "info":
52+
logger = Logger.info;
53+
break;
54+
case "success":
55+
logger = Logger.success;
56+
break;
57+
case "error":
58+
logger = Logger.error;
59+
break;
60+
case "warn":
61+
logger = Logger.warn;
62+
break;
63+
}
64+
65+
logger && logger(msg);
66+
}
67+
3068
/**
3169
* Job processed succesfully method
3270
* @param message
3371
* @param job
3472
*/
3573
async success(message: InternalMessage, job: DriverJob): Promise<void> {
74+
const event = new JobProcessed(message, job);
75+
event.emit();
3676
await this.removeJobFromQueue(job);
3777
}
3878

lib/metadata.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ interface JobTarget {
1111
@Injectable()
1212
export class QueueMetadata {
1313
private static data: QueueOptions;
14-
private static defaultOptions: Record<string, string | number | undefined>;
14+
private static defaultOptions: Record<
15+
string,
16+
string | number | boolean | undefined
17+
>;
1518
private static store: Record<string, any> = { jobs: {} };
1619

1720
constructor(@Inject(QUEUE_OPTIONS) data: QueueOptions) {

lib/queue.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ export class Queue {
1414

1515
export function Dispatch(message: Message): Promise<void> {
1616
const job = QueueMetadata.getJob(message.job);
17-
const payload = PayloadBuilder.build(message, job.options);
17+
const payload = PayloadBuilder.build(message, job?.options || {});
1818
const connection = QueueService.getConnection(payload.connection);
1919
return connection.push(JSON.stringify(payload), payload);
2020
}

lib/worker.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { QueueMetadata } from "./metadata";
33
import { QueueService } from "./service";
44
import { JobRunner } from "./jobrunner";
55
import { DriverJob, QueueDriver } from "@squareboat/nest-queue-strategy";
6+
import { Logger } from "@squareboat/nest-console";
67

78
export class QueueWorker {
89
private options: ListenerOptions;
@@ -14,6 +15,7 @@ export class QueueWorker {
1415
...defaultOptions,
1516
schedulerInterval: 10000,
1617
queue: undefined,
18+
logger: true,
1719
...this.options,
1820
};
1921

@@ -38,6 +40,7 @@ export class QueueWorker {
3840
* Listen to the queue
3941
*/
4042
async listen() {
43+
this.log("info", "LOG [QueueWorker] Queue Worker Initialised");
4144
const connection = QueueService.getConnection(this.options.connection);
4245

4346
// perform scheduled task of the driver
@@ -75,4 +78,25 @@ export class QueueWorker {
7578
const connection = QueueService.getConnection(this.options.connection);
7679
return await connection.count({ queue: this.options.queue });
7780
}
81+
82+
log(level: string, msg: string): void {
83+
if (!this.options.logger) return;
84+
let logger = undefined;
85+
switch (level) {
86+
case "info":
87+
logger = Logger.info;
88+
break;
89+
case "success":
90+
logger = Logger.success;
91+
break;
92+
case "error":
93+
logger = Logger.error;
94+
break;
95+
case "warn":
96+
logger = Logger.warn;
97+
break;
98+
}
99+
100+
logger && logger(msg);
101+
}
78102
}

0 commit comments

Comments
 (0)