Processing conversations in priority order

March 28th, 2006

One of the most frequent questions about Service Broker is whether it supports any sort of priority for messages. Priority within a conversation, however, would conflict directly with the exactly-once-in-order guarantee of a conversation. In a good SOA design the two services involved in a conversation are supposed to be independent: separate developers, separate organizations, separate admins and so on. If one service could set the priority of an individual message without the consent of the other, it could wreak havoc on how the messages are processed, since the other service may expect the messages in order. Priority for individual conversations is a different story. Conversations are supposed to be independent and atomic, so the order in which they are processed should not matter, and being able to set a priority for a conversation makes sense. Roger has recently addressed the same problem and he already has a couple of posts on the topic: http://blogs.msdn.com/rogerwolterblog/archive/2006/03/11/549730.aspx and http://blogs.msdn.com/rogerwolterblog/archive/2006/03/17/554134.aspx

My idea of how to implement priority is different from Roger’s. My solution applies to the case when messages take a long time to process, so the service queue accumulates a long list of pending messages during peak hours and (slowly?) makes progress through the queue, draining it at off hours. Perhaps the admin of the service would like a way to promote certain conversations to be processed sooner, or the requesting service might want to send a message asking for a higher priority on a given conversation.

Promote pending conversation to the front of the queue

In principle the answer is very simple: the RECEIVE verb has a WHERE clause that can be used to specify the desired conversation. So what you need is a way to pass the conversation IDs and the desired priority in an out-of-band fashion (out of band relative to the message queue and the processing service). This out-of-band channel can vary broadly:

if the processing service is an application running on a computer in front of a human operator, the user interface can show a list of pending conversations and the operator can drag and drop list items to establish the desired priority

the processing service can run a priority algorithm each time to decide which is the most urgent conversation to process next (the solution in Roger’s blog)

the processing service looks up a priority table to retrieve the next conversation to be processed

BTW, in practice you will quickly notice that you want to control the priority of conversation groups, not of individual conversations. This is because the RECEIVE verb locks an entire group and naturally returns all messages for an entire group in one result set. When you don’t use related conversations, groups map one to one to individual conversations and it makes no difference. If you do use related conversations, I really cannot see a situation in which one priority is desired for one conversation in the group and a different priority for another conversation in the same group.
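To decide which groups to promote, an operator tool first needs to see what is pending. A minimal sketch, assuming the service queue is named [queue] (the same placeholder used in the loops in this post) and that a dirty read is acceptable for a display-only list:

```sql
-- List the conversation groups that currently have messages pending.
-- [queue] is a placeholder name; substitute the real service queue.
-- NOLOCK gives an approximate, read-only snapshot for display purposes.
SELECT conversation_group_id,
       COUNT(*)           AS pending_messages,
       MIN(queuing_order) AS first_queuing_order
  FROM [queue] WITH (NOLOCK)
  GROUP BY conversation_group_id
  ORDER BY first_queuing_order;
```

The result is only a snapshot: a group listed here may already be locked or fully processed by a queue reader by the time someone acts on it.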

Technically, the solution to look up a priority table is fairly easy to implement. Simply change the typical WAITFOR(RECEIVE…) loop into a loop that first checks the priority table:

BEGIN TRANSACTION;
-- start from NULL, so that an empty priority_table leaves @cg unset
SELECT @cg = NULL;
SELECT TOP 1 @cg = conversation_group
  FROM priority_table
  ORDER BY [priority];
IF @cg IS NULL
  WAITFOR (GET CONVERSATION GROUP @cg FROM [queue]);
WHILE @cg IS NOT NULL
BEGIN
  RECEIVE … FROM [queue] WHERE conversation_group_id = @cg;
  -- process messages
  COMMIT;
  BEGIN TRANSACTION;
  SELECT @cg = NULL;
  SELECT TOP 1 @cg = conversation_group
    FROM priority_table
    ORDER BY [priority];
  IF @cg IS NULL
    WAITFOR (GET CONVERSATION GROUP @cg FROM [queue]);
END
COMMIT;

The exact semantics of how the priority_table is processed depend on the service semantics. I believe the most rational choice is to remove the conversation_group retrieved from this table after it is processed, like this (only the DELETE statement is new):

BEGIN TRANSACTION;
-- start from NULL, so that an empty priority_table leaves @cg unset
SELECT @cg = NULL;
SELECT TOP 1 @cg = conversation_group
  FROM priority_table
  ORDER BY [priority];
IF @cg IS NULL
  WAITFOR (GET CONVERSATION GROUP @cg FROM [queue]);
WHILE @cg IS NOT NULL
BEGIN
  RECEIVE … FROM [queue] WHERE conversation_group_id = @cg;
  -- process messages
  -- always delete the currently processed @cg from the priority table
  DELETE FROM priority_table
    WHERE conversation_group = @cg;
  COMMIT;
  BEGIN TRANSACTION;
  SELECT @cg = NULL;
  SELECT TOP 1 @cg = conversation_group
    FROM priority_table
    ORDER BY [priority];
  IF @cg IS NULL
    WAITFOR (GET CONVERSATION GROUP @cg FROM [queue]);
END
COMMIT;

The inserts into the priority_table can be managed by an external admin tool or by the service itself (see below).
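As an illustration of the admin tool case, here is a minimal sketch of promoting one pending group by hand. It assumes the two-column priority_table (conversation_group, priority) and the [queue] placeholder used in the loops above. Picking the most recently arrived group is only a stand-in for whatever the administrator actually selects.

```sql
DECLARE @cg UNIQUEIDENTIFIER;
DECLARE @priority INT;

-- In the loops above a lower value is dequeued first (ORDER BY [priority]).
SELECT @priority = 1;

-- Stand-in for the administrator's choice: the most recently arrived group.
SELECT TOP (1) @cg = conversation_group_id
  FROM [queue] WITH (NOLOCK)
  ORDER BY queuing_order DESC;

IF @cg IS NOT NULL
BEGIN
  BEGIN TRANSACTION;
  -- replace any earlier entry for the same group
  DELETE FROM priority_table
    WHERE conversation_group = @cg;
  INSERT INTO priority_table (conversation_group, [priority])
    VALUES (@cg, @priority);
  COMMIT;
END
```

The processing loop picks the entry up the next time it consults priority_table, that is, after it finishes the group it is currently working on.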

Offering priority processing on the service interface itself

How about the case when the priority can be requested by the remote side of a conversation? Ideally a message could be sent on an existing conversation, and as a result the pending work for that conversation would be bumped to the top of the work queue. The idea is fairly simple: a new message type (say [SetPriority]) is added to the service contract and the service itself processes the [SetPriority] message by inserting the conversation group into the priority_table (again, only the handling of [SetPriority] is new):

BEGIN TRANSACTION;
-- start from NULL, so that an empty priority_table leaves @cg unset
SELECT @cg = NULL;
SELECT TOP 1 @cg = conversation_group
  FROM priority_table
  ORDER BY [priority];
IF @cg IS NULL
  WAITFOR (GET CONVERSATION GROUP @cg FROM [queue]);
WHILE @cg IS NOT NULL
BEGIN
  RECEIVE @message_type_name = message_type_name
    FROM [queue] WHERE conversation_group_id = @cg;
  IF @message_type_name = N'SetPriority'
  BEGIN
    INSERT INTO priority_table (conversation_group, [priority])
      VALUES (@cg, @default_priority);
  END
  ELSE
  BEGIN
    -- process other messages
    -- always delete the currently processed @cg from the priority table
    DELETE FROM priority_table
      WHERE conversation_group = @cg;
  END
  COMMIT;
  BEGIN TRANSACTION;
  SELECT @cg = NULL;
  SELECT TOP 1 @cg = conversation_group
    FROM priority_table
    ORDER BY [priority];
  IF @cg IS NULL
    WAITFOR (GET CONVERSATION GROUP @cg FROM [queue]);
END
COMMIT;

But there is one problem with this simple approach: the service will not get to process this message, simply because the message will be at the bottom of the service queue! We have a Catch-22 problem here. My solution is to split the service into two services: a front end service, with which the peers open conversations, and an internal back end service, which does the real, long-running processing.

Now here is the actual code for the front end and back end services.

-- Create the database for the sample
create database sample_priority_queuing
go
use sample_priority_queuing;
go
-----------------------------------------------------------------------
-- Create the message types for the sample
-----------------------------------------------------------------------
CREATE MESSAGE TYPE [LongWorkloadRequest] VALIDATION = WELL_FORMED_XML;
CREATE MESSAGE TYPE [SetPriority] VALIDATION = WELL_FORMED_XML;
CREATE MESSAGE TYPE [Response] VALIDATION = WELL_FORMED_XML;
GO

-- This is the contract used by service clients
CREATE CONTRACT [RequestWithPriority] (
  [LongWorkloadRequest] SENT BY INITIATOR,
  [SetPriority] SENT BY INITIATOR,
  [Response] SENT BY TARGET);
GO
-- This is a stripped down version of the contract,
-- used only by the internal back end service
CREATE CONTRACT [RequestInternal] (
  [LongWorkloadRequest] SENT BY INITIATOR,
  [Response] SENT BY TARGET);
GO
CREATE QUEUE FrontEndQueue;
CREATE QUEUE BackEndQueue;
GO
-- the front end service
CREATE SERVICE [ms.com/Samples/Broker/PriorityService]
  ON QUEUE [FrontEndQueue]
  ([RequestWithPriority]);
-- the back end service
CREATE SERVICE [BackEndService]
  ON QUEUE [BackEndQueue]
  ([RequestInternal]);
GO

-- this table links incoming requests on the front end
-- to the back end service
CREATE TABLE requests_bindings (
  front_end_conversation UNIQUEIDENTIFIER PRIMARY KEY,
  back_end_conversation UNIQUEIDENTIFIER UNIQUE);
GO
---------------------------------------------------------
-- Procedure for retrieving the peer conversation from
-- the bindings table. Will retrieve front_end_conversation
-- from back_end_conversation and vice-versa
---------------------------------------------------------
CREATE PROCEDURE binding_get_peer (
  @conversation UNIQUEIDENTIFIER,
  @peer UNIQUEIDENTIFIER OUTPUT)
AS
SET NOCOUNT ON;
SELECT @peer = (
  SELECT back_end_conversation
    FROM requests_bindings
    WHERE front_end_conversation = @conversation
  UNION ALL
  SELECT front_end_conversation
    FROM requests_bindings
    WHERE back_end_conversation = @conversation);
IF 0 = @@ROWCOUNT
BEGIN
  SELECT @peer = NULL;
END
GO

---------------------------------------------------------
-- Procedure for retrieving a back_end_conversation for
-- a front end conversation. Will initiate a new conversation
-- if one does not already exist
---------------------------------------------------------
CREATE PROCEDURE binding_get_back_end (
  @front_end_conversation UNIQUEIDENTIFIER,
  @back_end_conversation UNIQUEIDENTIFIER OUTPUT)
AS
SET NOCOUNT ON;
BEGIN TRANSACTION;
SELECT @back_end_conversation = back_end_conversation
  FROM requests_bindings
  WHERE front_end_conversation = @front_end_conversation;
IF 0 = @@ROWCOUNT
BEGIN
  BEGIN DIALOG CONVERSATION @back_end_conversation
    FROM SERVICE [ms.com/Samples/Broker/PriorityService]
    TO SERVICE N'BackEndService', N'current database'
    ON CONTRACT [RequestInternal]
    WITH
      RELATED_CONVERSATION = @front_end_conversation,
      ENCRYPTION = OFF;
  INSERT INTO requests_bindings
    (front_end_conversation, back_end_conversation)
    VALUES (@front_end_conversation, @back_end_conversation);
END
COMMIT;
GO

-- this is the priority table for the back-end service
-- the primary key constraint gives the dequeue order
-- priority is descending (255 is max, 0 is min, 100 is default)
CREATE TABLE priority_table (
  conversation_group UNIQUEIDENTIFIER UNIQUE,
  priority TINYINT,
  enqueue_time TIMESTAMP,
  PRIMARY KEY CLUSTERED (priority DESC, enqueue_time ASC, conversation_group));
GO
-- This is the stored proc for dequeuing the next
-- conversation from the priority queue
-- will retrieve the next available (unlocked) conversation group
CREATE PROCEDURE dequeue_priority (
  @conversation_group UNIQUEIDENTIFIER OUTPUT)
AS
SET NOCOUNT ON;
BEGIN TRANSACTION;
SELECT @conversation_group = NULL;
DECLARE @cgt TABLE (conversation_group UNIQUEIDENTIFIER);
DELETE FROM priority_table WITH (READPAST)
  OUTPUT DELETED.conversation_group INTO @cgt
  WHERE conversation_group = (
    SELECT TOP (1) conversation_group
      FROM priority_table WITH (READPAST)
      ORDER BY priority DESC, enqueue_time ASC);
SELECT @conversation_group = conversation_group
  FROM @cgt;
COMMIT;
GO

-- This is the stored proc for enqueuing a
-- conversation group into the priority queue
CREATE PROCEDURE enqueue_priority (
  @conversation_group UNIQUEIDENTIFIER,
  @priority TINYINT)
AS
SET NOCOUNT ON;
BEGIN TRANSACTION;
-- replace any existing entry, so a group is listed at most once
DELETE FROM priority_table
  WHERE conversation_group = @conversation_group;
INSERT INTO priority_table
  (conversation_group, priority)
  VALUES (@conversation_group, @priority);
COMMIT;
GO

-- the back end service procedure
CREATE PROCEDURE back_end_service
AS
SET NOCOUNT ON;
DECLARE @dh UNIQUEIDENTIFIER;
DECLARE @cg UNIQUEIDENTIFIER;
DECLARE @message_type_name SYSNAME;
DECLARE @message_body VARBINARY(MAX);
BEGIN TRANSACTION;
-- dequeue a priority conversation_group,
-- or wait for an unprioritized one from the queue
EXEC dequeue_priority @cg OUTPUT;
IF @cg IS NULL
BEGIN
  WAITFOR (GET CONVERSATION GROUP @cg
    FROM [BackEndQueue]), TIMEOUT 1000;
END
WHILE @cg IS NOT NULL
BEGIN
  -- We have a conversation group
  -- process all messages in this group
  RECEIVE TOP (1)
    @dh = conversation_handle,
    @message_type_name = message_type_name,
    @message_body = message_body
    FROM [BackEndQueue]
    WHERE conversation_group_id = @cg;
  WHILE @dh IS NOT NULL
  BEGIN
    IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
      OR @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
    BEGIN
      -- In a real app the Error message might need to be somehow handled, like logged
      END CONVERSATION @dh;
    END
    ELSE IF @message_type_name = N'LongWorkloadRequest'
    BEGIN
      -- simulate a really lengthy workload. sleep for 2 seconds.
      WAITFOR DELAY '00:00:02';
      -- send back the 'result' of the workload
      -- For our sample the result is simply the request wrapped in a <result> tag,
      -- decorated with the current time and @@spid attributes
      DECLARE @result XML;
      SELECT @result = (
        SELECT
          @@SPID AS [@spid],
          GETDATE() AS [@time],
          CAST(@message_body AS XML) AS [*]
        FOR XML PATH ('result'), TYPE);
      SEND ON CONVERSATION @dh
        MESSAGE TYPE [Response]
        (@result);
    END;
    -- In a real app we'd need to treat the ELSE case, when an unknown type message was received
    SELECT @dh = NULL;
    -- get more messages on this conversation_group
    RECEIVE TOP (1)
      @dh = conversation_handle,
      @message_type_name = message_type_name,
      @message_body = message_body
      FROM [BackEndQueue]
      WHERE conversation_group_id = @cg;
  END
  -- commit this transaction, then loop again
  COMMIT;
  BEGIN TRANSACTION;
  SELECT @cg = NULL;
  EXEC dequeue_priority @cg OUTPUT;
  IF @cg IS NULL
  BEGIN
    WAITFOR (GET CONVERSATION GROUP @cg
      FROM [BackEndQueue]), TIMEOUT 1000;
  END
END
COMMIT;
GO

-- the front end service procedure
CREATE PROCEDURE front_end_service
AS
SET NOCOUNT ON;
DECLARE @dh UNIQUEIDENTIFIER;
DECLARE @bind_dh UNIQUEIDENTIFIER;
DECLARE @message_type_name SYSNAME;
DECLARE @message_body VARBINARY(MAX);
BEGIN TRANSACTION;
WAITFOR (RECEIVE TOP (1)
  @dh = conversation_handle,
  @message_type_name = message_type_name,
  @message_body = message_body
  FROM [FrontEndQueue]), TIMEOUT 1000;
WHILE @dh IS NOT NULL
BEGIN
  IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
  BEGIN
    -- End the conversation on which the End was received,
    -- then end the conversation on the other side of the binding
    END CONVERSATION @dh;
    EXEC binding_get_peer @dh, @bind_dh OUTPUT;
    IF @bind_dh IS NOT NULL
    BEGIN
      END CONVERSATION @bind_dh;
      DELETE FROM requests_bindings
        WHERE front_end_conversation = @dh
        OR back_end_conversation = @dh;
    END
  END
  ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
  BEGIN
    -- End the conversation on which the Error was received,
    -- then forward the error to the conversation on the other side of the binding
    END CONVERSATION @dh;
    EXEC binding_get_peer @dh, @bind_dh OUTPUT;
    IF @bind_dh IS NOT NULL
    BEGIN
      -- Extract the error code and description from the error message body
      DECLARE @error_number INT;
      DECLARE @error_description NVARCHAR(4000);
      DECLARE @error_message_body XML;
      SELECT @error_message_body = CAST(@message_body AS XML);
      WITH XMLNAMESPACES (DEFAULT 'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
      SELECT @error_number = @error_message_body.value ('(/Error/Code)[1]', 'INT'),
        @error_description = @error_message_body.value ('(/Error/Description)[1]', 'NVARCHAR(4000)');
      -- END CONVERSATION WITH ERROR accepts only positive codes,
      -- so flip the sign of negative (system) error numbers
      IF (@error_number < 0)
      BEGIN
        SELECT @error_number = -@error_number;
      END
      END CONVERSATION @bind_dh WITH
        ERROR = @error_number
        DESCRIPTION = @error_description;
      DELETE FROM requests_bindings
        WHERE front_end_conversation = @dh
        OR back_end_conversation = @dh;
    END
  END
  ELSE IF @message_type_name = N'LongWorkloadRequest'
  BEGIN
    -- forward the workload request to the back end service
    EXEC binding_get_back_end @dh, @bind_dh OUTPUT;
    SEND ON CONVERSATION @bind_dh
      MESSAGE TYPE [LongWorkloadRequest]
      (@message_body);
  END
  ELSE IF @message_type_name = N'SetPriority'
  BEGIN
    -- increase the priority of this conversation
    -- we need the target side conversation group
    -- of the back end conversation bound to @dh
    DECLARE @cg UNIQUEIDENTIFIER;
    SELECT @cg = tep.conversation_group_id
      FROM sys.conversation_endpoints tep WITH (NOLOCK)
      JOIN sys.conversation_endpoints iep WITH (NOLOCK) ON
        tep.conversation_id = iep.conversation_id
        AND tep.is_initiator = 0
        AND iep.is_initiator = 1
      JOIN requests_bindings rb ON
        iep.conversation_handle = rb.back_end_conversation
      WHERE
        rb.front_end_conversation = @dh;
    IF @cg IS NOT NULL
    BEGIN
      -- retrieve the desired priority from the message body
      DECLARE @priority TINYINT;
      SELECT @priority = CAST(@message_body AS XML).value (N'(/priority)[1]', N'TINYINT');
      EXEC enqueue_priority @cg, @priority;
    END
  END
  ELSE IF @message_type_name = N'Response'
  BEGIN
    -- forward the workload response to the front end conversation
    EXEC binding_get_peer @dh, @bind_dh OUTPUT;
    SEND ON CONVERSATION @bind_dh
      MESSAGE TYPE [Response]
      (@message_body);
  END
  -- commit this transaction, then loop again
  COMMIT;
  BEGIN TRANSACTION;
  SELECT @dh = NULL, @bind_dh = NULL;
  WAITFOR (RECEIVE TOP (1)
    @dh = conversation_handle,
    @message_type_name = message_type_name,
    @message_body = message_body
    FROM [FrontEndQueue]), TIMEOUT 1000;
END
COMMIT;
GO

ALTER QUEUE [FrontEndQueue]
  WITH ACTIVATION (
    STATUS = ON,
    MAX_QUEUE_READERS = 10,
    PROCEDURE_NAME = [front_end_service],
    EXECUTE AS OWNER);
GO
ALTER QUEUE [BackEndQueue]
  WITH ACTIVATION (
    STATUS = ON,
    MAX_QUEUE_READERS = 10,
    PROCEDURE_NAME = [back_end_service],
    EXECUTE AS OWNER);
GO

We can now go ahead and send some requests. We’ll also make every 10th request ask for a priority:

use sample_priority_queuing;
go
CREATE QUEUE [SampleClient];
CREATE SERVICE [SampleClient] ON QUEUE [SampleClient];
GO
DECLARE @dh UNIQUEIDENTIFIER;
DECLARE @i INT;
SELECT @i = 0;
WHILE @i < 100
BEGIN
  BEGIN TRANSACTION;
  BEGIN DIALOG CONVERSATION @dh
    FROM SERVICE [SampleClient]
    TO SERVICE N'ms.com/Samples/Broker/PriorityService'
    ON CONTRACT [RequestWithPriority]
    WITH ENCRYPTION = OFF;
  DECLARE @request XML;
  SELECT @request = (
    SELECT GETDATE() AS [@time],
      @@SPID AS [@spid],
      @i
    FOR XML PATH ('request'), TYPE);
  SEND ON CONVERSATION @dh
    MESSAGE TYPE [LongWorkloadRequest]
    (@request);
  -- Every 10th request asks for a priority bump
  IF 0 = (@i % 10)
  BEGIN
    DECLARE @priority XML;
    SELECT @priority = (SELECT @i
      FOR XML PATH ('priority'), TYPE);
    SEND ON CONVERSATION @dh
      MESSAGE TYPE [SetPriority]
      (@priority);
  END
  COMMIT;
  SELECT @i = @i + 1;
END
GO
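While the back end works through the backlog, it can be instructive to peek at the state of the sample from another session. The queries below are only a diagnostic sketch against the objects created above (priority_table, BackEndQueue, requests_bindings). The NOLOCK hints are my assumption that approximate numbers are good enough for watching progress.

```sql
USE sample_priority_queuing;
GO
-- Conversation groups still waiting with an explicit priority,
-- listed in the order dequeue_priority would hand them out.
SELECT conversation_group, priority, enqueue_time
  FROM priority_table WITH (NOLOCK)
  ORDER BY priority DESC, enqueue_time ASC;

-- Workload messages not yet picked up by the back end service.
SELECT COUNT(*) AS pending_back_end_messages
  FROM [BackEndQueue] WITH (NOLOCK);

-- Front end conversations currently bound to a back end conversation.
SELECT COUNT(*) AS active_bindings
  FROM requests_bindings WITH (NOLOCK);
GO
```

Running these repeatedly should show priority_table draining first while the count on BackEndQueue decreases more slowly.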

And now wait for the responses on our requests to come back:

SET NOCOUNT ON;
DECLARE @dh UNIQUEIDENTIFIER;
DECLARE @message_body NVARCHAR(4000);
BEGIN TRANSACTION;
-- TOP (1): take one message at a time, since each one is assigned to variables
WAITFOR (RECEIVE TOP (1)
  @dh = conversation_handle,
  @message_body = CAST(message_body AS NVARCHAR(4000))
  FROM [SampleClient]), TIMEOUT 10000;
WHILE @dh IS NOT NULL
BEGIN
  END CONVERSATION @dh;
  PRINT @message_body;
  COMMIT;
  SELECT @dh = NULL;
  BEGIN TRANSACTION;
  WAITFOR (RECEIVE TOP (1)
    @dh = conversation_handle,
    @message_body = CAST(message_body AS NVARCHAR(4000))
    FROM [SampleClient]), TIMEOUT 10000;
END
COMMIT;
GO

We’ll see the responses coming back in priority order (higher priority requests are processed sooner):

<result spid="…" time="…20.547"><request time="…16.453" spid="…">0</request></result>
<result spid="…" time="…23.030"><request time="…19.860" spid="…">80</request></result>
<result spid="…" time="…23.563"><request time="…19.890" spid="…">90</request></result>
<result spid="…" time="…25.033"><request time="…18.813" spid="…">70</request></result>
<result spid="…" time="…25.580"><request time="…18.783" spid="…">60</request></result>
<result spid="…" time="…27.047"><request time="…18.737" spid="…">50</request></result>
<result spid="…" time="…27.580"><request time="…17.690" spid="…">40</request></result>
<result spid="…" time="…29.047"><request time="…17.657" spid="…">30</request></result>
<result spid="…" time="…29.593"><request time="…17.610" spid="…">20</request></result>
<result spid="…" time="…31.050"><request time="…17.563" spid="…">10</request></result>
<result spid="…" time="…31.597"><request time="…16.487" spid="…">1</request></result>
<result spid="…" time="…33.050"><request time="…16.487" spid="…">2</request></result>
<result spid="…" time="…33.597"><request time="…16.500" spid="…">3</request></result>
<result spid="…" time="…35.063"><request time="…16.517" spid="…">4</request></result>
<result spid="…" time="…35.610"><request time="…16.533" spid="…">5</request></result>
<result spid="…" time="…37.063"><request time="…16.533" spid="…">6</request></result>
<result spid="…" time="…37.610"><request time="…17.550" spid="…">7</request></result>
<result spid="…" time="…39.063"><request time="…17.550" spid="…">8</request></result>

The first response returned is the one for the first request (0). This is because that request activated the back end service, and no other request was in the queue at that moment. While this request was ‘processed’ (i.e. the service procedure was waiting out the 2 second delay), many other requests were enqueued, some with a priority set. The next one processed is request number 80, because it had the highest priority at the moment it was dequeued. By then all requests had been enqueued, and they are processed in decreasing order of priority (90, 70, 60, …) until all priority requests are done. Processing then resumes with the normal, non-priority items, in the order they arrived (1, 2, 3, …).

So here it is, messages processed in the desired priority order! QED.

An alternative to the front-end back-end service design

The disadvantage of separating the service into a front-end part and a back-end part is that the messages are copied between the front-end service and the back-end service. Normally this is not a big deal, but if messages are significant in size (e.g. more than 1 MB), this extra copy can become a bottleneck. An alternative is not to split the service into front end and back end, but to deploy two distinct services in ‘parallel’. One is the real service, which does the work, and one is a control service, which allows priorities to be set on the real service’s requests. Clients open conversations with the real service and send requests as they normally would. If they need to set the priority of a request, they open a separate dialog with the control service and send a [SetPriority] message, giving some cookie that identifies the real request conversation (e.g. the conversation_id). This approach eliminates the extra copy, but it has other drawbacks:

It opens a can of worms from the security point of view. How can the control service validate that the SetPriority request for a given conversation comes from a valid party? In the front-end back-end design, the SetPriority message can come ONLY on the conversation whose priority it tries to change, so this issue doesn’t exist.

It is MUCH more difficult for the client application to program against. Instead of simply sending another message on the existing dialog, it must open a separate dialog. For one, it must ensure that the new dialog lands on the same broker instance as the real work dialog! Also, the entire infrastructure required (routes, certificates, remote service bindings, permissions) has just doubled.

The workload requests and SetPriority requests have just lost any correlation in delivery order. The client has to ensure that the workload request has arrived at the target before attempting to send the SetPriority message. While this is possible (by looking into its own sys.transmission_queue), it is messy and error prone.

I’d recommend against this approach: the problems it opens are much bigger than the benefit.
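To make the last drawback concrete, here is a rough sketch of the check a client would need before sending SetPriority on the separate control dialog. @work_dh is a hypothetical variable holding the handle of the workload dialog, which the client must have saved when it began that dialog. The check relies on messages staying in sys.transmission_queue until the remote side acknowledges them.

```sql
-- Hypothetical: the handle of the workload dialog, saved by the client earlier.
DECLARE @work_dh UNIQUEIDENTIFIER;

IF EXISTS (SELECT *
             FROM sys.transmission_queue
             WHERE conversation_handle = @work_dh)
BEGIN
  -- The workload request is still waiting to be delivered or acknowledged.
  PRINT 'Workload request still in transit; too early to send SetPriority.';
END
ELSE
BEGIN
  -- Nothing is pending for this dialog, so the target has acknowledged it.
  PRINT 'Workload request acknowledged; SetPriority can be sent.';
END
```

Even this is not airtight: the client has to poll, and an empty result cannot distinguish "delivered" from "never sent", which is part of why the approach is messy.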
