forked from habataku/P
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcache.py
23 lines (19 loc) · 800 Bytes
/
cache.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from functools import lru_cache, wraps
from datetime import datetime, timedelta
from threading import Lock
def cache(seconds: int, max_size: int = 128, typed: bool = False):
def wrapper(f):
func = lru_cache(maxsize=max_size, typed=typed)(f)
func.ttl = seconds
func.expire = datetime.utcnow() + timedelta(seconds=func.ttl)
@wraps(f)
def inner(*args, **kwargs):
with Lock():
if datetime.utcnow() > func.expire:
func.cache_clear()
func.expire = datetime.utcnow() + timedelta(seconds=func.ttl)
return func(*args, **kwargs)
inner.clear_cache = func.cache_clear
inner.cache_info = func.cache_info
return inner
return wrapper