Überwachen und Beobachten des automatischen Ladevorgangs

Auto Loader-Pipelines erfordern eine aktive Überwachung, um Probleme wie wachsende Backlogs, Schemaabweichung, beschädigte Daten und blockierte Datenströme zu erkennen, bevor sie sich auf nachgeschaltete Verbraucher auswirken. Auf dieser Seite wird beschrieben, wie Sie wichtige Metriken, den Zustand auf Dateiebene abfragen, Observability-Dashboards erstellen und häufige Probleme beheben.

Details zur Produktionskonfiguration finden Sie unter Konfigurieren des automatischen Ladevorgangs für Produktionsworkloads.

Prerequisites

Mehrere Überwachungsworkflows auf dieser Seite stützen sich auf cloud_files_state(), um den Ingestionsstatus pro Datei zu überwachen – einschließlich Backlogabfragen, Latenzberechnungen und der Erkennung von Schemaabweichungen. cloud_files_state() ist eine Tabellenwertfunktion, die den Aufnahmestatus auf Dateiebene für einen Prüfpunkt für das automatische Laden zurückgibt. Nicht alle Felder sind standardmäßig verfügbar. Die Verfügbarkeit hängt von Ihrer Databricks-Runtime-Version und -Konfiguration ab:

  • Databricks Runtime 18.2 und höher: discovery_time, processed_time, und commit_time sind automatisch verfügbar. Auf Databricks Runtime 16.4–18.1 sind diese Felder nur dann verfügbar, wenn cloudFiles.cleanSource aktiviert ist.
  • Databricks Runtime 16.4 und höher mit cloudFiles.cleanSource aktiviert: archive_time, , archive_mode, und move_location sind verfügbar.

Die Aktivierung cloudFiles.cleanSource hat einen gewissen Leistungsaufwand. Vergleichen Sie ihre Workloads in einer Vorproduktionsumgebung, bevor Sie sie in der Produktion aktivieren.

Außerdem wurde Folgendes durchgeführt:

  • Kommentieren Sie aufgenommene Daten mit der _metadata Spalte. Erfassen Sie mindestens file_path und file_modification_time. Weitere Informationen finden Sie unter Spalte mit Dateimetadaten.
  • Aktivieren _rescued_data und _corrupt_record Spalten

Schlüsselmetriken für das automatische Laden

In der folgenden Tabelle sind die wichtigsten Metriken zusammengefasst, die für die Überwachung von Pipelines für das automatische Laden verwendet werden sollen. Diese Metriken sind über StreamingQueryListener Fortschrittsereignisse verfügbar, wobei Auto Loader-spezifische Werte unter der Map metrics jeder Quelle verfügbar sind.

Kennzahl Was sie Ihnen sagt
numFilesOutstanding Anzahl der Dateien im Backlog, die auf die Verarbeitung warten
numBytesOutstanding Größe des Dateirücklogs in Byte
approximateQueueSize Länge der Cloud-Warteschlange (nur im Dateibenachrichtigungsmodus)
numInputRows Pro Batch verarbeitete Zeilen
inputRowsPerSecond Datenankunftsrate
processedRowsPerSecond Verarbeitungsdurchsatz
durationMs Aufschlüsselung Wo die Zeit in den einzelnen Batches verbracht wird

Was zu beobachten ist

Die folgenden Muster deuten darauf hin, dass Ihre Pipeline möglicherweise Aufmerksamkeit benötigt.

  • Wachsend numFilesOutstanding: Der Backlog wächst an. Ihre Pipeline kann mit den eingehenden Daten nicht Schritt halten.
  • processedRowsPerSecond < inputRowsPerSecond: Die Pipeline verarbeitet die Daten langsamer, als sie eintreffen.
  • Groß durationMs.latestOffset: Die Dateisuche ist langsam. Erwägen Sie den Wechsel zu Datei-Ereignissen.
  • Groß durationMs.addBatch: Die Datenverarbeitung ist langsam. Erwägen Sie die Skalierung der Berechnung oder Optimierung von Transformationen.

