워터마크를 적용하여 데이터 처리 임계값 제어

이 페이지에서는 워터마크 개념을 설명하고 일반적인 상태 저장 스트리밍 작업에서 워터마크를 사용하기 위한 권장 사항을 제공합니다.

스트리밍 쿼리는 시간이 지남에 따라 상태 데이터를 누적합니다. 워터마크는 메모리 오류 및 처리 대기 시간 증가를 방지하기 위해 이전 상태 데이터를 자동으로 제거합니다.

워터마크란?

처리하는 동안 구조적 스트리밍은 마이크로 일괄 처리에서 상태를 유지합니다. 스트리밍 쿼리는 각 마이크로 일괄 처리 후에 모든 항목을 다시 계산하는 대신 상태를 사용하여 결과를 증분 방식으로 업데이트합니다. 워터마크는 쿼리가 상태 엔터티 처리를 중지하는 경우에 대한 임계값을 제어합니다.

상태 엔터티의 일반적인 예는 다음과 같습니다.

  • 시간 창에 대한 집계입니다.
  • 두 스트림을 조인할 때 생기는 고유 키입니다.

스트리밍 데이터 프레임에서 워터마크를 선언하려면 타임스탬프 필드와 대기 시간 임계값을 지정합니다. 새 데이터가 도착하면 상태 관리자는 지정된 필드에서 가장 최근 타임스탬프를 추적하고 대기 시간 임계값 내의 레코드만 처리합니다.

쿼리는 항상 임계값 내에 도착하는 레코드를 처리합니다. 쿼리는 여전히 임계값을 벗어나는 레코드를 처리할 수 있지만 이것이 보장되지는 않습니다.

다음 예제에서는 창 개수에 10분 워터마크 임계값을 적용합니다.

Python

from pyspark.sql.functions import window

(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
)

Scala

import org.apache.spark.sql.functions.window

df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "5 minutes"),
    $"id")
  .count()

이 예에서는 다음이 적용됩니다.

  • event_time 열은 10분 워터마크와 5분 연속 창을 정의하는 데 사용됩니다.
  • 겹치지 않는 각 5분 동안 관찰되는 각 id 창에 대해 개수가 수집됩니다.
  • 상태 정보는 각 카운트별로 윈도우의 끝 시점이 가장 최근에 관측된 event_time보다 10분 더 이전이 될 때까지 유지됩니다.

중요

groupBy()window() 작업에서는 이벤트 시간 마커가 유지되도록 열을 이름, "<colName>" 또는 col("<colName>")로 참조하세요. Scala에서도 $colName를 사용할 수 있습니다.

워터마크는 처리 시간 및 처리량에 어떤 영향을 주나요?

출력 모드는 워터마크가 있는 쿼리가 싱크에 데이터를 쓰는 시기를 제어합니다. 워터마크는 메모리의 총 상태 정보 양을 줄이기 때문에 상태 저장 스트리밍의 처리량 제어에 필수적입니다. 모든 상태 저장 작업에 대해 모든 출력 모드가 지원되는 것은 아닙니다. 윈도우 집계의 워터마크 및 출력 모드를 참조하세요.

워터마크 기간을 선택하면 장단점이 있습니다.

  • 워터마크가 짧을수록 쿼리는 상태 정보를 적게 저장하고 각 워터마크 기간이 완료된 후에 결과를 작성하기 때문에 쿼리 대기 시간이 줄어듭니다. 그러나 짧은 워터마크는 늦은 데이터에 대한 허용 오차가 낮습니다.
  • 워터마크가 길면 늦은 데이터에 대한 허용 오차가 높습니다. 그러나 긴 워터마크는 쿼리가 더 많은 상태 정보를 저장하고 워터마크 기간이 길어지면 결과를 작성할 때까지 기다려야 하기 때문에 쿼리 대기 시간이 증가합니다.

윈도우 기반 집계에 대한 워터마크 및 출력 모드

