Skip to content

Commit

Permalink
Merge pull request #1528 from stebet/otel-integration-package
Browse files Browse the repository at this point in the history
Adding proper OpenTelemetry integration via. registration helpers and better context propagation
  • Loading branch information
lukebakken authored Jun 4, 2024
2 parents a912b00 + 58fab44 commit 72713ee
Show file tree
Hide file tree
Showing 12 changed files with 916 additions and 87 deletions.
6 changes: 6 additions & 0 deletions RabbitMQDotNetClient.sln
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Common", "projects\Test\Com
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ToxiproxyNetCore", "projects\toxiproxy-netcore\src\ToxiproxyNetCore\ToxiproxyNetCore.csproj", "{AB5B7C53-D7EC-4985-A6DE-70178E4B688A}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RabbitMQ.Client.OpenTelemetry", "projects\RabbitMQ.Client.OpenTelemetry\RabbitMQ.Client.OpenTelemetry.csproj", "{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -90,6 +92,10 @@ Global
{AB5B7C53-D7EC-4985-A6DE-70178E4B688A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AB5B7C53-D7EC-4985-A6DE-70178E4B688A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AB5B7C53-D7EC-4985-A6DE-70178E4B688A}.Release|Any CPU.Build.0 = Release|Any CPU
{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Release|Any CPU.ActiveCfg = Release|Any CPU
{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
54 changes: 54 additions & 0 deletions projects/RabbitMQ.Client.OpenTelemetry/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# RabbitMQ .NET Client - OpenTelemetry Instrumentation

## Introduction
This library makes it easy to instrument your RabbitMQ .NET Client applications with OpenTelemetry.

## Examples
The following examples demonstrate how to use the RabbitMQ .NET Client OpenTelemetry Instrumentation.

### Basic Usage

#### ASP.NET Core Configuration Example
```csharp
using OpenTelemetry.Trace;

// Configure the OpenTelemetry SDK to trace ASP.NET Core, the RabbitMQ .NET Client and export the traces to the console.
// Also configures context propagation to propagate the TraceContext and Baggage using the W3C specification.
var compositeTextMapPropagator = new CompositeTextMapPropagator(new TextMapPropagator[]
{
new TraceContextPropagator(),
new BaggagePropagator()
});

Sdk.SetDefaultTextMapPropagator(compositeTextMapPropagator);

builder.Services.AddOpenTelemetry()
.ConfigureResource(resource => resource
.AddService(serviceName: builder.Environment.ApplicationName))
.WithTracing(tracing => tracing
.AddAspNetCoreInstrumentation()
.AddRabbitMQInstrumentation()
.AddConsoleExporter());
```

#### Console Application Configuration Example
```csharp
using OpenTelemetry.Trace;

// Configure the OpenTelemetry SDK to trace ASP.NET Core, the RabbitMQ .NET Client and export the traces to the console.
// Also configures context propagation to propagate the TraceContext and Baggage using the W3C specification.
var compositeTextMapPropagator = new CompositeTextMapPropagator(new TextMapPropagator[]
{
new TraceContextPropagator(),
new BaggagePropagator()
});

Sdk.SetDefaultTextMapPropagator(compositeTextMapPropagator);

var tracerProvider = Sdk.CreateTracerProviderBuilder()
.AddRabbitMQInstrumentation()
.AddConsoleExporter()
.Build();
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>net6.0;netstandard2.0</TargetFrameworks>
<NoWarn>$(NoWarn);CS1591</NoWarn>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<AssemblyTitle>RabbitMQ OpenTelemetry Integration Package for .NET</AssemblyTitle>
<Authors>VMware</Authors>
<Company>VMware, Inc. or its affiliates.</Company>
<Copyright>Copyright © 2007-2023 VMware, Inc. or its affiliates.</Copyright>
<Description>The RabbitMQ OpenTelemetry Library for .NET adds convenience extension methods for RabbitMQ/OpenTelemetry</Description>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<PackageIcon>icon.png</PackageIcon>
<PackageLicenseExpression>Apache-2.0 OR MPL-2.0</PackageLicenseExpression>
<PackageProjectUrl>https://www.rabbitmq.com/dotnet.html</PackageProjectUrl>
<PackageTags>rabbitmq, amqp, oauth2</PackageTags>
<Product>RabbitMQ</Product>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
<RepositoryUrl>https://github.com/rabbitmq/rabbitmq-dotnet-client.git</RepositoryUrl>
<IncludeSymbols>true</IncludeSymbols>
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
<AssemblyOriginatorKeyFile>../rabbit.snk</AssemblyOriginatorKeyFile>
<SignAssembly>true</SignAssembly>
<MinVerTagPrefix>otel-</MinVerTagPrefix>
<MinVerVerbosity>minimal</MinVerVerbosity>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<PackageOutputPath>../../packages</PackageOutputPath>
<PackageReadmeFile>README.md</PackageReadmeFile>
<LangVersion>7.3</LangVersion>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)' == 'Release' And '$(CI)' == 'true'">
<ContinuousIntegrationBuild>true</ContinuousIntegrationBuild>
<Deterministic>true</Deterministic>
<EmbedUntrackedSources>true</EmbedUntrackedSources>
</PropertyGroup>

<ItemGroup Condition="'$(Configuration)' == 'Release' and '$(SourceRoot)' == ''">
<SourceRoot Include="$(MSBuildThisFileDirectory)/" />
</ItemGroup>

<ItemGroup>
<None Remove="icon.png" />
<Content Include="icon.png" PackagePath="" />
<None Include="README.md" Pack="true" PackagePath="/" />
<InternalsVisibleTo Include="Unit" />
<InternalsVisibleTo Include="Benchmarks" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.CodeAnalysis.PublicApiAnalyzers" Version="3.3.4" PrivateAssets="all" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" PrivateAssets="all" />
<PackageReference Include="MinVer" Version="5.0.0" PrivateAssets="all" />
<PackageReference Include="OpenTelemetry.Api" Version="1.7.0" />
</ItemGroup>

<ItemGroup Condition="$(TargetFramework) == 'netstandard2.0'">
</ItemGroup>

<ItemGroup>
<ProjectReference Include="../RabbitMQ.Client/RabbitMQ.Client.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using OpenTelemetry.Context.Propagation;
using RabbitMQ.Client;

namespace OpenTelemetry.Trace
{
public static class OpenTelemetryExtensions
{
public static TracerProviderBuilder AddRabbitMQInstrumentation(this TracerProviderBuilder builder)
{
RabbitMQActivitySource.UseRoutingKeyAsOperationName = true;
RabbitMQActivitySource.ContextExtractor = OpenTelemetryContextExtractor;
RabbitMQActivitySource.ContextInjector = OpenTelemetryContextInjector;
builder.AddSource("RabbitMQ.Client.*");
return builder;
}

private static ActivityContext OpenTelemetryContextExtractor(IReadOnlyBasicProperties props)
{
// Extract the PropagationContext of the upstream parent from the message headers.
var parentContext = Propagators.DefaultTextMapPropagator.Extract(default, props.Headers, OpenTelemetryContextGetter);
Baggage.Current = parentContext.Baggage;
return parentContext.ActivityContext;
}

private static IEnumerable<string> OpenTelemetryContextGetter(IDictionary<string, object> carrier, string key)
{
try
{
if (carrier.TryGetValue(key, out object value) && value is byte[] bytes)
{
return new[] { Encoding.UTF8.GetString(bytes) };
}
}
catch (Exception)
{
//this.logger.LogError(ex, "Failed to extract trace context.");
}

return Enumerable.Empty<string>();
}

private static void OpenTelemetryContextInjector(Activity activity, IDictionary<string, object> props)
{
// Inject the current Activity's context into the message headers.
Propagators.DefaultTextMapPropagator.Inject(new PropagationContext(activity.Context, Baggage.Current), props, OpenTelemetryContextSetter);
}

private static void OpenTelemetryContextSetter(IDictionary<string, object> carrier, string key, string value)
{
carrier[key] = Encoding.UTF8.GetBytes(value);
}
}
}
Binary file added projects/RabbitMQ.Client.OpenTelemetry/icon.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 4 additions & 0 deletions projects/RabbitMQ.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -956,4 +956,8 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, System.TimeSpan timeout) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.TimeSpan timeout) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.RabbitMQActivitySource.ContextExtractor.get -> System.Func<RabbitMQ.Client.IReadOnlyBasicProperties, System.Diagnostics.ActivityContext>
~static RabbitMQ.Client.RabbitMQActivitySource.ContextExtractor.set -> void
~static RabbitMQ.Client.RabbitMQActivitySource.ContextInjector.get -> System.Action<System.Diagnostics.Activity, System.Collections.Generic.IDictionary<string, object>>
~static RabbitMQ.Client.RabbitMQActivitySource.ContextInjector.set -> void
~virtual RabbitMQ.Client.DefaultBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.ReadOnlyBasicProperties properties, System.ReadOnlyMemory<byte> body) -> System.Threading.Tasks.Task
14 changes: 9 additions & 5 deletions projects/RabbitMQ.Client/client/api/IChannelExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client.client.impl;
Expand Down Expand Up @@ -87,17 +88,20 @@ public static Task<string> BasicConsumeAsync(this IChannel channel, string queue
/// <remarks>
/// The publication occurs with mandatory=false and immediate=false.
/// </remarks>
public static ValueTask BasicPublishAsync<T>(this IChannel channel, PublicationAddress addr, in T basicProperties, ReadOnlyMemory<byte> body)
public static ValueTask BasicPublishAsync<T>(this IChannel channel, PublicationAddress addr, in T basicProperties,
ReadOnlyMemory<byte> body)
where T : IReadOnlyBasicProperties, IAmqpHeader
{
return channel.BasicPublishAsync(addr.ExchangeName, addr.RoutingKey, basicProperties, body);
}

public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
=> channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);
public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey,
ReadOnlyMemory<byte> body = default, bool mandatory = false) =>
channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);

public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange, CachedString routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
=> channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);
public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange,
CachedString routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false) =>
channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);

#nullable disable

Expand Down
28 changes: 7 additions & 21 deletions projects/RabbitMQ.Client/client/impl/ChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1048,20 +1048,6 @@ await ModelSendAsync(method, k.CancellationToken)
}
}

private static void InjectTraceContextIntoBasicProperties(object propsObj, string key, string value)
{
if (!(propsObj is Dictionary<string, object> headers))
{
return;
}

// Only propagate headers if they haven't already been set
if (!headers.ContainsKey(key))
{
headers[key] = value;
}
}

public async ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey,
TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory,
CancellationToken cancellationToken)
Expand All @@ -1083,10 +1069,12 @@ await _confirmSemaphore.WaitAsync(cancellationToken)

try
{
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
RabbitMQActivitySource.TryGetExistingContext(basicProperties, out ActivityContext existingContext);
var cmd = new BasicPublishMemory(
Encoding.UTF8.GetBytes(exchange),
Encoding.UTF8.GetBytes(routingKey),
mandatory, default);
using Activity sendActivity = RabbitMQActivitySource.PublisherHasListeners
? RabbitMQActivitySource.Send(routingKey, exchange, body.Length, existingContext)
? RabbitMQActivitySource.Send(routingKey, exchange, body.Length)
: default;

if (sendActivity != null)
Expand Down Expand Up @@ -1144,10 +1132,8 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
try
{
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);

RabbitMQActivitySource.TryGetExistingContext(basicProperties, out ActivityContext existingContext);
using Activity sendActivity = RabbitMQActivitySource.PublisherHasListeners
? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length, existingContext)
? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length)
: default;

if (sendActivity != null)
Expand Down Expand Up @@ -1908,7 +1894,7 @@ private static BasicProperties PopulateActivityAndPropagateTraceId<TProperties>(
IDictionary<string, object> headers = props.Headers ?? new Dictionary<string, object>();

// Inject the ActivityContext into the message headers to propagate trace context to the receiving service.
DistributedContextPropagator.Current.Inject(sendActivity, headers, InjectTraceContextIntoBasicProperties);
RabbitMQActivitySource.ContextInjector(sendActivity, headers);
props.Headers = headers;
return props;
}
Expand Down
Loading

0 comments on commit 72713ee

Please sign in to comment.