Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Реалізували сплячий режим для бота, перевіряємо працездатність proxy, api, doc service #66

Merged
merged 11 commits into from
Jun 7, 2017
2 changes: 1 addition & 1 deletion openprocurement/bot/identification/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def verify(self, param, code, headers):

return response

def details(self, id, headers):
def details(self, id, headers={}):
""" Send request to Proxy server to get details."""
url = '{url}/{id}'.format(url=self.details_url, id=id)
response = self.session.get(url=url, auth=(self.user, self.password), timeout=self.timeout, headers=headers)
Expand Down
41 changes: 30 additions & 11 deletions openprocurement/bot/identification/databridge/bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def __init__(self, config):
self.config = config

api_server = self.config_get('tenders_api_server')
api_version = self.config_get('tenders_api_version')
self.api_version = self.config_get('tenders_api_version')
ro_api_server = self.config_get('public_tenders_api_server') or api_server
buffers_size = self.config_get('buffers_size') or 500
self.delay = self.config_get('delay') or 15
Expand All @@ -46,8 +46,8 @@ def __init__(self, config):
self.doc_service_port = self.config_get('doc_service_port') or 6555

# init clients
self.tenders_sync_client = TendersClientSync('', host_url=ro_api_server, api_version=api_version)
self.client = TendersClient(self.config_get('api_token'), host_url=api_server, api_version=api_version)
self.tenders_sync_client = TendersClientSync('', host_url=ro_api_server, api_version=self.api_version)
self.client = TendersClient(self.config_get('api_token'), host_url=api_server, api_version=self.api_version)
self.proxyClient = ProxyClient(host=self.config_get('proxy_server'),
user=self.config_get('proxy_user'),
password=self.config_get('proxy_password'),
Expand Down Expand Up @@ -112,9 +112,6 @@ def config_get(self, name):
return self.config.get('main').get(name)

def check_doc_service(self):
"""Makes request to the doc_service, returns True if it's up, raises RequestError otherwise
Separated to allow for possible granular checks
"""
try:
request("{host}:{port}/".format(host=self.doc_service_host, port=self.doc_service_port))
except RequestError as e:
Expand All @@ -124,10 +121,17 @@ def check_doc_service(self):
else:
return True

def check_openprocurement_api(self):
try:
self.client.head('/api/{}/spore'.format(self.api_version))
except RequestError as e:
logger.info('TendersServer connection error, message {}'.format(e),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_DOC_SERVICE_CONN_ERROR}, {}))
raise e
else:
return True

def check_proxy(self):
"""Makes request to the EDR proxy, returns True if it's up, raises RequestError otherwise
Separated to allow for possible granular checks
"""
try:
self.proxyClient.health()
except RequestException as e:
Expand All @@ -137,6 +141,20 @@ def check_proxy(self):
else:
return True

def set_sleep(self, new_status):
for job in self.jobs.values():
job.exit = new_status

def check_services(self):
try:
self.check_proxy() and self.check_openprocurement_api() and self.check_doc_service()
except Exception as e:
logger.info("Service is unavailable, message {}".format(e))
self.set_sleep(True)
else:
logger.info("All services have become available, starting all workers")
self.set_sleep(False)

