Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #219] create topictransfer encounter Failed to list topic with output: The sub command topiclist -c -n 192.168.25.11:9876 not exist. #220

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 48 additions & 39 deletions pkg/controller/topictransfer/topictransfer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ func (r *ReconcileTopicTransfer) Reconcile(ctx context.Context, request reconcil
for i, consumerGroup := range consumerGroups {
log.Info("Processing consumer group" + consumerGroup + " " + strconv.Itoa(i+1) + "/" + strconv.Itoa(len(consumerGroups)))
addConsumerGroupToTargetClusterCommand := buildAddConsumerGroupToClusterCommand(consumerGroup, targetCluster, nameServer)
reqLogger.Info("AddConsumerGroupToTargetClusterCommand: " + addConsumerGroupToTargetClusterCommand)
cmd := exec.Command(cons.BasicCommand, cons.AdminToolDir, addConsumerGroupToTargetClusterCommand)
reqLogger.Info("AddConsumerGroupToTargetClusterCommand: " + strings.Join(addConsumerGroupToTargetClusterCommand, " "))
cmd := exec.Command(cons.BasicCommand, addConsumerGroupToTargetClusterCommand...)
output, err := cmd.Output()
// validate command output
if err != nil || !isUpdateConsumerGroupSuccess(string(output)) {
Expand All @@ -182,8 +182,8 @@ func (r *ReconcileTopicTransfer) Reconcile(ctx context.Context, request reconcil
// step2: add consumer group to target cluster
status = 2
addTopicToTargetClusterCommand := buildAddTopicToClusterCommand(topic, targetCluster, nameServer)
reqLogger.Info("addTopicToTargetClusterCommand: " + addTopicToTargetClusterCommand)
cmd := exec.Command(cons.BasicCommand, cons.AdminToolDir, addTopicToTargetClusterCommand)
reqLogger.Info("addTopicToTargetClusterCommand: " + strings.Join(addTopicToTargetClusterCommand, " "))
cmd := exec.Command(cons.BasicCommand, addTopicToTargetClusterCommand...)
output, err := cmd.Output()
// validate command output
if err != nil || !isUpdateTopicCommandSuccess(string(output)) {
Expand All @@ -197,8 +197,8 @@ func (r *ReconcileTopicTransfer) Reconcile(ctx context.Context, request reconcil
// step3: stop write in source cluster topic
status = 3
stopSourceClusterTopicWriteCommand := buildStopClusterTopicWriteCommand(topic, sourceCluster, nameServer)
reqLogger.Info("stopSourceClusterTopicWriteCommand: " + stopSourceClusterTopicWriteCommand)
cmd = exec.Command(cons.BasicCommand, cons.AdminToolDir, stopSourceClusterTopicWriteCommand)
reqLogger.Info("stopSourceClusterTopicWriteCommand: " + strings.Join(stopSourceClusterTopicWriteCommand, " "))
cmd = exec.Command(cons.BasicCommand, stopSourceClusterTopicWriteCommand...)
output, err = cmd.Output()
// validate command output
if err != nil || !isUpdateTopicCommandSuccess(string(output)) {
Expand All @@ -215,8 +215,8 @@ func (r *ReconcileTopicTransfer) Reconcile(ctx context.Context, request reconcil
log.Info("Processing consumer group" + consumerGroup + " " + strconv.Itoa(i+1) + "/" + strconv.Itoa(len(consumerGroups)))
for {
checkConsumeProgressCommand := buildCheckConsumeProgressCommand(consumerGroup, nameServer)
reqLogger.Info("checkConsumeProgressCommand: " + checkConsumeProgressCommand)
cmd = exec.Command(cons.BasicCommand, cons.AdminToolDir, checkConsumeProgressCommand)
reqLogger.Info("checkConsumeProgressCommand: " + strings.Join(checkConsumeProgressCommand, " "))
cmd = exec.Command(cons.BasicCommand, checkConsumeProgressCommand...)
output, err = cmd.Output()
if err != nil || !isCheckConsumeProcessCommandSuccess(string(output)) {
reqLogger.Error(err, "Failed to check consumerGroup "+consumerGroup+" with output: "+string(output))
Expand All @@ -237,8 +237,8 @@ func (r *ReconcileTopicTransfer) Reconcile(ctx context.Context, request reconcil
// step5: delete topic in source cluster
status = 5
deleteSourceClusterTopicCommand := buildDeleteSourceClusterTopicCommand(topic, sourceCluster, nameServer)
reqLogger.Info("deleteSourceClusterTopicCommand: " + deleteSourceClusterTopicCommand)
cmd = exec.Command(cons.BasicCommand, cons.AdminToolDir, deleteSourceClusterTopicCommand)
reqLogger.Info("deleteSourceClusterTopicCommand: " + strings.Join(deleteSourceClusterTopicCommand, " "))
cmd = exec.Command(cons.BasicCommand, deleteSourceClusterTopicCommand...)
output, err = cmd.Output()
if err != nil || !isDeleteTopicCommandSuccess(string(output)) {
reqLogger.Error(err, "Failed to delete Topic "+topic+" in SourceCluster "+sourceCluster+" with output: "+string(output))
Expand All @@ -253,8 +253,8 @@ func (r *ReconcileTopicTransfer) Reconcile(ctx context.Context, request reconcil
for i, consumerGroup := range consumerGroups {
log.Info("Processing consumer group" + consumerGroup + " " + strconv.Itoa(i+1) + "/" + strconv.Itoa(len(consumerGroups)))
deleteConsumerGroupCommand := buildDeleteConsumeGroupCommand(consumerGroup, sourceCluster, nameServer)
reqLogger.Info("deleteConsumerGroupCommand: " + deleteConsumerGroupCommand)
cmd = exec.Command(cons.BasicCommand, cons.AdminToolDir, deleteConsumerGroupCommand)
reqLogger.Info("deleteConsumerGroupCommand: " + strings.Join(deleteConsumerGroupCommand, " "))
cmd = exec.Command(cons.BasicCommand, deleteConsumerGroupCommand...)
output, err = cmd.Output()
if err != nil || !isDeleteConsumerGroupSuccess(string(output)) {
reqLogger.Error(err, "Failed to delete consumer group "+consumerGroup+" in SourceCluster "+sourceCluster+" with output: "+string(output))
Expand All @@ -270,8 +270,8 @@ func (r *ReconcileTopicTransfer) Reconcile(ctx context.Context, request reconcil
for i, consumerGroup := range consumerGroups {
log.Info("Processing consumer group" + consumerGroup + " " + strconv.Itoa(i+1) + "/" + strconv.Itoa(len(consumerGroups)))
createRetryTopicCommand := buildAddRetryTopicToClusterCommand(consumerGroup, targetCluster, nameServer)
reqLogger.Info("createRetryTopicCommand: " + createRetryTopicCommand)
cmd = exec.Command(cons.BasicCommand, cons.AdminToolDir, createRetryTopicCommand)
reqLogger.Info("createRetryTopicCommand: " + strings.Join(createRetryTopicCommand, " "))
cmd = exec.Command(cons.BasicCommand, createRetryTopicCommand...)
output, err = cmd.Output()
if err != nil || !isUpdateTopicCommandSuccess(string(output)) {
reqLogger.Error(err, "Failed to create retry topic of consumer group "+consumerGroup+" in TargetCluster "+targetCluster+" with output: "+string(output))
Expand All @@ -290,8 +290,8 @@ func (r *ReconcileTopicTransfer) Reconcile(ctx context.Context, request reconcil

func undoStopWrite(topic string, cluster string, nameServer string) {
addTopicToClusterCommand := buildUndoStopWriteCommand(topic, cluster, nameServer)
log.Info("undoStopWrite: " + addTopicToClusterCommand)
cmd := exec.Command(cons.BasicCommand, cons.AdminToolDir, addTopicToClusterCommand)
log.Info("undoStopWrite: " + strings.Join(addTopicToClusterCommand, " "))
cmd := exec.Command(cons.BasicCommand, addTopicToClusterCommand...)
output, err := cmd.Output()
if err != nil || !isUpdateTopicCommandSuccess(string(output)) {
log.Error(err, "Failed to undo stop write topic with output: "+string(output))
Expand All @@ -301,8 +301,8 @@ func undoStopWrite(topic string, cluster string, nameServer string) {

func undoDeleteTopic(topic string, cluster string, nameServer string) {
addTopicToClusterCommand := buildAddTopicToClusterCommand(topic, cluster, nameServer)
log.Info("undoDeleteTopic: " + addTopicToClusterCommand)
cmd := exec.Command(cons.BasicCommand, cons.AdminToolDir, addTopicToClusterCommand)
log.Info("undoDeleteTopic: " + strings.Join(addTopicToClusterCommand, " "))
cmd := exec.Command(cons.BasicCommand, addTopicToClusterCommand...)
output, err := cmd.Output()
if err != nil || !isUpdateTopicCommandSuccess(string(output)) {
log.Error(err, "Failed to undo delete topic with output: "+string(output))
Expand All @@ -313,8 +313,8 @@ func undoDeleteTopic(topic string, cluster string, nameServer string) {
func undoDeleteConsumeGroup(consumerGroups []string, cluster string, nameServer string) {
for _, consumerGroup := range consumerGroups {
addConsumerGroupToTargetClusterCommand := buildAddConsumerGroupToClusterCommand(consumerGroup, cluster, nameServer)
log.Info("undoDeleteConsumeGroup: " + addConsumerGroupToTargetClusterCommand)
cmd := exec.Command(cons.BasicCommand, cons.AdminToolDir, addConsumerGroupToTargetClusterCommand)
log.Info("undoDeleteConsumeGroup: " + strings.Join(addConsumerGroupToTargetClusterCommand, " "))
cmd := exec.Command(cons.BasicCommand, addConsumerGroupToTargetClusterCommand...)
output, err := cmd.Output()
if err != nil || !isUpdateConsumerGroupSuccess(string(output)) {
log.Error(err, "Failed to undo delete consume group with output: "+string(output))
Expand All @@ -326,7 +326,7 @@ func undoDeleteConsumeGroup(consumerGroups []string, cluster string, nameServer
func getConsumerGroupByTopic(topic string, nameServer string) []string {
var consumerGroups []string
topicListCmd := buildTopicListCommand(nameServer)
cmd := exec.Command(cons.BasicCommand, cons.AdminToolDir, topicListCmd)
cmd := exec.Command(cons.BasicCommand, topicListCmd...)
output, err := cmd.Output()
if err != nil || !isTopicListSuccess(string(output)) {
log.Error(err, "Failed to list topic with output: "+string(output))
Expand Down Expand Up @@ -371,8 +371,9 @@ func isUpdateConsumerGroupSuccess(s string) bool {
return strings.Contains(s, "groupName")
}

func buildUndoStopWriteCommand(topic string, cluster string, nameServer string) string {
func buildUndoStopWriteCommand(topic string, cluster string, nameServer string) []string {
cmdOpts := []string{
cons.AdminToolDir,
"updatetopic",
"-t",
topic,
Expand All @@ -387,21 +388,23 @@ func buildUndoStopWriteCommand(topic string, cluster string, nameServer string)
"-n",
nameServer,
}
return strings.Join(cmdOpts, " ")
return cmdOpts
}

func buildTopicListCommand(nameServer string) string {
func buildTopicListCommand(nameServer string) []string {
cmdOpts := []string{
cons.AdminToolDir,
"topiclist",
"-c",
"-n",
nameServer,
}
return strings.Join(cmdOpts, " ")
return cmdOpts
}

func buildAddRetryTopicToClusterCommand(consumerGroup string, cluster string, nameServer string) string {
func buildAddRetryTopicToClusterCommand(consumerGroup string, cluster string, nameServer string) []string {
cmdOpts := []string{
cons.AdminToolDir,
"updatetopic",
"-t",
"%RETRY%" + consumerGroup,
Expand All @@ -416,7 +419,7 @@ func buildAddRetryTopicToClusterCommand(consumerGroup string, cluster string, na
"-n",
nameServer,
}
return strings.Join(cmdOpts, " ")
return cmdOpts
}

func getClusterBrokerNames(cluster string) []string {
Expand Down Expand Up @@ -446,8 +449,9 @@ func isConsumeFinished(output string, topic string, cluster string) bool {
return true
}

func buildDeleteConsumeGroupCommand(consumerGroup string, cluster string, nameServer string) string {
func buildDeleteConsumeGroupCommand(consumerGroup string, cluster string, nameServer string) []string {
cmdOpts := []string{
cons.AdminToolDir,
"deleteSubGroup",
"-g",
consumerGroup,
Expand All @@ -456,11 +460,12 @@ func buildDeleteConsumeGroupCommand(consumerGroup string, cluster string, nameSe
"-n",
nameServer,
}
return strings.Join(cmdOpts, " ")
return cmdOpts
}

func buildDeleteSourceClusterTopicCommand(topic string, sourceCluster string, nameServer string) string {
func buildDeleteSourceClusterTopicCommand(topic string, sourceCluster string, nameServer string) []string {
cmdOpts := []string{
cons.AdminToolDir,
"deletetopic",
"-t",
topic,
Expand All @@ -469,22 +474,24 @@ func buildDeleteSourceClusterTopicCommand(topic string, sourceCluster string, na
"-n",
nameServer,
}
return strings.Join(cmdOpts, " ")
return cmdOpts
}

func buildCheckConsumeProgressCommand(consumerGroup string, nameServer string) string {
func buildCheckConsumeProgressCommand(consumerGroup string, nameServer string) []string {
cmdOpts := []string{
cons.AdminToolDir,
"consumerprogress",
"-g",
consumerGroup,
"-n",
nameServer,
}
return strings.Join(cmdOpts, " ")
return cmdOpts
}

func buildStopClusterTopicWriteCommand(topic string, cluster string, nameServer string) string {
func buildStopClusterTopicWriteCommand(topic string, cluster string, nameServer string) []string {
cmdOpts := []string{
cons.AdminToolDir,
"updatetopic",
"-t",
topic,
Expand All @@ -497,11 +504,12 @@ func buildStopClusterTopicWriteCommand(topic string, cluster string, nameServer
"-n",
nameServer,
}
return strings.Join(cmdOpts, " ")
return cmdOpts
}

func buildAddConsumerGroupToClusterCommand(consumerGroup string, cluster string, nameServer string) string {
func buildAddConsumerGroupToClusterCommand(consumerGroup string, cluster string, nameServer string) []string {
cmdOpts := []string{
cons.AdminToolDir,
"updatesubgroup",
"-g",
consumerGroup,
Expand All @@ -514,11 +522,12 @@ func buildAddConsumerGroupToClusterCommand(consumerGroup string, cluster string,
"-n",
nameServer,
}
return strings.Join(cmdOpts, " ")
return cmdOpts
}

func buildAddTopicToClusterCommand(topic string, cluster string, nameServer string) string {
func buildAddTopicToClusterCommand(topic string, cluster string, nameServer string) []string {
cmdOpts := []string{
cons.AdminToolDir,
"updatetopic",
"-t",
topic,
Expand All @@ -527,5 +536,5 @@ func buildAddTopicToClusterCommand(topic string, cluster string, nameServer stri
"-n",
nameServer,
}
return strings.Join(cmdOpts, " ")
return cmdOpts
}