diff --git a/pipelines/rj_smtr/constants.py b/pipelines/rj_smtr/constants.py index cf2348936..a6a424597 100644 --- a/pipelines/rj_smtr/constants.py +++ b/pipelines/rj_smtr/constants.py @@ -1510,6 +1510,7 @@ class constants(Enum): # pylint: disable=c0103 # INFRAÇÃO SPPO_INFRACAO_URL = "https://siurblab.rio.rj.gov.br/SMTR/Multas/multas.txt" + SPPO_INFRACAO_MAPPING_KEYS = { "permissao": "permissao", "modal": "modo", diff --git a/pipelines/rj_smtr/veiculo/flows.py b/pipelines/rj_smtr/veiculo/flows.py index 95cfc721a..46e2a20b6 100644 --- a/pipelines/rj_smtr/veiculo/flows.py +++ b/pipelines/rj_smtr/veiculo/flows.py @@ -35,7 +35,6 @@ create_date_hour_partition, create_local_partition_path, get_current_timestamp, - get_raw, parse_timestamp_to_string, save_raw_local, save_treated_local, @@ -48,6 +47,7 @@ ) from pipelines.rj_smtr.veiculo.tasks import ( + get_raw_ftp, pre_treatment_sppo_licenciamento, pre_treatment_sppo_infracao, get_veiculo_raw_storage, @@ -97,10 +97,11 @@ csv_args=constants.SPPO_LICENCIAMENTO_CSV_ARGS.value, ) - raw_status_url = get_raw( - url=constants.SPPO_LICENCIAMENTO_URL.value, + raw_status_url = get_raw_ftp( + ftp_path="LICENCIAMENTO/CadastrodeVeiculos", filetype="txt", csv_args=constants.SPPO_LICENCIAMENTO_CSV_ARGS.value, + timestamp=timestamp, ) ifelse(get_from_storage.is_equal(True), raw_status_gcs, raw_status_url) @@ -140,7 +141,7 @@ image=emd_constants.DOCKER_IMAGE.value, labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value], ) -# sppo_licenciamento_captura.schedule = every_day_hour_seven +sppo_licenciamento_captura.schedule = every_day_hour_seven with Flow( f"SMTR: {constants.VEICULO_DATASET_ID.value} {constants.SPPO_INFRACAO_TABLE_ID.value} - Captura", @@ -178,10 +179,11 @@ timestamp=timestamp, csv_args=constants.SPPO_INFRACAO_CSV_ARGS.value, ) - raw_status_url = get_raw( - url=constants.SPPO_INFRACAO_URL.value, + raw_status_url = get_raw_ftp( + ftp_path="MULTAS/MULTAS", filetype="txt", csv_args=constants.SPPO_INFRACAO_CSV_ARGS.value, + timestamp=timestamp, ) ifelse(get_from_storage.is_equal(True), raw_status_gcs, raw_status_url) @@ -218,7 +220,7 @@ image=emd_constants.DOCKER_IMAGE.value, labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value], ) -# sppo_infracao_captura.schedule = every_day_hour_seven +sppo_infracao_captura.schedule = every_day_hour_seven # flake8: noqa: E501 with Flow( diff --git a/pipelines/rj_smtr/veiculo/tasks.py b/pipelines/rj_smtr/veiculo/tasks.py index 06b09e82d..74f07ca57 100644 --- a/pipelines/rj_smtr/veiculo/tasks.py +++ b/pipelines/rj_smtr/veiculo/tasks.py @@ -19,6 +19,7 @@ from pipelines.rj_smtr.constants import constants from pipelines.rj_smtr.utils import ( + connect_ftp, data_info_str, filter_data, ) @@ -243,3 +244,51 @@ def pre_treatment_sppo_infracao(status: dict, timestamp: datetime): error = exp return {"data": data, "error": error} + + +@task +def get_raw_ftp( + ftp_path: str, + filetype: str, + csv_args: dict, + timestamp: datetime, +): + """ + Retrieves raw data from an FTP server. + + Args: + ftp_path (str): The path to the file on the FTP server. + filetype (str): The file extension of the raw data file. + csv_args (dict): Additional arguments to be passed to the `pd.read_csv` function. + timestamp (datetime): The timestamp used to construct the file name. + + Returns: + dict: A dictionary containing the retrieved data and any error messages. + The 'data' key holds the retrieved data as a list of dictionaries. + The 'error' key holds any error message encountered during the retrieval process. + """ + data = None + error = None + try: + if filetype in ("csv", "txt"): + ftp_client = connect_ftp(constants.RDO_FTPS_SECRET_PATH.value) + data = io.BytesIO() + ftp_client.retrbinary( + f"RETR {ftp_path}_{timestamp.strftime('%Y%m%d')}.{filetype}", + data.write, + ) + data.seek(0) + data = pd.read_csv( + io.StringIO(data.read().decode("utf-8")), + **csv_args, + ).to_dict(orient="records") + ftp_client.quit() + else: + error = "Unsupported raw file extension. Supported only: csv and txt" + + except Exception: + error = traceback.format_exc() + data = None + log(f"[CATCHED] Task failed with error: \n{error}", level="error") + + return {"data": data, "error": error}