Actualizar o combinar registros en Azure SQL Database mediante Azure Functions

Actualmente, Azure Stream Analytics (ASA) solo admite la inserción (anexación) de filas a salidas SQL (Azure SQL Databases y Azure Synapse Analytics). En este artículo se describen las soluciones alternativas para habilitar UPDATE, UPSERT o MERGE en bases de datos SQL mediante Azure Functions como capa intermedia.

Al final se presentan las opciones alternativas a Azure Functions.

Requisito

Puede escribir datos en una tabla mediante uno de los siguientes modos:

Modo Instrucción T-SQL equivalente Requisitos
Añadir INSERT None.
Reemplazar MERGE (UPSERT) Clave única
Acumular MERGE (UPSERT) con operador de asignación compuesta (+=, -=...) Clave única y acumulador

Para ilustrar las diferencias, tenga en cuenta lo que sucede al ingerir los dos registros siguientes:

Hora_de_Llegada Identificador_Dispositivo Valor_Medida
10:00 A 1
10:05 A 20

En el modo de anexión , se insertan dos registros. La instrucción T-SQL equivalente es:

INSERT INTO [target] VALUES (...);

Resultando en:

Modified_Time Identificador_de_Dispositivo Valor_de_Medida
10:00 A 1
10:05 A 20

En el modo de reemplazo , solo se obtiene el último valor por clave. Aquí se usa Device_Id como clave. La instrucción T-SQL equivalente es:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Como resultado:

Hora_Modificada Clave_de_Dispositivo Valor_de_medida
10:05 A 20

Por último, en el modo acumular, sumas Value con un operador de asignación compuesta (+=). Aquí también se usa Device_Id como clave:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Resultando en:

Hora_Modificada Clave_del_Dispositivo Valor_de_Medida
10:05 A 21

Por motivos de rendimiento, los adaptadores de salida de base de datos SQL de ASA solo admiten de momento el modo Append de forma nativa. Estos adaptadores usan la inserción masiva para maximizar el rendimiento y limitar la presión de retorno.

En este artículo se muestra cómo usar Azure Functions para implementar los modos Replace y Accumulate de ASA. Cuando usas una función como capa intermedia, el rendimiento potencial de escritura no afecta al trabajo de streaming. En este sentido, el uso de Azure Functions funciona mejor con Azure SQL. Con Synapse SQL, el cambio de instrucciones masivas a instrucciones fila a fila puede crear mayores problemas de rendimiento.

Salida de Azure Functions

En este trabajo, remplazará la salida de ASA SQL con la salida de ASA Azure Functions. La función implementa las funcionalidades UPDATE, UPSERT o MERGE.

Actualmente, puede acceder a una instancia de SQL Database en una función mediante dos opciones. La primera opción es la vinculación de salida Azure SQL. Actualmente se limita a C#, y solo ofrece el modo de reemplazo. La segunda opción consiste en redactar una consulta SQL para enviar a través del controlador SQL adecuado (Microsoft. Data.SqlClient para .NET).

Ambos de los siguientes ejemplos suponen el siguiente esquema de tabla. La opción de enlace requiere el establecimiento de una clave principal en la tabla de destino. No es necesario, aunque se recomienda, si se usa un controlador SQL.

CREATE TABLE [dbo].[device_updated](
	[DeviceId] [bigint] NOT NULL, -- bigint in ASA
	[Value] [decimal](18, 10) NULL, -- float in ASA
	[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
	[DeviceId] ASC
)
);

Para usar una función como salida de ASA, la función debe cumplir las expectativas siguientes:

  • Azure Stream Analytics espera el estado HTTP 200 de la aplicación de Functions para lotes que procesa correctamente.
  • Cuando Azure Stream Analytics recibe una excepción 413 ("http Request Entity Too Large") de una función de Azure, reduce el tamaño de los lotes que envía a Azure Function.
  • Durante la conexión de prueba, Stream Analytics envía una solicitud POST con un lote vacío a Azure Functions y espera una respuesta HTTP con el código de estado 20x para verificar la conexión.

Opción 1: Actualización por clave con el SQL Binding de Azure Functions

