Merge pull request #147 from usc-isi-i2/development
Development
saggu authored Nov 16, 2017
2 parents ac35d71 + 9d0f48e commit 1d88b0f
Showing 15 changed files with 1,818 additions and 934 deletions.
240 changes: 128 additions & 112 deletions etk/core.py

Large diffs are not rendered by default.

123 changes: 79 additions & 44 deletions etk/run_core_kafka.py
@@ -10,6 +10,11 @@
 from kafka import KafkaProducer, KafkaConsumer
 from digsandpaper.elasticsearch_indexing.index_knowledge_graph import index_knowledge_graph_fields
 import traceback
+import signal
+
+
+consumer_pointer = None
+
 
 def run_serial(input, output, core, prefix='', kafka_server=None, kafka_topic=None):
     # ignore file output if kafka is set
@@ -49,50 +54,63 @@ def run_serial_cdrs(etk_core, consumer, producer, producer_topic, indexing=False

     # high level api will handle batch thing
     # will exit once timeout
-    for msg in consumer:
-        cdr = msg.value
-        cdr['@execution_profile'] = {'@worker_id': worker_id}
-        doc_arrived_time = time.time()
-        cdr['@execution_profile']['@doc_arrived_time'] = datetime.utcfromtimestamp(doc_arrived_time).isoformat()
-        cdr['@execution_profile']['@doc_wait_time'] = 0 if not prev_doc_sent_time \
-            else doc_arrived_time - prev_doc_sent_time
-
-        if 'doc_id' not in cdr:
-            cdr['doc_id'] = cdr.get('_id', cdr.get('document_id', ''))
-        if len(cdr['doc_id']) == 0:
-            print 'invalid cdr: unknown doc_id'
-        print 'processing', cdr['doc_id']
-
-        try:
-            start_run_core_time = time.time()
-            # run core
-            result = etk_core.process(cdr, create_knowledge_graph=True)
-            if not result:
-                raise Exception('run core error')
-
-            # indexing
-            if indexing:
-                result = index_knowledge_graph_fields(result)
-            cdr['@execution_profile']['@run_core_time'] = time.time() - start_run_core_time
-
-            doc_sent_time = time.time()
-            cdr['@execution_profile']['@doc_sent_time'] = datetime.utcfromtimestamp(doc_sent_time).isoformat()
-            prev_doc_sent_time = doc_sent_time
-            cdr['@execution_profile']['@doc_processed_time'] = doc_sent_time - doc_arrived_time
-            # dumping result
-            if result:
-                r = producer.send(producer_topic, result)
-                r.get(timeout=60)  # wait till sent
-            else:
-                etk_core.log('fail to indexing doc {}'.format(cdr['doc_id']), core._ERROR)
-            print 'done'
+    try:
+        for msg in consumer:
+            cdr = msg.value
+            cdr['@execution_profile'] = {'@worker_id': worker_id}
+            doc_arrived_time = time.time()
+            cdr['@execution_profile']['@doc_arrived_time'] = datetime.utcfromtimestamp(doc_arrived_time).isoformat()
+            cdr['@execution_profile']['@doc_wait_time'] = 0.0 if not prev_doc_sent_time \
+                else float(doc_arrived_time - prev_doc_sent_time)
+            cdr['@execution_profile']['@doc_length'] = len(json.dumps(cdr))
+
+            if 'doc_id' not in cdr:
+                cdr['doc_id'] = cdr.get('_id', cdr.get('document_id', ''))
+            if len(cdr['doc_id']) == 0:
+                print 'invalid cdr: unknown doc_id'
+            print 'processing', cdr['doc_id']
+
+            try:
+                start_run_core_time = time.time()
+                # run core
+                result = etk_core.process(cdr, create_knowledge_graph=True)
+                if not result:
+                    raise Exception('run core error')
+
+                # indexing
+                if indexing:
+                    result = index_knowledge_graph_fields(result)
+                cdr['@execution_profile']['@run_core_time'] = float(time.time() - start_run_core_time)
+
+                doc_sent_time = time.time()
+                cdr['@execution_profile']['@doc_sent_time'] = datetime.utcfromtimestamp(doc_sent_time).isoformat()
+                prev_doc_sent_time = doc_sent_time
+                cdr['@execution_profile']['@doc_processed_time'] = float(doc_sent_time - doc_arrived_time)
+                # dumping result
+                if result:
+                    r = producer.send(producer_topic, result)
+                    r.get(timeout=60)  # wait till sent
+                else:
+                    etk_core.log('fail to indexing doc {}'.format(cdr['doc_id']), core._ERROR)
+                print 'done'
+
+            except Exception as e:
+                # print e
+                exc_type, exc_value, exc_traceback = sys.exc_info()
+                lines = traceback.format_exception(exc_type, exc_value, exc_traceback)
+                print ''.join(lines)
+                print 'failed at', cdr['doc_id']
+
+    except ValueError as e:
+        # I/O operation on closed epoll fd
+        print 'consumer closed'
+
+    except StopIteration as e:
+        # timeout
+        print 'consumer timeout'
+        sys.exit()
+
+    except Exception as e:
+        # print e
+        exc_type, exc_value, exc_traceback = sys.exc_info()
+        lines = traceback.format_exception(exc_type, exc_value, exc_traceback)
+        print ''.join(lines)
+        print 'failed at', cdr['doc_id']
 
 
 def run_parallel_3(input_path, output_path, config_path, processes, kafka_server, kafka_topic):
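
The dump step above is deliberately synchronous: kafka-python's `producer.send()` returns a future, and `r.get(timeout=60)` blocks until the broker acknowledges the record or the timeout expires, so a document only counts as processed once its result is durably in the output topic. A minimal sketch of that pattern, using hypothetical server and topic names:

    from kafka import KafkaProducer
    from kafka.errors import KafkaError
    import json

    # hypothetical endpoint and topic, for illustration only
    producer = KafkaProducer(
        bootstrap_servers=['localhost:9092'],
        value_serializer=lambda v: json.dumps(v).encode('utf-8'))

    future = producer.send('example-output-topic', {'doc_id': 'example'})
    try:
        record_metadata = future.get(timeout=60)  # block until the broker acks
        print 'stored at offset', record_metadata.offset
    except KafkaError:
        # the ack never arrived; surface the failure instead of dropping the doc
        raise

Blocking on every record trades throughput for a simple guarantee, which fits a pipeline where each CDR carries its own `@execution_profile` timing record.
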
@@ -148,6 +166,14 @@ def run_parallel_worker(worker_id, input_chunk_path, output_chunk_path, config_p
     print 'worker #{} finished'.format(worker_id)
 
 
+def termination_handler(signum, frame):
+    print 'SIGNAL #{} received, trying to exit...'.format(signum)
+
+    global consumer_pointer
+    if consumer_pointer:
+        consumer_pointer.close()
+
+
 def usage():
     return """\
 Usage: python run_core.py [args]
@@ -203,6 +229,9 @@ def usage():
 
     worker_id = int(c_options.workerId) if c_options.workerId is not None else 0
 
+    signal.signal(signal.SIGINT, termination_handler)
+    signal.signal(signal.SIGTERM, termination_handler)
+
     # kafka input
     if c_options.kafkaInputServer is not None:
         try:
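
Taken together with `termination_handler` above, the two `signal.signal()` registrations give the worker a graceful exit: Ctrl-C or a `kill` closes the consumer through the module-level `consumer_pointer`, the blocked iterator in `run_serial_cdrs` then raises, and the `except ValueError` branch (the "I/O operation on closed epoll fd" case noted in the diff) ends the loop between documents rather than mid-document. A self-contained sketch of the same pattern, again with hypothetical endpoints:

    import json
    import signal
    import sys

    from kafka import KafkaConsumer

    consumer = None  # module-level so the handler can reach it

    def shutdown(signum, frame):
        print 'signal #{} received, closing consumer...'.format(signum)
        if consumer:
            consumer.close()  # wakes the blocked iterator below

    signal.signal(signal.SIGINT, shutdown)
    signal.signal(signal.SIGTERM, shutdown)

    consumer = KafkaConsumer(
        'example-input-topic',  # hypothetical topic
        bootstrap_servers=['localhost:9092'],
        value_deserializer=lambda v: json.loads(v),
        consumer_timeout_ms=60000)  # stop iterating after 60s of silence

    try:
        for msg in consumer:
            print 'processing', msg.value.get('doc_id', '')
    except ValueError:
        print 'consumer closed'  # close() raced with the iterator
    sys.exit(0)
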
@@ -226,13 +255,19 @@ def usage():
                 **input_args
             )
             consumer.subscribe([c_options.kafkaInputTopic])
-            c = core.Core(json.load(codecs.open(c_options.configPath, 'r')))
+
+            global consumer_pointer
+            consumer_pointer = consumer
 
             kafka_output_server = c_options.kafkaOutputServer.split(',')
             producer = KafkaProducer(
                 bootstrap_servers=kafka_output_server,
                 value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                 **output_args
             )
+
+            c = core.Core(json.load(codecs.open(c_options.configPath, 'r')))
+
+            run_serial_cdrs(c, consumer, producer, c_options.kafkaOutputTopic, indexing=c_options.indexing,
+                            worker_id=worker_id)
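
One detail the collapsed context hides: the producer's `value_serializer` turns each result dict into UTF-8 JSON, and for `msg.value` to come back as a dict in `run_serial_cdrs`, the consumer presumably carries the matching `value_deserializer` (it would arrive via `input_args` here; that is an assumption, not shown in the diff). A sketch of the symmetric wiring, with hypothetical endpoints:

    import json

    from kafka import KafkaConsumer, KafkaProducer

    # hypothetical servers and topics, for illustration only
    consumer = KafkaConsumer(
        bootstrap_servers=['localhost:9092'],
        value_deserializer=lambda v: json.loads(v))  # bytes in, dict out
    consumer.subscribe(['example-input-topic'])

    producer = KafkaProducer(
        bootstrap_servers=['localhost:9092'],
        value_serializer=lambda v: json.dumps(v).encode('utf-8'))  # dict in, bytes out

    # whatever the producer writes, a matching consumer yields back as msg.value
    producer.send('example-output-topic', {'doc_id': '123'})
    producer.flush()  # push any buffered records out before exiting
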
