Monitorar e observar o Carregador Automático

Os pipelines do Carregador Automático exigem monitoramento ativo para detectar problemas como acúmulo crescente de pendências, descompasso de esquema, dados corrompidos e fluxos interrompidos antes que eles afetem os consumidores posteriores. Esta página descreve como monitorar as principais métricas, consultar o estado no nível do arquivo, criar painéis de observabilidade e solucionar problemas comuns.

Para obter detalhes de configuração de produção, consulte Configurar o Carregador Automático para cargas de trabalho de produção. Para obter as práticas recomendadas de configuração, consulte as práticas recomendadas do Carregador Automático.

Pré-requisitos

Vários fluxos de trabalho de monitoramento nesta página dependem de cloud_files_state() para observar o estado da ingestão por arquivo — incluindo consultas de pendências, cálculos de latência e detecção de descompassos de esquema. cloud_files_state() é uma função com valor de tabela que retorna o estado de ingestão no nível do arquivo para um ponto de verificação do Carregador Automático. Nem todos os campos estão disponíveis por padrão. A disponibilidade depende da sua versão e configuração do Databricks Runtime:

  • Databricks Runtime 18.2 e superior: discovery_time, processed_timee commit_time estão disponíveis automaticamente. No Databricks Runtime 16.4–18.1, esses campos só estarão disponíveis quando cloudFiles.cleanSource estiverem habilitados.
  • Databricks Runtime 16.4 ou superior, com cloudFiles.cleanSource habilitado: archive_time, archive_mode e move_location estão disponíveis.

Habilitar cloudFiles.cleanSource gera certa sobrecarga de desempenho. Compare com suas cargas de trabalho em um ambiente de pré-produção antes de habilitá-lo em um produção.

Additionally:

  • Anote os dados ingeridos com a coluna _metadata. Capturar no mínimo file_path e file_modification_time. Confira Coluna de metadados de arquivo.
  • Habilitar _rescued_data e _corrupt_record colunas.

Principais métricas do Carregador Automático

A tabela a seguir resume as métricas mais importantes para monitorar os pipelines do Auto Loader. Essas métricas estão disponíveis a partir dos eventos de progresso StreamingQueryListener, com valores específicos do Carregador Automático expostos no mapa metrics de cada origem.

Metric O que isso te diz
numFilesOutstanding Número de arquivos na lista de pendências aguardando para serem processados
numBytesOutstanding Tamanho da fila de arquivos pendentes em bytes
approximateQueueSize Profundidade da fila na nuvem (somente no modo de notificação de arquivo)
numInputRows Linhas processadas por lote
inputRowsPerSecond Taxa de chegada de dados
processedRowsPerSecond Taxa de processamento
durationMs Detalhamento Onde o tempo é empregado em cada lote

O que observar

Os padrões a seguir indicam que seu pipeline pode precisar de atenção.

  • Crescimento numFilesOutstanding: a lista de pendências está se acumulando. Seu pipeline não está conseguindo acompanhar os dados de entrada.
  • processedRowsPerSecond < inputRowsPerSecond: O pipeline está processando os dados com mais lentidão do que eles chegam.
  • Grande durationMs.latestOffset: a descoberta de arquivos é lenta. Considere mudar para eventos de arquivo.
  • Grande durationMs.addBatch: o processamento de dados é lento. Considere dimensionar a computação ou otimizar transformações.

Para obter a referência completa de métricas, consulte as métricas de origem do Carregador Automático.

Consultar o estado em nível de arquivo com cloud_files_state

A cloud_files_state() função com valor de tabela fornece informações detalhadas sobre cada arquivo descoberto pelo Carregador Automático. Os campos a seguir estão disponíveis. Campos marcados como exigindo o Databricks Runtime 16.4 ou superior, ou 18.2 ou superior, só são preenchidos nas condições descritas em Pré-requisitos.

