mortylen

Posted on • Originally published at mortylen.hashnode.dev

SQL Service Broker

I would like to share my first experience with SQL Service Broker from Microsoft. While I have some experience with SQL as a programmer, I have primarily utilized SQL as a database component within various applications and systems. However, I had never delved into the realm of Service Broker until now. This article chronicles my initial exploration and efforts to understand and utilize this component.

So, what exactly is a Service Broker?

After some introductory reading, this is roughly how I understand it. Service Broker is about communication between services, and that is all. This communication can occur within a single database, between multiple databases, or even between different SQL Server instances. Essentially, it is a technology for reliable message queuing. Messages are placed in a queue and remain there until the system has resources available to process them. Importantly, this communication is asynchronous. When a sender dispatches a message, it does not have to wait for the receiver to process it. The sender can continue working while the message sits in the queue, and the processing occurs in the background. The possible uses are broad, ranging from asynchronous triggers that hand off long-running tasks to distributing data between SQL Server instances. It's worth noting that Service Broker is an internal component of SQL Server and is not listed among the Windows services.

[Image: Broker Diagram]

What do I need to test?

In my particular case, I needed to implement asynchronous triggers to solve long-running tasks.

What do I need to know?

As a first step, I needed to familiarize myself with the various objects and the overall broker architecture to gain a better understanding of how it all fits together. A Service Broker consists of a client service, which initiates the conversation, and a service that receives and processes the messages. Each service is associated with a queue. The relationship between services is defined by a contract, which determines the types of messages that the initiator and target services can send to the queue. The exchange of messages between two services is referred to as a dialog.

What are the main objects?

The main objects involved in Service Broker are Message type, Contract, Queue, Service, and Route.

Message type

Each message is represented by a record in the queue, and its format is determined by the Message Type. A Message Type contains the name of the message and the data type. The target service performs validation on the type of data received. Various validations can be applied, such as:

  • EMPTY: The message must not contain any data.

  • NONE: The message can contain any type of data (e.g., JSON, binary, etc.).

  • WELL_FORMED_XML: The message must have a valid XML format.

  • VALID_XML: The message must be valid against a specified XML schema.

In addition to user-defined messages, there are also system messages provided by the Service Broker. These system messages are used by the broker to report the status of a dialog e.g. error messages, end dialog messages, and dialog timer messages.
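
As a rough illustration of the validation options listed above, the following sketch declares one message type per option. All names here (the Example... message types and ExampleSchemaCollection) are made up for this illustration and are not used anywhere else in the article.

```sql
-- Hypothetical names, for illustration only
CREATE MESSAGE TYPE [ExampleEmptyMessage] VALIDATION = EMPTY;
CREATE MESSAGE TYPE [ExampleAnyMessage] VALIDATION = NONE;
CREATE MESSAGE TYPE [ExampleXmlMessage] VALIDATION = WELL_FORMED_XML;
GO

-- VALID_XML needs an XML schema collection to validate against
CREATE XML SCHEMA COLLECTION [ExampleSchemaCollection] AS
N'<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element name="test" type="xs:string" />
  </xs:schema>';
GO

CREATE MESSAGE TYPE [ExampleSchemaMessage]
VALIDATION = VALID_XML WITH SCHEMA COLLECTION [ExampleSchemaCollection];
GO
```

Stricter validation costs a little more on every received message, so NONE or WELL_FORMED_XML is often enough when both ends of the conversation are under your control.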

Contract

A contract represents an agreement between two services and defines the types of messages that they can send to each other during communication. The SENT BY clause within a contract specifies which endpoint is allowed to send each message type:

  • INITIATOR: Only the initiator can send messages.

  • TARGET: Only the target can send messages.

  • ANY: Both the initiator and target can send messages.
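
The contract I use later in this article only covers INITIATOR and TARGET, so here is a small sketch of the ANY option. The names ExampleChatMessage and ExampleChatContract are hypothetical and exist only for this illustration.

```sql
-- Hypothetical names, for illustration only
CREATE MESSAGE TYPE [ExampleChatMessage] VALIDATION = WELL_FORMED_XML;
GO

-- Both the initiator and the target may send ExampleChatMessage
CREATE CONTRACT [ExampleChatContract]
(
    [ExampleChatMessage] SENT BY ANY
);
GO
```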

Queue

A queue is a storage location where incoming messages are stored for processing. A single queue can be assigned to multiple services, allowing them to send or receive messages from that queue.

Service

A service is responsible for routing and delivering messages to the designated queue within a database. A service acts as an endpoint for a conversation and can be either an initiator (which starts a conversation by sending a message) or a target (which receives the message from its queue). Each service is associated with exactly one queue and can list one or more contracts that specify which conversations it accepts as a target.

Route

A route is an object used for message routing, mapping a service name to a network address. If a service is not assigned a route, it will deliver the message within the current instance.
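
I do not need a route in my single-database test, but a minimal sketch of one could look like this. The route name, host name, and port are placeholders I made up, and a real cross-instance setup also needs Service Broker endpoints and security configuration that are not shown here.

```sql
-- Hypothetical route: name, host, and port are placeholders
CREATE ROUTE [ExampleRemoteRoute]
WITH
    SERVICE_NAME = 'TargetService',
    ADDRESS = 'TCP://remote-host.example.com:4022';
GO

-- Routes defined in the current database can be listed like this
SELECT name, remote_service_name, address FROM sys.routes;
```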

What are the main commands I will need?

Next, I delved into learning more about the commands involved in Service Broker. The main commands in Service Broker are Begin Dialog, Send, Receive, and End Conversation.

Dialog

A dialog represents a communication channel between two services and is used for sending messages. Service Broker ensures the guaranteed delivery of messages, and they are delivered to the queue in the order in which they were sent. BEGIN DIALOG is used to initiate a new conversation, and the conversation continues until one of the parties terminates it or sends an error message. It's important to note that both parties must properly terminate the conversation for it to be correctly ended; otherwise, the dialog will persist in the database.

Send

The SEND command is used to send a message within a conversation. The message is placed in the destination queue. If the message cannot be delivered immediately, for example because the destination is in another instance, it is held in sys.transmission_queue until delivery succeeds.
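
When a message does not show up where I expect it, the transmission queue is the first place I would look. This is only a diagnostic sketch; the column selection is my own choice.

```sql
-- Messages that have been sent but not yet delivered.
-- transmission_status usually explains why a message is stuck.
SELECT
    conversation_handle,
    to_service_name,
    message_type_name,
    enqueue_time,
    transmission_status
FROM sys.transmission_queue;
```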

End Conversation

The END CONVERSATION command is used to close a conversation. It is crucial to ensure that both sides of the conversation properly close it.
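
To check whether conversations are really being closed on both sides, a query like the following sketch can help. It simply lists the conversation endpoints in the current database that have not reached the CLOSED state.

```sql
-- Conversation endpoints that are still open in this database
SELECT
    conversation_handle,
    is_initiator,
    far_service,
    state_desc
FROM sys.conversation_endpoints
WHERE state_desc <> 'CLOSED';
```

A steadily growing result here usually means that one side never calls END CONVERSATION.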

Receive

The RECEIVE command is used to retrieve one or more messages from the queue. Once a message is successfully retrieved, it is removed from the queue.
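
A minimal sketch of a manual RECEIVE is shown below. It assumes the TargetQueue that is created later in this article and that no activation procedure is consuming the queue at the same time. The variable names are my own.

```sql
DECLARE @Handle UNIQUEIDENTIFIER;
DECLARE @TypeName SYSNAME;
DECLARE @Body VARBINARY(MAX);

-- The statement before RECEIVE must be terminated with a semicolon
RECEIVE TOP (1)
    @Handle = conversation_handle,
    @TypeName = message_type_name,
    @Body = message_body
FROM dbo.TargetQueue;

-- All three values stay NULL if the queue was empty
SELECT
    @Handle AS conversation_handle,
    @TypeName AS message_type_name,
    CAST(@Body AS XML) AS message_body;
```

Keep in mind that this really consumes the message, so it is meant for experiments rather than for peeking into a queue that a procedure depends on.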

Alright, now that I have a basic understanding of how Service Broker works, it's time to set it up for testing purposes.

How do I imagine my testing purposes?

First, I will create a database called Test_ServiceBroker. Within this database, I will create a table named tbl_DataStorage to simulate a production table. This table will collect data from users for further processing.

Next, I will create a trigger for the tbl_DataStorage table that will execute after the data is inserted. This trigger will utilize Service Broker to send the data to a stored procedure named sp_ProcessRequestMessage. This procedure will handle the data processing. Once the data is processed, the Service Broker will send a message to another stored procedure named sp_ProcessResponseMessage, which will update the status of the record in the tbl_DataStorage table to mark it as complete.

I will create and configure the necessary Service Broker objects. Firstly, I will create a Message Type for the request message and another Message Type for the response message. Following that, I will establish a Contract between these Message Types, defining the allowable communication between them.

Afterward, I will create a Queue for the initiator and another Queue for the target. These queues will serve as the storage location for incoming messages.

Finally, I will create two Services: one for the initiator and another for the target. These services will act as the endpoints for the conversation and will be associated with their respective queues.

[Image: SQL Diagram]

Create a database playground

First, let's prepare the playground for our Service Broker implementation.

-- Create the 'Test_ServiceBroker' database
USE master
GO
IF db_id('Test_ServiceBroker') IS NOT NULL
    DROP DATABASE Test_ServiceBroker
CREATE DATABASE Test_ServiceBroker;
GO
-- Check if Service Broker is enabled
SELECT name, is_broker_enabled FROM sys.databases

We can check the status of the Service Broker by querying the is_broker_enabled column in the sys.databases catalog view. Here are the possible values:

  • 0: Service Broker is disabled. Received messages will not be placed in the queue, and sent messages will remain in the transmission queue.

  • 1: Service Broker is enabled.

-- Enable Service Broker for the 'Test_ServiceBroker' database
ALTER DATABASE Test_ServiceBroker SET ENABLE_BROKER
GO

We can enable the Service Broker for the Test_ServiceBroker database by using the ENABLE_BROKER parameter. Similarly, the Service Broker can be disabled using the DISABLE_BROKER parameter.
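
One thing to be aware of: SET ENABLE_BROKER needs exclusive access to the database, so the statement can appear to hang while other sessions are connected. The sketch below is one way around that on a test database. WITH ROLLBACK IMMEDIATE rolls back and disconnects other sessions, so I would not use it blindly on a production system.

```sql
-- Test databases only: other sessions are rolled back and disconnected
ALTER DATABASE Test_ServiceBroker SET ENABLE_BROKER WITH ROLLBACK IMMEDIATE;
GO

-- Confirm the result for this database only
SELECT name, is_broker_enabled
FROM sys.databases
WHERE name = 'Test_ServiceBroker';
```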

-- Use the 'Test_ServiceBroker' database
USE Test_ServiceBroker
GO

Let's switch to the Test_ServiceBroker database for the following operations.

-- Create the 'tbl_DataStorage' table
IF object_id('tbl_DataStorage') IS NOT NULL
  DROP TABLE tbl_DataStorage
GO
CREATE TABLE tbl_DataStorage
(
    Id INT NOT NULL IDENTITY(1,1) PRIMARY KEY,
    InsertDate DATETIME NOT NULL DEFAULT (GETDATE()),
    DataStorage NVARCHAR(MAX),
    DataStatus NVARCHAR(256),
    CompletionDate DATETIME,
);
GO

Here, we create the tbl_DataStorage table, which will store the data for processing. The table has the following columns:

  • Id: A unique primary key for each record.

  • InsertDate: The date and time of data insertion.

  • DataStorage: User data that will be processed.

  • DataStatus: The status of data processing.

  • CompletionDate: The date and time of data processing completion.

Configure the Service Broker

Next, let's configure the Service Broker for our playground.

-- Use the 'Test_ServiceBroker' database
USE Test_ServiceBroker
GO
-- Create the message type for the request
CREATE MESSAGE TYPE [RequestMessage]
VALIDATION = WELL_FORMED_XML
GO

-- Create the message type for the response
CREATE MESSAGE TYPE [ResponseMessage]
VALIDATION = WELL_FORMED_XML
GO

We create two message types, one for the request and another for the response. Both message types have their validation set to WELL_FORMED_XML, indicating that the user will send data in XML format.

-- Retrieve information about message types
SELECT * FROM sys.service_message_types

This query retrieves information about the registered message types in the Service Broker. The key attribute here is validation_desc, which provides details about the validation type. Additionally, there are several default message types, such as Error, EndDialog, and DialogTimer.

-- Create the contract
CREATE CONTRACT [DataProcessContract]
(
    [RequestMessage] SENT BY INITIATOR,
    [ResponseMessage] SENT BY TARGET
)
GO

We create a contract named DataProcessContract between the RequestMessage (sent by the initiator) and the ResponseMessage (sent by the target).

-- Retrieve information about contracts
SELECT * FROM sys.service_contracts

This query retrieves information about the contracts registered in the Service Broker.

-- Retrieve information about contracts and message types
SELECT
    sc.name AS 'Contract',
    smt.name AS 'Message type',
    scmu.is_sent_by_initiator,
    scmu.is_sent_by_target,
    smt.validation
FROM sys.service_contract_message_usages scmu
    INNER JOIN sys.service_message_types smt ON scmu.message_type_id = smt.message_type_id
    INNER JOIN sys.service_contracts sc ON sc.service_contract_id = scmu.service_contract_id

This query provides data about the contracts, their associated message types, whether they are sent by the initiator or target, and the validation type.

-- Create the queue for the initiator
CREATE QUEUE [InitiatorQueue]
WITH STATUS = ON
GO

-- Create the queue for the target
CREATE QUEUE [TargetQueue]
WITH STATUS = ON
GO

We create two queues, one for the initiator and one for the target. Both queues are set to ON status, which means they are enabled and ready to accept messages.

-- Retrieve information about queues
SELECT * FROM sys.service_queues

This query retrieves information about the queues in the Service Broker. The main attributes here are the name, max_readers, activation_procedure, and the enabled status of activation, receive, and retention.

-- Create the service for the initiator
CREATE SERVICE [InitiatorService]
ON QUEUE [InitiatorQueue]
(
    [DataProcessContract]
)
GO

-- Create the service for the target
CREATE SERVICE [TargetService]
ON QUEUE [TargetQueue]
(
    [DataProcessContract]
)
GO

We create two service endpoints: one for the initiator and another for the target. Both services are associated with the DataProcessContract contract.

-- Retrieve information about services
SELECT * FROM sys.services

This query retrieves information about the services in the Service Broker. It includes the name of the service and the associated service queue ID (service_queue_id).

Create a Trigger and Stored Procedures

Now we need to create a trigger for the tbl_DataStorage table and two stored procedures to process messages from the Service Broker.

-- Create the trigger
CREATE TRIGGER [dbo].[trg_DataStorage_insert]  
    ON [Test_ServiceBroker].[dbo].[tbl_DataStorage]
    AFTER INSERT
AS
BEGIN TRY
    BEGIN TRANSACTION ;
        DECLARE @DataContent NVARCHAR(MAX)
        DECLARE @Id int
        SET @DataContent = (SELECT [DataStorage] FROM inserted i)   
        SET @Id = (SELECT Id FROM inserted i)  
        If (@DataContent IS NOT NULL)
        BEGIN
            UPDATE [Test_ServiceBroker].[dbo].[tbl_DataStorage] SET [DataStatus] = 'pending processing' WHERE [Id] = @Id;
            SET @DataContent = @DataContent + '<Id_DataStorage>' + CAST(@Id AS NVARCHAR(10)) + '</Id_DataStorage>';
            DECLARE @DialogHandle UNIQUEIDENTIFIER;  
            BEGIN DIALOG CONVERSATION @DialogHandle  
                FROM SERVICE [InitiatorService]  
                TO SERVICE 'TargetService'  
                ON CONTRACT [DataProcessContract]  
                WITH ENCRYPTION = OFF;  
            SEND ON CONVERSATION @DialogHandle  
                MESSAGE TYPE [RequestMessage](@DataContent);
      END
    COMMIT TRANSACTION
END TRY
BEGIN CATCH
    ROLLBACK TRANSACTION
END CATCH
GO

The trigger is executed after data is inserted into the tbl_DataStorage table. It stores the ID and user data in variables. Then, it updates the DataStatus to "pending processing" and appends the <Id_DataStorage> tag with the ID value to the DataContent variable. The important part is creating a dialog from the InitiatorService to the TargetService using the DataProcessContract contract. The dialog handle is stored in the DialogHandle variable as a UNIQUEIDENTIFIER, a 16-byte GUID that is usually displayed as a hexadecimal string. Next, a message of type RequestMessage is sent with the DataContent as the message body. The Service Broker delivers this message to the TargetQueue.

The WITH ENCRYPTION option in the BEGIN DIALOG statement determines whether messages must be encrypted when they are sent outside the SQL Server instance. The message body is encrypted with a symmetric key generated for the conversation, and the keys are encrypted using the private key with the specified certificate. In my case, the value is set to OFF, because I don't need to encrypt messages that stay within one instance and one database.
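
One limitation of the trigger above is that it reads Id and DataStorage with scalar subqueries, so it only works when a statement inserts exactly one row. A multi-row INSERT would make those subqueries fail. The following is a sketch of a variant that handles any number of rows by walking the inserted table with a cursor. The trigger name trg_DataStorage_insert_multi is hypothetical, and it is meant to replace the original trigger, not to run next to it, otherwise every row would be sent twice.

```sql
USE Test_ServiceBroker
GO
-- Hypothetical multi-row variant of trg_DataStorage_insert
CREATE TRIGGER [dbo].[trg_DataStorage_insert_multi]
    ON [dbo].[tbl_DataStorage]
    AFTER INSERT
AS
BEGIN
    SET NOCOUNT ON;

    DECLARE @Id INT;
    DECLARE @DataContent NVARCHAR(MAX);
    DECLARE @DialogHandle UNIQUEIDENTIFIER;

    -- Mark all inserted rows that carry data in one set-based statement
    UPDATE ds
    SET ds.[DataStatus] = 'pending processing'
    FROM [dbo].[tbl_DataStorage] ds
        INNER JOIN inserted i ON i.[Id] = ds.[Id]
    WHERE i.[DataStorage] IS NOT NULL;

    -- Each row gets its own dialog, so walk the rows one by one
    DECLARE row_cursor CURSOR LOCAL FAST_FORWARD FOR
        SELECT [Id], [DataStorage] FROM inserted WHERE [DataStorage] IS NOT NULL;

    OPEN row_cursor;
    FETCH NEXT FROM row_cursor INTO @Id, @DataContent;

    WHILE @@FETCH_STATUS = 0
    BEGIN
        SET @DataContent = @DataContent + '<Id_DataStorage>' + CAST(@Id AS NVARCHAR(10)) + '</Id_DataStorage>';

        BEGIN DIALOG CONVERSATION @DialogHandle
            FROM SERVICE [InitiatorService]
            TO SERVICE 'TargetService'
            ON CONTRACT [DataProcessContract]
            WITH ENCRYPTION = OFF;

        SEND ON CONVERSATION @DialogHandle
            MESSAGE TYPE [RequestMessage] (@DataContent);

        FETCH NEXT FROM row_cursor INTO @Id, @DataContent;
    END

    CLOSE row_cursor;
    DEALLOCATE row_cursor;
END
GO
```

I left out the explicit transaction and TRY...CATCH here because a trigger already runs inside the transaction of the INSERT statement. If the SEND fails, the insert fails with it, which may or may not be what you want.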

-- Create the stored procedure sp_ProcessRequestMessage
CREATE PROCEDURE [dbo].[sp_ProcessRequestMessage]
AS
BEGIN
    DECLARE @DialogHandle UNIQUEIDENTIFIER
    DECLARE @MessageName VARCHAR(256)
    DECLARE @MessageBody XML
    DECLARE @Id_DataStorage INT
        WHILE (1 = 1)
            BEGIN
                BEGIN TRY
                    BEGIN TRANSACTION;
                        WAITFOR (
                            RECEIVE TOP(1)
                                @DialogHandle = conversation_handle,
                                @MessageName = message_type_name,   
                                @MessageBody = CAST(message_body AS XML)
                            FROM dbo.TargetQueue ), TIMEOUT 5000   
                            IF (@@ROWCOUNT = 0)
                            BEGIN
                                ROLLBACK TRANSACTION
                                BREAK
                            END
                            IF ( @MessageName = 'RequestMessage' )
                            BEGIN
                                -- Placeholder for real data processing; replace the WAITFOR DELAY below
                                WAITFOR DELAY '00:01:00';
                                SET @Id_DataStorage =  @MessageBody.value('/Id_DataStorage[1]', 'INT');
                                SET @MessageBody = '<Id_DataStorage>' + CAST(@Id_DataStorage AS VARCHAR(10)) + '</Id_DataStorage>' + '<ProcessStatus>Complete</ProcessStatus>';
                                SEND ON CONVERSATION @DialogHandle MESSAGE TYPE [ResponseMessage] (@MessageBody);
                            END
                            ELSE IF ( @MessageName = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' )
                            BEGIN
                                END CONVERSATION @DialogHandle ;
                            END
                            ELSE IF ( @MessageName = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error' )
                            BEGIN
                                END CONVERSATION @DialogHandle ;
                            END
                    COMMIT TRANSACTION;
                END TRY
                BEGIN CATCH
                  ROLLBACK TRANSACTION
                END CATCH
            END
END
GO

This stored procedure loops, receiving messages from the queue and processing them one at a time. If there are no messages left to process (@@ROWCOUNT = 0), the procedure terminates. The WAITFOR statement waits until RECEIVE returns a message or the timeout expires. The TIMEOUT parameter determines how long the WAITFOR statement waits for messages, specified in milliseconds. If the value is -1, the wait time is unlimited. If the timeout expires, RECEIVE returns an empty result set. RECEIVE returns a result set containing zero or more rows, where each row represents one message from the queue. In this case, we expect messages from the dbo.TargetQueue queue and wait for a maximum of 5000 milliseconds.

If a message of the type RequestMessage (sent by the trigger) is received, data processing is simulated. For testing purposes, the WAITFOR DELAY command introduces a delay of one minute. After processing the data, a new message is created with the processed record's ID in the <Id_DataStorage> tag and the completion status in the <ProcessStatus> tag. The message is sent back on the same conversation, so it arrives in the InitiatorQueue, where the second procedure, sp_ProcessResponseMessage, picks it up and updates the status of the record in the tbl_DataStorage table. If an EndDialog or Error message type is received, the conversation is ended using the END CONVERSATION command.

-- Create the stored procedure sp_ProcessResponseMessage
CREATE PROCEDURE [dbo].[sp_ProcessResponseMessage]
AS
BEGIN
    DECLARE @DialogHandle UNIQUEIDENTIFIER
    DECLARE @MessageName VARCHAR(256)
    DECLARE @MessageBody XML
    DECLARE @Id_DataStorage INT
    DECLARE @ProcessStatus NVARCHAR(64)
        WHILE (1 = 1)
            BEGIN
                BEGIN TRY
                    BEGIN TRANSACTION;
                        WAITFOR (
                            RECEIVE TOP(1)
                                @DialogHandle = conversation_handle,
                                @MessageName = message_type_name,   
                                @MessageBody = CAST(message_body AS XML)
                            FROM dbo.InitiatorQueue ), TIMEOUT 5000   
                            IF (@@ROWCOUNT = 0)
                            BEGIN
                                ROLLBACK TRANSACTION
                                BREAK
                            END
                            IF ( @MessageName = 'ResponseMessage' )
                            BEGIN
                                SET @Id_DataStorage =  @MessageBody.value('/Id_DataStorage[1]', 'INT');
                                SET @ProcessStatus =  @MessageBody.value('/ProcessStatus[1]', 'NVARCHAR(64)');
                                UPDATE [Test_ServiceBroker].[dbo].[tbl_DataStorage] SET [DataStatus] = @ProcessStatus, [CompletionDate] = GETDATE() WHERE [Id] = @Id_DataStorage;
                                END CONVERSATION @DialogHandle;
                            END
                            ELSE IF ( @MessageName = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' )
                            BEGIN
                                END CONVERSATION @DialogHandle ;
                            END
                            ELSE IF ( @MessageName = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error' )
                            BEGIN
                                END CONVERSATION @DialogHandle ;
                            END
                    COMMIT TRANSACTION;
                END TRY
                BEGIN CATCH
                  ROLLBACK TRANSACTION
                END CATCH
            END
END
GO

This procedure is almost identical to the sp_ProcessRequestMessage stored procedure. The difference is that the RECEIVE command waits for data in the InitiatorQueue queue where the previous procedure sent it. The message name must be ResponseMessage. This procedure parses the XML content of the received message, updates the record in the tbl_DataStorage table with the status provided in the message, and finally terminates the conversation with the END CONVERSATION command.

I stumbled a little bit

Hmmm, here is one small problem. How will my procedures be called? The trigger is easy to understand: when a record is added to the table, the trigger runs and a dialog starts. But what about the procedures? Who triggers their execution? I discovered a useful feature called Internal Activation. With Internal Activation enabled, the Service Broker monitors the queue. Whenever messages become available in the queue for processing, the procedure associated with that queue is started automatically. Let's take a look at the queue parameters that are relevant for Internal Activation:

WITH STATUS: This parameter determines whether the queue is enabled and ready to receive messages (ON) or disabled (OFF).

RETENTION: If set to ON, all messages sent or received on conversations that use this queue are retained in the queue until those conversations end. If set to OFF, a message is deleted from the queue as soon as it is received. The default value is OFF.

ACTIVATION: This parameter controls the activation behavior after receiving a message in the queue.

ACTIVATION parameters:

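  • STATUS: When ON, the Service Broker starts the activation procedure automatically when messages arrive in the queue. When OFF, the procedure is not activated.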

  • PROCEDURE_NAME: Specifies the name of the stored procedure associated with the queue.

  • MAX_QUEUE_READERS: Determines the maximum number of instances of the stored procedure that can be initiated simultaneously. The maximum allowed value is 32767.

  • EXECUTE AS: Specifies the security context under which the procedure runs. It can be set to SELF (the user who creates or alters the queue), OWNER (the account that owns the queue), or any other user name.

Now, let's proceed with modifying both queues accordingly:

-- Altering the InitiatorQueue as a queue with an activation procedure
ALTER QUEUE [InitiatorQueue]
WITH ACTIVATION
(
    STATUS = ON,
    PROCEDURE_NAME = [sp_ProcessResponseMessage],
    MAX_QUEUE_READERS = 10,
    EXECUTE AS SELF
)
GO

-- Altering the TargetQueue as a queue with an activation procedure
ALTER QUEUE [TargetQueue]
WITH ACTIVATION
(
    STATUS = ON,
    PROCEDURE_NAME = [sp_ProcessRequestMessage],
    MAX_QUEUE_READERS = 10,
    EXECUTE AS SELF
)
GO
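
To confirm that activation is configured, and to watch it at work while messages are being processed, the following sketch can be used. The first query reads the queue settings. The second one lists activation procedures that are running at that moment, so it returns no rows while the queues are idle.

```sql
-- Activation settings of both queues
SELECT name, is_activation_enabled, activation_procedure, max_readers
FROM sys.service_queues
WHERE name IN ('InitiatorQueue', 'TargetQueue');

-- Activation procedures that are currently running
SELECT spid, queue_id, procedure_name
FROM sys.dm_broker_activated_tasks;
```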

Let's test it

Okay, that could be all for now. It's time to give it a try. I will simulate the user application by using a regular SQL INSERT INTO command to send test data to the tbl_DataStorage table.

-- Sending test data.
INSERT INTO [Test_ServiceBroker].[dbo].[tbl_DataStorage] ([DataStorage]) VALUES ('<test>fooo</test>')
-- Viewing the table
SELECT * FROM [Test_ServiceBroker].[dbo].[tbl_DataStorage]

If I send the data and view the table within one minute (since I have set a DELAY of 1 minute in the sp_ProcessRequestMessage procedure), I should see the value "pending processing" in the DataStatus column, which was set by the trigger. By that point the trigger has also sent the data for processing.

[Image: Pending Processing]

After one minute, the data should be processed, and the DataStatus column should show "Complete" in the output. The difference between the InsertDate and CompletionDate times is approximately one minute, as determined by the 1-minute delay set in the sp_ProcessRequestMessage procedure.

[Image: Complete]

Of course, it is possible to INSERT many rows in quick succession and observe what happens. I tried inserting multiple rows at once.

-- Insert 15 test rows (throwaway test code)
INSERT INTO [tbl_DataStorage] ([DataStorage]) VALUES ('<test>bar01</test>')
INSERT INTO [tbl_DataStorage] ([DataStorage]) VALUES ('<test>bar02</test>')
INSERT INTO [tbl_DataStorage] ([DataStorage]) VALUES ('<test>bar03</test>')
INSERT INTO [tbl_DataStorage] ([DataStorage]) VALUES ('<test>bar04</test>')
INSERT INTO [tbl_DataStorage] ([DataStorage]) VALUES ('<test>bar05</test>')
INSERT INTO [tbl_DataStorage] ([DataStorage]) VALUES ('<test>bar06</test>')
INSERT INTO [tbl_DataStorage] ([DataStorage]) VALUES ('<test>bar07</test>')
INSERT INTO [tbl_DataStorage] ([DataStorage]) VALUES ('<test>bar08</test>')
INSERT INTO [tbl_DataStorage] ([DataStorage]) VALUES ('<test>bar09</test>')
INSERT INTO [tbl_DataStorage] ([DataStorage]) VALUES ('<test>bar10</test>')
INSERT INTO [tbl_DataStorage] ([DataStorage]) VALUES ('<test>bar11</test>')
INSERT INTO [tbl_DataStorage] ([DataStorage]) VALUES ('<test>bar12</test>')
INSERT INTO [tbl_DataStorage] ([DataStorage]) VALUES ('<test>bar13</test>')
INSERT INTO [tbl_DataStorage] ([DataStorage]) VALUES ('<test>bar14</test>')
INSERT INTO [tbl_DataStorage] ([DataStorage]) VALUES ('<test>bar15</test>')

[Image: Test multiple rows]

I noticed that the data is processed in batches. This can be seen from the timestamps in the CompletionDate column. The batch size is determined by the MAX_QUEUE_READERS setting of the queue, which is 10 here, so up to 10 rows are processed in parallel. If I set MAX_QUEUE_READERS to 1, the rows are processed one after another at 1-minute intervals instead of several at once. Either way, the process is still asynchronous: the insert does not wait for the processing to complete. See the next section.
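
For the single-reader test, only the MAX_QUEUE_READERS value of the target queue has to change. A minimal sketch, including the statement to restore the original value afterwards:

```sql
-- Process the requests strictly one after another
ALTER QUEUE [TargetQueue]
WITH ACTIVATION
(
    MAX_QUEUE_READERS = 1
);
GO

-- Restore the original setting after the test
ALTER QUEUE [TargetQueue]
WITH ACTIVATION
(
    MAX_QUEUE_READERS = 10
);
GO
```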

[Image: Test single row]

Comparison of asynchronous vs. synchronous

It may seem unnecessarily complicated to enable the Service Broker, create all those objects, start a dialog, send messages, write procedures that receive and process them, and add a trigger to start the communication. Instead of all this, a single trigger that processes the data after it is inserted into tbl_DataStorage would be enough. This is true, but with the difference that the data would be processed synchronously. This means that the user has to wait until the data is processed.

Here is a small demonstration. First, I will create a new trigger for the tbl_DataStorage table called trg_DataStorage_insert_sync. The trigger is activated after the data is inserted and all it does is update the DataStatus column to "pending processing". It then waits for one minute before updating the DataStatus to "Complete". Essentially, the trigger performs the same actions as my service broker implementation.

-- Create the trigger for synchronous test
CREATE TRIGGER [dbo].[trg_DataStorage_insert_sync]  
    ON [Test_ServiceBroker].[dbo].[tbl_DataStorage]
    AFTER INSERT
AS
BEGIN TRY
    BEGIN TRANSACTION ;
        DECLARE @DataContent NVARCHAR(MAX)
        DECLARE @Id int
        SET @DataContent = (SELECT [DataStorage] FROM inserted i)   
        SET @Id = (SELECT Id FROM inserted i)  
        If (@DataContent IS NOT NULL)
        BEGIN
            UPDATE [Test_ServiceBroker].[dbo].[tbl_DataStorage] SET [DataStatus] = 'pending processing' WHERE [Id] = @Id;
            WAITFOR DELAY '00:01:00'; -- Placeholder for real data processing; replace this WAITFOR DELAY
            UPDATE [Test_ServiceBroker].[dbo].[tbl_DataStorage] SET [DataStatus] = 'Complete' WHERE [Id] = @Id;
      END
    COMMIT TRANSACTION
END TRY
BEGIN CATCH
    ROLLBACK TRANSACTION
END CATCH
GO

It is still necessary to temporarily disable the trg_DataStorage_insert trigger to prevent it from sending messages through the service broker.

-- Disable trigger trg_DataStorage_insert
ALTER TABLE [tbl_DataStorage] DISABLE TRIGGER [trg_DataStorage_insert]

Similarly to the previous example, I will insert 15 rows into the tbl_DataStorage table.

Here, you can observe that the execution of the insert took 15 minutes, which corresponds to the time it takes for 15 rows to be processed, considering the 1-minute DELAY for each row. The session waits for all the tasks to complete and only unblocks after they have been processed.

[Image: Status line]

Now, I will revert everything back to its original state. I will enable the trg_DataStorage_insert trigger again and disable the trg_DataStorage_insert_sync trigger (or alternatively, I can delete it since it is no longer needed).

-- Enable trigger trg_DataStorage_insert
ALTER TABLE [tbl_DataStorage] ENABLE TRIGGER [trg_DataStorage_insert]

-- Disable trigger trg_DataStorage_insert_sync
ALTER TABLE [tbl_DataStorage] DISABLE TRIGGER [trg_DataStorage_insert_sync]

Finally, I will insert 15 rows into the tbl_DataStorage table once again.

[Image: Status line]

Perfect! The data insertion occurred instantly, and the entire processing is now happening asynchronously in the background. The session is now free, and the service broker will take care of managing everything.

Controlling message priority

SQL Service Broker offers the capability to assign priorities, allowing you to control the processing order of messages. While not strictly necessary in my particular use case with a single communication, it is valuable to have an understanding of priority settings. By default, all conversations have the same priority. Message priority is defined at the conversation level and influences the sequence in which messages are sent and received.

Priority levels range from 1 to 10, with 1 being the lowest and 10 being the highest. Conversations without an explicit priority use the default level of 5. Messages with higher priority levels are processed before those with lower priority levels, enabling you to manage the message processing order. However, it's important to note that message priority does not guarantee immediate or real-time execution, as processing time depends on factors such as system resources and workload.

To begin, the first step is to enable the HONOR_BROKER_PRIORITY option for the database, as it is disabled by default. To determine whether the HONOR_BROKER_PRIORITY setting is enabled, you can execute a simple query that retrieves the relevant information from the sys.databases. The query provides you with the current status of the honor_broker_priority setting for each database.

-- Check if is_honor_broker_priority_on is enabled
SELECT name, is_honor_broker_priority_on FROM sys.databases;

Enabling the honor_broker_priority setting is crucial if you want the SQL Service Broker to honor message priorities during processing.

-- Enable priority management
ALTER DATABASE [Test_ServiceBroker] SET HONOR_BROKER_PRIORITY ON;

To create a priority level, you can use the CREATE BROKER PRIORITY command.

-- Create a priority
CREATE BROKER PRIORITY ConversationPriority
FOR CONVERSATION
SET (CONTRACT_NAME = [DataProcessContract],
PRIORITY_LEVEL = 1 );

In this example, I have set a low priority level (1) for conversations that use the DataProcessContract, to illustrate the flexibility of message prioritization. With the ALTER BROKER PRIORITY command, you can easily adjust the priority level later.

-- Modify priority to level 10
ALTER BROKER PRIORITY ConversationPriority
FOR CONVERSATION  
SET (PRIORITY_LEVEL = 10);

Here, I have adjusted the priority level to 10, which represents the highest priority. This allows you to tailor the priority settings based on your specific requirements.
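To confirm which priorities are currently defined in the database, the sys.conversation_priorities catalog view can be queried. The following is a minimal sketch; the join to sys.service_contracts is only there to show the contract name instead of its ID, and the column aliases are my own.

```sql
-- List the defined broker priorities together with the contract they apply to.
-- A NULL contract_name means the priority applies to ANY contract.
SELECT cp.name     AS priority_name,
       sc.name     AS contract_name,
       cp.priority AS priority_level
FROM sys.conversation_priorities AS cp
LEFT JOIN sys.service_contracts AS sc
       ON sc.service_contract_id = cp.service_contract_id;
```

After the ALTER BROKER PRIORITY statement above, ConversationPriority should be listed with a priority level of 10.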

Now, let's take a closer look at the message queues and conversation endpoints to observe the impact of the priority changes. The priority column reflects the updated priority level of 10, but only for conversations started after the change: an existing conversation keeps the priority it was assigned when it began. Also note that a queue shows only messages that are still waiting to be processed. To view messages in InitiatorQueue and TargetQueue after they have been processed, the RETENTION attribute of the queue needs to be set to ON, which keeps the messages in the queue until the conversation ends.

To view the message priority within the InitiatorQueue and TargetQueue, you can execute the following queries:

-- Viewing messages in the InitiatorQueue
SELECT * FROM InitiatorQueue

-- Viewing messages in the TargetQueue
SELECT * FROM TargetQueue

Similarly, to see the priority of each conversation endpoint, you can execute the following query:

-- Viewing conversation endpoints
SELECT * FROM sys.conversation_endpoints

By examining these message queues and conversation endpoints, you can gain valuable visibility into the status of your messages, and observe the effect of priority changes.
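Since SELECT * returns many columns, a narrower query makes the priority easier to spot. This is a small sketch that picks only the columns relevant here; TargetQueue is the queue used throughout this article, and the same query works for InitiatorQueue.

```sql
-- Priority assigned to each conversation endpoint
SELECT conversation_handle,
       is_initiator,
       state_desc,
       far_service,
       priority
FROM sys.conversation_endpoints;

-- Priority of the messages currently sitting in the target queue
SELECT priority,
       queuing_order,
       message_type_name,
       conversation_handle
FROM TargetQueue
ORDER BY priority DESC, queuing_order;
```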

By using different priority levels, you can influence the sequence in which messages are processed within the SQL Service Broker. This is particularly valuable when different message types or scenarios require different levels of urgency. Higher priority levels let critical messages be processed first, while lower levels can be assigned to messages that are less time-sensitive. Tailoring the priority settings in this way helps the system allocate resources according to the importance of each message.

A few insights

A few notes that helped me understand how the service broker works and how to monitor it. I was particularly interested in understanding what happens in the initiator queue, target queue, and conversation endpoints.

-- Viewing messages in the InitiatorQueue
SELECT * FROM InitiatorQueue

-- Viewing messages in the TargetQueue
SELECT * FROM TargetQueue

Here, I can see the messages waiting to be processed in the queues. Once a message is processed, it is automatically deleted. However, if we want to keep processed messages in the queue for testing purposes, we can modify the RETENTION parameter.
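For reference, changing the RETENTION parameter could look like the sketch below. It assumes the queues live in the default dbo schema. Retained messages stay in the queue until the conversation ends, so this is best kept for testing.

```sql
-- Keep processed messages in the queues until their conversation ends
ALTER QUEUE [dbo].[InitiatorQueue] WITH RETENTION = ON;
ALTER QUEUE [dbo].[TargetQueue] WITH RETENTION = ON;

-- Switch back to the default behavior once testing is done
ALTER QUEUE [dbo].[InitiatorQueue] WITH RETENTION = OFF;
ALTER QUEUE [dbo].[TargetQueue] WITH RETENTION = OFF;
```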

-- Viewing conversation endpoints
SELECT * FROM sys.conversation_endpoints

Each side of the conversation is represented by an endpoint, and the sys.conversation_endpoints catalog view lists these endpoints. Closed conversations remain in the view for 30 minutes, indicated by the security_timestamp. This helps prevent replay attacks: the service broker ensures that a conversation is not reopened if it has the same handle and originates from the same host with the same encryption key. It's important to close the dialog correctly on both the initiator and target sides, otherwise conversation endpoints accumulate in sys.conversation_endpoints.
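A quick way to check whether endpoints are piling up is to count them by state. This is a simple monitoring sketch of my own, not part of the original scripts.

```sql
-- Count conversation endpoints per state and side.
-- A steadily growing number of endpoints that never reach CLOSED
-- usually means that one side is not ending its conversations.
SELECT state_desc,
       is_initiator,
       COUNT(*) AS endpoint_count
FROM sys.conversation_endpoints
GROUP BY state_desc, is_initiator
ORDER BY endpoint_count DESC;
```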

And here's one more small tip for improving performance. It's worth thinking about how often a conversation is created and closed when many small messages have to be processed within a short time. Creating and closing a conversation for every single message would unnecessarily burden the system. Instead, it's better to open one conversation, send several messages over it, and close it afterwards.
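The idea can be sketched as follows. Note that the service names InitiatorService and TargetService and the message type RequestMessage are hypothetical placeholders, so replace them with the objects created earlier; only DataProcessContract is taken from this article. The XML payload is made up for illustration.

```sql
DECLARE @handle UNIQUEIDENTIFIER;
DECLARE @body   XML;
DECLARE @i      INT = 1;

BEGIN TRANSACTION;

-- Open ONE conversation for the whole batch
BEGIN DIALOG CONVERSATION @handle
    FROM SERVICE [InitiatorService]        -- hypothetical service name
    TO SERVICE N'TargetService'            -- hypothetical service name
    ON CONTRACT [DataProcessContract]
    WITH ENCRYPTION = OFF;

-- Send several messages over the same conversation
WHILE @i <= 15
BEGIN
    SET @body = (SELECT @i AS [Id] FOR XML PATH('Row'), TYPE);

    SEND ON CONVERSATION @handle
        MESSAGE TYPE [RequestMessage]      -- hypothetical message type
        (@body);

    SET @i += 1;
END

COMMIT TRANSACTION;
```

The conversation is then closed once for the whole batch, through the usual END CONVERSATION handling on both sides, rather than once per message.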

Additionally, there is no requirement to receive messages one by one using RECEIVE TOP(1). Instead, you can retrieve all available messages (a single RECEIVE returns the messages of one conversation group), store them in a variable of type TABLE, and use a CURSOR to process them before committing the transaction. There are various optimization possibilities available, and the approach chosen should align with the specific use case at hand.
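A minimal sketch of this batch approach is shown below. It assumes the TargetQueue from this article. The PRINT statement is only a placeholder for the real processing logic, and a production procedure would also need to handle the Error message type.

```sql
DECLARE @Messages TABLE (
    queuing_order       BIGINT,
    conversation_handle UNIQUEIDENTIFIER,
    message_type_name   SYSNAME,
    message_body        VARBINARY(MAX)
);
DECLARE @handle UNIQUEIDENTIFIER, @type SYSNAME, @body VARBINARY(MAX);

BEGIN TRANSACTION;

-- No TOP clause: fetch every available message of the next conversation group
RECEIVE queuing_order, conversation_handle, message_type_name, message_body
FROM [TargetQueue]
INTO @Messages;

DECLARE msg_cursor CURSOR LOCAL FAST_FORWARD FOR
    SELECT conversation_handle, message_type_name, message_body
    FROM @Messages
    ORDER BY queuing_order;

OPEN msg_cursor;
FETCH NEXT FROM msg_cursor INTO @handle, @type, @body;

WHILE @@FETCH_STATUS = 0
BEGIN
    IF @type = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
        END CONVERSATION @handle;   -- the other side has finished, close our end
    ELSE
        PRINT CONCAT(N'Processing message of type ', @type);  -- placeholder for real work

    FETCH NEXT FROM msg_cursor INTO @handle, @type, @body;
END

CLOSE msg_cursor;
DEALLOCATE msg_cursor;

COMMIT TRANSACTION;
```

Because the RECEIVE and the processing share one transaction, a rollback puts the messages back in the queue.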

Conclusion

Service Broker offers a robust and powerful messaging framework within the SQL Server ecosystem. It provides a reliable and scalable solution for asynchronous message-based communication between database components and instances. Throughout this article, we have explored the key concepts and components of SQL Service Broker, including messages, contracts, queues, services, and dialogs. We have seen how these elements work together to facilitate secure and efficient communication, enabling the exchange of information asynchronously. The built-in reliability and fault-tolerance mechanisms ensure message delivery and processing even in the presence of network interruptions or system failures.

In the future, I intend to broaden my knowledge and leverage the Service Broker to establish communication between two SQL instances, each deployed on a separate server. This communication between instances will use encryption and a certificate for secure data transfer.

The SQL source code can be found on my GitHub.

Cover photo by Ryoji Iwata
