diff --git a/modules/kubernetes/functions/k8s_utils.py b/modules/kubernetes/functions/k8s_utils.py index 58204b7..4a00447 100644 --- a/modules/kubernetes/functions/k8s_utils.py +++ b/modules/kubernetes/functions/k8s_utils.py @@ -9,6 +9,7 @@ MIRROR_POD_ANNOTATION_KEY = "kubernetes.io/config.mirror" CONTROLLER_KIND_DAEMON_SET = "DaemonSet" + def cordon_node(api, node_name): """Marks the specified node as unschedulable, which means that no new pods can be launched on the node by the Kubernetes scheduler. @@ -39,37 +40,47 @@ def remove_all_pods(api, node_name, poll=5): def pod_is_evictable(pod): if pod.metadata.annotations is not None and pod.metadata.annotations.get(MIRROR_POD_ANNOTATION_KEY): - logger.info("Skipping mirror pod {}/{}".format(pod.metadata.namespace, pod.metadata.name)) + logger.info( + "Skipping mirror pod {}/{}".format(pod.metadata.namespace, pod.metadata.name)) return False if pod.metadata.owner_references is None: return True for ref in pod.metadata.owner_references: if ref.controller is not None and ref.controller: if ref.kind == CONTROLLER_KIND_DAEMON_SET: - logger.info("Skipping DaemonSet {}/{}".format(pod.metadata.namespace, pod.metadata.name)) + logger.info( + "Skipping DaemonSet {}/{}".format(pod.metadata.namespace, pod.metadata.name)) return False return True def get_evictable_pods(api, node_name): field_selector = 'spec.nodeName=' + node_name - pods = api.list_pod_for_all_namespaces(watch=False, field_selector=field_selector) + pods = api.list_pod_for_all_namespaces( + watch=False, field_selector=field_selector) return [pod for pod in pods.items if pod_is_evictable(pod)] def evict_until_completed(api, pods, poll): + timeout = time.time() + 60 # Allow 1 minutes for pods to evict pending = pods while True: pending = evict_pods(api, pending) if (len(pending)) <= 0: return + elif time.time() > timeout: + logger.error( + "Timeout waiting for pods to evict, deleting remaining pods...") + delete_pods(api, pending, force=False) + return time.sleep(poll) def evict_pods(api, pods): remaining = [] for pod in pods: - logger.info('Evicting pod {} in namespace {}'.format(pod.metadata.name, pod.metadata.namespace)) + logger.info('Evicting pod {} in namespace {}'.format( + pod.metadata.name, pod.metadata.namespace)) body = { 'apiVersion': 'policy/v1beta1', 'kind': 'Eviction', @@ -80,28 +91,83 @@ def evict_pods(api, pods): } } try: - api.create_namespaced_pod_eviction(pod.metadata.name, pod.metadata.namespace, body) + api.create_namespaced_pod_eviction( + pod.metadata.name, pod.metadata.namespace, body) except ApiException as err: if err.status == 429: remaining.append(pod) - logger.warning("Pod {}/{} could not be evicted due to disruption budget. Will retry.".format(pod.metadata.namespace, pod.metadata.name)) + logger.warning("Failed to evict pod {}/{} due to disruption budget. Will retry.".format( + pod.metadata.namespace, pod.metadata.name)) else: - logger.exception("Unexpected error adding eviction for pod {}/{}".format(pod.metadata.namespace, pod.metadata.name)) + logger.exception("Unable to evict pod {}/{} due to \"{}\"".format( + pod.metadata.namespace, pod.metadata.name, err.reason)) except: - logger.exception("Unexpected error adding eviction for pod {}/{}".format(pod.metadata.namespace, pod.metadata.name)) + logger.exception("Unexpected error adding eviction for pod {}/{}, \"{}\"".format( + pod.metadata.namespace, pod.metadata.name, err)) return remaining +def delete_pods(api, pods, force=False): + msg_prefix = "Force deleting" if force else "Deleting" + for pod in pods: + logger.info("{} pod {}/{}".format( + msg_prefix, pod.metadata.namespace, pod.metadata.name)) + try: + body = {'gracePeriodSeconds': 0} if force else {} + api.delete_namespaced_pod( + pod.metadata.name, pod.metadata.namespace, body=body) + logger.info( + "Pod {}/{} deleted successfully".format(pod.metadata.namespace, pod.metadata.name)) + except ApiException as err: + if err.status == 404: + logger.warning("Pod {}/{} was not found. It may have been deleted by another process.".format( + pod.metadata.namespace, pod.metadata.name)) + except: + logger.exception( + "Unexpected error deleting pod {}/{}".format(pod.metadata.namespace, pod.metadata.name)) + + def wait_until_empty(api, node_name, poll): logger.info("Waiting for evictions to complete") + timeout = time.time() + 180 # Allow 3 minutes for pods to be terminated while True: pods = get_evictable_pods(api, node_name) if len(pods) <= 0: logger.info("All pods evicted successfully") return - logger.debug("Still waiting for deletion of the following pods: {}".format(", ".join(map(lambda pod: pod.metadata.namespace + "/" + pod.metadata.name, pods)))) + logger.info("Waiting for pod termination...") + logger.debug("Still waiting for deletion of the following pods: {}".format( + ", ".join(map(lambda pod: pod.metadata.namespace + "/" + pod.metadata.name, pods)))) + if time.time() > timeout: + logger.error( + "Timeout waiting for pods to be terminated, force deleting remaining pods") + delete_pods(api, pods, force=True) + taint_non_graceful_shutdown(api, node_name) + return time.sleep(poll) + +def taint_non_graceful_shutdown(api, node_name): + """Taints the specified node with the non-graceful shutdown taint, which indicates that the node""" + # Ref: https://kubernetes.io/blog/2022/12/16/kubernetes-1-26-non-graceful-node-shutdown-beta/ + patch_body = { + "spec": { + "taints": [{ + "effect": "NoSchedule", + "key": "node.kubernetes.io/out-of-service", + "value": "nodeshutdown" + }] + } + } + try: + api.patch_node(node_name, patch_body) + logger.info( + "Node {} has been tainted with the non-graceful shutdown taint".format(node_name)) + except: + logger.exception( + "There was an error tainting the node {} with the non-graceful shutdown taint".format(node_name)) + + def master_ready(api, asg_client, lb_client, ec2_client, asg_name, node_name, node_role, timeout): """Determines whether the K8s master node are ready""" @@ -114,24 +180,28 @@ def master_ready(api, asg_client, lb_client, ec2_client, asg_name, node_name, no )['AutoScalingGroups'][0] asg_instances = asg_info['Instances'] asg_desired_capacity = asg_info['DesiredCapacity'] - asg_remain_instances = [instance['InstanceId'] for instance in asg_instances if 'Terminating' not in instance['LifecycleState']] + asg_remain_instances = [instance['InstanceId'] + for instance in asg_instances if 'Terminating' not in instance['LifecycleState']] # The master asg contain multiple nodes if asg_desired_capacity > 1: return True - + # There is only one node in the master asg, waiting for the node bind to lb waiting_timeout = time.time() + timeout - lb_name = asg_client.describe_load_balancers(AutoScalingGroupName=asg_name)['LoadBalancers'][0]['LoadBalancerName'] + lb_name = asg_client.describe_load_balancers(AutoScalingGroupName=asg_name)[ + 'LoadBalancers'][0]['LoadBalancerName'] while True: if time.time() > waiting_timeout: - logger.exception('timeout waiting for master node {} ready'.format(node_name)) - return False + logger.exception( + 'timeout waiting for master node {} ready'.format(node_name)) + return False try: - lb_instances = lb_client.describe_load_balancers(LoadBalancerNames = [lb_name])['LoadBalancerDescriptions'][0]['Instances'] + lb_instances = lb_client.describe_load_balancers(LoadBalancerNames=[lb_name])[ + 'LoadBalancerDescriptions'][0]['Instances'] for target_instance in lb_instances: if target_instance['InstanceId'] in asg_remain_instances: @@ -146,19 +216,23 @@ def master_ready(api, asg_client, lb_client, ec2_client, asg_name, node_name, no )['InstanceStates'][0]['State'] if target_instance_state == 'InService': - master_instance = ec2_client.describe_instances(InstanceIds=[target_instance['InstanceId']])['Reservations'][0]['Instances'][0] + master_instance = ec2_client.describe_instances( + InstanceIds=[target_instance['InstanceId']])['Reservations'][0]['Instances'][0] node_name = master_instance['PrivateDnsName'] instance_lifecycle = 'Ec2Spot' if 'InstanceLifecycle' in master_instance else 'OnDemand' - append_node_labels(api, node_name, node_role, instance_lifecycle) + append_node_labels( + api, node_name, node_role, instance_lifecycle) return True time.sleep(10) except: - logger.exception('There was an error waiting the node {} ready'.format(node_name)) + logger.exception( + 'There was an error waiting the node {} ready'.format(node_name)) return False + def node_ready(api, node_name, timeout): """Determines whether the specified node is ready.""" @@ -167,7 +241,8 @@ def node_ready(api, node_name, timeout): while True: if time.time() > waiting_timeout: - logger.exception('timeout waiting for node {} launch'.format(node_name)) + logger.exception( + 'timeout waiting for node {} launch'.format(node_name)) return False try: @@ -176,17 +251,20 @@ def node_ready(api, node_name, timeout): time.sleep(10) continue - node = api.list_node(pretty=True, field_selector=field_selector).items[0] + node = api.list_node( + pretty=True, field_selector=field_selector).items[0] for condition in node.status.conditions: if condition.type == 'Ready' and condition.status == 'True': return True - + time.sleep(10) except: - logger.exception('There was an error waiting the node {} ready'.format(node_name)) + logger.exception( + 'There was an error waiting the node {} ready'.format(node_name)) return False + def node_exists(api, node_name): """Determines whether the specified node is still part of the cluster.""" @@ -196,12 +274,13 @@ def node_exists(api, node_name): return False if not node else True except: - return False + return False + def append_node_labels(api, node_name, node_role, instance_lifecycle): node_role_label = "node-role.kubernetes.io/%s" % (node_role) - + patch_body = { "metadata": { "labels": { @@ -214,7 +293,9 @@ def append_node_labels(api, node_name, node_role, instance_lifecycle): try: api.patch_node(node_name, patch_body) except: - logger.exception('There was an error appending labels to the node {} '.format(node_name)) + logger.exception( + 'There was an error appending labels to the node {} '.format(node_name)) + def abandon_lifecycle_action(asg_client, auto_scaling_group_name, lifecycle_hook_name, instance_id): """Completes the lifecycle action with the ABANDON result, which stops any remaining actions, @@ -225,6 +306,7 @@ def abandon_lifecycle_action(asg_client, auto_scaling_group_name, lifecycle_hook LifecycleActionResult='ABANDON', InstanceId=instance_id) + def continue_lifecycle_action(asg_client, auto_scaling_group_name, lifecycle_hook_name, instance_id): """Completes the lifecycle action with the CONTINUE result, which continues the remaining actions, such as other lifecycle hooks. @@ -234,6 +316,7 @@ def continue_lifecycle_action(asg_client, auto_scaling_group_name, lifecycle_hoo LifecycleActionResult='CONTINUE', InstanceId=instance_id) + def exclude_node_from_loadbalancer(api, node_name): """Excludes the node from external load balancers, such as AWS load balancer target groups. """ @@ -248,4 +331,5 @@ def exclude_node_from_loadbalancer(api, node_name): try: api.patch_node(node_name, patch_body) except: - logger.exception('There was an error appending exclude from loadbalancer label to the node {} '.format(node_name)) \ No newline at end of file + logger.exception( + 'There was an error appending exclude from loadbalancer label to the node {} '.format(node_name))