Skip to content

Commit

Permalink
Merge pull request #83 from progaudi/fix/reconnect
Browse files Browse the repository at this point in the history
fix lost connection and reconnect automatically
  • Loading branch information
aensidhe authored Dec 28, 2016
2 parents 286108b + 0750eee commit 9132a64
Show file tree
Hide file tree
Showing 13 changed files with 465 additions and 128 deletions.
51 changes: 4 additions & 47 deletions src/tarantool.client/Box.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks;

using ProGaudi.Tarantool.Client.Model;
using ProGaudi.Tarantool.Client.Model.Requests;
using ProGaudi.Tarantool.Client.Model.Responses;
using ProGaudi.Tarantool.Client.Utils;

namespace ProGaudi.Tarantool.Client
{
Expand All @@ -15,40 +12,17 @@ public class Box : IBox

private readonly ILogicalConnection _logicalConnection;

private readonly IResponseReader _responseReader;

private readonly INetworkStreamPhysicalConnection _physicalConnection;

public Box(ClientOptions options)
{
_clientOptions = options;
TarantoolConvertersRegistrator.Register(options.MsgPackContext);

_physicalConnection = new NetworkStreamPhysicalConnection();
_logicalConnection = new LogicalConnection(options, _physicalConnection);
_responseReader = new ResponseReader(_logicalConnection, options, _physicalConnection);
_logicalConnection = new LogicalConnectionManager(options);
}

public async Task Connect()
{
await _physicalConnection.Connect(_clientOptions);

var greetingsResponseBytes = new byte[128];
var readCount = await _physicalConnection.ReadAsync(greetingsResponseBytes, 0, greetingsResponseBytes.Length);
if (readCount != greetingsResponseBytes.Length)
{
throw ExceptionHelper.UnexpectedGreetingBytesCount(readCount);
}

var greetings = new GreetingsResponse(greetingsResponseBytes);

_clientOptions.LogWriter?.WriteLine($"Greetings received, salt is {Convert.ToBase64String(greetings.Salt)} .");

_responseReader.BeginReading();

_clientOptions.LogWriter?.WriteLine("Server responses reading started.");

await LoginIfNotGuest(greetings);
await _logicalConnection.Connect();
}

public static async Task<Box> Connect(string replicationSource)
Expand All @@ -72,8 +46,7 @@ public void Dispose()
{
_clientOptions.LogWriter?.WriteLine("Box is disposing...");
_clientOptions.LogWriter?.Flush();
_responseReader.Dispose();
_physicalConnection.Dispose();
_logicalConnection.Dispose();
}

public ISchema GetSchema()
Expand Down Expand Up @@ -141,21 +114,5 @@ public Task<DataResponse<TResponse[]>> Eval<TResponse>(string expression)
{
return Eval<TarantoolTuple, TResponse>(expression, TarantoolTuple.Empty);
}

private async Task LoginIfNotGuest(GreetingsResponse greetings)
{
var singleNode = _clientOptions.ConnectionOptions.Nodes.Single();

if (string.IsNullOrEmpty(singleNode.Uri.UserName))
{
_clientOptions.LogWriter?.WriteLine("Guest mode, no authentication attempt.");
return;
}

var authenticateRequest = AuthenticationRequest.Create(greetings, singleNode.Uri);

await _logicalConnection.SendRequestWithEmptyResponse(authenticateRequest);
_clientOptions.LogWriter?.WriteLine($"Authentication request send: {authenticateRequest}");
}
}
}
24 changes: 24 additions & 0 deletions src/tarantool.client/Converters/PingPacketConverter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System;

using ProGaudi.MsgPack.Light;

using ProGaudi.Tarantool.Client.Model.Requests;

namespace ProGaudi.Tarantool.Client.Converters
{
internal class PingPacketConverter : IMsgPackConverter<PingRequest>
{
public void Initialize(MsgPackContext context)
{
}

public void Write(PingRequest value, IMsgPackWriter writer)
{
}

public PingRequest Read(IMsgPackReader reader)
{
throw new NotImplementedException();
}
}
}
14 changes: 6 additions & 8 deletions src/tarantool.client/ILogicalConnection.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
using System.Collections.Generic;
using System.IO;
using System;
using System.Threading.Tasks;

