diff --git a/dags/veda_data_pipeline/groups/processing_tasks.py b/dags/veda_data_pipeline/groups/processing_tasks.py
index fbef6e9..4ce85e61 100644
--- a/dags/veda_data_pipeline/groups/processing_tasks.py
+++ b/dags/veda_data_pipeline/groups/processing_tasks.py
@@ -13,12 +13,12 @@ def log_task(text: str):
     logging.info(text)
 
 
-@task()
-def extract_discovery_items_from_payload(ti, **kwargs):
-    discovery_items = ti.dag_run.conf.get("discovery_items")
+@task
+def extract_discovery_items_from_payload(ti, payload=None, **kwargs):
+    discovery_items = ti.dag_run.conf.get("discovery_items") if not payload else payload.get("discovery_items")
     return discovery_items
 
-@task()
+@task
 def remove_thumbnail_asset(ti):
     payload = deepcopy(ti.dag_run.conf)
     assets = payload.get("assets", {})
diff --git a/dags/veda_data_pipeline/groups/transfer_group.py b/dags/veda_data_pipeline/groups/transfer_group.py
index f3ddc22..6eab6cc 100644
--- a/dags/veda_data_pipeline/groups/transfer_group.py
+++ b/dags/veda_data_pipeline/groups/transfer_group.py
@@ -1,5 +1,3 @@
-from datetime import timedelta
-
 from airflow.models.variable import Variable
 from airflow.operators.python import BranchPythonOperator, PythonOperator
 import json
diff --git a/dags/veda_data_pipeline/veda_dataset_pipeline.py b/dags/veda_data_pipeline/veda_dataset_pipeline.py
index 51b8404..6c5f1fc 100644
--- a/dags/veda_data_pipeline/veda_dataset_pipeline.py
+++ b/dags/veda_data_pipeline/veda_dataset_pipeline.py
@@ -1,7 +1,6 @@
 import pendulum
 from airflow import DAG
 from airflow.models.param import Param
-from airflow.decorators import task
 from veda_data_pipeline.groups.discover_group import discover_from_s3_task, get_files_task
 from airflow.operators.dummy_operator import DummyOperator as EmptyOperator
 from veda_data_pipeline.groups.collection_group import collection_task_group
@@ -50,7 +49,7 @@
     end = EmptyOperator(task_id="end")
 
     mutated_payloads = start >> collection_task_group() >> remove_thumbnail_asset()
-    discovery_items = extract_discovery_items_from_payload(mutated_payloads)
+    discovery_items = extract_discovery_items_from_payload(payload=mutated_payloads)
     discover = discover_from_s3_task.partial(payload=mutated_payloads).expand(event=discovery_items)
     get_files = get_files_task(payload=discover)
     build_stac = build_stac_task.expand(payload=get_files)
diff --git a/sm2a/Makefile b/sm2a/Makefile
index 6efc19f..b092e3c 100644
--- a/sm2a/Makefile
+++ b/sm2a/Makefile
@@ -10,8 +10,8 @@ info_message = \
 
 count_down = \
 	@echo "Spinning up the system please wait..."; \
-	secs=40; \
-	while [ $secs -gt 0 ]; do \
+	secs=40 ;\
+	while [ $$secs -gt 0 ]; do \
 	printf "%d\033[0K\r" "$$secs"; \
 	sleep 1; \
 	secs=$$((secs - 1)); \
@@ -26,6 +26,9 @@ all: sm2a-local-init sm2a-local-run
 
 refresh: sm2a-local-build sm2a-local-run
 
+count_down_test:
+	$(count_down)
+
 sm2a-local-run: sm2a-local-stop
 	@echo "Running SM2A"
 	docker compose up -d
@@ -36,7 +39,6 @@ sm2a-local-run: sm2a-local-stop
 	@echo "To use local SM2A with AWS update ${SM2A_FOLDER}/sm2a-local-config/.env AWS credentials"
 
 sm2a-local-init:
-	cp sm2a-local-config/env_example sm2a-local-config/.env
 	cp -r ../dags .
 	docker compose run --rm airflow-cli db init
 	docker compose run --rm airflow-cli users create --email airflow@example.com --firstname airflow --lastname airflow --password airflow --username airflow --role Admin