Skip to content

Commit

Permalink
desativa schedules projeto_subsidio_sppo
Browse files Browse the repository at this point in the history
  • Loading branch information
akaBotelho committed Jun 21, 2024
1 parent a449316 commit 576360e
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 32 deletions.
31 changes: 31 additions & 0 deletions pipelines/rj_smtr/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,37 @@ class constants(Enum): # pylint: disable=c0103
SUBSIDIO_SPPO_DASHBOARD_STAGING_DATASET_ID = "dashboard_subsidio_sppo_staging"
SUBSIDIO_SPPO_DASHBOARD_TABLE_ID = "sumario_servico_dia"
SUBSIDIO_SPPO_DATA_CHECKS_PARAMS = {
"check_trips_processing": {
"query": """SELECT
s.data,
s.tipo_dia,
s.subtipo_dia,
s.tipo_os,
s.feed_version,
s.feed_start_date AS feed_start_date_invalido,
i.feed_start_date AS feed_start_date_valido,
FROM (
SELECT
*
FROM
rj-smtr.projeto_subsidio_sppo.subsidio_data_versao_efetiva
WHERE
DATA >= "2024-04-01" -- DATA_SUBSIDIO_V6_INICIO (Feature trajetos alternativos)
AND DATA BETWEEN DATE("{start_timestamp}")
AND DATE("{end_timestamp}")
) AS s
LEFT JOIN
rj-smtr.gtfs.feed_info AS i
ON
(DATA BETWEEN i.feed_start_date
AND i.feed_end_date
OR (DATA >= i.feed_start_date
AND i.feed_end_date IS NULL))
WHERE
i.feed_start_date != s.feed_start_date
""",
"order_columns": ["data"],
},
"check_gps_capture": {
"query": """WITH
t AS (
Expand Down
56 changes: 24 additions & 32 deletions pipelines/rj_smtr/projeto_subsidio_sppo/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,52 +5,44 @@
"""

from prefect import Parameter, case, task
from prefect.tasks.control_flow import merge
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefect.tasks.control_flow import merge
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect.utilities.edges import unmapped

# EMD Imports #

from pipelines.constants import constants
from pipelines.utils.tasks import (
rename_current_flow_run_now_time,
get_now_date,
get_current_flow_mode,
get_current_flow_labels,
)
from pipelines.utils.decorators import Flow
from pipelines.utils.execute_dbt_model.tasks import get_k8s_dbt_client

# SMTR Imports #

from pipelines.rj_smtr.constants import constants as smtr_constants

from pipelines.rj_smtr.tasks import (
from pipelines.rj_smtr.materialize_to_datario.flows import (
smtr_materialize_to_datario_viagem_sppo_flow,
)
from pipelines.rj_smtr.projeto_subsidio_sppo.tasks import (
check_param,
subsidio_data_quality_check,
)
from pipelines.rj_smtr.tasks import ( # get_local_dbt_client,; set_last_run_timestamp,
fetch_dataset_sha,
get_run_dates,
get_join_dict,
get_previous_date,
# get_local_dbt_client,
# set_last_run_timestamp,
get_run_dates,
)
from pipelines.rj_smtr.veiculo.flows import sppo_veiculo_dia
from pipelines.utils.decorators import Flow

from pipelines.rj_smtr.materialize_to_datario.flows import (
smtr_materialize_to_datario_viagem_sppo_flow,
# from pipelines.rj_smtr.schedules import every_day_hour_five, every_day_hour_seven
from pipelines.utils.execute_dbt_model.tasks import get_k8s_dbt_client, run_dbt_model
from pipelines.utils.tasks import (
get_current_flow_labels,
get_current_flow_mode,
get_now_date,
rename_current_flow_run_now_time,
)

from pipelines.rj_smtr.veiculo.flows import (
sppo_veiculo_dia,
)
# EMD Imports #

from pipelines.rj_smtr.schedules import every_day_hour_five, every_day_hour_seven
from pipelines.utils.execute_dbt_model.tasks import run_dbt_model

from pipelines.rj_smtr.projeto_subsidio_sppo.tasks import (
check_param,
subsidio_data_quality_check,
)
# SMTR Imports #


# Flows #

Expand Down Expand Up @@ -99,7 +91,7 @@
image=constants.DOCKER_IMAGE.value, labels=[constants.RJ_SMTR_AGENT_LABEL.value]
)

viagens_sppo.schedule = every_day_hour_five
# viagens_sppo.schedule = every_day_hour_five

with Flow(
"SMTR: Subsídio SPPO Apuração - Tratamento",
Expand Down Expand Up @@ -266,4 +258,4 @@
subsidio_sppo_apuracao.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value, labels=[constants.RJ_SMTR_AGENT_LABEL.value]
)
subsidio_sppo_apuracao.schedule = every_day_hour_seven
# subsidio_sppo_apuracao.schedule = every_day_hour_seven

0 comments on commit 576360e

Please sign in to comment.