Skip to content

Commit

Permalink
Add network-topology-aware plugin and hyperNode score callback
Browse files Browse the repository at this point in the history
Signed-off-by: ecosysbin <[email protected]>
  • Loading branch information
ecosysbin committed Jan 20, 2025
1 parent 67957a1 commit 325f845
Show file tree
Hide file tree
Showing 8 changed files with 2,176 additions and 19 deletions.
20 changes: 1 addition & 19 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package allocate

import (
"sort"
"time"

"k8s.io/klog/v2"
Expand All @@ -34,7 +33,6 @@ type Action struct {
session *framework.Session
// configured flag for error cache
enablePredicateErrorCache bool
hyperNodesTiers []int

// hyperNodeScoresByJob stores the job's total score for every available hyperNode. It is used to accumulate
// all nodes' scores in each available hyperNode, and only when the job has hard network topology constraints.
Expand All @@ -45,7 +43,6 @@ type Action struct {
// New returns an Action with the predicate error cache enabled by default,
// an empty hyperNodesTiers slice and an initialized (empty)
// hyperNodeScoresByJob map.
func New() *Action {
return &Action{
enablePredicateErrorCache: true, // default to enable it
hyperNodesTiers: []int{},
hyperNodeScoresByJob: make(map[string]map[string]float64),
}
}
Expand All @@ -61,26 +58,11 @@ func (alloc *Action) parseArguments(ssn *framework.Session) {
arguments.GetBool(&alloc.enablePredicateErrorCache, conf.EnablePredicateErrCacheKey)
}

// parseHyperNodesTiers caches the tier numbers found as keys of
// ssn.HyperNodesSetByTier on the action, sorted in ascending order.
// When the session has no tiers, alloc.hyperNodesTiers is left untouched.
func (alloc *Action) parseHyperNodesTiers(ssn *framework.Session) {
	// len of a nil map is 0, so a separate nil check is unnecessary.
	if len(ssn.HyperNodesSetByTier) == 0 {
		return
	}

	// The number of tiers is known up front, so allocate the slice once.
	tiers := make([]int, 0, len(ssn.HyperNodesSetByTier))
	for tier := range ssn.HyperNodesSetByTier {
		tiers = append(tiers, tier)
	}
	// Map iteration order is random; sort so that tiers are traversed from
	// the lowest to the highest.
	sort.Ints(tiers)
	alloc.hyperNodesTiers = tiers
}

func (alloc *Action) Execute(ssn *framework.Session) {
klog.V(5).Infof("Enter Allocate ...")
defer klog.V(5).Infof("Leaving Allocate ...")

alloc.parseArguments(ssn)
alloc.parseHyperNodesTiers(ssn)

// the allocation for pod may have many stages
// 1. pick a queue named Q (using ssn.QueueOrderFn)
Expand Down Expand Up @@ -241,7 +223,7 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu
jobAllocatedHyperNode := job.PodGroup.Annotations[api.JobAllocatedHyperNode]

// Search one tier at a time, from the lowest tier to the highest, so that the selected hyperNode spans the fewest tiers.
for _, tier := range alloc.hyperNodesTiers {
for _, tier := range ssn.HyperNodesTiers {
if tier > highestAllowedTier {
klog.V(4).ErrorS(nil, "Skip search for higher tier cause highest allowed tier reached", "jobName", job.UID, "highestAllowedTier", highestAllowedTier, "tier", tier)
break
Expand Down
2 changes: 2 additions & 0 deletions pkg/scheduler/conf/scheduler_conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ type PluginOption struct {
EnabledOverused *bool `yaml:"enabledOverused"`
// EnabledAllocatable defines whether allocatable is enabled
EnabledAllocatable *bool `yaml:"enabledAllocatable"`
// EnabledNetworkTopology defines whether network topology is enabled
EnabledNetworkTopology *bool `yaml:"enabledNetworkTopology"`
// Arguments defines the different arguments that can be given to different plugins
Arguments map[string]interface{} `yaml:"arguments"`
}
17 changes: 17 additions & 0 deletions pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package framework
import (
"context"
"fmt"
"sort"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
Expand Down Expand Up @@ -82,6 +83,7 @@ type Session struct {
// have the same topology domain, e.g., nodes under the same switch or tor, jobs allocated in the same
// hyperNode can gain a better performance, the lower the tier of hyperNode, the better performance.
HyperNodesSetByTier map[int]sets.Set[string]
HyperNodesTiers []int
// RealNodesList maps hyperNode Name -> nodes under the hyperNode.
RealNodesList map[string][]*api.NodeInfo
HyperNodesReadyToSchedule bool
Expand Down Expand Up @@ -202,6 +204,7 @@ func openSession(cache cache.Cache) *Session {
ssn.NodeList = util.GetNodeList(snapshot.Nodes, snapshot.NodeList)
ssn.HyperNodes = snapshot.HyperNodes
ssn.HyperNodesSetByTier = snapshot.HyperNodesSetByTier
parseHyperNodesTiers(ssn)
ssn.RealNodesList = util.GetRealNodesListByHyperNode(snapshot.RealNodesSet, snapshot.Nodes)
ssn.HyperNodesReadyToSchedule = snapshot.HyperNodesReadyToSchedule
ssn.Nodes = snapshot.Nodes
Expand All @@ -220,6 +223,20 @@ func openSession(cache cache.Cache) *Session {
return ssn
}

// parseHyperNodesTiers fills ssn.HyperNodesTiers with the tier numbers found
// as keys of ssn.HyperNodesSetByTier, sorted in ascending order.
// When the session has no tiers, ssn.HyperNodesTiers is left untouched.
func parseHyperNodesTiers(ssn *Session) {
	if len(ssn.HyperNodesSetByTier) == 0 {
		return
	}

	// The number of tiers is known up front, so allocate the slice once.
	tiers := make([]int, 0, len(ssn.HyperNodesSetByTier))
	for tier := range ssn.HyperNodesSetByTier {
		tiers = append(tiers, tier)
	}
	// Map iteration order is random; sort so that tiers are traversed from
	// the lowest to the highest.
	sort.Ints(tiers)
	ssn.HyperNodesTiers = tiers
}

// updateQueueStatus updates allocated field in queue status on session close.
func updateQueueStatus(ssn *Session) {
rootQueue := api.QueueID("root")
Expand Down
210 changes: 210 additions & 0 deletions pkg/scheduler/plugins/networktopologyaware/networktopologyaware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
/*
Copyright 2019 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package networktopologyaware

import (
"k8s.io/klog/v2"

"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/framework"
"volcano.sh/volcano/pkg/scheduler/util"
)

const (
// PluginName indicates name of volcano scheduler plugin.
PluginName = "networktopologyaware"
// BaseScore is the score of a hyperNode that equals the job's allocated hyperNode; it is also the
// factor by which the normalized tier score is multiplied.
BaseScore = 100.0
// TaskBaseScore is the factor by which the normalized task-number score is multiplied.
TaskBaseScore = 10.0
// ZeroScore is the default score and the fallback used when no score can be computed.
ZeroScore = 0.0
// NetworkTopologyWeight is the plugin argument key from which the score weight is read.
NetworkTopologyWeight = "weight"
)

// networkTopologyAwarePlugin scores hyperNodes and nodes for a job relative to the hyperNode
// recorded in the job's api.JobAllocatedHyperNode annotation.
type networkTopologyAwarePlugin struct {
// Arguments given for the plugin
pluginArguments framework.Arguments
// Embedded tier bounds (minTier/maxTier), set from ssn.HyperNodesTiers in OnSessionOpen.
*hyperNodesTier
}

// hyperNodesTier records the lowest and highest hyperNode tier known to the
// plugin. Both fields keep their zero value until init sees a non-empty list.
type hyperNodesTier struct {
	maxTier int
	minTier int
}

// init sets minTier and maxTier to the smallest and largest value found in
// hyperNodesSetByTier, which despite its name is a plain list of tier numbers.
// An empty (or nil) list leaves both fields unchanged.
//
// The list is scanned rather than read at its two ends, so the result does
// not depend on the caller having sorted it; for an ascending-sorted list the
// outcome is the same as taking the first and last element.
func (h *hyperNodesTier) init(hyperNodesSetByTier []int) {
	if len(hyperNodesSetByTier) == 0 {
		return
	}

	lowest, highest := hyperNodesSetByTier[0], hyperNodesSetByTier[0]
	for _, tier := range hyperNodesSetByTier[1:] {
		if tier < lowest {
			lowest = tier
		}
		if tier > highest {
			highest = tier
		}
	}
	h.minTier, h.maxTier = lowest, highest
}

// New builds the network-topology-aware plugin from the given arguments.
// The embedded tier bounds start at their zero values and are filled in by
// OnSessionOpen.
func New(arguments framework.Arguments) framework.Plugin {
	plugin := &networkTopologyAwarePlugin{
		hyperNodesTier:  &hyperNodesTier{},
		pluginArguments: arguments,
	}
	return plugin
}

// Name returns the plugin name, PluginName.
func (nta *networkTopologyAwarePlugin) Name() string {
return PluginName
}

// calculateWeight reads the score weight from the plugin arguments under the
// NetworkTopologyWeight key, starting from a default of 1.
// NOTE(review): presumably GetInt leaves the default in place when the key is
// missing or cannot be parsed — confirm against framework.Arguments.
func calculateWeight(args framework.Arguments) int {
weight := 1
args.GetInt(&weight, NetworkTopologyWeight)
return weight
}

// OnSessionOpen reads the plugin weight, records the lowest and highest tier
// from ssn.HyperNodesTiers, and registers two scoring callbacks on the
// session: one that scores candidate hyperNodes for a job and one that scores
// candidate nodes for a task. Both callbacks score relative to the hyperNode
// stored in the job's api.JobAllocatedHyperNode annotation and return
// ZeroScore for every candidate when that annotation is empty.
func (nta *networkTopologyAwarePlugin) OnSessionOpen(ssn *framework.Session) {
	klog.V(5).Infof("Enter networkTopologyAwarePlugin plugin ...")
	defer func() {
		klog.V(5).Infof("Leaving networkTopologyAware plugin ...")
	}()

	weight := calculateWeight(nta.pluginArguments)
	nta.hyperNodesTier.init(ssn.HyperNodesTiers)

	hyperNodeFn := func(job *api.JobInfo, hyperNodes map[string][]*api.NodeInfo) (map[string]float64, error) {
		hyperNodeScores := make(map[string]float64, len(hyperNodes))
		jobAllocatedHyperNode := job.PodGroup.GetAnnotations()[api.JobAllocatedHyperNode]
		if jobAllocatedHyperNode == "" {
			for hyperNode := range hyperNodes {
				hyperNodeScores[hyperNode] = ZeroScore
			}
			return hyperNodeScores, nil
		}

		// The job already has an allocated hyperNode: score every candidate
		// by the tier of its LCA hyperNode with the allocated one, and count
		// how many candidates share the highest score.
		maxScore := ZeroScore
		maxScoreCount := 0
		for hyperNode := range hyperNodes {
			score := nta.networkTopologyAwareScore(hyperNode, jobAllocatedHyperNode, ssn.HyperNodes)
			score *= float64(weight)
			hyperNodeScores[hyperNode] = score
			switch {
			case score > maxScore:
				maxScore, maxScoreCount = score, 1
			case score == maxScore:
				maxScoreCount++
			}
		}

		// When more than one hyperNode shares the highest score, break the
		// tie with a score based on the number of the job's tasks in each.
		// Unlike nodeFn, this tie-break is applied even when the highest
		// score is ZeroScore.
		if maxScoreCount > 1 {
			for hyperNode, score := range hyperNodeScores {
				if score == maxScore {
					taskNumScore := nta.networkTopologyAwareScoreWithTaskNum(hyperNode, job, ssn.RealNodesList)
					taskNumScore *= float64(weight)
					hyperNodeScores[hyperNode] += taskNumScore
				}
			}
		}

		klog.V(1).Infof("networkTopologyAware score is: %v", hyperNodeScores)
		return hyperNodeScores, nil
	}

	nodeFn := func(task *api.TaskInfo, nodes []*api.NodeInfo) (map[string]float64, error) {
		nodeScores := make(map[string]float64, len(nodes))
		for _, node := range nodes {
			nodeScores[node.Name] = ZeroScore
		}

		// A task whose job is not in the session cannot be scored; return
		// the zero scores instead of dereferencing a nil job.
		taskJob := ssn.Jobs[task.Job]
		if taskJob == nil {
			klog.V(4).Infof("Job <%v> of the task is not found in session, skip network topology scoring", task.Job)
			return nodeScores, nil
		}

		jobAllocatedHyperNode := taskJob.PodGroup.GetAnnotations()[api.JobAllocatedHyperNode]
		if jobAllocatedHyperNode == "" {
			return nodeScores, nil
		}

		// The job already has an allocated hyperNode: score every node by
		// the tier of the LCA hyperNode between the node's hyperNode and the
		// allocated one. The node's hyperNode is remembered so the tie-break
		// below does not have to look it up a second time.
		maxScore := ZeroScore
		maxScoreCount := 0
		hyperNodeOfNode := make(map[string]string, len(nodes))
		for _, node := range nodes {
			hyperNode := util.FindHyperNodeOfNode(node.Name, ssn.RealNodesList, ssn.HyperNodesTiers, ssn.HyperNodesSetByTier)
			hyperNodeOfNode[node.Name] = hyperNode
			score := nta.networkTopologyAwareScore(hyperNode, jobAllocatedHyperNode, ssn.HyperNodes)
			score *= float64(weight)
			nodeScores[node.Name] = score
			switch {
			case score > maxScore:
				maxScore, maxScoreCount = score, 1
			case score == maxScore:
				maxScoreCount++
			}
		}

		// When more than one node shares a non-zero highest score, break the
		// tie with a score based on the number of the job's tasks in the
		// node's hyperNode.
		if maxScore != ZeroScore && maxScoreCount > 1 {
			for node, score := range nodeScores {
				if score == maxScore {
					taskNumScore := nta.networkTopologyAwareScoreWithTaskNum(hyperNodeOfNode[node], taskJob, ssn.RealNodesList)
					taskNumScore *= float64(weight)
					nodeScores[node] += taskNumScore
				}
			}
		}

		klog.V(1).Infof("networkTopologyAware score is: %v", nodeScores)
		return nodeScores, nil
	}

	ssn.AddHyperNodeOrederFn(nta.Name(), hyperNodeFn)
	ssn.AddBatchNodeOrderFn(nta.Name(), nodeFn)
}

// OnSessionClose performs no work when the session closes.
func (nta *networkTopologyAwarePlugin) OnSessionClose(ssn *framework.Session) {
}

// networkTopologyAwareScore scores hyperNodeName against the hyperNode the
// job is already allocated to, following a best-fit policy.
//
// Goals:
//   - The tier of the LCA hyperNode (presumably "lowest common ancestor") of
//     the candidate and the job's allocated hyperNode should be as low as
//     possible.
//
// It returns BaseScore when the candidate is the allocated hyperNode itself,
// ZeroScore when the LCA hyperNode is not present in hyperNodeMap, and
// otherwise BaseScore scaled by the normalized tier of the LCA hyperNode.
func (nta *networkTopologyAwarePlugin) networkTopologyAwareScore(hyperNodeName, jobAllocatedHyperNode string, hyperNodeMap api.HyperNodeInfoMap) float64 {
	if hyperNodeName == jobAllocatedHyperNode {
		return BaseScore
	}

	// Score by the tier of the LCA hyperNode, if it is known.
	lcaName := hyperNodeMap.GetLCAHyperNode(hyperNodeName, jobAllocatedHyperNode)
	if lcaInfo, found := hyperNodeMap[lcaName]; found {
		return BaseScore * scoreHyperNodeWithTier(lcaInfo.Tier(), nta.minTier, nta.maxTier)
	}
	return ZeroScore
}

// networkTopologyAwareScoreWithTaskNum scores a hyperNode by the share of the
// job's tasks that util.FindJobTaskNumOfHyperNode reports for it.
//
// Goals:
//   - Tasks under a job should be scheduled to one hyperNode as much as possible.
//
// It returns ZeroScore when the job has no tasks, otherwise TaskBaseScore
// multiplied by the ratio of that task count to len(job.Tasks).
func (nta *networkTopologyAwarePlugin) networkTopologyAwareScoreWithTaskNum(hyperNodeName string, job *api.JobInfo, realNodesList map[string][]*api.NodeInfo) float64 {
	taskNum := util.FindJobTaskNumOfHyperNode(hyperNodeName, job, realNodesList)
	totalTasks := len(job.Tasks)
	if totalTasks <= 0 {
		return ZeroScore
	}
	return TaskBaseScore * scoreHyperNodeWithTaskNum(taskNum, totalTasks)
}

// scoreHyperNodeWithTier converts a tier into a score: minTier yields 1,
// sumTier yields 0, and tiers in between are interpolated linearly, so lower
// tiers score higher. Despite its name, sumTier is the upper bound of the
// range (the only caller in this file passes the plugin's maxTier).
// When minTier equals sumTier there is no range to interpolate over and
// ZeroScore is returned.
func scoreHyperNodeWithTier(tier int, minTier int, sumTier int) float64 {
	span := sumTier - minTier
	if span == 0 {
		return ZeroScore
	}
	return float64(sumTier-tier) / float64(span)
}

// scoreHyperNodeWithTaskNum returns the task distribution rate
// taskNum/allTaskNum as a score, or ZeroScore when allTaskNum is 0 so that
// the division is never performed with a zero denominator.
func scoreHyperNodeWithTaskNum(taskNum int, allTaskNum int) float64 {
	if allTaskNum != 0 {
		return float64(taskNum) / float64(allTaskNum)
	}
	return ZeroScore
}
Loading

0 comments on commit 325f845

Please sign in to comment.