How to Multicast messages with SQL Server Service Broker
July 20th, 2011
Starting with SQL Server 11, the SEND verb has a new syntax and accepts multiple dialog handles to send on:
SEND
ON CONVERSATION [(]conversation_handle [,.. @conversation_handle_n][)]
[ MESSAGE TYPE message_type_name ]
[ ( message_body_expression ) ]
[ ; ]
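As a minimal sketch of this syntax, the script below opens two dialogs and sends the same message on both with a single SEND. All object names (demo_message, demo_contract, demo_initiator, demo_target and their queues) are made up for illustration, and the script assumes a database with Service Broker enabled on SQL Server 11 or later.

```sql
-- Hypothetical demo objects; none of these names come from the article.
create message type demo_message validation = none;
create contract demo_contract (demo_message sent by initiator);
create queue demo_initiator_queue;
create queue demo_target_queue;
create service demo_initiator on queue demo_initiator_queue;
create service demo_target on queue demo_target_queue (demo_contract);
go

declare @h1 uniqueidentifier, @h2 uniqueidentifier;

-- Open two independent dialogs to the same target service.
begin dialog @h1
    from service demo_initiator
    to service N'demo_target'
    on contract demo_contract
    with encryption = off;
begin dialog @h2
    from service demo_initiator
    to service N'demo_target'
    on contract demo_contract
    with encryption = off;

-- One SEND statement, two conversations.
send on conversation (@h1, @h2)
    message type demo_message (N'hello');

-- Each dialog should have delivered its own copy of the message.
select conversation_handle,
       cast(message_body as nvarchar(100)) as body
from demo_target_queue;
```

If delivery succeeds, the final query should show one row per dialog, each carrying the same body. The handles it shows are the target-side handles, so they differ from @h1 and @h2.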
With this syntax enhancement you can send a message to multiple destinations. Functionally, this is no different from sending the same message multiple times: from the application's point of view, issuing one single SEND on 10 dialog handles is exactly the same as issuing 10 SEND statements on one dialog handle at a time. The improvement is in sys.transmission_queue: issuing SEND multiple times would create multiple copies of the message body to be sent. By contrast, one single SEND on multiple handles stores the message body only once. We can see this if we look at the definition of sys.transmission_queue in SQL Server 11:
sp_helptext 'sys.transmission_queue'
CREATE VIEW sys.transmission_queue AS
SELECT conversation_handle = S.handle,
to_service_name = Q.tosvc,
to_broker_instance = Q.tobrkrinst,
from_service_name = Q.fromsvc,
service_contract_name = Q.svccontr,
enqueue_time = Q.enqtime,
message_sequence_number = Q.msgseqnum,
message_type_name = Q.msgtype,
is_conversation_error = sysconv(bit, Q.status & 2),
is_end_of_dialog = sysconv(bit, Q.status & 4),
message_body = ISNULL(Q.msgbody, B.msgbody),
transmission_status = GET_TRANSMISSION_STATUS (S.handle),
priority = R.priority
FROM sys.sysxmitqueue Q
LEFT JOIN sys.sysxmitbody B WITH (NOLOCK) ON Q.msgref = B.msgref
INNER JOIN sys.sysdesend S WITH (NOLOCK)
ON Q.dlgid = S.diagid AND Q.finitiator = S.initiator
INNER JOIN sys.sysdercv R WITH (NOLOCK)
ON Q.dlgid = R.diagid AND Q.finitiator = R.initiator
WHERE is_member('db_owner') = 1
Compare this with the same view definition in SQL Server 2008 R2:
CREATE VIEW sys.transmission_queue AS
SELECT conversation_handle = S.handle,
to_service_name = Q.tosvc,
to_broker_instance = Q.tobrkrinst,
from_service_name = Q.fromsvc,
service_contract_name = Q.svccontr,
enqueue_time = Q.enqtime,
message_sequence_number = Q.msgseqnum,
message_type_name = Q.msgtype,
is_conversation_error = sysconv(bit, Q.status & 2),
is_end_of_dialog = sysconv(bit, Q.status & 4),
message_body = Q.msgbody,
transmission_status = GET_TRANSMISSION_STATUS (S.handle),
priority = R.priority
FROM sys.sysxmitqueue Q
INNER JOIN sys.sysdesend S WITH (NOLOCK)
ON Q.dlgid = S.diagid AND Q.finitiator = S.initiator
INNER JOIN sys.sysdercv R WITH (NOLOCK)
ON Q.dlgid = R.diagid AND Q.finitiator = R.initiator
WHERE is_member('db_owner') = 1
You can see how in SQL Server 11 the message body was separated into a new system table (sys.sysxmitbody). A multicast SEND creates multiple entries in sys.sysxmitqueue (one for each dialog on which the message was multicast) but only one entry in sys.sysxmitbody. This normalized storage scheme saves space and, more importantly, reduces the amount of log generated during the SEND.
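To observe the fan-out from the outside, one option is to query sys.transmission_queue while messages are still pending. The query below is only a sketch. It uses the columns exposed by the view definition shown earlier, and it returns rows only for messages that have not been delivered yet (for example, when the target service is remote or unreachable). The base tables sys.sysxmitqueue and sys.sysxmitbody are not queryable from a regular connection, so the shared body itself is not visible here, only the one row per dialog.

```sql
-- Requires db_owner membership, as the view definition filters on it.
select conversation_handle,
       to_service_name,
       message_type_name,
       datalength(message_body) as body_bytes,
       transmission_status,
       enqueue_time
from sys.transmission_queue
order by enqueue_time, conversation_handle;
```

After a multicast SEND to unreachable targets, this should list one row per conversation handle with identical body_bytes values.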
The Reverse Dialog pattern in publish-subscribe
The typical dialog pattern in pub-sub systems is for the subscriber to start the dialog and send an initial ‘subscribe’ message; the subscription content is then delivered from the target (the publisher) to the initiator (the subscriber). I call this the Reverse Dialog Pattern because messages flow from the target to the initiator. Let’s illustrate it with an example. We’ll create a publisher service that broadcasts some important content, and other services can subscribe to receive it. To spice it up, we’ll use a tag system to subscribe to optional content: all content is distributed with a list of associated tags, and each subscriber specifies the tag it is interested in. Tag matching is done using the LIKE syntax, so subscribers can specify '%' as a means to subscribe to all content.
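The tag matching can be tried in isolation before any Service Broker objects exist. The sketch below uses two table variables with made-up sample tags: the subscribed tag acts as the LIKE pattern and the published tag as the value being tested.

```sql
-- Sample data only; the table variables are hypothetical stand-ins.
declare @subscribed table (tag nvarchar(50) not null);
declare @published table (tag nvarchar(50) not null);

insert into @subscribed (tag) values (N'%'), (N'B'), (N'D');
insert into @published (tag) values (N'A'), (N'B');

-- The published tag is matched against the subscriber's pattern.
select s.tag as subscribed_tag,
       p.tag as published_tag
from @subscribed s
join @published p on p.tag like s.tag;
```

The '%' subscription matches both published tags, 'B' matches only 'B', and 'D' matches nothing, so the query returns three rows.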
The Publisher Service
create message type subscription_request validation = none;
create message type subscription_content validation = well_formed_xml;
create contract distribution
(subscription_request sent by initiator,
subscription_content sent by target);
create queue publisher;
create service publisher on queue publisher (distribution);
go
create table subscriptions (
subscription_id int not null identity(1,1),
tag nvarchar(50) not null,
conversation_handle uniqueidentifier not null,
constraint pk_subscriptions primary key (subscription_id),
constraint unq_conversation_handle unique (conversation_handle, tag));
go
create procedure usp_publisher_handler
as
begin
    declare @mt sysname, @dh uniqueidentifier, @mb varbinary(max);
    begin try
        begin transaction;
        -- Dequeue at most one message; the variables stay NULL if the queue is empty
        receive top(1)
            @mt = message_type_name,
            @dh = conversation_handle,
            @mb = message_body
        from publisher;
        if (@mt = N'subscription_request')
        begin
            -- The message body is the tag the subscriber is interested in
            insert into subscriptions (conversation_handle, tag)
            values (@dh, cast(@mb as nvarchar(50)));
        end
        else if (@mt = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
            or @mt = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
        begin
            -- The subscriber went away: drop its subscriptions and close our side
            delete from subscriptions
            where conversation_handle = @dh;
            end conversation @dh;
        end
        commit;
    end try
    begin catch
        -- Roll back any open transaction so the message returns to the queue
        declare @xact_state int = xact_state();
        if @xact_state <> 0
        begin
            rollback;
        end
    end catch
end
go
alter queue publisher with activation (
status = on,
max_queue_readers = 1,
procedure_name = usp_publisher_handler,
execute as owner);
go
The publisher service is straightforward: it uses the subscriptions table to keep track of subscribers. The activated procedure associated with the publisher queue processes the subscription_request messages and adds the requesting dialog to the subscriptions table. The request message body is the tag the subscriber is interested in.
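Since everything depends on the activated procedure actually running, a quick sanity check is to look at the queue's activation settings. This is a sketch against the sys.service_queues catalog view. Note that the handler above rolls back on any error, and repeated rollbacks of the same message can cause Service Broker to disable the queue, which would show up as is_receive_enabled = 0.

```sql
select name,
       is_activation_enabled,
       activation_procedure,
       max_readers,
       is_receive_enabled
from sys.service_queues
where name = N'publisher';
```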
The Publish Content procedure
create type publish_tags_type as table (
tag nvarchar(50) not null primary key);
go
create procedure usp_publish_content
    @content xml,
    @tags publish_tags_type readonly
as
begin
    declare @sql nvarchar(max) = N'send on conversation (';
    declare @cnt int = 0;
    declare @dh uniqueidentifier;
    declare @comma nvarchar(2) = N'';
    -- Every subscription whose tag pattern matches at least one published tag
    declare crs cursor static read_only forward_only for
        select distinct conversation_handle
        from subscriptions s
        join @tags t on t.tag like s.tag;
    open crs;
    fetch next from crs into @dh;
    while 0 = @@fetch_status
    begin
        -- Append the handle as a quoted literal to the SEND handle list
        set @sql += @comma + N'''' + cast(@dh as nvarchar(36)) + N'''';
        set @comma = N', ';
        set @cnt += 1;
        fetch next from crs into @dh;
    end
    close crs;
    deallocate crs;
    if @cnt > 0
    begin
        -- One multicast SEND reaches all matching subscribers
        set @sql += N') message type subscription_content (@content)';
        exec sp_executesql @sql, N'@content xml', @content;
    end
end
go
The publish content procedure takes the content to be distributed and the list of tags under which it is published, and sends the content to all interested subscribers. One single multicast SEND is used to reach all subscribers. Because the number of handles is not known in advance, dynamic SQL is used to build the multicast SEND statement.
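To see what the generated statement looks like, the sketch below builds the same kind of handle list and prints it instead of executing it. It is not the procedure's own method: it swaps the cursor for the FOR XML PATH string concatenation idiom, and the @handles table variable with its two sample handles is a hypothetical stand-in for the rows selected from subscriptions.

```sql
-- Hypothetical stand-in for the handles selected from the subscriptions table.
declare @handles table (conversation_handle uniqueidentifier not null);
insert into @handles (conversation_handle) values
    ('AFC62EF2-35B3-E011-8EED-001C25160E57'),
    ('B3C62EF2-35B3-E011-8EED-001C25160E57');

-- Build the quoted, comma-separated handle list without a cursor.
declare @list nvarchar(max) = stuff((
    select N', ''' + cast(conversation_handle as nvarchar(36)) + N''''
    from @handles
    order by conversation_handle
    for xml path(''), type).value('.', 'nvarchar(max)'), 1, 2, N'');

declare @sql nvarchar(max) =
    N'send on conversation (' + @list
    + N') message type subscription_content (@content)';

-- Inspect the statement instead of executing it.
print @sql;
```

The printed text has the form send on conversation ('handle', 'handle') message type subscription_content (@content), which is the statement the procedure passes to sp_executesql together with the @content parameter.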
Adding subscribers
declare @i int = 0;
declare @sql nvarchar(max);
while @i < 10
begin
set @sql = N'create queue subscriber_' + cast(@i as nvarchar(20)) + N';
create service subscriber_' + cast(@i as nvarchar(20)) + N'
on queue subscriber_'+cast(@i as nvarchar(20)) + N';';
exec sp_executesql @sql;
set @sql = N'declare @dh uniqueidentifier;
begin dialog @dh
from service subscriber_' + cast(@i as nvarchar(20)) + N'
to service N''publisher''
on contract distribution
with encryption = off;
send on conversation @dh message type subscription_request
(N''' + case @i % 5 when 0 then N'%' else nchar(@i + 65) end + N''');';
exec sp_executesql @sql;
set @i += 1;
end
go
This snippet adds 10 subscribers interested in tags 'B', 'C', 'D' etc. The first and sixth subscribers (subscriber_0 and subscriber_5) are interested in everything ('%'). We can see the subscribers were added to the subscriptions table by the publisher activated procedure:
select * from subscriptions
subscription_id tag               conversation_handle
--------------- ----------------- -------------------------------------
1               %                 AFC62EF2-35B3-E011-8EED-001C25160E57
2               B                 B3C62EF2-35B3-E011-8EED-001C25160E57
3               C                 B7C62EF2-35B3-E011-8EED-001C25160E57
4               D                 BBC62EF2-35B3-E011-8EED-001C25160E57
5               E                 BFC62EF2-35B3-E011-8EED-001C25160E57
6               %                 C3C62EF2-35B3-E011-8EED-001C25160E57
7               G                 C7C62EF2-35B3-E011-8EED-001C25160E57
8               H                 CBC62EF2-35B3-E011-8EED-001C25160E57
9               I                 CFC62EF2-35B3-E011-8EED-001C25160E57
10              J                 D3C62EF2-35B3-E011-8EED-001C25160E57
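The EndDialog branch of the publisher's activated procedure also means that a subscriber can unsubscribe simply by ending its side of the dialog. A sketch of that, assuming subscriber_3 wants to leave and that its handle is looked up through the sys.conversation_endpoints and sys.services catalog views:

```sql
declare @dh uniqueidentifier;

-- Find the subscriber's own endpoint of the dialog with the publisher.
select top (1) @dh = ce.conversation_handle
from sys.conversation_endpoints ce
join sys.services s on s.service_id = ce.service_id
where s.name = N'subscriber_3'
  and ce.far_service = N'publisher'
  and ce.is_initiator = 1
  and ce.state_desc <> N'CLOSED';

-- Ending the dialog sends EndDialog to the publisher, whose activated
-- procedure then deletes the matching row from subscriptions.
if @dh is not null
    end conversation @dh;
```

Once the publisher has processed the EndDialog message, the row with tag 'D' should no longer appear in the subscriptions table.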
A test multicast message
declare @tags publish_tags_type;
insert into @tags (tag) values ('A'), ('B'), ('C');
exec usp_publish_content N' ', @tags;
go
With this one call we notified all subscribers interested, with one single multicast SEND. We can check which of the subscribers got the content:
declare @i int = 0;
declare @sql nvarchar(max) = N'', @union nvarchar(20) = N'';
while @i < 10
begin
set @sql += @union + N'select
''subscriber_' + cast(@i as nvarchar(20)) + N''' as subscriber,
count(*) as count
from subscriber_' + cast(@i as nvarchar(20));
set @union = ' union all ';
set @i += 1;
end
exec sp_executesql @sql;
go
subscriber   count
------------ -----------
subscriber_0 1
subscriber_1 1
subscriber_2 1
subscriber_3 0
subscriber_4 0
subscriber_5 1
subscriber_6 0
subscriber_7 0
subscriber_8 0
subscriber_9 0
(10 row(s) affected)
We can see that subscriber_1 and subscriber_2 each got a message, since the tags they're interested in are 'B' and 'C', which both match a tag set by the publisher. subscriber_0 and subscriber_5 each got a message because they're interested in any tag ('%').
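Counting rows only shows that something arrived. To actually consume the content, a subscriber would RECEIVE from its queue. The following is a sketch for subscriber_1; the 5 second timeout is an arbitrary choice, and a real subscriber would also handle the Error and EndDialog message types.

```sql
declare @mt sysname, @dh uniqueidentifier, @mb varbinary(max);

begin transaction;

-- Wait up to 5 seconds for a message on this subscriber's queue.
waitfor (
    receive top (1)
        @mt = message_type_name,
        @dh = conversation_handle,
        @mb = message_body
    from subscriber_1
), timeout 5000;

-- Show the published content if that is what was dequeued.
if @mt = N'subscription_content'
    select @dh as conversation_handle,
           cast(@mb as xml) as content;

commit;
```

RECEIVE removes the message from the queue, so running the count query again afterwards should report 0 for subscriber_1.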
This publish-subscribe pattern is not new, and similar applications could be built with SQL Server Service Broker in SQL Server 2005, 2008 and 2008 R2. But with SQL Server 11 the distribution is more efficient and can scale and perform better, because the message bodies are no longer inserted and deleted once for each subscriber in the publisher's transmission queue.
Nice! Good to see this still growing. And, as always, you’ve done a great job with this write-up.
Remus:
Great post. What is the scalability in SQL Server 2011? Suppose I want to use SB for 10-15+ tables; do you think SB has matured to the extent that it can handle that?
I am aware of replication, log shipping and DBM, but want to see if that is possible in SB.
The beauty of SB is that it is async: as long as your database has enough disk space, the queue can hold a good amount of messages in the event of network latency or a server going down.
Thanks
Regards
Jay