SqlConnection no se elimina cuando se usa asincrónico

.net async-await c# dapper database-connection

Pregunta

Tengo un proyecto que tiene una base de datos de servidor Sql y Dapper como ORM. Estoy intentando utilizar el método Dapper's QueryAsync() para obtener algunos datos. No solo eso, sino que la llamada a mi repositorio proviene de varias tareas que se llaman con una Task.WhenAll . Task.WhenAll (es decir, cada tarea implica obtener datos de ese repositorio, entonces cada tarea espera el método de mi repos que envuelve QueryAsync() llamada).

El problema es que mis SqlConnections nunca se cierran aunque estoy usando un bloque de using . Como resultado, tengo más de 100 conexiones abiertas a mi base de datos y, finalmente, empiezo a obtener excepciones de "tamaño máximo de la agrupación". La cuestión es que, cuando cambio a Query() lugar de QueryAsync() , funciona bien, pero me gustaría poder hacerlo de forma asincrónica.

Aquí hay un ejemplo de código. Traté de imitar la estructura de la aplicación real lo mejor que pude, por lo que parece más complejo de lo que debe ser.

Interfaz:

public interface IFooRepository<T> where T: FooBase
{
    Task<IEnumerable<T>> Select(string account, DateTime? effectiveDate = null);
}

Implementación:

public class FooRepository : RepositoryBase, IFooRepository<SpecialFoo>
{
    private readonly IWebApiClientRepository _accountRepository;

    public FooRepository(IWebApiClientRepository repo)
    {
        _accountRepository = repo;
    }
    public async Task<IEnumerable<FuturePosition>> Select(string code, DateTime? effectiveDate = null)
    {
        effectiveDate = effectiveDate ?? DateTime.Today.Date;
        var referenceData =  await _accountRepository.GetCrossRefferenceData(code, effectiveDate.Value);
        using (var connection = new SqlConnection("iamaconnectionstring")
        {
            connection.Open();
            try
            {
                var res = await connection.QueryAsync<FuturePosition>(SqlQueryVariable + "AND t.code = @code;",
                    new
                    {
                        effectiveDate = effectiveDate.Value,
                        code = referenceData.Code
                    });

                foreach (var item in res)
                {
                    item.PropFromReference = referenceData.PropFromReference;
                }
                return res;
            }
            catch (Exception e)
            {
                //log 
                throw;
            }
            finally
            {
                connection.Close();
            }
        }
    }
}

Entonces ahora con el código de llamada, hay 2 capas. Comenzaré por el exterior. Creo que aquí es donde está el problema. Hay comentarios en el siguiente.

Populator:

public class Populator : PopulatorBase
{
    private IAccountRepository _acctRepository;
    public override async Task<IEnumerable<PopulationResult>> ProcessAccount(DateTime? popDate = null)
    {
        //My attempt at throttling the async calls
        //I was hoping this would force a max of 10 simultaneous connections.
        //It did not work.
        SemaphoreSlim ss = new SemaphoreSlim(10,10);
        var accountsToProcess = _acctRepository.GetAllAccountsToProcess();
        var accountNumbers = accountsToProcess.SelectMany(a => a.accountNumbers).ToList();

        List<Task<ProcessResult>> trackedTasks = new List<Task<ProcessResult>>();
        foreach (var item in accountNumbers)
        {
            await ss.WaitAsync();
            trackedTasks.Add(ProcessAccount(item.AccountCode, popDate ?? DateTime.Today));
            ss.Release();
        }
        //my gut tells me the issue is because of these tasks
        var results = await Task.WhenAll(trackedTasks);
        return results;
    }

    private async Task<ProcessResult>ProcessAccount(string accountCode, DateTime? popDate)
    {
        var createdItems = await _itemCreator.MakeExceptions(popDate, accountCode);
        return Populate(accountCode, createdItems);
    }
}

ItemCreator:

public class ItemCreator : ItemCreatorBase
{
    private readonly IFooRepository<FuturePosition> _fooRepository;
    private readonly IBarRepository<FuturePosition> _barRepository;

    public RussellGlobeOpFutureExceptionCreator() )
    {
        //standard constructor stuff
    }
    public async Task<ItemCreationResult> MakeItems(DateTime? effectiveDate, string account)
    {
        DateTime reconDate = effectiveDate ?? DateTime.Today.Date;

        //this uses the repository I outlined above
        var foos = await _fooRepository.Select(account, effectiveDate);

        //this repository uses a rest client, I doubt it's the problem
        var bars = await _barRepository.Select(account, effectiveDate);

        //just trying to make this example less lengthy
        var foobars = MakeFoobars(foos, bars);
        var result = new ItemCreationResult { EffectiveDate = effectiveDate, Items = foobars };
        return result;
    }
}

En cuanto a lo que he intentado:

  • Estrangulación con un semáforoSlim
  • Sin estrangulamiento
  • Usando connection.OpenAnync() en el repositorio
  • incluyendo / excluyendo un bloque finally (debe ser irrelevante con el using )

Vale la pena saber que el bucle foreach en el populator corre alrededor de 500 veces. Básicamente, hay una lista de 500 cuentas. Para cada uno, se tiene que hacer una carrera de larga populate tarea que implica la extracción de datos de mi repo Foo.

Honestamente, no tengo idea. Creo que podría tener que ver con esperar mi llamada async db de cada tarea dentro de esa lista de tareas en el populator. Cualquier idea sobre este tema sería muy útil.

Respuesta aceptada

Después de algunas excavaciones, creo que pude resolver el problema. No creo que en realidad estuviera experimentando una fuga de conexión como lo había supuesto originalmente. Por lo que ahora entiendo, con la agrupación de conexiones, cuando una conexión SQL se cierra desde el código, en realidad no desaparece, sino que va al grupo de conexiones como una conexión inactiva. Ver las conexiones abiertas en SQL todavía lo mostrará.

Como mi acceso a los datos era asincrónico, todas las conexiones se abrieron antes de que se devolvieran las conexiones "cerradas" al grupo, lo que significa que se abrió una nueva conexión para cada solicitud. Eso causó la sorprendente cantidad de conexiones abiertas que vi, lo que me hizo asumir que tenía una pérdida de conexión.

Usando un SemaphoreSlim en realidad se encargó del problema, lo acabo de implementar incorrectamente. Debería funcionar así:

public override async Task<IEnumerable<ProcessResult>> ProcessAccount(DateTime? popDate = null)
{
      foreach (item in accountNumbers)
      {

      trackedTasks.Add(new Func<Task<ProcessResult>>(async () =>
            {
                await ss.WaitAsync().ConfigureAwait(false);
                try
                {
                    return await ProcessAccount(item.AccountCode, popDate ?? DateTime.Today).ConfigureAwait(false);
                }
                catch (Exception e)
                {
                    //log, etc.
                }
                finally
                {
                    ss.Release();
                }
            })());
      }
}

De esta forma, se reduce la cantidad de conexiones que se abren a la vez y se espera a que se cierren, por lo que se vuelve a utilizar el mismo grupo de conexiones más pequeño de la agrupación.



Licencia bajo: CC-BY-SA with attribution
No afiliado con Stack Overflow
¿Es esto KB legal? Sí, aprende por qué
Licencia bajo: CC-BY-SA with attribution
No afiliado con Stack Overflow
¿Es esto KB legal? Sí, aprende por qué