Introducing the AWS Message Processing Framework for .NET (Preview)
Author: Alex Shovlin
Published on: 2024-03-28 19:09:24
Source: AWS Developer Tools Blog
Disclaimer: All rights are owned by the respective creators. No copyright infringement is intended.
We are happy to announce the developer preview release of the AWS Message Processing Framework for .NET. This is an AWS-native framework that simplifies the development of .NET message-processing applications that use AWS services such as Amazon Simple Queue Service (Amazon SQS), Amazon Simple Notification Service (Amazon SNS), and Amazon EventBridge. The framework is designed to work with AWS messaging services in a way that is idiomatic for .NET, and to remove boilerplate code that developers often write when working with these services. Unlike cloud-agnostic frameworks, this framework is tailored for AWS messaging services and exposes their advanced features, such as message attributes, FIFO queues, and message visibility management.
For example, when handling messages from SQS in a long-running application, you may be responsible for:
- Receiving the messages from SQS.
- Deserializing the SQS messages to .NET types.
- Processing each message.
- Deleting each message from the queue once it has been processed successfully.
The following code performs the steps listed previously by using the AWS SDK for .NET directly. Note that this is a simplified implementation, without sophisticated error handling or parallelization of handling messages once they are received from SQS.
using System.Text.Json;
using Amazon.SQS;
using Amazon.SQS.Model;

var client = new AmazonSQSClient();
var queueUrl = "https://sqs.us-west-2.amazonaws.com/012345678910/MyQueue";
while (true) // process messages continually
{
    var request = new ReceiveMessageRequest()
    {
        QueueUrl = queueUrl,
        VisibilityTimeout = 30
    };
    var response = await client.ReceiveMessageAsync(request);
    foreach (var message in response.Messages)
    {
        GreetingMessage? greetingMessage = null;
        try
        {
            // Deserialize each message to our GreetingMessage type
            greetingMessage = JsonSerializer.Deserialize<GreetingMessage>(message.Body);
        }
        catch (JsonException ex)
        {
            // handle malformed JSON
        }
        // Process each message (in this case just logging their contents)
        Console.WriteLine($"Received message {greetingMessage?.Greeting} from {greetingMessage?.SenderName}");
        // Then delete each message from the queue
        var deleteRequest = new DeleteMessageRequest()
        {
            QueueUrl = queueUrl,
            ReceiptHandle = message.ReceiptHandle
        };
        await client.DeleteMessageAsync(deleteRequest);
    }
}
With the AWS Message Processing Framework for .NET, you’re only responsible for implementing your business logic to handle each message. The following is the handler for the same GreetingMessage shown in the preceding example, now using the new framework.
/// <summary>
/// This handler is invoked each time you receive the message
/// </summary>
public class GreetingMessageHandler : IMessageHandler<GreetingMessage>
{
    public Task<MessageProcessStatus> HandleAsync(MessageEnvelope<GreetingMessage> messageEnvelope, CancellationToken token = default)
    {
        Console.WriteLine($"Received message {messageEnvelope.Message.Greeting} from {messageEnvelope.Message.SenderName}");
        return Task.FromResult(MessageProcessStatus.Success());
    }
}
During your application’s startup, you configure the queue URL you wish to poll, and map the type of your messages to their respective handlers. The framework then handles polling SQS, deserializing each message to its .NET type, dispatching each message to the appropriate business logic using the registered handler types, and finally deleting the messages once the handler returns a result. The following example configures the framework for the GreetingMessage handler shown in the previous example:
var builder = Host.CreateDefaultBuilder(args);
builder.ConfigureServices(services =>
{
    // Register the AWS Message Processing Framework for .NET
    services.AddAWSMessageBus(builder =>
    {
        // Register that you'll handle messages of type GreetingMessage
        builder.AddMessageHandler<GreetingMessageHandler, GreetingMessage>();
        // That are polled from the specified queue
        builder.AddSQSPoller("https://sqs.us-west-2.amazonaws.com/012345678910/MyQueue");
    });
});
var host = builder.Build();
await host.RunAsync();
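To illustrate what mapping message types to handlers amounts to, here is a minimal, self-contained sketch of a routing table keyed by a message type identifier. It is not the framework's implementation: Register, DispatchAsync, and the routes dictionary are hypothetical names used only for illustration, and the sketch uses System.Text.Json directly instead of any AWS package.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical routing table: message type identifier -> "deserialize and handle" delegate.
var routes = new Dictionary<string, Func<string, Task>>();

// Illustrative stand-in for registering a handler for one message type.
void Register<T>(string messageType, Func<T, Task> handler) where T : class
{
    routes[messageType] = json =>
    {
        var message = JsonSerializer.Deserialize<T>(json)
            ?? throw new JsonException($"Empty payload for {messageType}");
        return handler(message);
    };
}

// Looks up the handler by type identifier; returns false when none is registered.
async Task<bool> DispatchAsync(string messageType, string json)
{
    if (!routes.TryGetValue(messageType, out var route))
    {
        return false;
    }
    await route(json);
    return true;
}

Register<GreetingMessage>("greetingMessage", message =>
{
    Console.WriteLine($"Received message {message.Greeting} from {message.SenderName}");
    return Task.CompletedTask;
});

var handled = await DispatchAsync("greetingMessage", "{\"SenderName\":\"DemoUser\",\"Greeting\":\"Hello!\"}");
var unknown = await DispatchAsync("unknownType", "{}");
Console.WriteLine($"handled={handled}, unknown={unknown}");

public class GreetingMessage
{
    public string? SenderName { get; set; }
    public string? Greeting { get; set; }
}
```

In this sketch an unrecognized type identifier is reported to the caller rather than thrown, which leaves the decision about what to do with such a message to the surrounding polling code.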
Features
The AWS Message Processing Framework for .NET supports sending messages to SQS and publishing events to SNS and EventBridge. When sending messages, the .NET message types are serialized to JSON and then wrapped in an envelope that conforms to the JSON event format for CloudEvents. This standardized format makes it possible to interoperate with other frameworks and programming languages that send or receive messages and events.
The following is the message envelope for the GreetingMessage shown in the previous example.
{
    "id":"b02f156b-0f02-48cf-ae54-4fbbe05cffba",
    "source":"/aws/messaging",
    "specversion":"1.0",
    "type":"GreeterMessage",
    "time":"2024-03-21T16:36:02.8957126+00:00",
    "data":"{\u0022SenderName\u0022:\u0022DemoUser\u0022,\u0022Greeting\u0022:\u0022Hello!\u0022}"
}
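Note that the data field is itself a JSON string, and that \u0022 is the JSON escape for a double quote, which System.Text.Json emits by default. The following minimal sketch shows how a payload could be wrapped in, and read back out of, an envelope with this shape using only System.Text.Json. It is an illustration under stated assumptions, not the framework's serializer: the envelope is built by hand from a dictionary, and the "greetingMessage" type identifier is the one used later in the walkthrough.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Text.Json.Nodes;

var message = new GreetingMessage { SenderName = "DemoUser", Greeting = "Hello!" };

// Wrap: serialize the payload to a JSON string and place it in the "data" field.
var envelopeJson = JsonSerializer.Serialize(new Dictionary<string, string>
{
    ["id"] = Guid.NewGuid().ToString(),
    ["source"] = "/aws/messaging",
    ["specversion"] = "1.0",
    ["type"] = "greetingMessage", // assumed identifier, for illustration
    ["time"] = DateTimeOffset.UtcNow.ToString("O"),
    ["data"] = JsonSerializer.Serialize(message)
});

// Unwrap: read the envelope fields, then deserialize the inner JSON string.
var envelope = JsonNode.Parse(envelopeJson)!;
var type = envelope["type"]!.GetValue<string>();
var data = envelope["data"]!.GetValue<string>();
var roundTripped = JsonSerializer.Deserialize<GreetingMessage>(data)!;

Console.WriteLine($"{type}: {roundTripped.Greeting} from {roundTripped.SenderName}");
// prints "greetingMessage: Hello! from DemoUser"

public class GreetingMessage
{
    public string? SenderName { get; set; }
    public string? Greeting { get; set; }
}
```

A consumer written in another language can follow the same two steps: parse the envelope, then parse the string held in data.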
The framework supports receiving and handling messages from SQS via a long-running poller, which is meant for background services and might be deployed to services such as EC2 or ECS. When receiving messages, a message is deserialized to the .NET type, and then routed to the appropriate handler in your code based on the message type. The framework manages the visibility timeout while the message is being handled to prevent other clients from processing it. The framework also deletes the message from the queue once it has been handled successfully.
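To make the visibility management concrete, the following is a minimal sketch of a heartbeat loop that keeps extending a message's visibility timeout while a handler is still running. It is not the framework's implementation: RunWithHeartbeatAsync, HeartbeatLoopAsync, and the intervals are hypothetical names and values chosen for illustration. The extendVisibility delegate stands in for a call such as ChangeMessageVisibilityAsync in the AWS SDK for .NET, so the sketch runs without an AWS account.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Runs the handler while a background loop periodically extends visibility.
static async Task RunWithHeartbeatAsync(
    Func<Task> handler,
    Func<CancellationToken, Task> extendVisibility,
    TimeSpan interval)
{
    using var cts = new CancellationTokenSource();
    var heartbeat = HeartbeatLoopAsync(extendVisibility, interval, cts.Token);
    try
    {
        await handler();
    }
    finally
    {
        // Stop extending as soon as the handler finishes or fails.
        cts.Cancel();
        try { await heartbeat; } catch (OperationCanceledException) { }
    }
}

// Waits one interval, extends visibility, and repeats until cancelled.
static async Task HeartbeatLoopAsync(
    Func<CancellationToken, Task> extendVisibility,
    TimeSpan interval,
    CancellationToken token)
{
    while (true)
    {
        await Task.Delay(interval, token);
        await extendVisibility(token);
    }
}

var extensions = 0;
await RunWithHeartbeatAsync(
    handler: () => Task.Delay(TimeSpan.FromMilliseconds(350)), // simulated slow handler
    extendVisibility: _ =>
    {
        // With the AWS SDK, this is where ChangeMessageVisibilityAsync would be called.
        extensions++;
        return Task.CompletedTask;
    },
    interval: TimeSpan.FromMilliseconds(100));

Console.WriteLine($"Extended visibility {extensions} time(s) while handling");
```

The interval should be comfortably shorter than the visibility timeout, so that an extension lands before the message becomes visible to other consumers.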
The framework also supports handling messages in AWS Lambda functions via the AWS.Messaging.Lambda package, which builds on the existing integration between Lambda and SQS.
The framework supports publishing to FIFO (first-in-first-out) SQS queues and SNS topics, as well as honoring message ordering on the client when handling messages received from a FIFO queue.
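As an illustration of what honoring ordering on the client can mean, the sketch below processes messages from the same message group one at a time in arrival order, while different groups proceed concurrently. This is one possible approach and not necessarily how the framework does it; ReceivedMessage and the group names are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// A batch as it might arrive from a FIFO queue, with groups interleaved.
var received = new List<ReceivedMessage>
{
    new("orders-A", "1"),
    new("orders-B", "1"),
    new("orders-A", "2"),
    new("orders-B", "2"),
};

var handled = new ConcurrentQueue<string>();

// GroupBy keeps the elements of each group in their original order.
var groupTasks = received
    .GroupBy(m => m.GroupId)
    .Select(async group =>
    {
        // Within a group, handle messages strictly one after another.
        foreach (var message in group)
        {
            await Task.Delay(10); // simulated handler work
            handled.Enqueue($"{message.GroupId}:{message.Body}");
        }
    });

// Different groups run concurrently.
await Task.WhenAll(groupTasks);

Console.WriteLine(string.Join(", ", handled));

// Hypothetical shape of a received message, for illustration only.
record ReceivedMessage(string GroupId, string Body);
```

The relative order of entries from different groups can vary between runs, but within each group message 1 is always handled before message 2.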
The framework supports OpenTelemetry via the AWS.Messaging.Telemetry.OpenTelemetry package. Call AddAWSMessagingInstrumentation when configuring OpenTelemetry.
public void ConfigureServices(IServiceCollection services)
{
    // Register the AWS Message Processing Framework for .NET
    services.AddAWSMessageBus(builder =>
    {
        builder.AddMessageHandler<GreetingMessageHandler, GreetingMessage>();
        builder.AddSQSPoller("https://sqs.us-west-2.amazonaws.com/012345678910/MyQueue");
    });
    // Configure OpenTelemetry, including the message processing framework
    services.AddOpenTelemetry()
        .ConfigureResource(resource => resource.AddService("myApplication"))
        .WithTracing(tracing => tracing
            .AddAWSMessagingInstrumentation()
            .AddConsoleExporter());
}
Getting Started
This walkthrough will create two applications: an ASP.NET Core Minimal API that publishes messages to SQS when it receives a request at an API endpoint, and a long-running console application that polls for these messages and handles them.
Prerequisites
This walkthrough requires an SQS queue to publish to and receive messages from, and may incur costs if you have exceeded SQS’s Free Tier.
A queue can be created via the following AWS CLI or AWS Tools for PowerShell commands. Take note of the queue URL that is returned so that you can specify it in the framework configuration that follows.
# Using the AWS CLI
aws sqs create-queue --queue-name DemoQueue
# Or using AWS Tools for PowerShell
New-SQSQueue -QueueName DemoQueue
Publishing
- Open the command prompt or terminal. Find or create an operating system folder under which you can create a .NET project.
- In that folder, run the following command to create the .NET project.
dotnet new webapi --name Publisher
- Navigate into the new project’s folder. Add a dependency on the AWS Message Processing Framework for .NET.
cd Publisher
dotnet add package AWS.Messaging
- Replace the code in Program.cs with the following. Replace <queue URL> with the URL of the queue created previously in the prerequisites.
using AWS.Messaging;
using Microsoft.AspNetCore.Mvc;

var builder = WebApplication.CreateBuilder(args);
// Add services to the container.
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
// Configure the AWS Message Processing Framework for .NET
builder.Services.AddAWSMessageBus(builder =>
{
    // Register that you'll publish messages of type GreetingMessage
    // 1. To a specified queue,
    // 2. using the message identifier "greetingMessage", which will be used
    //    by handlers to route the message to the appropriate handler.
    builder.AddSQSPublisher<GreetingMessage>("<queue URL>", "greetingMessage");
    // You can map additional message types to queues or topics here as well
});
var app = builder.Build();
// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}
app.UseHttpsRedirection();
// Create an API Endpoint that receives GreetingMessage objects
// from the caller and then sends them as an SQS message.
app.MapPost("/greeting", async ([FromServices]IMessagePublisher publisher, GreetingMessage message) =>
{
    if (message.SenderName == null || message.Greeting == null)
    {
        return Results.BadRequest();
    }
    // Publish the message to the queue configured above
    await publisher.PublishAsync(message);
    return Results.Ok();
})
.WithName("SendGreeting")
.WithOpenApi();
app.Run();

/// <summary>
/// This class represents the message contents
/// </summary>
public class GreetingMessage
{
    public string? SenderName { get; set; }
    public string? Greeting { get; set; }
}
- Run the following command. This should open a browser window with the Swagger UI, which allows you to explore and test your API.
dotnet watch run
- Open the /greeting endpoint and click “Try it out”.
- Specify senderName and greeting values for the message, and click “Execute”. This invokes your API, which will send the SQS message.
Handling
- Open the command prompt or terminal. Find or create an operating system folder under which you can create a .NET project.
- In that folder, run the following command to create the .NET project.
dotnet new console --name Handler
- Navigate into the new project’s folder. Add a dependency on the AWS Message Processing Framework for .NET. Also add the Microsoft.Extensions.Hosting package, which allows you to configure the framework via the .NET Generic Host.
cd Handler
dotnet add package AWS.Messaging
dotnet add package Microsoft.Extensions.Hosting
- Replace the code in Program.cs with the following. Replace <queue URL> with the URL of the queue created previously in the prerequisites.
using AWS.Messaging;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

var builder = Host.CreateDefaultBuilder(args);
builder.ConfigureServices(services =>
{
    // Register the AWS Message Processing Framework for .NET
    services.AddAWSMessageBus(builder =>
    {
        // Register that you'll poll the following queue
        builder.AddSQSPoller("<queue URL>");
        // And that messages of type "greetingMessage" should be
        // 1. Deserialized as GreetingMessage objects
        // 2. Which are then passed to GreetingMessageHandler
        builder.AddMessageHandler<GreetingMessageHandler, GreetingMessage>("greetingMessage");
        // You can add additional message handlers here, using different message types.
    });
});
var host = builder.Build();
await host.RunAsync();

/// <summary>
/// This class represents the message contents
/// </summary>
public class GreetingMessage
{
    public string? SenderName { get; set; }
    public string? Greeting { get; set; }
}

/// <summary>
/// This handler is invoked each time you receive the message
/// </summary>
public class GreetingMessageHandler : IMessageHandler<GreetingMessage>
{
    public Task<MessageProcessStatus> HandleAsync(MessageEnvelope<GreetingMessage> messageEnvelope, CancellationToken token = default)
    {
        Console.WriteLine($"Received message {messageEnvelope.Message.Greeting} from {messageEnvelope.Message.SenderName}");
        return Task.FromResult(MessageProcessStatus.Success());
    }
}
- Run the following command. This will start a long-running poller.
dotnet run
Shortly after startup it should receive the message published in the first part of the walkthrough and log the following message:
Received message {greeting} from {senderName}
Press Ctrl+C to stop the poller.
Cleanup
Invoke the following AWS CLI or AWS Tools for PowerShell command to delete the queue when finished:
# Using the AWS CLI
aws sqs delete-queue --queue-url "<queue URL noted previously>"
# Or using AWS Tools for PowerShell
Remove-SQSQueue -QueueUrl "<queue URL noted previously>"
Conclusion
The AWS Message Processing Framework for .NET is available as the AWS.Messaging package on NuGet.org. Refer to the README, Developer Guide, and API Reference documentation to get started.
To get started handling messages in AWS Lambda, there is a template available in Visual Studio 2022 via the AWS Toolkit for Visual Studio and by calling dotnet new serverless.Messaging after installing the latest Amazon.Lambda.Templates NuGet package.
The framework is open source in the awslabs/aws-dotnet-messaging repository on GitHub. Don’t hesitate to create an issue or a pull request if you have ideas for improvements.