Skip to content

Commit

Permalink
additional changes for consistent handling of tracking
Browse files Browse the repository at this point in the history
remove inconsistent tracking update on timeout
extended testing and checks in placeholder timeout test cases
  • Loading branch information
wilfred-s committed Oct 19, 2024
1 parent 566d53c commit fc3c487
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 30 deletions.
9 changes: 3 additions & 6 deletions pkg/scheduler/objects/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ func (sa *Application) clearPlaceholderTimer() {
}

// timeoutPlaceholderProcessing cleans up all placeholder asks and allocations that are not used after the timeout.
// If the application has started processing, Starting state or further, the application keeps on processing without
// If the application has started processing, Running state or further, the application keeps on processing without
// being able to use the placeholders.
// If the application is in New or Accepted state we clean up and take followup action based on the gang scheduling
// style.
Expand All @@ -422,17 +422,14 @@ func (sa *Application) timeoutPlaceholderProcessing() {
}
alloc.SetReleased(true)
toRelease = append(toRelease, alloc)
// mark as timeout out in the tracking data
if _, ok := sa.placeholderData[alloc.GetTaskGroup()]; ok {
sa.placeholderData[alloc.GetTaskGroup()].TimedOut++
}
}
log.Log(log.SchedApplication).Info("Placeholder timeout, releasing placeholders",
zap.String("AppID", sa.ApplicationID),
zap.Int("placeholders being replaced", replacing),
zap.Int("releasing placeholders", len(toRelease)))
// trigger the release of the placeholders: accounting updates when the release is done
sa.notifyRMAllocationReleased(toRelease, si.TerminationType_TIMEOUT, "releasing allocated placeholders on placeholder timeout")
// Case 2: in every other case fail the application, and notify the context about the expired placeholder asks
// Case 2: in every other case progress the application, and notify the context about the expired placeholder asks
default:
log.Log(log.SchedApplication).Info("Placeholder timeout, releasing asks and placeholders",
zap.String("AppID", sa.ApplicationID),
Expand Down
81 changes: 57 additions & 24 deletions pkg/scheduler/objects/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1513,11 +1513,11 @@ func TestReplaceAllocationTracking(t *testing.T) {
assertPlaceHolderResource(t, appSummary, 3000, 300)
}

func TestTimeoutPlaceholderSoftStyle(t *testing.T) {
func TestTimeoutPlaceholderSoft(t *testing.T) {
runTimeoutPlaceholderTest(t, Resuming.String(), Soft, []int{1, 2})
}

func TestTimeoutPlaceholderAllocAsk(t *testing.T) {
func TestTimeoutPlaceholderHard(t *testing.T) {
runTimeoutPlaceholderTest(t, Failing.String(), Hard, []int{1, 2})
}

Expand Down Expand Up @@ -1545,28 +1545,33 @@ func runTimeoutPlaceholderTest(t *testing.T, expectedState string, gangSchedulin
phAsk := newAllocationAskTG("ask-1", appID1, tg1, res)
err = app.AddAllocationAsk(phAsk)
assert.NilError(t, err, "Application ask should have been added")
assert.Assert(t, app.IsAccepted(), "Application should be in accepted state")

// check PlaceHolderData
assertPlaceholderData(t, app, tg1, 1, 0, 0, res)
assert.Assert(t, app.IsAccepted(), "Application should be in accepted state")

// add the placeholder to the app
ph := newPlaceholderAlloc(appID1, nodeID1, res, "tg")
tg2 := "tg-2"
ph := newPlaceholderAlloc(appID1, nodeID1, res, tg2)
app.AddAllocation(ph)
app.addPlaceholderDataWithLocking(ph)
assertPlaceholderData(t, app, tg2, 1, 0, 0, res)
assertUserGroupResource(t, getTestUserGroup(), res)
assert.Assert(t, app.getPlaceholderTimer() != nil, "Placeholder timer should be initiated after the first placeholder allocation")
// add a second one to check the filter
ph = newPlaceholderAlloc(appID1, nodeID1, res, "tg")
ph = newPlaceholderAlloc(appID1, nodeID1, res, tg2)
app.AddAllocation(ph)
app.addPlaceholderDataWithLocking(ph)
assertPlaceholderData(t, app, tg2, 2, 0, 0, res)
assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2))
assert.Assert(t, app.IsAccepted(), "Application should be in accepted state")
err = common.WaitForCondition(10*time.Millisecond, 1*time.Second, func() bool {
app.RLock()
defer app.RUnlock()
return app.placeholderTimer == nil
})
// When the app is in an accepted state, timeout should equal to count
assertPlaceholderData(t, app, tg1, 1, 1, 0, res)
assert.NilError(t, err, "Placeholder timeout cleanup did not trigger unexpectedly")
// When the app was in the accepted state, timeout should equal to count
assertPlaceholderData(t, app, tg1, 1, 1, 0, res)
assertPlaceholderData(t, app, tg2, 2, 2, 0, res)
assert.Equal(t, app.stateMachine.Current(), expectedState, "Application did not progress into expected state")
assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2))
events := testHandler.GetEvents()
Expand All @@ -1579,10 +1584,8 @@ func runTimeoutPlaceholderTest(t *testing.T, expectedState string, gangSchedulin
idx++
}
}
// Because the Count of PlaceHolderData is only added in AddAllocationAsk, so it stays 1
assertPlaceholderData(t, app, tg1, 1, 1, 0, res)

