diff --git a/pkg/upgrade/base/consts.go b/pkg/upgrade/base/consts.go index 7406826..867b490 100644 --- a/pkg/upgrade/base/consts.go +++ b/pkg/upgrade/base/consts.go @@ -73,6 +73,8 @@ const ( UpgradeStateDone = "upgrade-done" // UpgradeStateFailed is set when there are any failures during the driver upgrade UpgradeStateFailed = "upgrade-failed" + // TODO: UpgradeValidationRequestorModeAnnotationKeyFmt + UpgradeValidationRequestorModeAnnotationKeyFmt = "nvidia.com/%s-driver-upgrade-requestor-mode" ) const ( diff --git a/pkg/upgrade/base/upgrade_common.go b/pkg/upgrade/base/upgrade_common.go index c0693f5..0e5de3b 100644 --- a/pkg/upgrade/base/upgrade_common.go +++ b/pkg/upgrade/base/upgrade_common.go @@ -69,14 +69,12 @@ func NewClusterUpgradeState() ClusterUpgradeState { type ProcessNodeStateManager interface { ProcessUpgradeRequiredNodes(ctx context.Context, currentClusterState *ClusterUpgradeState, upgradePolicy *v1alpha1.DriverUpgradePolicySpec) error + ProcessUncordonRequiredNodes( + ctx context.Context, currentClusterState *ClusterUpgradeState) error } // CommonUpgradeStateManager interface is a unified cluster upgrade abstraction for both upgrade modes -// -//nolint:interfacebloat type CommonUpgradeStateManager interface { - // BuildState builds a point-in-time snapshot of the driver upgrade state in the cluster. - BuildState(ctx context.Context, namespace string, driverLabels map[string]string) (*ClusterUpgradeState, error) // GetTotalManagedNodes returns the total count of nodes managed for driver upgrades GetTotalManagedNodes(ctx context.Context, currentState *ClusterUpgradeState) int // GetUpgradesInProgress returns count of nodes on which upgrade is in progress @@ -213,77 +211,9 @@ func (m *CommonUpgradeManagerImpl) GetCurrentUnavailableNodes(ctx context.Contex return unavailableNodes } -// BuildState builds a point-in-time snapshot of the driver upgrade state in the cluster. 
-func (m *CommonUpgradeManagerImpl) BuildState(ctx context.Context, namespace string, - driverLabels map[string]string) (*ClusterUpgradeState, error) { - m.Log.V(consts.LogLevelInfo).Info("Building state") - - upgradeState := NewClusterUpgradeState() - - daemonSets, err := m.getDriverDaemonSets(ctx, namespace, driverLabels) - if err != nil { - m.Log.V(consts.LogLevelError).Error(err, "Failed to get driver DaemonSet list") - return nil, err - } - - m.Log.V(consts.LogLevelDebug).Info("Got driver DaemonSets", "length", len(daemonSets)) - - // Get list of driver pods - podList := &corev1.PodList{} - - err = m.K8sClient.List(ctx, podList, - client.InNamespace(namespace), - client.MatchingLabels(driverLabels), - ) - - if err != nil { - return nil, err - } - - filteredPodList := []corev1.Pod{} - for _, ds := range daemonSets { - dsPods := m.getPodsOwnedbyDs(ds, podList.Items) - if int(ds.Status.DesiredNumberScheduled) != len(dsPods) { - m.Log.V(consts.LogLevelInfo).Info("Driver DaemonSet has Unscheduled pods", "name", ds.Name) - return nil, fmt.Errorf("driver DaemonSet should not have Unscheduled pods") - } - filteredPodList = append(filteredPodList, dsPods...) - } - - // Collect also orphaned driver pods - filteredPodList = append(filteredPodList, m.getOrphanedPods(podList.Items)...) 
- - upgradeStateLabel := GetUpgradeStateLabelKey() - - for i := range filteredPodList { - pod := &filteredPodList[i] - var ownerDaemonSet *appsv1.DaemonSet - if isOrphanedPod(pod) { - ownerDaemonSet = nil - } else { - ownerDaemonSet = daemonSets[pod.OwnerReferences[0].UID] - } - // Check if pod is already scheduled to a Node - if pod.Spec.NodeName == "" && pod.Status.Phase == corev1.PodPending { - m.Log.V(consts.LogLevelInfo).Info("Driver Pod has no NodeName, skipping", "pod", pod.Name) - continue - } - nodeState, err := m.buildNodeUpgradeState(ctx, pod, ownerDaemonSet) - if err != nil { - m.Log.V(consts.LogLevelError).Error(err, "Failed to build node upgrade state for pod", "pod", pod) - return nil, err - } - nodeStateLabel := nodeState.Node.Labels[upgradeStateLabel] - upgradeState.NodeStates[nodeStateLabel] = append( - upgradeState.NodeStates[nodeStateLabel], nodeState) - } - - return &upgradeState, nil -} - -// buildNodeUpgradeState creates a mapping between a node, +// BuildNodeUpgradeState creates a mapping between a node, // the driver POD running on them and the daemon set, controlling this pod -func (m *CommonUpgradeManagerImpl) buildNodeUpgradeState( +func (m *CommonUpgradeManagerImpl) BuildNodeUpgradeState( ctx context.Context, pod *corev1.Pod, ds *appsv1.DaemonSet) (*NodeUpgradeState, error) { node, err := m.NodeUpgradeStateProvider.GetNode(ctx, pod.Spec.NodeName) if err != nil { @@ -297,8 +227,8 @@ func (m *CommonUpgradeManagerImpl) buildNodeUpgradeState( return &NodeUpgradeState{Node: node, DriverPod: pod, DriverDaemonSet: ds}, nil } -// getDriverDaemonSets retrieves DaemonSets with given labels and returns UID->DaemonSet map -func (m *CommonUpgradeManagerImpl) getDriverDaemonSets(ctx context.Context, namespace string, +// GetDriverDaemonSets retrieves DaemonSets with given labels and returns UID->DaemonSet map +func (m *CommonUpgradeManagerImpl) GetDriverDaemonSets(ctx context.Context, namespace string, labels map[string]string) 
(map[types.UID]*appsv1.DaemonSet, error) { // Get list of driver pods daemonSetList := &appsv1.DaemonSetList{} @@ -319,12 +249,12 @@ func (m *CommonUpgradeManagerImpl) getDriverDaemonSets(ctx context.Context, name return daemonSetMap, nil } -// getPodsOwnedbyDs returns a list of the pods owned by the specified DaemonSet -func (m *CommonUpgradeManagerImpl) getPodsOwnedbyDs(ds *appsv1.DaemonSet, pods []corev1.Pod) []corev1.Pod { +// GetPodsOwnedbyDs returns a list of the pods owned by the specified DaemonSet +func (m *CommonUpgradeManagerImpl) GetPodsOwnedbyDs(ds *appsv1.DaemonSet, pods []corev1.Pod) []corev1.Pod { dsPodList := []corev1.Pod{} for i := range pods { pod := &pods[i] - if isOrphanedPod(pod) { + if IsOrphanedPod(pod) { m.Log.V(consts.LogLevelInfo).Info("Driver Pod has no owner DaemonSet", "pod", pod.Name) continue } @@ -340,12 +270,12 @@ func (m *CommonUpgradeManagerImpl) getPodsOwnedbyDs(ds *appsv1.DaemonSet, pods [ return dsPodList } -// getOrphanedPods returns a list of the pods not owned by any DaemonSet -func (m *CommonUpgradeManagerImpl) getOrphanedPods(pods []corev1.Pod) []corev1.Pod { +// GetOrphanedPods returns a list of the pods not owned by any DaemonSet +func (m *CommonUpgradeManagerImpl) GetOrphanedPods(pods []corev1.Pod) []corev1.Pod { podList := []corev1.Pod{} for i := range pods { pod := &pods[i] - if isOrphanedPod(pod) { + if IsOrphanedPod(pod) { podList = append(podList, *pod) } } @@ -353,7 +283,7 @@ func (m *CommonUpgradeManagerImpl) getOrphanedPods(pods []corev1.Pod) []corev1.P return podList } -func isOrphanedPod(pod *corev1.Pod) bool { +func IsOrphanedPod(pod *corev1.Pod) bool { return pod.OwnerReferences == nil || len(pod.OwnerReferences) < 1 } diff --git a/pkg/upgrade/manager/upgrade_state.go b/pkg/upgrade/manager/upgrade_state.go index d29dd1e..0486352 100644 --- a/pkg/upgrade/manager/upgrade_state.go +++ b/pkg/upgrade/manager/upgrade_state.go @@ -21,8 +21,11 @@ import ( "fmt" "github.com/go-logr/logr" + appsv1 
"k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" "github.com/NVIDIA/k8s-operator-libs/api/upgrade/v1alpha1" "github.com/NVIDIA/k8s-operator-libs/pkg/consts" @@ -34,6 +37,9 @@ import ( // ExtendedUpgradeStateManager interface purpose is to decouple ApplyState implementation from base package // since upgrade pkg is a high level abstaction referencing inplace/requestor (maintenance OP) packages type ExtendedUpgradeStateManager interface { + // BuildState builds a point-in-time snapshot of the driver upgrade state in the cluster. + BuildState(ctx context.Context, namespace string, + driverLabels map[string]string) (*base.ClusterUpgradeState, error) // ApplyState receives a complete cluster upgrade state and, based on upgrade policy, processes each node's state. // Based on the current state of the node, it is calculated if the node can be moved to the next state right now // or whether any actions need to be scheduled for the node to move to the next state. @@ -88,6 +94,75 @@ func NewClusterUpgradeStateManager( return manager, nil } +// BuildState builds a point-in-time snapshot of the driver upgrade state in the cluster. 
+func (m *ClusterUpgradeStateManagerImpl) BuildState(ctx context.Context, namespace string, + driverLabels map[string]string) (*base.ClusterUpgradeState, error) { + m.Log.V(consts.LogLevelInfo).Info("Building state") + + upgradeState := base.NewClusterUpgradeState() + + daemonSets, err := m.GetDriverDaemonSets(ctx, namespace, driverLabels) + if err != nil { + m.Log.V(consts.LogLevelError).Error(err, "Failed to get driver DaemonSet list") + return nil, err + } + + m.Log.V(consts.LogLevelDebug).Info("Got driver DaemonSets", "length", len(daemonSets)) + + // Get list of driver pods + podList := &corev1.PodList{} + + err = m.K8sClient.List(ctx, podList, + client.InNamespace(namespace), + client.MatchingLabels(driverLabels), + ) + + if err != nil { + return nil, err + } + + filteredPodList := []corev1.Pod{} + for _, ds := range daemonSets { + dsPods := m.GetPodsOwnedbyDs(ds, podList.Items) + if int(ds.Status.DesiredNumberScheduled) != len(dsPods) { + m.Log.V(consts.LogLevelInfo).Info("Driver DaemonSet has Unscheduled pods", "name", ds.Name) + return nil, fmt.Errorf("driver DaemonSet should not have Unscheduled pods") + } + filteredPodList = append(filteredPodList, dsPods...) + } + + // Collect also orphaned driver pods + filteredPodList = append(filteredPodList, m.GetOrphanedPods(podList.Items)...) 
+ + upgradeStateLabel := GetUpgradeStateLabelKey() + + for i := range filteredPodList { + pod := &filteredPodList[i] + var ownerDaemonSet *appsv1.DaemonSet + if base.IsOrphanedPod(pod) { + ownerDaemonSet = nil + } else { + ownerDaemonSet = daemonSets[pod.OwnerReferences[0].UID] + } + // Check if pod is already scheduled to a Node + if pod.Spec.NodeName == "" && pod.Status.Phase == corev1.PodPending { + m.Log.V(consts.LogLevelInfo).Info("Driver Pod has no NodeName, skipping", "pod", pod.Name) + continue + } + //TODO: Add existing nodeMaintenance objs to sync with nodeState reference + nodeState, err := m.BuildNodeUpgradeState(ctx, pod, ownerDaemonSet) + if err != nil { + m.Log.V(consts.LogLevelError).Error(err, "Failed to build node upgrade state for pod", "pod", pod) + return nil, err + } + nodeStateLabel := nodeState.Node.Labels[upgradeStateLabel] + upgradeState.NodeStates[nodeStateLabel] = append( + upgradeState.NodeStates[nodeStateLabel], nodeState) + } + + return &upgradeState, nil +} + // ApplyState receives a complete cluster upgrade state and, based on upgrade policy, processes each node's state. // Based on the current state of the node, it is calculated if the node can be moved to the next state right now // or whether any actions need to be scheduled for the node to move to the next state. 
@@ -115,6 +190,7 @@ func (m *ClusterUpgradeStateManagerImpl) ApplyState(ctx context.Context, base.UpgradeStatePodDeletionRequired, len(currentState.NodeStates[base.UpgradeStatePodDeletionRequired]), base.UpgradeStateFailed, len(currentState.NodeStates[base.UpgradeStateFailed]), base.UpgradeStateDrainRequired, len(currentState.NodeStates[base.UpgradeStateDrainRequired]), + base.UpgradeStateNodeMaintenanceRequired, len(currentState.NodeStates[base.UpgradeStateNodeMaintenanceRequired]), base.UpgradeStatePostMaintenanceRequired, len(currentState.NodeStates[base.UpgradeStatePostMaintenanceRequired]), base.UpgradeStatePodRestartRequired, len(currentState.NodeStates[base.UpgradeStatePodRestartRequired]), base.UpgradeStateValidationRequired, len(currentState.NodeStates[base.UpgradeStateValidationRequired]), @@ -137,12 +213,7 @@ func (m *ClusterUpgradeStateManagerImpl) ApplyState(ctx context.Context, return err } // Start upgrade process for upgradesAvailable number of nodes - if requestor.UseMaintenanceOperator { - err = m.requestor.ProcessUpgradeRequiredNodes(ctx, currentState, upgradePolicy) - } else { - err = m.inplace.ProcessUpgradeRequiredNodes(ctx, currentState, upgradePolicy) - } - + err = m.ProcessUpgradeRequiredNodesWrapper(ctx, currentState, upgradePolicy) if err != nil { m.Log.V(consts.LogLevelError).Error( err, "Failed to process nodes", "state", base.UpgradeStateUpgradeRequired) @@ -198,7 +269,8 @@ func (m *ClusterUpgradeStateManagerImpl) ApplyState(ctx context.Context, m.Log.V(consts.LogLevelError).Error(err, "Failed to validate driver upgrade") return err } - err = m.ProcessUncordonRequiredNodes(ctx, currentState) + + err = m.ProcessUncordonRequiredNodesWrapper(ctx, currentState) if err != nil { m.Log.V(consts.LogLevelError).Error(err, "Failed to uncordon nodes") return err @@ -210,3 +282,28 @@ func (m *ClusterUpgradeStateManagerImpl) ApplyState(ctx context.Context, func (m *ClusterUpgradeStateManagerImpl) GetRequestor() base.ProcessNodeStateManager { return 
m.requestor } + +func (m *ClusterUpgradeStateManagerImpl) ProcessUpgradeRequiredNodesWrapper(ctx context.Context, + currentState *base.ClusterUpgradeState, upgradePolicy *v1alpha1.DriverUpgradePolicySpec) error { + var err error + // Start upgrade process for upgradesAvailable number of nodes + if requestor.UseMaintenanceOperator { + err = m.requestor.ProcessUpgradeRequiredNodes(ctx, currentState, upgradePolicy) + } else { + err = m.inplace.ProcessUpgradeRequiredNodes(ctx, currentState, upgradePolicy) + } + return err +} + +func (m *ClusterUpgradeStateManagerImpl) ProcessUncordonRequiredNodesWrapper(ctx context.Context, + currentState *base.ClusterUpgradeState) error { + var err error + if requestor.UseMaintenanceOperator { + err = m.requestor.ProcessUncordonRequiredNodes(ctx, currentState) + } else { + err = m.inplace.ProcessUncordonRequiredNodes(ctx, currentState) + } + + m.Log.V(consts.LogLevelError).Error(err, "Failed to uncordon nodes") + return err +} diff --git a/pkg/upgrade/manager/upgrade_state_test.go b/pkg/upgrade/manager/upgrade_state_test.go index 9791322..6815ef0 100644 --- a/pkg/upgrade/manager/upgrade_state_test.go +++ b/pkg/upgrade/manager/upgrade_state_test.go @@ -1249,7 +1249,7 @@ var _ = Describe("UpgradeStateManager tests", func() { Expect(getNodeUpgradeState(upgradeFailedNode)).To(Equal(base.UpgradeStateFailed)) }) - It("UpgradeStateManager should use upgrade requestor mode", func() { + It("UpgradeStateManager should move to post-maintenance-required while using upgrade requestor mode", func() { cancel := withUpgradeRequestorMode(ctx, id) defer cancel() daemonSet := &appsv1.DaemonSet{ObjectMeta: v1.ObjectMeta{}} @@ -1276,23 +1276,70 @@ var _ = Describe("UpgradeStateManager tests", func() { } Expect(stateManagerInterface.ApplyState(ctx, &clusterState, policy)).To(Succeed()) - nm := &maintenancev1alpha1.NodeMaintenance{} - err := k8sClient.Get(ctx, types.NamespacedName{Namespace: requestor.MaintenanceOPRequestorNS, Name: "node1"}, nm, 
&client.GetOptions{}) - Expect(err).NotTo(HaveOccurred()) - - status := maintenancev1alpha1.NodeMaintenanceStatus{ - Conditions: []v1.Condition{{ - Type: maintenancev1alpha1.ConditionTypeReady, - Status: v1.ConditionTrue, - Reason: maintenancev1alpha1.ConditionReasonReady, - Message: "Maintenance completed successfully", - LastTransitionTime: v1.NewTime(time.Now()), - }}, + By("verify generated node-maintenance obj(s)") + nms := &maintenancev1alpha1.NodeMaintenanceList{} + Eventually(func() bool { + k8sClient.List(ctx, nms) + return len(nms.Items) == len(clusterState.NodeStates[base.UpgradeStateUpgradeRequired]) + }).WithTimeout(10 * time.Second).WithPolling(1 * 500 * time.Millisecond).Should(BeTrue()) + + By("set node-maintenance(s) finalizer to mimic maintenance-operator obj deletion ownership") + for _, item := range nms.Items { + nm := &maintenancev1alpha1.NodeMaintenance{} + err := k8sClient.Get(ctx, client.ObjectKey{Name: item.Name, Namespace: "default"}, nm) + Expect(err).NotTo(HaveOccurred()) + + nm.Finalizers = append(nm.Finalizers, requestor.MaintenanceOPFinalizerName) + err = k8sClient.Update(ctx, nm) + Expect(err).NotTo(HaveOccurred()) + + Eventually(func() error { + nm := &maintenancev1alpha1.NodeMaintenance{} + err := k8sClient.Get(ctx, client.ObjectKey{Name: item.Name, Namespace: "default"}, nm) + if err != nil { + return err + } + if len(nm.Finalizers) == 0 { + return fmt.Errorf("missing status condition") + } + return nil + }).WithTimeout(10 * time.Second).WithPolling(1 * 500 * time.Millisecond).Should(Succeed()) } - nm.Status = status - err = k8sClient.Status().Update(ctx, nm) - Expect(err).NotTo(HaveOccurred()) + By("set node-maintenance(s) status to mimic maintenance-operator 'Ready' condition flow") + for _, item := range nms.Items { + nm := &maintenancev1alpha1.NodeMaintenance{} + err := k8sClient.Get(ctx, client.ObjectKey{Name: item.Name, Namespace: "default"}, nm) + Expect(err).NotTo(HaveOccurred()) + + status := 
maintenancev1alpha1.NodeMaintenanceStatus{ + Conditions: []v1.Condition{{ + Type: maintenancev1alpha1.ConditionTypeReady, + Status: v1.ConditionTrue, + Reason: maintenancev1alpha1.ConditionReasonReady, + Message: "Maintenance completed successfully", + LastTransitionTime: v1.NewTime(time.Now()), + }}, + } + nm.Status = status + err = k8sClient.Status().Update(ctx, nm) + Expect(err).NotTo(HaveOccurred()) + } + // verify status is updated + for _, item := range nms.Items { + Eventually(func() error { + nm := &maintenancev1alpha1.NodeMaintenance{} + err := k8sClient.Get(ctx, client.ObjectKey{Name: item.Name, Namespace: "default"}, nm) + if err != nil { + return err + } + if len(nm.Status.Conditions) == 0 { + return fmt.Errorf("missing status condition") + } + return nil + }).WithTimeout(10 * time.Second).WithPolling(1 * 500 * time.Millisecond).Should(Succeed()) + } + By("verify node is in post-maintennace-required' state") node := &corev1.Node{} Eventually(func() error { err := k8sClient.Get(ctx, client.ObjectKey{Name: "node1", Namespace: "default"}, node) @@ -1304,7 +1351,6 @@ var _ = Describe("UpgradeStateManager tests", func() { } return nil }).WithTimeout(10 * time.Second).WithPolling(1 * 500 * time.Millisecond).Should(Succeed()) - Expect(err).NotTo(HaveOccurred()) }) It("UpgradeStateManager should restart Orphaned pod in upgrade requestor mode", func() { @@ -1339,10 +1385,13 @@ var _ = Describe("UpgradeStateManager tests", func() { return nil }) stateManager.PodManager = &podManagerMock - Expect(stateManagerInterface.ApplyState(ctx, &clusterState, policy)).To(Succeed()) }) + It("UpgradeStateManager todo: with upgrade requestor mode", func() { + //TODO: check that legacy flow continues for cordon-required/pod-restart-required + }) + }) func nodeWithUpgradeState(state string) *corev1.Node { diff --git a/pkg/upgrade/requestor/client.go b/pkg/upgrade/requestor/client.go index d0ea279..a1ad0ac 100644 --- a/pkg/upgrade/requestor/client.go +++ 
b/pkg/upgrade/requestor/client.go @@ -19,9 +19,11 @@ package requestor import ( "context" "fmt" + "strings" //nolint:depguard maintenancev1alpha1 "github.com/Mellanox/maintenance-operator/api/v1alpha1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" @@ -73,6 +75,10 @@ func (m *UpgradeManagerImpl) CreateNodeMaintenance(ctx context.Context, nodeStat m.Log.V(consts.LogLevelInfo).Info("creating node maintenance", nodeState.Node.Name, nm.Name) err = m.K8sClient.Create(ctx, nm, &client.CreateOptions{}) if err != nil { + if k8serrors.IsAlreadyExists(err) { + m.Log.V(consts.LogLevelError).Error(err, "nodeMaintenance") + return nil + } return fmt.Errorf("failed to create node maintenance '%+v'. %v", nm, err) } @@ -80,19 +86,19 @@ func (m *UpgradeManagerImpl) CreateNodeMaintenance(ctx context.Context, nodeStat } // DeleteNodeMaintenance requests to delete nodeMaintenance obj -func (m *UpgradeManagerImpl) DeleteNodeMaintenance(nodeState *base.NodeUpgradeState) error { +func (m *UpgradeManagerImpl) DeleteNodeMaintenance(ctx context.Context, nodeState *base.NodeUpgradeState) error { curObj, err := validateNodeMaintenance(nodeState) if err != nil { return err } nm := &maintenancev1alpha1.NodeMaintenance{} - err = m.K8sClient.Get(context.TODO(), types.NamespacedName{Name: curObj.Name, Namespace: curObj.Namespace}, + err = m.K8sClient.Get(ctx, types.NamespacedName{Name: curObj.Name, Namespace: curObj.Namespace}, nm, &client.GetOptions{}) if err != nil { return err } // send deletion request assuming maintenance OP will handle actual obj deletion - err = m.K8sClient.Delete(context.TODO(), nm, &client.DeleteOptions{}) + err = m.K8sClient.Delete(ctx, nm) if err != nil { return err } @@ -101,7 +107,8 @@ func (m *UpgradeManagerImpl) DeleteNodeMaintenance(nodeState *base.NodeUpgradeSt // TODO func getNodeMaintenanceName(nodeState *base.NodeUpgradeState) 
string { - return nodeState.Node.Name // TODO: Need to set naming convention + // TODO: Need to consider naming convention + return fmt.Sprintf("node-maintenance-%s", strings.TrimPrefix(nodeState.Node.Name, "node")) } func validateNodeMaintenance(nodeState *base.NodeUpgradeState) (*maintenancev1alpha1.NodeMaintenance, error) { diff --git a/pkg/upgrade/requestor/controller.go b/pkg/upgrade/requestor/controller.go index 8eb4717..9a809a9 100644 --- a/pkg/upgrade/requestor/controller.go +++ b/pkg/upgrade/requestor/controller.go @@ -17,14 +17,17 @@ limitations under the License. package requestor import ( + "cmp" "context" - "fmt" "reflect" + "slices" //nolint:depguard maintenancev1alpha1 "github.com/Mellanox/maintenance-operator/api/v1alpha1" "github.com/go-logr/logr" + k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" @@ -44,7 +47,8 @@ var Scheme = runtime.NewScheme() // NodeMaintenanceConditionReady designates maintenance operation completed for a designated node type NodeMaintenanceCondition struct { // Node maintenance name - Name string + Name string + NodeName string // Node maintenance condition reason Reason string } @@ -65,61 +69,116 @@ func init() { //nolint:dupl func (r *NodeMaintenanceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { reqLog := log.FromContext(ctx) - reqLog.V(consts.LogLevelInfo).Info("reconcile NodeMaintenance request") + reqLog.V(consts.LogLevelDebug).Info("reconcile NodeMaintenance request", req.Name, req.Namespace) + + // get NodeMaintenance object nm := &maintenancev1alpha1.NodeMaintenance{} if err := r.Get(ctx, req.NamespacedName, nm); err != nil { + if k8serrors.IsNotFound(err) { + reqLog.Info("NodeMaintenance object not found, nothing to do.") + return reconcile.Result{}, nil + } return 
reconcile.Result{}, err } - if !nm.GetDeletionTimestamp().IsZero() { - // object is being deleted - // TODO: wait for object deletion to update node uncodron is finished - return reconcile.Result{}, nil - } - cond := meta.FindStatusCondition(nm.Status.Conditions, maintenancev1alpha1.ConditionReasonReady) if cond != nil { - reqLog.V(consts.LogLevelInfo).Info("node maintenance operation completed", nm.Spec.NodeName, cond.Reason) + if len(nm.Finalizers) == 0 || !nm.GetDeletionTimestamp().IsZero() { + // object is being deleted + // TODO: wait for object deletion to update node uncordon is finished + reqLog.V(consts.LogLevelDebug).Info("node maintenance is beening deleted", nm.Name, nm.Spec.NodeName) + r.StatusCh <- NodeMaintenanceCondition{Name: nm.Name, NodeName: nm.Spec.NodeName, Reason: "deleting"} + return reconcile.Result{}, nil + } + reqLog.V(consts.LogLevelDebug).Info("node maintenance operation completed", nm.Spec.NodeName, cond.Reason) if cond.Reason == maintenancev1alpha1.ConditionReasonReady || cond.Reason == maintenancev1alpha1.ConditionReasonFailedMaintenance { //TODO: Add a channel to push node name + ready condition state // Upgrade manager should wait for channel and update to UpgradeStatePodRestartRequired - r.StatusCh <- NodeMaintenanceCondition{Name: nm.Spec.NodeName, Reason: cond.Reason} + r.StatusCh <- NodeMaintenanceCondition{Name: nm.Name, NodeName: nm.Spec.NodeName, Reason: cond.Reason} + return reconcile.Result{}, nil } } + if !nm.GetDeletionTimestamp().IsZero() { + // object is being deleted + // TODO: wait for object deletion to update node uncordon is finished + reqLog.V(consts.LogLevelDebug).Info("node maintenance is beening deleted", nm.Name, nm.Spec.NodeName) + r.StatusCh <- NodeMaintenanceCondition{Name: nm.Name, NodeName: nm.Spec.NodeName, Reason: "deleting"} + return reconcile.Result{}, nil + } + + reqLog.V(consts.LogLevelWarning).Info("nothing todo, check why", nm.Name, nm.Spec.NodeName) + return reconcile.Result{}, nil } func (r 
*NodeMaintenanceReconciler) SetupWithManager(mgr ctrl.Manager, log logr.Logger) error { log.V(consts.LogLevelInfo).Info("Started nodeMaintenance status manger") - statusPredicate := predicate.Funcs{ - // Don't reconcile on create - CreateFunc: func(e event.CreateEvent) bool { - _ = e - return false - }, - DeleteFunc: func(e event.DeleteEvent) bool { - // TODO: do we need to watch for deletion? - log.V(consts.LogLevelInfo).Info("Delete event detected, triggering reconcile", - e.Object.GetName(), e.Object.GetNamespace()) - return true - }, - UpdateFunc: func(e event.UpdateEvent) bool { - oldObj, okOld := e.ObjectOld.DeepCopyObject().(*maintenancev1alpha1.NodeMaintenance) - newObj, okNew := e.ObjectNew.DeepCopyObject().(*maintenancev1alpha1.NodeMaintenance) - - if !okOld || !okNew { - log.V(consts.LogLevelError).Error(fmt.Errorf("type assertion failed"), "unable to cast object to NodeMaintenance") - return false - } - // Reconcile only when the Status field changes - return !reflect.DeepEqual(oldObj.Status, newObj.Status) && oldObj.Generation == newObj.Generation - }, - } return ctrl.NewControllerManagedBy(mgr). - For(&maintenancev1alpha1.NodeMaintenance{}, builder.WithPredicates(statusPredicate)). + For(&maintenancev1alpha1.NodeMaintenance{}, builder.WithPredicates(NewConditionChangedPredicate(log))). Named(MaintenanceOPControllerName). Complete(r) } + +// NewConditionChangedPredicate creates a new ConditionChangedPredicate +func NewConditionChangedPredicate(log logr.Logger) ConditionChangedPredicate { + return ConditionChangedPredicate{ + Funcs: predicate.Funcs{}, + log: log, + } +} + +type ConditionChangedPredicate struct { + predicate.Funcs + + log logr.Logger +} + +// Update implements Predicate. 
+func (p ConditionChangedPredicate) Update(e event.TypedUpdateEvent[client.Object]) bool { + p.log.V(consts.LogLevelDebug).Info("ConditionChangedPredicate Update event") + + if e.ObjectOld == nil { + p.log.Error(nil, "old object is nil in update event, ignoring event.") + return false + } + if e.ObjectNew == nil { + p.log.Error(nil, "new object is nil in update event, ignoring event.") + return false + } + + oldO, ok := e.ObjectOld.(*maintenancev1alpha1.NodeMaintenance) + if !ok { + p.log.Error(nil, "failed to cast old object to NodeMaintenance in update event, ignoring event.") + return false + } + + newO, ok := e.ObjectNew.(*maintenancev1alpha1.NodeMaintenance) + if !ok { + p.log.Error(nil, "failed to cast new object to NodeMaintenance in update event, ignoring event.") + return false + } + + cmpByType := func(a, b metav1.Condition) int { + return cmp.Compare(a.Type, b.Type) + } + + // sort old and new obj.Status.Conditions so they can be compared using DeepEqual + slices.SortFunc(oldO.Status.Conditions, cmpByType) + slices.SortFunc(newO.Status.Conditions, cmpByType) + + condChanged := !reflect.DeepEqual(oldO.Status.Conditions, newO.Status.Conditions) + // Check if the object is marked for deletion + deleting := len(newO.Finalizers) == 0 && len(oldO.Finalizers) > 0 + deleting = deleting || !newO.DeletionTimestamp.IsZero() + enqueue := condChanged || deleting + + p.log.V(consts.LogLevelDebug).Info("update event for NodeMaintenance", + "name", newO.Name, "namespace", newO.Namespace, + "condition-changed", condChanged, + "deleting", deleting, "enqueue-request", enqueue) + + return enqueue +} diff --git a/pkg/upgrade/requestor/controller_test.go b/pkg/upgrade/requestor/controller_test.go index 571ef61..1658725 100644 --- a/pkg/upgrade/requestor/controller_test.go +++ b/pkg/upgrade/requestor/controller_test.go @@ -27,6 +27,7 @@ import ( corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + 
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -35,6 +36,8 @@ import ( "github.com/NVIDIA/k8s-operator-libs/pkg/upgrade/requestor" ) +var maxCreatedNodes = 5 + var _ = Describe("NodeMaintenance Controller", func() { var ( ctx context.Context @@ -59,30 +62,6 @@ var _ = Describe("NodeMaintenance Controller", func() { BeforeEach(func() { ctx = context.TODO() id = randSeq(5) - // create node objects for scheduler to use - By("create test nodes") - upgradeRequired := []*base.NodeUpgradeState{ - {Node: NewNode("node1").WithUpgradeState(base.UpgradeStateUpgradeRequired).Node}, - {Node: NewNode("node2").WithUpgradeState(base.UpgradeStateUpgradeRequired).Node}, - {Node: NewNode("node3").WithUpgradeState(base.UpgradeStateUpgradeRequired).Node}, - {Node: NewNode("node4").WithUpgradeState(base.UpgradeStateUpgradeRequired).Node}, - } - upgradeUnknown := []*base.NodeUpgradeState{ - {Node: NewNode("node5").WithUpgradeState(base.UpgradeStateUnknown).Node}, - } - - currentClusterState = &base.ClusterUpgradeState{ - NodeStates: map[string][]*base.NodeUpgradeState{ - base.UpgradeStateUpgradeRequired: upgradeRequired, - base.UpgradeStateUnknown: upgradeUnknown, - }, - } - - _ = NewNode("node1").WithUpgradeState(base.UpgradeStateUpgradeRequired).Create() - _ = NewNode("node2").WithUpgradeState(base.UpgradeStateUpgradeRequired).Create() - _ = NewNode("node3").WithUpgradeState(base.UpgradeStateUpgradeRequired).Create() - _ = NewNode("node4").WithUpgradeState(base.UpgradeStateUpgradeRequired).Create() - _ = NewNode("node5").WithUpgradeState(base.UpgradeStateUpgradeRequired).Create() // setup reconciler with manager By("create and start requstor upgrade manager") @@ -99,7 +78,14 @@ var _ = Describe("NodeMaintenance Controller", func() { AfterEach(func() { By("Cleanup NodeMaintenance resources") - for _, nodeState := range currentClusterState.NodeStates[base.UpgradeStateUpgradeRequired] { + var nmObjects 
[]*base.NodeUpgradeState + for _, nodesState := range currentClusterState.NodeStates { + for _, nodeState := range nodesState { + nmObjects = append(nmObjects, nodeState) + _ = nodeState + } + } + for _, nodeState := range nmObjects { err := k8sClient.Get(ctx, client.ObjectKeyFromObject(nodeState.Node), nodeState.Node) if err == nil { // delete obj @@ -112,20 +98,48 @@ var _ = Describe("NodeMaintenance Controller", func() { if nodeState.NodeMaintenance != nil { nm := &maintenancev1alpha1.NodeMaintenance{} runtime.DefaultUnstructuredConverter.FromUnstructured(nodeState.NodeMaintenance.Object, nm) - err = k8sClient.Delete(ctx, nm) + err = removeFinalizersOrDelete(ctx, nm) + //err = k8sClient.Delete(ctx, nm) if err != nil && k8serrors.IsNotFound(err) { err = nil Expect(err).NotTo(HaveOccurred()) } } } - cancel() + nms := &maintenancev1alpha1.NodeMaintenanceList{} + err := k8sClient.List(ctx, nms) + Expect(err).NotTo(HaveOccurred()) + + for _, item := range nms.Items { + err = removeFinalizersOrDelete(ctx, &item) + Expect(err).NotTo(HaveOccurred()) + } + + nodes := &corev1.NodeList{} + err = k8sClient.List(ctx, nodes) + Expect(err).NotTo(HaveOccurred()) + + for _, item := range nodes.Items { + err = k8sClient.Delete(ctx, &item) + if err != nil && k8serrors.IsNotFound(err) { + err = nil + } + Expect(err).NotTo(HaveOccurred()) + } + time.Sleep(1 * time.Second) + + DeferCleanup(func() { + By("Shut down controller manager") + cancel() + }) }) It("verify created node maintenance(s)", func() { + By("create test nodes") + currentClusterState = newClusterState(false, base.UpgradeStateUpgradeRequired) + err := reqMngr.ProcessUpgradeRequiredNodes(ctx, currentClusterState, policy) Expect(err).NotTo(HaveOccurred()) - nms := &maintenancev1alpha1.NodeMaintenanceList{} Eventually(func() bool { k8sClient.List(ctx, nms) @@ -134,6 +148,14 @@ var _ = Describe("NodeMaintenance Controller", func() { }) It("node maintenance status condition check", func() { + By("create test nodes") + 
currentClusterState = newClusterState(false, base.UpgradeStateUpgradeRequired) + nodes := &corev1.NodeList{} + Eventually(func() bool { + k8sClient.List(ctx, nodes) + return len(nodes.Items) == maxCreatedNodes + }).WithTimeout(10 * time.Second).WithPolling(1 * 500 * time.Millisecond).Should(BeTrue()) + err := reqMngr.ProcessUpgradeRequiredNodes(ctx, currentClusterState, policy) Expect(err).NotTo(HaveOccurred()) @@ -143,6 +165,14 @@ var _ = Describe("NodeMaintenance Controller", func() { return len(nms.Items) == len(currentClusterState.NodeStates[base.UpgradeStateUpgradeRequired]) }).WithTimeout(10 * time.Second).WithPolling(1 * 500 * time.Millisecond).Should(BeTrue()) + for _, item := range nms.Items { + nm := &maintenancev1alpha1.NodeMaintenance{} + Eventually(func() bool { + k8sClient.Get(ctx, client.ObjectKey{Name: item.Name, Namespace: "default"}, nm) + return len(nm.Finalizers) == 0 + }).WithTimeout(10 * time.Second).WithPolling(1 * 500 * time.Millisecond).Should(BeTrue()) + } + status := maintenancev1alpha1.NodeMaintenanceStatus{ Conditions: []metav1.Condition{{ Type: maintenancev1alpha1.ConditionTypeReady, @@ -152,28 +182,38 @@ var _ = Describe("NodeMaintenance Controller", func() { LastTransitionTime: metav1.NewTime(time.Now()), }}, } - + By("set node-maintenance(s) finalizer") for _, item := range nms.Items { - item.Status = status - err = reqMngr.K8sClient.Status().Update(ctx, &item) + nm := &maintenancev1alpha1.NodeMaintenance{} + err := k8sClient.Get(ctx, client.ObjectKey{Name: item.Name, Namespace: "default"}, nm) + Expect(err).NotTo(HaveOccurred()) - Eventually(func() error { - nm := &maintenancev1alpha1.NodeMaintenance{} - err := k8sClient.Get(ctx, client.ObjectKey{Name: item.Name, Namespace: "default"}, nm) - if err != nil { - return err - } - if len(nm.Status.Conditions) == 0 { - return fmt.Errorf("missing status condition") - } - return nil + nm.Finalizers = append(nm.Finalizers, requestor.MaintenanceOPFinalizerName) + err = 
reqMngr.K8sClient.Update(ctx, nm) + Expect(err).NotTo(HaveOccurred()) + condition := func(nm *maintenancev1alpha1.NodeMaintenance) bool { + return len(nm.Finalizers) == 0 + } + Eventually(verifyNodeMaintenanceStatus(ctx, item.Name, condition)). + WithTimeout(10 * time.Second).WithPolling(1 * 500 * time.Millisecond).Should(Succeed()) + } - }).WithTimeout(10 * time.Second).WithPolling(1 * 500 * time.Millisecond).Should(Succeed()) + for _, item := range nms.Items { + nm := &maintenancev1alpha1.NodeMaintenance{} + err := k8sClient.Get(ctx, client.ObjectKey{Name: item.Name, Namespace: "default"}, nm) + Expect(err).NotTo(HaveOccurred()) + nm.Status = status + err = reqMngr.K8sClient.Status().Update(ctx, nm) Expect(err).NotTo(HaveOccurred()) + conditionFn := func(nm *maintenancev1alpha1.NodeMaintenance) bool { + return len(nm.Status.Conditions) == 0 + } + Eventually(verifyNodeMaintenanceStatus(ctx, item.Name, conditionFn)). + WithTimeout(10 * time.Second).WithPolling(1 * 500 * time.Millisecond).Should(Succeed()) node := &corev1.Node{} Eventually(func() error { - err := k8sClient.Get(ctx, client.ObjectKey{Name: item.Name, Namespace: "default"}, node) + err := k8sClient.Get(ctx, client.ObjectKey{Name: item.Spec.NodeName, Namespace: "default"}, node) if err != nil { return err } @@ -183,8 +223,100 @@ var _ = Describe("NodeMaintenance Controller", func() { return nil }).WithTimeout(10 * time.Second).WithPolling(1 * 500 * time.Millisecond).Should(Succeed()) Expect(err).NotTo(HaveOccurred()) + //Eventually(verifyNodeState(ctx, item.Spec.NodeName, base.UpgradeStatePostMaintenanceRequired)). 
+ // WithTimeout(10 * time.Second).WithPolling(1 * 500 * time.Millisecond).Should(Succeed()) + } + }) + + It("node maintenance watch deleted objects", func() { + var err error + By("create test nodes") + currentClusterState = newClusterState(true, base.UpgradeStateUncordonRequired) + nms := &maintenancev1alpha1.NodeMaintenanceList{} + Eventually(func() bool { + k8sClient.List(ctx, nms) + return len(nms.Items) == len(currentClusterState.NodeStates[base.UpgradeStateUncordonRequired]) + }).WithTimeout(10 * time.Second).WithPolling(1 * 500 * time.Millisecond).Should(BeTrue()) + + err = reqMngr.ProcessUncordonRequiredNodes(ctx, currentClusterState) + Expect(err).NotTo(HaveOccurred()) + + nms = &maintenancev1alpha1.NodeMaintenanceList{} + Eventually(func() bool { + k8sClient.List(ctx, nms) + return len(nms.Items) == len(currentClusterState.NodeStates[base.UpgradeStateUncordonRequired]) + }).WithTimeout(10 * time.Second).WithPolling(1 * 500 * time.Millisecond).Should(BeTrue()) + By("remove node-maintenance finalizers") + for _, item := range nms.Items { + nm := &maintenancev1alpha1.NodeMaintenance{} + Expect(len(item.Finalizers)).To(Equal(1)) + err := k8sClient.Get(ctx, client.ObjectKey{Name: item.Name, Namespace: "default"}, nm) + Expect(err).NotTo(HaveOccurred()) + nm.Finalizers = []string{} + err = k8sClient.Update(ctx, nm) + Expect(err).NotTo(HaveOccurred()) } + for _, item := range nms.Items { + Eventually(verifyNodeState(ctx, item.Spec.NodeName, base.UpgradeStateDone)). 
+ WithTimeout(10 * time.Second).WithPolling(1 * 500 * time.Millisecond).Should(Succeed()) + } }) }) + +func newClusterState(shouldCreate bool, state string) *base.ClusterUpgradeState { + var upgradeState []*base.NodeUpgradeState + + //integers := []rune("123456789") + for i := 1; i < maxCreatedNodes; i++ { + //suffix, _ := strconv.Atoi(string(integers[rand.Intn(len(integers))])) + upgradeState = append(upgradeState, &base.NodeUpgradeState{ + Node: NewNode(fmt.Sprintf("node%d", i)).WithUpgradeState(state).Create(), + NodeMaintenance: newNodeMaintenance(shouldCreate, i), + }) + } + upgradeUnknown := []*base.NodeUpgradeState{ + {Node: NewNode(fmt.Sprintf("node%d", 10)).WithUpgradeState(base.UpgradeStateUnknown).Create()}, + } + + return &base.ClusterUpgradeState{ + NodeStates: map[string][]*base.NodeUpgradeState{ + state: upgradeState, + base.UpgradeStateUnknown: upgradeUnknown, + }, + } +} + +func newNodeMaintenance(shouldCreate bool, id int) *unstructured.Unstructured { + if shouldCreate { + return NewNodeMaintenance(fmt.Sprintf("node-maintenance-%d", id), fmt.Sprintf("node%d", id)).Create() + } + return nil +} + +func verifyNodeState(ctx context.Context, name, state string) error { + node := &corev1.Node{} + err := k8sClient.Get(ctx, client.ObjectKey{Name: name, Namespace: "default"}, node) + if err != nil { + return err + } + if getNodeUpgradeState(node) != state { + return fmt.Errorf("missing status condition") + } + return nil +} + +func verifyNodeMaintenanceStatus(ctx context.Context, name string, + condition func(nm *maintenancev1alpha1.NodeMaintenance) bool) error { + nm := &maintenancev1alpha1.NodeMaintenance{} + err := k8sClient.Get(ctx, client.ObjectKey{Name: name, Namespace: "default"}, nm) + if err != nil { + return err + } + if condition(nm) { + return fmt.Errorf("missing status condition") + } + return nil + +} diff --git a/pkg/upgrade/requestor/requestor_suite_test.go b/pkg/upgrade/requestor/requestor_suite_test.go index c0ce0a4..04eae20 100644 --- 
a/pkg/upgrade/requestor/requestor_suite_test.go +++ b/pkg/upgrade/requestor/requestor_suite_test.go @@ -20,6 +20,7 @@ import ( "context" "path/filepath" "testing" + "time" "github.com/go-logr/logr" . "github.com/onsi/ginkgo/v2" @@ -31,7 +32,10 @@ import ( maintenancev1alpha1 "github.com/Mellanox/maintenance-operator/api/v1alpha1" corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" @@ -98,6 +102,7 @@ var _ = BeforeSuite(func() { var _ = AfterSuite(func() { By("tearing down the test environment") err := testEnv.Stop() + log.Error(err, "failed test") Expect(err).NotTo(HaveOccurred()) }) @@ -173,6 +178,65 @@ func (n Node) Create() *corev1.Node { return node } +type NodeMaintenance struct { + *maintenancev1alpha1.NodeMaintenance +} + +func NewNodeMaintenance(name, nodeName string) NodeMaintenance { + status := maintenancev1alpha1.NodeMaintenanceStatus{ + Conditions: []metav1.Condition{{ + Type: maintenancev1alpha1.ConditionTypeReady, + Status: metav1.ConditionTrue, + Reason: maintenancev1alpha1.ConditionReasonReady, + Message: "Maintenance completed successfully", + LastTransitionTime: metav1.NewTime(time.Now()), + }}, + } + nm := &maintenancev1alpha1.NodeMaintenance{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "default", + Labels: map[string]string{"hello": "world"}, + Finalizers: []string{requestor.MaintenanceOPFinalizerName}, + }, + Spec: maintenancev1alpha1.NodeMaintenanceSpec{ + RequestorID: requestor.MaintenanceOPRequestorID, + NodeName: nodeName, + }, + Status: status, + } + Expect(nm.Status.Conditions).NotTo(BeNil()) + return NodeMaintenance{nm} +} + +func (n NodeMaintenance) Create() *unstructured.Unstructured { + nm := n.NodeMaintenance + err := k8sClient.Create(context.TODO(), nm) + 
Expect(err).NotTo(HaveOccurred()) + createdObjects = append(createdObjects, nm) + + obj, _ := runtime.DefaultUnstructuredConverter.ToUnstructured(nm) + return &unstructured.Unstructured{Object: obj} +} + +func removeFinalizersOrDelete(ctx context.Context, nm *maintenancev1alpha1.NodeMaintenance) error { + var err error + instanceFinalizers := nm.GetFinalizers() + if len(instanceFinalizers) == 0 { + err = k8sClient.Delete(ctx, nm) + return err + } + + nm.SetFinalizers([]string{}) + err = k8sClient.Update(ctx, nm) + if err != nil && k8serrors.IsNotFound(err) { + err = nil + Expect(err).NotTo(HaveOccurred()) + } + + return err +} + func getNodeUpgradeState(node *corev1.Node) string { return node.Labels[base.GetUpgradeStateLabelKey()] } diff --git a/pkg/upgrade/requestor/upgrade_requestor.go b/pkg/upgrade/requestor/upgrade_requestor.go index 0a0c195..8de10ac 100644 --- a/pkg/upgrade/requestor/upgrade_requestor.go +++ b/pkg/upgrade/requestor/upgrade_requestor.go @@ -38,6 +38,7 @@ var ( // UseMaintenanceOperator enables requestor updrade mode UseMaintenanceOperator bool MaintenanceOPRequestorID = "nvidia.operator.com" + MaintenanceOPFinalizerName = "maintenance.nvidia.com/finalizer" MaintenanceOPRequestorNS string MaintenanceOPPodEvictionFilter *string MaintenanceOPControllerName = "node-maintenance" @@ -106,7 +107,7 @@ func (m *UpgradeManagerImpl) Start(ctx context.Context) { go func() { err := m.WatchNodeMaintenanceConditionChange(ctx) if err != nil { - m.Log.V(consts.LogLevelError).Error(err, "failed to update node condition") + m.Log.V(consts.LogLevelWarning).Error(err, "failed to update node condition") } }() } @@ -122,7 +123,7 @@ func (m *UpgradeManagerImpl) ProcessUpgradeRequiredNodes( for _, nodeState := range currentClusterState.NodeStates[base.UpgradeStateUpgradeRequired] { err := m.CreateNodeMaintenance(ctx, nodeState) if err != nil { - m.Log.V(consts.LogLevelError).Error(err, "Failed to create nodeMaintenance") + m.Log.V(consts.LogLevelError).Error(err, 
"failed to create nodeMaintenance") return err } // update node state to 'node-maintenance-required' @@ -135,9 +136,27 @@ func (m *UpgradeManagerImpl) ProcessUpgradeRequiredNodes( return nil } +func (m *UpgradeManagerImpl) ProcessUncordonRequiredNodes( + ctx context.Context, currentClusterState *base.ClusterUpgradeState) error { + m.Log.V(consts.LogLevelInfo).Info("ProcessUncordonRequiredNodes") + + for _, nodeState := range currentClusterState.NodeStates[base.UpgradeStateUncordonRequired] { + m.Log.V(consts.LogLevelDebug).Info("deleting node maintenance", + nodeState.NodeMaintenance.GetName(), nodeState.NodeMaintenance.GetNamespace()) + err := m.DeleteNodeMaintenance(ctx, nodeState) + if err != nil { + m.Log.V(consts.LogLevelWarning).Error( + err, "Node uncordon failed", "node", nodeState.Node) + return err + } + } + return nil +} + // WatchNodeMaintenanceConditionChange waits for nodeMaintenance status change by reconciler // in case of status change, fetch referenced node and update post maintenance state func (m *UpgradeManagerImpl) WatchNodeMaintenanceConditionChange(ctx context.Context) error { + m.Log.V(consts.LogLevelInfo).Info("starting node-maintenance condition change watcher") for { select { case <-ctx.Done(): @@ -150,18 +169,31 @@ func (m *UpgradeManagerImpl) WatchNodeMaintenanceConditionChange(ctx context.Con // Channel is closed return nil } + node, err := m.NodeUpgradeStateProvider.GetNode(ctx, cond.NodeName) + if err != nil { + m.Log.V(consts.LogLevelError).Error(err, "failed to find node") + continue + } + if cond.Reason == "deleting" { + //TODO: add handler func handleNodeMaintenanceDeletion(node) + upgradeStateLabel := base.GetUpgradeStateLabelKey() + m.Log.V(consts.LogLevelInfo).Info("handle NodeMaintenance deletion", node.Name, node.Labels[upgradeStateLabel]) + if node.Labels[upgradeStateLabel] == base.UpgradeStateUncordonRequired { + err = m.NodeUpgradeStateProvider.ChangeNodeUpgradeState(ctx, node, base.UpgradeStateDone) + if err != nil { + 
m.Log.V(consts.LogLevelError).Error(err, "failed to update node state") + continue + } + } + } // verify node maintenance operation completed // node should enter post maintenance state if cond.Reason == maintenancev1alpha1.ConditionReasonReady { - node, err := m.NodeUpgradeStateProvider.GetNode(ctx, cond.Name) - if err != nil { - return fmt.Errorf("failed to find node. %v", err) - } + m.Log.V(consts.LogLevelDebug).Info("handle NodeMaintenance update", node.Name, cond.NodeName) // update node state to 'post-maintenance-required' err = m.NodeUpgradeStateProvider.ChangeNodeUpgradeState(ctx, node, base.UpgradeStatePostMaintenanceRequired) if err != nil { m.Log.V(consts.LogLevelError).Error(err, "failed to update node state") - return fmt.Errorf("failed to update node state. %v", err) } } }