DataFrame 작업 또는 SQL 테이블 값 함수를 사용하여 구조적 스트리밍 상태 데이터 및 메타데이터를 쿼리할 수 있습니다. 이러한 함수를 사용하여 구조적 스트리밍 상태 저장 쿼리에 대한 상태 정보를 관찰합니다. 이는 모니터링 및 디버깅에 유용할 수 있습니다.
상태 데이터 또는 메타데이터를 쿼리하려면 스트리밍 쿼리의 검사점 경로에 대한 읽기 권한이 있어야 합니다. 이 문서에 설명된 함수는 상태 데이터 및 메타데이터에 대한 읽기 전용 액세스를 제공합니다. 배치 읽기 의미 체계만 사용하여 상태 정보를 쿼리할 수 있습니다.
참고
Lakeflow Spark 선언적 파이프라인, 스트리밍 테이블 또는 구체화된 뷰에 대한 상태 정보를 쿼리할 수 없습니다. 표준 액세스 모드로 구성된 서버리스 컴퓨팅 또는 컴퓨팅을 사용하여 상태 정보를 쿼리할 수 없습니다.
요구 사항
- 다음 컴퓨팅 구성 중 하나를 사용합니다.
- 표준 액세스 모드로 구성된 컴퓨팅에서 Databricks Runtime 16.3 이상 사용.
- Databricks Runtime 14.3 LTS 이상이 전용 또는 비격리 액세스 모드로 구성된 컴퓨팅에서.
- 스트리밍 쿼리에서 사용하는 검사점 경로에 대한 읽기 권한입니다.
구조적 스트리밍 상태 저장소 읽기
지원되는 모든 Databricks Runtime에서 실행되는 구조적 스트리밍 쿼리에 대한 상태 저장소 정보를 읽을 수 있습니다. 다음 구문을 사용합니다.
파이썬
df = (spark.read
.format("statestore")
.load("/checkpoint/path"))
Scala
val df = spark.read
.format("statestore")
.load("/checkpoint/path")
SQL
SELECT * FROM read_statestore('/checkpoint/path')
상태 판독기 API 옵션 및 스키마
서식 옵션의 statestore 전체 목록은 상태 저장소를 참조하세요.
출력 데이터에는 다음 스키마가 있습니다.
| 열 | 타입 | 설명 |
|---|---|---|
key |
구조체(상태 키에서 파생된 추가 형식) | 상태 검사점에서 상태 저장 연산자 레코드의 키입니다. |
value |
구조체(상태 값에서 파생된 추가 형식) | 상태 검사점에서 상태 저장 연산자 레코드의 값입니다. |
partition_id |
정수 | 상태 저장 연산자 레코드를 포함하는 상태 검사점의 파티션입니다. |
Databricks Runtime 16.4 LTS 이상에서는 readChangeFeed 옵션이 true로 설정된 경우, 출력 데이터의 스키마는 다음과 같습니다.
| 열 | 타입 | 설명 |
|---|---|---|
batch_id |
긴 | 상태 변경이 속하는 일괄 처리 ID입니다. |
change_type |
문자열 | 배치에 의해 적용되는 변경 유형: update는 삽입 및 업데이트, delete는 삭제입니다. |
key |
구조체(상태 키에서 파생된 추가 형식) | 상태 검사점에서 상태 저장 연산자 레코드의 키입니다. |
value |
구조체(상태 값에서 파생된 추가 형식) | 상태 검사점에서 상태 저장 연산자 레코드의 값입니다.
change_type이(가) delete인 레코드에 대한 null. |
partition_id |
정수 | 상태 저장 연산자 레코드를 포함하는 상태 검사점의 파티션입니다. |
테이블 값 함수 read_statestore을 참조하세요.
구조적 스트리밍 상태 변경 내용 읽기
Databricks Runtime 16.4 LTS 이상에서 사용할 수 있습니다. 단일 마이크로배치의 전체 상태를 보는 대신 마이크로배치 전반에서 상태가 어떻게 변하는지 확인하려면 readChangeFeed를 true로 설정하고 changeStartBatchId를 지정합니다. 필요에 따라 changeEndBatchId를 지정합니다. 전체 옵션 목록은 상태 저장소를 참조하세요.
예를 들어, 배치 2부터 최신 커밋된 배치까지의 상태 변경을 읽으려면:
파이썬
df = (spark.read
.format("statestore")
.option("readChangeFeed", True)
.option("changeStartBatchId", 2)
.load("<checkpointLocation>")
)
Scala
val df = spark.read
.format("statestore")
.option("readChangeFeed", true)
.option("changeStartBatchId", 2)
.load("<checkpointLocation>")
SQL
SELECT * FROM read_statestore(
'<checkpointLocation>',
readChangeFeed => true,
changeStartBatchId => 2
);
출력 스키마에는 추가 batch_id 및 change_type 열이 포함됩니다. 전체 스키마는 상태 판독기 API 옵션 및 스키마를 참조하세요.
구조적 스트리밍 상태 메타데이터 읽기
Databricks Runtime 14.3 LTS 이상에서 사용할 수 있습니다. 구조적 스트리밍 쿼리에 대한 상태 메타데이터 정보를 읽을 수 있습니다.
파이썬
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
Scala
val df = spark.read
.format("state-metadata")
.load("<checkpointLocation>")
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
반환된 데이터는 다음 스키마를 보유합니다.
| 열 | 타입 | 설명 |
|---|---|---|
operatorId |
정수 | 스트리밍 상태 저장 연산자의 ID는 정수입니다. |
operatorName |
문자열 | 상태 저장 스트리밍 연산자의 이름입니다. |
stateStoreName |
문자열 | 연산자 상태 저장소의 이름입니다. |
numPartitions |
정수 | 상태 저장소의 파티션 수입니다. |
minBatchId |
긴 | 쿼리 상태에 사용할 수 있는 최소 배치 ID입니다. |
maxBatchId |
긴 | 쿼리 상태에 사용할 수 있는 최대 배치 ID입니다. |
참고
검사점이 작성될 당시의 상태를 minBatchId 및 maxBatchId가 제공하는 배치 ID 값들이 반영합니다. 이전 배치는 마이크로 일괄 처리 실행으로 자동으로 정리되므로 여기에 제공된 값은 계속 사용할 수 있다고 보장되지 않습니다.
테이블 값 함수 read_state_metadata을 참조하세요.
예: 스트림-스트림 조인의 한쪽 쿼리
다음 구문을 사용하여 스트림 스트림 조인의 왼쪽을 쿼리합니다.
파이썬
left_df = (spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path"))
Scala
val leftDf = spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path")
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
joinSide => 'left'
);
예: 여러 상태 저장 연산자가 있는 스트림에 대한 쿼리 상태 저장소
이 예제에서는 상태 메타데이터 판독기를 사용하여 여러 상태 저장 연산자를 사용하여 스트리밍 쿼리의 메타데이터 세부 정보를 수집한 다음, 상태 판독기의 옵션으로 메타데이터 결과를 사용합니다.
상태 메타데이터 판독기는 다음 구문 예제와 같이 검사점 경로를 유일한 옵션으로 사용합니다.
파이썬
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
Scala
val df = spark.read
.format("state-metadata")
.load("<checkpointLocation>")
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
다음 표는 상태 저장소 메타데이터의 예제 출력을 나타냅니다.
| operatorId | 운영자이름 | 상태저장소이름 | numPartitions | minBatchId | maxBatchId |
|---|---|---|---|---|---|
| 0 | 상태 저장소 저장 | 기본값 | 200 | 0 | 13 |
| 1 | 워터마크 내 중복 제거 | 기본값 | 200 | 0 | 13 |
연산자에 dedupeWithinWatermark 대한 결과를 얻으려면 다음 예제와 같이 옵션을 사용하여 operatorId 상태 판독기를 쿼리합니다.
파이썬
left_df = (spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path"))
Scala
val leftDf = spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path")
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
operatorId => 1
);