Die vollständige Metrikreferenz finden Sie unter Auto Loader-Quellmetriken.

Status auf Dateiebene mit cloud_files_state abfragen

Die cloud_files_state() Tabellenwertfunktion enthält detaillierte Informationen zu jeder Datei, die vom automatischen Ladeprogramm ermittelt wird. Die folgenden Felder sind verfügbar. Felder, die als Databricks Runtime 16.4 und höher oder 18.2 und höher erfordernd markiert sind, werden nur unter den in Voraussetzungen beschriebenen Bedingungen ausgefüllt.

Feld Typ Description
path STRING Der Pfad der Datei
size BIGINT Die Größe der Datei in Byte
create_time TIMESTAMP Wann die Datei erstellt wurde
discovery_time TIMESTAMP Wenn das automatische Laden die Datei ermittelt hat (Databricks Runtime 16.4 und höher)
processed_time TIMESTAMP Wenn Auto Loader die Datei verarbeitet (Databricks Runtime 16.4 und höher)
commit_time TIMESTAMP Wann die Datei an den Prüfpunkt gebunden wurde (Databricks Runtime 16.4 und höher)
archive_time TIMESTAMP Wann die Datei archiviert wurde (erfordert cloudFiles.cleanSource)
archive_mode STRING MOVE, DELETE oder NULL (erfordert cloudFiles.cleanSource)
move_location STRING Zielpfad, wenn cloudFiles.cleanSourceMOVE ist
ingestion_state STRING Aktueller Dateiaufnahmestatus

Status des Dateiimports untersuchen

Die folgenden Abfragen decken allgemeine Diagnoseszenarien ab.

Suchen aller unverarbeiteten Dateien (aktueller Backlog):

SELECT * FROM cloud_files_state('path/to/checkpoint')
WHERE ingestion_state != 'COMMITTED';

Berechnen der durchschnittlichen Aufnahmelatenz (Zeit von der Dateierstellung bis zum Commit):

SELECT avg(unix_timestamp(commit_time) - unix_timestamp(create_time)) AS avg_latency_seconds
FROM cloud_files_state('path/to/checkpoint')
WHERE commit_time IS NOT NULL AND create_time IS NOT NULL;

Suchen sie beschädigte oder übersprungene Dateien:

SELECT path, ingestion_state, size, create_time
FROM cloud_files_state('path/to/checkpoint')
WHERE ingestion_state LIKE 'SKIPPED%';

Nachverfolgen des Archivierungsfortschritts (erfordert cloudFiles.cleanSource):

SELECT archive_mode, count(*) AS file_count
FROM cloud_files_state('path/to/checkpoint')
GROUP BY archive_mode;

Suchen Sie Dateien mit hoher Latenz zwischen Auffindung und Commit, um Engpässe zu ermitteln:

SELECT
  path,
  size,
  unix_timestamp(commit_time) - unix_timestamp(discovery_time) AS processing_latency_seconds,
  unix_timestamp(commit_time) - unix_timestamp(create_time) AS end_to_end_latency_seconds
FROM cloud_files_state('path/to/checkpoint')
WHERE commit_time IS NOT NULL
ORDER BY end_to_end_latency_seconds DESC
LIMIT 20;

Die vollständige SQL-Referenz finden Sie unter cloud_files_state tabellenwertige Funktion.

Überwachen des automatischen Ladevorgangs in Lakeflow Spark Declarative Pipelines

