Skip to content

Commit

Permalink
Merge pull request #147 from karlovnv/master
Browse files Browse the repository at this point in the history
PR implements batching
  • Loading branch information
aensidhe authored Sep 17, 2019
2 parents 7101e2f + 6a9eeec commit 89c91c6
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 40 deletions.
2 changes: 1 addition & 1 deletion src/progaudi.tarantool/IRequestWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ internal interface IRequestWriter : IDisposable

bool IsConnected { get; }

void Write(ArraySegment<byte> header, ArraySegment<byte> body);
void Write(ArraySegment<byte> request);
}
}
43 changes: 26 additions & 17 deletions src/progaudi.tarantool/LogicalConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ internal class LogicalConnection : ILogicalConnection
private readonly IRequestWriter _requestWriter;

private readonly ILog _logWriter;

private bool _disposed;

public LogicalConnection(ClientOptions options, RequestIdCounter requestIdCounter)
Expand Down Expand Up @@ -152,15 +151,26 @@ private async Task<MemoryStream> SendRequestImpl<TRequest>(TRequest request, Tim
throw new ObjectDisposedException(nameof(LogicalConnection));
}

var bodyBuffer = MsgPackSerializer.Serialize(request, _msgPackContext);


var requestId = _requestIdCounter.GetRequestId();
var responseTask = _responseReader.GetResponseTask(requestId);

var headerBuffer = CreateAndSerializeHeader(request, requestId, bodyBuffer);
var stream = CreateAndSerializeHeader(request, requestId);
MsgPackSerializer.Serialize(request, stream, _msgPackContext);
var totalLength = stream.Position - Constants.PacketSizeBufferSize;
var packetLength = new PacketSize((uint)(totalLength));
AddPacketSize(stream, packetLength);

ArraySegment<byte> buffer;
if(!stream.TryGetBuffer(out buffer))
{
throw new InvalidOperationException("broken buffer");
}

//keep API for the sake of backward comp.
_requestWriter.Write(
headerBuffer,
new ArraySegment<byte>(bodyBuffer, 0, bodyBuffer.Length));
// merged header and body
buffer);

try
{
Expand All @@ -177,7 +187,7 @@ private async Task<MemoryStream> SendRequestImpl<TRequest>(TRequest request, Tim
}
catch (ArgumentException)
{
_logWriter?.WriteLine($"Response with requestId {requestId} failed, header:\n{headerBuffer.ToReadableString()} \n body: \n{bodyBuffer.ToReadableString()}");
_logWriter?.WriteLine($"Response with requestId {requestId} failed, content:\n{buffer.ToReadableString()} ");
throw;
}
catch (TimeoutException)
Expand All @@ -187,24 +197,23 @@ private async Task<MemoryStream> SendRequestImpl<TRequest>(TRequest request, Tim
}
}

private ArraySegment<byte> CreateAndSerializeHeader<TRequest>(
private MemoryStream CreateAndSerializeHeader<TRequest>(
TRequest request,
RequestId requestId,
byte[] serializedRequest) where TRequest : IRequest
RequestId requestId) where TRequest : IRequest
{
var packetSizeBuffer = new byte[Constants.PacketSizeBufferSize + Constants.MaxHeaderLength];
var stream = new MemoryStream(packetSizeBuffer);

var stream = new MemoryStream();

var requestHeader = new RequestHeader(request.Code, requestId);
stream.Seek(Constants.PacketSizeBufferSize, SeekOrigin.Begin);
MsgPackSerializer.Serialize(requestHeader, stream, _msgPackContext);

var lengthAndHeaderLengthByteCount = (int)stream.Position;
var headerLength = lengthAndHeaderLengthByteCount - Constants.PacketSizeBufferSize;
var packetLength = new PacketSize((uint) (headerLength + serializedRequest.Length));
return stream;
}

private void AddPacketSize(MemoryStream stream, PacketSize packetLength)
{
stream.Seek(0, SeekOrigin.Begin);
MsgPackSerializer.Serialize(packetLength, stream, _msgPackContext);
return new ArraySegment<byte>(packetSizeBuffer, 0, lengthAndHeaderLengthByteCount);
}
}
}
6 changes: 5 additions & 1 deletion src/progaudi.tarantool/Model/ClientOptions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using ProGaudi.MsgPack.Light;
using System;
using System.Net.Sockets;
using ProGaudi.MsgPack.Light;

namespace ProGaudi.Tarantool.Client.Model
{
Expand All @@ -24,6 +26,8 @@ private ClientOptions(ConnectionOptions options, ILog log, MsgPackContext contex
}
}

public Action<Socket> ConfigureSocket { get; set; }

public ILog LogWriter { get; }

public MsgPackContext MsgPackContext { get; }
Expand Down
15 changes: 14 additions & 1 deletion src/progaudi.tarantool/Model/ConnectionOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,21 @@ private void Parse(string replicationSource, ILog log)
}
}
}
public int WriteStreamBufferSize { get; set; } = 8192 * 2;

public int ReadStreamBufferSize { get; set; } = 4096;
/// <summary>
/// If number of pending requests more than the value, Throttle does not apply
/// </summary>
public int MinRequestsWithThrottle { get; set; } = 16;

/// <summary>
/// 0 - unlimited
/// </summary>
public int MaxRequestsInBatch { get; set; } = 0;

public int WriteThrottlePeriodInMs { get; set; } = 10;

public int ReadStreamBufferSize { get; set; } = 8192;

public int WriteNetworkTimeout { get; set; } = -1;

Expand Down
6 changes: 6 additions & 0 deletions src/progaudi.tarantool/NetworkStreamPhysicalConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ public async Task Connect(ClientOptions options)
{
NoDelay = true
};

