Typerweiterung

Verfügbar in Delta Lake-Tabellen in Databricks Runtime 15.4 LTS und höher, ermöglicht die Typverbreiterung das Ändern von Spaltendatentypen in einen breiteren Typ ohne Umschreiben von Datendateien.

Alle verwalteten Tabellen im Unity-Katalog verwenden standardmäßig Delta Lake-Tabellen. Siehe verwaltete Tabellen des Unity-Katalogs für Delta Lake und Apache Iceberg.

Hinweis

Durch aktivieren der Typerweiterung werden die Lese- und Schreibprotokolle aktualisiert. Dies kann sich auf die Kompatibilität mit externen Delta Lake-Clients auswirken. Siehe Delta Lake Featurekompatibilität und Protokolle.

Tabellen mit aktivierter Typweiterung können nur von Databricks Runtime 15.4 LTS und höher gelesen werden.

Unterstützte Typänderungen

Sie können Typen entsprechend den folgenden Regeln erweitern:

Quelltyp Unterstützte größere Datentypen
BYTE SHORT, INT, BIGINT, DECIMAL, , DOUBLE
SHORT INT, BIGINT, DECIMAL, DOUBLE
INT BIGINT, DECIMALDOUBLE
BIGINT DECIMAL
FLOAT DOUBLE
DECIMAL DECIMAL mit größerer Präzision und Skalierung
DATE TIMESTAMP_NTZ
VOID Beliebiger Typ

Typänderungen werden für Spalten und Felder auf oberster Ebene unterstützt, die in Strukturen, Maps und Arrays geschachtelt sind.

Hinweis

VOID für jeden Typ erfordert nicht, dass Typverbreiterung für die Tabelle aktiviert wird. Jeder Vorgang, der den Typ einer VOID Spalte aktualisiert, wird ohne zusätzliche Konfiguration erfolgreich ausgeführt. VOID Typ-Erweiterung ist in Databricks Runtime 18.2 und höher verfügbar.

Dezimalverhalten

Spark schneidet den Bruchteil eines Werts standardmäßig ab, wenn ein Vorgang einen ganzzahligen Typ in einen decimal oder double umwandelt und ein nachgeschalteter Datenfluss den Wert zurück in eine ganzzahlige Spalte schreibt. Ausführliche Informationen zum Verhalten von Zuweisungsrichtlinien finden Sie unter Store-Zuweisung.

Beim Ändern eines numerischen Typs in decimalmuss die Gesamtgenauigkeit gleich oder größer als die Anfangsgenauigkeit sein. Wenn Sie auch die Skalierung erhöhen, muss die Gesamtgenauigkeit um einen entsprechenden Betrag erhöht werden.

Das Mindestziel für die Typen byte, short und int ist decimal(10,0). Das Mindestziel für long ist decimal(20,0).

Wenn Sie einem Feld mit decimal(10,1)zwei Dezimalstellen hinzufügen möchten, ist das Mindestziel decimal(12,3).

Typausweitung aktivieren

Hinweis

Durch aktivieren der Typerweiterung werden die Lese- und Schreibprotokolle aktualisiert. Dies kann sich auf die Kompatibilität mit externen Delta Lake-Clients auswirken. Siehe Delta Lake Featurekompatibilität und Protokolle.

Sie können die Typerweiterung für eine vorhandene Tabelle aktivieren, indem Sie die Tabelleneigenschaft delta.enableTypeWidening auf true festlegen:

  ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')

Sie können die Typerweiterung auch während der Tabellenerstellung aktivieren:

  CREATE TABLE T(c1 INT) TBLPROPERTIES('delta.enableTypeWidening' = 'true')

Manuelles Anwenden einer Typänderung

Verwenden Sie den Befehl ALTER COLUMN, um Typen manuell zu ändern:

ALTER TABLE <table_name> ALTER COLUMN <col_name> TYPE <new_type>

Dieser Vorgang aktualisiert das Tabellenschema, ohne die zugrunde liegenden Datendateien neu zu schreiben. Weitere Informationen finden Sie unter ALTER TABLE.

Erweitern von Typen mit der automatischen Schemaentwicklung

Verwenden Sie die Schemaentwicklung mit Typweitergabe, um Datentypen in Zieltabellen zu aktualisieren, um dem Typ der eingehenden Daten zu entsprechen.

