From 9b37e83f05594b10f26884e78a3783303533c4be Mon Sep 17 00:00:00 2001 From: guokang Date: Sun, 31 Mar 2024 10:17:38 +0800 Subject: [PATCH] [ISSUE #219] fix create topictransfer encounter Failed to list topic with output: The sub command topiclist -c -n 192.168.25.11:9876 not exist. --- .../topictransfer/topictransfer_controller.go | 87 ++++++++++--------- 1 file changed, 48 insertions(+), 39 deletions(-) diff --git a/pkg/controller/topictransfer/topictransfer_controller.go b/pkg/controller/topictransfer/topictransfer_controller.go index 5f212a47..1220c898 100644 --- a/pkg/controller/topictransfer/topictransfer_controller.go +++ b/pkg/controller/topictransfer/topictransfer_controller.go @@ -166,8 +166,8 @@ func (r *ReconcileTopicTransfer) Reconcile(ctx context.Context, request reconcil for i, consumerGroup := range consumerGroups { log.Info("Processing consumer group" + consumerGroup + " " + strconv.Itoa(i+1) + "/" + strconv.Itoa(len(consumerGroups))) addConsumerGroupToTargetClusterCommand := buildAddConsumerGroupToClusterCommand(consumerGroup, targetCluster, nameServer) - reqLogger.Info("AddConsumerGroupToTargetClusterCommand: " + addConsumerGroupToTargetClusterCommand) - cmd := exec.Command(cons.BasicCommand, cons.AdminToolDir, addConsumerGroupToTargetClusterCommand) + reqLogger.Info("AddConsumerGroupToTargetClusterCommand: " + strings.Join(addConsumerGroupToTargetClusterCommand, " ")) + cmd := exec.Command(cons.BasicCommand, addConsumerGroupToTargetClusterCommand...) output, err := cmd.Output() // validate command output if err != nil || !isUpdateConsumerGroupSuccess(string(output)) { @@ -182,8 +182,8 @@ func (r *ReconcileTopicTransfer) Reconcile(ctx context.Context, request reconcil // step2: add consumer group to target cluster status = 2 addTopicToTargetClusterCommand := buildAddTopicToClusterCommand(topic, targetCluster, nameServer) - reqLogger.Info("addTopicToTargetClusterCommand: " + addTopicToTargetClusterCommand) - cmd := exec.Command(cons.BasicCommand, cons.AdminToolDir, addTopicToTargetClusterCommand) + reqLogger.Info("addTopicToTargetClusterCommand: " + strings.Join(addTopicToTargetClusterCommand, " ")) + cmd := exec.Command(cons.BasicCommand, addTopicToTargetClusterCommand...) output, err := cmd.Output() // validate command output if err != nil || !isUpdateTopicCommandSuccess(string(output)) { @@ -197,8 +197,8 @@ func (r *ReconcileTopicTransfer) Reconcile(ctx context.Context, request reconcil // step3: stop write in source cluster topic status = 3 stopSourceClusterTopicWriteCommand := buildStopClusterTopicWriteCommand(topic, sourceCluster, nameServer) - reqLogger.Info("stopSourceClusterTopicWriteCommand: " + stopSourceClusterTopicWriteCommand) - cmd = exec.Command(cons.BasicCommand, cons.AdminToolDir, stopSourceClusterTopicWriteCommand) + reqLogger.Info("stopSourceClusterTopicWriteCommand: " + strings.Join(stopSourceClusterTopicWriteCommand, " ")) + cmd = exec.Command(cons.BasicCommand, stopSourceClusterTopicWriteCommand...) output, err = cmd.Output() // validate command output if err != nil || !isUpdateTopicCommandSuccess(string(output)) { @@ -215,8 +215,8 @@ func (r *ReconcileTopicTransfer) Reconcile(ctx context.Context, request reconcil log.Info("Processing consumer group" + consumerGroup + " " + strconv.Itoa(i+1) + "/" + strconv.Itoa(len(consumerGroups))) for { checkConsumeProgressCommand := buildCheckConsumeProgressCommand(consumerGroup, nameServer) - reqLogger.Info("checkConsumeProgressCommand: " + checkConsumeProgressCommand) - cmd = exec.Command(cons.BasicCommand, cons.AdminToolDir, checkConsumeProgressCommand) + reqLogger.Info("checkConsumeProgressCommand: " + strings.Join(checkConsumeProgressCommand, " ")) + cmd = exec.Command(cons.BasicCommand, checkConsumeProgressCommand...) output, err = cmd.Output() if err != nil || !isCheckConsumeProcessCommandSuccess(string(output)) { reqLogger.Error(err, "Failed to check consumerGroup "+consumerGroup+" with output: "+string(output)) @@ -237,8 +237,8 @@ func (r *ReconcileTopicTransfer) Reconcile(ctx context.Context, request reconcil // step5: delete topic in source cluster status = 5 deleteSourceClusterTopicCommand := buildDeleteSourceClusterTopicCommand(topic, sourceCluster, nameServer) - reqLogger.Info("deleteSourceClusterTopicCommand: " + deleteSourceClusterTopicCommand) - cmd = exec.Command(cons.BasicCommand, cons.AdminToolDir, deleteSourceClusterTopicCommand) + reqLogger.Info("deleteSourceClusterTopicCommand: " + strings.Join(deleteSourceClusterTopicCommand, " ")) + cmd = exec.Command(cons.BasicCommand, deleteSourceClusterTopicCommand...) output, err = cmd.Output() if err != nil || !isDeleteTopicCommandSuccess(string(output)) { reqLogger.Error(err, "Failed to delete Topic "+topic+" in SourceCluster "+sourceCluster+" with output: "+string(output)) @@ -253,8 +253,8 @@ func (r *ReconcileTopicTransfer) Reconcile(ctx context.Context, request reconcil for i, consumerGroup := range consumerGroups { log.Info("Processing consumer group" + consumerGroup + " " + strconv.Itoa(i+1) + "/" + strconv.Itoa(len(consumerGroups))) deleteConsumerGroupCommand := buildDeleteConsumeGroupCommand(consumerGroup, sourceCluster, nameServer) - reqLogger.Info("deleteConsumerGroupCommand: " + deleteConsumerGroupCommand) - cmd = exec.Command(cons.BasicCommand, cons.AdminToolDir, deleteConsumerGroupCommand) + reqLogger.Info("deleteConsumerGroupCommand: " + strings.Join(deleteConsumerGroupCommand, " ")) + cmd = exec.Command(cons.BasicCommand, deleteConsumerGroupCommand...) output, err = cmd.Output() if err != nil || !isDeleteConsumerGroupSuccess(string(output)) { reqLogger.Error(err, "Failed to delete consumer group "+consumerGroup+" in SourceCluster "+sourceCluster+" with output: "+string(output)) @@ -270,8 +270,8 @@ func (r *ReconcileTopicTransfer) Reconcile(ctx context.Context, request reconcil for i, consumerGroup := range consumerGroups { log.Info("Processing consumer group" + consumerGroup + " " + strconv.Itoa(i+1) + "/" + strconv.Itoa(len(consumerGroups))) createRetryTopicCommand := buildAddRetryTopicToClusterCommand(consumerGroup, targetCluster, nameServer) - reqLogger.Info("createRetryTopicCommand: " + createRetryTopicCommand) - cmd = exec.Command(cons.BasicCommand, cons.AdminToolDir, createRetryTopicCommand) + reqLogger.Info("createRetryTopicCommand: " + strings.Join(createRetryTopicCommand, " ")) + cmd = exec.Command(cons.BasicCommand, createRetryTopicCommand...) output, err = cmd.Output() if err != nil || !isUpdateTopicCommandSuccess(string(output)) { reqLogger.Error(err, "Failed to create retry topic of consumer group "+consumerGroup+" in TargetCluster "+targetCluster+" with output: "+string(output)) @@ -290,8 +290,8 @@ func (r *ReconcileTopicTransfer) Reconcile(ctx context.Context, request reconcil func undoStopWrite(topic string, cluster string, nameServer string) { addTopicToClusterCommand := buildUndoStopWriteCommand(topic, cluster, nameServer) - log.Info("undoStopWrite: " + addTopicToClusterCommand) - cmd := exec.Command(cons.BasicCommand, cons.AdminToolDir, addTopicToClusterCommand) + log.Info("undoStopWrite: " + strings.Join(addTopicToClusterCommand, " ")) + cmd := exec.Command(cons.BasicCommand, addTopicToClusterCommand...) output, err := cmd.Output() if err != nil || !isUpdateTopicCommandSuccess(string(output)) { log.Error(err, "Failed to undo stop write topic with output: "+string(output)) @@ -301,8 +301,8 @@ func undoStopWrite(topic string, cluster string, nameServer string) { func undoDeleteTopic(topic string, cluster string, nameServer string) { addTopicToClusterCommand := buildAddTopicToClusterCommand(topic, cluster, nameServer) - log.Info("undoDeleteTopic: " + addTopicToClusterCommand) - cmd := exec.Command(cons.BasicCommand, cons.AdminToolDir, addTopicToClusterCommand) + log.Info("undoDeleteTopic: " + strings.Join(addTopicToClusterCommand, " ")) + cmd := exec.Command(cons.BasicCommand, addTopicToClusterCommand...) output, err := cmd.Output() if err != nil || !isUpdateTopicCommandSuccess(string(output)) { log.Error(err, "Failed to undo delete topic with output: "+string(output)) @@ -313,8 +313,8 @@ func undoDeleteTopic(topic string, cluster string, nameServer string) { func undoDeleteConsumeGroup(consumerGroups []string, cluster string, nameServer string) { for _, consumerGroup := range consumerGroups { addConsumerGroupToTargetClusterCommand := buildAddConsumerGroupToClusterCommand(consumerGroup, cluster, nameServer) - log.Info("undoDeleteConsumeGroup: " + addConsumerGroupToTargetClusterCommand) - cmd := exec.Command(cons.BasicCommand, cons.AdminToolDir, addConsumerGroupToTargetClusterCommand) + log.Info("undoDeleteConsumeGroup: " + strings.Join(addConsumerGroupToTargetClusterCommand, " ")) + cmd := exec.Command(cons.BasicCommand, addConsumerGroupToTargetClusterCommand...) output, err := cmd.Output() if err != nil || !isUpdateConsumerGroupSuccess(string(output)) { log.Error(err, "Failed to undo delete consume group with output: "+string(output)) @@ -326,7 +326,7 @@ func undoDeleteConsumeGroup(consumerGroups []string, cluster string, nameServer func getConsumerGroupByTopic(topic string, nameServer string) []string { var consumerGroups []string topicListCmd := buildTopicListCommand(nameServer) - cmd := exec.Command(cons.BasicCommand, cons.AdminToolDir, topicListCmd) + cmd := exec.Command(cons.BasicCommand, topicListCmd...) output, err := cmd.Output() if err != nil || !isTopicListSuccess(string(output)) { log.Error(err, "Failed to list topic with output: "+string(output)) @@ -371,8 +371,9 @@ func isUpdateConsumerGroupSuccess(s string) bool { return strings.Contains(s, "groupName") } -func buildUndoStopWriteCommand(topic string, cluster string, nameServer string) string { +func buildUndoStopWriteCommand(topic string, cluster string, nameServer string) []string { cmdOpts := []string{ + cons.AdminToolDir, "updatetopic", "-t", topic, @@ -387,21 +388,23 @@ func buildUndoStopWriteCommand(topic string, cluster string, nameServer string) "-n", nameServer, } - return strings.Join(cmdOpts, " ") + return cmdOpts } -func buildTopicListCommand(nameServer string) string { +func buildTopicListCommand(nameServer string) []string { cmdOpts := []string{ + cons.AdminToolDir, "topiclist", "-c", "-n", nameServer, } - return strings.Join(cmdOpts, " ") + return cmdOpts } -func buildAddRetryTopicToClusterCommand(consumerGroup string, cluster string, nameServer string) string { +func buildAddRetryTopicToClusterCommand(consumerGroup string, cluster string, nameServer string) []string { cmdOpts := []string{ + cons.AdminToolDir, "updatetopic", "-t", "%RETRY%" + consumerGroup, @@ -416,7 +419,7 @@ func buildAddRetryTopicToClusterCommand(consumerGroup string, cluster string, na "-n", nameServer, } - return strings.Join(cmdOpts, " ") + return cmdOpts } func getClusterBrokerNames(cluster string) []string { @@ -446,8 +449,9 @@ func isConsumeFinished(output string, topic string, cluster string) bool { return true } -func buildDeleteConsumeGroupCommand(consumerGroup string, cluster string, nameServer string) string { +func buildDeleteConsumeGroupCommand(consumerGroup string, cluster string, nameServer string) []string { cmdOpts := []string{ + cons.AdminToolDir, "deleteSubGroup", "-g", consumerGroup, @@ -456,11 +460,12 @@ func buildDeleteConsumeGroupCommand(consumerGroup string, cluster string, nameSe "-n", nameServer, } - return strings.Join(cmdOpts, " ") + return cmdOpts } -func buildDeleteSourceClusterTopicCommand(topic string, sourceCluster string, nameServer string) string { +func buildDeleteSourceClusterTopicCommand(topic string, sourceCluster string, nameServer string) []string { cmdOpts := []string{ + cons.AdminToolDir, "deletetopic", "-t", topic, @@ -469,22 +474,24 @@ func buildDeleteSourceClusterTopicCommand(topic string, sourceCluster string, na "-n", nameServer, } - return strings.Join(cmdOpts, " ") + return cmdOpts } -func buildCheckConsumeProgressCommand(consumerGroup string, nameServer string) string { +func buildCheckConsumeProgressCommand(consumerGroup string, nameServer string) []string { cmdOpts := []string{ + cons.AdminToolDir, "consumerprogress", "-g", consumerGroup, "-n", nameServer, } - return strings.Join(cmdOpts, " ") + return cmdOpts } -func buildStopClusterTopicWriteCommand(topic string, cluster string, nameServer string) string { +func buildStopClusterTopicWriteCommand(topic string, cluster string, nameServer string) []string { cmdOpts := []string{ + cons.AdminToolDir, "updatetopic", "-t", topic, @@ -497,11 +504,12 @@ func buildStopClusterTopicWriteCommand(topic string, cluster string, nameServer "-n", nameServer, } - return strings.Join(cmdOpts, " ") + return cmdOpts } -func buildAddConsumerGroupToClusterCommand(consumerGroup string, cluster string, nameServer string) string { +func buildAddConsumerGroupToClusterCommand(consumerGroup string, cluster string, nameServer string) []string { cmdOpts := []string{ + cons.AdminToolDir, "updatesubgroup", "-g", consumerGroup, @@ -514,11 +522,12 @@ func buildAddConsumerGroupToClusterCommand(consumerGroup string, cluster string, "-n", nameServer, } - return strings.Join(cmdOpts, " ") + return cmdOpts } -func buildAddTopicToClusterCommand(topic string, cluster string, nameServer string) string { +func buildAddTopicToClusterCommand(topic string, cluster string, nameServer string) []string { cmdOpts := []string{ + cons.AdminToolDir, "updatetopic", "-t", topic, @@ -527,5 +536,5 @@ func buildAddTopicToClusterCommand(topic string, cluster string, nameServer stri "-n", nameServer, } - return strings.Join(cmdOpts, " ") + return cmdOpts }