Skip to content

Commit

Permalink
Attempt to handle headless sentinel service better
Browse files Browse the repository at this point in the history
  • Loading branch information
rad-pat committed Jan 19, 2024
1 parent b4f235c commit 94a1452
Showing 1 changed file with 209 additions and 40 deletions.
249 changes: 209 additions & 40 deletions redis_stomp/redis_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ class ParsedRedisURL(NamedTuple):
database: int = 0
cluster: bool = False
# options:
headless: bool = False
quorum: int = 0


def parse_url(url: str) -> ParsedRedisURL:
Expand All @@ -31,6 +33,9 @@ def parse_url(url: str) -> ParsedRedisURL:
def is_sentinel():
return url.scheme == 'redis+sentinel' or url.scheme == 'sentinel'

def is_headless():
return 'headless' in url.scheme

def is_cluster():
return url.scheme == 'redis-cluster'

Expand Down Expand Up @@ -67,6 +72,7 @@ def parse_host(s: str):
'client_type': str,
'socket_timeout': float,
'socket_connect_timeout': float,
'quorum': int,
}
options = {}

Expand Down Expand Up @@ -115,9 +121,133 @@ def parse_host(s: str):
master=(client_type == "master"),
service_name=service_name,
database=db,
headless=is_headless(),
quorum=options.get("quorum", 0),
)


class HeadlessSentinel(Sentinel):
def __init__(
self,
headless_host, port,
min_other_sentinels=0,
sentinel_kwargs=None,
**connection_kwargs,
):
self.headless_host = headless_host
self.port = port
super().__init__(
sentinels=[(ip, port) for ip in self.sentinels_from_headless_host()],
min_other_sentinels=min_other_sentinels,
sentinel_kwargs=sentinel_kwargs,
**connection_kwargs,
)

def sentinels_from_headless_host(self) -> list[str]:
host, alias_list, ip_list = socket.gethostbyname_ex(self.headless_host)
return ip_list

def refresh_sentinels(self):
self.sentinels = [
Redis(host=ip, port=self.port, **self.sentinel_kwargs)
for ip, port in self.sentinels_from_headless_host()
]

# async def execute_command(self, *args, **kwargs):
# """
# Execute Sentinel command in sentinel nodes.
# once - If set to True, then execute the resulting command on a single
# node at random, rather than across the entire sentinel cluster.
# """
# try:
# return await super().execute_command(*args, **kwargs)
# except DEFAULT_RETRY_ERRORS:
# if not kwargs.get('sentinels_refreshed', False):
# self.refresh_sentinels()
# kwargs['sentinels_refreshed'] = True
# return await self.execute_command(*args, **kwargs)
# raise

async def discover_master(self, service_name, has_refreshed: bool = False):
try:
return await super().discover_master(service_name)
except DEFAULT_RETRY_ERRORS:
if not has_refreshed:
self.refresh_sentinels()
return await self.discover_master(service_name, True)
raise

async def discover_slaves(self, service_name, has_refreshed: bool = False):
try:
return await super().discover_slaves(service_name)
except DEFAULT_RETRY_ERRORS:
if not has_refreshed:
self.refresh_sentinels()
return await self.discover_slaves(service_name, True)
raise


class HeadlessSentinelSync(redis.Sentinel):
def __init__(
self,
headless_host, port,
min_other_sentinels=0,
sentinel_kwargs=None,
**connection_kwargs,
):
self.headless_host = headless_host
self.port = port
super().__init__(
sentinels=[(ip, port) for ip in self.sentinels_from_headless_host()],
min_other_sentinels=min_other_sentinels,
sentinel_kwargs=sentinel_kwargs,
**connection_kwargs,
)

def sentinels_from_headless_host(self) -> list[str]:
host, alias_list, ip_list = socket.gethostbyname_ex(self.headless_host)
return ip_list

def refresh_sentinels(self):
self.sentinels = [
redis.Redis(host=ip, port=self.port, **self.sentinel_kwargs)
for ip, port in self.sentinels_from_headless_host()
]
#
# async def execute_command(self, *args, **kwargs):
# """
# Execute Sentinel command in sentinel nodes.
# once - If set to True, then execute the resulting command on a single
# node at random, rather than across the entire sentinel cluster.
# """
# try:
# return await super().execute_command(*args, **kwargs)
# except DEFAULT_RETRY_ERRORS:
# if not kwargs.get('sentinels_refreshed', False):
# self.refresh_sentinels()
# kwargs['sentinels_refreshed'] = True
# return await self.execute_command(*args, **kwargs)
# raise

def discover_master(self, service_name, has_refreshed: bool = False):
try:
return super().discover_master(service_name)
except DEFAULT_RETRY_ERRORS:
if not has_refreshed:
self.refresh_sentinels()
return self.discover_master(service_name, True)
raise

def discover_slaves(self, service_name, has_refreshed: bool = False):
try:
return super().discover_slaves(service_name)
except DEFAULT_RETRY_ERRORS:
if not has_refreshed:
self.refresh_sentinels()
return self.discover_slaves(service_name, True)
raise


