diff --git a/pipelines/rj_cor/__init__.py b/pipelines/rj_cor/__init__.py
index 34ffc0e76..711a915fe 100644
--- a/pipelines/rj_cor/__init__.py
+++ b/pipelines/rj_cor/__init__.py
@@ -7,6 +7,7 @@
 from pipelines.rj_cor.meteorologia.meteorologia_redemet.flows import *
 from pipelines.rj_cor.meteorologia.precipitacao_alertario.flows import *
 from pipelines.rj_cor.meteorologia.precipitacao_cemaden.flows import *
+from pipelines.rj_cor.meteorologia.precipitacao_inea.flows import *
 from pipelines.rj_cor.meteorologia.satelite.flows import *
 from pipelines.rj_cor.meteorologia.precipitacao_websirene.flows import *
 from pipelines.rj_cor.meteorologia.radar.precipitacao.flows import *
diff --git a/pipelines/rj_cor/meteorologia/precipitacao_inea/flows.py b/pipelines/rj_cor/meteorologia/precipitacao_inea/flows.py
new file mode 100644
index 000000000..19307c5b4
--- /dev/null
+++ b/pipelines/rj_cor/meteorologia/precipitacao_inea/flows.py
@@ -0,0 +1,240 @@
+# -*- coding: utf-8 -*-
+# pylint: disable=C0103
+"""
+Flows for precipitacao_inea.
+"""
+from datetime import timedelta
+
+from prefect import case, Parameter
+from prefect.run_configs import KubernetesRun
+from prefect.storage import GCS
+from prefect.tasks.prefect import create_flow_run
+
+from pipelines.constants import constants
+from pipelines.utils.constants import constants as utils_constants
+from pipelines.utils.custom import wait_for_flow_run_with_timeout
+from pipelines.rj_cor.meteorologia.precipitacao_inea.tasks import (
+    check_for_new_stations,
+    check_new_data,
+    download_data,
+    treat_data,
+    save_data,
+    wait_task,
+)
+from pipelines.rj_cor.meteorologia.precipitacao_inea.schedules import (
+    minute_schedule,
+)
+from pipelines.utils.decorators import Flow
+from pipelines.utils.dump_db.constants import constants as dump_db_constants
+from pipelines.utils.dump_to_gcs.constants import constants as dump_to_gcs_constants
+from pipelines.utils.tasks import (
+    create_table_and_upload_to_gcs,
+    get_current_flow_labels,
+)
+
+wait_for_flow_run_with_2min_timeout = wait_for_flow_run_with_timeout(
+    timeout=timedelta(minutes=2)
+)
+
+with Flow(
+    name="COR: Meteorologia - Precipitacao e Fluviometria INEA",
+    code_owners=[
+        "paty",
+    ],
+    # skip_if_running=True,
+) as cor_meteorologia_precipitacao_inea:
+    DUMP_MODE = Parameter("dump_mode", default="append", required=True)
+    DATASET_ID_PLUVIOMETRIC = Parameter(
+        "dataset_id_pluviometric", default="clima_pluviometro", required=True
+    )
+    TABLE_ID_PLUVIOMETRIC = Parameter(
+        "table_id_pluviometric", default="taxa_precipitacao_inea", required=True
+    )
+    DATASET_ID_FLUVIOMETRIC = Parameter(
+        "dataset_id_fluviometric", default="clima_fluviometro", required=True
+    )
+    TABLE_ID_FLUVIOMETRIC = Parameter(
+        "table_id_fluviometric", default="lamina_agua_inea", required=True
+    )
+
+    # Materialization parameters
+    MATERIALIZE_AFTER_DUMP = Parameter(
+        "materialize_after_dump", default=True, required=False
+    )
+    MATERIALIZE_TO_DATARIO = Parameter(
+        "materialize_to_datario", default=True, required=False
+    )
+    MATERIALIZATION_MODE = Parameter("mode", default="prod", required=False)
+
+    # Dump to GCS after? Should only dump to GCS if materializing to datario
+    DUMP_TO_GCS = Parameter("dump_to_gcs", default=False, required=False)
+
+    MAXIMUM_BYTES_PROCESSED = Parameter(
+        "maximum_bytes_processed",
+        required=False,
+        default=dump_to_gcs_constants.MAX_BYTES_PROCESSED_PER_TABLE.value,
+    )
+
+    dataframe = download_data()
+    dfr_pluviometric, dfr_fluviometric = treat_data(
+        dataframe=dataframe,
+        dataset_id=DATASET_ID_PLUVIOMETRIC,
+        table_id=TABLE_ID_PLUVIOMETRIC,
+        mode=MATERIALIZATION_MODE,
+    )
+    new_pluviometric_data, new_fluviometric_data = check_new_data(
+        dfr_pluviometric, dfr_fluviometric
+    )
+
+    with case(new_pluviometric_data, True):
+        path_pluviometric = save_data(
+            dataframe=dfr_pluviometric, folder_name="pluviometer"
+        )
+
+        # Create pluviometric table in BigQuery
+        UPLOAD_TABLE_PLUVIOMETRIC = create_table_and_upload_to_gcs(
+            data_path=path_pluviometric,
+            dataset_id=DATASET_ID_PLUVIOMETRIC,
+            table_id=TABLE_ID_PLUVIOMETRIC,
+            dump_mode=DUMP_MODE,
+            wait=path_pluviometric,
+        )
+
+        # Trigger pluviometric DBT flow run
+        with case(MATERIALIZE_AFTER_DUMP, True):
+            current_flow_labels = get_current_flow_labels()
+            materialization_flow = create_flow_run(
+                flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
+                project_name=constants.PREFECT_DEFAULT_PROJECT.value,
+                parameters={
+                    "dataset_id": DATASET_ID_PLUVIOMETRIC,
+                    "table_id": TABLE_ID_PLUVIOMETRIC,
+                    "mode": MATERIALIZATION_MODE,
+                    "materialize_to_datario": MATERIALIZE_TO_DATARIO,
+                },
+                labels=current_flow_labels,
+                run_name=f"Materialize {DATASET_ID_PLUVIOMETRIC}.{TABLE_ID_PLUVIOMETRIC}",
+            )
+
+            materialization_flow.set_upstream(current_flow_labels)
+
+            wait_for_materialization = wait_for_flow_run_with_2min_timeout(
+                flow_run_id=materialization_flow,
+                stream_states=True,
+                stream_logs=True,
+                raise_final_state=True,
+            )
+            wait_for_materialization.max_retries = (
+                dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
+            )
+            wait_for_materialization.retry_delay = timedelta(
+                seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
+            )
+
+            with case(DUMP_TO_GCS, True):
+                # Trigger Dump to GCS flow run with project id as datario
+                dump_to_gcs_flow = create_flow_run(
+                    flow_name=utils_constants.FLOW_DUMP_TO_GCS_NAME.value,
+                    project_name=constants.PREFECT_DEFAULT_PROJECT.value,
+                    parameters={
+                        "project_id": "datario",
+                        "dataset_id": DATASET_ID_PLUVIOMETRIC,
+                        "table_id": TABLE_ID_PLUVIOMETRIC,
+                        "maximum_bytes_processed": MAXIMUM_BYTES_PROCESSED,
+                    },
+                    labels=[
+                        "datario",
+                    ],
+                    run_name=f"Dump to GCS {DATASET_ID_PLUVIOMETRIC}.{TABLE_ID_PLUVIOMETRIC}",
+                )
+                dump_to_gcs_flow.set_upstream(wait_for_materialization)
+
+                wait_for_dump_to_gcs = wait_for_flow_run_with_2min_timeout(
+                    flow_run_id=dump_to_gcs_flow,
+                    stream_states=True,
+                    stream_logs=True,
+                    raise_final_state=True,
+                )
+
+    status = wait_task()
+    status.set_upstream(UPLOAD_TABLE_PLUVIOMETRIC)
+    with case(new_fluviometric_data, True):
+        path_fluviometric = save_data(
+            dataframe=dfr_fluviometric, folder_name="fluviometer"
+        )
+        path_fluviometric.set_upstream(status)
+
+        # Create fluviometric table in BigQuery
+        UPLOAD_TABLE_FLUVIOMETRIC = create_table_and_upload_to_gcs(
+            data_path=path_fluviometric,
+            dataset_id=DATASET_ID_FLUVIOMETRIC,
+            table_id=TABLE_ID_FLUVIOMETRIC,
+            dump_mode=DUMP_MODE,
+            wait=path_fluviometric,
+        )
+
+        # Trigger DBT flow run
+        with case(MATERIALIZE_AFTER_DUMP, True):
+            current_flow_labels = get_current_flow_labels()
+            materialization_flow = create_flow_run(
+                flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
+                project_name=constants.PREFECT_DEFAULT_PROJECT.value,
+                parameters={
+                    "dataset_id": DATASET_ID_FLUVIOMETRIC,
+                    "table_id": TABLE_ID_FLUVIOMETRIC,
+                    "mode": MATERIALIZATION_MODE,
+                    "materialize_to_datario": MATERIALIZE_TO_DATARIO,
+                },
+                labels=current_flow_labels,
+                run_name=f"Materialize {DATASET_ID_FLUVIOMETRIC}.{TABLE_ID_FLUVIOMETRIC}",
+            )
+
+            materialization_flow.set_upstream(current_flow_labels)
+
+            wait_for_materialization = wait_for_flow_run_with_2min_timeout(
+                flow_run_id=materialization_flow,
+                stream_states=True,
+                stream_logs=True,
+                raise_final_state=True,
+            )
+            wait_for_materialization.max_retries = (
+                dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
+            )
+            wait_for_materialization.retry_delay = timedelta(
+                seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
+            )
+
+            with case(DUMP_TO_GCS, True):
+                # Trigger Dump to GCS flow run with project id as datario
+                dump_to_gcs_flow = create_flow_run(
+                    flow_name=utils_constants.FLOW_DUMP_TO_GCS_NAME.value,
+                    project_name=constants.PREFECT_DEFAULT_PROJECT.value,
+                    parameters={
+                        "project_id": "datario",
+                        "dataset_id": DATASET_ID_FLUVIOMETRIC,
+                        "table_id": TABLE_ID_FLUVIOMETRIC,
+                        "maximum_bytes_processed": MAXIMUM_BYTES_PROCESSED,
+                    },
+                    labels=[
+                        "datario",
+                    ],
+                    run_name=f"Dump to GCS {DATASET_ID_FLUVIOMETRIC}.{TABLE_ID_FLUVIOMETRIC}",
+                )
+                dump_to_gcs_flow.set_upstream(wait_for_materialization)
+
+                wait_for_dump_to_gcs = wait_for_flow_run_with_2min_timeout(
+                    flow_run_id=dump_to_gcs_flow,
+                    stream_states=True,
+                    stream_logs=True,
+                    raise_final_state=True,
+                )
+
+    check_for_new_stations(dataframe, wait=UPLOAD_TABLE_PLUVIOMETRIC)
+
+# to run on the cloud
+cor_meteorologia_precipitacao_inea.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
+cor_meteorologia_precipitacao_inea.run_config = KubernetesRun(
+    image=constants.DOCKER_IMAGE.value,
+    labels=[constants.RJ_COR_AGENT_LABEL.value],
+)
+cor_meteorologia_precipitacao_inea.schedule = minute_schedule
diff --git a/pipelines/rj_cor/meteorologia/precipitacao_inea/schedules.py b/pipelines/rj_cor/meteorologia/precipitacao_inea/schedules.py
new file mode 100644
index 000000000..f16b41aaf
--- /dev/null
+++ b/pipelines/rj_cor/meteorologia/precipitacao_inea/schedules.py
@@ -0,0 +1,34 @@
+# -*- coding: utf-8 -*-
+# pylint: disable=C0103
+"""
+Schedules for precipitacao_inea
+Runs every 5 minutes
+"""
+from datetime import timedelta, datetime
+from prefect.schedules import Schedule
+from prefect.schedules.clocks import IntervalClock
+from pipelines.constants import constants
+
+minute_schedule = Schedule(
+    clocks=[
+        IntervalClock(
+            interval=timedelta(minutes=5),
+            start_date=datetime(2023, 1, 1, 0, 1, 0),
+            labels=[
+                constants.RJ_COR_AGENT_LABEL.value,
+            ],
+            parameter_defaults={
+                # "trigger_rain_dashboard_update": True,
+                "materialize_after_dump": True,
+                "mode": "prod",
+                "materialize_to_datario": True,
+                "dump_to_gcs": False,
+                "dump_mode": "append",
+                "dataset_id_pluviometric": "clima_pluviometro",
+                "table_id_pluviometric": "taxa_precipitacao_inea",
+                "dataset_id_fluviometric": "clima_fluviometro",
+                "table_id_fluviometric": "lamina_agua_inea",
+            },
+        ),
+    ]
+)
diff --git a/pipelines/rj_cor/meteorologia/precipitacao_inea/tasks.py b/pipelines/rj_cor/meteorologia/precipitacao_inea/tasks.py
new file mode 100644
index 000000000..acc66e445
--- /dev/null
+++ b/pipelines/rj_cor/meteorologia/precipitacao_inea/tasks.py
@@ -0,0 +1,237 @@
+# -*- coding: utf-8 -*-
+# pylint: disable=C0103
+"""
+Tasks for precipitacao_inea
+"""
+from datetime import timedelta
+from pathlib import Path
+from typing import Union, Tuple
+
+import numpy as np
+import pandas as pd
+import pendulum
+from prefect import task
+from prefect.engine.signals import ENDRUN
+from prefect.engine.state import Skipped, Failed
+from pipelines.constants import constants
+from pipelines.utils.utils import (
+    log,
+    parse_date_columns,
+    save_updated_rows_on_redis,
+    to_partitions,
+)
+
+
+@task(
+    nout=2,
+    max_retries=constants.TASK_MAX_RETRIES.value,
+    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
+)
+def download_data() -> pd.DataFrame:
+    """
+    Download data from the INEA website (one xlsx spreadsheet per station)
+    """
+
+    estacoes = {
+        "1": "225543320",  # Campo Grande
+        "2": "BE70E166",  # Capela Mayrink
+        "3": "225543250",  # Eletrobras
+        "4": "2243088",  # Realengo
+        "5": "225443130",  # Sao Cristovao
+    }
+
+    dataframe = pd.DataFrame()
+    for key, value in estacoes.items():
+        url = f"http://200.20.53.8/alertadecheias/{value}.xlsx"
+        dataframe_temp = pd.read_excel(url)
+        dataframe_temp["id_estacao"] = key
+        dataframe = pd.concat([dataframe, dataframe_temp])
+    return dataframe
+
+
+@task(
+    nout=2,
+    max_retries=constants.TASK_MAX_RETRIES.value,
+    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
+)
+def treat_data(
+    dataframe: pd.DataFrame, dataset_id: str, table_id: str, mode: str = "dev"
+) -> Tuple[pd.DataFrame, pd.DataFrame]:
+    """
+    Rename columns and filter out data already saved on Redis
+    """
+
+    dataframe["data_medicao"] = (
+        pd.to_datetime(dataframe.Data, format="%d/%m/%Y").dt.strftime("%Y-%m-%d")
+        + " "
+        + dataframe["Hora"]
+        + ":00"
+    )
+
+    rename_cols = {
+        "Chuva Último dado": "acumulado_chuva_15_min",
+        " Chuva Acumulada 1H": "acumulado_chuva_1_h",
+        " Chuva Acumulada 4H": "acumulado_chuva_4_h",
+        " Chuva Acumulada 24H": "acumulado_chuva_24_h",
+        " Chuva Acumulada 96H": "acumulado_chuva_96_h",
+        " Chuva Acumulada 30D": "acumulado_chuva_30_d",
+        " Último Nível": "altura_agua",
+    }
+    dataframe.rename(columns=rename_cols, inplace=True)
+
+    # Replace all "Dado Nulo" values with NaN
+    dataframe.replace({"Dado Nulo": np.nan}, inplace=True)
+
+    # Drop duplicated (id_estacao, data_medicao) pairs, keeping the smallest values
+    dataframe.sort_values(
+        ["id_estacao", "data_medicao"] + list(rename_cols.values()), inplace=True
+    )
+    dataframe.drop_duplicates(
+        subset=["id_estacao", "data_medicao"], keep="first", inplace=True
+    )
+
+    date_format = "%Y-%m-%d %H:%M:%S"
+    # dataframe["data_medicao"] = dataframe["data_medicao"].dt.strftime(date_format)
+
+    log(f"Dataframe before comparing with last data saved on redis {dataframe.head()}")
+    log(f"Dataframe before comparing {dataframe[dataframe['id_estacao']=='1']}")
+
+    dataframe = save_updated_rows_on_redis(
+        dataframe,
+        dataset_id,
+        table_id,
+        unique_id="id_estacao",
+        date_column="data_medicao",
+        date_format=date_format,
+        mode=mode,
+    )
+
+    log(f"Dataframe after comparing with last data saved on redis {dataframe.head()}")
+    log(f"Dataframe after comparing {dataframe[dataframe['id_estacao']=='1']}")
+
+    # If df is empty stop flow
+    if dataframe.shape[0] == 0:
+        skip_text = "No new data available on API"
+        log(skip_text)
+        raise ENDRUN(state=Skipped(skip_text))
+
+    pluviometric_cols = [
+        "id_estacao",
+        "data_medicao",
+        "acumulado_chuva_15_min",
+        "acumulado_chuva_1_h",
+        "acumulado_chuva_4_h",
+        "acumulado_chuva_24_h",
+        "acumulado_chuva_96_h",
+        "acumulado_chuva_30_d",
+    ]
+    fluviometric_cols = ["id_estacao", "data_medicao", "altura_agua"]
+
+    dfr_pluviometric = dataframe[pluviometric_cols].copy()
+    dfr_fluviometric = dataframe.loc[
+        dataframe["altura_agua"] != "Estação pluviométrica", fluviometric_cols
+    ].copy()
+
+    # Replace all "altura_agua" values greater than 10000 with NaN
+    dfr_fluviometric.loc[
+        dfr_fluviometric["altura_agua"] > 10000, "altura_agua"
+    ] = np.nan
+
+    fluviometric_cols_order = [
+        "id_estacao",
+        "data_medicao",
+        "altura_agua",
+    ]
+    dfr_fluviometric = dfr_fluviometric[fluviometric_cols_order].copy()
+
+    return dfr_pluviometric, dfr_fluviometric
+
+
+@task(
+    nout=2,
+    max_retries=constants.TASK_MAX_RETRIES.value,
+    retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
+)
+def check_new_data(
+    dfr_pluviometric: pd.DataFrame,
+    dfr_fluviometric: pd.DataFrame,
+) -> Tuple[bool, bool]:
+    """
+    Check whether there is new pluviometric and fluviometric data
+    """
+
+    new_pluviometric_data = True
+    new_fluviometric_data = True
+
+    if dfr_pluviometric.shape[0] == 0:
+        log("No new pluviometric data available on API")
+        new_pluviometric_data = False
+    if dfr_fluviometric.shape[0] == 0:
+        log("No new fluviometric data available on API")
+        new_fluviometric_data = False
+    return new_pluviometric_data, new_fluviometric_data
+
+
+@task(skip_on_upstream_skip=False)
+def wait_task() -> None:
+    """Task created because Prefect was mixing up the save paths for each table"""
+    log("Finished waiting for the pluviometric tasks to end.")
+
+
+@task
+def save_data(dataframe: pd.DataFrame, folder_name: str = None) -> Union[str, Path]:
+    """
+    Save data on a csv file to be uploaded to GCP
+    """
+
+    prepath = Path("/tmp/precipitacao")
+    if folder_name:
+        prepath = Path("/tmp/precipitacao") / folder_name
+    prepath.mkdir(parents=True, exist_ok=True)
+
+    log(f"Start saving data on {prepath}")
+    log(f"Data to be saved {dataframe.head()}")
+
+    partition_column = "data_medicao"
+    dataframe, partitions = parse_date_columns(dataframe, partition_column)
+    current_time = pendulum.now("America/Sao_Paulo").strftime("%Y%m%d%H%M")
+
+    to_partitions(
+        data=dataframe,
+        partition_columns=partitions,
+        savepath=prepath,
+        data_type="csv",
+        suffix=current_time,
+    )
+    log(f"[DEBUG] Files saved on {prepath}")
+    return prepath
+
+
+@task
+def check_for_new_stations(
+    dataframe: pd.DataFrame,
+    wait=None,  # pylint: disable=unused-argument
+) -> None:
+    """
+    Check if the updated stations are the same as before.
+    If not, mark the flow as failed and call attention to
+    add the new station on estacoes_inea.
+    This can't be done automatically because there is no known
+    URL that provides the latitude and longitude of all stations.
+    """
+
+    stations_before = [
+        "1",
+        "2",
+        "3",
+        "4",
+        "5",
+    ]
+    new_stations = [
+        i for i in dataframe.id_estacao.unique() if str(i) not in stations_before
+    ]
+    if len(new_stations) != 0:
+        message = f"New station identified. You need to update INEA\
+            estacoes_inea adding station(s) {new_stations}: \
+            {dataframe[dataframe.id_estacao.isin(new_stations)]} "
+        log(message)
+        raise ENDRUN(state=Failed(message))
diff --git a/poetry.lock b/poetry.lock
index 76ec6ab48..8d50fb59c 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -1612,6 +1612,18 @@ files = [
     {file = "entrypoints-0.4.tar.gz", hash = "sha256:b706eddaa9218a19ebcd67b56818f05bb27589b1ca9e8d797b74affad4ccacd4"},
 ]
 
+[[package]]
+name = "et-xmlfile"
+version = "1.1.0"
+description = "An implementation of lxml.xmlfile for the standard library"
+category = "main"
+optional = false
+python-versions = ">=3.6"
+files = [
+    {file = "et_xmlfile-1.1.0-py3-none-any.whl", hash = "sha256:a2ba85d1d6a74ef63837eed693bcb89c3f752169b0e3e7ae5b16ca5e1b3deada"},
+    {file = "et_xmlfile-1.1.0.tar.gz", hash = "sha256:8eb9e2bc2f8c97e37a2dc85a09ecdcdec9d8a396530a6d5a33b30b9a92da0c5c"},
+]
+
 [[package]]
 name = "exceptiongroup"
 version = "1.1.2"
@@ -4219,6 +4231,21 @@ numpy = [
     {version = ">=1.21.4", markers = "python_version >= \"3.10\" and platform_system == \"Darwin\""},
 ]
 
+[[package]]
+name = "openpyxl"
+version = "3.1.2"
+description = "A Python library to read/write Excel 2010 xlsx/xlsm files"
+category = "main"
+optional = false
+python-versions = ">=3.6"
+files = [
+    {file = "openpyxl-3.1.2-py2.py3-none-any.whl", hash = "sha256:f91456ead12ab3c6c2e9491cf33ba6d08357d802192379bb482f1033ade496f5"},
+    {file = "openpyxl-3.1.2.tar.gz", hash = "sha256:a6f5977418eff3b2d5500d54d9db50c8277a368436f4e4f8ddb1be3422870184"},
+]
+
+[package.dependencies]
+et-xmlfile = "*"
+
 [[package]]
 name = "orjson"
 version = "3.9.2"
@@ -7582,4 +7609,4 @@ testing = ["coverage (>=5.0.3)", "zope.event", "zope.testing"]
 [metadata]
 lock-version = "2.0"
 python-versions = ">=3.9,<3.11"
-content-hash = "e858f1313971a28f2977c43b2bf0fbaf0490f8a44ad450df133a7a3e96f90666"
+content-hash = "63042a0ef31df90a35c18fd5e65e78a8ca1f67c54289f0182b1957b19b465c9c"
diff --git a/pyproject.toml b/pyproject.toml
index 83db62307..0ba4b7f52 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -66,6 +66,7 @@ pyodbc = "^5.0.1"
 h3 = "^3.7.6"
 dask = "^2023.11.0"
 cartopy = "^0.22.0"
+openpyxl = "^3.1.2"
 
 [tool.poetry.dev-dependencies]
 pylint = "^2.12.2"
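
Note on the new openpyxl dependency (a hedged local sketch, not part of the diff): download_data() in tasks.py fetches each station's .xlsx file with pd.read_excel, and pandas hands .xlsx parsing to openpyxl, which is why openpyxl (and its dependency et-xmlfile) now appears in pyproject.toml and poetry.lock. The snippet below only illustrates that dependency path; the station code is one of the five already hard-coded in tasks.py, and network access to the INEA host is assumed.

# Minimal sketch: confirm pandas can read one INEA "alerta de cheias" spreadsheet.
# Assumes the host 200.20.53.8 is reachable; station code copied from tasks.py.
import pandas as pd

station_code = "225543320"  # station "1" (Campo Grande) in tasks.py
url = f"http://200.20.53.8/alertadecheias/{station_code}.xlsx"

# pandas routes .xlsx files to the openpyxl engine; without openpyxl installed,
# this call raises ImportError, which is the failure the dependency bump avoids.
dataframe = pd.read_excel(url, engine="openpyxl")
print(dataframe.head())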