Skip to content

Commit

Permalink
Fixed #256
Browse files Browse the repository at this point in the history
  • Loading branch information
sakno committed Jan 21, 2025
1 parent 54bb3c3 commit 57d758c
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 28 deletions.
2 changes: 1 addition & 1 deletion src/DotNext.IO/DotNext.IO.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<Authors>.NET Foundation and Contributors</Authors>
<Company />
<Product>.NEXT Family of Libraries</Product>
<VersionPrefix>5.18.0</VersionPrefix>
<VersionPrefix>5.18.1</VersionPrefix>
<VersionSuffix></VersionSuffix>
<AssemblyName>DotNext.IO</AssemblyName>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
Expand Down
94 changes: 67 additions & 27 deletions src/DotNext.IO/IO/PoolingBufferedStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public override int WriteTimeout
}

/// <inheritdoc/>
[DebuggerBrowsable(DebuggerBrowsableState.Never)]
public override long Length
{
get
Expand Down Expand Up @@ -156,6 +157,7 @@ private void EnsureReadBufferIsEmpty()
}

/// <inheritdoc/>
[DebuggerBrowsable(DebuggerBrowsableState.Never)]
Memory<byte> IBufferedWriter.Buffer
{
get
Expand Down Expand Up @@ -302,23 +304,29 @@ private void WriteCore(ReadOnlySpan<byte> data)

if (writePosition is 0)
ClearReadBufferBeforeWrite();

var freeBuf = EnsureBufferAllocated().Span.Slice(writePosition);

// drain buffered data if needed
if (freeBuf.Length < data.Length)
WriteCore();

// if internal buffer has not enough space then just write through
if (data.Length > freeBuf.Length)
if (data.Length <= freeBuf.Length)
{
stream.Write(data);
Reset();
data.CopyTo(freeBuf);
writePosition += data.Length;
}
else
else if (data.Length < maxBufferSize)
{
data.CopyTo(freeBuf, out var bytesWritten);
stream.Write(freeBuf = buffer.Span);
data = data.Slice(bytesWritten);
data.CopyTo(freeBuf);
writePosition += data.Length;
writePosition = data.Length;

Debug.Assert(writePosition > 0);
}
else
{
WriteCore();
stream.Write(data);
Reset();
}
}

Expand Down Expand Up @@ -360,35 +368,64 @@ public override ValueTask WriteAsync(ReadOnlyMemory<byte> data, CancellationToke
return task;
}

private async ValueTask WriteCoreAsync(ReadOnlyMemory<byte> data, CancellationToken token)
private ValueTask WriteCoreAsync(ReadOnlyMemory<byte> data, CancellationToken token)
{
Debug.Assert(stream is not null);

if (writePosition is 0)
ClearReadBufferBeforeWrite();

var freeBuf = EnsureBufferAllocated().Memory.Slice(writePosition);
// drain buffered data if needed
if (freeBuf.Length < data.Length)
var freeCapacity = maxBufferSize - writePosition;

ValueTask task;
if (data.Length <= freeCapacity)
{
await WriteCoreAsync(out _, token).ConfigureAwait(false);
freeBuf = buffer.Memory.Slice(writePosition);
data.CopyTo(EnsureBufferAllocated().Memory.Slice(writePosition));
writePosition += data.Length;
task = ValueTask.CompletedTask;
}

// if internal buffer has not enough space then just write through
if (data.Length > freeBuf.Length)
else if (data.Length < maxBufferSize)
{
await stream.WriteAsync(data, token).ConfigureAwait(false);
Reset();
task = CopyAndWriteAsync(data, token);
}
else if (writePosition is 0)
{
task = stream.WriteAsync(data, token);
}
else
{
data.CopyTo(freeBuf);
writePosition += data.Length;
task = WriteWithBufferAsync(data, token);
}

return task;
}


