Skip to content

Commit d56340b

Browse files
authored
add missing QueryUnbufferedAsync<T> API (#1912)
* impl QueryUnbufferedAsync<T> * implement GridReader.ReadUnbufferedAsync<T>
1 parent 194a0ce commit d56340b

File tree

5 files changed

+256
-31
lines changed

5 files changed

+256
-31
lines changed

Dapper/SqlMapper.Async.cs

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Data.Common;
66
using System.Globalization;
77
using System.Linq;
8+
using System.Runtime.CompilerServices;
89
using System.Threading;
910
using System.Threading.Tasks;
1011

@@ -1217,5 +1218,79 @@ private static async Task<T> ExecuteScalarImplAsync<T>(IDbConnection cnn, Comman
12171218
}
12181219
return Parse<T>(result);
12191220
}
1221+
1222+
#if NET5_0_OR_GREATER
1223+
/// <summary>
1224+
/// Execute a query asynchronously using <see cref="IAsyncEnumerable{T}"/>.
1225+
/// </summary>
1226+
/// <typeparam name="T">The type of results to return.</typeparam>
1227+
/// <param name="cnn">The connection to query on.</param>
1228+
/// <param name="sql">The SQL to execute for the query.</param>
1229+
/// <param name="param">The parameters to pass, if any.</param>
1230+
/// <param name="transaction">The transaction to use, if any.</param>
1231+
/// <param name="commandTimeout">The command timeout (in seconds).</param>
1232+
/// <param name="commandType">The type of command to execute.</param>
1233+
/// <returns>
1234+
/// A sequence of data of <typeparamref name="T"/>; if a basic type (int, string, etc) is queried then the data from the first column is assumed, otherwise an instance is
1235+
/// created per row, and a direct column-name===member-name mapping is assumed (case insensitive).
1236+
/// </returns>
1237+
public static IAsyncEnumerable<T> QueryUnbufferedAsync<T>(this DbConnection cnn, string sql, object param = null, DbTransaction transaction = null, int? commandTimeout = null, CommandType? commandType = null)
1238+
{
1239+
// note: in many cases of adding a new async method I might add a CancellationToken - however, cancellation is expressed via WithCancellation on iterators
1240+
return QueryUnbufferedAsync<T>(cnn, typeof(T), new CommandDefinition(sql, param, transaction, commandTimeout, commandType, CommandFlags.None, default));
1241+
}
1242+
1243+
private static IAsyncEnumerable<T> QueryUnbufferedAsync<T>(this IDbConnection cnn, Type effectiveType, CommandDefinition command)
1244+
{
1245+
return Impl(cnn, effectiveType, command, command.CancellationToken); // proxy to allow CT expression
1246+
1247+
static async IAsyncEnumerable<T> Impl(IDbConnection cnn, Type effectiveType, CommandDefinition command,
1248+
[EnumeratorCancellation] CancellationToken cancel)
1249+
{
1250+
object param = command.Parameters;
1251+
var identity = new Identity(command.CommandText, command.CommandType, cnn, effectiveType, param?.GetType());
1252+
var info = GetCacheInfo(identity, param, command.AddToCache);
1253+
bool wasClosed = cnn.State == ConnectionState.Closed;
1254+
using var cmd = command.TrySetupAsyncCommand(cnn, info.ParamReader);
1255+
DbDataReader reader = null;
1256+
try
1257+
{
1258+
if (wasClosed) await cnn.TryOpenAsync(cancel).ConfigureAwait(false);
1259+
reader = await ExecuteReaderWithFlagsFallbackAsync(cmd, wasClosed, CommandBehavior.SequentialAccess | CommandBehavior.SingleResult, cancel).ConfigureAwait(false);
1260+
1261+
var tuple = info.Deserializer;
1262+
int hash = GetColumnHash(reader);
1263+
if (tuple.Func == null || tuple.Hash != hash)
1264+
{
1265+
if (reader.FieldCount == 0)
1266+
{
1267+
yield break;
1268+
}
1269+
tuple = info.Deserializer = new DeserializerState(hash, GetDeserializer(effectiveType, reader, 0, -1, false));
1270+
if (command.AddToCache) SetQueryCache(identity, info);
1271+
}
1272+
1273+
var func = tuple.Func;
1274+
1275+
var convertToType = Nullable.GetUnderlyingType(effectiveType) ?? effectiveType;
1276+
while (await reader.ReadAsync(cancel).ConfigureAwait(false))
1277+
{
1278+
object val = func(reader);
1279+
yield return GetValue<T>(reader, effectiveType, val);
1280+
}
1281+
while (await reader.NextResultAsync(cancel).ConfigureAwait(false)) { /* ignore subsequent result sets */ }
1282+
command.OnCompleted();
1283+
}
1284+
finally
1285+
{
1286+
if (reader is not null)
1287+
{
1288+
await reader.DisposeAsync();
1289+
}
1290+
if (wasClosed) cnn.Close();
1291+
}
1292+
}
1293+
}
1294+
#endif
12201295
}
12211296
}

