diff --git a/package-lock.json b/package-lock.json index b249fc8..ae5fdc9 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,16 +1,16 @@ { "name": "@athenna/ratelimiter", - "version": "5.2.0", + "version": "5.3.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@athenna/ratelimiter", - "version": "5.2.0", + "version": "5.3.0", "license": "MIT", "devDependencies": { "@athenna/cache": "^5.2.0", - "@athenna/common": "^5.18.0", + "@athenna/common": "^5.19.0", "@athenna/config": "^5.4.0", "@athenna/ioc": "^5.2.0", "@athenna/logger": "^5.10.0", @@ -46,9 +46,9 @@ } }, "node_modules/@athenna/common": { - "version": "5.18.0", - "resolved": "https://registry.npmjs.org/@athenna/common/-/common-5.18.0.tgz", - "integrity": "sha512-UW4A7b8zl42ZLltih6Sm231StqtUbmO3Yq9+hDjSglP8coyjbOlCJ97r/hGjcCpPGBBTET50LcI85aVkDGw7OA==", + "version": "5.19.0", + "resolved": "https://registry.npmjs.org/@athenna/common/-/common-5.19.0.tgz", + "integrity": "sha512-h1UrJFjl+0JS0lGRgnnujo/F4HXJg+/L7Ox7EJM0cPrsEOgg9kCFjGrsf0PEy6AkPwhbhlaYP2W5umw+6CsOew==", "dev": true, "license": "MIT", "dependencies": { diff --git a/package.json b/package.json index 4a7f1f6..f5cf986 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@athenna/ratelimiter", - "version": "5.2.0", + "version": "5.3.0", "description": "Respect the rate limit rules of API's you need to consume.", "license": "MIT", "author": "João Lenon ", @@ -51,7 +51,7 @@ }, "devDependencies": { "@athenna/cache": "^5.2.0", - "@athenna/common": "^5.18.0", + "@athenna/common": "^5.19.0", "@athenna/config": "^5.4.0", "@athenna/ioc": "^5.2.0", "@athenna/logger": "^5.10.0", @@ -152,6 +152,7 @@ "camelcase": "off", "dot-notation": "off", "prettier/prettier": "error", + "no-case-declarations": "off", "no-useless-constructor": "off", "@typescript-eslint/no-explicit-any": "off", "@typescript-eslint/no-empty-function": "off", diff --git a/src/exceptions/MissingRuleException.ts b/src/exceptions/MissingRuleException.ts index 84cde0d..3b4ce44 100644 --- a/src/exceptions/MissingRuleException.ts +++ b/src/exceptions/MissingRuleException.ts @@ -11,10 +11,14 @@ import { Exception } from '@athenna/common' export class MissingRuleException extends Exception { public constructor() { + const message = 'Missing rules value for rate limiter and targets.' + const help = + 'This error happens when you forget to define default rules for your RateLimiter instance and custom rules by target. You has two options, define a default rule in your RateLimiter that will be used by targets that does not have a rule or define a custom rule for all your targets.' + super({ code: 'E_MISSING_RULE_ERROR', - help: 'This errors happens when you forget to define rules for your RateLimiter instance.', - message: 'Missing rules value for rate limiter.' + help, + message }) } } diff --git a/src/index.ts b/src/index.ts index 6ba2cf7..6e63b54 100644 --- a/src/index.ts +++ b/src/index.ts @@ -10,7 +10,5 @@ export * from '#src/types' export * from '#src/ratelimiter/RateLimiter' +export * from '#src/ratelimiter/RateLimitStore' export * from '#src/ratelimiter/RateLimiterBuilder' -export * from '#src/ratelimiter/stores/RedisStore' -export * from '#src/ratelimiter/stores/MemoryStore' -export * from '#src/ratelimiter/stores/RateLimitStore' diff --git a/src/ratelimiter/RateLimitStore.ts b/src/ratelimiter/RateLimitStore.ts new file mode 100644 index 0000000..e62f7a8 --- /dev/null +++ b/src/ratelimiter/RateLimitStore.ts @@ -0,0 +1,155 @@ +/** + * @athenna/ratelimiter + * + * (c) João Lenon + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +import { debug } from '#src/debug' +import { Cache } from '@athenna/cache' +import { Macroable } from '@athenna/common' +import { WINDOW_MS } from '#src/constants/window' +import type { Reserve, RateLimitRule, RateLimitStoreOptions } from '#src/types' + +export class RateLimitStore extends Macroable { + /** + * Holds the options that will be used to build the rate limiter + * store. + */ + public options: RateLimitStoreOptions + + public constructor(options: RateLimitStoreOptions) { + super() + + options.windowMs = options.windowMs ?? WINDOW_MS + + this.options = options + } + + public async truncate() { + await Cache.store(this.options.store).truncate() + } + + /** + * Get the rate limit buckets from the cache or initialize them. + */ + public async getOrInit(key: string, rules: RateLimitRule[]) { + const cache = Cache.store(this.options.store) + + let buckets = await cache.get(key) + + if (!buckets) { + buckets = JSON.stringify(rules.map(() => [])) + + await cache.set(key, buckets) + } + + return JSON.parse(buckets) as number[][] + } + + /** + * Get the defined cooldown if it exists in the cache. + * If it cannot be found, return 0. + */ + public async getCooldown(key: string) { + const cdKey = `${key}:cooldown` + + debug('getting cooldown in %s store for key %s', this.options.store, cdKey) + + const cooldown = await Cache.store(this.options.store).get(cdKey) + + if (!cooldown) { + return 0 + } + + return Number(cooldown) + } + + /** + * Put the key in cooldown for some milliseconds. Also saves + * the timestamp into the cache for when it will be available + * again. + */ + public async setCooldown(key: string, ms: number) { + if (!ms || ms <= 0) { + return + } + + const cdKey = `${key}:cooldown` + const cdMs = `${Date.now() + ms}` + + debug( + 'setting cooldown of %s ms in %s store for key %s', + cdMs, + this.options.store, + cdKey + ) + + await Cache.store(this.options.store).set(cdKey, cdMs, { ttl: ms }) + } + + /** + * Try to reserve a token for all rules of the key. If not + * allowed to reserve, return the maximum waitMs necessary. + */ + public async tryReserve(key: string, rules: RateLimitRule[]) { + debug( + 'running %s store tryReserve for key %s with rules %o', + this.options.store, + key, + rules + ) + + let wait = 0 + const now = Date.now() + const cache = Cache.store(this.options.store) + const cooldown = await this.getCooldown(key) + + if (Number.isFinite(cooldown) && cooldown > now) { + return { allowed: false, waitMs: cooldown - now } + } + + await cache.delete(`${key}:cooldown`) + + const buckets = await this.getOrInit(key, rules) + + for (let i = 0; i < rules.length; i++) { + const bucket = buckets[i] + const window = this.options.windowMs[rules[i].type] + + while (bucket.length && bucket[0] <= now - window) { + bucket.shift() + } + + if (bucket.length >= rules[i].limit) { + const earliest = bucket[0] + const rem = earliest + window - now + + if (rem > wait) { + wait = rem + } + } + } + + const reserve: Reserve = { allowed: false, waitMs: wait } + + if (wait > 0) { + await cache.set(key, JSON.stringify(buckets)) + + return reserve + } + + for (let i = 0; i < rules.length; i++) { + buckets[i].push(now) + } + + await cache.set(key, JSON.stringify(buckets)) + + reserve.waitMs = 0 + reserve.allowed = true + + return reserve + } +} diff --git a/src/ratelimiter/RateLimiterBuilder.ts b/src/ratelimiter/RateLimiterBuilder.ts index efbb363..d791b86 100644 --- a/src/ratelimiter/RateLimiterBuilder.ts +++ b/src/ratelimiter/RateLimiterBuilder.ts @@ -11,21 +11,30 @@ import type { QueueItem, RateLimitRule, ScheduleOptions, - RateLimiterOptions + RateLimitTarget, + RateLimitRetryCtx, + RateLimiterOptions, + RateLimitScheduleCtx, + RateLimitStoreOptions, + RateLimitRetryDecision } from '#src/types' +import { debug } from '#src/debug' +import { Config } from '@athenna/config' +import { RateLimitStore } from '#src/ratelimiter/RateLimitStore' +import { Json, String, Macroable, Options } from '@athenna/common' import { MissingKeyException } from '#src/exceptions/MissingKeyException' import { MissingRuleException } from '#src/exceptions/MissingRuleException' -import type { RateLimitStore } from '#src/ratelimiter/stores/RateLimitStore' import { MissingStoreException } from '#src/exceptions/MissingStoreException' -export class RateLimiterBuilder { +export class RateLimiterBuilder extends Macroable { /** * Holds the options that will be used to build the rate limiter. */ private options: RateLimiterOptions = { + jitterMs: 0, maxConcurrent: 1, - jitterMs: 0 + targetSelectionStrategy: 'first_available' } /** @@ -46,6 +55,11 @@ export class RateLimiterBuilder { */ private queue: QueueItem[] = [] + /** + * Index for when using round_robin selection strategy. + */ + private rrIndex = 0 + /** * Holds the setTimeout id to be able to disable it * later on. @@ -59,6 +73,16 @@ export class RateLimiterBuilder { /** * Logical key that will be used by store to save buckets. + * + * @example + * ```ts + * const limiter = RateLimiter.build() + * .store('memory') + * .addRule({ type: 'second', limit: 1 }) + * .key('request:/profile') + * + * await limiter.schedule(() => {...}) + * ``` */ public key(value: string) { this.options.key = value @@ -69,15 +93,43 @@ export class RateLimiterBuilder { /** * Define the store that will be responsible to save the * rate limit buckets. + * + * @example + * ```ts + * const limiter = RateLimiter.build() + * .key('request:/profile') + * .addRule({ type: 'second', limit: 1 }) + * .store('memory') + * + * await limiter.schedule(() => {...}) + * ``` */ - public store(value: RateLimitStore) { - this.options.store = value + public store( + store: 'memory' | 'redis' | string, + options: Omit = {} + ) { + // eslint-disable-next-line + // @ts-ignore + options.store = store + + this.options.store = new RateLimitStore(options) return this } /** * Set the max number of tasks that could run concurrently. + * + * @example + * ```ts + * const limiter = RateLimiter.build() + * .store('memory') + * .key('request:/profile') + * .addRule({ type: 'second', limit: 1 }) + * .maxConcurrent(10) + * + * await limiter.schedule(() => {...}) + * ``` */ public maxConcurrent(value: number) { this.options.maxConcurrent = Math.max(1, value ?? 1) @@ -88,6 +140,17 @@ export class RateLimiterBuilder { /** * Random jitter in milliseconds to avoid thundering herd in * distributed environments. + * + * @example + * ```ts + * const limiter = RateLimiter.build() + * .store('memory') + * .key('request:/profile') + * .addRule({ type: 'second', limit: 1 }) + * .randomJitter(1000) + * + * await limiter.schedule(() => {...}) + * ``` */ public jitterMs(value: number) { this.options.jitterMs = Math.max(0, value ?? 0) @@ -97,6 +160,16 @@ export class RateLimiterBuilder { /** * Add a new rate limit rule. + * + * @example + * ```ts + * const limiter = RateLimiter.build() + * .store('memory') + * .key('request:/profile') + * .addRule({ type: 'second', limit: 1 }) + * + * await limiter.schedule(() => {...}) + * ``` */ public addRule(rule: RateLimitRule) { if (!this.options.rules) { @@ -108,14 +181,139 @@ export class RateLimiterBuilder { return this } + /** + * Add a new rate limit target. + * + * @example + * ```ts + * const limiter = RateLimiter.build() + * .store('memory') + * .key('request:/profile') + * .addRule({ type: 'second', limit: 1 }) + * .addTarget({ metadata: { baseUrl: 'http://example.com' } }) + * + * await limiter.schedule(() => {...}) + * ``` + */ + public addTarget(target: RateLimitTarget) { + if (!this.options.targets) { + this.options.targets = [] + } + + if (!target.id) { + target.id = this.getTargetId(target) + } + + this.options.targets.push(target) + + return this + } + + /** + * Set multiple rate limit rules with one method call. + * + * @example + * ```ts + * const limiter = RateLimiter.build() + * .store('memory') + * .key('request:/profile') + * .setRules([{ type: 'second', limit: 1 }]) + * + * await limiter.schedule(() => {...}) + * ``` + */ + public setRules(rules: RateLimitRule[]) { + rules.forEach(rule => this.addRule(rule)) + + return this + } + + /** + * Set multiple rate limit targets with one method call. + * + * @example + * ```ts + * const limiter = RateLimiter.build() + * .store('memory') + * .key('request:/profile') + * .addRule({ type: 'second', limit: 1 }) + * .setTargets([{ metadata: { baseUrl: 'http://example.com' } }]) + * + * await limiter.schedule(() => {...}) + * ``` + */ + public setTargets(targets: RateLimitTarget[]) { + targets.forEach(target => this.addTarget(target)) + + return this + } + + /** + * Define the target selection strategy that will be used + * to select the next one when an target fails. + * + * @example + * ```ts + * const limiter = RateLimiter.build() + * .store('memory') + * .key('request:/profile') + * .addRule({ type: 'second', limit: 1 }) + * .addTarget({ metadata: { baseUrl: 'http://example.com' } }) + * .targetSelectionStrategy('round_robin') + * + * await limiter.schedule(() => {...}) + * ``` + */ + public targetSelectionStrategy(value: 'first_available' | 'round_robin') { + this.options.targetSelectionStrategy = value + + return this + } + + /** + * Define the RateLimiter retry strategy. This is useful to control + * when and how we should proceed with the retry of tasks that failed + * to execute. + * + * @example + * ```ts + * const limiter = RateLimiter.build() + * .store('memory') + * .key('request:/profile') + * .addRule({ type: 'second', limit: 1 }) + * .retryStrategy(({ attempt }) => { + * const decision = { type: 'fail' } + * + * if (attempt === 3) { + * return decision + * } + * + * decision.type = 'retry_same' + * + * return decision + * }) + * + * await limiter.schedule(() => {...}) + * ``` + */ + public retryStrategy( + fn: ( + ctx?: RateLimitRetryCtx + ) => RateLimitRetryDecision | Promise + ) { + this.options.retryStrategy = fn + + return this + } + /** * Return the current number of active tasks. * * @example * ```ts * const limiter = RateLimiter.build() - * .store(new MemoryStore()) - * .key('request:api-key:/profile') + * .store('memory') + * .key('request:/profile') * .addRule({ type: 'second', limit: 1 }) * * limiter.getActiveCount() // 0 @@ -132,8 +330,8 @@ export class RateLimiterBuilder { * @example * ```ts * const limiter = RateLimiter.build() - * .store(new MemoryStore()) - * .key('request:api-key:/profile') + * .store('memory') + * .key('request:/profile') * .addRule({ type: 'second', limit: 1 }) * * limiter.getQueuedCount() // 0 @@ -144,14 +342,14 @@ export class RateLimiterBuilder { } /** - * Stimate when the next slot will be available based on the + * Estimate when the next slot will be available based on the * next retry defined. * * @example * ```ts * const limiter = RateLimiter.build() - * .store(new MemoryStore()) - * .key('request:api-key:/profile') + * .store('memory') + * .key('request:/profile') * .addRule({ type: 'second', limit: 1 }) * * limiter.getAvailableInMs() // 0 @@ -164,9 +362,20 @@ export class RateLimiterBuilder { } /** - * Drop all the tasks that are in the queue. + * Drop all the tasks that are in the queue and clear + * store. + * + * @example + * ```ts + * const limiter = RateLimiter.build() + * .store('memory') + * .key('request:/profile') + * .addRule({ type: 'second', limit: 1 }) + * + * await limiter.truncate() + * ``` */ - public truncate() { + public async truncate() { this.queue = [] this.active = 0 this.nextWakeUpAt = 0 @@ -177,15 +386,29 @@ export class RateLimiterBuilder { this.timer = null } + await this.options.store.truncate() + return this } /** * Schedule the execution of an async function respecting - * the rate limit rules. + * the rate limit rules and the targets. + * + * @example + * ```ts + * const limiter = RateLimiter.build() + * .store('memory') + * .key('request:/profile') + * .addRule({ type: 'second', limit: 1 }) + * + * const response = await limiter.schedule(() => { + * return fetch('http://example.com') + * }) + * ``` */ public schedule( - fn: (signal?: AbortSignal) => T | Promise, + closure: (ctx: RateLimitScheduleCtx) => T | Promise, opts: ScheduleOptions = {} ): Promise { if (!this.options.key) { @@ -197,7 +420,17 @@ export class RateLimiterBuilder { } if (!this.options.rules?.length) { - throw new MissingRuleException() + if (!this.options.targets?.length) { + throw new MissingRuleException() + } + + const missingRuleTargets = this.options.targets.filter( + target => !target.rules + ) + + if (missingRuleTargets.length) { + throw new MissingRuleException() + } } if (opts.signal?.aborted) { @@ -206,11 +439,12 @@ export class RateLimiterBuilder { return new Promise((resolve, reject) => { const item: QueueItem = { - run: fn, + run: closure, resolve, reject, started: false, - opts + signal: opts.signal, + attempt: 1 } this.queue.push(item) @@ -239,118 +473,459 @@ export class RateLimiterBuilder { item.abortHandler = onAbort } - this.pump() + this.scheduleQueueItemRun() }) } /** - * Process the queue of tasks. + * Create a custom id for an target by reading the metadata object. + * The object will always be sorted by keys. */ - private pump() { - if (this.timer) { - return + public getTargetId(target: RateLimitTarget) { + return String.hash(JSON.stringify(Json.sort(target.metadata)), { + key: Config.get('app.key', 'ratelimiter') + }) + } + + /** + * Create a custom key for an target to be used to map the + * target rules into the cache. + */ + public createTargetKey(target: RateLimitTarget) { + return `${this.options.key}:${this.getTargetId(target)}` + } + + /** + * Get a random jitter or return 0 if user has not + * defined one. + */ + private randomJitter(): number { + if (!this.options.jitterMs) { + return 0 } - const tryRun = async () => { - this.timer = null + return Math.floor(Math.random() * this.options.jitterMs) + } - if (this.active >= this.options.maxConcurrent) { - return + /** + * Read the target selection strategy and defines which is + * going to be used. + */ + private createIdxBySelectionStrategy(item: QueueItem) { + if (item.pinnedTargetId) { + const i = this.options.targets.findIndex( + a => a.id === item.pinnedTargetId + ) + + if (i >= 0) { + return [i] } + } - if (this.queue.length === 0) { - return + let indexes = [] + + switch (this.options.targetSelectionStrategy) { + case 'round_robin': + indexes = this.createRoundRobinIdx() + break + case 'first_available': + default: + indexes = this.createFirstAvailableIdx() + } + + if (item.avoidTargetId) { + const i = this.options.targets.findIndex(a => a.id === item.avoidTargetId) + + return indexes.filter(idx => idx !== i) + } + + return indexes + } + + /** + * Create the indexes for when using round_robin selection strategy. + */ + private createRoundRobinIdx() { + return Array.from( + { length: this.options.targets.length }, + (_, k) => (this.rrIndex + k) % this.options.targets.length + ) + } + + /** + * Create the indexes for when using first_available selection strategy. + */ + private createFirstAvailableIdx() { + return Array.from({ length: this.options.targets.length }, (_, k) => k) + } + + /** + * Release the rate limit task. + */ + private releaseTask(options: { isToScheduleTick: boolean }) { + this.active-- + + if (options.isToScheduleTick) { + if (this.timer) { + clearTimeout(this.timer) + + this.timer = null } - const now = Date.now() - - let waitMs = 0 - let allowed = false - - try { - const res = await this.options.store.tryReserve( - this.options.key, - this.options.rules - ) - - this.storeErrorCount = 0 - allowed = res.allowed - waitMs = res.waitMs - } catch (error) { - this.storeErrorCount++ - - /** - * If the store failed 10 times it means it is not working for some - * reason, in this case we can reject all the requests that are in - * the queue. - */ - if (this.storeErrorCount > 10) { - while (this.queue.length) { - const item = this.queue.shift()! - - item.reject(error) + this.scheduleQueueItemRun() + } + } + + /** + * Try process an item from the queue of tasks. + */ + private tryToRunQueueItem = async () => { + this.timer = null + + if (this.active >= this.options.maxConcurrent) { + return + } + + if (this.queue.length === 0) { + return + } + + const now = Date.now() + + if (this.options.targets?.length) { + let minWait = Number.POSITIVE_INFINITY + let target: RateLimitTarget = null + + const nextItem = this.queue[0] + + for (const i of this.createIdxBySelectionStrategy(nextItem)) { + const key = this.createTargetKey(this.options.targets[i]) + + const rules = this.options.targets[i].rules?.length + ? this.options.targets[i].rules + : this.options.rules + + try { + const res = await this.options.store.tryReserve(key, rules) + + this.storeErrorCount = 0 + + if (res.allowed) { + target = this.options.targets[i] + + if (this.options.targetSelectionStrategy === 'round_robin') { + this.rrIndex = (i + 1) % this.options.targets.length + } + + break } - throw error - } + minWait = Math.min(minWait, res.waitMs) + } catch (error) { + this.storeErrorCount++ + + if (this.storeErrorCount > 10) { + while (this.queue.length) { + this.queue.shift().reject(error) + } - allowed = false - waitMs = 100 + throw error + } + + minWait = Math.min(minWait, 100) + } } - if (!allowed) { - const delay = waitMs + this.randomJitter() + if (!target) { + const delay = (isFinite(minWait) ? minWait : 100) + this.randomJitter() this.nextWakeUpAt = now + delay - this.timer = setTimeout(tryRun, delay) + this.scheduleQueueItemRun({ delay }) return } - const item = this.queue.shift()! + const item = this.queue.shift() - if (item.opts.signal?.aborted) { + if (item.signal?.aborted) { item.reject(new DOMException('Aborted', 'AbortError')) - this.pump() + this.scheduleQueueItemRun() return } item.started = true - if (item.opts.signal && item.abortHandler) { - item.opts.signal.removeEventListener('abort', item.abortHandler) + if (item.signal && item.abortHandler) { + item.signal.removeEventListener('abort', item.abortHandler) item.abortHandler = undefined } this.active++ Promise.resolve() - .then(() => item.run(item.opts.signal)) - .then(item.resolve, item.reject) - .finally(() => { - this.active-- - this.pump() + .then(() => item.run({ signal: item.signal, target })) + .then(result => { + this.releaseTask({ isToScheduleTick: true }) + + item.resolve(result) }) + .catch(error => this.onFailInMultiMode({ error, item, target })) if (this.active < this.options.maxConcurrent) { - this.timer = setTimeout(tryRun, 0) + this.scheduleQueueItemRun() } + + return + } + + let waitMs = 0 + let allowed = false + + try { + const res = await this.options.store.tryReserve( + this.options.key, + this.options.rules + ) + + this.storeErrorCount = 0 + + allowed = res.allowed + waitMs = res.waitMs + } catch (error) { + this.storeErrorCount++ + + /** + * If the store failed 10 times it means it is not working for some + * reason, in this case we can reject all the requests that are in + * the queue. + */ + if (this.storeErrorCount > 10) { + while (this.queue.length) { + this.queue.shift().reject(error) + } + + throw error + } + + allowed = false + waitMs = 100 } - this.timer = setTimeout(tryRun, 0) + if (!allowed) { + const delay = waitMs + this.randomJitter() + + this.nextWakeUpAt = now + delay + this.scheduleQueueItemRun({ delay }) + + return + } + + const item = this.queue.shift() + + if (item.signal?.aborted) { + item.reject(new DOMException('Aborted', 'AbortError')) + + this.scheduleQueueItemRun() + + return + } + + item.started = true + + if (item.signal && item.abortHandler) { + item.signal.removeEventListener('abort', item.abortHandler) + item.abortHandler = undefined + } + + this.active++ + + Promise.resolve() + .then(() => item.run({ signal: item.signal })) + .then(result => { + this.releaseTask({ isToScheduleTick: true }) + + item.resolve(result) + }) + .catch(error => this.onFailInSingleMode({ error, item })) + + if (this.active < this.options.maxConcurrent) { + this.scheduleQueueItemRun() + } } /** - * Get a random jitter or return 0 if user has not - * defined one. + * Schedule to run another queue item. */ - private randomJitter(): number { - if (!this.options.jitterMs) { - return 0 + private scheduleQueueItemRun = (options?: { delay?: number }) => { + options = Options.create(options, { + delay: 0 + }) + + if (this.timer) { + return } - return Math.floor(Math.random() * this.options.jitterMs) + const fire = async () => { + this.timer = null + + await this.tryToRunQueueItem() + } + + this.timer = setTimeout(fire, options.delay) + } + + /** + * Closure that deals with all the errors that happens when when running + * RateLimiter in single-mode. + */ + private onFailInSingleMode = async (options: { + error: Error + item: QueueItem + }) => { + if (!this.options.retryStrategy) { + this.releaseTask({ isToScheduleTick: true }) + + options.item.reject(options.error) + + return + } + + const ctx: RateLimitRetryCtx = { + error: options.error, + key: this.options.key, + attempt: options.item.attempt + } + + const decision = await this.options.retryStrategy(ctx) + const cooldown = Math.max(0, decision.currentTargetCooldownMs ?? 0) + + if (cooldown > 0) { + await this.options + .store!.setCooldown(this.options.key, cooldown) + .catch(() => { + debug('failed to set cooldown in cache for key %s', this.options.key) + }) + } + + switch (decision.type) { + case 'retry_other': + case 'retry_same': + this.releaseTask({ isToScheduleTick: false }) + + options.item.attempt++ + options.item.started = false + options.item.avoidTargetId = undefined + options.item.pinnedTargetId = undefined + + if (options.item.signal?.aborted) { + options.item.reject(new DOMException('Aborted', 'AbortError')) + + break + } + + this.queue.unshift(options.item) + + const delay = cooldown + + this.nextWakeUpAt = Date.now() + cooldown + this.scheduleQueueItemRun({ delay }) + + return + + default: + this.releaseTask({ isToScheduleTick: true }) + + options.item.reject(options.error) + } + } + + /** + * Closure that deals with all the errors that happens when when running + * RateLimiter in multi-mode. + */ + private onFailInMultiMode = async (options: { + error: Error + item: QueueItem + target: RateLimitTarget + }) => { + if (!this.options.retryStrategy) { + this.releaseTask({ isToScheduleTick: true }) + + options.item.reject(options.error) + + return + } + + const key = this.createTargetKey(options.target) + + const ctx: RateLimitRetryCtx = { + key, + error: options.error, + signal: options.item.signal, + attempt: options.item.attempt, + target: options.target + } + + const decision = await this.options.retryStrategy(ctx) + const cooldown = Math.max(0, decision.currentTargetCooldownMs ?? 0) + + if (cooldown > 0) { + await this.options.store!.setCooldown(key, cooldown).catch(() => { + debug('failed to set cooldown in cache for key %s', key) + }) + } + + switch (decision.type) { + case 'retry_same': + this.releaseTask({ isToScheduleTick: false }) + + options.item.attempt++ + options.item.started = false + options.item.avoidTargetId = undefined + options.item.pinnedTargetId = options.target.id + + if (options.item.signal?.aborted) { + options.item.reject(new DOMException('Aborted', 'AbortError')) + + break + } + + this.queue.unshift(options.item) + + const delay = cooldown + + this.nextWakeUpAt = Date.now() + delay + this.scheduleQueueItemRun({ delay }) + + return + + case 'retry_other': + this.releaseTask({ isToScheduleTick: false }) + + options.item.attempt++ + options.item.started = false + options.item.avoidTargetId = options.target.id + options.item.pinnedTargetId = undefined + + if (options.item.signal?.aborted) { + options.item.reject(new DOMException('Aborted', 'AbortError')) + + break + } + + this.queue.unshift(options.item) + + this.nextWakeUpAt = Date.now() + this.scheduleQueueItemRun() + + return + + default: + this.releaseTask({ isToScheduleTick: true }) + + options.item.reject(options.error) + } } } diff --git a/src/ratelimiter/stores/MemoryStore.ts b/src/ratelimiter/stores/MemoryStore.ts deleted file mode 100644 index a35b606..0000000 --- a/src/ratelimiter/stores/MemoryStore.ts +++ /dev/null @@ -1,91 +0,0 @@ -/** - * @athenna/ratelimiter - * - * (c) João Lenon - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -import { debug } from '#src/debug' -import { Cache } from '@athenna/cache' -import { Options } from '@athenna/common' -import { RateLimitStore } from '#src/ratelimiter/stores/RateLimitStore' -import type { Reserve, RateLimitRule, RateLimitStoreOptions } from '#src/types' - -export class MemoryStore extends RateLimitStore { - public constructor(options: RateLimitStoreOptions = {}) { - options = Options.create(options, { - store: 'memory' - }) - - super(options) - } - - private async getOrInit(key: string, rules: RateLimitRule[]) { - const cache = Cache.store(this.options.store) - - let buckets = await cache.get(key) - - if (!buckets) { - buckets = JSON.stringify(rules.map(() => [])) - - await cache.set(key, buckets) - } - - return JSON.parse(buckets) as number[][] - } - - /** - * Try to reserve a token for all rules of the key. If not - * allowed to reserve, return the maximum waitMs necessary. - */ - public async tryReserve(key: string, rules: RateLimitRule[]) { - debug( - 'running memory store tryReserve for key %s with rules %o', - key, - rules - ) - - let wait = 0 - const now = Date.now() - const buckets = await this.getOrInit(key, rules) - - for (let i = 0; i < rules.length; i++) { - const bucket = buckets[i] - const window = this.options.windowMs[rules[i].type] - - while (bucket.length && bucket[0] <= now - window) { - bucket.shift() - } - - if (bucket.length >= rules[i].limit) { - const earliest = bucket[0] - const rem = earliest + window - now - - if (rem > wait) { - wait = rem - } - } - } - - const reserve: Reserve = { allowed: false, waitMs: wait } - - if (wait > 0) { - await Cache.store(this.options.store).set(key, JSON.stringify(buckets)) - - return reserve - } - - for (let i = 0; i < rules.length; i++) { - buckets[i].push(now) - } - - await Cache.store(this.options.store).set(key, JSON.stringify(buckets)) - - reserve.waitMs = 0 - reserve.allowed = true - - return reserve - } -} diff --git a/src/ratelimiter/stores/RateLimitStore.ts b/src/ratelimiter/stores/RateLimitStore.ts deleted file mode 100644 index 24982d3..0000000 --- a/src/ratelimiter/stores/RateLimitStore.ts +++ /dev/null @@ -1,34 +0,0 @@ -/** - * @athenna/ratelimiter - * - * (c) João Lenon - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -import { WINDOW_MS } from '#src/constants/window' -import type { Reserve, RateLimitRule, RateLimitStoreOptions } from '#src/types' - -export abstract class RateLimitStore { - /** - * Holds the options that will be used to build the rate limiter - * store. - */ - public options: RateLimitStoreOptions - - public constructor(options: RateLimitStoreOptions = {}) { - options.windowMs = options.windowMs ?? WINDOW_MS - - this.options = options - } - - /** - * Try to reserve a token para all rules of the key. - * If allowed is false, return the maximum waitMs necessary. - */ - public abstract tryReserve( - key: string, - rules: RateLimitRule[] - ): Promise -} diff --git a/src/ratelimiter/stores/RedisStore.ts b/src/ratelimiter/stores/RedisStore.ts deleted file mode 100644 index 4a46a24..0000000 --- a/src/ratelimiter/stores/RedisStore.ts +++ /dev/null @@ -1,87 +0,0 @@ -/** - * @athenna/ratelimiter - * - * (c) João Lenon - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -import { debug } from '#src/debug' -import { Cache } from '@athenna/cache' -import { Options } from '@athenna/common' -import { RateLimitStore } from '#src/ratelimiter/stores/RateLimitStore' -import type { Reserve, RateLimitRule, RateLimitStoreOptions } from '#src/types' - -export class RedisStore extends RateLimitStore { - public constructor(options: RateLimitStoreOptions = {}) { - options = Options.create(options, { - store: 'redis' - }) - - super(options) - } - - private async getOrInit(key: string, rules: RateLimitRule[]) { - const cache = Cache.store(this.options.store) - - let buckets = await cache.get(key) - - if (!buckets) { - buckets = JSON.stringify(rules.map(() => [])) - - await cache.set(key, buckets) - } - - return JSON.parse(buckets) as number[][] - } - - /** - * Try to reserve a token for all rules of the key. If not - * allowed to reserve, return the maximum waitMs necessary. - */ - public async tryReserve(key: string, rules: RateLimitRule[]) { - debug('running redis store tryReserve for key %s with rules %o', key, rules) - - let wait = 0 - const now = Date.now() - const buckets = await this.getOrInit(key, rules) - - for (let i = 0; i < rules.length; i++) { - const bucket = buckets[i] - const window = this.options.windowMs[rules[i].type] - - while (bucket.length && bucket[0] <= now - window) { - bucket.shift() - } - - if (bucket.length >= rules[i].limit) { - const earliest = bucket[0] - const rem = earliest + window - now - - if (rem > wait) { - wait = rem - } - } - } - - const reserve: Reserve = { allowed: false, waitMs: wait } - - if (wait > 0) { - await Cache.store(this.options.store).set(key, JSON.stringify(buckets)) - - return reserve - } - - for (let i = 0; i < rules.length; i++) { - buckets[i].push(now) - } - - await Cache.store(this.options.store).set(key, JSON.stringify(buckets)) - - reserve.waitMs = 0 - reserve.allowed = true - - return reserve - } -} diff --git a/src/types/QueueItem.ts b/src/types/QueueItem.ts index 122993d..e8bf1d9 100644 --- a/src/types/QueueItem.ts +++ b/src/types/QueueItem.ts @@ -7,13 +7,16 @@ * file that was distributed with this source code. */ -import type { ScheduleOptions } from '#src/types' +import type { RateLimitScheduleCtx } from '#src/types' export type QueueItem = { - run: (signal?: AbortSignal) => T | Promise + run: (ctx: RateLimitScheduleCtx) => T | Promise resolve: (v: T) => void reject: (e: unknown) => void abortHandler?: () => void started: boolean - opts: ScheduleOptions + signal?: AbortSignal + attempt?: number + avoidTargetId?: string + pinnedTargetId?: string } diff --git a/src/types/RateLimitRetryClosure.ts b/src/types/RateLimitRetryClosure.ts new file mode 100644 index 0000000..dae49a5 --- /dev/null +++ b/src/types/RateLimitRetryClosure.ts @@ -0,0 +1,14 @@ +/** + * @athenna/ratelimiter + * + * (c) João Lenon + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +import type { RateLimitRetryCtx, RateLimitRetryDecision } from '#src/types' + +export type RateLimitRetryClosure = ( + ctx: RateLimitRetryCtx +) => RateLimitRetryDecision | Promise diff --git a/src/types/RateLimitRetryCtx.ts b/src/types/RateLimitRetryCtx.ts new file mode 100644 index 0000000..7e091ec --- /dev/null +++ b/src/types/RateLimitRetryCtx.ts @@ -0,0 +1,37 @@ +/** + * @athenna/ratelimiter + * + * (c) João Lenon + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +import type { RateLimitTarget } from '#src/types' + +export type RateLimitRetryCtx = { + /** + * The error that has happened while trying to make the request. + */ + error: Error + + /** + * The abort signal to abort the entire process when needed. + */ + signal?: AbortSignal + + /** + * The cache key that was used to store the rate limit rules. + */ + key: string + + /** + * Define the number of attempts that have run so far. + */ + attempt: number + + /** + * The target that this retry is currently using. + */ + target?: RateLimitTarget +} diff --git a/src/types/RateLimitRetryDecision.ts b/src/types/RateLimitRetryDecision.ts new file mode 100644 index 0000000..8e21a7d --- /dev/null +++ b/src/types/RateLimitRetryDecision.ts @@ -0,0 +1,60 @@ +/** + * @athenna/ratelimiter + * + * (c) João Lenon + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +export type RateLimitRetryDecision = + | { + /** + * Decide that RateLimtiter should fail the request. Use this when + * you have already retried as much as possible and you want RateLimiter + * to throw the exception. + */ + type: 'fail' + + /** + * Define for how long time your target will be blocked from usage. + * This is a global state that will be respected by your store when + * defining if it's allowed to run with that target or not. + */ + currentTargetCooldownMs?: number + } + | { + /** + * Decide that your next try should be with the same target. + * Returning `retry_same` will basically avoid your RateLimiter + * from using any other target until you decide something else. + * + * This decision works when using single targets or multiple. + */ + type: 'retry_same' + + /** + * Define for how long time your target will be blocked from usage. + * This is a global state that will be respected by your store when + * defining if it's allowed to run with that target or not. + */ + currentTargetCooldownMs?: number + } + | { + /** + * Decide that your next try should be with another target. + * Returning `retry_other` will basically avoid your RateLimiter + * from using the last target until you decide something else. + * + * This decision only takes effect when using multiple targets, + * If using none or only one, it will use `retry_same` by default. + */ + type: 'retry_other' + + /** + * Define for how long time your target will be blocked from usage. + * This is a global state that will be respected by your store when + * defining if it's allowed to run with that target or not. + */ + currentTargetCooldownMs?: number + } diff --git a/src/types/RateLimitScheduleCtx.ts b/src/types/RateLimitScheduleCtx.ts new file mode 100644 index 0000000..8af8ef9 --- /dev/null +++ b/src/types/RateLimitScheduleCtx.ts @@ -0,0 +1,22 @@ +/** + * @athenna/ratelimiter + * + * (c) João Lenon + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +import type { RateLimitTarget } from '#src/types' + +export type RateLimitScheduleCtx = { + /** + * The abort signal to abort the entire process when needed. + */ + signal?: AbortSignal + + /** + * The target that this retry is currently using. + */ + target?: RateLimitTarget +} diff --git a/src/types/RateLimitTarget.ts b/src/types/RateLimitTarget.ts new file mode 100644 index 0000000..a540775 --- /dev/null +++ b/src/types/RateLimitTarget.ts @@ -0,0 +1,34 @@ +/** + * @athenna/ratelimiter + * + * (c) João Lenon + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +import type { RateLimitRule } from '#src/types' + +export type RateLimitTarget = { + /** + * The rate limit target ID. By default this will be created by creating + * a hash from the target metadata object, but you can also define your + * own ID. + */ + id?: string + + /** + * Define all the metadata for this target to function. Metadata + * is required because we are going to create a hash from this object + * to store the rules inside the cache by Target. With this + * implementation you can create not only API rotations but also API + * Keys rotations at the same time. + */ + metadata: Record + + /** + * Custom rate limit rules for this target. If not defined, + * the default defined in RateLimiter will be used. + */ + rules?: RateLimitRule[] +} diff --git a/src/types/RateLimiterOptions.ts b/src/types/RateLimiterOptions.ts index 01ca715..239f5ea 100644 --- a/src/types/RateLimiterOptions.ts +++ b/src/types/RateLimiterOptions.ts @@ -7,8 +7,12 @@ * file that was distributed with this source code. */ -import type { RateLimitRule } from '#src/types' -import type { RateLimitStore } from '#src/ratelimiter/stores/RateLimitStore' +import type { + RateLimitRule, + RateLimitTarget, + RateLimitRetryClosure +} from '#src/types' +import type { RateLimitStore } from '#src/ratelimiter/RateLimitStore' export type RateLimiterOptions = { /** @@ -18,9 +22,24 @@ export type RateLimiterOptions = { /** * The logical key that will be used by store to save buckets. + * If API Targers are defined, it will be used as a prefix from + * a hash created from targets metadata object: `${key}:${hash}`. */ key?: string + /** + * The api targets that will be used to create API rotations when + * some of them fails. + */ + targets?: RateLimitTarget[] + + /** + * The retry strategy for this rate limiter. This is useful to + * give the power to the user when and how we should proceed with + * the retry of failed executions. + */ + retryStrategy?: RateLimitRetryClosure + /** * The store responsible to save the rate limit buckets. */ @@ -28,11 +47,23 @@ export type RateLimiterOptions = { /** * Max number of tasks that could run concurrently. + * + * @default 1 */ maxConcurrent?: number /** * Random jitter in milliseconds to avoid thundering herd in distributed envs. + * + * @default 0 */ jitterMs?: number + + /** + * Define the selection strategy that will be used to select which target + * will be used next when some of them fails. + * + * @default 'first_available' + */ + targetSelectionStrategy: 'first_available' | 'round_robin' } diff --git a/src/types/ScheduleOptions.ts b/src/types/ScheduleOptions.ts index bcb35ea..b767f9d 100644 --- a/src/types/ScheduleOptions.ts +++ b/src/types/ScheduleOptions.ts @@ -8,6 +8,5 @@ */ export type ScheduleOptions = { - id?: string signal?: AbortSignal } diff --git a/src/types/index.ts b/src/types/index.ts index 7e07fef..cbd312a 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -11,5 +11,10 @@ export * from '#src/types/Reserve' export * from '#src/types/QueueItem' export * from '#src/types/RateLimitRule' export * from '#src/types/ScheduleOptions' +export * from '#src/types/RateLimitTarget' +export * from '#src/types/RateLimitRetryCtx' export * from '#src/types/RateLimiterOptions' +export * from '#src/types/RateLimitScheduleCtx' +export * from '#src/types/RateLimitRetryClosure' export * from '#src/types/RateLimitStoreOptions' +export * from '#src/types/RateLimitRetryDecision' diff --git a/tests/fixtures/config/cache.ts b/tests/fixtures/config/cache.ts index 1dbcef2..e260491 100644 --- a/tests/fixtures/config/cache.ts +++ b/tests/fixtures/config/cache.ts @@ -1,5 +1,5 @@ /** - * @athenna/queue + * @athenna/ratelimiter * * (c) João Lenon * diff --git a/tests/unit/ratelimiter/RateLimiterBuilderTest.ts b/tests/unit/ratelimiter/RateLimiterBuilderTest.ts index bfb7da3..803bc4f 100644 --- a/tests/unit/ratelimiter/RateLimiterBuilderTest.ts +++ b/tests/unit/ratelimiter/RateLimiterBuilderTest.ts @@ -1,6 +1,15 @@ +/** + * @athenna/ratelimiter + * + * (c) João Lenon + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +import { RateLimiter, RateLimitStore } from '#src' import { Path, Sleep } from '@athenna/common' -import { CacheProvider } from '@athenna/cache' -import { MemoryStore, RateLimiter } from '#src' +import { Cache, CacheProvider } from '@athenna/cache' import { AfterEach, BeforeEach, Test, type Context } from '@athenna/test' import { MissingKeyException } from '#src/exceptions/MissingKeyException' import { MissingRuleException } from '#src/exceptions/MissingRuleException' @@ -16,6 +25,8 @@ export class RateLimiterBuilderTest { @AfterEach() public async afterEach() { + await Cache.store('memory').truncate() + Config.clear() ioc.reconstruct() } @@ -81,7 +92,7 @@ export class RateLimiterBuilderTest { assert.throws( () => RateLimiter.build() - .store(new MemoryStore()) + .store('memory') .addRule({ type: 'second', limit: 1 }) .schedule(() => {}), MissingKeyException @@ -94,7 +105,7 @@ export class RateLimiterBuilderTest { () => RateLimiter.build() .key('request:api-key:/profile') - .store(new MemoryStore()) + .store('memory') .schedule(() => {}), MissingRuleException ) @@ -115,7 +126,7 @@ export class RateLimiterBuilderTest { @Test() public async shouldBeAbleToBuildARateLimiterWithARuleOfOneRequestPerSecond({ assert }: Context) { const limiter = RateLimiter.build() - .store(new MemoryStore({ windowMs: { second: 100 } })) + .store('memory', { windowMs: { second: 100 } }) .key('request:api-key:/profile') .addRule({ type: 'second', limit: 1 }) @@ -132,10 +143,30 @@ export class RateLimiterBuilderTest { assert.isAtLeast(Date.now() - dateStart, 400) } + @Test() + public async shouldBeAbleToBuildARateLimiterWithSettingMultipleRules({ assert }: Context) { + const limiter = RateLimiter.build() + .store('memory', { windowMs: { second: 100 } }) + .key('request:api-key:/profile') + .setRules([{ type: 'second', limit: 1 }]) + + const promises = [] + const dateStart = Date.now() + const numberOfRequests = 5 + + for (let i = 0; i < numberOfRequests; i++) { + promises.push(limiter.schedule(() => 'ok' + i)) + } + + await Promise.all(promises) + + assert.isAtLeast(Date.now() - dateStart, 400) + } + @Test() public async shouldBeAbleToHaveErrorsHappeningInsideTheRateLimiterHandler({ assert }: Context) { const limiter = RateLimiter.build() - .store(new MemoryStore({ windowMs: { second: 100 } })) + .store('memory', { windowMs: { second: 100 } }) .key('request:api-key:/profile') .addRule({ type: 'second', limit: 1 }) @@ -149,7 +180,7 @@ export class RateLimiterBuilderTest { @Test() public async shouldBeAbleToBuildARateLimiterWithARuleOfOneRequestPerMinute({ assert }: Context) { const limiter = RateLimiter.build() - .store(new MemoryStore({ windowMs: { minute: 100 } })) + .store('memory', { windowMs: { minute: 100 } }) .key('request:api-key:/profile') .addRule({ type: 'minute', limit: 1 }) @@ -169,7 +200,7 @@ export class RateLimiterBuilderTest { @Test() public async shouldBeAbleToBuildARateLimiterWithARuleOfOneRequestPerHour({ assert }: Context) { const limiter = RateLimiter.build() - .store(new MemoryStore({ windowMs: { hour: 100 } })) + .store('memory', { windowMs: { hour: 100 } }) .key('request:api-key:/profile') .addRule({ type: 'hour', limit: 1 }) @@ -189,7 +220,7 @@ export class RateLimiterBuilderTest { @Test() public async shouldBeAbleToBuildARateLimiterWithARuleOfOneRequestPerDay({ assert }: Context) { const limiter = RateLimiter.build() - .store(new MemoryStore({ windowMs: { day: 100 } })) + .store('memory', { windowMs: { day: 100 } }) .key('request:api-key:/profile') .addRule({ type: 'day', limit: 1 }) @@ -209,7 +240,7 @@ export class RateLimiterBuilderTest { @Test() public async shouldBeAbleToBuildARateLimiterWithARuleOfOneRequestPerMonth({ assert }: Context) { const limiter = RateLimiter.build() - .store(new MemoryStore({ windowMs: { month: 100 } })) + .store('memory', { windowMs: { month: 100 } }) .key('request:api-key:/profile') .addRule({ type: 'month', limit: 1 }) @@ -229,7 +260,7 @@ export class RateLimiterBuilderTest { @Test() public async shouldBeAbleToBuildARateLimiterWithSecondAndMinutesRules({ assert }: Context) { const limiter = RateLimiter.build() - .store(new MemoryStore({ windowMs: { second: 300, minute: 400 } })) + .store('memory', { windowMs: { second: 300, minute: 400 } }) .key('request:api-key:/profile') .addRule({ type: 'second', limit: 1 }) .addRule({ type: 'minute', limit: 2 }) @@ -258,17 +289,17 @@ export class RateLimiterBuilderTest { /** * Third one only after releasing the 300ms window (rule minute) */ - assert.isAtLeast(starts[2] - starts[1], 300) - assert.isAtLeast(starts[3] - starts[2], 200) - assert.isAtLeast(starts[4] - starts[3], 300) - assert.isAtLeast(starts[5] - starts[4], 200) - assert.isAtLeast(total, 800) + assert.isAtLeast(starts[2] - starts[1], 299) + assert.isAtLeast(starts[3] - starts[2], 199) + assert.isAtLeast(starts[4] - starts[3], 299) + assert.isAtLeast(starts[5] - starts[4], 199) + assert.isAtLeast(total, 799) } @Test() public async shouldBeAbleToCleanTheLimiterExecutionByTruncatingIt({ assert }: Context) { const limiter = RateLimiter.build() - .store(new MemoryStore({ windowMs: { second: 100 } })) + .store('memory', { windowMs: { second: 100 } }) .key('request:api-key:/profile') .addRule({ type: 'second', limit: 1 }) @@ -278,7 +309,7 @@ export class RateLimiterBuilderTest { limiter.schedule(() => 'ok' + i) } - limiter.truncate() + await limiter.truncate() assert.equal(limiter.getActiveCount(), 0) assert.equal(limiter.getQueuedCount(), 0) @@ -287,7 +318,7 @@ export class RateLimiterBuilderTest { @Test() public async shouldBeAbleToGetTheQueuedCountOfTheLimiter({ assert }: Context) { const limiter = RateLimiter.build() - .store(new MemoryStore({ windowMs: { second: 100 } })) + .store('memory', { windowMs: { second: 100 } }) .key('request:api-key:/profile') .addRule({ type: 'second', limit: 1 }) @@ -299,14 +330,12 @@ export class RateLimiterBuilderTest { } assert.equal(limiter.getQueuedCount(), 5) - - limiter.truncate() } @Test() public async shouldBeAbleToGetTheActiveCountOfTheLimiter({ assert }: Context) { const limiter = RateLimiter.build() - .store(new MemoryStore({ windowMs: { second: 100 } })) + .store('memory', { windowMs: { second: 100 } }) .key('request:api-key:/profile') .addRule({ type: 'second', limit: 1 }) @@ -338,7 +367,7 @@ export class RateLimiterBuilderTest { @Test() public async shouldBeAbleToGetTheActiveCountOfTheLimiterWithConcurrentRequests({ assert }: Context) { const limiter = RateLimiter.build() - .store(new MemoryStore({ windowMs: { second: 100 } })) + .store('memory', { windowMs: { second: 100 } }) .key('request:api-key:/profile') .jitterMs(0) .maxConcurrent(5) @@ -374,7 +403,7 @@ export class RateLimiterBuilderTest { @Test() public async shouldBeAbleToGetWhenTheRateLimiterWillBeAvailable({ assert }: Context) { const limiter = RateLimiter.build() - .store(new MemoryStore({ windowMs: { second: 100 } })) + .store('memory', { windowMs: { second: 100 } }) .key('request:api-key:/profile') .jitterMs(0) .maxConcurrent(10) @@ -383,12 +412,14 @@ export class RateLimiterBuilderTest { limiter.schedule(() => new Promise(() => {})) limiter.schedule(() => 'ok') - await Sleep.for(0).milliseconds().wait() + await this.waitUntil(() => limiter.getAvailableInMs() > 0, 10, 200) + + const availableInMs = limiter.getAvailableInMs() - assert.isAtLeast(limiter.getAvailableInMs(), 60) - assert.isAtMost(limiter.getAvailableInMs(), 120) + assert.isAtLeast(availableInMs, 50) + assert.isAtMost(availableInMs, 120) - await this.waitUntil(() => limiter.getAvailableInMs() === 0, 5, 800) + await this.waitUntil(() => limiter.getAvailableInMs() === 0, 10, 300) assert.equal(limiter.getAvailableInMs(), 0) } @@ -396,7 +427,7 @@ export class RateLimiterBuilderTest { @Test() public async shouldBeAbleToAbortARateLimiterTaskThatIsEnqueuedUsingAnAbortController({ assert }: Context) { const limiter = RateLimiter.build() - .store(new MemoryStore({ windowMs: { second: 100 } })) + .store('memory', { windowMs: { second: 100 } }) .key('request:api-key:/profile') .addRule({ type: 'second', limit: 1 }) @@ -414,7 +445,7 @@ export class RateLimiterBuilderTest { abortController.abort('testing') await Sleep.for(30).milliseconds().wait() - assert.equal(limiter.getQueuedCount(), 2) + assert.equal(limiter.getQueuedCount(), 1) barrier.release() @@ -428,7 +459,7 @@ export class RateLimiterBuilderTest { @Test() public async shouldNotBeAbleToCancelAlreadyStartedRateLimiterTask({ assert }: Context) { const limiter = RateLimiter.build() - .store(new MemoryStore({ windowMs: { second: 100 } })) + .store('memory', { windowMs: { second: 100 } }) .key('request:api-key:/profile') .addRule({ type: 'second', limit: 1 }) @@ -462,7 +493,7 @@ export class RateLimiterBuilderTest { @Test() public async shouldBeAbleToCancelAlreadyStartedRateLimiterTaskIfUserUsesTheAbortController({ assert }: Context) { const limiter = RateLimiter.build() - .store(new MemoryStore({ windowMs: { second: 100 } })) + .store('memory', { windowMs: { second: 100 } }) .key('request:api-key:/profile') .addRule({ type: 'second', limit: 1 }) @@ -470,10 +501,10 @@ export class RateLimiterBuilderTest { const abortController = new AbortController() const p = limiter.schedule( - async signal => { + async ctx => { started = true - await this.cancellableSleep(1_000, signal) + await this.cancellableSleep(1_000, ctx.signal) return 'ok' }, @@ -502,7 +533,7 @@ export class RateLimiterBuilderTest { @Test() public async shouldBeAbleToAbortARateLimiterTaskBeforeItEvenStarts({ assert }: Context) { const limiter = RateLimiter.build() - .store(new MemoryStore({ windowMs: { second: 100 } })) + .store('memory', { windowMs: { second: 100 } }) .key('request:api-key:/profile') .addRule({ type: 'second', limit: 1 }) @@ -518,7 +549,7 @@ export class RateLimiterBuilderTest { @Test() public async shouldNotBeAbleToAbortTheTaskIfItHasAlreadyStartedRunning({ assert }: Context) { const limiter = RateLimiter.build() - .store(new MemoryStore({ windowMs: { second: 100 } })) + .store('memory', { windowMs: { second: 100 } }) .key('request:api-key:/profile') .addRule({ type: 'second', limit: 1 }) @@ -532,4 +563,541 @@ export class RateLimiterBuilderTest { await assert.doesNotReject(() => p) } + + @Test() + public async shouldBeAbleToBuildARateLimiterWithARuleOfOneRequestPerSecondWithAnTarget({ assert }: Context) { + assert.plan(6) + + const limiter = RateLimiter.build() + .store('memory', { windowMs: { second: 100 } }) + .key('request:api-key:/profile') + .addRule({ type: 'second', limit: 1 }) + .addTarget({ metadata: { baseUrl: 'http://api1.com' } }) + + const promises = [] + const dateStart = Date.now() + const numberOfRequests = 5 + + for (let i = 0; i < numberOfRequests; i++) { + promises.push( + limiter.schedule(({ target }) => { + assert.isDefined(target) + + return 'ok' + i + }) + ) + } + + await Promise.all(promises) + + assert.isAtLeast(Date.now() - dateStart, 400) + } + + @Test() + public async shouldBeAbleToBuildARateLimiterWithARuleOfOneRequestPerSecondSettingMultipleTargets({ + assert + }: Context) { + assert.plan(6) + + const limiter = RateLimiter.build() + .store('memory', { windowMs: { second: 100 } }) + .key('request:api-key:/profile') + .addRule({ type: 'second', limit: 1 }) + .setTargets([{ metadata: { baseUrl: 'http://api1.com' } }]) + + const promises = [] + const dateStart = Date.now() + const numberOfRequests = 5 + + for (let i = 0; i < numberOfRequests; i++) { + promises.push( + limiter.schedule(({ target }) => { + assert.isDefined(target) + + return 'ok' + i + }) + ) + } + + await Promise.all(promises) + + assert.isAtLeast(Date.now() - dateStart, 400) + } + + @Test() + public async shouldBeAbleToHaveErrorsHappeningInsideTheRateLimiterHandlerEvenWithAnTargetSet({ assert }: Context) { + const limiter = RateLimiter.build() + .store('memory', { windowMs: { second: 100 } }) + .key('request:api-key:/profile') + .addRule({ type: 'second', limit: 1 }) + .addTarget({ metadata: { baseUrl: 'http://api1.com' } }) + + await assert.rejects(() => { + return limiter.schedule(() => { + throw new Error('failed') + }) + }) + } + + @Test() + public async shouldBeAbleToTryWithTheSecondTargetIfTheFirstTargetIsAtFullCapacityInASequentialScenario({ + assert + }: Context) { + const api1 = { metadata: { baseUrl: 'http://api1.com' } } + const api2 = { metadata: { baseUrl: 'http://api2.com' } } + + const limiter = RateLimiter.build() + .store('memory', { windowMs: { second: 100 } }) + .key('request:api-key:/profile') + .addRule({ type: 'second', limit: 1 }) + .addTarget(api1) + .addTarget(api2) + + const store = new RateLimitStore({ store: 'memory', windowMs: { second: 100 } }) + + await store.setCooldown(limiter.createTargetKey(api1), 1000) + + const result = await limiter.schedule(({ target }) => { + return target.metadata.baseUrl + }) + + assert.deepEqual(result, 'http://api2.com') + } + + @Test() + public async shouldBeAbleToTryWithTheSecondTargetIfTheFirstTargetIsAtFullCapacityInAConcurrentScenario({ + assert + }: Context) { + const api1 = { metadata: { baseUrl: 'http://api1.com' } } + const api2 = { metadata: { baseUrl: 'http://api2.com' } } + + const limiter = RateLimiter.build() + .maxConcurrent(2) + .store('memory', { windowMs: { second: 100 } }) + .key('request:api-key:/profile') + .addRule({ type: 'second', limit: 1 }) + .addTarget(api1) + .addTarget(api2) + + const store = new RateLimitStore({ store: 'memory', windowMs: { second: 100 } }) + + await store.setCooldown(limiter.createTargetKey(api1), 1000) + + const used: string[] = [] + const barrier = this.createBarrier() + + const run = async ({ target }) => { + used.push(target.metadata.baseUrl) + + await barrier.wait() + + return target.metadata.baseUrl + } + + const p1 = limiter.schedule(run) + const p2 = limiter.schedule(run) + + await Sleep.for(105).milliseconds().wait() + + used.sort() + + assert.deepEqual(used, ['http://api2.com', 'http://api2.com']) + + barrier.release() + + const results = await Promise.all([p1, p2]) + + results.sort() + + assert.deepEqual(results, ['http://api2.com', 'http://api2.com']) + } + + @Test() + public async shouldBeAbleToTryWithTheSecondTargetIfTheFirstTargetIsAtFullCapacityInASequentialScenarioWithRoundRobinStrategy({ + assert + }: Context) { + const api1 = { id: 'api1', metadata: { baseUrl: 'http://api1.com' } } + const api2 = { id: 'api2', metadata: { baseUrl: 'http://api2.com' } } + + const limiter = RateLimiter.build() + .store('memory', { windowMs: { second: 100 } }) + .key('request:api-key:/profile') + .targetSelectionStrategy('round_robin') + .addRule({ type: 'second', limit: 1 }) + .addTarget(api1) + .addTarget(api2) + + const store = new RateLimitStore({ store: 'memory', windowMs: { second: 100 } }) + + await store.setCooldown(limiter.createTargetKey(api1), 1000) + + const result = await limiter.schedule(({ target }) => { + return target.metadata.baseUrl + }) + + assert.deepEqual(result, 'http://api2.com') + } + + @Test() + public async shouldBeAbleToTryWithTheSecondTargetIfTheFirstTargetIsAtFullCapacityInAConcurrentScenarioWithRoundRobinStrategy({ + assert + }: Context) { + const api1 = { id: 'api1', metadata: { baseUrl: 'http://api1.com' } } + const api2 = { id: 'api2', metadata: { baseUrl: 'http://api2.com' } } + + const limiter = RateLimiter.build() + .maxConcurrent(2) + .store('memory', { windowMs: { second: 100 } }) + .key('request:api-key:/profile') + .targetSelectionStrategy('round_robin') + .addRule({ type: 'second', limit: 1 }) + .addTarget(api1) + .addTarget(api2) + + const store = new RateLimitStore({ store: 'memory', windowMs: { second: 100 } }) + + await store.setCooldown(limiter.createTargetKey(api1), 1000) + + const used: string[] = [] + const barrier = this.createBarrier() + + const run = async ({ target }) => { + used.push(target.metadata.baseUrl) + + await barrier.wait() + + return target.metadata.baseUrl + } + + const p1 = limiter.schedule(run) + const p2 = limiter.schedule(run) + + await Sleep.for(105).milliseconds().wait() + + used.sort() + + assert.deepEqual(used, ['http://api2.com', 'http://api2.com']) + + barrier.release() + + const results = await Promise.all([p1, p2]) + + results.sort() + + assert.deepEqual(results, ['http://api2.com', 'http://api2.com']) + } + + @Test() + public async shouldThrowMissingRuleExceptionIfRateLimiterRulesAndTargetRulesAreNotDefined({ assert }: Context) { + const limiter = RateLimiter.build() + .store('memory', { windowMs: { second: 100 } }) + .key('request:api-key:/profile') + .addTarget({ id: 't1', metadata: { baseUrl: 'http://api1.com' } }) + + await assert.rejects(() => limiter.schedule(() => {}), MissingRuleException) + } + + @Test() + public async shouldNotThrowMissingRuleExceptionIfRateLimiterRulesAreNotDefinedButTargetRulesAreDefined({ + assert + }: Context) { + const limiter = RateLimiter.build() + .store('memory', { windowMs: { second: 100 } }) + .key('request:api-key:/profile') + .addTarget({ id: 't1', rules: [{ type: 'second', limit: 1 }], metadata: { baseUrl: 'http://api1.com' } }) + + await assert.doesNotReject(() => limiter.schedule(() => {}), MissingRuleException) + } + + @Test() + public async shouldBeAbleToBuildARateLimiterDefiningRulesInTarget({ assert }: Context) { + assert.plan(6) + + const limiter = RateLimiter.build() + .store('memory', { windowMs: { second: 100 } }) + .key('request:api-key:/profile') + .addTarget({ rules: [{ type: 'second', limit: 1 }], metadata: { baseUrl: 'http://api0.com' } }) + + const promises = [] + const dateStart = Date.now() + const numberOfRequests = 5 + + for (let i = 0; i < numberOfRequests; i++) { + promises.push( + limiter.schedule(({ target }) => { + assert.isDefined(target) + + return 'ok' + i + }) + ) + } + + await Promise.all(promises) + + assert.isAtLeast(Date.now() - dateStart, 400) + } + + @Test() + public async shouldNotBeAbleToRetryRequestWithoutARetryStrategy({ assert }: Context) { + const limiter = RateLimiter.build() + .store('memory', { windowMs: { second: 100 } }) + .key('request:api-key:/profile') + .addRule({ type: 'second', limit: 1 }) + + await assert.rejects(() => + limiter.schedule(({ target }) => { + throw new Error(target.metadata.baseUrl) + }) + ) + } + + @Test() + public async shouldAlwaysFailTheRequestIfRetryStrategyDecideItShouldFail({ assert }: Context) { + const limiter = RateLimiter.build() + .store('memory', { windowMs: { second: 100 } }) + .key('request:api-key:/profile') + .addRule({ type: 'second', limit: 1 }) + .retryStrategy(() => { + return { type: 'fail' } + }) + + await assert.rejects(() => + limiter.schedule(() => { + throw new Error('fail') + }) + ) + } + + @Test() + public async shouldAlwaysRetryTheRequestIfRetryStrategyDecideItShouldRetry({ assert }: Context) { + const limiter = RateLimiter.build() + .key('request:api-key:/profile') + .store('memory', { windowMs: { second: 100 } }) + .addRule({ type: 'second', limit: 1 }) + .retryStrategy(({ attempt }) => { + if (attempt === 1) { + return { type: 'retry_same' } + } + + return { type: 'fail' } + }) + + let retriedForCount = 0 + let isFirstRequest = true + + await limiter.schedule(() => { + retriedForCount++ + + if (isFirstRequest) { + isFirstRequest = false + + throw new Error('fail') + } + }) + + assert.deepEqual(retriedForCount, 2) + } + + @Test() + public async shouldAlwaysCooldownAndFailTheRequestIfRetryStrategyDecideItShouldCooldownAndFail({ assert }: Context) { + const limiter = RateLimiter.build() + .key('request:api-key:/profile') + .store('memory', { windowMs: { second: 100 } }) + .addRule({ type: 'second', limit: 1 }) + .retryStrategy(({ attempt }) => { + if (attempt === 1) { + return { type: 'retry_same', currentTargetCooldownMs: 100 } + } + + return { type: 'fail', currentTargetCooldownMs: 100 } + }) + + let retriedForCount = 0 + + await assert.rejects(() => + limiter.schedule(() => { + retriedForCount++ + throw new Error('fail') + }) + ) + + assert.deepEqual(retriedForCount, 2) + } + + @Test() + public async shouldAlwaysCooldownAndRetryTheRequestIfRetryStrategyDecideItShouldCooldownAndRetry({ + assert + }: Context) { + const limiter = RateLimiter.build() + .key('request:api-key:/profile') + .store('memory', { windowMs: { second: 100 } }) + .addRule({ type: 'second', limit: 1 }) + .retryStrategy(({ attempt }) => { + if (attempt === 1) { + return { type: 'retry_same', currentTargetCooldownMs: 100 } + } + + return { type: 'fail' } + }) + + let retriedForCount = 0 + let isFirstRequest = true + + await limiter.schedule(() => { + retriedForCount++ + + if (isFirstRequest) { + isFirstRequest = false + + throw new Error('fail') + } + }) + + assert.deepEqual(retriedForCount, 2) + } + + @Test() + public async shouldAlwaysRetryTheRequestWithTheSameTargetIfRetryStrategyDecideItShouldRetryWithTheSame({ + assert + }: Context) { + const limiter = RateLimiter.build() + .key('request:api-key:/profile') + .store('memory', { windowMs: { second: 100 } }) + .addTarget({ rules: [{ type: 'second', limit: 1 }], metadata: { baseUrl: 'http://api0.com' } }) + .addTarget({ rules: [{ type: 'second', limit: 1 }], metadata: { baseUrl: 'http://api1.com' } }) + .retryStrategy(() => { + return { type: 'retry_same' } + }) + + const targetUsed = [] + let isFirstRequest = true + + await limiter.schedule(({ target }) => { + targetUsed.push(target.metadata.baseUrl) + + if (isFirstRequest) { + isFirstRequest = false + + throw new Error('fail') + } + }) + + assert.deepEqual(targetUsed, ['http://api0.com', 'http://api0.com']) + } + + @Test() + public async shouldAlwaysRetryTheRequestWithTheOtherTargetIfRetryStrategyDecideItShouldRetryWithOther({ + assert + }: Context) { + const limiter = RateLimiter.build() + .key('request:api-key:/profile') + .store('memory', { windowMs: { second: 100 } }) + .addTarget({ rules: [{ type: 'second', limit: 1 }], metadata: { baseUrl: 'http://api0.com' } }) + .addTarget({ rules: [{ type: 'second', limit: 1 }], metadata: { baseUrl: 'http://api1.com' } }) + .retryStrategy(() => { + return { type: 'retry_other' } + }) + + const targetUsed = [] + let isFirstRequest = true + + await limiter.schedule(({ target }) => { + targetUsed.push(target.metadata.baseUrl) + + if (isFirstRequest) { + isFirstRequest = false + + throw new Error('fail') + } + }) + + assert.deepEqual(targetUsed, ['http://api0.com', 'http://api1.com']) + } + + @Test() + public async shouldAlwaysCooldownAndFailTheRequestWithTargetIfRetryStrategyDecideItShouldCooldownAnFail({ + assert + }: Context) { + const limiter = RateLimiter.build() + .key('request:api-key:/profile') + .store('memory', { windowMs: { second: 100 } }) + .addTarget({ rules: [{ type: 'second', limit: 1 }], metadata: { baseUrl: 'http://api0.com' } }) + .addTarget({ rules: [{ type: 'second', limit: 1 }], metadata: { baseUrl: 'http://api1.com' } }) + .retryStrategy(({ attempt }) => { + if (attempt === 1) { + return { type: 'retry_other', currentTargetCooldownMs: 100 } + } + + return { type: 'fail', currentTargetCooldownMs: 100 } + }) + + let retriedForCount = 0 + + await assert.rejects(() => + limiter.schedule(() => { + retriedForCount++ + throw new Error('fail') + }) + ) + + assert.deepEqual(retriedForCount, 2) + } + + @Test() + public async shouldAlwaysCooldownAndRetryTheRequestWithOtherTargetIfRetryStrategyDecideItShouldCooldownAndTryWithOther({ + assert + }: Context) { + const limiter = RateLimiter.build() + .key('request:api-key:/profile') + .store('memory', { windowMs: { second: 100 } }) + .addTarget({ rules: [{ type: 'second', limit: 1 }], metadata: { baseUrl: 'http://api0.com' } }) + .addTarget({ rules: [{ type: 'second', limit: 1 }], metadata: { baseUrl: 'http://api1.com' } }) + .retryStrategy(() => { + return { type: 'retry_other', currentTargetCooldownMs: 100 } + }) + + const targetUsed = [] + let isFirstRequest = true + + await limiter.schedule(({ target }) => { + targetUsed.push(target.metadata.baseUrl) + + if (isFirstRequest) { + isFirstRequest = false + + throw new Error('fail') + } + }) + + assert.deepEqual(targetUsed, ['http://api0.com', 'http://api1.com']) + } + + @Test() + public async shouldAlwaysCooldownAndRetryTheRequestWithTheSameTargetIfRetryStrategyDecideItShouldCooldownAndTryWithSame({ + assert + }: Context) { + const limiter = RateLimiter.build() + .key('request:api-key:/profile') + .store('memory', { windowMs: { second: 100 } }) + .addTarget({ rules: [{ type: 'second', limit: 1 }], metadata: { baseUrl: 'http://api0.com' } }) + .addTarget({ rules: [{ type: 'second', limit: 1 }], metadata: { baseUrl: 'http://api1.com' } }) + .retryStrategy(() => { + return { type: 'retry_same', currentTargetCooldownMs: 100 } + }) + + const targetUsed = [] + let isFirstRequest = true + + await limiter.schedule(({ target }) => { + targetUsed.push(target.metadata.baseUrl) + + if (isFirstRequest) { + isFirstRequest = false + + throw new Error('fail') + } + }) + + assert.deepEqual(targetUsed, ['http://api0.com', 'http://api0.com']) + } }