Skip to content

Commit

Permalink
Make postgres listen on different ports (55432, 55433 by default) for…
Browse files Browse the repository at this point in the history
… master (RW) and slave (RO) connections#
  • Loading branch information
miketonks committed Jul 20, 2015
1 parent 08dd9bf commit 6c753d2
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 24 deletions.
43 changes: 36 additions & 7 deletions governor.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#!/usr/bin/env python

import sys, os, yaml, time, urllib2, atexit
import sys, os, yaml, time, urllib2, atexit, signal
import logging

from helpers.keystore import Etcd
from helpers.postgresql import Postgresql
from helpers.ha import Ha
from helpers.ascii import splash, showtime

LOG_LEVEL = logging.DEBUG if os.getenv('DEBUG', None) else logging.INFO

Expand All @@ -32,6 +33,9 @@
if os.getenv('GOVERNOR_POSTGRESQL_LISTEN'):
config['postgresql']['listen'] = os.getenv('GOVERNOR_POSTGRESQL_LISTEN')

if os.getenv('GOVERNOR_POSTGRESQL_READ_ONLY_PORT'):
config['postgresql']['read_only_port'] = os.getenv('GOVERNOR_POSTGRESQL_READ_ONLY_PORT')

if os.getenv('GOVERNOR_POSTGRESQL_DATA_DIR'):
config['postgresql']['data_dir'] = os.getenv('GOVERNOR_POSTGRESQL_DATA_DIR')

Expand All @@ -42,13 +46,29 @@
postgresql = Postgresql(config["postgresql"])
ha = Ha(postgresql, etcd)

# stop postgresql on script exit
def stop_postgresql():

# leave things clean when shutting down, if possible
def shutdown():
logging.info("Governor Shutting Down")
try:
if ha.has_lock():
logging.info("Governor Shutting Down: Abdicating Leadership")
etcd.abdicate(postgresql.name)

logging.info("Governor Shutting Down: Remiving Membership")
etcd.delete_member(postgresql.name)
except:
pass

logging.info("Governor Shutting Down: Stopping Postgres")
postgresql.stop()
atexit.register(stop_postgresql)
sys.exit(0)

atexit.register(shutdown)
signal.signal(signal.SIGTERM, shutdown)

# wait for etcd to be available
splash()
logging.info("Governor Starting up: Connect to Etcd")
etcd_ready = False
while not etcd_ready:
Expand All @@ -70,7 +90,7 @@ def stop_postgresql():
logging.info("Governor Starting up: Initialise Complete")
etcd.take_leader(postgresql.name)
logging.info("Governor Starting up: Starting Postgres")
postgresql.start()
postgresql.start(master=True)
else:
logging.info("Governor Starting up: Initialisation Race ... LOST")
logging.info("Governor Starting up: Sync Postgres from Leader")
Expand All @@ -84,16 +104,17 @@ def stop_postgresql():
logging.info("Governor Starting up: Sync Completed")
postgresql.write_recovery_conf(leader)
logging.info("Governor Starting up: Starting Postgres")
postgresql.start()
postgresql.start(master=False)
synced_from_leader = True
else:
time.sleep(5)
else:
logging.info("Governor Starting up: Existing Data Dir")
postgresql.follow_no_leader()
logging.info("Governor Starting up: Starting Postgres")
postgresql.start()
postgresql.start(master=False)

showtime()
logging.info("Governor Running: Starting Running Loop")
while True:
logging.info("Governor Running: %s" % ha.run_cycle())
Expand All @@ -110,3 +131,11 @@ def stop_postgresql():
etcd.touch_member(postgresql.name, postgresql.advertised_connection_string)

time.sleep(config["loop_wait"])








8 changes: 4 additions & 4 deletions helpers/ha.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,18 @@ def run_cycle(self):
self.state_handler.promote()
return "promoted self to leader because i had the session lock"
else:
return "no action. i am the leader with the lock"
return "I am the leader with the lock"
else:
logger.info("does not have lock")
logger.debug("does not have lock")
if self.state_handler.is_leader():
self.state_handler.demote(self.fetch_current_leader())
return "demoting self because i do not have the lock and i was a leader"
else:
self.state_handler.follow_the_leader(self.fetch_current_leader())
return "no action. i am a secondary and i am following a leader"
return "I am a secondary and i am following a leader"
else:
if not self.state_handler.is_running():
self.state_handler.start()
self.state_handler.start(master=self.has_lock())
return "postgresql was stopped. starting again."
return "no action. not healthy enough to do anything."
except helpers.errors.CurrentLeaderError:
Expand Down
8 changes: 7 additions & 1 deletion helpers/keystore.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ def members(self):
def touch_member(self, member, connection_string):
self.set("/members/%s" % member, connection_string, ttl=self.ttl)

def delete_member(self, member):
self.client.delete("/members/%s" % member)

def take_leader(self, value):
self.set("/leader", value, ttl=self.ttl)

Expand Down Expand Up @@ -116,9 +119,12 @@ def leader_unlocked(self):

