인증

이 페이지에는 Azure Databricks Kafka 커넥터에 대한 가장 일반적인 인증 방법이 표시됩니다.

지원되는 인증 방법의 전체 목록은 Kafka 설명서에서 찾을 수 있습니다. 인증 옵션 참조는 인증을 참조 하세요.

서비스 주체와 Azure Event Hubs 연결

Azure Databricks Microsoft Entra ID OAuth를 사용하여 Event Hubs 서비스에서 Spark 작업의 인증을 지원합니다.

AAD 인증 다이어그램

Unity 카탈로그 서비스 자격 증명으로 연결

Databricks Runtime 16.1 이상에서는 Azure Databricks Azure Event Hubs 인증하기 위한 Unity 카탈로그 서비스 자격 증명을 지원합니다. Databricks는 공유 클러스터 또는 서버리스 컴퓨팅에서 Kafka 스트리밍을 실행하는 경우 이 방법을 권장합니다.

인증에 Unity 카탈로그 서비스 자격 증명을 사용하려면 다음을 수행합니다.

  • 새 Unity 카탈로그 서비스 자격 증명을 만듭니다. 서비스 자격 증명 만들기를 참조하세요.
    • 서비스 자격 증명에 연결된 액세스 커넥터에 Azure Event Hubs 연결할 수 있는 올바른 권한이 있는지 확인합니다.
  • 원본 옵션을 databricks.serviceCredential 서비스 자격 증명의 이름으로 설정합니다.

다음 예제에서는 서비스 자격 증명을 사용하여 Kafka를 원본으로 구성합니다.

Python

kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
  "subscribe": "<topic>",
  "databricks.serviceCredential": "<service-credential-name>",
  # Optional: set this only if Databricks can't infer the scope for your Kafka service.
  # "databricks.serviceCredential.scope": "https://<event-hubs-server>/.default",
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

Scala

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
  "subscribe" -> "<topic>",
  "databricks.serviceCredential" -> "<service-credential-name>",
  // Optional: set this only if Databricks can't infer the scope for your Kafka service.
  // "databricks.serviceCredential.scope" -> "https://<event-hubs-server>/.default",
)

val df = spark.readStream.format("kafka").options(kafkaOptions).load()

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<bootstrap-hostname>:9092',
  subscribe => '<topic>',
  serviceCredential => '<service-credential-name>'
);

메모

Unity 카탈로그 서비스 자격 증명을 사용하여 Kafka에 연결하는 경우 다음 옵션을 사용하지 마세요.

  • kafka.sasl.mechanism
  • kafka.sasl.jaas.config
  • kafka.security.protocol
  • kafka.sasl.client.callback.handler.class
  • kafka.sasl.oauthbearer.token.endpoint.url

클라이언트 ID 및 비밀로 연결

Azure Databricks 다음 컴퓨팅 환경에서 클라이언트 ID 및 비밀을 사용하여 Microsoft Entra ID 인증을 지원합니다.

  • 전용 액세스 모드로 구성된 컴퓨트에서 Databricks Runtime 12.2 LTS 이상.
  • 표준 액세스 모드로 구성된 컴퓨트에서 Databricks Runtime 14.3 LTS 이상.
  • Unity 카탈로그 없이 구성된 Lakeflow Spark 선언적 파이프라인입니다.

Azure Databricks 컴퓨팅 환경 또는 Unity 카탈로그로 구성된 Lakeflow Spark 선언적 파이프라인에서 인증서로 Microsoft Entra ID 인증을 지원하지 않습니다.

이 인증은 표준 액세스 모드를 사용하는 컴퓨팅 또는 Unity Catalog Lakeflow Spark 선언적 파이프라인에서 작동하지 않습니다.

Microsoft Entra ID 인증을 수행하려면 다음 값이 있어야 합니다.

  • 임차인 ID. Microsoft Entra ID 서비스 탭에서 찾을 수 있습니다.

  • 애플리케이션 ID라고도 하는 clientID입니다.

  • 클라이언트 암호. Databricks 작업 영역에 비밀로 추가합니다. 비밀 관리를 참조하세요.

  • EventHubs 토픽. 특정 Event Hubs 네임스페이스 페이지의 엔터티 섹션 아래 Event Hubs 섹션에서 주제 목록을 찾을 수 있습니다. 여러 주제를 사용하려면 Event Hubs 수준에서 IAM 역할을 설정할 수 있습니다.

  • EventHubs 서버. Event Hubs 네임스페이스의 개요 페이지에서 이 정보를 확인할 수 있습니다.

    Event Hubs 네임스페이스

Entra ID 사용하려면 OAuth SASL을 사용하도록 Kafka를 구성해야 합니다.

  • kafka.security.protocolSASL_SSL으로 설정합니다.
  • kafka.sasl.mechanismOAUTHBEARER으로 설정합니다.
  • kafka.sasl.login.callback.handler.class Java 클래스의 정규화된 이름으로 설정합니다. 정규화된 이름은 kafkashaded이며 Databricks의 셰이딩된 Kafka 클래스의 로그인 콜백 핸들러입니다. 정확한 클래스에 대한 내용은 아래 예제를 참조하세요.

