Skip to content

Commit

Permalink
[YUNIKORN-2928] [core] Update foreign pod resource usage
Browse files Browse the repository at this point in the history
  • Loading branch information
pbacsko committed Oct 29, 2024
1 parent 44705ae commit e16451a
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 15 deletions.
2 changes: 1 addition & 1 deletion pkg/scheduler/health_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func TestGetSchedulerHealthStatusContext(t *testing.T) {
assert.Assert(t, healthInfo.HealthChecks[8].Succeeded, "The orphan allocation check on the app still fails after removing the app")

// check that foreign allocation does not interfere with health check
falloc := newForeignAllocation("foreign-1", "node")
falloc := newForeignAllocation("foreign-1", "node", resources.Zero)
node.AddAllocation(falloc)
healthInfo = GetSchedulerHealthStatus(schedulerMetrics, schedulerContext)
assert.Assert(t, healthInfo.HealthChecks[7].Succeeded, "Foreign allocation was detected as orphan")
Expand Down
13 changes: 13 additions & 0 deletions pkg/scheduler/objects/foreign_allocations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package objects

type ForeignAllocationsHandler struct {
allocToNode map[string]string // allocKey-nodeID assignment of non-Yunikorn allocations
allocations map[string]*Allocation
}

func NewForeignAllocationsHandler() *ForeignAllocationsHandler {
return &ForeignAllocationsHandler{
allocations: make(map[string]*Allocation),
allocToNode: make(map[string]string),
}
}
25 changes: 25 additions & 0 deletions pkg/scheduler/objects/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,31 @@ func (sn *Node) AddAllocation(alloc *Allocation) {
_ = sn.addAllocationInternal(alloc, true)
}

// UpdateForeignAllocation updates a foreign allocation and re-calculates the available/occupied resources
func (sn *Node) UpdateForeignAllocation(alloc *Allocation) *Allocation {
sn.Lock()
defer sn.Unlock()
key := alloc.GetAllocationKey()
existing := sn.allocations[key]
sn.allocations[key] = alloc
if existing == nil {
log.Log(log.SchedNode).Debug("unknown allocation to update",
zap.String("allocationKey", key))
return nil
}

existingResource := existing.GetAllocatedResource().Clone()
newResource := alloc.GetAllocatedResource().Clone()
delta := resources.Sub(newResource, existingResource)
delta.Prune()

sn.occupiedResource.AddTo(delta)
sn.occupiedResource.Prune()
sn.refreshAvailableResource()

return existing
}

func (sn *Node) addAllocationInternal(alloc *Allocation, force bool) bool {
if alloc == nil {
return false
Expand Down
24 changes: 24 additions & 0 deletions pkg/scheduler/objects/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -963,3 +963,27 @@ func TestGetAllocations(t *testing.T) {
assert.Assert(t, m[foreignAlloc2])
})
}

func TestUpdateForeignAllocation(t *testing.T) {
node := newNode(nodeID1, map[string]resources.Quantity{"first": 100, "second": 200})
allocRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "second": 20})
alloc := newForeignAllocation(foreignAlloc1, nodeID1, allocRes)
node.AddAllocation(alloc)

// update existing allocation
updatedRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 15, "second": 0})
allocUpd := newForeignAllocation(foreignAlloc1, nodeID1, updatedRes)
prev := node.UpdateForeignAllocation(allocUpd)
expectedOccupied := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 15})
expectedAvailable := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 85, "second": 200})
assert.Assert(t, prev == alloc, "returned previous allocation is different")
assert.Assert(t, resources.Equals(node.GetOccupiedResource(), expectedOccupied), "occupied resource has been updated incorrectly")
assert.Assert(t, resources.Equals(node.GetAllocatedResource(), resources.Zero), "allocated resource has changed")
assert.Assert(t, resources.Equals(node.GetAvailableResource(), expectedAvailable), "avaiable resource has been updated incorrectly")

