기본 제공 커넥터를 사용하여 Google Pub/Sub를 구독합니다. 이 커넥터는 subscriber에서 가져오는 행에 대해 정확히 한 번 처리 시맨틱을 제공합니다.
참고
Pub/Sub는 중복된 행을 전송할 수 있으며, 행이 구독자에게 순서가 뒤바뀐 상태로 도착할 수도 있습니다. 중복 행과 순서가 다른 행을 처리하는 코드를 작성해야 합니다.
Pub/Sub 스트림 구성
다음 코드 예제에서는 Pub/Sub에서 읽은 구조적 스트리밍을 구성하고 프라이빗 키로 인증하는 방법을 보여줍니다.
Python
auth_options = {
"clientId": client_id,
"clientEmail": client_email,
"privateKey": private_key,
"privateKeyId": private_key_id
}
query = (spark.readStream
.format("pubsub")
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(auth_options)
.load()
)
Scala
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// Creates a Pub/Sub subscription if one does not already exist with this ID
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(authOptions)
.load()
SQL
CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
subscriptionId => 'mysub',
projectId => 'myproject',
topicId => 'mytopic',
clientEmail => secret('pubsub-scope', 'clientEmail'),
clientId => secret('pubsub-scope', 'clientId'),
privateKeyId => secret('pubsub-scope', 'privateKeyId'),
privateKey => secret('pubsub-scope', 'privateKey')
);
자세한 구성 옵션은 Pub/Sub 스트리밍 읽기에 대한 구성 옵션을 참조 하세요.
Pub/Sub에 대한 액세스 구성
자격 증명에는 다음 역할이 있어야 합니다.
| 역할 | 필수 또는 선택 사항 | 역할 사용 방법 |
|---|---|---|
roles/pubsub.viewer 또는 roles/viewer |
필수 | 구독이 있는지 확인하고 구독을 가져옵니다. |
roles/pubsub.subscriber |
필수 | 구독에서 데이터를 가져옵니다. |
roles/pubsub.editor 또는 roles/editor |
선택 사항 | 구독이 없는 경우 구독을 만들 수 있으며 스트림 종료에 구독을 삭제할 수 있습니다 deleteSubscriptionOnStreamStop . |
Databricks는 키를 사용할 때 비밀을 사용하는 것이 좋습니다. 연결 권한을 부여하려면 다음 옵션이 필요합니다.
clientEmailclientIdprivateKeyprivateKeyId
Pub/Sub 스키마 이해
스트림의 스키마는 다음 표에 설명된 대로 Pub/Sub에서 가져온 행과 일치합니다.
| 필드 | 유형 |
|---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
Pub/Sub 스트리밍 읽기를 위한 옵션 구성
다음 표에서는 Pub/Sub에 지원되는 옵션에 대해 설명합니다. 모든 옵션은 스트림 리더에서 .option("<optionName>", "<optionValue>")를 사용하여 구성됩니다.
참고
일부 Pub/Sub 구성 옵션은 마이크로 일괄 처리 대신 페치 개념을 사용합니다. 이는 내부 구현 세부 정보이며, 행을 가져온 다음 처리한다는 점을 제외하고 옵션은 다른 구조적 스트리밍 커넥터와 유사하게 작동합니다.
| Key | 기본값 | 설명 |
|---|---|---|
numFetchPartitions |
스트림 초기화에 있는 실행기 수의 절반으로 설정합니다. | 구독에서 행을 가져오는 병렬 Spark 작업의 수입니다. |
deleteSubscriptionOnStreamStop |
false |
이 경우 true스트리밍 작업이 종료될 때 스트림에 전달된 구독이 삭제됩니다. |
maxBytesPerTrigger |
none |
트리거된 각 마이크로 일괄 처리 중에 처리할 일괄 처리 크기에 대한 소프트 제한입니다. |
maxRecordsPerFetch |
1000 |
행을 처리하기 전에 태스크당 가져올 행 수입니다. |
maxFetchPeriod |
10s |
행을 처리하기 전에 각 태스크가 가져올 기간입니다. 기간 문자열(예 1s : 1초 또는 1m 1분)을 허용합니다. Databricks는 기본값을 사용하는 것이 좋습니다. |
Pub/Sub에서 증분 일괄 처리 사용
Trigger.AvailableNow를 사용하여 Pub/Sub 소스에서 사용 가능한 행을 증분 배치로 가져올 수 있습니다.
Azure Databricks는 Trigger.AvailableNow 설정을 사용하여 읽기를 시작할 때 타임스탬프를 기록합니다. 일괄 처리에서 처리되는 행에는 이전에 가져온 모든 데이터와 기록된 시작 타임스탬프보다 작은 타임스탬프가 있는 새로 게시된 행이 모두 포함됩니다. 자세한 내용은 다음을 참조하세요 AvailableNow. 증분 일괄 처리.
Pub/Sub 스트리밍 메트릭 모니터링
구조적 스트리밍 진행률 메트릭은 가져오고 처리할 준비가 된 행의 수, 가져오고 처리할 준비가 된 행의 크기, 스트림 시작 이후 표시되는 중복 항목 수를 보고합니다.
다음은 Pub/Sub 메트릭의 예입니다.
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
제한 사항
Pub/Sub는 spark.speculation에서 추측 실행을 지원하지 않습니다.