-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathconcurrent.go
123 lines (107 loc) · 3.45 KB
/
concurrent.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package pipeline
import (
"context"
"errors"
)
// ConcurrentStep groups several steps that share the same Input/Output types so they can be run
// concurrently; their outputs are later folded into a single value of the output type.
type ConcurrentStep[I, O any] struct {
	steps  []Step[I, O] // inner steps to run
	reduce reducer[O]   // folds the outputs of the inner steps together
}

// reducer merges two values of the same type into a single one, or reports an error.
type reducer[O any] func(context.Context, O, O) (O, error)

// concurrentResult carries the outcome of a single step: either the value it produced (Ret)
// or the error it failed with (Err).
type concurrentResult[T any] struct {
	Ret T
	Err error
}
// NewConcurrentStep builds a step that runs all of the given inner steps concurrently and combines
// their outputs with reduce.
//
// The resulting step waits for every inner step to finish before returning. When one of them fails it
// still waits for the rest, and only then reports the first error it encountered.
//
// Like every other step, this one doesn't recover from panics. Because it may spawn goroutines, a panic
// is not necessarily raised in the goroutine that invoked the step. If your code can panic, guard
// against it yourself (decorating the steps, deferring a recover inside them, etc).
func NewConcurrentStep[I, O any](steps []Step[I, O], reduce reducer[O]) ConcurrentStep[I, O] {
	var step ConcurrentStep[I, O]
	step.steps = steps
	step.reduce = reduce
	return step
}
// Draw registers the inner steps on graph: the Draw method of each inner step is collected, in order,
// and the whole set is handed to graph.AddConcurrency. Nothing is added when there are no inner steps.
func (c ConcurrentStep[I, O]) Draw(graph Graph) {
	if len(c.steps) == 0 {
		return
	}

	var drawers []GraphDrawer
	for i := range c.steps {
		drawers = append(drawers, c.steps[i].Draw)
	}
	graph.AddConcurrency(drawers...)
}
// Run executes every inner step with the same input and folds their outputs into a single value.
//
// Results are consumed as the steps deliver them: the first successful output seeds the accumulator
// and each following one is merged into it through the reducer. Run always waits for every step to
// finish, even after a failure; it then returns the first error observed (coming either from a step
// or from the reducer) together with the zero value of O.
//
// An error is returned without running anything when there are no steps, or when there are several
// steps but no reducer to combine their outputs.
//
// Note that this step may use goroutines and (as all other steps) doesn't handle panics,
// hence it is advised to handle them on your own if you can't guarantee a panic-safe environment.
func (c ConcurrentStep[I, O]) Run(ctx context.Context, in I) (O, error) {
	var zero O
	if len(c.steps) == 0 {
		return zero, errors.New("cannot run with empty concurrent steps")
	}
	if len(c.steps) > 1 && c.reduce == nil {
		// Fail before spawning anything instead of panicking on a nil func call halfway through.
		return zero, errors.New("cannot run multiple concurrent steps without a reducer")
	}

	results := c.runConcurrently(ctx, c.steps, in)

	var (
		acc O
		err error
	)
	for i := range c.steps {
		res := <-results
		switch {
		case err != nil:
			// Already failed: keep receiving so every step is awaited before returning.
		case res.Err != nil:
			err = res.Err // step errored.
		case i == 0:
			acc = res.Ret // first result, nothing to reduce against yet.
		default:
			acc, err = c.reduce(ctx, acc, res.Ret)
		}
	}

	if err != nil {
		// Don't leak a partially reduced accumulator alongside the error.
		return zero, err
	}
	return acc, nil
}
// runConcurrently starts every worker with the same input and returns the channel on which each of
// them publishes exactly one result (its output or its error).
//
// The method itself neither waits for the workers nor picks an error. The channel is buffered with room
// for one result per worker, so workers never block on delivery; the caller is expected to receive
// len(workers) results to know that all of them are done. The channel is never closed.
//
// With a single worker no goroutine is spawned: the worker runs in the calling goroutine and its result
// is already buffered when the channel is returned. With no workers nothing runs and the returned
// channel never yields a value.
//
// Note: this method doesn't recover from panics in goroutines. To be cohesive across the whole
// API none of the steps handle panics, letting the client handle them on their own
// (through decorations / same steps with deferrals / whatever they prefer).
func (c ConcurrentStep[I, O]) runConcurrently(
	ctx context.Context,
	workers []Step[I, O],
	in I,
) <-chan concurrentResult[O] {
	ch := make(chan concurrentResult[O], len(workers))

	switch len(workers) {
	case 0:
		// Nothing to run; indexing workers[0] here would panic.
	case 1:
		// Avoid concurrency: no need to spawn and wait, just use the current goroutine.
		c.runStep(ctx, in, workers[0], ch)
	default:
		for _, w := range workers {
			go c.runStep(ctx, in, w, ch)
		}
	}

	return ch
}
// runStep runs a single step with the given input and sends its outcome on ch, with the returned
// value and the returned error packed together in one concurrentResult.
//
// The send happens only after step.Run returns, and blocks if ch has no free buffer slot and no
// receiver is ready.
func (c ConcurrentStep[I, O]) runStep(
	ctx context.Context,
	in I,
	step Step[I, O],
	ch chan<- concurrentResult[O],
) {
	var outcome concurrentResult[O]
	outcome.Ret, outcome.Err = step.Run(ctx, in)
	ch <- outcome
}