Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Important
L’API update_flow est en préversion publique.
Utilisez le @dp.update_flow décorateur pour créer un flux de mise à jour. Les flux de mise à jour écrivent dans des récepteurs à l’aide du mode de sortie de mise à jour, en émettant uniquement les lignes qui ont changé dans chaque lot. Contrairement aux flux d’ajout, ils prennent en charge les agrégations avec état sans nécessiter de filigrane.
Les flux de mise à jour peuvent uniquement cibler des récepteurs. Les tables delta ne sont pas prises en charge.
Syntax
from pyspark import pipelines as dp
dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})
@dp.update_flow(
target = "<sink-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
comment = "<comment>", # optional
import_checkpoint = "<checkpoint-path>") # optional
def <function-name>():
return (<streaming-query>)
Parameters
| Paramètre | Catégorie | Description |
|---|---|---|
| function | function |
Obligatoire. Fonction qui retourne un DataFrame de streaming Apache Spark à partir d’une requête définie par l’utilisateur. |
target |
str |
Obligatoire. Nom du récepteur dans qui écrit ce flux. |
name |
str |
Nom du flux. S’il n’est pas fourni, la valeur par défaut est le nom de la fonction. |
comment |
str |
Description du flux. |
spark_conf |
dict |
dictée des configurations Spark pour l’exécution de cette requête. Ces configurations remplacent les confs définis pour la destination, le pipeline ou le cluster. |
import_checkpoint |
str |
Chemin d’accès de point de contrôle externe à importer avant de démarrer le flux. Importé une seule fois, lorsque le répertoire de point de contrôle du flux n’existe pas encore. |
Exemples
Agrégation vers un récepteur Kafka
Écrivez les résultats d’agrégation avec état dans un récepteur Kafka :
from pyspark import pipelines as dp
from pyspark.sql.functions import col
dp.create_sink("event_counts_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="event_counts_flow",
target="event_counts_sink",
)
def event_counts():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING) AS event_type")
.groupBy(col("event_type"))
.count()
)
Mode en temps réel
Permet spark_conf de configurer un flux de mise à jour pour le mode en temps réel :
from pyspark import pipelines as dp
dp.create_sink("my_kafka_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="my_rtm_flow",
target="my_kafka_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def my_real_time_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
)
Limitations
- Les récepteurs de tables Delta ne sont pas pris en charge en tant que cibles pour les flux de mise à jour.