Skip to content

Commit

Permalink
Support for AMQP protocol over WebSockets
Browse files Browse the repository at this point in the history
  • Loading branch information
Havret committed Aug 27, 2023
1 parent 6b4a2ac commit 0b0f175
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 13 deletions.
1 change: 1 addition & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ jobs:
ARTEMIS_PASSWORD: "artemis"
ARTEMIS_HOST: "localhost"
ARTEMIS_PORT: 5672
ARTEMIS_WS_PORT: 80
steps:
- uses: actions/checkout@v1
- name: Setup .NET Core
Expand Down
3 changes: 2 additions & 1 deletion src/ArtemisNetClient/ArtemisNetClient.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="AMQPNetLite" Version="2.4.5" />
<PackageReference Include="AMQPNetLite" Version="2.4.6" />
<PackageReference Include="AMQPNetLite.WebSockets" Version="2.4.6" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="2.0.0" />
<PackageReference Include="Nito.AsyncEx.Coordination" Version="5.1.0" />
<PackageReference Include="Polly" Version="7.2.1" />
Expand Down
5 changes: 4 additions & 1 deletion src/ArtemisNetClient/Builders/ConnectionBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ public async Task<IConnection> CreateAsync(Endpoint endpoint, CancellationToken
cancellationToken.ThrowIfCancellationRequested();
using var _ = cancellationToken.Register(() => _tcs.TrySetCanceled());

var connectionFactory = new Amqp.ConnectionFactory();
var connectionFactory = endpoint.Scheme is Scheme.Ws or Scheme.Wss
? new Amqp.ConnectionFactory(new[] { new WebSocketTransportFactory() })
: new Amqp.ConnectionFactory();

try
{
var open = GetOpenFrame(endpoint);
Expand Down
16 changes: 13 additions & 3 deletions src/ArtemisNetClient/Configuration/Endpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ public sealed class Endpoint
{
private const string Amqp = "AMQP";
private const string Amqps = "AMQPS";
private const string Ws = "WS";
private const string Wss = "WSS";

private Endpoint(Address address)
{
Expand Down Expand Up @@ -41,13 +43,18 @@ private Endpoint(Address address)
/// </summary>
public string Password => Address.Password;

public static Endpoint Create(string host, int port, string user = null, string password = null, Scheme scheme = Scheme.Amqp)
/// <summary>
/// Gets the path of the endpoint.
/// </summary>
public string Path => Address.Path;

public static Endpoint Create(string host, int port, string user = null, string password = null, Scheme scheme = Scheme.Amqp, string path = "/")
{
var protocolScheme = GetScheme(scheme);

try
{
return new Endpoint(new Address(host, port, user, password, "/", protocolScheme))
return new Endpoint(new Address(host, port, user, password, path, protocolScheme))
{
Scheme = scheme
};
Expand All @@ -68,13 +75,16 @@ private static string GetScheme(Scheme scheme)
{
Scheme.Amqp => Amqp,
Scheme.Amqps => Amqps,
Scheme.Ws => Ws,
Scheme.Wss => Wss,
_ => throw new CreateEndpointException($"Protocol scheme {scheme.ToString()} is invalid.", ErrorCode.InvalidField)
};
}

/// <inheritdoc />
public override string ToString()
{
return $@"{Scheme.ToString().ToLower()}://{Host}:{Port.ToString()}";
return $@"{Scheme.ToString().ToLower()}://{Host}:{Port.ToString()}{Path}";
}
}
}
22 changes: 21 additions & 1 deletion src/ArtemisNetClient/Configuration/Scheme.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,28 @@
namespace ActiveMQ.Artemis.Client
{
/// <summary>
/// Represents the protocol schemes used for AMQP connections.
/// </summary>
public enum Scheme
{
/// <summary>
/// Represents the standard AMQP protocol without security.
/// </summary>
Amqp,
Amqps

/// <summary>
/// Represents the standard AMQP protocol secured with SSL/TLS.
/// </summary>
Amqps,

/// <summary>
/// Represents the AMQP protocol over WebSocket without security.
/// </summary>
Ws,

/// <summary>
/// Represents the AMQP protocol over WebSocket secured with SSL/TLS.
/// </summary>
Wss
}
}
38 changes: 38 additions & 0 deletions test/ArtemisNetClient.IntegrationTests/WebSocketsSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using System;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;

namespace ActiveMQ.Artemis.Client.IntegrationTests;

public class WebSocketsSpec : ActiveMQNetIntegrationSpec
{
public WebSocketsSpec(ITestOutputHelper output) : base(output)
{
}

[Fact]
public async Task Should_send_and_receive_message_with_web_socket_endpoint()
{
string userName = Environment.GetEnvironmentVariable("ARTEMIS_USERNAME") ?? "artemis";
string password = Environment.GetEnvironmentVariable("ARTEMIS_PASSWORD") ?? "artemis";
string host = Environment.GetEnvironmentVariable("ARTEMIS_HOST") ?? "localhost";
int port = int.Parse(Environment.GetEnvironmentVariable("ARTEMIS_WS_PORT") ?? "80");

var endpoint = Endpoint.Create(host: host, port: port, user: userName, password: password, Scheme.Ws);

var address = Guid.NewGuid().ToString();

var connectionFactory = new ConnectionFactory();
await using var connection = await connectionFactory.CreateAsync(endpoint, CancellationToken);
await using var consumer = await connection.CreateConsumerAsync(address, RoutingType.Anycast);
await using var producer = await connection.CreateProducerAsync(address, RoutingType.Anycast);

await producer.SendAsync(new Message("msg"));

var msg = await consumer.ReceiveAsync();
await consumer.AcceptAsync(msg);

Assert.Equal("msg", msg.GetBody<string>());
}
}
15 changes: 9 additions & 6 deletions test/ArtemisNetClient.UnitTests/EndpointSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,16 @@ public static IEnumerable<object[]> SchemaData()
return new[]
{
new object[] { Scheme.Amqp },
new object[] { Scheme.Amqps }
new object[] { Scheme.Amqps },
new object[] { Scheme.Ws },
new object[] { Scheme.Wss },
};
}

[Fact]
public void Throws_when_invalid_scheme_specified()
{
var exception = Assert.Throws<CreateEndpointException>(() => Endpoint.Create("localhost", 5672, "guest", "guest", (Scheme) 999));
var exception = Assert.Throws<CreateEndpointException>(() => Endpoint.Create("localhost", 5672, "guest", "guest", scheme: (Scheme) 999));
Assert.Equal(ErrorCode.InvalidField, exception.ErrorCode);
}

Expand All @@ -67,10 +69,11 @@ public static IEnumerable<object[]> EndpointData()
{
return new[]
{
new object[] { Endpoint.Create("localhost", 5762), "amqp://localhost:5762" },
new object[] { Endpoint.Create("localhost", 5762, scheme: Scheme.Amqps), "amqps://localhost:5762" },
new object[] { Endpoint.Create("localhost", 5762, "admin", password: "secret"), "amqp://localhost:5762" },
new object[] { Endpoint.Create("localhost", 5762, "admin", password: "secret", Scheme.Amqps), "amqps://localhost:5762" }
new object[] { Endpoint.Create("localhost", 5762), "amqp://localhost:5762/" },
new object[] { Endpoint.Create("localhost", 5762, scheme: Scheme.Amqps), "amqps://localhost:5762/" },
new object[] { Endpoint.Create("localhost", 5762, "admin", password: "secret"), "amqp://localhost:5762/" },
new object[] { Endpoint.Create("localhost", 5762, "admin", password: "secret", scheme: Scheme.Amqps), "amqps://localhost:5762/" },
new object[] { Endpoint.Create("localhost", 80, "admin", password: "secret", scheme: Scheme.Wss, path: "/redirectMeToBrokerA"), "wss://localhost:80/redirectMeToBrokerA" },
};
}
}
Expand Down
4 changes: 3 additions & 1 deletion test/artemis/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ EXPOSE 8161 \
# Port for MQTT
1883 \
#Port for STOMP
61613
61613 \
# Port for WS
80

ENTRYPOINT ["/artemis/amq/bin/artemis", "run"]
2 changes: 2 additions & 0 deletions test/artemis/broker.xml
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ under the License.
<!-- MQTT Acceptor -->
<acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>

<acceptor name="amqp-ws-acceptor">tcp://0.0.0.0:80?protocols=AMQP</acceptor>

</acceptors>


Expand Down
1 change: 1 addition & 0 deletions test/artemis/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ services:
ports:
- 5672:5672
- 8161:8161
- 80:80
volumes:
- ./broker.xml:/artemis/amq/etc/broker.xml

0 comments on commit 0b0f175

Please sign in to comment.