-
-
Notifications
You must be signed in to change notification settings - Fork 51
/
hifi_singleton.py
160 lines (134 loc) · 4.99 KB
/
hifi_singleton.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import json
import logging
import os
import platform
import time
try:
import fcntl
except ImportError:
fcntl = None
try:
import msvcrt
except ImportError:
msvcrt = None
logger = logging.getLogger(__name__)
# Used to ensure only one instance of the script runs at a time
class Singleton:
def __init__(self, path):
self.fh = None
self.windows = 'Windows' == platform.system()
self.path = path
def __enter__(self):
success = False
while not success:
try:
if self.windows:
if os.path.exists(self.path):
os.unlink(self.path)
self.fh = os.open(self.path, os.O_CREAT | os.O_EXCL | os.O_RDWR)
else:
self.fh = open(self.path, 'x')
fcntl.lockf(self.fh, fcntl.LOCK_EX | fcntl.LOCK_NB)
success = True
except EnvironmentError as err:
if self.fh is not None:
if self.windows:
os.close(self.fh)
else:
self.fh.close()
self.fh = None
# print is horked here so write directly to stdout.
with open(1, mode="w", closefd=False) as _stdout:
_stdout.write(f"Couldn't aquire lock {self.path}, retrying in 10 seconds\n")
_stdout.flush()
time.sleep(10)
return self
def __exit__(self, type, value, traceback):
if self.windows:
os.close(self.fh)
else:
fcntl.lockf(self.fh, fcntl.LOCK_UN)
self.fh.close()
os.unlink(self.path)
class FLock:
"""
File locking context manager
>> with FLock("/tmp/foo.lock"):
>> do_something_that_must_be_synced()
The lock file must stick around forever. The author is not aware of a no cross platform way to clean it up w/o introducting race conditions.
"""
def __init__(self, path):
self.fh = os.open(path, os.O_CREAT | os.O_RDWR)
self.path = path
def _lock_posix(self):
try:
fcntl.lockf(self.fh, fcntl.LOCK_EX | fcntl.LOCK_NB)
except BlockingIOError:
# Windows sleeps for 10 seconds before giving up on a lock.
# Lets mimic that behavior.
time.sleep(10)
return False
else:
return True
def _lock_windows(self):
try:
msvcrt.locking(self.fh, msvcrt.LK_LOCK, 1)
except OSError:
return False
else:
return True
if fcntl is not None:
_lock = _lock_posix
elif msvcrt is not None:
_lock = _lock_windows
else:
raise RuntimeError("No locking library found")
def read_stats(self):
data = {}
with open(self.fh, mode="r", closefd=False) as stats_file:
stats_file.seek(0)
try:
data = json.loads(stats_file.read())
except json.decoder.JSONDecodeError:
logger.warning("couldn't decode json in lock file")
except PermissionError:
# Can't read a locked file on Windows :(
pass
lock_age = time.time() - os.fstat(self.fh).st_mtime
if lock_age > 0:
data["Age"] = "%0.2f" % lock_age
with open(1, mode="w", closefd=False) as _stdout:
_stdout.write("Lock stats:\n")
for key, value in sorted(data.items()):
_stdout.write("* %s: %s\n" % (key, value))
_stdout.flush()
def write_stats(self):
stats = {
"Owner PID": os.getpid(),
}
flock_env_vars = os.getenv("FLOCK_ENV_VARS")
if flock_env_vars:
for env_var_name in flock_env_vars.split(":"):
stats[env_var_name] = os.getenv(env_var_name)
with open(self.fh, mode="w", closefd=False) as stats_file:
stats_file.truncate()
return stats_file.write(json.dumps(stats, indent=2))
def __enter__(self):
while not self._lock():
try:
self.read_stats()
except (IOError, ValueError) as exc:
logger.exception("couldn't read stats")
time.sleep(3.33) # don't hammer the file
self.write_stats()
return self
def __exit__(self, type, value, traceback):
os.close(self.fh)
# WARNING: `os.close` gives up the lock on `fh` then we attempt the `os.unlink`. On posix platforms this can lead to us deleting a lock file that another process owns. This step is required to maintain compatablity with Singleton. When and if FLock is completely rolled out to the build fleet this unlink should be removed.
try:
os.unlink(self.path)
except (FileNotFoundError, PermissionError):
logger.exception("couldn't unlink lock file")
if os.getenv("USE_FLOCK_CLS") is not None:
logger.warning("Using FLock locker")
Singleton = FLock