Ampliação de tipo

Disponível em tabelas Delta Lake no Databricks Runtime 15.4 LTS e versões posteriores, a expansão de tipo permite alterar os tipos de dados das colunas para tipos mais amplos sem reescrever os arquivos de dados.

Todas as tabelas gerenciadas do Catálogo do Unity usam o Delta Lake por padrão. Consulte as tabelas gerenciadas do Unity Catalog para Delta Lake e Apache Iceberg.

Observação

Habilitar a ampliação de tipo atualiza os protocolos de leitor e gravador. Isso pode afetar a compatibilidade com clientes externos do Delta Lake. Consulte a compatibilidade de recursos e protocolos do Delta Lake.

Tabelas com ampliação de tipo habilitada só podem ser lidas pelo Databricks Runtime 15.4 LTS e superior.

Alterações de tipo com suporte

Você pode ampliar os tipos de acordo com as seguintes regras:

Tipo de origem Tipos mais amplos com suporte
BYTE SHORT, INT, BIGINT, , DECIMALDOUBLE
SHORT INT, BIGINT, , DECIMALDOUBLE
INT BIGINT, DECIMAL, DOUBLE
BIGINT DECIMAL
FLOAT DOUBLE
DECIMAL DECIMAL com maior precisão e escala
DATE TIMESTAMP_NTZ
VOID Qualquer tipo

Há suporte para as alterações de tipo nas colunas de nível superior e campos aninhados no structs, mapas e matrizes.

Observação

VOID para qualquer tipo não requer que a ampliação de tipo seja habilitada na tabela. Qualquer operação que atualize o tipo de uma VOID coluna é bem-sucedida sem configuração adicional. VOID A ampliação de tipo está disponível no Databricks Runtime 18.2 e superior.

Comportamento decimal

O Spark trunca a parte fracionária de um valor por padrão quando uma operação converte um tipo inteiro para um decimal ou double e uma ingestão downstream grava o valor de volta em uma coluna de números inteiros. Para obter detalhes sobre o comportamento da política de atribuição, consulte Store assignment.

Ao alterar qualquer tipo numérico para decimal, a precisão total precisa ser igual ou maior que a precisão inicial. Se você também aumentar a escala, a precisão total deverá aumentar na mesma proporção.

O objetivo mínimo para tipos byte, shorte int é decimal(10,0). O destino mínimo para long é decimal(20,0).

Se você quiser adicionar duas casas decimais a um campo com decimal(10,1), o alvo mínimo é decimal(12,3).

Habilitar ampliação de tipo

Observação

Habilitar a ampliação de tipo atualiza os protocolos de leitor e gravador. Isso pode afetar a compatibilidade com clientes externos do Delta Lake. Consulte a compatibilidade de recursos e protocolos do Delta Lake.

Você pode habilitar a ampliação de tipo em uma tabela existente definindo a propriedade da tabela delta.enableTypeWidening como true:

  ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')

Você também pode habilitar a ampliação de tipo durante a criação da tabela:

  CREATE TABLE T(c1 INT) TBLPROPERTIES('delta.enableTypeWidening' = 'true')

Aplicar manualmente uma alteração de tipo

Use o comando ALTER COLUMN para alterar manualmente os tipos:

ALTER TABLE <table_name> ALTER COLUMN <col_name> TYPE <new_type>

Esta operação atualiza o esquema de tabela sem reescrever os arquivos de dados subjacentes. Confira ALTER TABLE para saber mais.

Ampliar tipos com evolução automática de esquema

Use a evolução do esquema com a ampliação de tipos para atualizar tipos de dados em tabelas de destino para corresponder ao tipo de dados de entrada.

Observação

Sem a ampliação de tipo habilitada, a evolução do esquema sempre tenta reduzir os dados para corresponder aos tipos de coluna na tabela de destino. Se você não quiser ampliar automaticamente os tipos de dados em suas tabelas de destino, desative a ampliação de tipos antes de executar cargas de trabalho com a evolução do esquema habilitada.

Para usar a evolução do esquema de dados para expandir o tipo de dados de uma coluna durante a ingestão, você deve atender às seguintes condições:

  • O comando de gravação é executado com a evolução automática do esquema habilitada.
  • A tabela de destino tem a ampliação de tipo habilitada.
  • O tipo de coluna de origem é maior que o tipo de coluna de destino.
  • A ampliação de tipo dá suporte à alteração de tipo.

Incompatibilidades de tipo que não atendem a todas essas condições seguem regras normais de aplicação de esquema. Consulte Imposição do esquema.

Example

Os exemplos a seguir demonstram como a ampliação de tipo funciona com a evolução do esquema.

Python

