Skip to content

Commit

Permalink
compute GetFairMaxResource O(n) instead of NLog(N)
Browse files Browse the repository at this point in the history
  • Loading branch information
psantacl committed Aug 23, 2024
1 parent de7b37b commit 066797f
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 27 deletions.
4 changes: 3 additions & 1 deletion pkg/scheduler/objects/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -1177,6 +1177,7 @@ func (sq *Queue) sortQueues() []*Queue {
}
// Create a list of the queues with pending resources
sortedQueues := make([]*Queue, 0)
sortedMaxFairResources := make([]*resources.Resource, 0)
for _, child := range sq.GetCopyOfChildren() {
// a stopped queue cannot be scheduled
if child.IsStopped() {
Expand All @@ -1185,10 +1186,11 @@ func (sq *Queue) sortQueues() []*Queue {
// queue must have pending resources to be considered for scheduling
if resources.StrictlyGreaterThanZero(child.GetPendingResource()) {
sortedQueues = append(sortedQueues, child)
sortedMaxFairResources = append(sortedMaxFairResources, child.GetFairMaxResource())
}
}
// Sort the queues
sortQueue(sortedQueues, sq.getSortType(), sq.IsPrioritySortEnabled())
sortQueue(sortedQueues, sortedMaxFairResources, sq.getSortType(), sq.IsPrioritySortEnabled())

return sortedQueues
}
Expand Down
18 changes: 9 additions & 9 deletions pkg/scheduler/objects/sorters.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ import (
"github.com/apache/yunikorn-core/pkg/scheduler/policies"
)

func sortQueue(queues []*Queue, sortType policies.SortPolicy, considerPriority bool) {
func sortQueue(queues []*Queue, fairMaxResources []*resources.Resource, sortType policies.SortPolicy, considerPriority bool) {
sortingStart := time.Now()
if sortType == policies.FairSortPolicy {
if considerPriority {
sortQueuesByPriorityAndFairness(queues)
sortQueuesByPriorityAndFairness(queues, fairMaxResources)
} else {
sortQueuesByFairnessAndPriority(queues)
sortQueuesByFairnessAndPriority(queues, fairMaxResources)
}
} else {
if considerPriority {
Expand All @@ -53,7 +53,7 @@ func sortQueuesByPriority(queues []*Queue) {
})
}

func sortQueuesByPriorityAndFairness(queues []*Queue) {
func sortQueuesByPriorityAndFairness(queues []*Queue, fairMaxResources []*resources.Resource) {
sort.SliceStable(queues, func(i, j int) bool {
l := queues[i]
r := queues[j]
Expand All @@ -66,8 +66,8 @@ func sortQueuesByPriorityAndFairness(queues []*Queue) {
return false
}

comp := resources.CompUsageRatioSeparately(l.GetAllocatedResource(), l.GetGuaranteedResource(), l.GetFairMaxResource(),
r.GetAllocatedResource(), r.GetGuaranteedResource(), r.GetFairMaxResource())
comp := resources.CompUsageRatioSeparately(l.GetAllocatedResource(), l.GetGuaranteedResource(), fairMaxResources[i],
r.GetAllocatedResource(), r.GetGuaranteedResource(), fairMaxResources[j])

if comp == 0 {
return resources.StrictlyGreaterThan(resources.Sub(l.GetPendingResource(), r.GetPendingResource()), resources.Zero)
Expand All @@ -76,13 +76,13 @@ func sortQueuesByPriorityAndFairness(queues []*Queue) {
})
}

func sortQueuesByFairnessAndPriority(queues []*Queue) {
func sortQueuesByFairnessAndPriority(queues []*Queue, fairMaxResources []*resources.Resource) {
sort.SliceStable(queues, func(i, j int) bool {
l := queues[i]
r := queues[j]

comp := resources.CompUsageRatioSeparately(l.GetAllocatedResource(), l.GetGuaranteedResource(), l.GetFairMaxResource(),
r.GetAllocatedResource(), r.GetGuaranteedResource(), r.GetFairMaxResource())
comp := resources.CompUsageRatioSeparately(l.GetAllocatedResource(), l.GetGuaranteedResource(), fairMaxResources[i],
r.GetAllocatedResource(), r.GetGuaranteedResource(), fairMaxResources[j])
if comp == 0 {
lPriority := l.GetCurrentPriority()
rPriority := r.GetCurrentPriority()
Expand Down
53 changes: 36 additions & 17 deletions pkg/scheduler/objects/sorters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,19 @@ import (
"github.com/apache/yunikorn-core/pkg/scheduler/policies"
)

// verify queue ordering is working
func TestSortQueues(t *testing.T) {
// verify queue ordering is working when explicity guarantees are provided
func TestSortQueuesWithGuarantees(t *testing.T) {
root, err := createRootQueue(nil)
assert.NilError(t, err, "queue create failed")

var q0, q1, q2, q3 *Queue
var queues []*Queue
fairMaxResources := []*resources.Resource{resources.NewResourceFromMap(map[string]resources.Quantity{}),
resources.NewResourceFromMap(map[string]resources.Quantity{}),
resources.NewResourceFromMap(map[string]resources.Quantity{}),
resources.NewResourceFromMap(map[string]resources.Quantity{}),
}

q0, err = createManagedQueue(root, "q0", false, nil)
assert.NilError(t, err, "failed to create leaf queue")
q0.guaranteedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 500, "vcore": 500})
Expand All @@ -63,49 +69,50 @@ func TestSortQueues(t *testing.T) {

// fifo
queues = []*Queue{q0, q1, q2, q3}
sortQueue(queues, policies.FifoSortPolicy, false)

sortQueue(queues, fairMaxResources, policies.FifoSortPolicy, false)
assert.Equal(t, queueNames(queues), queueNames([]*Queue{q0, q1, q2, q3}), "fifo first")

queues = []*Queue{q0, q1, q2, q3}
sortQueue(queues, policies.FifoSortPolicy, true)
sortQueue(queues, fairMaxResources, policies.FifoSortPolicy, true)
assert.Equal(t, queueNames(queues), queueNames([]*Queue{q3, q0, q1, q2}), "fifo first - priority")

// fifo - different starting order
queues = []*Queue{q1, q3, q0, q2}
sortQueue(queues, policies.FifoSortPolicy, false)
sortQueue(queues, fairMaxResources, policies.FifoSortPolicy, false)
assert.Equal(t, queueNames(queues), queueNames([]*Queue{q1, q3, q0, q2}), "fifo second")

queues = []*Queue{q1, q3, q0, q2}
sortQueue(queues, policies.FifoSortPolicy, true)
sortQueue(queues, fairMaxResources, policies.FifoSortPolicy, true)
assert.Equal(t, queueNames(queues), queueNames([]*Queue{q3, q1, q0, q2}), "fifo second - priority")

// fairness ratios: q0:300/500=0.6, q1:200/300=0.67, q2:100/200=0.5, q3:100/200=0.5
queues = []*Queue{q0, q1, q2, q3}
sortQueue(queues, policies.FairSortPolicy, false)
sortQueue(queues, fairMaxResources, policies.FairSortPolicy, false)
assert.Equal(t, queueNames(queues), queueNames([]*Queue{q3, q2, q0, q1}), "fair first")

queues = []*Queue{q0, q1, q2, q3}
sortQueue(queues, policies.FairSortPolicy, true)
sortQueue(queues, fairMaxResources, policies.FairSortPolicy, true)
assert.Equal(t, queueNames(queues), queueNames([]*Queue{q3, q2, q0, q1}), "fair first - priority")

// fairness ratios: q0:200/500=0.4, q1:300/300=1, q2:100/200=0.5, q3:100/200=0.5
q0.allocatedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 200, "vcore": 200})
q1.allocatedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 300, "vcore": 300})
queues = []*Queue{q0, q1, q2, q3}
sortQueue(queues, policies.FairSortPolicy, false)
sortQueue(queues, fairMaxResources, policies.FairSortPolicy, false)
assert.Equal(t, queueNames(queues), queueNames([]*Queue{q0, q3, q2, q1}), "fair second")
queues = []*Queue{q0, q1, q2, q3}
sortQueue(queues, policies.FairSortPolicy, true)
sortQueue(queues, fairMaxResources, policies.FairSortPolicy, true)
assert.Equal(t, queueNames(queues), queueNames([]*Queue{q3, q0, q2, q1}), "fair second - priority")

// fairness ratios: q0:150/500=0.3, q1:120/300=0.4, q2:100/200=0.5, q3:100/200=0.5
q0.allocatedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 150, "vcore": 150})
q1.allocatedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 120, "vcore": 120})
queues = []*Queue{q0, q1, q2, q3}
sortQueue(queues, policies.FairSortPolicy, false)
sortQueue(queues, fairMaxResources, policies.FairSortPolicy, false)
assert.Equal(t, queueNames(queues), queueNames([]*Queue{q0, q1, q3, q2}), "fair third")
queues = []*Queue{q0, q1, q2, q3}
sortQueue(queues, policies.FairSortPolicy, true)
sortQueue(queues, fairMaxResources, policies.FairSortPolicy, true)
assert.Equal(t, queueNames(queues), queueNames([]*Queue{q3, q0, q1, q2}), "fair third - priority")

// fairness ratios: q0:400/800=0.5, q1:200/400= 0.5, q2:100/200=0.5, q3:100/200=0.5
Expand All @@ -114,12 +121,12 @@ func TestSortQueues(t *testing.T) {
q1.guaranteedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 400, "vcore": 300})
q1.allocatedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 200, "vcore": 150})
queues = []*Queue{q0, q1, q2, q3}
sortQueue(queues, policies.FairSortPolicy, false)
sortQueue(queues, fairMaxResources, policies.FairSortPolicy, false)
assert.Equal(t, queueNames(queues), queueNames([]*Queue{q3, q0, q1, q2}), "fair - pending resource")
}

