Skip to content

Commit

Permalink
Patch 5.18.1 for DotNext.IO
Browse files Browse the repository at this point in the history
  • Loading branch information
sakno committed Jan 21, 2025
1 parent 04b91db commit 4d875d1
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 45 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ Release Notes
* Synchronous `TryAcquire` implemented by `AsyncExclusiveLock` and `AsyncReaderWriterLock` are now implemented in portable way. Previously, WASM target was not supported. Additionally, the method supports lock stealing
* * Improved synchronous support for `RandomAccessCache` class

<a href="https://www.nuget.org/packages/dotnext.io/5.18.0">DotNext.IO 5.18.0</a>
<a href="https://www.nuget.org/packages/dotnext.io/5.18.1">DotNext.IO 5.18.1</a>
* Fixed issue of `PoolingBufferedStream` class when the stream has buffered bytes in the write buffer and `Position` is set to backward
* Fixed [256](https://github.com/dotnet/dotNext/issues/256)

<a href="https://www.nuget.org/packages/dotnext.net.cluster/5.18.0">DotNext.Net.Cluster 5.18.0</a>
* Updated dependencies
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ Release Date: 01-20-2025
* Synchronous `TryAcquire` implemented by `AsyncExclusiveLock` and `AsyncReaderWriterLock` are now implemented in portable way. Previously, WASM target was not supported. Additionally, the method supports lock stealing
* Improved synchronous support for `RandomAccessCache` class

<a href="https://www.nuget.org/packages/dotnext.io/5.18.0">DotNext.IO 5.18.0</a>
<a href="https://www.nuget.org/packages/dotnext.io/5.18.1">DotNext.IO 5.18.1</a>
* Fixed issue of `PoolingBufferedStream` class when the stream has buffered bytes in the write buffer and `Position` is set to backward
* Fixed [256](https://github.com/dotnet/dotNext/issues/256)

<a href="https://www.nuget.org/packages/dotnext.net.cluster/5.18.0">DotNext.Net.Cluster 5.18.0</a>
* Updated dependencies
Expand Down
2 changes: 1 addition & 1 deletion src/DotNext.IO/DotNext.IO.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<Authors>.NET Foundation and Contributors</Authors>
<Company />
<Product>.NEXT Family of Libraries</Product>
<VersionPrefix>5.18.0</VersionPrefix>
<VersionPrefix>5.18.1</VersionPrefix>
<VersionSuffix></VersionSuffix>
<AssemblyName>DotNext.IO</AssemblyName>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
Expand Down
8 changes: 3 additions & 5 deletions src/DotNext.IO/IO/FileWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -239,19 +239,17 @@ public void Write()

private void WriteSlow(ReadOnlySpan<byte> input)
{
RandomAccess.Write(handle, WrittenBuffer.Span, fileOffset);
fileOffset += bufferOffset;

if (input.Length >= maxBufferSize)
{
RandomAccess.Write(handle, WrittenBuffer.Span, fileOffset);
fileOffset += bufferOffset;

RandomAccess.Write(handle, input, fileOffset);
fileOffset += input.Length;
Reset();
}
else
{
RandomAccess.Write(handle, WrittenBuffer.Span, fileOffset);
fileOffset += bufferOffset;
input.CopyTo(EnsureBufferAllocated().Span);
bufferOffset += input.Length;
}
Expand Down
115 changes: 78 additions & 37 deletions src/DotNext.IO/IO/PoolingBufferedStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public override int WriteTimeout
}

/// <inheritdoc/>
[DebuggerBrowsable(DebuggerBrowsableState.Never)]
public override long Length
{
get
Expand Down Expand Up @@ -156,6 +157,7 @@ private void EnsureReadBufferIsEmpty()
}

/// <inheritdoc/>
[DebuggerBrowsable(DebuggerBrowsableState.Never)]
Memory<byte> IBufferedWriter.Buffer
{
get
Expand Down Expand Up @@ -302,23 +304,29 @@ private void WriteCore(ReadOnlySpan<byte> data)

if (writePosition is 0)
ClearReadBufferBeforeWrite();

var freeBuf = EnsureBufferAllocated().Span.Slice(writePosition);

// drain buffered data if needed
if (freeBuf.Length < data.Length)
WriteCore();

// if internal buffer has not enough space then just write through
if (data.Length > freeBuf.Length)
if (data.Length <= freeBuf.Length)
{
stream.Write(data);
Reset();
data.CopyTo(freeBuf);
writePosition += data.Length;
}
else
else if (data.Length < maxBufferSize)
{
data.CopyTo(freeBuf, out var bytesWritten);
stream.Write(freeBuf = buffer.Span);
data = data.Slice(bytesWritten);
data.CopyTo(freeBuf);
writePosition += data.Length;
writePosition = data.Length;

Debug.Assert(writePosition > 0);
}
else
{
WriteCore();
stream.Write(data);
Reset();
}
}

Expand Down Expand Up @@ -360,35 +368,64 @@ public override ValueTask WriteAsync(ReadOnlyMemory<byte> data, CancellationToke
return task;
}

private async ValueTask WriteCoreAsync(ReadOnlyMemory<byte> data, CancellationToken token)
private ValueTask WriteCoreAsync(ReadOnlyMemory<byte> data, CancellationToken token)
{
Debug.Assert(stream is not null);

if (writePosition is 0)
ClearReadBufferBeforeWrite();

var freeBuf = EnsureBufferAllocated().Memory.Slice(writePosition);
// drain buffered data if needed
if (freeBuf.Length < data.Length)
var freeCapacity = maxBufferSize - writePosition;

ValueTask task;
if (data.Length <= freeCapacity)
{
await WriteCoreAsync(out _, token).ConfigureAwait(false);
freeBuf = buffer.Memory.Slice(writePosition);
data.CopyTo(EnsureBufferAllocated().Memory.Slice(writePosition));
writePosition += data.Length;
task = ValueTask.CompletedTask;
}

// if internal buffer has not enough space then just write through
if (data.Length > freeBuf.Length)
else if (data.Length < maxBufferSize)
{
await stream.WriteAsync(data, token).ConfigureAwait(false);
Reset();
task = CopyAndWriteAsync(data, token);
}
else if (writePosition is 0)
{
task = stream.WriteAsync(data, token);
}
else
{
data.CopyTo(freeBuf);
writePosition += data.Length;
task = WriteWithBufferAsync(data, token);
}

return task;
}


private async ValueTask CopyAndWriteAsync(ReadOnlyMemory<byte> data, CancellationToken token)
{
Debug.Assert(stream is not null);
Debug.Assert(data.Length < maxBufferSize);

var writeBuffer = buffer.Memory;
data.Span.CopyTo(writeBuffer.Span.Slice(writePosition), out var bytesWritten);
await stream.WriteAsync(writeBuffer, token).ConfigureAwait(false);
data = data.Slice(bytesWritten);
data.CopyTo(writeBuffer);
writePosition = data.Length;

Debug.Assert(writePosition > 0);
}

private async ValueTask WriteWithBufferAsync(ReadOnlyMemory<byte> data, CancellationToken token)
{
Debug.Assert(stream is not null);
Debug.Assert(data.Length >= maxBufferSize);
Debug.Assert(writePosition > 0);

await stream.WriteAsync(WrittenMemory, token).ConfigureAwait(false);
await stream.WriteAsync(data, token).ConfigureAwait(false);
Reset();
}

/// <inheritdoc/>
public override Task WriteAsync(byte[] data, int offset, int count, CancellationToken token)
=> WriteAsync(new ReadOnlyMemory<byte>(data, offset, count), token).AsTask();
Expand Down Expand Up @@ -452,20 +489,24 @@ private int ReadCore(Span<byte> data)

if (data.IsEmpty)
{
// nothing to do
if (readPosition == readLength)
Reset();
}
else if (data.Length > MaxBufferSize)
else if (data.Length >= maxBufferSize)
{
Debug.Assert(readPosition == readLength);

bytesRead += stream.Read(data);
Reset();
}
else
{
Debug.Assert(readPosition == readLength);

readPosition = 0;
readLength = stream.Read(EnsureBufferAllocated().Span);
bytesRead += ReadFromBuffer(data);
}

if (readPosition == readLength)
Reset();

return bytesRead;
}
Expand Down Expand Up @@ -526,22 +567,21 @@ private async ValueTask<int> ReadCoreAsync(Memory<byte> data, CancellationToken

if (data.IsEmpty)
{
// nothing to do
if (readPosition == readLength)
Reset();
}
else if (data.Length > MaxBufferSize)
else if (data.Length >= maxBufferSize)
{
Debug.Assert(readPosition == readLength);
bytesRead += await stream.ReadAsync(data, token).ConfigureAwait(false);
Reset();
}
else
{
Debug.Assert(readPosition == readLength);
readPosition = 0;
readLength = await stream.ReadAsync(EnsureBufferAllocated().Memory, token).ConfigureAwait(false);
bytesRead += ReadFromBuffer(data.Span);
}

if (readPosition == readLength)
Reset();

return bytesRead;
}
Expand Down Expand Up @@ -740,6 +780,7 @@ private void EnsureWriteBufferIsEmpty()
}

