diff --git a/pipelines/rj_smtr/constants.py b/pipelines/rj_smtr/constants.py index cb3393d12..d41c037c0 100644 --- a/pipelines/rj_smtr/constants.py +++ b/pipelines/rj_smtr/constants.py @@ -168,6 +168,37 @@ class constants(Enum): # pylint: disable=c0103 SUBSIDIO_SPPO_DASHBOARD_STAGING_DATASET_ID = "dashboard_subsidio_sppo_staging" SUBSIDIO_SPPO_DASHBOARD_TABLE_ID = "sumario_servico_dia" SUBSIDIO_SPPO_DATA_CHECKS_PARAMS = { + "check_trips_processing": { + "query": """SELECT + s.data, + s.tipo_dia, + s.subtipo_dia, + s.tipo_os, + s.feed_version, + s.feed_start_date AS feed_start_date_invalido, + i.feed_start_date AS feed_start_date_valido, + FROM ( + SELECT + * + FROM + rj-smtr.projeto_subsidio_sppo.subsidio_data_versao_efetiva + WHERE + DATA >= "2024-04-01" -- DATA_SUBSIDIO_V6_INICIO (Feature trajetos alternativos) + AND DATA BETWEEN DATE("{start_timestamp}") + AND DATE("{end_timestamp}") + ) AS s + LEFT JOIN + rj-smtr.gtfs.feed_info AS i + ON + (DATA BETWEEN i.feed_start_date + AND i.feed_end_date + OR (DATA >= i.feed_start_date + AND i.feed_end_date IS NULL)) + WHERE + i.feed_start_date != s.feed_start_date + """, + "order_columns": ["data"], + }, "check_gps_capture": { "query": """WITH t AS ( diff --git a/pipelines/rj_smtr/projeto_subsidio_sppo/flows.py b/pipelines/rj_smtr/projeto_subsidio_sppo/flows.py index a3c4c53c7..a94899d37 100644 --- a/pipelines/rj_smtr/projeto_subsidio_sppo/flows.py +++ b/pipelines/rj_smtr/projeto_subsidio_sppo/flows.py @@ -5,52 +5,44 @@ """ from prefect import Parameter, case, task -from prefect.tasks.control_flow import merge from prefect.run_configs import KubernetesRun from prefect.storage import GCS +from prefect.tasks.control_flow import merge from prefect.tasks.prefect import create_flow_run, wait_for_flow_run from prefect.utilities.edges import unmapped -# EMD Imports # - from pipelines.constants import constants -from pipelines.utils.tasks import ( - rename_current_flow_run_now_time, - get_now_date, - get_current_flow_mode, - get_current_flow_labels, -) -from pipelines.utils.decorators import Flow -from pipelines.utils.execute_dbt_model.tasks import get_k8s_dbt_client - -# SMTR Imports # - from pipelines.rj_smtr.constants import constants as smtr_constants - -from pipelines.rj_smtr.tasks import ( +from pipelines.rj_smtr.materialize_to_datario.flows import ( + smtr_materialize_to_datario_viagem_sppo_flow, +) +from pipelines.rj_smtr.projeto_subsidio_sppo.tasks import ( + check_param, + subsidio_data_quality_check, +) +from pipelines.rj_smtr.tasks import ( # get_local_dbt_client,; set_last_run_timestamp, fetch_dataset_sha, - get_run_dates, get_join_dict, get_previous_date, - # get_local_dbt_client, - # set_last_run_timestamp, + get_run_dates, ) +from pipelines.rj_smtr.veiculo.flows import sppo_veiculo_dia +from pipelines.utils.decorators import Flow -from pipelines.rj_smtr.materialize_to_datario.flows import ( - smtr_materialize_to_datario_viagem_sppo_flow, +# from pipelines.rj_smtr.schedules import every_day_hour_five, every_day_hour_seven +from pipelines.utils.execute_dbt_model.tasks import get_k8s_dbt_client, run_dbt_model +from pipelines.utils.tasks import ( + get_current_flow_labels, + get_current_flow_mode, + get_now_date, + rename_current_flow_run_now_time, ) -from pipelines.rj_smtr.veiculo.flows import ( - sppo_veiculo_dia, -) +# EMD Imports # -from pipelines.rj_smtr.schedules import every_day_hour_five, every_day_hour_seven -from pipelines.utils.execute_dbt_model.tasks import run_dbt_model -from pipelines.rj_smtr.projeto_subsidio_sppo.tasks import ( - check_param, - subsidio_data_quality_check, -) +# SMTR Imports # + # Flows # @@ -99,7 +91,7 @@ image=constants.DOCKER_IMAGE.value, labels=[constants.RJ_SMTR_AGENT_LABEL.value] ) -viagens_sppo.schedule = every_day_hour_five +# viagens_sppo.schedule = every_day_hour_five with Flow( "SMTR: Subsídio SPPO Apuração - Tratamento", @@ -266,4 +258,4 @@ subsidio_sppo_apuracao.run_config = KubernetesRun( image=constants.DOCKER_IMAGE.value, labels=[constants.RJ_SMTR_AGENT_LABEL.value] ) -subsidio_sppo_apuracao.schedule = every_day_hour_seven +# subsidio_sppo_apuracao.schedule = every_day_hour_seven