Adding 'UncordonRequired' support for requestor flow
Signed-off-by: Ido Heyvi <[email protected]>
heyvister1 committed Jan 19, 2025
1 parent 0ccdd26 commit f5be649
Showing 9 changed files with 570 additions and 198 deletions.
2 changes: 2 additions & 0 deletions pkg/upgrade/base/consts.go
@@ -73,6 +73,8 @@ const (
UpgradeStateDone = "upgrade-done"
// UpgradeStateFailed is set when there are any failures during the driver upgrade
UpgradeStateFailed = "upgrade-failed"
// UpgradeValidationRequestorModeAnnotationKeyFmt is the format of the per-driver node annotation used by the requestor upgrade mode
UpgradeValidationRequestorModeAnnotationKeyFmt = "nvidia.com/%s-driver-upgrade-requestor-mode"
)

const (
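The new UpgradeValidationRequestorModeAnnotationKeyFmt constant is a format string whose %s placeholder takes a driver name, yielding a per-driver node annotation key. A minimal sketch of how such a key could be expanded and stamped onto a node follows; the helper name, the driver name, and the "true" value are illustrative assumptions, not part of this commit.

package sketch

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// Format copied from the constant added in consts.go above.
const upgradeValidationRequestorModeAnnotationKeyFmt = "nvidia.com/%s-driver-upgrade-requestor-mode"

// annotateRequestorMode marks a node as handled by the requestor (maintenance
// operator) upgrade flow for the named driver, e.g. "gpu" or "network".
func annotateRequestorMode(node *corev1.Node, driverName string) {
	key := fmt.Sprintf(upgradeValidationRequestorModeAnnotationKeyFmt, driverName)
	if node.Annotations == nil {
		node.Annotations = map[string]string{}
	}
	node.Annotations[key] = "true"
}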
96 changes: 13 additions & 83 deletions pkg/upgrade/base/upgrade_common.go
@@ -69,14 +69,12 @@ func NewClusterUpgradeState() ClusterUpgradeState {
type ProcessNodeStateManager interface {
ProcessUpgradeRequiredNodes(ctx context.Context,
currentClusterState *ClusterUpgradeState, upgradePolicy *v1alpha1.DriverUpgradePolicySpec) error
ProcessUncordonRequiredNodes(
ctx context.Context, currentClusterState *ClusterUpgradeState) error
}

// CommonUpgradeStateManager interface is a unified cluster upgrade abstraction for both upgrade modes
//
//nolint:interfacebloat
type CommonUpgradeStateManager interface {
// BuildState builds a point-in-time snapshot of the driver upgrade state in the cluster.
BuildState(ctx context.Context, namespace string, driverLabels map[string]string) (*ClusterUpgradeState, error)
// GetTotalManagedNodes returns the total count of nodes managed for driver upgrades
GetTotalManagedNodes(ctx context.Context, currentState *ClusterUpgradeState) int
// GetUpgradesInProgress returns count of nodes on which upgrade is in progress
@@ -213,77 +211,9 @@ func (m *CommonUpgradeManagerImpl) GetCurrentUnavailableNodes(ctx context.Contex
return unavailableNodes
}

// BuildState builds a point-in-time snapshot of the driver upgrade state in the cluster.
func (m *CommonUpgradeManagerImpl) BuildState(ctx context.Context, namespace string,
driverLabels map[string]string) (*ClusterUpgradeState, error) {
m.Log.V(consts.LogLevelInfo).Info("Building state")

upgradeState := NewClusterUpgradeState()

daemonSets, err := m.getDriverDaemonSets(ctx, namespace, driverLabels)
if err != nil {
m.Log.V(consts.LogLevelError).Error(err, "Failed to get driver DaemonSet list")
return nil, err
}

m.Log.V(consts.LogLevelDebug).Info("Got driver DaemonSets", "length", len(daemonSets))

// Get list of driver pods
podList := &corev1.PodList{}

err = m.K8sClient.List(ctx, podList,
client.InNamespace(namespace),
client.MatchingLabels(driverLabels),
)

if err != nil {
return nil, err
}

filteredPodList := []corev1.Pod{}
for _, ds := range daemonSets {
dsPods := m.getPodsOwnedbyDs(ds, podList.Items)
if int(ds.Status.DesiredNumberScheduled) != len(dsPods) {
m.Log.V(consts.LogLevelInfo).Info("Driver DaemonSet has Unscheduled pods", "name", ds.Name)
return nil, fmt.Errorf("driver DaemonSet should not have Unscheduled pods")
}
filteredPodList = append(filteredPodList, dsPods...)
}

// Collect also orphaned driver pods
filteredPodList = append(filteredPodList, m.getOrphanedPods(podList.Items)...)

upgradeStateLabel := GetUpgradeStateLabelKey()

for i := range filteredPodList {
pod := &filteredPodList[i]
var ownerDaemonSet *appsv1.DaemonSet
if isOrphanedPod(pod) {
ownerDaemonSet = nil
} else {
ownerDaemonSet = daemonSets[pod.OwnerReferences[0].UID]
}
// Check if pod is already scheduled to a Node
if pod.Spec.NodeName == "" && pod.Status.Phase == corev1.PodPending {
m.Log.V(consts.LogLevelInfo).Info("Driver Pod has no NodeName, skipping", "pod", pod.Name)
continue
}
nodeState, err := m.buildNodeUpgradeState(ctx, pod, ownerDaemonSet)
if err != nil {
m.Log.V(consts.LogLevelError).Error(err, "Failed to build node upgrade state for pod", "pod", pod)
return nil, err
}
nodeStateLabel := nodeState.Node.Labels[upgradeStateLabel]
upgradeState.NodeStates[nodeStateLabel] = append(
upgradeState.NodeStates[nodeStateLabel], nodeState)
}

return &upgradeState, nil
}

// buildNodeUpgradeState creates a mapping between a node,
// BuildNodeUpgradeState creates a mapping between a node,
// the driver Pod running on it, and the DaemonSet controlling that Pod
func (m *CommonUpgradeManagerImpl) buildNodeUpgradeState(
func (m *CommonUpgradeManagerImpl) BuildNodeUpgradeState(
ctx context.Context, pod *corev1.Pod, ds *appsv1.DaemonSet) (*NodeUpgradeState, error) {
node, err := m.NodeUpgradeStateProvider.GetNode(ctx, pod.Spec.NodeName)
if err != nil {
@@ -297,8 +227,8 @@ func (m *CommonUpgradeManagerImpl) buildNodeUpgradeState(
return &NodeUpgradeState{Node: node, DriverPod: pod, DriverDaemonSet: ds}, nil
}

// getDriverDaemonSets retrieves DaemonSets with given labels and returns UID->DaemonSet map
func (m *CommonUpgradeManagerImpl) getDriverDaemonSets(ctx context.Context, namespace string,
// GetDriverDaemonSets retrieves DaemonSets with given labels and returns UID->DaemonSet map
func (m *CommonUpgradeManagerImpl) GetDriverDaemonSets(ctx context.Context, namespace string,
labels map[string]string) (map[types.UID]*appsv1.DaemonSet, error) {
// Get list of driver pods
daemonSetList := &appsv1.DaemonSetList{}
@@ -319,12 +249,12 @@ func (m *CommonUpgradeManagerImpl) getDriverDaemonSets(ctx context.Context, name
return daemonSetMap, nil
}

// getPodsOwnedbyDs returns a list of the pods owned by the specified DaemonSet
func (m *CommonUpgradeManagerImpl) getPodsOwnedbyDs(ds *appsv1.DaemonSet, pods []corev1.Pod) []corev1.Pod {
// GetPodsOwnedbyDs returns a list of the pods owned by the specified DaemonSet
func (m *CommonUpgradeManagerImpl) GetPodsOwnedbyDs(ds *appsv1.DaemonSet, pods []corev1.Pod) []corev1.Pod {
dsPodList := []corev1.Pod{}
for i := range pods {
pod := &pods[i]
if isOrphanedPod(pod) {
if IsOrphanedPod(pod) {
m.Log.V(consts.LogLevelInfo).Info("Driver Pod has no owner DaemonSet", "pod", pod.Name)
continue
}
@@ -340,20 +270,20 @@ func (m *CommonUpgradeManagerImpl) getPodsOwnedbyDs(ds *appsv1.DaemonSet, pods [
return dsPodList
}

// getOrphanedPods returns a list of the pods not owned by any DaemonSet
func (m *CommonUpgradeManagerImpl) getOrphanedPods(pods []corev1.Pod) []corev1.Pod {
// GetOrphanedPods returns a list of the pods not owned by any DaemonSet
func (m *CommonUpgradeManagerImpl) GetOrphanedPods(pods []corev1.Pod) []corev1.Pod {
podList := []corev1.Pod{}
for i := range pods {
pod := &pods[i]
if isOrphanedPod(pod) {
if IsOrphanedPod(pod) {
podList = append(podList, *pod)
}
}
m.Log.V(consts.LogLevelInfo).Info("Total orphaned Pods found:", "count", len(podList))
return podList
}

func isOrphanedPod(pod *corev1.Pod) bool {
func IsOrphanedPod(pod *corev1.Pod) bool {
return pod.OwnerReferences == nil || len(pod.OwnerReferences) < 1
}

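The ProcessNodeStateManager interface above now requires ProcessUncordonRequiredNodes in addition to ProcessUpgradeRequiredNodes, and several helpers (BuildNodeUpgradeState, GetDriverDaemonSets, GetPodsOwnedbyDs, GetOrphanedPods, IsOrphanedPod) are exported so the manager package can reuse them. Below is a minimal sketch of a type satisfying the extended interface; the no-op implementation and the base import path (inferred from the file layout) are assumptions for illustration only.

package sketch

import (
	"context"

	"github.com/NVIDIA/k8s-operator-libs/api/upgrade/v1alpha1"
	"github.com/NVIDIA/k8s-operator-libs/pkg/upgrade/base"
)

// noopStateManager satisfies ProcessNodeStateManager without doing any work.
type noopStateManager struct{}

// ProcessUpgradeRequiredNodes would normally cordon/drain nodes (in-place mode)
// or create NodeMaintenance requests (requestor mode) for upgrade-required nodes.
func (n *noopStateManager) ProcessUpgradeRequiredNodes(ctx context.Context,
	currentClusterState *base.ClusterUpgradeState, upgradePolicy *v1alpha1.DriverUpgradePolicySpec) error {
	return nil
}

// ProcessUncordonRequiredNodes would normally uncordon nodes (in-place mode) or
// remove the corresponding NodeMaintenance objects (requestor mode) once the upgrade completes.
func (n *noopStateManager) ProcessUncordonRequiredNodes(ctx context.Context,
	currentClusterState *base.ClusterUpgradeState) error {
	return nil
}

// Compile-time assertion that the sketch implements the extended interface.
var _ base.ProcessNodeStateManager = (*noopStateManager)(nil)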
111 changes: 104 additions & 7 deletions pkg/upgrade/manager/upgrade_state.go
@@ -21,8 +21,11 @@ import (
"fmt"

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/NVIDIA/k8s-operator-libs/api/upgrade/v1alpha1"
"github.com/NVIDIA/k8s-operator-libs/pkg/consts"
@@ -34,6 +37,9 @@ import (
// ExtendedUpgradeStateManager decouples the ApplyState implementation from the base package,
// since the upgrade pkg is a high-level abstraction referencing the inplace/requestor (maintenance OP) packages
type ExtendedUpgradeStateManager interface {
// BuildState builds a point-in-time snapshot of the driver upgrade state in the cluster.
BuildState(ctx context.Context, namespace string,
driverLabels map[string]string) (*base.ClusterUpgradeState, error)
// ApplyState receives a complete cluster upgrade state and, based on upgrade policy, processes each node's state.
// Based on the current state of the node, it is calculated if the node can be moved to the next state right now
// or whether any actions need to be scheduled for the node to move to the next state.
@@ -88,6 +94,75 @@ func NewClusterUpgradeStateManager(
return manager, nil
}

// BuildState builds a point-in-time snapshot of the driver upgrade state in the cluster.
func (m *ClusterUpgradeStateManagerImpl) BuildState(ctx context.Context, namespace string,
driverLabels map[string]string) (*base.ClusterUpgradeState, error) {
m.Log.V(consts.LogLevelInfo).Info("Building state")

upgradeState := base.NewClusterUpgradeState()

daemonSets, err := m.GetDriverDaemonSets(ctx, namespace, driverLabels)
if err != nil {
m.Log.V(consts.LogLevelError).Error(err, "Failed to get driver DaemonSet list")
return nil, err
}

m.Log.V(consts.LogLevelDebug).Info("Got driver DaemonSets", "length", len(daemonSets))

// Get list of driver pods
podList := &corev1.PodList{}

err = m.K8sClient.List(ctx, podList,
client.InNamespace(namespace),
client.MatchingLabels(driverLabels),
)

if err != nil {
return nil, err
}

filteredPodList := []corev1.Pod{}
for _, ds := range daemonSets {
dsPods := m.GetPodsOwnedbyDs(ds, podList.Items)
if int(ds.Status.DesiredNumberScheduled) != len(dsPods) {
m.Log.V(consts.LogLevelInfo).Info("Driver DaemonSet has Unscheduled pods", "name", ds.Name)
return nil, fmt.Errorf("driver DaemonSet should not have Unscheduled pods")
}
filteredPodList = append(filteredPodList, dsPods...)
}

// Also collect orphaned driver pods
filteredPodList = append(filteredPodList, m.GetOrphanedPods(podList.Items)...)

upgradeStateLabel := GetUpgradeStateLabelKey()

for i := range filteredPodList {
pod := &filteredPodList[i]
var ownerDaemonSet *appsv1.DaemonSet
if base.IsOrphanedPod(pod) {
ownerDaemonSet = nil
} else {
ownerDaemonSet = daemonSets[pod.OwnerReferences[0].UID]
}
// Check if pod is already scheduled to a Node
if pod.Spec.NodeName == "" && pod.Status.Phase == corev1.PodPending {
m.Log.V(consts.LogLevelInfo).Info("Driver Pod has no NodeName, skipping", "pod", pod.Name)
continue
}
// TODO: Add existing nodeMaintenance objs to sync with nodeState reference
nodeState, err := m.BuildNodeUpgradeState(ctx, pod, ownerDaemonSet)
if err != nil {
m.Log.V(consts.LogLevelError).Error(err, "Failed to build node upgrade state for pod", "pod", pod)
return nil, err
}
nodeStateLabel := nodeState.Node.Labels[upgradeStateLabel]
upgradeState.NodeStates[nodeStateLabel] = append(
upgradeState.NodeStates[nodeStateLabel], nodeState)
}

return &upgradeState, nil
}

// ApplyState receives a complete cluster upgrade state and, based on upgrade policy, processes each node's state.
// Based on the current state of the node, it is calculated if the node can be moved to the next state right now
// or whether any actions need to be scheduled for the node to move to the next state.
@@ -115,6 +190,7 @@ func (m *ClusterUpgradeStateManagerImpl) ApplyState(ctx context.Context,
base.UpgradeStatePodDeletionRequired, len(currentState.NodeStates[base.UpgradeStatePodDeletionRequired]),
base.UpgradeStateFailed, len(currentState.NodeStates[base.UpgradeStateFailed]),
base.UpgradeStateDrainRequired, len(currentState.NodeStates[base.UpgradeStateDrainRequired]),
base.UpgradeStateNodeMaintenanceRequired, len(currentState.NodeStates[base.UpgradeStateNodeMaintenanceRequired]),
base.UpgradeStatePostMaintenanceRequired, len(currentState.NodeStates[base.UpgradeStatePostMaintenanceRequired]),
base.UpgradeStatePodRestartRequired, len(currentState.NodeStates[base.UpgradeStatePodRestartRequired]),
base.UpgradeStateValidationRequired, len(currentState.NodeStates[base.UpgradeStateValidationRequired]),
@@ -137,12 +213,7 @@ func (m *ClusterUpgradeStateManagerImpl) ApplyState(ctx context.Context,
return err
}
// Start upgrade process for upgradesAvailable number of nodes
if requestor.UseMaintenanceOperator {
err = m.requestor.ProcessUpgradeRequiredNodes(ctx, currentState, upgradePolicy)
} else {
err = m.inplace.ProcessUpgradeRequiredNodes(ctx, currentState, upgradePolicy)
}

err = m.ProcessUpgradeRequiredNodesWrapper(ctx, currentState, upgradePolicy)
if err != nil {
m.Log.V(consts.LogLevelError).Error(
err, "Failed to process nodes", "state", base.UpgradeStateUpgradeRequired)
Expand Down Expand Up @@ -198,7 +269,8 @@ func (m *ClusterUpgradeStateManagerImpl) ApplyState(ctx context.Context,
m.Log.V(consts.LogLevelError).Error(err, "Failed to validate driver upgrade")
return err
}
err = m.ProcessUncordonRequiredNodes(ctx, currentState)

err = m.ProcessUncordonRequiredNodesWrapper(ctx, currentState)
if err != nil {
m.Log.V(consts.LogLevelError).Error(err, "Failed to uncordon nodes")
return err
@@ -210,3 +282,28 @@ func (m *ClusterUpgradeStateManagerImpl) ApplyState(ctx context.Context,
func (m *ClusterUpgradeStateManagerImpl) GetRequestor() base.ProcessNodeStateManager {
return m.requestor
}

func (m *ClusterUpgradeStateManagerImpl) ProcessUpgradeRequiredNodesWrapper(ctx context.Context,
currentState *base.ClusterUpgradeState, upgradePolicy *v1alpha1.DriverUpgradePolicySpec) error {
var err error
// Start upgrade process for upgradesAvailable number of nodes
if requestor.UseMaintenanceOperator {
err = m.requestor.ProcessUpgradeRequiredNodes(ctx, currentState, upgradePolicy)
} else {
err = m.inplace.ProcessUpgradeRequiredNodes(ctx, currentState, upgradePolicy)
}
return err
}

func (m *ClusterUpgradeStateManagerImpl) ProcessUncordonRequiredNodesWrapper(ctx context.Context,
currentState *base.ClusterUpgradeState) error {
var err error
if requestor.UseMaintenanceOperator {
err = m.requestor.ProcessUncordonRequiredNodes(ctx, currentState)
} else {
err = m.inplace.ProcessUncordonRequiredNodes(ctx, currentState)
}

if err != nil {
m.Log.V(consts.LogLevelError).Error(err, "Failed to uncordon nodes")
}
return err
}
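Taken together, BuildState (relocated here from the base package) and ApplyState are the public entry points of the manager, and the two wrappers above select the requestor (maintenance operator) or in-place flow based on requestor.UseMaintenanceOperator. A hedged usage sketch follows; the local interface, the reconciler function, and the exact ApplyState signature (inferred from its doc comment, since the full declaration is collapsed in this view) are assumptions for illustration.

package sketch

import (
	"context"
	"fmt"

	"github.com/NVIDIA/k8s-operator-libs/api/upgrade/v1alpha1"
	"github.com/NVIDIA/k8s-operator-libs/pkg/upgrade/base"
)

// stateManager lists only the two methods this sketch needs; the
// ClusterUpgradeStateManagerImpl from this commit is expected to satisfy it.
type stateManager interface {
	BuildState(ctx context.Context, namespace string,
		driverLabels map[string]string) (*base.ClusterUpgradeState, error)
	ApplyState(ctx context.Context, currentState *base.ClusterUpgradeState,
		upgradePolicy *v1alpha1.DriverUpgradePolicySpec) error
}

// reconcileDriverUpgrades shows the intended call order: snapshot the cluster
// with BuildState, then let ApplyState move each node through the upgrade
// state machine, which internally dispatches to the requestor or in-place
// flow via the wrappers above.
func reconcileDriverUpgrades(ctx context.Context, mgr stateManager,
	namespace string, driverLabels map[string]string,
	policy *v1alpha1.DriverUpgradePolicySpec) error {
	state, err := mgr.BuildState(ctx, namespace, driverLabels)
	if err != nil {
		return fmt.Errorf("failed to build upgrade state: %w", err)
	}
	return mgr.ApplyState(ctx, state, policy)
}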