Skip to content

Commit

Permalink
Add retries logic for headless Sentinel
Browse files Browse the repository at this point in the history
  • Loading branch information
rad-pat committed Jan 18, 2024
1 parent 621f93a commit 8105c69
Showing 1 changed file with 29 additions and 10 deletions.
39 changes: 29 additions & 10 deletions redis_stomp/redis_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import redis.retry
from redis.asyncio import Redis, Sentinel, RedisCluster
from redis.asyncio.retry import Retry
from redis.backoff import FullJitterBackoff
from redis.backoff import FullJitterBackoff, NoBackoff

R = TypeVar('R', bound=redis.Redis)
CLIENT_NAME = socket.gethostname().rsplit('-', 2)[0]
Expand Down Expand Up @@ -149,23 +149,42 @@ def connect(redis_url: str, read_only: bool = False, socket_timeout: float = No
)

elif rinfo.sentinel: # We're connecting to a sentinel cluster.
# establish how many retries would be required to check all IPs of a headless service
min_retries = max([
len(socket.getaddrinfo(host, port, type=socket.SOCK_STREAM))
for host, port in rinfo.hosts
])
sentinel_connection = redis.Sentinel(
rinfo.hosts, socket_timeout=socket_timeout or rinfo.socket_timeout,
db=rinfo.database, password=rinfo.password,
health_check_interval=30,
retry=redis.retry.Retry(FullJitterBackoff(), 1),
client_name=CLIENT_NAME,
rinfo.hosts,
sentinel_kwargs={
'retry_on_timeout': True, # required for retry on socket timeout for the async class
'retry': redis.retry.Retry(NoBackoff(), min_retries),
'socket_timeout': socket_timeout or rinfo.socket_timeout,
'health_check_interval': 30,
'client_name': CLIENT_NAME,
},
## connection kwargs that will be applied to masters/slaves if not overridden
db=rinfo.database,
password=rinfo.password,
health_check_interval=30,
retry_on_timeout=True, # required for retry on socket timeout for the async class
retry=redis.retry.Retry(FullJitterBackoff(), 1),
client_name=CLIENT_NAME,
)
if read_only:
return sentinel_connection.slave_for(
rinfo.service_name, socket_timeout=socket_timeout or rinfo.socket_timeout,
redis_class=redis_class, decode_responses=decode_responses,
rinfo.service_name,
redis_class=redis_class,
socket_timeout=socket_timeout or rinfo.socket_timeout,
decode_responses=decode_responses,
client_name=CLIENT_NAME,
)
else:
return sentinel_connection.master_for(
rinfo.service_name, socket_timeout=socket_timeout or rinfo.socket_timeout,
redis_class=redis_class, decode_responses=decode_responses,
rinfo.service_name,
redis_class=redis_class,
socket_timeout=socket_timeout or rinfo.socket_timeout,
decode_responses=decode_responses,
client_name=CLIENT_NAME,
)
else: # Single redis instance
Expand Down

0 comments on commit 8105c69

Please sign in to comment.