update_flow

Importante

L'API update_flow è disponibile in anteprima pubblica.

Usare l'elemento @dp.update_flow Decorator per creare un flusso di aggiornamento. I flussi di aggiornamento scrivono in sink usando la modalità di output degli aggiornamenti, generando solo le righe modificate in ogni batch. A differenza dei flussi di accodamento, supportano aggregazioni con stato senza richiedere una filigrana.

I flussi di aggiornamento possono essere destinati solo ai sink. Le tabelle delta non sono supportate.

Syntax

from pyspark import pipelines as dp

dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})

@dp.update_flow(
  target = "<sink-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
  comment = "<comment>", # optional
  import_checkpoint = "<checkpoint-path>") # optional
def <function-name>():
  return (<streaming-query>)

Parameters

Parametro Tipo Description
funzione function Obbligatorio. Funzione che restituisce un dataframe di streaming Apache Spark da una query definita dall'utente.
target str Obbligatorio. Nome del sink in cui scrive il flusso.
name str Nome del flusso. Se non specificato, per impostazione predefinita viene impostato il nome della funzione.
comment str Descrizione del flusso.
spark_conf dict Una deviazione delle configurazioni di Spark per l'esecuzione di questa query. Queste configurazioni eseguono l'override di confs impostate per la destinazione, la pipeline o il cluster.
import_checkpoint str Percorso del checkpoint esterno da importare prima di avviare il flusso. Importato una sola volta, quando la directory del checkpoint del flusso non esiste ancora.

Examples

Aggregazione in un sink Kafka

Scrivere risultati di aggregazione con stato in un sink Kafka:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

dp.create_sink("event_counts_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="event_counts_flow",
    target="event_counts_sink",
)
def event_counts():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .selectExpr("CAST(key AS STRING) AS event_type")
            .groupBy(col("event_type"))
            .count()
    )

Modalità in tempo reale

Usare spark_conf per configurare un flusso di aggiornamento per la modalità in tempo reale:

from pyspark import pipelines as dp

dp.create_sink("my_kafka_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="my_rtm_flow",
    target="my_kafka_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def my_real_time_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
    )

Limitations

  • I sink di tabella delta non sono supportati come destinazioni per i flussi di aggiornamento.