Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Você pode usar operações DataFrame ou funções de valor de tabela SQL para consultar metadados e dados de estado de Streaming Estruturado. Use essas funções para observar informações de estado para consultas com estado de Streaming Estruturado, que podem ser úteis para monitoramento e depuração.
É preciso ter acesso de leitura no caminho do ponto de verificação de uma consulta de streaming para consultar dados de estado ou metadados. As funções descritas neste artigo fornecem acesso somente leitura a dados de estado e metadados. Você só pode usar a semântica de leitura em lote para consultar informações de estado.
Observação
Não é possível consultar informações de estado para Pipelines Declarativas do Lakeflow Spark, tabelas de streaming ou exibições materializadas. Não é possível consultar informações de estado usando computação sem servidor ou computação configurada com o modo de acesso padrão.
Requisitos
- Use uma das seguintes configurações de computação:
- Databricks Runtime 16.3 e posteriores na computação configurada com o modo de acesso padrão.
- Databricks Runtime 14.3 LTS e posteriores na computação configurada com modo de acesso dedicado ou sem isolamento.
- Leia o acesso ao caminho do ponto de verificação usado pela consulta de streaming.
Ler o repositório de estado do Streaming Estruturado
Você pode ler as informações de armazenamento de estado para consultas de Streaming Estruturado executadas em qualquer Databricks Runtime com suporte. Use a seguinte sintaxe:
Python
df = (spark.read
.format("statestore")
.load("/checkpoint/path"))
Scala
val df = spark.read
.format("statestore")
.load("/checkpoint/path")
SQL
SELECT * FROM read_statestore('/checkpoint/path')
Opções e esquema da API do leitor de estado
Para obter uma lista completa das opções de formato statestore, consulte State store.
Os dados de saída têm o seguinte esquema:
| Coluna | Tipo | Descrição |
|---|---|---|
key |
Struct (tipo adicional derivado da chave de estado) | A chave de um registro do operador com estado no ponto de verificação de estado. |
value |
Struct (tipo adicional derivado do valor de estado) | O valor de um registro do operador com estado no ponto de verificação de estado. |
partition_id |
Inteiro | A partição do ponto de verificação de estado que contém o registro do operador com estado. |
No Databricks Runtime 16.4 LTS e posteriores, quando a opção readChangeFeed é definida como true, os dados de saída têm o seguinte esquema:
| Coluna | Tipo | Descrição |
|---|---|---|
batch_id |
Longo | A ID do lote à qual a alteração de estado pertence. |
change_type |
Cadeia de caracteres | O tipo de alteração aplicada pelo lote: update para inserções e atualizações, delete para exclusões. |
key |
Struct (tipo adicional derivado da chave de estado) | A chave de um registro do operador com estado no ponto de verificação de estado. |
value |
Struct (tipo adicional derivado do valor de estado) | O valor de um registro do operador com estado no ponto de verificação de estado.
null para registros onde change_type está delete. |
partition_id |
Inteiro | A partição do ponto de verificação de estado que contém o registro do operador com estado. |
Consulte TVF read_statestore.
Visualizar alterações de estado do Structured Streaming
Disponível no Databricks Runtime 16.4 LTS e superior. Para ler como o estado é alterado entre microbates em vez de exibir o estado completo em uma única microbatch, defina readChangeFeedtrue e especifique changeStartBatchId. Opcionalmente, especifique changeEndBatchId. Para obter uma lista completa de opções, consulte State store.
Por exemplo, para ler as alterações de estado do lote 2 até o lote confirmado mais recente:
Python
df = (spark.read
.format("statestore")
.option("readChangeFeed", True)
.option("changeStartBatchId", 2)
.load("<checkpointLocation>")
)
Scala
val df = spark.read
.format("statestore")
.option("readChangeFeed", true)
.option("changeStartBatchId", 2)
.load("<checkpointLocation>")
SQL
SELECT * FROM read_statestore(
'<checkpointLocation>',
readChangeFeed => true,
changeStartBatchId => 2
);
O esquema de saída inclui colunas adicionais batch_id e change_type. Para obter o esquema completo, consulte as opções e o esquema da API do leitor de estado.
Ler metadados de estado de Streaming Estruturado
Disponível no Databricks Runtime 14.3 LTS ou superior. Você pode ler informações de metadados de estado para consultas de Streaming Estruturado:
Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
Scala
val df = spark.read
.format("state-metadata")
.load("<checkpointLocation>")
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
Os dados retornados têm o seguinte esquema:
| Coluna | Tipo | Descrição |
|---|---|---|
operatorId |
Inteiro | A ID de inteiro do operador de streaming com estado. |
operatorName |
Cadeia de caracteres | Nome do operador de streaming com estado. |
stateStoreName |
Cadeia de caracteres | Nome do repositório de estado do operador. |
numPartitions |
Inteiro | Número de partições do repositório de estado. |
minBatchId |
Longo | A ID mínima do lote disponível para consultar o estado. |
maxBatchId |
Longo | A ID máxima do lote disponível para consultar o estado. |
Observação
Os valores de ID do lote fornecidos por minBatchId e maxBatchId refletem o estado no momento em que o ponto de verificação foi gravado. Lotes antigos são limpos automaticamente com execução de microlotes, então não há garantia que o valor fornecido aqui ainda esteja disponível.
Consulte TVF read_state_metadata.
Exemplo: consultar um lado de uma junção entre fluxos
Use a seguinte sintaxe para consultar o lado esquerdo de uma junção de fluxo de fluxo:
Python
left_df = (spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path"))
Scala
val leftDf = spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path")
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
joinSide => 'left'
);
Exemplo: consultar o repositório de estado para fluxo com vários operadores com estado
Esses exemplos usam o leitor de metadados de estado para coletar detalhes de metadados de uma consulta de streaming com vários operadores com estado e, em seguida, usa os resultados dos metadados como opções para o leitor de estado.
O leitor de metadados de estado usa o caminho do ponto de verificação como a única opção, como no exemplo de sintaxe a seguir:
Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
Scala
val df = spark.read
.format("state-metadata")
.load("<checkpointLocation>")
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
A tabela a seguir representa um exemplo de saída dos metadados do repositório de estado:
| operatorId | operatorName | stateStoreName | númeroDePartições | minBatchId | maxBatchId |
|---|---|---|---|---|---|
| 0 | stateStoreSave | padrão | 200 | 0 | 13 |
| 1 | dedupeWithinWatermark | padrão | 200 | 0 | 13 |
Para obter resultados para o dedupeWithinWatermark operador, consulte o leitor de estado com a opção operatorId , como no exemplo a seguir:
Python
left_df = (spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path"))
Scala
val leftDf = spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path")
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
operatorId => 1
);