Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
O feed de dados de alteração (CDF) acompanha alterações ao nível das linhas entre versões de uma tabela Delta Lake ou da tabela Apache Iceberg v3.
O Azure Databricks suporta duas abordagens:
- Feed automático de dados de alterações: Calcula as alterações durante a leitura de tabelas utilizando metadados de linhagem de linhas. Isto não requer configuração individual de tabelas e funciona em tabelas Delta Lake e Apache Iceberg v3. Consulte alimentação de dados de alterações automática.
- Fluxo de dados de alterações legado: Materializa alterações durante operações de escrita na tabela. Suporta apenas tabelas Delta Lake. Requer configuração individual de mesa. Consulte o feed de dados de alterações legadas para Delta Lake.
Pode utilizar o fluxo de dados de alterações para casos de utilização de dados comuns, incluindo:
- Pipelines ETL incrementais que processam apenas as linhas que mudaram desde a última execução do pipeline.
- Registos de auditoria que acompanham modificações de dados para requisitos de conformidade e governação.
- Cargas de trabalho de replicação de dados que sincronizam alterações em tabelas a jusante, caches ou sistemas externos.
Fluxo automático de alterações de dados
Importante
Este recurso está no Public Preview. Os administradores do espaço de trabalho podem controlar o acesso a esse recurso na página Visualizações . Ver Gerir as pré-visualizações de Azure Databricks.
O fluxo automático de dados de alteração calcula alterações ao nível da linha no momento da consulta, em vez de no momento da escrita, recorrendo ao rastreamento de linhas para o Delta Lake e à linhagem de linhas para o Apache Iceberg v3. Ao contrário do feed de dados de alterações legado, o feed automático de dados de alteração não requer configuração individual de tabelas e funciona em tabelas Delta Lake e tabelas Apache Iceberg v3.
Como as alterações não são calculadas em todas as operações de escrita MERGE INTO e UPDATE, o fluxo automático de dados de alterações melhora o desempenho da escrita e reduz os custos de armazenamento, em comparação com o fluxo de dados de alterações legado.
O fluxo automático de dados de alterações utiliza as mesmas APIs table_changes() e readChangeFeed que o antigo fluxo de dados de alterações e funciona com consultas em lote, Structured Streaming e Databricks-to-Databricks Delta Lake Sharing. Consulte Ler alterações em consultas por lote e Processar dados de alteração de forma incremental.
Requisitos
- Databricks Runtime 18 ou superior
- Um formato de tabela suportado que está registado no Unity Catalog:
- Uma tabela gerida em formato Delta Lake com rastreamento de linhas ativado ou em formato Iceberg v3.
- Uma tabela externa em formato Delta Lake com rastreamento de linhas ativado.
Ver tipos de tabelas do Catálogo Unity do Databricks.
Note
O feed de dados de alteração não faz parte da especificação do Apache Iceberg. Os leitores do Azure Databricks podem consultar o feed automático de dados de alterações para tabelas Apache Iceberg v3, mas leitores externos do Iceberg não podem. Consulte as especificações da tabela Iceberg.
Para o Delta Lake, apenas os leitores do Azure Databricks podem consultar o feed automático de dados de alteração.
Utilizar o fluxo de dados de alterações
Para usar o feed de dados de alteração, verifique se está a usar uma tabela que cumpra os requisitos. Consulte Requisitos.
Para ler em lote o feed de dados de alterações, faça o seguinte:
Python
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("<table_name>")
Scala
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("<table_name>")
SQL
SELECT * FROM table_changes('<table_name>', 0)
Para mais informações sobre leituras em lote para o feed de dados de alterações, consulte Ler alterações em consultas por lotes.
Para ler em fluxo o feed de dados de alterações, faça o seguinte:
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("<table_name>")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("<table_name>")
Para mais informações sobre leituras em fluxo para o feed de dados de alterações, consulte Processar dados de alterações de forma incremental.
Migrar do feed legado de dados de alterações
Para migrar uma tabela Delta Lake de um feed de dados de alterações legado para um feed automático de dados de alteração, faça o seguinte:
- Verifique se a sua tabela cumpre os requisitos.
- Desligue o fluxo de dados de alterações legadas executando o seguinte comando:
ALTER TABLE <table_name> UNSET TBLPROPERTIES ('delta.enableChangeDataFeed');
Não é possível usar em conjunto os feeds de dados de alteração herdados e automáticos.
Esquema do feed de dados de alteração
Quando lês o feed de dados de alteração de uma tabela, a consulta usa o esquema da versão mais recente da tabela. O Azure Databricks suporta a maioria das operações de alteração de esquema e evolução, mas as tabelas com mapeamento de colunas têm limitações. Ver Tabelas com mapeamento de colunas.
Para além das colunas de dados do esquema da tabela Delta Lake, o feed de dados de alteração contém colunas de metadados que identificam o tipo de evento de alteração:
| Nome da coluna | Tipo | Values |
|---|---|---|
_change_type |
Cordão | Contém: insert, update_preimage, update_postimage, delete.preimage é o valor antes da atualização, postimage é o valor após a atualização. |
_commit_version |
Long | Contém: o registo Delta ou a versão da tabela que contém a alteração. |
_commit_timestamp |
Carimbo de data/hora | Contém: a marca temporal associada à criação do commit. |
Se o esquema contiver colunas com os mesmos nomes dessas colunas de metadados, não pode usar o feed de alteração de dados numa tabela. Antes de ativares o feed de alterações de dados, renomeia as colunas da tua tabela para resolver este conflito.
Processar incrementalmente os dados de alterações
O Databricks recomenda que utilize o feed de dados de alterações em combinação com o Structured Streaming para processar alterações incrementais a partir de tabelas. Você deve usar o Streaming Estruturado para Azure Databricks para controlar automaticamente as versões do feed de dados de alteração da tabela. Para processamento CDC com tabelas SCD tipo 1 ou tipo 2, veja As APIs AUTO CDC: Simplificar a captura de dados de alterações com pipelines.
Quando o fluxo é iniciado, o feed de dados de alteração devolve o instantâneo mais recente da tabela sob a forma de registos INSERT e, em seguida, devolve as alterações futuras como dados de alteração. Os fluxos de dados de alterações gravam tanto os dados alterados como as novas linhas de dados no registo de transações da tabela, ao mesmo tempo.
Para configurar um fluxo para ler o feed de dados de alterações de uma tabela, defina a opção readChangeFeed como true, como se segue:
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("myTable")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("myTable")
Limites tarifários
O Azure Databricks suporta limites de taxa (maxFilesPerTrigger, maxBytesPerTrigger) e excludeRegex ao ler dados de alteração. Para uma lista completa das opções de streaming em Delta Lake, veja Delta Lake.
Opcionalmente, pode especificar uma versão inicial, veja Especificar uma versão inicial. Para versões que não sejam o snapshot inicial, os limites de taxa aplicam-se atomicamente a commits inteiros. Ou o lote atual inclui o commit completo, ou o lote atual adia o commit para o lote seguinte.
Histórico da tabela de repetições
Um feed de dados de alterações não se destina a servir como um registo permanente de todas as alterações a uma tabela. Só regista as alterações que ocorrem depois de o feed de dados das alterações ter sido ativado. Podes iniciar uma nova leitura em streaming para capturar a versão atual e todas as alterações subsequentes.
Os registos no feed de dados de alteração são transitórios e apenas acessíveis durante uma janela de retenção especificada. Os registos de transações removem as versões de tabelas e as respetivas versões do feed de dados de alterações a intervalos regulares. Quando uma versão é removida, já não pode ler o feed de dados de alteração dessa versão.
Arquivar dados de alteração para histórico permanente
Se o seu caso de uso exigir que mantenha um histórico permanente de todas as alterações a uma tabela, use lógica incremental para escrever registos do feed de dados das alterações para uma nova tabela.
O exemplo seguinte demonstra a utilização trigger.AvailableNow para processar dados disponíveis como carga de trabalho em lote para auditoria ou repetições completas de alterações:
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(availableNow=True)
.toTable("target_table")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.AvailableNow)
.toTable("target_table")
Especifique uma versão inicial
Para ler alterações a partir de um ponto específico, especifique uma versão inicial usando um carimbo temporal ou número de versão. As versões iniciais são necessárias para leituras em lote. Opcionalmente, podes especificar uma versão final para limitar o alcance. Para saber mais sobre o histórico da tabela, consulte O que é a viagem no tempo?.
Quando configura cargas de trabalho de Structured Streaming que utilizam o feed de dados de alteração, especificar uma versão inicial pode afetar o desempenho do processamento:
- Novos pipelines de processamento de dados normalmente beneficiam do comportamento padrão, que regista todos os registos existentes na tabela como
INSERToperações quando o fluxo começa pela primeira vez. - Se a tabela de destino já contiver todos os registros com alterações apropriadas até um determinado ponto, especifique uma versão inicial para evitar o processamento do estado da tabela de origem como
INSERTeventos.
O exemplo seguinte mostra como recuperar de uma falha de streaming com um checkpoint corrompido. Neste exemplo, suponha as seguintes condições:
- O feed de alteração de dados foi ativado na tabela de origem quando da sua criação.
- A tabela de destino a jusante processou todas as alterações até à versão 75, inclusive.
- O histórico de versões da tabela de origem está disponível para as versões 70 e superiores.
Ao definir o fluxo de escrita para a tabela de destino existente, tem de especificar uma nova localização de ponto de verificação:
Python
(spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
.writeStream
.option("checkpointLocation", "<new-checkpoint-path>")
.toTable("target_table")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
.writeStream
.option("checkpointLocation", "<new-checkpoint-path>")
.toTable("target_table")
Importante
Se especificar uma versão inicial e essa versão não estiver disponível no histórico da tabela, o fluxo falha em começar a partir de um novo ponto de controlo. Como as tabelas geridas limpam automaticamente as versões históricas, todas as versões iniciais especificadas acabam por ser eliminadas.
Ver histórico da tabela de repetição.
Ler alterações em consultas em lote
Pode usar a sintaxe de consultas em lote para ler todas as alterações a partir de uma determinada versão ou para ler alterações dentro de um intervalo especificado de versões, da seguinte forma:
- Especifique versões como inteiros e carimbos temporais como strings no formato
yyyy-MM-dd[ HH:mm:ss[.SSS]]. - As versões inicial e final são inclusivas. Para ler desde uma versão inicial até à mais recente, especifique apenas a versão inicial.
- Se especificar uma versão antes de o feed de alterações de dados estar ativado, gera um erro.
Para usar leituras em lote com opções de versões iniciais e terminais, faça o seguinte:
SQL
Para ler desde a versão 0 até 10, faça o seguinte:
SELECT * FROM table_changes('tableName', 0, 10)
Para ler entre duas versões com marca temporal, faça o seguinte:
--
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
Para ler desde a versão inicial até à mais recente, faça o seguinte:
SELECT * FROM table_changes('tableName', 0)
Para consultar as alterações de uma tabela com caracteres especiais no nome, faça o seguinte:
SELECT * FROM table_changes('`schema`.`dotted.tableName`', '2021-04-21 06:45:46', '2021-05-21 12:00:00')
Consulte table_changes função de valor de tabela.
Python
Para ler desde a versão 0 até 10, faça o seguinte:
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.option("endingVersion", 10) \
.table("myDeltaTable")
Para ler entre dois carimbos temporais, faça o seguinte:
spark.read \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.option("endingTimestamp", '2021-05-21 12:00:00') \
.table("myDeltaTable")
Para ler desde a versão inicial até à mais recente, faça o seguinte:
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
Scala
Para ler desde a versão 0 até 10, faça o seguinte:
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 10)
.table("myDeltaTable")
Para ler entre dois carimbos temporais, faça o seguinte:
spark.read
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.option("endingTimestamp", "2021-05-21 12:00:00")
.table("myDeltaTable")
Para ler desde a versão inicial até à mais recente, faça o seguinte:
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
Lidar com versões fora dos limites
Por predefinição, se especificar uma versão ou marca temporal que exceda o último commit, a consulta devolve o erro timestampGreaterThanLatestCommit.
No Databricks Runtime 11.3 LTS e versões superiores, pode ativar a tolerância a versões fora do intervalo da seguinte forma:
SET spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;
Quando esta configuração está ativada, a consulta retorna resultados diferentes da seguinte forma:
- Uma versão inicial ou carimbo temporal posterior ao último commit devolve um resultado vazio.
- Uma versão final ou marca temporal posterior ao último commit retorna todas as alterações desde o início até ao último commit.
Feed de dados de alterações legadas para Delta Lake
O fluxo de dados de alterações legado requer configuração manual para tabelas individuais do Delta Lake. Como o feed de dados de alteração não está incluído na especificação do Apache Iceberg, as tabelas do Apache Iceberg não são suportadas. A Databricks recomenda que migre para a alimentação automática de dados de alterações. Consulte Migrar do feed legado de dados de alterações.
Quando o feed de dados de alterações antigo está ativado, o runtime regista eventos de alteração para todos os dados gravados na tabela. Isto inclui os dados da linha juntamente com metadados que indicam se a linha especificada foi inserida, eliminada ou atualizada.
O feed legado de dados de alterações utiliza as mesmas readChangeFeed APIs de leitura table_changes() que o feed automático de dados de alterações. Veja Processar dados de alteração de forma incremental e Ler alterações em consultas por lote.
Ativar o feed de dados de alterações antigo
Deve ativar explicitamente o feed de dados de alterações legados em tabelas individuais. Utilize um dos métodos seguintes:
Nova tabela
Defina a propriedade de tabela delta.enableChangeDataFeed = true no comando CREATE TABLE.
CREATE TABLE student (id INT, name STRING, age INT)
TBLPROPERTIES (delta.enableChangeDataFeed = true)
Note
Se desligar o feed de dados de alteração legado durante qualquer intervalo de tempo e depois o ativar novamente, o intervalo não será consultável. Use o fluxo automático de dados de alterações para consultar alterações durante o intervalo. Consulte alimentação de dados de alterações automática.
Tabela existente
Defina a propriedade de tabela delta.enableChangeDataFeed = true no comando ALTER TABLE.
ALTER TABLE myDeltaTable
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
Considerações de armazenamento
As tabelas geridas registam alterações de dados de forma eficiente e podem usar outras funcionalidades para otimizar o layout do armazenamento.
Com o feed de dados de alteração antigo, tem de ter em conta o seguinte comportamento de armazenamento:
- Pode notar um pequeno aumento nos custos de armazenamento porque as alterações podem ser registadas em ficheiros separados.
- Algumas operações, como operações apenas de inserção ou eliminações de partições completas, não geram ficheiros de dados de alterações. O Azure Databricks calcula o feed de dados de alteração diretamente a partir do registo de transações.
- Os ficheiros de dados de alterações seguem a política de retenção da tabela. O comando
VACUUMelimina ficheiros de dados de alterações, e as alterações provenientes do registo de transações utilizam a política de retenção de pontos de verificação.
O Databricks recomenda que não tente reconstruir o feed de dados de alteração consultando diretamente ficheiros de dados de alteração. Utilize sempre as APIs do Delta Lake e do Apache Iceberg.
Limitações
Considere as seguintes limitações para os fluxos de dados de alteração:
Tabelas com mapeamento de colunas
Com o mapeamento de colunas ativado numa tabela Delta Lake, pode eliminar ou renomear colunas sem ter de reescrever ficheiros de dados. ** Consulte Renomear e remover colunas utilizando o mapeamento de colunas no Delta Lake.
No entanto, os fluxos de dados de alterações têm limitações após alterações de esquema não aditivas. As alterações de esquemas não aditivas incluem as seguintes operações:
- Renomear ou eliminar colunas.
- Alterar os tipos de dados da coluna.
- Alterar a capacidade de a coluna aceitar valores NULL, por exemplo, com
ALTER COLUMN ... SET NOT NULL. Veja Definir umaNOT NULLrestrição no Azure Databricks.
Não pode ler feeds de dados de alterações de uma transação ou de um intervalo em que ocorra uma alteração de esquema não aditiva.
Para permitir alterações de esquema não aditivas antes ou depois do intervalo especificado de leituras em lote, as consultas utilizam o esquema da versão final do intervalo em vez da última versão da tabela. As consultas continuam a falhar se o intervalo de versões abranger uma alteração de esquema não aditiva.
Fluxo automático de dados de alterações
- Como o feed de dados de alteração não é suportado na especificação do Apache Iceberg, os clientes externos do Iceberg não podem consultar o feed automático de dados de alteração. Consulte as especificações da tabela Iceberg.
- Para transações com várias instruções, se a tabela de origem tiver sido modificada durante a transação, a alimentação automática de dados de alterações não é suportada.
- O fluxo automático de dados de alteração não é suportado em tabelas com filtros de linha ou máscaras de coluna. Consulte os filtros de linha e as máscaras de coluna.
- As consultas ao feed de dados de alteração não podem abranger versões de tabela onde ocorreu uma alteração de esquema não aditiva, como uma renomeação de coluna, eliminação ou alteração de tipo de dado. Divida a consulta em intervalos antes e depois da alteração do esquema.