Redução de escala eficiente e gerenciador remoto de shuffle

Aplicável a:✅ Fabric Data Engineering and Data Science

A redução de escala eficiente é um recurso do Microsoft Fabric Spark que desvincula os dados de shuffle do Spark do ciclo de vida do executor. Em vez de manter os dados de shuffle nos discos locais do executor, o Fabric Spark faz o roteamento desses dados para o Armazenamento de Blobs do Azure (ou os migra para lá sob demanda) e permite que a Execução Adaptativa de Consultas (AQE) defina a forma da própria gravação. O resultado é uma redução mais rápida do cluster, menor custo de computação e trabalhos mais resilientes, sem alterações em suas consultas, notebooks ou pipelines.

Overview

A redução de escala eficiente se baseia em quatro capacidades que trabalham em conjunto:

Capacidade O que faz
RSM (Remote Shuffle Manager) Grava e lê dados de embaralhamento no Armazenamento de Blobs do Azure em vez de nos discos locais do executor.
Migração embaralhada Move os blocos de shuffle de um executor antes que ele seja desativado, em vez de descartá-los.
Camada de decisão Roteamento em tempo de execução por estágio que mantém pequenos shuffles locais e descarrega grandes shuffles no armazenamento remoto.
Gravação de shuffle do AQE Permite que o Adaptive Query Execution participe da fase de gravação do shuffle para que o particionamento fique correto já na primeira vez.

Pré-requisitos

  • O NEE (Mecanismo de Execução Nativa) deve estar habilitado.
  • Dimensionamento automático habilitado (recomendado). A redução de escala eficiente também funciona sem escalonamento automático com as configurações do Spark abaixo.
  • Runtime 1.3 (Apache Spark 3.5) ou posterior.

Como funciona

Quando o Spark processa uma consulta, ele geralmente redistribui dados entre estágios—um shuffle. Normalmente, os dados de shuffle são armazenados no disco local de cada executor, o que vincula os executores a esses dados. Eles não podem ser liberados até que todos os consumidores terminem a leitura. Esse acoplamento é o principal motivo pelo qual os clusters não conseguem reduzir a escala rapidamente e pelo qual a perda de um executor causa reexecuções de estágio custosas.

A redução eficiente de escala quebra esse acoplamento:

  • Large shuffles vá diretamente para Armazenamento de Blobs do Azure por meio do Gerenciador de Embaralhamento Remoto.
  • Pequenas reorganizações permanecem no disco local para maior desempenho. Se esse executor precisar ser liberado mais tarde, o Shuffle Migration moverá os blocos em segundo plano para outros nós ou para o armazenamento de fallback.
  • A Camada de Decisão escolhe o caminho certo por estágio no runtime.
  • AQE Shuffle Write garante que o mecanismo de gravação produza um particionamento que o AQE subsequente consome sem precisar de nova coalescência, evitando E/S desnecessária.
                ┌───────────────────────────┐
   Query  ───►  │   AQE + Decision Layer    │   per-stage choice
                └─────────────┬─────────────┘
                              │
                ┌─────────────▼─────────────┐
                │   AQE Shuffle Write       │   partition-aware writer
                └─────┬─────────────────┬───┘
                      │                 │
              local   ▼                 ▼   remote
        ┌────────────────────┐   ┌──────────────────┐
        │  Local disk +      │   │  RSM → Azure     │
        │  Shuffle Migration │   │  Blob Storage    │
        └─────────┬──────────┘   └─────────┬────────┘
                  │ on decommission        │
                  ▼                        ▼
        fallback storage   Remote shuffle store

Roteamento inteligente (Camada de Decisão)

A Camada de Decisão avalia cada troca de shuffle e decide:

  • Shuffles grandes → Armazenamento de Blobs do Azure. Benefício máximo de redução e tolerância a falhas.
  • Pequenos shuffles → disco local. Nenhuma sobrecarga de E/S de nuvem para transferências minúsculas. Se o executor for descomissionado posteriormente, o Shuffle Migration assume.

O roteamento é automático e não requer nenhuma entrada do usuário. A granularidade recomendada é por estágio.

Principais benefícios

Custos mais baixos: pagar somente pela computação que você usa

