Tutorial: Executar o Funções do Azure a partir de trabalhos do Azure Stream Analytics

Neste tutorial, você cria um trabalho do Azure Stream Analytics que lê eventos dos Hubs de Eventos do Azure, executa uma consulta nos dados do evento e invoca uma função do Azure, que grava em uma instância do Cache do Azure para Redis.

Captura de tela que mostra a relação entre os serviços do Azure na solução.

Nota

  • Você pode executar o Funções do Azure a partir do Azure Stream Analytics configurando o Functions como um dos coletores (saídas) para o trabalho do Stream Analytics. As Funções são uma experiência de cálculo a pedido orientada por eventos que lhe permite implementar o código que é acionado pelos eventos que ocorrem no Azure ou em serviços de terceiros. Esta capacidade que as Funções têm de responder a acionadores torna-as numa saída natural para as tarefas do Stream Analytics.
  • O Stream Analytics invoca Funções através de acionadores HTTP. O adaptador de saída das Funções permite aos utilizadores ligarem Funções ao Stream Analytics, para que os eventos possam ser acionados com base em consultas do Stream Analytics.
  • A ligação ao Funções do Azure dentro de uma rede virtual (VNet) a partir de um trabalho de Stream Analytics que está a correr num cluster multitenant não é suportada.

Neste tutorial, irá aprender a:

  • Criar uma instância dos Hubs de Eventos do Azure
  • Criar uma instância do Cache do Azure para Redis
  • Criar uma Função do Azure
  • Criar uma tarefa do Stream Analytics
  • Configurar o hub de eventos como entrada e função como saída
  • Executar a tarefa do Stream Analytics
  • Verifique no Azure Cache para Redis os resultados

Se não tiver uma subscrição do Azure, crie uma conta gratuita antes de começar.

Pré-requisitos

Antes de começar, certifique-se de que completa os seguintes passos:

Iniciar sessão no Azure

Inicie sessão no portal do Azure.

Criar um hub de eventos

Você precisa enviar alguns dados de exemplo para um hub de eventos antes que o Stream Analytics possa analisar o fluxo de dados de chamadas fraudulentas. Neste tutorial, você envia dados para o Azure usando os Hubs de Eventos do Azure.

Use as seguintes etapas para criar um hub de eventos e enviar dados de chamada para esse hub de eventos:

  1. Inicie sessão no portal do Azure.

  2. Selecione Todos os serviços no menu à esquerda, selecione Internet das coisas, passe o mouse sobre Hubs de Eventos e selecione o botão + (Adicionar ).

    Captura de tela mostrando a página de criação de Hubs de Eventos.

  3. Na página Criar namespace, execute estas etapas:

    1. Selecione uma assinatura do Azure onde você deseja criar o hub de eventos.

    2. Em Grupo de recursos, selecione Criar novo e insira um nome para o grupo de recursos. O namespace Event Hubs é criado neste grupo de recursos.

    3. Em Nome do namespace, insira um nome exclusivo para o namespace Hubs de Eventos.

    4. Em Local, selecione a região na qual você deseja criar o namespace.

    5. Em Nível de preço, selecione Padrão.

    6. Selecione Rever + criar na parte inferior da página.

      Captura de ecrã mostrando a página Criar Namespace.

    7. Na página Revisão + criação do assistente de criação de namespace, selecione Criar na parte inferior da página depois de revisar todas as configurações.

  4. Depois que o namespace for implantado com êxito, selecione Ir para o recurso para navegar até a página Namespace dos Hubs de Eventos.

  5. Na página do namespace Event Hubs , selecione +Event Hub na barra de comandos.

    Captura de tela mostrando o botão Adicionar hub de eventos na página Namespace de Hubs de Eventos.

  6. Na página Criar Hub de Eventos, insira um Nome para o hub de eventos. Defina a contagem de partições como 2. Use as opções padrão nas configurações restantes e selecione Revisar + criar.

    Captura de tela mostrando a página Criar hub de eventos.

  7. Na página Rever + criar, selecione Criar na parte inferior da página. Em seguida, aguarde até que a implementação seja executada com êxito.