// queue guaranteed resource is not set (same as a zero resource)
func TestNoQueueLimits(t *testing.T) {
func TestSortQueuesNoGuarantees(t *testing.T) {
root, err := createRootQueue(nil)
assert.NilError(t, err, "queue create failed")

Expand All @@ -145,12 +152,24 @@ func TestNoQueueLimits(t *testing.T) {
q3.currentPriority = 3

queues := []*Queue{q0, q1, q2, q3}
sortQueue(queues, policies.FairSortPolicy, true)
assert.Equal(t, queueNames(queues), queueNames([]*Queue{q3, q2, q0, q1}), "fair no limit first - priority")
fairMaxResources := []*resources.Resource{resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000, "vcore": 1000}),
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000, "vcore": 1000}),
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000, "vcore": 1000}),
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 1000, "vcore": 1000}),
}
sortQueue(queues, fairMaxResources, policies.FairSortPolicy, false)
assert.Equal(t, queueNames(queues), queueNames([]*Queue{q3, q2, q1, q0}), "fair no gaurantees first")

sortQueue(queues, fairMaxResources, policies.FairSortPolicy, true)
assert.Equal(t, queueNames(queues), queueNames([]*Queue{q3, q2, q0, q1}), "fair no gaurantees first - priority")

q0.allocatedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 200, "vcore": 200})
q1.allocatedResource = resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 300, "vcore": 300})
sortQueue(queues, policies.FairSortPolicy, true)

sortQueue(queues, fairMaxResources, policies.FairSortPolicy, false)
assert.Equal(t, queueNames(queues), queueNames([]*Queue{q3, q2, q0, q1}), "fair no gaurantees second")

sortQueue(queues, fairMaxResources, policies.FairSortPolicy, true)
assert.Equal(t, queueNames(queues), queueNames([]*Queue{q3, q2, q0, q1}), "fair no limit second - priority")
}

Expand Down

0 comments on commit 066797f

Please sign in to comment.