SASL은 일반 인증 프로토콜이며 OAuth는 SASL 메커니즘입니다.

다음 예제에서는 클라이언트 ID 및 비밀로 Microsoft Entra ID 인증을 사용하여 Azure Event Hubs 연결하도록 Kafka를 구성합니다.

Python

# This is the only section you need to modify for auth purposes
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")

event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------

sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'

kafka_options = {
    "kafka.bootstrap.servers": f"{event_hubs_server}:9093", # Port 9093 is the EventHubs Kafka port
    "kafka.sasl.jaas.config": sasl_config,
    "kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
    "subscribe": event_hubs_topic,

    # You should not need to modify these
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "OAUTHBEARER",
    "kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}

df = spark.readStream.format("kafka").options(**kafka_options)

display(df)

Scala

// This is the only section you need to modify for auth purposes
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")

val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------

val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> s"$eventHubsServer:9093", // Port 9093 is the EventHubs Kafka port
  "kafka.sasl.jaas.config" -> saslConfig,
  "kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
  "subscribe" -> eventHubsTopic,

  // You should not need to modify these
  "kafka.security.protocol" -> "SASL_SSL",
  "kafka.sasl.mechanism" -> "OAUTHBEARER",
  "kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)

val scalaDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()

display(scalaDF)

SQL

CREATE OR REFRESH STREAMING TABLE <table_name>
AS
SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<event-hubs-server>:9093',
  subscribe => '<event-hubs-topic>',
  `kafka.security.protocol` => 'SASL_SSL',
  `kafka.sasl.mechanism` => 'OAUTHBEARER',
  `kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="<client-id>" clientSecret="<client-secret>" scope="https://<event-hubs-server>/.default" ssl.protocol="SSL";',
  `kafka.sasl.oauthbearer.token.endpoint.url` => 'https://login.microsoft.com/<tenant-id>/oauth2/v2.0/token',
  `kafka.sasl.login.callback.handler.class` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler'
);

SASL/PLAIN을 사용하여 인증

SASL/PLAIN(사용자 이름 및 암호) 인증을 사용하여 Kafka에 연결하려면 다음 옵션을 구성합니다. 음영 처리된 PlainLoginModule 클래스 이름을 사용합니다.

Python

kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-server>:9093",
  "subscribe": "<topic>",
  "kafka.security.protocol": "SASL_SSL",
  "kafka.sasl.mechanism": "PLAIN",
  "kafka.sasl.jaas.config":
    'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";',
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

Scala

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
  "subscribe" -> "<topic>",
  "kafka.security.protocol" -> "SASL_SSL",
  "kafka.sasl.mechanism" -> "PLAIN",
  "kafka.sasl.jaas.config" ->
    """kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";""",
)

val df = spark.readStream.format("kafka").options(kafkaOptions).load()

SQL

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<bootstrap-server>:9093',
  subscribe => '<topic>',
  `kafka.security.protocol` => 'SASL_SSL',
  `kafka.sasl.mechanism` => 'PLAIN',
  `kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";'
);

Azure Databricks 암호를 코드에 직접 포함하는 대신 비밀로 저장하는 것이 좋습니다. 자세한 내용은 비밀 관리를 참조하세요.

SASL/SCRAM을 사용하여 인증

SASL/SCRAM(SCRAM-SHA-256 또는 SCRAM-SHA-512)을 사용하여 Kafka에 연결하려면 다음 옵션을 구성합니다. 음영 처리된 ScramLoginModule 클래스 이름을 사용합니다.

Python

kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-server>:9093",
  "subscribe": "<topic>",
  "kafka.security.protocol": "SASL_SSL",
  "kafka.sasl.mechanism": "SCRAM-SHA-512",
  "kafka.sasl.jaas.config":
    'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";',
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

Scala

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
  "subscribe" -> "<topic>",
  "kafka.security.protocol" -> "SASL_SSL",
  "kafka.sasl.mechanism" -> "SCRAM-SHA-512",
  "kafka.sasl.jaas.config" ->
    """kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";""",
)

val df = spark.readStream.format("kafka").options(kafkaOptions).load()

