Skip to content

Commit

Permalink
feat: combination of more than 2 streams (#72)
Browse files Browse the repository at this point in the history
  • Loading branch information
Seddryck authored Sep 15, 2024
1 parent bc7a6c2 commit 7a09ade
Show file tree
Hide file tree
Showing 14 changed files with 360 additions and 116 deletions.
71 changes: 0 additions & 71 deletions Streamistry.Core/Combinator.cs

This file was deleted.

41 changes: 0 additions & 41 deletions Streamistry.Core/Pipes/Combinators/Zipper.cs

This file was deleted.

6 changes: 5 additions & 1 deletion Streamistry.Core/Streamistry.csproj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<PackageId>Streamistry</PackageId>
Expand All @@ -21,6 +21,10 @@
<PackageReference Include="Microsoft.Extensions.FileSystemGlobbing" Version="8.0.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Streamistry.SourceGenerator\Streamistry.SourceGenerator.csproj" OutputItemType="Analyzer" ReferenceOutputAssembly="false" />
</ItemGroup>

<ItemGroup>
<PackageReference Update="DotNet.ReproducibleBuilds" Version="1.2.25">
<PrivateAssets>all</PrivateAssets>
Expand Down
3 changes: 2 additions & 1 deletion Streamistry.Json/ObjectPropertyAppender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Threading.Tasks;
using Json.More;
using Json.Path;
using Streamistry.Pipes.Combinators;


namespace Streamistry.Json;
Expand All @@ -15,7 +16,7 @@ public class ObjectPropertyAppender<TInputMain, TInputSecondary> : Zipper<TInput
where TInputSecondary : JsonNode
{

public ObjectPropertyAppender(IChainablePort<TInputMain> mainUpstream, IChainablePort<TInputSecondary> secondUpstream, string path)
public ObjectPropertyAppender(IChainablePort<TInputMain?> mainUpstream, IChainablePort<TInputSecondary?> secondUpstream, string path)
: base(mainUpstream, secondUpstream, (x, y) => AppendProperty(x, y, path))
{ }

Expand Down
72 changes: 72 additions & 0 deletions Streamistry.SourceGenerator/BaseSourceGenerator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Text;
using Microsoft.CodeAnalysis;
using Microsoft.CodeAnalysis.Text;
using Scriban;

namespace Streamistry.SourceGenerator;
public abstract class BaseSourceGenerator : ISourceGenerator
{
public const int MIN_CARDINALITY = 2;
public const int MAX_CARDINALITY = 7;

public string Classname
{
get
{
var classname = GetType().Name;
return classname.Substring(0, classname.Length - 15);
}
}

public void Execute(GeneratorExecutionContext context)
{
var source = GenerateClasses();
context.AddSource($"{Classname}.gen.cs", SourceText.From(source, Encoding.UTF8));
}

public void Initialize(GeneratorInitializationContext context)
{ }

public abstract string GenerateClasses();

public string ReadTemplate(string resourceName)
{
var assembly = Assembly.GetExecutingAssembly();
string fullResourceName = $"Streamistry.SourceGenerator.{resourceName}";

using (var stream = assembly.GetManifestResourceStream(fullResourceName))
{
if (stream is null)
throw new FileNotFoundException("Embedded resource not found.", fullResourceName);

using (var reader = new StreamReader(stream))
return reader.ReadToEnd();
}
}

public string GenerateClass(string classname, int numGenerics)
{
var templateText = ReadTemplate($"{classname}.scriban");
var template = Template.Parse(templateText);

var generics = new List<string>();
var indexes = new List<int>();
for (int i = 1; i <= numGenerics; i++)
{
generics.Add($"T{i}");
indexes.Add(i);
}

var model = new
{
generics = generics.ToArray(),
indexes = indexes.ToArray()
};

string result = template.Render(model);
return result;
}
}
80 changes: 80 additions & 0 deletions Streamistry.SourceGenerator/Combinator.scriban
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/// <summary>
/// Represents a pipeline element that merges multiple upstream streams into a single downstream stream by applying a specified mapping function to corresponding values from each upstream.
/// The output stream type is determined by the result of the mapping function applied to the input elements.
/// </summary>
{{~ for type in generics ~}}
/// <typeparam name="{{type}}">The type of the elements in the input stream {{ for.index + 1 }}.</typeparam>
{{~ end ~}}
/// <typeparam name="TResult">The type of the elements in the output stream, determined by the result of the mapping function.</typeparam>
public abstract partial class Combinator<{{ generics | array.join ", " }}, TResult > : ChainablePipe<TResult>
{
public Func<{{ generics | array.join "?, " }}?, TResult?> Function { get; init; }
protected int BranchesCompleted { get; set; }
{{~ for type in generics ~}}
protected IChainablePort<{{ type }}?> Upstream{{ for.index + 1 }} { get; }
{{~ end ~}}

public Combinator(
{{~ for type in generics ~}}
IChainablePort<{{ type }}?> upstream{{ for.index + 1 }},
{{~ end ~}}
Func<{{ generics | array.join "?, " }}?, TResult?> function)
: base(upstream1.Pipe)
{
{{~ for type in generics ~}}
upstream{{ for.index + 1 }}.RegisterDownstream(Emit{{ for.index + 1 }});
upstream{{ for.index + 1 }}.Pipe.RegisterOnCompleted(Complete);
Upstream{{ for.index + 1 }} = upstream{{ for.index + 1 }};
{{~ end ~}}
Function = function;
}

{{~ for type in generics ~}}
public void Emit{{ for.index + 1 }}({{ type }}? value)
{
Queue(Upstream{{for.index + 1}}, value);
if (TryGetElements(
{{-
func concat; ret string.append "out var item" $0; end
indexes | array.each @concat | array.join ", "
}}))
PushDownstream(Invoke(
{{-
func concat; ret string.append "item" $0; end
indexes | array.each @concat | array.join ", "
}}));
}
{{~ end ~}}

public override void Complete()
{
BranchesCompleted += 1;
if (BranchesCompleted >= {{ generics.size }})
{
BranchesCompleted = 0;
PushComplete();
}
}

[Trace]
protected TResult? Invoke(
{{-
func concat; ret string.append "T" $0 | string.append "? item" | string.append $0; end
indexes | array.each @concat | array.join ", "
-}}
)
=> Function.Invoke(
{{-
func concat; ret string.append "item" $0; end
indexes | array.each @concat | array.join ", "
-}}
);

protected abstract bool TryGetElements(
{{-
func concat; ret string.append "out T" $0 | string.append "? item" | string.append $0; end
indexes | array.each @concat | array.join ", "
}});

protected abstract void Queue<T>(IChainablePort<T?> upstream, T? value);
}
28 changes: 28 additions & 0 deletions Streamistry.SourceGenerator/CombinatorSourceGenerator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System;
using System.Collections.Generic;
using System.Text;
using Microsoft.CodeAnalysis.Text;
using Microsoft.CodeAnalysis;
using System.Resources;
using Scriban;
using System.Reflection;

namespace Streamistry.SourceGenerator;

[Generator]
public class CombinatorSourceGenerator : BaseSourceGenerator
{
public override string GenerateClasses()
{
var sb = new StringBuilder();
sb.Append(ReadTemplate("Header.scriban")).AppendLine();

for (int i = MIN_CARDINALITY; i <= MAX_CARDINALITY; i++)
{
sb.Append(GenerateClass(Classname, i));
sb.AppendLine();
}

return sb.ToString();
}
}
9 changes: 9 additions & 0 deletions Streamistry.SourceGenerator/Header.scriban
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Streamistry.Observability;

namespace Streamistry;
36 changes: 36 additions & 0 deletions Streamistry.SourceGenerator/Streamistry.SourceGenerator.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>netstandard2.0</TargetFrameworks>
<OutputType>Library</OutputType>
<EnforceExtendedAnalyzerRules>true</EnforceExtendedAnalyzerRules>
</PropertyGroup>

<ItemGroup>
<None Remove="Combinator.scriban" />
<None Remove="Header.scriban" />
<None Remove="Zipper.scriban" />
</ItemGroup>

<ItemGroup>
<EmbeddedResource Include="Zipper.scriban" />
<EmbeddedResource Include="Header.scriban" />
<EmbeddedResource Include="Combinator.scriban" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.CodeAnalysis.CSharp" Version="4.9.2" PrivateAssets="all" />
<PackageReference Include="Microsoft.CodeAnalysis.Analyzers" Version="3.3.4" PrivateAssets="all" />
<PackageReference Include="Scriban" Version="5.10.0" GeneratePathProperty="true" PrivateAssets="all" />
</ItemGroup>
<PropertyGroup>
<GetTargetPathDependsOn>$(GetTargetPathDependsOn);GetDependencyTargetPaths</GetTargetPathDependsOn>
</PropertyGroup>

<Target Name="GetDependencyTargetPaths">
<ItemGroup>
<TargetPathWithTargetPlatformMoniker Include="$(PKGScriban)\lib\netstandard2.0\Scriban.dll" IncludeRuntimeDependency="false" />
</ItemGroup>
</Target>

</Project>
Loading

0 comments on commit 7a09ade

Please sign in to comment.