Skip to content

Commit

Permalink
feat: branching with fluent method handles many checkpoints (#84)
Browse files Browse the repository at this point in the history
- support interception of more than a single checkpoint
- test: assert that we can create branches within a branch
  • Loading branch information
Seddryck authored Sep 16, 2024
1 parent 2770793 commit c70e1c2
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 1 deletion.
3 changes: 2 additions & 1 deletion Streamistry.Core/Fluent/ParserBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@
using Streamistry.Pipes.Parsers;

namespace Streamistry.Fluent;

public class ParserBuilder<TInput, TOutput> : PipeElementBuilder<TInput, TOutput>
{
protected IFormatProvider? FormatProvider { get; set; }
protected ParserDelegate<TInput, TOutput> ParseFunction { get; }

public ParserBuilder(IPipeBuilder<TInput> upstream, ParserDelegate<TInput, TOutput> parseFunction)
: base(upstream)
=> ParseFunction = parseFunction;
=> (ParseFunction) = (parseFunction);

public ParserBuilder<TInput, TOutput> WithFormatProvider(IFormatProvider formatProvider)
{
Expand Down
10 changes: 10 additions & 0 deletions Streamistry.Core/Testability/PipeTestingExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,16 @@ public static bool[] EmitAndAnyOutputs<TInput>(this IProcessablePipe<TInput> inp
return [.. outputs];
}

public static (T1?[], T2?[]) GetMultipleOutputs<T1, T2>(this Action action, IChainablePort<T1> output1, IChainablePort<T2> output2)
{
var outputs1 = new List<T1?>();
output1.RegisterDownstream(outputs1.Add);
var outputs2 = new List<T2?>();
output2.RegisterDownstream(outputs2.Add);
action.Invoke();
return ([.. outputs1], [.. outputs2]);
}

public static void RegisterDownstreamIfPossible(this IChainablePipe pipe, Action<object?> action)
{
// Get the type of the pipe object
Expand Down
19 changes: 19 additions & 0 deletions Streamistry.SourceGenerator/BranchesBuilder.scriban
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,23 @@

public CombinatorBuilder<{{ generics | array.join ", " }}, TOutput> Zip<TOutput>(Func<{{ generics | array.join "?, " }}?, TOutput> function)
=> new(this, function);

public BranchesBuilder<TInput, {{ generics | array.join ", " }}> Checkpoints(out IChainablePort[] ports)
{
ports = BuildPipeElement();
return this;
}

public BranchesBuilder<TInput, {{ generics | array.join ", " }}> Checkpoints(
{{-
func concat; ret string.append "out IChainablePort<T" $0 | string.append "> port" | string.append $0; end
indexes | array.each @concat | array.join ", "
-}})
{
var ports = BuildPipeElement();
{{~ for type in generics ~}}
port{{ for.index + 1 }} = ports[{{ for.index}}] as IChainablePort<{{type}}> ?? throw new InvalidCastException();
{{~ end ~}}
return this;
}
}
139 changes: 139 additions & 0 deletions Streamistry.Testing/Fluent/PipelineBuilderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,145 @@ public void Build_CombineThreeUpstreamsCheckpoint_Success()
Assert.That(output, Does.Contain("on 16 September 2025"));
}

[Test]
public void Build_InBranchCheckpoint_Success()
{
var pipeline = new PipelineBuilder<string>()
.Source(["2024-09-14", "2024-09-15", "2024-45-78"])
.Parse()
.AsDate()
.Branch(
day => day.Map(x => x.AddDays(1)).Pluck(x => x.Day)
, month => month.Map(x => x.ToString("MMMM", CultureInfo.InvariantCulture))
).Checkpoints(out var ports)
.Build();

Assert.That(pipeline, Is.Not.Null);
Assert.That(ports, Is.Not.Null);
Assert.That(ports, Has.Length.EqualTo(2));

var outputMonth = ((IChainablePort<string>)ports[1]).GetOutputs(pipeline.Start);
Assert.That(outputMonth, Does.Contain("September"));
}

[Test]
public void Build_InBranchCheckpointForAllPorts_Success()
{
var pipeline = new PipelineBuilder<string>()
.Source(["2024-09-14", "2024-09-15", "2024-45-78"])
.Parse()
.AsDate()
.Branch(
day => day.Map(x => x.AddDays(1)).Pluck(x => x.Day)
, month => month.Map(x => x.ToString("MMMM", CultureInfo.InvariantCulture))
).Checkpoints(out var ports)
.Build();

Assert.That(pipeline, Is.Not.Null);
Assert.That(ports, Is.Not.Null);
Assert.That(ports, Has.Length.EqualTo(2));

var outputMonth = ((IChainablePort<string>)ports[1]).GetOutputs(pipeline.Start);
Assert.That(outputMonth, Does.Contain("September"));
}


[Test]
public void Build_InBranchCheckpointWithDiscardedPorts_Success()
{
var pipeline = new PipelineBuilder<string>()
.Source(["2024-09-14", "2024-09-15", "2024-45-78"])
.Parse()
.AsDate()
.Branch(
day => day.Map(x => x.AddDays(1)).Pluck(x => x.Day)
, month => month.Map(x => x.ToString("MMMM", CultureInfo.InvariantCulture))
, year => year.Pluck(x => x.Year).Map(x => x + 1)
).Checkpoints(out var _, out var monthPort, out var _)
.Build();

Assert.That(pipeline, Is.Not.Null);
Assert.That(monthPort, Is.Not.Null);

var outputMonth = monthPort.GetOutputs(pipeline.Start);
Assert.That(outputMonth, Does.Contain("September"));
}

[Test]
public void Build_InBranchCheckpointForAllPortsAllAsserted_Success()
{
var pipeline = new PipelineBuilder<string>()
.Source(["2024-09-14", "2024-09-15", "2024-45-78"])
.Parse()
.AsDate()
.Branch(
day => day.Map(x => x.AddDays(1)).Pluck(x => x.Day)
, month => month.Map(x => x.ToString("MMMM", CultureInfo.InvariantCulture))
).Checkpoints(out var ports)
.Build();

Assert.That(pipeline, Is.Not.Null);
Assert.That(ports, Is.Not.Null);
Assert.That(ports, Has.Length.EqualTo(2));

var action = pipeline.Start;
var (outputDay, outputMonth) = action.GetMultipleOutputs((IChainablePort<int>)ports[0], (IChainablePort<string>)ports[1]);
Assert.That(outputDay, Does.Contain(15));
Assert.That(outputDay, Does.Contain(16));
Assert.That(outputMonth, Does.Contain("September"));
}

[Test]
public void Build_InBranchCheckpointForAllPortsTypedAllAsserted_Success()
{
var pipeline = new PipelineBuilder<string>()
.Source(["2024-09-14", "2024-09-15", "2024-45-78"])
.Parse()
.AsDate()
.Branch(
day => day.Map(x => x.AddDays(1)).Pluck(x => x.Day)
, month => month.Map(x => x.ToString("MMMM", CultureInfo.InvariantCulture))
).Checkpoints(out var portDay, out var portMonth)
.Build();

Assert.That(pipeline, Is.Not.Null);
Assert.That(portDay, Is.Not.Null);
Assert.That(portMonth, Is.Not.Null);

var action = pipeline.Start;
var (outputDay, outputMonth) = action.GetMultipleOutputs(portDay, portMonth);
Assert.That(outputDay, Does.Contain(15));
Assert.That(outputDay, Does.Contain(16));
Assert.That(outputMonth, Does.Contain("September"));
}

[Test]
public void Build_InBranchOfBranchCheckpointForAllPortsTypedAllAsserted_Success()
{
var pipeline = new PipelineBuilder<string>()
.Source(["2024-09-14", "2024-09-15", "2024-45-78"])
.Parse()
.AsDate()
.Branch(
day => day.Map(x => x.AddDays(1)).Pluck(x => x.Day).Branch(
day1 => day1.Map(x => x + 1)
, day2 => day2.Map(x => x + 2)
).Zip((x,y)=> x + y)
, month => month.Map(x => x.ToString("MMMM", CultureInfo.InvariantCulture))
).Checkpoints(out var portDay, out var portMonth)
.Build();

Assert.That(pipeline, Is.Not.Null);
Assert.That(portDay, Is.Not.Null);
Assert.That(portMonth, Is.Not.Null);

var action = pipeline.Start;
var (outputDay, outputMonth) = action.GetMultipleOutputs(portDay, portMonth);
Assert.That(outputDay, Does.Contain(33));
Assert.That(outputDay, Does.Contain(35));
Assert.That(outputMonth, Does.Contain("September"));
}

[Test]
public void Build_CombineFiveUpstreamsCheckpoint_Success()
{
Expand Down

0 comments on commit c70e1c2

Please sign in to comment.