Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
## Unreleased

### Added

- Added missing block detection and recovery in the indexer.[#488](https://github.com/proto-kit/framework/pull/488)
- `@dependencyFactory` for static dependency factory type safety
- Added Mempool sorting [#395](https://github.com/proto-kit/framework/pull/395)
- Introduced dynamic block building and JIT transaction fetching [#394](https://github.com/proto-kit/framework/pull/394)
Expand Down
148 changes: 93 additions & 55 deletions packages/indexer/src/IndexerNotifier.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {
BlockTriggerBase,
BlockStorage,
Sequencer,
sequencerModule,
SequencerModule,
Expand All @@ -9,10 +10,10 @@ import {
PrivateMempool,
SettlementModule,
} from "@proto-kit/sequencer";
import { log } from "@proto-kit/common";
import { filterNonUndefined, log } from "@proto-kit/common";
import { inject } from "tsyringe";

import { IndexBlockTask } from "./tasks/IndexBlockTask";
import { IndexBlockTask, IndexBlockResult } from "./tasks/IndexBlockTask";
import { IndexPendingTxTask } from "./tasks/IndexPendingTxTask";
import { IndexSettlementTask } from "./tasks/IndexSettlementTask";
import { IndexBatchTask } from "./tasks/IndexBatchTask";
Expand All @@ -30,6 +31,8 @@ export class IndexerNotifier extends SequencerModule<Record<never, never>> {
public sequencer: Sequencer<NotifierMandatorySequencerModules>,
@inject("TaskQueue")
public taskQueue: TaskQueue,
@inject("BlockStorage")
private readonly blockStorage: BlockStorage,
public indexBlockTask: IndexBlockTask,
public indexPendingTxTask: IndexPendingTxTask,
public indexBatchTask: IndexBatchTask,
Expand All @@ -39,6 +42,65 @@ export class IndexerNotifier extends SequencerModule<Record<never, never>> {
super();
}

private async pushTask(
queueName: string,
name: string,
payload: string
): Promise<void> {
const queue = await this.taskQueue.getQueue(queueName);
await queue.addTask({
name,
payload,
flowId: "",
sequencerId: this.sequencerIdProvider.getSequencerId(),
});
}

private async handleIndexBlockTaskCompleted(
payload: TaskPayload
): Promise<void> {
if (payload.name !== this.indexBlockTask.name) {
return;
}

try {
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
const result = JSON.parse(payload.payload) as IndexBlockResult;

if (
result.status !== "missing-blocks" ||
result.missingHeights.length === 0
) {
return;
}

const blocks = await Promise.all(
result.missingHeights.map((h) =>
this.blockStorage.getBlockWithResultAt(h)
)
);

const filteredBlocks = blocks.filter(filterNonUndefined);

if (filteredBlocks.length === 0) {
log.warn("No blocks found to re-send");
return;
}

const serialized = await this.indexBlockTask
.inputSerializer()
.toJSON(filteredBlocks);

await this.pushTask(
this.indexBlockTask.name,
this.indexBlockTask.name,
serialized
);
} catch (error) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's leave that to the queue implementation to handle errors that come from this. Generally, catching stuff early and not handling it (just emitting it is not handling it imo) lead to weird and non-recoverable behaviour.
Throwing stuff is okay, sometimes stuff just breaks down, but then the sequencer should crash and restart.
At least that's the ideal philosophy I try to approach these things nowadays. Idk I just thought I'd share this - here it's definitely not missing critical so it might be unwise to let the sequencer crash because of a peripherial system. Yeah idk let's talk about it more i guess

log.error("Failed to handle block task completion result", error);
}
}

public async propagateEventsAsTasks() {
const queue = await this.taskQueue.getQueue(this.indexBlockTask.name);
const inputSerializer = this.indexBlockTask.inputSerializer();
Expand All @@ -47,86 +109,62 @@ export class IndexerNotifier extends SequencerModule<Record<never, never>> {
const settlementInputSerializer =
this.indexSettlementTask.inputSerializer();

await queue.onCompleted(
async (payload) => await this.handleIndexBlockTaskCompleted(payload)
);

this.sequencer.events.on("block-metadata-produced", async (block) => {
log.debug(
"Notifiying the indexer about block",
block.block.height.toBigInt()
);
const payload = await inputSerializer.toJSON(block);
const sequencerId = this.sequencerIdProvider.getSequencerId();

const task: TaskPayload = {
name: this.indexBlockTask.name,
payload,
flowId: "", // empty for now
sequencerId,
};

await queue.addTask(task);
const payload = await inputSerializer.toJSON([block]);
await this.pushTask(
this.indexBlockTask.name,
this.indexBlockTask.name,
payload
);
});

this.sequencer.events.on("mempool-transaction-added", async (tx) => {
try {
const txQueue = await this.taskQueue.getQueue(
this.indexPendingTxTask.name
);
const payload = await txInputSerializer.toJSON(tx);
const sequencerId = this.sequencerIdProvider.getSequencerId();

const task: TaskPayload = {
name: this.indexPendingTxTask.name,
payload,
flowId: "",
sequencerId,
};

await txQueue.addTask(task);
await this.pushTask(
this.indexPendingTxTask.name,
this.indexPendingTxTask.name,
payload
);
} catch (err) {
log.error("Failed to add pending-tx task", err);
}
});

this.sequencer.events.on("batch-produced", async (batch) => {
log.debug("Notifiying the indexer about batch", batch?.height);
try {
const batchQueue = await this.taskQueue.getQueue(
this.indexBatchTask.name
);

const payload = await batchInputSerializer.toJSON(batch);
const sequencerId = this.sequencerIdProvider.getSequencerId();

const task: TaskPayload = {
name: this.indexBatchTask.name,
payload,
flowId: "",
sequencerId,
};

await batchQueue.addTask(task);
await this.pushTask(
this.indexBatchTask.name,
this.indexBatchTask.name,
payload
);
} catch (err) {
log.error(`Failed to index batch ${batch?.height} ${err}`);
}
});

this.sequencer.events.on("settlement-submitted", async (settlement) => {
log.debug(
"Notifiying the indexer about settlement",
"Notifying the indexer about settlement",
settlement.transactionHash
);
try {
const settlementQueue = await this.taskQueue.getQueue(
this.indexSettlementTask.name
);

const payload = await settlementInputSerializer.toJSON(settlement);
const sequencerId = this.sequencerIdProvider.getSequencerId();

const task: TaskPayload = {
name: this.indexSettlementTask.name,
payload,
flowId: "",
sequencerId,
};

await settlementQueue.addTask(task);
await this.pushTask(
this.indexSettlementTask.name,
this.indexSettlementTask.name,
payload
);
} catch (err) {
log.error(
`Failed to add index settlement: ${settlement.transactionHash} ${err}`
Expand Down
71 changes: 55 additions & 16 deletions packages/indexer/src/tasks/IndexBlockTask.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {
BlockQueue,
BlockStorage,
Task,
TaskSerializer,
TaskWorkerModule,
Expand All @@ -12,17 +13,26 @@ import {
IndexBlockTaskParametersSerializer,
} from "./IndexBlockTaskParameters";

export type IndexBlockResult =
| { status: "ok" }
| {
status: "missing-blocks";
missingHeights: number[];
};

@injectable()
export class IndexBlockTask
extends TaskWorkerModule
implements Task<IndexBlockTaskParameters, string | void>
implements Task<IndexBlockTaskParameters[], IndexBlockResult>
{
public name = "index-block";

public constructor(
public taskSerializer: IndexBlockTaskParametersSerializer,
@inject("BlockQueue")
public blockStorage: BlockQueue
public blockStorage: BlockQueue,
@inject("BlockStorage")
private readonly blockRepository: BlockStorage
) {
super();
}
Expand All @@ -31,28 +41,57 @@ export class IndexBlockTask
public async prepare(): Promise<void> {}

public async compute(
input: IndexBlockTaskParameters
): Promise<string | void> {
input: IndexBlockTaskParameters[]
): Promise<IndexBlockResult> {
const firstBlockHeight = Number(input[0].block.height.toBigInt());

try {
await this.blockStorage.pushBlock(input.block);
await this.blockStorage.pushResult(input.result);
const currentHeight = await this.blockRepository.getCurrentBlockHeight();

if (firstBlockHeight > currentHeight) {
const missingHeights = Array.from(
{ length: firstBlockHeight - currentHeight + 1 },
(_, i) => currentHeight + i
);
return { status: "missing-blocks", missingHeights };
}

for (const blockWithResult of input) {
const height = Number(blockWithResult.block.height.toBigInt());
// eslint-disable-next-line no-await-in-loop
await this.blockStorage.pushBlock(blockWithResult.block);
// eslint-disable-next-line no-await-in-loop
await this.blockStorage.pushResult(blockWithResult.result);
log.info(`Block ${height} indexed successfully`);
}

return { status: "ok" };
} catch (error) {
log.error("Failed to index block", input.block.height.toBigInt(), error);
return undefined;
log.error("Failed to index block", firstBlockHeight, error);
return { status: "ok" };
}

log.info(`Block ${input.block.height.toBigInt()} indexed sucessfully`);
return "";
}

public inputSerializer(): TaskSerializer<IndexBlockTaskParameters> {
return this.taskSerializer;
public inputSerializer(): TaskSerializer<IndexBlockTaskParameters[]> {
return {
toJSON: (blocks: IndexBlockTaskParameters[]): string =>
JSON.stringify(blocks.map((b) => this.taskSerializer.toJSON(b))),

fromJSON: (json: string): IndexBlockTaskParameters[] => {
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
const items = JSON.parse(json) as string[];
return items.map((item) => this.taskSerializer.fromJSON(item));
},
};
}

public resultSerializer(): TaskSerializer<string | void> {
public resultSerializer(): TaskSerializer<IndexBlockResult> {
return {
fromJSON: async () => {},
toJSON: async () => "",
toJSON: async (input: IndexBlockResult) => JSON.stringify(input),

fromJSON: async (json: string) =>
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
JSON.parse(json) as IndexBlockResult,
};
}
}
2 changes: 1 addition & 1 deletion packages/indexer/test/IndexBlockTask.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ describe("IndexBlockTask", () => {
const queue = await taskQueue.getQueue(indexBlockTask.name);
const block = BlockWithResult.createEmpty();

const payload = await indexBlockTask.inputSerializer().toJSON(block);
const payload = await indexBlockTask.inputSerializer().toJSON([block]);

const task: TaskPayload = {
name: indexBlockTask.name,
Expand Down
10 changes: 10 additions & 0 deletions packages/persistance/src/services/prisma/PrismaBlockStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@ export class PrismaBlockStorage implements BlockQueue, BlockStorage {
return (await this.getBlockByQuery({ height }))?.block;
}

public async getBlockWithResultAt(
height: number
): Promise<BlockWithResult | undefined> {
const data = await this.getBlockByQuery({ height });
if (data === undefined || data.result === undefined) {
return undefined;
}
return { block: data.block, result: data.result };
}

public async getBlock(hash: string): Promise<Block | undefined> {
return (await this.getBlockByQuery({ hash }))?.block;
}
Expand Down
11 changes: 11 additions & 0 deletions packages/sequencer/src/storage/inmemory/InMemoryBlockStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@ export class InMemoryBlockStorage implements BlockStorage, BlockQueue {
return this.blocks.at(height);
}

public async getBlockWithResultAt(
height: number
): Promise<BlockWithResult | undefined> {
const block = this.blocks.at(height);
const result = this.results.at(height);
if (block === undefined || result === undefined) {
return undefined;
}
return { block, result };
}

public async getCurrentBlockHeight(): Promise<number> {
return this.blocks.length;
}
Expand Down
3 changes: 3 additions & 0 deletions packages/sequencer/src/storage/repositories/BlockStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,8 @@ export interface BlockStorage {
pushBlock: (block: Block) => Promise<void>;

getBlockAt: (height: number) => Promise<Block | undefined>;
getBlockWithResultAt: (
height: number
) => Promise<BlockWithResult | undefined>;
getBlock: (hash: string) => Promise<Block | undefined>;
}
Loading