-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjunction.go
64 lines (57 loc) · 1.57 KB
/
junction.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package gopipe
import (
"fmt"
"log"
)
/*
Junction connects one or more pipelines. Every item that reaches the junction
is handed to the routing function, and the value that function returns is
looked up in the routing table to choose the pipeline that receives the item.
*/
type Junction struct {
	// routingFn computes the routing key for an item.
	routingFn func(interface{}) interface{}
	// router maps a routing key to the pipeline that receives matching items.
	router map[interface{}]*Pipeline
	// DebugMode turns on the junction's debug logging. It is false unless the
	// caller sets it.
	DebugMode bool
}
/*
RoutingFunc maps an item flowing through the pipeline to a routing key.
A Junction looks the returned key up in its routing table to decide which of
the attached pipelines the item is forwarded to. Because the key is used as a
map key, it must be a comparable (hashable) value.
*/
type RoutingFunc func(in interface{}) interface{}
/*
NewJunction builds a Junction that uses rFunc to compute routing keys. The
returned Junction starts with an empty routing table and DebugMode off;
destinations are attached with AddPipeline.
*/
func NewJunction(rFunc RoutingFunc) Junction {
	var j Junction
	j.routingFn = rFunc
	j.router = map[interface{}]*Pipeline{}
	return j
}
/*
AddPipeline registers p as the destination for key. Items for which the
junction's routing function returns key are routed to p. Registering a key
that is already present replaces the previously registered pipeline.

The routing table is created on first use, so AddPipeline also works on a
Junction that was not built with NewJunction. key is used as a map key and
must therefore be a comparable value. The receiver is returned so that calls
can be chained.
*/
func (j *Junction) AddPipeline(key interface{}, p *Pipeline) *Junction {
	if j.router == nil {
		// Writing to a nil map panics; allocate lazily for zero-value Junctions.
		j.router = make(map[interface{}]*Pipeline)
	}
	j.router[key] = p
	return j
}
// route starts a goroutine that drains in and forwards every item to the
// pipeline registered for its routing key. The key is obtained by calling the
// junction's routing function on the item; items whose key has no registered
// pipeline are dropped. The goroutine exits once in is closed and drained.
//
// Debug messages are only formatted when DebugMode is set, so routing does not
// pay for fmt.Sprintf on every item while debugging is off.
func (j *Junction) route(in chan interface{}) {
	go func() {
		for item := range in {
			routingKey := j.routingFn(item)
			if j.DebugMode {
				j.debug(fmt.Sprintf("routing key: %v", routingKey))
			}

			dest, ok := j.router[routingKey]
			if !ok {
				// No pipeline registered for this key: drop the item.
				if j.DebugMode {
					j.debug(fmt.Sprintf("Junction discarding item: %+v no dest pipeline for routing key: %+v",
						item, routingKey))
				}
				continue
			}
			dest.Enqueue(item)
		}
	}()
}
/*
debug writes values to the standard logger when DebugMode is true and does
nothing otherwise. The values are emitted as one log line, separated by
spaces.
*/
func (j *Junction) debug(values ...string) {
	if !j.DebugMode {
		return
	}
	// Expand the strings into individual Println operands; passing the slice
	// itself would log it in bracketed slice notation (e.g. "[msg]").
	args := make([]interface{}, len(values))
	for i, v := range values {
		args[i] = v
	}
	log.Println(args...)
}