Important
API update_flow 处于 公开预览版。
@dp.update_flow使用修饰器创建更新流。 使用更新输出模式将更新流写入接收器,只发出每个批处理中已更改的行。 与 追加流不同,它们支持有状态聚合,而无需水印。
更新流只能面向接收器。 不支持增量表。
Syntax
from pyspark import pipelines as dp
dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})
@dp.update_flow(
target = "<sink-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
comment = "<comment>", # optional
import_checkpoint = "<checkpoint-path>") # optional
def <function-name>():
return (<streaming-query>)
Parameters
| 参数 | 类型 | Description |
|---|---|---|
| 函数 | function |
必填。 从用户定义的查询返回 Apache Spark 流式处理数据帧的函数。 |
target |
str |
必填。 此流写入的接收器的名称。 |
name |
str |
流名称。 如果未提供,则默认为函数名称。 |
comment |
str |
流程的描述。 |
spark_conf |
dict |
用于执行此查询的 Spark 配置的听写。 这些配置替代为目标、管道或群集设置的授予。 |
import_checkpoint |
str |
启动流之前要导入的外部检查点路径。 仅当流的检查点目录尚不存在时导入一次。 |
示例
到 Kafka 接收器的聚合
将有状态聚合结果写入 Kafka 接收器:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
dp.create_sink("event_counts_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="event_counts_flow",
target="event_counts_sink",
)
def event_counts():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING) AS event_type")
.groupBy(col("event_type"))
.count()
)
实时模式
用于 spark_conf 为 实时模式配置更新流:
from pyspark import pipelines as dp
dp.create_sink("my_kafka_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="my_rtm_flow",
target="my_kafka_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def my_real_time_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
)
局限性
- 不支持增量表接收器作为更新流的目标。