Ompartitionering av tillstånd på begäran för tillståndskänsliga strömningsfrågor

Med ompartitionering av tillstånd på begäran kan du ändra storlek på antalet partitioner för en tillståndskänslig fråga för strukturerad direktuppspelning utan att förlora kontrollpunktstillståndet.

Utan ompartitionering av tillstånd vid behov anger du antalet shuffle-partitioner vid skapandet av kontrollpunkten. Om du ändrar spark.sql.shuffle.partitionsignorerar frågor med befintliga kontrollpunkter det nya värdet. Om du tillämpar ett nytt partitionsantal måste du starta om frågan med en ny kontrollpunkt.

Ompartitionering av status vid behov har följande fördelar:

  • Justera frågor genom att ändra storlek på antalet partitioner utan att återskapa kontrollpunkten.
  • Skala upp eller ned frågor för att matcha arbetsbelastningsändringar.

Requirements

Ändra antalet partitioner

Använd Spark-konfigurationen spark.sql.streaming.stateStore.partitions och starta om frågan för att ändra antalet partitioner för shuffle- och streamingtillstånd:

Python

query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
query = df.writeStream.start()

Scala

query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
val query = df.writeStream.start()

För tillståndskänsliga frågor har spark.sql.streaming.stateStore.partitions företräde framför spark.sql.shuffle.partitions. När frågan har startats om och den senaste planerade mikrobatchen har slutförts kör frågan en ompartitioneringsåtgärd för att omdistribuera tillståndsdata till det nya antalet partitioner. När ompartitioneringen har slutförts återupptas bearbetningen av frågan.

Övervaka ompartitionstillstånd

När nästa mikrobatch har slutförts inkluderar StreamingQueryProgress händelserna varaktigheten för ompartitioneringen. I en händelses durationMs mått controlBatch.REPARTITION visar varaktighetsvärdet i millisekunder. Större tillståndsstorlekar kan öka tiden det tar att ompartitionera. Se Övervaka frågor om strukturerad direktuppspelning på Azure Databricks.

Example

I följande exempel skalas en fråga ned från 200, standardvärdet, till 100 shuffle-partitioner. Stoppa frågan, ange det nya antalet partitioner och starta om:

Python

# Start the query with the default partition count (200)
query = (df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()
)

# Stop the query and scale down to 100 partitions
query.stop()

spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")

# Restart the query with the same options
query = (df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()
)

Scala

// Start the query with the default partition count (200)
val query = df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "5 minutes"),
    $"id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()

// Stop the query and scale down to 100 partitions
query.stop()

spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")

// Restart the query with the same options
val query2 = df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "5 minutes"),
    $"id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()