Databricks empfiehlt die Verwendung von Lakeflow Spark Declarative Pipelines für Auto-Loader-Pipelines im Produktionseinsatz. So nutzen Sie die integrierten Überwachungsfunktionen:

  • Speichern Sie das Lakeflow Spark Declarative Pipelines-Ereignisprotokoll in einer Delta-Tabelle, damit es nach Observability-Daten abgefragt werden kann. Konfigurieren Sie dies über die erweiterten Einstellungen der Pipeline oder die API. Ausführliche Informationen finden Sie im Pipelineereignisprotokoll.

  • Strukturieren Sie Ihre Pipeline für Beobachtbarkeit. Eine gut strukturierte Auto-Loader-Pipeline in Lakeflow Spark Declarative Pipelines umfasst eine {table}_source Ansicht (die Auto-Loader-Quelldefinition), eine {table}_bronze Streamingtabelle (Erfassung von Rohdaten mit den Spalten _rescued_data und _corrupt_record), eine corrupt_records_sink, die Zeilen mit nicht parsierbaren Daten in Quarantäne verschiebt, und eine {table} bereinigte Ansicht zur nachgelagerten Nutzung.

  • Definieren Sie Erwartungen für Ihre Bronze-Streaming-Tabellen, um Schemaabweichungen und Datenkorruption zu überwachen. _rescued_data IS NULL erkennt unerwartete Schemaänderungen und _corrupt_record IS NULL erkennt nicht analysierte Daten. Lakeflow Spark Declarative Pipelines wertet diese Erwartungen bei Eingang der Daten aus und erstellt einen Nachvollziehbarkeitsverlauf. Sie können Erwartungen so konfigurieren, dass sie eine Warnung ausgeben, Zeilen verwerfen oder die Pipeline fehlschlagen lassen.

Verwenden Sie nach dem Erstellen der event_log_raw Ansicht für Ihre Pipeline die folgenden Abfragen für auto Loader-spezifische Metriken.

Überwachen des Aufnahmedurchsatzes pro Fluss:

SELECT
  origin.flow_name,
  origin.update_id,
  timestamp,
  TRY_CAST(details:flow_progress.metrics.num_output_rows AS BIGINT) AS rows_written
FROM event_log_raw
WHERE event_type = 'flow_progress'
ORDER BY timestamp DESC;

Datenrückstand pro Ablauf überwachen:

SELECT
  origin.flow_name,
  timestamp,
  DOUBLE(details:flow_progress.metrics.backlog_bytes) AS backlog_bytes
FROM event_log_raw
WHERE event_type = 'flow_progress'
  AND details:flow_progress.metrics.backlog_bytes IS NOT NULL
ORDER BY timestamp DESC;

Fassen Sie Erwartungsverstöße zusammen, um Schemaabweichungen und beschädigte Daten zu erkennen:

SELECT
  origin.flow_name,
  explode(from_json(
    details:flow_progress.data_quality.expectations,
    'array<struct<name:string, dataset:string, passed_records:bigint, failed_records:bigint>>'
  )) AS expectation
FROM event_log_raw
WHERE event_type = 'flow_progress'
  AND details:flow_progress.data_quality.expectations IS NOT NULL;

Allgemeine Anleitungen zur Überwachung von Lakeflow Spark Declarative Pipelines finden Sie unter Überwachen von Pipelines und Pipeline-Ereignisprotokollen.

Überwachen des automatischen Ladevorgangs mit strukturiertem Streaming

Verwenden Sie beim Ausführen von Auto Loader außerhalb von Lakeflow Spark Declarative Pipelines die folgenden Strukturierten Streaming-Überwachungsansätze.

  • Implementieren Sie eine StreamingQueryListener, um Auto Loader-spezifische Metriken aus jedem Batch zu erfassen, indem Sie aus source.metrics lesen.
from pyspark.sql.streaming import StreamingQueryListener

class AutoLoaderMonitor(StreamingQueryListener):
    def onQueryStarted(self, event):
        pass

    def onQueryProgress(self, event):
        for source in event.progress.sources:
            if "CloudFilesSource" in source.description:
                metrics = source.metrics
                files_outstanding = metrics.get("numFilesOutstanding", "0")
                bytes_outstanding = metrics.get("numBytesOutstanding", "0")
                rows_per_sec = source.processedRowsPerSecond
                # Push metrics to your monitoring system (for example, write to a Delta table)

    def onQueryIdle(self, event):
        pass

    def onQueryTerminated(self, event):
        pass

