diff --git a/src/FirebirdSql.Data.FirebirdClient.Tests/BlobStreamTests.cs b/src/FirebirdSql.Data.FirebirdClient.Tests/BlobStreamTests.cs new file mode 100644 index 000000000..6631ff77a --- /dev/null +++ b/src/FirebirdSql.Data.FirebirdClient.Tests/BlobStreamTests.cs @@ -0,0 +1,86 @@ +using System.IO; +using System.Security.Cryptography; +using System.Threading.Tasks; +using FirebirdSql.Data.TestsBase; +using NUnit.Framework; + +namespace FirebirdSql.Data.FirebirdClient.Tests; + +[TestFixtureSource(typeof(FbServerTypeTestFixtureSource), nameof(FbServerTypeTestFixtureSource.Default))] +[TestFixtureSource(typeof(FbServerTypeTestFixtureSource), nameof(FbServerTypeTestFixtureSource.Embedded))] +public class BlobStreamTests : FbTestsBase +{ + public BlobStreamTests(FbServerType serverType, bool compression, FbWireCrypt wireCrypt) + : base(serverType, compression, wireCrypt) + { } + + [Test] + public async Task FbBlobStreamReadTest() + { + var id_value = RandomNumberGenerator.GetInt32(int.MinValue, int.MaxValue); + var insert_values = RandomNumberGenerator.GetBytes(100000 * 4); + + await using (var transaction = await Connection.BeginTransactionAsync()) + { + await using (var insert = new FbCommand("INSERT INTO TEST (int_field, blob_field) values(@int_field, @blob_field)", Connection, transaction)) + { + insert.Parameters.Add("@int_field", FbDbType.Integer).Value = id_value; + insert.Parameters.Add("@blob_field", FbDbType.Binary).Value = insert_values; + await insert.ExecuteNonQueryAsync(); + } + await transaction.CommitAsync(); + } + + await using (var select = new FbCommand($"SELECT blob_field FROM TEST WHERE int_field = {id_value}", Connection)) + { + await using var reader = await select.ExecuteReaderAsync(); + while (await reader.ReadAsync()) + { + await using var output = new MemoryStream(); + await using (var stream = reader.GetStream(0)) + { + await stream.CopyToAsync(output); + } + + var select_values = output.ToArray(); + CollectionAssert.AreEqual(insert_values, select_values); + } + } + } + + [Test] + public async Task FbBlobStreamWriteTest() + { + var id_value = RandomNumberGenerator.GetInt32(int.MinValue, int.MaxValue); + var insert_values = RandomNumberGenerator.GetBytes(100000 * 4); + + await using (var transaction = await Connection.BeginTransactionAsync()) + { + await using (var insert = new FbCommand("INSERT INTO TEST (int_field, blob_field) values(@int_field, @blob_field)", Connection, transaction)) + { + insert.Parameters.Add("@int_field", FbDbType.Integer).Value = id_value; + insert.Parameters.Add("@blob_field", FbDbType.Binary).Value = insert_values; + await insert.ExecuteNonQueryAsync(); + } + + await using (var select = new FbCommand($"SELECT blob_field FROM TEST WHERE int_field = {id_value}", Connection, transaction)) + { + await using var reader = await select.ExecuteReaderAsync(); + while (await reader.ReadAsync()) + { + await using var stream = reader.GetStream(0); + await stream.WriteAsync(insert_values); + + break; + } + } + await transaction.CommitAsync(); + } + + await using (var select = new FbCommand($"SELECT blob_field FROM TEST WHERE int_field = {id_value}", Connection)) + { + var select_values = (byte[])await select.ExecuteScalarAsync(); + CollectionAssert.AreEqual(insert_values, select_values); + } + } +} \ No newline at end of file diff --git a/src/FirebirdSql.Data.FirebirdClient.Tests/FbBlobTests.cs b/src/FirebirdSql.Data.FirebirdClient.Tests/FbBlobTests.cs index 61537d06b..d81d3a3ed 100644 --- a/src/FirebirdSql.Data.FirebirdClient.Tests/FbBlobTests.cs +++ b/src/FirebirdSql.Data.FirebirdClient.Tests/FbBlobTests.cs @@ -44,6 +44,7 @@ public async Task BinaryBlobTest() insert.Parameters.Add("@blob_field", FbDbType.Binary).Value = insert_values; await insert.ExecuteNonQueryAsync(); } + await transaction.CommitAsync(); } diff --git a/src/FirebirdSql.Data.FirebirdClient/Client/Managed/Version10/GdsBlob.cs b/src/FirebirdSql.Data.FirebirdClient/Client/Managed/Version10/GdsBlob.cs index 0b5fc37eb..8436eeced 100644 --- a/src/FirebirdSql.Data.FirebirdClient/Client/Managed/Version10/GdsBlob.cs +++ b/src/FirebirdSql.Data.FirebirdClient/Client/Managed/Version10/GdsBlob.cs @@ -69,7 +69,7 @@ public GdsBlob(GdsDatabase database, GdsTransaction transaction, long blobId) #region Protected Methods - protected override void Create() + public override void Create() { try { @@ -81,7 +81,7 @@ protected override void Create() throw; } } - protected override async ValueTask CreateAsync(CancellationToken cancellationToken = default) + public override async ValueTask CreateAsync(CancellationToken cancellationToken = default) { try { @@ -94,7 +94,7 @@ protected override async ValueTask CreateAsync(CancellationToken cancellationTok } } - protected override void Open() + public override void Open() { try { @@ -105,7 +105,7 @@ protected override void Open() throw; } } - protected override async ValueTask OpenAsync(CancellationToken cancellationToken = default) + public override async ValueTask OpenAsync(CancellationToken cancellationToken = default) { try { @@ -117,7 +117,87 @@ protected override async ValueTask OpenAsync(CancellationToken cancellationToken } } - protected override void GetSegment(Stream stream) + public override int GetLength() + { + try + { + if (!IsOpen) + Open(); + + var bufferLength = 20; + var buffer = new byte[bufferLength]; + + _database.Xdr.Write(IscCodes.op_info_blob); + _database.Xdr.Write(_blobHandle); + _database.Xdr.Write(0); + _database.Xdr.WriteBuffer(new byte[] { IscCodes.isc_info_blob_total_length }, 1); + _database.Xdr.Write(bufferLength); + + _database.Xdr.Flush(); + + var response = (GenericResponse)_database.ReadResponse(); + + var responseLength = bufferLength; + + if (response.Data.Length < bufferLength) + { + responseLength = response.Data.Length; + } + + Buffer.BlockCopy(response.Data, 0, buffer, 0, responseLength); + + var length = IscHelper.VaxInteger(buffer, 1, 2); + var size = IscHelper.VaxInteger(buffer, 3, (int)length); + + return (int)size; + } + catch (IOException ex) + { + throw IscException.ForIOException(ex); + } + } + + public override async ValueTask GetLengthAsync(CancellationToken cancellationToken = default) + { + try + { + if (!IsOpen) + await OpenAsync(cancellationToken).ConfigureAwait(false); + + var bufferLength = 20; + var buffer = new byte[bufferLength]; + + await _database.Xdr.WriteAsync(IscCodes.op_info_blob, cancellationToken).ConfigureAwait(false); + await _database.Xdr.WriteAsync(_blobHandle, cancellationToken).ConfigureAwait(false); + await _database.Xdr.WriteAsync(0, cancellationToken).ConfigureAwait(false); + await _database.Xdr.WriteBufferAsync(new byte[] { IscCodes.isc_info_blob_total_length }, 1, cancellationToken).ConfigureAwait(false); + await _database.Xdr.WriteAsync(bufferLength, cancellationToken).ConfigureAwait(false); + + await _database.Xdr.FlushAsync(cancellationToken).ConfigureAwait(false); + + var response = (GenericResponse)await _database.ReadResponseAsync(cancellationToken).ConfigureAwait(false); + + var responseLength = bufferLength; + + if (response.Data.Length < bufferLength) + { + responseLength = response.Data.Length; + } + + Buffer.BlockCopy(response.Data, 0, buffer, 0, responseLength); + + var length = IscHelper.VaxInteger(buffer, 1, 2); + var size = IscHelper.VaxInteger(buffer, 3, (int)length); + + return (int)size; + } + catch (IOException ex) + { + throw IscException.ForIOException(ex); + } + } + + public override void GetSegment(Stream stream) { var requested = SegmentSize; @@ -166,7 +246,7 @@ protected override void GetSegment(Stream stream) throw IscException.ForIOException(ex); } } - protected override async ValueTask GetSegmentAsync(Stream stream, CancellationToken cancellationToken = default) + public override async ValueTask GetSegmentAsync(Stream stream, CancellationToken cancellationToken = default) { var requested = SegmentSize; @@ -194,7 +274,7 @@ protected override async ValueTask GetSegmentAsync(Stream stream, CancellationTo if (buffer.Length == 0) { - // previous segment was last, this has no data + //previous segment was last, this has no data return; } @@ -216,7 +296,120 @@ protected override async ValueTask GetSegmentAsync(Stream stream, CancellationTo } } - protected override void PutSegment(byte[] buffer) + public override byte[] GetSegment() + { + var requested = SegmentSize; + + try + { + _database.Xdr.Write(IscCodes.op_get_segment); + _database.Xdr.Write(_blobHandle); + _database.Xdr.Write(requested < short.MaxValue - 12 ? requested : short.MaxValue - 12); + _database.Xdr.Write(DataSegment); + _database.Xdr.Flush(); + + var response = (GenericResponse)_database.ReadResponse(); + + RblRemoveValue(IscCodes.RBL_segment); + if (response.ObjectHandle == 1) + { + RblAddValue(IscCodes.RBL_segment); + } + else if (response.ObjectHandle == 2) + { + RblAddValue(IscCodes.RBL_eof_pending); + } + + var buffer = response.Data; + + if (buffer.Length == 0) + { + //previous segment was last, this has no data + return Array.Empty(); + } + + var posInInput = 0; + var posInOutput = 0; + + var tmp = new byte[requested * 2]; + while (posInInput < buffer.Length) + { + var len = (int)IscHelper.VaxInteger(buffer, posInInput, 2); + posInInput += 2; + + Array.Copy(buffer, posInInput, tmp, posInOutput, len); + posInOutput += len; + posInInput += len; + } + + var actualBuffer = new byte[posInOutput]; + Array.Copy(tmp, actualBuffer, posInOutput); + + return actualBuffer; + } + catch (IOException ex) + { + throw IscException.ForIOException(ex); + } + } + public override async ValueTask GetSegmentAsync(CancellationToken cancellationToken = default) + { + var requested = SegmentSize; + + try + { + await _database.Xdr.WriteAsync(IscCodes.op_get_segment, cancellationToken).ConfigureAwait(false); + await _database.Xdr.WriteAsync(_blobHandle, cancellationToken).ConfigureAwait(false); + await _database.Xdr.WriteAsync(requested < short.MaxValue - 12 ? requested : short.MaxValue - 12, cancellationToken).ConfigureAwait(false); + await _database.Xdr.WriteAsync(DataSegment, cancellationToken).ConfigureAwait(false); + await _database.Xdr.FlushAsync(cancellationToken).ConfigureAwait(false); + + var response = (GenericResponse)await _database.ReadResponseAsync(cancellationToken).ConfigureAwait(false); + + RblRemoveValue(IscCodes.RBL_segment); + if (response.ObjectHandle == 1) + { + RblAddValue(IscCodes.RBL_segment); + } + else if (response.ObjectHandle == 2) + { + RblAddValue(IscCodes.RBL_eof_pending); + } + + var buffer = response.Data; + + if (buffer.Length == 0) + { + // previous segment was last, this has no data + return Array.Empty(); + } + + var posInInput = 0; + var posInOutput = 0; + + var tmp = new byte[requested * 2]; + while (posInInput < buffer.Length) + { + var len = (int)IscHelper.VaxInteger(buffer, posInInput, 2); + posInInput += 2; + + Array.Copy(buffer, posInInput, tmp, posInOutput, len); + posInOutput += len; + posInInput += len; + } + + var actualBuffer = new byte[posInOutput]; + Array.Copy(tmp, actualBuffer, posInOutput); + + return actualBuffer; + } + catch (IOException ex) + { + throw IscException.ForIOException(ex); + } + } + + public override void PutSegment(byte[] buffer) { try { @@ -232,7 +425,7 @@ protected override void PutSegment(byte[] buffer) throw IscException.ForIOException(ex); } } - protected override async ValueTask PutSegmentAsync(byte[] buffer, CancellationToken cancellationToken = default) + public override async ValueTask PutSegmentAsync(byte[] buffer, CancellationToken cancellationToken = default) { try { @@ -249,14 +442,14 @@ protected override async ValueTask PutSegmentAsync(byte[] buffer, CancellationTo } } - protected override void Seek(int position) + public override void Seek(int offset, int seekMode) { try { _database.Xdr.Write(IscCodes.op_seek_blob); _database.Xdr.Write(_blobHandle); - _database.Xdr.Write(SeekMode); - _database.Xdr.Write(position); + _database.Xdr.Write(seekMode); + _database.Xdr.Write(offset); _database.Xdr.Flush(); var response = (GenericResponse)_database.ReadResponse(); @@ -268,14 +461,14 @@ protected override void Seek(int position) throw IscException.ForIOException(ex); } } - protected override async ValueTask SeekAsync(int position, CancellationToken cancellationToken = default) + public override async ValueTask SeekAsync(int offset, int seekMode, CancellationToken cancellationToken = default) { try { await _database.Xdr.WriteAsync(IscCodes.op_seek_blob, cancellationToken).ConfigureAwait(false); await _database.Xdr.WriteAsync(_blobHandle, cancellationToken).ConfigureAwait(false); - await _database.Xdr.WriteAsync(SeekMode, cancellationToken).ConfigureAwait(false); - await _database.Xdr.WriteAsync(position, cancellationToken).ConfigureAwait(false); + await _database.Xdr.WriteAsync(seekMode, cancellationToken).ConfigureAwait(false); + await _database.Xdr.WriteAsync(offset, cancellationToken).ConfigureAwait(false); await _database.Xdr.FlushAsync(cancellationToken).ConfigureAwait(false); var response = (GenericResponse)await _database.ReadResponseAsync(cancellationToken).ConfigureAwait(false); @@ -288,20 +481,20 @@ protected override async ValueTask SeekAsync(int position, CancellationToken can } } - protected override void Close() + public override void Close() { _database.ReleaseObject(IscCodes.op_close_blob, _blobHandle); } - protected override ValueTask CloseAsync(CancellationToken cancellationToken = default) + public override ValueTask CloseAsync(CancellationToken cancellationToken = default) { return _database.ReleaseObjectAsync(IscCodes.op_close_blob, _blobHandle, cancellationToken); } - protected override void Cancel() + public override void Cancel() { _database.ReleaseObject(IscCodes.op_cancel_blob, _blobHandle); } - protected override ValueTask CancelAsync(CancellationToken cancellationToken = default) + public override ValueTask CancelAsync(CancellationToken cancellationToken = default) { return _database.ReleaseObjectAsync(IscCodes.op_cancel_blob, _blobHandle, cancellationToken); } @@ -327,6 +520,7 @@ private void CreateOrOpen(int op, BlobParameterBuffer bpb) _blobId = response.BlobId; _blobHandle = response.ObjectHandle; + _isOpen = true; } catch (IOException ex) { @@ -350,6 +544,7 @@ private async ValueTask CreateOrOpenAsync(int op, BlobParameterBuffer bpb, Cance _blobId = response.BlobId; _blobHandle = response.ObjectHandle; + _isOpen = true; } catch (IOException ex) { diff --git a/src/FirebirdSql.Data.FirebirdClient/Client/Native/FesBlob.cs b/src/FirebirdSql.Data.FirebirdClient/Client/Native/FesBlob.cs index 6ca97a51f..b363b5b3e 100644 --- a/src/FirebirdSql.Data.FirebirdClient/Client/Native/FesBlob.cs +++ b/src/FirebirdSql.Data.FirebirdClient/Client/Native/FesBlob.cs @@ -70,7 +70,7 @@ public FesBlob(FesDatabase database, FesTransaction transaction, long blobId) #region Protected Methods - protected override void Create() + public override void Create() { ClearStatusVector(); @@ -88,9 +88,11 @@ protected override void Create() _database.ProcessStatusVector(_statusVector); + _isOpen = true; + RblAddValue(IscCodes.RBL_create); } - protected override ValueTask CreateAsync(CancellationToken cancellationToken = default) + public override ValueTask CreateAsync(CancellationToken cancellationToken = default) { ClearStatusVector(); @@ -108,12 +110,14 @@ protected override ValueTask CreateAsync(CancellationToken cancellationToken = d _database.ProcessStatusVector(_statusVector); + _isOpen = true; + RblAddValue(IscCodes.RBL_create); return ValueTask2.CompletedTask; } - protected override void Open() + public override void Open() { ClearStatusVector(); @@ -130,8 +134,10 @@ protected override void Open() new byte[0]); _database.ProcessStatusVector(_statusVector); + + _isOpen = true; } - protected override ValueTask OpenAsync(CancellationToken cancellationToken = default) + public override ValueTask OpenAsync(CancellationToken cancellationToken = default) { ClearStatusVector(); @@ -149,10 +155,56 @@ protected override ValueTask OpenAsync(CancellationToken cancellationToken = def _database.ProcessStatusVector(_statusVector); + _isOpen = true; + return ValueTask2.CompletedTask; } - protected override void GetSegment(Stream stream) + public override int GetLength() + { + ClearStatusVector(); + + var buffer = new byte[20]; + + _database.FbClient.isc_blob_info( + _statusVector, + ref _blobHandle, + 1, + new byte[] { IscCodes.isc_info_blob_total_length }, + (short)buffer.Length, + buffer); + + _database.ProcessStatusVector(_statusVector); + + var length = IscHelper.VaxInteger(buffer, 1, 2); + var size = IscHelper.VaxInteger(buffer, 3, (int)length); + + return (int)size; + } + + public override ValueTask GetLengthAsync(CancellationToken cancellationToken = default) + { + ClearStatusVector(); + + var buffer = new byte[20]; + + _database.FbClient.isc_blob_info( + _statusVector, + ref _blobHandle, + 1, + new byte[] { IscCodes.isc_info_blob_total_length }, + (short)buffer.Length, + buffer); + + _database.ProcessStatusVector(_statusVector); + + var length = IscHelper.VaxInteger(buffer, 1, 2); + var size = IscHelper.VaxInteger(buffer, 3, (int)length); + + return ValueTask2.FromResult((int)size); + } + + public override void GetSegment(Stream stream) { var requested = (short)SegmentSize; short segmentLength = 0; @@ -168,7 +220,6 @@ protected override void GetSegment(Stream stream) requested, tmp); - RblRemoveValue(IscCodes.RBL_segment); if (_statusVector[1] == new IntPtr(IscCodes.isc_segstr_eof)) @@ -190,7 +241,7 @@ protected override void GetSegment(Stream stream) stream.Write(tmp, 0, segmentLength); } - protected override ValueTask GetSegmentAsync(Stream stream, CancellationToken cancellationToken = default) + public override ValueTask GetSegmentAsync(Stream stream, CancellationToken cancellationToken = default) { var requested = (short)SegmentSize; short segmentLength = 0; @@ -231,7 +282,98 @@ protected override ValueTask GetSegmentAsync(Stream stream, CancellationToken ca return ValueTask2.CompletedTask; } - protected override void PutSegment(byte[] buffer) + public override byte[] GetSegment() + { + var requested = (short)(SegmentSize - 2); + short segmentLength = 0; + + ClearStatusVector(); + + var tmp = new byte[requested]; + + var status = _database.FbClient.isc_get_segment( + _statusVector, + ref _blobHandle, + ref segmentLength, + requested, + tmp); + + + RblRemoveValue(IscCodes.RBL_segment); + + if (_statusVector[1] == new IntPtr(IscCodes.isc_segstr_eof)) + { + RblAddValue(IscCodes.RBL_eof_pending); + return Array.Empty(); + } + + if (status == IntPtr.Zero || _statusVector[1] == new IntPtr(IscCodes.isc_segment)) + { + RblAddValue(IscCodes.RBL_segment); + } + else + { + _database.ProcessStatusVector(_statusVector); + } + + var actualSegment = tmp; + if (actualSegment.Length != segmentLength) + { + tmp = new byte[segmentLength]; + Array.Copy(actualSegment, tmp, segmentLength); + actualSegment = tmp; + } + + return actualSegment; + } + public override ValueTask GetSegmentAsync(CancellationToken cancellationToken = default) + { + var requested = (short)SegmentSize; + short segmentLength = 0; + + ClearStatusVector(); + + var tmp = new byte[requested]; + + var status = _database.FbClient.isc_get_segment( + _statusVector, + ref _blobHandle, + ref segmentLength, + requested, + tmp); + + + RblRemoveValue(IscCodes.RBL_segment); + + if (_statusVector[1] == new IntPtr(IscCodes.isc_segstr_eof)) + { + RblAddValue(IscCodes.RBL_eof_pending); + return ValueTask2.FromResult(Array.Empty()); + } + else + { + if (status == IntPtr.Zero || _statusVector[1] == new IntPtr(IscCodes.isc_segment)) + { + RblAddValue(IscCodes.RBL_segment); + } + else + { + _database.ProcessStatusVector(_statusVector); + } + } + + var actualSegment = tmp; + if (actualSegment.Length != segmentLength) + { + tmp = new byte[segmentLength]; + Array.Copy(actualSegment, tmp, segmentLength); + actualSegment = tmp; + } + + return ValueTask2.FromResult(actualSegment); + } + + public override void PutSegment(byte[] buffer) { ClearStatusVector(); @@ -243,7 +385,7 @@ protected override void PutSegment(byte[] buffer) _database.ProcessStatusVector(_statusVector); } - protected override ValueTask PutSegmentAsync(byte[] buffer, CancellationToken cancellationToken = default) + public override ValueTask PutSegmentAsync(byte[] buffer, CancellationToken cancellationToken = default) { ClearStatusVector(); @@ -258,16 +400,38 @@ protected override ValueTask PutSegmentAsync(byte[] buffer, CancellationToken ca return ValueTask2.CompletedTask; } - protected override void Seek(int position) + public override void Seek(int position, int seekOperation) { - throw new NotSupportedException(); + ClearStatusVector(); + + var resultingPosition = 0; + _database.FbClient.isc_seek_blob( + _statusVector, + ref _blobHandle, + (short)seekOperation, + position, + ref resultingPosition); + + _database.ProcessStatusVector(_statusVector); } - protected override ValueTask SeekAsync(int position, CancellationToken cancellationToken = default) + public override ValueTask SeekAsync(int position, int seekOperation, CancellationToken cancellationToken = default) { - throw new NotSupportedException(); + ClearStatusVector(); + + var resultingPosition = 0; + _database.FbClient.isc_seek_blob( + _statusVector, + ref _blobHandle, + (short)seekOperation, + position, + ref resultingPosition); + + _database.ProcessStatusVector(_statusVector); + + return ValueTask2.CompletedTask; } - protected override void Close() + public override void Close() { ClearStatusVector(); @@ -275,7 +439,7 @@ protected override void Close() _database.ProcessStatusVector(_statusVector); } - protected override ValueTask CloseAsync(CancellationToken cancellationToken = default) + public override ValueTask CloseAsync(CancellationToken cancellationToken = default) { ClearStatusVector(); @@ -286,7 +450,7 @@ protected override ValueTask CloseAsync(CancellationToken cancellationToken = de return ValueTask2.CompletedTask; } - protected override void Cancel() + public override void Cancel() { ClearStatusVector(); @@ -294,7 +458,7 @@ protected override void Cancel() _database.ProcessStatusVector(_statusVector); } - protected override ValueTask CancelAsync(CancellationToken cancellationToken = default) + public override ValueTask CancelAsync(CancellationToken cancellationToken = default) { ClearStatusVector(); diff --git a/src/FirebirdSql.Data.FirebirdClient/Client/Native/IFbClient.cs b/src/FirebirdSql.Data.FirebirdClient/Client/Native/IFbClient.cs index 15aa3148f..f49b09f0a 100644 --- a/src/FirebirdSql.Data.FirebirdClient/Client/Native/IFbClient.cs +++ b/src/FirebirdSql.Data.FirebirdClient/Client/Native/IFbClient.cs @@ -23,8 +23,8 @@ namespace FirebirdSql.Data.Client.Native; /// -/// This is the interface that the dynamically-generated class uses to call the native library. -/// Each connection can specify different client library to use even on the same OS. +/// This is the interface that the dynamically-generated class uses to call the native library. +/// Each connection can specify different client library to use even on the same OS. /// IFbClient and FbClientactory classes are implemented to support this feature. /// Public visibility added, because auto-generated assembly can't work with internal types /// @@ -68,6 +68,14 @@ IntPtr isc_open_blob2( short bpbLength, byte[] bpbAddress); + IntPtr isc_blob_info( + [In, Out] IntPtr[] statusVector, + ref BlobHandle blobHandle, + short itemListBufferLength, + byte[] itemListBuffer, + short resultBufferLength, + byte[] resultBuffer); + IntPtr isc_get_segment( [In, Out] IntPtr[] statusVector, [MarshalAs(UnmanagedType.I4)] ref BlobHandle blobHandle, @@ -81,6 +89,13 @@ IntPtr isc_put_segment( short segBufferLength, byte[] segBuffer); + IntPtr isc_seek_blob( + [In, Out] IntPtr[] statusVector, + [MarshalAs(UnmanagedType.I4)] ref BlobHandle blobHandle, + short mode, + int offset, + ref int resultingBlobPosition); + IntPtr isc_cancel_blob( [In, Out] IntPtr[] statusVector, [MarshalAs(UnmanagedType.I4)] ref BlobHandle blobHandle); @@ -260,4 +275,4 @@ IntPtr isc_transaction_info( byte[] resultBuffer); #pragma warning restore IDE1006 -} +} \ No newline at end of file diff --git a/src/FirebirdSql.Data.FirebirdClient/Common/BlobBase.cs b/src/FirebirdSql.Data.FirebirdClient/Common/BlobBase.cs index 828b80c2d..7214d5e8c 100644 --- a/src/FirebirdSql.Data.FirebirdClient/Common/BlobBase.cs +++ b/src/FirebirdSql.Data.FirebirdClient/Common/BlobBase.cs @@ -29,14 +29,17 @@ internal abstract class BlobBase private int _segmentSize; protected long _blobId; + protected bool _isOpen; protected int _position; protected TransactionBase _transaction; public abstract int Handle { get; } public long Id => _blobId; public bool EOF => (_rblFlags & IscCodes.RBL_eof_pending) != 0; + public bool IsOpen => _isOpen; - protected int SegmentSize => _segmentSize; + public int SegmentSize => _segmentSize; + public int Position => _position; public abstract DatabaseBase Database { get; } @@ -203,26 +206,32 @@ public async ValueTask WriteAsync(byte[] buffer, int index, int count, Cancellat } } - protected abstract void Create(); - protected abstract ValueTask CreateAsync(CancellationToken cancellationToken = default); + public abstract void Create(); + public abstract ValueTask CreateAsync(CancellationToken cancellationToken = default); - protected abstract void Open(); - protected abstract ValueTask OpenAsync(CancellationToken cancellationToken = default); + public abstract void Open(); + public abstract ValueTask OpenAsync(CancellationToken cancellationToken = default); - protected abstract void GetSegment(Stream stream); - protected abstract ValueTask GetSegmentAsync(Stream stream, CancellationToken cancellationToken = default); + public abstract int GetLength(); + public abstract ValueTask GetLengthAsync(CancellationToken cancellationToken = default); - protected abstract void PutSegment(byte[] buffer); - protected abstract ValueTask PutSegmentAsync(byte[] buffer, CancellationToken cancellationToken = default); + public abstract byte[] GetSegment(); + public abstract ValueTask GetSegmentAsync(CancellationToken cancellationToken = default); - protected abstract void Seek(int position); - protected abstract ValueTask SeekAsync(int position, CancellationToken cancellationToken = default); + public abstract void GetSegment(Stream stream); + public abstract ValueTask GetSegmentAsync(Stream stream, CancellationToken cancellationToken = default); - protected abstract void Close(); - protected abstract ValueTask CloseAsync(CancellationToken cancellationToken = default); + public abstract void PutSegment(byte[] buffer); + public abstract ValueTask PutSegmentAsync(byte[] buffer, CancellationToken cancellationToken = default); - protected abstract void Cancel(); - protected abstract ValueTask CancelAsync(CancellationToken cancellationToken = default); + public abstract void Seek(int offset, int seekMode); + public abstract ValueTask SeekAsync(int offset, int seekMode, CancellationToken cancellationToken = default); + + public abstract void Close(); + public abstract ValueTask CloseAsync(CancellationToken cancellationToken = default); + + public abstract void Cancel(); + public abstract ValueTask CancelAsync(CancellationToken cancellationToken = default); protected void RblAddValue(int rblValue) { @@ -233,4 +242,4 @@ protected void RblRemoveValue(int rblValue) { _rblFlags &= ~rblValue; } -} +} \ No newline at end of file diff --git a/src/FirebirdSql.Data.FirebirdClient/Common/BlobStream.cs b/src/FirebirdSql.Data.FirebirdClient/Common/BlobStream.cs new file mode 100644 index 000000000..050873a73 --- /dev/null +++ b/src/FirebirdSql.Data.FirebirdClient/Common/BlobStream.cs @@ -0,0 +1,230 @@ +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace FirebirdSql.Data.Common; + +public sealed class BlobStream : Stream +{ + private readonly BlobBase _blobHandle; + private int _position; + + private byte[] _currentSegment; + private int _segmentPosition; + + private int Available => _currentSegment?.Length - _segmentPosition ?? 0; + + internal BlobStream(BlobBase blob) + { + _blobHandle = blob; + _position = 0; + } + + public override long Position + { + get => _position; + set => Seek(value, SeekOrigin.Begin); + } + + public override long Length + { + get + { + if (!_blobHandle.IsOpen) + _blobHandle.Open(); + + return _blobHandle.GetLength(); + } + } + + public override void Flush() + { + } + + public override int Read(byte[] buffer, int offset, int count) + { + ValidateBufferSize(buffer, offset, count); + + if (!_blobHandle.IsOpen) + _blobHandle.Open(); + + var copied = 0; + var remainingBufferSize = buffer.Length - offset; + do + { + if (remainingBufferSize == 0) + break; + + if (Available > 0) + { + var toCopy = Math.Min(Available, remainingBufferSize); + Array.Copy(_currentSegment, _segmentPosition, buffer, offset + copied, toCopy); + copied += toCopy; + _segmentPosition += toCopy; + remainingBufferSize -= toCopy; + _position += toCopy; + } + + if (_blobHandle.EOF) + break; + + if (Available == 0) + { + _currentSegment = _blobHandle.GetSegment(); + _segmentPosition = 0; + } + } while (copied < count); + + return copied; + } + public override async Task ReadAsync(byte[] buffer, int offset, int count, + CancellationToken cancellationToken) + { + ValidateBufferSize(buffer, offset, count); + + if (!_blobHandle.IsOpen) + await _blobHandle.OpenAsync(cancellationToken).ConfigureAwait(false); + + var copied = 0; + var remainingBufferSize = buffer.Length - offset; + do + { + if (remainingBufferSize == 0) + break; + + if (Available > 0) + { + var toCopy = Math.Min(Available, remainingBufferSize); + Array.Copy(_currentSegment, _segmentPosition, buffer, offset + copied, toCopy); + copied += toCopy; + _segmentPosition += toCopy; + remainingBufferSize -= toCopy; + _position += toCopy; + } + + if (_blobHandle.EOF) + break; + + if (Available == 0) + { + _currentSegment = await _blobHandle.GetSegmentAsync(cancellationToken).ConfigureAwait(false); + _segmentPosition = 0; + } + } while (copied < count); + + return copied; + } + + public override long Seek(long offset, SeekOrigin origin) + { + if (!_blobHandle.IsOpen) + _blobHandle.Open(); + + var seekMode = origin switch + { + SeekOrigin.Begin => IscCodes.isc_blb_seek_from_head, + SeekOrigin.Current => IscCodes.isc_blb_seek_relative, + SeekOrigin.End => IscCodes.isc_blb_seek_from_tail, + _ => throw new ArgumentOutOfRangeException(nameof(origin)) + }; + + _blobHandle.Seek((int)offset, seekMode); + return _position = _blobHandle.Position; + } + + public override void SetLength(long value) + { + throw new NotSupportedException(); + } + + public override void Write(byte[] buffer, int offset, int count) + { + try + { + if (!_blobHandle.IsOpen) + _blobHandle.Create(); + + var chunk = count >= _blobHandle.SegmentSize ? _blobHandle.SegmentSize : count; + var tmpBuffer = new byte[chunk]; + + while (count > 0) + { + if (chunk > count) + { + chunk = count; + tmpBuffer = new byte[chunk]; + } + + Array.Copy(buffer, offset, tmpBuffer, 0, chunk); + _blobHandle.PutSegment(tmpBuffer); + + offset += chunk; + count -= chunk; + _position += chunk; + } + } + catch + { + _blobHandle.Cancel(); + throw; + } + } + public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + try + { + if (!_blobHandle.IsOpen) + await _blobHandle.CreateAsync(cancellationToken).ConfigureAwait(false); + + var chunk = count >= _blobHandle.SegmentSize ? _blobHandle.SegmentSize : count; + var tmpBuffer = new byte[chunk]; + + while (count > 0) + { + if (chunk > count) + { + chunk = count; + tmpBuffer = new byte[chunk]; + } + + Array.Copy(buffer, offset, tmpBuffer, 0, chunk); + await _blobHandle.PutSegmentAsync(tmpBuffer, cancellationToken).ConfigureAwait(false); + + offset += chunk; + count -= chunk; + _position += chunk; + } + } + catch + { + await _blobHandle.CancelAsync(cancellationToken).ConfigureAwait(false); + throw; + } + } + + public override bool CanRead => true; + public override bool CanSeek => true; + public override bool CanWrite => true; + + protected override void Dispose(bool disposing) + { + _blobHandle.Close(); + } + +#if !(NET48 || NETSTANDARD2_0) + public override ValueTask DisposeAsync() + { + return _blobHandle.CloseAsync(); + } +#endif + + private static void ValidateBufferSize(byte[] buffer, int offset, int count) + { + if (buffer is null) + throw new ArgumentNullException(nameof(buffer)); + + if (buffer.Length < offset + count) + throw new InvalidOperationException(); + } +} \ No newline at end of file diff --git a/src/FirebirdSql.Data.FirebirdClient/Common/DbValue.cs b/src/FirebirdSql.Data.FirebirdClient/Common/DbValue.cs index f799b5acd..b0656d3e2 100644 --- a/src/FirebirdSql.Data.FirebirdClient/Common/DbValue.cs +++ b/src/FirebirdSql.Data.FirebirdClient/Common/DbValue.cs @@ -20,6 +20,7 @@ using System.Numerics; using System.Threading; using System.Threading.Tasks; +using FirebirdSql.Data.FirebirdClient; using FirebirdSql.Data.Types; namespace FirebirdSql.Data.Common; @@ -331,6 +332,21 @@ public async ValueTask GetBinaryAsync(CancellationToken cancellationToke return (byte[])_value; } + public BlobStream GetBinaryStream() + { + if (_value is not long l) + throw new NotSupportedException(); + + return GetBlobStream(l); + } + public ValueTask GetBinaryStreamAsync(CancellationToken cancellationToken = default) + { + if (_value is not long l) + throw new NotSupportedException(); + + return GetBlobStreamAsync(l, cancellationToken); + } + public int GetDate() { return _value switch @@ -854,6 +870,17 @@ private ValueTask GetBlobDataAsync(long blobId, CancellationToken cancel return blob.ReadAsync(cancellationToken); } + private BlobStream GetBlobStream(long blobId) + { + var blob = _statement.CreateBlob(blobId); + return new BlobStream(blob); + } + private ValueTask GetBlobStreamAsync(long blobId, CancellationToken cancellationToken = default) + { + var blob = _statement.CreateBlob(blobId); + return ValueTask2.FromResult(new BlobStream(blob)); + } + private Array GetArrayData(long handle) { if (_field.ArrayHandle == null) diff --git a/src/FirebirdSql.Data.FirebirdClient/Common/IscCodes.cs b/src/FirebirdSql.Data.FirebirdClient/Common/IscCodes.cs index 40bb78da9..25a9d29a6 100644 --- a/src/FirebirdSql.Data.FirebirdClient/Common/IscCodes.cs +++ b/src/FirebirdSql.Data.FirebirdClient/Common/IscCodes.cs @@ -941,6 +941,10 @@ internal static class IscCodes public const int RBL_eof_pending = 4; public const int RBL_create = 8; + public const int isc_blb_seek_from_head = 0; + public const int isc_blb_seek_relative = 1; + public const int isc_blb_seek_from_tail = 2; + #endregion #region Blob Information diff --git a/src/FirebirdSql.Data.FirebirdClient/FirebirdClient/FbDataReader.cs b/src/FirebirdSql.Data.FirebirdClient/FirebirdClient/FbDataReader.cs index 7972ee045..de8cebb2e 100644 --- a/src/FirebirdSql.Data.FirebirdClient/FirebirdClient/FbDataReader.cs +++ b/src/FirebirdSql.Data.FirebirdClient/FirebirdClient/FbDataReader.cs @@ -21,6 +21,7 @@ using System.ComponentModel; using System.Data; using System.Data.Common; +using System.IO; using System.Linq; using System.Numerics; using System.Runtime.CompilerServices; @@ -978,6 +979,15 @@ public override DateTime GetDateTime(int i) return GetFieldValue(i); } + public override Stream GetStream(int i) + { + CheckState(); + CheckPosition(); + CheckIndex(i); + + return _row[i].GetBinaryStream(); + } + public override bool IsDBNull(int i) { CheckState();