diff --git a/Streamistry.Core/Fluent/ParserBuilder.cs b/Streamistry.Core/Fluent/ParserBuilder.cs index 0f2b992..a3838ac 100644 --- a/Streamistry.Core/Fluent/ParserBuilder.cs +++ b/Streamistry.Core/Fluent/ParserBuilder.cs @@ -7,6 +7,7 @@ using Streamistry.Pipes.Parsers; namespace Streamistry.Fluent; + public class ParserBuilder : PipeElementBuilder { protected IFormatProvider? FormatProvider { get; set; } @@ -14,7 +15,7 @@ public class ParserBuilder : PipeElementBuilder upstream, ParserDelegate parseFunction) : base(upstream) - => ParseFunction = parseFunction; + => (ParseFunction) = (parseFunction); public ParserBuilder WithFormatProvider(IFormatProvider formatProvider) { diff --git a/Streamistry.Core/Testability/PipeTestingExtensions.cs b/Streamistry.Core/Testability/PipeTestingExtensions.cs index c12bfbf..4358139 100644 --- a/Streamistry.Core/Testability/PipeTestingExtensions.cs +++ b/Streamistry.Core/Testability/PipeTestingExtensions.cs @@ -120,6 +120,16 @@ public static bool[] EmitAndAnyOutputs(this IProcessablePipe inp return [.. outputs]; } + public static (T1?[], T2?[]) GetMultipleOutputs(this Action action, IChainablePort output1, IChainablePort output2) + { + var outputs1 = new List(); + output1.RegisterDownstream(outputs1.Add); + var outputs2 = new List(); + output2.RegisterDownstream(outputs2.Add); + action.Invoke(); + return ([.. outputs1], [.. outputs2]); + } + public static void RegisterDownstreamIfPossible(this IChainablePipe pipe, Action action) { // Get the type of the pipe object diff --git a/Streamistry.SourceGenerator/BranchesBuilder.scriban b/Streamistry.SourceGenerator/BranchesBuilder.scriban index 10e26eb..7596cf6 100644 --- a/Streamistry.SourceGenerator/BranchesBuilder.scriban +++ b/Streamistry.SourceGenerator/BranchesBuilder.scriban @@ -29,4 +29,23 @@ public CombinatorBuilder<{{ generics | array.join ", " }}, TOutput> Zip(Func<{{ generics | array.join "?, " }}?, TOutput> function) => new(this, function); + + public BranchesBuilder Checkpoints(out IChainablePort[] ports) + { + ports = BuildPipeElement(); + return this; + } + + public BranchesBuilder Checkpoints( + {{- + func concat; ret string.append "out IChainablePort port" | string.append $0; end + indexes | array.each @concat | array.join ", " + -}}) + { + var ports = BuildPipeElement(); + {{~ for type in generics ~}} + port{{ for.index + 1 }} = ports[{{ for.index}}] as IChainablePort<{{type}}> ?? throw new InvalidCastException(); + {{~ end ~}} + return this; + } } diff --git a/Streamistry.Testing/Fluent/PipelineBuilderTests.cs b/Streamistry.Testing/Fluent/PipelineBuilderTests.cs index ab0b40c..f6433e8 100644 --- a/Streamistry.Testing/Fluent/PipelineBuilderTests.cs +++ b/Streamistry.Testing/Fluent/PipelineBuilderTests.cs @@ -395,6 +395,145 @@ public void Build_CombineThreeUpstreamsCheckpoint_Success() Assert.That(output, Does.Contain("on 16 September 2025")); } + [Test] + public void Build_InBranchCheckpoint_Success() + { + var pipeline = new PipelineBuilder() + .Source(["2024-09-14", "2024-09-15", "2024-45-78"]) + .Parse() + .AsDate() + .Branch( + day => day.Map(x => x.AddDays(1)).Pluck(x => x.Day) + , month => month.Map(x => x.ToString("MMMM", CultureInfo.InvariantCulture)) + ).Checkpoints(out var ports) + .Build(); + + Assert.That(pipeline, Is.Not.Null); + Assert.That(ports, Is.Not.Null); + Assert.That(ports, Has.Length.EqualTo(2)); + + var outputMonth = ((IChainablePort)ports[1]).GetOutputs(pipeline.Start); + Assert.That(outputMonth, Does.Contain("September")); + } + + [Test] + public void Build_InBranchCheckpointForAllPorts_Success() + { + var pipeline = new PipelineBuilder() + .Source(["2024-09-14", "2024-09-15", "2024-45-78"]) + .Parse() + .AsDate() + .Branch( + day => day.Map(x => x.AddDays(1)).Pluck(x => x.Day) + , month => month.Map(x => x.ToString("MMMM", CultureInfo.InvariantCulture)) + ).Checkpoints(out var ports) + .Build(); + + Assert.That(pipeline, Is.Not.Null); + Assert.That(ports, Is.Not.Null); + Assert.That(ports, Has.Length.EqualTo(2)); + + var outputMonth = ((IChainablePort)ports[1]).GetOutputs(pipeline.Start); + Assert.That(outputMonth, Does.Contain("September")); + } + + + [Test] + public void Build_InBranchCheckpointWithDiscardedPorts_Success() + { + var pipeline = new PipelineBuilder() + .Source(["2024-09-14", "2024-09-15", "2024-45-78"]) + .Parse() + .AsDate() + .Branch( + day => day.Map(x => x.AddDays(1)).Pluck(x => x.Day) + , month => month.Map(x => x.ToString("MMMM", CultureInfo.InvariantCulture)) + , year => year.Pluck(x => x.Year).Map(x => x + 1) + ).Checkpoints(out var _, out var monthPort, out var _) + .Build(); + + Assert.That(pipeline, Is.Not.Null); + Assert.That(monthPort, Is.Not.Null); + + var outputMonth = monthPort.GetOutputs(pipeline.Start); + Assert.That(outputMonth, Does.Contain("September")); + } + + [Test] + public void Build_InBranchCheckpointForAllPortsAllAsserted_Success() + { + var pipeline = new PipelineBuilder() + .Source(["2024-09-14", "2024-09-15", "2024-45-78"]) + .Parse() + .AsDate() + .Branch( + day => day.Map(x => x.AddDays(1)).Pluck(x => x.Day) + , month => month.Map(x => x.ToString("MMMM", CultureInfo.InvariantCulture)) + ).Checkpoints(out var ports) + .Build(); + + Assert.That(pipeline, Is.Not.Null); + Assert.That(ports, Is.Not.Null); + Assert.That(ports, Has.Length.EqualTo(2)); + + var action = pipeline.Start; + var (outputDay, outputMonth) = action.GetMultipleOutputs((IChainablePort)ports[0], (IChainablePort)ports[1]); + Assert.That(outputDay, Does.Contain(15)); + Assert.That(outputDay, Does.Contain(16)); + Assert.That(outputMonth, Does.Contain("September")); + } + + [Test] + public void Build_InBranchCheckpointForAllPortsTypedAllAsserted_Success() + { + var pipeline = new PipelineBuilder() + .Source(["2024-09-14", "2024-09-15", "2024-45-78"]) + .Parse() + .AsDate() + .Branch( + day => day.Map(x => x.AddDays(1)).Pluck(x => x.Day) + , month => month.Map(x => x.ToString("MMMM", CultureInfo.InvariantCulture)) + ).Checkpoints(out var portDay, out var portMonth) + .Build(); + + Assert.That(pipeline, Is.Not.Null); + Assert.That(portDay, Is.Not.Null); + Assert.That(portMonth, Is.Not.Null); + + var action = pipeline.Start; + var (outputDay, outputMonth) = action.GetMultipleOutputs(portDay, portMonth); + Assert.That(outputDay, Does.Contain(15)); + Assert.That(outputDay, Does.Contain(16)); + Assert.That(outputMonth, Does.Contain("September")); + } + + [Test] + public void Build_InBranchOfBranchCheckpointForAllPortsTypedAllAsserted_Success() + { + var pipeline = new PipelineBuilder() + .Source(["2024-09-14", "2024-09-15", "2024-45-78"]) + .Parse() + .AsDate() + .Branch( + day => day.Map(x => x.AddDays(1)).Pluck(x => x.Day).Branch( + day1 => day1.Map(x => x + 1) + , day2 => day2.Map(x => x + 2) + ).Zip((x,y)=> x + y) + , month => month.Map(x => x.ToString("MMMM", CultureInfo.InvariantCulture)) + ).Checkpoints(out var portDay, out var portMonth) + .Build(); + + Assert.That(pipeline, Is.Not.Null); + Assert.That(portDay, Is.Not.Null); + Assert.That(portMonth, Is.Not.Null); + + var action = pipeline.Start; + var (outputDay, outputMonth) = action.GetMultipleOutputs(portDay, portMonth); + Assert.That(outputDay, Does.Contain(33)); + Assert.That(outputDay, Does.Contain(35)); + Assert.That(outputMonth, Does.Contain("September")); + } + [Test] public void Build_CombineFiveUpstreamsCheckpoint_Success() {