update_flow

Important

Die update_flow API befindet sich in der öffentlichen Vorschau.

Verwenden Sie den @dp.update_flow Dekorateur, um einen Aktualisierungsfluss zu erstellen. Aktualisieren Sie Flüsse, die mit dem Updateausgabemodus in Senken geschrieben werden, wobei nur die Zeilen ausgegeben werden, die in jedem Batch geändert wurden. Im Gegensatz zu Anfügeflüssen unterstützen sie zustandsbehaftete Aggregationen, ohne dass ein Wasserzeichen erforderlich ist.

Aktualisierungsflüsse können nur Zielsenken sein. Delta-Tabellen werden nicht unterstützt.

Syntax

from pyspark import pipelines as dp

dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})

@dp.update_flow(
  target = "<sink-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
  comment = "<comment>", # optional
  import_checkpoint = "<checkpoint-path>") # optional
def <function-name>():
  return (<streaming-query>)

Parameters

Parameter Typ Description
Funktion function Required. Eine Funktion, die einen Apache Spark Streaming DataFrame aus einer benutzerdefinierten Abfrage zurückgibt.
target str Required. Der Name der Spüle, in die dieser Fluss schreibt.
name str Der Flussname. Wenn nicht angegeben, wird standardmäßig der Funktionsname verwendet.
comment str Eine Beschreibung für den Ablauf.
spark_conf dict Ein Diktieren von Spark-Konfigurationen für die Ausführung dieser Abfrage. Diese Konfigurationen überschreiben die Einstellung für das Ziel, die Pipeline oder den Cluster.
import_checkpoint str Ein externer Prüfpunktpfad, der vor dem Starten des Flusses importiert werden soll. Wird nur einmal importiert, wenn das Prüfpunktverzeichnis des Flusses noch nicht vorhanden ist.

Beispiele

Aggregation zu einer Kafka-Spüle

Schreiben Sie zustandsbehaftete Aggregationsergebnisse in eine Kafka-Spüle:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

dp.create_sink("event_counts_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="event_counts_flow",
    target="event_counts_sink",
)
def event_counts():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .selectExpr("CAST(key AS STRING) AS event_type")
            .groupBy(col("event_type"))
            .count()
    )

Echtzeitmodus

Dient spark_conf zum Konfigurieren eines Aktualisierungsflusses für den Echtzeitmodus:

from pyspark import pipelines as dp

dp.create_sink("my_kafka_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="my_rtm_flow",
    target="my_kafka_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def my_real_time_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
    )

Einschränkungen

  • Delta-Tabellensenken werden nicht als Ziele für Updateflüsse unterstützt.