In my previous post Reusing Conversations I promised I’ll follow up with a solution to the question about how to end the conversations that are reused for the data-push scenario (logging, auditing, ETL for DW etc).
First of all, why end them in the first place? The answer is to minimize exposure in case of problems. If a problem happens and a dialog has to be destroyed (i.e. errored), then we want to minimize the potential number of messages that we have to re-process or that are potential lost. But wait a minute, how come we’re talking about message loss, doesn’t Service Broker guarantee delivery? The truth is that no system can ever guarantee delivery under all conditions. Human error can happen resulting in misconfiguration, applications have bugs that can send invalid messages (e.g. fail XML validation); hardware can be catastrophically lost w/o any possibility to restore a database/host and so on and so forth. Reusing a single dialog to send all your messages (or one per @@spid) is like putting all your eggs in one basket. If that dialog was trailing millions of messages behind, all those messages have to be reprocessed or are lost.
Another reason to recycle the dialogs is to allow RETENTION on queues. Enabling RETENTION keeps a copy of every message sent to/from that queue for the duration of the dialog. If a dialog is kept open for unlimited duration, the queue will continuously grow.
Also we want to recycle dialogs to deal with load balancing scenarios, since a dialog is sticky to one service instance we would like to periodically start new dialogs so they ‘stick’ to new instances of from the available target service instances.
So how should we end this dialogs? My recommendation is to end them based on time. Choose a time for how long a dialog should be used, depending on the expected number of messages traffic. Then you can use the Service Broker conversation timers to end the dialog when it’s time is done. I also believe that the timer should be handled by the initiator, but the conversation should be ended by the target. Given that the pattern we’re talking about (one way data push) involves sending only messages from initiator to target, the initiator is not allowed to end the conversation because that will create a fire-and-forget message pattern, a very undesirable message exchange pattern, see http://rusanu.com/2006/04/06/fire-and-forget-good-for-the-military-but-not-for-service-broker-conversations/. My recommendation is to have a special message in the contract used by the two services that informs that target that the initiator is done sending on this dialog. Lets call this message [EndOfStream]. The target would respond to this message by ending its side of the conversation, thus sending an EndDialog message to the initiator. The initiator responds to this message by ending its side (issuing the END CONVERSATION verb).
To implement what I described above we would need to modify the original example from http://rusanu.com/2007/04/25/reusing-conversations/ to create a conversation timer when the dialog is created and to add an activated procedure that handles the timer message, as well as the EndDialog and Error messages. One interesting issue is how to handle the timer message. After sending the [EndOfStream] message, the initiator should not send any more messages on that dialog. This is easily achieved by removing the dialog handle from the [SessionConversations] table. The next invocation of usp_Send will create a new one, just as desired. But there is a race condition there, an usp_Send procedure might be executing already and trying to send a message on that conversation. Even though the dialog was removed from the [SessionConversations] table, the concurrent usp_Send will successfully send on the dialog that just sent an [EndOfStream] message. Since the target responds to the [EndOfstream] message by ending the conversation, the message sent is most likely lost as the target will ignore it! To solve this situation, the usp_Send procedure must guarantee the stability of the conversation it looked up in the [SessionConversations] table. So the SELECT from the [SessionConversations] table has to be REPEATABLE, and this can be achieved with an appropriate query hint (HOLDLOCK). Update: Several readers have deployed this into production and found out that HOLDLOCK is not enough and UPDLOCK behaves better. The code is updated to reflect this.
But by doing this we introduce a new problem: the activated procedure handling the timer message and the usp_Send procedure might deadlock each other! One will hold a lock on the conversation and try to update the [SessionConversations] table, while the other will hold the lock on the [SessionConversations] table and try to acquire the lock on the conversation: guaranteed deadlock. To solve this problem, will solve it as most deadlock problems are solved: acquire the locks in the same order in both procedures. Since the usp_Send must acquire the locks in the order row-in-[SessionConversations]-table-first-conversation-next, the activated procedure must do the same. That is: delete the row from [SessionConversations] first, send the [EndOfStream] message next.
So here is the code, the modified usp_Send and the initiator side activated procedure.
-- This table associates the current connection (@@SPID) -- with a conversation. The association key also -- contains the conversation parameteres: -- from service, to service, contract used -- CREATE TABLE [SessionConversations] ( SPID INT NOT NULL , FromService SYSNAME NOT NULL , ToService SYSNAME NOT NULL , OnContract SYSNAME NOT NULL , Handle UNIQUEIDENTIFIER NOT NULL , PRIMARY KEY (SPID, FromService, ToService, OnContract) , UNIQUE (Handle)); GO -- SEND procedure. Will lookup to reuse an existing conversation -- or start a new in case no conversation exists or the conversation -- cannot be used -- CREATE PROCEDURE [usp_Send] ( @fromService SYSNAME, @toService SYSNAME, @onContract SYSNAME, @messageType SYSNAME, @messageBody VARBINARY(MAX)) AS BEGIN SET NOCOUNT ON; DECLARE @handle UNIQUEIDENTIFIER; DECLARE @counter INT; DECLARE @error INT; SELECT @counter = 1; BEGIN TRANSACTION; -- Will need a loop to retry in case the conversation is -- in a state that does not allow transmission -- WHILE (1=1) BEGIN -- Seek an eligible conversation in [SessionConversations] -- SELECT @handle = Handle FROM [SessionConversations] WITH (UPDLOCK) WHERE SPID = @@SPID AND FromService = @fromService AND ToService = @toService AND OnContract = @OnContract; IF @handle IS NULL BEGIN -- Need to start a new conversation for the current @@spid -- BEGIN DIALOG CONVERSATION @handle FROM SERVICE @fromService TO SERVICE @toService ON CONTRACT @onContract WITH ENCRYPTION = OFF; -- Set an one hour timer on the conversation -- BEGIN CONVERSATION TIMER (@handle) TIMEOUT = 3600; INSERT INTO [SessionConversations] (SPID, FromService, ToService, OnContract, Handle) VALUES (@@SPID, @fromService, @toService, @onContract, @handle); END; -- Attempt to SEND on the associated conversation -- SEND ON CONVERSATION @handle MESSAGE TYPE @messageType (@messageBody); SELECT @error = @@ERROR; IF @error = 0 BEGIN -- Successful send, just exit the loop -- BREAK; END SELECT @counter = @counter+1; IF @counter > 10 BEGIN -- We failed 10 times in a row, something must be broken -- RAISERROR ( N'Failed to SEND on a conversation for more than 10 times. Error %i.' , 16, 1, @error) WITH LOG; BREAK; END -- Delete the associated conversation from the table and try again -- DELETE FROM [SessionConversations] WHERE Handle = @handle; SELECT @handle = NULL; END COMMIT; END GO CREATE PROCEDURE usp_SenderActivation AS BEGIN DECLARE @handle UNIQUEIDENTIFIER; DECLARE @messageTypeName SYSNAME; DECLARE @messageBody VARBINARY(MAX); BEGIN TRANSACTION; RECEIVE TOP(1) @handle = conversation_handle, @messageTypeName = message_type_name, @messageBody = message_body FROM [sender_queue]; IF @handle IS NOT NULL BEGIN -- Delete the message from the [SessionConversations] table -- before sending the [EndOfStream] message. The order is -- important to avoid deadlocks. Strictly speaking, we should -- only delete if the message type is timer or error, but is -- simpler and safer to just delete always -- DELETE FROM [SessionConversations] WHERE [Handle] = @handle; IF @messageTypeName = N'http://schemas.microsoft.com/SQL/ServiceBroker/DialogTimer' BEGIN SEND ON CONVERSATION @handle MESSAGE TYPE [EndOfStream]; END ELSE IF @messageTypeName = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' BEGIN END CONVERSATION @handle; END ELSE IF @messageTypeName = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error' BEGIN END CONVERSATION @handle; -- Insert your error handling here. Could send a notification or -- store the error in a table for further inspection -- We’re gonna log the error into the ERRORLOG and Eventvwr.exe -- DECLARE @error INT; DECLARE @description NVARCHAR(4000); WITH XMLNAMESPACES ('http://schemas.microsoft.com/SQL/ServiceBroker/Error' AS ssb) SELECT @error = CAST(@messageBody AS XML).value( '(//ssb:Error/ssb:Code)', 'INT'), @description = CAST(@messageBody AS XML).value( '(//ssb:Error/ssb:Description)', 'NVARCHAR(4000)') RAISERROR(N'Received error Code:%i Description:"%s"', 16, 1, @error, @description) WITH LOG; END; END COMMIT; END GO -- attach the uspSenderActivation procedure with the sender’s queue -- ALTER QUEUE [sender_queue] WITH ACTIVATION ( STATUS = ON, MAX_QUEUE_READERS = 1, PROCEDURE_NAME = [usp_SenderActivation], EXECUTE AS OWNER); GO