From 92547948b1fe1d159ace8ebd53445ae13f6d0115 Mon Sep 17 00:00:00 2001 From: linjiemiao Date: Mon, 21 Sep 2020 15:07:45 +0800 Subject: [PATCH 1/9] eliminate share variable: broker group name --- pkg/constants/constants.go | 3 ++ pkg/controller/broker/broker_controller.go | 1 - .../nameservice/nameservice_controller.go | 30 +++++++++++++++++-- pkg/share/share.go | 3 -- 4 files changed, 30 insertions(+), 7 deletions(-) diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index 653b3d7a..4af520fd 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -55,6 +55,9 @@ const ( // UpdateBrokerConfig is update broker config command UpdateBrokerConfig = "updateBrokerConfig" + // ClusterList is the command of cluster list + ClusterList = "clusterList" + // ParamNameServiceAddress is the name of name server list parameter ParamNameServiceAddress = "namesrvAddr" diff --git a/pkg/controller/broker/broker_controller.go b/pkg/controller/broker/broker_controller.go index 030a3d65..aca5dbbd 100644 --- a/pkg/controller/broker/broker_controller.go +++ b/pkg/controller/broker/broker_controller.go @@ -165,7 +165,6 @@ func (r *ReconcileBroker) Reconcile(ctx context.Context, request reconcile.Reque return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(cons.RequeueIntervalInSecond) * time.Second}, nil } - share.BrokerClusterName = broker.Name replicaPerGroup := broker.Spec.ReplicaPerGroup reqLogger.Info("brokerGroupNum=" + strconv.Itoa(share.GroupNum) + ", replicaPerGroup=" + strconv.Itoa(replicaPerGroup)) for brokerGroupIndex := 0; brokerGroupIndex < share.GroupNum; brokerGroupIndex++ { diff --git a/pkg/controller/nameservice/nameservice_controller.go b/pkg/controller/nameservice/nameservice_controller.go index ece34de1..05d6ff70 100644 --- a/pkg/controller/nameservice/nameservice_controller.go +++ b/pkg/controller/nameservice/nameservice_controller.go @@ -217,13 +217,37 @@ func (r *ReconcileNameService) updateNameServiceStatus(instance *rocketmqv1alpha // use admin tool to update broker config if share.IsNameServersStrUpdated && (len(oldNameServerListStr) > cons.MinIpListLength) && (len(share.NameServersStr) > cons.MinIpListLength) { + // bash-4.4$ ./mqadmin clusterList -n 192.168.180.36:9876 + // #Cluster Name #Broker Name #BID #Addr #Version #InTPS(LOAD) #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE + // broker broker-0 0 192.168.180.40:10911 V4_5_0 0.00(0,0ms) 0.00(0,0ms) 0 471030.34 -1.0000 + // broker broker-0 1 192.168.137.89:10911 V4_5_0 0.00(0,0ms) 0.00(0,0ms) 0 471030.34 0.2673 + clusterListCmd := exec.Command("sh", cons.AdminToolDir, cons.ClusterList, "-n", oldNameServerListStr) + clusterListOutput, err := clusterListCmd.Output() + if err != nil { + reqLogger.Error(err, "Get cluster list failed, command: "+cons.AdminToolDir+" "+cons.ClusterList+" -n "+oldNameServerListStr) + return reconcile.Result{Requeue: true}, err + } + // get cluster of output + clusterName := "" + for _, line := range strings.Split(string(clusterListOutput), "\n") { + if strings.HasPrefix(line, "#Cluster Name") { + continue + } + for _, f := range strings.Fields(line) { + clusterName = f + break + } + } + if clusterName == "" { + reqLogger.Error(err, "Get empty cluster name, command: "+cons.AdminToolDir+" "+cons.ClusterList+" -n "+oldNameServerListStr) + return reconcile.Result{Requeue: true}, err + } + mqAdmin := cons.AdminToolDir subCmd := cons.UpdateBrokerConfig key := cons.ParamNameServiceAddress reqLogger.Info("share.GroupNum=broker.Spec.Size=" + strconv.Itoa(share.GroupNum)) - - clusterName := share.BrokerClusterName reqLogger.Info("Updating config " + key + " of cluster" + clusterName) command := mqAdmin + " " + subCmd + " -c " + clusterName + " -k " + key + " -n " + oldNameServerListStr + " -v " + share.NameServersStr cmd := exec.Command("sh", mqAdmin, subCmd, "-c", clusterName, "-k", key, "-n", oldNameServerListStr, "-v", share.NameServersStr) @@ -249,7 +273,7 @@ func (r *ReconcileNameService) updateNameServiceStatus(instance *rocketmqv1alpha reqLogger.Info("Share variables", "GroupNum", share.GroupNum, "NameServersStr", share.NameServersStr, "IsNameServersStrUpdated", share.IsNameServersStrUpdated, - "IsNameServersStrInitialized", share.IsNameServersStrInitialized, "BrokerClusterName", share.BrokerClusterName) + "IsNameServersStrInitialized", share.IsNameServersStrInitialized) if requeue { return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(cons.RequeueIntervalInSecond) * time.Second}, nil diff --git a/pkg/share/share.go b/pkg/share/share.go index 0656bc28..9476c1b9 100644 --- a/pkg/share/share.go +++ b/pkg/share/share.go @@ -31,9 +31,6 @@ var ( // IsNameServersStrInitialized is whether the name server list is initialized IsNameServersStrInitialized = false - // BrokerClusterName is the broker cluster name - BrokerClusterName = "" - // svc of controller for brokers ControllerAccessPoint = "" ) From fe34a956c22ad1777bcfbde693c5468ba8368a93 Mon Sep 17 00:00:00 2001 From: drivebyer Date: Tue, 26 Sep 2023 14:38:40 +0800 Subject: [PATCH 2/9] add rocketmq name field --- pkg/apis/rocketmq/v1alpha1/broker_types.go | 4 ++++ pkg/apis/rocketmq/v1alpha1/console_types.go | 3 +++ pkg/apis/rocketmq/v1alpha1/controller_types.go | 3 +++ pkg/apis/rocketmq/v1alpha1/nameservice_types.go | 3 +++ pkg/apis/rocketmq/v1alpha1/topictransfer_types.go | 3 +++ 5 files changed, 16 insertions(+) diff --git a/pkg/apis/rocketmq/v1alpha1/broker_types.go b/pkg/apis/rocketmq/v1alpha1/broker_types.go index 1647f9a9..14f81f4d 100644 --- a/pkg/apis/rocketmq/v1alpha1/broker_types.go +++ b/pkg/apis/rocketmq/v1alpha1/broker_types.go @@ -28,6 +28,10 @@ import ( // BrokerSpec defines the desired state of Broker // +k8s:openapi-gen=true type BrokerSpec struct { + // RocketMqName is the name of the RocketMQ cluster. + // +kubebuilder:default:="rocketmq" + RocketMqName string `json:"rocketMqName"` + // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "operator-sdk generate k8s" to regenerate code after modifying this file // Add custom validation using kubebuilder tags: https://book-v1.book.kubebuilder.io/beyond_basics/generating_crd.html diff --git a/pkg/apis/rocketmq/v1alpha1/console_types.go b/pkg/apis/rocketmq/v1alpha1/console_types.go index 00feac54..2ea7ed04 100644 --- a/pkg/apis/rocketmq/v1alpha1/console_types.go +++ b/pkg/apis/rocketmq/v1alpha1/console_types.go @@ -28,6 +28,9 @@ import ( // ConsoleSpec defines the desired state of Console // +k8s:openapi-gen=true type ConsoleSpec struct { + // RocketMqName is the name of the RocketMQ cluster + RocketMqName string `json:"rocketMqName"` + // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "operator-sdk generate k8s" to regenerate code after modifying this file // Add custom validation using kubebuilder tags: https://book-v1.book.kubebuilder.io/beyond_basics/generating_crd.html diff --git a/pkg/apis/rocketmq/v1alpha1/controller_types.go b/pkg/apis/rocketmq/v1alpha1/controller_types.go index 5e8152c8..3e020d86 100644 --- a/pkg/apis/rocketmq/v1alpha1/controller_types.go +++ b/pkg/apis/rocketmq/v1alpha1/controller_types.go @@ -28,6 +28,9 @@ import ( // ControllerSpec defines the desired state of Controller // +k8s:openapi-gen=true type ControllerSpec struct { + // RocketMqName is the name of the RocketMQ cluster + RocketMqName string `json:"rocketMqName"` + // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "operator-sdk generate k8s" to regenerate code after modifying this file // Add custom validation using kubebuilder tags: https://book-v1.book.kubebuilder.io/beyond_basics/generating_crd.html diff --git a/pkg/apis/rocketmq/v1alpha1/nameservice_types.go b/pkg/apis/rocketmq/v1alpha1/nameservice_types.go index a6f4ace5..ead78752 100644 --- a/pkg/apis/rocketmq/v1alpha1/nameservice_types.go +++ b/pkg/apis/rocketmq/v1alpha1/nameservice_types.go @@ -28,6 +28,9 @@ import ( // NameServiceSpec defines the desired state of NameService // +k8s:openapi-gen=true type NameServiceSpec struct { + // RocketMqName is the name of the RocketMQ cluster + RocketMqName string `json:"rocketMqName"` + // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "operator-sdk generate k8s" to regenerate code after modifying this file // Add custom validation using kubebuilder tags: https://book-v1.book.kubebuilder.io/beyond_basics/generating_crd.html diff --git a/pkg/apis/rocketmq/v1alpha1/topictransfer_types.go b/pkg/apis/rocketmq/v1alpha1/topictransfer_types.go index 82a7e37e..5d40f234 100644 --- a/pkg/apis/rocketmq/v1alpha1/topictransfer_types.go +++ b/pkg/apis/rocketmq/v1alpha1/topictransfer_types.go @@ -27,6 +27,9 @@ import ( // TopicTransferSpec defines the desired state of TopicTransfer // +k8s:openapi-gen=true type TopicTransferSpec struct { + // RocketMqName is the name of the RocketMQ cluster + RocketMqName string `json:"rocketMqName"` + // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "operator-sdk generate k8s" to regenerate code after modifying this file // Add custom validation using kubebuilder tags: https://book-v1.book.kubebuilder.io/beyond_basics/generating_crd.html From 6f14b62599bc3ce410be3ab3027b50426f0f8a9d Mon Sep 17 00:00:00 2001 From: drivebyer Date: Tue, 26 Sep 2023 17:45:12 +0800 Subject: [PATCH 3/9] eliminate share variable: controller access point --- pkg/controller/broker/broker_controller.go | 45 +++++++++++++++---- .../controller/dledger_controller.go | 3 -- pkg/share/share.go | 3 -- 3 files changed, 36 insertions(+), 15 deletions(-) diff --git a/pkg/controller/broker/broker_controller.go b/pkg/controller/broker/broker_controller.go index aca5dbbd..28460347 100644 --- a/pkg/controller/broker/broker_controller.go +++ b/pkg/controller/broker/broker_controller.go @@ -25,6 +25,8 @@ import ( "strings" "time" + "k8s.io/apimachinery/pkg/fields" + "github.com/google/uuid" rocketmqv1alpha1 "github.com/apache/rocketmq-operator/pkg/apis/rocketmq/v1alpha1" @@ -160,7 +162,8 @@ func (r *ReconcileBroker) Reconcile(ctx context.Context, request reconcile.Reque broker.Spec.ClusterMode = "STATIC" } - if broker.Spec.ClusterMode == "CONTROLLER" && share.ControllerAccessPoint == "" { + controllerAccessPoint := r.getControllerAccessPoint(broker.Namespace, broker.Spec.RocketMqName) + if broker.Spec.ClusterMode == "CONTROLLER" && controllerAccessPoint == "" { log.Info("Broker Waiting for Controller ready...") return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(cons.RequeueIntervalInSecond) * time.Second}, nil } @@ -169,7 +172,7 @@ func (r *ReconcileBroker) Reconcile(ctx context.Context, request reconcile.Reque reqLogger.Info("brokerGroupNum=" + strconv.Itoa(share.GroupNum) + ", replicaPerGroup=" + strconv.Itoa(replicaPerGroup)) for brokerGroupIndex := 0; brokerGroupIndex < share.GroupNum; brokerGroupIndex++ { reqLogger.Info("Check Broker cluster " + strconv.Itoa(brokerGroupIndex+1) + "/" + strconv.Itoa(share.GroupNum)) - dep := r.getBrokerStatefulSet(broker, brokerGroupIndex, 0) + dep := r.getBrokerStatefulSet(broker, brokerGroupIndex, 0, controllerAccessPoint) // Check if the statefulSet already exists, if not create a new one found := &appsv1.StatefulSet{} err = r.client.Get(context.TODO(), types.NamespacedName{Name: dep.Name, Namespace: dep.Namespace}, found) @@ -185,7 +188,7 @@ func (r *ReconcileBroker) Reconcile(ctx context.Context, request reconcile.Reque for replicaIndex := 1; replicaIndex <= replicaPerGroup; replicaIndex++ { reqLogger.Info("Check Replica Broker of cluster-" + strconv.Itoa(brokerGroupIndex) + " " + strconv.Itoa(replicaIndex) + "/" + strconv.Itoa(replicaPerGroup)) - replicaDep := r.getBrokerStatefulSet(broker, brokerGroupIndex, replicaIndex) + replicaDep := r.getBrokerStatefulSet(broker, brokerGroupIndex, replicaIndex, controllerAccessPoint) err = r.client.Get(context.TODO(), types.NamespacedName{Name: replicaDep.Name, Namespace: replicaDep.Namespace}, found) if err != nil && errors.IsNotFound(err) { reqLogger.Info("Creating a new Replica Broker StatefulSet.", "StatefulSet.Namespace", replicaDep.Namespace, "StatefulSet.Name", replicaDep.Name) @@ -207,7 +210,7 @@ func (r *ReconcileBroker) Reconcile(ctx context.Context, request reconcile.Reque brokerName := getBrokerName(broker, brokerGroupIndex) // Update master broker reqLogger.Info("Update Master Broker NAMESRV_ADDR of " + brokerName) - dep := r.getBrokerStatefulSet(broker, brokerGroupIndex, 0) + dep := r.getBrokerStatefulSet(broker, brokerGroupIndex, 0, controllerAccessPoint) found := &appsv1.StatefulSet{} err = r.client.Get(context.TODO(), types.NamespacedName{Name: dep.Name, Namespace: dep.Namespace}, found) if err != nil { @@ -225,7 +228,7 @@ func (r *ReconcileBroker) Reconcile(ctx context.Context, request reconcile.Reque // Update replicas brokers for replicaIndex := 1; replicaIndex <= replicaPerGroup; replicaIndex++ { reqLogger.Info("Update Replica Broker NAMESRV_ADDR of " + brokerName + " " + strconv.Itoa(replicaIndex) + "/" + strconv.Itoa(replicaPerGroup)) - replicaDep := r.getBrokerStatefulSet(broker, brokerGroupIndex, replicaIndex) + replicaDep := r.getBrokerStatefulSet(broker, brokerGroupIndex, replicaIndex, controllerAccessPoint) replicaFound := &appsv1.StatefulSet{} err = r.client.Get(context.TODO(), types.NamespacedName{Name: replicaDep.Name, Namespace: replicaDep.Namespace}, replicaFound) if err != nil { @@ -342,6 +345,30 @@ func (r *ReconcileBroker) Reconcile(ctx context.Context, request reconcile.Reque return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(cons.RequeueIntervalInSecond) * time.Second}, nil } +func (r *ReconcileBroker) getControllerAccessPoint(namespace string, rocketMqName string) string { + controllerList := &rocketmqv1alpha1.ControllerList{} + err := r.client.List(context.TODO(), controllerList, &client.ListOptions{ + Namespace: namespace, + FieldSelector: fields.SelectorFromSet(fields.Set{ + "spec.rocketMqName": rocketMqName, + }), + }) + if err != nil { + log.Error(err, "Failed to list controller.", "Controller.Namespace", namespace, "Controller.Name", rocketMqName) + return "" + } + if len(controllerList.Items) != 1 { + return "" + } + + controller := controllerList.Items[0] + + if controller.Status.Size != controller.Spec.Size { + return "" + } + return tool.BuildSvcResourceName(controller.Name) + ":9878" +} + func getCopyMetadataJsonCommand(dir string, sourcePodName string, namespace string, k8s *tool.K8sClient) string { cmdOpts := buildInputCommand(dir) topicsJsonStr, err := exec(cmdOpts, sourcePodName, k8s, namespace) @@ -398,7 +425,7 @@ func getBrokerName(broker *rocketmqv1alpha1.Broker, brokerGroupIndex int) string } // getBrokerStatefulSet returns a broker StatefulSet object -func (r *ReconcileBroker) getBrokerStatefulSet(broker *rocketmqv1alpha1.Broker, brokerGroupIndex int, replicaIndex int) *appsv1.StatefulSet { +func (r *ReconcileBroker) getBrokerStatefulSet(broker *rocketmqv1alpha1.Broker, brokerGroupIndex int, replicaIndex int, controllerAccessPoint string) *appsv1.StatefulSet { ls := labelsForBroker(broker.Name) var a int32 = 1 var c = &a @@ -459,7 +486,7 @@ func (r *ReconcileBroker) getBrokerStatefulSet(broker *rocketmqv1alpha1.Broker, }, SecurityContext: getContainerSecurityContext(broker), ImagePullPolicy: broker.Spec.ImagePullPolicy, - Env: getENV(broker, replicaIndex, brokerGroupIndex), + Env: getENV(broker, replicaIndex, brokerGroupIndex, controllerAccessPoint), Ports: []corev1.ContainerPort{{ ContainerPort: cons.BrokerVipContainerPort, Name: cons.BrokerVipContainerPortName, @@ -498,7 +525,7 @@ func (r *ReconcileBroker) getBrokerStatefulSet(broker *rocketmqv1alpha1.Broker, } -func getENV(broker *rocketmqv1alpha1.Broker, replicaIndex int, brokerGroupIndex int) []corev1.EnvVar { +func getENV(broker *rocketmqv1alpha1.Broker, replicaIndex int, brokerGroupIndex int, controllerAccessPoint string) []corev1.EnvVar { envs := []corev1.EnvVar{{ Name: cons.EnvNameServiceAddress, Value: share.NameServersStr, @@ -514,7 +541,7 @@ func getENV(broker *rocketmqv1alpha1.Broker, replicaIndex int, brokerGroupIndex }} if broker.Spec.ClusterMode == "CONTROLLER" { envs = append(envs, corev1.EnvVar{Name: cons.EnvEnableControllerMode, Value: "true"}) - envs = append(envs, corev1.EnvVar{Name: cons.EnvControllerAddr, Value: share.ControllerAccessPoint}) + envs = append(envs, corev1.EnvVar{Name: cons.EnvControllerAddr, Value: controllerAccessPoint}) } envs = append(envs, broker.Spec.Env...) return envs diff --git a/pkg/controller/controller/dledger_controller.go b/pkg/controller/controller/dledger_controller.go index 1056f244..3e1818ce 100644 --- a/pkg/controller/controller/dledger_controller.go +++ b/pkg/controller/controller/dledger_controller.go @@ -29,7 +29,6 @@ import ( rocketmqv1alpha1 "github.com/apache/rocketmq-operator/pkg/apis/rocketmq/v1alpha1" cons "github.com/apache/rocketmq-operator/pkg/constants" - "github.com/apache/rocketmq-operator/pkg/share" "github.com/apache/rocketmq-operator/pkg/tool" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -224,8 +223,6 @@ func (r *ReconcileController) Reconcile(ctx context.Context, request reconcile.R return reconcile.Result{}, err } } - share.ControllerAccessPoint = controllerSvcName + ":9878" - return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(cons.RequeueIntervalInSecond) * time.Second}, nil } diff --git a/pkg/share/share.go b/pkg/share/share.go index 9476c1b9..38043481 100644 --- a/pkg/share/share.go +++ b/pkg/share/share.go @@ -30,7 +30,4 @@ var ( // IsNameServersStrInitialized is whether the name server list is initialized IsNameServersStrInitialized = false - - // svc of controller for brokers - ControllerAccessPoint = "" ) From d83e947c9f7d66c9da6145e7fe2081c2f6a716f1 Mon Sep 17 00:00:00 2001 From: drivebyer Date: Tue, 26 Sep 2023 17:48:12 +0800 Subject: [PATCH 4/9] eliminate share variable: group number --- pkg/controller/broker/broker_controller.go | 11 ++++++----- pkg/controller/nameservice/nameservice_controller.go | 3 +-- pkg/share/share.go | 3 --- 3 files changed, 7 insertions(+), 10 deletions(-) diff --git a/pkg/controller/broker/broker_controller.go b/pkg/controller/broker/broker_controller.go index 28460347..6965613c 100644 --- a/pkg/controller/broker/broker_controller.go +++ b/pkg/controller/broker/broker_controller.go @@ -138,10 +138,11 @@ func (r *ReconcileBroker) Reconcile(ctx context.Context, request reconcile.Reque return reconcile.Result{}, err } + var groupNum int if broker.Status.Size == 0 { - share.GroupNum = broker.Spec.Size + groupNum = broker.Spec.Size } else { - share.GroupNum = broker.Status.Size + groupNum = broker.Status.Size } if broker.Spec.NameServers == "" { @@ -169,9 +170,9 @@ func (r *ReconcileBroker) Reconcile(ctx context.Context, request reconcile.Reque } replicaPerGroup := broker.Spec.ReplicaPerGroup - reqLogger.Info("brokerGroupNum=" + strconv.Itoa(share.GroupNum) + ", replicaPerGroup=" + strconv.Itoa(replicaPerGroup)) - for brokerGroupIndex := 0; brokerGroupIndex < share.GroupNum; brokerGroupIndex++ { - reqLogger.Info("Check Broker cluster " + strconv.Itoa(brokerGroupIndex+1) + "/" + strconv.Itoa(share.GroupNum)) + reqLogger.Info("brokerGroupNum=" + strconv.Itoa(groupNum) + ", replicaPerGroup=" + strconv.Itoa(replicaPerGroup)) + for brokerGroupIndex := 0; brokerGroupIndex < groupNum; brokerGroupIndex++ { + reqLogger.Info("Check Broker cluster " + strconv.Itoa(brokerGroupIndex+1) + "/" + strconv.Itoa(groupNum)) dep := r.getBrokerStatefulSet(broker, brokerGroupIndex, 0, controllerAccessPoint) // Check if the statefulSet already exists, if not create a new one found := &appsv1.StatefulSet{} diff --git a/pkg/controller/nameservice/nameservice_controller.go b/pkg/controller/nameservice/nameservice_controller.go index 05d6ff70..b7381f72 100644 --- a/pkg/controller/nameservice/nameservice_controller.go +++ b/pkg/controller/nameservice/nameservice_controller.go @@ -247,7 +247,6 @@ func (r *ReconcileNameService) updateNameServiceStatus(instance *rocketmqv1alpha subCmd := cons.UpdateBrokerConfig key := cons.ParamNameServiceAddress - reqLogger.Info("share.GroupNum=broker.Spec.Size=" + strconv.Itoa(share.GroupNum)) reqLogger.Info("Updating config " + key + " of cluster" + clusterName) command := mqAdmin + " " + subCmd + " -c " + clusterName + " -k " + key + " -n " + oldNameServerListStr + " -v " + share.NameServersStr cmd := exec.Command("sh", mqAdmin, subCmd, "-c", clusterName, "-k", key, "-n", oldNameServerListStr, "-v", share.NameServersStr) @@ -271,7 +270,7 @@ func (r *ReconcileNameService) updateNameServiceStatus(instance *rocketmqv1alpha share.NameServersStr = nameServerListStr // reassign if operator restarts } - reqLogger.Info("Share variables", "GroupNum", share.GroupNum, + reqLogger.Info("Share variables", "NameServersStr", share.NameServersStr, "IsNameServersStrUpdated", share.IsNameServersStrUpdated, "IsNameServersStrInitialized", share.IsNameServersStrInitialized) diff --git a/pkg/share/share.go b/pkg/share/share.go index 38043481..cada66be 100644 --- a/pkg/share/share.go +++ b/pkg/share/share.go @@ -19,9 +19,6 @@ package share var ( - // GroupNum is the number of broker group - GroupNum = 0 - // NameServersStr is the name server list NameServersStr = "" From bd2abd2f147782838f12a3507e47a968210cc833 Mon Sep 17 00:00:00 2001 From: drivebyer Date: Tue, 26 Sep 2023 22:01:52 +0800 Subject: [PATCH 5/9] rocketMqName should not be required --- deploy/crds/rocketmq.apache.org_brokers.yaml | 3 +++ deploy/crds/rocketmq.apache.org_consoles.yaml | 3 +++ deploy/crds/rocketmq.apache.org_controllers.yaml | 3 +++ deploy/crds/rocketmq.apache.org_nameservices.yaml | 3 +++ deploy/crds/rocketmq.apache.org_topictransfers.yaml | 3 +++ pkg/apis/rocketmq/v1alpha1/broker_types.go | 3 +-- pkg/apis/rocketmq/v1alpha1/console_types.go | 2 +- pkg/apis/rocketmq/v1alpha1/controller_types.go | 2 +- pkg/apis/rocketmq/v1alpha1/nameservice_types.go | 2 +- pkg/apis/rocketmq/v1alpha1/topictransfer_types.go | 2 +- 10 files changed, 20 insertions(+), 6 deletions(-) diff --git a/deploy/crds/rocketmq.apache.org_brokers.yaml b/deploy/crds/rocketmq.apache.org_brokers.yaml index ffc0ad1b..e318d100 100644 --- a/deploy/crds/rocketmq.apache.org_brokers.yaml +++ b/deploy/crds/rocketmq.apache.org_brokers.yaml @@ -1203,6 +1203,9 @@ spec: to an implementation-defined value. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' type: object type: object + rocketMqName: + description: RocketMqName is the name of the RocketMQ cluster. + type: string scalePodName: description: The name of pod where the metadata from type: string diff --git a/deploy/crds/rocketmq.apache.org_consoles.yaml b/deploy/crds/rocketmq.apache.org_consoles.yaml index fab41e0c..e0394f3e 100644 --- a/deploy/crds/rocketmq.apache.org_consoles.yaml +++ b/deploy/crds/rocketmq.apache.org_consoles.yaml @@ -7880,6 +7880,9 @@ spec: nameServers: description: NameServers defines the name service list e.g. 192.168.1.1:9876;192.168.1.2:9876 type: string + rocketMqName: + description: RocketMqName is the name of the RocketMQ cluster + type: string required: - consoleDeployment type: object diff --git a/deploy/crds/rocketmq.apache.org_controllers.yaml b/deploy/crds/rocketmq.apache.org_controllers.yaml index 4f98506b..60d8864b 100644 --- a/deploy/crds/rocketmq.apache.org_controllers.yaml +++ b/deploy/crds/rocketmq.apache.org_controllers.yaml @@ -1178,6 +1178,9 @@ spec: to an implementation-defined value. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' type: object type: object + rocketMqName: + description: RocketMqName is the name of the RocketMQ cluster + type: string securityContext: description: Pod Security Context properties: diff --git a/deploy/crds/rocketmq.apache.org_nameservices.yaml b/deploy/crds/rocketmq.apache.org_nameservices.yaml index 315d85e9..a8dbaf93 100644 --- a/deploy/crds/rocketmq.apache.org_nameservices.yaml +++ b/deploy/crds/rocketmq.apache.org_nameservices.yaml @@ -1082,6 +1082,9 @@ spec: to an implementation-defined value. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' type: object type: object + rocketMqName: + description: RocketMqName is the name of the RocketMQ cluster + type: string securityContext: description: Pod Security Context properties: diff --git a/deploy/crds/rocketmq.apache.org_topictransfers.yaml b/deploy/crds/rocketmq.apache.org_topictransfers.yaml index 4a1f1dd8..f6e4c62a 100644 --- a/deploy/crds/rocketmq.apache.org_topictransfers.yaml +++ b/deploy/crds/rocketmq.apache.org_topictransfers.yaml @@ -36,6 +36,9 @@ spec: spec: description: TopicTransferSpec defines the desired state of TopicTransfer properties: + rocketMqName: + description: RocketMqName is the name of the RocketMQ cluster + type: string sourceCluster: description: The cluster where the transferred topic from type: string diff --git a/pkg/apis/rocketmq/v1alpha1/broker_types.go b/pkg/apis/rocketmq/v1alpha1/broker_types.go index 14f81f4d..0dfb7a36 100644 --- a/pkg/apis/rocketmq/v1alpha1/broker_types.go +++ b/pkg/apis/rocketmq/v1alpha1/broker_types.go @@ -29,8 +29,7 @@ import ( // +k8s:openapi-gen=true type BrokerSpec struct { // RocketMqName is the name of the RocketMQ cluster. - // +kubebuilder:default:="rocketmq" - RocketMqName string `json:"rocketMqName"` + RocketMqName string `json:"rocketMqName,omitempty"` // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "operator-sdk generate k8s" to regenerate code after modifying this file diff --git a/pkg/apis/rocketmq/v1alpha1/console_types.go b/pkg/apis/rocketmq/v1alpha1/console_types.go index 2ea7ed04..f8104c97 100644 --- a/pkg/apis/rocketmq/v1alpha1/console_types.go +++ b/pkg/apis/rocketmq/v1alpha1/console_types.go @@ -29,7 +29,7 @@ import ( // +k8s:openapi-gen=true type ConsoleSpec struct { // RocketMqName is the name of the RocketMQ cluster - RocketMqName string `json:"rocketMqName"` + RocketMqName string `json:"rocketMqName,omitempty"` // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "operator-sdk generate k8s" to regenerate code after modifying this file diff --git a/pkg/apis/rocketmq/v1alpha1/controller_types.go b/pkg/apis/rocketmq/v1alpha1/controller_types.go index 3e020d86..5710acce 100644 --- a/pkg/apis/rocketmq/v1alpha1/controller_types.go +++ b/pkg/apis/rocketmq/v1alpha1/controller_types.go @@ -29,7 +29,7 @@ import ( // +k8s:openapi-gen=true type ControllerSpec struct { // RocketMqName is the name of the RocketMQ cluster - RocketMqName string `json:"rocketMqName"` + RocketMqName string `json:"rocketMqName,omitempty"` // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "operator-sdk generate k8s" to regenerate code after modifying this file diff --git a/pkg/apis/rocketmq/v1alpha1/nameservice_types.go b/pkg/apis/rocketmq/v1alpha1/nameservice_types.go index ead78752..78742cbe 100644 --- a/pkg/apis/rocketmq/v1alpha1/nameservice_types.go +++ b/pkg/apis/rocketmq/v1alpha1/nameservice_types.go @@ -29,7 +29,7 @@ import ( // +k8s:openapi-gen=true type NameServiceSpec struct { // RocketMqName is the name of the RocketMQ cluster - RocketMqName string `json:"rocketMqName"` + RocketMqName string `json:"rocketMqName,omitempty"` // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "operator-sdk generate k8s" to regenerate code after modifying this file diff --git a/pkg/apis/rocketmq/v1alpha1/topictransfer_types.go b/pkg/apis/rocketmq/v1alpha1/topictransfer_types.go index 5d40f234..63e89c1e 100644 --- a/pkg/apis/rocketmq/v1alpha1/topictransfer_types.go +++ b/pkg/apis/rocketmq/v1alpha1/topictransfer_types.go @@ -28,7 +28,7 @@ import ( // +k8s:openapi-gen=true type TopicTransferSpec struct { // RocketMqName is the name of the RocketMQ cluster - RocketMqName string `json:"rocketMqName"` + RocketMqName string `json:"rocketMqName,omitempty"` // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "operator-sdk generate k8s" to regenerate code after modifying this file From 392e6bc8639f9dbb8d262a4b671dadd24faf6bcd Mon Sep 17 00:00:00 2001 From: drivebyer Date: Tue, 26 Sep 2023 23:13:48 +0800 Subject: [PATCH 6/9] add indexer for list option --- pkg/apis/rocketmq/v1alpha1/controller_types.go | 4 ++++ pkg/apis/rocketmq/v1alpha1/nameservice_types.go | 4 ++++ pkg/controller/broker/broker_controller.go | 9 ++------- pkg/controller/controller/dledger_controller.go | 13 +++++++++++++ .../nameservice/nameservice_controller.go | 13 +++++++++++++ 5 files changed, 36 insertions(+), 7 deletions(-) diff --git a/pkg/apis/rocketmq/v1alpha1/controller_types.go b/pkg/apis/rocketmq/v1alpha1/controller_types.go index 5710acce..50b8b04e 100644 --- a/pkg/apis/rocketmq/v1alpha1/controller_types.go +++ b/pkg/apis/rocketmq/v1alpha1/controller_types.go @@ -22,6 +22,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +const ( + ControllerRocketMqNameIndexKey = "spec.rocketMqNameNamespaced" +) + // EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. diff --git a/pkg/apis/rocketmq/v1alpha1/nameservice_types.go b/pkg/apis/rocketmq/v1alpha1/nameservice_types.go index 78742cbe..72263640 100644 --- a/pkg/apis/rocketmq/v1alpha1/nameservice_types.go +++ b/pkg/apis/rocketmq/v1alpha1/nameservice_types.go @@ -22,6 +22,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +const ( + NameServiceRocketMqNameIndexKey = ".spec.rocketMqNameNamespaced" +) + // EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. diff --git a/pkg/controller/broker/broker_controller.go b/pkg/controller/broker/broker_controller.go index 6965613c..eb19d305 100644 --- a/pkg/controller/broker/broker_controller.go +++ b/pkg/controller/broker/broker_controller.go @@ -25,8 +25,6 @@ import ( "strings" "time" - "k8s.io/apimachinery/pkg/fields" - "github.com/google/uuid" rocketmqv1alpha1 "github.com/apache/rocketmq-operator/pkg/apis/rocketmq/v1alpha1" @@ -348,11 +346,8 @@ func (r *ReconcileBroker) Reconcile(ctx context.Context, request reconcile.Reque func (r *ReconcileBroker) getControllerAccessPoint(namespace string, rocketMqName string) string { controllerList := &rocketmqv1alpha1.ControllerList{} - err := r.client.List(context.TODO(), controllerList, &client.ListOptions{ - Namespace: namespace, - FieldSelector: fields.SelectorFromSet(fields.Set{ - "spec.rocketMqName": rocketMqName, - }), + err := r.client.List(context.TODO(), controllerList, &client.MatchingFields{ + rocketmqv1alpha1.ControllerRocketMqNameIndexKey: rocketMqName + "-" + namespace, }) if err != nil { log.Error(err, "Failed to list controller.", "Controller.Namespace", namespace, "Controller.Name", rocketMqName) diff --git a/pkg/controller/controller/dledger_controller.go b/pkg/controller/controller/dledger_controller.go index 3e1818ce..bc6ed2e7 100644 --- a/pkg/controller/controller/dledger_controller.go +++ b/pkg/controller/controller/dledger_controller.go @@ -63,6 +63,19 @@ func newReconciler(mgr manager.Manager) reconcile.Reconciler { // add adds a new Controller to mgr with r as the reconcile.Reconciler func add(mgr manager.Manager, r reconcile.Reconciler) error { + err := mgr.GetCache().IndexField(context.TODO(), &rocketmqv1alpha1.Controller{}, rocketmqv1alpha1.ControllerRocketMqNameIndexKey, + func(rawObj client.Object) []string { + c, ok := rawObj.(*rocketmqv1alpha1.Controller) + if !ok { + return nil + } + return []string{c.Spec.RocketMqName + "-" + c.Namespace} + }, + ) + if err != nil { + return err + } + // Create a new controller c, err := controller.New("dledger-controller", mgr, controller.Options{Reconciler: r}) if err != nil { diff --git a/pkg/controller/nameservice/nameservice_controller.go b/pkg/controller/nameservice/nameservice_controller.go index b7381f72..2d62295d 100644 --- a/pkg/controller/nameservice/nameservice_controller.go +++ b/pkg/controller/nameservice/nameservice_controller.go @@ -68,6 +68,19 @@ func newReconciler(mgr manager.Manager) reconcile.Reconciler { // add adds a new Controller to mgr with r as the reconcile.Reconciler func add(mgr manager.Manager, r reconcile.Reconciler) error { + err := mgr.GetFieldIndexer().IndexField(context.TODO(), &rocketmqv1alpha1.NameService{}, rocketmqv1alpha1.NameServiceRocketMqNameIndexKey, + func(rawObj client.Object) []string { + n, ok := rawObj.(*rocketmqv1alpha1.NameService) + if !ok { + return nil + } + return []string{n.Spec.RocketMqName + "-" + n.Namespace} + }, + ) + if err != nil { + return err + } + // Create a new controller c, err := controller.New("nameservice-controller", mgr, controller.Options{Reconciler: r}) if err != nil { From 983b6a2a625286ea1fc2d371476dda6cd43ae861 Mon Sep 17 00:00:00 2001 From: drivebyer Date: Wed, 27 Sep 2023 11:40:45 +0800 Subject: [PATCH 7/9] eliminate share variable: nameserver address related --- pkg/controller/broker/broker_controller.go | 64 +++++++++---------- pkg/controller/console/console_controller.go | 11 ++-- .../nameservice/nameservice_controller.go | 30 +++------ .../topictransfer/topictransfer_controller.go | 2 +- pkg/share/share.go | 60 ++++++++++++++--- pkg/tool/resource_name.go | 4 ++ 6 files changed, 105 insertions(+), 66 deletions(-) diff --git a/pkg/controller/broker/broker_controller.go b/pkg/controller/broker/broker_controller.go index eb19d305..8c32f4bf 100644 --- a/pkg/controller/broker/broker_controller.go +++ b/pkg/controller/broker/broker_controller.go @@ -143,10 +143,11 @@ func (r *ReconcileBroker) Reconcile(ctx context.Context, request reconcile.Reque groupNum = broker.Status.Size } + var nameServersStr string if broker.Spec.NameServers == "" { // wait for name server ready when create broker cluster if nameServers is omitted for { - if share.IsNameServersStrInitialized { + if nameServersStr = share.GetNameServersStr(r.client, broker.Namespace, broker.Spec.RocketMqName); nameServersStr != "" { break } else { log.Info("Broker Waiting for name server ready...") @@ -154,7 +155,7 @@ func (r *ReconcileBroker) Reconcile(ctx context.Context, request reconcile.Reque } } } else { - share.NameServersStr = broker.Spec.NameServers + nameServersStr = broker.Spec.NameServers } if broker.Spec.ClusterMode == "" { @@ -171,7 +172,7 @@ func (r *ReconcileBroker) Reconcile(ctx context.Context, request reconcile.Reque reqLogger.Info("brokerGroupNum=" + strconv.Itoa(groupNum) + ", replicaPerGroup=" + strconv.Itoa(replicaPerGroup)) for brokerGroupIndex := 0; brokerGroupIndex < groupNum; brokerGroupIndex++ { reqLogger.Info("Check Broker cluster " + strconv.Itoa(brokerGroupIndex+1) + "/" + strconv.Itoa(groupNum)) - dep := r.getBrokerStatefulSet(broker, brokerGroupIndex, 0, controllerAccessPoint) + dep := r.getBrokerStatefulSet(broker, brokerGroupIndex, 0, controllerAccessPoint, nameServersStr) // Check if the statefulSet already exists, if not create a new one found := &appsv1.StatefulSet{} err = r.client.Get(context.TODO(), types.NamespacedName{Name: dep.Name, Namespace: dep.Namespace}, found) @@ -187,7 +188,7 @@ func (r *ReconcileBroker) Reconcile(ctx context.Context, request reconcile.Reque for replicaIndex := 1; replicaIndex <= replicaPerGroup; replicaIndex++ { reqLogger.Info("Check Replica Broker of cluster-" + strconv.Itoa(brokerGroupIndex) + " " + strconv.Itoa(replicaIndex) + "/" + strconv.Itoa(replicaPerGroup)) - replicaDep := r.getBrokerStatefulSet(broker, brokerGroupIndex, replicaIndex, controllerAccessPoint) + replicaDep := r.getBrokerStatefulSet(broker, brokerGroupIndex, replicaIndex, controllerAccessPoint, nameServersStr) err = r.client.Get(context.TODO(), types.NamespacedName{Name: replicaDep.Name, Namespace: replicaDep.Namespace}, found) if err != nil && errors.IsNotFound(err) { reqLogger.Info("Creating a new Replica Broker StatefulSet.", "StatefulSet.Namespace", replicaDep.Namespace, "StatefulSet.Name", replicaDep.Name) @@ -203,19 +204,18 @@ func (r *ReconcileBroker) Reconcile(ctx context.Context, request reconcile.Reque // Check for name server scaling if broker.Spec.AllowRestart { - // The following code will restart all brokers to update NAMESRV_ADDR env - if share.IsNameServersStrUpdated { - for brokerGroupIndex := 0; brokerGroupIndex < broker.Spec.Size; brokerGroupIndex++ { - brokerName := getBrokerName(broker, brokerGroupIndex) - // Update master broker - reqLogger.Info("Update Master Broker NAMESRV_ADDR of " + brokerName) - dep := r.getBrokerStatefulSet(broker, brokerGroupIndex, 0, controllerAccessPoint) - found := &appsv1.StatefulSet{} - err = r.client.Get(context.TODO(), types.NamespacedName{Name: dep.Name, Namespace: dep.Namespace}, found) - if err != nil { - reqLogger.Error(err, "Failed to get broker master StatefulSet of "+brokerName) - } else { - found.Spec.Template.Spec.Containers[0].Env[0].Value = share.NameServersStr + // The following code will restart all brokers to update NAMESRV_ADDR env if name server list is updated + for brokerGroupIndex := 0; brokerGroupIndex < broker.Spec.Size; brokerGroupIndex++ { + brokerName := getBrokerName(broker, brokerGroupIndex) + dep := r.getBrokerStatefulSet(broker, brokerGroupIndex, 0, controllerAccessPoint, nameServersStr) + found := &appsv1.StatefulSet{} + err = r.client.Get(context.TODO(), types.NamespacedName{Name: dep.Name, Namespace: dep.Namespace}, found) + if err != nil { + reqLogger.Error(err, "Failed to get broker master StatefulSet of "+brokerName) + } else { + // update if name server list is updated + if found.Spec.Template.Spec.Containers[0].Env[0].Value != nameServersStr { + found.Spec.Template.Spec.Containers[0].Env[0].Value = nameServersStr err = r.client.Update(context.TODO(), found) if err != nil { reqLogger.Error(err, "Failed to update NAMESRV_ADDR of master broker "+brokerName, "StatefulSet.Namespace", found.Namespace, "StatefulSet.Name", found.Name) @@ -224,18 +224,19 @@ func (r *ReconcileBroker) Reconcile(ctx context.Context, request reconcile.Reque } time.Sleep(time.Duration(cons.RestartBrokerPodIntervalInSecond) * time.Second) } - // Update replicas brokers - for replicaIndex := 1; replicaIndex <= replicaPerGroup; replicaIndex++ { - reqLogger.Info("Update Replica Broker NAMESRV_ADDR of " + brokerName + " " + strconv.Itoa(replicaIndex) + "/" + strconv.Itoa(replicaPerGroup)) - replicaDep := r.getBrokerStatefulSet(broker, brokerGroupIndex, replicaIndex, controllerAccessPoint) - replicaFound := &appsv1.StatefulSet{} - err = r.client.Get(context.TODO(), types.NamespacedName{Name: replicaDep.Name, Namespace: replicaDep.Namespace}, replicaFound) - if err != nil { - reqLogger.Error(err, "Failed to get broker replica StatefulSet of "+brokerName) - } else { + } + // Update replicas brokers + for replicaIndex := 1; replicaIndex <= replicaPerGroup; replicaIndex++ { + replicaDep := r.getBrokerStatefulSet(broker, brokerGroupIndex, replicaIndex, controllerAccessPoint, nameServersStr) + replicaFound := &appsv1.StatefulSet{} + err = r.client.Get(context.TODO(), types.NamespacedName{Name: replicaDep.Name, Namespace: replicaDep.Namespace}, replicaFound) + if err != nil { + reqLogger.Error(err, "Failed to get broker replica StatefulSet of "+brokerName) + } else { + if replicaFound.Spec.Template.Spec.Containers[0].Env[0].Value != nameServersStr { for index := range replicaFound.Spec.Template.Spec.Containers[0].Env { if cons.EnvNameServiceAddress == replicaFound.Spec.Template.Spec.Containers[0].Env[index].Name { - replicaFound.Spec.Template.Spec.Containers[0].Env[index].Value = share.NameServersStr + replicaFound.Spec.Template.Spec.Containers[0].Env[index].Value = nameServersStr break } } @@ -250,7 +251,6 @@ func (r *ReconcileBroker) Reconcile(ctx context.Context, request reconcile.Reque } } } - share.IsNameServersStrUpdated = false } // List the pods for this broker's statefulSet @@ -421,7 +421,7 @@ func getBrokerName(broker *rocketmqv1alpha1.Broker, brokerGroupIndex int) string } // getBrokerStatefulSet returns a broker StatefulSet object -func (r *ReconcileBroker) getBrokerStatefulSet(broker *rocketmqv1alpha1.Broker, brokerGroupIndex int, replicaIndex int, controllerAccessPoint string) *appsv1.StatefulSet { +func (r *ReconcileBroker) getBrokerStatefulSet(broker *rocketmqv1alpha1.Broker, brokerGroupIndex int, replicaIndex int, controllerAccessPoint, nameServersStr string) *appsv1.StatefulSet { ls := labelsForBroker(broker.Name) var a int32 = 1 var c = &a @@ -482,7 +482,7 @@ func (r *ReconcileBroker) getBrokerStatefulSet(broker *rocketmqv1alpha1.Broker, }, SecurityContext: getContainerSecurityContext(broker), ImagePullPolicy: broker.Spec.ImagePullPolicy, - Env: getENV(broker, replicaIndex, brokerGroupIndex, controllerAccessPoint), + Env: getENV(broker, replicaIndex, brokerGroupIndex, controllerAccessPoint, nameServersStr), Ports: []corev1.ContainerPort{{ ContainerPort: cons.BrokerVipContainerPort, Name: cons.BrokerVipContainerPortName, @@ -521,10 +521,10 @@ func (r *ReconcileBroker) getBrokerStatefulSet(broker *rocketmqv1alpha1.Broker, } -func getENV(broker *rocketmqv1alpha1.Broker, replicaIndex int, brokerGroupIndex int, controllerAccessPoint string) []corev1.EnvVar { +func getENV(broker *rocketmqv1alpha1.Broker, replicaIndex int, brokerGroupIndex int, controllerAccessPoint, nameServersStr string) []corev1.EnvVar { envs := []corev1.EnvVar{{ Name: cons.EnvNameServiceAddress, - Value: share.NameServersStr, + Value: nameServersStr, }, { Name: cons.EnvBrokerId, Value: strconv.Itoa(replicaIndex), diff --git a/pkg/controller/console/console_controller.go b/pkg/controller/console/console_controller.go index 63c3fe30..251c45b8 100644 --- a/pkg/controller/console/console_controller.go +++ b/pkg/controller/console/console_controller.go @@ -124,10 +124,11 @@ func (r *ReconcileConsole) Reconcile(ctx context.Context, request reconcile.Requ return reconcile.Result{}, err } + var nameserverStr string if instance.Spec.NameServers == "" { // wait for name server ready if nameServers is omitted for { - if share.IsNameServersStrInitialized { + if nameserverStr = share.GetNameServersStr(r.client, instance.Namespace, instance.Spec.RocketMqName); nameserverStr != "" { break } else { log.Info("Waiting for name server ready...") @@ -135,10 +136,10 @@ func (r *ReconcileConsole) Reconcile(ctx context.Context, request reconcile.Requ } } } else { - share.NameServersStr = instance.Spec.NameServers + nameserverStr = instance.Spec.NameServers } - consoleDeployment := newDeploymentForCR(instance) + consoleDeployment := newDeploymentForCR(instance, nameserverStr) // Set Console instance as the owner and controller if err := controllerutil.SetControllerReference(instance, consoleDeployment, r.scheme); err != nil { @@ -180,10 +181,10 @@ func (r *ReconcileConsole) Reconcile(ctx context.Context, request reconcile.Requ } // newDeploymentForCR returns a deployment pod with modifying the ENV -func newDeploymentForCR(cr *rocketmqv1alpha1.Console) *appsv1.Deployment { +func newDeploymentForCR(cr *rocketmqv1alpha1.Console, nameServersStr string) *appsv1.Deployment { env := corev1.EnvVar{ Name: "JAVA_OPTS", - Value: fmt.Sprintf("-Drocketmq.namesrv.addr=%s -Dcom.rocketmq.sendMessageWithVIPChannel=false", share.NameServersStr), + Value: fmt.Sprintf("-Drocketmq.namesrv.addr=%s -Dcom.rocketmq.sendMessageWithVIPChannel=false", nameServersStr), } dep := &appsv1.Deployment{ diff --git a/pkg/controller/nameservice/nameservice_controller.go b/pkg/controller/nameservice/nameservice_controller.go index 2d62295d..f4f997ca 100644 --- a/pkg/controller/nameservice/nameservice_controller.go +++ b/pkg/controller/nameservice/nameservice_controller.go @@ -30,7 +30,6 @@ import ( rocketmqv1alpha1 "github.com/apache/rocketmq-operator/pkg/apis/rocketmq/v1alpha1" cons "github.com/apache/rocketmq-operator/pkg/constants" - "github.com/apache/rocketmq-operator/pkg/share" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -200,7 +199,7 @@ func (r *ReconcileNameService) updateNameServiceStatus(instance *rocketmqv1alpha for _, value := range hostIps { nameServerListStr = nameServerListStr + value + ":9876;" } - + newNameServerListStr := "" // Update status.NameServers if needed if !reflect.DeepEqual(hostIps, instance.Status.NameServers) { oldNameServerListStr := "" @@ -208,14 +207,15 @@ func (r *ReconcileNameService) updateNameServiceStatus(instance *rocketmqv1alpha oldNameServerListStr = oldNameServerListStr + value + ":9876;" } - share.NameServersStr = nameServerListStr[:len(nameServerListStr)-1] - reqLogger.Info("share.NameServersStr:" + share.NameServersStr) + newNameServerListStr = nameServerListStr[:len(nameServerListStr)-1] + reqLogger.Info("newNameServersStr:" + newNameServerListStr) + var isNameServersStrUpdated bool if len(oldNameServerListStr) <= cons.MinIpListLength { - oldNameServerListStr = share.NameServersStr - } else if len(share.NameServersStr) > cons.MinIpListLength { + oldNameServerListStr = newNameServerListStr + } else if len(newNameServerListStr) > cons.MinIpListLength { oldNameServerListStr = oldNameServerListStr[:len(oldNameServerListStr)-1] - share.IsNameServersStrUpdated = true + isNameServersStrUpdated = true } reqLogger.Info("oldNameServerListStr:" + oldNameServerListStr) @@ -229,7 +229,7 @@ func (r *ReconcileNameService) updateNameServiceStatus(instance *rocketmqv1alpha } // use admin tool to update broker config - if share.IsNameServersStrUpdated && (len(oldNameServerListStr) > cons.MinIpListLength) && (len(share.NameServersStr) > cons.MinIpListLength) { + if isNameServersStrUpdated && (len(oldNameServerListStr) > cons.MinIpListLength) && (len(newNameServerListStr) > cons.MinIpListLength) { // bash-4.4$ ./mqadmin clusterList -n 192.168.180.36:9876 // #Cluster Name #Broker Name #BID #Addr #Version #InTPS(LOAD) #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE // broker broker-0 0 192.168.180.40:10911 V4_5_0 0.00(0,0ms) 0.00(0,0ms) 0 471030.34 -1.0000 @@ -261,8 +261,8 @@ func (r *ReconcileNameService) updateNameServiceStatus(instance *rocketmqv1alpha key := cons.ParamNameServiceAddress reqLogger.Info("Updating config " + key + " of cluster" + clusterName) - command := mqAdmin + " " + subCmd + " -c " + clusterName + " -k " + key + " -n " + oldNameServerListStr + " -v " + share.NameServersStr - cmd := exec.Command("sh", mqAdmin, subCmd, "-c", clusterName, "-k", key, "-n", oldNameServerListStr, "-v", share.NameServersStr) + command := mqAdmin + " " + subCmd + " -c " + clusterName + " -k " + key + " -n " + oldNameServerListStr + " -v " + newNameServerListStr + cmd := exec.Command("sh", mqAdmin, subCmd, "-c", clusterName, "-k", key, "-n", oldNameServerListStr, "-v", newNameServerListStr) output, err := cmd.Output() if err != nil { reqLogger.Error(err, "Update Broker config "+key+" failed of cluster "+clusterName+", command: "+command) @@ -277,16 +277,6 @@ func (r *ReconcileNameService) updateNameServiceStatus(instance *rocketmqv1alpha reqLogger.Info("NameServers IP " + strconv.Itoa(i) + ": " + value) } - runningNameServerNum := getRunningNameServersNum(podList.Items) - if runningNameServerNum == instance.Spec.Size { - share.IsNameServersStrInitialized = true - share.NameServersStr = nameServerListStr // reassign if operator restarts - } - - reqLogger.Info("Share variables", - "NameServersStr", share.NameServersStr, "IsNameServersStrUpdated", share.IsNameServersStrUpdated, - "IsNameServersStrInitialized", share.IsNameServersStrInitialized) - if requeue { return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(cons.RequeueIntervalInSecond) * time.Second}, nil } diff --git a/pkg/controller/topictransfer/topictransfer_controller.go b/pkg/controller/topictransfer/topictransfer_controller.go index 5f212a47..8856b846 100644 --- a/pkg/controller/topictransfer/topictransfer_controller.go +++ b/pkg/controller/topictransfer/topictransfer_controller.go @@ -128,7 +128,7 @@ func (r *ReconcileTopicTransfer) Reconcile(ctx context.Context, request reconcil targetCluster := topicTransfer.Spec.TargetCluster sourceCluster := topicTransfer.Spec.SourceCluster - nameServer := strings.Split(share.NameServersStr, ";")[0] + nameServer := strings.Split(share.GetNameServersStr(r.client, topicTransfer.Namespace, topicTransfer.Spec.RocketMqName), ";")[0] if len(nameServer) < cons.MinIpListLength { reqLogger.Info("There is no available name server now thus the topic transfer process is terminated.") // terminate the transfer process diff --git a/pkg/share/share.go b/pkg/share/share.go index cada66be..4111ba69 100644 --- a/pkg/share/share.go +++ b/pkg/share/share.go @@ -18,13 +18,57 @@ // Package share defines some variables shared by different packages package share -var ( - // NameServersStr is the name server list - NameServersStr = "" +import ( + "context" + "sort" + "strings" - // IsNameServersStrUpdated is whether the name server list is updated - IsNameServersStrUpdated = false - - // IsNameServersStrInitialized is whether the name server list is initialized - IsNameServersStrInitialized = false + rocketmqv1alpha1 "github.com/apache/rocketmq-operator/pkg/apis/rocketmq/v1alpha1" + "github.com/apache/rocketmq-operator/pkg/tool" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "sigs.k8s.io/controller-runtime/pkg/client" ) + +func GetNameServersStr(r client.Reader, namespace, rocketMqName string) string { + nameserviceList := &rocketmqv1alpha1.NameServiceList{} + err := r.List(context.TODO(), nameserviceList, &client.MatchingFields{ + rocketmqv1alpha1.NameServiceRocketMqNameIndexKey: rocketMqName + "-" + namespace, + }) + if err != nil { + return "" + } + if len(nameserviceList.Items) != 1 { + return "" + } + + nameservice := nameserviceList.Items[0] + labelSelector := labels.SelectorFromSet(tool.LabelsForNameService(nameservice.Name)) + listOps := &client.ListOptions{ + Namespace: nameservice.Namespace, + LabelSelector: labelSelector, + } + podList := &corev1.PodList{} + err = r.List(context.Background(), podList, listOps) + if err != nil { + return "" + } + if len(podList.Items) == 0 { + return "" + } + + var hostIps []string + for _, pod := range podList.Items { + if pod.Status.Phase == corev1.PodRunning && !strings.EqualFold(pod.Status.PodIP, "") { + hostIps = append(hostIps, pod.Status.PodIP) + } + } + sort.Strings(hostIps) + + nameServerListStr := "" + for _, value := range hostIps { + nameServerListStr = nameServerListStr + value + ":9876;" + } + + return nameServerListStr[:len(nameServerListStr)-1] +} diff --git a/pkg/tool/resource_name.go b/pkg/tool/resource_name.go index 7862e620..193f47e4 100644 --- a/pkg/tool/resource_name.go +++ b/pkg/tool/resource_name.go @@ -26,3 +26,7 @@ func BuildHeadlessSvcResourceName(name string) string { func BuildSvcResourceName(name string) string { return fmt.Sprintf("%s-svc", name) } + +func LabelsForNameService(name string) map[string]string { + return map[string]string{"app": "name_service", "name_service_cr": name} +} From d8b14eaae328fa29df69a0b23708c60cd7fdd101 Mon Sep 17 00:00:00 2001 From: drivebyer Date: Thu, 28 Sep 2023 21:10:14 +0800 Subject: [PATCH 8/9] fix potential panic --- pkg/share/share.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/share/share.go b/pkg/share/share.go index 4111ba69..5ff86b52 100644 --- a/pkg/share/share.go +++ b/pkg/share/share.go @@ -63,6 +63,10 @@ func GetNameServersStr(r client.Reader, namespace, rocketMqName string) string { hostIps = append(hostIps, pod.Status.PodIP) } } + if len(hostIps) == 0 { + return "" + } + sort.Strings(hostIps) nameServerListStr := "" From 4fa2c0a9048f2269982424bc815025a497c5784e Mon Sep 17 00:00:00 2001 From: drivebyer Date: Thu, 28 Sep 2023 21:28:01 +0800 Subject: [PATCH 9/9] execute make manifests --- .../crds/rocketmq.apache.org_brokers.yaml | 3 +++ .../crds/rocketmq.apache.org_consoles.yaml | 3 +++ .../crds/rocketmq.apache.org_controllers.yaml | 3 +++ .../crds/rocketmq.apache.org_nameservices.yaml | 3 +++ .../rocketmq.apache.org_topictransfers.yaml | 3 +++ .../templates/role_binding.yaml | 18 ++++++++---------- .../templates/service_account.yaml | 2 +- 7 files changed, 24 insertions(+), 11 deletions(-) diff --git a/charts/rocketmq-operator/crds/rocketmq.apache.org_brokers.yaml b/charts/rocketmq-operator/crds/rocketmq.apache.org_brokers.yaml index ffc0ad1b..e318d100 100644 --- a/charts/rocketmq-operator/crds/rocketmq.apache.org_brokers.yaml +++ b/charts/rocketmq-operator/crds/rocketmq.apache.org_brokers.yaml @@ -1203,6 +1203,9 @@ spec: to an implementation-defined value. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' type: object type: object + rocketMqName: + description: RocketMqName is the name of the RocketMQ cluster. + type: string scalePodName: description: The name of pod where the metadata from type: string diff --git a/charts/rocketmq-operator/crds/rocketmq.apache.org_consoles.yaml b/charts/rocketmq-operator/crds/rocketmq.apache.org_consoles.yaml index fab41e0c..e0394f3e 100644 --- a/charts/rocketmq-operator/crds/rocketmq.apache.org_consoles.yaml +++ b/charts/rocketmq-operator/crds/rocketmq.apache.org_consoles.yaml @@ -7880,6 +7880,9 @@ spec: nameServers: description: NameServers defines the name service list e.g. 192.168.1.1:9876;192.168.1.2:9876 type: string + rocketMqName: + description: RocketMqName is the name of the RocketMQ cluster + type: string required: - consoleDeployment type: object diff --git a/charts/rocketmq-operator/crds/rocketmq.apache.org_controllers.yaml b/charts/rocketmq-operator/crds/rocketmq.apache.org_controllers.yaml index 4f98506b..60d8864b 100644 --- a/charts/rocketmq-operator/crds/rocketmq.apache.org_controllers.yaml +++ b/charts/rocketmq-operator/crds/rocketmq.apache.org_controllers.yaml @@ -1178,6 +1178,9 @@ spec: to an implementation-defined value. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' type: object type: object + rocketMqName: + description: RocketMqName is the name of the RocketMQ cluster + type: string securityContext: description: Pod Security Context properties: diff --git a/charts/rocketmq-operator/crds/rocketmq.apache.org_nameservices.yaml b/charts/rocketmq-operator/crds/rocketmq.apache.org_nameservices.yaml index 315d85e9..a8dbaf93 100644 --- a/charts/rocketmq-operator/crds/rocketmq.apache.org_nameservices.yaml +++ b/charts/rocketmq-operator/crds/rocketmq.apache.org_nameservices.yaml @@ -1082,6 +1082,9 @@ spec: to an implementation-defined value. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' type: object type: object + rocketMqName: + description: RocketMqName is the name of the RocketMQ cluster + type: string securityContext: description: Pod Security Context properties: diff --git a/charts/rocketmq-operator/crds/rocketmq.apache.org_topictransfers.yaml b/charts/rocketmq-operator/crds/rocketmq.apache.org_topictransfers.yaml index 4a1f1dd8..f6e4c62a 100644 --- a/charts/rocketmq-operator/crds/rocketmq.apache.org_topictransfers.yaml +++ b/charts/rocketmq-operator/crds/rocketmq.apache.org_topictransfers.yaml @@ -36,6 +36,9 @@ spec: spec: description: TopicTransferSpec defines the desired state of TopicTransfer properties: + rocketMqName: + description: RocketMqName is the name of the RocketMQ cluster + type: string sourceCluster: description: The cluster where the transferred topic from type: string diff --git a/charts/rocketmq-operator/templates/role_binding.yaml b/charts/rocketmq-operator/templates/role_binding.yaml index fc8ce958..f330555b 100644 --- a/charts/rocketmq-operator/templates/role_binding.yaml +++ b/charts/rocketmq-operator/templates/role_binding.yaml @@ -13,17 +13,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding +apiVersion: rbac.authorization.k8s.io/v1 metadata: - name: {{ include "rocketmq-operator.fullname" . }} - labels: - {{- include "rocketmq-operator.labels" . | nindent 4 }} + name: rocketmq-operator +subjects: +- kind: ServiceAccount + name: rocketmq-operator + namespace: default roleRef: - apiGroup: rbac.authorization.k8s.io kind: ClusterRole - name: {{ template "rocketmq-operator.fullname" . }} -subjects: - - name: {{ template "rocketmq-operator.serviceAccountName" . }} - namespace: {{ .Release.Namespace | quote }} - kind: ServiceAccount \ No newline at end of file + name: rocketmq-operator + apiGroup: rbac.authorization.k8s.io diff --git a/charts/rocketmq-operator/templates/service_account.yaml b/charts/rocketmq-operator/templates/service_account.yaml index 8f1a58ff..2dde8f9e 100644 --- a/charts/rocketmq-operator/templates/service_account.yaml +++ b/charts/rocketmq-operator/templates/service_account.yaml @@ -16,4 +16,4 @@ apiVersion: v1 kind: ServiceAccount metadata: - name: {{ template "rocketmq-operator.serviceAccountName" . }} + name: rocketmq-operator