실시간 모드 쿼리 성능 최적화 및 모니터링

이 페이지에서는 컴퓨팅 튜닝, 엔드투엔드 대기 시간을 줄이기 위한 기술 및 실시간 모드에서 쿼리 성능을 측정하는 방법에 대해 설명합니다.

컴퓨팅 튜닝

컴퓨팅을 구성할 때 다음을 고려합니다.

  • 마이크로 일괄 처리 모드와 달리 실시간 작업은 데이터를 기다리는 동안 유휴 상태를 유지할 수 있으므로 리소스 낭비를 방지하려면 적절한 크기 조정이 필요합니다.
  • 튜닝하여 대상 클러스터 사용률 수준(예: 50%)을 목표로 합니다.
    • maxPartitions (Kafka의 경우)
    • spark.sql.shuffle.partitions (순서 섞기 단계의 경우)
  • Databricks는 각 태스크가 오버헤드를 줄이기 위해 여러 Kafka 파티션을 처리하도록 설정하는 maxPartitions 것이 좋습니다.
  • 간단한 1단계 작업의 워크로드와 일치하도록 작업자당 작업 슬롯을 조정합니다.
  • 셔플이 많은 작업의 경우 백로그를 피하기 위해 실험을 통해 최소 셔플 파티션 수를 찾아 그에 맞추어 조정하십시오. 충분한 슬롯이 없는 경우 컴퓨팅은 작업을 예약하지 않습니다.

메모

Databricks Runtime 16.4 LTS 이상에서 모든 실시간 파이프라인은 검사점 v2를 사용하여 실시간 및 마이크로 일괄 처리 모드 간의 원활한 전환을 허용합니다.

대기 시간 최적화

구조적 스트리밍 실시간 모드에는 엔드 투 엔드 대기 시간을 줄이기 위한 선택적 기술이 있습니다. 둘 다 기본적으로 사용하도록 설정되지 않습니다. 별도로 사용하도록 설정해야 합니다.

  • 비동기 진행률 추적: 오프셋에 쓰기를 이동하고 로그를 비동기 스레드로 커밋하여 상태 비정상 쿼리에 대한 일괄 처리 간 시간을 줄입니다.
  • 비동기 상태 검사점: 상태 검사점을 기다리지 않고 계산이 완료되는 즉시 다음 마이크로 일괄 처리를 시작하여 상태 저장 쿼리의 대기 시간을 줄입니다.

모니터링 및 관찰 가능성

실시간 모드에서 기존 일괄 처리 기간 메트릭은 실제 엔드 투 엔드 대기 시간을 반영하지 않습니다. 아래 방법을 사용하여 대기 시간을 정확하게 측정하고 쿼리에서 병목 상태를 식별합니다.

엔드 투 엔드 대기 시간은 워크로드에 따라 다릅니다. 때로는 비즈니스 논리로만 정확하게 측정할 수 있습니다. 예를 들어 원본 타임스탬프가 Kafka의 출력인 경우 Kafka의 출력 타임스탬프와 원본 타임스탬프 간의 차이로 대기 시간을 계산할 수 있습니다.

기본 제공 메트릭 StreamingQueryProgress

이벤트는 StreamingQueryProgress 드라이버 로그에 자동으로 기록되고 '의 StreamingQueryListener 콜백 함수를 onQueryProgress()통해 액세스할 수 있습니다. 이렇게 하면 외부 모니터링 시스템에 메트릭을 게시하려는 경우와 같이 프로그래밍 방식으로 진행률 이벤트에 대응할 수 있습니다. QueryProgressEvent.json() 또는 toString() 이러한 실시간 모드 메트릭을 포함합니다.

  1. 처리 대기 시간 (processingLatencyMs). 실시간 모드 쿼리가 레코드를 읽는 시간과 쿼리가 레코드를 다음 단계 또는 다운스트림에 쓰는 시점 사이에 경과된 시간입니다. 단일 단계 쿼리의 경우 엔드 투 엔드 대기 시간과 동일한 기간을 측정합니다. 시스템은 작업당 이 메트릭을 보고합니다.
  2. 원본 큐 대기 시간 (sourceQueuingLatencyMs). 시스템에서 메시지 버스에 레코드를 쓰는 시간(예: Kafka의 로그 추가 시간)과 실시간 모드 쿼리가 레코드를 처음 읽는 시점 사이에 경과된 시간입니다. 시스템은 작업당 이 메트릭을 보고합니다.
  3. 엔드 투 엔드 대기 시간 (e2eLatencyMs). 시스템에서 메시지 버스에 레코드를 쓰는 시간과 실시간 모드 쿼리가 레코드 다운스트림을 쓰는 시점 사이의 시간입니다. 시스템은 모든 태스크에서 처리되는 모든 레코드에서 일괄 처리당 이 메트릭을 집계합니다.

다음은 그 예입니다.

"rtmMetrics" : {
    "processingLatencyMs" : {
      "P0" : 0,
      "P50" : 0,
      "P90" : 0,
      "P95" : 0,
      "P99" : 0
    },
    "sourceQueuingLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 3
    },
    "e2eLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 4
    }
}

Observe API를 사용하여 사용자 지정 대기 시간 측정

Observe API를 사용하면 별도의 작업을 시작하지 않고도 대기 시간을 인라인으로 측정할 수 있습니다. 원본 데이터 도착 시간을 근사하는 원본 타임스탬프가 있는 경우 싱크 전에 타임스탬프를 기록하고 차이를 계산하여 일괄 처리당 대기 시간을 예측할 수 있습니다. 결과는 진행 보고서에 표시되며 청취자에게 제공됩니다.

Python

from datetime import datetime

from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def current_timestamp():
  return datetime.now()

# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
  "latency",
  unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  "observedLatency",
  avg(col("latency")).alias("avg"),
  max(col("latency")).alias("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.

Scala

import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}

val currentTimestampUDF = udf(() => System.currentTimeMillis())

// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
  "latency",
  col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  name = "observedLatency",
  avg(col("latency")).as("avg"),
  max(col("latency")).as("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.

샘플 출력:

"observedMetrics" : {
  "observedLatency" : {
    "avg" : 63.8369765176552,
    "max" : 219,
    "p99" : 154,
    "p50" : 49
  }
}

추가 리소스