update_flow

Viktigt!

API:et update_flow finns i offentlig förhandsversion.

Använd dekoratören @dp.update_flow för att skapa ett uppdateringsflöde. Uppdatera flöden skrivs till mottagare med uppdateringsutdataläget, vilket endast genererar de rader som har ändrats i varje batch. Till skillnad från tilläggsflöden stöder de tillståndskänsliga aggregeringar utan att kräva en vattenstämpel.

Uppdateringsflöden kan bara rikta in sig på mottagare. Deltatabeller stöds inte.

Syntax

from pyspark import pipelines as dp

dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})

@dp.update_flow(
  target = "<sink-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
  comment = "<comment>", # optional
  import_checkpoint = "<checkpoint-path>") # optional
def <function-name>():
  return (<streaming-query>)

Parameters

Parameter Type Description
function function Required. En funktion som returnerar en Apache Spark-strömmande DataFrame från en användardefinierad fråga.
target str Required. Namnet på mottagaren som det här flödet skriver till.
name str Flödets namn Om det inte anges används funktionsnamnet som standard.
comment str En beskrivning av flödet.
spark_conf dict En diktering av Spark-konfigurationer för körningen av den här frågan. Dessa konfigurationer åsidosätter konfigurationer som angetts för mål, pipeline eller kluster.
import_checkpoint str En extern kontrollpunktssökväg som ska importeras innan flödet startas. Importeras bara en gång, när flödets kontrollpunktskatalog ännu inte finns.

Exempel

Sammansättning till en Kafka-mottagare

Skriva tillståndskänsliga aggregeringsresultat till en Kafka-mottagare:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

dp.create_sink("event_counts_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="event_counts_flow",
    target="event_counts_sink",
)
def event_counts():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .selectExpr("CAST(key AS STRING) AS event_type")
            .groupBy(col("event_type"))
            .count()
    )

Realtidsläge

Använd spark_conf för att konfigurera ett uppdateringsflöde för realtidsläge:

from pyspark import pipelines as dp

dp.create_sink("my_kafka_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="my_rtm_flow",
    target="my_kafka_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def my_real_time_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
    )

Limitations

  • Deltatabellmottagare stöds inte som mål för uppdateringsflöden.