using ProGaudi.Tarantool.Client.Model;
using ProGaudi.Tarantool.Client.Model.Requests;
using ProGaudi.Tarantool.Client.Model.Responses;

namespace ProGaudi.Tarantool.Client
{
public interface ILogicalConnection
public interface ILogicalConnection : IDisposable
{
Task Connect();

bool IsConnected();

Task SendRequestWithEmptyResponse<TRequest>(TRequest request)
where TRequest : IRequest;

Task<DataResponse<TResponse[]>> SendRequest<TRequest, TResponse>(TRequest request)
where TRequest : IRequest;

TaskCompletionSource<MemoryStream> PopResponseCompletionSource(RequestId requestId, MemoryStream resultStream);

IEnumerable<TaskCompletionSource<MemoryStream>> PopAllResponseCompletionSources();
}
}
1 change: 1 addition & 0 deletions src/tarantool.client/INetworkStreamPhysicalConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ public interface INetworkStreamPhysicalConnection : IDisposable
{
Task Connect(ClientOptions options);
Task Flush();
bool IsConnected();
Task<int> ReadAsync(byte[] buffer, int offset, int count);
void Write(byte[] buffer, int offset, int count);
}
Expand Down
9 changes: 9 additions & 0 deletions src/tarantool.client/IResponseReader.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
using System;
using System.IO;
using System.Threading.Tasks;

using ProGaudi.Tarantool.Client.Model;

namespace ProGaudi.Tarantool.Client
{

public interface IResponseReader : IDisposable
{
void BeginReading();

Task<MemoryStream> GetResponseTask(RequestId requestId);

bool IsConnected();
}
}
146 changes: 102 additions & 44 deletions src/tarantool.client/LogicalConnection.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
Expand All @@ -20,20 +18,95 @@ internal class LogicalConnection : ILogicalConnection
{
private readonly MsgPackContext _msgPackContext;

private readonly ClientOptions _clientOptions;

private readonly RequestIdCounter _requestIdCounter;

private readonly INetworkStreamPhysicalConnection _physicalConnection;

private long _currentRequestId;
private readonly ReaderWriterLockSlim _physicalConnectionLock = new ReaderWriterLockSlim();

private readonly ConcurrentDictionary<RequestId, TaskCompletionSource<MemoryStream>> _pendingRequests =
new ConcurrentDictionary<RequestId, TaskCompletionSource<MemoryStream>>();
private readonly IResponseReader _responseReader;

private readonly ILog _logWriter;

public LogicalConnection(ClientOptions options, INetworkStreamPhysicalConnection physicalConnection)
private bool _disposed;

public LogicalConnection(ClientOptions options, RequestIdCounter requestIdCounter)
{
_clientOptions = options;
_requestIdCounter = requestIdCounter;
_msgPackContext = options.MsgPackContext;
_logWriter = options.LogWriter;
_physicalConnection = physicalConnection;

_physicalConnection = new NetworkStreamPhysicalConnection();
_responseReader = new ResponseReader(_clientOptions, _physicalConnection);
}

public void Dispose()
{
if (_disposed)
{
return;
}

_disposed = true;

_responseReader.Dispose();
_physicalConnection.Dispose();
}

public async Task Connect()
{
await _physicalConnection.Connect(_clientOptions);

var greetingsResponseBytes = new byte[128];
var readCount = await _physicalConnection.ReadAsync(greetingsResponseBytes, 0, greetingsResponseBytes.Length);
if (readCount != greetingsResponseBytes.Length)
{
throw ExceptionHelper.UnexpectedGreetingBytesCount(readCount);
}

var greetings = new GreetingsResponse(greetingsResponseBytes);

_clientOptions.LogWriter?.WriteLine($"Greetings received, salt is {Convert.ToBase64String(greetings.Salt)} .");

_responseReader.BeginReading();

_clientOptions.LogWriter?.WriteLine("Server responses reading started.");

await LoginIfNotGuest(greetings);
}

public bool IsConnected()
{
if (_disposed)
{
return false;
}

if (!_responseReader.IsConnected() || !_physicalConnection.IsConnected())
{
return false;
}

return true;
}

private async Task LoginIfNotGuest(GreetingsResponse greetings)
{
var singleNode = _clientOptions.ConnectionOptions.Nodes.Single();

if (string.IsNullOrEmpty(singleNode.Uri.UserName))
{
_clientOptions.LogWriter?.WriteLine("Guest mode, no authentication attempt.");
return;
}

var authenticateRequest = AuthenticationRequest.Create(greetings, singleNode.Uri);

await SendRequestWithEmptyResponse(authenticateRequest);
_clientOptions.LogWriter?.WriteLine($"Authentication request send: {authenticateRequest}");
}

public async Task SendRequestWithEmptyResponse<TRequest>(TRequest request)
Expand All @@ -48,15 +121,6 @@ public async Task<DataResponse<TResponse[]>> SendRequest<TRequest, TResponse>(TR
return await SendRequestImpl<TRequest, DataResponse<TResponse[]>>(request);
}

public TaskCompletionSource<MemoryStream> PopResponseCompletionSource(RequestId requestId, MemoryStream resultStream)
{
TaskCompletionSource<MemoryStream> request;

return _pendingRequests.TryRemove(requestId, out request)
? request
: null;
}

public static byte[] ReadFully(Stream input)
{
input.Position = 0;
Expand All @@ -72,32 +136,43 @@ public static byte[] ReadFully(Stream input)
}
}

