-
Notifications
You must be signed in to change notification settings - Fork 43
/
Copy pathevent_emitter.go
270 lines (227 loc) · 6.98 KB
/
event_emitter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
package mediasoup
import (
"encoding/json"
"fmt"
"reflect"
"sync"
"github.com/go-logr/logr"
)
// EventEmitter defines an interface of the Event-based architecture(like EventEmitter in JavaScript).
type IEventEmitter interface {
// On adds the listener function to the end of the listeners array for the event named eventName.
// No checks are made to see if the listener has already been added.
// Multiple calls passing the same combination of eventName and listener will result in the listener
// being added, and called, multiple times.
// By default, a maximum of 10 listeners can be registered for any single event.
// This is a useful default that helps finding memory leaks. Note that this is not a hard limit.
// The EventEmitter instance will allow more listeners to be added but will output a trace warning
// to log indicating that a "possible EventEmitter memory leak" has been detected.
On(eventName string, listener interface{})
// Once adds a one-time listener function for the event named eventName.
// The next time eventName is triggered, this listener is removed and then invoked.
Once(eventName string, listener interface{})
// Emit calls each of the listeners registered for the event named eventName,
// in the order they were registered, passing the supplied arguments to each.
// Returns true if the event had listeners, false otherwise.
Emit(eventName string, argv ...interface{}) bool
// SafeEmit calls each of the listeners registered for the event named eventName. It recovers
// panic and logs panic info with provided logger.
SafeEmit(eventName string, argv ...interface{}) bool
// Off removes the specified listener from the listener array for the event named eventName.
Off(eventName string, listener interface{})
// RemoveAllListeners removes all listeners, or those of the specified eventNames.
RemoveAllListeners(eventNames ...string)
// ListenerCount returns total number of all listeners, or those of the specified eventNames.
ListenerCount(eventNames ...string) int
}
type EventEmitter struct {
mu sync.Mutex
listeners map[string][]*intervalListener
logger logr.Logger
}
func NewEventEmitter() IEventEmitter {
return &EventEmitter{
logger: NewLogger("EventEmitter"),
}
}
func (e *EventEmitter) On(event string, listener interface{}) {
e.mu.Lock()
defer e.mu.Unlock()
if e.listeners == nil {
e.listeners = make(map[string][]*intervalListener)
}
e.listeners[event] = append(e.listeners[event], newInternalListener(listener, false))
}
func (e *EventEmitter) Once(event string, listener interface{}) {
e.mu.Lock()
defer e.mu.Unlock()
if e.listeners == nil {
e.listeners = make(map[string][]*intervalListener)
}
e.listeners[event] = append(e.listeners[event], newInternalListener(listener, true))
}
func (e *EventEmitter) Emit(event string, args ...interface{}) bool {
e.mu.Lock()
if e.listeners == nil {
e.mu.Unlock()
return false
}
listeners := e.listeners[event]
e.mu.Unlock()
for _, listener := range listeners {
if listener.once != nil {
e.Off(event, listener.listenerValue.Interface())
}
// may panic
listener.Call(args...)
}
return len(listeners) > 0
}
func (e *EventEmitter) SafeEmit(event string, args ...interface{}) bool {
e.mu.Lock()
if e.listeners == nil {
e.mu.Unlock()
return false
}
listeners := e.listeners[event]
e.mu.Unlock()
call := func(listener *intervalListener) {
defer func() {
if r := recover(); r != nil {
e.logger.Error(fmt.Errorf("%v", r), "emit panic")
}
}()
// may panic
listener.Call(args...)
}
for _, listener := range listeners {
if listener.once != nil {
e.Off(event, listener.listenerValue.Interface())
}
call(listener)
}
return len(listeners) > 0
}
func (e *EventEmitter) Off(event string, listener interface{}) {
e.mu.Lock()
defer e.mu.Unlock()
if e.listeners == nil {
return
}
listeners := e.listeners[event]
handlerPtr := reflect.ValueOf(listener).Pointer()
for i, internalListener := range listeners {
if internalListener.listenerValue.Pointer() == handlerPtr {
e.listeners[event] = append(listeners[0:i], listeners[i+1:]...)
break
}
}
}
func (e *EventEmitter) RemoveAllListeners(events ...string) {
e.mu.Lock()
defer e.mu.Unlock()
if e.listeners == nil || len(events) == 0 {
e.listeners = nil
return
}
for _, event := range events {
delete(e.listeners, event)
}
}
func (e *EventEmitter) ListenerCount(events ...string) (total int) {
e.mu.Lock()
defer e.mu.Unlock()
if e.listeners == nil || len(events) == 0 {
for _, listeners := range e.listeners {
total += len(listeners)
}
return
}
for _, event := range events {
total += len(e.listeners[event])
}
return
}
type intervalListener struct {
listenerValue reflect.Value
argTypes []reflect.Type
once *sync.Once
}
func newInternalListener(listener interface{}, once bool) *intervalListener {
var argTypes []reflect.Type
listenerValue := reflect.ValueOf(listener)
listenerType := listenerValue.Type()
for i := 0; i < listenerType.NumIn(); i++ {
argTypes = append(argTypes, listenerType.In(i))
}
l := &intervalListener{
listenerValue: listenerValue,
argTypes: argTypes,
}
if once {
l.once = &sync.Once{}
}
return l
}
func (l *intervalListener) Call(args ...interface{}) {
call := func() {
argValues := make([]reflect.Value, len(args))
for i, arg := range args {
argValues[i] = reflect.ValueOf(arg)
}
if !l.listenerValue.Type().IsVariadic() {
argValues = l.alignArguments(argValues)
}
argValues = l.convertArguments(argValues)
// call listener function and ignore returns
l.listenerValue.Call(argValues)
}
if l.once != nil {
l.once.Do(call)
} else {
call()
}
}
func (l intervalListener) convertArguments(args []reflect.Value) []reflect.Value {
if len(args) != len(l.argTypes) {
return args
}
actualArgs := make([]reflect.Value, len(args))
for i, arg := range args {
// Unmarshal bytes to golang type
if isBytesType(arg.Type()) && !isBytesType(l.argTypes[i]) {
val := reflect.New(l.argTypes[i]).Interface()
if err := json.Unmarshal(arg.Bytes(), val); err == nil {
actualArgs[i] = reflect.ValueOf(val).Elem()
}
} else if arg.Type() != l.argTypes[i] &&
arg.Type().ConvertibleTo(l.argTypes[i]) {
actualArgs[i] = arg.Convert(l.argTypes[i])
} else {
actualArgs[i] = arg
}
}
return actualArgs
}
func (l intervalListener) alignArguments(args []reflect.Value) (actualArgs []reflect.Value) {
// delete unwanted arguments
if argLen := len(l.argTypes); len(args) >= argLen {
actualArgs = args[0:argLen]
} else {
actualArgs = args[:]
// append missing arguments with zero value
for _, argType := range l.argTypes[len(args):] {
actualArgs = append(actualArgs, reflect.Zero(argType))
}
}
return actualArgs
}
func isValidListener(fn interface{}) error {
if reflect.TypeOf(fn).Kind() != reflect.Func {
return fmt.Errorf("%s is not a reflect.Func", reflect.TypeOf(fn))
}
return nil
}
func isBytesType(tp reflect.Type) bool {
return tp.Kind() == reflect.Slice && tp.Elem().Kind() == reflect.Uint8
}