Esta opción usa el enlace de salida SQL de Azure Functions. Esta extensión puede reemplazar un objeto de una tabla sin tener que escribir una instrucción SQL. En este momento, no admite operadores de asignación compuestos (acumulaciones).

Este ejemplo se ha basado en:

Para comprender mejor el enfoque de enlace, siga este tutorial.

En primer lugar, cree una aplicación de funciones HttpTrigger predeterminada con este tutorial. Use la siguiente información:

  • Lenguaje: C#
  • Runtime: .NET 6 (en función/tiempo de ejecución v4)
  • Plantilla: HTTP trigger

Instale la extensión de enlace mediante la ejecución del siguiente comando en un terminal ubicado en la carpeta del proyecto:

dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease

Agregue el elemento SqlConnectionString en la sección Values de local.settings.json y rellene la cadena de conexión del servidor de destino:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Reemplace toda la función (archivo .cs del proyecto) por el siguiente fragmento de código. Actualice el espacio de nombres, el nombre de clase y el nombre de función con el suyo propio:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run (
            // http trigger binding
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log,
            [Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
            )
        {

            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            // Parse items and send to binding
            for (var i = 0; i < data.Count; i++)
            {
                var device = new Device();
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;

                await devices.AddAsync(device);
            }
            await devices.FlushAsync();

            return new OkResult(); // 200
        }
    }

    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }
    }
}

Actualice el nombre de la tabla de destino de la sección de enlace:

[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices

Actualice la clase Device y la sección de mapeo para que coincidan con su propio esquema:

...
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;
...
    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }

Ahora puede probar el enlace entre la función local y la base de datos mediante la depuración (F5 en Visual Studio Code). La base de datos SQL debe ser accesible desde el equipo. Puede usar SSMS para comprobar la conectividad. A continuación, envíe solicitudes POST al punto de conexión local. Una solicitud con un cuerpo vacío debe devolver HTTP 204. Debe conservarse una solicitud con una carga real en la tabla de destino (en modo de reemplazo o actualización). Esta es una carga de ejemplo correspondiente al esquema usado en este ejemplo:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

La función ya se puede publicar en Azure. Establezca una configuración de aplicación para SqlConnectionString. El firewall de Azure SQL Server debe permitir servicios de Azure para que la función activa acceda a ellos.

Después, puede definir la función como una salida en el trabajo de ASA y usarla para reemplazar los registros en lugar de insertarlos.

Opción 2: Combinación con asignación compuesta (acumulación) por medio de una consulta SQL personalizada

Nota:

Tras reiniciar y recuperar, ASA podría volver a enviar eventos de salida que ya emitía. Este comportamiento puede hacer que se produzca un error en la lógica de acumulación (duplicando valores individuales). Para evitar este problema, genera los mismos datos en una tabla mediante la salida de SQL de ASA nativa. Puede usar esta tabla de control para detectar problemas y volver a sincronizar la acumulación cuando sea necesario.

Esta opción usa Microsoft.Data.SqlClient. Esta biblioteca le permite enviar consultas SQL a una instancia de SQL Database.

Este ejemplo se ha basado en:

En primer lugar, cree una aplicación de funciones HttpTrigger predeterminada con este tutorial. Se usa la siguiente información:

  • Lenguaje: C#
  • Runtime: .NET 6 (en función/tiempo de ejecución v4)
  • Plantilla: HTTP trigger

Instale la biblioteca SqlClient mediante la ejecución del siguiente comando en un terminal ubicado en la carpeta del proyecto:

dotnet add package Microsoft.Data.SqlClient --version 4.0.0

Agregue el elemento SqlConnectionString en la sección Values de local.settings.json y rellene la cadena de conexión del servidor de destino:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Reemplace toda la función (archivo .cs del proyecto) por el siguiente fragmento de código. Actualice el espacio de nombres, el nombre de clase y el nombre de función con los suyos:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log)
        {
            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
            using (SqlConnection conn = new SqlConnection(SqlConnectionString))
            {
                conn.Open();

                // Parse items and send to binding
                for (var i = 0; i < data.Count; i++)
                {
                    int DeviceId = data[i].DeviceId;
                    double Value = data[i].Value;
                    DateTime Timestamp = data[i].Timestamp;

                    var sqltext =
                    $"MERGE INTO [device_updated] AS old " +
                    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
                    $"ON new.DeviceId = old.DeviceId " +
                    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
                    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

                    //log.LogInformation($"Running {sqltext}");

                    using (SqlCommand cmd = new SqlCommand(sqltext, conn))
                    {
                        // Execute the command and log the # rows affected.
                        var rows = await cmd.ExecuteNonQueryAsync();
                        log.LogInformation($"{rows} rows updated");
                    }
                }
                conn.Close();
            }
            return new OkResult(); // 200
        }
    }
}

