diff --git a/src/server/index.ts b/src/server/index.ts index 80ec1099..74509dda 100644 --- a/src/server/index.ts +++ b/src/server/index.ts @@ -18,7 +18,6 @@ import { withOpenApi } from "./middleware/open-api"; import { withPrometheus } from "./middleware/prometheus"; import { withRateLimit } from "./middleware/rate-limit"; import { withSecurityHeaders } from "./middleware/security-headers"; -import { withWebSocket } from "./middleware/websocket"; import { withRoutes } from "./routes"; import { writeOpenApiToFile } from "./utils/openapi"; @@ -81,7 +80,6 @@ export const initServer = async () => { withPrometheus(server); // Register routes - await withWebSocket(server); await withAuth(server); await withOpenApi(server); await withRoutes(server); diff --git a/src/server/routes/index.ts b/src/server/routes/index.ts index 4a74394e..1150458d 100644 --- a/src/server/routes/index.ts +++ b/src/server/routes/index.ts @@ -104,7 +104,10 @@ import { getAllTransactions } from "./transaction/get-all"; import { getAllDeployedContracts } from "./transaction/get-all-deployed-contracts"; import { retryTransaction } from "./transaction/retry"; import { retryFailedTransactionRoute } from "./transaction/retry-failed"; -import { checkTxStatus } from "./transaction/status"; +import { + getTransactionStatusQueryParamRoute, + getTransactionStatusRoute, +} from "./transaction/status"; import { syncRetryTransactionRoute } from "./transaction/sync-retry"; import { createWebhookRoute } from "./webhooks/create"; import { getWebhooksEventTypes } from "./webhooks/events"; @@ -239,7 +242,8 @@ export async function withRoutes(fastify: FastifyInstance) { // Transactions await fastify.register(getAllTransactions); - await fastify.register(checkTxStatus); + await fastify.register(getTransactionStatusRoute); + await fastify.register(getTransactionStatusQueryParamRoute); await fastify.register(getAllDeployedContracts); await fastify.register(retryTransaction); await fastify.register(syncRetryTransactionRoute); diff --git a/src/server/routes/transaction/status.ts b/src/server/routes/transaction/status.ts index 60dcb18b..d18fa033 100644 --- a/src/server/routes/transaction/status.ts +++ b/src/server/routes/transaction/status.ts @@ -1,23 +1,13 @@ -import type { SocketStream } from "@fastify/websocket"; import { type Static, Type } from "@sinclair/typebox"; import type { FastifyInstance } from "fastify"; import { StatusCodes } from "http-status-codes"; import { TransactionDB } from "../../../shared/db/transactions/db"; -import { logger } from "../../../shared/utils/logger"; import { createCustomError } from "../../middleware/error"; import { standardResponseSchema } from "../../schemas/shared-api-schemas"; import { TransactionSchema, toTransactionSchema, } from "../../schemas/transaction"; -import { - findOrAddWSConnectionInSharedState, - formatSocketMessage, - getStatusMessageAndConnectionStatus, - onClose, - onError, - onMessage, -} from "../../utils/websocket"; // INPUT const requestSchema = Type.Object({ @@ -62,7 +52,7 @@ responseBodySchema.example = { }, }; -export async function checkTxStatus(fastify: FastifyInstance) { +export async function getTransactionStatusRoute(fastify: FastifyInstance) { fastify.route<{ Params: Static; Reply: Static; @@ -96,41 +86,51 @@ export async function checkTxStatus(fastify: FastifyInstance) { result: toTransactionSchema(transaction), }); }, - wsHandler: async (connection: SocketStream, request) => { - const { queueId } = request.params; + }); +} - findOrAddWSConnectionInSharedState(connection, queueId, request); +// An alterate route that accepts the queueId as a query param. +export async function getTransactionStatusQueryParamRoute( + fastify: FastifyInstance, +) { + fastify.route<{ + Querystring: Static; + Reply: Static; + }>({ + method: "GET", + url: "/transaction/status", + schema: { + summary: "Get transaction status", + description: "Get the status for a transaction request.", + tags: ["Transaction"], + operationId: "status", + querystring: requestSchema, + response: { + ...standardResponseSchema, + [StatusCodes.OK]: responseBodySchema, + }, + }, + handler: async (request, reply) => { + const { queueId } = request.query; + if (!queueId) { + throw createCustomError( + "Queue ID is required.", + StatusCodes.BAD_REQUEST, + "QUEUE_ID_REQUIRED", + ); + } const transaction = await TransactionDB.get(queueId); - const returnData = transaction ? toTransactionSchema(transaction) : null; - - const { message, closeConnection } = - await getStatusMessageAndConnectionStatus(returnData); - - connection.socket.send(await formatSocketMessage(returnData, message)); - - if (closeConnection) { - connection.socket.close(); - return; + if (!transaction) { + throw createCustomError( + "Transaction not found.", + StatusCodes.BAD_REQUEST, + "TRANSACTION_NOT_FOUND", + ); } - connection.socket.on("error", (error) => { - logger({ - service: "websocket", - level: "error", - message: "Websocket error", - error, - }); - - onError(error, connection, request); - }); - - connection.socket.on("message", async (_message, _isBinary) => { - onMessage(connection, request); - }); - - connection.socket.on("close", () => { - onClose(connection, request); + reply.status(StatusCodes.OK).send({ + result: toTransactionSchema(transaction), }); }, }); diff --git a/src/server/utils/websocket.ts b/src/server/utils/websocket.ts deleted file mode 100644 index e01f0071..00000000 --- a/src/server/utils/websocket.ts +++ /dev/null @@ -1,169 +0,0 @@ -import type { SocketStream } from "@fastify/websocket"; -import type { Static } from "@sinclair/typebox"; -import type { FastifyRequest } from "fastify"; -import { logger } from "../../shared/utils/logger"; -import type { TransactionSchema } from "../schemas/transaction"; -import { type UserSubscription, subscriptionsData } from "../schemas/websocket"; - -// websocket timeout, i.e., ws connection closed after 10 seconds -const timeoutDuration = 10 * 60 * 1000; - -export const findWSConnectionInSharedState = async ( - connection: SocketStream, - _request: FastifyRequest, -): Promise => { - const index = subscriptionsData.findIndex( - (sub) => sub.socket === connection.socket, - ); - return index; -}; - -export const removeWSFromSharedState = async ( - connection: SocketStream, - request: FastifyRequest, -): Promise => { - const index = await findWSConnectionInSharedState(connection, request); - if (index === -1) { - return -1; - } - subscriptionsData.splice(index, 1); - return index; -}; - -export const onError = async ( - error: Error, - connection: SocketStream, - request: FastifyRequest, -): Promise => { - logger({ - service: "server", - level: "error", - message: "Websocket error", - error, - }); - - const index = await findWSConnectionInSharedState(connection, request); - if (index === -1) { - return; - } - - const userSubscription = subscriptionsData[index]; - subscriptionsData.splice(index, 1); - userSubscription.socket.send( - JSON.stringify({ - result: null, - requestId: userSubscription.requestId, - status: "error", - message: error.message, - }), - ); - - connection.socket.close(); -}; - -export const onMessage = async ( - connection: SocketStream, - request: FastifyRequest, -): Promise => { - const index = await findWSConnectionInSharedState(connection, request); - const userSubscription = subscriptionsData[index]; - subscriptionsData.splice(index, 1); - userSubscription.socket.send( - JSON.stringify({ - result: null, - requestId: userSubscription.requestId, - status: "error", - message: "Do not send any message. Closing Socket... Reconnect again.", - }), - ); - userSubscription.socket.close(); -}; - -export const onClose = async ( - connection: SocketStream, - request: FastifyRequest, -): Promise => { - const index = await findWSConnectionInSharedState(connection, request); - if (index === -1) { - return; - } - subscriptionsData.splice(index, 1); -}; - -export const wsTimeout = async ( - connection: SocketStream, - queueId: string, - request: FastifyRequest, -): Promise => { - return setTimeout(() => { - connection.socket.send("Timeout exceeded. Closing connection..."); - removeWSFromSharedState(connection, request); - connection.socket.close(1000, "Session timeout"); // 1000 is a normal closure status code - - logger({ - service: "server", - level: "info", - message: `Websocket connection for ${queueId} closed due to timeout.`, - }); - }, timeoutDuration); -}; - -export const findOrAddWSConnectionInSharedState = async ( - connection: SocketStream, - queueId: string, - request: FastifyRequest, -) => { - let userSubscription: UserSubscription | undefined = undefined; - const index = await findWSConnectionInSharedState(connection, request); - if (index > -1) { - userSubscription = subscriptionsData[index]; - } else { - userSubscription = { - socket: connection.socket, - requestId: queueId, - }; - - subscriptionsData.push(userSubscription); - } -}; - -type CustomStatusAndConnectionType = { - message: string; - closeConnection: boolean; -}; - -export const getStatusMessageAndConnectionStatus = async ( - data: Static | null, -): Promise => { - let message = - "Request is queued. Waiting for transaction to be picked up by worker."; - let closeConnection = false; - - if (!data) { - message = "Transaction not found. Make sure the provided ID is correct."; - closeConnection = true; - } else if (data.status === "mined") { - message = "Transaction mined. Closing connection."; - closeConnection = true; - } else if (data.status === "errored") { - message = data.errorMessage || "Transaction errored. Closing connection."; - closeConnection = true; - } else if (data.status === "sent") { - message = - "Transaction submitted to blockchain. Waiting for transaction to be mined..."; - } - - return { message, closeConnection }; -}; - -export const formatSocketMessage = async ( - data: Static | null, - message: string, -): Promise => { - const returnData = JSON.stringify({ - result: data ? JSON.stringify(data) : undefined, - queueId: data?.queueId, - message, - }); - return returnData; -};