Ler informações de estado do Streaming Estruturado

Você pode usar operações DataFrame ou funções de valor de tabela SQL para consultar dados e metadados de estado do Streaming Estruturado. Utilize estas funções para monitorizar o estado das consultas com estado do Structured Streaming, que podem ser úteis para monitorização e resolução de problemas.

Você deve ter acesso de leitura ao caminho do ponto de verificação para uma consulta em fluxo contínuo a fim de consultar dados de estado ou metadados. As funções descritas neste artigo fornecem acesso somente leitura a dados de estado e metadados. Você só pode usar a semântica de leitura em lote para consultar informações de estado.

Nota

Não é possível consultar informações de estado para Lakeflow Spark Declarative Pipelines, tabelas de streaming ou exibições materializadas. Não é possível consultar informações de estado usando computação sem servidor ou computação configurada com o modo de acesso padrão.

Requerimentos

  • Use uma das seguintes configurações de computação:
    • Databricks Runtime 16.3 e superior na computação configurada com o modo de acesso padrão.
    • Databricks Runtime 14.3 LTS e superior em computação configurada com modo de acesso dedicado ou sem isolamento.
  • Acesso de leitura ao caminho do ponto de verificação usado pela consulta de streaming.

Ler armazenamento de estado de streaming estruturado

Você pode ler informações de armazenamento de estado para consultas de Streaming Estruturado executadas em qualquer Databricks Runtime suportado. Utilize a seguinte sintaxe:

Python

df = (spark.read
  .format("statestore")
  .load("/checkpoint/path"))

Scala

val df = spark.read
  .format("statestore")
  .load("/checkpoint/path")

SQL

SELECT * FROM read_statestore('/checkpoint/path')

Opções e esquema da API do leitor de estado

Para uma lista completa de opções de formato statestore, consulte State store.

Os dados de saída apresentam o seguinte esquema:

Coluna Tipo Descrição
key Struct (tipo adicional derivado da chave de estado) A chave para um registro de operador com estado no ponto de verificação de estado.
value Struct (tipo adicional derivado do valor de estado) O valor de um registo de operador com estado no ponto de verificação de estado.
partition_id Número inteiro A partição do ponto de verificação do estado que contém o registo do operador com estado.

No Databricks Runtime 16.4 LTS e superiores, quando a readChangeFeed opção está definida para true, os dados de saída têm o seguinte esquema:

Coluna Tipo Descrição
batch_id Longo O ID do lote a que pertence a alteração de estado.
change_type Cadeia O tipo de alteração aplicada pelo lote: update para inserções e atualizações, delete para eliminações.
key Struct (tipo adicional derivado da chave de estado) A chave para um registro de operador com estado no ponto de verificação de estado.
value Struct (tipo adicional derivado do valor de estado) O valor de um registo de operador com estado no ponto de verificação de estado. null para registos onde change_type é delete.
partition_id Número inteiro A partição do ponto de verificação do estado que contém o registo do operador com estado.

Consulte read_statestore função de valor de tabela.

Ler alterações de estado do Structured Streaming

Disponível em Databricks Runtime 16.4 LTS e superiores. Para ler como o estado muda entre microlotes, em vez de visualizar o estado completo num único microlote, defina readChangeFeed e true especifique changeStartBatchId. Opcionalmente, especifique changeEndBatchId. Para uma lista completa de opções, consulte a loja State.

Por exemplo, para ler as alterações de estado do lote 2 até ao último lote confirmado:

Python

df = (spark.read
  .format("statestore")
  .option("readChangeFeed", True)
  .option("changeStartBatchId", 2)
  .load("<checkpointLocation>")
)

Scala

val df = spark.read
  .format("statestore")
  .option("readChangeFeed", true)
  .option("changeStartBatchId", 2)
  .load("<checkpointLocation>")

SQL

SELECT * FROM read_statestore(
    '<checkpointLocation>',
    readChangeFeed => true,
    changeStartBatchId => 2
);

O esquema de saída inclui colunas adicionais batch_idchange_type . Para o esquema completo, consulte as opções e o esquema da API do leitor de estados.

Ler metadados de estado do Streaming Estruturado

Disponível em Databricks Runtime 14.3 LTS ou superior. Pode ler informações de metadados de estado para consultas de Streaming Estruturado:

Python

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

Scala

val df = spark.read
  .format("state-metadata")
  .load("<checkpointLocation>")

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

Os dados retornados têm o seguinte esquema:

Coluna Tipo Descrição
operatorId Número inteiro O ID inteiro do operador de streaming com estado.
operatorName Cadeia Nome do operador de streaming com estado.
stateStoreName Cadeia Nome do armazenamento de estado do operador.
numPartitions Número inteiro Número de partições do armazenamento de estado.
minBatchId Longo O identificador de lote mínimo disponível para consulta de estado.
maxBatchId Longo O ID máximo de lote disponível para a consulta de estado.

Nota

Os valores de ID de lote fornecidos por minBatchId e maxBatchId refletem o estado no momento em que o ponto de verificação foi gravado. Os lotes antigos são limpos automaticamente com a execução de microlotes, portanto, não é garantido que o valor fornecido aqui ainda esteja disponível.

Consulte read_state_metadata função de valor de tabela.

Exemplo: Analisar um lado de uma junção de fluxos de dados

Use a sintaxe a seguir para consultar o lado esquerdo de uma associação de fluxo de fluxo:

Python

left_df = (spark.read
  .format("statestore")
  .option("joinSide", "left")
  .load("/checkpoint/path"))

Scala

val leftDf = spark.read
  .format("statestore")
  .option("joinSide", "left")
  .load("/checkpoint/path")

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    joinSide => 'left'
);

Exemplo: Consultar armazenamento de estado para fluxo com múltiplos operadores com estado

Este exemplo usa o leitor de metadados de estado para coletar detalhes de metadados de uma consulta de streaming com vários operadores com estado e, em seguida, usa os resultados de metadados como opções para o leitor de estado.

O leitor de metadados de estado usa o caminho do ponto de verificação como a única opção, como no exemplo de sintaxe a seguir:

Python

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

Scala

val df = spark.read
  .format("state-metadata")
  .load("<checkpointLocation>")

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

A tabela a seguir representa um exemplo de saída de metadados de armazenamento de estado:

operatorId nome_do-operador stateStoreName [en] numPartições minBatchId maxBatchId
0 stateStoreSalvar predefinição 200 0 13
1 dedupeWithinWatermark predefinição 200 0 13

Para obter resultados para o dedupeWithinWatermark operador, consulte o leitor de estado com a operatorId opção, como no exemplo a seguir:

Python

left_df = (spark.read
  .format("statestore")
  .option("operatorId", 1)
  .load("/checkpoint/path"))

Scala

val leftDf = spark.read
  .format("statestore")
  .option("operatorId", 1)
  .load("/checkpoint/path")

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    operatorId => 1
);