Crie uma tabela de destino com uma INT coluna e uma tabela de origem com uma BIGINT coluna:

spark.sql("CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")
spark.sql("CREATE TABLE source_table (id BIGINT, data STRING)")

Use saveAsTable() com a evolução do esquema para ampliar automaticamente a coluna INT para BIGINT durante uma operação de acréscimo:

spark.table("source_table").write.mode("append").option("mergeSchema", "true").saveAsTable("target_table")

Use MERGE INTO com a evolução do esquema:

from delta.tables import DeltaTable

source_df = spark.table("source_table")
target_table = DeltaTable.forName(spark, "target_table")

(target_table.alias("target")
  .merge(source_df.alias("source"), "target.id = source.id")
  .withSchemaEvolution()
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)

Scala

Crie uma tabela de destino com uma INT coluna e uma tabela de origem com uma BIGINT coluna:

spark.sql("CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")
spark.sql("CREATE TABLE source_table (id BIGINT, data STRING)")

Use saveAsTable() com a evolução do esquema para ampliar automaticamente a coluna INT para BIGINT durante uma operação de acréscimo:

spark.table("source_table").write.mode("append").option("mergeSchema", "true").saveAsTable("target_table")

Use MERGE INTO com a evolução do esquema:

import io.delta.tables.DeltaTable

val sourceDf = spark.table("source_table")
val targetTable = DeltaTable.forName(spark, "target_table")

targetTable.alias("target")
  .merge(sourceDf.alias("source"), "target.id = source.id")
  .withSchemaEvolution()
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

SQL

Crie uma tabela de destino com uma INT coluna e uma tabela de origem com uma BIGINT coluna:

CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true');
CREATE TABLE source_table (id BIGINT, data STRING);

Use INSERT INTO com a evolução de esquema para ampliar automaticamente a coluna INT para BIGINT durante uma operação de acréscimo:

INSERT WITH SCHEMA EVOLUTION INTO target_table SELECT * FROM source_table;

Use MERGE INTO com a evolução do esquema:

MERGE WITH SCHEMA EVOLUTION INTO target_table
USING source_table
ON target_table.id = source_table.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

Carregador Automático

Importante

O suporte à ampliação de tipos no Carregador Automático está em Versão Prévia Pública.

O Carregador Automático dá suporte à ampliação de tipo com evolução automática do esquema. Quando você usa o Auto Loader para ingerir dados em uma tabela do Delta Lake com expansão de tipos e evolução de esquema habilitadas, os tipos de coluna são automaticamente expandidos para corresponderem aos dados recebidos.

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .trigger(availableNow=True)
  .toTable("table_name")
)

Veja a ampliação automática de tipo com o Carregador Automático. Além disso, a tabela de destino deve ter a ampliação de tipo habilitada. Consulte Habilitar ampliação de tipo.

Desabilitar o recurso de tabela de ampliação de tipo

Você pode impedir a ampliação acidental de tipo em tabelas habilitadas definindo a propriedade como false:

  ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'false')

Essa configuração impede alterações futuras de tipo na tabela, mas não remove o recurso de tabela de ampliação de tipo ou desfaz alterações de tipo anteriores.

Se você precisar remover completamente os recursos de tabela de ampliação de tipo, poderá usar o comando DROP FEATURE, conforme mostrado no exemplo a seguir:

 ALTER TABLE <table-name> DROP FEATURE 'typeWidening' [TRUNCATE HISTORY]

Observação

As tabelas que habilitaram a expansão de tipo usando o Databricks Runtime 15.4 LTS exigem que você remova o recurso typeWidening-preview em vez disso.

Ao remover a ampliação de tipo, o Databricks reescreve todos os arquivos de dados que não estão em conformidade com o esquema de tabela atual. Confira Remover um recurso de tabela do Delta Lake e fazer downgrade do protocolo de tabela.

Transmissão por streaming de uma tabela do Delta Lake

O suporte para ampliação de tipos no Streaming Estruturado está disponível no Databricks Runtime 16.4 LTS e superior.

Ao fazer streaming de uma tabela Delta Lake com a expansão de tipo habilitada, você pode configurar a expansão automática de tipos para consultas de streaming habilitando a evolução do esquema com a opção mergeSchema na tabela de destino. A tabela de destino deve ter a ampliação de tipo habilitada. Consulte Habilitar ampliação de tipo.

Python

(spark.readStream
  .table("delta_source_table")
  .writeStream
  .option("checkpointLocation", "/path/to/checkpointLocation")
  .option("mergeSchema", "true")
  .toTable("output_table")
)

Scala

spark.readStream
  .table("delta_source_table")
  .writeStream
  .option("checkpointLocation", "/path/to/checkpointLocation")
  .option("mergeSchema", "true")
  .toTable("output_table")

