FEATURE: Enhanced pod management and volume detachment during non-graceful node shutdowns (#6)

Allow ASG (Auto Scaling Group) lifecycle hooks to:
- Force delete pods once the termination timeout (3 minutes) is reached.
- Delete pods when the eviction timeout (1 minute) is reached.
- Add a non-graceful shutdown taint to the node so that its volumes can be detached.

Ref: https://kubernetes.io/blog/2022/12/16/kubernetes-1-26-non-graceful-node-shutdown-beta
Downager authored Apr 16, 2024
1 parent 2d88467 commit 111fa15
Showing 1 changed file with 110 additions and 26 deletions.
136 changes: 110 additions & 26 deletions modules/kubernetes/functions/k8s_utils.py
@@ -9,6 +9,7 @@
MIRROR_POD_ANNOTATION_KEY = "kubernetes.io/config.mirror"
CONTROLLER_KIND_DAEMON_SET = "DaemonSet"


def cordon_node(api, node_name):
"""Marks the specified node as unschedulable, which means that no new pods can be launched on the
node by the Kubernetes scheduler.
@@ -39,37 +40,47 @@ def remove_all_pods(api, node_name, poll=5):

def pod_is_evictable(pod):
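    """Returns True if the pod can be evicted; mirror pods and DaemonSet-owned pods are skipped."""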
if pod.metadata.annotations is not None and pod.metadata.annotations.get(MIRROR_POD_ANNOTATION_KEY):
logger.info("Skipping mirror pod {}/{}".format(pod.metadata.namespace, pod.metadata.name))
logger.info(
"Skipping mirror pod {}/{}".format(pod.metadata.namespace, pod.metadata.name))
return False
if pod.metadata.owner_references is None:
return True
for ref in pod.metadata.owner_references:
        if ref.controller:
if ref.kind == CONTROLLER_KIND_DAEMON_SET:
logger.info("Skipping DaemonSet {}/{}".format(pod.metadata.namespace, pod.metadata.name))
logger.info(
"Skipping DaemonSet {}/{}".format(pod.metadata.namespace, pod.metadata.name))
return False
return True


def get_evictable_pods(api, node_name):
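    """Returns the pods scheduled on the given node that are candidates for eviction."""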
field_selector = 'spec.nodeName=' + node_name
    pods = api.list_pod_for_all_namespaces(
        watch=False, field_selector=field_selector)
return [pod for pod in pods.items if pod_is_evictable(pod)]


def evict_until_completed(api, pods, poll):
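    """Retries the Eviction API until no pods remain, deleting whatever is still pending once the eviction timeout expires."""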
    timeout = time.time() + 60  # Allow 1 minute for pods to evict
pending = pods
while True:
pending = evict_pods(api, pending)
        if not pending:
return
elif time.time() > timeout:
logger.error(
"Timeout waiting for pods to evict, deleting remaining pods...")
delete_pods(api, pending, force=False)
return
time.sleep(poll)


def evict_pods(api, pods):
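    """Creates an Eviction for each pod and returns the pods blocked by a disruption budget so they can be retried."""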
remaining = []
for pod in pods:
        logger.info('Evicting pod {} in namespace {}'.format(
            pod.metadata.name, pod.metadata.namespace))
body = {
'apiVersion': 'policy/v1beta1',
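            # NOTE: the policy/v1beta1 Eviction API is removed in Kubernetes 1.25+;
            # clusters on 1.22 or newer can use policy/v1 instead.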
'kind': 'Eviction',
@@ -80,28 +91,83 @@ def evict_pods(api, pods):
}
}
try:
            api.create_namespaced_pod_eviction(
                pod.metadata.name, pod.metadata.namespace, body)
except ApiException as err:
if err.status == 429:
remaining.append(pod)
logger.warning("Pod {}/{} could not be evicted due to disruption budget. Will retry.".format(pod.metadata.namespace, pod.metadata.name))
logger.warning("Failed to evict pod {}/{} due to disruption budget. Will retry.".format(
pod.metadata.namespace, pod.metadata.name))
else:
logger.exception("Unexpected error adding eviction for pod {}/{}".format(pod.metadata.namespace, pod.metadata.name))
logger.exception("Unable to evict pod {}/{} due to \"{}\"".format(
pod.metadata.namespace, pod.metadata.name, err.reason))
except:
logger.exception("Unexpected error adding eviction for pod {}/{}".format(pod.metadata.namespace, pod.metadata.name))
logger.exception("Unexpected error adding eviction for pod {}/{}, \"{}\"".format(
pod.metadata.namespace, pod.metadata.name, err))
return remaining


def delete_pods(api, pods, force=False):
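    """Deletes the given pods; when force is True, sets gracePeriodSeconds to 0 to skip graceful termination."""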
msg_prefix = "Force deleting" if force else "Deleting"
for pod in pods:
logger.info("{} pod {}/{}".format(
msg_prefix, pod.metadata.namespace, pod.metadata.name))
try:
body = {'gracePeriodSeconds': 0} if force else {}
api.delete_namespaced_pod(
pod.metadata.name, pod.metadata.namespace, body=body)
logger.info(
"Pod {}/{} deleted successfully".format(pod.metadata.namespace, pod.metadata.name))
        except ApiException as err:
            if err.status == 404:
                logger.warning("Pod {}/{} was not found. It may have been deleted by another process.".format(
                    pod.metadata.namespace, pod.metadata.name))
            else:
                logger.exception("Unexpected API error deleting pod {}/{}".format(
                    pod.metadata.namespace, pod.metadata.name))
        except Exception:
            logger.exception(
                "Unexpected error deleting pod {}/{}".format(pod.metadata.namespace, pod.metadata.name))


def wait_until_empty(api, node_name, poll):
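    """Polls until no evictable pods remain on the node; on timeout, force deletes the stragglers and taints the node for non-graceful shutdown."""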
logger.info("Waiting for evictions to complete")
timeout = time.time() + 180 # Allow 3 minutes for pods to be terminated
while True:
pods = get_evictable_pods(api, node_name)
        if not pods:
logger.info("All pods evicted successfully")
return
logger.debug("Still waiting for deletion of the following pods: {}".format(", ".join(map(lambda pod: pod.metadata.namespace + "/" + pod.metadata.name, pods))))
logger.info("Waiting for pod termination...")
logger.debug("Still waiting for deletion of the following pods: {}".format(
", ".join(map(lambda pod: pod.metadata.namespace + "/" + pod.metadata.name, pods))))
if time.time() > timeout:
logger.error(
"Timeout waiting for pods to be terminated, force deleting remaining pods")
delete_pods(api, pods, force=True)
taint_non_graceful_shutdown(api, node_name)
return
time.sleep(poll)


def taint_non_graceful_shutdown(api, node_name):
"""Taints the specified node with the non-graceful shutdown taint, which indicates that the node"""
# Ref: https://kubernetes.io/blog/2022/12/16/kubernetes-1-26-non-graceful-node-shutdown-beta/
patch_body = {
"spec": {
"taints": [{
"effect": "NoSchedule",
"key": "node.kubernetes.io/out-of-service",
"value": "nodeshutdown"
}]
}
}
try:
api.patch_node(node_name, patch_body)
logger.info(
"Node {} has been tainted with the non-graceful shutdown taint".format(node_name))
    except Exception:
        logger.exception(
            "There was an error tainting node {} with the non-graceful shutdown taint".format(node_name))


def master_ready(api, asg_client, lb_client, ec2_client, asg_name, node_name, node_role, timeout):
"""Determines whether the K8s master node are ready"""

@@ -114,24 +180,28 @@ def master_ready(api, asg_client, lb_client, ec2_client, asg_name, node_name, node_role, timeout):
)['AutoScalingGroups'][0]
asg_instances = asg_info['Instances']
asg_desired_capacity = asg_info['DesiredCapacity']
    asg_remain_instances = [instance['InstanceId']
                            for instance in asg_instances if 'Terminating' not in instance['LifecycleState']]

    # The master ASG contains multiple nodes
if asg_desired_capacity > 1:
return True

    # There is only one node in the master ASG; wait for the node to be registered with the load balancer
waiting_timeout = time.time() + timeout

    lb_name = asg_client.describe_load_balancers(AutoScalingGroupName=asg_name)[
        'LoadBalancers'][0]['LoadBalancerName']

while True:
if time.time() > waiting_timeout:
            logger.error(
                'Timeout waiting for master node {} to become ready'.format(node_name))
            return False

try:
            lb_instances = lb_client.describe_load_balancers(LoadBalancerNames=[lb_name])[
                'LoadBalancerDescriptions'][0]['Instances']

for target_instance in lb_instances:
if target_instance['InstanceId'] in asg_remain_instances:
@@ -146,19 +216,23 @@ def master_ready(api, asg_client, lb_client, ec2_client, asg_name, node_name, node_role, timeout):
)['InstanceStates'][0]['State']

if target_instance_state == 'InService':
                        master_instance = ec2_client.describe_instances(
                            InstanceIds=[target_instance['InstanceId']])['Reservations'][0]['Instances'][0]
node_name = master_instance['PrivateDnsName']
instance_lifecycle = 'Ec2Spot' if 'InstanceLifecycle' in master_instance else 'OnDemand'
                        append_node_labels(
                            api, node_name, node_role, instance_lifecycle)

return True

time.sleep(10)

        except Exception:
            logger.exception(
                'There was an error waiting for node {} to become ready'.format(node_name))
return False


def node_ready(api, node_name, timeout):
"""Determines whether the specified node is ready."""

@@ -167,7 +241,8 @@ def node_ready(api, node_name, timeout):

while True:
if time.time() > waiting_timeout:
            logger.error(
                'Timeout waiting for node {} to launch'.format(node_name))
return False

try:
@@ -176,17 +251,20 @@ def node_ready(api, node_name, timeout):
time.sleep(10)
continue

            node = api.list_node(
                pretty=True, field_selector=field_selector).items[0]

for condition in node.status.conditions:
if condition.type == 'Ready' and condition.status == 'True':
return True

time.sleep(10)
        except Exception:
            logger.exception(
                'There was an error waiting for node {} to become ready'.format(node_name))
return False


def node_exists(api, node_name):
"""Determines whether the specified node is still part of the cluster."""

@@ -196,12 +274,13 @@ def node_exists(api, node_name):

        return bool(node)
    except Exception:
        return False


def append_node_labels(api, node_name, node_role, instance_lifecycle):

node_role_label = "node-role.kubernetes.io/%s" % (node_role)

patch_body = {
"metadata": {
"labels": {
@@ -214,7 +293,9 @@ def append_node_labels(api, node_name, node_role, instance_lifecycle):
try:
api.patch_node(node_name, patch_body)
    except Exception:
        logger.exception(
            'There was an error appending labels to node {}'.format(node_name))


def abandon_lifecycle_action(asg_client, auto_scaling_group_name, lifecycle_hook_name, instance_id):
"""Completes the lifecycle action with the ABANDON result, which stops any remaining actions,
@@ -225,6 +306,7 @@ def abandon_lifecycle_action(asg_client, auto_scaling_group_name, lifecycle_hook_name, instance_id):
LifecycleActionResult='ABANDON',
InstanceId=instance_id)


def continue_lifecycle_action(asg_client, auto_scaling_group_name, lifecycle_hook_name, instance_id):
"""Completes the lifecycle action with the CONTINUE result, which continues the remaining actions,
such as other lifecycle hooks.
@@ -234,6 +316,7 @@ def continue_lifecycle_action(asg_client, auto_scaling_group_name, lifecycle_hook_name, instance_id):
LifecycleActionResult='CONTINUE',
InstanceId=instance_id)


def exclude_node_from_loadbalancer(api, node_name):
"""Excludes the node from external load balancers, such as AWS load balancer target groups.
"""
@@ -248,4 +331,5 @@ def exclude_node_from_loadbalancer(api, node_name):
try:
api.patch_node(node_name, patch_body)
    except Exception:
        logger.exception(
            'There was an error appending the exclude-from-load-balancer label to node {}'.format(node_name))
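
Usage sketch (not part of this commit): an ASG termination lifecycle hook handler would typically chain the helpers above roughly as follows. This is a minimal sketch under stated assumptions — the handler name, the event field names, and the resolve_node_name helper are hypothetical, not code from this repository.

import boto3
from kubernetes import client, config

def handle_termination(event):
    # Field names follow the ASG lifecycle notification payload (assumed here).
    instance_id = event['EC2InstanceId']
    asg_name = event['AutoScalingGroupName']
    hook_name = event['LifecycleHookName']

    config.load_incluster_config()
    api = client.CoreV1Api()
    asg_client = boto3.client('autoscaling')

    # Hypothetical helper that maps the EC2 instance to its node name.
    node_name = resolve_node_name(instance_id)

    cordon_node(api, node_name)
    exclude_node_from_loadbalancer(api, node_name)

    # Evict for up to 1 minute, deleting anything still pending; after the
    # 3 minute termination timeout, force delete and taint for non-graceful shutdown.
    pods = get_evictable_pods(api, node_name)
    evict_until_completed(api, pods, poll=5)
    wait_until_empty(api, node_name, poll=5)

    continue_lifecycle_action(asg_client, asg_name, hook_name, instance_id)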
