Mettre à jour les schémas de table avec l’évolution du schéma

Les tables prennent en charge l’évolution du schéma, ce qui permet de modifier la structure des tables à mesure que les exigences de données changent. Les types de modifications suivants sont pris en charge :

Apportez ces modifications explicitement à l’aide de DDL ou implicitement à l’aide de DML.

Important

Les mises à jour de schéma sont en conflit avec toutes les opérations d’écriture simultanées. Databricks recommande de coordonner les modifications de schéma pour éviter les conflits d’écriture.

La mise à jour d’un schéma de table met fin à tous les flux lisant à partir de cette table. Pour poursuivre le traitement, redémarrez le flux à l’aide des méthodes décrites dans les considérations de production pour Structured Streaming.

Modifications manuelles du schéma

Utilisez ALTER TABLE des instructions pour modifier explicitement le schéma d’une table sans écrire de nouvelles données.

Ajouter des colonnes

Permet ALTER TABLE ... ADD COLUMNS d’ajouter une ou plusieurs colonnes à une table existante, en spécifiant éventuellement une position et un commentaire :

ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

Par défaut, la nullité est true.

Exemple : Ajouter des champs imbriqués

L’ajout de colonnes imbriquées est pris en charge uniquement pour les structs. Les tableaux et les maps ne sont pas pris en charge.

Pour ajouter une colonne à un champ imbriqué, utilisez :

ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

Par exemple, si le schéma avant l’exécution de ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1) est :

- root
| - colA
| - colB
| +-field1
| +-field2

le schéma après est :

- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2

Modifier les commentaires et l’ordre des colonnes

Permet ALTER TABLE ... ALTER COLUMN de mettre à jour le commentaire d’une colonne ou de la réorganiser par rapport à d’autres colonnes :

ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)

Exemple : Modifier les champs imbriqués

Pour modifier une colonne dans un champ imbriqué, utilisez :

ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)

Par exemple, si le schéma avant l’exécution de ALTER TABLE boxes ALTER COLUMN colB.field2 FIRST est :

- root
| - colA
| - colB
| +-field1
| +-field2

le schéma après est :

- root
| - colA
| - colB
| +-field2
| +-field1

Remplacer les colonnes

Permet ALTER TABLE ... REPLACE COLUMNS de redéfinir la liste complète des colonnes d’une table, notamment l’ajout, la suppression, la réorganisation ou le renommage de colonnes dans une seule opération :

ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)

Exemple : Remplacer les champs imbriqués

Par exemple, lors de l’exécution de la commande DDL suivante :

ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)

si le schéma avant est :

- root
| - colA
| - colB
| +-field1
| +-field2

le schéma après est :

- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA

Renommer des colonnes

Pour renommer des colonnes sans réécrire les données existantes des colonnes, vous devez activer le mappage de colonnes pour la table. Voir Renommer et supprimer des colonnes avec le mappage de colonnes Delta Lake.

Pour renommer une colonne :

ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name

Exemple : Renommer des champs imbriqués

Pour renommer un champ imbriqué :

ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field

Par exemple, lorsque vous exécutez la commande suivante :

ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001

Si le schéma avant est :

- root
| - colA
| - colB
| +-field1
| +-field2

Alors le schéma après est :

- root
| - colA
| - colB
| +-field001
| +-field2

Voir Renommer et supprimer des colonnes avec le mappage de colonnes Delta Lake.

Supprimer des colonnes

Pour supprimer des colonnes en tant qu’opération de métadonnées uniquement sans réécriture de fichiers de données, vous devez activer le mappage de colonnes pour la table. Voir Renommer et supprimer des colonnes avec le mappage de colonnes Delta Lake.

Note

La suppression d’une colonne des métadonnées ne supprime pas les données sous-jacentes de la colonne dans les fichiers. Pour vider les données de colonne supprimées :

  • Permet REORG TABLE de réécrire des fichiers.
  • Ensuite, utilisez VACUUM pour supprimer physiquement les fichiers qui contiennent les données de colonne supprimées.

Pour supprimer une colonne :

ALTER TABLE table_name DROP COLUMN col_name

Pour supprimer plusieurs colonnes :

ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)

Modifier le type de colonne ou le nom

Vous pouvez modifier le type ou le nom d’une colonne ou supprimer une colonne en réécritant la table. Pour ce faire, utilisez l’option overwriteSchema.

