Skip to content

Commit 4f57718

Browse files
feat(session): add bi-directional cursor-based pagination for messages
Implements Issue #6548: Cursor-based pagination to support long session histories without performance degradation. This supersedes PR #6656 by @ry2009, addressing the feedback from that review and adding additional features. Backend: - Add `before` and `after` cursor support to Session.messages and API - Add Binary.lowerBound to util for efficient cursor lookup on ULIDs - Use Promise.allSettled for graceful handling of deleted messages - Return RFC 5005 compliant Link headers (rel="prev"/rel="next") - Return 400 when both cursors specified (mutual exclusion) - Validate cursors via Zod schema with graceful fallback TUI: - Track message_page state with hasOlder/hasNewer flags - Implement loadOlder/loadNewer with scroll position restoration - Memory-bound to 500 messages with 50-message eviction chunks - Skip live SSE inserts when detached from head (hasNewer=true) - Add error state handling for failed pagination requests Improvements over PR #6656: - Bi-directional pagination (before + after cursors) - Proper parseLinkHeader utility with comprehensive tests - Memory bounding to prevent unbounded growth - Error state feedback instead of silent failures - RFC 5005 compliant Link header semantics Tests: - messages-pagination.test.ts: cursor, boundary, deletion tests (5 tests) - session-messages.test.ts: API validation, Link header tests (4 tests) - parse-link-header.test.ts: RFC 8288 parser tests (9 tests) Closes #6548
1 parent 6b6d6e9 commit 4f57718

File tree

12 files changed

+904
-17
lines changed

12 files changed

+904
-17
lines changed

packages/opencode/src/cli/cmd/tui/context/sync.tsx

Lines changed: 146 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,12 @@ import { useArgs } from "./args"
2828
import { batch, onMount } from "solid-js"
2929
import { Log } from "@/util/log"
3030
import type { Path } from "@opencode-ai/sdk"
31+
import { parseLinkHeader } from "@/util/link-header"
32+
33+
/** Maximum messages kept in memory per session */
34+
const MAX_LOADED_MESSAGES = 500
35+
/** Chunk size for eviction when limit exceeded */
36+
const EVICTION_CHUNK_SIZE = 50
3137

3238
export const { use: useSync, provider: SyncProvider } = createSimpleContext({
3339
name: "Sync",
@@ -48,6 +54,15 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
4854
}
4955
config: Config
5056
session: Session[]
57+
message_page: {
58+
[sessionID: string]: {
59+
hasOlder: boolean
60+
hasNewer: boolean
61+
loading: boolean
62+
loadingDirection?: "older" | "newer"
63+
error?: string
64+
}
65+
}
5166
session_status: {
5267
[sessionID: string]: SessionStatus
5368
}
@@ -89,6 +104,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
89104
provider: [],
90105
provider_default: {},
91106
session: [],
107+
message_page: {},
92108
session_status: {},
93109
session_diff: {},
94110
todo: {},
@@ -226,22 +242,26 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
226242
}
227243

228244
case "message.updated": {
229-
const messages = store.message[event.properties.info.sessionID]
245+
const sessionID = event.properties.info.sessionID
246+
const page = store.message_page[sessionID]
247+
const messages = store.message[sessionID]
230248
if (!messages) {
231-
setStore("message", event.properties.info.sessionID, [event.properties.info])
249+
setStore("message", sessionID, [event.properties.info])
232250
break
233251
}
234252
const result = Binary.search(messages, event.properties.info.id, (m) => m.id)
235253
if (result.found) {
236-
setStore("message", event.properties.info.sessionID, result.index, reconcile(event.properties.info))
254+
setStore("message", sessionID, result.index, reconcile(event.properties.info))
255+
break
256+
}
257+
if (page?.hasNewer) {
237258
break
238259
}
239260
setStore(
240261
"message",
241-
event.properties.info.sessionID,
262+
sessionID,
242263
produce((draft) => {
243264
draft.splice(result.index, 0, event.properties.info)
244-
if (draft.length > 100) draft.shift()
245265
}),
246266
)
247267
break
@@ -261,6 +281,13 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
261281
break
262282
}
263283
case "message.part.updated": {
284+
const sessionID = event.properties.part.sessionID
285+
const page = store.message_page[sessionID]
286+
const messages = store.message[sessionID]
287+
const messageExists = messages?.some((m) => m.id === event.properties.part.messageID)
288+
if (page?.hasNewer && !messageExists) {
289+
break
290+
}
264291
const parts = store.part[event.properties.part.messageID]
265292
if (!parts) {
266293
setStore("part", event.properties.part.messageID, [event.properties.part])
@@ -371,6 +398,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
371398
})
372399

