Commit

Merge pull request #99 from ITVaan/refactoring_databridge_and_tests
Refactoring databridge and tests
kroman0 authored Aug 10, 2017
2 parents 8a4fb1a + dd44e7e commit 31f369d
Showing 10 changed files with 880 additions and 1,537 deletions.
3 changes: 3 additions & 0 deletions openprocurement/bot/identification/databridge/constants.py
@@ -5,3 +5,6 @@
file_name = 'edr_identification.yaml'
author = "IdentificationBot"
retry_mult = 1000
pre_qualification_procurementMethodType = ('aboveThresholdEU', 'competitiveDialogueUA', 'competitiveDialogueEU')
qualification_procurementMethodType = ('aboveThresholdUA', 'aboveThresholdUA.defense', 'aboveThresholdEU',
'competitiveDialogueUA.stage2', 'competitiveDialogueEU.stage2')
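
For context, a minimal usage sketch of the two new tuples. The helper name and the mapping to item lists are assumptions based on the tuple names and the 'awards'/'qualifications' branches in filter_tender.py; this is not code from the PR.

# Hypothetical usage sketch; not part of this PR.
from openprocurement.bot.identification.databridge.constants import (
    pre_qualification_procurementMethodType,
    qualification_procurementMethodType,
)

def relevant_item_lists(tender):
    """Return which item lists ('qualifications', 'awards') may need EDR checks."""
    method_type = tender.get('procurementMethodType')
    lists = []
    if method_type in pre_qualification_procurementMethodType:
        lists.append('qualifications')  # pre-qualification stage procedures
    if method_type in qualification_procurementMethodType:
        lists.append('awards')          # award stage procedures
    return lists
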
89 changes: 58 additions & 31 deletions openprocurement/bot/identification/databridge/filter_tender.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
from gevent import monkey

monkey.patch_all()

import logging.config
@@ -21,15 +22,15 @@
from openprocurement.bot.identification.databridge.constants import author
from restkit import ResourceError


logger = logging.getLogger(__name__)


class FilterTenders(Greenlet):
""" Edr API Data Bridge """
identification_scheme = u'UA-EDR'

def __init__(self, tenders_sync_client, filtered_tender_ids_queue, edrpou_codes_queue, process_tracker, services_not_available, sleep_change_value, delay=15):
def __init__(self, tenders_sync_client, filtered_tender_ids_queue, edrpou_codes_queue, process_tracker,
services_not_available, sleep_change_value, delay=15):
super(FilterTenders, self).__init__()
self.exit = False
self.start_time = datetime.now()
@@ -58,12 +59,15 @@ def prepare_data(self):
gevent.sleep(0)
continue
try:
response = self.tenders_sync_client.request("GET", path='{}/{}'.format(self.tenders_sync_client.prefix_path, tender_id),
response = self.tenders_sync_client.request("GET",
path='{}/{}'.format(self.tenders_sync_client.prefix_path,
tender_id),
headers={'X-Client-Request-ID': generate_req_id()})
except ResourceError as re:
if re.status_int == 429:
self.sleep_change_value.increment()
logger.info("Waiting tender {} for sleep_change_value: {} seconds".format(tender_id, self.sleep_change_value.time_between_requests))
logger.info("Waiting tender {} for sleep_change_value: {} seconds".format(tender_id,
self.sleep_change_value.time_between_requests))
else:
logger.warning('Fail to get tender info {}'.format(tender_id),
extra=journal_context(params={"TENDER_ID": tender_id}))
@@ -84,12 +88,14 @@ def prepare_data(self):
tender = munchify(loads(response.body_string()))['data']
logger.info('Get tender {} from filtered_tender_ids_queue'.format(tender_id),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_GET_TENDER_FROM_QUEUE},
params={"TENDER_ID": tender['id']}))
params={"TENDER_ID": tender['id']}))
if 'awards' in tender:
for award in tender['awards']:
logger.info('Processing tender {} bid {} award {}'.format(tender['id'], award['bid_id'], award['id']),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_PROCESS},
params={"TENDER_ID": tender['id'], "BID_ID": award['bid_id'], "AWARD_ID": award['id']}))
logger.info(
'Processing tender {} bid {} award {}'.format(tender['id'], award['bid_id'], award['id']),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_PROCESS},
params={"TENDER_ID": tender['id'], "BID_ID": award['bid_id'],
"AWARD_ID": award['id']}))
if self.should_process_item(award):
for supplier in award['suppliers']:
code = supplier['identifier']['id']
@@ -98,29 +104,40 @@ def prepare_data(self):
logger.info(u'Tender {} bid {} award {} identifier id {} is not valid.'.format(
tender['id'], award['bid_id'], award['id'], code),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_NOT_PROCESS},
params={"TENDER_ID": tender['id'], "BID_ID": award['bid_id'], "AWARD_ID": award['id']}))
params={"TENDER_ID": tender['id'],
"BID_ID": award['bid_id'],
"AWARD_ID": award['id']}))
continue
# quick check if item was already processed
# quick check if item was already processed
if self.process_tracker.check_processed_item(tender['id'], award['id']):
logger.info('Tender {} bid {} award {} was already processed.'.format(
tender['id'], award['bid_id'], award['id']),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_NOT_PROCESS},
params={"TENDER_ID": tender['id'], "BID_ID": award['bid_id'], "AWARD_ID": award['id']}))
params={"TENDER_ID": tender['id'],
"BID_ID": award['bid_id'],
"AWARD_ID": award['id']}))
elif self.should_process_award(supplier, tender, award):
self.process_tracker.set_item(tender['id'], award['id'])
document_id = generate_doc_id()
tender_data = Data(tender['id'], award['id'], str(code),
'awards', {'meta': {'id': document_id, 'author': author, 'sourceRequests': [response.headers['X-Request-ID']]}})
'awards', {'meta': {'id': document_id, 'author': author,
'sourceRequests': [
response.headers['X-Request-ID']]}})
self.edrpou_codes_queue.put(tender_data)
else:
logger.info('Tender {} bid {} award {} identifier schema isn\'t UA-EDR or tender is already in process.'.format(
tender['id'], award['bid_id'], award['id']),
logger.info(
'Tender {} bid {} award {} identifier schema isn\'t UA-EDR or tender is already in process.'.format(
tender['id'], award['bid_id'], award['id']),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_NOT_PROCESS},
params={"TENDER_ID": tender['id'], "BID_ID": award['bid_id'], "AWARD_ID": award['id']}))
params={"TENDER_ID": tender['id'],
"BID_ID": award['bid_id'],
"AWARD_ID": award['id']}))
else:
logger.info('Tender {} bid {} award {} is not in status pending or award has already document '
'with documentType registerExtract.'.format(tender_id, award['bid_id'], award['id']),
extra=journal_context(params={"TENDER_ID": tender['id'], "BID_ID": award['bid_id'], "AWARD_ID": award['id']}))
logger.info(
'Tender {} bid {} award {} is not in status pending or award has already document '
'with documentType registerExtract.'.format(tender_id, award['bid_id'], award['id']),
extra=journal_context(params={"TENDER_ID": tender['id'], "BID_ID": award['bid_id'],
"AWARD_ID": award['id']}))
elif 'qualifications' in tender:
for qualification in tender['qualifications']:
if self.should_process_item(qualification):
@@ -132,38 +149,48 @@ def prepare_data(self):
tender['id'], qualification['bidID'], qualification['id'], code),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_NOT_PROCESS},
params={"TENDER_ID": tender['id'], "BID_ID":
qualification['bidID'], "QUALIFICATION_ID": qualification['id']}))
qualification['bidID'],
"QUALIFICATION_ID": qualification['id']}))
continue
# quick check if item was already processed
if self.process_tracker.check_processed_item(tender['id'], qualification['id']):
logger.info('Tender {} bid {} qualification {} was already processed.'.format(
tender['id'], qualification['bidID'], qualification['id']),
tender['id'], qualification['bidID'], qualification['id']),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_NOT_PROCESS},
params={"TENDER_ID": tender['id'],
"BID_ID": qualification['bidID'], "QUALIFICATION_ID": qualification['id']}))
"BID_ID": qualification['bidID'],
"QUALIFICATION_ID": qualification['id']}))
# check first identification scheme, if yes then check if item is already in process or not
elif self.should_process_qualification(appropriate_bid, tender, qualification):
self.process_tracker.set_item(tender['id'], qualification['id'])
document_id = generate_doc_id()
tender_data = Data(tender['id'], qualification['id'], str(code),
'qualifications', {'meta': {'id': document_id, 'author': author, 'sourceRequests': [response.headers['X-Request-ID']]}})
'qualifications', {'meta': {'id': document_id, 'author': author,
'sourceRequests': [
response.headers['X-Request-ID']]}})
self.edrpou_codes_queue.put(tender_data)
logger.info('Processing tender {} bid {} qualification {}'.format(
tender['id'], qualification['bidID'], qualification['id']),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_PROCESS},
params={"TENDER_ID": tender['id'], "BID_ID": qualification['bidID'], "QUALIFICATION_ID": qualification['id']}))
params={"TENDER_ID": tender['id'],
"BID_ID": qualification['bidID'],
"QUALIFICATION_ID": qualification['id']}))
else:
logger.info('Tender {} bid {} qualification {} identifier schema is not UA-EDR or tender is already in process.'.format(
tender['id'], qualification['bidID'], qualification['id']),
logger.info(
'Tender {} bid {} qualification {} identifier schema is not UA-EDR or tender is already in process.'.format(
tender['id'], qualification['bidID'], qualification['id']),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_NOT_PROCESS},
params={"TENDER_ID": tender['id'],
"BID_ID": qualification['bidID'], "QUALIFICATION_ID": qualification['id']}))
"BID_ID": qualification['bidID'],
"QUALIFICATION_ID": qualification['id']}))
else:
logger.info('Tender {} bid {} qualification {} is not in status pending or qualification has '
'already document with documentType registerExtract.'.format(
tender_id, qualification['bidID'], qualification['id']),
extra=journal_context(params={"TENDER_ID": tender['id'],
"BID_ID": qualification['bidID'], "QUALIFICATION_ID": qualification['id']}))
logger.info(
'Tender {} bid {} qualification {} is not in status pending or qualification has '
'already document with documentType registerExtract.'.format(
tender_id, qualification['bidID'], qualification['id']),
extra=journal_context(params={"TENDER_ID": tender['id'],
"BID_ID": qualification['bidID'],
"QUALIFICATION_ID": qualification['id']}))
self.filtered_tender_ids_queue.get() # Remove elem from queue
gevent.sleep(self.sleep_change_value.time_between_requests)
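
The awards and qualifications branches above both gate on should_process_item. Below is a minimal sketch of what that check appears to verify, inferred from the "is not in status pending or ... registerExtract" log messages in this hunk; the actual method is not shown in this diff and may be implemented differently.

    def should_process_item(self, item):
        # Inferred behaviour: process an award/qualification only while it is
        # pending and has not yet received a registerExtract document.
        return (item['status'] == 'pending' and
                not any(document.get('documentType') == 'registerExtract'
                        for document in item.get('documents', [])))
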

@@ -17,10 +17,8 @@
DATABRIDGE_SUCCESS_UPLOAD_TO_DOC_SERVICE= 'edr_databridge_success_upload_to_doc_service'
DATABRIDGE_RESTART_WORKER = 'edr_databridge_restart_worker'
DATABRIDGE_PROCESSING_TENDER = 'edr_databridge_processing_tender'
DATABRIDGE_UNSUCCESS_UPLOAD_TO_DOC_SERVICE = 'edr_databridge_unsuccess_upload_to_doc_service'
DATABRIDGE_UNSUCCESS_RETRY_UPLOAD_TO_DOC_SERVICE = 'edr_databridge_unsuccess_retry_upload_to_doc_service'
DATABRIDGE_UNSUCCESS_UPLOAD_TO_TENDER = 'edr_databridge_unsuccess_upload_to_tender'
DATABRIDGE_UNSUCCESS_RETRY_UPLOAD_TO_TENDER = 'edr_databridge_unsuccess_retry_upload_to_tender'
DATABRIDGE_UNSUCCESS_UPLOAD_TO_DOC_SERVICE = 'edr_databridge_unsuccess{}_upload_to_doc_service'
DATABRIDGE_UNSUCCESS_UPLOAD_TO_TENDER = 'edr_databridge_unsuccess{}_upload_to_tender'
DATABRIDGE_SUCCESS_UPLOAD_TO_TENDER = 'edr_databridge_success_upload_to_tender'
DATABRIDGE_RESTART_UPLOAD_TO_DOC_SERVICE = 'edr_databridge_restart_upload_to_doc_service'
DATABRIDGE_START_UPLOAD = 'edr_databridge_start_upload'
@@ -39,5 +37,4 @@
DATABRIDGE_DOC_SERVICE_CONN_ERROR = 'edr_databridge_doc_service_conn_error'
DATABRIDGE_TENDERS_SERVER_CONN_ERROR = 'edr_databridge_tenders_server_conn_error'
DATABRIDGE_PROXY_SERVER_CONN_ERROR = 'edr_databridge_proxy_server_conn_error'
DATABRIDGE_422_UPLOAD_TO_TENDER = 'edr_databridge_422_upload_to_tender'
DATABRIDGE_ITEM_STATUS_CHANGED_WHILE_PROCESSING = 'edr_databridge_item_status_changed_while_processing'
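
With the retry-specific constants removed above, the two remaining upload message ids carry a '{}' placeholder. Presumably call sites fill it to recover both the plain and the retry variants; the formatting below is an illustration only, since the call sites are not part of this diff.

    DATABRIDGE_UNSUCCESS_UPLOAD_TO_TENDER = 'edr_databridge_unsuccess{}_upload_to_tender'

    DATABRIDGE_UNSUCCESS_UPLOAD_TO_TENDER.format('')        # 'edr_databridge_unsuccess_upload_to_tender'
    DATABRIDGE_UNSUCCESS_UPLOAD_TO_TENDER.format('_retry')  # 'edr_databridge_unsuccess_retry_upload_to_tender'
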