SQL

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<bootstrap-server>:9093',
  subscribe => '<topic>',
  `kafka.security.protocol` => 'SASL_SSL',
  `kafka.sasl.mechanism` => 'SCRAM-SHA-512',
  `kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";'
);

메모

SCRAM-SHA-512SCRAM-SHA-256로 대체하십시오, 만약 Kafka 클러스터가 SCRAM-SHA-256을 사용하도록 구성된 경우.

Azure Databricks 암호를 코드에 직접 포함하는 대신 비밀로 저장하는 것이 좋습니다. 자세한 내용은 비밀 관리를 참조하세요.

SL을 사용하여 Kafka에 Azure Databricks 연결

Kafka에 대한 SSL/TLS 연결을 활성화하려면 kafka.security.protocolSSL을(를) 설정하고 접두사로 kafka.이(가) 지정된 신뢰 저장소 및 키 저장소 구성 옵션을 제공하십시오. 서버 인증(단방향 TLS)만 필요한 SSL 연결의 경우 트러스트 저장소를 사용해야 합니다. Kafka broker가 클라이언트를 인증하는 mTLS(상호 TLS)의 경우 트러스트 저장소와 키 저장소를 모두 사용해야 합니다.

다음 SSL/TLS 옵션을 사용할 수 있습니다. SSL 속성의 전체 목록은 Confluent 설명서에서 Apache Kafka SSL 구성 설명서및 SSL을 사용한 암호화 및 인증 을 참조하세요.

Option 설명
kafka.security.protocol TLS 암호화를 SSL 사용하도록 설정합니다.
kafka.ssl.truststore.location 신뢰할 수 있는 CA 인증서를 포함하는 트러스트 저장소 파일의 경로입니다.
kafka.ssl.truststore.password 트러스트 저장소 파일의 암호입니다.
kafka.ssl.truststore.type 트러스트 저장소 파일 형식(기본값: JKS)입니다.
kafka.ssl.keystore.location 클라이언트 인증서 및 프라이빗 키를 포함하는 키 저장소 파일의 경로입니다(mTLS에 필요).
kafka.ssl.keystore.password 키 저장소 파일의 암호입니다.
kafka.ssl.key.password 키 저장소의 프라이빗 키에 대한 암호입니다.
kafka.ssl.endpoint.identification.algorithm 호스트 이름 확인 알고리즘입니다. 기본값은 https입니다. 사용하지 않도록 설정하려면 빈 문자열로 설정합니다.

SSL을 사용하는 경우 Databricks는 다음을 권장합니다.

다음 예제에서는 개체 스토리지 위치 및 Databricks 비밀을 사용하여 SSL 연결을 사용하도록 설정합니다.

Python

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <truststore-password-key-name>))

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<bootstrap-server>:9093',
  subscribe => '<topic>',
  `kafka.security.protocol` => 'SSL',
  `kafka.ssl.truststore.location` => '<truststore-location>',
  `kafka.ssl.keystore.location` => '<keystore-location>',
  `kafka.ssl.keystore.password` => secret('<certificate-scope-name>', '<keystore-password-key-name>'),
  `kafka.ssl.truststore.password` => secret('<certificate-scope-name>', '<truststore-password-key-name>')
);

HDInsight의 Kafka를 Azure Databricks 연결

  1. HDInsight Kafka 클러스터를 만듭니다.

    지침은 Azure Virtual Network 통해 HDInsight의 Kafka에 연결합니다.

  2. 올바른 주소를 보급하도록 Kafka 브로커를 구성합니다.

    IP 광고를 위한 Kafka 구성의 지침을 따르세요. Azure Virtual Machines Kafka를 직접 관리하는 경우 broker의 advertised.listeners 구성이 호스트의 내부 IP로 설정되어 있는지 확인합니다.

  3. Azure Databricks 클러스터를 만듭니다.

  4. Kafka 클러스터를 Azure Databricks 클러스터에 피어합니다.

    가상 네트워크 피어링의 지침을 따르세요.

Databricks 쉐이딩된 Kafka 클래스 이름 사용

Azure Databricks Kafka 클라이언트 라이브러리의 독점적인 음영 처리된 버전을 번들합니다. 인증 구성 옵션에서 참조하는 모든 Kafka 클라이언트 클래스 이름은 표준 오픈 소스 클래스 이름 대신 음영 처리된 클래스 이름 접두사를 사용해야 합니다. 이는 , 및 kafka.sasl.jaas.config. 같은 kafka.sasl.login.callback.handler.classkafka.sasl.client.callback.handler.class옵션에서 참조되는 모든 클래스에 적용됩니다.

섀딩되지 않은 클래스 이름을 사용하는 경우 코드에서 RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCED 오류가 발생합니다. 자세한 내용은 FAQ 를 참조하세요.

잠재적 오류 처리

  • 새 만들기 실패 KafkaAdminClient

    인증 옵션이 잘못되면 이 내부 Kafka 오류가 발생합니다.

    • 클라이언트 ID (응용 프로그램 ID라고도 함)
    • 임차인 ID
    • Event Hubs 서버

    오류를 해결하려면 이러한 옵션에 대한 값이 올바른지 확인합니다. 또한 예제에서 기본적으로 제공되는 구성 옵션(예: kafka.security.protocol)을 수정하면 이 오류가 표시될 수 있습니다.

  • 레코드가 반환되지 않음

    DataFrame을 표시하거나 처리하려고 하지만 결과가 표시되지 않는 경우 UI에 다음이 표시됩니다.

    결과 메시지 없음

    이 메시지는 인증에 성공했지만 EventHubs에서 데이터를 반환하지 않았음을 의미합니다. 가능한 몇 가지 이유는 다음과 같습니다(완전한 목록은 아님):

    • 잘못된 EventHubs 항목을 지정했습니다.
    • 기본 Kafka 구성 옵션은 startingOffsets로 설정되어 있으며, 현재 토픽을 통해 아직 데이터를 받고 있지 않습니다. startingOffsetsearliest로 설정하여 Kafka의 초기 오프셋에서부터 데이터 읽기를 시작할 수 있습니다.