-
Notifications
You must be signed in to change notification settings - Fork 142
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(otel): add opentelemety utility functions #272
base: main
Are you sure you want to change the base?
Changes from all commits
d537aee
d292598
75a6aeb
ccf814a
13a1894
e0fa7c6
1aeb2d0
47aa58b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -1492,7 +1492,7 @@ func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg | |||||||||
/* | ||||||||||
PublishWithContext sends a Publishing from the client to an exchange on the server. | ||||||||||
|
||||||||||
NOTE: this function is equivalent to [Channel.Publish]. Context is not honoured. | ||||||||||
NOTE: Context termination is not honoured. | ||||||||||
|
||||||||||
When you want a single message to be delivered to a single queue, you can | ||||||||||
publish to the default exchange with the routingKey of the queue name. This is | ||||||||||
|
@@ -1523,8 +1523,9 @@ confirmations start at 1. Exit when all publishings are confirmed. | |||||||||
When Publish does not return an error and the channel is in confirm mode, the | ||||||||||
internal counter for DeliveryTags with the first confirmation starts at 1. | ||||||||||
*/ | ||||||||||
func (ch *Channel) PublishWithContext(_ context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) error { | ||||||||||
return ch.Publish(exchange, key, mandatory, immediate, msg) | ||||||||||
func (ch *Channel) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) error { | ||||||||||
_, err := ch.PublishWithDeferredConfirmWithContext(ctx, exchange, key, mandatory, immediate, msg) | ||||||||||
return err | ||||||||||
} | ||||||||||
|
||||||||||
/* | ||||||||||
|
@@ -1583,11 +1584,18 @@ DeferredConfirmation, allowing the caller to wait on the publisher confirmation | |||||||||
for this message. If the channel has not been put into confirm mode, | ||||||||||
the DeferredConfirmation will be nil. | ||||||||||
|
||||||||||
NOTE: PublishWithDeferredConfirmWithContext is equivalent to its non-context variant. The context passed | ||||||||||
to this function is not honoured. | ||||||||||
NOTE: PublishWithDeferredConfirmWithContext is equivalent to its non-context | ||||||||||
variant. The termination of the context passed to this function is not | ||||||||||
honoured. | ||||||||||
*/ | ||||||||||
func (ch *Channel) PublishWithDeferredConfirmWithContext(_ context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) (*DeferredConfirmation, error) { | ||||||||||
return ch.PublishWithDeferredConfirm(exchange, key, mandatory, immediate, msg) | ||||||||||
func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) (*DeferredConfirmation, error) { | ||||||||||
_, msg, errFn := spanForPublication(ctx, msg, exchange, key, immediate) | ||||||||||
dc, err := ch.PublishWithDeferredConfirm(exchange, key, mandatory, immediate, msg) | ||||||||||
if err != nil { | ||||||||||
errFn(err) | ||||||||||
Comment on lines
+1594
to
+1595
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Maybe also rename |
||||||||||
return nil, err | ||||||||||
} | ||||||||||
return dc, nil | ||||||||||
} | ||||||||||
|
||||||||||
/* | ||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,8 +6,12 @@ | |
package amqp091 | ||
|
||
import ( | ||
"errors" | ||
"time" | ||
"context" | ||
"errors" | ||
"fmt" | ||
"time" | ||
|
||
"go.opentelemetry.io/otel/trace" | ||
) | ||
|
||
var errDeliveryNotInitialized = errors.New("delivery not initialized") | ||
|
@@ -17,88 +21,106 @@ var errDeliveryNotInitialized = errors.New("delivery not initialized") | |
// | ||
// Applications can provide mock implementations in tests of Delivery handlers. | ||
type Acknowledger interface { | ||
Ack(tag uint64, multiple bool) error | ||
Nack(tag uint64, multiple, requeue bool) error | ||
Reject(tag uint64, requeue bool) error | ||
Ack(tag uint64, multiple bool) error | ||
Nack(tag uint64, multiple, requeue bool) error | ||
Reject(tag uint64, requeue bool) error | ||
} | ||
|
||
// Delivery captures the fields for a previously delivered message resident in | ||
// a queue to be delivered by the server to a consumer from Channel.Consume or | ||
// Channel.Get. | ||
type Delivery struct { | ||
Acknowledger Acknowledger // the channel from which this delivery arrived | ||
|
||
Headers Table // Application or header exchange table | ||
|
||
// Properties | ||
ContentType string // MIME content type | ||
ContentEncoding string // MIME content encoding | ||
DeliveryMode uint8 // queue implementation use - non-persistent (1) or persistent (2) | ||
Priority uint8 // queue implementation use - 0 to 9 | ||
CorrelationId string // application use - correlation identifier | ||
ReplyTo string // application use - address to reply to (ex: RPC) | ||
Expiration string // implementation use - message expiration spec | ||
MessageId string // application use - message identifier | ||
Timestamp time.Time // application use - message timestamp | ||
Type string // application use - message type name | ||
UserId string // application use - creating user - should be authenticated user | ||
AppId string // application use - creating application id | ||
|
||
// Valid only with Channel.Consume | ||
ConsumerTag string | ||
|
||
// Valid only with Channel.Get | ||
MessageCount uint32 | ||
|
||
DeliveryTag uint64 | ||
Redelivered bool | ||
Exchange string // basic.publish exchange | ||
RoutingKey string // basic.publish routing key | ||
|
||
Body []byte | ||
Acknowledger Acknowledger // the channel from which this delivery arrived | ||
|
||
Headers Table // Application or header exchange table | ||
|
||
// Properties | ||
ContentType string // MIME content type | ||
ContentEncoding string // MIME content encoding | ||
DeliveryMode uint8 // queue implementation use - non-persistent (1) or persistent (2) | ||
Priority uint8 // queue implementation use - 0 to 9 | ||
CorrelationId string // application use - correlation identifier | ||
ReplyTo string // application use - address to reply to (ex: RPC) | ||
Expiration string // implementation use - message expiration spec | ||
MessageId string // application use - message identifier | ||
Timestamp time.Time // application use - message timestamp | ||
Type string // application use - message type name | ||
UserId string // application use - creating user - should be authenticated user | ||
AppId string // application use - creating application id | ||
|
||
// Valid only with Channel.Consume | ||
ConsumerTag string | ||
|
||
// Valid only with Channel.Get | ||
MessageCount uint32 | ||
|
||
DeliveryTag uint64 | ||
Redelivered bool | ||
Exchange string // basic.publish exchange | ||
RoutingKey string // basic.publish routing key | ||
|
||
Body []byte | ||
} | ||
|
||
// Span returns context and a span that for the delivery | ||
// the resulting span is linked to the publication that created it, if it has | ||
// the appropraite headers set. See [context-propagation] for more details | ||
// | ||
// [context-propagation]: https://opentelemetry.io/docs/concepts/context-propagation/ | ||
func (d Delivery) Span( | ||
ctx context.Context, | ||
options ...trace.SpanStartOption, | ||
) (context.Context, trace.Span) { | ||
return spanForDelivery(ctx, &d, options...) | ||
} | ||
|
||
// Link returns a link for the delivery. The link points to the publication, if | ||
// the appropriate headers are set. | ||
func (d Delivery) Link(ctx context.Context) trace.Link { | ||
return extractLinkFromDelivery(ctx, &d) | ||
} | ||
|
||
func newDelivery(channel *Channel, msg messageWithContent) *Delivery { | ||
props, body := msg.getContent() | ||
|
||
delivery := Delivery{ | ||
Acknowledger: channel, | ||
|
||
Headers: props.Headers, | ||
ContentType: props.ContentType, | ||
ContentEncoding: props.ContentEncoding, | ||
DeliveryMode: props.DeliveryMode, | ||
Priority: props.Priority, | ||
CorrelationId: props.CorrelationId, | ||
ReplyTo: props.ReplyTo, | ||
Expiration: props.Expiration, | ||
MessageId: props.MessageId, | ||
Timestamp: props.Timestamp, | ||
Type: props.Type, | ||
UserId: props.UserId, | ||
AppId: props.AppId, | ||
|
||
Body: body, | ||
} | ||
|
||
// Properties for the delivery types | ||
switch m := msg.(type) { | ||
case *basicDeliver: | ||
delivery.ConsumerTag = m.ConsumerTag | ||
delivery.DeliveryTag = m.DeliveryTag | ||
delivery.Redelivered = m.Redelivered | ||
delivery.Exchange = m.Exchange | ||
delivery.RoutingKey = m.RoutingKey | ||
|
||
case *basicGetOk: | ||
delivery.MessageCount = m.MessageCount | ||
delivery.DeliveryTag = m.DeliveryTag | ||
delivery.Redelivered = m.Redelivered | ||
delivery.Exchange = m.Exchange | ||
delivery.RoutingKey = m.RoutingKey | ||
} | ||
|
||
return &delivery | ||
props, body := msg.getContent() | ||
|
||
delivery := Delivery{ | ||
Acknowledger: channel, | ||
|
||
Headers: props.Headers, | ||
ContentType: props.ContentType, | ||
ContentEncoding: props.ContentEncoding, | ||
DeliveryMode: props.DeliveryMode, | ||
Priority: props.Priority, | ||
CorrelationId: props.CorrelationId, | ||
ReplyTo: props.ReplyTo, | ||
Expiration: props.Expiration, | ||
MessageId: props.MessageId, | ||
Timestamp: props.Timestamp, | ||
Type: props.Type, | ||
UserId: props.UserId, | ||
AppId: props.AppId, | ||
|
||
Body: body, | ||
} | ||
|
||
// Properties for the delivery types | ||
switch m := msg.(type) { | ||
case *basicDeliver: | ||
delivery.ConsumerTag = m.ConsumerTag | ||
delivery.DeliveryTag = m.DeliveryTag | ||
delivery.Redelivered = m.Redelivered | ||
delivery.Exchange = m.Exchange | ||
delivery.RoutingKey = m.RoutingKey | ||
|
||
case *basicGetOk: | ||
delivery.MessageCount = m.MessageCount | ||
delivery.DeliveryTag = m.DeliveryTag | ||
delivery.Redelivered = m.Redelivered | ||
delivery.Exchange = m.Exchange | ||
delivery.RoutingKey = m.RoutingKey | ||
} | ||
|
||
return &delivery | ||
} | ||
|
||
/* | ||
|
@@ -121,10 +143,10 @@ Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every | |
delivery that is not automatically acknowledged. | ||
*/ | ||
func (d Delivery) Ack(multiple bool) error { | ||
if d.Acknowledger == nil { | ||
return errDeliveryNotInitialized | ||
} | ||
return d.Acknowledger.Ack(d.DeliveryTag, multiple) | ||
if d.Acknowledger == nil { | ||
return errDeliveryNotInitialized | ||
} | ||
return d.Acknowledger.Ack(d.DeliveryTag, multiple) | ||
} | ||
|
||
/* | ||
|
@@ -141,10 +163,10 @@ Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every | |
delivery that is not automatically acknowledged. | ||
*/ | ||
func (d Delivery) Reject(requeue bool) error { | ||
if d.Acknowledger == nil { | ||
return errDeliveryNotInitialized | ||
} | ||
return d.Acknowledger.Reject(d.DeliveryTag, requeue) | ||
if d.Acknowledger == nil { | ||
return errDeliveryNotInitialized | ||
} | ||
return d.Acknowledger.Reject(d.DeliveryTag, requeue) | ||
} | ||
|
||
/* | ||
|
@@ -166,8 +188,43 @@ Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every | |
delivery that is not automatically acknowledged. | ||
*/ | ||
func (d Delivery) Nack(multiple, requeue bool) error { | ||
if d.Acknowledger == nil { | ||
return errDeliveryNotInitialized | ||
} | ||
return d.Acknowledger.Nack(d.DeliveryTag, multiple, requeue) | ||
if d.Acknowledger == nil { | ||
return errDeliveryNotInitialized | ||
} | ||
return d.Acknowledger.Nack(d.DeliveryTag, multiple, requeue) | ||
} | ||
|
||
type DeliveryResponse uint8 | ||
|
||
const ( | ||
Ack DeliveryResponse = iota | ||
Reject | ||
Nack | ||
) | ||
|
||
func (r DeliveryResponse) Name() string { | ||
switch r { | ||
case Ack: | ||
return "ack" | ||
case Nack: | ||
return "nack" | ||
case Reject: | ||
return "reject" | ||
default: | ||
return "unknown" | ||
} | ||
} | ||
|
||
func (d Delivery) Settle(ctx context.Context, response DeliveryResponse, multiple, requeue bool) error { | ||
defer settleDelivery(ctx, &d, response, multiple, requeue) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Based on my understanding of messaging semconv, ( |
||
switch response { | ||
case Ack: | ||
return d.Ack(multiple) | ||
case Nack: | ||
return d.Nack(multiple, requeue) | ||
case Reject: | ||
return d.Reject(requeue) | ||
default: | ||
return fmt.Errorf("unknown operation %s", response.Name()) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,17 @@ | ||
module github.com/rabbitmq/amqp091-go | ||
|
||
go 1.20 | ||
go 1.21 | ||
|
||
require go.uber.org/goleak v1.3.0 | ||
toolchain go1.22.0 | ||
|
||
require ( | ||
go.opentelemetry.io/otel v1.27.0 | ||
go.opentelemetry.io/otel/trace v1.27.0 | ||
go.uber.org/goleak v1.3.0 | ||
) | ||
|
||
require ( | ||
github.com/go-logr/logr v1.4.2 // indirect | ||
github.com/go-logr/stdr v1.2.2 // indirect | ||
go.opentelemetry.io/otel/metric v1.27.0 // indirect | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,23 @@ | ||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= | ||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= | ||
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= | ||
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= | ||
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= | ||
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= | ||
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= | ||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= | ||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= | ||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= | ||
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= | ||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= | ||
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= | ||
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= | ||
go.opentelemetry.io/otel v1.27.0 h1:9BZoF3yMK/O1AafMiQTVu0YDj5Ea4hPhxCs7sGva+cg= | ||
go.opentelemetry.io/otel v1.27.0/go.mod h1:DMpAK8fzYRzs+bi3rS5REupisuqTheUlSZJ1WnZaPAQ= | ||
go.opentelemetry.io/otel/metric v1.27.0 h1:hvj3vdEKyeCi4YaYfNjv2NUje8FqKqUY8IlF0FxV/ik= | ||
go.opentelemetry.io/otel/metric v1.27.0/go.mod h1:mVFgmRlhljgBiuk/MP/oKylr4hs85GZAylncepAX/ak= | ||
go.opentelemetry.io/otel/trace v1.27.0 h1:IqYb813p7cmbHk0a5y6pD5JPakbVfftRXABGt5/Rscw= | ||
go.opentelemetry.io/otel/trace v1.27.0/go.mod h1:6RiD1hkAprV4/q+yd2ln1HG9GoPx39SuvvstaLBl+l4= | ||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= | ||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= | ||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= | ||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we're now using the context for span propagation in-process.