Läsa information om status för strukturerad direktuppspelning

Du kan använda DataFrame-åtgärder eller SQL-tabellvärdesfunktioner för att köra frågor mot strukturerade strömningstillståndsdata och metadata. Använd dessa funktioner för att observera tillståndsinformation för tillståndskänsliga frågor med strukturerad direktuppspelning, vilket kan vara användbart för övervakning och felsökning.

Du måste ha läsbehörighet till kontrollpunktssökvägen för en strömmande fråga för att kunna fråga tillståndsdata eller metadata. Funktionerna som beskrivs i den här artikeln ger skrivskyddad åtkomst till tillståndsdata och metadata. Du kan bara använda semantik för batchläsning för att hämta tillståndsinformation.

Kommentar

Du kan inte fråga statusinformation för Lakeflow Spark deklarativa pipelines, strömmande tabeller eller materialiserade vyer. Du kan inte fråga statusinformation med hjälp av serverlös beräkning eller beräkning som har konfigurerats med standardåtkomstläge.

Krav

  • Använd någon av följande beräkningskonfigurationer:
    • Databricks Runtime 16.3 och senare på beräkningsenheter konfigurerade med standardåtkomstläge.
    • Databricks Runtime 14.3 LTS och senare på beräkningsresurser konfigurerade med dedikerat eller utan isoleringsåtkomstläge.
  • Läs åtkomst till kontrollpunktssökvägen som används av strömningsfrågan.

Läs strukturerad strömningstillståndsbutik

Du kan läsa information om state store för strukturerade strömningsfrågor som körs i vilken som helst av de Databricks Runtime-versioner som stöds. Använd följande syntax:

python

df = (spark.read
  .format("statestore")
  .load("/checkpoint/path"))

Scala

val df = spark.read
  .format("statestore")
  .load("/checkpoint/path")

SQL

SELECT * FROM read_statestore('/checkpoint/path')

Api-alternativ och schema för tillståndsläsare

För en fullständig lista över statestore-formatalternativ, se State store.

Utdata har följande schema:

Kolumn Typ beskrivning
key Struct (ytterligare typ härledd från statnyckeln) Nyckeln till en tillståndsbevarande operatörspost i tillståndskontrollpunkten.
value Struct (en ytterligare typ härledd från tillståndsvärdet) Värdet för en tillståndskänslig operatorpost i en kontrollpunkt för systemtillståndet.
partition_id Heltal Partitionen för kontrollpunkten för tillstånd som innehåller operatörsposten med tillstånd.

I Databricks Runtime 16.4 LTS och senare, när alternativet readChangeFeed är inställt på true, har utdata följande schema:

Kolumn Typ beskrivning
batch_id Lång Batch-ID:t som tillståndsändringen tillhör.
change_type Sträng Den typ av ändring som används av batchen: update för infogningar och uppdateringar, delete för borttagningar.
key Struct (ytterligare typ härledd från statnyckeln) Nyckeln till en tillståndsbevarande operatörspost i tillståndskontrollpunkten.
value Struct (en ytterligare typ härledd från tillståndsvärdet) Värdet för en tillståndskänslig operatorpost i en kontrollpunkt för systemtillståndet. null för poster där change_type är delete.
partition_id Heltal Partitionen för kontrollpunkten för tillstånd som innehåller operatörsposten med tillstånd.

Se read_statestore tabellvärdesfunktion.

Läs tillståndsändringar för Structured Streaming

Tillgänglig på Databricks Runtime 16.4 LTS och senare. Om du vill läsa hur tillståndet ändras mellan mikrobatcher i stället för att visa hela tillståndet vid en enda mikrobatch anger du readChangeFeed till true och anger changeStartBatchId. Du kan också ange changeEndBatchId. För en fullständig lista över alternativ, se Tillståndslager.

Om du till exempel vill läsa tillståndsändringar från batch 2 via den senaste bekräftade batchen:

python

df = (spark.read
  .format("statestore")
  .option("readChangeFeed", True)
  .option("changeStartBatchId", 2)
  .load("<checkpointLocation>")
)

Scala

val df = spark.read
  .format("statestore")
  .option("readChangeFeed", true)
  .option("changeStartBatchId", 2)
  .load("<checkpointLocation>")

SQL

SELECT * FROM read_statestore(
    '<checkpointLocation>',
    readChangeFeed => true,
    changeStartBatchId => 2
);

Utdataschemat innehåller ytterligare batch_id kolumner och change_type kolumner. Det fullständiga schemat finns i Alternativ och schema för tillståndsläsarens API.

Läsa metadata för strukturerat direktuppspelningstillstånd

Tillgänglig på Databricks Runtime 14.3 LTS eller senare. Du kan läsa statusmetadatainformation för frågor om strukturerad direktuppspelning:

python

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

Scala

val df = spark.read
  .format("state-metadata")
  .load("<checkpointLocation>")

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

De returnerade data har följande schema:

Kolumn Typ beskrivning
operatorId Heltal Heltals-ID för den tillståndsbevarande strömningsoperatorn.
operatorName Sträng Namnet på den tillståndskänsliga strömningsoperatorn.
stateStoreName Sträng Operatorns tillståndslagernamn.
numPartitions Heltal Antal partitioner i statuslagret.
minBatchId Lång Minsta batch-ID som är tillgängligt för frågestatus.
maxBatchId Lång Det maximala batch-ID som är tillgängligt för frågetillstånd.

Kommentar

Batch-ID-värdena som tillhandahålls av minBatchId och maxBatchId återspeglar tillståndet när kontrollpunkten skrevs. Gamla batchar rensas automatiskt med micro-batch-körning, så värdet som anges här är inte garanterat fortfarande tillgängligt.

Se read_state_metadata tabellvärdesfunktion.

Exempel: Fråga ena sidan av en stream-stream-koppling

Använd följande syntax för att ställa en förfrågan om den vänstra sidan av en stream-stream-koppling.

python

left_df = (spark.read
  .format("statestore")
  .option("joinSide", "left")
  .load("/checkpoint/path"))

Scala

val leftDf = spark.read
  .format("statestore")
  .option("joinSide", "left")
  .load("/checkpoint/path")

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    joinSide => 'left'
);

Exempel: Fråga tillståndslager för dataström med flera tillståndskänsliga operatorer

I det här exemplet används tillståndsmetadataläsaren för att samla in metadatainformation om en strömmande fråga med flera tillståndskänsliga operatorer och använder sedan metadataresultatet som alternativ för tillståndsläsaren.

Tillståndsmetadataläsaren tar kontrollpunktssökvägen som det enda alternativet, som i följande syntaxexempel:

python

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

Scala

val df = spark.read
  .format("state-metadata")
  .load("<checkpointLocation>")

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

Följande tabell representerar ett exempel på utdata från tillståndslagermetadata:

operatorId operatörsnamn stateStoreName antalPartitioner minBatchId maxBatchId
0 stateStoreSave förval 200 0 tretton
1 dedupeWithinWatermark förval 200 0 tretton

För att få resultat för operatorn dedupeWithinWatermark frågar du tillståndsläsaren med alternativet operatorId , som i följande exempel:

python

left_df = (spark.read
  .format("statestore")
  .option("operatorId", 1)
  .load("/checkpoint/path"))

Scala

val leftDf = spark.read
  .format("statestore")
  .option("operatorId", 1)
  .load("/checkpoint/path")

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    operatorId => 1
);