Hinweis

Ohne Typweitergabe aktiviert, versucht die Schemaentwicklung immer, Daten zu downcasten, um Spaltentypen in der Zieltabelle abzugleichen. Wenn Sie datentypen in Ihren Zieltabellen nicht automatisch verbreitern möchten, müssen Sie die Typweitergabe deaktivieren, bevor Sie Workloads mit aktivierter Schemaentwicklung ausführen.

Um die Schemaentwicklung zu verwenden, um den Datentyp einer Spalte während der Aufnahme zu erweitern, müssen Sie die folgenden Bedingungen erfüllen:

  • Der Schreibbefehl wird mit aktivierter automatischer Schemaentwicklung ausgeführt.
  • Für die Zieltabelle ist die Typerweiterung aktiviert.
  • Der Quellspaltentyp ist breiter als der Zielspaltentyp.
  • Die Typverbreiterung ermöglicht die Typänderung.

Typenkollisionen, die nicht alle diese Bedingungen erfüllen, unterliegen den normalen Regeln der Schemaüberprüfung. Siehe Schema-Durchsetzung.

Example

Die folgenden Beispiele veranschaulichen, wie die Typweiterung mit der Schemaentwicklung funktioniert.

Python

Erstellen Sie eine Zieltabelle mit einer INT Spalte und einer Quelltabelle mit einer BIGINT Spalte:

spark.sql("CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")
spark.sql("CREATE TABLE source_table (id BIGINT, data STRING)")

Verwenden Sie saveAsTable() mit Schemaevolution, um die Spalte INT während eines Anfügevorgangs automatisch auf BIGINT zu erweitern:

spark.table("source_table").write.mode("append").option("mergeSchema", "true").saveAsTable("target_table")

Verwenden von MERGE INTO mit Schemaweiterentwicklung:

from delta.tables import DeltaTable

source_df = spark.table("source_table")
target_table = DeltaTable.forName(spark, "target_table")

(target_table.alias("target")
  .merge(source_df.alias("source"), "target.id = source.id")
  .withSchemaEvolution()
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)

Scala

Erstellen Sie eine Zieltabelle mit einer INT Spalte und einer Quelltabelle mit einer BIGINT Spalte:

spark.sql("CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")
spark.sql("CREATE TABLE source_table (id BIGINT, data STRING)")

Verwenden Sie saveAsTable() mit der Schemaevolution, um die Spalte INT während eines Anfügevorgangs automatisch auf BIGINT zu erweitern:

spark.table("source_table").write.mode("append").option("mergeSchema", "true").saveAsTable("target_table")

Verwendung von MERGE INTO mit Schemaentwicklung:

import io.delta.tables.DeltaTable

val sourceDf = spark.table("source_table")
val targetTable = DeltaTable.forName(spark, "target_table")

targetTable.alias("target")
  .merge(sourceDf.alias("source"), "target.id = source.id")
  .withSchemaEvolution()
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

SQL

Erstellen Sie eine Zieltabelle mit einer INT Spalte und einer Quelltabelle mit einer BIGINT Spalte:

CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true');
CREATE TABLE source_table (id BIGINT, data STRING);

Verwenden Sie INSERT INTO mit der Schemaentwicklung, um die Spalte INT während eines Anfügevorgangs automatisch auf BIGINT zu erweitern:

INSERT WITH SCHEMA EVOLUTION INTO target_table SELECT * FROM source_table;

Verwenden von MERGE INTO mit Schema-Evolution:

MERGE WITH SCHEMA EVOLUTION INTO target_table
USING source_table
ON target_table.id = source_table.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

Automatischer Lader

Important

Die Unterstützung für Typverbreiterung im Auto Loader befindet sich in der öffentlichen Vorschau.

Auto Loader unterstützt die Typverbreiteung mit der automatischen Schemaentwicklung. Wenn Sie das automatische Ladeprogramm zum Aufnehmen von Daten in eine Delta Lake-Tabelle mit aktivierter Typweiterung und Schemaentwicklung verwenden, werden Spaltentypen automatisch erweitert, um die eingehenden Daten abzugleichen.

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .trigger(availableNow=True)
  .toTable("table_name")
)

Siehe Automatisches Typen-Widening mit Auto Loader. Außerdem muss die Zieltabelle die Datentyp-Erweiterung aktiviert haben. Siehe Typweiterung aktivieren.

