Utilização de escoamentos em oleodutos declarativos Lakeflow Spark

Os fluxos nos Pipelines Declarativos Lakeflow Spark movem dados para uma tabela de streaming ou vista materializada. Os exemplos seguintes mostram como definir fluxos por defeito, definir um fluxo separadamente do seu destino, escrever numa tabela de streaming a partir de múltiplos tópicos de Kafka, executar um backfill único e substituir UNION consultas por processamento de fluxo de adição.

Para uma visão geral dos fluxos, consulte Dados de carga e processo incrementalmente com fluxos Lakeflow Spark Declarative Pipelines.

Exemplo: Criar um fluxo padrão

Quando crias um pipeline, normalmente defines uma tabela ou vista juntamente com a consulta que o suporta. Por exemplo, esta consulta cria uma tabela de streaming nomeada customers_silver por leitura de customers_bronze. A tabela de streaming e o seu fluxo padrão são criados juntos num único passo.

SQL

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

Python

from pyspark import pipelines as dp

@dp.table()
def customers_silver():
  return spark.readStream.table("customers_bronze")

O fluxo padrão para uma tabela de streaming é um fluxo de anexação que adiciona novas linhas a cada atualização, e tem o mesmo nome do destino. Esta é a forma mais comum de usar pipelines — criar um fluxo e o seu destino numa única etapa — e pode usá-la para ingerir ou transformar dados. Para mais informações sobre conceitos de fluxo, consulte Dados de carga e processo incrementais com fluxos Lakeflow Spark Declarative Pipelines.

Exemplo: Defina um fluxo separadamente do seu alvo

Também pode criar um fluxo para uma tabela que definiu separadamente. O resultado é idêntico a criar um fluxo por defeito, incluindo usar o mesmo nome para a tabela de streaming e o fluxo:

Python

from pyspark import pipelines as dp

# create streaming table
dp.create_streaming_table("customers_silver")

# add a flow
@dp.append_flow(
  target = "customers_silver")
def customer_silver():
  return spark.readStream.table("customers_bronze")

SQL

-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_silver;

-- add a flow
CREATE FLOW customers_silver
AS INSERT INTO customers_silver BY NAME
SELECT * FROM STREAM(customers_bronze);

Definir um fluxo separadamente do seu destino permite criar múltiplos fluxos que adicionam dados ao mesmo destino. Use o @dp.append_flow decorador na interface Python ou a CREATE FLOW...INSERT INTO cláusula na interface SQL para adicionar fluxos para tarefas como as seguintes:

Para consultas Python, use a função create_streaming_table() para criar uma tabela de destino.

Importante

  • Se você precisar definir restrições de qualidade de dados com expectativas, defina as expectativas na tabela de destino como parte da função ou em uma definição de create_streaming_table() tabela existente. Não é possível definir expectativas na @append_flow definição.
  • Os fluxos são identificados por um nome de fluxo, e esse nome é usado para identificar pontos de verificação de streaming. O uso do nome do fluxo para identificar o ponto de verificação significa o seguinte:
    • Se um fluxo existente numa canalização for renomeado, o ponto de controlo não será transferido e o fluxo renomeado será, efetivamente, um fluxo totalmente novo.
    • Não é possível reutilizar um nome de fluxo em um pipeline, porque o ponto de verificação existente não corresponderá à nova definição de fluxo.

Exemplo: Escrever numa tabela de streaming a partir de vários tópicos do Kafka

O exemplo a seguir cria uma tabela de streaming nomeada kafka_target e escreve nessa tabela de streaming a partir de dois tópicos do Kafka:

Python

from pyspark import pipelines as dp

dp.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dp.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dp.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

Para saber mais sobre a função valor tabela read_kafka() usada nas consultas SQL, consulte read_kafka na referência da linguagem SQL.

Em Python, você pode criar programaticamente vários fluxos que visam uma única tabela. O exemplo a seguir mostra esse padrão para uma lista de tópicos de Kafka.

Observação

Esse padrão tem os mesmos requisitos que usar um for loop para criar tabelas. Você deve passar explicitamente um valor Python para a função que define o fluxo. Consulte Criar tabelas em um for loop.

from pyspark import pipelines as dp

dp.create_streaming_table("kafka_target")

topic_list = ["topic1", "topic2", "topic3"]

for topic_name in topic_list:

  @dp.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
  def topic_flow(topic=topic_name):
    return (
      spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,...")
        .option("subscribe", topic)
        .load()
    )

Exemplo: Executar um preenchimento de dados único

Se desejar executar uma consulta para acrescentar dados a uma tabela de streaming existente, use append_flow.

Depois de anexar um conjunto de dados existentes, você tem várias opções:

  • Se desejar que a consulta acrescente novos dados se eles chegarem no diretório de backfill, mantenha a consulta no lugar.
  • Se quiseres que isto seja um backfill único e que não seja executado de novo, remove a consulta depois de executar o pipeline uma vez.
  • Se desejar que a consulta seja executada uma vez e só seja executada novamente nos casos em que os dados estão sendo totalmente atualizados, defina o once parâmetro como True no fluxo de acréscimo. Em SQL, use INSERT INTO ONCE.

Os exemplos a seguir executam uma consulta para acrescentar dados históricos a uma tabela de streaming:

Python

from pyspark import pipelines as dp

@dp.table()
def csv_target():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/sourceDir")

@dp.append_flow(
  target = "csv_target",
  once = True)
def backfill():
  return spark.read
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/backfill/data/dir")

SQL

CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  read_files(
    "path/to/sourceDir",
    "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO ONCE
  csv_target BY NAME
SELECT * FROM
  read_files(
    "path/to/backfill/data/dir",
    "csv"
  );

Para obter um exemplo mais detalhado, consulte Reabastecimento de dados históricos usando pipelines.

Exemplo: Utilize processamento de fluxo de adição em vez de UNION

Em vez de usar uma consulta com uma UNION cláusula, pode-se usar consultas de fluxo de anexação para combinar várias fontes e gravar em uma única tabela de streaming. Utilizar consultas de fluxo de acréscimo em vez de UNION permite anexar a uma tabela de streaming a partir de múltiplas fontes sem realizar uma atualização completa.

O exemplo Python a seguir inclui uma consulta que combina várias fontes de dados com uma UNION cláusula:

@dp.create_table(name="raw_orders")
def unioned_raw_orders():
  raw_orders_us = (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")
  )

  raw_orders_eu = (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")
  )

  return raw_orders_us.union(raw_orders_eu)

Os exemplos a seguir substituem a UNION consulta por consultas de fluxo de acréscimo:

Python

dp.create_streaming_table("raw_orders")

@dp.append_flow(target="raw_orders")
def raw_orders_us():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")

@dp.append_flow(target="raw_orders")
def raw_orders_eu():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dp.append_flow(target="raw_orders")
def raw_orders_apac():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")

SQL

CREATE OR REFRESH STREAMING TABLE raw_orders;

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/us",
    format => "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/eu",
    format => "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/apac",
    format => "csv"
  );

Exemplo: Usar transformWithState para monitorizar batimentos cardíacos de sensores

O exemplo seguinte mostra um processador com estado persistente que lê de Kafka e verifica que os sensores estão a emitir batimentos cardíacos periodicamente. Se um batimento cardíaco não for recebido dentro de 5 minutos, o processador emite uma entrada na tabela Delta alvo para análise.

Para mais informações sobre a construção de aplicações com estado personalizadas, veja Construir uma aplicação com estado personalizada.

Observação

O RocksDB é o fornecedor de estado predefinido a partir do Databricks Runtime 17.2. Se a consulta falhar devido a uma exceção de fornecedor não suportada, adicione as seguintes configurações do pipeline, realize uma atualização completa ou reinício do checkpoint e depois execute novamente o pipeline:

"configuration": {
    "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider",
    "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled": "true"
}
from typing import Iterator

import pandas as pd

from pyspark import pipelines as dp
from pyspark.sql.functions import col, from_json
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, LongType, StringType, TimestampType

KAFKA_TOPIC = "<your-kafka-topic>"

output_schema = StructType([
    StructField("sensor_id", LongType(), False),
    StructField("sensor_type", StringType(), False),
    StructField("last_heartbeat_time", TimestampType(), False)])

class SensorHeartbeatProcessor(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        # Define state schema to store sensor information (sensor_id is the grouping key)
        state_schema = StructType([
            StructField("sensor_type", StringType(), False),
            StructField("last_heartbeat_time", TimestampType(), False)])
        self.sensor_state = handle.getValueState("sensorState", state_schema)
        # State variable to track the previously registered timer
        timer_schema = StructType([StructField("timer_ts", LongType(), False)])
        self.timer_state = handle.getValueState("timerState", timer_schema)
        self.handle = handle

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        # Process one row from input and update state
        pdf = next(rows)
        row = pdf.iloc[0]
        # Store or update the sensor information in state using current timestamp
        current_time = pd.Timestamp(timerValues.getCurrentProcessingTimeInMs(), unit='ms')
        self.sensor_state.update((
            row["sensor_type"],
            current_time
        ))

        # Delete old timer if already registered
        if self.timer_state.exists():
            old_timer = self.timer_state.get()[0]
            self.handle.deleteTimer(old_timer)

        # Register a timer for 5 minutes from current processing time
        expiry_time = timerValues.getCurrentProcessingTimeInMs() + (5 * 60 * 1000)
        self.handle.registerTimer(expiry_time)
        # Store the new timer timestamp in state
        self.timer_state.update((expiry_time,))

        # No output on input processing, output only on timer expiry
        return iter([])

    def handleExpiredTimer(self, key, timerValues, expiredTimerInfo) -> Iterator[pd.DataFrame]:
        # Emit output row based on state store
        if self.sensor_state.exists():
            state = self.sensor_state.get()
            output = pd.DataFrame({
                "sensor_id": [key[0]],  # Use grouping key as sensor_id
                "sensor_type": [state[0]],
                "last_heartbeat_time": [state[1]]
            })
            # Remove the entry for the sensor from the state store
            self.sensor_state.clear()
            # Remove the timer state entry
            self.timer_state.clear()
            yield output

    def close(self) -> None:
        pass

dp.create_streaming_table("sensorAlerts")

# Define the schema for the Kafka message value
sensor_schema = StructType([
    StructField("sensor_id", LongType(), False),
    StructField("sensor_type", StringType(), False),
    StructField("sensor_value", LongType(), False)])

@dp.append_flow(target = "sensorAlerts")
def kafka_delta_flow():
    return (
      spark.readStream
        .format("kafka")
        .option("subscribe", KAFKA_TOPIC)
        .option("startingOffsets", "earliest")
        .load()
        .select(from_json(col("value").cast("string"), sensor_schema).alias("data"), col("timestamp"))
        .select("data.*", "timestamp")
        .withWatermark('timestamp', '1 hour')
        .groupBy(col("sensor_id"))
        .transformWithStateInPandas(
          statefulProcessor = SensorHeartbeatProcessor(),
          outputStructType = output_schema,
          outputMode = 'update',
          timeMode = 'ProcessingTime'))