-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtask_manager.go
103 lines (87 loc) · 2.11 KB
/
task_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package server
import (
"sync"
"time"
)
// TaskManager is the Router component responsible for running external programs
// and the tasks that the user has registered.
type TaskManager struct {
	// Router is the owning router; it is used for logging.
	Router *Router
	// running is set by start and cleared by stop; the scheduler loop polls it.
	running bool
	// NOTE(review): presumably serialises background task execution — its use is
	// not visible in this file section; confirm against the callers.
	backgroundMutex *Mutex
	// programs holds the external programs, keyed by string.
	programs map[string]*program
	// tasks holds the registered tasks, keyed by string; start and stop iterate over it.
	tasks map[string]*Task
	// One ticker per supported scheduling interval (10 s, 1 min, 10 min, 30 min, 1 h).
	ticker10s, ticker1m, ticker10m, ticker30m, ticker1h *time.Ticker
}
// newTaskManager builds a TaskManager bound to this router and stores it in
// router.TaskMgr. The program and task maps start out empty, and one ticker is
// created for each scheduling interval (10 s, 1 min, 10 min, 30 min, 1 h).
func (router *Router) newTaskManager() {
	mgr := new(TaskManager)

	mgr.Router = router
	mgr.backgroundMutex = NewMutex()

	mgr.programs = map[string]*program{}
	mgr.tasks = map[string]*Task{}

	// Tickers begin counting as soon as they are created, not when start is called.
	mgr.ticker10s = time.NewTicker(10 * time.Second)
	mgr.ticker1m = time.NewTicker(time.Minute)
	mgr.ticker10m = time.NewTicker(10 * time.Minute)
	mgr.ticker30m = time.NewTicker(30 * time.Minute)
	mgr.ticker1h = time.NewTicker(time.Hour)

	router.TaskMgr = mgr
}
// start launches every registered task and then begins the periodic scheduler.
//
// Each task is started in its own goroutine through startTask; start blocks
// until all of those calls have returned and then logs that startup completed.
// Afterwards it spawns a background goroutine which, for as long as tm.running
// is true, waits on the five tickers and calls runTasksWithTimer with the
// matching TASK_TIMER_* value whenever one of them fires.
func (tm *TaskManager) start() {
	tm.running = true

	var startup sync.WaitGroup
	startup.Add(len(tm.tasks))
	for _, registered := range tm.tasks {
		go func(task *Task) {
			tm.startTask(task)
			startup.Done()
		}(registered)
	}
	startup.Wait()
	tm.Router.Log(LOG_LEVEL_INFO, "Tasks startup completed")

	schedule := func() {
		for {
			// tm.running is only re-examined after a ticker has fired, so the
			// loop notices a stop request on the next tick at the earliest.
			if !tm.running {
				return
			}
			select {
			case <-tm.ticker10s.C:
				tm.runTasksWithTimer(TASK_TIMER_10_SECONDS)
			case <-tm.ticker1m.C:
				tm.runTasksWithTimer(TASK_TIMER_1_MINUTE)
			case <-tm.ticker10m.C:
				tm.runTasksWithTimer(TASK_TIMER_10_MINUTES)
			case <-tm.ticker30m.C:
				tm.runTasksWithTimer(TASK_TIMER_30_MINUTES)
			case <-tm.ticker1h.C:
				tm.runTasksWithTimer(TASK_TIMER_1_HOUR)
			}
		}
	}
	go schedule()
}
// stop halts the periodic scheduler and shuts down every registered task.
//
// All five tickers are stopped, then stopTask is invoked concurrently for each
// task. If the tasks have not all stopped within 10 seconds, a kill signal is
// sent on every task's killChan. In either case stop waits for all stopTask
// calls to return before logging that cleanup completed.
func (tm *TaskManager) stop() {
	tm.running = false

	// Stop every ticker, including the 10-second one, so that no timer keeps
	// firing after shutdown.
	tm.ticker10s.Stop()
	tm.ticker1m.Stop()
	tm.ticker10m.Stop()
	tm.ticker30m.Stop()
	tm.ticker1h.Stop()

	var wg sync.WaitGroup
	for _, t := range tm.tasks {
		wg.Add(1)
		go func(task *Task) {
			defer wg.Done()
			tm.stopTask(task)
		}(t)
	}

	// done is closed once every stopTask call has returned. Waiting on a channel
	// avoids sharing a plain int counter between goroutines, which would be a
	// data race.
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	// Grace period before tasks are killed forcibly.
	grace := time.NewTimer(10 * time.Second)
	defer grace.Stop()

	select {
	case <-done:
		// Every task stopped in time; no kill signal is needed.
	case <-grace.C:
		// NOTE(review): each send blocks until something receives on killChan;
		// confirm that tasks which already stopped still drain this channel.
		for _, t := range tm.tasks {
			t.killChan <- struct{}{}
		}
		<-done
	}

	tm.Router.Log(LOG_LEVEL_INFO, "Tasks cleanup completed")
}