Perform bulk operations on Azure Cosmos DB data

This tutorial shows how to perform bulk operations in the Azure Cosmos DB Java V4 SDK. This version of the SDK includes the bulk executor library. If you're using an older version of the Java SDK, migrate to the latest version. The Azure Cosmos DB Java V4 SDK is the current recommended solution for Java bulk support.

Currently, the bulk executor library is supported only by Azure Cosmos DB for NoSQL and API for Gremlin accounts. To learn about using the bulk executor .NET library with API for Gremlin, see perform bulk operations in Azure Cosmos DB for Gremlin.

Prerequisites

  • If you don't have an Azure subscription, create a free account before you begin. Or, you can use the Azure Cosmos DB Emulator with the https://localhost:8081 endpoint. The Primary Key is provided in Authenticating requests.

  • Java Development Kit (JDK) 1.8+

    • On Ubuntu, run apt-get install default-jdk to install the JDK.

    • Be sure to set the JAVA_HOME environment variable to point to the folder where the JDK is installed.

  • Download and install a Maven binary archive

    • On Ubuntu, you can run apt-get install maven to install Maven.
  • Create an Azure Cosmos DB for NoSQL account by using the steps described in the create database account section of the Java quickstart article.

Clone the sample application

Download a sample repository for the Java V4 SDK from GitHub. These sample applications perform CRUD operations and other common operations on Azure Cosmos DB. To clone the repository, open a command prompt, go to the directory where you want to copy the application, and run the following command:

 git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples 

The cloned repository contains a sample, SampleBulkQuickStartAsync.java, in the azure-cosmos-java-sql-api-samples/src/main/java/com/azure/cosmos/examples/bulk/async folder. The application generates documents and then bulk creates, upserts, replaces, and deletes items in Azure Cosmos DB. The following sections review the code in the sample app.

Bulk execution in Azure Cosmos DB

  1. The Azure Cosmos DB connection settings are read as arguments and assigned to variables defined in the /examples/common/AccountSettings.java file. The following environment variables must be set:
ACCOUNT_HOST=your account hostname;ACCOUNT_KEY=your account primary key

To run the bulk sample, specify its Main Class:

com.azure.cosmos.examples.bulk.async.SampleBulkQuickStartAsync
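Before launching the sample, it can help to confirm that both variables are actually visible to the JVM. The following is a minimal sketch of such a check, not the logic in AccountSettings.java; the EnvSettings class and its require method are hypothetical names introduced only for illustration.

```java
import java.util.Map;

// Hypothetical helper for illustration; it is not part of the sample repository.
final class EnvSettings {
    private EnvSettings() {}

    // Returns the trimmed value of a required variable, or fails with a clear message.
    static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing required environment variable: " + name);
        }
        return value.trim();
    }

    public static void main(String[] args) {
        Map<String, String> env = System.getenv();
        String host = require(env, "ACCOUNT_HOST");
        String key = require(env, "ACCOUNT_KEY");
        System.out.println("Using account host: " + host);
        // Report only that the key is present; never print the key itself.
        System.out.println("Account key loaded (" + key.length() + " characters)");
    }
}
```

Failing fast with the variable name in the message tends to be easier to diagnose than an authentication error raised later by the client.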
  1. The CosmosAsyncClient object is initialized by using the following statements:
client = new CosmosClientBuilder()
    .endpoint(AccountSettings.HOST)
    .credential(new DefaultAzureCredentialBuilder().build())
    .preferredRegions(preferredRegions)
    .contentResponseOnWriteEnabled(true)
    .consistencyLevel(ConsistencyLevel.SESSION)
    .buildAsyncClient();

  1. The sample creates an async database and container. It then creates multiple documents on which bulk operations will be executed. It adds these documents to a Flux<Family> reactive stream object:
Family andersenFamilyItem = Families.getAndersenFamilyItem();
Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
Family johnsonFamilyItem = Families.getJohnsonFamilyItem();
Family smithFamilyItem = Families.getSmithFamilyItem();

//  Setup family items to create
Flux<Family> families = Flux.just(andersenFamilyItem, wakefieldFamilyItem, johnsonFamilyItem, smithFamilyItem);
  1. The sample contains methods for bulk create, upsert, replace, and delete. Each method maps the family documents in the Flux<Family> stream to the corresponding operation factory method in CosmosBulkOperations. The resulting operations form another reactive stream, Flux<CosmosItemOperation>, which is passed to the executeBulkOperations method of the async container created earlier so that the operations are executed in bulk. The bulk create method is shown here as an example:
private void bulkCreateItems(Flux<Family> families) {
    Flux<CosmosItemOperation> cosmosItemOperations = families.map(
        family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
    container.executeBulkOperations(cosmosItemOperations).blockLast();
}
  1. The BulkWriter.java class in the same directory as the sample application demonstrates how to handle rate limiting (429) and timeout (408) errors that occur during bulk execution and how to retry those operations. The following methods also show how to implement local and global throughput control.
private void bulkUpsertItemsWithBulkWriterAbstraction() {
    Family andersenFamilyItem = Families.getAndersenFamilyItem();
    Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
    CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
    CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
    BulkWriter bulkWriter = new BulkWriter(container);
    bulkWriter.scheduleWrites(andersonItemOperation);
    bulkWriter.scheduleWrites(wakeFieldItemOperation);
    bulkWriter.execute().subscribe();
}

private void bulkUpsertItemsWithBulkWriterAbstractionAndLocalThroughPutControl() {
    ThroughputControlGroupConfig groupConfig =
            new ThroughputControlGroupConfigBuilder()
                    .setGroupName("group1")
                    .setTargetThroughput(200)
                    .build();
    container.enableLocalThroughputControlGroup(groupConfig);
    Family andersenFamilyItem = Families.getAndersenFamilyItem();
    Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
    CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getUpsertItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
    CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getUpsertItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
    BulkWriter bulkWriter = new BulkWriter(container);
    bulkWriter.scheduleWrites(andersonItemOperation);
    bulkWriter.scheduleWrites(wakeFieldItemOperation);
    bulkWriter.execute().subscribe();
}

private void bulkCreateItemsWithBulkWriterAbstractionAndGlobalThroughputControl() {
    String controlContainerId = "throughputControlContainer";
    CosmosAsyncContainer controlContainer = database.getContainer(controlContainerId);
    database.createContainerIfNotExists(controlContainer.getId(), "/groupId").block();

    ThroughputControlGroupConfig groupConfig =
            new ThroughputControlGroupConfigBuilder()
                    .setGroupName("group-" + UUID.randomUUID())
                    .setTargetThroughput(200)
                    .build();

    GlobalThroughputControlConfig globalControlConfig = this.client.createGlobalThroughputControlConfigBuilder(this.database.getId(), controlContainerId)
            .setControlItemRenewInterval(Duration.ofSeconds(5))
            .setControlItemExpireInterval(Duration.ofSeconds(20))
            .build();

    container.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig);
    CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions();
    requestOptions.setThroughputControlGroupName(groupConfig.getGroupName());
    Family andersenFamilyItem = Families.getAndersenFamilyItem();
    Family wakefieldFamilyItem = Families.getWakefieldFamilyItem();
    CosmosItemOperation andersonItemOperation = CosmosBulkOperations.getCreateItemOperation(andersenFamilyItem, new PartitionKey(andersenFamilyItem.getLastName()));
    CosmosItemOperation wakeFieldItemOperation = CosmosBulkOperations.getCreateItemOperation(wakefieldFamilyItem, new PartitionKey(wakefieldFamilyItem.getLastName()));
    BulkWriter bulkWriter = new BulkWriter(container);
    bulkWriter.scheduleWrites(andersonItemOperation);
    bulkWriter.scheduleWrites(wakeFieldItemOperation);
    bulkWriter.execute().subscribe();
}
  1. The sample also includes bulk create methods that illustrate how to add response processing and set execution options:
private void bulkCreateItemsWithResponseProcessing(Flux<Family> families) {
    Flux<CosmosItemOperation> cosmosItemOperations = families.map(
        family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
    container.executeBulkOperations(cosmosItemOperations).flatMap(cosmosBulkOperationResponse -> {

        CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
        CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation();

        if (cosmosBulkOperationResponse.getException() != null) {
            logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException());
        } else if (cosmosBulkItemResponse == null ||
            !cosmosBulkOperationResponse.getResponse().isSuccessStatusCode()) {

            logger.error(
                "The operation for Item ID: [{}]  Item PartitionKey Value: [{}] did not complete " +
                    "successfully with a {} response code.",
                cosmosItemOperation.<Family>getItem().getId(),
                cosmosItemOperation.<Family>getItem().getLastName(),
                cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "n/a");
        } else {
            logger.info(
                "Item ID: [{}]  Item PartitionKey Value: [{}]",
                cosmosItemOperation.<Family>getItem().getId(),
                cosmosItemOperation.<Family>getItem().getLastName());
            logger.info("Status Code: {}", cosmosBulkItemResponse.getStatusCode());
            logger.info("Request Charge: {}", cosmosBulkItemResponse.getRequestCharge());
        }
        if (cosmosBulkItemResponse == null) {
            return Mono.error(new IllegalStateException("No response retrieved."));
        } else {
            return Mono.just(cosmosBulkItemResponse);
        }
    }).blockLast();
}

private void bulkCreateItemsWithExecutionOptions(Flux<Family> families) {
    CosmosBulkExecutionOptions bulkExecutionOptions = new CosmosBulkExecutionOptions();

    // The default value for maxMicroBatchConcurrency is 1.
    // Increasing it allows more concurrent requests to be sent to the server, which increases RU usage.
    //
    // Before you increase the value, check whether your container's provisioned RU/s are already saturated.
    // If they are, increasing the concurrency won't help;
    // it may instead cause more 429 (rate limited) responses and request timeouts.
    bulkExecutionOptions.setMaxMicroBatchConcurrency(2);
    Flux<CosmosItemOperation> cosmosItemOperations = families.map(family -> CosmosBulkOperations.getCreateItemOperation(family, new PartitionKey(family.getLastName())));
    container.executeBulkOperations(cosmosItemOperations, bulkExecutionOptions).blockLast();
}

Large-scale ingestion strategy

For large-scale data ingestion, throughput control and retry policy are the primary levers for avoiding throttling, rather than batch size or concurrency alone. Tuning batch size and concurrency without addressing throughput limits and retries won't reliably prevent 429 (rate limited) responses at scale.
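To make the retry side of this concrete, here is a minimal sketch of a capped exponential backoff loop around an operation that reports an HTTP-style status code. It is not the retry logic of the SDK or of BulkWriter.java: RetrySketch, its method names, and the delay parameters are assumptions chosen for illustration, and the sketch ignores any retry-after hint returned by the service.

```java
import java.util.function.IntSupplier;

// Illustrative only: these names are hypothetical and not part of the Azure Cosmos DB SDK.
final class RetrySketch {
    private RetrySketch() {}

    // 429 = rate limited, 408 = request timeout: the two transient cases retried here.
    static boolean isRetriable(int statusCode) {
        return statusCode == 429 || statusCode == 408;
    }

    // Delay before retry number `attempt` (0-based): baseMillis * 2^attempt, capped at maxMillis.
    static long backoffMillis(int attempt, long baseMillis, long maxMillis) {
        long delay = baseMillis << Math.min(attempt, 20); // bound the shift so moderate base delays cannot overflow
        return Math.min(delay, maxMillis);
    }

    // Runs the operation, retrying while the status is retriable and retries remain.
    // Returns the last status code observed.
    static int executeWithRetry(IntSupplier operation, int maxRetries, long baseMillis, long maxMillis) {
        int status = operation.getAsInt();
        for (int attempt = 0; attempt < maxRetries && isRetriable(status); attempt++) {
            try {
                Thread.sleep(backoffMillis(attempt, baseMillis, maxMillis));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop retrying
                return status;
            }
            status = operation.getAsInt();
        }
        return status;
    }
}
```

A caller would wrap a single write or batch submission in the IntSupplier and treat a retriable status returned after the final attempt as a failure to log or re-queue.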

Choose an ingestion approach

| Scenario | Recommended approach |
| --- | --- |
| Large-scale distributed ingestion across multiple machines | Apache Spark connector |
| Single-machine ingestion requiring fine-grained control | Java SDK bulk executor with throughput control |

For large-scale ingestion, the Apache Spark connector is the preferred choice. It handles distributed computation, automatic retry and backoff, and load balancing across worker nodes without requiring manual tuning of concurrency or batch sizes.

Java SDK bulk executor with throughput control

If your scenario requires the Java SDK directly, the azure-cosmos-distributed-bulk-sample provides a reference implementation for production-scale ingestion. It demonstrates the following key settings:

  • Auto-tuned micro-batch sizes: The sample dynamically adjusts batch sizes from 1 to 100 documents per physical partition to saturate throughput while keeping throttling manageable.
  • Configurable retry count: Default is 20 retries per batch. Adjust based on your tolerance for transient failures and downstream latency requirements.
  • Concurrent batches per machine: Default is 8 concurrent batches. The recommended range is 25–100% of available CPU cores on the ingestion machine.
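As a rough illustration of what auto-tuning a micro-batch size between 1 and 100 can look like, the sketch below grows the size after an unthrottled batch and halves it after a throttled one. This is an assumed additive-increase, multiplicative-decrease scheme, not the algorithm used by the sample or the SDK; MicroBatchSizer and its growth and shrink factors are hypothetical.

```java
// Hypothetical tuner for illustration; the real sample and SDK may use a different strategy.
final class MicroBatchSizer {
    static final int MIN_BATCH_SIZE = 1;
    static final int MAX_BATCH_SIZE = 100;

    private int size;

    MicroBatchSizer(int initialSize) {
        this.size = clamp(initialSize);
    }

    int current() {
        return size;
    }

    // After a batch that completed without throttling, grow by about 10% (at least 1).
    void onSuccess() {
        size = clamp(size + Math.max(1, size / 10));
    }

    // After a batch that hit a 429 (rate limited) response, halve the size.
    void onThrottled() {
        size = clamp(size / 2);
    }

    // Keeps the size inside the 1 to 100 range described above.
    private static int clamp(int value) {
        return Math.max(MIN_BATCH_SIZE, Math.min(MAX_BATCH_SIZE, value));
    }
}
```

Backing off faster than growing means a burst of throttling shrinks batches quickly, while recovery is gradual.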

Tip

Start with the defaults and monitor 429 (rate limited) response rates. Reduce concurrent batches or add throughput control if excessive throttling occurs.
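The 25–100% guideline for concurrent batches can be turned into concrete numbers for a given machine. The following sketch simply applies those percentages to the core count reported by the JVM; ConcurrencyRange is a hypothetical helper, and rounding the lower bound up is an assumption.

```java
// Hypothetical helper for illustration; it only applies the 25-100% guideline to a core count.
final class ConcurrencyRange {
    private ConcurrencyRange() {}

    // Lower bound: 25% of the cores, rounded up, and never less than 1.
    static int min(int cores) {
        return Math.max(1, (int) Math.ceil(cores * 0.25));
    }

    // Upper bound: 100% of the cores, and never less than 1.
    static int max(int cores) {
        return Math.max(1, cores);
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("Available cores: " + cores);
        System.out.println("Suggested concurrent batches: " + min(cores) + " to " + max(cores));
    }
}
```

On containerized hosts, availableProcessors() may reflect the container's CPU limit rather than the physical core count, so it is worth checking the reported value.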

Throughput control for shared containers

If multiple workloads share the same container, use throughput control groups to cap the RU/s consumed by bulk ingestion and prevent it from starving other workloads:

Note

Throughput control requires a minimum supported version of the Azure Cosmos DB Java V4 SDK. The throughput control APIs are also annotated with @Beta and are subject to change. Verify the current version requirements and API status in the throughput control documentation before using the following sample.

ThroughputControlGroupConfig groupConfig =
    new ThroughputControlGroupConfigBuilder()
        .groupName("bulkIngestionGroup")
        .targetThroughputThreshold(0.75) // limit ingestion to 75% of provisioned throughput
        .defaultControlGroup(true)
        .build();

container.enableLocalThroughputControlGroup(groupConfig);

To coordinate throughput limits across multiple ingestion machines, use global throughput control instead.
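To see what a threshold means in RU/s, the sketch below converts a fractional threshold into an absolute budget. It also shows one possible way to divide a threshold evenly when several clients each use local throughput control, on the assumption that a local group limits only its own client instance. ThroughputBudget and both method names are hypothetical and not SDK APIs.

```java
// Hypothetical arithmetic helper for illustration; these are not SDK APIs.
final class ThroughputBudget {
    private ThroughputBudget() {}

    // RU/s available to a control group with the given fractional threshold.
    static long ruPerSecond(long provisionedRuPerSecond, double threshold) {
        checkThreshold(threshold);
        return (long) Math.floor(provisionedRuPerSecond * threshold);
    }

    // Even split of an overall threshold across clients that each enforce a local limit.
    static double perClientThreshold(double overallThreshold, int clientCount) {
        checkThreshold(overallThreshold);
        if (clientCount < 1) {
            throw new IllegalArgumentException("clientCount must be at least 1");
        }
        return overallThreshold / clientCount;
    }

    private static void checkThreshold(double threshold) {
        if (!(threshold > 0.0 && threshold <= 1.0)) {
            throw new IllegalArgumentException("threshold must be in (0, 1]");
        }
    }

    public static void main(String[] args) {
        // Example: a container provisioned with 10,000 RU/s and a 0.75 threshold.
        System.out.println(ruPerSecond(10_000, 0.75) + " RU/s for bulk ingestion");
    }
}
```

A static split like this wastes budget whenever a client is idle, which is one reason to prefer global throughput control when several machines ingest at once.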

Reference implementations

| Sample | Description |
| --- | --- |
| azure-cosmos-distributed-bulk-sample | End-to-end distributed ingestion with job tracking, restartable batches, auto-tuned micro-batch sizes, and configurable retry and concurrency settings. |
| ThroughputControlQuickstartAsync.java | Local throughput control, global throughput control with a shared RU limit via a metadata container, and priority-based throttling. |

Performance tips

Consider the following points for better performance when using the bulk executor library:

  • For best performance, run your application from an Azure VM in the same region as your Azure Cosmos DB account write region.

  • To achieve higher throughput:

    • Set the JVM heap size large enough to avoid memory issues when handling large numbers of documents. Suggested heap size: max(3 GB, 3 * sizeof(all documents passed to bulk import API in one batch)).
    • Bulk operations have a preprocessing phase, so you get higher throughput when processing large document sets. For example, importing 10,000,000 documents by running bulk import 10 times with 1,000,000 documents each is more efficient than running it 100 times with 100,000 documents each.
  • Instantiate a single CosmosAsyncClient object for the entire application within a single virtual machine that corresponds to a specific Azure Cosmos DB container.

  • A single bulk operation API execution consumes a large share of the client machine's CPU and network I/O because it spawns multiple tasks internally. Avoid spawning multiple concurrent tasks within your application process that each execute bulk operation API calls. If a single bulk operation API call running on a single virtual machine can't consume your container's entire throughput (for example, if the container's throughput exceeds 1 million RU/s), create separate virtual machines to execute bulk operation API calls concurrently.
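The heap-size suggestion in the list above, max(3 GB, 3 * size of one batch), is easy to check against the running JVM. The following sketch is one way to do that; HeapSizing is a hypothetical helper, and it assumes "GB" means 2^30 bytes and that you can estimate the serialized size of one batch yourself.

```java
// Hypothetical helper for illustration; it only applies the heap-size rule of thumb.
final class HeapSizing {
    static final long GIB = 1024L * 1024L * 1024L; // assumption: "GB" is read as 2^30 bytes

    private HeapSizing() {}

    // Suggested heap in bytes: max(3 GB, 3 * bytes of all documents in one batch).
    static long suggestedHeapBytes(long batchBytes) {
        return Math.max(3 * GIB, 3 * batchBytes);
    }

    public static void main(String[] args) {
        long batchBytes = 2 * GIB; // example estimate for one batch; replace with your own figure
        long suggested = suggestedHeapBytes(batchBytes);
        long configured = Runtime.getRuntime().maxMemory();
        System.out.println("Suggested heap (bytes): " + suggested);
        System.out.println("Configured max heap (bytes): " + configured);
        if (configured < suggested) {
            System.out.println("Consider raising -Xmx before running large bulk imports.");
        }
    }
}
```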