Aplicar marcas de agua para controlar los umbrales de procesamiento de datos

En esta página se describen los conceptos de marcado de agua y se ofrecen recomendaciones para usar el marcado de agua en operaciones habituales de streaming con estado.

Las consultas de streaming acumulan datos de estado a lo largo del tiempo. Las marcas de agua quitan automáticamente los datos de estado antiguos para evitar errores de memoria y aumentar la latencia de procesamiento.

¿Qué es una marca de agua?

Durante el procesamiento, Structured Streaming mantiene el estado entre microprocesos. Las consultas de streaming usan el estado para actualizar incrementalmente los resultados en lugar de volver a calcular todo después de cada microproceso. Los umbrales controlan el valor a partir del cual una consulta deja de procesarse sobre una entidad con estado.

Entre los ejemplos comunes de entidades de estado se incluyen:

  • Agregaciones a lo largo de una ventana de tiempo.
  • Claves únicas en una unión entre dos flujos.

Para declarar una marca de agua en un DataFrame de streaming, especifique un campo de marca de tiempo y un umbral de latencia. A medida que llegan nuevos datos, el administrador de estado realiza un seguimiento de la marca de tiempo más reciente en el campo especificado y procesa solo los registros dentro del umbral de latencia.

Las consultas siempre procesan los registros que llegan dentro del umbral. Es posible que las consultas sigan procesando registros que llegan fuera del umbral, pero esto no está garantizado.

En el ejemplo siguiente se aplica un umbral de marca de agua de 10 minutos a un recuento con ventanas:

Python

from pyspark.sql.functions import window

(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
)

Scala

import org.apache.spark.sql.functions.window

df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "5 minutes"),
    $"id")
  .count()

En este ejemplo:

  • La columna event_time se usa para definir una marca de agua de 10 minutos y una ventana de tiempo fija de 5 minutos.
  • Se recoge un recuento por cada id observado en cada intervalo de 5 minutos no superpuesto.
  • La información de estado se mantiene para cada conteo hasta que el final de la ventana sea 10 minutos anterior al último event_time observado.

Importante

En una operación groupBy() y window(), haga referencia a las columnas por nombre, "<colName>" o col("<colName>"), para garantizar que se conserve la marca temporal del evento. En Scala, también puede usar $colName.

¿Cómo afectan las marcas de agua al tiempo de procesamiento y al rendimiento?

Los modos de salida controlan cuándo una consulta con marcas de agua escribe datos en el receptor. Las marcas de agua son esenciales para el control de rendimiento en streaming con estado, ya que reducen la cantidad total de información de estado en la memoria. No todos los modos de salida son compatibles con todas las operaciones con estado. Consulte Watermarks y modo de salida para las agregaciones en ventanas.

La selección de una duración de marca de agua tiene desventajas:

  • Las marcas de agua más cortas reducen la latencia de las consultas porque estas almacenan menos información de estado y escriben los resultados después de que finaliza cada intervalo de marca de agua. Sin embargo, las marcas de agua cortas tienen poca tolerancia a los datos tardíos.
  • Las marcas de agua más largas tienen una alta tolerancia a los datos atrasados. Sin embargo, las marcas de agua largas aumentan la latencia de las consultas porque estas deben almacenar más información de estado y esperar más tiempo antes de escribir los resultados cuando la duración de la marca de agua es mayor.

Marcas de agua y modo de salida para agregaciones con ventanas

En la tabla siguiente se muestra el comportamiento de procesamiento de las consultas con agregación sobre una marca de tiempo y una marca de agua:

Modo de salida Comportamiento
Agregar La consulta escribe filas en la tabla de destino una vez superado el umbral de la marca temporal. Todas las escrituras de datos se retrasan en función del umbral de tiempo. El estado de agregación anterior se quita después de que se haya superado el umbral.
Actualizar La consulta escribe filas en la tabla de destino a medida que se calculan los resultados y la consulta puede actualizar y sobrescribir filas a medida que llegan nuevos datos. El estado de agregación anterior se quita después de que se haya superado el umbral.
Completado El estado de agregación no se elimina. La consulta vuelve a escribir la tabla de destino para cada desencadenador.

Marcas temporales de agua y modos de salida para uniones entre flujos

Las uniones entre varios flujos solo admiten el modo append. Las consultas escriben registros coincidentes para cada lote.

Para las uniones internas, Databricks recomienda configurar un umbral de marca de agua en cada origen de datos en streaming para que la consulta pueda descartar la información de estado de los registros antiguos. Sin marcas de agua, Structured Streaming intenta unir todas las claves de ambos lados de la unión en cada disparador, lo que podría afectar al rendimiento.

