Ich verwende Dapper, um Daten aus einem sehr großen Satz in SQL Server zu streamen. Es funktioniert gut mit der Rückgabe von IEnumerable
und dem Aufrufen von Query()
, aber wenn ich zu QueryAsync()
, scheint es, dass das Programm versucht, alle Daten von SQL Server zu lesen, anstatt sie zu streamen.
Laut dieser Frage sollte es gut mit buffered: false
funktionieren, was ich tue, aber die Frage sagt nichts über async/await
.
Nach dieser Frage ist es nicht einfach, mit QueryAsync()
zu tun, was ich will.
Verstehe ich richtig, dass Enumerables iteriert werden, wenn der Kontext für async/await
umgeschaltet wird?
Noch eine Frage, ob dies möglich ist, wenn das neue asynchrone C # 8-Streaming verfügbar ist?
Update März 2020
.NET Core 3.0 (und 3.1) sind jetzt verfügbar und unterstützen asynchrone Streams vollständig. Die Microsoft.Bcl.AsyncInterfaces bieten Unterstützung für .NET Standard 2.0 und .NET Framework 4.6.1+, obwohl 4.7.2 aus Gründen der Vernunft verwendet werden sollte. Wie in den Dokumenten zur Unterstützung der .NET Standard-Implementierung erläutert
Während NuGet .NET Framework 4.6.1 als Unterstützung für .NET Standard 1.5 bis 2.0 betrachtet, gibt es verschiedene Probleme beim Konsumieren von .NET Standard-Bibliotheken, die für diese Versionen aus .NET Framework 4.6.1-Projekten erstellt wurden.
Für .NET Framework-Projekte, die solche Bibliotheken verwenden müssen, empfehlen wir, das Projekt auf .NET Framework 4.7.2 oder höher zu aktualisieren.
Ursprüngliche Antwort
Wenn Sie den Quellcode überprüfen , werden Sie feststellen, dass Ihr Verdacht fast richtig ist. Wenn buffered
falsch ist, wird QueryAsync
synchron QueryAsync
.
if (command.Buffered)
{
var buffer = new List<T>();
var convertToType = Nullable.GetUnderlyingType(effectiveType) ?? effectiveType;
while (await reader.ReadAsync(cancel).ConfigureAwait(false))
{
object val = func(reader);
if (val == null || val is T)
{
buffer.Add((T)val);
}
else
{
buffer.Add((T)Convert.ChangeType(val, convertToType, CultureInfo.InvariantCulture));
}
}
while (await reader.NextResultAsync(cancel).ConfigureAwait(false)) { /* ignore subsequent result sets */ }
command.OnCompleted();
return buffer;
}
else
{
// can't use ReadAsync / cancellation; but this will have to do
wasClosed = false; // don't close if handing back an open reader; rely on the command-behavior
var deferred = ExecuteReaderSync<T>(reader, func, command.Parameters);
reader = null; // to prevent it being disposed before the caller gets to see it
return deferred;
}
Wie der Kommentar erklärt, ist es nicht möglich, ReadAsync
zu verwenden, wenn erwartet wird, dass der Rückgabetyp IEnumerable ist. Deshalb mussten die asynchronen Aufzählungen von C # 8 eingeführt werden.
Der Code für ExecuteReaderSync lautet:
private static IEnumerable<T> ExecuteReaderSync<T>(IDataReader reader, Func<IDataReader, object> func, object parameters)
{
using (reader)
{
while (reader.Read())
{
yield return (T)func(reader);
}
while (reader.NextResult()) { /* ignore subsequent result sets */ }
(parameters as IParameterCallbacks)?.OnCompleted();
}
}
Es verwendet Read
anstelle von ReadAsync
.
Mit IAsyncEnumerable
C # 8-Streams kann dies neu geschrieben werden, um eine IAsyncEnumerable
. Durch einfaches Ändern der Sprachversion wird das Problem nicht gelöst.
Angesichts der aktuellen Dokumente zu asynchronen Streams könnte dies folgendermaßen aussehen:
private static async IAsyncEnumerable<T> ExecuteReaderASync<T>(IDataReader reader, Func<IDataReader, object> func, object parameters)
{
using (reader)
{
while (await reader.ReadAsync())
{
yield return (T)func(reader);
}
while (await reader.NextResultAsync(cancel).ConfigureAwait(false)) { /* ignore subsequent result sets */ }
command.OnCompleted();
(parameters as IParameterCallbacks)?.OnCompleted();
}
}
Asynchrone Streams von Buuuuuut sind eines der Dinge, die nur unter .NET Core funktionieren können und wahrscheinlich noch nicht implementiert sind. Als ich versuchte, eine in Sharplab.io zu schreiben, Kaboom. [connection lost, reconnecting…]
Insbesondere im Zusammenhang mit dem adrett, ja: es muss eine andere API wie von @Panagiotis durch die ausgezeichnete Antwort erklärt. Was folgt, ist keine Antwort als solche, sondern ein zusätzlicher Kontext, den Implementierer, die sich den gleichen Herausforderungen stellen, möglicherweise berücksichtigen möchten.
Ich habe dies noch nicht für Dapper "aufgespießt" (obwohl ich es für SE.Redis getan habe), und ich bin zwischen verschiedenen Optionen hin und her gerissen:
Wir werden wahrscheinlich mit "1" gehen, aber ich muss sagen, die zweite Option ist aus guten Gründen ungewöhnlich verlockend:
Aber das Seltsame ist die .NET Core 3.0- IAsyncEnumerable<T>
von IAsyncEnumerable<T>
- offensichtlich zielt Dapper nicht nur auf .NET Core 3.0 ab. wir können:
IAsyncEnumerable<T>
die Funktion auf .NET Core 3.0, und geben Sie IAsyncEnumerable<T>
IAsyncEnumerable<T>
die Bibliothek auf .NET Core 3.0, und geben Sie IAsyncEnumerable<T>
IAsyncEnumerable<T>
IAsyncEnumerable<T>
einen benutzerdefinierten IAsyncEnumerable<T>
, der nicht IAsyncEnumerable<T>
(aber IAsyncEnumerable<T>
implementiert, wenn verfügbar), und implementieren Sie den Zustandsautomaten manuell. IAsyncEnumerable<T>
der IAsyncEnumerable<T>
von foreach
funktioniert dies so lange, wie es von uns benutzerdefiniert ist Enumerable Type bietet die richtigen Methoden Ich denke, wir werden wahrscheinlich Option 3 wählen, aber um es noch einmal zu wiederholen: Ja, etwas muss sich ändern.