Tutorial: Encaminhar dados usando políticas de atualização de tabelas

Alterne entre serviços usando a lista suspensa Version. Saiba mais sobre navegação.
Aplica-se a: ✅ Microsoft Fabric ✅ Azure Data Explorer

Quando os seus dados de origem envolvem transformações simples e rápidas, realize-as a montante no pipeline usando um fluxo de eventos. No entanto, esta abordagem pode não funcionar bem para outras transformações que sejam complexas ou que exijam funcionalidades especializadas para funcionar.

Neste tutorial, aprenderás como:

O exemplo neste tutorial demonstra como usar políticas de atualização para o encaminhamento de dados para realizar transformações complexas e enriquecer, limpar e transformar dados no momento da ingestão. Para uma lista de outros casos de uso comuns, veja Casos de uso comuns para políticas de atualização de tabelas.

Pré-requisitos

  • Uma conta Microsoft ou uma identidade de utilizador do Microsoft Entra. Uma assinatura do Azure não é necessária.

Passo 1 - Criar tabelas e atualizar políticas

Os passos seguintes guiam-no na criação de uma tabela de origem, funções de transformação, tabelas de destino e políticas de atualização. O tutorial demonstra como usar políticas de atualização de tabela para realizar transformações complexas e guardar os resultados numa ou mais tabelas de destino. O exemplo utiliza uma única tabela de origem chamada Raw_Table e três tabelas de destino denominadas Device_Telemetry, Device_Alarms e Error_Log.

  1. Execute o comando seguinte para criar uma tabela chamada Raw_Table.

    .create table Raw_Table (RawData: dynamic)
    

    A tabela de origem é onde os dados ingeridos são guardados. A tabela tem uma única coluna chamada RawData de tipo dinâmico. O tipo dinâmico armazena os dados brutos tal como estão, sem qualquer esquema. Para mais informações, consulte o comando .create table.

  2. Execute o comando seguinte para criar funções chamadas Get_Telemetry, Get_Alarms e Log_Error.

    .execute database script <|
      .create-or-alter function Get_Telemetry() {
        Raw_Table
        | where todynamic(RawData).MessageType == 'Telemetry'
        | extend
          Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)),
          DeviceId = tostring(RawData.DeviceId),
          DeviceType = tostring(RawData.DeviceType),
          SensorName = tostring(RawData.SensorName),
          SensorValue = toreal(RawData.SensorValue),
          SensorUnit = tostring(RawData.SensorUnit)
        | project-away RawData
      }
      .create-or-alter function Get_Alarms() {
        Raw_Table
        | where RawData.MessageType == 'Alarms'
        | extend
          Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)),
          DeviceId = tostring(RawData.DeviceId),
          DeviceType = tostring(RawData.DeviceTpe) ,
          AlarmType = tostring(RawData.AlarmType)
        | project-away RawData
      }
      .create-or-alter function Log_Error() {
        Raw_Table
        | where RawData.MessageType !in ('Telemetry', 'Alarms')
        | extend
          TimeStamp = datetime(now),
          ErrorType = 'Unknown MessageType'
        | project TimeStamp, RawData, ErrorType
      }
    

    Ao criar uma política de atualização, pode especificar um script inline para execução. No entanto, encapsula a lógica de transformação numa função. Usar uma função melhora a manutenção do código. Quando chegam novos dados, a função é executada para transformar os dados. A função pode ser reutilizada através de várias políticas de atualização. Para mais informações, consulte o comando .create function.

  3. Execute o comando seguinte para criar as tabelas de destino.

    .execute database script <|
      .create table Device_Telemetry (Timestamp: datetime, DeviceId: string, DeviceType: string, SensorName: string, SensorValue: real, SensorUnit: string)
      .set-or-append Device_Alarms <| Get_Alarms | take 0
      .set-or-append Error_Log <| Log_Error | take 0
    

    A tabela de destino deve ter o mesmo esquema que a saída da função de transformação. Pode criar tabelas de destino das seguintes formas:

    • Use o .create table comando e especifique manualmente o esquema conforme demonstrado na criação da tabela Device_Telemetry . No entanto, esta abordagem pode ser propensa a erros e demorada.
    • Use o .set-or-append comando se já criou uma função para transformar os dados. Este método cria uma nova tabela com o mesmo esquema que a saída da função, garantindo take 0 que a função apenas devolve o esquema. Para mais informações, consulte o comando .set-or-append.
  4. Execute o seguinte comando para criar as políticas de atualização para as tabelas de destino.

    .execute database script <|
      .alter table Device_Telemetry policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Get_Telemetry\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
      .alter table Device_Alarms policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Get_Alarms\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
      .alter table Error_Log policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Log_Error\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
    

    Use o .alter table policy update comando para ligar a tabela de origem, a função de transformação e a tabela de destino. Crie a política de atualização na tabela de destino e especifique a tabela de origem e a função de transformação. Para mais informações, consulte .alter table policy update comando.

Passo 2 - Ingerir dados da amostra

Para testar as políticas de atualização, ingera dados de exemplo na tabela de origem usando o .set-or-append comando. Para mais informações, consulte Ingerir dados de uma consulta.

.set-or-append Raw_Table <|
  let Raw_Stream = datatable(RawData: dynamic)
    [
    dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Telemetry", "DeviceType": "Laminator", "SensorName": "Temperature", "SensorValue": 78.3, "SensorUnit": "Celcius"}),
    dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Alarms", "DeviceType": "Laminator", "AlarmType": "Temperature threshold breached"}),
    dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Foo", "ErrorType": "Unknown"})
  ];
  Raw_Stream

Passo 3 - Verificar os resultados

Para validar os resultados, execute uma consulta para verificar se os dados foram transformados e encaminhados para as tabelas de destino. No exemplo seguinte, o union operador combina a fonte e os resultados das tabelas de destino num único conjunto de resultados.

Raw_Table | summarize Rows=count() by TableName = "Raw_Table"
| union (Device_Telemetry | summarize Rows=count() by TableName = "Device_Telemetry")
| union (Device_Alarms | summarize Rows=count() by TableName = "Device_Alarms")
| union (Error_Log | summarize Rows=count() by TableName = "Error_Log")
| sort by Rows desc

Output

Deves ver o resultado seguinte, onde o Raw_Table tem três linhas e as tabelas de destino têm uma linha cada.

TableName Rows
Raw_Table 3
Registo de Erro 1
Alarmes_de_Dispositivo 1
Telemetria_de_Dispositivo 1

Passo 4 - Limpar recursos

Execute o seguinte comando na sua base de dados para limpar as tabelas e funções criadas neste tutorial.

.execute database script <|
  .drop table Raw_Table
  .drop table Device_Telemetry
  .drop table Device_Alarms
  .drop table Error_Log
  .drop function Get_Telemetry
  .drop function Get_Alarms
  .drop function Log_Error