Skip to content

Commit 03eabd2

Browse files
authored
Merge branch 'main' into neng/deployment-annotations
2 parents c9d2b18 + 94db423 commit 03eabd2

File tree

9 files changed

+546
-15
lines changed

9 files changed

+546
-15
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,3 +52,4 @@ node_modules/
5252

5353
.cursor
5454
.envrc
55+
mise.toml

pkg/admin/dummy.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,16 @@ func (d *DummyPulsarAdmin) GetSchema(string) (*v1alpha1.SchemaInfo, error) {
236236
return nil, nil
237237
}
238238

239+
// GetSchemaInfoWithVersion is a fake implements of GetSchemaInfoWithVersion
240+
func (d *DummyPulsarAdmin) GetSchemaInfoWithVersion(string) (*v1alpha1.SchemaInfo, int64, error) {
241+
return nil, 0, nil
242+
}
243+
244+
// GetSchemaVersionBySchemaInfo is a fake implements of GetSchemaVersionBySchemaInfo
245+
func (d *DummyPulsarAdmin) GetSchemaVersionBySchemaInfo(string, *v1alpha1.SchemaInfo) (int64, error) {
246+
return 0, nil
247+
}
248+
239249
// UploadSchema is a fake implements of UploadSchema
240250
func (d *DummyPulsarAdmin) UploadSchema(string, *SchemaParams) error {
241251
return nil

pkg/admin/errors.go

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package admin
1717
import (
1818
"errors"
1919
"net"
20+
"strings"
2021

2122
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest"
2223
)
@@ -51,12 +52,22 @@ const (
5152

5253
// ErrorReason returns the HTTP status code for the error
5354
func ErrorReason(err error) Reason {
54-
cliError, ok := err.(rest.Error)
55-
if !ok {
56-
// can't determine error reason as can't convert to a cli error
55+
if err == nil {
5756
return ReasonUnknown
5857
}
59-
return Reason(cliError.Code)
58+
59+
var restErrPtr *rest.Error
60+
if errors.As(err, &restErrPtr) && restErrPtr != nil {
61+
return Reason(restErrPtr.Code)
62+
}
63+
64+
var restErr rest.Error
65+
if errors.As(err, &restErr) {
66+
return Reason(restErr.Code)
67+
}
68+
69+
// can't determine error reason as can't convert to a cli error
70+
return ReasonUnknown
6071
}
6172

6273
// IsNotFound returns true if the error indicates the resource is not found on server
@@ -66,7 +77,25 @@ func IsNotFound(err error) bool {
6677

6778
// IsAlreadyExist returns true if the error indicates the resource already exist
6879
func IsAlreadyExist(err error) bool {
69-
return ErrorReason(err) == ReasonAlreadyExist
80+
if err == nil {
81+
return false
82+
}
83+
84+
reason := ErrorReason(err)
85+
if reason == ReasonAlreadyExist {
86+
return true
87+
}
88+
if reason == ReasonInvalidParameter {
89+
return isAlreadyExistsMessage(err)
90+
}
91+
if reason != ReasonUnknown {
92+
return false
93+
}
94+
return isAlreadyExistsMessage(err)
95+
}
96+
97+
func isAlreadyExistsMessage(err error) bool {
98+
return strings.Contains(strings.ToLower(err.Error()), "already exist")
7099
}
71100

72101
// IsInternalServerError returns true if the error indicates the resource already exist

pkg/admin/errors_test.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
// Copyright 2026 StreamNative
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
// Copyright 2026 StreamNative
16+
//
17+
// Licensed under the Apache License, Version 2.0 (the "License");
18+
// you may not use this file except in compliance with the License.
19+
// You may obtain a copy of the License at
20+
//
21+
// http://www.apache.org/licenses/LICENSE-2.0
22+
//
23+
// Unless required by applicable law or agreed to in writing, software
24+
// distributed under the License is distributed on an "AS IS" BASIS,
25+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
26+
// See the License for the specific language governing permissions and
27+
// limitations under the License.
28+
29+
package admin
30+
31+
import (
32+
"errors"
33+
"fmt"
34+
"testing"
35+
36+
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest"
37+
)
38+
39+
func TestIsAlreadyExist(t *testing.T) {
40+
tests := []struct {
41+
name string
42+
err error
43+
expected bool
44+
}{
45+
{
46+
name: "rest.Error with 409",
47+
err: rest.Error{Code: 409, Reason: "already exists"},
48+
expected: true,
49+
},
50+
{
51+
name: "rest.Error pointer with 409",
52+
err: &rest.Error{Code: 409, Reason: "already exists"},
53+
expected: true,
54+
},
55+
{
56+
name: "wrapped rest.Error with 409",
57+
err: fmt.Errorf("wrapped: %w", rest.Error{Code: 409, Reason: "already exists"}),
58+
expected: true,
59+
},
60+
{
61+
name: "rest.Error with 412 and already exists reason",
62+
err: rest.Error{Code: 412, Reason: "This topic already exists"},
63+
expected: true,
64+
},
65+
{
66+
name: "wrapped error with already exists message",
67+
err: fmt.Errorf("wrapped: %w", errors.New("This topic already exists")),
68+
expected: true,
69+
},
70+
{
71+
name: "plain error with already exists message",
72+
err: errors.New("This topic already exists"),
73+
expected: true,
74+
},
75+
{
76+
name: "unrelated error",
77+
err: errors.New("permission denied"),
78+
expected: false,
79+
},
80+
}
81+
82+
for _, tt := range tests {
83+
t.Run(tt.name, func(t *testing.T) {
84+
result := IsAlreadyExist(tt.err)
85+
if result != tt.expected {
86+
t.Fatalf("IsAlreadyExist() = %v, want %v", result, tt.expected)
87+
}
88+
})
89+
}
90+
}

pkg/admin/impl.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1689,6 +1689,39 @@ func (p *PulsarAdminClient) GetSchema(topic string) (*v1alpha1.SchemaInfo, error
16891689
return rsp, nil
16901690
}
16911691

1692+
// GetSchemaInfoWithVersion get schema info with version for a given topic
1693+
func (p *PulsarAdminClient) GetSchemaInfoWithVersion(topic string) (*v1alpha1.SchemaInfo, int64, error) {
1694+
info, err := p.adminClient.Schemas().GetSchemaInfoWithVersion(topic)
1695+
if err != nil {
1696+
return nil, 0, err
1697+
}
1698+
if info == nil {
1699+
return nil, 0, errors.New("schema info is empty")
1700+
}
1701+
if info.SchemaInfo == nil {
1702+
return nil, info.Version, nil
1703+
}
1704+
rsp := &v1alpha1.SchemaInfo{
1705+
Type: info.SchemaInfo.Type,
1706+
Schema: string(info.SchemaInfo.Schema),
1707+
Properties: info.SchemaInfo.Properties,
1708+
}
1709+
return rsp, info.Version, nil
1710+
}
1711+
1712+
// GetSchemaVersionBySchemaInfo gets schema version for a given schema payload
1713+
func (p *PulsarAdminClient) GetSchemaVersionBySchemaInfo(topic string, info *v1alpha1.SchemaInfo) (int64, error) {
1714+
if info == nil {
1715+
return 0, errors.New("schema info is nil")
1716+
}
1717+
payload := utils.SchemaInfo{
1718+
Type: info.Type,
1719+
Schema: []byte(info.Schema),
1720+
Properties: info.Properties,
1721+
}
1722+
return p.adminClient.Schemas().GetVersionBySchemaInfo(topic, payload)
1723+
}
1724+
16921725
// UploadSchema creates or updates a schema for a given topic
16931726
func (p *PulsarAdminClient) UploadSchema(topic string, params *SchemaParams) error {
16941727
var payload utils.PostSchemaPayload

pkg/admin/interface.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,12 @@ type PulsarAdmin interface {
265265
// GetSchema retrieves the latest schema of a topic
266266
GetSchema(topic string) (*v1alpha1.SchemaInfo, error)
267267

268+
// GetSchemaInfoWithVersion retrieves the latest schema and its version for a topic
269+
GetSchemaInfoWithVersion(topic string) (*v1alpha1.SchemaInfo, int64, error)
270+
271+
// GetSchemaVersionBySchemaInfo retrieves the version for a given schema payload
272+
GetSchemaVersionBySchemaInfo(topic string, info *v1alpha1.SchemaInfo) (int64, error)
273+
268274
// UploadSchema creates or updates a schema for a given topic
269275
UploadSchema(topic string, params *SchemaParams) error
270276

pkg/connection/reconcile_topic.go

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ package connection
1717
import (
1818
"context"
1919
"fmt"
20-
"reflect"
2120
"slices"
2221

2322
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
@@ -287,23 +286,43 @@ func (r *PulsarTopicReconciler) ReconcileTopic(ctx context.Context, pulsarAdmin
287286
}
288287

289288
func applySchema(pulsarAdmin admin.PulsarAdmin, topic *resourcev1alpha1.PulsarTopic, log logr.Logger) error {
290-
schema, serr := pulsarAdmin.GetSchema(topic.Spec.Name)
291-
if serr != nil && !admin.IsNotFound(serr) {
292-
return serr
293-
}
294289
if topic.Spec.SchemaInfo != nil {
295-
// Only upload the schema when schema doesn't exist or the schema has been updated
296-
if admin.IsNotFound(serr) || !reflect.DeepEqual(topic.Spec.SchemaInfo, schema) {
290+
uploadSchema := func(currentVersion int64, desiredVersion int64) error {
297291
info := topic.Spec.SchemaInfo
298292
param := &admin.SchemaParams{
299293
Type: info.Type,
300294
Schema: info.Schema,
301295
Properties: info.Properties,
302296
}
303-
log.Info("Upload schema for the topic", "name", topic.Spec.Name, "type", info.Type, "schema", info.Schema, "properties", info.Properties)
304-
if err := pulsarAdmin.UploadSchema(topic.Spec.Name, param); err != nil {
305-
return err
297+
log.Info("Upload schema for the topic",
298+
"name", topic.Spec.Name,
299+
"type", info.Type,
300+
"schema", info.Schema,
301+
"properties", info.Properties,
302+
"currentVersion", currentVersion,
303+
"desiredVersion", desiredVersion)
304+
return pulsarAdmin.UploadSchema(topic.Spec.Name, param)
305+
}
306+
307+
_, currentVersion, serr := pulsarAdmin.GetSchemaInfoWithVersion(topic.Spec.Name)
308+
if serr != nil {
309+
if admin.IsNotFound(serr) {
310+
return uploadSchema(currentVersion, -1)
306311
}
312+
return serr
313+
}
314+
315+
desiredVersion, verr := pulsarAdmin.GetSchemaVersionBySchemaInfo(topic.Spec.Name, topic.Spec.SchemaInfo)
316+
if verr != nil {
317+
if admin.IsNotFound(verr) {
318+
return uploadSchema(currentVersion, -1)
319+
}
320+
return verr
321+
}
322+
323+
// Only upload the schema when schema doesn't exist or the schema has been updated
324+
if desiredVersion < 0 || desiredVersion != currentVersion {
325+
return uploadSchema(currentVersion, desiredVersion)
307326
}
308327
}
309328
// Note: We intentionally do NOT delete existing schemas when schemaInfo is not specified.

0 commit comments

Comments
 (0)