Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Os fluxos nos Pipelines Declarativos Lakeflow Spark movem dados para uma tabela de streaming ou vista materializada. Os exemplos seguintes mostram como definir fluxos por defeito, definir um fluxo separadamente do seu destino, escrever numa tabela de streaming a partir de múltiplos tópicos de Kafka, executar um backfill único e substituir UNION consultas por processamento de fluxo de adição.
Para uma visão geral dos fluxos, consulte Dados de carga e processo incrementalmente com fluxos Lakeflow Spark Declarative Pipelines.
Exemplo: Criar um fluxo padrão
Quando crias um pipeline, normalmente defines uma tabela ou vista juntamente com a consulta que o suporta. Por exemplo, esta consulta cria uma tabela de streaming nomeada customers_silver por leitura de customers_bronze. A tabela de streaming e o seu fluxo padrão são criados juntos num único passo.
SQL
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)
Python
from pyspark import pipelines as dp
@dp.table()
def customers_silver():
return spark.readStream.table("customers_bronze")
O fluxo padrão para uma tabela de streaming é um fluxo de anexação que adiciona novas linhas a cada atualização, e tem o mesmo nome do destino. Esta é a forma mais comum de usar pipelines — criar um fluxo e o seu destino numa única etapa — e pode usá-la para ingerir ou transformar dados. Para mais informações sobre conceitos de fluxo, consulte Dados de carga e processo incrementais com fluxos Lakeflow Spark Declarative Pipelines.
Exemplo: Defina um fluxo separadamente do seu alvo
Também pode criar um fluxo para uma tabela que definiu separadamente. O resultado é idêntico a criar um fluxo por defeito, incluindo usar o mesmo nome para a tabela de streaming e o fluxo:
Python
from pyspark import pipelines as dp
# create streaming table
dp.create_streaming_table("customers_silver")
# add a flow
@dp.append_flow(
target = "customers_silver")
def customer_silver():
return spark.readStream.table("customers_bronze")
SQL
-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_silver;
-- add a flow
CREATE FLOW customers_silver
AS INSERT INTO customers_silver BY NAME
SELECT * FROM STREAM(customers_bronze);
Definir um fluxo separadamente do seu destino permite criar múltiplos fluxos que adicionam dados ao mesmo destino. Use o @dp.append_flow decorador na interface Python ou a CREATE FLOW...INSERT INTO cláusula na interface SQL para adicionar fluxos para tarefas como as seguintes:
- Adicione fontes de streaming que acrescentam dados a uma tabela de streaming existente sem exigir uma atualização completa. Por exemplo, você pode ter uma tabela combinando dados regionais de cada região em que opera. À medida que novas regiões são distribuídas, você pode adicionar os novos dados de região à tabela sem executar uma atualização completa. Ver exemplo: Escrever para uma tabela de streaming a partir de múltiplos tópicos de Kafka.
- Atualize uma tabela de streaming anexando dados históricos ausentes (backfilling). Podes usar a
INSERT INTO ONCEsintaxe para criar um backfill histórico que seja executado uma única vez. Veja o exemplo: Execute um backfill de dados único e um backfill de dados históricos com pipelines. - Combine dados de várias fontes e grave numa tabela única de streaming em vez de usar a cláusula
UNIONnuma consulta. Usar o processamento de fluxo de acréscimo permite que você atualize a tabela deUNIONdestino incrementalmente sem executar uma atualização completa. Ver exemplo: Use o processamento de fluxo de anexo em vez deUNION.
Para consultas Python, use a função create_streaming_table() para criar uma tabela de destino.
Importante
- Se você precisar definir restrições de qualidade de dados com expectativas, defina as expectativas na tabela de destino como parte da função ou em uma definição de
create_streaming_table()tabela existente. Não é possível definir expectativas na@append_flowdefinição. - Os fluxos são identificados por um nome de fluxo, e esse nome é usado para identificar pontos de verificação de streaming. O uso do nome do fluxo para identificar o ponto de verificação significa o seguinte:
- Se um fluxo existente numa canalização for renomeado, o ponto de controlo não será transferido e o fluxo renomeado será, efetivamente, um fluxo totalmente novo.
- Não é possível reutilizar um nome de fluxo em um pipeline, porque o ponto de verificação existente não corresponderá à nova definição de fluxo.
Exemplo: Escrever numa tabela de streaming a partir de vários tópicos do Kafka
O exemplo a seguir cria uma tabela de streaming nomeada kafka_target e escreve nessa tabela de streaming a partir de dois tópicos do Kafka:
Python
from pyspark import pipelines as dp
dp.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dp.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dp.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
Para saber mais sobre a função valor tabela read_kafka() usada nas consultas SQL, consulte read_kafka na referência da linguagem SQL.
Em Python, você pode criar programaticamente vários fluxos que visam uma única tabela. O exemplo a seguir mostra esse padrão para uma lista de tópicos de Kafka.
Observação
Esse padrão tem os mesmos requisitos que usar um for loop para criar tabelas. Você deve passar explicitamente um valor Python para a função que define o fluxo. Consulte Criar tabelas em um for loop.
from pyspark import pipelines as dp
dp.create_streaming_table("kafka_target")
topic_list = ["topic1", "topic2", "topic3"]
for topic_name in topic_list:
@dp.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
def topic_flow(topic=topic_name):
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", topic)
.load()
)
Exemplo: Executar um preenchimento de dados único
Se desejar executar uma consulta para acrescentar dados a uma tabela de streaming existente, use append_flow.
Depois de anexar um conjunto de dados existentes, você tem várias opções:
- Se desejar que a consulta acrescente novos dados se eles chegarem no diretório de backfill, mantenha a consulta no lugar.
- Se quiseres que isto seja um backfill único e que não seja executado de novo, remove a consulta depois de executar o pipeline uma vez.
- Se desejar que a consulta seja executada uma vez e só seja executada novamente nos casos em que os dados estão sendo totalmente atualizados, defina o
onceparâmetro comoTrueno fluxo de acréscimo. Em SQL, useINSERT INTO ONCE.
Os exemplos a seguir executam uma consulta para acrescentar dados históricos a uma tabela de streaming:
Python
from pyspark import pipelines as dp
@dp.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dp.append_flow(
target = "csv_target",
once = True)
def backfill():
return spark.read
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
read_files(
"path/to/sourceDir",
"csv"
);
CREATE FLOW
backfill
AS INSERT INTO ONCE
csv_target BY NAME
SELECT * FROM
read_files(
"path/to/backfill/data/dir",
"csv"
);
Para obter um exemplo mais detalhado, consulte Reabastecimento de dados históricos usando pipelines.
Exemplo: Utilize processamento de fluxo de adição em vez de UNION
Em vez de usar uma consulta com uma UNION cláusula, pode-se usar consultas de fluxo de anexação para combinar várias fontes e gravar em uma única tabela de streaming. Utilizar consultas de fluxo de acréscimo em vez de UNION permite anexar a uma tabela de streaming a partir de múltiplas fontes sem realizar uma atualização completa.
O exemplo Python a seguir inclui uma consulta que combina várias fontes de dados com uma UNION cláusula:
@dp.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
)
raw_orders_eu = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
)
return raw_orders_us.union(raw_orders_eu)
Os exemplos a seguir substituem a UNION consulta por consultas de fluxo de acréscimo:
Python
dp.create_streaming_table("raw_orders")
@dp.append_flow(target="raw_orders")
def raw_orders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dp.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dp.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
SQL
CREATE OR REFRESH STREAMING TABLE raw_orders;
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/us",
format => "csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/eu",
format => "csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/apac",
format => "csv"
);
Exemplo: Usar transformWithState para monitorizar batimentos cardíacos de sensores
O exemplo seguinte mostra um processador com estado persistente que lê de Kafka e verifica que os sensores estão a emitir batimentos cardíacos periodicamente. Se um batimento cardíaco não for recebido dentro de 5 minutos, o processador emite uma entrada na tabela Delta alvo para análise.
Para mais informações sobre a construção de aplicações com estado personalizadas, veja Construir uma aplicação com estado personalizada.
Observação
O RocksDB é o fornecedor de estado predefinido a partir do Databricks Runtime 17.2. Se a consulta falhar devido a uma exceção de fornecedor não suportada, adicione as seguintes configurações do pipeline, realize uma atualização completa ou reinício do checkpoint e depois execute novamente o pipeline:
"configuration": {
"spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider",
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled": "true"
}
from typing import Iterator
import pandas as pd
from pyspark import pipelines as dp
from pyspark.sql.functions import col, from_json
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, LongType, StringType, TimestampType
KAFKA_TOPIC = "<your-kafka-topic>"
output_schema = StructType([
StructField("sensor_id", LongType(), False),
StructField("sensor_type", StringType(), False),
StructField("last_heartbeat_time", TimestampType(), False)])
class SensorHeartbeatProcessor(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
# Define state schema to store sensor information (sensor_id is the grouping key)
state_schema = StructType([
StructField("sensor_type", StringType(), False),
StructField("last_heartbeat_time", TimestampType(), False)])
self.sensor_state = handle.getValueState("sensorState", state_schema)
# State variable to track the previously registered timer
timer_schema = StructType([StructField("timer_ts", LongType(), False)])
self.timer_state = handle.getValueState("timerState", timer_schema)
self.handle = handle
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
# Process one row from input and update state
pdf = next(rows)
row = pdf.iloc[0]
# Store or update the sensor information in state using current timestamp
current_time = pd.Timestamp(timerValues.getCurrentProcessingTimeInMs(), unit='ms')
self.sensor_state.update((
row["sensor_type"],
current_time
))
# Delete old timer if already registered
if self.timer_state.exists():
old_timer = self.timer_state.get()[0]
self.handle.deleteTimer(old_timer)
# Register a timer for 5 minutes from current processing time
expiry_time = timerValues.getCurrentProcessingTimeInMs() + (5 * 60 * 1000)
self.handle.registerTimer(expiry_time)
# Store the new timer timestamp in state
self.timer_state.update((expiry_time,))
# No output on input processing, output only on timer expiry
return iter([])
def handleExpiredTimer(self, key, timerValues, expiredTimerInfo) -> Iterator[pd.DataFrame]:
# Emit output row based on state store
if self.sensor_state.exists():
state = self.sensor_state.get()
output = pd.DataFrame({
"sensor_id": [key[0]], # Use grouping key as sensor_id
"sensor_type": [state[0]],
"last_heartbeat_time": [state[1]]
})
# Remove the entry for the sensor from the state store
self.sensor_state.clear()
# Remove the timer state entry
self.timer_state.clear()
yield output
def close(self) -> None:
pass
dp.create_streaming_table("sensorAlerts")
# Define the schema for the Kafka message value
sensor_schema = StructType([
StructField("sensor_id", LongType(), False),
StructField("sensor_type", StringType(), False),
StructField("sensor_value", LongType(), False)])
@dp.append_flow(target = "sensorAlerts")
def kafka_delta_flow():
return (
spark.readStream
.format("kafka")
.option("subscribe", KAFKA_TOPIC)
.option("startingOffsets", "earliest")
.load()
.select(from_json(col("value").cast("string"), sensor_schema).alias("data"), col("timestamp"))
.select("data.*", "timestamp")
.withWatermark('timestamp', '1 hour')
.groupBy(col("sensor_id"))
.transformWithStateInPandas(
statefulProcessor = SensorHeartbeatProcessor(),
outputStructType = output_schema,
outputMode = 'update',
timeMode = 'ProcessingTime'))