def _start_jobs(self):
self.jobs = {'scanner': self.scanner(),
'filter_tender': self.filter_tender(),
Expand All @@ -150,6 +168,7 @@ def run(self):
try:
while True:
gevent.sleep(self.delay)
self.check_services()
if counter == 20:
logger.info('Current state: Filtered tenders {}; Edrpou codes queue {}; Retry edrpou codes queue {}; '
'Edr ids queue {}; Retry edr ids queue {}; Upload to doc service {}; Retry upload to doc service {}; '
Expand All @@ -167,7 +186,7 @@ def run(self):
counter = 0
counter += 1
for name, job in self.jobs.items():
if job.dead:
if job.dead and not job.exit:
logger.warning('Restarting {} worker'.format(name),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_RESTART_WORKER}))
self.jobs[name] = gevent.spawn(getattr(self, name))
Expand All @@ -188,7 +207,7 @@ def main():
config = load(config_file_obj.read())
logging.config.dictConfig(config)
bridge = EdrDataBridge(config)
bridge.check_proxy() and bridge.check_doc_service()
bridge.check_proxy() and bridge.check_doc_service() and bridge.check_openprocurement_api()
bridge.run()
else:
logger.info('Invalid configuration file. Exiting...')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ def get_edr_details(self):
self.retry_edr_ids_queue.put(Data(tender_data.tender_id, tender_data.item_id, tender_data.code,
tender_data.item_name, [edr_id], file_content))
self.handle_status_response(response, tender_data.tender_id)
logger.info('Put tender {} with {} id {} {} to retry_edr_ids_queue'.format(
logger.info('Put tender {} with {} id {} document_id {} to retry_edr_ids_queue'.format(
tender_data.tender_id, tender_data.item_name, tender_data.item_id, document_id),
extra=journal_context(params={"TENDER_ID": tender_data.tender_id, "DOCUMENT_ID": document_id}))
self.edr_ids_queue.get()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
DATABRIDGE_RESTART_RETRY_GET_EDR_ID = 'edr_databridge_restart_retry_get_edr_id'
DATABRIDGE_RESTART_RETRY_GET_EDR_DETAILS = 'edr_databridge_restart_retry_get_edr_details'
DATABRIDGE_DOC_SERVICE_CONN_ERROR = 'edr_databridge_doc_service_conn_error'
DATABRIDGE_TENDERS_SERVER_CONN_ERROR = 'edr_databridge_tenders_server_conn_error'
DATABRIDGE_PROXY_SERVER_CONN_ERROR = 'edr_databridge_proxy_server_conn_error'
DATABRIDGE_422_UPLOAD_TO_TENDER = 'edr_databridge_422_upload_to_tender'
DATABRIDGE_ITEM_STATUS_CHANGED_WHILE_PROCESSING = 'edr_databridge_item_status_changed_while_processing'
58 changes: 30 additions & 28 deletions openprocurement/bot/identification/databridge/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,36 +92,38 @@ def get_tenders(self, params={}, direction=""):
raise re

def get_tenders_forward(self):
logger.info('Start forward data sync worker...')
params = {'opt_fields': 'status,procurementMethodType', 'mode': '_all_'}
try:
for tender in self.get_tenders(params=params, direction="forward"):
logger.info('Forward sync: Put tender {} to process...'.format(tender['id']),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_PROCESS},
{"TENDER_ID": tender['id']}))
self.filtered_tender_ids_queue.put(tender['id'])
except Exception as e:
logger.warning('Forward worker died!', extra=journal_context({"MESSAGE_ID": DATABRIDGE_WORKER_DIED}, {}))
logger.exception(e)
else:
logger.warning('Forward data sync finished!')
if not self.exit:
logger.info('Start forward data sync worker...')
params = {'opt_fields': 'status,procurementMethodType', 'mode': '_all_'}
try:
for tender in self.get_tenders(params=params, direction="forward"):
logger.info('Forward sync: Put tender {} to process...'.format(tender['id']),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_PROCESS},
{"TENDER_ID": tender['id']}))
self.filtered_tender_ids_queue.put(tender['id'])
except Exception as e:
logger.warning('Forward worker died!', extra=journal_context({"MESSAGE_ID": DATABRIDGE_WORKER_DIED}, {}))
logger.exception(e)
else:
logger.warning('Forward data sync finished!')

