forked from bitcoinaverage/bitcoinaverage
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathparser_daemon.py
53 lines (41 loc) · 1.57 KB
/
parser_daemon.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#!/usr/bin/python2.7
import time
import logging
import redis
import simplejson as json
import eventlet
from bitcoinaverage import api_parsers
from bitcoinaverage.config import API_QUERY_FREQUENCY, EXCHANGE_LIST
# Module-level setup: named logger, the Redis connection used as the shared
# result store, and the eventlet primitives that drive the per-exchange workers.
logger = logging.getLogger("parser_daemon")
logger.info("started API parser daemon")
# NOTE(review): assumes a Redis instance on localhost:6379 db 0 — confirm for deployment.
red = redis.StrictRedis(host="localhost", port=6379, db=0)
# Reset both result hashes on startup so stale data from a previous run is dropped.
red.delete("ba:exchanges", "ba:exchanges_ignored") # Reset
pool = eventlet.GreenPool()
queue = eventlet.Queue()
def worker(exchange_name, q):
    """Query one exchange's API and push the result tuple onto *q*.

    The result is whatever api_parsers.callAPI returns; the main loop
    unpacks it as (exchange_name, exchange_data, exchange_ignore_reason).
    """
    q.put(api_parsers.callAPI(exchange_name))
# Kick off one green thread per configured exchange; each pushes its result
# onto the shared queue when its API call completes.
for exchange_name in EXCHANGE_LIST:
    pool.spawn_n(worker, exchange_name, queue)
# Main daemon cycle: drain finished results, persist them to Redis, respawn a
# worker for each drained exchange, then sleep out the rest of the interval.
while True:
    start_time = time.time()
    results = []
    # Non-blocking drain: only workers that have already finished are
    # collected this cycle; slower exchanges are picked up on a later pass.
    # NOTE(review): the very first iteration likely drains nothing, since the
    # workers were just spawned and have not yielded results yet — confirm intended.
    while not queue.empty():
        results.append(queue.get())
    for exchange_name, exchange_data, exchange_ignore_reason in results:
        if exchange_ignore_reason is None:
            # Success: store the JSON-encoded ticker data (use_decimal keeps
            # Decimal precision) and clear any stale "ignored" entry.
            red.hset("ba:exchanges",
                     exchange_name,
                     json.dumps(exchange_data, use_decimal=True))
            red.hdel("ba:exchanges_ignored", exchange_name)
        else:
            # Failure: record the reason and remove any stale good data so the
            # two hashes stay mutually exclusive per exchange.
            red.hset("ba:exchanges_ignored",
                     exchange_name,
                     exchange_ignore_reason)
            red.hdel("ba:exchanges", exchange_name)
        # Re-query this exchange on the next cycle.
        pool.spawn_n(worker, exchange_name, queue)
    logger.info("saved {0} results".format(len(results)))
    # Sleep only the remainder of the configured query interval, never a
    # negative amount if the cycle overran it.
    cycle_time = time.time() - start_time
    sleep_time = max(0, API_QUERY_FREQUENCY['_all'] - cycle_time)
    logger.info("spent {0}, sleeping {1}".format(cycle_time, sleep_time))
    # eventlet.sleep (not time.sleep) so the worker greenlets keep running.
    eventlet.sleep(sleep_time)