Important
Lakeflow Spark SDP(선언적 파이프라인)의 실시간 모드는 미리 보기 채널의 Databricks Runtime 18.1.3에서 공개 미리 보기 로 제공됩니다.
실시간 모드를 사용하면 매우 짧은 대기 시간 데이터 처리가 가능하며, 엔드 투 엔드 대기 시간은 5밀리초로 낮습니다. 사기 감지 및 실시간 개인 설정과 같은 스트리밍 데이터에 대한 즉각적인 대응이 필요한 운영 워크로드에 실시간 모드를 사용합니다.
실시간 모드는 파이프라인 외부의 구조적 스트리밍에서도 직접 사용할 수 있습니다. 구조적 스트리밍의 실시간 모드를 참조하세요.
실시간 모드에서 짧은 대기 시간을 달성하는 방법
실시간 모드는 다음 세 가지 주요 방법으로 표준 연속 처리와 다릅니다.
- 장기 실행 일괄 처리: 시스템은 장기 실행 일괄 처리 내에서 원본에서 사용할 수 있게 되면 데이터를 처리합니다(기본값은 5분).
- 동시 스테이지 예약: 모든 쿼리 단계가 동시에 예약됩니다. 컴퓨팅 리소스에는 모든 단계를 동시에 처리할 수 있는 충분한 작업 슬롯이 있어야 합니다. 컴퓨팅 크기 조정을 참조하세요.
- 스트리밍 순서 섞기: 다운스트림 스테이지를 시작하기 전에 업스트림 단계가 완료될 때까지 기다리지 않고 생성되는 즉시 단계 간에 데이터가 전달됩니다.
체크포인트 간격(pipelines.trigger.interval를 통해 구성됨)은 상태와 소스 오프셋을 내구성 스토리지에 얼마나 자주 저장할지를 제어합니다. 간격이 길어지면 검사점 오버헤드가 줄어들지만 실패 후 복구 시간이 늘어나고 메트릭 보고가 지연됩니다. 간격이 짧을 경우 내구성이 향상되지만 오버헤드가 추가됩니다.
실시간 모드 및 연속 파이프라인
실시간 모드는 특수한 유형의 연속 트리거입니다. 연속 모드는 여전히 필요합니다. 실시간 모드는 흐름 수준 대기 시간 최적화를 추가합니다. 실시간 모드를 사용하려면 파이프라인이 먼저 연속 모드에서 실행되어야 합니다. 그런 다음 실시간 모드는 흐름 수준에서 추가 최적화를 적용하여 표준 연속 처리에서 제공하는 것 이상으로 초 미만의 대기 시간을 달성합니다.
실시간 모드를 사용하도록 설정하려면 다음 세 가지 구성 단계가 필요합니다.
- 파이프라인을 연속 모드로 설정합니다.
- 파이프라인 수준에서 실시간 모드를 사용하도록 설정합니다.
- 실시간 업데이트 흐름을 정의합니다.
Requirements
| Requirement | Value |
|---|---|
| Databricks Runtime | SDP 미리 보기 채널의 18.1.3 |
| 컴퓨팅 유형 | 클래식 컴퓨팅 또는 서버리스 |
실시간 모드 구성
1단계: 파이프라인을 연속 모드로 설정
파이프라인 설정에서 파이프라인 모드 를 연속으로 설정하거나 파이프라인 JSON에서 설정합니다.
{
"continuous": true
}
2단계: 파이프라인 수준에서 실시간 모드 사용
파이프라인 설정에서 고급 > Spark 구성 아래의 Spark 구성에 다음 키를 추가합니다.
spark.databricks.streaming.realTimeMode.enabled = true
파이프라인 JSON에서 이를 설정할 수도 있습니다.
{
"continuous": true,
"spark_conf": {
"spark.databricks.streaming.realTimeMode.enabled": "true"
}
}
3단계: 실시간 업데이트 흐름 정의
실시간 모드에는 업데이트 흐름이 필요합니다.
dp.create_sink()를 사용하여 출력 대상을 정의한 다음, @dp.update_flow를 pipelines.trigger로 설정하고 "RealTime"가 싱크를 가리키도록 target 데코레이터를 사용합니다.
from pyspark import pipelines as dp
# Define the output sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "<bootstrap-servers>",
"topic": "<output-topic>",
}
)
# Define the real-time update flow targeting the sink
@dp.update_flow(
name="my_rtm_flow",
target="my_kafka_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes", # optional; defaults to 5 minutes
}
)
def my_real_time_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-servers>")
.option("subscribe", "<input-topic>")
.load()
)
흐름 수준 구성 매개 변수:
| 매개 변수 | 필수 | 기본값 | Description |
|---|---|---|---|
pipelines.trigger |
Yes | — | 이 흐름에 대해 실시간 모드를 사용하려면 "RealTime"(으)로 설정하세요. |
pipelines.trigger.interval |
No | "5 minutes" |
체크포인트 간격 상태 및 오프셋이 커밋되는 빈도를 제어합니다. 값이 짧을 경우 복구 가능성이 향상됩니다. 값이 길면 오버헤드가 줄어듭니다. |
코드 예제
Kafka에서 Kafka로
Kafka 토픽에서 읽고 Kafka 출력 대상에 씁니다.
from pyspark import pipelines as dp
dp.create_sink("kafka_output_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="kafka_rtm_flow",
target="kafka_output_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def kafka_rtm_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "latest")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
)
브로드캐스트 조인으로 강화
Kafka 스트림을 정적 조회 테이블에 조인합니다. 브로드캐스트(스트림-정적) 조인만 지원됩니다. 스트림-스트림 조인은 실시간 모드에서 지원되지 않습니다.
from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast, expr
dp.create_sink("enriched_output_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": enriched_output_topic,
})
@dp.update_flow(
name="enriched_events_flow",
target="enriched_output_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def enriched_events():
lookup = spark.read.table("catalog.schema.lookup_table")
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.withColumn("event_key", expr("CAST(value AS STRING)"))
.join(broadcast(lookup), expr("event_key = lookup_key"))
.select("event_key", "lookup_value", "timestamp")
)
집계
상태 저장형 groupBy를 사용하여 키별 이벤트 수를 계산합니다. 상태 저장 작업의 입력 파티션 수와 일치하도록 설정합니다 spark.sql.shuffle.partitions .
from pyspark import pipelines as dp
from pyspark.sql.functions import col
dp.create_sink("event_counts_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="event_counts_flow",
target="event_counts_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
"spark.sql.shuffle.partitions": "8",
}
)
def event_counts():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING) AS event_type", "timestamp")
.groupBy(col("event_type"))
.count()
)
지원되는 원본 및 싱크
| Connector | 출처에 따라 | 싱크로 | Notes |
|---|---|---|---|
| Apache Kafka | ✓ | ✓ | — |
| AWS MSK | ✓ | ✓ | Kafka 호환 인터페이스를 사용합니다. |
| Azure Event Hubs(Kafka 커넥터) | ✓ | ✓ | Kafka 호환 인터페이스를 사용합니다. |
| Amazon Kinesis | ✓ | 지원되지 않음 | EFO(향상된 팬아웃) 모드에만 사용하십시오. |
| 델타 | 지원되지 않음 | 지원되지 않음 | — |
컴퓨팅 크기 조정
컴퓨팅에 충분한 작업 슬롯이 있는 경우 컴퓨팅 리소스당 하나의 실시간 파이프라인을 실행할 수 있습니다. 사용 가능한 작업 슬롯은 모든 쿼리 단계에서 모든 작업을 포함해야 합니다.
| 파이프라인 유형 | Configuration | 필수 작업 슬롯 |
|---|---|---|
| 단일 단계 상태 비저장(Kafka 원본 + 싱크) |
maxPartitions=8 |
8 |
| 2단계 상태 저장(Kafka 소스 + 셔플) |
maxPartitions = 8, 파티션 순서 섞기 = 20 |
28 (8 + 20) |
| 3단계(Kafka 원본 + 2개의 순서 섞기) |
maxPartitions = 8, 각 20의 두 순서 섞기 단계 |
48 (8 + 20 + 20) |
maxPartitions을 설정하지 않으면, Kafka 토픽의 파티션 수를 사용하세요.
운영자 지원
| 카테고리 | Operator | 지원됨 |
|---|---|---|
| 상태 비저장 | 선택, 투영 | ✓ |
| UDFs | Scala 사용자 정의 함수 (UDF) | ✓ (제한 사항 포함) |
| UDFs | Python 사용자 정의 함수 (UDF) | ✓ (제한 사항 포함) |
| 집계 | 합계, 개수, 최대값, 최소값, 평균 | ✓ |
| Windowing | 회전, 슬라이딩 | ✓ |
| Windowing | Session | 지원되지 않음 |
| Deduplication | dropDuplicates |
✓ (바인딩되지 않은 상태) |
| Deduplication | dropDuplicatesWithinWatermark |
지원되지 않음 |
| Joins | 브로드캐스트 테이블 조인 | ✓ |
| Joins | 스트림 간 조인 | 지원되지 않음 |
| 관습 | transformWithState |
✓ (동작 차이 포함) |
| 관습 | union |
✓ (제한 사항 포함) |
| 관습 | forEach |
지원되지 않음 |
| 관습 | flatMapGroupsWithState |
지원되지 않음 |
| 관습 | mapPartitions |
지원되지 않음 |
| 관습 | forEachBatch |
지원되지 않음 |
transformWithState 실시간 모드에서
transformWithState 는 마이크로 일괄 처리와는 다음과 같은 차이점이 있는 실시간 모드에서 지원됩니다.
-
handleInputRows는 배치당 각 키마다 한 번이 아니라 각 행마다 한 번 호출됩니다.inputRows반복기는 호출당 단일 값을 생성합니다. - 이벤트 시간 타이머는 지원되지 않습니다. 데이터가 도착하지 않은 경우 장기 실행 일괄 처리가 종료되면 처리 시간 타이머가 발생합니다.
-
transformWithStateInPandas은 지원되지 않습니다.
실시간 모드에서의 Pandas UDF
pandas UDF의 지연 시간을 최소화하려면 spark.sql.execution.arrow.maxRecordsPerBatch를 1로 설정합니다. 처리량을 희생하면서 대기 시간을 최적화합니다. 처리량도 중요한 경우 이 값을 100 이상으로 설정합니다.
실시간 모드 성능 모니터링
실시간 모드는 StreamingQueryProgress 필드 아래의 latencies에서 지연 시간 메트릭을 표시합니다.
StreamingQueryListener를 통해 또는 스트리밍 쿼리의 lastProgress 속성을 검사하여 이러한 메트릭에 액세스합니다.
| 메트릭 | Description |
|---|---|
processingLatencyMs |
흐름에서 레코드를 읽는 시간과 흐름에 의해 레코드가 완전히 처리되는 시점 사이의 시간 |
sourceQueuingLatencyMs |
레코드가 메시지 버스에 성공적으로 기록된 시간(예: Kafka의 로그 추가 시간)과 흐름에서 처음 읽은 시간 사이의 시간 |
e2eLatencyMs |
레코드가 원본에서 생성되는 시점부터 흐름에 의해 완전히 처리되는 시점까지의 총 엔드 투 엔드 대기 시간 |
각 메트릭은 p50, p90, p95 및 p99 백분위수로 보고됩니다.
Limitations
파이프라인당 하나의 실시간 흐름이 권장됩니다. 여러 흐름이 허용되지만 흐름 간의 작업 슬롯 경합은 대기 시간을 증가합니다.
운영자 및 원본 제한 사항의 전체 목록은 실시간 모드 제한을 참조하세요.