def am_i_leader(self, value):
leader = self.get("/leader")
logger.info("Lock owner: %s; I am %s", leader, value)
logger.debug("Lock owner: %s; I am %s", leader, value)
return leader == value

def abdicate(self, value):
self.client.delete("/leader", prevValue=value)

def race(self, path, value):
try:
self.set(path, value, prevExist=False)
Expand Down
54 changes: 42 additions & 12 deletions helpers/postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class Postgresql:
def __init__(self, config):
self.name = config["name"]
self.host, self.port = config["listen"].split(":")
self.read_only_port = config.get('read_only_port', self.port)
self.data_dir = config["data_dir"]
self.replication = config["replication"]

Expand All @@ -21,17 +22,21 @@ def __init__(self, config):
# advertised connection for replication
self.advertised_connection_string = "postgres://%s:%s@%s:%s/postgres" % (self.replication["username"], self.replication["password"], self.host, self.port)

# local connection for admin control and local reads
if self.config.get('connect', None) == 'local':
self.local_connection_string = "user=postgres"
else:
self.local_connection_string = "postgres://%s:%s/postgres" % (self.host, self.port)

self.conn = None
self.master = None

def cursor(self):
if not self.cursor_holder:
self.conn = psycopg2.connect(self.local_connection_string)
local_connection_string = None

# local connection for admin control and local reads
if self.config.get('connect', None) == 'local':
local_connection_string = "user=postgres port=%s" % self.server_port()
else:
local_connection_string = "postgres://%s:%s/postgres" % (self.host, self.server_port())

logger.info("CONNECT: %s", local_connection_string)
self.conn = psycopg2.connect(local_connection_string)
self.conn.autocommit = True
self.cursor_holder = self.conn.cursor()

Expand Down Expand Up @@ -92,7 +97,7 @@ def is_leader(self):
def is_running(self):
return os.system("pg_ctl status -D %s > /dev/null" % self.data_dir) == 0

def start(self):
def start(self, master=False):
if self.is_running():
logger.error("Cannot start PostgreSQL because one is already running.")
return False
Expand All @@ -102,26 +107,43 @@ def start(self):
os.remove(pid_path)
logger.info("Removed %s" % pid_path)

self.master = master
if master:
logger.info("Starting PostgreSQL in Master mode")
else:
logger.info("Starting PostgreSQL in Slave mode")

command_code = os.system("postgres -D %s %s &" % (self.data_dir, self.server_options()))
while not self.is_running():
time.sleep(5)
return command_code != 0

def stop(self):
logger.info("Stopping PostgreSQL")
return os.system("pg_ctl stop -w -D %s -m fast -w" % self.data_dir) != 0

def reload(self):
return os.system("pg_ctl reload -w -D %s" % self.data_dir) == 0

def restart(self):
return os.system("pg_ctl restart -w -D %s -m fast" % self.data_dir) == 0
def restart(self, master=False):
self.master = master
if master:
logger.info("Restarting PostgreSQL in Master mode")
else:
logger.info("Restarting PostgreSQL in Slave mode")

return os.system("pg_ctl restart -w -D %s -o \"%s\" -m fast" % (self.data_dir, self.server_options())) == 0

def server_options(self):
options = "-c listen_addresses=%s -c port=%s" % (self.host, self.port)
options = "-c listen_addresses=%s -c port=%s" % (self.host, self.server_port())
for setting, value in self.config["parameters"].iteritems():
options += " -c \"%s=%s\"" % (setting, value)
return options

def server_port(self):
logger.info("MASTER: %s", self.master)
return self.port if self.master else self.read_only_port

def is_healthy(self):
if not self.is_running():
logger.warning("Postgresql is not running.")
Expand Down Expand Up @@ -178,6 +200,7 @@ def write_recovery_conf(self, leader_hash):
""" % {"recovery_slot": self.name})
if leader_hash is not None:
leader = urlparse(leader_hash["address"])
logger.info("Write Recovery Conf: %s:%s", leader.hostname, leader.port)
f.write("""
primary_conninfo = 'user=%(user)s password=%(password)s host=%(hostname)s port=%(port)s sslmode=prefer sslcompression=1'
""" % {"user": leader.username, "password": leader.password, "hostname": leader.hostname, "port": leader.port})
Expand All @@ -202,13 +225,20 @@ def follow_no_leader(self):
return True

def promote(self):
self.stop()
self.start(master=True)
return os.system("pg_ctl promote -w -D %s" % self.data_dir) == 0
# self.restart(master=True)

def demote(self, leader):
self.write_recovery_conf(leader)
self.restart()
# self.restart()
self.stop()
self.start(master=False)

def create_replication_user(self):
#logger.info("Governor Starting Up: Running postgres single user mode to create repliaction user")
#os.system("postgres --single -jE << CREATE USER '%s' WITH REPLICATION ENCRYPTED PASSWORD '%s';" % (self.replication["username"], self.replication["password"]))
self.query("CREATE USER \"%s\" WITH REPLICATION ENCRYPTED PASSWORD '%s';" % (self.replication["username"], self.replication["password"]))

def create_replication_slot(self, member):
Expand Down

0 comments on commit 6c753d2

Please sign in to comment.