-
Notifications
You must be signed in to change notification settings - Fork 0
/
workflow.yaml
88 lines (88 loc) · 4.5 KB
/
workflow.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# Workflow entry point.
#
# Drives a fixed sequence of steps against an HTTP service whose base URL is
# read from the CLOUD_RUN_SERVICE_URL environment variable. Each step is
# started with a POST, then polled via GET until the service reports
# SUCCEEDED or FAILED.
#
# params:
#   args: map supplied by the caller. This workflow reads
#     args["Release Date"] and args.Name, and writes
#     args["destination_dataset"] plus the keys named in
#     output_to_input_key_names (see init). The whole map is sent as the
#     POST body of every step.
#
# raises:
#   A string error "Step <step> failed for <Name> <message>" as soon as a
#   step reports FAILED.
main:
  params: [args]
  steps:
    - init:
        assign:
          # Steps are executed in exactly this order.
          - ordered_steps: ["copy", "parse", "create_external_tables"]
          # For a finished step, maps a key of its decoded result message
          # to the key under which that value is stored in args for the
          # steps that follow.
          - output_to_input_key_names:
              copy:
                gcs_path: input_path
              parse:
                parsed_files: source_table_paths
          - base_url: '${sys.get_env("CLOUD_RUN_SERVICE_URL") + "/"}'
          # Hyphens are replaced by underscores before the date is placed
          # in the URL path below.
          - clinvar_release_date: '${text.replace_all(args["Release Date"], "-", "_")}'

    # Ask the service for an execution id derived from the release date.
    - create_workflow_execution:
        call: http.post
        args:
          url: '${base_url + "create_workflow_execution_id/" + clinvar_release_date}'
          # Explicit null: this request carries no payload.
          body: null
          auth:
            type: OIDC
        result: workflow_execution_step_result

    - init_workflow_execution_id:
        assign:
          - workflow_execution_id: '${workflow_execution_step_result.body.workflow_execution_id}'
          # The execution id is also passed to every step as
          # destination_dataset.
          - args["destination_dataset"]: '${workflow_execution_id}'

    - for_in_steps:
        for:
          value: step
          in: '${ordered_steps}'
          steps:
            - log_step_key:
                call: sys.log
                args:
                  data: '${"Step is: " + step + " workflow_execution_id is: " + workflow_execution_id}'

            # Start the step; the current args map is the request body.
            - call_step:
                call: http.post
                args:
                  url: '${base_url + step + "/" + workflow_execution_id}'
                  body: '${args}'
                  auth:
                    type: OIDC
                result: call_step_result

            - log_call_step_result:
                call: sys.log
                args:
                  data: '${call_step_result}'

            # Polling loop: status_check -> check_if_complete ->
            # wait_for_result -> status_check ...
            - status_check:
                call: http.get
                args:
                  url: '${base_url + "step_status/" + workflow_execution_id + "/" + step}'
                  auth:
                    type: OIDC
                result: status_check_result

            - log_status_check_result:
                call: sys.log
                args:
                  data: '${status_check_result}'

            # Any status other than FAILED / SUCCEEDED falls through to
            # wait_for_result.
            - check_if_complete:
                switch:
                  - condition: '${"FAILED" == status_check_result.body.step_status}'
                    next: step_failed
                  - condition: '${"SUCCEEDED" == status_check_result.body.step_status}'
                    next: step_succeeded

            # NOTE(review): there is no upper bound on the number of polls;
            # a step that never reaches FAILED or SUCCEEDED keeps this loop
            # running. Consider adding an attempt limit.
            - wait_for_result:
                call: sys.sleep
                args:
                  seconds: 60
                next: status_check

            # Reached only through the explicit jump in check_if_complete.
            - step_failed:
                raise: '${"Step " + step + " failed for " + args.Name + " " + status_check_result.body.message}'

            - step_succeeded:
                steps:
                  - conditional_update_args_for_next_step:
                      switch:
                        # Only steps listed in output_to_input_key_names
                        # feed values forward.
                        - condition: '${step in output_to_input_key_names}'
                          steps:
                            # Decode once per step: the message does not
                            # change while iterating over the output keys.
                            - decode_step_message:
                                assign:
                                  - json_message: '${json.decode(status_check_result.body.message)}'
                            - log_json:
                                call: sys.log
                                args:
                                  data: '${json_message}'
                            - update_args_with_step_results:
                                for:
                                  value: output_key
                                  in: '${keys(output_to_input_key_names[step])}'
                                  steps:
                                    # Copy json_message[output_key] into
                                    # args under the mapped input key.
                                    - assign_vars:
                                        assign:
                                          - input_key: '${output_to_input_key_names[step][output_key]}'
                                          - args[input_key]: '${json_message[output_key]}'