Skip to content

Commit

Permalink
Fix DbDataReader implementations to provide async Read where it was m…
Browse files Browse the repository at this point in the history
…issing.
  • Loading branch information
MarkPflug committed Jun 20, 2024
1 parent ecc5ff3 commit e6b6e0e
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 3 deletions.
14 changes: 11 additions & 3 deletions source/Sylvan.Data.Csv.Tests/BoundedDataReader.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
using System.Data.Common;
using System;
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;

namespace Sylvan.Data.Csv;

class BoundedDataReader : DataReaderAdapter
{
int rows;
readonly int rows;
int count = 0;

public BoundedDataReader(DbDataReader dr, int rows) : base(dr)
{
this.rows = rows;
}
}

public override Task<bool> ReadAsync(CancellationToken cancellationToken)
{
throw new NotImplementedException();
}

public override bool Read()
{
Expand Down
8 changes: 8 additions & 0 deletions source/Sylvan.Data/ProgressReader.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;

namespace Sylvan.Data;

Expand All @@ -14,6 +16,12 @@ public ProgressDataReader(DbDataReader dr, Action<int> progressCallback) : base(
this.progressCallback = progressCallback;
}

public override Task<bool> ReadAsync(CancellationToken cancellationToken)
{
progressCallback(row++);
return base.ReadAsync(cancellationToken);
}

public override bool Read()
{
progressCallback(row++);
Expand Down
25 changes: 25 additions & 0 deletions source/Sylvan.Data/SkipTakeDataReader.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;

namespace Sylvan.Data;

Expand All @@ -13,6 +15,29 @@ public SkipTakeDataReader(DbDataReader dr, long skip, long take) : base(dr)
this.take = take;
}

public override async Task<bool> ReadAsync(CancellationToken cancellationToken)
{
while (skip > 0)
{
skip -= 1;
if (!await Reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
return false;
}
}
switch (take)
{
case 0:
break;
default:
take -= 1;
goto case -1;
case -1:
return await Reader.ReadAsync(cancellationToken).ConfigureAwait(false);
}
return false;
}

public override bool Read()
{
while (skip > 0)
Expand Down
21 changes: 21 additions & 0 deletions source/Sylvan.Data/TakeWhileDataReader.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;

namespace Sylvan.Data;

Expand All @@ -14,6 +16,25 @@ public TakeWhileDataReader(DbDataReader dr, Func<DbDataReader, bool> predicate)
this.state = 0;
}

public override async Task<bool> ReadAsync(CancellationToken cancellationToken)
{
if (state == 0)
{
if (!await Reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
this.state = 1;
return false;
}
if (!predicate(this))
{
this.state = 2;
return false;
}
return true;
}
return false;
}

public override bool Read()
{
if (state == 0)
Expand Down
28 changes: 28 additions & 0 deletions source/Sylvan.Data/ValidatingDataReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
using System.Collections.ObjectModel;
using System.Data.Common;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;

namespace Sylvan.Data;

Expand Down Expand Up @@ -745,6 +747,16 @@ public override bool IsDBNull(int ordinal)
return !cache[ordinal].HasValue;
}

public override async Task<bool> NextResultAsync(CancellationToken cancellationToken)
{
if (await inner.NextResultAsync(cancellationToken).ConfigureAwait(false))
{
InitializeSchema();
return true;
}
return false;
}

public override bool NextResult()
{
if (inner.NextResult())
Expand All @@ -755,6 +767,22 @@ public override bool NextResult()
return false;
}

public override async Task<bool> ReadAsync(CancellationToken cancellationToken)
{
while (await inner.ReadAsync(cancellationToken).ConfigureAwait(false))
{
// if the current row has valid data
if (ProcessRow())
{
// return it
return true;
}
// otherwise move to the next row
}
this.rowNumber = -1;
return false;
}

public override bool Read()
{
while (inner.Read())
Expand Down

0 comments on commit e6b6e0e

Please sign in to comment.