Important
REPLACE WHERE 흐름은 베타에 있습니다.
이 페이지에서는 Lakeflow Spark 선언적 파이프라인에서 REPLACE WHERE 흐름을 사용하여 전체 테이블 기록을 다시 처리하지 않고 테이블의 대상 하위 집합을 다시 계산하고 덮어쓰는 방법을 설명합니다. REPLACE WHERE 흐름은 늦게 도착하는 데이터, 업스트림 재처리, 스키마 진화 및 백필을 처리합니다.
REPLACE WHERE 흐름을 사용하여 대상 테이블에 조건자를 정의합니다. 조건자와 일치하는 모든 행은 삭제되고 동일한 조건자 범위에 대한 원본 쿼리를 다시 평가하여 대체됩니다. 조건자와 일치하지 않는 행은 그대로 유지됩니다.
Requirements
REPLACE WHERE 흐름에는 다음과 같은 요구 사항이 있습니다.
- 파이프라인은
PREVIEW채널을 사용해야 합니다. - Databricks는 Unity 카탈로그 및 서버리스 컴퓨팅을 권장합니다. 증분 새로 고침은 서버리스 컴퓨팅에서만 지원됩니다.
REPLACE WHERE 플로우를 사용해야 하는 경우
다음 시나리오에 대해 REPLACE WHERE 흐름을 사용합니다.
- 스트리밍 의미 체계 없이 증분 일괄 처리: 워터마크와 같은 스트리밍 개념을 관리하지 않고 일괄 처리로 새 행을 처리합니다.
- 선택적 재처리: 조건자와 일치하는 행만 다시 계산하고 다른 모든 행은 그대로 둡니다.
-
표준 구체화된 뷰 기능을 넘어서는 시나리오:
- 원본보다 보존 기간이 긴 대상 테이블
- 차원 테이블이 변경되면 다시 계산 방지
- 전체 기록을 다시 계산하지 않고 스키마 진화
REPLACE WHERE 플로 만들기
SQL 또는 Python REPLACE WHERE 흐름을 정의합니다.
SQL
CREATE STREAMING TABLE와 함께 FLOW REPLACE WHERE 절을 인라인으로 사용하세요:
CREATE STREAMING TABLE orders_enriched
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
또는 긴 형식 CREATE FLOW 구문을 사용합니다.
CREATE STREAMING TABLE orders_enriched;
CREATE FLOW orders_enriched AS
INSERT INTO orders_enriched BY NAME
REPLACE WHERE date >= date_add(current_date(), -7)
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Python
Python 테이블과 흐름은 단일 문에 정의됩니다. 흐름은 테이블과 동일한 이름을 상속합니다.
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 7)
)
def orders_enriched():
orders_fct = spark.read.table("orders_fct").select("date", "order_id", "region", "qty", "price")
product_dim = spark.read.table("product_dim")
return orders_fct.join(product_dim, "product_id")
매개 변수는 replace_where PySpark 열 식 또는 문자열 조건자를 허용합니다.
이 예제에서는 지난 7일 동안의 모든 행이 원본 쿼리를 orders_enriched 사용하여 삭제되고 다시 계산됩니다. 소스 쿼리에 조건자를 추가할 필요가 없습니다. 파이프라인 엔진은 원본에서 읽어올 때 이를 자동으로 적용합니다.
메모
BY NAME 는 SQL에 필요합니다. 위치가 아니라 이름을 기준으로 열을 매칭합니다.
기록 데이터 백필
예약된 새로 고침 외부의 대상 테이블에 기록 또는 수정된 행을 쓰려면 기록 데이터가 있는 위치에 따라 다음 두 메커니즘 중에서 선택합니다.
- 조건자 재정의: 일회성 조건자 범위에 대해 흐름의 소스 쿼리를 다시 실행합니다. 기록 데이터가 증분 데이터와 동일한 원본에서 가져온 경우 사용합니다.
- DML 문: 흐름을 우회하여 대상 테이블에 직접 삽입합니다. 기록 데이터가 증분 데이터와 다른 원본에 있을 때 사용합니다.
조건자 재정의
파이프라인 정의를 수정하지 않고 단일 파이프라인 업데이트에 대한 REPLACE WHERE 조건자를 재정의합니다. 조건자 재정의는 일회성이며 현재 업데이트에만 적용되며 향후 실행에는 영향을 주지 않습니다.
예: 초기 이력 데이터 로드
파이프라인을 처음 설정할 때 기록 데이터의 일회성 백필을 수행하려면 다음을 수행합니다.
pipeline_id = "<pipeline-id>"
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date BETWEEN '2020-01-01' AND '2024-12-31'",
}
]
resp = start_update_with_replace_where(
pipeline_id=pipeline_id,
replace_where_overrides=overrides,
)
print(resp)
예: 특정 기간에 대해 열을 수정합니다
열 정의를 업데이트한 후 대상 기록 범위에 대한 변경 내용을 백필합니다.
pipeline_id = "<pipeline-id>"
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date >= date_add(current_date(), -30)",
}
]
resp = start_update_with_replace_where(
pipeline_id=pipeline_id,
replace_where_overrides=overrides,
refresh_selection=["orders_enriched"],
)
print(resp)
단일 조건자 재정의에 여러 차원을 결합합니다.
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date >= date_add(current_date(), -30) AND region = 'asia'",
}
]
도우미 함수: start_update_with_replace_where
노트북에서 파이프라인 업데이트 API를 사용하여 프레디킷 재정의를 제출:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import StartUpdateResponse
def start_update_with_replace_where(
pipeline_id: str,
replace_where_overrides: list[dict],
refresh_selection: list[str] = None,
) -> StartUpdateResponse:
"""Start a pipeline update with REPLACE WHERE predicate overrides."""
client = WorkspaceClient()
body = {
"pipeline_id": pipeline_id,
"cause": "JOB_TASK",
"update_cause_details": {
"job_details": {"performance_target": "PERFORMANCE"}
},
"replace_where_overrides": replace_where_overrides,
}
if refresh_selection:
body["refresh_selection"] = refresh_selection
res = client.api_client.do(
"POST",
f"/api/2.0/pipelines/{pipeline_id}/updates",
body=body,
headers={"Accept": "application/json", "Content-Type": "application/json"},
)
return StartUpdateResponse.from_dict(res)
DML 문
파이프라인 외부에서 대상 테이블에서 직접 DML 문을 실행하여 레거시 테이블의 로드와 같은 초기 로드 또는 수정을 수행합니다.
INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';
DML을 통해 삽입된 행은 REPLACE WHERE 조건자의 적용을 받지 않으며 향후 실행의 조건자 범위에 속하지 않는 한 예약된 새로 고침 간에 유지됩니다.
전체 새로 고침 동작
REPLACE WHERE 흐름의 전체 새로 고침은 현재 조건자만 사용하여 원본 쿼리를 다시 실행합니다. 조건자 재정의 또는 현재 조건자 범위를 벗어난 DML 문에 의해 삽입된 행은 영구적으로 삭제됩니다.
경고
전체 새로 고침은 모든 기존 데이터를 지우고 정의된 조건자만 사용하여 흐름을 다시 실행합니다. 파이프라인이 7일 조건자를 사용하여 1년 동안 실행된 경우 전체 새로 고침을 수행하면 지난 7일간의 데이터만 포함된 테이블이 생성됩니다. 모든 이전 행은 영구적으로 삭제됩니다.
테이블에서 전체 새로 고침을 방지하려면 테이블 속성을 pipelines.reset.allowedfalse.로 설정합니다.
파이프라인 속성 참조를 참조하세요.
증분 새로 고침
REPLACE WHERE 흐름은 가능한 경우 증분 새로 고침을 사용하며, 전체 바꾸기 창을 다시 계산하는 대신 마지막 새로 고침 이후 변경된 원본 데이터만 다시 처리합니다. 증분 새로 고침에는 서버리스 컴퓨팅이 필요합니다.
증분 새로 고침이 적용되는 경우
다음 항목은 모두 true여야 합니다.
- 파이프라인은 서버리스 컴퓨팅에서 실행됩니다.
- 쿼리 셰이프가 지원됩니다. 지원되는 연산자 집합은 증분 새로 고침을 참조하세요.
- 조건자는 원본 테이블의 기본 열을 참조합니다. 집계 또는 윈도 함수 출력과 같은 파생 값에 대한 조건자는 원본 소스로 푸시다운할 수 없으며, 이로 인해 증분 새로 고침이 비활성화됩니다.
- 현재 바꾸기 창에서 외부 DML이 행을 수정하지 않았습니다. 현재 창 외부의 행을 수정하는 DML은 영향을 받지 않습니다.
- 현재 바꾸기 창에는 이전 조건자가 제외한 행이 포함되지 않습니다. 이전에 처리한 적이 없는 범위까지 포함하도록 조건자를 넓히면, 해당 새로 고침은 전체 재계산으로 전환됩니다. 후속 새로 고침은 다시 증분 새로 고침을 적용할 수 있습니다.
- 조건자는 결정적입니다.
rand()와 같은 비결정적 함수를 사용하는 조건자는 증분 새로 고침을 사용할 수 없게 합니다. 임시 함수(예:current_date()허용됨).
모든 흐름의 첫 번째 새로 고침은 항상 전체 계산입니다. 조건이 충족되지 않으면 해당 새로 고침은 현재 바꾸기 창의 전체 다시 계산으로 대체됩니다.
증분 새로 고침에 대한 모범 사례
REPLACE WHERE 흐름이 증분 새로 고침에 적합한 상태로 유지되도록 다음 지침을 따르세요.
가변 하한 사용
이동하는 하한값이 있는 조건식은 증분 새로 고침 대상으로 무기한 유지됩니다.
FLOW REPLACE WHERE date >= date_add(current_date(), -7)
date BETWEEN date_add(current_date(), -7) AND current_date()와 같은 가변 상한은 이전에 제외된 행을 포함하게 윈도우를 이동시켜 한 번만 전체 재계산으로 폴백하게 할 수 있습니다.
GROUP BY에 조건자 열을 포함합니다
집계할 때 엔진이 조건자를 집계 아래로 푸시할 수 있도록 조건자 열을 GROUP BY 포함합니다.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;
조건자 컬럼이 GROUP BY에 없으면 조건자를 집계 아래로 푸시다운할 수 없으며 소스 전체를 스캔합니다.
조인 키에 조건자 열을 포함
조인 조건에 조건자 열을 포함하면 엔진이 조인된 모든 소스를 정리할 수 있습니다.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;
조인된 테이블이 조건자 열을 노출하지 않는 경우 해당 테이블은 새로 고칠 때마다 전체 검사됩니다.
전면 재계산으로 전환되는 현상 진단
새로 고침이 전체 재계산으로 전환되면 해당 흐름의 planning_information 이벤트에서 그 이유가 보고됩니다.
파이프라인 이벤트 로그 모니터링을 참조하세요. 다음 표에서는 이벤트에 보고된 이유를 나열합니다.
| 이유 | Meaning |
|---|---|
EXTERNAL_CHANGE_IN_REPLACE_WINDOW |
외부 DML로 인해 현재 교체 범위의 행이 수정되었습니다. |
REPLACE_WHERE_NOT_DETERMINISTIC |
조건자는 비결정적 식을 사용합니다. |
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC |
이전 새로 고침에서는 비결정적 조건자를 사용했습니다. |
UNSUPPORTED_REPLACE_WHERE_PREDICATE |
조건자를 원본으로 푸시할 수 없거나, 현재 창에 이전 조건자가 처리하지 않은 행이 포함되거나, 실행에서 조건자 재정의를 사용합니다. |
Limitations
REPLACE WHERE 흐름에는 다음과 같은 제한 사항이 있습니다.
- 대상 테이블은 파이프라인 내에서 만들어야 합니다.
- 대상 테이블당 하나의 REPLACE WHERE 흐름만 허용됩니다.
- REPLACE WHERE 흐름이 대상으로 하는 테이블은 AUTO CDC 흐름 또는 추가 흐름과 같은 다른 흐름 형식의 대상도 지정할 수 없습니다.
- REPLACE WHERE 흐름이 대상으로 하는 테이블에서는 기대치가 지원되지 않습니다.
- Databricks SQL에서 생성된 스트리밍 테이블의 경우 구문 및 백필 차이점은 독립 실행형 스트리밍 테이블용 REPLACE WHERE 흐름을 참조하세요.
예제
다음 예제에서는 일반적인 REPLACE WHERE 흐름 패턴을 보여 줍니다.
예제 1: 제한된 보존 원본에서 기록 집계 유지
이 예제에서는 원시 데이터가 원본 테이블에서 보존 기간(3일)이 지나 삭제된 후에도 일일 집계를 무기한 유지합니다.
SQL
CREATE STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
date,
key,
SUM(val) AS agg
FROM events_raw
GROUP BY ALL;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 3)
)
def events_agg():
return (
spark.read.table("events_raw")
.groupBy("date", "key")
.agg(F.sum("val").alias("agg"))
)
예제 2: 차원 테이블이 변경되면 다시 계산 방지
이 예제에서는 차원 특성이 변경될 때 기록 팩트 행을 변경하지 않고 유지합니다.
SQL
CREATE STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
f.date,
f.user_id,
d.region,
f.revenue
FROM fact_table f
JOIN dim_users d
ON f.user_id = d.user_id;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 1)
)
def fact_dim_join():
fact_table = spark.read.table("fact_table").alias("f")
dim_users = spark.read.table("dim_users").alias("d")
return (
fact_table.join(dim_users, col("f.user_id") == col("d.user_id"))
.select(
col("f.date"),
col("f.user_id"),
col("d.region"),
col("f.revenue"),
)
)
사용자의 지역이 변경되면 최근 행만 다시 계산됩니다. 기록 행은 기록된 시간에 지역 값을 유지합니다. 기존 행을 수정하려면 predicate overrides를 사용해 대상 지정 백필을 실행하세요.
예제 3: 전체 기록을 다시 계산하지 않고 새 메트릭 추가
이 예제에서는 테이블 정의를 발전시키고 대상 범위만 백필하는 방법을 보여줍니다.
초기 테이블을 정의합니다.
SQL
CREATE STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks FROM clickstream_raw GROUP BY ALL;Python
from pyspark import pipelines as dp from pyspark.sql import functions as F from pyspark.sql.functions import col @dp.table( replace_where=col("event_date") >= F.date_sub(F.current_date(), 7) ) def clickstream_daily(): return ( spark.read.table("clickstream_raw") .groupBy("event_date", "page_id") .agg(F.count("*").alias("clicks")) )다음을 추가하도록 쿼리를 업데이트합니다
uniq_users.SQL
CREATE STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks, COUNT(DISTINCT user_id) AS uniq_users FROM clickstream_raw GROUP BY ALL;Python
@dp.table( replace_where=col("event_date") >= F.date_sub(F.current_date(), 7) ) def clickstream_daily(): return ( spark.read.table("clickstream_raw") .groupBy("event_date", "page_id") .agg( F.count("*").alias("clicks"), F.countDistinct("user_id").alias("uniq_users"), ) )지난 30일 동안 새 메트릭을 백필합니다.
overrides = [ { "flow_name": "clickstream_daily", "predicate_override": "event_date BETWEEN '2026-01-01' AND '2026-01-30'", } ] resp = start_update_with_replace_where( pipeline_id="<pipeline-id>", replace_where_overrides=overrides, refresh_selection=["clickstream_daily"], )백필된 범위보다 오래된 행에는
uniq_users에 대해NULL가 포함됩니다.
예제 4: 전체 기록을 백필하기 전에 작은 창에서 반복
이 예제에서는 전체 기록 범위를 처리하기 전에 작은 데이터 창에서 쿼리 논리의 유효성을 검사하는 방법을 보여줍니다.
짧은 기간 범위로 시작하여 쿼리를 수정하는 동안 각 새로 고침이 최근 7일만 다시 계산하도록 합니다.
SQL
CREATE STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
)
def revenue_attribution():
return (
spark.read.table("marketing_events")
.groupBy("event_date", "campaign_id")
.agg(F.sum("revenue").alias("total_revenue"))
)
쿼리가 확정된 후 조건자 재정의를 사용하여 일회성 과거 데이터 백필을 수행하세요.
overrides = [
{
"flow_name": "revenue_attribution",
"predicate_override": "event_date >= date_add(current_date(), -365)",
}
]
resp = start_update_with_replace_where(
pipeline_id="<pipeline-id>",
replace_where_overrides=overrides,
refresh_selection=["revenue_attribution"],
)