Skip to content

Commit

Permalink
feat: re-setup master when master role restarted or recreated and rec…
Browse files Browse the repository at this point in the history
…over dataset
  • Loading branch information
qiukai.cqk committed Aug 7, 2023
1 parent b61a861 commit 804392c
Show file tree
Hide file tree
Showing 12 changed files with 536 additions and 24 deletions.
4 changes: 4 additions & 0 deletions pkg/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,4 +186,8 @@ const (

JobPolicy = "fluid.io/jobPolicy"
CronPolicy = "cron"

// AnnotationLatestMasterStartedTime annotates latest master containers start-at timestamp
// to identify master restarted or recreated by accidentally.
AnnotationLatestMasterStartedTime = "fluid.io/master-latest-started-at"
)
90 changes: 86 additions & 4 deletions pkg/ctrl/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,23 @@ import (
"context"
"fmt"
"reflect"
"time"

datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
"github.com/fluid-cloudnative/fluid/pkg/utils"
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"

datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
"github.com/fluid-cloudnative/fluid/pkg/utils"
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
"github.com/fluid-cloudnative/fluid/pkg/utils/patch"
)

// CheckMasterHealthy checks the sts healthy with role
Expand All @@ -53,6 +58,26 @@ func (e *Helper) CheckMasterHealthy(recorder record.EventRecorder, runtime base.
}

