Skip to content

Commit

Permalink
feat: improve testability with extension methods (#54)
Browse files Browse the repository at this point in the history
  • Loading branch information
Seddryck authored Sep 3, 2024
1 parent a689343 commit 4451805
Show file tree
Hide file tree
Showing 26 changed files with 531 additions and 349 deletions.
1 change: 1 addition & 0 deletions Streamistry.Core/ChainablePipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public void RegisterDownstream(Action<T?> downstream, Action? completion)

public void RegisterOnCompleted(Action? action)
=> Completion += action;

public void RegisterDownstream(Action<T?> action)
=> Main.RegisterDownstream(action);

Expand Down
6 changes: 3 additions & 3 deletions Streamistry.Core/Filter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,19 @@ namespace Streamistry;
/// The output stream is composed of elements that satisfy the predicate; elements that do not satisfy the predicate are excluded from the downstream stream.
/// </summary>
/// <typeparam name="TInput">The type of the elements in both the input and output streams.</typeparam>
public class Filter<TInput> : ChainablePipe<TInput>, IProcessablePipe<TInput>
public class Filter<TInput> : BaseSingleRouterPipe<TInput, TInput>
{
public Func<TInput?, bool> Predicate { get; init; }

public Filter(IChainablePort<TInput> upstream, Func<TInput?, bool> predicate)
: base(upstream.Pipe.GetObservabilityProvider())
: base(upstream)
{
upstream.RegisterDownstream(Emit);
upstream.Pipe.RegisterOnCompleted(Complete);
Predicate = predicate;
}

public void Emit(TInput? obj)
public override void Emit(TInput? obj)
{
if (Invoke(obj))
PushDownstream(obj);
Expand Down
2 changes: 1 addition & 1 deletion Streamistry.Core/IDualRoute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
using System.Threading.Tasks;

namespace Streamistry;
public interface IDualRoute<TMain, TAlternate> : IChainablePort<TMain>
public interface IDualRoute<TMain, TAlternate> : IChainablePort<TMain>, IProcessablePipe<TAlternate>
{
OutputPort<TMain> Main { get; }
OutputPort<TAlternate> Alternate { get; }
Expand Down
2 changes: 1 addition & 1 deletion Streamistry.Core/Pipes/Aggregators/Median.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public MedianState<T> Append(T? value)
var right = list.ElementAt((count) / 2);
if (left == right)
return left;
return ((left / T.CreateChecked(2)) + (right / T.CreateChecked(2)));
return ((left / T.CreateChecked(2)) + (right / T.CreateChecked(2)) + (T.IsOddInteger(left) && T.IsOddInteger(right) ? T.One : T.Zero));
}

public static readonly MedianState<T> @Default = new();
Expand Down
16 changes: 13 additions & 3 deletions Streamistry.Core/SingleRouterPipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,27 @@
using Streamistry.Observability;

namespace Streamistry;
public abstract class SingleRouterPipe<TInput, TOutput> : ChainablePipe<TOutput>, IProcessablePipe<TInput>
public abstract class BaseSingleRouterPipe<TInput, TOutput> : ChainablePipe<TOutput>, IProcessablePipe<TInput>
{
protected SingleRouterPipe(IChainablePort<TInput> upstream)
protected BaseSingleRouterPipe(IChainablePort<TInput> upstream)
: base(upstream.Pipe.GetObservabilityProvider())
{
upstream.RegisterDownstream(Emit);
upstream.Pipe.RegisterOnCompleted(Complete);
}

[Meter]
public void Emit(TInput? obj)
public abstract void Emit(TInput? obj);
}

public abstract class SingleRouterPipe<TInput, TOutput> : BaseSingleRouterPipe<TInput, TOutput>
{
protected SingleRouterPipe(IChainablePort<TInput> upstream)
: base(upstream)
{ }

[Meter]
public override void Emit(TInput? obj)
{
var value = Invoke(obj);
PushDownstream(value);
Expand Down
8 changes: 3 additions & 5 deletions Streamistry.Core/Splitter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,18 @@ namespace Streamistry;
/// </summary>
/// <typeparam name="TInput">The type of the elements in the input stream.</typeparam>
/// <typeparam name="TOutput">The type of the elements in the output stream after the function is applied.</typeparam>
public class Splitter<TInput, TOutput> : ChainablePipe<TOutput>, IProcessablePipe<TInput>
public class Splitter<TInput, TOutput> : BaseSingleRouterPipe<TInput, TOutput>
{
public Func<TInput?, TOutput[]?> Function { get; init; }

public Splitter(IChainablePort<TInput> upstream, Func<TInput?, TOutput[]?> function)
: base(upstream.Pipe.GetObservabilityProvider())
: base(upstream)
{
upstream.RegisterDownstream(Emit);
upstream.Pipe.RegisterOnCompleted(Complete);
Function = function;
}

[Meter]
public void Emit(TInput? obj)
public override void Emit(TInput? obj)
{
var results = Invoke(obj);
if (results is null)
Expand Down
151 changes: 151 additions & 0 deletions Streamistry.Core/Testability/PipeTestingExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Streamistry.Testability;
public static class PipeTestingExtensions
{
public static TOutput? EmitAndGetOutput<TInput, TOutput>(this BaseSingleRouterPipe<TInput, TOutput> pipe, TInput value)
{
TOutput? result = default;
pipe.RegisterDownstream(x => result = x);
pipe.Emit(value);
return result;
}

public static bool EmitAndAnyOutput<TInput, TOutput>(this BaseSingleRouterPipe<TInput, TOutput> pipe, TInput value)
{
var anyResult = false;
pipe.RegisterDownstream(x => anyResult = true);
pipe.Emit(value);
return anyResult;
}

public static TOutput?[] EmitAndGetManyOutputs<TInput, TOutput>(this BaseSingleRouterPipe<TInput, TOutput> pipe, TInput value)
{
var results = new List<TOutput?>();
pipe.RegisterDownstream(results.Add);
pipe.Emit(value);
return [.. results];
}

public static TOutput? EmitAndGetOutput<TInput, TOutput>(this IProcessablePipe<TInput> input, TInput value, IChainablePipe<TOutput> output)
{
TOutput? result = default;
output.RegisterDownstream(x => result = x);
input.Emit(value);
return result;
}

public static bool EmitAndAnyOutput<TInput, TOutput>(this IProcessablePipe<TInput> input, TInput value, IChainablePipe<TOutput> output)
{
var anyResult = false;
output.RegisterDownstream(x => anyResult = true);
input.Emit(value);
return anyResult;
}

public static TOutput? EmitAndGetOutput<TInput, TOutput>(this IDualRoute<TOutput, TInput> input, TInput value)
{
TOutput? result = default;
input.Main.RegisterDownstream(x => result = x);
input.Emit(value);
return result;
}

public static bool EmitAndAnyOutput<TInput, TOutput>(this IDualRoute<TOutput, TInput> input, TInput value)
{
var anyResult = false;
input.Main.RegisterDownstream(x => anyResult = true);
input.Emit(value);
return anyResult;
}

public static TInput? EmitAndGetAlternateOutput<TInput, TOutput>(this IDualRoute<TOutput, TInput> input, TInput value)
{
TInput? result = default;
input.Alternate.RegisterDownstream(x => result = x);
input.Emit(value);
return result;
}

public static bool EmitAndAnyAlternateOutput<TInput, TOutput>(this IDualRoute<TOutput, TInput> input, TInput value)
{
var anyResult = false;
input.Alternate.RegisterDownstream(x => anyResult = true);
input.Emit(value);
return anyResult;
}

public static object?[] EmitAndGetOutputs<TInput>(this IProcessablePipe<TInput> input, TInput value, IChainablePipe[] outputs)
{
var result = new List<object?>();
foreach (var output in outputs)
output.RegisterDownstreamIfPossible(x => result.Add(x));

input.Emit(value);
return [.. result];
}

public static bool[] EmitAndAnyOutputs<TInput>(this IProcessablePipe<TInput> input, TInput value, IChainablePipe[] outputs)
{
var results = new bool[outputs.Length];

for (var i = 0; i < outputs.Length; i++)
{
var index = i; // Capture the current index for the lambda
outputs[i].RegisterDownstreamIfPossible(x => results[index] = true);
}

input.Emit(value);
return results;
}

public static TOutput?[] GetOutputs<TOutput>(this IChainablePort<TOutput> output, Action action)
{
var outputs = new List<TOutput?>();
output.RegisterDownstream(outputs.Add);
action.Invoke();
return [.. outputs];
}

public static void RegisterDownstreamIfPossible(this IChainablePipe pipe, Action<object?> action)
{
// Get the type of the pipe object
var pipeType = pipe.GetType();

// Find the IChainablePipe<T> interface implemented by this object
var chainablePipeInterface = pipeType.GetInterfaces()
.FirstOrDefault(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IChainablePipe<>));

if (chainablePipeInterface != null)
{
// Get the generic argument T of IChainablePipe<T>
var genericArgumentType = chainablePipeInterface.GetGenericArguments()[0];

// Create a new Action<T?> that wraps the original Action<object?>
var method = typeof(PipeTestingExtensions)
.GetMethod(nameof(ConvertAction), System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Static)!
.MakeGenericMethod(genericArgumentType);

var genericAction = method.Invoke(null, [action]);

// Get the RegisterDownstream method of IChainablePipe<T>
var registerMethod = chainablePipeInterface.GetMethod("RegisterDownstream", [genericAction!.GetType(), typeof(Action)]);

// Invoke RegisterDownstream with the dynamic Action<T?>
registerMethod?.Invoke(pipe, [genericAction, null]);
}
else
{
Console.WriteLine("The object does not implement IChainablePipe<T>.");
}
}

private static Action<T?> ConvertAction<T>(Action<object?> action)
{
return obj => action(obj);
}
}
14 changes: 5 additions & 9 deletions Streamistry.Testing/AggregatorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
using Newtonsoft.Json.Converters;
using NUnit.Framework;
using Streamistry.Pipes.Aggregators;
using Streamistry.Pipes.Sinks;
using Streamistry.Pipes.Sources;
using Streamistry.Testability;

namespace Streamistry.Testing;
public class AggregatorTests
Expand All @@ -22,14 +22,12 @@ public void Completion_ResetItself_AggregatorReset()
var union = new Union<int>([firstSource, secondSource]);
var aggregator = new Sum<int>(union, x => x.Reset());
secondSource.WaitOnCompleted(aggregator);
var sink = new MemorySink<int>(aggregator);
pipeline.Start();

Assert.That(sink.State, Has.Count.EqualTo(6));
Assert.Multiple(() =>
{
var results = aggregator.GetOutputs(pipeline.Start);
foreach (var value in (int[])[1, 3, 6, -1, -3, 2])
Assert.That(sink.State, Does.Contain(value));
Assert.That(results, Does.Contain(value));
});
}

Expand All @@ -43,14 +41,12 @@ public void Completion_AdditionalEmit_EmitInThePipeline()
var union = new Union<int>([firstSource, secondSource]);
var aggregator = new Sum<int>(union, x => x.Emit(-x.State));
secondSource.WaitOnCompleted(aggregator);
var sink = new MemorySink<int>(aggregator);
pipeline.Start();

Assert.That(sink.State, Has.Count.EqualTo(8));
Assert.Multiple(() =>
{
var results = aggregator.GetOutputs(pipeline.Start);
foreach (var value in (int[])[1, 3, 6, 0, -1, -3, 2, 0])
Assert.That(sink.State, Does.Contain(value));
Assert.That(results, Does.Contain(value));
});
}
}
64 changes: 20 additions & 44 deletions Streamistry.Testing/ExceptionRouteMapperTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using NUnit.Framework;
using Streamistry;
using Streamistry.Pipes.Sinks;
using Streamistry.Testability;

namespace Streamistry.Testing;
public class ExceptionRouterMapperTests
Expand All @@ -15,63 +16,38 @@ public void Emit_ValidData_MainOnly()
{
var pipeline = new Pipeline<int>();
var mapper = new ExceptionMapper<int, int>(pipeline, x => 60 / x);
var mainSink = new MemorySink<int>(mapper);
var exceptionSink = new MemorySink<int>(mapper.Alternate);
pipeline.Emit(10);
pipeline.Emit(20);
pipeline.Emit(6);

Assert.That(mainSink.State.Count, Is.EqualTo(3));
Assert.That(mainSink.State.First, Is.EqualTo(6));
Assert.That(mainSink.State.Last, Is.EqualTo(10));
Assert.That(exceptionSink.State.Count, Is.EqualTo(0));
Assert.Multiple(() =>
{
Assert.That(mapper.EmitAndGetOutput(10), Is.EqualTo(6));
Assert.That(mapper.EmitAndGetOutput(20), Is.EqualTo(3));
Assert.That(mapper.EmitAndGetOutput(6), Is.EqualTo(10));
});
}

[Test]
public void Emit_InvalidData_ExceptionOnly()
{
var pipeline = new Pipeline<int>();
var mapper = new ExceptionMapper<int, int>(pipeline, x => 60 / x);
var mainSink = new MemorySink<int>(mapper);
var exceptionSink = new MemorySink<int>(mapper.Alternate);
pipeline.Emit(0);

Assert.That(mainSink.State.Count, Is.EqualTo(0));
Assert.That(exceptionSink.State.Count, Is.EqualTo(1));
Assert.That(exceptionSink.State.First, Is.EqualTo(0));
}

[Test]
public void Emit_MixedDataNoExceptionPath_DontFail()
{
var pipeline = new Pipeline<int>();
var mapper = new ExceptionMapper<int, int>(pipeline, x => 60 / x);
var mainSink = new MemorySink<int>(mapper);
pipeline.Emit(10);
pipeline.Emit(0);
pipeline.Emit(3);

Assert.That(mainSink.State.Count, Is.EqualTo(2));
Assert.That(mainSink.State.First, Is.EqualTo(6));
Assert.That(mainSink.State.Last, Is.EqualTo(20));
Assert.Multiple(() =>
{
Assert.That(mapper.EmitAndAnyOutput(0), Is.False);
Assert.That(mapper.EmitAndAnyAlternateOutput(0), Is.True);
Assert.That(mapper.EmitAndGetAlternateOutput(0), Is.EqualTo(0));
});
}

[Test]
public void Emit_MixedDataWithExceptionPath_DontFail()
public void Emit_MixedData_Successful()
{
var pipeline = new Pipeline<int>();
var mapper = new ExceptionMapper<int, int>(pipeline, x => 60 / x);
var mainSink = new MemorySink<int>(mapper);
var exceptionSink = new MemorySink<int>(mapper.Alternate);
pipeline.Emit(10);
pipeline.Emit(0);
pipeline.Emit(3);

Assert.That(mainSink.State.Count, Is.EqualTo(2));
Assert.That(mainSink.State.First, Is.EqualTo(6));
Assert.That(mainSink.State.Last, Is.EqualTo(20));
Assert.That(exceptionSink.State.Count, Is.EqualTo(1));
Assert.That(exceptionSink.State.First, Is.EqualTo(0));
Assert.Multiple(() =>
{
Assert.That(mapper.EmitAndGetOutput(10), Is.EqualTo(6));
Assert.That(mapper.EmitAndGetAlternateOutput(0), Is.EqualTo(0));
Assert.That(mapper.EmitAndGetOutput(3), Is.EqualTo(20));
});
}

[Test]
Expand Down
Loading

0 comments on commit 4451805

Please sign in to comment.