Skip to content

Commit

Permalink
Merge branch 'master' into pre-commit-ci-update-config
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Apr 26, 2024
2 parents 0eefa98 + 83756bf commit 9c02703
Show file tree
Hide file tree
Showing 5 changed files with 254 additions and 24 deletions.
25 changes: 25 additions & 0 deletions pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Changelog - br_rj_riodejaneiro_onibus_gps

## [1.0.1] - 2024-04-26

### Adicionado

- Cria task `clean_br_rj_riodejaneiro_onibus_gps` (https://github.com/prefeitura-rio/pipelines/pull/673)

### Alterado

- Otimiza e inclui parâmetros de rematerialização no flow `materialize_sppo` (https://github.com/prefeitura-rio/pipelines/pull/673)

## [1.0.0] - 2024-04-26

### Adicionado

- Adiciona flow `recaptura_realocacao_sppo` (https://github.com/prefeitura-rio/pipelines/pull/668)

### Alterado

- Altera flow `recaptura`, incluindo acionamento do `recaptura_realocacao_sppo` (https://github.com/prefeitura-rio/pipelines/pull/668)

### Corrigido

- Corrigido parâmetro `timestamp` do flow `realocacao_sppo` (https://github.com/prefeitura-rio/pipelines/pull/668)
172 changes: 150 additions & 22 deletions pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
Flows for br_rj_riodejaneiro_onibus_gps
"""

from prefect import Parameter, case
from prefect import Parameter, case, task
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect.utilities.edges import unmapped
from prefect.tasks.control_flow import merge


# EMD Imports #
Expand All @@ -34,6 +35,7 @@
get_materialization_date_range,
# get_local_dbt_client,
get_raw,
get_rounded_timestamp,
parse_timestamp_to_string,
query_logs,
save_raw_local,
Expand All @@ -47,6 +49,7 @@
create_api_url_onibus_gps,
create_api_url_onibus_realocacao,
pre_treatment_br_rj_riodejaneiro_onibus_realocacao,
clean_br_rj_riodejaneiro_onibus_gps,
)

from pipelines.rj_smtr.schedules import (
Expand Down Expand Up @@ -79,7 +82,7 @@
rebuild = Parameter("rebuild", False)

# SETUP
timestamp = get_current_timestamp()
timestamp = get_rounded_timestamp(interval_minutes=10)

rename_flow_run = rename_current_flow_run_now_time(
prefix=realocacao_sppo.name + ": ", now_time=timestamp
Expand Down Expand Up @@ -154,6 +157,9 @@
dataset_id = Parameter("dataset_id", default=constants.GPS_SPPO_DATASET_ID.value)
table_id = Parameter("table_id", default=constants.GPS_SPPO_TREATED_TABLE_ID.value)
rebuild = Parameter("rebuild", False)
rematerialization = Parameter("rematerialization", default=False)
date_range_start = Parameter("date_range_start", default=None)
date_range_end = Parameter("date_range_end", default=None)

LABELS = get_current_flow_labels()
MODE = get_current_flow_mode(LABELS)
Expand All @@ -164,22 +170,35 @@
# dbt_client = get_local_dbt_client(host="localhost", port=3001)

# Set specific run parameters #
date_range = get_materialization_date_range(
dataset_id=dataset_id,
table_id=table_id,
raw_dataset_id=raw_dataset_id,
raw_table_id=raw_table_id,
table_run_datetime_column_name="timestamp_gps",
mode=MODE,
delay_hours=constants.GPS_SPPO_MATERIALIZE_DELAY_HOURS.value,
with case(rematerialization, False):
rematerialization_dates_false = date_range = get_materialization_date_range(
dataset_id=dataset_id,
table_id=table_id,
raw_dataset_id=raw_dataset_id,
raw_table_id=raw_table_id,
table_run_datetime_column_name="timestamp_gps",
mode=MODE,
delay_hours=constants.GPS_SPPO_MATERIALIZE_DELAY_HOURS.value,
)
with case(rematerialization, True):
date_range = {
"date_range_start": date_range_start,
"date_range_end": date_range_end,
}
rematerialization_dates_true = clean_br_rj_riodejaneiro_onibus_gps(date_range)

rematerialization_dates = merge(
rematerialization_dates_true, rematerialization_dates_false
)

dataset_sha = fetch_dataset_sha(
dataset_id=dataset_id,
upstream_tasks=[rematerialization_dates],
)

# Run materialization #
with case(rebuild, True):
RUN = run_dbt_model(
RUN_TRUE = run_dbt_model(
dbt_client=dbt_client,
dataset_id=dataset_id,
table_id=table_id,
Expand All @@ -188,30 +207,39 @@
_vars=[date_range, dataset_sha],
flags="--full-refresh",
)
set_last_run_timestamp(
dataset_id=dataset_id,
table_id=table_id,
timestamp=date_range["date_range_end"],
wait=RUN,
mode=MODE,
)

with case(rebuild, False):
RUN = run_dbt_model(
RUN_FALSE = run_dbt_model(
dbt_client=dbt_client,
dataset_id=dataset_id,
table_id=table_id,
exclude="+data_versao_efetiva",
_vars=[date_range, dataset_sha],
upstream=True,
)
set_last_run_timestamp(

RUN = merge(RUN_TRUE, RUN_FALSE)

with case(rematerialization, False):
SET_FALSE = set_last_run_timestamp(
dataset_id=dataset_id,
table_id=table_id,
timestamp=date_range["date_range_end"],
wait=RUN,
mode=MODE,
)

with case(rematerialization, True):
SET_TRUE = task(
lambda: [None],
checkpoint=False,
name="assign_none_to_previous_runs",
)()

SET = merge(SET_TRUE, SET_FALSE)

materialize_sppo.set_reference_tasks([RUN, rematerialization_dates, SET])

materialize_sppo.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
materialize_sppo.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
Expand Down Expand Up @@ -282,20 +310,120 @@
)
captura_sppo_v2.schedule = every_minute

with Flow(
    "SMTR: GPS SPPO Realocação - Recaptura (subflow)",
    code_owners=["caio", "fernanda", "boris", "rodrigo"],
) as recaptura_realocacao_sppo:
    # Subflow: re-captures GPS "realocação" (bus reassignment) records whose
    # original capture run failed, scanning logs over a lookback window.

    # Reference timestamp for the log query; rounded down to a 10-minute
    # boundary below. NOTE(review): when None, get_rounded_timestamp
    # presumably falls back to the current time — confirm in its definition.
    timestamp = Parameter("timestamp", default=None)
    # How many days back to scan the capture logs for errors.
    recapture_window_days = Parameter("recapture_window_days", default=1)

    # SETUP #
    LABELS = get_current_flow_labels()

    # Query capture logs to detect failed captures: yields whether errors
    # exist, the timestamps to recapture, and previously recorded errors.
    errors, timestamps, previous_errors = query_logs(
        dataset_id=constants.GPS_SPPO_RAW_DATASET_ID.value,
        table_id=constants.GPS_SPPO_REALOCACAO_RAW_TABLE_ID.value,
        datetime_filter=get_rounded_timestamp(timestamp=timestamp, interval_minutes=10),
        interval_minutes=10,
        recapture_window_days=recapture_window_days,
    )

    rename_flow_run = rename_current_flow_run_now_time(
        prefix=recaptura_realocacao_sppo.name + ": ",
        now_time=get_now_time(),
        # waiting on `timestamps` forces the log query to run first
        wait=timestamps,
    )

    # If errors were found, run the recapture for each failed timestamp.
    with case(errors, True):
        # SETUP #
        # One partition/filename/filepath per failed timestamp (mapped tasks).
        partitions = create_date_hour_partition.map(timestamps)
        filename = parse_timestamp_to_string.map(timestamps)

        filepath = create_local_partition_path.map(
            dataset_id=unmapped(constants.GPS_SPPO_RAW_DATASET_ID.value),
            table_id=unmapped(constants.GPS_SPPO_REALOCACAO_RAW_TABLE_ID.value),
            filename=filename,
            partitions=partitions,
        )

        url = create_api_url_onibus_realocacao.map(timestamp=timestamps)

        # EXTRACT #
        raw_status = get_raw.map(url)

        raw_filepath = save_raw_local.map(status=raw_status, file_path=filepath)

        # CLEAN #
        treated_status = pre_treatment_br_rj_riodejaneiro_onibus_realocacao.map(
            status=raw_status, timestamp=timestamps
        )

        treated_filepath = save_treated_local.map(
            status=treated_status, file_path=filepath
        )

        # LOAD #
        error = bq_upload.map(
            dataset_id=unmapped(constants.GPS_SPPO_RAW_DATASET_ID.value),
            table_id=unmapped(constants.GPS_SPPO_REALOCACAO_RAW_TABLE_ID.value),
            filepath=treated_filepath,
            raw_filepath=raw_filepath,
            partitions=partitions,
            status=treated_status,
        )

        # Record the outcome of each recapture attempt in the logs table.
        upload_logs_to_bq.map(
            dataset_id=unmapped(constants.GPS_SPPO_RAW_DATASET_ID.value),
            parent_table_id=unmapped(constants.GPS_SPPO_REALOCACAO_RAW_TABLE_ID.value),
            error=error,
            previous_error=previous_errors,
            timestamp=timestamps,
            recapture=unmapped(True),
        )

recaptura_realocacao_sppo.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
recaptura_realocacao_sppo.run_config = KubernetesRun(
    image=emd_constants.DOCKER_IMAGE.value,
    labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)

with Flow(
"SMTR: GPS SPPO - Tratamento", code_owners=["caio", "fernanda", "boris", "rodrigo"]
) as recaptura:
version = Parameter("version", default=2)
datetime_filter = Parameter("datetime_filter", default=None)
datetime_filter_gps = Parameter("datetime_filter_gps", default=None)
materialize = Parameter("materialize", default=True)
# SETUP #
LABELS = get_current_flow_labels()

rounded_timestamp = get_rounded_timestamp(interval_minutes=60)
rounded_timestamp_str = parse_timestamp_to_string(
timestamp=rounded_timestamp, pattern="%Y-%m-%d %H:%M:%S"
)

# roda o subflow de recaptura da realocação
run_recaptura_realocacao_sppo = create_flow_run(
flow_name=recaptura_realocacao_sppo.name,
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
labels=LABELS,
run_name=recaptura_realocacao_sppo.name,
parameters={"timestamp": rounded_timestamp_str},
)

wait_recaptura_realocacao_sppo = wait_for_flow_run(
run_recaptura_realocacao_sppo,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)

errors, timestamps, previous_errors = query_logs(
dataset_id=constants.GPS_SPPO_RAW_DATASET_ID.value,
table_id=constants.GPS_SPPO_RAW_TABLE_ID.value,
datetime_filter=datetime_filter,
datetime_filter=datetime_filter_gps,
upstream_tasks=[wait_recaptura_realocacao_sppo],
)

rename_flow_run = rename_current_flow_run_now_time(
Expand Down
70 changes: 70 additions & 0 deletions pipelines/rj_smtr/br_rj_riodejaneiro_onibus_gps/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import pandas as pd
from prefect import task
import pendulum
import basedosdados as bd
from typing import Union

# EMD Imports #

Expand Down Expand Up @@ -254,3 +256,71 @@ def pre_treatment_br_rj_riodejaneiro_onibus_gps(
log(f"[CATCHED] Task failed with error: \n{error}", level="error")

return {"data": df_gps, "error": error}


@task
def clean_br_rj_riodejaneiro_onibus_gps(date_range: dict) -> Union[str, None]:
    """
    Delete GPS data for a given date range prior to rematerialization.

    Removes records from three tables:
      - `rj-smtr.br_rj_riodejaneiro_onibus_gps.sppo_aux_registros_filtrada`
      - `rj-smtr.br_rj_riodejaneiro_onibus_gps.sppo_aux_registros_realocacao`
      - `rj-smtr.br_rj_riodejaneiro_veiculos.gps_sppo`

    Rows are selected by both the `data` partition column (BETWEEN the two
    dates, enabling partition pruning) and the `timestamp_gps` column
    (start exclusive, end inclusive).

    Parameters:
        date_range (dict): must contain `date_range_start` and
            `date_range_end` datetime strings.

    Returns:
        str or None: the traceback string if the cleaning failed,
            otherwise None.
    """
    error = None

    # All three tables share the same filter; build one DELETE per table
    # instead of three copy-pasted statements so they cannot drift apart.
    tables = (
        "rj-smtr.br_rj_riodejaneiro_onibus_gps.sppo_aux_registros_filtrada",
        "rj-smtr.br_rj_riodejaneiro_onibus_gps.sppo_aux_registros_realocacao",
        "rj-smtr.br_rj_riodejaneiro_veiculos.gps_sppo",
    )

    try:
        # Accessed inside the try so a missing key is reported as an error
        # string (the task's contract) rather than crashing the flow run.
        start = date_range["date_range_start"]
        end = date_range["date_range_end"]

        q = "\n".join(
            f"""
            DELETE
            FROM
                `{table}`
            WHERE
                (data BETWEEN DATE("{start}")
                AND DATE("{end}"))
                AND (timestamp_gps > "{start}"
                AND timestamp_gps <= "{end}");
            """
            for table in tables
        )
        log(q)

        results = bd.read_sql(q)

        log(
            f"""Cleaned GPS data for
            {start} to {end}\n
            Resulting:\n
            {results}"""
        )
    except Exception:  # pylint: disable = W0703
        error = traceback.format_exc()
        log(f"[CATCHED] Task failed with error: \n{error}", level="error")

    return error
7 changes: 7 additions & 0 deletions pipelines/rj_smtr/veiculo/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Changelog - veiculo

## [1.0.0] - 2024-04-25

### Alterado

- Desliga schedule dos flows `sppo_infracao_captura` e `sppo_licenciamento_captura` em razão de indisponibilidade e geração de dados imprecisos na fonte (SIURB) (https://github.com/prefeitura-rio/pipelines/pull/672)
4 changes: 2 additions & 2 deletions pipelines/rj_smtr/veiculo/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)
sppo_licenciamento_captura.schedule = every_day_hour_seven
# sppo_licenciamento_captura.schedule = every_day_hour_seven

with Flow(
f"SMTR: {constants.VEICULO_DATASET_ID.value} {constants.SPPO_INFRACAO_TABLE_ID.value} - Captura",
Expand Down Expand Up @@ -218,7 +218,7 @@
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)
sppo_infracao_captura.schedule = every_day_hour_seven
# sppo_infracao_captura.schedule = every_day_hour_seven

# flake8: noqa: E501
with Flow(
Expand Down

0 comments on commit 9c02703

Please sign in to comment.