From 5457202125f2e96cbaa93a85cecfabfc48116537 Mon Sep 17 00:00:00 2001 From: SimonCqk Date: Mon, 7 Aug 2023 21:05:51 +0800 Subject: [PATCH] feat: re-setup master when master role restarted or recreated and recover dataset Signed-off-by: SimonCqk --- pkg/common/constants.go | 16 +- pkg/ctrl/master.go | 92 ++++++++- pkg/ctrl/master_test.go | 297 +++++++++++++++++++++++++++- pkg/ddc/base/setup.go | 1 + pkg/ddc/jindo/health_check.go | 16 +- pkg/ddc/jindocache/health_check.go | 12 +- pkg/ddc/jindofsx/dataset.go | 4 +- pkg/ddc/jindofsx/health_check.go | 12 +- pkg/ddc/jindofsx/ufs_internal.go | 2 +- pkg/utils/dataset.go | 14 +- pkg/utils/patch/patch_utils.go | 155 +++++++++++++++ pkg/utils/patch/patch_utils_test.go | 124 ++++++++++++ 12 files changed, 713 insertions(+), 32 deletions(-) create mode 100644 pkg/utils/patch/patch_utils.go create mode 100644 pkg/utils/patch/patch_utils_test.go diff --git a/pkg/common/constants.go b/pkg/common/constants.go index 7d937002a07..59c74e2d642 100644 --- a/pkg/common/constants.go +++ b/pkg/common/constants.go @@ -181,12 +181,12 @@ const ( const ( inject = ".fluid.io/inject" injectSidecar = ".sidecar" + inject - InjectServerless = "serverless" + inject // serverless.fluid.io/inject - InjectUnprivilegedFuseSidecar = "unprivileged" + injectSidecar // unprivileged.sidecar.fluid.io/inject - InjectCacheDir = "cachedir" + injectSidecar // cachedir.sidecar.fluid.io/inject - InjectWorkerSidecar = "worker" + injectSidecar // worker.sidecar.fluid.io/inject - InjectSidecarDone = "done" + injectSidecar // done.sidecar.fluid.io/inject - InjectAppPostStart = "app.poststart" + inject // app.poststart.fluid.io/inject + InjectServerless = "serverless" + inject // serverless.fluid.io/inject + InjectUnprivilegedFuseSidecar = "unprivileged" + injectSidecar // unprivileged.sidecar.fluid.io/inject + InjectCacheDir = "cachedir" + injectSidecar // cachedir.sidecar.fluid.io/inject + InjectWorkerSidecar = "worker" + injectSidecar // worker.sidecar.fluid.io/inject + InjectSidecarDone = "done" + injectSidecar // done.sidecar.fluid.io/inject + InjectAppPostStart = "app.poststart" + inject // app.poststart.fluid.io/inject InjectSidecarPostStart = "fuse.sidecar.poststart" + inject // fuse.sidecar.poststart.fluid.io/inject injectServerful = ".serverful" + inject @@ -200,4 +200,8 @@ const ( EnvServerlessPlatformVal = "VALUE_SERVERLESS_PLATFORM" EnvDisableApplicationController = "KEY_DISABLE_APP_CONTROLLER" EnvImagePullSecretsKey = "IMAGE_PULL_SECRETS" + + // AnnotationLatestMasterStartedTime annotates latest master containers start-at timestamp + // to identify master restarted or recreated by accidentally. + AnnotationLatestMasterStartedTime = "fluid.io/master-latest-started-at" ) diff --git a/pkg/ctrl/master.go b/pkg/ctrl/master.go index 1f2089af722..164cd136fa6 100644 --- a/pkg/ctrl/master.go +++ b/pkg/ctrl/master.go @@ -20,18 +20,21 @@ import ( "context" "fmt" "reflect" + "time" - datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" - "github.com/fluid-cloudnative/fluid/pkg/ddc/base" - "github.com/fluid-cloudnative/fluid/pkg/utils" - "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - "k8s.io/client-go/tools/record" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/ddc/base" + "github.com/fluid-cloudnative/fluid/pkg/utils" + "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" + "github.com/fluid-cloudnative/fluid/pkg/utils/patch" ) // CheckMasterHealthy checks the sts healthy with role @@ -53,6 +56,26 @@ func (e *Helper) CheckMasterHealthy(recorder record.EventRecorder, runtime base. } if healthy { + realHealthy, readyTime, err := e.recheckMasterHealthyByEachContainerStartedTime(sts) + if err != nil { + e.log.Error(err, "failed to recheck master healthy by each container started time") + return err + } + + if !readyTime.IsZero() && sts.Annotations[common.AnnotationLatestMasterStartedTime] != readyTime.Format(time.RFC3339) { + toPatch := patch.NewStrategicPatch().InsertAnnotation(common.AnnotationLatestMasterStartedTime, readyTime.Format(time.RFC3339)) + if err = e.client.Patch(context.Background(), sts, toPatch); err != nil { + return err + } + } + + if !realHealthy { + return fmt.Errorf("the master %s in %s is not ready. Master pod must have restarted since last ready time", + sts.Name, + sts.Namespace, + ) + } + cond := utils.NewRuntimeCondition(datav1alpha1.RuntimeMasterReady, "The master is ready.", "The master is ready.", corev1.ConditionTrue) _, oldCond := utils.GetRuntimeCondition(statusToUpdate.Conditions, cond.Type) @@ -116,3 +139,60 @@ func (e *Helper) CheckMasterHealthy(recorder record.EventRecorder, runtime base. return } + +func (e *Helper) StopBindDataSetInHealthCheck() bool { + ds, err := utils.GetDataset(e.client, e.runtimeInfo.GetName(), e.runtimeInfo.GetNamespace()) + if err != nil { + return false + } + // When current dataset is Failed, we expect to re-execute `Setup` in + // runtime controller to recover the corrupted dataset, and runtime controller + // will update dataset as Bound after successfully setup, that is, health check + // does not do the bind operation to dataset. + return ds.Status.Phase == datav1alpha1.FailedDatasetPhase +} + +func (e *Helper) recheckMasterHealthyByEachContainerStartedTime(masterSts *appsv1.StatefulSet) (ready bool, readyTime metav1.Time, err error) { + lastLatest := time.Time{} + + if len(masterSts.Annotations[common.AnnotationLatestMasterStartedTime]) > 0 { + lastLatest, err = time.Parse(time.RFC3339, masterSts.Annotations[common.AnnotationLatestMasterStartedTime]) + if err != nil { + e.log.Error(err, "failed to parse last latest master started time from annotation", + "value", masterSts.Annotations[common.AnnotationLatestMasterStartedTime]) + return true, metav1.Time{}, err + } + } + + selector, err := metav1.LabelSelectorAsSelector(masterSts.Spec.Selector) + if err != nil { + return false, metav1.Time{}, fmt.Errorf("error converting StatefulSet %s in namespace %s selector: %v", masterSts.Name, masterSts.Namespace, err) + } + pods, err := kubeclient.GetPodsForStatefulSet(e.client, masterSts, selector) + if err != nil { + return false, metav1.Time{}, err + } + if len(pods) < int(*masterSts.Spec.Replicas) { + return false, metav1.Time{}, nil + } + + latest := metav1.Time{} + for pi := range pods { + curLatest := findLatestContainersStartedTime(&pods[pi]) + if curLatest.After(latest.Time) { + latest = curLatest + } + } + return !lastLatest.IsZero() && lastLatest.Equal(latest.Time), latest, nil +} + +func findLatestContainersStartedTime(pod *corev1.Pod) metav1.Time { + latest := metav1.Time{} + for csi := range pod.Status.ContainerStatuses { + cs := &pod.Status.ContainerStatuses[csi] + if cs.State.Running != nil && cs.State.Running.StartedAt.After(latest.Time) { + latest = cs.State.Running.StartedAt + } + } + return latest +} diff --git a/pkg/ctrl/master_test.go b/pkg/ctrl/master_test.go index b51c1fd9ea8..d2d144eb368 100644 --- a/pkg/ctrl/master_test.go +++ b/pkg/ctrl/master_test.go @@ -18,18 +18,24 @@ package ctrl import ( "context" + "reflect" "testing" + "time" - datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" - "github.com/fluid-cloudnative/fluid/pkg/ddc/base" - "github.com/fluid-cloudnative/fluid/pkg/utils/fake" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/record" utilpointer "k8s.io/utils/pointer" + fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" + + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/ddc/base" + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" ) func TestCheckMasterHealthy(t *testing.T) { @@ -286,3 +292,288 @@ func TestCheckMasterHealthy(t *testing.T) { } } + +func Test_recheckMasterHealthyByEachContainerStartedTime(t *testing.T) { + startT1, _ := time.Parse(time.RFC3339, "2023-08-07T20:17:08+08:00") + startT2, _ := time.Parse(time.RFC3339, "2023-08-07T20:18:08+08:00") + + testCases := []struct { + name string + sts *appsv1.StatefulSet + pods []*corev1.Pod + expectedSts *appsv1.StatefulSet + ready bool + }{ + { + name: "check health at first shot", + sts: &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ds-test-master", + Namespace: "default", + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: utilpointer.Int32(1), + Selector: &metav1.LabelSelector{MatchLabels: map[string]string{ + "foo": "bar", + }}, + }, + }, + pods: []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "ds-test-master-0", + Namespace: "default", + Labels: map[string]string{"foo": "bar"}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "main", + Image: "busybox", + }, + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + ContainerStatuses: []corev1.ContainerStatus{ + { + Name: "main", + State: corev1.ContainerState{Running: &corev1.ContainerStateRunning{ + StartedAt: metav1.NewTime(startT1), + }}, + }, + }, + }, + }, + }, + expectedSts: &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ds-test-master", + Namespace: "default", + Annotations: map[string]string{ + common.AnnotationLatestMasterStartedTime: startT1.Format(time.RFC3339), + }, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: utilpointer.Int32(1), + Selector: &metav1.LabelSelector{MatchLabels: map[string]string{ + "foo": "bar", + }}, + }, + }, + ready: false, + }, + { + name: "check health when master recreated", + sts: &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ds-test-master", + Namespace: "default", + Annotations: map[string]string{ + common.AnnotationLatestMasterStartedTime: startT1.Format(time.RFC3339), + }, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: utilpointer.Int32(1), + Selector: &metav1.LabelSelector{MatchLabels: map[string]string{ + "foo": "bar", + }}, + }, + }, + pods: []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "ds-test-master-0", + Namespace: "default", + Labels: map[string]string{"foo": "bar"}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "main", + Image: "busybox", + }, + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodPending, + ContainerStatuses: []corev1.ContainerStatus{ + { + Name: "main", + State: corev1.ContainerState{}, + }, + }, + }, + }, + }, + expectedSts: &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ds-test-master", + Namespace: "default", + Annotations: map[string]string{ + common.AnnotationLatestMasterStartedTime: startT1.Format(time.RFC3339), + }, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: utilpointer.Int32(1), + Selector: &metav1.LabelSelector{MatchLabels: map[string]string{ + "foo": "bar", + }}, + }, + }, + ready: false, + }, + { + name: "check health when master just started", + sts: &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ds-test-master", + Namespace: "default", + Annotations: map[string]string{ + common.AnnotationLatestMasterStartedTime: startT1.Format(time.RFC3339), + }, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: utilpointer.Int32(1), + Selector: &metav1.LabelSelector{MatchLabels: map[string]string{ + "foo": "bar", + }}, + }, + }, + pods: []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "ds-test-master-0", + Namespace: "default", + Labels: map[string]string{"foo": "bar"}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "main", + Image: "busybox", + }, + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + ContainerStatuses: []corev1.ContainerStatus{ + { + Name: "main", + State: corev1.ContainerState{Running: &corev1.ContainerStateRunning{ + StartedAt: metav1.NewTime(startT2), + }}, + }, + }, + }, + }, + }, + expectedSts: &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ds-test-master", + Namespace: "default", + Annotations: map[string]string{ + common.AnnotationLatestMasterStartedTime: startT2.Format(time.RFC3339), + }, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: utilpointer.Int32(1), + Selector: &metav1.LabelSelector{MatchLabels: map[string]string{ + "foo": "bar", + }}, + }, + }, + ready: true, + }, + { + name: "recheck health when master has started before", + sts: &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ds-test-master", + Namespace: "default", + Annotations: map[string]string{ + common.AnnotationLatestMasterStartedTime: startT2.Format(time.RFC3339), + }, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: utilpointer.Int32(1), + Selector: &metav1.LabelSelector{MatchLabels: map[string]string{ + "foo": "bar", + }}, + }, + }, + pods: []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "ds-test-master-0", + Namespace: "default", + Labels: map[string]string{"foo": "bar"}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "main", + Image: "busybox", + }, + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + ContainerStatuses: []corev1.ContainerStatus{ + { + Name: "main", + State: corev1.ContainerState{Running: &corev1.ContainerStateRunning{ + StartedAt: metav1.NewTime(startT2), + }}, + }, + }, + }, + }, + }, + expectedSts: &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ds-test-master", + Namespace: "default", + Annotations: map[string]string{ + common.AnnotationLatestMasterStartedTime: startT2.Format(time.RFC3339), + }, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: utilpointer.Int32(1), + Selector: &metav1.LabelSelector{MatchLabels: map[string]string{ + "foo": "bar", + }}, + }, + }, + ready: true, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + fb := fakeclient.NewClientBuilder().WithScheme(scheme.Scheme) + if testCase.sts != nil { + fb.WithObjects(testCase.sts) + } + if len(testCase.pods) > 0 { + for pi := range testCase.pods { + fb.WithObjects(testCase.pods[pi]) + } + } + + h := Helper{client: fb.Build()} + ready, _, err := h.recheckMasterHealthyByEachContainerStartedTime(testCase.sts) + if err != nil { + t.Error(err) + } + if ready != testCase.ready { + t.Errorf("unexpected ready state, expected: %v, got: %v", testCase.ready, ready) + } + latestSts := appsv1.StatefulSet{} + _ = h.client.Get(context.Background(), types.NamespacedName{Name: testCase.sts.Name, Namespace: testCase.sts.Namespace}, &latestSts) + if !reflect.DeepEqual(latestSts.Annotations, testCase.expectedSts.Annotations) { + t.Errorf("unexpected updated sts, expected: %+v, got: %+v", testCase.expectedSts.Annotations, latestSts.Annotations) + } + }) + } +} diff --git a/pkg/ddc/base/setup.go b/pkg/ddc/base/setup.go index d60bec64b3f..26c4ba51cba 100644 --- a/pkg/ddc/base/setup.go +++ b/pkg/ddc/base/setup.go @@ -69,6 +69,7 @@ func (b *TemplateEngine) Setup(ctx cruntime.ReconcileRequestContext) (ready bool } if shouldCheckUFS { + b.Log.V(3).Info("start to prepare ufs", "runtime", ctx.Runtime.GetName()) err = b.Implement.PrepareUFS() if err != nil { b.Log.Error(err, "Failed to prepare ufs.") diff --git a/pkg/ddc/jindo/health_check.go b/pkg/ddc/jindo/health_check.go index 804ecf51059..4460ae53941 100644 --- a/pkg/ddc/jindo/health_check.go +++ b/pkg/ddc/jindo/health_check.go @@ -17,17 +17,20 @@ limitations under the License. package jindo import ( + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" + data "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/ctrl" fluiderrs "github.com/fluid-cloudnative/fluid/pkg/errors" "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/util/retry" ) func (e *JindoEngine) CheckRuntimeHealthy() (err error) { + e.Log.V(1).Info("CheckRuntimeHealthy", "runtime name", e.runtimeInfo.GetName()) + // 1. Check the healthy of the master err = e.checkMasterHealthy() if err != nil { @@ -61,12 +64,19 @@ func (e *JindoEngine) CheckRuntimeHealthy() (err error) { return } + // In some conditions we expect not to update dataset as Bound in + // health check and let other components to handle it. + if e.StopBindDataSetInHealthCheck() { + return nil + } + // 4. Update the dataset as Bounded return e.UpdateDatasetStatus(data.BoundDatasetPhase) } // checkMasterHealthy checks the master healthy func (e *JindoEngine) checkMasterHealthy() (err error) { + e.Log.V(3).Info("checkMasterHealthy", "master name", e.getMasterName()) master, err := kubeclient.GetStatefulSet(e.Client, e.getMasterName(), e.namespace) if err != nil { return err diff --git a/pkg/ddc/jindocache/health_check.go b/pkg/ddc/jindocache/health_check.go index cc5192d1336..7c6de60f4ee 100644 --- a/pkg/ddc/jindocache/health_check.go +++ b/pkg/ddc/jindocache/health_check.go @@ -17,14 +17,15 @@ limitations under the License. package jindocache import ( + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" + data "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/ctrl" fluiderrs "github.com/fluid-cloudnative/fluid/pkg/errors" "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/util/retry" ) func (e *JindoCacheEngine) CheckRuntimeHealthy() (err error) { @@ -68,6 +69,11 @@ func (e *JindoCacheEngine) CheckRuntimeHealthy() (err error) { } } + // In some conditions we expect not to update dataset as Bound in + // health check and let other components to handle it. + if e.StopBindDataSetInHealthCheck() { + return nil + } // 4. Update the dataset as Bounded return e.UpdateDatasetStatus(data.BoundDatasetPhase) } diff --git a/pkg/ddc/jindofsx/dataset.go b/pkg/ddc/jindofsx/dataset.go index eb7c97c8d8e..098a8790f41 100644 --- a/pkg/ddc/jindofsx/dataset.go +++ b/pkg/ddc/jindofsx/dataset.go @@ -63,11 +63,11 @@ func (e *JindoFSxEngine) UpdateDatasetStatus(phase datav1alpha1.DatasetPhase) (e "The ddc runtime is ready.", corev1.ConditionTrue) case datav1alpha1.FailedDatasetPhase: - cond = utils.NewDatasetCondition(datav1alpha1.DatasetReady, datav1alpha1.DatasetReadyReason, + cond = utils.NewDatasetCondition(datav1alpha1.DatasetNotReady, datav1alpha1.DatasetReadyReason, "The ddc runtime is not ready.", corev1.ConditionFalse) default: - cond = utils.NewDatasetCondition(datav1alpha1.DatasetReady, datav1alpha1.DatasetReadyReason, + cond = utils.NewDatasetCondition(datav1alpha1.DatasetUpdatingReason, datav1alpha1.DatasetReadyReason, "The ddc runtime is unknown.", corev1.ConditionFalse) } diff --git a/pkg/ddc/jindofsx/health_check.go b/pkg/ddc/jindofsx/health_check.go index bb91a3121cb..5302512fa07 100644 --- a/pkg/ddc/jindofsx/health_check.go +++ b/pkg/ddc/jindofsx/health_check.go @@ -17,14 +17,15 @@ limitations under the License. package jindofsx import ( + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" + data "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/ctrl" fluiderrs "github.com/fluid-cloudnative/fluid/pkg/errors" "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/util/retry" ) func (e *JindoFSxEngine) CheckRuntimeHealthy() (err error) { @@ -67,6 +68,11 @@ func (e *JindoFSxEngine) CheckRuntimeHealthy() (err error) { return } } + // In some conditions we expect not to update dataset as Bound in + // health check and let other components to handle it. + if e.StopBindDataSetInHealthCheck() { + return nil + } // 4. Update the dataset as Bounded return e.UpdateDatasetStatus(data.BoundDatasetPhase) diff --git a/pkg/ddc/jindofsx/ufs_internal.go b/pkg/ddc/jindofsx/ufs_internal.go index 7112de84df0..f47d29b51f3 100644 --- a/pkg/ddc/jindofsx/ufs_internal.go +++ b/pkg/ddc/jindofsx/ufs_internal.go @@ -79,8 +79,8 @@ func (e *JindoFSxEngine) mountUFS() (err error) { // Iterate all the mount points, do mount if the mount point is not Fluid-native(e.g. Hostpath or PVC) for _, mount := range dataset.Spec.Mounts { - // first to check the path isMounted mounted := false + // first to check the path isMounted if strings.HasPrefix(mount.MountPoint, common.VolumeScheme.String()) { ufsVolumesPath := utils.UFSPathBuilder{}.GenLocalStoragePath(mount) mount.MountPoint = "local://" + ufsVolumesPath diff --git a/pkg/utils/dataset.go b/pkg/utils/dataset.go index 02ad375b781..4b89f19b6f7 100644 --- a/pkg/utils/dataset.go +++ b/pkg/utils/dataset.go @@ -20,12 +20,13 @@ import ( "fmt" "reflect" - datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" - "github.com/fluid-cloudnative/fluid/pkg/common" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/retry" "sigs.k8s.io/controller-runtime/pkg/client" + + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" ) // GetDataset gets the dataset. @@ -45,8 +46,11 @@ func GetDataset(client client.Client, name, namespace string) (*datav1alpha1.Dat // checks the setup is done func IsSetupDone(dataset *datav1alpha1.Dataset) (done bool) { - index, _ := GetDatasetCondition(dataset.Status.Conditions, datav1alpha1.DatasetReady) - if index != -1 { + if dataset.Status.Phase != datav1alpha1.BoundDatasetPhase { + return false + } + index, cond := GetDatasetCondition(dataset.Status.Conditions, datav1alpha1.DatasetReady) + if index != -1 && cond.Status == corev1.ConditionTrue { // e.Log.V(1).Info("The runtime is already setup.") done = true } @@ -78,7 +82,7 @@ func IsTargetPathUnderFluidNativeMounts(targetPath string, dataset datav1alpha1. mPath := UFSPathBuilder{}.GenAlluxioMountPath(mount) - //TODO(xuzhihao): HasPrefix is not enough. + // TODO(xuzhihao): HasPrefix is not enough. // only for native scheme if !common.IsFluidNativeScheme(mount.MountPoint) { diff --git a/pkg/utils/patch/patch_utils.go b/pkg/utils/patch/patch_utils.go new file mode 100644 index 00000000000..5f8d1edde39 --- /dev/null +++ b/pkg/utils/patch/patch_utils.go @@ -0,0 +1,155 @@ +package patch + +import ( + "encoding/json" + "strings" + + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + NullHolder = "NULL_HOLDER" + NullHolderStr = "\"NULL_HOLDER\"" + NullHolderListStr = "[\"NULL_HOLDER\"]" +) + +var _ client.Patch = &Patch{} + +type Patch struct { + patchType types.PatchType + patchData patchData +} + +// Type implements Patch interface. +func (p *Patch) Type() types.PatchType { + return p.patchType +} + +// Data implements Patch interface. +func (p *Patch) Data(obj client.Object) ([]byte, error) { + return []byte(p.String()), nil +} + +func (p *Patch) IsMetaEmpty() bool { + return p.patchData.Meta == nil +} + +func (p *Patch) String() string { + js, _ := json.Marshal(&p.patchData) + // When remove finalizers, need to replace "[null]" to "null" + str := strings.Replace(string(js), NullHolderListStr, "null", -1) + return strings.Replace(str, NullHolderStr, "null", -1) +} + +// NewStrategicPatch returns a strategic-merge-patch type patch entity, which applies +// to build-in resource like pods, services... +func NewStrategicPatch() *Patch { + return &Patch{patchType: types.StrategicMergePatchType} +} + +// NewMergePatch returns a merge-patch type patch entity, which applies to CRDs like +// tfjobs, pytorchjobs... +func NewMergePatch() *Patch { + return &Patch{patchType: types.MergePatchType} +} + +// Append/Remove finalizer only works when patch type is StrategicMergePatchType. + +func (p *Patch) AddFinalizer(item string) *Patch { + if p.patchType != types.StrategicMergePatchType { + return p + } + + if p.patchData.Meta == nil { + p.patchData.Meta = &prunedMetadata{} + } + p.patchData.Meta.Finalizers = append(p.patchData.Meta.Finalizers, item) + return p +} + +func (p *Patch) RemoveFinalizer(item string) *Patch { + if p.patchType != types.StrategicMergePatchType { + return p + } + + if p.patchData.Meta == nil { + p.patchData.Meta = &prunedMetadata{} + } + p.patchData.Meta.DeletePrimitiveFinalizer = append(p.patchData.Meta.DeletePrimitiveFinalizer, item) + return p +} + +// OverrideFinalizer only works when patch type is MergePatchType. +func (p *Patch) OverrideFinalizer(items []string) *Patch { + if p.patchType != types.MergePatchType { + return p + } + + if p.patchData.Meta == nil { + p.patchData.Meta = &prunedMetadata{} + } + + if len(items) == 0 { + p.patchData.Meta.Finalizers = append(p.patchData.Meta.Finalizers, NullHolder) + } else { + p.patchData.Meta.Finalizers = items + } + + return p +} + +func (p *Patch) InsertLabel(key, value string) *Patch { + if p.patchData.Meta == nil { + p.patchData.Meta = &prunedMetadata{} + } + if p.patchData.Meta.Labels == nil { + p.patchData.Meta.Labels = make(map[string]string) + } + p.patchData.Meta.Labels[key] = value + return p +} + +func (p *Patch) DeleteLabel(key string) *Patch { + if p.patchData.Meta == nil { + p.patchData.Meta = &prunedMetadata{} + } + if p.patchData.Meta.Labels == nil { + p.patchData.Meta.Labels = make(map[string]string) + } + p.patchData.Meta.Labels[key] = NullHolder + return p +} + +func (p *Patch) InsertAnnotation(key, value string) *Patch { + if p.patchData.Meta == nil { + p.patchData.Meta = &prunedMetadata{} + } + if p.patchData.Meta.Annotations == nil { + p.patchData.Meta.Annotations = make(map[string]string) + } + p.patchData.Meta.Annotations[key] = value + return p +} + +func (p *Patch) DeleteAnnotation(key string) *Patch { + if p.patchData.Meta == nil { + p.patchData.Meta = &prunedMetadata{} + } + if p.patchData.Meta.Annotations == nil { + p.patchData.Meta.Annotations = make(map[string]string) + } + p.patchData.Meta.Annotations[key] = NullHolder + return p +} + +type patchData struct { + Meta *prunedMetadata `json:"metadata,omitempty"` +} + +type prunedMetadata struct { + Labels map[string]string `json:"labels,omitempty"` + Annotations map[string]string `json:"annotations,omitempty"` + Finalizers []string `json:"finalizers,omitempty"` + DeletePrimitiveFinalizer []string `json:"$deleteFromPrimitiveList/finalizers,omitempty"` +} diff --git a/pkg/utils/patch/patch_utils_test.go b/pkg/utils/patch/patch_utils_test.go new file mode 100644 index 00000000000..f57c6ca14e6 --- /dev/null +++ b/pkg/utils/patch/patch_utils_test.go @@ -0,0 +1,124 @@ +package patch + +import ( + "context" + "fmt" + "reflect" + "testing" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/scheme" + fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +func TestPatchBodyGeneration(t *testing.T) { + patchReq := NewStrategicPatch(). + AddFinalizer("new-finalizer"). + RemoveFinalizer("old-finalizer"). + InsertLabel("new-label", "foo1"). + DeleteLabel("old-label"). + InsertAnnotation("new-annotation", "foo2"). + DeleteAnnotation("old-annotation") + + expectedPatchBody := fmt.Sprintf(`{"metadata":{"labels":{"new-label":"foo1","old-label":null},"annotations":{"new-annotation":"foo2","old-annotation":null},"finalizers":["new-finalizer"],"$deleteFromPrimitiveList/finalizers":["old-finalizer"]}}`) + + if !reflect.DeepEqual(patchReq.String(), expectedPatchBody) { + t.Fatalf("Not equal: \n%s \n%s", expectedPatchBody, patchReq.String()) + } + +} + +func TestMergePatchBody(t *testing.T) { + finalizers := []string{"origin-finalizer"} + tests := []struct { + name string + mergePatchReq *Patch + expectedMergePatchBody string + }{ + { + name: "add finalizer", + mergePatchReq: NewMergePatch().OverrideFinalizer(append(finalizers, "new-finalizer")). + InsertLabel("new-label", "foo1").DeleteLabel("old-label"). + InsertAnnotation("new-annotation", "foo2").DeleteAnnotation("old-annotation"), + expectedMergePatchBody: fmt.Sprintf(`{"metadata":{"labels":{"new-label":"foo1","old-label":null},"annotations":{"new-annotation":"foo2","old-annotation":null},"finalizers":["origin-finalizer","new-finalizer"]}}`), + }, + { + name: "remove finalizer", + mergePatchReq: NewMergePatch().OverrideFinalizer(append(finalizers, "new-finalizer")). + InsertLabel("new-label", "foo1").DeleteLabel("old-label"). + InsertAnnotation("new-annotation", "foo2").DeleteAnnotation("old-annotation"), + expectedMergePatchBody: fmt.Sprintf(`{"metadata":{"labels":{"new-label":"foo1","old-label":null},"annotations":{"new-annotation":"foo2","old-annotation":null},"finalizers":["origin-finalizer","new-finalizer"]}}`), + }, + { + name: "add finalizer", + mergePatchReq: NewMergePatch().OverrideFinalizer(make([]string, 0)). + InsertLabel("new-label", "foo1").DeleteLabel("old-label"). + InsertAnnotation("new-annotation", "foo2").DeleteAnnotation("old-annotation"), + expectedMergePatchBody: fmt.Sprintf(`{"metadata":{"labels":{"new-label":"foo1","old-label":null},"annotations":{"new-annotation":"foo2","old-annotation":null},"finalizers":null}}`), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if !reflect.DeepEqual(tt.mergePatchReq.String(), tt.expectedMergePatchBody) { + t.Fatalf("Not equal: \n%s \n%s", tt.expectedMergePatchBody, tt.mergePatchReq.String()) + } + }) + } + +} + +func TestPodPatchOperations(t *testing.T) { + p := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "hello", + Namespace: "default", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "main", + Image: "busybox", + }, + }, + }, + } + nsName := types.NamespacedName{Name: "hello", Namespace: "default"} + + fake := fakeclient.NewFakeClientWithScheme(scheme.Scheme) + _ = fake.Create(context.Background(), p) + + patch := NewStrategicPatch() + patch.InsertLabel("foo", "bar") + err := fake.Patch(context.Background(), p, patch) + assert.Nil(t, err) + err = fake.Get(context.Background(), nsName, p) + assert.Nil(t, err) + assert.Equal(t, p.Labels["foo"], "bar") + patch.InsertLabel("foo", "bar1") + err = fake.Patch(context.Background(), p, patch) + assert.Nil(t, err) + err = fake.Get(context.Background(), nsName, p) + assert.Nil(t, err) + assert.Equal(t, p.Labels["foo"], "bar1") + + patch = NewStrategicPatch() + patch.InsertAnnotation("foo", "bar") + err = fake.Patch(context.Background(), p, patch) + assert.Nil(t, err) + err = fake.Get(context.Background(), nsName, p) + assert.Nil(t, err) + assert.Equal(t, p.Annotations["foo"], "bar") + + patch = NewStrategicPatch() + patch.AddFinalizer("finalizer") + err = fake.Patch(context.Background(), p, patch) + assert.Nil(t, err) + err = fake.Get(context.Background(), nsName, p) + assert.Nil(t, err) + assert.Equal(t, len(p.Finalizers), 1) + assert.Equal(t, p.Finalizers[0], "finalizer") +}