Skip to content

Commit

Permalink
Merge branch 'master' into staging/fix_1746_kubernetes_flow_limits
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored May 17, 2024
2 parents 81a40ea + c870c3d commit b3c9236
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 7 deletions.
1 change: 1 addition & 0 deletions pipelines/rj_smtr/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -1510,6 +1510,7 @@ class constants(Enum): # pylint: disable=c0103

# INFRAÇÃO
SPPO_INFRACAO_URL = "https://siurblab.rio.rj.gov.br/SMTR/Multas/multas.txt"

SPPO_INFRACAO_MAPPING_KEYS = {
"permissao": "permissao",
"modal": "modo",
Expand Down
16 changes: 9 additions & 7 deletions pipelines/rj_smtr/veiculo/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
create_date_hour_partition,
create_local_partition_path,
get_current_timestamp,
get_raw,
parse_timestamp_to_string,
save_raw_local,
save_treated_local,
Expand All @@ -48,6 +47,7 @@
)

from pipelines.rj_smtr.veiculo.tasks import (
get_raw_ftp,
pre_treatment_sppo_licenciamento,
pre_treatment_sppo_infracao,
get_veiculo_raw_storage,
Expand Down Expand Up @@ -97,10 +97,11 @@
csv_args=constants.SPPO_LICENCIAMENTO_CSV_ARGS.value,
)

raw_status_url = get_raw(
url=constants.SPPO_LICENCIAMENTO_URL.value,
raw_status_url = get_raw_ftp(
ftp_path="LICENCIAMENTO/CadastrodeVeiculos",
filetype="txt",
csv_args=constants.SPPO_LICENCIAMENTO_CSV_ARGS.value,
timestamp=timestamp,
)

ifelse(get_from_storage.is_equal(True), raw_status_gcs, raw_status_url)
Expand Down Expand Up @@ -140,7 +141,7 @@
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)
# sppo_licenciamento_captura.schedule = every_day_hour_seven
sppo_licenciamento_captura.schedule = every_day_hour_seven

with Flow(
f"SMTR: {constants.VEICULO_DATASET_ID.value} {constants.SPPO_INFRACAO_TABLE_ID.value} - Captura",
Expand Down Expand Up @@ -178,10 +179,11 @@
timestamp=timestamp,
csv_args=constants.SPPO_INFRACAO_CSV_ARGS.value,
)
raw_status_url = get_raw(
url=constants.SPPO_INFRACAO_URL.value,
raw_status_url = get_raw_ftp(
ftp_path="MULTAS/MULTAS",
filetype="txt",
csv_args=constants.SPPO_INFRACAO_CSV_ARGS.value,
timestamp=timestamp,
)
ifelse(get_from_storage.is_equal(True), raw_status_gcs, raw_status_url)

Expand Down Expand Up @@ -218,7 +220,7 @@
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)
# sppo_infracao_captura.schedule = every_day_hour_seven
sppo_infracao_captura.schedule = every_day_hour_seven

# flake8: noqa: E501
with Flow(
Expand Down
49 changes: 49 additions & 0 deletions pipelines/rj_smtr/veiculo/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from pipelines.rj_smtr.constants import constants
from pipelines.rj_smtr.utils import (
connect_ftp,
data_info_str,
filter_data,
)
Expand Down Expand Up @@ -243,3 +244,51 @@ def pre_treatment_sppo_infracao(status: dict, timestamp: datetime):
error = exp

return {"data": data, "error": error}


@task
def get_raw_ftp(
ftp_path: str,
filetype: str,
csv_args: dict,
timestamp: datetime,
):
"""
Retrieves raw data from an FTP server.
Args:
ftp_path (str): The path to the file on the FTP server.
filetype (str): The file extension of the raw data file.
csv_args (dict): Additional arguments to be passed to the `pd.read_csv` function.
timestamp (datetime): The timestamp used to construct the file name.
Returns:
dict: A dictionary containing the retrieved data and any error messages.
The 'data' key holds the retrieved data as a list of dictionaries.
The 'error' key holds any error message encountered during the retrieval process.
"""
data = None
error = None
try:
if filetype in ("csv", "txt"):
ftp_client = connect_ftp(constants.RDO_FTPS_SECRET_PATH.value)
data = io.BytesIO()
ftp_client.retrbinary(
f"RETR {ftp_path}_{timestamp.strftime('%Y%m%d')}.{filetype}",
data.write,
)
data.seek(0)
data = pd.read_csv(
io.StringIO(data.read().decode("utf-8")),
**csv_args,
).to_dict(orient="records")
ftp_client.quit()
else:
error = "Unsupported raw file extension. Supported only: csv and txt"

except Exception:
error = traceback.format_exc()
data = None
log(f"[CATCHED] Task failed with error: \n{error}", level="error")

return {"data": data, "error": error}

0 comments on commit b3c9236

Please sign in to comment.