-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathengine.go
88 lines (74 loc) · 1.61 KB
/
engine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package engine
import (
"context"
"fmt"
"sync"
uuid "github.com/satori/go.uuid"
)
// Engine - for creating workers and distributing jobs
type Engine struct {
id string
ctx context.Context
stop chan struct{}
pool chan chan work
input chan work
workers []*worker
start *sync.Once
}
// NewEngine - initializing a new engine
func NewEngine(ctx context.Context) *Engine {
return &Engine{
id: fmt.Sprintf("%s-%s", "dispatcher", uuid.NewV4().String()),
ctx: ctx,
start: new(sync.Once),
}
}
// Start - starting workers and setting up dispatcher for use
func (e *Engine) Start(workerCount uint) {
e.start.Do(func() {
e.stop = make(chan struct{})
e.pool = make(chan chan work)
e.input = make(chan work)
e.workers = make([]*worker, 0)
for i := 0; i <= int(workerCount); i++ {
worker := NewWorker(e.ctx, e.pool)
e.workers = append(e.workers, worker)
worker.Start()
}
go e.dispatch()
})
}
// Stop - closes channels/goroutines
func (e *Engine) Stop() {
defer func() { e.start = new(sync.Once) }()
for _, worker := range e.workers {
worker.Stop()
}
close(e.stop)
}
// Do - executes work
func (e *Engine) Do(executable Executable) chan bool {
done := make(chan bool)
e.input <- work{
Executable: executable,
success: done,
}
return done
}
// WorkerCount - returns worker count
func (e *Engine) WorkerCount() int {
return len(e.workers)
}
func (e *Engine) dispatch() {
for {
select {
case work := <-e.input:
log(e.id).Debugf("dispatching: %v", work)
worker := <-e.pool
worker <- work
case <-e.stop:
log(e.id).Debugf("stopping...")
return
}
}
}