diff --git a/packages/appkit-ui/src/react/genie/genie-chat-message-list.tsx b/packages/appkit-ui/src/react/genie/genie-chat-message-list.tsx index e741bc29..8af90f5c 100644 --- a/packages/appkit-ui/src/react/genie/genie-chat-message-list.tsx +++ b/packages/appkit-ui/src/react/genie/genie-chat-message-list.tsx @@ -4,7 +4,11 @@ import { ScrollArea } from "../ui/scroll-area"; import { Skeleton } from "../ui/skeleton"; import { Spinner } from "../ui/spinner"; import { GenieChatMessage } from "./genie-chat-message"; -import type { GenieChatStatus, GenieMessageItem } from "./types"; +import { + type GenieChatStatus, + type GenieMessageItem, + TERMINAL_STATUSES, +} from "./types"; interface GenieChatMessageListProps { /** Array of messages to display */ @@ -166,11 +170,18 @@ export function GenieChatMessageList({ const showStreamingIndicator = status === "streaming" && lastMessage?.role === "assistant" && - lastMessage.id === ""; + !lastMessage.content && + !TERMINAL_STATUSES.has(lastMessage.status); return ( - -
+ div]:!block", + className, + )} + > +
{hasPreviousPage &&
} {status === "loading-older" && ( @@ -192,7 +203,10 @@ export function GenieChatMessageList({ {messages .filter( - (msg) => msg.role !== "assistant" || msg.id !== "" || msg.content, + (msg) => + msg.role !== "assistant" || + msg.content || + (msg.id !== "" && TERMINAL_STATUSES.has(msg.status)), ) .map((msg) => ( diff --git a/packages/appkit-ui/src/react/genie/genie-chat-message.tsx b/packages/appkit-ui/src/react/genie/genie-chat-message.tsx index 507cce71..d3b87b0c 100644 --- a/packages/appkit-ui/src/react/genie/genie-chat-message.tsx +++ b/packages/appkit-ui/src/react/genie/genie-chat-message.tsx @@ -15,11 +15,12 @@ import type { GenieAttachmentResponse, GenieMessageItem } from "./types"; marked.setOptions({ breaks: true, gfm: true }); const markdownStyles = cn( - "text-sm", + "text-sm break-words", "[&_p]:my-1 [&_ul]:my-1 [&_ol]:my-1 [&_li]:my-0", "[&_pre]:bg-background/50 [&_pre]:p-2 [&_pre]:rounded [&_pre]:text-xs [&_pre]:overflow-x-auto", "[&_code]:text-xs [&_code]:bg-background/50 [&_code]:px-1 [&_code]:rounded", - "[&_table]:text-xs [&_th]:px-2 [&_th]:py-1 [&_td]:px-2 [&_td]:py-1", + "[&_table]:text-xs [&_table]:block [&_table]:overflow-x-auto [&_table]:max-w-full", + "[&_th]:px-2 [&_th]:py-1 [&_td]:px-2 [&_td]:py-1", "[&_table]:border-collapse [&_th]:border [&_td]:border", "[&_th]:border-border [&_td]:border-border", "[&_a]:underline", @@ -70,15 +71,10 @@ export function GenieChatMessage({ -
+
{queryResult != null && ( - + )} diff --git a/packages/appkit-ui/src/react/genie/genie-query-visualization.tsx b/packages/appkit-ui/src/react/genie/genie-query-visualization.tsx index 266a88e4..f0227f79 100644 --- a/packages/appkit-ui/src/react/genie/genie-query-visualization.tsx +++ b/packages/appkit-ui/src/react/genie/genie-query-visualization.tsx @@ -4,6 +4,7 @@ import type { GenieStatementResponse } from "shared"; import { BaseChart } from "../charts/base"; import { ChartErrorBoundary } from "../charts/chart-error-boundary"; import type { ChartType } from "../charts/types"; +import { cn } from "../lib/utils"; import { Button } from "../ui/button"; import { DropdownMenu, @@ -118,11 +119,11 @@ export function GenieQueryVisualization({ ); if (!inference || !activeChartType) { - return
{dataTable}
; + return
{dataTable}
; } return ( - +
Chart @@ -157,7 +158,7 @@ export function GenieQueryVisualization({ )}
-
+
0; - if (!hasAttachments) return [makeUserItem(msg)]; + + if (!hasAttachments && TERMINAL_STATUSES.has(msg.status)) { + return [makeUserItem(msg)]; + } + if (!hasAttachments) { + return [ + makeUserItem(msg, "-user"), + { + id: msg.messageId, + role: "assistant", + content: "", + status: msg.status, + attachments: [], + queryResults: new Map(), + }, + ]; + } return [makeUserItem(msg, "-user"), makeAssistantItem(msg)]; } @@ -169,6 +190,9 @@ export function useGenieChat(options: UseGenieChatOptions): UseGenieChatReturn { const conversationIdRef = useRef(null); const nextPageTokenRef = useRef(null); const isLoadingOlderRef = useRef(false); + const processStreamEventRef = useRef<(event: GenieStreamEvent) => void>( + () => {}, + ); useEffect(() => { conversationIdRef.current = conversationId; @@ -199,22 +223,17 @@ export function useGenieChat(options: UseGenieChatOptions): UseGenieChatReturn { } case "message_result": { - const msg = event.message; - const hasAttachments = (msg.attachments?.length ?? 0) > 0; - - if (hasAttachments) { - // During streaming we already appended the user message locally, - // so only handle assistant results. Messages without attachments - // are the user-message echo from the API — skip those. - const item = makeAssistantItem(msg); - setMessages((prev) => { - const last = prev[prev.length - 1]; - if (last?.role === "assistant" && last.id === "") { - return [...prev.slice(0, -1), item]; - } - return [...prev, item]; - }); - } + const item = makeAssistantItem(event.message); + setMessages((prev) => { + const last = prev[prev.length - 1]; + if (!last || last.role !== "assistant") return prev; + + if (last.id === event.message.messageId || last.id === "") { + return [...prev.slice(0, -1), item]; + } + + return prev; + }); break; } @@ -254,6 +273,8 @@ export function useGenieChat(options: UseGenieChatOptions): UseGenieChatReturn { [persistInUrl, urlParamName], ); + processStreamEventRef.current = processStreamEvent; + const sendMessage = useCallback( (content: string) => { const trimmed = content.trim(); @@ -298,7 +319,9 @@ export function useGenieChat(options: UseGenieChatOptions): UseGenieChatReturn { signal: abortController.signal, onMessage: async (message) => { try { - processStreamEvent(JSON.parse(message.data) as GenieStreamEvent); + processStreamEventRef.current( + JSON.parse(message.data) as GenieStreamEvent, + ); } catch { // Malformed SSE data } @@ -318,13 +341,19 @@ export function useGenieChat(options: UseGenieChatOptions): UseGenieChatReturn { : prev; }); }, - }).then(() => { - if (!abortController.signal.aborted) { - setStatus((prev) => (prev === "error" ? "error" : "idle")); - } - }); + }) + .then(() => { + if (!abortController.signal.aborted) { + setStatus((prev) => (prev === "error" ? "error" : "idle")); + } + }) + .catch(() => { + if (abortController.signal.aborted) return; + setError("Connection error. Please try again."); + setStatus("error"); + }); }, - [alias, basePath, processStreamEvent], + [alias, basePath], ); /** Creates an AbortController, stores it in the given ref, and fetches a conversation page. */ @@ -362,6 +391,55 @@ export function useGenieChat(options: UseGenieChatOptions): UseGenieChatReturn { [alias, basePath], ); + const pollPendingMessage = useCallback( + ( + convId: string, + messageId: string, + parentAbortController: AbortController, + ) => { + setStatus("streaming"); + + const requestId = crypto.randomUUID(); + const url = + `${basePath}/${encodeURIComponent(alias)}/conversations/${encodeURIComponent(convId)}` + + `/messages/${encodeURIComponent(messageId)}?requestId=${encodeURIComponent(requestId)}`; + + connectSSE({ + url, + signal: parentAbortController.signal, + onMessage: async (message) => { + try { + processStreamEventRef.current( + JSON.parse(message.data) as GenieStreamEvent, + ); + } catch { + // Malformed SSE data + } + }, + onError: (err) => { + if (parentAbortController.signal.aborted) return; + setError( + err instanceof Error + ? err.message + : "Failed to poll pending message.", + ); + setStatus("error"); + }, + }) + .then(() => { + if (!parentAbortController.signal.aborted) { + setStatus((prev) => (prev === "error" ? "error" : "idle")); + } + }) + .catch(() => { + if (parentAbortController.signal.aborted) return; + setError("Failed to poll pending message."); + setStatus("error"); + }); + }, + [alias, basePath], + ); + const loadHistory = useCallback( (convId: string) => { paginationAbortRef.current?.abort(); @@ -376,13 +454,21 @@ export function useGenieChat(options: UseGenieChatOptions): UseGenieChatReturn { { errorMessage: "Failed to load conversation history." }, ); promise.then((items) => { - if (!abortController.signal.aborted) { - setMessages(items); + if (abortController.signal.aborted) return; + setMessages(items); + + const lastItem = items[items.length - 1]; + if ( + lastItem?.role === "assistant" && + !TERMINAL_STATUSES.has(lastItem.status) + ) { + pollPendingMessage(convId, lastItem.id, abortController); + } else { setStatus((prev) => (prev === "error" ? "error" : "idle")); } }); }, - [fetchPage], + [fetchPage, pollPendingMessage], ); const fetchPreviousPage = useCallback(() => { diff --git a/packages/appkit/src/connectors/genie/client.ts b/packages/appkit/src/connectors/genie/client.ts index 280c56e4..5be91b6e 100644 --- a/packages/appkit/src/connectors/genie/client.ts +++ b/packages/appkit/src/connectors/genie/client.ts @@ -183,7 +183,7 @@ export class GenieConnector { spaceId: string, content: string, conversationId: string | undefined, - options?: { timeout?: number }, + options?: { timeout?: number; signal?: AbortSignal }, ): AsyncGenerator { try { const { @@ -289,6 +289,7 @@ export class GenieConnector { includeQueryResults?: boolean; pageSize?: number; pageToken?: string; + signal?: AbortSignal; }, ): AsyncGenerator { const includeQueryResults = options?.includeQueryResults !== false; @@ -381,6 +382,77 @@ export class GenieConnector { } } + /** + * Polls a single message via `getMessage` until it reaches a terminal + * state (`COMPLETED` or `FAILED`). Yields the same event types as + * `streamSendMessage` so callers can reuse the same SSE processing logic. + */ + async *streamGetMessage( + workspaceClient: WorkspaceClient, + spaceId: string, + conversationId: string, + messageId: string, + options?: { timeout?: number; pollInterval?: number; signal?: AbortSignal }, + ): AsyncGenerator { + const pollInterval = options?.pollInterval ?? 3_000; + const signal = options?.signal; + let lastStatus = ""; + + try { + while (true) { + if (signal?.aborted) return; + + const message = await workspaceClient.genie.getMessage({ + space_id: spaceId, + conversation_id: conversationId, + message_id: messageId, + }); + + if (message.status && message.status !== lastStatus) { + lastStatus = message.status; + yield { type: "status", status: message.status }; + } + + const isTerminal = + message.status === "COMPLETED" || message.status === "FAILED"; + if (isTerminal) { + const messageResponse = toMessageResponse(message); + yield { type: "message_result", message: messageResponse }; + yield* this.emitQueryResults( + workspaceClient, + spaceId, + conversationId, + messageId, + messageResponse, + ); + return; + } + + await new Promise((resolve) => { + const timer = setTimeout(resolve, pollInterval); + signal?.addEventListener( + "abort", + () => { + clearTimeout(timer); + resolve(); + }, + { once: true }, + ); + }); + } + } catch (error) { + if (signal?.aborted) return; + logger.error( + "Genie getMessage poll error (spaceId=%s, conversationId=%s, messageId=%s): %O", + spaceId, + conversationId, + messageId, + error, + ); + yield { type: "error", error: classifyGenieError(error) }; + } + } + async sendMessage( workspaceClient: WorkspaceClient, spaceId: string, diff --git a/packages/appkit/src/plugins/genie/genie.ts b/packages/appkit/src/plugins/genie/genie.ts index 2ca348b4..712aadbf 100644 --- a/packages/appkit/src/plugins/genie/genie.ts +++ b/packages/appkit/src/plugins/genie/genie.ts @@ -65,6 +65,15 @@ export class GeniePlugin extends Plugin { await this.asUser(req)._handleGetConversation(req, res); }, }); + + this.route(router, { + name: "getMessage", + method: "get", + path: "/:alias/conversations/:conversationId/messages/:messageId", + handler: async (req: express.Request, res: express.Response) => { + await this.asUser(req)._handleGetMessage(req, res); + }, + }); } async _handleSendMessage( @@ -114,13 +123,13 @@ export class GeniePlugin extends Plugin { await this.executeStream( res, - () => + (signal) => this.genieConnector.streamSendMessage( workspaceClient, spaceId, content, conversationId, - { timeout }, + { timeout, signal }, ), streamSettings, ); @@ -166,12 +175,65 @@ export class GeniePlugin extends Plugin { await this.executeStream( res, - () => + (signal) => this.genieConnector.streamConversation( workspaceClient, spaceId, conversationId, - { includeQueryResults, pageToken }, + { includeQueryResults, pageToken, signal }, + ), + streamSettings, + ); + } + + async _handleGetMessage( + req: express.Request, + res: express.Response, + ): Promise { + const { alias, conversationId, messageId } = req.params; + const spaceId = this.resolveSpaceId(alias); + + if (!spaceId) { + res.status(404).json({ error: `Unknown space alias: ${alias}` }); + return; + } + + const requestId = + (typeof req.query.requestId === "string" && req.query.requestId) || + randomUUID(); + + logger.debug( + "Polling message %s in conversation %s from space %s (alias=%s)", + messageId, + conversationId, + spaceId, + alias, + ); + + const timeout = this.config.timeout ?? 120_000; + const streamSettings: StreamExecutionSettings = { + ...genieStreamDefaults, + default: { + ...genieStreamDefaults.default, + timeout, + }, + stream: { + ...genieStreamDefaults.stream, + streamId: requestId, + }, + }; + + const workspaceClient = getWorkspaceClient(); + + await this.executeStream( + res, + (signal) => + this.genieConnector.streamGetMessage( + workspaceClient, + spaceId, + conversationId, + messageId, + { timeout, signal }, ), streamSettings, ); diff --git a/packages/appkit/src/plugins/genie/tests/genie.test.ts b/packages/appkit/src/plugins/genie/tests/genie.test.ts index 37bcb0e6..3cf0784d 100644 --- a/packages/appkit/src/plugins/genie/tests/genie.test.ts +++ b/packages/appkit/src/plugins/genie/tests/genie.test.ts @@ -175,11 +175,15 @@ describe("Genie Plugin", () => { expect.any(Function), ); - expect(router.get).toHaveBeenCalledTimes(1); + expect(router.get).toHaveBeenCalledTimes(2); expect(router.get).toHaveBeenCalledWith( "/:alias/conversations/:conversationId", expect.any(Function), ); + expect(router.get).toHaveBeenCalledWith( + "/:alias/conversations/:conversationId/messages/:messageId", + expect.any(Function), + ); }); });