Verification of the response #9

Open
wants to merge 9 commits into master
Changes from 1 commit
initial commit of two new workers
dtrenkenshu authored and dtrenkenshu committed Oct 13, 2017
commit 2ba5b88df6cea82fc2a40f36d97fbd8086f9804a
5 changes: 5 additions & 0 deletions bot/dfs/bridge/caching.py
@@ -24,10 +24,12 @@ def __init__(self, config):
self.db = redis.StrictRedis(host=self._host, port=self._port, db=self._db_name)
self.set_value = self.db.set
self.has_value = self.db.exists
self.remove_value = self.db.delete
LOGGER.info("Cache initialized")
else:
self.set_value = lambda x, y, z: None
self.has_value = lambda x: None
self.remove_value = lambda x: None

def config_get(self, name):
return self.config.get('main').get(name)
@@ -40,6 +42,9 @@ def put(self, key, value, ex=86400):
LOGGER.info("Saving key {} to cache".format(key))
self.set_value(key, value, ex)

def remove(self, key):
self.remove_value(key)

def has(self, key):
LOGGER.info("Checking if code {} is in the cache".format(key))
return self.has_value(key)
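The new remove/remove_value pair follows the same wiring as set_value and has_value above: with caching enabled the attributes are bound directly to the redis client methods, and with caching disabled they fall back to no-op lambdas. A minimal standalone sketch of that pattern (the RedisCache name and constructor arguments are illustrative, not this repository's config-driven Cache API):

    import redis

    class RedisCache(object):  # illustrative stand-in for the bridge's Cache class
        def __init__(self, enabled, host="127.0.0.1", port=6379, db=0):
            if enabled:
                client = redis.StrictRedis(host=host, port=port, db=db)
                self.set_value = client.set        # set_value(key, value, ex)
                self.has_value = client.exists     # has_value(key)
                self.remove_value = client.delete  # remove_value(key)
            else:
                # With caching disabled every call becomes a harmless no-op.
                self.set_value = lambda key, value, ex: None
                self.has_value = lambda key: None
                self.remove_value = lambda key: None

        def put(self, key, value, ex=86400):
            self.set_value(key, value, ex)

        def has(self, key):
            return self.has_value(key)

        def remove(self, key):
            self.remove_value(key)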
15 changes: 3 additions & 12 deletions bot/dfs/bridge/constants.py
@@ -1,19 +1,8 @@
# -*- coding: utf-8 -*-
from json import loads

import os

from pytz import timezone


def read_json(name):
curr_dir = os.path.dirname(os.path.realpath(__file__))
file_path = os.path.join(curr_dir, name)
with open(file_path) as lang_file:
data = lang_file.read()
return loads(data)


major = 0
minor = 0
bugfix = 1
@@ -28,5 +17,7 @@ def read_json(name):
FORM_NAME = "Jxxxxxxx"
qualification_procurementMethodType = ('aboveThresholdUA', 'aboveThresholdUA.defense', 'aboveThresholdEU',
'competitiveDialogueUA.stage2', 'competitiveDialogueEU.stage2')
HOLIDAYS = read_json('working_days.json')
HOLIDAYS_FILE = 'working_days.json'
TZ = timezone(os.environ['TZ'] if 'TZ' in os.environ else 'Europe/Kiev')
file_name = "sfs_reference.yaml"
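Moving read_json out of constants.py (it reappears in utils.py below) and keeping only the HOLIDAYS_FILE name means the holidays file is no longer read at import time but on each business_date_checker call. One practical effect is that the lookup can be stubbed without touching files on disk; a hedged sketch, assuming the package is importable in your environment (the empty holidays map is made up):

    import mock  # the `mock` backport on Python 2; unittest.mock on Python 3
    from bot.dfs.bridge import utils

    # With no explicit entries, weekends fall back to the default (treated as
    # days off) and weekdays are only constrained by the 09:00-18:00 window.
    with mock.patch.object(utils, 'read_json', return_value={}):
        print(utils.business_date_checker())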

28 changes: 20 additions & 8 deletions bot/dfs/bridge/process_tracker.py
@@ -1,4 +1,6 @@
# -*- coding: utf-8 -*-
# coding=utf-8
import pickle

from datetime import datetime

from caching import db_key
@@ -15,15 +17,15 @@ def __init__(self, db=None, ttl=300):

def set_item(self, tender_id, item_id, docs_amount=0):
self.processing_items[item_key(tender_id, item_id)] = docs_amount
self.add_docs_amount_to_tender(tender_id, docs_amount)
self._add_docs_amount_to_tender(tender_id, docs_amount)

def add_docs_amount_to_tender(self, tender_id, docs_amount):
def _add_docs_amount_to_tender(self, tender_id, docs_amount):
if self.tender_documents_to_process.get(tender_id):
self.tender_documents_to_process[tender_id] += docs_amount
else:
self.tender_documents_to_process[tender_id] = docs_amount

def remove_docs_amount_from_tender(self, tender_id):
def _remove_docs_amount_from_tender(self, tender_id):
if self.tender_documents_to_process[tender_id] > 1:
self.tender_documents_to_process[tender_id] -= 1
else:
@@ -41,14 +43,24 @@ def check_processed_item(self, tender_id, item_id):
def check_processed_tenders(self, tender_id):
return self._db.has(db_key(tender_id)) or False

def update_processing_items(self, tender_id, item_id):
def get_unprocessed_items(self):
return self._db.get_items("unprocessed_*") or []

def add_unprocessed_item(self, data):
self._db.put(data.doc_id(), pickle.dumps(data), self.ttl)

def _remove_unprocessed_item(self, document_id):
self._db.remove(document_id)

def _update_processing_items(self, tender_id, item_id, document_id):
key = item_key(tender_id, item_id)
if self.processing_items[key] > 1:
self.processing_items[key] -= 1
else:
self.processed_items[key] = datetime.now()
self._remove_unprocessed_item(document_id)
del self.processing_items[key]

def update_items_and_tender(self, tender_id, item_id):
self.update_processing_items(tender_id, item_id)
self.remove_docs_amount_from_tender(tender_id)
def update_items_and_tender(self, tender_id, item_id, document_id):
self._update_processing_items(tender_id, item_id, document_id)
self._remove_docs_amount_from_tender(tender_id)
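Besides the renamed private helpers, the tracker now mirrors unprocessed items into the cache: add_unprocessed_item pickles the data object under its doc_id() with the tracker's TTL, get_unprocessed_items reads everything matching "unprocessed_*" back, and _update_processing_items drops the cached copy once the last document for an item has been handled. A minimal sketch of that pickle round trip straight against Redis (PendingItem is an illustrative stand-in for the bridge's Data object, and the key prefix assumes doc_id() values start with "unprocessed_" so that get_items can find them):

    import pickle
    import redis

    db = redis.StrictRedis(host='127.0.0.1', port=6379, db=0)

    class PendingItem(object):  # stand-in for the bridge's Data object
        def __init__(self, tender_id, item_id):
            self.tender_id, self.item_id = tender_id, item_id

        def doc_id(self):
            return 'unprocessed_{}_{}'.format(self.tender_id, self.item_id)

    item = PendingItem('tender-1', 'award-1')
    db.set(item.doc_id(), pickle.dumps(item), ex=300)  # expires with the tracker's TTL

    # After a restart, pending work can be rebuilt from whatever is still cached:
    pending = [pickle.loads(db.get(key)) for key in db.keys('unprocessed_*')]
    db.delete(item.doc_id())  # dropped once the document is fully processed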
61 changes: 57 additions & 4 deletions bot/dfs/bridge/utils.py
@@ -1,11 +1,16 @@
# -*- coding: utf-8 -*-
from datetime import datetime, time
from json import loads
from logging import getLogger
from string import digits, uppercase
from uuid import uuid4

from constants import (AWARD_STATUS, DOC_TYPE, FORM_NAME, HOLIDAYS, TZ, qualification_procurementMethodType,
tender_status)
import os
import yaml
import io

from constants import (AWARD_STATUS, DOC_TYPE, FORM_NAME, HOLIDAYS_FILE, TZ, qualification_procurementMethodType,
tender_status, file_name)
from restkit import ResourceError

LOGGER = getLogger(__name__)
@@ -101,11 +106,59 @@ def to_base36(number):

def business_date_checker():
current_date = datetime.now(TZ)
if current_date.weekday() in [5, 6] and HOLIDAYS.get(current_date.date().isoformat(), True) or HOLIDAYS.get(
current_date.date().isoformat(), False):
holidays = read_json(HOLIDAYS_FILE)
if cond1(current_date, holidays) or cond2(current_date, holidays):
return False
else:
if time(9, 0) <= current_date.time() <= time(18, 0):
return True
else:
return False


def cond1(current_date, holidays):
return current_date.weekday() in [5, 6] and holidays.get(current_date.date().isoformat(), True)


def cond2(current_date, holidays):
return holidays.get(current_date.date().isoformat(), False)
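

# A worked example of how the two conditions read working_days.json (the dates
# below are illustrative, not entries shipped with the repository): the file
# maps ISO-formatted dates to flags, a true value meaning a day off, and
# Saturdays/Sundays default to days off unless explicitly marked False.
#
#   holidays = {"2017-10-16": True,   # a Monday declared a holiday -> cond2 holds
#               "2017-10-14": False}  # a Saturday declared working -> cond1 fails
#
# With that map, business_date_checker() returns False on 2017-10-16 regardless
# of the clock, while on 2017-10-14 it falls through to the 09:00-18:00 check.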


#
# def is_weekend(current_date, holidays):
# return current_date.weekday() in [5, 6] and holidays.get(current_date.date().isoformat(), False)
#
#
# def is_holiday(current_date, holidays):
# return holidays.get(current_date.date().isoformat(), True)
#
#
# def is_working_day_and_time(current_date):
# import pdb;
# pdb.set_trace()
# return current_date.weekday() in [5, 6] and is_working_day(current_date)
#
#
# def is_working_day(today):
# import pdb;
# pdb.set_trace()
# return (read_json(HOLIDAYS_FILE).get(today.date().isoformat(), True) or
# read_json(HOLIDAYS_FILE).get(today.date().isoformat(), False))


def read_json(name):
curr_dir = os.path.dirname(os.path.realpath(__file__))
file_path = os.path.join(curr_dir, name)
with open(file_path) as lang_file:
data = lang_file.read()
return loads(data)


def create_file(details):
""" Return temp file with details """
temporary_file = io.BytesIO()
temporary_file.name = file_name
temporary_file.write(yaml.safe_dump(details, allow_unicode=True, default_flow_style=False))
temporary_file.seek(0)

return temporary_file
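create_file builds the YAML payload entirely in memory, so the upload worker below never touches disk; the name attribute is what the document service will see as the file name. A short usage sketch (the details dict is made up; yaml.safe_dump returning a byte string that io.BytesIO accepts assumes Python 2, which this codebase targets):

    import yaml
    from bot.dfs.bridge.utils import create_file

    details = {'tenderID': 'UA-2017-10-13-000001', 'code': '12345678'}  # illustrative payload
    sfs_file = create_file(details)
    print(sfs_file.name)                     # "sfs_reference.yaml" (constants.file_name)
    print(yaml.safe_load(sfs_file.read()))   # round-trips back to the original dict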
bot/dfs/bridge/workers/base_worker.py
@@ -8,8 +8,8 @@

from gevent import Greenlet

from utils import journal_context
from journal_msg_ids import DATABRIDGE_START_UPLOAD
from bot.dfs.bridge.utils import journal_context
from bot.dfs.bridge.journal_msg_ids import DATABRIDGE_START_UPLOAD

logger = logging.getLogger(__name__)

2 changes: 1 addition & 1 deletion bot/dfs/bridge/workers/filter_tender.py
@@ -12,7 +12,7 @@

from bot.dfs.bridge.utils import generate_req_id, journal_context, is_code_invalid
from bot.dfs.bridge.data import Data
from bot.dfs.bridge.base_worker import BaseWorker
from bot.dfs.bridge.workers.base_worker import BaseWorker
from bot.dfs.bridge.journal_msg_ids import DATABRIDGE_TENDER_NOT_PROCESS
from bot.dfs.bridge.constants import scheme

2 changes: 1 addition & 1 deletion bot/dfs/bridge/workers/request_for_reference.py
@@ -7,7 +7,7 @@
from datetime import datetime
from gevent import spawn, sleep

from bot.dfs.bridge.base_worker import BaseWorker
from bot.dfs.bridge.workers.base_worker import BaseWorker
from bot.dfs.bridge.utils import business_date_checker

logger = logging.getLogger(__name__)
2 changes: 1 addition & 1 deletion bot/dfs/bridge/workers/scanner.py
@@ -6,7 +6,7 @@
from datetime import datetime

import gevent
from bot.dfs.bridge.base_worker import BaseWorker
from bot.dfs.bridge.workers.base_worker import BaseWorker
from bot.dfs.bridge.constants import retry_mult
from gevent import spawn
from gevent.event import Event
2 changes: 1 addition & 1 deletion bot/dfs/bridge/workers/sfs_worker.py
@@ -4,7 +4,7 @@
monkey.patch_all()
from datetime import datetime

from bot.dfs.bridge.base_worker import BaseWorker
from bot.dfs.bridge.workers.base_worker import BaseWorker
from bot.dfs.tests.utils import generate_request_id


135 changes: 135 additions & 0 deletions bot/dfs/bridge/workers/upload_file_to_doc_service.py
@@ -0,0 +1,135 @@
# -*- coding: utf-8 -*-
from gevent import monkey

monkey.patch_all()

import logging.config
import gevent

from retrying import retry
from gevent.queue import Queue
from datetime import datetime
from gevent.hub import LoopExit
from gevent import spawn

from bot.dfs.bridge.workers.base_worker import BaseWorker
from bot.dfs.bridge.utils import journal_context, create_file
from bot.dfs.bridge.journal_msg_ids import DATABRIDGE_SUCCESS_UPLOAD_TO_DOC_SERVICE, \
DATABRIDGE_UNSUCCESS_UPLOAD_TO_DOC_SERVICE
from bot.dfs.bridge.constants import file_name, retry_mult

logger = logging.getLogger(__name__)


class UploadFileToDocService(BaseWorker):
""" Upload file with details """

def __init__(self, upload_to_doc_service_queue, upload_to_tender_queue, process_tracker, doc_service_client,
services_not_available, sleep_change_value, delay=15):
super(UploadFileToDocService, self).__init__(services_not_available)
self.start_time = datetime.now()

self.delay = delay
self.process_tracker = process_tracker
# init client
self.doc_service_client = doc_service_client

# init queues for workers
self.upload_to_doc_service_queue = upload_to_doc_service_queue
self.upload_to_tender_queue = upload_to_tender_queue
self.sleep_change_value = sleep_change_value
# retry queues for workers
self.retry_upload_to_doc_service_queue = Queue(maxsize=500)

def upload_worker(self):
while not self.exit:
self.services_not_available.wait()
self.try_peek_and_upload(False)
gevent.sleep(0)

def retry_upload_worker(self):
while not self.exit:
self.services_not_available.wait()
self.try_peek_and_upload(True)
gevent.sleep(0)

def try_peek_and_upload(self, is_retry):
try:
tender_data = self.peek_from_queue(is_retry)
except LoopExit:
gevent.sleep(0)
else:
self.try_upload_to_doc_service(tender_data, is_retry)

def peek_from_queue(self, is_retry):
return self.retry_upload_to_doc_service_queue.peek() if is_retry else self.upload_to_doc_service_queue.peek()

def try_upload_to_doc_service(self, tender_data, is_retry):
try:
response = self.update_headers_and_upload(tender_data, is_retry)
except Exception as e:
self.remove_bad_data(tender_data, e, is_retry)
else:
self.move_to_tender_if_200(response, tender_data, is_retry)

def update_headers_and_upload(self, tender_data, is_retry):
if is_retry:
return self.update_headers_and_upload_retry(tender_data)
else:
return self.doc_service_client.upload(file_name, create_file(tender_data.file_content), 'application/yaml',
headers={'X-Client-Request-ID': tender_data.doc_id()})

def update_headers_and_upload_retry(self, tender_data):
self.doc_service_client.headers.update({'X-Client-Request-ID': tender_data.doc_id()})
return self.client_upload_to_doc_service(tender_data)

def remove_bad_data(self, tender_data, e, is_retry):
logger.exception('Exception while uploading file to doc service {} doc_id: {}. Message: {}. {}'.
format(tender_data, tender_data.doc_id(), e, "Removed tender data" if is_retry else ""),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_UNSUCCESS_UPLOAD_TO_DOC_SERVICE},
tender_data.log_params()))
if is_retry:
self.retry_upload_to_doc_service_queue.get()
self.process_tracker.update_items_and_tender(tender_data.tender_id, tender_data.item_id,
tender_data.doc_id())
raise e
else:
self.retry_upload_to_doc_service_queue.put(tender_data)
self.upload_to_doc_service_queue.get()

def move_to_tender_if_200(self, response, tender_data, is_retry):
if response.status_code == 200:
self.move_to_tender_queue(tender_data, response, is_retry)
else:
self.move_data_to_retry_or_leave(response, tender_data, is_retry)

def move_to_tender_queue(self, tender_data, response, is_retry):
data = tender_data
data.file_content = dict(response.json(), **{'meta': {'id': tender_data.doc_id()}})
self.upload_to_tender_queue.put(data)
if not is_retry:
self.upload_to_doc_service_queue.get()
else:
self.retry_upload_to_doc_service_queue.get()
logger.info('Successfully uploaded file to doc service {} doc_id: {}'.format(tender_data, tender_data.doc_id()),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_SUCCESS_UPLOAD_TO_DOC_SERVICE},
tender_data.log_params()))

def move_data_to_retry_or_leave(self, response, tender_data, is_retry):
logger.info('Not successful response from document service while uploading {} doc_id: {}. Response {}'.
format(tender_data, tender_data.doc_id(), response.status_code),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_UNSUCCESS_UPLOAD_TO_DOC_SERVICE},
tender_data.log_params()))
if not is_retry:
self.retry_upload_to_doc_service_queue.put(tender_data)
self.upload_to_doc_service_queue.get()

@retry(stop_max_attempt_number=5, wait_exponential_multiplier=retry_mult)
def client_upload_to_doc_service(self, tender_data):
"""Process upload request for retry queue objects."""
return self.doc_service_client.upload(file_name, create_file(tender_data.file_content), 'application/yaml',
headers={'X-Client-Request-ID': tender_data.doc_id()})

def _start_jobs(self):
return {'upload_worker': spawn(self.upload_worker),
'retry_upload_worker': spawn(self.retry_upload_worker)}
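The retry path leans on the retrying package: client_upload_to_doc_service is retried up to five times with exponentially growing waits scaled by retry_mult (a constant from constants.py that is not shown in this diff). A self-contained sketch of that decorator with a flaky call standing in for the doc service client (the 1000 ms multiplier is illustrative):

    from retrying import retry

    attempts = {'n': 0}

    # Same shape as the worker's decorator: give up after 5 tries, waiting an
    # exponentially growing number of milliseconds between them.
    @retry(stop_max_attempt_number=5, wait_exponential_multiplier=1000)
    def flaky_upload():
        attempts['n'] += 1
        if attempts['n'] < 3:
            raise IOError('doc service unavailable')  # first two attempts fail
        return {'status': 'uploaded'}

    print(flaky_upload())  # succeeds on the third attempt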