Skip to content

Commit

Permalink
[YUNIKORN-1993] race condition in allocation removal (apache#662)
Browse files Browse the repository at this point in the history
Race between allocation removal and the timed transition to Completed
state change in the partition code.
If the removal of an allocation is in progress when the state transition
causes the queue removal (entry of the Completed state) the queue will
not be updated and the allocation will leak.

No test case as the timing required is not reproducible in unit or e2e
tests.

Closes: apache#662

Signed-off-by: Wilfred Spiegelenburg <[email protected]>
  • Loading branch information
wilfred-s committed Sep 22, 2023
1 parent 418714d commit 57544a6
Showing 1 changed file with 15 additions and 7 deletions.
22 changes: 15 additions & 7 deletions pkg/scheduler/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,9 @@ func (pc *PartitionContext) removeNodeAllocations(node *objects.Node) ([]*object
zap.String("nodeID", node.NodeID))
continue
}
// Processing a removal while in the Completing state could race with the state change.
// Retrieve the queue early before a possible race.
queue := app.GetQueue()
// check for an inflight replacement.
if alloc.GetReleaseCount() != 0 {
release := alloc.GetFirstRelease()
Expand All @@ -706,7 +709,7 @@ func (pc *PartitionContext) removeNodeAllocations(node *objects.Node) ([]*object
// The reverse case is handled during allocation.
if delta.HasNegativeValue() {
// this looks incorrect but the delta is negative and the result will be a real decrease
err := app.GetQueue().IncAllocatedResource(delta, false)
err := queue.IncAllocatedResource(delta, false)
// this should not happen as we really decrease the value
if err != nil {
log.Log(log.SchedPartition).Warn("unexpected failure during queue update: replacing placeholder",
Expand Down Expand Up @@ -761,16 +764,16 @@ func (pc *PartitionContext) removeNodeAllocations(node *objects.Node) ([]*object
zap.String("nodeID", node.NodeID))
continue
}
if err := app.GetQueue().DecAllocatedResource(alloc.GetAllocatedResource()); err != nil {
if err := queue.DecAllocatedResource(alloc.GetAllocatedResource()); err != nil {
log.Log(log.SchedPartition).Warn("failed to release resources from queue",
zap.String("appID", alloc.GetApplicationID()),
zap.Error(err))
} else {
metrics.GetQueueMetrics(app.GetQueuePath()).IncReleasedContainer()
metrics.GetQueueMetrics(queue.GetQueuePath()).IncReleasedContainer()
}
// remove preempted resources
if alloc.IsPreempted() {
app.GetQueue().DecPreemptingResource(alloc.GetAllocatedResource())
queue.DecPreemptingResource(alloc.GetAllocatedResource())
}
if alloc.IsPlaceholder() {
pc.decPhAllocationCount(1)
Expand Down Expand Up @@ -1251,6 +1254,12 @@ func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]*
zap.Stringer("terminationType", release.TerminationType))
return nil, nil
}
// Processing a removal while in the Completing state could race with the state change.
// The race occurs between removing the allocation and updating the queue after node processing.
// If the state change removes the queue link before we get to updating the queue after the node we
// leave the resources as allocated on the queue. The queue cannot be removed yet at this point as
// there are still allocations left. So retrieve the queue early to sidestep the race.
queue := app.GetQueue()
// temp store for allocations manipulated
released := make([]*objects.Allocation, 0)
var confirmed *objects.Allocation
Expand Down Expand Up @@ -1295,7 +1304,7 @@ func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]*
for _, alloc := range released {
node := pc.GetNode(alloc.GetNodeID())
if node == nil {
log.Log(log.SchedPartition).Info("node not found while releasing allocation",
log.Log(log.SchedPartition).Warn("node not found while releasing allocation",
zap.String("appID", appID),
zap.String("allocationId", alloc.GetUUID()),
zap.String("nodeID", alloc.GetNodeID()))
Expand Down Expand Up @@ -1342,7 +1351,6 @@ func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]*
}
}
if resources.StrictlyGreaterThanZero(total) {
queue := app.GetQueue()
if err := queue.DecAllocatedResource(total); err != nil {
log.Log(log.SchedPartition).Warn("failed to release resources from queue",
zap.String("appID", appID),
Expand All @@ -1353,7 +1361,7 @@ func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]*
}
}
if resources.StrictlyGreaterThanZero(totalPreempting) {
app.GetQueue().DecPreemptingResource(totalPreempting)
queue.DecPreemptingResource(totalPreempting)
}

// if confirmed is set we can assume there will just be one alloc in the released
Expand Down

0 comments on commit 57544a6

Please sign in to comment.