373400
const fullSyncedSessions = new Set<string>()
401+
const loadingGuard = new Set<string>()
374402
const result = {
375403
data: store,
376404
set: setStore,
@@ -404,6 +432,8 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
404432
sdk.client.session.todo({ sessionID }),
405433
sdk.client.session.diff({ sessionID }),
406434
])
435+
const link = messages.response.headers.get("link") ?? ""
436+
const hasOlder = parseLinkHeader(link).prev !== undefined
407437
setStore(
408438
produce((draft) => {
409439
const match = Binary.search(draft.session, sessionID, (s) => s.id)
@@ -415,10 +445,121 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
415445
draft.part[message.info.id] = message.parts
416446
}
417447
draft.session_diff[sessionID] = diff.data ?? []
448+
draft.message_page[sessionID] = { hasOlder, hasNewer: false, loading: false, error: undefined }
418449
}),
419450
)
420451
fullSyncedSessions.add(sessionID)
421452
},
453+
async loadOlder(sessionID: string) {
454+
if (loadingGuard.has(sessionID)) return
455+
loadingGuard.add(sessionID)
456+
try {
457+
const page = store.message_page[sessionID]
458+
if (page?.loading || !page?.hasOlder) return
459+
460+
const messages = store.message[sessionID] ?? []
461+
const oldest = messages.at(0)
462+
if (!oldest) return
463+
464+
setStore("message_page", sessionID, { ...page, loading: true, loadingDirection: "older", error: undefined })
465+
466+
const res = await sdk.client.session.messages(
467+
{ sessionID, before: oldest.id, limit: 100 },
468+
{ throwOnError: true },
469+
)
470+
const link = res.response.headers.get("link") ?? ""
471+
const hasOlder = parseLinkHeader(link).prev !== undefined
472+
setStore(
473+
produce((draft) => {
474+
const existing = draft.message[sessionID] ?? []
475+
for (const msg of res.data ?? []) {
476+
const match = Binary.search(existing, msg.info.id, (m) => m.id)
477+
if (!match.found) {
478+
existing.splice(match.index, 0, msg.info)
479+
draft.part[msg.info.id] = msg.parts
480+
}
481+
}
482+
if (existing.length > MAX_LOADED_MESSAGES + EVICTION_CHUNK_SIZE) {
483+
const evicted = existing.splice(-(existing.length - MAX_LOADED_MESSAGES))
484+
for (const msg of evicted) delete draft.part[msg.id]
485+
draft.message_page[sessionID] = { hasOlder, hasNewer: true, loading: false, error: undefined }
486+
} else {
487+
draft.message_page[sessionID] = {
488+
hasOlder,
489+
hasNewer: draft.message_page[sessionID]?.hasNewer ?? false,
490+
loading: false,
491+
error: undefined,
492+
}
493+
}
494+
}),
495+
)
496+
} catch (e) {
497+
const page = store.message_page[sessionID]
498+
setStore("message_page", sessionID, {
499+
hasOlder: page?.hasOlder ?? true,
500+
hasNewer: page?.hasNewer ?? false,
501+
loading: false,
502+
error: e instanceof Error ? e.message : String(e),
503+
})
504+
} finally {
505+
loadingGuard.delete(sessionID)
506+
}
507+
},
508+
async loadNewer(sessionID: string) {
509+
if (loadingGuard.has(sessionID)) return
510+
loadingGuard.add(sessionID)
511+
try {
512+
const page = store.message_page[sessionID]
513+
if (page?.loading || !page?.hasNewer) return
514+
515+
const messages = store.message[sessionID] ?? []
516+
const newest = messages.at(-1)
517+
if (!newest) return
518+
519+
setStore("message_page", sessionID, { ...page, loading: true, loadingDirection: "newer", error: undefined })
520+
521+
const res = await sdk.client.session.messages(
522+
{ sessionID, after: newest.id, limit: 100 },
523+
{ throwOnError: true },
524+
)
525+
const link = res.response.headers.get("link") ?? ""
526+
const hasNewer = parseLinkHeader(link).next !== undefined
527+
setStore(
528+
produce((draft) => {
529+
const existing = draft.message[sessionID] ?? []
530+
for (const msg of res.data ?? []) {
531+
const match = Binary.search(existing, msg.info.id, (m) => m.id)
532+
if (!match.found) {
533+
existing.splice(match.index, 0, msg.info)
534+
draft.part[msg.info.id] = msg.parts
535+
}
536+
}
537+
if (existing.length > MAX_LOADED_MESSAGES + EVICTION_CHUNK_SIZE) {
538+
const evicted = existing.splice(0, existing.length - MAX_LOADED_MESSAGES)
539+
for (const msg of evicted) delete draft.part[msg.id]
540+
draft.message_page[sessionID] = { hasOlder: true, hasNewer, loading: false, error: undefined }
541+
} else {
542+
draft.message_page[sessionID] = {
543+
hasOlder: draft.message_page[sessionID]?.hasOlder ?? false,
544+
hasNewer,
545+
loading: false,
546+
error: undefined,
547+
}
548+
}
549+
}),
550+
)
551+
} catch (e) {
552+
const page = store.message_page[sessionID]
553+
setStore("message_page", sessionID, {
554+
hasOlder: page?.hasOlder ?? false,
555+
hasNewer: page?.hasNewer ?? true,
556+
loading: false,
557+
error: e instanceof Error ? e.message : String(e),
558+
})
559+
} finally {
560+
loadingGuard.delete(sessionID)
561+
}
562+
},
422563
},
423564
bootstrap,
424565
}

