-
Notifications
You must be signed in to change notification settings - Fork 1
/
flow.go
68 lines (60 loc) · 1.3 KB
/
flow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package flow
import (
"time"
"github.com/konimarti/flow/filters"
"github.com/konimarti/flow/observer"
)
// New plugs the filter nf into the source s and hands back the Observer
// produced by s.Run, through which the results of the flow are received.
func New(nf filters.Filter, s Source) observer.Observer {
	result := s.Run(nf)
	return result
}
// Source is the interface for the input side of a flow.
// In this file, Func and Chan implement it by starting a goroutine that
// passes incoming values through the given filter.
type Source interface {
// Run takes the filter f to apply to the input and returns an Observer.
Run(f filters.Filter) observer.Observer
}
// Func implements the Source interface by regularly calling a function and
// using its return value as the input of the flow.
type Func struct {
// Fn is the function invoked on every tick; its result is handed to the filter.
Fn func() interface{}
// Refresh is the interval between two consecutive calls of Fn.
Refresh time.Duration
}
// Run calls f.Fn every f.Refresh and feeds each result through the filter nf.
// Whenever nf.Check accepts a value, the observer is notified with
// nf.Update(value). The polling happens in a background goroutine; Run itself
// returns the freshly created Observer immediately.
//
// The goroutine exits when a value is received on the observer's control
// channel C, after sending true on the control channel D (presumably a
// stop request and its acknowledgement -- confirm against the observer
// package). The underlying ticker is stopped on exit so it is not leaked.
//
// A non-positive f.Refresh never triggers f.Fn; the goroutine then only waits
// for the control signal.
func (f *Func) Run(nf filters.Filter) observer.Observer {
	o := observer.NewObserver()

	// time.NewTicker panics for a non-positive interval, whereas the
	// previously used time.Tick returned a nil channel. Keep the nil channel
	// (which blocks forever in a select) so such a Func still does not panic.
	var (
		ticker *time.Ticker
		ticks  <-chan time.Time
	)
	if f.Refresh > 0 {
		ticker = time.NewTicker(f.Refresh)
		ticks = ticker.C
	}

	go func() {
		if ticker != nil {
			// Release the ticker once the goroutine is told to stop.
			defer ticker.Stop()
		}
		for {
			select {
			case <-ticks:
				if v := f.Fn(); nf.Check(v) {
					o.Notify(nf.Update(v))
				}
			case <-o.Control().C:
				o.Control().D <- true
				return
			}
		}
	}()

	return o
}
// Chan implements the Source interface and provides the input for the flow
// from a channel.
type Chan struct {
// Ch is the channel whose received values are passed to the filter.
Ch chan interface{}
}
// Run passes the values received on c.Ch through the filter nf. Whenever
// nf.Check accepts a value, the observer is notified with nf.Update(value).
// The receiving happens in a background goroutine; Run itself returns the
// freshly created Observer immediately.
//
// The goroutine exits when a value is received on the observer's control
// channel C, after sending true on the control channel D (presumably a
// stop request and its acknowledgement -- confirm against the observer
// package).
//
// If c.Ch is closed, no further values are processed and the goroutine only
// waits for the control signal.
func (c *Chan) Run(nf filters.Filter) observer.Observer {
	o := observer.NewObserver()

	go func() {
		for {
			select {
			case v, ok := <-c.Ch:
				if !ok {
					// A closed channel is always ready and yields nil, which
					// would turn this loop into a busy spin feeding nil values
					// to the filter. Stop reading and wait for shutdown.
					<-o.Control().C
					o.Control().D <- true
					return
				}
				if nf.Check(v) {
					o.Notify(nf.Update(v))
				}
			case <-o.Control().C:
				o.Control().D <- true
				return
			}
		}
	}()

	return o
}