Actualice la sección de compilación de comandos sqltext para que coincida con su propio esquema (observe cómo se logra la acumulación por medio del operador += al actualizar):

    var sqltext =
    $"MERGE INTO [device_updated] AS old " +
    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
    $"ON new.DeviceId = old.DeviceId " +
    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

Ahora puede probar el enlace entre la función local y la base de datos mediante la depuración (F5 en VS Code). La base de datos SQL debe ser accesible desde el equipo. Puede usar SSMS para comprobar la conectividad. A continuación, envíe solicitudes POST al punto de conexión local. Una solicitud con un cuerpo vacío debe devolver HTTP 204. Debe conservarse una solicitud con una carga real en la tabla de destino (en modo de acumulación o combinación). Esta es una carga de ejemplo correspondiente al esquema usado en este ejemplo:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

La función ya se puede publicar en Azure. Se debe establecer una configuración de la aplicación para SqlConnectionString. El firewall de Azure SQL Server debe permitir servicios de Azure para que la función activa acceda a ellos.

Luego la función se puede definir como una salida del trabajo de ASA y usarse para reemplazar registros en lugar de insertarlos.

Alternativas

Fuera de Azure Functions, varios métodos pueden lograr el resultado esperado. En esta sección se describen algunos de estos métodos.

Procesamiento posterior en la base de datos SQL de destino

Una tarea en segundo plano funciona una vez que los datos se insertan en la base de datos a través de las salidas de ASA SQL estándar.

Para Azure SQL, use INSTEAD OFDML desencadenadores para interceptar los comandos de INSERT que emite ASA.

CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
	MERGE device_updated AS old
	
	-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
	USING inserted AS new
		ON new.DeviceId = old.DeviceId

	WHEN MATCHED THEN 
		UPDATE SET
			old.Value += new.Value, 
			old.Timestamp = new.Timestamp

	WHEN NOT MATCHED THEN
		INSERT (DeviceId, Value, Timestamp)
		VALUES (new.DeviceId, new.Value, new.Timestamp);  
END;

En Synapse SQL, ASA puede insertar en una tabla de almacenamiento provisional. Luego una tarea periódica puede transformar los datos según sea necesario en una tabla intermediaria. Por último, los datos se trasladan a la tabla de producción.

Procesamiento previo en Azure Cosmos DB

Azure Cosmos DB admite UPSERT de forma nativa. Aquí solo es posible anexar o reemplazar. Debe administrar acumulaciones del lado cliente en Azure Cosmos DB.

Si los requisitos coinciden, puede reemplazar la base de datos SQL de destino por una instancia de Azure Cosmos DB. Este cambio requiere un cambio importante en la arquitectura general de la solución.

Para Synapse SQL, puede usar Azure Cosmos DB como una capa intermedia a través de Azure Synapse Link para Azure Cosmos DB. Use Azure Synapse Link para crear un almacén analítico. Después, puede consultar este almacén de datos directamente en Synapse SQL.

Comparación de las alternativas

Cada enfoque ofrece diferentes propuestas de valor y funcionalidades:

Tipo Opción Modos Azure SQL Database Azure Synapse Analytics
Posprocesamiento
Desencadenadores Reemplazar, Agregar + N/D, los desencadenadores no están disponibles en Synapse SQL
Preparación Reemplazar, Acumular + +
Procesamiento previo
Azure Functions Reemplazar, Acumular + - (rendimiento fila a fila)
Sustitución de Azure Cosmos DB Reemplazar N/D N/D
Azure Cosmos DB Azure Synapse Link Reemplazar N/D +

Obtención de soporte técnico

Para obtener más ayuda, pruebe la página de preguntas y preguntas de Microsoft para Azure Stream Analytics.

Pasos siguientes