Para las uniones externas, es obligatorio el uso de marcas de agua. Cuando un registro no coincide, la consulta escribe un valor NULL para esa clave. Dado que las combinaciones solo admiten el modo de anexión, los registros no coincidentes no se escriben hasta que supere el umbral de latencia.

Controlar el umbral de datos atrasados con una política de múltiples marcas de agua

Con varias entradas de Structured Streaming, puede configurar varias marcas de agua para controlar los umbrales de tolerancia de los datos tardíos. Las marcas de agua permiten controlar la información de estado y la latencia.

Una consulta de streaming puede tener varios flujos de entrada que se unen o se combinan. En el caso de las operaciones con estado, cada uno de los flujos de entrada podría requerir un umbral diferente para tolerar datos tardíos. Especifique estos umbrales mediante withWatermark("eventTime", delay) en cada flujo de entrada. A continuación se muestra una consulta de ejemplo en la que se usan combinaciones de flujos de datos.

Python

input_stream1 = ...      # delays up to 1 hour
input_stream2 = ...      # delays up to 2 hours

(input_stream1.withWatermark("eventTime1", "1 hour")
  .join(
    input_stream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)
)

Scala

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

Al ejecutar la consulta con operaciones con estado, Structured Streaming realiza un seguimiento individual del tiempo máximo de evento de cada flujo de entrada, calcula las marcas de agua en función del retraso correspondiente y determina una única marca de agua global. De forma predeterminada, Structured Streaming usa el mínimo como marca de agua global. Si un flujo se retrasa respecto a los demás, una marca de agua mínima global impide que la consulta marque accidentalmente los datos como tardíos. Por ejemplo, esto puede ocurrir cuando uno de los flujos deja de recibir datos debido a fallos en origen. El watermark global avanza de forma segura al ritmo del flujo más lento y retrasa el resultado de la consulta cuando es necesario.

Para reducir la latencia, configure spark.sql.streaming.multipleWatermarkPolicy como max (el valor predeterminado es min) para usar la marca de agua del flujo más rápido como marca de agua global. Sin embargo, el uso de esta configuración resulta en pérdidas de datos de las secuencias más lentas. Databricks recomienda aplicar esta configuración con precaución.

Aplicar marcas de agua a operaciones distintas

La operación distinct hace un seguimiento de cada registro único en el estado. Sin una marca de agua, el estado crece indefinidamente y puede causar problemas de memoria. Especifique un watermark en un campo de marca de tiempo para delimitar el estado y eliminar los registros antiguos una vez superado el umbral.

En el ejemplo siguiente se aplica una marca de agua a una distinct operación:

Python

streamingDf = spark.readStream. ...  # columns: eventTime, id, value, ...

# Apply watermark before distinct operation
(streamingDf
  .withWatermark("eventTime", "1 hour")
  .distinct()
)

Scala

val streamingDf = spark.readStream. ...  // columns: eventTime, id, value, ...

// Apply watermark before distinct operation
streamingDf
  .withWatermark("eventTime", "1 hour")
  .distinct()

En este ejemplo, la consulta de streaming quita los registros duplicados que llegan en un plazo de 1 hora a partir de la última observación observada eventTime. La consulta descarta la información de estado para la desduplicación una vez superado el umbral.

Importante

Para desduplicar columnas específicas en lugar de todas las columnas, use dropDuplicates() o dropDuplicatesWithinWatermark() en lugar de distinct. Consultar Eliminar duplicados en la marca de agua.

Eliminar duplicados dentro del watermark

En Databricks Runtime 13.3 LTS o posterior, puede usar un identificador único para eliminar duplicados de registros dentro de un umbral de watermark.

Structured Streaming garantiza el procesamiento de exactamente una vez, pero no elimina los registros duplicados procedentes de las fuentes de datos. Use dropDuplicatesWithinWatermark para quitar duplicados en cualquier campo, incluso cuando los campos difieren entre registros duplicados, como la hora del evento o la hora de llegada.

Con dropDuplicatesWithinWatermark, las consultas siempre eliminan los registros duplicados que llegan dentro del umbral de la marca de agua. Las consultas también pueden eliminar duplicados de registros que lleguen fuera de ese umbral, pero no se garantiza. Para garantizar que las consultas eliminen todos los duplicados, establezca el umbral de watermark de modo que sea mayor que la diferencia máxima entre las marcas de tiempo de los eventos duplicados.

Debe especificar una marca de agua para usar el método dropDuplicatesWithinWatermark:

Python

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])
)

Scala

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(Seq("guid"))

Ejemplos de casos de uso

En los ejemplos siguientes se muestran casos de uso avanzados de ventanas:

Usar ventanas de saltos de tamaño constante para calcular los totales de ventas por hora

