Azure Databricks 변경 데이터 피드 사용

CDF(변경 데이터 피드)는 Delta Lake 테이블 또는 Apache Iceberg v3 테이블 버전 간의 행 수준 변경 내용을 추적합니다.

Azure Databricks는 다음 두 가지 방법을 지원합니다.

  • 자동 변경 데이터 피드: 행 계보 메타데이터를 사용하여 테이블 읽기 중에 변경 내용을 계산합니다. 개별 테이블 구성이 필요하지 않으며 Delta Lake 및 Apache Iceberg v3 테이블에서 작동합니다. 자동 변경 데이터 피드를 참조하세요.
  • 레거시 변경 데이터 피드: 테이블 쓰기 중에 변경 내용을 구체화합니다. Delta Lake 테이블만 지원합니다. 개별 테이블 구성이 필요합니다. Delta Lake에 대한 레거시 변경 데이터 피드를 참조하세요.

다음과 같은 일반적인 데이터 사용 사례에 변경 데이터 피드를 사용할 수 있습니다.

  • 마지막 파이프라인 실행 이후 변경된 행만 처리하는 증분 ETL 파이프라인입니다.
  • 규정 준수 및 거버넌스 요구 사항에 대한 데이터 수정 내용을 추적하는 감사 내역입니다.
  • 다운스트림 테이블, 캐시 또는 외부 시스템에 변경 내용을 동기화하는 데이터 복제 워크로드입니다.

자동 변경 데이터 피드

Important

이 기능은 공개 미리보기 단계에 있습니다. 작업 영역 관리자는 미리 보기 페이지에서 이 기능에 대한 액세스를 제어할 수 있습니다. Azure Databricks 미리 보기 관리를 참조하세요.

자동 변경 데이터 피드는 Delta Lake에 대한 행 추적 및 Apache Iceberg v3의 행 계보를 사용하여 쓰기 시간이 아닌 쿼리 시간에 행 수준 변경 내용을 계산합니다. 레거시 변경 데이터 피드와 달리 자동 변경 데이터 피드에는 개별 테이블 구성이 필요하지 않으며 Delta Lake 테이블 및 Apache Iceberg v3 테이블에서 작동합니다.

변경 내용은 모든 쓰기 MERGE INTOUPDATE 작업에 대해 계산되지 않으므로 자동 변경 데이터 피드는 레거시 변경 데이터 피드에 비해 쓰기 성능을 향상시키고 스토리지 비용을 절감합니다.

자동 변경 데이터 피드는 레거시 변경 데이터 피드와 동일한 table_changes() API를 readChangeFeed 사용하며 일괄 처리 쿼리, 구조적 스트리밍 및 Databricks-to-Databricks Delta Lake Sharing에서 작동합니다. 일괄 처리 쿼리의 변경 내용 읽기 및변경 데이터 증분 처리를 참조하세요.

요구 사항

  • Databricks Runtime 18 이상
  • Unity 카탈로그에 등록된 지원되는 테이블 형식:
    • 행 추적이 사용되거나 Iceberg v3 형식인 Delta Lake 형식의 관리되는 테이블입니다.
    • 행 추적을 사용하도록 설정된 Delta Lake 형식의 외부 테이블입니다.

Databricks Unity 카탈로그 테이블 형식을 참조하세요.

참고

변경 데이터 피드는 Apache Iceberg 사양의 일부가 아닙니다. Azure Databricks 판독기는 Apache Iceberg v3 테이블에 대한 자동 변경 데이터 피드를 쿼리할 수 있지만 외부 Iceberg 판독기는 쿼리할 수 없습니다. Iceberg 테이블 사양을 참조하세요.

Delta Lake의 경우 Azure Databricks 판독기만 자동 변경 데이터 피드를 쿼리할 수 있습니다.

변경 데이터 피드 사용

변경 데이터 피드를 사용하려면 요구 사항을 충족하는 테이블을 사용하고 있는지 확인합니다. 요구 사항을 참조하세요.

