Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[YUNIKORN-2928] [core] Update foreign pod resource usage #990

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/scheduler/health_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func TestGetSchedulerHealthStatusContext(t *testing.T) {
assert.Assert(t, healthInfo.HealthChecks[8].Succeeded, "The orphan allocation check on the app still fails after removing the app")

// check that foreign allocation does not interfere with health check
falloc := newForeignAllocation("foreign-1", "node")
falloc := newForeignAllocation("foreign-1", "node", resources.Zero)
node.AddAllocation(falloc)
healthInfo = GetSchedulerHealthStatus(schedulerMetrics, schedulerContext)
assert.Assert(t, healthInfo.HealthChecks[7].Succeeded, "Foreign allocation was detected as orphan")
Expand Down
25 changes: 25 additions & 0 deletions pkg/scheduler/objects/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,31 @@ func (sn *Node) AddAllocation(alloc *Allocation) {
_ = sn.addAllocationInternal(alloc, true)
}

// UpdateForeignAllocation replaces the foreign allocation stored under the allocation key
// of alloc and re-calculates the occupied/available resources of the node, using the
// difference between the new and the previously stored allocated resource.
//
// It returns the allocation that was stored before the update. A nil alloc is ignored and
// nil is returned. If no allocation was stored under the key, alloc is still stored on the
// node and nil is returned.
func (sn *Node) UpdateForeignAllocation(alloc *Allocation) *Allocation {
	// same guard as addAllocationInternal: a nil allocation is ignored
	if alloc == nil {
		return nil
	}
	sn.Lock()
	defer sn.Unlock()
	key := alloc.GetAllocationKey()
	existing := sn.allocations[key]
	// the new object is stored even when there is no previous allocation to replace
	sn.allocations[key] = alloc
	if existing == nil {
		// NOTE(review): occupied resources are not adjusted on this path although the
		// allocation is now tracked by the node - confirm that this is intended
		log.Log(log.SchedNode).Debug("unknown allocation to update",
			zap.String("allocationKey", key))
		return nil
	}

	// work on copies so the resources owned by the allocation objects are left untouched
	existingResource := existing.GetAllocatedResource().Clone()
	newResource := alloc.GetAllocatedResource().Clone()
	// delta = new - previous; only the difference is applied to the occupied resources
	delta := resources.Sub(newResource, existingResource)
	delta.Prune()

	sn.occupiedResource.AddTo(delta)
	// presumably drops resource types that ended up at zero (the unit test expects them
	// to be absent from the occupied resources) - verify against resources.Prune
	sn.occupiedResource.Prune()
	sn.refreshAvailableResource()

	return existing
}

func (sn *Node) addAllocationInternal(alloc *Allocation, force bool) bool {
if alloc == nil {
return false
Expand Down
24 changes: 24 additions & 0 deletions pkg/scheduler/objects/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -956,3 +956,27 @@ func TestGetAllocations(t *testing.T) {
assert.Assert(t, m[foreignAlloc2])
})
}

// TestUpdateForeignAllocation checks Node.UpdateForeignAllocation for both an allocation
// that is already tracked by the node and one that is unknown to the node.
func TestUpdateForeignAllocation(t *testing.T) {
	node := newNode(nodeID1, map[string]resources.Quantity{"first": 100, "second": 200})
	allocRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10, "second": 20})
	alloc := newForeignAllocation(foreignAlloc1, nodeID1, allocRes)
	node.AddAllocation(alloc)

	// update existing allocation: "first" grows from 10 to 15, "second" drops from 20 to 0
	updatedRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 15, "second": 0})
	allocUpd := newForeignAllocation(foreignAlloc1, nodeID1, updatedRes)
	prev := node.UpdateForeignAllocation(allocUpd)
	expectedOccupied := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 15})
	expectedAvailable := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 85, "second": 200})
	assert.Assert(t, prev == alloc, "returned previous allocation is different")
	// the node must now track the updated object, not the one that was added originally
	assert.Assert(t, node.GetAllocation(foreignAlloc1) == allocUpd, "updated foreign allocation was not stored on the node")
	assert.Assert(t, resources.Equals(node.GetOccupiedResource(), expectedOccupied), "occupied resource has been updated incorrectly")
	// foreign allocations must never be counted as Yunikorn allocated resources
	assert.Assert(t, resources.Equals(node.GetAllocatedResource(), resources.Zero), "allocated resource has changed")
	assert.Assert(t, resources.Equals(node.GetAvailableResource(), expectedAvailable), "available resource has been updated incorrectly")

	// update non-existing allocation: nothing is returned, but the allocation gets stored
	alloc2 := newForeignAllocation(foreignAlloc2, nodeID1, allocRes)
	prev = node.UpdateForeignAllocation(alloc2)
	assert.Assert(t, prev == nil, "unexpected previous allocation returned")
	assert.Assert(t, node.GetAllocation(foreignAlloc2) == alloc2, "foreign allocation not found")
}
43 changes: 26 additions & 17 deletions pkg/scheduler/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type PartitionContext struct {
reservations int // number of reservations
placeholderAllocations int // number of placeholder allocations
preemptionEnabled bool // whether preemption is enabled or not
foreignAllocs map[string]string // allocKey-nodeID assignment of non-Yunikorn allocations
foreignAllocs map[string]*objects.Allocation // foreign (non-Yunikorn) allocations

// The partition write lock must not be held while manipulating an application.
// Scheduling is running continuously as a lock free background task. Scheduling an application
Expand Down Expand Up @@ -95,7 +95,7 @@ func newPartitionContext(conf configs.PartitionConfig, rmID string, cc *ClusterC
applications: make(map[string]*objects.Application),
completedApplications: make(map[string]*objects.Application),
nodes: objects.NewNodeCollection(conf.Name),
foreignAllocs: make(map[string]string),
foreignAllocs: make(map[string]*objects.Allocation),
}
pc.partitionManager = newPartitionManager(pc, cc)
if err := pc.initialPartitionFromConfig(conf); err != nil {
Expand Down Expand Up @@ -1303,8 +1303,8 @@ func (pc *PartitionContext) handleForeignAllocation(allocationKey, applicationID
return false, false, fmt.Errorf("failed to find node %s for allocation %s", nodeID, allocationKey)
}

existingNodeID := pc.getOrSetNodeIDForAlloc(allocationKey, nodeID)
if existingNodeID == "" {
exists := pc.getOrStoreForeignAlloc(alloc)
if !exists {
log.Log(log.SchedPartition).Info("handling new foreign allocation",
zap.String("partitionName", pc.Name),
zap.String("nodeID", nodeID),
Expand All @@ -1317,7 +1317,12 @@ func (pc *PartitionContext) handleForeignAllocation(allocationKey, applicationID
zap.String("partitionName", pc.Name),
zap.String("appID", applicationID),
zap.String("allocationKey", allocationKey))
// this is a placeholder for eventual resource updates; nothing to do yet
prev := node.UpdateForeignAllocation(alloc)
if prev == nil {
log.Log(log.SchedPartition).Warn("BUG: previous allocation not found during update",
zap.String("allocationKey", allocationKey))
}

return false, false, nil
}

Expand All @@ -1327,16 +1332,17 @@ func (pc *PartitionContext) convertUGI(ugi *si.UserGroupInformation, forced bool
return pc.userGroupCache.ConvertUGI(ugi, forced)
}

// getOrSetNodeIDForAlloc returns the nodeID for a given foreign allocation, or sets is if it's unset
func (pc *PartitionContext) getOrSetNodeIDForAlloc(allocKey, nodeID string) string {
// getOrStoreForeignAlloc returns whether the allocation already exists or stores it if it's new
func (pc *PartitionContext) getOrStoreForeignAlloc(alloc *objects.Allocation) bool {
pc.Lock()
defer pc.Unlock()
id := pc.foreignAllocs[allocKey]
if id != "" {
return id
allocKey := alloc.GetAllocationKey()
existing := pc.foreignAllocs[allocKey]
if existing == nil {
pc.foreignAllocs[allocKey] = alloc
return false
}
pc.foreignAllocs[allocKey] = nodeID
return ""
return true
}

// calculate overall nodes resource usage and returns a map as the result,
Expand Down Expand Up @@ -1548,14 +1554,17 @@ func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]*
}

func (pc *PartitionContext) removeForeignAllocation(allocID string) {
nodeID := pc.foreignAllocs[allocID]
if nodeID == "" {
pc.Lock()
defer pc.Unlock()
alloc := pc.foreignAllocs[allocID]
delete(pc.foreignAllocs, allocID)
if alloc == nil {
log.Log(log.SchedPartition).Debug("Tried to remove a non-existing foreign allocation",
zap.String("allocationID", allocID),
zap.String("nodeID", nodeID))
zap.String("allocationID", allocID))
return
}
delete(pc.foreignAllocs, allocID)

nodeID := alloc.GetNodeID()
node := pc.GetNode(nodeID)
if node == nil {
log.Log(log.SchedPartition).Debug("Node not found for foreign allocation",
Expand Down
73 changes: 62 additions & 11 deletions pkg/scheduler/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4706,13 +4706,13 @@ func TestPlaceholderAllocationAndReplacementAfterRecovery(t *testing.T) {
assert.Equal(t, "tg-1", confirmed.GetTaskGroup())
}

func TestForeignAllocation(t *testing.T) {
func TestForeignAllocation(t *testing.T) { //nolint:funlen
setupUGM()
partition, err := newBasePartition()
assert.NilError(t, err, "partition create failed")
nodeRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10})
node1 := newNodeMaxResource(nodeID1, nodeRes)
err = partition.AddNode(node1)
node := newNodeMaxResource(nodeID1, nodeRes)
err = partition.AddNode(node)
assert.NilError(t, err)

// error: adding request (non-allocation)
Expand All @@ -4721,32 +4721,68 @@ func TestForeignAllocation(t *testing.T) {
assert.Assert(t, !reqCreated)
assert.Assert(t, !allocCreated)
assert.Error(t, err, "trying to add a foreign request (non-allocation) foreign-nonalloc")
assert.Equal(t, 0, len(partition.foreignAllocs))

// error: empty node ID
req = newForeignAllocation(foreignAlloc1, "")
req = newForeignAllocation(foreignAlloc1, "", resources.Zero)
reqCreated, allocCreated, err = partition.UpdateAllocation(req)
assert.Assert(t, !reqCreated)
assert.Assert(t, !allocCreated)
assert.Error(t, err, "node ID is empty for allocation foreign-alloc-1")
assert.Equal(t, 0, len(partition.foreignAllocs))

// error: no node found
req = newForeignAllocation(foreignAlloc1, nodeID2)
req = newForeignAllocation(foreignAlloc1, nodeID2, resources.Zero)
reqCreated, allocCreated, err = partition.UpdateAllocation(req)
assert.Assert(t, !reqCreated)
assert.Assert(t, !allocCreated)
assert.Error(t, err, "failed to find node node-2 for allocation foreign-alloc-1")
assert.Equal(t, 0, len(partition.foreignAllocs))

// add new allocation
req = newForeignAllocation(foreignAlloc1, nodeID1)
allocRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
req = newForeignAllocation(foreignAlloc1, nodeID1, allocRes)
reqCreated, allocCreated, err = partition.UpdateAllocation(req)
assert.Assert(t, !reqCreated)
assert.Assert(t, allocCreated)
assert.NilError(t, err)
assert.Equal(t, 1, len(partition.foreignAllocs))
assert.Equal(t, nodeID1, partition.foreignAllocs[foreignAlloc1])
assert.Equal(t, 0, len(node1.GetYunikornAllocations()))
assert.Assert(t, node1.GetAllocation(foreignAlloc1) != nil)
assert.Equal(t, 0, len(node.GetYunikornAllocations()))
assert.Assert(t, node.GetAllocation(foreignAlloc1) != nil)
occupied := node.GetOccupiedResource().Clone()
available := node.GetAvailableResource().Clone()
allocated := node.GetAllocatedResource().Clone()
assert.Assert(t, resources.Equals(occupied, allocRes), "occupied resources has been calculated incorrectly")
assert.Assert(t, resources.Equals(allocated, resources.Zero), "allocated resources has changed")
expectedAvailable := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 9})
assert.Assert(t, resources.Equals(available, expectedAvailable), "available resources has been calculated incorrectly")

// update resources
updatedRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 2})
update := newForeignAllocation(foreignAlloc1, nodeID1, updatedRes)
reqCreated, allocCreated, err = partition.UpdateAllocation(update)
assert.Assert(t, !reqCreated)
assert.Assert(t, !allocCreated)
assert.NilError(t, err)
updatedOccupied := node.GetOccupiedResource().Clone()
updatedAvailable := node.GetAvailableResource().Clone()
updatedAllocated := node.GetAllocatedResource().Clone()
assert.Assert(t, resources.Equals(updatedOccupied, updatedRes), "occupied resources has been updated incorrectly")
assert.Assert(t, resources.Equals(updatedAllocated, resources.Zero), "allocated resources has changed")
expectedAvailable = resources.NewResourceFromMap(map[string]resources.Quantity{"first": 8})
assert.Assert(t, resources.Equals(updatedAvailable, expectedAvailable), "available resources has been updated incorrectly")

// simulate update error
node.RemoveAllocation(foreignAlloc1)
reqCreated, allocCreated, err = partition.UpdateAllocation(update) // should re-add alloc object to the node
assert.Assert(t, !reqCreated)
assert.Assert(t, !allocCreated)
assert.NilError(t, err)
assert.Assert(t, node.GetAllocation(foreignAlloc1) != nil)
assert.Assert(t, resources.Equals(updatedOccupied, updatedRes), "occupied resources has been updated incorrectly")
assert.Assert(t, resources.Equals(updatedAllocated, resources.Zero), "allocated resources has changed")
expectedAvailable = resources.NewResourceFromMap(map[string]resources.Quantity{"first": 8})
assert.Assert(t, resources.Equals(updatedAvailable, expectedAvailable), "available resources has been updated incorrectly")

// remove allocation
released, confirmed := partition.removeAllocation(&si.AllocationRelease{
Expand All @@ -4755,6 +4791,21 @@ func TestForeignAllocation(t *testing.T) {
assert.Assert(t, released == nil)
assert.Assert(t, confirmed == nil)
assert.Equal(t, 0, len(partition.foreignAllocs))
assert.Equal(t, 0, len(node1.GetYunikornAllocations()))
assert.Assert(t, node1.GetAllocation(foreignAlloc1) == nil)
assert.Equal(t, 0, len(node.GetYunikornAllocations()))
assert.Assert(t, node.GetAllocation(foreignAlloc1) == nil)

// add + simulate removal failure
req = newForeignAllocation(foreignAlloc2, nodeID1, allocRes)
reqCreated, allocCreated, err = partition.UpdateAllocation(req)
assert.NilError(t, err)
assert.Assert(t, !reqCreated)
assert.Assert(t, allocCreated)
partition.nodes.RemoveNode("node-1") // node gets removed
released, confirmed = partition.removeAllocation(&si.AllocationRelease{
AllocationKey: foreignAlloc2,
})
assert.Assert(t, released == nil)
assert.Assert(t, confirmed == nil)
assert.Equal(t, 0, len(partition.foreignAllocs))
assert.Equal(t, 0, len(node.GetYunikornAllocations()))
}
62 changes: 43 additions & 19 deletions pkg/scheduler/tests/operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ partitions:
}
}

func TestForeignPodResourceUsage(t *testing.T) {
func TestForeignPodResourceUsage(t *testing.T) { //nolint:funlen
// Register RM
configData := `
partitions:
Expand Down Expand Up @@ -617,12 +617,10 @@ partitions:
waitForNewNode(t, context, "node-1:1234", "[rm:123]default", 1000)

assert.Equal(t, len(partitionInfo.GetNodes()), 1)
node1 := partitionInfo.GetNode("node-1:1234")
assert.Equal(t, int64(node1.GetCapacity().Resources[common.Memory]), int64(100))
schedulingNode1 := ms.scheduler.GetClusterContext().
GetNode("node-1:1234", "[rm:123]default")
assert.Equal(t, int64(schedulingNode1.GetAllocatedResource().Resources[common.Memory]), int64(0))
assert.Equal(t, int64(schedulingNode1.GetAvailableResource().Resources[common.Memory]), int64(100))
node := partitionInfo.GetNode("node-1:1234")
assert.Equal(t, int64(node.GetCapacity().Resources[common.Memory]), int64(100))
assert.Equal(t, int64(node.GetAllocatedResource().Resources[common.Memory]), int64(0))
assert.Equal(t, int64(node.GetAvailableResource().Resources[common.Memory]), int64(100))

// update node capacity - add foreign pod
res := resources.NewResourceFromMap(map[string]resources.Quantity{
Expand All @@ -643,12 +641,38 @@ partitions:
assert.NilError(t, err, "NodeRequest failed")
waitForAvailableNodeResource(t, ms.scheduler.GetClusterContext(), "[rm:123]default",
[]string{"node-1:1234"}, 20, 1000)
assert.Equal(t, int64(node1.GetCapacity().Resources[common.Memory]), int64(100))
assert.Equal(t, int64(node1.GetCapacity().Resources[common.CPU]), int64(10))
assert.Equal(t, int64(node1.GetOccupiedResource().Resources[common.Memory]), int64(80))
assert.Equal(t, int64(node1.GetOccupiedResource().Resources[common.CPU]), int64(5))
assert.Equal(t, int64(schedulingNode1.GetAllocatedResource().Resources[common.Memory]), int64(0))
assert.Equal(t, int64(schedulingNode1.GetAvailableResource().Resources[common.Memory]), int64(20))
assert.Equal(t, int64(node.GetCapacity().Resources[common.Memory]), int64(100))
assert.Equal(t, int64(node.GetCapacity().Resources[common.CPU]), int64(10))
assert.Equal(t, int64(node.GetOccupiedResource().Resources[common.Memory]), int64(80))
assert.Equal(t, int64(node.GetOccupiedResource().Resources[common.CPU]), int64(5))
assert.Equal(t, int64(node.GetAllocatedResource().Resources[common.Memory]), int64(0))
assert.Equal(t, int64(node.GetAvailableResource().Resources[common.Memory]), int64(20))

// update resource usage
resUpd := resources.NewResourceFromMap(map[string]resources.Quantity{
"memory": 50, "vcore": 10})
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
Allocations: []*si.Allocation{
{
AllocationKey: "foreignpod-1",
ResourcePerAlloc: resUpd.ToProto(),
AllocationTags: map[string]string{
common.Foreign: common.AllocTypeDefault,
},
NodeID: "node-1:1234",
},
},
RmID: "rm:123",
})
assert.NilError(t, err)
waitForAvailableNodeResource(t, ms.scheduler.GetClusterContext(), "[rm:123]default",
[]string{"node-1:1234"}, 50, 1000)
assert.Equal(t, int64(node.GetCapacity().Resources[common.Memory]), int64(100))
assert.Equal(t, int64(node.GetCapacity().Resources[common.CPU]), int64(10))
assert.Equal(t, int64(node.GetOccupiedResource().Resources[common.Memory]), int64(50))
assert.Equal(t, int64(node.GetOccupiedResource().Resources[common.CPU]), int64(10))
assert.Equal(t, int64(node.GetAllocatedResource().Resources[common.Memory]), int64(0))
assert.Equal(t, int64(node.GetAvailableResource().Resources[common.Memory]), int64(50))

// update node capacity - remove foreign pod
err = ms.proxy.UpdateAllocation(&si.AllocationRequest{
Expand All @@ -664,10 +688,10 @@ partitions:
assert.NilError(t, err)
waitForAvailableNodeResource(t, ms.scheduler.GetClusterContext(), "[rm:123]default",
[]string{"node-1:1234"}, 100, 1000)
assert.Equal(t, int64(node1.GetCapacity().Resources[common.Memory]), int64(100))
assert.Equal(t, int64(node1.GetCapacity().Resources[common.CPU]), int64(10))
assert.Equal(t, int64(node1.GetOccupiedResource().Resources[common.Memory]), int64(0))
assert.Equal(t, int64(node1.GetOccupiedResource().Resources[common.CPU]), int64(0))
assert.Equal(t, int64(schedulingNode1.GetAllocatedResource().Resources[common.Memory]), int64(0))
assert.Equal(t, int64(schedulingNode1.GetAvailableResource().Resources[common.Memory]), int64(100))
assert.Equal(t, int64(node.GetCapacity().Resources[common.Memory]), int64(100))
assert.Equal(t, int64(node.GetCapacity().Resources[common.CPU]), int64(10))
assert.Equal(t, int64(node.GetOccupiedResource().Resources[common.Memory]), int64(0))
assert.Equal(t, int64(node.GetOccupiedResource().Resources[common.CPU]), int64(0))
assert.Equal(t, int64(node.GetAllocatedResource().Resources[common.Memory]), int64(0))
assert.Equal(t, int64(node.GetAvailableResource().Resources[common.Memory]), int64(100))
}
6 changes: 4 additions & 2 deletions pkg/scheduler/utilities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ const (
maxresources = "maxresources"
maxapplications = "maxapplications"
foreignAlloc1 = "foreign-alloc-1"
foreignAlloc2 = "foreign-alloc-2"
)

func newBasePartitionNoRootDefault() (*PartitionContext, error) {
Expand Down Expand Up @@ -610,7 +611,7 @@ func newForeignRequest(allocKey string) *objects.Allocation {
})
}

func newForeignAllocation(allocKey, nodeID string) *objects.Allocation {
func newForeignAllocation(allocKey, nodeID string, allocated *resources.Resource) *objects.Allocation {
var alloc *objects.Allocation
defer func() {
if nodeID == "" {
Expand All @@ -626,7 +627,8 @@ func newForeignAllocation(allocKey, nodeID string) *objects.Allocation {
AllocationTags: map[string]string{
siCommon.Foreign: siCommon.AllocTypeDefault,
},
NodeID: id,
NodeID: id,
ResourcePerAlloc: allocated.ToProto(),
})
return alloc
}
Expand Down
Loading