Campo Tipo Description
path STRING O caminho do arquivo
size BIGINT O tamanho do arquivo em bytes
create_time TIMESTAMP Quando o arquivo foi criado
discovery_time TIMESTAMP Quando o Carregador Automático descobriu o arquivo (Databricks Runtime 16.4 e superior)
processed_time TIMESTAMP Quando o Carregador Automático processou o arquivo (Databricks Runtime 16.4 e superior)
commit_time TIMESTAMP Quando o arquivo foi comprometido no ponto de verificação (Databricks Runtime 16.4 e superior)
archive_time TIMESTAMP Quando o arquivo foi arquivado (requer cloudFiles.cleanSource)
archive_mode STRING MOVE, DELETE, ou NULL (requer cloudFiles.cleanSource)
move_location STRING Caminho de destino quando cloudFiles.cleanSource é MOVE
ingestion_state STRING Estado de ingestão de arquivo atual

Investigar o estado da ingestão de arquivos

As consultas a seguir abrangem cenários comuns de diagnóstico.

Localizar todos os arquivos não processados (a lista de pendências atual):

SELECT * FROM cloud_files_state('path/to/checkpoint')
WHERE ingestion_state != 'COMMITTED';

Calcule a latência média de ingestão (tempo entre a criação do arquivo e a gravação):

SELECT avg(unix_timestamp(commit_time) - unix_timestamp(create_time)) AS avg_latency_seconds
FROM cloud_files_state('path/to/checkpoint')
WHERE commit_time IS NOT NULL AND create_time IS NOT NULL;

Localizar arquivos corrompidos ou omitidos:

SELECT path, ingestion_state, size, create_time
FROM cloud_files_state('path/to/checkpoint')
WHERE ingestion_state LIKE 'SKIPPED%';

Acompanhar o progresso do arquivamento (requer cloudFiles.cleanSource):

SELECT archive_mode, count(*) AS file_count
FROM cloud_files_state('path/to/checkpoint')
GROUP BY archive_mode;

Encontre arquivos com alta latência de descoberta para confirmação para identificar gargalos:

SELECT
  path,
  size,
  unix_timestamp(commit_time) - unix_timestamp(discovery_time) AS processing_latency_seconds,
  unix_timestamp(commit_time) - unix_timestamp(create_time) AS end_to_end_latency_seconds
FROM cloud_files_state('path/to/checkpoint')
WHERE commit_time IS NOT NULL
ORDER BY end_to_end_latency_seconds DESC
LIMIT 20;

Para obter a referência completa de SQL, consulte cloud_files_statefunção com valor de tabela.

Monitore o Carregador Automático nos Pipelines Declarativos do Lakeflow Spark

O Databricks recomenda o uso de Pipelines Declarativos do Lakeflow Spark para a produção de pipelines do Carregador Automático. Para aproveitar seus recursos de monitoramento internos:

  • Armazene o log de eventos do Lakeflow Spark Declarative Pipelines em uma tabela Delta para que ele possa ser consultado quanto a dados de observabilidade. Configure isso por meio das configurações avançadas do pipeline ou da API. Para obter detalhes, consulte o log de eventos do Pipeline.

  • Estruture seu pipeline para observabilidade. Um pipeline do Carregador Automático bem estruturado nos Pipelines Declarativos do Lakeflow Spark inclui uma exibição {table}_source (a definição da origem do Carregador Automático), uma {table}_bronze tabela de fluxo (ingestão de dados brutos com colunas _rescued_data e _corrupt_record), uma corrupt_records_sink que coloca linhas com dados que não podem ser analisados em quarentena e uma {table} exibição limpa para consumo posterior.

  • Configure as expectativas em suas tabelas bronze de fluxo para monitorar desvios de esquema e corrupção de dados. _rescued_data IS NULL detecta alterações inesperadas de esquema e _corrupt_record IS NULL detecta dados nãoparáveis. O Lakeflow Spark Declarative Pipelines avalia essas expectativas à medida que os dados chegam e gera um rastro de observabilidade. Você pode configurar as expectativas para emitir avisos, remover linhas ou fazer o pipeline falhar.

