diff --git a/governor.py b/governor.py index bba60f16..61dd2fb3 100755 --- a/governor.py +++ b/governor.py @@ -1,11 +1,12 @@ #!/usr/bin/env python -import sys, os, yaml, time, urllib2, atexit +import sys, os, yaml, time, urllib2, atexit, signal import logging from helpers.keystore import Etcd from helpers.postgresql import Postgresql from helpers.ha import Ha +from helpers.ascii import splash, showtime LOG_LEVEL = logging.DEBUG if os.getenv('DEBUG', None) else logging.INFO @@ -32,6 +33,9 @@ if os.getenv('GOVERNOR_POSTGRESQL_LISTEN'): config['postgresql']['listen'] = os.getenv('GOVERNOR_POSTGRESQL_LISTEN') +if os.getenv('GOVERNOR_POSTGRESQL_READ_ONLY_PORT'): + config['postgresql']['read_only_port'] = os.getenv('GOVERNOR_POSTGRESQL_READ_ONLY_PORT') + if os.getenv('GOVERNOR_POSTGRESQL_DATA_DIR'): config['postgresql']['data_dir'] = os.getenv('GOVERNOR_POSTGRESQL_DATA_DIR') @@ -42,13 +46,29 @@ postgresql = Postgresql(config["postgresql"]) ha = Ha(postgresql, etcd) -# stop postgresql on script exit -def stop_postgresql(): + +# leave things clean when shutting down, if possible +def shutdown(): + logging.info("Governor Shutting Down") + try: + if ha.has_lock(): + logging.info("Governor Shutting Down: Abdicating Leadership") + etcd.abdicate(postgresql.name) + + logging.info("Governor Shutting Down: Remiving Membership") + etcd.delete_member(postgresql.name) + except: + pass + + logging.info("Governor Shutting Down: Stopping Postgres") postgresql.stop() -atexit.register(stop_postgresql) + sys.exit(0) +atexit.register(shutdown) +signal.signal(signal.SIGTERM, shutdown) # wait for etcd to be available +splash() logging.info("Governor Starting up: Connect to Etcd") etcd_ready = False while not etcd_ready: @@ -70,7 +90,7 @@ def stop_postgresql(): logging.info("Governor Starting up: Initialise Complete") etcd.take_leader(postgresql.name) logging.info("Governor Starting up: Starting Postgres") - postgresql.start() + postgresql.start(master=True) else: logging.info("Governor Starting up: Initialisation Race ... LOST") logging.info("Governor Starting up: Sync Postgres from Leader") @@ -84,7 +104,7 @@ def stop_postgresql(): logging.info("Governor Starting up: Sync Completed") postgresql.write_recovery_conf(leader) logging.info("Governor Starting up: Starting Postgres") - postgresql.start() + postgresql.start(master=False) synced_from_leader = True else: time.sleep(5) @@ -92,8 +112,9 @@ def stop_postgresql(): logging.info("Governor Starting up: Existing Data Dir") postgresql.follow_no_leader() logging.info("Governor Starting up: Starting Postgres") - postgresql.start() + postgresql.start(master=False) +showtime() logging.info("Governor Running: Starting Running Loop") while True: logging.info("Governor Running: %s" % ha.run_cycle()) @@ -110,3 +131,11 @@ def stop_postgresql(): etcd.touch_member(postgresql.name, postgresql.advertised_connection_string) time.sleep(config["loop_wait"]) + + + + + + + + diff --git a/helpers/ha.py b/helpers/ha.py index b7495409..4c6dbc73 100644 --- a/helpers/ha.py +++ b/helpers/ha.py @@ -74,18 +74,18 @@ def run_cycle(self): self.state_handler.promote() return "promoted self to leader because i had the session lock" else: - return "no action. i am the leader with the lock" + return "I am the leader with the lock" else: - logger.info("does not have lock") + logger.debug("does not have lock") if self.state_handler.is_leader(): self.state_handler.demote(self.fetch_current_leader()) return "demoting self because i do not have the lock and i was a leader" else: self.state_handler.follow_the_leader(self.fetch_current_leader()) - return "no action. i am a secondary and i am following a leader" + return "I am a secondary and i am following a leader" else: if not self.state_handler.is_running(): - self.state_handler.start() + self.state_handler.start(master=self.has_lock()) return "postgresql was stopped. starting again." return "no action. not healthy enough to do anything." except helpers.errors.CurrentLeaderError: diff --git a/helpers/keystore.py b/helpers/keystore.py index 60299a0d..01191842 100644 --- a/helpers/keystore.py +++ b/helpers/keystore.py @@ -76,6 +76,9 @@ def members(self): def touch_member(self, member, connection_string): self.set("/members/%s" % member, connection_string, ttl=self.ttl) + def delete_member(self, member): + self.client.delete("/members/%s" % member) + def take_leader(self, value): self.set("/leader", value, ttl=self.ttl) @@ -116,9 +119,12 @@ def leader_unlocked(self): def am_i_leader(self, value): leader = self.get("/leader") - logger.info("Lock owner: %s; I am %s", leader, value) + logger.debug("Lock owner: %s; I am %s", leader, value) return leader == value + def abdicate(self, value): + self.client.delete("/leader", prevValue=value) + def race(self, path, value): try: self.set(path, value, prevExist=False) diff --git a/helpers/postgresql.py b/helpers/postgresql.py index c548948e..40714229 100644 --- a/helpers/postgresql.py +++ b/helpers/postgresql.py @@ -11,6 +11,7 @@ class Postgresql: def __init__(self, config): self.name = config["name"] self.host, self.port = config["listen"].split(":") + self.read_only_port = config.get('read_only_port', self.port) self.data_dir = config["data_dir"] self.replication = config["replication"] @@ -21,17 +22,21 @@ def __init__(self, config): # advertised connection for replication self.advertised_connection_string = "postgres://%s:%s@%s:%s/postgres" % (self.replication["username"], self.replication["password"], self.host, self.port) - # local connection for admin control and local reads - if self.config.get('connect', None) == 'local': - self.local_connection_string = "user=postgres" - else: - self.local_connection_string = "postgres://%s:%s/postgres" % (self.host, self.port) - self.conn = None + self.master = None def cursor(self): if not self.cursor_holder: - self.conn = psycopg2.connect(self.local_connection_string) + local_connection_string = None + + # local connection for admin control and local reads + if self.config.get('connect', None) == 'local': + local_connection_string = "user=postgres port=%s" % self.server_port() + else: + local_connection_string = "postgres://%s:%s/postgres" % (self.host, self.server_port()) + + logger.info("CONNECT: %s", local_connection_string) + self.conn = psycopg2.connect(local_connection_string) self.conn.autocommit = True self.cursor_holder = self.conn.cursor() @@ -92,7 +97,7 @@ def is_leader(self): def is_running(self): return os.system("pg_ctl status -D %s > /dev/null" % self.data_dir) == 0 - def start(self): + def start(self, master=False): if self.is_running(): logger.error("Cannot start PostgreSQL because one is already running.") return False @@ -102,26 +107,43 @@ def start(self): os.remove(pid_path) logger.info("Removed %s" % pid_path) + self.master = master + if master: + logger.info("Starting PostgreSQL in Master mode") + else: + logger.info("Starting PostgreSQL in Slave mode") + command_code = os.system("postgres -D %s %s &" % (self.data_dir, self.server_options())) while not self.is_running(): time.sleep(5) return command_code != 0 def stop(self): + logger.info("Stopping PostgreSQL") return os.system("pg_ctl stop -w -D %s -m fast -w" % self.data_dir) != 0 def reload(self): return os.system("pg_ctl reload -w -D %s" % self.data_dir) == 0 - def restart(self): - return os.system("pg_ctl restart -w -D %s -m fast" % self.data_dir) == 0 + def restart(self, master=False): + self.master = master + if master: + logger.info("Restarting PostgreSQL in Master mode") + else: + logger.info("Restarting PostgreSQL in Slave mode") + + return os.system("pg_ctl restart -w -D %s -o \"%s\" -m fast" % (self.data_dir, self.server_options())) == 0 def server_options(self): - options = "-c listen_addresses=%s -c port=%s" % (self.host, self.port) + options = "-c listen_addresses=%s -c port=%s" % (self.host, self.server_port()) for setting, value in self.config["parameters"].iteritems(): options += " -c \"%s=%s\"" % (setting, value) return options + def server_port(self): + logger.info("MASTER: %s", self.master) + return self.port if self.master else self.read_only_port + def is_healthy(self): if not self.is_running(): logger.warning("Postgresql is not running.") @@ -178,6 +200,7 @@ def write_recovery_conf(self, leader_hash): """ % {"recovery_slot": self.name}) if leader_hash is not None: leader = urlparse(leader_hash["address"]) + logger.info("Write Recovery Conf: %s:%s", leader.hostname, leader.port) f.write(""" primary_conninfo = 'user=%(user)s password=%(password)s host=%(hostname)s port=%(port)s sslmode=prefer sslcompression=1' """ % {"user": leader.username, "password": leader.password, "hostname": leader.hostname, "port": leader.port}) @@ -202,13 +225,20 @@ def follow_no_leader(self): return True def promote(self): + self.stop() + self.start(master=True) return os.system("pg_ctl promote -w -D %s" % self.data_dir) == 0 + # self.restart(master=True) def demote(self, leader): self.write_recovery_conf(leader) - self.restart() + # self.restart() + self.stop() + self.start(master=False) def create_replication_user(self): + #logger.info("Governor Starting Up: Running postgres single user mode to create repliaction user") + #os.system("postgres --single -jE << CREATE USER '%s' WITH REPLICATION ENCRYPTED PASSWORD '%s';" % (self.replication["username"], self.replication["password"])) self.query("CREATE USER \"%s\" WITH REPLICATION ENCRYPTED PASSWORD '%s';" % (self.replication["username"], self.replication["password"])) def create_replication_slot(self, member):