Important
API는 update_flow공개 미리 보기로 제공됩니다.
데코레이터를 @dp.update_flow 사용하여 업데이트 흐름을 만듭니다. 업데이트 흐름은 업데이트 출력 모드를 사용하여 싱크에 쓰여서 각 일괄 처리에서 변경된 행만 내보냅니다.
추가 흐름과 달리 워터마크를 요구하지 않고 상태 저장 집계를 지원합니다.
업데이트 흐름은 싱크만 대상으로 지정할 수 있습니다. 델타 테이블은 지원되지 않습니다.
Syntax
from pyspark import pipelines as dp
dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})
@dp.update_flow(
target = "<sink-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
comment = "<comment>", # optional
import_checkpoint = "<checkpoint-path>") # optional
def <function-name>():
return (<streaming-query>)
Parameters
| 매개 변수 | Type | Description |
|---|---|---|
| 기능 | function |
필수입니다. 사용자 정의 쿼리에서 Apache Spark 스트리밍 DataFrame을 반환하는 함수입니다. |
target |
str |
필수입니다. 이 흐름이 쓰는 싱크의 이름입니다. |
name |
str |
흐름 이름입니다. 제공되지 않으면 기본적으로 함수 이름이 지정됩니다. |
comment |
str |
흐름에 대한 설명입니다. |
spark_conf |
dict |
이 쿼리를 실행하기 위한 Spark 구성의 받아쓰기입니다. 이러한 구성은 대상, 파이프라인 또는 클러스터에 대해 설정된 구성을 재정의합니다. |
import_checkpoint |
str |
흐름을 시작하기 전에 가져올 외부 검사점 경로입니다. 흐름의 검사점 디렉터리가 아직 없는 경우 한 번만 가져옵니다. |
예제
Kafka 싱크에 대한 집계
상태 저장 집계 결과를 Kafka 싱크에 씁니다.
from pyspark import pipelines as dp
from pyspark.sql.functions import col
dp.create_sink("event_counts_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="event_counts_flow",
target="event_counts_sink",
)
def event_counts():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING) AS event_type")
.groupBy(col("event_type"))
.count()
)
실시간 모드
실시간 모드에 대한 업데이트 흐름을 구성하는 데 사용합니다spark_conf.
from pyspark import pipelines as dp
dp.create_sink("my_kafka_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="my_rtm_flow",
target="my_kafka_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def my_real_time_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
)
Limitations
- 델타 테이블 싱크는 업데이트 흐름의 대상으로 지원되지 않습니다.