diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..46e5ebd
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+__pycache__
+.devcontainer
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..d3a338a
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,5 @@
+FROM python:3.11
+WORKDIR /app
+COPY . /app
+RUN pip install -r requirements.txt
+CMD ["python", "app.py"]
diff --git a/app.py b/app.py
new file mode 100644
index 0000000..d699c45
--- /dev/null
+++ b/app.py
@@ -0,0 +1,32 @@
+from config import config
+from cbf import worker
+
+start_consuming = worker.init_app(
+    host=config.RABBITMQ.HOST,
+    port=config.RABBITMQ.PORT,
+    virtual_host=config.RABBITMQ.VIRTUAL_HOST,
+    user=config.RABBITMQ.USER,
+    password=config.RABBITMQ.PASSWORD,
+    qtd_consumers=config.QTD_CONSUMERS,
+)
+
+if config.DEMO:
+    wk = worker.worker.WorkerCBF(
+        connection=worker.get_connection(
+            host=config.RABBITMQ.HOST,
+            port=config.RABBITMQ.PORT,
+            virtual_host=config.RABBITMQ.VIRTUAL_HOST,
+            user=config.RABBITMQ.USER,
+            password=config.RABBITMQ.PASSWORD,
+        )
+    )
+    for partida in range(config.DEMO.inicio, config.DEMO.fim + 1):
+        wk.publish(
+            wk.QueryParams.new(
+                serie=config.DEMO.serie,
+                ano=config.DEMO.ano,
+                partida=partida,
+            )
+        )
+
+start_consuming(now=True)
diff --git a/cbf/__init__.py b/cbf/__init__.py
new file mode 100644
index 0000000..d53f3b5
--- /dev/null
+++ b/cbf/__init__.py
@@ -0,0 +1,60 @@
+from loguru import logger
+import re
+from requests import Session
+
+
+class CBF(Session):
+    """HTTP session that scrapes match report (sumula) PDF links from cbf.com.br."""
+
+    URL_BASE = "https://www.cbf.com.br"
+    URL_CAMP_BRASILEIRO_SERIE_FORMAT = (
+        URL_BASE + "/futebol-brasileiro/competicoes/"
+        "campeonato-brasileiro-serie-"
+        "{serie}/{ano}/{partida}"
+    )
+    URL_EXTERNAL_SUMULA_FORMAT = URL_BASE + "/external/sumula/{ano}/"
+
+    def request_get_serie(self, serie, ano, partida):
+        """GET the championship page for serie/ano/partida."""
+        msg = f"REQUEST GET SERIE! SERIE:{serie} ANO:{ano} PARTIDA:{partida}"
+        logger.debug(msg)
+        return self.get(
+            self.URL_CAMP_BRASILEIRO_SERIE_FORMAT.format(
+                serie=serie.lower(), ano=ano, partida=partida
+            )
+        )
+
+    def get_url_external(self, html, ano):
+        """Return the first sumula URL found in *html*, or None."""
+        url = self.URL_EXTERNAL_SUMULA_FORMAT.format(ano=ano)
+        for row in html.split("\n"):
+            if url in row:
+                # e.g. https://www.cbf.com.br/external/sumula/2023/123
+                match = re.search(f"{url}[0-9]+", row)
+                # BUGFIX: guard the match; ``.group()`` on None raised
+                # AttributeError when the base URL appeared without an id.
+                if match:
+                    return match.group()
+        return None
+
+    def get_url_sumula_external(self, serie, ano, partida):
+        """Fetch the match page and return its report PDF URL (or None)."""
+        logger.debug(
+            "PEGA O LINK DO PDF DE RELATORIO DA PARTIDA!"
+            f"SERIE: {serie} ANO: {ano} PARTIDA: {partida}"
+        )
+        response = self.request_get_serie(serie, ano, partida)
+
+        return self.get_url_external(response.text, ano)
+
+    def request_download_file(self, url):
+        """Download *url* and return the raw body bytes."""
+        logger.debug(f"FAZ O DOWNLOAD DE UM ARQUIVO! URL: {url}")
+        return self.get(url).content
+
+
+if __name__ == "__main__":
+    cbf = CBF()
+    url = cbf.get_url_sumula_external("A", 2021, 1)
+    file = cbf.request_download_file(url)
+    print(type(file))
diff --git a/cbf/worker/__init__.py b/cbf/worker/__init__.py
new file mode 100644
index 0000000..51c12f8
--- /dev/null
+++ b/cbf/worker/__init__.py
@@ -0,0 +1,26 @@
+from .worker import WorkerCBF, blocking_scheduler, get_connection
+
+from functools import partial
+
+# BUGFIX: app.py uses ``worker.get_connection`` — export it explicitly.
+__all__ = ["WorkerCBF", "blocking_scheduler", "get_connection", "init_app"]
+
+
+def init_app(host, port, virtual_host, user, password, qtd_consumers):
+    """Wire a WorkerCBF to RabbitMQ; return a ``start(now=...)`` callable."""
+    connection = get_connection(host, port, virtual_host, user, password)
+    worker = WorkerCBF(connection, qtd_consumers=qtd_consumers)
+    # Re-run the heartbeat every 5 minutes to keep consumers topped up.
+    blocking_scheduler.add_job(
+        worker.heartbeat,
+        trigger="interval",
+        seconds=60 * 5,
+    )
+    return partial(start, worker=worker)
+
+
+def start(now=False, worker=None):
+    """Start the blocking scheduler; with now=True run one heartbeat first."""
+    if now:
+        worker.heartbeat()
+    blocking_scheduler.start()
diff --git a/cbf/worker/worker.py b/cbf/worker/worker.py
new file mode 100644
index 0000000..ed033a8
--- /dev/null
+++ b/cbf/worker/worker.py
@@ -0,0
+1,241 @@
+import pika
+from pydantic import BaseModel
+from uuid import uuid4
+import json
+from apscheduler.schedulers.blocking import BlockingScheduler
+from pika.channel import Channel
+from multiprocessing import Process
+from loguru import logger
+import gzip
+from .. import CBF
+
+blocking_scheduler = BlockingScheduler(
+    timezone="America/Sao_Paulo",
+    daemon=False,
+)
+
+
+def get_connection(host, port, virtual_host, user, password):
+    """Open a blocking pika connection to RabbitMQ."""
+    parameters = pika.ConnectionParameters(
+        host=host,
+        port=port,
+        virtual_host=virtual_host,
+        credentials=pika.PlainCredentials(user, password),
+    )
+    connection = pika.BlockingConnection(parameters)
+    return connection
+
+
+class QueryParams(BaseModel):
+    """One match to download: id + serie + year + match number."""
+
+    id: str
+    serie: str
+    ano: int
+    partida: int
+
+    @classmethod
+    def new(cls, serie, ano, partida):
+        """Build params with a fresh uuid4 id."""
+        return cls(id=str(uuid4()), serie=serie, ano=ano, partida=partida)
+
+    def process(self):
+        """Serialize to a JSON string for publishing."""
+        body = self.model_dump()
+        body = json.dumps(body)
+        return body
+
+
+class WorkerCBF:
+    """RabbitMQ worker that downloads CBF match report PDFs."""
+
+    QUEUE_DOWNLOAD = "cbf.download.pdf"
+    SEND_EXCHANGE = "cbf.download.pdf.success"
+    BACKUP_SUCCSS = "cbf.download.pdf.backup"
+
+    # Time units in milliseconds (RabbitMQ TTLs are expressed in ms).
+    SEGUNDO = 1000
+    MINUTO = SEGUNDO * 60
+    HORA = MINUTO * 60
+    DIA = HORA * 24
+    DIAS = DIA * 1
+    qtd_consumers = 0
+
+    BACKUP_SUCCSS_TTL = DIAS * 1
+
+    QueryParams = QueryParams
+
+    def __init__(self, connection, channel: Channel = None, qtd_consumers=0):
+        self.connection = connection
+        self.channel = channel or connection.channel()
+        self.qtd_consumers = qtd_consumers
+
+    class SuccessParams(BaseModel):
+        """Download result: the original params plus the PDF bytes."""
+
+        queue_params: QueryParams
+        file: bytes
+
+        def process(self):
+            # latin1 round-trips arbitrary bytes through str for JSON.
+            body = self.model_dump()
+            body["file"] = gzip.compress(self.file).decode("latin1")
+            body = json.dumps(body)
+            return gzip.compress(body.encode("latin1")).decode("latin1")
+
+    @classmethod
+    def _declare_queue(
+        cls,
+        channel,
+        queue,
+        create_dql=True,
+        args=None,
+        delete=False,
+    ):
+        """Declare *queue* (durable), optionally backed by a DLQ."""
+        if not args:
+            args = {}
+        ch = channel.connection.channel()
+        if create_dql:
+            queue_dlq = ch.queue_declare(
+                queue=f"{queue}.dlq",
+                durable=True,
+            )
+            args.update(
+                {
+                    "x-dead-letter-exchange": "",
+                    "x-dead-letter-routing-key": queue_dlq.method.queue,
+                }
+            )
+        try:
+            return ch.queue_declare(
+                queue=queue,
+                durable=True,
+                arguments=args,
+            )
+        except Exception as e:
+            # Declare conflict: with delete=True drop and re-declare;
+            # otherwise the error is logged and None is returned.
+            logger.error(e)
+            if delete:
+                ch = channel.connection.channel()
+                channel.queue_delete(queue=queue)
+                return ch.queue_declare(
+                    queue=queue,
+                    durable=True,
+                    arguments=args,
+                )
+
+    def _declare_exchange(self, channel, exchange):
+        """Declare the durable fanout exchange used for results."""
+        return channel.exchange_declare(
+            exchange=exchange,
+            durable=True,
+            exchange_type="fanout",
+        )
+
+    @classmethod
+    def _publish(cls, channel, queue, body):
+        """Publish ``body.process()`` to *queue* as a persistent message."""
+        body = body.process()
+        channel.basic_publish(
+            exchange="",
+            routing_key=queue,
+            body=body,
+            properties=pika.BasicProperties(delivery_mode=2),
+        )
+
+    def declare(self):
+        """Declare download queue, result exchange and TTL'd backup queue."""
+        channel = self.connection.channel()
+        queue = self._declare_queue(channel, self.QUEUE_DOWNLOAD)
+        self._declare_exchange(channel, self.SEND_EXCHANGE)
+        queue_backup = self._declare_queue(
+            channel,
+            self.BACKUP_SUCCSS,
+            create_dql=False,
+            args={
+                "x-message-ttl": self.BACKUP_SUCCSS_TTL,
+            },
+            delete=True,
+        )
+        channel.queue_bind(
+            exchange=self.SEND_EXCHANGE,
+            queue=queue_backup.method.queue,
+        )
+        return queue
+
+    def publish(self, body: QueryParams):
+        """Enqueue one download request."""
+        if not isinstance(body, self.QueryParams):
+            raise TypeError("body must be a pydantic BaseModel")
+        self._publish(self.channel, self.QUEUE_DOWNLOAD, body)
+
+    def heartbeat(self):
+        """Spawn consumers while there is backlog and spare capacity."""
+        logger.debug("HEARTBEAT")
+        queue = self.declare()
+        mensagens = queue.method.message_count
+        qtd_consumers = queue.method.consumer_count
+        diff = self.qtd_consumers - qtd_consumers
+        logger.debug(f"mensagens: {mensagens} qtd_consumers: {qtd_consumers}")
+        logger.debug(f"diff: {diff}")
+        if mensagens > 0 and diff > 0:
+            for _ in range(diff):
+                self.new_consumer()
+
+    def new_consumer(self):
+        """Start one consumer in a child process."""
+        Process(target=self.consumer, args=(self.connection,)).start()
+
+    @classmethod
+    def consumer(cls, connection):
+        """Process entry point: consume QUEUE_DOWNLOAD one message at a time."""
+        worker = cls(connection)
+        channel: Channel = worker.channel
+        worker.declare()
+        channel.basic_qos(prefetch_count=1)
+        channel.basic_consume(
+            queue=worker.QUEUE_DOWNLOAD,
+            on_message_callback=worker.callback,
+            auto_ack=False,
+        )
+        channel.start_consuming()
+
+    def callback(self, ch, method, properties, body):
+        """Handle one message: parse, download the PDF, publish the result."""
+        try:
+            body = json.loads(body)
+            params = self.QueryParams(**body)
+        except Exception as e:
+            logger.error(e)
+            ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
+            # BUGFIX: stop here; the old code fell through to a NameError
+            # on ``params`` and settled the same delivery a second time.
+            return
+
+        cbf = CBF()
+        url = cbf.get_url_sumula_external(
+            params.serie,
+            params.ano,
+            params.partida,
+        )
+        file = cbf.request_download_file(url)
+
+        self.send_to_success(ch, params, file)
+
+        ch.basic_ack(delivery_tag=method.delivery_tag)
+
+    def send_to_success(self, ch, query_params: QueryParams, file: bytes):
+        """Fan the downloaded file out on SEND_EXCHANGE (persistent)."""
+        body = self.SuccessParams(
+            queue_params=query_params,
+            file=file,
+        )
+
+        ch.basic_publish(
+            exchange=self.SEND_EXCHANGE,
+            routing_key="",
+            body=body.process(),
+            properties=pika.BasicProperties(delivery_mode=2),
+        )
diff --git a/config.py b/config.py
new file mode 100644
index 0000000..01c0777
--- /dev/null
+++ b/config.py
@@ -0,0 +1,38 @@
+import os
+from typing import Optional
+
+from feiticeiro_tec.config.with_dynaconf import TypeEnv
+from pydantic import BaseModel
+
+
+class RabbitMQ(BaseModel):
+    """Broker connection settings (see settings.yml)."""
+
+    HOST: str
+    PORT: int
+    VIRTUAL_HOST: str
+    USER: str
+    PASSWORD: str
+
+
+class DemoColeta(BaseModel):
+    """Demo range: publish matches ``inicio``..``fim`` of serie/ano."""
+
+    serie: str
+    ano: int
+    inicio: int = 1
+    fim: int = 1
+
+
+class Config(TypeEnv):
+    RABBITMQ: RabbitMQ
+    QTD_CONSUMERS: int
+    # BUGFIX: DEMO defaults to None, so the annotation must allow None.
+    DEMO: Optional[DemoColeta] = None
+
+
+config = Config.load_env(
+    default="DEFAULT",
+    env=os.environ.get("ENV", "DEV"),
+    settings_files=["settings.yml"],
+)
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..6d48bff
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,17 @@
+annotated-types==0.6.0
+APScheduler==3.10.4
+certifi==2023.11.17
+charset-normalizer==3.3.2
+dynaconf==3.2.4
+feiticeiro_tec==0.4.0
+idna==3.6
+loguru==0.7.2 +pika==1.3.2 +pydantic==2.5.3 +pydantic_core==2.14.6 +pytz==2023.3.post1 +requests==2.31.0 +six==1.16.0 +typing_extensions==4.9.0 +tzlocal==5.2 +urllib3==2.1.0 diff --git a/settings.yml b/settings.yml new file mode 100644 index 0000000..7aca4d8 --- /dev/null +++ b/settings.yml @@ -0,0 +1,21 @@ +DEFAULT: + QTD_CONSUMERS: 1 + RABBITMQ: + HOST: "localhost" + PORT: 5672 + VIRTUAL_HOST: "/" + USER: "guest" + PASSWORD: "guest" + +DEV: + DEMO: + serie: "A" + ano: 2023 + inicio: 1 + fim: 100 + RABBITMQ: + HOST: "172.17.0.5" + PORT: 5672 + VIRTUAL_HOST: "/" + USER: "guest" + PASSWORD: "guest"