자동 로더 모니터링 및 관찰

자동 로더 파이프라인은 다운스트림 소비자에게 영향을 미치기 전에 백로그 증가, 스키마 드리프트, 손상된 데이터 및 중단된 스트림과 같은 문제를 감지하기 위해 활성 모니터링이 필요합니다. 이 페이지에서는 주요 메트릭을 모니터링하고, 파일 수준 상태를 쿼리하고, 관찰성 대시보드를 빌드하고, 일반적인 문제를 해결하는 방법을 설명합니다.

프로덕션 구성 세부 정보는 프로덕션 워크로드에 대한 자동 로더 구성을 참조하세요.

사전 요구 사항

이 페이지의 cloud_files_state() 여러 모니터링 워크플로는 백로그 쿼리, 대기 시간 계산 및 스키마 드리프트 검색을 포함하여 파일별 수집 상태를 관찰합니다. cloud_files_state() 는 자동 로더 검사점의 파일 수준 수집 상태를 반환하는 테이블 반환 함수입니다. 기본적으로 모든 필드를 사용할 수 있는 것은 아닙니다. 가용성은 Databricks 런타임 버전 및 구성에 따라 달라집니다.

  • Databricks Runtime 18.2 이상: discovery_time, processed_timecommit_time 자동으로 사용할 수 있습니다. Databricks Runtime 16.4–18.1에서 이러한 필드는 사용하도록 설정된 경우에만 cloudFiles.cleanSource 사용할 수 있습니다.
  • cloudFiles.cleanSource이 활성화된 Databricks Runtime 16.4 이상: archive_time, archive_modemove_location을 사용할 수 있습니다.

cloudFiles.cleanSource을 사용하도록 설정하면 일부 성능 오버헤드가 발생합니다. 프로덕션 환경에서 이를 사용하도록 설정하기 전에 사전 프로덕션 환경에서 워크로드를 기준으로 벤치마크하십시오.

또한:

  • 수집된 데이터에 _metadata 열로 주석을 답니다. 최소한 file_pathfile_modification_time을(를) 캡처하세요. 파일 메타데이터 열을 참조하세요.
  • _rescued_data_corrupt_record 열을 활성화합니다.

키 자동 로더 지표

다음 표에는 자동 로더 파이프라인에 대해 모니터링할 가장 중요한 메트릭이 요약되어 있습니다. 이러한 메트릭은 StreamingQueryListener 진행 이벤트에서 확인할 수 있으며, Auto Loader 관련 값은 각 소스의 metrics 맵 아래에 표시됩니다.

Metric 그것은 당신에게 무엇을 알려줍니다
numFilesOutstanding 처리 대기 중인 백로그의 파일 수
numBytesOutstanding 파일 백로그 크기(바이트)
approximateQueueSize 클라우드 큐 깊이(파일 알림 모드에만 해당)
numInputRows 배치당 처리되는 행 수
inputRowsPerSecond 데이터 도착률
processedRowsPerSecond 처리량
durationMs 고장 각 일괄 처리에서 소요되는 시간

주의할 사항

다음 패턴은 파이프라인에 주의가 필요할 수 있음을 나타냅니다.

  • 증가 numFilesOutstanding: 백로그가 쌓이고 있습니다. 파이프라인이 들어오는 데이터에 뒤처지고 있습니다.
  • processedRowsPerSecond < inputRowsPerSecond: 파이프라인이 도착하는 것보다 느리게 데이터를 처리하고 있습니다.
  • 대용량 durationMs.latestOffset: 파일 검색 속도가 느립니다. 파일 이벤트로 전환하는 것이 좋습니다.
  • 대형 durationMs.addBatch: 데이터 처리 속도가 느립니다. 컴퓨팅 크기를 조정하거나 변환을 최적화하는 것이 좋습니다.

전체 메트릭 참조는 자동 로더 원본 메트릭을 참조하세요.

다음을 사용하여 파일 수준 상태 쿼리 cloud_files_state

cloud_files_state() 테이블 값 함수는 Auto Loader가 발견한 각 파일에 대한 자세한 정보를 제공합니다. 다음 필드를 사용할 수 있습니다. Databricks Runtime 16.4 이상 또는 18.2 이상이 필요한 것으로 표시된 필드는 필수 구성 요소에 설명된 조건에서만 채워집니다.

