Skip to content

Commit

Permalink
Add the possibility to use multiple routing keys (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
johnduro authored and Christian Häusler committed Jan 23, 2018
1 parent a9b48f5 commit 6d4ef44
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 13 deletions.
22 changes: 18 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type Config struct {
Global bool
}
QueueSettings struct {
Routingkey string
Routingkey []string
MessageTTL int
DeadLetterExchange string
DeadLetterRoutingKey string
Expand Down Expand Up @@ -112,9 +112,13 @@ func (c Config) MessageTTL() int32 {
return int32(c.QueueSettings.MessageTTL)
}

// RoutingKey returns the configured key for message routing.
func (c Config) RoutingKey() string {
return transformToStringValue(c.QueueSettings.Routingkey)
// RoutingKeys returns the configured keys for message routing.
func (c Config) RoutingKeys() []string {
if len(c.QueueSettings.Routingkey) < 1 {
return []string{""}
}

return transformArrayOfStringToStringValue(c.QueueSettings.Routingkey)
}

// HasDeadLetterExchange checks if a dead letter exchange is configured.
Expand Down Expand Up @@ -182,3 +186,13 @@ func transformToStringValue(val string) string {

return val
}

func transformArrayOfStringToStringValue(iterable []string) []string {
var ret []string

for _, str := range iterable {
ret = append(ret, transformToStringValue(str))
}

return ret
}
18 changes: 10 additions & 8 deletions consumer/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,16 @@ func (c *rabbitMqConnection) declareExchange() error {

// Bind queue
c.outLog.Printf("Binding queue \"%s\" to exchange \"%s\"...", c.cfg.RabbitMq.Queue, c.cfg.Exchange.Name)
if err := c.ch.QueueBind(
c.cfg.RabbitMq.Queue,
c.cfg.RoutingKey(),
c.cfg.ExchangeName(),
false,
nil,
); err != nil {
return fmt.Errorf("failed to bind queue to exchange: %v", err)
for _, routingKey := range c.cfg.RoutingKeys() {
if err := c.ch.QueueBind(
c.cfg.RabbitMq.Queue,
routingKey,
c.cfg.ExchangeName(),
false,
nil,
); err != nil {
return fmt.Errorf("failed to bind queue to exchange: %v", err)
}
}

return nil
Expand Down
65 changes: 65 additions & 0 deletions consumer/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,34 @@ const priorityConfig = `[rabbitmq]
name=worker
type=test`

const multipleRoutingKeysConfig = `[rabbitmq]
queue=worker
[queuesettings]
routingkey=foo
routingkey=bar
[exchange]
name=worker
type=test`

const oneEmptyRoutingKeyConfig = `[rabbitmq]
queue=worker
[queuesettings]
routingkey="<empty>"
[exchange]
name=worker
type=test`

const noRoutingKeyConfig = `[rabbitmq]
queue=worker
[exchange]
name=worker
type=test`

var amqpTable amqp.Table

var queueTests = []struct {
Expand Down Expand Up @@ -117,6 +145,43 @@ var queueTests = []struct {
},
nil,
},
// Define queue with multiple routing keys.
{
"queueWithMultipleRoutingKeys",
multipleRoutingKeysConfig,
func(ch *TestChannel) {
ch.On("Qos", 3, 0, false).Return(nil).Once()
ch.On("QueueDeclare", "worker", true, false, false, false, amqpTable).Return(amqp.Queue{}, nil).Once()
ch.On("ExchangeDeclare", "worker", "test", false, false, false, false, amqp.Table{}).Return(nil).Once()
ch.On("QueueBind", "worker", "foo", "worker", false, amqpTable).Return(nil).Once()
ch.On("QueueBind", "worker", "bar", "worker", false, amqpTable).Return(nil).Once()
},
nil,
},
// Define queue with one emtpy routing key.
{
"queueWithOneEmptyRoutingKey",
oneEmptyRoutingKeyConfig,
func(ch *TestChannel) {
ch.On("Qos", 3, 0, false).Return(nil).Once()
ch.On("QueueDeclare", "worker", true, false, false, false, amqpTable).Return(amqp.Queue{}, nil).Once()
ch.On("ExchangeDeclare", "worker", "test", false, false, false, false, amqp.Table{}).Return(nil).Once()
ch.On("QueueBind", "worker", "", "worker", false, amqpTable).Return(nil).Once()
},
nil,
},
// Define queue without routing key.
{
"queueWithoutRoutingKey",
noRoutingKeyConfig,
func(ch *TestChannel) {
ch.On("Qos", 3, 0, false).Return(nil).Once()
ch.On("QueueDeclare", "worker", true, false, false, false, amqpTable).Return(amqp.Queue{}, nil).Once()
ch.On("ExchangeDeclare", "worker", "test", false, false, false, false, amqp.Table{}).Return(nil).Once()
ch.On("QueueBind", "worker", "", "worker", false, amqpTable).Return(nil).Once()
},
nil,
},
// Set QoS fails.
{
"setQosFail",
Expand Down
3 changes: 2 additions & 1 deletion example.conf
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,9 @@ durable = On
# settings.
[queuesettings]
# The routing key used to determine if a message send to the above configured
# exchange will be routed to the queue.
# exchange will be routed to the queue. Multiple routing keys can be configured.
routingkey = somekey
routingkey = anotherkey

# The default TTL of a message in the queue.
#
Expand Down

0 comments on commit 6d4ef44

Please sign in to comment.