From c494436048a1c6c59fda55b8abfe016636b8f1d6 Mon Sep 17 00:00:00 2001 From: qzhu Date: Tue, 28 Nov 2023 11:52:33 +0800 Subject: [PATCH 1/4] YUNIKORN-2196 Optimize aggregated resources tracking feature to not add overhead to scheduling --- pkg/events/event_publisher.go | 75 +++++++++++++++++ pkg/scheduler/objects/application.go | 7 -- pkg/scheduler/objects/application_events.go | 4 +- pkg/tracking/resource_tracker.go | 91 +++++++++++++++++++++ 4 files changed, 169 insertions(+), 8 deletions(-) create mode 100644 pkg/tracking/resource_tracker.go diff --git a/pkg/events/event_publisher.go b/pkg/events/event_publisher.go index 216edb7ee..8cbabbff0 100644 --- a/pkg/events/event_publisher.go +++ b/pkg/events/event_publisher.go @@ -19,6 +19,13 @@ package events import ( + "fmt" + "github.com/apache/yunikorn-core/pkg/common" + "github.com/apache/yunikorn-core/pkg/common/resources" + "github.com/apache/yunikorn-scheduler-interface/lib/go/si" + "strconv" + "strings" + "sync" "sync/atomic" "time" @@ -31,10 +38,55 @@ import ( // stores the push event internal var defaultPushEventInterval = 2 * time.Second +// Util struct to keep track of application resource usage +type TrackedResource struct { + // Two level map for aggregated resource usage + // With instance type being the top level key, the mapped value is a map: + // resource type (CPU, memory etc) -> the aggregated used time (in seconds) of the resource type + // + TrackedResourceMap map[string]map[string]int64 + + sync.RWMutex +} + +// Aggregate the resource usage to UsedResourceMap[instType] +// The time the given resource used is the delta between the resource createTime and currentTime +func (ur *TrackedResource) AggregateTrackedResource(resource *resources.Resource, releaseTime time.Time, message string) { + ur.Lock() + defer ur.Unlock() + // The message is in the format of "instanceType:timestamp" + // Split the message to get the instance type and the timestamp for bind time + // Convert the string to an int64 + unixNano, err := strconv.ParseInt(strings.Split(message, common.Separator)[1], 10, 64) + if err != nil { + log.Log(log.Events).Warn("Failed to parse the timestamp", zap.Error(err), zap.String("message", message)) + return + } + + // Convert Unix timestamp in nanoseconds to a time.Time object + bindTime := time.Unix(0, unixNano) + timeDiff := int64(releaseTime.Sub(bindTime).Seconds()) + instType := strings.Split(message, common.Separator)[0] + aggregatedResourceTime, ok := ur.TrackedResourceMap[instType] + if !ok { + aggregatedResourceTime = map[string]int64{} + } + for key, element := range resource.Resources { + curUsage, ok := aggregatedResourceTime[key] + if !ok { + curUsage = 0 + } + curUsage += int64(element) * timeDiff // resource size times timeDiff + aggregatedResourceTime[key] = curUsage + } + ur.TrackedResourceMap[instType] = aggregatedResourceTime +} + type EventPublisher struct { store *EventStore pushEventInterval time.Duration stop atomic.Bool + trackingAppMap map[string]*TrackedResource // storing eventChannel } func CreateShimPublisher(store *EventStore) *EventPublisher { @@ -58,6 +110,29 @@ func (sp *EventPublisher) StartService() { log.Log(log.Events).Debug("Sending eventChannel", zap.Int("number of messages", len(messages))) eventPlugin.SendEvent(messages) } + for _, message := range messages { + log.Log(log.Events).Debug("aggregate resource usage", zap.String("message", fmt.Sprintf("%+v", message))) + if message.Type == si.EventRecord_APP && message.EventChangeType == si.EventRecord_REMOVE { + // We need to clean up the trackingAppMap when an application is removed + if message.ReferenceID == "" { + // This is an application removal event, remove the application from the trackingAppMap + delete(sp.trackingAppMap, message.ObjectID) + log.Log(log.Events).Info("YK_APP_SUMMARY:", + zap.String("appID", message.ObjectID), + zap.Any("resourceUsage", sp.trackingAppMap[message.ObjectID].TrackedResourceMap), + ) + } else { + // This is an allocation removal event, aggregate the resources used by the allocation + if _, ok := sp.trackingAppMap[message.ObjectID]; !ok { + sp.trackingAppMap[message.ObjectID] = &TrackedResource{ + TrackedResourceMap: make(map[string]map[string]int64), + } + } else { + sp.trackingAppMap[message.ObjectID].AggregateTrackedResource(resources.NewResourceFromProto(message.Resource), time.Unix(0, message.TimestampNano), message.Message) + } + } + } + } } time.Sleep(sp.pushEventInterval) } diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go index 086f0219a..04ceda3cf 100644 --- a/pkg/scheduler/objects/application.go +++ b/pkg/scheduler/objects/application.go @@ -1806,16 +1806,11 @@ func (sa *Application) removeAllocationInternal(uuid string, releaseType si.Term eventWarning = "Application state not changed while removing a placeholder allocation" } } - // Aggregate the resources used by this alloc to the application's resource tracker - sa.trackCompletedResource(alloc) sa.decUserResourceUsage(alloc.GetAllocatedResource(), removeApp) } else { sa.allocatedResource = resources.Sub(sa.allocatedResource, alloc.GetAllocatedResource()) - // Aggregate the resources used by this alloc to the application's resource tracker - sa.trackCompletedResource(alloc) - // When the resource trackers are zero we should not expect anything to come in later. if sa.hasZeroAllocations() { removeApp = true @@ -1865,8 +1860,6 @@ func (sa *Application) RemoveAllAllocations() []*Allocation { allocationsToRelease := make([]*Allocation, 0) for _, alloc := range sa.allocations { allocationsToRelease = append(allocationsToRelease, alloc) - // Aggregate the resources used by this alloc to the application's user resource tracker - sa.trackCompletedResource(alloc) sa.appEvents.sendRemoveAllocationEvent(alloc, si.TerminationType_STOPPED_BY_RM) } diff --git a/pkg/scheduler/objects/application_events.go b/pkg/scheduler/objects/application_events.go index 59348dd91..f75229aef 100644 --- a/pkg/scheduler/objects/application_events.go +++ b/pkg/scheduler/objects/application_events.go @@ -20,6 +20,7 @@ package objects import ( "fmt" + "strconv" "time" "golang.org/x/time/rate" @@ -89,7 +90,8 @@ func (evt *applicationEvents) sendRemoveAllocationEvent(alloc *Allocation, termi eventChangeDetail = si.EventRecord_ALLOC_REPLACED } - event := events.CreateAppEventRecord(evt.app.ApplicationID, common.Empty, alloc.GetUUID(), si.EventRecord_REMOVE, eventChangeDetail, alloc.GetAllocatedResource()) + // add bind time and instanceType to the event in the message field + event := events.CreateAppEventRecord(evt.app.ApplicationID, alloc.GetInstanceType()+common.Separator+strconv.FormatInt(alloc.bindTime.UnixNano(), 10), alloc.GetUUID(), si.EventRecord_REMOVE, eventChangeDetail, alloc.GetAllocatedResource()) evt.eventSystem.AddEvent(event) } diff --git a/pkg/tracking/resource_tracker.go b/pkg/tracking/resource_tracker.go new file mode 100644 index 000000000..67ddd6669 --- /dev/null +++ b/pkg/tracking/resource_tracker.go @@ -0,0 +1,91 @@ +package tracking + +import ( + "github.com/apache/yunikorn-core/pkg/common/resources" + "github.com/apache/yunikorn-core/pkg/events" + "github.com/apache/yunikorn-scheduler-interface/lib/go/si" + "go.uber.org/atomic" + "sync" + "time" +) + +var resourceTracker ResourceTracker + +// Util struct to keep track of application resource usage +type TrackedResource struct { + // Two level map for aggregated resource usage + // With instance type being the top level key, the mapped value is a map: + // resource type (CPU, memory etc) -> the aggregated used time (in seconds) of the resource type + // + TrackedResourceMap map[string]map[string]int64 + + sync.RWMutex +} + +// Aggregate the resource usage to UsedResourceMap[instType] +// The time the given resource used is the delta between the resource createTime and currentTime +func (ur *TrackedResource) AggregateTrackedResource(instType string, + resource *resources.Resource, releaseTime time.Time, bindTime time.Time) { + ur.Lock() + defer ur.Unlock() + + timeDiff := int64(releaseTime.Sub(bindTime).Seconds()) + aggregatedResourceTime, ok := ur.TrackedResourceMap[instType] + if !ok { + aggregatedResourceTime = map[string]int64{} + } + for key, element := range resource.Resources { + curUsage, ok := aggregatedResourceTime[key] + if !ok { + curUsage = 0 + } + curUsage += int64(element) * timeDiff // resource size times timeDiff + aggregatedResourceTime[key] = curUsage + } + ur.TrackedResourceMap[instType] = aggregatedResourceTime +} + +type ResourceTracker interface { + AddEvent(event *si.EventRecord) + StartService() + Stop() + IsResourceTrackerEnabled() bool + GetEventsFromID(uint64, uint64) ([]*si.EventRecord, uint64, uint64) +} + +type ResourceTrackerImpl struct { + store *events.EventStore // storing eventChannel + trackingAppMap map[string]*TrackedResource // storing eventChannel + channel chan *si.EventRecord // channelling input eventChannel + stop atomic.Bool + stopped bool + + trackingEnabled bool + trackingInterval time.Duration + sync.RWMutex +} + +func (rt *ResourceTrackerImpl) StartService() { + go func() { + for { + if rt.stop.Load() { + break + } + messages := rt.store.CollectEvents() + if len(messages) > 0 { + for _, message := range messages { + if message.Type == si.EventRecord_APP { + if _, ok := rt.trackingAppMap[message.ObjectID]; !ok { + rt.trackingAppMap[message.ObjectID] = &TrackedResource{ + TrackedResourceMap: make(map[string]map[string]int64), + } + } else { + rt.trackingAppMap[message.ObjectID].AggregateTrackedResource("", resources.NewResourceFromProto(message.Resource), time.Unix(0, message.TimestampNano), time.Unix(0, strings.) + } + } + } + } + time.Sleep(rt.trackingInterval) + } + }() +} From 0487b15f58713724f55d2113d9a42a336180eae1 Mon Sep 17 00:00:00 2001 From: qzhu Date: Tue, 28 Nov 2023 13:56:03 +0800 Subject: [PATCH 2/4] fix --- pkg/events/event_publisher.go | 16 ++++++++++------ pkg/scheduler/objects/application.go | 2 ++ pkg/scheduler/partition.go | 2 +- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/pkg/events/event_publisher.go b/pkg/events/event_publisher.go index 8cbabbff0..5b969c742 100644 --- a/pkg/events/event_publisher.go +++ b/pkg/events/event_publisher.go @@ -36,7 +36,7 @@ import ( ) // stores the push event internal -var defaultPushEventInterval = 2 * time.Second +var defaultPushEventInterval = 1 * time.Second // Util struct to keep track of application resource usage type TrackedResource struct { @@ -93,6 +93,7 @@ func CreateShimPublisher(store *EventStore) *EventPublisher { publisher := &EventPublisher{ store: store, pushEventInterval: defaultPushEventInterval, + trackingAppMap: make(map[string]*TrackedResource), } publisher.stop.Store(false) return publisher @@ -111,25 +112,28 @@ func (sp *EventPublisher) StartService() { eventPlugin.SendEvent(messages) } for _, message := range messages { - log.Log(log.Events).Debug("aggregate resource usage", zap.String("message", fmt.Sprintf("%+v", message))) if message.Type == si.EventRecord_APP && message.EventChangeType == si.EventRecord_REMOVE { + log.Log(log.Events).Debug("aggregate resource usage", zap.String("message", fmt.Sprintf("%+v", message))) // We need to clean up the trackingAppMap when an application is removed if message.ReferenceID == "" { - // This is an application removal event, remove the application from the trackingAppMap - delete(sp.trackingAppMap, message.ObjectID) log.Log(log.Events).Info("YK_APP_SUMMARY:", zap.String("appID", message.ObjectID), zap.Any("resourceUsage", sp.trackingAppMap[message.ObjectID].TrackedResourceMap), ) + // This is an application removal event, remove the application from the trackingAppMap + delete(sp.trackingAppMap, message.ObjectID) } else { // This is an allocation removal event, aggregate the resources used by the allocation if _, ok := sp.trackingAppMap[message.ObjectID]; !ok { sp.trackingAppMap[message.ObjectID] = &TrackedResource{ TrackedResourceMap: make(map[string]map[string]int64), } - } else { - sp.trackingAppMap[message.ObjectID].AggregateTrackedResource(resources.NewResourceFromProto(message.Resource), time.Unix(0, message.TimestampNano), message.Message) } + sp.trackingAppMap[message.ObjectID].AggregateTrackedResource(resources.NewResourceFromProto(message.Resource), time.Unix(0, message.TimestampNano), message.Message) + //log.Log(log.Events).Info("YK_APP_SUMMARY:", + // zap.String("appID", message.ObjectID), + // zap.Any("resourceUsage", sp.trackingAppMap[message.ObjectID].TrackedResourceMap), + //) } } } diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go index 116fb021a..bb755b78d 100644 --- a/pkg/scheduler/objects/application.go +++ b/pkg/scheduler/objects/application.go @@ -1798,6 +1798,7 @@ func (sa *Application) removeAllocationInternal(uuid string, releaseType si.Term } } delete(sa.allocations, uuid) + sa.trackCompletedResource(alloc) sa.appEvents.sendRemoveAllocationEvent(alloc, releaseType) return alloc } @@ -1831,6 +1832,7 @@ func (sa *Application) RemoveAllAllocations() []*Allocation { for _, alloc := range sa.allocations { allocationsToRelease = append(allocationsToRelease, alloc) sa.appEvents.sendRemoveAllocationEvent(alloc, si.TerminationType_STOPPED_BY_RM) + sa.trackCompletedResource(alloc) } if resources.IsZero(sa.pending) { diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go index 05a856e82..25d7f32bb 100644 --- a/pkg/scheduler/partition.go +++ b/pkg/scheduler/partition.go @@ -1483,7 +1483,7 @@ func (pc *PartitionContext) moveTerminatedApp(appID string) { log.Log(log.SchedPartition).Info("Removing terminated application from the application list", zap.String("appID", appID), zap.String("app status", app.CurrentState())) - //app.LogAppSummary(pc.RmID) + app.LogAppSummary(pc.RmID) app.CleanupTrackedResource() pc.Lock() defer pc.Unlock() From 351735ac4fab6a476074fb46a831524931d2952a Mon Sep 17 00:00:00 2001 From: qzhu Date: Tue, 28 Nov 2023 19:46:08 +0800 Subject: [PATCH 3/4] Move to tracking package. --- pkg/events/event_publisher.go | 59 ++------------------------- pkg/tracking/resource_tracker.go | 69 ++++++++------------------------ 2 files changed, 21 insertions(+), 107 deletions(-) diff --git a/pkg/events/event_publisher.go b/pkg/events/event_publisher.go index 5b969c742..3a9507d8e 100644 --- a/pkg/events/event_publisher.go +++ b/pkg/events/event_publisher.go @@ -20,12 +20,9 @@ package events import ( "fmt" - "github.com/apache/yunikorn-core/pkg/common" "github.com/apache/yunikorn-core/pkg/common/resources" + "github.com/apache/yunikorn-core/pkg/tracking" "github.com/apache/yunikorn-scheduler-interface/lib/go/si" - "strconv" - "strings" - "sync" "sync/atomic" "time" @@ -38,62 +35,18 @@ import ( // stores the push event internal var defaultPushEventInterval = 1 * time.Second -// Util struct to keep track of application resource usage -type TrackedResource struct { - // Two level map for aggregated resource usage - // With instance type being the top level key, the mapped value is a map: - // resource type (CPU, memory etc) -> the aggregated used time (in seconds) of the resource type - // - TrackedResourceMap map[string]map[string]int64 - - sync.RWMutex -} - -// Aggregate the resource usage to UsedResourceMap[instType] -// The time the given resource used is the delta between the resource createTime and currentTime -func (ur *TrackedResource) AggregateTrackedResource(resource *resources.Resource, releaseTime time.Time, message string) { - ur.Lock() - defer ur.Unlock() - // The message is in the format of "instanceType:timestamp" - // Split the message to get the instance type and the timestamp for bind time - // Convert the string to an int64 - unixNano, err := strconv.ParseInt(strings.Split(message, common.Separator)[1], 10, 64) - if err != nil { - log.Log(log.Events).Warn("Failed to parse the timestamp", zap.Error(err), zap.String("message", message)) - return - } - - // Convert Unix timestamp in nanoseconds to a time.Time object - bindTime := time.Unix(0, unixNano) - timeDiff := int64(releaseTime.Sub(bindTime).Seconds()) - instType := strings.Split(message, common.Separator)[0] - aggregatedResourceTime, ok := ur.TrackedResourceMap[instType] - if !ok { - aggregatedResourceTime = map[string]int64{} - } - for key, element := range resource.Resources { - curUsage, ok := aggregatedResourceTime[key] - if !ok { - curUsage = 0 - } - curUsage += int64(element) * timeDiff // resource size times timeDiff - aggregatedResourceTime[key] = curUsage - } - ur.TrackedResourceMap[instType] = aggregatedResourceTime -} - type EventPublisher struct { store *EventStore pushEventInterval time.Duration stop atomic.Bool - trackingAppMap map[string]*TrackedResource // storing eventChannel + trackingAppMap map[string]*tracking.TrackedResource // storing eventChannel } func CreateShimPublisher(store *EventStore) *EventPublisher { publisher := &EventPublisher{ store: store, pushEventInterval: defaultPushEventInterval, - trackingAppMap: make(map[string]*TrackedResource), + trackingAppMap: make(map[string]*tracking.TrackedResource), } publisher.stop.Store(false) return publisher @@ -125,15 +78,11 @@ func (sp *EventPublisher) StartService() { } else { // This is an allocation removal event, aggregate the resources used by the allocation if _, ok := sp.trackingAppMap[message.ObjectID]; !ok { - sp.trackingAppMap[message.ObjectID] = &TrackedResource{ + sp.trackingAppMap[message.ObjectID] = &tracking.TrackedResource{ TrackedResourceMap: make(map[string]map[string]int64), } } sp.trackingAppMap[message.ObjectID].AggregateTrackedResource(resources.NewResourceFromProto(message.Resource), time.Unix(0, message.TimestampNano), message.Message) - //log.Log(log.Events).Info("YK_APP_SUMMARY:", - // zap.String("appID", message.ObjectID), - // zap.Any("resourceUsage", sp.trackingAppMap[message.ObjectID].TrackedResourceMap), - //) } } } diff --git a/pkg/tracking/resource_tracker.go b/pkg/tracking/resource_tracker.go index 67ddd6669..be145e10d 100644 --- a/pkg/tracking/resource_tracker.go +++ b/pkg/tracking/resource_tracker.go @@ -1,16 +1,16 @@ package tracking import ( + "github.com/apache/yunikorn-core/pkg/common" "github.com/apache/yunikorn-core/pkg/common/resources" - "github.com/apache/yunikorn-core/pkg/events" - "github.com/apache/yunikorn-scheduler-interface/lib/go/si" - "go.uber.org/atomic" + "github.com/apache/yunikorn-core/pkg/log" + "go.uber.org/zap" + "strconv" + "strings" "sync" "time" ) -var resourceTracker ResourceTracker - // Util struct to keep track of application resource usage type TrackedResource struct { // Two level map for aggregated resource usage @@ -24,12 +24,22 @@ type TrackedResource struct { // Aggregate the resource usage to UsedResourceMap[instType] // The time the given resource used is the delta between the resource createTime and currentTime -func (ur *TrackedResource) AggregateTrackedResource(instType string, - resource *resources.Resource, releaseTime time.Time, bindTime time.Time) { +func (ur *TrackedResource) AggregateTrackedResource(resource *resources.Resource, releaseTime time.Time, message string) { ur.Lock() defer ur.Unlock() + // The message is in the format of "instanceType:timestamp" + // Split the message to get the instance type and the timestamp for bind time + // Convert the string to an int64 + unixNano, err := strconv.ParseInt(strings.Split(message, common.Separator)[1], 10, 64) + if err != nil { + log.Log(log.Events).Warn("Failed to parse the timestamp", zap.Error(err), zap.String("message", message)) + return + } + // Convert Unix timestamp in nanoseconds to a time.Time object + bindTime := time.Unix(0, unixNano) timeDiff := int64(releaseTime.Sub(bindTime).Seconds()) + instType := strings.Split(message, common.Separator)[0] aggregatedResourceTime, ok := ur.TrackedResourceMap[instType] if !ok { aggregatedResourceTime = map[string]int64{} @@ -44,48 +54,3 @@ func (ur *TrackedResource) AggregateTrackedResource(instType string, } ur.TrackedResourceMap[instType] = aggregatedResourceTime } - -type ResourceTracker interface { - AddEvent(event *si.EventRecord) - StartService() - Stop() - IsResourceTrackerEnabled() bool - GetEventsFromID(uint64, uint64) ([]*si.EventRecord, uint64, uint64) -} - -type ResourceTrackerImpl struct { - store *events.EventStore // storing eventChannel - trackingAppMap map[string]*TrackedResource // storing eventChannel - channel chan *si.EventRecord // channelling input eventChannel - stop atomic.Bool - stopped bool - - trackingEnabled bool - trackingInterval time.Duration - sync.RWMutex -} - -func (rt *ResourceTrackerImpl) StartService() { - go func() { - for { - if rt.stop.Load() { - break - } - messages := rt.store.CollectEvents() - if len(messages) > 0 { - for _, message := range messages { - if message.Type == si.EventRecord_APP { - if _, ok := rt.trackingAppMap[message.ObjectID]; !ok { - rt.trackingAppMap[message.ObjectID] = &TrackedResource{ - TrackedResourceMap: make(map[string]map[string]int64), - } - } else { - rt.trackingAppMap[message.ObjectID].AggregateTrackedResource("", resources.NewResourceFromProto(message.Resource), time.Unix(0, message.TimestampNano), time.Unix(0, strings.) - } - } - } - } - time.Sleep(rt.trackingInterval) - } - }() -} From 0d3567257a48e39b80c8a639e1e61021c5aec5ec Mon Sep 17 00:00:00 2001 From: qzhu Date: Tue, 28 Nov 2023 21:32:10 +0800 Subject: [PATCH 4/4] Remove scheduling tracking logic --- pkg/common/resources/tracked_resources.go | 3 +- .../resources/tracked_resources_test.go | 2 +- pkg/events/event_publisher.go | 68 +++++++----- pkg/scheduler/objects/application.go | 105 ++---------------- pkg/scheduler/objects/application_test.go | 89 --------------- pkg/scheduler/partition.go | 2 - pkg/scheduler/partition_test.go | 8 +- pkg/tracking/resource_tracker.go | 56 ---------- 8 files changed, 54 insertions(+), 279 deletions(-) delete mode 100644 pkg/tracking/resource_tracker.go diff --git a/pkg/common/resources/tracked_resources.go b/pkg/common/resources/tracked_resources.go index 209e1c114..b39b9ebfc 100644 --- a/pkg/common/resources/tracked_resources.go +++ b/pkg/common/resources/tracked_resources.go @@ -84,14 +84,13 @@ func (tr *TrackedResource) Clone() *TrackedResource { // AggregateTrackedResource aggregates resource usage to TrackedResourceMap[instType]. // The time the given resource used is the delta between the resource createTime and currentTime. func (tr *TrackedResource) AggregateTrackedResource(instType string, - resource *Resource, bindTime time.Time) { + resource *Resource, bindTime time.Time, releaseTime time.Time) { if resource == nil { return } tr.Lock() defer tr.Unlock() - releaseTime := time.Now() timeDiff := int64(releaseTime.Sub(bindTime).Seconds()) aggregatedResourceTime, ok := tr.TrackedResourceMap[instType] if !ok { diff --git a/pkg/common/resources/tracked_resources_test.go b/pkg/common/resources/tracked_resources_test.go index 0e2043796..3f7d9906d 100644 --- a/pkg/common/resources/tracked_resources_test.go +++ b/pkg/common/resources/tracked_resources_test.go @@ -201,7 +201,7 @@ func TestTrackedResourceAggregateTrackedResource(t *testing.T) { for _, tt := range tests { t.Run(tt.caseName, func(t *testing.T) { original := NewTrackedResourceFromMap(tt.input.trackedResource) - original.AggregateTrackedResource(tt.input.instType, &Resource{tt.input.otherResource}, tt.input.bindTime) + original.AggregateTrackedResource(tt.input.instType, &Resource{tt.input.otherResource}, tt.input.bindTime, time.Now()) expected := NewTrackedResourceFromMap(tt.expectedTrackedResource) if !reflect.DeepEqual(original.TrackedResourceMap, expected.TrackedResourceMap) { diff --git a/pkg/events/event_publisher.go b/pkg/events/event_publisher.go index 3a9507d8e..5cd70b1e7 100644 --- a/pkg/events/event_publisher.go +++ b/pkg/events/event_publisher.go @@ -20,9 +20,11 @@ package events import ( "fmt" + "github.com/apache/yunikorn-core/pkg/common" "github.com/apache/yunikorn-core/pkg/common/resources" - "github.com/apache/yunikorn-core/pkg/tracking" "github.com/apache/yunikorn-scheduler-interface/lib/go/si" + "strconv" + "strings" "sync/atomic" "time" @@ -39,14 +41,14 @@ type EventPublisher struct { store *EventStore pushEventInterval time.Duration stop atomic.Bool - trackingAppMap map[string]*tracking.TrackedResource // storing eventChannel + trackingAppMap map[string]*resources.TrackedResource // storing eventChannel } func CreateShimPublisher(store *EventStore) *EventPublisher { publisher := &EventPublisher{ store: store, pushEventInterval: defaultPushEventInterval, - trackingAppMap: make(map[string]*tracking.TrackedResource), + trackingAppMap: make(map[string]*resources.TrackedResource), } publisher.stop.Store(false) return publisher @@ -64,28 +66,7 @@ func (sp *EventPublisher) StartService() { log.Log(log.Events).Debug("Sending eventChannel", zap.Int("number of messages", len(messages))) eventPlugin.SendEvent(messages) } - for _, message := range messages { - if message.Type == si.EventRecord_APP && message.EventChangeType == si.EventRecord_REMOVE { - log.Log(log.Events).Debug("aggregate resource usage", zap.String("message", fmt.Sprintf("%+v", message))) - // We need to clean up the trackingAppMap when an application is removed - if message.ReferenceID == "" { - log.Log(log.Events).Info("YK_APP_SUMMARY:", - zap.String("appID", message.ObjectID), - zap.Any("resourceUsage", sp.trackingAppMap[message.ObjectID].TrackedResourceMap), - ) - // This is an application removal event, remove the application from the trackingAppMap - delete(sp.trackingAppMap, message.ObjectID) - } else { - // This is an allocation removal event, aggregate the resources used by the allocation - if _, ok := sp.trackingAppMap[message.ObjectID]; !ok { - sp.trackingAppMap[message.ObjectID] = &tracking.TrackedResource{ - TrackedResourceMap: make(map[string]map[string]int64), - } - } - sp.trackingAppMap[message.ObjectID].AggregateTrackedResource(resources.NewResourceFromProto(message.Resource), time.Unix(0, message.TimestampNano), message.Message) - } - } - } + sp.AggregateAppTrackedResourceFromEvents(messages) } time.Sleep(sp.pushEventInterval) } @@ -96,6 +77,43 @@ func (sp *EventPublisher) Stop() { sp.stop.Store(true) } +func (sp *EventPublisher) AggregateAppTrackedResourceFromEvents(messages []*si.EventRecord) { + for _, message := range messages { + if message.Type == si.EventRecord_APP && message.EventChangeType == si.EventRecord_REMOVE { + log.Log(log.Events).Debug("aggregate resource usage", zap.String("message", fmt.Sprintf("%+v", message))) + // We need to clean up the trackingAppMap when an application is removed + if message.ReferenceID == "" { + log.Log(log.Events).Info("YK_APP_SUMMARY:", + zap.String("appID", message.ObjectID), + zap.Any("resourceUsage", sp.trackingAppMap[message.ObjectID].TrackedResourceMap), + ) + // This is an application removal event, remove the application from the trackingAppMap + delete(sp.trackingAppMap, message.ObjectID) + } else { + // This is an allocation removal event, aggregate the resources used by the allocation + if _, ok := sp.trackingAppMap[message.ObjectID]; !ok { + sp.trackingAppMap[message.ObjectID] = &resources.TrackedResource{ + TrackedResourceMap: make(map[string]map[string]int64), + } + } + + // The message is in the format of "instanceType:timestamp" + // Split the message to get the instance type and the timestamp for bind time + // Convert the string to an int64 + unixNano, err := strconv.ParseInt(strings.Split(message.Message, common.Separator)[1], 10, 64) + if err != nil { + log.Log(log.Events).Warn("Failed to parse the timestamp", zap.Error(err), zap.String("message", message.Message)) + return + } + instType := strings.Split(message.Message, common.Separator)[0] + // Convert Unix timestamp in nanoseconds to a time.Time object + bindTime := time.Unix(0, unixNano) + sp.trackingAppMap[message.ObjectID].AggregateTrackedResource(instType, resources.NewResourceFromProto(message.Resource), bindTime, time.Unix(0, message.TimestampNano)) + } + } + } +} + func (sp *EventPublisher) getEventStore() *EventStore { return sp.store } diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go index bb755b78d..31c9be42f 100644 --- a/pkg/scheduler/objects/application.go +++ b/pkg/scheduler/objects/application.go @@ -78,19 +78,14 @@ type Application struct { tags map[string]string // application tags used in scheduling // Private mutable fields need protection - queuePath string - queue *Queue // queue the application is running in - pending *resources.Resource // pending resources from asks for the app - reservations map[string]*reservation // a map of reservations - requests map[string]*AllocationAsk // a map of asks - sortedRequests sortedRequests // list of requests pre-sorted - user security.UserGroup // owner of the application - allocatedResource *resources.Resource // total allocated resources - - usedResource *resources.TrackedResource // keep track of resource usage of the application - preemptedResource *resources.TrackedResource // keep track of preempted resource usage of the application - placeholderResource *resources.TrackedResource // keep track of placeholder resource usage of the application - + queuePath string + queue *Queue // queue the application is running in + pending *resources.Resource // pending resources from asks for the app + reservations map[string]*reservation // a map of reservations + requests map[string]*AllocationAsk // a map of asks + sortedRequests sortedRequests // list of requests pre-sorted + user security.UserGroup // owner of the application + allocatedResource *resources.Resource // total allocated resources maxAllocatedResource *resources.Resource // max allocated resources allocatedPlaceholder *resources.Resource // total allocated placeholder resources allocations map[string]*Allocation // list of all allocations @@ -118,29 +113,6 @@ type Application struct { sync.RWMutex } -func (sa *Application) GetApplicationSummary(rmID string) *ApplicationSummary { - sa.RLock() - defer sa.RUnlock() - state := sa.stateMachine.Current() - resourceUsage := sa.usedResource.Clone() - preemptedUsage := sa.preemptedResource.Clone() - placeHolderUsage := sa.placeholderResource.Clone() - appSummary := &ApplicationSummary{ - ApplicationID: sa.ApplicationID, - SubmissionTime: sa.SubmissionTime, - StartTime: sa.startTime, - FinishTime: sa.finishedTime, - User: sa.user.User, - Queue: sa.queuePath, - State: state, - RmID: rmID, - ResourceUsage: resourceUsage, - PreemptedResource: preemptedUsage, - PlaceholderResource: placeHolderUsage, - } - return appSummary -} - func NewApplication(siApp *si.AddApplicationRequest, ugi security.UserGroup, eventHandler handler.EventHandler, rmID string) *Application { app := &Application{ ApplicationID: siApp.ApplicationID, @@ -150,9 +122,6 @@ func NewApplication(siApp *si.AddApplicationRequest, ugi security.UserGroup, eve tags: siApp.Tags, pending: resources.NewResource(), allocatedResource: resources.NewResource(), - usedResource: resources.NewTrackedResource(), - preemptedResource: resources.NewTrackedResource(), - placeholderResource: resources.NewTrackedResource(), maxAllocatedResource: resources.NewResource(), allocatedPlaceholder: resources.NewResource(), requests: make(map[string]*AllocationAsk), @@ -1655,39 +1624,6 @@ func (sa *Application) decUserResourceUsage(resource *resources.Resource, remove ugm.GetUserManager().DecreaseTrackedResource(sa.queuePath, sa.ApplicationID, resource, sa.user, removeApp) } -// Track used and preempted resources -func (sa *Application) trackCompletedResource(info *Allocation) { - switch { - case info.IsPreempted(): - sa.updatePreemptedResource(info) - case info.IsPlaceholder(): - sa.updatePlaceholderResource(info) - default: - sa.updateUsedResource(info) - } -} - -// When the resource allocated with this allocation is to be removed, -// have the usedResource to aggregate the resource used by this allocation -func (sa *Application) updateUsedResource(info *Allocation) { - sa.usedResource.AggregateTrackedResource(info.GetInstanceType(), - info.GetAllocatedResource(), info.GetBindTime()) -} - -// When the placeholder allocated with this allocation is to be removed, -// have the placeholderResource to aggregate the resource used by this allocation -func (sa *Application) updatePlaceholderResource(info *Allocation) { - sa.placeholderResource.AggregateTrackedResource(info.GetInstanceType(), - info.GetAllocatedResource(), info.GetBindTime()) -} - -// When the resource allocated with this allocation is to be preempted, -// have the preemptedResource to aggregate the resource used by this allocation -func (sa *Application) updatePreemptedResource(info *Allocation) { - sa.preemptedResource.AggregateTrackedResource(info.GetInstanceType(), - info.GetAllocatedResource(), info.GetBindTime()) -} - func (sa *Application) ReplaceAllocation(uuid string) *Allocation { sa.Lock() defer sa.Unlock() @@ -1798,7 +1734,6 @@ func (sa *Application) removeAllocationInternal(uuid string, releaseType si.Term } } delete(sa.allocations, uuid) - sa.trackCompletedResource(alloc) sa.appEvents.sendRemoveAllocationEvent(alloc, releaseType) return alloc } @@ -1832,7 +1767,6 @@ func (sa *Application) RemoveAllAllocations() []*Allocation { for _, alloc := range sa.allocations { allocationsToRelease = append(allocationsToRelease, alloc) sa.appEvents.sendRemoveAllocationEvent(alloc, si.TerminationType_STOPPED_BY_RM) - sa.trackCompletedResource(alloc) } if resources.IsZero(sa.pending) { @@ -2014,29 +1948,6 @@ func (sa *Application) cleanupAsks() { sa.sortedRequests = nil } -func (sa *Application) cleanupTrackedResource() { - sa.usedResource = nil - sa.placeholderResource = nil - sa.preemptedResource = nil -} - -func (sa *Application) CleanupTrackedResource() { - sa.Lock() - defer sa.Unlock() - sa.cleanupTrackedResource() -} - -func (sa *Application) LogAppSummary(rmID string) { - if sa.startTime.IsZero() { - return - } - appSummary := sa.GetApplicationSummary(rmID) - appSummary.DoLogging() - appSummary.ResourceUsage = nil - appSummary.PreemptedResource = nil - appSummary.PlaceholderResource = nil -} - func (sa *Application) HasPlaceholderAllocation() bool { sa.RLock() defer sa.RUnlock() diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go index d52b86cd9..fb00b8cef 100644 --- a/pkg/scheduler/objects/application_test.go +++ b/pkg/scheduler/objects/application_test.go @@ -1074,91 +1074,6 @@ func assertPlaceHolderResource(t *testing.T, appSummary *ApplicationSummary, mem assert.Equal(t, vcoresSecconds, detailedResource["vcores"]) } -func TestResourceUsageAggregation(t *testing.T) { - setupUGM() - - app := newApplication(appID1, "default", "root.a") - - // nothing allocated - if !resources.IsZero(app.GetAllocatedResource()) { - t.Error("new application has allocated resources") - } - // create an allocation and check the assignment - resMap := map[string]string{"memory": "100", "vcores": "10"} - res, err := resources.NewResourceFromConf(resMap) - assert.NilError(t, err, "failed to create resource with error") - alloc := newAllocation(appID1, "uuid-1", nodeID1, res) - alloc.SetInstanceType(instType1) - // Mock the time to be 3 seconds before - alloc.SetBindTime(time.Now().Add(-3 * time.Second)) - app.AddAllocation(alloc) - - if !resources.Equals(app.allocatedResource, res) { - t.Errorf("allocated resources is not updated correctly: %v", app.allocatedResource) - } - allocs := app.GetAllAllocations() - assert.Equal(t, len(allocs), 1) - assert.Assert(t, app.getPlaceholderTimer() == nil, "Placeholder timer should not be initialized as the allocation is not a placeholder") - assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 1)) - - err = app.HandleApplicationEvent(RunApplication) - assert.NilError(t, err, "no error expected new to accepted (completed test)") - - appSummary := app.GetApplicationSummary("default") - appSummary.DoLogging() - assertResourceUsage(t, appSummary, 0, 0) - - // add more allocations to test the removals - alloc = newAllocation(appID1, "uuid-2", nodeID1, res) - alloc.SetInstanceType(instType1) - - // Mock the time to be 3 seconds before - alloc.SetBindTime(time.Now().Add(-3 * time.Second)) - app.AddAllocation(alloc) - assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2)) - - // remove one of the 2 - if alloc = app.RemoveAllocation("uuid-2", si.TerminationType_UNKNOWN_TERMINATION_TYPE); alloc == nil { - t.Error("returned allocations was nil allocation was not removed") - } - assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 1)) - - appSummary = app.GetApplicationSummary("default") - appSummary.DoLogging() - assertResourceUsage(t, appSummary, 300, 30) - - alloc = newAllocation(appID1, "uuid-3", nodeID1, res) - alloc.SetInstanceType(instType1) - app.AddAllocation(alloc) - allocs = app.GetAllAllocations() - assert.Equal(t, len(allocs), 2) - assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2)) - - appSummary = app.GetApplicationSummary("default") - appSummary.DoLogging() - assertResourceUsage(t, appSummary, 300, 30) - - // try to remove a non existing alloc - if alloc = app.RemoveAllocation("does-not-exist", si.TerminationType_UNKNOWN_TERMINATION_TYPE); alloc != nil { - t.Errorf("returned allocations was not allocation was incorrectly removed: %v", alloc) - } - - // remove all left over allocations - if allocs = app.RemoveAllAllocations(); allocs == nil || len(allocs) != 2 { - t.Errorf("returned number of allocations was incorrect: %v", allocs) - } - allocs = app.GetAllAllocations() - assert.Equal(t, len(allocs), 0) - assertUserGroupResource(t, getTestUserGroup(), nil) - - err = app.HandleApplicationEvent(CompleteApplication) - assert.NilError(t, err, "no error expected accepted to completing (completed test)") - - appSummary = app.GetApplicationSummary("default") - appSummary.DoLogging() - assertResourceUsage(t, appSummary, 600, 60) -} - func TestRejected(t *testing.T) { terminatedTimeout = time.Millisecond * 100 defer func() { @@ -1377,10 +1292,6 @@ func TestReplaceAllocationTracking(t *testing.T) { app.RemoveAllocation("uuid-3", si.TerminationType_PLACEHOLDER_REPLACED) assert.Equal(t, "uuid-3", alloc.uuid) assert.Equal(t, false, app.HasPlaceholderAllocation()) - - // check placeholder resource usage - appSummary := app.GetApplicationSummary("default") - assertPlaceHolderResource(t, appSummary, 3000, 300) } func TestTimeoutPlaceholderSoftStyle(t *testing.T) { diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go index 25d7f32bb..1f0ff2634 100644 --- a/pkg/scheduler/partition.go +++ b/pkg/scheduler/partition.go @@ -1483,8 +1483,6 @@ func (pc *PartitionContext) moveTerminatedApp(appID string) { log.Log(log.SchedPartition).Info("Removing terminated application from the application list", zap.String("appID", appID), zap.String("app status", app.CurrentState())) - app.LogAppSummary(pc.RmID) - app.CleanupTrackedResource() pc.Lock() defer pc.Unlock() delete(pc.applications, appID) diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go index e62923c45..f1450a1ef 100644 --- a/pkg/scheduler/partition_test.go +++ b/pkg/scheduler/partition_test.go @@ -1880,7 +1880,7 @@ func assertPreemptedResource(t *testing.T, appSummary *objects.ApplicationSummar func TestPreemption(t *testing.T) { setupUGM() - partition, app1, app2, alloc1, alloc2 := setupPreemption(t) + partition, _, app2, alloc1, alloc2 := setupPreemption(t) res, err := resources.NewResourceFromConf(map[string]string{"vcore": "5"}) assert.NilError(t, err, "failed to create resource") @@ -1937,12 +1937,6 @@ func TestPreemption(t *testing.T) { assert.Equal(t, alloc.GetResult(), objects.Allocated, "result should be allocated") assert.Equal(t, alloc.GetAllocationKey(), allocID3, "expected ask alloc-3 to be allocated") assertUserGroupResourceMaxLimits(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 10000}), getExpectedQueuesLimitsForPreemption()) - - appSummary := app1.GetApplicationSummary("default") - assertPreemptedResource(t, appSummary, -1, 5000) - - appSummary = app2.GetApplicationSummary("default") - assertPreemptedResource(t, appSummary, -1, 0) } // Preemption followed by a normal allocation diff --git a/pkg/tracking/resource_tracker.go b/pkg/tracking/resource_tracker.go deleted file mode 100644 index be145e10d..000000000 --- a/pkg/tracking/resource_tracker.go +++ /dev/null @@ -1,56 +0,0 @@ -package tracking - -import ( - "github.com/apache/yunikorn-core/pkg/common" - "github.com/apache/yunikorn-core/pkg/common/resources" - "github.com/apache/yunikorn-core/pkg/log" - "go.uber.org/zap" - "strconv" - "strings" - "sync" - "time" -) - -// Util struct to keep track of application resource usage -type TrackedResource struct { - // Two level map for aggregated resource usage - // With instance type being the top level key, the mapped value is a map: - // resource type (CPU, memory etc) -> the aggregated used time (in seconds) of the resource type - // - TrackedResourceMap map[string]map[string]int64 - - sync.RWMutex -} - -// Aggregate the resource usage to UsedResourceMap[instType] -// The time the given resource used is the delta between the resource createTime and currentTime -func (ur *TrackedResource) AggregateTrackedResource(resource *resources.Resource, releaseTime time.Time, message string) { - ur.Lock() - defer ur.Unlock() - // The message is in the format of "instanceType:timestamp" - // Split the message to get the instance type and the timestamp for bind time - // Convert the string to an int64 - unixNano, err := strconv.ParseInt(strings.Split(message, common.Separator)[1], 10, 64) - if err != nil { - log.Log(log.Events).Warn("Failed to parse the timestamp", zap.Error(err), zap.String("message", message)) - return - } - - // Convert Unix timestamp in nanoseconds to a time.Time object - bindTime := time.Unix(0, unixNano) - timeDiff := int64(releaseTime.Sub(bindTime).Seconds()) - instType := strings.Split(message, common.Separator)[0] - aggregatedResourceTime, ok := ur.TrackedResourceMap[instType] - if !ok { - aggregatedResourceTime = map[string]int64{} - } - for key, element := range resource.Resources { - curUsage, ok := aggregatedResourceTime[key] - if !ok { - curUsage = 0 - } - curUsage += int64(element) * timeDiff // resource size times timeDiff - aggregatedResourceTime[key] = curUsage - } - ur.TrackedResourceMap[instType] = aggregatedResourceTime -}