Introducing Automatic Request Batching for Amazon SQS in the AWS Java SDK 2.x
Author: John Viegas
Published on: 2024-12-16 22:58:06
Source: AWS Developer Tools Blog
Disclaimer: All rights are owned by the respective creators. No copyright infringement is intended.
We are excited to introduce the Automatic Request Batching API for Amazon Simple Queue Service (SQS) in the AWS SDK for Java 2.x. This feature allows client-side batching of up to 10 requests before sending them as a batch to Amazon SQS, optimizing both performance and cost. It also supports message polling to reduce the number of individual requests, boosting throughput while reducing costs.
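To make the cost argument concrete, here is a minimal sketch of the arithmetic. It assumes every batch is filled to the limit of 10 before it is sent; in practice, time-based flushing can send smaller batches, so treat the result as a best case. The class and method names are hypothetical and are not part of the SDK.

```java
// Hypothetical helper for illustration only; not part of the AWS SDK.
class BatchRequestCount {

    // Number of batch requests needed if every batch is filled to maxBatchSize.
    static long batchRequests(long messages, int maxBatchSize) {
        if (messages < 0 || maxBatchSize <= 0) {
            throw new IllegalArgumentException("messages must be >= 0 and maxBatchSize > 0");
        }
        return (messages + maxBatchSize - 1) / maxBatchSize; // ceiling division
    }

    public static void main(String[] args) {
        long messages = 1_000;
        System.out.println("Requests without batching: " + messages);
        System.out.println("Requests with batching:    " + batchRequests(messages, 10));
    }
}
```

Under this assumption, 1,000 individual sends collapse into 100 batch requests.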
Requirements
Before getting started, ensure that you’re using version 2.28.0 or later of the AWS Java SDK. You can find setup details in our “Set up an Apache Maven project” guide.
Add the following dependency to your pom.xml:
<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>sqs</artifactId>
    <version>2.28.0</version>
</dependency>
Quick walk-through for using SQS Automatic Request Batching APIs
Create a batch manager
Default configuration via SqsAsyncClient
The simplest way to create a batch manager is to call batchManager() on an existing SqsAsyncClient instance, as shown in the following snippet.
SqsAsyncClient asyncClient = SqsAsyncClient.create();
SqsAsyncBatchManager sqsAsyncBatchManager = asyncClient.batchManager();
By default, all SqsAsyncBatchManager instances use the values listed under "Configuration options for SqsAsyncBatchManager" later in this post. Additionally, a batch manager created this way uses the client's scheduled executor by default.
Custom Configuration via SqsAsyncBatchManager.Builder
For advanced use cases, you can customize the batch manager with optional configurations using SqsAsyncBatchManager.Builder:
// Define all the necessary constants for the configuration used below (not shown here).
SqsAsyncBatchManager sqsAsyncBatchManager = SqsAsyncBatchManager.builder()
    .client(SqsAsyncClient.create())
    .scheduledExecutor(Executors.newScheduledThreadPool(SCHEDULED_THREAD_POOL_SIZE))
    .overrideConfiguration(b -> b
        .receiveMessageMinWaitDuration(MIN_WAIT_DURATION)
        .receiveMessageVisibilityTimeout(VISIBILITY_TIMEOUT)
        .receiveMessageAttributeNames(MESSAGE_ATTRIBUTE_NAMES)
        .receiveMessageSystemAttributeNames(SYSTEM_ATTRIBUTE_NAMES))
    .build();
Send messages
To send messages using the batch manager, use the sendMessage method. The SDK buffers the requests and sends them as a batch when either maxBatchSize (default: 10) is reached or the sendRequestFrequency interval (default: 200 ms) elapses, whichever comes first.
Example:
CompletableFuture<SendMessageResponse> sendMessageFuture =
    sqsAsyncBatchManager.sendMessage(request ->
        request.messageBody("Message Body")
               .queueUrl("https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"));

sendMessageFuture.thenAccept(response -> {
    System.out.println("Message sent successfully. Message ID: " + response.messageId());
}).exceptionally(throwable -> {
    System.err.println("Failed to send message: " + throwable.getMessage());
    return null;
});
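The size-or-time trigger described above can be illustrated with a small, standard-library-only sketch. This is not the SDK's internal implementation: the class SizeOrTimeBatcher, its constructor parameters, and the fixed-rate timer are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical illustration of size-or-time batching; not the SDK's internal code.
class SizeOrTimeBatcher<T> {
    private final int maxBatchSize;
    private final Consumer<List<T>> sender;
    private final List<T> pending = new ArrayList<>();

    SizeOrTimeBatcher(int maxBatchSize, long sendFrequencyMillis,
                      ScheduledExecutorService scheduler, Consumer<List<T>> sender) {
        this.maxBatchSize = maxBatchSize;
        this.sender = sender;
        // Time trigger: periodically send whatever has accumulated.
        scheduler.scheduleAtFixedRate(this::flush, sendFrequencyMillis,
                sendFrequencyMillis, TimeUnit.MILLISECONDS);
    }

    // Size trigger: send immediately once the batch is full.
    synchronized void add(T item) {
        pending.add(item);
        if (pending.size() >= maxBatchSize) {
            flush();
        }
    }

    // Hands the pending items to the sender as one batch; does nothing when empty.
    synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        List<T> batch = new ArrayList<>(pending);
        pending.clear();
        sender.accept(batch);
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        SizeOrTimeBatcher<String> batcher = new SizeOrTimeBatcher<String>(
                3, 200, scheduler, batch -> System.out.println("Sending batch: " + batch));
        for (int i = 1; i <= 4; i++) {
            batcher.add("message-" + i); // the third add fills a batch
        }
        Thread.sleep(500); // give the timer a chance to flush the leftover message
        scheduler.shutdown();
    }
}
```

In this sketch, the first three messages leave as soon as the batch is full, while the fourth waits for the timer. That is the latency cost that a longer send interval trades for fewer requests.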
Change message visibility
You can change the visibilityTimeout of messages in a batch using the changeMessageVisibility method.
Example:
CompletableFuture<ChangeMessageVisibilityResponse> changeVisibilityFuture =
    sqsAsyncBatchManager.changeMessageVisibility(request ->
        request.receiptHandle("ReceiptHandle")
               .queueUrl("https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue")
               .visibilityTimeout(30));

changeVisibilityFuture.thenAccept(response -> {
    System.out.println("Visibility timeout changed successfully.");
}).exceptionally(throwable -> {
    System.err.println("Failed to change message visibility: " + throwable.getMessage());
    return null;
});
Delete messages
To delete messages in a batch, use the deleteMessage method.
CompletableFuture<DeleteMessageResponse> deleteMessageFuture =
    sqsAsyncBatchManager.deleteMessage(request ->
        request.receiptHandle("ReceiptHandle")
               .queueUrl("https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"));

deleteMessageFuture.thenAccept(response -> {
    System.out.println("Message deleted successfully.");
}).exceptionally(throwable -> {
    System.err.println("Failed to delete message: " + throwable.getMessage());
    return null;
});
Receive messages
With default settings
When your application calls receiveMessage continuously, the SqsAsyncBatchManager delivers messages directly from an internal buffer that the SDK refills in the background. Because the SDK retrieves and stores extra messages ahead of time, the SqsAsyncBatchManager can respond immediately from the buffer each time you call receiveMessage, without waiting for a new request to SQS.
Example:
CompletableFuture<ReceiveMessageResponse> receiveMessageFuture =
    sqsAsyncBatchManager.receiveMessage(request ->
        request.queueUrl("https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"));

receiveMessageFuture.thenAccept(response -> {
    response.messages().forEach(message ->
        System.out.println("Received message: " + message.body())
    );
}).exceptionally(throwable -> {
    System.err.println("Failed to receive messages: " + throwable.getMessage());
    return null;
});
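The buffer-first behavior described in this section can be sketched with the standard library alone. This is a simplified, synchronous model and not the SDK's implementation: the class BufferedReceiver and its methods are hypothetical, and the SDK's background refill is modeled here as an explicit refill that runs only when the buffer is empty.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical model of buffer-first receiving; not the SDK's internal code.
class BufferedReceiver {
    private final Deque<String> buffer = new ArrayDeque<>();
    private final Supplier<List<String>> remoteFetch; // stands in for a call to SQS
    private int remoteCalls = 0;

    BufferedReceiver(Supplier<List<String>> remoteFetch) {
        this.remoteFetch = remoteFetch;
    }

    // Pulls one batch of messages from the remote source into the buffer.
    synchronized void refill() {
        remoteCalls++;
        buffer.addAll(remoteFetch.get());
    }

    // Serves up to maxMessages from the buffer, refilling only when it is empty.
    synchronized List<String> receive(int maxMessages) {
        if (buffer.isEmpty()) {
            refill();
        }
        List<String> out = new ArrayList<>();
        while (out.size() < maxMessages && !buffer.isEmpty()) {
            out.add(buffer.poll());
        }
        return out;
    }

    synchronized int remoteCalls() {
        return remoteCalls;
    }

    public static void main(String[] args) {
        BufferedReceiver receiver = new BufferedReceiver(
                () -> Arrays.asList("m1", "m2", "m3", "m4", "m5"));
        for (int i = 0; i < 3; i++) {
            System.out.println("Received " + receiver.receive(2)
                    + ", remote calls so far: " + receiver.remoteCalls());
        }
    }
}
```

In the demo, three receive calls are served by a single remote fetch, which is the effect the buffer is meant to achieve.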
With a custom request
If you want to customize the request further, for example by setting custom wait times and specifying the number of messages to retrieve, you can do so as follows:
CompletableFuture<ReceiveMessageResponse> receiveMessageCustomTimeoutFuture =
    sqsAsyncBatchManager.receiveMessage(request ->
        request.queueUrl("https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue")
               .waitTimeSeconds(1)
               .maxNumberOfMessages(7));

receiveMessageCustomTimeoutFuture.thenAccept(response -> {
    response.messages().forEach(message ->
        System.out.println("Received message with custom timeout: " + message.body())
    );
}).exceptionally(throwable -> {
    System.err.println("Failed to receive messages: " + throwable.getMessage());
    return null;
});
Note: If a ReceiveMessageRequest sets any additional parameters, such as visibilityTimeout, messageAttributeNames, or requestOverrideConfiguration, the batch manager's receiveMessage API bypasses the internal buffer and sends the request directly to SQS.
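The routing rule in this note can be modeled as a simple predicate. The sketch below is a hypothetical, standard-library-only model: it covers only the three parameters named in the note (the note lists them as examples, so the SDK may check others), and ReceiveRouting and bypassesBuffer are illustrative names, not SDK APIs.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical model of the rule in the note; not an SDK class.
class ReceiveRouting {

    // Returns true when the request carries a parameter that forces a direct call to SQS.
    static boolean bypassesBuffer(Integer visibilityTimeout,
                                  List<String> messageAttributeNames,
                                  boolean hasOverrideConfiguration) {
        boolean hasVisibilityTimeout = visibilityTimeout != null;
        boolean hasAttributeNames =
                messageAttributeNames != null && !messageAttributeNames.isEmpty();
        return hasVisibilityTimeout || hasAttributeNames || hasOverrideConfiguration;
    }

    public static void main(String[] args) {
        // Only queueUrl, waitTimeSeconds, or maxNumberOfMessages set: buffer is used.
        System.out.println(bypassesBuffer(null, Collections.<String>emptyList(), false));
        // A per-request visibility timeout: request goes directly to SQS.
        System.out.println(bypassesBuffer(30, null, false));
        // Requested message attributes: request goes directly to SQS.
        System.out.println(bypassesBuffer(null, Arrays.asList("traceId"), false));
    }
}
```

If you need a visibility timeout or message attributes on buffered receives, the receiveMessageVisibilityTimeout and receiveMessageAttributeNames options on the batch manager, shown in the custom configuration example earlier, are the place to set them.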
Configuration options for SqsAsyncBatchManager
| No. | Configuration | Description | Default value |
| 1 | maxBatchSize | Maximum number of items per outbound request (SendMessageBatchRequest, ChangeMessageVisibilityBatchRequest, DeleteMessageBatchRequest). | 10 |
| 2 | sendRequestFrequency | Time to wait before sending a batch, unless maxBatchSize is reached earlier. Higher values may reduce the number of requests but increase latency. | 200 ms |
| 3 | receiveMessageVisibilityTimeout | Visibility timeout applied to received messages. If unset, the queue's default is used. | Queue default |
| 4 | receiveMessageMinWaitDuration | Minimum wait time for receiveMessage requests. Avoid setting this to 0 to prevent wasted CPU cycles. | 50 ms |
| 5 | receiveMessageSystemAttributeNames | List of system attributes to request for receiveMessage calls. | None |
| 6 | receiveMessageAttributeNames | List of message attributes to request for receiveMessage calls. | None |
Conclusion
In this blog post, we showed you how to set up and begin using the Automatic Request Batching feature with the SqsAsyncBatchManager in the AWS SDK for Java 2.x. This feature is open source and resides in the same repository as the AWS SDK for Java 2.x. For more examples related to SQS Automatic Request Batching, visit our Java SDK Developer Guide. We hope you'll find this new feature useful. You can always share your feedback on our GitHub issues page.