diff --git a/openprocurement/bot/identification/client.py b/openprocurement/bot/identification/client.py index 5ab8f50..718106f 100644 --- a/openprocurement/bot/identification/client.py +++ b/openprocurement/bot/identification/client.py @@ -17,7 +17,6 @@ def verify(self, param, code, headers): """Send request to Proxy server to verify EDRPOU code""" url = '{url}?{param}={code}'.format(url=self.verify_url, param=param, code=code) response = self.session.get(url=url, auth=(self.user, self.password), timeout=self.timeout, headers=headers) - return response def health(self, sandbox_mode): @@ -38,8 +37,10 @@ def __init__(self, host, user, password, port=6555, timeout=None): self.user = user self.password = password self.timeout = timeout + self.headers = {} def upload(self, filename, in_file, content_type, headers): files = {'file': (filename, in_file, content_type)} + self.headers.update(headers) response = self.session.post(url=self.url, auth=(self.user, self.password), timeout=self.timeout, files=files, headers=headers) return response diff --git a/openprocurement/bot/identification/databridge/bridge.py b/openprocurement/bot/identification/databridge/bridge.py index dca2a6a..633e7c8 100644 --- a/openprocurement/bot/identification/databridge/bridge.py +++ b/openprocurement/bot/identification/databridge/bridge.py @@ -1,6 +1,5 @@ # -*- coding: utf-8 -*- from gevent import monkey -from retrying import retry monkey.patch_all() @@ -13,15 +12,18 @@ from functools import partial from yaml import load from gevent.queue import Queue +from retrying import retry from restkit import request, RequestError, ResourceError from requests import RequestException from constants import retry_mult + from openprocurement_client.client import TendersClientSync as BaseTendersClientSync, TendersClient as BaseTendersClient from openprocurement.bot.identification.client import DocServiceClient, ProxyClient from openprocurement.bot.identification.databridge.scanner import Scanner from 
openprocurement.bot.identification.databridge.filter_tender import FilterTenders from openprocurement.bot.identification.databridge.edr_handler import EdrHandler -from openprocurement.bot.identification.databridge.upload_file import UploadFile +from openprocurement.bot.identification.databridge.upload_file_to_tender import UploadFileToTender +from openprocurement.bot.identification.databridge.upload_file_to_doc_service import UploadFileToDocService from openprocurement.bot.identification.databridge.utils import journal_context, check_412, ProcessTracker from caching import Db from openprocurement.bot.identification.databridge.journal_msg_ids import ( @@ -34,14 +36,12 @@ class TendersClientSync(BaseTendersClientSync): - @check_412 def request(self, *args, **kwargs): return super(TendersClientSync, self).request(*args, **kwargs) class TendersClient(BaseTendersClient): - @check_412 def _create_tender_resource_item(self, *args, **kwargs): return super(TendersClient, self)._create_tender_resource_item(*args, **kwargs) @@ -83,13 +83,13 @@ def __init__(self, config): # init queues for workers self.filtered_tender_ids_queue = Queue(maxsize=buffers_size) # queue of tender IDs with appropriate status self.edrpou_codes_queue = Queue(maxsize=buffers_size) # queue with edrpou codes (Data objects stored in it) - self.upload_to_doc_service_queue = Queue(maxsize=buffers_size) # queue with detailed info from EDR (Data.file_content) - # upload_to_tender_queue - queue with file's get_url + self.upload_to_doc_service_queue = Queue(maxsize=buffers_size) # queue with info from EDR (Data.file_content) self.upload_to_tender_queue = Queue(maxsize=buffers_size) # blockers self.initialization_event = gevent.event.Event() self.services_not_available = gevent.event.Event() + self.services_not_available.set() self.db = Db(config) self.process_tracker = ProcessTracker(self.db, self.time_to_live) @@ -119,15 +119,22 @@ def __init__(self, config): services_not_available=self.services_not_available, 
delay=self.delay) - self.upload_file = partial(UploadFile.spawn, - client=self.client, - upload_to_doc_service_queue=self.upload_to_doc_service_queue, - upload_to_tender_queue=self.upload_to_tender_queue, - process_tracker=self.process_tracker, - doc_service_client=self.doc_service_client, - services_not_available=self.services_not_available, - sleep_change_value=self.sleep_change_value, - delay=self.delay) + self.upload_file_to_doc_service = partial(UploadFileToDocService.spawn, + upload_to_doc_service_queue=self.upload_to_doc_service_queue, + upload_to_tender_queue=self.upload_to_tender_queue, + process_tracker=self.process_tracker, + doc_service_client=self.doc_service_client, + services_not_available=self.services_not_available, + sleep_change_value=self.sleep_change_value, + delay=self.delay) + + self.upload_file_to_tender = partial(UploadFileToTender.spawn, + client=self.client, + upload_to_tender_queue=self.upload_to_tender_queue, + process_tracker=self.process_tracker, + services_not_available=self.services_not_available, + sleep_change_value=self.sleep_change_value, + delay=self.delay) def config_get(self, name): return self.config.get('main').get(name) @@ -194,7 +201,9 @@ def _start_jobs(self): self.jobs = {'scanner': self.scanner(), 'filter_tender': self.filter_tender(), 'edr_handler': self.edr_handler(), - 'upload_file': self.upload_file()} + 'upload_file_to_doc_service': self.upload_file_to_doc_service(), + 'upload_file_to_tender': self.upload_file_to_tender(), + } def launch(self): while True: @@ -213,17 +222,21 @@ def run(self): self.check_services() if counter == 20: counter = 0 - logger.info('Current state: Filtered tenders {}; Edrpou codes queue {}; Retry edrpou codes queue {};' - 'Upload to doc service {}; Retry upload to doc service {}; ' - 'Upload to tender {}; Retry upload to tender {}'.format( - self.filtered_tender_ids_queue.qsize(), - self.edrpou_codes_queue.qsize(), - self.jobs['edr_handler'].retry_edrpou_codes_queue.qsize() if 
self.jobs['edr_handler'] else 0, - self.upload_to_doc_service_queue.qsize(), - self.jobs['upload_file'].retry_upload_to_doc_service_queue.qsize() if self.jobs['upload_file'] else 0, - self.upload_to_tender_queue.qsize(), - self.jobs['upload_file'].retry_upload_to_tender_queue.qsize() if self.jobs['upload_file'] else 0 - )) + logger.info( + 'Current state: Filtered tenders {}; Edrpou codes queue {}; Retry edrpou codes queue {};' + 'Upload to doc service {}; Retry upload to doc service {}; ' + 'Upload to tender {}; Retry upload to tender {}'.format( + self.filtered_tender_ids_queue.qsize(), + self.edrpou_codes_queue.qsize(), + self.jobs['edr_handler'].retry_edrpou_codes_queue.qsize() if self.jobs[ + 'edr_handler'] else 0, + self.upload_to_doc_service_queue.qsize(), + self.jobs['upload_file_to_doc_service'].retry_upload_to_doc_service_queue.qsize() if self.jobs[ + 'upload_file_to_doc_service'] else 0, + self.upload_to_tender_queue.qsize(), + self.jobs['upload_file_to_tender'].retry_upload_to_tender_queue.qsize() if self.jobs[ + 'upload_file_to_tender'] else 0 + )) counter += 1 for name, job in self.jobs.items(): logger.debug("{}.dead: {}".format(name, job.dead)) diff --git a/openprocurement/bot/identification/databridge/constants.py b/openprocurement/bot/identification/databridge/constants.py index 835ca7a..b090e1f 100644 --- a/openprocurement/bot/identification/databridge/constants.py +++ b/openprocurement/bot/identification/databridge/constants.py @@ -1,7 +1,8 @@ +# coding=utf-8 major = 1 minor = 2 bugfix = 1 -version = '{}.{}.{}'.format(major, minor, bugfix) # major.minor.bugfix +version = '{}.{}.{}'.format(major, minor, bugfix) file_name = 'edr_identification.yaml' author = "IdentificationBot" retry_mult = 1000 diff --git a/openprocurement/bot/identification/databridge/edr_handler.py b/openprocurement/bot/identification/databridge/edr_handler.py index 0b1177a..2a595af 100644 --- a/openprocurement/bot/identification/databridge/edr_handler.py +++ 
b/openprocurement/bot/identification/databridge/edr_handler.py @@ -17,7 +17,7 @@ DATABRIDGE_EMPTY_RESPONSE ) from openprocurement.bot.identification.databridge.utils import ( - Data, journal_context, validate_param, RetryException, check_add_suffix, data_string + Data, journal_context, RetryException, check_add_suffix ) from openprocurement.bot.identification.databridge.constants import version, retry_mult @@ -64,58 +64,61 @@ def get_edr_data(self): except LoopExit: gevent.sleep() continue - item_name_id = tender_data.item_name[:-1].upper() + "_ID" - logger.info('Get {} from edrpou_codes_queue'.format(data_string(tender_data)), + logger.info('Get {} from edrpou_codes_queue'.format(tender_data), extra=journal_context({"MESSAGE_ID": DATABRIDGE_GET_TENDER_FROM_QUEUE}, - params={"TENDER_ID": tender_data.tender_id, item_name_id: tender_data.item_id})) + tender_data.log_params())) self.until_too_many_requests_event.wait() - document_id = tender_data.file_content['meta']['id'] - response = self.proxyClient.verify(validate_param(tender_data.code), tender_data.code, headers={'X-Client-Request-ID': document_id}) - if response.headers.get('X-Request-ID'): - tender_data.file_content['meta']['sourceRequests'].append(response.headers['X-Request-ID']) # add unique request id - if response.status_code == 404 and response.json().get('errors')[0].get('description')[0].get('error').get('code') == u"notFound": - logger.info('Empty response for {} doc_id {}.'.format(data_string(tender_data), document_id), - extra=journal_context({"MESSAGE_ID": DATABRIDGE_EMPTY_RESPONSE}, - params={"TENDER_ID": tender_data.tender_id, item_name_id: tender_data.item_id, "DOCUMENT_ID": document_id})) - file_content = response.json().get('errors')[0].get('description')[0] - file_content['meta'].update(tender_data.file_content['meta']) # add meta.id to file_content - file_content['meta'].update({"version": version}) # add filed meta.version - data = Data(tender_data.tender_id, tender_data.item_id, 
tender_data.code, tender_data.item_name, file_content) - self.process_tracker.set_item(tender_data.tender_id, tender_data.item_id, 1) - self.upload_to_doc_service_queue.put(data) - self.edrpou_codes_queue.get() - continue - if response.status_code == 200: - meta_id = tender_data.file_content['meta']['id'] - data_list = [] - try: - for i, obj in enumerate(response.json()['data']): - document_id = check_add_suffix(response.json()['data'], meta_id, i + 1) - file_content = {'meta': {'sourceDate': response.json()['meta']['detailsSourceDate'][i]}, - 'data': obj} - file_content['meta'].update(deepcopy(tender_data.file_content['meta'])) - file_content['meta'].update({"version": version}) # add filed meta.version - file_content['meta']['id'] = document_id - data = Data(tender_data.tender_id, tender_data.item_id, tender_data.code, - tender_data.item_name, file_content) - data_list.append(data) - except (KeyError, IndexError) as e: - logger.info('Error {}. {}'.format(data_string(tender_data), e)) - self.retry_edrpou_codes_queue.put(tender_data) - else: - self.process_tracker.set_item(tender_data.tender_id, tender_data.item_id, len(response.json()['data'])) - for data in data_list: - self.upload_to_doc_service_queue.put(data) - logger.info('Put tender {} doc_id {} to upload_to_doc_service_queue.'.format( - data_string(data), data.file_content['meta']['id'])) + response = self.proxyClient.verify(tender_data.param(), tender_data.code, + headers={'X-Client-Request-ID': tender_data.doc_id()}) + tender_data.add_unique_req_id(response) + try: + res_json = response.json() + except JSONDecodeError: + res_json = response.text + if self.is_no_document_in_edr(response, res_json): + self.move_data_nonexistent_edr(response.json(), tender_data, False) + elif response.status_code == 200: + self.move_data_existing_edr(response, tender_data, False) else: self.handle_status_response(response, tender_data.tender_id) self.retry_edrpou_codes_queue.put(tender_data) # Put tender to retry - 
logger.info('Put {} to retry_edrpou_codes_queue'.format(data_string(tender_data)), - extra=journal_context(params={"TENDER_ID": tender_data.tender_id, item_name_id: tender_data.item_id})) + logger.info('Put {} to retry_edrpou_codes_queue'.format(tender_data), + extra=journal_context(params=tender_data.log_params())) self.edrpou_codes_queue.get() gevent.sleep() + def is_no_document_in_edr(self, response, res_json): + return (response.status_code == 404 and isinstance(res_json, dict) + and res_json.get('errors')[0].get('description')[0].get('error').get('code') == u"notFound") + + def move_data_nonexistent_edr(self, res_json, tender_data, is_retry): + logger.info('Empty response for {} doc_id {}.'.format(tender_data, tender_data.doc_id()), + extra=journal_context({"MESSAGE_ID": DATABRIDGE_EMPTY_RESPONSE}, tender_data.log_params())) + file_content = res_json.get('errors')[0].get('description')[0] + file_content['meta'].update(tender_data.file_content['meta']) + file_content['meta'].update({"version": version}) + data = tender_data + data.file_content = file_content + self.process_tracker.set_item(tender_data.tender_id, tender_data.item_id, 1) + self.upload_to_doc_service_queue.put(data) + if is_retry: + self.retry_edrpou_codes_queue.get() + + def move_data_existing_edr(self, response, tender_data, is_retry): + data_list = [] + try: + self.fill_data_list(response, tender_data, data_list) + except (KeyError, IndexError) as e: + logger.info('Error {}. 
{}'.format(tender_data, e)) + self.retry_edrpou_codes_queue.put(self.retry_edrpou_codes_queue.get() if is_retry else tender_data) + else: + for data in data_list: + self.upload_to_doc_service_queue.put(data) + logger.info('Put tender {} doc_id {} to upload_to_doc_service_queue.'.format(data, data.doc_id())) + if is_retry: + self.retry_edrpou_codes_queue.get() + self.process_tracker.set_item(tender_data.tender_id, tender_data.item_id, len(response.json()['data'])) + def retry_get_edr_data(self): """Get data from retry_edrpou_codes_queue; Put data into upload_to_doc_service_queue if request is successful, otherwise put data back to retry_edrpou_codes_queue.""" @@ -126,79 +129,53 @@ def retry_get_edr_data(self): except LoopExit: gevent.sleep() continue - item_name_id = tender_data.item_name[:-1].upper() + "_ID" - logger.info('Get {} from retry_edrpou_codes_queue'.format(data_string(tender_data)), + logger.info('Get {} from retry_edrpou_codes_queue'.format(tender_data), extra=journal_context({"MESSAGE_ID": DATABRIDGE_GET_TENDER_FROM_QUEUE}, - params={"TENDER_ID": tender_data.tender_id, item_name_id: tender_data.item_id})) + tender_data.log_params())) self.until_too_many_requests_event.wait() - document_id = tender_data.file_content['meta']['id'] try: - response = self.get_edr_data_request(validate_param(tender_data.code), tender_data.code, document_id) - if response.headers.get('X-Request-ID'): - tender_data.file_content['meta']['sourceRequests'].append(response.headers['X-Request-ID']) + response = self.get_edr_data_request(tender_data.param(), tender_data.code, tender_data.doc_id()) + tender_data.add_unique_req_id(response) except RetryException as re: try: self.handle_status_response(re.args[1], tender_data.tender_id) res_json = re.args[1].json() except JSONDecodeError: res_json = re.args[1].text - if re.args[1].status_code == 404 and isinstance(res_json, dict) and res_json.get('errors')[0].get('description')[0].get('error').get('code') == u"notFound": - 
logger.info('Empty response for {} doc_id: {}.'.format(data_string(tender_data), document_id), - extra=journal_context({"MESSAGE_ID": DATABRIDGE_EMPTY_RESPONSE}, - params={"TENDER_ID": tender_data.tender_id, item_name_id: tender_data.item_id, "DOCUMENT_ID": document_id})) - file_content = res_json.get('errors')[0].get('description')[0] - file_content['meta'].update(tender_data.file_content['meta']) - file_content['meta'].update({"version": version}) # add filed meta.version - data = Data(tender_data.tender_id, tender_data.item_id, tender_data.code, - tender_data.item_name, file_content) - self.process_tracker.set_item(tender_data.tender_id, tender_data.item_id, 1) - self.upload_to_doc_service_queue.put(data) # Given EDRPOU code not found, file with error put into upload_to_doc_service_queue - self.retry_edrpou_codes_queue.get() - logger.info('Put {} in back of retry_edrpou_codes_queue. Response {}'.format(data_string(tender_data), res_json), - extra=journal_context(params={"TENDER_ID": tender_data.tender_id, item_name_id: tender_data.item_id})) - self.retry_edrpou_codes_queue.put(self.retry_edrpou_codes_queue.get()) - gevent.sleep() + if self.is_no_document_in_edr(re.args[1], res_json): + self.move_data_nonexistent_edr(res_json, tender_data, True) + else: + logger.info('Put {} in back of retry_edrpou_codes_queue. Response {}'.format(tender_data, res_json), + extra=journal_context(params=tender_data.log_params())) + self.retry_edrpou_codes_queue.put(self.retry_edrpou_codes_queue.get()) + gevent.sleep() except Exception as e: - logger.info('Put {} in back of retry_edrpou_codes_queue. Error: {}'.format(data_string(tender_data), e.message), - extra=journal_context(params={"TENDER_ID": tender_data.tender_id, item_name_id: tender_data.item_id})) + logger.info('Put {} in back of retry_edrpou_codes_queue. 
Error: {}'.format(tender_data, e.message), + extra=journal_context(params=tender_data.log_params())) self.retry_edrpou_codes_queue.put(self.retry_edrpou_codes_queue.get()) gevent.sleep() else: - # Create new Data object. Write to Data.code list of edr ids from EDR. - # List because EDR can return 0, 1 or 2 values to our request if response.status_code == 429: seconds_to_wait = response.headers.get('Retry-After', self.delay) logger.info('retry_get_edr_id: Too many requests to EDR API. Msg: {}, wait {} seconds.'.format( - response.text, seconds_to_wait), - extra=journal_context(params={"TENDER_ID": tender_data.tender_id, item_name_id: tender_data.item_id})) + response.text, seconds_to_wait), extra=journal_context(params=tender_data.log_params())) self.wait_until_too_many_requests(seconds_to_wait) - continue - if response.status_code == 200: - meta_id = tender_data.file_content['meta']['id'] - data_list = [] - try: - for i, obj in enumerate(response.json()['data']): - document_id = check_add_suffix(response.json()['data'], meta_id, i + 1) - file_content = {'meta': {'sourceDate': response.json()['meta']['detailsSourceDate'][i]}, - 'data': obj} - file_content['meta'].update(deepcopy(tender_data.file_content['meta'])) - file_content['meta'].update({"version": version}) # add filed meta.version - file_content['meta']['id'] = document_id - data = Data(tender_data.tender_id, tender_data.item_id, tender_data.code, - tender_data.item_name, file_content) - data_list.append(data) - except (KeyError, IndexError) as e: - logger.info('Error {}. 
{}'.format(data_string(tender_data), e)) - self.retry_edrpou_codes_queue.put(self.retry_edrpou_codes_queue.get()) - else: - for data in data_list: - self.upload_to_doc_service_queue.put(data) - logger.info('Put tender {} doc_id {} to upload_to_doc_service_queue from retry.'.format( - data_string(data), data.file_content['meta']['id'])) - self.retry_edrpou_codes_queue.get() - self.process_tracker.set_item(tender_data.tender_id, tender_data.item_id, len(response.json()['data'])) + elif response.status_code == 200: + self.move_data_existing_edr(response, tender_data, True) gevent.sleep() + def fill_data_list(self, response, tender_data, data_list): + for i, obj in enumerate(response.json()['data']): + document_id = check_add_suffix(response.json()['data'], tender_data.doc_id(), i + 1) + file_content = {'meta': {'sourceDate': response.json()['meta']['detailsSourceDate'][i]}, + 'data': obj} + file_content['meta'].update(deepcopy(tender_data.file_content['meta'])) + file_content['meta'].update({"version": version}) # add filed meta.version + file_content['meta']['id'] = document_id + data = Data(tender_data.tender_id, tender_data.item_id, tender_data.code, + tender_data.item_name, file_content) + data_list.append(data) + @retry(stop_max_attempt_number=5, wait_exponential_multiplier=retry_mult) def get_edr_data_request(self, param, code, document_id): """Execute request to EDR Api for retry queue objects.""" @@ -218,36 +195,42 @@ def handle_status_response(self, response, tender_id): logger.info('Too many requests to EDR API. Msg: {}, wait {} seconds.'.format(response.text, seconds_to_wait), extra=journal_context(params={"TENDER_ID": tender_id})) self.wait_until_too_many_requests(seconds_to_wait) - elif response.status_code == 403 and response.headers.get('content-type', '') == 'application/json' and response.json().get('errors')[0].get('description') == [{'message': 'Payment required.', 'code': 5}]: - logger.warning('Payment required for requesting info to EDR. 
Error description: {err}'.format( - err=response.text), - extra=journal_context(params={"TENDER_ID": tender_id})) + elif self.is_payment_required(response): + logger.warning('Payment required for requesting info to EDR. Message: {err}'.format(err=response.text), + extra=journal_context(params={"TENDER_ID": tender_id})) else: logger.warning('Error appeared while requesting to EDR. Description: {err}'.format(err=response.text), extra=journal_context(params={"TENDER_ID": tender_id})) + def is_payment_required(self, response): + return (response.status_code == 403 and response.headers.get('content-type', '') == 'application/json' + and (response.json().get('errors')[0].get('description') == + [{'message': 'Payment required.', 'code': 5}])) + def wait_until_too_many_requests(self, seconds_to_wait): if self.until_too_many_requests_event.ready(): logger.info('Bot is waiting...') self.until_too_many_requests_event.clear() self.until_too_many_requests_event.wait(float(seconds_to_wait)) - self.until_too_many_requests_event.set() logger.info('Bot stop waiting...') + self.until_too_many_requests_event.set() + + def check_and_revive_jobs(self): + for name, job in self.immortal_jobs.items(): + if job.dead: + logger.warning("EDR handler worker {} dead try restart".format(name), + extra=journal_context({"MESSAGE_ID": "DATABRIDGE_RESTART_{}".format(name.lower())}, {})) + self.immortal_jobs[name] = gevent.spawn(getattr(self, name)) + logger.info("EDR handler worker {} is up".format(name)) def _run(self): logger.info('Start EDR Handler', extra=journal_context({"MESSAGE_ID": DATABRIDGE_START_EDR_HANDLER}, {})) self.immortal_jobs = {'get_edr_data': spawn(self.get_edr_data), 'retry_get_edr_data': spawn(self.retry_get_edr_data)} - try: while not self.exit: gevent.sleep(self.delay) - for name, job in self.immortal_jobs.items(): - if job.dead: - logger.warning("EDR handler worker {} dead try restart".format(name), - extra=journal_context({"MESSAGE_ID": 
"DATABRIDGE_RESTART_{}".format(name.lower())}, {})) - self.immortal_jobs[name] = gevent.spawn(getattr(self, name)) - logger.info("EDR handler worker {} is up".format(name)) + self.check_and_revive_jobs() except Exception as e: logger.error(e) gevent.killall(self.immortal_jobs.values(), timeout=5) diff --git a/openprocurement/bot/identification/databridge/filter_tender.py b/openprocurement/bot/identification/databridge/filter_tender.py index 8c8675f..9cd5771 100644 --- a/openprocurement/bot/identification/databridge/filter_tender.py +++ b/openprocurement/bot/identification/databridge/filter_tender.py @@ -1,6 +1,5 @@ # -*- coding: utf-8 -*- from gevent import monkey - monkey.patch_all() import logging.config @@ -63,137 +62,123 @@ def prepare_data(self): path='{}/{}'.format(self.tenders_sync_client.prefix_path, tender_id), headers={'X-Client-Request-ID': generate_req_id()}) - except ResourceError as re: - if re.status_int == 429: + except Exception as e: + if getattr(e, "status_int", False) and e.status_int == 429: self.sleep_change_value.increment() - logger.info("Waiting tender {} for sleep_change_value: {} seconds".format(tender_id, - self.sleep_change_value.time_between_requests)) + logger.info("Waiting tender {} for sleep_change_value: {} seconds".format( + tender_id, self.sleep_change_value.time_between_requests)) else: logger.warning('Fail to get tender info {}'.format(tender_id), extra=journal_context(params={"TENDER_ID": tender_id})) - logger.exception("Message: {}, status_code {}".format(re.msg, re.status_int)) + logger.exception("Message: {}".format(e.message)) logger.info('Leave tender {} in tenders queue'.format(tender_id), extra=journal_context(params={"TENDER_ID": tender_id})) - gevent.sleep(0) - except Exception as e: - logger.warning('Fail to get tender info {}'.format(tender_id), - extra=journal_context(params={"TENDER_ID": tender_id})) - logger.exception("Message: {}".format(e.message)) - logger.info('Leave tender {} in tenders 
queue'.format(tender_id), - extra=journal_context(params={"TENDER_ID": tender_id})) - gevent.sleep(0) + gevent.sleep() else: - self.sleep_change_value.decrement() - if response.status_int == 200: - tender = munchify(loads(response.body_string()))['data'] - logger.info('Get tender {} from filtered_tender_ids_queue'.format(tender_id), - extra=journal_context({"MESSAGE_ID": DATABRIDGE_GET_TENDER_FROM_QUEUE}, - params={"TENDER_ID": tender['id']})) - if 'awards' in tender: - for award in tender['awards']: - logger.info( - 'Processing tender {} bid {} award {}'.format(tender['id'], award['bid_id'], award['id']), - extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_PROCESS}, - params={"TENDER_ID": tender['id'], "BID_ID": award['bid_id'], - "AWARD_ID": award['id']})) - if self.should_process_item(award): - for supplier in award['suppliers']: - code = supplier['identifier']['id'] - if self.is_code_invalid(code): - self.filtered_tender_ids_queue.get() - logger.info(u'Tender {} bid {} award {} identifier id {} is not valid.'.format( - tender['id'], award['bid_id'], award['id'], code), - extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_NOT_PROCESS}, - params={"TENDER_ID": tender['id'], - "BID_ID": award['bid_id'], - "AWARD_ID": award['id']})) - continue - # quick check if item was already processed - if self.process_tracker.check_processed_item(tender['id'], award['id']): - logger.info('Tender {} bid {} award {} was already processed.'.format( - tender['id'], award['bid_id'], award['id']), - extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_NOT_PROCESS}, - params={"TENDER_ID": tender['id'], - "BID_ID": award['bid_id'], - "AWARD_ID": award['id']})) - elif self.should_process_award(supplier, tender, award): - self.process_tracker.set_item(tender['id'], award['id']) - document_id = generate_doc_id() - tender_data = Data(tender['id'], award['id'], str(code), - 'awards', {'meta': {'id': document_id, 'author': author, - 'sourceRequests': [ - 
response.headers['X-Request-ID']]}}) - self.edrpou_codes_queue.put(tender_data) - else: - logger.info( - 'Tender {} bid {} award {} identifier schema isn\'t UA-EDR or tender is already in process.'.format( - tender['id'], award['bid_id'], award['id']), - extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_NOT_PROCESS}, - params={"TENDER_ID": tender['id'], - "BID_ID": award['bid_id'], - "AWARD_ID": award['id']})) - else: - logger.info( - 'Tender {} bid {} award {} is not in status pending or award has already document ' - 'with documentType registerExtract.'.format(tender_id, award['bid_id'], award['id']), - extra=journal_context(params={"TENDER_ID": tender['id'], "BID_ID": award['bid_id'], - "AWARD_ID": award['id']})) - elif 'qualifications' in tender: - for qualification in tender['qualifications']: - if self.should_process_item(qualification): - appropriate_bid = [b for b in tender['bids'] if b['id'] == qualification['bidID']][0] - code = appropriate_bid['tenderers'][0]['identifier']['id'] - if self.is_code_invalid(code): - self.filtered_tender_ids_queue.get() - logger.info(u'Tender {} bid {} qualification {} identifier id {} is not valid.'.format( - tender['id'], qualification['bidID'], qualification['id'], code), - extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_NOT_PROCESS}, - params={"TENDER_ID": tender['id'], "BID_ID": - qualification['bidID'], - "QUALIFICATION_ID": qualification['id']})) - continue - # quick check if item was already processed - if self.process_tracker.check_processed_item(tender['id'], qualification['id']): - logger.info('Tender {} bid {} qualification {} was already processed.'.format( - tender['id'], qualification['bidID'], qualification['id']), - extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_NOT_PROCESS}, - params={"TENDER_ID": tender['id'], - "BID_ID": qualification['bidID'], - "QUALIFICATION_ID": qualification['id']})) - # check first identification scheme, if yes then check if item is already in process or not - 
elif self.should_process_qualification(appropriate_bid, tender, qualification): - self.process_tracker.set_item(tender['id'], qualification['id']) - document_id = generate_doc_id() - tender_data = Data(tender['id'], qualification['id'], str(code), - 'qualifications', {'meta': {'id': document_id, 'author': author, - 'sourceRequests': [ - response.headers['X-Request-ID']]}}) - self.edrpou_codes_queue.put(tender_data) - logger.info('Processing tender {} bid {} qualification {}'.format( - tender['id'], qualification['bidID'], qualification['id']), - extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_PROCESS}, - params={"TENDER_ID": tender['id'], - "BID_ID": qualification['bidID'], - "QUALIFICATION_ID": qualification['id']})) - else: - logger.info( - 'Tender {} bid {} qualification {} identifier schema is not UA-EDR or tender is already in process.'.format( - tender['id'], qualification['bidID'], qualification['id']), - extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_NOT_PROCESS}, - params={"TENDER_ID": tender['id'], - "BID_ID": qualification['bidID'], - "QUALIFICATION_ID": qualification['id']})) - else: - logger.info( - 'Tender {} bid {} qualification {} is not in status pending or qualification has ' - 'already document with documentType registerExtract.'.format( - tender_id, qualification['bidID'], qualification['id']), - extra=journal_context(params={"TENDER_ID": tender['id'], - "BID_ID": qualification['bidID'], - "QUALIFICATION_ID": qualification['id']})) - self.filtered_tender_ids_queue.get() # Remove elem from queue + self.process_items_and_move(response, tender_id) gevent.sleep(self.sleep_change_value.time_between_requests) + def process_items_and_move(self, response, tender_id): + self.sleep_change_value.decrement() + if response.status_int == 200: + tender = munchify(loads(response.body_string()))['data'] + logger.info('Get tender {} from filtered_tender_ids_queue'.format(tender_id), + extra=journal_context({"MESSAGE_ID": 
DATABRIDGE_GET_TENDER_FROM_QUEUE}, + params={"TENDER_ID": tender['id']})) + if 'awards' in tender: + for award in tender['awards']: + self.process_item(response, tender, tender_id, award, "award") + elif 'qualifications' in tender: + for qualification in tender['qualifications']: + self.process_item(response, tender, tender_id, qualification, "qualification") + self.filtered_tender_ids_queue.get() # Remove elem from queue + + def process_item(self, response, tender, tender_id, item, item_name): + logger.info('Processing tender {} bid {} {} {}'.format(tender['id'], self.item_id(item), item_name, item['id']), + extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_PROCESS}, + params={"TENDER_ID": tender['id'], "BID_ID": self.item_id(item), + self.journal_item_name(item): item['id']})) + if self.should_process_item(item): + if item_name == 'award': + for supplier in item['suppliers']: + self.process_award_supplier(response, tender, item, supplier) + elif item_name == 'qualification': + self.process_qualification(response, tender, tender_id, item) + else: + logger.info('Tender {} bid {} {} {} is not in status pending or award has already document ' + 'with documentType registerExtract.'.format(tender_id, self.item_id(item), item_name, item['id']), + extra=journal_context(params={"TENDER_ID": tender['id'], "BID_ID": self.item_id(item), + self.journal_item_name(item): item['id']})) + + def temp_important_part_for_item(self, response, tender, item, item_name, code): + self.process_tracker.set_item(tender['id'], item['id']) + document_id = generate_doc_id() + tender_data = Data(tender['id'], item['id'], str(code), item_name+"s", + {'meta': {'id': document_id, 'author': author, + 'sourceRequests': [response.headers['X-Request-ID']]}}) + self.edrpou_codes_queue.put(tender_data) + logger.info('Processing tender {} bid {} {} {}'.format( + tender['id'], self.item_id(item), item_name, item['id']), + extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_PROCESS}, + 
params={"TENDER_ID": tender['id'], "BID_ID": self.item_id(item), + self.journal_item_name(item): item['id']})) + + def process_award_supplier(self, response, tender, award, supplier): + code = supplier['identifier']['id'] + if self.is_code_invalid(code): + self.remove_invalid_item(tender, award, "award", code) + elif self.process_tracker.check_processed_item(tender['id'], award['id']): + logger.info('Tender {} bid {} award {} was already processed.'.format( + tender['id'], award['bid_id'], award['id']), + extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_NOT_PROCESS}, + params={"TENDER_ID": tender['id'], "BID_ID": award['bid_id'], + "AWARD_ID": award['id']})) + elif self.should_process_award(supplier, tender, award): + self.temp_important_part_for_item(response, tender, award, "award", code) + else: + logger.info( + 'Tender {} bid {} award {} identifier schema isn\'t UA-EDR or tender is already in process.'.format( + tender['id'], award['bid_id'], award['id']), + extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_NOT_PROCESS}, + params={"TENDER_ID": tender['id'], "BID_ID": award['bid_id'], + "AWARD_ID": award['id']})) + + def remove_invalid_item(self, tender, item, item_name, code): + self.filtered_tender_ids_queue.get() + logger.info(u'Tender {} bid {} {} {} identifier id {} is not valid.'.format( + tender['id'], item_name, self.item_id(item), item['id'], code), + extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_NOT_PROCESS}, + params={"TENDER_ID": tender['id'], "BID_ID": self.item_id(item), + self.journal_item_name(item): item['id']})) + + def item_id(self, item): + return item['bidID' if item.get('bidID') else 'bid_id'] + + def journal_item_name(self, item): + return "QUALIFICATION_ID" if item.get('bidID') else "AWARD_ID" + + def process_qualification(self, response, tender, tender_id, qualification): + appropriate_bid = [b for b in tender['bids'] if b['id'] == qualification['bidID']][0] + code = appropriate_bid['tenderers'][0]['identifier']['id'] 
+ if self.is_code_invalid(code): + self.remove_invalid_item(tender, qualification, "qualification", code) + elif self.process_tracker.check_processed_item(tender['id'], qualification['id']): + logger.info('Tender {} bid {} qualification {} was already processed.'.format( + tender['id'], qualification['bidID'], qualification['id']), + extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_NOT_PROCESS}, + params={"TENDER_ID": tender['id'], "BID_ID": qualification['bidID'], + "QUALIFICATION_ID": qualification['id']})) + elif self.should_process_qualification(appropriate_bid, tender, qualification): + self.temp_important_part_for_item(response, tender, qualification, "qualification", code) + else: + logger.info( + 'Tender {} bid {} qualification {} identifier schema is not UA-EDR or ' + 'tender is already in process.'.format( + tender['id'], qualification['bidID'], qualification['id']), + extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_NOT_PROCESS}, + params={"TENDER_ID": tender['id'], "BID_ID": qualification['bidID'], + "QUALIFICATION_ID": qualification['id']})) + def should_process_item(self, item): return (item['status'] == 'pending' and not [document for document in item.get('documents', []) if document.get('documentType') == 'registerExtract']) diff --git a/openprocurement/bot/identification/databridge/sleep_change_value.py b/openprocurement/bot/identification/databridge/sleep_change_value.py index 26b62ba..a216deb 100644 --- a/openprocurement/bot/identification/databridge/sleep_change_value.py +++ b/openprocurement/bot/identification/databridge/sleep_change_value.py @@ -1,13 +1,14 @@ -class APIRateController: +# coding=utf-8 +class APIRateController(object): def __init__(self, increment_step=1, decrement_step=1): self.increment_step = increment_step self.decrement_step = decrement_step self.time_between_requests = 0 def decrement(self): - self.time_between_requests -= self.decrement_step if self.decrement_step < self.time_between_requests else 0 + 
self.time_between_requests -= self.decrement_step if self.decrement_step <= self.time_between_requests else 0 return self.time_between_requests def increment(self): self.time_between_requests += self.increment_step - return self.time_between_requests \ No newline at end of file + return self.time_between_requests diff --git a/openprocurement/bot/identification/databridge/upload_file.py b/openprocurement/bot/identification/databridge/upload_file.py deleted file mode 100644 index 7854ea9..0000000 --- a/openprocurement/bot/identification/databridge/upload_file.py +++ /dev/null @@ -1,309 +0,0 @@ -# -*- coding: utf-8 -*- -from munch import munchify -from gevent.queue import Queue -from retrying import retry - -import logging.config -import gevent -from datetime import datetime -from gevent import Greenlet, spawn -from gevent.hub import LoopExit -from restkit import ResourceError - -from openprocurement.bot.identification.databridge.utils import journal_context, Data, create_file, data_string -from openprocurement.bot.identification.databridge.journal_msg_ids import DATABRIDGE_SUCCESS_UPLOAD_TO_DOC_SERVICE, \ - DATABRIDGE_UNSUCCESS_UPLOAD_TO_DOC_SERVICE, DATABRIDGE_SUCCESS_UPLOAD_TO_TENDER, \ - DATABRIDGE_UNSUCCESS_UPLOAD_TO_TENDER, DATABRIDGE_START_UPLOAD, DATABRIDGE_ITEM_STATUS_CHANGED_WHILE_PROCESSING -from openprocurement.bot.identification.databridge.constants import file_name, retry_mult - -logger = logging.getLogger(__name__) - - -class UploadFile(Greenlet): - """ Upload file with details """ - - def __init__(self, client, upload_to_doc_service_queue, upload_to_tender_queue, process_tracker, doc_service_client, - services_not_available, sleep_change_value, delay=15): - super(UploadFile, self).__init__() - self.exit = False - self.start_time = datetime.now() - - self.delay = delay - self.process_tracker = process_tracker - - # init clients - self.client = client - self.doc_service_client = doc_service_client - - # init queues for workers - 
self.upload_to_doc_service_queue = upload_to_doc_service_queue - self.upload_to_tender_queue = upload_to_tender_queue - - self.sleep_change_value = sleep_change_value - # retry queues for workers - self.retry_upload_to_doc_service_queue = Queue(maxsize=500) - self.retry_upload_to_tender_queue = Queue(maxsize=500) - - # blockers - self.services_not_available = services_not_available - - def removing_data(self, retry, re, tender_data, document_id, item_name_id): - logger.warning("Accept {} while uploading to {} doc_id: {}. Message {}".format( - re.status_int, data_string(tender_data), document_id, re.msg), - extra=journal_context({"MESSAGE_ID": DATABRIDGE_ITEM_STATUS_CHANGED_WHILE_PROCESSING}, - {"TENDER_ID": tender_data.tender_id, item_name_id: tender_data.item_id, - "DOCUMENT_ID": document_id})) - self.process_tracker.update_items_and_tender(tender_data.tender_id, tender_data.item_id) - self.sleep_change_value.decrement() - if retry == '': - self.upload_to_tender_queue.get() - elif retry == 'retry': - self.retry_upload_to_tender_queue.get() - - def decrease_request_frequency(self, re, tender_data, document_id, item_name_id): - logger.info("Accept 429 while uploading to tender {} {} {} doc_id: {}. Message {}".format( - tender_data.tender_id, tender_data.item_name, tender_data.item_id, document_id, re.msg), - extra=journal_context({"MESSAGE_ID": DATABRIDGE_ITEM_STATUS_CHANGED_WHILE_PROCESSING}, - {"TENDER_ID": tender_data.tender_id, item_name_id: tender_data.item_id, - "DOCUMENT_ID": document_id})) - self.sleep_change_value.increment() - - def resource_errors_else(self, retry, re, tender_data, document_id, item_name_id): - logger.info( - 'ResourceError while{} uploading file to {} doc_id: {}. ResourceErrorStatus_int: {}. 
Message: {}'.format( - '' if retry == '' else ' ' + retry, data_string(tender_data), document_id, re.status_int, re.message), - extra=journal_context( - {"MESSAGE_ID": DATABRIDGE_UNSUCCESS_UPLOAD_TO_TENDER.format('' if retry == '' else '_' + retry)}, - params={"TENDER_ID": tender_data.tender_id, - item_name_id: tender_data.item_id, "DOCUMENT_ID": document_id})) - self.sleep_change_value.decrement() - - def exception_errors(self, retry, e, tender_data, document_id, item_name_id): - logger.info('Exception while{} uploading file to {} doc_id: {}. Message: {}'.format( - '' if retry == '' else ' ' + retry, data_string(tender_data), document_id, e.message), - extra=journal_context( - {"MESSAGE_ID": DATABRIDGE_UNSUCCESS_UPLOAD_TO_TENDER.format('' if retry == '' else '_' + retry)}, - params={"TENDER_ID": tender_data.tender_id, item_name_id: tender_data.item_id, - "DOCUMENT_ID": document_id})) - logger.exception("Message: {}".format(e.message)) - self.sleep_change_value.decrement() - - def succesfully_uploaded_to_tender(self, retry, tender_data, document_id, item_name_id): - logger.info('Successfully uploaded file to {} doc_id: {} in retry'.format( - data_string(tender_data), document_id), - extra=journal_context({"MESSAGE_ID": DATABRIDGE_SUCCESS_UPLOAD_TO_TENDER}, - params={"TENDER_ID": tender_data.tender_id, item_name_id: tender_data.item_id, - "DOCUMENT_ID": document_id})) - # delete current tender after successful upload file (to avoid reloading file) - self.process_tracker.update_items_and_tender(tender_data.tender_id, tender_data.item_id) - self.sleep_change_value.decrement() - if retry == '': - self.upload_to_tender_queue.get() - elif retry == 'retry': - self.retry_upload_to_tender_queue.get() - - def move_to_tender_queue(self, retry, tender_data, response, document_id, item_name_id): - data = Data(tender_data.tender_id, tender_data.item_id, tender_data.code, - tender_data.item_name, dict(response.json(), **{'meta': {'id': document_id}})) - 
self.upload_to_tender_queue.put(data) - if retry == '': - self.upload_to_doc_service_queue.get() - elif retry == 'retry': - self.retry_upload_to_doc_service_queue.get() - logger.info('Successfully uploaded file to doc service {} doc_id: {}'.format( - data_string(tender_data), document_id), - extra=journal_context({"MESSAGE_ID": DATABRIDGE_SUCCESS_UPLOAD_TO_DOC_SERVICE}, - params={"TENDER_ID": tender_data.tender_id, - item_name_id: tender_data.item_id, - "DOCUMENT_ID": document_id})) - - def upload_to_doc_service(self): - """Get data from upload_to_doc_service_queue; Create file of the Data.file_content data; If upload successful put Data - object to upload_file_to_tender, otherwise put Data to retry_upload_file_queue.""" - while not self.exit: - self.services_not_available.wait() - try: - tender_data = self.upload_to_doc_service_queue.peek() - document_id = tender_data.file_content.get('meta', {}).get('id') - except LoopExit: - gevent.sleep(0) - continue - item_name_id = tender_data.item_name[:-1].upper() + "_ID" - try: - response = self.doc_service_client.upload(file_name, create_file(tender_data.file_content), - 'application/yaml', - headers={'X-Client-Request-ID': document_id}) - except Exception as e: - logger.warning('Exception while uploading file to doc service {}. Message: {}. ' - 'Put tender_data to retry queue '.format(data_string(tender_data), e.message), - extra=journal_context({"MESSAGE_ID": DATABRIDGE_UNSUCCESS_UPLOAD_TO_DOC_SERVICE}, - params={"TENDER_ID": tender_data.tender_id, - item_name_id: tender_data.item_id, - "DOCUMENT_ID": document_id})) - logger.exception("Message: {}".format(e.message)) - self.retry_upload_to_doc_service_queue.put(tender_data) - self.upload_to_doc_service_queue.get() - else: - if response.status_code == 200: - self.move_to_tender_queue('', tender_data, response, document_id, item_name_id) - else: - logger.info( - 'Not successful response from document service while uploading {} doc_id: {}. Response {}'. 
- format(data_string(tender_data), document_id, response.status_code), - extra=journal_context({"MESSAGE_ID": DATABRIDGE_UNSUCCESS_UPLOAD_TO_DOC_SERVICE}, - params={"TENDER_ID": tender_data.tender_id, - item_name_id: tender_data.item_id, "DOCUMENT_ID": document_id})) - self.retry_upload_to_doc_service_queue.put(tender_data) - self.upload_to_doc_service_queue.get() - gevent.sleep(0) - - def retry_upload_to_doc_service(self): - """Get data from retry_upload_to_doc_service_queue; If upload were successful put Data obj to - upload_to_tender_queue, otherwise put Data obj back to retry_upload_file_queue""" - while not self.exit: - self.services_not_available.wait() - try: - tender_data = self.retry_upload_to_doc_service_queue.peek() - document_id = tender_data.file_content.get('meta', {}).get('id') - except LoopExit: - gevent.sleep(0) - continue - item_name_id = tender_data.item_name[:-1].upper() + "_ID" - try: - # create patch request to award/qualification with document to upload - self.client.headers.update({'X-Client-Request-ID': document_id}) - response = self.client_upload_to_doc_service(tender_data) - except Exception as e: - logger.warning('Exception while uploading file to doc service {} doc_id: {}. Message: {}. ' - 'Lost tender_data'.format(data_string(tender_data), document_id, e.message), - extra=journal_context({"MESSAGE_ID": DATABRIDGE_UNSUCCESS_UPLOAD_TO_DOC_SERVICE}, - params={"TENDER_ID": tender_data.tender_id, - item_name_id: tender_data.item_id, - "DOCUMENT_ID": document_id})) - logger.exception("Message: {}".format(e.message)) - self.retry_upload_to_doc_service_queue.get() - self.process_tracker.update_items_and_tender(tender_data.tender_id, tender_data.item_id) - raise e - else: - if response.status_code == 200: - self.move_to_tender_queue('retry', tender_data, response, document_id, item_name_id) - else: - logger.info( - 'Not successful response in retry from document service while uploading {} {} {} {}. Response {}'. 
- format(tender_data.tender_id, tender_data.item_name, tender_data.item_id, document_id, - response.status_code), - extra=journal_context( - {"MESSAGE_ID": DATABRIDGE_UNSUCCESS_UPLOAD_TO_DOC_SERVICE.format('_retry')}, - params={"TENDER_ID": tender_data.tender_id, - item_name_id: tender_data.item_id, "DOCUMENT_ID": document_id})) - gevent.sleep(0) - - @retry(stop_max_attempt_number=5, wait_exponential_multiplier=retry_mult) - def client_upload_to_doc_service(self, tender_data): - """Process upload request for retry queue objects.""" - return self.doc_service_client.upload(file_name, create_file(tender_data.file_content), 'application/yaml', - headers={ - 'X-Client-Request-ID': tender_data.file_content.get('meta', {}).get( - 'id')}) - - def upload_to_tender(self): - """Get data from upload_to_tender_queue; Upload get_Url and documentType; - If upload to tender were unsuccessful put Data object to retry_upload_to_tender_queue, otherwise delete given - award/qualification from processing_items.""" - while not self.exit: - self.services_not_available.wait() - try: - tender_data = self.upload_to_tender_queue.peek() - except LoopExit: - gevent.sleep(0) - continue - document_data = tender_data.file_content.get('data', {}) - document_id = tender_data.file_content.get('meta', {}).get('id') - document_data["documentType"] = "registerExtract" - item_name_id = tender_data.item_name[:-1].upper() + "_ID" - try: - self.client.headers.update({'X-Client-Request-ID': document_id}) - self.client._create_tender_resource_item(munchify({'data': {'id': tender_data.tender_id}}), - {'data': document_data}, - '{}/{}/documents'.format(tender_data.item_name, - tender_data.item_id)) - except ResourceError as re: - if re.status_int == 403 or re.status_int == 422 or re.status_int is None: - self.removing_data('', re, tender_data, document_id, item_name_id) - elif re.status_int == 429: - self.decrease_request_frequency(re, tender_data, document_id, item_name_id) - else: - 
self.resource_errors_else('', re, tender_data, document_id, item_name_id) - self.retry_upload_to_tender_queue.put(tender_data) - self.upload_to_tender_queue.get() - except Exception as e: - self.exception_errors('', e, tender_data, document_id, item_name_id) - self.retry_upload_to_tender_queue.put(tender_data) - self.upload_to_tender_queue.get() - else: - self.succesfully_uploaded_to_tender('', tender_data, document_id, item_name_id) - gevent.sleep(self.sleep_change_value.time_between_requests) - - def retry_upload_to_tender(self): - """Get data from retry_upload_to_tender_queue; If upload was unsuccessful put Data obj back to - retry_upload_to_tender_queue""" - while not self.exit: - self.services_not_available.wait() - try: - tender_data = self.retry_upload_to_tender_queue.peek() - except LoopExit: - gevent.sleep(0) - continue - document_id = tender_data.file_content.get('meta', {}).get('id') - self.client.headers.update({'X-Client-Request-ID': document_id}) - item_name_id = tender_data.item_name[:-1].upper() + "_ID" - try: - self.client_upload_to_tender(tender_data) - except ResourceError as re: - if re.status_int == 403 or re.status_int == 422 or re.status_int is None: - self.removing_data('retry', re, tender_data, document_id, item_name_id) - elif re.status_int == 429: - self.decrease_request_frequency(re, tender_data, document_id, item_name_id) - else: - self.resource_errors_else('retry', re, tender_data, document_id, item_name_id) - except Exception as e: - self.exception_errors('retry', e, tender_data, document_id, item_name_id) - else: - self.succesfully_uploaded_to_tender('retry', tender_data, document_id, item_name_id) - gevent.sleep(self.sleep_change_value.time_between_requests) - - @retry(stop_max_attempt_number=5, wait_exponential_multiplier=retry_mult) - def client_upload_to_tender(self, tender_data): - """Process upload to tender request for retry queue objects.""" - document_data = tender_data.file_content.get('data', {}) - 
document_data["documentType"] = "registerExtract" - self.client.headers.update({'X-Client-Request-ID': tender_data.file_content.get('meta', {}).get('id')}) - self.client._create_tender_resource_item(munchify({'data': {'id': tender_data.tender_id}}), - {'data': document_data}, - '{}/{}/documents'.format(tender_data.item_name, - tender_data.item_id)) - - def _run(self): - logger.info('Start UploadFile worker', extra=journal_context({"MESSAGE_ID": DATABRIDGE_START_UPLOAD}, {})) - self.immortal_jobs = {'upload_to_doc_service': spawn(self.upload_to_doc_service), - 'upload_to_tender': spawn(self.upload_to_tender), - 'retry_upload_to_doc_service': spawn(self.retry_upload_to_doc_service), - 'retry_upload_to_tender': spawn(self.retry_upload_to_tender)} - - try: - while not self.exit: - gevent.sleep(self.delay) - for name, job in self.immortal_jobs.items(): - if job.dead: - logger.warning("{} worker dead try restart".format(name), extra=journal_context( - {"MESSAGE_ID": 'DATABRIDGE_RESTART_{}'.format(name.lower())}, {})) - self.immortal_jobs[name] = gevent.spawn(getattr(self, name)) - logger.info("{} worker is up".format(name)) - - except Exception as e: - logger.error(e) - gevent.killall(self.immortal_jobs.values(), timeout=5) - - def shutdown(self): - self.exit = True - logger.info('Worker UploadFile complete his job.') diff --git a/openprocurement/bot/identification/databridge/upload_file_to_doc_service.py b/openprocurement/bot/identification/databridge/upload_file_to_doc_service.py new file mode 100644 index 0000000..1248171 --- /dev/null +++ b/openprocurement/bot/identification/databridge/upload_file_to_doc_service.py @@ -0,0 +1,162 @@ +# -*- coding: utf-8 -*- +from munch import munchify +from gevent.queue import Queue +from retrying import retry + +import logging.config +import gevent +from datetime import datetime +from gevent import Greenlet, spawn +from gevent.hub import LoopExit + +from openprocurement.bot.identification.databridge.utils import journal_context, 
create_file +from openprocurement.bot.identification.databridge.journal_msg_ids import DATABRIDGE_SUCCESS_UPLOAD_TO_DOC_SERVICE, \ + DATABRIDGE_UNSUCCESS_UPLOAD_TO_DOC_SERVICE, DATABRIDGE_START_UPLOAD +from openprocurement.bot.identification.databridge.constants import file_name, retry_mult + +logger = logging.getLogger(__name__) + + +class UploadFileToDocService(Greenlet): + """ Upload file with details """ + + def __init__(self, upload_to_doc_service_queue, upload_to_tender_queue, process_tracker, doc_service_client, + services_not_available, sleep_change_value, delay=15): + super(UploadFileToDocService, self).__init__() + self.exit = False + self.start_time = datetime.now() + + self.delay = delay + self.process_tracker = process_tracker + + # init client + self.doc_service_client = doc_service_client + + # init queues for workers + self.upload_to_doc_service_queue = upload_to_doc_service_queue + self.upload_to_tender_queue = upload_to_tender_queue + + self.sleep_change_value = sleep_change_value + # retry queues for workers + self.retry_upload_to_doc_service_queue = Queue(maxsize=500) + # blockers + self.services_not_available = services_not_available + + def upload_to_doc_service(self): + """Get data from upload_to_doc_service_queue; Create file of the Data.file_content data; If upload successful put Data + object to upload_file_to_tender, otherwise put Data to retry_upload_file_queue.""" + while not self.exit: + self.services_not_available.wait() + self.try_peek_and_upload(is_retry=False) + gevent.sleep(0) + + def retry_upload_to_doc_service(self): + """Get data from retry_upload_to_doc_service_queue; If upload were successful put Data obj to + upload_to_tender_queue, otherwise put Data obj back to retry_upload_file_queue""" + while not self.exit: + self.services_not_available.wait() + self.try_peek_and_upload(is_retry=True) + gevent.sleep(0) + + def try_peek_and_upload(self, is_retry): + try: + tender_data = self.peek_from_queue(is_retry) + except LoopExit: + 
gevent.sleep(0) + else: + self.try_upload_to_doc_service(tender_data, is_retry) + + def peek_from_queue(self, is_retry): + return self.retry_upload_to_doc_service_queue.peek() if is_retry else self.upload_to_doc_service_queue.peek() + + def try_upload_to_doc_service(self, tender_data, is_retry): + try: + response = self.update_headers_and_upload(tender_data, is_retry) + except Exception as e: + self.remove_bad_data(tender_data, e, is_retry) + else: + self.move_to_tender_if_200(response, tender_data, is_retry) + + def update_headers_and_upload(self, tender_data, is_retry): + if is_retry: + return self.update_headers_and_upload_retry(tender_data) + else: + return self.doc_service_client.upload(file_name, create_file(tender_data.file_content), 'application/yaml', + headers={'X-Client-Request-ID': tender_data.doc_id()}) + + def update_headers_and_upload_retry(self, tender_data): + self.doc_service_client.headers.update({'X-Client-Request-ID': tender_data.doc_id()}) + return self.client_upload_to_doc_service(tender_data) + + def remove_bad_data(self, tender_data, e, is_retry): + logger.exception('Exception while uploading file to doc service {} doc_id: {}. Message: {}. {}'. 
+ format(tender_data, tender_data.doc_id(), e, "Removed tender data" if is_retry else ""), + extra=journal_context({"MESSAGE_ID": DATABRIDGE_UNSUCCESS_UPLOAD_TO_DOC_SERVICE}, + tender_data.log_params())) + if is_retry: + self.retry_upload_to_doc_service_queue.get() + self.process_tracker.update_items_and_tender(tender_data.tender_id, tender_data.item_id) + raise e + else: + self.retry_upload_to_doc_service_queue.put(tender_data) + self.upload_to_doc_service_queue.get() + + def move_to_tender_if_200(self, response, tender_data, is_retry): + if response.status_code == 200: + self.move_to_tender_queue(tender_data, response, is_retry) + else: + self.move_data_to_retry_or_leave(response, tender_data, is_retry) + + def move_to_tender_queue(self, tender_data, response, is_retry): + data = tender_data + data.file_content = dict(response.json(), **{'meta': {'id': tender_data.doc_id()}}) + self.upload_to_tender_queue.put(data) + if not is_retry: + self.upload_to_doc_service_queue.get() + else: + self.retry_upload_to_doc_service_queue.get() + logger.info('Successfully uploaded file to doc service {} doc_id: {}'.format(tender_data, tender_data.doc_id()), + extra=journal_context({"MESSAGE_ID": DATABRIDGE_SUCCESS_UPLOAD_TO_DOC_SERVICE}, tender_data.log_params())) + + def move_data_to_retry_or_leave(self, response, tender_data, is_retry): + logger.info('Not successful response from document service while uploading {} doc_id: {}. Response {}'. 
+ format(tender_data, tender_data.doc_id(), response.status_code), + extra=journal_context({"MESSAGE_ID": DATABRIDGE_UNSUCCESS_UPLOAD_TO_DOC_SERVICE}, + tender_data.log_params())) + if not is_retry: + self.retry_upload_to_doc_service_queue.put(tender_data) + self.upload_to_doc_service_queue.get() + + @retry(stop_max_attempt_number=5, wait_exponential_multiplier=retry_mult) + def client_upload_to_doc_service(self, tender_data): + """Process upload request for retry queue objects.""" + return self.doc_service_client.upload(file_name, create_file(tender_data.file_content), 'application/yaml', + headers={'X-Client-Request-ID': tender_data.doc_id()}) + + def _start_jobs(self): + return {'upload_to_doc_service': spawn(self.upload_to_doc_service), + 'retry_upload_to_doc_service': spawn(self.retry_upload_to_doc_service)} + + def _run(self): + logger.info('Start UploadFileToDocService worker', + extra=journal_context({"MESSAGE_ID": DATABRIDGE_START_UPLOAD}, {})) + self.immortal_jobs = self._start_jobs() + try: + while not self.exit: + gevent.sleep(self.delay) + self.check_and_revive_jobs() + except Exception as e: + logger.error(e) + gevent.killall(self.immortal_jobs.values(), timeout=5) + + def check_and_revive_jobs(self): + for name, job in self.immortal_jobs.items(): + if job.dead: + logger.warning("{} worker dead try restart".format(name), extra=journal_context( + {"MESSAGE_ID": 'DATABRIDGE_RESTART_{}'.format(name.lower())}, {})) + self.immortal_jobs[name] = gevent.spawn(getattr(self, name)) + logger.info("{} worker is up".format(name)) + + def shutdown(self): + self.exit = True + logger.info('Worker UploadFileToDocService complete his job.') diff --git a/openprocurement/bot/identification/databridge/upload_file_to_tender.py b/openprocurement/bot/identification/databridge/upload_file_to_tender.py new file mode 100644 index 0000000..9a07624 --- /dev/null +++ b/openprocurement/bot/identification/databridge/upload_file_to_tender.py @@ -0,0 +1,174 @@ +# coding=utf-8 +from 
munch import munchify +from gevent.queue import Queue, Empty +from retrying import retry + +import logging.config +import gevent +from datetime import datetime +from gevent import Greenlet, spawn +from gevent.hub import LoopExit +from restkit import ResourceError + +from openprocurement.bot.identification.databridge.utils import journal_context +from openprocurement.bot.identification.databridge.journal_msg_ids import DATABRIDGE_SUCCESS_UPLOAD_TO_TENDER, \ + DATABRIDGE_UNSUCCESS_UPLOAD_TO_TENDER, DATABRIDGE_ITEM_STATUS_CHANGED_WHILE_PROCESSING, DATABRIDGE_START_UPLOAD +from openprocurement.bot.identification.databridge.constants import retry_mult + + +logger = logging.getLogger(__name__) + + +class UploadFileToTender(Greenlet): + + def __init__(self, client, upload_to_tender_queue, process_tracker, services_not_available, sleep_change_value, delay=15): + super(UploadFileToTender, self).__init__() + self.exit = False + self.start_time = datetime.now() + + self.delay = delay + self.process_tracker = process_tracker + + # init clients + self.client = client + + # init queues for workers + self.upload_to_tender_queue = upload_to_tender_queue + self.retry_upload_to_tender_queue = Queue(maxsize=500) + + # blockers + self.services_not_available = services_not_available + self.sleep_change_value = sleep_change_value + + def upload_to_tender(self): + """Get data from upload_to_tender_queue; Upload get_Url and documentType; + If upload to tender were unsuccessful put Data object to retry_upload_to_tender_queue, otherwise delete given + award/qualification from processing_items.""" + while not self.exit: + self.services_not_available.wait() + self.try_peek_data_and_upload_to_tender(False) + gevent.sleep(self.sleep_change_value.time_between_requests) + + def retry_upload_to_tender(self): + """Get data from retry_upload_to_tender_queue; If upload was unsuccessful put Data obj back to + retry_upload_to_tender_queue""" + while not self.exit: + self.services_not_available.wait() + 
self.try_peek_data_and_upload_to_tender(True) + gevent.sleep(self.sleep_change_value.time_between_requests) + + def try_peek_data_and_upload_to_tender(self, is_retry): + try: + tender_data = self.peek_from_tender_queue(is_retry) + except LoopExit: + gevent.sleep(0) + else: + self.try_upload_to_tender(tender_data, is_retry) + + def peek_from_tender_queue(self, is_retry): + return self.retry_upload_to_tender_queue.peek() if is_retry else self.upload_to_tender_queue.peek() + + def try_upload_to_tender(self, tender_data, is_retry): + try: + self.update_headers_and_upload_to_tender(tender_data, is_retry) + except ResourceError as re: + self.remove_data_or_increase_wait(re, tender_data, is_retry) + except Exception as e: + self.handle_error(e, tender_data, is_retry) + else: + self.successfully_uploaded_to_tender(tender_data, is_retry) + + def update_headers_and_upload_to_tender(self, tender_data, is_retry): + if is_retry: + self.do_upload_to_tender_with_retry(tender_data) + else: + self.do_upload_to_tender(tender_data) + + def do_upload_to_tender(self, tender_data): + document_data = tender_data.file_content.get('data', {}) + document_data["documentType"] = "registerExtract" + self.client.headers.update({'X-Client-Request-ID': tender_data.doc_id()}) + self.client._create_tender_resource_item(munchify({'data': {'id': tender_data.tender_id}}), + {'data': document_data}, + '{}/{}/documents'.format(tender_data.item_name, tender_data.item_id)) + + @retry(stop_max_attempt_number=5, wait_exponential_multiplier=retry_mult) + def do_upload_to_tender_with_retry(self, tender_data): + """Process upload to tender request for retry queue objects.""" + self.do_upload_to_tender(tender_data) + + def remove_data_or_increase_wait(self, re, tender_data, is_retry): + if re.status_int == 403 or re.status_int == 422 or re.status_int is None: + self.removing_data(re, tender_data, is_retry) + elif re.status_int == 429: + self.decrease_request_frequency(re, tender_data) + else: + 
self.handle_error(re, tender_data, is_retry) + + def removing_data(self, re, tender_data, is_retry): + logger.warning("Accept {} while uploading to {} doc_id: {}. Message {}".format( + re.status_int, tender_data, tender_data.doc_id(), re.msg), + extra=journal_context({"MESSAGE_ID": DATABRIDGE_ITEM_STATUS_CHANGED_WHILE_PROCESSING}, + tender_data.log_params())) + self.process_tracker.update_items_and_tender(tender_data.tender_id, tender_data.item_id) + self.sleep_change_value.decrement() + if is_retry: + self.retry_upload_to_tender_queue.get() + else: + self.upload_to_tender_queue.get() + + def decrease_request_frequency(self, re, tender_data): + logger.info("Accept 429 while uploading to {} doc_id: {}. Message {}".format( + tender_data, tender_data.doc_id(), re.msg), + extra=journal_context({"MESSAGE_ID": DATABRIDGE_ITEM_STATUS_CHANGED_WHILE_PROCESSING}, + tender_data.log_params())) + self.sleep_change_value.increment() + + def handle_error(self, re, tender_data, is_retry): + logger.info('Error while uploading file to {} doc_id: {}. Status: {}. 
Message: {}'.format( + tender_data, tender_data.doc_id(), getattr(re, "status_int", None), re.message), + extra=journal_context({"MESSAGE_ID": DATABRIDGE_UNSUCCESS_UPLOAD_TO_TENDER}, tender_data.log_params())) + self.sleep_change_value.decrement() + if not is_retry: + self.retry_upload_to_tender_queue.put(tender_data) + self.upload_to_tender_queue.get() + + def successfully_uploaded_to_tender(self, tender_data, is_retry): + logger.info('Successfully uploaded file to {} doc_id: {}'.format(tender_data, tender_data.doc_id()), + extra=journal_context({"MESSAGE_ID": DATABRIDGE_SUCCESS_UPLOAD_TO_TENDER}, tender_data.log_params())) + # delete current tender after successful upload file (to avoid reloading file) + self.process_tracker.update_items_and_tender(tender_data.tender_id, tender_data.item_id) + self.sleep_change_value.decrement() + if is_retry: + self.retry_upload_to_tender_queue.get() + else: + self.upload_to_tender_queue.get() + + def _start_jobs(self): + return {'upload_to_tender': spawn(self.upload_to_tender), + 'retry_upload_to_tender': spawn(self.retry_upload_to_tender)} + + def _run(self): + logger.info('Start UploadFileToTender worker', + extra=journal_context({"MESSAGE_ID": DATABRIDGE_START_UPLOAD}, {})) + self.immortal_jobs = self._start_jobs() + try: + while not self.exit: + gevent.sleep(self.delay) + self.check_and_revive_jobs() + except Exception as e: + logger.error(e) + gevent.killall(self.immortal_jobs.values(), timeout=5) + + def check_and_revive_jobs(self): + for name, job in self.immortal_jobs.items(): + if job.dead: + logger.warning("{} worker dead try restart".format(name), extra=journal_context( + {"MESSAGE_ID": 'DATABRIDGE_RESTART_{}'.format(name.lower())}, {})) + self.immortal_jobs[name] = gevent.spawn(getattr(self, name)) + logger.info("{} worker is up".format(name)) + + def shutdown(self): + self.exit = True + logger.info('Worker UploadFileToTender complete his job.') + diff --git a/openprocurement/bot/identification/databridge/utils.py 
b/openprocurement/bot/identification/databridge/utils.py index 3054fa3..0e8e502 100644 --- a/openprocurement/bot/identification/databridge/utils.py +++ b/openprocurement/bot/identification/databridge/utils.py @@ -7,7 +7,6 @@ from logging import getLogger from datetime import datetime from restkit import ResourceError -from collections import namedtuple from openprocurement.bot.identification.databridge.constants import file_name @@ -69,17 +68,40 @@ def update_items_and_tender(self, tender_id, item_id): def item_key(tender_id, item_id): return '{}_{}'.format(tender_id, item_id) -Data = namedtuple('Data', [ - 'tender_id', # tender ID - 'item_id', # qualification or award ID - 'code', # EDRPOU, IPN or passport - 'item_name', # "qualifications" or "awards" - 'file_content' # details for file -]) +class Data(object): + def __init__(self, tender_id, item_id, code, item_name, file_content): + self.tender_id = tender_id + self.item_id = item_id + self.code = code + self.item_name = item_name + self.file_content = file_content -def data_string(data): - return "tender {} {} id: {}".format(data.tender_id, data.item_name[:-1], data.item_id) + def __eq__(self, other): + return (self.tender_id == other.tender_id and + self.item_id == other.item_id and + self.code == other.code and + self.item_name == other.item_name and + self.file_content == other.file_content) + + def __str__(self): + return "tender {} {} id: {}".format(self.tender_id, self.item_name[:-1], self.item_id) + + def doc_id(self): + return self.file_content['meta']['id'] + + def item_name_id(self): + return self.item_name[:-1].upper() + "_ID" + + def param(self): + return 'id' if self.code.isdigit() and len(self.code) != id_passport_len else 'passport' + + def add_unique_req_id(self, response): + if response.headers.get('X-Request-ID'): + self.file_content['meta']['sourceRequests'].append(response.headers['X-Request-ID']) + + def log_params(self): + return {"TENDER_ID": self.tender_id, self.item_name_id(): 
self.item_id, "DOCUMENT_ID": self.doc_id()} def journal_context(record={}, params={}): @@ -96,10 +118,6 @@ def generate_doc_id(): return uuid4().hex -def validate_param(code): - return 'id' if code.isdigit() and len(code) != id_passport_len else 'passport' - - def create_file(details): """ Return temp file with details """ temporary_file = io.BytesIO() diff --git a/openprocurement/bot/identification/tests/bridge.py b/openprocurement/bot/identification/tests/bridge.py index a9544bc..d3ef98a 100644 --- a/openprocurement/bot/identification/tests/bridge.py +++ b/openprocurement/bot/identification/tests/bridge.py @@ -1,21 +1,18 @@ # -*- coding: utf-8 -*- -import unittest import os - -from openprocurement.bot.identification.tests.utils import custom_sleep -from requests import RequestException +import unittest from mock import patch, MagicMock -from restkit import RequestError, ResourceError - from gevent.pywsgi import WSGIServer +from requests import RequestException from bottle import Bottle, response, request +from restkit import RequestError -from openprocurement.bot.identification.databridge.bridge import EdrDataBridge from openprocurement_client.client import TendersClientSync, TendersClient +from openprocurement.bot.identification.databridge.bridge import EdrDataBridge from openprocurement.bot.identification.client import DocServiceClient, ProxyClient -from openprocurement.bot.identification.databridge.utils import check_412 +from openprocurement.bot.identification.tests.utils import custom_sleep, AlmostAlwaysTrue config = { 'main': @@ -41,18 +38,6 @@ } -class AlmostAlwaysTrue(object): - def __init__(self, total_iterations=1): - self.total_iterations = total_iterations - self.current_iteration = 0 - - def __nonzero__(self): - if self.current_iteration < self.total_iterations: - self.current_iteration += 1 - return bool(1) - return bool(0) - - class BaseServersTest(unittest.TestCase): """Api server to test openprocurement.integrations.edr.databridge.bridge """ @@ 
-157,39 +142,46 @@ def test_tender_sync_clients(self, sync_client, client, doc_service_client, prox def test_start_jobs(self): self.worker = EdrDataBridge(config) - scanner, filter_tender, edr_handler, upload_file = [MagicMock(return_value=i) for i in range(4)] + scanner, filter_tender, edr_handler, upload_file_to_doc_service, upload_file_to_tender = \ + [MagicMock(return_value=i) for i in range(5)] self.worker.scanner = scanner self.worker.filter_tender = filter_tender self.worker.edr_handler = edr_handler - self.worker.upload_file = upload_file + self.worker.upload_file_to_doc_service = upload_file_to_doc_service + self.worker.upload_file_to_tender = upload_file_to_tender self.worker._start_jobs() # check that all jobs were started self.assertTrue(scanner.called) self.assertTrue(filter_tender.called) self.assertTrue(edr_handler.called) - self.assertTrue(upload_file.called) + self.assertTrue(upload_file_to_doc_service.called) + self.assertTrue(upload_file_to_tender.called) self.assertEqual(self.worker.jobs['scanner'], 0) self.assertEqual(self.worker.jobs['filter_tender'], 1) self.assertEqual(self.worker.jobs['edr_handler'], 2) - self.assertEqual(self.worker.jobs['upload_file'], 3) + self.assertEqual(self.worker.jobs['upload_file_to_doc_service'], 3) + self.assertEqual(self.worker.jobs['upload_file_to_tender'], 4) @patch('gevent.sleep') def test_run(self, sleep): self.worker = EdrDataBridge(config) # create mocks - scanner, filter_tender, edr_handler, upload_file = [MagicMock() for i in range(4)] + scanner, filter_tender, edr_handler, upload_file_to_doc_service, upload_file_to_tender = \ + [MagicMock() for i in range(5)] self.worker.scanner = scanner self.worker.filter_tender = filter_tender self.worker.edr_handler = edr_handler - self.worker.upload_file = upload_file + self.worker.upload_file_to_doc_service = upload_file_to_doc_service + self.worker.upload_file_to_tender = upload_file_to_tender with patch('__builtin__.True', AlmostAlwaysTrue(100)): 
self.worker.run() self.assertEqual(self.worker.scanner.call_count, 1) self.assertEqual(self.worker.filter_tender.call_count, 1) self.assertEqual(self.worker.edr_handler.call_count, 1) - self.assertEqual(self.worker.upload_file.call_count, 1) + self.assertEqual(self.worker.upload_file_to_doc_service.call_count, 1) + self.assertEqual(self.worker.upload_file_to_tender.call_count, 1) def test_proxy_server(self): self.worker = EdrDataBridge(config) diff --git a/openprocurement/bot/identification/tests/edr_handler.py b/openprocurement/bot/identification/tests/edr_handler.py index 11c2eb5..026a1f0 100644 --- a/openprocurement/bot/identification/tests/edr_handler.py +++ b/openprocurement/bot/identification/tests/edr_handler.py @@ -715,9 +715,8 @@ def test_value_error(self, mrequest, gevent_sleep): self.assertIsNotNone(mrequest.request_history[0].headers['X-Client-Request-ID']) self.assertIsNotNone(mrequest.request_history[1].headers['X-Client-Request-ID']) - @requests_mock.Mocker() @patch('gevent.sleep') - def test_value_error_mock(self, mrequest, gevent_sleep): + def test_value_error_mock(self, gevent_sleep): """Accept 'Gateway Timeout Error' while requesting /verify, then accept 200 status code.""" gevent_sleep.side_effect = custom_sleep self.worker.retry_edr_ids_queue = MagicMock() diff --git a/openprocurement/bot/identification/tests/filter_tender.py b/openprocurement/bot/identification/tests/filter_tender.py index f5cc4f4..bcb77a3 100644 --- a/openprocurement/bot/identification/tests/filter_tender.py +++ b/openprocurement/bot/identification/tests/filter_tender.py @@ -2,14 +2,9 @@ import uuid import unittest import datetime + from gevent.hub import LoopExit from gevent.queue import Queue -from openprocurement.bot.identification.databridge.constants import author -from openprocurement.bot.identification.databridge.filter_tender import FilterTenders -from openprocurement.bot.identification.databridge.utils import Data, ProcessTracker, item_key -from 
openprocurement.bot.identification.tests.utils import custom_sleep, generate_request_id, ResponseMock -from openprocurement.bot.identification.databridge.bridge import TendersClientSync -from openprocurement.bot.identification.databridge.sleep_change_value import APIRateController from mock import patch, MagicMock from time import sleep from munch import munchify @@ -18,6 +13,13 @@ from bottle import Bottle, response from simplejson import dumps +from openprocurement.bot.identification.databridge.constants import author +from openprocurement.bot.identification.databridge.filter_tender import FilterTenders +from openprocurement.bot.identification.databridge.utils import Data, ProcessTracker, item_key +from openprocurement.bot.identification.tests.utils import custom_sleep, generate_request_id, ResponseMock +from openprocurement.bot.identification.databridge.bridge import TendersClientSync +from openprocurement.bot.identification.databridge.sleep_change_value import APIRateController + SERVER_RESPONSE_FLAG = 0 SPORE_COOKIES = ("a7afc9b1fc79e640f2487ba48243ca071c07a823d27" "8cf9b7adf0fae467a524747e3c6c6973262130fac2b" diff --git a/openprocurement/bot/identification/tests/scanner.py b/openprocurement/bot/identification/tests/scanner.py index a5d366d..c09e4be 100644 --- a/openprocurement/bot/identification/tests/scanner.py +++ b/openprocurement/bot/identification/tests/scanner.py @@ -1,6 +1,5 @@ # -*- coding: utf-8 -*- from gevent import monkey -from openprocurement.bot.identification.databridge.utils import ProcessTracker monkey.patch_all() @@ -15,7 +14,7 @@ from openprocurement.bot.identification.databridge.scanner import Scanner from openprocurement.bot.identification.tests.utils import custom_sleep - +from openprocurement.bot.identification.databridge.utils import ProcessTracker from openprocurement.bot.identification.databridge.sleep_change_value import APIRateController diff --git a/openprocurement/bot/identification/tests/test_end_to_end.py 
b/openprocurement/bot/identification/tests/test_end_to_end.py new file mode 100644 index 0000000..5382dbf --- /dev/null +++ b/openprocurement/bot/identification/tests/test_end_to_end.py @@ -0,0 +1,278 @@ +# coding=utf-8 + +from gevent import monkey, subprocess + +monkey.patch_all() + +import os +import unittest +import uuid + +from simplejson import dumps + +from time import sleep +from gevent.queue import Queue +from mock import patch, MagicMock +from gevent.pywsgi import WSGIServer +from munch import munchify +from redis import StrictRedis +from requests import RequestException +from bottle import Bottle, response, request +from restkit import RequestError + +from openprocurement.bot.identification.databridge.caching import Db +from openprocurement.bot.identification.databridge.constants import author, version, file_name +from openprocurement.bot.identification.databridge.filter_tender import FilterTenders +from openprocurement.bot.identification.databridge.sleep_change_value import APIRateController +from openprocurement.bot.identification.databridge.utils import ProcessTracker, Data +from openprocurement_client.client import TendersClientSync, TendersClient +from openprocurement.bot.identification.databridge.bridge import EdrDataBridge +from openprocurement.bot.identification.client import DocServiceClient, ProxyClient +from openprocurement.bot.identification.tests.utils import custom_sleep, ResponseMock, generate_request_id + +config = { + 'main': + { + 'tenders_api_server': 'http://127.0.0.1:20604', + 'tenders_api_version': '2.3', + 'public_tenders_api_server': 'http://127.0.0.1:20605', + 'doc_service_server': 'http://127.0.0.1', + 'doc_service_port': 20606, + 'doc_service_user': 'broker', + 'doc_service_password': 'broker_pass', + 'proxy_server': 'http://127.0.0.1', + 'proxy_port': 20607, + 'proxy_user': 'robot', + 'proxy_password': 'robot', + 'proxy_version': 1.0, + 'buffers_size': 450, + 'full_stack_sync_delay': 15, + 'empty_stack_sync_delay': 101, + 
'on_error_sleep_delay': 5, + 'api_token': "api_token", + 'cache_db_name': 0, + 'cache_host': '127.0.0.1', + 'cache_port': '16379', + 'time_to_live': 1800, + 'delay': 1, + 'time_to_live_negative': 120 + } +} + +CODES = ('14360570', '0013823', '23494714') +qualification_ids = [uuid.uuid4().hex for i in range(5)] +award_ids = [uuid.uuid4().hex for i in range(5)] +request_ids = [generate_request_id() for i in range(2)] +bid_ids = [uuid.uuid4().hex for _ in range(5)] + + + +def setup_routing(app, func, path='/api/2.3/spore', method='GET'): + app.route(path, method, func) + + +def response_spore(): + response.set_cookie("SERVER_ID", ("a7afc9b1fc79e640f2487ba48243ca071c07a823d27" + "8cf9b7adf0fae467a524747e3c6c6973262130fac2b" + "96a11693fa8bd38623e4daee121f60b4301aef012c")) + return response + + +def doc_response(): + return response + + +def awards(counter_id, counter_bid_id, status, sup_id): + return {'id': award_ids[counter_id], 'bid_id': bid_ids[counter_bid_id], 'status': status, + 'suppliers': [{'identifier': {'scheme': 'UA-EDR', 'id': sup_id}}]} + + +def bids(counter_id, edr_id): + return {'id': bid_ids[counter_id], 'tenderers': [{'identifier': {'scheme': 'UA-EDR', 'id': edr_id}}]} + + +def qualifications(status, counter_qual_id, counter_bid_id): + return {'status': status, 'id': qualification_ids[counter_qual_id], 'bidID': bid_ids[counter_bid_id]} + + +def proxy_response(): + if request.headers.get("sandbox-mode") != "True": # Imitation of health comparison + response.status = 400 + return response + + +def get_tenders_response(): + return munchify({'prev_page': {'offset': '123'}, + 'next_page': {'offset': '1234'}, + 'data': [{'status': "active.pre-qualification", + "id": '123', + 'procurementMethodType': 'aboveThresholdEU'}]}) + + +def get_tender_response(): + response.status = 200 + response.content_type = 'application/json' + response.headers.update({'X-Request-ID': request_ids[0]}) + return dumps({'prev_page': {'offset': '123'}, + 'next_page': {'offset': 
'1234'}, + 'data': {'status': "active.pre-qualification", + 'id': '123', + 'procurementMethodType': 'aboveThresholdEU', + 'bids': [bids(0, CODES[0]), + bids(1, CODES[1]), + bids(2, CODES[2]), + bids(3, CODES[2]), + {'id': bid_ids[4], + 'tenderers': [{'identifier': { + 'scheme': 'UA-ED', + 'id': CODES[2]}}]}], + 'qualifications': [ + qualifications('pending', 2, 2), + ]}}) + + +def get_proxy_response(): + response.status = 200 + response.content_type = 'application/json' + response.headers.update({'X-Request-ID': request_ids[0]}) + return {'data': [{}], + "meta": {"sourceDate": "2017-04-25T11:56:36+00:00", "id": "{}".format(generate_request_id()), + "version": version, 'author': author, + 'detailsSourceDate': ["2017-04-25T11:56:36+00:00"]}} + + +def get_doc_service_response(): + response.status = 200 + return {'url': 'http://docs-sandbox.openprocurement.org/get/8ccbfde0c6804143b119d9168452cb6f', + 'format': 'application/yaml', + 'hash': 'md5:9a0364b9e99bb480dd25e1f0284c8555', + 'title': file_name} + + +class BaseServersTest(unittest.TestCase): + """Api server to test openprocurement.integrations.edr.databridge.bridge """ + + relative_to = os.path.dirname(__file__) # crafty line + + @classmethod + def setUpClass(cls): + cls.api_server_bottle = Bottle() + cls.proxy_server_bottle = Bottle() + cls.doc_server_bottle = Bottle() + cls.api_server = WSGIServer(('127.0.0.1', 20604), cls.api_server_bottle, log=None) + setup_routing(cls.api_server_bottle, response_spore) + cls.public_api_server = WSGIServer(('127.0.0.1', 20605), cls.api_server_bottle, log=None) + cls.doc_server = WSGIServer(('127.0.0.1', 20606), cls.doc_server_bottle, log=None) + setup_routing(cls.doc_server_bottle, doc_response, path='/') + cls.proxy_server = WSGIServer(('127.0.0.1', 20607), cls.proxy_server_bottle, log=None) + setup_routing(cls.proxy_server_bottle, proxy_response, path='/api/1.0/health') + cls.redis_process = subprocess.Popen(['redis-server', '--port', str(config['main']['cache_port'])]) + 
sleep(0.1) + cls.redis = StrictRedis(port=str(config['main']['cache_port'])) + + # start servers + cls.api_server.start() + cls.public_api_server.start() + cls.doc_server.start() + cls.proxy_server.start() + + @classmethod + def tearDownClass(cls): + cls.api_server.close() + cls.public_api_server.close() + cls.doc_server.close() + cls.proxy_server.close() + cls.redis_process.terminate() + cls.redis_process.wait() + + def tearDown(self): + # del self.worker + self.redis.flushall() + + + +class EndToEndTest(BaseServersTest): + def check_data_objects(self, obj, example): + self.assertEqual(obj.tender_id, example.tender_id) + self.assertEqual(obj.item_id, example.item_id) + self.assertEqual(obj.code, example.code) + self.assertEqual(obj.item_name, example.item_name) + self.assertIsNotNone(obj.file_content['meta']['id']) + if obj.file_content['meta'].get('author'): + self.assertEqual(obj.file_content['meta']['author'], author) + if obj.file_content['meta'].get('sourceRequests'): + self.assertEqual(obj.file_content['meta']['sourceRequests'], example.file_content['meta']['sourceRequests']) + + def setUp(self): + self.filtered_tender_ids_queue = Queue(10) + self.edrpou_codes_queue = Queue(10) + self.process_tracker = ProcessTracker() + self.tender_id = uuid.uuid4().hex + self.sleep_change_value = APIRateController() + + def test_init(self): + self.worker = EdrDataBridge(config) + self.assertEqual(self.worker.delay, 1) + self.assertEqual(self.worker.sleep_change_value.time_between_requests, 0) + self.assertTrue(isinstance(self.worker.tenders_sync_client, TendersClientSync)) + self.assertTrue(isinstance(self.worker.client, TendersClient)) + self.assertTrue(isinstance(self.worker.proxyClient, ProxyClient)) + self.assertTrue(isinstance(self.worker.doc_service_client, DocServiceClient)) + self.assertFalse(self.worker.initialization_event.is_set()) + self.assertEqual(self.worker.process_tracker.processing_items, {}) + self.assertEqual(self.worker.db._backend, "redis") + 
self.assertEqual(self.worker.db._db_name, 0) + self.assertEqual(self.worker.db._port, "16379") + self.assertEqual(self.worker.db._host, "127.0.0.1") + + @patch('gevent.sleep') + def test_scanner_and_filter(self, gevent_sleep): + gevent_sleep.side_effect = custom_sleep + self.worker = EdrDataBridge(config) + setup_routing(self.api_server_bottle, get_tender_response, path='/api/2.3/tenders/123') + setup_routing(self.api_server_bottle, get_tenders_response, path='/api/2.3/tenders') + self.worker.scanner() + self.worker.filter_tender() + sleep(3) + data = Data('123', qualification_ids[2], CODES[2], "qualifications", + {'meta': {'sourceRequests': [request_ids[0]]}}) + self.assertEqual(self.worker.edrpou_codes_queue.qsize(), 1) + self.check_data_objects(self.worker.edrpou_codes_queue.get(), data) + + @patch('gevent.sleep') + def test_scanner_to_edr_handler(self, gevent_sleep): + gevent_sleep.side_effect = custom_sleep + self.worker = EdrDataBridge(config) + setup_routing(self.api_server_bottle, get_tender_response, path='/api/2.3/tenders/123') + setup_routing(self.api_server_bottle, get_tenders_response, path='/api/2.3/tenders') + setup_routing(self.proxy_server_bottle, get_proxy_response, path='/api/1.0/verify') + self.worker.scanner() + self.worker.filter_tender() + self.worker.edr_handler() + sleep(3) + data = Data('123', qualification_ids[2], CODES[2], "qualifications", + {'meta': {'sourceRequests': [request_ids[0], request_ids[0]]}}) + self.check_data_objects(self.worker.upload_to_doc_service_queue.get(), data) + self.assertEqual(self.worker.edrpou_codes_queue.qsize(), 0) + + @patch('gevent.sleep') + def test_scanner_to_upload_to_doc_service(self, gevent_sleep): + gevent_sleep.side_effect = custom_sleep + self.worker = EdrDataBridge(config) + setup_routing(self.api_server_bottle, get_tender_response, path='/api/2.3/tenders/123') + setup_routing(self.api_server_bottle, get_tenders_response, path='/api/2.3/tenders') + setup_routing(self.proxy_server_bottle, 
get_proxy_response, path='/api/1.0/verify') + setup_routing(self.doc_server_bottle, get_doc_service_response, path='/upload', method='POST') + self.worker.scanner() + self.worker.filter_tender() + self.worker.edr_handler() + self.worker.upload_file_to_doc_service() + sleep(3) + data = Data('123', qualification_ids[2], CODES[2], "qualifications", + {'meta': {}, 'url': 'http://docs-sandbox.openprocurement.org/get/8ccbfde0c6804143b119d9168452cb6f', + 'format': 'application/yaml', + 'hash': 'md5:9a0364b9e99bb480dd25e1f0284c8555', + 'title': file_name}) + self.assertEqual(self.worker.edrpou_codes_queue.qsize(), 0) + self.assertEqual(self.worker.upload_to_doc_service_queue.qsize(), 0) + self.check_data_objects(self.worker.upload_to_tender_queue.get(), data) diff --git a/openprocurement/bot/identification/tests/test_upload_file_to_tender.py b/openprocurement/bot/identification/tests/test_upload_file_to_tender.py new file mode 100644 index 0000000..53b0072 --- /dev/null +++ b/openprocurement/bot/identification/tests/test_upload_file_to_tender.py @@ -0,0 +1,532 @@ +from gevent import monkey + +monkey.patch_all() + +import uuid +import unittest +import requests_mock + +from gevent.queue import Queue +from gevent.hub import LoopExit +from time import sleep +from mock import patch, MagicMock +from restkit.errors import Unauthorized +from restkit import ResourceError +from gevent.pywsgi import WSGIServer +from bottle import Bottle, response +from simplejson import dumps + +from openprocurement.bot.identification.databridge.upload_file_to_tender import UploadFileToTender +from openprocurement.bot.identification.databridge.utils import Data, generate_doc_id, item_key, ProcessTracker +from openprocurement.bot.identification.tests.utils import custom_sleep, generate_answers, AlmostAlwaysTrue +from openprocurement.bot.identification.databridge.constants import file_name +from openprocurement.bot.identification.databridge.bridge import TendersClientSync +from 
openprocurement.bot.identification.databridge.sleep_change_value import APIRateController + +SERVER_RESPONSE_FLAG = 0 +SERVER_RETRY_COUNTER = 2 +SPORE_COOKIES = ("a7afc9b1fc79e640f2487ba48243ca071c07a823d27" + "8cf9b7adf0fae467a524747e3c6c6973262130fac2b" + "96a11693fa8bd38623e4daee121f60b4301aef012c") +COOKIES_412 = ("b7afc9b1fc79e640f2487ba48243ca071c07a823d27" + "8cf9b7adf0fae467a524747e3c6c6973262130fac2b" + "96a11693fa8bd38623e4daee121f60b4301aef012c") + + +def setup_routing(app, func, path='/api/2.3/spore', method='GET'): + app.routes = [] + app.route(path, method, func) + + +def response_spore(): + response.set_cookie("SERVER_ID", SPORE_COOKIES) + return response + + +def response_412(): + response.status = 412 + response.set_cookie("SERVER_ID", COOKIES_412) + return response + + +def response_get_tender(): + response.status = 201 + response.headers['X-Request-ID'] = '125' + return dumps({'data': {'id': uuid.uuid4().hex, + 'documentOf': 'tender', + 'documentType': 'registerExtract', + 'url': 'url'}}) + + +def generate_response(): + global SERVER_RESPONSE_FLAG + if SERVER_RESPONSE_FLAG == 0: + SERVER_RESPONSE_FLAG = 1 + return response_412() + return response_get_tender() + + +def generate_response_retry(): + global SERVER_RETRY_COUNTER + if SERVER_RETRY_COUNTER > 0: + SERVER_RETRY_COUNTER -= 1 + return response_412() + return response_get_tender() + + +class TestUploadFileToTenderWorker(unittest.TestCase): + def setUp(self): + self.tender_id = uuid.uuid4().hex + self.award_id = uuid.uuid4().hex + self.qualification_id = uuid.uuid4().hex + self.document_id = generate_doc_id() + self.process_tracker = ProcessTracker(db=MagicMock()) + self.process_tracker.set_item(self.tender_id, self.award_id, 1) + self.upload_to_tender_queue = Queue(10) + self.url = 'http://127.0.0.1:20604' + self.sleep_change_value = APIRateController() + self.data = Data(self.tender_id, self.award_id, '123', 'awards', + {'meta': {'id': self.document_id}, 'test_data': 'test_data'}) + 
self.qualification_data = Data(self.tender_id, self.qualification_id, '123', 'qualifications', + {'meta': {'id': self.document_id}, 'test_data': 'test_data'}) + self.client = MagicMock() + self.worker = UploadFileToTender(self.client, self.upload_to_tender_queue, + self.process_tracker, MagicMock(), self.sleep_change_value) + self.worker.retry_upload_to_tender_queue = Queue(10) + + def tearDown(self): + del self.worker + del self.upload_to_tender_queue + + @staticmethod + def get_tender(): + return {'data': {'id': uuid.uuid4().hex, + 'documentOf': 'tender', + 'documentType': 'registerExtract', + 'url': 'url'}} + + def is_working(self, worker): + return self.upload_to_tender_queue.qsize() or worker.retry_upload_to_tender_queue.qsize() + + def shutdown_when_done(self, worker): + worker.start() + while self.is_working(worker): + sleep(0.1) + worker.shutdown() + + @patch('gevent.sleep') + def test_upload_to_tender_429(self, gevent_sleep): + gevent_sleep.side_effect = custom_sleep + self.client._create_tender_resource_item = MagicMock(side_effect=[ + ResourceError(http_code=429), ResourceError(http_code=429), ResourceError(http_code=403)]) + self.upload_to_tender_queue.put(self.data) + self.shutdown_when_done(self.worker) + self.assertEqual(self.upload_to_tender_queue.qsize(), 0, 'Queue should be empty') + self.assertEqual(self.worker.sleep_change_value.time_between_requests, 1) + + @patch('gevent.sleep') + def test_upload_to_tender_exception(self, gevent_sleep): + gevent_sleep.side_effect = custom_sleep + self.upload_to_tender_queue.put(self.data) + self.client._create_tender_resource_item = MagicMock(side_effect=[Exception()]) + self.worker.do_upload_to_tender_with_retry = MagicMock(side_effect=ResourceError(http_code=403)) + self.shutdown_when_done(self.worker) + self.assertEqual(self.upload_to_tender_queue.qsize(), 0, 'Queue should be empty') + self.assertEqual(self.worker.sleep_change_value.time_between_requests, 0) + + @patch('gevent.sleep') + def 
test_upload_to_tender_exception_status_int_none(self, gevent_sleep): + gevent_sleep.side_effect = custom_sleep + self.upload_to_tender_queue.put(self.data) + client = MagicMock() + client._create_tender_resource_item = MagicMock(side_effect=[Unauthorized()]) + self.shutdown_when_done(self.worker) + self.assertEqual(self.upload_to_tender_queue.qsize(), 0, 'Queue should be empty') + self.assertEqual(self.worker.sleep_change_value.time_between_requests, 0) + + @patch('gevent.sleep') + def test_retry_upload_to_tender(self, gevent_sleep): + gevent_sleep.side_effect = custom_sleep + self.client._create_tender_resource_item.side_effect = [Unauthorized(http_code=401), + Unauthorized(http_code=403), + Unauthorized(http_code=429), self.get_tender()] + self.upload_to_tender_queue.put(self.data) + self.assertItemsEqual(self.process_tracker.processing_items.keys(), [item_key(self.tender_id, self.award_id)]) + self.assertEqual(self.upload_to_tender_queue.qsize(), 1) + self.shutdown_when_done(self.worker) + self.assertEqual(self.upload_to_tender_queue.qsize(), 0, 'Queue should be empty') + self.assertEqual(self.process_tracker.processing_items, {}) # test that item removed from processing_items + self.assertEqual(self.client._create_tender_resource_item.call_count, 4) # check upload to tender + + @patch('gevent.sleep') + def test_retry_upload_to_tender_422(self, gevent_sleep): + gevent_sleep.side_effect = custom_sleep + self.client_upload_to_tender = MagicMock(side_effect=ResourceError(http_code=422)) + self.worker.retry_upload_to_tender_queue = Queue(10) + self.worker.retry_upload_to_tender_queue.put(self.data) + self.shutdown_when_done(self.worker) + self.assertEqual(self.upload_to_tender_queue.qsize(), 0, 'Queue should be empty') + + @patch('gevent.sleep') + def test_retry_upload_to_tender_429(self, gevent_sleep): + gevent_sleep.side_effect = custom_sleep + self.client.client_upload_to_tender = MagicMock( + side_effect=[ResourceError(http_code=429), 
ResourceError(http_code=403)]) + self.worker.retry_upload_to_tender_queue = Queue(10) + self.worker.retry_upload_to_tender_queue.put(self.data) + self.shutdown_when_done(self.worker) + self.assertEqual(self.upload_to_tender_queue.qsize(), 0, 'Queue should be empty') + + @patch('gevent.sleep') + def test_retry_upload_to_tender_exception(self, gevent_sleep): + gevent_sleep.side_effect = custom_sleep + self.worker.do_upload_to_tender_with_retry = MagicMock(side_effect=[Exception(), ResourceError(http_code=403)]) + self.worker.retry_upload_to_tender_queue.put(self.data) + self.shutdown_when_done(self.worker) + self.assertEqual(self.upload_to_tender_queue.qsize(), 0, 'Queue should be empty') + + @patch('gevent.sleep') + def test_upload_to_tender_queue_loop_exit(self, gevent_sleep): + """ Test LoopExit for upload_to_tender_queue """ + gevent_sleep.side_effect = custom_sleep + self.client._create_tender_resource_item.side_effect = [self.get_tender() for _ in range(2)] + self.process_tracker.set_item(self.tender_id, self.award_id, 2) + self.worker.upload_to_tender_queue = MagicMock() + self.worker.upload_to_tender_queue.peek.side_effect = generate_answers( + answers=[LoopExit(), self.datum(), self.datum()], default=LoopExit()) + self.worker.start() + sleep(1) + self.worker.shutdown() + self.assertEqual(self.process_tracker.processing_items, {}) + self.assertIsNotNone(self.client.request_history[0].headers['X-Client-Request-ID']) + self.assertIsNotNone(self.client.request_history[1].headers['X-Client-Request-ID']) + self.assertEqual(self.client._create_tender_resource_item.call_count, 2) # check that processed just 1 request + + def datum(self): + return Data(self.tender_id, self.award_id, '123', 'awards', + {u'meta': {u'id': self.document_id}, + u'url': u'http://docs-sandbox.openprocurement.org/get/8ccbfde0c6804143b119d9168452cb6f', + u'format': u'application/yaml', + u'hash': u'md5:9a0364b9e99bb480dd25e1f0284c8555', + u'title': file_name}) + + @patch('gevent.sleep') + def 
test_retry_upload_to_tender_queue_loop_exit(self, gevent_sleep): + """ Test LoopExit for retry_upload_to_tender_queue """ + gevent_sleep.side_effect = custom_sleep + self.client._create_tender_resource_item.side_effect = [self.get_tender() for _ in range(2)] + self.worker.retry_upload_to_tender_queue = MagicMock() + self.worker.retry_upload_to_tender_queue.peek.side_effect = generate_answers( + answers=[LoopExit(), self.datum(), self.datum()], default=LoopExit()) + self.process_tracker.set_item(self.tender_id, self.award_id, 2) + self.worker.start() + sleep(1) + self.worker.shutdown() + self.assertEqual(self.process_tracker.processing_items, {}) + self.assertIsNotNone(self.client.request_history[0].headers['X-Client-Request-ID']) + self.assertIsNotNone(self.client.request_history[1].headers['X-Client-Request-ID']) + self.assertEqual(self.client._create_tender_resource_item.call_count, 2) # check that processed just 1 request + + @patch('gevent.sleep') + def test_request_failed_in_retry_item_status(self, gevent_sleep): + gevent_sleep.side_effect = custom_sleep + self.client._create_tender_resource_item.side_effect = [ResourceError(http_code=429)] + [ + ResourceError(http_code=403) for _ in range(4)] + self.worker.retry_upload_to_tender_queue.put(self.data) + self.shutdown_when_done(self.worker) + self.assertEqual(self.upload_to_tender_queue.qsize(), 0, 'Queue should be empty') + + @patch('gevent.sleep') + def test_request_failed_in_retry(self, gevent_sleep): + gevent_sleep.side_effect = custom_sleep + self.worker.do_upload_to_tender_with_retry = MagicMock() + self.worker.do_upload_to_tender_with_retry.side_effect = [ResourceError(http_code=429) for _ in range(5)] + [ + ResourceError(http_code=403)] + self.sleep_change_value.increment_step = 3 + self.sleep_change_value.decrement_step = 1.5 + self.worker.retry_upload_to_tender_queue.put(self.data) + self.shutdown_when_done(self.worker) + self.assertEqual(self.upload_to_tender_queue.qsize(), 0, 'Queue should be empty') 
+ self.assertEqual(self.worker.sleep_change_value.time_between_requests, 13.5) + + @patch('gevent.sleep') + def test_process_412(self, gevent_sleep): + gevent_sleep.side_effect = custom_sleep + api_server_bottle = Bottle() + api_server = WSGIServer(('127.0.0.1', 20604), api_server_bottle, log=None) + setup_routing(api_server_bottle, response_spore) + setup_routing(api_server_bottle, response_412, path='/api/2.3/tenders/{}/awards/{}/documents'.format( + self.tender_id, self.award_id), method='POST') + api_server.start() + self.worker.client = TendersClientSync('', host_url='http://127.0.0.1:20604', api_version='2.3') + setup_routing(api_server_bottle, generate_response, path='/api/2.3/tenders/{}/awards/{}/documents'.format( + self.tender_id, self.award_id), method='POST') + self.assertEqual(self.worker.client.headers['Cookie'], 'SERVER_ID={}'.format(SPORE_COOKIES)) + self.upload_to_tender_queue.put(self.data) + self.assertItemsEqual(self.process_tracker.processing_items.keys(), [item_key(self.tender_id, self.award_id)]) + self.assertEqual(self.upload_to_tender_queue.qsize(), 1) + self.shutdown_when_done(self.worker) + self.assertEqual(self.worker.client.headers['Cookie'], 'SERVER_ID={}'.format(COOKIES_412)) + self.assertEqual(self.upload_to_tender_queue.qsize(), 0, 'Queue should be empty') + self.assertEqual(self.worker.retry_upload_to_tender_queue.qsize(), 0, 'Queue should be empty') + self.assertItemsEqual(self.process_tracker.processing_items.keys(), []) + api_server.stop() + + @patch('gevent.sleep') + def test_upload_to_tender_worker(self, gevent_sleep): + gevent_sleep.side_effect = custom_sleep + self.worker.services_not_available = MagicMock(wait=MagicMock()) + self.worker.try_peek_data_and_upload_to_tender = MagicMock() + with patch.object(self.worker, 'exit', AlmostAlwaysTrue()): + self.worker.upload_to_tender() + + self.worker.services_not_available.wait.assert_called_once() + self.worker.try_peek_data_and_upload_to_tender.assert_called_once_with(False) + + 
+ @patch('gevent.sleep') + def test_retry_upload_to_tender_worker(self, gevent_sleep): + gevent_sleep.side_effect = custom_sleep + self.worker.services_not_available = MagicMock(wait=MagicMock()) + self.worker.try_peek_data_and_upload_to_tender = MagicMock() + with patch.object(self.worker, 'exit', AlmostAlwaysTrue()): + self.worker.retry_upload_to_tender() + + self.worker.services_not_available.wait.assert_called_once() + self.worker.try_peek_data_and_upload_to_tender.assert_called_once_with(True) + + + def test_peek_from_tender_queue(self): + self.worker.upload_to_tender_queue.put(self.data) + self.assertEqual(self.worker.peek_from_tender_queue(False), self.data) + + def test_peek_from_tender_queue_retry(self): + self.worker.retry_upload_to_tender_queue.put(self.data) + self.assertEqual(self.worker.peek_from_tender_queue(True), self.data) + + def test_peek_from_tender_queue_empty(self): + self.worker.upload_to_tender_queue = MagicMock(peek=MagicMock(side_effect=LoopExit)) + with self.assertRaises(LoopExit): + self.worker.peek_from_tender_queue(False) + + def test_peek_from_tender_queue_retry_empty(self): + self.worker.retry_upload_to_tender_queue = MagicMock(peek=MagicMock(side_effect=LoopExit)) + with self.assertRaises(LoopExit): + self.worker.peek_from_tender_queue(True) + + def test_try_peek_data_and_upload_to_tender(self): + self.worker.upload_to_tender_queue.put(self.data) + self.worker.try_upload_to_tender = MagicMock() + self.worker.try_peek_data_and_upload_to_tender(False) + self.worker.try_upload_to_tender.assert_called_once_with(self.data, False) + + def test_try_peek_data_and_upload_to_tender_retry(self): + self.worker.retry_upload_to_tender_queue.put(self.data) + self.worker.try_upload_to_tender = MagicMock() + self.worker.try_peek_data_and_upload_to_tender(True) + self.worker.try_upload_to_tender.assert_called_once_with(self.data, True) + + def test_try_upload_to_tender(self): + self.worker.update_headers_and_upload_to_tender = MagicMock() + 
self.worker.successfully_uploaded_to_tender = MagicMock() + self.worker.try_upload_to_tender(self.data, False) + self.worker.update_headers_and_upload_to_tender.assert_called_once_with(self.data, False) + self.worker.successfully_uploaded_to_tender.assert_called_once_with(self.data, False) + + def test_try_upload_to_tender_retry(self): + self.worker.update_headers_and_upload_to_tender = MagicMock() + self.worker.successfully_uploaded_to_tender = MagicMock() + self.worker.try_upload_to_tender(self.data, True) + self.worker.update_headers_and_upload_to_tender.assert_called_once_with(self.data, True) + self.worker.successfully_uploaded_to_tender.assert_called_once_with(self.data, True) + + def test_try_upload_to_tender_no_mock(self): + self.upload_to_tender_queue.put(self.data) + self.worker.try_upload_to_tender(self.data, False) + self.assertEqual(self.upload_to_tender_queue.qsize(), 0) + self.assertEqual(self.process_tracker.processing_items, {}) + + def test_try_upload_to_tender_no_mock_retry(self): + self.worker.retry_upload_to_tender_queue.put(self.data) + self.worker.try_upload_to_tender(self.data, True) + self.assertEqual(self.upload_to_tender_queue.qsize(), 0) + self.assertEqual(self.process_tracker.processing_items, {}) + + def test_try_upload_to_tender_resource_error(self): + re = ResourceError("test resource error") + self.worker.update_headers_and_upload_to_tender = MagicMock(side_effect=re) + self.worker.remove_data_or_increase_wait = MagicMock() + self.worker.try_upload_to_tender(self.data, False) + self.worker.remove_data_or_increase_wait.assert_called_once_with(re, self.data, False) + + def test_try_upload_to_tender_exception(self): + e = Exception("exception") + self.worker.update_headers_and_upload_to_tender = MagicMock(side_effect=e) + self.worker.handle_error = MagicMock() + self.worker.try_upload_to_tender(self.data, False) + self.worker.handle_error.assert_called_once_with(e, self.data, False) + + def 
test_update_headers_and_upload_to_tender(self): + self.worker.do_upload_to_tender = MagicMock() + self.worker.update_headers_and_upload_to_tender(self.data, False) + self.worker.do_upload_to_tender.assert_called_once_with(self.data) + + def test_update_headers_and_upload_to_tender_retry(self): + self.worker.do_upload_to_tender_with_retry = MagicMock() + self.worker.update_headers_and_upload_to_tender(self.data, True) + self.worker.do_upload_to_tender_with_retry.assert_called_once_with(self.data) + + def test_do_upload_to_tender(self): + api_server_bottle = Bottle() + api_server = WSGIServer(('127.0.0.1', 20604), api_server_bottle, log=None) + setup_routing(api_server_bottle, response_spore) + api_server.start() + self.worker.client = TendersClientSync('', host_url='http://127.0.0.1:20604', api_version='2.3') + setup_routing(api_server_bottle, response_get_tender, path='/api/2.3/tenders/{}/awards/{}/documents'.format( + self.tender_id, self.award_id), method='POST') + self.worker.do_upload_to_tender(self.data) + api_server.stop() + + def test_do_upload_to_tender_failure(self): + api_server_bottle = Bottle() + api_server = WSGIServer(('127.0.0.1', 20604), api_server_bottle, log=None) + setup_routing(api_server_bottle, response_spore) + api_server.start() + self.worker.client = TendersClientSync('', host_url='http://127.0.0.1:20604', api_version='2.3') + setup_routing(api_server_bottle, response_412, path='/api/2.3/tenders/{}/awards/{}/documents'.format( + self.tender_id, self.award_id), method='POST') + with self.assertRaises(ResourceError): + self.worker.do_upload_to_tender(self.data) + api_server.stop() + + def test_do_upload_to_tender_with_retry(self): + api_server_bottle = Bottle() + api_server = WSGIServer(('127.0.0.1', 20604), api_server_bottle, log=None) + setup_routing(api_server_bottle, response_spore) + api_server.start() + self.worker.client = TendersClientSync('', host_url='http://127.0.0.1:20604', api_version='2.3') + setup_routing(api_server_bottle, 
response_get_tender, path='/api/2.3/tenders/{}/awards/{}/documents'.format( + self.tender_id, self.award_id), method='POST') + self.worker.do_upload_to_tender_with_retry(self.data) + api_server.stop() + + def test_do_upload_to_tender_with_retry_fail_then_success(self): + api_server_bottle = Bottle() + api_server = WSGIServer(('127.0.0.1', 20604), api_server_bottle, log=None) + setup_routing(api_server_bottle, response_spore) + api_server.start() + self.worker.client = TendersClientSync('', host_url='http://127.0.0.1:20604', api_version='2.3') + setup_routing(api_server_bottle, generate_response_retry, path='/api/2.3/tenders/{}/awards/{}/documents'.format( + self.tender_id, self.award_id), method='POST') + self.worker.do_upload_to_tender_with_retry(self.data) + api_server.stop() + + def test_do_upload_to_tender_with_retry_fail(self): + api_server_bottle = Bottle() + api_server = WSGIServer(('127.0.0.1', 20604), api_server_bottle, log=None) + setup_routing(api_server_bottle, response_spore) + api_server.start() + self.worker.client = TendersClientSync('', host_url='http://127.0.0.1:20604', api_version='2.3') + setup_routing(api_server_bottle, response_412, path='/api/2.3/tenders/{}/awards/{}/documents'.format( + self.tender_id, self.award_id), method='POST') + with self.assertRaises(ResourceError): + self.worker.do_upload_to_tender_with_retry(self.data) + api_server.stop() + + def test_remove_data_or_increase_wait(self): + re = ResourceError("error") + self.worker.removing_data = MagicMock() + self.worker.remove_data_or_increase_wait(re, self.data, False) + self.worker.removing_data.assert_called_once_with(re, self.data, False) + + def test_remove_data_or_increase_wait_429(self): + re = ResourceError("error", http_code=429) + self.worker.decrease_request_frequency = MagicMock() + self.worker.remove_data_or_increase_wait(re, self.data, False) + self.worker.decrease_request_frequency.assert_called_once_with(re, self.data) + + def 
test_remove_data_or_increase_wait_else(self): + re = ResourceError("error", http_code=404) + self.worker.handle_error = MagicMock() + self.worker.remove_data_or_increase_wait(re, self.data, False) + self.worker.handle_error.assert_called_once_with(re, self.data, False) + + def test_removing_data(self): + re = ResourceError("error") + self.worker.sleep_change_value.time_between_requests = 1 + self.worker.upload_to_tender_queue.put(self.data) + self.worker.removing_data(re, self.data, False) + self.assertEqual(self.worker.process_tracker.processing_items, {}) + self.assertEqual(self.worker.upload_to_tender_queue.qsize(), 0) + self.assertEqual(self.worker.sleep_change_value.time_between_requests, 0) + + def test_removing_data_retry(self): + re = ResourceError("error") + self.worker.sleep_change_value.time_between_requests = 1 + self.worker.retry_upload_to_tender_queue.put(self.data) + self.worker.removing_data(re, self.data, True) + self.assertEqual(self.worker.process_tracker.processing_items, {}) + self.assertEqual(self.worker.upload_to_tender_queue.qsize(), 0) + self.assertEqual(self.worker.retry_upload_to_tender_queue.qsize(), 0) + self.assertEqual(self.worker.sleep_change_value.time_between_requests, 0) + + def test_decrease_request_frequency(self): + re = ResourceError("error", 429) + self.worker.decrease_request_frequency(re, self.data) + self.assertEqual(self.worker.sleep_change_value.time_between_requests, 1) + + def test_handle_error(self): + re = ResourceError("error", 404) + self.worker.upload_to_tender_queue.put(self.data) + self.worker.handle_error(re, self.data, False) + self.assertEqual(self.worker.upload_to_tender_queue.qsize(), 0) + self.assertEqual(self.worker.retry_upload_to_tender_queue.get(), self.data) + self.assertEqual(self.worker.retry_upload_to_tender_queue.qsize(), 0) + + def test_handle_error_retry(self): + re = ResourceError("error", 404) + self.worker.upload_to_tender_queue.put(self.data) + self.worker.handle_error(re, self.data, True) + 
self.assertEqual(self.worker.upload_to_tender_queue.qsize(), 1) + self.assertEqual(self.worker.retry_upload_to_tender_queue.qsize(), 0) + + def test_successfully_uploaded_to_tender(self): + self.worker.upload_to_tender_queue.put(self.data) + self.assertEqual(self.worker.process_tracker.processing_items, {item_key(self.tender_id, self.award_id): 1}) + self.worker.successfully_uploaded_to_tender(self.data, False) + self.assertEqual(self.worker.upload_to_tender_queue.qsize(), 0) + self.assertEqual(self.worker.process_tracker.processing_items, {}) + + def test_successfully_uploaded_to_tender_retry(self): + self.worker.retry_upload_to_tender_queue.put(self.data) + self.assertEqual(self.worker.process_tracker.processing_items, {item_key(self.tender_id, self.award_id): 1}) + self.worker.successfully_uploaded_to_tender(self.data, True) + self.assertEqual(self.worker.retry_upload_to_tender_queue.qsize(), 0) + self.assertEqual(self.worker.process_tracker.processing_items, {}) + + def test_run(self): + self.worker.delay = 1 + upload_to_tender, retry_upload_to_tender = [MagicMock() for _ in range(2)] + self.worker.upload_to_tender = upload_to_tender + self.worker.retry_upload_to_tender = retry_upload_to_tender + with patch.object(self.worker, 'exit', AlmostAlwaysTrue(1)): + self.worker._run() + self.assertEqual(self.worker.upload_to_tender.call_count, 1) + self.assertEqual(self.worker.retry_upload_to_tender.call_count, 1) + + @patch('gevent.killall') + @patch('gevent.sleep') + def test_run_exception(self, gevent_sleep, killlall): + gevent_sleep.side_effect = custom_sleep + self.worker._start_jobs = MagicMock(return_value={"a": 1}) + self.worker.check_and_revive_jobs = MagicMock(side_effect=Exception("test error")) + self.worker._run() + killlall.assert_called_once_with([1], timeout=5) + + @patch('gevent.killall') + def test_run_exception_with_delay(self, killlall): + self.worker.delay = 1 + self.worker._start_jobs = MagicMock(return_value={"a": 1}) + self.worker.check_and_revive_jobs = 
MagicMock(side_effect=Exception("test error")) + self.worker._run() + killlall.assert_called_once_with([1], timeout=5) diff --git a/openprocurement/bot/identification/tests/test_utils.py b/openprocurement/bot/identification/tests/test_utils.py index 37d50ee..26e492e 100644 --- a/openprocurement/bot/identification/tests/test_utils.py +++ b/openprocurement/bot/identification/tests/test_utils.py @@ -5,11 +5,12 @@ from unittest import TestCase from mock import MagicMock -from openprocurement.bot.identification.databridge.caching import Db, db_key -from openprocurement.bot.identification.databridge.utils import ProcessTracker, item_key, check_412 from redis import StrictRedis from restkit import ResourceError +from openprocurement.bot.identification.databridge.caching import Db, db_key +from openprocurement.bot.identification.databridge.utils import ProcessTracker, item_key, check_412 + config = { "main": { "cache_host": "127.0.0.1", diff --git a/openprocurement/bot/identification/tests/upload_file.py b/openprocurement/bot/identification/tests/upload_file.py index 2e231da..11d2955 100644 --- a/openprocurement/bot/identification/tests/upload_file.py +++ b/openprocurement/bot/identification/tests/upload_file.py @@ -10,61 +10,14 @@ from gevent.hub import LoopExit from time import sleep from mock import patch, MagicMock -from restkit.errors import Unauthorized -from restkit import ResourceError -from gevent.pywsgi import WSGIServer -from bottle import Bottle, response -from simplejson import dumps from openprocurement.bot.identification.client import DocServiceClient -from openprocurement.bot.identification.databridge.upload_file import UploadFile +from openprocurement.bot.identification.databridge.upload_file_to_doc_service import UploadFileToDocService from openprocurement.bot.identification.databridge.utils import Data, generate_doc_id, item_key, ProcessTracker -from openprocurement.bot.identification.tests.utils import custom_sleep, generate_answers +from 
openprocurement.bot.identification.tests.utils import custom_sleep, generate_answers, AlmostAlwaysTrue from openprocurement.bot.identification.databridge.constants import file_name -from openprocurement.bot.identification.databridge.bridge import TendersClientSync from openprocurement.bot.identification.databridge.sleep_change_value import APIRateController -SERVER_RESPONSE_FLAG = 0 -SPORE_COOKIES = ("a7afc9b1fc79e640f2487ba48243ca071c07a823d27" - "8cf9b7adf0fae467a524747e3c6c6973262130fac2b" - "96a11693fa8bd38623e4daee121f60b4301aef012c") -COOKIES_412 = ("b7afc9b1fc79e640f2487ba48243ca071c07a823d27" - "8cf9b7adf0fae467a524747e3c6c6973262130fac2b" - "96a11693fa8bd38623e4daee121f60b4301aef012c") - - -def setup_routing(app, func, path='/api/2.3/spore', method='GET'): - app.routes = [] - app.route(path, method, func) - - -def response_spore(): - response.set_cookie("SERVER_ID", SPORE_COOKIES) - return response - - -def response_412(): - response.status = 412 - response.set_cookie("SERVER_ID", COOKIES_412) - return response - - -def response_get_tender(): - response.status = 201 - response.headers['X-Request-ID'] = '125' - return dumps({'data': {'id': uuid.uuid4().hex, - 'documentOf': 'tender', - 'documentType': 'registerExtract', - 'url': 'url'}}) - - -def generate_response(): - global SERVER_RESPONSE_FLAG - if SERVER_RESPONSE_FLAG == 0: - SERVER_RESPONSE_FLAG = 1 - return response_412() - return response_get_tender() - class TestUploadFileWorker(unittest.TestCase): def setUp(self): @@ -81,11 +34,9 @@ def setUp(self): {'meta': {'id': self.document_id}, 'test_data': 'test_data'}) self.qualification_data = Data(self.tender_id, self.qualification_id, '123', 'qualifications', {'meta': {'id': self.document_id}, 'test_data': 'test_data'}) - self.client = MagicMock() self.doc_service_client = DocServiceClient(host='127.0.0.1', port='80', user='', password='') - self.worker = UploadFile(self.client, self.upload_to_doc_service_queue, self.upload_to_tender_queue, - 
self.process_tracker, self.doc_service_client, MagicMock(), self.sleep_change_value) - + self.worker = UploadFileToDocService(self.upload_to_doc_service_queue, self.upload_to_tender_queue, + self.process_tracker, self.doc_service_client, MagicMock(), self.sleep_change_value) self.url = '{url}'.format(url=self.doc_service_client.url) @staticmethod @@ -105,10 +56,13 @@ def get_tender(): def tearDown(self): del self.worker - def is_working(self, worker): + def old_is_working(self, worker): return (self.upload_to_doc_service_queue.qsize() or self.upload_to_tender_queue.qsize() or worker.retry_upload_to_doc_service_queue.qsize() or worker.retry_upload_to_tender_queue.qsize()) + def is_working(self, worker): + return self.upload_to_doc_service_queue.qsize() or worker.retry_upload_to_doc_service_queue.qsize() + def shutdown_when_done(self, worker): worker.start() while self.is_working(worker): @@ -116,10 +70,9 @@ def shutdown_when_done(self, worker): worker.shutdown() def test_init(self): - worker = UploadFile.spawn(None, None, None, None, None, None, None) + worker = UploadFileToDocService.spawn(None, None, None, None, None, None) self.assertGreater(datetime.datetime.now().isoformat(), worker.start_time.isoformat()) - self.assertEqual(worker.client, None) self.assertEqual(worker.upload_to_doc_service_queue, None) self.assertEqual(worker.upload_to_tender_queue, None) self.assertEqual(worker.process_tracker, None) @@ -136,18 +89,16 @@ def test_init(self): def test_successful_upload(self, mrequest, gevent_sleep): gevent_sleep.side_effect = custom_sleep mrequest.post(self.url, json=self.stat_200(), status_code=200) - self.client._create_tender_resource_item.side_effect = [self.get_tender()] self.upload_to_doc_service_queue.put(self.data) self.assertItemsEqual(self.process_tracker.processing_items.keys(), [item_key(self.tender_id, self.award_id)]) self.assertEqual(self.upload_to_doc_service_queue.qsize(), 1) self.shutdown_when_done(self.worker) 
self.assertEqual(self.upload_to_doc_service_queue.qsize(), 0, 'Queue should be empty') - self.assertEqual(self.upload_to_tender_queue.qsize(), 0, 'Queue should be empty') + self.assertEqual(self.upload_to_tender_queue.qsize(), 1, 'Queue should have 1 element') self.assertEqual(mrequest.call_count, 1) self.assertEqual(mrequest.request_history[0].url, u'127.0.0.1:80/upload') self.assertIsNotNone(mrequest.request_history[0].headers['X-Client-Request-ID']) - self.assertItemsEqual(self.process_tracker.processing_items.keys(), []) - self.assertEqual(self.client._create_tender_resource_item.call_count, 1) # check upload to tender + self.assertItemsEqual(self.process_tracker.processing_items.keys(), [item_key(self.tender_id, self.award_id)]) @requests_mock.Mocker() @patch('gevent.sleep') @@ -160,180 +111,62 @@ def test_retry_doc_service(self, mrequest, gevent_sleep): 'hash': 'md5:9a0364b9e99bb480dd25e1f0284c8555', 'title': file_name}}, 'status_code': 200}]) - self.client._create_tender_resource_item.side_effect = [self.get_tender()] self.upload_to_doc_service_queue.put(self.data) self.assertItemsEqual(self.process_tracker.processing_items.keys(), [item_key(self.tender_id, self.award_id)]) self.assertEqual(self.upload_to_doc_service_queue.qsize(), 1) self.shutdown_when_done(self.worker) self.assertEqual(self.upload_to_doc_service_queue.qsize(), 0, 'Queue should be empty') - self.assertEqual(self.upload_to_tender_queue.qsize(), 0, 'Queue should be empty') + self.assertEqual(self.upload_to_tender_queue.qsize(), 1, 'Queue should have 1 element') self.assertEqual(mrequest.call_count, 7) self.assertEqual(mrequest.request_history[0].url, u'127.0.0.1:80/upload') self.assertIsNotNone(mrequest.request_history[0].headers['X-Client-Request-ID']) - self.assertItemsEqual(self.process_tracker.processing_items.keys(), - []) # test that item removed from processing_items - self.assertEqual(self.client._create_tender_resource_item.call_count, 1) # check upload to tender - - 
@patch('gevent.sleep') - def test_upload_to_tender_429(self, gevent_sleep): - gevent_sleep.side_effect = custom_sleep - self.client._create_tender_resource_item = MagicMock(side_effect=[ - ResourceError(http_code=429), ResourceError(http_code=429), ResourceError(http_code=403)]) - self.upload_to_tender_queue.put(self.data) - self.shutdown_when_done(self.worker) - self.assertEqual(self.upload_to_tender_queue.qsize(), 0, 'Queue should be empty') - self.assertEqual(self.worker.sleep_change_value.time_between_requests, 1) - - @patch('gevent.sleep') - def test_upload_to_tender_exception(self, gevent_sleep): - gevent_sleep.side_effect = custom_sleep - self.upload_to_tender_queue.put(self.data) - self.client._create_tender_resource_item = MagicMock(side_effect=[Exception()]) - self.worker.client_upload_to_tender = MagicMock(side_effect=ResourceError(http_code=403)) - self.shutdown_when_done(self.worker) - self.assertEqual(self.upload_to_tender_queue.qsize(), 0, 'Queue should be empty') - self.assertEqual(self.worker.sleep_change_value.time_between_requests, 0) - - @patch('gevent.sleep') - def test_upload_to_tender_exception_status_int_none(self, gevent_sleep): - gevent_sleep.side_effect = custom_sleep - self.upload_to_tender_queue.put(self.data) - client = MagicMock() - client._create_tender_resource_item = MagicMock(side_effect=[Unauthorized()]) - self.shutdown_when_done(self.worker) - self.assertEqual(self.upload_to_tender_queue.qsize(), 0, 'Queue should be empty') - self.assertEqual(self.worker.sleep_change_value.time_between_requests, 0) - - @requests_mock.Mocker() - @patch('gevent.sleep') - def test_retry_upload_to_tender(self, mrequest, gevent_sleep): - gevent_sleep.side_effect = custom_sleep - mrequest.post(self.url, json=self.stat_200(), status_code=200) - self.client._create_tender_resource_item.side_effect = [Unauthorized(http_code=401), - Unauthorized(http_code=403), - Unauthorized(http_code=429), self.get_tender()] - 
self.upload_to_doc_service_queue.put(self.data) - self.assertItemsEqual(self.process_tracker.processing_items.keys(), [item_key(self.tender_id, self.award_id)]) - self.assertEqual(self.upload_to_doc_service_queue.qsize(), 1) - self.shutdown_when_done(self.worker) - self.assertEqual(self.upload_to_doc_service_queue.qsize(), 0, 'Queue should be empty') - self.assertEqual(self.upload_to_tender_queue.qsize(), 0, 'Queue should be empty') - self.assertEqual(mrequest.call_count, 1) - self.assertEqual(mrequest.request_history[0].url, u'127.0.0.1:80/upload') - self.assertIsNotNone(mrequest.request_history[0].headers['X-Client-Request-ID']) - self.assertEqual(self.process_tracker.processing_items, {}) # test that item removed from processing_items - self.assertEqual(self.client._create_tender_resource_item.call_count, 4) # check upload to tender - - @patch('gevent.sleep') - def test_retry_upload_to_tender_422(self, gevent_sleep): - gevent_sleep.side_effect = custom_sleep - self.client_upload_to_tender = MagicMock(side_effect=ResourceError(http_code=422)) - self.worker.retry_upload_to_tender_queue = Queue(10) - self.worker.retry_upload_to_tender_queue.put(self.data) - self.shutdown_when_done(self.worker) - self.assertEqual(self.upload_to_doc_service_queue.qsize(), 0, 'Queue should be empty') - self.assertEqual(self.upload_to_tender_queue.qsize(), 0, 'Queue should be empty') - - @patch('gevent.sleep') - def test_retry_upload_to_tender_429(self, gevent_sleep): - gevent_sleep.side_effect = custom_sleep - self.client.client_upload_to_tender = MagicMock( - side_effect=[ResourceError(http_code=429), ResourceError(http_code=403)]) - self.worker.retry_upload_to_tender_queue = Queue(10) - self.worker.retry_upload_to_tender_queue.put(self.data) - self.shutdown_when_done(self.worker) - self.assertEqual(self.upload_to_doc_service_queue.qsize(), 0, 'Queue should be empty') - self.assertEqual(self.upload_to_tender_queue.qsize(), 0, 'Queue should be empty') - - @patch('gevent.sleep') - def 
test_retry_upload_to_tender_exception(self, gevent_sleep): - gevent_sleep.side_effect = custom_sleep - self.worker.client_upload_to_tender = MagicMock(side_effect=[Exception(), ResourceError(http_code=403)]) - self.worker.retry_upload_to_tender_queue.put(self.data) - self.shutdown_when_done(self.worker) - self.assertEqual(self.upload_to_doc_service_queue.qsize(), 0, 'Queue should be empty') - self.assertEqual(self.upload_to_tender_queue.qsize(), 0, 'Queue should be empty') @requests_mock.Mocker() @patch('gevent.sleep') def test_request_failed(self, mrequest, gevent_sleep): gevent_sleep.side_effect = custom_sleep mrequest.post(self.url, json=self.stat_200(), status_code=200) - self.client._create_tender_resource_item.side_effect = ResourceError(http_code=422) self.upload_to_doc_service_queue.put(self.data) self.shutdown_when_done(self.worker) self.assertEqual(self.upload_to_doc_service_queue.qsize(), 0, 'Queue should be empty') - self.assertEqual(self.upload_to_tender_queue.qsize(), 0, 'Queue should be empty') + self.assertEqual(self.upload_to_tender_queue.get(), self.data) + self.assertEqual(self.process_tracker.processing_items, {item_key(self.tender_id, self.award_id): 1}) self.assertEqual(mrequest.call_count, 1) self.assertEqual(mrequest.request_history[0].url, u'127.0.0.1:80/upload') self.assertIsNotNone(mrequest.request_history[0].headers['X-Client-Request-ID']) - self.assertEqual(self.process_tracker.processing_items, {}) - self.assertEqual(self.client._create_tender_resource_item.call_count, 1) # check that processed just 1 request @requests_mock.Mocker() @patch('gevent.sleep') def test_request_failed_item_status_change(self, mrequest, gevent_sleep): gevent_sleep.side_effect = custom_sleep mrequest.post(self.url, json=self.stat_200(), status_code=200) - self.client._create_tender_resource_item.side_effect = [ResourceError(http_code=403) for _ in range(2)] self.process_tracker.set_item(self.tender_id, self.qualification_id, 1) 
self.upload_to_doc_service_queue.put(self.data) self.upload_to_doc_service_queue.put(self.qualification_data) self.shutdown_when_done(self.worker) self.assertEqual(self.upload_to_doc_service_queue.qsize(), 0, 'Queue should be empty') - self.assertEqual(self.upload_to_tender_queue.qsize(), 0, 'Queue should be empty') + self.assertEqual(self.upload_to_tender_queue.get(), self.data) + self.assertEqual(self.upload_to_tender_queue.get(), self.qualification_data) self.assertEqual(mrequest.call_count, 2) self.assertEqual(mrequest.request_history[0].url, u'127.0.0.1:80/upload') self.assertIsNotNone(mrequest.request_history[0].headers['X-Client-Request-ID']) - self.assertEqual(self.process_tracker.processing_items, {}) - self.assertEqual(self.client._create_tender_resource_item.call_count, 2) # check that processed just 1 request - - @patch('gevent.sleep') - def test_request_failed_in_retry(self, gevent_sleep): - gevent_sleep.side_effect = custom_sleep - self.worker.client_upload_to_tender = MagicMock() - self.worker.client_upload_to_tender.side_effect = [ResourceError(http_code=429) for _ in range(5)] + [ - ResourceError(http_code=403)] - self.sleep_change_value.increment_step = 3 - self.sleep_change_value.decrement_step = 1.5 - self.worker.retry_upload_to_tender_queue.put(self.data) - self.shutdown_when_done(self.worker) - self.assertEqual(self.upload_to_doc_service_queue.qsize(), 0, 'Queue should be empty') - self.assertEqual(self.upload_to_tender_queue.qsize(), 0, 'Queue should be empty') - self.assertEqual(self.worker.retry_upload_to_tender_queue.qsize(), 0, 'Queue should be empty') - self.assertEqual(self.worker.sleep_change_value.time_between_requests, 13.5) - self.assertEqual(self.process_tracker.processing_items, {}) - self.assertEqual(self.worker.client_upload_to_tender.call_count, 6) # check that processed just 1 request - - @requests_mock.Mocker() - @patch('gevent.sleep') - def test_request_failed_in_retry_item_status(self, mrequest, gevent_sleep): - 
gevent_sleep.side_effect = custom_sleep - mrequest.post(self.url, json=self.stat_200(), status_code=200) - self.client._create_tender_resource_item.side_effect = [ResourceError(http_code=429)] + [ - ResourceError(http_code=403) for _ in range(4)] - self.worker.retry_upload_to_tender_queue.put(self.data) - self.shutdown_when_done(self.worker) - self.assertEqual(self.upload_to_doc_service_queue.qsize(), 0, 'Queue should be empty') - self.assertEqual(self.upload_to_tender_queue.qsize(), 0, 'Queue should be empty') - self.assertEqual(self.process_tracker.processing_items, {}) - self.assertEqual(self.client._create_tender_resource_item.call_count, 5) # check that processed just 1 request + self.assertEqual(self.process_tracker.processing_items, + {item_key(self.tender_id, self.award_id): 1, + item_key(self.tender_id, self.qualification_id): 1}) @requests_mock.Mocker() @patch('gevent.sleep') def test_processing_items(self, mrequest, gevent_sleep): gevent_sleep.side_effect = custom_sleep mrequest.post(self.url, [{'json': self.stat_200(), 'status_code': 200} for _ in range(2)]) - self.client._create_tender_resource_item.side_effect = [self.get_tender() for _ in range(2)] self.process_tracker.set_item(self.tender_id, self.award_id, 2) self.upload_to_doc_service_queue.put(self.data) self.upload_to_doc_service_queue.put(self.data) self.shutdown_when_done(self.worker) - self.assertEqual(self.upload_to_tender_queue.qsize(), 0, 'Queue should be empty') + self.assertEqual(self.upload_to_tender_queue.get(), self.data) + self.assertEqual(self.upload_to_tender_queue.get(), self.data) + self.assertEqual(mrequest.request_history[0].url, u'127.0.0.1:80/upload') self.assertIsNotNone(mrequest.request_history[0].headers['X-Client-Request-ID']) - self.assertIsNotNone(mrequest.request_history[1].headers['X-Client-Request-ID']) - self.assertEqual(self.process_tracker.processing_items, {}) - self.assertEqual(self.client._create_tender_resource_item.call_count, 2) # check that processed just 1 
request @requests_mock.Mocker() @patch('gevent.sleep') @@ -345,68 +178,12 @@ def test_upload_to_doc_service_queue_loop_exit(self, mrequest, gevent_sleep): self.worker.upload_to_doc_service_queue.peek.side_effect = generate_answers( answers=[LoopExit(), self.data, self.data], default=LoopExit()) mrequest.post(self.url, [{'json': self.stat_200(), 'status_code': 200} for _ in range(2)]) - self.client._create_tender_resource_item.side_effect = [self.get_tender() for _ in range(2)] self.worker.start() sleep(1) - self.assertEqual(self.upload_to_tender_queue.qsize(), 0, 'Queue should be empty') + self.assertEqual(self.upload_to_tender_queue.get(), self.data) self.assertIsNotNone(mrequest.request_history[0].headers['X-Client-Request-ID']) self.assertIsNotNone(mrequest.request_history[1].headers['X-Client-Request-ID']) - self.assertEqual(self.process_tracker.processing_items, {}) - self.assertEqual(self.client._create_tender_resource_item.call_count, 2) # check that processed just 1 request - - @requests_mock.Mocker() - @patch('gevent.sleep') - def test_upload_to_tender_queue_loop_exit(self, mrequest, gevent_sleep): - """ Test LoopExit for upload_to_tender_queue """ - gevent_sleep.side_effect = custom_sleep - self.client._create_tender_resource_item.side_effect = [self.get_tender() for _ in range(2)] - self.process_tracker.set_item(self.tender_id, self.award_id, 2) - self.worker.upload_to_doc_service_queue = Queue(1) - self.worker.upload_to_tender_queue = MagicMock() - self.worker.upload_to_tender_queue.peek.side_effect = generate_answers( - answers=[LoopExit()] + [Data(tender_id=self.tender_id, - item_id=self.award_id, - code='123', item_name='awards', - file_content={ - u'meta': {u'id': self.document_id}, - u'url': u'http://docs-sandbox.openprocurement.org/get/8ccbfde0c6804143b119d9168452cb6f', - u'format': u'application/yaml', - u'hash': u'md5:9a0364b9e99bb480dd25e1f0284c8555', - u'title': file_name}) for _ in range(2)], - default=LoopExit()) - self.worker.start() - 
sleep(1) - self.worker.shutdown() - self.assertEqual(self.process_tracker.processing_items, {}) - self.assertIsNotNone(self.client.request_history[0].headers['X-Client-Request-ID']) - self.assertIsNotNone(self.client.request_history[1].headers['X-Client-Request-ID']) - self.assertEqual(self.client._create_tender_resource_item.call_count, 2) # check that processed just 1 request - - @patch('gevent.sleep') - def test_retry_upload_to_tender_queue_loop_exit(self, gevent_sleep): - """ Test LoopExit for retry_upload_to_tender_queue """ - gevent_sleep.side_effect = custom_sleep - self.client._create_tender_resource_item.side_effect = [self.get_tender() for _ in range(2)] - self.worker.retry_upload_to_tender_queue = MagicMock() - self.worker.retry_upload_to_tender_queue.peek.side_effect = generate_answers( - answers=[LoopExit()] + [Data(tender_id=self.tender_id, - item_id=self.award_id, - code='123', item_name='awards', - file_content={ - u'meta': {u'id': self.document_id}, - u'url': u'http://docs-sandbox.openprocurement.org/get/8ccbfde0c6804143b119d9168452cb6f', - u'format': u'application/yaml', - u'hash': u'md5:9a0364b9e99bb480dd25e1f0284c8555', - u'title': file_name}) for _ in range(2)], - default=LoopExit()) - self.process_tracker.set_item(self.tender_id, self.award_id, 2) - self.worker.start() - sleep(1) - self.worker.shutdown() - self.assertEqual(self.process_tracker.processing_items, {}) - self.assertIsNotNone(self.client.request_history[0].headers['X-Client-Request-ID']) - self.assertIsNotNone(self.client.request_history[1].headers['X-Client-Request-ID']) - self.assertEqual(self.client._create_tender_resource_item.call_count, 2) # check that processed just 1 request + self.assertEqual(self.process_tracker.processing_items, {item_key(self.tender_id, self.award_id): 2}) @requests_mock.Mocker() @patch('gevent.sleep') @@ -414,7 +191,6 @@ def test_retry_upload_to_doc_service_queue_loop_exit(self, mrequest, gevent_slee """ Test LoopExit for 
retry_upload_to_doc_service_queue """ gevent_sleep.side_effect = custom_sleep mrequest.post(self.url, [{'json': self.stat_200(), 'status_code': 200} for _ in range(2)]) - self.client._create_tender_resource_item.side_effect = [self.get_tender() for _ in range(2)] self.process_tracker.set_item(self.tender_id, self.award_id, 2) self.worker.retry_upload_to_doc_service_queue = MagicMock() self.worker.retry_upload_to_doc_service_queue.peek.side_effect = generate_answers( @@ -422,38 +198,79 @@ def test_retry_upload_to_doc_service_queue_loop_exit(self, mrequest, gevent_slee self.worker.start() sleep(1) self.worker.shutdown() - self.assertEqual(self.upload_to_tender_queue.qsize(), 0, 'Queue should be empty') - self.assertEqual(self.process_tracker.processing_items, {}) - self.assertIsNotNone(self.client.request_history[0].headers['X-Client-Request-ID']) - self.assertIsNotNone(self.client.request_history[1].headers['X-Client-Request-ID']) - self.assertEqual(self.client._create_tender_resource_item.call_count, 2) # check that processed just 1 request + self.assertEqual(self.upload_to_tender_queue.get(), self.data) + self.assertEqual(self.process_tracker.processing_items, {item_key(self.tender_id, self.award_id): 2}) + self.assertEqual(mrequest.request_history[0].url, u'127.0.0.1:80/upload') + self.assertIsNotNone(mrequest.request_history[0].headers['X-Client-Request-ID']) - @requests_mock.Mocker() + + def test_remove_bad_data(self): + self.worker.upload_to_doc_service_queue = MagicMock(get=MagicMock()) + self.worker.process_tracker = MagicMock(update_items_and_tender=MagicMock()) + + self.worker.remove_bad_data(self.data, Exception("test message"), False) + + self.worker.upload_to_doc_service_queue.get.assert_called_once() + self.assertEqual(self.worker.retry_upload_to_doc_service_queue.get(), self.data) + + def test_remove_bad_data_retry(self): + self.worker.retry_upload_to_doc_service_queue = MagicMock(get=MagicMock()) + self.worker.process_tracker = 
MagicMock(update_items_and_tender=MagicMock()) + + with self.assertRaises(Exception): + self.worker.remove_bad_data(self.data, Exception("test message"), True) + + self.worker.retry_upload_to_doc_service_queue.get.assert_called_once() + self.worker.process_tracker.update_items_and_tender.assert_called_with(self.data.tender_id, self.data.item_id) + + def test_try_upload_to_doc_service(self): + e = Exception("test error") + self.worker.update_headers_and_upload = MagicMock(side_effect=e) + self.worker.remove_bad_data = MagicMock() + + self.worker.try_upload_to_doc_service(self.data, False) + + self.worker.update_headers_and_upload.assert_called_once() + self.worker.remove_bad_data.assert_called_once_with(self.data, e, False) + + def test_try_upload_to_doc_service_retry(self): + e = Exception("test error") + self.worker.update_headers_and_upload = MagicMock(side_effect=e) + self.worker.remove_bad_data = MagicMock() + + self.worker.try_upload_to_doc_service(self.data, True) + + self.worker.update_headers_and_upload.assert_called_once() + self.worker.remove_bad_data.assert_called_with(self.data, e, True) + + def test_run(self): + self.worker.delay = 1 + upload_to_doc_service, retry_upload_to_doc_service = [MagicMock() for _ in range(2)] + self.worker.upload_to_doc_service = upload_to_doc_service + self.worker.retry_upload_to_doc_service = retry_upload_to_doc_service + with patch.object(self.worker, 'exit', AlmostAlwaysTrue(1)): + self.worker._run() + self.assertEqual(self.worker.upload_to_doc_service.call_count, 1) + self.assertEqual(self.worker.retry_upload_to_doc_service.call_count, 1) + + @patch('gevent.killall') + def test_run_exception(self, killlall): + self.worker.delay = 1 + self.worker._start_jobs = MagicMock(return_value={"a": 1}) + self.worker.check_and_revive_jobs = MagicMock(side_effect=Exception("test error")) + self.worker._run() + killlall.assert_called_once_with([1], timeout=5) + + @patch('gevent.killall') @patch('gevent.sleep') - def test_412(self, 
mrequest, gevent_sleep): + def test_run_exception(self, gevent_sleep, killlall): gevent_sleep.side_effect = custom_sleep - mrequest.post(self.url, json=self.stat_200(), status_code=200) - api_server_bottle = Bottle() - api_server = WSGIServer(('127.0.0.1', 20604), api_server_bottle, log=None) - setup_routing(api_server_bottle, response_spore) - setup_routing(api_server_bottle, response_412, path='/api/2.3/tenders/{}/awards/{}/documents'.format( - self.tender_id, self.award_id), method='POST') - api_server.start() - self.worker.client = TendersClientSync('', host_url='http://127.0.0.1:20604', api_version='2.3') - setup_routing(api_server_bottle, generate_response, path='/api/2.3/tenders/{}/awards/{}/documents'.format( - self.tender_id, self.award_id), method='POST') - self.assertEqual(self.worker.client.headers['Cookie'], - 'SERVER_ID={}'.format(SPORE_COOKIES)) # check that response_spore set cookies - self.upload_to_doc_service_queue.put(self.data) - self.assertItemsEqual(self.process_tracker.processing_items.keys(), [item_key(self.tender_id, self.award_id)]) - self.assertEqual(self.upload_to_doc_service_queue.qsize(), 1) - self.worker.start() - self.shutdown_when_done(self.worker) - self.assertEqual(self.worker.client.headers['Cookie'], - 'SERVER_ID={}'.format(COOKIES_412)) # check that response_412 change cookies - self.assertEqual(self.upload_to_doc_service_queue.qsize(), 0, 'Queue should be empty') - self.assertEqual(self.upload_to_tender_queue.qsize(), 0, 'Queue should be empty') - self.assertEqual(mrequest.call_count, 1) - self.assertEqual(mrequest.request_history[0].url, u'127.0.0.1:80/upload') - self.assertIsNotNone(mrequest.request_history[0].headers['X-Client-Request-ID']) - self.assertItemsEqual(self.process_tracker.processing_items.keys(), []) + self.worker._start_jobs = MagicMock(return_value={"a": 1}) + self.worker.check_and_revive_jobs = MagicMock(side_effect=Exception("test error")) + + self.worker._run() + + killlall.assert_called_once_with([1], 
timeout=5) + + + diff --git a/openprocurement/bot/identification/tests/utils.py b/openprocurement/bot/identification/tests/utils.py index e2bc68b..c5c9967 100644 --- a/openprocurement/bot/identification/tests/utils.py +++ b/openprocurement/bot/identification/tests/utils.py @@ -34,3 +34,15 @@ def body_string(self): def next(self): pass + + +class AlmostAlwaysTrue(object): + def __init__(self, total_iterations=1): + self.total_iterations = total_iterations + self.current_iteration = 0 + + def __nonzero__(self): + if self.current_iteration < self.total_iterations: + self.current_iteration += 1 + return bool(0) + return bool(1) \ No newline at end of file