Skip to content

Commit

Permalink
feat: re-setup master when master role restarted or recreated and rec…
Browse files Browse the repository at this point in the history
…over dataset

Signed-off-by: SimonCqk <[email protected]>
  • Loading branch information
SimonCqk authored and SimonCqk committed Aug 29, 2023
1 parent b61a861 commit df7d99a
Show file tree
Hide file tree
Showing 12 changed files with 707 additions and 26 deletions.
4 changes: 4 additions & 0 deletions pkg/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,4 +186,8 @@ const (

JobPolicy = "fluid.io/jobPolicy"
CronPolicy = "cron"

// AnnotationLatestMasterStartedTime records the latest start time of the master containers,
// so that an unexpected restart or recreation of the master can be detected.
AnnotationLatestMasterStartedTime = "fluid.io/master-latest-started-at"
)
92 changes: 86 additions & 6 deletions pkg/ctrl/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,21 @@ import (
"context"
"fmt"
"reflect"
"time"

datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
"github.com/fluid-cloudnative/fluid/pkg/utils"
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"

datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
"github.com/fluid-cloudnative/fluid/pkg/utils"
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
"github.com/fluid-cloudnative/fluid/pkg/utils/patch"
)

// CheckMasterHealthy checks the sts healthy with role
Expand All @@ -53,6 +56,26 @@ func (e *Helper) CheckMasterHealthy(recorder record.EventRecorder, runtime base.
}

if healthy {
realHealthy, readyTime, err := e.recheckMasterHealthyByEachContainerStartedTime(sts)
if err != nil {
e.log.Error(err, "failed to recheck master healthy by each container started time")
return err
}

if !readyTime.IsZero() && sts.Annotations[common.AnnotationLatestMasterStartedTime] != readyTime.Format(time.RFC3339) {
toPatch := patch.NewStrategicPatch().InsertAnnotation(common.AnnotationLatestMasterStartedTime, readyTime.Format(time.RFC3339))
if err = e.client.Patch(context.Background(), sts, toPatch); err != nil {
return err
}
}

if !realHealthy {
return fmt.Errorf("the master %s in %s is not ready. Master pod must have restarted since last ready time",
sts.Name,
sts.Namespace,
)
}

cond := utils.NewRuntimeCondition(datav1alpha1.RuntimeMasterReady, "The master is ready.",
"The master is ready.", corev1.ConditionTrue)
_, oldCond := utils.GetRuntimeCondition(statusToUpdate.Conditions, cond.Type)
Expand Down Expand Up @@ -116,3 +139,60 @@ func (e *Helper) CheckMasterHealthy(recorder record.EventRecorder, runtime base.
return

}

// StopBindDataSetInHealthCheck reports whether the health check should leave
// the dataset binding alone. It returns true only when the dataset belonging
// to this runtime could be fetched and is in the Failed phase; a lookup error
// yields false.
func (e *Helper) StopBindDataSetInHealthCheck() bool {
	name, namespace := e.runtimeInfo.GetName(), e.runtimeInfo.GetNamespace()

	dataset, err := utils.GetDataset(e.client, name, namespace)
	if err != nil {
		return false
	}

	// A Failed dataset is expected to be recovered by the runtime controller
	// re-running `Setup`; that controller marks the dataset Bound once setup
	// succeeds, so the health check must not perform the bind itself.
	datasetFailed := dataset.Status.Phase == datav1alpha1.FailedDatasetPhase
	return datasetFailed
}

// recheckMasterHealthyByEachContainerStartedTime compares the most recent
// container start time across the master StatefulSet's pods with the time
// recorded in the common.AnnotationLatestMasterStartedTime annotation.
//
// Returns:
//   - ready: true only when the annotation holds a non-zero time that equals
//     the latest running-container start time found among the pods.
//   - readyTime: the latest running-container start time found; zero when
//     fewer pods than the desired replica count exist or on error.
//   - err: non-nil when the annotation cannot be parsed as RFC3339, the
//     StatefulSet selector cannot be converted, or listing the pods fails.
func (e *Helper) recheckMasterHealthyByEachContainerStartedTime(masterSts *appsv1.StatefulSet) (ready bool, readyTime metav1.Time, err error) {
	lastLatest := time.Time{}

	if recorded := masterSts.Annotations[common.AnnotationLatestMasterStartedTime]; len(recorded) > 0 {
		lastLatest, err = time.Parse(time.RFC3339, recorded)
		if err != nil {
			e.log.Error(err, "failed to parse last latest master started time from annotation",
				"value", recorded)
			// Report not-ready alongside the error, consistent with the
			// other failure paths below.
			return false, metav1.Time{}, err
		}
	}

	selector, err := metav1.LabelSelectorAsSelector(masterSts.Spec.Selector)
	if err != nil {
		return false, metav1.Time{}, fmt.Errorf("error converting StatefulSet %s in namespace %s selector: %w", masterSts.Name, masterSts.Namespace, err)
	}
	pods, err := kubeclient.GetPodsForStatefulSet(e.client, masterSts, selector)
	if err != nil {
		return false, metav1.Time{}, err
	}

	// Spec.Replicas is a pointer and may be nil on objects that did not go
	// through API-server defaulting; Kubernetes treats an unset value as 1.
	desired := int32(1)
	if masterSts.Spec.Replicas != nil {
		desired = *masterSts.Spec.Replicas
	}
	if len(pods) < int(desired) {
		// Not every replica has a pod yet, so no start time can be trusted.
		return false, metav1.Time{}, nil
	}

	latest := metav1.Time{}
	for pi := range pods {
		curLatest := findLatestContainersStartedTime(&pods[pi])
		if curLatest.After(latest.Time) {
			latest = curLatest
		}
	}

	// An empty annotation (zero lastLatest) never counts as ready: the caller
	// is expected to record the returned time first.
	return !lastLatest.IsZero() && lastLatest.Equal(latest.Time), latest, nil
}

// findLatestContainersStartedTime scans the pod's container statuses and
// returns the most recent StartedAt among containers currently in the Running
// state. The zero metav1.Time is returned when no container is running.
func findLatestContainersStartedTime(pod *corev1.Pod) metav1.Time {
	var newest metav1.Time

	statuses := pod.Status.ContainerStatuses
	for i := range statuses {
		running := statuses[i].State.Running
		if running == nil {
			// Waiting or terminated containers carry no running start time.
			continue
		}
		if running.StartedAt.After(newest.Time) {
			newest = running.StartedAt
		}
	}

	return newest
}
Loading

0 comments on commit df7d99a

Please sign in to comment.