Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Med ompartitionering av tillstånd på begäran kan du ändra storlek på antalet partitioner för en tillståndskänslig fråga för strukturerad direktuppspelning utan att förlora kontrollpunktstillståndet.
Utan ompartitionering av tillstånd vid behov anger du antalet shuffle-partitioner vid skapandet av kontrollpunkten. Om du ändrar spark.sql.shuffle.partitionsignorerar frågor med befintliga kontrollpunkter det nya värdet. Om du tillämpar ett nytt partitionsantal måste du starta om frågan med en ny kontrollpunkt.
Ompartitionering av status vid behov har följande fördelar:
- Justera frågor genom att ändra storlek på antalet partitioner utan att återskapa kontrollpunkten.
- Skala upp eller ned frågor för att matcha arbetsbelastningsändringar.
Requirements
- Databricks Runtime 18.3 eller högre.
- Frågan måste använda providern för tillståndslagring i RocksDB. I DBR 17.3 eller senare versioner är RocksDB standardleverantör för tillståndslager. Se Konfigurera RocksDB tillståndslagring i Azure Databricks.
Ändra antalet partitioner
Använd Spark-konfigurationen spark.sql.streaming.stateStore.partitions och starta om frågan för att ändra antalet partitioner för shuffle- och streamingtillstånd:
Python
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
query = df.writeStream.start()
Scala
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
val query = df.writeStream.start()
För tillståndskänsliga frågor har spark.sql.streaming.stateStore.partitions företräde framför spark.sql.shuffle.partitions. När frågan har startats om och den senaste planerade mikrobatchen har slutförts kör frågan en ompartitioneringsåtgärd för att omdistribuera tillståndsdata till det nya antalet partitioner. När ompartitioneringen har slutförts återupptas bearbetningen av frågan.
Övervaka ompartitionstillstånd
När nästa mikrobatch har slutförts inkluderar StreamingQueryProgress händelserna varaktigheten för ompartitioneringen. I en händelses durationMs mått controlBatch.REPARTITION visar varaktighetsvärdet i millisekunder. Större tillståndsstorlekar kan öka tiden det tar att ompartitionera. Se Övervaka frågor om strukturerad direktuppspelning på Azure Databricks.
Example
I följande exempel skalas en fråga ned från 200, standardvärdet, till 100 shuffle-partitioner. Stoppa frågan, ange det nya antalet partitioner och starta om:
Python
# Start the query with the default partition count (200)
query = (df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
)
# Stop the query and scale down to 100 partitions
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")
# Restart the query with the same options
query = (df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
)
Scala
// Start the query with the default partition count (200)
val query = df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
// Stop the query and scale down to 100 partitions
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")
// Restart the query with the same options
val query2 = df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()