읽기 변경 데이터 피드를 일괄 처리하려면 다음을 수행합니다.

Python

spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("<table_name>")

Scala

spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("<table_name>")

SQL

SELECT * FROM table_changes('<table_name>', 0)

변경 데이터 피드에 대한 일괄 처리 읽기에 대한 자세한 내용은 일괄 처리 쿼리의 변경 내용 읽기를 참조하세요.

읽기 변경 데이터 피드를 스트리밍하려면 다음을 수행합니다.

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("<table_name>")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("<table_name>")

변경 데이터 피드에 대한 스트리밍 읽기에 대한 자세한 내용은 변경 데이터 증분 처리를 참조하세요.

레거시 변경 데이터 피드에서 마이그레이션

Delta Lake 테이블을 레거시 변경 데이터 피드에서 자동 변경 데이터 피드로 마이그레이션하려면 다음을 수행합니다.

  1. 테이블이 요구 사항을 충족하는지 확인합니다.
  2. 다음 명령을 실행하여 레거시 변경 데이터 피드를 끕니다.
ALTER TABLE <table_name> UNSET TBLPROPERTIES ('delta.enableChangeDataFeed');

레거시 및 자동 변경 데이터 피드를 함께 사용할 수 없습니다.

데이터 피드 스키마 변경

테이블에 대한 변경 데이터 피드에서 읽을 때 쿼리는 최신 테이블 버전의 스키마를 사용합니다. Azure Databricks 대부분의 스키마 변경 및 진화 작업을 지원하지만 열 매핑이 있는 테이블에는 제한이 있습니다. 열 매핑이 있는 테이블을 참조하세요.

Delta Lake 테이블의 스키마에 있는 데이터 열 외에도 변경 데이터 피드에는 변경 이벤트의 유형을 식별하는 메타데이터 열이 포함되어 있습니다.

열 이름 유형 가치들
_change_type 스트링 포함: insert, update_preimage, update_postimagedelete.
preimage 는 업데이트 전의 값이며 업데이트 postimage 후의 값입니다.
_commit_version Long 포함: 변경 내용이 포함된 델타 로그 또는 테이블 버전입니다.
_commit_timestamp 타임스탬프 포함: 커밋을 만들 때 연결된 타임스탬프입니다.

스키마에 이러한 메타데이터 열과 이름이 같은 열이 포함되어 있으면 테이블에서 변경 데이터 피드를 사용할 수 없습니다. 변경 데이터 피드를 켜기 전에 이 충돌을 해결하기 위해 테이블의 열 이름을 바꿉니다.

변경 데이터 증분 처리

Databricks는 구조적 스트리밍과 함께 변경 데이터 피드를 사용하여 테이블의 변경 내용을 증분 처리할 것을 권장합니다. 테이블의 변경 데이터 피드에 대한 버전을 자동으로 추적하려면 Azure Databricks용 구조적 스트리밍을 사용해야 합니다. SCD 유형 1 또는 형식 2 테이블을 사용한 CDC 처리 는 AUTO CDC API: 파이프라인을 사용하여 변경 데이터 캡처 간소화를 참조하세요.

스트림이 처음 시작되면 변경 데이터 피드는 테이블의 최신 스냅샷을 레코드로 INSERT 반환한 다음 이후 변경 내용을 변경 데이터로 반환합니다. 변경 데이터 피드는 변경 데이터와 새 데이터 행을 테이블 트랜잭션 로그에 동시에 커밋합니다.

테이블의 변경 데이터 피드를 읽도록 스트림을 구성하려면 옵션을 readChangeFeedtrue 다음과 같이 설정합니다.

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("myTable")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("myTable")

요율 제한

Azure Databricks 변경 데이터를 읽을 때 속도 제한(maxFilesPerTrigger, maxBytesPerTrigger)excludeRegex을 지원합니다. 스트리밍 Delta Lake 옵션의 전체 목록은 Delta Lake를 참조하세요.

