Skip to content

Commit

Permalink
Merge pull request #66 from ITVaan/dev_refactor_from_cache
Browse files Browse the repository at this point in the history
Refactoring filter_tender and edr_handler 1
  • Loading branch information
Danil Trenkenshu authored Aug 17, 2017
2 parents 31f369d + 10010f5 commit 7095871
Showing 19 changed files with 1,569 additions and 908 deletions.
3 changes: 2 additions & 1 deletion openprocurement/bot/identification/client.py
Original file line number Diff line number Diff line change
@@ -17,7 +17,6 @@ def verify(self, param, code, headers):
"""Send request to Proxy server to verify EDRPOU code"""
url = '{url}?{param}={code}'.format(url=self.verify_url, param=param, code=code)
response = self.session.get(url=url, auth=(self.user, self.password), timeout=self.timeout, headers=headers)

return response

def health(self, sandbox_mode):
@@ -38,8 +37,10 @@ def __init__(self, host, user, password, port=6555, timeout=None):
self.user = user
self.password = password
self.timeout = timeout
self.headers = {}

def upload(self, filename, in_file, content_type, headers):
files = {'file': (filename, in_file, content_type)}
self.headers.update(headers)
response = self.session.post(url=self.url, auth=(self.user, self.password), timeout=self.timeout, files=files, headers=headers)
return response
67 changes: 40 additions & 27 deletions openprocurement/bot/identification/databridge/bridge.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
from gevent import monkey
from retrying import retry

monkey.patch_all()

@@ -13,15 +12,18 @@
from functools import partial
from yaml import load
from gevent.queue import Queue
from retrying import retry
from restkit import request, RequestError, ResourceError
from requests import RequestException
from constants import retry_mult

from openprocurement_client.client import TendersClientSync as BaseTendersClientSync, TendersClient as BaseTendersClient
from openprocurement.bot.identification.client import DocServiceClient, ProxyClient
from openprocurement.bot.identification.databridge.scanner import Scanner
from openprocurement.bot.identification.databridge.filter_tender import FilterTenders
from openprocurement.bot.identification.databridge.edr_handler import EdrHandler
from openprocurement.bot.identification.databridge.upload_file import UploadFile
from openprocurement.bot.identification.databridge.upload_file_to_tender import UploadFileToTender
from openprocurement.bot.identification.databridge.upload_file_to_doc_service import UploadFileToDocService
from openprocurement.bot.identification.databridge.utils import journal_context, check_412, ProcessTracker
from caching import Db
from openprocurement.bot.identification.databridge.journal_msg_ids import (
@@ -34,14 +36,12 @@


class TendersClientSync(BaseTendersClientSync):

@check_412
def request(self, *args, **kwargs):
return super(TendersClientSync, self).request(*args, **kwargs)


class TendersClient(BaseTendersClient):

@check_412
def _create_tender_resource_item(self, *args, **kwargs):
return super(TendersClient, self)._create_tender_resource_item(*args, **kwargs)
@@ -83,13 +83,13 @@ def __init__(self, config):
# init queues for workers
self.filtered_tender_ids_queue = Queue(maxsize=buffers_size) # queue of tender IDs with appropriate status
self.edrpou_codes_queue = Queue(maxsize=buffers_size) # queue with edrpou codes (Data objects stored in it)
self.upload_to_doc_service_queue = Queue(maxsize=buffers_size) # queue with detailed info from EDR (Data.file_content)
# upload_to_tender_queue - queue with file's get_url
self.upload_to_doc_service_queue = Queue(maxsize=buffers_size) # queue with info from EDR (Data.file_content)
self.upload_to_tender_queue = Queue(maxsize=buffers_size)

# blockers
self.initialization_event = gevent.event.Event()
self.services_not_available = gevent.event.Event()
self.services_not_available.set()
self.db = Db(config)
self.process_tracker = ProcessTracker(self.db, self.time_to_live)

@@ -119,15 +119,22 @@ def __init__(self, config):
services_not_available=self.services_not_available,
delay=self.delay)