다음 표에서는 타임스탬프 및 워터마크에 집계가 있는 쿼리에 대한 처리 동작을 보여 줍니다.

출력 모드 동작
추가 쿼리는 워터마크 임계값을 통과한 후 대상 테이블에 행을 씁니다. 모든 쓰기는 대기 시간 임계값에 따라 지연됩니다. 임계값을 통과한 후 이전 집계 상태가 삭제됩니다.
업데이트 쿼리는 결과가 계산되면 대상 테이블에 행을 쓰고, 쿼리는 새 데이터가 도착할 때 행을 업데이트하고 덮어쓸 수 있습니다. 임계값을 통과한 후 이전 집계 상태가 삭제됩니다.
완료 집계 상태는 삭제되지 않습니다. 쿼리는 각 트리거에 대한 대상 테이블을 다시 작성합니다.

스트림-스트림 조인의 워터마크 및 출력 모드

여러 스트림 간의 조인은 추가 모드만 지원합니다. 쿼리는 각 일괄 처리에 대해 일치하는 레코드를 씁니다.

내부 조인의 경우 Databricks는 쿼리가 이전 레코드에 대한 상태 정보를 삭제할 수 있도록 각 스트리밍 데이터 원본에 워터마크 임계값을 설정하는 것이 좋습니다. 워터마크가 없으면 구조적 스트리밍은 각 트리거에서 조인의 양쪽에서 모든 키를 조인하려고 시도하므로 성능에 영향을 줄 수 있습니다.

외부 조인의 경우 워터마크가 필수입니다. 레코드가 일치하지 않으면 쿼리는 해당 키에 대해 null 값을 씁니다. 조인은 추가 모드만 지원하므로 지연 임계값이 지날 때까지 일치하지 않는 레코드는 기록되지 않습니다.

여러 워터마크 정책을 사용하여 지연 데이터 임계값 제어

여러 구조적 스트리밍 입력의 경우 늦은 데이터에 대한 허용 오차 임계값을 제어하도록 여러 워터마크를 설정할 수 있습니다. 워터마크를 사용하면 상태 정보 및 대기 시간을 제어할 수 있습니다.

스트리밍 쿼리에 여러 입력 스트림이 통합되거나 함께 조인되어 있을 수 있습니다. 상태 저장 작업의 경우 각 입력 스트림에 늦은 데이터 허용 오차에 대해 다른 임계값이 필요할 수 있습니다. 각 입력 스트림에서 이러한 임계값을 지정 withWatermark("eventTime", delay) 합니다. 다음은 스트림-스트림 조인을 사용하는 예제 쿼리입니다.

Python

input_stream1 = ...      # delays up to 1 hour
input_stream2 = ...      # delays up to 2 hours

(input_stream1.withWatermark("eventTime1", "1 hour")
  .join(
    input_stream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)
)

Scala

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

상태 저장 작업을 사용하여 쿼리를 실행하는 동안 구조적 스트리밍은 각 입력 스트림의 최대 이벤트 시간을 개별적으로 추적하고, 해당 지연에 따라 워터마크를 계산하고, 단일 전역 워터마크를 결정합니다. 기본적으로 구조적 스트리밍은 최소값을 전역 워터마크로 사용합니다. 스트림 하나가 다른 스트림보다 뒤처지는 경우, 최소 전역 워터마크가 쿼리가 데이터를 실수로 지연된 데이터로 표시하는 것을 방지합니다. 예를 들어 스트림 중 하나가 업스트림 오류로 인해 데이터 수신을 중지할 때 발생할 수 있습니다. 전역 워터마크는 가장 느린 스트림의 속도로 안전하게 이동하고 필요한 경우 쿼리 출력을 지연합니다.

대기 시간을 줄이려면 가장 빠른 스트림의 워터마크를 전역 워터마크로 사용하려면 (기본값은spark.sql.streaming.multipleWatermarkPolicy)으로 설정합니다.maxmin 그러나 이 구성은 가장 느린 스트림에서 데이터를 삭제합니다. Databricks는 이 구성을 신중하게 적용하는 것이 좋습니다.

