Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

@dataclass decorator #14

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 86 additions & 76 deletions redset/locks.py
Original file line number Diff line number Diff line change
@@ -1,93 +1,103 @@
"""
Locks used to synchronize mutations on queues.

Locks used to synchronize mutations on queues in Redis.
"""

from dataclasses import dataclass
import time
from typing import Optional, Any

from redset.exceptions import LockTimeout

__all__ = (
'Lock',
)

__all__ = ['Lock']

# redis or redis-py truncates timestamps to the hundredth
REDIS_TIME_PRECISION = 0.01


class Lock(object):
@dataclass
class Lock:
"""
Context manager that implements a distributed lock with redis.

Based on Chris Lamb's version
(https://chris-lamb.co.uk/posts/distributing-locking-python-and-redis)

A distributed lock implementation using Redis SETNX and GETSET.

Implements a context manager pattern for distributed locking. Based on
Chris Lamb's implementation.

Example:
>>> with Lock(redis_client, 'my_lock'):
... print("Critical section")

Attributes:
redis: Redis client instance
key: Unique key for the lock
expires: Time in seconds after which the lock is considered stale (default: 20)
timeout: Maximum time in seconds to wait for lock acquisition (default: 10)
poll_interval: Time in seconds between lock acquisition attempts (default: 0.2)
"""
def __init__(self,
redis,
key,
expires=None,
timeout=None,
poll_interval=None,
):

redis: Any # Type hint is Any since redis client type varies
key: str
expires: float = 20.0
timeout: float = 10.0
poll_interval: float = 0.2

def __post_init__(self) -> None:
"""Validate initialization parameters."""
if self.poll_interval < REDIS_TIME_PRECISION:
raise ValueError(
f"Poll interval must be >= {REDIS_TIME_PRECISION} due to Redis precision limits"
)
if self.expires <= 0:
raise ValueError("Expires must be positive")
if self.timeout < 0:
raise ValueError("Timeout must be non-negative")

def _try_acquire_lock(self, expires: float) -> bool:
"""
Distributed locking using Redis SETNX and GETSET.

Usage::

with Lock('my_lock'):
print "Critical section"

:param redis: the redis client
:param key: the key the lock is labeled with
:param timeout: If another client has already obtained the lock,
sleep for a maximum of ``timeout`` seconds before
giving up. A value of 0 means we never wait. Defaults to 10.
:param expires: We consider any existing lock older than
``expires`` seconds to be invalid in order to
detect crashed clients. This value must be higher
than it takes the critical section to execute. Defaults to 20.
:param poll_interval: How often we should poll for lock acquisition.
Note that poll intervals below 0.01 don't make sense since
timestamps stored in redis are truncated to the hundredth.
Defaults to 0.2.
:raises: LockTimeout

Attempt to acquire the lock.

Args:
expires: Timestamp when the lock should expire

Returns:
bool: True if lock was acquired, False otherwise
"""
self.redis = redis
self.key = key
self.timeout = timeout or 10
self.expires = expires or 20
self.poll_interval = poll_interval or 0.2

def __enter__(self):
timeout = self.timeout

while timeout >= 0:
# Try to acquire a new lock
if self.redis.setnx(self.key, expires):
return True

# Check if existing lock is expired
current_value = self.redis.get(self.key)
if not current_value:
return False

# Account for Redis precision when checking expiration
adjusted_time = float(current_value) + REDIS_TIME_PRECISION
if adjusted_time < time.time():
# Try to replace expired lock
if self.redis.getset(self.key, expires) == current_value:
return True

return False

def __enter__(self) -> None:
"""
Enter the context manager, acquiring the lock.

Raises:
LockTimeout: If the lock cannot be acquired within the timeout period
"""
remaining_timeout = self.timeout

while remaining_timeout >= 0:
expires = time.time() + self.expires

if self.redis.setnx(self.key, expires):
# We gained the lock; enter critical section

if self._try_acquire_lock(expires):
return

current_value = self.redis.get(self.key)

# We found an expired lock and nobody raced us to replacing it
has_expired = (
current_value and
# bump the retrieved time by redis' precision so that we don't
# erroneously consider a recently acquired lock as expired
(float(current_value) + REDIS_TIME_PRECISION) < time.time() and
self.redis.getset(self.key, expires) == current_value
)
if has_expired:
return

timeout -= self.poll_interval

remaining_timeout -= self.poll_interval
time.sleep(self.poll_interval)

raise LockTimeout("Timeout while waiting for lock '%s'" % self.key)

def __exit__(self, exc_type, exc_value, traceback):

raise LockTimeout(f"Timeout while waiting for lock '{self.key}'")

def __exit__(self, exc_type: Optional[type],
exc_val: Optional[Exception],
exc_tb: Optional[Any]) -> None:
"""Exit the context manager, releasing the lock."""
self.redis.delete(self.key)