-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpublisher.go
167 lines (143 loc) · 3.7 KB
/
publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package events
import (
"fmt"
"runtime"
"sync"
"time"
"github.com/pkg/errors"
"github.com/streadway/amqp"
"github.com/w1ck3dg0ph3r/rabbit-events/pkg/channel"
)
// publisher is an event publishing worker
//
// Encapsulates AMQP channel that is configured for publishing messages. When message is
// sent on the Publishings channel, publisher will send the message and wait for acknowledgement
// from the broker. When publishing is acked/nacked by the broker or the bus is shutdown,
// publisher will send the result on publishing's done chan.
type publisher struct {
ID int
Connection ConnectionOpenCloser
Exchange string
ShutdownTimeout time.Duration
MaxPublishingsInFlight int
Publishings chan publishing
ShouldQuit chan error
Logger Logger
ch channel.Channel
err chan *amqp.Error
confirms chan amqp.Confirmation
// publishing delivery tag, starting with 1
tag uint64
// publishings in flight by delivery tag
publishings map[uint64]publishing
}
type publishing struct {
e *Event
done chan bool
err error
}
func newPublishing(e *Event) publishing {
return publishing{e, make(chan bool), nil}
}
// Run loops over received publishing confirmations until quit is closed or unrecoverable
// connection error occurs
func (p *publisher) Run(wg *sync.WaitGroup, quit <-chan struct{}) {
defer wg.Done()
p.debugf("running event publisher %d", p.ID)
err := p.createChannel()
if err != nil {
err = errors.Wrap(err, "cant create publisher channel")
p.ShouldQuit <- err
return
}
for {
select {
case pub := <-p.Publishings:
msg := eventToPublishing(pub.e)
err := p.ch.Publish(p.Exchange, pub.e.Name, false, false, *msg)
if err != nil {
break
}
p.tag++
p.publishings[p.tag] = pub
case c := <-p.confirms:
pub := p.publishings[c.DeliveryTag]
delete(p.publishings, c.DeliveryTag)
if !c.Ack {
pub.err = fmt.Errorf("publisher %d delivery %d nacked by broker", p.ID, c.DeliveryTag)
}
pub.done <- c.Ack
case amqpErr, ok := <-p.err:
if ok {
if err := p.handleProtocolError(amqpErr); err != nil {
return
}
}
case <-quit:
p.debugf("stopping event publisher %d", p.ID)
waitForOutstandingPublishings(p.publishings, p.ShutdownTimeout)
return
}
}
}
func (p *publisher) handleProtocolError(amqpErr error) (err error) {
p.debugf("publisher %d error: %s", p.ID, amqpErr)
p.nackOutstandingPublishings(amqpErr)
err = p.createChannel()
if err != nil {
p.debugf("cant recreate publisher channel %d: %s", p.ID, err)
p.ShouldQuit <- err
return
}
return
}
// createChannel opens and configures for publishing AMQP channel
func (p *publisher) createChannel() (err error) {
p.ch, err = p.Connection.Open()
if err != nil {
return
}
// put channel in confirm mode
err = p.ch.Confirm(false)
if err != nil {
return
}
// subscribe to error notifications
p.err = make(chan *amqp.Error, 1)
p.ch.NotifyClose(p.err)
// subscribe to publishing confirmations
p.confirms = make(chan amqp.Confirmation, p.MaxPublishingsInFlight)
p.ch.NotifyPublish(p.confirms)
p.publishings = make(map[uint64]publishing, p.MaxPublishingsInFlight)
p.tag = 0
return
}
func (p *publisher) nackOutstandingPublishings(err error) {
for tag := range p.publishings {
pub := p.publishings[tag]
pub.err = err
pub.done <- false
}
}
func waitForOutstandingPublishings(pubs map[uint64]publishing, timeout time.Duration) {
done := make(chan struct{}, 1)
go func() {
for {
l := len(pubs)
if l == 0 {
done <- struct{}{}
return
}
runtime.Gosched()
}
}()
select {
case <-done:
case <-time.After(timeout):
}
}
func (p *publisher) debugf(f string, a ...interface{}) {
if p.Logger != nil {
p.Logger.Debugf(f, a...)
}
}