Field Type Description
path STRING 파일의 경로입니다.
size BIGINT 파일 크기(바이트)
create_time TIMESTAMP 파일을 만든 경우
discovery_time TIMESTAMP 자동 로더가 파일을 검색한 경우(Databricks Runtime 16.4 이상)
processed_time TIMESTAMP 자동 로더가 파일을 처리한 경우(Databricks Runtime 16.4 이상)
commit_time TIMESTAMP 파일이 검사점(Databricks Runtime 16.4 이상)에 커밋된 경우
archive_time TIMESTAMP 파일이 보관된 경우(필요 cloudFiles.cleanSource)
archive_mode STRING MOVE, DELETE또는 NULL (필요 cloudFiles.cleanSource)
move_location STRING cloudFiles.cleanSource이(가) MOVE인 경우의 대상 경로
ingestion_state STRING 현재 파일 수집 상태

파일 수집 상태 확인

다음 쿼리는 일반적인 진단 시나리오를 다룹니다.

처리되지 않은 모든 파일(현재 백로그)을 찾습니다.

SELECT * FROM cloud_files_state('path/to/checkpoint')
WHERE ingestion_state != 'COMMITTED';

컴퓨팅 평균 수집 대기 시간(파일 생성에서 커밋까지의 시간):

SELECT avg(unix_timestamp(commit_time) - unix_timestamp(create_time)) AS avg_latency_seconds
FROM cloud_files_state('path/to/checkpoint')
WHERE commit_time IS NOT NULL AND create_time IS NOT NULL;

손상되었거나 건너뛴 파일을 찾습니다.

SELECT path, ingestion_state, size, create_time
FROM cloud_files_state('path/to/checkpoint')
WHERE ingestion_state LIKE 'SKIPPED%';

보관 진행 상황 추적(cloudFiles.cleanSource 필요):

SELECT archive_mode, count(*) AS file_count
FROM cloud_files_state('path/to/checkpoint')
GROUP BY archive_mode;

커밋 대기 시간이 긴 파일을 찾아 병목 상태를 식별합니다.

SELECT
  path,
  size,
  unix_timestamp(commit_time) - unix_timestamp(discovery_time) AS processing_latency_seconds,
  unix_timestamp(commit_time) - unix_timestamp(create_time) AS end_to_end_latency_seconds
FROM cloud_files_state('path/to/checkpoint')
WHERE commit_time IS NOT NULL
ORDER BY end_to_end_latency_seconds DESC
LIMIT 20;

전체 SQL 참조는 테이블 반환 함수를 참조cloud_files_state하세요.

Lakeflow Spark 선언적 파이프라인에서 자동 로더 모니터링

Databricks는 프로덕션 자동 로더 파이프라인에 Lakeflow Spark 선언적 파이프라인을 사용하는 것이 좋습니다. 기본 제공 모니터링 기능을 활용하려면 다음을 수행합니다.

  • Lakeflow Spark 선언적 파이프라인 이벤트 로그를 델타 테이블에 저장하여 관찰 가능성 데이터를 쿼리할 수 있습니다. 파이프라인의 고급 설정 또는 API를 통해 구성합니다. 자세한 내용은 파이프라인 이벤트 로그를 참조하세요.

  • 가시성을 위해 파이프라인을 구성합니다. Lakeflow Spark Declarative Pipelines의 체계적으로 구성된 Auto Loader 파이프라인에는 {table}_source 뷰(Auto Loader 소스 정의), {table}_bronze 스트리밍 테이블(원시 데이터 수집, _rescued_data_corrupt_record 열 포함), 구문 분석할 수 없는 데이터가 있는 행을 격리하는 corrupt_records_sink, 그리고 다운스트림 소비를 위한 {table} 정제된 뷰가 포함됩니다.

  • 스키마 드리프트 및 데이터 손상을 모니터링하도록 브론즈 스트리밍 테이블에 대한 기대치를 설정합니다. _rescued_data IS NULL 는 예기치 않은 스키마 변경을 감지하고 _corrupt_record IS NULL 분리할 수 없는 데이터를 검색합니다. Lakeflow Spark 선언적 파이프라인은 데이터가 도착하여 관찰 가능성 내역을 생성할 때 이러한 기대치를 평가합니다. 경고, 행 삭제 또는 파이프라인 실패에 대한 기대치를 구성할 수 있습니다.