packages/opencode/src/cli/cmd/tui/routes/session/index.tsx

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ export function Session() {
119119
.toSorted((a, b) => (a.id < b.id ? -1 : a.id > b.id ? 1 : 0))
120120
})
121121
const messages = createMemo(() => sync.data.message[route.sessionID] ?? [])
122+
const paging = createMemo(() => sync.data.message_page[route.sessionID])
122123
const permissions = createMemo(() => {
123124
if (session()?.parentID) return []
124125
return children().flatMap((x) => sync.data.permission[x.id] ?? [])
@@ -128,6 +129,34 @@ export function Session() {
128129
return children().flatMap((x) => sync.data.question[x.id] ?? [])
129130
})
130131

132+
const LOAD_MORE_THRESHOLD = 5
133+
134+
const loadOlder = () => {
135+
const page = paging()
136+
if (!page?.hasOlder || page.loading || !scroll) return
137+
if (scroll.y > LOAD_MORE_THRESHOLD) return
138+
139+
const height = scroll.scrollHeight
140+
const y = scroll.y
141+
sync.session.loadOlder(route.sessionID).then(() => {
142+
queueMicrotask(() => {
143+
requestAnimationFrame(() => {
144+
const delta = scroll.scrollHeight - height
145+
if (delta > 0) scroll.scrollTo(y + delta)
146+
})
147+
})
148+
})
149+
}
150+
151+
const loadNewer = () => {
152+
const page = paging()
153+
if (!page?.hasNewer || page.loading || !scroll) return
154+
const bottomDistance = scroll.scrollHeight - scroll.y - scroll.height
155+
if (bottomDistance > LOAD_MORE_THRESHOLD) return
156+
157+
sync.session.loadNewer(route.sessionID)
158+
}
159+
131160
const pending = createMemo(() => {
132161
return messages().findLast((x) => x.role === "assistant" && !x.time.completed)?.id
133162
})
@@ -910,6 +939,19 @@ export function Session() {
910939
</Show>
911940
<scrollbox
912941
ref={(r) => (scroll = r)}
942+
onMouseScroll={() => {
943+
loadOlder()
944+
loadNewer()
945+
}}
946+
onKeyDown={(e) => {
947+
if (["up", "pageup", "home"].includes(e.name)) {
948+
setTimeout(loadOlder, 0)
949+
}
950+
if (["down", "pagedown", "end"].includes(e.name)) {
951+
setTimeout(loadNewer, 0)
952+
}
953+
}}
954+
viewportCulling={true}
913955
viewportOptions={{
914956
paddingRight: showScrollbar() ? 1 : 0,
915957
}}
@@ -926,6 +968,22 @@ export function Session() {
926968
flexGrow={1}
927969
scrollAcceleration={scrollAcceleration()}
928970
>
971+
<Show when={paging()?.loading && paging()?.loadingDirection === "older"}>
972+
<box flexShrink={0} paddingLeft={1}>
973+
<text fg={theme.textMuted}>Loading older messages...</text>
974+
</box>
975+
</Show>
976+
<Show when={paging()?.hasOlder && !paging()?.loading}>
977+
<box flexShrink={0} paddingLeft={1}>
978+
<text fg={theme.textMuted}>(scroll up for more)</text>
979+
</box>
980+
</Show>
981+
<Show when={paging()?.error}>
982+
<box flexShrink={0} paddingLeft={1}>
983+
<text fg={theme.error}>Failed to load: {paging()?.error}</text>
984+
<text fg={theme.textMuted}> (scroll to retry)</text>
985+
</box>
986+
</Show>
929987
<For each={messages()}>
930988
{(message, index) => (
931989
<Switch>
@@ -1021,6 +1079,16 @@ export function Session() {
10211079
</Switch>
10221080
)}
10231081
</For>
1082+
<Show when={paging()?.loading && paging()?.loadingDirection === "newer"}>
1083+
<box flexShrink={0} paddingLeft={1}>
1084+
<text fg={theme.textMuted}>Loading newer messages...</text>
1085+
</box>
1086+
</Show>
1087+
<Show when={paging()?.hasNewer && !paging()?.loading}>
1088+
<box flexShrink={0} paddingLeft={1}>
1089+
<text fg={theme.textMuted}>(scroll down for more)</text>
1090+
</box>
1091+
</Show>
10241092
</scrollbox>
10251093
<box flexShrink={0}>
10261094
<Show when={permissions().length > 0}>

packages/opencode/src/server/server.ts

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ import { QuestionRoute } from "./question"
5555
import { Installation } from "@/installation"
5656
import { MDNS } from "./mdns"
5757
import { Worktree } from "../worktree"
58+
import { Identifier } from "../id/id"
5859

5960
// @ts-ignore This global is needed to prevent ai-sdk from logging warnings to stdout https://github.com/vercel/ai/blob/2dc67e0ef538307f21368db32d5a12345d98831b/packages/ai/src/logger/log-warnings.ts#L85
6061
globalThis.AI_SDK_LOG_WARNINGS = false
@@ -1250,16 +1251,60 @@ export namespace Server {
12501251
validator(
12511252
"query",
12521253
z.object({
1253-
limit: z.coerce.number().optional(),
1254+
limit: z.coerce.number().int().min(1).max(100).optional(),
1255+
before: Identifier.schema("message").optional(),
1256+
after: Identifier.schema("message").optional(),
12541257
}),
12551258
),
12561259
async (c) => {
12571260
const query = c.req.valid("query")
1258-
const messages = await Session.messages({
1259-
sessionID: c.req.valid("param").sessionID,
1260-
limit: query.limit,
1261+
if (query.before && query.after) {
1262+
return c.json({ error: "Cannot specify both 'before' and 'after'" }, 400)
1263+
}
1264+
const limit = query.limit ?? 100
1265+
const sessionID = c.req.valid("param").sessionID
1266+
1267+
if (query.after) {
1268+
const page = await Session.messages({
1269+
sessionID,
1270+
limit: limit + 1,
1271+
after: query.after,
1272+
})
1273+
1274+
if (page.length > limit) {
1275+
const messages = page.slice(0, -1)
1276+
const last = messages.at(-1)
1277+
if (last) {
1278+
const url = new URL(c.req.url)
1279+
url.searchParams.set("limit", limit.toString())
1280+
url.searchParams.set("after", last.info.id)
1281+
c.header("Link", `<${url.toString()}>; rel="next"`)
1282+
}
1283+
return c.json(messages)
1284+
}
1285+
1286+
return c.json(page)
1287+
}
1288+
1289+
const page = await Session.messages({
1290+
sessionID,
1291+
limit: limit + 1,
1292+
before: query.before,
12611293
})
1262-
return c.json(messages)
1294+
1295+
if (page.length > limit) {
1296+
const messages = page.slice(1)
1297+
const first = messages.at(0)
1298+
if (first) {
1299+
const url = new URL(c.req.url)
1300+
url.searchParams.set("limit", limit.toString())
1301+
url.searchParams.set("before", first.info.id)
1302+
c.header("Link", `<${url.toString()}>; rel="prev"`)
1303+
}
1304+
return c.json(messages)
1305+
}
1306+
1307+
return c.json(page)
12631308
},
12641309
)
12651310
.get(

0 commit comments

Comments
 (0)