Merge pull request #20 from huridocs/multi-queue
Add queue processor package
gabriel-piles authored Sep 17, 2024
2 parents a1d5021 + b9c3778 commit d4574a5
Showing 4 changed files with 55 additions and 117 deletions.
3 changes: 1 addition & 2 deletions requirements.txt
@@ -1,9 +1,8 @@
 git+https://github.com/huridocs/pdf-document-layout-analysis@d6cbcc4891391fd9f2fc577c9cef6f9c8f7d9e6f
+git+https://github.com/huridocs/queue-processor@26c9413ac4fd950ace4ee542d6734e6959e10ea4
 graypy==2.1.0
 PyYAML==6.0.1
 pymongo==4.8.0
-PyRSMQ==0.5.1
-redis==5.0.7
 httpx==0.27.0
 sentry-sdk==2.8.0

161 changes: 53 additions & 108 deletions src/QueueProcessor.py
@@ -1,10 +1,8 @@
 import json
-from time import sleep
 import pymongo
-import redis
 from pydantic import ValidationError
-from rsmq.consumer import RedisSMQConsumer
-from rsmq import RedisSMQ, cmd
+from queue_processor.QueueProcessor import QueueProcessor
 
 from sentry_sdk.integrations.redis import RedisIntegration
 import sentry_sdk
@@ -13,123 +11,69 @@
     MONGO_PORT,
     REDIS_HOST,
     REDIS_PORT,
-    RESULTS_QUEUE_NAME,
-    TASK_QUEUE_NAME,
     SERVICE_HOST,
     SERVICE_PORT,
     ENVIRONMENT,
     SENTRY_DSN,
     service_logger,
+    QUEUES_NAMES,
 )
 from data_model.ResultMessage import ResultMessage
 from data_model.Task import Task
 from extract_segments import get_xml_name, extract_segments
 
 
-class QueueProcessor:
-    def __init__(self):
-        client = pymongo.MongoClient(f"{MONGO_HOST}:{MONGO_PORT}")
-        self.pdf_paragraph_db = client["pdf_paragraph"]
-
-        self.results_queue = RedisSMQ(
-            host=REDIS_HOST,
-            port=REDIS_PORT,
-            qname=RESULTS_QUEUE_NAME,
-        )
-        self.extractions_tasks_queue = RedisSMQ(
-            host=REDIS_HOST,
-            port=REDIS_PORT,
-            qname=TASK_QUEUE_NAME,
-        )
+def get_failed_results_message(task: Task, message: str) -> ResultMessage:
+    return ResultMessage(
+        tenant=task.tenant,
+        task=task.task,
+        params=task.params,
+        success=False,
+        error_message=message,
+    )
 
-    def process(self, id, message, rc, ts):
-        try:
-            task = Task(**message)
-        except ValidationError:
-            service_logger.error(f"Not a valid Redis message: {message}")
-            return True
+def process(message):
+    try:
+        task = Task(**message)
+    except ValidationError:
+        service_logger.error(f"validation error: {message}", exc_info=True)
+        return None
 
-        try:
-            service_logger.info(f"Processing Redis message: {message}")
-
-            try:
-                xml_file_name = get_xml_name(task)
-                extraction_data = extract_segments(task, xml_file_name)
-                service_url = f"{SERVICE_HOST}:{SERVICE_PORT}"
-                extraction_message = ResultMessage(
-                    tenant=extraction_data.tenant,
-                    task=task.task,
-                    params=task.params,
-                    success=True,
-                    data_url=f"{service_url}/get_paragraphs/{task.tenant}/{task.params.filename}",
-                    file_url=f"{service_url}/get_xml/{xml_file_name}",
-                )
-
-                extraction_data_json = extraction_data.model_dump_json()
-                self.pdf_paragraph_db.paragraphs.insert_one(json.loads(extraction_data_json))
-                service_logger.info(f"Results Redis message: {extraction_message}")
-                self.results_queue.sendMessage(delay=5).message(extraction_message.model_dump_json()).execute()
-
-            except RuntimeError:
-                extraction_message = ResultMessage(
-                    tenant=task.tenant,
-                    task=task.task,
-                    params=task.params,
-                    success=False,
-                    error_message="Error processing the pdf",
-                )
-
-                self.results_queue.sendMessage().message(extraction_message.model_dump_json()).execute()
-                service_logger.info(extraction_message.model_dump_json(), exc_info=True)
-
-        except FileNotFoundError:
-            extraction_message = ResultMessage(
-                tenant=task.tenant,
-                task=task.task,
-                params=task.params,
-                success=False,
-                error_message="Error getting the xml from the pdf",
-            )
-
-            self.results_queue.sendMessage().message(extraction_message.model_dump_json()).execute()
-            service_logger.error(extraction_message.model_dump_json(), exc_info=True)
-
-        except Exception:
-            extraction_message = ResultMessage(
-                tenant=task.tenant,
-                task=task.task,
-                params=task.params,
-                success=False,
-                error_message="Error getting segments",
-            )
-
-            self.results_queue.sendMessage().message(extraction_message.model_dump_json()).execute()
-            service_logger.error(extraction_message.model_dump_json(), exc_info=True)
-
-        return True
-
-    def subscribe_to_extractions_tasks_queue(self):
-        while True:
-            try:
-                self.extractions_tasks_queue.getQueueAttributes().exec_command()
-                self.results_queue.getQueueAttributes().exec_command()
-
-                service_logger.info(f"Connecting to redis: {REDIS_HOST}:{REDIS_PORT}")
-
-                redis_smq_consumer = RedisSMQConsumer(
-                    qname=TASK_QUEUE_NAME,
-                    processor=self.process,
-                    host=REDIS_HOST,
-                    port=REDIS_PORT,
-                )
-                redis_smq_consumer.run()
-            except redis.exceptions.ConnectionError:
-                service_logger.error(f"Error connecting to redis: {REDIS_HOST}:{REDIS_PORT}")
-                sleep(20)
-            except cmd.exceptions.QueueDoesNotExist:
-                service_logger.info("Creating queues")
-                self.extractions_tasks_queue.createQueue().vt(120).exceptions(False).execute()
-                self.results_queue.createQueue().exceptions(False).execute()
-                service_logger.info("Queues have been created")
+    try:
+        service_logger.info(f"Processing Redis message: {message}")
+        return process_task(task).model_dump_json()
+    except RuntimeError:
+        extraction_message = get_failed_results_message(task, "Error processing PDF document")
+        service_logger.error(extraction_message.model_dump_json(), exc_info=True)
+    except FileNotFoundError:
+        extraction_message = get_failed_results_message(task, "Error getting the xml from the pdf")
+        service_logger.error(extraction_message.model_dump_json(), exc_info=True)
+    except Exception:
+        extraction_message = get_failed_results_message(task, "Error getting segments")
+        service_logger.error(extraction_message.model_dump_json(), exc_info=True)
+
+    return extraction_message.model_dump_json()
+
+
+def process_task(task):
+    xml_file_name = get_xml_name(task)
+    extraction_data = extract_segments(task, xml_file_name)
+    service_url = f"{SERVICE_HOST}:{SERVICE_PORT}"
+    extraction_message = ResultMessage(
+        tenant=extraction_data.tenant,
+        task=task.task,
+        params=task.params,
+        success=True,
+        data_url=f"{service_url}/get_paragraphs/{task.tenant}/{task.params.filename}",
+        file_url=f"{service_url}/get_xml/{xml_file_name}",
+    )
+    extraction_data_json = extraction_data.model_dump_json()
+    client = pymongo.MongoClient(f"{MONGO_HOST}:{MONGO_PORT}")
+    pdf_paragraph_db = client["pdf_paragraph"]
+    pdf_paragraph_db.paragraphs.insert_one(json.loads(extraction_data_json))
+    service_logger.info(f"Results Redis message: {extraction_message}")
+    return extraction_message
 
 
 if __name__ == "__main__":
@@ -143,5 +87,6 @@ def subscribe_to_extractions_tasks_queue(self):
     except Exception:
         pass
 
-    redis_tasks_processor = QueueProcessor()
-    redis_tasks_processor.subscribe_to_extractions_tasks_queue()
+    queues_names = QUEUES_NAMES.split(" ")
+    queue_processor = QueueProcessor(REDIS_HOST, REDIS_PORT, queues_names, service_logger)
+    queue_processor.start(process)
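
The wiring above is now the whole integration surface: the service passes the queue_processor package a host, a port, a list of queue names, and a logger, plus a process callback. Queue creation, reconnection, and the consume loop that the deleted subscribe_to_extractions_tasks_queue method implemented by hand are presumably handled inside the package. A minimal runnable sketch of that contract, using only the constructor and start() signatures visible in this diff (the handle callback and its message check are illustrative assumptions, not code from this PR):

import logging

from queue_processor.QueueProcessor import QueueProcessor


def handle(message: dict) -> str | None:
    # Mirrors process() above: return None when the message cannot be parsed,
    # otherwise return the JSON string to publish as the result.
    if "task" not in message:  # illustrative validation, not the real Task model
        return None
    return '{"success": true}'  # illustrative result payload


if __name__ == "__main__":
    # Arguments as used in the diff: host, port, queue names, logger.
    processor = QueueProcessor("localhost", 6379, ["segmentation"], logging.getLogger("worker"))
    processor.start(handle)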
4 changes: 1 addition & 3 deletions src/configuration.py
@@ -4,9 +4,7 @@
 from pathlib import Path
 import graypy
 
-SERVICE_NAME = "segmentation"
-TASK_QUEUE_NAME = SERVICE_NAME + "_tasks"
-RESULTS_QUEUE_NAME = SERVICE_NAME + "_results"
+QUEUES_NAMES = os.environ.get("QUEUES_NAMES", "segmentation")
 
 SERVICE_HOST = os.environ.get("SERVICE_HOST", "http://127.0.0.1")
 SERVICE_PORT = os.environ.get("SERVICE_PORT", "5051")
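
The per-purpose queue constants collapse into one QUEUES_NAMES variable, so a single deployment can consume several queues: the entry point splits the value on spaces. A small sketch of that parsing (the second queue name is a hypothetical example, not one configured anywhere in this PR):

import os

# Same parsing as the diff: space-separated names, defaulting to "segmentation".
queues_names = os.environ.get("QUEUES_NAMES", "segmentation").split(" ")

# e.g. QUEUES_NAMES="segmentation ocr" -> ["segmentation", "ocr"]
# ("ocr" is a hypothetical second queue used only for illustration)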
4 changes: 0 additions & 4 deletions src/test_end_to_end.py
@@ -10,15 +10,11 @@
 from data_model.ResultMessage import ResultMessage
 from data_model.Params import Params
 from data_model.Task import Task
-from delete_queues import delete_queues
 
 
 class TestEndToEnd(TestCase):
     service_url = "http://localhost:5051"
 
-    def setUp(self):
-        delete_queues()
-
     def test_error_file(self):
         tenant = "end_to_end_test_error"
         pdf_file_name = "error_pdf.pdf"
