Skip to content

Commit ef4921d

Browse files
committed
fix: address race conditions and improve pagination robustness
Frontend fixes: - Fix stale closure race condition in auto-load effect by capturing messageSearch reference and validating before state update - Add proper cleanup for loadMore operations on component unmount - Fix circular effect dependency in page reset by using ref instead of state in dependency array - Add retry limit (max 3 attempts) for loadMore failures to prevent infinite retry loops on network errors - Track mounted state to prevent state updates on unmounted component Backend fix: - Change pagination logging from INFO to DEBUG level to reduce log verbosity in production
1 parent 1cc8a02 commit ef4921d

File tree

2 files changed

+67
-21
lines changed

2 files changed

+67
-21
lines changed

backend/pkg/console/list_messages.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ type TopicConsumeRequest struct {
144144
// 5. Start consume request via the Kafka Service
145145
// 6. Send a completion message to the frontend, that will show stats about the completed (or aborted) message search
146146
//
147-
//nolint:cyclop // complex logic
147+
//nolint:cyclop,gocognit // complex logic with multiple code paths for pagination vs legacy mode
148148
func (s *Service) ListMessages(ctx context.Context, listReq ListMessageRequest, progress IListMessagesProgress) error {
149149
cl, adminCl, err := s.kafkaClientFactory.GetKafkaClient(ctx)
150150
if err != nil {
@@ -529,6 +529,8 @@ func (s *Service) calculateConsumeRequests(
529529

530530
// calculateConsumeRequestsWithPageToken calculates consume requests for pagination mode (descending order).
531531
// It returns the consume requests map, the next page token, and whether more pages are available.
532+
//
533+
//nolint:gocognit,cyclop // complex logic for round-robin distribution across partitions with watermark tracking
532534
func (s *Service) calculateConsumeRequestsWithPageToken(
533535
ctx context.Context,
534536
token *PageToken,
@@ -756,14 +758,14 @@ func (s *Service) calculateConsumeRequestsWithPageToken(
756758

757759
// Log detailed partition information for debugging
758760
for partID, req := range requests {
759-
s.logger.InfoContext(ctx, "partition consume request",
761+
s.logger.DebugContext(ctx, "partition consume request",
760762
slog.Int("partition_id", int(partID)),
761763
slog.Int64("start_offset", req.StartOffset),
762764
slog.Int64("end_offset", req.EndOffset),
763765
slog.Int64("max_message_count", req.MaxMessageCount))
764766
}
765767

766-
s.logger.InfoContext(ctx, "pagination distribution complete",
768+
s.logger.DebugContext(ctx, "pagination distribution complete",
767769
slog.Int("page_size", token.PageSize),
768770
slog.Int64("total_assigned", totalAssigned),
769771
slog.Int("num_partitions", len(requests)))

frontend/src/components/pages/topics/Tab.Messages/index.tsx

Lines changed: 62 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,13 @@ export const TopicMessageView: FC<TopicMessageViewProps> = (props) => {
479479
const prevPageIndexRef = useRef<number>(pageIndex);
480480
const [forceRefresh, setForceRefresh] = useState(0);
481481

482+
// Refs for tracking loadMore state to prevent stale closures and memory leaks
483+
const currentMessageSearchRef = useRef<ReturnType<typeof createMessageSearch> | null>(null);
484+
const loadMoreAbortRef = useRef<AbortController | null>(null);
485+
const [loadMoreFailures, setLoadMoreFailures] = useState(0);
486+
const isMountedRef = useRef(true);
487+
const MAX_LOAD_MORE_RETRIES = 3;
488+
482489
// Filter messages based on quick search
483490
const filteredMessages = quickSearch
484491
? messages.filter((m) => {
@@ -497,16 +504,25 @@ export const TopicMessageView: FC<TopicMessageViewProps> = (props) => {
497504
[topicSettings?.previewTags]
498505
);
499506

507+
// Keep currentMessageSearchRef in sync with messageSearch state
508+
useEffect(() => {
509+
currentMessageSearchRef.current = messageSearch;
510+
}, [messageSearch]);
511+
500512
// Cleanup effect (replaces componentWillUnmount)
501-
useEffect(
502-
() => () => {
513+
useEffect(() => {
514+
isMountedRef.current = true;
515+
return () => {
516+
isMountedRef.current = false;
503517
if (abortControllerRef.current) {
504518
abortControllerRef.current.abort();
505519
}
520+
if (loadMoreAbortRef.current) {
521+
loadMoreAbortRef.current.abort();
522+
}
506523
appGlobal.searchMessagesFunc = undefined;
507-
},
508-
[]
509-
);
524+
};
525+
}, []);
510526

511527
// Clear sorting when entering unlimited pagination mode
512528
useEffect(() => {
@@ -720,7 +736,8 @@ export const TopicMessageView: FC<TopicMessageViewProps> = (props) => {
720736
!messageSearch ||
721737
!messageSearch.nextPageToken ||
722738
isLoadingMore ||
723-
searchPhase
739+
searchPhase ||
740+
loadMoreFailures >= MAX_LOAD_MORE_RETRIES
724741
) {
725742
return;
726743
}
@@ -729,23 +746,44 @@ export const TopicMessageView: FC<TopicMessageViewProps> = (props) => {
729746
const isOnLastPage = pageIndex >= totalLoadedPages - 1;
730747

731748
if (isOnLastPage && messageSearch.nextPageToken) {
749+
// Create abort controller for this loadMore operation
750+
const abortController = new AbortController();
751+
loadMoreAbortRef.current = abortController;
752+
753+
// Capture the current messageSearch reference to detect stale responses
754+
const capturedMessageSearch = messageSearch;
755+
732756
setIsLoadingMore(true);
733-
messageSearch
757+
capturedMessageSearch
734758
.loadMore()
735759
.then(() => {
736-
setMessages([...messageSearch.messages]);
760+
// Only update state if component is still mounted and this is still the current search
761+
if (isMountedRef.current && currentMessageSearchRef.current === capturedMessageSearch) {
762+
setMessages([...capturedMessageSearch.messages]);
763+
// Reset failure count on success
764+
setLoadMoreFailures(0);
765+
}
737766
})
738767
.catch((err) => {
739-
toast({
740-
title: 'Failed to load more messages',
741-
description: (err as Error).message,
742-
status: 'error',
743-
duration: 5000,
744-
isClosable: true,
745-
});
768+
// Only show error if component is still mounted and not aborted
769+
if (isMountedRef.current && !abortController.signal.aborted) {
770+
setLoadMoreFailures((prev) => prev + 1);
771+
toast({
772+
title: 'Failed to load more messages',
773+
description: (err as Error).message,
774+
status: 'error',
775+
duration: 5000,
776+
isClosable: true,
777+
});
778+
}
746779
})
747780
.finally(() => {
748-
setIsLoadingMore(false);
781+
if (isMountedRef.current) {
782+
setIsLoadingMore(false);
783+
}
784+
if (loadMoreAbortRef.current === abortController) {
785+
loadMoreAbortRef.current = null;
786+
}
749787
});
750788
}
751789
}, [
@@ -758,22 +796,28 @@ export const TopicMessageView: FC<TopicMessageViewProps> = (props) => {
758796
messages.length,
759797
pageSize,
760798
toast,
799+
loadMoreFailures,
761800
]);
762801

763802
// Reset pagination when navigating back to page 1 in unlimited mode
764803
// This prevents keeping many pages in memory and triggering excessive requests
765804
useEffect(() => {
766805
// Check if we're in unlimited mode and user navigated back to page 1 from a higher page
767-
if (maxResults === -1 && pageIndex === 0 && prevPageIndexRef.current > 1 && messageSearch) {
806+
// Use ref to check messageSearch existence to avoid circular dependency
807+
if (maxResults === -1 && pageIndex === 0 && prevPageIndexRef.current > 1 && currentMessageSearchRef.current) {
768808
// Clear the message search and state
769809
setMessages([]);
770810
setMessageSearch(null);
811+
// Reset failure count when resetting pagination
812+
setLoadMoreFailures(0);
771813
// Clear the search run ref and trigger a forced refresh
772814
currentSearchRunRef.current = null;
773815
setForceRefresh((prev) => prev + 1);
774816
}
775817
prevPageIndexRef.current = pageIndex;
776-
}, [pageIndex, maxResults, messageSearch]);
818+
// Note: messageSearch intentionally excluded to avoid circular dependency
819+
// We use currentMessageSearchRef.current instead which is always in sync
820+
}, [pageIndex, maxResults]);
777821

778822
// Message Table rendering variables and functions
779823
const paginationParams = {

0 commit comments

Comments
 (0)