이 페이지에서는 구조적 스트리밍에서 실시간 모드 쿼리를 실행하는 데 필요한 필수 구성 요소 및 구성에 대해 설명합니다. 단계별 자습서는 자습서: 실시간 스트리밍 워크로드 실행을 참조하세요. 실시간 모드에 대한 개념 정보는 구조적 스트리밍의 실시간 모드를 참조하세요.
필수 구성 요소
실시간 모드를 사용하려면 다음 요구 사항을 충족하도록 컴퓨팅을 구성해야 합니다.
- 클래식 컴퓨팅을 사용합니다. 전용 및 표준 액세스 모드가 지원됩니다. 표준 액세스 모드는 Python 경우에만 지원됩니다. Lakeflow Spark 선언적 파이프라인 및 서버리스 클러스터는 지원되지 않습니다.
- Databricks Runtime 16.4 LTS 이상을 사용합니다.
- 자동 크기 조정을 끕니다.
- Photon을 끕니다.
-
spark.databricks.streaming.realTimeMode.enabled를true로 설정합니다. - 중단을 방지하려면 스폿 인스턴스를 끕니다.
UDF를 사용하는 대기 시간에 민감한 워크로드의 경우 Databricks는 전용 액세스 모드를 사용하는 것이 좋습니다. Table 함수를 참조하세요.
클래식 컴퓨팅을 만들고 구성하는 방법에 대한 지침은 Compute 구성 참조를 참조하세요.
쿼리 구성
실시간 모드에서 쿼리를 실행하려면 실시간 트리거를 사용하도록 설정해야 합니다. 실시간 트리거는 업데이트 모드에서만 지원됩니다.
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.outputMode("update")
# In PySpark, the realTime trigger requires specifying the interval.
.trigger(realTime="5 minutes")
.start()
)
Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val readStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic).load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode("update")
.trigger(RealTimeTrigger.apply())
// RealTimeTrigger can also accept an argument specifying the checkpoint interval.
// For example, this code indicates a checkpoint interval of 5 minutes:
// .trigger(RealTimeTrigger.apply("5 minutes"))
.start()
컴퓨팅 크기 조정
컴퓨팅에 충분한 작업 슬롯이 있는 경우 컴퓨팅 리소스당 하나의 실시간 작업을 실행할 수 있습니다.
대기 시간이 짧은 모드에서 실행하려면 사용 가능한 총 작업 슬롯 수가 모든 쿼리 단계의 작업 수보다 크거나 같아야 합니다.
슬롯 계산 예제
| 파이프라인 유형 | Configuration | 필수 슬롯 |
|---|---|---|
| 단일 단계 상태 비저장(Kafka 원본 + 싱크) |
maxPartitions=8 |
슬롯 8개 |
| 2단계 상태 저장(Kafka 소스 + 셔플) |
maxPartitions = 8, 파티션 순서 섞기 = 20 |
슬롯 28개(8개 + 20개) |
| 3단계(Kafka 원본 + 순서 섞기 + 다시 분할) |
maxPartitions = 8, 각 20의 두 순서 섞기 단계 |
48 슬롯(8 + 20 + 20) |
maxPartitions을 설정하지 않으면, Kafka 토픽의 파티션 수를 사용하세요.