-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmessage.go
149 lines (118 loc) · 4.47 KB
/
message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package message_channel
import (
"context"
"sync"
"time"
)
// Channel 用于把多个Channel连接为一个channel,这样就可以用基于channel构建更复杂的通信模型
// 每个Channel对象是对go原生的channel的一个包装,同时增加了一些功能
type Channel[Message any] struct {
// 全局唯一的ID,每个信道的ID都不同,用于区分不同的信道
ID uint64
// 真实存储数据的channel,每个channel都有一个消息发送方和消息接收方
channel chan Message
// 此信道创建的子信道,子信道会被连接到父信道,同时父信道要等所有的子信道退出后才能退出
// 在当前信道要调用其他组件传入信道时就需要创建一个子信道传进去
childrenChannelMap *ChildrenMap[Message]
// 自己的负责处理消息的协程的退出标志位,用于外界的同步等待
selfWorkerWg *sync.WaitGroup
// 创建信道时的选项
options *ChannelOptions[Message]
}
// NewChannel 创建一个信道
func NewChannel[Message any](options *ChannelOptions[Message]) *Channel[Message] {
x := &Channel[Message]{
ID: idGenerator.Add(1),
channel: make(chan Message, options.ChannelBuffSize),
options: options,
childrenChannelMap: NewChildrenMap[Message](),
selfWorkerWg: &sync.WaitGroup{},
}
// 启动处理消息的协程
x.selfWorkerWg.Add(1)
go func() {
defer func() {
// 退出的时候需要设置自己的退出标记位
x.selfWorkerWg.Done()
// 同时退出的时候如果有事件回调的话需要触发一下事件回调
if x.options.CloseEventListener != nil {
x.options.CloseEventListener()
}
}()
// 开始消费,处理channel
count := 0
for message := range x.channel {
count++
if x.options.ChannelConsumerFunc != nil {
x.options.ChannelConsumerFunc(count, message)
}
}
}()
return x
}
// Send 往当前的消息队列中发送一条消息,消息放入之后会被异步处理
func (x *Channel[Message]) Send(ctx context.Context, message Message) error {
select {
case x.channel <- message:
return nil
case <-ctx.Done():
return context.Canceled
}
}
// MakeChildChannel 创建一条新的消息队列,对接到当前的消息队列上作为一个子队列
// 当前队列关闭之前需要等待所有的孩子队列关闭
func (x *Channel[Message]) MakeChildChannel() *Channel[Message] {
subChannel := NewChannel[Message](&ChannelOptions[Message]{
// 创建一个子信道,并将子信道上的所有消息都转发到父信道上,这意味着父信道只能等子信道关闭之后才能够关闭
ChannelConsumerFunc: func(index int, message Message) {
x.channel <- message
},
// 子信道的缓存大小和父信道保持一致
ChannelBuffSize: x.options.ChannelBuffSize,
})
// 在子信道关闭的时候告知父信道自己已经退出了
subChannel.options.CloseEventListener = func() {
ctx, cancelFunc := context.WithTimeout(context.Background(), time.Second*30)
defer cancelFunc()
err := x.childrenChannelMap.Remove(ctx, subChannel.ID)
if err != nil {
// TODO
}
}
// 为当前信道增加一个孩子信道
ctx, cancelFunc := context.WithTimeout(context.Background(), time.Second*30)
defer cancelFunc()
err := x.childrenChannelMap.Set(ctx, subChannel.ID, subChannel)
if err != nil {
// TODO
}
return subChannel
}
// ReceiverWait 消息的接收方调用的,消息的接收方需要同步等待此消息信道被处理完毕时调用
func (x *Channel[Message]) ReceiverWait(ctx context.Context) {
// 消息接收方等待发送消息的协程退出就认为是信道已经处理完了
x.selfWorkerWg.Wait()
select {}
}
// SenderWaitAndClose 消息的发送方调用,消息的发送方需要同步等待消息被处理完时调用
func (x *Channel[Message]) SenderWaitAndClose(f ...MapRunFunc[Message]) {
if len(f) == 0 {
f = append(f, nil)
}
// 等待子channel消费完成退出
timeout, cancelFunc := context.WithTimeout(context.Background(), time.Second*30)
defer cancelFunc()
err := x.childrenChannelMap.BlockUtilEmpty(timeout, f[0])
if err != nil {
// TODO
}
// 关闭channel表示发送者不会再发送了,发送完队列中剩余的想这些就要退出了
close(x.channel)
// 等待消费完队列中剩余的消息
x.selfWorkerWg.Wait()
}
// TopologyAscii 把拓扑逻辑转为ASCII图形,这样就能比较方便的观察依赖关系了
func (x *Channel[Message]) TopologyAscii(f ...MapRunFunc[Message]) string {
// TODO
return ""
}