Autenticazione

Questa pagina mostra i metodi di autenticazione più comuni per il connettore Kafka in Azure Databricks.

L'elenco completo dei metodi di autenticazione supportati è disponibile nella documentazione di Kafka.

Connetti a Hub eventi di Azure con un'entità servizio

Azure Databricks supporta l'autenticazione dei processi Spark con i servizi Event Hubs tramite OAuth con Microsoft Entra ID.

Diagramma di autenticazione AAD

Connettersi alle credenziali del servizio di Catalogo Unity

In Databricks Runtime 16.1 e versioni successive, Azure Databricks supporta le credenziali del servizio Catalogo Unity per l'autenticazione in Hub eventi di Azure. Databricks consiglia questo approccio se si esegue lo streaming Kafka in cluster condivisi o in un ambiente di calcolo serverless.

Per usare le credenziali del servizio Catalogo Unity per l'autenticazione, eseguire le operazioni seguenti:

  • Creare una nuova credenziale del servizio Catalogo Unity. Vedere Creare le credenziali del servizio.
    • Verificare che il connettore di accesso collegato alle credenziali del servizio disponga delle autorizzazioni corrette per connettersi a Hub eventi di Azure.
  • Impostare l'opzione databricks.serviceCredential di origine sul nome delle credenziali del servizio.

L'esempio seguente configura Kafka come origine usando le credenziali del servizio:

Python

kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
  "subscribe": "<topic>",
  "databricks.serviceCredential": "<service-credential-name>",
  # Optional: set this only if Databricks can't infer the scope for your Kafka service.
  # "databricks.serviceCredential.scope": "https://<event-hubs-server>/.default",
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

Scala

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
  "subscribe" -> "<topic>",
  "databricks.serviceCredential" -> "<service-credential-name>",
  // Optional: set this only if Databricks can't infer the scope for your Kafka service.
  // "databricks.serviceCredential.scope" -> "https://<event-hubs-server>/.default",
)

val df = spark.readStream.format("kafka").options(kafkaOptions).load()

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<bootstrap-hostname>:9092',
  subscribe => '<topic>',
  serviceCredential => '<service-credential-name>'
);

Annotazioni

Quando si usa una credenziale del servizio Catalogo Unity per connettersi a Kafka, non usare le opzioni seguenti:

  • kafka.sasl.mechanism
  • kafka.sasl.jaas.config
  • kafka.security.protocol
  • kafka.sasl.client.callback.handler.class
  • kafka.sasl.oauthbearer.token.endpoint.url

Connettersi con un ID client e un segreto

Azure Databricks supporta l'autenticazione Microsoft Entra ID con un ID client e un segreto negli ambienti di calcolo seguenti:

  • Databricks Runtime 12.2 LTS e versioni successive nel calcolo configurato con modalità di accesso dedicato.
  • Databricks Runtime 14.3 LTS e versioni successive nei cluster di calcolo configurati con modalità di accesso standard.
  • Pipeline dichiarative di Lakeflow Spark configurate senza il catalogo unity.

Azure Databricks non supporta l'autenticazione Microsoft Entra ID con un certificato in qualsiasi ambiente di calcolo o in Pipeline dichiarative spark di Lakeflow configurate con Unity Catalog.

Questa autenticazione non funziona sul calcolo con la modalità di accesso standard o sulle pipeline dichiarative di Unity Catalog Lakeflow Spark.

Per eseguire l'autenticazione con Microsoft Entra ID, è necessario avere i valori seguenti:

  • Un ID del tenant. È possibile trovarla nella scheda servizi Microsoft Entra ID.

  • ClientID, noto anche come ID applicazione.

  • Un segreto del cliente. Aggiungerlo come segreto all'area di lavoro di Databricks. Vedere Gestione dei segreti.

  • Argomento di EventHubs. È possibile trovare un elenco di argomenti nella sezione Hub degli Eventi sotto la sezione Entità su una pagina specifica dello spazio dei nomi di Event Hubs. Per lavorare con più argomenti, è possibile impostare il ruolo IAM a livello di Event Hubs.

  • Un server EventHubs. È possibile trovarla nella pagina di panoramica del namespace di Event Hubs specifico:

    Namespace di Event Hubs

Per usare Entra ID, è necessario configurare Kafka per l'uso di OAuth SASL:

  • Impostare kafka.security.protocol su SASL_SSL
  • Impostare kafka.sasl.mechanism su OAUTHBEARER
  • Impostare kafka.sasl.login.callback.handler.class come nome completo della classe Java. Il nome qualificato è kafkashaded e il gestore di callback di accesso della classe Kafka shaded di Databricks. Vedere l'esempio seguente per la classe esatta.

SASL è un protocollo di autenticazione generico e OAuth è un meccanismo SASL.

