Pontos de verificação de Streaming Estruturado

Os pontos de verificação e os logs de registro antecipado trabalham juntos para fornecer garantias de processamento para aplicações de streaming estruturado. O ponto de verificação rastreia as informações que identificam a consulta, incluindo informações de estado e registros processados. Quando você exclui os arquivos em um diretório de ponto de verificação ou altera para um novo local de ponto de verificação, a próxima execução da consulta começa do zero.

Um diretório de ponto de verificação contém o seguinte:

  • Offsets: os offsets de origem processados durante cada microlote. Isso permite que a consulta seja retomada exatamente de onde parou sem reprocessar dados.
  • Confirmações: um registro de quais microlotes foram confirmados no coletor, habilitando semântica exatamente uma vez.
  • Estado: para consultas com estado (agregações, junções entre fluxos, desduplicação e operadores com estado personalizados, como transformWithState), o checkpoint armazena metadados sobre o operador com estado, o esquema de estado e o conteúdo do armazenamento de estado verificado gerenciado pelo provedor do armazenamento de estado.
  • Metadados: a ID de consulta exclusiva usada para identificar a consulta. As configurações são armazenadas como parte do log de deslocamento.

Cada consulta deve ter um local de ponto de verificação diferente. Múltiplas consultas nunca devem compartilhar o mesmo local.

Observação

Este artigo aborda pontos de verificação de streaming estruturados para consultas de streaming. Para obter informações sobre como usar DataFrame.checkpoint() com volumes do Catálogo do Unity para truncar planos de execução de DataFrames não streaming, consulte Pontos de Verificação de DataFrame em Volumes.

Habilitar pontos de verificação para consultas de Streaming Estruturado

Você deve especificar a opção checkpointLocation antes de executar uma consulta de streaming, como no exemplo a seguir:

Python

(df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")
)

Scala

df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")

Observação

Alguns destinos, como a saída para display() em notebooks e o destino memory, geram automaticamente um local de ponto de verificação temporário se você omitir essa configuração. Os locais de ponto de verificação temporários não garantem nenhuma tolerância a falhas ou consistência de dados e podem não ser limpos corretamente. A Databricks recomenda sempre especificar um local de ponto de verificação para esses sinks.

Recuperação após alterações em uma consulta de Streaming Estruturado

Há limitações sobre quais alterações são permitidas em uma consulta de streaming entre as reinicializações do mesmo local de ponto de verificação.

As alterações que geralmente exigem um novo ponto de verificação incluem o número ou tipo de fontes de entrada, tópicos Kafka inscritos, caminhos do Auto Loader, tipos de operação com estado, esquema de estado e tipos de coletores de saída.

As alterações geralmente seguras incluem adicionar ou remover filtros, alterar limites de taxa, intervalos de gatilho e atualizar a lógica de função definida pelo usuário dentro mapGroupsWithState (embora a semântica possa mudar).

A seção a seguir descreve as alterações que não são permitidas ou o efeito da alteração não está bem definido, em que:

  • O termo permitido significa que você pode fazer a alteração especificada, mas se a semântica de seu efeito é bem definida depende da consulta e da alteração.
  • O termo não permitido significa que você não deve fazer a alteração especificada, pois a consulta reiniciada provavelmente falhará com erros imprevisíveis.
  • sdf representa um DataFrame de streaming/Conjunto de dados gerado com sparkSession.readStream.

