Skip to content

Commit

Permalink
Add support for AMQP property filters (#36)
Browse files Browse the repository at this point in the history
Also, allow multiple subject values
(made it easier to test property filters)
  • Loading branch information
mkuratczyk authored Oct 15, 2024
1 parent 8115eb3 commit 77496f2
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 48 deletions.
32 changes: 31 additions & 1 deletion cmd/cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func TestPublishWithPriorities(t *testing.T) {
}
}

func TestAMQPStreamFilters(t *testing.T) {
func TestAMQPStreamAppPropertyFilters(t *testing.T) {
defer metrics.Reset()

rootCmd := RootCmd()
Expand Down Expand Up @@ -207,6 +207,36 @@ func TestAMQPStreamFilters(t *testing.T) {
}, 2*time.Second, 100*time.Millisecond)
}

func TestAMQPStreamPropertyFilters(t *testing.T) {
defer metrics.Reset()

rootCmd := RootCmd()

args := []string{"amqp",
"-C", "3",
"--publish-to", "/queues/stream-with-property-filters",
"--consume-from", "/queues/stream-with-property-filters",
"--amqp-subject", "foo,bar,baz",
"--amqp-property-filter", "subject=baz",
"--queues", "stream",
"--cleanup-queues=true",
"--time", "2s",
}

rootCmd.SetArgs(args)
fmt.Println("Running test: omq", strings.Join(args, " "))
err := rootCmd.Execute()
assert.Nil(t, err)

assert.Eventually(t, func() bool {
return 3 == metrics.MessagesPublished.Get()

}, 2*time.Second, 100*time.Millisecond)
assert.Eventually(t, func() bool {
return 1 == metrics.MessagesConsumedNormalPriority.Get()
}, 2*time.Second, 100*time.Millisecond)
}