L'esempio seguente configura Kafka per la connessione a Hub eventi di Azure usando l'autenticazione Microsoft Entra ID con un ID client e un segreto:

Python

# This is the only section you need to modify for auth purposes
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")

event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------

sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'

kafka_options = {
    "kafka.bootstrap.servers": f"{event_hubs_server}:9093", # Port 9093 is the EventHubs Kafka port
    "kafka.sasl.jaas.config": sasl_config,
    "kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
    "subscribe": event_hubs_topic,

    # You should not need to modify these
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "OAUTHBEARER",
    "kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}

df = spark.readStream.format("kafka").options(**kafka_options)

display(df)

Scala

// This is the only section you need to modify for auth purposes
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")

val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------

val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> s"$eventHubsServer:9093", // Port 9093 is the EventHubs Kafka port
  "kafka.sasl.jaas.config" -> saslConfig,
  "kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
  "subscribe" -> eventHubsTopic,

  // You should not need to modify these
  "kafka.security.protocol" -> "SASL_SSL",
  "kafka.sasl.mechanism" -> "OAUTHBEARER",
  "kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)

val scalaDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()

display(scalaDF)

SQL

CREATE OR REFRESH STREAMING TABLE <table_name>
AS
SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<event-hubs-server>:9093',
  subscribe => '<event-hubs-topic>',
  `kafka.security.protocol` => 'SASL_SSL',
  `kafka.sasl.mechanism` => 'OAUTHBEARER',
  `kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="<client-id>" clientSecret="<client-secret>" scope="https://<event-hubs-server>/.default" ssl.protocol="SSL";',
  `kafka.sasl.oauthbearer.token.endpoint.url` => 'https://login.microsoft.com/<tenant-id>/oauth2/v2.0/token',
  `kafka.sasl.login.callback.handler.class` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler'
);

Usare SASL/PLAIN per l'autenticazione

Per connettersi a Kafka usando l'autenticazione SASL/PLAIN (nome utente e password), configurare le opzioni seguenti. Usare il nome della classe ombreggiata PlainLoginModule :

Python

kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-server>:9093",
  "subscribe": "<topic>",
  "kafka.security.protocol": "SASL_SSL",
  "kafka.sasl.mechanism": "PLAIN",
  "kafka.sasl.jaas.config":
    'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";',
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

Scala

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
  "subscribe" -> "<topic>",
  "kafka.security.protocol" -> "SASL_SSL",
  "kafka.sasl.mechanism" -> "PLAIN",
  "kafka.sasl.jaas.config" ->
    """kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";""",
)

val df = spark.readStream.format("kafka").options(kafkaOptions).load()

SQL

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<bootstrap-server>:9093',
  subscribe => '<topic>',
  `kafka.security.protocol` => 'SASL_SSL',
  `kafka.sasl.mechanism` => 'PLAIN',
  `kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";'
);

Azure Databricks consiglia di archiviare la password come segreto anziché includerla direttamente nel codice. Per altre informazioni, vedere Gestione dei segreti.

Usare SASL/SCRAM per l'autenticazione

Per connettersi a Kafka usando SASL/SCRAM (SCRAM-SHA-256 o SCRAM-SHA-512), configurare le opzioni seguenti. Usare il nome della classe ombreggiata ScramLoginModule :

Python

kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-server>:9093",
  "subscribe": "<topic>",
  "kafka.security.protocol": "SASL_SSL",
  "kafka.sasl.mechanism": "SCRAM-SHA-512",
  "kafka.sasl.jaas.config":
    'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";',
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

Scala

val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
  "subscribe" -> "<topic>",
  "kafka.security.protocol" -> "SASL_SSL",
  "kafka.sasl.mechanism" -> "SCRAM-SHA-512",
  "kafka.sasl.jaas.config" ->
    """kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";""",
)

val df = spark.readStream.format("kafka").options(kafkaOptions).load()