고유 작업에 워터마크 적용

distinct 작업은 상태 내의 모든 고유 레코드를 추적합니다. 워터마크가 없으면 상태가 무기한 증가하여 메모리 문제가 발생할 수 있습니다. 상태 크기를 제한하고 임계값이 지난 후 오래된 레코드를 제거할 수 있도록 타임스탬프 필드에 워터마크를 지정합니다.

다음 예제에서는 distinct 작업에 워터마크를 적용합니다.

Python

streamingDf = spark.readStream. ...  # columns: eventTime, id, value, ...

# Apply watermark before distinct operation
(streamingDf
  .withWatermark("eventTime", "1 hour")
  .distinct()
)

Scala

val streamingDf = spark.readStream. ...  // columns: eventTime, id, value, ...

// Apply watermark before distinct operation
streamingDf
  .withWatermark("eventTime", "1 hour")
  .distinct()

이 예제에서 스트리밍 쿼리는 관찰된 최신 레코드의 1시간 이내에 도착하는 중복 레코드를 제거합니다 eventTime. 쿼리는 임계값을 통과한 후 중복 제거에 대한 상태 정보를 삭제합니다.

중요

모든 열 대신 특정 열의 중복을 제거하려면 distinct 대신 dropDuplicates() 또는 dropDuplicatesWithinWatermark()을 사용하세요. 워터마크 내에서 중복 항목 삭제를 참조하세요.

워터마크 내에 중복 항목 삭제

Databricks Runtime 13.3 LTS 이상에서는 고유 식별자를 사용하여 워터마크 임계값 내에서 레코드를 중복 제거할 수 있습니다.

구조적 스트리밍은 정확히 한 번 처리를 보장하지만 데이터 원본에서 레코드를 중복 제거하지는 않습니다. 이벤트 시간 또는 도착 시간과 같은 중복 레코드 간에 필드가 다른 경우에도 모든 필드에서 중복 항목을 제거하는 데 사용합니다 dropDuplicatesWithinWatermark .

dropDuplicatesWithinWatermark경우 쿼리는 항상 워터마크 임계값 내에 도착하는 레코드를 중복 제거합니다. 쿼리는 임계값을 벗어나는 레코드를 중복 제거할 수도 있지만 이것이 보장되지는 않습니다. 쿼리가 모든 중복 항목을 삭제하도록 하려면 워터마크 임계값을 중복 이벤트 간의 최대 타임스탬프 차이보다 크게 설정합니다.

메서드를 사용하려면 워터마크를 지정해야 합니다.dropDuplicatesWithinWatermark

Python

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])
)

Scala

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(Seq("guid"))

사용 사례 예제

다음 예제에서는 고급 창 사용 사례를 보여 줍니다.

텀블링 윈도우를 사용하여 시간별 판매 합계 계산

텀블링 윈도우는 겹치지 않는 간격을 갖는 고정 크기 윈도우입니다. 각 입력 행은 정확히 하나의 창에 속합니다. 연속 창을 사용하여 시간별 판매 합계와 같은 개별 기간 집계를 계산합니다.

Python

from pyspark.sql.functions import window, sum

hourly_sales = (orders
  .withWatermark("timestamp", "1 hour")
  .groupBy(window("timestamp", "1 hour"))
  .agg(sum("amount").alias("total_sales"))
)

Scala

import org.apache.spark.sql.functions.{window, sum}

val hourlySales = orders
  .withWatermark("timestamp", "1 hour")
  .groupBy(window($"timestamp", "1 hour"))
  .agg(sum($"amount").alias("total_sales"))

이 예에서는 다음이 적용됩니다.

  • window("timestamp", "1 hour") 는 겹치지 않는 1시간 간격(예: 오전 5~6시, 오전 6시~오전 7시)으로 주문을 그룹화합니다.
  • withWatermark("timestamp", "1 hour") 는 창 끝 타임스탬프가 최대 주문 타임스탬프보다 1시간 오래 될 때까지 각 창의 집계 상태를 유지합니다.

