From eac5399f51fcdc0cd72a8a7d798cdc9921eabacf Mon Sep 17 00:00:00 2001 From: Michael Mendy Date: Thu, 16 Jan 2025 10:58:54 -0800 Subject: [PATCH] @dataclass decorator - Used `@dataclass` decorator - Simplifies class definition with `automatic __init__` - Makes the class structure more declarative - Adds type hints for all attributes --- redset/locks.py | 162 +++++++++++++++++++++++++----------------------- 1 file changed, 86 insertions(+), 76 deletions(-) diff --git a/redset/locks.py b/redset/locks.py index 8a5dd31..6092878 100644 --- a/redset/locks.py +++ b/redset/locks.py @@ -1,93 +1,103 @@ """ -Locks used to synchronize mutations on queues. - +Locks used to synchronize mutations on queues in Redis. """ - +from dataclasses import dataclass import time +from typing import Optional, Any from redset.exceptions import LockTimeout -__all__ = ( - 'Lock', -) - +__all__ = ['Lock'] -# redis or redis-py truncates timestamps to the hundredth REDIS_TIME_PRECISION = 0.01 - -class Lock(object): +@dataclass +class Lock: """ - Context manager that implements a distributed lock with redis. - - Based on Chris Lamb's version - (https://chris-lamb.co.uk/posts/distributing-locking-python-and-redis) - + A distributed lock implementation using Redis SETNX and GETSET. + + Implements a context manager pattern for distributed locking. Based on + Chris Lamb's implementation. + + Example: + >>> with Lock(redis_client, 'my_lock'): + ... print("Critical section") + + Attributes: + redis: Redis client instance + key: Unique key for the lock + expires: Time in seconds after which the lock is considered stale (default: 20) + timeout: Maximum time in seconds to wait for lock acquisition (default: 10) + poll_interval: Time in seconds between lock acquisition attempts (default: 0.2) """ - def __init__(self, - redis, - key, - expires=None, - timeout=None, - poll_interval=None, - ): + + redis: Any # Type hint is Any since redis client type varies + key: str + expires: float = 20.0 + timeout: float = 10.0 + poll_interval: float = 0.2 + + def __post_init__(self) -> None: + """Validate initialization parameters.""" + if self.poll_interval < REDIS_TIME_PRECISION: + raise ValueError( + f"Poll interval must be >= {REDIS_TIME_PRECISION} due to Redis precision limits" + ) + if self.expires <= 0: + raise ValueError("Expires must be positive") + if self.timeout < 0: + raise ValueError("Timeout must be non-negative") + + def _try_acquire_lock(self, expires: float) -> bool: """ - Distributed locking using Redis SETNX and GETSET. - - Usage:: - - with Lock('my_lock'): - print "Critical section" - - :param redis: the redis client - :param key: the key the lock is labeled with - :param timeout: If another client has already obtained the lock, - sleep for a maximum of ``timeout`` seconds before - giving up. A value of 0 means we never wait. Defaults to 10. - :param expires: We consider any existing lock older than - ``expires`` seconds to be invalid in order to - detect crashed clients. This value must be higher - than it takes the critical section to execute. Defaults to 20. - :param poll_interval: How often we should poll for lock acquisition. - Note that poll intervals below 0.01 don't make sense since - timestamps stored in redis are truncated to the hundredth. - Defaults to 0.2. - :raises: LockTimeout - + Attempt to acquire the lock. + + Args: + expires: Timestamp when the lock should expire + + Returns: + bool: True if lock was acquired, False otherwise """ - self.redis = redis - self.key = key - self.timeout = timeout or 10 - self.expires = expires or 20 - self.poll_interval = poll_interval or 0.2 - - def __enter__(self): - timeout = self.timeout - - while timeout >= 0: + # Try to acquire a new lock + if self.redis.setnx(self.key, expires): + return True + + # Check if existing lock is expired + current_value = self.redis.get(self.key) + if not current_value: + return False + + # Account for Redis precision when checking expiration + adjusted_time = float(current_value) + REDIS_TIME_PRECISION + if adjusted_time < time.time(): + # Try to replace expired lock + if self.redis.getset(self.key, expires) == current_value: + return True + + return False + + def __enter__(self) -> None: + """ + Enter the context manager, acquiring the lock. + + Raises: + LockTimeout: If the lock cannot be acquired within the timeout period + """ + remaining_timeout = self.timeout + + while remaining_timeout >= 0: expires = time.time() + self.expires - - if self.redis.setnx(self.key, expires): - # We gained the lock; enter critical section + + if self._try_acquire_lock(expires): return - - current_value = self.redis.get(self.key) - - # We found an expired lock and nobody raced us to replacing it - has_expired = ( - current_value and - # bump the retrieved time by redis' precision so that we don't - # erroneously consider a recently acquired lock as expired - (float(current_value) + REDIS_TIME_PRECISION) < time.time() and - self.redis.getset(self.key, expires) == current_value - ) - if has_expired: - return - - timeout -= self.poll_interval + + remaining_timeout -= self.poll_interval time.sleep(self.poll_interval) - - raise LockTimeout("Timeout while waiting for lock '%s'" % self.key) - - def __exit__(self, exc_type, exc_value, traceback): + + raise LockTimeout(f"Timeout while waiting for lock '{self.key}'") + + def __exit__(self, exc_type: Optional[type], + exc_val: Optional[Exception], + exc_tb: Optional[Any]) -> None: + """Exit the context manager, releasing the lock.""" self.redis.delete(self.key)