update_flow

Important

API는 update_flow공개 미리 보기로 제공됩니다.

데코레이터를 @dp.update_flow 사용하여 업데이트 흐름을 만듭니다. 업데이트 흐름은 업데이트 출력 모드를 사용하여 싱크에 쓰여서 각 일괄 처리에서 변경된 행만 내보냅니다. 추가 흐름과 달리 워터마크를 요구하지 않고 상태 저장 집계를 지원합니다.

업데이트 흐름은 싱크만 대상으로 지정할 수 있습니다. 델타 테이블은 지원되지 않습니다.

Syntax

from pyspark import pipelines as dp

dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})

@dp.update_flow(
  target = "<sink-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
  comment = "<comment>", # optional
  import_checkpoint = "<checkpoint-path>") # optional
def <function-name>():
  return (<streaming-query>)

Parameters

매개 변수 Type Description
기능 function 필수입니다. 사용자 정의 쿼리에서 Apache Spark 스트리밍 DataFrame을 반환하는 함수입니다.
target str 필수입니다. 이 흐름이 쓰는 싱크의 이름입니다.
name str 흐름 이름입니다. 제공되지 않으면 기본적으로 함수 이름이 지정됩니다.
comment str 흐름에 대한 설명입니다.
spark_conf dict 이 쿼리를 실행하기 위한 Spark 구성의 받아쓰기입니다. 이러한 구성은 대상, 파이프라인 또는 클러스터에 대해 설정된 구성을 재정의합니다.
import_checkpoint str 흐름을 시작하기 전에 가져올 외부 검사점 경로입니다. 흐름의 검사점 디렉터리가 아직 없는 경우 한 번만 가져옵니다.

예제

Kafka 싱크에 대한 집계

상태 저장 집계 결과를 Kafka 싱크에 씁니다.

from pyspark import pipelines as dp
from pyspark.sql.functions import col

dp.create_sink("event_counts_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="event_counts_flow",
    target="event_counts_sink",
)
def event_counts():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .selectExpr("CAST(key AS STRING) AS event_type")
            .groupBy(col("event_type"))
            .count()
    )

실시간 모드

실시간 모드에 대한 업데이트 흐름을 구성하는 데 사용합니다spark_conf.

from pyspark import pipelines as dp

dp.create_sink("my_kafka_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="my_rtm_flow",
    target="my_kafka_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def my_real_time_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
    )

Limitations

  • 델타 테이블 싱크는 업데이트 흐름의 대상으로 지원되지 않습니다.