Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Cette page décrit les concepts de watermark et fournit des recommandations sur l’utilisation des watermarks dans les opérations courantes de streaming avec état.
Les requêtes de streaming accumulent les données d’état au fil du temps. Les filigranes suppriment automatiquement les anciennes données d’état pour empêcher les erreurs de mémoire et augmenter la latence de traitement.
Qu’est-ce qu’un filigrane ?
Pendant le traitement, Structured Streaming conserve l’état entre les micro-lots. Les requêtes de diffusion en continu utilisent l’état pour mettre à jour de façon incrémentielle les résultats au lieu de recomputer tout après chaque micro-lot. Les watermarks contrôlent le seuil à partir duquel une requête arrête de traiter une entité d’état.
Voici des exemples courants d’entités d’état :
- Agrégations sur une fenêtre de temps.
- Clés uniques dans une jointure entre deux flux.
Pour déclarer un watermark sur un DataFrame en streaming, spécifiez un champ d’horodatage et un seuil de retard. À mesure que de nouvelles données arrivent, le gestionnaire d’état effectue le suivi de l’horodatage le plus récent dans le champ spécifié et traite uniquement les enregistrements dans le seuil de latence.
Les requêtes traitent toujours les enregistrements qui parviennent dans le délai seuil. Les requêtes peuvent néanmoins traiter les enregistrements qui arrivent hors du seuil, mais ce n’est pas garanti.
L'exemple suivant applique un seuil de watermark de 10 minutes à un comptage basé sur des fenêtres de temps.
Python
from pyspark.sql.functions import window
(df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
)
Scala
import org.apache.spark.sql.functions.window
df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
Dans cet exemple :
- La colonne
event_timeest utilisée pour définir un marqueur temporel de 10 minutes et une fenêtre déroulante de 5 minutes. - Un décompte est collecté pour chaque
idobservé, pour chaque fenêtre de 5 minutes sans chevauchement. - L’état est conservé pour chaque décompte jusqu’à ce que la fin de la fenêtre soit antérieure de 10 minutes à la plus récente valeur observée
event_time.
Important
Dans une opération groupBy() et window(), faites référence aux colonnes par leur nom, "<colName>" ou col("<colName>"), afin de garantir que le marqueur temporel de l’événement est conservé. En Scala, vous pouvez également utiliser $colName.
Comment les filigranes affectent-ils le temps de traitement et le débit ?
Les modes de sortie contrôlent lorsqu’une requête avec des filigranes écrit des données dans le récepteur. Les watermarks sont essentiels pour le contrôle du débit de traitement dans le traitement de flux avec état, car ils réduisent la quantité totale de données d’état en mémoire. Pas tous les modes de sortie ne sont pris en charge pour toutes les opérations nécessitant un état. Consultez les Watermarks et le mode de sortie pour les agrégations par fenêtre.
Le choix d’une durée du filigrane implique des compromis :
- Les filigranes plus courts réduisent la latence des requêtes, car les requêtes stockent moins d’informations d’état et écrivent des résultats après la fin de chaque durée de filigrane. Toutefois, les filigranes courts offrent une faible tolérance aux données en retard.
- Des watermarks plus longs offrent une grande tolérance aux données arrivant en retard. Toutefois, les filigranes longs augmentent la latence des requêtes, car les requêtes doivent stocker plus d’informations d’état et attendre d’écrire des résultats après une durée de filigrane plus longue.
Filigranes et mode de sortie pour les agrégations par fenêtre
Le tableau suivant présente le comportement du traitement des requêtes avec agrégation sur un horodatage et un filigrane :
| Mode de sortie | Comportement |
|---|---|
| Ajouter | La requête écrit des lignes dans la table cible une fois le seuil de filigrane passé. Toutes les écritures sont retardées en fonction du seuil de retard. L’ancien état d’agrégation est supprimé une fois le seuil passé. |
| Mise à jour | La requête écrit des lignes dans la table cible au fur et à mesure que les résultats sont calculés, et la requête peut mettre à jour et remplacer les lignes à mesure que de nouvelles données arrivent. L’ancien état d’agrégation est supprimé une fois le seuil passé. |
| Terminé | L’état d’agrégation n’est pas supprimé. La requête réécrit la table cible pour chaque déclencheur. |
Watermarks et modes de sortie pour les jointures entre flux
Les jointures entre plusieurs flux prennent uniquement en charge le mode d’ajout. Les requêtes écrivent des enregistrements correspondants pour chaque lot.
Pour les jointures internes, Databricks recommande de définir un seuil de watermark sur chaque source de données en streaming afin de permettre à la requête de supprimer les informations d’état concernant les anciens enregistrements. Sans filigranes, Structured Streaming tente de joindre chaque clé des deux côtés de la jointure sur chaque déclencheur, ce qui peut affecter les performances.
Pour les jointures externes, la gestion par watermark est obligatoire. Lorsqu’un enregistrement n’est pas équivalent, la requête écrit une valeur Null pour cette clé. Étant donné que les jointures prennent uniquement en charge le mode d’ajout, les enregistrements sans correspondance ne sont pas écrits tant que le seuil de latence n’est pas atteint.
Régler le seuil des données en retard avec une stratégie à watermarks multiples
Pour plusieurs sources Structured Streaming, vous pouvez définir plusieurs bornes temporelles afin de gérer les seuils de tolérance pour les données en retard. Les filigranes vous permettent de contrôler les informations d’état et la latence.
Une requête de diffusion en continu peut avoir plusieurs flux d’entrée réunis ou joints. Pour les opérations avec état, chacun des flux d’entrée peut nécessiter un seuil différent pour la tolérance aux données tardives. Spécifiez ces seuils à l’aide withWatermark("eventTime", delay) de chaque flux d’entrée. Voici un exemple de requête avec des jointures de flux à flux.
Python
input_stream1 = ... # delays up to 1 hour
input_stream2 = ... # delays up to 2 hours
(input_stream1.withWatermark("eventTime1", "1 hour")
.join(
input_stream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
)
Scala
val inputStream1 = ... // delays up to 1 hour
val inputStream2 = ... // delays up to 2 hours
inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
Lors de l’exécution de la requête utilisant des opérations avec état, Structured Streaming suit séparément le temps d’événement maximal de chaque flux d’entrée, calcule les limites temporelles en fonction du délai correspondant et détermine une limite temporelle globale unique. Par défaut, Structured Streaming utilise le minimum comme filigrane global. Si un flux prend du retard sur les autres, un watermark global minimal empêche la requête de considérer à tort les données comme tardives. Par exemple, cela peut se produire lorsque l’un des flux cesse de recevoir des données en raison d’échecs en amont. Le filigrane global se déplace en toute sécurité au rythme du flux le plus lent et retarde la sortie de la requête si nécessaire.
Pour réduire la latence, définissez spark.sql.streaming.multipleWatermarkPolicy sur max (la valeur par défaut est min) afin d’utiliser le filigrane temporel du flux le plus rapide comme filigrane temporel global. Toutefois, cette configuration a pour effet d’annuler les données des flux les plus lents. Databricks recommande d’appliquer cette configuration avec précaution.
Appliquer des filigranes à des opérations distinctes
L’opération distinct assure le suivi de chaque enregistrement unique au sein de l’état. Sans filigrane, l’état augmente indéfiniment et peut provoquer des problèmes de mémoire. Spécifiez un watermark sur un champ d’horodatage afin de limiter l’état et de supprimer les enregistrements obsolètes une fois le seuil dépassé.
L'exemple suivant applique un filigrane à l'opération distinct :
Python
streamingDf = spark.readStream. ... # columns: eventTime, id, value, ...
# Apply watermark before distinct operation
(streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
)
Scala
val streamingDf = spark.readStream. ... // columns: eventTime, id, value, ...
// Apply watermark before distinct operation
streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
Dans cet exemple, la requête de diffusion en continu supprime les enregistrements en double qui arrivent dans un délai de 1 heure de la dernière observation.eventTime La requête supprime les informations d’état utilisées pour la déduplication une fois le seuil dépassé.
Important
Pour dédupliquer des colonnes spécifiques au lieu de toutes les colonnes, utilisez dropDuplicates() ou dropDuplicatesWithinWatermark() au lieu de distinct. Consultez la section Supprimer des doublons dans un filigrane.
Supprimer des doublons dans un filigrane
Dans Databricks Runtime 13.3 LTS ou version ultérieure, vous pouvez utiliser un identifiant unique pour dédupliquer les enregistrements dans les limites d’un seuil de watermark.
Structured Streaming garantit un traitement unique, mais ne supprime pas les doublons des enregistrements issus des sources de données. Permet dropDuplicatesWithinWatermark de supprimer des doublons sur n’importe quel champ, même lorsque les champs diffèrent entre les enregistrements dupliqués, tels que l’heure de l’événement ou l’heure d’arrivée.
Avec dropDuplicatesWithinWatermark, les requêtes dédupliquent toujours les enregistrements arrivant dans les limites du seuil de filigrane. Les requêtes peuvent également dédupliquer les enregistrements qui arrivent en dehors du seuil, mais cela n’est pas garanti. Pour garantir que les requêtes éliminent tous les doublons, définissez le seuil de watermark de manière à ce qu’il soit supérieur à la différence d’horodatage maximale entre les événements en double.
Vous devez spécifier un filigrane pour utiliser la dropDuplicatesWithinWatermark méthode :
Python
streamingDf = spark.readStream. ...
# deduplicate using guid column with watermark based on eventTime column
(streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])
)
Scala
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// deduplicate using guid column with watermark based on eventTime column
streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(Seq("guid"))
Exemples de cas d’usage
Les exemples suivants montrent des cas d’usage avancés de fenêtrage :
Utiliser des fenêtres basculantes pour calculer les totaux des ventes horaires
Les fenêtres basculantes ont une taille fixe avec des intervalles non chevauchants. Chaque ligne d’entrée appartient exactement à une fenêtre. Utilisez des fenêtres basculantes pour calculer des agrégations sur des périodes de temps distinctes, telles que les totaux horaires des ventes :
Python
from pyspark.sql.functions import window, sum
hourly_sales = (orders
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour"))
.agg(sum("amount").alias("total_sales"))
)
Scala
import org.apache.spark.sql.functions.{window, sum}
val hourlySales = orders
.withWatermark("timestamp", "1 hour")
.groupBy(window($"timestamp", "1 hour"))
.agg(sum($"amount").alias("total_sales"))
Dans cet exemple :
-
window("timestamp", "1 hour")regroupe les commandes dans des intervalles de 1 heure qui ne se chevauchent pas, tels que 5 à 6 h et 6 à 7 h. -
withWatermark("timestamp", "1 hour")conserve l’agrégat de chaque fenêtre dans l’état jusqu’à ce que l’horodatage de fin de la fenêtre soit antérieur d’une heure à l’horodatage maximal des commandes.
Utiliser des fenêtres glissantes pour calculer des agrégats glissants
Les fenêtres glissantes sont de taille fixe avec des intervalles qui peuvent se chevaucher. Une seule ligne peut appartenir à plusieurs fenêtres. Utilisez des fenêtres glissantes pour calculer des agrégats glissants, tels que les ventes sur une période glissante de 6 heures :
Python
from pyspark.sql.functions import window, sum
rolling_sales = (orders
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "6 hours", slideDuration="1 hour"))
.agg(sum("amount").alias("total_sales"))
)
Scala
import org.apache.spark.sql.functions.{window, sum}
val rollingSales = orders
.withWatermark("timestamp", "1 hour")
.groupBy(window($"timestamp", "6 hours", "1 hour"))
.agg(sum($"amount").alias("total_sales"))
Dans cet exemple :
-
window("timestamp", "6 hours", slideDuration="1 hour")regroupe les commandes dans des intervalles de 6 heures qui avancent de 1 heure, par exemple, de 5 à 11 h et de 6 h à 12 h. -
withWatermark("timestamp", "1 hour")maintient l’agrégat de chaque fenêtre dans l’état jusqu’à ce que l’horodatage de fin de la fenêtre soit antérieur d’une heure à l’horodatage maximal des commandes. -
slideDurationdoit être inférieur ou égal auwindowDuration.
Utiliser des fenêtres de session pour vérifier l’activité utilisateur
Les fenêtres de session n’ont aucune taille fixe. Une fenêtre s’ouvre lorsqu’une ligne de données arrive et se ferme après une période d’inactivité pendant laquelle aucune nouvelle ligne n’arrive. Utilisez des fenêtres de session pour agréger les rafales d’activité entre des périodes d’inactivité longues, telles que les vues de page d’un utilisateur dans un délai de 30 minutes :
Python
from pyspark.sql.functions import session_window, sum
sessionized_page_views = (activity
.withWatermark("timestamp", "1 hour")
.groupBy("user_id", session_window("timestamp", gapDuration="30 minutes"))
.agg(sum("page_views").alias("total_page_views"))
)
Scala
import org.apache.spark.sql.functions.{session_window, sum}
val sessionizedPageViews = activity
.withWatermark("timestamp", "1 hour")
.groupBy($"user_id", session_window($"timestamp", "30 minutes"))
.agg(sum($"page_views").alias("total_page_views"))
Dans cet exemple :
-
session_window("timestamp", gapDuration="30 minutes")ouvre une fenêtre lorsque la première vue de page arrive. Chaque affichage de page suivant qui arrive dans les 30 minutes étend la fenêtre. Quand aucun affichage de page n’arrive dans les 30 minutes, la fenêtre se ferme et l’affichage de page suivant démarre une nouvelle fenêtre. -
withWatermark("timestamp", "1 hour")conserve l’agrégat de chaque session dans l’état jusqu’à ce que l’horodatage de fin de la fenêtre soit de 1 heure supérieure à l’horodatage maximal de l’affichage de page. - L’argument
timeColumnpourwindow()etsession_window()doit être deTimestampTypeouTimestampNTZType. - Permet
current_timestamp()de définir des fenêtres en fonction du temps de traitement plutôt que de l’heure de l’événement. - Vous pouvez définir des durées de fenêtre de microsecondes jusqu’à des jours. Les durées d’un mois ou plus ne sont pas prises en charge.
- Utilisez le mode de sortie
completeavec des agrégations par fenêtre pour conserver indéfiniment tout l’état de la fenêtre. Utilisez le mode de sortieappendavec un watermark approprié afin de limiter la croissance de l’état et d’éviter les problèmes de mémoire pour les grands jeux de données. Pour plus d’informations sur le comportement du mode de sortie, consultez Filigranes et mode de sortie pour les agrégations fenêtrées.