Skip to content

Commit

Permalink
Fix makefile, fix ti KeyError
Browse files Browse the repository at this point in the history
  • Loading branch information
ividito committed Dec 20, 2024
1 parent 46e66e6 commit da0650c
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 11 deletions.
8 changes: 4 additions & 4 deletions dags/veda_data_pipeline/groups/processing_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
def log_task(text: str) -> None:
    """Emit *text* to the module logger at INFO level."""
    logging.info(text)

@task()
def extract_discovery_items_from_payload(ti, **kwargs):
discovery_items = ti.dag_run.conf.get("discovery_items")
@task
def extract_discovery_items_from_payload(ti, payload=None, **kwargs):
    """Return the ``discovery_items`` entry for downstream expansion.

    Prefers an explicitly passed ``payload`` mapping; when no (truthy)
    payload is given, falls back to the triggering dag_run's conf.
    """
    if payload:
        return payload.get("discovery_items")
    return ti.dag_run.conf.get("discovery_items")

@task()
@task
def remove_thumbnail_asset(ti):
payload = deepcopy(ti.dag_run.conf)
assets = payload.get("assets", {})
Expand Down
2 changes: 0 additions & 2 deletions dags/veda_data_pipeline/groups/transfer_group.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from datetime import timedelta

from airflow.models.variable import Variable
from airflow.operators.python import BranchPythonOperator, PythonOperator
import json
Expand Down
3 changes: 1 addition & 2 deletions dags/veda_data_pipeline/veda_dataset_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import pendulum
from airflow import DAG
from airflow.models.param import Param
from airflow.decorators import task
from veda_data_pipeline.groups.discover_group import discover_from_s3_task, get_files_task
from airflow.operators.dummy_operator import DummyOperator as EmptyOperator
from veda_data_pipeline.groups.collection_group import collection_task_group
Expand Down Expand Up @@ -50,7 +49,7 @@
end = EmptyOperator(task_id="end")

mutated_payloads = start >> collection_task_group() >> remove_thumbnail_asset()
discovery_items = extract_discovery_items_from_payload(mutated_payloads)
discovery_items = extract_discovery_items_from_payload(payload=mutated_payloads)
discover = discover_from_s3_task.partial(payload=mutated_payloads).expand(event=discovery_items)
get_files = get_files_task(payload=discover)
build_stac = build_stac_task.expand(payload=get_files)
Expand Down
8 changes: 5 additions & 3 deletions sm2a/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ info_message = \

count_down = \
@echo "Spinning up the system please wait..."; \
secs=40; \
while [ $secs -gt 0 ]; do \
secs=40 ;\
while [ $$secs -gt 0 ]; do \
printf "%d\033[0K\r" "$$secs"; \
sleep 1; \
secs=$$((secs - 1)); \
Expand All @@ -26,6 +26,9 @@ all: sm2a-local-init sm2a-local-run

refresh: sm2a-local-build sm2a-local-run

# Convenience target: run the count_down macro on its own (prints a
# 40-second countdown) so the shell quoting/$$-escaping can be verified.
count_down_test:
	$(count_down)

sm2a-local-run: sm2a-local-stop
@echo "Running SM2A"
docker compose up -d
Expand All @@ -36,7 +39,6 @@ sm2a-local-run: sm2a-local-stop
@echo "To use local SM2A with AWS update ${SM2A_FOLDER}/sm2a-local-config/.env AWS credentials"

sm2a-local-init:
cp sm2a-local-config/env_example sm2a-local-config/.env
cp -r ../dags .
docker compose run --rm airflow-cli db init
docker compose run --rm airflow-cli users create --email [email protected] --firstname airflow --lastname airflow --password airflow --username airflow --role Admin
Expand Down

0 comments on commit da0650c

Please sign in to comment.