Conceda acesso ao hub de eventos e obtenha uma cadeia de conexão

Antes que um aplicativo possa enviar dados para os Hubs de Eventos do Azure, o hub de eventos deve ter uma política que permita o acesso. A política de acesso produz uma cadeia de ligação que inclui as informações de autorização.

  1. Na página do namespace dos Centros de Eventos , selecione Políticas de acesso partilhado no menu esquerdo.

  2. Selecione RootManageSharedAccessKey na lista de políticas.

    Captura de ecrã que mostra a página Políticas de acesso partilhado.

  3. Em seguida, selecione o botão de cópia ao lado de string de conexão - chave primária.

  4. Cole a cadeia de ligação num editor de texto. Vai precisar desta cadeia de ligação na secção seguinte.

    A cadeia de ligação é semelhante à seguinte:

    Endpoint=sb://<Your event hub namespace>.servicebus.windows.net/;SharedAccessKeyName=<Your shared access policy name>;SharedAccessKey=<generated key>

    Observe que a cadeia de conexão contém vários pares chave-valor separados por ponto-e-vírgula: Endpoint, SharedAccessKeyName e SharedAccessKey.

Iniciar a aplicação geradora de eventos

Antes de iniciar a aplicação TelcoGenerator, deve configurá-la para enviar dados para os Hubs de Eventos do Azure que criou anteriormente.

  1. Extraia o conteúdo do ficheiro TelcoGenerator.zip.

  2. Abra o TelcoGenerator\TelcoGenerator\telcodatagen.exe.config arquivo em um editor de texto de sua escolha Há mais de um .config arquivo, então certifique-se de abrir o correto.

  3. Atualize o elemento <appSettings> no ficheiro de configuração com os seguintes detalhes:

    • Defina o valor da chave EventHubName para o nome do hub de eventos que criou na secção anterior.
    • Defina o valor da chave Microsoft.ServiceBus.ConnectionString para a cadeia de conexão do namespace. Se estiver a utilizar uma string de conexão para um hub de eventos, e não um namespace, remova o valor EntityPath;EntityPath=myeventhub no final. Não se esqueça de remover o ponto-e-vírgula que precede o valor EntityPath.
  4. Guarde o ficheiro.

  5. Em seguida, abra uma janela de comando e mude para a pasta onde deszipou a aplicação TelcoGenerator. Em seguida, introduza o seguinte comando:

    .\telcodatagen.exe 1000 0.2 2
    

    Este comando recebe os seguintes parâmetros:

    • Número de registos de dados de chamada por hora.
    • Percentagem de probabilidade de fraude, que corresponde à frequência com que a aplicação deve simular uma chamada fraudulenta. O valor 0,2 significa que cerca de 20% dos registos de chamadas parecem fraudulentos.
    • Duração em horas, que corresponde ao número de horas que a aplicação deve ser executada. Você também pode parar o aplicativo a qualquer momento, encerrando o processo (Ctrl+C) na linha de comando.

    Após alguns segundos, a aplicação começa a apresentar registos de chamadas telefónicas no ecrã, à medida que os envia para o hub de eventos. Os dados das chamadas telefónicas contêm os seguintes campos:

    Registo Definição
    CallrecTime O carimbo de data/hora do início da chamada.
    SwitchNum A central telefónica utilizada para estabelecer a chamada. Neste exemplo, as opções são cadeias de caracteres que representam o país/região de origem (EUA, China, Reino Unido, Alemanha ou Austrália).
    CallingNum O número de telefone do chamador.
    CallingIMSI A Identidade Internacional de Assinante Móvel (IMSI). É um identificador exclusivo do chamador.
    CalledNum O número de telefone do destinatário da chamada.
    CalledIMSI Identidade Internacional de Assinante Móvel (IMSI). É um identificador exclusivo do destinatário da chama.

Criar uma tarefa do Stream Analytics