/// <inheritdoc/>
[DebuggerBrowsable(DebuggerBrowsableState.Never)]
ReadOnlyMemory<byte> IBufferedReader.Buffer
{
get
Expand Down
71 changes: 71 additions & 0 deletions src/DotNext.Tests/IO/PoolingBufferedStreamTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -389,4 +389,75 @@ public static void OverwriteStream()
buffered.Write("text"u8);
Equal(4L, buffered.Position);
}

[Fact]
public static async Task RegressionIssue256Async()
{
const int dataSize = 128 + 3105 + 66 + 3111 + 66 + 3105 + 66 + 2513 + 128;
ReadOnlyMemory<byte> expected = RandomBytes(dataSize);
await using var ms = new MemoryStream();

await using (var buffered = new PoolingBufferedStream(ms, leaveOpen: true) { MaxBufferSize = 8192 })
{
await buffered.WriteAsync(expected);
await buffered.FlushAsync();
}

ms.Position = 0;
await using (var reader = new PoolingBufferedStream(ms, leaveOpen: true) { MaxBufferSize = 4096 })
{
Memory<byte> buffer = new byte[dataSize];
await reader.ReadExactlyAsync(buffer.Slice(0, 3175));
reader.Position = 3303;
await reader.ReadExactlyAsync(buffer.Slice(0, 3107));
Equal(expected.Slice(3303, 3107), buffer.Slice(0, 3107));
}
}

