Tables support schema evolution, allowing modifications to table structure as data requirements change. The following types of changes are supported:
- Adding new columns at arbitrary positions
- Reordering existing columns
- Renaming existing columns
- Type widening existing columns, see Widen types with automatic schema evolution
Make these changes explicitly using DDL or implicitly using DML.
Important
Schema updates conflict with all concurrent write operations. Databricks recommends coordinating schema changes to avoid write conflicts.
Updating a table schema terminates any streams reading from that table. To continue processing, restart the stream using the methods described in Production considerations for Structured Streaming.
Manual schema changes
Use ALTER TABLE statements to explicitly change a table's schema without writing new data.
Add columns
Use ALTER TABLE ... ADD COLUMNS to add one or more columns to an existing table, optionally specifying position and a comment:
ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
By default, new columns are nullable.
Example: Add nested fields
Adding nested columns is supported only for structs. Arrays and maps are not supported.
To add a column to a nested field, use:
ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
For example, if the schema before running ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1) is:
- root
| - colA
| - colB
| +-field1
| +-field2
the schema after is:
- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2
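The ADD COLUMNS syntax above can also be assembled programmatically. The following is a minimal sketch, not a Databricks API: `add_columns_ddl` and its dictionary keys (`name`, `type`, `comment`, `first`, `after`) are hypothetical names introduced for illustration, and the backslash escaping of quotes in comments assumes Spark SQL's default string literal handling. It only builds the statement text, which you would then pass to `spark.sql(...)` in a notebook.

```python
def add_columns_ddl(table, columns):
    """Build an ALTER TABLE ... ADD COLUMNS statement as a string.

    Hypothetical helper. Each entry in `columns` is a dict with the keys
    "name" and "type", and optionally "comment", "first", or "after".
    """
    parts = []
    for column in columns:
        piece = f"{column['name']} {column['type']}"
        comment = column.get("comment")
        if comment is not None:
            # Assumption: default Spark SQL literals accept backslash-escaped quotes.
            escaped = comment.replace("'", "\\'")
            piece += f" COMMENT '{escaped}'"
        if column.get("first") and column.get("after"):
            raise ValueError("Specify FIRST or AFTER, not both")
        if column.get("first"):
            piece += " FIRST"
        elif column.get("after"):
            piece += f" AFTER {column['after']}"
        parts.append(piece)
    return f"ALTER TABLE {table} ADD COLUMNS ({', '.join(parts)})"


# Reproduces the nested-field statement from the example above.
stmt = add_columns_ddl(
    "boxes",
    [{"name": "colB.nested", "type": "STRING", "after": "field1"}],
)
print(stmt)
```

The helper does not validate identifiers or types, so treat it as a convenience for trusted input only.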
Change column comments and ordering
Use ALTER TABLE ... ALTER COLUMN to update a column's comment or reorder it relative to other columns:
ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)
Example: Change nested fields
To change a column in a nested field, use:
ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)
For example, if the schema before running ALTER TABLE boxes ALTER COLUMN colB.field2 FIRST is:
- root
| - colA
| - colB
| +-field1
| +-field2
the schema after is:
- root
| - colA
| - colB
| +-field2
| +-field1
Replace columns
Use ALTER TABLE ... REPLACE COLUMNS to redefine the full column list of a table, including adding, removing, reordering, or renaming columns in a single operation:
ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)
Example: Replace nested fields
For example, when running the following DDL:
ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)
if the schema before is:
- root
| - colA
| - colB
| +-field1
| +-field2
the schema after is:
- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA
Rename columns
To rename columns without rewriting any of the columns' existing data, you must enable column mapping for the table. See Rename and drop columns with Delta Lake column mapping.
To rename a column:
ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name
Example: Rename nested fields
To rename a nested field:
ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field
For example, when you run the following command:
ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001
If the schema before is:
- root
| - colA
| - colB
| +-field1
| +-field2
Then the schema after is:
- root
| - colA
| - colB
| +-field001
| +-field2
Drop columns
To drop columns as a metadata-only operation without rewriting any data files, you must enable column mapping for the table. See Rename and drop columns with Delta Lake column mapping.
Note
Dropping a column from metadata does not delete the underlying data for the column in files. To purge the dropped column data:
- Use REORG TABLE to rewrite files.
- Then use VACUUM to physically delete the files that contain the dropped column data.
To drop a column:
ALTER TABLE table_name DROP COLUMN col_name
To drop multiple columns:
ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)
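The drop, rewrite, and vacuum sequence described in the note above can be sketched as an ordered list of statements. This is an illustration under assumptions: `drop_and_purge_statements` is a hypothetical helper, and the `APPLY (PURGE)` form of `REORG TABLE` is the variant understood to rewrite files without dropped column data, so confirm it against the REORG TABLE reference for your runtime. The sketch only produces the statement text and does not execute anything.

```python
def drop_and_purge_statements(table, columns):
    """Return the SQL statements, in order, to drop columns and purge their data.

    Hypothetical helper. Assumes column mapping is already enabled on the table.
    """
    if not columns:
        raise ValueError("Provide at least one column to drop")
    if len(columns) == 1:
        drop = f"ALTER TABLE {table} DROP COLUMN {columns[0]}"
    else:
        drop = f"ALTER TABLE {table} DROP COLUMNS ({', '.join(columns)})"
    return [
        drop,                                  # metadata-only change
        f"REORG TABLE {table} APPLY (PURGE)",  # rewrite files without the dropped data
        f"VACUUM {table}",                     # physically delete the old files
    ]


statements = drop_and_purge_statements("boxes", ["colA", "colC"])
for statement in statements:
    print(statement)
```

Keeping the statements in a list makes the ordering explicit: the rewrite must finish before the vacuum can remove the files that still contain the dropped column data.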
Change column type or name
You can change a column's type or name or drop a column by rewriting the table. To do this, use the overwriteSchema option.
The following example shows changing a column type:
from pyspark.sql.functions import col
(spark.read.table(...)
.withColumn("birthDate", col("birthDate").cast("date"))
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
The following example shows changing a column name:
(spark.read.table(...)
.withColumnRenamed("dateOfBirth", "birthDate")
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
Enable schema evolution
Use WITH SCHEMA EVOLUTION or set mergeSchema to true to make schema changes based on the schema of the data you want to INSERT or MERGE into an existing table.
Enable schema evolution using one of the following methods:
- Use INSERT WITH SCHEMA EVOLUTION syntax for INSERT statements.
- Use MERGE WITH SCHEMA EVOLUTION syntax for MERGE statements. Use either WITH SCHEMA EVOLUTION in the SQL syntax or .withSchemaEvolution() in the Azure Databricks API.
- Set the mergeSchema option for batch writes or streaming writes. Set .option("mergeSchema", "true") on individual write operations.
- Set the Spark configuration (legacy): Set spark.databricks.delta.schema.autoMerge.enabled to true for the entire SparkSession.
Databricks recommends enabling schema evolution for each write operation using the WITH SCHEMA EVOLUTION syntax or the mergeSchema option rather than setting a Spark configuration.
When you use options or syntax to enable schema evolution in a write operation, this takes precedence over the Spark configuration.
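The precedence rule above can be pictured with a small model. This is a sketch under an assumption, not Databricks code: it reads "takes precedence" to mean that an explicit per-write `mergeSchema` value is used whenever one is present, and the session configuration is consulted only as a fallback. `resolve_schema_evolution` is a hypothetical name.

```python
def resolve_schema_evolution(write_option=None, session_conf=False):
    """Model which setting decides whether a write evolves the schema.

    write_option: the per-write mergeSchema value ("true"/"false"), or None if unset.
    session_conf: the session-wide autoMerge setting.
    Assumption: an explicit per-write value always wins over the session value.
    """
    if write_option is not None:
        return str(write_option).strip().lower() == "true"
    return bool(session_conf)


# The per-write option decides the outcome whenever it is set.
print(resolve_schema_evolution("true", session_conf=False))
print(resolve_schema_evolution("false", session_conf=True))
# With no per-write option, the session configuration applies.
print(resolve_schema_evolution(None, session_conf=True))
```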
Enable schema evolution for writes to add new columns
When schema evolution is enabled, columns that are present in the source query but missing from the target table are automatically added as part of a write transaction. See Enable schema evolution.
Consider the following:
- Case is preserved when appending a new column.
- New columns are added to the end of the table schema.
- If the additional columns are in a struct, they are appended to the end of the struct in the target table.
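The three rules above can be illustrated with a plain Python model of a schema. This is a simplified sketch, not how the engine is implemented: schemas are represented as ordered dictionaries mapping column names to a type string or to a nested dictionary for a struct, `evolve_schema` is a hypothetical function, and case-insensitive name matching is an assumption based on Spark's default name resolution.

```python
def evolve_schema(target, source):
    """Return the target schema with source-only columns appended.

    Schemas are dicts of name -> type string, or name -> dict for a struct.
    Assumption: names are matched case-insensitively, and the spelling from
    the source is kept for newly added columns.
    """
    source_names = {name.lower(): name for name in source}
    result = {}
    for name, target_type in target.items():
        source_name = source_names.get(name.lower())
        source_type = source[source_name] if source_name is not None else None
        if isinstance(target_type, dict) and isinstance(source_type, dict):
            # New struct fields are appended to the end of the struct.
            result[name] = evolve_schema(target_type, source_type)
        else:
            result[name] = target_type
    target_names = {name.lower() for name in target}
    for name, source_type in source.items():
        if name.lower() not in target_names:
            # New top-level columns are appended to the end, case preserved.
            result[name] = source_type
    return result


target = {"colA": "STRING", "colB": {"field1": "STRING", "field2": "STRING"}}
source = {"NewCol": "DATE", "colB": {"field2": "STRING", "Nested": "INT"}}
evolved = evolve_schema(target, source)
print(list(evolved))          # ['colA', 'colB', 'NewCol']
print(list(evolved["colB"]))  # ['field1', 'field2', 'Nested']
```

Note that the position of `NewCol` in the source has no effect: existing columns keep their order and new ones land at the end.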
INSERT with schema evolution using SQL
Use the WITH SCHEMA EVOLUTION clause in INSERT statements to enable schema evolution:
INSERT WITH SCHEMA EVOLUTION INTO target_table
SELECT * FROM source_table
If the query on source_table returns columns that don't exist in the target table, those columns are automatically added to the target_table schema. Existing rows receive NULL values for the new columns.
INSERT with schema evolution using DataFrame API
The following example demonstrates using the mergeSchema option with a batch write operation:
Python
(spark.read
.table("source_table")
.write
.option("mergeSchema", "true")
.mode("append")
.saveAsTable("target_table")
)
Scala
spark.read
.table("source_table")
.write
.option("mergeSchema", "true")
.mode("append")
.saveAsTable("target_table")
INSERT with schema evolution with Structured Streaming
The following example demonstrates using the mergeSchema option with Auto Loader for Structured Streaming. See What is Auto Loader?.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.trigger(availableNow=True)
.toTable("table_name")
)
Automatic schema evolution for merge
For MERGE, schema evolution allows you to resolve schema mismatches between the target and source table. It handles the following two cases:
- A column exists in the source table but not the target table, and is specified by name in an assignment of insert or update actions. Alternatively, an UPDATE SET * or INSERT * action is present.
  That column will be added to the target schema, and its values will be populated from the corresponding column in the source.
  This only applies when the column name and structure in the merge source exactly match the target assignment.
  The new column must be present in the source schema. Assigning the new column in the action clause doesn't define that column.
  These examples allow schema evolution:
  -- The column newcol is present in the source but not in the target. It will be added to the target.
  UPDATE SET target.newcol = source.newcol
  -- The field newfield doesn't exist in struct column somestruct of the target. It will be added to that struct column.
  UPDATE SET target.somestruct.newfield = source.somestruct.newfield
  -- The column newcol is present in the source but not in the target.
  -- It will be added to the target.
  UPDATE SET target.newcol = source.newcol + 1
  -- Any columns and nested fields in the source that don't exist in target will be added to the target.
  UPDATE SET *
  INSERT *
  These examples don't trigger schema evolution if the column newcol isn't present in the source schema:
  UPDATE SET target.newcol = source.someothercol
  UPDATE SET target.newcol = source.x + source.y
  UPDATE SET target.newcol = source.output.newcol
- A column exists in the target table but not the source table.
  The target schema is not changed. These columns:
  - Are left unchanged for UPDATE SET *.
  - Are set to NULL for INSERT *.
  - Might still be explicitly modified if assigned in the action clause.
  For example:
  UPDATE SET * -- The target columns that are not in the source are left unchanged.
  INSERT * -- The target columns that are not in the source are set to NULL.
  UPDATE SET target.onlyintarget = 5 -- The target column is explicitly updated.
  UPDATE SET target.onlyintarget = source.someothercol -- The target column is explicitly updated from some other source column.
You must manually enable automatic schema evolution. See Enable schema evolution.
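The row-level effect of the two cases can be modeled in plain Python. This is an illustrative sketch only: rows are dictionaries, `None` stands in for NULL, and `update_set_all` and `insert_all` are hypothetical functions that assume schema evolution is enabled and the target schema has already been extended with any source-only columns.

```python
def update_set_all(target_row, source_row):
    """Model UPDATE SET *: take every source column, leave target-only columns unchanged."""
    updated = dict(target_row)
    updated.update(source_row)
    return updated


def insert_all(evolved_columns, source_row):
    """Model INSERT *: target columns missing from the source are set to None (NULL)."""
    return {column: source_row.get(column) for column in evolved_columns}


# Target has (key, old_value); source has (key, new_value).
evolved_columns = ["key", "old_value", "new_value"]
matched = update_set_all({"key": 1, "old_value": "a"}, {"key": 1, "new_value": "x"})
inserted = insert_all(evolved_columns, {"key": 2, "new_value": "y"})
print(matched)   # {'key': 1, 'old_value': 'a', 'new_value': 'x'}
print(inserted)  # {'key': 2, 'old_value': None, 'new_value': 'y'}
```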
Note
In Databricks Runtime 11.3 LTS and below, only INSERT * or UPDATE SET * actions can be used for schema evolution with merge.
In Databricks Runtime 12.2 LTS and above, columns and struct fields present in the source table can be specified by name in insert or update actions.
In Databricks Runtime 13.3 LTS and above, you can use schema evolution with structs nested inside maps, such as map<int, struct<a: int, b: int>>.
MERGE with schema evolution using SQL, Python, and Scala
In Databricks Runtime 15.4 LTS and above, you can specify schema evolution in a merge statement using SQL or table APIs:
SQL
MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
Python
from delta.tables import *
(targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
import io.delta.tables._
targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
Example operations of MERGE with schema evolution
The following examples show the effects of a MERGE operation with and without schema evolution.
| Columns | Query (in SQL) | Behavior without schema evolution (default) | Behavior with schema evolution |
|---|---|---|---|
| Target columns: key, value; Source columns: key, value, new_value | MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * | The table schema remains unchanged; only columns key, value are updated/inserted. | The table schema is changed to (key, value, new_value). Existing records with matches are updated with the value and new_value in the source. New rows are inserted with the schema (key, value, new_value). |
| Target columns: key, old_value; Source columns: key, new_value | MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * | UPDATE and INSERT actions throw an error because the target column old_value is not in the source. | The table schema is changed to (key, old_value, new_value). Existing records with matches are updated with the new_value in the source leaving old_value unchanged. New records are inserted with the specified key, new_value, and NULL for the old_value. |
| Target columns: key, old_value; Source columns: key, new_value | MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET new_value = s.new_value | UPDATE throws an error because column new_value does not exist in the target table. | The table schema is changed to (key, old_value, new_value). Existing records with matches are updated with the new_value in the source leaving old_value unchanged, and unmatched records have NULL entered for new_value. See note (1). |
| Target columns: key, old_value; Source columns: key, new_value | MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN NOT MATCHED THEN INSERT (key, new_value) VALUES (s.key, s.new_value) | INSERT throws an error because column new_value does not exist in the target table. | The table schema is changed to (key, old_value, new_value). New records are inserted with the specified key, new_value, and NULL for the old_value. Existing records have NULL entered for new_value leaving old_value unchanged. See note (1). |
(1) This behavior is available in Databricks Runtime 12.2 LTS and above; Databricks Runtime 11.3 LTS and below raise an error in this condition.
Exclude columns with merge
In Databricks Runtime 12.2 LTS and above, you can use EXCEPT clauses in merge actions to explicitly exclude columns. The behavior of the EXCEPT keyword varies depending on whether or not schema evolution is enabled.
When schema evolution is turned off, the EXCEPT keyword applies to the list of columns in the target table and allows excluding columns from UPDATE or INSERT actions. Excluded columns are set to null.
With schema evolution enabled, the EXCEPT keyword applies to the list of columns in the source table and allows excluding columns from schema evolution. A new column in the source, not present in the target table, isn't added to the target schema if it is listed in the EXCEPT clause. Excluded columns that are already present in the target are set to null.
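The two EXCEPT behaviors can be compared with a plain Python model of `INSERT * EXCEPT (...)`. This is a sketch of the rules as described in this section, not the engine's implementation: `insert_star_except` is a hypothetical function, rows are dictionaries, and `None` stands in for NULL. The error raised when a target column is missing from the source without schema evolution follows the MERGE examples earlier in this article.

```python
def insert_star_except(target_columns, source_row, excluded, schema_evolution):
    """Model INSERT * EXCEPT (...). Returns (schema_after, inserted_row)."""
    excluded = set(excluded)
    if not schema_evolution:
        # EXCEPT is resolved against the target column list.
        unknown = sorted(c for c in excluded if c not in target_columns)
        if unknown:
            raise ValueError(f"Columns not in target: {unknown}")
        missing = [c for c in target_columns
                   if c not in excluded and c not in source_row]
        if missing:
            raise ValueError(f"Target columns not in source: {missing}")
        schema = list(target_columns)
    else:
        # EXCEPT is resolved against the source column list, so excluded
        # source columns are not added to the target schema.
        added = [c for c in source_row
                 if c not in target_columns and c not in excluded]
        schema = list(target_columns) + added
    # Excluded columns that exist in the target are set to None (NULL).
    row = {c: (None if c in excluded else source_row.get(c)) for c in schema}
    return schema, row


target_columns = ["id", "title", "last_updated"]
source_row = {"id": 1, "title": "T", "review": "good", "last_updated": "2024-01-01"}

plain = insert_star_except(target_columns, source_row, ["last_updated"], False)
evolved = insert_star_except(target_columns, source_row, ["last_updated"], True)
print(plain)
print(evolved)
```

Without schema evolution the `review` value is ignored because the column is not in the target. With schema evolution it is added to the schema and populated.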
Examples of EXCEPT with MERGE
The following examples demonstrate this syntax:
| Columns | Query (in SQL) | Behavior without schema evolution (default) | Behavior with schema evolution |
|---|---|---|---|
| Target columns: id, title, last_updated; Source columns: id, title, review, last_updated | MERGE INTO target t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET last_updated = current_date() WHEN NOT MATCHED THEN INSERT * EXCEPT (last_updated) | Matched rows are updated by setting the last_updated field to the current date. New rows are inserted using values for id and title. The excluded field last_updated is set to null. The field review is ignored because it is not in the target. | Matched rows are updated by setting the last_updated field to the current date. Schema is evolved to add the field review. New rows are inserted using all source fields except last_updated which is set to null. |
| Target columns: id, title, last_updated; Source columns: id, title, review, internal_count | MERGE INTO target t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET last_updated = current_date() WHEN NOT MATCHED THEN INSERT * EXCEPT (last_updated, internal_count) | INSERT throws an error because column internal_count does not exist in the target table. | Matched rows are updated by setting the last_updated field to the current date. The review field is added to the target table, but the internal_count field is ignored. New rows inserted have last_updated set to null. |
Enable schema evolution with Spark configuration (legacy)
You can set the Spark configuration spark.databricks.delta.schema.autoMerge.enabled to true to enable schema evolution for all write operations in the current SparkSession:
Python
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", True)
Scala
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", true)
SQL
SET spark.databricks.delta.schema.autoMerge.enabled=true
Note
Databricks doesn't recommend this approach for production. Setting a session-wide configuration might lead to unintended schema changes across multiple operations and makes it harder to reason about which operations evolve the schema.
Instead, enable schema evolution for each write operation:
- For INSERT and batch/streaming writes, use .option("mergeSchema", "true") or INSERT WITH SCHEMA EVOLUTION.
- For MERGE statements, use MERGE WITH SCHEMA EVOLUTION.
When you use options or syntax to enable schema evolution in a write operation, this takes precedence over the Spark configuration.
Replace table schema
By default, overwriting the data in a table doesn't overwrite the schema. When overwriting a table using mode("overwrite") without replaceWhere, you might still want to overwrite the schema of the data being written.
To replace the schema and partitioning of the table, set the overwriteSchema option to true:
df.write.option("overwriteSchema", "true")
Note
You cannot specify overwriteSchema as true when using dynamic partition overwrite. See Dynamic partition overwrites with partitionOverwriteMode (legacy).