Skip to content

Commit 1441aa8

Browse files
author
Logan Seeley
committed
Add topic get/set replication clusters commands
1 parent d9dd3db commit 1441aa8

File tree

4 files changed

+299
-0
lines changed

4 files changed

+299
-0
lines changed
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package topic
19+
20+
import (
21+
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
22+
23+
"github.com/streamnative/pulsarctl/pkg/cmdutils"
24+
)
25+
26+
func GetReplicationClustersCmd(vc *cmdutils.VerbCmd) {
27+
desc := cmdutils.LongDescription{}
28+
desc.CommandUsedFor = "Get the replication clusters for a topic"
29+
desc.CommandPermission = "This command requires tenant admin permissions."
30+
31+
var examples []cmdutils.Example
32+
getClusters := cmdutils.Example{
33+
Desc: "Get the replication clusters for a topic",
34+
Command: "pulsarctl topics get-replication-clusters persistent://tenant/namespace/topic",
35+
}
36+
examples = append(examples, getClusters)
37+
desc.CommandExamples = examples
38+
39+
var out []cmdutils.Output
40+
successOut := cmdutils.Output{
41+
Desc: "normal output",
42+
Out: "[\"cluster1\", \"cluster2\"]",
43+
}
44+
45+
out = append(out, successOut, ArgError)
46+
out = append(out, TopicNameErrors...)
47+
out = append(out, NamespaceErrors...)
48+
desc.CommandOutput = out
49+
50+
vc.SetDescription(
51+
"get-replication-clusters",
52+
"Get the replication clusters for a topic",
53+
desc.ToString(),
54+
desc.ExampleToString(),
55+
"get-replication-clusters",
56+
)
57+
58+
vc.SetRunFuncWithNameArg(func() error {
59+
return doGetReplicationClusters(vc)
60+
}, "the topic name is not specified or the topic name is specified more than one")
61+
62+
vc.EnableOutputFlagSet()
63+
}
64+
65+
func doGetReplicationClusters(vc *cmdutils.VerbCmd) error {
66+
// for testing
67+
if vc.NameError != nil {
68+
return vc.NameError
69+
}
70+
71+
topic, err := utils.GetTopicName(vc.NameArg)
72+
if err != nil {
73+
return err
74+
}
75+
76+
admin := cmdutils.NewPulsarClient()
77+
clusters, err := admin.Topics().GetReplicationClusters(*topic)
78+
if err == nil {
79+
cmdutils.PrintJSON(vc.Command.OutOrStdout(), &clusters)
80+
}
81+
return err
82+
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package topic
19+
20+
import (
21+
"encoding/json"
22+
"fmt"
23+
"strings"
24+
"testing"
25+
26+
"github.com/streamnative/pulsarctl/pkg/ctl/cluster"
27+
"github.com/streamnative/pulsarctl/pkg/ctl/tenant"
28+
"github.com/streamnative/pulsarctl/pkg/test"
29+
30+
"github.com/stretchr/testify/assert"
31+
)
32+
33+
func TestReplicationClustersCmd(t *testing.T) {
34+
topicName := fmt.Sprintf("persistent://public/default/test-replication-clusters-topic-%s", test.RandomSuffix())
35+
36+
// Create the topic first
37+
args := []string{"create", topicName, "0"}
38+
_, execErr, _, err := TestTopicCommands(CreateTopicCmd, args)
39+
assert.Nil(t, err)
40+
assert.Nil(t, execErr)
41+
42+
// Create a test cluster for replication
43+
clusterArgs := []string{"create", "test-replication-cluster", "--url", "http://192.168.12.11:8080"}
44+
_, execErr, _, err = cluster.TestClusterCommands(cluster.CreateClusterCmd, clusterArgs)
45+
assert.Nil(t, err)
46+
assert.Nil(t, execErr)
47+
48+
// Update tenant to allow the new cluster
49+
updateTenantArgs := []string{"update", "--allowed-clusters", "test-replication-cluster",
50+
"--allowed-clusters", "standalone", "public"}
51+
_, execErr, _, err = tenant.TestTenantCommands(tenant.UpdateTenantCmd, updateTenantArgs)
52+
assert.Nil(t, err)
53+
assert.Nil(t, execErr)
54+
55+
// Set replication clusters for the topic
56+
setArgs := []string{"set-replication-clusters", topicName, "--clusters", "test-replication-cluster"}
57+
setOut, execErr, _, _ := TestTopicCommands(SetReplicationClustersCmd, setArgs)
58+
assert.Nil(t, execErr)
59+
assert.True(t, strings.Contains(setOut.String(), "Set replication clusters successfully"))
60+
61+
// Get replication clusters for the topic
62+
getArgs := []string{"get-replication-clusters", topicName}
63+
getOut, execErr, _, _ := TestTopicCommands(GetReplicationClustersCmd, getArgs)
64+
assert.Nil(t, execErr)
65+
66+
var clusters []string
67+
err = json.Unmarshal(getOut.Bytes(), &clusters)
68+
assert.Nil(t, err)
69+
assert.Contains(t, clusters, "test-replication-cluster")
70+
71+
// Reset tenant clusters for other test cases
72+
updateTenantArgs = []string{"update", "--allowed-clusters", "standalone", "public"}
73+
_, execErr, _, err = tenant.TestTenantCommands(tenant.UpdateTenantCmd, updateTenantArgs)
74+
assert.Nil(t, err)
75+
assert.Nil(t, execErr)
76+
}
77+
78+
func TestSetReplicationClustersArgError(t *testing.T) {
79+
// Test with no topic name
80+
args := []string{"set-replication-clusters", "--clusters", "standalone"}
81+
_, _, nameErr, _ := TestTopicCommands(SetReplicationClustersCmd, args)
82+
assert.NotNil(t, nameErr)
83+
assert.Equal(t, "the topic name is not specified or the topic name is specified more than one", nameErr.Error())
84+
}
85+
86+
func TestGetReplicationClustersArgError(t *testing.T) {
87+
// Test with no topic name
88+
args := []string{"get-replication-clusters"}
89+
_, _, nameErr, _ := TestTopicCommands(GetReplicationClustersCmd, args)
90+
assert.NotNil(t, nameErr)
91+
assert.Equal(t, "the topic name is not specified or the topic name is specified more than one", nameErr.Error())
92+
}
93+
94+
func TestSetReplicationClustersInvalidCluster(t *testing.T) {
95+
topicName := fmt.Sprintf("persistent://public/default/test-invalid-cluster-topic-%s", test.RandomSuffix())
96+
97+
// Create the topic first
98+
args := []string{"create", topicName, "0"}
99+
_, execErr, _, err := TestTopicCommands(CreateTopicCmd, args)
100+
assert.Nil(t, err)
101+
assert.Nil(t, execErr)
102+
103+
// Try to set an invalid cluster
104+
setArgs := []string{"set-replication-clusters", topicName, "--clusters", "invalid-cluster"}
105+
_, execErr, _, _ = TestTopicCommands(SetReplicationClustersCmd, setArgs)
106+
assert.NotNil(t, execErr)
107+
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package topic
19+
20+
import (
21+
"strings"
22+
23+
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
24+
"github.com/spf13/cobra"
25+
"github.com/spf13/pflag"
26+
27+
"github.com/streamnative/pulsarctl/pkg/cmdutils"
28+
)
29+
30+
func SetReplicationClustersCmd(vc *cmdutils.VerbCmd) {
31+
desc := cmdutils.LongDescription{}
32+
desc.CommandUsedFor = "Set the replication clusters for a topic"
33+
desc.CommandPermission = "This command requires tenant admin permissions."
34+
35+
var examples []cmdutils.Example
36+
setClusters := cmdutils.Example{
37+
Desc: "Set the replication clusters for a topic",
38+
Command: "pulsarctl topics set-replication-clusters persistent://tenant/namespace/topic --clusters cluster1,cluster2",
39+
}
40+
41+
examples = append(examples, setClusters)
42+
desc.CommandExamples = examples
43+
44+
var out []cmdutils.Output
45+
successOut := cmdutils.Output{
46+
Desc: "normal output",
47+
Out: "Set replication clusters successfully for [topic]",
48+
}
49+
50+
out = append(out, successOut, ArgError)
51+
out = append(out, TopicNameErrors...)
52+
out = append(out, NamespaceErrors...)
53+
54+
invalidClustersName := cmdutils.Output{
55+
Desc: "Invalid cluster name, please check if your cluster name has the appropriate " +
56+
"permissions under the current tenant",
57+
Out: "[✖] code: 403 reason: Invalid cluster id: <cluster-name>",
58+
}
59+
60+
out = append(out, invalidClustersName, TopicLevelPolicyNotEnabledError)
61+
desc.CommandOutput = out
62+
63+
vc.SetDescription(
64+
"set-replication-clusters",
65+
"Set the replication clusters for a topic",
66+
desc.ToString(),
67+
desc.ExampleToString(),
68+
"set-replication-clusters",
69+
)
70+
71+
var clusterIDs string
72+
73+
vc.SetRunFuncWithNameArg(func() error {
74+
return doSetReplicationClusters(vc, clusterIDs)
75+
}, "the topic name is not specified or the topic name is specified more than one")
76+
77+
vc.FlagSetGroup.InFlagSet("ReplicationClusters", func(flagSet *pflag.FlagSet) {
78+
flagSet.StringVarP(
79+
&clusterIDs,
80+
"clusters",
81+
"c",
82+
"",
83+
"Replication Cluster Ids list (comma separated values)")
84+
85+
_ = cobra.MarkFlagRequired(flagSet, "clusters")
86+
})
87+
vc.EnableOutputFlagSet()
88+
}
89+
90+
func doSetReplicationClusters(vc *cmdutils.VerbCmd, clusterIDs string) error {
91+
// for testing
92+
if vc.NameError != nil {
93+
return vc.NameError
94+
}
95+
96+
topic, err := utils.GetTopicName(vc.NameArg)
97+
if err != nil {
98+
return err
99+
}
100+
101+
admin := cmdutils.NewPulsarClient()
102+
clusters := strings.Split(clusterIDs, ",")
103+
err = admin.Topics().SetReplicationClusters(*topic, clusters)
104+
if err == nil {
105+
vc.Command.Printf("Set replication clusters successfully for [%s]\n", topic.String())
106+
}
107+
return err
108+
}

pkg/ctl/topic/topic.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command {
9595
RemoveInactiveTopicCmd,
9696
SetDispatchRateCmd,
9797
RemoveDispatchRateCmd,
98+
GetReplicationClustersCmd,
99+
SetReplicationClustersCmd,
98100
}
99101

100102
cmdutils.AddVerbCmds(flagGrouping, resourceCmd, commands...)

0 commit comments

Comments
 (0)