if healthy {
realHealthy, readyTime, err := e.recheckMasterHealthyByEachContainerStartedTime(sts, e.client)
if err != nil {
e.log.Error(err, "failed to recheck master healthy by each container started time")
return err
}

if !readyTime.IsZero() && sts.Annotations[common.AnnotationLatestMasterStartedTime] != readyTime.Format(time.RFC3339) {
toPatch := patch.NewStrategicPatch().InsertAnnotation(common.AnnotationLatestMasterStartedTime, readyTime.Format(time.RFC3339))
if err = e.client.Patch(context.Background(), sts, toPatch); err != nil {
return err
}
}

if !realHealthy {
return fmt.Errorf("the master %s in %s is not ready. Master pod must have restarted since last ready time",
sts.Name,
sts.Namespace,
)
}

cond := utils.NewRuntimeCondition(datav1alpha1.RuntimeMasterReady, "The master is ready.",
"The master is ready.", corev1.ConditionTrue)
_, oldCond := utils.GetRuntimeCondition(statusToUpdate.Conditions, cond.Type)
Expand Down Expand Up @@ -116,3 +141,60 @@ func (e *Helper) CheckMasterHealthy(recorder record.EventRecorder, runtime base.
return

}

func (e *Helper) StopBindDataSetInHealthCheck() bool {
ds, err := utils.GetDataset(e.client, e.runtimeInfo.GetName(), e.runtimeInfo.GetNamespace())
if err != nil {
return false
}
// When current dataset is Failed, we expect to re-execute `Setup` in
// runtime controller to recover the corrupted dataset, and runtime controller
// will update dataset as Bound after successfully setup, that is, health check
// does not do the bind operation to dataset.
return ds.Status.Phase == datav1alpha1.FailedDatasetPhase
}

func (e *Helper) recheckMasterHealthyByEachContainerStartedTime(masterSts *appsv1.StatefulSet, c client.Client) (ready bool, readyTime metav1.Time, err error) {
lastLatest := time.Time{}

if len(masterSts.Annotations[common.AnnotationLatestMasterStartedTime]) > 0 {
lastLatest, err = time.Parse(time.RFC3339, masterSts.Annotations[common.AnnotationLatestMasterStartedTime])
if err != nil {
e.log.Error(err, "failed to parse last latest master started time from annotation",
"value", masterSts.Annotations[common.AnnotationLatestMasterStartedTime])
return true, metav1.Time{}, err
}
}

selector, err := metav1.LabelSelectorAsSelector(masterSts.Spec.Selector)
if err != nil {
return false, metav1.Time{}, fmt.Errorf("error converting StatefulSet %s in namespace %s selector: %v", masterSts.Name, masterSts.Namespace, err)
}
pods, err := kubeclient.GetPodsForStatefulSet(c, masterSts, selector)
if err != nil {
return false, metav1.Time{}, err
}
if len(pods) < int(*masterSts.Spec.Replicas) {
return false, metav1.Time{}, nil
}

latest := metav1.Time{}
for pi := range pods {
curLatest := findLatestContainersStartedTime(&pods[pi])
if curLatest.After(latest.Time) {
latest = curLatest
}
}
return !lastLatest.IsZero() && lastLatest.Equal(latest.Time), latest, nil
}

func findLatestContainersStartedTime(pod *corev1.Pod) metav1.Time {
latest := metav1.Time{}
for csi := range pod.Status.ContainerStatuses {
cs := &pod.Status.ContainerStatuses[csi]
if cs.State.Running != nil && cs.State.Running.StartedAt.After(latest.Time) {
latest = cs.State.Running.StartedAt
}
}
return latest
}
61 changes: 58 additions & 3 deletions pkg/ctrl/master_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,17 @@ import (
"context"
"testing"

datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
"github.com/fluid-cloudnative/fluid/pkg/utils/fake"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
utilpointer "k8s.io/utils/pointer"

datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
"github.com/fluid-cloudnative/fluid/pkg/utils/fake"
)

func TestCheckMasterHealthy(t *testing.T) {
Expand Down Expand Up @@ -286,3 +287,57 @@ func TestCheckMasterHealthy(t *testing.T) {

}
}

func Test_recheckMasterHealthyByEachContainerStartedTime(t *testing.T) {
// testCases := []struct {
// name string
// sts *appsv1.StatefulSet
// pods []*corev1.Pod
// expectedSts *appsv1.StatefulSet
// ready bool
// }{
// {
// name: "check health at first shot",
// sts: &appsv1.StatefulSet{
// ObjectMeta: metav1.ObjectMeta{
// Name: "ds-test-master",
// Namespace: "default",
// },
// Spec: appsv1.StatefulSetSpec{
// Replicas: utilpointer.Int32(1),
// Selector: &metav1.LabelSelector{MatchLabels: map[string]string{
// "foo": "bar",
// }},
// },
// },
// pods: []*corev1.Pod{
// {
// ObjectMeta: metav1.ObjectMeta{
// Name: "ds-test-master-0",
// Namespace: "default",
// Labels: map[string]string{"foo": "bar"},
// },
// Spec: corev1.PodSpec{
// Containers: []corev1.Container{
// {
// Name: "main",
// Image: "busybox",
// },
// },
// },
// Status: corev1.PodStatus{
// Phase: corev1.PodRunning,
// ContainerStatuses: []corev1.ContainerStatus{
// {
// Name: "main",
// State: corev1.ContainerState{Running: &corev1.ContainerStateRunning{
// StartedAt: metav1.Time{time.Parse(time.RFC3339,)}
// }},
// },
// },
// },
// },
// },
// },
// }
}
1 change: 1 addition & 0 deletions pkg/ddc/base/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func (b *TemplateEngine) Setup(ctx cruntime.ReconcileRequestContext) (ready bool
}

if shouldCheckUFS {
b.Log.V(3).Info("start to prepare ufs", "runtime", ctx.Runtime.GetName())
err = b.Implement.PrepareUFS()
if err != nil {
b.Log.Error(err, "Failed to prepare ufs.")
Expand Down
16 changes: 13 additions & 3 deletions pkg/ddc/jindo/health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@ limitations under the License.
package jindo

import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"

data "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/ctrl"
fluiderrs "github.com/fluid-cloudnative/fluid/pkg/errors"
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
)

func (e *JindoEngine) CheckRuntimeHealthy() (err error) {
e.Log.V(3).Info("CheckRuntimeHealthy", "runtime name", e.runtimeInfo.GetName())

// 1. Check the healthy of the master
err = e.checkMasterHealthy()
if err != nil {
Expand Down Expand Up @@ -61,12 +64,19 @@ func (e *JindoEngine) CheckRuntimeHealthy() (err error) {
return
}

// In some conditions we expect not to update dataset as Bound in
// health check and let other components to handle it.
if e.StopBindDataSetInHealthCheck() {
return nil
}

// 4. Update the dataset as Bounded
return e.UpdateDatasetStatus(data.BoundDatasetPhase)
}

// checkMasterHealthy checks the master healthy
func (e *JindoEngine) checkMasterHealthy() (err error) {
e.Log.V(3).Info("checkMasterHealthy", "master name", e.getMasterName())
master, err := kubeclient.GetStatefulSet(e.Client, e.getMasterName(), e.namespace)
if err != nil {
return err
Expand Down
12 changes: 9 additions & 3 deletions pkg/ddc/jindocache/health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ limitations under the License.
package jindocache

import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"

data "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/ctrl"
fluiderrs "github.com/fluid-cloudnative/fluid/pkg/errors"
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
)

func (e *JindoCacheEngine) CheckRuntimeHealthy() (err error) {
Expand Down Expand Up @@ -68,6 +69,11 @@ func (e *JindoCacheEngine) CheckRuntimeHealthy() (err error) {
}
}

// In some conditions we expect not to update dataset as Bound in
// health check and let other components to handle it.
if e.StopBindDataSetInHealthCheck() {
return nil
}
// 4. Update the dataset as Bounded
return e.UpdateDatasetStatus(data.BoundDatasetPhase)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddc/jindofsx/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ func (e *JindoFSxEngine) UpdateDatasetStatus(phase datav1alpha1.DatasetPhase) (e
"The ddc runtime is ready.",
corev1.ConditionTrue)
case datav1alpha1.FailedDatasetPhase:
cond = utils.NewDatasetCondition(datav1alpha1.DatasetReady, datav1alpha1.DatasetReadyReason,
cond = utils.NewDatasetCondition(datav1alpha1.DatasetNotReady, datav1alpha1.DatasetReadyReason,
"The ddc runtime is not ready.",
corev1.ConditionFalse)
default:
cond = utils.NewDatasetCondition(datav1alpha1.DatasetReady, datav1alpha1.DatasetReadyReason,
cond = utils.NewDatasetCondition(datav1alpha1.DatasetUpdatingReason, datav1alpha1.DatasetReadyReason,
"The ddc runtime is unknown.",
corev1.ConditionFalse)
}
Expand Down
12 changes: 9 additions & 3 deletions pkg/ddc/jindofsx/health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ limitations under the License.
package jindofsx

import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"

data "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/ctrl"
fluiderrs "github.com/fluid-cloudnative/fluid/pkg/errors"
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
)

func (e *JindoFSxEngine) CheckRuntimeHealthy() (err error) {
Expand Down Expand Up @@ -67,6 +68,11 @@ func (e *JindoFSxEngine) CheckRuntimeHealthy() (err error) {
return
}
}
// In some conditions we expect not to update dataset as Bound in
// health check and let other components to handle it.
if e.StopBindDataSetInHealthCheck() {
return nil
}

// 4. Update the dataset as Bounded
return e.UpdateDatasetStatus(data.BoundDatasetPhase)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddc/jindofsx/ufs_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ func (e *JindoFSxEngine) mountUFS() (err error) {
// Iterate all the mount points, do mount if the mount point is not Fluid-native(e.g. Hostpath or PVC)
for _, mount := range dataset.Spec.Mounts {

// first to check the path isMounted
mounted := false
// first to check the path isMounted
if strings.HasPrefix(mount.MountPoint, common.VolumeScheme.String()) {
ufsVolumesPath := utils.UFSPathBuilder{}.GenLocalStoragePath(mount)
mount.MountPoint = "local://" + ufsVolumesPath
Expand Down
14 changes: 9 additions & 5 deletions pkg/utils/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ import (
"fmt"
"reflect"

datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/common"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"

datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/common"
)

// GetDataset gets the dataset.
Expand All @@ -45,8 +46,11 @@ func GetDataset(client client.Client, name, namespace string) (*datav1alpha1.Dat

// checks the setup is done
func IsSetupDone(dataset *datav1alpha1.Dataset) (done bool) {
index, _ := GetDatasetCondition(dataset.Status.Conditions, datav1alpha1.DatasetReady)
if index != -1 {
if dataset.Status.Phase != datav1alpha1.BoundDatasetPhase {
return false
}
index, cond := GetDatasetCondition(dataset.Status.Conditions, datav1alpha1.DatasetReady)
if index != -1 && cond.Status == corev1.ConditionTrue {
// e.Log.V(1).Info("The runtime is already setup.")
done = true
}
Expand Down Expand Up @@ -78,7 +82,7 @@ func IsTargetPathUnderFluidNativeMounts(targetPath string, dataset datav1alpha1.

mPath := UFSPathBuilder{}.GenAlluxioMountPath(mount, dataset.Spec.Mounts)

//TODO(xuzhihao): HasPrefix is not enough.
// TODO(xuzhihao): HasPrefix is not enough.

// only for native scheme
if !common.IsFluidNativeScheme(mount.MountPoint) {
Expand Down
Loading

0 comments on commit 804392c

Please sign in to comment.