Gestructureerde streamingstatusinformatie lezen

U kunt DataFrame-bewerkingen of SQL-tabelwaardefuncties gebruiken om query's uit te voeren op statusgegevens en metagegevens van gestructureerde streaming. Gebruik deze functies om statusinformatie te bekijken voor stateful query's met structured streaming. Dit kan handig zijn voor bewaking en foutopsporing.

U moet leestoegang hebben tot het controlepuntpad voor een streamingquery om statusgegevens of metagegevens op te vragen. De functies die in dit artikel worden beschreven, bieden alleen-lezentoegang tot statusgegevens en metagegevens. U kunt alleen batchleessemantiek gebruiken om staatinformatie op te vragen.

Notitie

U kunt geen query's uitvoeren op statusgegevens voor declaratieve Pijplijnen van Lakeflow Spark, streamingtabellen of gerealiseerde weergaven. U kunt geen query's uitvoeren op statusgegevens met behulp van serverloze berekeningen of berekeningen die zijn geconfigureerd met de standaardtoegangsmodus.

Behoeften

  • Gebruik een van de volgende rekenconfiguraties:
    • Databricks Runtime 16.3 en hoger op computers die zijn geconfigureerd met de standaardtoegangsmodus.
    • Databricks Runtime 14.3 LTS en hoger op rekenkracht die is geconfigureerd met toegewezen of geen isolatietoegangsmodus.
  • Leestoegang tot het controlepuntpad dat wordt gebruikt door de streamingquery.

Statusopslag voor gestructureerde streaming lezen

U kunt statusopslaginformatie lezen voor Structured Streaming-query's die worden uitgevoerd in elke ondersteunde Databricks Runtime. Gebruik de volgende syntaxis:

Python

df = (spark.read
  .format("statestore")
  .load("/checkpoint/path"))

Scala

val df = spark.read
  .format("statestore")
  .load("/checkpoint/path")

SQL

SELECT * FROM read_statestore('/checkpoint/path')

API-opties en schema voor statuslezer

Zie statestore voor een volledige lijst met indelingsopties.

De uitvoergegevens hebben het volgende schema:

Kolom Typologie Beschrijving
key Struct (verder type afgeleid van de statussleutel) De sleutel voor een toestand-gebaseerd operatorrecord in het statuscontrolepunt.
value Struct (verder type afgeleid van de statuswaarde) De waarde voor een stateful operatorrecord in het statuscontrolepunt.
partition_id Geheel getal De partitie van het toestandcontrolepunt dat het stateful operatorrecord bevat.

In Databricks Runtime 16.4 LTS en hoger, wanneer de readChangeFeed optie is ingesteld trueop, hebben de uitvoergegevens het volgende schema:

Kolom Typologie Beschrijving
batch_id Lang De batch-id waartoe de statuswijziging behoort.
change_type Snaar / Touwtje Het type wijziging dat door de batch wordt toegepast: update voor invoegingen en updates, delete voor verwijderingen.
key Struct (verder type afgeleid van de statussleutel) De sleutel voor een toestand-gebaseerd operatorrecord in het statuscontrolepunt.
value Struct (verder type afgeleid van de statuswaarde) De waarde voor een stateful operatorrecord in het statuscontrolepunt. null voor records waarbij change_typedelete is.
partition_id Geheel getal De partitie van het toestandcontrolepunt dat het stateful operatorrecord bevat.

Zie read_statestore tabelwaardefunctie.

Statuswijzigingen voor gestructureerd streamen lezen

Beschikbaar op Databricks Runtime 16.4 LTS en hoger. Als u wilt zien hoe de status verandert tussen microbatches in plaats van de volledige status van één microbatch te bekijken, stelt u readChangeFeed in op true en specificeert u changeStartBatchId. Geef desgewenst changeEndBatchIdop. Zie State Store voor een volledige lijst met opties.

Als u bijvoorbeeld statuswijzigingen uit batch 2 wilt lezen via de meest recente vastgelegde batch:

Python

df = (spark.read
  .format("statestore")
  .option("readChangeFeed", True)
  .option("changeStartBatchId", 2)
  .load("<checkpointLocation>")
)

Scala

val df = spark.read
  .format("statestore")
  .option("readChangeFeed", true)
  .option("changeStartBatchId", 2)
  .load("<checkpointLocation>")

SQL

SELECT * FROM read_statestore(
    '<checkpointLocation>',
    readChangeFeed => true,
    changeStartBatchId => 2
);

Het uitvoerschema bevat extra batch_id kolommen en change_type kolommen. Zie API-opties en schema voor de statuslezer voor het volledige schema.

Metagegevens van gestructureerde streamingstatus lezen

Beschikbaar op Databricks Runtime 14.3 LTS of hoger. U kunt statusmetagegevens voor structured streaming-query's lezen:

Python

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

Scala

val df = spark.read
  .format("state-metadata")
  .load("<checkpointLocation>")

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

De geretourneerde gegevens hebben het volgende schema:

Kolom Typologie Beschrijving
operatorId Geheel getal Het integer-ID van de stateful streaming-operator.
operatorName Snaar / Touwtje Naam van de stateful streaming-operator.
stateStoreName Snaar / Touwtje Naam van de statusopslag van de operator.
numPartitions Geheel getal Aantal partities van de toestandwinkel.
minBatchId Lang De minimale batch-id die beschikbaar is voor het opvragen van statusinformatie.
maxBatchId Lang De maximale batch-id die beschikbaar is voor het opvragen van de staat.

Notitie

De batch-id-waarden van minBatchId en maxBatchId geven de status weer op het moment dat het controlepunt is geschreven. Oude batches worden automatisch opgeschoond met microbatchuitvoering, dus de hier opgegeven waarde is niet gegarandeerd nog steeds beschikbaar.

Zie read_state_metadata tabelwaardefunctie.

Voorbeeld: Een query uitvoeren op één zijde van een stream-stream-join

Gebruik de volgende syntaxis om een query uit te voeren aan de linkerkant van een stream-stream-join:

Python

left_df = (spark.read
  .format("statestore")
  .option("joinSide", "left")
  .load("/checkpoint/path"))

Scala

val leftDf = spark.read
  .format("statestore")
  .option("joinSide", "left")
  .load("/checkpoint/path")

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    joinSide => 'left'
);

Voorbeeld: Querystatusarchief voor stream met meerdere stateful operators

In deze voorbeelden wordt de metagegevenslezer van de status gebruikt om metagegevens van een streamingquery met meerdere stateful operators te verzamelen. Vervolgens worden de metagegevensresultaten gebruikt als opties voor de statuslezer.

De lezer van de statusmetagegevens neemt het controlepuntpad als enige optie, zoals in het volgende syntaxisvoorbeeld:

Python

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

Scala

val df = spark.read
  .format("state-metadata")
  .load("<checkpointLocation>")

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

De volgende tabel geeft een voorbeelduitvoer van metagegevens van de statusopslag weer.

operatorId operatornaam stateStoreName numPartitions minBatchId maxBatchId
0 stateStoreOpslaan standaard 200 0 13
1 dedupeWithinWatermark standaard 200 0 13

Als u resultaten voor de dedupeWithinWatermark operator wilt ophalen, voert u een query uit op de statuslezer met de operatorId optie, zoals in het volgende voorbeeld:

Python

left_df = (spark.read
  .format("statestore")
  .option("operatorId", 1)
  .load("/checkpoint/path"))

Scala

val leftDf = spark.read
  .format("statestore")
  .option("operatorId", 1)
  .load("/checkpoint/path")

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    operatorId => 1
);