diff --git a/backend/pkg/api/connect/service/console/service.go b/backend/pkg/api/connect/service/console/service.go index c0ca8353a7..aa355b732d 100644 --- a/backend/pkg/api/connect/service/console/service.go +++ b/backend/pkg/api/connect/service/console/service.go @@ -61,6 +61,7 @@ func (api *Service) ListMessages( MaxResults: int(req.Msg.GetMaxResults()), FilterInterpreterCode: req.Msg.GetFilterInterpreterCode(), Enterprise: req.Msg.GetEnterprise(), + PageToken: req.Msg.GetPageToken(), } interpreterCode, err := lmq.DecodeInterpreterCode() @@ -96,6 +97,8 @@ func (api *Service) ListMessages( IgnoreMaxSizeLimit: req.Msg.GetIgnoreMaxSizeLimit(), KeyDeserializer: fromProtoEncoding(req.Msg.GetKeyDeserializer()), ValueDeserializer: fromProtoEncoding(req.Msg.GetValueDeserializer()), + PageToken: lmq.PageToken, + PageSize: int(req.Msg.GetPageSize()), } timeout := 35 * time.Second diff --git a/backend/pkg/api/connect/service/console/stream_progress_reporter.go b/backend/pkg/api/connect/service/console/stream_progress_reporter.go index 640afaff02..66d0bd770a 100644 --- a/backend/pkg/api/connect/service/console/stream_progress_reporter.go +++ b/backend/pkg/api/connect/service/console/stream_progress_reporter.go @@ -216,7 +216,7 @@ func (p *streamProgressReporter) OnMessage(message *console.TopicMessage) { } } -func (p *streamProgressReporter) OnComplete(elapsedMs int64, isCancelled bool) { +func (p *streamProgressReporter) OnComplete(elapsedMs int64, isCancelled bool, nextPageToken string) { p.writeMutex.Lock() defer p.writeMutex.Unlock() @@ -225,6 +225,7 @@ func (p *streamProgressReporter) OnComplete(elapsedMs int64, isCancelled bool) { IsCancelled: isCancelled, MessagesConsumed: p.messagesConsumed.Load(), BytesConsumed: p.bytesConsumed.Load(), + NextPageToken: nextPageToken, } if err := p.stream.Send( diff --git a/backend/pkg/api/handle_topic_messages_integration_test.go b/backend/pkg/api/handle_topic_messages_integration_test.go index 2a4b49b417..03f1f52dd0 100644 --- a/backend/pkg/api/handle_topic_messages_integration_test.go +++ b/backend/pkg/api/handle_topic_messages_integration_test.go @@ -28,6 +28,7 @@ import ( "connectrpc.com/connect" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/twmb/franz-go/pkg/kadm" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/sr" "google.golang.org/genproto/googleapis/rpc/errdetails" @@ -1050,3 +1051,223 @@ func randomString(n int) string { } return string(b) } + +func (s *APIIntegrationTestSuite) TestListMessages_Pagination() { + t := s.T() + + require := require.New(t) + assert := assert.New(t) + + ctx := t.Context() + + client := v1ac.NewConsoleServiceClient( + http.DefaultClient, + s.httpAddress(), + connect.WithGRPCWeb(), + ) + + // Create test topic with multiple messages + testTopicName := testutil.TopicNameForTest("pagination_test") + const messageCount = 150 + + // Create topic + _, err := s.kafkaAdminClient.CreateTopic(ctx, 3, 1, nil, testTopicName) + require.NoError(err) + + defer func() { + s.kafkaAdminClient.DeleteTopics(ctx, testTopicName) + }() + + // Wait for topic to be ready + require.Eventually(func() bool { + metadata, err := s.kafkaAdminClient.Metadata(ctx, testTopicName) + if err != nil { + return false + } + topic, exists := metadata.Topics[testTopicName] + return exists && topic.Err == nil && len(topic.Partitions) == 3 + }, 30*time.Second, 100*time.Millisecond, "Topic should be created and ready") + + // Produce messages + records := make([]*kgo.Record, messageCount) + for i := 0; i 
< messageCount; i++ { + records[i] = &kgo.Record{ + Key: []byte(fmt.Sprintf("key-%d", i)), + Value: []byte(fmt.Sprintf(`{"id": %d, "message": "test message %d"}`, i, i)), + Topic: testTopicName, + } + } + + produceResults := s.kafkaClient.ProduceSync(ctx, records...) + require.NoError(produceResults.FirstErr()) + + // Give Kafka a moment to commit + time.Sleep(500 * time.Millisecond) + + // Debug: Check actual message distribution across partitions + offsets, err := s.kafkaAdminClient.ListEndOffsets(ctx, testTopicName) + require.NoError(err) + totalMessages := int64(0) + offsets.Each(func(offset kadm.ListedOffset) { + count := offset.Offset + totalMessages += count + t.Logf("Partition %d: %d messages (offset: 0 to %d)", offset.Partition, count, offset.Offset-1) + }) + t.Logf("Total messages across all partitions: %d (expected %d)", totalMessages, messageCount) + + t.Run("first page with pagination mode", func(t *testing.T) { + stream, err := client.ListMessages(ctx, connect.NewRequest(&v1pb.ListMessagesRequest{ + Topic: testTopicName, + PartitionId: -1, // All partitions + StartOffset: -1, // Recent + MaxResults: -1, // Pagination mode + PageToken: "", // First page + })) + require.NoError(err) + require.NotNil(stream) + + var messages []*v1pb.ListMessagesResponse_DataMessage + var done *v1pb.ListMessagesResponse_StreamCompletedMessage + + for stream.Receive() { + msg := stream.Msg() + switch v := msg.ControlMessage.(type) { + case *v1pb.ListMessagesResponse_Data: + messages = append(messages, v.Data) + case *v1pb.ListMessagesResponse_Done: + done = v.Done + } + } + + require.NoError(stream.Err()) + require.NotNil(done, "Should have completion message") + + // Verify first page results + assert.Equal(int64(50), done.MessagesConsumed, "Should consume 50 messages (default page size)") + assert.NotEmpty(done.NextPageToken, "Should have next page token") + assert.Len(messages, 50, "Should return 50 messages") + + // Verify descending order within partitions + // Group messages by partition + messagesByPartition := make(map[int32][]*v1pb.ListMessagesResponse_DataMessage) + for _, msg := range messages { + messagesByPartition[msg.PartitionId] = append(messagesByPartition[msg.PartitionId], msg) + } + + // Check each partition is in descending order + for partitionID, partitionMessages := range messagesByPartition { + if len(partitionMessages) > 1 { + for i := 0; i < len(partitionMessages)-1; i++ { + assert.Greater(partitionMessages[i].Offset, partitionMessages[i+1].Offset, + "Messages in partition %d should be in descending offset order", partitionID) + } + } + } + }) + + t.Run("paginate through all messages", func(t *testing.T) { + var allMessages []*v1pb.ListMessagesResponse_DataMessage + pageToken := "" + pageCount := 0 + maxPages := 5 // Safety limit + + for pageCount < maxPages { + stream, err := client.ListMessages(ctx, connect.NewRequest(&v1pb.ListMessagesRequest{ + Topic: testTopicName, + PartitionId: -1, + StartOffset: -1, + MaxResults: -1, + PageToken: pageToken, + })) + require.NoError(err) + require.NotNil(stream) + + var pageMessages []*v1pb.ListMessagesResponse_DataMessage + var done *v1pb.ListMessagesResponse_StreamCompletedMessage + + for stream.Receive() { + msg := stream.Msg() + switch v := msg.ControlMessage.(type) { + case *v1pb.ListMessagesResponse_Data: + pageMessages = append(pageMessages, v.Data) + case *v1pb.ListMessagesResponse_Done: + done = v.Done + } + } + + require.NoError(stream.Err()) + require.NotNil(done) + + allMessages = append(allMessages, pageMessages...) 
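+			// An empty NextPageToken from the Done control message ends this loop below;
+			// otherwise it becomes the PageToken of the next request.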
+ pageCount++ + + t.Logf("Page %d: fetched %d messages, nextPageToken=%s", pageCount, len(pageMessages), done.NextPageToken) + + if done.NextPageToken == "" { + break + } + + pageToken = done.NextPageToken + } + + // Verify we got all messages + assert.Equal(messageCount, len(allMessages), "Should fetch all %d messages across pages", messageCount) + assert.LessOrEqual(pageCount, 4, "Should complete in 3-4 pages (150 messages / 50 per page)") + }) + + t.Run("error when filter with pagination", func(t *testing.T) { + filterCode := base64.StdEncoding.EncodeToString([]byte("return true")) + + stream, err := client.ListMessages(ctx, connect.NewRequest(&v1pb.ListMessagesRequest{ + Topic: testTopicName, + PartitionId: -1, + StartOffset: -1, + MaxResults: -1, + PageToken: "", + FilterInterpreterCode: filterCode, + })) + require.NoError(err) + require.NotNil(stream) + + // Stream should error + for stream.Receive() { + // Should not receive any messages + } + + err = stream.Err() + require.Error(err, "Should error when using filter with pagination") + assert.Contains(err.Error(), "cannot use filters with pagination") + }) + + t.Run("legacy mode still works", func(t *testing.T) { + stream, err := client.ListMessages(ctx, connect.NewRequest(&v1pb.ListMessagesRequest{ + Topic: testTopicName, + PartitionId: -1, + StartOffset: -2, // Oldest + MaxResults: 50, // Legacy mode + PageToken: "", + })) + require.NoError(err) + require.NotNil(stream) + + var messages []*v1pb.ListMessagesResponse_DataMessage + var done *v1pb.ListMessagesResponse_StreamCompletedMessage + + for stream.Receive() { + msg := stream.Msg() + switch v := msg.ControlMessage.(type) { + case *v1pb.ListMessagesResponse_Data: + messages = append(messages, v.Data) + case *v1pb.ListMessagesResponse_Done: + done = v.Done + } + } + + require.NoError(stream.Err()) + require.NotNil(done) + + // In legacy mode, should not have pagination tokens + assert.Empty(done.NextPageToken, "Legacy mode should not return page token") + assert.LessOrEqual(len(messages), 50, "Should respect maxResults limit") + }) +} diff --git a/backend/pkg/api/httptypes/types.go b/backend/pkg/api/httptypes/types.go index 73bd444c24..d2ea0b6a2e 100644 --- a/backend/pkg/api/httptypes/types.go +++ b/backend/pkg/api/httptypes/types.go @@ -21,12 +21,15 @@ import ( // used in Console Enterprise to implement the hooks. type ListMessagesRequest struct { TopicName string `json:"topicName"` - StartOffset int64 `json:"startOffset"` // -1 for recent (newest - results), -2 for oldest offset, -3 for newest, -4 for timestamp - StartTimestamp int64 `json:"startTimestamp"` // Start offset by unix timestamp in ms (only considered if start offset is set to -4) - PartitionID int32 `json:"partitionId"` // -1 for all partition ids - MaxResults int `json:"maxResults"` + StartOffset int64 `json:"startOffset"` // -1 for recent (newest - results), -2 for oldest offset, -3 for newest, -4 for timestamp + StartTimestamp int64 `json:"startTimestamp"` // Start offset by unix timestamp in ms (only considered if start offset is set to -4) + PartitionID int32 `json:"partitionId"` // -1 for all partition ids + MaxResults int `json:"maxResults"` // -1 enables pagination mode, 1-500 uses legacy mode FilterInterpreterCode string `json:"filterInterpreterCode"` // Base64 encoded code + // Pagination fields (only used when MaxResults == -1) + PageToken string `json:"pageToken,omitempty"` + // Enterprise may only be set in the Enterprise mode. The JSON deserialization is deferred // to the enterprise backend. 
Enterprise json.RawMessage `json:"enterprise,omitempty"` @@ -38,6 +41,20 @@ func (l *ListMessagesRequest) OK() error { return errors.New("topic name is required") } + // Pagination mode: max_results = -1 + if l.MaxResults == -1 { + if l.FilterInterpreterCode != "" { + decoded, _ := l.DecodeInterpreterCode() + if decoded != "" { + return errors.New("cannot use filters with pagination") + } + } + + // PageToken validation is done in the console package DecodePageToken + return nil + } + + // Legacy mode validation if l.StartOffset < -4 { return errors.New("start offset is smaller than -4") } @@ -47,7 +64,7 @@ func (l *ListMessagesRequest) OK() error { } if l.MaxResults <= 0 || l.MaxResults > 500 { - return errors.New("max results must be between 1 and 500") + return errors.New("max results must be between 1 and 500, or -1 for pagination") } if _, err := l.DecodeInterpreterCode(); err != nil { diff --git a/backend/pkg/console/list_messages.go b/backend/pkg/console/list_messages.go index 1d45b2032b..063b550683 100644 --- a/backend/pkg/console/list_messages.go +++ b/backend/pkg/console/list_messages.go @@ -40,6 +40,11 @@ const ( StartOffsetNewest int64 = -3 // StartOffsetTimestamp = Start offset is specified as unix timestamp in ms StartOffsetTimestamp int64 = -4 + + // DirectionDescending = Newest messages first (high water mark to low water mark) + DirectionDescending string = "desc" + // DirectionAscending = Oldest messages first (low water mark to high water mark) + DirectionAscending string = "asc" ) // ListMessageRequest carries all filter, sort and cancellation options for fetching messages from Kafka @@ -48,13 +53,17 @@ type ListMessageRequest struct { PartitionID int32 // -1 for all partitions StartOffset int64 // -1 for recent (high - n), -2 for oldest offset, -3 for newest offset, -4 for timestamp StartTimestamp int64 // Start offset by unix timestamp in ms - MessageCount int + MessageCount int // -1 enables pagination mode, 1-500 uses legacy mode FilterInterpreterCode string Troubleshoot bool IncludeRawPayload bool IgnoreMaxSizeLimit bool KeyDeserializer serde.PayloadEncoding ValueDeserializer serde.PayloadEncoding + + // Pagination fields (only used when MessageCount == -1) + PageToken string + PageSize int // Number of messages per page (defaults to 50 if not set) } // ListMessageResponse returns the requested kafka messages along with some metadata about the operation @@ -72,7 +81,7 @@ type IListMessagesProgress interface { OnPhase(name string) // todo(?): eventually we might want to convert this into an enum OnMessage(message *TopicMessage) OnMessageConsumed(size int64) - OnComplete(elapsedMs int64, isCancelled bool) + OnComplete(elapsedMs int64, isCancelled bool, nextPageToken string) OnError(msg string) } @@ -123,6 +132,7 @@ type TopicConsumeRequest struct { IgnoreMaxSizeLimit bool KeyDeserializer serde.PayloadEncoding ValueDeserializer serde.PayloadEncoding + Direction string // "desc" or "asc" - used for message ordering } // ListMessages processes a list message request as sent from the Frontend. This function is responsible (mostly @@ -134,7 +144,7 @@ type TopicConsumeRequest struct { // 5. Start consume request via the Kafka Service // 6. 
Send a completion message to the frontend, that will show stats about the completed (or aborted) message search // -//nolint:cyclop // complex logic +//nolint:cyclop,gocognit // complex logic with multiple code paths for pagination vs legacy mode func (s *Service) ListMessages(ctx context.Context, listReq ListMessageRequest, progress IListMessagesProgress) error { cl, adminCl, err := s.kafkaClientFactory.GetKafkaClient(ctx) if err != nil { @@ -211,15 +221,68 @@ func (s *Service) ListMessages(ctx context.Context, listReq ListMessageRequest, } // Get partition consume request by calculating start and end offsets for each partition - consumeRequests, err := s.calculateConsumeRequests(ctx, cl, &listReq, partitionIDs, startOffsets, endOffsets) - if err != nil { - return fmt.Errorf("failed to calculate consume request: %w", err) + // Use pagination mode if MessageCount == -1 + var consumeRequests map[int32]*PartitionConsumeRequest + var nextPageToken string + var token *PageToken // Declare token outside if block so it's accessible later + + if listReq.MessageCount == -1 { + // Pagination mode + if listReq.FilterInterpreterCode != "" { + return errors.New("cannot use filters with pagination") + } + + if listReq.PageToken != "" { + token, err = DecodePageToken(listReq.PageToken) + if err != nil { + return fmt.Errorf("invalid page token: %w", err) + } + } else { + // First page - create initial token with page size from request (default 50) + pageSize := listReq.PageSize + if pageSize <= 0 { + pageSize = 50 // Default page size + } + + // Determine direction based on startOffset + // StartOffsetOldest (-2) = ascending (oldest first) + // StartOffsetNewest (-3) = descending (newest first) + // Default to descending for backwards compatibility + direction := DirectionDescending + if listReq.StartOffset == StartOffsetOldest { + direction = DirectionAscending + } + + token, err = CreateInitialPageToken(listReq.TopicName, startOffsets, endOffsets, pageSize, direction) + if err != nil { + return fmt.Errorf("failed to create initial page token: %w", err) + } + } + + consumeRequests, nextPageToken, _, err = s.calculateConsumeRequestsWithPageToken(ctx, token, partitionIDs, startOffsets, endOffsets) + if err != nil { + return fmt.Errorf("failed to calculate consume request with page token: %w", err) + } + } else { + // Legacy mode + consumeRequests, err = s.calculateConsumeRequests(ctx, cl, &listReq, partitionIDs, startOffsets, endOffsets) + if err != nil { + return fmt.Errorf("failed to calculate consume request: %w", err) + } } + if len(consumeRequests) == 0 { // No partitions/messages to consume, we can quit early. 
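+		// Pagination note: this early exit likewise reports an empty nextPageToken —
+		// with nothing to consume there is no next page.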
- progress.OnComplete(time.Since(start).Milliseconds(), false) + progress.OnComplete(time.Since(start).Milliseconds(), false, "") return nil } + // Determine direction based on mode + direction := DirectionAscending // Legacy mode is ascending + if listReq.MessageCount == -1 && token != nil { + // Pagination mode: use direction from token + direction = token.Direction + } + topicConsumeRequest := TopicConsumeRequest{ TopicName: listReq.TopicName, MaxMessageCount: listReq.MessageCount, @@ -230,6 +293,7 @@ func (s *Service) ListMessages(ctx context.Context, listReq ListMessageRequest, IgnoreMaxSizeLimit: listReq.IgnoreMaxSizeLimit, KeyDeserializer: listReq.KeyDeserializer, ValueDeserializer: listReq.ValueDeserializer, + Direction: direction, } progress.OnPhase("Consuming messages") @@ -240,7 +304,7 @@ func (s *Service) ListMessages(ctx context.Context, listReq ListMessageRequest, } isCancelled := ctx.Err() != nil - progress.OnComplete(time.Since(start).Milliseconds(), isCancelled) + progress.OnComplete(time.Since(start).Milliseconds(), isCancelled, nextPageToken) if isCancelled { return errors.New("request was cancelled while waiting for messages") } @@ -463,6 +527,263 @@ func (s *Service) calculateConsumeRequests( return filteredRequests, nil } +// calculateConsumeRequestsWithPageToken calculates consume requests for pagination mode (descending order). +// It returns the consume requests map, the next page token, and whether more pages are available. +// +//nolint:gocognit,cyclop // complex logic for round-robin distribution across partitions with watermark tracking +func (s *Service) calculateConsumeRequestsWithPageToken( + ctx context.Context, + token *PageToken, + requestedPartitionIDs []int32, + startOffsets, endOffsets kadm.ListedOffsets, +) (map[int32]*PartitionConsumeRequest, string, bool, error) { + requests := make(map[int32]*PartitionConsumeRequest) + + // Validate partition count hasn't changed + if int32(len(requestedPartitionIDs)) != token.PartitionCount { + return nil, "", false, fmt.Errorf("partition count changed: expected %d, got %d", token.PartitionCount, len(requestedPartitionIDs)) + } + + // Build a next token to track state for the next page + nextToken := &PageToken{ + TopicName: token.TopicName, + PartitionCount: token.PartitionCount, + Partitions: make([]PartitionCursor, 0, len(token.Partitions)), + Direction: token.Direction, + PageSize: token.PageSize, + } + + anyPartitionHasMore := false + + // Initialize consume requests for all non-exhausted partitions + // We'll distribute the page size across partitions using round-robin + type partitionState struct { + cursor PartitionCursor + updatedLowWaterMark int64 + updatedHighWaterMark int64 + nextOffsetForNextPage int64 + messagesAvailable int64 + isDrained bool + } + + partitionStates := make([]*partitionState, 0, len(token.Partitions)) + + // First pass: collect partition states and check exhaustion + for _, cursor := range token.Partitions { + // Get current water marks + startOffset, exists := startOffsets.Lookup(token.TopicName, cursor.ID) + if !exists { + s.logger.WarnContext(ctx, "partition from token not found in startOffsets, skipping", + slog.String("topic", token.TopicName), + slog.Int("partition", int(cursor.ID))) + continue + } + + endOffset, exists := endOffsets.Lookup(token.TopicName, cursor.ID) + if !exists { + s.logger.WarnContext(ctx, "partition from token not found in endOffsets, skipping", + slog.String("topic", token.TopicName), + slog.Int("partition", int(cursor.ID))) + continue + } + + // Update 
water marks in case they've changed + updatedLowWaterMark := startOffset.Offset + updatedHighWaterMark := endOffset.Offset + + // Check if partition is exhausted based on direction + var isExhausted bool + if token.Direction == DirectionDescending { + // Descending: exhausted when nextOffset < lowWaterMark + isExhausted = cursor.NextOffset < updatedLowWaterMark + } else { + // Ascending: exhausted when nextOffset >= highWaterMark + isExhausted = cursor.NextOffset >= updatedHighWaterMark + } + + if isExhausted { + // Partition exhausted, don't include in next token + s.logger.DebugContext(ctx, "partition exhausted", + slog.String("topic", token.TopicName), + slog.Int("partition", int(cursor.ID)), + slog.Int64("next_offset", cursor.NextOffset), + slog.String("direction", token.Direction)) + continue + } + + // Calculate available messages for this partition + var messagesAvailable int64 + if token.Direction == DirectionDescending { + // Available from nextOffset down to lowWaterMark (inclusive) + messagesAvailable = cursor.NextOffset - updatedLowWaterMark + 1 + } else { + // Available from nextOffset up to highWaterMark (exclusive) + messagesAvailable = updatedHighWaterMark - cursor.NextOffset + } + + if messagesAvailable <= 0 { + continue + } + + state := &partitionState{ + cursor: cursor, + updatedLowWaterMark: updatedLowWaterMark, + updatedHighWaterMark: updatedHighWaterMark, + messagesAvailable: messagesAvailable, + isDrained: false, + } + + partitionStates = append(partitionStates, state) + } + + // If no partitions available, return empty + if len(partitionStates) == 0 { + return requests, "", false, nil + } + + // Second pass: Distribute pageSize across partitions using round-robin + remainingMessages := int64(token.PageSize) + yieldingPartitions := len(partitionStates) + + // Initialize each partition with 0 messages + for _, state := range partitionStates { + requests[state.cursor.ID] = &PartitionConsumeRequest{ + PartitionID: state.cursor.ID, + IsDrained: false, + LowWaterMark: state.updatedLowWaterMark, + HighWaterMark: state.updatedHighWaterMark, + StartOffset: state.cursor.NextOffset, + EndOffset: state.cursor.NextOffset, + MaxMessageCount: 0, + } + } + + // Round-robin distribution: give each partition one message at a time + for remainingMessages > 0 && yieldingPartitions > 0 { + progressMade := false + + for _, state := range partitionStates { + if state.isDrained || remainingMessages == 0 { + continue + } + + req := requests[state.cursor.ID] + + // Check if this partition can yield more messages + if req.MaxMessageCount >= state.messagesAvailable { + state.isDrained = true + yieldingPartitions-- + continue + } + + // Give this partition one more message + req.MaxMessageCount++ + remainingMessages-- + progressMade = true + } + + // Safety: if no progress was made in a round, break to avoid infinite loop + if !progressMade { + break + } + } + + // Third pass: Calculate actual read ranges and next offsets for each partition + totalAssigned := int64(0) + for _, state := range partitionStates { + req := requests[state.cursor.ID] + + if req.MaxMessageCount == 0 { + // No messages assigned to this partition, remove it + delete(requests, state.cursor.ID) + continue + } + totalAssigned += req.MaxMessageCount + + // Calculate the actual read range based on direction + var readStart, readEnd int64 + var nextOffsetForNextPage int64 + + if token.Direction == DirectionDescending { + // Descending: read from (nextOffset - count + 1) to nextOffset + readStart = state.cursor.NextOffset - 
req.MaxMessageCount + 1 + if readStart < state.updatedLowWaterMark { + readStart = state.updatedLowWaterMark + } + readEnd = state.cursor.NextOffset + nextOffsetForNextPage = readStart - 1 + } else { + // Ascending: read from nextOffset to (nextOffset + count - 1) + readStart = state.cursor.NextOffset + readEnd = state.cursor.NextOffset + req.MaxMessageCount - 1 + if readEnd >= state.updatedHighWaterMark { + readEnd = state.updatedHighWaterMark - 1 + } + nextOffsetForNextPage = readEnd + 1 + } + + // Update request with calculated ranges + req.StartOffset = readStart + req.EndOffset = readEnd + // MaxMessageCount stays as assigned by round-robin + // But if the actual range is smaller (near water marks), we need to adjust + actualAvailable := readEnd - readStart + 1 + if actualAvailable < req.MaxMessageCount { + req.MaxMessageCount = actualAvailable + } + + // Determine if this partition has more messages for next page + var hasMore bool + if token.Direction == DirectionDescending { + hasMore = nextOffsetForNextPage >= state.updatedLowWaterMark + } else { + hasMore = nextOffsetForNextPage < state.updatedHighWaterMark + } + + if hasMore { + anyPartitionHasMore = true + } + + // Store next offset for this partition + state.nextOffsetForNextPage = nextOffsetForNextPage + + // Add to next token + nextToken.Partitions = append(nextToken.Partitions, PartitionCursor{ + ID: state.cursor.ID, + NextOffset: nextOffsetForNextPage, + LowWaterMark: state.updatedLowWaterMark, + HighWaterMark: state.updatedHighWaterMark, + }) + } + + // Log detailed partition information for debugging + for partID, req := range requests { + s.logger.DebugContext(ctx, "partition consume request", + slog.Int("partition_id", int(partID)), + slog.Int64("start_offset", req.StartOffset), + slog.Int64("end_offset", req.EndOffset), + slog.Int64("max_message_count", req.MaxMessageCount)) + } + + s.logger.DebugContext(ctx, "pagination distribution complete", + slog.Int("page_size", token.PageSize), + slog.Int64("total_assigned", totalAssigned), + slog.Int("num_partitions", len(requests))) + + // If no partitions have more data, return empty token + if !anyPartitionHasMore { + return requests, "", false, nil + } + + // Encode next token + encodedNextToken, err := nextToken.Encode() + if err != nil { + return nil, "", false, fmt.Errorf("failed to encode next page token: %w", err) + } + + return requests, encodedNextToken, true, nil +} + // FetchMessages is in charge of fulfilling the topic consume request. This is tricky // in many cases, often due to the fact that we can't consume backwards, but we offer // users to consume the most recent messages. 
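// Illustrative sketch (not part of the patch): the descending read-range
// arithmetic used by calculateConsumeRequestsWithPageToken above, reduced to a
// single partition. The function name is hypothetical; only the formulas
// mirror the code.
package main

import "fmt"

// descendingRange computes the inclusive [readStart, readEnd] fetch window for
// one page and the cursor for the following page, clamped at the low water mark.
func descendingRange(nextOffset, count, lowWaterMark int64) (readStart, readEnd, nextForNextPage int64) {
	readStart = nextOffset - count + 1
	if readStart < lowWaterMark {
		readStart = lowWaterMark
	}
	readEnd = nextOffset
	// A value below the low water mark (e.g. -1 when LWM is 0) means exhausted.
	nextForNextPage = readStart - 1
	return
}

func main() {
	// Page 1 of a partition holding offsets 0..149 (HWM 150): cursor starts at 149.
	s, e, next := descendingRange(149, 50, 0)
	fmt.Println(s, e, next) // 100 149 99

	// Page 3 drains the partition and yields next = -1, i.e. exhausted.
	s, e, next = descendingRange(49, 50, 0)
	fmt.Println(s, e, next) // 0 49 -1
}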
@@ -524,6 +845,9 @@ func (s *Service) fetchMessages(ctx context.Context, cl *kgo.Client, progress IL messageCount := 0 messageCountByPartition := make(map[int32]int64) + // For descending order, we need to collect messages per partition and reverse them + messagesPerPartition := make(map[int32][]*TopicMessage) + for msg := range resultsCh { // Since a 'kafka message' is likely transmitted in compressed batches this size is not really accurate progress.OnMessageConsumed(msg.MessageSize) @@ -533,13 +857,38 @@ func (s *Service) fetchMessages(ctx context.Context, cl *kgo.Client, progress IL messageCount++ messageCountByPartition[msg.PartitionID]++ - progress.OnMessage(msg) + if consumeReq.Direction == DirectionDescending { + // Collect messages for reversal + messagesPerPartition[msg.PartitionID] = append(messagesPerPartition[msg.PartitionID], msg) + } else { + // Stream messages immediately in ascending order + progress.OnMessage(msg) + } } // Do we need more messages to satisfy the user request? Return if request is satisfied isRequestSatisfied := messageCount == consumeReq.MaxMessageCount if isRequestSatisfied { - return nil + break + } + } + + // If descending order, reverse and send messages + if consumeReq.Direction == DirectionDescending { + // Reverse messages in each partition + for partitionID, messages := range messagesPerPartition { + for i, j := 0, len(messages)-1; i < j; i, j = i+1, j-1 { + messages[i], messages[j] = messages[j], messages[i] + } + + // Send reversed messages + for _, msg := range messages { + progress.OnMessage(msg) + } + + s.logger.DebugContext(ctx, "reversed messages for partition", + slog.Int("partition", int(partitionID)), + slog.Int("count", len(messages))) } } @@ -683,11 +1032,11 @@ func (s *Service) startMessageWorker(ctx context.Context, wg *sync.WaitGroup, // consumeKafkaMessages consumes messages for the consume request and sends responses to the jobs channel. // This function will close the channel. // The caller is responsible for closing the client if desired. 
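+// Pagination note: every partition carries its own inclusive EndOffset, so once a
+// partition reaches it, its remaining fetched records are skipped via the
+// finishedPartitions map below while the other partitions keep consuming.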
-//
-//nolint:gocognit // end condition if statements
 func (s *Service) consumeKafkaMessages(ctx context.Context, client *kgo.Client, consumeReq TopicConsumeRequest, jobs chan<- *kgo.Record) {
 	defer close(jobs)

+	// Track which partitions have finished reading
+	finishedPartitions := make(map[int32]bool)
 	remainingPartitionRequests := len(consumeReq.Partitions)

 	for {
@@ -713,6 +1062,11 @@ func (s *Service) consumeKafkaMessages(ctx context.Context, client *kgo.Client,
 		for !iter.Done() {
 			record := iter.Next()

+			// Skip messages from partitions that are already finished
+			if finishedPartitions[record.Partition] {
+				continue
+			}
+
 			// Avoid a deadlock in case the jobs channel is full
 			select {
 			case <-ctx.Done():
@@ -722,10 +1076,11 @@ func (s *Service) consumeKafkaMessages(ctx context.Context, client *kgo.Client,

 			partitionReq := consumeReq.Partitions[record.Partition]

+			// Stop reading from this partition once we reach its EndOffset.
+			// EndOffset is inclusive: the record at EndOffset was already forwarded above, so >= marks the partition as finished here
 			if record.Offset >= partitionReq.EndOffset {
-				if remainingPartitionRequests > 0 {
-					remainingPartitionRequests--
-				}
+				finishedPartitions[record.Partition] = true
+				remainingPartitionRequests--

 				if remainingPartitionRequests == 0 {
 					return
diff --git a/backend/pkg/console/list_messages_integration_test.go b/backend/pkg/console/list_messages_integration_test.go
index a9909da965..42d561e966 100644
--- a/backend/pkg/console/list_messages_integration_test.go
+++ b/backend/pkg/console/list_messages_integration_test.go
@@ -88,7 +88,7 @@ func (s *ConsoleIntegrationTestSuite) TestListMessages() {

 	mockProgress.EXPECT().OnPhase("Get Partitions")
 	mockProgress.EXPECT().OnPhase("Get Watermarks and calculate consuming requests")
-	mockProgress.EXPECT().OnComplete(gomock.Any(), false)
+	mockProgress.EXPECT().OnComplete(gomock.Any(), false, "")

 	svc := createNewTestService(t, log, t.Name(), s.testSeedBroker, s.registryAddr)
@@ -120,7 +120,7 @@ func (s *ConsoleIntegrationTestSuite) TestListMessages() {
 	mockProgress.EXPECT().OnPhase("Consuming messages")
 	mockProgress.EXPECT().OnMessage(gomock.AssignableToTypeOf(msg)).Times(20)
 	mockProgress.EXPECT().OnMessageConsumed(gomock.AssignableToTypeOf(int64Type)).Times(20)
-	mockProgress.EXPECT().OnComplete(gomock.AssignableToTypeOf(int64Type), false)
+	mockProgress.EXPECT().OnComplete(gomock.AssignableToTypeOf(int64Type), false, "")

 	svc := createNewTestService(t, log, t.Name(), s.testSeedBroker, s.registryAddr)
@@ -151,7 +151,7 @@ func (s *ConsoleIntegrationTestSuite) TestListMessages() {
 	mockProgress.EXPECT().OnPhase("Consuming messages")
 	mockProgress.EXPECT().OnMessage(MatchesOrder("10")).Times(1)
 	mockProgress.EXPECT().OnMessageConsumed(gomock.AssignableToTypeOf(int64Type)).Times(1)
-	mockProgress.EXPECT().OnComplete(gomock.AssignableToTypeOf(int64Type), false)
+	mockProgress.EXPECT().OnComplete(gomock.AssignableToTypeOf(int64Type), false, "")

 	svc := createNewTestService(t, log, t.Name(), s.testSeedBroker, s.registryAddr)
@@ -186,7 +186,7 @@ func (s *ConsoleIntegrationTestSuite) TestListMessages() {
 	mockProgress.EXPECT().OnMessage(MatchesOrder("13")).Times(1)
 	mockProgress.EXPECT().OnMessage(MatchesOrder("14")).Times(1)
 	mockProgress.EXPECT().OnMessageConsumed(gomock.AssignableToTypeOf(int64Type)).Times(5)
-	mockProgress.EXPECT().OnComplete(gomock.AssignableToTypeOf(int64Type), false)
+	mockProgress.EXPECT().OnComplete(gomock.AssignableToTypeOf(int64Type), false, "")

 	svc := createNewTestService(t, log, t.Name(), s.testSeedBroker, s.registryAddr)
@@ -217,7 +217,7 @@ func (s
*ConsoleIntegrationTestSuite) TestListMessages() { mockProgress.EXPECT().OnPhase("Consuming messages") mockProgress.EXPECT().OnMessage(MatchesOrder("19")).Times(1) mockProgress.EXPECT().OnMessageConsumed(gomock.AssignableToTypeOf(int64Type)).Times(1) - mockProgress.EXPECT().OnComplete(gomock.AssignableToTypeOf(int64Type), false) + mockProgress.EXPECT().OnComplete(gomock.AssignableToTypeOf(int64Type), false, "") svc := createNewTestService(t, log, t.Name(), s.testSeedBroker, s.registryAddr) @@ -253,7 +253,7 @@ func (s *ConsoleIntegrationTestSuite) TestListMessages() { mockProgress.EXPECT().OnMessage(MatchesOrder("14")).Times(1) mockProgress.EXPECT().OnMessage(MatchesOrder("15")).Times(1) mockProgress.EXPECT().OnMessageConsumed(gomock.AssignableToTypeOf(int64Type)).Times(5) - mockProgress.EXPECT().OnComplete(gomock.AssignableToTypeOf(int64Type), false) + mockProgress.EXPECT().OnComplete(gomock.AssignableToTypeOf(int64Type), false, "") svc := createNewTestService(t, log, t.Name(), s.testSeedBroker, s.registryAddr) @@ -326,7 +326,7 @@ func (s *ConsoleIntegrationTestSuite) TestListMessages() { mockProgress.EXPECT().OnPhase("Consuming messages") mockProgress.EXPECT().OnMessage(MatchesOrder("10")).Times(1) mockProgress.EXPECT().OnMessageConsumed(gomock.AssignableToTypeOf(int64Type)).Times(1) - mockProgress.EXPECT().OnComplete(gomock.AssignableToTypeOf(int64Type), false) + mockProgress.EXPECT().OnComplete(gomock.AssignableToTypeOf(int64Type), false, "") var fetchCalls int32 mdCalls := atomic.Int32{} @@ -570,7 +570,7 @@ func (s *ConsoleIntegrationTestSuite) TestListMessages() { mockProgress.EXPECT().OnPhase("Consuming messages") mockProgress.EXPECT().OnMessage(orderMatcher).Times(11) mockProgress.EXPECT().OnMessageConsumed(gomock.AssignableToTypeOf(int64Type)).AnyTimes() - mockProgress.EXPECT().OnComplete(gomock.AssignableToTypeOf(int64Type), false) + mockProgress.EXPECT().OnComplete(gomock.AssignableToTypeOf(int64Type), false, "") svc := createNewTestService(t, log, t.Name(), s.testSeedBroker, s.registryAddr) @@ -610,7 +610,7 @@ func (s *ConsoleIntegrationTestSuite) TestListMessages() { mockProgress.EXPECT().OnPhase("Consuming messages") mockProgress.EXPECT().OnMessage(orderMatcher).Times(4) mockProgress.EXPECT().OnMessageConsumed(gomock.AssignableToTypeOf(int64Type)).AnyTimes() - mockProgress.EXPECT().OnComplete(gomock.AssignableToTypeOf(int64Type), false) + mockProgress.EXPECT().OnComplete(gomock.AssignableToTypeOf(int64Type), false, "") svc := createNewTestService(t, log, t.Name(), s.testSeedBroker, s.registryAddr) @@ -656,7 +656,7 @@ func (s *ConsoleIntegrationTestSuite) TestListMessages() { mockProgress.EXPECT().OnPhase("Consuming messages") mockProgress.EXPECT().OnMessage(orderMatcher).Times(10) mockProgress.EXPECT().OnMessageConsumed(gomock.AssignableToTypeOf(int64Type)).AnyTimes() - mockProgress.EXPECT().OnComplete(gomock.AssignableToTypeOf(int64Type), false) + mockProgress.EXPECT().OnComplete(gomock.AssignableToTypeOf(int64Type), false, "") svc := createNewTestService(t, log, t.Name(), s.testSeedBroker, s.registryAddr) @@ -703,7 +703,7 @@ func (s *ConsoleIntegrationTestSuite) TestListMessages() { mockProgress.EXPECT().OnPhase("Consuming messages") mockProgress.EXPECT().OnMessageConsumed(gomock.AssignableToTypeOf(int64Type)).AnyTimes() mockProgress.EXPECT().OnMessage(orderMatcher).Times(10) - mockProgress.EXPECT().OnComplete(gomock.AssignableToTypeOf(int64Type), false) + mockProgress.EXPECT().OnComplete(gomock.AssignableToTypeOf(int64Type), false, "") svc := createNewTestService(t, log, 
t.Name(), s.testSeedBroker, s.registryAddr) @@ -749,7 +749,7 @@ func (s *ConsoleIntegrationTestSuite) TestListMessages() { mockProgress.EXPECT().OnPhase("Consuming messages") mockProgress.EXPECT().OnMessage(orderMatcher).Times(10) mockProgress.EXPECT().OnMessageConsumed(gomock.AssignableToTypeOf(int64Type)).AnyTimes() - mockProgress.EXPECT().OnComplete(gomock.AssignableToTypeOf(int64Type), false) + mockProgress.EXPECT().OnComplete(gomock.AssignableToTypeOf(int64Type), false, "") svc := createNewTestService(t, log, t.Name(), s.testSeedBroker, s.registryAddr) diff --git a/backend/pkg/console/list_messages_mocks_test.go b/backend/pkg/console/list_messages_mocks_test.go index bf9976f8d1..edc747d64c 100644 --- a/backend/pkg/console/list_messages_mocks_test.go +++ b/backend/pkg/console/list_messages_mocks_test.go @@ -19,6 +19,7 @@ import ( type MockIListMessagesProgress struct { ctrl *gomock.Controller recorder *MockIListMessagesProgressMockRecorder + isgomock struct{} } // MockIListMessagesProgressMockRecorder is the mock recorder for MockIListMessagesProgress. @@ -39,61 +40,61 @@ func (m *MockIListMessagesProgress) EXPECT() *MockIListMessagesProgressMockRecor } // OnComplete mocks base method. -func (m *MockIListMessagesProgress) OnComplete(arg0 int64, arg1 bool) { +func (m *MockIListMessagesProgress) OnComplete(elapsedMs int64, isCancelled bool, nextPageToken string) { m.ctrl.T.Helper() - m.ctrl.Call(m, "OnComplete", arg0, arg1) + m.ctrl.Call(m, "OnComplete", elapsedMs, isCancelled, nextPageToken) } // OnComplete indicates an expected call of OnComplete. -func (mr *MockIListMessagesProgressMockRecorder) OnComplete(arg0, arg1 any) *gomock.Call { +func (mr *MockIListMessagesProgressMockRecorder) OnComplete(elapsedMs, isCancelled, nextPageToken any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnComplete", reflect.TypeOf((*MockIListMessagesProgress)(nil).OnComplete), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnComplete", reflect.TypeOf((*MockIListMessagesProgress)(nil).OnComplete), elapsedMs, isCancelled, nextPageToken) } // OnError mocks base method. -func (m *MockIListMessagesProgress) OnError(arg0 string) { +func (m *MockIListMessagesProgress) OnError(msg string) { m.ctrl.T.Helper() - m.ctrl.Call(m, "OnError", arg0) + m.ctrl.Call(m, "OnError", msg) } // OnError indicates an expected call of OnError. -func (mr *MockIListMessagesProgressMockRecorder) OnError(arg0 any) *gomock.Call { +func (mr *MockIListMessagesProgressMockRecorder) OnError(msg any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnError", reflect.TypeOf((*MockIListMessagesProgress)(nil).OnError), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnError", reflect.TypeOf((*MockIListMessagesProgress)(nil).OnError), msg) } // OnMessage mocks base method. -func (m *MockIListMessagesProgress) OnMessage(arg0 *TopicMessage) { +func (m *MockIListMessagesProgress) OnMessage(message *TopicMessage) { m.ctrl.T.Helper() - m.ctrl.Call(m, "OnMessage", arg0) + m.ctrl.Call(m, "OnMessage", message) } // OnMessage indicates an expected call of OnMessage. 
-func (mr *MockIListMessagesProgressMockRecorder) OnMessage(arg0 any) *gomock.Call { +func (mr *MockIListMessagesProgressMockRecorder) OnMessage(message any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnMessage", reflect.TypeOf((*MockIListMessagesProgress)(nil).OnMessage), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnMessage", reflect.TypeOf((*MockIListMessagesProgress)(nil).OnMessage), message) } // OnMessageConsumed mocks base method. -func (m *MockIListMessagesProgress) OnMessageConsumed(arg0 int64) { +func (m *MockIListMessagesProgress) OnMessageConsumed(size int64) { m.ctrl.T.Helper() - m.ctrl.Call(m, "OnMessageConsumed", arg0) + m.ctrl.Call(m, "OnMessageConsumed", size) } // OnMessageConsumed indicates an expected call of OnMessageConsumed. -func (mr *MockIListMessagesProgressMockRecorder) OnMessageConsumed(arg0 any) *gomock.Call { +func (mr *MockIListMessagesProgressMockRecorder) OnMessageConsumed(size any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnMessageConsumed", reflect.TypeOf((*MockIListMessagesProgress)(nil).OnMessageConsumed), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnMessageConsumed", reflect.TypeOf((*MockIListMessagesProgress)(nil).OnMessageConsumed), size) } // OnPhase mocks base method. -func (m *MockIListMessagesProgress) OnPhase(arg0 string) { +func (m *MockIListMessagesProgress) OnPhase(name string) { m.ctrl.T.Helper() - m.ctrl.Call(m, "OnPhase", arg0) + m.ctrl.Call(m, "OnPhase", name) } // OnPhase indicates an expected call of OnPhase. -func (mr *MockIListMessagesProgressMockRecorder) OnPhase(arg0 any) *gomock.Call { +func (mr *MockIListMessagesProgressMockRecorder) OnPhase(name any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnPhase", reflect.TypeOf((*MockIListMessagesProgress)(nil).OnPhase), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnPhase", reflect.TypeOf((*MockIListMessagesProgress)(nil).OnPhase), name) } diff --git a/backend/pkg/console/page_token.go b/backend/pkg/console/page_token.go new file mode 100644 index 0000000000..debb130b48 --- /dev/null +++ b/backend/pkg/console/page_token.go @@ -0,0 +1,239 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file https://github.com/redpanda-data/redpanda/blob/dev/licenses/bsl.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package console + +import ( + "encoding/base64" + "encoding/json" + "errors" + "fmt" + + "github.com/twmb/franz-go/pkg/kadm" +) + +// PageToken represents the pagination cursor state for ListMessages requests. +// It encodes per-partition offset information to enable stateless pagination +// across multiple API calls. +type PageToken struct { + TopicName string `json:"t"` // Topic name for validation + PartitionCount int32 `json:"pc"` // Partition count for change detection + Partitions []PartitionCursor `json:"p"` // Per-partition cursor state + Direction string `json:"d"` // DirectionDescending or DirectionAscending + PageSize int `json:"ps"` // Messages per page +} + +// PartitionCursor represents the offset state for a single partition. 
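// Illustrative sketch (not part of the patch): what a page token looks like on
// the wire. Encode emits unpadded URL-safe base64 over the compact JSON keys
// (t, pc, p, d, ps) defined below, and DecodePageToken reverses and
// re-validates it. Assumes this console package is importable at the path shown.
package main

import (
	"fmt"

	"github.com/redpanda-data/console/backend/pkg/console"
)

func main() {
	token := &console.PageToken{
		TopicName:      "orders",
		PartitionCount: 1,
		Partitions: []console.PartitionCursor{
			{ID: 0, NextOffset: 149, LowWaterMark: 0, HighWaterMark: 150},
		},
		Direction: console.DirectionDescending,
		PageSize:  50,
	}

	encoded, err := token.Encode()
	if err != nil {
		panic(err)
	}
	fmt.Println(encoded) // prints an unpadded base64url string of the JSON cursor state

	decoded, err := console.DecodePageToken(encoded)
	fmt.Println(decoded.Partitions[0].NextOffset, err) // 149 <nil>
}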
+type PartitionCursor struct { + ID int32 `json:"id"` // Partition ID + NextOffset int64 `json:"no"` // Next offset to read from + LowWaterMark int64 `json:"lw"` // Low water mark (for validation) + HighWaterMark int64 `json:"hw"` // High water mark (for validation) +} + +// Encode serializes the page token to a URL-safe base64 string. +func (pt *PageToken) Encode() (string, error) { + // Validate before encoding + if err := pt.Validate(); err != nil { + return "", fmt.Errorf("invalid page token: %w", err) + } + + jsonBytes, err := json.Marshal(pt) + if err != nil { + return "", fmt.Errorf("failed to marshal page token: %w", err) + } + + // Use URL-safe base64 encoding (no padding) + encoded := base64.RawURLEncoding.EncodeToString(jsonBytes) + return encoded, nil +} + +// DecodePageToken deserializes a page token from a base64 string. +func DecodePageToken(encoded string) (*PageToken, error) { + if encoded == "" { + return nil, errors.New("page token is empty") + } + + // Decode from URL-safe base64 + jsonBytes, err := base64.RawURLEncoding.DecodeString(encoded) + if err != nil { + return nil, fmt.Errorf("failed to decode page token: %w", err) + } + + var token PageToken + if err := json.Unmarshal(jsonBytes, &token); err != nil { + return nil, fmt.Errorf("failed to unmarshal page token: %w", err) + } + + // Validate after decoding + if err := token.Validate(); err != nil { + return nil, fmt.Errorf("invalid page token: %w", err) + } + + return &token, nil +} + +// Validate checks if the page token is valid. +func (pt *PageToken) Validate() error { + if pt.TopicName == "" { + return errors.New("topic name is empty") + } + + if pt.PartitionCount <= 0 { + return fmt.Errorf("invalid partition count: %d", pt.PartitionCount) + } + + if len(pt.Partitions) == 0 { + return errors.New("no partition cursors") + } + + if int32(len(pt.Partitions)) != pt.PartitionCount { + return fmt.Errorf("partition count mismatch: expected %d, got %d", pt.PartitionCount, len(pt.Partitions)) + } + + if pt.Direction != DirectionDescending && pt.Direction != DirectionAscending { + return fmt.Errorf("invalid direction: %s (must be 'desc' or 'asc')", pt.Direction) + } + + if pt.PageSize <= 0 || pt.PageSize > 500 { + return fmt.Errorf("invalid page size: %d (must be between 1 and 500)", pt.PageSize) + } + + // Validate each partition cursor + for i, cursor := range pt.Partitions { + if cursor.ID < 0 { + return fmt.Errorf("partition %d: invalid ID: %d", i, cursor.ID) + } + + // NextOffset can be -1 when a partition is exhausted in descending mode + // (when we've read all messages back to offset 0) + if cursor.NextOffset < -1 { + return fmt.Errorf("partition %d: invalid next offset: %d", i, cursor.NextOffset) + } + + if cursor.LowWaterMark < 0 { + return fmt.Errorf("partition %d: invalid low water mark: %d", i, cursor.LowWaterMark) + } + + if cursor.HighWaterMark < 0 { + return fmt.Errorf("partition %d: invalid high water mark: %d", i, cursor.HighWaterMark) + } + + if cursor.HighWaterMark < cursor.LowWaterMark { + return fmt.Errorf("partition %d: high water mark (%d) < low water mark (%d)", i, cursor.HighWaterMark, cursor.LowWaterMark) + } + } + + return nil +} + +// CreateInitialPageToken creates a new page token for the first page of results. 
+// It initializes partition cursors based on the specified direction: +// - DirectionDescending: Start from high water mark (newest messages first) +// - DirectionAscending: Start from low water mark (oldest messages first) +func CreateInitialPageToken( + topicName string, + startOffsets, endOffsets kadm.ListedOffsets, + pageSize int, + direction string, +) (*PageToken, error) { + if topicName == "" { + return nil, errors.New("topic name is empty") + } + + if pageSize <= 0 || pageSize > 500 { + return nil, fmt.Errorf("invalid page size: %d (must be between 1 and 500)", pageSize) + } + + if direction != DirectionDescending && direction != DirectionAscending { + return nil, fmt.Errorf("invalid direction: %s (must be 'desc' or 'asc')", direction) + } + + var partitions []PartitionCursor + + // Build cursors from water marks + startOffsets.Each(func(start kadm.ListedOffset) { + // Find corresponding end offset + end, exists := endOffsets.Lookup(topicName, start.Partition) + if !exists { + // Skip partitions without end offset + return + } + + var nextOffset int64 + if direction == DirectionDescending { + // For descending order, start from high water mark + // NextOffset points to the highest consumable offset (HWM - 1) + nextOffset = end.Offset - 1 + if nextOffset < start.Offset { + nextOffset = start.Offset + } + } else { + // For ascending order, start from low water mark + // NextOffset points to the lowest consumable offset (LWM) + nextOffset = start.Offset + } + + cursor := PartitionCursor{ + ID: start.Partition, + NextOffset: nextOffset, + LowWaterMark: start.Offset, + HighWaterMark: end.Offset, + } + + partitions = append(partitions, cursor) + }) + + if len(partitions) == 0 { + return nil, errors.New("no partitions available") + } + + token := &PageToken{ + TopicName: topicName, + PartitionCount: int32(len(partitions)), + Partitions: partitions, + Direction: direction, + PageSize: pageSize, + } + + return token, nil +} + +// HasMore returns true if there are more messages to fetch for any partition. +func (pt *PageToken) HasMore() bool { + for _, cursor := range pt.Partitions { + // Check if cursor has more messages to read based on direction + if pt.Direction == DirectionDescending { + // Descending: exhausted when nextOffset < lowWaterMark + if cursor.NextOffset >= cursor.LowWaterMark { + return true + } + } else { + // Ascending: exhausted when nextOffset >= highWaterMark + if cursor.NextOffset < cursor.HighWaterMark { + return true + } + } + } + return false +} + +// IsExhausted returns true if the given partition has no more messages. +// Direction is needed to determine if we're going forward or backward. +func (pt *PageToken) IsExhausted(partitionID int32) bool { + for _, cursor := range pt.Partitions { + if cursor.ID == partitionID { + if pt.Direction == DirectionDescending { + return cursor.NextOffset < cursor.LowWaterMark + } + return cursor.NextOffset >= cursor.HighWaterMark + } + } + return true +} diff --git a/backend/pkg/console/page_token_test.go b/backend/pkg/console/page_token_test.go new file mode 100644 index 0000000000..233dc38cfb --- /dev/null +++ b/backend/pkg/console/page_token_test.go @@ -0,0 +1,371 @@ +// Copyright 2026 Redpanda Data, Inc. 
+// +// Use of this software is governed by the Business Source License +// included in the file https://github.com/redpanda-data/redpanda/blob/dev/licenses/bsl.md +// +// As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package console + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/twmb/franz-go/pkg/kadm" +) + +func TestPageToken_EncodeDecodeRoundTrip(t *testing.T) { + token := &PageToken{ + TopicName: "test-topic", + PartitionCount: 3, + Partitions: []PartitionCursor{ + {ID: 0, NextOffset: 1000, LowWaterMark: 0, HighWaterMark: 1500}, + {ID: 1, NextOffset: 2000, LowWaterMark: 0, HighWaterMark: 2500}, + {ID: 2, NextOffset: 1500, LowWaterMark: 0, HighWaterMark: 2000}, + }, + Direction: DirectionDescending, + PageSize: 50, + } + + // Encode + encoded, err := token.Encode() + require.NoError(t, err) + assert.NotEmpty(t, encoded) + + // Decode + decoded, err := DecodePageToken(encoded) + require.NoError(t, err) + + // Verify round-trip + assert.Equal(t, token.TopicName, decoded.TopicName) + assert.Equal(t, token.PartitionCount, decoded.PartitionCount) + assert.Equal(t, token.Direction, decoded.Direction) + assert.Equal(t, token.PageSize, decoded.PageSize) + assert.Len(t, decoded.Partitions, 3) + + for i, cursor := range token.Partitions { + assert.Equal(t, cursor.ID, decoded.Partitions[i].ID) + assert.Equal(t, cursor.NextOffset, decoded.Partitions[i].NextOffset) + assert.Equal(t, cursor.LowWaterMark, decoded.Partitions[i].LowWaterMark) + assert.Equal(t, cursor.HighWaterMark, decoded.Partitions[i].HighWaterMark) + } +} + +func TestPageToken_ValidateErrors(t *testing.T) { + tests := []struct { + name string + token *PageToken + expectErr string + }{ + { + name: "empty topic name", + token: &PageToken{ + TopicName: "", + PartitionCount: 1, + Partitions: []PartitionCursor{{ID: 0, NextOffset: 100, LowWaterMark: 0, HighWaterMark: 200}}, + Direction: "desc", + PageSize: 50, + }, + expectErr: "topic name is empty", + }, + { + name: "invalid partition count", + token: &PageToken{ + TopicName: "test", + PartitionCount: 0, + Partitions: []PartitionCursor{{ID: 0, NextOffset: 100, LowWaterMark: 0, HighWaterMark: 200}}, + Direction: "desc", + PageSize: 50, + }, + expectErr: "invalid partition count", + }, + { + name: "partition count mismatch", + token: &PageToken{ + TopicName: "test", + PartitionCount: 2, + Partitions: []PartitionCursor{{ID: 0, NextOffset: 100, LowWaterMark: 0, HighWaterMark: 200}}, + Direction: "desc", + PageSize: 50, + }, + expectErr: "partition count mismatch", + }, + { + name: "invalid direction", + token: &PageToken{ + TopicName: "test", + PartitionCount: 1, + Partitions: []PartitionCursor{{ID: 0, NextOffset: 100, LowWaterMark: 0, HighWaterMark: 200}}, + Direction: "invalid", + PageSize: 50, + }, + expectErr: "invalid direction", + }, + { + name: "invalid page size - too small", + token: &PageToken{ + TopicName: "test", + PartitionCount: 1, + Partitions: []PartitionCursor{{ID: 0, NextOffset: 100, LowWaterMark: 0, HighWaterMark: 200}}, + Direction: "desc", + PageSize: 0, + }, + expectErr: "invalid page size", + }, + { + name: "invalid page size - too large", + token: &PageToken{ + TopicName: "test", + PartitionCount: 1, + Partitions: []PartitionCursor{{ID: 0, NextOffset: 100, LowWaterMark: 0, HighWaterMark: 200}}, + Direction: "desc", + PageSize: 501, + }, + expectErr: "invalid page size", + 
}, + { + name: "next offset -2 (too negative)", + token: &PageToken{ + TopicName: "test", + PartitionCount: 1, + Partitions: []PartitionCursor{{ID: 0, NextOffset: -2, LowWaterMark: 0, HighWaterMark: 200}}, + Direction: "desc", + PageSize: 50, + }, + expectErr: "invalid next offset", + }, + { + name: "high water mark < low water mark", + token: &PageToken{ + TopicName: "test", + PartitionCount: 1, + Partitions: []PartitionCursor{{ID: 0, NextOffset: 100, LowWaterMark: 200, HighWaterMark: 100}}, + Direction: "desc", + PageSize: 50, + }, + expectErr: "high water mark", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.token.Validate() + require.Error(t, err) + assert.Contains(t, err.Error(), tt.expectErr) + }) + } +} + +func TestDecodePageToken_InvalidBase64(t *testing.T) { + _, err := DecodePageToken("not-valid-base64!!!") + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to decode") +} + +func TestDecodePageToken_InvalidJSON(t *testing.T) { + // Valid base64 but invalid JSON + _, err := DecodePageToken("bm90LWpzb24") + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to unmarshal") +} + +func TestCreateInitialPageToken(t *testing.T) { + // Mock kadm.ListedOffsets + startOffsets := kadm.ListedOffsets{ + "test-topic": { + 0: {Topic: "test-topic", Partition: 0, Offset: 0}, + 1: {Topic: "test-topic", Partition: 1, Offset: 0}, + 2: {Topic: "test-topic", Partition: 2, Offset: 100}, + }, + } + + endOffsets := kadm.ListedOffsets{ + "test-topic": { + 0: {Topic: "test-topic", Partition: 0, Offset: 1000}, + 1: {Topic: "test-topic", Partition: 1, Offset: 2000}, + 2: {Topic: "test-topic", Partition: 2, Offset: 1500}, + }, + } + + token, err := CreateInitialPageToken("test-topic", startOffsets, endOffsets, 50, DirectionDescending) + require.NoError(t, err) + + assert.Equal(t, "test-topic", token.TopicName) + assert.Equal(t, int32(3), token.PartitionCount) + assert.Equal(t, DirectionDescending, token.Direction) + assert.Equal(t, 50, token.PageSize) + assert.Len(t, token.Partitions, 3) + + // Check that NextOffset is set to HWM - 1 for descending + // Build map by partition ID since order is not guaranteed + partByID := make(map[int32]PartitionCursor) + for _, p := range token.Partitions { + partByID[p.ID] = p + } + assert.Equal(t, int64(999), partByID[0].NextOffset) + assert.Equal(t, int64(1999), partByID[1].NextOffset) + assert.Equal(t, int64(1499), partByID[2].NextOffset) +} + +func TestCreateInitialPageToken_EmptyTopic(t *testing.T) { + startOffsets := kadm.ListedOffsets{ + "test-topic": { + 0: {Topic: "test-topic", Partition: 0, Offset: 0}, + }, + } + + endOffsets := kadm.ListedOffsets{ + "test-topic": { + 0: {Topic: "test-topic", Partition: 0, Offset: 0}, + }, + } + + token, err := CreateInitialPageToken("test-topic", startOffsets, endOffsets, 50, DirectionDescending) + require.NoError(t, err) + + // NextOffset should be clamped to low water mark for empty topic + assert.Equal(t, int64(0), token.Partitions[0].NextOffset) +} + +func TestPageToken_HasMore(t *testing.T) { + tests := []struct { + name string + token *PageToken + expected bool + }{ + { + name: "has more messages", + token: &PageToken{ + TopicName: "test", + PartitionCount: 2, + Partitions: []PartitionCursor{ + {ID: 0, NextOffset: 100, LowWaterMark: 0, HighWaterMark: 200}, + {ID: 1, NextOffset: 150, LowWaterMark: 0, HighWaterMark: 300}, + }, + Direction: DirectionDescending, + PageSize: 50, + }, + expected: true, + }, + { + name: "no more messages - all exhausted", + 
token: &PageToken{ + TopicName: "test", + PartitionCount: 2, + Partitions: []PartitionCursor{ + {ID: 0, NextOffset: -1, LowWaterMark: 0, HighWaterMark: 200}, + {ID: 1, NextOffset: -1, LowWaterMark: 0, HighWaterMark: 300}, + }, + Direction: DirectionDescending, + PageSize: 50, + }, + expected: false, + }, + { + name: "some partitions exhausted", + token: &PageToken{ + TopicName: "test", + PartitionCount: 2, + Partitions: []PartitionCursor{ + {ID: 0, NextOffset: -1, LowWaterMark: 0, HighWaterMark: 200}, + {ID: 1, NextOffset: 50, LowWaterMark: 0, HighWaterMark: 300}, + }, + Direction: DirectionDescending, + PageSize: 50, + }, + expected: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := tt.token.HasMore() + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestPageToken_IsExhausted(t *testing.T) { + tests := []struct { + name string + token *PageToken + partitionID int32 + expected bool + }{ + { + name: "descending - not exhausted", + token: &PageToken{ + Direction: DirectionDescending, + Partitions: []PartitionCursor{ + {ID: 0, NextOffset: 100, LowWaterMark: 0, HighWaterMark: 200}, + }, + }, + partitionID: 0, + expected: false, + }, + { + name: "descending - exhausted below low water mark", + token: &PageToken{ + Direction: DirectionDescending, + Partitions: []PartitionCursor{ + {ID: 0, NextOffset: -1, LowWaterMark: 0, HighWaterMark: 200}, + }, + }, + partitionID: 0, + expected: true, + }, + { + name: "descending - at low water mark boundary", + token: &PageToken{ + Direction: DirectionDescending, + Partitions: []PartitionCursor{ + {ID: 0, NextOffset: 0, LowWaterMark: 0, HighWaterMark: 200}, + }, + }, + partitionID: 0, + expected: false, + }, + { + name: "ascending - not exhausted", + token: &PageToken{ + Direction: DirectionAscending, + Partitions: []PartitionCursor{ + {ID: 0, NextOffset: 100, LowWaterMark: 0, HighWaterMark: 200}, + }, + }, + partitionID: 0, + expected: false, + }, + { + name: "ascending - exhausted at high water mark", + token: &PageToken{ + Direction: DirectionAscending, + Partitions: []PartitionCursor{ + {ID: 0, NextOffset: 200, LowWaterMark: 0, HighWaterMark: 200}, + }, + }, + partitionID: 0, + expected: true, + }, + { + name: "ascending - one below high water mark boundary", + token: &PageToken{ + Direction: DirectionAscending, + Partitions: []PartitionCursor{ + {ID: 0, NextOffset: 199, LowWaterMark: 0, HighWaterMark: 200}, + }, + }, + partitionID: 0, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := tt.token.IsExhausted(tt.partitionID) + assert.Equal(t, tt.expected, result) + }) + } +} diff --git a/backend/pkg/protogen/redpanda/api/console/v1alpha1/list_messages.pb.go b/backend/pkg/protogen/redpanda/api/console/v1alpha1/list_messages.pb.go index 9c830093e0..133b339e19 100644 --- a/backend/pkg/protogen/redpanda/api/console/v1alpha1/list_messages.pb.go +++ b/backend/pkg/protogen/redpanda/api/console/v1alpha1/list_messages.pb.go @@ -37,6 +37,8 @@ type ListMessagesRequest struct { KeyDeserializer *PayloadEncoding `protobuf:"varint,10,opt,name=key_deserializer,json=keyDeserializer,proto3,enum=redpanda.api.console.v1alpha1.PayloadEncoding,oneof" json:"key_deserializer,omitempty"` // Optionally specify key payload deserialization strategy to use. 
diff --git a/backend/pkg/protogen/redpanda/api/console/v1alpha1/list_messages.pb.go b/backend/pkg/protogen/redpanda/api/console/v1alpha1/list_messages.pb.go
index 9c830093e0..133b339e19 100644
--- a/backend/pkg/protogen/redpanda/api/console/v1alpha1/list_messages.pb.go
+++ b/backend/pkg/protogen/redpanda/api/console/v1alpha1/list_messages.pb.go
@@ -37,6 +37,8 @@ type ListMessagesRequest struct {
 	KeyDeserializer    *PayloadEncoding `protobuf:"varint,10,opt,name=key_deserializer,json=keyDeserializer,proto3,enum=redpanda.api.console.v1alpha1.PayloadEncoding,oneof" json:"key_deserializer,omitempty"`       // Optionally specify key payload deserialization strategy to use.
 	ValueDeserializer  *PayloadEncoding `protobuf:"varint,11,opt,name=value_deserializer,json=valueDeserializer,proto3,enum=redpanda.api.console.v1alpha1.PayloadEncoding,oneof" json:"value_deserializer,omitempty"` // Optionally specify value payload deserialization strategy to use.
 	IgnoreMaxSizeLimit bool             `protobuf:"varint,12,opt,name=ignore_max_size_limit,json=ignoreMaxSizeLimit,proto3" json:"ignore_max_size_limit,omitempty"`                                                  // Optionally ignore configured maximum payload size limit.
+	PageToken          string           `protobuf:"bytes,13,opt,name=page_token,json=pageToken,proto3" json:"page_token,omitempty"`                                                                                  // Resume from cursor (only used when max_results = -1 for pagination mode).
+	PageSize           int32            `protobuf:"varint,14,opt,name=page_size,json=pageSize,proto3" json:"page_size,omitempty"`                                                                                    // Number of messages to fetch per page (only used when max_results = -1). Defaults to 50 if not set.
 	unknownFields      protoimpl.UnknownFields
 	sizeCache          protoimpl.SizeCache
 }
@@ -155,6 +157,20 @@ func (x *ListMessagesRequest) GetIgnoreMaxSizeLimit() bool {
 	return false
 }
 
+func (x *ListMessagesRequest) GetPageToken() string {
+	if x != nil {
+		return x.PageToken
+	}
+	return ""
+}
+
+func (x *ListMessagesRequest) GetPageSize() int32 {
+	if x != nil {
+		return x.PageSize
+	}
+	return 0
+}
+
 // ListMessagesResponse is the response for ListMessages call.
 type ListMessagesResponse struct {
 	state protoimpl.MessageState `protogen:"open.v1"`
@@ -587,6 +603,7 @@ type ListMessagesResponse_StreamCompletedMessage struct {
 	IsCancelled      bool   `protobuf:"varint,2,opt,name=is_cancelled,json=isCancelled,proto3" json:"is_cancelled,omitempty"`                // Whether the call was cancelled.
 	MessagesConsumed int64  `protobuf:"varint,3,opt,name=messages_consumed,json=messagesConsumed,proto3" json:"messages_consumed,omitempty"` // Total consumed messages.
 	BytesConsumed    int64  `protobuf:"varint,4,opt,name=bytes_consumed,json=bytesConsumed,proto3" json:"bytes_consumed,omitempty"`          // Total consumed bytes.
+	NextPageToken    string `protobuf:"bytes,5,opt,name=next_page_token,json=nextPageToken,proto3" json:"next_page_token,omitempty"`         // Token for next page (empty when max_results != -1 or no more pages).
 	unknownFields    protoimpl.UnknownFields
 	sizeCache        protoimpl.SizeCache
 }
@@ -649,6 +666,13 @@ func (x *ListMessagesResponse_StreamCompletedMessage) GetBytesConsumed() int64 {
 	return 0
 }
 
+func (x *ListMessagesResponse_StreamCompletedMessage) GetNextPageToken() string {
+	if x != nil {
+		return x.NextPageToken
+	}
+	return ""
+}
+
 // Error control message.
type ListMessagesResponse_ErrorMessage struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -706,7 +730,7 @@ var file_redpanda_api_console_v1alpha1_list_messages_proto_rawDesc = []byte{ 0x2f, 0x76, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x2a, 0x72, 0x65, 0x64, 0x70, 0x61, 0x6e, 0x64, 0x61, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x63, 0x6f, 0x6e, 0x73, 0x6f, 0x6c, 0x65, 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2f, 0x63, - 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xdc, 0x05, 0x0a, 0x13, + 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xa4, 0x06, 0x0a, 0x13, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x34, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x42, 0x1e, 0xba, 0x48, 0x1b, 0x72, 0x19, 0x10, 0x01, 0x18, 0xf9, 0x01, 0x32, 0x12, @@ -749,89 +773,96 @@ var file_redpanda_api_console_v1alpha1_list_messages_proto_rawDesc = []byte{ 0x72, 0x69, 0x61, 0x6c, 0x69, 0x7a, 0x65, 0x72, 0x88, 0x01, 0x01, 0x12, 0x31, 0x0a, 0x15, 0x69, 0x67, 0x6e, 0x6f, 0x72, 0x65, 0x5f, 0x6d, 0x61, 0x78, 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x08, 0x52, 0x12, 0x69, 0x67, 0x6e, 0x6f, - 0x72, 0x65, 0x4d, 0x61, 0x78, 0x53, 0x69, 0x7a, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x42, 0x13, - 0x0a, 0x11, 0x5f, 0x6b, 0x65, 0x79, 0x5f, 0x64, 0x65, 0x73, 0x65, 0x72, 0x69, 0x61, 0x6c, 0x69, - 0x7a, 0x65, 0x72, 0x42, 0x15, 0x0a, 0x13, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x5f, 0x64, 0x65, - 0x73, 0x65, 0x72, 0x69, 0x61, 0x6c, 0x69, 0x7a, 0x65, 0x72, 0x22, 0xa1, 0x0a, 0x0a, 0x14, 0x4c, - 0x69, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x12, 0x55, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x3f, 0x2e, 0x72, 0x65, 0x64, 0x70, 0x61, 0x6e, 0x64, 0x61, 0x2e, 0x61, 0x70, 0x69, - 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x6f, 0x6c, 0x65, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, - 0x31, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x4d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x65, 0x48, 0x00, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x58, 0x0a, 0x05, 0x70, 0x68, - 0x61, 0x73, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x40, 0x2e, 0x72, 0x65, 0x64, 0x70, + 0x72, 0x65, 0x4d, 0x61, 0x78, 0x53, 0x69, 0x7a, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x12, 0x1d, + 0x0a, 0x0a, 0x70, 0x61, 0x67, 0x65, 0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x0d, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x09, 0x70, 0x61, 0x67, 0x65, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x27, 0x0a, + 0x09, 0x70, 0x61, 0x67, 0x65, 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x0e, 0x20, 0x01, 0x28, 0x05, + 0x42, 0x0a, 0xba, 0x48, 0x07, 0x1a, 0x05, 0x18, 0xf4, 0x03, 0x28, 0x01, 0x52, 0x08, 0x70, 0x61, + 0x67, 0x65, 0x53, 0x69, 0x7a, 0x65, 0x42, 0x13, 0x0a, 0x11, 0x5f, 0x6b, 0x65, 0x79, 0x5f, 0x64, + 0x65, 0x73, 0x65, 0x72, 0x69, 0x61, 0x6c, 0x69, 0x7a, 0x65, 0x72, 0x42, 0x15, 0x0a, 0x13, 0x5f, + 0x76, 0x61, 0x6c, 0x75, 0x65, 0x5f, 0x64, 0x65, 0x73, 0x65, 0x72, 0x69, 0x61, 0x6c, 0x69, 0x7a, + 0x65, 0x72, 0x22, 0xc9, 0x0a, 0x0a, 0x14, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x55, 0x0a, 0x04, 0x64, + 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 
0x28, 0x0b, 0x32, 0x3f, 0x2e, 0x72, 0x65, 0x64, 0x70, 0x61, 0x6e, 0x64, 0x61, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x6f, 0x6c, 0x65, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x50, - 0x68, 0x61, 0x73, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x05, 0x70, - 0x68, 0x61, 0x73, 0x65, 0x12, 0x61, 0x0a, 0x08, 0x70, 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, - 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x43, 0x2e, 0x72, 0x65, 0x64, 0x70, 0x61, 0x6e, 0x64, - 0x61, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x6f, 0x6c, 0x65, 0x2e, 0x76, 0x31, - 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x50, 0x72, 0x6f, 0x67, - 0x72, 0x65, 0x73, 0x73, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x08, 0x70, - 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x12, 0x60, 0x0a, 0x04, 0x64, 0x6f, 0x6e, 0x65, 0x18, - 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x4a, 0x2e, 0x72, 0x65, 0x64, 0x70, 0x61, 0x6e, 0x64, 0x61, - 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x6f, 0x6c, 0x65, 0x2e, 0x76, 0x31, 0x61, - 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, - 0x6d, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x65, 0x48, 0x00, 0x52, 0x04, 0x64, 0x6f, 0x6e, 0x65, 0x12, 0x58, 0x0a, 0x05, 0x65, 0x72, 0x72, - 0x6f, 0x72, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x40, 0x2e, 0x72, 0x65, 0x64, 0x70, 0x61, - 0x6e, 0x64, 0x61, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x6f, 0x6c, 0x65, 0x2e, - 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x45, 0x72, - 0x72, 0x6f, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x05, 0x65, 0x72, - 0x72, 0x6f, 0x72, 0x1a, 0xbd, 0x03, 0x0a, 0x0b, 0x44, 0x61, 0x74, 0x61, 0x4d, 0x65, 0x73, 0x73, - 0x61, 0x67, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, - 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x70, 0x61, 0x72, 0x74, 0x69, - 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x12, 0x1c, - 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x03, 0x20, 0x01, 0x28, - 0x03, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x50, 0x0a, 0x0b, - 0x63, 0x6f, 0x6d, 0x70, 0x72, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, - 0x0e, 0x32, 0x2e, 0x2e, 0x72, 0x65, 0x64, 0x70, 0x61, 0x6e, 0x64, 0x61, 0x2e, 0x61, 0x70, 0x69, + 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x44, + 0x61, 0x74, 0x61, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x04, 0x64, 0x61, + 0x74, 0x61, 0x12, 0x58, 0x0a, 0x05, 0x70, 0x68, 0x61, 0x73, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x40, 0x2e, 0x72, 0x65, 0x64, 0x70, 0x61, 0x6e, 0x64, 0x61, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x6f, 0x6c, 0x65, 0x2e, 0x76, 0x31, 0x61, 
0x6c, 0x70, 0x68, 0x61, - 0x31, 0x2e, 0x43, 0x6f, 0x6d, 0x70, 0x72, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, - 0x65, 0x52, 0x0b, 0x63, 0x6f, 0x6d, 0x70, 0x72, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x29, - 0x0a, 0x10, 0x69, 0x73, 0x5f, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, - 0x61, 0x6c, 0x18, 0x05, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0f, 0x69, 0x73, 0x54, 0x72, 0x61, 0x6e, - 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x6c, 0x12, 0x4a, 0x0a, 0x07, 0x68, 0x65, 0x61, - 0x64, 0x65, 0x72, 0x73, 0x18, 0x06, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x30, 0x2e, 0x72, 0x65, 0x64, + 0x31, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x50, 0x68, 0x61, 0x73, 0x65, 0x4d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x05, 0x70, 0x68, 0x61, 0x73, 0x65, 0x12, 0x61, 0x0a, 0x08, + 0x70, 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x43, + 0x2e, 0x72, 0x65, 0x64, 0x70, 0x61, 0x6e, 0x64, 0x61, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x63, 0x6f, + 0x6e, 0x73, 0x6f, 0x6c, 0x65, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4c, + 0x69, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x2e, 0x50, 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x4d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x08, 0x70, 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x12, + 0x60, 0x0a, 0x04, 0x64, 0x6f, 0x6e, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x4a, 0x2e, + 0x72, 0x65, 0x64, 0x70, 0x61, 0x6e, 0x64, 0x61, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x63, 0x6f, 0x6e, + 0x73, 0x6f, 0x6c, 0x65, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4c, 0x69, + 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, + 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x04, 0x64, 0x6f, 0x6e, + 0x65, 0x12, 0x58, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x40, 0x2e, 0x72, 0x65, 0x64, 0x70, 0x61, 0x6e, 0x64, 0x61, 0x2e, 0x61, 0x70, 0x69, 0x2e, + 0x63, 0x6f, 0x6e, 0x73, 0x6f, 0x6c, 0x65, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, + 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x48, 0x00, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x1a, 0xbd, 0x03, 0x0a, 0x0b, + 0x44, 0x61, 0x74, 0x61, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x70, + 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x05, 0x52, 0x0b, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x16, + 0x0a, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, + 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, + 0x61, 0x6d, 0x70, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, + 0x74, 0x61, 0x6d, 0x70, 0x12, 0x50, 0x0a, 0x0b, 0x63, 0x6f, 0x6d, 0x70, 0x72, 0x65, 0x73, 0x73, + 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x2e, 0x2e, 0x72, 0x65, 0x64, 0x70, + 0x61, 0x6e, 0x64, 0x61, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x6f, 0x6c, 0x65, + 
0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x43, 0x6f, 0x6d, 0x70, 0x72, 0x65, + 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x0b, 0x63, 0x6f, 0x6d, 0x70, 0x72, + 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x29, 0x0a, 0x10, 0x69, 0x73, 0x5f, 0x74, 0x72, 0x61, + 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x6c, 0x18, 0x05, 0x20, 0x01, 0x28, 0x08, + 0x52, 0x0f, 0x69, 0x73, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x61, + 0x6c, 0x12, 0x4a, 0x0a, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x18, 0x06, 0x20, 0x03, + 0x28, 0x0b, 0x32, 0x30, 0x2e, 0x72, 0x65, 0x64, 0x70, 0x61, 0x6e, 0x64, 0x61, 0x2e, 0x61, 0x70, + 0x69, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x6f, 0x6c, 0x65, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, + 0x61, 0x31, 0x2e, 0x4b, 0x61, 0x66, 0x6b, 0x61, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x48, 0x65, + 0x61, 0x64, 0x65, 0x72, 0x52, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x12, 0x43, 0x0a, + 0x03, 0x6b, 0x65, 0x79, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x31, 0x2e, 0x72, 0x65, 0x64, 0x70, 0x61, 0x6e, 0x64, 0x61, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x6f, 0x6c, 0x65, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4b, 0x61, 0x66, 0x6b, 0x61, - 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x07, 0x68, 0x65, - 0x61, 0x64, 0x65, 0x72, 0x73, 0x12, 0x43, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x07, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x31, 0x2e, 0x72, 0x65, 0x64, 0x70, 0x61, 0x6e, 0x64, 0x61, 0x2e, 0x61, 0x70, - 0x69, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x6f, 0x6c, 0x65, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, - 0x61, 0x31, 0x2e, 0x4b, 0x61, 0x66, 0x6b, 0x61, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x50, 0x61, - 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x47, 0x0a, 0x05, 0x76, 0x61, - 0x6c, 0x75, 0x65, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x31, 0x2e, 0x72, 0x65, 0x64, 0x70, - 0x61, 0x6e, 0x64, 0x61, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x6f, 0x6c, 0x65, - 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4b, 0x61, 0x66, 0x6b, 0x61, 0x52, - 0x65, 0x63, 0x6f, 0x72, 0x64, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x05, 0x76, 0x61, - 0x6c, 0x75, 0x65, 0x1a, 0x24, 0x0a, 0x0c, 0x50, 0x68, 0x61, 0x73, 0x65, 0x4d, 0x65, 0x73, 0x73, - 0x61, 0x67, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x70, 0x68, 0x61, 0x73, 0x65, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x05, 0x70, 0x68, 0x61, 0x73, 0x65, 0x1a, 0x65, 0x0a, 0x0f, 0x50, 0x72, 0x6f, - 0x67, 0x72, 0x65, 0x73, 0x73, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x2b, 0x0a, 0x11, - 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x5f, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, - 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x10, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, - 0x73, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x64, 0x12, 0x25, 0x0a, 0x0e, 0x62, 0x79, 0x74, - 0x65, 0x73, 0x5f, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x03, 0x52, 0x0d, 0x62, 0x79, 0x74, 0x65, 0x73, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x64, - 0x1a, 0xae, 0x01, 0x0a, 0x16, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x43, 0x6f, 0x6d, 0x70, 0x6c, - 0x65, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x65, - 0x6c, 0x61, 0x70, 0x73, 0x65, 0x64, 0x5f, 0x6d, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, - 0x09, 0x65, 0x6c, 0x61, 0x70, 0x73, 0x65, 0x64, 0x4d, 0x73, 0x12, 0x21, 0x0a, 0x0c, 0x69, 0x73, - 0x5f, 0x63, 0x61, 0x6e, 0x63, 
0x65, 0x6c, 0x6c, 0x65, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, - 0x52, 0x0b, 0x69, 0x73, 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x6c, 0x65, 0x64, 0x12, 0x2b, 0x0a, - 0x11, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x5f, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, - 0x65, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x10, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x65, 0x73, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x64, 0x12, 0x25, 0x0a, 0x0e, 0x62, 0x79, - 0x74, 0x65, 0x73, 0x5f, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x64, 0x18, 0x04, 0x20, 0x01, - 0x28, 0x03, 0x52, 0x0d, 0x62, 0x79, 0x74, 0x65, 0x73, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, - 0x64, 0x1a, 0x28, 0x0a, 0x0c, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x03, 0x6b, + 0x65, 0x79, 0x12, 0x47, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x08, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x31, 0x2e, 0x72, 0x65, 0x64, 0x70, 0x61, 0x6e, 0x64, 0x61, 0x2e, 0x61, 0x70, 0x69, + 0x2e, 0x63, 0x6f, 0x6e, 0x73, 0x6f, 0x6c, 0x65, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, + 0x31, 0x2e, 0x4b, 0x61, 0x66, 0x6b, 0x61, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x50, 0x61, 0x79, + 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x1a, 0x24, 0x0a, 0x0c, 0x50, + 0x68, 0x61, 0x73, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x70, + 0x68, 0x61, 0x73, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x70, 0x68, 0x61, 0x73, + 0x65, 0x1a, 0x65, 0x0a, 0x0f, 0x50, 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x4d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x12, 0x2b, 0x0a, 0x11, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, + 0x5f, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, + 0x10, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, + 0x64, 0x12, 0x25, 0x0a, 0x0e, 0x62, 0x79, 0x74, 0x65, 0x73, 0x5f, 0x63, 0x6f, 0x6e, 0x73, 0x75, + 0x6d, 0x65, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0d, 0x62, 0x79, 0x74, 0x65, 0x73, + 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x64, 0x1a, 0xd6, 0x01, 0x0a, 0x16, 0x53, 0x74, 0x72, + 0x65, 0x61, 0x6d, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x65, 0x6c, 0x61, 0x70, 0x73, 0x65, 0x64, 0x5f, 0x6d, + 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x65, 0x6c, 0x61, 0x70, 0x73, 0x65, 0x64, + 0x4d, 0x73, 0x12, 0x21, 0x0a, 0x0c, 0x69, 0x73, 0x5f, 0x63, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x6c, + 0x65, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0b, 0x69, 0x73, 0x43, 0x61, 0x6e, 0x63, + 0x65, 0x6c, 0x6c, 0x65, 0x64, 0x12, 0x2b, 0x0a, 0x11, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x73, 0x5f, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, + 0x52, 0x10, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, + 0x65, 0x64, 0x12, 0x25, 0x0a, 0x0e, 0x62, 0x79, 0x74, 0x65, 0x73, 0x5f, 0x63, 0x6f, 0x6e, 0x73, + 0x75, 0x6d, 0x65, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0d, 0x62, 0x79, 0x74, 0x65, + 0x73, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x64, 0x12, 0x26, 0x0a, 0x0f, 0x6e, 0x65, 0x78, + 0x74, 0x5f, 0x70, 0x61, 0x67, 0x65, 0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x05, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x0d, 0x6e, 0x65, 0x78, 0x74, 0x50, 0x61, 0x67, 0x65, 0x54, 0x6f, 0x6b, 0x65, + 0x6e, 0x1a, 0x28, 0x0a, 0x0c, 0x45, 0x72, 0x72, 0x6f, 
+	0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73,
+	0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73,
+	0x73, 0x61, 0x67, 0x65, 0x42, 0x11, 0x0a, 0x0f, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c,
+	0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0xd8,
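Reading the two new request fields together with the completion token: a client stays in pagination mode by sending max_results = -1 and replaying the cursor from each done event. A rough connect-go consumer over the generated types — the import path and alias for the generated connect package, the client construction, and the fixed page size of 50 are assumptions:

```go
package main

import (
	"context"
	"fmt"

	"connectrpc.com/connect"

	v1alpha1 "github.com/redpanda-data/console/backend/pkg/protogen/redpanda/api/console/v1alpha1"
	v1ac "github.com/redpanda-data/console/backend/pkg/protogen/redpanda/api/console/v1alpha1/consolev1alpha1connect" // path assumed
)

// fetchAllPages drains a topic page by page. Each done event carries the
// cursor for the next call; an empty cursor means the last page was served.
func fetchAllPages(ctx context.Context, client v1ac.ConsoleServiceClient, topic string) error {
	pageToken := ""
	for {
		stream, err := client.ListMessages(ctx, connect.NewRequest(&v1alpha1.ListMessagesRequest{
			Topic:      topic,
			MaxResults: -1, // pagination mode, per the field comments above
			PageSize:   50,
			PageToken:  pageToken,
		}))
		if err != nil {
			return err
		}
		next := ""
		for stream.Receive() {
			msg := stream.Msg()
			if data := msg.GetData(); data != nil {
				fmt.Printf("partition=%d offset=%d\n", data.GetPartitionId(), data.GetOffset())
			}
			if done := msg.GetDone(); done != nil {
				next = done.GetNextPageToken()
			}
		}
		if err := stream.Err(); err != nil {
			return err
		}
		if pageToken = next; pageToken == "" {
			return nil // no more pages
		}
	}
}
```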
diff --git a/frontend/src/components/pages/topics/Tab.Messages/index.tsx b/frontend/src/components/pages/topics/Tab.Messages/index.tsx
index f83c46f427..7db1c009ff 100644
--- a/frontend/src/components/pages/topics/Tab.Messages/index.tsx
+++ b/frontend/src/components/pages/topics/Tab.Messages/index.tsx
@@ -40,6 +40,7 @@ import {
   MenuDivider,
   MenuItem,
   MenuList,
+  Spinner,
   Tooltip,
   useBreakpoint,
   useToast,
@@ -194,6 +195,22 @@ const defaultSelectChakraStyles = {
   }),
 } as const;
 
+const maxResultsSelectChakraStyles = {
+  control: (provided: Record<string, unknown>) => ({
+    ...provided,
+    minWidth: '140px', // Ensure enough width for "∞ Unlimited"
+  }),
+  option: (provided: Record<string, unknown>) => ({
+    ...provided,
+    wordBreak: 'keep-all',
+    whiteSpace: 'nowrap',
+  }),
+  menuList: (provided: Record<string, unknown>) => ({
+    ...provided,
+    minWidth: '140px', // Match control width
+  }),
+} as const;
+
 const inlineSelectChakraStyles = {
   ...defaultSelectChakraStyles,
   control: (provided: Record<string, unknown>) => ({
@@ -451,8 +468,23 @@ export const TopicMessageView: FC = (props) => {
   const [totalMessagesConsumed, setTotalMessagesConsumed] = useState(0);
   const [elapsedMs, setElapsedMs] = useState<number | null>(null);
 
+  // Pagination state
+  const [messageSearch, setMessageSearch] = useState<MessageSearch | null>(null);
+  const [isLoadingMore, setIsLoadingMore] = useState(false);
+  const currentSearchRunRef = useRef<string | null>(null);
   const abortControllerRef = useRef<AbortController | null>(null);
+  const prevStartOffsetRef = useRef(startOffset);
+  const prevMaxResultsRef = useRef(maxResults);
+  const prevPageIndexRef = useRef(pageIndex);
+  const [forceRefresh, setForceRefresh] = useState(0);
+
+  // Refs for tracking loadMore state to prevent stale closures and memory leaks
+  const currentMessageSearchRef = useRef<MessageSearch | null>(null);
+  const loadMoreAbortRef = useRef<AbortController | null>(null);
+  const [loadMoreFailures, setLoadMoreFailures] = useState(0);
+  const isMountedRef = useRef(true);
+  const MAX_LOAD_MORE_RETRIES = 3;
 
   // Filter messages based on quick search
   const filteredMessages = quickSearch
@@ -472,16 +504,65 @@ export const TopicMessageView: FC = (props) => {
     [topicSettings?.previewTags]
   );
 
+  // Keep currentMessageSearchRef in sync with messageSearch state
+  useEffect(() => {
+    currentMessageSearchRef.current = messageSearch;
+  }, [messageSearch]);
+
   // Cleanup effect (replaces componentWillUnmount)
-  useEffect(
-    () => () => {
+  useEffect(() => {
+    isMountedRef.current = true;
+    return () => {
+      isMountedRef.current = false;
       if (abortControllerRef.current) {
         abortControllerRef.current.abort();
       }
+      if (loadMoreAbortRef.current) {
+        loadMoreAbortRef.current.abort();
+      }
       appGlobal.searchMessagesFunc = undefined;
-    },
-    []
-  );
+    };
+  }, []);
+
+  // Clear sorting when entering unlimited pagination mode
+  useEffect(() => {
+    if (maxResults === -1 && sorting.length > 0) {
+      setSortingState([]);
+    }
+  }, [maxResults, sorting.length, setSortingState]);
+
+  // Reset to page 1 when start offset changes (e.g., switching from Newest to Beginning)
+  useEffect(() => {
+    // Only reset if startOffset actually changed (not on initial mount or re-renders)
+    if (prevStartOffsetRef.current !== startOffset) {
+      setPageIndex(0);
+      prevStartOffsetRef.current = startOffset;
+    }
+  }, [startOffset, setPageIndex]);
+
+  // Reset to page 1 when max results changes (e.g., switching from Unlimited to fixed size)
+  useEffect(() => {
+    // Only reset if maxResults actually changed (not on initial mount or re-renders)
+    if (prevMaxResultsRef.current !== maxResults) {
+      setPageIndex(0);
+      prevMaxResultsRef.current = maxResults;
+    }
+  }, [maxResults, setPageIndex]);
+
+  // Reset maxResults to 50 when switching to Latest/Live from unlimited
+  const prevStartOffsetForMaxResultsRef = useRef(startOffset);
+  useEffect(() => {
+    // Only reset if we just switched TO Latest/Live (not already on it)
+    const justSwitchedToLatestLive =
+      prevStartOffsetForMaxResultsRef.current !== PartitionOffsetOrigin.End &&
+      startOffset === PartitionOffsetOrigin.End;
+
+    if (justSwitchedToLatestLive && maxResults === -1) {
+      setMaxResults(50);
+    }
+
+    prevStartOffsetForMaxResultsRef.current = startOffset;
+  }, [startOffset, maxResults, setMaxResults]);
 
   // Convert executeMessageSearch to useCallback
   // biome-ignore lint/complexity/noExcessiveCognitiveComplexity: complex business logic
@@ -520,12 +601,17 @@ export const TopicMessageView: FC = (props) => {
       }
     }
 
+    // Calculate backend page size: for pagination mode (maxResults === -1),
+    // fetch exactly one page at a time to handle compaction gaps reliably
+    const backendPageSize = maxResults === -1 ? pageSize : undefined;
+
     const request = {
       topicName: props.topic.topicName,
       partitionId: partitionID,
       startOffset,
      startTimestamp: currentSearchParams?.startTimestamp ?? uiState.topicSettings.searchParams.startTimestamp,
       maxResults,
+      pageSize: backendPageSize,
       filterInterpreterCode: encodeBase64(sanitizeString(filterCode)),
       includeRawPayload: true,
       keyDeserializer,
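The comment on backendPageSize is the heart of the design: on compacted topics offsets are sparse, so cursor-minus-pageSize arithmetic can silently skip or re-serve records, and the only safe advance is derived from what the broker actually returned. A hypothetical descending-cursor advance rule, not code from this PR, illustrating the idea:

```go
// advanceDescending moves one partition's cursor past the records that
// were actually served in this page. With log compaction, offsets are
// not dense, so the new position comes from the oldest served offset
// rather than from cursor-minus-pageSize arithmetic.
func advanceDescending(c PartitionCursor, servedOffsets []int64) PartitionCursor {
	if len(servedOffsets) == 0 {
		c.NextOffset = -1 // nothing left between cursor and low water mark: exhausted
		return c
	}
	oldest := servedOffsets[0]
	for _, o := range servedOffsets {
		if o < oldest {
			oldest = o
		}
	}
	c.NextOffset = oldest - 1 // the next page resumes just below what was served
	if c.NextOffset < c.LowWaterMark {
		c.NextOffset = -1 // sentinel used by the exhaustion tests above
	}
	return c
}
```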
@@ -536,10 +622,11 @@ export const TopicMessageView: FC = (props) => {
     setFetchError(null);
     setSearchPhase('Searching...');
 
-    const messageSearch = createMessageSearch();
+    const search = createMessageSearch();
+    setMessageSearch(search);
     const startTime = Date.now();
 
-    const result = await messageSearch.startSearch(request, abortSignal).catch((err: Error) => {
+    const result = await search.startSearch(request, abortSignal).catch((err: Error) => {
       const msg = err.message ?? String(err);
       // biome-ignore lint/suspicious/noConsole: intentional console usage
       console.error(`error in searchTopicMessages: ${msg}`);
@@ -552,8 +639,8 @@ export const TopicMessageView: FC = (props) => {
       setMessages(result);
       setSearchPhase(null);
       setElapsedMs(endTime - startTime);
-      setBytesConsumed(messageSearch.bytesConsumed);
-      setTotalMessagesConsumed(messageSearch.totalMessagesConsumed);
+      setBytesConsumed(search.bytesConsumed);
+      setTotalMessagesConsumed(search.totalMessagesConsumed);
 
       return result;
     } catch (error: unknown) {
@@ -569,6 +656,7 @@ export const TopicMessageView: FC = (props) => {
     partitionID,
     startOffset,
     maxResults,
+    pageSize,
     getSearchParams,
     keyDeserializer,
     valueDeserializer,
@@ -628,6 +716,7 @@ export const TopicMessageView: FC = (props) => {
   );
 
   // Auto search when parameters change
+  // biome-ignore lint/correctness/useExhaustiveDependencies: forceRefresh is intentionally watched to trigger forced re-search
   useEffect(() => {
     // Set up auto-search with 100ms delay
     const timer = setTimeout(() => {
@@ -637,7 +726,99 @@ export const TopicMessageView: FC = (props) => {
     appGlobal.searchMessagesFunc = searchFunc;
 
     return () => clearTimeout(timer);
-  }, [searchFunc]);
+  }, [searchFunc, forceRefresh]);
+
+  // Auto-load more messages when user reaches the last loaded page (pagination mode only)
+  useEffect(() => {
+    // Only auto-load in pagination mode (maxResults === -1) and when filters are not active
+    if (
+      maxResults !== -1 ||
+      filters.length > 0 ||
+      !messageSearch ||
+      !messageSearch.nextPageToken ||
+      isLoadingMore ||
+      searchPhase ||
+      loadMoreFailures >= MAX_LOAD_MORE_RETRIES
+    ) {
+      return;
+    }
+
+    const totalLoadedPages = Math.ceil(messages.length / pageSize);
+    const isOnLastPage = pageIndex >= totalLoadedPages - 1;
+
+    if (isOnLastPage && messageSearch.nextPageToken) {
+      // Create abort controller for this loadMore operation
+      const abortController = new AbortController();
+      loadMoreAbortRef.current = abortController;
+
+      // Capture the current messageSearch reference to detect stale responses
+      const capturedMessageSearch = messageSearch;
+
+      setIsLoadingMore(true);
+      capturedMessageSearch
+        .loadMore()
+        .then(() => {
+          // Only update state if component is still mounted and this is still the current search
+          if (isMountedRef.current && currentMessageSearchRef.current === capturedMessageSearch) {
+            setMessages([...capturedMessageSearch.messages]);
+            // Reset failure count on success
+            setLoadMoreFailures(0);
+          }
+        })
+        .catch((err) => {
+          // Only show error if component is still mounted and not aborted
+          if (isMountedRef.current && !abortController.signal.aborted) {
+            setLoadMoreFailures((prev) => prev + 1);
+            toast({
+              title: 'Failed to load more messages',
+              description: (err as Error).message,
+              status: 'error',
+              duration: 5000,
+              isClosable: true,
+            });
+          }
+        })
+        .finally(() => {
+          if (isMountedRef.current) {
+            setIsLoadingMore(false);
+          }
+          if (loadMoreAbortRef.current === abortController) {
+            loadMoreAbortRef.current = null;
+          }
+        });
+    }
+  }, [
+    pageIndex,
+    maxResults,
+    filters.length,
+    messageSearch,
+    isLoadingMore,
+    searchPhase,
+    messages.length,
+    pageSize,
+    toast,
+    loadMoreFailures,
+  ]);
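The guard on `messageSearch.nextPageToken` above means the server's completion event doubles as the stop signal: no token, no further fetches. A sketch of the emit side that would produce that behavior — `Encode` is an assumed counterpart to `DecodePageToken`, while the three-argument `OnComplete` is the signature this PR gives the stream progress reporter:

```go
// completePage finishes one ListMessages page. The resume cursor is
// attached only while unread records remain; the empty string tells
// clients such as the auto-load effect above to stop paging.
func completePage(p *streamProgressReporter, token *PageToken, elapsedMs int64, cancelled bool) {
	next := ""
	if token != nil && token.HasMore() {
		if enc, err := token.Encode(); err == nil { // Encode: assumed base64+JSON counterpart of DecodePageToken
			next = enc
		}
	}
	p.OnComplete(elapsedMs, cancelled, next)
}
```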
+
+  // Reset pagination when navigating back to page 1 in unlimited mode
+  // This prevents keeping many pages in memory and triggering excessive requests
+  useEffect(() => {
+    // Check if we're in unlimited mode and user navigated back to page 1 from a higher page
+    // Use ref to check messageSearch existence to avoid circular dependency
+    if (maxResults === -1 && pageIndex === 0 && prevPageIndexRef.current > 1 && currentMessageSearchRef.current) {
+      // Clear the message search and state
+      setMessages([]);
+      setMessageSearch(null);
+      // Reset failure count when resetting pagination
+      setLoadMoreFailures(0);
+      // Clear the search run ref and trigger a forced refresh
+      currentSearchRunRef.current = null;
+      setForceRefresh((prev) => prev + 1);
+    }
+    prevPageIndexRef.current = pageIndex;
+    // Note: messageSearch intentionally excluded to avoid circular dependency
+    // We use currentMessageSearchRef.current instead, which is always in sync
+  }, [pageIndex, maxResults]);
 
   // Message Table rendering variables and functions
   const paginationParams = {
@@ -670,6 +851,7 @@ export const TopicMessageView: FC = (props) => {
     timestamp: {
       header: 'Timestamp',
       accessorKey: 'timestamp',
+      enableSorting: maxResults !== -1, // Disable sorting in unlimited pagination mode
       cell: ({
         row: {
           original: { timestamp },
@@ -696,6 +878,7 @@ export const TopicMessageView: FC = (props) => {
       ),
       size: hasKeyTags ? 300 : 1,
       accessorKey: 'key',
+      enableSorting: maxResults !== -1, // Disable sorting in unlimited pagination mode
       cell: ({ row: { original } }) => (
@@ ... @@ export const TopicMessageView: FC = (props) => {
         'Value'
       ),
       accessorKey: 'value',
+      enableSorting: maxResults !== -1, // Disable sorting in unlimited pagination mode
       cell: ({ row: { original } }) => (
@@ ... @@ export const TopicMessageView: FC = (props) => {
           label: (
-            {`Newest - ${String(maxResults)}`}
+            <Text wordBreak="keep-all">
+              {maxResults === -1 ? 'Newest' : `Newest - ${String(maxResults)}`}
+            </Text>
           ),
         },
@@ -942,11 +1128,23 @@ export const TopicMessageView: FC = (props) => {
@@ -1020,10 +1218,10 @@ export const TopicMessageView: FC = (props) => {
       {Boolean(searchPhase && searchPhase.length > 0) && (
@@ -1197,6 +1395,11 @@ export const TopicMessageView: FC = (props) => {
           }
         )}
         onSortingChange={(newSorting) => {
+          // Don't allow sorting changes in unlimited pagination mode
+          if (maxResults === -1) {
+            return;
+          }
+
          const updatedSorting: SortingState = typeof newSorting === 'function' ? newSorting(sorting) : newSorting;
           setSortingState(updatedSorting);
         }}
@@ -1224,6 +1427,25 @@ export const TopicMessageView: FC = (props) => {
         />
       )}
     />
+
+    {/* Warning when filters are active with pagination */}
+    {maxResults === -1 && filters.length > 0 && messages.length > 0 && (
+      <Alert status="warning" my={2}>
+        <AlertIcon />
+        Auto-pagination is disabled when filters are active. Remove filters to enable automatic loading.
+      </Alert>
+    )}
+
+    {/* Loading indicator when fetching more pages */}
+    {maxResults === -1 && isLoadingMore && (
+      <Flex alignItems="center" gap={2} my={2}>
+        <Spinner size="sm" /> Loading more messages...
+      </Flex>
+    )}
+