Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Auto Loader-Pipelines erfordern eine aktive Überwachung, um Probleme wie wachsende Backlogs, Schemaabweichung, beschädigte Daten und blockierte Datenströme zu erkennen, bevor sie sich auf nachgeschaltete Verbraucher auswirken. Auf dieser Seite wird beschrieben, wie Sie wichtige Metriken, den Zustand auf Dateiebene abfragen, Observability-Dashboards erstellen und häufige Probleme beheben.
Details zur Produktionskonfiguration finden Sie unter Konfigurieren des automatischen Ladevorgangs für Produktionsworkloads.
Prerequisites
Mehrere Überwachungsworkflows auf dieser Seite stützen sich auf cloud_files_state(), um den Ingestionsstatus pro Datei zu überwachen – einschließlich Backlogabfragen, Latenzberechnungen und der Erkennung von Schemaabweichungen.
cloud_files_state() ist eine Tabellenwertfunktion, die den Aufnahmestatus auf Dateiebene für einen Prüfpunkt für das automatische Laden zurückgibt. Nicht alle Felder sind standardmäßig verfügbar. Die Verfügbarkeit hängt von Ihrer Databricks-Runtime-Version und -Konfiguration ab:
-
Databricks Runtime 18.2 und höher:
discovery_time,processed_time, undcommit_timesind automatisch verfügbar. Auf Databricks Runtime 16.4–18.1 sind diese Felder nur dann verfügbar, wenncloudFiles.cleanSourceaktiviert ist. -
Databricks Runtime 16.4 und höher mit
cloudFiles.cleanSourceaktiviert:archive_time, ,archive_mode, undmove_locationsind verfügbar.
Die Aktivierung cloudFiles.cleanSource hat einen gewissen Leistungsaufwand. Vergleichen Sie ihre Workloads in einer Vorproduktionsumgebung, bevor Sie sie in der Produktion aktivieren.
Außerdem wurde Folgendes durchgeführt:
- Kommentieren Sie aufgenommene Daten mit der
_metadataSpalte. Erfassen Sie mindestensfile_pathundfile_modification_time. Weitere Informationen finden Sie unter Spalte mit Dateimetadaten. - Aktivieren
_rescued_dataund_corrupt_recordSpalten
Schlüsselmetriken für das automatische Laden
In der folgenden Tabelle sind die wichtigsten Metriken zusammengefasst, die für die Überwachung von Pipelines für das automatische Laden verwendet werden sollen. Diese Metriken sind über StreamingQueryListener Fortschrittsereignisse verfügbar, wobei Auto Loader-spezifische Werte unter der Map metrics jeder Quelle verfügbar sind.
| Kennzahl | Was sie Ihnen sagt |
|---|---|
numFilesOutstanding |
Anzahl der Dateien im Backlog, die auf die Verarbeitung warten |
numBytesOutstanding |
Größe des Dateirücklogs in Byte |
approximateQueueSize |
Länge der Cloud-Warteschlange (nur im Dateibenachrichtigungsmodus) |
numInputRows |
Pro Batch verarbeitete Zeilen |
inputRowsPerSecond |
Datenankunftsrate |
processedRowsPerSecond |
Verarbeitungsdurchsatz |
durationMs Aufschlüsselung |
Wo die Zeit in den einzelnen Batches verbracht wird |
Was zu beobachten ist
Die folgenden Muster deuten darauf hin, dass Ihre Pipeline möglicherweise Aufmerksamkeit benötigt.
-
Wachsend
numFilesOutstanding: Der Backlog wächst an. Ihre Pipeline kann mit den eingehenden Daten nicht Schritt halten. -
processedRowsPerSecond<inputRowsPerSecond: Die Pipeline verarbeitet die Daten langsamer, als sie eintreffen. -
Groß
durationMs.latestOffset: Die Dateisuche ist langsam. Erwägen Sie den Wechsel zu Datei-Ereignissen. -
Groß
durationMs.addBatch: Die Datenverarbeitung ist langsam. Erwägen Sie die Skalierung der Berechnung oder Optimierung von Transformationen.
Die vollständige Metrikreferenz finden Sie unter Auto Loader-Quellmetriken.
Status auf Dateiebene mit cloud_files_state abfragen
Die cloud_files_state() Tabellenwertfunktion enthält detaillierte Informationen zu jeder Datei, die vom automatischen Ladeprogramm ermittelt wird. Die folgenden Felder sind verfügbar. Felder, die als Databricks Runtime 16.4 und höher oder 18.2 und höher erfordernd markiert sind, werden nur unter den in Voraussetzungen beschriebenen Bedingungen ausgefüllt.
| Feld | Typ | Description |
|---|---|---|
path |
STRING |
Der Pfad der Datei |
size |
BIGINT |
Die Größe der Datei in Byte |
create_time |
TIMESTAMP |
Wann die Datei erstellt wurde |
discovery_time |
TIMESTAMP |
Wenn das automatische Laden die Datei ermittelt hat (Databricks Runtime 16.4 und höher) |
processed_time |
TIMESTAMP |
Wenn Auto Loader die Datei verarbeitet (Databricks Runtime 16.4 und höher) |
commit_time |
TIMESTAMP |
Wann die Datei an den Prüfpunkt gebunden wurde (Databricks Runtime 16.4 und höher) |
archive_time |
TIMESTAMP |
Wann die Datei archiviert wurde (erfordert cloudFiles.cleanSource) |
archive_mode |
STRING |
MOVE, DELETE oder NULL (erfordert cloudFiles.cleanSource) |
move_location |
STRING |
Zielpfad, wenn cloudFiles.cleanSourceMOVE ist |
ingestion_state |
STRING |
Aktueller Dateiaufnahmestatus |
Status des Dateiimports untersuchen
Die folgenden Abfragen decken allgemeine Diagnoseszenarien ab.
Suchen aller unverarbeiteten Dateien (aktueller Backlog):
SELECT * FROM cloud_files_state('path/to/checkpoint')
WHERE ingestion_state != 'COMMITTED';
Berechnen der durchschnittlichen Aufnahmelatenz (Zeit von der Dateierstellung bis zum Commit):
SELECT avg(unix_timestamp(commit_time) - unix_timestamp(create_time)) AS avg_latency_seconds
FROM cloud_files_state('path/to/checkpoint')
WHERE commit_time IS NOT NULL AND create_time IS NOT NULL;
Suchen sie beschädigte oder übersprungene Dateien:
SELECT path, ingestion_state, size, create_time
FROM cloud_files_state('path/to/checkpoint')
WHERE ingestion_state LIKE 'SKIPPED%';
Nachverfolgen des Archivierungsfortschritts (erfordert cloudFiles.cleanSource):
SELECT archive_mode, count(*) AS file_count
FROM cloud_files_state('path/to/checkpoint')
GROUP BY archive_mode;
Suchen Sie Dateien mit hoher Latenz zwischen Auffindung und Commit, um Engpässe zu ermitteln:
SELECT
path,
size,
unix_timestamp(commit_time) - unix_timestamp(discovery_time) AS processing_latency_seconds,
unix_timestamp(commit_time) - unix_timestamp(create_time) AS end_to_end_latency_seconds
FROM cloud_files_state('path/to/checkpoint')
WHERE commit_time IS NOT NULL
ORDER BY end_to_end_latency_seconds DESC
LIMIT 20;
Die vollständige SQL-Referenz finden Sie unter cloud_files_state tabellenwertige Funktion.
Überwachen des automatischen Ladevorgangs in Lakeflow Spark Declarative Pipelines
Databricks empfiehlt die Verwendung von Lakeflow Spark Declarative Pipelines für Auto-Loader-Pipelines im Produktionseinsatz. So nutzen Sie die integrierten Überwachungsfunktionen:
Speichern Sie das Lakeflow Spark Declarative Pipelines-Ereignisprotokoll in einer Delta-Tabelle, damit es nach Observability-Daten abgefragt werden kann. Konfigurieren Sie dies über die erweiterten Einstellungen der Pipeline oder die API. Ausführliche Informationen finden Sie im Pipelineereignisprotokoll.
Strukturieren Sie Ihre Pipeline für Beobachtbarkeit. Eine gut strukturierte Auto-Loader-Pipeline in Lakeflow Spark Declarative Pipelines umfasst eine
{table}_sourceAnsicht (die Auto-Loader-Quelldefinition), eine{table}_bronzeStreamingtabelle (Erfassung von Rohdaten mit den Spalten_rescued_dataund_corrupt_record), einecorrupt_records_sink, die Zeilen mit nicht parsierbaren Daten in Quarantäne verschiebt, und eine{table}bereinigte Ansicht zur nachgelagerten Nutzung.Definieren Sie Erwartungen für Ihre Bronze-Streaming-Tabellen, um Schemaabweichungen und Datenkorruption zu überwachen.
_rescued_data IS NULLerkennt unerwartete Schemaänderungen und_corrupt_record IS NULLerkennt nicht analysierte Daten. Lakeflow Spark Declarative Pipelines wertet diese Erwartungen bei Eingang der Daten aus und erstellt einen Nachvollziehbarkeitsverlauf. Sie können Erwartungen so konfigurieren, dass sie eine Warnung ausgeben, Zeilen verwerfen oder die Pipeline fehlschlagen lassen.
Verwenden Sie nach dem Erstellen der event_log_raw Ansicht für Ihre Pipeline die folgenden Abfragen für auto Loader-spezifische Metriken.
Überwachen des Aufnahmedurchsatzes pro Fluss:
SELECT
origin.flow_name,
origin.update_id,
timestamp,
TRY_CAST(details:flow_progress.metrics.num_output_rows AS BIGINT) AS rows_written
FROM event_log_raw
WHERE event_type = 'flow_progress'
ORDER BY timestamp DESC;
Datenrückstand pro Ablauf überwachen:
SELECT
origin.flow_name,
timestamp,
DOUBLE(details:flow_progress.metrics.backlog_bytes) AS backlog_bytes
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND details:flow_progress.metrics.backlog_bytes IS NOT NULL
ORDER BY timestamp DESC;
Fassen Sie Erwartungsverstöße zusammen, um Schemaabweichungen und beschädigte Daten zu erkennen:
SELECT
origin.flow_name,
explode(from_json(
details:flow_progress.data_quality.expectations,
'array<struct<name:string, dataset:string, passed_records:bigint, failed_records:bigint>>'
)) AS expectation
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND details:flow_progress.data_quality.expectations IS NOT NULL;
Allgemeine Anleitungen zur Überwachung von Lakeflow Spark Declarative Pipelines finden Sie unter Überwachen von Pipelines und Pipeline-Ereignisprotokollen.
Überwachen des automatischen Ladevorgangs mit strukturiertem Streaming
Verwenden Sie beim Ausführen von Auto Loader außerhalb von Lakeflow Spark Declarative Pipelines die folgenden Strukturierten Streaming-Überwachungsansätze.
- Implementieren Sie eine
StreamingQueryListener, um Auto Loader-spezifische Metriken aus jedem Batch zu erfassen, indem Sie aussource.metricslesen.
from pyspark.sql.streaming import StreamingQueryListener
class AutoLoaderMonitor(StreamingQueryListener):
def onQueryStarted(self, event):
pass
def onQueryProgress(self, event):
for source in event.progress.sources:
if "CloudFilesSource" in source.description:
metrics = source.metrics
files_outstanding = metrics.get("numFilesOutstanding", "0")
bytes_outstanding = metrics.get("numBytesOutstanding", "0")
rows_per_sec = source.processedRowsPerSecond
# Push metrics to your monitoring system (for example, write to a Delta table)
def onQueryIdle(self, event):
pass
def onQueryTerminated(self, event):
pass
spark.streams.addListener(AutoLoaderMonitor())
Hinweis
Die Verarbeitungslogik in Listenern kann die Abfrageverarbeitung verlangsamen. Beschränken Sie die Berechnung in Listener-Callbacks und vermeiden Sie dort synchrone externe Schreibvorgänge; geben Sie stattdessen leichte Telemetriedaten asynchron aus oder übergeben Sie Metriken zur dauerhaften Speicherung an einen separaten Prozess.
Verwenden Sie
numInputRows,inputRowsPerSecond, undprocessedRowsPerSecondvom Quellfortschritt zum Berechnen des Durchsatzes – Dateien pro Sekunde und Zeilen pro Sekunde für jeden Batch.Um die Erfassungslatenz zu berechnen, vergleichen Sie
create_timeundcommit_timeauscloud_files_state()für die End-to-End-Latenz. Verwenden Sie bei der Verarbeitungslatenz die Aufschlüsselung indurationMs(z. B.latestOffset,addBatchund andere gemeldete Batch-Phasen), um zu ermitteln, welche Phase den Engpass darstellt.Verwenden Sie
df.observe(), um Inline-Datenqualitätsmetriken direkt im Streaming-DataFrame zu definieren. Metriken sind inStreamingQueryListenerFortschrittsereignissen unterobservedMetricssichtbar.
from pyspark.sql.functions import count, lit, col
observed_df = df.observe(
"auto_loader_quality",
count(lit(1)).alias("total_rows"),
count(col("_rescued_data")).alias("rescued_rows"),
count(col("_corrupt_record")).alias("corrupt_rows")
)
- Verwenden Sie diese Option
.queryName(), um jedem Datenstrom einen eindeutigen Namen zuzuweisen, wodurch es einfacher ist, automatische Ladedatenströme auf der Registerkarte "Spark UI Streaming" und in Überwachungsdashboards zu unterscheiden.
Die vollständige Referenz zur Strukturierten Streaming-Überwachung finden Sie unter Monitoring Structured Streaming queries on Azure Databricks.
Observability-Dashboard erstellen
Kombinieren Sie Daten aus mehreren Quellen, um ein umfassendes Observability-Dashboard für Ihre Auto Loader-Pipelines zu erstellen. In dieser Tabelle werden einige vorgeschlagene Quellen angezeigt, die Sie zum Strukturieren Ihres Observability-Dashboards verwenden können.
| Datenquelle | Observability-Daten |
|---|---|
cloud_files_state() |
Erfassungsstatus auf Dateiebene: Erkennung, Verarbeitung, Übernahme und Archivierungszeitstempel pro Datei |
| Lakeflow Spark Declarative Pipelines-Ereignisprotokoll | Ausführungsverlauf der Pipeline, Ablaufmetriken pro Batch und Ergebnisse der Datenqualitätserwartungen |
| Pipelineausgabetabellen | Zeilenanzahl und pro importierter Tabelle geschriebenes Datenvolumen |
Anschließend können Sie Observability-Daten in dedizierte Tabellen aggregieren, die als Grundlage für Dashboards und Warnungen dienen:
- Status von Pipelineausführungen (Erfolg oder Fehlschlag) im Zeitverlauf zusammenfassen, abgeleitet aus
event_type = 'update_progress'-Ereignissen. - Aggregierte Dateiaufnahmemetriken (Backlog-Größe, Durchsatz, Latenz pro Batch), abgeleitet von
cloud_files_state()undevent_type = 'flow_progress'Ereignissen. - Entwickeln Sie Tabellenstatistiken mithilfe von Zeilenanzahlen und Datenvolumen pro Tabelle, die aus
num_output_rowsdem Ereignisprotokoll abgeleitet werden. - Sammeln Sie Debug-Informationen aus detaillierten Fehlerprotokollen und Erwartungsverletzungen für jedes Update, abgeleitet aus
event_type = 'flow_progress'-Ereignissen, bei denendata_qualityausgefüllt ist.
Diese aggregierten Tabellen können ein AI/BI-Dashboard und SQL-Warnungen ermöglichen. Zu den empfohlenen Dashboard-Bereichen gehören die Zeitachse des Pipeline-Ausführungsstatus, der Trend des Aufnahme-Backlogs, der Durchsatztrend, die Verteilung der Aufnahmeverzögerung, Datenqualitätsmetriken, Schemaänderungsereignisse und der Status der Dateiarchivierung.
Überwachen von Schemaentwicklungsereignissen
Verwenden Sie die folgenden Ansätze, um Schemaänderungen zu erkennen, während sie auftreten.
- Nicht NULL-Werte in
_rescued_datader Erwartungsverletzungsanzahl deuten auf Die Schemaabweichung hin. Fragen Sie das Ereignisprotokoll fürfailed_records > 0zurno rescued data-Erwartung ab. - Änderungen am
_schemasVerzeichnis innerhalb des konfiguriertencloudFiles.schemaLocation(oder nur innerhalb des Checkpoints, wenn der Schema-Speicherort nicht separat festgelegt ist) deuten darauf hin, dass eine Schema-Evolution stattgefunden hat. Sie können dieses Verzeichnis aus einem separaten Überwachungsauftrag abrufen. - Betrachten Sie ein
onQueryTerminated-Ereignis, auf dasonQueryStartedfür denselben Stream-Namen folgt, nicht für sich allein als ausreichenden Nachweis einer Schemaentwicklung. Datenströme werden aus vielen Gründen neu gestartet (Clusterneustarts, Codebereitstellungen, vorübergehende Speicherfehler). Bringen Sie Neustarts mit unabhängigen Signalen in Zusammenhang —_schemasVerzeichnisänderungen oder_rescued_dataVerletzungen von Erwartungen —, bevor Sie zu dem Schluss kommen, dass eine Schemaentwicklung stattgefunden hat. - Verwenden Sie diese Möglichkeit
_metadata.file_path, um zu ermitteln, welche Dateien Schemaänderungen eingeführt haben. Verknüpfen Sie dies mitcloud_files_state()dempathFeld, um Schemaänderungen mit bestimmten Dateien und Batches zu korrelieren.
Verwenden Sie diese Beispielabfrage, um aktuelle Schemaabweichungen über Erwartungsverletzungen zu erkennen:
SELECT
timestamp,
origin.flow_name,
exp.name AS expectation_name,
exp.failed_records
FROM (
SELECT
timestamp,
origin,
explode(from_json(
details:flow_progress.data_quality.expectations,
'array<struct<name:string, dataset:string, passed_records:bigint, failed_records:bigint>>'
)) AS exp
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND details:flow_progress.data_quality.expectations IS NOT NULL
)
WHERE exp.name = '<rescued-data expectation name>'
AND exp.failed_records > 0
ORDER BY timestamp DESC;
Einrichten von Warnungen für häufige Probleme
Verwenden Sie DATAbricks SQL-Warnungen oder Pipelinebenachrichtigungen, um Probleme zu erkennen, bevor sie sich auf downstreame Verbraucher auswirken.
Die folgende SQL erkennt einen wachsenden Backlog und kann als Grundlage für eine Databricks SQL-Warnung verwendet werden. Konfigurieren Sie die regelmäßige Ausführung (z. B. alle 5 Minuten), und lösen Sie eine Benachrichtigung aus, wenn das Ergebnis nicht leer ist.
-- Alert when backlog exceeds threshold or trends upward across recent batches
WITH recent_backlog AS (
SELECT
origin.flow_name,
timestamp,
DOUBLE(details:flow_progress.metrics.backlog_bytes) AS backlog_bytes,
ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND details:flow_progress.metrics.backlog_bytes IS NOT NULL
)
SELECT flow_name, backlog_bytes, timestamp
FROM recent_backlog
WHERE rn = 1
AND backlog_bytes > 1073741824 -- alert when backlog exceeds 1 GB
In der folgenden Tabelle sind die empfohlenen Warnungsbedingungen zusammengefasst:
| Was zu erkennen ist | So erkennen Sie sie | Zeitpunkt der Benachrichtigung |
|---|---|---|
| Wachsender Backlog |
numFilesOutstanding Trend nach oben |
Anhaltende Zunahme über mehrere Batches |
| Stockender Datenstrom | Keine Fortschrittsereignisse | Keine Ereignisse für N Minuten (basierend auf dem erwarteten Triggerintervall) |
| Hohe Aufnahmelatenz | commit_time - create_time |
Überschreitet den SLA-Schwellenwert |
| Verschlechterung der Datenqualität | Erwartungsfehlerrate | Steigender Prozentsatz von Zeilen, die die Erwartungen nicht erfüllen |
| Schemaentwicklungsereignis | _rescued_data IS NOT NULL |
Alle Nicht-NULL-Werte in der Anzahl der Erwartungsverstöße |
| Langsame Dateisuche | durationMs.latestOffset |
Deutlich höher als der Ausgangswert |
Häufige Probleme beheben
In der folgenden Tabelle werden allgemeine Probleme mit der Pipeline für das automatische Laden, ihre wahrscheinlichen Ursachen und empfohlenen Aktionen zum Beheben beschrieben.
| Issue | Mögliche Ursache | Empfohlene Maßnahme |
|---|---|---|
| Backlog wächst schneller als die Verarbeitung | Zu gering dimensionierte Rechenressourcen, Datenungleichverteilung oder gedrosselte Ratenlimits | Rechenleistung skalieren, mit der Spark-Benutzeroberfläche auf Datenungleichverteilung prüfen und maxFilesPerTriggerEinstellungen zur Steuerung der Batchgröße überprüfen |
| Nicht gefundene Dateien | Dateiereignisse fehlkonfiguriert, Berechtigungsproblem oder Datenstrom wurde nicht innerhalb von 7 Tagen ausgeführt | Überprüfen Sie die Berechtigungen für externe Speicherorte, überprüfen Sie das Setup von Dateiereignissen in der Unity Catalog-Benutzeroberfläche, und stellen Sie sicher, dass der Datenstrom mindestens alle 7 Tage ausgeführt wird, um das Ablaufen des RocksDB-Zustands zu vermeiden. |
| Der Streamstart dauert zu lange. | Download des großen Prüfpunktzustands (RocksDB) | Führen Sie ein Upgrade auf Databricks Runtime 15.3 oder höher durch, um asynchrones Laden des Status zu nutzen, wodurch die Startzeit um ca. 90 % reduziert wird. |
| Doppelte Dateiverarbeitung | Aggressive cloudFiles.maxFileAge Einstellungen oder Prüfpunktbeschädigung |
Verwenden Sie eine konservative maxFileAge (mindestens 90 Tage), überprüfen Sie die Prüfpunktintegrität, und vermeiden Sie Lebenszyklusrichtlinien für den Prüfpunktspeicher. |
| Schemaänderungen führen zu Neustarts der Pipeline | Häufige oder inkompatible Schemaänderungen | Prüfen Sie schemaEvolutionMode, wechseln Sie für Typheraufstufungen zu addNewColumnsWithTypeWidening oder verwenden Sie den Variant-Typ für hochdynamische Schemas |
| Beschädigte Daten, die in der Spüle akkumuliert werden | Probleme mit der Quelldatenqualität | Überprüfen sie die Quarantänesenke auf Muster, überprüfen Sie die _corrupt_record Quelldatengenerierung, und erwägen Sie, die Upstreamüberprüfung hinzuzufügen. |
discovery_time und commit_time nicht befüllt |
Ausführung auf Databricks Runtime unter Version 18.2 ohne cleanSource |
Aktualisieren Sie auf Databricks Runtime 18.2 oder höher oder aktivieren Sie cloudFiles.cleanSource auf Databricks Runtime 16.4–18.1 |
Weitere Problembehandlung finden Sie unter Häufig gestellte Fragen zum automatischen Laden.