Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Atualmente, Azure Stream Analytics (ASA) suporta apenas a inserção (anexação) de linhas às saídas SQL (SQL do Azure Bases de Dados e Azure Synapse Analytics). Este artigo discute soluções alternativas para ativar UPDATE, UPSERT ou MERGE em bases de dados SQL, usando Funções do Azure como camada intermédia.
As opções alternativas ao Funções do Azure são apresentadas no final.
Requisito
Pode escrever dados numa tabela usando um dos seguintes modos:
| Modo | Instrução T-SQL equivalente | Requisitos |
|---|---|---|
| Acrescentar | INSERT | Nenhuma |
| Substituir | MESCLAR (UPSERT) | Chave única |
| Acumular | MERGE (UPSERT) com operador de atribuição composta |
Chave e acumulador únicos |
Para ilustrar as diferenças, considere o que acontece ao ingerir os seguintes dois registos:
| Hora de Chegada | Id_Dispositivo | Valor_Medida |
|---|---|---|
| 10:00 | A | 1 |
| 10:05 | A | 20 |
No modo de anexar , inseres dois registos. A instrução T-SQL equivalente é:
INSERT INTO [target] VALUES (...);
Resultando em:
| Hora_Modificada | Id_Dispositivo | Measure_Value |
|---|---|---|
| 10:00 | A | 1 |
| 10:05 | A | 20 |
No modo de substituição , só recebes o último valor por chave. Aqui usas Device_Id como chave. A instrução equivalente do T-SQL é:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Resultando em:
| Modified_Time | Chave_do_Dispositivo | Valor_Medida |
|---|---|---|
| 10:05 | A | 20 |
Finalmente, no modo acumular soma Value com um operador de atribuição composta (+=). Aqui também usas Device_Id como chave:
MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
UPDATE SET
t.Modified_Time = v.Modified_Time,
t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
INSERT (Modified_Time,Device_Key,Measure_Value)
VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)
Resultando em:
| Modified_Time | Chave_do_Dispositivo | Measure_Value |
|---|---|---|
| 10:05 | A | 21 |
Para considerações de desempenho , os adaptadores de saída do banco de dados ASA SQL atualmente suportam apenas o modo de acréscimo nativamente. Esses adaptadores usam inserção em massa para maximizar a taxa de transferência e limitar a pressão de retorno.
Este artigo mostra como usar o Funções do Azure para implementar os modos Substituir e Acumular para ASA. Quando usas uma função como camada intermédia, o potencial desempenho de escrita não afeta o trabalho de streaming. Nesse sentido, usar o Funções do Azure funciona melhor com o SQL do Azure. Com o Synapse SQL, alternar de instruções em massa para instruções linha a linha pode criar maiores problemas de desempenho.
Saída do Funções do Azure
Neste trabalho, substituis a saída SQL ASA pela saída ASA Funções do Azure. A função implementa as capacidades UPDATE, UPSERT ou MERGE.
Atualmente, pode aceder a uma base de dados SQL numa função usando duas opções. A primeira opção é a vinculação de saída SQL do Azure. Atualmente está limitado a C# e só oferece modo de substituição. A segunda opção é compor uma consulta SQL para submeter através do driver apropriado SQL (Microsoft. Data.SqlClient para .NET).
Ambos os exemplos seguintes assumem o seguinte esquema de tabela. A opção de vinculação requer que uma chave primária seja definida na tabela de destino. Não é necessário, mas recomendado, ao usar um driver SQL.
CREATE TABLE [dbo].[device_updated](
[DeviceId] [bigint] NOT NULL, -- bigint in ASA
[Value] [decimal](18, 10) NULL, -- float in ASA
[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
[DeviceId] ASC
)
);
Para usar uma função como saída do ASA, a função deve cumprir as seguintes expectativas:
- O Azure Stream Analytics espera o estado HTTP 200 da aplicação Functions para lotes que processa com sucesso.
- Quando o Azure Stream Analytics recebe uma exceção 413 ("http Request Entity Too Large") de uma função do Azure, reduz o tamanho dos lotes que envia para a função Azure.
- Durante a ligação de teste, o Stream Analytics envia um pedido POST com um lote vazio para o Funções do Azure e espera o estado HTTP 20x de volta para validar o teste.
Opção 1: Atualizar por chave com a Vinculação SQL da Função Azure
Esta opção utiliza o Binding de Saída SQL da Função Azure. Esta extensão pode substituir um objeto numa tabela sem que tenha de escrever uma instrução SQL. No momento, ele não suporta operadores de atribuição compostos (acumulações).
Esta amostra baseou-se em:
- Funções do Azure runtime versão 4
- .NET 6,0
- Microsoft. Azure. WebJobs.Extensions.Sql 0.1.131-preview
Para melhor compreender a abordagem de vinculação, siga este tutorial.
Primeiro, crie um aplicativo de função HttpTrigger padrão seguindo este tutorial. Utilize as seguintes informações:
- Idioma:
C# - Tempo de execução:
.NET 6(em função/tempo de execução v4) - Modelo:
HTTP trigger
Instale a extensão de vinculação executando o seguinte comando em um terminal localizado na pasta do projeto:
dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease
Adicione o SqlConnectionString item na Values seção do seu local.settings.json, preenchendo a cadeia de conexão do servidor de destino:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Substitua toda a função (arquivo .cs no projeto) pelo trecho de código a seguir. Atualize o namespace, o nome da classe e o nome da função com os seus próprios:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run (
// http trigger binding
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log,
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
var device = new Device();
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
await devices.AddAsync(device);
}
await devices.FlushAsync();
return new OkResult(); // 200
}
}
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
}
}
Atualize o nome da tabela de destino na seção de vinculação:
[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
Atualize a seção da classe Device e de mapeamento para corresponder ao seu próprio esquema.
...
device.DeviceId = data[i].DeviceId;
device.Value = data[i].Value;
device.Timestamp = data[i].Timestamp;
...
public class Device{
public int DeviceId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
Agora pode testar as ligações entre a função local e a base de dados executando a depuração (F5 no Visual Studio Code). O banco de dados SQL precisa ser acessível a partir de sua máquina. Podes usar o SSMS para verificar a conectividade. Em seguida, envie solicitações POST para o endpoint local. Um pedido com corpo vazio deve devolver HTTP 204. Uma solicitação com uma carga útil real deve ser mantida na tabela de destino (no modo de substituição/atualização). Aqui está um exemplo de carga útil correspondente ao esquema usado neste exemplo:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
A função agora pode ser publicada no Azure. Defina uma configuração de aplicação para SqlConnectionString. O firewall do SQL Server do Azure deve permitir que os serviços do Azure entrem para que a função ativa possa alcançá-lo.
Pode então definir a função como uma saída no trabalho ASA e usá-la para substituir registos em vez de os inserir.
Opção 2: Combinar com atribuição composta (acumulação) através de uma consulta SQL personalizada.
Nota
Ao reiniciar e recuperar, o ASA pode reenviar eventos de saída que já emitiu. Este comportamento pode causar a falha da lógica de acumulação (duplicando valores individuais). Para evitar este problema, gere os mesmos dados numa tabela usando o ASA SQL Output nativo. Pode usar esta tabela de controlo para detetar problemas e ressincronizar a acumulação quando necessário.
Esta opção usa Microsoft.Data.SqlClient. Esta biblioteca permite-lhe enviar quaisquer consultas SQL para uma base de dados SQL.
Esta amostra baseou-se em:
Primeiro, crie um aplicativo de função HttpTrigger padrão seguindo este tutorial. São utilizadas as seguintes informações:
- Idioma:
C# - Tempo de execução:
.NET 6(em função/tempo de execução v4) - Modelo:
HTTP trigger
Instale a biblioteca SqlClient executando o seguinte comando em um terminal localizado na pasta do projeto:
dotnet add package Microsoft.Data.SqlClient --version 4.0.0
Adicione o SqlConnectionString item na Values seção do seu local.settings.json, preenchendo a cadeia de conexão do servidor de destino:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"SqlConnectionString": "Your connection string"
}
}
Substitua toda a função (arquivo .cs no projeto) pelo trecho de código a seguir. Atualize o namespace, o nome da classe e o nome da função por conta própria:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;
namespace Company.Function
{
public static class HttpTrigger1{
[FunctionName("HttpTrigger1")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
ILogger log)
{
// Extract the body from the request
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Reject if too large, as per the doc
if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
using (SqlConnection conn = new SqlConnection(SqlConnectionString))
{
conn.Open();
// Parse items and send to binding
for (var i = 0; i < data.Count; i++)
{
int DeviceId = data[i].DeviceId;
double Value = data[i].Value;
DateTime Timestamp = data[i].Timestamp;
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
//log.LogInformation($"Running {sqltext}");
using (SqlCommand cmd = new SqlCommand(sqltext, conn))
{
// Execute the command and log the # rows affected.
var rows = await cmd.ExecuteNonQueryAsync();
log.LogInformation($"{rows} rows updated");
}
}
conn.Close();
}
return new OkResult(); // 200
}
}
}
Atualize a seção de construção do comando sqltext para corresponder ao seu próprio esquema (note como a acumulação é obtida através do operador += na atualização):
var sqltext =
$"MERGE INTO [device_updated] AS old " +
$"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
$"ON new.DeviceId = old.DeviceId " +
$"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
$"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";
Agora pode testar a ligação entre a função local e o banco de dados utilizando a depuração (F5 no VS Code). O banco de dados SQL precisa ser acessível a partir de sua máquina. Podes usar o SSMS para verificar a conectividade. Em seguida, envie solicitações POST para o endpoint local. Um pedido com corpo vazio deve devolver HTTP 204. Uma solicitação com uma carga útil real deve ser mantida na tabela de destino (no modo acumular/mesclar). Aqui está um exemplo de carga útil correspondente ao esquema usado neste exemplo:
[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]
A função agora pode ser publicada no Azure. Uma configuração de aplicativo deve ser definida para SqlConnectionString. O firewall do SQL Server do Azure deve permitir os serviços do Azure para que a função ao vivo possa alcançá-lo.
A função pode então ser definida como uma saída no trabalho ASA e usada para substituir registros em vez de inseri-los.
Alternativas
Fora do Funções do Azure, múltiplos métodos podem alcançar o resultado esperado. Esta secção descreve alguns destes métodos.
Pós-processamento no Banco de Dados SQL de destino
Uma tarefa em segundo plano funciona quando os dados são inseridos no banco de dados por meio das saídas padrão ASA SQL.
Para SQL do Azure, usa os gatilhos INSTEAD OFDML para intercetar os comandos INSERT que o ASA emite.
CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
MERGE device_updated AS old
-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
USING inserted AS new
ON new.DeviceId = old.DeviceId
WHEN MATCHED THEN
UPDATE SET
old.Value += new.Value,
old.Timestamp = new.Timestamp
WHEN NOT MATCHED THEN
INSERT (DeviceId, Value, Timestamp)
VALUES (new.DeviceId, new.Value, new.Timestamp);
END;
Para Synapse SQL, o ASA pode inserir dados em uma tabela de estágio. Uma tarefa recorrente pode então transformar os dados, conforme necessário, em uma tabela intermediária. Finalmente, os dados são movidos para a tabela de produção.
Pré-processamento no Azure Cosmos DB
O Azure Cosmos DB suporta UPSERT nativamente. Aqui, só é possível acrescentar ou substituir. Deve gerir acumulações no lado do cliente no Azure Cosmos DB.
Se os requisitos coincidirem, podes substituir a base de dados SQL alvo por uma instância do Azure Cosmos DB. Esta mudança requer uma alteração importante na arquitetura global da solução.
Para Synapse SQL, podes usar Azure Cosmos DB como camada intermédia via Azure Synapse Link para Azure Cosmos DB. Use o Azure Synapse Link para criar um armazenamento analítico. Pode então consultar este armazenamento de dados diretamente no Synapse SQL.
Comparação das alternativas
Cada abordagem oferece propostas de valor e capacidades diferentes:
| Tipo | Opção | Modos | Base de Dados SQL do Azure | Azure Synapse Analytics |
|---|---|---|---|---|
| Pós-processamento | ||||
| Acionadores | Substituir, Acumular | + | N/A, os gatilhos não estão disponíveis no Synapse SQL | |
| Processo de teste | Substituir, Acumular | + | + | |
| Pré-processamento | ||||
| Funções do Azure | Substituir, Acumular | + | - (desempenho linha a linha) | |
| Substituir Azure Cosmos DB | Substituir | N/A | N/A | |
| Azure Cosmos DB Azure Synapse Link | Substituir | N/A | + |
Obter suporte
Para obter mais assistência, tente a página de perguntas Microsoft Q&A do Azure Stream Analytics.
Próximos passos
- Entender as saídas do Azure Stream Analytics
- Saída do Azure Stream Analytics para o Banco de Dados SQL do Azure
- Aumentar o desempenho da taxa de transferência da Base de Dados SQL do Azure com o Azure Stream Analytics
- Usar identidades gerenciadas para acessar o Banco de Dados SQL do Azure ou o Azure Synapse Analytics a partir de um trabalho do Azure Stream Analytics
- Usar dados de referência de um Banco de Dados SQL para um trabalho do Azure Stream Analytics
- Executar Funções do Azure em tarefas de Azure Stream Analytics - Tutorial para output Redis
- Guia de início rápido: criar um trabalho do Stream Analytics usando o portal do Azure