-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathstream.go
281 lines (231 loc) · 7.12 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
/*
Package eventsource is a library for dealing with server sent events in Go.
The library attempts to make as few assumptions as possible about how your apps
will be written, and remains as generic as possible while still being simple and useful.
The three core objects to the library are Clients, Events, and Streams.
Client wraps an HTTP connection, and runs a worker routine to send events to the connected
client in a thread-safe way. It gracefully handles client disconnects.
Event encapsulates all the data that can be sent with SSE, and contains the logic for converting
it to wire format to send. Events are not thread-safe by themselves.
Stream is an abstraction for 0 or more Client connections, and adds some multiplexing and filtering
on top of the Client. It also can act as an http.Handler to automatically register inbound client
connections and disconnections.
A quick example of a simple server that broadcasts a "tick" event every second:
func main() {
stream := eventsource.NewStream()
go func(s *eventsource.Stream) {
for {
time.Sleep(time.Second)
stream.Broadcast(eventsource.DataEvent("tick"))
}
}(stream)
http.ListenAndServe(":8080", stream)
}
*/
package eventsource
import (
	"net/http"
	"strings"
	"sync"
)
// Stream abstracts several client connections together and allows
// for event multiplexing and topics.
// A stream also implements an http.Handler to easily register incoming
// http requests as new clients.
type Stream struct {
// clients maps every registered client to the topics it is subscribed to.
clients map[*Client]topicList
// listLock is taken by the methods that modify or iterate over clients.
listLock sync.RWMutex
// NOTE(review): shutdownWait is not used by any method visible in this file.
shutdownWait sync.WaitGroup
// clientConnectHook, when non-nil, is called by the HTTP handlers after a
// new client has been registered. It is set through ClientConnectHook.
clientConnectHook func(*http.Request, *Client)
// errors receives send failures from Broadcast and Publish. It is nil until
// Errors is called, and failures are dropped while it is nil.
errors chan *ClientError
}
// ClientError is published down a stream's Error channel when there are
// errors with Publishing or Broadcasting.
// The error is the value returned from the Client.Send call
type ClientError struct {
// Err is the error returned by the failed Client.Send call.
Err error
// Client is the client the event could not be sent to.
Client *Client
}
// topicList records the topics a single client is subscribed to. A topic
// counts as subscribed only when its value is true; a missing key reads as
// false and is therefore treated as not subscribed.
type topicList map[string]bool
// NewStream returns an empty Stream that is ready to have clients
// registered on it. No error channel or connect hook is configured.
func NewStream() *Stream {
	s := new(Stream)
	s.clients = make(map[*Client]topicList)
	return s
}
// Errors creates a buffered channel of errors that will contain
// errors from Publish or Broadcast events.
//
// The channel is not created before this call, so errors that occurred
// earlier are not delivered here. Every call installs a fresh channel that
// replaces the previous one; a channel returned by an earlier call stops
// receiving errors.
//
// The channel is buffered to the given size. If the buffer is full and further
// errors occur, they are silently dropped.
func (s *Stream) Errors(size uint) <-chan *ClientError {
	errs := make(chan *ClientError, size)

	// Broadcast and Publish read s.errors while holding the read lock, so the
	// channel must be swapped in under the write lock to avoid a data race.
	s.listLock.Lock()
	s.errors = errs
	s.listLock.Unlock()

	return errs
}
// Register adds a client to the stream so that it receives every broadcast
// event. A client that is already registered is left untouched, including
// its existing topic subscriptions.
func (s *Stream) Register(c *Client) {
	s.listLock.Lock()
	defer s.listLock.Unlock()

	// Only create an (empty) subscription set for clients we have not seen.
	if _, registered := s.clients[c]; !registered {
		s.clients[c] = topicList{}
	}
}
// Remove drops a client, together with its topic subscriptions, from this
// stream. The client itself is not shut down. Removing a client that is not
// registered is a no-op.
func (s *Stream) Remove(c *Client) {
	s.listLock.Lock()
	delete(s.clients, c)
	s.listLock.Unlock()
}
// Broadcast delivers the event to every client registered on this stream,
// regardless of topic subscriptions. A failed send does not stop delivery to
// the remaining clients; the failure is offered to the error channel instead.
func (s *Stream) Broadcast(e *Event) {
	s.listLock.RLock()
	defer s.listLock.RUnlock()

	for client := range s.clients {
		if sendErr := client.Send(e); sendErr != nil {
			tryPushError(s.errors, client, sendErr)
		}
	}
}
// Subscribe adds the client to the set of clients that receive events
// published to the given topic. A client that is not yet registered on the
// stream is registered as part of the call.
func (s *Stream) Subscribe(topic string, c *Client) {
	s.listLock.Lock()
	defer s.listLock.Unlock()

	// Register on the fly so the client has a subscription set to write to.
	if _, registered := s.clients[c]; !registered {
		s.clients[c] = topicList{}
	}
	s.clients[c][topic] = true
}
// Unsubscribe removes the client's subscription to the given topic. The
// client stays registered on the stream and keeps receiving broadcasts.
// Calling it for an unregistered client, or for a topic the client never
// subscribed to, has no effect.
func (s *Stream) Unsubscribe(topic string, c *Client) {
	s.listLock.Lock()
	defer s.listLock.Unlock()

	// Delete the entry rather than storing false so the subscription map does
	// not accumulate dead topics. Publish treats a missing key as "not
	// subscribed". For an unregistered client the lookup yields a nil map, on
	// which delete is a no-op.
	delete(s.clients[c], topic)
}
// Publish delivers the event to every client currently subscribed to the
// given topic. A failed send does not stop delivery to the remaining
// subscribers; the failure is offered to the error channel instead.
func (s *Stream) Publish(topic string, e *Event) {
	s.listLock.RLock()
	defer s.listLock.RUnlock()

	for client, subscriptions := range s.clients {
		// Skip clients without an active subscription to this topic.
		if !subscriptions[topic] {
			continue
		}
		if sendErr := client.Send(e); sendErr != nil {
			tryPushError(s.errors, client, sendErr)
		}
	}
}
// Shutdown calls Shutdown on every client registered on the stream and
// removes each of them from the stream, leaving it empty.
func (s *Stream) Shutdown() {
	s.listLock.Lock()
	defer s.listLock.Unlock()

	// Deleting the current key while ranging over a map is permitted in Go.
	for c := range s.clients {
		c.Shutdown()
		delete(s.clients, c)
	}
}
// CloseTopic removes every client's subscription to the given topic. The
// clients are neither shut down nor removed from the stream, so they keep
// receiving broadcasts and events for their other topics.
func (s *Stream) CloseTopic(topic string) {
	s.listLock.Lock()
	defer s.listLock.Unlock()

	// Delete instead of storing false: writing false would add an entry to the
	// map of every client, including those that never subscribed, so each
	// closed topic would grow every subscription map.
	for _, subscriptions := range s.clients {
		delete(subscriptions, topic)
	}
}
// ServeHTTP takes a client connection, registers it for broadcasts,
// then blocks so long as the connection is alive.
//
// Requests that do not pass checkRequest are answered with
// 406 Not Acceptable. If NewClient returns nil the request is answered with
// 500 Internal Server Error. Otherwise the client is registered, the connect
// hook (if any) is invoked, and the call blocks in Client.Wait. The client is
// removed from the stream when the handler returns.
func (s *Stream) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// ensure the client accepts an event-stream
	if !checkRequest(r) {
		http.Error(w, "This is an EventStream endpoint", http.StatusNotAcceptable)
		return
	}

	// create the client
	c := NewClient(w, r)
	if c == nil {
		http.Error(w, "EventStream not supported for this connection", http.StatusInternalServerError)
		return
	}

	// Register for broadcasts. Removal is deferred so the client is also
	// dropped if the hook or Wait panics; net/http recovers handler panics,
	// which would otherwise leave the client registered indefinitely.
	s.Register(c)
	defer s.Remove(c)

	if hook := s.clientConnectHook; hook != nil {
		hook(r, c)
	}

	// wait for the client to exit or be shutdown
	c.Wait()
}
// TopicHandler returns an HTTP handler that registers each connecting client
// for broadcasts, subscribes it to every topic in topics, and then blocks for
// as long as the client is connected.
//
// The returned handler answers requests that do not pass checkRequest with
// 406 Not Acceptable, and with 500 Internal Server Error if NewClient
// returns nil. The client is removed from the stream when the handler
// returns.
func (s *Stream) TopicHandler(topics []string) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		// ensure the client accepts an event-stream
		if !checkRequest(r) {
			http.Error(w, "This is an EventStream endpoint", http.StatusNotAcceptable)
			return
		}

		// create the client
		c := NewClient(w, r)
		if c == nil {
			http.Error(w, "EventStream not supported for this connection", http.StatusInternalServerError)
			return
		}

		// Register for broadcasts. Removal is deferred so the client is also
		// dropped if the hook or Wait panics; net/http recovers handler
		// panics, which would otherwise leave the client registered
		// indefinitely.
		s.Register(c)
		defer s.Remove(c)

		// topics
		for _, topic := range topics {
			s.Subscribe(topic, c)
		}

		if hook := s.clientConnectHook; hook != nil {
			hook(r, c)
		}

		// wait for the client to exit or be shutdown
		c.Wait()
	}
}
// ClientConnectHook sets a function to be called when a client connects to this stream's
// HTTP handler.
// The function receives the incoming request and the newly registered client.
// Only one handler may be registered. Further calls overwrite the previous.
// NOTE(review): the hook is stored without taking any lock, so it should
// presumably be set before the stream starts serving requests.
func (s *Stream) ClientConnectHook(fn func(*http.Request, *Client)) {
s.clientConnectHook = fn
}
// NumClients returns the number of clients currently registered on the
// stream.
func (s *Stream) NumClients() int {
	// The client map is mutated under listLock by Register, Remove, Subscribe
	// and Shutdown, so its length must be read under the lock as well.
	s.listLock.RLock()
	defer s.listLock.RUnlock()

	return len(s.clients)
}
// checkRequest reports whether the request's Accept header explicitly lists
// the text/event-stream media type.
//
// Every Accept header line is inspected and split on commas into media
// ranges. Anything after a ';' in a media range (for example a q-value) is
// ignored, and the comparison is case-insensitive. Wildcard ranges such as
// */* are deliberately not treated as a match, so only clients that ask for
// an event stream by name are accepted.
func checkRequest(r *http.Request) bool {
	const eventStream = "text/event-stream"

	// Index the header map directly: Header.Get would only return the first
	// Accept line. "Accept" is already in canonical header form.
	for _, line := range r.Header["Accept"] {
		for _, mediaRange := range strings.Split(line, ",") {
			// Strip parameters such as ";q=0.9".
			if i := strings.IndexByte(mediaRange, ';'); i >= 0 {
				mediaRange = mediaRange[:i]
			}
			if strings.EqualFold(strings.TrimSpace(mediaRange), eventStream) {
				return true
			}
		}
	}
	return false
}
// tryPushError offers a ClientError describing the failed send to the error
// channel without ever blocking. Nothing happens when the channel is nil,
// and the error is discarded when the channel cannot accept it immediately.
func tryPushError(c chan<- *ClientError, cli *Client, err error) {
	if c != nil {
		select {
		case c <- &ClientError{Err: err, Client: cli}:
			// delivered to the error channel
		default:
			// channel is full; the error is intentionally dropped
		}
	}
}