RocksDB is the default state store provider in Databricks Runtime 17.3 and above. For Databricks Runtime versions below 17.3, you can enable RocksDB-based state management by setting the following configuration in the SparkSession before starting the streaming query.
```python
spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
```
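The version rule above can be wrapped in a small helper so the same notebook works on runtimes before and after 17.3. This is a sketch under assumptions: the function names are hypothetical, and it assumes the runtime version is available as a string such as `"16.4"` (for example, from the `DATABRICKS_RUNTIME_VERSION` environment variable; confirm it is set on your cluster).

```python
import os
from typing import Optional

# Provider class name from the configuration shown above.
ROCKSDB_PROVIDER = "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"


def parse_runtime_version(version: str) -> tuple[int, int]:
    """Parse "16.4" or "13.3.x-scala2.12" into (major, minor)."""
    major, minor = version.split(".")[:2]
    return int(major), int(minor)


def rocksdb_provider_confs(version: str) -> dict[str, str]:
    """Return the confs needed to make RocksDB the state store provider.

    RocksDB is already the default in Databricks Runtime 17.3 and above,
    so an empty dict is returned for those versions.
    """
    if parse_runtime_version(version) >= (17, 3):
        return {}
    return {"spark.sql.streaming.stateStore.providerClass": ROCKSDB_PROVIDER}


def enable_rocksdb_if_needed(spark, version: Optional[str] = None) -> None:
    """Set the confs on a SparkSession; call this before starting the query.

    Assumption: DATABRICKS_RUNTIME_VERSION is set when `version` is omitted.
    """
    version = version or os.environ["DATABRICKS_RUNTIME_VERSION"]
    for key, value in rocksdb_provider_confs(version).items():
        spark.conf.set(key, value)
```

Keeping the version logic in a pure function (`rocksdb_provider_confs`) makes it easy to check without a cluster; `enable_rocksdb_if_needed(spark)` is the only part that touches the session.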
You can enable RocksDB on Lakeflow Spark Declarative Pipelines. See Optimize pipeline configuration for stateful processing.
Enable changelog checkpointing
In Databricks Runtime 13.3 LTS and above, you can enable changelog checkpointing to lower checkpoint duration and end-to-end latency for Structured Streaming workloads. Databricks recommends enabling changelog checkpointing for all Structured Streaming stateful queries. Changelog checkpointing is enabled by default in Databricks Runtime 17.3 and above.
Traditionally, the RocksDB state store snapshots and uploads data files during checkpointing. To avoid this cost, changelog checkpointing writes only the records that have changed since the last checkpoint to durable storage.
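To make the difference concrete, the following toy model contrasts the two strategies in plain Python. It is an illustration of the general changelog technique, not the Databricks implementation, and the class and method names are hypothetical. A snapshot checkpoint writes every key; a changelog checkpoint writes only keys changed since the previous checkpoint (including deletions, recorded as tombstones), and recovery replays the changelogs in order on top of the last snapshot.

```python
class ToyStateStore:
    """Toy key-value store illustrating snapshot vs. changelog checkpointing."""

    _DELETED = object()  # tombstone marking a removed key in a changelog

    def __init__(self):
        self.data = {}
        self.pending = {}  # keys changed since the last checkpoint

    def put(self, key, value):
        self.data[key] = value
        self.pending[key] = value

    def remove(self, key):
        self.data.pop(key, None)
        self.pending[key] = self._DELETED

    def snapshot_checkpoint(self):
        """Full snapshot: every key is written, whether or not it changed."""
        self.pending = {}
        return dict(self.data)

    def changelog_checkpoint(self):
        """Changelog: only keys changed since the last checkpoint are written."""
        changes, self.pending = self.pending, {}
        return changes

    @classmethod
    def recover(cls, snapshot, changelogs):
        """Rebuild state by replaying changelogs, in order, on top of a snapshot."""
        store = cls()
        store.data = dict(snapshot)
        for changes in changelogs:
            for key, value in changes.items():
                if value is cls._DELETED:
                    store.data.pop(key, None)
                else:
                    store.data[key] = value
        return store
```

With 1,000 keys in state and two updates in a micro-batch, a snapshot checkpoint writes 1,000 entries while the changelog checkpoint writes 2, which is the cost difference described above.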
Changelog checkpointing is disabled by default on Databricks Runtime versions below 17.3. You can enable changelog checkpointing at the SparkSession level using the following syntax:
```python
spark.conf.set(
    "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
```
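The version rules in this section (unavailable below 13.3 LTS, opt-in from 13.3 LTS, on by default from 17.3) can be encoded as a guard before setting the conf. This is a minimal sketch; the function name is hypothetical and it assumes a version string such as `"16.4"` or `"13.3.x-scala2.12"`.

```python
CHANGELOG_CONF = "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled"


def changelog_checkpointing_confs(version: str) -> dict[str, str]:
    """Return the confs needed to turn on changelog checkpointing for a runtime version."""
    major, minor = (int(part) for part in version.split(".")[:2])
    if (major, minor) < (13, 3):
        # Changelog checkpointing is not available on these runtimes.
        raise ValueError(
            f"Changelog checkpointing requires Databricks Runtime 13.3 LTS or above, got {version}"
        )
    if (major, minor) >= (17, 3):
        return {}  # already enabled by default
    return {CHANGELOG_CONF: "true"}
```

Apply each returned entry with `spark.conf.set(key, value)` before starting the query.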
You can enable changelog checkpointing on an existing stream and maintain the state information stored in the checkpoint.
Important
Queries that have enabled changelog checkpointing can only be run on Databricks Runtime 13.3 LTS and above. You can disable changelog checkpointing to revert to legacy checkpointing behavior, but you must continue to run these queries on Databricks Runtime 13.3 LTS or above. You must restart the job for these changes to take effect.
RocksDB state store metrics
Each state operator collects metrics about the state management operations performed on its RocksDB instance. You can use these metrics to observe the state store and to help debug slow jobs.
In Databricks Runtime 16.4 LTS and above, metrics that belong to a specific state store instance are labeled with that instance's partition ID and store name so that they remain separate. All other metrics are reported as the aggregate sum for each state operator across all tasks where the state operator is running.
These metrics are part of the customMetrics map inside the stateOperators field in StreamingQueryProgress. The following is an example of StreamingQueryProgress in JSON form (obtained using StreamingQueryProgress.json()).
```json
{
  "id": "6774075e-8869-454b-ad51-513be86cfd43",
  "runId": "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId": 7,
  "stateOperators": [
    {
      "numRowsTotal": 20000000,
      "numRowsUpdated": 20000000,
      "memoryUsedBytes": 31005397,
      "numRowsDroppedByWatermark": 0,
      "customMetrics": {
        "SnapshotLastUploaded.partition_0_default": 7,
        "SnapshotLastUploaded.partition_1_default": 7,
        "SnapshotLastUploaded.partition_2_default": 6,
        "SnapshotLastUploaded.partition_3_default": 6,
        "SnapshotLastUploaded.partition_4_default": -1,
        "rocksdbBytesCopied": 141037747,
        "rocksdbCommitCheckpointLatency": 2,
        "rocksdbCommitCompactLatency": 22061,
        "rocksdbCommitFileSyncLatencyMs": 1710,
        "rocksdbCommitFlushLatency": 19032,
        "rocksdbCommitPauseLatency": 0,
        "rocksdbCommitWriteBatchLatency": 56155,
        "rocksdbFilesCopied": 2,
        "rocksdbFilesReused": 0,
        "rocksdbGetCount": 40000000,
        "rocksdbGetLatency": 21834,
        "rocksdbPutCount": 1,
        "rocksdbPutLatency": 56155599000,
        "rocksdbReadBlockCacheHitCount": 1988,
        "rocksdbReadBlockCacheMissCount": 40341617,
        "rocksdbSstFileSize": 141037747,
        "rocksdbTotalBytesReadByCompaction": 336853375,
        "rocksdbTotalBytesReadByGet": 680000000,
        "rocksdbTotalBytesReadThroughIterator": 0,
        "rocksdbTotalBytesWrittenByCompaction": 141037747,
        "rocksdbTotalBytesWrittenByPut": 740000012,
        "rocksdbTotalCompactionLatencyMs": 21949695000,
        "rocksdbWriterStallLatencyMs": 0,
        "rocksdbZipFileBytesUncompressed": 7038
      }
    }
  ],
  "sources": [{}],
  "sink": {}
}
```
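The following sketch splits each operator's customMetrics from a progress JSON string like the one above into per-instance metrics (keyed by metric, partition ID, and store name) and operator-level aggregates. It assumes the label format `<metric>.partition_<id>_<storeName>` inferred from keys such as `SnapshotLastUploaded.partition_0_default` in this example; the function name is hypothetical, and you supply the JSON string yourself (for example, from StreamingQueryProgress.json()).

```python
import json
import re

# Assumed label format, inferred from keys like "SnapshotLastUploaded.partition_0_default".
_INSTANCE_KEY = re.compile(r"^(?P<metric>[^.]+)\.partition_(?P<partition>\d+)_(?P<store>.+)$")


def split_state_metrics(progress_json: str) -> list[dict]:
    """For each state operator, separate per-instance metrics from aggregate metrics."""
    progress = json.loads(progress_json)
    result = []
    for operator in progress.get("stateOperators", []):
        per_instance = {}  # (metric, partition_id, store_name) -> value
        aggregate = {}     # metric name -> value summed across tasks
        for key, value in operator.get("customMetrics", {}).items():
            match = _INSTANCE_KEY.match(key)
            if match:
                per_instance[(match["metric"], int(match["partition"]), match["store"])] = value
            else:
                aggregate[key] = value
        result.append({"per_instance": per_instance, "aggregate": aggregate})
    return result
```

Keying per-instance values by partition makes it straightforward to spot a single partition that behaves differently from the rest, such as `partition_4` reporting `-1` in the example above.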
Detailed descriptions of the metrics are as follows:
| Metric name | Description |
|---|---|
| rocksdbCommitWriteBatchLatency | Time (in milliseconds) taken to apply the staged writes in the in-memory structure (WriteBatch) to native RocksDB. |
| rocksdbCommitFlushLatency | Time (in milliseconds) taken to flush the RocksDB in-memory changes to local disk. |
| rocksdbCommitCompactLatency | Time (in milliseconds) taken for the optional compaction during the checkpoint commit. |
| rocksdbCommitPauseLatency | Time (in milliseconds) taken to stop the background worker threads (for compaction and similar work) as part of the checkpoint commit. |
| rocksdbCommitCheckpointLatency | Time (in milliseconds) taken to take a snapshot of native RocksDB and write it to a local directory. |
| rocksdbCommitFileSyncLatencyMs | Time (in milliseconds) taken to sync the native RocksDB snapshot files to external storage (the checkpoint location). |
| rocksdbGetLatency | Average time (in nanoseconds) taken per underlying native RocksDB::Get call. |
| rocksdbPutLatency | Average time (in nanoseconds) taken per underlying native RocksDB::Put call. |
| rocksdbGetCount | Number of native RocksDB::Get calls (does not include Gets from WriteBatch, the in-memory batch used for staging writes). |
| rocksdbPutCount | Number of native RocksDB::Put calls (does not include Puts to WriteBatch, the in-memory batch used for staging writes). |
| rocksdbTotalBytesReadByGet | Number of uncompressed bytes read through native RocksDB::Get calls. |
| rocksdbTotalBytesWrittenByPut | Number of uncompressed bytes written through native RocksDB::Put calls. |
| rocksdbReadBlockCacheHitCount | Number of times the native RocksDB block cache was used to avoid reading data from local disk. |
| rocksdbReadBlockCacheMissCount | Number of times the native RocksDB block cache missed and data had to be read from local disk. |
| rocksdbTotalBytesReadByCompaction | Number of bytes read from local disk by the native RocksDB compaction process. |
| rocksdbTotalBytesWrittenByCompaction | Number of bytes written to local disk by the native RocksDB compaction process. |
| rocksdbTotalCompactionLatencyMs | Time (in milliseconds) taken for RocksDB compactions, including both background compactions and the optional compaction initiated during the commit. |
| rocksdbWriterStallLatencyMs | Time (in milliseconds) the writer stalled due to a background compaction or flushing of the memtables to disk. |
| rocksdbTotalBytesReadThroughIterator | Total size of uncompressed data read using an iterator. Some stateful operations (such as timeout processing in flatMapGroupsWithState or watermarking in windowed aggregations) require reading the entire data in the DB through an iterator. |
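Several of these metrics are easier to interpret as ratios. The following sketch derives a block cache hit ratio, the average bytes returned per Get, and a rough commit duration from a customMetrics dict. The function name and the choice of derived values are illustrative, not a Databricks API, and summing the commit phases is only an approximation that assumes the phases do not overlap.

```python
def rocksdb_health_summary(metrics: dict) -> dict:
    """Derive a few diagnostic values from RocksDB customMetrics (names as in the table above)."""
    hits = metrics.get("rocksdbReadBlockCacheHitCount", 0)
    misses = metrics.get("rocksdbReadBlockCacheMissCount", 0)
    lookups = hits + misses
    gets = metrics.get("rocksdbGetCount", 0)
    # Commit phases; all are reported in milliseconds.
    commit_phases = [
        "rocksdbCommitWriteBatchLatency",
        "rocksdbCommitFlushLatency",
        "rocksdbCommitCompactLatency",
        "rocksdbCommitPauseLatency",
        "rocksdbCommitCheckpointLatency",
        "rocksdbCommitFileSyncLatencyMs",
    ]
    return {
        # Fraction of block reads served from the block cache instead of local disk.
        "block_cache_hit_ratio": hits / lookups if lookups else None,
        # Average uncompressed bytes returned per native Get call.
        "avg_bytes_per_get": metrics.get("rocksdbTotalBytesReadByGet", 0) / gets if gets else None,
        # Approximate commit time: sum of the individual commit phases.
        "commit_phases_ms": sum(metrics.get(name, 0) for name in commit_phases),
        # True if the writer was stalled by compaction or memtable flushes.
        "writer_stalled": metrics.get("rocksdbWriterStallLatencyMs", 0) > 0,
    }
```

With the example values shown earlier, the block cache hit ratio is about 0.005%, meaning almost every block read went to local disk.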
Cap RocksDB memory usage
In Databricks Runtime 17.3 and above, Azure Databricks caps RocksDB memory usage per node automatically. On earlier Databricks Runtime versions, configure this manually to avoid out-of-memory errors.
RocksDB allocates memory for memtables, the block cache, and filter and index blocks. Without a limit, memory usage across multiple RocksDB instances on a node can grow indefinitely and cause out-of-memory errors. The RocksDB write buffer manager enforces a per-node memory limit across all RocksDB instances running on the node.
To cap RocksDB memory usage, set the following configurations in the Spark session before starting the streaming query:
```python
spark.conf.set(
    "spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage", "true")
spark.conf.set(
    "spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB", "<value>")
```
The following configurations control RocksDB memory usage:
| Configuration | Description |
|---|---|
| spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage | Enables a shared memory limit across all RocksDB instances on a node. Set to true to enable. |
| spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB | Maximum memory in MB shared across all RocksDB instances on a node. Set this to a static value, or to a value computed as a fraction of the node's physical memory. |
| spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB | Maximum write buffer size in MB for an individual RocksDB instance. Defaults to the RocksDB internal value. |
| spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber | Maximum number of write buffers for an individual RocksDB instance. Defaults to the RocksDB internal value. |
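If you size maxMemoryUsageMB as a fraction of a node's physical memory, a small helper keeps the arithmetic and the string formatting in one place. This is a sketch; the function name is hypothetical, and both the node memory figure and the fraction are inputs you choose, since this section does not prescribe a value.

```python
def bounded_memory_confs(node_memory_mb: int, fraction: float) -> dict[str, str]:
    """Build confs that cap RocksDB memory at a fraction of a worker node's physical memory.

    node_memory_mb is supplied by the caller (for example, from the worker
    instance type's specification) rather than measured on the driver.
    """
    if not 0 < fraction <= 1:
        raise ValueError("fraction must be in (0, 1]")
    max_mb = int(node_memory_mb * fraction)
    return {
        "spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage": "true",
        "spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB": str(max_mb),
    }
```

For example, `bounded_memory_confs(65536, 0.25)` caps RocksDB at 16384 MB per node; the 25% here is only an illustration. The node memory is passed in explicitly because the Spark session configuration is set from the driver, whose memory can differ from that of the worker nodes where RocksDB runs.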