Skip to content

Commit

Permalink
bugfix: support events and add basic test cases. (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
DingpingZhang committed Nov 15, 2021
1 parent cf1b5f4 commit c81663d
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 87 deletions.
27 changes: 22 additions & 5 deletions HandyIpc.Generator/ClientProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,36 @@ public class {nameof(ClientProxy)}{className}{typeParameters} : {interfaceType}
private readonly string _key;
private readonly AwaiterManager _awaiterManager;
{events.For(item => $@"private event {item.Type.ToTypeDeclaration()} _{item.Name};")}
{events.For(item =>
{
var eSymbol = ((INamedTypeSymbol)item.Type).DelegateInvokeMethod!.Parameters[1];
string eType = eSymbol.Type.ToTypeDeclaration();
return $@"
public event {item.Type.ToTypeDeclaration()} {item.Name}
{{
add => _awaiterManager.Subscribe(""{item.Name}"", value.GetHashCode(), args =>
add
{{
if (_{item.Name} == null)
{{
_awaiterManager.Subscribe(""{item.Name}"", args =>
{{
var e = ({eType})_serializer.Deserialize(args, typeof({eType}));
_{item.Name}?.Invoke(this, e);
}});
}}
_{item.Name} += value;
}}
remove
{{
var e = ({eType})_serializer.Deserialize(args, typeof({eType}));
value(this, e);
}});
remove => _awaiterManager.Unsubscribe(""{item.Name}"", value.GetHashCode());
_{item.Name} -= value;
if (_{item.Name} == null)
{{
_awaiterManager.Unsubscribe(""{item.Name}"");
}}
}}
}}
";
})}
Expand Down
70 changes: 59 additions & 11 deletions HandyIpc.Tests/EventTypeTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,72 @@ public EventTypeTest(NamedPipeFixture namedPipeFixture, SocketFixture socketFixt
}

[Fact]
public void TestEventHandler()
public void TestEventHandlerWithSocket()
{
int count = 0;
var instance = _socketFixture.Client.Resolve<IEventType>();
instance.Changed += Instance_Changed;
instance.Changed += (sender, e) => count++;

instance.RaiseChanged(EventArgs.Empty);
instance.RaiseChanged(EventArgs.Empty);
instance.RaiseChanged(EventArgs.Empty);
instance.RaiseChanged(EventArgs.Empty);
TestEventHandlerSubscribeAndUnsubscribe(instance);
}

Assert.Equal(4, count);
[Fact]
public void TestEventHandlerWithNamedPipe()
{
var instance = _namedPipeFixture.Client.Resolve<IEventType>();
TestEventHandlerSubscribeAndUnsubscribe(instance);
}

private void Instance_Changed(object? sender, EventArgs e)
private void TestEventHandlerSubscribeAndUnsubscribe(IEventType instance)
{
int count1 = 0;
int count2 = 0;
int count3 = 0;

// ReSharper disable AccessToModifiedClosure
void Handler1(object? _, EventArgs e) => count1++;
EventHandler handler2 = (_, _) => count2++;
EventHandler handler3 = (_, _) => count3++;
// ReSharper restore AccessToModifiedClosure

instance.Changed += Handler1;
instance.Changed += handler2;
instance.Changed += handler3;

for (int i = 0; i < 10; i++)
{
instance.RaiseChanged(EventArgs.Empty);
Assert.Equal(i + 1, count1);
Assert.Equal(i + 1, count2);
Assert.Equal(i + 1, count3);
}

count1 = 0;
count2 = 0;
count3 = 0;

instance.Changed -= Handler1;
instance.Changed -= handler2;
instance.Changed -= handler3;

for (int i = 0; i < 10; i++)
{
instance.RaiseChanged(EventArgs.Empty);
Assert.Equal(0, count1);
Assert.Equal(0, count2);
Assert.Equal(0, count3);
}

instance.Changed += Handler1;
instance.Changed += Handler1;
instance.Changed += handler2;
instance.Changed += handler2;
instance.Changed += handler3;

for (int i = 0; i < 10; i++)
{
instance.RaiseChanged(EventArgs.Empty);
Assert.Equal(2 * (i + 1), count1);
Assert.Equal(2 * (i + 1), count2);
Assert.Equal(i + 1, count3);
}
}
}
}
5 changes: 5 additions & 0 deletions HandyIpc.Tests/Implementations/EventType.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.ComponentModel;
using HandyIpcTests.Interfaces;

namespace HandyIpcTests.Implementations
Expand All @@ -7,6 +8,10 @@ internal class EventType : IEventType
{
public event EventHandler? Changed;

public event EventHandler<string>? EventWithArgs;

public event PropertyChangedEventHandler? PropertyChanged;

public void RaiseChanged(EventArgs e)
{
Changed?.Invoke(this, e);
Expand Down
5 changes: 5 additions & 0 deletions HandyIpc.Tests/Interfaces/IEventType.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.ComponentModel;
using HandyIpc;

namespace HandyIpcTests.Interfaces
Expand All @@ -8,6 +9,10 @@ public interface IEventType
{
event EventHandler Changed;

event EventHandler<string> EventWithArgs;

event PropertyChangedEventHandler PropertyChanged;

public void RaiseChanged(EventArgs e);
}
}
95 changes: 32 additions & 63 deletions HandyIpc/Core/AwaiterManager.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace HandyIpc.Core
Expand All @@ -20,91 +18,62 @@ public AwaiterManager(string key, Sender sender, ISerializer serializer)
_serializer = serializer;
}

