Skip to content

Commit 2dd10bc

Browse files
authored
Merge pull request #24 from AthennaIO/develop
chore(queue): set status to pending it not acked
2 parents 376cf66 + 87d7e84 commit 2dd10bc

File tree

6 files changed

+67
-51
lines changed

6 files changed

+67
-51
lines changed

package-lock.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@athenna/queue",
3-
"version": "5.8.0",
3+
"version": "5.9.0",
44
"description": "The Athenna queue handler.",
55
"license": "MIT",
66
"author": "João Lenon <[email protected]>",

src/drivers/DatabaseDriver.ts

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@ import type { DatabaseImpl } from '@athenna/database'
1616
import { ConnectionFactory } from '#src/factories/ConnectionFactory'
1717

1818
export class DatabaseDriver extends Driver<DatabaseImpl> {
19+
/**
20+
* Set the acked ids of the driver.
21+
*/
22+
private ackedIds = new Set<string>()
23+
1924
/**
2025
* The `connection` database that is being used.
2126
*/
@@ -183,7 +188,13 @@ export class DatabaseDriver extends Driver<DatabaseImpl> {
183188
* ```
184189
*/
185190
public async ack(id: string) {
186-
await this.client.table(this.table).where('id', id).delete()
191+
this.ackedIds.add(id)
192+
193+
await this.client
194+
.table(this.table)
195+
.where('id', id)
196+
.where('status', 'processing')
197+
.delete()
187198
}
188199

189200
/**
@@ -242,10 +253,27 @@ export class DatabaseDriver extends Driver<DatabaseImpl> {
242253
return
243254
}
244255

256+
this.ackedIds.delete(job.id)
257+
245258
job.attemptsLeft--
259+
job.status = 'processing'
260+
261+
await this.client.table(this.table).where('id', job.id).update({
262+
status: 'processing',
263+
attemptsLeft: job.attemptsLeft
264+
})
246265

247266
try {
248267
await processor(job)
268+
269+
/**
270+
* If the job still exists after processing, it means that the job was
271+
* not processed for some reason, so we need to put it back the pending
272+
* status.
273+
*/
274+
if (!this.ackedIds.has(job.id)) {
275+
job.status = 'pending'
276+
}
249277
} catch (err) {
250278
const shouldRetry = job.attemptsLeft > 0
251279

src/drivers/VanillaDriver.ts

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,13 @@ import { Options, Uuid } from '@athenna/common'
1212
import { Driver } from '#src/drivers/Driver'
1313
import type { ConnectionOptions } from '#src/types'
1414
import { ConnectionFactory } from '#src/factories/ConnectionFactory'
15-
import { NotFoundJobException } from '#src/exceptions/NotFoundJobException'
16-
import { JobNotProcessingException } from '#src/exceptions/JobNotProcessingException'
1715

1816
export class VanillaDriver extends Driver {
17+
/**
18+
* Set the acked ids of the driver.
19+
*/
20+
private ackedIds = new Set<string>()
21+
1922
private defineQueue() {
2023
if (!this.client.queues[this.queueName]) {
2124
this.client.queues[this.queueName] = []
@@ -171,6 +174,20 @@ export class VanillaDriver extends Driver {
171174
return this.client.queues[this.queueName].length
172175
}
173176

177+
/**
178+
* Find a job by its id.
179+
*
180+
* @example
181+
* ```ts
182+
* const job = await Queue.getJobById(id)
183+
* ```
184+
*/
185+
public async getJobById(id: string) {
186+
this.defineQueue()
187+
188+
return this.client.queues[this.queueName].find(j => j.id === id)
189+
}
190+
174191
/**
175192
* Acknowledge the job removing it from the queue.
176193
*
@@ -182,14 +199,16 @@ export class VanillaDriver extends Driver {
182199
public async ack(id: string) {
183200
this.defineQueue()
184201

185-
const job = this.client.queues[this.queueName].find(j => j.id === id)
202+
this.ackedIds.add(id)
203+
204+
const job = await this.getJobById(id)
186205

187206
if (!job) {
188-
throw new NotFoundJobException(id)
207+
return
189208
}
190209

191210
if (job.status !== 'processing') {
192-
throw new JobNotProcessingException(id)
211+
return
193212
}
194213

195214
this.client.queues[this.queueName] = this.client.queues[
@@ -231,11 +250,22 @@ export class VanillaDriver extends Driver {
231250
return
232251
}
233252

253+
this.ackedIds.delete(job.id)
254+
234255
job.attemptsLeft--
235256
job.status = 'processing'
236257

237258
try {
238259
await processor(job)
260+
261+
/**
262+
* If the job still exists after processing, it means that the job was
263+
* not processed for some reason, so we need to put it back the pending
264+
* status.
265+
*/
266+
if (!this.ackedIds.has(job.id)) {
267+
job.status = 'pending'
268+
}
239269
} catch (err) {
240270
const shouldRetry = job.attemptsLeft > 0
241271

src/exceptions/JobNotProcessingException.ts

Lines changed: 0 additions & 21 deletions
This file was deleted.

src/exceptions/NotFoundJobException.ts

Lines changed: 0 additions & 21 deletions
This file was deleted.

0 commit comments

Comments
 (0)