Agora que tem um fluxo de eventos de chamada, pode criar uma tarefa do Stream Analytics que lê os dados do hub de eventos.

  1. Para criar uma tarefa do Stream Analytics, navegue até ao portal do Azure.

  2. Selecione Todos os serviços no menu esquerdo, procure Stream Analytics jobs, passe com o cursor do rato sobre o tile de Stream Analytics jobs e depois selecione + ou selecione Criar na janela de pop-up.

    Captura de ecrã a mostrar como encontrar Stream Analytics no portal Azure.

  3. Na página do trabalho New Stream Analytics, siga estas etapas:

    1. Em Assinatura, selecione a assinatura que contém o namespace Hubs de Eventos.

    2. Em Grupo de recursos, selecione o grupo de recursos criado anteriormente.

    3. Na seção Detalhes da instância, para Nome, insira um nome exclusivo para o trabalho do Stream Analytics.

    4. Em Região, selecione a região na qual você deseja criar o trabalho do Stream Analytics. Recomendamos que você coloque o trabalho e o hub de eventos na mesma região para obter o melhor desempenho e para não pagar para transferir dados entre regiões.

    5. Em Ambiente de hospedagem< , selecione Nuvem se ainda não estiver selecionado. As tarefas do Stream Analytics podem ser implementadas na cloud ou no Edge. A nuvem permite que você implante na Nuvem do Azure e o Edge permite que você implante em um dispositivo IoT Edge.

    6. Em Unidades de streaming, selecione 1. As unidades de transmissão em fluxo representam os recursos informáticos que são necessários para executar uma tarefa. Por predefinição, este valor está definido como 1. Para saber mais sobre o dimensionamento de unidades de transmissão em fluxo, veja o artigo Compreender e ajustar as unidades de transmissão em fluxo.

    7. Selecione Rever + criar na parte inferior da página.

      Captura de tela que mostra a página de trabalho Criar o Azure Stream Analytics.

  4. Na página Rever + criar, reveja as definições e, em seguida, selecione Criar para criar a tarefa do Stream Analytics.

  5. Depois que a tarefa for implantada, selecione Ir para o recurso para navegar até a página da tarefa do Stream Analytics.

Configurar a entrada da tarefa

A próxima etapa consiste em definir uma origem de entrada para que a tarefa leia dados através do hub de eventos que criou na seção anterior.

  1. Na página de trabalho do Stream Analytics, na seção Topologia de Trabalho no menu à esquerda, selecione Entradas.

  2. Na página Entradas, selecione + Adicionar entrada e Hub de eventos.

    Captura de tela mostrando a página Entrada para um trabalho do Stream Analytics.

  3. Na página Hub de Eventos, siga estas etapas:

    1. Em Alias de entrada, digite CallStream. Alias de entrada é um nome amigável para identificar sua entrada. O alias de entrada só pode conter caracteres alfanuméricos e hífenes e deve ter de 3 a 63 caracteres.

    2. Em Assinatura, selecione a assinatura do Azure onde você criou o hub de eventos. O hub de eventos pode estar na mesma subscrição ou numa subscrição diferente da tarefa do Stream Analytics.

    3. Para namespace Hubs de Eventos, selecione o namespace Hubs de Eventos que você criou na seção anterior. Todos os namespaces disponíveis na sua subscrição atual estão listados no menu suspenso.

    4. Em Nome do hub de eventos, selecione o hub de eventos criado na seção anterior. Todos os hubs de eventos disponíveis no namespace selecionado são listados na lista suspensa.

    5. Para o grupo de consumidores do hub de eventos, mantenha a opção Criar novo selecionada para que um novo grupo de consumidores seja criado no hub de eventos. Recomendamos que você use um grupo de consumidores distinto para cada trabalho do Stream Analytics. Se nenhum grupo de consumidores for especificado, o trabalho do Stream Analytics usará o grupo de $Default consumidores. Quando um trabalho contém uma associação automática ou tem várias entradas, algumas entradas posteriores podem ser lidas por mais de um leitor. Esta situação afeta o número de leitores num único grupo de consumidores.

    6. Em Modo de autenticação, selecione Cadeia de conexão. É mais fácil testar o tutorial com esta opção.

    7. Para o nome da política do hub de eventos, selecione Usar existente e depois selecione a política predefinida: RootManageSharedAccessKey.

    8. Selecione Salvar na parte inferior da página.

      Captura de tela mostrando a página de configuração dos Hubs de Eventos para uma entrada.

