Skip to content

Commit f2e5f3e

Browse files
committed
fix: resolve consumer offsets topic consumption failure
Upgrade franz-go kadm to v1.17.2 and improve partition offset error handling to gracefully skip problematic partitions instead of failing the entire request.
1 parent 9cec547 commit f2e5f3e

File tree

3 files changed

+78
-40
lines changed

3 files changed

+78
-40
lines changed

backend/go.mod

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ require (
5353
github.com/testcontainers/testcontainers-go v0.38.0
5454
github.com/testcontainers/testcontainers-go/modules/redpanda v0.38.0
5555
github.com/twmb/franz-go v1.20.6
56-
github.com/twmb/franz-go/pkg/kadm v1.17.1
56+
github.com/twmb/franz-go/pkg/kadm v1.17.2
5757
github.com/twmb/franz-go/pkg/kfake v0.0.0-20251115002817-3affad808a82
5858
github.com/twmb/franz-go/pkg/kmsg v1.12.0
5959
github.com/twmb/franz-go/pkg/sasl/kerberos v1.1.0
@@ -66,9 +66,9 @@ require (
6666
go.uber.org/mock v0.6.0
6767
go.vallahaye.net/connect-gateway v0.11.0
6868
golang.org/x/exp v0.0.0-20251113190631-e25ba8c21ef6
69-
golang.org/x/net v0.47.0
70-
golang.org/x/sync v0.18.0
71-
golang.org/x/text v0.31.0
69+
golang.org/x/net v0.48.0
70+
golang.org/x/sync v0.19.0
71+
golang.org/x/text v0.33.0
7272
google.golang.org/genproto v0.0.0-20251111163417-95abcf5c77ba
7373
google.golang.org/genproto/googleapis/api v0.0.0-20251111163417-95abcf5c77ba
7474
google.golang.org/genproto/googleapis/rpc v0.0.0-20251111163417-95abcf5c77ba
@@ -152,7 +152,7 @@ require (
152152
github.com/josharian/intern v1.0.0 // indirect
153153
github.com/json-iterator/go v1.1.12 // indirect
154154
github.com/kevinburke/ssh_config v1.4.0 // indirect
155-
github.com/klauspost/compress v1.18.2 // indirect
155+
github.com/klauspost/compress v1.18.3 // indirect
156156
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
157157
github.com/knadh/koanf/maps v0.1.2 // indirect
158158
github.com/kylelemons/godebug v1.1.0 // indirect
@@ -184,7 +184,7 @@ require (
184184
github.com/opencontainers/go-digest v1.0.0 // indirect
185185
github.com/opencontainers/image-spec v1.1.1 // indirect
186186
github.com/perimeterx/marshmallow v1.1.5 // indirect
187-
github.com/pierrec/lz4/v4 v4.1.22 // indirect
187+
github.com/pierrec/lz4/v4 v4.1.25 // indirect
188188
github.com/pjbgf/sha1cd v0.5.0 // indirect
189189
github.com/pkg/errors v0.9.1 // indirect
190190
github.com/pmezard/go-difflib v1.0.0 // indirect
@@ -225,10 +225,10 @@ require (
225225
go.uber.org/zap v1.27.1 // indirect
226226
go.yaml.in/yaml/v2 v2.4.3 // indirect
227227
go.yaml.in/yaml/v3 v3.0.4 // indirect
228-
golang.org/x/crypto v0.45.0 // indirect
229-
golang.org/x/mod v0.30.0 // indirect
228+
golang.org/x/crypto v0.47.0 // indirect
229+
golang.org/x/mod v0.31.0 // indirect
230230
golang.org/x/oauth2 v0.31.0 // indirect
231-
golang.org/x/sys v0.38.0 // indirect
231+
golang.org/x/sys v0.40.0 // indirect
232232
golang.org/x/time v0.13.0 // indirect
233233
gopkg.in/linkedin/goavro.v1 v1.0.5 // indirect
234234
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect

backend/go.sum

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -277,8 +277,8 @@ github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHm
277277
github.com/kevinburke/ssh_config v1.4.0 h1:6xxtP5bZ2E4NF5tuQulISpTO2z8XbtH8cg1PWkxoFkQ=
278278
github.com/kevinburke/ssh_config v1.4.0/go.mod h1:q2RIzfka+BXARoNexmF9gkxEX7DmvbW9P4hIVx2Kg4M=
279279
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
280-
github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk=
281-
github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
280+
github.com/klauspost/compress v1.18.3 h1:9PJRvfbmTabkOX8moIpXPbMMbYN60bWImDDU7L+/6zw=
281+
github.com/klauspost/compress v1.18.3/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
282282
github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y=
283283
github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
284284
github.com/klauspost/pgzip v1.2.6 h1:8RXeL5crjEUFnR2/Sn6GJNWtSQ3Dk8pq4CL3jvdDyjU=
@@ -378,8 +378,8 @@ github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8
378378
github.com/perimeterx/marshmallow v1.1.5 h1:a2LALqQ1BlHM8PZblsDdidgv1mWi1DgC2UmX50IvK2s=
379379
github.com/perimeterx/marshmallow v1.1.5/go.mod h1:dsXbUu8CRzfYP5a87xpp0xq9S3u0Vchtcl8we9tYaXw=
380380
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
381-
github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU=
382-
github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
381+
github.com/pierrec/lz4/v4 v4.1.25 h1:kocOqRffaIbU5djlIBr7Wh+cx82C0vtFb0fOurZHqD0=
382+
github.com/pierrec/lz4/v4 v4.1.25/go.mod h1:EoQMVJgeeEOMsCqCzqFm2O0cJvljX2nGZjcRIPL34O4=
383383
github.com/pjbgf/sha1cd v0.5.0 h1:a+UkboSi1znleCDUNT3M5YxjOnN1fz2FhN48FlwCxs0=
384384
github.com/pjbgf/sha1cd v0.5.0/go.mod h1:lhpGlyHLpQZoxMv8HcgXvZEhcGs0PG/vsZnEJ7H0iCM=
385385
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@@ -469,8 +469,8 @@ github.com/tklauser/numcpus v0.11.0/go.mod h1:z+LwcLq54uWZTX0u/bGobaV34u6V7KNlTZ
469469
github.com/twmb/franz-go v1.7.0/go.mod h1:PMze0jNfNghhih2XHbkmTFykbMF5sJqmNJB31DOOzro=
470470
github.com/twmb/franz-go v1.20.6 h1:TpQTt4QcixJ1cHEmQGPOERvTzo99s8jAutmS7rbSD6w=
471471
github.com/twmb/franz-go v1.20.6/go.mod h1:u+FzH2sInp7b9HNVv2cZN8AxdXy6y/AQ1Bkptu4c0FM=
472-
github.com/twmb/franz-go/pkg/kadm v1.17.1 h1:Bt02Y/RLgnFO2NP2HVP1kd2TFtGRiJZx+fSArjZDtpw=
473-
github.com/twmb/franz-go/pkg/kadm v1.17.1/go.mod h1:s4duQmrDbloVW9QTMXhs6mViTepze7JLG43xwPcAeTg=
472+
github.com/twmb/franz-go/pkg/kadm v1.17.2 h1:g5f1sAxnTkYC6G96pV5u715HWhxd66hWaDZUAQ8xHY8=
473+
github.com/twmb/franz-go/pkg/kadm v1.17.2/go.mod h1:ST55zUB+sUS+0y+GcKY/Tf1XxgVilaFpB9I19UubLmU=
474474
github.com/twmb/franz-go/pkg/kfake v0.0.0-20251115002817-3affad808a82 h1:0UwzcAL8jEC+gnDyO4ELYUE8kXcorWu9bhoH6M/MeFY=
475475
github.com/twmb/franz-go/pkg/kfake v0.0.0-20251115002817-3affad808a82/go.mod h1:d8HaJtUEgZfU2n+Ps/fCtzlFLtgdrlZgTWwvCqQ3eDo=
476476
github.com/twmb/franz-go/pkg/kmsg v1.2.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
@@ -556,14 +556,14 @@ golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0
556556
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
557557
golang.org/x/crypto v0.0.0-20220817201139-bc19a97f63c8/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
558558
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
559-
golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q=
560-
golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4=
559+
golang.org/x/crypto v0.47.0 h1:V6e3FRj+n4dbpw86FJ8Fv7XVOql7TEwpHapKoMJ/GO8=
560+
golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A=
561561
golang.org/x/exp v0.0.0-20251113190631-e25ba8c21ef6 h1:zfMcR1Cs4KNuomFFgGefv5N0czO2XZpUbxGUy8i8ug0=
562562
golang.org/x/exp v0.0.0-20251113190631-e25ba8c21ef6/go.mod h1:46edojNIoXTNOhySWIWdix628clX9ODXwPsQuG6hsK0=
563563
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
564564
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
565-
golang.org/x/mod v0.30.0 h1:fDEXFVZ/fmCKProc/yAXXUijritrDzahmwwefnjoPFk=
566-
golang.org/x/mod v0.30.0/go.mod h1:lAsf5O2EvJeSFMiBxXDki7sCgAxEUcZHXoXMKT4GJKc=
565+
golang.org/x/mod v0.31.0 h1:HaW9xtz0+kOcWKwli0ZXy79Ix+UW/vOfmWI5QVd2tgI=
566+
golang.org/x/mod v0.31.0/go.mod h1:43JraMp9cGx1Rx3AqioxrbrhNsLl2l/iNAvuBkrezpg=
567567
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
568568
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
569569
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
@@ -575,15 +575,15 @@ golang.org/x/net v0.0.0-20220812174116-3211cb980234/go.mod h1:YDH+HFinaLZZlnHAfS
575575
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
576576
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
577577
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
578-
golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY=
579-
golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU=
578+
golang.org/x/net v0.48.0 h1:zyQRTTrjc33Lhh0fBgT/H3oZq9WuvRR5gPC70xpDiQU=
579+
golang.org/x/net v0.48.0/go.mod h1:+ndRgGjkh8FGtu1w1FGbEC31if4VrNVMuKTgcAAnQRY=
580580
golang.org/x/oauth2 v0.31.0 h1:8Fq0yVZLh4j4YA47vHKFTa9Ew5XIrCP8LC6UeNZnLxo=
581581
golang.org/x/oauth2 v0.31.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA=
582582
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
583583
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
584584
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
585-
golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I=
586-
golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
585+
golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4=
586+
golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
587587
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
588588
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
589589
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -600,30 +600,30 @@ golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBc
600600
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
601601
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
602602
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
603-
golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc=
604-
golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
603+
golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ=
604+
golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
605605
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
606606
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
607607
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
608608
golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U=
609-
golang.org/x/term v0.37.0 h1:8EGAD0qCmHYZg6J17DvsMy9/wJ7/D/4pV/wfnld5lTU=
610-
golang.org/x/term v0.37.0/go.mod h1:5pB4lxRNYYVZuTLmy8oR2BH8dflOR+IbTYFD8fi3254=
609+
golang.org/x/term v0.39.0 h1:RclSuaJf32jOqZz74CkPA9qFuVTX7vhLlpfj/IGWlqY=
610+
golang.org/x/term v0.39.0/go.mod h1:yxzUCTP/U+FzoxfdKmLaA0RV1WgE0VY7hXBwKtY/4ww=
611611
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
612612
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
613613
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
614614
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
615615
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
616616
golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
617-
golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM=
618-
golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM=
617+
golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE=
618+
golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8=
619619
golang.org/x/time v0.13.0 h1:eUlYslOIt32DgYD6utsuUeHs4d7AsEYLuIAdg7FlYgI=
620620
golang.org/x/time v0.13.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
621621
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
622622
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
623623
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
624624
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
625-
golang.org/x/tools v0.39.0 h1:ik4ho21kwuQln40uelmciQPp9SipgNDdrafrYA4TmQQ=
626-
golang.org/x/tools v0.39.0/go.mod h1:JnefbkDPyD8UU2kI5fuf8ZX4/yUeh9W877ZeBONxUqQ=
625+
golang.org/x/tools v0.40.0 h1:yLkxfA+Qnul4cs9QA3KnlFu0lVmd8JJfoq+E41uSutA=
626+
golang.org/x/tools v0.40.0/go.mod h1:Ik/tzLRlbscWpqqMRjyWYDisX8bG13FrdXp3o4Sr9lc=
627627
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
628628
gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
629629
gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=

backend/pkg/console/list_messages.go

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -206,8 +206,8 @@ func (s *Service) ListMessages(ctx context.Context, listReq ListMessageRequest,
206206
if err != nil {
207207
return fmt.Errorf("failed to get end offsets: %w", err)
208208
}
209-
if startOffsets.Error() != nil {
210-
return fmt.Errorf("failed to get start offsets: %w", startOffsets.Error())
209+
if endOffsets.Error() != nil {
210+
return fmt.Errorf("failed to get end offsets: %w", endOffsets.Error())
211211
}
212212

213213
// Get partition consume request by calculating start and end offsets for each partition
@@ -255,7 +255,7 @@ func (s *Service) ListMessages(ctx context.Context, listReq ListMessageRequest,
255255
// be returned if it fails to request the partition offsets for the given timestamp.
256256
// makes it harder to understand how the consume request is calculated in total though.
257257
//
258-
//nolint:cyclop,gocognit // This is indeed a complex function. Breaking this into multiple smaller functions possibly
258+
//nolint:cyclop,gocognit,gocyclo // This is indeed a complex function. Breaking this into multiple smaller functions possibly
259259
func (s *Service) calculateConsumeRequests(
260260
ctx context.Context,
261261
cl *kgo.Client,
@@ -284,13 +284,46 @@ func (s *Service) calculateConsumeRequests(
284284
// Init result map
285285
notInitialized := int64(-100)
286286
for _, partitionID := range partitionIDs {
287-
startOffset, exists := startOffsets.Lookup(listReq.TopicName, partitionID)
288-
if !exists {
289-
return nil, fmt.Errorf("could not find partition end offset for topic %q and partition %d", listReq.TopicName, partitionID)
287+
startOffset, startExists := startOffsets.Lookup(listReq.TopicName, partitionID)
288+
endOffset, endExists := endOffsets.Lookup(listReq.TopicName, partitionID)
289+
290+
// Check if partition offsets are missing from the response (exists=false means the
291+
// partition is not in the map, possibly due to ACL restrictions on internal topics)
292+
if !startExists {
293+
s.logger.WarnContext(ctx,
294+
"skipping partition: start offset not in response",
295+
slog.String("topic", listReq.TopicName),
296+
slog.Int("partition", int(partitionID)),
297+
)
298+
continue
290299
}
291-
endOffset, exists := endOffsets.Lookup(listReq.TopicName, partitionID)
292-
if !exists {
293-
return nil, fmt.Errorf("could not find partition end offset for topic %q and partition %d", listReq.TopicName, partitionID)
300+
if !endExists {
301+
s.logger.WarnContext(ctx,
302+
"skipping partition: end offset not in response",
303+
slog.String("topic", listReq.TopicName),
304+
slog.Int("partition", int(partitionID)),
305+
)
306+
continue
307+
}
308+
309+
// Check for errors on the partition offsets that are in the response
310+
if startOffset.Err != nil {
311+
s.logger.WarnContext(ctx,
312+
"skipping partition: failed to get start offset",
313+
slog.String("topic", listReq.TopicName),
314+
slog.Int("partition", int(partitionID)),
315+
slog.Any("error", startOffset.Err),
316+
)
317+
continue
318+
}
319+
if endOffset.Err != nil {
320+
s.logger.WarnContext(ctx,
321+
"skipping partition: failed to get end offset",
322+
slog.String("topic", listReq.TopicName),
323+
slog.Int("partition", int(partitionID)),
324+
slog.Any("error", endOffset.Err),
325+
)
326+
continue
294327
}
295328

296329
p := PartitionConsumeRequest{
@@ -358,6 +391,11 @@ func (s *Service) calculateConsumeRequests(
358391
requests[startOffset.Partition] = &p
359392
}
360393

394+
// Validate that at least one partition was successfully processed
395+
if len(requests) == 0 {
396+
return nil, fmt.Errorf("no partitions available for consumption: all partitions failed offset retrieval. Check topic availability and permissions for %q", listReq.TopicName)
397+
}
398+
361399
if !predictableResults {
362400
// Predictable results are required for the balancing method we usually try to apply. If that's not possible
363401
// we can quit early as there won't be any balancing across partitions enforced.

0 commit comments

Comments
 (0)