generated from interTwin-eu/repository-template
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdags_airflow_fede_rucio_k8sexecutor.py
112 lines (94 loc) · 3.54 KB
/
dags_airflow_fede_rucio_k8sexecutor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
"""
Create a pod that can access rucio endpoint at CNAF
- Using k8s executor
"""
from __future__ import annotations
import logging
import pendulum
# from airflow.configuration import conf
from airflow.decorators import task
from airflow.models.dag import DAG
# import os
# from pathlib import Path
log = logging.getLogger(__name__)
# Check k8s is there
try:
from kubernetes.client import models as k8s
except ImportError:
log.warning(
"This DAG requires the kubernetes provider."
" Please install it with: pip install apache-airflow[cncf.kubernetes]"
)
k8s = None
default_queue = "kubernetes"
# default_queue = "default"
if k8s:
with DAG(
dag_id="rucio_executor",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["fede"],
) as dag:
#############################################################
# Define config for pod with data access at CNAF and rucio
#############################################################
repo = "leggerf/rucio-intertwin"
tag = "0.0.0"
# use image from worker node as example
# repo = conf.get("kubernetes_executor", "worker_container_repository")
# tag = conf.get("kubernetes_executor", "worker_container_tag")
kube_exec_config_rucio = {
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="base", # the image must be named base
# image=f"{repo}:{tag}", # the image must contain airflow
# args=["sleep 3000"],
# command=["bash", "-cx"],
),
k8s.V1Container(
name="sidecar",
image=f"{repo}:{tag}",
# command=["bash", "-cx"],
# args=["sleep 3000"],
# args=["./get-token.sh"],
command=["./get-token.sh"],
image_pull_policy="Always",
),
],
image_pull_secrets=[
k8s.V1LocalObjectReference(
name="dockerhub",
),
],
)
)
}
#############################################################
# pod with access to rucio
#############################################################
@task(
executor_config=kube_exec_config_rucio,
queue=default_queue,
task_id="data_access",
)
def data_access_task():
log.info("Using image " + f"{repo}:{tag}")
rucio_task = data_access_task()
#############################################################
# another pod with access to rucio
#############################################################
@task(
executor_config=kube_exec_config_rucio,
queue=default_queue,
task_id="data_access_1",
)
def data_access_1_task():
log.info("Using image " + f"{repo}:{tag}")
rucio_1_task = data_access_1_task()
#############################################################
# Define DAG execution
#############################################################
(rucio_task >> rucio_1_task)