Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Você pode usar operações DataFrame ou funções de valor de tabela SQL para consultar dados e metadados de estado do Streaming Estruturado. Utilize estas funções para monitorizar o estado das consultas com estado do Structured Streaming, que podem ser úteis para monitorização e resolução de problemas.
Você deve ter acesso de leitura ao caminho do ponto de verificação para uma consulta em fluxo contínuo a fim de consultar dados de estado ou metadados. As funções descritas neste artigo fornecem acesso somente leitura a dados de estado e metadados. Você só pode usar a semântica de leitura em lote para consultar informações de estado.
Nota
Não é possível consultar informações de estado para Lakeflow Spark Declarative Pipelines, tabelas de streaming ou exibições materializadas. Não é possível consultar informações de estado usando computação sem servidor ou computação configurada com o modo de acesso padrão.
Requerimentos
- Use uma das seguintes configurações de computação:
- Databricks Runtime 16.3 e superior na computação configurada com o modo de acesso padrão.
- Databricks Runtime 14.3 LTS e superior em computação configurada com modo de acesso dedicado ou sem isolamento.
- Acesso de leitura ao caminho do ponto de verificação usado pela consulta de streaming.
Ler armazenamento de estado de streaming estruturado
Você pode ler informações de armazenamento de estado para consultas de Streaming Estruturado executadas em qualquer Databricks Runtime suportado. Utilize a seguinte sintaxe:
Python
df = (spark.read
.format("statestore")
.load("/checkpoint/path"))
Scala
val df = spark.read
.format("statestore")
.load("/checkpoint/path")
SQL
SELECT * FROM read_statestore('/checkpoint/path')
Opções e esquema da API do leitor de estado
Para uma lista completa de opções de formato statestore, consulte State store.
Os dados de saída apresentam o seguinte esquema:
| Coluna | Tipo | Descrição |
|---|---|---|
key |
Struct (tipo adicional derivado da chave de estado) | A chave para um registro de operador com estado no ponto de verificação de estado. |
value |
Struct (tipo adicional derivado do valor de estado) | O valor de um registo de operador com estado no ponto de verificação de estado. |
partition_id |
Número inteiro | A partição do ponto de verificação do estado que contém o registo do operador com estado. |
No Databricks Runtime 16.4 LTS e superiores, quando a readChangeFeed opção está definida para true, os dados de saída têm o seguinte esquema:
| Coluna | Tipo | Descrição |
|---|---|---|
batch_id |
Longo | O ID do lote a que pertence a alteração de estado. |
change_type |
Cadeia | O tipo de alteração aplicada pelo lote: update para inserções e atualizações, delete para eliminações. |
key |
Struct (tipo adicional derivado da chave de estado) | A chave para um registro de operador com estado no ponto de verificação de estado. |
value |
Struct (tipo adicional derivado do valor de estado) | O valor de um registo de operador com estado no ponto de verificação de estado.
null para registos onde change_type é delete. |
partition_id |
Número inteiro | A partição do ponto de verificação do estado que contém o registo do operador com estado. |
Consulte read_statestore função de valor de tabela.
Ler alterações de estado do Structured Streaming
Disponível em Databricks Runtime 16.4 LTS e superiores. Para ler como o estado muda entre microlotes, em vez de visualizar o estado completo num único microlote, defina readChangeFeed e true especifique changeStartBatchId. Opcionalmente, especifique changeEndBatchId. Para uma lista completa de opções, consulte a loja State.
Por exemplo, para ler as alterações de estado do lote 2 até ao último lote confirmado:
Python
df = (spark.read
.format("statestore")
.option("readChangeFeed", True)
.option("changeStartBatchId", 2)
.load("<checkpointLocation>")
)
Scala
val df = spark.read
.format("statestore")
.option("readChangeFeed", true)
.option("changeStartBatchId", 2)
.load("<checkpointLocation>")
SQL
SELECT * FROM read_statestore(
'<checkpointLocation>',
readChangeFeed => true,
changeStartBatchId => 2
);
O esquema de saída inclui colunas adicionais batch_idchange_type . Para o esquema completo, consulte as opções e o esquema da API do leitor de estados.
Ler metadados de estado do Streaming Estruturado
Disponível em Databricks Runtime 14.3 LTS ou superior. Pode ler informações de metadados de estado para consultas de Streaming Estruturado:
Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
Scala
val df = spark.read
.format("state-metadata")
.load("<checkpointLocation>")
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
Os dados retornados têm o seguinte esquema:
| Coluna | Tipo | Descrição |
|---|---|---|
operatorId |
Número inteiro | O ID inteiro do operador de streaming com estado. |
operatorName |
Cadeia | Nome do operador de streaming com estado. |
stateStoreName |
Cadeia | Nome do armazenamento de estado do operador. |
numPartitions |
Número inteiro | Número de partições do armazenamento de estado. |
minBatchId |
Longo | O identificador de lote mínimo disponível para consulta de estado. |
maxBatchId |
Longo | O ID máximo de lote disponível para a consulta de estado. |
Nota
Os valores de ID de lote fornecidos por minBatchId e maxBatchId refletem o estado no momento em que o ponto de verificação foi gravado. Os lotes antigos são limpos automaticamente com a execução de microlotes, portanto, não é garantido que o valor fornecido aqui ainda esteja disponível.
Consulte read_state_metadata função de valor de tabela.
Exemplo: Analisar um lado de uma junção de fluxos de dados
Use a sintaxe a seguir para consultar o lado esquerdo de uma associação de fluxo de fluxo:
Python
left_df = (spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path"))
Scala
val leftDf = spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path")
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
joinSide => 'left'
);
Exemplo: Consultar armazenamento de estado para fluxo com múltiplos operadores com estado
Este exemplo usa o leitor de metadados de estado para coletar detalhes de metadados de uma consulta de streaming com vários operadores com estado e, em seguida, usa os resultados de metadados como opções para o leitor de estado.
O leitor de metadados de estado usa o caminho do ponto de verificação como a única opção, como no exemplo de sintaxe a seguir:
Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
Scala
val df = spark.read
.format("state-metadata")
.load("<checkpointLocation>")
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
A tabela a seguir representa um exemplo de saída de metadados de armazenamento de estado:
| operatorId | nome_do-operador | stateStoreName [en] | numPartições | minBatchId | maxBatchId |
|---|---|---|---|---|---|
| 0 | stateStoreSalvar | predefinição | 200 | 0 | 13 |
| 1 | dedupeWithinWatermark | predefinição | 200 | 0 | 13 |
Para obter resultados para o dedupeWithinWatermark operador, consulte o leitor de estado com a operatorId opção, como no exemplo a seguir:
Python
left_df = (spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path"))
Scala
val leftDf = spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path")
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
operatorId => 1
);