spark.streams.addListener(AutoLoaderMonitor())

Hinweis

Die Verarbeitungslogik in Listenern kann die Abfrageverarbeitung verlangsamen. Beschränken Sie die Berechnung in Listener-Callbacks und vermeiden Sie dort synchrone externe Schreibvorgänge; geben Sie stattdessen leichte Telemetriedaten asynchron aus oder übergeben Sie Metriken zur dauerhaften Speicherung an einen separaten Prozess.

  • Verwenden Sie numInputRows, inputRowsPerSecond, und processedRowsPerSecond vom Quellfortschritt zum Berechnen des Durchsatzes – Dateien pro Sekunde und Zeilen pro Sekunde für jeden Batch.

  • Um die Erfassungslatenz zu berechnen, vergleichen Sie create_time und commit_time aus cloud_files_state() für die End-to-End-Latenz. Verwenden Sie bei der Verarbeitungslatenz die Aufschlüsselung in durationMs (z. B. latestOffset, addBatch und andere gemeldete Batch-Phasen), um zu ermitteln, welche Phase den Engpass darstellt.

  • Verwenden Sie df.observe(), um Inline-Datenqualitätsmetriken direkt im Streaming-DataFrame zu definieren. Metriken sind in StreamingQueryListener Fortschrittsereignissen unter observedMetrics sichtbar.

from pyspark.sql.functions import count, lit, col

observed_df = df.observe(
    "auto_loader_quality",
    count(lit(1)).alias("total_rows"),
    count(col("_rescued_data")).alias("rescued_rows"),
    count(col("_corrupt_record")).alias("corrupt_rows")
)
  • Verwenden Sie diese Option .queryName() , um jedem Datenstrom einen eindeutigen Namen zuzuweisen, wodurch es einfacher ist, automatische Ladedatenströme auf der Registerkarte "Spark UI Streaming" und in Überwachungsdashboards zu unterscheiden.

Die vollständige Referenz zur Strukturierten Streaming-Überwachung finden Sie unter Monitoring Structured Streaming queries on Azure Databricks.

Observability-Dashboard erstellen

Kombinieren Sie Daten aus mehreren Quellen, um ein umfassendes Observability-Dashboard für Ihre Auto Loader-Pipelines zu erstellen. In dieser Tabelle werden einige vorgeschlagene Quellen angezeigt, die Sie zum Strukturieren Ihres Observability-Dashboards verwenden können.

Datenquelle Observability-Daten
cloud_files_state() Erfassungsstatus auf Dateiebene: Erkennung, Verarbeitung, Übernahme und Archivierungszeitstempel pro Datei
Lakeflow Spark Declarative Pipelines-Ereignisprotokoll Ausführungsverlauf der Pipeline, Ablaufmetriken pro Batch und Ergebnisse der Datenqualitätserwartungen
Pipelineausgabetabellen Zeilenanzahl und pro importierter Tabelle geschriebenes Datenvolumen

Anschließend können Sie Observability-Daten in dedizierte Tabellen aggregieren, die als Grundlage für Dashboards und Warnungen dienen:

  • Status von Pipelineausführungen (Erfolg oder Fehlschlag) im Zeitverlauf zusammenfassen, abgeleitet aus event_type = 'update_progress'-Ereignissen.
  • Aggregierte Dateiaufnahmemetriken (Backlog-Größe, Durchsatz, Latenz pro Batch), abgeleitet von cloud_files_state() und event_type = 'flow_progress' Ereignissen.
  • Entwickeln Sie Tabellenstatistiken mithilfe von Zeilenanzahlen und Datenvolumen pro Tabelle, die aus num_output_rows dem Ereignisprotokoll abgeleitet werden.
  • Sammeln Sie Debug-Informationen aus detaillierten Fehlerprotokollen und Erwartungsverletzungen für jedes Update, abgeleitet aus event_type = 'flow_progress'-Ereignissen, bei denen data_quality ausgefüllt ist.

