Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YUNIKORN-2196 Optimize aggregated resources tracking feature to not add overhead to scheduling #739

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions pkg/common/resources/tracked_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,13 @@ func (tr *TrackedResource) Clone() *TrackedResource {
// AggregateTrackedResource aggregates resource usage to TrackedResourceMap[instType].
// The time the given resource was used is the delta between bindTime and releaseTime.
func (tr *TrackedResource) AggregateTrackedResource(instType string,
resource *Resource, bindTime time.Time) {
resource *Resource, bindTime time.Time, releaseTime time.Time) {
if resource == nil {
return
}
tr.Lock()
defer tr.Unlock()

releaseTime := time.Now()
timeDiff := int64(releaseTime.Sub(bindTime).Seconds())
aggregatedResourceTime, ok := tr.TrackedResourceMap[instType]
if !ok {
Expand Down
2 changes: 1 addition & 1 deletion pkg/common/resources/tracked_resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func TestTrackedResourceAggregateTrackedResource(t *testing.T) {
for _, tt := range tests {
t.Run(tt.caseName, func(t *testing.T) {
original := NewTrackedResourceFromMap(tt.input.trackedResource)
original.AggregateTrackedResource(tt.input.instType, &Resource{tt.input.otherResource}, tt.input.bindTime)
original.AggregateTrackedResource(tt.input.instType, &Resource{tt.input.otherResource}, tt.input.bindTime, time.Now())
expected := NewTrackedResourceFromMap(tt.expectedTrackedResource)

if !reflect.DeepEqual(original.TrackedResourceMap, expected.TrackedResourceMap) {
Expand Down
48 changes: 47 additions & 1 deletion pkg/events/event_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@
package events

import (
"fmt"
"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
"strconv"
"strings"
"sync/atomic"
"time"

Expand All @@ -29,18 +35,20 @@ import (
)

// stores the push event interval
var defaultPushEventInterval = 2 * time.Second
var defaultPushEventInterval = 1 * time.Second

// EventPublisher holds the state of the event publishing service: the event
// store it reads from, the pause between two push rounds, a stop flag and the
// per-application resource usage aggregated from the published events.
type EventPublisher struct {
// store is the EventStore handed to CreateShimPublisher.
store *EventStore
// pushEventInterval is how long the service sleeps between two push rounds.
pushEventInterval time.Duration
// stop is set to true by Stop().
stop atomic.Bool
trackingAppMap map[string]*resources.TrackedResource // aggregated resource usage, keyed by the event ObjectID (logged as appID)
}

func CreateShimPublisher(store *EventStore) *EventPublisher {
publisher := &EventPublisher{
store: store,
pushEventInterval: defaultPushEventInterval,
trackingAppMap: make(map[string]*resources.TrackedResource),
}
publisher.stop.Store(false)
return publisher
Expand All @@ -58,6 +66,7 @@ func (sp *EventPublisher) StartService() {
log.Log(log.Events).Debug("Sending eventChannel", zap.Int("number of messages", len(messages)))
eventPlugin.SendEvent(messages)
}
sp.AggregateAppTrackedResourceFromEvents(messages)
}
time.Sleep(sp.pushEventInterval)
}
Expand All @@ -68,6 +77,43 @@ func (sp *EventPublisher) Stop() {
sp.stop.Store(true)
}

func (sp *EventPublisher) AggregateAppTrackedResourceFromEvents(messages []*si.EventRecord) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is very complicated for something which isn't even a problem in my opinion.

I'm a strong -1 for optimizing tracking this way.

for _, message := range messages {
if message.Type == si.EventRecord_APP && message.EventChangeType == si.EventRecord_REMOVE {
log.Log(log.Events).Debug("aggregate resource usage", zap.String("message", fmt.Sprintf("%+v", message)))
// We need to clean up the trackingAppMap when an application is removed
if message.ReferenceID == "" {
log.Log(log.Events).Info("YK_APP_SUMMARY:",
zap.String("appID", message.ObjectID),
zap.Any("resourceUsage", sp.trackingAppMap[message.ObjectID].TrackedResourceMap),
)
// This is an application removal event, remove the application from the trackingAppMap
delete(sp.trackingAppMap, message.ObjectID)
} else {
// This is an allocation removal event, aggregate the resources used by the allocation
if _, ok := sp.trackingAppMap[message.ObjectID]; !ok {
sp.trackingAppMap[message.ObjectID] = &resources.TrackedResource{
TrackedResourceMap: make(map[string]map[string]int64),
}
}

// The message is in the format of "instanceType:timestamp"
// Split the message to get the instance type and the timestamp for bind time
// Convert the string to an int64
unixNano, err := strconv.ParseInt(strings.Split(message.Message, common.Separator)[1], 10, 64)
if err != nil {
log.Log(log.Events).Warn("Failed to parse the timestamp", zap.Error(err), zap.String("message", message.Message))
return
}
instType := strings.Split(message.Message, common.Separator)[0]
// Convert Unix timestamp in nanoseconds to a time.Time object
bindTime := time.Unix(0, unixNano)
sp.trackingAppMap[message.ObjectID].AggregateTrackedResource(instType, resources.NewResourceFromProto(message.Resource), bindTime, time.Unix(0, message.TimestampNano))
}
}
}
}

// getEventStore returns the event store held by this publisher.
func (sp *EventPublisher) getEventStore() *EventStore {
return sp.store
}
110 changes: 8 additions & 102 deletions pkg/scheduler/objects/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,19 +78,14 @@ type Application struct {
tags map[string]string // application tags used in scheduling

// Private mutable fields need protection
queuePath string
queue *Queue // queue the application is running in
pending *resources.Resource // pending resources from asks for the app
reservations map[string]*reservation // a map of reservations
requests map[string]*AllocationAsk // a map of asks
sortedRequests sortedRequests // list of requests pre-sorted
user security.UserGroup // owner of the application
allocatedResource *resources.Resource // total allocated resources

usedResource *resources.TrackedResource // keep track of resource usage of the application
preemptedResource *resources.TrackedResource // keep track of preempted resource usage of the application
placeholderResource *resources.TrackedResource // keep track of placeholder resource usage of the application

queuePath string
queue *Queue // queue the application is running in
pending *resources.Resource // pending resources from asks for the app
reservations map[string]*reservation // a map of reservations
requests map[string]*AllocationAsk // a map of asks
sortedRequests sortedRequests // list of requests pre-sorted
user security.UserGroup // owner of the application
allocatedResource *resources.Resource // total allocated resources
maxAllocatedResource *resources.Resource // max allocated resources
allocatedPlaceholder *resources.Resource // total allocated placeholder resources
allocations map[string]*Allocation // list of all allocations
Expand Down Expand Up @@ -118,29 +113,6 @@ type Application struct {
sync.RWMutex
}

// GetApplicationSummary returns a summary of the application for the given
// rmID. The application's read lock is held while the summary is assembled,
// and the used, preempted and placeholder trackers are cloned into it.
func (sa *Application) GetApplicationSummary(rmID string) *ApplicationSummary {
	sa.RLock()
	defer sa.RUnlock()

	return &ApplicationSummary{
		ApplicationID:       sa.ApplicationID,
		SubmissionTime:      sa.SubmissionTime,
		StartTime:           sa.startTime,
		FinishTime:          sa.finishedTime,
		User:                sa.user.User,
		Queue:               sa.queuePath,
		State:               sa.stateMachine.Current(),
		RmID:                rmID,
		ResourceUsage:       sa.usedResource.Clone(),
		PreemptedResource:   sa.preemptedResource.Clone(),
		PlaceholderResource: sa.placeholderResource.Clone(),
	}
}

func NewApplication(siApp *si.AddApplicationRequest, ugi security.UserGroup, eventHandler handler.EventHandler, rmID string) *Application {
app := &Application{
ApplicationID: siApp.ApplicationID,
Expand All @@ -150,9 +122,6 @@ func NewApplication(siApp *si.AddApplicationRequest, ugi security.UserGroup, eve
tags: siApp.Tags,
pending: resources.NewResource(),
allocatedResource: resources.NewResource(),
usedResource: resources.NewTrackedResource(),
preemptedResource: resources.NewTrackedResource(),
placeholderResource: resources.NewTrackedResource(),
maxAllocatedResource: resources.NewResource(),
allocatedPlaceholder: resources.NewResource(),
requests: make(map[string]*AllocationAsk),
Expand Down Expand Up @@ -1655,39 +1624,6 @@ func (sa *Application) decUserResourceUsage(resource *resources.Resource, remove
ugm.GetUserManager().DecreaseTrackedResource(sa.queuePath, sa.ApplicationID, resource, sa.user, removeApp)
}

// trackCompletedResource records the resources of a completed allocation in
// exactly one tracker: preempted allocations first, then placeholders, and
// everything else as regular used resources.
func (sa *Application) trackCompletedResource(info *Allocation) {
	if info.IsPreempted() {
		sa.updatePreemptedResource(info)
		return
	}
	if info.IsPlaceholder() {
		sa.updatePlaceholderResource(info)
		return
	}
	sa.updateUsedResource(info)
}

// updateUsedResource is called when an allocation is about to be removed. It
// aggregates the allocation's resource, per instance type and from its bind
// time, into the application's usedResource tracker.
func (sa *Application) updateUsedResource(info *Allocation) {
	instType := info.GetInstanceType()
	allocated := info.GetAllocatedResource()
	boundAt := info.GetBindTime()
	sa.usedResource.AggregateTrackedResource(instType, allocated, boundAt)
}

// updatePlaceholderResource is called when a placeholder allocation is about
// to be removed. It aggregates the allocation's resource, per instance type
// and from its bind time, into the application's placeholderResource tracker.
func (sa *Application) updatePlaceholderResource(info *Allocation) {
	instType := info.GetInstanceType()
	allocated := info.GetAllocatedResource()
	boundAt := info.GetBindTime()
	sa.placeholderResource.AggregateTrackedResource(instType, allocated, boundAt)
}

// updatePreemptedResource is called when a preempted allocation is about to be
// removed. It aggregates the allocation's resource, per instance type and from
// its bind time, into the application's preemptedResource tracker.
func (sa *Application) updatePreemptedResource(info *Allocation) {
	instType := info.GetInstanceType()
	allocated := info.GetAllocatedResource()
	boundAt := info.GetBindTime()
	sa.preemptedResource.AggregateTrackedResource(instType, allocated, boundAt)
}

func (sa *Application) ReplaceAllocation(uuid string) *Allocation {
sa.Lock()
defer sa.Unlock()
Expand Down Expand Up @@ -1776,16 +1712,11 @@ func (sa *Application) removeAllocationInternal(uuid string, releaseType si.Term
eventWarning = "Application state not changed while removing a placeholder allocation"
}
}
// Aggregate the resources used by this alloc to the application's resource tracker
sa.trackCompletedResource(alloc)

sa.decUserResourceUsage(alloc.GetAllocatedResource(), removeApp)
} else {
sa.allocatedResource = resources.Sub(sa.allocatedResource, alloc.GetAllocatedResource())

// Aggregate the resources used by this alloc to the application's resource tracker
sa.trackCompletedResource(alloc)

// When the resource trackers are zero we should not expect anything to come in later.
if sa.hasZeroAllocations() {
removeApp = true
Expand Down Expand Up @@ -1835,8 +1766,6 @@ func (sa *Application) RemoveAllAllocations() []*Allocation {
allocationsToRelease := make([]*Allocation, 0)
for _, alloc := range sa.allocations {
allocationsToRelease = append(allocationsToRelease, alloc)
// Aggregate the resources used by this alloc to the application's user resource tracker
sa.trackCompletedResource(alloc)
sa.appEvents.sendRemoveAllocationEvent(alloc, si.TerminationType_STOPPED_BY_RM)
}

Expand Down Expand Up @@ -2019,29 +1948,6 @@ func (sa *Application) cleanupAsks() {
sa.sortedRequests = nil
}

// cleanupTrackedResource drops the used, placeholder and preempted resource
// trackers of the application. It takes no lock itself; CleanupTrackedResource
// is the locking variant.
func (sa *Application) cleanupTrackedResource() {
	sa.usedResource, sa.placeholderResource, sa.preemptedResource = nil, nil, nil
}

// CleanupTrackedResource takes the application's write lock and then calls
// cleanupTrackedResource.
func (sa *Application) CleanupTrackedResource() {
sa.Lock()
defer sa.Unlock()
sa.cleanupTrackedResource()
}

// LogAppSummary logs the application summary for the given rmID. Nothing is
// logged when the application's start time is still the zero time.
func (sa *Application) LogAppSummary(rmID string) {
	if sa.startTime.IsZero() {
		return
	}
	summary := sa.GetApplicationSummary(rmID)
	summary.DoLogging()
	// Release the tracker references held by the summary once it has been logged.
	summary.ResourceUsage, summary.PreemptedResource, summary.PlaceholderResource = nil, nil, nil
}

func (sa *Application) HasPlaceholderAllocation() bool {
sa.RLock()
defer sa.RUnlock()
Expand Down
4 changes: 3 additions & 1 deletion pkg/scheduler/objects/application_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package objects

import (
"fmt"
"strconv"
"time"

"golang.org/x/time/rate"
Expand Down Expand Up @@ -89,7 +90,8 @@ func (evt *applicationEvents) sendRemoveAllocationEvent(alloc *Allocation, termi
eventChangeDetail = si.EventRecord_ALLOC_REPLACED
}

event := events.CreateAppEventRecord(evt.app.ApplicationID, common.Empty, alloc.GetUUID(), si.EventRecord_REMOVE, eventChangeDetail, alloc.GetAllocatedResource())
// add bind time and instanceType to the event in the message field
event := events.CreateAppEventRecord(evt.app.ApplicationID, alloc.GetInstanceType()+common.Separator+strconv.FormatInt(alloc.bindTime.UnixNano(), 10), alloc.GetUUID(), si.EventRecord_REMOVE, eventChangeDetail, alloc.GetAllocatedResource())
evt.eventSystem.AddEvent(event)
}

Expand Down
89 changes: 0 additions & 89 deletions pkg/scheduler/objects/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1074,91 +1074,6 @@ func assertPlaceHolderResource(t *testing.T, appSummary *ApplicationSummary, mem
assert.Equal(t, vcoresSecconds, detailedResource["vcores"])
}

// TestResourceUsageAggregation checks that the resource usage reported in the
// application summary only grows when allocations are removed, and that the
// reported amount is the allocated resource multiplied by the seconds elapsed
// since the allocation's bind time.
func TestResourceUsageAggregation(t *testing.T) {
setupUGM()

app := newApplication(appID1, "default", "root.a")

// nothing allocated
if !resources.IsZero(app.GetAllocatedResource()) {
t.Error("new application has allocated resources")
}
// create an allocation and check the assignment
resMap := map[string]string{"memory": "100", "vcores": "10"}
res, err := resources.NewResourceFromConf(resMap)
assert.NilError(t, err, "failed to create resource with error")
alloc := newAllocation(appID1, "uuid-1", nodeID1, res)
alloc.SetInstanceType(instType1)
// Mock the bind time to be 3 seconds in the past
alloc.SetBindTime(time.Now().Add(-3 * time.Second))
app.AddAllocation(alloc)

if !resources.Equals(app.allocatedResource, res) {
t.Errorf("allocated resources is not updated correctly: %v", app.allocatedResource)
}
allocs := app.GetAllAllocations()
assert.Equal(t, len(allocs), 1)
assert.Assert(t, app.getPlaceholderTimer() == nil, "Placeholder timer should not be initialized as the allocation is not a placeholder")
assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 1))

err = app.HandleApplicationEvent(RunApplication)
assert.NilError(t, err, "no error expected new to accepted (completed test)")

// no allocation has been removed yet, so no usage is expected in the summary
appSummary := app.GetApplicationSummary("default")
appSummary.DoLogging()
assertResourceUsage(t, appSummary, 0, 0)

// add more allocations to test the removals
alloc = newAllocation(appID1, "uuid-2", nodeID1, res)
alloc.SetInstanceType(instType1)

// Mock the bind time to be 3 seconds in the past
alloc.SetBindTime(time.Now().Add(-3 * time.Second))
app.AddAllocation(alloc)
assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2))

// remove one of the 2
if alloc = app.RemoveAllocation("uuid-2", si.TerminationType_UNKNOWN_TERMINATION_TYPE); alloc == nil {
t.Error("returned allocations was nil allocation was not removed")
}
assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 1))

// uuid-2 was bound 3 seconds ago: memory 100 * 3s and vcores 10 * 3s
appSummary = app.GetApplicationSummary("default")
appSummary.DoLogging()
assertResourceUsage(t, appSummary, 300, 30)

// NOTE(review): uuid-3 gets no explicit bind time here; presumably it contributes
// (close to) zero seconds of usage when removed — verify against newAllocation
alloc = newAllocation(appID1, "uuid-3", nodeID1, res)
alloc.SetInstanceType(instType1)
app.AddAllocation(alloc)
allocs = app.GetAllAllocations()
assert.Equal(t, len(allocs), 2)
assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2))

// adding an allocation must not change the aggregated usage
appSummary = app.GetApplicationSummary("default")
appSummary.DoLogging()
assertResourceUsage(t, appSummary, 300, 30)

// try to remove a non existing alloc
if alloc = app.RemoveAllocation("does-not-exist", si.TerminationType_UNKNOWN_TERMINATION_TYPE); alloc != nil {
t.Errorf("returned allocations was not allocation was incorrectly removed: %v", alloc)
}

// remove all left over allocations
if allocs = app.RemoveAllAllocations(); allocs == nil || len(allocs) != 2 {
t.Errorf("returned number of allocations was incorrect: %v", allocs)
}
allocs = app.GetAllAllocations()
assert.Equal(t, len(allocs), 0)
assertUserGroupResource(t, getTestUserGroup(), nil)

err = app.HandleApplicationEvent(CompleteApplication)
assert.NilError(t, err, "no error expected accepted to completing (completed test)")

// the expected total doubles from the 300/30 checked above after the remaining allocations are removed
appSummary = app.GetApplicationSummary("default")
appSummary.DoLogging()
assertResourceUsage(t, appSummary, 600, 60)
}

func TestRejected(t *testing.T) {
terminatedTimeout = time.Millisecond * 100
defer func() {
Expand Down Expand Up @@ -1377,10 +1292,6 @@ func TestReplaceAllocationTracking(t *testing.T) {
app.RemoveAllocation("uuid-3", si.TerminationType_PLACEHOLDER_REPLACED)
assert.Equal(t, "uuid-3", alloc.uuid)
assert.Equal(t, false, app.HasPlaceholderAllocation())

// check placeholder resource usage
appSummary := app.GetApplicationSummary("default")
assertPlaceHolderResource(t, appSummary, 3000, 300)
}

func TestTimeoutPlaceholderSoftStyle(t *testing.T) {
Expand Down
Loading
Loading