Adding health check and exception handling #1

Open
wants to merge 2 commits into base: djm
3 changes: 3 additions & 0 deletions .gitignore
@@ -1,2 +1,5 @@
*~
__pycache__

.idea/
venv/
22 changes: 22 additions & 0 deletions healthcheck/health_check.py
@@ -0,0 +1,22 @@
from threading import Thread
from flask import Flask, jsonify

app = Flask(__name__)
Healthy = True


@app.route('/healthz')
def is_healthy():
if Healthy:
return jsonify({'status': 'OK'})
else:
return jsonify({'status': 'UNHEALTHY'}), 503


def start_health_endpoint(port):
app.run(host='0.0.0.0', port=port)


def run(options):
t = Thread(target=start_health_endpoint(options.port), daemon=True)

Reviewer comment:

I am not really sure if this is correct. According to Python's doc:

target is the callable object to be invoked by the run() method

but here you've passed None which is returned from the execution of start_health_endpoint.

One thing to do is to return a lambda that will be started by Thread, I think:

def start_health_endpoint(port):
    return lambda: app.run(host='0.0.0.0', port=port)

def run(options):
    t = Thread(target=start_health_endpoint(options.port), daemon=True)

t.start()
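
An alternative sketch (an editor's illustration, not part of the PR, and assuming the imports and the app object already defined at the top of healthcheck/health_check.py): threading.Thread can be given the callable and its arguments separately via the args parameter, so app.run() executes on the new thread without needing a lambda at all:

def start_health_endpoint(port):
    # Blocks whichever thread it runs on, so it must be the thread's target.
    app.run(host='0.0.0.0', port=port)

def run(options):
    # Thread invokes start_health_endpoint(options.port) on the new thread.
    t = Thread(target=start_health_endpoint, args=(options.port,), daemon=True)
    t.start()

With this wiring the daemon thread serves /healthz while the main thread continues on into sqs.run().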
9 changes: 7 additions & 2 deletions k8s-sqs-autoscaler
@@ -2,7 +2,8 @@

from optparse import OptionParser
from sqs import sqs
from logs.log import logger
from healthcheck import health_check
from logs import log


if __name__ == "__main__":
@@ -30,9 +31,12 @@ if __name__ == "__main__":
)
parser.add_option("--max-pods", dest="max_pods", type="int", default=10, help="")
parser.add_option("--min-pods", dest="min_pods", type="int", default=1, help="")
parser.add_option("--log-level", dest="log_level", type="string", default="DEBUG", help="")
parser.add_option("--port", dest="port", type="int", default=5000, help="")

(options, args) = parser.parse_args()
logger.debug(options)
log.setup_logging(options)
log.logger.debug(options)

if not (options.sqs_queue_url or options.sqs_queue_name):
parser.error("SQS_QUEUE_URL / SQS_QUEUE_NAME not given")
@@ -45,4 +49,5 @@ if __name__ == "__main__":
if not options.kubernetes_deployment:
parser.error("KUBERNETES_DEPLOYMENT not given")

health_check.run(options)
sqs.run(options)
18 changes: 8 additions & 10 deletions logs/log.py
@@ -3,21 +3,19 @@
from logging.handlers import TimedRotatingFileHandler
from logging import StreamHandler

logger = None

def setup_logging():

def setup_logging(options):
global logger
"""Set up logging file handler for both app and sqs consumer"""
file_handler = TimedRotatingFileHandler("logs/autoscaling.log", "D", 1, 10)
file_handler.setFormatter(
logging.Formatter("%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]")
)
stream_handler = StreamHandler(sys.stdout)
stream_handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT))
logger_instance = logging.getLogger("autoscaling")
logger_instance.addHandler(file_handler)
logger_instance.addHandler(stream_handler)
level = os.environ.get("LOGGING_LEVEL", "ERROR")
logger_instance.setLevel(level)
return logger_instance


logger = setup_logging()
logger = logging.getLogger("autoscaling")
logger.addHandler(file_handler)
logger.addHandler(stream_handler)
logger.setLevel(options.log_level)
1 change: 1 addition & 0 deletions requirements.txt
@@ -5,3 +5,4 @@ cffi==1.11.5
pytest==3.3.2
flake8==3.4.1
kubernetes==8.0.0
flask==1.0.2
36 changes: 21 additions & 15 deletions sqs/sqs.py
@@ -1,11 +1,13 @@
import boto3
from time import sleep, time
from logs.log import logger
from logs import log
from healthcheck import health_check
from kubernetes import client, config


class SQSPoller:

logger = log.logger
options = None
sqs_client = None
extensions_v1_beta1 = None
@@ -40,7 +42,7 @@ def message_counts(self):
def poll(self):
message_count, invisible_message_count = self.message_counts()
deployment = self.deployment()
logger.debug(
self.logger.debug(
"Current message counts: %d visible / %d invisible. %d replicas."
% (message_count, invisible_message_count, deployment.spec.replicas)
)
@@ -50,40 +52,37 @@ def poll(self):
self.scale_up(deployment)
self.last_scale_up_time = t
else:
logger.debug("Waiting for scale up cooldown")
self.logger.debug("Waiting for scale up cooldown")
if message_count <= self.options.scale_down_messages:
# special case - do not scale to zero unless there are no invisible messages
if invisible_message_count > 0 and deployment.spec.replicas <= invisible_message_count:
logger.debug("Not scaling down because messages are still in-flight")
self.logger.debug("Not scaling down because messages are still in-flight")
elif t - self.last_scale_down_time > self.options.scale_down_cool_down:
self.scale_down(deployment)
self.last_scale_down_time = t
else:
if deployment.spec.replicas > self.options.min_pods:
logger.debug("Waiting for scale down cooldown")

# code for scale to use msg_count
sleep(self.options.poll_period)
self.logger.debug("Waiting for scale down cooldown")

def scale_up(self, deployment):
if deployment.spec.replicas < self.options.max_pods:
deployment.spec.replicas += 1
logger.info("Scaling up to %d" % deployment.spec.replicas)
self.logger.info("Scaling up to %d" % deployment.spec.replicas)
self.update_deployment(deployment)
elif deployment.spec.replicas > self.options.max_pods:
self.scale_down(deployment)
else:
logger.debug("Max pods reached")
self.logger.debug("Max pods reached")

def scale_down(self, deployment):
if deployment.spec.replicas > self.options.min_pods:
deployment.spec.replicas -= 1
logger.info("Scaling down to %d" % deployment.spec.replicas)
self.logger.info("Scaling down to %d" % deployment.spec.replicas)
self.update_deployment(deployment)
elif deployment.spec.replicas < self.options.min_pods:
self.scale_up(deployment)
else:
logger.debug("Min pods reached")
self.logger.debug("Min pods reached")

def deployment(self):
# logger.debug("loading deployment: {} from namespace: {}".format(self.options.kubernetes_deployment, self.options.kubernetes_namespace))
@@ -103,15 +102,22 @@ def update_deployment(self, deployment):
namespace=self.options.kubernetes_namespace,
body=deployment,
)
logger.debug("Deployment updated. status='%s'" % str(api_response.status))
self.logger.debug("Deployment updated. status='%s'" % str(api_response.status))

def run(self):
options = self.options
logger.debug(
self.logger.debug(
"Starting poll for {} every {}s".format(options.sqs_queue_url, options.poll_period)
)

while True:
self.poll()
try:
self.poll()
health_check.Healthy = True
sleep(self.options.poll_period)
except Exception:

Reviewer comment:

Won't this make it worse? It won't actually exit. I had a look at this code before, there's no exception catching, so I don't understand -- it should be exiting if there are issues (and then being restarted).

Author reply:

We'll configure the liveness and readiness probes to restart the container if health check fails.

Reviewer reply:

But it would already do that if you just let the exception bubble up; it seems like the problem is that it isn't quitting when there is an issue?

health_check.Healthy = False
self.logger.exception('Failed to autoscale')


def run(options):
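
For context on the exchange above, here is a small, self-contained sketch (an editor's illustration, not part of the PR) of how the Healthy flag and the /healthz endpoint are meant to interact with a liveness probe. The port number and the sleep are arbitrary choices, and only the standard library is used for the probe itself:

import time
import urllib.error
import urllib.request
from threading import Thread

from healthcheck import health_check


def probe(url):
    # Return the HTTP status code for a GET; a 503 arrives as an HTTPError.
    try:
        with urllib.request.urlopen(url) as resp:
            return resp.status
    except urllib.error.HTTPError as err:
        return err.code


if __name__ == "__main__":
    # Start the Flask health endpoint on a background thread.
    Thread(target=health_check.start_health_endpoint, args=(5000,), daemon=True).start()
    time.sleep(1)  # give Flask a moment to bind the port

    print(probe("http://127.0.0.1:5000/healthz"))  # 200 while Healthy is True

    health_check.Healthy = False  # what the except block in sqs.py does on failure
    print(probe("http://127.0.0.1:5000/healthz"))  # 503, which a liveness probe treats as failing

A kubelet liveness probe pointed at /healthz would then restart the container after its failure threshold is reached, which is the restart behaviour the author is relying on instead of letting the exception terminate the process.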