Com uma redução de escala eficiente, os executores são liberados assim que seu trabalho é concluído. Eles não ficam mais ociosos mantendo dados de shuffle que tarefas posteriores podem eventualmente ler.

  • Redução de escala mais rápida. A escala automática remove os nós imediatamente após a conclusão da tarefa.
  • Menos recursos computacionais ociosos. Nenhum executor "zumbi" mantido vivo apenas para servir seu shuffle local.
  • Nenhum excesso de provisionamento de disco. Grandes operações de shuffle vão para o Blob Storage em vez de exigir discos locais grandes.
  • Custo de armazenamento limitado. O armazenamento de fallback é limpo automaticamente quando os blocos não são mais necessários.

Trabalhos mais resilientes

Quando os dados de shuffle ficam armazenados apenas no disco local, uma falha em um executor significa que esses dados são perdidos e o Spark deve recalculá-los. Com redução eficiente, os dados já estão no armazenamento de blobs ou migrados para lá antes que o executor desagre.

Scenario Sem redução eficiente Com redução eficiente
Falhas do executor Dados de shuffle perdidos; estágios reexecutados Os dados são seguros no armazenamento; sem recomputação
Preempção de nó Dados perdidos, repetições caras Os dados sobrevivem; o trabalho continua normalmente
Desativação controlada Modo aleatório desativado ao desligar Blocos migrados para o armazenamento peer ou de fallback
Blips de rede durante a busca Em cascata FetchFailedException As operações de leitura vêm do armazenamento, sem serem afetadas

Isso elimina a causa mais comum de FetchFailedException em produção.

Dimensionamento mais rápido e verdadeiramente elástico

Sem uma redução eficiente da escala, o autoescalador não pode liberar um nó enquanto qualquer executor nesse nó ainda mantiver dados de shuffle ou dados em cache. A redução de escala eficiente desacopla ambos:

  • Os dados de shuffle estão no armazenamento de blobs (ou migram para lá ao desligar).
  • O cache não fixa mais executores. Caches reproduzíveis, como o cache de instantâneo Delta, são excluídos da proteção de redução de escala.

O dimensionador automático pode remover livremente os nós ociosos e redimensionar o cluster em resposta às alterações de carga de trabalho.

Melhor desempenho em shuffles desbalanceados e grandes

O AQE Shuffle Write permite que a Execução Adaptativa de Consultas otimize a própria gravação de shuffle, escolhendo um particionamento que o AQE posterior consome sem precisar recoalescer novamente e produzindo menos blocos, com tamanhos mais adequados, para armazenamento remoto. Combinado com a Camada de Decisão, você obtém um tempo total de execução menor em consultas grandes ou desbalanceadas e latência inalterada para consultas pequenas.

Introdução

Aplique esta configuração para ativar o conjunto completo e eficiente de recursos de redução de escala:

# Remote Shuffle Manager
spark.conf.set("spark.remote.shuffle.enabled", "true")

# Decision Layer — per-stage routing of local vs. remote shuffle
spark.conf.set("spark.sql.rsm.decisionlayer.enabled.level", "stage")

# AQE participates in shuffle write
spark.conf.set("spark.sql.adaptive.shuffleWrite.enabled", "true")

# Shuffle Migration on executor decommission
spark.conf.set("spark.storage.decommission.shuffleBlocks.enabled", "true")
spark.conf.set("spark.storage.decommission.shuffleBlocks.cleanup", "true")
spark.conf.set("spark.storage.decommission.shuffleBlocks.migrateToFallbackStorage", "true")
spark.conf.set("spark.storage.decommission.fallbackStorage.cleanUp", "true")

Nenhuma alteração de código é necessária. Você também pode definir isso nas propriedades do Spark do seu ambiente.

Referência de configuração

RSM (Remote Shuffle Manager)

Setting Recomendado O que controla
spark.remote.shuffle.enabled true Ativa a redução de escala eficiente. Os dados de shuffle vão para o Armazenamento de Blobs do Azure em vez de nos discos locais do executor.

Camada de decisão

Setting Recomendado O que controla
spark.sql.rsm.decisionlayer.enabled.level stage Granularidade na qual a Camada de Decisão roteia embaralhar. stage avalia cada estágio do Spark de forma independente.

Gravação de shuffle do AQE

Setting Recomendado O que controla
spark.sql.adaptive.shuffleWrite.enabled true Permite que o AQE participe da fase de escrita do shuffle. Produz um particionamento que o AQE subsequente consome sem novo coalescimento.

Note

O próprio AQE (spark.sql.adaptive.enabled) deve estar ativado. Ele está ativado por padrão no Fabric Spark.

Migração de shuffle na desativação

