Azure Service Bus Queue Types
1) Queues: One-to-one communication. A sender puts a message into the queue, and each message is received by a single receiver application.
2) Topics and subscriptions: One-to-many communication. Once a message is sent to a particular topic, a copy is delivered to every subscription of that topic. There can be multiple senders as well.
Service Bus SDKs:
1) Service Bus Client (ServiceBusClient): the primary interface for developers interacting with the Service Bus client library. It serves as the gateway through which all interaction with the library occurs.
2) Service Bus Sender (ServiceBusSender): scoped to a particular queue or topic and created using the Service Bus Client. The sender allows us to send messages to a queue or topic. It also allows scheduling messages to become available for delivery at a specified date.
3) Service Bus Receiver (ServiceBusReceiver): scoped to a particular queue or subscription and created using the Service Bus Client. The receiver allows us to receive messages from a queue or subscription.
4) Service Bus Processor (ServiceBusProcessor): scoped to a particular queue or subscription and created using the Service Bus Client. The processor can be thought of as an abstraction around a set of receivers.
using Azure.Messaging.ServiceBus;
// Sending a message to the Service Bus queue
var client = new ServiceBusClient("<connection-string>");
ServiceBusSender sender = client.CreateSender("QueueName");
await sender.SendMessageAsync(new ServiceBusMessage("Hello Saumya"));
// Receiving the message (the result is null if no message arrives within the wait time)
ServiceBusReceiver receiver = client.CreateReceiver("QueueName");
ServiceBusReceivedMessage msg = await receiver.ReceiveMessageAsync();
Console.WriteLine(msg.Body.ToString());
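The processor from item 4 has no example of its own, so here is a minimal sketch of how it might be used. It assumes the same "QueueName" queue and a placeholder connection string; the option values (manual completion, two concurrent calls) and the handler bodies are illustrative choices, not recommendations.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

// Placeholder connection string and queue name, as in the snippet above.
await using var client = new ServiceBusClient("<connection-string>");

// Assumed option values, chosen only for illustration.
var options = new ServiceBusProcessorOptions
{
    AutoCompleteMessages = false, // settle explicitly inside the handler
    MaxConcurrentCalls = 2        // handle up to two messages in parallel
};

await using ServiceBusProcessor processor = client.CreateProcessor("QueueName", options);

// Invoked once for every message the processor receives.
processor.ProcessMessageAsync += async args =>
{
    Console.WriteLine($"Received: {args.Message.Body}");
    await args.CompleteMessageAsync(args.Message);
};

// Invoked when receiving fails or the message handler throws.
processor.ProcessErrorAsync += args =>
{
    Console.WriteLine($"Error from {args.ErrorSource}: {args.Exception.Message}");
    return Task.CompletedTask;
};

await processor.StartProcessingAsync();
Console.WriteLine("Processing; press Enter to stop.");
Console.ReadLine();
await processor.StopProcessingAsync();
```

Compared with calling the receiver in a loop, the processor owns the receive loop and only asks for two handlers, which is why it is described as an abstraction around receivers.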
Allowed operations on received messages
1) Complete: When a message is received from a queue or subscription, the receiver can call the "Complete" operation to indicate that the message has been successfully processed and can be removed from the queue or subscription. This ensures that no other receiver will receive the message.
await receiver.CompleteMessageAsync(message);
2) Abandon: If the receiver is not able to process the message, it can call the "Abandon" operation to indicate that the message should be made available again. This returns the message to the queue or subscription and allows another receiver to receive and process it.
await receiver.AbandonMessageAsync(message); // this also increases the delivery count of the message
3) Defer: If a message cannot be processed immediately, the receiver can call the "Defer" operation to set it aside until it is ready to be processed. This allows the receiver to prioritize other messages in the queue or subscription and come back to the deferred message later. This is typically the case when the problem is not with the message itself, for example when a service needed to process it is temporarily down. A deferred message stays in the queue or subscription and can only be retrieved by its sequence number.
await receiver.DeferMessageAsync(message); // the message is skipped by normal receive calls from now on
// Later: retrieve the deferred message by its sequence number
ServiceBusReceivedMessage defMsg = await receiver.ReceiveDeferredMessageAsync(message.SequenceNumber);
4) DeadLetter: If a message cannot be processed due to an error that cannot be resolved, the receiver can call the "DeadLetter" operation to indicate that the message should be moved to the dead-letter queue (a sub-queue of the queue or subscription) for further analysis or processing. Dead-lettering is useful for handling messages that cannot be processed due to issues with the message content, receiver code, or configuration.
await receiver.DeadLetterMessageAsync(message, "reason", "description"); // move the message to the dead-letter queue
// Retrieve the dead-lettered message
ServiceBusReceiver dlqrcv = client.CreateReceiver("firstQueue", new ServiceBusReceiverOptions
{
    SubQueue = SubQueue.DeadLetter
});
ServiceBusReceivedMessage dlqMsg = await dlqrcv.ReceiveMessageAsync();
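To show how the four operations fit together, here is a small sketch of a settlement policy. The mapping from failure type to operation is an assumption made purely for illustration: SettleAction and SettlementPolicy are hypothetical names that are not part of the SDK, and the choice of FormatException and TimeoutException is arbitrary rather than a rule prescribed by Service Bus.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

// Hypothetical helper types; nothing here ships with the SDK.
public enum SettleAction { Complete, Abandon, Defer, DeadLetter }

public static class SettlementPolicy
{
    // Pick an operation from the processing outcome (null means success).
    public static SettleAction Choose(Exception? error) => error switch
    {
        null => SettleAction.Complete,              // processed successfully
        FormatException => SettleAction.DeadLetter, // bad content: retrying will not help
        TimeoutException => SettleAction.Defer,     // dependency unavailable: come back later
        _ => SettleAction.Abandon                   // unknown failure: let a receiver retry
    };

    // Apply the chosen operation using the receiver that received the message.
    public static Task ApplyAsync(
        ServiceBusReceiver receiver, ServiceBusReceivedMessage message, Exception? error) =>
        Choose(error) switch
        {
            SettleAction.Complete => receiver.CompleteMessageAsync(message),
            SettleAction.Defer => receiver.DeferMessageAsync(message),
            SettleAction.DeadLetter => receiver.DeadLetterMessageAsync(
                message, "ProcessingFailed", error!.Message),
            _ => receiver.AbandonMessageAsync(message)
        };
}
```

A caller would wrap its processing in try/catch and pass the caught exception (or null) to ApplyAsync. When the result is a deferral, the caller also needs to store message.SequenceNumber somewhere, since that is the only way to retrieve the message later.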
Filtering the messages delivered to a subscription
To filter messages, we define rules on a subscription. Each message sent to the topic is matched against these rules, and only matching messages are delivered to that subscription. Filters are typically used when messages with particular properties (tags) should go to specific subscriptions.
To apply filters, we first have to create the administration client, as below:
ServiceBusAdministrationClient administrationClient = new ServiceBusAdministrationClient("<connection-string>");
1) Applying SQL Filter
The code below creates a subscription named "prime-customers" on the "customers-topic" topic in Azure Service Bus. The subscription only receives messages whose application property isPrimeMember is true.
using Azure.Messaging.ServiceBus.Administration;
var adminClient = new ServiceBusAdministrationClient("<connection-string>");
string topicName = "customers-topic";
string subscriptionName = "prime-customers";
// Create subscription with a filter
if (!await adminClient.SubscriptionExistsAsync(topicName, subscriptionName))
{
    var options = new CreateSubscriptionOptions(topicName, subscriptionName);
    var ruleOptions = new CreateRuleOptions
    {
        Name = "PrimeMemberRule",
        Filter = new SqlRuleFilter("isPrimeMember = true")
    };
    await adminClient.CreateSubscriptionAsync(options, ruleOptions);
}
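Sending a message:
public class Customers
{
    public string name { get; set; }
    public int userId { get; set; }
    public bool isPrimeMember { get; set; }
}
var sender = client.CreateSender("customers-topic");
var obj = new Customers { name = "Saumya", userId = 1, isPrimeMember = false };
// The body carries the object as JSON; ApplicationProperties carry the values that filters match on
var message = new ServiceBusMessage(BinaryData.FromObjectAsJson(obj))
{
    ApplicationProperties =
    {
        { "name", obj.name },
        { "userId", obj.userId },
        { "isPrimeMember", obj.isPrimeMember }
    }
};
// isPrimeMember is false here, so this message does not match PrimeMemberRule
await sender.SendMessageAsync(message);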
2) Correlation Filter
using Azure.Messaging.ServiceBus.Administration;
var adminClient = new ServiceBusAdministrationClient("<connection-string>");
string topicName = "customers-topic";
string subscriptionName = "prime-customers";
// Create subscription with a correlation filter
if (!await adminClient.SubscriptionExistsAsync(topicName, subscriptionName))
{
    var options = new CreateSubscriptionOptions(topicName, subscriptionName);
    var filter = new CorrelationRuleFilter();
    // Boolean property match against the message's ApplicationProperties
    filter.ApplicationProperties.Add("isPrimeMember", true);
    var ruleOptions = new CreateRuleOptions
    {
        Name = "PrimeMemberRule",
        Filter = filter
    };
    await adminClient.CreateSubscriptionAsync(options, ruleOptions);
}
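To see a filter take effect, one can send a matching and a non-matching message to the topic and then read from the subscription. This is only a sketch: it assumes the "customers-topic" topic and the "prime-customers" subscription from above already exist, and the message bodies, the batch size of ten, and the five-second wait are arbitrary.

```csharp
using System;
using Azure.Messaging.ServiceBus;

await using var client = new ServiceBusClient("<connection-string>");
ServiceBusSender sender = client.CreateSender("customers-topic");

// One message that satisfies the rule and one that does not.
var prime = new ServiceBusMessage("prime customer");
prime.ApplicationProperties["isPrimeMember"] = true;
var regular = new ServiceBusMessage("regular customer");
regular.ApplicationProperties["isPrimeMember"] = false;

await sender.SendMessageAsync(prime);
await sender.SendMessageAsync(regular);

// A subscription receiver takes the topic name and the subscription name.
ServiceBusReceiver receiver = client.CreateReceiver("customers-topic", "prime-customers");

// Wait up to five seconds (arbitrary) for at most ten messages.
var messages = await receiver.ReceiveMessagesAsync(10, TimeSpan.FromSeconds(5));
foreach (ServiceBusReceivedMessage m in messages)
{
    // Only messages with isPrimeMember = true are expected to arrive here.
    object flag = m.ApplicationProperties["isPrimeMember"];
    Console.WriteLine($"{m.Body} (isPrimeMember = {flag})");
    await receiver.CompleteMessageAsync(m);
}
```

The filter is evaluated by the service when the message reaches the topic, so the receiver needs no filtering logic of its own.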
Reading the object properties
using Azure.Messaging.ServiceBus;
// client
var client = new ServiceBusClient("<connection-string>");
// queue receiver
var receiver = client.CreateReceiver("customers-queue");
// receive one message
ServiceBusReceivedMessage received = await receiver.ReceiveMessageAsync();
if (received != null)
{
    // Deserialize body into Customers object
    var customer = received.Body.ToObjectFromJson<Customers>();
    Console.WriteLine($"Customer Name: {customer.name}");
    Console.WriteLine($"Customer Id: {customer.userId}");
    Console.WriteLine($"Prime Member: {customer.isPrimeMember}");
    // Read ApplicationProperties (metadata)
    if (received.ApplicationProperties.TryGetValue("isPrimeMember", out var primeFlag))
    {
        Console.WriteLine($"(From Properties) Prime Flag: {primeFlag}");
    }
    // complete the message
    await receiver.CompleteMessageAsync(received);
}