diff --git a/pkg/common/constants.go b/pkg/common/constants.go index 4d9a4dafc3e..50ac59188cd 100644 --- a/pkg/common/constants.go +++ b/pkg/common/constants.go @@ -186,4 +186,8 @@ const ( JobPolicy = "fluid.io/jobPolicy" CronPolicy = "cron" + + // AnnotationLatestMasterStartedTime annotates latest master containers start-at timestamp + // to identify master restarted or recreated by accidentally. + AnnotationLatestMasterStartedTime = "fluid.io/master-latest-started-at" ) diff --git a/pkg/ctrl/master.go b/pkg/ctrl/master.go index 1f2089af722..023a29b1aac 100644 --- a/pkg/ctrl/master.go +++ b/pkg/ctrl/master.go @@ -20,18 +20,23 @@ import ( "context" "fmt" "reflect" + "time" - datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" - "github.com/fluid-cloudnative/fluid/pkg/ddc/base" - "github.com/fluid-cloudnative/fluid/pkg/utils" - "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" + + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/ddc/base" + "github.com/fluid-cloudnative/fluid/pkg/utils" + "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" + "github.com/fluid-cloudnative/fluid/pkg/utils/patch" ) // CheckMasterHealthy checks the sts healthy with role @@ -53,6 +58,26 @@ func (e *Helper) CheckMasterHealthy(recorder record.EventRecorder, runtime base. } if healthy { + realHealthy, readyTime, err := e.recheckMasterHealthyByEachContainerStartedTime(sts, e.client) + if err != nil { + e.log.Error(err, "failed to recheck master healthy by each container started time") + return err + } + + if !readyTime.IsZero() && sts.Annotations[common.AnnotationLatestMasterStartedTime] != readyTime.Format(time.RFC3339) { + toPatch := patch.NewStrategicPatch().InsertAnnotation(common.AnnotationLatestMasterStartedTime, readyTime.Format(time.RFC3339)) + if err = e.client.Patch(context.Background(), sts, toPatch); err != nil { + return err + } + } + + if !realHealthy { + return fmt.Errorf("the master %s in %s is not ready. Master pod must have restarted since last ready time", + sts.Name, + sts.Namespace, + ) + } + cond := utils.NewRuntimeCondition(datav1alpha1.RuntimeMasterReady, "The master is ready.", "The master is ready.", corev1.ConditionTrue) _, oldCond := utils.GetRuntimeCondition(statusToUpdate.Conditions, cond.Type) @@ -116,3 +141,60 @@ func (e *Helper) CheckMasterHealthy(recorder record.EventRecorder, runtime base. return } + +func (e *Helper) StopBindDataSetInHealthCheck() bool { + ds, err := utils.GetDataset(e.client, e.runtimeInfo.GetName(), e.runtimeInfo.GetNamespace()) + if err != nil { + return false + } + // When current dataset is Failed, we expect to re-execute `Setup` in + // runtime controller to recover the corrupted dataset, and runtime controller + // will update dataset as Bound after successfully setup, that is, health check + // does not do the bind operation to dataset. + return ds.Status.Phase == datav1alpha1.FailedDatasetPhase +} + +func (e *Helper) recheckMasterHealthyByEachContainerStartedTime(masterSts *appsv1.StatefulSet, c client.Client) (ready bool, readyTime metav1.Time, err error) { + lastLatest := time.Time{} + + if len(masterSts.Annotations[common.AnnotationLatestMasterStartedTime]) > 0 { + lastLatest, err = time.Parse(time.RFC3339, masterSts.Annotations[common.AnnotationLatestMasterStartedTime]) + if err != nil { + e.log.Error(err, "failed to parse last latest master started time from annotation", + "value", masterSts.Annotations[common.AnnotationLatestMasterStartedTime]) + return true, metav1.Time{}, err + } + } + + selector, err := metav1.LabelSelectorAsSelector(masterSts.Spec.Selector) + if err != nil { + return false, metav1.Time{}, fmt.Errorf("error converting StatefulSet %s in namespace %s selector: %v", masterSts.Name, masterSts.Namespace, err) + } + pods, err := kubeclient.GetPodsForStatefulSet(c, masterSts, selector) + if err != nil { + return false, metav1.Time{}, err + } + if len(pods) < int(*masterSts.Spec.Replicas) { + return false, metav1.Time{}, nil + } + + latest := metav1.Time{} + for pi := range pods { + curLatest := findLatestContainersStartedTime(&pods[pi]) + if curLatest.After(latest.Time) { + latest = curLatest + } + } + return !lastLatest.IsZero() && lastLatest.Equal(latest.Time), latest, nil +} + +func findLatestContainersStartedTime(pod *corev1.Pod) metav1.Time { + latest := metav1.Time{} + for csi := range pod.Status.ContainerStatuses { + cs := &pod.Status.ContainerStatuses[csi] + if cs.State.Running != nil && cs.State.Running.StartedAt.After(latest.Time) { + latest = cs.State.Running.StartedAt + } + } + return latest +} diff --git a/pkg/ctrl/master_test.go b/pkg/ctrl/master_test.go index b51c1fd9ea8..f604a0effca 100644 --- a/pkg/ctrl/master_test.go +++ b/pkg/ctrl/master_test.go @@ -20,9 +20,6 @@ import ( "context" "testing" - datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" - "github.com/fluid-cloudnative/fluid/pkg/ddc/base" - "github.com/fluid-cloudnative/fluid/pkg/utils/fake" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -30,6 +27,10 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" utilpointer "k8s.io/utils/pointer" + + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/ddc/base" + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" ) func TestCheckMasterHealthy(t *testing.T) { @@ -286,3 +287,57 @@ func TestCheckMasterHealthy(t *testing.T) { } } + +func Test_recheckMasterHealthyByEachContainerStartedTime(t *testing.T) { + // testCases := []struct { + // name string + // sts *appsv1.StatefulSet + // pods []*corev1.Pod + // expectedSts *appsv1.StatefulSet + // ready bool + // }{ + // { + // name: "check health at first shot", + // sts: &appsv1.StatefulSet{ + // ObjectMeta: metav1.ObjectMeta{ + // Name: "ds-test-master", + // Namespace: "default", + // }, + // Spec: appsv1.StatefulSetSpec{ + // Replicas: utilpointer.Int32(1), + // Selector: &metav1.LabelSelector{MatchLabels: map[string]string{ + // "foo": "bar", + // }}, + // }, + // }, + // pods: []*corev1.Pod{ + // { + // ObjectMeta: metav1.ObjectMeta{ + // Name: "ds-test-master-0", + // Namespace: "default", + // Labels: map[string]string{"foo": "bar"}, + // }, + // Spec: corev1.PodSpec{ + // Containers: []corev1.Container{ + // { + // Name: "main", + // Image: "busybox", + // }, + // }, + // }, + // Status: corev1.PodStatus{ + // Phase: corev1.PodRunning, + // ContainerStatuses: []corev1.ContainerStatus{ + // { + // Name: "main", + // State: corev1.ContainerState{Running: &corev1.ContainerStateRunning{ + // StartedAt: metav1.Time{time.Parse(time.RFC3339,)} + // }}, + // }, + // }, + // }, + // }, + // }, + // }, + // } +} diff --git a/pkg/ddc/base/setup.go b/pkg/ddc/base/setup.go index d60bec64b3f..26c4ba51cba 100644 --- a/pkg/ddc/base/setup.go +++ b/pkg/ddc/base/setup.go @@ -69,6 +69,7 @@ func (b *TemplateEngine) Setup(ctx cruntime.ReconcileRequestContext) (ready bool } if shouldCheckUFS { + b.Log.V(3).Info("start to prepare ufs", "runtime", ctx.Runtime.GetName()) err = b.Implement.PrepareUFS() if err != nil { b.Log.Error(err, "Failed to prepare ufs.") diff --git a/pkg/ddc/jindo/health_check.go b/pkg/ddc/jindo/health_check.go index 804ecf51059..fb4692445fc 100644 --- a/pkg/ddc/jindo/health_check.go +++ b/pkg/ddc/jindo/health_check.go @@ -17,17 +17,20 @@ limitations under the License. package jindo import ( + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" + data "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/ctrl" fluiderrs "github.com/fluid-cloudnative/fluid/pkg/errors" "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/util/retry" ) func (e *JindoEngine) CheckRuntimeHealthy() (err error) { + e.Log.V(3).Info("CheckRuntimeHealthy", "runtime name", e.runtimeInfo.GetName()) + // 1. Check the healthy of the master err = e.checkMasterHealthy() if err != nil { @@ -61,12 +64,19 @@ func (e *JindoEngine) CheckRuntimeHealthy() (err error) { return } + // In some conditions we expect not to update dataset as Bound in + // health check and let other components to handle it. + if e.StopBindDataSetInHealthCheck() { + return nil + } + // 4. Update the dataset as Bounded return e.UpdateDatasetStatus(data.BoundDatasetPhase) } // checkMasterHealthy checks the master healthy func (e *JindoEngine) checkMasterHealthy() (err error) { + e.Log.V(3).Info("checkMasterHealthy", "master name", e.getMasterName()) master, err := kubeclient.GetStatefulSet(e.Client, e.getMasterName(), e.namespace) if err != nil { return err diff --git a/pkg/ddc/jindocache/health_check.go b/pkg/ddc/jindocache/health_check.go index cc5192d1336..7c6de60f4ee 100644 --- a/pkg/ddc/jindocache/health_check.go +++ b/pkg/ddc/jindocache/health_check.go @@ -17,14 +17,15 @@ limitations under the License. package jindocache import ( + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" + data "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/ctrl" fluiderrs "github.com/fluid-cloudnative/fluid/pkg/errors" "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/util/retry" ) func (e *JindoCacheEngine) CheckRuntimeHealthy() (err error) { @@ -68,6 +69,11 @@ func (e *JindoCacheEngine) CheckRuntimeHealthy() (err error) { } } + // In some conditions we expect not to update dataset as Bound in + // health check and let other components to handle it. + if e.StopBindDataSetInHealthCheck() { + return nil + } // 4. Update the dataset as Bounded return e.UpdateDatasetStatus(data.BoundDatasetPhase) } diff --git a/pkg/ddc/jindofsx/dataset.go b/pkg/ddc/jindofsx/dataset.go index eb7c97c8d8e..098a8790f41 100644 --- a/pkg/ddc/jindofsx/dataset.go +++ b/pkg/ddc/jindofsx/dataset.go @@ -63,11 +63,11 @@ func (e *JindoFSxEngine) UpdateDatasetStatus(phase datav1alpha1.DatasetPhase) (e "The ddc runtime is ready.", corev1.ConditionTrue) case datav1alpha1.FailedDatasetPhase: - cond = utils.NewDatasetCondition(datav1alpha1.DatasetReady, datav1alpha1.DatasetReadyReason, + cond = utils.NewDatasetCondition(datav1alpha1.DatasetNotReady, datav1alpha1.DatasetReadyReason, "The ddc runtime is not ready.", corev1.ConditionFalse) default: - cond = utils.NewDatasetCondition(datav1alpha1.DatasetReady, datav1alpha1.DatasetReadyReason, + cond = utils.NewDatasetCondition(datav1alpha1.DatasetUpdatingReason, datav1alpha1.DatasetReadyReason, "The ddc runtime is unknown.", corev1.ConditionFalse) } diff --git a/pkg/ddc/jindofsx/health_check.go b/pkg/ddc/jindofsx/health_check.go index bb91a3121cb..5302512fa07 100644 --- a/pkg/ddc/jindofsx/health_check.go +++ b/pkg/ddc/jindofsx/health_check.go @@ -17,14 +17,15 @@ limitations under the License. package jindofsx import ( + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" + data "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/ctrl" fluiderrs "github.com/fluid-cloudnative/fluid/pkg/errors" "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/util/retry" ) func (e *JindoFSxEngine) CheckRuntimeHealthy() (err error) { @@ -67,6 +68,11 @@ func (e *JindoFSxEngine) CheckRuntimeHealthy() (err error) { return } } + // In some conditions we expect not to update dataset as Bound in + // health check and let other components to handle it. + if e.StopBindDataSetInHealthCheck() { + return nil + } // 4. Update the dataset as Bounded return e.UpdateDatasetStatus(data.BoundDatasetPhase) diff --git a/pkg/ddc/jindofsx/ufs_internal.go b/pkg/ddc/jindofsx/ufs_internal.go index 7112de84df0..f47d29b51f3 100644 --- a/pkg/ddc/jindofsx/ufs_internal.go +++ b/pkg/ddc/jindofsx/ufs_internal.go @@ -79,8 +79,8 @@ func (e *JindoFSxEngine) mountUFS() (err error) { // Iterate all the mount points, do mount if the mount point is not Fluid-native(e.g. Hostpath or PVC) for _, mount := range dataset.Spec.Mounts { - // first to check the path isMounted mounted := false + // first to check the path isMounted if strings.HasPrefix(mount.MountPoint, common.VolumeScheme.String()) { ufsVolumesPath := utils.UFSPathBuilder{}.GenLocalStoragePath(mount) mount.MountPoint = "local://" + ufsVolumesPath diff --git a/pkg/utils/dataset.go b/pkg/utils/dataset.go index c43f77400b3..ca919eb629d 100644 --- a/pkg/utils/dataset.go +++ b/pkg/utils/dataset.go @@ -20,12 +20,13 @@ import ( "fmt" "reflect" - datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" - "github.com/fluid-cloudnative/fluid/pkg/common" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/retry" "sigs.k8s.io/controller-runtime/pkg/client" + + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" ) // GetDataset gets the dataset. @@ -45,8 +46,11 @@ func GetDataset(client client.Client, name, namespace string) (*datav1alpha1.Dat // checks the setup is done func IsSetupDone(dataset *datav1alpha1.Dataset) (done bool) { - index, _ := GetDatasetCondition(dataset.Status.Conditions, datav1alpha1.DatasetReady) - if index != -1 { + if dataset.Status.Phase != datav1alpha1.BoundDatasetPhase { + return false + } + index, cond := GetDatasetCondition(dataset.Status.Conditions, datav1alpha1.DatasetReady) + if index != -1 && cond.Status == corev1.ConditionTrue { // e.Log.V(1).Info("The runtime is already setup.") done = true } @@ -78,7 +82,7 @@ func IsTargetPathUnderFluidNativeMounts(targetPath string, dataset datav1alpha1. mPath := UFSPathBuilder{}.GenAlluxioMountPath(mount, dataset.Spec.Mounts) - //TODO(xuzhihao): HasPrefix is not enough. + // TODO(xuzhihao): HasPrefix is not enough. // only for native scheme if !common.IsFluidNativeScheme(mount.MountPoint) { diff --git a/pkg/utils/patch/patch_utils.go b/pkg/utils/patch/patch_utils.go new file mode 100644 index 00000000000..5f8d1edde39 --- /dev/null +++ b/pkg/utils/patch/patch_utils.go @@ -0,0 +1,155 @@ +package patch + +import ( + "encoding/json" + "strings" + + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + NullHolder = "NULL_HOLDER" + NullHolderStr = "\"NULL_HOLDER\"" + NullHolderListStr = "[\"NULL_HOLDER\"]" +) + +var _ client.Patch = &Patch{} + +type Patch struct { + patchType types.PatchType + patchData patchData +} + +// Type implements Patch interface. +func (p *Patch) Type() types.PatchType { + return p.patchType +} + +// Data implements Patch interface. +func (p *Patch) Data(obj client.Object) ([]byte, error) { + return []byte(p.String()), nil +} + +func (p *Patch) IsMetaEmpty() bool { + return p.patchData.Meta == nil +} + +func (p *Patch) String() string { + js, _ := json.Marshal(&p.patchData) + // When remove finalizers, need to replace "[null]" to "null" + str := strings.Replace(string(js), NullHolderListStr, "null", -1) + return strings.Replace(str, NullHolderStr, "null", -1) +} + +// NewStrategicPatch returns a strategic-merge-patch type patch entity, which applies +// to build-in resource like pods, services... +func NewStrategicPatch() *Patch { + return &Patch{patchType: types.StrategicMergePatchType} +} + +// NewMergePatch returns a merge-patch type patch entity, which applies to CRDs like +// tfjobs, pytorchjobs... +func NewMergePatch() *Patch { + return &Patch{patchType: types.MergePatchType} +} + +// Append/Remove finalizer only works when patch type is StrategicMergePatchType. + +func (p *Patch) AddFinalizer(item string) *Patch { + if p.patchType != types.StrategicMergePatchType { + return p + } + + if p.patchData.Meta == nil { + p.patchData.Meta = &prunedMetadata{} + } + p.patchData.Meta.Finalizers = append(p.patchData.Meta.Finalizers, item) + return p +} + +func (p *Patch) RemoveFinalizer(item string) *Patch { + if p.patchType != types.StrategicMergePatchType { + return p + } + + if p.patchData.Meta == nil { + p.patchData.Meta = &prunedMetadata{} + } + p.patchData.Meta.DeletePrimitiveFinalizer = append(p.patchData.Meta.DeletePrimitiveFinalizer, item) + return p +} + +// OverrideFinalizer only works when patch type is MergePatchType. +func (p *Patch) OverrideFinalizer(items []string) *Patch { + if p.patchType != types.MergePatchType { + return p + } + + if p.patchData.Meta == nil { + p.patchData.Meta = &prunedMetadata{} + } + + if len(items) == 0 { + p.patchData.Meta.Finalizers = append(p.patchData.Meta.Finalizers, NullHolder) + } else { + p.patchData.Meta.Finalizers = items + } + + return p +} + +func (p *Patch) InsertLabel(key, value string) *Patch { + if p.patchData.Meta == nil { + p.patchData.Meta = &prunedMetadata{} + } + if p.patchData.Meta.Labels == nil { + p.patchData.Meta.Labels = make(map[string]string) + } + p.patchData.Meta.Labels[key] = value + return p +} + +func (p *Patch) DeleteLabel(key string) *Patch { + if p.patchData.Meta == nil { + p.patchData.Meta = &prunedMetadata{} + } + if p.patchData.Meta.Labels == nil { + p.patchData.Meta.Labels = make(map[string]string) + } + p.patchData.Meta.Labels[key] = NullHolder + return p +} + +func (p *Patch) InsertAnnotation(key, value string) *Patch { + if p.patchData.Meta == nil { + p.patchData.Meta = &prunedMetadata{} + } + if p.patchData.Meta.Annotations == nil { + p.patchData.Meta.Annotations = make(map[string]string) + } + p.patchData.Meta.Annotations[key] = value + return p +} + +func (p *Patch) DeleteAnnotation(key string) *Patch { + if p.patchData.Meta == nil { + p.patchData.Meta = &prunedMetadata{} + } + if p.patchData.Meta.Annotations == nil { + p.patchData.Meta.Annotations = make(map[string]string) + } + p.patchData.Meta.Annotations[key] = NullHolder + return p +} + +type patchData struct { + Meta *prunedMetadata `json:"metadata,omitempty"` +} + +type prunedMetadata struct { + Labels map[string]string `json:"labels,omitempty"` + Annotations map[string]string `json:"annotations,omitempty"` + Finalizers []string `json:"finalizers,omitempty"` + DeletePrimitiveFinalizer []string `json:"$deleteFromPrimitiveList/finalizers,omitempty"` +} diff --git a/pkg/utils/patch/patch_utils_test.go b/pkg/utils/patch/patch_utils_test.go new file mode 100644 index 00000000000..bafa61b895e --- /dev/null +++ b/pkg/utils/patch/patch_utils_test.go @@ -0,0 +1,189 @@ +package patch + +import ( + "context" + "fmt" + "reflect" + "testing" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/utils/pointer" + fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" + + tfv1 "github.com/alibaba/kubedl/apis/tensorflow/v1" + apiv1 "github.com/alibaba/kubedl/pkg/job_controller/api/v1" +) + +func TestPatchBodyGeneration(t *testing.T) { + patchReq := NewStrategicPatch(). + AddFinalizer("new-finalizer"). + RemoveFinalizer("old-finalizer"). + InsertLabel("new-label", "foo1"). + DeleteLabel("old-label"). + InsertAnnotation("new-annotation", "foo2"). + DeleteAnnotation("old-annotation") + + expectedPatchBody := fmt.Sprintf(`{"metadata":{"labels":{"new-label":"foo1","old-label":null},"annotations":{"new-annotation":"foo2","old-annotation":null},"finalizers":["new-finalizer"],"$deleteFromPrimitiveList/finalizers":["old-finalizer"]}}`) + + if !reflect.DeepEqual(patchReq.String(), expectedPatchBody) { + t.Fatalf("Not equal: \n%s \n%s", expectedPatchBody, patchReq.String()) + } + +} + +func TestMergePatchBody(t *testing.T) { + finalizers := []string{"origin-finalizer"} + tests := []struct { + name string + mergePatchReq *Patch + expectedMergePatchBody string + }{ + { + name: "add finalizer", + mergePatchReq: NewMergePatch().OverrideFinalizer(append(finalizers, "new-finalizer")). + InsertLabel("new-label", "foo1").DeleteLabel("old-label"). + InsertAnnotation("new-annotation", "foo2").DeleteAnnotation("old-annotation"), + expectedMergePatchBody: fmt.Sprintf(`{"metadata":{"labels":{"new-label":"foo1","old-label":null},"annotations":{"new-annotation":"foo2","old-annotation":null},"finalizers":["origin-finalizer","new-finalizer"]}}`), + }, + { + name: "remove finalizer", + mergePatchReq: NewMergePatch().OverrideFinalizer(append(finalizers, "new-finalizer")). + InsertLabel("new-label", "foo1").DeleteLabel("old-label"). + InsertAnnotation("new-annotation", "foo2").DeleteAnnotation("old-annotation"), + expectedMergePatchBody: fmt.Sprintf(`{"metadata":{"labels":{"new-label":"foo1","old-label":null},"annotations":{"new-annotation":"foo2","old-annotation":null},"finalizers":["origin-finalizer","new-finalizer"]}}`), + }, + { + name: "add finalizer", + mergePatchReq: NewMergePatch().OverrideFinalizer(make([]string, 0)). + InsertLabel("new-label", "foo1").DeleteLabel("old-label"). + InsertAnnotation("new-annotation", "foo2").DeleteAnnotation("old-annotation"), + expectedMergePatchBody: fmt.Sprintf(`{"metadata":{"labels":{"new-label":"foo1","old-label":null},"annotations":{"new-annotation":"foo2","old-annotation":null},"finalizers":null}}`), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if !reflect.DeepEqual(tt.mergePatchReq.String(), tt.expectedMergePatchBody) { + t.Fatalf("Not equal: \n%s \n%s", tt.expectedMergePatchBody, tt.mergePatchReq.String()) + } + }) + } + +} + +func TestPodPatchOperations(t *testing.T) { + p := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "hello", + Namespace: "default", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "main", + Image: "busybox", + }, + }, + }, + } + nsName := types.NamespacedName{Name: "hello", Namespace: "default"} + + fake := fakeclient.NewFakeClientWithScheme(scheme.Scheme) + _ = fake.Create(context.Background(), p) + + patch := NewStrategicPatch() + patch.InsertLabel("foo", "bar") + err := fake.Patch(context.Background(), p, patch) + assert.Nil(t, err) + err = fake.Get(context.Background(), nsName, p) + assert.Nil(t, err) + assert.Equal(t, p.Labels["foo"], "bar") + patch.InsertLabel("foo", "bar1") + err = fake.Patch(context.Background(), p, patch) + assert.Nil(t, err) + err = fake.Get(context.Background(), nsName, p) + assert.Nil(t, err) + assert.Equal(t, p.Labels["foo"], "bar1") + + patch = NewStrategicPatch() + patch.InsertAnnotation("foo", "bar") + err = fake.Patch(context.Background(), p, patch) + assert.Nil(t, err) + err = fake.Get(context.Background(), nsName, p) + assert.Nil(t, err) + assert.Equal(t, p.Annotations["foo"], "bar") + + patch = NewStrategicPatch() + patch.AddFinalizer("finalizer") + err = fake.Patch(context.Background(), p, patch) + assert.Nil(t, err) + err = fake.Get(context.Background(), nsName, p) + assert.Nil(t, err) + assert.Equal(t, len(p.Finalizers), 1) + assert.Equal(t, p.Finalizers[0], "finalizer") +} + +func TestJobPatchOperations(t *testing.T) { + job := &tfv1.TFJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "hello", + Namespace: "default", + }, + Spec: tfv1.TFJobSpec{ + TFReplicaSpecs: map[apiv1.ReplicaType]*apiv1.ReplicaSpec{ + tfv1.TFReplicaTypeWorker: { + Replicas: pointer.Int32Ptr(1), + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "main", + Image: "busybox", + }, + }, + }, + }, + }, + }, + }, + } + nsName := types.NamespacedName{Name: "hello", Namespace: "default"} + + tfv1.SchemeBuilder.AddToScheme(scheme.Scheme) + fake := fakeclient.NewFakeClientWithScheme(scheme.Scheme) + _ = fake.Create(context.Background(), job) + + patch := NewMergePatch() + patch.InsertLabel("foo", "bar") + err := fake.Patch(context.Background(), job, patch) + assert.Nil(t, err) + err = fake.Get(context.Background(), nsName, job) + assert.Nil(t, err) + assert.Equal(t, job.Labels["foo"], "bar") + patch.InsertLabel("foo", "bar1") + err = fake.Patch(context.Background(), job, patch) + assert.Nil(t, err) + err = fake.Get(context.Background(), nsName, job) + assert.Nil(t, err) + assert.Equal(t, job.Labels["foo"], "bar1") + + patch = NewMergePatch() + patch.InsertAnnotation("foo", "bar") + err = fake.Patch(context.Background(), job, patch) + assert.Nil(t, err) + err = fake.Get(context.Background(), nsName, job) + assert.Nil(t, err) + assert.Equal(t, job.Annotations["foo"], "bar") + + patch = NewMergePatch() + patch.AddFinalizer("finalizer") + err = fake.Patch(context.Background(), job, patch) + assert.Nil(t, err) + err = fake.Get(context.Background(), nsName, job) + assert.Nil(t, err) + assert.Equal(t, len(job.Finalizers), 0) +}