Tipos de alterações em consultas do Fluxo Estruturado

  • Alterações no número ou tipo de fontes de entrada: isso não é permitido por padrão porque o Streaming Estruturado identifica as fontes por sua posição no plano de consulta. Se você ativar a nomenclatura de origem, poderá reordenar fontes existentes e adicionar novas fontes sem iniciar um novo ponto de verificação. Consulte Alterar fontes de streaming com evolução da origem.

  • Alterações nos parâmetros das fontes de entrada: se isso é permitido e se a semântica da alteração é bem definida depende da origem e da consulta, incluindo controles de admissão como maxFilesPerTrigger ou maxOffsetsPerTrigger. Veja alguns exemplos:

    • Adição, exclusão e modificação de limites de taxa são permitidos:

      spark.readStream.format("kafka").option("subscribe", "article")
      

      para

      spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
      

      Para obter detalhes, consulte Configurar o tamanho do lote de Streaming Estruturado no Azure Databricks

    • As alterações nos artigos e arquivos assinados geralmente não são permitidas, pois os resultados são imprevisíveis: spark.readStream.format("kafka").option("subscribe", "article") para spark.readStream.format("kafka").option("subscribe", "newarticle")

  • Alterações no intervalo de gatilho: você pode alterar gatilhos entre lotes incrementais e intervalos de tempo. Veja Alterar intervalos de gatilho entre execuções.

  • Alterações no tipo de coletor de saída: são permitidas alterações entre algumas combinações específicas de coletores. Isso precisa ser verificado em uma base caso a caso. Veja alguns exemplos.

    • É permitido usar o destino de arquivo para o destino Kafka. O Kafka verá apenas os novos dados.
    • O coletor de Kafka para o coletor de arquivos não é permitido.
    • O coletor Kafka pode ser alterado para "foreach" ou vice-versa.
  • Alterações nos parâmetros do coletor de saída: se isso é permitido e se a semântica da alteração é bem definida depende do coletor e da consulta. Veja alguns exemplos.

    • As alterações no diretório de saída de um coletor de arquivos não são permitidas: sdf.writeStream.format("parquet").option("path", "/somePath") para sdf.writeStream.format("parquet").option("path", "/anotherPath")
    • É permitido alterar o tópico de saída: sdf.writeStream.format("kafka").option("topic", "topic1") para sdf.writeStream.format("kafka").option("topic", "topic2")
    • Alterações no coletor foreach definido pelo usuário (ou seja, o código ForeachWriter) são permitidas, mas a semântica da alteração depende do próprio código.
  • Alterações em operações tipo projeção/filtro/mapa: em alguns casos, são permitidas. Por exemplo:

    • Adição/exclusão de filtros é permitido: sdf.selectExpr("a") para sdf.where(...).selectExpr("a").filter(...).
    • As alterações nas projeções com o mesmo esquema de saída são permitidas: sdf.selectExpr("stringColumn AS json").writeStream para sdf.select(to_json(...).as("json")).writeStream.
    • As alterações nas projeções com um esquema de saída diferente são condicionalmente permitidas: sdf.selectExpr("a").writeStream a sdf.selectExpr("b").writeStream é permitida somente se o coletor de saída permitir que o esquema mude de "a" para "b".
  • Alterações em operações com estado: algumas operações em consultas de streaming precisam manter dados de estado para atualizar continuamente o resultado. O Fluxo estruturado automaticamente verifica os dados de estado no armazenamento tolerante a falhas (por exemplo, DBFS, Armazenamento de BLOBs do Azure) e restaura-os após a reinicialização. No entanto, isso pressupõe que o esquema dos dados de estado permaneça o mesmo entre as reinicializações. Isso significa que quaisquer alterações (isto é, adições, exclusões ou modificações de esquema) para as operações com estado de uma consulta de streaming não são permitidas entre as reinicializações. Aqui está a lista de operações com estado cujo esquema não deve ser alterado entre reinicializações para garantir a recuperação de estado:

    • Agregação de streaming: por exemplo, sdf.groupBy("a").agg(...). Qualquer alteração no número ou tipo de chaves ou agregações de agrupamento não é permitida.
    • Eliminação de duplicação de streaming: por exemplo, sdf.dropDuplicates("a"). Qualquer alteração no número ou tipo de chaves ou agregações de agrupamento não é permitida.
    • Junção de fluxo-fluxo: por exemplo, sdf1.join(sdf2, ...) (ou seja, ambas as entradas são geradas com sparkSession.readStream ). Alterações no esquema ou nas colunas de equijunção não são permitidas. Não são permitidas alterações no tipo de junção (externa ou interna). Outras alterações na condição de junção são mal definidas.
    • Operação com estado arbitrário: por exemplo, sdf.groupByKey(...).mapGroupsWithState(...) ou sdf.groupByKey(...).flatMapGroupsWithState(...). Qualquer alteração no esquema do estado definido pelo usuário e no tipo de tempo limite não é permitida. Qualquer alteração na função de mapeamento de estado definida pelo usuário é permitida, mas o efeito semântico da alteração depende da lógica definida pelo usuário. Se você realmente quiser dar suporte a alterações de esquema de estado, poderá codificar/decodificar explicitamente suas estruturas de dados de estado complexo em bytes usando um esquema de codificação/decodificação que dá suporte à migração de esquema. Por exemplo, se você salvar seu estado como bytes codificados em Avro, será possível alterar o esquema de estado Avro entre as reinicializações de consulta, pois isso restaura o estado binário.