private async ValueTask CopyAndWriteAsync(ReadOnlyMemory<byte> data, CancellationToken token)
{
Debug.Assert(stream is not null);
Debug.Assert(data.Length < maxBufferSize);

var writeBuffer = buffer.Memory;
data.Span.CopyTo(writeBuffer.Span.Slice(writePosition), out var bytesWritten);
await stream.WriteAsync(writeBuffer, token).ConfigureAwait(false);
data = data.Slice(bytesWritten);
data.CopyTo(writeBuffer);
writePosition = data.Length;

Debug.Assert(writePosition > 0);
}

private async ValueTask WriteWithBufferAsync(ReadOnlyMemory<byte> data, CancellationToken token)
{
Debug.Assert(stream is not null);
Debug.Assert(data.Length >= maxBufferSize);
Debug.Assert(writePosition > 0);

await stream.WriteAsync(WrittenMemory, token).ConfigureAwait(false);
await stream.WriteAsync(data, token).ConfigureAwait(false);
Reset();
}

/// <inheritdoc/>
public override Task WriteAsync(byte[] data, int offset, int count, CancellationToken token)
=> WriteAsync(new ReadOnlyMemory<byte>(data, offset, count), token).AsTask();
Expand Down Expand Up @@ -454,12 +491,13 @@ private int ReadCore(Span<byte> data)
{
// nothing to do
}
else if (data.Length > MaxBufferSize)
else if (data.Length > maxBufferSize)
{
bytesRead += stream.Read(data);
}
else
{
readPosition = 0;
readLength = stream.Read(EnsureBufferAllocated().Span);
bytesRead += ReadFromBuffer(data);
}
Expand Down Expand Up @@ -536,6 +574,7 @@ private async ValueTask<int> ReadCoreAsync(Memory<byte> data, CancellationToken
else
{
Debug.Assert(readPosition == readLength);
readPosition = 0;
readLength = await stream.ReadAsync(EnsureBufferAllocated().Memory, token).ConfigureAwait(false);
bytesRead += ReadFromBuffer(data.Span);
}
Expand Down Expand Up @@ -740,6 +779,7 @@ private void EnsureWriteBufferIsEmpty()
}

/// <inheritdoc/>
[DebuggerBrowsable(DebuggerBrowsableState.Never)]
ReadOnlyMemory<byte> IBufferedReader.Buffer
{
get
Expand Down
47 changes: 47 additions & 0 deletions src/DotNext.Tests/IO/PoolingBufferedStreamTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -389,4 +389,51 @@ public static void OverwriteStream()
buffered.Write("text"u8);
Equal(4L, buffered.Position);
}

[Fact]
public static async Task RegressionIssue256()
{
const int dataSize = 128 + 3105 + 66 + 3111 + 66 + 3105 + 66 + 2513 + 128;
ReadOnlyMemory<byte> expected = RandomBytes(dataSize);
await using var ms = new MemoryStream();

await using (var buffered = new PoolingBufferedStream(ms, leaveOpen: true) { MaxBufferSize = 8192 })
{
await buffered.WriteAsync(expected);
await buffered.FlushAsync();
}

ms.Position = 0;
await using (var reader = new PoolingBufferedStream(ms, leaveOpen: true) { MaxBufferSize = 4096 })
{
Memory<byte> buffer = new byte[dataSize];
await reader.ReadExactlyAsync(buffer.Slice(0, 3175));
reader.Position = 3303;
await reader.ReadExactlyAsync(buffer.Slice(0, 3107));
Equal(expected.Slice(3303, 3107), buffer.Slice(0, 3107));
}
}

[Fact]
public static void RepeatableReads()
{
var bytes = RandomBytes(128);
using var reader = new PoolingBufferedStream(new MemoryStream(bytes)) { MaxBufferSize = 256 };
True(reader.Read());
False(reader.Read());
True(reader.HasBufferedDataToRead);

Equal(bytes, reader.As<IBufferedReader>().Buffer);
}

[Fact]
public static void ReadEmpty()
{
var bytes = RandomBytes(128);
using var reader = new PoolingBufferedStream(new MemoryStream(bytes)) { MaxBufferSize = 256 };
True(reader.Read());
reader.ReadExactly(new byte[bytes.Length]);
False(reader.Read());
False(reader.HasBufferedDataToRead);
}
}

0 comments on commit 57d758c

Please sign in to comment.