def get_tenders_backward(self):
logger.info('Start backward data sync worker...')
params = {'opt_fields': 'status,procurementMethodType', 'descending': 1, 'mode': '_all_'}
try:
for tender in self.get_tenders(params=params, direction="backward"):
logger.info('Backward sync: Put tender {} to process...'.format(tender['id']),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_PROCESS},
{"TENDER_ID": tender['id']}))
self.filtered_tender_ids_queue.put(tender['id'])
except Exception as e:
logger.warning('Backward worker died!', extra=journal_context({"MESSAGE_ID": DATABRIDGE_WORKER_DIED}, {}))
logger.exception(e)
return False
else:
logger.info('Backward data sync finished.')
return True
if not self.exit:
logger.info('Start backward data sync worker...')
params = {'opt_fields': 'status,procurementMethodType', 'descending': 1, 'mode': '_all_'}
try:
for tender in self.get_tenders(params=params, direction="backward"):
logger.info('Backward sync: Put tender {} to process...'.format(tender['id']),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_PROCESS},
{"TENDER_ID": tender['id']}))
self.filtered_tender_ids_queue.put(tender['id'])
except Exception as e:
logger.warning('Backward worker died!', extra=journal_context({"MESSAGE_ID": DATABRIDGE_WORKER_DIED}, {}))
logger.exception(e)
return False
else:
logger.info('Backward data sync finished.')
return True

def _start_synchronization_workers(self):
logger.info('Scanner starting forward and backward sync workers')
Expand Down
118 changes: 92 additions & 26 deletions openprocurement/bot/identification/tests/bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def setUpClass(cls):
cls.proxy_server_bottle = Bottle()
cls.doc_server_bottle = Bottle()
cls.api_server = WSGIServer(('127.0.0.1', 20604), cls.api_server_bottle, log=None)
setup_routing(cls.api_server_bottle, response_spore)
setup_routing(cls.api_server_bottle, response_spore, method='HEAD')
cls.public_api_server = WSGIServer(('127.0.0.1', 20605), cls.api_server_bottle, log=None)
cls.doc_server = WSGIServer(('127.0.0.1', 20606), cls.doc_server_bottle, log=None)
setup_routing(cls.doc_server_bottle, doc_response, path='/')
Expand All @@ -84,14 +84,21 @@ def tearDownClass(cls):
cls.doc_server.close()
cls.proxy_server.close()

def setUp(self):
self.worker = EdrDataBridge(config)
workers = {'scanner': MagicMock(return_value=MagicMock(exit=False)),
'filter_tender': MagicMock(return_value=MagicMock(exit=False)),
'edr_handler': MagicMock(return_value=MagicMock(exit=False)),
'upload_file': MagicMock(return_value=MagicMock(exit=False))}
for name, value in workers.items():
setattr(self.worker, name, value)

def tearDown(self):
del self.worker


def setup_routing(app, func, path='/api/0/spore', method='GET'):
app.route(path, method, func)


