Cómo evitar "Violación de la restricción UNIQUE KEY" cuando se realizan MUCHOS INSERTES simultáneos

c# dapper database sql sql-server

Pregunta

Estoy realizando MUCHAS INSERT SQL INSERT simultáneas que colisionan en una restricción UNIQUE KEY, aunque también estoy verificando los registros existentes para la clave dada dentro de una sola transacción. Estoy buscando una manera de eliminar, o minimizar, la cantidad de colisiones que estoy recibiendo sin dañar el rendimiento (demasiado).

Fondo:

Estoy trabajando en un proyecto ASP.NET MVC4 WebApi que recibe MUCHAS solicitudes HTTP POST para INSERT registros. Se obtiene alrededor de 5K - 10K solicitudes por segundo. La única responsabilidad del proyecto es desduplicar y agregar registros. Es muy escribir pesado; tiene una cantidad relativamente pequeña de solicitudes de lectura; todos los cuales usan una Transacción con IsolationLevel.ReadUncommitted .

Esquema de la base de datos

Aquí está la tabla DB:

CREATE TABLE [MySchema].[Records] ( 
    Id BIGINT IDENTITY NOT NULL, 
    RecordType TINYINT NOT NULL, 
    UserID BIGINT NOT NULL, 
    OtherID SMALLINT NULL, 
    TimestampUtc DATETIMEOFFSET NOT NULL, 
    CONSTRAINT [UQ_MySchemaRecords_UserIdRecordTypeOtherId] UNIQUE CLUSTERED ( 
        [UserID], [RecordType], [OtherID] 
    ), 
    CONSTRAINT [PK_MySchemaRecords_Id] PRIMARY KEY NONCLUSTERED ( 
        [Id] ASC 
    ) 
) 

Código de repositorio

Aquí está el código para el método Upsert que está causando la excepción:

using System;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
using Dapper;

namespace MyProject.DataAccess
{
    public class MyRepo
    {
        public void Upsert(MyRecord record)
        {
            var dbConnectionString = "MyDbConnectionString";
            using (var connection = new SqlConnection(dbConnectionString))
            {
                connection.Open();
                using (var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted))
                {
                    try
                    {
                        var existingRecord = FindByByUniqueKey(transaction, record.RecordType, record.UserID, record.OtherID);

                        if (existingRecord == null)
                        {
                            const string sql = @"INSERT INTO [MySchema].[Records] 
                                                 ([UserID], [RecordType], [OtherID], [TimestampUtc]) 
                                                 VALUES (@UserID, @RecordType, @OtherID, @TimestampUtc) 
                                                 SELECT CAST(SCOPE_IDENTITY() AS BIGINT";
                            var results = transaction.Connection.Query<long>(sql, record, transaction);
                            record.Id = results.Single();
                        }
                        else if (existingRecord.TimestampUtc <= record.TimestampUtc)
                        {
                            // UPDATE
                        }

                        transaction.Commit();
                    }
                    catch (Exception e)
                    {
                        transaction.Rollback();
                        throw e;
                    }
                }
            }
        }

        // all read-only methods use explicit transactions with IsolationLevel.ReadUncommitted

        private static MyRecord FindByByUniqueKey(SqlTransaction transaction, RecordType recordType, long userID, short? otherID)
        {
            const string sql = @"SELECT * from [MySchema].[Records] 
                                 WHERE [UserID] = @UserID
                                 AND [RecordType] = @RecordType
                                 AND [OtherID] = @OtherID";
            var paramz = new {
                UserID = userID,
                RecordType = recordType,
                OtherID = otherID
            };
            var results = transaction.Connection.Query<MyRecord>(sql, paramz, transaction);
            return results.SingleOrDefault();
        }
    }

    public class MyRecord
    {
        public long ID { get; set; }
        public RecordType RecordType { get; set; }
        public long UserID { get; set; }
        public short? OtherID { get; set; }
        public DateTimeOffset TimestampUtc { get; set; }
    }

    public enum RecordType : byte
    {
        TypeOne = 1,
        TypeTwo = 2,
        TypeThree = 3
    }
}

El problema

Cuando el servidor tiene una carga lo suficientemente pesada, veo que se producen muchas de estas excepciones:

Violación de la restricción UNIQUE KEY 'UQ_MySchemaRecords_UserIdRecordTypeOtherId'. No se puede insertar una clave duplicada en el objeto 'MySchema.Records'. El valor duplicado de la clave es (1234567890, 1, 123). La instrucción se ha terminado.

Esta excepción ocurre a menudo, hasta 10 veces en un minuto.

Lo que he intentado

  • Traté de cambiar IsolationLevel a Serializable . La excepción se produjo con mucha menos frecuencia, pero aún así se produjo. Además, el rendimiento del código sufrió mucho; el sistema solo podía manejar solicitudes de 2K por segundo. Sospecho que esta disminución en el rendimiento fue en realidad la causa de las Excepciones reducidas, así que concluí que esto no resolvió mi problema.
  • He considerado utilizar la UPDLOCK tabla UPDLOCK pero no entiendo completamente cómo coopera con los niveles de aislamiento o cómo aplicarlo a mi código. Sin embargo, parece que podría ser la mejor solución, desde mi comprensión actual.
  • También traté de agregar la declaración SELECT inicial (para los registros existentes) para ser parte de la declaración INSERT , como se muestra aquí, pero este intento aún tenía el mismo problema.
  • Intenté implementar mi método Upsert utilizando la instrucción SQL MERGE , pero esto también sufrió el mismo problema.

Mis preguntas)

  • ¿Hay algo que pueda hacer para evitar este tipo de colisiones de restricciones de clave UNIQUE ?
  • Si yo debería estar utilizando la UPDLOCK sugerencia de tabla (o cualquier otra sugerencia de tabla para el caso), ¿cómo voy a añadir que a mi código? ¿Lo agregaría al INSERT ? El SELECT ? ¿Ambos?

Respuesta aceptada

Haga que la lectura de validación tome un bloqueo:

FROM SomeTable WITH (UPDLOCK, ROWLOCK, HOLDLOCK)

Esto serializa los accesos en una sola clave, lo que permite la concurrencia en todos los demás.


HOLDLOCK (= SERIALIZABLE ) protege un rango de valores. Esto garantiza que una fila que no existe continúa sin existir, por lo que INSERT correctamente.

UPDLOCK garantiza que cualquier fila existente no se modifique o elimine mediante otra transacción concurrente para que la UPDATE exitosa.

ROWLOCK alienta al motor a tomar un bloqueo a nivel de fila.

Estos cambios pueden aumentar las posibilidades de un punto muerto.


Respuesta popular

Puede ser más rápido permitir y suprimir los errores en su escenario que tratar de eliminarlos. Si está consolidando varias fuentes de forma sincrónica con datos superpuestos, tendrá que crear un cuello de botella en algún lugar para administrar la condición de carrera.

Podría crear una clase de gestor de singleton que tuviera las restricciones únicas de los registros en un hashset para que pueda soltar automáticamente los duplicados cuando se agreguen al conjunto. Los registros se agregan antes de enviarse al DB y se eliminan al completar la declaración. De esta forma, o bien el hashset come el duplicado o bien el control de registro existente que realiza en la parte superior de su intento detecta el registro duplicado confirmado.



Licencia bajo: CC-BY-SA with attribution
No afiliado con Stack Overflow
¿Es esto KB legal? Sí, aprende por qué
Licencia bajo: CC-BY-SA with attribution
No afiliado con Stack Overflow
¿Es esto KB legal? Sí, aprende por qué