assert.Equal(t, found, 2, "release allocation or ask event not found in list")

// asks are completely cleaned up
assert.Assert(t, resources.IsZero(app.GetPendingResource()), "pending placeholder resources should be zero")
// a released placeholder still holds the resource until release confirmed by the RM
Expand Down Expand Up @@ -1610,19 +1613,19 @@ func TestTimeoutPlaceholderAllocReleased(t *testing.T) {
res, err := resources.NewResourceFromConf(resMap)
assert.NilError(t, err, "Unexpected error when creating resource from map")
// add the placeholders to the app: one released, one still available.
ph := newPlaceholderAlloc(appID1, nodeID1, res, "tg")
ph.SetReleased(true)
app.AddAllocation(ph)
// add PlaceholderData
app.addPlaceholderDataWithLocking(ph)
assertPlaceholderData(t, app, "tg", 1, 0, 0, res)
tg := "tg-1"
phReleased := newPlaceholderAlloc(appID1, nodeID1, res, tg)
phReleased.SetReleased(true)
app.AddAllocation(phReleased)
app.addPlaceholderDataWithLocking(phReleased)
assertPlaceholderData(t, app, tg, 1, 0, 0, res)
assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 1))

assert.Assert(t, app.getPlaceholderTimer() != nil, "Placeholder timer should be initiated after the first placeholder allocation")
ph = newPlaceholderAlloc(appID1, nodeID1, res, "tg")
ph := newPlaceholderAlloc(appID1, nodeID1, res, tg)
app.AddAllocation(ph)
app.addPlaceholderDataWithLocking(ph)
assertPlaceholderData(t, app, "tg", 2, 0, 0, res)
assertPlaceholderData(t, app, tg, 2, 0, 0, res)
assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2))

alloc := newAllocation(appID1, nodeID1, res)
Expand All @@ -1648,8 +1651,24 @@ func TestTimeoutPlaceholderAllocReleased(t *testing.T) {
assert.Assert(t, resources.Equals(app.GetAllocatedResource(), res), "Unexpected allocated resources for the app")
// a released placeholder still holds the resource until release confirmed by the RM
assert.Assert(t, resources.Equals(app.GetPlaceholderResource(), resources.Multiply(res, 2)), "Unexpected placeholder resources for the app")
assertPlaceholderData(t, app, "tg", 2, 1, 0, res)
// tracking data not updated until confirmed by the RM
assertPlaceholderData(t, app, tg, 2, 0, 0, res)
assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 3))
// do what the RM does and respond to the release
removed := app.RemoveAllocation(ph.allocationKey, si.TerminationType_TIMEOUT)
assert.Assert(t, removed != nil, "expected allocation got nil")
assert.Equal(t, ph.allocationKey, removed.allocationKey, "expected placeholder to be returned")
assertPlaceholderData(t, app, tg, 2, 1, 0, res)
assert.Assert(t, resources.Equals(app.GetPlaceholderResource(), res), "placeholder resources still accounted for on the app")
assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2))