Depois de criar a visualização do pipeline event_log_raw, use as seguintes consultas para métricas específicas do Carregador Automático.

Monitore a taxa de ingestão por fluxo:

SELECT
  origin.flow_name,
  origin.update_id,
  timestamp,
  TRY_CAST(details:flow_progress.metrics.num_output_rows AS BIGINT) AS rows_written
FROM event_log_raw
WHERE event_type = 'flow_progress'
ORDER BY timestamp DESC;

Monitoramento do backlog de dados por fluxo:

SELECT
  origin.flow_name,
  timestamp,
  DOUBLE(details:flow_progress.metrics.backlog_bytes) AS backlog_bytes
FROM event_log_raw
WHERE event_type = 'flow_progress'
  AND details:flow_progress.metrics.backlog_bytes IS NOT NULL
ORDER BY timestamp DESC;

Resuma as violações das expectativas para detectar desvio de esquema e dados corrompidos:

SELECT
  origin.flow_name,
  explode(from_json(
    details:flow_progress.data_quality.expectations,
    'array<struct<name:string, dataset:string, passed_records:bigint, failed_records:bigint>>'
  )) AS expectation
FROM event_log_raw
WHERE event_type = 'flow_progress'
  AND details:flow_progress.data_quality.expectations IS NOT NULL;

Para obter diretrizes gerais de monitoramento dos Pipelines Declarativos do Lakeflow Spark, consulte Monitorar pipelines e Log de eventos de pipeline.

Monitorar o Carregador Automático com Fluxo Estruturado

Ao executar o Carregador Automático fora dos Pipelines Declarativos do Lakeflow Spark, use as seguintes abordagens de monitoramento do Fluxo Estruturado.

  • Implemente um StreamingQueryListener para capturar métricas específicas do Carregador Automático em cada lote por meio da leitura de source.metrics.
from pyspark.sql.streaming import StreamingQueryListener

class AutoLoaderMonitor(StreamingQueryListener):
    def onQueryStarted(self, event):
        pass

    def onQueryProgress(self, event):
        for source in event.progress.sources:
            if "CloudFilesSource" in source.description:
                metrics = source.metrics
                files_outstanding = metrics.get("numFilesOutstanding", "0")
                bytes_outstanding = metrics.get("numBytesOutstanding", "0")
                rows_per_sec = source.processedRowsPerSecond
                # Push metrics to your monitoring system (for example, write to a Delta table)

    def onQueryIdle(self, event):
        pass

    def onQueryTerminated(self, event):
        pass

spark.streams.addListener(AutoLoaderMonitor())

Observação

A lógica de processamento nos ouvintes pode tornar mais lento o processamento de consultas. Limite a computação nos callbacks de ouvinte e evite gravações síncronas em sistemas externos nesses callbacks; em vez disso, emita telemetria enxuta de forma assíncrona ou encaminhe as métricas para uma tarefa separada para persistência.

  • Use numInputRows, inputRowsPerSecond e processedRowsPerSecond a partir do progresso de origem para calcular a taxa de transferência — arquivos por segundo e linhas por segundo para cada lote.

  • Para calcular a latência de ingestão, compare create_time e commit_time de cloud_files_state() para obter a latência ponta a ponta. Para a latência de processamento, use o durationMs detalhamento (por exemplo, latestOffset, addBatch e outras fases de lote relatadas) para identificar qual estágio está o gargalo.

  • Use df.observe() para definir métricas de qualidade de dados embutidas diretamente no DataFrame de streaming. As métricas estão visíveis nos eventos de progresso StreamingQueryListener em observedMetrics.

from pyspark.sql.functions import count, lit, col

observed_df = df.observe(
    "auto_loader_quality",
    count(lit(1)).alias("total_rows"),
    count(col("_rescued_data")).alias("rescued_rows"),
    count(col("_corrupt_record")).alias("corrupt_rows")
)
  • Use .queryName() para atribuir um nome exclusivo a cada fluxo, facilitando a distinção de fluxos do Carregador Automático na guia Streaming de Interface do Usuário do Spark e nos painéis de monitoramento.

