diff --git a/pkg/scheduler/health_checker_test.go b/pkg/scheduler/health_checker_test.go index 02402fc9f..e4017828e 100644 --- a/pkg/scheduler/health_checker_test.go +++ b/pkg/scheduler/health_checker_test.go @@ -230,7 +230,7 @@ func TestGetSchedulerHealthStatusContext(t *testing.T) { assert.Assert(t, healthInfo.HealthChecks[8].Succeeded, "The orphan allocation check on the app still fails after removing the app") // check that foreign allocation does not interfere with health check - falloc := newForeignAllocation("foreign-1", "node") + falloc := newForeignAllocation("foreign-1", "node", resources.Zero) node.AddAllocation(falloc) healthInfo = GetSchedulerHealthStatus(schedulerMetrics, schedulerContext) assert.Assert(t, healthInfo.HealthChecks[7].Succeeded, "Foreign allocation was detected as orphan") diff --git a/pkg/scheduler/objects/node.go b/pkg/scheduler/objects/node.go index 29d26f80f..8187f7194 100644 --- a/pkg/scheduler/objects/node.go +++ b/pkg/scheduler/objects/node.go @@ -364,6 +364,31 @@ func (sn *Node) AddAllocation(alloc *Allocation) { _ = sn.addAllocationInternal(alloc, true) } +// UpdateForeignAllocation updates a foreign allocation and re-calculates the available/occupied resources +func (sn *Node) UpdateForeignAllocation(alloc *Allocation) *Allocation { + sn.Lock() + defer sn.Unlock() + key := alloc.GetAllocationKey() + existing := sn.allocations[key] + sn.allocations[key] = alloc + if existing == nil { + log.Log(log.SchedNode).Debug("unknown allocation to update", + zap.String("allocationKey", key)) + return nil + } + + existingResource := existing.GetAllocatedResource().Clone() + newResource := alloc.GetAllocatedResource().Clone() + delta := resources.Sub(newResource, existingResource) + delta.Prune() + + sn.occupiedResource.AddTo(delta) + sn.occupiedResource.Prune() + sn.refreshAvailableResource() + + return existing +} + func (sn *Node) addAllocationInternal(alloc *Allocation, force bool) bool { if alloc == nil { return false diff --git a/pkg/scheduler/objects/node_test.go b/pkg/scheduler/objects/node_test.go index bf0d7b6c5..27ad614da 100644 --- a/pkg/scheduler/objects/node_test.go +++ b/pkg/scheduler/objects/node_test.go @@ -956,3 +956,27 @@ func TestGetAllocations(t *testing.T) { assert.Assert(t, m[foreignAlloc2]) }) } + +func TestUpdateForeignAllocation(t *testing.T) { + node := newNode(nodeID1, map[string]resources.Quantity{"first": 100, "second": 200}) + allocRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "second": 20}) + alloc := newForeignAllocation(foreignAlloc1, nodeID1, allocRes) + node.AddAllocation(alloc) + + // update existing allocation + updatedRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 15, "second": 0}) + allocUpd := newForeignAllocation(foreignAlloc1, nodeID1, updatedRes) + prev := node.UpdateForeignAllocation(allocUpd) + expectedOccupied := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 15}) + expectedAvailable := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 85, "second": 200}) + assert.Assert(t, prev == alloc, "returned previous allocation is different") + assert.Assert(t, resources.Equals(node.GetOccupiedResource(), expectedOccupied), "occupied resource has been updated incorrectly") + assert.Assert(t, resources.Equals(node.GetAllocatedResource(), resources.Zero), "allocated resource has changed") + assert.Assert(t, resources.Equals(node.GetAvailableResource(), expectedAvailable), "avaiable resource has been updated incorrectly") + + // update non-existing allocation + alloc2 := newForeignAllocation(foreignAlloc2, nodeID1, allocRes) + prev = node.UpdateForeignAllocation(alloc2) + assert.Assert(t, prev == nil, "unexpected previous allocation returned") + assert.Assert(t, node.GetAllocation(foreignAlloc2) == alloc2, "foreign allocation not found") +} diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go index b3d72ba06..0ae63b683 100644 --- a/pkg/scheduler/partition.go +++ b/pkg/scheduler/partition.go @@ -65,7 +65,7 @@ type PartitionContext struct { reservations int // number of reservations placeholderAllocations int // number of placeholder allocations preemptionEnabled bool // whether preemption is enabled or not - foreignAllocs map[string]string // allocKey-nodeID assignment of non-Yunikorn allocations + foreignAllocs map[string]*objects.Allocation // foreign (non-Yunikorn) allocations // The partition write lock must not be held while manipulating an application. // Scheduling is running continuously as a lock free background task. Scheduling an application @@ -95,7 +95,7 @@ func newPartitionContext(conf configs.PartitionConfig, rmID string, cc *ClusterC applications: make(map[string]*objects.Application), completedApplications: make(map[string]*objects.Application), nodes: objects.NewNodeCollection(conf.Name), - foreignAllocs: make(map[string]string), + foreignAllocs: make(map[string]*objects.Allocation), } pc.partitionManager = newPartitionManager(pc, cc) if err := pc.initialPartitionFromConfig(conf); err != nil { @@ -1303,8 +1303,8 @@ func (pc *PartitionContext) handleForeignAllocation(allocationKey, applicationID return false, false, fmt.Errorf("failed to find node %s for allocation %s", nodeID, allocationKey) } - existingNodeID := pc.getOrSetNodeIDForAlloc(allocationKey, nodeID) - if existingNodeID == "" { + exists := pc.getOrStoreForeignAlloc(alloc) + if !exists { log.Log(log.SchedPartition).Info("handling new foreign allocation", zap.String("partitionName", pc.Name), zap.String("nodeID", nodeID), @@ -1317,7 +1317,12 @@ func (pc *PartitionContext) handleForeignAllocation(allocationKey, applicationID zap.String("partitionName", pc.Name), zap.String("appID", applicationID), zap.String("allocationKey", allocationKey)) - // this is a placeholder for eventual resource updates; nothing to do yet + prev := node.UpdateForeignAllocation(alloc) + if prev == nil { + log.Log(log.SchedPartition).Warn("BUG: previous allocation not found during update", + zap.String("allocationKey", allocationKey)) + } + return false, false, nil } @@ -1327,16 +1332,17 @@ func (pc *PartitionContext) convertUGI(ugi *si.UserGroupInformation, forced bool return pc.userGroupCache.ConvertUGI(ugi, forced) } -// getOrSetNodeIDForAlloc returns the nodeID for a given foreign allocation, or sets is if it's unset -func (pc *PartitionContext) getOrSetNodeIDForAlloc(allocKey, nodeID string) string { +// getOrStoreForeignAlloc returns whether the allocation already exists or stores it if it's new +func (pc *PartitionContext) getOrStoreForeignAlloc(alloc *objects.Allocation) bool { pc.Lock() defer pc.Unlock() - id := pc.foreignAllocs[allocKey] - if id != "" { - return id + allocKey := alloc.GetAllocationKey() + existing := pc.foreignAllocs[allocKey] + if existing == nil { + pc.foreignAllocs[allocKey] = alloc + return false } - pc.foreignAllocs[allocKey] = nodeID - return "" + return true } // calculate overall nodes resource usage and returns a map as the result, @@ -1548,14 +1554,17 @@ func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]* } func (pc *PartitionContext) removeForeignAllocation(allocID string) { - nodeID := pc.foreignAllocs[allocID] - if nodeID == "" { + pc.Lock() + defer pc.Unlock() + alloc := pc.foreignAllocs[allocID] + delete(pc.foreignAllocs, allocID) + if alloc == nil { log.Log(log.SchedPartition).Debug("Tried to remove a non-existing foreign allocation", - zap.String("allocationID", allocID), - zap.String("nodeID", nodeID)) + zap.String("allocationID", allocID)) return } - delete(pc.foreignAllocs, allocID) + + nodeID := alloc.GetNodeID() node := pc.GetNode(nodeID) if node == nil { log.Log(log.SchedPartition).Debug("Node not found for foreign allocation", diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go index e1c6f03ac..32e3f5937 100644 --- a/pkg/scheduler/partition_test.go +++ b/pkg/scheduler/partition_test.go @@ -4706,13 +4706,13 @@ func TestPlaceholderAllocationAndReplacementAfterRecovery(t *testing.T) { assert.Equal(t, "tg-1", confirmed.GetTaskGroup()) } -func TestForeignAllocation(t *testing.T) { +func TestForeignAllocation(t *testing.T) { //nolint:funlen setupUGM() partition, err := newBasePartition() assert.NilError(t, err, "partition create failed") nodeRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}) - node1 := newNodeMaxResource(nodeID1, nodeRes) - err = partition.AddNode(node1) + node := newNodeMaxResource(nodeID1, nodeRes) + err = partition.AddNode(node) assert.NilError(t, err) // error: adding request (non-allocation) @@ -4721,16 +4721,18 @@ func TestForeignAllocation(t *testing.T) { assert.Assert(t, !reqCreated) assert.Assert(t, !allocCreated) assert.Error(t, err, "trying to add a foreign request (non-allocation) foreign-nonalloc") + assert.Equal(t, 0, len(partition.foreignAllocs)) // error: empty node ID - req = newForeignAllocation(foreignAlloc1, "") + req = newForeignAllocation(foreignAlloc1, "", resources.Zero) reqCreated, allocCreated, err = partition.UpdateAllocation(req) assert.Assert(t, !reqCreated) assert.Assert(t, !allocCreated) assert.Error(t, err, "node ID is empty for allocation foreign-alloc-1") + assert.Equal(t, 0, len(partition.foreignAllocs)) // error: no node found - req = newForeignAllocation(foreignAlloc1, nodeID2) + req = newForeignAllocation(foreignAlloc1, nodeID2, resources.Zero) reqCreated, allocCreated, err = partition.UpdateAllocation(req) assert.Assert(t, !reqCreated) assert.Assert(t, !allocCreated) @@ -4738,15 +4740,49 @@ func TestForeignAllocation(t *testing.T) { assert.Equal(t, 0, len(partition.foreignAllocs)) // add new allocation - req = newForeignAllocation(foreignAlloc1, nodeID1) + allocRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1}) + req = newForeignAllocation(foreignAlloc1, nodeID1, allocRes) reqCreated, allocCreated, err = partition.UpdateAllocation(req) assert.Assert(t, !reqCreated) assert.Assert(t, allocCreated) assert.NilError(t, err) assert.Equal(t, 1, len(partition.foreignAllocs)) - assert.Equal(t, nodeID1, partition.foreignAllocs[foreignAlloc1]) - assert.Equal(t, 0, len(node1.GetYunikornAllocations())) - assert.Assert(t, node1.GetAllocation(foreignAlloc1) != nil) + assert.Equal(t, 0, len(node.GetYunikornAllocations())) + assert.Assert(t, node.GetAllocation(foreignAlloc1) != nil) + occupied := node.GetOccupiedResource().Clone() + available := node.GetAvailableResource().Clone() + allocated := node.GetAllocatedResource().Clone() + assert.Assert(t, resources.Equals(occupied, allocRes), "occupied resources has been calculated incorrectly") + assert.Assert(t, resources.Equals(allocated, resources.Zero), "allocated resources has changed") + expectedAvailable := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 9}) + assert.Assert(t, resources.Equals(available, expectedAvailable), "available resources has been calculated incorrectly") + + // update resources + updatedRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 2}) + update := newForeignAllocation(foreignAlloc1, nodeID1, updatedRes) + reqCreated, allocCreated, err = partition.UpdateAllocation(update) + assert.Assert(t, !reqCreated) + assert.Assert(t, !allocCreated) + assert.NilError(t, err) + updatedOccupied := node.GetOccupiedResource().Clone() + updatedAvailable := node.GetAvailableResource().Clone() + updatedAllocated := node.GetAllocatedResource().Clone() + assert.Assert(t, resources.Equals(updatedOccupied, updatedRes), "occupied resources has been updated incorrectly") + assert.Assert(t, resources.Equals(updatedAllocated, resources.Zero), "allocated resources has changed") + expectedAvailable = resources.NewResourceFromMap(map[string]resources.Quantity{"first": 8}) + assert.Assert(t, resources.Equals(updatedAvailable, expectedAvailable), "available resources has been updated incorrectly") + + // simulate update error + node.RemoveAllocation(foreignAlloc1) + reqCreated, allocCreated, err = partition.UpdateAllocation(update) // should re-add alloc object to the node + assert.Assert(t, !reqCreated) + assert.Assert(t, !allocCreated) + assert.NilError(t, err) + assert.Assert(t, node.GetAllocation(foreignAlloc1) != nil) + assert.Assert(t, resources.Equals(updatedOccupied, updatedRes), "occupied resources has been updated incorrectly") + assert.Assert(t, resources.Equals(updatedAllocated, resources.Zero), "allocated resources has changed") + expectedAvailable = resources.NewResourceFromMap(map[string]resources.Quantity{"first": 8}) + assert.Assert(t, resources.Equals(updatedAvailable, expectedAvailable), "available resources has been updated incorrectly") // remove allocation released, confirmed := partition.removeAllocation(&si.AllocationRelease{ @@ -4755,6 +4791,21 @@ func TestForeignAllocation(t *testing.T) { assert.Assert(t, released == nil) assert.Assert(t, confirmed == nil) assert.Equal(t, 0, len(partition.foreignAllocs)) - assert.Equal(t, 0, len(node1.GetYunikornAllocations())) - assert.Assert(t, node1.GetAllocation(foreignAlloc1) == nil) + assert.Equal(t, 0, len(node.GetYunikornAllocations())) + assert.Assert(t, node.GetAllocation(foreignAlloc1) == nil) + + // add + simulate removal failure + req = newForeignAllocation(foreignAlloc2, nodeID1, allocRes) + reqCreated, allocCreated, err = partition.UpdateAllocation(req) + assert.NilError(t, err) + assert.Assert(t, !reqCreated) + assert.Assert(t, allocCreated) + partition.nodes.RemoveNode("node-1") // node gets removed + released, confirmed = partition.removeAllocation(&si.AllocationRelease{ + AllocationKey: foreignAlloc2, + }) + assert.Assert(t, released == nil) + assert.Assert(t, confirmed == nil) + assert.Equal(t, 0, len(partition.foreignAllocs)) + assert.Equal(t, 0, len(node.GetYunikornAllocations())) } diff --git a/pkg/scheduler/tests/operation_test.go b/pkg/scheduler/tests/operation_test.go index 4a0c60158..292ae57cf 100644 --- a/pkg/scheduler/tests/operation_test.go +++ b/pkg/scheduler/tests/operation_test.go @@ -564,7 +564,7 @@ partitions: } } -func TestForeignPodResourceUsage(t *testing.T) { +func TestForeignPodResourceUsage(t *testing.T) { //nolint:funlen // Register RM configData := ` partitions: @@ -617,12 +617,10 @@ partitions: waitForNewNode(t, context, "node-1:1234", "[rm:123]default", 1000) assert.Equal(t, len(partitionInfo.GetNodes()), 1) - node1 := partitionInfo.GetNode("node-1:1234") - assert.Equal(t, int64(node1.GetCapacity().Resources[common.Memory]), int64(100)) - schedulingNode1 := ms.scheduler.GetClusterContext(). - GetNode("node-1:1234", "[rm:123]default") - assert.Equal(t, int64(schedulingNode1.GetAllocatedResource().Resources[common.Memory]), int64(0)) - assert.Equal(t, int64(schedulingNode1.GetAvailableResource().Resources[common.Memory]), int64(100)) + node := partitionInfo.GetNode("node-1:1234") + assert.Equal(t, int64(node.GetCapacity().Resources[common.Memory]), int64(100)) + assert.Equal(t, int64(node.GetAllocatedResource().Resources[common.Memory]), int64(0)) + assert.Equal(t, int64(node.GetAvailableResource().Resources[common.Memory]), int64(100)) // update node capacity - add foreign pod res := resources.NewResourceFromMap(map[string]resources.Quantity{ @@ -643,12 +641,38 @@ partitions: assert.NilError(t, err, "NodeRequest failed") waitForAvailableNodeResource(t, ms.scheduler.GetClusterContext(), "[rm:123]default", []string{"node-1:1234"}, 20, 1000) - assert.Equal(t, int64(node1.GetCapacity().Resources[common.Memory]), int64(100)) - assert.Equal(t, int64(node1.GetCapacity().Resources[common.CPU]), int64(10)) - assert.Equal(t, int64(node1.GetOccupiedResource().Resources[common.Memory]), int64(80)) - assert.Equal(t, int64(node1.GetOccupiedResource().Resources[common.CPU]), int64(5)) - assert.Equal(t, int64(schedulingNode1.GetAllocatedResource().Resources[common.Memory]), int64(0)) - assert.Equal(t, int64(schedulingNode1.GetAvailableResource().Resources[common.Memory]), int64(20)) + assert.Equal(t, int64(node.GetCapacity().Resources[common.Memory]), int64(100)) + assert.Equal(t, int64(node.GetCapacity().Resources[common.CPU]), int64(10)) + assert.Equal(t, int64(node.GetOccupiedResource().Resources[common.Memory]), int64(80)) + assert.Equal(t, int64(node.GetOccupiedResource().Resources[common.CPU]), int64(5)) + assert.Equal(t, int64(node.GetAllocatedResource().Resources[common.Memory]), int64(0)) + assert.Equal(t, int64(node.GetAvailableResource().Resources[common.Memory]), int64(20)) + + // update resource usage + resUpd := resources.NewResourceFromMap(map[string]resources.Quantity{ + "memory": 50, "vcore": 10}) + err = ms.proxy.UpdateAllocation(&si.AllocationRequest{ + Allocations: []*si.Allocation{ + { + AllocationKey: "foreignpod-1", + ResourcePerAlloc: resUpd.ToProto(), + AllocationTags: map[string]string{ + common.Foreign: common.AllocTypeDefault, + }, + NodeID: "node-1:1234", + }, + }, + RmID: "rm:123", + }) + assert.NilError(t, err) + waitForAvailableNodeResource(t, ms.scheduler.GetClusterContext(), "[rm:123]default", + []string{"node-1:1234"}, 50, 1000) + assert.Equal(t, int64(node.GetCapacity().Resources[common.Memory]), int64(100)) + assert.Equal(t, int64(node.GetCapacity().Resources[common.CPU]), int64(10)) + assert.Equal(t, int64(node.GetOccupiedResource().Resources[common.Memory]), int64(50)) + assert.Equal(t, int64(node.GetOccupiedResource().Resources[common.CPU]), int64(10)) + assert.Equal(t, int64(node.GetAllocatedResource().Resources[common.Memory]), int64(0)) + assert.Equal(t, int64(node.GetAvailableResource().Resources[common.Memory]), int64(50)) // update node capacity - remove foreign pod err = ms.proxy.UpdateAllocation(&si.AllocationRequest{ @@ -664,10 +688,10 @@ partitions: assert.NilError(t, err) waitForAvailableNodeResource(t, ms.scheduler.GetClusterContext(), "[rm:123]default", []string{"node-1:1234"}, 100, 1000) - assert.Equal(t, int64(node1.GetCapacity().Resources[common.Memory]), int64(100)) - assert.Equal(t, int64(node1.GetCapacity().Resources[common.CPU]), int64(10)) - assert.Equal(t, int64(node1.GetOccupiedResource().Resources[common.Memory]), int64(0)) - assert.Equal(t, int64(node1.GetOccupiedResource().Resources[common.CPU]), int64(0)) - assert.Equal(t, int64(schedulingNode1.GetAllocatedResource().Resources[common.Memory]), int64(0)) - assert.Equal(t, int64(schedulingNode1.GetAvailableResource().Resources[common.Memory]), int64(100)) + assert.Equal(t, int64(node.GetCapacity().Resources[common.Memory]), int64(100)) + assert.Equal(t, int64(node.GetCapacity().Resources[common.CPU]), int64(10)) + assert.Equal(t, int64(node.GetOccupiedResource().Resources[common.Memory]), int64(0)) + assert.Equal(t, int64(node.GetOccupiedResource().Resources[common.CPU]), int64(0)) + assert.Equal(t, int64(node.GetAllocatedResource().Resources[common.Memory]), int64(0)) + assert.Equal(t, int64(node.GetAvailableResource().Resources[common.Memory]), int64(100)) } diff --git a/pkg/scheduler/utilities_test.go b/pkg/scheduler/utilities_test.go index 05986c039..d2f9df80d 100644 --- a/pkg/scheduler/utilities_test.go +++ b/pkg/scheduler/utilities_test.go @@ -54,6 +54,7 @@ const ( maxresources = "maxresources" maxapplications = "maxapplications" foreignAlloc1 = "foreign-alloc-1" + foreignAlloc2 = "foreign-alloc-2" ) func newBasePartitionNoRootDefault() (*PartitionContext, error) { @@ -610,7 +611,7 @@ func newForeignRequest(allocKey string) *objects.Allocation { }) } -func newForeignAllocation(allocKey, nodeID string) *objects.Allocation { +func newForeignAllocation(allocKey, nodeID string, allocated *resources.Resource) *objects.Allocation { var alloc *objects.Allocation defer func() { if nodeID == "" { @@ -626,7 +627,8 @@ func newForeignAllocation(allocKey, nodeID string) *objects.Allocation { AllocationTags: map[string]string{ siCommon.Foreign: siCommon.AllocTypeDefault, }, - NodeID: id, + NodeID: id, + ResourcePerAlloc: allocated.ToProto(), }) return alloc }