Skip to content

Commit

Permalink
Add network-topology-aware plugin and hyperNode score callback
Browse files Browse the repository at this point in the history
Signed-off-by: ecosysbin <[email protected]>
  • Loading branch information
ecosysbin committed Jan 20, 2025
1 parent 67957a1 commit 6cd178c
Show file tree
Hide file tree
Showing 9 changed files with 2,544 additions and 19 deletions.
20 changes: 1 addition & 19 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package allocate

import (
"sort"
"time"

"k8s.io/klog/v2"
Expand All @@ -34,7 +33,6 @@ type Action struct {
session *framework.Session
// configured flag for error cache
enablePredicateErrorCache bool
hyperNodesTiers []int

// hyperNodeScoresByJob stores job total score for all available hyperNodes, this is used for accumulate
// all nodes' scores in each available hyperNode only when job has hard network topology constraints
Expand All @@ -45,7 +43,6 @@ type Action struct {
func New() *Action {
return &Action{
enablePredicateErrorCache: true, // default to enable it
hyperNodesTiers: []int{},
hyperNodeScoresByJob: make(map[string]map[string]float64),
}
}
Expand All @@ -61,26 +58,11 @@ func (alloc *Action) parseArguments(ssn *framework.Session) {
arguments.GetBool(&alloc.enablePredicateErrorCache, conf.EnablePredicateErrCacheKey)
}

func (alloc *Action) parseHyperNodesTiers(ssn *framework.Session) {
if ssn.HyperNodesSetByTier == nil || len(ssn.HyperNodesSetByTier) == 0 {
return
}

// sort to guarantee the traverse order is from down to top.
var tiers []int
for tier := range ssn.HyperNodesSetByTier {
tiers = append(tiers, tier)
}
sort.Ints(tiers)
alloc.hyperNodesTiers = tiers
}

func (alloc *Action) Execute(ssn *framework.Session) {
klog.V(5).Infof("Enter Allocate ...")
defer klog.V(5).Infof("Leaving Allocate ...")

alloc.parseArguments(ssn)
alloc.parseHyperNodesTiers(ssn)

// the allocation for pod may have many stages
// 1. pick a queue named Q (using ssn.QueueOrderFn)
Expand Down Expand Up @@ -241,7 +223,7 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu
jobAllocatedHyperNode := job.PodGroup.Annotations[api.JobAllocatedHyperNode]

// Find a suitable hyperNode in one tier from down to top every time to ensure that the selected hyperNode spans the least tier.
for _, tier := range alloc.hyperNodesTiers {
for _, tier := range ssn.HyperNodesTiers {
if tier > highestAllowedTier {
klog.V(4).ErrorS(nil, "Skip search for higher tier cause highest allowed tier reached", "jobName", job.UID, "highestAllowedTier", highestAllowedTier, "tier", tier)
break
Expand Down
252 changes: 252 additions & 0 deletions pkg/scheduler/actions/allocate/allocate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,132 @@ func TestAllocateWithNetWorkTopologies(t *testing.T) {
ExpectBindsNum: 1,
MinimalBindCheck: true,
},
{
Name: "hard network topology constrain and tasks in job rescheduled, can allocate job when highestTierAllowed not reached and hyperNodesInfo has three tier",
PodGroups: []*schedulingv1.PodGroup{
util.BuildPodGroupWithNetWorkTopologies("pg1", "c1", "s3", "q1", 2, nil, schedulingv1.PodGroupInqueue, "hard", 2),
},
Pods: []*v1.Pod{
// should use different role, because allocate actions default to enable the role caches when predicate
util.BuildPod("c1", "p1", "s3-n1", v1.PodRunning, api.BuildResourceList("2", "4G"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, nil),
util.BuildPod("c1", "p2", "s3-n2", v1.PodRunning, api.BuildResourceList("2", "4G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, nil),
util.BuildPod("c1", "p3", "", v1.PodPending, api.BuildResourceList("2", "4G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, nil),
},
Nodes: []*v1.Node{
util.BuildNode("s3-n1", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
util.BuildNode("s3-n2", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
util.BuildNode("s4-n1", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
util.BuildNode("s4-n2", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
util.BuildNode("s5-n1", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
util.BuildNode("s5-n2", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
util.BuildNode("s6-n1", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
util.BuildNode("s6-n2", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
},
HyperNodesSetByTier: map[int]sets.Set[string]{
1: sets.New[string]("s3", "s4", "s5", "s6"),
2: sets.New[string]("s1", "s2"),
3: sets.New[string]("s0")},
HyperNodesMap: map[string]*api.HyperNodeInfo{
"s0": api.NewHyperNodeInfo(api.BuildHyperNode("s0", 3, []api.MemberConfig{
{
Name: "s1",
Type: topologyv1alpha1.MemberTypeHyperNode,
Selector: "exact",
},
{
Name: "s2",
Type: topologyv1alpha1.MemberTypeHyperNode,
Selector: "exact",
},
})),
"s1": api.NewHyperNodeInfo(api.BuildHyperNode("s1", 2, []api.MemberConfig{
{
Name: "s3",
Type: topologyv1alpha1.MemberTypeHyperNode,
Selector: "exact",
},
{
Name: "s4",
Type: topologyv1alpha1.MemberTypeHyperNode,
Selector: "exact",
},
})),
"s2": api.NewHyperNodeInfo(api.BuildHyperNode("s2", 2, []api.MemberConfig{
{
Name: "s5",
Type: topologyv1alpha1.MemberTypeHyperNode,
Selector: "exact",
},
{
Name: "s6",
Type: topologyv1alpha1.MemberTypeHyperNode,
Selector: "exact",
},
})),
"s3": api.NewHyperNodeInfo(api.BuildHyperNode("s3", 1, []api.MemberConfig{
{
Name: "s3-n1",
Type: topologyv1alpha1.MemberTypeNode,
Selector: "exact",
},
{
Name: "s3-n2",
Type: topologyv1alpha1.MemberTypeNode,
Selector: "exact",
},
})),
"s4": api.NewHyperNodeInfo(api.BuildHyperNode("s4", 1, []api.MemberConfig{
{
Name: "s4-n1",
Type: topologyv1alpha1.MemberTypeNode,
Selector: "exact",
},
{
Name: "s4-n2",
Type: topologyv1alpha1.MemberTypeNode,
Selector: "exact",
},
})),
"s5": api.NewHyperNodeInfo(api.BuildHyperNode("s5", 1, []api.MemberConfig{
{
Name: "s5-n1",
Type: topologyv1alpha1.MemberTypeNode,
Selector: "exact",
},
{
Name: "s5-n2",
Type: topologyv1alpha1.MemberTypeNode,
Selector: "exact",
},
})),
"s6": api.NewHyperNodeInfo(api.BuildHyperNode("s6", 1, []api.MemberConfig{
{
Name: "s6-n1",
Type: topologyv1alpha1.MemberTypeNode,
Selector: "exact",
},
{
Name: "s6-n2",
Type: topologyv1alpha1.MemberTypeNode,
Selector: "exact",
},
})),
},
HyperNodes: map[string]sets.Set[string]{
"s0": sets.New[string]("s3-n1", "s3-n2", "s4-n1", "s4-n2", "s5-n1", "s5-n2", "s6-n1", "s6-n2"),
"s1": sets.New[string]("s3-n1", "s3-n2", "s4-n1", "s4-n2"),
"s2": sets.New[string]("s5-n1", "s5-n2", "s6-n1", "s6-n2"),
"s3": sets.New[string]("s3-n1", "s3-n2"),
"s4": sets.New[string]("s4-n1", "s4-n2"),
"s5": sets.New[string]("s5-n1", "s5-n2"),
"s6": sets.New[string]("s6-n1", "s6-n2"),
},
Queues: []*schedulingv1.Queue{
util.BuildQueue("q1", 1, nil),
},
ExpectBindsNum: 1,
MinimalBindCheck: true,
},
{
Name: "hard network topology constrain and tasks in job rescheduled, can not allocate job when cross highestTierAllowed tier",
PodGroups: []*schedulingv1.PodGroup{
Expand Down Expand Up @@ -687,6 +813,132 @@ func TestAllocateWithNetWorkTopologies(t *testing.T) {
ExpectBindsNum: 0,
MinimalBindCheck: true,
},
{
Name: "hard network topology constrain and tasks in job rescheduled, can not allocate job when cross highestTierAllowed tier and hyperNodesInfo has three tier",
PodGroups: []*schedulingv1.PodGroup{
util.BuildPodGroupWithNetWorkTopologies("pg1", "c1", "s3", "q1", 2, nil, schedulingv1.PodGroupInqueue, "hard", 1),
},
Pods: []*v1.Pod{
// should use different role, because allocate actions default to enable the role caches when predicate
util.BuildPod("c1", "p1", "s3-n1", v1.PodRunning, api.BuildResourceList("2", "4G"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, nil),
util.BuildPod("c1", "p2", "s3-n2", v1.PodRunning, api.BuildResourceList("2", "4G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, nil),
util.BuildPod("c1", "p3", "", v1.PodPending, api.BuildResourceList("2", "4G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, nil),
},
Nodes: []*v1.Node{
util.BuildNode("s3-n1", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
util.BuildNode("s3-n2", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
util.BuildNode("s4-n1", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
util.BuildNode("s4-n2", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
util.BuildNode("s5-n1", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
util.BuildNode("s5-n2", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
util.BuildNode("s6-n1", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
util.BuildNode("s6-n2", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
},
HyperNodesSetByTier: map[int]sets.Set[string]{
1: sets.New[string]("s3", "s4", "s5", "s6"),
2: sets.New[string]("s1", "s2"),
3: sets.New[string]("s0")},
HyperNodesMap: map[string]*api.HyperNodeInfo{
"s0": api.NewHyperNodeInfo(api.BuildHyperNode("s0", 3, []api.MemberConfig{
{
Name: "s1",
Type: topologyv1alpha1.MemberTypeHyperNode,
Selector: "exact",
},
{
Name: "s2",
Type: topologyv1alpha1.MemberTypeHyperNode,
Selector: "exact",
},
})),
"s1": api.NewHyperNodeInfo(api.BuildHyperNode("s1", 2, []api.MemberConfig{
{
Name: "s3",
Type: topologyv1alpha1.MemberTypeHyperNode,
Selector: "exact",
},
{
Name: "s4",
Type: topologyv1alpha1.MemberTypeHyperNode,
Selector: "exact",
},
})),
"s2": api.NewHyperNodeInfo(api.BuildHyperNode("s2", 2, []api.MemberConfig{
{
Name: "s5",
Type: topologyv1alpha1.MemberTypeHyperNode,
Selector: "exact",
},
{
Name: "s6",
Type: topologyv1alpha1.MemberTypeHyperNode,
Selector: "exact",
},
})),
"s3": api.NewHyperNodeInfo(api.BuildHyperNode("s3", 1, []api.MemberConfig{
{
Name: "s3-n1",
Type: topologyv1alpha1.MemberTypeNode,
Selector: "exact",
},
{
Name: "s3-n2",
Type: topologyv1alpha1.MemberTypeNode,
Selector: "exact",
},
})),
"s4": api.NewHyperNodeInfo(api.BuildHyperNode("s4", 1, []api.MemberConfig{
{
Name: "s4-n1",
Type: topologyv1alpha1.MemberTypeNode,
Selector: "exact",
},
{
Name: "s4-n2",
Type: topologyv1alpha1.MemberTypeNode,
Selector: "exact",
},
})),
"s5": api.NewHyperNodeInfo(api.BuildHyperNode("s5", 1, []api.MemberConfig{
{
Name: "s5-n1",
Type: topologyv1alpha1.MemberTypeNode,
Selector: "exact",
},
{
Name: "s5-n2",
Type: topologyv1alpha1.MemberTypeNode,
Selector: "exact",
},
})),
"s6": api.NewHyperNodeInfo(api.BuildHyperNode("s6", 1, []api.MemberConfig{
{
Name: "s6-n1",
Type: topologyv1alpha1.MemberTypeNode,
Selector: "exact",
},
{
Name: "s6-n2",
Type: topologyv1alpha1.MemberTypeNode,
Selector: "exact",
},
})),
},
HyperNodes: map[string]sets.Set[string]{
"s0": sets.New[string]("s3-n1", "s3-n2", "s4-n1", "s4-n2", "s5-n1", "s5-n2", "s6-n1", "s6-n2"),
"s1": sets.New[string]("s3-n1", "s3-n2", "s4-n1", "s4-n2"),
"s2": sets.New[string]("s5-n1", "s5-n2", "s6-n1", "s6-n2"),
"s3": sets.New[string]("s3-n1", "s3-n2"),
"s4": sets.New[string]("s4-n1", "s4-n2"),
"s5": sets.New[string]("s5-n1", "s5-n2"),
"s6": sets.New[string]("s6-n1", "s6-n2"),
},
Queues: []*schedulingv1.Queue{
util.BuildQueue("q1", 1, nil),
},
ExpectBindsNum: 0,
MinimalBindCheck: true,
},
{
Name: "hard network topology constrain and tasks in job rescheduled, can not allocate job when LCAHyperNode is empty",
PodGroups: []*schedulingv1.PodGroup{
Expand Down
2 changes: 2 additions & 0 deletions pkg/scheduler/conf/scheduler_conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ type PluginOption struct {
EnabledOverused *bool `yaml:"enabledOverused"`
// EnabledAllocatable defines whether allocatable is enabled
EnabledAllocatable *bool `yaml:"enabledAllocatable"`
// EnabledNetworkTopology defines whether network topology is enabled
EnabledNetworkTopology *bool `yaml:"enabledNetworkTopology"`
// Arguments defines the different arguments that can be given to different plugins
Arguments map[string]interface{} `yaml:"arguments"`
}
17 changes: 17 additions & 0 deletions pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package framework
import (
"context"
"fmt"
"sort"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
Expand Down Expand Up @@ -82,6 +83,7 @@ type Session struct {
// have the same topology domain, e.g., nodes under the same switch or tor, jobs allocated in the same
// hyperNode can gain a better performance, the lower the tier of hyperNode, the better performance.
HyperNodesSetByTier map[int]sets.Set[string]
HyperNodesTiers []int
// RealNodesList maps hyperNode Name -> nodes under the hyperNode.
RealNodesList map[string][]*api.NodeInfo
HyperNodesReadyToSchedule bool
Expand Down Expand Up @@ -202,6 +204,7 @@ func openSession(cache cache.Cache) *Session {
ssn.NodeList = util.GetNodeList(snapshot.Nodes, snapshot.NodeList)
ssn.HyperNodes = snapshot.HyperNodes
ssn.HyperNodesSetByTier = snapshot.HyperNodesSetByTier
parseHyperNodesTiers(ssn)
ssn.RealNodesList = util.GetRealNodesListByHyperNode(snapshot.RealNodesSet, snapshot.Nodes)
ssn.HyperNodesReadyToSchedule = snapshot.HyperNodesReadyToSchedule
ssn.Nodes = snapshot.Nodes
Expand All @@ -220,6 +223,20 @@ func openSession(cache cache.Cache) *Session {
return ssn
}

// parseHyperNodesTiers collects the tier keys of ssn.HyperNodesSetByTier,
// sorts them in ascending order so that callers traverse hyperNodes from the
// lowest tier (best locality) up to the highest, and stores the result in
// ssn.HyperNodesTiers. When no tiers are present, ssn.HyperNodesTiers is left
// unchanged (nil).
func parseHyperNodesTiers(ssn *Session) {
	if len(ssn.HyperNodesSetByTier) == 0 {
		return
	}

	// Gather every tier key, pre-sizing the slice to avoid growth copies,
	// then sort ascending to guarantee a down-to-top traverse order.
	tiers := make([]int, 0, len(ssn.HyperNodesSetByTier))
	for tier := range ssn.HyperNodesSetByTier {
		tiers = append(tiers, tier)
	}
	sort.Ints(tiers)

	ssn.HyperNodesTiers = tiers
}

// updateQueueStatus updates allocated field in queue status on session close.
func updateQueueStatus(ssn *Session) {
rootQueue := api.QueueID("root")
Expand Down
Loading

0 comments on commit 6cd178c

Please sign in to comment.