Skip to content

Commit

Permalink
feat: ipc contract interface support the csharp event. (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
DingpingZhang committed Oct 4, 2022
1 parent ab47e36 commit 602d89e
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 124 deletions.
14 changes: 7 additions & 7 deletions HandyIpc.Generator/ClientProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,31 +30,31 @@ public class {nameof(ClientProxy)}{className}{typeParameters} : {interfaceType}
private readonly Sender _sender;
private readonly ISerializer _serializer;
private readonly string _key;
private readonly AwaitorManager _awaitorManager;
private readonly AwaiterManager _awaiterManager;
{events.For(item =>
{
var eSymbol = ((INamedTypeSymbol)item.Type).DelegateInvokeMethod.Parameters[1];
var eType = eSymbol.Type.ToTypeDeclaration();
var eSymbol = ((INamedTypeSymbol)item.Type).DelegateInvokeMethod!.Parameters[1];
string eType = eSymbol.Type.ToTypeDeclaration();
return $@"
public event {item.Type.ToTypeDeclaration()} {item.Name}
{{
add => _awaitorManager.Subscribe(""{item.Name}"", value.GetHashCode(), args =>
add => _awaiterManager.Subscribe(""{item.Name}"", value.GetHashCode(), args =>
{{
var e = ({eType})_serializer.Deserialize(args, typeof({eType}));
value(this, e);
}});
remove => _awaitorManager.Unsubscribe(""{item.Name}"", value.GetHashCode());
remove => _awaiterManager.Unsubscribe(""{item.Name}"", value.GetHashCode());
}}
";
}, RemoveLineIfEmpty)}
})}
public {nameof(ClientProxy)}{className}(Sender sender, ISerializer serializer, string key)
{{
_sender = sender;
_serializer = serializer;
_key = key;
_awaitorManager = new AwaitorManager(key, sender, serializer);
_awaiterManager = new AwaiterManager(key, sender, serializer);
}}
{methods.For(method =>
{
Expand Down
5 changes: 2 additions & 3 deletions HandyIpc.Tests/EventTypeTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public EventTypeTest(NamedPipeFixture namedPipeFixture, SocketFixture socketFixt
}

[Fact]
public async Task TestEventHandler()
public void TestEventHandler()
{
int count = 0;
var instance = _socketFixture.Client.Resolve<IEventType>();
Expand All @@ -32,8 +32,7 @@ public async Task TestEventHandler()
instance.RaiseChanged(EventArgs.Empty);
instance.RaiseChanged(EventArgs.Empty);

await Task.Delay(2000);
Assert.Equal(3, count);
Assert.Equal(4, count);
}

private void Instance_Changed(object? sender, EventArgs e)
Expand Down
37 changes: 24 additions & 13 deletions HandyIpc/ContainerServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,28 +56,23 @@ private async Task StartAsync(CancellationToken token)
while (!token.IsCancellationRequested)
{
IConnection connection = await _server.WaitForConnectionAsync();
RequestHandler handler = _middleware.ToHandler(
ctx =>
{
ctx.Logger = _logger;
ctx.Serializer = _serializer;
ctx.Connection = connection;
});

if (token.IsCancellationRequested)
{
break;
}

// Do not await the request handler, and go to await next stream connection directly.
#pragma warning disable 4014
HandleRequestAsync(connection, handler, token);
HandleRequestAsync(connection, token);
#pragma warning restore 4014
}
}

private async Task HandleRequestAsync(IConnection connection, RequestHandler handler, CancellationToken token)
private async Task HandleRequestAsync(IConnection connection, CancellationToken token)
{
Task Handler(Context context) => _middleware(context, () => Task.CompletedTask);

bool disposeConnection = true;
try
{
while (true)
Expand All @@ -94,8 +89,21 @@ private async Task HandleRequestAsync(IConnection connection, RequestHandler han
break;
}

byte[] output = await handler(buffer);
await connection.WriteAsync(output, token);
Context ctx = new()
{
Input = input,
Connection = connection,
Logger = _logger,
Serializer = _serializer,
};
await Handler(ctx);
await connection.WriteAsync(ctx.Output, token);

if (!ctx.KeepAlive)
{
disposeConnection = false;
break;
}
}
}
catch (OperationCanceledException)
Expand All @@ -108,7 +116,10 @@ private async Task HandleRequestAsync(IConnection connection, RequestHandler han
}
finally
{
connection.Dispose();
if (disposeConnection)
{
connection.Dispose();
}
}
}
}
Expand Down
53 changes: 27 additions & 26 deletions HandyIpc/Core/AwaitorManager.cs → HandyIpc/Core/AwaiterManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@

