This tutorial shows how to perform bulk operations in the Azure Cosmos DB Java V4 SDK. This version of the SDK includes the bulk executor library. If you're using an older version of the Java SDK, migrate to the latest version. The Azure Cosmos DB Java V4 SDK is the current recommended solution for Java bulk support.
Currently, the bulk executor library is supported only by Azure Cosmos DB for NoSQL and API for Gremlin accounts. To learn about using the bulk executor .NET library with API for Gremlin, see perform bulk operations in Azure Cosmos DB for Gremlin.
Prerequisites
- If you don't have an Azure subscription, create a free account before you begin. Or, you can use the Azure Cosmos DB Emulator with the https://localhost:8081 endpoint. The Primary Key is provided in Authenticating requests.
- Java Development Kit (JDK) 1.8+
  - On Ubuntu, run apt-get install default-jdk to install the JDK.
  - Be sure to set the JAVA_HOME environment variable to point to the folder where the JDK is installed.
- Download and install a Maven binary archive
  - On Ubuntu, you can run apt-get install maven to install Maven.
- Create an Azure Cosmos DB for NoSQL account by using the steps described in the create database account section of the Java quickstart article.
Clone the sample application
Download a sample repository for the Java V4 SDK from GitHub. These sample applications perform CRUD operations and other common operations on Azure Cosmos DB. To clone the repository, open a command prompt, go to the directory where you want to copy the application, and run the following command:
git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples
The cloned repository contains a sample SampleBulkQuickStartAsync.java in the azure-cosmos-java-sql-api-samples/src/main/java/com/azure/cosmos/examples/bulk/async folder. The application generates documents and executes operations to bulk create, upsert, replace, and delete items in Azure Cosmos DB. The following sections review the code in the sample app.
Bulk execution in Azure Cosmos DB
- The Azure Cosmos DB connection settings are read as arguments and assigned to variables defined in the /examples/common/AccountSettings.java file. These environment variables must be set:
ACCOUNT_HOST=your account hostname;ACCOUNT_KEY=your account primary key
To run the bulk sample, specify its Main Class:
com.azure.cosmos.examples.bulk.async.SampleBulkQuickStartAsync
- The CosmosAsyncClient object is initialized by using the following statements:
client = new CosmosClientBuilder()
    .endpoint(AccountSettings.HOST)
    .credential(new DefaultAzureCredentialBuilder().build())
    .preferredRegions(preferredRegions)
    .contentResponseOnWriteEnabled(true)
    .consistencyLevel(ConsistencyLevel.SESSION).buildAsyncClient();
- The sample creates an async database and container. It then creates multiple documents on which bulk operations will be executed. It adds these documents to a Flux<Family> reactive stream object:
Family andersenFamilyItem = Families.getAndersenFamilyItem();
Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
Family johnsonFamilyItem = Families.getJohnsonFamilyItem();
Family smithFamilyItem = Families.getSmithFamilyItem();
// Setup family items to create
Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
- The sample contains methods for bulk create, upsert, replace, and delete. In each method, the family documents in the Flux<Family> stream are mapped to CosmosBulkOperations method calls. The resulting operations are added to another reactive stream object, Flux<CosmosItemOperation>. That stream is then passed to the executeBulkOperations method of the async container created at the beginning, and the operations are executed in bulk. See the bulk create method below as an example:
private void bulkCreateItems(Flux<Family> families) {
    // Map each family document to a create operation keyed by its partition key (the last name).
    Flux<CosmosItemOperation> cosmosItemOperations = families.map(
        family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
    // Execute the operations in bulk and block until the last response arrives.
    container.executeBulkOperations(cosmosItemOperations).blockLast();
}
- The BulkWriter.java class in the same directory as the sample application demonstrates how to handle rate limiting (429) and timeout (408) errors that occur during bulk execution and how to retry those operations. The following methods also show how to implement local and global throughput control.
private void bulkUpsertItemsWithBulkWriterAbstraction() {
    Family andersenFamilyItem = Families.getAndersenFamilyItem();
    Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
    CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
    CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
    BulkWriter bulkWriter = new BulkWriter(container);
    bulkWriter.scheduleWrites(andersonItemOperation);
    bulkWriter.scheduleWrites(wakeFieldItemOperation);
    bulkWriter.execute().subscribe();
}
private void bulkUpsertItemsWithBulkWriterAbstractionAndLocalThroughPutControl() {
    ThroughputControlGroupConfig groupConfig =
        new ThroughputControlGroupConfigBuilder()
            .setGroupName("group1")
            .setTargetThroughput(200)
            .build();
    container.enableLocalThroughputControlGroup(groupConfig);
    Family andersenFamilyItem = Families.getAndersenFamilyItem();
    Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
    CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
    CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
    BulkWriter bulkWriter = new BulkWriter(container);
    bulkWriter.scheduleWrites(andersonItemOperation);
    bulkWriter.scheduleWrites(wakeFieldItemOperation);
    bulkWriter.execute().subscribe();
}
private void bulkCreateItemsWithBulkWriterAbstractionAndGlobalThroughputControl() {
    String controlContainerId = "throughputControlContainer";
    CosmosAsyncContainer controlContainer = database.getContainer(controlContainerId);
    database.createContainerIfNotExists(controlContainer.getId(), "/groupId").block();
    ThroughputControlGroupConfig groupConfig =
        new ThroughputControlGroupConfigBuilder()
            .setGroupName("group-" + UUID.randomUUID())
            .setTargetThroughput(200)
            .build();
    GlobalThroughputControlConfig globalControlConfig = this.client.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId)
        .setControlItemRenewInterval(Duration.ofSeconds(5))
        .setControlItemExpireInterval(Duration.ofSeconds(20))
        .build();
    container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig);
    CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions();
    requestOptions.setThroughputControlGroupName(groupConfig.getGroupName());
    Family andersenFamilyItem = Families.getAndersenFamilyItem();
    Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
    CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
    CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
    BulkWriter bulkWriter = new BulkWriter(container);
    bulkWriter.scheduleWrites(andersonItemOperation);
    bulkWriter.scheduleWrites(wakeFieldItemOperation);
    bulkWriter.execute().subscribe();
}
- The sample also includes bulk create methods that illustrate how to add response processing and set execution options:
private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) {
    Flux<CosmosItemOperation> cosmosItemOperations = families.map(
        family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
    container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> {
        CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
        CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation();
        if (cosmosBulkOperationResponse.getException() != null) {
            logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
        } else if (cosmosBulkItemResponse == null ||
            !cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) {
            logger.error(
                "The operation for Item ID: [{}] Item PartitionKey Value: [{}] did not complete " +
                    "successfully with a {} response code.",
                cosmosItemOperation.<Family>getItem().getId(),
                cosmosItemOperation.<Family>getItem().getLastName(),
                cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "n/a");
        } else {
            logger.info(
                "Item ID: [{}] Item PartitionKey Value: [{}]",
                cosmosItemOperation.<Family>getItem().getId(),
                cosmosItemOperation.<Family>getItem().getLastName());
            logger.info("Status Code: {}", cosmosBulkItemResponse.getStatusCode());
            logger.info("Request Charge: {}", cosmosBulkItemResponse.getRequestCharge());
        }
        if (cosmosBulkItemResponse == null) {
            return Mono.error(new IllegalStateException("No response retrieved."));
        } else {
            return Mono.just(cosmosBulkItemResponse);
        }
    }).blockLast();
}
private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) {
    CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions();
    // The default value for maxMicroBatchConcurrency is 1.
    // Increasing it allows more concurrent requests to be sent to the server, which increases RU usage.
    //
    // Before you increase the value, check whether the container's RU usage is already saturated.
    // If it is, increasing the concurrency won't help and may instead cause more
    // 429 (rate limited) responses and request timeouts.
    bulkExecutionOptions.setMaxMicroBatchConcurrency(2);
    Flux<CosmosItemOperation> cosmosItemOperations = families.map(family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
    container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast();
}
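As a rough intuition for what a concurrency cap such as maxMicroBatchConcurrency does, the following JDK-only sketch limits how many simulated batches are in flight at once by using a Semaphore. It doesn't use the Azure Cosmos DB SDK. The class and method names (BoundedBatchRunner, runBatches) and the 20 ms sleep are hypothetical, and the SDK's internal scheduling may well differ.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; this is not the SDK's internal scheduler.
final class BoundedBatchRunner {

    /**
     * Runs batchCount simulated batches while allowing at most maxConcurrency
     * of them in flight at once. Returns the peak concurrency that was observed.
     */
    static int runBatches(int batchCount, int maxConcurrency) {
        if (batchCount < 0 || maxConcurrency < 1) {
            throw new IllegalArgumentException("batchCount must be >= 0 and maxConcurrency >= 1");
        }
        Semaphore permits = new Semaphore(maxConcurrency);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, batchCount));

        for (int i = 0; i < batchCount; i++) {
            pool.execute(() -> {
                try {
                    permits.acquire(); // wait until a concurrency slot is free
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                try {
                    // Record the highest number of batches seen in flight together.
                    peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                    Thread.sleep(20); // stand-in for sending one batch to the service
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inFlight.decrementAndGet();
                    permits.release();
                }
            });
        }

        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Simulated batches did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return peak.get();
    }
}
```

Calling BoundedBatchRunner.runBatches(8, 2) never reports a peak above 2, whatever the thread timing. Raising the cap lets more simulated requests overlap, which in the real service translates into higher RU consumption per second.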
Large-scale ingestion strategy
For large-scale data ingestion, throughput control and retry policy are the primary levers for avoiding throttling. Tuning batch size and concurrency alone, without addressing throughput limits and retries, won't reliably prevent 429 (rate limited) responses at scale.
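To illustrate what a retry policy for throttled operations can look like, here is a minimal JDK-only sketch that retries an operation while it returns 429 (rate limited) or 408 (timeout), waiting with capped exponential backoff between attempts. The RetrySketch class, its method names, and the backoff parameters are hypothetical; this is not the logic of the sample's BulkWriter class or of the SDK, and a real client would typically also honor any retry-after hint returned by the service.

```java
import java.util.function.IntSupplier;

// Hypothetical illustration only; not the BulkWriter or SDK retry implementation.
final class RetrySketch {

    /** Treats 429 (rate limited) and 408 (timeout) as transient, retriable failures. */
    static boolean isRetriable(int statusCode) {
        return statusCode == 429 || statusCode == 408;
    }

    /** Delay before retry number `attempt` (1-based): base * 2^(attempt - 1), capped at capMillis. */
    static long backoffMillis(int attempt, long baseMillis, long capMillis) {
        long delay = baseMillis << Math.min(attempt - 1, 20); // bound the shift to avoid overflow
        return Math.min(delay, capMillis);
    }

    /**
     * Invokes the operation once, then retries up to maxRetries times while the
     * returned status code is retriable. Returns the last status code seen.
     */
    static int executeWithRetry(IntSupplier operation, int maxRetries, long baseMillis, long capMillis) {
        int status = operation.getAsInt();
        for (int attempt = 1; attempt <= maxRetries && isRetriable(status); attempt++) {
            sleep(backoffMillis(attempt, baseMillis, capMillis));
            status = operation.getAsInt();
        }
        return status;
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

In this sketch, an operation that is throttled twice and then succeeds is invoked three times in total, and an operation that keeps returning 429 is abandoned after maxRetries retries with the 429 status returned to the caller.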
Choose an ingestion approach
| Scenario | Recommended approach |
|---|---|
| Large-scale distributed ingestion across multiple machines | Apache Spark connector |
| Single-machine ingestion requiring fine-grained control | Java SDK bulk executor with throughput control |
For large-scale ingestion, the Apache Spark connector is the preferred choice. It handles distributed computation, automatic retry and backoff, and load balancing across worker nodes without requiring manual tuning of concurrency or batch sizes.
Java SDK bulk executor with throughput control
If your scenario requires the Java SDK directly, the azure-cosmos-distributed-bulk-sample provides a reference implementation for production-scale ingestion. It demonstrates the following key settings:
- Auto-tuned micro-batch sizes: The sample dynamically adjusts batch sizes from 1 to 100 documents per physical partition to saturate throughput while keeping throttling manageable.
- Configurable retry count: Default is 20 retries per batch. Adjust based on your tolerance for transient failures and downstream latency requirements.
- Concurrent batches per machine: Default is 8 concurrent batches. The recommended range is 25–100% of available CPU cores on the ingestion machine.
Tip
Start with the defaults and monitor 429 (rate limited) response rates. Reduce concurrent batches or add throughput control if excessive throttling occurs.
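The 25–100% guidance for concurrent batches can be turned into concrete numbers for a given machine. The following sketch is only arithmetic on the core count. The ConcurrencySizing class is hypothetical, and rounding the lower bound up with a floor of 1 is an assumption of this sketch rather than something the sample prescribes.

```java
// Hypothetical helper; the rounding rule is an assumption of this sketch.
final class ConcurrencySizing {

    /**
     * Returns {min, max} concurrent batches for a machine with cpuCores cores:
     * 25% of the cores (rounded up, at least 1) up to 100% of the cores.
     */
    static int[] concurrentBatchRange(int cpuCores) {
        if (cpuCores < 1) {
            throw new IllegalArgumentException("cpuCores must be at least 1");
        }
        int min = Math.max(1, (int) Math.ceil(cpuCores * 0.25));
        return new int[] { min, cpuCores };
    }
}
```

For example, passing Runtime.getRuntime().availableProcessors() on a 16-core machine gives a range of 4 to 16 concurrent batches, so the default of 8 sits in the middle of that range.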
Throughput control for shared containers
If multiple workloads share the same container, use throughput control groups to cap the RU/s consumed by bulk ingestion and prevent it from starving other workloads:
Note
Throughput control requires a supported minimum Azure Cosmos DB Java SDK v4 version. The throughput control APIs are also annotated with @Beta and are subject to change. Verify the current version requirements and API status in the throughput control documentation before using the following sample.
ThroughputControlGroupConfig groupConfig =
    new ThroughputControlGroupConfigBuilder()
        .groupName("bulkIngestionGroup")
        .targetThroughputThreshold(0.75) // limit ingestion to 75% of provisioned throughput
        .defaultControlGroup(true)
        .build();
container.enableLocalThroughputControlGroup(groupConfig);
To coordinate throughput limits across multiple ingestion machines, use global throughput control instead.
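To make the 0.75 threshold above concrete, the following sketch computes the RU/s budget that such a threshold leaves for ingestion and for other workloads. It is plain arithmetic, not an SDK call. The ThroughputBudget class is hypothetical, and the range check in it is this sketch's own validation.

```java
// Hypothetical helper that only illustrates the arithmetic behind a throughput threshold.
final class ThroughputBudget {

    /** RU/s available to the throttled group: provisioned throughput times the threshold. */
    static long targetRuPerSecond(long provisionedRuPerSecond, double threshold) {
        if (provisionedRuPerSecond < 0 || threshold <= 0.0 || threshold > 1.0) {
            throw new IllegalArgumentException("expected provisioned >= 0 and 0 < threshold <= 1");
        }
        return (long) Math.floor(provisionedRuPerSecond * threshold);
    }

    /** RU/s left over for every other workload that shares the container. */
    static long remainingRuPerSecond(long provisionedRuPerSecond, double threshold) {
        return provisionedRuPerSecond - targetRuPerSecond(provisionedRuPerSecond, threshold);
    }
}
```

With an assumed container provisioned at 10,000 RU/s, a threshold of 0.75 caps bulk ingestion at 7,500 RU/s and leaves 2,500 RU/s for other workloads.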
Reference implementations
| Sample | Description |
|---|---|
| azure-cosmos-distributed-bulk-sample | End-to-end distributed ingestion with job tracking, restartable batches, auto-tuned micro-batch sizes, and configurable retry and concurrency settings. |
| ThroughputControlQuickstartAsync.java | Local throughput control, global throughput control with a shared RU limit via a metadata container, and priority-based throttling. |
Performance tips
Consider the following points for better performance when using the bulk executor library:
- For best performance, run your application from an Azure VM in the same region as your Azure Cosmos DB account write region.
- To achieve higher throughput:
  - Set the JVM heap size large enough to avoid memory issues when handling large numbers of documents. Suggested heap size: max(3 GB, 3 * sizeof(all documents passed to bulk import API in one batch)).
  - Bulk operations have a preprocessing phase, so you get higher throughput when processing large document sets. For example, importing 10,000,000 documents by running bulk import 10 times with 1,000,000 documents each is more efficient than running it 100 times with 100,000 documents each.
- Instantiate a single CosmosAsyncClient object for the entire application within a single virtual machine that corresponds to a specific Azure Cosmos DB container.
- A single bulk operation API execution consumes a large chunk of the client machine's CPU and network I/O by spawning multiple tasks internally. Avoid spawning multiple concurrent tasks within your application process, where each task executes bulk operation API calls. If a single bulk operation API call running on a single virtual machine can't consume your entire container's throughput (if your container's throughput > 1 million RU/s), create separate virtual machines to execute bulk operation API calls concurrently.