SQL

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<bootstrap-server>:9093',
  subscribe => '<topic>',
  `kafka.security.protocol` => 'SASL_SSL',
  `kafka.sasl.mechanism` => 'SCRAM-SHA-512',
  `kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";'
);

Annotazioni

Sostituire SCRAM-SHA-512 con SCRAM-SHA-256 se il cluster Kafka è configurato per l'uso di SCRAM-SHA-256.

Azure Databricks consiglia di archiviare la password come segreto anziché includerla direttamente nel codice. Per altre informazioni, vedere Gestione dei segreti.

Usare SSL per connettere Azure Databricks a Kafka

Per abilitare le connessioni SSL/TLS a Kafka, impostare kafka.security.protocol su SSL e fornire le opzioni di configurazione del truststore e del keystore, precedute da kafka.. Per le connessioni SSL che richiedono solo l'autenticazione del server (TLS unidirezionale), è necessario usare un truststore. Per il TLS reciproco (mTLS), in cui anche il broker Kafka autentica il client, è necessario utilizzare sia un truststore sia un keystore.

Sono disponibili le opzioni SSL/TLS seguenti. Per l'elenco completo delle proprietà SSL, vedere la documentazione sulla configurazione ssl di Apache Kafka e Crittografia e autenticazione con SSL nella documentazione di Confluent.

Opzione Descrizione
kafka.security.protocol Impostare su SSL per abilitare la crittografia TLS.
kafka.ssl.truststore.location Percorso del file dell'archivio attendibilità contenente certificati CA attendibili.
kafka.ssl.truststore.password Password per il file dell'archivio di fiducia.
kafka.ssl.truststore.type Formato file dell'archivio attendibilità (impostazione predefinita: JKS).
kafka.ssl.keystore.location Percorso del file dell'archivio chiavi contenente il certificato client e la chiave privata (obbligatorio per mTLS).
kafka.ssl.keystore.password Password per il file dell'archivio chiavi.
kafka.ssl.key.password Password per la chiave privata nell'archivio chiavi.
kafka.ssl.endpoint.identification.algorithm Algoritmo di verifica del nome host. Il valore predefinito è https. Configurare una stringa vuota per disabilitarlo.

Se si usa SSL, Databricks consiglia di:

L'esempio seguente usa i percorsi di archiviazione degli oggetti e i segreti di Databricks per abilitare una connessione SSL:

Python

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <truststore-password-key-name>))

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<bootstrap-server>:9093',
  subscribe => '<topic>',
  `kafka.security.protocol` => 'SSL',
  `kafka.ssl.truststore.location` => '<truststore-location>',
  `kafka.ssl.keystore.location` => '<keystore-location>',
  `kafka.ssl.keystore.password` => secret('<certificate-scope-name>', '<keystore-password-key-name>'),
  `kafka.ssl.truststore.password` => secret('<certificate-scope-name>', '<truststore-password-key-name>')
);

Connettere Kafka in HDInsight a Azure Databricks

  1. Creare un cluster Kafka in HDInsight.

    Per istruzioni, vedere Connect to Kafka on HDInsight through an Rete virtuale di Azure (Connettersi a Kafka in HDInsight tramite un Rete virtuale di Azure.

  2. Configurare i broker Kafka per pubblicizzare l'indirizzo corretto.

    Seguire le istruzioni in Configurare Kafka per la pubblicità IP. Se gestisci Kafka da solo su Macchine virtuali di Azure, assicurati che la configurazione advertised.listeners dei broker sia impostata sull'indirizzo IP interno degli host.

  3. Creare un cluster Azure Databricks.

  4. Collegare il cluster Kafka al cluster Azure Databricks.

    Seguire le istruzioni in Reti peer virtuali.

Usare i nomi delle classi Kafka incapsulate di Databricks

Azure Databricks aggrega versioni proprietarie e ombreggiate delle librerie client Kafka. Tutti i nomi delle classi client Kafka a cui si fa riferimento nelle opzioni di configurazione dell'autenticazione devono usare il prefisso del nome della classe ombreggiata anziché il nome della classe open source standard. Questo vale per qualsiasi classe a cui si fa riferimento in opzioni come kafka.sasl.jaas.config, kafka.sasl.login.callback.handler.classe kafka.sasl.client.callback.handler.class.

Se si usano nomi di classe non oscurati, il codice genera un RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCED errore. Per altri dettagli, vedere le domande frequenti .

Gestione degli errori potenziali

  • Impossibile creare un nuovo KafkaAdminClient

    Questo errore kafka interno viene generato se una delle opzioni di autenticazione seguenti non è corretta:

    • ID client (detto anche ID applicazione)
    • ID dell'inquilino
    • Server di Hub eventi

    Per risolvere l'errore, verificare che i valori siano corretti per queste opzioni. Inoltre, è possibile che questo errore venga visualizzato se si modificano le opzioni di configurazione fornite per impostazione predefinita nell'esempio , ad esempio kafka.security.protocol.

  • Nessun record restituito

    Se si sta tentando di visualizzare o elaborare il dataframe ma non si ottengono risultati, nell'interfaccia utente verrà visualizzato quanto segue.

    Nessun messaggio dei risultati

    Questo messaggio indica che l'autenticazione ha avuto esito positivo, ma EventHubs non ha restituito dati. Alcuni possibili motivi (anche se non esaustivi) sono:

    • È stato specificato l'argomento EventHubs errato.
    • L'opzione di configurazione predefinita di Kafka per startingOffsets è latest, e attualmente non si ricevono dati tramite il topic. È possibile impostare startingOffsets su earliest per iniziare a leggere i dati a partire dai primi offset di Kafka.