Criar uma instância do Cache do Azure para Redis

  1. Crie um cache no Cache do Azure para Redis usando as etapas descritas em Criar uma instância do Cache do Azure para Redis.

  2. Depois de criar a cache em Definições, selecione Chaves de Acesso. Anote a cadeia de ligação primária.

    Captura de tela mostrando a seleção do item de menu Chave de Acesso.

Crie uma função no Funções do Azure que escreva dados no Cache do Azure para Redis

  1. Veja a secção Criar uma aplicação de funções na documentação das Funções. Este exemplo utiliza:

  2. Crie um aplicativo de função HttpTrigger padrão no Visual Studio Code seguindo este tutorial. Use a seguinte informação: linguagem: C#, runtime: .NET 6 (na função v4), template: HTTP trigger.

  3. Instale a biblioteca de cliente Redis executando o seguinte comando em um terminal localizado na pasta do projeto:

    dotnet add package StackExchange.Redis --version 2.2.88
    
  4. Adicione os itens RedisConnectionString e RedisDatabaseIndex na secção Values do seu local.settings.json, preenchendo a string de conexão do servidor de destino.

    {
        "IsEncrypted": false,
        "Values": {
            "AzureWebJobsStorage": "",
            "FUNCTIONS_WORKER_RUNTIME": "dotnet",
            "RedisConnectionString": "Your Redis Connection String",
            "RedisDatabaseIndex":"0"
        }
    }
    

    O Índice do Banco de Dados Redis é o número de 0 a 15 que identifica o banco de dados na instância.

  5. Substitua toda a função (.cs ficheiro no projeto) pelo seguinte excerto de código. Atualize o namespace, o nome da classe e o nome da função com os seus próprios:

    using System;
    using System.IO;
    using System.Threading.Tasks;
    using Microsoft.AspNetCore.Mvc;
    using Microsoft.Azure.WebJobs;
    using Microsoft.Azure.WebJobs.Extensions.Http;
    using Microsoft.AspNetCore.Http;
    using Microsoft.Extensions.Logging;
    using Newtonsoft.Json;
    
    using StackExchange.Redis;
    
    namespace Company.Function
    {
        public static class HttpTrigger1{
            [FunctionName("HttpTrigger1")]
            public static async Task<IActionResult> Run(
                [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
                ILogger log)
            {
                // Extract the body from the request
                string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
                if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
    
                dynamic data = JsonConvert.DeserializeObject(requestBody);
    
                // Reject if too large, as per the doc
                if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
    
                string RedisConnectionString = Environment.GetEnvironmentVariable("RedisConnectionString");
                int RedisDatabaseIndex = int.Parse(Environment.GetEnvironmentVariable("RedisDatabaseIndex"));
    
                using (var connection = ConnectionMultiplexer.Connect(RedisConnectionString))
                {
                    // Connection refers to a property that returns a ConnectionMultiplexer
                    IDatabase db = connection.GetDatabase(RedisDatabaseIndex);
    
                    // Parse items and send to binding
                    for (var i = 0; i < data.Count; i++)
                    {
                        string key = data[i].Time + " - " + data[i].CallingNum1;
    
                        db.StringSet(key, data[i].ToString());
                        log.LogInformation($"Object put in database. Key is {key} and value is {data[i].ToString()}");
    
                        // Simple get of data types from the cache
                        string value = db.StringGet(key);
                        log.LogInformation($"Database got: {key} => {value}");
    
                    }
                }
                return new OkResult(); // 200
            }
        }
    }
    

    Quando o Stream Analytics recebe a exceção "HTTP Request Entity Too Large" da função, ele reduz o tamanho dos lotes que envia para as funções. O código a seguir garante que o Stream Analytics não envie lotes superdimensionados. Certifique-se de que os valores de contagem e tamanho de lote máximo utilizados na função são consistentes com os valores introduzidos no portal do Stream Analytics.

  6. Publicar a função para Azure.

  7. Abra a função no portal do Azure e defina as configurações do aplicativo para RedisConnectionString e RedisDatabaseIndex.

Atualizar a tarefa do Stream Analytics com a função como saída

  1. Abra a sua tarefa de Stream Analytics no portal do Azure.

  2. Navegue para a sua função e selecione Descrição Geral>Saídas>Adicionar. Para adicionar uma nova saída, selecione Função do Azure como opção de destino. O adaptador de saída Functions tem as seguintes propriedades:

    Nome da propriedade Descrição
    Alias de saída Um nome amigável que utiliza na consulta da tarefa para referenciar o output.
    Opção de importação Pode utilizar a função da subscrição atual ou fornecer as definições manualmente se a função estiver localizada noutra subscrição.
    Function App Nome da sua aplicação de Funções.
    Função Nome da função na sua Aplicação de funções (nome da sua função run.csx).
    Tamanho Máximo de Lote Define o tamanho máximo para cada lote de saída, que é enviado para sua função em bytes. Por padrão, esse valor é definido como 262.144 bytes (256 KB).
    Contagem Máxima de Lotes Especifica o número máximo de eventos em cada lote que é enviado para a função. O valor predefinido é 100. Esta propriedade é opcional.
    Chave Permite-lhe utilizar uma função a partir de outra subscrição. Indique o valor da chave para aceder à sua função. Esta propriedade é opcional.
  3. Indique um nome para o alias de saída. Neste tutorial, ele é chamado saop1, mas você pode usar qualquer nome de sua escolha. Preencha outros detalhes.

  4. Abra a sua tarefa do Stream Analytics e atualize a consulta para o seguinte.

    Importante

    O script de exemplo a seguir pressupõe que você usou CallStream para o nome de entrada e saop1 para o nome de saída. Se usou nomes diferentes, atualize a consulta.

     SELECT
             System.Timestamp as Time, CS1.CallingIMSI, CS1.CallingNum as CallingNum1,
             CS2.CallingNum as CallingNum2, CS1.SwitchNum as Switch1, CS2.SwitchNum as Switch2
         INTO saop1
         FROM CallStream CS1 TIMESTAMP BY CallRecTime
            JOIN CallStream CS2 TIMESTAMP BY CallRecTime
             ON CS1.CallingIMSI = CS2.CallingIMSI AND DATEDIFF(ss, CS1, CS2) BETWEEN 1 AND 5
         WHERE CS1.SwitchNum != CS2.SwitchNum
    
  5. Inicie o aplicativo telcodatagen.exe executando o seguinte comando na linha de comando. O comando usa o formato telcodatagen.exe [#NumCDRsPerHour] [SIM Card Fraud Probability] [#DurationHours].

    telcodatagen.exe 1000 0.2 2
    
  6. Inicie a tarefa do Stream Analytics.

  7. Na página Monitor da sua função do Azure, você vê que a função é invocada.

    Captura de ecrã a mostrar a página Monitor do Funções do Azure com invocações de funções.

  8. Na página Azure Cache para Redis, selecione Métricas no menu à esquerda, adicione a métrica Cache Write e defina a duração como última hora. Você vê o gráfico semelhante à imagem a seguir.

    Captura de ecrã a mostrar a página de Métricas para a sua Azure Cache para Redis.

Verifique no Azure Cache para Redis os resultados

Obtenha a chave nos registos do Funções do Azure

Primeiro, obtenha a chave de um registro inserido no Cache do Azure para Redis. No código, a chave é calculada na função do Azure, conforme mostrado no seguinte trecho de código:

string key = data[i].Time + " - " + data[i].CallingNum1;

db.StringSet(key, data[i].ToString());
log.LogInformation($"Object put in database. Key is {key} and value is {data[i].ToString()}");
  1. Navegue até o portal do Azure e localize seu aplicativo Funções do Azure.

  2. Selecione Funções no menu à esquerda.

  3. Selecione HTTPTrigger1 na lista de funções.

  4. Selecione Monitor no menu à esquerda.

  5. Mude para o separador Logs.

  6. Anote uma chave da mensagem informativa, conforme mostrado na captura de tela a seguir. Use essa chave para localizar o valor no Cache do Azure para Redis.

    Captura de ecrã a mostrar a página Registos do Monitor para a função do Azure.

Use a chave para localizar o registro no Cache Redis do Azure

  1. Navegue até o portal do Azure e localize seu Cache do Azure para Redis. Selecione Consola.

  2. Use os comandos do Cache do Azure para Redis para verificar se os seus dados estão no Cache do Azure para Redis. (O comando usa o formato Get {key}.) Use a chave copiada dos logs do Monitor para a função do Azure (na seção anterior).

    Obtenha "KEY-FROM-THE-PREVIOUS-SECTION"

    Este comando imprime o valor da chave especificada:

    Captura de ecrã mostrando o console do Cache Redis exibindo a saída do comando Get.

Processamento de erros e tentativas

Se ocorrer uma falha ao enviar eventos para o Funções do Azure, o Stream Analytics tentará novamente a maioria das operações. Tenta novamente todas as exceções HTTP até sucesso, exceto o erro HTTP 413 (entidade demasiado grande). Um erro de entidade excessivamente grande é tratado como um erro de dados sujeito à política de repetição ou eliminação.

Nota

O tempo limite para solicitações HTTP do Stream Analytics para o Funções do Azure é definido como 100 segundos. Se a sua aplicação Funções do Azure demorar mais de 100 segundos a processar um lote, o Stream Analytics devolve um erro e tenta novamente o lote.

Repetir os tempos limite pode resultar em eventos duplicados gravados no coletor de saída. Quando o Stream Analytics tenta novamente um lote falhado, tenta novamente todos os eventos do lote. Por exemplo, considere um lote de 20 eventos que a Stream Analytics envia para o Funções do Azure. Suponha que o Funções do Azure leva 100 segundos para processar os primeiros 10 eventos nesse lote. Após 100 segundos, a Stream Analytics suspende o pedido porque não recebeu resposta positiva do Funções do Azure, e envia outro pedido para o mesmo lote. O Funções do Azure processa novamente os primeiros 10 eventos do lote, o que causa um duplicado.

Problemas conhecidos

No portal do Azure, quando tenta redefinir o valor de Max Batch Size ou Max Batch Count para vazio (valor por defeito), ao guardar, o valor altera-se de volta para o valor previamente introduzido. Introduza manualmente os valores predefinidos para estes campos neste caso.

Stream Analytics não suporta atualmente o uso do encaminhamento HTTP nas suas funções Azure.

O suporte para se conectar ao Funções do Azure hospedado em uma rede virtual não está habilitado.

Limpar recursos

Quando já não precisares dos recursos, elimina o grupo de recursos, o trabalho de streaming e todos os recursos relacionados. Eliminar o trabalho impede a faturação das unidades de streaming que o trabalho consome. Se planeias usar o trabalho no futuro, podes pará-lo e recomeçar mais tarde, quando precisares. Se você não vai continuar a usar esse trabalho, exclua todos os recursos criados por este início rápido usando as seguintes etapas:

  1. No menu do lado esquerdo no portal do Azure, selecione Grupos de recursos e, em seguida, selecione o nome do recurso que criou.
  2. Na página do grupo de recursos, selecione Eliminar, escreva o nome do recurso a eliminar na caixa de texto e, em seguida, selecione Eliminar.

Próximos passos

Neste tutorial, criaste um trabalho simples de Stream Analytics que executa uma Função Azure. Para saber mais sobre tarefas do Stream Analytics, avance para o próximo tutorial: