update_flow

Important

De update_flow API bevindt zich in openbare preview.

Gebruik de @dp.update_flow decorator om een updatestroom te maken. Werk stromen schrijven naar sinks bij met behulp van de update-uitvoermodus, waarbij alleen de rijen worden verzonden die in elke batch zijn gewijzigd. In tegenstelling tot toevoegstromen bieden ze ondersteuning voor stateful aggregaties zonder dat hiervoor een watermerk is vereist.

Updatestromen kunnen alleen doelsinks gebruiken. Delta-tabellen worden niet ondersteund.

Syntax

from pyspark import pipelines as dp

dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})

@dp.update_flow(
  target = "<sink-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
  comment = "<comment>", # optional
  import_checkpoint = "<checkpoint-path>") # optional
def <function-name>():
  return (<streaming-query>)

Parameters

Parameter Typ Description
function function Verplicht. Een functie die een Streaming DataFrame van Apache Spark retourneert vanuit een door de gebruiker gedefinieerde query.
target str Verplicht. De naam van de sink naar deze stroom schrijft.
name str De naam van de stroom. Als deze niet is opgegeven, wordt standaard de functienaam gebruikt.
comment str Een beschrijving voor het proces.
spark_conf dict Een dicteerfunctie voor Spark-configuraties voor de uitvoering van deze query. Deze configuraties overschrijven confs set voor de bestemming, pijplijn of cluster.
import_checkpoint str Een extern controlepuntpad dat moet worden geïmporteerd voordat de stroom wordt gestart. Slechts eenmaal geïmporteerd, wanneer de controlepuntmap van de stroom nog niet bestaat.

Examples

Aggregatie naar een Kafka-sink

Stateful aggregatieresultaten schrijven naar een Kafka-sink:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

dp.create_sink("event_counts_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="event_counts_flow",
    target="event_counts_sink",
)
def event_counts():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .selectExpr("CAST(key AS STRING) AS event_type")
            .groupBy(col("event_type"))
            .count()
    )

Real-time modus

Gebruik spark_conf dit om een updatestroom te configureren voor de realtime-modus:

from pyspark import pipelines as dp

dp.create_sink("my_kafka_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="my_rtm_flow",
    target="my_kafka_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def my_real_time_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
    )

Limitations

  • Delta-tabelsinks worden niet ondersteund als doelen voor updatestromen.