self.upload_file = partial(UploadFile.spawn,
client=self.client,
upload_to_doc_service_queue=self.upload_to_doc_service_queue,
upload_to_tender_queue=self.upload_to_tender_queue,
process_tracker=self.process_tracker,
doc_service_client=self.doc_service_client,
services_not_available=self.services_not_available,
sleep_change_value=self.sleep_change_value,
delay=self.delay)
self.upload_file_to_doc_service = partial(UploadFileToDocService.spawn,
upload_to_doc_service_queue=self.upload_to_doc_service_queue,
upload_to_tender_queue=self.upload_to_tender_queue,
process_tracker=self.process_tracker,
doc_service_client=self.doc_service_client,
services_not_available=self.services_not_available,
sleep_change_value=self.sleep_change_value,
delay=self.delay)

self.upload_file_to_tender = partial(UploadFileToTender.spawn,
client=self.client,
upload_to_tender_queue=self.upload_to_tender_queue,
process_tracker=self.process_tracker,
services_not_available=self.services_not_available,
sleep_change_value=self.sleep_change_value,
delay=self.delay)

def config_get(self, name):
return self.config.get('main').get(name)
@@ -194,7 +201,9 @@ def _start_jobs(self):
self.jobs = {'scanner': self.scanner(),
'filter_tender': self.filter_tender(),
'edr_handler': self.edr_handler(),
'upload_file': self.upload_file()}
'upload_file_to_doc_service': self.upload_file_to_doc_service(),
'upload_file_to_tender': self.upload_file_to_tender(),
}

def launch(self):
while True:
@@ -213,17 +222,21 @@ def run(self):
self.check_services()
if counter == 20:
counter = 0
logger.info('Current state: Filtered tenders {}; Edrpou codes queue {}; Retry edrpou codes queue {};'
'Upload to doc service {}; Retry upload to doc service {}; '
'Upload to tender {}; Retry upload to tender {}'.format(
self.filtered_tender_ids_queue.qsize(),
self.edrpou_codes_queue.qsize(),
self.jobs['edr_handler'].retry_edrpou_codes_queue.qsize() if self.jobs['edr_handler'] else 0,
self.upload_to_doc_service_queue.qsize(),
self.jobs['upload_file'].retry_upload_to_doc_service_queue.qsize() if self.jobs['upload_file'] else 0,
self.upload_to_tender_queue.qsize(),
self.jobs['upload_file'].retry_upload_to_tender_queue.qsize() if self.jobs['upload_file'] else 0
))
logger.info(
'Current state: Filtered tenders {}; Edrpou codes queue {}; Retry edrpou codes queue {};'
'Upload to doc service {}; Retry upload to doc service {}; '
'Upload to tender {}; Retry upload to tender {}'.format(
self.filtered_tender_ids_queue.qsize(),
self.edrpou_codes_queue.qsize(),
self.jobs['edr_handler'].retry_edrpou_codes_queue.qsize() if self.jobs[
'edr_handler'] else 0,
self.upload_to_doc_service_queue.qsize(),
self.jobs['upload_file_to_doc_service'].retry_upload_to_doc_service_queue.qsize() if self.jobs[
'upload_file_to_doc_service'] else 0,
self.upload_to_tender_queue.qsize(),
self.jobs['upload_file_to_tender'].retry_upload_to_tender_queue.qsize() if self.jobs[
'upload_file_to_tender'] else 0
))
counter += 1
for name, job in self.jobs.items():
logger.debug("{}.dead: {}".format(name, job.dead))
3 changes: 2 additions & 1 deletion openprocurement/bot/identification/databridge/constants.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# coding=utf-8
major = 1
minor = 2
bugfix = 1
version = '{}.{}.{}'.format(major, minor, bugfix) # major.minor.bugfix
version = '{}.{}.{}'.format(major, minor, bugfix)
file_name = 'edr_identification.yaml'
author = "IdentificationBot"
retry_mult = 1000
Loading

0 comments on commit 7095871

Please sign in to comment.