Para obter a referência completa de monitoramento de Streaming Estruturado, consulte Monitoramento de consultas de Streaming Estruturado em Azure Databricks.

Criar um painel de observabilidade

Combine dados de várias fontes para criar um painel abrangente de observabilidade para seus pipelines do Auto Loader. Esta tabela exibe algumas fontes sugeridas que você pode usar para estruturar seu painel de observabilidade.

Fonte de dados Dados de observabilidade
cloud_files_state() Estado de ingestão no nível do arquivo: descoberta, processamento, comprometimento e carimbos de data/hora do arquivamento por arquivo
Log de eventos do Lakeflow Spark Declarative Pipelines Histórico de execução de pipeline, métricas de fluxo por lote e resultados de expectativa de qualidade de dados
Tabelas de saída do pipeline Contagem de linhas e volume de dados gravados por tabela ingerida

Em seguida, você pode agregar dados de observabilidade em tabelas dedicadas que servem como base para dashboards e alertas:

  • Resuma os status das execuções do pipeline (sucesso ou falha) ao longo do tempo, com base em eventos de event_type = 'update_progress'.
  • Métricas agregadas de ingestão de arquivos (tamanho da lista de pendências, vazão, latência por lote), derivadas de eventos cloud_files_state() e event_type = 'flow_progress'.
  • Gere estatísticas de tabelas usando contagens de linhas e volume de dados por tabela, com base em num_output_rows no log de eventos.
  • Colete informações de depuração dos logs de erro detalhados e das violações de expectativa a cada atualização, com base em eventos event_type = 'flow_progress' com data_quality preenchido.

Essas tabelas agregadas podem alimentar um painel de IA/BI e alertas SQL. Os painéis recomendados incluem linha do tempo do status das execuções do pipeline, tendência do backlog de ingestão, tendência da taxa de transferência, distribuição da latência de ingestão, métricas de qualidade de dados, eventos de evolução de esquema e status de arquivamento dos arquivos.

Monitorar eventos de evolução do esquema

Use as abordagens a seguir para detectar alterações de esquema conforme elas ocorrem.

  • Valores não nulos em _rescued_data nas contagens de violação de expectativa indicam descompasso de esquema. Consulte o log de eventos de failed_records > 0 na expectativa no rescued data.
  • Alterações no _schemas diretório dentro do configurado cloudFiles.schemaLocation (ou dentro do ponto de verificação somente quando o local do esquema não está definido separadamente) indicam que a evolução do esquema ocorreu. Você pode verificar esse diretório a partir de uma tarefa de monitoramento separada.
  • Não considere um evento onQueryTerminated seguido por onQueryStarted para o mesmo nome de fluxo como evidência suficiente, por si só, de evolução de esquema. Os fluxos são reiniciados por muitos motivos (reinicializações de cluster, implantações de código, erros transitórios de armazenamento). Correlacione reinicializações com sinais independentes — _schemas mudanças no diretório ou _rescued_data violações de expectativas — antes de concluir que houve evolução do esquema.
  • Use _metadata.file_path para identificar quais arquivos introduziram alterações de esquema. Associe isso a cloud_files_state() no campo path para correlacionar alterações no esquema com arquivos e lotes específicos.

Use esta consulta de exemplo para detectar desvio recente de esquema por meio de violações de expectativas:

SELECT
  timestamp,
  origin.flow_name,
  exp.name AS expectation_name,
  exp.failed_records
FROM (
  SELECT
    timestamp,
    origin,
    explode(from_json(
      details:flow_progress.data_quality.expectations,
      'array<struct<name:string, dataset:string, passed_records:bigint, failed_records:bigint>>'
    )) AS exp
  FROM event_log_raw
  WHERE event_type = 'flow_progress'
    AND details:flow_progress.data_quality.expectations IS NOT NULL
)
WHERE exp.name = '<rescued-data expectation name>'
  AND exp.failed_records > 0
ORDER BY timestamp DESC;

Configurar alertas para problemas comuns

