Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Os pipelines do Carregador Automático exigem monitoramento ativo para detectar problemas como acúmulo crescente de pendências, descompasso de esquema, dados corrompidos e fluxos interrompidos antes que eles afetem os consumidores posteriores. Esta página descreve como monitorar as principais métricas, consultar o estado no nível do arquivo, criar painéis de observabilidade e solucionar problemas comuns.
Para obter detalhes de configuração de produção, consulte Configurar o Carregador Automático para cargas de trabalho de produção. Para obter as práticas recomendadas de configuração, consulte as práticas recomendadas do Carregador Automático.
Pré-requisitos
Vários fluxos de trabalho de monitoramento nesta página dependem de cloud_files_state() para observar o estado da ingestão por arquivo — incluindo consultas de pendências, cálculos de latência e detecção de descompassos de esquema.
cloud_files_state() é uma função com valor de tabela que retorna o estado de ingestão no nível do arquivo para um ponto de verificação do Carregador Automático. Nem todos os campos estão disponíveis por padrão. A disponibilidade depende da sua versão e configuração do Databricks Runtime:
-
Databricks Runtime 18.2 e superior:
discovery_time,processed_timeecommit_timeestão disponíveis automaticamente. No Databricks Runtime 16.4–18.1, esses campos só estarão disponíveis quandocloudFiles.cleanSourceestiverem habilitados. -
Databricks Runtime 16.4 ou superior, com
cloudFiles.cleanSourcehabilitado:archive_time,archive_modeemove_locationestão disponíveis.
Habilitar cloudFiles.cleanSource gera certa sobrecarga de desempenho. Compare com suas cargas de trabalho em um ambiente de pré-produção antes de habilitá-lo em um produção.
Additionally:
- Anote os dados ingeridos com a coluna
_metadata. Capturar no mínimofile_pathefile_modification_time. Confira Coluna de metadados de arquivo. - Habilitar
_rescued_datae_corrupt_recordcolunas.
Principais métricas do Carregador Automático
A tabela a seguir resume as métricas mais importantes para monitorar os pipelines do Auto Loader. Essas métricas estão disponíveis a partir dos eventos de progresso StreamingQueryListener, com valores específicos do Carregador Automático expostos no mapa metrics de cada origem.
| Metric | O que isso te diz |
|---|---|
numFilesOutstanding |
Número de arquivos na lista de pendências aguardando para serem processados |
numBytesOutstanding |
Tamanho da fila de arquivos pendentes em bytes |
approximateQueueSize |
Profundidade da fila na nuvem (somente no modo de notificação de arquivo) |
numInputRows |
Linhas processadas por lote |
inputRowsPerSecond |
Taxa de chegada de dados |
processedRowsPerSecond |
Taxa de processamento |
durationMs Detalhamento |
Onde o tempo é empregado em cada lote |
O que observar
Os padrões a seguir indicam que seu pipeline pode precisar de atenção.
-
Crescimento
numFilesOutstanding: a lista de pendências está se acumulando. Seu pipeline não está conseguindo acompanhar os dados de entrada. -
processedRowsPerSecond<inputRowsPerSecond: O pipeline está processando os dados com mais lentidão do que eles chegam. -
Grande
durationMs.latestOffset: a descoberta de arquivos é lenta. Considere mudar para eventos de arquivo. -
Grande
durationMs.addBatch: o processamento de dados é lento. Considere dimensionar a computação ou otimizar transformações.
Para obter a referência completa de métricas, consulte as métricas de origem do Carregador Automático.
Consultar o estado em nível de arquivo com cloud_files_state
A cloud_files_state() função com valor de tabela fornece informações detalhadas sobre cada arquivo descoberto pelo Carregador Automático. Os campos a seguir estão disponíveis. Campos marcados como exigindo o Databricks Runtime 16.4 ou superior, ou 18.2 ou superior, só são preenchidos nas condições descritas em Pré-requisitos.
| Campo | Tipo | Description |
|---|---|---|
path |
STRING |
O caminho do arquivo |
size |
BIGINT |
O tamanho do arquivo em bytes |
create_time |
TIMESTAMP |
Quando o arquivo foi criado |
discovery_time |
TIMESTAMP |
Quando o Carregador Automático descobriu o arquivo (Databricks Runtime 16.4 e superior) |
processed_time |
TIMESTAMP |
Quando o Carregador Automático processou o arquivo (Databricks Runtime 16.4 e superior) |
commit_time |
TIMESTAMP |
Quando o arquivo foi comprometido no ponto de verificação (Databricks Runtime 16.4 e superior) |
archive_time |
TIMESTAMP |
Quando o arquivo foi arquivado (requer cloudFiles.cleanSource) |
archive_mode |
STRING |
MOVE, DELETE, ou NULL (requer cloudFiles.cleanSource) |
move_location |
STRING |
Caminho de destino quando cloudFiles.cleanSource é MOVE |
ingestion_state |
STRING |
Estado de ingestão de arquivo atual |
Investigar o estado da ingestão de arquivos
As consultas a seguir abrangem cenários comuns de diagnóstico.
Localizar todos os arquivos não processados (a lista de pendências atual):
SELECT * FROM cloud_files_state('path/to/checkpoint')
WHERE ingestion_state != 'COMMITTED';
Calcule a latência média de ingestão (tempo entre a criação do arquivo e a gravação):
SELECT avg(unix_timestamp(commit_time) - unix_timestamp(create_time)) AS avg_latency_seconds
FROM cloud_files_state('path/to/checkpoint')
WHERE commit_time IS NOT NULL AND create_time IS NOT NULL;
Localizar arquivos corrompidos ou omitidos:
SELECT path, ingestion_state, size, create_time
FROM cloud_files_state('path/to/checkpoint')
WHERE ingestion_state LIKE 'SKIPPED%';
Acompanhar o progresso do arquivamento (requer cloudFiles.cleanSource):
SELECT archive_mode, count(*) AS file_count
FROM cloud_files_state('path/to/checkpoint')
GROUP BY archive_mode;
Encontre arquivos com alta latência de descoberta para confirmação para identificar gargalos:
SELECT
path,
size,
unix_timestamp(commit_time) - unix_timestamp(discovery_time) AS processing_latency_seconds,
unix_timestamp(commit_time) - unix_timestamp(create_time) AS end_to_end_latency_seconds
FROM cloud_files_state('path/to/checkpoint')
WHERE commit_time IS NOT NULL
ORDER BY end_to_end_latency_seconds DESC
LIMIT 20;
Para obter a referência completa de SQL, consulte cloud_files_statefunção com valor de tabela.
Monitore o Carregador Automático nos Pipelines Declarativos do Lakeflow Spark
O Databricks recomenda o uso de Pipelines Declarativos do Lakeflow Spark para a produção de pipelines do Carregador Automático. Para aproveitar seus recursos de monitoramento internos:
Armazene o log de eventos do Lakeflow Spark Declarative Pipelines em uma tabela Delta para que ele possa ser consultado quanto a dados de observabilidade. Configure isso por meio das configurações avançadas do pipeline ou da API. Para obter detalhes, consulte o log de eventos do Pipeline.
Estruture seu pipeline para observabilidade. Um pipeline do Carregador Automático bem estruturado nos Pipelines Declarativos do Lakeflow Spark inclui uma exibição
{table}_source(a definição da origem do Carregador Automático), uma{table}_bronzetabela de fluxo (ingestão de dados brutos com colunas_rescued_datae_corrupt_record), umacorrupt_records_sinkque coloca linhas com dados que não podem ser analisados em quarentena e uma{table}exibição limpa para consumo posterior.Configure as expectativas em suas tabelas bronze de fluxo para monitorar desvios de esquema e corrupção de dados.
_rescued_data IS NULLdetecta alterações inesperadas de esquema e_corrupt_record IS NULLdetecta dados nãoparáveis. O Lakeflow Spark Declarative Pipelines avalia essas expectativas à medida que os dados chegam e gera um rastro de observabilidade. Você pode configurar as expectativas para emitir avisos, remover linhas ou fazer o pipeline falhar.
Depois de criar a visualização do pipeline event_log_raw, use as seguintes consultas para métricas específicas do Carregador Automático.
Monitore a taxa de ingestão por fluxo:
SELECT
origin.flow_name,
origin.update_id,
timestamp,
TRY_CAST(details:flow_progress.metrics.num_output_rows AS BIGINT) AS rows_written
FROM event_log_raw
WHERE event_type = 'flow_progress'
ORDER BY timestamp DESC;
Monitoramento do backlog de dados por fluxo:
SELECT
origin.flow_name,
timestamp,
DOUBLE(details:flow_progress.metrics.backlog_bytes) AS backlog_bytes
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND details:flow_progress.metrics.backlog_bytes IS NOT NULL
ORDER BY timestamp DESC;
Resuma as violações das expectativas para detectar desvio de esquema e dados corrompidos:
SELECT
origin.flow_name,
explode(from_json(
details:flow_progress.data_quality.expectations,
'array<struct<name:string, dataset:string, passed_records:bigint, failed_records:bigint>>'
)) AS expectation
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND details:flow_progress.data_quality.expectations IS NOT NULL;
Para obter diretrizes gerais de monitoramento dos Pipelines Declarativos do Lakeflow Spark, consulte Monitorar pipelines e Log de eventos de pipeline.
Monitorar o Carregador Automático com Fluxo Estruturado
Ao executar o Carregador Automático fora dos Pipelines Declarativos do Lakeflow Spark, use as seguintes abordagens de monitoramento do Fluxo Estruturado.
- Implemente um
StreamingQueryListenerpara capturar métricas específicas do Carregador Automático em cada lote por meio da leitura desource.metrics.
from pyspark.sql.streaming import StreamingQueryListener
class AutoLoaderMonitor(StreamingQueryListener):
def onQueryStarted(self, event):
pass
def onQueryProgress(self, event):
for source in event.progress.sources:
if "CloudFilesSource" in source.description:
metrics = source.metrics
files_outstanding = metrics.get("numFilesOutstanding", "0")
bytes_outstanding = metrics.get("numBytesOutstanding", "0")
rows_per_sec = source.processedRowsPerSecond
# Push metrics to your monitoring system (for example, write to a Delta table)
def onQueryIdle(self, event):
pass
def onQueryTerminated(self, event):
pass
spark.streams.addListener(AutoLoaderMonitor())
Observação
A lógica de processamento nos ouvintes pode tornar mais lento o processamento de consultas. Limite a computação nos callbacks de ouvinte e evite gravações síncronas em sistemas externos nesses callbacks; em vez disso, emita telemetria enxuta de forma assíncrona ou encaminhe as métricas para uma tarefa separada para persistência.
Use
numInputRows,inputRowsPerSecondeprocessedRowsPerSeconda partir do progresso de origem para calcular a taxa de transferência — arquivos por segundo e linhas por segundo para cada lote.Para calcular a latência de ingestão, compare
create_timeecommit_timedecloud_files_state()para obter a latência ponta a ponta. Para a latência de processamento, use odurationMsdetalhamento (por exemplo,latestOffset,addBatche outras fases de lote relatadas) para identificar qual estágio está o gargalo.Use
df.observe()para definir métricas de qualidade de dados embutidas diretamente no DataFrame de streaming. As métricas estão visíveis nos eventos de progressoStreamingQueryListeneremobservedMetrics.
from pyspark.sql.functions import count, lit, col
observed_df = df.observe(
"auto_loader_quality",
count(lit(1)).alias("total_rows"),
count(col("_rescued_data")).alias("rescued_rows"),
count(col("_corrupt_record")).alias("corrupt_rows")
)
- Use
.queryName()para atribuir um nome exclusivo a cada fluxo, facilitando a distinção de fluxos do Carregador Automático na guia Streaming de Interface do Usuário do Spark e nos painéis de monitoramento.
Para obter a referência completa de monitoramento de Streaming Estruturado, consulte Monitoramento de consultas de Streaming Estruturado em Azure Databricks.
Criar um painel de observabilidade
Combine dados de várias fontes para criar um painel abrangente de observabilidade para seus pipelines do Auto Loader. Esta tabela exibe algumas fontes sugeridas que você pode usar para estruturar seu painel de observabilidade.
| Fonte de dados | Dados de observabilidade |
|---|---|
cloud_files_state() |
Estado de ingestão no nível do arquivo: descoberta, processamento, comprometimento e carimbos de data/hora do arquivamento por arquivo |
| Log de eventos do Lakeflow Spark Declarative Pipelines | Histórico de execução de pipeline, métricas de fluxo por lote e resultados de expectativa de qualidade de dados |
| Tabelas de saída do pipeline | Contagem de linhas e volume de dados gravados por tabela ingerida |
Em seguida, você pode agregar dados de observabilidade em tabelas dedicadas que servem como base para dashboards e alertas:
- Resuma os status das execuções do pipeline (sucesso ou falha) ao longo do tempo, com base em eventos de
event_type = 'update_progress'. - Métricas agregadas de ingestão de arquivos (tamanho da lista de pendências, vazão, latência por lote), derivadas de eventos
cloud_files_state()eevent_type = 'flow_progress'. - Gere estatísticas de tabelas usando contagens de linhas e volume de dados por tabela, com base em
num_output_rowsno log de eventos. - Colete informações de depuração dos logs de erro detalhados e das violações de expectativa a cada atualização, com base em eventos
event_type = 'flow_progress'comdata_qualitypreenchido.
Essas tabelas agregadas podem alimentar um painel de IA/BI e alertas SQL. Os painéis recomendados incluem linha do tempo do status das execuções do pipeline, tendência do backlog de ingestão, tendência da taxa de transferência, distribuição da latência de ingestão, métricas de qualidade de dados, eventos de evolução de esquema e status de arquivamento dos arquivos.
Monitorar eventos de evolução do esquema
Use as abordagens a seguir para detectar alterações de esquema conforme elas ocorrem.
- Valores não nulos em
_rescued_datanas contagens de violação de expectativa indicam descompasso de esquema. Consulte o log de eventos defailed_records > 0na expectativano rescued data. - Alterações no
_schemasdiretório dentro do configuradocloudFiles.schemaLocation(ou dentro do ponto de verificação somente quando o local do esquema não está definido separadamente) indicam que a evolução do esquema ocorreu. Você pode verificar esse diretório a partir de uma tarefa de monitoramento separada. - Não considere um evento
onQueryTerminatedseguido poronQueryStartedpara o mesmo nome de fluxo como evidência suficiente, por si só, de evolução de esquema. Os fluxos são reiniciados por muitos motivos (reinicializações de cluster, implantações de código, erros transitórios de armazenamento). Correlacione reinicializações com sinais independentes —_schemasmudanças no diretório ou_rescued_dataviolações de expectativas — antes de concluir que houve evolução do esquema. - Use
_metadata.file_pathpara identificar quais arquivos introduziram alterações de esquema. Associe isso acloud_files_state()no campopathpara correlacionar alterações no esquema com arquivos e lotes específicos.
Use esta consulta de exemplo para detectar desvio recente de esquema por meio de violações de expectativas:
SELECT
timestamp,
origin.flow_name,
exp.name AS expectation_name,
exp.failed_records
FROM (
SELECT
timestamp,
origin,
explode(from_json(
details:flow_progress.data_quality.expectations,
'array<struct<name:string, dataset:string, passed_records:bigint, failed_records:bigint>>'
)) AS exp
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND details:flow_progress.data_quality.expectations IS NOT NULL
)
WHERE exp.name = '<rescued-data expectation name>'
AND exp.failed_records > 0
ORDER BY timestamp DESC;
Configurar alertas para problemas comuns
Use alertas do DATAbricks SQL ou notificações de pipeline para detectar problemas antes que eles afetem os consumidores downstream.
O SQL a seguir detecta uma lista de pendências crescente e pode ser usado como base para um alerta sql do Databricks. Agende-o para ser executado periodicamente (por exemplo, a cada 5 minutos) e alerte quando o resultado não estiver vazio.
-- Alert when backlog exceeds threshold or trends upward across recent batches
WITH recent_backlog AS (
SELECT
origin.flow_name,
timestamp,
DOUBLE(details:flow_progress.metrics.backlog_bytes) AS backlog_bytes,
ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND details:flow_progress.metrics.backlog_bytes IS NOT NULL
)
SELECT flow_name, backlog_bytes, timestamp
FROM recent_backlog
WHERE rn = 1
AND backlog_bytes > 1073741824 -- alert when backlog exceeds 1 GB
A tabela a seguir resume as condições de alerta recomendadas:
| O que detectar | Como detectá-lo | Quando alertar |
|---|---|---|
| Pendências crescentes |
numFilesOutstanding tendência para cima |
Aumento sustentado em vários lotes |
| Fluxo parado | Nenhum evento de progresso | Nenhum evento para N minutos (com base no intervalo de gatilho esperado) |
| Alta latência de ingestão | commit_time - create_time |
Ultrapassa o limiar do SLA |
| Degradação da qualidade dos dados | Taxa de falha de expectativa | Percentual crescente de linhas que não atendem às expectativas |
| Evento de evolução do esquema | _rescued_data IS NOT NULL |
Qualquer valor não nulo na contagem de violações de expectativa |
| Descoberta lenta de arquivos | durationMs.latestOffset |
Significativamente maior que a linha de base |
Solucionar problemas comuns
A tabela a seguir descreve problemas comuns do pipeline do Auto Loader, suas causas prováveis e as ações recomendadas para resolvê-los.
| Issue | Causa possível | Ação recomendada |
|---|---|---|
| Lista de pendências crescendo mais rápido do que o processamento | Capacidade computacional insuficiente, distribuição desigual dos dados ou limites de taxa restringidos | Dimensione a capacidade computacional, verifique se há desbalanceamento na Spark UI e revise as configurações de maxFilesPerTrigger para controlar o tamanho do lote |
| Arquivos que não estão sendo descobertos | Eventos de arquivo configurados incorretamente, problemas de permissões ou fluxo não executado dentro de 7 dias | Verifique as permissões de localização externas, verifique a configuração de eventos de arquivo na interface do usuário do Catálogo do Unity e verifique se o fluxo é executado pelo menos a cada 7 dias para evitar a expiração do estado do RocksDB |
| A inicialização da transmissão está demorando muito | Download do estado de ponto de verificação grande (RocksDB) | Atualize para o Databricks Runtime 15.3 e superior para carregamento de estado assíncrono, o que reduz o tempo de inicialização em cerca de 90% |
| Processamento de arquivo duplicado | Configurações agressivas cloudFiles.maxFileAge ou corrupção do ponto de verificação |
Use um maxFileAge conservador (mínimo de mais de 90 dias), verifique a integridade do ponto de verificação e evite políticas de ciclo de vida no armazenamento do ponto de verificação |
| Evolução do esquema causando reinicializações no pipeline | Alterações frequentes ou incompatíveis de esquema | Analise schemaEvolutionMode, alterne para addNewColumnsWithTypeWidening para promoções de tipo ou use o tipo Variante para esquemas altamente dinâmicos |
| Dados corrompidos acumulados no coletor | Problemas de qualidade de dados de origem | Verifique se _corrupt_record há padrões no coletor de quarentena, analise a geração de dados de origem e considere adicionar a validação posterior |
discovery_time e commit_time não preenchidos |
Em execução no Databricks Runtime anterior à 18.2 sem cleanSource |
Atualizar para o Databricks Runtime 18.2 e superior ou habilitar cloudFiles.cleanSource no Databricks Runtime 16.4–18.1 |
Para obter mais solução de problemas, consulte perguntas frequentes sobre o Carregador Automático.