필요에 따라 시작 버전을 지정할 수 있습니다. 시작 버전 지정을 참조하세요. 시작 스냅샷 이외의 버전의 경우 속도 제한은 전체 커밋에 원자성으로 적용됩니다. 현재 일괄 처리에 전체 커밋이 포함되거나 현재 일괄 처리가 다음 일괄 처리로 커밋을 연기합니다.

리플레이 테이블 기록

변경 데이터 피드는 테이블에 대한 모든 변경 내용의 영구 레코드로 사용되지 않습니다. 변경 데이터 피드를 사용하도록 설정한 후에 발생하는 변경 내용만 기록합니다. 새 스트리밍 읽기를 시작하여 현재 버전 및 모든 후속 변경 내용을 캡처할 수 있습니다.

변경 데이터 피드의 레코드는 일시적이며 지정된 보존 기간에만 액세스할 수 있습니다. 트랜잭션 로그는 정기적으로 테이블 버전과 해당 변경 데이터 피드 버전을 제거합니다. 버전이 제거되면 해당 버전에 대한 변경 데이터 피드를 더 이상 읽을 수 없습니다.

영구 기록에 대한 변경 데이터 보관

사용 사례에서 테이블에 대한 모든 변경 내용의 영구 기록을 유지해야 하는 경우 증분 논리를 사용하여 변경 데이터 피드에서 새 테이블로 레코드를 작성합니다.

다음 예제에서는 사용 가능한 데이터를 감사 또는 전체 변경 재생을 위한 일괄 처리 워크로드로 처리하는 방법을 trigger.AvailableNow 보여 줍니다.

Python
(spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("target_table")
)
Scala
spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("target_table")

시작 버전 지정

특정 지점에서 변경 내용을 읽으려면 타임스탬프 또는 버전 번호를 사용하여 시작 버전을 지정합니다. 일괄 처리 읽기에는 시작 버전이 필요합니다. 필요에 따라 범위를 제한하는 끝 버전을 지정할 수 있습니다. 테이블 기록에 대한 자세한 내용은 시간 여행이란?을 참조하세요.

변경 데이터 피드를 사용하는 구조적 스트리밍 워크로드를 구성하는 경우 시작 버전을 지정하면 처리 성능에 영향을 줄 수 있습니다.

  • 새 데이터 처리 파이프라인은 일반적으로 스트림이 처음 시작될 때 테이블의 모든 기존 레코드를 작업으로 INSERT 기록하는 기본 동작의 이점을 활용합니다.
  • 대상 테이블에 특정 시점까지 적절한 변경 내용이 있는 모든 레코드가 이미 포함되어 있는 경우 원본 테이블 상태를 이벤트로 INSERT 처리하지 않도록 시작 버전을 지정합니다.

다음 예제에서는 손상된 검사점을 사용하여 스트리밍 실패에서 복구하는 방법을 보여 있습니다. 이 예제에서는 다음 조건을 가정합니다.

  1. 변경 데이터 피드는 테이블을 만들 때 원본 테이블에서 사용하도록 설정되었습니다.
  2. 대상 다운스트림 테이블은 버전 75까지 모든 변경 내용을 처리했습니다.
  3. 원본 테이블의 버전 기록은 버전 70 이상에서 사용할 수 있습니다.

기존 대상 테이블에 대한 쓰기 스트림을 정의할 때 새 검사점 위치를 지정해야 합니다.

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")
  .writeStream
  .option("checkpointLocation", "<new-checkpoint-path>")
  .toTable("target_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")
  .writeStream
  .option("checkpointLocation", "<new-checkpoint-path>")
  .toTable("target_table")

Important

시작 버전을 지정하고 테이블 기록에서 해당 버전을 사용할 수 없는 경우 스트림이 새 검사점에서 시작되지 않습니다. 관리되는 테이블은 기록 버전을 자동으로 정리하므로 지정된 모든 시작 버전이 결국 삭제됩니다.

재생 테이블 기록을 참조하세요.

일괄 처리 쿼리의 변경 내용 읽기

