Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
davidae committed Jul 3, 2018
1 parent b5d97f0 commit 7b8e0e0
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 43 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type PublisherSubscriber interface {
### Publishing

```go
c, err := smplmsg.NewPublisher("localhost:321", "exchangeName", "clientID")
c, err := smplmsg.NewPublisher("localhost:321", "exchangeName", "clientID", SetContentType("application/json"))
if err != nil {
return err
}
Expand All @@ -52,7 +52,6 @@ if err != nil {
msgCh, errCh := c.Consume("routingKey")
for {
select {
conErr := <-errchan:
case err := <-errCh:
return err
case msg := <-msgCh:
Expand Down
50 changes: 17 additions & 33 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package smplmsg
import (
"crypto/rand"
"io"
"sync/atomic"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -38,7 +37,7 @@ type client struct {
endMonitoring chan struct{}
timeout time.Duration
retryTimeout time.Duration
amqpCh atomic.Value
amqpCh *amqpCh
contentType ContentType
deliveryMode DeliveryMode
}
Expand All @@ -50,8 +49,8 @@ type amqpCh struct {
}

// NewSubscriber initializes and returns a Client that implements the Subscriber interface
func NewSubscriber(msgURI, exchange, clientID string, opts ...ClientOption) (Subscriber, error) {
c, err := newClient(msgURI, exchange, clientID, opts...)
func NewSubscriber(URI, exchange, clientID string, opts ...ClientOption) (Subscriber, error) {
c, err := newClient(URI, exchange, clientID, opts...)
if err != nil {
return nil, errors.Wrap(err, "failed to initilize new subscriber")
}
Expand All @@ -60,8 +59,8 @@ func NewSubscriber(msgURI, exchange, clientID string, opts ...ClientOption) (Sub
}

// NewPublisher initializes and returns a Client that implements the Publisher interface
func NewPublisher(msgURI, exchange, clientID string, opts ...ClientOption) (Publisher, error) {
c, err := newClient(msgURI, exchange, clientID, opts...)
func NewPublisher(URI, exchange, clientID string, opts ...ClientOption) (Publisher, error) {
c, err := newClient(URI, exchange, clientID, opts...)
if err != nil {
return nil, errors.Wrap(err, "failed to initilize new publisher")
}
Expand All @@ -70,17 +69,17 @@ func NewPublisher(msgURI, exchange, clientID string, opts ...ClientOption) (Publ
}

// NewPubSub initializes and returns a Client that implements the PublisherSubscriber interface
func NewPubSub(msgURI, exchange, clientID string, opts ...ClientOption) (PublisherSubscriber, error) {
c, err := newClient(msgURI, exchange, clientID, opts...)
func NewPubSub(URI, exchange, clientID string, opts ...ClientOption) (PublisherSubscriber, error) {
c, err := newClient(URI, exchange, clientID, opts...)
if err != nil {
return nil, errors.Wrap(err, "failed to initilize new pub/sub")
}

return c, nil
}

func newClient(msgqURI, exchange, clientID string, opts ...ClientOption) (*client, error) {
conn, err := amqp.Dial(msgqURI)
func newClient(URI, exchange, clientID string, opts ...ClientOption) (*client, error) {
conn, err := amqp.Dial(URI)
if err != nil {
return nil, err
}
Expand All @@ -91,25 +90,24 @@ func newClient(msgqURI, exchange, clientID string, opts ...ClientOption) (*clien
}

c := &client{
uri: msgqURI,
uri: URI,
clientID: clientID,
exchange: exchange,
contentType: OctetStream,
deliveryMode: Transient,
retryTimeout: defaultRetryTimeout,
endMonitoring: make(chan struct{}),
amqpCh: &amqpCh{
conn: conn,
ch: ch,
errCh: conn.NotifyClose(make(chan *amqp.Error)),
},
}

for _, opt := range opts {
opt(c)
}

c.amqpCh.Store(&amqpCh{
conn: conn,
ch: ch,
errCh: conn.NotifyClose(make(chan *amqp.Error)),
})

err = declareExchange(ch, exchange)
if err != nil {
return nil, errors.Wrap(err, "failed to declare exchange")
Expand All @@ -120,27 +118,15 @@ func newClient(msgqURI, exchange, clientID string, opts ...ClientOption) (*clien
return c, nil
}

func (c *client) amqpCon() *amqp.Connection {
return c.amqpCh.Load().(amqpCh).conn
}

func (c *client) amqpChannel() *amqp.Channel {
return c.amqpCh.Load().(amqpCh).ch
}

func (c *client) amqpErrChan() chan *amqp.Error {
return c.amqpCh.Load().(amqpCh).errCh
}

// Close closes the channels
func (c *client) Close() error {
close(c.endMonitoring)

if err := c.amqpChannel().Close(); err != nil {
if err := c.amqpCh.ch.Close(); err != nil {
return errors.Wrap(err, "failed to close channel")
}

if err := c.amqpCon().Close(); err != nil {
if err := c.amqpCh.conn.Close(); err != nil {
return errors.Wrap(err, "failed to close connection")
}

Expand Down Expand Up @@ -173,7 +159,5 @@ func uuid() string {

uuid := make([]byte, 16)
fillWithRandomBits(uuid)
uuid[6] = (uuid[6] & 0x0f) | 0x40 // Version 4
uuid[8] = (uuid[8] & 0x3f) | 0x80 // Variant is 10
return string(uuid)
}
6 changes: 3 additions & 3 deletions consuming.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func (c *client) Consume(routingKey string) (<-chan amqp.Delivery, <-chan error)
return nil, errCh
}

ch, err := c.amqpChannel().Consume(
ch, err := c.amqpCh.ch.Consume(
queue,
uuid(),
false,
Expand Down Expand Up @@ -50,7 +50,7 @@ func validateMessage(delivCh <-chan amqp.Delivery, errCh chan error) {
}

func (c *client) declareQueue(queue, routingKey string) error {
q, err := c.amqpChannel().QueueDeclare(
q, err := c.amqpCh.ch.QueueDeclare(
queue, // name of the queue
true, // durable
false, // delete when unused
Expand All @@ -62,7 +62,7 @@ func (c *client) declareQueue(queue, routingKey string) error {
return errors.Wrap(err, fmt.Sprintf("failed to declare queue (%s)", queue))
}

err = c.amqpChannel().QueueBind(
err = c.amqpCh.ch.QueueBind(
q.Name,
routingKey,
c.exchange,
Expand Down
2 changes: 1 addition & 1 deletion publishing.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func (c *client) Publish(routing string, headers amqp.Table, payload []byte) err
MessageId: uuid(),
Timestamp: time.Now(),
}
err := c.amqpChannel().Publish(
err := c.amqpCh.ch.Publish(
c.exchange,
routing,
false,
Expand Down
7 changes: 3 additions & 4 deletions reconnect.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@ import (
)

func (c *client) monitorConnection() {
aqmpErrCh := c.amqpErrChan()
for {
select {
case <-c.endMonitoring:
return
case <-aqmpErrCh:
case <-c.amqpCh.errCh:
err := c.reconnect()
if err != nil {
time.Sleep(c.retryTimeout)
Expand All @@ -38,11 +37,11 @@ func (c *client) reconnect() error {
return errors.Wrap(err, "failed to open channel")
}

c.amqpCh.Store(amqpCh{
c.amqpCh = &amqpCh{
conn: conn,
ch: ch,
errCh: conn.NotifyClose(make(chan *amqp.Error)),
})
}

err = declareExchange(ch, c.exchange)
if err != nil {
Expand Down
File renamed without changes.

0 comments on commit 7b8e0e0

Please sign in to comment.