Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Aplicável a:✅ Fabric Data Engineering and Data Science
A redução de escala eficiente é um recurso do Microsoft Fabric Spark que desvincula os dados de shuffle do Spark do ciclo de vida do executor. Em vez de manter os dados de shuffle nos discos locais do executor, o Fabric Spark faz o roteamento desses dados para o Armazenamento de Blobs do Azure (ou os migra para lá sob demanda) e permite que a Execução Adaptativa de Consultas (AQE) defina a forma da própria gravação. O resultado é uma redução mais rápida do cluster, menor custo de computação e trabalhos mais resilientes, sem alterações em suas consultas, notebooks ou pipelines.
Overview
A redução de escala eficiente se baseia em quatro capacidades que trabalham em conjunto:
| Capacidade | O que faz |
|---|---|
| RSM (Remote Shuffle Manager) | Grava e lê dados de embaralhamento no Armazenamento de Blobs do Azure em vez de nos discos locais do executor. |
| Migração embaralhada | Move os blocos de shuffle de um executor antes que ele seja desativado, em vez de descartá-los. |
| Camada de decisão | Roteamento em tempo de execução por estágio que mantém pequenos shuffles locais e descarrega grandes shuffles no armazenamento remoto. |
| Gravação de shuffle do AQE | Permite que o Adaptive Query Execution participe da fase de gravação do shuffle para que o particionamento fique correto já na primeira vez. |
Pré-requisitos
- O NEE (Mecanismo de Execução Nativa) deve estar habilitado.
- Dimensionamento automático habilitado (recomendado). A redução de escala eficiente também funciona sem escalonamento automático com as configurações do Spark abaixo.
- Runtime 1.3 (Apache Spark 3.5) ou posterior.
Como funciona
Quando o Spark processa uma consulta, ele geralmente redistribui dados entre estágios—um shuffle. Normalmente, os dados de shuffle são armazenados no disco local de cada executor, o que vincula os executores a esses dados. Eles não podem ser liberados até que todos os consumidores terminem a leitura. Esse acoplamento é o principal motivo pelo qual os clusters não conseguem reduzir a escala rapidamente e pelo qual a perda de um executor causa reexecuções de estágio custosas.
A redução eficiente de escala quebra esse acoplamento:
- Large shuffles vá diretamente para Armazenamento de Blobs do Azure por meio do Gerenciador de Embaralhamento Remoto.
- Pequenas reorganizações permanecem no disco local para maior desempenho. Se esse executor precisar ser liberado mais tarde, o Shuffle Migration moverá os blocos em segundo plano para outros nós ou para o armazenamento de fallback.
- A Camada de Decisão escolhe o caminho certo por estágio no runtime.
- AQE Shuffle Write garante que o mecanismo de gravação produza um particionamento que o AQE subsequente consome sem precisar de nova coalescência, evitando E/S desnecessária.
┌───────────────────────────┐
Query ───► │ AQE + Decision Layer │ per-stage choice
└─────────────┬─────────────┘
│
┌─────────────▼─────────────┐
│ AQE Shuffle Write │ partition-aware writer
└─────┬─────────────────┬───┘
│ │
local ▼ ▼ remote
┌────────────────────┐ ┌──────────────────┐
│ Local disk + │ │ RSM → Azure │
│ Shuffle Migration │ │ Blob Storage │
└─────────┬──────────┘ └─────────┬────────┘
│ on decommission │
▼ ▼
fallback storage Remote shuffle store
Roteamento inteligente (Camada de Decisão)
A Camada de Decisão avalia cada troca de shuffle e decide:
- Shuffles grandes → Armazenamento de Blobs do Azure. Benefício máximo de redução e tolerância a falhas.
- Pequenos shuffles → disco local. Nenhuma sobrecarga de E/S de nuvem para transferências minúsculas. Se o executor for descomissionado posteriormente, o Shuffle Migration assume.
O roteamento é automático e não requer nenhuma entrada do usuário. A granularidade recomendada é por estágio.
Principais benefícios
Custos mais baixos: pagar somente pela computação que você usa
Com uma redução de escala eficiente, os executores são liberados assim que seu trabalho é concluído. Eles não ficam mais ociosos mantendo dados de shuffle que tarefas posteriores podem eventualmente ler.
- Redução de escala mais rápida. A escala automática remove os nós imediatamente após a conclusão da tarefa.
- Menos recursos computacionais ociosos. Nenhum executor "zumbi" mantido vivo apenas para servir seu shuffle local.
- Nenhum excesso de provisionamento de disco. Grandes operações de shuffle vão para o Blob Storage em vez de exigir discos locais grandes.
- Custo de armazenamento limitado. O armazenamento de fallback é limpo automaticamente quando os blocos não são mais necessários.
Trabalhos mais resilientes
Quando os dados de shuffle ficam armazenados apenas no disco local, uma falha em um executor significa que esses dados são perdidos e o Spark deve recalculá-los. Com redução eficiente, os dados já estão no armazenamento de blobs ou migrados para lá antes que o executor desagre.
| Scenario | Sem redução eficiente | Com redução eficiente |
|---|---|---|
| Falhas do executor | Dados de shuffle perdidos; estágios reexecutados | Os dados são seguros no armazenamento; sem recomputação |
| Preempção de nó | Dados perdidos, repetições caras | Os dados sobrevivem; o trabalho continua normalmente |
| Desativação controlada | Modo aleatório desativado ao desligar | Blocos migrados para o armazenamento peer ou de fallback |
| Blips de rede durante a busca | Em cascata FetchFailedException |
As operações de leitura vêm do armazenamento, sem serem afetadas |
Isso elimina a causa mais comum de FetchFailedException em produção.
Dimensionamento mais rápido e verdadeiramente elástico
Sem uma redução eficiente da escala, o autoescalador não pode liberar um nó enquanto qualquer executor nesse nó ainda mantiver dados de shuffle ou dados em cache. A redução de escala eficiente desacopla ambos:
- Os dados de shuffle estão no armazenamento de blobs (ou migram para lá ao desligar).
- O cache não fixa mais executores. Caches reproduzíveis, como o cache de instantâneo Delta, são excluídos da proteção de redução de escala.
O dimensionador automático pode remover livremente os nós ociosos e redimensionar o cluster em resposta às alterações de carga de trabalho.
Melhor desempenho em shuffles desbalanceados e grandes
O AQE Shuffle Write permite que a Execução Adaptativa de Consultas otimize a própria gravação de shuffle, escolhendo um particionamento que o AQE posterior consome sem precisar recoalescer novamente e produzindo menos blocos, com tamanhos mais adequados, para armazenamento remoto. Combinado com a Camada de Decisão, você obtém um tempo total de execução menor em consultas grandes ou desbalanceadas e latência inalterada para consultas pequenas.
Introdução
Configuração recomendada
Aplique esta configuração para ativar o conjunto completo e eficiente de recursos de redução de escala:
# Remote Shuffle Manager
spark.conf.set("spark.remote.shuffle.enabled", "true")
# Decision Layer — per-stage routing of local vs. remote shuffle
spark.conf.set("spark.sql.rsm.decisionlayer.enabled.level", "stage")
# AQE participates in shuffle write
spark.conf.set("spark.sql.adaptive.shuffleWrite.enabled", "true")
# Shuffle Migration on executor decommission
spark.conf.set("spark.storage.decommission.shuffleBlocks.enabled", "true")
spark.conf.set("spark.storage.decommission.shuffleBlocks.cleanup", "true")
spark.conf.set("spark.storage.decommission.shuffleBlocks.migrateToFallbackStorage", "true")
spark.conf.set("spark.storage.decommission.fallbackStorage.cleanUp", "true")
Nenhuma alteração de código é necessária. Você também pode definir isso nas propriedades do Spark do seu ambiente.
Referência de configuração
RSM (Remote Shuffle Manager)
| Setting | Recomendado | O que controla |
|---|---|---|
spark.remote.shuffle.enabled |
true |
Ativa a redução de escala eficiente. Os dados de shuffle vão para o Armazenamento de Blobs do Azure em vez de nos discos locais do executor. |
Camada de decisão
| Setting | Recomendado | O que controla |
|---|---|---|
spark.sql.rsm.decisionlayer.enabled.level |
stage |
Granularidade na qual a Camada de Decisão roteia embaralhar.
stage avalia cada estágio do Spark de forma independente. |
Gravação de shuffle do AQE
| Setting | Recomendado | O que controla |
|---|---|---|
spark.sql.adaptive.shuffleWrite.enabled |
true |
Permite que o AQE participe da fase de escrita do shuffle. Produz um particionamento que o AQE subsequente consome sem novo coalescimento. |
Note
O próprio AQE (spark.sql.adaptive.enabled) deve estar ativado. Ele está ativado por padrão no Fabric Spark.
Migração de shuffle na desativação
| Setting | Recomendado | O que controla |
|---|---|---|
spark.storage.decommission.shuffleBlocks.enabled |
true |
Migra blocos embaralhados de um executor que está sendo desativado, em vez de removê-los. |
spark.storage.decommission.shuffleBlocks.cleanup |
true |
Remove os blocos de shuffle no executor de origem após uma migração bem-sucedida. |
spark.storage.decommission.shuffleBlocks.migrateToFallbackStorage |
true |
Se nenhum peer executor puder aceitar os blocos, os migra para o armazenamento de fallback (Armazenamento de Blobs do Azure). |
spark.storage.decommission.fallbackStorage.cleanUp |
true |
Remove os blocos de embaralhamento do armazenamento de fallback assim que não forem mais necessários, mantendo o custo de armazenamento sob controle. |
Alocação dinâmica com reconhecimento de cache
| Setting | Recomendado | O que controla |
|---|---|---|
spark.dynamicAllocation.preventShutdownExecutorWithCache |
false |
Permite que a alocação dinâmica libere executores mesmo quando eles mantêm blocos armazenados em cache. |
spark.dynamicAllocation.excludeDeltaSnapshotCache |
true |
Ignora o cache de snapshots Delta ao determinar se um executor ainda mantém cache útil. O cache de snapshot delta é reprodutível e não deve bloquear a redução de escala. |
Ajuste avançado (RSM)
A maioria dos usuários não precisa alterar esses padrões.
Desempenho de gravação
| Setting | Default | O que controla |
|---|---|---|
spark.remote.shuffle.partition.buffersize |
16777216 (16 MB) |
Buffer por partição antes de gravar no armazenamento. |
spark.remote.shuffle.blocksize |
8388608 (8 MB) |
Tamanho de blocos individuais carregados para Armazenamento de Blobs. |
spark.remote.shuffle.write.maxthreads |
cores × 16 |
Número máximo de threads usadas para gravar dados de shuffle. |
spark.remote.shuffle.write.maxtasks |
16384 |
Número máximo de operações de gravação simultâneas. |
Desempenho de leitura
| Setting | Default | O que controla |
|---|---|---|
spark.remote.shuffle.read.parallel.enabled |
true |
Fluxos de download paralelos para leituras aleatórias. |
spark.remote.shuffle.read.parallelism |
4 |
Fluxos de download paralelos por tarefa. |
spark.remote.shuffle.read.prefetchqueuesize |
250 |
Profundidade da fila de pré-busca durante as leituras. |
spark.remote.shuffle.read.maxthreads |
cores × 4 |
Número máximo de threads usadas para leitura. |
Fiabilidade
| Setting | Default | O que controla |
|---|---|---|
spark.remote.shuffle.retries |
5 |
Tentar novamente em caso de erros transitórios de armazenamento. |
spark.remote.shuffle.retrydelayms |
800 |
Retirada inicial entre novas tentativas. |
spark.remote.shuffle.retrymaxdelayms |
60000 |
Limite de retirada. |
Compression
| Setting | Default | O que controla |
|---|---|---|
spark.remote.shuffle.compression |
Usa spark.io.compression.codec |
Formato de compactação para dados de shuffle remoto (por exemplo, lz4, zstd). |
Resultados de desempenho
Economia de custo de computação (TPC-DS benchmark)
| Métrica | Sem redução eficiente | Com redução eficiente |
|---|---|---|
| Processamento total (VM-Minutes) | 14,952 | 6,880 |
| Redução de custos | — | 54% |
O tempo de execução total do trabalho pode ser maior (o dimensionamento automático usa menos executores simultâneos), mas a computação cobrada é cortada em mais da metade.
Desempenho da Camada de Decisão (TPC-DS, RSM ativado)
Direcionar pequenos shuffles para o disco local e apenas os grandes shuffles para o armazenamento remoto proporciona até 57% de melhoria no tempo de execução em comparação com direcionar todos os shuffles para o armazenamento remoto, com o mesmo benefício de redução de escala.
Limitações
- NEE é necessário. A redução eficiente depende do Mecanismo de Execução Nativa.
- Armazenamento de Blobs do Azure apenas. Padrão
BlockBlobStoragecom HNS desativado. Contas do Azure Data Lake Gen2 / habilitadas para HNS não são compatíveis como armazenamento remoto de shuffle. - Não há suporte para Link Privado do Azure. Os ambientes que usam rede de link privado não são compatíveis no momento.
- A granularidade da camada de decisão é atualmente por estágio. O roteamento por tarefa ou por partição não está no escopo.
- Alteração do comportamento do cache. Com
preventShutdownExecutorWithCache=false, os executors que armazenam dadoscache()/persist()podem ser reduzidos em escala. Cargas de trabalho que dependem muito do cache local do executor para dados frequentes devem ser validadas.