Deaktivieren Sie die Funktion zum Erweitern von Tabellen für Typen

Sie können die versehentliche Typenerweiterung für aktivierte Tabellen verhindern, indem Sie die Eigenschaft auf false festlegen:

  ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'false')

Diese Einstellung verhindert zukünftige Typänderungen an der Tabelle, entfernt jedoch nicht das Feature zum Erweitern der Tabelle oder rückgängigmachen vorheriger Typänderungen.

Wenn Sie die Tabellenfunktionen für die Typerweiterung vollständig entfernen müssen, können Sie den Befehl DROP FEATURE verwenden, wie im folgenden Beispiel gezeigt:

 ALTER TABLE <table-name> DROP FEATURE 'typeWidening' [TRUNCATE HISTORY]

Hinweis

Tabellen, für die die Typenerweiterung mit Databricks Runtime 15.4 LTS aktiviert wurde, erfordern stattdessen, dass Sie das Feature typeWidening-preview deaktivieren.

Beim Entfernen der Typverbreiterung schreibt Databricks alle Datendateien neu, die nicht dem aktuellen Tabellenschema entsprechen. Weitere Informationen finden Sie unter Löschen eines Delta Lake-Tabellenfeatures und Herabstufen des Tabellenprotokolls.

Streaming aus einer Delta Lake-Tabelle

Unterstützung für die Typweiterung im strukturierten Streaming ist in Databricks Runtime 16.4 LTS und höher verfügbar.

Beim Streaming von einer Delta Lake-Tabelle mit aktivierter Typweiterung können Sie die automatische Typweitergabe für Streamingabfragen konfigurieren, indem Sie die Schemaentwicklung mit der Option in der mergeSchema Zieltabelle aktivieren. Die Zieltabelle muss die Typweiterung aktiviert haben. Siehe Typweiterung aktivieren.

Python

(spark.readStream
  .table("delta_source_table")
  .writeStream
  .option("checkpointLocation", "/path/to/checkpointLocation")
  .option("mergeSchema", "true")
  .toTable("output_table")
)

Scala

spark.readStream
  .table("delta_source_table")
  .writeStream
  .option("checkpointLocation", "/path/to/checkpointLocation")
  .option("mergeSchema", "true")
  .toTable("output_table")

Wenn mergeSchema diese Option aktiviert ist und die Zieltabelle die Typweiterung aktiviert hat:

  • Typänderungen werden automatisch auf die nachgelagerte Tabelle angewendet, ohne dass ein manueller Eingriff erforderlich ist.
  • Neue Spalten werden dem nachgeschalteten Tabellenschema automatisch hinzugefügt.

Ohne mergeSchema aktiviert, werden Werte gemäß der Konfiguration von spark.sql.storeAssignmentPolicy behandelt, die standardmäßig Werte herabsetzt, um den Zielspaltentyp anzugleichen. Weitere Informationen zum Verhalten von Zuweisungsrichtlinien finden Sie unter Store Assignment.

Behandeln von Typänderungen in einem Datenstrom

Beim Streamen aus einer Delta Lake-Tabelle können Sie einen Schemanachverfolgungsort bereitstellen, um nicht-additive Schemaänderungen einschließlich Typänderungen nachzuverfolgen. Die Bereitstellung eines Schemaverfolgungsorts ist in Databricks Runtime 18.0 und darunter erforderlich und ist in Databricks Runtime 18.1 und höher optional.

Sie können schemaTrackingLocation nicht mit SQL festlegen. Weitere Informationen findest du unter Nicht unterstützte Features.

schemaTrackingLocation muss auf einen Speicherort im selben Verzeichnispfad wie Ihr Streaming-Checkpoint festgelegt sein. Beispiel:

Python

checkpoint_path = "/path/to/checkpointLocation"

(spark.readStream
  .option("schemaTrackingLocation", checkpoint_path)
  .table("delta_source_table")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("output_table")
)

Scala

val checkpointPath = "/path/to/checkpointLocation"

spark.readStream
  .option("schemaTrackingLocation", checkpointPath)
  .table("delta_source_table")
  .writeStream
  .option("checkpointLocation", checkpointPath)
  .toTable("output_table")