L’exemple suivant illustre la modification d’un type de colonne :

(spark.read.table(...)
  .withColumn("birthDate", col("birthDate").cast("date"))
  .write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)
)

L’exemple suivant illustre la modification d’un nom de colonne :

(spark.read.table(...)
  .withColumnRenamed("dateOfBirth", "birthDate")
  .write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)
)

Activer l’évolution du schéma

Utilisez WITH SCHEMA EVOLUTION ou définissez mergeSchema pour true apporter des modifications de schéma en fonction du schéma des données que vous souhaitez INSERT ou MERGE dans une table existante.

Activez l’évolution du schéma à l’aide de l’une des méthodes suivantes :

Databricks recommande d’activer l’évolution du schéma pour chaque opération d’écriture à l’aide de la WITH SCHEMA EVOLUTION syntaxe ou de l’option mergeSchema plutôt que de définir une configuration Spark.

Lorsque vous utilisez des options ou une syntaxe pour activer l’évolution du schéma dans une opération d’écriture, cela est prioritaire sur la configuration Spark.

Activer l’évolution du schéma pour les écritures afin d’ajouter de nouvelles colonnes

Lorsque l’évolution du schéma est activée, les colonnes présentes dans la requête source, mais manquantes dans la table cible, sont automatiquement ajoutées dans le cadre d’une transaction d’écriture. Consultez Activer l’évolution du schéma.

Tenez compte des éléments suivants :

  • La casse est conservée lors de l’ajout d’une nouvelle colonne.
  • Les nouvelles colonnes sont ajoutées à la fin du schéma de la table.
  • Si les colonnes supplémentaires se trouvent dans un struct, elles sont ajoutées à la fin du struct dans la table cible.

INSERT avec l’évolution du schéma à l’aide de SQL

Utilisez la clause dans INSERT les instructions pour activer l’évolution WITH SCHEMA EVOLUTION du schéma :

INSERT WITH SCHEMA EVOLUTION INTO target_table
SELECT * FROM source_table

Si la requête sur source_table retourne des colonnes qui n’existent pas dans la table cible, ces colonnes sont automatiquement ajoutées au schéma target_table. Les lignes existantes reçoivent des valeurs NULL pour les nouvelles colonnes.

INSERT avec l’évolution du schéma à l’aide de l’API DataFrame

L’exemple suivant illustre l’utilisation de l’option mergeSchema avec une opération d’écriture par lots :

Python
(spark.read
  .table("source_table")
  .write
  .option("mergeSchema", "true")
  .mode("append")
  .saveAsTable("target_table")
)
Scala
spark.read
  .table("source_table")
  .write
  .option("mergeSchema", "true")
  .mode("append")
  .saveAsTable("target_table")

INSERT avec l’évolution du schéma avec Structured Streaming

L’exemple suivant illustre l’utilisation de l’option mergeSchema avec le chargeur automatique pour Structured Streaming. Consultez Qu’est-ce que Auto Loader ?.

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .trigger(availableNow=True)
  .toTable("table_name")
)

Évolution automatique du schéma pour la fusion

Pour MERGE, l’évolution du schéma vous permet de résoudre les incompatibilités de schéma entre la table cible et source. Il gère les deux cas suivants :

  1. Une colonne existe dans la table source, mais pas la table cible, et est spécifiée par nom dans une affectation d’actions d’insertion ou de mise à jour. Une action UPDATE SET * ou INSERT * est également présente.

    Cette colonne sera ajoutée au schéma cible et ses valeurs seront renseignées à partir de la colonne correspondante dans la source.

    • Cela s’applique uniquement lorsque le nom et la structure de colonne dans la source de fusion correspondent exactement à l’affectation cible.

    • La nouvelle colonne doit être présente dans le schéma source. L’affectation de la nouvelle colonne dans la clause action ne définit pas cette colonne.

    Ces exemples permettent l’évolution du schéma :

    -- The column newcol is present in the source but not in the target. It will be added to the target.
    UPDATE SET target.newcol = source.newcol
    
    -- The field newfield doesn't exist in struct column somestruct of the target. It will be added to that struct column.
    UPDATE SET target.somestruct.newfield = source.somestruct.newfield
    
    -- The column newcol is present in the source but not in the target.
    -- It will be added to the target.
    UPDATE SET target.newcol = source.newcol + 1
    
    -- Any columns and nested fields in the source that don't exist in target will be added to the target.
    UPDATE SET *
    INSERT *
    

    Ces exemples ne déclenchent pas l’évolution du schéma si la colonne newcol n’est pas présente dans le source schéma :

    UPDATE SET target.newcol = source.someothercol
    UPDATE SET target.newcol = source.x + source.y
    UPDATE SET target.newcol = source.output.newcol
    
  2. Une colonne existe dans la table cible, mais pas dans la table source.

    Le schéma cible n’est pas modifié. Ces colonnes :

    • Sont laissés inchangés pour UPDATE SET *.

    • Ils sont définis NULL pour INSERT *.

    • Peut toujours être modifié explicitement s’il est affecté dans la clause action.

    Par exemple:

    UPDATE SET *  -- The target columns that are not in the source are left unchanged.
    INSERT *  -- The target columns that are not in the source are set to NULL.
    UPDATE SET target.onlyintarget = 5  -- The target column is explicitly updated.
    UPDATE SET target.onlyintarget = source.someothercol  -- The target column is explicitly updated from some other source column.
    