func TestFanInFromMQTTtoAMQP(t *testing.T) {
defer metrics.Reset()

Expand Down
15 changes: 14 additions & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var (
var metricTags []string
var amqpAppProperties []string
var amqpAppPropertyFilters []string
var amqpPropertyFilters []string

func Execute() {
rootCmd := RootCmd()
Expand Down Expand Up @@ -226,6 +227,17 @@ func RootCmd() *cobra.Command {
cfg.Amqp.AppPropertyFilters[parts[0]] = parts[1]
}

// AMQP property filters
cfg.Amqp.PropertyFilters = make(map[string]string)
for _, filter := range amqpPropertyFilters {
parts := strings.SplitN(filter, "=", 2)
if len(parts) != 2 {
_, _ = fmt.Fprintf(os.Stderr, "ERROR: invalid AMQP property filter: %s, use key=filterExpression format\n", filter)
os.Exit(1)
}
cfg.Amqp.PropertyFilters[parts[0]] = parts[1]
}

// split metric tags into key-value pairs
cfg.MetricTags = make(map[string]string)
for _, tag := range metricTags {
Expand Down Expand Up @@ -270,7 +282,7 @@ func RootCmd() *cobra.Command {
BoolVarP(&cfg.UseMillis, "use-millis", "m", false, "Use milliseconds for timestamps (automatically enabled when no publishers or no consumers)")
rootCmd.PersistentFlags().
VarP(enumflag.New(&cfg.QueueDurability, "queue-durability", config.AmqpDurabilityModes, enumflag.EnumCaseInsensitive), "queue-durability", "", "Queue durability (default: configuration - the queue definition is durable)")
rootCmd.PersistentFlags().StringVar(&cfg.Amqp.Subject, "amqp-subject", "", "AMQP 1.0 message subject")
rootCmd.PersistentFlags().StringSliceVar(&cfg.Amqp.Subjects, "amqp-subject", []string{}, "AMQP 1.0 message subject(s)")
rootCmd.PersistentFlags().StringVar(&cfg.Amqp.BindingKey, "amqp-binding-key", "", "AMQP 1.0 consumer binding key")
rootCmd.PersistentFlags().
BoolVar(&cfg.Amqp.SendSettled, "amqp-send-settled", false, "Send settled messages (fire and forget)")
Expand All @@ -293,6 +305,7 @@ func RootCmd() *cobra.Command {
rootCmd.PersistentFlags().DurationVar(&cfg.ConsumerStartupDelay, "consumer-startup-delay", 0, "Delay consumer startup to allow a backlog of messages to build up (eg. 10s)")
rootCmd.PersistentFlags().StringArrayVar(&amqpAppProperties, "amqp-app-property", []string{}, "AMQP application properties, eg. key1=val1,val2")
rootCmd.PersistentFlags().StringArrayVar(&amqpAppPropertyFilters, "amqp-app-property-filter", []string{}, "AMQP application property filters, eg. key1=$p:prefix")
rootCmd.PersistentFlags().StringArrayVar(&amqpPropertyFilters, "amqp-property-filter", []string{}, "AMQP property filters, eg. key1=$p:prefix")

rootCmd.AddCommand(amqp_amqp)
rootCmd.AddCommand(amqp_stomp)
Expand Down
10 changes: 6 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module github.com/rabbitmq/omq

go 1.23.0

replace github.com/Azure/go-amqp => github.com/mkuratczyk/go-amqp v0.0.0-20241014095613-d9376f5f2d9f

require (
github.com/Azure/go-amqp v1.2.0
github.com/VictoriaMetrics/metrics v1.35.1
Expand Down Expand Up @@ -31,16 +33,16 @@ require (
github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/valyala/fastrand v1.1.0 // indirect
github.com/valyala/histogram v1.2.0 // indirect
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect
golang.org/x/net v0.29.0 // indirect
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/sys v0.26.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

require (
github.com/go-stomp/stomp/v3 v3.1.2
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/rabbitmq/rabbitmq-amqp-go-client v0.0.0-20240924141749-6d8aaeb2b8f7
github.com/rabbitmq/rabbitmq-amqp-go-client v0.0.0-20241001222512-5fc29f4968d0
github.com/spf13/pflag v1.0.5 // indirect
)
59 changes: 24 additions & 35 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,9 +1,3 @@
github.com/Azure/go-amqp v1.1.0 h1:XUhx5f4lZFVf6LQc5kBUFECW0iJW9VLxKCYrBeGwl0U=
github.com/Azure/go-amqp v1.1.0/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
github.com/Azure/go-amqp v1.1.1-0.20240913224415-f631e6909719 h1:rL7yrEV9yputQV7T+Y9eJVmTVkK4B0aHlBc8TUITC5A=
github.com/Azure/go-amqp v1.1.1-0.20240913224415-f631e6909719/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
github.com/Azure/go-amqp v1.2.0 h1:NNyfN3/cRszWzMvjmm64yaPZDHX/2DJkowv8Ub9y01I=
github.com/Azure/go-amqp v1.2.0/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
github.com/VictoriaMetrics/metrics v1.35.1 h1:o84wtBKQbzLdDy14XeskkCZih6anG+veZ1SwJHFGwrU=
github.com/VictoriaMetrics/metrics v1.35.1/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8=
github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k=
Expand All @@ -12,8 +6,6 @@ github.com/charmbracelet/lipgloss v0.13.0 h1:4X3PPeoWEDCMvzDvGmTajSyYPcZM4+y8sCA
github.com/charmbracelet/lipgloss v0.13.0/go.mod h1:nw4zy0SBX/F/eAO1cWdcvy6qnkDUxr8Lw7dvFrAIbbY=
github.com/charmbracelet/log v0.4.0 h1:G9bQAcx8rWA2T3pWvx7YtPTPwgqpk7D68BX21IRW8ZM=
github.com/charmbracelet/log v0.4.0/go.mod h1:63bXt/djrizTec0l11H20t8FDSvA4CRZJ1KH22MdptM=
github.com/charmbracelet/x/ansi v0.2.3 h1:VfFN0NUpcjBRd4DnKfRaIRo53KRgey/nhOoEqosGDEY=
github.com/charmbracelet/x/ansi v0.2.3/go.mod h1:dk73KoMTT5AX5BsX0KrqhsTqAnhZZoCBjs7dGWp4Ktw=
github.com/charmbracelet/x/ansi v0.3.2 h1:wsEwgAN+C9U06l9dCVMX0/L3x7ptvY1qmjMwyfE6USY=
github.com/charmbracelet/x/ansi v0.3.2/go.mod h1:dk73KoMTT5AX5BsX0KrqhsTqAnhZZoCBjs7dGWp4Ktw=
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
Expand All @@ -26,18 +18,19 @@ github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/go-logfmt/logfmt v0.6.0 h1:wGYYu3uicYdqXVgoYbvnkrPVXkuLM1p1ifugDMEdRi4=
github.com/go-logfmt/logfmt v0.6.0/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ=
github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-stomp/stomp/v3 v3.1.2 h1:kmrNek021BsFUO8rxDhbkOYslRomKO/JIrUCIqyL0r8=
github.com/go-stomp/stomp/v3 v3.1.2/go.mod h1:ztzZej6T2W4Y6FlD+Tb5n7HQP3/O5UNQiuC169pIp10=
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI=
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls=
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE=
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134 h1:c5FlPPgxOn7kJz3VoPLkQYQXGBS3EklQ4Zfi57uOuqQ=
github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
Expand All @@ -57,17 +50,19 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc=
github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mkuratczyk/go-amqp v0.0.0-20241014095613-d9376f5f2d9f h1:WVdm43rq1NhTOuRrxn73Tw4j9rrrC/ilOEKWTYxFWqo=
github.com/mkuratczyk/go-amqp v0.0.0-20241014095613-d9376f5f2d9f/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
github.com/muesli/termenv v0.15.2 h1:GohcuySI0QmI3wN8Ok9PtKGkgkFIk7y6Vpb5PvrY+Wo=
github.com/muesli/termenv v0.15.2/go.mod h1:Epx+iuz8sNs7mNKhxzH4fWXGNpZwUaJKRS1noLXviQ8=
github.com/onsi/ginkgo/v2 v2.13.0 h1:0jY9lJquiL8fcf3M4LAXN5aMlS/b2BV86HFFPCPMgE4=
github.com/onsi/ginkgo/v2 v2.13.0/go.mod h1:TE309ZR8s5FsKKpuB1YAQYBzCaAfUgatB/xlT/ETL/o=
github.com/onsi/gomega v1.28.1 h1:MijcGUbfYuznzK/5R4CPNoUP/9Xvuo20sXfEm6XxoTA=
github.com/onsi/gomega v1.28.1/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8PPpoQ=
github.com/onsi/ginkgo/v2 v2.20.2 h1:7NVCeyIWROIAheY21RLS+3j2bb52W0W82tkberYytp4=
github.com/onsi/ginkgo/v2 v2.20.2/go.mod h1:K9gyxPIlb+aIvnZ8bd9Ak+YP18w3APlR+5coaZoE2ag=
github.com/onsi/gomega v1.34.2 h1:pNCwDkzrsv7MS9kpaQvVb1aVLahQXyJ/Tv5oAZMI3i8=
github.com/onsi/gomega v1.34.2/go.mod h1:v1xfxRgk0KIsG+QOdm7p8UosrOzPYRo60fd3B/1Dukc=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rabbitmq/rabbitmq-amqp-go-client v0.0.0-20240924141749-6d8aaeb2b8f7 h1:PqmnnV/+mTJp+yKe+cHKtvi9lM7PgmIL6prhFRL1qnA=
github.com/rabbitmq/rabbitmq-amqp-go-client v0.0.0-20240924141749-6d8aaeb2b8f7/go.mod h1:fatkNU802Qu/1pmg6x7pmHC6GKLTJsmjNVPrJ5BoZfE=
github.com/rabbitmq/rabbitmq-amqp-go-client v0.0.0-20241001222512-5fc29f4968d0 h1:aX62gNFKXqBsacs3ejwl21dfyP8GT8WUPX04cRuLzwQ=
github.com/rabbitmq/rabbitmq-amqp-go-client v0.0.0-20241001222512-5fc29f4968d0/go.mod h1:Km231GyOZAw9I3SZIqkfB9VVzCsu8jvFWYdghmnwueM=
github.com/relvacode/iso8601 v1.4.0 h1:GsInVSEJfkYuirYFxa80nMLbH2aydgZpIf52gYZXUJs=
github.com/relvacode/iso8601 v1.4.0/go.mod h1:FlNp+jz+TXpyRqgmM7tnzHHzBnz776kmAH2h3sZCn0I=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
Expand Down Expand Up @@ -96,18 +91,14 @@ github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tz
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/exp v0.0.0-20240823005443-9b4947da3948 h1:kx6Ds3MlpiUHKj7syVnbp57++8WpuKPcR5yjLBjvLEA=
golang.org/x/exp v0.0.0-20240823005443-9b4947da3948/go.mod h1:akd2r19cwCdwSwWeIdzYQGa/EZZyqcOdwWiwj5L5eKQ=
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk=
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY=
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c h1:7dEasQXItcW1xKJ2+gg5VOiBnqWrJc+rq0DPKyvvdbY=
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c/go.mod h1:NQtJDoLvd6faHhE7m4T/1IY708gDefGGjR/iUW8yQQ8=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE=
golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg=
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4=
golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
Expand All @@ -118,20 +109,18 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.24.0 h1:J1shsA93PJUEVaUSaay7UXAyE8aimq3GW0pjlolpa24=
golang.org/x/tools v0.24.0/go.mod h1:YhNqVBIfWHdzvTLs0d8LCuMhkKUgSUKldakyV7W/WDQ=
golang.org/x/tools v0.26.0 h1:v/60pFQmzmT9ExmjDv2gGIfi3OqfKoEP6I5+umXlbnQ=
golang.org/x/tools v0.26.0/go.mod h1:TPVVj70c7JJ3WCazhD8OdXcZg/og+b9+tH/KxylGwH0=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
34 changes: 30 additions & 4 deletions pkg/amqp10_client/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,14 @@ func (c *Amqp10Consumer) CreateReceiver(ctx context.Context) {

for c.Receiver == nil {
mgmt.DeclareAndBind(c.Config, c.Terminus, c.Id)
receiver, err := c.Session.NewReceiver(ctx, c.Terminus, &amqp.ReceiverOptions{SourceDurability: durability, Credit: int32(c.Config.ConsumerCredits), Properties: buildLinkProperties(c.Config), Filters: buildLinkFilters(c.Config)})
receiver, err := c.Session.NewReceiver(ctx,
c.Terminus,
&amqp.ReceiverOptions{
SourceDurability: durability,
Credit: int32(c.Config.ConsumerCredits),
Properties: buildLinkProperties(c.Config),
Filters: buildLinkFilters(c.Config),
})
if err != nil {
if err == context.Canceled {
return
Expand Down Expand Up @@ -159,11 +166,19 @@ func (c *Amqp10Consumer) Start(ctx context.Context, subscribed chan bool) {

if c.Config.LogOutOfOrder && timeSent.Before(previousMessageTimeSent) {
metrics.MessagesConsumedOutOfOrderMetric(priority).Inc()
log.Info("Out of order message received. This message was sent before the previous message", "this messsage", timeSent, "previous message", previousMessageTimeSent)
log.Info("Out of order message received. This message was sent before the previous message",
"this messsage", timeSent,
"previous message", previousMessageTimeSent)
}
previousMessageTimeSent = timeSent

log.Debug("message received", "id", c.Id, "terminus", c.Terminus, "size", len(payload), "priority", priority, "latency", latency, "appProps", msg.ApplicationProperties)
log.Debug("message received",
"id", c.Id,
"terminus", c.Terminus,
"size", len(payload),
"priority", priority,
"latency", latency,
"appProps", msg.ApplicationProperties)

if c.Config.ConsumerLatency > 0 {
log.Debug("consumer latency", "id", c.Id, "latency", c.Config.ConsumerLatency)
Expand Down Expand Up @@ -249,7 +264,18 @@ func buildLinkFilters(cfg config.Config) []amqp.LinkFilter {
}

for appProperty, filterExpression := range cfg.Amqp.AppPropertyFilters {
filters = append(filters, amqp.NewLinkFilter("amqp:application-properties-filter", 0, map[string]any{appProperty: filterExpression}))
filters = append(filters, amqp.NewLinkFilter("amqp:application-properties-filter",
0,
map[string]any{
appProperty: filterExpression}))
}

for property, filterExpression := range cfg.Amqp.PropertyFilters {
filters = append(filters,
amqp.NewLinkFilter("amqp:properties-filter",
0,
map[amqp.Symbol]any{
amqp.Symbol(property): filterExpression}))
}
return filters
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/amqp10_client/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,8 @@ func (p *Amqp10Publisher) Send() error {
}
}

if p.Config.Amqp.Subject != "" {
msg.Properties = &amqp.MessageProperties{Subject: &p.Config.Amqp.Subject}
if len(p.Config.Amqp.Subjects) > 0 {
msg.Properties = &amqp.MessageProperties{Subject: &p.Config.Amqp.Subjects[metrics.MessagesPublished.Get()%uint64(len(p.Config.Amqp.Subjects))]}
}

if p.Config.StreamFilterValueSet != "" {
Expand Down
3 changes: 2 additions & 1 deletion pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@ var QueueTypes = map[QueueType][]string{
}

type AmqpOptions struct {
Subject string
Subjects []string
SendSettled bool
ReleaseRate int
RejectRate int
BindingKey string
PropertyFilters map[string]string
AppProperties map[string][]string
AppPropertyFilters map[string]string
}
Expand Down

0 comments on commit 77496f2

Please sign in to comment.