From eb77f9ce6089c85ba6cb4536692c08aa437f30a9 Mon Sep 17 00:00:00 2001 From: Peter Bacsko Date: Fri, 16 Aug 2024 17:44:53 +0200 Subject: [PATCH] [YUNIKORN-2832] [core] Add non-Yunikorn tracking logic --- go.mod | 2 +- go.sum | 4 +- pkg/scheduler/context.go | 2 +- pkg/scheduler/context_test.go | 100 +++++++++++++++++++- pkg/scheduler/objects/allocation.go | 34 +++++++ pkg/scheduler/objects/allocation_test.go | 32 +++++++ pkg/scheduler/objects/node.go | 52 +++++++++-- pkg/scheduler/objects/node_test.go | 92 +++++++++++++++++- pkg/scheduler/objects/utilities_test.go | 33 +++++-- pkg/scheduler/partition.go | 77 ++++++++++++++- pkg/scheduler/partition_test.go | 61 +++++++++++- pkg/scheduler/tests/operation_test.go | 114 +++++++++++++++++++++-- pkg/scheduler/utilities_test.go | 23 +++++ pkg/webservice/dao/allocation_info.go | 9 ++ pkg/webservice/dao/node_info.go | 27 +++--- pkg/webservice/handlers.go | 48 +++++++--- pkg/webservice/handlers_test.go | 43 ++++++++- 17 files changed, 686 insertions(+), 67 deletions(-) diff --git a/go.mod b/go.mod index 16d56008e..26e243e83 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ module github.com/apache/yunikorn-core go 1.21 require ( - github.com/apache/yunikorn-scheduler-interface v0.0.0-20240827015655-68e8c6cca28a + github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0 github.com/google/btree v1.1.2 github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.6.0 diff --git a/go.sum b/go.sum index 61b3fa66a..026aa4f79 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/apache/yunikorn-scheduler-interface v0.0.0-20240827015655-68e8c6cca28a h1:3WRXGTvhunGBZj8AVZDxx7Bs/AXiH9mvf2jYcuDyklA= -github.com/apache/yunikorn-scheduler-interface v0.0.0-20240827015655-68e8c6cca28a/go.mod h1:co3uU98sj1CUTPNTM13lTyi+CY0DOgDndDW2KiUjktU= +github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0 h1:/9j0YXuifvoOl4YVEbO0r+DPkkYLzaQ+/ac+xCc7SY8= +github.com/apache/yunikorn-scheduler-interface v0.0.0-20240924203603-aaf51c93d3a0/go.mod h1:co3uU98sj1CUTPNTM13lTyi+CY0DOgDndDW2KiUjktU= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= diff --git a/pkg/scheduler/context.go b/pkg/scheduler/context.go index b0df469a8..9a22b41e9 100644 --- a/pkg/scheduler/context.go +++ b/pkg/scheduler/context.go @@ -759,7 +759,7 @@ func (cc *ClusterContext) processAllocations(request *si.AllocationRequest) { continue } // at some point, we may need to handle new requests as well - if newAlloc { + if newAlloc && !alloc.IsForeign() { cc.notifyRMNewAllocation(request.RmID, alloc) } } diff --git a/pkg/scheduler/context_test.go b/pkg/scheduler/context_test.go index 075ab5630..ea9d34145 100644 --- a/pkg/scheduler/context_test.go +++ b/pkg/scheduler/context_test.go @@ -37,9 +37,10 @@ import ( const pName = "default" type mockEventHandler struct { - eventHandled bool - rejectedNodes []*si.RejectedNode - acceptedNodes []*si.AcceptedNode + eventHandled bool + rejectedNodes []*si.RejectedNode + acceptedNodes []*si.AcceptedNode + newAllocHandler func(*rmevent.RMNewAllocationsEvent) } func newMockEventHandler() *mockEventHandler { @@ -56,6 +57,10 @@ func (m *mockEventHandler) HandleEvent(ev interface{}) { m.rejectedNodes = append(m.rejectedNodes, nodeEvent.RejectedNodes...) m.acceptedNodes = append(m.acceptedNodes, nodeEvent.AcceptedNodes...) 
} + + if allocEvent, ok := ev.(*rmevent.RMNewAllocationsEvent); ok && m.newAllocHandler != nil { + m.newAllocHandler(allocEvent) + } } func createTestContext(t *testing.T, partitionName string) *ClusterContext { @@ -71,7 +76,13 @@ func createTestContext(t *testing.T, partitionName string) *ClusterContext { Name: "root", Parent: true, SubmitACL: "*", - Queues: nil, + Queues: []configs.QueueConfig{ + { + Name: "default", + Parent: false, + SubmitACL: "*", + }, + }, }, }, } @@ -296,6 +307,87 @@ func TestContextDrainingNodeBackToSchedulableMetrics(t *testing.T) { verifyMetrics(t, 0, "draining") } +func TestContext_OnAllocationNotification(t *testing.T) { + context := createTestContext(t, pName) + eventHandler := context.rmEventHandler.(*mockEventHandler) //nolint:errcheck + var lastAllocEvent *rmevent.RMNewAllocationsEvent + eventHandler.newAllocHandler = func(event *rmevent.RMNewAllocationsEvent) { + lastAllocEvent = event + go func() { + event.Channel <- &rmevent.Result{Succeeded: true} + }() + } + + n := getNodeInfoForAddingNode() + err := context.addNode(n, true) + assert.NilError(t, err, "unexpected error returned from addNode") + partition := context.GetPartition(pName) + assert.Assert(t, partition != nil) + assert.Equal(t, 1, len(partition.GetNodes()), "expected node not found on partition") + + // register application + appReq := &si.ApplicationRequest{ + New: []*si.AddApplicationRequest{ + { + QueueName: defQueue, + PartitionName: pName, + Ugi: &si.UserGroupInformation{ + User: "testuser", + Groups: []string{"testgroup"}, + }, + ApplicationID: appID1, + }, + }, + RmID: "rm:123", + } + context.handleRMUpdateApplicationEvent(&rmevent.RMUpdateApplicationEvent{Request: appReq}) + + // add a Yunikorn allocation + allocReq := &si.AllocationRequest{ + Allocations: []*si.Allocation{ + { + AllocationKey: allocKey, + ResourcePerAlloc: &si.Resource{ + Resources: map[string]*si.Quantity{ + "first": {Value: 1}, + }, + }, + ApplicationID: appID1, + NodeID: "test-1", + PartitionName: pName, + }, + }, + RmID: "rm:123", + } + context.handleRMUpdateAllocationEvent(&rmevent.RMUpdateAllocationEvent{Request: allocReq}) + assert.Assert(t, lastAllocEvent != nil) + assert.Equal(t, lastAllocEvent.Allocations[0].AllocationKey, allocKey) + + // add a non-Yunikorn allocation + lastAllocEvent = nil + nonYkAllocReq := &si.AllocationRequest{ + Allocations: []*si.Allocation{ + { + AllocationKey: "foreign-alloc-1", + ResourcePerAlloc: &si.Resource{ + Resources: map[string]*si.Quantity{ + "first": {Value: 1}, + }, + }, + AllocationTags: map[string]string{ + siCommon.Foreign: siCommon.AllocTypeDefault, + }, + NodeID: "test-1", + PartitionName: pName, + }, + }, + RmID: "rm:123", + } + + context.handleRMUpdateAllocationEvent(&rmevent.RMUpdateAllocationEvent{Request: nonYkAllocReq}) + assert.Assert(t, lastAllocEvent == nil, "unexpected allocation event") +} + func getNodeInfoForAddingNode() *si.NodeInfo { n := &si.NodeInfo{ NodeID: "test-1", diff --git a/pkg/scheduler/objects/allocation.go b/pkg/scheduler/objects/allocation.go index 8f2450e9c..a243edcf1 100644 --- a/pkg/scheduler/objects/allocation.go +++ b/pkg/scheduler/objects/allocation.go @@ -49,6 +49,8 @@ type Allocation struct { originator bool tags map[string]string resKeyWithoutNode string // the reservation key without node + foreign bool + preemptable bool // Mutable fields which need protection allocated bool @@ -106,6 +108,20 @@ func NewAllocationFromSI(alloc *si.Allocation) *Allocation { createTime = time.Unix(siCreationTime, 0) } + foreign := false + 
preemptable := true
+ if foreignType, ok := alloc.AllocationTags[siCommon.Foreign]; ok {
+ foreign = true
+ switch foreignType {
+ case siCommon.AllocTypeStatic:
+ preemptable = false
+ case siCommon.AllocTypeDefault:
+ default:
+ log.Log(log.SchedAllocation).Warn("Foreign tag has illegal value, using default",
+ zap.String("value", foreignType))
+ }
+ }
+
var allocated bool
var nodeID string
var bindTime time.Time
@@ -135,9 +151,19 @@ func NewAllocationFromSI(alloc *si.Allocation) *Allocation {
allocated: allocated,
nodeID: nodeID,
bindTime: bindTime,
+ foreign: foreign,
+ preemptable: preemptable,
}
}
+
+// NewAllocationFromSIAllocated creates an Allocation where the "allocated" flag is always true,
+// regardless of whether the NodeID is empty or not. Used for testing.
+func NewAllocationFromSIAllocated(siAlloc *si.Allocation) *Allocation {
+ alloc := NewAllocationFromSI(siAlloc)
+ alloc.allocated = true
+ return alloc
+}
+
// NewSIFromAllocation converts the Allocation into a SI object. This is a limited set of values that gets copied into
// the SI. This is only used to communicate *back* to the RM. All other fields are considered incoming fields from
// the RM into the core. The limited set of fields link the Allocation to an Application and Node.
@@ -573,3 +599,11 @@ func (a *Allocation) setUserQuotaCheckPassed() {
a.askEvents.SendRequestFitsInUserQuota(a.allocationKey, a.applicationID, a.allocatedResource)
}
}
+
+func (a *Allocation) IsForeign() bool {
+ return a.foreign
+}
+
+func (a *Allocation) IsPreemptable() bool {
+ return a.preemptable
+}
diff --git a/pkg/scheduler/objects/allocation_test.go b/pkg/scheduler/objects/allocation_test.go
index 99a1593ad..f687a858e 100644
--- a/pkg/scheduler/objects/allocation_test.go
+++ b/pkg/scheduler/objects/allocation_test.go
@@ -466,3 +466,35 @@ func TestNewAllocFromSI(t *testing.T) {
assert.Assert(t, !alloc.IsAllowPreemptSelf(), "alloc should not have allow-preempt-self set")
assert.Assert(t, !alloc.IsAllowPreemptOther(), "alloc should not have allow-preempt-other set")
}
+
+func TestNewForeignAllocFromSI(t *testing.T) {
+ res := resources.NewResourceFromMap(map[string]resources.Quantity{
+ "first": 1,
+ })
+ siAlloc := &si.Allocation{
+ AllocationKey: "foreign-1",
+ NodeID: "node-1",
+ ResourcePerAlloc: res.ToProto(),
+ TaskGroupName: "",
+ AllocationTags: map[string]string{
+ siCommon.Foreign: siCommon.AllocTypeDefault,
+ },
+ }
+
+ alloc := NewAllocationFromSI(siAlloc)
+ assert.Assert(t, alloc.IsPreemptable())
+ assert.Assert(t, alloc.IsForeign())
+ assert.Equal(t, "foreign-1", alloc.GetAllocationKey())
+ assert.Equal(t, "node-1", alloc.GetNodeID())
+ assert.Assert(t, resources.Equals(res, alloc.GetAllocatedResource()))
+
+ // static allocation
+ siAlloc.AllocationTags[siCommon.Foreign] = siCommon.AllocTypeStatic
+ alloc = NewAllocationFromSI(siAlloc)
+ assert.Assert(t, !alloc.IsPreemptable())
+
+ // illegal value for foreign type
+ siAlloc.AllocationTags[siCommon.Foreign] = "xyz"
+ alloc = NewAllocationFromSI(siAlloc)
+ assert.Assert(t, alloc.IsPreemptable())
+}
diff --git a/pkg/scheduler/objects/node.go b/pkg/scheduler/objects/node.go
index d45063202..9abfffdc1 100644
--- a/pkg/scheduler/objects/node.go
+++ b/pkg/scheduler/objects/node.go
@@ -243,6 +243,31 @@ func (sn *Node) GetAllAllocations() []*Allocation {
return arr
}
+
+// GetYunikornAllocations returns a copy of Yunikorn allocations on this node
+func (sn *Node) GetYunikornAllocations() []*Allocation {
+ sn.RLock()
+ defer sn.RUnlock()
+ return sn.getAllocations(false)
+}
+
+// 
GetForeignAllocations returns a copy of non-Yunikorn allocations on this node +func (sn *Node) GetForeignAllocations() []*Allocation { + sn.RLock() + defer sn.RUnlock() + return sn.getAllocations(true) +} + +func (sn *Node) getAllocations(foreign bool) []*Allocation { + arr := make([]*Allocation, 0) + for _, v := range sn.allocations { + if (v.IsForeign() && foreign) || (!v.IsForeign() && !foreign) { + arr = append(arr, v) + } + } + + return arr +} + // Set the node to unschedulable. // This will cause the node to be skipped during the scheduling cycle. // Visible for testing only @@ -312,15 +337,24 @@ func (sn *Node) FitInNode(resRequest *resources.Resource) bool { // is found the Allocation removed is returned. Used resources will decrease available // will increase as per the allocation removed. func (sn *Node) RemoveAllocation(allocationKey string) *Allocation { - defer sn.notifyListeners() + var alloc *Allocation + defer func() { + if alloc != nil && !alloc.IsForeign() { + sn.notifyListeners() + } + }() sn.Lock() defer sn.Unlock() - alloc := sn.allocations[allocationKey] + alloc = sn.allocations[allocationKey] if alloc != nil { delete(sn.allocations, allocationKey) - sn.allocatedResource.SubFrom(alloc.GetAllocatedResource()) - sn.allocatedResource.Prune() + if alloc.IsForeign() { + sn.occupiedResource = resources.Sub(sn.occupiedResource, alloc.GetAllocatedResource()) + } else { + sn.allocatedResource.SubFrom(alloc.GetAllocatedResource()) + sn.allocatedResource.Prune() + } sn.availableResource.AddTo(alloc.GetAllocatedResource()) sn.nodeEvents.SendAllocationRemovedEvent(sn.NodeID, alloc.allocationKey, alloc.GetAllocatedResource()) return alloc @@ -348,20 +382,26 @@ func (sn *Node) addAllocationInternal(alloc *Allocation, force bool) bool { return false } result := false + foreign := alloc.IsForeign() defer func() { // check result to ensure we don't notify listeners unnecessarily - if result { + if result && !foreign { sn.notifyListeners() } }() sn.Lock() defer sn.Unlock() + if foreign { + sn.occupiedResource = resources.Add(sn.occupiedResource, alloc.GetAllocatedResource()) + } // check if this still fits: it might have changed since pre-check res := alloc.GetAllocatedResource() if force || sn.availableResource.FitIn(res) { sn.allocations[alloc.GetAllocationKey()] = alloc - sn.allocatedResource.AddTo(res) + if !foreign { + sn.allocatedResource.AddTo(res) + } sn.availableResource.SubFrom(res) sn.availableResource.Prune() sn.nodeEvents.SendAllocationAddedEvent(sn.NodeID, alloc.allocationKey, res) diff --git a/pkg/scheduler/objects/node_test.go b/pkg/scheduler/objects/node_test.go index c64fa76a4..f24291eae 100644 --- a/pkg/scheduler/objects/node_test.go +++ b/pkg/scheduler/objects/node_test.go @@ -579,7 +579,7 @@ func TestGetAllocation(t *testing.T) { } } -func TestGetAllocations(t *testing.T) { +func TestGetAllAllocations(t *testing.T) { node := newNode("node-123", map[string]resources.Quantity{"first": 100, "second": 200}) if !resources.IsZero(node.GetAllocatedResource()) { t.Fatal("Failed to initialize resource") @@ -839,3 +839,93 @@ func TestPreconditions(t *testing.T) { assert.NilError(t, err) assert.Equal(t, 1, len(ask.allocLog)) } + +func TestForeignAllocation(t *testing.T) { + total := resources.NewResourceFromMap(map[string]resources.Quantity{"cpu": 100, "memory": 100}) + proto := newProto(testNode, total, nil, map[string]string{ + "ready": "true", + }) + node := NewNode(proto) + + listener := &testListener{} + node.AddListener(listener) + + // add foreign allocation + res := 
resources.NewResourceFromMap(map[string]resources.Quantity{"cpu": 10, "memory": 5}) + alloc := newForeignAllocation("foreign-1", testNode) + alloc.allocatedResource = res.Clone() + node.AddAllocation(alloc) + assert.Assert(t, resources.Equals(res, node.GetOccupiedResource())) + newTotal := resources.NewResourceFromMap(map[string]resources.Quantity{"cpu": 90, "memory": 95}) + assert.Assert(t, resources.Equals(newTotal, node.GetAvailableResource())) + assert.Assert(t, resources.Equals(resources.Zero, node.GetAllocatedResource())) + assert.Equal(t, 0, listener.updateCount) + + // remove foreign allocation + node.RemoveAllocation("foreign-1") + assert.Assert(t, resources.Equals(resources.Zero, node.GetOccupiedResource())) + assert.Assert(t, resources.Equals(total, node.GetAvailableResource())) + assert.Assert(t, resources.Equals(resources.Zero, node.GetAllocatedResource())) + assert.Equal(t, 0, listener.updateCount) +} + +func TestGetYunikornAllocations(t *testing.T) { + node := newNode(nodeID1, map[string]resources.Quantity{"first": 100, "second": 200}) + + // no allocations + assert.Equal(t, 0, len(node.GetYunikornAllocations())) + + alloc1 := newAllocationWithKey(aKey, appID1, nodeID1, nil) + alloc2 := newAllocationWithKey(aKey2, appID1, nodeID1, nil) + alloc3 := newForeignAllocation("foreign-1", nodeID1) + alloc4 := newForeignAllocation("foreign-2", nodeID1) + node.AddAllocation(alloc1) + node.AddAllocation(alloc2) + node.AddAllocation(alloc3) + node.AddAllocation(alloc4) + + ykAllocs := node.GetYunikornAllocations() + assert.Equal(t, 2, len(ykAllocs)) + m := map[string]bool{} + m[ykAllocs[0].allocationKey] = true + m[ykAllocs[1].allocationKey] = true + assert.Assert(t, m[aKey]) + assert.Assert(t, m[aKey2]) +} + +func TestGetAllocations(t *testing.T) { + node := newNode(nodeID1, map[string]resources.Quantity{"first": 100, "second": 200}) + + // no allocations + assert.Equal(t, 0, len(node.GetForeignAllocations())) + assert.Equal(t, 0, len(node.GetYunikornAllocations())) + + alloc1 := newAllocationWithKey(aKey, appID1, nodeID1, nil) + alloc2 := newAllocationWithKey(aKey2, appID1, nodeID1, nil) + alloc3 := newForeignAllocation(foreignAlloc1, nodeID1) + alloc4 := newForeignAllocation(foreignAlloc2, nodeID1) + node.AddAllocation(alloc1) + node.AddAllocation(alloc2) + node.AddAllocation(alloc3) + node.AddAllocation(alloc4) + + t.Run("GetYunikornAllocations", func(t *testing.T) { + allocs := node.GetYunikornAllocations() + assert.Equal(t, 2, len(allocs)) + m := map[string]bool{} + m[allocs[0].allocationKey] = true + m[allocs[1].allocationKey] = true + assert.Assert(t, m[aKey]) + assert.Assert(t, m[aKey2]) + }) + + t.Run("GetForeignAllocations", func(t *testing.T) { + allocs := node.GetForeignAllocations() + assert.Equal(t, 2, len(allocs)) + m := map[string]bool{} + m[allocs[0].allocationKey] = true + m[allocs[1].allocationKey] = true + assert.Assert(t, m[foreignAlloc1]) + assert.Assert(t, m[foreignAlloc2]) + }) +} diff --git a/pkg/scheduler/objects/utilities_test.go b/pkg/scheduler/objects/utilities_test.go index d12d644ec..411563a9f 100644 --- a/pkg/scheduler/objects/utilities_test.go +++ b/pkg/scheduler/objects/utilities_test.go @@ -34,20 +34,23 @@ import ( "github.com/apache/yunikorn-core/pkg/rmproxy" schedEvt "github.com/apache/yunikorn-core/pkg/scheduler/objects/events" "github.com/apache/yunikorn-core/pkg/scheduler/ugm" + siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common" "github.com/apache/yunikorn-scheduler-interface/lib/go/si" ) const ( - appID0 = "app-0" - appID1 = 
"app-1" - appID2 = "app-2" - appID3 = "app-3" - aKey = "alloc-1" - aKey2 = "alloc-2" - nodeID1 = "node-1" - nodeID2 = "node-2" - instType1 = "itype-1" - testgroup = "testgroup" + appID0 = "app-0" + appID1 = "app-1" + appID2 = "app-2" + appID3 = "app-3" + aKey = "alloc-1" + aKey2 = "alloc-2" + nodeID1 = "node-1" + nodeID2 = "node-2" + instType1 = "itype-1" + testgroup = "testgroup" + foreignAlloc1 = "foreign-1" + foreignAlloc2 = "foreign-2" ) // Create the root queue, base for all testing @@ -224,6 +227,16 @@ func newAllocationWithKey(allocKey, appID, nodeID string, res *resources.Resourc return newAllocationAll(allocKey, appID, nodeID, "", res, false, 0) } +func newForeignAllocation(allocKey, nodeID string) *Allocation { + return NewAllocationFromSI(&si.Allocation{ + AllocationKey: allocKey, + AllocationTags: map[string]string{ + siCommon.Foreign: siCommon.AllocTypeDefault, + }, + NodeID: nodeID, + }) +} + // Create a new Allocation with a random ask key func newPlaceholderAlloc(appID, nodeID string, res *resources.Resource, taskGroup string) *Allocation { allocKey := strconv.FormatInt((time.Now()).UnixNano(), 10) diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go index 1acd5af23..f68a56e21 100644 --- a/pkg/scheduler/partition.go +++ b/pkg/scheduler/partition.go @@ -65,6 +65,7 @@ type PartitionContext struct { reservations int // number of reservations placeholderAllocations int // number of placeholder allocations preemptionEnabled bool // whether preemption is enabled or not + foreignAllocs map[string]string // allocKey-nodeID assignment of non-Yunikorn allocations // The partition write lock must not be held while manipulating an application. // Scheduling is running continuously as a lock free background task. Scheduling an application @@ -94,6 +95,7 @@ func newPartitionContext(conf configs.PartitionConfig, rmID string, cc *ClusterC applications: make(map[string]*objects.Application), completedApplications: make(map[string]*objects.Application), nodes: objects.NewNodeCollection(conf.Name), + foreignAllocs: make(map[string]string), } pc.partitionManager = newPartitionManager(pc, cc) if err := pc.initialPartitionFromConfig(conf); err != nil { @@ -1131,12 +1133,17 @@ func (pc *PartitionContext) UpdateAllocation(alloc *objects.Allocation) (request allocationKey := alloc.GetAllocationKey() applicationID := alloc.GetApplicationID() nodeID := alloc.GetNodeID() + node := pc.GetNode(alloc.GetNodeID()) log.Log(log.SchedPartition).Info("processing allocation", zap.String("partitionName", pc.Name), zap.String("appID", applicationID), zap.String("allocationKey", allocationKey)) + if alloc.IsForeign() { + return pc.handleForeignAllocation(allocationKey, applicationID, nodeID, node, alloc) + } + // find application app := pc.getApplication(alloc.GetApplicationID()) if app == nil { @@ -1146,10 +1153,8 @@ func (pc *PartitionContext) UpdateAllocation(alloc *objects.Allocation) (request queue := app.GetQueue() // find node if one is specified - var node *objects.Node = nil allocated := alloc.IsAllocated() if allocated { - node = pc.GetNode(alloc.GetNodeID()) if node == nil { metrics.GetSchedulerMetrics().IncSchedulingError() return false, false, fmt.Errorf("failed to find node %s", nodeID) @@ -1286,12 +1291,54 @@ func (pc *PartitionContext) UpdateAllocation(alloc *objects.Allocation) (request return false, false, nil } +func (pc *PartitionContext) handleForeignAllocation(allocationKey, applicationID, nodeID string, node *objects.Node, alloc *objects.Allocation) (requestCreated bool, 
allocCreated bool, err error) {
+ allocated := alloc.IsAllocated()
+ if !allocated {
+ return false, false, fmt.Errorf("trying to add a foreign request (non-allocation) %s", allocationKey)
+ }
+ if alloc.GetNodeID() == common.Empty {
+ return false, false, fmt.Errorf("node ID is empty for allocation %s", allocationKey)
+ }
+ if node == nil {
+ return false, false, fmt.Errorf("failed to find node %s for allocation %s", nodeID, allocationKey)
+ }
+
+ existingNodeID := pc.getOrSetNodeIDForAlloc(allocationKey, nodeID)
+ if existingNodeID == common.Empty {
+ log.Log(log.SchedPartition).Info("handling new foreign allocation",
+ zap.String("partitionName", pc.Name),
+ zap.String("nodeID", nodeID),
+ zap.String("allocationKey", allocationKey))
+ node.AddAllocation(alloc)
+ return false, true, nil
+ }
+
+ log.Log(log.SchedPartition).Info("handling foreign allocation update",
+ zap.String("partitionName", pc.Name),
+ zap.String("appID", applicationID),
+ zap.String("allocationKey", allocationKey))
+ // this is a placeholder for eventual resource updates; nothing to do yet
+ return false, false, nil
+}
+
func (pc *PartitionContext) convertUGI(ugi *si.UserGroupInformation, forced bool) (security.UserGroup, error) {
pc.RLock()
defer pc.RUnlock()
return pc.userGroupCache.ConvertUGI(ugi, forced)
}
+// getOrSetNodeIDForAlloc returns the nodeID for a given foreign allocation, or sets it if it's unset
+func (pc *PartitionContext) getOrSetNodeIDForAlloc(allocKey, nodeID string) string {
+ pc.Lock()
+ defer pc.Unlock()
+ id := pc.foreignAllocs[allocKey]
+ if id != common.Empty {
+ return id
+ }
+ pc.foreignAllocs[allocKey] = nodeID
+ return common.Empty
+}
+
// calculate overall nodes resource usage and returns a map as the result,
// where the key is the resource name, e.g memory, and the value is a []int,
// which is a slice with 10 elements,
@@ -1373,6 +1420,10 @@ func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]*
}
appID := release.ApplicationID
allocationKey := release.GetAllocationKey()
+ if appID == "" {
+ pc.removeForeignAllocation(allocationKey)
+ return nil, nil
+ }
app := pc.getApplication(appID)
// no app nothing to do everything should already be clean
if app == nil {
@@ -1496,6 +1547,28 @@ func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]*
return released, confirmed
}
+func (pc *PartitionContext) removeForeignAllocation(allocID string) {
+ nodeID := pc.foreignAllocs[allocID]
+ if nodeID == "" {
+ log.Log(log.SchedPartition).Warn("Tried to remove a non-existing foreign allocation",
+ zap.String("allocationID", allocID),
+ zap.String("nodeID", nodeID))
+ return
+ }
+ delete(pc.foreignAllocs, allocID)
+ node := pc.GetNode(nodeID)
+ if node == nil {
+ log.Log(log.SchedPartition).Warn("Node not found for foreign allocation",
+ zap.String("allocationID", allocID),
+ zap.String("nodeID", nodeID))
+ return
+ }
+ log.Log(log.SchedPartition).Info("Removing foreign allocation",
+ zap.String("allocationID", allocID),
+ zap.String("nodeID", nodeID))
+ node.RemoveAllocation(allocID)
+}
+
func (pc *PartitionContext) GetCurrentState() string {
return pc.stateMachine.Current()
}
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index ed0365029..3a75e765e 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -366,15 +366,15 @@ func TestCalculateNodesResourceUsage(t *testing.T) {
err = partition.AddNode(node)
assert.NilError(t, err)
- occupiedResources := 
resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50}) - alloc := newAllocation("key", "appID", nodeID1, occupiedResources) + res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50}) + alloc := newAllocation("key", "appID", nodeID1, res) node.AddAllocation(alloc) usageMap := partition.calculateNodesResourceUsage() assert.Equal(t, node.GetAvailableResource().Resources["first"], resources.Quantity(50)) assert.Equal(t, usageMap["first"][4], 1) - occupiedResources = resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50}) - alloc = newAllocation("key", "appID", nodeID1, occupiedResources) + res = resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50}) + alloc = newAllocation("key", "appID", nodeID1, res) node.AddAllocation(alloc) usageMap = partition.calculateNodesResourceUsage() assert.Equal(t, node.GetAvailableResource().Resources["first"], resources.Quantity(0)) @@ -4689,3 +4689,56 @@ func TestPlaceholderAllocationAndReplacementAfterRecovery(t *testing.T) { assert.Equal(t, "real-alloc", confirmed.GetAllocationKey()) assert.Equal(t, "tg-1", confirmed.GetTaskGroup()) } + +func TestForeignAllocation(t *testing.T) { + setupUGM() + partition, err := newBasePartition() + assert.NilError(t, err, "partition create failed") + nodeRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}) + node1 := newNodeMaxResource(nodeID1, nodeRes) + err = partition.AddNode(node1) + assert.NilError(t, err) + + // error: adding request (non-allocation) + req := newForeignRequest("foreign-nonalloc") + reqCreated, allocCreated, err := partition.UpdateAllocation(req) + assert.Assert(t, !reqCreated) + assert.Assert(t, !allocCreated) + assert.Error(t, err, "trying to add a foreign request (non-allocation) foreign-nonalloc") + + // error: empty node ID + req = newForeignAllocation(foreignAlloc1, common.Empty) + reqCreated, allocCreated, err = partition.UpdateAllocation(req) + assert.Assert(t, !reqCreated) + assert.Assert(t, !allocCreated) + assert.Error(t, err, "node ID is empty for allocation foreign-alloc-1") + + // error: no node found + req = newForeignAllocation(foreignAlloc1, nodeID2) + reqCreated, allocCreated, err = partition.UpdateAllocation(req) + assert.Assert(t, !reqCreated) + assert.Assert(t, !allocCreated) + assert.Error(t, err, "failed to find node node-2 for allocation foreign-alloc-1") + assert.Equal(t, 0, len(partition.foreignAllocs)) + + // add new allocation + req = newForeignAllocation(foreignAlloc1, nodeID1) + reqCreated, allocCreated, err = partition.UpdateAllocation(req) + assert.Assert(t, !reqCreated) + assert.Assert(t, allocCreated) + assert.NilError(t, err) + assert.Equal(t, 1, len(partition.foreignAllocs)) + assert.Equal(t, nodeID1, partition.foreignAllocs[foreignAlloc1]) + assert.Equal(t, 1, len(node1.GetAllAllocations())) + assert.Assert(t, node1.GetAllocation(foreignAlloc1) != nil) + + // remove allocation + released, confirmed := partition.removeAllocation(&si.AllocationRelease{ + AllocationKey: foreignAlloc1, + }) + assert.Assert(t, released == nil) + assert.Assert(t, confirmed == nil) + assert.Equal(t, 0, len(partition.foreignAllocs)) + assert.Equal(t, 0, len(node1.GetAllAllocations())) + assert.Assert(t, node1.GetAllocation(foreignAlloc1) == nil) +} diff --git a/pkg/scheduler/tests/operation_test.go b/pkg/scheduler/tests/operation_test.go index 98e3e7425..5018858d1 100644 --- a/pkg/scheduler/tests/operation_test.go +++ b/pkg/scheduler/tests/operation_test.go @@ -583,14 +583,11 @@ partitions: 
// Start all tests ms := &mockScheduler{} defer ms.Stop() - err := ms.Init(configData, false, false) assert.NilError(t, err, "RegisterResourceManager failed") - // Check queues of cache and scheduler. partitionInfo := ms.scheduler.GetClusterContext().GetPartition("[rm:123]default") assert.Assert(t, partitionInfo.GetTotalPartitionResource() == nil, "partition info max resource nil") - // Register a node err = ms.proxy.UpdateNode(&si.NodeRequest{ Nodes: []*si.NodeInfo{ @@ -608,14 +605,11 @@ partitions: }, RmID: "rm:123", }) - assert.NilError(t, err, "NodeRequest failed") - // Wait until node is registered context := ms.scheduler.GetClusterContext() ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000) waitForNewNode(t, context, "node-1:1234", "[rm:123]default", 1000) - // verify node capacity assert.Equal(t, len(partitionInfo.GetNodes()), 1) node1 := partitionInfo.GetNode("node-1:1234") @@ -654,3 +648,111 @@ partitions: assert.Equal(t, int64(schedulingNode1.GetAllocatedResource().Resources[common.Memory]), int64(0)) assert.Equal(t, int64(schedulingNode1.GetAvailableResource().Resources[common.Memory]), int64(20)) } + +func TestForeignPodResourceUsage(t *testing.T) { + // Register RM + configData := ` +partitions: + - + name: default + queues: + - name: root + submitacl: "*" + queues: + - name: a + resources: + max: + memory: 150 + vcore: 20 +` + // Start all tests + ms := &mockScheduler{} + defer ms.Stop() + + err := ms.Init(configData, false, false) + assert.NilError(t, err, "RegisterResourceManager failed") + + // Check queues of cache and scheduler. + partitionInfo := ms.scheduler.GetClusterContext().GetPartition("[rm:123]default") + assert.Assert(t, partitionInfo.GetTotalPartitionResource() == nil, "partition info max resource nil") + + // Register a node + err = ms.proxy.UpdateNode(&si.NodeRequest{ + Nodes: []*si.NodeInfo{ + { + NodeID: "node-1:1234", + Attributes: map[string]string{}, + SchedulableResource: &si.Resource{ + Resources: map[string]*si.Quantity{ + "memory": {Value: 100}, + "vcore": {Value: 10}, + }, + }, + Action: si.NodeInfo_CREATE, + }, + }, + RmID: "rm:123", + }) + + assert.NilError(t, err, "NodeRequest failed") + + // Wait until node is registered + context := ms.scheduler.GetClusterContext() + ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000) + waitForNewNode(t, context, "node-1:1234", "[rm:123]default", 1000) + + assert.Equal(t, len(partitionInfo.GetNodes()), 1) + node1 := partitionInfo.GetNode("node-1:1234") + assert.Equal(t, int64(node1.GetCapacity().Resources[common.Memory]), int64(100)) + schedulingNode1 := ms.scheduler.GetClusterContext(). 
+ GetNode("node-1:1234", "[rm:123]default") + assert.Equal(t, int64(schedulingNode1.GetAllocatedResource().Resources[common.Memory]), int64(0)) + assert.Equal(t, int64(schedulingNode1.GetAvailableResource().Resources[common.Memory]), int64(100)) + + // update node capacity - add foreign pod + res := resources.NewResourceFromMap(map[string]resources.Quantity{ + "memory": 80, "vcore": 5}) + err = ms.proxy.UpdateAllocation(&si.AllocationRequest{ + Allocations: []*si.Allocation{ + { + AllocationKey: "foreignpod-1", + ResourcePerAlloc: res.ToProto(), + AllocationTags: map[string]string{ + common.Foreign: common.AllocTypeDefault, + }, + NodeID: "node-1:1234", + }, + }, + RmID: "rm:123", + }) + assert.NilError(t, err, "NodeRequest failed") + waitForAvailableNodeResource(t, ms.scheduler.GetClusterContext(), "[rm:123]default", + []string{"node-1:1234"}, 20, 1000) + assert.Equal(t, int64(node1.GetCapacity().Resources[common.Memory]), int64(100)) + assert.Equal(t, int64(node1.GetCapacity().Resources[common.CPU]), int64(10)) + assert.Equal(t, int64(node1.GetOccupiedResource().Resources[common.Memory]), int64(80)) + assert.Equal(t, int64(node1.GetOccupiedResource().Resources[common.CPU]), int64(5)) + assert.Equal(t, int64(schedulingNode1.GetAllocatedResource().Resources[common.Memory]), int64(0)) + assert.Equal(t, int64(schedulingNode1.GetAvailableResource().Resources[common.Memory]), int64(20)) + + // update node capacity - remove foreign pod + err = ms.proxy.UpdateAllocation(&si.AllocationRequest{ + Releases: &si.AllocationReleasesRequest{ + AllocationsToRelease: []*si.AllocationRelease{ + { + AllocationKey: "foreignpod-1", + }, + }, + }, + RmID: "rm:123", + }) + assert.NilError(t, err) + waitForAvailableNodeResource(t, ms.scheduler.GetClusterContext(), "[rm:123]default", + []string{"node-1:1234"}, 100, 1000) + assert.Equal(t, int64(node1.GetCapacity().Resources[common.Memory]), int64(100)) + assert.Equal(t, int64(node1.GetCapacity().Resources[common.CPU]), int64(10)) + assert.Equal(t, int64(node1.GetOccupiedResource().Resources[common.Memory]), int64(0)) + assert.Equal(t, int64(node1.GetOccupiedResource().Resources[common.CPU]), int64(0)) + assert.Equal(t, int64(schedulingNode1.GetAllocatedResource().Resources[common.Memory]), int64(0)) + assert.Equal(t, int64(schedulingNode1.GetAvailableResource().Resources[common.Memory]), int64(100)) +} diff --git a/pkg/scheduler/utilities_test.go b/pkg/scheduler/utilities_test.go index a87b26bd7..d557d5312 100644 --- a/pkg/scheduler/utilities_test.go +++ b/pkg/scheduler/utilities_test.go @@ -31,6 +31,7 @@ import ( "github.com/apache/yunikorn-core/pkg/scheduler/objects" "github.com/apache/yunikorn-core/pkg/scheduler/ugm" "github.com/apache/yunikorn-core/pkg/webservice/dao" + siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common" "github.com/apache/yunikorn-scheduler-interface/lib/go/si" ) @@ -52,6 +53,7 @@ const ( allocKey3 = "alloc-3" maxresources = "maxresources" maxapplications = "maxapplications" + foreignAlloc1 = "foreign-alloc-1" ) func newBasePartitionNoRootDefault() (*PartitionContext, error) { @@ -599,6 +601,26 @@ func newAllocationAll(allocKey, appID, nodeID, taskGroup string, res *resources. 
}) } +func newForeignRequest(allocKey string) *objects.Allocation { + return objects.NewAllocationFromSI(&si.Allocation{ + AllocationKey: allocKey, + AllocationTags: map[string]string{ + siCommon.Foreign: siCommon.AllocTypeDefault, + }, + }) +} + +func newForeignAllocation(allocKey, nodeID string) *objects.Allocation { + alloc := objects.NewAllocationFromSIAllocated(&si.Allocation{ + AllocationKey: allocKey, + AllocationTags: map[string]string{ + siCommon.Foreign: siCommon.AllocTypeDefault, + }, + NodeID: nodeID, + }) + return alloc +} + func newAllocationAskPreempt(allocKey, appID string, prio int32, res *resources.Resource) *objects.Allocation { return objects.NewAllocationFromSI(&si.Allocation{ AllocationKey: allocKey, @@ -614,6 +636,7 @@ func newAllocationAskPreempt(allocKey, appID string, prio int32, res *resources. }, }) } + func newNodeWithResources(nodeID string, max, occupied *resources.Resource) *objects.Node { proto := &si.NodeInfo{ NodeID: nodeID, diff --git a/pkg/webservice/dao/allocation_info.go b/pkg/webservice/dao/allocation_info.go index 1aa4b587a..e399cbdfd 100644 --- a/pkg/webservice/dao/allocation_info.go +++ b/pkg/webservice/dao/allocation_info.go @@ -34,3 +34,12 @@ type AllocationDAOInfo struct { Preempted bool `json:"preempted,omitempty"` Originator bool `json:"originator,omitempty"` } + +type ForeignAllocationDAOInfo struct { + AllocationKey string `json:"allocationKey"` // no omitempty, allocation key should not be empty + AllocationTime int64 `json:"allocationTime,omitempty"` + ResourcePerAlloc map[string]int64 `json:"resource,omitempty"` + Priority string `json:"priority,omitempty"` + NodeID string `json:"nodeId,omitempty"` + Preemptable bool `json:"preemptable,omitempty"` +} diff --git a/pkg/webservice/dao/node_info.go b/pkg/webservice/dao/node_info.go index 27fd7d58d..ac2c99b45 100644 --- a/pkg/webservice/dao/node_info.go +++ b/pkg/webservice/dao/node_info.go @@ -24,17 +24,18 @@ type NodesDAOInfo struct { } type NodeDAOInfo struct { - NodeID string `json:"nodeID"` // no omitempty, node id should not be empty - HostName string `json:"hostName,omitempty"` - RackName string `json:"rackName,omitempty"` - Attributes map[string]string `json:"attributes,omitempty"` - Capacity map[string]int64 `json:"capacity,omitempty"` - Allocated map[string]int64 `json:"allocated,omitempty"` - Occupied map[string]int64 `json:"occupied,omitempty"` - Available map[string]int64 `json:"available,omitempty"` - Utilized map[string]int64 `json:"utilized,omitempty"` - Allocations []*AllocationDAOInfo `json:"allocations,omitempty"` - Schedulable bool `json:"schedulable"` // no omitempty, a false value gives a quick way to understand whether a node is schedulable. - IsReserved bool `json:"isReserved"` // no omitempty, a false value gives a quick way to understand whether a node is reserved. 
- Reservations []string `json:"reservations,omitempty"` + NodeID string `json:"nodeID"` // no omitempty, node id should not be empty + HostName string `json:"hostName,omitempty"` + RackName string `json:"rackName,omitempty"` + Attributes map[string]string `json:"attributes,omitempty"` + Capacity map[string]int64 `json:"capacity,omitempty"` + Allocated map[string]int64 `json:"allocated,omitempty"` + Occupied map[string]int64 `json:"occupied,omitempty"` + Available map[string]int64 `json:"available,omitempty"` + Utilized map[string]int64 `json:"utilized,omitempty"` + Allocations []*AllocationDAOInfo `json:"allocations,omitempty"` + ForeignAllocations []*ForeignAllocationDAOInfo `json:"foreign_allocations,omitempty"` + Schedulable bool `json:"schedulable"` // no omitempty, a false value gives a quick way to understand whether a node is schedulable. + IsReserved bool `json:"isReserved"` // no omitempty, a false value gives a quick way to understand whether a node is reserved. + Reservations []string `json:"reservations,omitempty"` } diff --git a/pkg/webservice/handlers.go b/pkg/webservice/handlers.go index 49309e704..8e455a712 100644 --- a/pkg/webservice/handlers.go +++ b/pkg/webservice/handlers.go @@ -254,6 +254,19 @@ func getAllocationDAO(alloc *objects.Allocation) *dao.AllocationDAOInfo { return allocDAO } +func getForeignAllocationDAO(alloc *objects.Allocation) *dao.ForeignAllocationDAOInfo { + allocTime := alloc.GetCreateTime().UnixNano() + allocDAO := &dao.ForeignAllocationDAOInfo{ + AllocationKey: alloc.GetAllocationKey(), + AllocationTime: allocTime, + ResourcePerAlloc: alloc.GetAllocatedResource().DAOMap(), + Priority: strconv.Itoa(int(alloc.GetPriority())), + NodeID: alloc.GetNodeID(), + Preemptable: alloc.IsPreemptable(), + } + return allocDAO +} + func getAllocationsDAO(allocations []*objects.Allocation) []*dao.AllocationDAOInfo { allocsDAO := make([]*dao.AllocationDAOInfo, 0, len(allocations)) for _, alloc := range allocations { @@ -262,6 +275,14 @@ func getAllocationsDAO(allocations []*objects.Allocation) []*dao.AllocationDAOIn return allocsDAO } +func getForeignAllocationsDAO(allocations []*objects.Allocation) []*dao.ForeignAllocationDAOInfo { + allocsDAO := make([]*dao.ForeignAllocationDAOInfo, 0, len(allocations)) + for _, alloc := range allocations { + allocsDAO = append(allocsDAO, getForeignAllocationDAO(alloc)) + } + return allocsDAO +} + func getPlaceholderDAO(ph *objects.PlaceholderData) *dao.PlaceholderDAOInfo { phDAO := &dao.PlaceholderDAOInfo{ TaskGroupName: ph.TaskGroupName, @@ -375,19 +396,20 @@ func getAllocationAsksDAO(asks []*objects.Allocation) []*dao.AllocationAskDAOInf func getNodeDAO(node *objects.Node) *dao.NodeDAOInfo { return &dao.NodeDAOInfo{ - NodeID: node.NodeID, - HostName: node.Hostname, - RackName: node.Rackname, - Attributes: node.GetAttributes(), - Capacity: node.GetCapacity().DAOMap(), - Occupied: node.GetOccupiedResource().DAOMap(), - Allocated: node.GetAllocatedResource().DAOMap(), - Available: node.GetAvailableResource().DAOMap(), - Utilized: node.GetUtilizedResource().DAOMap(), - Allocations: getAllocationsDAO(node.GetAllAllocations()), - Schedulable: node.IsSchedulable(), - IsReserved: node.IsReserved(), - Reservations: node.GetReservationKeys(), + NodeID: node.NodeID, + HostName: node.Hostname, + RackName: node.Rackname, + Attributes: node.GetAttributes(), + Capacity: node.GetCapacity().DAOMap(), + Occupied: node.GetOccupiedResource().DAOMap(), + Allocated: node.GetAllocatedResource().DAOMap(), + Available: 
node.GetAvailableResource().DAOMap(), + Utilized: node.GetUtilizedResource().DAOMap(), + Allocations: getAllocationsDAO(node.GetYunikornAllocations()), + ForeignAllocations: getForeignAllocationsDAO(node.GetForeignAllocations()), + Schedulable: node.IsSchedulable(), + IsReserved: node.IsReserved(), + Reservations: node.GetReservationKeys(), } } diff --git a/pkg/webservice/handlers_test.go b/pkg/webservice/handlers_test.go index b9e4d0881..c95e5fc95 100644 --- a/pkg/webservice/handlers_test.go +++ b/pkg/webservice/handlers_test.go @@ -1407,18 +1407,24 @@ func TestGetPartitionNode(t *testing.T) { resAlloc1 := resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 500, siCommon.CPU: 300}) resAlloc2 := resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory: 300, siCommon.CPU: 500}) alloc1 := newAlloc("alloc-1", appID, node1ID, resAlloc1) - allocs := []*objects.Allocation{alloc1} err = partition.AddNode(node1) assert.NilError(t, err, "add node to partition should not have failed") - _, allocCreated, err := partition.UpdateAllocation(allocs[0]) + _, allocCreated, err := partition.UpdateAllocation(alloc1) assert.NilError(t, err, "add alloc-1 should not have failed") assert.Check(t, allocCreated) + falloc1 := newForeignAlloc("foreign-1", "", node1ID, resAlloc1, siCommon.AllocTypeDefault, 0) + _, allocCreated, err = partition.UpdateAllocation(falloc1) + assert.NilError(t, err, "add falloc-1 should not have failed") + assert.Check(t, allocCreated) + falloc2 := newForeignAlloc("foreign-2", "", node1ID, resAlloc2, siCommon.AllocTypeStatic, 123) + _, allocCreated, err = partition.UpdateAllocation(falloc2) + assert.NilError(t, err, "add falloc-2 should not have failed") + assert.Check(t, allocCreated) alloc2 := newAlloc("alloc-2", appID, node2ID, resAlloc2) - allocs = []*objects.Allocation{alloc2} err = partition.AddNode(node2) assert.NilError(t, err, "add node to partition should not have failed") - _, allocCreated, err = partition.UpdateAllocation(allocs[0]) + _, allocCreated, err = partition.UpdateAllocation(alloc2) assert.NilError(t, err, "add alloc-2 should not have failed") assert.Check(t, allocCreated) @@ -1434,6 +1440,9 @@ func TestGetPartitionNode(t *testing.T) { err = json.Unmarshal(resp.outputBytes, &nodeInfo) assert.NilError(t, err, unmarshalError) assertNodeInfo(t, &nodeInfo, node1ID, "alloc-1", attributesOfnode1, map[string]int64{"memory": 50, "vcore": 30}) + assert.Equal(t, 2, len(nodeInfo.ForeignAllocations)) + assertForeignAllocation(t, "foreign-1", "0", node1ID, resAlloc1, true, nodeInfo.ForeignAllocations[0]) + assertForeignAllocation(t, "foreign-2", "123", node1ID, resAlloc2, false, nodeInfo.ForeignAllocations[1]) // Test node id is missing req, err = createRequest(t, "/ws/v1/partition/default/node/node_1", map[string]string{"partition": "default", "node": ""}) @@ -1464,6 +1473,20 @@ func assertNodeInfo(t *testing.T, node *dao.NodeDAOInfo, expectedID string, expe assert.DeepEqual(t, expectedUtilized, node.Utilized) } +func assertForeignAllocation(t *testing.T, key, priority, nodeID string, expectedRes *resources.Resource, preemptable bool, info *dao.ForeignAllocationDAOInfo) { + t.Helper() + assert.Equal(t, key, info.AllocationKey) + assert.Equal(t, priority, info.Priority) + assert.Equal(t, nodeID, info.NodeID) + resMap := make(map[string]resources.Quantity) + for k, v := range info.ResourcePerAlloc { + resMap[k] = resources.Quantity(v) + } + resFromInfo := resources.NewResourceFromMap(resMap) + assert.Assert(t, resources.Equals(resFromInfo, 
expectedRes)) + assert.Equal(t, preemptable, info.Preemptable) +} + // addApp Add app to the given partition and assert the app count, state etc func addApp(t *testing.T, id string, part *scheduler.PartitionContext, queueName string, isCompleted bool) *objects.Application { return addAppWithUserGroup(t, id, part, queueName, isCompleted, security.UserGroup{}) @@ -2991,3 +3014,15 @@ func newAlloc(allocationKey string, appID string, nodeID string, resAlloc *resou ResourcePerAlloc: resAlloc.ToProto(), }) } + +func newForeignAlloc(allocationKey string, appID string, nodeID string, resAlloc *resources.Resource, fType string, priority int32) *objects.Allocation { + return objects.NewAllocationFromSI(&si.Allocation{ + AllocationKey: allocationKey, + NodeID: nodeID, + ResourcePerAlloc: resAlloc.ToProto(), + AllocationTags: map[string]string{ + siCommon.Foreign: fType, + }, + Priority: priority, + }) +}
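
Usage sketch (illustrative, not part of the diff above): a minimal, self-contained example of how a foreign allocation is tagged and what the core derives from the tag, assuming only the constants and constructors already used in this patch (siCommon.Foreign, siCommon.AllocTypeStatic, objects.NewAllocationFromSI, IsForeign, IsPreemptable). The allocation key, node ID and resource values below are hypothetical.

package main

import (
	"fmt"

	"github.com/apache/yunikorn-core/pkg/scheduler/objects"
	siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)

func main() {
	// A foreign (non-Yunikorn) allocation carries no ApplicationID; the Foreign tag
	// drives IsForeign(), and AllocTypeStatic marks it as non-preemptable.
	siAlloc := &si.Allocation{
		AllocationKey: "foreign-example", // hypothetical key
		NodeID:        "node-1",          // hypothetical node
		ResourcePerAlloc: &si.Resource{
			Resources: map[string]*si.Quantity{"memory": {Value: 10}},
		},
		AllocationTags: map[string]string{
			siCommon.Foreign: siCommon.AllocTypeStatic,
		},
	}
	alloc := objects.NewAllocationFromSI(siAlloc)
	fmt.Println(alloc.IsForeign())     // true
	fmt.Println(alloc.IsPreemptable()) // false for AllocTypeStatic
}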