public void Subscribe(string name, int handlerId, Action<byte[]> callback)
public void Subscribe(string name, Action<byte[]> callback)
{
Awaiter awaiter = _pool.GetOrAdd(name, _ => new Awaiter());
lock (awaiter.Locker)
{
if (awaiter.Handlers.Count == 0)
{
RentedValue<IConnection> rented = _sender.ConnectionPool.Rent();
IConnection connection = rented.Value;
connection.Write(Subscription.Add(_key, name, _serializer));
byte[] addResult = connection.Read();
if (!addResult.IsUnit())
{
// TODO: Use exact exception.
throw new InvalidOperationException();
}
IConnection connection = _sender.ConnectionPool.Rent().Value;
Awaiter awaiter = _pool.GetOrAdd(name, _ => new Awaiter(callback, connection));

Task.Run(() => LoopWait(rented, name, awaiter, awaiter.Source.Token));
}

awaiter.Handlers[handlerId] = callback;
byte[] addResult = connection.Invoke(Subscription.Add(_key, name, _serializer));
if (!addResult.IsUnit())
{
// TODO: Use exact exception.
throw new InvalidOperationException();
}

Task.Run(() => LoopWait(awaiter));
}

public void Unsubscribe(string name, int handlerId)
public void Unsubscribe(string name)
{
if (!_pool.TryGetValue(name, out Awaiter awaiter))
{
return;
}

lock (awaiter.Locker)
if (_pool.TryRemove(name, out _))
{
awaiter.Handlers.Remove(handlerId);
if (awaiter.Handlers.Count == 0)
using var rented = _sender.ConnectionPool.Rent();
byte[] removeResult = rented.Value.Invoke(Subscription.Remove(_key, name, _serializer));
if (!removeResult.IsUnit())
{
_pool.TryRemove(name, out _);
awaiter.Source.Cancel();
// TODO: Logging.
}
}
}

private async Task LoopWait(RentedValue<IConnection> rented, string name, Awaiter awaiter, CancellationToken token)
private static void LoopWait(Awaiter awaiter)
{
using (rented)
using IConnection connection = awaiter.Connection;
while (true)
{
IConnection connection = rented.Value;
while (!token.IsCancellationRequested)
// Will blocked until accepted a notification.
byte[] input = connection.Read();
if (input.IsEmpty())
{
// Will blocked until accepted a notification.
byte[] input = await connection.ReadAsync(token);
lock (awaiter.Locker)
{
foreach (var handler in awaiter.Handlers.Values)
{
try
{
handler(input);
}
catch
{
// ignored
}
}
}

await connection.WriteAsync(Signals.Unit, token);
break;
}

await connection.WriteAsync(Subscription.Remove(_key, name, _serializer), token);
byte[] removeResult = await connection.ReadAsync(token);
if (!removeResult.IsUnit())
{
// TODO: Logging.
}
connection.Write(Signals.Unit);
awaiter.Handler(input);
}
}

private class Awaiter
{
public readonly object Locker = new();
public Action<byte[]> Handler { get; }

public Dictionary<int, Action<byte[]>> Handlers { get; } = new();
public IConnection Connection { get; }

public CancellationTokenSource Source { get; } = new();
public Awaiter(Action<byte[]> handler, IConnection connection)
{
Handler = handler;
Connection = connection;
}
}
}
}
5 changes: 1 addition & 4 deletions HandyIpc/Core/Middlewares.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public static Middleware GetHandleEvent(ConcurrentDictionary<string, NotifierMan
var manager = notifiers.GetOrAdd(subscription.Name, _ => new NotifierManager(ctx.Serializer));
manager.Subscribe(subscription.CallbackName, subscription.ProcessId, ctx.Connection);
ctx.Output = Signals.Unit;
ctx.KeepAlive = false;
}
break;
case SubscriptionType.Remove:
Expand All @@ -79,12 +80,8 @@ public static Middleware GetHandleEvent(ConcurrentDictionary<string, NotifierMan
ctx.Output = Signals.Unit;
}
break;
case SubscriptionType.Promise:
// ignored
break;
}
ctx.KeepAlive = false;
return;
}
Expand Down
12 changes: 8 additions & 4 deletions HandyIpc/Core/NotifierManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void Unsubscribe(string name, int processId)
{
lock (_locker)
{
if (_notifiers.TryGetValue(_key, out Notifier notifier))
if (_notifiers.TryGetValue(name, out Notifier notifier))
{
notifier.Unsubscribe(processId);
}
Expand All @@ -67,8 +67,7 @@ public void Publish(byte[] e)

try
{
connection.Write(e);
byte[] result = connection.Read();
byte[] result = connection.Invoke(e);
if (!result.IsUnit())
{
// TODO: Handle exception.
Expand All @@ -94,7 +93,12 @@ public void Unsubscribe(int processId)
{
lock (_locker)
{
_connections.Remove(processId);
if (_connections.TryGetValue(processId, out IConnection connection))
{
_connections.Remove(processId);
// Send a signal to notify end this connection.
connection.Write(Signals.Empty);
}
}
}
}
Expand Down

0 comments on commit c81663d

Please sign in to comment.