Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Cette page contient des recommandations sur l’ordonnancement des charges de travail de streaming structuré à l’aide de jobs Lakeflow sur Azure Databricks. Consultez les offres d'emploi Lakeflow.
Databricks recommande de toujours configurer les éléments suivants :
- Supprimez le code inutile des notebooks qui retourne des résultats, tels que
displayetcount. - N’exécutez pas de charges de travail Structured Streaming à l’aide d’un calcul à usage unique. Planifiez toujours des flux en tant que travaux Lakeflow à l’aide du calcul des travaux.
- Planifiez les tâches Lakeflow en
Continuousmode. Il s'agit de la fonctionnalité de planification des travaux Azure Databricks, et non de l'intervalle de déclenchement du Streaming Structuré intervalle de déclenchement. - N’activez pas la mise à l’échelle automatique pour le calcul pour les travaux Structured Streaming.
Certaines charges de travail bénéficient de ce qui suit :
- Configurer le magasin d'état RocksDB sur Azure Databricks
- Réalisation d’un point de contrôle d’état asynchrone pour des requêtes avec état
- Suivi de progression asynchrone
Azure Databricks a introduit des pipelines déclaratifs Spark Lakeflow pour simplifier la gestion de l’infrastructure de production des flux de données structurés. Databricks recommande d’utiliser des pipelines déclaratifs Spark Lakeflow pour les nouveaux pipelines de streaming structuré. Consultez pipelines déclaratifs Spark Lakeflow.
Remarque
La mise à l’échelle automatique du calcul présente des limitations lorsqu'il s'agit de réduire la taille des clusters pour les charges de travail en flux structurés. Databricks recommande d’utiliser des pipelines déclaratifs Spark Lakeflow avec une mise à l’échelle automatique améliorée pour les charges de travail de streaming. Consultez Optimiser l’utilisation du cluster des pipelines déclaratifs Spark Lakeflow avec mise à l’échelle automatique.
:::note Calcul informatique sans serveur
Dans le calcul serverless, uniquement Trigger.AvailableNow() et Trigger.Once() sont pris en charge. Databricks recommande Trigger.AvailableNow().
Pour la diffusion en continu sur le calcul serverless, utilisez le mode de pipeline continu et déclenché en mode continu.
Consultez les limitations de streaming.
:::
Concevez des charges de travail de diffusion en continu pour prévoir les pannes
Databricks recommande de toujours configurer les travaux de diffusion en continu pour redémarrer automatiquement en cas d’échec. Certaines fonctionnalités, notamment l’évolution du schéma, nécessitent que les charges de travail Structured Streaming réessayent automatiquement. Consultez Configurer des travaux de Structured Streaming pour redémarrer des requêtes de diffusion en continu en cas d’échec.
Certaines opérations, telles que foreachBatch, fournissent « au moins une » garantie plutôt qu’exactement une. Pour ces opérations, veillez à ce que votre pipeline de traitement soit idempotent. Consultez Utiliser foreachBatch pour écrire dans des récepteurs de données arbitraires.
Remarque
Lorsqu’une requête redémarre, le micro-lot planifié pendant l’exécution précédente est traité. Si votre tâche a échoué en raison d’une erreur de mémoire insuffisante, ou si vous avez annulé manuellement une tâche en raison d’un micro-lot surdimensionné, vous devrez peut-être effectuer un scale-up du calcul pour traiter correctement le micro-lot.
Si vous modifiez les configurations entre les exécutions, celles-ci s’appliquent au premier nouveau lot planifié. Consultez Récupérer après des modifications dans une requête de Structured Streaming.
Quand une tâche effectue-t-elle une nouvelle tentative ?
Vous pouvez planifier plusieurs tâches dans le cadre d’un travail Azure Databricks. Lorsque vous configurez un travail à l’aide du déclencheur continu, vous ne pouvez pas définir de dépendances entre les tâches.
Vous pouvez choisir de planifier plusieurs flux dans un même travail à l’aide de l’une des approches suivantes :
- Plusieurs tâches : définissez un travail avec plusieurs tâches qui exécutent des flux de travail en continu à l’aide du déclencheur continu.
- Plusieurs requêtes : définissez plusieurs requêtes de diffusion en continu dans le code source pour une seule tâche.
Vous pouvez également combiner ces stratégies. Le tableau suivant compare ces approches.
| Stratégie | Tâches multiples | Requêtes multiples |
|---|---|---|
| Comment le calcul est-il partagé ? | Databricks vous recommande de dimensionner correctement les ressources de calcul des jobs pour chaque tâche de streaming. Si vous le souhaitez, vous pouvez partager le calcul entre les tâches. | Toutes les requêtes partagent le même calcul. Si vous le souhaitez, vous pouvez affecter des requêtes à des pools de planificateurs. |
| Comment les nouvelles tentatives sont-elles gérées ? | Toutes les tâches doivent échouer avant que le travail ne soit réessayé. | La tâche est tentée à nouveau si une requête échoue. |
Configurer des travaux de Structured Streaming pour redémarrer des requêtes de diffusion en continu en cas d’échec
Databricks vous recommande de configurer toutes les charges de travail de streaming en mode de déclenchement continu. Consultez Exécuter des travaux en continu.
Le déclencheur continu a le comportement suivant par défaut :
- Empêche l'exécution simultanée de plus d'une tâche.
- Démarre une nouvelle exécution en cas d’échec d’une précédente.
- Utilise le backoff exponentiel pour les nouvelles tentatives.
Databricks recommande de toujours utiliser le calcul des travaux au lieu du calcul à usage unique lors de la planification des workflows. Lors de l’échec du travail et de la nouvelle tentative, de nouvelles ressources de calcul sont déployées.
Remarque
Databricks recommande de ne pas utiliser streamingQuery.awaitTermination() ou spark.streams.awaitAnyTermination(). Voir Quand utiliser awaitTermination().
Quand utiliser awaitTermination()
streamingQuery.awaitTermination() et spark.streams.awaitAnyTermination() bloquez le thread actuel jusqu’à ce qu’une requête de diffusion en continu se termine. L’utilisation de ces fonctions dépend de votre environnement d’exécution.
Pour Lakeflow Jobs, n’utilisez ni streamingQuery.awaitTermination() ni spark.streams.awaitAnyTermination(). Ces fonctions ne sont pas nécessaires, car le service Jobs empêche automatiquement la fin d'une exécution lorsqu'une requête de diffusion en continu est active. Les deux fonctions bloquent la fin des cellules du bloc-notes et empêchent le service Travaux de suivre la requête de diffusion en continu, ce qui interrompt les métriques de backlog et les notifications de travail.
Utilisez awaitTermination() dans les cas suivants :
| Cas d’utilisation | Comportement |
|---|---|
| Notebooks interactifs sur le calcul polyvalent |
awaitTermination() conserve la cellule en cours d’exécution, vous permet d’observer l’état de la requête et garantit que les défaillances s’affichent dans la sortie du notebook. |
| Environnements locaux et de développement | Lors de l’exécution d’un programme Spark localement, le processus se termine lorsque le thread principal est terminé. Appelez awaitTermination() pour maintenir le programme actif jusqu’à ce que la requête de diffusion en continu se termine ou échoue. |
| Propagation de défaillance vers le pilote | Sans awaitTermination(), un échec de requête de streaming dans un contexte non professionnel peut ne pas se propager au thread appelant. La requête peut échouer silencieusement, ce qui rend les échecs plus difficiles à détecter et diagnostiquer. L’appel awaitTermination() déclenche à nouveau l’exception de requête sur le pilote. |
Utiliser des pools de planificateurs pour plusieurs requêtes de streaming
Vous pouvez configurer des pools de planificateurs pour affecter la capacité de calcul aux requêtes lors de l’exécution de plusieurs requêtes de streaming à partir du même code source.
Par défaut, toutes les requêtes démarrées dans un notebook s’exécutent dans le même pool de planification équitable. Les tâches Apache Spark générées par des déclencheurs à partir de toutes les requêtes de diffusion en continu dans un notebook s’exécutent l’une après l’autre dans l’ordre FIFO (premier entré, premier sorti). Cela peut entraîner des retards inutiles dans les requêtes, car ils ne partagent pas efficacement les ressources du cluster.
Les pools de planificateurs vous permettent de déclarer les requêtes de Structured Streaming qui partagent des ressources de calcul.
L’exemple suivant affecte à query1 un pool dédié, tandis que query2 et query3 partagent un pool de planificateurs.
# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")
# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")
# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")
Remarque
La configuration de propriété locale doit se trouver dans la même cellule de bloc-notes que celle où vous démarrez votre requête de diffusion en continu.
Pour plus d’informations sur les pools de planificateurs équitables Apache, consultez la documentation du planificateur équitable Apache.