Importante

Os operadores dropDuplicates() e dropDuplicatesWithinWatermark() com estado podem falhar ao reiniciar devido a uma verificação de compatibilidade do esquema de estado ao alterar entre os modos de acesso de computação.

A alteração entre modos de acesso dedicados e sem isolamento é permitida. A alteração entre os modos de acesso padrão e sem servidor é permitida. Não tente alterar entre outras combinações de modo de acesso.

Para evitar esse erro, não altere o modo de acesso de computação para consultas de streaming que contêm esses operadores.

Alterar fontes de streaming com evolução da origem

Por padrão, o Streaming Estruturado identifica as fontes por sua posição no plano de consulta, como 0, , 1etc 2. Qualquer alteração no número ou na ordem das fontes de entrada interrompe a compatibilidade do ponto de verificação e requer um novo ponto de verificação. A evolução da origem permite atribuir nomes estáveis definidos pelo usuário a cada fonte de streaming para que você possa reordenar, adicionar ou remover fontes de uma consulta sem perder o estado de ponto de verificação.

A evolução da fonte requer Databricks Runtime 18.2 ou superior.

Configuração necessária

Para habilitar a evolução do código-fonte, defina duas configurações do Spark:

  • spark.sql.streaming.queryEvolution.enableSourceEvolution: quando true, todas as fontes de streaming na consulta devem ser nomeadas explicitamente usando a .name() API. O padrão é false.
  • spark.sql.streaming.offsetLog.formatVersion: deve ser definido como 2 para usar o formato de rastreamento de deslocamento baseado em nome. O padrão é 1.

Defina ambas as configurações antes de definir a consulta de streaming:

spark.conf.set("spark.sql.streaming.queryEvolution.enableSourceEvolution", "true")
spark.conf.set("spark.sql.streaming.offsetLog.formatVersion", "2")

Regras de nomenclatura

  • Os nomes devem conter apenas caracteres alfanuméricos e sublinhados ([a-zA-Z0-9_]+).
  • Cada nome de origem deve ser exclusivo em uma consulta.
  • Quando a evolução da origem está habilitada, cada fonte de streaming deve ter um nome. Fontes não nomeadas causam um UNNAMED_STREAMING_SOURCES_WITH_ENFORCEMENT erro.

Reordenar, adicionar e remover fontes

As seguintes alterações são seguras em reinicializações de consulta com o mesmo ponto de verificação:

  • Reordenar fontes: reinicie a consulta com uma ordem diferente de fontes. Cada fonte retoma do último offset confirmado com base no seu nome e não altera o estado do checkpoint.
  • Adicionar novas fontes: reinicie a consulta com uma nova fonte. Os novos processos de origem desde o início e as fontes existentes continuam desde seus últimos deslocamentos.
  • Remover fontes: reinicie a consulta sem a origem. A fonte é removida permanentemente do checkpoint. Uma origem removida não pode ser adicionada novamente com o mesmo nome.

Exemplo

Use .name() em DataStreamReader antes de chamar .load() ou .table():

Python

orders_us = (spark.readStream
  .name("orders_us")
  .table("catalog.schema.orders_us")
)

orders_eu = (spark.readStream
  .name("orders_eu")
  .table("catalog.schema.orders_eu")
)

all_orders = orders_us.union(orders_eu)

Scala

val ordersUS = spark.readStream
  .name("orders_us")
  .table("catalog.schema.orders_us")

val ordersEU = spark.readStream
  .name("orders_eu")
  .table("catalog.schema.orders_eu")

val allOrders = ordersUS.union(ordersEU)

Limitações

  • A nomenclatura de origem requer um novo ponto de verificação. Você não pode habilitar a evolução do código-fonte com um ponto de verificação existente que usa o formato de log de deslocamento V1.
  • Após uma atualização para o formato de log de deslocamento V2, você não poderá voltar para o V1. Consulte a configuração necessária.
  • Os nomes das fontes são permanentes. Para renomear uma origem, remova-a e adicione-a com um novo nome. Os processos de origem renomeados são executados desde o início.