Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
U kunt DataFrame-bewerkingen of SQL-tabelwaardefuncties gebruiken om query's uit te voeren op statusgegevens en metagegevens van gestructureerde streaming. Gebruik deze functies om statusinformatie te bekijken voor stateful query's met structured streaming. Dit kan handig zijn voor bewaking en foutopsporing.
U moet leestoegang hebben tot het controlepuntpad voor een streamingquery om statusgegevens of metagegevens op te vragen. De functies die in dit artikel worden beschreven, bieden alleen-lezentoegang tot statusgegevens en metagegevens. U kunt alleen batchleessemantiek gebruiken om staatinformatie op te vragen.
Notitie
U kunt geen query's uitvoeren op statusgegevens voor declaratieve Pijplijnen van Lakeflow Spark, streamingtabellen of gerealiseerde weergaven. U kunt geen query's uitvoeren op statusgegevens met behulp van serverloze berekeningen of berekeningen die zijn geconfigureerd met de standaardtoegangsmodus.
Behoeften
- Gebruik een van de volgende rekenconfiguraties:
- Databricks Runtime 16.3 en hoger op computers die zijn geconfigureerd met de standaardtoegangsmodus.
- Databricks Runtime 14.3 LTS en hoger op rekenkracht die is geconfigureerd met toegewezen of geen isolatietoegangsmodus.
- Leestoegang tot het controlepuntpad dat wordt gebruikt door de streamingquery.
Statusopslag voor gestructureerde streaming lezen
U kunt statusopslaginformatie lezen voor Structured Streaming-query's die worden uitgevoerd in elke ondersteunde Databricks Runtime. Gebruik de volgende syntaxis:
Python
df = (spark.read
.format("statestore")
.load("/checkpoint/path"))
Scala
val df = spark.read
.format("statestore")
.load("/checkpoint/path")
SQL
SELECT * FROM read_statestore('/checkpoint/path')
API-opties en schema voor statuslezer
Zie statestore voor een volledige lijst met indelingsopties.
De uitvoergegevens hebben het volgende schema:
| Kolom | Typologie | Beschrijving |
|---|---|---|
key |
Struct (verder type afgeleid van de statussleutel) | De sleutel voor een toestand-gebaseerd operatorrecord in het statuscontrolepunt. |
value |
Struct (verder type afgeleid van de statuswaarde) | De waarde voor een stateful operatorrecord in het statuscontrolepunt. |
partition_id |
Geheel getal | De partitie van het toestandcontrolepunt dat het stateful operatorrecord bevat. |
In Databricks Runtime 16.4 LTS en hoger, wanneer de readChangeFeed optie is ingesteld trueop, hebben de uitvoergegevens het volgende schema:
| Kolom | Typologie | Beschrijving |
|---|---|---|
batch_id |
Lang | De batch-id waartoe de statuswijziging behoort. |
change_type |
Snaar / Touwtje | Het type wijziging dat door de batch wordt toegepast: update voor invoegingen en updates, delete voor verwijderingen. |
key |
Struct (verder type afgeleid van de statussleutel) | De sleutel voor een toestand-gebaseerd operatorrecord in het statuscontrolepunt. |
value |
Struct (verder type afgeleid van de statuswaarde) | De waarde voor een stateful operatorrecord in het statuscontrolepunt.
null voor records waarbij change_typedelete is. |
partition_id |
Geheel getal | De partitie van het toestandcontrolepunt dat het stateful operatorrecord bevat. |
Zie read_statestore tabelwaardefunctie.
Statuswijzigingen voor gestructureerd streamen lezen
Beschikbaar op Databricks Runtime 16.4 LTS en hoger. Als u wilt zien hoe de status verandert tussen microbatches in plaats van de volledige status van één microbatch te bekijken, stelt u readChangeFeed in op true en specificeert u changeStartBatchId. Geef desgewenst changeEndBatchIdop. Zie State Store voor een volledige lijst met opties.
Als u bijvoorbeeld statuswijzigingen uit batch 2 wilt lezen via de meest recente vastgelegde batch:
Python
df = (spark.read
.format("statestore")
.option("readChangeFeed", True)
.option("changeStartBatchId", 2)
.load("<checkpointLocation>")
)
Scala
val df = spark.read
.format("statestore")
.option("readChangeFeed", true)
.option("changeStartBatchId", 2)
.load("<checkpointLocation>")
SQL
SELECT * FROM read_statestore(
'<checkpointLocation>',
readChangeFeed => true,
changeStartBatchId => 2
);
Het uitvoerschema bevat extra batch_id kolommen en change_type kolommen. Zie API-opties en schema voor de statuslezer voor het volledige schema.
Metagegevens van gestructureerde streamingstatus lezen
Beschikbaar op Databricks Runtime 14.3 LTS of hoger. U kunt statusmetagegevens voor structured streaming-query's lezen:
Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
Scala
val df = spark.read
.format("state-metadata")
.load("<checkpointLocation>")
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
De geretourneerde gegevens hebben het volgende schema:
| Kolom | Typologie | Beschrijving |
|---|---|---|
operatorId |
Geheel getal | Het integer-ID van de stateful streaming-operator. |
operatorName |
Snaar / Touwtje | Naam van de stateful streaming-operator. |
stateStoreName |
Snaar / Touwtje | Naam van de statusopslag van de operator. |
numPartitions |
Geheel getal | Aantal partities van de toestandwinkel. |
minBatchId |
Lang | De minimale batch-id die beschikbaar is voor het opvragen van statusinformatie. |
maxBatchId |
Lang | De maximale batch-id die beschikbaar is voor het opvragen van de staat. |
Notitie
De batch-id-waarden van minBatchId en maxBatchId geven de status weer op het moment dat het controlepunt is geschreven. Oude batches worden automatisch opgeschoond met microbatchuitvoering, dus de hier opgegeven waarde is niet gegarandeerd nog steeds beschikbaar.
Zie read_state_metadata tabelwaardefunctie.
Voorbeeld: Een query uitvoeren op één zijde van een stream-stream-join
Gebruik de volgende syntaxis om een query uit te voeren aan de linkerkant van een stream-stream-join:
Python
left_df = (spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path"))
Scala
val leftDf = spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path")
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
joinSide => 'left'
);
Voorbeeld: Querystatusarchief voor stream met meerdere stateful operators
In deze voorbeelden wordt de metagegevenslezer van de status gebruikt om metagegevens van een streamingquery met meerdere stateful operators te verzamelen. Vervolgens worden de metagegevensresultaten gebruikt als opties voor de statuslezer.
De lezer van de statusmetagegevens neemt het controlepuntpad als enige optie, zoals in het volgende syntaxisvoorbeeld:
Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
Scala
val df = spark.read
.format("state-metadata")
.load("<checkpointLocation>")
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
De volgende tabel geeft een voorbeelduitvoer van metagegevens van de statusopslag weer.
| operatorId | operatornaam | stateStoreName | numPartitions | minBatchId | maxBatchId |
|---|---|---|---|---|---|
| 0 | stateStoreOpslaan | standaard | 200 | 0 | 13 |
| 1 | dedupeWithinWatermark | standaard | 200 | 0 | 13 |
Als u resultaten voor de dedupeWithinWatermark operator wilt ophalen, voert u een query uit op de statuslezer met de operatorId optie, zoals in het volgende voorbeeld:
Python
left_df = (spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path"))
Scala
val leftDf = spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path")
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
operatorId => 1
);