Provide pod affinity to Alluxio master and workers #3536

Draft
wants to merge 1 commit into base: master
4 changes: 4 additions & 0 deletions api/v1alpha1/alluxioruntime_types.go
@@ -78,6 +78,10 @@ type AlluxioCompTemplateSpec struct {
 	// +optional
 	NodeSelector map[string]string `json:"nodeSelector,omitempty"`
 
+	// RuntimePodAffinity defines the pod affinity policy of Alluxio's pods
+	// +optional
+	RuntimePodAffinity RuntimePodAffinity `json:"runtimePodAffinity,omitempty"`
+
 	// Whether to use hostnetwork or not
 	// +kubebuilder:validation:Enum=HostNetwork;"";ContainerNetwork
 	// +optional
6 changes: 6 additions & 0 deletions api/v1alpha1/common.go
@@ -142,6 +142,12 @@ type Metadata struct {
 	Selector metav1.GroupKind `json:"selector,omitempty"`
 }
 
+// RuntimePodAffinity defines the podAffinity and podAntiAffinity of runtime pods.
+type RuntimePodAffinity struct {
+	PodAffinity     *corev1.PodAffinity     `json:"podAffinity,omitempty"`
+	PodAntiAffinity *corev1.PodAntiAffinity `json:"podAntiAffinity,omitempty"`
+}
+
 // PodMetadata defines subgroup properties of metav1.ObjectMeta
 type PodMetadata struct {
 	// Labels are labels of pod specification
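
For illustration, a minimal sketch of how the new field could be set on an AlluxioRuntime, assuming the JSON tags declared above (runtimePodAffinity, podAffinity, podAntiAffinity) and the usual spec.master / spec.worker sections; the label keys and values in the selectors are placeholders, not part of this change:

    apiVersion: data.fluid.io/v1alpha1
    kind: AlluxioRuntime
    metadata:
      name: demo
    spec:
      master:
        runtimePodAffinity:
          podAntiAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                - key: app                 # placeholder label
                  operator: In
                  values: ["latency-sensitive"]
              topologyKey: kubernetes.io/hostname
      worker:
        runtimePodAffinity:
          podAffinity:
            preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 50
              podAffinityTerm:
                labelSelector:
                  matchExpressions:
                  - key: app               # placeholder label
                    operator: In
                    values: ["data-producer"]
                topologyKey: kubernetes.io/hostname

Because the new type reuses corev1.PodAffinity and corev1.PodAntiAffinity directly, everything under runtimePodAffinity follows the standard Kubernetes affinity syntax.
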
26 changes: 26 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions charts/alluxio/templates/worker/statefulset.yaml
@@ -86,6 +86,10 @@ spec:
       {{- if .Values.tolerations }}
       tolerations:
{{ toYaml .Values.tolerations | indent 8 }}
       {{- end }}
+      affinity:
+        {{- if .Values.worker.affinity }}
+{{ toYaml .Values.worker.affinity | indent 8 }}
+        {{- end }}
       initContainers:
       {{ if .Values.initUsers.enabled -}}
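
The template change above renders .Values.worker.affinity into the worker pod spec when it is set. A hedged sketch of a values fragment that would exercise this path (the key path worker.affinity comes from the template above; the anti-affinity term itself is only an example):

    worker:
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
          - weight: 50
            podAffinityTerm:
              labelSelector:
                matchExpressions:
                - key: fluid.io/dataset
                  operator: Exists
              topologyKey: kubernetes.io/hostname

When the value is set, toYaml ... | indent 8 emits the block under the affinity: key of the worker StatefulSet's pod template.
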
4,047 changes: 3,651 additions & 396 deletions charts/fluid/fluid/crds/data.fluid.io_alluxioruntimes.yaml

Large diffs are not rendered by default.

4,047 changes: 3,651 additions & 396 deletions config/crd/bases/data.fluid.io_alluxioruntimes.yaml

Large diffs are not rendered by default.

173 changes: 89 additions & 84 deletions pkg/ctrl/affinity.go
@@ -69,109 +69,114 @@ func (e *Helper) BuildWorkersAffinity(workers *appsv1.StatefulSet) (workersToUpdate

    if workersToUpdate.Spec.Template.Spec.Affinity == nil {
        workersToUpdate.Spec.Template.Spec.Affinity = &corev1.Affinity{}
    }

    dataset, err := utils.GetDataset(e.client, name, namespace)
    if err != nil {
        return workersToUpdate, err
    }
    // 1. Set pod anti affinity(required) for same dataset (Current using port conflict for scheduling, no need to do)

    // 2. Set pod anti affinity for the different dataset
    podAntiAffinityToUpdate := workersToUpdate.Spec.Template.Spec.Affinity.PodAntiAffinity
    if podAntiAffinityToUpdate == nil {
        workersToUpdate.Spec.Template.Spec.Affinity.PodAntiAffinity = &corev1.PodAntiAffinity{}
        podAntiAffinityToUpdate = workersToUpdate.Spec.Template.Spec.Affinity.PodAntiAffinity
    }
    if podAntiAffinityToUpdate.RequiredDuringSchedulingIgnoredDuringExecution == nil {
        podAntiAffinityToUpdate.RequiredDuringSchedulingIgnoredDuringExecution = []corev1.PodAffinityTerm{}
    }
    if podAntiAffinityToUpdate.PreferredDuringSchedulingIgnoredDuringExecution == nil {
        podAntiAffinityToUpdate.PreferredDuringSchedulingIgnoredDuringExecution = []corev1.WeightedPodAffinityTerm{}
    }
    if dataset.IsExclusiveMode() {
        podAntiAffinityToUpdate.RequiredDuringSchedulingIgnoredDuringExecution = append(
            podAntiAffinityToUpdate.RequiredDuringSchedulingIgnoredDuringExecution, corev1.PodAffinityTerm{
                LabelSelector: &metav1.LabelSelector{
                    MatchExpressions: []metav1.LabelSelectorRequirement{
                        {
                            Key:      "fluid.io/dataset",
                            Operator: metav1.LabelSelectorOpExists,
                        },
                    },
                },
                TopologyKey: "kubernetes.io/hostname",
            },
        )
    } else {
        podAntiAffinityToUpdate.PreferredDuringSchedulingIgnoredDuringExecution = append(
            podAntiAffinityToUpdate.PreferredDuringSchedulingIgnoredDuringExecution, corev1.WeightedPodAffinityTerm{
                // The default weight is 50
                Weight: 50,
                PodAffinityTerm: corev1.PodAffinityTerm{
                    LabelSelector: &metav1.LabelSelector{
                        MatchExpressions: []metav1.LabelSelectorRequirement{
                            {
                                Key:      "fluid.io/dataset",
                                Operator: metav1.LabelSelectorOpExists,
                            },
                        },
                    },
                    TopologyKey: "kubernetes.io/hostname",
                },
            },
        )
        podAntiAffinityToUpdate.RequiredDuringSchedulingIgnoredDuringExecution = append(
            podAntiAffinityToUpdate.RequiredDuringSchedulingIgnoredDuringExecution, corev1.PodAffinityTerm{
                LabelSelector: &metav1.LabelSelector{
                    MatchExpressions: []metav1.LabelSelectorRequirement{
                        {
                            Key:      "fluid.io/dataset-placement",
                            Operator: metav1.LabelSelectorOpIn,
                            Values:   []string{string(datav1alpha1.ExclusiveMode)},
                        },
                    },
                },
                TopologyKey: "kubernetes.io/hostname",
            },
        )
        // TODO: remove this when EFC is ready for spread-first scheduling policy
        // Currently EFC prefers binpack-first scheduling policy to spread-first scheduling policy. Set PreferredDuringSchedulingIgnoredDuringExecution to empty
        // to avoid using spread-first scheduling policy
        if e.runtimeInfo.GetRuntimeType() == common.EFCRuntime {
            workersToUpdate.Spec.Template.Spec.Affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution = []corev1.WeightedPodAffinityTerm{}
        }
    }

    // 3. Prefer to locate on the node which already has fuse on it
    if workersToUpdate.Spec.Template.Spec.Affinity.NodeAffinity == nil {
        workersToUpdate.Spec.Template.Spec.Affinity.NodeAffinity = &corev1.NodeAffinity{}
    }

    if len(workersToUpdate.Spec.Template.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution) == 0 {
        workersToUpdate.Spec.Template.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution = []corev1.PreferredSchedulingTerm{}
    }

    workersToUpdate.Spec.Template.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution =
        append(workersToUpdate.Spec.Template.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution,
            corev1.PreferredSchedulingTerm{
                Weight: 100,
                Preference: corev1.NodeSelectorTerm{
                    MatchExpressions: []corev1.NodeSelectorRequirement{
                        {
                            Key:      e.runtimeInfo.GetFuseLabelName(),
                            Operator: corev1.NodeSelectorOpIn,
                            Values:   []string{"true"},
                        },
                    },
                },
            })

    // 3. set node affinity if possible
    if dataset.Spec.NodeAffinity != nil {
        if dataset.Spec.NodeAffinity.Required != nil {
            workersToUpdate.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution =
                dataset.Spec.NodeAffinity.Required
        }
    }

    err = e.client.Update(context.TODO(), workersToUpdate)
    if err != nil {
        return workersToUpdate, err
    }

    return
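
The refactor above changes BuildWorkersAffinity from populating pod anti-affinity only when the worker StatefulSet had no affinity at all to appending Fluid's scheduling terms onto whatever affinity is already present, so Fluid's anti-affinity rules and user-supplied pod affinity from the runtime spec can coexist. For a dataset in exclusive placement mode, the term appended to requiredDuringSchedulingIgnoredDuringExecution corresponds to the following YAML (a direct transliteration of the Go literals above):

    podAntiAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
      - labelSelector:
          matchExpressions:
          - key: fluid.io/dataset
            operator: Exists
        topologyKey: kubernetes.io/hostname

In other words, an exclusive-mode worker refuses to share a node with any pod that carries a fluid.io/dataset label.
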
16 changes: 16 additions & 0 deletions pkg/ddc/alluxio/transform.go
@@ -312,6 +312,14 @@ func (e *AlluxioEngine) transformMasters(runtime *datav1alpha1.AlluxioRuntime,
 		value.Master.NodeSelector = nodeSelector
 	}
 
+	// Add pod affinity to master pod
+	if runtime.Spec.Master.RuntimePodAffinity.PodAffinity != nil {
+		value.Master.Affinity.PodAffinity = runtime.Spec.Master.RuntimePodAffinity.PodAffinity
+	}
+	if runtime.Spec.Master.RuntimePodAffinity.PodAntiAffinity != nil {
+		value.Master.Affinity.PodAntiAffinity = runtime.Spec.Master.RuntimePodAffinity.PodAntiAffinity
+	}
+
 	// // check the run as
 	// if runtime.Spec.RunAs != nil {
 	// 	value.Master.Env["ALLUXIO_USERNAME"] = alluxioUser
@@ -420,6 +428,14 @@ func (e *AlluxioEngine) transformWorkers(runtime *datav1alpha1.AlluxioRuntime, v
 		e.Log.Error(err, "failed to transform volumes for worker")
 	}
 
+	// Add pod affinity to worker pods
+	if runtime.Spec.Worker.RuntimePodAffinity.PodAffinity != nil {
+		value.Worker.Affinity.PodAffinity = runtime.Spec.Worker.RuntimePodAffinity.PodAffinity
+	}
+	if runtime.Spec.Worker.RuntimePodAffinity.PodAntiAffinity != nil {
+		value.Worker.Affinity.PodAntiAffinity = runtime.Spec.Worker.RuntimePodAffinity.PodAntiAffinity
+	}
+
 	return
 }
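
These transforms copy the affinity from the runtime spec into the value object that is later rendered as the chart's values. Assuming a worker spec like the earlier sketch, the generated values would carry a fragment roughly like the one below (the key names follow the affinity JSON tags on the Master and Worker value types in pkg/ddc/alluxio/types.go; the selector is the same placeholder as before):

    worker:
      affinity:
        podAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
          - weight: 50
            podAffinityTerm:
              labelSelector:
                matchExpressions:
                - key: app                 # placeholder label
                  operator: In
                  values: ["data-producer"]
              topologyKey: kubernetes.io/hostname

which matches the shape the worker StatefulSet template consumes via .Values.worker.affinity.
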
4 changes: 1 addition & 3 deletions pkg/ddc/alluxio/transform_ufs.go
@@ -69,9 +69,7 @@ func (e *AlluxioEngine) transformDatasetToVolume(runtime *datav1alpha1.AlluxioRu
 	if len(value.UFSPaths) > 0 {
 		// fmt.Println("UFSPaths length 1")
 		if dataset.Spec.NodeAffinity != nil {
-			value.Master.Affinity = Affinity{
-				NodeAffinity: translateCacheToNodeAffinity(dataset.Spec.NodeAffinity),
-			}
+			value.Master.Affinity.NodeAffinity = translateCacheToNodeAffinity(dataset.Spec.NodeAffinity)
 		}
 	}
 
7 changes: 2 additions & 5 deletions pkg/ddc/alluxio/types.go
@@ -138,6 +138,7 @@ type JobWorker struct {
 type Worker struct {
 	JvmOptions   []string          `json:"jvmOptions,omitempty"`
 	Env          map[string]string `json:"env,omitempty"`
+	Affinity     corev1.Affinity   `json:"affinity,omitempty"`
 	NodeSelector map[string]string `json:"nodeSelector,omitempty"`
 	Properties   map[string]string `json:"properties,omitempty"`
 	HostNetwork  bool              `json:"hostNetwork,omitempty"`
@@ -152,7 +153,7 @@ type Worker struct {
 type Master struct {
 	JvmOptions   []string          `json:"jvmOptions,omitempty"`
 	Env          map[string]string `json:"env,omitempty"`
-	Affinity     Affinity          `json:"affinity"`
+	Affinity     corev1.Affinity   `json:"affinity"`
 	NodeSelector map[string]string `json:"nodeSelector,omitempty"`
 	Properties   map[string]string `json:"properties,omitempty"`
 	Replicas     int32             `json:"replicaCount,omitempty"`
@@ -214,10 +215,6 @@ type Level struct {
 	Low string `json:"low,omitempty"`
 }
 
-type Affinity struct {
-	NodeAffinity *NodeAffinity `json:"nodeAffinity"`
-}
-
 type cacheHitStates struct {
 	cacheHitRatio string
 	localHitRatio string
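
With Master.Affinity switched to corev1.Affinity and the bespoke Affinity wrapper removed, transformDatasetToVolume (in transform_ufs.go above) now assigns only the NodeAffinity field instead of replacing the whole struct, so a dataset-derived node affinity and a runtime-supplied pod (anti-)affinity can coexist on the master values. A hedged sketch of such a combined result, with placeholder selectors:

    master:
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: topology.kubernetes.io/zone    # placeholder, e.g. derived from the dataset's nodeAffinity
                operator: In
                values: ["zone-a"]
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
          - labelSelector:
              matchExpressions:
              - key: app                            # placeholder label
                operator: In
                values: ["latency-sensitive"]
            topologyKey: kubernetes.io/hostname
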