Quando mergeSchema estiver habilitada e a tabela de destino tiver a ampliação de tipo habilitada:

  • As alterações de tipo são aplicadas automaticamente à tabela downstream sem a necessidade de intervenção manual.
  • Novas colunas são adicionadas automaticamente ao esquema de tabela downstream.

Sem mergeSchema habilitado, os valores são tratados de acordo com a spark.sql.storeAssignmentPolicy configuração, que, por padrão, reduz valores para corresponder ao tipo de coluna de destino. Para obter mais informações sobre o comportamento da política de atribuição, consulte Store Assignment.

Manipular alterações de tipo em um fluxo

Ao fazer streaming de uma tabela Delta Lake, você pode fornecer um local para rastreamento de esquema para rastrear alterações de esquema não aditivas, incluindo alterações de tipo. Fornecer um local de acompanhamento de esquema é necessário no Databricks Runtime 18.0 e abaixo e é opcional no Databricks Runtime 18.1 e superior.

Você não pode definir um schemaTrackingLocation usando SQL. Confira Recursos sem suporte.

schemaTrackingLocation deve ser definido como um local dentro do mesmo caminho que o ponto de verificação de streaming. Por exemplo:

Python

checkpoint_path = "/path/to/checkpointLocation"

(spark.readStream
  .option("schemaTrackingLocation", checkpoint_path)
  .table("delta_source_table")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("output_table")
)

Scala

val checkpointPath = "/path/to/checkpointLocation"

spark.readStream
  .option("schemaTrackingLocation", checkpointPath)
  .table("delta_source_table")
  .writeStream
  .option("checkpointLocation", checkpointPath)
  .toTable("output_table")

Depois de definir um local de acompanhamento de esquema, o fluxo evolui seu esquema rastreado quando detecta uma alteração de tipo e, em seguida, para. Nesse momento, você deve lidar com a alteração de tipo, como habilitar a ampliação de tipo na tabela downstream ou atualizar a consulta de streaming.

Para retomar o processamento, defina a configuração spark.databricks.delta.streaming.allowSourceColumnTypeChange do Spark ou a opção DataFrameallowSourceColumnTypeChangede leitor, como no exemplo a seguir:

Python

checkpoint_path = "/path/to/checkpointLocation"

(spark.readStream
  .option("schemaTrackingLocation", checkpoint_path)
  .option("allowSourceColumnTypeChange", "<delta_source_table_version>")
  # alternatively to allow all future type changes for this stream:
  # .option("allowSourceColumnTypeChange", "always")
  .table("delta_source_table")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("output_table")
)

Scala

val checkpointPath = "/path/to/checkpointLocation"

spark.readStream
  .option("schemaTrackingLocation", checkpointPath)
  .option("allowSourceColumnTypeChange", "<delta_source_table_version>")
  // alternatively to allow all future type changes for this stream:
  // .option("allowSourceColumnTypeChange", "always")
  .table("delta_source_table")
  .writeStream
  .option("checkpointLocation", checkpointPath)
  .toTable("output_table")

SQL

  -- To unblock for this particular stream just for this series of schema change(s):
  SET spark.databricks.delta.streaming.allowSourceColumnTypeChange.ckpt_<checkpoint_id> = "<delta_source_table_version>"
  -- To unblock for this particular stream:
  SET spark.databricks.delta.streaming.allowSourceColumnTypeChange = "<delta_source_table_version>"
  -- To unblock for all streams:
  SET spark.databricks.delta.streaming.allowSourceColumnTypeChange = "always"

Quando o fluxo é interrompido, uma mensagem de erro exibe a ID <checkpoint_id> do ponto de verificação e a versão <delta_source_table_version>da tabela de origem do Delta Lake.

Para obter uma lista completa das opções de streaming do Delta Lake, consulte Delta Lake.

Pipelines Declarativos do Lakeflow Spark

Você pode habilitar a ampliação de tipo para Pipelines Declarativos do Lakeflow Spark no nível do pipeline ou para tabelas individuais. A ampliação de tipo permite que os tipos de coluna sejam ampliados automaticamente durante a execução do pipeline sem a necessidade de uma atualização completa das tabelas de streaming. Alterações de tipo em exibições materializadas sempre disparam uma recompute completa e, quando uma alteração de tipo é aplicada a uma tabela de origem, exibições materializadas que dependem dessa tabela exigem uma recompute completa para refletir os novos tipos.

Habilitar a ampliação de tipo para um pipeline inteiro

Para habilitar a ampliação de tipo para todas as tabelas em um pipeline, defina a configuração do pipeline pipelines.enableTypeWidening:

JSON

{
  "configuration": {
    "pipelines.enableTypeWidening": "true"
  }
}

YAML

configuration:
  pipelines.enableTypeWidening: 'true'

Habilitar a ampliação de tipo para tabelas específicas

Você também pode habilitar a ampliação de tipo para tabelas individuais definindo a propriedade delta.enableTypeWideningda tabela:

Python

import dlt

@dlt.table(
  table_properties={"delta.enableTypeWidening": "true"}
)
def my_table():
  return spark.readStream.table("source_table")

SQL

CREATE OR REFRESH STREAMING TABLE my_table
TBLPROPERTIES ('delta.enableTypeWidening' = 'true')
AS SELECT * FROM source_table

Compatibilidade com leitores downstream

Tabelas com ampliação de tipo habilitada só podem ser lidas no Databricks Runtime 15.4 LTS ou superior. Se você quiser que uma tabela com ampliação de tipo habilitada no pipeline seja legível pelos leitores no Databricks Runtime 14.3 e abaixo, você deverá:

  • Desative a ampliação de tipo removendo a propriedade delta.enableTypeWidening/pipelines.enableTypeWidening ou definindo-a como false e dispare uma atualização completa da tabela.
  • Habilitar o Modo de Compatibilidade em sua tabela.

OpenSharing

Observação

O suporte para ampliação de tipos no OpenSharing está disponível no Databricks Runtime 16.1 e superior.

O compartilhamento de uma tabela Delta Lake com expansão de tipo habilitada é compatível com o Databricks-to-Databricks OpenSharing. O provedor e o destinatário devem estar no Databricks Runtime 16.1 ou superior.

Para ler o feed de dados de alterações de uma tabela do Delta Lake com a expansão de tipos habilitada usando OpenSharing, você deve definir o formato da resposta como delta:

spark.read
  .format("deltaSharing")
  .option("responseFormat", "delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", "<start version>")
  .option("endingVersion", "<end version>")
  .load("<table>")

Não há suporte para leitura do feed de dados de alterações entre alterações de tipo. Em vez disso, você deve dividir a operação em duas leituras separadas, uma terminando na versão da tabela que contém a alteração de tipo e a outra começando na versão que contém a alteração de tipo.

Limitations

Compatibilidade do Apache Iceberg

O Apache Iceberg não dá suporte a todas as alterações de tipo cobertas pela ampliação de tipo. Veja a evolução do esquema de iceberg.

As alterações de tipo sem suporte incluem:

  • byte, short, int, long para decimal ou double
  • aumento de escala decimal
  • date a timestampNTZ

Quando você habilita o UniForm com compatibilidade com o Iceberg em uma tabela Delta Lake, aplicar uma das alterações de tipo anteriores gera um erro. Consulte Ler tabelas do Delta Lake com clientes Iceberg usando o UniForm.

Se você aplicar uma dessas alterações de tipo sem suporte a uma tabela Delta Lake, terá duas opções:

  • Regenerar metadados do Iceberg: use o seguinte comando para regenerar metadados do Iceberg sem o recurso de tabela de ampliação de tipo:

    ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.universalFormat.config.icebergCompatVersion' = '<version>')
    

    Isso permite que você mantenha a compatibilidade uniforme depois de aplicar alterações de tipo incompatíveis.

  • Remover o recurso de tabela de ampliação de tipo: consulte Desabilitar o recurso de tabela de ampliação de tipo.

Funções dependentes de tipo

Algumas funções SQL retornam resultados que dependem do tipo de dados de entrada. Por exemplo, hash a função retorna valores de hash diferentes para o mesmo valor lógico se o tipo de argumento for diferente: hash(1::INT) retorna um resultado diferente de hash(1::BIGINT).

Outras funções dependentes de tipo são: xxhash64, , bit_get, bit_reverse, typeof.

Para obter resultados estáveis em consultas que usam essas funções, você deve converter explicitamente valores no tipo desejado:

Python

spark.read.table("table_name") \
  .selectExpr("hash(CAST(column_name AS BIGINT))")

Scala

spark.read.table("main.johan_lasperas.dlt_type_widening_bronze2")
  .selectExpr("hash(CAST(a AS BIGINT))")

SQL

-- Use explicit casting for stable hash values
SELECT hash(CAST(column_name AS BIGINT)) FROM table_name

Recursos sem suporte

  • Você não pode definir um local para rastreamento do esquema usando SQL ao fazer streaming de uma tabela Delta Lake que tenha uma alteração de tipo.
  • Você não pode compartilhar uma tabela com a ampliação de tipo habilitada para consumidores que não são do Databricks usando o OpenSharing.