Connect to Apache Kafka

This page describes how you can use Apache Kafka as either a source or a sink when running Structured Streaming workloads on Azure Databricks.

For more information about Kafka, see the Apache Kafka documentation.

Read data from Kafka

Use the kafka format to configure connections to Kafka. The following is an example for a streaming read:

Python

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()

SQL

CREATE OR REFRESH STREAMING TABLE <table_name> AS
SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>'
);

Azure Databricks also supports batch reads from Kafka, as in the following example:

Python

df = (spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)

Scala

val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

SQL

SELECT * FROM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>',
  startingOffsets => 'earliest',
  endingOffsets => 'latest'
);

For incremental batch loading, Databricks recommends using Kafka with Trigger.AvailableNow. See AvailableNow: Incremental batch processing.
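
As a rough sketch of this pattern in Python, the helper below starts a write that processes everything currently available in the stream and then stops. The function name, the Delta sink, and the checkpoint path and table name arguments are illustrative assumptions; `df` is expected to be a streaming DataFrame returned by a Kafka `readStream`.

```python
def run_incremental_batch(df, checkpoint_path, table_name):
    """Process all data currently available in the stream, then stop.

    `df` is assumed to be a streaming DataFrame read with format("kafka").
    The checkpoint stores the consumed offsets, so the next run resumes
    where this one stopped.
    """
    return (df.writeStream
        .format("delta")                                # assumed sink
        .option("checkpointLocation", checkpoint_path)  # hypothetical path
        .trigger(availableNow=True)                     # PySpark form of Trigger.AvailableNow
        .toTable(table_name)                            # hypothetical table name
    )
```

Scheduling this helper as a job gives batch-style cost control while the checkpoint keeps each run incremental.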

In Databricks Runtime 13.3 LTS and above, Azure Databricks also provides a SQL function for reading Kafka data. Streaming with SQL is supported only in Lakeflow Spark Declarative Pipelines or with streaming tables in Databricks SQL. See read_kafka table-valued function.

Configure Kafka Structured Streaming reader

For both batch and streaming queries, you must set the bootstrap servers for the Kafka source with the following option:

| Key | Value | Description |
| --- | --- | --- |
| kafka.bootstrap.servers | A comma-separated list of host:port | The Kafka cluster bootstrap servers |

To set subscription topics, you must specify one of the following options:

| Option | Value | Description |
| --- | --- | --- |
| subscribe | A comma-separated list of topics. | The topic list to subscribe to. |
| subscribePattern | Java regex string. | The pattern used to subscribe to topic(s). |
| assign | JSON string {"topicA":[0,1],"topicB":[2,4]}. | Specific topicPartitions to consume. |

See the Options page for the full list of available options.
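
The three subscription options are alternatives, so it can help to build them in one place. The following is a minimal sketch, assuming a hypothetical helper named `kafka_subscription_options`; it is not part of any Databricks or Spark API. It joins a topic list for `subscribe`, passes a regex through for `subscribePattern`, and serializes a topic-to-partitions mapping into the JSON string that `assign` expects.

```python
import json

def kafka_subscription_options(subscribe=None, subscribe_pattern=None, assign=None):
    """Return a dict holding exactly one Kafka subscription option."""
    candidates = {
        # subscribe takes a comma-separated list of topic names
        "subscribe": ",".join(subscribe) if subscribe else None,
        # subscribePattern takes a Java regex string
        "subscribePattern": subscribe_pattern,
        # assign takes a JSON string mapping each topic to a list of partitions
        "assign": json.dumps(assign) if assign else None,
    }
    options = {name: value for name, value in candidates.items() if value is not None}
    if len(options) != 1:
        raise ValueError("Specify exactly one of subscribe, subscribePattern, or assign")
    return options

# Usage: the resulting dict can be passed to a reader with .options(**opts)
print(kafka_subscription_options(subscribe=["orders", "payments"]))
print(kafka_subscription_options(assign={"topicA": [0, 1], "topicB": [2, 4]}))
```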

Schema for Kafka rows

The Kafka Structured Streaming reader returns rows with the following schema:

| Column | Type |
| --- | --- |
| key | binary |
| value | binary |
| topic | string |
| partition | int |
| offset | long |
| timestamp | timestamp |
| timestampType | int |

The key and the value are always deserialized as byte arrays with the ByteArrayDeserializer. Use DataFrame operations (such as cast("string") or from_avro) to explicitly deserialize the keys and values.
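
For text payloads such as JSON, the cast can be written once and reused. This is a small sketch, assuming the keys and values are UTF-8 strings; the function name is hypothetical, and `df` is any DataFrame returned by the Kafka reader.

```python
def decode_key_value_as_strings(df):
    """Cast the binary key and value columns to strings.

    Assumes UTF-8 text payloads. The Kafka metadata columns are kept
    unchanged so that rows can still be traced back to their offsets.
    """
    return df.selectExpr(
        "CAST(key AS STRING) AS key",
        "CAST(value AS STRING) AS value",
        "topic",
        "partition",
        "offset",
        "timestamp",
    )
```

Binary formats such as Avro need a format-specific function like from_avro instead of a string cast.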

Write data to Kafka

The following is an example for a streaming write to Kafka:

Python

(df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()
)

Scala

df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()

Azure Databricks also supports batch write semantics to Kafka data sinks, as shown in the following example:

Python

(df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)

Scala

df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()

Configure the Kafka Structured Streaming writer

Important

Databricks Runtime 13.3 LTS and above includes a newer version of the kafka-clients library that enables idempotent writes by default. If a Kafka sink runs a version below 2.8.0 with ACLs configured but without IDEMPOTENT_WRITE enabled, the write fails with the error message org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state.

Resolve this error by upgrading to Kafka version 2.8.0 or above, or by setting .option("kafka.enable.idempotence", "false") while configuring your Structured Streaming writer.
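
The workaround can be kept behind a flag so that it is easy to remove after the brokers are upgraded. The sketch below assumes a hypothetical helper, `kafka_writer_options`, that returns a plain dictionary of writer options; only the option keys themselves come from this page.

```python
def kafka_writer_options(bootstrap_servers, topic, enable_idempotence=True):
    """Build Kafka writer options, optionally turning off idempotent writes."""
    options = {
        "kafka.bootstrap.servers": bootstrap_servers,
        "topic": topic,
    }
    if not enable_idempotence:
        # Kafka client settings are passed through with the "kafka." prefix
        options["kafka.enable.idempotence"] = "false"
    return options

# Usage: pass the dict to a writer, for example
#   df.write.format("kafka").options(**opts).save()
opts = kafka_writer_options("<server:ip>", "<topic>", enable_idempotence=False)
print(opts)
```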

The following are common options for writes to Kafka:

| Key | Value | Default value | Description |
| --- | --- | --- | --- |
| kafka.bootstrap.servers | A comma-separated list of <host:port> | none | Required. The Kafka bootstrap.servers configuration. |
| topic | STRING | not set | Optional. Sets the topic for all rows to be written. This option overrides any topic column that exists in the data. |
| includeHeaders | BOOLEAN | false | Optional. Whether to include the Kafka headers in the row. |

See the Options page for the full list of available options.

Schema for Kafka writer

When writing data to Kafka, the provided DataFrame may include the following fields:

| Column name | Required or optional | Type |
| --- | --- | --- |
| key | optional | STRING or BINARY |
| value | required | STRING or BINARY |
| headers | optional | ARRAY |
| topic | optional (ignored if topic is set as writer option) | STRING |
| partition | optional | INT |
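
A DataFrame with arbitrary columns usually has to be reshaped into this layout before it is written. The following is one possible sketch, assuming the whole row should be serialized as a JSON value and one existing column should become the key; the function name and the `key_column` argument are hypothetical.

```python
def to_kafka_rows(df, key_column):
    """Reshape a DataFrame into the key and value columns the Kafka writer reads.

    key_column: name of an existing column to use as the message key
                (hypothetical, chosen by the caller).
    The value is the full row serialized as a JSON string.
    """
    return df.selectExpr(
        f"CAST({key_column} AS STRING) AS key",
        "to_json(struct(*)) AS value",
    )
```

The result contains no topic column, so the destination topic must be set with the topic writer option.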

Authentication

Azure Databricks supports multiple authentication methods for Kafka, including Unity Catalog service credentials, SASL/SSL, and cloud-specific options for AWS MSK, Azure Event Hubs, and Google Cloud Managed Kafka. See Authentication.

Retrieve Kafka metrics

To monitor lag behind Kafka for a streaming query, use the avgOffsetsBehindLatest, maxOffsetsBehindLatest, and minOffsetsBehindLatest metrics. These metrics report the average, maximum, and minimum offset lag across all subscribed topic partitions, relative to the latest offsets in Kafka. See Reading Metrics Interactively.

Note

In Databricks Runtime 17.1 and above, the latest Kafka offsets are fetched after each micro-batch completes. On topics that continuously receive data, backlog metrics may show small, persistent non-zero values. This is expected behavior and does not indicate that the stream is falling behind.

In Databricks Runtime 17.0 and below, the latest Kafka offsets are fetched at micro-batch start time. Backlog metrics may return 0 when streaming queries consistently consume all records available at the start of the micro-batch.
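
Because a single non-zero reading is expected on busy topics, an alert is more useful when it looks at the trend across several micro-batches. The heuristic below is only a sketch: the function name, the choice of maxOffsetsBehindLatest as input, and the default of five samples are all assumptions to tune for your workload.

```python
def lag_is_growing(max_lag_samples, min_samples=5):
    """Return True if lag increased in every step of the most recent samples.

    max_lag_samples: maxOffsetsBehindLatest values, oldest first,
                     one per completed micro-batch.
    min_samples:     how many recent batches to inspect (assumed default).
    """
    recent = max_lag_samples[-min_samples:]
    if len(recent) < min_samples:
        return False  # not enough history to judge a trend
    return all(later > earlier for earlier, later in zip(recent, recent[1:]))

# Small, fluctuating backlog: expected on a continuously written topic
print(lag_is_growing([3, 4, 2, 5, 3]))
# Backlog that rises batch after batch: the stream is likely falling behind
print(lag_is_growing([10, 40, 90, 200, 450]))
```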

To estimate the remaining data for a query to read, use the estimatedTotalBytesBehindLatest metric. This metric estimates the total number of bytes remaining across all subscribed partitions based on the batches processed in the last 300 seconds. You can modify the time window used for this estimate by setting the bytesEstimateWindowLength option.

For example, to set the window length to 10 minutes:

Python

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("bytesEstimateWindowLength", "10m") # "10m" is 10 minutes; "600s" is equivalent
  .load()
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("bytesEstimateWindowLength", "10m") // "10m" is 10 minutes; "600s" is equivalent
  .load()

If you are running the stream in a notebook, you can see these metrics under the Raw Data tab in the streaming query progress dashboard:

{
  "sources": [
    {
      "description": "KafkaV2[Subscribe[topic]]",
      "metrics": {
        "avgOffsetsBehindLatest": "4.0",
        "maxOffsetsBehindLatest": "4",
        "minOffsetsBehindLatest": "4",
        "estimatedTotalBytesBehindLatest": "80.0"
      }
    }
  ]
}
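
The same structure can be read programmatically. The following sketch, assuming a progress dictionary shaped like the JSON above, pulls out the Kafka backlog metrics and converts the string values to numbers. The helper name and the output field names are hypothetical.

```python
def kafka_backlog(progress):
    """Collect Kafka backlog metrics from a query progress dictionary."""
    backlog = []
    for source in progress.get("sources", []):
        metrics = source.get("metrics") or {}
        if "maxOffsetsBehindLatest" not in metrics:
            continue  # not a Kafka source, or metrics not reported yet
        estimated_bytes = metrics.get("estimatedTotalBytesBehindLatest")
        backlog.append({
            "source": source.get("description"),
            # metric values arrive as strings, so convert them
            "max_offsets_behind": int(metrics["maxOffsetsBehindLatest"]),
            "estimated_bytes_behind": float(estimated_bytes) if estimated_bytes is not None else None,
        })
    return backlog

sample_progress = {
    "sources": [
        {
            "description": "KafkaV2[Subscribe[topic]]",
            "metrics": {
                "avgOffsetsBehindLatest": "4.0",
                "maxOffsetsBehindLatest": "4",
                "minOffsetsBehindLatest": "4",
                "estimatedTotalBytesBehindLatest": "80.0",
            },
        }
    ]
}
print(kafka_backlog(sample_progress))
```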

See Monitoring Structured Streaming queries on Azure Databricks for more information.

Example for Kafka to Delta Lake

The following example shows a complete workflow for continuously streaming data from Kafka to a Delta Lake table. You can use this approach for near real-time data ingestion workloads.

This example uses a fixed JSON schema. For other formats like Avro or Protobuf, use from_avro or from_protobuf. You can also integrate with a schema registry. See Example with Schema Registry.

Python

from pyspark.sql.functions import from_json, col

# Define simple JSON schemas for key and value
key_schema = "user_id STRING"
value_schema = "event_type STRING, event_ts TIMESTAMP"

# Configure Kafka options with service credentials
kafka_options = {
  "kafka.bootstrap.servers": "<bootstrap-server>:9092",
  "subscribe": "<topic-name>",
  "databricks.serviceCredential": "<service-credential-name>",
}

# Read from Kafka and parse JSON
parsed_df = (spark.readStream
  .format("kafka")
  .options(**kafka_options)
  .load()
  .select(
    from_json(col("key").cast("string"), key_schema).alias("key"),
    from_json(col("value").cast("string"), value_schema).alias("value")
  )
  .select("key.*", "value.*")
)

# Write to Delta table
query = (parsed_df.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .trigger(processingTime="10 seconds")
  .toTable("catalog.schema.events_table")
)

query.awaitTermination()

Scala

import org.apache.spark.sql.functions.{from_json, col}
import org.apache.spark.sql.streaming.Trigger

// Define JSON schemas for key and value
val keySchema = "user_id STRING"
val valueSchema = "event_type STRING, event_ts TIMESTAMP"

// Configure Kafka options with service credentials
val kafkaOptions = Map(
  "kafka.bootstrap.servers" -> "<bootstrap-server>:9092",
  "subscribe" -> "<topic-name>",
  "databricks.serviceCredential" -> "<service-credential-name>"
)

// Read from Kafka and parse JSON
val parsedDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()
  .select(
    from_json(col("key").cast("string"), keySchema).alias("key"),
    from_json(col("value").cast("string"), valueSchema).alias("value")
  )
  .select("key.*", "value.*")

// Write to Delta table
val query = parsedDF.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .toTable("catalog.schema.events_table")

query.awaitTermination()

SQL

-- Create a streaming table from Kafka using read_kafka
CREATE OR REFRESH STREAMING TABLE catalog.schema.events_table AS
SELECT
  key::string:user_id AS user_id,
  value::string:event_type AS event_type,
  to_timestamp(value::string:event_ts) AS event_ts
FROM STREAM read_kafka(
  bootstrapServers => '<bootstrap-server>:9092',
  subscribe => '<topic-name>',
  serviceCredential => '<service-credential-name>'
);