From 300304e27888bcbc386d787bccd539e3969dfca6 Mon Sep 17 00:00:00 2001 From: Peter Bacsko Date: Tue, 29 Oct 2024 15:29:28 +0100 Subject: [PATCH] [YUNIKORN-2928] [core] Update foreign pod resource usage --- pkg/scheduler/health_checker_test.go | 2 +- pkg/scheduler/objects/node.go | 25 +++++++++ pkg/scheduler/objects/node_test.go | 24 ++++++++ pkg/scheduler/partition.go | 32 ++++++++--- pkg/scheduler/partition_test.go | 80 +++++++++++++++++++++++---- pkg/scheduler/tests/operation_test.go | 62 ++++++++++++++------- pkg/scheduler/utilities_test.go | 6 +- 7 files changed, 190 insertions(+), 41 deletions(-) diff --git a/pkg/scheduler/health_checker_test.go b/pkg/scheduler/health_checker_test.go index 02402fc9f..e4017828e 100644 --- a/pkg/scheduler/health_checker_test.go +++ b/pkg/scheduler/health_checker_test.go @@ -230,7 +230,7 @@ func TestGetSchedulerHealthStatusContext(t *testing.T) { assert.Assert(t, healthInfo.HealthChecks[8].Succeeded, "The orphan allocation check on the app still fails after removing the app") // check that foreign allocation does not interfere with health check - falloc := newForeignAllocation("foreign-1", "node") + falloc := newForeignAllocation("foreign-1", "node", resources.Zero) node.AddAllocation(falloc) healthInfo = GetSchedulerHealthStatus(schedulerMetrics, schedulerContext) assert.Assert(t, healthInfo.HealthChecks[7].Succeeded, "Foreign allocation was detected as orphan") diff --git a/pkg/scheduler/objects/node.go b/pkg/scheduler/objects/node.go index db51fa746..3a5c7d999 100644 --- a/pkg/scheduler/objects/node.go +++ b/pkg/scheduler/objects/node.go @@ -364,6 +364,31 @@ func (sn *Node) AddAllocation(alloc *Allocation) { _ = sn.addAllocationInternal(alloc, true) } +// UpdateForeignAllocation updates a foreign allocation and re-calculates the available/occupied resources +func (sn *Node) UpdateForeignAllocation(alloc *Allocation) *Allocation { + sn.Lock() + defer sn.Unlock() + key := alloc.GetAllocationKey() + existing := sn.allocations[key] + sn.allocations[key] = alloc + if existing == nil { + log.Log(log.SchedNode).Debug("unknown allocation to update", + zap.String("allocationKey", key)) + return nil + } + + existingResource := existing.GetAllocatedResource().Clone() + newResource := alloc.GetAllocatedResource().Clone() + delta := resources.Sub(newResource, existingResource) + delta.Prune() + + sn.occupiedResource.AddTo(delta) + sn.occupiedResource.Prune() + sn.refreshAvailableResource() + + return existing +} + func (sn *Node) addAllocationInternal(alloc *Allocation, force bool) bool { if alloc == nil { return false diff --git a/pkg/scheduler/objects/node_test.go b/pkg/scheduler/objects/node_test.go index 4b38e4360..fb0863715 100644 --- a/pkg/scheduler/objects/node_test.go +++ b/pkg/scheduler/objects/node_test.go @@ -963,3 +963,27 @@ func TestGetAllocations(t *testing.T) { assert.Assert(t, m[foreignAlloc2]) }) } + +func TestUpdateForeignAllocation(t *testing.T) { + node := newNode(nodeID1, map[string]resources.Quantity{"first": 100, "second": 200}) + allocRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "second": 20}) + alloc := newForeignAllocation(foreignAlloc1, nodeID1, allocRes) + node.AddAllocation(alloc) + + // update existing allocation + updatedRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 15, "second": 0}) + allocUpd := newForeignAllocation(foreignAlloc1, nodeID1, updatedRes) + prev := node.UpdateForeignAllocation(allocUpd) + expectedOccupied := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 15}) + expectedAvailable := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 85, "second": 200}) + assert.Assert(t, prev == alloc, "returned previous allocation is different") + assert.Assert(t, resources.Equals(node.GetOccupiedResource(), expectedOccupied), "occupied resource has been updated incorrectly") + assert.Assert(t, resources.Equals(node.GetAllocatedResource(), resources.Zero), "allocated resource has changed") + assert.Assert(t, resources.Equals(node.GetAvailableResource(), expectedAvailable), "avaiable resource has been updated incorrectly") + + // update non-existing allocation + alloc2 := newForeignAllocation(foreignAlloc2, nodeID1, allocRes) + prev = node.UpdateForeignAllocation(alloc2) + assert.Assert(t, prev == nil, "unexpected previous allocation returned") + assert.Assert(t, node.GetAllocation(foreignAlloc2) == alloc2, "foreign allocation not found") +} diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go index b3d72ba06..b05551650 100644 --- a/pkg/scheduler/partition.go +++ b/pkg/scheduler/partition.go @@ -65,7 +65,8 @@ type PartitionContext struct { reservations int // number of reservations placeholderAllocations int // number of placeholder allocations preemptionEnabled bool // whether preemption is enabled or not - foreignAllocs map[string]string // allocKey-nodeID assignment of non-Yunikorn allocations + foreignAllocs map[string]*objects.Allocation // foreign (non-Yunikorn) allocations + foreignNodes map[string]string // allocKey-nodeID assignment of non-Yunikorn allocations // The partition write lock must not be held while manipulating an application. // Scheduling is running continuously as a lock free background task. Scheduling an application @@ -95,7 +96,8 @@ func newPartitionContext(conf configs.PartitionConfig, rmID string, cc *ClusterC applications: make(map[string]*objects.Application), completedApplications: make(map[string]*objects.Application), nodes: objects.NewNodeCollection(conf.Name), - foreignAllocs: make(map[string]string), + foreignAllocs: make(map[string]*objects.Allocation), + foreignNodes: make(map[string]string), } pc.partitionManager = newPartitionManager(pc, cc) if err := pc.initialPartitionFromConfig(conf); err != nil { @@ -1310,6 +1312,7 @@ func (pc *PartitionContext) handleForeignAllocation(allocationKey, applicationID zap.String("nodeID", nodeID), zap.String("allocationKey", allocationKey)) node.AddAllocation(alloc) + pc.foreignAllocs[allocationKey] = alloc return false, true, nil } @@ -1317,7 +1320,12 @@ func (pc *PartitionContext) handleForeignAllocation(allocationKey, applicationID zap.String("partitionName", pc.Name), zap.String("appID", applicationID), zap.String("allocationKey", allocationKey)) - // this is a placeholder for eventual resource updates; nothing to do yet + prev := node.UpdateForeignAllocation(alloc) + if prev == nil { + log.Log(log.SchedPartition).Warn("BUG: previous allocation not found during update", + zap.String("allocationKey", allocationKey)) + } + return false, false, nil } @@ -1331,11 +1339,11 @@ func (pc *PartitionContext) convertUGI(ugi *si.UserGroupInformation, forced bool func (pc *PartitionContext) getOrSetNodeIDForAlloc(allocKey, nodeID string) string { pc.Lock() defer pc.Unlock() - id := pc.foreignAllocs[allocKey] + id := pc.foreignNodes[allocKey] if id != "" { return id } - pc.foreignAllocs[allocKey] = nodeID + pc.foreignNodes[allocKey] = nodeID return "" } @@ -1548,14 +1556,22 @@ func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]* } func (pc *PartitionContext) removeForeignAllocation(allocID string) { - nodeID := pc.foreignAllocs[allocID] - if nodeID == "" { + alloc := pc.foreignAllocs[allocID] + delete(pc.foreignAllocs, allocID) + if alloc == nil { log.Log(log.SchedPartition).Debug("Tried to remove a non-existing foreign allocation", + zap.String("allocationID", allocID)) + } + + nodeID := pc.foreignNodes[allocID] + delete(pc.foreignNodes, allocID) + if nodeID == "" { + log.Log(log.SchedPartition).Debug("Assigned node not found for foreign allocation", zap.String("allocationID", allocID), zap.String("nodeID", nodeID)) return } - delete(pc.foreignAllocs, allocID) + node := pc.GetNode(nodeID) if node == nil { log.Log(log.SchedPartition).Debug("Node not found for foreign allocation", diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go index 8a16d34dc..76778fd74 100644 --- a/pkg/scheduler/partition_test.go +++ b/pkg/scheduler/partition_test.go @@ -4690,13 +4690,13 @@ func TestPlaceholderAllocationAndReplacementAfterRecovery(t *testing.T) { assert.Equal(t, "tg-1", confirmed.GetTaskGroup()) } -func TestForeignAllocation(t *testing.T) { +func TestForeignAllocation(t *testing.T) { //nolint:funlen setupUGM() partition, err := newBasePartition() assert.NilError(t, err, "partition create failed") nodeRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}) - node1 := newNodeMaxResource(nodeID1, nodeRes) - err = partition.AddNode(node1) + node := newNodeMaxResource(nodeID1, nodeRes) + err = partition.AddNode(node) assert.NilError(t, err) // error: adding request (non-allocation) @@ -4705,32 +4705,73 @@ func TestForeignAllocation(t *testing.T) { assert.Assert(t, !reqCreated) assert.Assert(t, !allocCreated) assert.Error(t, err, "trying to add a foreign request (non-allocation) foreign-nonalloc") + assert.Equal(t, 0, len(partition.foreignAllocs)) + assert.Equal(t, 0, len(partition.foreignNodes)) // error: empty node ID - req = newForeignAllocation(foreignAlloc1, "") + req = newForeignAllocation(foreignAlloc1, "", resources.Zero) reqCreated, allocCreated, err = partition.UpdateAllocation(req) assert.Assert(t, !reqCreated) assert.Assert(t, !allocCreated) assert.Error(t, err, "node ID is empty for allocation foreign-alloc-1") + assert.Equal(t, 0, len(partition.foreignAllocs)) + assert.Equal(t, 0, len(partition.foreignNodes)) // error: no node found - req = newForeignAllocation(foreignAlloc1, nodeID2) + req = newForeignAllocation(foreignAlloc1, nodeID2, resources.Zero) reqCreated, allocCreated, err = partition.UpdateAllocation(req) assert.Assert(t, !reqCreated) assert.Assert(t, !allocCreated) assert.Error(t, err, "failed to find node node-2 for allocation foreign-alloc-1") assert.Equal(t, 0, len(partition.foreignAllocs)) + assert.Equal(t, 0, len(partition.foreignNodes)) // add new allocation - req = newForeignAllocation(foreignAlloc1, nodeID1) + allocRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1}) + req = newForeignAllocation(foreignAlloc1, nodeID1, allocRes) reqCreated, allocCreated, err = partition.UpdateAllocation(req) assert.Assert(t, !reqCreated) assert.Assert(t, allocCreated) assert.NilError(t, err) assert.Equal(t, 1, len(partition.foreignAllocs)) - assert.Equal(t, nodeID1, partition.foreignAllocs[foreignAlloc1]) - assert.Equal(t, 0, len(node1.GetYunikornAllocations())) - assert.Assert(t, node1.GetAllocation(foreignAlloc1) != nil) + assert.Equal(t, 1, len(partition.foreignNodes)) + assert.Equal(t, nodeID1, partition.foreignNodes[foreignAlloc1]) + assert.Equal(t, 0, len(node.GetYunikornAllocations())) + assert.Assert(t, node.GetAllocation(foreignAlloc1) != nil) + occupied := node.GetOccupiedResource().Clone() + available := node.GetAvailableResource().Clone() + allocated := node.GetAllocatedResource().Clone() + assert.Assert(t, resources.Equals(occupied, allocRes), "occupied resources has been calculated incorrectly") + assert.Assert(t, resources.Equals(allocated, resources.Zero), "allocated resources has changed") + expectedAvailable := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 9}) + assert.Assert(t, resources.Equals(available, expectedAvailable), "available resources has been calculated incorrectly") + + // update resources + updatedRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 2}) + update := newForeignAllocation(foreignAlloc1, nodeID1, updatedRes) + reqCreated, allocCreated, err = partition.UpdateAllocation(update) + assert.Assert(t, !reqCreated) + assert.Assert(t, !allocCreated) + assert.NilError(t, err) + updatedOccupied := node.GetOccupiedResource().Clone() + updatedAvailable := node.GetAvailableResource().Clone() + updatedAllocated := node.GetAllocatedResource().Clone() + assert.Assert(t, resources.Equals(updatedOccupied, updatedRes), "occupied resources has been updated incorrectly") + assert.Assert(t, resources.Equals(updatedAllocated, resources.Zero), "allocated resources has changed") + expectedAvailable = resources.NewResourceFromMap(map[string]resources.Quantity{"first": 8}) + assert.Assert(t, resources.Equals(updatedAvailable, expectedAvailable), "available resources has been updated incorrectly") + + // simulate update error + node.RemoveAllocation(foreignAlloc1) + reqCreated, allocCreated, err = partition.UpdateAllocation(update) // should re-add alloc object to the node + assert.Assert(t, !reqCreated) + assert.Assert(t, !allocCreated) + assert.NilError(t, err) + assert.Assert(t, node.GetAllocation(foreignAlloc1) != nil) + assert.Assert(t, resources.Equals(updatedOccupied, updatedRes), "occupied resources has been updated incorrectly") + assert.Assert(t, resources.Equals(updatedAllocated, resources.Zero), "allocated resources has changed") + expectedAvailable = resources.NewResourceFromMap(map[string]resources.Quantity{"first": 8}) + assert.Assert(t, resources.Equals(updatedAvailable, expectedAvailable), "available resources has been updated incorrectly") // remove allocation released, confirmed := partition.removeAllocation(&si.AllocationRelease{ @@ -4739,6 +4780,23 @@ func TestForeignAllocation(t *testing.T) { assert.Assert(t, released == nil) assert.Assert(t, confirmed == nil) assert.Equal(t, 0, len(partition.foreignAllocs)) - assert.Equal(t, 0, len(node1.GetYunikornAllocations())) - assert.Assert(t, node1.GetAllocation(foreignAlloc1) == nil) + assert.Equal(t, 0, len(partition.foreignNodes)) + assert.Equal(t, 0, len(node.GetYunikornAllocations())) + assert.Assert(t, node.GetAllocation(foreignAlloc1) == nil) + + // add + simulate removal failure + req = newForeignAllocation(foreignAlloc2, nodeID1, allocRes) + reqCreated, allocCreated, err = partition.UpdateAllocation(req) + assert.NilError(t, err) + assert.Assert(t, !reqCreated) + assert.Assert(t, !allocCreated) + partition.nodes.RemoveNode("node-1") // node gets removed + released, confirmed = partition.removeAllocation(&si.AllocationRelease{ + AllocationKey: foreignAlloc2, + }) + assert.Assert(t, released == nil) + assert.Assert(t, confirmed == nil) + assert.Equal(t, 0, len(partition.foreignAllocs)) + assert.Equal(t, 0, len(partition.foreignNodes)) + assert.Equal(t, 0, len(node.GetYunikornAllocations())) } diff --git a/pkg/scheduler/tests/operation_test.go b/pkg/scheduler/tests/operation_test.go index 2245697b0..72497f1e6 100644 --- a/pkg/scheduler/tests/operation_test.go +++ b/pkg/scheduler/tests/operation_test.go @@ -655,7 +655,7 @@ partitions: assert.Equal(t, int64(schedulingNode1.GetAvailableResource().Resources[common.Memory]), int64(20)) } -func TestForeignPodResourceUsage(t *testing.T) { +func TestForeignPodResourceUsage(t *testing.T) { //nolint:funlen // Register RM configData := ` partitions: @@ -708,12 +708,10 @@ partitions: waitForNewNode(t, context, "node-1:1234", "[rm:123]default", 1000) assert.Equal(t, len(partitionInfo.GetNodes()), 1) - node1 := partitionInfo.GetNode("node-1:1234") - assert.Equal(t, int64(node1.GetCapacity().Resources[common.Memory]), int64(100)) - schedulingNode1 := ms.scheduler.GetClusterContext(). - GetNode("node-1:1234", "[rm:123]default") - assert.Equal(t, int64(schedulingNode1.GetAllocatedResource().Resources[common.Memory]), int64(0)) - assert.Equal(t, int64(schedulingNode1.GetAvailableResource().Resources[common.Memory]), int64(100)) + node := partitionInfo.GetNode("node-1:1234") + assert.Equal(t, int64(node.GetCapacity().Resources[common.Memory]), int64(100)) + assert.Equal(t, int64(node.GetAllocatedResource().Resources[common.Memory]), int64(0)) + assert.Equal(t, int64(node.GetAvailableResource().Resources[common.Memory]), int64(100)) // update node capacity - add foreign pod res := resources.NewResourceFromMap(map[string]resources.Quantity{ @@ -734,12 +732,38 @@ partitions: assert.NilError(t, err, "NodeRequest failed") waitForAvailableNodeResource(t, ms.scheduler.GetClusterContext(), "[rm:123]default", []string{"node-1:1234"}, 20, 1000) - assert.Equal(t, int64(node1.GetCapacity().Resources[common.Memory]), int64(100)) - assert.Equal(t, int64(node1.GetCapacity().Resources[common.CPU]), int64(10)) - assert.Equal(t, int64(node1.GetOccupiedResource().Resources[common.Memory]), int64(80)) - assert.Equal(t, int64(node1.GetOccupiedResource().Resources[common.CPU]), int64(5)) - assert.Equal(t, int64(schedulingNode1.GetAllocatedResource().Resources[common.Memory]), int64(0)) - assert.Equal(t, int64(schedulingNode1.GetAvailableResource().Resources[common.Memory]), int64(20)) + assert.Equal(t, int64(node.GetCapacity().Resources[common.Memory]), int64(100)) + assert.Equal(t, int64(node.GetCapacity().Resources[common.CPU]), int64(10)) + assert.Equal(t, int64(node.GetOccupiedResource().Resources[common.Memory]), int64(80)) + assert.Equal(t, int64(node.GetOccupiedResource().Resources[common.CPU]), int64(5)) + assert.Equal(t, int64(node.GetAllocatedResource().Resources[common.Memory]), int64(0)) + assert.Equal(t, int64(node.GetAvailableResource().Resources[common.Memory]), int64(20)) + + // update resource usage + resUpd := resources.NewResourceFromMap(map[string]resources.Quantity{ + "memory": 50, "vcore": 10}) + err = ms.proxy.UpdateAllocation(&si.AllocationRequest{ + Allocations: []*si.Allocation{ + { + AllocationKey: "foreignpod-1", + ResourcePerAlloc: resUpd.ToProto(), + AllocationTags: map[string]string{ + common.Foreign: common.AllocTypeDefault, + }, + NodeID: "node-1:1234", + }, + }, + RmID: "rm:123", + }) + assert.NilError(t, err) + waitForAvailableNodeResource(t, ms.scheduler.GetClusterContext(), "[rm:123]default", + []string{"node-1:1234"}, 50, 1000) + assert.Equal(t, int64(node.GetCapacity().Resources[common.Memory]), int64(100)) + assert.Equal(t, int64(node.GetCapacity().Resources[common.CPU]), int64(10)) + assert.Equal(t, int64(node.GetOccupiedResource().Resources[common.Memory]), int64(50)) + assert.Equal(t, int64(node.GetOccupiedResource().Resources[common.CPU]), int64(10)) + assert.Equal(t, int64(node.GetAllocatedResource().Resources[common.Memory]), int64(0)) + assert.Equal(t, int64(node.GetAvailableResource().Resources[common.Memory]), int64(50)) // update node capacity - remove foreign pod err = ms.proxy.UpdateAllocation(&si.AllocationRequest{ @@ -755,10 +779,10 @@ partitions: assert.NilError(t, err) waitForAvailableNodeResource(t, ms.scheduler.GetClusterContext(), "[rm:123]default", []string{"node-1:1234"}, 100, 1000) - assert.Equal(t, int64(node1.GetCapacity().Resources[common.Memory]), int64(100)) - assert.Equal(t, int64(node1.GetCapacity().Resources[common.CPU]), int64(10)) - assert.Equal(t, int64(node1.GetOccupiedResource().Resources[common.Memory]), int64(0)) - assert.Equal(t, int64(node1.GetOccupiedResource().Resources[common.CPU]), int64(0)) - assert.Equal(t, int64(schedulingNode1.GetAllocatedResource().Resources[common.Memory]), int64(0)) - assert.Equal(t, int64(schedulingNode1.GetAvailableResource().Resources[common.Memory]), int64(100)) + assert.Equal(t, int64(node.GetCapacity().Resources[common.Memory]), int64(100)) + assert.Equal(t, int64(node.GetCapacity().Resources[common.CPU]), int64(10)) + assert.Equal(t, int64(node.GetOccupiedResource().Resources[common.Memory]), int64(0)) + assert.Equal(t, int64(node.GetOccupiedResource().Resources[common.CPU]), int64(0)) + assert.Equal(t, int64(node.GetAllocatedResource().Resources[common.Memory]), int64(0)) + assert.Equal(t, int64(node.GetAvailableResource().Resources[common.Memory]), int64(100)) } diff --git a/pkg/scheduler/utilities_test.go b/pkg/scheduler/utilities_test.go index 56845fd9e..9dd2ef6af 100644 --- a/pkg/scheduler/utilities_test.go +++ b/pkg/scheduler/utilities_test.go @@ -54,6 +54,7 @@ const ( maxresources = "maxresources" maxapplications = "maxapplications" foreignAlloc1 = "foreign-alloc-1" + foreignAlloc2 = "foreign-alloc-2" ) func newBasePartitionNoRootDefault() (*PartitionContext, error) { @@ -610,7 +611,7 @@ func newForeignRequest(allocKey string) *objects.Allocation { }) } -func newForeignAllocation(allocKey, nodeID string) *objects.Allocation { +func newForeignAllocation(allocKey, nodeID string, allocated *resources.Resource) *objects.Allocation { var alloc *objects.Allocation defer func() { if nodeID == "" { @@ -626,7 +627,8 @@ func newForeignAllocation(allocKey, nodeID string) *objects.Allocation { AllocationTags: map[string]string{ siCommon.Foreign: siCommon.AllocTypeDefault, }, - NodeID: id, + NodeID: id, + ResourcePerAlloc: allocated.ToProto(), }) return alloc }