Skip to content

Commit

Permalink
[YUNIKORN-2928] [core] Update foreign pod resource usage
Browse files Browse the repository at this point in the history
  • Loading branch information
pbacsko committed Oct 30, 2024
1 parent 44705ae commit 300304e
Show file tree
Hide file tree
Showing 7 changed files with 190 additions and 41 deletions.
2 changes: 1 addition & 1 deletion pkg/scheduler/health_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func TestGetSchedulerHealthStatusContext(t *testing.T) {
assert.Assert(t, healthInfo.HealthChecks[8].Succeeded, "The orphan allocation check on the app still fails after removing the app")

// check that foreign allocation does not interfere with health check
falloc := newForeignAllocation("foreign-1", "node")
falloc := newForeignAllocation("foreign-1", "node", resources.Zero)
node.AddAllocation(falloc)
healthInfo = GetSchedulerHealthStatus(schedulerMetrics, schedulerContext)
assert.Assert(t, healthInfo.HealthChecks[7].Succeeded, "Foreign allocation was detected as orphan")
Expand Down
25 changes: 25 additions & 0 deletions pkg/scheduler/objects/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,31 @@ func (sn *Node) AddAllocation(alloc *Allocation) {
_ = sn.addAllocationInternal(alloc, true)
}

// UpdateForeignAllocation updates a foreign allocation and re-calculates the available/occupied resources
func (sn *Node) UpdateForeignAllocation(alloc *Allocation) *Allocation {
sn.Lock()
defer sn.Unlock()
key := alloc.GetAllocationKey()
existing := sn.allocations[key]
sn.allocations[key] = alloc
if existing == nil {
log.Log(log.SchedNode).Debug("unknown allocation to update",
zap.String("allocationKey", key))
return nil
}

existingResource := existing.GetAllocatedResource().Clone()
newResource := alloc.GetAllocatedResource().Clone()
delta := resources.Sub(newResource, existingResource)
delta.Prune()

sn.occupiedResource.AddTo(delta)
sn.occupiedResource.Prune()
sn.refreshAvailableResource()

return existing
}

func (sn *Node) addAllocationInternal(alloc *Allocation, force bool) bool {
if alloc == nil {
return false
Expand Down
24 changes: 24 additions & 0 deletions pkg/scheduler/objects/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -963,3 +963,27 @@ func TestGetAllocations(t *testing.T) {
assert.Assert(t, m[foreignAlloc2])
})
}

func TestUpdateForeignAllocation(t *testing.T) {
node := newNode(nodeID1, map[string]resources.Quantity{"first": 100, "second": 200})
allocRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "second": 20})
alloc := newForeignAllocation(foreignAlloc1, nodeID1, allocRes)
node.AddAllocation(alloc)

// update existing allocation
updatedRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 15, "second": 0})
allocUpd := newForeignAllocation(foreignAlloc1, nodeID1, updatedRes)
prev := node.UpdateForeignAllocation(allocUpd)
expectedOccupied := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 15})
expectedAvailable := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 85, "second": 200})
assert.Assert(t, prev == alloc, "returned previous allocation is different")
assert.Assert(t, resources.Equals(node.GetOccupiedResource(), expectedOccupied), "occupied resource has been updated incorrectly")
assert.Assert(t, resources.Equals(node.GetAllocatedResource(), resources.Zero), "allocated resource has changed")
assert.Assert(t, resources.Equals(node.GetAvailableResource(), expectedAvailable), "avaiable resource has been updated incorrectly")

