cache assigned pod count (#708)
Signed-off-by: KunWuLuan <[email protected]>
KunWuLuan authored Jan 13, 2025
1 parent f633dd2 commit 1e1b6b0
Showing 5 changed files with 242 additions and 41 deletions.
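
Before this change, the coscheduling plugin rescanned the node snapshot on every Permit and PostFilter call (CalculateAssignedPods) to count the pods of a pod group that were already assumed or bound. This commit replaces that scan with an in-memory cache, assignedPodsByPG, keyed by pod group and kept up to date by pod informer add/delete handlers, so the count becomes a constant-time map lookup. The following is a minimal, self-contained sketch of that caching pattern; AssignedCache and its Add, Delete, and Count methods are illustrative names, not the plugin's API (the real implementation lives in PodGroupManager and uses sets.Set[string], as shown in the diff below).

// Minimal sketch of the caching pattern introduced by this commit.
// AssignedCache and its methods are illustrative, not the plugin's actual API.
package main

import (
	"fmt"
	"sync"
)

// AssignedCache tracks assigned (assumed or bound) pod names per pod group.
type AssignedCache struct {
	mu   sync.RWMutex
	byPG map[string]map[string]struct{} // pod group key -> set of pod names
}

func NewAssignedCache() *AssignedCache {
	return &AssignedCache{byPG: map[string]map[string]struct{}{}}
}

// Add records an assigned pod; typically called from an informer AddFunc.
func (c *AssignedCache) Add(pg, pod string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.byPG[pg] == nil {
		c.byPG[pg] = map[string]struct{}{}
	}
	c.byPG[pg][pod] = struct{}{}
}

// Delete removes a pod and drops the pod group entry once it is empty;
// typically called from an informer DeleteFunc or an Unreserve hook.
func (c *AssignedCache) Delete(pg, pod string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if set, ok := c.byPG[pg]; ok {
		delete(set, pod)
		if len(set) == 0 {
			delete(c.byPG, pg)
		}
	}
}

// Count returns the cached number of assigned pods for a pod group in O(1).
func (c *AssignedCache) Count(pg string) int {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return len(c.byPG[pg])
}

func main() {
	cache := NewAssignedCache()
	cache.Add("default/pg1", "pod-a")
	cache.Add("default/pg1", "pod-b")
	cache.Delete("default/pg1", "pod-a")
	fmt.Println(cache.Count("default/pg1")) // prints 1
}

The trade-off is a small amount of bookkeeping in the event handlers (and in Permit and Unreserve) in exchange for not walking every pod on every node each scheduling cycle.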
117 changes: 91 additions & 26 deletions pkg/coscheduling/core/core.go
@@ -28,8 +28,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
informerv1 "k8s.io/client-go/informers/core/v1"
listerv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -64,10 +66,11 @@ func (s *PermitState) Clone() framework.StateData {
type Manager interface {
PreFilter(context.Context, *corev1.Pod) error
Permit(context.Context, *framework.CycleState, *corev1.Pod) Status
Unreserve(context.Context, *corev1.Pod)
GetPodGroup(context.Context, *corev1.Pod) (string, *v1alpha1.PodGroup)
GetAssignedPodCount(string) int
GetCreationTimestamp(context.Context, *corev1.Pod, time.Time) time.Time
DeletePermittedPodGroup(context.Context, string)
CalculateAssignedPods(context.Context, string, string) int
ActivateSiblings(ctx context.Context, pod *corev1.Pod, state *framework.CycleState)
BackoffPodGroup(string, time.Duration)
}
@@ -87,9 +90,34 @@ type PodGroupManager struct {
backedOffPG *gocache.Cache
// podLister is pod lister
podLister listerv1.PodLister
// assignedPodsByPG stores the names of pods that are assumed or bound, keyed by pod group
assignedPodsByPG map[string]sets.Set[string]
sync.RWMutex
}

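// AddPodFactory returns an informer AddFunc that records pods which already have a node assigned in assignedPodsByPG.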
func AddPodFactory(pgMgr *PodGroupManager) func(obj interface{}) {
return func(obj interface{}) {
p, ok := obj.(*corev1.Pod)
if !ok {
return
}
if p.Spec.NodeName == "" {
return
}
pgFullName, _ := pgMgr.GetPodGroup(context.Background(), p)
if pgFullName == "" {
return
}
pgMgr.RWMutex.Lock()
defer pgMgr.RWMutex.Unlock()
if assigned, exist := pgMgr.assignedPodsByPG[pgFullName]; exist {
assigned.Insert(p.Name)
} else {
pgMgr.assignedPodsByPG[pgFullName] = sets.New(p.Name)
}
}
}

// NewPodGroupManager creates a new operation object.
func NewPodGroupManager(client client.Client, snapshotSharedLister framework.SharedLister, scheduleTimeout *time.Duration, podInformer informerv1.PodInformer) *PodGroupManager {
pgMgr := &PodGroupManager{
@@ -99,10 +127,43 @@ func NewPodGroupManager(client client.Client, snapshotSharedLister framework.Sha
podLister: podInformer.Lister(),
permittedPG: gocache.New(3*time.Second, 3*time.Second),
backedOffPG: gocache.New(10*time.Second, 10*time.Second),
assignedPodsByPG: map[string]sets.Set[string]{},
}
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: AddPodFactory(pgMgr),
DeleteFunc: func(obj interface{}) {
switch t := obj.(type) {
case *corev1.Pod:
pod := t
if pod.Spec.NodeName == "" {
return
}
pgMgr.Unreserve(context.Background(), pod)
return
case cache.DeletedFinalStateUnknown:
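// a DeletedFinalStateUnknown tombstone means the watch missed the delete event; try to recover the pod from it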
pod, ok := t.Obj.(*corev1.Pod)
if !ok {
return
}
if pod.Spec.NodeName == "" {
return
}
pgMgr.Unreserve(context.Background(), pod)
return
default:
return
}
},
})
return pgMgr
}

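// GetAssignedPodCount returns the number of pods in the given pod group that are assumed or bound.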
func (pgMgr *PodGroupManager) GetAssignedPodCount(pgName string) int {
pgMgr.RWMutex.RLock()
defer pgMgr.RWMutex.RUnlock()
return len(pgMgr.assignedPodsByPG[pgName])
}

func (pgMgr *PodGroupManager) BackoffPodGroup(pgName string, backoff time.Duration) {
if backoff == time.Duration(0) {
return
@@ -222,16 +283,23 @@ func (pgMgr *PodGroupManager) Permit(ctx context.Context, state *framework.Cycle
return PodGroupNotFound
}

assigned := pgMgr.CalculateAssignedPods(ctx, pg.Name, pg.Namespace)
pgMgr.RWMutex.Lock() // the map and set may be mutated below, so take the write lock
defer pgMgr.RWMutex.Unlock()
assigned, exist := pgMgr.assignedPodsByPG[pgFullName]
if !exist {
assigned = sets.Set[string]{}
pgMgr.assignedPodsByPG[pgFullName] = assigned
}
assigned.Insert(pod.Name)
// The number of pods that have been assigned nodes is taken from the assignedPodsByPG cache.
// The current pod was inserted into the set above, so it is included in the count.
if int32(assigned)+1 >= pg.Spec.MinMember {
if len(assigned) >= int(pg.Spec.MinMember) {
return Success
}

if assigned == 0 {
if len(assigned) == 1 {
// Given we've reached Permit(), it means all PreFilter checks (minMember & minResource)
// already pass through, so if assigned == 0, it could be due to:
// already pass through, so if len(assigned) == 1, it could be due to:
// - minResource gets satisfied
// - new pods added
// In either case, we should and only should use this 0-th pod to trigger activating
@@ -244,6 +312,24 @@ func (pgMgr *PodGroupManager) Permit(ctx context.Context, state *framework.Cycle
return Wait
}

// Unreserve removes the pod from assignedPodsByPG when scheduling or binding fails.
func (pgMgr *PodGroupManager) Unreserve(ctx context.Context, pod *corev1.Pod) {
pgFullName, _ := pgMgr.GetPodGroup(ctx, pod)
if pgFullName == "" {
return
}

pgMgr.RWMutex.Lock()
defer pgMgr.RWMutex.Unlock()
assigned, exist := pgMgr.assignedPodsByPG[pgFullName]
if exist {
assigned.Delete(pod.Name)
if len(assigned) == 0 {
delete(pgMgr.assignedPodsByPG, pgFullName)
}
}
}

// GetCreationTimestamp returns the creation time of a podGroup or a pod.
func (pgMgr *PodGroupManager) GetCreationTimestamp(ctx context.Context, pod *corev1.Pod, ts time.Time) time.Time {
pgName := util.GetPodGroupLabel(pod)
@@ -275,27 +361,6 @@ func (pgMgr *PodGroupManager) GetPodGroup(ctx context.Context, pod *corev1.Pod)
return fmt.Sprintf("%v/%v", pod.Namespace, pgName), &pg
}

// CalculateAssignedPods returns the number of pods that has been assigned nodes: assumed or bound.
func (pgMgr *PodGroupManager) CalculateAssignedPods(ctx context.Context, podGroupName, namespace string) int {
lh := klog.FromContext(ctx)
nodeInfos, err := pgMgr.snapshotSharedLister.NodeInfos().List()
if err != nil {
lh.Error(err, "Cannot get nodeInfos from frameworkHandle")
return 0
}
var count int
for _, nodeInfo := range nodeInfos {
for _, podInfo := range nodeInfo.Pods {
pod := podInfo.Pod
if util.GetPodGroupLabel(pod) == podGroupName && pod.Namespace == namespace && pod.Spec.NodeName != "" {
count++
}
}
}

return count
}

// CheckClusterResource checks if resource capacity of the cluster can satisfy <resourceRequest>.
// It returns an error detailing the resource gap if not satisfied; otherwise returns nil.
func CheckClusterResource(ctx context.Context, nodeList []*framework.NodeInfo, resourceRequest corev1.ResourceList, desiredPodGroupName string) error {
12 changes: 6 additions & 6 deletions pkg/coscheduling/core/core_test.go
@@ -25,6 +25,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake"
clicache "k8s.io/client-go/tools/cache"
@@ -170,6 +171,7 @@ func TestPreFilter(t *testing.T) {
scheduleTimeout: &scheduleTimeout,
permittedPG: newCache(),
backedOffPG: newCache(),
assignedPodsByPG: make(map[string]sets.Set[string]),
}

informerFactory.Start(ctx.Done())
@@ -264,19 +266,17 @@ func TestPermit(t *testing.T) {
informerFactory := informers.NewSharedInformerFactory(cs, 0)
podInformer := informerFactory.Core().V1().Pods()

pgMgr := &PodGroupManager{
client: client,
snapshotSharedLister: tu.NewFakeSharedLister(tt.existingPods, nodes),
podLister: podInformer.Lister(),
scheduleTimeout: &scheduleTimeout,
}
pgMgr := NewPodGroupManager(client, tu.NewFakeSharedLister(tt.existingPods, nodes), &scheduleTimeout, podInformer)

informerFactory.Start(ctx.Done())
if !clicache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced) {
t.Fatal("WaitForCacheSync failed")
}
addFunc := AddPodFactory(pgMgr)
for _, p := range tt.existingPods {
podInformer.Informer().GetStore().Add(p)
// we call addFunc here because we cannot ensure existing pods are added before Permit is called
addFunc(p)
}

if got := pgMgr.Permit(ctx, &framework.CycleState{}, tt.pod); got != tt.want {
3 changes: 2 additions & 1 deletion pkg/coscheduling/coscheduling.go
@@ -166,7 +166,7 @@ func (cs *Coscheduling) PostFilter(ctx context.Context, state *framework.CycleSt

// This indicates there are already enough Pods satisfying the PodGroup,
// so don't bother to reject the whole PodGroup.
assigned := cs.pgMgr.CalculateAssignedPods(ctx, pg.Name, pod.Namespace)
assigned := cs.pgMgr.GetAssignedPodCount(pgName)
if assigned >= int(pg.Spec.MinMember) {
lh.V(4).Info("Assigned pods", "podGroup", klog.KObj(pg), "assigned", assigned)
return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable)
Expand Down Expand Up @@ -256,6 +256,7 @@ func (cs *Coscheduling) Unreserve(ctx context.Context, state *framework.CycleSta
if pg == nil {
return
}
cs.pgMgr.Unreserve(ctx, pod)
cs.frameworkHandler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) {
if waitingPod.GetPod().Namespace == pod.Namespace && util.GetPodGroupLabel(waitingPod.GetPod()) == pg.Name {
lh.V(3).Info("Unreserve rejects", "pod", klog.KObj(waitingPod.GetPod()), "podGroup", klog.KObj(pg))
19 changes: 11 additions & 8 deletions pkg/coscheduling/coscheduling_test.go
@@ -622,24 +622,27 @@ func TestPostFilter(t *testing.T) {
cs := clientsetfake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(cs, 0)
podInformer := informerFactory.Core().V1().Pods()

pgMgr := core.NewPodGroupManager(
client,
tu.NewFakeSharedLister(tt.existingPods, nodes),
&scheduleTimeout,
podInformer,
)
pl := &Coscheduling{
frameworkHandler: f,
pgMgr: core.NewPodGroupManager(
client,
tu.NewFakeSharedLister(tt.existingPods, nodes),
&scheduleTimeout,
podInformer,
),
scheduleTimeout: &scheduleTimeout,
pgMgr: pgMgr,
scheduleTimeout: &scheduleTimeout,
}

informerFactory.Start(ctx.Done())
if !clicache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced) {
t.Fatal("WaitForCacheSync failed")
}
addFunc := core.AddPodFactory(pgMgr)
for _, p := range tt.existingPods {
podInformer.Informer().GetStore().Add(p)
// we call addFunc here because we cannot ensure existing pods are added before PostFilter is called
addFunc(p)
}

_, got := pl.PostFilter(ctx, framework.NewCycleState(), tt.pod, nodeStatusMap)