// process the replacement no real alloc linked account for that
removed = app.ReplaceAllocation(phReleased.allocationKey)
assert.Assert(t, removed != nil, "expected allocation got nil")
assert.Equal(t, phReleased.allocationKey, removed.allocationKey, "expected placeholder to be returned")
assertPlaceholderData(t, app, tg, 2, 1, 1, res)
assert.Assert(t, resources.IsZero(app.GetPlaceholderResource()), "placeholder resources still accounted for on the app")
assertUserGroupResource(t, getTestUserGroup(), res)
}

func TestTimeoutPlaceholderCompleting(t *testing.T) {
Expand All @@ -1664,9 +1683,12 @@ func TestTimeoutPlaceholderCompleting(t *testing.T) {
res, err := resources.NewResourceFromConf(resMap)
assert.NilError(t, err, "Unexpected error when creating resource from map")
// add the placeholder to the app
ph := newPlaceholderAlloc(appID1, nodeID1, res, "tg")
tg := "tg-1"
ph := newPlaceholderAlloc(appID1, nodeID1, res, tg)
app.AddAllocation(ph)
assert.Assert(t, app.getPlaceholderTimer() != nil, "Placeholder timer should be initiated after the first placeholder allocation")
app.addPlaceholderDataWithLocking(ph)
assertPlaceholderData(t, app, tg, 1, 0, 0, res)
assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 1))
// add a real allocation as well
alloc := newAllocation(appID1, nodeID1, res)
Expand All @@ -1692,8 +1714,15 @@ func TestTimeoutPlaceholderCompleting(t *testing.T) {
}
}
assert.Assert(t, found, "release allocation event not found in list")
assert.Assert(t, app.IsCompleting(), "App should still be in completing state")
assert.Assert(t, app.IsCompleting(), "App should be in completing state")
assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 1))
// tracking data not updated until confirmed by the RM
assertPlaceholderData(t, app, tg, 1, 0, 0, res)
// do what the RM does and respond to the release
removed := app.RemoveAllocation(ph.allocationKey, si.TerminationType_TIMEOUT)
assert.Assert(t, removed != nil, "expected allocation got nil")
assert.Equal(t, ph.allocationKey, removed.allocationKey, "expected placeholder to be returned")
assertPlaceholderData(t, app, tg, 1, 1, 0, res)
}

func TestAppTimersAfterAppRemoval(t *testing.T) {
Expand All @@ -1708,9 +1737,12 @@ func TestAppTimersAfterAppRemoval(t *testing.T) {
res, err := resources.NewResourceFromConf(resMap)
assert.NilError(t, err, "Unexpected error when creating resource from map")
// add the placeholder to the app
ph := newPlaceholderAlloc(appID1, nodeID1, res, "tg")
tg := "tg-1"
ph := newPlaceholderAlloc(appID1, nodeID1, res, tg)
app.AddAllocation(ph)
assert.Assert(t, app.getPlaceholderTimer() != nil, "Placeholder timer should be initiated after the first placeholder allocation")
app.addPlaceholderDataWithLocking(ph)
assertPlaceholderData(t, app, tg, 1, 0, 0, res)
assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 1))
// add a real allocation as well
alloc := newAllocation(appID1, nodeID1, res)
Expand All @@ -1727,6 +1759,7 @@ func TestAppTimersAfterAppRemoval(t *testing.T) {
if app.stateTimer != nil {
t.Fatalf("State timer has not be cleared after app removal as expected, %v", app.stateTimer)
}
assertPlaceholderData(t, app, tg, 1, 1, 0, res)
}

func TestIncAndDecUserResourceUsage(t *testing.T) {
Expand Down

0 comments on commit fc3c487

Please sign in to comment.