
Commit 9334cb0

feat(session): add bi-directional cursor-based pagination with Home/End navigation
Implements cursor-based pagination for message loading so long sessions no longer blow up memory, and adds absolute navigation via the Home/End keys.

API changes:
- Add 'before' cursor param: fetch messages older than the cursor (newest first)
- Add 'after' cursor param: fetch messages newer than the cursor (oldest first)
- Add 'oldest' param: start from the oldest messages (for jumpToOldest)
- Link headers with rel="prev"/"next" for cursor discovery (RFC 5005)

TUI changes:
- loadOlder/loadNewer actions with sliding-window eviction (500-message limit)
- jumpToOldest (Home): fetches the oldest page via ?oldest=true
- jumpToLatest (End): fetches the newest page, preserving the revert marker
- Detached mode: ignores SSE updates while viewing history to prevent gaps

Implementation:
- Binary.lowerBound for efficient cursor lookup
- parseLinkHeader utility for RFC 5988 parsing
- Message.stream() reverse option for ascending order
- Smart parts cleanup: only deletes parts for evicted messages

Tests:
- Unit tests for pagination logic and cursor handling
- API tests for before/after/oldest params and Link headers

Resolves: #6548
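Note: the cursor discovery described above comes down to reading rel="prev" / rel="next" out of the Link response header. The snippet below is a minimal, self-contained illustration of that idea in TypeScript; the sample header value, the URL paths, and the readLinks helper are illustrative assumptions, not the parseLinkHeader utility this commit adds.

// Illustrative sketch only: a minimal RFC 5988-style Link header reader.
// The sample header value and paths below are hypothetical, not produced by this commit.
function readLinks(header: string): Record<string, string> {
  const links: Record<string, string> = {}
  for (const part of header.split(",")) {
    const match = part.match(/<([^>]*)>\s*;\s*rel="?([^";]+)"?/)
    if (match) links[match[2]] = match[1]
  }
  return links
}

const sample =
  '</session/ses_123/message?before=msg_100&limit=100>; rel="prev", ' +
  '</session/ses_123/message?after=msg_199&limit=100>; rel="next"'

const links = readLinks(sample)
const hasOlder = links["prev"] !== undefined // older history exists beyond this page
const hasNewer = links["next"] !== undefined // we are detached from the live tail
console.log({ hasOlder, hasNewer })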
1 parent 36f5ba5 commit 9334cb0

File tree

13 files changed: +1229 / -25 lines


packages/opencode/src/cli/cmd/tui/context/sync.tsx

Lines changed: 286 additions & 4 deletions
@@ -28,6 +28,12 @@ import { useArgs } from "./args"
 import { batch, onMount } from "solid-js"
 import { Log } from "@/util/log"
 import type { Path } from "@opencode-ai/sdk"
+import { parseLinkHeader } from "@/util/link-header"
+
+/** Maximum messages kept in memory per session */
+const MAX_LOADED_MESSAGES = 500
+/** Chunk size for eviction when limit exceeded */
+const EVICTION_CHUNK_SIZE = 50
 
 export const { use: useSync, provider: SyncProvider } = createSimpleContext({
   name: "Sync",
@@ -48,6 +54,15 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
     }
     config: Config
     session: Session[]
+    message_page: {
+      [sessionID: string]: {
+        hasOlder: boolean
+        hasNewer: boolean
+        loading: boolean
+        loadingDirection?: "older" | "newer"
+        error?: string
+      }
+    }
     session_status: {
       [sessionID: string]: SessionStatus
     }
@@ -89,6 +104,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
     provider: [],
     provider_default: {},
     session: [],
+    message_page: {},
     session_status: {},
     session_diff: {},
     todo: {},
@@ -226,19 +242,24 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
       }
 
       case "message.updated": {
-        const messages = store.message[event.properties.info.sessionID]
+        const sessionID = event.properties.info.sessionID
+        const page = store.message_page[sessionID]
+        const messages = store.message[sessionID]
         if (!messages) {
-          setStore("message", event.properties.info.sessionID, [event.properties.info])
+          setStore("message", sessionID, [event.properties.info])
           break
         }
        const result = Binary.search(messages, event.properties.info.id, (m) => m.id)
        if (result.found) {
-          setStore("message", event.properties.info.sessionID, result.index, reconcile(event.properties.info))
+          setStore("message", sessionID, result.index, reconcile(event.properties.info))
+          break
+        }
+        if (page?.hasNewer) {
          break
        }
        setStore(
          "message",
-          event.properties.info.sessionID,
+          sessionID,
          produce((draft) => {
            draft.splice(result.index, 0, event.properties.info)
          }),
@@ -279,6 +300,13 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
         break
       }
       case "message.part.updated": {
+        const sessionID = event.properties.part.sessionID
+        const page = store.message_page[sessionID]
+        const messages = store.message[sessionID]
+        const messageExists = messages?.some((m) => m.id === event.properties.part.messageID)
+        if (page?.hasNewer && !messageExists) {
+          break
+        }
         const parts = store.part[event.properties.part.messageID]
         if (!parts) {
           setStore("part", event.properties.part.messageID, [event.properties.part])
@@ -389,6 +417,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
   })
 
   const fullSyncedSessions = new Set<string>()
+  const loadingGuard = new Set<string>()
   const result = {
     data: store,
     set: setStore,
@@ -422,6 +451,8 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
           sdk.client.session.todo({ sessionID }),
           sdk.client.session.diff({ sessionID }),
         ])
+        const link = messages.response.headers.get("link") ?? ""
+        const hasOlder = parseLinkHeader(link).prev !== undefined
         setStore(
           produce((draft) => {
             const match = Binary.search(draft.session, sessionID, (s) => s.id)
@@ -433,10 +464,261 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
               draft.part[message.info.id] = message.parts
             }
             draft.session_diff[sessionID] = diff.data ?? []
+            draft.message_page[sessionID] = { hasOlder, hasNewer: false, loading: false, error: undefined }
           }),
         )
         fullSyncedSessions.add(sessionID)
       },
+      async loadOlder(sessionID: string) {
+        const page = store.message_page[sessionID]
+        if (page?.loading || !page?.hasOlder) return
+        const messages = store.message[sessionID] ?? []
+        const oldest = messages.at(0)
+        if (!oldest) return
+        if (loadingGuard.has(sessionID)) return
+        loadingGuard.add(sessionID)
+        try {
+          setStore("message_page", sessionID, { ...page, loading: true, loadingDirection: "older", error: undefined })
+
+          const res = await sdk.client.session.messages(
+            { sessionID, before: oldest.id, limit: 100 },
+            { throwOnError: true },
+          )
+          const link = res.response.headers.get("link") ?? ""
+          const hasOlder = parseLinkHeader(link).prev !== undefined
+          setStore(
+            produce((draft) => {
+              const existing = draft.message[sessionID] ?? []
+              for (const msg of res.data ?? []) {
+                const match = Binary.search(existing, msg.info.id, (m) => m.id)
+                if (!match.found) {
+                  existing.splice(match.index, 0, msg.info)
+                  draft.part[msg.info.id] = msg.parts
+                }
+              }
+              if (existing.length > MAX_LOADED_MESSAGES + EVICTION_CHUNK_SIZE) {
+                const evicted = existing.splice(-(existing.length - MAX_LOADED_MESSAGES))
+                for (const msg of evicted) delete draft.part[msg.id]
+                draft.message_page[sessionID] = { hasOlder, hasNewer: true, loading: false, error: undefined }
+              } else {
+                draft.message_page[sessionID] = {
+                  hasOlder,
+                  hasNewer: draft.message_page[sessionID]?.hasNewer ?? false,
+                  loading: false,
+                  error: undefined,
+                }
+              }
+            }),
+          )
+        } catch (e) {
+          const page = store.message_page[sessionID]
+          setStore("message_page", sessionID, {
+            hasOlder: page?.hasOlder ?? false,
+            hasNewer: page?.hasNewer ?? false,
+            loading: false,
+            error: e instanceof Error ? e.message : String(e),
+          })
+        } finally {
+          loadingGuard.delete(sessionID)
+        }
+      },
+      async loadNewer(sessionID: string) {
+        const page = store.message_page[sessionID]
+        if (page?.loading || !page?.hasNewer) return
+        const messages = store.message[sessionID] ?? []
+        const newest = messages.at(-1)
+        if (!newest) return
+        if (loadingGuard.has(sessionID)) return
+        loadingGuard.add(sessionID)
+        try {
+          setStore("message_page", sessionID, { ...page, loading: true, loadingDirection: "newer", error: undefined })
+
+          const res = await sdk.client.session.messages(
+            { sessionID, after: newest.id, limit: 100 },
+            { throwOnError: true },
+          )
+          const link = res.response.headers.get("link") ?? ""
+          const hasNewer = parseLinkHeader(link).next !== undefined
+          setStore(
+            produce((draft) => {
+              const existing = draft.message[sessionID] ?? []
+              for (const msg of res.data ?? []) {
+                const match = Binary.search(existing, msg.info.id, (m) => m.id)
+                if (!match.found) {
+                  existing.splice(match.index, 0, msg.info)
+                  draft.part[msg.info.id] = msg.parts
+                }
+              }
+              if (existing.length > MAX_LOADED_MESSAGES + EVICTION_CHUNK_SIZE) {
+                const evicted = existing.splice(0, existing.length - MAX_LOADED_MESSAGES)
+                for (const msg of evicted) delete draft.part[msg.id]
+                draft.message_page[sessionID] = { hasOlder: true, hasNewer, loading: false, error: undefined }
+              } else {
+                draft.message_page[sessionID] = {
+                  hasOlder: draft.message_page[sessionID]?.hasOlder ?? false,
+                  hasNewer,
+                  loading: false,
+                  error: undefined,
+                }
+              }
+            }),
+          )
+        } catch (e) {
+          const page = store.message_page[sessionID]
+          setStore("message_page", sessionID, {
+            hasOlder: page?.hasOlder ?? false,
+            hasNewer: page?.hasNewer ?? false,
+            loading: false,
+            error: e instanceof Error ? e.message : String(e),
+          })
+        } finally {
+          loadingGuard.delete(sessionID)
+        }
+      },
+      async jumpToLatest(sessionID: string) {
+        const page = store.message_page[sessionID]
+        if (page?.loading || !page?.hasNewer) return
+        if (loadingGuard.has(sessionID)) return
+        loadingGuard.add(sessionID)
+
+        try {
+          // Check for revert state
+          const session = store.session.find((s) => s.id === sessionID)
+          const revertMessageID = session?.revert?.messageID
+
+          setStore("message_page", sessionID, {
+            ...page,
+            loading: true,
+            loadingDirection: "newer",
+            error: undefined,
+          })
+
+          // Fetch newest page (no cursor = newest)
+          const res = await sdk.client.session.messages({ sessionID, limit: 100 }, { throwOnError: true })
+
+          let messages = res.data ?? []
+          const link = res.response.headers.get("link") ?? ""
+          const hasOlder = parseLinkHeader(link).prev !== undefined
+
+          // Revert-aware: if in revert state and the marker is not in the results, fetch it
+          if (revertMessageID && !messages.some((m) => m.info.id === revertMessageID)) {
+            try {
+              const revertResult = await sdk.client.session.message(
+                { sessionID, messageID: revertMessageID },
+                { throwOnError: true },
+              )
+              if (revertResult.data) {
+                // Prepend the revert message (it's older than the newest page)
+                messages = [revertResult.data, ...messages]
+              }
+            } catch (e) {
+              // Revert message may have been deleted, continue without it
+              Log.Default.info("Revert marker fetch failed (may be deleted)", {
+                messageID: revertMessageID,
+                error: e,
+              })
+            }
+          }
+
+          setStore(
+            produce((draft) => {
+              // Clean up parts only for messages not in the new results
+              const oldMessages = draft.message[sessionID] ?? []
+              const newIds = new Set(messages.map((m) => m.info.id))
+              for (const msg of oldMessages) {
+                if (!newIds.has(msg.id)) {
+                  delete draft.part[msg.id]
+                }
+              }
+
+              // Store new messages
+              draft.message[sessionID] = messages.map((m) => m.info)
+              for (const msg of messages) {
+                draft.part[msg.info.id] = msg.parts
+              }
+              draft.message_page[sessionID] = {
+                hasOlder,
+                hasNewer: false,
+                loading: false,
+                error: undefined,
+              }
+            }),
+          )
+        } catch (e) {
+          setStore(
+            produce((draft) => {
+              const p = draft.message_page[sessionID]
+              if (p) {
+                p.loading = false
+                p.error = e instanceof Error ? e.message : String(e)
+              }
+            }),
+          )
+        } finally {
+          loadingGuard.delete(sessionID)
+        }
+      },
+      async jumpToOldest(sessionID: string) {
+        const page = store.message_page[sessionID]
+        if (page?.loading || !page?.hasOlder) return
+        if (loadingGuard.has(sessionID)) return
+        loadingGuard.add(sessionID)
+
+        try {
+          setStore("message_page", sessionID, {
+            ...page,
+            loading: true,
+            loadingDirection: "older",
+            error: undefined,
+          })
+
+          const res = await sdk.client.session.messages(
+            { sessionID, oldest: true, limit: 100 },
+            { throwOnError: true },
+          )
+
+          const messages = res.data ?? []
+          const link = res.response.headers.get("link") ?? ""
+          const hasNewer = parseLinkHeader(link).next !== undefined
+
+          setStore(
+            produce((draft) => {
+              // Clean up parts only for messages not in the new results
+              const oldMessages = draft.message[sessionID] ?? []
+              const newIds = new Set(messages.map((m) => m.info.id))
+              for (const msg of oldMessages) {
+                if (!newIds.has(msg.id)) {
+                  delete draft.part[msg.id]
+                }
+              }
+
+              // Store new messages
+              draft.message[sessionID] = messages.map((m) => m.info)
+              for (const msg of messages) {
+                draft.part[msg.info.id] = msg.parts
+              }
+              draft.message_page[sessionID] = {
+                hasOlder: false,
+                hasNewer,
+                loading: false,
+                error: undefined,
+              }
+            }),
+          )
+        } catch (e) {
+          setStore(
+            produce((draft) => {
+              const p = draft.message_page[sessionID]
+              if (p) {
+                p.loading = false
+                p.error = e instanceof Error ? e.message : String(e)
+              }
+            }),
+          )
+        } finally {
+          loadingGuard.delete(sessionID)
+        }
+      },
     },
     bootstrap,
   }
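Worked example of the sliding-window numbers above: with MAX_LOADED_MESSAGES = 500 and EVICTION_CHUNK_SIZE = 50, eviction only kicks in once more than 550 messages are in memory, and it then trims back to exactly 500 from the side opposite the scroll direction. A standalone sketch of the same arithmetic, using plain arrays instead of the solid-js store:

// Standalone sketch of the eviction arithmetic used by loadOlder/loadNewer above.
const MAX_LOADED_MESSAGES = 500
const EVICTION_CHUNK_SIZE = 50

// Pretend we just prepended an older page and now hold 560 message IDs.
const loaded = Array.from({ length: 560 }, (_, i) => `msg_${i}`)

if (loaded.length > MAX_LOADED_MESSAGES + EVICTION_CHUNK_SIZE) {
  // loadOlder evicts from the end (the newest side), mirroring
  // existing.splice(-(existing.length - MAX_LOADED_MESSAGES)) in the diff.
  const evicted = loaded.splice(-(loaded.length - MAX_LOADED_MESSAGES))
  console.log(evicted.length) // 60 evicted
  console.log(loaded.length)  // 500 remain; hasNewer flips to true
}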
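The merge loops above lean on the repo's Binary.search helper, which is not part of this file's diff; from its call sites it appears to return { found, index } for an array sorted by a key selector. The stand-in below mirrors that inferred contract for illustration only; it is not the project's implementation.

// Stand-in for the repo's Binary.search, inferred from its usage above:
// lower-bound search over an array kept sorted by `by(item)`.
function binarySearch<T>(items: T[], key: string, by: (item: T) => string): { found: boolean; index: number } {
  let lo = 0
  let hi = items.length
  while (lo < hi) {
    const mid = (lo + hi) >>> 1
    if (by(items[mid]) < key) lo = mid + 1
    else hi = mid
  }
  return { found: lo < items.length && by(items[lo]) === key, index: lo }
}

// Keeps a page merge ordered and idempotent, like the loadOlder/loadNewer loops.
const messages = [{ id: "msg_010" }, { id: "msg_030" }]
for (const incoming of [{ id: "msg_020" }, { id: "msg_030" }]) {
  const match = binarySearch(messages, incoming.id, (m) => m.id)
  if (!match.found) messages.splice(match.index, 0, incoming)
}
console.log(messages.map((m) => m.id)) // ["msg_010", "msg_020", "msg_030"]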
