Puntos de control de Structured Streaming

Los puntos de control y los registros de escritura anticipada funcionan juntos para proporcionar garantías de procesamiento para cargas de trabajo de Structured Streaming. El punto de control realiza un seguimiento de la información que identifica la consulta, incluida la información de estado y los registros procesados. Al eliminar los archivos de un directorio de punto de control o cambiar a una nueva ubicación de punto de control, la siguiente ejecución de la consulta comienza como nueva.

Un directorio de punto de control contiene lo siguiente:

  • Desplazamientos: los desplazamientos de origen procesados en cada microproceso. Esto permite que la consulta se reanude exactamente desde donde se dejó sin volver a procesar los datos.
  • Confirmaciones: un registro de los microprocesos que se han confirmado en el receptor, lo que permite la semántica exactamente una vez.
  • Estado: Para las consultas con estado (agregaciones, combinaciones de flujos, desduplicación y operadores estado personalizados como transformWithState), el punto de control almacena metadatos sobre el operador estado, el esquema de estado y el contenido del almacén de estado guardado en el punto de control gestionado por el proveedor de almacenamiento de estado.
  • Metadatos: el identificador de consulta único que se usa para identificar la consulta. Las opciones de configuración se almacenan como parte del registro de desplazamiento.

Cada consulta debe tener una ubicación de punto de control diferente. Nunca use la misma ubicación para varias consultas.

Nota:

En este artículo se describen los puntos de control de Structured Streaming para las consultas de streaming. Para obtener información sobre el uso de DataFrame.checkpoint() con los volúmenes del Catálogo de Unity para truncar los planes de ejecución de los DataFrames que no son de streaming, consulte Puntos de control de DataFrame en volúmenes.

Habilitar puntos de control para consultas de Structured Streaming

Debe especificar la opción checkpointLocation antes de ejecutar una consulta de streaming, como en el ejemplo siguiente:

Python

(df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")
)

Scala

df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")

Nota:

Algunos receptores, como la salida de display() en cuadernos y el receptor memory, generan automáticamente una ubicación de punto de control temporal si se omite esta opción. Las ubicaciones temporales de puntos de control no garantizan la tolerancia a errores ni la coherencia de los datos y es posible que no se limpien correctamente. Databricks recomienda especificar siempre una ubicación de punto de control para estos receptores.

Recuperación después de realizar cambios en una consulta de Structured Streaming

Hay limitaciones sobre los cambios permitidos en una consulta de streaming entre reinicios desde la misma localización del punto de control.

Los cambios que generalmente requieren un nuevo punto de control incluyen el número o tipo de fuentes de entrada, los temas de Kafka suscritos, las rutas de Auto Loader, los tipos de operaciones con estado, el esquema de estado y el tipo de destino de salida.

Los cambios que generalmente son seguros incluyen agregar o quitar filtros, cambiar los límites de tasa, intervalos de activación y actualizar la lógica de la función definida por el usuario dentro de mapGroupsWithState (aunque la semántica puede cambiar).

En la sección siguiente se describen los cambios que no están permitidos o el efecto del cambio no está bien definido, donde:

  • El término permitido significa que puede realizar el cambio especificado, pero si la semántica de su efecto está bien definida depende de la consulta y del cambio.
  • El término no permitido significa que no debe realizar el cambio especificado, ya que es probable que la consulta reiniciada tenga errores impredecibles.
  • sdf representa un dataframe/conjunto de datos de streaming generado con sparkSession.readStream.

