Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Important
De update_flow API bevindt zich in openbare preview.
Gebruik de @dp.update_flow decorator om een updatestroom te maken. Werk stromen schrijven naar sinks bij met behulp van de update-uitvoermodus, waarbij alleen de rijen worden verzonden die in elke batch zijn gewijzigd. In tegenstelling tot toevoegstromen bieden ze ondersteuning voor stateful aggregaties zonder dat hiervoor een watermerk is vereist.
Updatestromen kunnen alleen doelsinks gebruiken. Delta-tabellen worden niet ondersteund.
Syntax
from pyspark import pipelines as dp
dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})
@dp.update_flow(
target = "<sink-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
comment = "<comment>", # optional
import_checkpoint = "<checkpoint-path>") # optional
def <function-name>():
return (<streaming-query>)
Parameters
| Parameter | Typ | Description |
|---|---|---|
| function | function |
Verplicht. Een functie die een Streaming DataFrame van Apache Spark retourneert vanuit een door de gebruiker gedefinieerde query. |
target |
str |
Verplicht. De naam van de sink naar deze stroom schrijft. |
name |
str |
De naam van de stroom. Als deze niet is opgegeven, wordt standaard de functienaam gebruikt. |
comment |
str |
Een beschrijving voor het proces. |
spark_conf |
dict |
Een dicteerfunctie voor Spark-configuraties voor de uitvoering van deze query. Deze configuraties overschrijven confs set voor de bestemming, pijplijn of cluster. |
import_checkpoint |
str |
Een extern controlepuntpad dat moet worden geïmporteerd voordat de stroom wordt gestart. Slechts eenmaal geïmporteerd, wanneer de controlepuntmap van de stroom nog niet bestaat. |
Examples
Aggregatie naar een Kafka-sink
Stateful aggregatieresultaten schrijven naar een Kafka-sink:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
dp.create_sink("event_counts_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="event_counts_flow",
target="event_counts_sink",
)
def event_counts():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING) AS event_type")
.groupBy(col("event_type"))
.count()
)
Real-time modus
Gebruik spark_conf dit om een updatestroom te configureren voor de realtime-modus:
from pyspark import pipelines as dp
dp.create_sink("my_kafka_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="my_rtm_flow",
target="my_kafka_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def my_real_time_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
)
Limitations
- Delta-tabelsinks worden niet ondersteund als doelen voor updatestromen.