// update non-existing allocation
alloc2 := newForeignAllocation(foreignAlloc2, nodeID1, allocRes)
prev = node.UpdateForeignAllocation(alloc2)
assert.Assert(t, prev == nil, "unexpected previous allocation returned")
assert.Assert(t, node.GetAllocation(foreignAlloc2) == alloc2, "foreign allocation not found")
}
32 changes: 24 additions & 8 deletions pkg/scheduler/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ type PartitionContext struct {
reservations int // number of reservations
placeholderAllocations int // number of placeholder allocations
preemptionEnabled bool // whether preemption is enabled or not
foreignAllocs map[string]string // allocKey-nodeID assignment of non-Yunikorn allocations
foreignAllocs map[string]*objects.Allocation // foreign (non-Yunikorn) allocations
foreignNodes map[string]string // allocKey-nodeID assignment of non-Yunikorn allocations

// The partition write lock must not be held while manipulating an application.
// Scheduling is running continuously as a lock free background task. Scheduling an application
Expand Down Expand Up @@ -95,7 +96,8 @@ func newPartitionContext(conf configs.PartitionConfig, rmID string, cc *ClusterC
applications: make(map[string]*objects.Application),
completedApplications: make(map[string]*objects.Application),
nodes: objects.NewNodeCollection(conf.Name),
foreignAllocs: make(map[string]string),
foreignAllocs: make(map[string]*objects.Allocation),
foreignNodes: make(map[string]string),
}
pc.partitionManager = newPartitionManager(pc, cc)
if err := pc.initialPartitionFromConfig(conf); err != nil {
Expand Down Expand Up @@ -1310,14 +1312,20 @@ func (pc *PartitionContext) handleForeignAllocation(allocationKey, applicationID
zap.String("nodeID", nodeID),
zap.String("allocationKey", allocationKey))
node.AddAllocation(alloc)
pc.foreignAllocs[allocationKey] = alloc
return false, true, nil
}

log.Log(log.SchedPartition).Info("handling foreign allocation update",
zap.String("partitionName", pc.Name),
zap.String("appID", applicationID),
zap.String("allocationKey", allocationKey))
// this is a placeholder for eventual resource updates; nothing to do yet
prev := node.UpdateForeignAllocation(alloc)
if prev == nil {
log.Log(log.SchedPartition).Warn("BUG: previous allocation not found during update",
zap.String("allocationKey", allocationKey))
}

return false, false, nil
}

Expand All @@ -1331,11 +1339,11 @@ func (pc *PartitionContext) convertUGI(ugi *si.UserGroupInformation, forced bool
func (pc *PartitionContext) getOrSetNodeIDForAlloc(allocKey, nodeID string) string {
pc.Lock()
defer pc.Unlock()
id := pc.foreignAllocs[allocKey]
id := pc.foreignNodes[allocKey]
if id != "" {
return id
}
pc.foreignAllocs[allocKey] = nodeID
pc.foreignNodes[allocKey] = nodeID
return ""
}

Expand Down Expand Up @@ -1548,14 +1556,22 @@ func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]*
}

