3 changes: 3 additions & 0 deletions backend/pkg/api/connect/service/console/service.go
@@ -61,6 +61,7 @@ func (api *Service) ListMessages(
MaxResults: int(req.Msg.GetMaxResults()),
FilterInterpreterCode: req.Msg.GetFilterInterpreterCode(),
Enterprise: req.Msg.GetEnterprise(),
PageToken: req.Msg.GetPageToken(),
}

interpreterCode, err := lmq.DecodeInterpreterCode()
@@ -96,6 +97,8 @@ func (api *Service) ListMessages(
IgnoreMaxSizeLimit: req.Msg.GetIgnoreMaxSizeLimit(),
KeyDeserializer: fromProtoEncoding(req.Msg.GetKeyDeserializer()),
ValueDeserializer: fromProtoEncoding(req.Msg.GetValueDeserializer()),
PageToken: lmq.PageToken,
PageSize: int(req.Msg.GetPageSize()),
}

timeout := 35 * time.Second
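The two hunks above thread the page token from the proto request through httptypes.ListMessagesRequest and into the consume request. The diff only confirms that a DecodePageToken helper exists in the console package; it does not show the token's layout. A minimal sketch of one plausible encoding, assuming a base64-encoded JSON payload of per-partition resume offsets (requires encoding/base64 and encoding/json; payload shape and function names are assumptions, not the PR's implementation):

// pageTokenPayload is a hypothetical layout: for each partition, the offset
// at which the next page resumes when reading newest-first.
type pageTokenPayload struct {
	PartitionOffsets map[int32]int64 `json:"partitionOffsets"`
}

func encodePageToken(p pageTokenPayload) (string, error) {
	raw, err := json.Marshal(p)
	if err != nil {
		return "", err
	}
	return base64.StdEncoding.EncodeToString(raw), nil
}

func decodePageToken(token string) (pageTokenPayload, error) {
	var p pageTokenPayload
	raw, err := base64.StdEncoding.DecodeString(token)
	if err != nil {
		return p, err
	}
	return p, json.Unmarshal(raw, &p)
}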
@@ -216,7 +216,7 @@ func (p *streamProgressReporter) OnMessage(message *console.TopicMessage) {
}
}

func (p *streamProgressReporter) OnComplete(elapsedMs int64, isCancelled bool) {
func (p *streamProgressReporter) OnComplete(elapsedMs int64, isCancelled bool, nextPageToken string) {
p.writeMutex.Lock()
defer p.writeMutex.Unlock()

@@ -225,6 +225,7 @@ func (p *streamProgressReporter) OnComplete(elapsedMs int64, isCancelled bool) {
IsCancelled: isCancelled,
MessagesConsumed: p.messagesConsumed.Load(),
BytesConsumed: p.bytesConsumed.Load(),
NextPageToken: nextPageToken,
}

if err := p.stream.Send(
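The widened OnComplete signature means every progress-reporter implementation now receives the next page token alongside the existing stats. A hedged sketch of a minimal implementer — only the signature is taken from the diff, the type and the log import are illustrative:

// loggingReporter shows the new OnComplete contract. Per the integration
// test below, legacy mode passes "" for nextPageToken; pagination mode
// passes the next token, or "" once the topic is exhausted.
type loggingReporter struct{}

func (loggingReporter) OnComplete(elapsedMs int64, isCancelled bool, nextPageToken string) {
	log.Printf("done in %dms, cancelled=%v, nextPageToken=%q", elapsedMs, isCancelled, nextPageToken)
}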
221 changes: 221 additions & 0 deletions backend/pkg/api/handle_topic_messages_integration_test.go
@@ -28,6 +28,7 @@ import (
"connectrpc.com/connect"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sr"
"google.golang.org/genproto/googleapis/rpc/errdetails"
@@ -1050,3 +1051,223 @@ func randomString(n int) string {
}
return string(b)
}

func (s *APIIntegrationTestSuite) TestListMessages_Pagination() {
t := s.T()

require := require.New(t)
assert := assert.New(t)

ctx := t.Context()

client := v1ac.NewConsoleServiceClient(
http.DefaultClient,
s.httpAddress(),
connect.WithGRPCWeb(),
)

// Create test topic with multiple messages
testTopicName := testutil.TopicNameForTest("pagination_test")
const messageCount = 150

// Create topic
_, err := s.kafkaAdminClient.CreateTopic(ctx, 3, 1, nil, testTopicName)
require.NoError(err)

defer func() {
s.kafkaAdminClient.DeleteTopics(ctx, testTopicName)
}()

// Wait for topic to be ready
require.Eventually(func() bool {
metadata, err := s.kafkaAdminClient.Metadata(ctx, testTopicName)
if err != nil {
return false
}
topic, exists := metadata.Topics[testTopicName]
return exists && topic.Err == nil && len(topic.Partitions) == 3
}, 30*time.Second, 100*time.Millisecond, "Topic should be created and ready")

// Produce messages
records := make([]*kgo.Record, messageCount)
for i := 0; i < messageCount; i++ {
records[i] = &kgo.Record{
Key: []byte(fmt.Sprintf("key-%d", i)),
Value: []byte(fmt.Sprintf(`{"id": %d, "message": "test message %d"}`, i, i)),
Topic: testTopicName,
}
}

produceResults := s.kafkaClient.ProduceSync(ctx, records...)
require.NoError(produceResults.FirstErr())

	// Brief pause so the produced records are visible to the console's consumer
time.Sleep(500 * time.Millisecond)

// Debug: Check actual message distribution across partitions
offsets, err := s.kafkaAdminClient.ListEndOffsets(ctx, testTopicName)
require.NoError(err)
totalMessages := int64(0)
offsets.Each(func(offset kadm.ListedOffset) {
count := offset.Offset
totalMessages += count
t.Logf("Partition %d: %d messages (offset: 0 to %d)", offset.Partition, count, offset.Offset-1)
})
t.Logf("Total messages across all partitions: %d (expected %d)", totalMessages, messageCount)

t.Run("first page with pagination mode", func(t *testing.T) {
stream, err := client.ListMessages(ctx, connect.NewRequest(&v1pb.ListMessagesRequest{
Topic: testTopicName,
PartitionId: -1, // All partitions
StartOffset: -1, // Recent
MaxResults: -1, // Pagination mode
PageToken: "", // First page
}))
require.NoError(err)
require.NotNil(stream)

var messages []*v1pb.ListMessagesResponse_DataMessage
var done *v1pb.ListMessagesResponse_StreamCompletedMessage

for stream.Receive() {
msg := stream.Msg()
switch v := msg.ControlMessage.(type) {
case *v1pb.ListMessagesResponse_Data:
messages = append(messages, v.Data)
case *v1pb.ListMessagesResponse_Done:
done = v.Done
}
}

require.NoError(stream.Err())
require.NotNil(done, "Should have completion message")

// Verify first page results
assert.Equal(int64(50), done.MessagesConsumed, "Should consume 50 messages (default page size)")
assert.NotEmpty(done.NextPageToken, "Should have next page token")
assert.Len(messages, 50, "Should return 50 messages")

// Verify descending order within partitions
// Group messages by partition
messagesByPartition := make(map[int32][]*v1pb.ListMessagesResponse_DataMessage)
for _, msg := range messages {
messagesByPartition[msg.PartitionId] = append(messagesByPartition[msg.PartitionId], msg)
}

// Check each partition is in descending order
for partitionID, partitionMessages := range messagesByPartition {
if len(partitionMessages) > 1 {
for i := 0; i < len(partitionMessages)-1; i++ {
assert.Greater(partitionMessages[i].Offset, partitionMessages[i+1].Offset,
"Messages in partition %d should be in descending offset order", partitionID)
}
}
}
})

t.Run("paginate through all messages", func(t *testing.T) {
var allMessages []*v1pb.ListMessagesResponse_DataMessage
pageToken := ""
pageCount := 0
maxPages := 5 // Safety limit

for pageCount < maxPages {
stream, err := client.ListMessages(ctx, connect.NewRequest(&v1pb.ListMessagesRequest{
Topic: testTopicName,
PartitionId: -1,
StartOffset: -1,
MaxResults: -1,
PageToken: pageToken,
}))
require.NoError(err)
require.NotNil(stream)

var pageMessages []*v1pb.ListMessagesResponse_DataMessage
var done *v1pb.ListMessagesResponse_StreamCompletedMessage

for stream.Receive() {
msg := stream.Msg()
switch v := msg.ControlMessage.(type) {
case *v1pb.ListMessagesResponse_Data:
pageMessages = append(pageMessages, v.Data)
case *v1pb.ListMessagesResponse_Done:
done = v.Done
}
}

require.NoError(stream.Err())
require.NotNil(done)

allMessages = append(allMessages, pageMessages...)
pageCount++

t.Logf("Page %d: fetched %d messages, nextPageToken=%s", pageCount, len(pageMessages), done.NextPageToken)

if done.NextPageToken == "" {
break
}

pageToken = done.NextPageToken
}

// Verify we got all messages
assert.Equal(messageCount, len(allMessages), "Should fetch all %d messages across pages", messageCount)
assert.LessOrEqual(pageCount, 4, "Should complete in 3-4 pages (150 messages / 50 per page)")
})

t.Run("error when filter with pagination", func(t *testing.T) {
filterCode := base64.StdEncoding.EncodeToString([]byte("return true"))

stream, err := client.ListMessages(ctx, connect.NewRequest(&v1pb.ListMessagesRequest{
Topic: testTopicName,
PartitionId: -1,
StartOffset: -1,
MaxResults: -1,
PageToken: "",
FilterInterpreterCode: filterCode,
}))
require.NoError(err)
require.NotNil(stream)

// Stream should error
for stream.Receive() {
// Should not receive any messages
}

err = stream.Err()
require.Error(err, "Should error when using filter with pagination")
assert.Contains(err.Error(), "cannot use filters with pagination")
})

t.Run("legacy mode still works", func(t *testing.T) {
stream, err := client.ListMessages(ctx, connect.NewRequest(&v1pb.ListMessagesRequest{
Topic: testTopicName,
PartitionId: -1,
StartOffset: -2, // Oldest
MaxResults: 50, // Legacy mode
PageToken: "",
}))
require.NoError(err)
require.NotNil(stream)

var messages []*v1pb.ListMessagesResponse_DataMessage
var done *v1pb.ListMessagesResponse_StreamCompletedMessage

for stream.Receive() {
msg := stream.Msg()
switch v := msg.ControlMessage.(type) {
case *v1pb.ListMessagesResponse_Data:
messages = append(messages, v.Data)
case *v1pb.ListMessagesResponse_Done:
done = v.Done
}
}

require.NoError(stream.Err())
require.NotNil(done)

// In legacy mode, should not have pagination tokens
assert.Empty(done.NextPageToken, "Legacy mode should not return page token")
assert.LessOrEqual(len(messages), 50, "Should respect maxResults limit")
})
}
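The "paginate through all messages" subtest is essentially the client-side pagination loop. Condensed into a reusable helper it could look like the sketch below; the stream handling is copied from the test, while the ConsoleServiceClient interface name follows the usual connect codegen convention and is an assumption here:

func fetchAllPages(ctx context.Context, client v1ac.ConsoleServiceClient, topic string, maxPages int) ([]*v1pb.ListMessagesResponse_DataMessage, error) {
	var all []*v1pb.ListMessagesResponse_DataMessage
	pageToken := ""
	for page := 0; page < maxPages; page++ {
		stream, err := client.ListMessages(ctx, connect.NewRequest(&v1pb.ListMessagesRequest{
			Topic:       topic,
			PartitionId: -1,
			StartOffset: -1,
			MaxResults:  -1, // pagination mode
			PageToken:   pageToken,
		}))
		if err != nil {
			return nil, err
		}
		var done *v1pb.ListMessagesResponse_StreamCompletedMessage
		for stream.Receive() {
			switch v := stream.Msg().ControlMessage.(type) {
			case *v1pb.ListMessagesResponse_Data:
				all = append(all, v.Data)
			case *v1pb.ListMessagesResponse_Done:
				done = v.Done
			}
		}
		if err := stream.Err(); err != nil {
			return nil, err
		}
		// An empty NextPageToken marks the final page.
		if done == nil || done.NextPageToken == "" {
			return all, nil
		}
		pageToken = done.NextPageToken
	}
	return all, fmt.Errorf("no final page after %d pages", maxPages)
}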
27 changes: 22 additions & 5 deletions backend/pkg/api/httptypes/types.go
@@ -21,12 +21,15 @@ import (
// used in Console Enterprise to implement the hooks.
type ListMessagesRequest struct {
TopicName string `json:"topicName"`
StartOffset int64 `json:"startOffset"` // -1 for recent (newest - results), -2 for oldest offset, -3 for newest, -4 for timestamp
StartTimestamp int64 `json:"startTimestamp"` // Start offset by unix timestamp in ms (only considered if start offset is set to -4)
PartitionID int32 `json:"partitionId"` // -1 for all partition ids
MaxResults int `json:"maxResults"`
StartOffset int64 `json:"startOffset"` // -1 for recent (newest - results), -2 for oldest offset, -3 for newest, -4 for timestamp
StartTimestamp int64 `json:"startTimestamp"` // Start offset by unix timestamp in ms (only considered if start offset is set to -4)
PartitionID int32 `json:"partitionId"` // -1 for all partition ids
MaxResults int `json:"maxResults"` // -1 enables pagination mode, 1-500 uses legacy mode
FilterInterpreterCode string `json:"filterInterpreterCode"` // Base64 encoded code

// Pagination fields (only used when MaxResults == -1)
PageToken string `json:"pageToken,omitempty"`

// Enterprise may only be set in the Enterprise mode. The JSON deserialization is deferred
// to the enterprise backend.
Enterprise json.RawMessage `json:"enterprise,omitempty"`
Expand All @@ -38,6 +41,20 @@ func (l *ListMessagesRequest) OK() error {
return errors.New("topic name is required")
}

// Pagination mode: max_results = -1
if l.MaxResults == -1 {
if l.FilterInterpreterCode != "" {
decoded, _ := l.DecodeInterpreterCode()
if decoded != "" {
return errors.New("cannot use filters with pagination")
}
}

		// PageToken validation is deferred to DecodePageToken in the console package
return nil
}

// Legacy mode validation
if l.StartOffset < -4 {
return errors.New("start offset is smaller than -4")
}
@@ -47,7 +64,7 @@
}

if l.MaxResults <= 0 || l.MaxResults > 500 {
return errors.New("max results must be between 1 and 500")
return errors.New("max results must be between 1 and 500, or -1 for pagination")
}

if _, err := l.DecodeInterpreterCode(); err != nil {
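Taken together, OK() now accepts two disjoint request shapes. A quick hedged sketch of both, using only fields from this file (the httptypes import path and the log import are assumed):

	// Pagination mode: MaxResults == -1; PageToken may be empty (first page)
	// or a NextPageToken carried over from a previous response.
	paged := httptypes.ListMessagesRequest{
		TopicName:   "orders",
		PartitionID: -1,
		StartOffset: -1,
		MaxResults:  -1,
		PageToken:   "",
	}

	// Legacy mode: MaxResults in 1..500, no page token involved.
	legacy := httptypes.ListMessagesRequest{
		TopicName:   "orders",
		PartitionID: -1,
		StartOffset: -2, // oldest
		MaxResults:  50,
	}

	for _, r := range []httptypes.ListMessagesRequest{paged, legacy} {
		if err := r.OK(); err != nil {
			log.Fatal(err)
		}
	}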