From 0f693738550df93a25acdfba8bf77738bef5345d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9dric=20L=2E=20Charlier?= Date: Mon, 16 Sep 2024 20:18:17 +0200 Subject: [PATCH] feat: add Branch and Zip methods to fluent interface (#81) --- Streamistry.Core/Fluent/AggregatorBuilder.cs | 16 ++--- Streamistry.Core/Fluent/BasePipeBuilder.cs | 11 ++- Streamistry.Core/Fluent/BranchesBuilder.cs | 27 +++++++ Streamistry.Core/Fluent/CombinatorBuilder.cs | 22 ++++++ Streamistry.Core/Fluent/FilterBuilder.cs | 6 +- Streamistry.Core/Fluent/IPipeBuilder.cs | 8 +-- Streamistry.Core/Fluent/MapperBuilder.cs | 6 +- Streamistry.Core/Fluent/ParserBuilder.cs | 18 ++--- Streamistry.Core/Fluent/PipeElementBuilder.cs | 4 +- Streamistry.Core/Fluent/PipelineBuilder.cs | 6 +- Streamistry.Core/Fluent/PluckerBuilder.cs | 6 +- Streamistry.Core/Fluent/SinkBuilder.cs | 4 +- Streamistry.Core/Fluent/SourceBuilder.cs | 4 +- Streamistry.Core/Fluent/SplitterBuilder.cs | 6 +- .../BasePipeBuilder.scriban | 9 +++ .../BasePipeBuilderSourceGenerator.cs | 31 ++++++++ .../BranchesBuilder.scriban | 32 +++++++++ .../BranchesBuilderSourceGenerator.cs | 31 ++++++++ .../CombinatorBuilder.scriban | 23 ++++++ .../CombinatorBuilderSourceGenerator.cs | 31 ++++++++ .../CombinatorSourceGenerator.cs | 5 +- Streamistry.SourceGenerator/Header.scriban | 3 +- .../Streamistry.SourceGenerator.csproj | 6 ++ .../ZipperSourceGenerator.cs | 4 +- .../Fluent/PipelineBuilderTests.cs | 72 ++++++++++++++++++- 25 files changed, 335 insertions(+), 56 deletions(-) create mode 100644 Streamistry.Core/Fluent/BranchesBuilder.cs create mode 100644 Streamistry.Core/Fluent/CombinatorBuilder.cs create mode 100644 Streamistry.SourceGenerator/BasePipeBuilder.scriban create mode 100644 Streamistry.SourceGenerator/BasePipeBuilderSourceGenerator.cs create mode 100644 Streamistry.SourceGenerator/BranchesBuilder.scriban create mode 100644 Streamistry.SourceGenerator/BranchesBuilderSourceGenerator.cs create mode 100644 Streamistry.SourceGenerator/CombinatorBuilder.scriban create mode 100644 Streamistry.SourceGenerator/CombinatorBuilderSourceGenerator.cs diff --git a/Streamistry.Core/Fluent/AggregatorBuilder.cs b/Streamistry.Core/Fluent/AggregatorBuilder.cs index a8f07b7..0865f7f 100644 --- a/Streamistry.Core/Fluent/AggregatorBuilder.cs +++ b/Streamistry.Core/Fluent/AggregatorBuilder.cs @@ -7,7 +7,7 @@ using Streamistry.Pipes.Aggregators; namespace Streamistry.Fluent; -internal class AggregatorBuilder +public class AggregatorBuilder { protected IPipeBuilder Upstream { get; } @@ -26,10 +26,9 @@ public SpecializedAggregatorBuilder AsSum() => new SpecializedAggregatorBuilder(Upstream, typeof(Sum<,>), [typeof(TInput), typeof(TOutput)]); public SpecializedAggregatorBuilder AsCount() => new SpecializedAggregatorBuilder(Upstream, typeof(Count<,>), [typeof(TInput), typeof(TOutput)]); - } -internal class SpecializedAggregatorBuilder : PipeElementBuilder +public class SpecializedAggregatorBuilder : PipeElementBuilder { protected Type Type { get; } protected Type[] GenericTypeParameters { get; } = [typeof(int)]; @@ -37,14 +36,14 @@ public SpecializedAggregatorBuilder(IPipeBuilder upstream, Type type, Ty : base(upstream) => (Type, GenericTypeParameters) = (type, genericTypeParameters); - public override IChainablePort OnBuildPort() + public override IChainablePort OnBuildPipeElement() { var t = Type.MakeGenericType(GenericTypeParameters); - return (IChainablePort)Activator.CreateInstance(t, Upstream.BuildPort(), null)!; + return (IChainablePort)Activator.CreateInstance(t, Upstream.BuildPipeElement(), null)!; } } -internal class UniversalAggregatorBuilder : PipeElementBuilder +public class UniversalAggregatorBuilder : PipeElementBuilder { protected Func? Accumulator { get; } protected Func? Selector { get; set; } = x => (TOutput?)Convert.ChangeType(x, typeof(TOutput)); @@ -66,12 +65,11 @@ public UniversalAggregatorBuilder WithSeed(TAccumu return this; } - public override IChainablePort OnBuildPort() + public override IChainablePort OnBuildPipeElement() => new Aggregator( - Upstream.BuildPort() + Upstream.BuildPipeElement() , Accumulator ?? throw new InvalidOperationException() , Selector ?? throw new InvalidOperationException() , Seed ); - } diff --git a/Streamistry.Core/Fluent/BasePipeBuilder.cs b/Streamistry.Core/Fluent/BasePipeBuilder.cs index 6e1c98c..4ce6edd 100644 --- a/Streamistry.Core/Fluent/BasePipeBuilder.cs +++ b/Streamistry.Core/Fluent/BasePipeBuilder.cs @@ -7,18 +7,18 @@ namespace Streamistry.Fluent; -internal abstract class BasePipeBuilder : IPipeBuilder +public abstract partial class BasePipeBuilder : IPipeBuilder { protected IChainablePort? Instance { get; set; } - public abstract IChainablePort OnBuildPort(); + public abstract IChainablePort OnBuildPipeElement(); - public IChainablePort BuildPort() - => Instance ??= OnBuildPort(); + public IChainablePort BuildPipeElement() + => Instance ??= OnBuildPipeElement(); public Pipeline Build() { - BuildPort(); + BuildPipeElement(); return Instance!.Pipe.Pipeline!; } @@ -47,5 +47,4 @@ public ParserBuilder Parse(ParserDelegate => new(this, parser); public ParserBuilder Parse() => new(this); - } diff --git a/Streamistry.Core/Fluent/BranchesBuilder.cs b/Streamistry.Core/Fluent/BranchesBuilder.cs new file mode 100644 index 0000000..fec4aec --- /dev/null +++ b/Streamistry.Core/Fluent/BranchesBuilder.cs @@ -0,0 +1,27 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Streamistry.Fluent; +public abstract class BranchesBuilder : IBuilder +{ + protected BasePipeBuilder Upstream { get; } + protected IChainablePort[]? Instances { get; set; } + + public BranchesBuilder(BasePipeBuilder upstream) + => (Upstream) = (upstream); + + public IChainablePort[] BuildPipeElement() + => Instances ??= OnBuildPipeElement(); + + public abstract IChainablePort[] OnBuildPipeElement(); + + public Pipeline Build() + { + BuildPipeElement(); + return Instances![0].Pipe.Pipeline!; + } +} diff --git a/Streamistry.Core/Fluent/CombinatorBuilder.cs b/Streamistry.Core/Fluent/CombinatorBuilder.cs new file mode 100644 index 0000000..fc31c51 --- /dev/null +++ b/Streamistry.Core/Fluent/CombinatorBuilder.cs @@ -0,0 +1,22 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Streamistry.Fluent; +public abstract class BaseCombinatorBuilder : BasePipeBuilder +{ + protected IBuilder Upstream { get; } + + public BaseCombinatorBuilder(IBuilder upstream) + : base() + => (Upstream) = (upstream); + + public BasePipeBuilder Checkpoint(out IChainablePort port) + { + port = BuildPipeElement(); + return this; + } +} diff --git a/Streamistry.Core/Fluent/FilterBuilder.cs b/Streamistry.Core/Fluent/FilterBuilder.cs index 5debb90..90651fa 100644 --- a/Streamistry.Core/Fluent/FilterBuilder.cs +++ b/Streamistry.Core/Fluent/FilterBuilder.cs @@ -5,7 +5,7 @@ using System.Threading.Tasks; namespace Streamistry.Fluent; -internal class FilterBuilder : PipeElementBuilder, IPipeBuilder +public class FilterBuilder : PipeElementBuilder, IPipeBuilder { protected Func? Function { get; } @@ -13,9 +13,9 @@ public FilterBuilder(IPipeBuilder upstream, Func? functio :base(upstream) => (Function) = (function); - public override IChainablePort OnBuildPort() + public override IChainablePort OnBuildPipeElement() => new Filter( - Upstream.BuildPort() + Upstream.BuildPipeElement() , Function ?? throw new InvalidOperationException() ); } diff --git a/Streamistry.Core/Fluent/IPipeBuilder.cs b/Streamistry.Core/Fluent/IPipeBuilder.cs index e2e54c9..4c958e0 100644 --- a/Streamistry.Core/Fluent/IPipeBuilder.cs +++ b/Streamistry.Core/Fluent/IPipeBuilder.cs @@ -5,11 +5,11 @@ using System.Threading.Tasks; namespace Streamistry.Fluent; -internal interface IPipeBuilder : IBuilder> +public interface IPipeBuilder : IBuilder> { } -internal interface IBuilder +public interface IBuilder { - T BuildPort(); - T OnBuildPort(); + T BuildPipeElement(); + T OnBuildPipeElement(); } diff --git a/Streamistry.Core/Fluent/MapperBuilder.cs b/Streamistry.Core/Fluent/MapperBuilder.cs index 9848b54..c4c4e3e 100644 --- a/Streamistry.Core/Fluent/MapperBuilder.cs +++ b/Streamistry.Core/Fluent/MapperBuilder.cs @@ -5,7 +5,7 @@ using System.Threading.Tasks; namespace Streamistry.Fluent; -internal class MapperBuilder : PipeElementBuilder +public class MapperBuilder : PipeElementBuilder { protected Func? Function { get; set; } @@ -13,9 +13,9 @@ public MapperBuilder(IPipeBuilder upstream, Func? fun : base(upstream) => (Function) = (function); - public override IChainablePort OnBuildPort() + public override IChainablePort OnBuildPipeElement() => new Mapper( - Upstream.BuildPort() + Upstream.BuildPipeElement() , Function ?? throw new InvalidOperationException() ); } diff --git a/Streamistry.Core/Fluent/ParserBuilder.cs b/Streamistry.Core/Fluent/ParserBuilder.cs index 28a20ab..0f2b992 100644 --- a/Streamistry.Core/Fluent/ParserBuilder.cs +++ b/Streamistry.Core/Fluent/ParserBuilder.cs @@ -7,7 +7,7 @@ using Streamistry.Pipes.Parsers; namespace Streamistry.Fluent; -internal class ParserBuilder : PipeElementBuilder +public class ParserBuilder : PipeElementBuilder { protected IFormatProvider? FormatProvider { get; set; } protected ParserDelegate ParseFunction { get; } @@ -22,11 +22,11 @@ public ParserBuilder WithFormatProvider(IFormatProvider formatP return this; } - public override IChainablePort OnBuildPort() - => new Parser(Upstream.BuildPort(), ParseFunction); + public override IChainablePort OnBuildPipeElement() + => new Parser(Upstream.BuildPipeElement(), ParseFunction); } -internal class ParserBuilder +public class ParserBuilder { protected IPipeBuilder Upstream { get; } protected IFormatProvider? FormatProvider { get; set; } @@ -46,7 +46,7 @@ public ParserBuilder WithFormatProvider(IFormatProvider formatProvider) } } -internal class SpecializedParserBuilder : PipeElementBuilder +public class SpecializedParserBuilder : PipeElementBuilder { protected Type Type { get; } protected IFormatProvider? FormatProvider { get; set; } @@ -55,9 +55,9 @@ public SpecializedParserBuilder(IPipeBuilder upstream, Type type, IForma : base(upstream) => (Type, FormatProvider) = (type, formatProvider); - public override IChainablePort OnBuildPort() + public override IChainablePort OnBuildPipeElement() { - return (IChainablePort)Activator.CreateInstance(Type, Upstream.BuildPort(), FormatProvider)!; + return (IChainablePort)Activator.CreateInstance(Type, Upstream.BuildPipeElement(), FormatProvider)!; } public SpecializedParserBuilder WithFormatProvider(IFormatProvider formatProvider) @@ -89,9 +89,9 @@ public SpecializedParserBuilder WithFormatProvider(IFormatProvi // return this; // } -// public override IChainablePort OnBuildPort() +// public override IChainablePort OnBuildPipeElement() // => new Parser( -// Upstream.BuildPort() +// Upstream.BuildPipeElement() // , Accumulator ?? throw new InvalidOperationException() // , Selector ?? throw new InvalidOperationException() // , Seed diff --git a/Streamistry.Core/Fluent/PipeElementBuilder.cs b/Streamistry.Core/Fluent/PipeElementBuilder.cs index 02e6565..23cb867 100644 --- a/Streamistry.Core/Fluent/PipeElementBuilder.cs +++ b/Streamistry.Core/Fluent/PipeElementBuilder.cs @@ -8,7 +8,7 @@ namespace Streamistry.Fluent; -internal abstract class PipeElementBuilder : BasePipeBuilder +public abstract class PipeElementBuilder : BasePipeBuilder { protected IPipeBuilder Upstream { get; } @@ -17,7 +17,7 @@ public PipeElementBuilder(IPipeBuilder upstream) public PipeElementBuilder Checkpoint(out IChainablePort port) { - port = BuildPort(); + port = BuildPipeElement(); return this; } } diff --git a/Streamistry.Core/Fluent/PipelineBuilder.cs b/Streamistry.Core/Fluent/PipelineBuilder.cs index 75583a0..3d5c8a0 100644 --- a/Streamistry.Core/Fluent/PipelineBuilder.cs +++ b/Streamistry.Core/Fluent/PipelineBuilder.cs @@ -12,9 +12,9 @@ internal class PipelineBuilder : IBuilder public SourceBuilder Source(IEnumerable enumeration) => new (this, enumeration); - public Pipeline BuildPort() - => Instance ??= OnBuildPort(); + public Pipeline BuildPipeElement() + => Instance ??= OnBuildPipeElement(); - public Pipeline OnBuildPort() + public Pipeline OnBuildPipeElement() => new Pipeline(); } diff --git a/Streamistry.Core/Fluent/PluckerBuilder.cs b/Streamistry.Core/Fluent/PluckerBuilder.cs index 897b9e8..19da13a 100644 --- a/Streamistry.Core/Fluent/PluckerBuilder.cs +++ b/Streamistry.Core/Fluent/PluckerBuilder.cs @@ -7,7 +7,7 @@ using Streamistry.Pipes.Mappers; namespace Streamistry.Fluent; -internal class PluckerBuilder : PipeElementBuilder +public class PluckerBuilder : PipeElementBuilder { protected Expression> Expr { get; set; } @@ -15,9 +15,9 @@ public PluckerBuilder(IPipeBuilder upstream, Expression (Expr) = (expr); - public override IChainablePort OnBuildPort() + public override IChainablePort OnBuildPipeElement() => new Plucker( - Upstream.BuildPort() + Upstream.BuildPipeElement() , Expr ?? throw new InvalidOperationException() ); } diff --git a/Streamistry.Core/Fluent/SinkBuilder.cs b/Streamistry.Core/Fluent/SinkBuilder.cs index 1181917..3251fd2 100644 --- a/Streamistry.Core/Fluent/SinkBuilder.cs +++ b/Streamistry.Core/Fluent/SinkBuilder.cs @@ -6,7 +6,7 @@ using Streamistry.Pipes.Sinks; namespace Streamistry.Fluent; -internal class SinkBuilder +public class SinkBuilder { protected IPipeBuilder Upstream { get; } private Type? SinkType { get; set; } @@ -22,7 +22,7 @@ public SinkBuilder InMemory() public Pipeline Build() { - var sink = (Sink)Activator.CreateInstance(SinkType ?? throw new InvalidOperationException(), [Upstream.BuildPort()])!; + var sink = (Sink)Activator.CreateInstance(SinkType ?? throw new InvalidOperationException(), [Upstream.BuildPipeElement()])!; return sink.Pipeline!; } } diff --git a/Streamistry.Core/Fluent/SourceBuilder.cs b/Streamistry.Core/Fluent/SourceBuilder.cs index c3cc495..ac93618 100644 --- a/Streamistry.Core/Fluent/SourceBuilder.cs +++ b/Streamistry.Core/Fluent/SourceBuilder.cs @@ -15,9 +15,9 @@ internal class SourceBuilder : BasePipeBuilder public SourceBuilder(IBuilder upstream, IEnumerable enumeration) => (Enumeration, Upstream) = (enumeration, upstream); - public override IChainablePort OnBuildPort() + public override IChainablePort OnBuildPipeElement() { - var pipeline = Upstream.BuildPort(); + var pipeline = Upstream.BuildPipeElement(); var source = new EnumerableSource(pipeline, Enumeration); pipeline.AddSource(source); return source; diff --git a/Streamistry.Core/Fluent/SplitterBuilder.cs b/Streamistry.Core/Fluent/SplitterBuilder.cs index 6773c8c..adc2cc9 100644 --- a/Streamistry.Core/Fluent/SplitterBuilder.cs +++ b/Streamistry.Core/Fluent/SplitterBuilder.cs @@ -5,7 +5,7 @@ using System.Threading.Tasks; namespace Streamistry.Fluent; -internal class SplitterBuilder : PipeElementBuilder +public class SplitterBuilder : PipeElementBuilder { protected Func? Function { get; set; } @@ -13,9 +13,9 @@ public SplitterBuilder(IPipeBuilder upstream, Func? : base(upstream) => (Function) = (function); - public override IChainablePort OnBuildPort() + public override IChainablePort OnBuildPipeElement() => new Splitter( - Upstream.BuildPort() + Upstream.BuildPipeElement() , Function ?? throw new InvalidOperationException() ); } diff --git a/Streamistry.SourceGenerator/BasePipeBuilder.scriban b/Streamistry.SourceGenerator/BasePipeBuilder.scriban new file mode 100644 index 0000000..fc51bb7 --- /dev/null +++ b/Streamistry.SourceGenerator/BasePipeBuilder.scriban @@ -0,0 +1,9 @@ +public abstract partial class BasePipeBuilder : IPipeBuilder +{ + public BranchesBuilder Branch<{{ generics | array.join ", " }}>( + {{- + func concat; ret string.append "Func, BasePipeBuilder> upstream" | string.append $0; end + indexes | array.each @concat | array.join ", " + }}) + => new(this, upstream{{ indexes | array.join ", upstream" }}); +} diff --git a/Streamistry.SourceGenerator/BasePipeBuilderSourceGenerator.cs b/Streamistry.SourceGenerator/BasePipeBuilderSourceGenerator.cs new file mode 100644 index 0000000..39fb8b8 --- /dev/null +++ b/Streamistry.SourceGenerator/BasePipeBuilderSourceGenerator.cs @@ -0,0 +1,31 @@ +using System; +using System.Collections.Generic; +using System.Text; +using Microsoft.CodeAnalysis.Text; +using Microsoft.CodeAnalysis; +using System.Resources; +using Scriban; +using System.Reflection; + +namespace Streamistry.SourceGenerator; + +[Generator] +public class BasePipeBuilderSourceGenerator : BaseSourceGenerator +{ + public override string GenerateClasses() + { + var sb = new StringBuilder(); + sb.Append(ReadTemplate("Header.scriban")) + .AppendLine() + .AppendLine("namespace Streamistry.Fluent;") + .AppendLine(); + + for (int i = MIN_CARDINALITY; i <= MAX_CARDINALITY; i++) + { + sb.Append(GenerateClass(Classname, i)); + sb.AppendLine(); + } + + return sb.ToString(); + } +} diff --git a/Streamistry.SourceGenerator/BranchesBuilder.scriban b/Streamistry.SourceGenerator/BranchesBuilder.scriban new file mode 100644 index 0000000..10e26eb --- /dev/null +++ b/Streamistry.SourceGenerator/BranchesBuilder.scriban @@ -0,0 +1,32 @@ +public partial class BranchesBuilder : BranchesBuilder +{ + {{~ for type in generics ~}} + protected Func, BasePipeBuilder<{{type}}>> Pipe{{ for.index + 1 }} { get; } + {{~ end ~}} + + public BranchesBuilder( + BasePipeBuilder upstream + {{~ for type in generics ~}} + , Func, BasePipeBuilder<{{type}}>> pipe{{ for.index + 1 }} + {{~ end ~}} + ) + : base(upstream) + { + {{~ for type in generics ~}} + Pipe{{ for.index + 1 }} = pipe{{ for.index + 1 }}; + {{~ end ~}} + } + + public override IChainablePort[] OnBuildPipeElement() + { + Upstream.BuildPipeElement(); + return [ + {{~ for type in generics ~}} + Pipe{{ for.index + 1 }}.Invoke(Upstream).BuildPipeElement(), + {{~ end ~}} + ]; + } + + public CombinatorBuilder<{{ generics | array.join ", " }}, TOutput> Zip(Func<{{ generics | array.join "?, " }}?, TOutput> function) + => new(this, function); +} diff --git a/Streamistry.SourceGenerator/BranchesBuilderSourceGenerator.cs b/Streamistry.SourceGenerator/BranchesBuilderSourceGenerator.cs new file mode 100644 index 0000000..7bd2659 --- /dev/null +++ b/Streamistry.SourceGenerator/BranchesBuilderSourceGenerator.cs @@ -0,0 +1,31 @@ +using System; +using System.Collections.Generic; +using System.Text; +using Microsoft.CodeAnalysis.Text; +using Microsoft.CodeAnalysis; +using System.Resources; +using Scriban; +using System.Reflection; + +namespace Streamistry.SourceGenerator; + +[Generator] +public class BranchesBuilderSourceGenerator : BaseSourceGenerator +{ + public override string GenerateClasses() + { + var sb = new StringBuilder(); + sb.Append(ReadTemplate("Header.scriban")) + .AppendLine() + .AppendLine("namespace Streamistry.Fluent;") + .AppendLine(); + + for (int i = MIN_CARDINALITY; i <= MAX_CARDINALITY; i++) + { + sb.Append(GenerateClass(Classname, i)); + sb.AppendLine(); + } + + return sb.ToString(); + } +} diff --git a/Streamistry.SourceGenerator/CombinatorBuilder.scriban b/Streamistry.SourceGenerator/CombinatorBuilder.scriban new file mode 100644 index 0000000..2def3b4 --- /dev/null +++ b/Streamistry.SourceGenerator/CombinatorBuilder.scriban @@ -0,0 +1,23 @@ +public class CombinatorBuilder<{{ generics | array.join ", " }}, TOutput> : BaseCombinatorBuilder +{ + protected Func<{{ generics | array.join "?, " }}?, TOutput> Function { get; } + + public CombinatorBuilder( + IBuilder upstream + , Func<{{ generics | array.join "?, " }}?, TOutput> function + ) + : base(upstream) + => (Function) = (function); + + public override IChainablePort OnBuildPipeElement() + { + var upstreams = Upstream.BuildPipeElement(); + + return new Zipper<{{ generics | array.join ", " }}, TOutput>( + {{~ for type in generics ~}} + (IChainablePort<{{type}}?>)upstreams[{{ for.index }}], + {{~ end ~}} + Function ?? throw new InvalidOperationException() + ); + } +} diff --git a/Streamistry.SourceGenerator/CombinatorBuilderSourceGenerator.cs b/Streamistry.SourceGenerator/CombinatorBuilderSourceGenerator.cs new file mode 100644 index 0000000..8456abe --- /dev/null +++ b/Streamistry.SourceGenerator/CombinatorBuilderSourceGenerator.cs @@ -0,0 +1,31 @@ +using System; +using System.Collections.Generic; +using System.Text; +using Microsoft.CodeAnalysis.Text; +using Microsoft.CodeAnalysis; +using System.Resources; +using Scriban; +using System.Reflection; + +namespace Streamistry.SourceGenerator; + +[Generator] +public class CombinatorBuilderSourceGenerator : BaseSourceGenerator +{ + public override string GenerateClasses() + { + var sb = new StringBuilder(); + sb.Append(ReadTemplate("Header.scriban")) + .AppendLine() + .AppendLine("namespace Streamistry.Fluent;") + .AppendLine(); + + for (int i = MIN_CARDINALITY; i <= MAX_CARDINALITY; i++) + { + sb.Append(GenerateClass(Classname, i)); + sb.AppendLine(); + } + + return sb.ToString(); + } +} diff --git a/Streamistry.SourceGenerator/CombinatorSourceGenerator.cs b/Streamistry.SourceGenerator/CombinatorSourceGenerator.cs index cc7afee..a957ab6 100644 --- a/Streamistry.SourceGenerator/CombinatorSourceGenerator.cs +++ b/Streamistry.SourceGenerator/CombinatorSourceGenerator.cs @@ -15,7 +15,10 @@ public class CombinatorSourceGenerator : BaseSourceGenerator public override string GenerateClasses() { var sb = new StringBuilder(); - sb.Append(ReadTemplate("Header.scriban")).AppendLine(); + sb.Append(ReadTemplate("Header.scriban")) + .AppendLine() + .AppendLine("namespace Streamistry;") + .AppendLine(); for (int i = MIN_CARDINALITY; i <= MAX_CARDINALITY; i++) { diff --git a/Streamistry.SourceGenerator/Header.scriban b/Streamistry.SourceGenerator/Header.scriban index ffd3ab7..e46d129 100644 --- a/Streamistry.SourceGenerator/Header.scriban +++ b/Streamistry.SourceGenerator/Header.scriban @@ -5,5 +5,4 @@ using System.Linq; using System.Text; using System.Threading.Tasks; using Streamistry.Observability; - -namespace Streamistry; +using Streamistry.Pipes.Combinators; diff --git a/Streamistry.SourceGenerator/Streamistry.SourceGenerator.csproj b/Streamistry.SourceGenerator/Streamistry.SourceGenerator.csproj index a551942..01dd255 100644 --- a/Streamistry.SourceGenerator/Streamistry.SourceGenerator.csproj +++ b/Streamistry.SourceGenerator/Streamistry.SourceGenerator.csproj @@ -7,12 +7,18 @@ + + + + + + diff --git a/Streamistry.SourceGenerator/ZipperSourceGenerator.cs b/Streamistry.SourceGenerator/ZipperSourceGenerator.cs index 71da91b..51379c3 100644 --- a/Streamistry.SourceGenerator/ZipperSourceGenerator.cs +++ b/Streamistry.SourceGenerator/ZipperSourceGenerator.cs @@ -16,8 +16,8 @@ public override string GenerateClasses() { var sb = new StringBuilder(); sb.Append(ReadTemplate("Header.scriban")) - .Remove(sb.Length-3, 3) - .AppendLine(".Pipes.Combinators;") + .AppendLine() + .AppendLine("namespace Streamistry.Pipes.Combinators;") .AppendLine(); for (int i = MIN_CARDINALITY; i <= MAX_CARDINALITY; i++) diff --git a/Streamistry.Testing/Fluent/PipelineBuilderTests.cs b/Streamistry.Testing/Fluent/PipelineBuilderTests.cs index 3f8b6a6..ab0b40c 100644 --- a/Streamistry.Testing/Fluent/PipelineBuilderTests.cs +++ b/Streamistry.Testing/Fluent/PipelineBuilderTests.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Globalization; using System.Linq; using System.Text; using System.Threading.Tasks; @@ -13,7 +14,7 @@ public class PipelineBuilderTests [Test] public void Build_EmptyPipeline_Pipeline() { - var pipeline = new PipelineBuilder().BuildPort(); + var pipeline = new PipelineBuilder().BuildPipeElement(); Assert.That(pipeline, Is.Not.Null); Assert.That(pipeline, Is.TypeOf()); } @@ -316,7 +317,7 @@ public void Build_ParserRomanFiguresCheckpoint_Success() 'X' => 10, _ => -1 }; - return y>=0; + return y >= 0; }).Checkpoint(out var parser) .Build(); @@ -349,4 +350,71 @@ public void Build_ComplexTryOnlyMainCheckpoint_Success() var output = aggr.GetOutputs(pipeline.Start); Assert.That(output.Last(), Is.EqualTo(16)); } + + + [Test] + public void Build_CombineTwoUpstreamsCheckpoint_Success() + { + var pipeline = new PipelineBuilder() + .Source(["2024-09-14", "2024-09-15", "2024-45-78"]) + .Parse() + .AsDate() + .Branch( + day => day.Map(x => x.AddDays(1)).Pluck(x => x.Day) + , month => month.Map(x => x.ToString("MMMM", CultureInfo.InvariantCulture))) + .Zip((day, month) => $"{day} {month}").Checkpoint(out var zip) + .Build(); + + Assert.That(pipeline, Is.Not.Null); + Assert.That(zip, Is.Not.Null); + + var output = zip.GetOutputs(pipeline.Start); + Assert.That(output, Does.Contain("15 September")); + Assert.That(output, Does.Contain("16 September")); + } + + [Test] + public void Build_CombineThreeUpstreamsCheckpoint_Success() + { + var pipeline = new PipelineBuilder() + .Source(["2024-09-14", "2024-09-15", "2024-45-78"]) + .Parse() + .AsDate() + .Branch( + day => day.Map(x => x.AddDays(1)).Pluck(x => x.Day) + , month => month.Map(x => x.ToString("MMMM", CultureInfo.InvariantCulture)) + , year => year.Pluck(x => x.Year).Map(x => x + 1)) + .Zip((day, month, year) => $"on {day} {month} {year}").Checkpoint(out var zip) + .Build(); + + Assert.That(pipeline, Is.Not.Null); + Assert.That(zip, Is.Not.Null); + + var output = zip.GetOutputs(pipeline.Start); + Assert.That(output, Does.Contain("on 15 September 2025")); + Assert.That(output, Does.Contain("on 16 September 2025")); + } + + [Test] + public void Build_CombineFiveUpstreamsCheckpoint_Success() + { + var pipeline = new PipelineBuilder() + .Source([1, 2, 3]) + .Branch( + stream1 => stream1.Map(x => x += 1) + , stream2 => stream2.Map(x => x += 2) + , stream3 => stream3.Map(x => x += 3) + , stream4 => stream4.Map(x => x += 4) + , stream5 => stream5.Map(x => x += 5)) + .Zip((stream1, stream2, stream3, stream4, stream5) => stream1 + stream2 + stream3 + stream4 + stream5).Checkpoint(out var zip) + .Build(); + + Assert.That(pipeline, Is.Not.Null); + Assert.That(zip, Is.Not.Null); + + var output = zip.GetOutputs(pipeline.Start); + Assert.That(output, Does.Contain(20)); + Assert.That(output, Does.Contain(25)); + Assert.That(output, Does.Contain(30)); + } }