Skip to content

Commit a561135

Browse files
committed
refactor(ensindexer): use local ponder client
Local Ponder Client is a point of integration between ENSIndexer and Ponder applications. It uses Ponder SDK to interact with Ponder APIs.
1 parent 3cc7ea4 commit a561135

26 files changed

+685
-1075
lines changed
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
import {
2+
type ChainIdString,
3+
ChainIndexingConfigTypeIds,
4+
ChainIndexingStatusIds,
5+
type ChainIndexingStatusSnapshot,
6+
createIndexingConfig,
7+
deserializeBlockRef,
8+
deserializeChainId,
9+
deserializeChainIndexingStatusSnapshot,
10+
deserializeNonNegativeInteger,
11+
type SerializedChainIndexingStatusSnapshot,
12+
type SerializedChainIndexingStatusSnapshotBackfill,
13+
type SerializedChainIndexingStatusSnapshotCompleted,
14+
type SerializedChainIndexingStatusSnapshotFollowing,
15+
type SerializedChainIndexingStatusSnapshotQueued,
16+
} from "@ensnode/ensnode-sdk";
17+
import type {
18+
ChainBlockRefs,
19+
ChainMetadata,
20+
ChainName,
21+
PonderMetricsResponse,
22+
PonderStatusResponse,
23+
UnvalidatedChainMetadata,
24+
} from "@ensnode/ponder-sdk";
25+
26+
/**
27+
* Create {@link ChainIndexingStatusSnapshot} for the indexed chain metadata.
28+
*/
29+
export function createChainIndexingSnapshot(
30+
chainMetadata: ChainMetadata,
31+
): ChainIndexingStatusSnapshot {
32+
const {
33+
config: chainBlocksConfig,
34+
backfillEndBlock: chainBackfillEndBlock,
35+
isSyncComplete,
36+
isSyncRealtime,
37+
syncBlock: chainSyncBlock,
38+
statusBlock: chainStatusBlock,
39+
} = chainMetadata;
40+
41+
const { startBlock, endBlock } = chainBlocksConfig;
42+
const config = createIndexingConfig(startBlock, endBlock);
43+
44+
// In omnichain ordering, if the startBlock is the same as the
45+
// status block, the chain has not started yet.
46+
if (chainBlocksConfig.startBlock.number === chainStatusBlock.number) {
47+
return deserializeChainIndexingStatusSnapshot({
48+
chainStatus: ChainIndexingStatusIds.Queued,
49+
config,
50+
} satisfies SerializedChainIndexingStatusSnapshotQueued);
51+
}
52+
53+
if (isSyncComplete) {
54+
if (config.configType !== ChainIndexingConfigTypeIds.Definite) {
55+
throw new Error(
56+
`The '${ChainIndexingStatusIds.Completed}' indexing status can be only created with the '${ChainIndexingConfigTypeIds.Definite}' indexing config type.`,
57+
);
58+
}
59+
60+
return deserializeChainIndexingStatusSnapshot({
61+
chainStatus: ChainIndexingStatusIds.Completed,
62+
latestIndexedBlock: chainStatusBlock,
63+
config,
64+
} satisfies SerializedChainIndexingStatusSnapshotCompleted);
65+
}
66+
67+
if (isSyncRealtime) {
68+
if (config.configType !== ChainIndexingConfigTypeIds.Indefinite) {
69+
throw new Error(
70+
`The '${ChainIndexingStatusIds.Following}' indexing status can be only created with the '${ChainIndexingConfigTypeIds.Indefinite}' indexing config type.`,
71+
);
72+
}
73+
74+
return deserializeChainIndexingStatusSnapshot({
75+
chainStatus: ChainIndexingStatusIds.Following,
76+
latestIndexedBlock: chainStatusBlock,
77+
latestKnownBlock: chainSyncBlock,
78+
config: {
79+
configType: config.configType,
80+
startBlock: config.startBlock,
81+
},
82+
} satisfies SerializedChainIndexingStatusSnapshotFollowing);
83+
}
84+
85+
return deserializeChainIndexingStatusSnapshot({
86+
chainStatus: ChainIndexingStatusIds.Backfill,
87+
latestIndexedBlock: chainStatusBlock,
88+
backfillEndBlock: chainBackfillEndBlock,
89+
config,
90+
} satisfies SerializedChainIndexingStatusSnapshotBackfill);
91+
}
92+
93+
/**
94+
* Create serialized chain indexing snapshots.
95+
*
96+
* The output of this function is required for
97+
* calling {@link createOmnichainIndexingSnapshot}.
98+
*/
99+
export function createSerializedChainSnapshots(
100+
chainIds: ChainIdString[],
101+
chainsBlockRefs: Map<ChainName, ChainBlockRefs>,
102+
metrics: PonderMetricsResponse,
103+
status: PonderStatusResponse,
104+
): Record<ChainIdString, SerializedChainIndexingStatusSnapshot> {
105+
const serializedChainIndexingStatusSnapshots = {} as Record<
106+
ChainIdString,
107+
ChainIndexingStatusSnapshot
108+
>;
109+
110+
// collect unvalidated chain metadata for each indexed chain
111+
for (const chainId of chainIds) {
112+
const chainBlockRefs = chainsBlockRefs.get(chainId);
113+
114+
const statusChainId = deserializeChainId(`${status[chainId]?.id}`);
115+
116+
const backfillEndBlock = deserializeBlockRef(chainBlockRefs?.backfillEndBlock);
117+
118+
const syncBlock = deserializeBlockRef({
119+
number: metrics.getValue("ponder_sync_block", { chain: chainId }),
120+
timestamp: metrics.getValue("ponder_sync_block_timestamp", { chain: chainId }),
121+
});
122+
123+
const statusBlock = deserializeBlockRef({
124+
number: status[chainId]?.block.number,
125+
timestamp: status[chainId]?.block.timestamp,
126+
});
127+
128+
const historicalTotalBlocks = deserializeNonNegativeInteger(
129+
metrics.getValue("ponder_historical_total_blocks", {
130+
chain: chainId,
131+
}),
132+
);
133+
134+
const isSyncComplete = metrics.getValue("ponder_sync_is_complete", { chain: chainId });
135+
136+
const isSyncRealtime = metrics.getValue("ponder_sync_is_realtime", { chain: chainId });
137+
138+
if (typeof isSyncRealtime === "string" && !["0", "1"].includes(isSyncRealtime)) {
139+
throw new Error(
140+
`The 'ponder_sync_is_realtime' metric for chain '${chainId}' must be a string with value "0" or "1".`,
141+
);
142+
}
143+
144+
const config = {
145+
startBlock: deserializeBlockRef(chainBlockRefs?.config.startBlock),
146+
endBlock:
147+
chainBlockRefs?.config.endBlock === null
148+
? null
149+
: deserializeBlockRef(chainBlockRefs?.config.endBlock),
150+
};
151+
152+
const chainMetadata = {
153+
chainId: statusChainId,
154+
isSyncComplete: String(isSyncComplete) === "1",
155+
isSyncRealtime: String(isSyncRealtime) === "1",
156+
config,
157+
backfillEndBlock,
158+
historicalTotalBlocks,
159+
syncBlock,
160+
statusBlock,
161+
} satisfies UnvalidatedChainMetadata;
162+
163+
serializedChainIndexingStatusSnapshots[chainId] = createChainIndexingSnapshot(chainMetadata);
164+
}
165+
166+
return serializedChainIndexingStatusSnapshots;
167+
}
File renamed without changes.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export * from "./local-ponder-client";
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
import { publicClients } from "ponder:api";
2+
import type { PublicClient } from "viem";
3+
4+
import {
5+
deserializeOmnichainIndexingStatusSnapshot,
6+
type OmnichainIndexingStatusSnapshot,
7+
} from "@ensnode/ensnode-sdk";
8+
import {
9+
buildHistoricalTotalBlocksForChains,
10+
type ChainBlockRefs,
11+
type ChainName,
12+
getChainsBlockRefs,
13+
getChainsBlockrange,
14+
type PonderClient,
15+
type PonderMetricsResponse,
16+
} from "@ensnode/ponder-sdk";
17+
18+
import { createSerializedChainSnapshots } from "./chain-indexing-status-snapshot";
19+
import ponderConfig from "./config";
20+
import { createSerializedOmnichainIndexingStatusSnapshot } from "./omnichain-indexing-status-snapshot";
21+
22+
export class LocalPonderClient {
23+
/**
24+
* Cached Chain Block Refs
25+
*
26+
* {@link ChainBlockRefs} for each indexed chain.
27+
*/
28+
private chainsBlockRefs = new Map<ChainName, ChainBlockRefs>();
29+
30+
constructor(private readonly ponderClient: PonderClient) {}
31+
32+
public async buildCrossChainIndexingStatusSnapshot(): Promise<OmnichainIndexingStatusSnapshot> {
33+
const [metrics, status] = await Promise.all([
34+
this.ponderClient.metrics(),
35+
this.ponderClient.status(),
36+
]);
37+
38+
const chainsBlockRefs = await this.getChainsBlockRefsCached(metrics);
39+
40+
// create serialized chain indexing snapshot for each indexed chain
41+
const serializedChainSnapshots = createSerializedChainSnapshots(
42+
this.indexedChainNames,
43+
chainsBlockRefs,
44+
metrics,
45+
status,
46+
);
47+
48+
const serializedOmnichainSnapshot =
49+
createSerializedOmnichainIndexingStatusSnapshot(serializedChainSnapshots);
50+
51+
return deserializeOmnichainIndexingStatusSnapshot(serializedOmnichainSnapshot);
52+
}
53+
54+
/**
55+
* Get cached {@link IndexedChainBlockRefs} for indexed chains.
56+
*
57+
* Guaranteed to include {@link ChainBlockRefs} for each indexed chain.
58+
*
59+
* Note: performs a network request only once and caches response to
60+
* re-use it for further `getChainsBlockRefs` calls.
61+
*
62+
* @throws when RPC calls fail or data model invariants are not met.
63+
*/
64+
private async getChainsBlockRefsCached(
65+
metrics: PonderMetricsResponse,
66+
): Promise<Map<ChainName, ChainBlockRefs>> {
67+
// early-return the cached chain block refs
68+
if (this.chainsBlockRefs.size > 0) {
69+
return this.chainsBlockRefs;
70+
}
71+
72+
this.chainsBlockRefs = await getChainsBlockRefs(
73+
this.indexedChainNames,
74+
getChainsBlockrange(this.ponderConfig),
75+
buildHistoricalTotalBlocksForChains(this.indexedChainNames, metrics),
76+
this.publicClients,
77+
);
78+
79+
return this.chainsBlockRefs;
80+
}
81+
82+
private get ponderConfig() {
83+
return ponderConfig;
84+
}
85+
86+
private get publicClients(): Record<ChainName, PublicClient> {
87+
return publicClients;
88+
}
89+
90+
private get indexedChainNames(): ChainName[] {
91+
return Object.keys(this.ponderConfig.chains) as ChainName[];
92+
}
93+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import {
2+
type ChainIdString,
3+
type ChainIndexingStatusSnapshotForOmnichainIndexingStatusSnapshotBackfill,
4+
getOmnichainIndexingCursor,
5+
getOmnichainIndexingStatus,
6+
OmnichainIndexingStatusIds,
7+
type SerializedChainIndexingStatusSnapshot,
8+
type SerializedChainIndexingStatusSnapshotCompleted,
9+
type SerializedChainIndexingStatusSnapshotQueued,
10+
type SerializedOmnichainIndexingStatusSnapshot,
11+
type SerializedOmnichainIndexingStatusSnapshotBackfill,
12+
type SerializedOmnichainIndexingStatusSnapshotCompleted,
13+
type SerializedOmnichainIndexingStatusSnapshotFollowing,
14+
type SerializedOmnichainIndexingStatusSnapshotUnstarted,
15+
} from "@ensnode/ensnode-sdk";
16+
17+
/**
18+
* Create Serialized Omnichain Indexing Snapshot
19+
*
20+
* Creates {@link SerializedOmnichainIndexingStatusSnapshot} from serialized chain snapshots.
21+
*/
22+
export function createSerializedOmnichainIndexingStatusSnapshot(
23+
serializedChainSnapshots: Record<ChainIdString, SerializedChainIndexingStatusSnapshot>,
24+
): SerializedOmnichainIndexingStatusSnapshot {
25+
const chains = Object.values(serializedChainSnapshots);
26+
const omnichainStatus = getOmnichainIndexingStatus(chains);
27+
const omnichainIndexingCursor = getOmnichainIndexingCursor(chains);
28+
29+
switch (omnichainStatus) {
30+
case OmnichainIndexingStatusIds.Unstarted: {
31+
return {
32+
omnichainStatus: OmnichainIndexingStatusIds.Unstarted,
33+
chains: serializedChainSnapshots as Record<
34+
ChainIdString,
35+
SerializedChainIndexingStatusSnapshotQueued
36+
>, // forcing the type here, will be validated in the following 'check' step
37+
omnichainIndexingCursor,
38+
} satisfies SerializedOmnichainIndexingStatusSnapshotUnstarted;
39+
}
40+
41+
case OmnichainIndexingStatusIds.Backfill: {
42+
return {
43+
omnichainStatus: OmnichainIndexingStatusIds.Backfill,
44+
chains: serializedChainSnapshots as Record<
45+
ChainIdString,
46+
ChainIndexingStatusSnapshotForOmnichainIndexingStatusSnapshotBackfill
47+
>, // forcing the type here, will be validated in the following 'check' step
48+
omnichainIndexingCursor,
49+
} satisfies SerializedOmnichainIndexingStatusSnapshotBackfill;
50+
}
51+
52+
case OmnichainIndexingStatusIds.Completed: {
53+
return {
54+
omnichainStatus: OmnichainIndexingStatusIds.Completed,
55+
chains: serializedChainSnapshots as Record<
56+
ChainIdString,
57+
SerializedChainIndexingStatusSnapshotCompleted
58+
>, // forcing the type here, will be validated in the following 'check' step
59+
omnichainIndexingCursor,
60+
} satisfies SerializedOmnichainIndexingStatusSnapshotCompleted;
61+
}
62+
63+
case OmnichainIndexingStatusIds.Following:
64+
return {
65+
omnichainStatus: OmnichainIndexingStatusIds.Following,
66+
chains: serializedChainSnapshots,
67+
omnichainIndexingCursor,
68+
} satisfies SerializedOmnichainIndexingStatusSnapshotFollowing;
69+
}
70+
}

apps/ensindexer/ponder/ponder.config.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ import config from "@/config";
33
import { prettyPrintJson } from "@ensnode/ensnode-sdk/internal";
44

55
import { redactENSIndexerConfig } from "@/config/redact";
6-
import ponderConfig from "@/ponder/config";
6+
7+
import ponderConfig from "./local-client/config";
78

89
////////
910
// Log redacted ENSIndexerConfig for debugging.

apps/ensindexer/ponder/src/api/handlers/ensnode-api.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import config from "@/config";
22

3-
import { publicClients } from "ponder:api";
43
import { getUnixTime } from "date-fns";
54
import { Hono } from "hono";
65

@@ -38,7 +37,7 @@ app.get("/indexing-status", async (c) => {
3837
let omnichainSnapshot: OmnichainIndexingStatusSnapshot | undefined;
3938

4039
try {
41-
omnichainSnapshot = await buildOmnichainIndexingStatusSnapshot(publicClients);
40+
omnichainSnapshot = await buildOmnichainIndexingStatusSnapshot();
4241
} catch (error) {
4342
const errorMessage = error instanceof Error ? error.message : "Unknown error";
4443
console.error(`Omnichain snapshot is currently not available: ${errorMessage}`);

0 commit comments

Comments
 (0)