Dapper/SqlMapper.GridReader.Async.cs

Lines changed: 88 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
using System.Collections.Generic;
33
using System.Data;
44
using System.Data.Common;
5-
using System.Globalization;
65
using System.Linq;
6+
using System.Runtime.CompilerServices;
77
using System.Threading;
88
using System.Threading.Tasks;
99

@@ -12,6 +12,9 @@ namespace Dapper
1212
public static partial class SqlMapper
1313
{
1414
public partial class GridReader
15+
#if NET5_0_OR_GREATER
16+
: IAsyncDisposable
17+
#endif
1518
{
1619
private readonly CancellationToken cancel;
1720
internal GridReader(IDbCommand command, DbDataReader reader, Identity identity, DynamicParameters dynamicParams, bool addToCache, CancellationToken cancel)
@@ -140,7 +143,7 @@ public Task<object> ReadSingleOrDefaultAsync(Type type)
140143

141144
private async Task NextResultAsync()
142145
{
143-
if (await ((DbDataReader)reader).NextResultAsync(cancel).ConfigureAwait(false))
146+
if (await reader.NextResultAsync(cancel).ConfigureAwait(false))
144147
{
145148
// readCount++;
146149
gridIndex++;
@@ -150,14 +153,37 @@ private async Task NextResultAsync()
150153
{
151154
// happy path; close the reader cleanly - no
152155
// need for "Cancel" etc
156+
#if NET5_0_OR_GREATER
157+
await reader.DisposeAsync();
158+
#else
153159
reader.Dispose();
160+
#endif
154161
reader = null;
155162
callbacks?.OnCompleted();
163+
#if NET5_0_OR_GREATER
164+
await DisposeAsync();
165+
#else
156166
Dispose();
167+
#endif
157168
}
158169
}
159170

160171
private Task<IEnumerable<T>> ReadAsyncImpl<T>(Type type, bool buffered)
172+
{
173+
var deserializer = ValidateAndMarkConsumed(type);
174+
if (buffered)
175+
{
176+
return ReadBufferedAsync<T>(gridIndex, deserializer);
177+
}
178+
else
179+
{
180+
var result = ReadDeferred<T>(gridIndex, deserializer, type);
181+
if (buffered) result = result?.ToList(); // for the "not a DbDataReader" scenario
182+
return Task.FromResult(result);
183+
}
184+
}
185+
186+
private Func<DbDataReader, object> ValidateAndMarkConsumed(Type type)
161187
{
162188
if (reader == null) throw new ObjectDisposedException(GetType().FullName, "The reader has been disposed; this can happen after all data has been consumed");
163189
if (IsConsumed) throw new InvalidOperationException("Query results must be consumed in the correct order, and each result can only be consumed once");
@@ -172,27 +198,10 @@ private Task<IEnumerable<T>> ReadAsyncImpl<T>(Type type, bool buffered)
172198
cache.Deserializer = deserializer;
173199
}
174200
IsConsumed = true;
175-
if (buffered && reader is DbDataReader)
176-
{
177-
return ReadBufferedAsync<T>(gridIndex, deserializer.Func);
178-
}
179-
else
180-
{
181-
var result = ReadDeferred<T>(gridIndex, deserializer.Func, type);
182-
if (buffered) result = result?.ToList(); // for the "not a DbDataReader" scenario
183-
return Task.FromResult(result);
184-
}
185-
}
186-
187-
private Task<T> ReadRowAsyncImpl<T>(Type type, Row row)
188-
{
189-
if (reader is DbDataReader dbReader) return ReadRowAsyncImplViaDbReader<T>(dbReader, type, row);
190-
191-
// no async API available; use non-async and fake it
192-
return Task.FromResult(ReadRow<T>(type, row));
201+
return deserializer.Func;
193202
}
194203

195-
private async Task<T> ReadRowAsyncImplViaDbReader<T>(DbDataReader reader, Type type, Row row)
204+
private async Task<T> ReadRowAsyncImpl<T>(Type type, Row row)
196205
{
197206
if (reader == null) throw new ObjectDisposedException(GetType().FullName, "The reader has been disposed; this can happen after all data has been consumed");
198207
if (IsConsumed) throw new InvalidOperationException("Query results must be consumed in the correct order, and each result can only be consumed once");
@@ -229,7 +238,6 @@ private async Task<IEnumerable<T>> ReadBufferedAsync<T>(int index, Func<DbDataRe
229238
{
230239
try
231240
{
232-
var reader = (DbDataReader)this.reader;
233241
var buffer = new List<T>();
234242
while (index == gridIndex && await reader.ReadAsync(cancel).ConfigureAwait(false))
235243
{
@@ -245,6 +253,64 @@ private async Task<IEnumerable<T>> ReadBufferedAsync<T>(int index, Func<DbDataRe
245253
}
246254
}
247255
}
256+
257+
#if NET5_0_OR_GREATER
258+
/// <summary>
259+
/// Read the next grid of results.
260+
/// </summary>
261+
/// <typeparam name="T">The type to read.</typeparam>
262+
public IAsyncEnumerable<T> ReadUnbufferedAsync<T>() => ReadAsyncUnbufferedImpl<T>(typeof(T));
263+
264+
private IAsyncEnumerable<T> ReadAsyncUnbufferedImpl<T>(Type type)
265+
{
266+
var deserializer = ValidateAndMarkConsumed(type);
267+
return ReadUnbufferedAsync<T>(gridIndex, deserializer, cancel);
268+
}
269+
270+
private async IAsyncEnumerable<T> ReadUnbufferedAsync<T>(int index, Func<DbDataReader, object> deserializer, [EnumeratorCancellation] CancellationToken cancel)
271+
{
272+
try
273+
{
274+
while (index == gridIndex && await reader.ReadAsync(cancel).ConfigureAwait(false))
275+
{
276+
yield return ConvertTo<T>(deserializer(reader));
277+
}
278+
}
279+
finally // finally so that First etc progresses things even when multiple rows
280+
{
281+
if (index == gridIndex)
282+
{
283+
await NextResultAsync().ConfigureAwait(false);
284+
}
285+
}
286+
}
287+
288+
/// <summary>
289+
/// Dispose the grid, closing and disposing both the underlying reader and command.
290+
/// </summary>
291+
public async ValueTask DisposeAsync()
292+
{
293+
if (reader != null)
294+
{
295+
if (!reader.IsClosed) Command?.Cancel();
296+
await reader.DisposeAsync();
297+
reader = null;
298+
}
299+
if (Command != null)
300+
{
301+
if (Command is DbCommand typed)
302+
{
303+
await typed.DisposeAsync();
304+
}
305+
else
306+
{
307+
Command.Dispose();
308+
}
309+
Command = null;
310+
}
311+
GC.SuppressFinalize(this);
312+
}
313+
#endif
248314
}
249315
}
250316
}

Dapper/SqlMapper.GridReader.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Data;
4-
using System.Linq;
4+
using System.Data.Common;
55
using System.Globalization;
6+
using System.Linq;
67
using System.Runtime.CompilerServices;
7-
using System.Data.Common;
88

99
namespace Dapper
1010
{

docs/index.md

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,13 @@ Note: to get the latest pre-release build, add ` -Pre` to the end of the command
2222

2323
### unreleased
2424

25-
- add support for `SqlDecimal` and other types that need to be accessed via `DbDataReader.GetFieldValue<T>`
26-
- add an overload of `AddTypeMap` that supports `DbDataReader.GetFieldValue<T>` for additional types
27-
- acknowledge that in reality we only support `DbDataReader`; this has been true (via `DbConnection`) for `async` forever
25+
- (#1910 via mgravell, fix #1907, #1263)
26+
- add support for `SqlDecimal` and other types that need to be accessed via `DbDataReader.GetFieldValue<T>`
27+
- add an overload of `AddTypeMap` that supports `DbDataReader.GetFieldValue<T>` for additional types
28+
- acknowledge that in reality we only support `DbDataReader`; this has been true (via `DbConnection`) for `async` forever
29+
- (#1912 via mgravell)
30+
- add missing `AsyncEnumerable<T> QueryUnbufferedAsync<T>(...)` and `GridReader.ReadUnbufferedAsync<T>(...)` APIs (.NET 5 and later)
31+
- implement `IAsyncDisposable` on `GridReader` (.NET 5 and later)
2832

2933
(note: new PRs will not be merged until they add release note wording here)
3034

tests/Dapper.Tests/AsyncTests.cs

Lines changed: 84 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1-
using System.Linq;
1+
using System;
2+
using System.Collections.Generic;
23
using System.Data;
4+
using System.Data.Common;
35
using System.Diagnostics;
4-
using System;
5-
using System.Threading.Tasks;
6+
using System.Linq;
67
using System.Threading;
8+
using System.Threading.Tasks;
79
using Xunit;
8-
using System.Data.Common;
910
using Xunit.Abstractions;
1011

1112
namespace Dapper.Tests
@@ -45,6 +46,85 @@ public async Task TestBasicStringUsageAsync()
4546
Assert.Equal(new[] { "abc", "def" }, arr);
4647
}
4748

49+
#if NET5_0_OR_GREATER
50+
[Fact]
51+
public async Task TestBasicStringUsageUnbufferedAsync()
52+
{
53+
var results = new List<string>();
54+
await foreach (var value in connection.QueryUnbufferedAsync<string>("select 'abc' as [Value] union all select @txt", new { txt = "def" })
55+
.ConfigureAwait(false))
56+
{
57+
results.Add(value);
58+
}
59+
var arr = results.ToArray();
60+
Assert.Equal(new[] { "abc", "def" }, arr);
61+
}
62+
63+
[Fact]
64+
public async Task TestBasicStringUsageUnbufferedAsync_Cancellation()
65+
{
66+
using var cts = new CancellationTokenSource();
67+
var results = new List<string>();
68+
await Assert.ThrowsAnyAsync<OperationCanceledException>(async () =>
69+
{
70+
await foreach (var value in connection.QueryUnbufferedAsync<string>("select 'abc' as [Value] union all select @txt", new { txt = "def" })
71+
.ConfigureAwait(false).WithCancellation(cts.Token))
72+
{
73+
results.Add(value);
74+
cts.Cancel(); // cancel after first item
75+
}
76+
});
77+
var arr = results.ToArray();
78+
Assert.Equal(new[] { "abc" }, arr); // we don't expect the "def" because of the cancellation
79+
}
80+
81+
[Fact]
82+
public async Task TestBasicStringUsageViaGridReaderUnbufferedAsync()
83+
{
84+
var results = new List<string>();
85+
await using (var grid = await connection.QueryMultipleAsync("select 'abc' union select 'def'; select @txt", new { txt = "ghi" })
86+
.ConfigureAwait(false))
87+
{
88+
while (!grid.IsConsumed)
89+
{
90+
await foreach (var value in grid.ReadUnbufferedAsync<string>()
91+
.ConfigureAwait(false))
92+
{
93+
results.Add(value);
94+
}
95+
}
96+
}
97+
var arr = results.ToArray();
98+
Assert.Equal(new[] { "abc", "def", "ghi" }, arr);
99+
}
100+
101+
[Fact]
102+
public async Task TestBasicStringUsageViaGridReaderUnbufferedAsync_Cancellation()
103+
{
104+
using var cts = new CancellationTokenSource();
105+
var results = new List<string>();
106+
await using (var grid = await connection.QueryMultipleAsync("select 'abc' union select 'def'; select @txt", new { txt = "ghi" })
107+
.ConfigureAwait(false))
108+
{
109+
await Assert.ThrowsAnyAsync<OperationCanceledException>(async () =>
110+
{
111+
while (!grid.IsConsumed)
112+
{
113+
await foreach (var value in grid.ReadUnbufferedAsync<string>()
114+
.ConfigureAwait(false)
115+
.WithCancellation(cts.Token))
116+
{
117+
results.Add(value);
118+
}
119+
cts.Cancel();
120+
}
121+
});
122+
}
123+
var arr = results.ToArray();
124+
Assert.Equal(new[] { "abc", "def" }, arr); // don't expect the ghi because of cancellation
125+
}
126+
#endif
127+
48128
[Fact]
49129
public async Task TestBasicStringUsageQueryFirstAsync()
50130
{

0 commit comments

Comments
 (0)