diff --git a/CHANGELOG.md b/CHANGELOG.md index bb77b2d31..b33033fcc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) ## Unreleased ### Added - +- Added missing block detection and recovery in the indexer.[#488](https://github.com/proto-kit/framework/pull/488) - `@dependencyFactory` for static dependency factory type safety - Added Mempool sorting [#395](https://github.com/proto-kit/framework/pull/395) - Introduced dynamic block building and JIT transaction fetching [#394](https://github.com/proto-kit/framework/pull/394) diff --git a/packages/indexer/src/IndexerNotifier.ts b/packages/indexer/src/IndexerNotifier.ts index 93a33921c..642d9d6af 100644 --- a/packages/indexer/src/IndexerNotifier.ts +++ b/packages/indexer/src/IndexerNotifier.ts @@ -1,5 +1,6 @@ import { BlockTriggerBase, + BlockStorage, Sequencer, sequencerModule, SequencerModule, @@ -9,10 +10,10 @@ import { PrivateMempool, SettlementModule, } from "@proto-kit/sequencer"; -import { log } from "@proto-kit/common"; +import { filterNonUndefined, log } from "@proto-kit/common"; import { inject } from "tsyringe"; -import { IndexBlockTask } from "./tasks/IndexBlockTask"; +import { IndexBlockTask, IndexBlockResult } from "./tasks/IndexBlockTask"; import { IndexPendingTxTask } from "./tasks/IndexPendingTxTask"; import { IndexSettlementTask } from "./tasks/IndexSettlementTask"; import { IndexBatchTask } from "./tasks/IndexBatchTask"; @@ -30,6 +31,8 @@ export class IndexerNotifier extends SequencerModule> { public sequencer: Sequencer, @inject("TaskQueue") public taskQueue: TaskQueue, + @inject("BlockStorage") + private readonly blockStorage: BlockStorage, public indexBlockTask: IndexBlockTask, public indexPendingTxTask: IndexPendingTxTask, public indexBatchTask: IndexBatchTask, @@ -39,6 +42,65 @@ export class IndexerNotifier extends SequencerModule> { super(); } + private async pushTask( + queueName: string, + name: string, + payload: string + ): Promise { + const queue = await this.taskQueue.getQueue(queueName); + await queue.addTask({ + name, + payload, + flowId: "", + sequencerId: this.sequencerIdProvider.getSequencerId(), + }); + } + + private async handleIndexBlockTaskCompleted( + payload: TaskPayload + ): Promise { + if (payload.name !== this.indexBlockTask.name) { + return; + } + + try { + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions + const result = JSON.parse(payload.payload) as IndexBlockResult; + + if ( + result.status !== "missing-blocks" || + result.missingHeights.length === 0 + ) { + return; + } + + const blocks = await Promise.all( + result.missingHeights.map((h) => + this.blockStorage.getBlockWithResultAt(h) + ) + ); + + const filteredBlocks = blocks.filter(filterNonUndefined); + + if (filteredBlocks.length === 0) { + log.warn("No blocks found to re-send"); + return; + } + + const serialized = await this.indexBlockTask + .inputSerializer() + .toJSON(filteredBlocks); + + await this.pushTask( + this.indexBlockTask.name, + this.indexBlockTask.name, + serialized + ); + } catch (error) { + log.error("Failed to handle block task completion result", error); + } + } + public async propagateEventsAsTasks() { const queue = await this.taskQueue.getQueue(this.indexBlockTask.name); const inputSerializer = this.indexBlockTask.inputSerializer(); @@ -47,86 +109,62 @@ export class IndexerNotifier extends SequencerModule> { const settlementInputSerializer = this.indexSettlementTask.inputSerializer(); + await queue.onCompleted( + async (payload) => await this.handleIndexBlockTaskCompleted(payload) + ); + this.sequencer.events.on("block-metadata-produced", async (block) => { log.debug( "Notifiying the indexer about block", block.block.height.toBigInt() ); - const payload = await inputSerializer.toJSON(block); - const sequencerId = this.sequencerIdProvider.getSequencerId(); - - const task: TaskPayload = { - name: this.indexBlockTask.name, - payload, - flowId: "", // empty for now - sequencerId, - }; - - await queue.addTask(task); + const payload = await inputSerializer.toJSON([block]); + await this.pushTask( + this.indexBlockTask.name, + this.indexBlockTask.name, + payload + ); }); + this.sequencer.events.on("mempool-transaction-added", async (tx) => { try { - const txQueue = await this.taskQueue.getQueue( - this.indexPendingTxTask.name - ); const payload = await txInputSerializer.toJSON(tx); - const sequencerId = this.sequencerIdProvider.getSequencerId(); - - const task: TaskPayload = { - name: this.indexPendingTxTask.name, - payload, - flowId: "", - sequencerId, - }; - - await txQueue.addTask(task); + await this.pushTask( + this.indexPendingTxTask.name, + this.indexPendingTxTask.name, + payload + ); } catch (err) { log.error("Failed to add pending-tx task", err); } }); + this.sequencer.events.on("batch-produced", async (batch) => { log.debug("Notifiying the indexer about batch", batch?.height); try { - const batchQueue = await this.taskQueue.getQueue( - this.indexBatchTask.name - ); - const payload = await batchInputSerializer.toJSON(batch); - const sequencerId = this.sequencerIdProvider.getSequencerId(); - - const task: TaskPayload = { - name: this.indexBatchTask.name, - payload, - flowId: "", - sequencerId, - }; - - await batchQueue.addTask(task); + await this.pushTask( + this.indexBatchTask.name, + this.indexBatchTask.name, + payload + ); } catch (err) { log.error(`Failed to index batch ${batch?.height} ${err}`); } }); + this.sequencer.events.on("settlement-submitted", async (settlement) => { log.debug( - "Notifiying the indexer about settlement", + "Notifying the indexer about settlement", settlement.transactionHash ); try { - const settlementQueue = await this.taskQueue.getQueue( - this.indexSettlementTask.name - ); - const payload = await settlementInputSerializer.toJSON(settlement); - const sequencerId = this.sequencerIdProvider.getSequencerId(); - - const task: TaskPayload = { - name: this.indexSettlementTask.name, - payload, - flowId: "", - sequencerId, - }; - - await settlementQueue.addTask(task); + await this.pushTask( + this.indexSettlementTask.name, + this.indexSettlementTask.name, + payload + ); } catch (err) { log.error( `Failed to add index settlement: ${settlement.transactionHash} ${err}` diff --git a/packages/indexer/src/tasks/IndexBlockTask.ts b/packages/indexer/src/tasks/IndexBlockTask.ts index 96d92677c..cb0e3e530 100644 --- a/packages/indexer/src/tasks/IndexBlockTask.ts +++ b/packages/indexer/src/tasks/IndexBlockTask.ts @@ -1,5 +1,6 @@ import { BlockQueue, + BlockStorage, Task, TaskSerializer, TaskWorkerModule, @@ -12,17 +13,26 @@ import { IndexBlockTaskParametersSerializer, } from "./IndexBlockTaskParameters"; +export type IndexBlockResult = + | { status: "ok" } + | { + status: "missing-blocks"; + missingHeights: number[]; + }; + @injectable() export class IndexBlockTask extends TaskWorkerModule - implements Task + implements Task { public name = "index-block"; public constructor( public taskSerializer: IndexBlockTaskParametersSerializer, @inject("BlockQueue") - public blockStorage: BlockQueue + public blockStorage: BlockQueue, + @inject("BlockStorage") + private readonly blockRepository: BlockStorage ) { super(); } @@ -31,28 +41,57 @@ export class IndexBlockTask public async prepare(): Promise {} public async compute( - input: IndexBlockTaskParameters - ): Promise { + input: IndexBlockTaskParameters[] + ): Promise { + const firstBlockHeight = Number(input[0].block.height.toBigInt()); + try { - await this.blockStorage.pushBlock(input.block); - await this.blockStorage.pushResult(input.result); + const currentHeight = await this.blockRepository.getCurrentBlockHeight(); + + if (firstBlockHeight > currentHeight) { + const missingHeights = Array.from( + { length: firstBlockHeight - currentHeight + 1 }, + (_, i) => currentHeight + i + ); + return { status: "missing-blocks", missingHeights }; + } + + for (const blockWithResult of input) { + const height = Number(blockWithResult.block.height.toBigInt()); + // eslint-disable-next-line no-await-in-loop + await this.blockStorage.pushBlock(blockWithResult.block); + // eslint-disable-next-line no-await-in-loop + await this.blockStorage.pushResult(blockWithResult.result); + log.info(`Block ${height} indexed successfully`); + } + + return { status: "ok" }; } catch (error) { - log.error("Failed to index block", input.block.height.toBigInt(), error); - return undefined; + log.error("Failed to index block", firstBlockHeight, error); + return { status: "ok" }; } - - log.info(`Block ${input.block.height.toBigInt()} indexed sucessfully`); - return ""; } - public inputSerializer(): TaskSerializer { - return this.taskSerializer; + public inputSerializer(): TaskSerializer { + return { + toJSON: (blocks: IndexBlockTaskParameters[]): string => + JSON.stringify(blocks.map((b) => this.taskSerializer.toJSON(b))), + + fromJSON: (json: string): IndexBlockTaskParameters[] => { + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions + const items = JSON.parse(json) as string[]; + return items.map((item) => this.taskSerializer.fromJSON(item)); + }, + }; } - public resultSerializer(): TaskSerializer { + public resultSerializer(): TaskSerializer { return { - fromJSON: async () => {}, - toJSON: async () => "", + toJSON: async (input: IndexBlockResult) => JSON.stringify(input), + + fromJSON: async (json: string) => + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions + JSON.parse(json) as IndexBlockResult, }; } } diff --git a/packages/indexer/test/IndexBlockTask.test.ts b/packages/indexer/test/IndexBlockTask.test.ts index abdc583c0..10c3194a6 100644 --- a/packages/indexer/test/IndexBlockTask.test.ts +++ b/packages/indexer/test/IndexBlockTask.test.ts @@ -35,7 +35,7 @@ describe("IndexBlockTask", () => { const queue = await taskQueue.getQueue(indexBlockTask.name); const block = BlockWithResult.createEmpty(); - const payload = await indexBlockTask.inputSerializer().toJSON(block); + const payload = await indexBlockTask.inputSerializer().toJSON([block]); const task: TaskPayload = { name: indexBlockTask.name, diff --git a/packages/persistance/src/services/prisma/PrismaBlockStorage.ts b/packages/persistance/src/services/prisma/PrismaBlockStorage.ts index f3146642e..7ac3461b4 100644 --- a/packages/persistance/src/services/prisma/PrismaBlockStorage.ts +++ b/packages/persistance/src/services/prisma/PrismaBlockStorage.ts @@ -72,6 +72,16 @@ export class PrismaBlockStorage implements BlockQueue, BlockStorage { return (await this.getBlockByQuery({ height }))?.block; } + public async getBlockWithResultAt( + height: number + ): Promise { + const data = await this.getBlockByQuery({ height }); + if (data === undefined || data.result === undefined) { + return undefined; + } + return { block: data.block, result: data.result }; + } + public async getBlock(hash: string): Promise { return (await this.getBlockByQuery({ hash }))?.block; } diff --git a/packages/sequencer/src/storage/inmemory/InMemoryBlockStorage.ts b/packages/sequencer/src/storage/inmemory/InMemoryBlockStorage.ts index 5c5306e7b..4cc1c537f 100644 --- a/packages/sequencer/src/storage/inmemory/InMemoryBlockStorage.ts +++ b/packages/sequencer/src/storage/inmemory/InMemoryBlockStorage.ts @@ -23,6 +23,17 @@ export class InMemoryBlockStorage implements BlockStorage, BlockQueue { return this.blocks.at(height); } + public async getBlockWithResultAt( + height: number + ): Promise { + const block = this.blocks.at(height); + const result = this.results.at(height); + if (block === undefined || result === undefined) { + return undefined; + } + return { block, result }; + } + public async getCurrentBlockHeight(): Promise { return this.blocks.length; } diff --git a/packages/sequencer/src/storage/repositories/BlockStorage.ts b/packages/sequencer/src/storage/repositories/BlockStorage.ts index 7ec488754..d1246cc9d 100644 --- a/packages/sequencer/src/storage/repositories/BlockStorage.ts +++ b/packages/sequencer/src/storage/repositories/BlockStorage.ts @@ -19,5 +19,8 @@ export interface BlockStorage { pushBlock: (block: Block) => Promise; getBlockAt: (height: number) => Promise; + getBlockWithResultAt: ( + height: number + ) => Promise; getBlock: (hash: string) => Promise; }