From 7607be63c8c258700df4544306f5b3e95a96b9ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stef=C3=A1n=20J=C3=B6kull=20Sigur=C3=B0arson?= Date: Tue, 2 Apr 2024 14:50:35 +0000 Subject: [PATCH 1/9] * Making it possible to override the default context propagation in RabbitMQActivitySource to make it possible to add OpenTelemetry Baggage propagation * Adding a separate OpenTelemetry integration package to make it easier to integrate with OTel * Adding OTel tests to test trace context and baggage propagation --- RabbitMQDotNetClient.sln | 13 + .../RabbitMQ.Client.OpenTelemetry/README.md | 1 + .../RabbitMQ.Client.OpenTelemetry.csproj | 64 ++++ .../RabbitMQOpenTelemetryConfiguration.cs | 10 + .../TraceProviderBuilderExtensions.cs | 85 ++++++ .../RabbitMQ.Client.OpenTelemetry/icon.png | Bin 0 -> 6766 bytes .../RabbitMQ.Client/PublicAPI.Unshipped.txt | 6 +- .../RabbitMQ.Client/RabbitMQ.Client.csproj | 1 + .../client/api/IChannelExtensions.cs | 83 +++++- .../client/impl/ChannelBase.cs | 97 +----- .../client/impl/RabbitMQActivitySource.cs | 140 +++++---- projects/Test/OTel/OTel.csproj | 32 ++ .../Test/OTel/SequentialIntegrationFixture.cs | 82 +++++ projects/Test/OTel/TestOpenTelemetry.cs | 279 ++++++++++++++++++ .../TestActivitySource.cs | 235 +++++++++++++++ 15 files changed, 967 insertions(+), 161 deletions(-) create mode 100644 projects/RabbitMQ.Client.OpenTelemetry/README.md create mode 100644 projects/RabbitMQ.Client.OpenTelemetry/RabbitMQ.Client.OpenTelemetry.csproj create mode 100644 projects/RabbitMQ.Client.OpenTelemetry/RabbitMQOpenTelemetryConfiguration.cs create mode 100644 projects/RabbitMQ.Client.OpenTelemetry/TraceProviderBuilderExtensions.cs create mode 100644 projects/RabbitMQ.Client.OpenTelemetry/icon.png create mode 100644 projects/Test/OTel/OTel.csproj create mode 100644 projects/Test/OTel/SequentialIntegrationFixture.cs create mode 100644 projects/Test/OTel/TestOpenTelemetry.cs diff --git a/RabbitMQDotNetClient.sln b/RabbitMQDotNetClient.sln index 8717f1acee..88b7dc06ff 100644 --- a/RabbitMQDotNetClient.sln +++ b/RabbitMQDotNetClient.sln @@ -40,6 +40,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Common", "projects\Test\Com EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ToxiproxyNetCore", "projects\toxiproxy-netcore\src\ToxiproxyNetCore\ToxiproxyNetCore.csproj", "{AB5B7C53-D7EC-4985-A6DE-70178E4B688A}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RabbitMQ.Client.OpenTelemetry", "projects\RabbitMQ.Client.OpenTelemetry\RabbitMQ.Client.OpenTelemetry.csproj", "{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OTel", "projects\Test\OTel\OTel.csproj", "{33E86EAF-C269-4336-8E5C-71418AE360A2}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -90,6 +94,14 @@ Global {AB5B7C53-D7EC-4985-A6DE-70178E4B688A}.Debug|Any CPU.Build.0 = Debug|Any CPU {AB5B7C53-D7EC-4985-A6DE-70178E4B688A}.Release|Any CPU.ActiveCfg = Release|Any CPU {AB5B7C53-D7EC-4985-A6DE-70178E4B688A}.Release|Any CPU.Build.0 = Release|Any CPU + {16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Debug|Any CPU.Build.0 = Debug|Any CPU + {16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Release|Any CPU.ActiveCfg = Release|Any CPU + {16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Release|Any CPU.Build.0 = Release|Any CPU + {33E86EAF-C269-4336-8E5C-71418AE360A2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {33E86EAF-C269-4336-8E5C-71418AE360A2}.Debug|Any CPU.Build.0 = Debug|Any CPU + {33E86EAF-C269-4336-8E5C-71418AE360A2}.Release|Any CPU.ActiveCfg = Release|Any CPU + {33E86EAF-C269-4336-8E5C-71418AE360A2}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -104,6 +116,7 @@ Global {F25725D7-2978-45F4-B90F-25D6F8B71C9E} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704} {C11F25F4-7EA1-4874-9E25-DEB42E3A7C67} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704} {AB5B7C53-D7EC-4985-A6DE-70178E4B688A} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704} + {33E86EAF-C269-4336-8E5C-71418AE360A2} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {3C6A0C44-FA63-4101-BBF9-2598641167D1} diff --git a/projects/RabbitMQ.Client.OpenTelemetry/README.md b/projects/RabbitMQ.Client.OpenTelemetry/README.md new file mode 100644 index 0000000000..dbcdcbe7bc --- /dev/null +++ b/projects/RabbitMQ.Client.OpenTelemetry/README.md @@ -0,0 +1 @@ +# RabbitMQ .NET Client - OAuth2 diff --git a/projects/RabbitMQ.Client.OpenTelemetry/RabbitMQ.Client.OpenTelemetry.csproj b/projects/RabbitMQ.Client.OpenTelemetry/RabbitMQ.Client.OpenTelemetry.csproj new file mode 100644 index 0000000000..834c361c18 --- /dev/null +++ b/projects/RabbitMQ.Client.OpenTelemetry/RabbitMQ.Client.OpenTelemetry.csproj @@ -0,0 +1,64 @@ + + + + net6.0;netstandard2.0 + $(NoWarn);CS1591 + true + RabbitMQ OpenTelemetry Integration Package for .NET + VMware + VMware, Inc. or its affiliates. + Copyright © 2007-2023 VMware, Inc. or its affiliates. + The RabbitMQ OAuth2 Client Library for .NET enables OAuth2 token refresh for RabbitMQ.Client + true + icon.png + Apache-2.0 OR MPL-2.0 + https://www.rabbitmq.com/dotnet.html + rabbitmq, amqp, oauth2 + RabbitMQ + true + https://github.com/rabbitmq/rabbitmq-dotnet-client.git + true + snupkg + ../rabbit.snk + true + otel- + minimal + true + ../../packages + README.md + 7.3 + + + + true + true + true + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/projects/RabbitMQ.Client.OpenTelemetry/RabbitMQOpenTelemetryConfiguration.cs b/projects/RabbitMQ.Client.OpenTelemetry/RabbitMQOpenTelemetryConfiguration.cs new file mode 100644 index 0000000000..7748444f27 --- /dev/null +++ b/projects/RabbitMQ.Client.OpenTelemetry/RabbitMQOpenTelemetryConfiguration.cs @@ -0,0 +1,10 @@ +namespace RabbitMQ.Client.OpenTelemetry +{ + public class RabbitMQOpenTelemetryConfiguration + { + public bool PropagateBaggage { get; set; } = true; + public bool UseRoutingKeyAsOperationName { get; set; } = true; + public bool IncludePublishers { get; set; } = true; + public bool IncludeSubscribers { get; set; } = true; + } +} diff --git a/projects/RabbitMQ.Client.OpenTelemetry/TraceProviderBuilderExtensions.cs b/projects/RabbitMQ.Client.OpenTelemetry/TraceProviderBuilderExtensions.cs new file mode 100644 index 0000000000..14e5c489dd --- /dev/null +++ b/projects/RabbitMQ.Client.OpenTelemetry/TraceProviderBuilderExtensions.cs @@ -0,0 +1,85 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Text; +using OpenTelemetry.Context.Propagation; +using RabbitMQ.Client; +using RabbitMQ.Client.OpenTelemetry; + +namespace OpenTelemetry.Trace +{ + public static class OpenTelemetryExtensions + { + internal static TextMapPropagator s_propagator = Propagators.DefaultTextMapPropagator; + + public static TracerProviderBuilder AddRabbitMQ(this TracerProviderBuilder builder, + RabbitMQOpenTelemetryConfiguration configuration) + { + if (configuration.PropagateBaggage) + { + s_propagator = new CompositeTextMapPropagator(new TextMapPropagator[] + { + new TraceContextPropagator(), new BaggagePropagator() + }); + } + else + { + s_propagator = new TraceContextPropagator(); + } + + RabbitMQActivitySource.UseRoutingKeyAsOperationName = configuration.UseRoutingKeyAsOperationName; + RabbitMQActivitySource.ContextExtractor = OpenTelemetryContextExtractor; + RabbitMQActivitySource.ContextInjector = OpenTelemetryContextInjector; + + if (configuration.IncludeSubscribers) + { + builder.AddSource(RabbitMQActivitySource.SubscriberSourceName); + } + + if (configuration.IncludePublishers) + { + builder.AddSource(RabbitMQActivitySource.PublisherSourceName); + } + + return builder; + } + + private static ActivityContext OpenTelemetryContextExtractor(IReadOnlyBasicProperties props) + { + // Extract the PropagationContext of the upstream parent from the message headers. + var parentContext = s_propagator.Extract(default, props.Headers, OpenTelemetryContextGetter); + Baggage.Current = parentContext.Baggage; + return parentContext.ActivityContext; + } + + private static IEnumerable OpenTelemetryContextGetter(IDictionary carrier, string key) + { + try + { + if (carrier.TryGetValue(key, out object value)) + { + byte[] bytes = value as byte[]; + return new[] { Encoding.UTF8.GetString(bytes) }; + } + } + catch (Exception) + { + //this.logger.LogError(ex, "Failed to extract trace context."); + } + + return Enumerable.Empty(); + } + + private static void OpenTelemetryContextInjector(Activity activity, IDictionary props) + { + // Inject the current Activity's context into the message headers. + s_propagator.Inject(new PropagationContext(activity.Context, Baggage.Current), props, OpenTelemetryContextSetter); + } + + private static void OpenTelemetryContextSetter(IDictionary carrier, string key, string value) + { + carrier[key] = Encoding.UTF8.GetBytes(value); + } + } +} diff --git a/projects/RabbitMQ.Client.OpenTelemetry/icon.png b/projects/RabbitMQ.Client.OpenTelemetry/icon.png new file mode 100644 index 0000000000000000000000000000000000000000..092bfef15cf639ddcce7bcde13e5d4b83787566e GIT binary patch literal 6766 zcmV-!8j zaB^>EX>4U6ba`-PAZ2)IW&i+q+O3&scH=s-g#Tj|y#&k&EC=H`y@R*>{t~1lk5%@) z&nlY|Nnyx{jErQN^?(1}=0E&In~I4^O0GFueqxQ)H{KL`{nYc<*?9h*pT{LWzdLW9 z7d)o|$8dd=cHiGPpFR)d`up>y+*f?L2f8mj1`Im0=goY(NY4A~yo>ihbzN_AE%J7$ z-CxIX-*}dC|ITcOo=S3E($D7Yi6}&ACMF8rui%0`f8(`+aQ!xZ1Al!e+-E~M@1r>v z^1TuQ@@{*NqqT1X^e)JccjjaCpYJ{d-?#gG_z=r{$A~vyUU2E_;GYn`nK(RG#>6+Vao#SwWB}TuzaP)roaL$z=zWM<$pr|jmy@VPjsQEDbGC&!Ff3C%ydFz>P zxWdfSajSO>=J?9DFY~91zj>Lxm88zu@{ASh^2%$Lp~&f9?g9|EZ`{%e@Z)vEZ{P4o zsfs3?Fc&u1Y`=!+$-TH04$gu562~iroRN7CK!}(-u^1OXz-AYc%g&1T;v8`-0MuAH z`QSSUxRm^4kv@iyy%A5!FgM>3ckcDcXRoKxCV)sG2LYQJ8eqkk;E%)thgu3Lrj&9L zge}$7a>y~KoU>#Vxh6rPB*`LDq)IEH#F9!bQc9_%RbK-dm};(4ORcpvZyFOd#%kQ( zIMaO(J@(XdmtK19Z9tz9M;dv^D5H)x{R|VCn0d-9v(C1hSt!MdE3Ld_l~q^UaBYVj zciMT&F1zmbMD0fP)9V+gxf?Zqkkb3)i5h2($YThna*`A?5cAQ2xF`Y;+AC(ZxEQ@6 zr;$gCua>65WzS<{CqsBjK*^%gdq)0LH+9dX}?P3{1? zOCK7Z)w1`o_US_RVJ$JfF`cj3QFu$6i9K3^(g^29+OPM8N)3nSE|ul`wlws_F!dmF zrD5GPWd$Otl#tfQlrmsITBzEZJ7bx;hd0n*<=j^lAKK;{IYlV5s`9EWIjBXST&rXZ ztnuRx6XXf)!cncupO1`D4V#7SurI*(T_@~Ik3f&JEtohJL7QcdGD)oSDd=B9*ZR!F z!PHH$+}nqgDmPlayra!+r_y%oqnt0y3B6YV?F}#V%t~xXpwYE1Bi&;npNp({7bVXi zs{^|2!|%?^_Ku}-2#Z9}^k}k!AM;?bW0TSC9m;bwk2hZ&@fBXs zi{$I-e}lrMN?HTrpbnGkWJnFn%95N-px%$GvyntZN-{!?hi+-48%Jye97eEtw$nvm| zayFL2q8~;NC#zA%y8ZY{m&|uUrAYyPnjx~N-H73R`HlSAt%*Te1cJ?S!V!0Jf?%`q zlN0QQRkTZ@_;lrjJaS@c+PxBjt!>Q+lzq%cF&cHf&|j&(4>m(QYD#Zt_@KGi%Qxez zG9@TC{1j4iTWa=6fy$&xE<5_Al9?0!#=b@h`@Kg?%*_e<74csr#OHi4|12Rs=Yx5l z5EI0>4CRYv?~CyS8eXvq-J}{iMD=6~`#SkzI^;+z60ij`WyeVTRO8l$gzmj6Is0Cf z4hWa_7P*Y-%@Ww?qG=_@1tuJ=I}WzT9C}i z9#$C!W0-TAQ5?$%b4rSMMAVGn#n7f2nyAUQaBPY}pjL5CHWPSzIKMGLW|8Dc#fm6$ zofLuNwQ8J6N~iDj072R*ge=9iqd-^jdQ4+HkdudH;{QtaB-9R4+3Orw@V*g<(!D zC;+!h96fTaJ|+S(S`IkGiibBjHMNXSPSi7JArpOSL2t@y`LHXQ!+O=LD8Kry=mci8 zy4BTU8C|cg<1x>*UI6#`uD(O!*;>6OyB9(+z|+SL-kYQ=6vzjg8J^9R%(PV=WY$$@ zEnrvCIa6HAX3^mdmgbAlO7+OdPxeIS>nxRmWlnsi+-ccUCP(sI8<7OY73q~0?yD%A zG*@;vdRe>G-bXe-XYKlmfu2?$chyBxPZLKwIwKNN>8NPJePg8T3XKlsjKapASP`1| z&g>$J)0t0Z_a7e3FTUU=JWu-9A*+vO2xZla=G>Kk+E9IeG(Wx35|2a!47+i3=~-;G z^6R3_tIXPea-`aY9w&sHG?XzqZR2a%UBRpLy0g)421?{YOwbhsnfIjnUFX^O}xiHWk;)R z&_X3Cb%gs$#lEp|X;H3BbC+beM84Uusz<3^hL&p`3OCXr>mwpF&{TKQ!3io#CGJhd zEMiOvLq|+0YGg{DvtTG#HNL}&^h~r%LR*~J+D#%os$-YkhDkQ0A%*@uWmP{WFOr$U z?TE$$6ecpOOVNss0nSgP%mpb1mSU(6$WGC|$o~T92edceTc;^}e{SrUq?jUs~J0A*BL#WU$Z0}lX5-Bfm1B>no!1qEC zfwU_b6(RIRpYz#u$ae{d)}cqioLB&K&9fMsrke7UQ$;+OLG2d86pzsYWFadk+Nob) z&z>H`fNfEBigp3&WUn4rO|Bkur)(oFX4{Ol)aSzZe-K6zeOm}3o1mj0SP+9ngq7`; z9wZw605(+}yUgfOF>k8HTsnPJ+^MT|H57@aniTh%0s2fiyV?d-H&CR>T~w9~N%+XI zsy-LbUE*Y*c|X6jYT$oZ4nJ2z^Y6;yUkCbIIsD93WB*Ng{PRE-^>!}}*B+Rn1I8t1 z>6EWr*Cg^(m=)+ET%k2U*y>PlhFUDDr-H3L6b*s$r;syr6<1t`{=1&Q-yiO1$YVwF zDRggc`}mFf3pqdIb&jTmr{t~s+(@YqVA95!p{g9T9$N3B8;Dui00RR@zxzo1D&lH%eh zxE37zSgbm@IP2=*DhPrfAWn`>iY`*(|B^zB7!Qv7@$TN^?j7K7RG4aZjRUG?8R=v~ z$mUjs;41=%B8&lyO3c*fL@@==@pTU$U+>~P%lq7)qfg134DboWvrIQE;tk^IO-tvz zPaI(-Ng+Nb9y92I#E)E;U4G+SblA@`BSt1YPaGi@iXAL>Fe@1<@f2}PQ8miYT&7<|$tLvo}5O@E;Pyr0oG<$(TMpm)vft+kKS2OvXTrEY+OLtvyx z+3Ozf?&<99-!rZLegJiZa;T-EdY1qI00v@9M??Vs0RI60puMM)00009a7bBm000ib z000ib0l1NC?EnA(2XskIMF->q91j=|1(u^W000Z2NklQ*YiN@fAXaWXf;sY2>6k|wy;6WnBCOl|Bjf99u z#1Ei=5d1AicOz4w2w z^yb-^-#EleC5MIajkAUarknu9>|MlSi}G+(@bL)ide zc=^#c`1x1kY{v_z3dm9*mH%W4fuuRa&L3{##fSDg8^z(*_wv(QPGj=W222j%?sE)0 z4x?BrSTzX3a}N&l;MWg=9w-+8p4k!8o*BxvuI9^TW$8_K+fzY1HJ-XVb~cI^9 z8vzBiW=RX*6PwM2D5NS#D*WkZLrfkSDi;8DJ+%q5)a`N#`gHH}qt2TD_tRsjbvR8{ zP!%TrJIbM5X}JK<8n0z)e>ryj2>{TUHO`t(nhkK*Jc6Q<&C$$T6XgPcKxXIE%9gSL zKs;<3b=r;7Akab*NZCkW3AAPud7LqAmrMirvT069pz$;WMbdZ5#{tAK6|~u#`8ChW zN)`poy#P-PP#t#EZdq?jW24i`rvbdEPlqdHRlRvKDpC0WposHOt>BKTo5&-O%N7YN zLnydb6K_L^I9fgpARgj@Wg$>D4X=Y&od#&PQa|ugAPTR8SDmPO3};JRJ`LcU1ZF8g z4oRRtwQS5NvA72LR_@fPyvd+HbW}e<^1g zNZ^1bz+FeyYeGoLB7q`)eDug+BrrDWWekF1qO!dXE)pDzFr3_0#dTq!Kx*E z$+>3P?22fB>@c-+7&ufgvn_w;4s5cyYyj}Yom?)E^%>5L0{~Xl;?s=;dQ#1%Q%*0y zbtqUI#sM02Kd~S|G3M={0S3gt+UI{|%?WfK2^3K4E)LLYTW>RFZO92+bt=GGj~d2C z0RXEF0a4`f@Ojw|hWH?Xs^T<+9@ji_nOb-qTv-yBGr<6W!L)$egCX9=Atihr?Ai-Z zaT`KURs7`xq_v;{1~LH-0pRUih=AIXMgpDAn?e-3Q`18YPoZE1qa}?5I#;0%4^(L4 zkz!iP2>`e1UrAc-`kJ0dz0q^kP30qj&HLSxJdLOO2E+L8&SQD8L`-oV7e+OcSsZzW) z#paP0BUls>K`m&v4U@%91npIE7tK@aRY3%sABPjG*^dZnvj5BSJf%mAR3MFn4eL|R z{lFBLe__JD@4C7%oRa|{Nh6FW0N4m(YqQ``9vxB^cyWt7#=N72`K=%k>Kyt8ra^!S?rAhogxhzfn1`Je56ETslh@X#F_>!+RFjJ;2x7V3rdG*02cP z)h5N~U+mb!@s2|D^VyS*`*?kvBwmqB)p_#XF+(^MP)&(Y_WxzXUx}C?t4l0GYbuVz zOjK1=avG*#E-JeIDB`b9V31W91wwt+2@JBzTo^=cAc0jzfQXtf4^=h*m_Pz6y#9d% z1|vWqfz@99zyYd`g5U*IoCpLGSlOlTl0YfEiw{yE<*#`pFy{lo3#cRrG}H;KxOl7+ z7-ZFP0K^10Flc}v%dnjkW9%Y{eaid{&xLs+&dj?f*DxCWM;P_Y7T~ZS-QRX z)~>akn`Kc%6kUN!C=@_#NC{h(sWc#Bd-=|bW^_xz_UeM~OEHh!D-nqEXA8T(A9b(! zsT+w?>ca^aUD@QqD@NG1J)x5vrPFQ#uzu_;Kk-)w;jUtf+3-7&boX`H?AZK0>S!9zz8iO=xmH!wz zW4!uAhkL)=;_#jkP|1&iiBkmVO>s(^O4;h*JCi6%un3oa?j5ebYl`6w?_Ne*GA;*R znq|kQhd8i%)NOGk0TZdGV$tQY_-s9Yicfv-;Bqd0z9bttqAaQd>X6C7;+58N1zKaApq|U-Nc$0@!U0 zZ%@_2OC&=h!r0lP2D8y!C+2*H$2t(F&f-N<%n1zA$J%%*7T(wpgK=&KLOwwMTPO9> z&B%srh)a%^g#tr9K>v2r-#l5DwG}AU> zfWCL8pGZ)<)51jRa`)#XFsx~RzPYFS0z?~N9Cp?_7!nZBmn5kt5*TgEY7ILDStt?H zw{)iccuvj3tt@9#+^GPS@G_`h7l6)mgBR{QM55#FRwjLor+?86*E<|chy?a;HxFaMpiI_p`rn2v^>v;G(6VCehK!MU%Yo;ua!Rth#+2ELhQ-Ybi+*vV!|Eoq4vl zM3!}9Z7#jO#X0YPi>>FisE=&LL>mERrnwhTqIf++jjbTinmI(8G%*;4)}Kqe3Hx7| z=J`iD?0IY>X%c0$+=|=VTv2Lq8UV1QNi>D&jKE-#B;yHlc+oulY7RWu+_sD6SHlQa zC9m=CXB`xXV;c^JEJ+#IA~zt9uVy!iKvt4n^Q&5py#++F%d`+M7Q=kt_|RlZel#o` z4qYC*U~G1|)xx8H$KQn+2(ZzLP;l*@4i2&$vneZjhtVLbAMybLYC|Zv;-)J^0;|nh zLY=_sDt|(qzyN}f6Igi(bpk8P0R%ME39LL33;?J!3+%oQ4*38TU4IQFVyaFALL{)l z_MZ?53^k8WxExVgdxJBoQgKk&fX# zmrh_)g9LY=Qj!vJ&IhMsuD)f6*Pdyiof<|hYI4&M=BPy4J}(NVf-x40P_NH&)y?Zf zRpF64#`*Pk8`yRoQ6-WTn=W4=ghg-AIN@id_uH}m>ISY(z0Ku*PF>(k&zt0V-`3ty zfKzEx`h3xZz78dkBe-Tdf zOj(;f_bWeIlwI9+?3fna#V2UfTAi2KwQ!tWX$+-YaK#KCyGgk4+EIZ21Au|U{O}FX Q{Qv*}07*qoM6N<$f|tF>D*ylh literal 0 HcmV?d00001 diff --git a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt index 58d5f93495..cb0c882910 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt @@ -848,7 +848,7 @@ static RabbitMQ.Client.Events.CallbackExceptionEventArgs.Build(System.Exception static RabbitMQ.Client.ExchangeType.All() -> System.Collections.Generic.ICollection static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false) -> System.Threading.Tasks.ValueTask static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel channel, string exchange, string routingKey, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false) -> System.Threading.Tasks.ValueTask -static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.PublicationAddress addr, in T basicProperties, System.ReadOnlyMemory body) -> System.Threading.Tasks.ValueTask +static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.PublicationAddress! addr, T basicProperties, System.ReadOnlyMemory body) -> System.Threading.Tasks.ValueTask static RabbitMQ.Client.PublicationAddress.Parse(string uriLikeString) -> RabbitMQ.Client.PublicationAddress static RabbitMQ.Client.PublicationAddress.TryParse(string uriLikeString, out RabbitMQ.Client.PublicationAddress result) -> bool static RabbitMQ.Client.QueueDeclareOk.implicit operator string(RabbitMQ.Client.QueueDeclareOk declareOk) -> string @@ -956,4 +956,8 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void ~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, System.TimeSpan timeout) -> System.Threading.Tasks.Task ~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText) -> System.Threading.Tasks.Task ~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.TimeSpan timeout) -> System.Threading.Tasks.Task +~static RabbitMQ.Client.RabbitMQActivitySource.ContextExtractor.get -> System.Func +~static RabbitMQ.Client.RabbitMQActivitySource.ContextExtractor.set -> void +~static RabbitMQ.Client.RabbitMQActivitySource.ContextInjector.get -> System.Action> +~static RabbitMQ.Client.RabbitMQActivitySource.ContextInjector.set -> void ~virtual RabbitMQ.Client.DefaultBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.ReadOnlyBasicProperties properties, System.ReadOnlyMemory body) -> System.Threading.Tasks.Task diff --git a/projects/RabbitMQ.Client/RabbitMQ.Client.csproj b/projects/RabbitMQ.Client/RabbitMQ.Client.csproj index 56cceae989..817ceb40e3 100644 --- a/projects/RabbitMQ.Client/RabbitMQ.Client.csproj +++ b/projects/RabbitMQ.Client/RabbitMQ.Client.csproj @@ -55,6 +55,7 @@ + diff --git a/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs b/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs index f48be542fa..3c636f1b57 100644 --- a/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs +++ b/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs @@ -31,6 +31,7 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Threading.Tasks; using RabbitMQ.Client.client.impl; @@ -86,17 +87,58 @@ public static Task BasicConsumeAsync(this IChannel channel, string queue /// /// The publication occurs with mandatory=false and immediate=false. /// - public static ValueTask BasicPublishAsync(this IChannel channel, PublicationAddress addr, in T basicProperties, ReadOnlyMemory body) + public static async ValueTask BasicPublishAsync(this IChannel channel, PublicationAddress addr, T basicProperties, ReadOnlyMemory body) where T : IReadOnlyBasicProperties, IAmqpHeader { - return channel.BasicPublishAsync(addr.ExchangeName, addr.RoutingKey, basicProperties, body); + using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners + ? RabbitMQActivitySource.Send(addr.RoutingKey, addr.ExchangeName, body.Length) + : default; + + if (sendActivity != null) + { + BasicProperties props = PopulateActivityAndPropagateTraceId(EmptyBasicProperty.Empty, sendActivity); + await channel.BasicPublishAsync(addr.ExchangeName, addr.RoutingKey, props, body); + } + else + { + await channel.BasicPublishAsync(addr.ExchangeName, addr.RoutingKey, basicProperties, body); + } + } + + public static async ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey, ReadOnlyMemory body = default, bool mandatory = false) + { + using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners + ? RabbitMQActivitySource.Send(routingKey, exchange, body.Length) + : default; + + if (sendActivity != null) + { + BasicProperties props = PopulateActivityAndPropagateTraceId(EmptyBasicProperty.Empty, sendActivity); + await channel.BasicPublishAsync(exchange, routingKey, props, body, mandatory); + } + else + { + await channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory); + } } - public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey, ReadOnlyMemory body = default, bool mandatory = false) - => channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory); + public static async ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange, + CachedString routingKey, ReadOnlyMemory body = default, bool mandatory = false) + { + using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners + ? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length) + : default; - public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange, CachedString routingKey, ReadOnlyMemory body = default, bool mandatory = false) - => channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory); + if (sendActivity != null) + { + BasicProperties props = PopulateActivityAndPropagateTraceId(EmptyBasicProperty.Empty, sendActivity); + await channel.BasicPublishAsync(exchange, routingKey, props, body, mandatory); + } + else + { + await channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory); + } + } #nullable disable @@ -185,5 +227,34 @@ public static Task CloseAsync(this IChannel channel, ushort replyCode, string re { return channel.CloseAsync(replyCode, replyText, false); } + + private static BasicProperties PopulateActivityAndPropagateTraceId(TProperties basicProperties, + Activity sendActivity) where TProperties : IReadOnlyBasicProperties, IAmqpHeader + { + // This activity is marked as recorded, so let's propagate the trace and span ids. + if (sendActivity.IsAllDataRequested) + { + if (!string.IsNullOrEmpty(basicProperties.CorrelationId)) + { + sendActivity.SetTag(RabbitMQActivitySource.MessageConversationId, basicProperties.CorrelationId); + } + } + + BasicProperties props = default; + if (basicProperties is BasicProperties properties) + { + props = properties; + } + else if (basicProperties is EmptyBasicProperty) + { + props = new BasicProperties(); + } + + var headers = props.Headers ?? new Dictionary(); + // Inject the ActivityContext into the message headers to propagate trace context to the receiving service. + RabbitMQActivitySource.ContextInjector(sendActivity, headers); + props.Headers = headers; + return props; + } } } diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index 514c80aa20..3382cd1afe 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -1036,22 +1036,7 @@ public async ValueTask BasicPublishAsync(string exchange, string ro try { var cmd = new BasicPublish(exchange, routingKey, mandatory, default); - RabbitMQActivitySource.TryGetExistingContext(basicProperties, out ActivityContext existingContext); - using Activity sendActivity = RabbitMQActivitySource.PublisherHasListeners - ? RabbitMQActivitySource.Send(routingKey, exchange, body.Length, existingContext) - : default; - - if (sendActivity != null) - { - BasicProperties props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity); - // TODO cancellation token - await ModelSendAsync(in cmd, in props, body, CancellationToken.None); - } - else - { - // TODO cancellation token - await ModelSendAsync(in cmd, in basicProperties, body, CancellationToken.None); - } + await ModelSendAsync(in cmd, in basicProperties, body, CancellationToken.None); } catch { @@ -1068,20 +1053,6 @@ public async ValueTask BasicPublishAsync(string exchange, string ro } } - private static void InjectTraceContextIntoBasicProperties(object propsObj, string key, string value) - { - if (!(propsObj is Dictionary headers)) - { - return; - } - - // Only propagate headers if they haven't already been set - if (!headers.ContainsKey(key)) - { - headers[key] = value; - } - } - public async void BasicPublish(CachedString exchange, CachedString routingKey, TProperties basicProperties, ReadOnlyMemory body, bool mandatory) where TProperties : IReadOnlyBasicProperties, IAmqpHeader @@ -1097,23 +1068,7 @@ public async void BasicPublish(CachedString exchange, CachedString try { var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default); - - RabbitMQActivitySource.TryGetExistingContext(basicProperties, out ActivityContext existingContext); - using Activity sendActivity = RabbitMQActivitySource.PublisherHasListeners - ? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length, existingContext) - : default; - - if (sendActivity != null) - { - BasicProperties props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity); - // TODO cancellation token - await ModelSendAsync(in cmd, in basicProperties, body, CancellationToken.None); - } - else - { - // TODO cancellation token - await ModelSendAsync(in cmd, in basicProperties, body, CancellationToken.None); - } + await ModelSendAsync(in cmd, in basicProperties, body, CancellationToken.None); } catch { @@ -1145,23 +1100,7 @@ public async ValueTask BasicPublishAsync(CachedString exchange, Cac try { var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default); - - RabbitMQActivitySource.TryGetExistingContext(basicProperties, out ActivityContext existingContext); - using Activity sendActivity = RabbitMQActivitySource.PublisherHasListeners - ? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length, existingContext) - : default; - - if (sendActivity != null) - { - BasicProperties props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity); - // TODO cancellation token - await ModelSendAsync(in cmd, in props, body, CancellationToken.None); - } - else - { - // TODO cancellation token - await ModelSendAsync(in cmd, in basicProperties, body, CancellationToken.None); - } + await ModelSendAsync(in cmd, in basicProperties, body, CancellationToken.None); } catch { @@ -1844,35 +1783,5 @@ await CloseAsync(ea, false) throw; } } - - private static BasicProperties PopulateActivityAndPropagateTraceId(TProperties basicProperties, - Activity sendActivity) where TProperties : IReadOnlyBasicProperties, IAmqpHeader - { - // This activity is marked as recorded, so let's propagate the trace and span ids. - if (sendActivity.IsAllDataRequested) - { - if (!string.IsNullOrEmpty(basicProperties.CorrelationId)) - { - sendActivity.SetTag(RabbitMQActivitySource.MessageConversationId, basicProperties.CorrelationId); - } - } - - BasicProperties props = default; - if (basicProperties is BasicProperties properties) - { - props = properties; - } - else if (basicProperties is EmptyBasicProperty) - { - props = new BasicProperties(); - } - - var headers = props.Headers ?? new Dictionary(); - - // Inject the ActivityContext into the message headers to propagate trace context to the receiving service. - DistributedContextPropagator.Current.Inject(sendActivity, headers, InjectTraceContextIntoBasicProperties); - props.Headers = headers; - return props; - } } } diff --git a/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs b/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs index f68d4e1596..b62690b310 100644 --- a/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs +++ b/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs @@ -1,4 +1,5 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; using System.Diagnostics; using System.Net; using System.Net.Sockets; @@ -29,12 +30,20 @@ public static class RabbitMQActivitySource .GetCustomAttribute() ?.InformationalVersion ?? ""; - private static readonly ActivitySource s_publisherSource = new ActivitySource(PublisherSourceName, AssemblyVersion); - private static readonly ActivitySource s_subscriberSource = new ActivitySource(SubscriberSourceName, AssemblyVersion); + private static readonly ActivitySource s_publisherSource = + new ActivitySource(PublisherSourceName, AssemblyVersion); + + private static readonly ActivitySource s_subscriberSource = + new ActivitySource(SubscriberSourceName, AssemblyVersion); public const string PublisherSourceName = "RabbitMQ.Client.Publisher"; public const string SubscriberSourceName = "RabbitMQ.Client.Subscriber"; + public static Action> ContextInjector { get; set; } = DefaultContextInjector; + + public static Func ContextExtractor { get; set; } = + DefaultContextExtractor; + public static bool UseRoutingKeyAsOperationName { get; set; } = true; internal static bool PublisherHasListeners => s_publisherSource.HasListeners(); internal static bool SubscriberHasListeners => s_subscriberSource.HasListeners(); @@ -52,9 +61,11 @@ internal static Activity Send(string routingKey, string exchange, int bodySize, if (s_publisherSource.HasListeners()) { Activity activity = linkedContext == default - ? s_publisherSource.StartRabbitMQActivity(UseRoutingKeyAsOperationName ? $"{routingKey} publish" : "publish", + ? s_publisherSource.StartRabbitMQActivity( + UseRoutingKeyAsOperationName ? $"{routingKey} publish" : "publish", ActivityKind.Producer) - : s_publisherSource.StartLinkedRabbitMQActivity(UseRoutingKeyAsOperationName ? $"{routingKey} publish" : "publish", + : s_publisherSource.StartLinkedRabbitMQActivity( + UseRoutingKeyAsOperationName ? $"{routingKey} publish" : "publish", ActivityKind.Producer, linkedContext); if (activity?.IsAllDataRequested == true) { @@ -74,7 +85,8 @@ internal static Activity ReceiveEmpty(string queue) return null; } - Activity activity = s_subscriberSource.StartRabbitMQActivity(UseRoutingKeyAsOperationName ? $"{queue} receive" : "receive", + Activity activity = s_subscriberSource.StartRabbitMQActivity( + UseRoutingKeyAsOperationName ? $"{queue} receive" : "receive", ActivityKind.Consumer); if (activity.IsAllDataRequested) { @@ -95,12 +107,9 @@ internal static Activity Receive(string routingKey, string exchange, ulong deliv } // Extract the PropagationContext of the upstream parent from the message headers. - DistributedContextPropagator.Current.ExtractTraceIdAndState(readOnlyBasicProperties.Headers, - ExtractTraceIdAndState, out string traceParent, out string traceState); - ActivityContext.TryParse(traceParent, traceState, out ActivityContext linkedContext); Activity activity = s_subscriberSource.StartLinkedRabbitMQActivity( UseRoutingKeyAsOperationName ? $"{routingKey} receive" : "receive", ActivityKind.Consumer, - linkedContext); + ContextExtractor(readOnlyBasicProperties)); if (activity.IsAllDataRequested) { PopulateMessagingTags("receive", routingKey, exchange, deliveryTag, readOnlyBasicProperties, @@ -118,12 +127,9 @@ internal static Activity Deliver(BasicDeliverEventArgs deliverEventArgs) } // Extract the PropagationContext of the upstream parent from the message headers. - DistributedContextPropagator.Current.ExtractTraceIdAndState(deliverEventArgs.BasicProperties.Headers, - ExtractTraceIdAndState, out string traceparent, out string traceState); - ActivityContext.TryParse(traceparent, traceState, out ActivityContext parentContext); Activity activity = s_subscriberSource.StartLinkedRabbitMQActivity( UseRoutingKeyAsOperationName ? $"{deliverEventArgs.RoutingKey} deliver" : "deliver", - ActivityKind.Consumer, parentContext); + ActivityKind.Consumer, ContextExtractor(deliverEventArgs.BasicProperties)); if (activity != null && activity.IsAllDataRequested) { PopulateMessagingTags("deliver", deliverEventArgs.RoutingKey, deliverEventArgs.Exchange, @@ -192,52 +198,6 @@ internal static void PopulateMessageEnvelopeSize(Activity activity, int size) } } - internal static bool TryGetExistingContext(T props, out ActivityContext context) - where T : IReadOnlyBasicProperties - { - if (props.Headers == null) - { - context = default; - return false; - } - - bool hasHeaders = false; - foreach (string header in DistributedContextPropagator.Current.Fields) - { - if (props.Headers.ContainsKey(header)) - { - hasHeaders = true; - break; - } - } - - if (hasHeaders) - { - DistributedContextPropagator.Current.ExtractTraceIdAndState(props.Headers, ExtractTraceIdAndState, - out string traceParent, out string traceState); - return ActivityContext.TryParse(traceParent, traceState, out context); - } - - context = default; - return false; - } - - private static void ExtractTraceIdAndState(object props, string name, out string value, - out IEnumerable values) - { - if (props is Dictionary headers && headers.TryGetValue(name, out object propsVal) && - propsVal is byte[] bytes) - { - value = Encoding.UTF8.GetString(bytes); - values = default; - } - else - { - value = default; - values = default; - } - } - internal static void SetNetworkTags(this Activity activity, IFrameHandler frameHandler) { if (PublisherHasListeners && activity != null && activity.IsAllDataRequested) @@ -286,5 +246,65 @@ internal static void SetNetworkTags(this Activity activity, IFrameHandler frameH } } } + + private static void DefaultContextInjector(Activity sendActivity, IDictionary props) + { + props ??= new Dictionary(); + DistributedContextPropagator.Current.Inject(sendActivity, props, DefaultContextSetter); + } + + private static ActivityContext DefaultContextExtractor(IReadOnlyBasicProperties props) + { + if (props.Headers == null) + { + return default; + } + + bool hasHeaders = false; + foreach (string header in DistributedContextPropagator.Current.Fields) + { + if (props.Headers.ContainsKey(header)) + { + hasHeaders = true; + break; + } + } + + + if (!hasHeaders) + { + return default; + } + + DistributedContextPropagator.Current.ExtractTraceIdAndState(props.Headers, DefaultContextGetter, out string traceParent, out string traceState); + return ActivityContext.TryParse(traceParent, traceState, out ActivityContext context) ? context : default; + } + + private static void DefaultContextSetter(object carrier, string name, string value) + { + if (!(carrier is IDictionary carrierDictionary)) + { + return; + } + + // Only propagate headers if they haven't already been set + carrierDictionary[name] = value; + } + + private static void DefaultContextGetter(object carrier, string name, out string value, + out IEnumerable values) + { + if (carrier is IDictionary carrierDict && + carrierDict.TryGetValue(name, out object propsVal) && propsVal is byte[] bytes) + { + value = Encoding.UTF8.GetString(bytes); + values = default; + } + else + { + value = default; + values = default; + } + } } } diff --git a/projects/Test/OTel/OTel.csproj b/projects/Test/OTel/OTel.csproj new file mode 100644 index 0000000000..59450b21ef --- /dev/null +++ b/projects/Test/OTel/OTel.csproj @@ -0,0 +1,32 @@ + + + + net6.0;net472 + + + + net6.0 + + + + ../../rabbit.snk + true + true + 7.3 + OpenTelemetry.Tests + + + + + + + + + + + + + + + + diff --git a/projects/Test/OTel/SequentialIntegrationFixture.cs b/projects/Test/OTel/SequentialIntegrationFixture.cs new file mode 100644 index 0000000000..92fd4daac6 --- /dev/null +++ b/projects/Test/OTel/SequentialIntegrationFixture.cs @@ -0,0 +1,82 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2020 VMware, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2020 VMware, Inc. All rights reserved. +//--------------------------------------------------------------------------- + +using System; +using System.Threading.Tasks; +using RabbitMQ.Client; +using Xunit.Abstractions; + +namespace Test.OpenTelemetry.SequentialIntegration +{ + public class SequentialIntegrationFixture : TestConnectionRecoveryBase + { + public SequentialIntegrationFixture(ITestOutputHelper output) : base(output) + { + } + + public async Task RestartRabbitMqAsync() + { + await StopRabbitMqAsync(); + await Task.Delay(TimeSpan.FromSeconds(1)); + await StartRabbitMqAsync(); + await AwaitRabbitMqAsync(); + } + + public Task StopRabbitMqAsync() + { + return _rabbitMQCtl.ExecRabbitMQCtlAsync("stop_app"); + } + + public Task StartRabbitMqAsync() + { + return _rabbitMQCtl.ExecRabbitMQCtlAsync("start_app"); + } + + public Task RestartServerAndWaitForRecoveryAsync() + { + return RestartServerAndWaitForRecoveryAsync(_conn); + } + + public async Task RestartServerAndWaitForRecoveryAsync(IConnection conn) + { + TaskCompletionSource sl = PrepareForShutdown(conn); + TaskCompletionSource rl = PrepareForRecovery(conn); + await RestartRabbitMqAsync(); + await WaitAsync(sl, "connection shutdown"); + await WaitAsync(rl, "connection recovery"); + } + + private Task AwaitRabbitMqAsync() + { + return _rabbitMQCtl.ExecRabbitMQCtlAsync("await_startup"); + } + } +} diff --git a/projects/Test/OTel/TestOpenTelemetry.cs b/projects/Test/OTel/TestOpenTelemetry.cs new file mode 100644 index 0000000000..25c489dc85 --- /dev/null +++ b/projects/Test/OTel/TestOpenTelemetry.cs @@ -0,0 +1,279 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2020 VMware, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2020 VMware, Inc. All rights reserved. +//--------------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using OpenTelemetry; +using OpenTelemetry.Trace; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using RabbitMQ.Client.OpenTelemetry; +using Test.OpenTelemetry.SequentialIntegration; +using Xunit; +using Xunit.Abstractions; +using Xunit.Sdk; + +namespace Test.OpenTelemetry +{ + public class TestOpenTelemetry : SequentialIntegrationFixture + { + public TestOpenTelemetry(ITestOutputHelper output) : base(output) + { + } + + void AssertStringTagEquals(Activity activity, string name, string expected) + { + string tag = activity.GetTagItem(name) as string; + Assert.NotNull(tag); + Assert.Equal(expected, tag); + } + + void AssertStringTagStartsWith(Activity activity, string name, string expected) + { + string tag = activity.GetTagItem(name) as string; + Assert.NotNull(tag); + Assert.StartsWith(expected, tag); + } + + void AssertStringTagNotNullOrEmpty(Activity activity, string name) + { + string tag = activity.GetTagItem(name) as string; + Assert.NotNull(tag); + Assert.False(string.IsNullOrEmpty(tag)); + } + + void AssertIntTagGreaterThanZero(Activity activity, string name) + { + Assert.True(activity.GetTagItem(name) is int result && result > 0); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOperationName) + { + var exportedItems = new List(); + using (var tracer = Sdk.CreateTracerProviderBuilder() + .AddRabbitMQ(new RabbitMQOpenTelemetryConfiguration()) + .AddInMemoryExporter(exportedItems) + .Build()) + { + string baggageGuid = Guid.NewGuid().ToString(); + Baggage.SetBaggage("TestItem", baggageGuid); + Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem")); + await _channel.ConfirmSelectAsync(); + + RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; + await Task.Delay(500); + string queueName = $"{Guid.NewGuid()}"; + QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); + byte[] sendBody = Encoding.UTF8.GetBytes("hi"); + byte[] consumeBody = null; + var consumer = new EventingBasicConsumer(_channel); + var consumerReceivedTcs = + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + consumer.Received += (o, a) => + { + consumeBody = a.Body.ToArray(); + string baggageItem = Baggage.GetBaggage("TestItem"); + if (baggageItem == baggageGuid) + { + consumerReceivedTcs.SetResult(true); + } + else + { + consumerReceivedTcs.SetException( + EqualException.ForMismatchedStrings(baggageGuid, baggageItem, 0, 0)); + } + }; + + string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); + await _channel.BasicPublishAsync("", q.QueueName, sendBody, mandatory: true); + await _channel.WaitForConfirmsOrDieAsync(); + Baggage.ClearBaggage(); + Assert.Null(Baggage.GetBaggage("TestItem")); + + await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.True(await consumerReceivedTcs.Task); + + await _channel.BasicCancelAsync(consumerTag); + await Task.Delay(500); + AssertActivityData(useRoutingKeyAsOperationName, queueName, exportedItems, true); + } + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAsOperationName) + { + var exportedItems = new List(); + using (var tracer = Sdk.CreateTracerProviderBuilder() + .AddRabbitMQ(new RabbitMQOpenTelemetryConfiguration()) + .AddInMemoryExporter(exportedItems) + .Build()) + { + string baggageGuid = Guid.NewGuid().ToString(); + Baggage.SetBaggage("TestItem", baggageGuid); + Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem")); + await _channel.ConfirmSelectAsync(); + + RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; + await Task.Delay(500); + + string queueName = $"{Guid.NewGuid()}"; + QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); + byte[] sendBody = Encoding.UTF8.GetBytes("hi"); + byte[] consumeBody = null; + var consumer = new EventingBasicConsumer(_channel); + var consumerReceivedTcs = + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + consumer.Received += (o, a) => + { + consumeBody = a.Body.ToArray(); + string baggageItem = Baggage.GetBaggage("TestItem"); + if (baggageItem == baggageGuid) + { + consumerReceivedTcs.SetResult(true); + } + else + { + consumerReceivedTcs.SetException( + EqualException.ForMismatchedStrings(baggageGuid, baggageItem, 0, 0)); + } + }; + + string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); + await _channel.BasicPublishAsync("", q.QueueName, sendBody, mandatory: true); + await _channel.WaitForConfirmsOrDieAsync(); + Baggage.ClearBaggage(); + Assert.Null(Baggage.GetBaggage("TestItem")); + + await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.True(await consumerReceivedTcs.Task); + + await _channel.BasicCancelAsync(consumerTag); + await Task.Delay(500); + AssertActivityData(useRoutingKeyAsOperationName, queueName, exportedItems, true); + } + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOperationName) + { + var exportedItems = new List(); + using (var tracer = Sdk.CreateTracerProviderBuilder() + .AddRabbitMQ(new RabbitMQOpenTelemetryConfiguration()) + .AddInMemoryExporter(exportedItems) + .Build()) + { + string baggageGuid = Guid.NewGuid().ToString(); + Baggage.SetBaggage("TestItem", baggageGuid); + Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem")); + await _channel.ConfirmSelectAsync(); + RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; + await Task.Delay(500); + string queue = $"queue-{Guid.NewGuid()}"; + const string msg = "for basic.get"; + + try + { + await _channel.QueueDeclareAsync(queue, false, false, false, null); + await _channel.BasicPublishAsync("", queue, Encoding.UTF8.GetBytes(msg), mandatory: true); + await _channel.WaitForConfirmsOrDieAsync(); + Baggage.ClearBaggage(); + Assert.Null(Baggage.GetBaggage("TestItem")); + QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue); + Assert.Equal(1u, ok.MessageCount); + BasicGetResult res = await _channel.BasicGetAsync(queue, true); + Assert.Equal(msg, Encoding.UTF8.GetString(res.Body.ToArray())); + ok = await _channel.QueueDeclarePassiveAsync(queue); + Assert.Equal(0u, ok.MessageCount); + await Task.Delay(500); + AssertActivityData(useRoutingKeyAsOperationName, queue, exportedItems, false); + } + finally + { + await _channel.QueueDeleteAsync(queue); + } + } + } + + private void AssertActivityData(bool useRoutingKeyAsOperationName, string queueName, + List activityList, bool isDeliver = false, string baggageGuid = null) + { + string childName = isDeliver ? "deliver" : "receive"; + Activity[] activities = activityList.ToArray(); + Assert.NotEmpty(activities); + foreach (var item in activities) + { + _output.WriteLine( + $"{item.Context.TraceId}: {item.OperationName}"); + _output.WriteLine($" Tags: {string.Join(", ", item.Tags.Select(x => $"{x.Key}: {x.Value}"))}"); + _output.WriteLine($" Links: {string.Join(", ", item.Links.Select(x => $"{x.Context.TraceId}"))}"); + } + + Activity sendActivity = activities.First(x => + x.OperationName == (useRoutingKeyAsOperationName ? $"{queueName} publish" : "publish") && + x.GetTagItem(RabbitMQActivitySource.MessagingDestinationRoutingKey) is string routingKeyTag && + routingKeyTag == $"{queueName}"); + Activity receiveActivity = activities.Single(x => + x.OperationName == (useRoutingKeyAsOperationName ? $"{queueName} {childName}" : $"{childName}") && + x.Links.First().Context.TraceId == sendActivity.TraceId); + Assert.Equal(ActivityKind.Producer, sendActivity.Kind); + Assert.Equal(ActivityKind.Consumer, receiveActivity.Kind); + Assert.Null(receiveActivity.ParentId); + AssertStringTagNotNullOrEmpty(sendActivity, "network.peer.address"); + AssertStringTagNotNullOrEmpty(sendActivity, "network.local.address"); + AssertStringTagNotNullOrEmpty(sendActivity, "server.address"); + AssertStringTagNotNullOrEmpty(sendActivity, "client.address"); + AssertIntTagGreaterThanZero(sendActivity, "network.peer.port"); + AssertIntTagGreaterThanZero(sendActivity, "network.local.port"); + AssertIntTagGreaterThanZero(sendActivity, "server.port"); + AssertIntTagGreaterThanZero(sendActivity, "client.port"); + AssertStringTagStartsWith(sendActivity, "network.type", "ipv"); + AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessagingSystem, "rabbitmq"); + AssertStringTagEquals(sendActivity, RabbitMQActivitySource.ProtocolName, "amqp"); + AssertStringTagEquals(sendActivity, RabbitMQActivitySource.ProtocolVersion, "0.9.1"); + AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessagingDestination, "amq.default"); + AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessagingDestinationRoutingKey, queueName); + AssertIntTagGreaterThanZero(sendActivity, RabbitMQActivitySource.MessagingEnvelopeSize); + AssertIntTagGreaterThanZero(sendActivity, RabbitMQActivitySource.MessagingBodySize); + AssertIntTagGreaterThanZero(receiveActivity, RabbitMQActivitySource.MessagingBodySize); + } + } +} diff --git a/projects/Test/SequentialIntegration/TestActivitySource.cs b/projects/Test/SequentialIntegration/TestActivitySource.cs index 2be177bec3..28e333b435 100644 --- a/projects/Test/SequentialIntegration/TestActivitySource.cs +++ b/projects/Test/SequentialIntegration/TestActivitySource.cs @@ -37,6 +37,7 @@ using System.Threading.Tasks; using RabbitMQ.Client; +using RabbitMQ.Client.client.impl; using RabbitMQ.Client.Events; using Xunit; using Xunit.Abstractions; @@ -113,6 +114,85 @@ public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOpera } } + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task TestPublisherWithCachedStringsAndConsumerActivityTags(bool useRoutingKeyAsOperationName) + { + await _channel.ConfirmSelectAsync(); + + RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; + var _activities = new List(); + using (ActivityListener activityListener = StartActivityListener(_activities)) + { + await Task.Delay(500); + string queueName = $"{Guid.NewGuid()}"; + QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); + byte[] sendBody = Encoding.UTF8.GetBytes("hi"); + byte[] consumeBody = null; + var consumer = new EventingBasicConsumer(_channel); + var consumerReceivedTcs = + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + consumer.Received += (o, a) => + { + consumeBody = a.Body.ToArray(); + consumerReceivedTcs.SetResult(true); + }; + + string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); + CachedString exchange = new CachedString(""); + CachedString routingKey = new CachedString(q.QueueName); + await _channel.BasicPublishAsync(exchange, routingKey, sendBody, mandatory: true); + await _channel.WaitForConfirmsOrDieAsync(); + + await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.True(await consumerReceivedTcs.Task); + + await _channel.BasicCancelAsync(consumerTag); + await Task.Delay(500); + AssertActivityData(useRoutingKeyAsOperationName, queueName, _activities, true); + } + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task TestPublisherWithPublicationAddressAndConsumerActivityTags(bool useRoutingKeyAsOperationName) + { + await _channel.ConfirmSelectAsync(); + + RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; + var _activities = new List(); + using (ActivityListener activityListener = StartActivityListener(_activities)) + { + await Task.Delay(500); + string queueName = $"{Guid.NewGuid()}"; + QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); + byte[] sendBody = Encoding.UTF8.GetBytes("hi"); + byte[] consumeBody = null; + var consumer = new EventingBasicConsumer(_channel); + var consumerReceivedTcs = + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + consumer.Received += (o, a) => + { + consumeBody = a.Body.ToArray(); + consumerReceivedTcs.SetResult(true); + }; + + string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); + PublicationAddress publicationAddress = new PublicationAddress(ExchangeType.Direct, "", q.QueueName); + await _channel.BasicPublishAsync(publicationAddress, new BasicProperties(), sendBody); + await _channel.WaitForConfirmsOrDieAsync(); + + await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.True(await consumerReceivedTcs.Task); + + await _channel.BasicCancelAsync(consumerTag); + await Task.Delay(500); + AssertActivityData(useRoutingKeyAsOperationName, queueName, _activities, true); + } + } + [Theory] [InlineData(true)] [InlineData(false)] @@ -152,6 +232,87 @@ public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAs } } + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(bool useRoutingKeyAsOperationName) + { + await _channel.ConfirmSelectAsync(); + + RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; + var activities = new List(); + using (ActivityListener activityListener = StartActivityListener(activities)) + { + await Task.Delay(500); + + string queueName = $"{Guid.NewGuid()}"; + QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); + byte[] sendBody = Encoding.UTF8.GetBytes("hi"); + byte[] consumeBody = null; + var consumer = new EventingBasicConsumer(_channel); + var consumerReceivedTcs = + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + consumer.Received += (o, a) => + { + consumeBody = a.Body.ToArray(); + consumerReceivedTcs.SetResult(true); + }; + + string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); + CachedString exchange = new CachedString(""); + CachedString routingKey = new CachedString(q.QueueName); + await _channel.BasicPublishAsync(exchange, routingKey, sendBody, mandatory: true); + await _channel.WaitForConfirmsOrDieAsync(); + + await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.True(await consumerReceivedTcs.Task); + + await _channel.BasicCancelAsync(consumerTag); + await Task.Delay(500); + AssertActivityData(useRoutingKeyAsOperationName, queueName, activities, true); + } + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsync(bool useRoutingKeyAsOperationName) + { + await _channel.ConfirmSelectAsync(); + + RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; + var activities = new List(); + using (ActivityListener activityListener = StartActivityListener(activities)) + { + await Task.Delay(500); + + string queueName = $"{Guid.NewGuid()}"; + QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); + byte[] sendBody = Encoding.UTF8.GetBytes("hi"); + byte[] consumeBody = null; + var consumer = new EventingBasicConsumer(_channel); + var consumerReceivedTcs = + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + consumer.Received += (o, a) => + { + consumeBody = a.Body.ToArray(); + consumerReceivedTcs.SetResult(true); + }; + + string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); + var publicationAddress = new PublicationAddress(ExchangeType.Direct, "", q.QueueName); + await _channel.BasicPublishAsync(publicationAddress, new BasicProperties(), sendBody); + await _channel.WaitForConfirmsOrDieAsync(); + + await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.True(await consumerReceivedTcs.Task); + + await _channel.BasicCancelAsync(consumerTag); + await Task.Delay(500); + AssertActivityData(useRoutingKeyAsOperationName, queueName, activities, true); + } + } + [Theory] [InlineData(true)] [InlineData(false)] @@ -187,6 +348,80 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera } } + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task TestPublisherWithCachedStringsAndBasicGetActivityTags(bool useRoutingKeyAsOperationName) + { + await _channel.ConfirmSelectAsync(); + RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; + var activities = new List(); + using (ActivityListener activityListener = StartActivityListener(activities)) + { + await Task.Delay(500); + string queue = $"queue-{Guid.NewGuid()}"; + const string msg = "for basic.get"; + + try + { + CachedString exchange = new CachedString(""); + CachedString routingKey = new CachedString(queue); + await _channel.QueueDeclareAsync(queue, false, false, false, null); + await _channel.BasicPublishAsync(exchange, routingKey, Encoding.UTF8.GetBytes(msg), mandatory: true); + await _channel.WaitForConfirmsOrDieAsync(); + QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue); + Assert.Equal(1u, ok.MessageCount); + BasicGetResult res = await _channel.BasicGetAsync(queue, true); + Assert.Equal(msg, Encoding.UTF8.GetString(res.Body.ToArray())); + ok = await _channel.QueueDeclarePassiveAsync(queue); + Assert.Equal(0u, ok.MessageCount); + await Task.Delay(500); + AssertActivityData(useRoutingKeyAsOperationName, queue, activities, false); + } + finally + { + await _channel.QueueDeleteAsync(queue); + } + } + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task TestPublisherWithPublicationAddressAndBasicGetActivityTags(bool useRoutingKeyAsOperationName) + { + await _channel.ConfirmSelectAsync(); + RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; + var activities = new List(); + using (ActivityListener activityListener = StartActivityListener(activities)) + { + await Task.Delay(500); + string queue = $"queue-{Guid.NewGuid()}"; + const string msg = "for basic.get"; + + try + { + var publicationAddress = new PublicationAddress(ExchangeType.Direct, "", queue); + await _channel.QueueDeclareAsync(queue, false, false, false, null); + await _channel.BasicPublishAsync(publicationAddress, new BasicProperties(), + Encoding.UTF8.GetBytes(msg)); + await _channel.WaitForConfirmsOrDieAsync(); + QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue); + Assert.Equal(1u, ok.MessageCount); + BasicGetResult res = await _channel.BasicGetAsync(queue, true); + Assert.Equal(msg, Encoding.UTF8.GetString(res.Body.ToArray())); + ok = await _channel.QueueDeclarePassiveAsync(queue); + Assert.Equal(0u, ok.MessageCount); + await Task.Delay(500); + AssertActivityData(useRoutingKeyAsOperationName, queue, activities, false); + } + finally + { + await _channel.QueueDeleteAsync(queue); + } + } + } + private static ActivityListener StartActivityListener(List activities) { ActivityListener activityListener = new ActivityListener(); From ed4cf97558db094813e10719b6c3babda765273b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stef=C3=A1n=20J=C3=B6kull=20Sigur=C3=B0arson?= Date: Tue, 2 Apr 2024 16:07:08 +0000 Subject: [PATCH 2/9] * Reverting breaking changes for existing API methods * Adding OTel tests for all publish methods --- .../RabbitMQ.Client/PublicAPI.Unshipped.txt | 2 +- .../client/api/IChannelExtensions.cs | 85 ++----------- .../client/impl/ChannelBase.cs | 71 ++++++++++- projects/Test/OTel/TestOpenTelemetry.cs | 115 ++++++++++++++++++ 4 files changed, 193 insertions(+), 80 deletions(-) diff --git a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt index cb0c882910..c032e110ac 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt @@ -848,7 +848,7 @@ static RabbitMQ.Client.Events.CallbackExceptionEventArgs.Build(System.Exception static RabbitMQ.Client.ExchangeType.All() -> System.Collections.Generic.ICollection static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false) -> System.Threading.Tasks.ValueTask static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel channel, string exchange, string routingKey, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false) -> System.Threading.Tasks.ValueTask -static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.PublicationAddress! addr, T basicProperties, System.ReadOnlyMemory body) -> System.Threading.Tasks.ValueTask +static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.PublicationAddress! addr, in T basicProperties, System.ReadOnlyMemory body) -> System.Threading.Tasks.ValueTask static RabbitMQ.Client.PublicationAddress.Parse(string uriLikeString) -> RabbitMQ.Client.PublicationAddress static RabbitMQ.Client.PublicationAddress.TryParse(string uriLikeString, out RabbitMQ.Client.PublicationAddress result) -> bool static RabbitMQ.Client.QueueDeclareOk.implicit operator string(RabbitMQ.Client.QueueDeclareOk declareOk) -> string diff --git a/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs b/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs index 3c636f1b57..2791119dee 100644 --- a/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs +++ b/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs @@ -87,58 +87,20 @@ public static Task BasicConsumeAsync(this IChannel channel, string queue /// /// The publication occurs with mandatory=false and immediate=false. /// - public static async ValueTask BasicPublishAsync(this IChannel channel, PublicationAddress addr, T basicProperties, ReadOnlyMemory body) + public static ValueTask BasicPublishAsync(this IChannel channel, PublicationAddress addr, in T basicProperties, + ReadOnlyMemory body) where T : IReadOnlyBasicProperties, IAmqpHeader { - using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners - ? RabbitMQActivitySource.Send(addr.RoutingKey, addr.ExchangeName, body.Length) - : default; - - if (sendActivity != null) - { - BasicProperties props = PopulateActivityAndPropagateTraceId(EmptyBasicProperty.Empty, sendActivity); - await channel.BasicPublishAsync(addr.ExchangeName, addr.RoutingKey, props, body); - } - else - { - await channel.BasicPublishAsync(addr.ExchangeName, addr.RoutingKey, basicProperties, body); - } - } - - public static async ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey, ReadOnlyMemory body = default, bool mandatory = false) - { - using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners - ? RabbitMQActivitySource.Send(routingKey, exchange, body.Length) - : default; - - if (sendActivity != null) - { - BasicProperties props = PopulateActivityAndPropagateTraceId(EmptyBasicProperty.Empty, sendActivity); - await channel.BasicPublishAsync(exchange, routingKey, props, body, mandatory); - } - else - { - await channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory); - } + return channel.BasicPublishAsync(addr.ExchangeName, addr.RoutingKey, basicProperties, body); } - public static async ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange, - CachedString routingKey, ReadOnlyMemory body = default, bool mandatory = false) - { - using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners - ? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length) - : default; + public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey, + ReadOnlyMemory body = default, bool mandatory = false) => + channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory); - if (sendActivity != null) - { - BasicProperties props = PopulateActivityAndPropagateTraceId(EmptyBasicProperty.Empty, sendActivity); - await channel.BasicPublishAsync(exchange, routingKey, props, body, mandatory); - } - else - { - await channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory); - } - } + public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange, + CachedString routingKey, ReadOnlyMemory body = default, bool mandatory = false) => + channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory); #nullable disable @@ -227,34 +189,5 @@ public static Task CloseAsync(this IChannel channel, ushort replyCode, string re { return channel.CloseAsync(replyCode, replyText, false); } - - private static BasicProperties PopulateActivityAndPropagateTraceId(TProperties basicProperties, - Activity sendActivity) where TProperties : IReadOnlyBasicProperties, IAmqpHeader - { - // This activity is marked as recorded, so let's propagate the trace and span ids. - if (sendActivity.IsAllDataRequested) - { - if (!string.IsNullOrEmpty(basicProperties.CorrelationId)) - { - sendActivity.SetTag(RabbitMQActivitySource.MessageConversationId, basicProperties.CorrelationId); - } - } - - BasicProperties props = default; - if (basicProperties is BasicProperties properties) - { - props = properties; - } - else if (basicProperties is EmptyBasicProperty) - { - props = new BasicProperties(); - } - - var headers = props.Headers ?? new Dictionary(); - // Inject the ActivityContext into the message headers to propagate trace context to the receiving service. - RabbitMQActivitySource.ContextInjector(sendActivity, headers); - props.Headers = headers; - return props; - } } } diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index 3382cd1afe..d571cc59b5 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -1036,7 +1036,19 @@ public async ValueTask BasicPublishAsync(string exchange, string ro try { var cmd = new BasicPublish(exchange, routingKey, mandatory, default); - await ModelSendAsync(in cmd, in basicProperties, body, CancellationToken.None); + using Activity sendActivity = RabbitMQActivitySource.PublisherHasListeners + ? RabbitMQActivitySource.Send(routingKey, exchange, body.Length) + : default; + + if (sendActivity != null) + { + BasicProperties props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity); + await ModelSendAsync(in cmd, in props, body, CancellationToken.None); + } + else + { + await ModelSendAsync(in cmd, in basicProperties, body, CancellationToken.None); + } } catch { @@ -1068,7 +1080,19 @@ public async void BasicPublish(CachedString exchange, CachedString try { var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default); - await ModelSendAsync(in cmd, in basicProperties, body, CancellationToken.None); + using Activity sendActivity = RabbitMQActivitySource.PublisherHasListeners + ? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length) + : default; + + if (sendActivity != null) + { + BasicProperties props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity); + await ModelSendAsync(in cmd, in props, body, CancellationToken.None); + } + else + { + await ModelSendAsync(in cmd, in basicProperties, body, CancellationToken.None); + } } catch { @@ -1100,7 +1124,19 @@ public async ValueTask BasicPublishAsync(CachedString exchange, Cac try { var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default); - await ModelSendAsync(in cmd, in basicProperties, body, CancellationToken.None); + using Activity sendActivity = RabbitMQActivitySource.PublisherHasListeners + ? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length) + : default; + + if (sendActivity != null) + { + BasicProperties props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity); + await ModelSendAsync(in cmd, in props, body, CancellationToken.None); + } + else + { + await ModelSendAsync(in cmd, in basicProperties, body, CancellationToken.None); + } } catch { @@ -1783,5 +1819,34 @@ await CloseAsync(ea, false) throw; } } + + private static BasicProperties PopulateActivityAndPropagateTraceId(TProperties basicProperties, + Activity sendActivity) where TProperties : IReadOnlyBasicProperties, IAmqpHeader + { + // This activity is marked as recorded, so let's propagate the trace and span ids. + if (sendActivity.IsAllDataRequested) + { + if (!string.IsNullOrEmpty(basicProperties.CorrelationId)) + { + sendActivity.SetTag(RabbitMQActivitySource.MessageConversationId, basicProperties.CorrelationId); + } + } + + BasicProperties props = default; + if (basicProperties is BasicProperties properties) + { + props = properties; + } + else if (basicProperties is EmptyBasicProperty) + { + props = new BasicProperties(); + } + + var headers = props.Headers ?? new Dictionary(); + // Inject the ActivityContext into the message headers to propagate trace context to the receiving service. + RabbitMQActivitySource.ContextInjector(sendActivity, headers); + props.Headers = headers; + return props; + } } } diff --git a/projects/Test/OTel/TestOpenTelemetry.cs b/projects/Test/OTel/TestOpenTelemetry.cs index 25c489dc85..7aea18d103 100644 --- a/projects/Test/OTel/TestOpenTelemetry.cs +++ b/projects/Test/OTel/TestOpenTelemetry.cs @@ -189,6 +189,121 @@ public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAs AssertActivityData(useRoutingKeyAsOperationName, queueName, exportedItems, true); } } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsync(bool useRoutingKeyAsOperationName) + { + var exportedItems = new List(); + using (var tracer = Sdk.CreateTracerProviderBuilder() + .AddRabbitMQ(new RabbitMQOpenTelemetryConfiguration()) + .AddInMemoryExporter(exportedItems) + .Build()) + { + string baggageGuid = Guid.NewGuid().ToString(); + Baggage.SetBaggage("TestItem", baggageGuid); + Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem")); + await _channel.ConfirmSelectAsync(); + + RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; + await Task.Delay(500); + + string queueName = $"{Guid.NewGuid()}"; + QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); + byte[] sendBody = Encoding.UTF8.GetBytes("hi"); + byte[] consumeBody = null; + var consumer = new EventingBasicConsumer(_channel); + var consumerReceivedTcs = + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + consumer.Received += (o, a) => + { + consumeBody = a.Body.ToArray(); + string baggageItem = Baggage.GetBaggage("TestItem"); + if (baggageItem == baggageGuid) + { + consumerReceivedTcs.SetResult(true); + } + else + { + consumerReceivedTcs.SetException( + EqualException.ForMismatchedStrings(baggageGuid, baggageItem, 0, 0)); + } + }; + + string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); + var publicationAddress = new PublicationAddress(ExchangeType.Direct, "", queueName); + await _channel.BasicPublishAsync(publicationAddress, new BasicProperties(), sendBody); + await _channel.WaitForConfirmsOrDieAsync(); + Baggage.ClearBaggage(); + Assert.Null(Baggage.GetBaggage("TestItem")); + + await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.True(await consumerReceivedTcs.Task); + + await _channel.BasicCancelAsync(consumerTag); + await Task.Delay(500); + AssertActivityData(useRoutingKeyAsOperationName, queueName, exportedItems, true); + } + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(bool useRoutingKeyAsOperationName) + { + var exportedItems = new List(); + using (var tracer = Sdk.CreateTracerProviderBuilder() + .AddRabbitMQ(new RabbitMQOpenTelemetryConfiguration()) + .AddInMemoryExporter(exportedItems) + .Build()) + { + string baggageGuid = Guid.NewGuid().ToString(); + Baggage.SetBaggage("TestItem", baggageGuid); + Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem")); + await _channel.ConfirmSelectAsync(); + + RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; + await Task.Delay(500); + + string queueName = $"{Guid.NewGuid()}"; + QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); + byte[] sendBody = Encoding.UTF8.GetBytes("hi"); + byte[] consumeBody = null; + var consumer = new EventingBasicConsumer(_channel); + var consumerReceivedTcs = + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + consumer.Received += (o, a) => + { + consumeBody = a.Body.ToArray(); + string baggageItem = Baggage.GetBaggage("TestItem"); + if (baggageItem == baggageGuid) + { + consumerReceivedTcs.SetResult(true); + } + else + { + consumerReceivedTcs.SetException( + EqualException.ForMismatchedStrings(baggageGuid, baggageItem, 0, 0)); + } + }; + + string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); + CachedString exchange = new CachedString(""); + CachedString routingKey = new CachedString(queueName); + await _channel.BasicPublishAsync(exchange, routingKey, sendBody); + await _channel.WaitForConfirmsOrDieAsync(); + Baggage.ClearBaggage(); + Assert.Null(Baggage.GetBaggage("TestItem")); + + await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.True(await consumerReceivedTcs.Task); + + await _channel.BasicCancelAsync(consumerTag); + await Task.Delay(500); + AssertActivityData(useRoutingKeyAsOperationName, queueName, exportedItems, true); + } + } [Theory] [InlineData(true)] From e2634d68130581cf7ee9fc66fa40ee33b50acd66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stef=C3=A1n=20J=C3=B6kull=20Sigur=C3=B0arson?= Date: Tue, 2 Apr 2024 16:08:01 +0000 Subject: [PATCH 3/9] Fixing API contract --- projects/RabbitMQ.Client/PublicAPI.Unshipped.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt index c032e110ac..330c0fb1a2 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt @@ -848,7 +848,7 @@ static RabbitMQ.Client.Events.CallbackExceptionEventArgs.Build(System.Exception static RabbitMQ.Client.ExchangeType.All() -> System.Collections.Generic.ICollection static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false) -> System.Threading.Tasks.ValueTask static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel channel, string exchange, string routingKey, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false) -> System.Threading.Tasks.ValueTask -static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.PublicationAddress! addr, in T basicProperties, System.ReadOnlyMemory body) -> System.Threading.Tasks.ValueTask +static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.PublicationAddress addr, in T basicProperties, System.ReadOnlyMemory body) -> System.Threading.Tasks.ValueTask static RabbitMQ.Client.PublicationAddress.Parse(string uriLikeString) -> RabbitMQ.Client.PublicationAddress static RabbitMQ.Client.PublicationAddress.TryParse(string uriLikeString, out RabbitMQ.Client.PublicationAddress result) -> bool static RabbitMQ.Client.QueueDeclareOk.implicit operator string(RabbitMQ.Client.QueueDeclareOk declareOk) -> string From 83e088f98d37889c942ac88bfa44cbaf574d6409 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stef=C3=A1n=20J=C3=B6kull=20Sigur=C3=B0arson?= Date: Tue, 2 Apr 2024 16:10:53 +0000 Subject: [PATCH 4/9] dotnet format --- projects/RabbitMQ.Client/client/impl/ChannelBase.cs | 2 +- projects/Test/OTel/TestOpenTelemetry.cs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index d571cc59b5..515b658f38 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -1819,7 +1819,7 @@ await CloseAsync(ea, false) throw; } } - + private static BasicProperties PopulateActivityAndPropagateTraceId(TProperties basicProperties, Activity sendActivity) where TProperties : IReadOnlyBasicProperties, IAmqpHeader { diff --git a/projects/Test/OTel/TestOpenTelemetry.cs b/projects/Test/OTel/TestOpenTelemetry.cs index 7aea18d103..2dd66374a0 100644 --- a/projects/Test/OTel/TestOpenTelemetry.cs +++ b/projects/Test/OTel/TestOpenTelemetry.cs @@ -189,7 +189,7 @@ public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAs AssertActivityData(useRoutingKeyAsOperationName, queueName, exportedItems, true); } } - + [Theory] [InlineData(true)] [InlineData(false)] @@ -246,7 +246,7 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsyn AssertActivityData(useRoutingKeyAsOperationName, queueName, exportedItems, true); } } - + [Theory] [InlineData(true)] [InlineData(false)] From 7c182411a7a7dcbb7a512cd78ee3d33443978daa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stef=C3=A1n=20J=C3=B6kull=20Sigur=C3=B0arson?= Date: Wed, 3 Apr 2024 09:02:53 +0000 Subject: [PATCH 5/9] * Removed the BaggagePropagation configuration value. * Use the DefaultTextMapPropagator and make it possible for uses to configure their own. * Updated tests to configure the propagator. * Updating README.md with basic instructions for use --- .../RabbitMQ.Client.OpenTelemetry/README.md | 55 ++++++++++++++++++- .../RabbitMQOpenTelemetryConfiguration.cs | 17 ++++-- .../TraceProviderBuilderExtensions.cs | 26 +++------ projects/Test/OTel/TestOpenTelemetry.cs | 15 +++-- 4 files changed, 85 insertions(+), 28 deletions(-) diff --git a/projects/RabbitMQ.Client.OpenTelemetry/README.md b/projects/RabbitMQ.Client.OpenTelemetry/README.md index dbcdcbe7bc..8c697bbdbc 100644 --- a/projects/RabbitMQ.Client.OpenTelemetry/README.md +++ b/projects/RabbitMQ.Client.OpenTelemetry/README.md @@ -1 +1,54 @@ -# RabbitMQ .NET Client - OAuth2 +# RabbitMQ .NET Client - OpenTelemetry Instrumentation + +## Introduction +This library makes it easy to instrument your RabbitMQ .NET Client applications with OpenTelemetry. + +## Examples +The following examples demonstrate how to use the RabbitMQ .NET Client OpenTelemetry Instrumentation. + +### Basic Usage + +#### ASP.NET Core Configuration Example +```csharp +using OpenTelemetry.Trace; + +// Configure the OpenTelemetry SDK to trace ASP.NET Core, the RabbitMQ .NET Client and export the traces to the console. +// Also configures context propagation to propagate the TraceContext and Baggage using the W3C specification. + +var compositeTextMapPropagator = new CompositeTextMapPropagator(new TextMapPropagator[] +{ + new TraceContextPropagator(), + new BaggagePropagator() +}); + +Sdk.SetDefaultTextMapPropagator(compositeTextMapPropagator); + +builder.Services.AddOpenTelemetry() + .ConfigureResource(resource => resource + .AddService(serviceName: builder.Environment.ApplicationName)) + .WithTracing(tracing => tracing + .AddAspNetCoreInstrumentation() + .AddRabbitMQInstrumentation() + .AddConsoleExporter()); +``` + +#### Console Application Configuration Example +```csharp +using OpenTelemetry.Trace; + +// Configure the OpenTelemetry SDK to trace ASP.NET Core, the RabbitMQ .NET Client and export the traces to the console. +// Also configures context propagation to propagate the TraceContext and Baggage using the W3C specification. + +var compositeTextMapPropagator = new CompositeTextMapPropagator(new TextMapPropagator[] +{ + new TraceContextPropagator(), + new BaggagePropagator() +}); + +Sdk.SetDefaultTextMapPropagator(compositeTextMapPropagator); + +var tracerProvider = Sdk.CreateTracerProviderBuilder() + .AddRabbitMQInstrumentation() + .AddConsoleExporter() + .Build(); +``` diff --git a/projects/RabbitMQ.Client.OpenTelemetry/RabbitMQOpenTelemetryConfiguration.cs b/projects/RabbitMQ.Client.OpenTelemetry/RabbitMQOpenTelemetryConfiguration.cs index 7748444f27..c3c1776c16 100644 --- a/projects/RabbitMQ.Client.OpenTelemetry/RabbitMQOpenTelemetryConfiguration.cs +++ b/projects/RabbitMQ.Client.OpenTelemetry/RabbitMQOpenTelemetryConfiguration.cs @@ -2,9 +2,18 @@ namespace RabbitMQ.Client.OpenTelemetry { public class RabbitMQOpenTelemetryConfiguration { - public bool PropagateBaggage { get; set; } = true; - public bool UseRoutingKeyAsOperationName { get; set; } = true; - public bool IncludePublishers { get; set; } = true; - public bool IncludeSubscribers { get; set; } = true; + public RabbitMQOpenTelemetryConfiguration(bool useRoutingKeyAsOperationName = true, + bool includePublishers = true, + bool includeSubscribers = true) + { + UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; + IncludePublishers = includePublishers; + IncludeSubscribers = includeSubscribers; + } + + public bool UseRoutingKeyAsOperationName { get; } + public bool IncludePublishers { get; } + public bool IncludeSubscribers { get; } + public static RabbitMQOpenTelemetryConfiguration Default { get; } = new RabbitMQOpenTelemetryConfiguration(); } } diff --git a/projects/RabbitMQ.Client.OpenTelemetry/TraceProviderBuilderExtensions.cs b/projects/RabbitMQ.Client.OpenTelemetry/TraceProviderBuilderExtensions.cs index 14e5c489dd..916bbbc950 100644 --- a/projects/RabbitMQ.Client.OpenTelemetry/TraceProviderBuilderExtensions.cs +++ b/projects/RabbitMQ.Client.OpenTelemetry/TraceProviderBuilderExtensions.cs @@ -11,23 +11,14 @@ namespace OpenTelemetry.Trace { public static class OpenTelemetryExtensions { - internal static TextMapPropagator s_propagator = Propagators.DefaultTextMapPropagator; - - public static TracerProviderBuilder AddRabbitMQ(this TracerProviderBuilder builder, - RabbitMQOpenTelemetryConfiguration configuration) + public static TracerProviderBuilder AddRabbitMQInstrumentation(this TracerProviderBuilder builder, + RabbitMQOpenTelemetryConfiguration configuration = null) { - if (configuration.PropagateBaggage) + if (configuration == null) { - s_propagator = new CompositeTextMapPropagator(new TextMapPropagator[] - { - new TraceContextPropagator(), new BaggagePropagator() - }); + configuration = RabbitMQOpenTelemetryConfiguration.Default; } - else - { - s_propagator = new TraceContextPropagator(); - } - + RabbitMQActivitySource.UseRoutingKeyAsOperationName = configuration.UseRoutingKeyAsOperationName; RabbitMQActivitySource.ContextExtractor = OpenTelemetryContextExtractor; RabbitMQActivitySource.ContextInjector = OpenTelemetryContextInjector; @@ -48,7 +39,7 @@ public static TracerProviderBuilder AddRabbitMQ(this TracerProviderBuilder build private static ActivityContext OpenTelemetryContextExtractor(IReadOnlyBasicProperties props) { // Extract the PropagationContext of the upstream parent from the message headers. - var parentContext = s_propagator.Extract(default, props.Headers, OpenTelemetryContextGetter); + var parentContext = Propagators.DefaultTextMapPropagator.Extract(default, props.Headers, OpenTelemetryContextGetter); Baggage.Current = parentContext.Baggage; return parentContext.ActivityContext; } @@ -57,9 +48,8 @@ private static IEnumerable OpenTelemetryContextGetter(IDictionary OpenTelemetryContextGetter(IDictionary props) { // Inject the current Activity's context into the message headers. - s_propagator.Inject(new PropagationContext(activity.Context, Baggage.Current), props, OpenTelemetryContextSetter); + Propagators.DefaultTextMapPropagator.Inject(new PropagationContext(activity.Context, Baggage.Current), props, OpenTelemetryContextSetter); } private static void OpenTelemetryContextSetter(IDictionary carrier, string key, string value) diff --git a/projects/Test/OTel/TestOpenTelemetry.cs b/projects/Test/OTel/TestOpenTelemetry.cs index 2dd66374a0..6a943a74f3 100644 --- a/projects/Test/OTel/TestOpenTelemetry.cs +++ b/projects/Test/OTel/TestOpenTelemetry.cs @@ -36,6 +36,7 @@ using System.Text; using System.Threading.Tasks; using OpenTelemetry; +using OpenTelemetry.Context.Propagation; using OpenTelemetry.Trace; using RabbitMQ.Client; using RabbitMQ.Client.Events; @@ -51,6 +52,10 @@ public class TestOpenTelemetry : SequentialIntegrationFixture { public TestOpenTelemetry(ITestOutputHelper output) : base(output) { + Sdk.SetDefaultTextMapPropagator(new CompositeTextMapPropagator(new TextMapPropagator[] + { + new TraceContextPropagator(), new BaggagePropagator() + })); } void AssertStringTagEquals(Activity activity, string name, string expected) @@ -86,7 +91,7 @@ public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOpera { var exportedItems = new List(); using (var tracer = Sdk.CreateTracerProviderBuilder() - .AddRabbitMQ(new RabbitMQOpenTelemetryConfiguration()) + .AddRabbitMQInstrumentation(new RabbitMQOpenTelemetryConfiguration()) .AddInMemoryExporter(exportedItems) .Build()) { @@ -141,7 +146,7 @@ public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAs { var exportedItems = new List(); using (var tracer = Sdk.CreateTracerProviderBuilder() - .AddRabbitMQ(new RabbitMQOpenTelemetryConfiguration()) + .AddRabbitMQInstrumentation(new RabbitMQOpenTelemetryConfiguration()) .AddInMemoryExporter(exportedItems) .Build()) { @@ -197,7 +202,7 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsyn { var exportedItems = new List(); using (var tracer = Sdk.CreateTracerProviderBuilder() - .AddRabbitMQ(new RabbitMQOpenTelemetryConfiguration()) + .AddRabbitMQInstrumentation(new RabbitMQOpenTelemetryConfiguration()) .AddInMemoryExporter(exportedItems) .Build()) { @@ -254,7 +259,7 @@ public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(boo { var exportedItems = new List(); using (var tracer = Sdk.CreateTracerProviderBuilder() - .AddRabbitMQ(new RabbitMQOpenTelemetryConfiguration()) + .AddRabbitMQInstrumentation(new RabbitMQOpenTelemetryConfiguration()) .AddInMemoryExporter(exportedItems) .Build()) { @@ -312,7 +317,7 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera { var exportedItems = new List(); using (var tracer = Sdk.CreateTracerProviderBuilder() - .AddRabbitMQ(new RabbitMQOpenTelemetryConfiguration()) + .AddRabbitMQInstrumentation(new RabbitMQOpenTelemetryConfiguration()) .AddInMemoryExporter(exportedItems) .Build()) { From d9a7ff6403257509d434bee0ca50e0787c3105b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stef=C3=A1n=20J=C3=B6kull=20Sigur=C3=B0arson?= Date: Wed, 3 Apr 2024 09:03:35 +0000 Subject: [PATCH 6/9] dotnet format --- .../TraceProviderBuilderExtensions.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/projects/RabbitMQ.Client.OpenTelemetry/TraceProviderBuilderExtensions.cs b/projects/RabbitMQ.Client.OpenTelemetry/TraceProviderBuilderExtensions.cs index 916bbbc950..5ec0e70dc1 100644 --- a/projects/RabbitMQ.Client.OpenTelemetry/TraceProviderBuilderExtensions.cs +++ b/projects/RabbitMQ.Client.OpenTelemetry/TraceProviderBuilderExtensions.cs @@ -18,7 +18,7 @@ public static TracerProviderBuilder AddRabbitMQInstrumentation(this TracerProvid { configuration = RabbitMQOpenTelemetryConfiguration.Default; } - + RabbitMQActivitySource.UseRoutingKeyAsOperationName = configuration.UseRoutingKeyAsOperationName; RabbitMQActivitySource.ContextExtractor = OpenTelemetryContextExtractor; RabbitMQActivitySource.ContextInjector = OpenTelemetryContextInjector; @@ -39,7 +39,7 @@ public static TracerProviderBuilder AddRabbitMQInstrumentation(this TracerProvid private static ActivityContext OpenTelemetryContextExtractor(IReadOnlyBasicProperties props) { // Extract the PropagationContext of the upstream parent from the message headers. - var parentContext = Propagators.DefaultTextMapPropagator.Extract(default, props.Headers, OpenTelemetryContextGetter); + var parentContext = Propagators.DefaultTextMapPropagator.Extract(default, props.Headers, OpenTelemetryContextGetter); Baggage.Current = parentContext.Baggage; return parentContext.ActivityContext; } From 50ad87131eb30b8ec96686a5d7b9f325f4e3bac1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stef=C3=A1n=20J=C3=B6kull=20Sigur=C3=B0arson?= Date: Wed, 3 Apr 2024 09:12:06 +0000 Subject: [PATCH 7/9] Moved the OTel tests to the SequentialIntegration project to simplify project structure. --- RabbitMQDotNetClient.sln | 7 -- .../RabbitMQ.Client/RabbitMQ.Client.csproj | 1 - projects/Test/OTel/OTel.csproj | 32 -------- .../Test/OTel/SequentialIntegrationFixture.cs | 82 ------------------- .../SequentialIntegration.csproj | 3 +- .../TestOpenTelemetry.cs | 3 +- 6 files changed, 3 insertions(+), 125 deletions(-) delete mode 100644 projects/Test/OTel/OTel.csproj delete mode 100644 projects/Test/OTel/SequentialIntegrationFixture.cs rename projects/Test/{OTel => SequentialIntegration}/TestOpenTelemetry.cs (99%) diff --git a/RabbitMQDotNetClient.sln b/RabbitMQDotNetClient.sln index 88b7dc06ff..8974d4a9ce 100644 --- a/RabbitMQDotNetClient.sln +++ b/RabbitMQDotNetClient.sln @@ -42,8 +42,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ToxiproxyNetCore", "project EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RabbitMQ.Client.OpenTelemetry", "projects\RabbitMQ.Client.OpenTelemetry\RabbitMQ.Client.OpenTelemetry.csproj", "{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OTel", "projects\Test\OTel\OTel.csproj", "{33E86EAF-C269-4336-8E5C-71418AE360A2}" -EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -98,10 +96,6 @@ Global {16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Debug|Any CPU.Build.0 = Debug|Any CPU {16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Release|Any CPU.ActiveCfg = Release|Any CPU {16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Release|Any CPU.Build.0 = Release|Any CPU - {33E86EAF-C269-4336-8E5C-71418AE360A2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {33E86EAF-C269-4336-8E5C-71418AE360A2}.Debug|Any CPU.Build.0 = Debug|Any CPU - {33E86EAF-C269-4336-8E5C-71418AE360A2}.Release|Any CPU.ActiveCfg = Release|Any CPU - {33E86EAF-C269-4336-8E5C-71418AE360A2}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -116,7 +110,6 @@ Global {F25725D7-2978-45F4-B90F-25D6F8B71C9E} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704} {C11F25F4-7EA1-4874-9E25-DEB42E3A7C67} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704} {AB5B7C53-D7EC-4985-A6DE-70178E4B688A} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704} - {33E86EAF-C269-4336-8E5C-71418AE360A2} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {3C6A0C44-FA63-4101-BBF9-2598641167D1} diff --git a/projects/RabbitMQ.Client/RabbitMQ.Client.csproj b/projects/RabbitMQ.Client/RabbitMQ.Client.csproj index 817ceb40e3..56cceae989 100644 --- a/projects/RabbitMQ.Client/RabbitMQ.Client.csproj +++ b/projects/RabbitMQ.Client/RabbitMQ.Client.csproj @@ -55,7 +55,6 @@ - diff --git a/projects/Test/OTel/OTel.csproj b/projects/Test/OTel/OTel.csproj deleted file mode 100644 index 59450b21ef..0000000000 --- a/projects/Test/OTel/OTel.csproj +++ /dev/null @@ -1,32 +0,0 @@ - - - - net6.0;net472 - - - - net6.0 - - - - ../../rabbit.snk - true - true - 7.3 - OpenTelemetry.Tests - - - - - - - - - - - - - - - - diff --git a/projects/Test/OTel/SequentialIntegrationFixture.cs b/projects/Test/OTel/SequentialIntegrationFixture.cs deleted file mode 100644 index 92fd4daac6..0000000000 --- a/projects/Test/OTel/SequentialIntegrationFixture.cs +++ /dev/null @@ -1,82 +0,0 @@ -// This source code is dual-licensed under the Apache License, version -// 2.0, and the Mozilla Public License, version 2.0. -// -// The APL v2.0: -// -//--------------------------------------------------------------------------- -// Copyright (c) 2007-2020 VMware, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//--------------------------------------------------------------------------- -// -// The MPL v2.0: -// -//--------------------------------------------------------------------------- -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at https://mozilla.org/MPL/2.0/. -// -// Copyright (c) 2007-2020 VMware, Inc. All rights reserved. -//--------------------------------------------------------------------------- - -using System; -using System.Threading.Tasks; -using RabbitMQ.Client; -using Xunit.Abstractions; - -namespace Test.OpenTelemetry.SequentialIntegration -{ - public class SequentialIntegrationFixture : TestConnectionRecoveryBase - { - public SequentialIntegrationFixture(ITestOutputHelper output) : base(output) - { - } - - public async Task RestartRabbitMqAsync() - { - await StopRabbitMqAsync(); - await Task.Delay(TimeSpan.FromSeconds(1)); - await StartRabbitMqAsync(); - await AwaitRabbitMqAsync(); - } - - public Task StopRabbitMqAsync() - { - return _rabbitMQCtl.ExecRabbitMQCtlAsync("stop_app"); - } - - public Task StartRabbitMqAsync() - { - return _rabbitMQCtl.ExecRabbitMQCtlAsync("start_app"); - } - - public Task RestartServerAndWaitForRecoveryAsync() - { - return RestartServerAndWaitForRecoveryAsync(_conn); - } - - public async Task RestartServerAndWaitForRecoveryAsync(IConnection conn) - { - TaskCompletionSource sl = PrepareForShutdown(conn); - TaskCompletionSource rl = PrepareForRecovery(conn); - await RestartRabbitMqAsync(); - await WaitAsync(sl, "connection shutdown"); - await WaitAsync(rl, "connection recovery"); - } - - private Task AwaitRabbitMqAsync() - { - return _rabbitMQCtl.ExecRabbitMQCtlAsync("await_startup"); - } - } -} diff --git a/projects/Test/SequentialIntegration/SequentialIntegration.csproj b/projects/Test/SequentialIntegration/SequentialIntegration.csproj index cea571fc19..28b5a1450f 100644 --- a/projects/Test/SequentialIntegration/SequentialIntegration.csproj +++ b/projects/Test/SequentialIntegration/SequentialIntegration.csproj @@ -16,7 +16,7 @@ - + @@ -36,6 +36,7 @@ + diff --git a/projects/Test/OTel/TestOpenTelemetry.cs b/projects/Test/SequentialIntegration/TestOpenTelemetry.cs similarity index 99% rename from projects/Test/OTel/TestOpenTelemetry.cs rename to projects/Test/SequentialIntegration/TestOpenTelemetry.cs index 6a943a74f3..c1a1a8fc9b 100644 --- a/projects/Test/OTel/TestOpenTelemetry.cs +++ b/projects/Test/SequentialIntegration/TestOpenTelemetry.cs @@ -41,12 +41,11 @@ using RabbitMQ.Client; using RabbitMQ.Client.Events; using RabbitMQ.Client.OpenTelemetry; -using Test.OpenTelemetry.SequentialIntegration; using Xunit; using Xunit.Abstractions; using Xunit.Sdk; -namespace Test.OpenTelemetry +namespace Test.SequentialIntegration { public class TestOpenTelemetry : SequentialIntegrationFixture { From 4da85c89977866ed8860b8df6f6fbf9e9388d1e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stef=C3=A1n=20J=C3=B6kull=20Sigur=C3=B0arson?= Date: Thu, 4 Apr 2024 10:15:52 +0000 Subject: [PATCH 8/9] Simplifying configuration --- .../RabbitMQOpenTelemetryConfiguration.cs | 19 --------------- .../TraceProviderBuilderExtensions.cs | 23 +++---------------- .../TestOpenTelemetry.cs | 11 ++++----- 3 files changed, 8 insertions(+), 45 deletions(-) delete mode 100644 projects/RabbitMQ.Client.OpenTelemetry/RabbitMQOpenTelemetryConfiguration.cs diff --git a/projects/RabbitMQ.Client.OpenTelemetry/RabbitMQOpenTelemetryConfiguration.cs b/projects/RabbitMQ.Client.OpenTelemetry/RabbitMQOpenTelemetryConfiguration.cs deleted file mode 100644 index c3c1776c16..0000000000 --- a/projects/RabbitMQ.Client.OpenTelemetry/RabbitMQOpenTelemetryConfiguration.cs +++ /dev/null @@ -1,19 +0,0 @@ -namespace RabbitMQ.Client.OpenTelemetry -{ - public class RabbitMQOpenTelemetryConfiguration - { - public RabbitMQOpenTelemetryConfiguration(bool useRoutingKeyAsOperationName = true, - bool includePublishers = true, - bool includeSubscribers = true) - { - UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; - IncludePublishers = includePublishers; - IncludeSubscribers = includeSubscribers; - } - - public bool UseRoutingKeyAsOperationName { get; } - public bool IncludePublishers { get; } - public bool IncludeSubscribers { get; } - public static RabbitMQOpenTelemetryConfiguration Default { get; } = new RabbitMQOpenTelemetryConfiguration(); - } -} diff --git a/projects/RabbitMQ.Client.OpenTelemetry/TraceProviderBuilderExtensions.cs b/projects/RabbitMQ.Client.OpenTelemetry/TraceProviderBuilderExtensions.cs index 5ec0e70dc1..0cbefbba10 100644 --- a/projects/RabbitMQ.Client.OpenTelemetry/TraceProviderBuilderExtensions.cs +++ b/projects/RabbitMQ.Client.OpenTelemetry/TraceProviderBuilderExtensions.cs @@ -5,34 +5,17 @@ using System.Text; using OpenTelemetry.Context.Propagation; using RabbitMQ.Client; -using RabbitMQ.Client.OpenTelemetry; namespace OpenTelemetry.Trace { public static class OpenTelemetryExtensions { - public static TracerProviderBuilder AddRabbitMQInstrumentation(this TracerProviderBuilder builder, - RabbitMQOpenTelemetryConfiguration configuration = null) + public static TracerProviderBuilder AddRabbitMQInstrumentation(this TracerProviderBuilder builder) { - if (configuration == null) - { - configuration = RabbitMQOpenTelemetryConfiguration.Default; - } - - RabbitMQActivitySource.UseRoutingKeyAsOperationName = configuration.UseRoutingKeyAsOperationName; + RabbitMQActivitySource.UseRoutingKeyAsOperationName = true; RabbitMQActivitySource.ContextExtractor = OpenTelemetryContextExtractor; RabbitMQActivitySource.ContextInjector = OpenTelemetryContextInjector; - - if (configuration.IncludeSubscribers) - { - builder.AddSource(RabbitMQActivitySource.SubscriberSourceName); - } - - if (configuration.IncludePublishers) - { - builder.AddSource(RabbitMQActivitySource.PublisherSourceName); - } - + builder.AddSource("RabbitMQ.Client.*"); return builder; } diff --git a/projects/Test/SequentialIntegration/TestOpenTelemetry.cs b/projects/Test/SequentialIntegration/TestOpenTelemetry.cs index c1a1a8fc9b..e113b31ba5 100644 --- a/projects/Test/SequentialIntegration/TestOpenTelemetry.cs +++ b/projects/Test/SequentialIntegration/TestOpenTelemetry.cs @@ -40,7 +40,6 @@ using OpenTelemetry.Trace; using RabbitMQ.Client; using RabbitMQ.Client.Events; -using RabbitMQ.Client.OpenTelemetry; using Xunit; using Xunit.Abstractions; using Xunit.Sdk; @@ -90,7 +89,7 @@ public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOpera { var exportedItems = new List(); using (var tracer = Sdk.CreateTracerProviderBuilder() - .AddRabbitMQInstrumentation(new RabbitMQOpenTelemetryConfiguration()) + .AddRabbitMQInstrumentation() .AddInMemoryExporter(exportedItems) .Build()) { @@ -145,7 +144,7 @@ public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAs { var exportedItems = new List(); using (var tracer = Sdk.CreateTracerProviderBuilder() - .AddRabbitMQInstrumentation(new RabbitMQOpenTelemetryConfiguration()) + .AddRabbitMQInstrumentation() .AddInMemoryExporter(exportedItems) .Build()) { @@ -201,7 +200,7 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsyn { var exportedItems = new List(); using (var tracer = Sdk.CreateTracerProviderBuilder() - .AddRabbitMQInstrumentation(new RabbitMQOpenTelemetryConfiguration()) + .AddRabbitMQInstrumentation() .AddInMemoryExporter(exportedItems) .Build()) { @@ -258,7 +257,7 @@ public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(boo { var exportedItems = new List(); using (var tracer = Sdk.CreateTracerProviderBuilder() - .AddRabbitMQInstrumentation(new RabbitMQOpenTelemetryConfiguration()) + .AddRabbitMQInstrumentation() .AddInMemoryExporter(exportedItems) .Build()) { @@ -316,7 +315,7 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera { var exportedItems = new List(); using (var tracer = Sdk.CreateTracerProviderBuilder() - .AddRabbitMQInstrumentation(new RabbitMQOpenTelemetryConfiguration()) + .AddRabbitMQInstrumentation() .AddInMemoryExporter(exportedItems) .Build()) { From 7b937ab4eb6790cc86ff36548cf87d9dcc2532b9 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 16 May 2024 08:18:20 -0700 Subject: [PATCH 9/9] Update package description --- .../RabbitMQ.Client.OpenTelemetry.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/projects/RabbitMQ.Client.OpenTelemetry/RabbitMQ.Client.OpenTelemetry.csproj b/projects/RabbitMQ.Client.OpenTelemetry/RabbitMQ.Client.OpenTelemetry.csproj index 834c361c18..f3f0c07150 100644 --- a/projects/RabbitMQ.Client.OpenTelemetry/RabbitMQ.Client.OpenTelemetry.csproj +++ b/projects/RabbitMQ.Client.OpenTelemetry/RabbitMQ.Client.OpenTelemetry.csproj @@ -8,7 +8,7 @@ VMware VMware, Inc. or its affiliates. Copyright © 2007-2023 VMware, Inc. or its affiliates. - The RabbitMQ OAuth2 Client Library for .NET enables OAuth2 token refresh for RabbitMQ.Client + The RabbitMQ OpenTelemetry Library for .NET adds convenience extension methods for RabbitMQ/OpenTelemetry true icon.png Apache-2.0 OR MPL-2.0