Google Pub/Sub 구독

기본 제공 커넥터를 사용하여 Google Pub/Sub를 구독합니다. 이 커넥터는 subscriber에서 가져오는 행에 대해 정확히 한 번 처리 시맨틱을 제공합니다.

참고

Pub/Sub는 중복된 행을 전송할 수 있으며, 행이 구독자에게 순서가 뒤바뀐 상태로 도착할 수도 있습니다. 중복 행과 순서가 다른 행을 처리하는 코드를 작성해야 합니다.

Pub/Sub 스트림 구성

다음 코드 예제에서는 Pub/Sub에서 읽은 구조적 스트리밍을 구성하고 프라이빗 키로 인증하는 방법을 보여줍니다.

Python

auth_options = {
    "clientId": client_id,
    "clientEmail": client_email,
    "privateKey": private_key,
    "privateKeyId": private_key_id
}

query = (spark.readStream
  .format("pubsub")
  .option("subscriptionId", "mysub")
  .option("topicId", "mytopic")
  .option("projectId", "myproject")
  .options(auth_options)
  .load()
)

Scala

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // Creates a Pub/Sub subscription if one does not already exist with this ID
  .option("subscriptionId", "mysub")
  .option("topicId", "mytopic")
  .option("projectId", "myproject")
  .options(authOptions)
  .load()

SQL

CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
  subscriptionId => 'mysub',
  projectId => 'myproject',
  topicId => 'mytopic',
  clientEmail => secret('pubsub-scope', 'clientEmail'),
  clientId => secret('pubsub-scope', 'clientId'),
  privateKeyId => secret('pubsub-scope', 'privateKeyId'),
  privateKey => secret('pubsub-scope', 'privateKey')
);

자세한 구성 옵션은 Pub/Sub 스트리밍 읽기에 대한 구성 옵션을 참조 하세요.

Pub/Sub에 대한 액세스 구성

자격 증명에는 다음 역할이 있어야 합니다.

역할 필수 또는 선택 사항 역할 사용 방법
roles/pubsub.viewer 또는 roles/viewer 필수 구독이 있는지 확인하고 구독을 가져옵니다.
roles/pubsub.subscriber 필수 구독에서 데이터를 가져옵니다.
roles/pubsub.editor 또는 roles/editor 선택 사항 구독이 없는 경우 구독을 만들 수 있으며 스트림 종료에 구독을 삭제할 수 있습니다 deleteSubscriptionOnStreamStop .

Databricks는 키를 사용할 때 비밀을 사용하는 것이 좋습니다. 연결 권한을 부여하려면 다음 옵션이 필요합니다.

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

Pub/Sub 스키마 이해

스트림의 스키마는 다음 표에 설명된 대로 Pub/Sub에서 가져온 행과 일치합니다.

필드 유형
messageId StringType
payload ArrayType[ByteType]
attributes StringType
publishTimestampInMillis LongType

Pub/Sub 스트리밍 읽기를 위한 옵션 구성

다음 표에서는 Pub/Sub에 지원되는 옵션에 대해 설명합니다. 모든 옵션은 스트림 리더에서 .option("<optionName>", "<optionValue>")를 사용하여 구성됩니다.

참고

일부 Pub/Sub 구성 옵션은 마이크로 일괄 처리 대신 페치 개념을 사용합니다. 이는 내부 구현 세부 정보이며, 행을 가져온 다음 처리한다는 점을 제외하고 옵션은 다른 구조적 스트리밍 커넥터와 유사하게 작동합니다.

Key 기본값 설명
numFetchPartitions 스트림 초기화에 있는 실행기 수의 절반으로 설정합니다. 구독에서 행을 가져오는 병렬 Spark 작업의 수입니다.
deleteSubscriptionOnStreamStop false 이 경우 true스트리밍 작업이 종료될 때 스트림에 전달된 구독이 삭제됩니다.
maxBytesPerTrigger none 트리거된 각 마이크로 일괄 처리 중에 처리할 일괄 처리 크기에 대한 소프트 제한입니다.
maxRecordsPerFetch 1000 행을 처리하기 전에 태스크당 가져올 행 수입니다.
maxFetchPeriod 10s 행을 처리하기 전에 각 태스크가 가져올 기간입니다. 기간 문자열(예 1s : 1초 또는 1m 1분)을 허용합니다. Databricks는 기본값을 사용하는 것이 좋습니다.

Pub/Sub에서 증분 일괄 처리 사용

Trigger.AvailableNow를 사용하여 Pub/Sub 소스에서 사용 가능한 행을 증분 배치로 가져올 수 있습니다.

Azure Databricks는 Trigger.AvailableNow 설정을 사용하여 읽기를 시작할 때 타임스탬프를 기록합니다. 일괄 처리에서 처리되는 행에는 이전에 가져온 모든 데이터와 기록된 시작 타임스탬프보다 작은 타임스탬프가 있는 새로 게시된 행이 모두 포함됩니다. 자세한 내용은 다음을 참조하세요 AvailableNow. 증분 일괄 처리.

Pub/Sub 스트리밍 메트릭 모니터링

구조적 스트리밍 진행률 메트릭은 가져오고 처리할 준비가 된 행의 수, 가져오고 처리할 준비가 된 행의 크기, 스트림 시작 이후 표시되는 중복 항목 수를 보고합니다.

다음은 Pub/Sub 메트릭의 예입니다.

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

제한 사항

Pub/Sub는 spark.speculation에서 추측 실행을 지원하지 않습니다.