if(options.ConfigureSocket != null)
{
options.ConfigureSocket(_socket);
}

await ConnectAsync(_socket, singleNode.Uri.Host, singleNode.Uri.Port).ConfigureAwait(false);;

_stream = new NetworkStream(_socket, true);
Expand Down
78 changes: 58 additions & 20 deletions src/progaudi.tarantool/RequestWriter.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using ProGaudi.Tarantool.Client.Model;

namespace ProGaudi.Tarantool.Client
Expand All @@ -9,25 +12,28 @@ internal class RequestWriter : IRequestWriter
{
private readonly ClientOptions _clientOptions;
private readonly IPhysicalConnection _physicalConnection;
private readonly Queue<Tuple<ArraySegment<byte>, ArraySegment<byte>>> _buffer;
private readonly Queue<ArraySegment<byte>> _buffer;
private readonly object _lock = new object();
private readonly Thread _thread;
private readonly ManualResetEventSlim _exitEvent;
private readonly ManualResetEventSlim _newRequestsAvailable;
private readonly ConnectionOptions _connectionOptions;
private bool _disposed;
private long _remaining;

public RequestWriter(ClientOptions clientOptions, IPhysicalConnection physicalConnection)
{
_clientOptions = clientOptions;
_physicalConnection = physicalConnection;
_buffer = new Queue<Tuple<ArraySegment<byte>, ArraySegment<byte>>>();
_buffer = new Queue<ArraySegment<byte>>();
_thread = new Thread(WriteFunction)
{
IsBackground = true,
Name = $"{clientOptions.Name} :: Write"
};
_exitEvent = new ManualResetEventSlim();
_newRequestsAvailable = new ManualResetEventSlim();
_connectionOptions = _clientOptions.ConnectionOptions;
}

public void BeginWriting()
Expand All @@ -43,18 +49,18 @@ public void BeginWriting()

public bool IsConnected => !_disposed;

public void Write(ArraySegment<byte> header, ArraySegment<byte> body)
public void Write(ArraySegment<byte> request)
{
if (_disposed)
{
throw new ObjectDisposedException(nameof(ResponseReader));
}

_clientOptions?.LogWriter?.WriteLine($"Enqueuing request: headers {header.Count} bytes, body {body.Count} bytes.");
_clientOptions?.LogWriter?.WriteLine($"Enqueuing request: {request.Count} bytes.");
bool shouldSignal;
lock (_lock)
{
_buffer.Enqueue(Tuple.Create(header, body));
_buffer.Enqueue(request);
shouldSignal = _buffer.Count == 1;
}

Expand All @@ -79,56 +85,88 @@ public void Dispose()
private void WriteFunction()
{
var handles = new[] { _exitEvent.WaitHandle, _newRequestsAvailable.WaitHandle };

var throttle = _connectionOptions.WriteThrottlePeriodInMs;
long remaining;
while (true)
{
switch (WaitHandle.WaitAny(handles))
{
case 0:
return;
case 1:
WriteRequests(200);
WriteRequests(_connectionOptions.WriteStreamBufferSize,
_connectionOptions.MaxRequestsInBatch);

remaining = Interlocked.Read(ref _remaining);

// Thread.Sleep will be called only if the number of pending bytes less than
// MinRequestsWithThrottle

if (throttle > 0 && remaining < _connectionOptions.MinRequestsWithThrottle)
Thread.Sleep(throttle);

break;
default:
throw new ArgumentOutOfRangeException();
}
}
}

private void WriteRequests(int limit)
private void WriteRequests(int bufferLength, int limit)
{
void WriteBuffer(ArraySegment<byte> buffer)
{
_physicalConnection.Write(buffer.Array, buffer.Offset, buffer.Count);
}

Tuple<ArraySegment<byte>, ArraySegment<byte>> GetRequest()
bool GetRequest(out ArraySegment<byte> result)
{
lock (_lock)
{
if (_buffer.Count > 0)
return _buffer.Dequeue();
{
_remaining = _buffer.Count + 1;
result = _buffer.Dequeue();
return true;
}
}

return null;
}
result = default(ArraySegment<byte>);
return false;
}

Tuple<ArraySegment<byte>, ArraySegment<byte>> request;
ArraySegment<byte> request;
var count = 0;
while ((request = GetRequest()) != null)
UInt64 length = 0;
var list = new List<ArraySegment<byte>>();
while (GetRequest(out request))
{
_clientOptions?.LogWriter?.WriteLine($"Writing request: headers {request.Item1.Count} bytes, body {request.Item2.Count} bytes.");

WriteBuffer(request.Item1);
WriteBuffer(request.Item2);
_clientOptions?.LogWriter?.WriteLine($"Writing request: {request.Count} bytes.");
length += (uint)request.Count;

_clientOptions?.LogWriter?.WriteLine($"Wrote request: headers {request.Item1.Count} bytes, body {request.Item2.Count} bytes.");
list.Add(request);
_clientOptions?.LogWriter?.WriteLine($"Wrote request: {request.Count} bytes.");

count++;
if (limit > 0 && count > limit)
if ((limit > 0 && count > limit) || length > (ulong)bufferLength)
{
break;
}

}

if (list.Count > 0)
{
// merge requests into one buffer
var result = new byte[length];
int position = 0;
foreach (var r in list)
{
Buffer.BlockCopy(r.Array, r.Offset, result, position, r.Count);
position += r.Count;
}

WriteBuffer(new ArraySegment<byte>(result));
}

lock (_lock)
Expand Down

0 comments on commit 89c91c6

Please sign in to comment.