Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Esta página descreve conceitos sobre marcas d'água e apresenta recomendações para o uso de marcas d'água em operações comuns de streaming com estado.
As consultas de streaming acumulam dados de estado ao longo do tempo. As marcas d'água removem automaticamente dados de estado antigos para evitar erros de memória e aumentar a latência de processamento.
O que é uma marca d'água?
Durante o processamento, o Streaming Estruturado mantém o estado entre microlotes. As consultas de streaming usam o estado para atualizar incrementalmente os resultados em vez de recomputar tudo após cada microlote. As marcas de corte determinam o limite em que uma consulta interrompe o processamento de uma entidade de estado.
Exemplos comuns de entidades de estado incluem:
- Agregações em uma janela de tempo.
- Chaves exclusivas em uma junção entre dois fluxos.
Para definir uma marca d'água em um DataFrame de streaming, especifique um campo de timestamp e um limite de atraso. À medida que novos dados chegam, o gerenciador de estado rastreia o carimbo de data/hora mais recente no campo especificado e processa apenas registros dentro do limite de latência.
As consultas sempre processam registros que chegam dentro do limite. As consultas ainda podem processar registros que chegam fora do limite, mas isso não é garantido.
O exemplo a seguir aplica um limite de marca d'água de 10 minutos a uma contagem em janela:
Python
from pyspark.sql.functions import window
(df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
)
Scala
import org.apache.spark.sql.functions.window
df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
Neste exemplo:
- A coluna
event_timeé usada para definir uma marca d'água de 10 minutos e uma janela deslizante de 5 minutos. - Uma contagem é coletada para cada
idobservado, em cada janela de 5 minutos sem sobreposição. - As informações de estado são mantidas para cada contagem até que o final da janela seja 10 minutos mais antigo do que o último observado
event_time.
Importante
Em uma operação groupBy() e window(), refira-se às colunas pelo nome, "<colName>" ou col("<colName>"), para garantir que o marcador de hora do evento seja preservado. No Scala, você também pode usar $colName.
Como as marcas d'água afetam o tempo de processamento e a taxa de transferência?
Os modos de saída controlam quando uma consulta com marcas d'água grava dados no coletor. As marcas d’água são essenciais para o controle de vazão no processamento de fluxo com estado, pois reduzem a quantidade total de dados de estado na memória. Nem todos os modos de saída são compatíveis com todas as operações stateful. Consulte Marcas-d'água e modo de saída para agregações em janela.
A seleção de uma duração da marca d'água tem compensações:
- Watermarks mais curtos reduzem a latência da consulta porque as consultas armazenam menos informações de estado e gravam os resultados após o término de cada período de watermark. No entanto, marcas de tempo limite curtas têm baixa tolerância a dados que chegam com atraso.
- Marcas d'água mais longas têm alta tolerância a dados que chegam com atraso. No entanto, as marcas d'água longas aumentam a latência da consulta porque as consultas devem armazenar mais informações de estado e esperar para gravar resultados após uma duração mais longa da marca d'água.
Marcas d'água e modo de saída para agregações em janelas
A tabela a seguir mostra o comportamento do processamento para consultas com agregação sobre um carimbo de data/hora e uma marca-d’água:
| Modo de saída | Comportamento |
|---|---|
| Acrescentar | A consulta grava linhas na tabela de destino depois que o limite de watermark é ultrapassado. Todas as gravações são atrasadas com base no limite de tardança. O estado de agregação antigo é descartado depois que o limite é passado. |
| Atualizar | A consulta grava linhas na tabela de destino à medida que os resultados são calculados e a consulta pode atualizar e substituir linhas à medida que novos dados chegam. O estado de agregação antigo é descartado depois que o limite é passado. |
| Concluído | O estado de agregação não foi descartado. A consulta reescreve a tabela de destino para cada gatilho. |
Marcas de tempo d'água e modos de saída para junções entre fluxos
As junções entre vários fluxos só dão suporte ao modo de acréscimo. As consultas gravam registros correspondentes para cada lote.
Para junções internas, a Databricks recomenda que você defina um limiar de marca d'água em cada fonte de dados em streaming para permitir que a consulta descarte informações de estado de registros antigos. Sem marcas d'água, o Streaming Estruturado tenta unir todas as chaves de ambos os lados da junção em cada gatilho, o que pode afetar o desempenho.
Para junções externas, o uso de marcas d'água é obrigatório. Quando um registro é incompatível, a consulta grava um valor nulo para essa chave. Como as junções só dão suporte ao modo de acréscimo, os registros incompatíveis não são gravados até que o limite de latência passe.
Controle o limite para dados atrasados com base em uma política de múltiplas marcas d'água
Para múltiplas entradas do Structured Streaming, você pode definir múltiplos watermarks para controlar os limiares de tolerância para dados tardios. As marcas d'água permitem controlar as informações de estado e a latência.
Uma consulta de streaming pode ter vários fluxos de entrada que são unidos ou combinados. Para operações com estado, cada um dos fluxos de entrada pode exigir um limite diferente para tolerância a dados atrasados. Especifique esses limites usando withWatermark("eventTime", delay) em cada fluxo de entrada. Veja a seguir uma consulta de exemplo com junções de fluxo a fluxo.
Python
input_stream1 = ... # delays up to 1 hour
input_stream2 = ... # delays up to 2 hours
(input_stream1.withWatermark("eventTime1", "1 hour")
.join(
input_stream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
)
Scala
val inputStream1 = ... // delays up to 1 hour
val inputStream2 = ... // delays up to 2 hours
inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
Ao executar a consulta com operações com estado, o Structured Streaming rastreia individualmente o tempo de evento máximo de cada fluxo de entrada, calcula as marcas d’água com base no atraso correspondente e determina uma única marca d’água global. Por padrão, o Streaming Estruturado usa o valor mínimo como marca d'água global. Se um fluxo ficar defasado em relação aos demais, uma marca d'água global mínima impedirá que a consulta classifique erroneamente os dados como atrasados. Por exemplo, isso pode ocorrer quando um dos fluxos para de receber dados devido a falhas upstream. A marca d’água global avança com segurança na velocidade do fluxo mais lento e atrasa o resultado da consulta quando necessário.
Para reduzir a latência, defina spark.sql.streaming.multipleWatermarkPolicy como max (o padrão é min) para usar a marca d'água do fluxo mais rápido como a marca d'água global. No entanto, essa configuração remove dados dos fluxos mais lentos. O Databricks recomenda que você aplique essa configuração com cuidado.
Aplicar marcas d'água a operações distintas
A distinct operação rastreia todos os registros exclusivos no estado. Sem uma marca d'água, o estado cresce indefinidamente e pode causar problemas de memória. Especifique um watermark em um campo de timestamp para limitar o estado e remover registros antigos após o limite ser ultrapassado.
O exemplo a seguir aplica uma marca d'água a uma distinct operação:
Python
streamingDf = spark.readStream. ... # columns: eventTime, id, value, ...
# Apply watermark before distinct operation
(streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
)
Scala
val streamingDf = spark.readStream. ... // columns: eventTime, id, value, ...
// Apply watermark before distinct operation
streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
Neste exemplo, a consulta de streaming remove registros duplicados que chegam dentro de 1 hora da última observação eventTime. A consulta descarta informações de estado para eliminação de duplicatas após o limite ser atingido.
Importante
Para deduplicar colunas específicas em vez de todas as colunas, use dropDuplicates() ou dropDuplicatesWithinWatermark() em vez de distinct. Confira Descartar duplicatas dentro da marca d'água.
Descartar duplicatas dentro da marca d'água
No Databricks Runtime 13.3 LTS ou posterior, você pode usar um identificador exclusivo para eliminar registros duplicados dentro do limite de watermark.
O Structured Streaming garante processamento exatamente uma vez, mas não faz a deduplicação de registros de fontes de dados. Use dropDuplicatesWithinWatermark para remover duplicatas em qualquer campo, mesmo quando os campos diferem entre registros duplicados, como hora do evento ou hora de chegada.
Com dropDuplicatesWithinWatermark, as consultas sempre eliminam a duplicação de registros que chegam dentro do limite da marca d'água. As consultas também podem deduplicar registros que chegam fora do limite, mas isso não é garantido. Para garantir que as consultas removam todas as duplicatas, defina o limite de marca-d'água como maior que a diferença máxima de carimbo de data/hora entre eventos duplicados.
Você deve especificar uma marca d'água para usar o método dropDuplicatesWithinWatermark:
Python
streamingDf = spark.readStream. ...
# deduplicate using guid column with watermark based on eventTime column
(streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])
)
Scala
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// deduplicate using guid column with watermark based on eventTime column
streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(Seq("guid"))
Exemplos de caso de uso
Os exemplos a seguir mostram casos avançados de uso de janelas:
Usar janelas em cascata para calcular totais de vendas de hora
Janelas em cascata são de tamanho fixo com intervalos não sobrepostos. Cada linha de entrada pertence a exatamente uma janela. Use janelas em cascata para calcular agregações discretas de período de tempo, como totais de vendas por hora:
Python
from pyspark.sql.functions import window, sum
hourly_sales = (orders
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour"))
.agg(sum("amount").alias("total_sales"))
)
Scala
import org.apache.spark.sql.functions.{window, sum}
val hourlySales = orders
.withWatermark("timestamp", "1 hour")
.groupBy(window($"timestamp", "1 hour"))
.agg(sum($"amount").alias("total_sales"))
Neste exemplo:
-
window("timestamp", "1 hour")agrupa ordens em intervalos de 1 hora não sobrepostos, como de 5 a 6h e de 6 a 7h. -
withWatermark("timestamp", "1 hour")mantém o agregado de cada janela no estado até que o timestamp de término da janela seja 1 hora anterior ao maior timestamp do pedido.
Usar janelas deslizantes para calcular agregações móveis
Janelas deslizantes são de tamanho fixo com intervalos que podem se sobrepor. Uma única linha pode pertencer a várias janelas. Use janelas deslizantes para calcular agregações móveis, como vendas em um período móvel de 6 horas:
Python
from pyspark.sql.functions import window, sum
rolling_sales = (orders
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "6 hours", slideDuration="1 hour"))
.agg(sum("amount").alias("total_sales"))
)
Scala
import org.apache.spark.sql.functions.{window, sum}
val rollingSales = orders
.withWatermark("timestamp", "1 hour")
.groupBy(window($"timestamp", "6 hours", "1 hour"))
.agg(sum($"amount").alias("total_sales"))
Neste exemplo:
-
window("timestamp", "6 hours", slideDuration="1 hour")agrupa pedidos em intervalos de 6 horas que avançam 1 hora, por exemplo, de 5 a 11h e das 6h às 12h. -
withWatermark("timestamp", "1 hour")mantém o agregado de cada janela em estado até que o timestamp de término da janela fique 1 hora mais antigo que o timestamp máximo do pedido. -
slideDurationdeve ser menor ou igual aowindowDuration.
Usar janelas de sessão para verificar a atividade do usuário
As janelas de sessão não têm tamanho fixo. Uma janela se abre quando uma linha é recebida e se fecha após um intervalo de inatividade durante o qual não chegam novas linhas. Use janelas de sessão para agregar intermitências de atividade entre longos períodos ociosos, como exibições de página de um usuário em um período de 30 minutos:
Python
from pyspark.sql.functions import session_window, sum
sessionized_page_views = (activity
.withWatermark("timestamp", "1 hour")
.groupBy("user_id", session_window("timestamp", gapDuration="30 minutes"))
.agg(sum("page_views").alias("total_page_views"))
)
Scala
import org.apache.spark.sql.functions.{session_window, sum}
val sessionizedPageViews = activity
.withWatermark("timestamp", "1 hour")
.groupBy($"user_id", session_window($"timestamp", "30 minutes"))
.agg(sum($"page_views").alias("total_page_views"))
Neste exemplo:
-
session_window("timestamp", gapDuration="30 minutes")abre uma janela quando ocorre a primeira visualização de página. Cada visualização de página subsequente que ocorra dentro de 30 minutos estende esse período. Quando nenhuma visualização de página ocorrer em até 30 minutos, a janela se fecha, e a próxima visualização de página inicia uma nova janela. -
withWatermark("timestamp", "1 hour")mantém o agregado de cada sessão no estado até que o carimbo de data/hora de término da janela esteja 1 hora atrás do carimbo de data/hora máximo de visualização de página. - O
timeColumnargumento parawindow()esession_window()deve ser deTimestampTypeouTimestampNTZType. - Use
current_timestamp()para definir janelas com base no tempo de processamento em vez da hora do evento. - Você pode definir durações de janela de microssegundos até dias. Não há suporte para durações de mês e mais tempo.
- Use
completeo modo de saída com agregações em janelas para manter todo o estado da janela indefinidamente. Use o modo de saídaappendcom um watermark apropriado para limitar o crescimento do estado e evitar problemas de memória em grandes conjuntos de dados. Para obter mais detalhes sobre o comportamento do modo de saída, consulte Marcas d'água e modo de saída para agregações em janelas.