Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Important
Los flujos REPLACE WHERE están en beta.
En esta página se describe cómo usar flujos REPLACE WHERE en canalizaciones declarativas de Spark de Lakeflow para volver a calcular y sobrescribir un subconjunto de destino de una tabla sin volver a procesar todo el historial de tablas. Los flujos REPLACE WHERE controlan los datos de llegada tardía, el reprocesamiento ascendente, la evolución del esquema y los rerrellenos.
Con un flujo REPLACE WHERE , se define un predicado en la tabla de destino. Todas las filas que coinciden con el predicado se eliminan y se sustituyen volviendo a evaluar la consulta de origen para ese mismo rango del predicado. Las filas que no coinciden con el predicado permanecen intactas.
Requirements
Los flujos REPLACE WHERE tienen los siguientes requisitos:
- La canalización debe usar el canal
PREVIEW. - Databricks recomienda Unity Catalog y la computación sin servidor. La actualización incremental solo se admite en el proceso de cálculo sin servidor.
Cuándo usar flujos REPLACE WHERE
Use flujos REPLACE WHERE para los escenarios siguientes:
- Procesamiento por lotes incremental sin semántica de streaming: procese nuevas filas en lotes sin administrar conceptos de streaming como marcas de agua.
- Reprocesamiento selectivo: recompute solo las filas que coinciden con un predicado mientras dejan sin modificar todas las demás filas.
-
Escenarios más allá de las funcionalidades de vista materializadas estándar:
- Tablas de destino con retención más larga que el origen
- Impedir la recomputación cuando cambia una tabla de dimensiones
- Evolución del esquema sin volver a calcular todo el historial
Crear un flujo REPLACE WHERE
Defina los flujos de REPLACE WHERE en SQL o Python.
SQL
Use la cláusula FLOW REPLACE WHERE en línea con CREATE STREAMING TABLE:
CREATE STREAMING TABLE orders_enriched
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Como alternativa, use la sintaxis de formato CREATE FLOW largo:
CREATE STREAMING TABLE orders_enriched;
CREATE FLOW orders_enriched AS
INSERT INTO orders_enriched BY NAME
REPLACE WHERE date >= date_add(current_date(), -7)
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Python
En Python, la tabla y el flujo se definen en una sola instrucción. El flujo hereda el mismo nombre que la tabla:
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 7)
)
def orders_enriched():
orders_fct = spark.read.table("orders_fct").select("date", "order_id", "region", "qty", "price")
product_dim = spark.read.table("product_dim")
return orders_fct.join(product_dim, "product_id")
El replace_where parámetro acepta una expresión de columna pySpark o un predicado de cadena.
En estos ejemplos, todas las filas de los últimos 7 días se eliminan de orders_enriched y se vuelven a calcular utilizando la consulta de origen. No es necesario agregar el predicado a la consulta de origen. El motor de canalización lo aplica automáticamente al leer desde el origen.
Note
BY NAME es necesario en SQL. Hace coincidir las columnas por nombre en lugar de por posición.
Reposición de datos históricos
Para escribir filas históricas o corregidas en la tabla de destino fuera de las actualizaciones programadas, elija entre dos mecanismos en función de dónde residen los datos históricos:
- Anulaciones de predicado: Volver a ejecutar la consulta de origen del flujo para un rango de predicado de una sola vez. Use cuando los datos históricos proceden del mismo origen que los datos incrementales.
- Instrucciones DML: Insertar directamente en la tabla de destino, omitiendo el flujo. Use cuando los datos históricos residen en un origen diferente al de los datos incrementales.
Anulaciones de predicados
Invalide el predicado REPLACE WHERE para una única actualización de canalización sin modificar la definición de canalización. Las anulaciones de predicado son de un solo uso, solo se aplican a la actualización actual y no afectan a ejecuciones futuras.
Ejemplo: Carga histórica inicial
Para realizar un rellenado retrospectivo único de datos históricos al configurar por primera vez un flujo de datos:
pipeline_id = "<pipeline-id>"
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date BETWEEN '2020-01-01' AND '2024-12-31'",
}
]
resp = start_update_with_replace_where(
pipeline_id=pipeline_id,
replace_where_overrides=overrides,
)
print(resp)
Ejemplo: Corregir una columna durante un período específico
Después de actualizar una definición de columna, rellene el cambio para un intervalo histórico de destino:
pipeline_id = "<pipeline-id>"
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date >= date_add(current_date(), -30)",
}
]
resp = start_update_with_replace_where(
pipeline_id=pipeline_id,
replace_where_overrides=overrides,
refresh_selection=["orders_enriched"],
)
print(resp)
Combinar varias dimensiones en una sola anulación de predicado:
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date >= date_add(current_date(), -30) AND region = 'asia'",
}
]
Función auxiliar: start_update_with_replace_where
Utilice la API de actualización de canalización desde un cuaderno para enviar invalidaciones de predicados:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import StartUpdateResponse
def start_update_with_replace_where(
pipeline_id: str,
replace_where_overrides: list[dict],
refresh_selection: list[str] = None,
) -> StartUpdateResponse:
"""Start a pipeline update with REPLACE WHERE predicate overrides."""
client = WorkspaceClient()
body = {
"pipeline_id": pipeline_id,
"cause": "JOB_TASK",
"update_cause_details": {
"job_details": {"performance_target": "PERFORMANCE"}
},
"replace_where_overrides": replace_where_overrides,
}
if refresh_selection:
body["refresh_selection"] = refresh_selection
res = client.api_client.do(
"POST",
f"/api/2.0/pipelines/{pipeline_id}/updates",
body=body,
headers={"Accept": "application/json", "Content-Type": "application/json"},
)
return StartUpdateResponse.from_dict(res)
Instrucciones DML
Ejecute instrucciones DML directamente en la tabla de destino desde fuera de la canalización para realizar cargas iniciales o correcciones, como la carga desde una tabla heredada:
INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';
Las filas insertadas a través de DML no están sujetas al predicado REPLACE WHERE y se conservan en las actualizaciones programadas a menos que se encuentren dentro del intervalo de predicado de una ejecución futura.
Comportamiento de actualización completa
Una actualización completa de un flujo REPLACE WHERE vuelve a ejecutar la consulta de origen con solo el predicado actual. Las filas que se insertaron mediante anulaciones de predicado o instrucciones DML fuera del rango actual del predicado se eliminan permanentemente.
Advertencia
Una actualización completa borra todos los datos existentes y vuelve a ejecutar el flujo con solo su predicado definido. Si una canalización ha estado ejecutándose durante un año con un predicado de 7 días, una actualización completa hace que la tabla contenga solo los datos de los últimos 7 días. Todas las filas anteriores se eliminan permanentemente.
Para evitar actualizaciones completas en una tabla, establezca la propiedad de la tabla pipelines.reset.allowed en false. Consulte Referencia de propiedades de canalización.
Actualización incremental
Los flujos REPLACE WHERE usan la actualización incremental siempre que sea posible, reprocesando solo los datos de origen que han cambiado desde la última actualización en lugar de volver a calcular toda la ventana de reemplazo. La actualización incremental requiere un proceso sin servidor.
Cuando se aplica la actualización incremental
Todo lo siguiente debe ser verdadero:
- La canalización se ejecuta en computación sin servidor.
- Se admite la forma de consulta. Consulte Actualización incremental para el conjunto de operadores admitidos.
- El predicado hace referencia a columnas base de una tabla de origen. Los predicados sobre valores derivados, como los resultados de funciones de agregado o de ventana, no se pueden aplicar en el origen, lo que desactiva la actualización incremental.
- Ninguna fila ha sido modificada por DML externo en la ventana de reemplazo actual. DML que modifica filas fuera de la ventana actual no se ve afectada.
- La ventana de reemplazo actual no incluye filas excluidas del predicado anterior. Si se amplía el predicado para abarcar un intervalo que no se había procesado previamente, esa actualización recurre de nuevo a la recomputación completa. Las actualizaciones posteriores son aptas para la actualización incremental de nuevo.
- El predicado es determinista. Los predicados que usan funciones no deterministas, como
rand(), deshabilitan la actualización incremental. Se permiten funciones temporales comocurrent_date().
La primera actualización de cualquier flujo siempre es un cálculo completo. Si no se cumple alguna condición, esa actualización vuelve a la recomputación completa de la ventana de reemplazo actual.
Procedimientos recomendados para la actualización incremental
Siga estas instrucciones para que los flujos REPLACE WHERE sigan siendo aptos para la actualización incremental.
Usar un límite inferior móvil
Los predicados con un límite inferior móvil siguen siendo aptos para la actualización incremental indefinidamente.
FLOW REPLACE WHERE date >= date_add(current_date(), -7)
Un límite superior móvil, como date BETWEEN date_add(current_date(), -7) AND current_date(), puede desplazar la ventana para incluir filas que antes quedaban excluidas, lo que provoca un recurso puntual al recálculo completo.
Incluir la columna de predicado en GROUP BY
Al realizar una agregación, incluya la columna del predicado en GROUP BY para que el motor pueda empujar el predicado por debajo de la agregación.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;
Si falta la columna del predicado en GROUP BY, el predicado no se puede desplazar por debajo de la agregación y la fuente se escanea por completo.
Incluir la columna de predicado en claves de combinación
Incluya la columna del predicado en la condición de unión para que el motor pueda descartar todas las fuentes unidas.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;
Si una tabla combinada no expone la columna de predicado, esa tabla se examina en su totalidad en cada actualización.
Diagnosticar el recurso a la recomputación completa
Cuando una actualización recurre a una recomputación completa, el motivo se informa en el evento planning_information del flujo. Consulte Supervisión de los registros de eventos de canalización. En la tabla siguiente se enumeran los motivos notificados en el evento:
| Motivo | Meaning |
|---|---|
EXTERNAL_CHANGE_IN_REPLACE_WINDOW |
Una instrucción DML externa modificó filas en la ventana de reemplazo actual. |
REPLACE_WHERE_NOT_DETERMINISTIC |
El predicado usa expresiones no deterministas. |
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC |
La actualización anterior usó un predicado no determinista. |
UNSUPPORTED_REPLACE_WHERE_PREDICATE |
El predicado no se puede aplicar a ningún origen, la ventana actual contiene filas no procesadas por el predicado anterior o la ejecución usa una anulación del predicado. |
Limitaciones
Los flujos REPLACE WHERE tienen las siguientes limitaciones:
- La tabla de destino debe crearse dentro de la canalización.
- Solo se permite un flujo REPLACE WHERE por tabla de destino.
- Una tabla dirigida por un flujo REPLACE WHERE tampoco puede ser objeto de otro tipo de flujo, como un flujo CDC AUTOMÁTICO o un flujo de anexión.
- Las expectativas no son compatibles con las tablas de destino de los flujos REPLACE WHERE.
- Para las tablas de streaming creadas en Databricks SQL, consulte flujos REPLACE WHERE para tablas de streaming independientes para obtener información sobre la sintaxis y las diferencias de relleno retrospectivo.
Ejemplos
En los ejemplos siguientes se muestran patrones de flujo REPLACE WHERE comunes.
Ejemplo 1: Mantener agregados históricos de un origen de retención limitado
Este ejemplo mantiene agregados diarios de forma indefinida, incluso después de que los datos brutos caduquen en la tabla de origen (retención de 3 días):
SQL
CREATE STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
date,
key,
SUM(val) AS agg
FROM events_raw
GROUP BY ALL;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 3)
)
def events_agg():
return (
spark.read.table("events_raw")
.groupBy("date", "key")
.agg(F.sum("val").alias("agg"))
)
Ejemplo 2: Evitar la recomputación cuando cambia una tabla de dimensiones
En este ejemplo se mantienen las filas de hechos históricas sin cambios cuando cambian los atributos de dimensión:
SQL
CREATE STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
f.date,
f.user_id,
d.region,
f.revenue
FROM fact_table f
JOIN dim_users d
ON f.user_id = d.user_id;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 1)
)
def fact_dim_join():
fact_table = spark.read.table("fact_table").alias("f")
dim_users = spark.read.table("dim_users").alias("d")
return (
fact_table.join(dim_users, col("f.user_id") == col("d.user_id"))
.select(
col("f.date"),
col("f.user_id"),
col("d.region"),
col("f.revenue"),
)
)
Si cambia la región de un usuario, solo se vuelven a calcular las filas recientes. Las filas históricas conservan el valor de región en el momento en que se escribieron. Para corregir las filas históricas, ejecute un rellenado retroactivo específico mediante anulaciones de predicado.
Ejemplo 3: Agregar una nueva métrica sin volver a calcular el historial completo
En este ejemplo se muestra cómo evolucionar una definición de tabla y rellenar solo un intervalo de destino:
Defina la tabla inicial:
SQL
CREATE STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks FROM clickstream_raw GROUP BY ALL;Python
from pyspark import pipelines as dp from pyspark.sql import functions as F from pyspark.sql.functions import col @dp.table( replace_where=col("event_date") >= F.date_sub(F.current_date(), 7) ) def clickstream_daily(): return ( spark.read.table("clickstream_raw") .groupBy("event_date", "page_id") .agg(F.count("*").alias("clicks")) )Actualice la consulta para agregar
uniq_users:SQL
CREATE STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks, COUNT(DISTINCT user_id) AS uniq_users FROM clickstream_raw GROUP BY ALL;Python
@dp.table( replace_where=col("event_date") >= F.date_sub(F.current_date(), 7) ) def clickstream_daily(): return ( spark.read.table("clickstream_raw") .groupBy("event_date", "page_id") .agg( F.count("*").alias("clicks"), F.countDistinct("user_id").alias("uniq_users"), ) )Rellene la nueva métrica durante los últimos 30 días:
overrides = [ { "flow_name": "clickstream_daily", "predicate_override": "event_date BETWEEN '2026-01-01' AND '2026-01-30'", } ] resp = start_update_with_replace_where( pipeline_id="<pipeline-id>", replace_where_overrides=overrides, refresh_selection=["clickstream_daily"], )Las filas anteriores al intervalo rellenado contienen
NULLparauniq_users.
Ejemplo 4: Iteración en una ventana pequeña antes de rellenar el historial completo
En este ejemplo se muestra cómo validar la lógica de consulta en una ventana de datos pequeña antes de procesar el intervalo histórico completo.
Comience con una ventana corta para que cada actualización vuelva a calcular solo los últimos 7 días mientras revisa la consulta:
SQL
CREATE STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
)
def revenue_attribution():
return (
spark.read.table("marketing_events")
.groupBy("event_date", "campaign_id")
.agg(F.sum("revenue").alias("total_revenue"))
)
Una vez finalizada la consulta, use una anulación del predicado para realizar una carga histórica puntual:
overrides = [
{
"flow_name": "revenue_attribution",
"predicate_override": "event_date >= date_add(current_date(), -365)",
}
]
resp = start_update_with_replace_where(
pipeline_id="<pipeline-id>",
replace_where_overrides=overrides,
refresh_selection=["revenue_attribution"],
)