diff --git a/source/Sylvan.Data.Csv.Tests/BoundedDataReader.cs b/source/Sylvan.Data.Csv.Tests/BoundedDataReader.cs index 922f59c..482b435 100644 --- a/source/Sylvan.Data.Csv.Tests/BoundedDataReader.cs +++ b/source/Sylvan.Data.Csv.Tests/BoundedDataReader.cs @@ -1,16 +1,24 @@ -using System.Data.Common; +using System; +using System.Data.Common; +using System.Threading; +using System.Threading.Tasks; namespace Sylvan.Data.Csv; class BoundedDataReader : DataReaderAdapter { - int rows; + readonly int rows; int count = 0; public BoundedDataReader(DbDataReader dr, int rows) : base(dr) { this.rows = rows; - } + } + + public override Task ReadAsync(CancellationToken cancellationToken) + { + throw new NotImplementedException(); + } public override bool Read() { diff --git a/source/Sylvan.Data/ProgressReader.cs b/source/Sylvan.Data/ProgressReader.cs index f8d651f..e4d4774 100644 --- a/source/Sylvan.Data/ProgressReader.cs +++ b/source/Sylvan.Data/ProgressReader.cs @@ -1,5 +1,7 @@ using System; using System.Data.Common; +using System.Threading; +using System.Threading.Tasks; namespace Sylvan.Data; @@ -14,6 +16,12 @@ public ProgressDataReader(DbDataReader dr, Action progressCallback) : base( this.progressCallback = progressCallback; } + public override Task ReadAsync(CancellationToken cancellationToken) + { + progressCallback(row++); + return base.ReadAsync(cancellationToken); + } + public override bool Read() { progressCallback(row++); diff --git a/source/Sylvan.Data/SkipTakeDataReader.cs b/source/Sylvan.Data/SkipTakeDataReader.cs index c90f86d..ae36c74 100644 --- a/source/Sylvan.Data/SkipTakeDataReader.cs +++ b/source/Sylvan.Data/SkipTakeDataReader.cs @@ -1,4 +1,6 @@ using System.Data.Common; +using System.Threading; +using System.Threading.Tasks; namespace Sylvan.Data; @@ -13,6 +15,29 @@ public SkipTakeDataReader(DbDataReader dr, long skip, long take) : base(dr) this.take = take; } + public override async Task ReadAsync(CancellationToken cancellationToken) + { + while (skip > 0) + { + skip -= 1; + if (!await Reader.ReadAsync(cancellationToken).ConfigureAwait(false)) + { + return false; + } + } + switch (take) + { + case 0: + break; + default: + take -= 1; + goto case -1; + case -1: + return await Reader.ReadAsync(cancellationToken).ConfigureAwait(false); + } + return false; + } + public override bool Read() { while (skip > 0) diff --git a/source/Sylvan.Data/TakeWhileDataReader.cs b/source/Sylvan.Data/TakeWhileDataReader.cs index 3ae7853..5d4b406 100644 --- a/source/Sylvan.Data/TakeWhileDataReader.cs +++ b/source/Sylvan.Data/TakeWhileDataReader.cs @@ -1,5 +1,7 @@ using System; using System.Data.Common; +using System.Threading; +using System.Threading.Tasks; namespace Sylvan.Data; @@ -14,6 +16,25 @@ public TakeWhileDataReader(DbDataReader dr, Func predicate) this.state = 0; } + public override async Task ReadAsync(CancellationToken cancellationToken) + { + if (state == 0) + { + if (!await Reader.ReadAsync(cancellationToken).ConfigureAwait(false)) + { + this.state = 1; + return false; + } + if (!predicate(this)) + { + this.state = 2; + return false; + } + return true; + } + return false; + } + public override bool Read() { if (state == 0) diff --git a/source/Sylvan.Data/ValidatingDataReader.cs b/source/Sylvan.Data/ValidatingDataReader.cs index bfd23e7..5c35b7f 100644 --- a/source/Sylvan.Data/ValidatingDataReader.cs +++ b/source/Sylvan.Data/ValidatingDataReader.cs @@ -4,6 +4,8 @@ using System.Collections.ObjectModel; using System.Data.Common; using System.Linq.Expressions; +using System.Threading; +using System.Threading.Tasks; namespace Sylvan.Data; @@ -745,6 +747,16 @@ public override bool IsDBNull(int ordinal) return !cache[ordinal].HasValue; } + public override async Task NextResultAsync(CancellationToken cancellationToken) + { + if (await inner.NextResultAsync(cancellationToken).ConfigureAwait(false)) + { + InitializeSchema(); + return true; + } + return false; + } + public override bool NextResult() { if (inner.NextResult()) @@ -755,6 +767,22 @@ public override bool NextResult() return false; } + public override async Task ReadAsync(CancellationToken cancellationToken) + { + while (await inner.ReadAsync(cancellationToken).ConfigureAwait(false)) + { + // if the current row has valid data + if (ProcessRow()) + { + // return it + return true; + } + // otherwise move to the next row + } + this.rowNumber = -1; + return false; + } + public override bool Read() { while (inner.Read())