namespace HandyIpc.Core
{
public class AwaitorManager
public class AwaiterManager
{
private readonly ConcurrentDictionary<string, Awaitor> _pool = new();
private readonly ConcurrentDictionary<string, Awaiter> _pool = new();
private readonly string _key;
private readonly ISerializer _serializer;
private readonly Sender _sender;

public AwaitorManager(string key, Sender sender, ISerializer serializer)
public AwaiterManager(string key, Sender sender, ISerializer serializer)
{
_key = key;
_sender = sender;
Expand All @@ -22,57 +22,58 @@ public AwaitorManager(string key, Sender sender, ISerializer serializer)

public void Subscribe(string name, int handlerId, Action<byte[]> callback)
{
Awaitor awaitor = _pool.GetOrAdd(name, _ => new Awaitor());
lock (awaitor.Locker)
Awaiter awaiter = _pool.GetOrAdd(name, _ => new Awaiter());
lock (awaiter.Locker)
{
if (awaitor.Handlers.Count == 0)
if (awaiter.Handlers.Count == 0)
{
RentedValue<IConnection> connection = _sender.ConnectionPool.Rent();
Task.Run(() => LoopWait(connection, name, awaitor, awaitor.Source.Token));
RentedValue<IConnection> rented = _sender.ConnectionPool.Rent();
IConnection connection = rented.Value;
connection.Write(Subscription.Add(_key, name, _serializer));
byte[] addResult = connection.Read();
if (!addResult.IsUnit())
{
// TODO: Use exact exception.
throw new InvalidOperationException();
}

Task.Run(() => LoopWait(rented, name, awaiter, awaiter.Source.Token));
}

awaitor.Handlers[handlerId] = callback;
awaiter.Handlers[handlerId] = callback;
}
}

public void Unsubscribe(string name, int handlerId)
{
if (!_pool.TryGetValue(name, out Awaitor awaitor))
if (!_pool.TryGetValue(name, out Awaiter awaiter))
{
return;
}

lock (awaitor.Locker)
lock (awaiter.Locker)
{
awaitor.Handlers.Remove(handlerId);
if (awaitor.Handlers.Count == 0)
awaiter.Handlers.Remove(handlerId);
if (awaiter.Handlers.Count == 0)
{
_pool.TryRemove(name, out _);
awaitor.Source.Cancel();
awaiter.Source.Cancel();
}
}
}

private async Task LoopWait(RentedValue<IConnection> rented, string name, Awaitor awaitor, CancellationToken token)
private async Task LoopWait(RentedValue<IConnection> rented, string name, Awaiter awaiter, CancellationToken token)
{
using (rented)
{
IConnection connection = rented.Value;
await connection.WriteAsync(Subscription.Add(_key, name, _serializer), token);
byte[] addResult = await connection.ReadAsync(token);
if (!addResult.IsUnit())
{
// TODO: Use exact exception.
throw new InvalidOperationException();
}

while (!token.IsCancellationRequested)
{
// Will blocked until accepted a notification.
byte[] input = await connection.ReadAsync(token);
lock (awaitor.Locker)
lock (awaiter.Locker)
{
foreach (var handler in awaitor.Handlers.Values)
foreach (var handler in awaiter.Handlers.Values)
{
try
{
Expand All @@ -97,7 +98,7 @@ private async Task LoopWait(RentedValue<IConnection> rented, string name, Awaito
}
}

private class Awaitor
private class Awaiter
{
public readonly object Locker = new();

Expand Down
10 changes: 3 additions & 7 deletions HandyIpc/Core/Context.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@ public class Context
{
private static readonly byte[] EmptyBytes = Array.Empty<byte>();

public byte[] Input { get; }
// FIXME: Replace 'set;' with 'init;'
public byte[] Input { get; set; } = null!;

public byte[] Output { get; set; } = EmptyBytes;

public IDictionary<object, object> Items { get; } = new Dictionary<object, object>();

public Request? Request { get; set; }

public ISerializer Serializer { get; set; } = null!;
Expand All @@ -21,9 +20,6 @@ public class Context

public IConnection Connection { get; set; } = null!;

public Context(byte[] input)
{
Input = input;
}
public bool KeepAlive { get; set; } = true;
}
}
17 changes: 4 additions & 13 deletions HandyIpc/Core/Middlewares.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ namespace HandyIpc.Core
{
public delegate Task Middleware(Context context, Func<Task> next);

public delegate Task<byte[]> RequestHandler(byte[] input);

public static class Middlewares
{
#region Build-in middlewares
Expand Down Expand Up @@ -81,8 +79,12 @@ public static Middleware GetHandleEvent(ConcurrentDictionary<string, NotifierMan
ctx.Output = Signals.Unit;
}
break;
case SubscriptionType.Promise:
// ignored
break;
}
ctx.KeepAlive = false;
return;
}
Expand Down Expand Up @@ -148,16 +150,5 @@ public static Middleware Compose(params Middleware[] middlewareArray)
{
return middlewareArray.Compose();
}

public static RequestHandler ToHandler(this Middleware middleware, Action<Context> configure)
{
return async input =>
{
var ctx = new Context(input);
configure(ctx);
await middleware(ctx, () => Task.CompletedTask);
return ctx.Output;
};
}
}
}
Loading

0 comments on commit 602d89e

Please sign in to comment.