Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: defined a type for exchange kinds #286

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion _examples/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func redial(ctx context.Context, url string) chan chan session {
log.Fatalf("cannot create channel: %v", err)
}

if err := ch.ExchangeDeclare(exchange, "fanout", false, true, false, false, nil); err != nil {
if err := ch.ExchangeDeclare(exchange, amqp.Fanout, false, true, false, false, nil); err != nil {
log.Fatalf("cannot declare fanout exchange: %v", err)
}

Expand Down
8 changes: 4 additions & 4 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -1301,15 +1301,15 @@ to respond to any exceptions.
Optional amqp.Table of arguments that are specific to the server's implementation of
the exchange can be sent for exchange types that require extra parameters.
*/
func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error {
func (ch *Channel) ExchangeDeclare(name string, kind ExchangeType, durable, autoDelete, internal, noWait bool, args Table) error {
if err := args.Validate(); err != nil {
return err
}

return ch.call(
&exchangeDeclare{
Exchange: name,
Type: kind,
Type: string(kind),
Passive: false,
Durable: durable,
AutoDelete: autoDelete,
Expand All @@ -1328,15 +1328,15 @@ exchange is assumed by RabbitMQ to already exist, and attempting to connect to a
non-existent exchange will cause RabbitMQ to throw an exception. This function
can be used to detect the existence of an exchange.
*/
func (ch *Channel) ExchangeDeclarePassive(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error {
func (ch *Channel) ExchangeDeclarePassive(name string, kind ExchangeType, durable, autoDelete, internal, noWait bool, args Table) error {
if err := args.Validate(); err != nil {
return err
}

return ch.call(
&exchangeDeclare{
Exchange: name,
Type: kind,
Type: string(kind),
Passive: true,
Durable: durable,
AutoDelete: autoDelete,
Expand Down
8 changes: 4 additions & 4 deletions examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func ExampleChannel_Confirm_bridge() {
log.Fatalf("channel.open source: %s", err)
}

if err := chs.ExchangeDeclare("log", "topic", true, false, false, false, nil); err != nil {
if err := chs.ExchangeDeclare("log", amqp.Topic, true, false, false, false, nil); err != nil {
log.Fatalf("exchange.declare destination: %s", err)
}

Expand Down Expand Up @@ -159,7 +159,7 @@ func ExampleChannel_Confirm_bridge() {
log.Fatalf("channel.open destination: %s", err)
}

if err := chd.ExchangeDeclare("log", "topic", true, false, false, false, nil); err != nil {
if err := chd.ExchangeDeclare("log", amqp.Topic, true, false, false, false, nil); err != nil {
log.Fatalf("exchange.declare destination: %s", err)
}

Expand Down Expand Up @@ -239,7 +239,7 @@ func ExampleChannel_Consume() {
// are the same. This is part of AMQP being a programmable messaging model.
//
// See the Channel.Publish example for the complimentary declare.
err = c.ExchangeDeclare("logs", "topic", true, false, false, false, nil)
err = c.ExchangeDeclare("logs", amqp.Topic, true, false, false, false, nil)
if err != nil {
log.Fatalf("exchange.declare: %s", err)
}
Expand Down Expand Up @@ -365,7 +365,7 @@ func ExampleChannel_PublishWithContext() {
// are the same. This is part of AMQP being a programmable messaging model.
//
// See the Channel.Consume example for the complimentary declare.
err = c.ExchangeDeclare("logs", "topic", true, false, false, false, nil)
err = c.ExchangeDeclare("logs", amqp.Topic, true, false, false, false, nil)
if err != nil {
log.Fatalf("exchange.declare: %v", err)
}
Expand Down
44 changes: 22 additions & 22 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,12 @@ func TestExchangePassiveOnMissingExchangeShouldError(t *testing.T) {

if err := ch.ExchangeDeclarePassive(
"test-integration-missing-passive-exchange",
"direct", // type
false, // duration (note: is durable)
true, // auto-delete
false, // internal
false, // nowait
nil, // args
Direct, // type
false, // duration (note: is durable)
true, // auto-delete
false, // internal
false, // nowait
nil, // args
); err == nil {
t.Fatal("ExchangeDeclarePassive of a missing exchange should return error")
}
Expand All @@ -199,7 +199,7 @@ func TestIntegrationExchangeDeclarePassiveOnDeclaredShouldNotError(t *testing.T)

if err := ch.ExchangeDeclare(
exchange, // name
"direct", // type
Direct, // type
false, // durable
true, // auto-delete
false, // internal
Expand All @@ -211,7 +211,7 @@ func TestIntegrationExchangeDeclarePassiveOnDeclaredShouldNotError(t *testing.T)

if err := ch.ExchangeDeclarePassive(
exchange, // name
"direct", // type
Direct, // type
false, // durable
true, // auto-delete
false, // internal
Expand All @@ -238,7 +238,7 @@ func TestIntegrationExchange(t *testing.T) {

if err := channel.ExchangeDeclare(
exchange, // name
"direct", // type
Direct, // type
false, // duration
true, // auto-delete
false, // internal
Expand Down Expand Up @@ -396,7 +396,7 @@ func TestIntegrationBasicQueueOperations(t *testing.T) {
for _, deleteQueueFirst := range deleteQueueFirstOptions {
if err := channel.ExchangeDeclare(
exchangeName, // name
"direct", // type
Direct, // type
true, // duration (note: is durable)
false, // auto-delete
false, // internal
Expand Down Expand Up @@ -1502,11 +1502,11 @@ func TestDeclareArgsRejectToDeadLetterQueue(t *testing.T) {

ch, _ := conn.Channel()

if err := ch.ExchangeDeclare(ex, "fanout", false, true, false, false, nil); err != nil {
if err := ch.ExchangeDeclare(ex, Fanout, false, true, false, false, nil); err != nil {
t.Fatalf("cannot declare %v: got: %v", ex, err)
}

if err := ch.ExchangeDeclare(dlex, "fanout", false, true, false, false, nil); err != nil {
if err := ch.ExchangeDeclare(dlex, Fanout, false, true, false, false, nil); err != nil {
t.Fatalf("cannot declare %v: got: %v", dlex, err)
}

Expand Down Expand Up @@ -1722,7 +1722,7 @@ func TestChannelExceptionWithCloseIssue43(t *testing.T) {

// This ensures that the 2nd channel is unaffected by the channel exception
// on channel 1.
err = c2.ExchangeDeclare("test-channel-still-exists", "direct", false, true, false, false, nil)
err = c2.ExchangeDeclare("test-channel-still-exists", Direct, false, true, false, false, nil)
if err != nil {
t.Fatalf("failed to declare exchange, got: %v", err)
}
Expand Down Expand Up @@ -1834,20 +1834,20 @@ func TestExchangeDeclarePrecondition(t *testing.T) {

err = ch.ExchangeDeclare(
exchange,
"direct", // exchangeType
false, // durable
true, // auto-delete
false, // internal
false, // noWait
nil, // arguments
Direct, // exchangeType
false, // durable
true, // auto-delete
false, // internal
false, // noWait
nil, // arguments
)
if err != nil {
t.Fatalf("Could not initially declare exchange")
}

err = ch.ExchangeDeclare(
exchange,
"direct",
Direct,
true, // different durability
true,
false,
Expand Down Expand Up @@ -2041,7 +2041,7 @@ func TestIntegrationGetNextPublishSeqNo(t *testing.T) {
}

ex := "test-get-next-pub"
if err = ch.ExchangeDeclare(ex, "direct", false, false, false, false, nil); err != nil {
if err = ch.ExchangeDeclare(ex, Direct, false, false, false, false, nil); err != nil {
t.Fatalf("cannot declare %v: got: %v", ex, err)
}

Expand Down Expand Up @@ -2075,7 +2075,7 @@ func TestIntegrationGetNextPublishSeqNoRace(t *testing.T) {
}

ex := "test-get-next-pub"
if err = ch.ExchangeDeclare(ex, "direct", false, false, false, false, nil); err != nil {
if err = ch.ExchangeDeclare(ex, Direct, false, false, false, false, nil); err != nil {
t.Fatalf("cannot declare %v: got: %v", ex, err)
}

Expand Down
11 changes: 7 additions & 4 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@ import (
// name. Applications can route to a queue using the queue name as routing key.
const DefaultExchange = ""

// Type of Exchanges
type ExchangeType string

// Constants for standard AMQP 0-9-1 exchange types.
const (
ExchangeDirect = "direct"
ExchangeFanout = "fanout"
ExchangeTopic = "topic"
ExchangeHeaders = "headers"
Direct ExchangeType = "direct"
Fanout ExchangeType = "fanout"
Topic ExchangeType = "topic"
Headers ExchangeType = "headers"
)

var (
Expand Down
Loading