public IEnumerable<TaskCompletionSource<MemoryStream>> PopAllResponseCompletionSources()
{
var result = _pendingRequests.Values.ToArray();
_pendingRequests.Clear();
return result;
}

private async Task<TResponse> SendRequestImpl<TRequest, TResponse>(TRequest request)
where TRequest : IRequest
{
if (_disposed)
{
throw new ObjectDisposedException(nameof(LogicalConnection));
}

var bodyBuffer = MsgPackSerializer.Serialize(request, _msgPackContext);

var requestId = GetRequestId();
var responseTask = GetResponseTask(requestId);
var requestId = _requestIdCounter.GetRequestId();
var responseTask = _responseReader.GetResponseTask(requestId);

long headerLength;
var headerBuffer = CreateAndSerializeBuffer(request, requestId, bodyBuffer, out headerLength);

lock (_physicalConnection)
try
{
_physicalConnectionLock.EnterWriteLock();

_logWriter?.WriteLine($"Begin sending request header buffer, requestId: {requestId}, code: {request.Code}, length: {headerBuffer.Length}");
_physicalConnection.Write(headerBuffer, 0, Constants.PacketSizeBufferSize + (int) headerLength);
_physicalConnection.Write(headerBuffer, 0, Constants.PacketSizeBufferSize + (int)headerLength);

_logWriter?.WriteLine($"Begin sending request body buffer, length: {bodyBuffer.Length}");
_physicalConnection.Write(bodyBuffer, 0, bodyBuffer.Length);
}
catch (Exception ex)
{
_logWriter?.WriteLine(
$"Request with requestId {requestId} failed, header:\n{ToReadableString(headerBuffer)} \n body: \n{ToReadableString(bodyBuffer)}");
Dispose();
throw;
}
finally
{
_physicalConnectionLock.ExitWriteLock();
}

try
{
Expand Down Expand Up @@ -137,22 +212,5 @@ private byte[] CreateAndSerializeBuffer<TRequest>(
MsgPackSerializer.Serialize(packetLength, stream, _msgPackContext);
return packetSizeBuffer;
}

private RequestId GetRequestId()
{
var requestId = Interlocked.Increment(ref _currentRequestId);
return (RequestId) (ulong) requestId;
}

private Task<MemoryStream> GetResponseTask(RequestId requestId)
{
var tcs = new TaskCompletionSource<MemoryStream>();
if (!_pendingRequests.TryAdd(requestId, tcs))
{
throw ExceptionHelper.RequestWithSuchIdAlreadySent(requestId);
}

return tcs.Task;
}
}
}
Loading

0 comments on commit 9132a64

Please sign in to comment.