
Commit 196b72b

Volcano scheduler: Supports network topology aware scheduling when pods are rescheduled

Signed-off-by: ecosysbin <[email protected]>
ecosysbin committed Jan 15, 2025
1 parent 2e8f63b commit 196b72b
Showing 11 changed files with 287 additions and 22 deletions.
18 changes: 17 additions & 1 deletion pkg/scheduler/actions/allocate/allocate.go
@@ -237,6 +237,8 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu
hyperNodesWithLeftTasks := make(map[string]*util.PriorityQueue)
ssn := alloc.session
selectedTier := 0
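// LCAHyperNodeMap maps each candidate hyperNode to the lowest common ancestor (LCA) of that
// hyperNode and the hyperNode already recorded on the job, so the final choice can be written back.
// jobHyperNode is the hyperNode recorded by a previous scheduling round; it is empty for a fresh job.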
LCAHyperNodeMap := map[string]string{}
jobHyperNode := job.PodGroup.Annotations[api.TopologyAllocateLCAHyperNode]

// Find a suitable hyperNode in one tier, from bottom to top, each time to ensure that the selected hyperNode spans the fewest tiers.
for index, tier := range alloc.hyperNodesTiers {
@@ -254,7 +256,17 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu
klog.ErrorS(nil, "HyperNode not exists.", "jobName", job.UID, "name", hyperNodeName, "tier", tier)
continue
}

LCAHyperNodeMap[hyperNodeName] = hyperNodeName
// The job already has an LCA hyperNode recorded from a previous round; check whether the LCA of it and the candidate hyperNode still satisfies the highest allowed tier.
if jobHyperNode != "" {
LCAHyperNode := ssn.HyperNodes.GetLCAHyperNode(hyperNodeName, jobHyperNode)
hyperNodeInfo := ssn.HyperNodes[LCAHyperNode]
if hyperNodeInfo.Tier() > highestAllowedTier {
klog.V(4).InfoS("Current tier of LCAHyperNode is larger than highestAllowedTier, skipping", "jobName", job.UID, "highestAllowedTier", highestAllowedTier, "tier", index+1)
continue
}
LCAHyperNodeMap[hyperNodeName] = LCAHyperNode
}
// Clone the tasks queue and reset the job's fit errors so that each hyperNode is filtered against a clean cache and hyperNodes do not affect each other.
tasksQueue := tasks.Clone()
job.ResetFitErr()
@@ -290,6 +302,10 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu
klog.V(4).InfoS("Find available hyperNodes for job", "jobName", job.UID, "tier", selectedTier, "hyperNodes", hyperNodes)
}
stmt, hyperNode := alloc.selectBestHyperNode(jobStmtsByTier[selectedTier], job)
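// Record the (possibly new) LCA hyperNode of the job on its podgroup annotation so that later
// scheduling rounds, e.g. after pods are rescheduled, keep the job within the allowed topology tier.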
jobNewHyperNode := LCAHyperNodeMap[hyperNode]
if jobNewHyperNode != jobHyperNode {
job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] = jobNewHyperNode
}
return stmt, hyperNodesWithLeftTasks[hyperNode]
}
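For readers unfamiliar with the LCA lookup that the new branch above relies on, here is a minimal, self-contained sketch of the idea: walk up a parent map from both hyperNodes until their ancestor chains meet, then compare the LCA's tier against highestAllowedTier. The parent map, tier values, and function names below are illustrative assumptions, not Volcano's actual GetLCAHyperNode implementation.

package main

import "fmt"

// parent maps a hyperNode to its parent hyperNode; "" means it is a root.
// These values mirror the test topology added in this commit (s0 and s1 under s2) and are illustrative only.
var parent = map[string]string{"s0": "s2", "s1": "s2", "s2": ""}

// tier is a hypothetical tier lookup for the same topology.
var tier = map[string]int{"s0": 1, "s1": 1, "s2": 2}

// lcaHyperNode returns the lowest common ancestor of two hyperNodes in the parent map,
// or "" if they share no ancestor.
func lcaHyperNode(a, b string) string {
    ancestors := map[string]bool{}
    for n := a; n != ""; n = parent[n] {
        ancestors[n] = true
    }
    for n := b; n != ""; n = parent[n] {
        if ancestors[n] {
            return n
        }
    }
    return ""
}

func main() {
    jobHyperNode, candidate := "s0", "s1"
    highestAllowedTier := 1

    lca := lcaHyperNode(jobHyperNode, candidate)
    // Mirrors the new check in allocate.go: skip the candidate when the LCA spans too high a tier.
    if lca == "" || tier[lca] > highestAllowedTier {
        fmt.Printf("skip %s: LCA %q exceeds highestAllowedTier %d\n", candidate, lca, highestAllowedTier)
    } else {
        fmt.Printf("candidate %s acceptable, job LCA becomes %s\n", candidate, lca)
    }
}

With the test topology used later in this commit (s0 and s1 under s2 at tier 2), the LCA of s0 and s1 is s2, which is why the first new test case with highestTierAllowed set to 1 expects zero bindings.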

236 changes: 228 additions & 8 deletions pkg/scheduler/actions/allocate/allocate_test.go
@@ -49,6 +49,8 @@ import (
"volcano.sh/volcano/pkg/scheduler/plugins/proportion"
"volcano.sh/volcano/pkg/scheduler/uthelper"
"volcano.sh/volcano/pkg/scheduler/util"

topologyv1alpha1 "volcano.sh/apis/pkg/apis/topology/v1alpha1"
)

func TestMain(m *testing.M) {
@@ -277,10 +279,228 @@ func TestAllocateWithNetWorkTopologies(t *testing.T) {
}

tests := []uthelper.TestCommonStruct{
{
Name: "hard network topology constrain and job rescheduler, can not allocate job when cross highestTierAllowed tier",
PodGroups: []*schedulingv1.PodGroup{
util.BuildPodGroupWithNetWorkTopologies("pg1", "c1", "s0", "q1", 2, nil, schedulingv1.PodGroupInqueue, "hard", 1),
},
Pods: []*v1.Pod{
// should use different role, because allocate actions default to enable the role caches when predicate
util.BuildPod("c1", "p1", "s0-n1", v1.PodRunning, api.BuildResourceList("2", "4G"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, nil),
util.BuildPod("c1", "p2", "s0-n2", v1.PodRunning, api.BuildResourceList("2", "4G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, nil),
util.BuildPod("c1", "p3", "", v1.PodPending, api.BuildResourceList("2", "4G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, nil),
},
Nodes: []*v1.Node{
util.BuildNode("s0-n1", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
util.BuildNode("s0-n2", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
util.BuildNode("s1-n3", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
util.BuildNode("s1-n4", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
},
HyperNodesSetByTier: map[int]sets.Set[string]{1: sets.New[string]("s0", "s1"), 2: sets.New[string]("s2")},
HyperNodesMap: map[string]*api.HyperNodeInfo{
"s0": api.NewHyperNodeInfo(&topologyv1alpha1.HyperNode{
ObjectMeta: metav1.ObjectMeta{
Name: "s0",
},
Spec: topologyv1alpha1.HyperNodeSpec{
Tier: 1,
Members: []topologyv1alpha1.MemberSpec{
{
Type: topologyv1alpha1.MemberTypeNode,
Selector: topologyv1alpha1.MemberSelector{
ExactMatch: &topologyv1alpha1.ExactMatch{
Name: "s0-n1",
},
},
},
{
Type: topologyv1alpha1.MemberTypeNode,
Selector: topologyv1alpha1.MemberSelector{
ExactMatch: &topologyv1alpha1.ExactMatch{
Name: "s0-n2",
},
},
},
},
},
}),
"s1": api.NewHyperNodeInfo(&topologyv1alpha1.HyperNode{
ObjectMeta: metav1.ObjectMeta{
Name: "s1",
},
Spec: topologyv1alpha1.HyperNodeSpec{
Tier: 1,
Members: []topologyv1alpha1.MemberSpec{
{
Type: topologyv1alpha1.MemberTypeNode,
Selector: topologyv1alpha1.MemberSelector{
ExactMatch: &topologyv1alpha1.ExactMatch{
Name: "s1-n3",
},
},
},
{
Type: topologyv1alpha1.MemberTypeNode,
Selector: topologyv1alpha1.MemberSelector{
ExactMatch: &topologyv1alpha1.ExactMatch{
Name: "s1-n4",
},
},
},
},
},
}),
"s2": api.NewHyperNodeInfo(&topologyv1alpha1.HyperNode{
ObjectMeta: metav1.ObjectMeta{
Name: "s2",
},
Spec: topologyv1alpha1.HyperNodeSpec{
Tier: 2,
Members: []topologyv1alpha1.MemberSpec{
{
Type: topologyv1alpha1.MemberTypeHyperNode,
Selector: topologyv1alpha1.MemberSelector{
ExactMatch: &topologyv1alpha1.ExactMatch{
Name: "s0",
},
},
},
{
Type: topologyv1alpha1.MemberTypeHyperNode,
Selector: topologyv1alpha1.MemberSelector{
ExactMatch: &topologyv1alpha1.ExactMatch{
Name: "s1",
},
},
},
},
},
}),
},
HyperNodes: map[string]sets.Set[string]{
"s0": sets.New[string]("s0-n1", "s0-n2"),
"s1": sets.New[string]("s1-n3", "s1-n4"),
"s2": sets.New[string]("s0-n1", "s0-n2", "s1-n3", "s1-n4"),
},
Queues: []*schedulingv1.Queue{
util.BuildQueue("q1", 1, nil),
},
ExpectBindsNum: 0,
MinimalBindCheck: true,
},
{
Name: "hard network topology constrain and job rescheduler, can allocate job when highestTierAllowed not reached",
PodGroups: []*schedulingv1.PodGroup{
util.BuildPodGroupWithNetWorkTopologies("pg1", "c1", "s0", "q1", 2, nil, schedulingv1.PodGroupInqueue, "hard", 2),
},
Pods: []*v1.Pod{
// should use different role, because allocate actions default to enable the role caches when predicate
util.BuildPod("c1", "p1", "s0-n1", v1.PodRunning, api.BuildResourceList("2", "4G"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, nil),
util.BuildPod("c1", "p2", "s0-n2", v1.PodRunning, api.BuildResourceList("2", "4G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, nil),
util.BuildPod("c1", "p3", "", v1.PodPending, api.BuildResourceList("2", "4G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, nil),
},
Nodes: []*v1.Node{
util.BuildNode("s0-n1", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
util.BuildNode("s0-n2", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
util.BuildNode("s1-n3", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
util.BuildNode("s1-n4", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
},
HyperNodesSetByTier: map[int]sets.Set[string]{1: sets.New[string]("s0", "s1"), 2: sets.New[string]("s2")},
HyperNodesMap: map[string]*api.HyperNodeInfo{
"s0": api.NewHyperNodeInfo(&topologyv1alpha1.HyperNode{
ObjectMeta: metav1.ObjectMeta{
Name: "s0",
},
Spec: topologyv1alpha1.HyperNodeSpec{
Tier: 0,
Members: []topologyv1alpha1.MemberSpec{
{
Type: topologyv1alpha1.MemberTypeNode,
Selector: topologyv1alpha1.MemberSelector{
ExactMatch: &topologyv1alpha1.ExactMatch{
Name: "s0-n1",
},
},
},
{
Type: topologyv1alpha1.MemberTypeNode,
Selector: topologyv1alpha1.MemberSelector{
ExactMatch: &topologyv1alpha1.ExactMatch{
Name: "s0-n2",
},
},
},
},
},
}),
"s1": api.NewHyperNodeInfo(&topologyv1alpha1.HyperNode{
ObjectMeta: metav1.ObjectMeta{
Name: "s1",
},
Spec: topologyv1alpha1.HyperNodeSpec{
Tier: 0,
Members: []topologyv1alpha1.MemberSpec{
{
Type: topologyv1alpha1.MemberTypeNode,
Selector: topologyv1alpha1.MemberSelector{
ExactMatch: &topologyv1alpha1.ExactMatch{
Name: "s1-n3",
},
},
},
{
Type: topologyv1alpha1.MemberTypeNode,
Selector: topologyv1alpha1.MemberSelector{
ExactMatch: &topologyv1alpha1.ExactMatch{
Name: "s1-n4",
},
},
},
},
},
}),
"s2": api.NewHyperNodeInfo(&topologyv1alpha1.HyperNode{
ObjectMeta: metav1.ObjectMeta{
Name: "s2",
},
Spec: topologyv1alpha1.HyperNodeSpec{
Tier: 1,
Members: []topologyv1alpha1.MemberSpec{
{
Type: topologyv1alpha1.MemberTypeHyperNode,
Selector: topologyv1alpha1.MemberSelector{
ExactMatch: &topologyv1alpha1.ExactMatch{
Name: "s0",
},
},
},
{
Type: topologyv1alpha1.MemberTypeHyperNode,
Selector: topologyv1alpha1.MemberSelector{
ExactMatch: &topologyv1alpha1.ExactMatch{
Name: "s1",
},
},
},
},
},
}),
},
HyperNodes: map[string]sets.Set[string]{
"s0": sets.New[string]("s0-n1", "s0-n2"),
"s1": sets.New[string]("s1-n3", "s1-n4"),
"s2": sets.New[string]("s0-n1", "s0-n2", "s1-n3", "s1-n4"),
},
Queues: []*schedulingv1.Queue{
util.BuildQueue("q1", 1, nil),
},
ExpectBindsNum: 1,
MinimalBindCheck: true,
},
{
Name: "soft network topology constrain, can allocate job when resources are enough",
PodGroups: []*schedulingv1.PodGroup{
util.BuildPodGroupWithNetWorkTopologies("pg1", "c1", "q1", 3, nil, schedulingv1.PodGroupInqueue, "soft", 1),
util.BuildPodGroupWithNetWorkTopologies("pg1", "c1", "", "q1", 3, nil, schedulingv1.PodGroupInqueue, "soft", 1),
},
Pods: []*v1.Pod{
// should use different role, because allocate actions default to enable the role caches when predicate
@@ -309,7 +529,7 @@ func TestAllocateWithNetWorkTopologies(t *testing.T) {
{
Name: "hard network topology constrain, can not allocate job when cross highestTierAllowed tier",
PodGroups: []*schedulingv1.PodGroup{
util.BuildPodGroupWithNetWorkTopologies("pg1", "c1", "q1", 3, nil, schedulingv1.PodGroupInqueue, "hard", 1),
util.BuildPodGroupWithNetWorkTopologies("pg1", "c1", "", "q1", 3, nil, schedulingv1.PodGroupInqueue, "hard", 1),
},
Pods: []*v1.Pod{
// should use different role, because allocate actions default to enable the role caches when predicate
@@ -339,7 +559,7 @@ func TestAllocateWithNetWorkTopologies(t *testing.T) {
{
Name: "hard network topology constrain, can allocate job when highestTierAllowed not reached",
PodGroups: []*schedulingv1.PodGroup{
util.BuildPodGroupWithNetWorkTopologies("pg1", "c1", "q1", 3, nil, schedulingv1.PodGroupInqueue, "hard", 2),
util.BuildPodGroupWithNetWorkTopologies("pg1", "c1", "", "q1", 3, nil, schedulingv1.PodGroupInqueue, "hard", 2),
},
Pods: []*v1.Pod{
// should use different role, because allocate actions default to enable the role caches when predicate
@@ -368,7 +588,7 @@ func TestAllocateWithNetWorkTopologies(t *testing.T) {
{
Name: "hard network topology constrain, can allocate job when multi hyperNodes are available",
PodGroups: []*schedulingv1.PodGroup{
util.BuildPodGroupWithNetWorkTopologies("pg1", "c1", "q1", 2, nil, schedulingv1.PodGroupInqueue, "hard", 1),
util.BuildPodGroupWithNetWorkTopologies("pg1", "c1", "", "q1", 2, nil, schedulingv1.PodGroupInqueue, "hard", 1),
},
Pods: []*v1.Pod{
// should use different role, because allocate actions default to enable the role caches when predicate
@@ -396,7 +616,7 @@ func TestAllocateWithNetWorkTopologies(t *testing.T) {
{
Name: "hard network topology constrain, can allocate job when minavailiable < replicas",
PodGroups: []*schedulingv1.PodGroup{
util.BuildPodGroupWithNetWorkTopologies("pg1", "c1", "q1", 1, nil, schedulingv1.PodGroupInqueue, "hard", 1),
util.BuildPodGroupWithNetWorkTopologies("pg1", "c1", "", "q1", 1, nil, schedulingv1.PodGroupInqueue, "hard", 1),
},
Pods: []*v1.Pod{
// should use different role, because allocate actions default to enable the role caches when predicate
@@ -424,7 +644,7 @@ func TestAllocateWithNetWorkTopologies(t *testing.T) {
{
Name: "hard network topology constrain, two available hyperNodes, can allocate job to nodes with affinity",
PodGroups: []*schedulingv1.PodGroup{
util.BuildPodGroupWithNetWorkTopologies("pg1", "c1", "q1", 1, nil, schedulingv1.PodGroupInqueue, "hard", 1),
util.BuildPodGroupWithNetWorkTopologies("pg1", "c1", "", "q1", 1, nil, schedulingv1.PodGroupInqueue, "hard", 1),
},
Pods: []*v1.Pod{
util.BuildPod("c1", "p1", "", v1.PodPending, api.BuildResourceList("2", "4G"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, map[string]string{"nodeRole": "master"}),
@@ -490,8 +710,8 @@ func TestNodeLevelScoreWithNetWorkTopologies(t *testing.T) {
{
Name: "hard network topology constrain, allocate job to highest score hypeNode with node level binpack",
PodGroups: []*schedulingv1.PodGroup{
util.BuildPodGroupWithNetWorkTopologies("pg1", "c1", "q1", 2, nil, schedulingv1.PodGroupInqueue, "hard", 1),
util.BuildPodGroupWithNetWorkTopologies("pg2", "c1", "q1", 2, nil, schedulingv1.PodGroupRunning, "", 1),
util.BuildPodGroupWithNetWorkTopologies("pg1", "c1", "", "q1", 2, nil, schedulingv1.PodGroupInqueue, "hard", 1),
util.BuildPodGroupWithNetWorkTopologies("pg2", "c1", "", "q1", 2, nil, schedulingv1.PodGroupRunning, "", 1),
},
Pods: []*v1.Pod{
// should use different role, because allocate actions default to enable the role caches when predicate
4 changes: 2 additions & 2 deletions pkg/scheduler/api/hyper_node_info.go
@@ -68,9 +68,9 @@ func NewHyperNodesInfo(lister listerv1.NodeLister) *HyperNodesInfo {
// NewHyperNodesInfoWithCache initializes a new HyperNodesInfo instance with cache.
// This is just used for ut.
// TODO: abstract an interface to mock for ut.
func NewHyperNodesInfoWithCache(hyperNodesSetByTier map[int]sets.Set[string], realNodesSet map[string]sets.Set[string], ready *atomic.Bool) *HyperNodesInfo {
func NewHyperNodesInfoWithCache(hyperNodesMap map[string]*HyperNodeInfo, hyperNodesSetByTier map[int]sets.Set[string], realNodesSet map[string]sets.Set[string], ready *atomic.Bool) *HyperNodesInfo {
return &HyperNodesInfo{
hyperNodes: make(map[string]*HyperNodeInfo),
hyperNodes: hyperNodesMap,
hyperNodesSetByTier: hyperNodesSetByTier,
realNodesSet: realNodesSet,
ready: ready,
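Since the signature change above adds a leading hyperNodesMap parameter, a unit test that previously passed only the tier and node sets now also supplies the hyperNode infos. A rough sketch of such a call site is shown below; the package name, test name, and fixture values are assumptions for illustration, not taken from a specific test in this commit.

package api_test

import (
    "sync/atomic"
    "testing"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/sets"

    topologyv1alpha1 "volcano.sh/apis/pkg/apis/topology/v1alpha1"
    "volcano.sh/volcano/pkg/scheduler/api"
)

func TestNewHyperNodesInfoWithCache(t *testing.T) {
    ready := &atomic.Bool{}
    ready.Store(true)

    // hyperNodesMap is the new first parameter added by this commit.
    hyperNodesMap := map[string]*api.HyperNodeInfo{
        "s0": api.NewHyperNodeInfo(&topologyv1alpha1.HyperNode{
            ObjectMeta: metav1.ObjectMeta{Name: "s0"},
            Spec:       topologyv1alpha1.HyperNodeSpec{Tier: 1},
        }),
    }

    hni := api.NewHyperNodesInfoWithCache(
        hyperNodesMap,
        map[int]sets.Set[string]{1: sets.New[string]("s0")},
        map[string]sets.Set[string]{"s0": sets.New[string]("s0-n1", "s0-n2")},
        ready,
    )
    if hni == nil {
        t.Fatal("expected non-nil HyperNodesInfo")
    }
}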
10 changes: 10 additions & 0 deletions pkg/scheduler/api/node_info.go
@@ -25,6 +25,7 @@ import (
"k8s.io/klog/v2"
k8sframework "k8s.io/kubernetes/pkg/scheduler/framework"

"volcano.sh/apis/pkg/apis/scheduling"
"volcano.sh/apis/pkg/apis/scheduling/v1beta1"

"volcano.sh/volcano/pkg/scheduler/api/devices/nvidia/gpushare"
@@ -90,6 +91,15 @@ type NodeInfo struct {
ImageStates map[string]*k8sframework.ImageStateSummary
}

// PodGroupOldState records a podgroup's state before scheduling.
type PodGroupOldState struct {
// Status caches the podgroup status during a scheduling cycle.
// It should not be mutated after initialization.
Status map[JobID]scheduling.PodGroupStatus
// Annotations records the podgroup's old annotations, used to detect changes.
Annotations map[JobID]map[string]string
}

// FutureIdle returns resources that will be idle in the future:
//
// That is current idle resources plus released resources minus pipelined resources.
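PodGroupOldState above only declares the cached state; a hedged sketch of how its Annotations map might be used to detect that a job's LCA hyperNode annotation changed during a scheduling cycle follows. The stand-in types and the annotationChanged helper are illustrative, not part of the Volcano API.

package main

import "fmt"

// JobID and PodGroupOldState stand in for the scheduler API types; only the fields
// needed for this illustration are reproduced here.
type JobID string

type PodGroupOldState struct {
    Annotations map[JobID]map[string]string
}

const topologyAllocateLCAHyperNode = "volcano.sh/allocate-lca-hypernode"

// annotationChanged reports whether a job's LCA hyperNode annotation differs from the
// value recorded before the scheduling cycle, i.e. whether the cache needs an update.
func annotationChanged(old PodGroupOldState, jobID JobID, current map[string]string) bool {
    return old.Annotations[jobID][topologyAllocateLCAHyperNode] != current[topologyAllocateLCAHyperNode]
}

func main() {
    old := PodGroupOldState{Annotations: map[JobID]map[string]string{
        "job-1": {topologyAllocateLCAHyperNode: "s0"},
    }}
    current := map[string]string{topologyAllocateLCAHyperNode: "s2"}
    fmt.Println(annotationChanged(old, "job-1", current)) // true: the LCA moved from s0 to s2
}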
3 changes: 3 additions & 0 deletions pkg/scheduler/api/well_known_labels.go
@@ -42,4 +42,7 @@ const (

// topologyDecisionAnnotation is the key of topology decision about pod request resource
topologyDecisionAnnotation = "volcano.sh/topology-decision"

// TopologyAllocateLCAHyperNode is the annotation key recording the lowest common ancestor (LCA) hyperNode of the hyperNodes to which a job's tasks have been allocated.
TopologyAllocateLCAHyperNode = "volcano.sh/allocate-lca-hypernode"
)
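As a small illustration of how an annotation key like this is typically written onto a podgroup, the sketch below includes a nil-map guard, since assigning into a nil annotations map panics in Go; the PodGroupMeta type and setLCAHyperNode helper are hypothetical and not part of Volcano.

package main

import "fmt"

const TopologyAllocateLCAHyperNode = "volcano.sh/allocate-lca-hypernode"

// PodGroupMeta is a stand-in for the podgroup's object metadata used in this illustration.
type PodGroupMeta struct {
    Annotations map[string]string
}

// setLCAHyperNode records the job's LCA hyperNode, initializing the annotation map if needed.
func setLCAHyperNode(pg *PodGroupMeta, hyperNode string) {
    if pg.Annotations == nil {
        pg.Annotations = map[string]string{}
    }
    pg.Annotations[TopologyAllocateLCAHyperNode] = hyperNode
}

func main() {
    pg := &PodGroupMeta{}
    setLCAHyperNode(pg, "s2")
    fmt.Println(pg.Annotations[TopologyAllocateLCAHyperNode]) // "s2"
}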
4 changes: 4 additions & 0 deletions pkg/scheduler/cache/cache.go
@@ -65,6 +65,7 @@ import (
topologyinformerv1alpha1 "volcano.sh/apis/pkg/client/informers/externalversions/topology/v1alpha1"
"volcano.sh/volcano/cmd/scheduler/app/options"
"volcano.sh/volcano/pkg/features"
"volcano.sh/volcano/pkg/scheduler/api"
schedulingapi "volcano.sh/volcano/pkg/scheduler/api"
volumescheduling "volcano.sh/volcano/pkg/scheduler/capabilities/volumebinding"
"volcano.sh/volcano/pkg/scheduler/metrics"
@@ -1564,6 +1565,9 @@ func (sc *SchedulerCache) UpdateJobStatus(job *schedulingapi.JobInfo, updatePG b
if err != nil {
return nil, err
}
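// Propagate the LCA hyperNode annotation decided during allocation back to the cached job's
// podgroup, so the next scheduling round sees the topology decision made for the rescheduled pods.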
sc.Mutex.Lock()
sc.Jobs[job.UID].PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] = job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode]
sc.Mutex.Unlock()
job.PodGroup = pg
}