Use alertas do DATAbricks SQL ou notificações de pipeline para detectar problemas antes que eles afetem os consumidores downstream.

O SQL a seguir detecta uma lista de pendências crescente e pode ser usado como base para um alerta sql do Databricks. Agende-o para ser executado periodicamente (por exemplo, a cada 5 minutos) e alerte quando o resultado não estiver vazio.

-- Alert when backlog exceeds threshold or trends upward across recent batches
WITH recent_backlog AS (
  SELECT
    origin.flow_name,
    timestamp,
    DOUBLE(details:flow_progress.metrics.backlog_bytes) AS backlog_bytes,
    ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
  FROM event_log_raw
  WHERE event_type = 'flow_progress'
    AND details:flow_progress.metrics.backlog_bytes IS NOT NULL
)
SELECT flow_name, backlog_bytes, timestamp
FROM recent_backlog
WHERE rn = 1
  AND backlog_bytes > 1073741824  -- alert when backlog exceeds 1 GB

A tabela a seguir resume as condições de alerta recomendadas:

O que detectar Como detectá-lo Quando alertar
Pendências crescentes numFilesOutstanding tendência para cima Aumento sustentado em vários lotes
Fluxo parado Nenhum evento de progresso Nenhum evento para N minutos (com base no intervalo de gatilho esperado)
Alta latência de ingestão commit_time - create_time Ultrapassa o limiar do SLA
Degradação da qualidade dos dados Taxa de falha de expectativa Percentual crescente de linhas que não atendem às expectativas
Evento de evolução do esquema _rescued_data IS NOT NULL Qualquer valor não nulo na contagem de violações de expectativa
Descoberta lenta de arquivos durationMs.latestOffset Significativamente maior que a linha de base

Solucionar problemas comuns

A tabela a seguir descreve problemas comuns do pipeline do Auto Loader, suas causas prováveis e as ações recomendadas para resolvê-los.

Issue Causa possível Ação recomendada
Lista de pendências crescendo mais rápido do que o processamento Capacidade computacional insuficiente, distribuição desigual dos dados ou limites de taxa restringidos Dimensione a capacidade computacional, verifique se há desbalanceamento na Spark UI e revise as configurações de maxFilesPerTrigger para controlar o tamanho do lote
Arquivos que não estão sendo descobertos Eventos de arquivo configurados incorretamente, problemas de permissões ou fluxo não executado dentro de 7 dias Verifique as permissões de localização externas, verifique a configuração de eventos de arquivo na interface do usuário do Catálogo do Unity e verifique se o fluxo é executado pelo menos a cada 7 dias para evitar a expiração do estado do RocksDB
A inicialização da transmissão está demorando muito Download do estado de ponto de verificação grande (RocksDB) Atualize para o Databricks Runtime 15.3 e superior para carregamento de estado assíncrono, o que reduz o tempo de inicialização em cerca de 90%
Processamento de arquivo duplicado Configurações agressivas cloudFiles.maxFileAge ou corrupção do ponto de verificação Use um maxFileAge conservador (mínimo de mais de 90 dias), verifique a integridade do ponto de verificação e evite políticas de ciclo de vida no armazenamento do ponto de verificação
Evolução do esquema causando reinicializações no pipeline Alterações frequentes ou incompatíveis de esquema Analise schemaEvolutionMode, alterne para addNewColumnsWithTypeWidening para promoções de tipo ou use o tipo Variante para esquemas altamente dinâmicos
Dados corrompidos acumulados no coletor Problemas de qualidade de dados de origem Verifique se _corrupt_record há padrões no coletor de quarentena, analise a geração de dados de origem e considere adicionar a validação posterior
discovery_time e commit_time não preenchidos Em execução no Databricks Runtime anterior à 18.2 sem cleanSource Atualizar para o Databricks Runtime 18.2 e superior ou habilitar cloudFiles.cleanSource no Databricks Runtime 16.4–18.1

Para obter mais solução de problemas, consulte perguntas frequentes sobre o Carregador Automático.