파이프라인용 event_log_raw 보기를 만든 후 Auto Loader 관련 메트릭에 대해 다음 쿼리를 사용하세요.

흐름별 수집 처리량 모니터링:

SELECT
  origin.flow_name,
  origin.update_id,
  timestamp,
  TRY_CAST(details:flow_progress.metrics.num_output_rows AS BIGINT) AS rows_written
FROM event_log_raw
WHERE event_type = 'flow_progress'
ORDER BY timestamp DESC;

흐름당 데이터 백로그 모니터링:

SELECT
  origin.flow_name,
  timestamp,
  DOUBLE(details:flow_progress.metrics.backlog_bytes) AS backlog_bytes
FROM event_log_raw
WHERE event_type = 'flow_progress'
  AND details:flow_progress.metrics.backlog_bytes IS NOT NULL
ORDER BY timestamp DESC;

스키마 드리프트 및 손상된 데이터를 검색하는 예상 위반을 요약합니다.

SELECT
  origin.flow_name,
  explode(from_json(
    details:flow_progress.data_quality.expectations,
    'array<struct<name:string, dataset:string, passed_records:bigint, failed_records:bigint>>'
  )) AS expectation
FROM event_log_raw
WHERE event_type = 'flow_progress'
  AND details:flow_progress.data_quality.expectations IS NOT NULL;

일반적인 Lakeflow Spark 선언적 파이프라인 모니터링 지침은 파이프라인 모니터링파이프라인 이벤트 로그를 참조하세요.

구조적 스트리밍을 사용하여 자동 로더 모니터링

Lakeflow Spark 선언적 파이프라인 외부에서 자동 로더를 실행하는 경우 다음 구조적 스트리밍 모니터링 방법을 사용합니다.

  • source.metrics에서 읽어 각 배치의 Auto Loader 관련 메트릭을 수집하도록 StreamingQueryListener를 구현합니다.
from pyspark.sql.streaming import StreamingQueryListener

class AutoLoaderMonitor(StreamingQueryListener):
    def onQueryStarted(self, event):
        pass

    def onQueryProgress(self, event):
        for source in event.progress.sources:
            if "CloudFilesSource" in source.description:
                metrics = source.metrics
                files_outstanding = metrics.get("numFilesOutstanding", "0")
                bytes_outstanding = metrics.get("numBytesOutstanding", "0")
                rows_per_sec = source.processedRowsPerSecond
                # Push metrics to your monitoring system (for example, write to a Delta table)

    def onQueryIdle(self, event):
        pass

    def onQueryTerminated(self, event):
        pass

spark.streams.addListener(AutoLoaderMonitor())

비고

수신기에서 논리를 처리하면 쿼리 처리 속도가 느려질 수 있습니다. 수신기 콜백에서 계산을 제한하고 동기 외부 쓰기를 방지합니다. 대신 경량 원격 분석을 비동기적으로 내보내거나 지속성을 위해 메트릭을 별도의 작업으로 전달합니다.

  • 처리량, 즉 각 배치의 초당 파일 수와 초당 행 수를 계산하려면 소스 진행률의 numInputRows, inputRowsPerSecond, processedRowsPerSecond를 사용하세요.

  • 수집 지연 시간을 계산하려면 종단 간 지연 시간에 대해 cloud_files_state()create_timecommit_time를 비교합니다. 처리 지연 시간의 경우 durationMs 분석(예: latestOffset, addBatch 및 기타 보고된 일괄 처리 단계)을 사용하여 어떤 단계가 병목인지 식별합니다.

  • 스트리밍 데이터 프레임에서 직접 인라인 데이터 품질 메트릭을 정의하는 데 사용합니다 df.observe() . 지표는 observedMetrics 아래의 StreamingQueryListener 진행 이벤트에서 볼 수 있습니다.

from pyspark.sql.functions import count, lit, col

observed_df = df.observe(
    "auto_loader_quality",
    count(lit(1)).alias("total_rows"),
    count(col("_rescued_data")).alias("rescued_rows"),
    count(col("_corrupt_record")).alias("corrupt_rows")
)
  • .queryName() 각 스트림에 고유한 이름을 할당하여 Spark UI 스트리밍 탭 및 모니터링 대시보드에서 자동 로더 스트림을 보다 쉽게 구분할 수 있습니다.