func (pc *PartitionContext) removeForeignAllocation(allocID string) {
nodeID := pc.foreignAllocs[allocID]
if nodeID == "" {
alloc := pc.foreignAllocs[allocID]
delete(pc.foreignAllocs, allocID)
if alloc == nil {
log.Log(log.SchedPartition).Debug("Tried to remove a non-existing foreign allocation",
zap.String("allocationID", allocID))
}

nodeID := pc.foreignNodes[allocID]
delete(pc.foreignNodes, allocID)
if nodeID == "" {
log.Log(log.SchedPartition).Debug("Assigned node not found for foreign allocation",
zap.String("allocationID", allocID),
zap.String("nodeID", nodeID))
return
}
delete(pc.foreignAllocs, allocID)

node := pc.GetNode(nodeID)
if node == nil {
log.Log(log.SchedPartition).Debug("Node not found for foreign allocation",
Expand Down
80 changes: 69 additions & 11 deletions pkg/scheduler/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4690,13 +4690,13 @@ func TestPlaceholderAllocationAndReplacementAfterRecovery(t *testing.T) {
assert.Equal(t, "tg-1", confirmed.GetTaskGroup())
}

func TestForeignAllocation(t *testing.T) {
func TestForeignAllocation(t *testing.T) { //nolint:funlen
setupUGM()
partition, err := newBasePartition()
assert.NilError(t, err, "partition create failed")
nodeRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
node1 := newNodeMaxResource(nodeID1, nodeRes)
err = partition.AddNode(node1)
node := newNodeMaxResource(nodeID1, nodeRes)
err = partition.AddNode(node)
assert.NilError(t, err)

// error: adding request (non-allocation)
Expand All @@ -4705,32 +4705,73 @@ func TestForeignAllocation(t *testing.T) {
assert.Assert(t, !reqCreated)
assert.Assert(t, !allocCreated)
assert.Error(t, err, "trying to add a foreign request (non-allocation) foreign-nonalloc")
assert.Equal(t, 0, len(partition.foreignAllocs))
assert.Equal(t, 0, len(partition.foreignNodes))

// error: empty node ID
req = newForeignAllocation(foreignAlloc1, "")
req = newForeignAllocation(foreignAlloc1, "", resources.Zero)
reqCreated, allocCreated, err = partition.UpdateAllocation(req)
assert.Assert(t, !reqCreated)
assert.Assert(t, !allocCreated)
assert.Error(t, err, "node ID is empty for allocation foreign-alloc-1")
assert.Equal(t, 0, len(partition.foreignAllocs))
assert.Equal(t, 0, len(partition.foreignNodes))

// error: no node found
req = newForeignAllocation(foreignAlloc1, nodeID2)
req = newForeignAllocation(foreignAlloc1, nodeID2, resources.Zero)
reqCreated, allocCreated, err = partition.UpdateAllocation(req)
assert.Assert(t, !reqCreated)
assert.Assert(t, !allocCreated)
assert.Error(t, err, "failed to find node node-2 for allocation foreign-alloc-1")
assert.Equal(t, 0, len(partition.foreignAllocs))
assert.Equal(t, 0, len(partition.foreignNodes))

// add new allocation
req = newForeignAllocation(foreignAlloc1, nodeID1)
allocRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
req = newForeignAllocation(foreignAlloc1, nodeID1, allocRes)
reqCreated, allocCreated, err = partition.UpdateAllocation(req)
assert.Assert(t, !reqCreated)
assert.Assert(t, allocCreated)
assert.NilError(t, err)
assert.Equal(t, 1, len(partition.foreignAllocs))
assert.Equal(t, nodeID1, partition.foreignAllocs[foreignAlloc1])
assert.Equal(t, 0, len(node1.GetYunikornAllocations()))
assert.Assert(t, node1.GetAllocation(foreignAlloc1) != nil)
assert.Equal(t, 1, len(partition.foreignNodes))
assert.Equal(t, nodeID1, partition.foreignNodes[foreignAlloc1])
assert.Equal(t, 0, len(node.GetYunikornAllocations()))
assert.Assert(t, node.GetAllocation(foreignAlloc1) != nil)
occupied := node.GetOccupiedResource().Clone()
available := node.GetAvailableResource().Clone()
allocated := node.GetAllocatedResource().Clone()
assert.Assert(t, resources.Equals(occupied, allocRes), "occupied resources has been calculated incorrectly")
assert.Assert(t, resources.Equals(allocated, resources.Zero), "allocated resources has changed")
expectedAvailable := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 9})
assert.Assert(t, resources.Equals(available, expectedAvailable), "available resources has been calculated incorrectly")

// update resources
updatedRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 2})
update := newForeignAllocation(foreignAlloc1, nodeID1, updatedRes)
reqCreated, allocCreated, err = partition.UpdateAllocation(update)
assert.Assert(t, !reqCreated)
assert.Assert(t, !allocCreated)
assert.NilError(t, err)
updatedOccupied := node.GetOccupiedResource().Clone()
updatedAvailable := node.GetAvailableResource().Clone()
updatedAllocated := node.GetAllocatedResource().Clone()
assert.Assert(t, resources.Equals(updatedOccupied, updatedRes), "occupied resources has been updated incorrectly")
assert.Assert(t, resources.Equals(updatedAllocated, resources.Zero), "allocated resources has changed")
expectedAvailable = resources.NewResourceFromMap(map[string]resources.Quantity{"first": 8})
assert.Assert(t, resources.Equals(updatedAvailable, expectedAvailable), "available resources has been updated incorrectly")

// simulate update error
node.RemoveAllocation(foreignAlloc1)
reqCreated, allocCreated, err = partition.UpdateAllocation(update) // should re-add alloc object to the node
assert.Assert(t, !reqCreated)
assert.Assert(t, !allocCreated)
assert.NilError(t, err)
assert.Assert(t, node.GetAllocation(foreignAlloc1) != nil)
assert.Assert(t, resources.Equals(updatedOccupied, updatedRes), "occupied resources has been updated incorrectly")
assert.Assert(t, resources.Equals(updatedAllocated, resources.Zero), "allocated resources has changed")
expectedAvailable = resources.NewResourceFromMap(map[string]resources.Quantity{"first": 8})
assert.Assert(t, resources.Equals(updatedAvailable, expectedAvailable), "available resources has been updated incorrectly")

// remove allocation
released, confirmed := partition.removeAllocation(&si.AllocationRelease{
Expand All @@ -4739,6 +4780,23 @@ func TestForeignAllocation(t *testing.T) {
assert.Assert(t, released == nil)
assert.Assert(t, confirmed == nil)
assert.Equal(t, 0, len(partition.foreignAllocs))
assert.Equal(t, 0, len(node1.GetYunikornAllocations()))
assert.Assert(t, node1.GetAllocation(foreignAlloc1) == nil)
assert.Equal(t, 0, len(partition.foreignNodes))
assert.Equal(t, 0, len(node.GetYunikornAllocations()))
assert.Assert(t, node.GetAllocation(foreignAlloc1) == nil)

// add + simulate removal failure
req = newForeignAllocation(foreignAlloc2, nodeID1, allocRes)
reqCreated, allocCreated, err = partition.UpdateAllocation(req)
assert.NilError(t, err)
assert.Assert(t, !reqCreated)
assert.Assert(t, !allocCreated)
partition.nodes.RemoveNode("node-1") // node gets removed
released, confirmed = partition.removeAllocation(&si.AllocationRelease{
AllocationKey: foreignAlloc2,
})
assert.Assert(t, released == nil)
assert.Assert(t, confirmed == nil)
assert.Equal(t, 0, len(partition.foreignAllocs))
assert.Equal(t, 0, len(partition.foreignNodes))
assert.Equal(t, 0, len(node.GetYunikornAllocations()))
}
62 changes: 43 additions & 19 deletions pkg/scheduler/tests/operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ partitions:
assert.Equal(t, int64(schedulingNode1.GetAvailableResource().Resources[common.Memory]), int64(20))
}

func TestForeignPodResourceUsage(t *testing.T) {
func TestForeignPodResourceUsage(t *testing.T) { //nolint:funlen
// Register RM
configData := `
partitions:
Expand Down Expand Up @@ -708,12 +708,10 @@ partitions:
waitForNewNode(t, context, "node-1:1234", "[rm:123]default", 1000)

assert.Equal(t, len(partitionInfo.GetNodes()), 1)
node1 := partitionInfo.GetNode("node-1:1234")
assert.Equal(t, int64(node1.GetCapacity().Resources[common.Memory]), int64(100))
schedulingNode1 := ms.scheduler.GetClusterContext().
GetNode("node-1:1234", "[rm:123]default")
assert.Equal(t, int64(schedulingNode1.GetAllocatedResource().Resources[common.Memory]), int64(0))
assert.Equal(t, int64(schedulingNode1.GetAvailableResource().Resources[common.Memory]), int64(100))
node := partitionInfo.GetNode("node-1:1234")
assert.Equal(t, int64(node.GetCapacity().Resources[common.Memory]), int64(100))
assert.Equal(t, int64(node.GetAllocatedResource().Resources[common.Memory]), int64(0))
assert.Equal(t, int64(node.GetAvailableResource().Resources[common.Memory]), int64(100))

// update node capacity - add foreign pod
res := resources.NewResourceFromMap(map[string]resources.Quantity{
Expand All @@ -734,12 +732,38 @@ partitions:
assert.NilError(t, err, "NodeRequest failed")
waitForAvailableNodeResource(t, ms.scheduler.GetClusterContext(), "[rm:123]default",
[]string{"node-1:1234"}, 20, 1000)
assert.Equal(t, int64(node1.GetCapacity().Resources[common.Memory]), int64(100))
assert.Equal(t, int64(node1.GetCapacity().Resources[common.CPU]), int64(10))
assert.Equal(t, int64(node1.GetOccupiedResource().Resources[common.Memory]), int64(80))
assert.Equal(t, int64(node1.GetOccupiedResource().Resources[common.CPU]), int64(5))
assert.Equal(t, int64(schedulingNode1.GetAllocatedResource().Resources[common.Memory]), int64(0))
assert.Equal(t, int64(schedulingNode1.GetAvailableResource().Resources[common.Memory]), int64(20))
assert.Equal(t, int64(node.GetCapacity().Resources[common.Memory]), int64(100))
assert.Equal(t, int64(node.GetCapacity().Resources[common.CPU]), int64(10))
assert.Equal(t, int64(node.GetOccupiedResource().Resources[common.Memory]), int64(80))
assert.Equal(t, int64(node.GetOccupiedResource().Resources[common.CPU]), int64(5))
assert.Equal(t, int64(node.GetAllocatedResource().Resources[common.Memory]), int64(0))
assert.Equal(t, int64(node.GetAvailableResource().Resources[common.Memory]), int64(20))

// update resource usage
resUpd := resources.NewResourceFromMap(map[string]resources.Quantity{
"memory": 50, "vcore": 10})
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
Allocations: []*si.Allocation{
{
AllocationKey: "foreignpod-1",
ResourcePerAlloc: resUpd.ToProto(),
AllocationTags: map[string]string{
common.Foreign: common.AllocTypeDefault,
},
NodeID: "node-1:1234",
},
},
RmID: "rm:123",
})
assert.NilError(t, err)
waitForAvailableNodeResource(t, ms.scheduler.GetClusterContext(), "[rm:123]default",
[]string{"node-1:1234"}, 50, 1000)
assert.Equal(t, int64(node.GetCapacity().Resources[common.Memory]), int64(100))
assert.Equal(t, int64(node.GetCapacity().Resources[common.CPU]), int64(10))
assert.Equal(t, int64(node.GetOccupiedResource().Resources[common.Memory]), int64(50))
assert.Equal(t, int64(node.GetOccupiedResource().Resources[common.CPU]), int64(10))
assert.Equal(t, int64(node.GetAllocatedResource().Resources[common.Memory]), int64(0))
assert.Equal(t, int64(node.GetAvailableResource().Resources[common.Memory]), int64(50))

// update node capacity - remove foreign pod
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
Expand All @@ -755,10 +779,10 @@ partitions:
assert.NilError(t, err)
waitForAvailableNodeResource(t, ms.scheduler.GetClusterContext(), "[rm:123]default",
[]string{"node-1:1234"}, 100, 1000)
assert.Equal(t, int64(node1.GetCapacity().Resources[common.Memory]), int64(100))
assert.Equal(t, int64(node1.GetCapacity().Resources[common.CPU]), int64(10))
assert.Equal(t, int64(node1.GetOccupiedResource().Resources[common.Memory]), int64(0))
assert.Equal(t, int64(node1.GetOccupiedResource().Resources[common.CPU]), int64(0))
assert.Equal(t, int64(schedulingNode1.GetAllocatedResource().Resources[common.Memory]), int64(0))
assert.Equal(t, int64(schedulingNode1.GetAvailableResource().Resources[common.Memory]), int64(100))
assert.Equal(t, int64(node.GetCapacity().Resources[common.Memory]), int64(100))
assert.Equal(t, int64(node.GetCapacity().Resources[common.CPU]), int64(10))
assert.Equal(t, int64(node.GetOccupiedResource().Resources[common.Memory]), int64(0))
assert.Equal(t, int64(node.GetOccupiedResource().Resources[common.CPU]), int64(0))
assert.Equal(t, int64(node.GetAllocatedResource().Resources[common.Memory]), int64(0))
assert.Equal(t, int64(node.GetAvailableResource().Resources[common.Memory]), int64(100))
}
6 changes: 4 additions & 2 deletions pkg/scheduler/utilities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ const (
maxresources = "maxresources"
maxapplications = "maxapplications"
foreignAlloc1 = "foreign-alloc-1"
foreignAlloc2 = "foreign-alloc-2"
)

func newBasePartitionNoRootDefault() (*PartitionContext, error) {
Expand Down Expand Up @@ -610,7 +611,7 @@ func newForeignRequest(allocKey string) *objects.Allocation {
})
}

func newForeignAllocation(allocKey, nodeID string) *objects.Allocation {
func newForeignAllocation(allocKey, nodeID string, allocated *resources.Resource) *objects.Allocation {
var alloc *objects.Allocation
defer func() {
if nodeID == "" {
Expand All @@ -626,7 +627,8 @@ func newForeignAllocation(allocKey, nodeID string) *objects.Allocation {
AllocationTags: map[string]string{
siCommon.Foreign: siCommon.AllocTypeDefault,
},
NodeID: id,
NodeID: id,
ResourcePerAlloc: allocated.ToProto(),
})
return alloc
}
Expand Down

0 comments on commit 300304e

Please sign in to comment.