Skip to content

Commit 4f8208a

Browse files
committed
fix: wire AbortSignal through message search and clear messages on new search
1 parent 59f351f commit 4f8208a

File tree

2 files changed

+92
-96
lines changed

2 files changed

+92
-96
lines changed

frontend/src/components/pages/topics/Tab.Messages/index.tsx

Lines changed: 78 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -485,93 +485,96 @@ export const TopicMessageView: FC<TopicMessageViewProps> = (props) => {
485485

486486
// Convert executeMessageSearch to useCallback
487487
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: complex business logic
488-
const executeMessageSearch = useCallback(async (): Promise<TopicMessage[]> => {
489-
const canUseFilters =
490-
(api.topicPermissions.get(props.topic.topicName)?.canUseSearchFilters ?? true) && !isServerless();
488+
const executeMessageSearch = useCallback(
489+
async (abortSignal?: AbortSignal): Promise<TopicMessage[]> => {
490+
const canUseFilters =
491+
(api.topicPermissions.get(props.topic.topicName)?.canUseSearchFilters ?? true) && !isServerless();
491492

492-
// Get current search params from Zustand store for filters
493-
const currentSearchParams = getSearchParams(props.topic.topicName);
493+
// Get current search params from Zustand store for filters
494+
const currentSearchParams = getSearchParams(props.topic.topicName);
494495

495-
let filterCode = '';
496-
if (canUseFilters) {
497-
const functionNames: string[] = [];
498-
const functions: string[] = [];
496+
let filterCode = '';
497+
if (canUseFilters) {
498+
const functionNames: string[] = [];
499+
const functions: string[] = [];
499500

500-
// Use filters from URL state instead of localStorage
501-
const filteredSearchParams = filters.filter(
502-
(searchParam) => searchParam.isActive && searchParam.code && searchParam.transpiledCode
503-
);
501+
// Use filters from URL state instead of localStorage
502+
const filteredSearchParams = filters.filter(
503+
(searchParam) => searchParam.isActive && searchParam.code && searchParam.transpiledCode
504+
);
504505

505-
for (const searchParam of filteredSearchParams) {
506-
const name = `filter${functionNames.length + 1}`;
507-
functionNames.push(name);
508-
functions.push(`function ${name}() {
506+
for (const searchParam of filteredSearchParams) {
507+
const name = `filter${functionNames.length + 1}`;
508+
functionNames.push(name);
509+
functions.push(`function ${name}() {
509510
${wrapFilterFragment(searchParam.transpiledCode)}
510511
}`);
511-
}
512+
}
512513

513-
if (functions.length > 0) {
514-
filterCode = `${functions.join('\n\n')}\n\nreturn ${functionNames.map((f) => `${f}()`).join(' && ')}`;
515-
if (IsDev) {
516-
// biome-ignore lint/suspicious/noConsole: intentional console usage
517-
console.log(`constructed filter code (${functions.length} functions)`, `\n\n${filterCode}`);
514+
if (functions.length > 0) {
515+
filterCode = `${functions.join('\n\n')}\n\nreturn ${functionNames.map((f) => `${f}()`).join(' && ')}`;
516+
if (IsDev) {
517+
// biome-ignore lint/suspicious/noConsole: intentional console usage
518+
console.log(`constructed filter code (${functions.length} functions)`, `\n\n${filterCode}`);
519+
}
518520
}
519521
}
520-
}
521522

522-
const request = {
523-
topicName: props.topic.topicName,
524-
partitionId: partitionID,
525-
startOffset,
526-
startTimestamp: currentSearchParams?.startTimestamp ?? uiState.topicSettings.searchParams.startTimestamp,
527-
maxResults,
528-
filterInterpreterCode: encodeBase64(sanitizeString(filterCode)),
529-
includeRawPayload: true,
530-
keyDeserializer,
531-
valueDeserializer,
532-
} as MessageSearchRequest;
523+
const request = {
524+
topicName: props.topic.topicName,
525+
partitionId: partitionID,
526+
startOffset,
527+
startTimestamp: currentSearchParams?.startTimestamp ?? uiState.topicSettings.searchParams.startTimestamp,
528+
maxResults,
529+
filterInterpreterCode: encodeBase64(sanitizeString(filterCode)),
530+
includeRawPayload: true,
531+
keyDeserializer,
532+
valueDeserializer,
533+
} as MessageSearchRequest;
533534

534-
try {
535-
setFetchError(null);
536-
setSearchPhase('Searching...');
535+
try {
536+
setFetchError(null);
537+
setSearchPhase('Searching...');
538+
539+
const messageSearch = createMessageSearch();
540+
const startTime = Date.now();
541+
542+
const result = await messageSearch.startSearch(request, abortSignal).catch((err: Error) => {
543+
const msg = err.message ?? String(err);
544+
// biome-ignore lint/suspicious/noConsole: intentional console usage
545+
console.error(`error in searchTopicMessages: ${msg}`);
546+
setFetchError(err);
547+
setSearchPhase(null);
548+
return [];
549+
});
537550

538-
const messageSearch = createMessageSearch();
539-
const startTime = Date.now();
551+
const endTime = Date.now();
552+
setMessages(result);
553+
setSearchPhase(null);
554+
setElapsedMs(endTime - startTime);
555+
setBytesConsumed(messageSearch.bytesConsumed);
556+
setTotalMessagesConsumed(messageSearch.totalMessagesConsumed);
540557

541-
const result = await messageSearch.startSearch(request).catch((err: Error) => {
542-
const msg = err.message ?? String(err);
558+
return result;
559+
} catch (error: unknown) {
543560
// biome-ignore lint/suspicious/noConsole: intentional console usage
544-
console.error(`error in searchTopicMessages: ${msg}`);
545-
setFetchError(err);
561+
console.error(`error in searchTopicMessages: ${(error as Error).message ?? String(error)}`);
562+
setFetchError(error as Error);
546563
setSearchPhase(null);
547564
return [];
548-
});
549-
550-
const endTime = Date.now();
551-
setMessages(result);
552-
setSearchPhase(null);
553-
setElapsedMs(endTime - startTime);
554-
setBytesConsumed(messageSearch.bytesConsumed);
555-
setTotalMessagesConsumed(messageSearch.totalMessagesConsumed);
556-
557-
return result;
558-
} catch (error: unknown) {
559-
// biome-ignore lint/suspicious/noConsole: intentional console usage
560-
console.error(`error in searchTopicMessages: ${(error as Error).message ?? String(error)}`);
561-
setFetchError(error as Error);
562-
setSearchPhase(null);
563-
return [];
564-
}
565-
}, [
566-
props.topic.topicName,
567-
partitionID,
568-
startOffset,
569-
maxResults,
570-
getSearchParams,
571-
keyDeserializer,
572-
valueDeserializer,
573-
filters,
574-
]);
565+
}
566+
},
567+
[
568+
props.topic.topicName,
569+
partitionID,
570+
startOffset,
571+
maxResults,
572+
getSearchParams,
573+
keyDeserializer,
574+
valueDeserializer,
575+
filters,
576+
]
577+
);
575578

576579
// Convert searchFunc to useCallback
577580
const searchFunc = useCallback(
@@ -582,12 +585,6 @@ export const TopicMessageView: FC<TopicMessageViewProps> = (props) => {
582585
const searchParams = `${startOffset} ${maxResults} ${partitionID} ${currentSearchParams?.startTimestamp ?? uiState.topicSettings.searchParams.startTimestamp} ${keyDeserializer} ${valueDeserializer} ${filtersSignature}`;
583586

584587
if (searchParams === currentSearchRunRef.current && source === 'auto') {
585-
// biome-ignore lint/suspicious/noConsole: intentional console usage
586-
console.log('ignoring search, search params are up to date, and source is auto', {
587-
newParams: searchParams,
588-
oldParams: currentSearchRunRef.current,
589-
trigger: source,
590-
});
591588
return;
592589
}
593590

@@ -597,19 +594,15 @@ export const TopicMessageView: FC<TopicMessageViewProps> = (props) => {
597594
abortControllerRef.current = null;
598595
}
599596

600-
// biome-ignore lint/suspicious/noConsole: intentional console usage
601-
console.log('starting a new message search', {
602-
newParams: searchParams,
603-
oldParams: currentSearchRunRef.current,
604-
trigger: source,
605-
});
606-
607597
// Start new search
608598
currentSearchRunRef.current = searchParams;
609599
abortControllerRef.current = new AbortController();
610600

601+
// Clear messages immediately when starting new search
602+
setMessages([]);
603+
611604
try {
612-
executeMessageSearch()
605+
executeMessageSearch(abortControllerRef.current?.signal)
613606
// biome-ignore lint/suspicious/noConsole: intentional console usage
614607
.catch(console.error)
615608
.finally(() => {

frontend/src/state/backend-api.ts

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import { create, type Registry } from '@bufbuild/protobuf';
1515
import type { ConnectError } from '@connectrpc/connect';
1616
import { Code } from '@connectrpc/connect';
17+
import { createLinkedAbortController } from '@connectrpc/connect/protocol';
1718
import { createStandaloneToast, redpandaTheme, redpandaToastOptions } from '@redpanda-data/ui';
1819
import {
1920
consoleHasEnterpriseFeature,
@@ -2765,7 +2766,7 @@ export function createMessageSearch() {
27652766
messages: observable([] as TopicMessage[], { deep: false }),
27662767

27672768
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: complexity 64, refactor later
2768-
async startSearch(_searchRequest: MessageSearchRequest): Promise<TopicMessage[]> {
2769+
async startSearch(_searchRequest: MessageSearchRequest, externalSignal?: AbortSignal): Promise<TopicMessage[]> {
27692770
// https://connectrpc.com/docs/web/using-clients
27702771
// https://github.com/connectrpc/connect-es
27712772
// https://github.com/connectrpc/examples-es
@@ -2800,8 +2801,11 @@ export function createMessageSearch() {
28002801
this.messages.length = 0;
28012802
this.elapsedMs = null;
28022803

2803-
const messageSearchAbortController = new AbortController();
2804-
this.abortController = messageSearchAbortController;
2804+
// Links external signal (component control) with internal controller - both can abort the same stream
2805+
// Ensures cleanup works from component unmount AND direct stopSearch() calls
2806+
const linkedController = createLinkedAbortController(externalSignal);
2807+
this.abortController = linkedController;
2808+
const abortSignal = linkedController.signal;
28052809

28062810
// do it
28072811
const req = create(ListMessagesRequestSchema);
@@ -2826,23 +2830,19 @@ export function createMessageSearch() {
28262830

28272831
try {
28282832
for await (const res of client.listMessages(req, {
2829-
signal: messageSearchAbortController.signal,
2833+
signal: abortSignal,
28302834
timeoutMs,
28312835
})) {
2832-
if (messageSearchAbortController.signal.aborted) {
2836+
if (abortSignal.aborted) {
28332837
break;
28342838
}
28352839

28362840
try {
28372841
switch (res.controlMessage.case) {
28382842
case 'phase':
2839-
// biome-ignore lint/suspicious/noConsole: intentional console usage
2840-
console.log(`phase: ${res.controlMessage.value.phase}`);
28412843
this.searchPhase = res.controlMessage.value.phase;
28422844
break;
28432845
case 'progress':
2844-
// biome-ignore lint/suspicious/noConsole: intentional console usage
2845-
console.log(`progress: ${res.controlMessage.value.messagesConsumed}`);
28462846
this.bytesConsumed = Number(res.controlMessage.value.bytesConsumed);
28472847
this.totalMessagesConsumed = Number(res.controlMessage.value.messagesConsumed);
28482848
break;
@@ -3094,19 +3094,22 @@ export function createMessageSearch() {
30943094
}
30953095
}
30963096
} catch (e) {
3097-
this.abortController = null;
30983097
this.searchPhase = 'Done';
30993098
this.bytesConsumed = 0;
31003099
this.totalMessagesConsumed = 0;
31013100
this.searchPhase = null;
31023101
// https://connectrpc.com/docs/web/errors
3103-
if (messageSearchAbortController.signal.aborted) {
3102+
if (abortSignal.aborted) {
31043103
// Do not throw, this is a user cancellation
31053104
} else {
31063105
// biome-ignore lint/suspicious/noConsole: intentional console usage
31073106
console.error('startMessageSearchNew: error in await loop of client.listMessages', { error: e });
31083107
throw e;
31093108
}
3109+
} finally {
3110+
// Always cleanup the abort controller reference to prevent memory leaks
3111+
// This ensures cleanup happens whether the stream completes, errors, or is cancelled
3112+
this.abortController = null;
31103113
}
31113114

31123115
// one done

0 commit comments

Comments
 (0)