Vous devez activer manuellement l’évolution automatique du schéma. Consultez Activer l’évolution du schéma.

Note

Dans Databricks Runtime 11.3 LTS et versions antérieures, seules les actions INSERT * ou UPDATE SET * peuvent être utilisées pour l’évolution du schéma avec la fusion.

Dans Databricks Runtime 12.2 LTS et versions ultérieures, vous pouvez spécifier les colonnes et les champs struct présents dans la table source par leur nom dans les actions d’insertion ou de mise à jour.

Dans Databricks Runtime 13.3 LTS et versions ultérieures, vous pouvez utiliser l’évolution du schéma avec des structs imbriqués à l’intérieur des cartes, comme map<int, struct<a: int, b: int>>.

MERGEavec l’évolution du schéma à l’aide de SQL, Python et Scala

Dans Databricks Runtime 15.4 LTS et versions ultérieures, vous pouvez spécifier l’évolution du schéma dans une instruction de fusion à l’aide d’API SQL ou de table :

SQL
MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE
Python
from delta.tables import *

(targetTable
  .merge(sourceDF, "source.key = target.key")
  .withSchemaEvolution()
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)
Scala
import io.delta.tables._

targetTable
  .merge(sourceDF, "source.key = target.key")
  .withSchemaEvolution()
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

Exemples d’opérations d’évolution de MERGE schéma

Voici quelques exemples des effets de l’opération MERGE avec et sans évolution du schéma.

Columns Requête (dans SQL) Comportement sans évolution du schéma (par défaut) Comportement avec évolution du schéma
Colonnes cibles : key, value
Colonnes sources : key, value, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
Le schéma de la table reste inchangé. Seules les colonnes key et value sont mises à jour/insérées. Le schéma de la table est modifié en (key, value, new_value). Les enregistrements existants avec des correspondances sont mis à jour avec le value et le new_value dans la source. Les nouvelles lignes sont insérées avec le schéma (key, value, new_value).
Colonnes cibles : key, old_value
Colonnes sources : key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
Les actions UPDATE et INSERT génèrent une erreur parce que la colonne cible old_value ne figure pas dans la source Le schéma de la table est modifié en (key, old_value, new_value). Les enregistrements existants avec des correspondances sont mis à jour avec le new_value dans la source, laissant le old_value inchangé. Les nouveaux enregistrements sont insérés avec les key, new_value et NULL spécifiés pour le old_value.
Colonnes cibles : key, old_value
Colonnes sources : key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET new_value = s.new_value
UPDATE génère une erreur parce que la colonne new_value n’existe pas dans la table cible. Le schéma de la table est modifié en (key, old_value, new_value). Les enregistrements existants ayant des correspondances sont mis à jour avec le new_value dans la source tout en laissant le old_value inchangé, et les enregistrements sans correspondance reçoivent NULL pour new_value. Voir la remarque (1).
Colonnes cibles : key, old_value
Colonnes sources : key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN NOT MATCHED
THEN INSERT (key, new_value) VALUES (s.key, s.new_value)
INSERT génère une erreur parce que la colonne new_value n’existe pas dans la table cible. Le schéma de la table est modifié en (key, old_value, new_value). Les nouveaux enregistrements sont insérés avec les key, new_value et NULL spécifiés pour le old_value. Les enregistrements existants ont NULL entré pour new_value, laissant old_value inchangé. Voir la remarque (1).

