Processing conversation with priority order
March 28th, 2006
One of the most frequent questions about Service Broker is whether it supports any sort of priority for messages. But having priority within a conversation would conflict directly with the exactly-once-in-order guarantee of a conversation. In a good SOA design the two services involved in a conversation are supposed to be independent: separate developers, separate orgs, separate admins etc. If one service could set the priority of an individual message without the consent of the other, it could wreak havoc on how the messages are processed, since the other service may expect the messages in order. Priority for individual conversations is a different story. Conversations are supposed to be independent and atomic, and the order in which they are processed should not matter, so being able to set a priority for a conversation makes sense. Roger has recently addressed the same problem and he already has a couple of posts on the topic: http://blogs.msdn.com/rogerwolterblog/archive/2006/03/11/549730.aspx and http://blogs.msdn.com/rogerwolterblog/archive/2006/03/17/554134.aspx
My idea of how to implement priority is different from Roger’s. My solution applies to the case where messages take a long time to process, so the service queue accumulates a long list of pending messages during peak hours and (slowly?) makes progress through the queue, draining it at off hours. Perhaps the admin of the service would like a way to promote certain conversations to be processed sooner, or the requesting service might want to send a message asking for a higher priority on a given conversation.
Promote pending conversation to the front of the queue
In principle the answer is very simple: the RECEIVE verb has a WHERE clause that can be used to specify the desired conversation. So what you need is a way to pass the conversation IDs and the desired priority out of band (out of band relative to the message queue and the processing service). The out-of-band mechanism can vary broadly:
– if the processing service is an application running on a computer in front of a human operator, the user interface can show a list of pending conversations and the operator can drag and drop list items to establish the desired priority
– the processing service can run a priority algorithm each time to decide which is the most urgent conversation to process next (the solution in Roger’s blog)
– the processing service looks up a priority table to retrieve the next conversation to be processed
BTW, in practice you will quickly notice that you want to control the priority of conversation groups not of individual conversations. This is because the RECEIVE verb locks an entire group and naturally returns all messages for an entire group in one result set. In the case when you don’t use related conversations, then groups map one to one to individual conversations and it makes no difference. If you do use related conversations, I really cannot see a situation when one priority is desired for a conversation in the group and another priority for another conversation in the same group.
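For the operator-driven case, the list of pending work can be read straight from the queue, since a queue can be queried like a read-only view. The following is a minimal sketch, assuming a queue named [BackEndQueue] like the one created in the sample later in this post; the column aliases are my own. It groups by conversation_group_id because, as noted above, the group is the unit that RECEIVE locks.

```sql
-- Peek at the pending conversation groups without dequeuing anything.
-- NOLOCK lets the query read past rows locked by active queue readers,
-- so the counts are only an approximation of the current state.
SELECT
    conversation_group_id,
    COUNT(*)           AS pending_messages,
    MIN(queuing_order) AS first_position   -- position of the oldest message in the group
FROM [BackEndQueue] WITH (NOLOCK)
GROUP BY conversation_group_id
ORDER BY first_position;
```

The result is only a snapshot: by the time an operator picks a group from it, a queue reader may already have processed that group.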
Technically, the solution to look up a priority table is fairly easy to implement. Simply change the typical WAITFOR(RECEIVE…) loop into a loop that first checks the priority table:
BEGIN TRANSACTION
SELECT TOP 1 @cg = conversation_group
FROM priority_table
ORDER BY [priority] ;
IF @cg IS NULL
WAITFOR (GET CONVERSATION GROUP @cg FROM [queue]);
WHILE @cg IS NOT NULL
BEGIN
RECEIVE … FROM [queue] WHERE conversation_group_id = @cg;
-- process messages
COMMIT;
BEGIN TRANSACTION;
-- reset @cg: a SELECT that returns no rows leaves the variable unchanged
SELECT @cg = NULL;
SELECT TOP 1 @cg = conversation_group
FROM priority_table
ORDER BY [priority];
IF @cg IS NULL
WAITFOR (GET CONVERSATION GROUP @cg FROM [queue]);
END
COMMIT;
The exact semantics of how the priority_table is processed depend on the service semantics. I believe the most rational approach is to remove the conversation_group retrieved from this table after it is processed, like this (the only change is the DELETE after the messages are processed):
BEGIN TRANSACTION
SELECT TOP 1 @cg = conversation_group
FROM priority_table
ORDER BY [priority] ;
IF @cg IS NULL
WAITFOR (GET CONVERSATION GROUP @cg FROM [queue]);
WHILE @cg IS NOT NULL
BEGIN
RECEIVE … FROM [queue] WHERE conversation_group_id = @cg;
-- process messages
-- always delete the currently processed @cg from the priority table
DELETE FROM priority_table
WHERE conversation_group = @cg;
COMMIT;
BEGIN TRANSACTION;
-- reset @cg: a SELECT that returns no rows leaves the variable unchanged
SELECT @cg = NULL;
SELECT TOP 1 @cg = conversation_group
FROM priority_table
ORDER BY [priority];
IF @cg IS NULL
WAITFOR (GET CONVERSATION GROUP @cg FROM [queue]);
END
COMMIT;
The inserts into the priority_table can be managed by an external admin tool or by the service itself (see below).
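As an illustration of the external admin tool case, an administrator could promote a pending conversation group with a short script. This is only a sketch under assumptions: it relies on the [BackEndQueue] queue and the enqueue_priority procedure defined in the sample later in this post, and both the choice of the most recently queued group and the priority value 200 are arbitrary.

```sql
-- Hypothetical admin script: promote the most recently queued conversation group.
DECLARE @cg UNIQUEIDENTIFIER;

-- Pick a pending group; NOLOCK reads past rows locked by active queue readers.
SELECT TOP (1) @cg = conversation_group_id
FROM [BackEndQueue] WITH (NOLOCK)
ORDER BY queuing_order DESC;

IF @cg IS NOT NULL
BEGIN
    -- 200 is an arbitrary value above the sample's default priority of 100
    EXEC enqueue_priority @cg, 200;
END
```

If the chosen group has already been processed by the time the row is inserted, the back end will later dequeue a group that has no messages; in the sample's loop the RECEIVE then simply returns nothing.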
Offering priority processing on the service interface itself
How about the case when the priority can be requested by the remote side of a conversation? Ideally a message could be sent on an existing conversation, and as a result the pending work for that conversation would be bumped to the top of the work queue. The idea is fairly simple: a new message type (say [SetPriority]) is added to the service contract, and the service itself processes [SetPriority] by inserting the conversation group into the priority_table (the new part is the IF/ELSE block after the RECEIVE):
BEGIN TRANSACTION
SELECT TOP 1 @cg = conversation_group
FROM priority_table
ORDER BY [priority] ;
IF @cg IS NULL
WAITFOR (GET CONVERSATION GROUP @cg FROM [queue]);
WHILE @cg IS NOT NULL
BEGIN
RECEIVE @message_type_name = message_type_name,
… FROM [queue] WHERE conversation_group_id = @cg;
IF @message_type_name = N'SetPriority'
BEGIN
INSERT INTO priority_table (conversation_group, priority)
VALUES (@cg, @default_priority);
END
ELSE
BEGIN
-- process other messages
-- always delete the currently processed @cg from the priority table
DELETE FROM priority_table
WHERE conversation_group = @cg;
END
COMMIT;
BEGIN TRANSACTION;
-- reset @cg: a SELECT that returns no rows leaves the variable unchanged
SELECT @cg = NULL;
SELECT TOP 1 @cg = conversation_group
FROM priority_table
ORDER BY [priority];
IF @cg IS NULL
WAITFOR (GET CONVERSATION GROUP @cg FROM [queue]);
END
COMMIT;
But there is one problem with this simple approach: the service will not get to process this message, simply because the message will be at the bottom of the service queue! We seem to have a Catch-22 here. My solution is to split the service into two services: a front end service, with which the peers open conversations, and an internal back end service that does the real, long-running processing. Because the front end only forwards messages, its queue stays short and a [SetPriority] message is picked up quickly.
Now here is the actual code for the front end and back end services.
-- Create the database for the sample
create database sample_priority_queuing
go
use sample_priority_queuing;
go
-----------------------------------------------------------------------
-- Create the message types for the sample
-----------------------------------------------------------------------
CREATE MESSAGE TYPE [LongWorkloadRequest] VALIDATION = WELL_FORMED_XML;
CREATE MESSAGE TYPE [SetPriority] VALIDATION = WELL_FORMED_XML;
CREATE MESSAGE TYPE [Response] VALIDATION = WELL_FORMED_XML;
GO
-- This is the contract used by service clients
CREATE CONTRACT [RequestWithPriority] (
[LongWorkloadRequest] SENT BY INITIATOR,
[SetPriority] SENT BY INITIATOR,
[Response] SENT BY TARGET);
GO
-- This is a stripped down version of the contract,
-- used only by the internal back end service
CREATE CONTRACT [RequestInternal] (
[LongWorkloadRequest] SENT BY INITIATOR,
[Response] SENT BY TARGET);
GO
CREATE QUEUE FrontEndQueue;
CREATE QUEUE BackEndQueue;
GO
-- the front end service
CREATE SERVICE [ms.com/Samples/Broker/PriorityService]
ON QUEUE [FrontEndQueue]
([RequestWithPriority]);
-- the back end service
CREATE SERVICE [BackEndService]
ON QUEUE [BackEndQueue]
([RequestInternal]);
GO
-- this table links incoming requests to the front end
-- to the back end service
CREATE TABLE requests_bindings (
front_end_conversation UNIQUEIDENTIFIER PRIMARY KEY,
back_end_conversation UNIQUEIDENTIFIER UNIQUE);
GO
---------------------------------------------------------
-- Procedure for retrieving the peer conversation from
-- the bindings table. Will retrieve front_end_conversation
-- from back_end_conversation and vice-versa
---------------------------------------------------------
CREATE PROCEDURE binding_get_peer (
@conversation UNIQUEIDENTIFIER,
@peer UNIQUEIDENTIFIER OUTPUT)
AS
SET NOCOUNT ON;
SELECT @peer = (
SELECT
back_end_conversation
FROM requests_bindings
WHERE front_end_conversation = @conversation
UNION ALL
SELECT front_end_conversation
FROM requests_bindings
WHERE back_end_conversation = @conversation)
IF 0 = @@ROWCOUNT
BEGIN
SELECT @peer = NULL;
END
GO
---------------------------------------------------------
-- Procedure for retrieving a back_end_conversation for
-- a front end conversation. Will initiate a new conversation
-- if one does not already exist
---------------------------------------------------------
CREATE PROCEDURE binding_get_back_end (
@front_end_conversation UNIQUEIDENTIFIER,
@back_end_conversation UNIQUEIDENTIFIER OUTPUT)
AS
SET NOCOUNT ON;
BEGIN TRANSACTION;
SELECT @back_end_conversation = back_end_conversation
FROM requests_bindings
WHERE front_end_conversation = @front_end_conversation;
IF 0 = @@ROWCOUNT
BEGIN
BEGIN DIALOG CONVERSATION @back_end_conversation
FROM SERVICE [ms.com/Samples/Broker/PriorityService]
TO SERVICE N'BackEndService', N'current database'
ON CONTRACT [RequestInternal]
WITH
RELATED_CONVERSATION = @front_end_conversation,
ENCRYPTION = OFF;
INSERT INTO requests_bindings
(front_end_conversation, back_end_conversation)
VALUES (@front_end_conversation, @back_end_conversation);
END
COMMIT;
GO
-- this is the priority table for the back-end service
-- the primary key constraint gives the dequeue order
-- priority is descending (255 is max, 0 is min, 100 is default)
CREATE TABLE priority_table (
conversation_group UNIQUEIDENTIFIER UNIQUE,
priority TINYINT,
enqueue_time TIMESTAMP,
PRIMARY KEY CLUSTERED (priority DESC, enqueue_time ASC, conversation_group));
GO
-- This is the stored proc for dequeuing the next
-- conversation from the priority queue
-- will retrieve the next available (unlocked) conversation group
CREATE PROCEDURE dequeue_priority (
@conversation_group UNIQUEIDENTIFIER OUTPUT)
AS
SET NOCOUNT ON;
BEGIN TRANSACTION;
SELECT @conversation_group = NULL;
DECLARE @cgt TABLE (conversation_group UNIQUEIDENTIFIER);
DELETE FROM priority_table WITH (READPAST)
OUTPUT DELETED.conversation_group INTO @cgt
WHERE conversation_group = (
SELECT TOP (1) conversation_group
FROM priority_table WITH (READPAST)
ORDER BY priority DESC, enqueue_time ASC)
SELECT @conversation_group = conversation_group
FROM @cgt;
COMMIT;
GO
-- This is the stored proc for enqueuing a
-- conversation into the priority queue
CREATE PROCEDURE enqueue_priority (
@conversation_group UNIQUEIDENTIFIER,
@priority TINYINT)
AS
SET NOCOUNT ON;
BEGIN TRANSACTION;
DELETE FROM priority_table
WHERE conversation_group = @conversation_group;
INSERT INTO priority_table
(conversation_group, priority)
VALUES (@conversation_group, @priority);
COMMIT;
GO
-- the back end service procedure
CREATE PROCEDURE back_end_service
AS
SET NOCOUNT ON
DECLARE @dh UNIQUEIDENTIFIER;
DECLARE @cg UNIQUEIDENTIFIER
DECLARE @message_type_name SYSNAME;
DECLARE @message_body VARBINARY(MAX);
BEGIN TRANSACTION;
-- dequeue a priority conversation_group,
-- or wait for an unprioritized one from the queue
EXEC dequeue_priority @cg OUTPUT;
IF @cg IS NULL
BEGIN
WAITFOR (GET CONVERSATION GROUP @cg
FROM [BackEndQueue]), TIMEOUT 1000;
END
WHILE @cg IS NOT NULL
BEGIN
-- We have a conversation group
-- process all messages in this group
RECEIVE TOP (1)
@dh = conversation_handle,
@message_type_name = message_type_name,
@message_body = message_body
FROM [BackEndQueue]
WHERE conversation_group_id = @cg;
WHILE @dh IS NOT NULL
BEGIN
IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
OR @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
BEGIN
-- In a real app the Error message might need to be handled somehow, e.g. logged
END CONVERSATION @dh;
END
ELSE IF @message_type_name = N'LongWorkloadRequest'
BEGIN
-- simulate a really lengthy workload: sleep for 2 seconds.
WAITFOR DELAY '00:00:02';
-- send back the 'result' of the workload
-- For our sample the result is simply the request wrapped in a <result> tag,
-- decorated with the current time and @@spid attributes
DECLARE @result XML;
SELECT @result = (
SELECT
@@SPID as [@spid],
GETDATE() as [@time],
CAST(@message_body AS XML) AS [*]
FOR XML PATH ('result'), TYPE);
SEND ON CONVERSATION @dh
MESSAGE TYPE [Response]
(@result);
END;
-- In a real app we'd need to treat the ELSE case, when an unknown type message was received
SELECT @dh = NULL;
-- get more messages on this conversation_group
RECEIVE TOP (1)
@dh = conversation_handle,
@message_type_name = message_type_name,
@message_body = message_body
FROM [BackEndQueue]
WHERE conversation_group_id = @cg;
END
-- commit this transaction, then loop again
COMMIT;
BEGIN TRANSACTION;
SELECT @cg = NULL;
EXEC dequeue_priority @cg OUTPUT;
IF @cg IS NULL
BEGIN
WAITFOR (GET CONVERSATION GROUP @cg
FROM [BackEndQueue]), TIMEOUT 1000;
END
END
COMMIT;
GO
-- the front end service procedure
CREATE PROCEDURE front_end_service
AS
SET NOCOUNT ON
DECLARE @dh UNIQUEIDENTIFIER;
DECLARE @bind_dh UNIQUEIDENTIFIER;
DECLARE @message_type_name SYSNAME;
DECLARE @message_body VARBINARY(MAX);
BEGIN TRANSACTION;
WAITFOR (RECEIVE TOP (1)
@dh = conversation_handle,
@message_type_name = message_type_name,
@message_body = message_body
FROM [FrontEndQueue]), TIMEOUT 1000;
WHILE @dh IS NOT NULL
BEGIN
IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
BEGIN
-- End the conversation on which the EndDialog was received,
-- then end the conversation on the other side of the binding
END CONVERSATION @dh;
EXEC binding_get_peer @dh, @bind_dh OUTPUT;
IF @bind_dh IS NOT NULL
BEGIN
END CONVERSATION @bind_dh;
DELETE FROM requests_bindings
WHERE front_end_conversation = @dh
OR back_end_conversation = @dh;
END
END
ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
BEGIN
-- End the conversation on which the Error was received,
-- then forward the error to the conversation on the other side of the binding
END CONVERSATION @dh;
EXEC binding_get_peer @dh, @bind_dh OUTPUT;
IF @bind_dh IS NOT NULL
BEGIN
-- Extract the error code and description from the error message body
DECLARE @error_number INT;
DECLARE @error_description NVARCHAR(4000);
DECLARE @error_message_body XML;
SELECT @error_message_body = CAST(@message_body AS XML);
WITH XMLNAMESPACES (DEFAULT 'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
SELECT @error_number = @error_message_body.value ('(/Error/Code)[1]', 'INT'),
@error_description = @error_message_body.value ('(/Error/Description)[1]', 'NVARCHAR(4000)');
IF (@error_number < 0 )
BEGIN
SELECT @error_number = -@error_number;
END
END CONVERSATION @bind_dh WITH
ERROR = @error_number
DESCRIPTION = @error_description;
DELETE FROM requests_bindings
WHERE front_end_conversation = @dh
OR back_end_conversation = @dh;
END
END
ELSE IF @message_type_name = N'LongWorkloadRequest'
BEGIN
-- forward the workload request to the back end service
EXEC binding_get_back_end @dh, @bind_dh OUTPUT;
SEND ON CONVERSATION @bind_dh
MESSAGE TYPE [LongWorkloadRequest]
(@message_body);
END
ELSE IF @message_type_name = N'SetPriority'
BEGIN
-- increase the priority of this conversation
-- we need the target side conversation group
-- of the back end conversation bound to @dh
DECLARE @cg UNIQUEIDENTIFIER;
SELECT @cg = tep.conversation_group_id
FROM sys.conversation_endpoints tep WITH (NOLOCK)
JOIN sys.conversation_endpoints iep WITH (NOLOCK) ON
tep.conversation_id = iep.conversation_id
AND tep.is_initiator = 0
AND iep.is_initiator = 1
JOIN requests_bindings rb ON
iep.conversation_handle = rb.back_end_conversation
WHERE
rb.front_end_conversation = @dh;
IF @cg IS NOT NULL
BEGIN
-- retrieve the desired priority from the message body
DECLARE @priority TINYINT;
SELECT @priority = cast(@message_body as XML).value (N'(/priority)[1]', N'TINYINT');
EXEC enqueue_priority @cg, @priority;
END
END
ELSE IF @message_type_name = N'Response'
BEGIN
-- forward the workload response to the front end conversation
EXEC binding_get_peer @dh, @bind_dh OUTPUT;
SEND ON CONVERSATION @bind_dh
MESSAGE TYPE [Response]
(@message_body);
END
-- commit this transaction, then loop again
COMMIT;
BEGIN TRANSACTION;
SELECT @dh = NULL, @bind_dh = NULL;
WAITFOR (RECEIVE TOP (1)
@dh = conversation_handle,
@message_type_name = message_type_name,
@message_body = message_body
FROM [FrontEndQueue]), TIMEOUT 1000;
END
COMMIT;
GO
ALTER QUEUE [FrontEndQueue]
WITH ACTIVATION (
STATUS = ON,
MAX_QUEUE_READERS = 10,
PROCEDURE_NAME = [front_end_service],
EXECUTE AS OWNER);
GO
ALTER QUEUE [BackEndQueue]
WITH ACTIVATION (
STATUS = ON,
MAX_QUEUE_READERS = 10,
PROCEDURE_NAME = [back_end_service],
EXECUTE AS OWNER);
GO
We can now go ahead and send some requests. Every 10th request will also ask for a priority:
use sample_priority_queuing;
go
CREATE QUEUE [SampleClient];
CREATE SERVICE [SampleClient] ON QUEUE [SampleClient];
GO
DECLARE @dh UNIQUEIDENTIFIER;
DECLARE @i INT;
SELECT @i = 0;
WHILE @i < 100
BEGIN
BEGIN TRANSACTION;
BEGIN DIALOG CONVERSATION @dh
FROM SERVICE [SampleClient]
TO SERVICE N'ms.com/Samples/Broker/PriorityService'
ON CONTRACT [RequestWithPriority]
WITH ENCRYPTION = OFF;
DECLARE @request XML;
SELECT @request = (
SELECT GETDATE() AS [@time],
@@SPID AS [@spid],
@i
FOR XML PATH ('request'), TYPE);
SEND ON CONVERSATION @dh
MESSAGE TYPE [LongWorkloadRequest]
(@request);
-- Every 10th request asks for a priority bump
IF 0 = (@i % 10)
BEGIN
DECLARE @priority XML;
SELECT @priority = (SELECT @i
FOR XML PATH ('priority'), TYPE);
SEND ON CONVERSATION @dh
MESSAGE TYPE [SetPriority]
(@priority);
END
COMMIT;
SELECT @i = @i + 1;
END
GO
And now wait for the responses to our requests to come back:
SET NOCOUNT ON;
DECLARE @dh UNIQUEIDENTIFIER;
DECLARE @message_body NVARCHAR(4000);
BEGIN TRANSACTION
WAITFOR(RECEIVE
@dh = conversation_handle,
@message_body = cast(message_body as NVARCHAR(4000))
FROM [SampleClient]), TIMEOUT 10000;
WHILE @dh IS NOT NULL
BEGIN
END CONVERSATION @dh;
PRINT @message_body;
COMMIT;
SELECT @dh = NULL;
BEGIN TRANSACTION;
WAITFOR(RECEIVE
@dh = conversation_handle,
@message_body = cast(message_body as NVARCHAR(4000))
FROM [SampleClient]), TIMEOUT 10000;
END
COMMIT;
GO
We’ll see that the responses come back in priority order (higher priority requests are processed sooner):
<result spid="…" time="…20.547"><request time="…16.453" spid="…">0</request></result>
<result spid="…" time="…23.030"><request time="…19.860" spid="…">80</request></result>
<result spid="…" time="…23.563"><request time="…19.890" spid="…">90</request></result>
<result spid="…" time="…25.033"><request time="…18.813" spid="…">70</request></result>
<result spid="…" time="…25.580"><request time="…18.783" spid="…">60</request></result>
<result spid="…" time="…27.047"><request time="…18.737" spid="…">50</request></result>
<result spid="…" time="…27.580"><request time="…17.690" spid="…">40</request></result>
<result spid="…" time="…29.047"><request time="…17.657" spid="…">30</request></result>
<result spid="…" time="…29.593"><request time="…17.610" spid="…">20</request></result>
<result spid="…" time="…31.050"><request time="…17.563" spid="…">10</request></result>
<result spid="…" time="…31.597"><request time="…16.487" spid="…">1</request></result>
<result spid="…" time="…33.050"><request time="…16.487" spid="…">2</request></result>
<result spid="…" time="…33.597"><request time="…16.500" spid="…">3</request></result>
<result spid="…" time="…35.063"><request time="…16.517" spid="…">4</request></result>
<result spid="…" time="…35.610"><request time="…16.533" spid="…">5</request></result>
<result spid="…" time="…37.063"><request time="…16.533" spid="…">6</request></result>
<result spid="…" time="…37.610"><request time="…17.550" spid="…">7</request></result>
<result spid="…" time="…39.063"><request time="…17.550" spid="…">8</request></result>
…
The first response returned is the one for the first request (0). This is because it is the request that activated the back end service, and no other request existed in the queue at that moment. While this request was being ‘processed’ (i.e. the service procedure was waiting out the 2 second delay), many other requests were enqueued, some with a priority set. The next one processed is request number 80, because it had the highest priority at the moment it was dequeued. By then all the requests had been enqueued, and they are processed in decreasing order of priority (90, 70, 60, …) until all priority requests are done. After that the processing resumes with the normal, non-priority items, in the order they arrived: (1, 2, 3, …).
So here it is: messages processed in the desired priority order! QED.
An alternative to the front-end back-end service design
The disadvantage of separating the service into a front-end part and a back-end part is that the messages are copied between the front-end service and the back-end service. Normally this is not a big deal, but if messages are significant in size (e.g. more than 1MB), this extra copy can become a bottleneck. An alternative is not to split the service into front-end and back-end, but to deploy two distinct services in ‘parallel’. One is the real service, which does the work, and the other is a control service, which allows priorities to be set on the real service’s requests. Clients open conversations with the real service and send requests as they normally would. To set the priority of a request, a client opens a separate dialog with the control service and sends a [SetPriority] message containing some cookie that identifies the real request conversation (e.g. the conversation_id). This approach eliminates the extra copy, but it has other drawbacks:
– It opens a can of worms from the security point of view. How can the control service validate that a SetPriority request for a given conversation comes from a valid party? In the front-end/back-end design, a SetPriority message can arrive ONLY on the conversation whose priority it changes, so this issue doesn’t exist.
– It is MUCH more difficult for the client application to program against. Instead of simply sending another message on the existing dialog, the client has to open a separate dialog. For one, it must ensure that the new dialog lands on the same broker instance as the real work dialog! Also, the entire infrastructure required (routes, certificates, remote service bindings, permissions) has just doubled.
– The workload requests and SetPriority requests lose any correlation in delivery order. The client has to ensure that the workload request has arrived at the target before it sends the SetPriority message. While this is possible (by looking into its own sys.transmission_queue), it is messy and error prone.
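To give an idea of what the transmission queue check in the last point involves, here is a minimal sketch of a client-side test. It assumes @dh holds the handle of the workload conversation (in real code it would come from BEGIN DIALOG) and that an empty result means the target broker has acknowledged every message sent so far; the variable names and printed messages are purely illustrative.

```sql
-- Assumption: @dh is the conversation handle of the workload dialog.
-- It is left NULL here, so as written the count is always 0.
DECLARE @dh UNIQUEIDENTIFIER;
DECLARE @pending INT;

-- Messages stay in sys.transmission_queue until the remote side acknowledges them.
SELECT @pending = COUNT(*)
FROM sys.transmission_queue
WHERE conversation_handle = @dh;

IF @pending = 0
    PRINT 'Nothing pending for this conversation; SetPriority can be sent.';
ELSE
    PRINT 'Workload request still in transit; retry later.';
```

The client would have to poll like this before every SetPriority message, which is part of why the approach is awkward.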
I’d recommend against this approach: the problems it opens are much bigger than the benefit.