Commit 5f827ca

Merge pull request #1 from feiticeiro-tec/development

Development

feiticeiro-tec authored Jan 15, 2024
2 parents 1391e23 + 11136a3 commit 5f827ca
Showing 9 changed files with 397 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -0,0 +1,2 @@
__pycache__
.devcontainer
5 changes: 5 additions & 0 deletions Dockerfile
@@ -0,0 +1,5 @@
FROM python:3.11
WORKDIR /app
COPY . /app
RUN pip install -r requirements.txt
CMD ["python", "app.py"]
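
To try the image locally, a typical sequence would be docker build -t cbf-worker . followed by docker run --rm -e ENV=DEV cbf-worker; the image tag here is an arbitrary choice, and ENV is the variable config.py reads (defaulting to DEV).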
32 changes: 32 additions & 0 deletions app.py
@@ -0,0 +1,32 @@
from config import config
from cbf import worker

start_consuming = worker.init_app(
host=config.RABBITMQ.HOST,
port=config.RABBITMQ.PORT,
virtual_host=config.RABBITMQ.VIRTUAL_HOST,
user=config.RABBITMQ.USER,
password=config.RABBITMQ.PASSWORD,
qtd_consumers=config.QTD_CONSUMERS,
)

if config.DEMO:
wk = worker.worker.WorkerCBF(
connection=worker.get_connection(
host=config.RABBITMQ.HOST,
port=config.RABBITMQ.PORT,
virtual_host=config.RABBITMQ.VIRTUAL_HOST,
user=config.RABBITMQ.USER,
password=config.RABBITMQ.PASSWORD,
)
)
for partida in range(config.DEMO.inicio, config.DEMO.fim + 1):
wk.publish(
wk.QueryParams.new(
serie=config.DEMO.serie,
ano=config.DEMO.ano,
partida=partida,
)
)

start_consuming(now=True)
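
For one-off jobs, the same wiring app.py uses for the DEMO block can be driven from a Python shell; a minimal sketch, with placeholder serie/ano/partida values:

from config import config
from cbf import worker

wk = worker.WorkerCBF(
    connection=worker.get_connection(
        host=config.RABBITMQ.HOST,
        port=config.RABBITMQ.PORT,
        virtual_host=config.RABBITMQ.VIRTUAL_HOST,
        user=config.RABBITMQ.USER,
        password=config.RABBITMQ.PASSWORD,
    )
)
# queue a single match for download (values are placeholders)
wk.publish(wk.QueryParams.new(serie="a", ano=2023, partida=7))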
51 changes: 51 additions & 0 deletions cbf/__init__.py
@@ -0,0 +1,51 @@
from loguru import logger
import re
from requests import Session


class CBF(Session):
URL_BASE = "https://www.cbf.com.br"
URL_CAMP_BRASILEIRO_SERIE_FORMAT = (
URL_BASE + "/futebol-brasileiro/competicoes/"
"campeonato-brasileiro-serie-"
"{serie}/{ano}/{partida}"
)
URL_EXTERNAL_SUMULA_FORMAT = URL_BASE + "/external/sumula/{ano}/"

def request_get_serie(self, serie, ano, partida):
msg = f"REQUEST GET SERIE! SERIE:{serie} ANO:{ano} PARTIDA:{partida}"
logger.debug(msg)
return self.get(
self.URL_CAMP_BRASILEIRO_SERIE_FORMAT.format(
serie=serie.lower(), ano=ano, partida=partida
)
)

    def get_url_external(self, html, ano):
        """Scans the page HTML for the external sumula link; returns None if absent."""
        url = self.URL_EXTERNAL_SUMULA_FORMAT.format(ano=ano)
        for row in html.split("\n"):
            if url in row:
                # e.g. https://www.cbf.com.br/external/sumula/2023/[0-9]+
                match = re.search(f"{url}[0-9]+", row)
                if match:
                    return match.group()

    def get_url_sumula_external(self, serie, ano, partida):
        """Fetches the link to the match report (sumula) PDF."""
        logger.debug(
            "FETCHING THE MATCH REPORT PDF LINK! "
            f"SERIE: {serie} ANO: {ano} PARTIDA: {partida}"
        )
response = self.request_get_serie(serie, ano, partida)

return self.get_url_external(response.text, ano)

    def request_download_file(self, url):
        logger.debug(f"DOWNLOADING A FILE! URL: {url}")
        return self.get(url).content


if __name__ == "__main__":
cbf = CBF()
url = cbf.get_url_sumula_external("A", 2021, 1)
file = cbf.request_download_file(url)
print(type(file))
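
A minimal end-to-end sketch of the class above: resolve the report URL for one match and write the PDF to disk (the output filename is an arbitrary choice):

from cbf import CBF

cbf = CBF()
url = cbf.get_url_sumula_external("A", 2023, 1)
if url:  # get_url_external returns None when no report link is found
    with open("sumula_a_2023_1.pdf", "wb") as fp:
        fp.write(cbf.request_download_file(url))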
22 changes: 22 additions & 0 deletions cbf/worker/__init__.py
@@ -0,0 +1,22 @@
from .worker import WorkerCBF, blocking_scheduler, get_connection

from functools import partial

__all__ = ["WorkerCBF", "blocking_scheduler", "get_connection", "init_app", "start"]


def init_app(host, port, virtual_host, user, password, qtd_consumers):
    connection = get_connection(host, port, virtual_host, user, password)
    worker = WorkerCBF(connection, qtd_consumers=qtd_consumers)
    # re-check the queue and spawn any missing consumers every 5 minutes
    blocking_scheduler.add_job(
        worker.heartbeat,
        trigger="interval",
        seconds=60 * 5,
    )
    return partial(start, worker=worker)


def start(now=False, worker=None):
if now:
worker.heartbeat()
blocking_scheduler.start()
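
For reference, the same wiring done by hand, without init_app; connection values are placeholders:

from cbf.worker import WorkerCBF, blocking_scheduler, get_connection

conn = get_connection("localhost", 5672, "/", "guest", "guest")
wk = WorkerCBF(conn, qtd_consumers=2)
blocking_scheduler.add_job(wk.heartbeat, trigger="interval", seconds=60 * 5)
wk.heartbeat()               # same effect as start(now=True)
blocking_scheduler.start()   # blocks the main thread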
216 changes: 216 additions & 0 deletions cbf/worker/worker.py
@@ -0,0 +1,216 @@
import pika
from pydantic import BaseModel
from uuid import uuid4
import json
from apscheduler.schedulers.blocking import BlockingScheduler
from pika.channel import Channel
from multiprocessing import Process
from loguru import logger
import gzip
from .. import CBF

blocking_scheduler = BlockingScheduler(
timezone="America/Sao_Paulo",
daemon=False,
)


def get_connection(host, port, virtual_host, user, password):
parameters = pika.ConnectionParameters(
host=host,
port=port,
virtual_host=virtual_host,
credentials=pika.PlainCredentials(user, password),
)
connection = pika.BlockingConnection(parameters)
return connection


class QueryParams(BaseModel):
id: str
serie: str
ano: int
partida: int

@classmethod
def new(cls, serie, ano, partida):
return cls(id=str(uuid4()), serie=serie, ano=ano, partida=partida)

def process(self):
body = self.model_dump()
body = json.dumps(body)
return body


class WorkerCBF:
    QUEUE_DOWNLOAD = "cbf.download.pdf"
    SEND_EXCHANGE = "cbf.download.pdf.success"
    BACKUP_SUCCESS = "cbf.download.pdf.backup"

    # time units in milliseconds (RabbitMQ message TTLs are expressed in ms)
    SEGUNDO = 1000
    MINUTO = SEGUNDO * 60
    HORA = MINUTO * 60
    DIA = HORA * 24
    DIAS = DIA * 1
    qtd_consumers = 0

    BACKUP_SUCCESS_TTL = DIAS * 1

    QueryParams = QueryParams

def __init__(self, connection, channel: Channel = None, qtd_consumers=0):
self.connection = connection
self.channel = channel or connection.channel()
self.qtd_consumers = qtd_consumers

    class SuccessParams(BaseModel):
        queue_params: QueryParams
        file: bytes

        def process(self):
            body = self.model_dump()
            # bytes are not JSON-serializable: gzip the PDF and carry it as
            # latin1 text, then gzip the whole JSON payload the same way
            body["file"] = gzip.compress(self.file).decode("latin1")
            body = json.dumps(body)
            return gzip.compress(body.encode("latin1")).decode("latin1")

    @classmethod
    def _declare_queue(
        cls,
        channel,
        queue,
        create_dlq=True,
        args=None,
        delete=False,
    ):
        if not args:
            args = {}
        ch = channel.connection.channel()
        if create_dlq:
            # declare a companion dead-letter queue and route rejects to it
            queue_dlq = ch.queue_declare(
                queue=f"{queue}.dlq",
                durable=True,
            )
            args.update(
                {
                    "x-dead-letter-exchange": "",
                    "x-dead-letter-routing-key": queue_dlq.method.queue,
                }
            )
        try:
            return ch.queue_declare(
                queue=queue,
                durable=True,
                arguments=args,
            )
        except Exception as e:
            # a declare with mismatched arguments closes the channel;
            # optionally drop and recreate the queue on a fresh channel
            logger.error(e)
            if delete:
                ch = channel.connection.channel()
                ch.queue_delete(queue=queue)
                return ch.queue_declare(
                    queue=queue,
                    durable=True,
                    arguments=args,
                )

def _declare_exchange(self, channel, exchange):
return channel.exchange_declare(
exchange=exchange,
durable=True,
exchange_type="fanout",
)

@classmethod
def _publish(cls, channel, queue, body):
body = body.process()
channel.basic_publish(
exchange="",
routing_key=queue,
body=body,
properties=pika.BasicProperties(delivery_mode=2),
)

def declare(self):
channel = self.connection.channel()
queue = self._declare_queue(channel, self.QUEUE_DOWNLOAD)
self._declare_exchange(channel, self.SEND_EXCHANGE)
        queue_backup = self._declare_queue(
            channel,
            self.BACKUP_SUCCESS,
            create_dlq=False,
            args={
                "x-message-ttl": self.BACKUP_SUCCESS_TTL,
            },
            delete=True,
        )
channel.queue_bind(
exchange=self.SEND_EXCHANGE,
queue=queue_backup.method.queue,
)
return queue

def publish(self, body: QueryParams):
        if not isinstance(body, self.QueryParams):
            raise TypeError("body must be a QueryParams instance")
self._publish(self.channel, self.QUEUE_DOWNLOAD, body)

def heartbeat(self):
logger.debug("HEARTBEAT")
queue = self.declare()
mensagens = queue.method.message_count
qtd_consumers = queue.method.consumer_count
diff = self.qtd_consumers - qtd_consumers
logger.debug(f"mensagens: {mensagens} qtd_consumers: {qtd_consumers}")
logger.debug(f"diff: {diff}")
if mensagens > 0 and diff > 0:
for _ in range(diff):
self.new_consumer()

def new_consumer(self):
Process(target=self.consumer, args=(self.connection,)).start()

@classmethod
def consumer(cls, connection):
worker = cls(connection)
channel: Channel = worker.channel
worker.declare()
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
queue=worker.QUEUE_DOWNLOAD,
on_message_callback=worker.callback,
auto_ack=False,
)
channel.start_consuming()

    def callback(self, ch, method, properties, body):
        try:
            body = json.loads(body)
            params = self.QueryParams(**body)
        except Exception as e:
            logger.error(e)
            # malformed message: dead-letter it and stop processing
            ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
            return

cbf = CBF()
url = cbf.get_url_sumula_external(
params.serie,
params.ano,
params.partida,
)
file = cbf.request_download_file(url)

self.send_to_success(ch, params, file)

ch.basic_ack(delivery_tag=method.delivery_tag)

def send_to_success(self, ch, query_params: QueryParams, file: bytes):
body = self.SuccessParams(
queue_params=query_params,
file=file,
)

ch.basic_publish(
exchange=self.SEND_EXCHANGE,
routing_key="",
body=body.process(),
properties=pika.BasicProperties(delivery_mode=2),
)
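
A consumer bound to SEND_EXCHANGE has to reverse SuccessParams.process. A minimal decoding sketch, assuming pika encoded the published str body as UTF-8 on the wire:

import gzip
import json

def decode_success(raw: bytes) -> dict:
    # undo the UTF-8 transport encoding, then the latin1 text mapping,
    # to recover the outer gzip stream (assumption: str bodies go out as UTF-8)
    payload = gzip.decompress(raw.decode("utf-8").encode("latin1"))
    message = json.loads(payload)
    # the PDF is a second gzip layer, also carried as latin1 text
    message["file"] = gzip.decompress(message["file"].encode("latin1"))
    return message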
31 changes: 31 additions & 0 deletions config.py
@@ -0,0 +1,31 @@
import os
from typing import Optional

from feiticeiro_tec.config.with_dynaconf import TypeEnv
from pydantic import BaseModel


class RabbitMQ(BaseModel):
HOST: str
PORT: int
VIRTUAL_HOST: str
USER: str
PASSWORD: str


class DemoColeta(BaseModel):
serie: str
ano: int
inicio: int = 1
fim: int = 1


class Config(TypeEnv):
    RABBITMQ: RabbitMQ
    QTD_CONSUMERS: int
    DEMO: Optional[DemoColeta] = None


config = Config.load_env(
default="DEFAULT",
env=os.environ.get("ENV", "DEV"),
settings_files=["settings.yml"],
)
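
Config.load_env comes from the feiticeiro_tec dynaconf wrapper; assuming standard dynaconf environment layering, a settings.yml shaped like the following would satisfy the models above (all values are placeholders):

DEFAULT:
  QTD_CONSUMERS: 2
  RABBITMQ:
    HOST: localhost
    PORT: 5672
    VIRTUAL_HOST: /
    USER: guest
    PASSWORD: guest
DEV:
  DEMO:
    serie: a
    ano: 2023
    inicio: 1
    fim: 5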
17 changes: 17 additions & 0 deletions requirements.txt
@@ -0,0 +1,17 @@
annotated-types==0.6.0
APScheduler==3.10.4
certifi==2023.11.17
charset-normalizer==3.3.2
dynaconf==3.2.4
feiticeiro_tec==0.4.0
idna==3.6
loguru==0.7.2
pika==1.3.2
pydantic==2.5.3
pydantic_core==2.14.6
pytz==2023.3.post1
requests==2.31.0
six==1.16.0
typing_extensions==4.9.0
tzlocal==5.2
urllib3==2.1.0