Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Klues <[email protected]>
  • Loading branch information
klueska committed Jan 11, 2025
1 parent 2af5968 commit fb3e6cb
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 12 deletions.
95 changes: 83 additions & 12 deletions cmd/nvidia-dra-controller/mnenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@ import (
)

const (
resourceClaimFinalizer = "gpu.nvidia.com/finalizer.multiNodeEnvironment"
imexDeviceClass = "imex.nvidia.com"
multiNodeEnvironmentFinalizer = "gpu.nvidia.com/finalizer.multiNodeEnvironment"
imexDeviceClass = "imex.nvidia.com"

MultiNodeEnvironmentAddEvent = "onMultiNodeEnvironmentAddEvent"
MultiNodeEnvironmentDeleteEvent = "onMultiNodeEnvironmentDeleteEvent"
ResourceClaimAddEvent = "ResourceClaimAddEvent"
DeviceClassAddEvent = "DeviceClassAddEvent"
)

type WorkItem struct {
Expand All @@ -58,24 +59,30 @@ type MultiNodeEnvironmentManager struct {

multiNodeEnvironmentLister nvlisters.MultiNodeEnvironmentLister
resourceClaimLister resourcelisters.ResourceClaimLister
deviceClassLister resourcelisters.DeviceClassLister
}

// StartMultiNodeEnvironmentManager starts a MultiNodeEnvironmentManager.
func StartMultiNodeEnvironmentManager(ctx context.Context, config *Config) (*MultiNodeEnvironmentManager, error) {
queue := workqueue.New(workqueue.DefaultControllerRateLimiter())

mneInformerFactory := nvinformers.NewSharedInformerFactory(config.clientsets.Nvidia, 30*time.Second)
mneInformer := mneInformerFactory.Gpu().V1alpha1().MultiNodeEnvironments().Informer()
nvInformerFactory := nvinformers.NewSharedInformerFactory(config.clientsets.Nvidia, 30*time.Second)
coreInformerFactory := informers.NewSharedInformerFactory(config.clientsets.Core, 30*time.Second)

mneInformer := nvInformerFactory.Gpu().V1alpha1().MultiNodeEnvironments().Informer()
mneLister := nvlisters.NewMultiNodeEnvironmentLister(mneInformer.GetIndexer())

rcInformerFactory := informers.NewSharedInformerFactory(config.clientsets.Core, 30*time.Second)
rcInformer := rcInformerFactory.Resource().V1beta1().ResourceClaims().Informer()
rcInformer := coreInformerFactory.Resource().V1beta1().ResourceClaims().Informer()
rcLister := resourcelisters.NewResourceClaimLister(rcInformer.GetIndexer())

dcInformer := coreInformerFactory.Resource().V1beta1().DeviceClasses().Informer()
dcLister := resourcelisters.NewDeviceClassLister(dcInformer.GetIndexer())

m := &MultiNodeEnvironmentManager{
clientsets: config.clientsets,
multiNodeEnvironmentLister: mneLister,
resourceClaimLister: rcLister,
deviceClassLister: dcLister,
}

var err error
Expand All @@ -94,24 +101,31 @@ func StartMultiNodeEnvironmentManager(ctx context.Context, config *Config) (*Mul
return nil, fmt.Errorf("error adding event handlers for MultiNodeEnvironment informer: %w", err)
}

_, err = dcInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) { queue.Enqueue(obj, m.onDeviceClassAdd) },
})
if err != nil {
return nil, fmt.Errorf("error adding event handlers for MultiNodeEnvironment informer: %w", err)
}

m.waitGroup.Add(3)
go func() {
defer m.waitGroup.Done()
rcInformerFactory.Start(ctx.Done())
nvInformerFactory.Start(ctx.Done())
}()
go func() {
defer m.waitGroup.Done()
mneInformerFactory.Start(ctx.Done())
coreInformerFactory.Start(ctx.Done())
}()
go func() {
defer m.waitGroup.Done()
queue.Run(ctx.Done())
}()

