update_flow

Important

L’API update_flow est en préversion publique.

Utilisez le @dp.update_flow décorateur pour créer un flux de mise à jour. Les flux de mise à jour écrivent dans des récepteurs à l’aide du mode de sortie de mise à jour, en émettant uniquement les lignes qui ont changé dans chaque lot. Contrairement aux flux d’ajout, ils prennent en charge les agrégations avec état sans nécessiter de filigrane.

Les flux de mise à jour peuvent uniquement cibler des récepteurs. Les tables delta ne sont pas prises en charge.

Syntax

from pyspark import pipelines as dp

dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})

@dp.update_flow(
  target = "<sink-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
  comment = "<comment>", # optional
  import_checkpoint = "<checkpoint-path>") # optional
def <function-name>():
  return (<streaming-query>)

Parameters

Paramètre Catégorie Description
function function Obligatoire. Fonction qui retourne un DataFrame de streaming Apache Spark à partir d’une requête définie par l’utilisateur.
target str Obligatoire. Nom du récepteur dans qui écrit ce flux.
name str Nom du flux. S’il n’est pas fourni, la valeur par défaut est le nom de la fonction.
comment str Description du flux.
spark_conf dict dictée des configurations Spark pour l’exécution de cette requête. Ces configurations remplacent les confs définis pour la destination, le pipeline ou le cluster.
import_checkpoint str Chemin d’accès de point de contrôle externe à importer avant de démarrer le flux. Importé une seule fois, lorsque le répertoire de point de contrôle du flux n’existe pas encore.

Exemples

Agrégation vers un récepteur Kafka

Écrivez les résultats d’agrégation avec état dans un récepteur Kafka :

from pyspark import pipelines as dp
from pyspark.sql.functions import col

dp.create_sink("event_counts_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="event_counts_flow",
    target="event_counts_sink",
)
def event_counts():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .selectExpr("CAST(key AS STRING) AS event_type")
            .groupBy(col("event_type"))
            .count()
    )

Mode en temps réel

Permet spark_conf de configurer un flux de mise à jour pour le mode en temps réel :

from pyspark import pipelines as dp

dp.create_sink("my_kafka_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="my_rtm_flow",
    target="my_kafka_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def my_real_time_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
    )

Limitations

  • Les récepteurs de tables Delta ne sont pas pris en charge en tant que cibles pour les flux de mise à jour.