Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Sie können transformWithState verwenden, um zustandsbehaftete Streaminganwendungen zu erstellen und Lösungen mit geringer Latenz sowie nahezu in Echtzeit arbeitende Lösungen zu implementieren. Mit benutzerdefinierten zustandsbehafteten Operatoren können Sie beliebige Zustandslogik erstellen, mit der Sie neue betriebsbereite Anwendungsfälle erstellen können, die mit der herkömmlichen Strukturierten Streaming-Verarbeitung nicht möglich sind.
Hinweis
Für zustandsbehaftete Vorgänge wie Aggregationen, Deduplizierung und Streamingverknüpfungen empfiehlt Databricks die Verwendung integrierter strukturierter Streamingoperatoren anstelle von benutzerdefinierter Logik. Siehe Was ist zustandsbehaftetes Streaming?.
Databricks empfiehlt die Verwendung von transformWithState anstelle von veralteten Operatoren wie flatMapGroupsWithState und mapGroupsWithState für Transformationen beliebiger Zustände. Weitere Informationen finden Sie unter Legacy-Operatoren mit beliebigen Zustandszuständen.
Anforderungen
Die Operatoren transformWithState und transformWithStateInPandas unterliegen den folgenden Anforderungen:
- Verfügbar in Databricks Runtime 16.2 und höher.
- Verwenden Sie für den Echtzeitmodus Databricks Runtime 17.3 LTS oder höher. Sehen Sie sich den Echtzeitmodus im strukturierten Streaming an.
- Für den Standardzugriffsmodus ist Python in Databricks Runtime 16.3 und höher verfügbar, und Scala ist in Databricks Runtime 17.3 und höher verfügbar.
- RocksDB ist der Standardstatusspeicheranbieter in Databricks Runtime 17.3 und höher.
Für Databricks Runtime 17.2 und unten müssen Sie den RocksDB-Zustandsspeicheranbieter konfigurieren. Databricks empfiehlt die Aktivierung von RocksDB in der Spark-Konfiguration.
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
Was ist transformWithState?
Der transformWithState Operator wendet einen benutzerdefinierten zustandsbehafteten Prozessor auf eine Strukturierte Streaming-Abfrage an. Sie müssen einen benutzerdefinierten zustandsbehafteten Prozessor implementieren, um transformWithState verwenden zu können. Strukturiertes Streaming umfasst APIs zum Erstellen Ihres zustandsvollen Prozessors mithilfe von Python, Scala oder Java.
Verwenden Sie transformWithState, um benutzerdefinierte Logik auf einen Gruppierungsschlüssel anzuwenden. Im Folgenden wird das allgemeine Design beschrieben:
- Definieren Sie eine oder mehrere Statusvariablen.
- Statusinformationen werden für jeden Gruppierungsschlüssel beibehalten. Sie können auf jede Zustandsvariable im benutzerdefinierten Code zugreifen.
- Für jeden verarbeiteten Mikrobatch sind alle Zeilen für den Schlüssel als Iterator verfügbar.
- Verwenden Sie
StatefulProcessorHandlemit Timern und benutzerdefinierten Bedingungen, um zu steuern, wie Zeilen ausgegeben werden. - Zur Verwaltung der Ablaufzeit von Zuständen und der Zustandsgröße unterstützen Zustandswerte individuelle TTL-Definitionen (Time-to-Live).
Da transformWithState die Schemaevolution im Zustandsspeicher unterstützt, können Sie Ihre Produktionsanwendungen iterativ weiterentwickeln und aktualisieren, ohne historische Zustandsinformationen zu verlieren. Nach dem Aktualisieren des Statusschemas müssen Sie keine Zeilen neu verarbeiten, was Codebereitstellungen und Wartung vereinfacht. Siehe Schemaentwicklung im Zustandsspeicher.
Von Bedeutung
Azure Databricks Dokumentation verwendettransformWithState, um sowohl Python- als auch Scala-Implementierungen zu beschreiben:
- PySpark unterstützt sowohl die zeilenbasierte
transformWithStateAPI als auch den Pandas-basiertentransformWithStateInPandasOperator.-
transformWithStateInPandaswird im Echtzeitmodus nicht unterstützt. Verwenden Sie stattdessentransformWithState. Weitere Informationen finden Sie untertransformWithStateim Echtzeitmodus.
-
- Scala unterstützt nur die zeilenbasierte
transformWithStateAPI.
Die Scala- und Python Implementierungen transformWithState verfügen über die gleichen Funktionen, aber mit einigen Unterschieden in der Syntax.
Definieren eines StatefulProcessor
Sie definieren einen zustandsbehafteten Prozessor, indem Sie die StatefulProcessor Klasse erweitern und ihre Methoden implementieren.
Spark übergibt eine StatefulProcessorHandle an die init-Methode Ihrer StatefulProcessor. Verwenden Sie das Handle, um Zustandsvariablen zu erstellen und mit dem Zustandsspeicher zu interagieren.
transformWithState unterstützt drei Statustypen: ValueState, ListState, und MapState. Jeder Typ speichert den Zustand für jeden Gruppierungsschlüssel mithilfe einer anderen zugrunde liegenden Datenstruktur.
Implementieren Sie die folgenden Methoden, um Ihre benutzerdefinierte Logik zu definieren:
- Implementieren Sie die Implementierung
handleInputRows, um zu steuern, wie Ihre Anwendung Daten verarbeitet, den Zustand aktualisiert und Zeilen für jeden Mikrobatch ausgibt. Siehe Behandeln von Eingabezeilen. - Implementieren Sie
handleExpiredTimer, um zeitbasierte Logik auszuführen, unabhängig davon, ob der Gruppierungsschlüssel in einem Mikrobatch neue Zeilen erhält. Siehe Behandeln abgelaufener Timer. - Implementieren Sie optional
handleInitialState, um den Zustand vorab zu füllen, bevor Ihre Anwendung Eingabezeilen verarbeitet. Siehe Handle initial state.
In der folgenden Tabelle werden die funktionalen Verhaltensweisen dieser Methoden verglichen:
| Verhalten | handleInputRows |
handleExpiredTimer |
|---|---|---|
| Abrufen, Platzieren, Aktualisieren oder Löschen von Zustandswerten | Ja | Ja |
| Erstellen oder Löschen eines Timers | Ja | Ja |
| Zeilen ausgeben | Ja | Ja |
| Durchlaufen von Zeilen im aktuellen Mikrobatch | Ja | Nein |
| Triggerlogik basierend auf verstrichener Zeit | Nein | Ja |
Sie können sowohl handleInputRows als auch handleExpiredTimer kombinieren, um bei Bedarf komplexe Logik zu implementieren.
Sie können beispielsweise eine Anwendung implementieren, die handleInputRows verwendet, um die Zustandswerte für jeden Mikro-Batch zu aktualisieren und einen Timer für die nächsten 10 Sekunden zu setzen. Wenn keine zusätzlichen Zeilen verarbeitet werden, können Sie handleExpiredTimer verwenden, um die aktuellen Werte im Statusspeicher auszugeben. Wenn neue Zeilen für den Gruppierungsschlüssel verarbeitet werden, können Sie den vorhandenen Timer löschen und einen neuen Timer festlegen.
StatefulProcessorHandle
In PySpark ermöglicht ihnen die StatefulProcessorHandle Klasse den Zugriff auf Funktionen, die steuern, wie Ihr Code Zustandsinformationen verwendet.
Beim Initialisieren eines StatefulProcessor müssen Sie das StatefulProcessorHandle immer importieren und an die Variable handle übergeben. Die variable handle verknüpft die lokale Variable in Ihrer Python Klasse mit der Zustandsvariable.
Hinweis
Scala verwendet die getHandle Methode.
Benutzerdefinierte Zustandstypen
Sie können mehrere Zustandsobjekte in einem einzelnen Zustandsoperator implementieren.
Wählen Sie einen Statustyp basierend auf Der vollständigen Anwendungslogik aus. Zum Beispiel könnten Sie Sitzungen mit ValueState nach user_id und session_id gruppiert nachverfolgen. Oder verwenden Sie zum Auswerten von Bedingungen über mehrere Sitzungen hinweg ein nach user_id gruppiertes MapState, wobei session_id als Schlüssel der Zuordnung verwendet wird.
Wenn Ihr Zustandsobjekt ein StructType verwendet, müssen Sie für jedes Feld in der Struktur des Schemas eindeutige Namen definieren. Diese Namen sind beim Lesen des Zustandsspeichers sichtbar. Weitere Informationen finden Sie unter Lesen von strukturierten Streamingstatusinformationen.
In den folgenden Abschnitten werden die von transformWithState unterstützten Zustandstypen beschrieben:
ValueState
ValueState speichert einen Wert für jeden Gruppierungsschlüssel.
Ein Wertstatus kann komplexe Typen enthalten, z. B. eine Struktur oder ein Tupel. Für ValueState, müssen Sie Logik implementieren, um den gesamten Wert zu ersetzen.
Die Time-to-Live eines Wertstatus wird zurückgesetzt, wenn der Wert aktualisiert wird. Wenn Sie einen Quellschlüssel ohne ValueState Aktualisierung des gespeicherten ValueStateSchlüssels verarbeiten, wird der Zeit-zu-Live-Vorgang nicht zurückgesetzt.
ListState
ListState speichert eine Liste für jeden Gruppierungsschlüssel.
Ein Listenstatus ist eine Sammlung von Werten, von denen jeder komplexe Typen enthalten kann. Jeder Wert in einer Liste hat seine eigene Gültigkeitsdauer.
Sie können einer Liste Elemente hinzufügen, indem Sie einzelne Elemente anfügen, eine Liste mit Elementen anfügen oder die gesamte Liste mit einer putüberschreiben. Zum Zurücksetzen von Time-to-Live müssen Sie einen put Vorgang verwenden.
MapState
MapState speichert eine Karte für jeden Gruppierungsschlüssel. Karten sind das Apache Spark-Äquivalent zu einem Python Wörterbuch (dict).
Ein Kartenzustand ist eine Sammlung unterschiedlicher Schlüssel, die jeweils einem Wert zugeordnet sind, von denen jeder komplexe Typen enthalten kann. Jedes Schlüssel-Wert-Paar in einer Karte verfügt über ein eigenes Zeit-zu-Leben-Verhältnis.
Sie können den Wert eines bestimmten Schlüssels aktualisieren oder einen Schlüssel und dessen Wert entfernen. Sie können einen einzelnen Wert mithilfe des Schlüssels zurückgeben, alle Schlüssel auflisten, alle Werte auflisten oder einen Iterator zurückgeben, um mit dem vollständigen Satz von Schlüsselwertpaaren in der Karte zu arbeiten.
Von Bedeutung
Gruppierungsschlüssel beschreiben die in der GROUP BY Klausel der Strukturierten Streaming-Abfrage angegebenen Felder. Map-Zustände können eine beliebige Anzahl von Schlüssel-Wert-Paaren für einen Gruppierungsschlüssel enthalten.
Wenn Ihre Abfrage beispielsweise GROUP BY user_id verwendet und Sie für jedes session_id eine Map definieren möchten, ist user_id Ihr Gruppierungsschlüssel und der Schlüssel MapState ist session_id:
Python
class SessionTracker(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
self.sessions = handle.getMapState("sessions", StringType(), LongType())
def handleInputRows(self, key, rows: Iterator[Row], timerValues) -> Iterator[Row]:
for row in rows:
session_id = row["session_id"] # session_id is the MapState key
count = self.sessions.getValue(session_id)[0] if self.sessions.containsKey(session_id) else 0
new_count = count + 1
self.sessions.updateValue(session_id, (new_count,))
yield from []
def close(self) -> None:
pass
df.groupBy("user_id").transformWithState(SessionTracker(), ...) # user_id is the grouping key
Scala
case class Event(userId: String, sessionId: String)
class SessionTracker extends StatefulProcessor[String, Event, Row] {
@transient private var sessions: MapState[String, Long] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
sessions = getHandle.getMapState[String, Long]("sessions", Encoders.STRING, Encoders.scalaLong, TTLConfig.NONE)
}
override def handleInputRows(
key: String,
rows: Iterator[Event],
timerValues: TimerValues): Iterator[Row] = {
rows.foreach { event =>
val count = if (sessions.containsKey(event.sessionId)) sessions.getValue(event.sessionId) else 0L
sessions.updateValue(event.sessionId, count + 1) // sessionId is the MapState key
}
Iterator.empty
}
}
df.as[Event]
.groupByKey(_.userId) // userId is the grouping key
.transformWithState(new SessionTracker(), TimeMode.None(), OutputMode.Update())
Erstellen einer benutzerdefinierten Zustandsvariablen in der StatefulProcessor
Beim Initialisieren des StatefulProcessorObjekts erstellen Sie eine lokale Variable für jedes Zustandsobjekt, mit dem Sie mit Zustandsobjekten in Ihrer benutzerdefinierten Logik interagieren können. Definieren und initialisieren Sie Zustandsvariablen, indem Sie die integrierte init Methode in der StatefulProcessor Klasse überschreiben.
Sie können eine beliebige Anzahl von Zustandsobjekten mithilfe der Methoden getValueState, getListState und getMapState in Ihrem StatefulProcessor definieren.
Jedes Zustandsobjekt muss folgendes aufweisen:
- Ein eindeutiger Name
- Ein Schema
- In Python müssen Sie das Schema angeben.
- In Scala können Sie
Encoderübergeben, um ein Zustandsschema anzugeben.
Optional können Sie auch eine Zeit-zu-Live-Dauer (TTL) in Millisekunden bereitstellen. Wenn Sie einen Kartenzustand implementieren, müssen Sie eine separate Schemadefinition für die Kartenschlüssel und die Werte bereitstellen.
Hinweis
Die Logik in StatefulProcessor ist getrennt für das Abfragen, Aktualisieren und Ausgeben von Zustandsinformationen zuständig. Weitere Informationen finden Sie unter Verwenden der Statusvariablen in Methoden mit benutzerdefinierter Logik.
Verwenden der Statusvariablen in Methoden mit benutzerdefinierter Logik
Statusobjekte verfügen über Methoden zum Abrufen des Zustands, Aktualisieren vorhandener Statusinformationen und Löschen des aktuellen Zustands.
Jeder Gruppierungsschlüssel verfügt über dedizierte Statusinformationen.
-
StatefulProcessorgibt Zeilen basierend auf Ihrer benutzerdefinierten Logik und dem angegebenen Ausgabeschema aus. Siehe "Zeilen ausgeben". - Verwenden Sie den
statestoreReader, um auf Werte im Zustandsspeicher zuzugreifen. Dieser Reader ist für Batchworkloads vorgesehen und ist nicht für Workloads mit geringer Latenz vorgesehen. Weitere Informationen finden Sie unter Lesen von strukturierten Streamingstatusinformationen. - Die mit der Verwendung
handleInputRowsangegebene Logik wird nur ausgeführt, wenn Zeilen für den Schlüssel in einem Mikrobatch vorhanden sind. Siehe Behandeln von Eingabezeilen. - Verwenden Sie
handleExpiredTimer, um zeitbasierte Logik zu implementieren, die nicht davon abhängt, durch das Beobachten von Zeilen ausgelöst zu werden. Siehe Behandeln abgelaufener Timer.
Hinweis
Statusobjekte werden durch Gruppieren von Schlüsseln mit den folgenden Auswirkungen isoliert:
- Zustandswerte können nicht von Zeilen betroffen sein, die einem anderen Gruppierungsschlüssel zugeordnet sind.
- Sie können keine Logik implementieren, die vom Vergleichen von Werten oder dem Aktualisieren des Zustands über Gruppierungsschlüssel abhängt.
Sie können Werte in einem Gruppierungsschlüssel vergleichen. Verwenden Sie MapState um Logik mit einem zweiten Schlüssel zu implementieren, den Ihre benutzerdefinierte Logik verwenden kann. Zum Beispiel können Sie durch die Gruppierung nach user_id und die Verwendung von ip_address für Ihren MapState-Schlüssel gleichzeitige Benutzersitzungen verfolgen.
Erweiterte Überlegungen zum Arbeiten mit Zustand
Zustandsupdates sind fehlertolerant. Wenn ein Vorgang abstürzt, bevor eine Mikrobatchverarbeitung abgeschlossen ist, verwendet der Wiederholungsvorgang den Wert aus dem letzten erfolgreichen Mikrobatch.
Für eine optimierte Leistung empfiehlt Databricks, alle Werte im Iterator für einen bestimmten Schlüssel zu verarbeiten und Aktualisierungen in einem einzigen Schreibvorgang zu übernehmen. Wenn Sie in eine Zustandsvariable schreiben, löst dies einen Schreibvorgang in RocksDB aus.
Statuswerte haben keine Standardwerte. Wenn Ihre Logik das Lesen vorhandener Statusinformationen erfordert, verwenden Sie die exists Methode.
Um Logik für den NULL-Zustand zu implementieren, MapState können Sie mithilfe von Variablen nach einzelnen Schlüsseln suchen oder alle Schlüssel auflisten.
Eingabezeilen bearbeiten
Verwenden Sie die handleInputRows Methode, um zu definieren, wie Ihre Anwendung Zeilen verarbeitet und Zustandswerte aktualisiert. Diese Methode wird jedes Mal ausgeführt, wenn Ihre Strukturierte Streaming-Abfrage Zeilen für einen Gruppierungsschlüssel verarbeitet.
Für die meisten zustandsbehafteten Anwendungen, die mit transformWithState implementiert werden, wird die Kernlogik mit handleInputRows definiert.
Für jedes verarbeitete Micro-Batch-Update sind alle Zeilen im Micro-Batch für einen angegebenen Gruppierungsschlüssel über einen Iterator verfügbar. Benutzerdefinierte Logik kann mit allen Zeilen aus dem aktuellen Mikrobatch und den Werten im StateStore interagieren.
Abgelaufene Timer behandeln
Verwenden Sie die handleExpiredTimer Methode, um benutzerdefinierte Logik basierend auf verstrichener Zeit zu implementieren.
Innerhalb eines Gruppierungsschlüssels werden Zeitgeber durch ihren Zeitstempel eindeutig identifiziert.
Wenn ein Timer abläuft, wird das Ergebnis durch die in Ihrer Anwendung implementierte Logik bestimmt. Zu den gängigen Mustern gehören:
- Das Ausgeben von in einer Zustandsvariable gespeicherten Informationen.
- Entfernen gespeicherter Zustandsinformationen.
- Erstellen eines neuen Zeitgebers.
Abgelaufene Timer werden ausgelöst, auch wenn keine Zeilen für den zugehörigen Schlüssel in einem Micro-Batch verarbeitet werden.
Angeben des Zeitmodus
Wenn Sie Ihr StatefulProcessor an transformWithState übergeben, müssen Sie den Zeitmodus mithilfe des Parameters timeMode angeben.
Die folgenden Optionen werden unterstützt:
| Zeitmodus | BESCHREIBUNG |
|---|---|
ProcessingTime |
Timer und TTL werden sowohl unterstützt als auch basierend auf der Wanduhrzeit ausgewertet, wenn Apache Spark jeden Mikrobatch verarbeitet. Verwenden Sie ProcessingTime, wenn Timer in festen Intervallen relativ zur Verarbeitung von Zeilen ausgelöst werden sollen, unabhängig von den Zeitstempeln in den Daten. |
EventTime |
Timer werden unterstützt und basierend auf der Event-Time-Watermark ausgewertet. Die Watermark schreitet fort, während Apache Spark Zeitstempel in den Eingabedaten beobachtet. TTL wird mit EventTime nicht unterstützt. Verwenden Sie EventTime, wenn Ihre Daten Zeitstempel enthalten und Timer entsprechend dem Fortschritt dieser Zeitstempel ausgelöst werden sollen. Bei Verwendung EventTimemüssen Sie auch den eventTimeColumnName Parameter angeben. Siehe eventTimeColumnName. |
NoTime oder TimeMode.None() |
Timer und TTL werden nicht unterstützt. Verwenden Sie NoTime, wenn Ihre zustandsbehaftete Anwendung keine zeitabhängige Logik erfordert. |
eventTimeColumnName
Bei Verwendung des EventTime Zeitmodus gibt der eventTimeColumnName Parameter den Namen der Spalte in Ihrem Ausgabeschema an, die den Ereigniszeitstempel enthält. Apache Spark verwendet diese Spalte, um das Wasserzeichen an den Ausgabedatenstrom weiterzuverbreiten, wodurch korrekte nachgeschaltete zeitbasierte Vorgänge ermöglicht werden.
Python
eventTimeColumnName ist ein zusätzliches Argument für transformWithState oder transformWithStateInPandas:
q = (
df.groupBy("key")
.transformWithState(
statefulProcessor=MyProcessor(),
outputStructType=output_schema,
outputMode="Append",
timeMode="EventTime",
eventTimeColumnName="outputTimestamp",
)
.writeStream...
)
Scala
transformWithState akzeptiert eventTimeColumnName anstelle von timeMode. Bei diesem Ansatz wird immer der Modus EventTime verwendet:
val q = spark
.readStream
.format("delta")
.load(srcDeltaTableDir)
.as[(String, String)]
.groupByKey(x => x._1)
.transformWithState(
new MyProcessor(),
"outputTimestamp",
OutputMode.Append(),
)
.writeStream...
Integrierte Zeitgeberwerte
Databricks empfiehlt, die Systemuhr in Ihrer benutzerdefinierten zustandsbehafteten Anwendung nicht zu verwenden, da dies zu unzuverlässigen Wiederholungen bei Vorgangsfehlern führen kann. Verwenden Sie die Methoden in der TimerValues Klasse, wenn Sie auf die Verarbeitungszeit oder das Wasserzeichen zugreifen müssen:
TimerValues |
BESCHREIBUNG |
|---|---|
getCurrentProcessingTimeInMs |
Gibt den Zeitstempel der Verarbeitungszeit für den aktuellen Batch in Millisekunden seit der Epoche zurück. |
getCurrentWatermarkInMs |
Gibt den Zeitstempel des Wasserzeichens für die aktuelle Charge in Millisekunden seit der Epoche zurück. |
Hinweis
Die Verarbeitungszeit beschreibt die Zeit, zu der der Mikrobatch von Apache Spark verarbeitet wird. Viele Streamingquellen, wie z. B. Kafka, umfassen auch die Systemverarbeitungszeit.
Wasserzeichen für Streamingabfragen werden häufig anhand der Ereigniszeit oder der Verarbeitungszeit der Streamingquelle definiert. Siehe Anwenden von Wasserzeichen zum Steuern von Schwellenwerten für die Datenverarbeitung.
Sowohl Wasserzeichen als auch Fenster können in Kombination mit transformWithState verwendet werden. Sie können ähnliche Funktionsweise in Ihrer benutzerdefinierten zustandsbehafteten Anwendung implementieren, indem Sie TTL, Timer und MapState oder ListState Funktionalität verwenden.
Time-to-Live (TTL) für Zustandstypen
Um Nicht genügend Arbeitsspeicher-Fehler zu vermeiden und veraltete Zustandstypwerte zu entfernen, unterstützt transformWithState für jeden Zustandstypwert einen optionalen Time-to-Live-Wert (TTL). Nach Ablauf entfernt TTL stillschweigend die Werte des Zustandstyps. TTL führt weder handleExpiredTimer noch benutzerdefinierte Logik aus. Um Code auszuführen, wenn der Zustand abläuft, verwenden Sie stattdessen einen Timer.
Von Bedeutung
Wenn Sie TTL nicht implementieren, müssen Sie die Zustandsräumung behandeln, um Nichtspeicherfehler zu vermeiden.
Für alle Zustandstypen wird die TTL beim Aktualisieren von Zustandsinformationen zurückgesetzt. TTL wird für jeden Zustandstypwert mit unterschiedlichen Regeln für jeden Zustandstyp erzwungen:
- Zustandsvariablen sind an bestimmte Gruppenschlüssel gebunden.
- Bei
ValueStateObjekten wird pro Gruppierungsschlüssel nur ein einzelner Wert gespeichert. TTL gilt für diesen Wert. - Bei
ListStateObjekten kann die Liste viele Werte enthalten. TTL gilt für jeden Wert in einer Liste unabhängig.- Während TTL auf einzelne Werte in einem
ListStateBereich festgelegt ist, besteht die einzige Möglichkeit zum Aktualisieren eines einzelnen Werts mit derputMethode, die den gesamten Inhalt derListStateVariablen überschreibt und TTL für alle Werte in der Liste zurücksetzt.
- Während TTL auf einzelne Werte in einem
- Für
MapStateObjekte weist jeder Kartenschlüssel einen zugeordneten Statuswert auf. TTL gilt unabhängig für jedes Schlüsselwertpaar in einer Karte.
Hinweis
Timer ermöglichen es Ihnen, benutzerdefinierte Logik zu definieren, die über das Entfernen von Zuständen hinausgeht, einschließlich der Ausgabe von Zeilen. Optional können Sie Timer verwenden, um sowohl Zustandsinformationen zu einem bestimmten Zustandswert zu löschen als auch Werte auszugeben oder bedingte Logik auszulösen. Siehe Behandeln abgelaufener Timer.
Beispiel einer zustandsbehafteten Anwendung
Im folgenden Beispiel wird ein benutzerdefinierter Zustandsprozessor definiert, SimpleCounterProcessoreinschließlich Beispielzustandsvariablen.
SimpleCounterProcessor verwendet ValueState, ListStateund MapState zum Zählen von Zeilen für jeden Gruppierungsschlüssel.
Python (Pandas)
import pandas as pd
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from typing import Iterator
spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
output_schema = StructType(
[
StructField("id", StringType(), True),
StructField("countAsString", StringType(), True),
]
)
class SimpleCounterProcessor(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
value_state_schema = StructType([StructField("count", IntegerType(), True)])
list_state_schema = StructType([StructField("count", IntegerType(), True)])
self.value_state = handle.getValueState(stateName="valueState", schema=value_state_schema)
self.list_state = handle.getListState(stateName="listState", schema=list_state_schema)
# Schema can also be defined using strings and SQL DDL syntax
self.map_state = handle.getMapState(stateName="mapState", userKeySchema="name string", valueSchema="count int")
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
count = 0
for pdf in rows:
list_state_rows = [(120,), (20,)] # A list of tuples
self.list_state.put(list_state_rows)
self.list_state.appendValue((111,))
self.list_state.appendList(list_state_rows)
pdf_count = pdf.count()
count += pdf_count.get("value")
self.value_state.update((count,)) # Count is passed as a tuple
iter = self.list_state.get()
list_state_value = next(iter)[0]
value = count
user_key = ("user_key",)
if self.map_state.exists():
if self.map_state.containsKey(user_key):
value += self.map_state.getValue(user_key)[0]
self.map_state.updateValue(user_key, (value,)) # Value is a tuple
yield pd.DataFrame({"id": key, "countAsString": str(count)})
q = (df.groupBy("key")
.transformWithStateInPandas(
statefulProcessor=SimpleCounterProcessor(),
outputStructType=output_schema,
outputMode="Update",
timeMode="None",
)
.writeStream...
)
Python (zeilenbasiert)
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from typing import Iterator
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
output_schema = StructType(
[
StructField("id", StringType(), True),
StructField("countAsString", StringType(), True),
]
)
class SimpleCounterProcessor(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
value_state_schema = StructType([StructField("count", IntegerType(), True)])
list_state_schema = StructType([StructField("count", IntegerType(), True)])
self.value_state = handle.getValueState(stateName="valueState", schema=value_state_schema)
self.list_state = handle.getListState(stateName="listState", schema=list_state_schema)
self.map_state = handle.getMapState(stateName="mapState", userKeySchema="name string", valueSchema="count int")
def handleInputRows(self, key, rows: Iterator[Row], timerValues) -> Iterator[Row]:
count = 0
for row in rows:
list_state_rows = [(120,), (20,)] # A list of tuples
self.list_state.put(list_state_rows)
self.list_state.appendValue((111,))
self.list_state.appendList(list_state_rows)
count += 1
self.value_state.update((count,)) # Count is passed as a tuple
iter_list = self.list_state.get()
list_state_value = next(iter_list)[0]
value = count
user_key = ("user_key",)
if self.map_state.exists():
if self.map_state.containsKey(user_key):
value += self.map_state.getValue(user_key)[0]
self.map_state.updateValue(user_key, (value,)) # Value is a tuple
yield Row(id=key, countAsString=str(count))
q = (
df.groupBy("key")
.transformWithState(
statefulProcessor=SimpleCounterProcessor(),
outputStructType=output_schema,
outputMode="Update",
timeMode="None",
)
.writeStream...
)
Scala
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.{Dataset, Encoder, Encoders , DataFrame}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
class SimpleCounterProcessor extends StatefulProcessor[String, (String, String), (String, String)] {
@transient private var countState: ValueState[Int] = _
@transient private var listState: ListState[Int] = _
@transient private var mapState: MapState[String, Int] = _
private val longEncoder = Encoders.scalaLong
private val intEncoder = Encoders.scalaInt
private val stringEncoder = Encoders.STRING
override def init(
outputMode: OutputMode,
timeMode: TimeMode): Unit = {
countState = getHandle.getValueState[Int]("countState",
intEncoder, TTLConfig.NONE)
listState = getHandle.getListState[Int]("listState",
intEncoder, TTLConfig.NONE)
mapState = getHandle.getMapState[String, Int]("mapState",
stringEncoder, intEncoder, TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[(String, String)],
timerValues: TimerValues): Iterator[(String, String)] = {
var count = countState.getOption().getOrElse(0)
for (row <- inputRows) {
val listData = Array(120, 20)
listState.put(listData)
listState.appendValue(count)
listState.appendList(listData)
count += 1
}
val iter = listState.get()
var listStateValue = 0
if (iter.hasNext) {
listStateValue = iter.next()
}
countState.update(count)
var value = count
val userKey = "userKey"
if (mapState.exists()) {
if (mapState.containsKey(userKey)) {
value += mapState.getValue(userKey)
}
}
mapState.updateValue(userKey, value)
Iterator((key, count.toString))
}
}
val q = spark
.readStream
.format("delta")
.load("$srcDeltaTableDir")
.as[(String, String)]
.groupByKey(x => x._1)
.transformWithState(
new SimpleCounterProcessor(),
TimeMode.None(),
OutputMode.Update(),
)
.writeStream...
Weitere Beispiele finden Sie unter Beispiel für zustandsbehaftete Anwendungen.
Hinweis
In Python sind Zustandswerte Tupel. Übergeben Sie Tupel an put und update, und erwarten Sie Tupel von get.
Wenn beispielsweise das Schema für Ihr ValueState eine einzelne Ganzzahl ist:
current_value_tuple = value_state.get() # Returns the value state as a tuple
current_value = current_value_tuple[0] # Extracts the first item in the tuple
new_value = current_value + 1 # Calculate a new value
value_state.update((new_value,)) # Pass the new value formatted as a tuple
Verwenden Sie diesen Ansatz auch für Elemente in einem ListState oder werten in einem MapState .
Zeilen ausgeben
Sie müssen handleInputRows oder handleExpiredTimer verwenden, um zu definieren, wie transformWithState für jeden Gruppierungsschlüssel Zeilen ausgibt. Siehe Behandeln von Eingabezeilen und Behandeln abgelaufener Timer.
Benutzerdefinierte zustandsbehaftete Anwendungen machen keine Annahmen darüber, wie Zustandsinformationen verwendet werden. Bei einer bestimmten Bedingung gibt die Anwendung möglicherweise keine Zeilen, eine Zeile oder viele Zeilen aus.
Hinweis
Sie können mehrere Zustandswerte implementieren und mehrere Bedingungen für das Ausführen von Zeilen definieren, aber alle Zeilen müssen dasselbe Schema verwenden.
Python (Pandas)
Mit transformWithStateInPandas definieren Sie Ihr Ausgabeschema mit dem Schlüsselwort outputStructType.
Emittieren von Zeilen mit einem Pandas DataFrame-Objekt und yield.
Optional können Sie yield einen leeren DataFrame erstellen. Wenn Sie den Ausgabemodus verwenden update und einen leeren DataFrame ausgeben, aktualisiert dies die Werte für den Gruppierungsschlüssel null.
Python (zeilenbasiert)
Definieren Sie mit transformWithState dem Schlüsselwort outputStructType Ihr Ausgabeschema.
Emittieren von Zeilen mit einem Row Objekt und yield.
Optional können Sie einen leeren Iterator zurückgeben. Wenn Sie den Ausgabemodus verwenden update und einen leeren Iterator ausgeben, aktualisiert dies die Werte für den Gruppierungsschlüssel null.
Scala
In Scala geben Sie Zeilen mithilfe eines Iterator Objekts aus. Das Schema leitet sich automatisch vom Schema der ausgegebenen Zeilen ab.
Optional können Sie eine leere IteratorZurückgeben. Wenn Sie den Ausgabemodus update verwenden und ein leeres Iterator ausgeben, werden die Werte für den Gruppierungsschlüssel auf null aktualisiert.
Behandeln des Anfangszustands
Optional können Sie einen Anfangszustand an den ersten Mikrobatch übergeben.
Sie können dies z. B. für Folgendes verwenden:
- Migrieren sie einen vorhandenen Workflow zu einer neuen benutzerdefinierten Anwendung.
- Aktualisieren Sie einen zustandsbehafteten Operator, um Ihr Schema oder Ihre Logik zu ändern.
- Reparieren Eines Fehlers, der nicht automatisch repariert werden kann und einen manuellen Eingriff erfordert.
Hinweis
Verwenden Sie den Statusspeicherleser, um Statusinformationen von einem vorhandenen Prüfpunkt abzufragen. Weitere Informationen finden Sie unter Lesen von strukturierten Streamingstatusinformationen.
Wenn Sie eine vorhandene Delta-Tabelle in eine zustandsbehaftete Anwendung konvertieren, lesen Sie die Tabelle, und spark.read.table("table_name") übergeben Sie den resultierenden DataFrame. Sie können optional Felder auswählen oder ändern, um ihrer neuen zustandsbehafteten Anwendung zu entsprechen.
Sie stellen einen Anfangszustand mithilfe eines DataFrame mit demselben Gruppierungsschlüsselschema wie die Eingabezeilen bereit.
Hinweis
Python verwendet handleInitialState, um den Anfangszustand beim Definieren eines StatefulProcessor anzugeben. Scala verwendet die unterschiedliche Klasse StatefulProcessorWithInitialState.
Im folgenden Beispiel wird ein Zähler pro Schlüssel aus einer vorhandenen Delta-Tabelle initialisiert:
Python (zeilenbasiert)
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from typing import Iterator
class CounterWithInitialState(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
state_schema = StructType([StructField("count", IntegerType(), True)])
self.count_state = handle.getValueState("countState", state_schema)
def handleInitialState(self, key, initialState: Row, timerValues) -> None:
self.count_state.update((initialState["count"],))
def handleInputRows(self, key, rows: Iterator[Row], timerValues) -> Iterator[Row]:
count = self.count_state.get()[0] if self.count_state.exists() else 0
for _ in rows:
count += 1
self.count_state.update((count,))
yield Row(id=key[0], count=count)
def close(self) -> None:
pass
output_schema = StructType([
StructField("id", StringType(), True),
StructField("count", IntegerType(), True),
])
# Load existing counts as initial state — must use the same grouping key as the input
initial_state = spark.read.table("existing_counts").groupBy("id")
q = (
df.groupBy("id")
.transformWithState(
statefulProcessor=CounterWithInitialState(),
outputStructType=output_schema,
outputMode="Update",
timeMode="None",
initialState=initial_state,
)
.writeStream...
)
Scala
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.Encoders
class CounterWithInitialState
extends StatefulProcessorWithInitialState[String, (String, String), (String, String), (String, Int)] {
@transient private var countState: ValueState[Int] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
countState = getHandle.getValueState[Int]("countState", Encoders.scalaInt, TTLConfig.NONE)
}
override def handleInitialState(
key: String, initialState: (String, Int), timerValues: TimerValues): Unit = {
countState.update(initialState._2)
}
override def handleInputRows(
key: String,
rows: Iterator[(String, String)],
timerValues: TimerValues): Iterator[(String, String)] = {
val count = if (countState.exists()) countState.get() else 0
val newCount = count + rows.size
countState.update(newCount)
Iterator((key, newCount.toString))
}
}
// Load existing counts as initial state — must use the same grouping key as the input
val initialState = spark.read.table("existing_counts")
.as[(String, Int)]
.groupByKey(_._1)
val q = spark
.readStream
.format("delta")
.load(srcDeltaTableDir)
.as[(String, String)]
.groupByKey(_._1)
.transformWithState(
new CounterWithInitialState(),
TimeMode.None(),
OutputMode.Update(),
initialState,
)
.writeStream...
Verwendung transformWithState in Lakeflow Spark Declarative Pipelines
Verwenden Sie den Operator transformWithState in Lakeflow Spark Declarative Pipelines, um beliebige Zustandslogik in Ihren Streamingpipelines mithilfe von Python zu implementieren.
Führen Sie dafür folgende Schritte aus:
- Definieren Sie das Ausgabeschema und die zustandsbehaftete Prozessorlogik für ihre beliebigen zustandsbehafteten Transformationen. Beispiele finden Sie unter Beispiel für zustandsbehaftete Anwendungen.
- Erstellen Sie einen Lakeflow Spark Declarative Pipelines-Fluss, der den
transformWithStateOperator für einen DataFrame aufruft. Siehe Lernprogramm: Erstellen Ihrer ersten Pipeline mithilfe des Lakeflow-Pipelines-Editors. - Führen Sie die Pipeline aus und überprüfen Sie die Ergebnisse in der Zieltabelle oder im Speicherort.
Ein Beispiel, das transformWithState zur Überwachung von Sensortakten verwendet, finden Sie unter Beispiel: Verwenden von transformWithState zur Überwachung von Sensortakten.