Tipos de cambios en las consultas de Structured Streaming

  • Cambios en el número o tipo de orígenes de entrada: esto no se permite de forma predeterminada porque Structured Streaming identifica los orígenes por su posición en el plan de consulta. Si activa la nomenclatura de origen, puede reordenar los orígenes existentes y agregar nuevos orígenes sin empezar desde un punto de control nuevo. Consulte Cambiar fuentes de streaming con evolución de fuentes.

  • Cambios en los parámetros de los orígenes de entrada: indica si esto se permite y si la semántica del cambio está bien definida depende del origen y de la consulta, incluidos los controles de admisión como maxFilesPerTrigger o maxOffsetsPerTrigger. Estos son algunos ejemplos:

    • Se permiten la adición, eliminación y modificación de los límites de velocidad:

      spark.readStream.format("kafka").option("subscribe", "article")
      

      to

      spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
      

      Para más información, consulte Configuración del tamaño del lote de Structured Streaming en Azure Databricks.

    • Por lo general, no se permiten cambios en los artículos y archivos suscritos, ya que los resultados son impredecibles: spark.readStream.format("kafka").option("subscribe", "article")a spark.readStream.format("kafka").option("subscribe", "newarticle")

  • Cambios en el intervalo del desencadenador: puede cambiar los desencadenadores entre lotes incrementales e intervalos de tiempo. Vea Cambiar los intervalos de activación entre ejecuciones.

  • Cambios en el tipo de receptor de salida: se permiten cambios entre algunas combinaciones específicas de receptores. Esto debe comprobarse caso a caso. Estos son algunos ejemplos.

    • Se permite el paso del receptor de archivos al receptor de Kafka. Kafka solo verá los datos nuevos.
    • No se permite el paso del receptor de Kafka al receptor de archivos.
    • Se permite cambiar el sink de Kafka a foreach, o viceversa.
  • Cambios en los parámetros del receptor de salida: si se permite y si la semántica del cambio está bien definida dependen del receptor y de la consulta. Estos son algunos ejemplos.

    • No se permiten cambios en el directorio de salida de un receptor de archivos: sdf.writeStream.format("parquet").option("path", "/somePath") a sdf.writeStream.format("parquet").option("path", "/anotherPath")
    • Se permiten cambios en el tema de salida: sdf.writeStream.format("kafka").option("topic", "topic1") a sdf.writeStream.format("kafka").option("topic", "topic2")
    • Se permiten cambios en el sumidero foreach definido por el usuario (es decir, el código ForeachWriter), pero la semántica del cambio depende del código.
  • Cambios en las operaciones similares a proyección, filtro o mapa: algunos casos están permitidos. Por ejemplo:

    • Se permite la adición o eliminación de filtros: sdf.selectExpr("a") a sdf.where(...).selectExpr("a").filter(...).
    • Se permiten cambios en las proyecciones con el mismo esquema de salida: sdf.selectExpr("stringColumn AS json").writeStream a sdf.select(to_json(...).as("json")).writeStream.
    • Los cambios en las proyecciones con un esquema de salida diferente se permiten condicionalmente: sdf.selectExpr("a").writeStream a sdf.selectExpr("b").writeStream solo se permite si el receptor de salida permite el cambio de esquema de "a" a "b".
  • Cambios en operaciones con estado: algunas operaciones de las consultas de streaming deben mantener los datos de estado para poder actualizar continuamente el resultado. Structured Streaming crea automáticamente puntos de comprobación de los datos de estado en el almacenamiento tolerante a errores (por ejemplo, DBFS o Azure Blob Storage) y los restaura después del reinicio. Sin embargo, esto supone que el esquema de los datos de estado sigue siendo el mismo en los distintos reinicios, lo que significa que no se permiten cambios (es decir, adiciones, eliminaciones o modificaciones de esquema) en las operaciones con estado de una consulta de streaming entre reinicios. Esta es la lista de operaciones con estado cuyo esquema no debe cambiarse entre reinicios para garantizar la recuperación de estado:

    • Agregación de streaming: por ejemplo, sdf.groupBy("a").agg(...). No se permite ningún cambio en el número o tipo de claves o agregados de agrupación.
    • Deduplicación de streaming: por ejemplo, sdf.dropDuplicates("a"). No se permite ningún cambio en el número o tipo de claves o agregados de agrupación.
    • Combinación flujo-flujo; por ejemplo, sdf1.join(sdf2, ...) (es decir, ambas entradas se generan con sparkSession.readStream). No se permiten cambios en el esquema ni en las columnas de igualdad. No se permiten cambios en el tipo de combinación (externo o interno). Otros cambios en la condición de combinación están mal definidos.
    • Operación arbitraria con estado: por ejemplo, sdf.groupByKey(...).mapGroupsWithState(...) o sdf.groupByKey(...).flatMapGroupsWithState(...). No se permite ningún cambio en el esquema del estado definido por el usuario y el tipo de tiempo de espera. Se permite cualquier cambio dentro de la función de asignación de estado definida por el usuario, pero el efecto semántico del cambio depende de la lógica definida por el usuario. Si realmente desea admitir cambios de esquema de estado, puede codificar o descodificar explícitamente las estructuras de datos de estado complejo en bytes mediante un esquema de codificación/descodificación que admita la migración de esquemas. Por ejemplo, si guarda el estado como bytes codificados en Avro, puede cambiar el esquema de estado de Avro entre reinicios de consulta, lo que restaura el estado binario.