일괄 처리 쿼리 구문을 사용하여 특정 버전에서 시작하는 모든 변경 내용을 읽거나 지정된 버전 범위 내의 변경 내용을 다음과 같이 읽을 수 있습니다.

  • 버전을 정수로 지정하고 타임스탬프를 문자열 yyyy-MM-dd[ HH:mm:ss[.SSS]]형식으로 지정합니다.
  • 시작 및 종료 버전은 포함됩니다. 시작 버전에서 최신 버전으로 읽으려면 시작 버전만 지정합니다.
  • 변경 데이터 피드를 사용하도록 설정하기 전에 버전을 지정하면 오류가 발생합니다.

버전 시작 및 종료 옵션과 함께 일괄 처리 읽기를 사용하려면 다음을 수행합니다.

SQL

버전 010에서 읽어 보려면 다음을 수행합니다.

SELECT * FROM table_changes('tableName', 0, 10)

두 타임스탬프 버전 간에 읽으려면 다음을 수행합니다.

--
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

시작 버전에서 최신 버전으로 읽으려면 다음을 수행합니다.

SELECT * FROM table_changes('tableName', 0)

이름에 특수 문자가 있는 테이블의 변경 내용을 읽으려면 다음을 수행합니다.

SELECT * FROM table_changes('`schema`.`dotted.tableName`', '2021-04-21 06:45:46', '2021-05-21 12:00:00')

테이블 값 함수 table_changes을 참조하세요.

Python

버전 010에서 읽어 보려면 다음을 수행합니다.

spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

두 타임스탬프 사이를 읽으려면 다음을 수행합니다.

spark.read \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

시작 버전에서 최신 버전으로 읽으려면 다음을 수행합니다.

spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

Scala

버전 010에서 읽어 보려면 다음을 수행합니다.

spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

두 타임스탬프 사이를 읽으려면 다음을 수행합니다.

spark.read
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

시작 버전에서 최신 버전으로 읽으려면 다음을 수행합니다.

spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

범위를 벗어난 버전 처리

기본적으로 마지막 커밋을 초과하는 버전 또는 타임스탬프를 지정하면 쿼리에서 오류를 timestampGreaterThanLatestCommit반환합니다.

Databricks Runtime 11.3 LTS 이상에서는 다음과 같이 범위를 벗어난 버전에 대해 허용 오차를 사용하도록 설정할 수 있습니다.

SET spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

이 구성을 사용하도록 설정하면 쿼리는 다음과 같이 다른 결과를 반환합니다.

  • 마지막 커밋 이후의 시작 버전 또는 타임스탬프는 빈 결과를 반환합니다.
  • 마지막 커밋 이후의 종료 버전 또는 타임스탬프는 시작부터 마지막 커밋까지의 모든 변경 내용을 반환합니다.

Delta Lake에 대한 레거시 변경 데이터 피드

레거시 변경 데이터 피드에는 개별 Delta Lake 테이블에 대한 수동 구성이 필요합니다. 변경 데이터 피드는 Apache Iceberg 사양에 포함되지 않으므로 Apache Iceberg 테이블은 지원되지 않습니다. Databricks는 자동 변경 데이터 피드로 마이그레이션하는 것이 좋습니다. 레거시 변경 데이터 피드에서 마이그레이션을 참조하세요.

레거시 변경 데이터 피드가 켜져 있으면 런타임 레코드는 테이블에 기록된 모든 데이터에 대한 변경 이벤트를 기록합니다. 여기에는 지정된 행이 삽입, 삭제 또는 업데이트되었는지 여부를 나타내는 메타데이터와 함께 행 데이터가 포함됩니다.

레거시 변경 데이터 피드는 자동 변경 데이터 피드와 table_changes() 동일한 readChangeFeed 읽기 API를 사용합니다. 일괄 처리 쿼리 에서 변경 데이터 증분 처리변경 내용 읽기를 참조하세요.

레거시 변경 데이터 피드 켜기

개별 테이블에서 레거시 변경 데이터 피드를 명시적으로 설정해야 합니다. 다음 방법 중 하나를 사용하십시오.

새 테이블

