Skip to content

Commit 3cee117

Browse files
authored
feat: add WorkflowRetryAfterError error (#144)
* feat: add WorkflowRetryAfterError error * fix: update zod import path to the latest version * feat: export WorkflowTool from agents/adapters
1 parent 16e5702 commit 3cee117

File tree

9 files changed

+128
-12
lines changed

9 files changed

+128
-12
lines changed

src/error.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { QstashError } from "@upstash/qstash";
2-
import type { FailureFunctionPayload, Step } from "./types";
2+
import type { Duration, FailureFunctionPayload, Step } from "./types";
33

44
/**
55
* Error raised during Workflow execution
@@ -56,6 +56,20 @@ export class WorkflowNonRetryableError extends WorkflowAbort {
5656
}
5757
}
5858

59+
export class WorkflowRetryAfterError extends WorkflowAbort {
60+
public retryAfter: number | Duration;
61+
/**
62+
* @param retryAfter time in seconds after which the workflow should be retried
63+
* @param message error message to be displayed
64+
*/
65+
constructor(message: string, retryAfter: number | Duration) {
66+
super("retry", undefined, false);
67+
this.name = "WorkflowRetryAfterError";
68+
this.retryAfter = retryAfter;
69+
if (message) this.message = message;
70+
}
71+
}
72+
5973
/**
6074
* Formats an unknown error to match the FailureFunctionPayload format
6175
*

src/index.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,10 @@ export * from "./types";
44
export * from "./client/types";
55
export * from "./logger";
66
export * from "./client";
7-
export { WorkflowError, WorkflowAbort, WorkflowNonRetryableError } from "./error";
7+
export {
8+
WorkflowError,
9+
WorkflowAbort,
10+
WorkflowNonRetryableError,
11+
WorkflowRetryAfterError,
12+
} from "./error";
813
export { WorkflowTool } from "./agents/adapters";

src/serve/authorization.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
import type { Err, Ok } from "neverthrow";
22
import { err, ok } from "neverthrow";
3-
import { WorkflowAbort, WorkflowNonRetryableError } from "../error";
3+
import {
4+
isInstanceOf,
5+
WorkflowAbort,
6+
WorkflowNonRetryableError,
7+
WorkflowRetryAfterError,
8+
} from "../error";
49
import { RouteFunction } from "../types";
510
import { WorkflowContext } from "../context";
611
import { BaseLazyStep } from "../context/steps";
@@ -94,8 +99,9 @@ export class DisabledWorkflowContext<
9499
await routeFunction(disabledContext);
95100
} catch (error) {
96101
if (
97-
(error instanceof WorkflowAbort && error.stepName === this.disabledMessage) ||
98-
error instanceof WorkflowNonRetryableError
102+
(isInstanceOf(error, WorkflowAbort) && error.stepName === this.disabledMessage) ||
103+
isInstanceOf(error, WorkflowNonRetryableError) ||
104+
isInstanceOf(error, WorkflowRetryAfterError)
99105
) {
100106
return ok("step-found");
101107
}

src/serve/index.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,12 @@ import {
77
WORKFLOW_PROTOCOL_VERSION_HEADER,
88
} from "../constants";
99
import { WorkflowContext } from "../context";
10-
import { formatWorkflowError, isInstanceOf, WorkflowNonRetryableError } from "../error";
10+
import {
11+
formatWorkflowError,
12+
isInstanceOf,
13+
WorkflowNonRetryableError,
14+
WorkflowRetryAfterError,
15+
} from "../error";
1116
import { WorkflowLogger } from "../logger";
1217
import {
1318
ExclusiveValidationOptions,
@@ -241,6 +246,13 @@ export const serveBase = <
241246
});
242247
}
243248

249+
if (result.isOk() && isInstanceOf(result.value, WorkflowRetryAfterError)) {
250+
return onStepFinish(workflowRunId, result.value, {
251+
condition: "retry-after-error",
252+
result: result.value,
253+
});
254+
}
255+
244256
if (result.isErr()) {
245257
// error while running the workflow or when cleaning up
246258
await debug?.log("ERROR", "ERROR", { error: result.error.message });

src/serve/options.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,14 @@ export const processOptions = <TResponse extends Response = Response, TInitialPa
7373
},
7474
status: 489,
7575
}) as TResponse;
76+
} else if (detailedFinishCondition?.condition === "retry-after-error") {
77+
return new Response(JSON.stringify(formatWorkflowError(detailedFinishCondition.result)), {
78+
headers: {
79+
"Retry-After": detailedFinishCondition.result.retryAfter.toString(),
80+
[WORKFLOW_PROTOCOL_VERSION_HEADER]: WORKFLOW_PROTOCOL_VERSION,
81+
},
82+
status: 429,
83+
}) as TResponse;
7684
} else if (detailedFinishCondition?.condition === "failure-callback") {
7785
return new Response(
7886
JSON.stringify({ result: detailedFinishCondition.result ?? undefined }),

src/serve/serve.test.ts

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import {
2727
import { AUTH_FAIL_MESSAGE, processOptions } from "./options";
2828
import { WorkflowLogger } from "../logger";
2929
import { z } from "zod";
30-
import { WorkflowNonRetryableError } from "../error";
30+
import { WorkflowNonRetryableError, WorkflowRetryAfterError } from "../error";
3131

3232
const someWork = (input: string) => {
3333
return `processed '${input}'`;
@@ -913,6 +913,41 @@ describe("serve", () => {
913913
expect(called).toBeTrue();
914914
});
915915

916+
test("should call qstash to retry workflow on WorkflowRetryAfterError", async () => {
917+
const request = getRequest(WORKFLOW_ENDPOINT, "wfr-foo-3", "my-payload", []);
918+
let called = false;
919+
const { handler: endpoint } = serve(
920+
async (context) => {
921+
called = true;
922+
throw new WorkflowRetryAfterError("This is a retry-after error", 30);
923+
},
924+
{
925+
qstashClient,
926+
receiver: undefined,
927+
verbose: true,
928+
}
929+
);
930+
931+
await mockQStashServer({
932+
execute: async () => {
933+
const response = await endpoint(request);
934+
935+
expect(response.status).toBe(429);
936+
expect(response.headers.get("Retry-After")).toBe("30");
937+
938+
const body = await response.json();
939+
expect(body).toEqual({
940+
error: "WorkflowRetryAfterError",
941+
message: "This is a retry-after error",
942+
stack: expect.any(String),
943+
});
944+
},
945+
responseFields: { body: undefined, status: 429 },
946+
receivesRequest: false,
947+
});
948+
expect(called).toBeTrue();
949+
});
950+
916951
test("should send waitForEvent", async () => {
917952
const request = getRequest(WORKFLOW_ENDPOINT, "wfr-bar", "my-payload", []);
918953
const { handler: endpoint } = serve(

src/types.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import type { HTTPMethods } from "@upstash/qstash";
44
import type { WorkflowContext } from "./context";
55
import type { WorkflowLogger } from "./logger";
66
import { z } from "zod";
7-
import { WorkflowNonRetryableError } from "./error";
7+
import { WorkflowNonRetryableError, WorkflowRetryAfterError } from "./error";
88

99
/**
1010
* Interface for Client with required methods
@@ -145,6 +145,10 @@ export type DetailedFinishCondition =
145145
condition: "non-retryable-error";
146146
result: WorkflowNonRetryableError;
147147
}
148+
| {
149+
condition: "retry-after-error";
150+
result: WorkflowRetryAfterError;
151+
}
148152
| {
149153
condition: "failure-callback";
150154
result: string | void;

src/workflow-requests.test.ts

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import {
99
triggerRouteFunction,
1010
triggerWorkflowDelete,
1111
} from "./workflow-requests";
12-
import { WorkflowAbort, WorkflowNonRetryableError } from "./error";
12+
import { WorkflowAbort, WorkflowNonRetryableError, WorkflowRetryAfterError } from "./error";
1313
import { WorkflowContext } from "./context";
1414
import { Client } from "@upstash/qstash";
1515
import { Client as WorkflowClient } from "./client";
@@ -263,6 +263,25 @@ describe("Workflow Requests", () => {
263263
expect(result.value).toBeInstanceOf(WorkflowNonRetryableError);
264264
});
265265

266+
test("should retry workflow and return ok if WorkflowRetryAfterError is thrown", async () => {
267+
const result = await triggerRouteFunction({
268+
onStep: async () => {
269+
throw new WorkflowRetryAfterError("This is a retry-after error", 5);
270+
},
271+
onCleanup: async () => {
272+
throw new Error("shouldn't call");
273+
},
274+
onCancel: async () => {
275+
throw new Error("shouldn't call");
276+
},
277+
});
278+
expect(result.isOk()).toBeTrue();
279+
// @ts-expect-error value will be set since result isOk
280+
expect(result.value).toBeInstanceOf(WorkflowRetryAfterError);
281+
// @ts-expect-error value will be set since result isOk
282+
expect(result.value.retryAfter).toBe(5);
283+
});
284+
266285
test("should call onCancel if context.cancel is called inside context.run", async () => {
267286
const workflowRunId = nanoid();
268287
const token = "myToken";

src/workflow-requests.ts

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
import type { Err, Ok } from "neverthrow";
22
import { err, ok } from "neverthrow";
3-
import { isInstanceOf, WorkflowAbort, WorkflowError, WorkflowNonRetryableError } from "./error";
3+
import {
4+
isInstanceOf,
5+
WorkflowAbort,
6+
WorkflowError,
7+
WorkflowNonRetryableError,
8+
WorkflowRetryAfterError,
9+
} from "./error";
410
import type { WorkflowContext } from "./context";
511
import {
612
TELEMETRY_HEADER_FRAMEWORK,
@@ -164,7 +170,11 @@ export const triggerRouteFunction = async ({
164170
debug?: WorkflowLogger;
165171
}): Promise<
166172
| Ok<
167-
"workflow-finished" | "step-finished" | "workflow-was-finished" | WorkflowNonRetryableError,
173+
| "workflow-finished"
174+
| "step-finished"
175+
| "workflow-was-finished"
176+
| WorkflowNonRetryableError
177+
| WorkflowRetryAfterError,
168178
never
169179
>
170180
| Err<never, Error>
@@ -185,7 +195,10 @@ export const triggerRouteFunction = async ({
185195
errorMessage: error.message,
186196
});
187197
return ok("workflow-was-finished");
188-
} else if (isInstanceOf(error_, WorkflowNonRetryableError)) {
198+
} else if (
199+
isInstanceOf(error_, WorkflowNonRetryableError) ||
200+
isInstanceOf(error_, WorkflowRetryAfterError)
201+
) {
189202
return ok(error_);
190203
} else if (!isInstanceOf(error_, WorkflowAbort)) {
191204
return err(error_);

0 commit comments

Comments
 (0)