Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
O reparticionamento do estado a pedido permite redimensionar o número de partições de uma consulta do Structured Streaming com estado sem perder o estado do ponto de verificação.
Sem reparticionamento de estado a pedido, defines o número de partições de shuffle durante a criação do checkpoint. Se alterar spark.sql.shuffle.partitions, as consultas com pontos de verificação existentes ignoram o novo valor. Aplicar uma nova contagem de partições exige que reinicies a consulta com um novo checkpoint.
A repartição de estados sob demanda tem os seguintes benefícios:
- Ajuste as consultas redimensionando o número de partições sem reconstruir o checkpoint.
- Escala as consultas para cima ou para baixo para corresponder às alterações da carga de trabalho.
Requirements
- Databricks Runtime 18.3 ou superior.
- A consulta deve usar o fornecedor do armazenamento de estado RocksDB. No DBR 17.3 ou posterior, o RocksDB é o provedor predefinido do armazenamento de estado. Consulte Configurar o armazenamento de estado do RocksDB no Azure Databricks.
Alterar o número de partições
Utilize a configuração do Spark spark.sql.streaming.stateStore.partitions e reinicie a consulta para alterar o número de partições de mistura e de estado do streaming:
Python
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
query = df.writeStream.start()
Scala
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
val query = df.writeStream.start()
Para consultas com estado, spark.sql.streaming.stateStore.partitions tem precedência sobre spark.sql.shuffle.partitions. Após o reinício da consulta e a conclusão do último microlote planeado, a consulta executa uma operação de repartição para redistribuir os dados de estado no novo número de partições. Após a conclusão da operação de repartição, a consulta retoma o processamento.
Estado de repartição do monitor
Depois de o próximo microlote estar concluído, os eventos StreamingQueryProgress passam a incluir a duração da operação de repartição. Nas métricas de durationMs um evento, controlBatch.REPARTITION mostra o valor de duração em milissegundos. Tamanhos maiores dos estados podem aumentar o tempo até à repartição. Consulte Monitorização de consultas de streaming estruturado no Azure Databricks.
Example
O exemplo seguinte reduz a escala de uma consulta de 200 partições "shuffle", o valor predefinido, para 100. Pare a consulta, defina a nova contagem de partições e reinicie:
Python
# Start the query with the default partition count (200)
query = (df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
)
# Stop the query and scale down to 100 partitions
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")
# Restart the query with the same options
query = (df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
)
Scala
// Start the query with the default partition count (200)
val query = df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
// Stop the query and scale down to 100 partitions
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")
// Restart the query with the same options
val query2 = df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()