delta.enableChangeDataFeed = true 테이블 속성을 CREATE TABLE 명령에서 설정합니다.

CREATE TABLE student (id INT, name STRING, age INT)
  TBLPROPERTIES (delta.enableChangeDataFeed = true)

참고

일정 시간 간격 동안 레거시 변경 데이터 피드를 끈 다음 다시 켜면 간격을 쿼리할 수 없습니다. 자동 변경 데이터 피드를 사용하여 간격 동안 변경 내용을 쿼리합니다. 자동 변경 데이터 피드를 참조하세요.

기존 테이블

delta.enableChangeDataFeed = true 테이블 속성을 ALTER TABLE 명령에서 설정합니다.

ALTER TABLE myDeltaTable
  SET TBLPROPERTIES (delta.enableChangeDataFeed = true)

스토리지 고려 사항

관리되는 테이블은 데이터 변경 내용을 효율적으로 기록하며 다른 기능을 사용하여 스토리지 레이아웃을 최적화할 수 있습니다.

레거시 변경 데이터 피드를 사용하면 다음 스토리지 동작을 고려해야 합니다.

  • 변경 내용이 별도의 파일에 기록될 수 있으므로 스토리지 비용이 약간 증가할 수 있습니다.
  • 삽입 전용 또는 전체 파티션 삭제와 같은 일부 작업은 변경 데이터 파일을 생성하지 않습니다. Azure Databricks 트랜잭션 로그에서 직접 변경 데이터 피드를 계산합니다.
  • 변경 데이터 파일은 테이블의 보존 정책을 사용합니다. 이 VACUUM 명령은 변경 데이터 파일을 삭제하고 트랜잭션 로그의 변경 내용은 검사점 보존 정책을 사용합니다.

Databricks는 변경 데이터 파일을 직접 쿼리하여 변경 데이터 피드를 다시 구성하지 않는 것이 좋습니다. 항상 Delta Lake 및 Apache Iceberg API를 사용합니다.

Limitations

변경 데이터 피드에 대해 다음과 같은 제한 사항을 고려합니다.

열 매핑이 있는 테이블

Delta Lake 테이블에서 열 매핑을 사용하도록 설정하면 데이터 파일을 다시 작성하지 않고도 열을 삭제하거나 이름을 바꿀 수 있습니다. Delta Lake 열 매핑을 사용하여 열 이름 바꾸기 및 삭제를 참조하세요.

그러나 비가산적 스키마 변경 후 변경 데이터 피드에는 제한 사항이 있습니다. 비가산적 스키마 변경에는 다음 작업이 포함됩니다.

비가산적 스키마 변경이 발생하는 트랜잭션 또는 범위에 대한 변경 데이터 피드는 읽을 수 없습니다.

지정된 일괄 처리 읽기 범위 전후에 비가산적 스키마 변경을 허용하기 위해 쿼리는 최신 테이블 버전이 아닌 범위의 최종 버전의 스키마를 사용합니다. 버전 범위가 추가되지 않는 스키마 변경에 걸쳐 있는 경우에도 쿼리가 실패합니다.

자동 변경 데이터 피드

  • 변경 데이터 피드는 Apache Iceberg 사양에서 지원되지 않으므로 외부 Iceberg 클라이언트는 자동 변경 데이터 피드를 쿼리할 수 없습니다. Iceberg 테이블 사양을 참조하세요.
  • 다중 문 트랜잭션의 경우 트랜잭션 중에 원본 테이블이 수정된 경우 자동 변경 데이터 피드가 지원되지 않습니다.
  • 행 필터 또는 열 마스크가 있는 테이블에서는 자동 변경 데이터 피드가 지원되지 않습니다. 행 필터 및 열 마스크를 참조하세요.
  • 변경 데이터 피드 쿼리는 열 이름 바꾸기, 삭제 또는 데이터 형식 변경과 같이 추가되지 않는 스키마 변경이 발생한 테이블 버전을 확장할 수 없습니다. 스키마 변경 전후의 범위로 쿼리를 분할합니다.