[Fact]
public static void RegressionIssue256()
{
const int dataSize = 128 + 3105 + 66 + 3111 + 66 + 3105 + 66 + 2513 + 128;
ReadOnlySpan<byte> expected = RandomBytes(dataSize);
using var ms = new MemoryStream();

using (var buffered = new PoolingBufferedStream(ms, leaveOpen: true) { MaxBufferSize = 8192 })
{
buffered.Write(expected);
buffered.Flush();
}

ms.Position = 0;
using (var reader = new PoolingBufferedStream(ms, leaveOpen: true) { MaxBufferSize = 4096 })
{
Span<byte> buffer = new byte[dataSize];
reader.ReadExactly(buffer.Slice(0, 3175));
reader.Position = 3303;
reader.ReadExactly(buffer.Slice(0, 3107));
Equal(expected.Slice(3303, 3107), buffer.Slice(0, 3107));
}
}

[Fact]
public static void RepeatableReads()
{
var bytes = RandomBytes(128);
using var reader = new PoolingBufferedStream(new MemoryStream(bytes)) { MaxBufferSize = 256 };
True(reader.Read());
False(reader.Read());
True(reader.HasBufferedDataToRead);

Equal(bytes, reader.As<IBufferedReader>().Buffer);
}

[Fact]
public static void ReadEmpty()
{
var bytes = RandomBytes(128);
using var reader = new PoolingBufferedStream(new MemoryStream(bytes)) { MaxBufferSize = 256 };
True(reader.Read());
reader.ReadExactly(new byte[bytes.Length]);
False(reader.Read());
False(reader.HasBufferedDataToRead);
}
}

0 comments on commit 4d875d1

Please sign in to comment.