Diese aggregierten Tabellen können ein AI/BI-Dashboard und SQL-Warnungen ermöglichen. Zu den empfohlenen Dashboard-Bereichen gehören die Zeitachse des Pipeline-Ausführungsstatus, der Trend des Aufnahme-Backlogs, der Durchsatztrend, die Verteilung der Aufnahmeverzögerung, Datenqualitätsmetriken, Schemaänderungsereignisse und der Status der Dateiarchivierung.

Überwachen von Schemaentwicklungsereignissen

Verwenden Sie die folgenden Ansätze, um Schemaänderungen zu erkennen, während sie auftreten.

  • Nicht NULL-Werte in _rescued_data der Erwartungsverletzungsanzahl deuten auf Die Schemaabweichung hin. Fragen Sie das Ereignisprotokoll für failed_records > 0 zur no rescued data-Erwartung ab.
  • Änderungen am _schemas Verzeichnis innerhalb des konfigurierten cloudFiles.schemaLocation (oder nur innerhalb des Checkpoints, wenn der Schema-Speicherort nicht separat festgelegt ist) deuten darauf hin, dass eine Schema-Evolution stattgefunden hat. Sie können dieses Verzeichnis aus einem separaten Überwachungsauftrag abrufen.
  • Betrachten Sie ein onQueryTerminated-Ereignis, auf das onQueryStarted für denselben Stream-Namen folgt, nicht für sich allein als ausreichenden Nachweis einer Schemaentwicklung. Datenströme werden aus vielen Gründen neu gestartet (Clusterneustarts, Codebereitstellungen, vorübergehende Speicherfehler). Bringen Sie Neustarts mit unabhängigen Signalen in Zusammenhang — _schemas Verzeichnisänderungen oder _rescued_data Verletzungen von Erwartungen —, bevor Sie zu dem Schluss kommen, dass eine Schemaentwicklung stattgefunden hat.
  • Verwenden Sie diese Möglichkeit _metadata.file_path , um zu ermitteln, welche Dateien Schemaänderungen eingeführt haben. Verknüpfen Sie dies mit cloud_files_state() dem path Feld, um Schemaänderungen mit bestimmten Dateien und Batches zu korrelieren.

Verwenden Sie diese Beispielabfrage, um aktuelle Schemaabweichungen über Erwartungsverletzungen zu erkennen:

SELECT
  timestamp,
  origin.flow_name,
  exp.name AS expectation_name,
  exp.failed_records
FROM (
  SELECT
    timestamp,
    origin,
    explode(from_json(
      details:flow_progress.data_quality.expectations,
      'array<struct<name:string, dataset:string, passed_records:bigint, failed_records:bigint>>'
    )) AS exp
  FROM event_log_raw
  WHERE event_type = 'flow_progress'
    AND details:flow_progress.data_quality.expectations IS NOT NULL
)
WHERE exp.name = '<rescued-data expectation name>'
  AND exp.failed_records > 0
ORDER BY timestamp DESC;

Einrichten von Warnungen für häufige Probleme

Verwenden Sie DATAbricks SQL-Warnungen oder Pipelinebenachrichtigungen, um Probleme zu erkennen, bevor sie sich auf downstreame Verbraucher auswirken.

Die folgende SQL erkennt einen wachsenden Backlog und kann als Grundlage für eine Databricks SQL-Warnung verwendet werden. Konfigurieren Sie die regelmäßige Ausführung (z. B. alle 5 Minuten), und lösen Sie eine Benachrichtigung aus, wenn das Ergebnis nicht leer ist.

-- Alert when backlog exceeds threshold or trends upward across recent batches
WITH recent_backlog AS (
  SELECT
    origin.flow_name,
    timestamp,
    DOUBLE(details:flow_progress.metrics.backlog_bytes) AS backlog_bytes,
    ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
  FROM event_log_raw
  WHERE event_type = 'flow_progress'
    AND details:flow_progress.metrics.backlog_bytes IS NOT NULL
)
SELECT flow_name, backlog_bytes, timestamp
FROM recent_backlog
WHERE rn = 1
  AND backlog_bytes > 1073741824  -- alert when backlog exceeds 1 GB

