Skip to content

Commit f0131ec

Browse files
committed
fix: address race conditions and improve pagination robustness
Frontend fixes: - Fix stale closure race condition in auto-load effect by capturing messageSearch reference and validating before state update - Add proper cleanup for loadMore operations on component unmount - Fix circular effect dependency in page reset by using ref instead of state in dependency array - Add retry limit (max 3 attempts) for loadMore failures to prevent infinite retry loops on network errors - Track mounted state to prevent state updates on unmounted component Backend fix: - Change pagination logging from INFO to DEBUG level to reduce log verbosity in production
1 parent 4956dff commit f0131ec

File tree

2 files changed

+67
-21
lines changed

2 files changed

+67
-21
lines changed

backend/pkg/console/list_messages.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ type TopicConsumeRequest struct {
144144
// 5. Start consume request via the Kafka Service
145145
// 6. Send a completion message to the frontend, that will show stats about the completed (or aborted) message search
146146
//
147-
//nolint:cyclop // complex logic
147+
//nolint:cyclop,gocognit // complex logic with multiple code paths for pagination vs legacy mode
148148
func (s *Service) ListMessages(ctx context.Context, listReq ListMessageRequest, progress IListMessagesProgress) error {
149149
cl, adminCl, err := s.kafkaClientFactory.GetKafkaClient(ctx)
150150
if err != nil {
@@ -491,6 +491,8 @@ func (s *Service) calculateConsumeRequests(
491491

492492
// calculateConsumeRequestsWithPageToken calculates consume requests for pagination mode (descending order).
493493
// It returns the consume requests map, the next page token, and whether more pages are available.
494+
//
495+
//nolint:gocognit,cyclop // complex logic for round-robin distribution across partitions with watermark tracking
494496
func (s *Service) calculateConsumeRequestsWithPageToken(
495497
ctx context.Context,
496498
token *PageToken,
@@ -718,14 +720,14 @@ func (s *Service) calculateConsumeRequestsWithPageToken(
718720

719721
// Log detailed partition information for debugging
720722
for partID, req := range requests {
721-
s.logger.InfoContext(ctx, "partition consume request",
723+
s.logger.DebugContext(ctx, "partition consume request",
722724
slog.Int("partition_id", int(partID)),
723725
slog.Int64("start_offset", req.StartOffset),
724726
slog.Int64("end_offset", req.EndOffset),
725727
slog.Int64("max_message_count", req.MaxMessageCount))
726728
}
727729

728-
s.logger.InfoContext(ctx, "pagination distribution complete",
730+
s.logger.DebugContext(ctx, "pagination distribution complete",
729731
slog.Int("page_size", token.PageSize),
730732
slog.Int64("total_assigned", totalAssigned),
731733
slog.Int("num_partitions", len(requests)))

frontend/src/components/pages/topics/Tab.Messages/index.tsx

Lines changed: 62 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,13 @@ export const TopicMessageView: FC<TopicMessageViewProps> = (props) => {
479479
const prevPageIndexRef = useRef<number>(pageIndex);
480480
const [forceRefresh, setForceRefresh] = useState(0);
481481

482+
// Refs for tracking loadMore state to prevent stale closures and memory leaks
483+
const currentMessageSearchRef = useRef<ReturnType<typeof createMessageSearch> | null>(null);
484+
const loadMoreAbortRef = useRef<AbortController | null>(null);
485+
const [loadMoreFailures, setLoadMoreFailures] = useState(0);
486+
const isMountedRef = useRef(true);
487+
const MAX_LOAD_MORE_RETRIES = 3;
488+
482489
// Filter messages based on quick search
483490
const filteredMessages = quickSearch
484491
? messages.filter((m) => {
@@ -497,16 +504,25 @@ export const TopicMessageView: FC<TopicMessageViewProps> = (props) => {
497504
[topicSettings?.previewTags]
498505
);
499506

507+
// Keep currentMessageSearchRef in sync with messageSearch state
508+
useEffect(() => {
509+
currentMessageSearchRef.current = messageSearch;
510+
}, [messageSearch]);
511+
500512
// Cleanup effect (replaces componentWillUnmount)
501-
useEffect(
502-
() => () => {
513+
useEffect(() => {
514+
isMountedRef.current = true;
515+
return () => {
516+
isMountedRef.current = false;
503517
if (abortControllerRef.current) {
504518
abortControllerRef.current.abort();
505519
}
520+
if (loadMoreAbortRef.current) {
521+
loadMoreAbortRef.current.abort();
522+
}
506523
appGlobal.searchMessagesFunc = undefined;
507-
},
508-
[]
509-
);
524+
};
525+
}, []);
510526

511527
// Clear sorting when entering unlimited pagination mode
512528
useEffect(() => {
@@ -728,7 +744,8 @@ export const TopicMessageView: FC<TopicMessageViewProps> = (props) => {
728744
!messageSearch ||
729745
!messageSearch.nextPageToken ||
730746
isLoadingMore ||
731-
searchPhase
747+
searchPhase ||
748+
loadMoreFailures >= MAX_LOAD_MORE_RETRIES
732749
) {
733750
return;
734751
}
@@ -737,23 +754,44 @@ export const TopicMessageView: FC<TopicMessageViewProps> = (props) => {
737754
const isOnLastPage = pageIndex >= totalLoadedPages - 1;
738755

739756
if (isOnLastPage && messageSearch.nextPageToken) {
757+
// Create abort controller for this loadMore operation
758+
const abortController = new AbortController();
759+
loadMoreAbortRef.current = abortController;
760+
761+
// Capture the current messageSearch reference to detect stale responses
762+
const capturedMessageSearch = messageSearch;
763+
740764
setIsLoadingMore(true);
741-
messageSearch
765+
capturedMessageSearch
742766
.loadMore()
743767
.then(() => {
744-
setMessages([...messageSearch.messages]);
768+
// Only update state if component is still mounted and this is still the current search
769+
if (isMountedRef.current && currentMessageSearchRef.current === capturedMessageSearch) {
770+
setMessages([...capturedMessageSearch.messages]);
771+
// Reset failure count on success
772+
setLoadMoreFailures(0);
773+
}
745774
})
746775
.catch((err) => {
747-
toast({
748-
title: 'Failed to load more messages',
749-
description: (err as Error).message,
750-
status: 'error',
751-
duration: 5000,
752-
isClosable: true,
753-
});
776+
// Only show error if component is still mounted and not aborted
777+
if (isMountedRef.current && !abortController.signal.aborted) {
778+
setLoadMoreFailures((prev) => prev + 1);
779+
toast({
780+
title: 'Failed to load more messages',
781+
description: (err as Error).message,
782+
status: 'error',
783+
duration: 5000,
784+
isClosable: true,
785+
});
786+
}
754787
})
755788
.finally(() => {
756-
setIsLoadingMore(false);
789+
if (isMountedRef.current) {
790+
setIsLoadingMore(false);
791+
}
792+
if (loadMoreAbortRef.current === abortController) {
793+
loadMoreAbortRef.current = null;
794+
}
757795
});
758796
}
759797
}, [
@@ -766,22 +804,28 @@ export const TopicMessageView: FC<TopicMessageViewProps> = (props) => {
766804
messages.length,
767805
pageSize,
768806
toast,
807+
loadMoreFailures,
769808
]);
770809

771810
// Reset pagination when navigating back to page 1 in unlimited mode
772811
// This prevents keeping many pages in memory and triggering excessive requests
773812
useEffect(() => {
774813
// Check if we're in unlimited mode and user navigated back to page 1 from a higher page
775-
if (maxResults === -1 && pageIndex === 0 && prevPageIndexRef.current > 1 && messageSearch) {
814+
// Use ref to check messageSearch existence to avoid circular dependency
815+
if (maxResults === -1 && pageIndex === 0 && prevPageIndexRef.current > 1 && currentMessageSearchRef.current) {
776816
// Clear the message search and state
777817
setMessages([]);
778818
setMessageSearch(null);
819+
// Reset failure count when resetting pagination
820+
setLoadMoreFailures(0);
779821
// Clear the search run ref and trigger a forced refresh
780822
currentSearchRunRef.current = null;
781823
setForceRefresh((prev) => prev + 1);
782824
}
783825
prevPageIndexRef.current = pageIndex;
784-
}, [pageIndex, maxResults, messageSearch]);
826+
// Note: messageSearch intentionally excluded to avoid circular dependency
827+
// We use currentMessageSearchRef.current instead which is always in sync
828+
}, [pageIndex, maxResults]);
785829

786830
// Message Table rendering variables and functions
787831
const paginationParams = {

0 commit comments

Comments
 (0)