-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpool.go
175 lines (149 loc) · 2.98 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
package grpool
import (
"container/list"
"errors"
"sync"
"time"
)
type Job func()
type Pool struct {
Max int
MaxIdle int // 必须设置,否则可能死锁
Idle time.Duration
Live time.Duration
Wait bool
idleSignal chan *worker
mu sync.Mutex
idlelist *list.List
cnt int
idleCnt int
running bool
}
func (p *Pool) lazyinit() {
if p.running {
return
}
p.idlelist = list.New()
p.idleSignal = make(chan *worker, p.MaxIdle)
p.running = true
}
// 添加任务
func (p *Pool) Submit(job Job) error {
p.lazyinit()
p.mu.Lock()
defer p.mu.Unlock()
// 删除老旧的worker
p.kill()
EXEC:
// 存在空闲worker, 执行job
w, ok := p.getWorker()
if ok != false {
w.jobs <- job
p.idlelist.Remove(p.idlelist.Front())
p.idleCnt--
return nil
}
// 不存在空闲worker
// 1. 数量未达到上限
if p.cnt < p.Max {
now := time.Now()
w := &worker{
pool: p,
created: now,
jobs: make(chan Job, 1),
}
w.jobs <- job
p.cnt++
// pool写goroutine,而不是worker起,原因是这样可以在关闭池的时候做到更好的控制
// 比如这里可以将包装匿名函数,在匿名函数中加上sync.WaitGroup,那么就Close就可以做到等到所有worker完工后才退出
go w.run()
return nil
}
// 2. 数量达到上限
if !p.Wait {
return errors.New("no worker")
}
w = <-p.idleSignal
p.idleCnt++
p.idlelist.PushBack(w)
p.keepMaxIdle()
goto EXEC
}
// 删除闲置时间过长的worker
func (p *Pool) kill() {
var (
cur = p.idlelist.Front()
next *list.Element
)
for cur != nil {
next = cur.Next()
// 释放无用连接
now := time.Now()
w := cur.Value.(*worker)
if !w.sleep.Add(p.Idle).Before(now) {
p.idlelist.Remove(cur)
p.cnt--
}
cur = next
}
}
// 保存空闲的数量维持在MaxIdle
func (p *Pool) keepMaxIdle() {
var (
cur = p.idlelist.Front()
next *list.Element
)
for {
select {
case w := <-p.idleSignal:
p.idlelist.PushBack(w)
p.idleCnt++
default:
for p.idlelist.Len() > p.MaxIdle {
next = cur.Next()
p.idlelist.Remove(cur)
cur = next
}
return
}
}
}
// 删除存在时间过长的worker,返回第一个未在移除条件内worker
func (p *Pool) getWorker() (w *worker, get bool) {
var (
cur = p.idlelist.Front()
next *list.Element
)
for cur != nil {
next = cur.Next()
now := time.Now()
w := cur.Value.(*worker)
if !w.created.Add(p.Live).Before(now) {
p.idlelist.Remove(cur)
p.cnt--
} else {
return w, true
}
cur = next
}
return
}
// 释放池中所有的worker, 仅仅是发出退出信号,如果需要等待worker完全退出需要在发起goroutine时使用sync.WaitGroup控制
func (p *Pool) Close() {
p.mu.Lock()
defer p.mu.Unlock()
for e := p.idlelist.Front(); e != nil; e = e.Next() {
w := e.Value.(*worker)
w.release()
}
p.cnt = p.cnt - p.idlelist.Len()
p.idlelist = nil
for p.cnt != 0 {
w := <-p.idleSignal
w.release()
p.cnt--
}
p.idleCnt = 0
p.idleSignal = nil
p.running = false
}