if !cache.WaitForCacheSync(ctx.Done(), mneInformer.HasSynced, rcInformer.HasSynced) {
if !cache.WaitForCacheSync(ctx.Done(), mneInformer.HasSynced, rcInformer.HasSynced, dcInformer.HasSynced) {
klog.Warning("Cache sync failed; retrying in 5 seconds")
time.Sleep(5 * time.Second)
if !cache.WaitForCacheSync(ctx.Done(), mneInformer.HasSynced, rcInformer.HasSynced) {
if !cache.WaitForCacheSync(ctx.Done(), mneInformer.HasSynced, rcInformer.HasSynced, dcInformer.HasSynced) {
return nil, fmt.Errorf("informer cache sync failed twice")
}
}
Expand Down Expand Up @@ -164,7 +178,7 @@ func (m *MultiNodeEnvironmentManager) onMultiNodeEnvironmentAdd(obj any) error {
Name: mne.Spec.ResourceClaimName,
Namespace: mne.Namespace,
OwnerReferences: []metav1.OwnerReference{ownerReference},
Finalizers: []string{resourceClaimFinalizer},
Finalizers: []string{multiNodeEnvironmentFinalizer},
},
Spec: resourceapi.ResourceClaimSpec{
Devices: resourceapi.DeviceClaim{
Expand Down Expand Up @@ -229,6 +243,37 @@ func (m *MultiNodeEnvironmentManager) onResourceClaimAdd(obj any) error {
return nil
}

// onDeviceClassAdd handles an add event for a DeviceClass.
//
// Only DeviceClasses with exactly one owner reference whose kind is
// nvapi.MultiNodeEnvironmentKind are considered; all others are ignored. If
// the owning MultiNodeEnvironment cannot be found through the lister, the
// finalizer on the DeviceClass is removed via removeDeviceClassFinalizer.
//
// It returns an error when obj is not a *resourceapi.DeviceClass, when the
// owner lookup fails with anything other than a not-found error, or when
// removing the finalizer fails.
//
// NOTE(review): the DeviceClass lister and client calls elsewhere in this file
// take no namespace, so dc.Namespace is presumably empty here — confirm that
// the MultiNodeEnvironment lookup below targets the intended namespace.
func (m *MultiNodeEnvironmentManager) onDeviceClassAdd(obj any) error {
	deviceClass, isDeviceClass := obj.(*resourceapi.DeviceClass)
	if !isDeviceClass {
		return fmt.Errorf("failed to cast to DeviceClass")
	}

	klog.Infof("Processing added DeviceClass: %s/%s", deviceClass.Namespace, deviceClass.Name)

	// Ignore anything that is not owned solely by a MultiNodeEnvironment.
	owners := deviceClass.OwnerReferences
	if len(owners) != 1 || owners[0].Kind != nvapi.MultiNodeEnvironmentKind {
		return nil
	}
	ownerName := owners[0].Name

	_, lookupErr := m.multiNodeEnvironmentLister.MultiNodeEnvironments(deviceClass.Namespace).Get(ownerName)
	switch {
	case lookupErr == nil:
		// The owner is still present, so the finalizer stays in place.
		return nil
	case !errors.IsNotFound(lookupErr):
		return fmt.Errorf("error retrieving DeviceClass's OwnerReference '%s': %w", ownerName, lookupErr)
	}

	// The owner was not found: release the DeviceClass by dropping the finalizer.
	if removeErr := m.removeDeviceClassFinalizer(deviceClass.Name); removeErr != nil {
		return fmt.Errorf("error removing finalizer on DeviceClass '%s': %w", deviceClass.Name, removeErr)
	}

	return nil
}

func (m *MultiNodeEnvironmentManager) removeResourceClaimFinalizer(namespace, name string) error {
rc, err := m.resourceClaimLister.ResourceClaims(namespace).Get(name)
if err != nil && errors.IsNotFound(err) {
Expand All @@ -242,7 +287,7 @@ func (m *MultiNodeEnvironmentManager) removeResourceClaimFinalizer(namespace, na

newRC.Finalizers = []string{}
for _, f := range rc.Finalizers {
if f != resourceClaimFinalizer {
if f != multiNodeEnvironmentFinalizer {
newRC.Finalizers = append(newRC.Finalizers, f)
}
}
Expand All @@ -254,3 +299,29 @@ func (m *MultiNodeEnvironmentManager) removeResourceClaimFinalizer(namespace, na

return nil
}

// removeDeviceClassFinalizer strips multiNodeEnvironmentFinalizer from the
// DeviceClass with the given name and writes the result back through the API.
//
// The DeviceClass is read from the lister. If the finalizer is not present on
// it, nothing is written and nil is returned. All other finalizers are kept in
// their original order.
//
// It returns an error when the DeviceClass is not found in the lister, when
// the lookup fails for any other reason, or when the Update call fails.
func (m *MultiNodeEnvironmentManager) removeDeviceClassFinalizer(name string) error {
	dc, err := m.deviceClassLister.Get(name)
	if err != nil && errors.IsNotFound(err) {
		return fmt.Errorf("DeviceClass not found")
	}
	if err != nil {
		return fmt.Errorf("error retrieving DeviceClass: %w", err)
	}

	// Collect every finalizer except ours.
	remaining := make([]string, 0, len(dc.Finalizers))
	for _, f := range dc.Finalizers {
		if f != multiNodeEnvironmentFinalizer {
			remaining = append(remaining, f)
		}
	}

	// Our finalizer was not on the object, so there is nothing to remove;
	// skip the API write entirely.
	if len(remaining) == len(dc.Finalizers) {
		return nil
	}

	// Modify a copy so the object returned by the lister is left untouched.
	newDC := dc.DeepCopy()
	newDC.Finalizers = remaining

	_, err = m.clientsets.Core.ResourceV1beta1().DeviceClasses().Update(context.Background(), newDC, metav1.UpdateOptions{})
	if err != nil {
		return fmt.Errorf("failed to update DeviceClass: %w", err)
	}

	return nil
}
3 changes: 3 additions & 0 deletions deployments/helm/k8s-dra-driver/templates/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ rules:
- apiGroups: ["resource.k8s.io"]
resources: ["resourceclaims"]
verbs: ["get", "list", "watch", "create", "update"]
- apiGroups: ["resource.k8s.io"]
resources: ["deviceclasses"]
verbs: ["get", "list", "watch", "create", "update"]
- apiGroups: ["resource.k8s.io"]
resources: ["resourceclaims/status"]
verbs: ["update"]
Expand Down

0 comments on commit fb3e6cb

Please sign in to comment.