Skip to content

Commit

Permalink
feat(main): add test for automq
Browse files Browse the repository at this point in the history
Signed-off-by: cuisongliu <[email protected]>
  • Loading branch information
cuisongliu committed Oct 19, 2024
1 parent d832db1 commit 4850268
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 13 deletions.
139 changes: 129 additions & 10 deletions e2e/automq_cluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@ package e2e

import (
"context"
"fmt"
"github.com/cuisongliu/automq-operator/internal/controller"
v2 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/labels"
"os"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"time"

infrav1beta1 "github.com/cuisongliu/automq-operator/api/v1beta1"
Expand All @@ -33,7 +38,7 @@ import (
var _ = Describe("automq_controller", func() {
Context("automq_controller cr tests", func() {
ctx := context.Background()
namespaceName := "automq-operator"
namespaceName := "automq-cr"
namespace := &v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: namespaceName,
Expand All @@ -44,22 +49,23 @@ var _ = Describe("automq_controller", func() {
automq.Name = "automq-s1"
automq.Namespace = namespaceName
automq.Spec.ClusterID = "rZdE0DjZSrqy96PXrMUZVw"

BeforeEach(func() {
It("create cr namespace", func() {
By("Creating the Namespace to perform the tests")
err := k8sClient.Create(ctx, namespace)
Expect(err).To(Not(HaveOccurred()))
By("Setting the NAMESPACE_NAME ENV VAR which stores the Operand image")
err = os.Setenv("NAMESPACE_NAME", namespaceName)
Expect(err).To(Not(HaveOccurred()))
})
It("Update Endpoint", func() {
It("create cr", func() {
By("get minio ip and port")
minioService := &v1.Service{}
err := k8sClient.Get(ctx, client.ObjectKey{Namespace: "minio", Name: "minio"}, minioService)
Expect(err).To(Not(HaveOccurred()))
ip := minioService.Spec.ClusterIP
By("creating the custom resource for the automq")
err := k8sClient.Get(ctx, client.ObjectKeyFromObject(automq), automq)
err = k8sClient.Get(ctx, client.ObjectKeyFromObject(automq), automq)
if err != nil && errors.IsNotFound(err) {
// Let's mock our custom resource at the same way that we would
// apply on the cluster the manifest under config/samples
automq.Spec.S3.Endpoint = "http://minio.minio.svc.cluster.local:9000"
automq.Spec.S3.Endpoint = fmt.Sprintf("http://%s:9000", ip)
automq.Spec.S3.Bucket = "ko3"
automq.Spec.S3.AccessKeyID = "admin"
automq.Spec.S3.SecretAccessKey = "minio123"
Expand All @@ -72,7 +78,120 @@ var _ = Describe("automq_controller", func() {
Expect(err).To(Not(HaveOccurred()))
}
})
AfterEach(func() {
It("should successfully reconcile the resource", func() {
By("Reconciling the created resource")
controllerReconciler := &controller.AutoMQReconciler{
Client: k8sClient,
Scheme: k8sClient.Scheme(),
Finalizer: "apps.cuisongliu.com/automq.finalizer",
MountTZ: true,
}
_, err := controllerReconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: client.ObjectKeyFromObject(automq),
})
Expect(err).NotTo(HaveOccurred())
})
It("get automq deployment", func() {
ctx := context.Background()
Eventually(func() error {
deployment := &v2.DeploymentList{}
labelSelector := labels.Set(map[string]string{"app.kubernetes.io/owner-by": "automq", "app.kubernetes.io/instance": automq.Name}).AsSelector()
err := k8sClient.List(ctx, deployment, &client.ListOptions{Namespace: automq.Namespace, LabelSelector: labelSelector})
if err != nil {
return err
}
if len(deployment.Items) != 4 {
return fmt.Errorf("expected 4 deploy, found %d", len(deployment.Items))
}
for i, deploy := range deployment.Items {
if deploy.Status.ReadyReplicas != 1 {
return fmt.Errorf("expected deploy %d ready replicas to be 1, got '%d'", i, deploy.Status.ReadyReplicas)
}
}
return nil
}, "60s", "1s").Should(Succeed())
})
It("check controller status", func() {
ctx := context.Background()
Eventually(func() error {
podList := &v1.PodList{}
labelSelector := labels.Set(map[string]string{"app.kubernetes.io/owner-by": "automq", "app.kubernetes.io/instance": automq.Name, "app.kubernetes.io/role": "controller"}).AsSelector()
err := k8sClient.List(ctx, podList, &client.ListOptions{Namespace: automq.Namespace, LabelSelector: labelSelector})
if err != nil {
return err
}
if len(podList.Items) != 3 {
return fmt.Errorf("expected 3 pod, found %d", len(podList.Items))
}
for i, pod := range podList.Items {
if pod.Status.Phase != v1.PodRunning {
return fmt.Errorf("expected pod %d phase to be 'Running', got '%s'", i, pod.Status.Phase)
}
}
return nil
}, "60s", "1s").Should(Succeed())
})

It("check broker status", func() {
ctx := context.Background()
Eventually(func() error {
podList := &v1.PodList{}
labelSelector := labels.Set(map[string]string{"app.kubernetes.io/owner-by": "automq", "app.kubernetes.io/instance": automq.Name, "app.kubernetes.io/role": "broker"}).AsSelector()
err := k8sClient.List(ctx, podList, &client.ListOptions{Namespace: automq.Namespace, LabelSelector: labelSelector})
if err != nil {
return err
}
if len(podList.Items) != 1 {
return fmt.Errorf("expected 1 pod, found %d", len(podList.Items))
}
for i, pod := range podList.Items {
if pod.Status.Phase != v1.PodRunning {
return fmt.Errorf("expected pod %d phase to be 'Running', got '%s'", i, pod.Status.Phase)
}
}
return nil
}, "60s", "1s").Should(Succeed())
})
It("check automq status", func() {
ctx := context.Background()
Eventually(func() error {
automq := &infrav1beta1.AutoMQ{}
err := k8sClient.Get(ctx, client.ObjectKeyFromObject(automq), automq)
if err != nil {
return err
}
if automq.Status.Phase != infrav1beta1.AutoMQReady {
return fmt.Errorf("expected automq phase to be 'Ready', got '%s'", automq.Status.Phase)
}
if automq.Status.ControllerReplicas != 3 {
return fmt.Errorf("expected automq controller replicas to be 3, got '%d'", automq.Status.ControllerReplicas)
}
if automq.Status.BrokerReplicas != 1 {
return fmt.Errorf("expected automq broker replicas to be 1, got '%d'", automq.Status.BrokerReplicas)
}
if automq.Status.ReadyPods != 4 {
return fmt.Errorf("expected automq ready pods to be 4, got '%d'", automq.Status.ReadyPods)
}
if len(automq.Status.ControllerAddresses) != 3 {
return fmt.Errorf("expected automq controller addresses to have 3 elements, got '%d'", len(automq.Status.ControllerAddresses))
}
if automq.Status.BootstrapInternalAddress == "" {
return fmt.Errorf("expected automq bootstrap internal address to be set")
}
bootstrapService := fmt.Sprintf("%s.%s.svc:%d", "automq-"+"broker-bootstrap", automq.Namespace, 9092)
if automq.Status.BootstrapInternalAddress != bootstrapService {
return fmt.Errorf("expected automq bootstrap internal address to be '%s', got '%s'", bootstrapService, automq.Status.BootstrapInternalAddress)
}
for i, address := range automq.Status.ControllerAddresses {
controllerService := fmt.Sprintf("%d@%s.%s.svc:%d", i, "automq-controller-"+fmt.Sprintf("%d", i), automq.Namespace, 9093)
if address != controllerService {
return fmt.Errorf("expected automq controller address %d to be '%s', got '%s'", i, controllerService, address)
}
}
return nil
}, "60s", "1s").Should(Succeed())
})
It("clean automq", func() {
By("removing the custom resource for the automq")
found := &infrav1beta1.AutoMQ{}
err := k8sClient.Get(ctx, client.ObjectKeyFromObject(automq), found)
Expand Down
2 changes: 2 additions & 0 deletions e2e/automq_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ var _ = BeforeSuite(func() {
go func() {
controller.APIRegistry(context.Background(), k8sClient)
}()
err = os.Setenv("NAMESPACE_NAME", "default")
Expect(err).To(Not(HaveOccurred()))
})

var _ = AfterSuite(func() {
Expand Down
1 change: 1 addition & 0 deletions internal/controller/automq_apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controller

import (
"context"

"github.com/gin-gonic/gin"
v1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down
8 changes: 7 additions & 1 deletion internal/controller/automq_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,9 @@ func (r *AutoMQReconciler) reconcile(ctx context.Context, obj client.Object) (ct
r.syncKafkaBootstrapService,
}
var ifRunning bool
for _, fn := range pipelines {
for index, fn := range pipelines {
ifRunning = fn(ctx, automq)
log.V(1).Info("update reconcile controller automq", "ifRunning", ifRunning, "index", index)
if !ifRunning {
break
}
Expand Down Expand Up @@ -220,6 +221,7 @@ func (r *AutoMQReconciler) syncStatus(ctx context.Context, automq *infrav1beta1.
}

func (r *AutoMQReconciler) s3Service(ctx context.Context, obj *infrav1beta1.AutoMQ) bool {
log := log.FromContext(ctx)
conditionType := "SyncS3ServiceReady"
sg, err := storage.NewBucket(storage.Config{
Type: "s3",
Expand All @@ -229,6 +231,7 @@ func (r *AutoMQReconciler) s3Service(ctx context.Context, obj *infrav1beta1.Auto
Endpoint: obj.Spec.S3.Endpoint,
})
if err != nil {
log.Error(err, "Failed to create S3 Bucket interface for the custom resource", "name", obj.Name, "namespace", obj.Namespace)
meta.SetStatusCondition(&obj.Status.Conditions, metav1.Condition{
Type: conditionType,
Status: metav1.ConditionFalse,
Expand All @@ -240,6 +243,7 @@ func (r *AutoMQReconciler) s3Service(ctx context.Context, obj *infrav1beta1.Auto
}
err = sg.MkBucket(ctx, obj.Spec.S3.Bucket)
if err != nil && !strings.Contains(err.Error(), "BucketAlready") {
log.Error(err, "Failed to create S3 Bucket interface for the custom resource", "name", obj.Name, "namespace", obj.Namespace)
meta.SetStatusCondition(&obj.Status.Conditions, metav1.Condition{
Type: conditionType,
Status: metav1.ConditionFalse,
Expand All @@ -264,6 +268,7 @@ func (r *AutoMQReconciler) scriptConfigmap(ctx context.Context, obj *infrav1beta
conditionType := "SyncConfigmapReady"
data, err := defaults.Asset("defaults/up.sh")
if err != nil {
log.Error(err, "Failed to create script configmap for the custom resource", "name", obj.Name, "namespace", obj.Namespace)
meta.SetStatusCondition(&obj.Status.Conditions, metav1.Condition{
Type: conditionType,
Status: metav1.ConditionFalse,
Expand Down Expand Up @@ -291,6 +296,7 @@ func (r *AutoMQReconciler) scriptConfigmap(ctx context.Context, obj *infrav1beta
log.V(1).Info("create or update configmap by AutoMQ", "OperationResult", change)
return nil
}); err != nil {
log.Error(err, "Failed to create script configmap for the custom resource", "name", obj.Name, "namespace", obj.Namespace)
meta.SetStatusCondition(&obj.Status.Conditions, metav1.Condition{
Type: conditionType,
Status: metav1.ConditionFalse,
Expand Down
5 changes: 4 additions & 1 deletion internal/controller/automq_controller_b.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (r *AutoMQReconciler) syncBrokerScale(ctx context.Context, obj *infrav1beta

func (r *AutoMQReconciler) syncBrokers(ctx context.Context, obj *infrav1beta1.AutoMQ) bool {
conditionType := "SyncBrokerReady"

log := log.FromContext(ctx)
// 1. sync pvc
// 2. sync deploy
// 3. sync svc
Expand All @@ -115,6 +115,7 @@ func (r *AutoMQReconciler) syncBrokers(ctx context.Context, obj *infrav1beta1.Au
Reason: "BrokerPVCReconciling",
Message: fmt.Sprintf("Failed to create pvc for the custom resource (%s): (%s)", obj.Name, err),
})
log.Error(err, "Failed to create pvc for the custom resource", "name", obj.Name, "role", brokerRole)
return true
}
if err := r.syncBrokerService(ctx, obj, int32(i)); err != nil {
Expand All @@ -125,6 +126,7 @@ func (r *AutoMQReconciler) syncBrokers(ctx context.Context, obj *infrav1beta1.Au
Reason: "BrokerServiceReconciling",
Message: fmt.Sprintf("Failed to create service for the custom resource (%s): (%s)", obj.Name, err),
})
log.Error(err, "Failed to create service for the custom resource", "name", obj.Name, "role", brokerRole)
return true
}
if err := r.syncBrokerDeploy(ctx, obj, int32(i)); err != nil {
Expand All @@ -135,6 +137,7 @@ func (r *AutoMQReconciler) syncBrokers(ctx context.Context, obj *infrav1beta1.Au
Reason: "BrokerSTSReconciling",
Message: fmt.Sprintf("Failed to create deploy for the custom resource (%s): (%s)", obj.Name, err),
})
log.Error(err, "Failed to create deploy for the custom resource", "name", obj.Name, "role", brokerRole)
return true
}
}
Expand Down
6 changes: 5 additions & 1 deletion internal/controller/automq_controller_c.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package controller
import (
"context"
"fmt"
"sigs.k8s.io/controller-runtime/pkg/log"
"strings"

"github.com/aws/aws-sdk-go-v2/aws"
Expand Down Expand Up @@ -90,7 +91,7 @@ func (r *AutoMQReconciler) syncControllersScale(ctx context.Context, obj *infrav

func (r *AutoMQReconciler) syncControllers(ctx context.Context, obj *infrav1beta1.AutoMQ) bool {
conditionType := "SyncControllerReady"

log := log.FromContext(ctx)
// 1. sync pvc
// 2. sync deploy
// 3. sync svc
Expand All @@ -105,6 +106,7 @@ func (r *AutoMQReconciler) syncControllers(ctx context.Context, obj *infrav1beta
Reason: "ControllerPVCReconciling",
Message: fmt.Sprintf("Failed to create pvc for the custom resource (%s): (%s)", obj.Name, err),
})
log.Error(err, "Failed to create pvc for the custom resource (%s)", obj.Name, "role", controllerRole)
return true
}
if err := r.syncControllerDeploy(ctx, obj, int32(i)); err != nil {
Expand All @@ -115,6 +117,7 @@ func (r *AutoMQReconciler) syncControllers(ctx context.Context, obj *infrav1beta
Reason: "ControllerSTSReconciling",
Message: fmt.Sprintf("Failed to create deploy for the custom resource (%s): (%s)", obj.Name, err),
})
log.Error(err, "Failed to create deploy for the custom resource (%s)", obj.Name, "role", controllerRole)
return true
}
if err := r.syncControllerService(ctx, obj, int32(i)); err != nil {
Expand All @@ -125,6 +128,7 @@ func (r *AutoMQReconciler) syncControllers(ctx context.Context, obj *infrav1beta
Reason: "ControllerServiceReconciling",
Message: fmt.Sprintf("Failed to create service for the custom resource (%s): (%s)", obj.Name, err),
})
log.Error(err, "Failed to create service for the custom resource (%s)", obj.Name, "role", controllerRole)
return true
}
}
Expand Down

0 comments on commit 4850268

Please sign in to comment.