Writing Service Broker Procedures

October 16th, 2006

It’s been a while since I posted an entry on this blog, and this article is long overdue. A series of events kept me from posting, not the least of which being the fact that I’ve opened a WoW account…

T-SQL RECEIVE. Fast.

A question often asked is how to write a typical activated procedure. This article covers the ways to write a performant T-SQL procedure to process messages. I am not going to cover CLR procedures for now.

Basic Receive

The typical example creates a WHILE loop, RECEIVEs the TOP (1) next message from the queue, and processes it, each message in an individual transaction:

CREATE PROCEDURE [BasicReceive]
AS
BEGIN
    SET NOCOUNT ON;
    DECLARE @h UNIQUEIDENTIFIER;
    DECLARE @messageTypeName SYSNAME;
    DECLARE @payload VARBINARY(MAX);

    WHILE (1=1)
    BEGIN
        BEGIN TRANSACTION;
        WAITFOR(RECEIVE TOP(1)
                @h = conversation_handle,
                @messageTypeName = message_type_name,
                @payload = message_body
            FROM [Target]), TIMEOUT 1000;
        IF (@@ROWCOUNT = 0)
        BEGIN
            COMMIT;
            BREAK;
        END
        -- Some basic processing. Send back an echo reply
        IF N'DEFAULT' = @messageTypeName
        BEGIN
            SEND ON CONVERSATION @h (@payload);
        END
        ELSE IF N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' = @messageTypeName
        BEGIN
            END CONVERSATION @h;
        END
        ELSE IF N'http://schemas.microsoft.com/SQL/ServiceBroker/Error' = @messageTypeName
        BEGIN
            -- Log the received error into ERRORLOG and the system Event Log (eventvwr.exe)
            DECLARE @h_string NVARCHAR(100);
            DECLARE @error_message NVARCHAR(4000);
            SELECT @h_string = CAST(@h AS NVARCHAR(100)),
                @error_message = CAST(@payload AS NVARCHAR(4000));
            RAISERROR (N'Conversation %s was ended with error %s',
                10, 1, @h_string, @error_message) WITH LOG;
            END CONVERSATION @h;
        END
        COMMIT;
    END
END
GO

The question on the table is: is this the fastest way to RECEIVE messages from a queue? Well, let’s measure how fast it is.

Methodology

The first thing we need is a method to measure the performance of this procedure. My proposal is simple: preload the queue with a number of messages, then run the procedure and measure how long it takes to drain the queue. I’m going to do these measurements on my home system, which is a measly single-proc P4 2.4GHz with 1 GB of RAM. The storage consists of an 80 GB WD drive and a 160 GB Maxtor IDE drive. I’ll store the MDF on the first and the LDF on the second. Of course, as with any such measurement, I’ll pre-grow the database and log files to an acceptable size to prevent file growth events from distorting the results. Next we’ll create a queue and load it with 100 conversations of 100 messages each (for a total of 10000 messages). For this, I’ll create a procedure that loads the queue:

IF EXISTS(SELECT * FROM sys.databases WHERE NAME = 'ReceivePerfBlog')
    DROP DATABASE [ReceivePerfBlog];
GO

CREATE DATABASE [ReceivePerfBlog]
    ON (NAME = ReceivePerfBlog,
        FILENAME = 'C:\Program Files\Microsoft SQL Server\MSSQL.1\MSSQL\Data\ReceivePerfBlog.MDF',
        SIZE = 4GB)
    LOG ON (NAME = ReceivePerfBlog_log,
        FILENAME = 'E:\DATA\ReceivePerfBlog.LDF',
        SIZE = 10GB);
GO

USE [ReceivePerfBlog];
GO

CREATE QUEUE [Initiator];
CREATE QUEUE [Target];
CREATE SERVICE [Initiator] ON QUEUE [Initiator];
CREATE SERVICE [Target] ON QUEUE [Target] ([DEFAULT]);
GO

-- This procedure loads the test queue with the
-- number of messages and conversations passed in
CREATE PROCEDURE LoadQueueReceivePerfBlog
    @conversationCount INT,
    @messagesPerConversation INT,
    @payload VARBINARY(MAX)
AS
BEGIN
    SET NOCOUNT ON;
    DECLARE @batchCount INT;
    SELECT @batchCount = 0;
    DECLARE @h UNIQUEIDENTIFIER;

    BEGIN TRANSACTION;
    WHILE @conversationCount > 0
    BEGIN
        BEGIN DIALOG CONVERSATION @h
            FROM SERVICE [Initiator]
            TO SERVICE N'Target', 'current database'
            WITH ENCRYPTION = OFF;
        DECLARE @messageCount INT;
        SELECT @messageCount = 0;
        WHILE @messageCount < @messagesPerConversation
        BEGIN
            SEND ON CONVERSATION @h (@payload);
            SELECT @messageCount = @messageCount + 1,
                @batchCount = @batchCount + 1;
            -- Commit every 100 sent messages
            IF @batchCount >= 100
            BEGIN
                COMMIT;
                SELECT @batchCount = 0;
                BEGIN TRANSACTION;
            END
        END
        SELECT @conversationCount = @conversationCount - 1;
    END
    COMMIT;
END
GO

Measuring the performance

The first procedure we’re going to measure is the very basic RECEIVE TOP (1) that processes a single message per transaction.

USE [ReceivePerfBlog];
GO

DECLARE @payload VARBINARY(MAX);
SELECT @payload = CAST(N'<Test/>' AS VARBINARY(MAX));
EXEC LoadQueueReceivePerfBlog 100, 100, @payload;
GO

DECLARE @msgCount FLOAT;
DECLARE @startTime DATETIME;
DECLARE @endTime DATETIME;
SELECT @msgCount = COUNT(*) FROM [Target];
SELECT @startTime = GETDATE();
EXEC [BasicReceive];
SELECT @endTime = GETDATE();
SELECT @startTime as [Start],
    @endTime as [End],
    @msgCount as [Count],
    DATEDIFF(second, @startTime, @endTime) as [Duration],
    @msgCount/DATEDIFF(millisecond, @startTime, @endTime)*1000 as [Rate];
GO

These are the reported results on my machine:

Start                   End                     Count   Duration   Rate
----------------------- ----------------------- ------- ---------- ----------------
2006-10-14 13:22:25.060 2006-10-14 13:22:57.107 10000   32         312.051426075017

(1 row(s) affected)

So the basic procedure can process about 312 msgs/sec. This in itself is not a bad result, but let’s see if we can go higher.

Batched Commits

The first thing we can look at is changing from one message per transaction to batching multiple messages into one transaction. This is the simplest and most basic optimization any database application should consider first, as the commit rate is probably the very first bottleneck any system will hit, especially on commodity hardware. So we’ll make a simple modification to our basic RECEIVE procedure: keep a counter of messages processed and commit only after, say, 100 messages have been processed. Everything else in the processing stays the same:
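As a back-of-the-envelope illustration (not a measurement — the per-message and per-commit costs below are made-up numbers), the effect of batching can be modeled as amortizing one log flush over a whole batch:

```python
# Toy model of why batching commits helps:
# total time = per-message work + one commit (log flush) per batch.
def drain_rate(messages, per_msg_ms, commit_ms, batch_size):
    """Messages drained per second for a given commit batch size."""
    commits = -(-messages // batch_size)  # ceiling division
    total_ms = messages * per_msg_ms + commits * commit_ms
    return messages / (total_ms / 1000.0)

# Hypothetical costs: 2 ms of work per message, 10 ms per log flush.
one_per_txn = drain_rate(10000, 2.0, 10.0, 1)
batched = drain_rate(10000, 2.0, 10.0, 100)
print(round(one_per_txn), round(batched))
```

With these hypothetical costs, going from 1 to 100 messages per commit moves the bottleneck from the log flush to the per-message work itself.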

IF EXISTS(SELECT * FROM sys.procedures WHERE NAME = N'BatchedReceive')
    DROP PROCEDURE [BatchedReceive];
GO

CREATE PROCEDURE [BatchedReceive]
AS
BEGIN
    SET NOCOUNT ON;
    DECLARE @h UNIQUEIDENTIFIER;
    DECLARE @messageTypeName SYSNAME;
    DECLARE @payload VARBINARY(MAX);
    DECLARE @batchCount INT;
    SELECT @batchCount = 0;

    BEGIN TRANSACTION;
    WHILE (1=1)
    BEGIN
        WAITFOR(RECEIVE TOP(1)
                @h = conversation_handle,
                @messageTypeName = message_type_name,
                @payload = message_body
            FROM [Target]), TIMEOUT 1000;
        IF (@@ROWCOUNT = 0)
        BEGIN
            BREAK;
        END
        -- Some basic processing. Send back an echo reply
        IF N'DEFAULT' = @messageTypeName
        BEGIN
            SEND ON CONVERSATION @h (@payload);
        END
        ELSE IF N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' = @messageTypeName
        BEGIN
            END CONVERSATION @h;
        END
        ELSE IF N'http://schemas.microsoft.com/SQL/ServiceBroker/Error' = @messageTypeName
        BEGIN
            -- Log the received error into ERRORLOG and the system Event Log (eventvwr.exe)
            DECLARE @h_string NVARCHAR(100);
            DECLARE @error_message NVARCHAR(4000);
            SELECT @h_string = CAST(@h AS NVARCHAR(100)),
                @error_message = CAST(@payload AS NVARCHAR(4000));
            RAISERROR (N'Conversation %s was ended with error %s',
                10, 1, @h_string, @error_message) WITH LOG;
            END CONVERSATION @h;
        END
        -- Increment the batch count and commit every 100 messages
        SELECT @batchCount = @batchCount + 1;
        IF @batchCount >= 100
        BEGIN
            COMMIT;
            SELECT @batchCount = 0;
            BEGIN TRANSACTION;
        END
    END
    COMMIT;
END
GO

I have highlighted the parts that changed in the procedure code. Let’s go ahead, load again 10000 messages in the queue and measure how fast they are drained:

ALTER DATABASE [ReceivePerfBlog] SET NEW_BROKER WITH ROLLBACK IMMEDIATE;
GO

USE [ReceivePerfBlog];
GO

DECLARE @payload VARBINARY(MAX);
SELECT @payload = CAST(N'<Test/>' AS VARBINARY(MAX));
EXEC LoadQueueReceivePerfBlog 100, 100, @payload;
GO

DECLARE @msgCount FLOAT;
DECLARE @startTime DATETIME;
DECLARE @endTime DATETIME;
SELECT @msgCount = COUNT(*) FROM [Target];
SELECT @startTime = GETDATE();
EXEC [BatchedReceive];
SELECT @endTime = GETDATE();
SELECT @startTime as [Start],
    @endTime as [End],
    @msgCount as [Count],
    DATEDIFF(second, @startTime, @endTime) as [Duration],
    @msgCount/DATEDIFF(millisecond, @startTime, @endTime)*1000 as [Rate];
GO

The results are:

Start                   End                     Count   Duration   Rate
----------------------- ----------------------- ------- ---------- ----------------
2006-10-14 13:37:36.497 2006-10-14 13:38:02.560 10000   26         383.685684687104

(1 row(s) affected)

So we increased the processing rate to 383 msgs/sec, about 22% faster than the single-message-per-transaction procedure. The result is better, but not earth-shatteringly better. This is mostly because I already took the precaution of placing the LDF file on a disk that has no activity other than this LDF file, so streaming out the log pages is quite fast.

Cursor based processing

The next step we’ll take is a departure from the basic RECEIVE procedure. Instead of using the TOP (1) clause, we’ll use a cursor and process as many messages as RECEIVE returns in one execution. Because T-SQL cursors unfortunately cannot be declared on top of the RECEIVE resultset, we’re going to use a trick: RECEIVE into a table variable, then iterate over the table variable with a cursor. The processing for each message is identical to the previous cases.

IF EXISTS(SELECT * FROM sys.procedures WHERE NAME = N'CursorReceive')
    DROP PROCEDURE [CursorReceive];
GO

CREATE PROCEDURE [CursorReceive]
AS
BEGIN
    SET NOCOUNT ON;
    DECLARE @tableMessages TABLE (
        queuing_order BIGINT,
        conversation_handle UNIQUEIDENTIFIER,
        message_type_name SYSNAME,
        message_body VARBINARY(MAX));

    -- Create a cursor over the table variable.
    -- Use the queuing_order column to
    -- preserve the message order.
    DECLARE cursorMessages
        CURSOR FORWARD_ONLY READ_ONLY
        FOR SELECT conversation_handle,
                message_type_name,
                message_body
            FROM @tableMessages
            ORDER BY queuing_order;

    DECLARE @h UNIQUEIDENTIFIER;
    DECLARE @messageTypeName SYSNAME;
    DECLARE @payload VARBINARY(MAX);

    WHILE (1=1)
    BEGIN
        BEGIN TRANSACTION;
        WAITFOR(RECEIVE
                queuing_order,
                conversation_handle,
                message_type_name,
                message_body
            FROM [Target]
            INTO @tableMessages), TIMEOUT 1000;
        IF (@@ROWCOUNT = 0)
        BEGIN
            COMMIT;
            BREAK;
        END
        OPEN cursorMessages;
        WHILE (1=1)
        BEGIN
            FETCH NEXT FROM cursorMessages
                INTO @h, @messageTypeName, @payload;
            IF (@@FETCH_STATUS != 0)
                BREAK;
            -- Some basic processing. Send back an echo reply
            IF N'DEFAULT' = @messageTypeName
            BEGIN
                SEND ON CONVERSATION @h (@payload);
            END
            ELSE IF N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' = @messageTypeName
            BEGIN
                END CONVERSATION @h;
            END
            ELSE IF N'http://schemas.microsoft.com/SQL/ServiceBroker/Error' = @messageTypeName
            BEGIN
                -- Log the received error into ERRORLOG and the system Event Log (eventvwr.exe)
                DECLARE @h_string NVARCHAR(100);
                DECLARE @error_message NVARCHAR(4000);
                SELECT @h_string = CAST(@h AS NVARCHAR(100)),
                    @error_message = CAST(@payload AS NVARCHAR(4000));
                RAISERROR (N'Conversation %s was ended with error %s',
                    10, 1, @h_string, @error_message) WITH LOG;
                END CONVERSATION @h;
            END
        END
        CLOSE cursorMessages;
        DELETE FROM @tableMessages;
        COMMIT;
    END
    DEALLOCATE cursorMessages;
END
GO

Again, I highlighted the significant changes on the procedure.

Let’s run this and see the results:

ALTER DATABASE [ReceivePerfBlog] SET NEW_BROKER WITH ROLLBACK IMMEDIATE;
GO

USE [ReceivePerfBlog];
GO

DECLARE @payload VARBINARY(MAX);
SELECT @payload = CAST(N'<Test/>' AS VARBINARY(MAX));
EXEC LoadQueueReceivePerfBlog 100, 100, @payload;
GO

DECLARE @msgCount FLOAT;
DECLARE @startTime DATETIME;
DECLARE @endTime DATETIME;
SELECT @msgCount = COUNT(*) FROM [Target];
SELECT @startTime = GETDATE();
EXEC [CursorReceive];
SELECT @endTime = GETDATE();
SELECT @startTime as [Start],
    @endTime as [End],
    @msgCount as [Count],
    DATEDIFF(second, @startTime, @endTime) as [Duration],
    @msgCount/DATEDIFF(millisecond, @startTime, @endTime)*1000 as [Rate];
GO

Start                   End                     Count   Duration   Rate
----------------------- ----------------------- ------- ---------- ----------------
2006-10-14 14:08:01.623 2006-10-14 14:08:07.590 10000   6          1676.16493462957

(1 row(s) affected)

Now we’re talking! This kind of processing improved our throughput to 1676 msgs/sec, a 437% improvement. BTW, if you’re asking yourself what happened to the batched commit, it’s still there: because the procedure processes one entire RECEIVE resultset per transaction, the batched commit is inherent.

With this procedure we have pretty much pushed the performance of message-by-message processing as far as it goes. The procedure can be tweaked further, but that will not give significant performance boosts. For instance, we can replace the message type name with the message type id. This is a faster comparison (int vs. string) and also prevents RECEIVE from joining the sys.service_message_types view into its plan.

IF EXISTS(SELECT * FROM sys.procedures WHERE NAME = N'CursorMessageTypeReceive')
    DROP PROCEDURE [CursorMessageTypeReceive];
GO

CREATE PROCEDURE [CursorMessageTypeReceive]
AS
BEGIN
    SET NOCOUNT ON;
    DECLARE @tableMessages TABLE (
        queuing_order BIGINT,
        conversation_handle UNIQUEIDENTIFIER,
        message_type INT,
        message_body VARBINARY(MAX));

    -- Create a cursor over the table variable.
    -- Use the queuing_order column to
    -- preserve the message order.
    DECLARE cursorMessages
        CURSOR FORWARD_ONLY READ_ONLY
        FOR SELECT conversation_handle,
                message_type,
                message_body
            FROM @tableMessages
            ORDER BY queuing_order;

    DECLARE @h UNIQUEIDENTIFIER;
    DECLARE @messageType INT;
    DECLARE @payload VARBINARY(MAX);

    WHILE (1=1)
    BEGIN
        BEGIN TRANSACTION;
        WAITFOR(RECEIVE
                queuing_order,
                conversation_handle,
                message_type_id,
                message_body
            FROM [Target]
            INTO @tableMessages), TIMEOUT 1000;
        IF (@@ROWCOUNT = 0)
        BEGIN
            COMMIT;
            BREAK;
        END
        OPEN cursorMessages;
        WHILE (1=1)
        BEGIN
            FETCH NEXT FROM cursorMessages
                INTO @h, @messageType, @payload;
            IF (@@FETCH_STATUS != 0)
                BREAK;
            -- Some basic processing. Send back an echo reply
            -- (message_type_id 14 is the [DEFAULT] message type)
            IF 14 = @messageType
            BEGIN
                SEND ON CONVERSATION @h (@payload);
            END
            -- (message_type_id 2 is EndDialog)
            ELSE IF 2 = @messageType
            BEGIN
                END CONVERSATION @h;
            END
            -- (message_type_id 1 is Error)
            ELSE IF 1 = @messageType
            BEGIN
                -- Log the received error into ERRORLOG and the system Event Log (eventvwr.exe)
                DECLARE @h_string NVARCHAR(100);
                DECLARE @error_message NVARCHAR(4000);
                SELECT @h_string = CAST(@h AS NVARCHAR(100)),
                    @error_message = CAST(@payload AS NVARCHAR(4000));
                RAISERROR (N'Conversation %s was ended with error %s',
                    10, 1, @h_string, @error_message) WITH LOG;
                END CONVERSATION @h;
            END
        END
        CLOSE cursorMessages;
        DELETE FROM @tableMessages;
        COMMIT;
    END
    DEALLOCATE cursorMessages;
END
GO

With these improvements, the results are:

Start                   End                     Count   Duration   Rate
----------------------- ----------------------- ------- ---------- ----------------
2006-10-14 14:04:53.967 2006-10-14 14:04:59.763 10000   6          1725.32781228433

(1 row(s) affected)

This gives only a 3% improvement, and it makes the procedure more difficult to maintain and debug because it uses message type IDs instead of names. I’m not convinced the benefits justify the potential problems.

Set based Processing

The next step we can take is not a free one. We’re going to move away from message-by-message processing and see how we can do set-based processing. One cautionary note is required here: not all messaging applications will be able to do set-based processing of incoming messages. But certain messaging patterns are particularly well suited for this kind of processing. Whenever one conversation side has to send long streams of messages without a response from the other side, this kind of processing can usually be applied. A good example of such a pattern is audit: front-end machines have to record user actions for audit purposes. Rather than connecting to a central database and inserting the audit record directly, they start a one-directional conversation on which they send the audit data as messages. The back-end processing of a message is straightforward: extract the audit data from the message payload and insert it into the audit tables. This can be done as a set-based operation: the entire RECEIVE resultset can be inserted as a single set into the audit table.

XML payload

So let’s create a dummy audit-like back end: the messages consist of an XML payload containing the user name, the date, and some arbitrary audit payload. The back end has to store these records in a table, shredding the XML into relational columns first. As any well-planned application that does this should, it also stores the original XML received.

ALTER DATABASE [ReceivePerfBlog] SET NEW_BROKER WITH ROLLBACK IMMEDIATE;
GO

USE [ReceivePerfBlog];
GO

IF EXISTS(SELECT * FROM sys.tables WHERE NAME = N'PayloadData')
    DROP TABLE [PayloadData];
GO

CREATE TABLE [PayloadData] (
    [Id] INT NOT NULL IDENTITY,
    [DateTime] DATETIME,
    [Payload] NVARCHAR(MAX),
    [User] NVARCHAR(256),
    [Original] XML);
GO

IF EXISTS(SELECT * FROM sys.procedures WHERE NAME = N'RowsetDatagram')
    DROP PROCEDURE [RowsetDatagram];
GO

CREATE PROCEDURE [RowsetDatagram]
AS
BEGIN
    SET NOCOUNT ON;
    DECLARE @tableMessages TABLE (
        queuing_order BIGINT,
        conversation_handle UNIQUEIDENTIFIER,
        message_type_name SYSNAME,
        payload XML); -- ([DatagramSchemaCollection])

    WHILE (1=1)
    BEGIN
        BEGIN TRANSACTION;
        WAITFOR(RECEIVE
                queuing_order,
                conversation_handle,
                message_type_name,
                CAST(message_body AS XML) AS payload
            FROM [Target]
            INTO @tableMessages), TIMEOUT 1000;
        IF (@@ROWCOUNT = 0)
        BEGIN
            COMMIT;
            BREAK;
        END
        -- Rowset based datagram processing:
        -- extract the XML attributes and insert into the table
        ;WITH XMLNAMESPACES (DEFAULT 'http://tempuri.org/RemusRusanu/Blog/10/14/2006/Datagram')
        INSERT INTO [PayloadData]
            ([DateTime], [Payload], [User], [Original])
        SELECT payload.value(N'(/Datagram/@date-time)[1]', 'DATETIME'),
            payload.value(N'(/Datagram/@payload)[1]', 'NVARCHAR(MAX)'),
            payload.value(N'(/Datagram/@user)[1]', 'NVARCHAR(256)'),
            payload
        FROM @tableMessages
        WHERE message_type_name = N'DEFAULT'
        ORDER BY queuing_order;
        COMMIT;
        DELETE FROM @tableMessages;
    END
END
GO

To test this procedure, we’re gonna have to preload the queue with valid XML payloads:

DECLARE @xmlPayload XML;
DECLARE @payload VARBINARY(MAX);
WITH XMLNAMESPACES (DEFAULT 'http://tempuri.org/RemusRusanu/Blog/10/14/2006/Datagram')
SELECT @xmlPayload = (SELECT GETDATE() AS [@date-time],
        SUSER_SNAME() AS [@user],
        N'Some Data' AS [@payload]
    FOR XML PATH(N'Datagram'), TYPE);
SELECT @payload = CAST(@xmlPayload AS VARBINARY(MAX));
EXEC LoadQueueReceivePerfBlog 100, 100, @payload;
GO

Here are the results for this procedure:

Start                   End                     Count   Duration   Rate
----------------------- ----------------------- ------- ---------- ----------------
2006-10-14 14:55:23.827 2006-10-14 14:55:28.733 10000   5          2038.32042397065

(1 row(s) affected)

We managed to push the rate above 2000 msgs/sec! Not bad for commodity hardware like my home system.

Binary Payload

We all love XML, but at the end of the day XML is text, all too fond of angle brackets. How much could we improve performance by moving to a binary payload for these audit records? Now, doing binary marshaling and unmarshaling in T-SQL is not for the faint of heart, but I’ve seen braver things. The first thing we need is two functions: one that marshals the audit data into a binary blob, and one that unmarshals the original data back out of the blob:

CREATE FUNCTION [BinaryMarhsalPayload] (
    @dateTime DATETIME,
    @payload VARBINARY(MAX),
    @user NVARCHAR(256))
RETURNS VARBINARY(MAX)
AS
BEGIN
    DECLARE @marshaledPayload VARBINARY(MAX);
    DECLARE @payloadLength BIGINT;
    DECLARE @userLength INT;
    -- DATALENGTH counts all bytes, including trailing blanks that LEN would drop
    SELECT @payloadLength = DATALENGTH(@payload),
        @userLength = DATALENGTH(@user); -- 2 bytes per character (wchar_t)
    SELECT @marshaledPayload = CAST(@dateTime AS VARBINARY(MAX)) +
        CAST(@payloadLength AS VARBINARY(MAX)) + @payload +
        CAST(@userLength AS VARBINARY(MAX)) + CAST(@user AS VARBINARY(MAX));
    RETURN (@marshaledPayload);
END
GO

IF EXISTS (SELECT * FROM sys.objects WHERE NAME = N'BinaryUnmarhsalPayload')
    DROP FUNCTION [BinaryUnmarhsalPayload];
GO

CREATE FUNCTION [BinaryUnmarhsalPayload] (
    @message_body VARBINARY(MAX))
RETURNS @unmarshaledBody TABLE (
    [DateTime] DATETIME,
    [Payload] VARBINARY(MAX),
    [User] NVARCHAR(256))
AS
BEGIN
    DECLARE @dateTime DATETIME;
    DECLARE @user NVARCHAR(MAX);
    DECLARE @payload VARBINARY(MAX);
    DECLARE @payloadLength BIGINT;
    DECLARE @userLength INT;
    SELECT @dateTime = CAST(SUBSTRING(@message_body, 1, 8) AS DATETIME);
    SELECT @payloadLength = CAST(SUBSTRING(@message_body, 9, 8) AS BIGINT);
    SELECT @payload = SUBSTRING(@message_body, 17, @payloadLength);
    SELECT @userLength = CAST(SUBSTRING(@message_body, @payloadLength + 17, 4) AS INT);
    SELECT @user = CAST(SUBSTRING(@message_body, @payloadLength + 21, @userLength) AS NVARCHAR(256));
    INSERT INTO @unmarshaledBody
    VALUES (@dateTime, @payload, @user);
    RETURN;
END
GO
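For readers who want to sanity-check the byte layout outside T-SQL, here is a small Python sketch of the same format (my own illustration, not part of the original scripts). It assumes SQL Server’s big-endian byte images for BIGINT/INT casts and UTF-16LE bytes for NVARCHAR, and treats the 8-byte datetime image as opaque:

```python
import struct

def marshal(date_bytes: bytes, payload: bytes, user: str) -> bytes:
    """Mirror of [BinaryMarhsalPayload]: 8-byte datetime image,
    8-byte big-endian payload length, payload bytes,
    4-byte big-endian user length, user as UTF-16LE."""
    user_bytes = user.encode("utf-16-le")
    return (date_bytes
            + struct.pack(">q", len(payload)) + payload
            + struct.pack(">i", len(user_bytes)) + user_bytes)

def unmarshal(blob: bytes):
    """Mirror of [BinaryUnmarhsalPayload]: walk the same offsets."""
    date_bytes = blob[0:8]
    (payload_len,) = struct.unpack(">q", blob[8:16])
    payload = blob[16:16 + payload_len]
    (user_len,) = struct.unpack(">i", blob[16 + payload_len:20 + payload_len])
    user = blob[20 + payload_len:20 + payload_len + user_len].decode("utf-16-le")
    return date_bytes, payload, user

blob = marshal(b"\x00" * 8, b"Some Data", "sa")
assert unmarshal(blob) == (b"\x00" * 8, b"Some Data", "sa")
```

The 0-based slices here correspond to the 1-based SUBSTRING positions in the T-SQL (1, 9, 17, @payloadLength + 17, @payloadLength + 21).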

We can now create our test procedure:

IF EXISTS(SELECT * FROM sys.tables WHERE NAME = N'PayloadData')
    DROP TABLE [PayloadData];
GO

CREATE TABLE [PayloadData] (
    [Id] INT NOT NULL IDENTITY,
    [DateTime] DATETIME,
    [Payload] NVARCHAR(MAX),
    [User] NVARCHAR(256));
GO

IF EXISTS(SELECT * FROM sys.procedures WHERE NAME = N'RowsetBinaryDatagram')
    DROP PROCEDURE [RowsetBinaryDatagram];
GO

CREATE PROCEDURE [RowsetBinaryDatagram]
AS
BEGIN
    SET NOCOUNT ON;
    DECLARE @tableMessages TABLE (
        queuing_order BIGINT,
        conversation_handle UNIQUEIDENTIFIER,
        message_type_name SYSNAME,
        message_body VARBINARY(MAX));

    WHILE (1=1)
    BEGIN
        BEGIN TRANSACTION;
        WAITFOR(RECEIVE
                queuing_order,
                conversation_handle,
                message_type_name,
                message_body
            FROM [Target]
            INTO @tableMessages), TIMEOUT 1000;
        IF (@@ROWCOUNT = 0)
        BEGIN
            COMMIT;
            BREAK;
        END
        -- Rowset based datagram processing:
        -- unmarshal the binary payload into the table
        INSERT INTO [PayloadData]
            ([DateTime], [Payload], [User])
        SELECT [DateTime], [Payload], [User]
        FROM @tableMessages
        CROSS APPLY dbo.[BinaryUnmarhsalPayload](message_body)
        WHERE message_type_name = N'DEFAULT';
        COMMIT;
        DELETE FROM @tableMessages;
    END
END
GO

Note the use of CROSS APPLY to project the function output as columns in the SELECT. Here are the result numbers:

Start                   End                     Count   Duration   Rate
----------------------- ----------------------- ------- ---------- ----------------
2006-10-14 15:09:16.013 2006-10-14 15:09:19.700 10000   3          2712.96798697775

(1 row(s) affected)

Right, that is 2712 msgs/sec. This is as far as my knowledge can get you. I don’t know of any faster way to process messages. But I’m sure I’ll soon be proven wrong by some intrepid reader 🙂 Are you willing to pick up the challenge?

Activation

I have measured the performance of each procedure by invoking it directly. How would things differ if activation were involved? Not much. From a performance point of view, invoking a procedure from the activation context is no different than invoking it from a user session. But there are other things to consider:

  • Multiple CPU machines. Back-end machines are often multi-proc machines, usually in the 4-way and 8-way range. The more CPUs, the better, if you can pay the bill 🙂 When hosting activation on such machines, you’d usually configure the queue with max_queue_readers matching the number of CPUs. The performance results we’ve seen today would scale nearly linearly with each new instance of the procedure added, provided one thing: there are enough distinct conversation groups in the queue to feed all the procedures. Each procedure locks a conversation group to produce its RECEIVE resultset. Even if there are a million messages in the queue, if they are all on one conversation, a second instance of the activated procedure simply would not have access to them!
  • Activation warm-up time. If you’re measuring activated procedure performance, consider that new procedure instances are launched at most once every 5 seconds. So on an 8-way system with max_queue_readers = 8, it would take 35 seconds just to launch all 8 procedures! Take this into account when measuring performance involving activation.
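The warm-up arithmetic in the last bullet can be sketched as (a trivial model, assuming the first reader starts immediately and one more is launched every 5 seconds):

```python
# Activation launches at most one new queue reader every 5 seconds,
# so the Nth reader starts roughly (N - 1) * 5 seconds after the first.
def warmup_seconds(max_queue_readers, launch_interval=5):
    """Seconds until all queue readers have been launched."""
    return (max_queue_readers - 1) * launch_interval

print(warmup_seconds(8))  # → 35
```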

Conclusions

We showed how to achieve processing rates of more than 2500 msgs/sec on a really low-end machine. The actual results you get in your application will, of course, vary.

While the tested situation is idealized (draining a preloaded queue), similar situations often arise in real production systems (e.g. after a spike of incoming messages, or after the processing service was stopped for maintenance). Also, the best results we’ve seen are achievable only with specific message exchange patterns.

The cursor-based processing pays off only when the cost of setting up the cursor is largely outweighed by the savings of processing an entire resultset of tens or hundreds of messages instead of a single message (TOP 1) per RECEIVE. If the message exchange pattern is request-reply, then a RECEIVE can never return more than one message (the request) in a resultset. In a request-reply case, the basic batched RECEIVE procedure might be the best performing one.

7 responses to “Writing Service Broker Procedures”

  1. Rachit says:

    Hey Remus,
    I was reading this article about the SSB performance (Writing SB Procedures). I’ve a question, what if the same benchmark we run on the remote server? What do you think the performance of SSB will be?

    Here’s my scenario: HttpHandler –> Create Message (avg could be upto 1000 or 1500/sec) –> Local SQL instance SSB –> Unqueue/shove them to Central 8-way SQL server tables

    Any suggestion what to keep in mind?

  2. remus says:

    Hello Rachit,

    Can you use the contact form to start talking by mail rather than the blog comments? I think I have a pretty good idea about your scenario, I’ve seen this deployed. But the discussion is more complex, and the comment communication channel is neither very convenient nor private enough.

  3. Kevin Jeyakanthan says:

    Hi Remus,

    Very interesting and useful article. I however have a question. When you do set based processing, you don’t seem to end the conversations. I thought Service Broker at the moment does not support one directional conversations. If you do want to do a fire and forget kind of scenario you would have to have an activated procedure on the send queue as well to complete the conversation. Am I missing something here?

    Kevin

  4. remus says:

    Hello Kevin,
    You are absolutely right, a proper procedure would have to inspect the RECEIVE resultset message types and properly process EndDialog and Error message types. As usually with these posts, they are provided for guidance only and not as production-ready code 🙂

  5. remus says:

    The missing BinaryMarshal procedure:

    CREATE FUNCTION [BinaryMarhsalPayload] (
    @dateTime DATETIME,
    @payload VARBINARY(MAX),
    @user NVARCHAR(256))
    RETURNS VARBINARY(MAX)
    AS
    BEGIN
    DECLARE @marshaledPayload VARBINARY(MAX);
    DECLARE @payloadLength BIGINT;
    DECLARE @userLength INT;
    SELECT @payloadLength = LEN(@payload), @userLength = LEN(@user)*2; -- wchar_t
    SELECT @marshaledPayload = CAST(@dateTime AS VARBINARY(MAX)) +
    CAST(@payloadLength AS VARBINARY(MAX)) + @payload +
    CAST(@userLength AS VARBINARY(MAX)) + CAST(@user AS VARBINARY(MAX));
    RETURN (@marshaledPayload);
    END
    GO

  6. Lmu92 says:

    Hi Remus,

    I’m struggling with the following scenario:
    A table in a “conversation database” has an insert trigger that will fire a service broker message to start a procedure on a different server that will process the new data. To avoid starting the procedure multiple times it gets only activated if currently not running.
    Everything works fine except for one thing:
    Since the insert comes from an ORACLE system, it is not performed as a batch but rather as a single line transaction (AFAIK it’s an issue caused by Microsoft data connector between SQL Server and Oracle).
    The effect:
    Each new line will fire a service broker message but the receiving procedure is batch oriented.
    Is there any way to avoid such behavior except for changing the procedure from batch oriented to single line processing?
    I tried Receive(10), WAITFOR clause and some other stuff but I still get messages for each inserted row.

    Regards
    Lutz

  7. […] procedure using the fastest set oriented message processing, similar to what I recommend in Writing Service Broker Procedures. After trying to speed up the IO system, moving the drives to fastest LUNs available in the […]