In der folgenden Tabelle sind die empfohlenen Warnungsbedingungen zusammengefasst:

Was zu erkennen ist So erkennen Sie sie Zeitpunkt der Benachrichtigung
Wachsender Backlog numFilesOutstanding Trend nach oben Anhaltende Zunahme über mehrere Batches
Stockender Datenstrom Keine Fortschrittsereignisse Keine Ereignisse für N Minuten (basierend auf dem erwarteten Triggerintervall)
Hohe Aufnahmelatenz commit_time - create_time Überschreitet den SLA-Schwellenwert
Verschlechterung der Datenqualität Erwartungsfehlerrate Steigender Prozentsatz von Zeilen, die die Erwartungen nicht erfüllen
Schemaentwicklungsereignis _rescued_data IS NOT NULL Alle Nicht-NULL-Werte in der Anzahl der Erwartungsverstöße
Langsame Dateisuche durationMs.latestOffset Deutlich höher als der Ausgangswert

Häufige Probleme beheben

In der folgenden Tabelle werden allgemeine Probleme mit der Pipeline für das automatische Laden, ihre wahrscheinlichen Ursachen und empfohlenen Aktionen zum Beheben beschrieben.

Issue Mögliche Ursache Empfohlene Maßnahme
Backlog wächst schneller als die Verarbeitung Zu gering dimensionierte Rechenressourcen, Datenungleichverteilung oder gedrosselte Ratenlimits Rechenleistung skalieren, mit der Spark-Benutzeroberfläche auf Datenungleichverteilung prüfen und maxFilesPerTriggerEinstellungen zur Steuerung der Batchgröße überprüfen
Nicht gefundene Dateien Dateiereignisse fehlkonfiguriert, Berechtigungsproblem oder Datenstrom wurde nicht innerhalb von 7 Tagen ausgeführt Überprüfen Sie die Berechtigungen für externe Speicherorte, überprüfen Sie das Setup von Dateiereignissen in der Unity Catalog-Benutzeroberfläche, und stellen Sie sicher, dass der Datenstrom mindestens alle 7 Tage ausgeführt wird, um das Ablaufen des RocksDB-Zustands zu vermeiden.
Der Streamstart dauert zu lange. Download des großen Prüfpunktzustands (RocksDB) Führen Sie ein Upgrade auf Databricks Runtime 15.3 oder höher durch, um asynchrones Laden des Status zu nutzen, wodurch die Startzeit um ca. 90 % reduziert wird.
Doppelte Dateiverarbeitung Aggressive cloudFiles.maxFileAge Einstellungen oder Prüfpunktbeschädigung Verwenden Sie eine konservative maxFileAge (mindestens 90 Tage), überprüfen Sie die Prüfpunktintegrität, und vermeiden Sie Lebenszyklusrichtlinien für den Prüfpunktspeicher.
Schemaänderungen führen zu Neustarts der Pipeline Häufige oder inkompatible Schemaänderungen Prüfen Sie schemaEvolutionMode, wechseln Sie für Typheraufstufungen zu addNewColumnsWithTypeWidening oder verwenden Sie den Variant-Typ für hochdynamische Schemas
Beschädigte Daten, die in der Spüle akkumuliert werden Probleme mit der Quelldatenqualität Überprüfen sie die Quarantänesenke auf Muster, überprüfen Sie die _corrupt_record Quelldatengenerierung, und erwägen Sie, die Upstreamüberprüfung hinzuzufügen.
discovery_time und commit_time nicht befüllt Ausführung auf Databricks Runtime unter Version 18.2 ohne cleanSource Aktualisieren Sie auf Databricks Runtime 18.2 oder höher oder aktivieren Sie cloudFiles.cleanSource auf Databricks Runtime 16.4–18.1

Weitere Problembehandlung finden Sie unter Häufig gestellte Fragen zum automatischen Laden.