Nach dem Festlegen eines Speicherorts für die Schema-Nachverfolgung aktualisiert der Datenstrom das nachverfolgte Schema, wenn er eine Typänderung erkennt, und stoppt anschließend. Zu diesem Zeitpunkt müssen Sie die Typänderung behandeln, z. B. indem Sie die Typerweiterung in der nachgelagerten Tabelle aktivieren oder die Streaming-Abfrage aktualisieren.

Um die Verarbeitung fortzusetzen, legen Sie die Spark-Konfiguration spark.databricks.delta.streaming.allowSourceColumnTypeChange oder die DataFrame Leseoption allowSourceColumnTypeChangewie im folgenden Beispiel fest:

Python

checkpoint_path = "/path/to/checkpointLocation"

(spark.readStream
  .option("schemaTrackingLocation", checkpoint_path)
  .option("allowSourceColumnTypeChange", "<delta_source_table_version>")
  # alternatively to allow all future type changes for this stream:
  # .option("allowSourceColumnTypeChange", "always")
  .table("delta_source_table")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("output_table")
)

Scala

val checkpointPath = "/path/to/checkpointLocation"

spark.readStream
  .option("schemaTrackingLocation", checkpointPath)
  .option("allowSourceColumnTypeChange", "<delta_source_table_version>")
  // alternatively to allow all future type changes for this stream:
  // .option("allowSourceColumnTypeChange", "always")
  .table("delta_source_table")
  .writeStream
  .option("checkpointLocation", checkpointPath)
  .toTable("output_table")

SQL

  -- To unblock for this particular stream just for this series of schema change(s):
  SET spark.databricks.delta.streaming.allowSourceColumnTypeChange.ckpt_<checkpoint_id> = "<delta_source_table_version>"
  -- To unblock for this particular stream:
  SET spark.databricks.delta.streaming.allowSourceColumnTypeChange = "<delta_source_table_version>"
  -- To unblock for all streams:
  SET spark.databricks.delta.streaming.allowSourceColumnTypeChange = "always"

Wenn der Datenstrom beendet wird, zeigt eine Fehlermeldung die Prüfpunkt-ID <checkpoint_id> und die Version der Delta Lake-Quelltabelle <delta_source_table_version>an.

Eine vollständige Liste der Streaming-Delta Lake-Optionen finden Sie unter Delta Lake.

Lakeflow Spark-Deklarative Pipelines

Sie können die Typerweiterung für Lakeflow Spark Declarative Pipelines auf Pipelineebene oder für einzelne Tabellen aktivieren. Durch die Typerweiterung können Spaltentypen während der Pipelineausführung automatisch erweitert werden, ohne dass eine vollständige Aktualisierung von Streamingtabellen erforderlich ist. Typänderungen in materialisierten Ansichten lösen immer eine vollständige Neukompilierung aus, und wenn eine Typänderung auf eine Quelltabelle angewendet wird, benötigen materialisierte Ansichten, die von dieser Tabelle abhängen, eine vollständige Neukompilierung, um die neuen Typen widerzuspiegeln.

Aktivieren der Typerweiterung für eine gesamte Pipeline

Um die Typerweiterung für alle Tabellen in einer Pipeline zu aktivieren, legen Sie die Pipelinekonfiguration pipelines.enableTypeWideningfest:

JSON

{
  "configuration": {
    "pipelines.enableTypeWidening": "true"
  }
}

YAML

configuration:
  pipelines.enableTypeWidening: 'true'

Aktivieren Sie die Typweiterung für bestimmte Tabellen

Sie können auch die Typweiterung für einzelne Tabellen aktivieren, indem Sie die Tabelleneigenschaft delta.enableTypeWideningfestlegen:

Python

import dlt

@dlt.table(
  table_properties={"delta.enableTypeWidening": "true"}
)
def my_table():
  return spark.readStream.table("source_table")

SQL

CREATE OR REFRESH STREAMING TABLE my_table
TBLPROPERTIES ('delta.enableTypeWidening' = 'true')
AS SELECT * FROM source_table

Kompatibilität mit nachgeschalteten Lesern

