Skip to content

Commit

Permalink
[YUNIKORN-2928] [core] Update foreign pod resource usage (#990)
Browse files Browse the repository at this point in the history
Closes: #990

Signed-off-by: Craig Condit <[email protected]>
  • Loading branch information
pbacsko authored and craigcondit committed Oct 30, 2024
1 parent 1e5d68b commit 7057201
Show file tree
Hide file tree
Showing 7 changed files with 185 additions and 50 deletions.
2 changes: 1 addition & 1 deletion pkg/scheduler/health_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func TestGetSchedulerHealthStatusContext(t *testing.T) {
assert.Assert(t, healthInfo.HealthChecks[8].Succeeded, "The orphan allocation check on the app still fails after removing the app")

// check that foreign allocation does not interfere with health check
falloc := newForeignAllocation("foreign-1", "node")
falloc := newForeignAllocation("foreign-1", "node", resources.Zero)
node.AddAllocation(falloc)
healthInfo = GetSchedulerHealthStatus(schedulerMetrics, schedulerContext)
assert.Assert(t, healthInfo.HealthChecks[7].Succeeded, "Foreign allocation was detected as orphan")
Expand Down
25 changes: 25 additions & 0 deletions pkg/scheduler/objects/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,31 @@ func (sn *Node) AddAllocation(alloc *Allocation) {
_ = sn.addAllocationInternal(alloc, true)
}

// UpdateForeignAllocation replaces the foreign allocation stored on the node under the
// allocation key of alloc and re-calculates the occupied/available resources.
//
// The occupied resources are adjusted by the difference between the allocated resource of
// the new allocation and that of the allocation previously stored under the same key.
// If no allocation was stored under the key, the allocation is still stored on the node and
// its complete allocated resource is counted as occupied, so the occupied total stays in
// line with the allocations tracked by the node.
//
// Returns the previously stored allocation, or nil if alloc is nil or the key was unknown.
func (sn *Node) UpdateForeignAllocation(alloc *Allocation) *Allocation {
	// same guard as addAllocationInternal: a nil allocation cannot be tracked
	if alloc == nil {
		return nil
	}
	sn.Lock()
	defer sn.Unlock()
	key := alloc.GetAllocationKey()
	existing := sn.allocations[key]
	sn.allocations[key] = alloc

	// start from the full usage of the new allocation and subtract what was already counted
	delta := alloc.GetAllocatedResource().Clone()
	if existing == nil {
		// nothing was counted for this key before: the whole usage is new to the node
		log.Log(log.SchedNode).Debug("unknown allocation to update",
			zap.String("allocationKey", key))
	} else {
		delta = resources.Sub(delta, existing.GetAllocatedResource().Clone())
	}
	delta.Prune()

	sn.occupiedResource.AddTo(delta)
	sn.occupiedResource.Prune()
	sn.refreshAvailableResource()

	return existing
}

func (sn *Node) addAllocationInternal(alloc *Allocation, force bool) bool {
if alloc == nil {
return false
Expand Down
24 changes: 24 additions & 0 deletions pkg/scheduler/objects/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -956,3 +956,27 @@ func TestGetAllocations(t *testing.T) {
assert.Assert(t, m[foreignAlloc2])
})
}

// TestUpdateForeignAllocation checks Node.UpdateForeignAllocation for a known and an unknown
// allocation key: the previous allocation must be returned, the new allocation must be stored
// on the node, and for a known key the occupied/available resources must follow the change
// while the allocated (Yunikorn) resources stay untouched.
func TestUpdateForeignAllocation(t *testing.T) {
	node := newNode(nodeID1, map[string]resources.Quantity{"first": 100, "second": 200})
	allocRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "second": 20})
	alloc := newForeignAllocation(foreignAlloc1, nodeID1, allocRes)
	node.AddAllocation(alloc)

	// update existing allocation: "first" grows by 5, "second" drops to zero
	updatedRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 15, "second": 0})
	allocUpd := newForeignAllocation(foreignAlloc1, nodeID1, updatedRes)
	prev := node.UpdateForeignAllocation(allocUpd)
	expectedOccupied := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 15})
	expectedAvailable := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 85, "second": 200})
	assert.Assert(t, prev == alloc, "returned previous allocation is different")
	assert.Assert(t, node.GetAllocation(foreignAlloc1) == allocUpd, "updated foreign allocation not stored on the node")
	assert.Assert(t, resources.Equals(node.GetOccupiedResource(), expectedOccupied), "occupied resource has been updated incorrectly")
	assert.Assert(t, resources.Equals(node.GetAllocatedResource(), resources.Zero), "allocated resource has changed")
	assert.Assert(t, resources.Equals(node.GetAvailableResource(), expectedAvailable), "available resource has been updated incorrectly")

	// update non-existing allocation: nothing to return, but the allocation must be tracked afterwards
	alloc2 := newForeignAllocation(foreignAlloc2, nodeID1, allocRes)
	prev = node.UpdateForeignAllocation(alloc2)
	assert.Assert(t, prev == nil, "unexpected previous allocation returned")
	assert.Assert(t, node.GetAllocation(foreignAlloc2) == alloc2, "foreign allocation not found")
}
43 changes: 26 additions & 17 deletions pkg/scheduler/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type PartitionContext struct {
reservations int // number of reservations
placeholderAllocations int // number of placeholder allocations
preemptionEnabled bool // whether preemption is enabled or not
foreignAllocs map[string]string // allocKey-nodeID assignment of non-Yunikorn allocations
foreignAllocs map[string]*objects.Allocation // foreign (non-Yunikorn) allocations

// The partition write lock must not be held while manipulating an application.
// Scheduling is running continuously as a lock free background task. Scheduling an application
Expand Down Expand Up @@ -95,7 +95,7 @@ func newPartitionContext(conf configs.PartitionConfig, rmID string, cc *ClusterC
applications: make(map[string]*objects.Application),
completedApplications: make(map[string]*objects.Application),
nodes: objects.NewNodeCollection(conf.Name),
foreignAllocs: make(map[string]string),
foreignAllocs: make(map[string]*objects.Allocation),
}
pc.partitionManager = newPartitionManager(pc, cc)
if err := pc.initialPartitionFromConfig(conf); err != nil {
Expand Down Expand Up @@ -1303,8 +1303,8 @@ func (pc *PartitionContext) handleForeignAllocation(allocationKey, applicationID
return false, false, fmt.Errorf("failed to find node %s for allocation %s", nodeID, allocationKey)
}

existingNodeID := pc.getOrSetNodeIDForAlloc(allocationKey, nodeID)
if existingNodeID == "" {
exists := pc.getOrStoreForeignAlloc(alloc)
if !exists {
log.Log(log.SchedPartition).Info("handling new foreign allocation",
zap.String("partitionName", pc.Name),
zap.String("nodeID", nodeID),
Expand All @@ -1317,7 +1317,12 @@ func (pc *PartitionContext) handleForeignAllocation(allocationKey, applicationID
zap.String("partitionName", pc.Name),
zap.String("appID", applicationID),
zap.String("allocationKey", allocationKey))
// this is a placeholder for eventual resource updates; nothing to do yet
prev := node.UpdateForeignAllocation(alloc)
if prev == nil {
log.Log(log.SchedPartition).Warn("BUG: previous allocation not found during update",
zap.String("allocationKey", allocationKey))
}

return false, false, nil
}

Expand All @@ -1327,16 +1332,17 @@ func (pc *PartitionContext) convertUGI(ugi *si.UserGroupInformation, forced bool
return pc.userGroupCache.ConvertUGI(ugi, forced)
}

// getOrSetNodeIDForAlloc returns the nodeID for a given foreign allocation, or sets is if it's unset
func (pc *PartitionContext) getOrSetNodeIDForAlloc(allocKey, nodeID string) string {
// getOrStoreForeignAlloc returns whether the allocation already exists or stores it if it's new
func (pc *PartitionContext) getOrStoreForeignAlloc(alloc *objects.Allocation) bool {
pc.Lock()
defer pc.Unlock()
id := pc.foreignAllocs[allocKey]
if id != "" {
return id
allocKey := alloc.GetAllocationKey()
existing := pc.foreignAllocs[allocKey]
if existing == nil {
pc.foreignAllocs[allocKey] = alloc
return false
}
pc.foreignAllocs[allocKey] = nodeID
return ""
return true
}

// calculate overall nodes resource usage and returns a map as the result,
Expand Down Expand Up @@ -1548,14 +1554,17 @@ func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]*
}

func (pc *PartitionContext) removeForeignAllocation(allocID string) {
nodeID := pc.foreignAllocs[allocID]
if nodeID == "" {
pc.Lock()
defer pc.Unlock()
alloc := pc.foreignAllocs[allocID]
delete(pc.foreignAllocs, allocID)
if alloc == nil {
log.Log(log.SchedPartition).Debug("Tried to remove a non-existing foreign allocation",
zap.String("allocationID", allocID),
zap.String("nodeID", nodeID))
zap.String("allocationID", allocID))
return
}
delete(pc.foreignAllocs, allocID)

nodeID := alloc.GetNodeID()
node := pc.GetNode(nodeID)
if node == nil {
log.Log(log.SchedPartition).Debug("Node not found for foreign allocation",
Expand Down
73 changes: 62 additions & 11 deletions pkg/scheduler/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4706,13 +4706,13 @@ func TestPlaceholderAllocationAndReplacementAfterRecovery(t *testing.T) {
assert.Equal(t, "tg-1", confirmed.GetTaskGroup())
}

func TestForeignAllocation(t *testing.T) {
func TestForeignAllocation(t *testing.T) { //nolint:funlen
setupUGM()
partition, err := newBasePartition()
assert.NilError(t, err, "partition create failed")
nodeRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
node1 := newNodeMaxResource(nodeID1, nodeRes)
err = partition.AddNode(node1)
node := newNodeMaxResource(nodeID1, nodeRes)
err = partition.AddNode(node)
assert.NilError(t, err)

// error: adding request (non-allocation)
Expand All @@ -4721,32 +4721,68 @@ func TestForeignAllocation(t *testing.T) {
assert.Assert(t, !reqCreated)
assert.Assert(t, !allocCreated)
assert.Error(t, err, "trying to add a foreign request (non-allocation) foreign-nonalloc")
assert.Equal(t, 0, len(partition.foreignAllocs))

// error: empty node ID
req = newForeignAllocation(foreignAlloc1, "")
req = newForeignAllocation(foreignAlloc1, "", resources.Zero)
reqCreated, allocCreated, err = partition.UpdateAllocation(req)
assert.Assert(t, !reqCreated)
assert.Assert(t, !allocCreated)
assert.Error(t, err, "node ID is empty for allocation foreign-alloc-1")
assert.Equal(t, 0, len(partition.foreignAllocs))

// error: no node found
req = newForeignAllocation(foreignAlloc1, nodeID2)
req = newForeignAllocation(foreignAlloc1, nodeID2, resources.Zero)
reqCreated, allocCreated, err = partition.UpdateAllocation(req)
assert.Assert(t, !reqCreated)
assert.Assert(t, !allocCreated)
assert.Error(t, err, "failed to find node node-2 for allocation foreign-alloc-1")
assert.Equal(t, 0, len(partition.foreignAllocs))

// add new allocation
req = newForeignAllocation(foreignAlloc1, nodeID1)
allocRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
req = newForeignAllocation(foreignAlloc1, nodeID1, allocRes)
reqCreated, allocCreated, err = partition.UpdateAllocation(req)
assert.Assert(t, !reqCreated)
assert.Assert(t, allocCreated)
assert.NilError(t, err)
assert.Equal(t, 1, len(partition.foreignAllocs))
assert.Equal(t, nodeID1, partition.foreignAllocs[foreignAlloc1])
assert.Equal(t, 0, len(node1.GetYunikornAllocations()))
assert.Assert(t, node1.GetAllocation(foreignAlloc1) != nil)
assert.Equal(t, 0, len(node.GetYunikornAllocations()))
assert.Assert(t, node.GetAllocation(foreignAlloc1) != nil)
occupied := node.GetOccupiedResource().Clone()
available := node.GetAvailableResource().Clone()
allocated := node.GetAllocatedResource().Clone()
assert.Assert(t, resources.Equals(occupied, allocRes), "occupied resources has been calculated incorrectly")
assert.Assert(t, resources.Equals(allocated, resources.Zero), "allocated resources has changed")
expectedAvailable := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 9})
assert.Assert(t, resources.Equals(available, expectedAvailable), "available resources has been calculated incorrectly")

// update resources
updatedRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 2})
update := newForeignAllocation(foreignAlloc1, nodeID1, updatedRes)
reqCreated, allocCreated, err = partition.UpdateAllocation(update)
assert.Assert(t, !reqCreated)
assert.Assert(t, !allocCreated)
assert.NilError(t, err)
updatedOccupied := node.GetOccupiedResource().Clone()
updatedAvailable := node.GetAvailableResource().Clone()
updatedAllocated := node.GetAllocatedResource().Clone()
assert.Assert(t, resources.Equals(updatedOccupied, updatedRes), "occupied resources has been updated incorrectly")
assert.Assert(t, resources.Equals(updatedAllocated, resources.Zero), "allocated resources has changed")
expectedAvailable = resources.NewResourceFromMap(map[string]resources.Quantity{"first": 8})
assert.Assert(t, resources.Equals(updatedAvailable, expectedAvailable), "available resources has been updated incorrectly")

// simulate update error
node.RemoveAllocation(foreignAlloc1)
reqCreated, allocCreated, err = partition.UpdateAllocation(update) // should re-add alloc object to the node
assert.Assert(t, !reqCreated)
assert.Assert(t, !allocCreated)
assert.NilError(t, err)
assert.Assert(t, node.GetAllocation(foreignAlloc1) != nil)
assert.Assert(t, resources.Equals(updatedOccupied, updatedRes), "occupied resources has been updated incorrectly")
assert.Assert(t, resources.Equals(updatedAllocated, resources.Zero), "allocated resources has changed")
expectedAvailable = resources.NewResourceFromMap(map[string]resources.Quantity{"first": 8})
assert.Assert(t, resources.Equals(updatedAvailable, expectedAvailable), "available resources has been updated incorrectly")

// remove allocation
released, confirmed := partition.removeAllocation(&si.AllocationRelease{
Expand All @@ -4755,6 +4791,21 @@ func TestForeignAllocation(t *testing.T) {
assert.Assert(t, released == nil)
assert.Assert(t, confirmed == nil)
assert.Equal(t, 0, len(partition.foreignAllocs))
assert.Equal(t, 0, len(node1.GetYunikornAllocations()))
assert.Assert(t, node1.GetAllocation(foreignAlloc1) == nil)
assert.Equal(t, 0, len(node.GetYunikornAllocations()))
assert.Assert(t, node.GetAllocation(foreignAlloc1) == nil)

// add + simulate removal failure
req = newForeignAllocation(foreignAlloc2, nodeID1, allocRes)
reqCreated, allocCreated, err = partition.UpdateAllocation(req)
assert.NilError(t, err)
assert.Assert(t, !reqCreated)
assert.Assert(t, allocCreated)
partition.nodes.RemoveNode("node-1") // node gets removed
released, confirmed = partition.removeAllocation(&si.AllocationRelease{
AllocationKey: foreignAlloc2,
})
assert.Assert(t, released == nil)
assert.Assert(t, confirmed == nil)
assert.Equal(t, 0, len(partition.foreignAllocs))
assert.Equal(t, 0, len(node.GetYunikornAllocations()))
}
62 changes: 43 additions & 19 deletions pkg/scheduler/tests/operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ partitions:
}
}

func TestForeignPodResourceUsage(t *testing.T) {
func TestForeignPodResourceUsage(t *testing.T) { //nolint:funlen
// Register RM
configData := `
partitions:
Expand Down Expand Up @@ -617,12 +617,10 @@ partitions:
waitForNewNode(t, context, "node-1:1234", "[rm:123]default", 1000)

assert.Equal(t, len(partitionInfo.GetNodes()), 1)
node1 := partitionInfo.GetNode("node-1:1234")
assert.Equal(t, int64(node1.GetCapacity().Resources[common.Memory]), int64(100))
schedulingNode1 := ms.scheduler.GetClusterContext().
GetNode("node-1:1234", "[rm:123]default")
assert.Equal(t, int64(schedulingNode1.GetAllocatedResource().Resources[common.Memory]), int64(0))
assert.Equal(t, int64(schedulingNode1.GetAvailableResource().Resources[common.Memory]), int64(100))
node := partitionInfo.GetNode("node-1:1234")
assert.Equal(t, int64(node.GetCapacity().Resources[common.Memory]), int64(100))
assert.Equal(t, int64(node.GetAllocatedResource().Resources[common.Memory]), int64(0))
assert.Equal(t, int64(node.GetAvailableResource().Resources[common.Memory]), int64(100))

// update node capacity - add foreign pod
res := resources.NewResourceFromMap(map[string]resources.Quantity{
Expand All @@ -643,12 +641,38 @@ partitions:
assert.NilError(t, err, "NodeRequest failed")
waitForAvailableNodeResource(t, ms.scheduler.GetClusterContext(), "[rm:123]default",
[]string{"node-1:1234"}, 20, 1000)
assert.Equal(t, int64(node1.GetCapacity().Resources[common.Memory]), int64(100))
assert.Equal(t, int64(node1.GetCapacity().Resources[common.CPU]), int64(10))
assert.Equal(t, int64(node1.GetOccupiedResource().Resources[common.Memory]), int64(80))
assert.Equal(t, int64(node1.GetOccupiedResource().Resources[common.CPU]), int64(5))
assert.Equal(t, int64(schedulingNode1.GetAllocatedResource().Resources[common.Memory]), int64(0))
assert.Equal(t, int64(schedulingNode1.GetAvailableResource().Resources[common.Memory]), int64(20))
assert.Equal(t, int64(node.GetCapacity().Resources[common.Memory]), int64(100))
assert.Equal(t, int64(node.GetCapacity().Resources[common.CPU]), int64(10))
assert.Equal(t, int64(node.GetOccupiedResource().Resources[common.Memory]), int64(80))
assert.Equal(t, int64(node.GetOccupiedResource().Resources[common.CPU]), int64(5))
assert.Equal(t, int64(node.GetAllocatedResource().Resources[common.Memory]), int64(0))
assert.Equal(t, int64(node.GetAvailableResource().Resources[common.Memory]), int64(20))

// update resource usage
resUpd := resources.NewResourceFromMap(map[string]resources.Quantity{
"memory": 50, "vcore": 10})
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
Allocations: []*si.Allocation{
{
AllocationKey: "foreignpod-1",
ResourcePerAlloc: resUpd.ToProto(),
AllocationTags: map[string]string{
common.Foreign: common.AllocTypeDefault,
},
NodeID: "node-1:1234",
},
},
RmID: "rm:123",
})
assert.NilError(t, err)
waitForAvailableNodeResource(t, ms.scheduler.GetClusterContext(), "[rm:123]default",
[]string{"node-1:1234"}, 50, 1000)
assert.Equal(t, int64(node.GetCapacity().Resources[common.Memory]), int64(100))
assert.Equal(t, int64(node.GetCapacity().Resources[common.CPU]), int64(10))
assert.Equal(t, int64(node.GetOccupiedResource().Resources[common.Memory]), int64(50))
assert.Equal(t, int64(node.GetOccupiedResource().Resources[common.CPU]), int64(10))
assert.Equal(t, int64(node.GetAllocatedResource().Resources[common.Memory]), int64(0))
assert.Equal(t, int64(node.GetAvailableResource().Resources[common.Memory]), int64(50))

// update node capacity - remove foreign pod
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
Expand All @@ -664,10 +688,10 @@ partitions:
assert.NilError(t, err)
waitForAvailableNodeResource(t, ms.scheduler.GetClusterContext(), "[rm:123]default",
[]string{"node-1:1234"}, 100, 1000)
assert.Equal(t, int64(node1.GetCapacity().Resources[common.Memory]), int64(100))
assert.Equal(t, int64(node1.GetCapacity().Resources[common.CPU]), int64(10))
assert.Equal(t, int64(node1.GetOccupiedResource().Resources[common.Memory]), int64(0))
assert.Equal(t, int64(node1.GetOccupiedResource().Resources[common.CPU]), int64(0))
assert.Equal(t, int64(schedulingNode1.GetAllocatedResource().Resources[common.Memory]), int64(0))
assert.Equal(t, int64(schedulingNode1.GetAvailableResource().Resources[common.Memory]), int64(100))
assert.Equal(t, int64(node.GetCapacity().Resources[common.Memory]), int64(100))
assert.Equal(t, int64(node.GetCapacity().Resources[common.CPU]), int64(10))
assert.Equal(t, int64(node.GetOccupiedResource().Resources[common.Memory]), int64(0))
assert.Equal(t, int64(node.GetOccupiedResource().Resources[common.CPU]), int64(0))
assert.Equal(t, int64(node.GetAllocatedResource().Resources[common.Memory]), int64(0))
assert.Equal(t, int64(node.GetAvailableResource().Resources[common.Memory]), int64(100))
}
6 changes: 4 additions & 2 deletions pkg/scheduler/utilities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ const (
maxresources = "maxresources"
maxapplications = "maxapplications"
foreignAlloc1 = "foreign-alloc-1"
foreignAlloc2 = "foreign-alloc-2"
)

func newBasePartitionNoRootDefault() (*PartitionContext, error) {
Expand Down Expand Up @@ -610,7 +611,7 @@ func newForeignRequest(allocKey string) *objects.Allocation {
})
}

func newForeignAllocation(allocKey, nodeID string) *objects.Allocation {
func newForeignAllocation(allocKey, nodeID string, allocated *resources.Resource) *objects.Allocation {
var alloc *objects.Allocation
defer func() {
if nodeID == "" {
Expand All @@ -626,7 +627,8 @@ func newForeignAllocation(allocKey, nodeID string) *objects.Allocation {
AllocationTags: map[string]string{
siCommon.Foreign: siCommon.AllocTypeDefault,
},
NodeID: id,
NodeID: id,
ResourcePerAlloc: allocated.ToProto(),
})
return alloc
}
Expand Down

0 comments on commit 7057201

Please sign in to comment.