Skip to content

Commit

Permalink
feat: add Branch and Zip methods to fluent interface (#81)
Browse files Browse the repository at this point in the history
  • Loading branch information
Seddryck authored Sep 16, 2024
1 parent 20b5b39 commit 0f69373
Show file tree
Hide file tree
Showing 25 changed files with 335 additions and 56 deletions.
16 changes: 7 additions & 9 deletions Streamistry.Core/Fluent/AggregatorBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
using Streamistry.Pipes.Aggregators;

namespace Streamistry.Fluent;
internal class AggregatorBuilder<TInput, TAccumulate, TOutput>
public class AggregatorBuilder<TInput, TAccumulate, TOutput>
{
protected IPipeBuilder<TInput> Upstream { get; }

Expand All @@ -26,25 +26,24 @@ public SpecializedAggregatorBuilder<TInput, TAccumulate, TOutput> AsSum()
=> new SpecializedAggregatorBuilder<TInput, TAccumulate, TOutput>(Upstream, typeof(Sum<,>), [typeof(TInput), typeof(TOutput)]);
public SpecializedAggregatorBuilder<TInput, TAccumulate, TOutput> AsCount()
=> new SpecializedAggregatorBuilder<TInput, TAccumulate, TOutput>(Upstream, typeof(Count<,>), [typeof(TInput), typeof(TOutput)]);

}

internal class SpecializedAggregatorBuilder<TInput, TAccumulate, TOutput> : PipeElementBuilder<TInput, TOutput>
public class SpecializedAggregatorBuilder<TInput, TAccumulate, TOutput> : PipeElementBuilder<TInput, TOutput>
{
protected Type Type { get; }
protected Type[] GenericTypeParameters { get; } = [typeof(int)];
public SpecializedAggregatorBuilder(IPipeBuilder<TInput> upstream, Type type, Type[] genericTypeParameters)
: base(upstream)
=> (Type, GenericTypeParameters) = (type, genericTypeParameters);

public override IChainablePort<TOutput> OnBuildPort()
public override IChainablePort<TOutput> OnBuildPipeElement()
{
var t = Type.MakeGenericType(GenericTypeParameters);
return (IChainablePort<TOutput>)Activator.CreateInstance(t, Upstream.BuildPort(), null)!;
return (IChainablePort<TOutput>)Activator.CreateInstance(t, Upstream.BuildPipeElement(), null)!;
}
}

internal class UniversalAggregatorBuilder<TInput, TAccumulate, TOutput> : PipeElementBuilder<TInput, TOutput>
public class UniversalAggregatorBuilder<TInput, TAccumulate, TOutput> : PipeElementBuilder<TInput, TOutput>
{
protected Func<TAccumulate?, TInput?, TAccumulate?>? Accumulator { get; }
protected Func<TAccumulate?, TOutput?>? Selector { get; set; } = x => (TOutput?)Convert.ChangeType(x, typeof(TOutput));
Expand All @@ -66,12 +65,11 @@ public UniversalAggregatorBuilder<TInput, TAccumulate, TOutput> WithSeed(TAccumu
return this;
}

public override IChainablePort<TOutput> OnBuildPort()
public override IChainablePort<TOutput> OnBuildPipeElement()
=> new Aggregator<TInput, TAccumulate, TOutput>(
Upstream.BuildPort()
Upstream.BuildPipeElement()
, Accumulator ?? throw new InvalidOperationException()
, Selector ?? throw new InvalidOperationException()
, Seed
);

}
11 changes: 5 additions & 6 deletions Streamistry.Core/Fluent/BasePipeBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,18 @@

namespace Streamistry.Fluent;

internal abstract class BasePipeBuilder<TOutput> : IPipeBuilder<TOutput>
public abstract partial class BasePipeBuilder<TOutput> : IPipeBuilder<TOutput>
{
protected IChainablePort<TOutput>? Instance { get; set; }

public abstract IChainablePort<TOutput> OnBuildPort();
public abstract IChainablePort<TOutput> OnBuildPipeElement();

public IChainablePort<TOutput> BuildPort()
=> Instance ??= OnBuildPort();
public IChainablePort<TOutput> BuildPipeElement()
=> Instance ??= OnBuildPipeElement();

public Pipeline Build()
{
BuildPort();
BuildPipeElement();
return Instance!.Pipe.Pipeline!;
}

Expand Down Expand Up @@ -47,5 +47,4 @@ public ParserBuilder<TOutput, TNext> Parse<TNext>(ParserDelegate<TOutput, TNext>
=> new(this, parser);
public ParserBuilder<TOutput> Parse()
=> new(this);

}
27 changes: 27 additions & 0 deletions Streamistry.Core/Fluent/BranchesBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Streamistry.Fluent;
public abstract class BranchesBuilder<TInput> : IBuilder<IChainablePort[]>
{
protected BasePipeBuilder<TInput> Upstream { get; }
protected IChainablePort[]? Instances { get; set; }

public BranchesBuilder(BasePipeBuilder<TInput> upstream)
=> (Upstream) = (upstream);

public IChainablePort[] BuildPipeElement()
=> Instances ??= OnBuildPipeElement();

public abstract IChainablePort[] OnBuildPipeElement();

public Pipeline Build()
{
BuildPipeElement();
return Instances![0].Pipe.Pipeline!;
}
}
22 changes: 22 additions & 0 deletions Streamistry.Core/Fluent/CombinatorBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Streamistry.Fluent;
public abstract class BaseCombinatorBuilder<TOutput> : BasePipeBuilder<TOutput>
{
protected IBuilder<IChainablePort[]> Upstream { get; }

public BaseCombinatorBuilder(IBuilder<IChainablePort[]> upstream)
: base()
=> (Upstream) = (upstream);

public BasePipeBuilder<TOutput> Checkpoint(out IChainablePort<TOutput> port)
{
port = BuildPipeElement();
return this;
}
}
6 changes: 3 additions & 3 deletions Streamistry.Core/Fluent/FilterBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@
using System.Threading.Tasks;

namespace Streamistry.Fluent;
internal class FilterBuilder<TInput> : PipeElementBuilder<TInput, TInput>, IPipeBuilder<TInput>
public class FilterBuilder<TInput> : PipeElementBuilder<TInput, TInput>, IPipeBuilder<TInput>
{
protected Func<TInput?, bool>? Function { get; }

public FilterBuilder(IPipeBuilder<TInput> upstream, Func<TInput?, bool>? function)
:base(upstream)
=> (Function) = (function);

public override IChainablePort<TInput> OnBuildPort()
public override IChainablePort<TInput> OnBuildPipeElement()
=> new Filter<TInput>(
Upstream.BuildPort()
Upstream.BuildPipeElement()
, Function ?? throw new InvalidOperationException()
);
}
8 changes: 4 additions & 4 deletions Streamistry.Core/Fluent/IPipeBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
using System.Threading.Tasks;

namespace Streamistry.Fluent;
internal interface IPipeBuilder<T> : IBuilder<IChainablePort<T>>
public interface IPipeBuilder<T> : IBuilder<IChainablePort<T>>
{ }

internal interface IBuilder<T>
public interface IBuilder<T>
{
T BuildPort();
T OnBuildPort();
T BuildPipeElement();
T OnBuildPipeElement();
}
6 changes: 3 additions & 3 deletions Streamistry.Core/Fluent/MapperBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@
using System.Threading.Tasks;

namespace Streamistry.Fluent;
internal class MapperBuilder<TInput, TOutput> : PipeElementBuilder<TInput, TOutput>
public class MapperBuilder<TInput, TOutput> : PipeElementBuilder<TInput, TOutput>
{
protected Func<TInput?, TOutput?>? Function { get; set; }

public MapperBuilder(IPipeBuilder<TInput> upstream, Func<TInput?, TOutput?>? function)
: base(upstream)
=> (Function) = (function);

public override IChainablePort<TOutput> OnBuildPort()
public override IChainablePort<TOutput> OnBuildPipeElement()
=> new Mapper<TInput, TOutput>(
Upstream.BuildPort()
Upstream.BuildPipeElement()
, Function ?? throw new InvalidOperationException()
);
}
18 changes: 9 additions & 9 deletions Streamistry.Core/Fluent/ParserBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
using Streamistry.Pipes.Parsers;

namespace Streamistry.Fluent;
internal class ParserBuilder<TInput, TOutput> : PipeElementBuilder<TInput, TOutput>
public class ParserBuilder<TInput, TOutput> : PipeElementBuilder<TInput, TOutput>
{
protected IFormatProvider? FormatProvider { get; set; }
protected ParserDelegate<TInput, TOutput> ParseFunction { get; }
Expand All @@ -22,11 +22,11 @@ public ParserBuilder<TInput, TOutput> WithFormatProvider(IFormatProvider formatP
return this;
}

public override IChainablePort<TOutput> OnBuildPort()
=> new Parser<TInput, TOutput>(Upstream.BuildPort(), ParseFunction);
public override IChainablePort<TOutput> OnBuildPipeElement()
=> new Parser<TInput, TOutput>(Upstream.BuildPipeElement(), ParseFunction);
}

internal class ParserBuilder<TInput>
public class ParserBuilder<TInput>
{
protected IPipeBuilder<TInput> Upstream { get; }
protected IFormatProvider? FormatProvider { get; set; }
Expand All @@ -46,7 +46,7 @@ public ParserBuilder<TInput> WithFormatProvider(IFormatProvider formatProvider)
}
}

internal class SpecializedParserBuilder<TInput, TOutput> : PipeElementBuilder<TInput, TOutput>
public class SpecializedParserBuilder<TInput, TOutput> : PipeElementBuilder<TInput, TOutput>
{
protected Type Type { get; }
protected IFormatProvider? FormatProvider { get; set; }
Expand All @@ -55,9 +55,9 @@ public SpecializedParserBuilder(IPipeBuilder<TInput> upstream, Type type, IForma
: base(upstream)
=> (Type, FormatProvider) = (type, formatProvider);

public override IChainablePort<TOutput> OnBuildPort()
public override IChainablePort<TOutput> OnBuildPipeElement()
{
return (IChainablePort<TOutput>)Activator.CreateInstance(Type, Upstream.BuildPort(), FormatProvider)!;
return (IChainablePort<TOutput>)Activator.CreateInstance(Type, Upstream.BuildPipeElement(), FormatProvider)!;
}

public SpecializedParserBuilder<TInput, TOutput> WithFormatProvider(IFormatProvider formatProvider)
Expand Down Expand Up @@ -89,9 +89,9 @@ public SpecializedParserBuilder<TInput, TOutput> WithFormatProvider(IFormatProvi
// return this;
// }

// public override IChainablePort<TOutput> OnBuildPort()
// public override IChainablePort<TOutput> OnBuildPipeElement()
// => new Parser<TInput, TAccumulate, TOutput>(
// Upstream.BuildPort()
// Upstream.BuildPipeElement()
// , Accumulator ?? throw new InvalidOperationException()
// , Selector ?? throw new InvalidOperationException()
// , Seed
Expand Down
4 changes: 2 additions & 2 deletions Streamistry.Core/Fluent/PipeElementBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

namespace Streamistry.Fluent;

internal abstract class PipeElementBuilder<TInput, TOutput> : BasePipeBuilder<TOutput>
public abstract class PipeElementBuilder<TInput, TOutput> : BasePipeBuilder<TOutput>
{
protected IPipeBuilder<TInput> Upstream { get; }

Expand All @@ -17,7 +17,7 @@ public PipeElementBuilder(IPipeBuilder<TInput> upstream)

public PipeElementBuilder<TInput, TOutput> Checkpoint(out IChainablePort<TOutput> port)
{
port = BuildPort();
port = BuildPipeElement();
return this;
}
}
6 changes: 3 additions & 3 deletions Streamistry.Core/Fluent/PipelineBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ internal class PipelineBuilder<T> : IBuilder<Pipeline>
public SourceBuilder<T> Source(IEnumerable<T> enumeration)
=> new (this, enumeration);

public Pipeline BuildPort()
=> Instance ??= OnBuildPort();
public Pipeline BuildPipeElement()
=> Instance ??= OnBuildPipeElement();

public Pipeline OnBuildPort()
public Pipeline OnBuildPipeElement()
=> new Pipeline();
}
6 changes: 3 additions & 3 deletions Streamistry.Core/Fluent/PluckerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@
using Streamistry.Pipes.Mappers;

namespace Streamistry.Fluent;
internal class PluckerBuilder<TInput, TOutput> : PipeElementBuilder<TInput, TOutput>
public class PluckerBuilder<TInput, TOutput> : PipeElementBuilder<TInput, TOutput>
{
protected Expression<Func<TInput, TOutput?>> Expr { get; set; }

public PluckerBuilder(IPipeBuilder<TInput> upstream, Expression<Func<TInput, TOutput?>> expr)
: base(upstream)
=> (Expr) = (expr);

public override IChainablePort<TOutput> OnBuildPort()
public override IChainablePort<TOutput> OnBuildPipeElement()
=> new Plucker<TInput, TOutput>(
Upstream.BuildPort()
Upstream.BuildPipeElement()
, Expr ?? throw new InvalidOperationException()
);
}
4 changes: 2 additions & 2 deletions Streamistry.Core/Fluent/SinkBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
using Streamistry.Pipes.Sinks;

namespace Streamistry.Fluent;
internal class SinkBuilder<TInput>
public class SinkBuilder<TInput>
{
protected IPipeBuilder<TInput> Upstream { get; }
private Type? SinkType { get; set; }
Expand All @@ -22,7 +22,7 @@ public SinkBuilder<TInput> InMemory()

public Pipeline Build()
{
var sink = (Sink<TInput>)Activator.CreateInstance(SinkType ?? throw new InvalidOperationException(), [Upstream.BuildPort()])!;
var sink = (Sink<TInput>)Activator.CreateInstance(SinkType ?? throw new InvalidOperationException(), [Upstream.BuildPipeElement()])!;
return sink.Pipeline!;
}
}
4 changes: 2 additions & 2 deletions Streamistry.Core/Fluent/SourceBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ internal class SourceBuilder<TOutput> : BasePipeBuilder<TOutput>
public SourceBuilder(IBuilder<Pipeline> upstream, IEnumerable<TOutput> enumeration)
=> (Enumeration, Upstream) = (enumeration, upstream);

public override IChainablePort<TOutput> OnBuildPort()
public override IChainablePort<TOutput> OnBuildPipeElement()
{
var pipeline = Upstream.BuildPort();
var pipeline = Upstream.BuildPipeElement();
var source = new EnumerableSource<TOutput>(pipeline, Enumeration);
pipeline.AddSource(source);
return source;
Expand Down
6 changes: 3 additions & 3 deletions Streamistry.Core/Fluent/SplitterBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@
using System.Threading.Tasks;

namespace Streamistry.Fluent;
internal class SplitterBuilder<TInput, TOutput> : PipeElementBuilder<TInput, TOutput>
public class SplitterBuilder<TInput, TOutput> : PipeElementBuilder<TInput, TOutput>
{
protected Func<TInput?, TOutput[]?>? Function { get; set; }

public SplitterBuilder(IPipeBuilder<TInput> upstream, Func<TInput?, TOutput[]?>? function)
: base(upstream)
=> (Function) = (function);

public override IChainablePort<TOutput> OnBuildPort()
public override IChainablePort<TOutput> OnBuildPipeElement()
=> new Splitter<TInput, TOutput>(
Upstream.BuildPort()
Upstream.BuildPipeElement()
, Function ?? throw new InvalidOperationException()
);
}
9 changes: 9 additions & 0 deletions Streamistry.SourceGenerator/BasePipeBuilder.scriban
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
public abstract partial class BasePipeBuilder<TOutput> : IPipeBuilder<TOutput>
{
public BranchesBuilder<TOutput, {{ generics | array.join ", " }}> Branch<{{ generics | array.join ", " }}>(
{{-
func concat; ret string.append "Func<BasePipeBuilder<TOutput>, BasePipeBuilder<T" $0 | string.append ">> upstream" | string.append $0; end
indexes | array.each @concat | array.join ", "
}})
=> new(this, upstream{{ indexes | array.join ", upstream" }});
}
31 changes: 31 additions & 0 deletions Streamistry.SourceGenerator/BasePipeBuilderSourceGenerator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using System;
using System.Collections.Generic;
using System.Text;
using Microsoft.CodeAnalysis.Text;
using Microsoft.CodeAnalysis;
using System.Resources;
using Scriban;
using System.Reflection;

namespace Streamistry.SourceGenerator;

[Generator]
public class BasePipeBuilderSourceGenerator : BaseSourceGenerator
{
public override string GenerateClasses()
{
var sb = new StringBuilder();
sb.Append(ReadTemplate("Header.scriban"))
.AppendLine()
.AppendLine("namespace Streamistry.Fluent;")
.AppendLine();

for (int i = MIN_CARDINALITY; i <= MAX_CARDINALITY; i++)
{
sb.Append(GenerateClass(Classname, i));
sb.AppendLine();
}

return sb.ToString();
}
}
Loading

0 comments on commit 0f69373

Please sign in to comment.