Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MAINT, ENH, TST: Butler.Memory #217

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ec2/templates/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ nextbackenddocker:
- rabbitmqredis:RABBITREDIS
- mongodb:MONGODB
- minionworker:MINIONWORKER
- minionredis:6379
environment:
- PYTHONUNBUFFERED=TRUE
- PYTHONPATH=:/next_backend
Expand Down
1 change: 1 addition & 0 deletions local/docker-compose.yml.pre
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ nextbackenddocker:
- rabbitmqredis:RABBITREDIS
- mongodb:MONGODB
- minionworker:MINIONWORKER
- minionredis:6379
environment:
- PYTHONUNBUFFERED=TRUE
- PYTHONPATH=:/next_backend
Expand Down
273 changes: 175 additions & 98 deletions next/apps/Butler.py
Original file line number Diff line number Diff line change
@@ -1,129 +1,203 @@
import os
import sys
import StringIO
from functools import wraps
import pickle
import numpy as np
import traceback
import time
import ast
import inspect
from pprint import pprint
from functools import partial
import itertools

import redis

import next.constants as constants
import next.utils as utils


def mem_except_wrap(func):
    """Decorate ``func`` so that exceptions are logged instead of propagated.

    The wrapped callable returns whatever ``func`` returns. If ``func``
    raises any ``Exception``, the error and its traceback are sent to
    ``utils.debug_print`` and ``None`` is returned to the caller.

    :param func: the callable to protect.
    :return: a wrapper with the same name/docstring as ``func``.
    """
    @wraps(func)  # keep the wrapped function's name and docstring for introspection
    def f(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            # Report the name of the function that actually failed rather
            # than a fixed "get", since this decorator wraps many methods.
            utils.debug_print("Butler.Collection.Memory.{} exception: {}".format(func.__name__, e))
            utils.debug_print(traceback.format_exc())
            return None
    return f


class Memory(object):
def __init__(self, collection='', exp_uid='', uid_prefix=''):
self.key_prefix = collection + uid_prefix.format(exp_uid=exp_uid)
self.cache = None
self.max_entry_size = 500000000 # 500MB

def check_prefix(self):
if self.key_prefix == '':
utils.debug_print("butler.memory is deprecated."
" Change to butler.experiment.memory or butler.algorithm.memory, etc."
" wherever appropriate")

def ensure_connection(self):
try:
if self.cache is None:
self.cache = redis.StrictRedis(host=constants.MINIONREDIS_HOST, port=constants.MINIONREDIS_PORT)
except Exception as e:
raise Exception("Butler.Collection.Memory could not connect with RedisDB: {}".format(e))

def num_entries(self, size):
fns = inspect.getmembers(self, predicate=inspect.ismethod)
for name, fn in fns:
if name[0] == '_' or 'ensure_connection' in name:
continue
self.name = self._check_prefix(self._ensure_connection(fn))

def _check_prefix(self, fn=None):
    """Return a wrapper around ``fn`` that warns when no key prefix is set.

    Each call of the returned function first checks ``self.key_prefix``;
    when it is the empty string a deprecation notice is emitted through
    ``utils.debug_print``. The call is then forwarded to ``fn`` unchanged
    and its result returned.
    """
    deprecation_note = ("butler.memory is deprecated."
                        " Change to butler.experiment.memory or butler.algorithm.memory, etc."
                        " wherever appropriate")

    def f(*args, **kwargs):
        prefix_missing = self.key_prefix == ''
        if prefix_missing:
            utils.debug_print(deprecation_note)
        return fn(*args, **kwargs)

    return f

def _ensure_connection(self, fn=None):
    """Create the Redis client on first use and hand ``fn`` back untouched.

    When ``self.cache`` is still ``None`` it is replaced by a
    ``redis.StrictRedis`` client pointed at host ``'minionredis_1'``.
    ``fn`` is returned as-is; it is not called or wrapped here.
    """
    client_missing = self.cache is None
    if client_missing:
        self.cache = redis.StrictRedis(host='minionredis_1')
    return fn


def _num_entries(self, size):
    """Return how many ``self.max_entry_size`` chunks are needed for ``size`` bytes.

    This is a ceiling division done entirely with integer arithmetic, so
    the result is an ``int`` regardless of Python version (true division
    would produce a float, which ``range`` does not accept).

    :param size: total payload size.
    :return: number of chunks; ``0`` when ``size`` is ``0``.
    """
    full_chunks, remainder = divmod(size, self.max_entry_size)
    # A non-zero remainder means one extra, partially filled chunk.
    return full_chunks + 1 if remainder else full_chunks

def set(self, key, value):
self.check_prefix()
@mem_except_wrap
def set(self, key=None, value=None, uid='', verbose=True):
    """Store ``value`` under ``key_prefix + key + uid``.

    Values that are not numpy arrays and whose ``sys.getsizeof`` is below
    ``self.max_entry_size`` are written directly with a single
    ``cache.set``. Everything else is pickled and split into
    ``max_entry_size``-sized pieces stored under ``<key>:0``, ``<key>:1``,
    ...; the main key then holds a header of the form
    ``__redis_set__<n_chunks>:<n_bytes>`` which ``get`` uses to reassemble
    the payload.

    :param key: key name (prefixed with ``self.key_prefix``).
    :param value: object to store.
    :param uid: suffix appended to the key.
    :param verbose: when true, log the payload size and chunk count.
    :return: the result of the final ``cache.set`` call.
    """
    key = self.key_prefix + key + uid
    if sys.getsizeof(value) < self.max_entry_size and not isinstance(value, np.ndarray):
        return self.cache.set(key, value)

    payload = pickle.dumps(value)
    # Size the chunking from the pickled payload itself: sys.getsizeof of
    # the original object is a shallow estimate and can be smaller than
    # the pickle, which would silently drop the tail of the data.
    l = len(payload)
    n = self._num_entries(l)
    if verbose:
        utils.debug_print("Butler.py memory setting {} in {} entries".format(l, n))
    chunk = self.max_entry_size
    for i in range(n):
        k = key + ":" + str(i)
        self.cache.set(k, payload[i * chunk:(i + 1) * chunk])
    return self.cache.set(key, "__redis_set__" + "{}:{}".format(str(n), str(l)))

@mem_except_wrap
def set_file(self, key, f, verbose=False):
key = self.key_prefix + key
try:
self.ensure_connection()
l = len(value)
n = self.num_entries(l)
utils.debug_print("Setting {} in {} entries".format(l, n))
for i in range(n):
k = key + ":" + str(i)
self.cache.set(k, value[i*self.max_entry_size:(i+1)*self.max_entry_size])
return self.cache.set(key, "{}:{}".format(str(n), str(l)))
except Exception as e:
utils.debug_print("Butler.Collection.Memory.set exception: {}".format(e))
return False
f.seek(0, os.SEEK_END)
l = f.tell()
f.seek(0, 0)
n = self._num_entries(l)
if verbose:
utils.debug_print("butler.py memory setting {} bytes in {} entries".format(l, n))
for i in range(n):
k = key + ":" + str(i)
v = f.read(self.max_entry_size)
self.cache.set(k, v)
return self.cache.set(key, "{}:{}".format(str(n), str(l)))

@mem_except_wrap
def get(self, key=None, uid='', verbose=False):
    """Fetch the value stored under ``key_prefix + key + uid``.

    If the stored value is a ``str`` starting with the ``__redis_set__``
    marker, the remainder is parsed as ``<n_chunks>:<n_bytes>``, the
    pieces ``<key>:0`` .. ``<key>:<n_chunks-1>`` are fetched, concatenated
    and unpickled. Otherwise the raw value is passed through
    ``self._from_db_fmt``.

    :param key: key name (prefixed with ``self.key_prefix``).
    :param uid: suffix appended to the key.
    :param verbose: when true, log the payload size and chunk count.
    :return: the decoded value.
    """
    key = self.key_prefix + key + uid
    d = self.cache.get(key)
    msg = '__redis_set__'
    pickled = isinstance(d, str) and d.startswith(msg)
    if not pickled:
        return self._from_db_fmt(d)

    n, l = d[len(msg):].split(":")
    l = int(l)
    n = int(n)
    if verbose:
        utils.debug_print("Getting {} bytes in {} entries".format(l, n))
    # Collect the pieces and join once instead of repeated string +=,
    # which re-copies the accumulated payload on every iteration.
    pieces = [self.cache.get(key + ":" + str(i)) for i in range(n)]
    # NOTE(security): pickle.loads executes arbitrary code from its input;
    # this is only safe while the cache is writable by trusted code alone.
    return pickle.loads("".join(pieces))

def set_file(self, key, f):
self.check_prefix()
@mem_except_wrap
def get_file(self, key):
key = self.key_prefix + key
try:
self.ensure_connection()
f.seek(0, os.SEEK_END)
l = f.tell()
f.seek(0, 0)
n = self.num_entries(l)
utils.debug_print("Setting {} bytes in {} entries".format(l, n))
for i in range(n):
k = key + ":" + str(i)
v = f.read(self.max_entry_size)
self.cache.set(k, v)
return self.cache.set(key, "{}:{}".format(str(n), str(l)))
except Exception as e:
utils.debug_print("Butler.Collection.Memory.set_file exception: {}".format(e))
return False
d = self.cache.get(key)
f = StringIO.StringIO()
n, l = d.split(":")
l = int(l)
n = int(n)
utils.debug_print("Getting {} bytes in {} entries".format(l, n))
for i in range(n):
k = key + ":" + str(i)
f.write(self.cache.get(k))
f.seek(0, 0)
return f

@mem_except_wrap
def lock(self, name, **kwargs):
    """Return ``self.cache.lock`` for ``name`` with the key prefix applied.

    Any extra keyword arguments are forwarded to ``cache.lock`` untouched.
    """
    prefixed_name = self.key_prefix + name
    lock_obj = self.cache.lock(prefixed_name, **kwargs)
    return lock_obj

@mem_except_wrap
def exists(self, key, uid=''):
    """Ask the cache whether ``key_prefix + key + uid`` is present.

    :return: whatever ``self.cache.exists`` reports for the full key.
    """
    full_key = self.key_prefix + key + uid
    found = self.cache.exists(full_key)
    return found

@mem_except_wrap
def append(self, key=None, value=None, uid=''):
if not isinstance(value, list):
value = [value]
if not self.exists(key, uid=uid):
old = "[]"
else:
old_key = self.key_prefix + key + uid
old = self.cache.get(old_key)
old = ast.literal_eval(old)

def get(self, key):
self.check_prefix()
try:
self.ensure_connection()
key = self.key_prefix + key
d = self.cache.get(key)
n, l = d.split(":")
l = int(l)
n = int(n)
ans = ""
utils.debug_print("Getting {} bytes in {} entries".format(l, n))
for i in range(n):
k = key + ":" + str(i)
ans += self.cache.get(k)
return ans
except Exception as e:
utils.debug_print("Butler.Collection.Memory.get exception: {}".format(e))
return None
key = self.key_prefix + key + uid
return self.cache.set(key, old + value)

def get_file(self, key):
self.check_prefix()
try:
self.ensure_connection()
key = self.key_prefix + key
d = self.cache.get(key)
f = StringIO.StringIO()
n, l = d.split(":")
l = int(l)
n = int(n)
utils.debug_print("Getting {} bytes in {} entries".format(l, n))
for i in range(n):
k = key + ":" + str(i)
f.write(self.cache.get(k))
f.seek(0, 0)
return f
except Exception as e:
utils.debug_print("Butler.Collection.Memory.get_file exception: {}".format(e))
return None
@mem_except_wrap
def increment(self, key=None, value=1, uid=''):
    """Increment the counter stored under ``key_prefix + key + uid``.

    :param value: amount passed to ``cache.incr`` (defaults to 1).
    :return: the result of ``self.cache.incr``.
    """
    suffixed = key + uid
    full_key = self.key_prefix + suffixed
    return self.cache.incr(full_key, amount=value)

@mem_except_wrap
def set_many(self, key_value_dict=None, uid=''):
    """Write every item of ``key_value_dict`` through one cache pipeline.

    Each key becomes ``key_prefix + key + uid`` and each value is passed
    through ``self._to_db_fmt`` before being queued with ``pipe.set``.

    :return: the list returned by ``pipe.execute()``.
    """
    with self.cache.pipeline() as pipe:
        for name, raw in key_value_dict.items():
            full_key = self.key_prefix + name + uid
            pipe.set(full_key, self._to_db_fmt(raw))
        results = pipe.execute()
    return results

@mem_except_wrap
def get_many(self, key=None, uid=''):
    """Fetch several keys through one cache pipeline.

    :param key: iterable of key names; each is looked up as
        ``key_prefix + name + uid``.
    :param uid: suffix appended to every key.
    :return: dict mapping each original (un-prefixed) key name to its
        value after decoding with ``self._from_db_fmt``.
    """
    # Materialize once so the same order is used for queuing and zipping.
    keys = list(key)
    with self.cache.pipeline() as pipe:
        for name in keys:
            pipe.get(self.key_prefix + name + uid)
        values = pipe.execute()
    return {k: self._from_db_fmt(v) for k, v in zip(keys, values)}

@mem_except_wrap
def pipeline(self):
    """
    Hand back a new pipeline created by ``self.cache.pipeline()``.
    Queued commands are run with pipe.execute(); the original note
    says to close it with pipe.close() when finished.
    """
    pipe = self.cache.pipeline()
    return pipe

def lock(self, name, **kwargs):
try:
self.ensure_connection()
name = self.key_prefix + name
return self.cache.lock(name, **kwargs)
except Exception as e:
utils.debug_print("Butler.Collection.Memory.lock exception: {}".format(e))
return None
def _to_db_fmt(self, v):
    """Convert ``v`` to the form written to the cache.

    numpy arrays are serialized with ``pickle.dumps``; every other value
    is returned unchanged.
    """
    return pickle.dumps(v) if isinstance(v, np.ndarray) else v

def exists(self, key):
def _from_db_fmt(self, v):
try:
self.ensure_connection()
key = self.key_prefix + key
return self.cache.exists(key)
except Exception as e:
utils.debug_print("Butler.Collection.Memory.exists exception: {}".format(e))
return None
return ast.literal_eval(v)
except:
try:
return pickle.loads(v)
except:
return v


class Collection(object):
Expand Down Expand Up @@ -194,6 +268,9 @@ def get(self, uid="", key=None, pattern=None, exp=None):
else:
return self.db.get_docs_with_filter(self.collection, pattern)

def get_many(self, **kwargs):
return self.get(**kwargs)

@timed(op_type='get')
def get_and_delete(self, uid="", key=None, exp=None):
"""
Expand Down
Loading