Skip to content

Commit ae858cb

Browse files
committed
Harden watcher event-source failure handling and scan metrics
1 parent 4cf53fe commit ae858cb

File tree

3 files changed

+158
-7
lines changed

3 files changed

+158
-7
lines changed

packages/stackflow-agent/README.md

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,9 @@ const result = await agent.acceptIncomingTransfer({
135135
4. Watcher retries are idempotent for already-disputed closures (same closure txid is skipped on later polls).
136136
5. Read-only polling isolates per-pipe failures (`getPipeState` errors on one pipe do not stop others).
137137
6. Event scan mode intentionally holds the cursor when any dispute submission errors occur, so failed disputes are retried on next run.
138-
7. `buildOutgoingTransfer(...)` defaults `actor` to the tracked local principal and rejects mismatched actor values.
139-
8. Incoming transfer validation enforces tracked contract/pipe/principals/token consistency; mismatched `pipeId`, `pipeKey`, `actor`, or token payloads are rejected.
140-
9. Incoming transfer validation also enforces sequential nonces and balance invariants against the latest stored local state (same total balance, and counterparty-actor updates must not reduce local balance).
141-
10. For production hardening, add alerting, signer balance checks, and idempotency audit logs.
138+
7. Event scan mode now reports `listErrors` (event source/indexer failures) and keeps the watcher cursor unchanged on those failures.
139+
8. Invalid closure event payloads are skipped and counted in `invalidEvents` so one malformed record does not abort a full scan.
140+
9. `buildOutgoingTransfer(...)` defaults `actor` to the tracked local principal and rejects mismatched actor values.
141+
10. Incoming transfer validation enforces tracked contract/pipe/principals/token consistency; mismatched `pipeId`, `pipeKey`, `actor`, or token payloads are rejected.
142+
11. Incoming transfer validation also enforces sequential nonces and balance invariants against the latest stored local state (same total balance, and counterparty-actor updates must not reduce local balance).
143+
12. For production hardening, add alerting, signer balance checks, and idempotency audit logs.

packages/stackflow-agent/src/watcher.js

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -315,16 +315,34 @@ export class HourlyClosureWatcher {
315315
this.running = true;
316316
try {
317317
const fromBlockHeight = this.agentService.stateStore.getWatcherCursor();
318-
const events = await this.listClosureEvents({
319-
fromBlockHeight,
320-
});
318+
let events;
319+
try {
320+
events = await this.listClosureEvents({
321+
fromBlockHeight,
322+
});
323+
} catch (error) {
324+
this.reportError(error, "listClosureEvents");
325+
return {
326+
ok: false,
327+
scanned: 0,
328+
invalidEvents: 0,
329+
disputesSubmitted: 0,
330+
skippedAlreadyDisputed: 0,
331+
disputeErrors: 0,
332+
listErrors: 1,
333+
fromBlockHeight,
334+
toBlockHeight: fromBlockHeight,
335+
};
336+
}
321337
if (!Array.isArray(events) || events.length === 0) {
322338
return {
323339
ok: true,
324340
scanned: 0,
341+
invalidEvents: 0,
325342
disputesSubmitted: 0,
326343
skippedAlreadyDisputed: 0,
327344
disputeErrors: 0,
345+
listErrors: 0,
328346
fromBlockHeight,
329347
toBlockHeight: fromBlockHeight,
330348
};
@@ -334,6 +352,7 @@ export class HourlyClosureWatcher {
334352
let disputesSubmitted = 0;
335353
let skippedAlreadyDisputed = 0;
336354
let disputeErrors = 0;
355+
let invalidEvents = 0;
337356
let hasDisputeErrors = false;
338357
let scanned = 0;
339358

@@ -342,6 +361,7 @@ export class HourlyClosureWatcher {
342361
try {
343362
closure = normalizeClosureEvent(rawEvent);
344363
} catch {
364+
invalidEvents += 1;
345365
continue;
346366
}
347367
scanned += 1;
@@ -382,9 +402,11 @@ export class HourlyClosureWatcher {
382402
return {
383403
ok: true,
384404
scanned,
405+
invalidEvents,
385406
disputesSubmitted,
386407
skippedAlreadyDisputed,
387408
disputeErrors,
409+
listErrors: 0,
388410
fromBlockHeight,
389411
toBlockHeight,
390412
};

tests/stackflow-agent.test.ts

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -510,6 +510,133 @@ describe("stackflow agent", () => {
510510
store.close();
511511
});
512512

513+
it("keeps event cursor and reports list source failures", async () => {
514+
const dbFile = tempDbFile("agent-event-source-error");
515+
const store = new AgentStateStore({ dbFile });
516+
517+
const errors: Error[] = [];
518+
const agent = new StackflowAgentService({
519+
stateStore: store,
520+
signer: {
521+
async submitDispute() {
522+
return { txid: "0x1" };
523+
},
524+
async callContract() {
525+
return { ok: true };
526+
},
527+
},
528+
network: "devnet",
529+
});
530+
531+
const watcher = new HourlyClosureWatcher({
532+
agentService: agent,
533+
listClosureEvents: async () => {
534+
throw new Error("indexer timeout");
535+
},
536+
onError: (error) => {
537+
errors.push(error as Error);
538+
},
539+
});
540+
541+
const result = await watcher.runOnce();
542+
expect(result.ok).toBe(false);
543+
expect(result.listErrors).toBe(1);
544+
expect(result.scanned).toBe(0);
545+
expect(result.toBlockHeight).toBe("0");
546+
expect(store.getWatcherCursor()).toBe("0");
547+
expect(errors).toHaveLength(1);
548+
549+
watcher.stop();
550+
store.close();
551+
});
552+
553+
it("counts invalid closure events without aborting scan", async () => {
554+
const dbFile = tempDbFile("agent-event-invalid-event");
555+
const store = new AgentStateStore({ dbFile });
556+
557+
const contractId = "ST1TESTABC.contract";
558+
const pipeKey = {
559+
"principal-1": "ST1LOCAL",
560+
"principal-2": "ST1OTHER",
561+
token: null,
562+
};
563+
const pipeId = buildPipeId({ contractId, pipeKey });
564+
565+
store.upsertTrackedPipe({
566+
pipeId,
567+
contractId,
568+
pipeKey,
569+
localPrincipal: "ST1LOCAL",
570+
counterpartyPrincipal: "ST1OTHER",
571+
token: null,
572+
});
573+
store.upsertSignatureState({
574+
contractId,
575+
pipeKey,
576+
forPrincipal: "ST1LOCAL",
577+
withPrincipal: "ST1OTHER",
578+
token: null,
579+
myBalance: "90",
580+
theirBalance: "10",
581+
nonce: "8",
582+
action: "1",
583+
actor: "ST1LOCAL",
584+
mySignature: "0x" + "11".repeat(65),
585+
theirSignature: "0x" + "22".repeat(65),
586+
secret: null,
587+
validAfter: null,
588+
beneficialOnly: false,
589+
});
590+
591+
let disputeCalls = 0;
592+
const agent = new StackflowAgentService({
593+
stateStore: store,
594+
signer: {
595+
async submitDispute() {
596+
disputeCalls += 1;
597+
return { txid: "0xdispute-ok" };
598+
},
599+
async callContract() {
600+
return { ok: true };
601+
},
602+
},
603+
network: "devnet",
604+
disputeOnlyBeneficial: true,
605+
});
606+
607+
const watcher = new HourlyClosureWatcher({
608+
agentService: agent,
609+
listClosureEvents: async () => [
610+
{
611+
eventName: "invalid",
612+
},
613+
{
614+
contractId,
615+
pipeKey,
616+
eventName: "force-close",
617+
nonce: "5",
618+
closer: "ST1OTHER",
619+
txid: "0xtx-valid",
620+
blockHeight: "123",
621+
expiresAt: "200",
622+
closureMyBalance: "20",
623+
},
624+
],
625+
});
626+
627+
const result = await watcher.runOnce();
628+
expect(result.ok).toBe(true);
629+
expect(result.invalidEvents).toBe(1);
630+
expect(result.scanned).toBe(1);
631+
expect(result.disputesSubmitted).toBe(1);
632+
expect(result.toBlockHeight).toBe("123");
633+
expect(store.getWatcherCursor()).toBe("123");
634+
expect(disputeCalls).toBe(1);
635+
636+
watcher.stop();
637+
store.close();
638+
});
639+
513640
it("skips overlapping readonly watcher runs", async () => {
514641
const dbFile = tempDbFile("agent-readonly-overlap");
515642
const store = new AgentStateStore({ dbFile });

0 commit comments

Comments
 (0)