Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Du kan använda DataFrame-åtgärder eller SQL-tabellvärdesfunktioner för att köra frågor mot strukturerade strömningstillståndsdata och metadata. Använd dessa funktioner för att observera tillståndsinformation för tillståndskänsliga frågor med strukturerad direktuppspelning, vilket kan vara användbart för övervakning och felsökning.
Du måste ha läsbehörighet till kontrollpunktssökvägen för en strömmande fråga för att kunna fråga tillståndsdata eller metadata. Funktionerna som beskrivs i den här artikeln ger skrivskyddad åtkomst till tillståndsdata och metadata. Du kan bara använda semantik för batchläsning för att hämta tillståndsinformation.
Kommentar
Du kan inte fråga statusinformation för Lakeflow Spark deklarativa pipelines, strömmande tabeller eller materialiserade vyer. Du kan inte fråga statusinformation med hjälp av serverlös beräkning eller beräkning som har konfigurerats med standardåtkomstläge.
Krav
- Använd någon av följande beräkningskonfigurationer:
- Databricks Runtime 16.3 och senare på beräkningsenheter konfigurerade med standardåtkomstläge.
- Databricks Runtime 14.3 LTS och senare på beräkningsresurser konfigurerade med dedikerat eller utan isoleringsåtkomstläge.
- Läs åtkomst till kontrollpunktssökvägen som används av strömningsfrågan.
Läs strukturerad strömningstillståndsbutik
Du kan läsa information om state store för strukturerade strömningsfrågor som körs i vilken som helst av de Databricks Runtime-versioner som stöds. Använd följande syntax:
python
df = (spark.read
.format("statestore")
.load("/checkpoint/path"))
Scala
val df = spark.read
.format("statestore")
.load("/checkpoint/path")
SQL
SELECT * FROM read_statestore('/checkpoint/path')
Api-alternativ och schema för tillståndsläsare
För en fullständig lista över statestore-formatalternativ, se State store.
Utdata har följande schema:
| Kolumn | Typ | beskrivning |
|---|---|---|
key |
Struct (ytterligare typ härledd från statnyckeln) | Nyckeln till en tillståndsbevarande operatörspost i tillståndskontrollpunkten. |
value |
Struct (en ytterligare typ härledd från tillståndsvärdet) | Värdet för en tillståndskänslig operatorpost i en kontrollpunkt för systemtillståndet. |
partition_id |
Heltal | Partitionen för kontrollpunkten för tillstånd som innehåller operatörsposten med tillstånd. |
I Databricks Runtime 16.4 LTS och senare, när alternativet readChangeFeed är inställt på true, har utdata följande schema:
| Kolumn | Typ | beskrivning |
|---|---|---|
batch_id |
Lång | Batch-ID:t som tillståndsändringen tillhör. |
change_type |
Sträng | Den typ av ändring som används av batchen: update för infogningar och uppdateringar, delete för borttagningar. |
key |
Struct (ytterligare typ härledd från statnyckeln) | Nyckeln till en tillståndsbevarande operatörspost i tillståndskontrollpunkten. |
value |
Struct (en ytterligare typ härledd från tillståndsvärdet) | Värdet för en tillståndskänslig operatorpost i en kontrollpunkt för systemtillståndet.
null för poster där change_type är delete. |
partition_id |
Heltal | Partitionen för kontrollpunkten för tillstånd som innehåller operatörsposten med tillstånd. |
Se read_statestore tabellvärdesfunktion.
Läs tillståndsändringar för Structured Streaming
Tillgänglig på Databricks Runtime 16.4 LTS och senare. Om du vill läsa hur tillståndet ändras mellan mikrobatcher i stället för att visa hela tillståndet vid en enda mikrobatch anger du readChangeFeed till true och anger changeStartBatchId. Du kan också ange changeEndBatchId. För en fullständig lista över alternativ, se Tillståndslager.
Om du till exempel vill läsa tillståndsändringar från batch 2 via den senaste bekräftade batchen:
python
df = (spark.read
.format("statestore")
.option("readChangeFeed", True)
.option("changeStartBatchId", 2)
.load("<checkpointLocation>")
)
Scala
val df = spark.read
.format("statestore")
.option("readChangeFeed", true)
.option("changeStartBatchId", 2)
.load("<checkpointLocation>")
SQL
SELECT * FROM read_statestore(
'<checkpointLocation>',
readChangeFeed => true,
changeStartBatchId => 2
);
Utdataschemat innehåller ytterligare batch_id kolumner och change_type kolumner. Det fullständiga schemat finns i Alternativ och schema för tillståndsläsarens API.
Läsa metadata för strukturerat direktuppspelningstillstånd
Tillgänglig på Databricks Runtime 14.3 LTS eller senare. Du kan läsa statusmetadatainformation för frågor om strukturerad direktuppspelning:
python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
Scala
val df = spark.read
.format("state-metadata")
.load("<checkpointLocation>")
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
De returnerade data har följande schema:
| Kolumn | Typ | beskrivning |
|---|---|---|
operatorId |
Heltal | Heltals-ID för den tillståndsbevarande strömningsoperatorn. |
operatorName |
Sträng | Namnet på den tillståndskänsliga strömningsoperatorn. |
stateStoreName |
Sträng | Operatorns tillståndslagernamn. |
numPartitions |
Heltal | Antal partitioner i statuslagret. |
minBatchId |
Lång | Minsta batch-ID som är tillgängligt för frågestatus. |
maxBatchId |
Lång | Det maximala batch-ID som är tillgängligt för frågetillstånd. |
Kommentar
Batch-ID-värdena som tillhandahålls av minBatchId och maxBatchId återspeglar tillståndet när kontrollpunkten skrevs. Gamla batchar rensas automatiskt med micro-batch-körning, så värdet som anges här är inte garanterat fortfarande tillgängligt.
Se read_state_metadata tabellvärdesfunktion.
Exempel: Fråga ena sidan av en stream-stream-koppling
Använd följande syntax för att ställa en förfrågan om den vänstra sidan av en stream-stream-koppling.
python
left_df = (spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path"))
Scala
val leftDf = spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path")
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
joinSide => 'left'
);
Exempel: Fråga tillståndslager för dataström med flera tillståndskänsliga operatorer
I det här exemplet används tillståndsmetadataläsaren för att samla in metadatainformation om en strömmande fråga med flera tillståndskänsliga operatorer och använder sedan metadataresultatet som alternativ för tillståndsläsaren.
Tillståndsmetadataläsaren tar kontrollpunktssökvägen som det enda alternativet, som i följande syntaxexempel:
python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
Scala
val df = spark.read
.format("state-metadata")
.load("<checkpointLocation>")
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
Följande tabell representerar ett exempel på utdata från tillståndslagermetadata:
| operatorId | operatörsnamn | stateStoreName | antalPartitioner | minBatchId | maxBatchId |
|---|---|---|---|---|---|
| 0 | stateStoreSave | förval | 200 | 0 | tretton |
| 1 | dedupeWithinWatermark | förval | 200 | 0 | tretton |
För att få resultat för operatorn dedupeWithinWatermark frågar du tillståndsläsaren med alternativet operatorId , som i följande exempel:
python
left_df = (spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path"))
Scala
val leftDf = spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path")
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
operatorId => 1
);