Skip to content

Commit

Permalink
feat: checkpoints for branches (#90)
Browse files Browse the repository at this point in the history
* feat: branch fluent method is featuring a checkpoints helper
* feat: support interception of more than a single checkpoint
* test: assert that we can create branches within a branch
* refactor: remove unused code in ParserBuilder
* feat: bind and unbind a pipe to a pipeline
  • Loading branch information
Seddryck authored Sep 21, 2024
1 parent c70e1c2 commit 71a9ad0
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 8 deletions.
3 changes: 3 additions & 0 deletions Streamistry.Core/ChainablePipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ public void RegisterOnCompleted(Action? action)
public void RegisterDownstream(Action<T?> action)
=> Main.RegisterDownstream(action);

public void UnregisterDownstream(Action<T?> downstream)
=> Main.UnregisterDownstream(downstream);

protected void PushDownstream(T? obj)
=> Main.PushDownstream(obj);

Expand Down
13 changes: 12 additions & 1 deletion Streamistry.Core/DualRouterPipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
using Streamistry.Observability;

namespace Streamistry;
public abstract class DualRouterPipe<TInput, TOutput> : ChainablePipe<TOutput>, IDualRoute<TOutput, TInput>
public abstract class DualRouterPipe<TInput, TOutput> : ChainablePipe<TOutput>, IDualRoute<TOutput, TInput>, IBindablePipe<TInput>
{
public OutputPort<TInput> Alternate { get; }
public new OutputPort<TOutput> Main { get => base.Main; }
Expand All @@ -22,4 +22,15 @@ public DualRouterPipe(IChainablePort<TInput>? upstream)

[Meter]
public abstract void Emit(TInput? obj);

public void Bind(IChainablePort<TInput> input)
{
input.RegisterDownstream(Emit);
Pipeline = input.Pipe.Pipeline;
}

public void Unbind(IChainablePort<TInput> input)
{
input.UnregisterDownstream(Emit);
}
}
1 change: 1 addition & 0 deletions Streamistry.Core/IChainablePipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public interface IChainablePipe : IObservablePipe
public interface IChainablePort<T> : IChainablePort
{
void RegisterDownstream(Action<T?> action);
void UnregisterDownstream(Action<T?> downstream);
}

public interface IChainablePort
Expand Down
7 changes: 7 additions & 0 deletions Streamistry.Core/IProcessablePipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@
using System.Threading.Tasks;

namespace Streamistry;

public interface IBindablePipe<T>
{
void Bind(IChainablePort<T> input);
void Unbind(IChainablePort<T> input);
}

public interface IProcessablePipe<T>
{
void Emit(T? obj);
Expand Down
14 changes: 10 additions & 4 deletions Streamistry.Core/OuputPort.cs → Streamistry.Core/OutputPort.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,29 @@
using System.Threading.Tasks;

namespace Streamistry;

public class OutputPort<T> : IChainablePort<T>
{
private Action<T?>? Downstream { get; set; }

public void RegisterDownstream(Action<T?> downstream)
=> Downstream += downstream;

public string Name { get; init; }

public IChainablePipe Pipe { get; init; }


public OutputPort(IChainablePipe pipe, string name)
=> (Name, Pipe) = (name, pipe);

public void RegisterDownstream(Action<T?> downstream)
=> Downstream += downstream;

public void UnregisterDownstream(Action<T?> downstream)
=> Downstream -= downstream;

public void PushDownstream(T? obj)
=> Downstream?.Invoke(obj);

public Action<T?>[] GetDownstreamInvocations()
=> Downstream?.GetInvocationList().Cast<Action<T?>>().ToArray() ?? Array.Empty<Action<T?>>();
}

public class MainOutputPort<T> : OutputPort<T>
Expand Down
13 changes: 12 additions & 1 deletion Streamistry.Core/SingleRouterPipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
using Streamistry.Observability;

namespace Streamistry;
public abstract class BaseSingleRouterPipe<TInput, TOutput> : ChainablePipe<TOutput>, IProcessablePipe<TInput>
public abstract class BaseSingleRouterPipe<TInput, TOutput> : ChainablePipe<TOutput>, IProcessablePipe<TInput>, IBindablePipe<TInput>
{
protected BaseSingleRouterPipe(IChainablePort<TInput>? upstream)
: base(upstream?.Pipe)
Expand All @@ -18,6 +18,17 @@ protected BaseSingleRouterPipe(IChainablePort<TInput>? upstream)

[Meter]
public abstract void Emit(TInput? obj);

public void Bind(IChainablePort<TInput> input)
{
input.RegisterDownstream(Emit);
Pipeline = input.Pipe.Pipeline;
}

public void Unbind(IChainablePort<TInput> input)
{
input.UnregisterDownstream(Emit);
}
}

public abstract class SingleRouterPipe<TInput, TOutput> : BaseSingleRouterPipe<TInput, TOutput>
Expand Down
14 changes: 12 additions & 2 deletions Streamistry.Core/Sink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
using Streamistry.Observability;

namespace Streamistry;
public class Sink<T> : ObservablePipe, IProcessablePipe<T>
public class Sink<T> : ObservablePipe, IProcessablePipe<T>, IBindablePipe<T>
{
public Action<T?> Action { get; }
public Pipeline? Pipeline { get; }
public Pipeline? Pipeline { get; private set; }

public Sink(IChainablePort<T> upstream, Action<T?> function)
:this(function, upstream)
Expand All @@ -30,4 +30,14 @@ public void Emit(T? obj)
[Trace]
protected void Invoke(T? obj)
=> Action.Invoke(obj);
public void Bind(IChainablePort<T> input)
{
input.RegisterDownstream(Emit);
Pipeline = input.Pipe.Pipeline;
}

public void Unbind(IChainablePort<T> input)
{
input.UnregisterDownstream(Emit);
}
}
73 changes: 73 additions & 0 deletions Streamistry.Testing/PipelineTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NUnit.Framework;
using Streamistry.Pipes.Sources;
using Streamistry.Testability;

namespace Streamistry.Testing;
public class PipelineTests
{
[Test]
public void Start_StraightPipeline_Successful()
{
var source = new EnumerableSource<int>([0, 1, 10]);
var pipeline = new Pipeline(source);
var incrementMap = new Mapper<int, int>(source, x => ++x);

var outputs = incrementMap.GetOutputs(pipeline.Start);
Assert.That(outputs, Does.Contain(1));
Assert.That(outputs, Does.Contain(2));
Assert.That(outputs, Does.Contain(11));
}

[Test]
public void Start_BindSegment_Successful()
{
var source = new EnumerableSource<int>([0, 1, 10]);
var pipeline = new Pipeline(source);
var incrementMap = new Mapper<int, int>(source, x => ++x);

var doubleMap = new Mapper<int, int>(x => x * 2);
var decrementMap = new Mapper<int, int>(doubleMap, x => --x);
doubleMap.Bind(incrementMap.Main);

var outputs = decrementMap.GetOutputs(pipeline.Start);
Assert.That(outputs, Does.Contain(1));
Assert.That(outputs, Does.Contain(3));
Assert.That(outputs, Does.Contain(21));
}

[Test]
public void Start_UnbindSegment_Successful()
{
var source = new EnumerableSource<int>([0, 1, 10]);
var pipeline = new Pipeline(source);
var incrementMap = new Mapper<int, int>(source, x => ++x);
var doubleMap = new Mapper<int, int>(incrementMap, x => x * 2);
var decrementMap = new Mapper<int, int>(doubleMap, x => --x);
doubleMap.Unbind(incrementMap.Main);

var outputs = doubleMap.GetOutputs(pipeline.Start);
Assert.That(outputs, Is.Empty);
}

[Test]
public void Start_UnbindAndBindSegment_Successful()
{
var source = new EnumerableSource<int>([0, 1, 10]);
var pipeline = new Pipeline(source);
var incrementMap = new Mapper<int, int>(source, x => ++x);
var doubleMap = new Mapper<int, int>(incrementMap, x => x * 2);
var decrementMap = new Mapper<int, int>(doubleMap, x => --x);
doubleMap.Unbind(incrementMap.Main);
decrementMap.Bind(incrementMap.Main);

var outputs = decrementMap.GetOutputs(pipeline.Start);
Assert.That(outputs, Does.Contain(0));
Assert.That(outputs, Does.Contain(1));
Assert.That(outputs, Does.Contain(10));
}
}

0 comments on commit 71a9ad0

Please sign in to comment.