슬라이딩 윈도우를 사용하여 이동 집계 계산하기

슬라이딩 윈도우는 크기가 고정되어 있으며, 간격은 서로 겹칠 수 있습니다. 단일 행은 여러 창에 속할 수 있습니다. 슬라이딩 윈도우를 사용하여 최근 6시간 동안의 매출과 같은 이동 집계를 계산합니다:

Python

from pyspark.sql.functions import window, sum

rolling_sales = (orders
  .withWatermark("timestamp", "1 hour")
  .groupBy(window("timestamp", "6 hours", slideDuration="1 hour"))
  .agg(sum("amount").alias("total_sales"))
)

Scala

import org.apache.spark.sql.functions.{window, sum}

val rollingSales = orders
  .withWatermark("timestamp", "1 hour")
  .groupBy(window($"timestamp", "6 hours", "1 hour"))
  .agg(sum($"amount").alias("total_sales"))

이 예에서는 다음이 적용됩니다.

  • window("timestamp", "6 hours", slideDuration="1 hour") 는 1시간씩 진행되는 6시간 간격(예: 오전 5~11시, 오전 6시~오후 12시)으로 주문을 그룹화합니다.
  • withWatermark("timestamp", "1 hour") 는 창 끝 타임스탬프가 최대 주문 타임스탬프보다 1시간 오래 될 때까지 각 창의 집계 상태를 유지합니다.
  • slideDurationwindowDuration보다 작거나 같아야 합니다.

세션 창을 사용하여 사용자 활동 확인

세션 창의 크기는 고정되지 않습니다. 창은 행이 도착하면 열리고, 새 행이 없는 간격 기간이 지나면 닫힙니다. 세션 창을 사용하여 30분 기간 내에 사용자의 페이지 보기와 같이 긴 유휴 기간 간에 활동 버스트를 집계합니다.

Python

from pyspark.sql.functions import session_window, sum

sessionized_page_views = (activity
  .withWatermark("timestamp", "1 hour")
  .groupBy("user_id", session_window("timestamp", gapDuration="30 minutes"))
  .agg(sum("page_views").alias("total_page_views"))
)

Scala

import org.apache.spark.sql.functions.{session_window, sum}

val sessionizedPageViews = activity
  .withWatermark("timestamp", "1 hour")
  .groupBy($"user_id", session_window($"timestamp", "30 minutes"))
  .agg(sum($"page_views").alias("total_page_views"))

이 예에서는 다음이 적용됩니다.

  • session_window("timestamp", gapDuration="30 minutes") 는 첫 번째 페이지 보기가 도착하면 창을 엽니다. 30분 이내에 도착하는 각 후속 페이지 보기는 창을 확장합니다. 30분 내에 페이지 보기가 도착하지 않으면 창이 닫히고 다음 페이지 보기가 새 창을 시작합니다.
  • withWatermark("timestamp", "1 hour") 창 끝 타임스탬프가 최대 페이지 보기 타임스탬프보다 1시간 오래 될 때까지 각 세션의 집계를 상태로 유지합니다.
  • window()session_window()에 대한 timeColumn 인수는 TimestampType 또는 TimestampNTZType여야 합니다.
  • 이벤트 시간이 아닌 처리 시간에 따라 창을 정의하는 데 사용합니다 current_timestamp() .
  • 창 기간을 마이크로초에서 최대 일로 설정할 수 있습니다. 월 기간 이상은 지원되지 않습니다.
  • 모든 윈도우 상태를 무기한 유지하려면 윈도우 집계와 함께 complete 출력 모드를 사용하세요. 적절한 워터마크와 함께 출력 모드를 사용하여 append 상태 증가를 제한하고 큰 데이터 집합에 대한 메모리 문제를 방지합니다. 출력 모드 동작에 대한 자세한 내용은 창 집계 에 대한 워터마크 및 출력 모드를 참조하세요.