def connect(redis_url: str, read_only: bool = False, socket_timeout: float = None,
redis_class: Type[R] = redis.Redis, decode_responses: bool = False) -> [R, redis.RedisCluster]:
"""Returns a synchronous redis-py connection
Expand Down Expand Up @@ -150,25 +280,45 @@ def connect(redis_url: str, read_only: bool = False, socket_timeout: float = No

elif rinfo.sentinel: # We're connecting to a sentinel cluster.
# establish how many retries would be required to check all IPs of a headless service
min_retries = max([
len(socket.getaddrinfo(host, port, type=socket.SOCK_STREAM))
for host, port in rinfo.hosts
])
sentinel_connection = redis.Sentinel(
rinfo.hosts,
sentinel_kwargs={
'retry': redis.retry.Retry(NoBackoff(), min_retries),
'socket_timeout': socket_timeout or rinfo.socket_timeout,
'health_check_interval': 30,
'client_name': CLIENT_NAME,
},
## connection kwargs that will be applied to masters/slaves if not overridden
db=rinfo.database,
password=rinfo.password,
health_check_interval=30,
retry=redis.retry.Retry(FullJitterBackoff(), 1),
client_name=CLIENT_NAME,
)
if rinfo.headless:
# min_retries = max([
# len(socket.getaddrinfo(host, port, type=socket.SOCK_STREAM))
# for host, port in rinfo.hosts
# ])
sentinel_connection = HeadlessSentinelSync(
rinfo.hosts[0][0],
rinfo.hosts[0][1],
min_other_sentinels=rinfo.quorum or 0,
sentinel_kwargs={
'retry': redis.retry.Retry(NoBackoff(), 3),
'socket_timeout': socket_timeout or rinfo.socket_timeout,
'health_check_interval': 30,
'client_name': CLIENT_NAME,
},
## connection kwargs that will be applied to masters/slaves if not overridden
db=rinfo.database,
password=rinfo.password,
health_check_interval=30,
retry=redis.retry.Retry(FullJitterBackoff(), 1),
client_name=CLIENT_NAME,
)
else:
sentinel_connection = redis.Sentinel(
rinfo.hosts,
min_other_sentinels=rinfo.quorum or 0,
sentinel_kwargs={
'retry': redis.retry.Retry(NoBackoff(), 3),
'socket_timeout': socket_timeout or rinfo.socket_timeout,
'health_check_interval': 30,
'client_name': CLIENT_NAME,
},
## connection kwargs that will be applied to masters/slaves if not overridden
db=rinfo.database,
password=rinfo.password,
health_check_interval=30,
retry=redis.retry.Retry(FullJitterBackoff(), 1),
client_name=CLIENT_NAME,
)
if read_only:
return sentinel_connection.slave_for(
rinfo.service_name,
Expand Down Expand Up @@ -233,27 +383,46 @@ def aio_connect(redis_url: str, read_only: bool = False, socket_timeout: float =
)
elif rinfo.sentinel:
# We're connecting to a sentinel cluster.
min_retries = max([
len(socket.getaddrinfo(host, port, type=socket.SOCK_STREAM))
for host, port in rinfo.hosts
])
sentinel_connection = Sentinel(
rinfo.hosts,
sentinel_kwargs={
'retry_on_timeout': True, # required for retry on socket timeout for the async class
'retry': Retry(NoBackoff(), min_retries),
'socket_timeout': socket_timeout or rinfo.socket_timeout,
'health_check_interval': 30,
'client_name': CLIENT_NAME,
},
## connection kwargs that will be applied to masters/slaves if not overridden
db=rinfo.database,
password=rinfo.password,
health_check_interval=30,
retry_on_timeout=True, # required for retry on socket timeout for the async class
retry=Retry(FullJitterBackoff(), 1),
client_name=CLIENT_NAME,
)
if rinfo.headless:
# min_retries = max([
# len(socket.getaddrinfo(host, port, type=socket.SOCK_STREAM))
# for host, port in rinfo.hosts
# ])
sentinel_connection = HeadlessSentinel(
rinfo.hosts[0][0],
rinfo.hosts[0][1],
min_other_sentinels=rinfo.quorum or 0,
sentinel_kwargs={
'retry': redis.retry.Retry(NoBackoff(), 3),
'socket_timeout': socket_timeout or rinfo.socket_timeout,
'health_check_interval': 30,
'client_name': CLIENT_NAME,
},
## connection kwargs that will be applied to masters/slaves if not overridden
db=rinfo.database,
password=rinfo.password,
health_check_interval=30,
retry=redis.retry.Retry(FullJitterBackoff(), 1),
client_name=CLIENT_NAME,
)
else:
sentinel_connection = Sentinel(
rinfo.hosts,
sentinel_kwargs={
'retry_on_timeout': True, # required for retry on socket timeout for the async class
'retry': Retry(NoBackoff(), 3),
'socket_timeout': socket_timeout or rinfo.socket_timeout,
'health_check_interval': 30,
'client_name': CLIENT_NAME,
},
## connection kwargs that will be applied to masters/slaves if not overridden
db=rinfo.database,
password=rinfo.password,
health_check_interval=30,
retry_on_timeout=True, # required for retry on socket timeout for the async class
retry=Retry(FullJitterBackoff(), 1),
client_name=CLIENT_NAME,
)
if read_only:
return sentinel_connection.slave_for(
rinfo.service_name,
Expand Down

0 comments on commit 94a1452

Please sign in to comment.