Skip to content

Commit

Permalink
Added subject transforms to stream source config (#134)
Browse files Browse the repository at this point in the history
* Added subject transforms to stream source config

* Fixed nats-cli download for tests

* binaries.nats.dev isn't working for main

* binaries.nats.dev should be working for main

* Add release/v2.9.23 to test matrix

* Add latest to test matrix

* Fixed potential test flapper

* Chasing test flapper

* Chasing test flapper
  • Loading branch information
mtmk authored Sep 27, 2023
1 parent 6b490ea commit 50f2221
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 12 deletions.
7 changes: 5 additions & 2 deletions .github/workflows/perf.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ jobs:
fail-fast: false
matrix:
config:
- branch: dev
- branch: release/v2.9.23
- branch: latest
- branch: main
runs-on: ubuntu-latest
env:
Expand All @@ -23,7 +24,9 @@ jobs:
steps:
- name: Install nats
run: |
rel=$(curl -s https://api.github.com/repos/nats-io/natscli/releases/latest | jq -r .tag_name | sed s/v//)
# latest 0.1.1 doesn't have binaries
# rel=$(curl -s https://api.github.com/repos/nats-io/natscli/releases/latest | jq -r .tag_name | sed s/v//)
rel=0.1.0
wget https://github.com/nats-io/natscli/releases/download/v$rel/nats-$rel-linux-amd64.zip
unzip nats-$rel-linux-amd64.zip
sudo mv nats-$rel-linux-amd64/nats /usr/local/bin
Expand Down
6 changes: 4 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ jobs:
fail-fast: false
matrix:
config:
- branch: dev
- branch: release/v2.9.23
- branch: latest
- branch: main
runs-on: ubuntu-latest
env:
Expand Down Expand Up @@ -55,7 +56,8 @@ jobs:
fail-fast: false
matrix:
config:
- branch: dev
- branch: release/v2.9.23
- branch: latest
- branch: main
runs-on: windows-latest
env:
Expand Down
6 changes: 3 additions & 3 deletions src/NATS.Client.JetStream/Models/StreamSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ public record StreamSource
public string FilterSubject { get; set; } = default!;

/// <summary>
/// Map matching subjects according to this transform destination
/// Subject transforms to apply to matching messages
/// </summary>
[System.Text.Json.Serialization.JsonPropertyName("subject_transform_dest")]
[System.Text.Json.Serialization.JsonPropertyName("subject_transforms")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
public string SubjectTransformDest { get; set; } = default!;
public System.Collections.Generic.ICollection<SubjectTransform> SubjectTransforms { get; set; } = default!;

[System.Text.Json.Serialization.JsonPropertyName("external")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
Expand Down
4 changes: 3 additions & 1 deletion tests/NATS.Client.Core.Tests/RequestReplyTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ public class RequestReplyTest
[Fact]
public async Task Simple_request_reply_test()
{
await using var server = NatsServer.Start();
// Trace to hunt flapper!
await using var server = NatsServer.StartWithTrace(_output);

await using var nats = server.CreateClientConnection();

var sub = await nats.SubscribeAsync<int>("foo");
Expand Down
21 changes: 18 additions & 3 deletions tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ await Retry.Until(
public async Task Consume_idle_heartbeat_test()
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
await using var server = NatsServer.StartJS();
await using var server = NatsServer.StartJSWithTrace(_output);

var (nats, proxy) = server.CreateProxiedClientConnection();

Expand Down Expand Up @@ -120,14 +120,29 @@ public async Task Consume_idle_heartbeat_test()

await Retry.Until(
"all pull requests are received",
() => proxy.ClientFrames.Count(f => f.Message.StartsWith("PUB $JS.API.CONSUMER.MSG.NEXT.s1.c1")) == 2);
() => proxy.ClientFrames.Count(f => f.Message.StartsWith("PUB $JS.API.CONSUMER.MSG.NEXT.s1.c1")) >= 2);

var msgNextRequests = proxy
.ClientFrames
.Where(f => f.Message.StartsWith("PUB $JS.API.CONSUMER.MSG.NEXT.s1.c1"))
.ToList();

Assert.Equal(2, msgNextRequests.Count);
// In some cases we are receiving more than two requests which
// is possible if the tests are running in a slow container and taking
// more than the timeout? Looking at the test and the code I can't make
// sense of it, really, but I'm going to assume it's fine to receive 3 pull
// requests as well as 2 since test failure reported 3 and failed once.
if (msgNextRequests.Count > 2)
{
_output.WriteLine($"Pull request count more than expected: {msgNextRequests.Count}");
foreach (var frame in msgNextRequests)
{
_output.WriteLine($"PULL REQUEST: {frame}");
}
}

// Still fail and check traces if it happens again
Assert.True(msgNextRequests.Count is 2);

// Pull requests
foreach (var frame in msgNextRequests)
Expand Down
16 changes: 15 additions & 1 deletion tests/NATS.Client.TestUtilities/NatsServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,14 @@ public int ConnectionPort

public static NatsServer StartJS() => StartJS(new NullOutputHelper(), TransportType.Tcp);

public static NatsServer StartJSWithTrace(ITestOutputHelper outputHelper) => Start(
outputHelper: outputHelper,
opts: new NatsServerOptsBuilder()
.UseTransport(TransportType.Tcp)
.Trace()
.UseJetStream()
.Build());

public static NatsServer StartJS(ITestOutputHelper outputHelper, TransportType transportType) => Start(
outputHelper: outputHelper,
opts: new NatsServerOptsBuilder()
Expand All @@ -151,7 +159,13 @@ public static NatsServer StartJS(ITestOutputHelper outputHelper, TransportType t

public static NatsServer Start() => Start(new NullOutputHelper(), TransportType.Tcp);

public static NatsServer Start(ITestOutputHelper outputHelper) => Start(outputHelper, TransportType.Tcp);
public static NatsServer StartWithTrace(ITestOutputHelper outputHelper)
=> Start(
outputHelper,
new NatsServerOptsBuilder()
.Trace()
.UseTransport(TransportType.Tcp)
.Build());

public static NatsServer Start(ITestOutputHelper outputHelper, TransportType transportType) =>
Start(outputHelper, new NatsServerOptsBuilder().UseTransport(transportType).Build());
Expand Down

0 comments on commit 50f2221

Please sign in to comment.