Setting Recomendado O que controla
spark.storage.decommission.shuffleBlocks.enabled true Migra blocos embaralhados de um executor que está sendo desativado, em vez de removê-los.
spark.storage.decommission.shuffleBlocks.cleanup true Remove os blocos de shuffle no executor de origem após uma migração bem-sucedida.
spark.storage.decommission.shuffleBlocks.migrateToFallbackStorage true Se nenhum peer executor puder aceitar os blocos, os migra para o armazenamento de fallback (Armazenamento de Blobs do Azure).
spark.storage.decommission.fallbackStorage.cleanUp true Remove os blocos de embaralhamento do armazenamento de fallback assim que não forem mais necessários, mantendo o custo de armazenamento sob controle.

Alocação dinâmica com reconhecimento de cache

Setting Recomendado O que controla
spark.dynamicAllocation.preventShutdownExecutorWithCache false Permite que a alocação dinâmica libere executores mesmo quando eles mantêm blocos armazenados em cache.
spark.dynamicAllocation.excludeDeltaSnapshotCache true Ignora o cache de snapshots Delta ao determinar se um executor ainda mantém cache útil. O cache de snapshot delta é reprodutível e não deve bloquear a redução de escala.

Ajuste avançado (RSM)

A maioria dos usuários não precisa alterar esses padrões.

Desempenho de gravação

Setting Default O que controla
spark.remote.shuffle.partition.buffersize 16777216 (16 MB) Buffer por partição antes de gravar no armazenamento.
spark.remote.shuffle.blocksize 8388608 (8 MB) Tamanho de blocos individuais carregados para Armazenamento de Blobs.
spark.remote.shuffle.write.maxthreads cores × 16 Número máximo de threads usadas para gravar dados de shuffle.
spark.remote.shuffle.write.maxtasks 16384 Número máximo de operações de gravação simultâneas.

Desempenho de leitura

Setting Default O que controla
spark.remote.shuffle.read.parallel.enabled true Fluxos de download paralelos para leituras aleatórias.
spark.remote.shuffle.read.parallelism 4 Fluxos de download paralelos por tarefa.
spark.remote.shuffle.read.prefetchqueuesize 250 Profundidade da fila de pré-busca durante as leituras.
spark.remote.shuffle.read.maxthreads cores × 4 Número máximo de threads usadas para leitura.

Fiabilidade

Setting Default O que controla
spark.remote.shuffle.retries 5 Tentar novamente em caso de erros transitórios de armazenamento.
spark.remote.shuffle.retrydelayms 800 Retirada inicial entre novas tentativas.
spark.remote.shuffle.retrymaxdelayms 60000 Limite de retirada.

Compression

Setting Default O que controla
spark.remote.shuffle.compression Usa spark.io.compression.codec Formato de compactação para dados de shuffle remoto (por exemplo, lz4, zstd).

Resultados de desempenho

Gráfico que mostra a economia nos custos de computação com a redução de escala eficiente habilitada, em comparação com desabilitada, em um benchmark TPC-DS, demonstrando uma redução de custo de 54%.

Economia de custo de computação (TPC-DS benchmark)

Métrica Sem redução eficiente Com redução eficiente
Processamento total (VM-Minutes) 14,952 6,880
Redução de custos 54%

O tempo de execução total do trabalho pode ser maior (o dimensionamento automático usa menos executores simultâneos), mas a computação cobrada é cortada em mais da metade.

Desempenho da Camada de Decisão (TPC-DS, RSM ativado)

Direcionar pequenos shuffles para o disco local e apenas os grandes shuffles para o armazenamento remoto proporciona até 57% de melhoria no tempo de execução em comparação com direcionar todos os shuffles para o armazenamento remoto, com o mesmo benefício de redução de escala.

Limitações

  • NEE é necessário. A redução eficiente depende do Mecanismo de Execução Nativa.
  • Armazenamento de Blobs do Azure apenas. Padrão BlockBlobStorage com HNS desativado. Contas do Azure Data Lake Gen2 / habilitadas para HNS não são compatíveis como armazenamento remoto de shuffle.
  • Não há suporte para Link Privado do Azure. Os ambientes que usam rede de link privado não são compatíveis no momento.
  • A granularidade da camada de decisão é atualmente por estágio. O roteamento por tarefa ou por partição não está no escopo.
  • Alteração do comportamento do cache. Com preventShutdownExecutorWithCache=false, os executors que armazenam dados cache()/persist() podem ser reduzidos em escala. Cargas de trabalho que dependem muito do cache local do executor para dados frequentes devem ser validadas.