diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go index 8bf2786e4..f4bde125d 100644 --- a/pkg/scheduler/partition.go +++ b/pkg/scheduler/partition.go @@ -685,6 +685,9 @@ func (pc *PartitionContext) removeNodeAllocations(node *objects.Node) ([]*object zap.String("nodeID", node.NodeID)) continue } + // Processing a removal while in the Completing state could race with the state change. + // Retrieve the queue early before a possible race. + queue := app.GetQueue() // check for an inflight replacement. if alloc.GetReleaseCount() != 0 { release := alloc.GetFirstRelease() @@ -706,7 +709,7 @@ func (pc *PartitionContext) removeNodeAllocations(node *objects.Node) ([]*object // The reverse case is handled during allocation. if delta.HasNegativeValue() { // this looks incorrect but the delta is negative and the result will be a real decrease - err := app.GetQueue().IncAllocatedResource(delta, false) + err := queue.IncAllocatedResource(delta, false) // this should not happen as we really decrease the value if err != nil { log.Log(log.SchedPartition).Warn("unexpected failure during queue update: replacing placeholder", @@ -761,16 +764,16 @@ func (pc *PartitionContext) removeNodeAllocations(node *objects.Node) ([]*object zap.String("nodeID", node.NodeID)) continue } - if err := app.GetQueue().DecAllocatedResource(alloc.GetAllocatedResource()); err != nil { + if err := queue.DecAllocatedResource(alloc.GetAllocatedResource()); err != nil { log.Log(log.SchedPartition).Warn("failed to release resources from queue", zap.String("appID", alloc.GetApplicationID()), zap.Error(err)) } else { - metrics.GetQueueMetrics(app.GetQueuePath()).IncReleasedContainer() + metrics.GetQueueMetrics(queue.GetQueuePath()).IncReleasedContainer() } // remove preempted resources if alloc.IsPreempted() { - app.GetQueue().DecPreemptingResource(alloc.GetAllocatedResource()) + queue.DecPreemptingResource(alloc.GetAllocatedResource()) } if alloc.IsPlaceholder() { pc.decPhAllocationCount(1) @@ -1251,6 +1254,12 @@ func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]* zap.Stringer("terminationType", release.TerminationType)) return nil, nil } + // Processing a removal while in the Completing state could race with the state change. + // The race occurs between removing the allocation and updating the queue after node processing. + // If the state change removes the queue link before we get to updating the queue after the node we + // leave the resources as allocated on the queue. The queue cannot be removed yet at this point as + // there are still allocations left. So retrieve the queue early to sidestep the race. + queue := app.GetQueue() // temp store for allocations manipulated released := make([]*objects.Allocation, 0) var confirmed *objects.Allocation @@ -1295,7 +1304,7 @@ func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]* for _, alloc := range released { node := pc.GetNode(alloc.GetNodeID()) if node == nil { - log.Log(log.SchedPartition).Info("node not found while releasing allocation", + log.Log(log.SchedPartition).Warn("node not found while releasing allocation", zap.String("appID", appID), zap.String("allocationId", alloc.GetUUID()), zap.String("nodeID", alloc.GetNodeID())) @@ -1342,7 +1351,6 @@ func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]* } } if resources.StrictlyGreaterThanZero(total) { - queue := app.GetQueue() if err := queue.DecAllocatedResource(total); err != nil { log.Log(log.SchedPartition).Warn("failed to release resources from queue", zap.String("appID", appID), @@ -1353,7 +1361,7 @@ func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]* } } if resources.StrictlyGreaterThanZero(totalPreempting) { - app.GetQueue().DecPreemptingResource(totalPreempting) + queue.DecPreemptingResource(totalPreempting) } // if confirmed is set we can assume there will just be one alloc in the released