-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathstate-machine.asl.yml
904 lines (893 loc) · 30.4 KB
/
state-machine.asl.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
StartAt: Normalize Input
States:
# Start of pre-processing
Normalize Input:
Comment: Normalizes input data
Type: Task
InputPath: "$"
Resource: "${NormalizeInputLambdaFunctionArn}"
Parameters:
Input.$: "$"
StateMachine:
Id.$: "$$.StateMachine.Id"
ResultPath: "$"
OutputPath: "$"
Next: JobReceived Callbacks Map
Retry:
- ErrorEquals:
- MissingTaskTypeError
MaxAttempts: 0
- ErrorEquals:
- States.ALL
IntervalSeconds: 1
MaxAttempts: 2
BackoffRate: 1
Catch:
- ErrorEquals:
- States.ALL
ResultPath: "$.Error"
Next: Normalize Input Catcher
Normalize Input Catcher:
Comment: Add a parameter so normalize errors can be identified
Type: Pass
InputPath: "$"
Result: NORMALIZE_INPUT_ERROR
ResultPath: "$.State"
OutputPath: "$"
Next: Add Empty TaskResults
JobReceived Callbacks Map:
Type: Map
Comment: Iterates over all callback endpoints to indicate the job was received
InputPath: "$"
ItemsPath: "$.Job.Callbacks"
Parameters:
Callback.$: "$$.Map.Item.Value"
StateMachine:
Id.$: "$$.StateMachine.Id"
Execution:
Id.$: "$$.Execution.Id"
Message: # The JSON of this value will be sent to endpoints
JobReceived:
Job:
Id.$: "$.Job.Id"
Execution:
Id.$: "$$.Execution.Id"
State: RECEIVED
ResultPath: "$.Void" # The output of the iterator states is discarded
OutputPath: "$"
Next: Ingest Source File
MaxConcurrency: 0
Iterator:
StartAt: Send JobReceived Callback
States:
Send JobReceived Callback:
Type: Task
Comment: >-
Sends a job received message for a single callback endpoint in
the iterator
InputPath: "$"
Resource: "${CallbackLambdaFunctionArn}"
ResultPath: "$"
OutputPath: "$"
End: true
Retry:
- ErrorEquals:
- States.ALL
IntervalSeconds: 5
MaxAttempts: 3
BackoffRate: 2
Ingest Source File:
Comment: Creates an artifact of the source file in S3
Type: Task
InputPath: "$"
Parameters:
Job:
Source.$: "$.Job.Source"
Execution:
Id.$: "$$.Execution.Id"
Resource: "${IngestLambdaFunctionArn}"
ResultPath: "$.Artifact"
OutputPath: "$"
Next: Detect Source File Type
Retry:
- ErrorEquals:
- UnknownSourceModeError
MaxAttempts: 0
- ErrorEquals:
- States.ALL
IntervalSeconds: 5
MaxAttempts: 3
BackoffRate: 2
Catch:
- ErrorEquals:
- States.ALL
ResultPath: "$.Error"
Next: Ingest Source File Catcher
Ingest Source File Catcher:
Comment: Add a parameter so ingest errors can be identified
Type: Pass
InputPath: "$"
Result: SOURCE_FILE_INGEST_ERROR
ResultPath: "$.State"
OutputPath: "$"
Next: Add Empty TaskResults
Detect Source File Type:
Comment: Detects the file type of the source file artifact
Type: Task
InputPath: "$"
Parameters:
Artifact.$: "$.Artifact"
Resource: "${SourceTypeLambdaFunctionArn}"
ResultPath: "$.Artifact.Descriptor"
OutputPath: "$"
Next: Tasks Map
Retry:
- ErrorEquals:
- States.ALL
IntervalSeconds: 5
MaxAttempts: 3
BackoffRate: 2
Catch:
- ErrorEquals:
- States.ALL
ResultPath: "$.Error"
Next: Detect Source File Type Catcher
Detect Source File Type Catcher:
Comment: Add a parameter so type detection errors can be identified
Type: Pass
InputPath: "$"
Result: SOURCE_FILE_TYPE_DETECTION_ERROR
ResultPath: "$.State"
OutputPath: "$"
Next: Add Empty TaskResults
# End of pre-processing
# Start of task execution
Tasks Map:
# The key output of this map state is the TaskResults array. There will
# be one element for each task, whether it succeeded or failed. Successful
# task elements will have a `Task` property, and failed tasks will have a
# `Type` property.
Type: Map
Comment: Iterates over all tasks included in the job
InputPath: "$"
ItemsPath: "$.Job.Tasks"
Parameters:
# Parameters' value is what is passed to each state within the iterator,
# but each state decides independetly what is passed to its backing
# resource (Lambda, etc). For example, Callbacks is available to each
# state, but usually not passed into Lambdas, since most tasks don't
# utilize that data.
Job:
Id.$: "$.Job.Id"
Execution:
Id.$: "$$.Execution.Id"
Artifact.$: "$.Artifact"
TaskIndex.$: "$$.Map.Item.Index"
Task.$: "$$.Map.Item.Value" # This value is a task defined in the job input, e.g., { "Type": "Copy" }
Callbacks.$: "$.Job.Callbacks"
ResultPath: "$.TaskResults"
OutputPath: "$"
Next: JobResult Callback Map
MaxConcurrency: 0
Iterator:
StartAt: Route Task By Type
States:
Route Task By Type:
Type: Choice
Comment: >-
For the current task being iterated, routes to the appropriate
state
Default: Unknown Task Type
Choices:
- Variable: "$.Task.Type"
StringEquals: Inspect
Next: Inspect Task Handler
- And:
- Variable: "$.Task.Type"
StringEquals: Copy
- Variable: "$.Task.Mode"
StringEquals: AWS/S3
Next: Copy Task Handler
- And:
- Variable: "$.Task.Type"
StringEquals: Copy
- Variable: "$.Task.Mode"
StringEquals: FTP/Passive
Next: FTP Copy Task Fargate Execution
- And:
- Variable: "$.Task.Type"
StringEquals: Copy
- Variable: "$.Task.Mode"
StringEquals: FTP/Active
Next: FTP Copy Task Fargate Execution
- And:
- Variable: "$.Task.Type"
StringEquals: Copy
- Variable: "$.Task.Mode"
StringEquals: FTP/Auto
Next: FTP Copy Task Fargate Execution
- Variable: "$.Task.Type"
StringEquals: Transcode
Next: Transcode Task Fargate Execution
- Variable: "$.Task.Type"
StringEquals: Image
Next: Image Task Handler
- Variable: "$.Task.Type"
StringEquals: Transcribe
Next: Transcribe Task Job Start
- Variable: "$.Task.Type"
StringEquals: WavWrap
Next: WAV Wrap Task Handler
- Variable: "$.Task.Type"
StringEquals: DetectSilence
Next: Detect Silence Task Handler
- Variable: "$.Task.Type"
StringEquals: DetectTone
Next: Detect Tone Task Handler
- Variable: "$.Task.Type"
StringEquals: Waveform
Next: Waveform Task Handler
# This is a no-op. No callbacks are sent, and any record of this task
# will be filtered out of the job result callbacks
Unknown Task Type:
Type: Pass
Comment: Handles and blackholes tasks with an unknown type
End: true
ResultPath: "$"
Result:
Task: "Null"
# Task operations
# - The InputPath and OutputPath of all of these must be $, so that
# all data is available to states down the line.
# - The ResultPath for states that are returning the final output of a
# task's operation (i.e., states immediately prior to the callbacks
# map) must be $.TaskResult, and their return value must be the
# expected standard task output (e.g., { Task: 'Copy' })
# - All task operation states, not just the final state, should catch
# to the TaskResult Error Callback Map.
Inspect Task Handler:
Type: Task
Comment: Inspects the artifact file
Resource: "${InspectMediaLambdaFunctionArn}"
InputPath: "$"
Parameters:
Job:
Id.$: "$.Job.Id"
Artifact.$: "$.Artifact"
Task.$: "$.Task"
ResultPath: "$.TaskResult"
OutputPath: "$"
Next: TaskResult Callbacks Map
Retry:
- ErrorEquals:
- States.ALL
IntervalSeconds: 5
MaxAttempts: 3
BackoffRate: 2
Catch:
- ErrorEquals:
- States.ALL
ResultPath: "$.Error"
Next: TaskResult Error Callback Map
Copy Task Handler:
Type: Task
Comment: Copies the artifact
Resource: "${CopyLambdaFunctionArn}"
InputPath: "$"
Parameters:
Job:
Id.$: "$.Job.Id"
Artifact.$: "$.Artifact"
Task.$: "$.Task"
ResultPath: "$.TaskResult"
OutputPath: "$"
Next: TaskResult Callbacks Map
Retry:
- ErrorEquals:
- UnknownCopyTaskModeError
MaxAttempts: 0
- ErrorEquals:
- States.ALL
IntervalSeconds: 5
MaxAttempts: 3
BackoffRate: 2
Catch:
- ErrorEquals:
- States.ALL
ResultPath: "$.Error"
Next: TaskResult Error Callback Map
FTP Copy Task Fargate Execution:
Type: Task
Comment: Sends artifact to an FTP destination
Resource: arn:aws:states:::ecs:runTask.sync
InputPath: "$"
Parameters:
Cluster: "${EcsClusterArn}"
LaunchType: FARGATE
Overrides:
ContainerOverrides:
- Environment:
- Name: STATE_MACHINE_ARN
Value.$: "$$.StateMachine.Id"
- Name: STATE_MACHINE_NAME
Value.$: "$$.StateMachine.Name"
- Name: STATE_MACHINE_EXECUTION_ID
Value.$: "$$.Execution.Id"
- Name: STATE_MACHINE_JOB_ID
Value.$: "$.Job.Id"
- Name: STATE_MACHINE_TASK_INDEX
Value.$: States.Format('{}', $.TaskIndex)
- Name: STATE_MACHINE_AWS_REGION
Value: "${AwsRegion}"
- Name: STATE_MACHINE_ARTIFACT_BUCKET_NAME
Value.$: "$.Artifact.BucketName"
- Name: STATE_MACHINE_ARTIFACT_OBJECT_KEY
Value.$: "$.Artifact.ObjectKey"
- Name: STATE_MACHINE_TASK_JSON
Value.$: States.JsonToString($.Task)
Name: "${FtpCopyContainerName}"
NetworkConfiguration:
AwsvpcConfiguration:
AssignPublicIp: ENABLED
SecurityGroups:
- "${FtpCopyEcsTaskSecurityGroupId}"
Subnets:
- "${VpcPublicSubnet1}"
- "${VpcPublicSubnet2}"
PropagateTags: TASK_DEFINITION
TaskDefinition: "${FtpCopyEcsTaskDefinitionArn}"
ResultPath: "$.Void"
OutputPath: "$"
Next: FTP Copy Task Results Formatter
Retry:
- ErrorEquals:
- States.ALL
IntervalSeconds: 10
MaxAttempts: 3
BackoffRate: 2
Catch:
- ErrorEquals:
- States.ALL
ResultPath: "$.Error"
Next: TaskResult Error Callback Map
FTP Copy Task Results Formatter:
Type: Task
Comment: Formats the output of an FTP copy task
Resource: "${FtpCopyTaskOutputLambdaFunctionArn}"
InputPath: "$"
Parameters:
Job:
Id.$: "$.Job.Id"
Execution:
Id.$: "$$.Execution.Id"
Task.$: "$.Task"
TaskIteratorIndex.$: "$.TaskIndex"
ResultPath: "$.TaskResult"
OutputPath: "$"
Next: TaskResult Callbacks Map
Retry:
- ErrorEquals:
- FtpOperationError
MaxAttempts: 0
- ErrorEquals:
- States.ALL
IntervalSeconds: 5
MaxAttempts: 3
BackoffRate: 2
Catch:
- ErrorEquals:
- States.ALL
ResultPath: "$.Error"
Next: TaskResult Error Callback Map
Transcode Task Fargate Execution:
Type: Task
Comment: Transcodes multimedia artifact
Resource: arn:aws:states:::ecs:runTask.sync
InputPath: "$"
Parameters:
Cluster: "${EcsClusterArn}"
LaunchType: FARGATE
Overrides:
ContainerOverrides:
- Environment:
- Name: STATE_MACHINE_ARN
Value.$: "$$.StateMachine.Id"
- Name: STATE_MACHINE_NAME
Value.$: "$$.StateMachine.Name"
- Name: STATE_MACHINE_EXECUTION_ID
Value.$: "$$.Execution.Id"
- Name: STATE_MACHINE_JOB_ID
Value.$: "$.Job.Id"
- Name: STATE_MACHINE_TASK_INDEX
Value.$: States.Format('{}', $.TaskIndex)
- Name: STATE_MACHINE_S3_DESTINATION_WRITER_ROLE
Value: "${S3DestinationWriterRoleArn}"
- Name: STATE_MACHINE_AWS_REGION
Value: "${AwsRegion}"
- Name: STATE_MACHINE_ARTIFACT_BUCKET_NAME
Value.$: "$.Artifact.BucketName"
- Name: STATE_MACHINE_ARTIFACT_OBJECT_KEY
Value.$: "$.Artifact.ObjectKey"
- Name: STATE_MACHINE_DESTINATION_JSON
Value.$: States.JsonToString($.Task.Destination)
- Name: STATE_MACHINE_DESTINATION_MODE
Value.$: "$.Task.Destination.Mode"
- Name: STATE_MACHINE_DESTINATION_BUCKET_NAME
Value.$: "$.Task.Destination.BucketName"
- Name: STATE_MACHINE_DESTINATION_OBJECT_KEY
Value.$: "$.Task.Destination.ObjectKey"
- Name: STATE_MACHINE_DESTINATION_FORMAT
Value.$: "$.Task.Format"
- Name: STATE_MACHINE_FFMPEG_GLOBAL_OPTIONS
Value.$: "$.Task.FFmpeg.GlobalOptions"
- Name: STATE_MACHINE_FFMPEG_INPUT_FILE_OPTIONS
Value.$: "$.Task.FFmpeg.InputFileOptions"
- Name: STATE_MACHINE_FFMPEG_OUTPUT_FILE_OPTIONS
Value.$: "$.Task.FFmpeg.OutputFileOptions"
# Guaranteed and required values should be listed and
# accessed directly. Optional or conditional values can be
# pulled from the JSON, to avoid state machine runtime
# errors
- Name: STATE_MACHINE_TASK_JSON
Value.$: States.JsonToString($.Task)
- Name: STATE_MACHINE_ARTIFACT_JSON
Value.$: States.JsonToString($.Artifact)
Name: "${TranscodeContainerName}"
NetworkConfiguration:
AwsvpcConfiguration:
AssignPublicIp: ENABLED
Subnets:
- "${VpcPublicSubnet1}"
- "${VpcPublicSubnet2}"
PropagateTags: TASK_DEFINITION
TaskDefinition: "${TranscodeEcsTaskDefinitionArn}"
ResultPath: "$.Void"
OutputPath: "$"
Next: Transcode Task Results Formatter
Retry:
- ErrorEquals:
- States.ALL
IntervalSeconds: 15
MaxAttempts: 5
BackoffRate: 2
Catch:
- ErrorEquals:
- States.ALL
ResultPath: "$.Error"
Next: TaskResult Error Callback Map
Transcode Task Results Formatter:
Type: Task
Comment: Formats the output of a transcode task
Resource: "${TranscodeTaskOutputLambdaFunctionArn}"
InputPath: "$"
Parameters:
Job:
Id.$: "$.Job.Id"
Execution:
Id.$: "$$.Execution.Id"
Task.$: "$.Task"
TaskIteratorIndex.$: "$.TaskIndex"
ResultPath: "$.TaskResult"
OutputPath: "$"
Next: TaskResult Callbacks Map
Retry:
- ErrorEquals:
- States.ALL
IntervalSeconds: 5
MaxAttempts: 3
BackoffRate: 2
Catch:
- ErrorEquals:
- States.ALL
ResultPath: "$.Error"
Next: TaskResult Error Callback Map
Image Task Handler:
Type: Task
Comment: Manipulates and converts image artifacts
Resource: "${ImageTransformLambdaFunctionArn}"
InputPath: "$"
Parameters:
Job:
Id.$: "$.Job.Id"
Artifact.$: "$.Artifact"
Task.$: "$.Task"
ResultPath: "$.TaskResult"
OutputPath: "$"
Next: TaskResult Callbacks Map
Retry:
- ErrorEquals:
- States.ALL
IntervalSeconds: 5
MaxAttempts: 3
BackoffRate: 2
Catch:
- ErrorEquals:
- States.ALL
ResultPath: "$.Error"
Next: TaskResult Error Callback Map
WAV Wrap Task Handler:
Type: Task
Comment: WAV wraps audio artifacts
Resource: "${WavWrapLambdaFunctionArn}"
InputPath: "$"
Parameters:
Job:
Id.$: "$.Job.Id"
Artifact.$: "$.Artifact"
Task.$: "$.Task"
ResultPath: "$.TaskResult"
OutputPath: "$"
Next: TaskResult Callbacks Map
Retry:
- ErrorEquals:
- States.ALL
IntervalSeconds: 5
MaxAttempts: 3
BackoffRate: 2
Catch:
- ErrorEquals:
- States.ALL
ResultPath: "$.Error"
Next: TaskResult Error Callback Map
Detect Silence Task Handler:
Type: Task
Comment: Detects silence in audio files
Resource: "${SilenceDetectionLambdaFunctionArn}"
InputPath: "$"
Parameters:
Job:
Id.$: "$.Job.Id"
Artifact.$: "$.Artifact"
Task.$: "$.Task"
ResultPath: "$.TaskResult"
OutputPath: "$"
Next: TaskResult Callbacks Map
Retry:
- ErrorEquals:
- States.ALL
IntervalSeconds: 5
MaxAttempts: 3
BackoffRate: 2
Catch:
- ErrorEquals:
- States.ALL
ResultPath: "$.Error"
Next: TaskResult Error Callback Map
Detect Tone Task Handler:
Type: Task
Comment: Detects tones in audio files
Resource: "${ToneDetectionLambdaFunctionArn}"
InputPath: "$"
Parameters:
Job:
Id.$: "$.Job.Id"
Artifact.$: "$.Artifact"
Task.$: "$.Task"
ResultPath: "$.TaskResult"
OutputPath: "$"
Next: TaskResult Callbacks Map
Retry:
- ErrorEquals:
- States.ALL
IntervalSeconds: 5
MaxAttempts: 3
BackoffRate: 2
Catch:
- ErrorEquals:
- States.ALL
ResultPath: "$.Error"
Next: TaskResult Error Callback Map
Waveform Task Handler:
Type: Task
Comment: Generates waveform data from audio files
Resource: "${WaveformLambdaFunctionArn}"
InputPath: "$"
Parameters:
Job:
Id.$: "$.Job.Id"
Artifact.$: "$.Artifact"
Task.$: "$.Task"
ResultPath: "$.TaskResult"
OutputPath: "$"
Next: TaskResult Callbacks Map
Retry:
- ErrorEquals:
- States.ALL
IntervalSeconds: 5
MaxAttempts: 3
BackoffRate: 2
Catch:
- ErrorEquals:
- States.ALL
ResultPath: "$.Error"
Next: TaskResult Error Callback Map
Transcribe Task Job Start:
Type: Task
Comment: Transcribes audio from artifacts
Resource: arn:${AwsPartition}:states:::lambda:invoke.waitForTaskToken
InputPath: "$"
Parameters:
FunctionName: "${TranscriptionJobStartLambdaFunctionArn}"
Payload:
Job:
Id.$: "$.Job.Id"
Execution:
Id.$: "$$.Execution.Id"
Artifact.$: "$.Artifact"
Task.$: "$.Task"
TaskIteratorIndex.$: "$.TaskIndex"
TaskToken.$: "$$.Task.Token"
ResultPath: "$.TranscriptionJob"
OutputPath: "$"
Next: Transcribe Task Job Success Handler
Retry:
- ErrorEquals:
- InvalidTranscribeTaskInputError
- UnknownDestinationModeError
MaxAttempts: 0
- ErrorEquals:
- States.ALL
IntervalSeconds: 5
MaxAttempts: 3
BackoffRate: 2
Catch:
- ErrorEquals:
- States.ALL
ResultPath: "$.Error"
Next: TaskResult Error Callback Map
Transcribe Task Job Success Handler:
Type: Task
Comment: Formats the output of a transcribe task
Resource: "${TranscriptionJobResultsLambdaFunctionArn}"
InputPath: "$"
Parameters:
Task.$: "$.Task"
TranscriptionJob.$: "$.TranscriptionJob"
ResultPath: "$.TaskResult"
OutputPath: "$"
Next: TaskResult Callbacks Map
Retry:
- ErrorEquals:
- States.ALL
IntervalSeconds: 5
MaxAttempts: 3
BackoffRate: 2
Catch:
- ErrorEquals:
- States.ALL
ResultPath: "$.Error"
Next: TaskResult Error Callback Map
# Task callbacks
# All tasks operations end up at one of these callback states,
# depending on whether the task was successful or not. The value for
# a given task in the TaskResults array coming out of the Tasks Map
# will be the output whichever of these callback states is used.
#
# For failed tasks, the task callback will include information about
# the error. This information is not included in the job result
# callback.
#
# The output for a successful task is the result of the task.
# The output for a failed task is the original input task definition.
TaskResult Callbacks Map:
Type: Map
Comment: >-
Iterates over all callback endpoints to send messages when tasks
are successful
InputPath: "$"
ItemsPath: "$.Callbacks"
Parameters:
Callback.$: "$$.Map.Item.Value"
StateMachine:
Id.$: "$$.StateMachine.Id"
Execution:
Id.$: "$$.Execution.Id"
TaskIteratorIndex.$: "$.TaskIndex"
Message: # The JSON of this value will be sent to endpoints
Task.$: "$.Task"
TaskResult:
Job:
Id.$: "$.Job.Id"
Execution:
Id.$: "$$.Execution.Id"
Result.$: "$.TaskResult"
ResultPath: "$.Void" # The output of the iterator states is discarded
OutputPath: "$.TaskResult"
End: true
MaxConcurrency: 0
Iterator:
StartAt: Send TaskResult Callback
States:
Send TaskResult Callback:
Type: Task
Comment: >-
Sends a callback message to a single endpoint when tasks
are successful
InputPath: "$"
Resource: "${CallbackLambdaFunctionArn}"
ResultPath: "$"
OutputPath: "$"
End: true
Retry:
- ErrorEquals:
- States.ALL
IntervalSeconds: 5
MaxAttempts: 3
BackoffRate: 2
TaskResult Error Callback Map:
Type: Map
Comment: >-
Iterates over all callback endpoints to send messages when tasks
fail
InputPath: "$"
ItemsPath: "$.Callbacks"
Parameters:
Callback.$: "$$.Map.Item.Value"
StateMachine:
Id.$: "$$.StateMachine.Id"
Execution:
Id.$: "$$.Execution.Id"
TaskIteratorIndex.$: "$.TaskIndex"
Message: # The JSON of this value will be sent to endpoints
Task.$: "$.Task"
TaskResult:
Job:
Id.$: "$.Job.Id"
Execution:
Id.$: "$$.Execution.Id"
Error.$: "$.Error"
ResultPath: "$.Void" # The output of the iterator states is discarded
OutputPath: "$.Task"
End: true
MaxConcurrency: 0
Iterator:
StartAt: Send TaskResult Error Callback
States:
Send TaskResult Error Callback:
Type: Task
Comment: >-
Sends a callback message to a single endpoint when tasks
fail
InputPath: "$"
Resource: "${CallbackLambdaFunctionArn}"
ResultPath: "$"
OutputPath: "$"
End: true
Retry:
- ErrorEquals:
- States.ALL
IntervalSeconds: 5
MaxAttempts: 3
BackoffRate: 2
Catch:
- ErrorEquals:
- States.ALL
ResultPath: "$.Error"
Next: Tasks Map Catcher
Tasks Map Catcher:
Comment: >-
Add a parameter so task map errors can be identified. Is NOT responsible
for catching individual task execution errors (like if a Copy task fails)
Type: Pass
InputPath: "$"
Result: ITERATOR_ERROR
ResultPath: "$.State"
OutputPath: "$"
Next: Add Empty TaskResults
# End of task execution
# States below this will expect $.TaskResults to exist
Add Empty TaskResults:
Comment: >-
Add a TaskResults key with an empty array value, for cases where the task
iterator does not succeed and it's not otherwise added
Type: Pass
InputPath: "$"
Result: []
ResultPath: "$.TaskResults"
OutputPath: "$"
Next: JobResult Callback Map
# Start of post-processing
JobResult Callback Map:
Type: Map
Comment: Iterates over all callback endpoints to send job results
InputPath: "$"
ItemsPath: "$.Job.Callbacks"
Parameters:
Callback.$: "$$.Map.Item.Value"
StateMachine:
Id.$: "$$.StateMachine.Id"
Execution:
Id.$: "$$.Execution.Id"
Message: # The JSON of this value will be sent to endpoints
JobResult:
Job:
Id.$: "$.Job.Id"
Execution:
Id.$: "$$.Execution.Id"
State.$: "$.State"
# Task/Type = Null elements are filtered out.
# Successful tasks will have a Task property
TaskResults.$: "$.TaskResults.[?(@.Task && @.Task != 'Null')]"
# Failed tasks will have a Type property
FailedTasks.$: "$.TaskResults.[?(@.Type && @.Type != 'Null')]"
ResultPath: "$.Void" # The output of the iterator states is discarded
OutputPath: "$"
Next: Serialized Jobs Map
MaxConcurrency: 0
Iterator:
StartAt: Send JobResult Callback
States:
Send JobResult Callback:
Type: Task
Comment: >-
Sends a callback message to a single endpoint in the iterator with
a job result
InputPath: "$"
Resource: "${CallbackLambdaFunctionArn}"
ResultPath: "$"
OutputPath: "$"
End: true
Retry:
- ErrorEquals:
- States.ALL
IntervalSeconds: 5
MaxAttempts: 3
BackoffRate: 2
Serialized Jobs Map:
Type: Map
Comment: Iterates over all serialized jobs and sends them to SNS
InputPath: "$"
ItemsPath: "$.Job.SerializedJobs"
Parameters:
Execution:
Id.$: "$$.Execution.Id"
ExecutionTrace.$: "$.Job.ExecutionTrace"
SerializedJob.$: "$$.Map.Item.Value"
ResultPath: "$.Void" # The output of the iterator states is discarded
OutputPath: "$"
Next: Normalize Output
MaxConcurrency: 0
Iterator:
StartAt: Start Serialized Job Execution
States:
Start Serialized Job Execution:
Type: Task
Comment: Sends a serialized job to the job execution SNS topic
InputPath: "$"
Resource: "${JobSerializerLambdaFunctionArn}"
ResultPath: "$"
OutputPath: "$"
End: true
Retry:
- ErrorEquals:
- States.ALL
IntervalSeconds: 5
MaxAttempts: 3
BackoffRate: 2
Normalize Output:
Comment: Normalizes output data
Type: Task
InputPath: "$"
Retry:
- ErrorEquals:
- States.ALL
IntervalSeconds: 1
MaxAttempts: 2
BackoffRate: 1
Parameters:
StateMachine:
Id.$: "$$.StateMachine.Id"
Message:
JobResult:
Job:
Id.$: "$.Job.Id"
Execution:
Id.$: "$$.Execution.Id"
State.$: "$.State"
TaskResults.$: "$.TaskResults.[?(@.Task && @.Task != 'Null')]"
FailedTasks.$: "$.TaskResults.[?(@.Type && @.Type != 'Null')]"
Resource: "${NormalizeOutputLambdaFunctionArn}"
ResultPath: "$"
OutputPath: "$"
End: true
# End of post-processing