Skip to content

Commit

Permalink
Wait for Deployment rollout to succeed
Browse files Browse the repository at this point in the history
  • Loading branch information
Starefossen committed Dec 30, 2023
1 parent dc2630d commit db4c23b
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 25 deletions.
82 changes: 57 additions & 25 deletions controllers/unleash_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/nais/unleasherator/pkg/federation"
"github.com/nais/unleasherator/pkg/resources"
"github.com/nais/unleasherator/pkg/unleashclient"
"github.com/nais/unleasherator/pkg/utils"
)

const (
Expand All @@ -40,6 +42,9 @@ const (
)

var (
deploymentTimeout = 5 * time.Minute
requeueAfter = 1 * time.Hour

// unleashStatus is a Prometheus metric which will be used to expose the status of the Unleash instances
unleashStatus = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Expand Down Expand Up @@ -205,38 +210,38 @@ func (r *UnleashReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct

res, err = r.reconcileSecrets(ctx, unleash)
if err != nil {
if err := r.updateStatusReconcileFailed(ctx, unleash, err, "Failed to reconcile Secrets"); err != nil {
return ctrl.Result{}, err
if statsusErr := r.updateStatusReconcileFailed(ctx, unleash, err, "Failed to reconcile Secrets"); statsusErr != nil {
return ctrl.Result{}, statsusErr
}
return ctrl.Result{}, err
} else if res.Requeue {
return res, nil
}

res, err = r.reconcileDeployment(ctx, unleash)
res, err = r.reconcileNetworkPolicy(ctx, unleash)
if err != nil {
if err := r.updateStatusReconcileFailed(ctx, unleash, err, "Failed to reconcile Deployment"); err != nil {
return ctrl.Result{}, err
if statsusErr := r.updateStatusReconcileFailed(ctx, unleash, err, "Failed to reconcile NetworkPolicy"); statsusErr != nil {
return ctrl.Result{}, statsusErr
}
return ctrl.Result{}, err
} else if res.Requeue {
return res, nil
}

res, err = r.reconcileService(ctx, unleash)
res, err = r.reconcileDeployment(ctx, unleash)
if err != nil {
if err := r.updateStatusReconcileFailed(ctx, unleash, err, "Failed to reconcile Service"); err != nil {
return ctrl.Result{}, err
if statsusErr := r.updateStatusReconcileFailed(ctx, unleash, err, "Failed to reconcile Deployment"); statsusErr != nil {
return ctrl.Result{}, statsusErr
}
return ctrl.Result{}, err
} else if res.Requeue {
return res, nil
}

res, err = r.reconcileNetworkPolicy(ctx, unleash)
res, err = r.reconcileService(ctx, unleash)
if err != nil {
if err := r.updateStatusReconcileFailed(ctx, unleash, err, "Failed to reconcile NetworkPolicy"); err != nil {
return ctrl.Result{}, err
if statsusErr := r.updateStatusReconcileFailed(ctx, unleash, err, "Failed to reconcile Service"); statsusErr != nil {
return ctrl.Result{}, statsusErr
}
return ctrl.Result{}, err
} else if res.Requeue {
Expand All @@ -245,8 +250,8 @@ func (r *UnleashReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct

res, err = r.reconcileIngresses(ctx, unleash)
if err != nil {
if err := r.updateStatusReconcileFailed(ctx, unleash, err, "Failed to reconcile Ingresses"); err != nil {
return ctrl.Result{}, err
if statsusErr := r.updateStatusReconcileFailed(ctx, unleash, err, "Failed to reconcile Ingresses"); statsusErr != nil {
return ctrl.Result{}, statsusErr
}
return ctrl.Result{}, err
} else if res.Requeue {
Expand All @@ -255,8 +260,8 @@ func (r *UnleashReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct

res, err = r.reconcileServiceMonitor(ctx, unleash)
if err != nil {
if err := r.updateStatusReconcileFailed(ctx, unleash, err, "Failed to reconcile ServiceMonitor"); err != nil {
return ctrl.Result{}, err
if statsusErr := r.updateStatusReconcileFailed(ctx, unleash, err, "Failed to reconcile ServiceMonitor"); statsusErr != nil {
return ctrl.Result{}, statsusErr
}
return ctrl.Result{}, err
} else if res.Requeue {
Expand All @@ -270,6 +275,19 @@ func (r *UnleashReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
return ctrl.Result{}, err
}

// Wait for Deployment rollout to finish before testing connection This is to
// avoid testing connection to the previous instance if the Deployment is not
// ready yet. Delay requeue to avoid tying up the reconciler since waiting is
// done in the same reconcile loop.
log.Info("Waiting for Deployment rollout to finish")
err = r.waitForDeployment(ctx, deploymentTimeout, req.NamespacedName)
if err != nil {
if statusErr := r.updateStatusReconcileFailed(ctx, unleash, err, "Deployment rollout timed out"); statusErr != nil {
return ctrl.Result{RequeueAfter: deploymentTimeout}, statusErr
}
return ctrl.Result{RequeueAfter: deploymentTimeout}, err
}

// Set the reconcile status of the Unleash instance to available
log.Info("Successfully reconciled Unleash resources")
if err = r.updateStatusReconcileSuccess(ctx, unleash); err != nil {
Expand All @@ -279,28 +297,27 @@ func (r *UnleashReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
// Test connection to Unleash instance
stats, err := r.testConnection(unleash, ctx, log)
if err != nil {
if err := r.updateStatusConnectionFailed(ctx, unleash, err, "Failed to connect to Unleash instance"); err != nil {
return ctrl.Result{}, err
if statusErr := r.updateStatusConnectionFailed(ctx, unleash, err, "Failed to connect to Unleash instance"); statusErr != nil {
return ctrl.Result{}, statusErr
}

return ctrl.Result{}, err
}

// Set the connection status of the Unleash instance to available
log.Info("Successfully connected to Unleash instance", "version", stats.VersionOSS)
err = r.updateStatusConnectionSuccess(ctx, unleash, stats)
if err != nil {
return ctrl.Result{}, err
statusErr := r.updateStatusConnectionSuccess(ctx, unleash, stats)
if statusErr != nil {
return ctrl.Result{}, statusErr
}

// Publish the Unleash instance to federation if enabled
err = r.publish(ctx, unleash)
if err != nil {
if err = r.publish(ctx, unleash); err != nil {
return ctrl.Result{}, err
}

log.Info("Reconciliation of Unleash finished")
return ctrl.Result{RequeueAfter: 1 * time.Hour}, nil
return ctrl.Result{RequeueAfter: requeueAfter}, nil
}

// publish the Unleash instance to pubsub if federation is enabled.
Expand Down Expand Up @@ -530,8 +547,6 @@ func (r *UnleashReconciler) reconcileIngress(ctx context.Context, unleash *unlea
return ctrl.Result{}, nil
}

// update prometheus metrics

return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -677,6 +692,23 @@ func (r *UnleashReconciler) reconcileService(ctx context.Context, unleash *unlea
return ctrl.Result{}, nil
}

// waitForDeployment will wait for the deployment to be available
func (r *UnleashReconciler) waitForDeployment(ctx context.Context, timeout time.Duration, key types.NamespacedName) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

err := wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) {
deployment := &appsv1.Deployment{}
if err := r.Client.Get(ctx, key, deployment); err != nil {
return false, err
}

return utils.DeploymentIsReady(deployment), nil
})

return err
}

// testConnection will test the connection to the Unleash instance
func (r *UnleashReconciler) testConnection(unleash resources.UnleashInstance, ctx context.Context, log logr.Logger) (*unleashclient.InstanceAdminStatsResult, error) {
client, err := unleash.ApiClient(ctx, r.Client, r.OperatorNamespace)
Expand Down
88 changes: 88 additions & 0 deletions controllers/unleash_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,47 @@ func getUnleash(k8sClient client.Client, ctx context.Context, unleash *unleashv1
return unsetConditionLastTransitionTime(unleash.Status.Conditions), nil
}

func getDeployment(k8sClient client.Client, ctx context.Context, namespacedName client.ObjectKey, deployment *appsv1.Deployment) error {
return k8sClient.Get(ctx, namespacedName, deployment)
}

func setDeploymentStatusFailed(deployment *appsv1.Deployment) {
deployment.Status.Conditions = []appsv1.DeploymentCondition{
{
Type: appsv1.DeploymentProgressing,
Status: corev1.ConditionFalse,
Reason: "ProgressDeadlineExceeded",
Message: `Progress deadline exceeded.`,
LastUpdateTime: metav1.Time{
Time: time.Now(),
},
LastTransitionTime: metav1.Time{
Time: time.Now(),
},
},
}
}

func setDeploymentStatusAvailable(deployment *appsv1.Deployment) {
deployment.Status.Conditions = []appsv1.DeploymentCondition{
{
Type: appsv1.DeploymentProgressing,
Status: corev1.ConditionTrue,
Reason: "NewReplicaSetAvailable",
Message: `ReplicaSet "fake-abc123" has successfully progressed.`,
LastUpdateTime: metav1.Time{
Time: time.Now(),
},
LastTransitionTime: metav1.Time{
Time: time.Now(),
},
},
}
}

var _ = Describe("Unleash controller", func() {
deploymentTimeout = time.Second * 1

const (
UnleashNamespace = "default"
UnleashVersion = "v5.1.2"
Expand Down Expand Up @@ -76,6 +116,40 @@ var _ = Describe("Unleash controller", func() {
Expect(k8sClient.Delete(ctx, createdUnleash)).Should(Succeed())
})

It("Should fail if Deployment rollout is not complete", func() {
ctx := context.Background()

By("By creating a new Unleash")
unleash := unleashResource("test-unleash-rollout-fail", UnleashNamespace, unleashv1.UnleashSpec{

Database: unleashv1.UnleashDatabaseConfig{
URL: "postgres://unleash:unleash@unleash-postgres:5432/unleash?ssl=false",
},
})
Expect(k8sClient.Create(ctx, unleash)).Should(Succeed())

By("By faking Deployment status as failed")
createdDeployment := &appsv1.Deployment{}
Eventually(getDeployment, timeout, interval).WithArguments(k8sClient, ctx, unleash.NamespacedName(), createdDeployment).Should(Succeed())
setDeploymentStatusFailed(createdDeployment)
Expect(k8sClient.Status().Update(ctx, createdDeployment)).Should(Succeed())

By("By checking that Unleash is failed")
createdUnleash := &unleashv1.Unleash{ObjectMeta: unleash.ObjectMeta}
Eventually(getUnleash, timeout, interval).WithArguments(k8sClient, ctx, createdUnleash).Should(ContainElement(metav1.Condition{
Type: unleashv1.UnleashStatusConditionTypeReconciled,
Status: metav1.ConditionFalse,
Reason: "Reconciling",
Message: "Deployment rollout timed out",
}))
Expect(createdUnleash.IsReady()).To(BeFalse())
Expect(createdUnleash.Status.Reconciled).To(BeFalse())
Expect(createdUnleash.Status.Connected).To(BeFalse())

By("By cleaning up the Unleash")
Expect(k8sClient.Delete(ctx, createdUnleash)).Should(Succeed())
})

PIt("Should fail when it cannot connect to Unleash")

It("Should succeed when it can connect to Unleash", func() {
Expand All @@ -89,6 +163,13 @@ var _ = Describe("Unleash controller", func() {
})
Expect(k8sClient.Create(ctx, unleash)).Should(Succeed())

By("By faking Deployment status as available")
createdDeployment := &appsv1.Deployment{}
Eventually(getDeployment, timeout, interval).WithArguments(k8sClient, ctx, unleash.NamespacedName(), createdDeployment).Should(Succeed())
setDeploymentStatusAvailable(createdDeployment)
Expect(k8sClient.Status().Update(ctx, createdDeployment)).Should(Succeed())

By("By checking that Unleash is connected")
createdUnleash := &unleashv1.Unleash{ObjectMeta: unleash.ObjectMeta}
Eventually(getUnleash, timeout, interval).WithArguments(k8sClient, ctx, createdUnleash).Should(ContainElement(metav1.Condition{
Type: unleashv1.UnleashStatusConditionTypeConnected,
Expand Down Expand Up @@ -157,6 +238,13 @@ var _ = Describe("Unleash controller", func() {
})
Expect(k8sClient.Create(ctx, unleash)).Should(Succeed())

By("By faking Deployment status as available")
createdDeployment := &appsv1.Deployment{}
Eventually(getDeployment, timeout, interval).WithArguments(k8sClient, ctx, unleash.NamespacedName(), createdDeployment).Should(Succeed())
setDeploymentStatusAvailable(createdDeployment)
Expect(k8sClient.Status().Update(ctx, createdDeployment)).Should(Succeed())

By("By checking that Unleash is connected")
createdUnleash := &unleashv1.Unleash{ObjectMeta: unleash.ObjectMeta}
Eventually(getUnleash, timeout, interval).WithArguments(k8sClient, ctx, createdUnleash).Should(ContainElement(metav1.Condition{
Type: unleashv1.UnleashStatusConditionTypeConnected,
Expand Down
11 changes: 11 additions & 0 deletions pkg/utils/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"sync"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand Down Expand Up @@ -33,6 +34,16 @@ func SecretEnvVar(name, secretName, secretKey string) corev1.EnvVar {
}
}

// DeploymentIsReady returns true if the rollout of the given deployment has completed successfully.
func DeploymentIsReady(deployment *appsv1.Deployment) bool {
for _, condition := range deployment.Status.Conditions {
if condition.Type == appsv1.DeploymentProgressing && condition.Status == corev1.ConditionTrue && condition.Reason == "NewReplicaSetAvailable" {
return true
}
}
return false
}

// UpsertObject upserts the given object in Kubernetes. If the object already exists, it is updated.
// If the object does not exist, it is created. The object is identified by its key, which is extracted
// from the object itself. The function returns an error if the upsert operation fails.
Expand Down
Loading

0 comments on commit db4c23b

Please sign in to comment.