Skip to content

Commit

Permalink
[ISSUE #219] fix create topictransfer encounter Failed to list topic …
Browse files Browse the repository at this point in the history
…with output: The sub command topiclist -c -n 192.168.25.11:9876 not exist.
  • Loading branch information
victorgkang committed Apr 8, 2024
1 parent 9752d8c commit 9b37e83
Showing 1 changed file with 48 additions and 39 deletions.
87 changes: 48 additions & 39 deletions pkg/controller/topictransfer/topictransfer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ func (r *ReconcileTopicTransfer) Reconcile(ctx context.Context, request reconcil
for i, consumerGroup := range consumerGroups {
log.Info("Processing consumer group" + consumerGroup + " " + strconv.Itoa(i+1) + "/" + strconv.Itoa(len(consumerGroups)))
addConsumerGroupToTargetClusterCommand := buildAddConsumerGroupToClusterCommand(consumerGroup, targetCluster, nameServer)
reqLogger.Info("AddConsumerGroupToTargetClusterCommand: " + addConsumerGroupToTargetClusterCommand)
cmd := exec.Command(cons.BasicCommand, cons.AdminToolDir, addConsumerGroupToTargetClusterCommand)
reqLogger.Info("AddConsumerGroupToTargetClusterCommand: " + strings.Join(addConsumerGroupToTargetClusterCommand, " "))
cmd := exec.Command(cons.BasicCommand, addConsumerGroupToTargetClusterCommand...)
output, err := cmd.Output()
// validate command output
if err != nil || !isUpdateConsumerGroupSuccess(string(output)) {
Expand All @@ -182,8 +182,8 @@ func (r *ReconcileTopicTransfer) Reconcile(ctx context.Context, request reconcil
// step2: add consumer group to target cluster
status = 2
addTopicToTargetClusterCommand := buildAddTopicToClusterCommand(topic, targetCluster, nameServer)
reqLogger.Info("addTopicToTargetClusterCommand: " + addTopicToTargetClusterCommand)
cmd := exec.Command(cons.BasicCommand, cons.AdminToolDir, addTopicToTargetClusterCommand)
reqLogger.Info("addTopicToTargetClusterCommand: " + strings.Join(addTopicToTargetClusterCommand, " "))
cmd := exec.Command(cons.BasicCommand, addTopicToTargetClusterCommand...)
output, err := cmd.Output()
// validate command output
if err != nil || !isUpdateTopicCommandSuccess(string(output)) {
Expand All @@ -197,8 +197,8 @@ func (r *ReconcileTopicTransfer) Reconcile(ctx context.Context, request reconcil
// step3: stop write in source cluster topic
status = 3
stopSourceClusterTopicWriteCommand := buildStopClusterTopicWriteCommand(topic, sourceCluster, nameServer)
reqLogger.Info("stopSourceClusterTopicWriteCommand: " + stopSourceClusterTopicWriteCommand)
cmd = exec.Command(cons.BasicCommand, cons.AdminToolDir, stopSourceClusterTopicWriteCommand)
reqLogger.Info("stopSourceClusterTopicWriteCommand: " + strings.Join(stopSourceClusterTopicWriteCommand, " "))
cmd = exec.Command(cons.BasicCommand, stopSourceClusterTopicWriteCommand...)
output, err = cmd.Output()
// validate command output
if err != nil || !isUpdateTopicCommandSuccess(string(output)) {
Expand All @@ -215,8 +215,8 @@ func (r *ReconcileTopicTransfer) Reconcile(ctx context.Context, request reconcil
log.Info("Processing consumer group" + consumerGroup + " " + strconv.Itoa(i+1) + "/" + strconv.Itoa(len(consumerGroups)))
for {
checkConsumeProgressCommand := buildCheckConsumeProgressCommand(consumerGroup, nameServer)
reqLogger.Info("checkConsumeProgressCommand: " + checkConsumeProgressCommand)
cmd = exec.Command(cons.BasicCommand, cons.AdminToolDir, checkConsumeProgressCommand)
reqLogger.Info("checkConsumeProgressCommand: " + strings.Join(checkConsumeProgressCommand, " "))
cmd = exec.Command(cons.BasicCommand, checkConsumeProgressCommand...)
output, err = cmd.Output()
if err != nil || !isCheckConsumeProcessCommandSuccess(string(output)) {
reqLogger.Error(err, "Failed to check consumerGroup "+consumerGroup+" with output: "+string(output))
Expand All @@ -237,8 +237,8 @@ func (r *ReconcileTopicTransfer) Reconcile(ctx context.Context, request reconcil
// step5: delete topic in source cluster
status = 5
deleteSourceClusterTopicCommand := buildDeleteSourceClusterTopicCommand(topic, sourceCluster, nameServer)
reqLogger.Info("deleteSourceClusterTopicCommand: " + deleteSourceClusterTopicCommand)
cmd = exec.Command(cons.BasicCommand, cons.AdminToolDir, deleteSourceClusterTopicCommand)
reqLogger.Info("deleteSourceClusterTopicCommand: " + strings.Join(deleteSourceClusterTopicCommand, " "))
cmd = exec.Command(cons.BasicCommand, deleteSourceClusterTopicCommand...)
output, err = cmd.Output()
if err != nil || !isDeleteTopicCommandSuccess(string(output)) {
reqLogger.Error(err, "Failed to delete Topic "+topic+" in SourceCluster "+sourceCluster+" with output: "+string(output))
Expand All @@ -253,8 +253,8 @@ func (r *ReconcileTopicTransfer) Reconcile(ctx context.Context, request reconcil
for i, consumerGroup := range consumerGroups {
log.Info("Processing consumer group" + consumerGroup + " " + strconv.Itoa(i+1) + "/" + strconv.Itoa(len(consumerGroups)))
deleteConsumerGroupCommand := buildDeleteConsumeGroupCommand(consumerGroup, sourceCluster, nameServer)
reqLogger.Info("deleteConsumerGroupCommand: " + deleteConsumerGroupCommand)
cmd = exec.Command(cons.BasicCommand, cons.AdminToolDir, deleteConsumerGroupCommand)
reqLogger.Info("deleteConsumerGroupCommand: " + strings.Join(deleteConsumerGroupCommand, " "))
cmd = exec.Command(cons.BasicCommand, deleteConsumerGroupCommand...)
output, err = cmd.Output()
if err != nil || !isDeleteConsumerGroupSuccess(string(output)) {
reqLogger.Error(err, "Failed to delete consumer group "+consumerGroup+" in SourceCluster "+sourceCluster+" with output: "+string(output))
Expand All @@ -270,8 +270,8 @@ func (r *ReconcileTopicTransfer) Reconcile(ctx context.Context, request reconcil
for i, consumerGroup := range consumerGroups {
log.Info("Processing consumer group" + consumerGroup + " " + strconv.Itoa(i+1) + "/" + strconv.Itoa(len(consumerGroups)))
createRetryTopicCommand := buildAddRetryTopicToClusterCommand(consumerGroup, targetCluster, nameServer)
reqLogger.Info("createRetryTopicCommand: " + createRetryTopicCommand)
cmd = exec.Command(cons.BasicCommand, cons.AdminToolDir, createRetryTopicCommand)
reqLogger.Info("createRetryTopicCommand: " + strings.Join(createRetryTopicCommand, " "))
cmd = exec.Command(cons.BasicCommand, createRetryTopicCommand...)
output, err = cmd.Output()
if err != nil || !isUpdateTopicCommandSuccess(string(output)) {
reqLogger.Error(err, "Failed to create retry topic of consumer group "+consumerGroup+" in TargetCluster "+targetCluster+" with output: "+string(output))
Expand All @@ -290,8 +290,8 @@ func (r *ReconcileTopicTransfer) Reconcile(ctx context.Context, request reconcil

func undoStopWrite(topic string, cluster string, nameServer string) {
addTopicToClusterCommand := buildUndoStopWriteCommand(topic, cluster, nameServer)
log.Info("undoStopWrite: " + addTopicToClusterCommand)
cmd := exec.Command(cons.BasicCommand, cons.AdminToolDir, addTopicToClusterCommand)
log.Info("undoStopWrite: " + strings.Join(addTopicToClusterCommand, " "))
cmd := exec.Command(cons.BasicCommand, addTopicToClusterCommand...)
output, err := cmd.Output()
if err != nil || !isUpdateTopicCommandSuccess(string(output)) {
log.Error(err, "Failed to undo stop write topic with output: "+string(output))
Expand All @@ -301,8 +301,8 @@ func undoStopWrite(topic string, cluster string, nameServer string) {

func undoDeleteTopic(topic string, cluster string, nameServer string) {
addTopicToClusterCommand := buildAddTopicToClusterCommand(topic, cluster, nameServer)
log.Info("undoDeleteTopic: " + addTopicToClusterCommand)
cmd := exec.Command(cons.BasicCommand, cons.AdminToolDir, addTopicToClusterCommand)
log.Info("undoDeleteTopic: " + strings.Join(addTopicToClusterCommand, " "))
cmd := exec.Command(cons.BasicCommand, addTopicToClusterCommand...)
output, err := cmd.Output()
if err != nil || !isUpdateTopicCommandSuccess(string(output)) {
log.Error(err, "Failed to undo delete topic with output: "+string(output))
Expand All @@ -313,8 +313,8 @@ func undoDeleteTopic(topic string, cluster string, nameServer string) {
func undoDeleteConsumeGroup(consumerGroups []string, cluster string, nameServer string) {
for _, consumerGroup := range consumerGroups {
addConsumerGroupToTargetClusterCommand := buildAddConsumerGroupToClusterCommand(consumerGroup, cluster, nameServer)
log.Info("undoDeleteConsumeGroup: " + addConsumerGroupToTargetClusterCommand)
cmd := exec.Command(cons.BasicCommand, cons.AdminToolDir, addConsumerGroupToTargetClusterCommand)
log.Info("undoDeleteConsumeGroup: " + strings.Join(addConsumerGroupToTargetClusterCommand, " "))
cmd := exec.Command(cons.BasicCommand, addConsumerGroupToTargetClusterCommand...)
output, err := cmd.Output()
if err != nil || !isUpdateConsumerGroupSuccess(string(output)) {
log.Error(err, "Failed to undo delete consume group with output: "+string(output))
Expand All @@ -326,7 +326,7 @@ func undoDeleteConsumeGroup(consumerGroups []string, cluster string, nameServer
func getConsumerGroupByTopic(topic string, nameServer string) []string {
var consumerGroups []string
topicListCmd := buildTopicListCommand(nameServer)
cmd := exec.Command(cons.BasicCommand, cons.AdminToolDir, topicListCmd)
cmd := exec.Command(cons.BasicCommand, topicListCmd...)
output, err := cmd.Output()
if err != nil || !isTopicListSuccess(string(output)) {
log.Error(err, "Failed to list topic with output: "+string(output))
Expand Down Expand Up @@ -371,8 +371,9 @@ func isUpdateConsumerGroupSuccess(s string) bool {
return strings.Contains(s, "groupName")
}

func buildUndoStopWriteCommand(topic string, cluster string, nameServer string) string {
func buildUndoStopWriteCommand(topic string, cluster string, nameServer string) []string {
cmdOpts := []string{
cons.AdminToolDir,
"updatetopic",
"-t",
topic,
Expand All @@ -387,21 +388,23 @@ func buildUndoStopWriteCommand(topic string, cluster string, nameServer string)
"-n",
nameServer,
}
return strings.Join(cmdOpts, " ")
return cmdOpts
}

func buildTopicListCommand(nameServer string) string {
func buildTopicListCommand(nameServer string) []string {
cmdOpts := []string{
cons.AdminToolDir,
"topiclist",
"-c",
"-n",
nameServer,
}
return strings.Join(cmdOpts, " ")
return cmdOpts
}

func buildAddRetryTopicToClusterCommand(consumerGroup string, cluster string, nameServer string) string {
func buildAddRetryTopicToClusterCommand(consumerGroup string, cluster string, nameServer string) []string {
cmdOpts := []string{
cons.AdminToolDir,
"updatetopic",
"-t",
"%RETRY%" + consumerGroup,
Expand All @@ -416,7 +419,7 @@ func buildAddRetryTopicToClusterCommand(consumerGroup string, cluster string, na
"-n",
nameServer,
}
return strings.Join(cmdOpts, " ")
return cmdOpts
}

func getClusterBrokerNames(cluster string) []string {
Expand Down Expand Up @@ -446,8 +449,9 @@ func isConsumeFinished(output string, topic string, cluster string) bool {
return true
}

func buildDeleteConsumeGroupCommand(consumerGroup string, cluster string, nameServer string) string {
func buildDeleteConsumeGroupCommand(consumerGroup string, cluster string, nameServer string) []string {
cmdOpts := []string{
cons.AdminToolDir,
"deleteSubGroup",
"-g",
consumerGroup,
Expand All @@ -456,11 +460,12 @@ func buildDeleteConsumeGroupCommand(consumerGroup string, cluster string, nameSe
"-n",
nameServer,
}
return strings.Join(cmdOpts, " ")
return cmdOpts
}

func buildDeleteSourceClusterTopicCommand(topic string, sourceCluster string, nameServer string) string {
func buildDeleteSourceClusterTopicCommand(topic string, sourceCluster string, nameServer string) []string {
cmdOpts := []string{
cons.AdminToolDir,
"deletetopic",
"-t",
topic,
Expand All @@ -469,22 +474,24 @@ func buildDeleteSourceClusterTopicCommand(topic string, sourceCluster string, na
"-n",
nameServer,
}
return strings.Join(cmdOpts, " ")
return cmdOpts
}

func buildCheckConsumeProgressCommand(consumerGroup string, nameServer string) string {
func buildCheckConsumeProgressCommand(consumerGroup string, nameServer string) []string {
cmdOpts := []string{
cons.AdminToolDir,
"consumerprogress",
"-g",
consumerGroup,
"-n",
nameServer,
}
return strings.Join(cmdOpts, " ")
return cmdOpts
}

func buildStopClusterTopicWriteCommand(topic string, cluster string, nameServer string) string {
func buildStopClusterTopicWriteCommand(topic string, cluster string, nameServer string) []string {
cmdOpts := []string{
cons.AdminToolDir,
"updatetopic",
"-t",
topic,
Expand All @@ -497,11 +504,12 @@ func buildStopClusterTopicWriteCommand(topic string, cluster string, nameServer
"-n",
nameServer,
}
return strings.Join(cmdOpts, " ")
return cmdOpts
}

func buildAddConsumerGroupToClusterCommand(consumerGroup string, cluster string, nameServer string) string {
func buildAddConsumerGroupToClusterCommand(consumerGroup string, cluster string, nameServer string) []string {
cmdOpts := []string{
cons.AdminToolDir,
"updatesubgroup",
"-g",
consumerGroup,
Expand All @@ -514,11 +522,12 @@ func buildAddConsumerGroupToClusterCommand(consumerGroup string, cluster string,
"-n",
nameServer,
}
return strings.Join(cmdOpts, " ")
return cmdOpts
}

func buildAddTopicToClusterCommand(topic string, cluster string, nameServer string) string {
func buildAddTopicToClusterCommand(topic string, cluster string, nameServer string) []string {
cmdOpts := []string{
cons.AdminToolDir,
"updatetopic",
"-t",
topic,
Expand All @@ -527,5 +536,5 @@ func buildAddTopicToClusterCommand(topic string, cluster string, nameServer stri
"-n",
nameServer,
}
return strings.Join(cmdOpts, " ")
return cmdOpts
}

0 comments on commit 9b37e83

Please sign in to comment.