Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Important
Die update_flow API befindet sich in der öffentlichen Vorschau.
Verwenden Sie den @dp.update_flow Dekorateur, um einen Aktualisierungsfluss zu erstellen. Aktualisieren Sie Flüsse, die mit dem Updateausgabemodus in Senken geschrieben werden, wobei nur die Zeilen ausgegeben werden, die in jedem Batch geändert wurden. Im Gegensatz zu Anfügeflüssen unterstützen sie zustandsbehaftete Aggregationen, ohne dass ein Wasserzeichen erforderlich ist.
Aktualisierungsflüsse können nur Zielsenken sein. Delta-Tabellen werden nicht unterstützt.
Syntax
from pyspark import pipelines as dp
dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})
@dp.update_flow(
target = "<sink-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
comment = "<comment>", # optional
import_checkpoint = "<checkpoint-path>") # optional
def <function-name>():
return (<streaming-query>)
Parameters
| Parameter | Typ | Description |
|---|---|---|
| Funktion | function |
Required. Eine Funktion, die einen Apache Spark Streaming DataFrame aus einer benutzerdefinierten Abfrage zurückgibt. |
target |
str |
Required. Der Name der Spüle, in die dieser Fluss schreibt. |
name |
str |
Der Flussname. Wenn nicht angegeben, wird standardmäßig der Funktionsname verwendet. |
comment |
str |
Eine Beschreibung für den Ablauf. |
spark_conf |
dict |
Ein Diktieren von Spark-Konfigurationen für die Ausführung dieser Abfrage. Diese Konfigurationen überschreiben die Einstellung für das Ziel, die Pipeline oder den Cluster. |
import_checkpoint |
str |
Ein externer Prüfpunktpfad, der vor dem Starten des Flusses importiert werden soll. Wird nur einmal importiert, wenn das Prüfpunktverzeichnis des Flusses noch nicht vorhanden ist. |
Beispiele
Aggregation zu einer Kafka-Spüle
Schreiben Sie zustandsbehaftete Aggregationsergebnisse in eine Kafka-Spüle:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
dp.create_sink("event_counts_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="event_counts_flow",
target="event_counts_sink",
)
def event_counts():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING) AS event_type")
.groupBy(col("event_type"))
.count()
)
Echtzeitmodus
Dient spark_conf zum Konfigurieren eines Aktualisierungsflusses für den Echtzeitmodus:
from pyspark import pipelines as dp
dp.create_sink("my_kafka_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="my_rtm_flow",
target="my_kafka_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def my_real_time_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
)
Einschränkungen
- Delta-Tabellensenken werden nicht als Ziele für Updateflüsse unterstützt.