Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions packages/appkit-ui/src/react/genie/genie-chat-message-list.tsx
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here are some P2 stuff from agentic review:

1. hasAttachments guard causes FAILED messages to leave a phantom placeholder
  use-genie-chat.ts:236 — In processStreamEvent, the condition last.id === "" && hasAttachments means if the API returns a message_result for a FAILED message
  (no attachments), the empty placeholder (id === "") is never replaced. The user is stuck with a phantom empty bubble. Fix: Remove the hasAttachments guard —
  when last.id === "", always replace the placeholder:
  if (last.id === "") {
    return [...prev.slice(0, -1), item];
  }

  2. No abort signal propagation in streamGetMessage polling loop
  client.ts:389-447 — The while(true) polling loop doesn't accept or check an AbortSignal. When the SSE client disconnects, the server keeps polling the
  Databricks API for up to 120s (~40 API calls). The executeStream framework passes a signal via handler(combinedSignal), but the genie.ts handler doesn't
  forward it to streamGetMessage. Fix: Accept signal?: AbortSignal in options, check signal?.aborted before each poll, and use signal-aware sleep.

  3. Double timeout: executeStream interceptor vs. polling loop deadline
  genie.ts:213-214 + client.ts:396-398 — Both the interceptor and the generator have independent 120s timeouts racing each other. If the interceptor fires first,
   the generator gets an unclean termination. Fix: Either remove the internal deadline from streamGetMessage (relying on the signal from the interceptor), or set
   the interceptor timeout slightly longer.

  4. pollPendingMessage Promise has no .catch()
  use-genie-chat.ts:402-425 — The connectSSE call is fire-and-forget with only .then(). If connectSSE rejects for an uncaught reason, the status stays stuck at
  "streaming" forever with an unhandled rejection. Fix: Add .catch() to set error state.

  5. useCallback dependency chain is fragile
  use-genie-chat.ts — The chain processStreamEvent → pollPendingMessage → loadHistory → useEffect is stable today only because processStreamEvent closes over
  stable props. Adding any state dependency (like conversationId) would cause the entire chain to collapse, triggering effect re-fires and history reloads. Fix:
  Store processStreamEvent in a ref to break the identity cascade, making pollPendingMessage and loadHistory stable.

pls double check if that makes sense and create a plan to fix it , if so 👍

Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ const STATUS_LABELS: Record<string, string> = {
COMPLETED: "Done",
};

const TERMINAL_STATUSES = new Set(["COMPLETED", "FAILED"]);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have duplication in two files for those statuses