// update non-existing allocation
alloc2 := newForeignAllocation(foreignAlloc2, nodeID1, allocRes)
prev = node.UpdateForeignAllocation(alloc2)
assert.Assert(t, prev == nil, "unexpected previous allocation returned")
assert.Assert(t, node.GetAllocation(foreignAlloc2) == alloc2, "foreign allocation not found")
}
32 changes: 24 additions & 8 deletions pkg/scheduler/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ type PartitionContext struct {
reservations int // number of reservations
placeholderAllocations int // number of placeholder allocations
preemptionEnabled bool // whether preemption is enabled or not
foreignAllocs map[string]string // allocKey-nodeID assignment of non-Yunikorn allocations
foreignAllocs map[string]*objects.Allocation // foreign (non-Yunikorn) allocations
foreignNodes map[string]string // allocKey-nodeID assignment of non-Yunikorn allocations

// The partition write lock must not be held while manipulating an application.
// Scheduling is running continuously as a lock free background task. Scheduling an application
Expand Down Expand Up @@ -95,7 +96,8 @@ func newPartitionContext(conf configs.PartitionConfig, rmID string, cc *ClusterC
applications: make(map[string]*objects.Application),
completedApplications: make(map[string]*objects.Application),
nodes: objects.NewNodeCollection(conf.Name),
foreignAllocs: make(map[string]string),
foreignAllocs: make(map[string]*objects.Allocation),
foreignNodes: make(map[string]string),
}
pc.partitionManager = newPartitionManager(pc, cc)
if err := pc.initialPartitionFromConfig(conf); err != nil {
Expand Down Expand Up @@ -1310,14 +1312,20 @@ func (pc *PartitionContext) handleForeignAllocation(allocationKey, applicationID
zap.String("nodeID", nodeID),
zap.String("allocationKey", allocationKey))
node.AddAllocation(alloc)
pc.foreignAllocs[allocationKey] = alloc
return false, true, nil
}

log.Log(log.SchedPartition).Info("handling foreign allocation update",
zap.String("partitionName", pc.Name),
zap.String("appID", applicationID),
zap.String("allocationKey", allocationKey))
// this is a placeholder for eventual resource updates; nothing to do yet
prev := node.UpdateForeignAllocation(alloc)
if prev == nil {
log.Log(log.SchedPartition).Warn("BUG: previous allocation not found during update",
zap.String("allocationKey", allocationKey))
}

return false, false, nil
}

Expand All @@ -1331,11 +1339,11 @@ func (pc *PartitionContext) convertUGI(ugi *si.UserGroupInformation, forced bool
func (pc *PartitionContext) getOrSetNodeIDForAlloc(allocKey, nodeID string) string {
pc.Lock()
defer pc.Unlock()
id := pc.foreignAllocs[allocKey]
id := pc.foreignNodes[allocKey]
if id != "" {
return id
}
pc.foreignAllocs[allocKey] = nodeID
pc.foreignNodes[allocKey] = nodeID
return ""
}

Expand Down Expand Up @@ -1548,14 +1556,22 @@ func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]*
}