전체 구조적 스트리밍 모니터링 참조는 Azure Databricks 대한 구조적 스트리밍 모니터링 쿼리를 참조하세요.

관찰성 대시보드 빌드

여러 원본의 데이터를 결합하여 자동 로더 파이프라인에 대한 포괄적인 관찰성 대시보드를 빌드합니다. 이 표에는 관찰성 대시보드를 구성하는 데 사용할 수 있는 몇 가지 제안된 원본이 표시됩니다.

데이터 원본 관찰 가능성 데이터
cloud_files_state() 파일 수준 수집 상태: 파일당 검색, 처리, 커밋 및 보관 타임스탬프
Lakeflow Spark 선언적 파이프라인 이벤트 로그 파이프라인 실행 기록, 일괄 처리별 흐름 메트릭 및 데이터 품질 예상 결과
파이프라인 출력 테이블 수집된 각 테이블에 대해 기록된 행 수 및 데이터 양

그런 다음, 대시보드 및 경고의 기초 역할을 하는 전용 테이블로 관찰성 데이터를 집계할 수 있습니다.

  • 이벤트에서 파생된 event_type = 'update_progress' 시간에 따른 파이프라인 실행 상태(성공 또는 실패)를 요약합니다.
  • cloud_files_state()event_type = 'flow_progress' 이벤트에서 파생된 파일 수집 메트릭(백로그 크기, 처리량, 일괄 처리당 지연 시간)을 집계합니다.
  • 이벤트 로그에서 num_output_rows 파생된 테이블당 행 수 및 데이터 볼륨을 사용하여 테이블 통계를 개발합니다.
  • data_quality이 채워진 event_type = 'flow_progress' 이벤트에서 파생된 업데이트별 자세한 오류 로그와 기대값 위반 정보에서 디버깅 정보를 수집합니다.

이러한 집계 테이블은 AI/BI 대시보드 및 SQL 경고를 구동할 수 있습니다. 권장되는 대시보드 패널에는 파이프라인 실행 상태 타임라인, 수집 백로그 추세, 처리량 추세, 수집 대기 시간 분포, 데이터 품질 메트릭, 스키마 진화 이벤트 및 파일 보관 상태가 포함됩니다.

스키마 진화 이벤트 모니터링

다음 방법을 사용하여 스키마 변경이 발생할 때 이를 검색합니다.

  • 예상 위반 횟수에서 _rescued_data NULL이 아닌 값은 스키마 드리프트를 나타냅니다. no rescued data expectation에서 failed_records > 0의 이벤트 로그를 조회합니다.
  • _schemas 구성된 cloudFiles.schemaLocation 내부 또는 스키마 위치가 별도로 설정되지 않은 경우에만 검사점 내에서 디렉터리를 변경하면 스키마가 진화했음을 나타냅니다. 별도의 모니터링 작업에서 이 디렉터리를 폴링할 수 있습니다.
  • 동일한 스트림 이름에 대해 onQueryTerminated 이벤트 다음에 onQueryStarted가 온다고 해서, 그 자체만으로 스키마 진화의 충분한 증거로 간주하지 마세요. 여러 가지 이유로 스트림이 다시 시작됩니다(클러스터 다시 시작, 코드 배포, 일시적인 스토리지 오류). 스키마 진화가 발생했다고 결론내리기 전에 재시작을 독립적인 신호 — _schemas 디렉터리 변경 또는 _rescued_data 기대치 위반 — 와 연관 지어 판단하세요.
  • 스키마 변경을 도입한 파일을 식별하는 데 사용합니다 _metadata.file_path . path 필드를 기준으로 이것을 cloud_files_state()와 조인하여 스키마 변경 사항을 특정 파일 및 배치와 연관 지을 수 있습니다.

이 예제 쿼리를 사용하여 예상 위반을 통해 최근 스키마 드리프트를 검색합니다.

SELECT
  timestamp,
  origin.flow_name,
  exp.name AS expectation_name,
  exp.failed_records
FROM (
  SELECT
    timestamp,
    origin,
    explode(from_json(
      details:flow_progress.data_quality.expectations,
      'array<struct<name:string, dataset:string, passed_records:bigint, failed_records:bigint>>'
    )) AS exp
  FROM event_log_raw
  WHERE event_type = 'flow_progress'
    AND details:flow_progress.data_quality.expectations IS NOT NULL
)
WHERE exp.name = '<rescued-data expectation name>'
  AND exp.failed_records > 0