Tabellen mit aktivierter Typweiterung können nur in Databricks Runtime 15.4 LTS und höher gelesen werden. Wenn eine Tabelle mit aktivierter Typerweiterung in Ihrer Pipeline von Lesern in Databricks Runtime 14.3 und unten gelesen werden soll, müssen Sie eine der folgenden Aktionen ausführen:

  • Deaktivieren Sie die Typverbreiterung, indem Sie die Eigenschaft delta.enableTypeWidening/pipelines.enableTypeWidening entfernen oder sie auf „false“ setzen und eine vollständige Aktualisierung der Tabelle auslösen.
  • Aktivieren Sie den Kompatibilitätsmodus in Ihrer Tabelle.

OpenSharing

Hinweis

Unterstützung für die Type Widening in OpenSharing ist in Databricks Runtime 16.1 und höher verfügbar.

Das Freigeben einer Delta Lake-Tabelle mit aktivierter Typverbreiteung wird in Databricks-to-Databricks OpenSharing unterstützt. Der Anbieter und Empfänger müssen sich auf Databricks Runtime 16.1 oder höher befinden.

Um den Änderungsdatenfeed aus einer Delta Lake-Tabelle mit aktivierter Typweiterung mit OpenSharing zu lesen, müssen Sie das Antwortformat auf delta festlegen:

spark.read
  .format("deltaSharing")
  .option("responseFormat", "delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", "<start version>")
  .option("endingVersion", "<end version>")
  .load("<table>")

Das Lesen von Änderungsdatenfeeds über Typänderungen hinweg wird nicht unterstützt. Sie müssen den Vorgang stattdessen in zwei separate Lesevorgänge aufteilen, eine endend auf der Tabellenversion, die die Typänderung enthält, und die andere beginnend mit der Version, die die Typänderung enthält.

Limitations

Apache Iceberg Kompatibilität

Apache Iceberg unterstützt nicht alle Typänderungen, die durch die Typweiterung abgedeckt werden. Siehe Iceberg Schema Evolution.

Nicht unterstützte Typänderungen umfassen Folgendes:

  • byte, short, int, long, bis decimal oder double
  • Erhöhung der Dezimalskala
  • date bis timestampNTZ

Wenn Sie UniForm mit Iceberg-Kompatibilität auf einer Delta Lake-Tabelle aktivieren, führt das Anwenden einer der vorherigen Typänderungen zu einem Fehler. Siehe Delta Lake-Tabellen mit Iceberg-Clients mithilfe von UniForm lesen.

Wenn Sie eine dieser nicht unterstützten Typänderungen auf eine Delta Lake-Tabelle anwenden, haben Sie zwei Optionen:

  • Iceberg-Metadaten neu generieren: Verwenden Sie den folgenden Befehl, um Iceberg-Metadaten ohne das Feature zur Typverbreiterung neu zu generieren.

    ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.universalFormat.config.icebergCompatVersion' = '<version>')
    

    Auf diese Weise können Sie die einheitliche Kompatibilität beibehalten, nachdem Sie inkompatible Typänderungen angewendet haben.

  • Verwerfen Sie die Funktion der Typverbreiterungstabelle: Siehe Deaktivieren Sie die Funktion der Typverbreiterungstabelle.

Typabhängige Funktionen

Einige SQL-Funktionen geben Ergebnisse zurück, die vom Eingabedatentyp abhängen. Beispielsweise gibt die Funktion unterschiedliche Hashwerte für denselben Wahrheitswert zurück, hash wenn der Argumenttyp anders ist: hash(1::INT) gibt ein anderes Ergebnis als hash(1::BIGINT).

Andere typabhängige Funktionen sind: xxhash64, bit_get, bit_reverse, . typeof

Für stabile Ergebnisse in Abfragen, die diese Funktionen verwenden, müssen Sie Werte explizit in den gewünschten Typ umwandeln:

Python

spark.read.table("table_name") \
  .selectExpr("hash(CAST(column_name AS BIGINT))")

Scala

spark.read.table("main.johan_lasperas.dlt_type_widening_bronze2")
  .selectExpr("hash(CAST(a AS BIGINT))")

SQL

-- Use explicit casting for stable hash values
SELECT hash(CAST(column_name AS BIGINT)) FROM table_name

Nicht unterstützte Features

  • Sie können einen Schemaverfolgungsort nicht mithilfe von SQL festlegen, wenn Sie aus einer Delta Lake-Tabelle mit einer Typänderung streamen.
  • Sie können keine Tabelle mit aktivierter Typerweiterung über OpenSharing für Nicht-Databricks-Verbraucher freigeben.