Skip to content

Commit

Permalink
Speculative fix for AMQP vhost+TLS support
Browse files Browse the repository at this point in the history
  • Loading branch information
mkuratczyk committed May 6, 2024
1 parent 87b3f82 commit 8bad78b
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 5 deletions.
4 changes: 2 additions & 2 deletions pkg/amqp10_client/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/rabbitmq/omq/pkg/log"
)

func amqpVHost(connectionString string) string {
func hostAndVHost(connectionString string) (string, string) {
uri, err := url.Parse(connectionString)
if err != nil {
log.Error("failed to parse connection string", "error", err.Error())
Expand All @@ -20,5 +20,5 @@ func amqpVHost(connectionString string) string {
vhost = strings.TrimPrefix(uri.Path, "/")
}

return "vhost:" + vhost
return uri.Hostname(), "vhost:" + vhost
}
7 changes: 5 additions & 2 deletions pkg/amqp10_client/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package amqp10_client

import (
"context"
"crypto/tls"
"fmt"
"os"
"strconv"
Expand Down Expand Up @@ -29,9 +30,11 @@ type Amqp10Consumer struct {

func NewConsumer(cfg config.Config, id int) *Amqp10Consumer {
// open connection
hostname, vhost := hostAndVHost(cfg.ConsumerUri)
conn, err := amqp.Dial(context.TODO(), cfg.ConsumerUri, &amqp.ConnOptions{
HostName: amqpVHost(cfg.ConsumerUri),
})
HostName: vhost,
TLSConfig: &tls.Config{
ServerName: hostname}})
if err != nil {
log.Error("consumer failed to connect", "protocol", "amqp-1.0", "consumerId", id, "error", err.Error())
return nil
Expand Down
6 changes: 5 additions & 1 deletion pkg/amqp10_client/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package amqp10_client

import (
"context"
"crypto/tls"
"math/rand"
"strconv"
"time"
Expand All @@ -28,8 +29,11 @@ type Amqp10Publisher struct {

func NewPublisher(cfg config.Config, n int) *Amqp10Publisher {
// open connection
hostname, vhost := hostAndVHost(cfg.PublisherUri)
conn, err := amqp.Dial(context.TODO(), cfg.PublisherUri, &amqp.ConnOptions{
HostName: amqpVHost(cfg.PublisherUri)})
HostName: vhost,
TLSConfig: &tls.Config{
ServerName: hostname}})
if err != nil {
log.Error("publisher connection failed", "protocol", "amqp-1.0", "publisherId", n, "error", err.Error())
return nil
Expand Down

0 comments on commit 8bad78b

Please sign in to comment.