Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Importante
L'API update_flow è disponibile in anteprima pubblica.
Usare l'elemento @dp.update_flow Decorator per creare un flusso di aggiornamento. I flussi di aggiornamento scrivono in sink usando la modalità di output degli aggiornamenti, generando solo le righe modificate in ogni batch. A differenza dei flussi di accodamento, supportano aggregazioni con stato senza richiedere una filigrana.
I flussi di aggiornamento possono essere destinati solo ai sink. Le tabelle delta non sono supportate.
Syntax
from pyspark import pipelines as dp
dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})
@dp.update_flow(
target = "<sink-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
comment = "<comment>", # optional
import_checkpoint = "<checkpoint-path>") # optional
def <function-name>():
return (<streaming-query>)
Parameters
| Parametro | Tipo | Description |
|---|---|---|
| funzione | function |
Obbligatorio. Funzione che restituisce un dataframe di streaming Apache Spark da una query definita dall'utente. |
target |
str |
Obbligatorio. Nome del sink in cui scrive il flusso. |
name |
str |
Nome del flusso. Se non specificato, per impostazione predefinita viene impostato il nome della funzione. |
comment |
str |
Descrizione del flusso. |
spark_conf |
dict |
Una deviazione delle configurazioni di Spark per l'esecuzione di questa query. Queste configurazioni eseguono l'override di confs impostate per la destinazione, la pipeline o il cluster. |
import_checkpoint |
str |
Percorso del checkpoint esterno da importare prima di avviare il flusso. Importato una sola volta, quando la directory del checkpoint del flusso non esiste ancora. |
Examples
Aggregazione in un sink Kafka
Scrivere risultati di aggregazione con stato in un sink Kafka:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
dp.create_sink("event_counts_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="event_counts_flow",
target="event_counts_sink",
)
def event_counts():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING) AS event_type")
.groupBy(col("event_type"))
.count()
)
Modalità in tempo reale
Usare spark_conf per configurare un flusso di aggiornamento per la modalità in tempo reale:
from pyspark import pipelines as dp
dp.create_sink("my_kafka_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="my_rtm_flow",
target="my_kafka_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def my_real_time_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
)
Limitations
- I sink di tabella delta non sono supportati come destinazioni per i flussi di aggiornamento.