Importante

dropDuplicates() y dropDuplicatesWithinWatermark(), operadores con estado, pueden no reiniciarse debido a una comprobación de compatibilidad de esquemas de estado al cambiar entre los modos de acceso de computación.

Se permite cambiar entre modos de acceso dedicado y sin aislamiento. Se permite cambiar entre los modos de acceso estándar y sin servidor. No intente cambiar entre otras combinaciones de modo de acceso.

Para evitar este error, no cambie el modo de acceso de cómputo para las consultas de streaming que contienen estos operadores.

Cambio de orígenes de streaming con la evolución del origen

De forma predeterminada, Structured Streaming identifica los orígenes por su posición en el plan de consulta, como 0, 1, 2, etc. Cualquier cambio en el número o el orden de los orígenes de entrada interrumpe la compatibilidad del punto de comprobación y requiere un punto de control nuevo. La evolución del origen permite asignar nombres estables definidos por el usuario a cada origen de streaming para que pueda reordenar, agregar o quitar orígenes de una consulta sin perder el estado del punto de comprobación.

La evolución de la fuente requiere Databricks Runtime 18.2 o versiones posteriores.

Configuración necesaria

Para habilitar la evolución del origen, establezca dos configuraciones de Spark:

  • spark.sql.streaming.queryEvolution.enableSourceEvolution: cuando true, todos los orígenes de streaming de la consulta deben denominarse explícitamente mediante la .name() API. El valor predeterminado es false.
  • spark.sql.streaming.offsetLog.formatVersion: debe establecerse en 2 para usar el formato de seguimiento de desplazamiento basado en nombres. El valor predeterminado es 1.

Establezca ambas configuraciones antes de definir la consulta de streaming:

spark.conf.set("spark.sql.streaming.queryEvolution.enableSourceEvolution", "true")
spark.conf.set("spark.sql.streaming.offsetLog.formatVersion", "2")

Reglas de nomenclatura

  • Los nombres solo deben contener caracteres alfanuméricos y caracteres de subrayado ([a-zA-Z0-9_]+).
  • Cada nombre de origen debe ser único dentro de una consulta.
  • Cuando la evolución del origen está habilitada, cada origen de streaming debe tener un nombre. Las fuentes sin nombre provocan el error UNNAMED_STREAMING_SOURCES_WITH_ENFORCEMENT.

Reordenar, agregar y quitar orígenes

Los siguientes cambios son seguros en los reinicios de consulta con el mismo punto de control:

  • Reordenar orígenes: reinicie la consulta con un orden diferente de orígenes. Cada fuente se reanuda desde su último offset confirmado según su nombre y no cambia el estado del punto de control.
  • Agregar nuevos orígenes: reinicie la consulta con un nuevo origen. Las fuentes nuevas se procesan desde el inicio y las fuentes existentes continúan desde sus últimos offsets.
  • Quitar orígenes: reinicie la consulta sin el origen. La fuente se elimina permanentemente del punto de control. No se puede volver a agregar un origen quitado con el mismo nombre.

Example

Use .name() en DataStreamReader antes de llamar a .load() o .table():

Python

orders_us = (spark.readStream
  .name("orders_us")
  .table("catalog.schema.orders_us")
)

orders_eu = (spark.readStream
  .name("orders_eu")
  .table("catalog.schema.orders_eu")
)

all_orders = orders_us.union(orders_eu)

Scala

val ordersUS = spark.readStream
  .name("orders_us")
  .table("catalog.schema.orders_us")

val ordersEU = spark.readStream
  .name("orders_eu")
  .table("catalog.schema.orders_eu")

val allOrders = ordersUS.union(ordersEU)

Limitaciones

  • La nomenclatura de origen requiere un punto de control nuevo. No se puede habilitar la evolución de la fuente con un punto de control existente que utilice el formato de registro de offsets V1.
  • Tras actualizar al formato de registro offset V2, no se puede volver a la V1. Consulte Configuración necesaria.
  • Los nombres de fuente son permanentes. Para cambiar el nombre de un origen, quítelo y, a continuación, agréguelo con un nombre nuevo. Se ha cambiado el nombre de los procesos de origen desde el principio.