Tutorial: Rotear dados usando políticas de atualização de tabela

Alterne os serviços usando a lista suspensa Versão. Saiba mais sobre navegação.
Aplica-se a: ✅ Microsoft Fabric ✅ Azure Data Explorer

Quando os dados de origem envolvem transformações simples e rápidas, execute-os upstream no pipeline usando um fluxo de eventos. No entanto, essa abordagem pode não funcionar bem para outras transformações complexas ou que exigem funcionalidade especializada para operar.

Neste tutorial, você aprenderá como:

O exemplo neste tutorial demonstra como usar políticas de atualização para roteamento de dados para executar transformações complexas para enriquecer, limpar e transformar dados no momento da ingestão. Para obter uma lista de outros casos de uso comuns, consulte Casos de uso comuns para políticas de atualização de tabela.

Pré-requisitos

  • Uma conta Microsoft ou uma identidade de usuário do Microsoft Entra. Uma assinatura do Azure não é necessária.

Etapa 1 – Criar tabelas e políticas de atualização

As etapas a seguir orientam você na criação de uma tabela de origem, funções de transformação, tabelas de destino e políticas de atualização. O tutorial demonstra como usar políticas de atualização de tabela para executar transformações complexas e salvar os resultados em uma ou mais tabelas de destino. O exemplo usa uma única tabela de origem chamada Raw_Table e três tabelas de destino chamadas Device_Telemetry, Device_Alarms e Error_Log.

  1. Execute o comando a seguir para criar uma tabela chamada Raw_Table.

    .create table Raw_Table (RawData: dynamic)
    

    A tabela de origem é onde os dados ingeridos são salvos. A tabela tem uma única coluna chamada RawData do tipo dinâmico. O tipo dinâmico armazena os dados brutos como estão, sem nenhum esquema. Para obter mais informações, consulte o comando .create table.

  2. Execute o comando a seguir para criar funções chamadas Get_Telemetry, Get_Alarms e Log_Error.

    .execute database script <|
      .create-or-alter function Get_Telemetry() {
        Raw_Table
        | where todynamic(RawData).MessageType == 'Telemetry'
        | extend
          Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)),
          DeviceId = tostring(RawData.DeviceId),
          DeviceType = tostring(RawData.DeviceType),
          SensorName = tostring(RawData.SensorName),
          SensorValue = toreal(RawData.SensorValue),
          SensorUnit = tostring(RawData.SensorUnit)
        | project-away RawData
      }
      .create-or-alter function Get_Alarms() {
        Raw_Table
        | where RawData.MessageType == 'Alarms'
        | extend
          Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)),
          DeviceId = tostring(RawData.DeviceId),
          DeviceType = tostring(RawData.DeviceTpe) ,
          AlarmType = tostring(RawData.AlarmType)
        | project-away RawData
      }
      .create-or-alter function Log_Error() {
        Raw_Table
        | where RawData.MessageType !in ('Telemetry', 'Alarms')
        | extend
          TimeStamp = datetime(now),
          ErrorType = 'Unknown MessageType'
        | project TimeStamp, RawData, ErrorType
      }
    

    Ao criar uma política de atualização, você pode especificar um script embutido para execução. No entanto, encapsule a lógica de transformação em uma função. O uso de uma função melhora a manutenção de código. Quando novos dados chegam, a função é executada para transformar os dados. A função pode ser reutilizada em várias políticas de atualização. Para obter mais informações, consulte o comando .create function.

  3. Execute o comando a seguir para criar as tabelas de destino.

    .execute database script <|
      .create table Device_Telemetry (Timestamp: datetime, DeviceId: string, DeviceType: string, SensorName: string, SensorValue: real, SensorUnit: string)
      .set-or-append Device_Alarms <| Get_Alarms | take 0
      .set-or-append Error_Log <| Log_Error | take 0
    

    A tabela de destino deve ter o mesmo esquema que a saída da função de transformação. Você pode criar tabelas de destino das seguintes maneiras:

    • Use o .create table comando e especifique manualmente o esquema, conforme demonstrado com a criação da tabela Device_Telemetry . No entanto, essa abordagem pode ser propensa a erros e demorada.
    • Use o .set-or-append comando se você já criou uma função para transformar os dados. Esse método cria uma nova tabela com o mesmo esquema que a saída da função, usando take 0 para garantir que a função retorne apenas o esquema. Para obter mais informações, consulte o comando .set-or-append.
  4. Execute o comando a seguir para criar as políticas de atualização para as tabelas de destino.

    .execute database script <|
      .alter table Device_Telemetry policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Get_Telemetry\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
      .alter table Device_Alarms policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Get_Alarms\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
      .alter table Error_Log policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Log_Error\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
    

    Use o .alter table policy update comando para vincular a tabela de origem, a função de transformação e a tabela de destino. Crie a política de atualização na tabela de destino e especifique a tabela de origem e a função de transformação. Para obter mais informações, consulte o comando .alter table policy update.

Etapa 2 – Ingerir dados de exemplo

Para testar as políticas de atualização, ingera dados de exemplo na tabela de origem usando o .set-or-append comando. Para obter mais informações, consulte Ingestão de dados de uma consulta.

.set-or-append Raw_Table <|
  let Raw_Stream = datatable(RawData: dynamic)
    [
    dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Telemetry", "DeviceType": "Laminator", "SensorName": "Temperature", "SensorValue": 78.3, "SensorUnit": "Celcius"}),
    dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Alarms", "DeviceType": "Laminator", "AlarmType": "Temperature threshold breached"}),
    dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Foo", "ErrorType": "Unknown"})
  ];
  Raw_Stream

Etapa 3 – Verificar os resultados

Para validar os resultados, execute uma consulta para verificar se os dados foram transformados e roteados para as tabelas de destino. No exemplo a seguir, o union operador combina a origem e os resultados das tabelas de destino em um único conjunto de resultados.

Raw_Table | summarize Rows=count() by TableName = "Raw_Table"
| union (Device_Telemetry | summarize Rows=count() by TableName = "Device_Telemetry")
| union (Device_Alarms | summarize Rows=count() by TableName = "Device_Alarms")
| union (Error_Log | summarize Rows=count() by TableName = "Error_Log")
| sort by Rows desc

Saída

Você deverá ver a saída a seguir em que o Raw_Table tem três linhas e as tabelas de destino têm uma linha cada.

TableName Rows
Raw_Table 3
Log_de_Erros 1
Alarmes_do_Dispositivo 1
Telemetria_do_Dispositivo 1

Etapa 4 – Limpar recursos

Execute o comando a seguir em seu banco de dados para limpar as tabelas e as funções criadas neste tutorial.

.execute database script <|
  .drop table Raw_Table
  .drop table Device_Telemetry
  .drop table Device_Alarms
  .drop table Error_Log
  .drop function Get_Telemetry
  .drop function Get_Alarms
  .drop function Log_Error