func (pc *PartitionContext) removeForeignAllocation(allocID string) {
nodeID := pc.foreignAllocs[allocID]
if nodeID == "" {
alloc := pc.foreignAllocs[allocID]
delete(pc.foreignAllocs, allocID)
if alloc == nil {
log.Log(log.SchedPartition).Debug("Tried to remove a non-existing foreign allocation",
zap.String("allocationID", allocID))
}

nodeID := pc.foreignNodes[allocID]
delete(pc.foreignNodes, allocID)
if nodeID == "" {
log.Log(log.SchedPartition).Debug("Assigned node not found for foreign allocation",
zap.String("allocationID", allocID),
zap.String("nodeID", nodeID))
return
}
delete(pc.foreignAllocs, allocID)

node := pc.GetNode(nodeID)
if node == nil {
log.Log(log.SchedPartition).Debug("Node not found for foreign allocation",
Expand Down
38 changes: 34 additions & 4 deletions pkg/scheduler/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4705,32 +4705,61 @@ func TestForeignAllocation(t *testing.T) {
assert.Assert(t, !reqCreated)
assert.Assert(t, !allocCreated)
assert.Error(t, err, "trying to add a foreign request (non-allocation) foreign-nonalloc")
assert.Equal(t, 0, len(partition.foreignAllocs))
assert.Equal(t, 0, len(partition.foreignNodes))

// error: empty node ID
req = newForeignAllocation(foreignAlloc1, "")
req = newForeignAllocation(foreignAlloc1, "", resources.Zero)
reqCreated, allocCreated, err = partition.UpdateAllocation(req)
assert.Assert(t, !reqCreated)
assert.Assert(t, !allocCreated)
assert.Error(t, err, "node ID is empty for allocation foreign-alloc-1")
assert.Equal(t, 0, len(partition.foreignAllocs))
assert.Equal(t, 0, len(partition.foreignNodes))

// error: no node found
req = newForeignAllocation(foreignAlloc1, nodeID2)
req = newForeignAllocation(foreignAlloc1, nodeID2, resources.Zero)
reqCreated, allocCreated, err = partition.UpdateAllocation(req)
assert.Assert(t, !reqCreated)
assert.Assert(t, !allocCreated)
assert.Error(t, err, "failed to find node node-2 for allocation foreign-alloc-1")
assert.Equal(t, 0, len(partition.foreignAllocs))
assert.Equal(t, 0, len(partition.foreignNodes))

// add new allocation
req = newForeignAllocation(foreignAlloc1, nodeID1)
allocRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
req = newForeignAllocation(foreignAlloc1, nodeID1, allocRes)
reqCreated, allocCreated, err = partition.UpdateAllocation(req)
assert.Assert(t, !reqCreated)
assert.Assert(t, allocCreated)
assert.NilError(t, err)
assert.Equal(t, 1, len(partition.foreignAllocs))
assert.Equal(t, nodeID1, partition.foreignAllocs[foreignAlloc1])
assert.Equal(t, 1, len(partition.foreignNodes))
assert.Equal(t, nodeID1, partition.foreignNodes[foreignAlloc1])
assert.Equal(t, 0, len(node1.GetYunikornAllocations()))
assert.Assert(t, node1.GetAllocation(foreignAlloc1) != nil)
occupied := node1.GetOccupiedResource().Clone()
available := node1.GetAvailableResource().Clone()
allocated := node1.GetAllocatedResource().Clone()
assert.Assert(t, resources.Equals(occupied, allocRes), "occupied resources has been calculated incorrectly")
assert.Assert(t, resources.Equals(allocated, resources.Zero), "allocated resources has changed")
expectedAvailable := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 9})
assert.Assert(t, resources.Equals(available, expectedAvailable), "available resources has been calculated incorrectly")

// update resources
updatedRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 2})
update := newForeignAllocation(foreignAlloc1, nodeID1, updatedRes)
reqCreated, allocCreated, err = partition.UpdateAllocation(update)
assert.Assert(t, !reqCreated)
assert.Assert(t, !allocCreated)
assert.NilError(t, err)
updatedOccupied := node1.GetOccupiedResource().Clone()
updatedAvailable := node1.GetAvailableResource().Clone()
updatedAllocated := node1.GetAllocatedResource().Clone()
assert.Assert(t, resources.Equals(updatedOccupied, updatedRes), "occupied resources has been updated incorrectly")
assert.Assert(t, resources.Equals(updatedAllocated, resources.Zero), "allocated resources has changed")
expectedAvailable = resources.NewResourceFromMap(map[string]resources.Quantity{"first": 8})
assert.Assert(t, resources.Equals(updatedAvailable, expectedAvailable), "available resources has been updated incorrectly")

// remove allocation
released, confirmed := partition.removeAllocation(&si.AllocationRelease{
Expand All @@ -4739,6 +4768,7 @@ func TestForeignAllocation(t *testing.T) {
assert.Assert(t, released == nil)
assert.Assert(t, confirmed == nil)
assert.Equal(t, 0, len(partition.foreignAllocs))
assert.Equal(t, 0, len(partition.foreignNodes))
assert.Equal(t, 0, len(node1.GetYunikornAllocations()))
assert.Assert(t, node1.GetAllocation(foreignAlloc1) == nil)
}
5 changes: 3 additions & 2 deletions pkg/scheduler/utilities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ func newForeignRequest(allocKey string) *objects.Allocation {
})
}

func newForeignAllocation(allocKey, nodeID string) *objects.Allocation {
func newForeignAllocation(allocKey, nodeID string, allocated *resources.Resource) *objects.Allocation {
var alloc *objects.Allocation
defer func() {
if nodeID == "" {
Expand All @@ -626,7 +626,8 @@ func newForeignAllocation(allocKey, nodeID string) *objects.Allocation {
AllocationTags: map[string]string{
siCommon.Foreign: siCommon.AllocTypeDefault,
},
NodeID: id,
NodeID: id,
ResourcePerAlloc: allocated.ToProto(),
})
return alloc
}
Expand Down

0 comments on commit e16451a

Please sign in to comment.