Las ventanas contiguas son de tamaño fijo y tienen intervalos no superpuestos. Cada fila de entrada pertenece exactamente a una ventana. Utilice ventanas contiguas para calcular agregaciones discretas por intervalos de tiempo, como los totales de ventas por hora:

Python

from pyspark.sql.functions import window, sum

hourly_sales = (orders
  .withWatermark("timestamp", "1 hour")
  .groupBy(window("timestamp", "1 hour"))
  .agg(sum("amount").alias("total_sales"))
)

Scala

import org.apache.spark.sql.functions.{window, sum}

val hourlySales = orders
  .withWatermark("timestamp", "1 hour")
  .groupBy(window($"timestamp", "1 hour"))
  .agg(sum($"amount").alias("total_sales"))

En este ejemplo:

  • window("timestamp", "1 hour") agrupa los pedidos en intervalos de 1 hora no superpuestos, como de 5 a 6 am y de 6 a 7 a. m.
  • withWatermark("timestamp", "1 hour") mantiene la agregación de cada ventana en el estado hasta que la marca de tiempo de finalización de la ventana sea 1 hora anterior a la marca de tiempo máxima del pedido.

Usar ventanas deslizantes para calcular agregaciones móviles

Las ventanas deslizantes son de tamaño fijo con intervalos que se pueden superponer. Una sola fila puede pertenecer a varias ventanas. Use ventanas deslizantes para calcular agregados graduales, como las ventas durante un período gradual de 6 horas:

Python

from pyspark.sql.functions import window, sum

rolling_sales = (orders
  .withWatermark("timestamp", "1 hour")
  .groupBy(window("timestamp", "6 hours", slideDuration="1 hour"))
  .agg(sum("amount").alias("total_sales"))
)

Scala

import org.apache.spark.sql.functions.{window, sum}

val rollingSales = orders
  .withWatermark("timestamp", "1 hour")
  .groupBy(window($"timestamp", "6 hours", "1 hour"))
  .agg(sum($"amount").alias("total_sales"))

En este ejemplo:

  • window("timestamp", "6 hours", slideDuration="1 hour") agrupa los pedidos en intervalos de 6 horas que avanzan en 1 hora, por ejemplo, de 5 a 11 a. m. y de 6 a. m. a 12 p. m.
  • withWatermark("timestamp", "1 hour") mantiene la agregación de cada ventana en el estado hasta que la marca de tiempo de finalización de la ventana sea 1 hora anterior a la marca de tiempo máxima de pedido.
  • slideDurationdebe ser menor o igual que .windowDuration

Usar ventanas de sesión para comprobar la actividad del usuario

Las ventanas de sesión no tienen ningún tamaño fijo. Se abre una ventana cuando llega una fila y se cierra tras un intervalo de inactividad durante el cual no llegan filas nuevas. Use ventanas de sesión para agregar ráfagas de actividad entre períodos de inactividad largos, como las vistas de página de un usuario dentro de un período de 30 minutos:

Python

from pyspark.sql.functions import session_window, sum

sessionized_page_views = (activity
  .withWatermark("timestamp", "1 hour")
  .groupBy("user_id", session_window("timestamp", gapDuration="30 minutes"))
  .agg(sum("page_views").alias("total_page_views"))
)

Scala

import org.apache.spark.sql.functions.{session_window, sum}

val sessionizedPageViews = activity
  .withWatermark("timestamp", "1 hour")
  .groupBy($"user_id", session_window($"timestamp", "30 minutes"))
  .agg(sum($"page_views").alias("total_page_views"))

En este ejemplo:

  • session_window("timestamp", gapDuration="30 minutes") abre una ventana cuando llega la primera vista de página. Cada visualización de página posterior que se produce dentro de los 30 minutos siguientes amplía la ventana. Cuando no llega ninguna vista de página en un plazo de 30 minutos, la ventana se cierra y la vista de página siguiente inicia una nueva ventana.
  • withWatermark("timestamp", "1 hour") mantiene el agregado de cada sesión en el estado hasta que la marca de tiempo de finalización de la ventana sea 1 hora anterior a la marca de tiempo máxima de visualización de página.
  • El timeColumn argumento de window() y session_window() debe ser de TimestampType o TimestampNTZType.
  • Use current_timestamp() para definir ventanas en función del tiempo de procesamiento en lugar de la hora del evento.
  • Puede establecer duraciones de ventana desde microsegundos hasta días. No se admiten las duraciones del mes ni las más largas.
  • Use el modo de salida complete con agregaciones en ventanas para mantener indefinidamente todo el estado de las ventanas. Utilice el modo de salida append con una marca de agua adecuada para limitar el crecimiento del estado y evitar problemas de memoria en grandes conjuntos de datos. Para obtener más información sobre el comportamiento del modo de salida, consulte Marcas de agua y modo de salida para agregaciones con ventanas.