(1) Ce comportement est disponible dans Databricks Runtime 12.2 LTS et versions ultérieures ; Databricks Runtime 11.3 LTS et une erreur ci-dessous dans cette condition.

Exclure des colonnes avec fusion

Dans Databricks Runtime 12.2 LTS et versions ultérieures, vous pouvez utiliser des clauses EXCEPT dans des conditions de fusion pour exclure explicitement des colonnes. Le comportement du mot clé EXCEPT varie selon que l’évolution du schéma est activée.

Lorsque l’évolution du schéma est désactivée, le EXCEPT mot clé s’applique à la liste des colonnes de la table cible et autorise l’exclusion de colonnes à partir de UPDATE ou INSERT d’actions. Les colonnes exclues sont définies sur null.

Si l’évolution du schéma est activée, le mot clé EXCEPT s’applique à la liste des colonnes de la table source et permet d’exclure les colonnes de l’évolution du schéma. Une nouvelle colonne dans la source, non présente dans la table cible, n’est pas ajoutée au schéma cible s’il est répertorié dans la EXCEPT clause. Les colonnes exclues qui sont déjà présentes dans la cible sont définies sur null.

Exemples de l’utilisation de EXCLUDE avec MERGE

Les exemples ci-dessous illustrent cette syntaxe :

Columns Requête (dans SQL) Comportement sans évolution du schéma (par défaut) Comportement avec évolution du schéma
Colonnes cibles : id, title, last_updated
Colonnes sources : id, title, review, last_updated
MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
THEN INSERT * EXCEPT (last_updated)
Les lignes correspondantes sont mises à jour en définissant le champ last_updated sur la date actuelle. Les nouvelles lignes sont insérées à l’aide de valeurs pour id et title. Le champ exclu last_updated est défini sur null. Le champ review est ignoré, car il n’est pas dans la cible. Les lignes correspondantes sont mises à jour en définissant le champ last_updated sur la date actuelle. Le schéma est évolué pour ajouter le champ review. Les nouvelles lignes sont insérées à l’aide de tous les champs sources, à l’exception de last_updated, qui est défini sur null.
Colonnes cibles : id, title, last_updated
Colonnes sources : id, title, review, internal_count
MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
THEN INSERT * EXCEPT (last_updated, internal_count)
INSERT génère une erreur parce que la colonne internal_count n’existe pas dans la table cible. Les lignes correspondantes sont mises à jour en définissant le champ last_updated sur la date actuelle. Le champ review est ajouté à la table cible, mais le champ internal_count est ignoré. Pour les nouvelles lignes insérées, le champ last_updated est défini sur null.

Activer l’évolution du schéma avec la configuration Spark (hérité)

Vous pouvez définir la configuration spark.databricks.delta.schema.autoMerge.enabled de Spark sur true pour activer l’évolution du schéma pour toutes les opérations d’écriture dans la session Spark actuelle.

Python

spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", True)

Scala

spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", true)

SQL

SET spark.databricks.delta.schema.autoMerge.enabled=true

Note

Databricks ne recommande pas cette approche pour la production. La définition d’une configuration à l’échelle de la session peut entraîner des modifications involontaires de schéma sur plusieurs opérations et complique la raison des opérations qui évoluent le schéma.

Au lieu de cela, activez l’évolution du schéma pour chaque opération d’écriture :

Lorsque vous utilisez des options ou une syntaxe pour activer l’évolution du schéma dans une opération d’écriture, cela est prioritaire sur la configuration Spark.

Remplacer le schéma de table

Par défaut, le remplacement des données dans une table ne remplace pas le schéma. Lorsque vous remplacez une table mode("overwrite") sans replaceWhere, vous pouvez toujours remplacer le schéma des données en cours d’écriture.

Pour remplacer le schéma et le partitionnement de la table, définissez l’option overwriteSchema sur true:

df.write.option("overwriteSchema", "true")

Note

Vous ne pouvez pas spécifier overwriteSchema comme true lors de l’utilisation du remplacement de partition dynamique. Consultez les remplacements de partition dynamique avec partitionOverwriteMode (hérité) .