온디맨드 상태 재파티셔닝을 사용하면 체크포인트 상태를 잃지 않고 상태 저장 Structured Streaming 쿼리의 파티션 수를 조정할 수 있습니다.
온디맨드 상태 재파티셔닝이 없으면 체크포인트를 생성할 때 셔플 파티션 수를 설정해야 합니다. 변경 spark.sql.shuffle.partitions하면 기존 검사점이 있는 쿼리는 새 값을 무시합니다. 새 파티션 수를 적용하려면 새 검사점을 사용하여 쿼리를 다시 시작해야 합니다.
주문형 상태 재분할에는 다음과 같은 이점이 있습니다.
- 검사점을 다시 빌드하지 않고 파티션 수를 조정하여 쿼리를 조정합니다.
- 워크로드 변경 내용에 맞게 쿼리를 확장 또는 축소합니다.
Requirements
- Databricks Runtime 18.3 버전 이상.
- 쿼리는 RocksDB 상태 저장소 공급자를 사용해야 합니다. DBR 17.3 이상에서 RocksDB는 기본 상태 저장소 공급자입니다. Azure Databricks에서 RocksDB 상태 저장소 구성을 참조하세요.
파티션 수 변경
Spark 구성 spark.sql.streaming.stateStore.partitions 을 사용하고 쿼리를 다시 시작하여 순서 섞기 및 스트리밍 상태 파티션 수를 변경합니다.
Python
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
query = df.writeStream.start()
스칼라
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
val query = df.writeStream.start()
상태 저장 쿼리의 경우 spark.sql.streaming.stateStore.partitions가 spark.sql.shuffle.partitions보다 우선합니다. 쿼리가 다시 시작되고 마지막으로 계획된 마이크로배치가 완료되면 쿼리는 다시 분할 작업을 실행하여 상태 데이터를 새 파티션 수로 재분배합니다. 다시 분할 작업이 완료되면 쿼리가 처리를 다시 시작합니다.
재분할 상태 모니터링
다음 마이크로배치가 완료되면 StreamingQueryProgress 이벤트에 리파티션 작업의 소요 시간이 포함됩니다. 이벤트의 durationMs 메트릭 controlBatch.REPARTITION 에서 기간 값을 밀리초 단위로 표시합니다. 상태 크기가 클수록 다시 분할하는 시간이 늘어나게 될 수 있습니다.
Azure Databricks에서 구조적 스트리밍 쿼리 모니터링을 참조하세요.
Example
다음 예제에서는 쿼리를 기본값인 200개에서 100개의 순서 섞기 파티션으로 축소합니다. 쿼리를 중지하고, 새 파티션 수를 설정하고, 다음을 다시 시작합니다.
Python
# Start the query with the default partition count (200)
query = (df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
)
# Stop the query and scale down to 100 partitions
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")
# Restart the query with the same options
query = (df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
)
스칼라
// Start the query with the default partition count (200)
val query = df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
// Stop the query and scale down to 100 partitions
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")
// Restart the query with the same options
val query2 = df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()