Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
O CDF (feed de alterações de dados) acompanha as alterações no nível de linha entre versões de uma tabela Delta Lake ou de uma tabela Apache Iceberg v3.
O Azure Databricks dá suporte a duas abordagens:
- Fluxo de dados de alterações automático: calcula as alterações durante a leitura da tabela usando metadados de linhagem de linhas. Isso não requer configuração de tabela individual e funciona em tabelas Delta Lake e Apache Iceberg v3. Consulte o feed automático de alterações de dados.
- Feed de dados de alterações legado: materializa alterações durante operações de gravação na tabela. Suporta apenas tabelas Delta Lake. Requer configuração de tabela individual. Consulte o feed de dados de alteração herdado do Delta Lake.
Você pode usar o feed de alterações de dados para casos de uso comuns de dados, incluindo:
- Pipelines de ETL incrementais que processam apenas as linhas que foram alteradas desde a última execução do pipeline.
- Trilhas de auditoria que rastreiam modificações nos dados para atender a requisitos de conformidade e governança.
- Cargas de trabalho de replicação de dados que sincronizam alterações em tabelas downstream, caches ou sistemas externos.
Fluxo automático de dados de alteração
Importante
Esse recurso está em Visualização Pública. Os administradores do workspace podem controlar o acesso a esse recurso na página Visualizações . Consulte Gerenciar visualizações do Azure Databricks.
O feed automático de dados de alterações calcula as alterações em nível de linha no momento da consulta, em vez de no momento da gravação, usando o rastreamento de linhas para Delta Lake e a linhagem de linhas para Apache Iceberg v3. Ao contrário do feed legado de dados de alteração, o feed automático de dados de alteração não exige configuração individual por tabela e funciona com tabelas do Delta Lake e tabelas Apache Iceberg v3.
Como as alterações não são computadas em cada gravação para as operações MERGE INTO e UPDATE, o feed automático de dados de alterações melhora o desempenho das gravações e reduz os custos de armazenamento, em comparação com o feed de dados de alterações legado.
O feed automático de dados de alterações usa as mesmas APIs table_changes() e readChangeFeed que o feed legado de dados de alterações e funciona com consultas em lote, streaming estruturado e Delta Lake Sharing de Databricks para Databricks. Consulte Ler alterações em consultas em lote e Processar dados de alterações incrementalmente.
Requisitos
- Databricks Runtime 18 ou superior
- Um formato de tabela com suporte registrado no Catálogo do Unity:
- Uma tabela gerenciada no formato Delta Lake com controle de linha habilitado ou no formato Iceberg v3.
- Uma tabela externa no formato Delta Lake com o rastreamento de linhas habilitado.
Consulte os tipos de tabela do Unity Catalog do Databricks.
Note
O feed de dados de alteração não faz parte da especificação do Apache Iceberg. Azure Databricks leitores podem consultar o feed de dados de alteração automática para tabelas do Apache Iceberg v3, mas os leitores externos do Iceberg não podem. Veja a especificação da tabela Iceberg.
Para o Delta Lake, somente leitores de Azure Databricks podem consultar o feed de dados de alteração automática.
Usar o feed de dados de alteração
Para usar o feed de dados de alterações, verifique se você está usando uma tabela que atenda aos requisitos. Confira os Requisitos
Para ler em lote o feed de dados de alteração, faça o seguinte:
Python
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("<table_name>")
Scala
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("<table_name>")
SQL
SELECT * FROM table_changes('<table_name>', 0)
Para obter mais informações sobre leituras em lote para o feed de dados de alterações, consulte Ler alterações em consultas em lotes.
Para ler o feed de dados de alteração em streaming, faça o seguinte:
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("<table_name>")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("<table_name>")
Para obter mais informações sobre leituras em streaming para o feed de dados de alteração, consulte Processar dados de alteração incrementalmente.
Migrar do feed de dados de alterações legado
Para migrar uma tabela Delta Lake do feed de dados de alteração herdado para o feed de dados de alteração automática, faça o seguinte:
- Verifique se a tabela atende aos requisitos.
- Desative o change data feed antigo com o comando a seguir:
ALTER TABLE <table_name> UNSET TBLPROPERTIES ('delta.enableChangeDataFeed');
Você não pode usar feeds de dados de alteração legados e automáticos ao mesmo tempo.
Alterar esquema de feed de dados
Quando você lê a partir do feed de dados de alterações de uma tabela, a consulta usa o esquema da versão mais recente da tabela. Azure Databricks dá suporte à maioria das operações de evolução e alteração de esquema, mas tabelas com mapeamento de coluna têm limitações. Consulte Tabelas com mapeamento de coluna.
Além das colunas de dados do esquema da tabela Delta Lake, o feed de dados de alteração contém colunas de metadados que identificam o tipo de evento de alteração:
| Nome da coluna | Tipo | Values |
|---|---|---|
_change_type |
String | Contém: insert, update_preimage, , update_postimagedelete.preimage é o valor antes da atualização, postimage é o valor após a atualização. |
_commit_version |
Long | Contém: o log do Delta ou a versão da tabela que contém essa alteração. |
_commit_timestamp |
Carimbo de data/hora | Contém: o registro de data e hora associado ao momento em que o commit foi criado. |
Se o esquema contiver colunas com os mesmos nomes dessas colunas de metadados, você não poderá usar o feed de dados de alteração em uma tabela. Antes de ativar o feed de dados de alterações, renomeie colunas em sua tabela para resolver esse conflito.
Processar dados de alteração incrementalmente
O Databricks recomenda que você use o feed de dados de alteração em combinação com o Streaming Estruturado para processar incrementalmente as alterações das tabelas. Você deve usar o Streaming Estruturado para o Azure Databricks para controlar automaticamente as versões do feed de dados de alterações da tabela. Para processamento CDC com tabelas SCD tipo 1 ou tipo 2, consulte as APIs AUTO CDC: Simplifique a captura de dados de alteração com pipelines.
Quando o fluxo é iniciado pela primeira vez, o feed de dados de alterações retorna o snapshot mais recente da tabela na forma de registros INSERT e, em seguida, retorna mudanças futuras como dados de alteração. Os feeds de alteração de dados gravam as alterações de dados e as novas linhas no log de transações da tabela ao mesmo tempo.
Para configurar um fluxo para ler o feed de alterações de dados de uma tabela, defina a opção readChangeFeed como true, como segue:
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("myTable")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("myTable")
Limites de taxa
Azure Databricks dá suporte a limites de taxa (maxFilesPerTrigger, maxBytesPerTrigger) e excludeRegex ao ler dados de alteração. Para obter uma lista completa das opções de streaming do Delta Lake, consulte Delta Lake.
Opcionalmente, você pode especificar uma versão inicial, consulte Especificar uma versão inicial. Para versões diferentes do instantâneo inicial, os limites de taxa se aplicam atomicamente a confirmações inteiras. O lote atual inclui a confirmação inteira ou o lote atual adia a confirmação para o próximo lote.
Histórico de reprodução da tabela
Um feed de dados de alteração não se destina a servir como um registro permanente de todas as alterações em uma tabela. Ele registra apenas as alterações que ocorrem depois que o feed de dados de alteração foi habilitado. Você pode iniciar uma nova leitura de streaming para capturar a versão atual e todas as alterações subsequentes.
Os registros no feed de dados de alteração são transitórios e só podem ser acessados para uma janela de retenção especificada. Os logs de transações removem as versões da tabela e suas versões correspondentes do feed de dados de alterações em intervalos regulares. Quando uma versão é removida, você não pode mais ler o feed de dados de alteração para essa versão.
Arquivar dados de alteração para histórico permanente
Se o caso de uso exigir que você mantenha um histórico permanente de todas as alterações em uma tabela, use a lógica incremental para gravar registros do feed de dados de alteração em uma nova tabela.
O exemplo a seguir demonstra o uso de trigger.AvailableNow para processar os dados disponíveis como uma carga de trabalho em lote para auditoria ou reprocessamentos completos de alterações:
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(availableNow=True)
.toTable("target_table")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.AvailableNow)
.toTable("target_table")
Especificar uma versão inicial
Para ler as alterações de um ponto específico, especifique uma versão inicial usando um carimbo de data/hora ou um número de versão. As versões iniciais são necessárias para leituras em lote. Opcionalmente, você pode especificar uma versão final para limitar o intervalo. Para saber mais sobre o histórico de tabelas, confira o que é viagem no tempo?.
Quando você configura cargas de trabalho de Streaming Estruturado que usam o feed de dados de alteração, especificar uma versão inicial pode afetar o desempenho do processamento:
- Os novos pipelines de processamento de dados normalmente se beneficiam do comportamento padrão, que registra todos os registros existentes na tabela como
INSERToperações quando o fluxo é iniciado pela primeira vez. - Se sua tabela de destino já contém todos os registros com alterações apropriadas até um determinado ponto, especifique uma versão inicial para evitar o processamento do estado da tabela de origem como eventos
INSERT.
O exemplo a seguir mostra como se recuperar de uma falha de streaming com um ponto de verificação corrompido. Nesse exemplo, assuma as seguintes condições:
- O feed de dados de alterações foi habilitado na tabela de origem na criação da tabela.
- A tabela downstream de destino processou todas as alterações até e incluindo a versão 75.
- O histórico de versões da tabela de origem está disponível para versões 70 e superiores.
Ao definir o fluxo de gravação para a tabela de destino existente, você deve especificar um novo local de ponto de verificação:
Python
(spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
.writeStream
.option("checkpointLocation", "<new-checkpoint-path>")
.toTable("target_table")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
.writeStream
.option("checkpointLocation", "<new-checkpoint-path>")
.toTable("target_table")
Importante
Se você especificar uma versão inicial e essa versão não estiver disponível no histórico da tabela, o fluxo não será iniciado a partir de um novo ponto de verificação. Como as tabelas gerenciadas limpam automaticamente versões históricas, todas as versões iniciais especificadas são eventualmente excluídas.
Consulte o histórico da tabela Replay.
Ler alterações em consultas em lote
Você pode usar a sintaxe de consulta em lote para ler todas as alterações a partir de uma versão específica ou ler alterações em um intervalo especificado de versões da seguinte maneira:
- Especifique versões como inteiros e carimbos de data/hora como cadeias de caracteres no formato
yyyy-MM-dd[ HH:mm:ss[.SSS]]. - As versões inicial e final são inclusivas. Para ler de uma versão inicial para a versão mais recente, especifique apenas a versão inicial.
- Se você especificar uma versão antes de o feed de dados de alteração ser habilitado, ele gerará um erro.
Para usar leituras em lote com opções de versão inicial e final, faça o seguinte:
SQL
Para ler de uma versão 0 para 10outra, faça o seguinte:
SELECT * FROM table_changes('tableName', 0, 10)
Para ler entre duas versões de carimbo de data e hora, faça o seguinte:
--
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
Para ler de uma versão inicial para a mais recente, faça o seguinte:
SELECT * FROM table_changes('tableName', 0)
Para ler as alterações de uma tabela com caracteres especiais no nome, faça o seguinte:
SELECT * FROM table_changes('`schema`.`dotted.tableName`', '2021-04-21 06:45:46', '2021-05-21 12:00:00')
Consulte TVF table_changes.
Python
Para ler de uma versão 0 para 10outra, faça o seguinte:
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.option("endingVersion", 10) \
.table("myDeltaTable")
Para ler entre duas marcas de tempo, faça o seguinte:
spark.read \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.option("endingTimestamp", '2021-05-21 12:00:00') \
.table("myDeltaTable")
Para ler de uma versão inicial para a mais recente, faça o seguinte:
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
Scala
Para ler de uma versão 0 para 10outra, faça o seguinte:
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 10)
.table("myDeltaTable")
Para ler entre duas marcas de tempo, faça o seguinte:
spark.read
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.option("endingTimestamp", "2021-05-21 12:00:00")
.table("myDeltaTable")
Para ler de uma versão inicial para a mais recente, faça o seguinte:
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
Manipular versões fora do alcance
Por padrão, se você especificar uma versão ou timestamp que seja posterior ao último commit, a consulta retornará o erro timestampGreaterThanLatestCommit.
No Databricks Runtime 11.3 LTS e posteriores, você pode habilitar a tolerância para versões fora do intervalo da seguinte maneira:
SET spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;
Quando essa configuração está habilitada, a consulta retorna resultados diferentes da seguinte maneira:
- Uma versão inicial ou timestamp posterior ao último commit retorna um resultado vazio.
- Uma versão de término ou carimbo de data/hora posterior ao último commit retorna todas as alterações desde o início até o último commit.
Feed de dados de alteração herdado para Delta Lake
O fluxo de dados de alterações legado requer configuração manual para tabelas individuais do Delta Lake. Como o feed de dados de alterações não está incluído na especificação do Apache Iceberg, não há suporte para tabelas do Apache Iceberg. O Databricks recomenda que você migre para o feed de dados de alteração automática. Consulte Migrar do feed de dados de alteração herdado.
Quando o feed de dados de alteração herdado é ativado, o runtime registra eventos de alteração para todos os dados gravados na tabela. Isso inclui os dados de linha junto com metadados que indicam se a linha especificada foi inserida, excluída ou atualizada.
O feed legado de dados de alterações usa as mesmas APIs de leitura readChangeFeed e table_changes() que o feed automático de dados de alterações. Veja Processar incrementalmente os dados de alteração e Ler alterações em consultas em lote.
Ativar o feed de dados de alteração herdado
Você deve ativar explicitamente o feed de dados de alteração herdado em tabelas individuais. Use um dos métodos a seguir:
Nova tabela
Defina a propriedade delta.enableChangeDataFeed = true da tabela no CREATE TABLE comando.
CREATE TABLE student (id INT, name STRING, age INT)
TBLPROPERTIES (delta.enableChangeDataFeed = true)
Note
Se você desativar o feed de dados de alteração legado por algum período e, em seguida, ativá-lo novamente, esse período não poderá ser consultado. Use o feed de dados de alteração automática para consultar alterações durante o intervalo. Consulte o feed automático de alterações de dados.
Tabela existente
Defina a propriedade delta.enableChangeDataFeed = true da tabela no ALTER TABLE comando.
ALTER TABLE myDeltaTable
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
Considerações de armazenamento
As tabelas gerenciadas registram as alterações de dados com eficiência e podem usar outros recursos para otimizar o layout de armazenamento.
Ao usar o feed de dados de alteração legado, você deve considerar o seguinte comportamento de armazenamento:
- Você pode ver um pequeno aumento nos custos de armazenamento porque as alterações podem ser registradas em arquivos separados.
- Algumas operações, como operações somente de inserção ou exclusões de partição inteira, não geram arquivos de dados de alterações. Azure Databricks calcula o feed de dados de alteração diretamente do log de transações.
- Os arquivos de dados de alterações usam a política de retenção da tabela. O
VACUUMcomando exclui arquivos de dados de alteração e as alterações do log de transações usam a política de retenção de ponto de verificação.
O Databricks recomenda que você não tente reconstruir o feed de dados de alteração consultando diretamente arquivos de dados de alteração. Sempre use APIs Delta Lake e Apache Iceberg.
Limitações
Considere as seguintes limitações para feeds de dados de alterações:
Tabelas com mapeamento de colunas
Com o mapeamento de coluna habilitado em uma tabela Delta Lake, você pode remover ou renomear colunas sem reescrever arquivos de dados. Confira Renomear e remover colunas usando o mapeamento de colunas do Delta Lake.
No entanto, os feeds de dados de alteração têm limitações após alterações de esquema não aditivas. As alterações de esquema não aditivas incluem as seguintes operações:
- Renomear ou soltar colunas.
- Altere os tipos de dados de coluna.
- Alterar a nulidade da coluna, como com
ALTER COLUMN ... SET NOT NULL. Consulte Definir umaNOT NULLrestrição no Azure Databricks.
Não é possível ler feeds de alteração de dados para uma transação ou intervalo em que ocorra uma alteração de esquema não aditiva.
Para permitir alterações de esquema não aditivas antes ou depois do intervalo especificado de leituras em lote, as consultas usam o esquema da versão final do intervalo em vez da versão mais recente da tabela. As consultas ainda falharão se o intervalo de versão abranger uma alteração de esquema não aditivo.
Fluxo automático de dados de alterações
- Como o feed de dados de alterações não tem suporte na especificação do Apache Iceberg, os clientes externos do Iceberg não podem consultar o feed de dados de alteração automática. Veja a especificação da tabela Iceberg.
- Para transações com várias instruções, se a tabela de origem foi modificada durante a transação, não há suporte ao feed automático de dados de alteração.
- Não há suporte para o feed automático de dados de alterações em tabelas com filtros de linha ou máscaras de coluna. Veja Filtros de linha e máscaras de coluna.
- As consultas ao feed de dados de alterações não podem abranger versões da tabela nas quais ocorreu uma alteração de esquema não aditiva, como uma renomeação de coluna, remoção de coluna ou alteração de tipo de dados. Divida a consulta em intervalos antes e depois da alteração do esquema.