def response_spore():
response.set_cookie("SERVER_ID", ("a7afc9b1fc79e640f2487ba48243ca071c07a823d27"
"8cf9b7adf0fae467a524747e3c6c6973262130fac2b"
Expand All @@ -102,15 +109,17 @@ def response_spore():
def doc_response():
return response


def proxy_response():
return response

def proxy_response_402():
response.status = "402 Payment required"
return response


class TestBridgeWorker(BaseServersTest):

def test_init(self):
# setup_routing(self.api_server_bottle, response_spore)
self.worker = EdrDataBridge(config)
self.assertEqual(self.worker.delay, 15)
self.assertEqual(self.worker.increment_step, 1)
Expand Down Expand Up @@ -161,7 +170,6 @@ def test_tender_sync_clients(self, sync_client, client, doc_service_client, prox
'version': config['main']['proxy_version']})

def test_start_jobs(self):
# setup_routing(self.api_server_bottle, response_spore)
self.worker = EdrDataBridge(config)

scanner, filter_tender, edr_handler, upload_file = [MagicMock(return_value=i) for i in range(4)]
Expand All @@ -173,9 +181,9 @@ def test_start_jobs(self):
self.worker._start_jobs()
# check that all jobs were started
self.assertTrue(scanner.called)
self.assertTrue(scanner.called)
self.assertTrue(scanner.called)
self.assertTrue(scanner.called)
self.assertTrue(filter_tender.called)
self.assertTrue(edr_handler.called)
self.assertTrue(upload_file.called)

self.assertEqual(self.worker.jobs['scanner'], 0)
self.assertEqual(self.worker.jobs['filter_tender'], 1)
Expand All @@ -194,31 +202,89 @@ def test_run(self, sleep):

with patch('__builtin__.True', AlmostAlwaysTrue(100)):
self.worker.run()
self.assertEqual(scanner.call_count, 1)
self.assertEqual(filter_tender.call_count, 1)
self.assertEqual(edr_handler.call_count, 1)
self.assertEqual(upload_file.call_count, 1)
self.assertEqual(self.worker.scanner.call_count, 1)
self.assertEqual(self.worker.filter_tender.call_count, 1)
self.assertEqual(self.worker.edr_handler.call_count, 1)
self.assertEqual(self.worker.upload_file.call_count, 1)

def test_proxy_server_failure(self):
def test_proxy_server(self):
self.proxy_server.stop()
self.worker = EdrDataBridge(config)
with self.assertRaises(RequestException):
self.worker.check_proxy()
self.proxy_server.start()
self.assertEqual(self.worker.check_proxy(), True)
self.assertTrue(self.worker.check_proxy())

def test_proxy_server_success(self):
self.worker = EdrDataBridge(config)
self.assertEqual(self.worker.check_proxy(), True)

def test_doc_service_failure(self):
def test_doc_service(self):
self.doc_server.stop()
self.worker = EdrDataBridge(config)
with self.assertRaises(RequestError):
self.worker.check_doc_service()
self.doc_server.start()
self.assertEqual(self.worker.check_doc_service(), True)
self.assertTrue(self.worker.check_doc_service())

def test_doc_service_success(self):
self.worker = EdrDataBridge(config)
self.assertEqual(self.worker.check_doc_service(), True)
def test_api(self):
self.api_server.stop()
with self.assertRaises(RequestError):
self.worker.check_openprocurement_api()
self.api_server.start()
self.assertTrue(self.worker.check_openprocurement_api())

def test_check_services_did_not_stop(self):
self.worker._start_jobs()
functions = {'check_proxy': MagicMock(return_value = True),
'check_doc_service': MagicMock(return_value = True),
'check_openprocurement_api': MagicMock(return_value = True)}
for name, value in functions.items():
setattr(self.worker, name, value)
self.worker.check_services()
self.assertTrue(all([i.call_count == 1 for i in functions.values()]))
self.assertFalse(all([i.exit for i in self.worker.jobs.values()]))

def test_check_services(self):
self.worker._start_jobs()
self.proxy_server.stop()
self.worker.check_services()
self.assertTrue(all([i.exit for i in self.worker.jobs.values()]))
self.proxy_server.start()
self.worker.set_sleep(False)

self.doc_server.stop()
self.worker.check_services()
self.assertTrue(all([i.exit for i in self.worker.jobs.values()]))
self.doc_server.start()
self.worker.set_sleep(False)

self.api_server.stop()
self.worker.check_services()
self.assertTrue(all([i.exit for i in self.worker.jobs.values()]))
self.api_server.start()
self.worker.set_sleep(False)

def test_check_services_needs_all(self):
self.worker._start_jobs()
self.worker.set_sleep(True)
self.proxy_server.stop()
self.doc_server.stop()
self.api_server.stop()

self.proxy_server.start()
self.worker.check_services()
self.assertTrue(all([i.exit for i in self.worker.jobs.values()]))

self.doc_server.start()
self.worker.check_services()
self.assertTrue(all([i.exit for i in self.worker.jobs.values()]))
self.worker.set_sleep(False)

self.api_server.start()
self.worker.check_services()
self.assertFalse(all([i.exit for i in self.worker.jobs.values()]))

@patch('gevent.sleep')
def test_run_with_mock_check_services(self, sleep):
"""Basic test to ensure run() goes into the while (and inside that for) loops and that jobs are called only once"""
self.worker.check_services = MagicMock()
self.worker.run()
self.assertEqual(self.worker.scanner.call_count, 1)
self.assertEqual(self.worker.filter_tender.call_count, 1)
self.assertEqual(self.worker.edr_handler.call_count, 1)
self.assertEqual(self.worker.upload_file.call_count, 1)