Skip to content

Commit

Permalink
rebase with pr:8704
Browse files Browse the repository at this point in the history
Signed-off-by: ecosysbin <[email protected]>
  • Loading branch information
ecosysbin committed Dec 27, 2024
1 parent f06b8c9 commit 5036976
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 64 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -190,5 +190,4 @@ replace (
k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.31.1
k8s.io/sample-cli-plugin => k8s.io/sample-cli-plugin v0.31.1
k8s.io/sample-controller => k8s.io/sample-controller v0.31.1
volcano.sh/apis => github.com/Monokaix/apis v0.0.0-20241205070443-2cc3b54d83c7
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Microsoft/go-winio v0.6.0 h1:slsWYD/zyx7lCXoZVlvQrj0hPTM1HI4+v1sIda2yDvg=
github.com/Microsoft/go-winio v0.6.0/go.mod h1:cTAf44im0RAYeL23bpB+fzCyDH2MJiz2BO69KH/soAE=
github.com/Monokaix/apis v0.0.0-20241205070443-2cc3b54d83c7 h1:Fb5bCm+ZygQ4Rrqt9x+uTG60kVlm41prlOpWwuWyHA4=
github.com/Monokaix/apis v0.0.0-20241205070443-2cc3b54d83c7/go.mod h1:XHIjTlHDMZTLRg2Y2JAkj85iP0iiet2tv+HfPQZrsHs=
github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cqUQ3I=
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/agiledragon/gomonkey/v2 v2.11.0 h1:5oxSgA+tC1xuGsrIorR+sYiziYltmJyEZ9qA25b6l5U=
Expand Down Expand Up @@ -512,3 +510,5 @@ sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E=
sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY=
stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c=
stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0=
volcano.sh/apis v1.10.0-alpha.0.0.20241218081838-e5d361b6bfbe h1:iHd1Xt36a7S47IFksuF0h9W9J4LKzhBEz0C9XbkBvB8=
volcano.sh/apis v1.10.0-alpha.0.0.20241218081838-e5d361b6bfbe/go.mod h1:XHIjTlHDMZTLRg2Y2JAkj85iP0iiet2tv+HfPQZrsHs=
44 changes: 8 additions & 36 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,13 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu
break
}
for _, hyperNodeName := range ssn.HyperNodesListByTier[tier] {
jobNewHyperNode, _ := util.FindLCAHyperNode(hyperNodeName, jobHyperNode, nil)
jobNewHyperNodeMap[hyperNodeName] = jobNewHyperNode
if jobHyperNode == "" {
// job first scheduler
jobNewHyperNodeMap[hyperNodeName] = hyperNodeName
} else {
jobNewHyperNode, _ := util.FindLCAHyperNode(hyperNodeName, jobHyperNode, nil)
jobNewHyperNodeMap[hyperNodeName] = jobNewHyperNode
}

nodes, ok := ssn.HyperNodes[hyperNodeName]
if !ok {
Expand Down Expand Up @@ -291,8 +296,6 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu
klog.V(4).InfoS("Find available hyperNodes for job", "jobName", job.UID, "tier", selectedTier, "hyperNodes", hyperNodes)
}
stmt, hyperNode := alloc.selectBestHyperNode(jobStmtsByTier[selectedTier], job)

// set the LCA hyperNode for job,next scheduling will use this hyperNode as the base for selecting hyperNode.
jobNewHyperNode := jobNewHyperNodeMap[hyperNode]
job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] = jobNewHyperNode
return stmt, hyperNodesWithLeftTasks[hyperNode]
Expand Down Expand Up @@ -489,6 +492,7 @@ func (alloc *Action) allocateResourcesForTask(stmt *framework.Statement, task *a
if task.InitResreq.LessEqual(node.Idle, api.Zero) {
klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name)
if err := stmt.Allocate(task, node); err != nil {
klog.Errorf("Failed to bind Task %v on %v in Session %v, err: %v",
task.UID, node.Name, alloc.session.UID, err)
if rollbackErr := stmt.UnAllocate(task); rollbackErr != nil {
klog.Errorf("Failed to unallocate Task %v on %v in Session %v for %v.",
Expand All @@ -501,38 +505,6 @@ func (alloc *Action) allocateResourcesForTask(stmt *framework.Statement, task *a
return
}

klog.V(3).Infof("Predicates failed in allocate for task <%s/%s> on node <%s> with limited resources",
task.Namespace, task.Name, node.Name)

// Allocate releasing resource to the task if any.
if task.InitResreq.LessEqual(node.FutureIdle(), api.Zero) {
klog.V(3).Infof("Pipelining Task <%v/%v> to node <%v> for <%v> on <%v>",
task.Namespace, task.Name, node.Name, task.InitResreq, node.Releasing)
if err := stmt.Pipeline(task, node.Name, false); err != nil {
klog.Errorf("Failed to pipeline Task %v on %v in Session %v for %v.",
task.UID, node.Name, alloc.session.UID, err)
} else {
metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time))
metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
=======
if ssn.JobReady(job) {
klog.V(3).InfoS("Job ready, return statement", "jobName", job.UID)
return stmt
} else {
if !ssn.JobPipelined(job) {
// Allocate idle resource to the task.
if task.InitResreq.LessEqual(node.Idle, api.Zero) {
klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name)
if err := stmt.Allocate(task, node); err != nil {
klog.Errorf("Failed to bind Task %v on %v in Session %v, err: %v",
task.UID, node.Name, alloc.session.UID, err)
} else {
metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time))
metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
}
return
}

klog.V(3).Infof("Predicates failed in allocate for task <%s/%s> on node <%s> with limited resources",
task.Namespace, task.Name, node.Name)

Expand Down
115 changes: 92 additions & 23 deletions pkg/scheduler/util/scheduler_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,29 +137,31 @@ func PrioritizeHyperNodes(candidateHyperNodes map[string][]*api.NodeInfo, nodeSc
if err != nil {
return nil, err
}

// plugin scores of hyperNode.
for pluginName, scores := range mapScores {
for hyperNode, score := range scores {
klog.V(5).InfoS("Add plugin score at hypeNode", "jobName", job.UID, "pluginName", pluginName, "hyperNodeName", hyperNode, "score", score)
hyperNodesScoreMap[hyperNode] += score
}
}

// accumulate node scores in NodeOrder and hyperNode score itself as the final score of each hyperNode.
for hyperNodeName, score := range nodeScoresInHyperNode {
klog.V(5).InfoS("Add node level scores to final hyperNode score", "jobName", job.UID, "hyperNodeName", hyperNodeName, "score", score)
hyperNodesScoreMap[hyperNodeName] += score
}

hyperNodeScores := make(map[float64][]string)
hyperNodeScoreMap := make(map[string]float64)
for hyperNodeName := range candidateHyperNodes {
// If no plugin is applied to this node, the default is 0.0
score := 0.0
if value, ok := hyperNodesScoreMap[hyperNodeName]; ok {
score += value
}
hyperNodeScores[score] = append(hyperNodeScores[score], hyperNodeName)

if klog.V(5).Enabled() {
hyperNodeScoreMap[hyperNodeName] = score
}
}

klog.V(5).InfoS("Prioritize hyperNode score map for job", "jobName", job.UID, "scoreMap", hyperNodeScoreMap)
return hyperNodeScores, nil
}
Expand Down Expand Up @@ -215,24 +217,6 @@ func SelectBestHyperNode(hyperNodeScores map[float64][]string) string {
return bestHyperNodes[rand.Intn(len(bestHyperNodes))]
}

// SelectBestHyperNode return the best hyperNode name whose score is highest, pick one randomly if there are many hyperNodes with same score.
func SelectBestHyperNode(hyperNodeScores map[float64][]string) string {
var bestHyperNodes []string
maxScore := -1.0
for score, hyperNodes := range hyperNodeScores {
if score > maxScore {
maxScore = score
bestHyperNodes = hyperNodes
}
}

if len(bestHyperNodes) == 0 {
return ""
}

return bestHyperNodes[rand.Intn(len(bestHyperNodes))]
}

// GetNodeList returns values of the map 'nodes'
func GetNodeList(nodes map[string]*api.NodeInfo, nodeList []string) []*api.NodeInfo {
result := make([]*api.NodeInfo, 0, len(nodeList))
Expand Down Expand Up @@ -306,3 +290,88 @@ func ConvertRes2ResList(res *api.Resource) v1.ResourceList {
}
return rl
}

// Find the hyperNode to which the node belongs.
func FindHyperNodeOfNode(nodeName string, hyperNodeTree []map[string][]string) string {
for hyperNode, nodes := range hyperNodeTree[len(hyperNodeTree)-1] {
for _, node := range nodes {
if node == nodeName {
return hyperNode
}
}
}
return ""
}

func FindJobTaskNumOfHyperNode(hyperNodeName string, job *api.JobInfo, hyperNodeTree []map[string][]string) int {
revertHyperNodeTree := make([]map[string][]string, len(hyperNodeTree))
for i := len(hyperNodeTree) - 1; i >= 0; i-- {
revertHyperNodeTree[len(hyperNodeTree)-1-i] = hyperNodeTree[i]
}

hyperNodesMap := make(map[string]sets.Set[string])
for i := 0; i < len(revertHyperNodeTree); i++ {
for name, children := range revertHyperNodeTree[i] {
hyperNodesMap[name] = sets.Set[string]{}
hyperNodesMap[name].Insert(name)
for _, child := range children {
hyperNodesMap[name].Insert(child)
if v, ok := hyperNodesMap[child]; ok {
hyperNodesMap[name] = hyperNodesMap[name].Union(v)
}
}
}
}
// find the hyperNodeMap of the hyperNodeName
hyperNodeMap := hyperNodesMap[hyperNodeName]
taskCount := 0
for _, task := range job.Tasks {
if hyperNodeMap.Has(task.NodeName) {
taskCount++
}
}
return taskCount
}

// FindOutRootHyperNode find out the root hypernode of the job when the hypernode join the job.
func FindLCAHyperNode(hyperNodeName string, jobHyperNode string, hyperNodeTree []map[string][]string) (string, int) {
revertHyperNodeTree := make([]map[string][]string, len(hyperNodeTree))
for i := len(hyperNodeTree) - 1; i >= 0; i-- {
revertHyperNodeTree[len(hyperNodeTree)-1-i] = hyperNodeTree[i]
}

hyperNodesMap := make(map[string]sets.Set[string])
for i := 0; i < len(revertHyperNodeTree); i++ {
for name, children := range revertHyperNodeTree[i] {
hyperNodesMap[name] = sets.Set[string]{}
hyperNodesMap[name].Insert(name)
for _, child := range children {
hyperNodesMap[name].Insert(child)
if v, ok := hyperNodesMap[child]; ok {
hyperNodesMap[name] = hyperNodesMap[name].Union(v)
}
}
}
}

hyperNodesListByTier := [][]string{}
for i := 0; i < len(revertHyperNodeTree); i++ {
hyperNodes := []string{}
for name := range revertHyperNodeTree[i] {
hyperNodes = append(hyperNodes, name)
}
hyperNodesListByTier = append(hyperNodesListByTier, hyperNodes)
}

for index, tierHyperNodes := range hyperNodesListByTier {
for _, hyperNode := range tierHyperNodes {
hyperNodeSet := hyperNodesMap[hyperNode]
if hyperNodeSet.Has(hyperNodeName) {
if jobHyperNode == "" || hyperNodeSet.Has(jobHyperNode) {
return hyperNode, index + 1
}
}
}
}
return "", -1
}
2 changes: 0 additions & 2 deletions pkg/scheduler/util/scheduler_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ package util
import (
"testing"

"k8s.io/apimachinery/pkg/util/sets"

"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/util/sets"
Expand Down

0 comments on commit 5036976

Please sign in to comment.