ORDER BY timestamp DESC;

일반적인 문제에 대한 경고 설정

Databricks SQL 경고 또는 파이프라인 알림을 사용하여 다운스트림 소비자에게 영향을 미치기 전에 문제를 검색합니다.

다음 SQL은 증가하는 백로그를 검색하고 Databricks SQL 경고의 기준으로 사용할 수 있습니다. 주기적으로 실행되도록 예약하고(예: 5분마다) 결과가 비어있지 않은 경우 경고합니다.

-- Alert when backlog exceeds threshold or trends upward across recent batches
WITH recent_backlog AS (
  SELECT
    origin.flow_name,
    timestamp,
    DOUBLE(details:flow_progress.metrics.backlog_bytes) AS backlog_bytes,
    ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
  FROM event_log_raw
  WHERE event_type = 'flow_progress'
    AND details:flow_progress.metrics.backlog_bytes IS NOT NULL
)
SELECT flow_name, backlog_bytes, timestamp
FROM recent_backlog
WHERE rn = 1
  AND backlog_bytes > 1073741824  -- alert when backlog exceeds 1 GB

다음 표에서는 권장되는 경고 조건을 요약합니다.

감지할 항목 감지 방법 경고할 시기
증가하는 백로그 numFilesOutstanding 상향 추세 여러 일괄 처리에 대한 지속적인 증가
중단된 스트림 진행 이벤트 없음 N분 동안 이벤트가 없음(예상 트리거 간격 기준)
높은 수집 대기 시간 commit_time - create_time SLA 임계값을 초과합니다.
데이터 품질 저하 예상 실패율 기대치를 충족하지 못하는 행의 비율 증가
스키마 진화 이벤트 _rescued_data IS NOT NULL 기대값 위반 수에서 NULL이 아닌 모든 값
느린 파일 검색 durationMs.latestOffset 기준선보다 훨씬 높음

일반적인 문제 해결

다음 표에서는 일반적인 자동 로더 파이프라인 문제, 가능한 원인 및 이를 해결하기 위한 권장 작업에 대해 설명합니다.

Issue 가능한 원인 권장 작업
백로그가 처리보다 빠르게 증가 부족한 컴퓨팅 리소스, 데이터 편향 또는 스로틀링된 요청 한도 컴퓨팅 규모를 조정하고, Spark UI에서 스큐를 확인하고, 배치 크기를 제어하기 위해 maxFilesPerTrigger 설정을 검토합니다
파일이 검색되지 않음 잘못 구성된 파일 이벤트, 권한 문제 또는 7일 이내에 실행되지 않은 스트림 외부 위치 권한을 확인하고, Unity 카탈로그 UI에서 파일 이벤트 설정을 확인하고, RocksDB 상태 만료를 방지하기 위해 적어도 7일마다 스트림이 실행되는지 확인합니다.
스트림 시작 시간이 너무 오래 걸립니다. 대규모 체크포인트 상태 다운로드 (RocksDB) 비동기 상태 로드를 위해 Databricks Runtime 15.3 이상으로 업그레이드하면 시작 시간이 최대 90% 단축됩니다.
중복된 파일 처리 과도한 cloudFiles.maxFileAge 설정 또는 체크포인트 손상 보수적 maxFileAge (최소 90일 이상) 사용, 검사점 무결성 확인, 검사점 스토리지의 수명 주기 정책 방지
파이프라인 재시작을 유발하는 스키마 변경 자주 또는 호환되지 않는 스키마 변경 schemaEvolutionMode을 검토하거나, 형식 승격을 위해 addNewColumnsWithTypeWidening로 전환하거나, 매우 동적인 스키마에는 Variant 유형을 사용하세요.
싱크에 누적된 손상된 데이터 원본 데이터 품질 문제 _corrupt_record 격리 싱크에서 패턴을 확인하고, 원본 데이터 생성을 검토하고, 업스트림 유효성 검사 추가를 고려하세요.
discovery_timecommit_time 입력되지 않음 cleanSource 없이 18.2 미만의 Databricks Runtime에서 실행 중 Databricks Runtime 18.2 이상으로 업그레이드하거나 Databricks Runtime 16.4–18.1에서 사용하도록 설정 cloudFiles.cleanSource

추가 문제 해결은 자동 로더 FAQ를 참조하세요.