function formatStatus(status: string): string {
return STATUS_LABELS[status] ?? status.replace(/_/g, " ").toLowerCase();
}
Expand Down Expand Up @@ -166,11 +168,18 @@ export function GenieChatMessageList({
const showStreamingIndicator =
status === "streaming" &&
lastMessage?.role === "assistant" &&
lastMessage.id === "";
!lastMessage.content &&
!TERMINAL_STATUSES.has(lastMessage.status);

return (
<ScrollArea ref={scrollRef} className={cn("flex-1 min-h-0 p-4", className)}>
<div className="flex flex-col gap-4">
<ScrollArea
ref={scrollRef}
className={cn(
"flex-1 min-h-0 p-4 [&_[data-slot=scroll-area-viewport]>div]:!block",
className,
)}
>
<div className="flex flex-col gap-4 min-w-0">
{hasPreviousPage && <div ref={sentinelRef} className="h-px" />}

{status === "loading-older" && (
Expand All @@ -192,7 +201,10 @@ export function GenieChatMessageList({

{messages
.filter(
(msg) => msg.role !== "assistant" || msg.id !== "" || msg.content,
(msg) =>
msg.role !== "assistant" ||
msg.content ||
(msg.id !== "" && TERMINAL_STATUSES.has(msg.status)),
)
.map((msg) => (
<GenieChatMessage key={msg.id} message={msg} />
Expand Down
16 changes: 6 additions & 10 deletions packages/appkit-ui/src/react/genie/genie-chat-message.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ import type { GenieAttachmentResponse, GenieMessageItem } from "./types";
marked.setOptions({ breaks: true, gfm: true });

const markdownStyles = cn(
"text-sm",
"text-sm break-words",
"[&_p]:my-1 [&_ul]:my-1 [&_ol]:my-1 [&_li]:my-0",
"[&_pre]:bg-background/50 [&_pre]:p-2 [&_pre]:rounded [&_pre]:text-xs [&_pre]:overflow-x-auto",
"[&_code]:text-xs [&_code]:bg-background/50 [&_code]:px-1 [&_code]:rounded",
"[&_table]:text-xs [&_th]:px-2 [&_th]:py-1 [&_td]:px-2 [&_td]:py-1",
"[&_table]:text-xs [&_table]:block [&_table]:overflow-x-auto [&_table]:max-w-full",
"[&_th]:px-2 [&_th]:py-1 [&_td]:px-2 [&_td]:py-1",
"[&_table]:border-collapse [&_th]:border [&_td]:border",
"[&_th]:border-border [&_td]:border-border",
"[&_a]:underline",
Expand Down Expand Up @@ -66,15 +67,10 @@ export function GenieChatMessage({
</AvatarFallback>
</Avatar>

<div
className={cn(
"flex flex-col gap-2 max-w-[80%] min-w-0 overflow-hidden",
isUser ? "items-end" : "items-start",
)}
>
<div className="flex flex-col gap-2 max-w-[80%] min-w-0 overflow-hidden">
<Card
className={cn(
"px-4 py-3 max-w-full overflow-hidden",
"w-full px-4 py-3 overflow-hidden",
isUser
? "bg-primary text-primary-foreground [&_*::selection]:bg-primary-foreground/30 [&::selection]:bg-primary-foreground/30"
: "bg-muted",
Expand Down Expand Up @@ -122,7 +118,7 @@ export function GenieChatMessage({
</details>
</Card>
{queryResult != null && (
<Card className="px-4 py-3 overflow-hidden">
<Card className="w-full px-4 py-3 overflow-hidden">
<GenieQueryVisualization data={queryResult} />
</Card>
)}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { GenieStatementResponse } from "shared";
import { BaseChart } from "../charts/base";
import { ChartErrorBoundary } from "../charts/chart-error-boundary";
import type { ChartType } from "../charts/types";
import { cn } from "../lib/utils";
import { Button } from "../ui/button";
import {
DropdownMenu,
Expand Down Expand Up @@ -118,11 +119,11 @@ export function GenieQueryVisualization({
);

if (!inference || !activeChartType) {
return <div className={className}>{dataTable}</div>;
return <div className={cn("min-w-0", className)}>{dataTable}</div>;
}

return (
<Tabs defaultValue="chart" className={className}>
<Tabs defaultValue="chart" className={cn("min-w-0", className)}>
<div className="flex items-center justify-between">
<TabsList>
<TabsTrigger value="chart">Chart</TabsTrigger>
Expand Down Expand Up @@ -157,7 +158,7 @@ export function GenieQueryVisualization({
</DropdownMenu>
)}
</div>
<div className="grid [&>*]:col-start-1 [&>*]:row-start-1">
<div className="grid min-w-0 [&>*]:col-start-1 [&>*]:row-start-1 [&>*]:min-w-0">
<TabsContent
value="chart"
forceMount
Expand Down
107 changes: 90 additions & 17 deletions packages/appkit-ui/src/react/genie/use-genie-chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,35 @@ function makeAssistantItem(msg: GenieMessageResponse): GenieMessageItem {
};
}

const TERMINAL_STATUSES = new Set(["COMPLETED", "FAILED"]);

/**
* The API bundles user question (content) and AI answer (attachments) in one message.
* Split into separate user + assistant items for display.
*
* When a message is still in-progress (non-terminal status) and has no
* attachments yet, we emit an empty assistant placeholder so the UI can
* show a loading indicator and later poll for the completed response.
*/
function messageResultToItems(msg: GenieMessageResponse): GenieMessageItem[] {
const hasAttachments = (msg.attachments?.length ?? 0) > 0;
if (!hasAttachments) return [makeUserItem(msg)];

if (!hasAttachments && TERMINAL_STATUSES.has(msg.status)) {
return [makeUserItem(msg)];
}
if (!hasAttachments) {
return [
makeUserItem(msg, "-user"),
{
id: msg.messageId,
role: "assistant",
content: "",
status: msg.status,
attachments: [],
queryResults: new Map(),
},
];
}
return [makeUserItem(msg, "-user"), makeAssistantItem(msg)];
}

Expand Down Expand Up @@ -202,19 +224,21 @@ export function useGenieChat(options: UseGenieChatOptions): UseGenieChatReturn {
const msg = event.message;
const hasAttachments = (msg.attachments?.length ?? 0) > 0;

if (hasAttachments) {
// During streaming we already appended the user message locally,
// so only handle assistant results. Messages without attachments
// are the user-message echo from the API — skip those.
const item = makeAssistantItem(msg);
setMessages((prev) => {
const last = prev[prev.length - 1];
if (last?.role === "assistant" && last.id === "") {
return [...prev.slice(0, -1), item];
}
return [...prev, item];
});
}
const item = makeAssistantItem(msg);
setMessages((prev) => {
const last = prev[prev.length - 1];
if (!last || last.role !== "assistant") return prev;

if (last.id === msg.messageId) {
return [...prev.slice(0, -1), item];
}

if (last.id === "" && hasAttachments) {
return [...prev.slice(0, -1), item];
}

return prev;
});
break;
}

Expand Down Expand Up @@ -362,6 +386,47 @@ export function useGenieChat(options: UseGenieChatOptions): UseGenieChatReturn {
[alias, basePath],
);

const pollPendingMessage = useCallback(
(
convId: string,
messageId: string,
parentAbortController: AbortController,
) => {
setStatus("streaming");

const requestId = crypto.randomUUID();
const url =
`${basePath}/${encodeURIComponent(alias)}/conversations/${encodeURIComponent(convId)}` +
`/messages/${encodeURIComponent(messageId)}?requestId=${encodeURIComponent(requestId)}`;

connectSSE({
url,
signal: parentAbortController.signal,
onMessage: async (message) => {
try {
processStreamEvent(JSON.parse(message.data) as GenieStreamEvent);
} catch {
// Malformed SSE data
}
},
onError: (err) => {
if (parentAbortController.signal.aborted) return;
setError(
err instanceof Error
? err.message
: "Failed to poll pending message.",
);
setStatus("error");
},
}).then(() => {
if (!parentAbortController.signal.aborted) {
setStatus((prev) => (prev === "error" ? "error" : "idle"));
}
});
},
[alias, basePath, processStreamEvent],
);

const loadHistory = useCallback(
(convId: string) => {
paginationAbortRef.current?.abort();
Expand All @@ -376,13 +441,21 @@ export function useGenieChat(options: UseGenieChatOptions): UseGenieChatReturn {
{ errorMessage: "Failed to load conversation history." },
);
promise.then((items) => {
if (!abortController.signal.aborted) {
setMessages(items);
if (abortController.signal.aborted) return;
setMessages(items);

const lastItem = items[items.length - 1];
if (
lastItem?.role === "assistant" &&
!TERMINAL_STATUSES.has(lastItem.status)
) {
pollPendingMessage(convId, lastItem.id, abortController);
} else {
setStatus((prev) => (prev === "error" ? "error" : "idle"));
}
});
},
[fetchPage],
[fetchPage, pollPendingMessage],
);

const fetchPreviousPage = useCallback(() => {
Expand Down
65 changes: 65 additions & 0 deletions packages/appkit/src/connectors/genie/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,71 @@ export class GenieConnector {
}
}

/**
* Polls a single message via `getMessage` until it reaches a terminal
* state (`COMPLETED` or `FAILED`). Yields the same event types as
* `streamSendMessage` so callers can reuse the same SSE processing logic.
*/
async *streamGetMessage(
workspaceClient: WorkspaceClient,
spaceId: string,
conversationId: string,
messageId: string,
options?: { timeout?: number; pollInterval?: number },
): AsyncGenerator<GenieStreamEvent> {
const timeout = options?.timeout ?? this.config.timeout;
const pollInterval = options?.pollInterval ?? 3_000;
const deadline =
timeout > 0 ? Date.now() + timeout : Number.POSITIVE_INFINITY;
let lastStatus = "";

try {
while (true) {
const message = await workspaceClient.genie.getMessage({
space_id: spaceId,
conversation_id: conversationId,
message_id: messageId,
});

if (message.status && message.status !== lastStatus) {
lastStatus = message.status;
yield { type: "status", status: message.status };
}

const isTerminal =
message.status === "COMPLETED" || message.status === "FAILED";
if (isTerminal) {
const messageResponse = toMessageResponse(message);
yield { type: "message_result", message: messageResponse };
yield* this.emitQueryResults(
workspaceClient,
spaceId,
conversationId,
messageId,
messageResponse,
);
return;
}

if (Date.now() >= deadline) {
yield { type: "error", error: "Message polling timed out" };
return;
}

await new Promise((r) => setTimeout(r, pollInterval));
}
} catch (error) {
logger.error(
"Genie getMessage poll error (spaceId=%s, conversationId=%s, messageId=%s): %O",
spaceId,
conversationId,
messageId,
error,
);
yield { type: "error", error: classifyGenieError(error) };
}
}

async sendMessage(
workspaceClient: WorkspaceClient,
spaceId: string,
Expand Down
Loading
Loading