Yesterday I attended an online meeting with Microsoft and got an overview of a new database tool, SQL Server Data Tools (SSDT), which shipped along with SQL Server 2012. After the meeting I decided to dig deeper and found that it could make our lives easier, especially for SQL Azure development and deployment.
Install the SSDT
SSDT is part of SQL Server 2012 and can be installed with the SQL Server 2012 setup. If you don’t need the whole of SQL Server, you can install it through the Web PI or Visual Studio.

SSDT is an integrated environment for database developers to carry out all their database design work for any SQL Server platform (both on and off premises) within Visual Studio. We can use the SQL Server Object Explorer in VS to create or edit database objects and data, or to execute queries. SSDT is not a database engine or runtime; it is a database tool with some cool features we can use during development. You can therefore use SSDT side by side with existing database tools such as SQL Server Management Studio (SSMS).
Once you have installed SSDT there will be a new window in Visual Studio named SQL Server Object Explorer.

The new SQL Server Object Explorer is NOT the Server Explorer that already exists in Visual Studio. Although we can add and view database connections through the Server Explorer, it is totally different from the new SQL Server Object Explorer.
SSDT is highly integrated with Visual Studio. This means we don’t need to switch to another application to tweak the database when coding and debugging. Also, once SSDT has been installed there will be a new project type in Visual Studio named SQL Server Database Project, which we will use later.

This project type is different from the original database project in Visual Studio Database Edition. It has a different project file extension and different functionality.
Statement Oriented, Instead of Command Oriented
SSDT adopts a totally different concept for maintaining and controlling your database. It’s statement oriented (declarative) instead of command oriented (imperative). This means that with SSDT you only need to care about what your database schema should be, not about how to update the database to that schema. You use T-SQL as a declarative language to define the tables, views, keys, indexes, etc. you want, save those CREATE scripts in the SSDT project, and SSDT works out how to apply the changes to the database.
For example, suppose we currently have a database with a table and an associated view.
    CREATE TABLE [dbo].[Table1] (
        [Id]   INT           NOT NULL,
        [Name] NVARCHAR (50) NOT NULL,
        PRIMARY KEY CLUSTERED ([Id] ASC)
    );
    GO

    -- CREATE VIEW must be the first statement in its batch
    CREATE VIEW [dbo].[View]
    AS SELECT * FROM [dbo].[Table1]
In the traditional way, if I want to extend the size of the Name column, I need to write an ALTER TABLE statement and then refresh the view. These scripts focus on how to change the database to the schema I want, but they don’t give me a clear picture of what the schema should be. This approach is command oriented.

In SSDT we don’t need to care about how to alter the table and refresh the view. I only need to change my database creation script, which here means changing the length of the Name column from 50 to 100 in the CREATE TABLE script. SSDT will then compare my current script with the existing database schema, generate the upgrade script and run it.
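To make the "compare and generate" idea more concrete, here is a toy sketch in C#. It is only an illustration under my own assumptions and is NOT how SSDT is implemented: the names SchemaDiffSketch and GenerateUpgradeScript are hypothetical, a table is reduced to a dictionary of column name to column definition, and dependent objects such as the view are ignored.

```csharp
using System;
using System.Collections.Generic;

// Toy illustration only: hypothetical names, not the SSDT engine.
public static class SchemaDiffSketch
{
    // Compares the desired columns (what the CREATE script declares) with the
    // columns that currently exist, and returns the ALTER statements needed
    // to move the existing table to the desired shape.
    public static List<string> GenerateUpgradeScript(
        string table,
        IDictionary<string, string> desired,
        IDictionary<string, string> actual)
    {
        var script = new List<string>();

        foreach (var column in desired)
        {
            string currentDefinition;
            if (!actual.TryGetValue(column.Key, out currentDefinition))
            {
                // Declared in the script but missing in the database.
                script.Add(string.Format("ALTER TABLE {0} ADD {1} {2};", table, column.Key, column.Value));
            }
            else if (!string.Equals(currentDefinition, column.Value, StringComparison.OrdinalIgnoreCase))
            {
                // Present in both, but the definition differs.
                script.Add(string.Format("ALTER TABLE {0} ALTER COLUMN {1} {2};", table, column.Key, column.Value));
            }
        }

        foreach (var column in actual)
        {
            // Exists in the database but no longer declared in the script.
            if (!desired.ContainsKey(column.Key))
            {
                script.Add(string.Format("ALTER TABLE {0} DROP COLUMN {1};", table, column.Key));
            }
        }

        return script;
    }
}
```

For the Table1 example, passing [Name] as NVARCHAR (100) NOT NULL in the desired set and NVARCHAR (50) NOT NULL in the actual set yields a single ALTER COLUMN statement. The point is that I only ever edit the desired side; the "how" is derived for me.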

SSDT also includes some other features such as target platform validation, which means you can check your script against a database platform such as SQL Server 2008, SQL Server 2012 or SQL Azure and make sure it works before you run it.
It also provides the local database feature. With it you can download a copy of the database schema from the production or central database and use it privately while developing and testing. You can change the schema of your local database, and all changes are kept in script files made up of CREATE statements; you don’t need to write any ALTER scripts. When you are happy with the result, you can use SSDT to push your changes back to the central database, and SSDT will alter that database to match your latest schema. This is very useful when a team works on the same database, and especially when working with SQL Azure, since frequently connecting to SQL Azure and uploading or downloading data is NOT free. In the following part I will demonstrate how to use SSDT to develop and upgrade a SQL Azure database.
Create SQL Azure Database and Schema in SSDT
SSDT integrates with SQL Azure very well. Just open Visual Studio and the SQL Server Object Explorer window, right click on the root node and click the Add SQL Server item.

Specify the SQL Azure connection information in the pop-up window.

Then it will appear in the SQL Server Object Explorer. You can create a new SQL Azure database directly in this window. Just expand the server and its Databases node, then right click and select Add New Database. Type the name of the database, and after several seconds it will be created on your SQL Azure server.

SSDT will create a 1GB Web edition SQL Azure database. If you want to create a database with a different configuration you need to write the CREATE DATABASE command manually, or create it through the development portal.
Expand the new database and you can see the tables, views, programmability, etc. in the list. Now let’s create a new table. Right click the Tables node and select Add New Table.

As you can see, the table designer appears! If you have worked with SQL Azure for a while you will know that, until now, the local SSMS offered no table designer for it; the only designer was the web-based SQL Azure database manager, originally named “Project Houston”. In SSDT you can use the designer to create and alter your tables. Even better, when you change the table schema in either the designer or the script panel, the other one is synchronized and refreshed automatically. Let’s add two columns through the designer and rename the table in the script panel.

Then we can click the Update button at the top of the designer to apply the changes to the database. In this case, they will be run against SQL Azure.
SSDT will compare the schema of the SQL Azure database with our script and generate the update script for us. In this case, since the SQL Azure database is empty, it tells us it will just create a new table. If our changes were larger, SSDT would generate more steps to upgrade the schema.

We can retrieve the update script by clicking the Generate Script button, or we can ask SSDT to execute the script for us. Let’s click the Update Database button to apply the changes. While SSDT is running the script, the steps are shown in the Data Tools Operations window in Visual Studio. You can see that my script executed successfully.

Back in the SQL Server Object Explorer window, the new table is now shown. We can view its data through the View Data item in the context menu, even though at this moment there is no data available.

Database Project and Local Database with SQL Azure
Assume I’m a developer who is going to work with the database I have just created. Let’s have a look at how to use the SSDT database project and the local database feature to make that easier and more effective.
There are several reasons why developing directly against the SQL Azure database is not a good approach. The first is cost. Since all transactions and outbound data are billed, I don’t want to connect to SQL Azure frequently during development and testing. The second is performance. Connecting to SQL Azure is of course slower than connecting to a local database, and my boss doesn’t want me to waste time waiting for SQL Azure to respond. The third is that, if I have colleagues working on the same SQL Azure database, we might affect each other by changing the schema or adding and removing data. So what I want is to download the SQL Azure database to my local machine, update the schema and data based on my business needs, develop and test, and finally push my database changes back to SQL Azure and my code to TFS.
To download the SQL Azure database I will create a new SQL Server Database project, then right click on the project node in the Solution Explorer and select Import, Database.

SSDT can import a database schema from a Data-tier Application package, a live database or a database creation script.
In the pop-up window I selected the database I had created on SQL Azure and clicked the Start button.

SSDT will connect to the SQL Azure database, grab the schema information, generate the creation scripts for me and add them to the database project.

SSDT will only download the schema from the source database. It will NOT download any data.
In the Solution Explorer there will be some scripts listed under the related folders. In this example, since I only have one table, there’s only one script under the Tables folder. Also, in the SQL Server Object Explorer window there’s a new server node named (localdb)\Database 2 (SQL Server …). This is the local database SSDT created for me.

You can find the local database files (MDF and LDF) in the folder named Sandbox under your database project folder.
But currently there’s no table in the local database. We need to run the database project to let it generate the database content for us. Select the local database node in the SQL Server Object Explorer and press F5. In the output we can see that the project has been deployed to the local database, and the table is shown as well.

Now the schema of this database is exactly the same as on SQL Azure, and I can develop against it rather than connecting to SQL Azure. Select the database node to find the connection string.

I won’t demonstrate how to use the local database through C# and ADO.NET; it’s exactly the same as what we do with SQL Server every day. I will focus on the database part in this post.
Assume we need a lookup table named Country, a new column in the Product table named CountryID, and a foreign key between them. We can add and modify these on our local database. This is very quick and will not affect other developers who work with the SQL Azure database. In the Solution Explorer right click on the Tables folder and add a new Table item.

In the designer, add a column named Name and save it. Now there will be two scripts under the Tables folder.

Then double click the Product.sql file to open its design window. Let’s append a new column named CountryID and add a new foreign key from the design panel as well.

Enter the foreign key name; we then need to manually specify the columns associated with it in the script panel at the bottom.

Finally, to apply our change to the local database just run the database project (press F5). The output window tells us the deployment succeeded, and the SQL Server Object Explorer shows the new table and key as well.

Now let’s assume I have finished the development and testing and I want to push my database changes to SQL Azure. In a database project this can be done by right clicking the project and selecting Publish. In the publish window I selected the target database connection information and clicked the Publish button. SSDT will compare the schema of the SQL Azure database with my local database, generate the update script and run it.

You can check “Add profile to project” so that your publish settings are saved for future use. Clicking “Generate Script” makes SSDT generate the update script for us without executing it.
After a while the SQL Azure database will be changed based on what we have done on the local database.

Target Platform Validation and Schema Compare
Besides the designer and local database, there are some other features that can help us with SQL Azure development. The first one is target platform validation.
As we know, even though the underlying database engine is the same in SQL Server and SQL Azure, SQL Azure has some limitations. When we create or change a SQL Azure schema it’s very hard to remember all of them, and forgetting one can cause a service failure. SSDT now provides target platform validation, which means it can check our schema definition scripts against the database platform we specify. To demonstrate this feature let’s create another table named Area with two columns: ID and Area. But let’s remove the primary key of this table. This kind of table is sometimes called a “heap”, which is supported by SQL Server but not by SQL Azure.

Right click the database project and click Build. This performs the target platform validation. By default our database project uses SQL Server 2012 as its platform, so the Area table is OK. Next, let’s click the Properties menu item of the database project and change the target platform to SQL Azure.

Then build again and we will get an error saying that SQL Azure requires a clustered index on the table. So when we work with SQL Azure through SSDT we can let it check the schema for us before publishing to the cloud.
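The rule behind that build error can be pictured as a simple check over the table definitions. The sketch below is only my own illustration: TargetPlatform, TableDefinition and PlatformValidationSketch are hypothetical names, and the real validation in SSDT covers far more than this single rule.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical types for illustration; these are not SSDT APIs.
public enum TargetPlatform { SqlServer2012, SqlAzure }

public sealed class TableDefinition
{
    public TableDefinition(string name, bool hasClusteredIndex)
    {
        Name = name;
        HasClusteredIndex = hasClusteredIndex;
    }

    public string Name { get; private set; }
    public bool HasClusteredIndex { get; private set; }
}

public static class PlatformValidationSketch
{
    // Returns one error per table that breaks the "no heaps on SQL Azure" rule.
    // On SQL Server 2012 a heap is allowed, so nothing is reported.
    public static List<string> Validate(IEnumerable<TableDefinition> tables, TargetPlatform platform)
    {
        var errors = new List<string>();
        foreach (var table in tables)
        {
            if (platform == TargetPlatform.SqlAzure && !table.HasClusteredIndex)
            {
                errors.Add(string.Format("Table {0} has no clustered index.", table.Name));
            }
        }
        return errors;
    }
}
```

Validating the Area table (a heap) against SqlServer2012 returns no errors, while the same table against SqlAzure returns one, which mirrors what we saw when switching the target platform and rebuilding.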

Another cool feature is schema compare. In fact, whenever we perform an update in SSDT it invokes this feature to generate the update script. But we can also invoke it manually by clicking Schema Compare in the context menu of the database project and selecting the target database we want to compare with our database project. It will then show the differences by tables, views, etc.

Summary
SQL Server Data Tools (SSDT) is not a new product, but it was improved a lot and released along with the new SQL Server 2012. SSDT solves the problem of how to define, track and update the database by introducing the statement-oriented script principle. Thanks to its powerful compare engine and script generator, developers can focus on what the database should be instead of how to upgrade it.
SSDT also provides the local database and database project features. Working with the script update engine, we can easily download the database schema to the local development machine and amend it there without affecting the central database. The developer can then publish his/her changes back to the central database very easily. This is even more useful when working with SQL Azure.
SSDT also makes it possible to track the history of database changes in a source control service such as TFS by checking the scripts into the repository. Since the project only contains the schema definition, it is very easy to find out who changed what in the schema, and when.
Hope this helps,
Shaun
All documents and related graphics, codes are provided "AS IS" without warranty of any kind.
Copyright © Shaun Ziyan Xu. This work is licensed under the Creative Commons License.
This morning I received an email from Microsoft saying that an interesting company, Microsoft Open Technologies Inc., had just released its first pre-production build: Redis on Windows.
Redis
If you have been working in the Linux world or are interested in the NoSQL field you have probably known or heard about Redis. Redis is an open source, advanced key-value store. Some people categorize Redis as a distributed cache, since it’s an in-memory key-value store. Some categorize it as NoSQL, since it can save your data to disk. And some categorize it as a distributed queue, since it supports storing your data in hash and list types and provides enqueue, dequeue and pub/sub functionality. You see, Redis is a very powerful tool that can be used widely, especially when building a large-scale system, as a distributed cache, a NoSQL store and a message queue.
The original Redis is written in ANSI C, is sponsored by VMware, and works on most POSIX systems such as Linux, *BSD and OS X without external dependencies. Linux and OS X are the two operating systems where Redis is developed and most tested, and Linux is recommended for deployment. As you can see there’s no Windows in the list. Microsoft submitted a patch to the Redis repository to make it available on Windows, but unfortunately Redis doesn’t have any plan to merge it into the main branch. This means all Windows versions of Redis are unofficial, including this one.
Unlike a basic in-memory key-value store such as Memcached, Redis not only stores objects by key, it can also store your data in various types such as strings, hashes, lists, sets and sorted sets. Redis also has atomic operations on these types, like appending to a string; incrementing the value in a hash; pushing to a list; computing set intersection, union and difference; or getting the member with the highest ranking in a sorted set.
In order to achieve its outstanding performance, Redis works with an in-memory data store. But depending on the use cases, we can persist it either by dumping the dataset to disk every once in a while, or by appending each command to a log. So it looks like a NoSQL database as well. Other features include a simple check-and-set mechanism, pub/sub and configuration settings to make Redis behave like a cache.
Download, Build and Install Redis on Windows
Redis on Windows is now available on the GitHub site of Microsoft Open Technologies Inc. You can download the full source here, which includes the Redis Service, Redis Client, Redis Benchmark, etc.
On Windows the simplest way to build Redis is to use Visual Studio. If you have installed the C++ component then you can open its solution file under the “msvs” folder and just build it.

Navigate to the “msvs\debug” folder and you will find a lot of files there. I don’t have any experience with C and C++, but I can still find something useful. The two most important files are redis-server.exe and redis-cli.exe.
| File | Description |
| --- | --- |
| redis-server.exe | The executable that starts a Redis server process on your machine. |
| redis-cli.exe | The command-line client tool that operates against a Redis server. |
The simplest way to start a Redis server is to open a command window, go to this folder and execute redis-server.exe. You can then see that Redis is running; by default it prints its status on the screen every 5 seconds.

Very easy, right? If you want to check whether Redis is working, just execute redis-cli.exe in another command window and enter the Redis command “ping”. If the server you are connecting to is fine it will respond with “PONG”.

If you have used the Linux version of Redis before, you will find that the executables, output log and commands are exactly the same. This means any tools and clients you are familiar with from the original Redis can be used with this new Windows version. For example, let’s put a key-value pair into the Redis store through the client.

I added two key-value pairs to my Redis by using the SET command, and then listed all keys on the server, which printed the result on the client screen. Then I fetched one of them by specifying its key. This is a very common and simple usage. You can find the full command list on this page.
Redis and C#
The Redis on Windows product ships just the Windows version of the Redis server, without a connection client for .NET languages. But this is not a problem since, as I mentioned above, Redis on Windows follows all behaviors and interfaces of the original. This means we can use any connection client that works with the original Redis against this Windows version.
There are many clients available on this page, including C, C++, Java, PHP, Ruby, Python, etc. And of course, C#. I used ServiceStack.Redis as the connection client in one of my previous projects. It worked well with my Redis running on Ubuntu, so it should work with Redis on Windows as well.
You can download ServiceStack.Redis here, build it and add references to ServiceStack.Interfaces and ServiceStack.Redis to the C# project.
When using ServiceStack.Redis we need to instantiate its RedisClient class by specifying the Redis server name (or IP) and port. Then you can execute Redis commands through the various methods of the RedisClient class. Let’s first try the list feature of Redis, which works like a queue.
First of all, I will invoke the FlushAll method to clear all keys in my Redis. This removes all items stored on the server.
    using (var redis = new RedisClient("127.0.0.1"))
    {
        redis.FlushAll();
    }
Then I will start a thread that lets the user input something and appends it to one of the lists (queues) in my Redis. The name of this list is “default”.
    var senderThread = new Thread(() =>
    {
        using (var redis = new RedisClient("127.0.0.1"))
        {
            Console.WriteLine("Input message: ");
            var message = Console.ReadLine();
            // an empty line stops the sender
            while (!string.IsNullOrWhiteSpace(message))
            {
                redis.EnqueueItemOnList("default", message);

                Console.WriteLine("Input message: ");
                message = Console.ReadLine();
            }
        }
    });
    senderThread.Start();
Next, in the main thread the application will try to dequeue items from the list and print each value to the console.
    using (var redis = new RedisClient("127.0.0.1"))
    {
        var result = string.Empty;
        // keep dequeuing until the message "q" arrives
        while (string.Compare("q", result, false) != 0)
        {
            result = redis.BlockingDequeueItemFromList("default", TimeSpan.FromSeconds(5));
            Console.WriteLine("DEQUEUE RESULT = [{0}]", result);
        }
    }
The full code would be like this.
    static void Main(string[] args)
    {
        // clean up
        using (var redis = new RedisClient("127.0.0.1"))
        {
            redis.FlushAll();
        }

        // enqueue thread definition
        var senderThread = new Thread(() =>
        {
            using (var redis = new RedisClient("127.0.0.1"))
            {
                Console.WriteLine("Input message: ");
                var message = Console.ReadLine();
                while (!string.IsNullOrWhiteSpace(message))
                {
                    redis.EnqueueItemOnList("default", message);

                    Console.WriteLine("Input message: ");
                    message = Console.ReadLine();
                }
            }
        });
        senderThread.Start();

        // dequeue operation
        using (var redis = new RedisClient("127.0.0.1"))
        {
            var result = string.Empty;
            while (string.Compare("q", result, false) != 0)
            {
                result = redis.BlockingDequeueItemFromList("default", TimeSpan.FromSeconds(5));
                Console.WriteLine("DEQUEUE RESULT = [{0}]", result);
            }
        }

        Console.WriteLine("Done!");
        Console.ReadKey();
    }
As you can see, after I input some strings they are appended to the list and then dequeued from it at once.

In this example we only have one thread dequeuing items. If more than one client is dequeuing from the same list, only one of them will retrieve each item. In message bus terminology this is named a “queue”, which means only one consumer can retrieve an item. If there’s no consumer available, the items stay in the queue until a consumer connects and dequeues them.
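The "only one consumer gets each item" behavior can be tried without a Redis server. The sketch below is my own in-memory stand-in that uses BlockingCollection from the .NET base class library in place of the Redis list; QueueSemanticsSketch and Run are hypothetical names, and it says nothing about how Redis implements its lists internally.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// In-memory stand-in for a queue with competing consumers (not Redis itself).
public static class QueueSemanticsSketch
{
    // Starts the consumers, enqueues all messages, waits until the queue is
    // drained and returns (consumer id, message) pairs for everything received.
    public static ConcurrentBag<KeyValuePair<int, string>> Run(IEnumerable<string> messages, int consumerCount)
    {
        var received = new ConcurrentBag<KeyValuePair<int, string>>();

        using (var queue = new BlockingCollection<string>())
        {
            // Every consumer blocks on the same queue, like several clients
            // calling a blocking dequeue on the same list.
            var consumers = Enumerable.Range(1, consumerCount)
                .Select(id => Task.Run(() =>
                {
                    foreach (var message in queue.GetConsumingEnumerable())
                    {
                        received.Add(new KeyValuePair<int, string>(id, message));
                    }
                }))
                .ToArray();

            foreach (var message in messages)
            {
                queue.Add(message);
            }

            // No more items will arrive; consumers exit once the queue is empty.
            queue.CompleteAdding();
            Task.WaitAll(consumers);
        }

        return received;
    }
}
```

Calling Run with three messages and three consumers always yields exactly three pairs: each message appears once, although which consumer picked it up varies from run to run.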

If it’s a reliable queue, the items remain in the queue even if the message bus is terminated or the machine crashes.
Next, let’s try the pub/sub mode, also known as the “topic” mode in the message bus field. Unlike the “queue” mode I mentioned above, in “topic” mode a message is received by all consumers subscribed to the topic. (Redis calls it a “channel” rather than a “topic”.) But if some consumers are not available at that moment, the message is lost for them and will never be delivered, even if they come back later.
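The two properties of the “topic” mode, fan-out to every current subscriber and no storage for absent ones, fit in a few lines. This is again an in-memory sketch of my own (ChannelSketch is a hypothetical name, and it is not thread safe), meant only to show the semantics rather than Redis itself.

```csharp
using System;
using System.Collections.Generic;

// In-memory stand-in for a pub/sub channel (not Redis itself, not thread safe).
public sealed class ChannelSketch
{
    private readonly List<Action<string>> _subscribers = new List<Action<string>>();

    public void Subscribe(Action<string> onMessage)
    {
        _subscribers.Add(onMessage);
    }

    // Delivers the message to everyone subscribed right now and returns how
    // many subscribers received it. Nothing is stored, so a subscriber that
    // joins later never sees earlier messages.
    public int Publish(string message)
    {
        var snapshot = _subscribers.ToArray();
        foreach (var subscriber in snapshot)
        {
            subscriber(message);
        }
        return snapshot.Length;
    }
}
```

Publishing before anyone subscribes returns 0 and the message is gone; after two subscriptions, one Publish call reaches both callbacks. The return value mirrors the Redis PUBLISH command, which also replies with the number of clients that received the message.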

To use Pub/Sub in Redis from C# as a consumer, we again create a RedisClient instance and use its CreateSubscription method to bind to a channel. When a message arrives the subscription invokes the OnMessage callback, where we can run our own business logic.
    static void ConsumerAction(object name)
    {
        using (var consumer = new RedisClient("127.0.0.1"))
        {
            using (var subscription = consumer.CreateSubscription())
            {
                subscription.OnSubscribe = (channel) =>
                {
                    Console.WriteLine("[{0}] Subscribe to channel '{1}'.", name, channel);
                };
                subscription.OnUnSubscribe = (channel) =>
                {
                    Console.WriteLine("[{0}] Unsubscribe to channel '{1}'.", name, channel);
                };
                subscription.OnMessage = (channel, message) =>
                {
                    Console.WriteLine("[{0}] Received message '{1}' from channel '{2}'.", name, message, channel);
                };

                // blocks this thread while it listens on the channel
                subscription.SubscribeToChannels("default");
            }
        }
    }
To demonstrate the behavior of Pub/Sub, let’s create three threads as three consumers that subscribe to the same channel.
    var consumerThread1 = new Thread(new ParameterizedThreadStart(ConsumerAction));
    var consumerThread2 = new Thread(new ParameterizedThreadStart(ConsumerAction));
    var consumerThread3 = new Thread(new ParameterizedThreadStart(ConsumerAction));
    consumerThread1.Start("Consumer 1");
    consumerThread2.Start("Consumer 2");
    consumerThread3.Start("Consumer 3");
Then, in the main thread, we use the PublishMessage method of the RedisClient to send some messages to the channel. The full code would be like this.
    static void ConsumerAction(object name)
    {
        using (var consumer = new RedisClient("127.0.0.1"))
        {
            using (var subscription = consumer.CreateSubscription())
            {
                subscription.OnSubscribe = (channel) =>
                {
                    Console.WriteLine("[{0}] Subscribe to channel '{1}'.", name, channel);
                };
                subscription.OnUnSubscribe = (channel) =>
                {
                    Console.WriteLine("[{0}] Unsubscribe to channel '{1}'.", name, channel);
                };
                subscription.OnMessage = (channel, message) =>
                {
                    Console.WriteLine("[{0}] Received message '{1}' from channel '{2}'.", name, message, channel);
                };

                subscription.SubscribeToChannels("default");
            }
        }
    }

    static void Main(string[] args)
    {
        using (var redis = new RedisClient("127.0.0.1"))
        {
            redis.FlushAll();
        }

        var consumerThread1 = new Thread(new ParameterizedThreadStart(ConsumerAction));
        var consumerThread2 = new Thread(new ParameterizedThreadStart(ConsumerAction));
        var consumerThread3 = new Thread(new ParameterizedThreadStart(ConsumerAction));
        consumerThread1.Start("Consumer 1");
        consumerThread2.Start("Consumer 2");
        consumerThread3.Start("Consumer 3");

        using (var publisher = new RedisClient("127.0.0.1"))
        {
            Console.WriteLine("Input message: ");
            var message = Console.ReadLine();
            while (!string.IsNullOrWhiteSpace(message))
            {
                publisher.PublishMessage("default", message);
                Console.WriteLine("Input message: ");
                message = Console.ReadLine();
            }
        }

        Console.WriteLine("Done!");
        Console.ReadKey();
    }
Then let’s run our application and input some messages. You can see that in this sample all three of our consumers receive the messages and run their own business logic.

Summary
A distributed cache, a distributed data store and a distributed message queue are very important components when we build a large system with high scalability and high performance. Redis is one of the most powerful products in this area. It can be used as a cache, a NoSQL database and a message queue, and its performance is outstanding. But for developers working on the Windows platform it’s a bit of a pity that Redis doesn’t support Windows by default.
In this post I passed on the announcement that Microsoft Open Technologies has just published its first non-commercial Redis on Windows version. With it we can deploy the Redis server on Windows, which means there is no need to learn how to build and run it on Linux.
I also demonstrated how to use it from C# through the ServiceStack.Redis client library. Since the Windows version of Redis follows the same APIs as the original, we can use anything we are already familiar with.
This also ties in with my previous posts about using the WCF transport extension to build a highly scalable system on top of a message bus: since Redis supports the list data type, which can implement the message bus feature, we can use it as our underlying transport as well.
Hope this helps,
Shaun
We are almost done with everything about the WCF transport extension over the message bus, which lets our services scale out by adding more instances across machines and servers. We finished the structure of our transport extension and implemented the request-reply mode in the 2nd post, and the datagram and duplex modes in the 4th and 5th posts. As I said at the end of the 5th post, our transport extension is already usable. But there is still something left. Although the remaining pieces are not as major as the three MEP implementations, sometimes they are very useful. In this post I will cover the first of them: session.
Session in ASP.NET and WCF
If you have experience developing ASP.NET applications you should be very familiar with the session. Session is a very important feature of the ASP.NET runtime, and all frameworks built on top of it can use it, such as ASP.NET WebForms, ASP.NET MVC and ASP.NET Dynamic Data.
In ASP.NET, when the client sends its first request to the server, a session ID is generated on the server side. By default, a dictionary item is created in the web application process (w3wp.exe) and associated with this session ID. The session ID is then returned to the client and, by default, stored in a cookie. After that, every request the client sends to the server carries the session ID along with it, so that the server knows which client the request came from.
In this way, the ASP.NET runtime makes it possible for the server to keep some information per client. When we develop an ASP.NET website we can save and retrieve this information through the SessionState. The ASP.NET runtime finds our session in the dictionary in server memory by its session ID.
What I described above assumes the InProc session state mode. If we use the session state server or SQL Server, the dictionary and the session object may not be located in the web server’s memory.
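The in-process mechanism described above boils down to a dictionary keyed by session ID. The following is a minimal sketch of that idea only: InProcSessionStoreSketch is a hypothetical class of my own, not the ASP.NET implementation, and it leaves out cookies, expiry and locking.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical sketch of an in-process session store; not ASP.NET code.
public sealed class InProcSessionStoreSketch
{
    private readonly ConcurrentDictionary<string, Dictionary<string, object>> _sessions =
        new ConcurrentDictionary<string, Dictionary<string, object>>();

    // First request: the server generates the ID and creates the dictionary
    // item; the ID would then be sent back to the client (in a cookie by default).
    public string CreateSession()
    {
        var sessionId = Guid.NewGuid().ToString("N");
        _sessions[sessionId] = new Dictionary<string, object>();
        return sessionId;
    }

    // Later requests: the ID supplied by the client is used to look up that
    // client's state. Returns null when the ID is unknown.
    public IDictionary<string, object> GetSession(string sessionId)
    {
        Dictionary<string, object> state;
        return _sessions.TryGetValue(sessionId, out state) ? state : null;
    }
}
```

Two clients get two different IDs, so a value stored under one ID is invisible to the other. Moving the dictionary out of the process, as the state server or SQL Server modes do, changes where the data lives but not this lookup-by-ID idea.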

So we can summarize the main characteristics of the ASP.NET session:
- An ASP.NET session is always initialized by the server.
- An ASP.NET session solves how to keep the relationship between requests from the same client.
- ASP.NET provides a general way to store the session data, but does not force us to use it.
Next, let’s have a look at the WCF session. The WCF session is more general than the ASP.NET session. In WCF, a session just means a correlation between messages. It doesn’t care how the identity is passed between the server and the client, and it doesn’t care how the session is stored at all. As the developers of a transport extension we need to figure out how to pass the session ID through the transport.
MSDN explains the WCF session as follows:
- They are explicitly initiated and terminated by the calling application.
- Messages delivered during a session are processed in the order in which they are received.
- Sessions correlate a group of messages into a conversation. The meaning of that correlation is an abstraction. For instance, one session-based channel may correlate messages based on a shared network connection while another session-based channel may correlate messages based on a shared tag in the message body. The features that can be derived from the session depend on the nature of the correlation.
- There is no general data store associated with a WCF session.
In WCF, a session is established when the client creates the channel and is terminated once the client channel is closed. Based on the service contract definition, the client asks the server whether it needs a sessionful channel. If the transport supports sessions, it creates the session and associates a session ID with all messages until the channel is closed. As for how to store the data shared between the messages in a session, WCF doesn’t have any principle or guideline. By default, WCF ties the session to the service instance lifecycle, so that the developer can use the service class’s own member variables to store intermediate data. But this is not mandatory. In the following sections you will see what we do to make sessions work under our scale-out architecture.
Apply the Default WCF Session Behavior
By default WCF combines the service instance mode and the session to make sessions easy for developers to use. There are three session modes that can be specified in the service contract:
- Required: This service requires a session.
- Allowed: This service allows a session to be established, but does not require one.
- NotAllowed: This service doesn’t allow sessions.
And there are three service instance context modes, which are responsible for managing the service instance lifecycle:
- PerCall: Each client request makes the service create a new instance to handle it.
- PerSession: A service instance is created once a session is established. All requests in this session are handled by this instance.
- Single: The service instance is created once the service is opened. All client requests are handled by this instance.
In MSDN there is a very good table that describes the relationship between the session, service instance mode and the default WCF session behavior.
WCF uses a binding element to implement the default session behavior. This binding element looks at the current channel type and, if it’s a sessionful channel, creates some special channels on top of the underlying transport channels to handle the session. For example, if it finds that we are going to use a session over the datagram mode, it will create a ReliableOutputSessionChannelOverRequest on top of our own OutputChannel. It takes responsibility for creating the session, maintaining the message order, etc.
To use this built-in session binding element we can just add the ReliableSessionBindingElement to the binding element collection in our binding class. In the MessageBusTransportBinding class I added a field that holds the ReliableSessionBindingElement, along with two constructors.
    public class MessageBusTransportBinding : Binding
    {
        private readonly MessageEncodingBindingElement _messageElement;
        private readonly MessageBusTransportBindingElement _transportElement;
        private readonly ReliableSessionBindingElement _sessionElement;

        public MessageBusTransportBinding(IBus bus)
            : this(bus, SessionfulMode.Distributed)
        {
        }

        public MessageBusTransportBinding(IBus bus, SessionfulMode sessionfulMode)
            : base()
        {
            _messageElement = new TextMessageEncodingBindingElement();
            _transportElement = new MessageBusTransportBindingElement(bus);
            // only the standard mode uses the built-in WCF reliable session
            if (sessionfulMode == SessionfulMode.Standard)
            {
                _sessionElement = new ReliableSessionBindingElement();
            }
        }

        ... ...

    }
And when creating the binding elements we add the session element only if it was requested.
    public override BindingElementCollection CreateBindingElements()
    {
        var elements = new BindingElementCollection();
        elements.Add(_messageElement);
        if (_sessionElement != null)
        {
            elements.Add(_sessionElement);
        }
        // the transport binding element must be the last one
        elements.Add(_transportElement);
        return elements.Clone();
    }
And that’s all. Simple? OK, let’s test our sessionful transport extension. Update our service contract and the implementation class. Here I defined the service contract to require a session and set the service instance mode to per session.
    [ServiceContract(Namespace = "http://wcf.shaunxu.me/", SessionMode = SessionMode.Required)]
    public interface ISampleService
    {
        [OperationContract(IsOneWay = true)]
        void Add(int value);

        [OperationContract(IsOneWay = false)]
        int GetResult();
    }

    [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerSession)]
    public class SampleService : ISampleService
    {
        // intermediate data kept in the instance, one instance per session
        private int _current;

        public void Add(int value)
        {
            _current += value;
        }

        public int GetResult()
        {
            return _current;
        }
    }
I added a helper method to establish a client proxy.
    static TChannel EstablishClientProxy<TChannel>(IBus bus, string address, SessionfulMode sessionfulMode)
    {
        var binding = new MessageBusTransportBinding(bus, sessionfulMode);
        var factory = new ChannelFactory<TChannel>(binding, address);
        factory.Opened += (sender, e) =>
        {
            Console.WriteLine("Client connected to {0}", factory.Endpoint.ListenUri);
        };
        var proxy = factory.CreateChannel();
        return proxy;
    }
Then we host our service on a message bus and have two clients invoke it.
    static void Main(string[] args)
    {
        var bus = new InProcMessageBus();
        var address = "net.bus://localhost/sample";

        // establish the services
        var host1 = EstablishServiceHost<ISampleService, SampleService>(bus, address, SessionfulMode.Standard);

        // establish the client
        var client1 = EstablishClientProxy<ISampleService>(bus, address, SessionfulMode.Standard);
        var client2 = EstablishClientProxy<ISampleService>(bus, address, SessionfulMode.Standard);
        using (client1 as IDisposable)
        using (client2 as IDisposable)
        {
            client1.Add(1);
            client2.Add(4);

            client1.Add(3);
            client2.Add(5);

            client1.Add(2);
            client2.Add(6);

            var result1 = client1.GetResult();
            var result2 = client2.GetResult();
            Console.WriteLine("Client 1 Result: {0}", result1);
            Console.WriteLine("Client 2 Result: {0}", result2);
        }

        // close the service
        host1.Close();

        Console.ReadKey();
    }
The execution result was like this. Since our service instance mode is per session, all requests in the same session use the same service instance. This is why we can use a field of the service class to store the intermediate data.

But if we add more service instances, the client invocations will fail. This is because, under the default ReliableSessionBindingElement, WCF needs to track the request sequence, and messages in one session must not be received by another service instance.

Hence, in order to support scaling out the service we must implement our own sessionful channels.
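To get a feeling for why sequence tracking and multiple service instances don’t mix, here is a much simplified model. It is not how ReliableSessionBindingElement is implemented; SequenceReceiver and the round-robin delivery are hypothetical names made up for illustration. Each receiver only accepts the message number it expects next, the way an ordered session would.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical receiver that only accepts messages in sequence order.
class SequenceReceiver
{
    private int _nextExpected = 1;

    // Returns true when the message number is the one expected next.
    public bool TryAccept(int messageNumber)
    {
        if (messageNumber != _nextExpected)
        {
            return false;
        }
        _nextExpected++;
        return true;
    }
}

static class SequenceDemo
{
    // Distributes message numbers 1..count over the receivers in turn and
    // returns the numbers that were rejected as out of sequence.
    public static List<int> Deliver(int receiverCount, int count)
    {
        var receivers = new SequenceReceiver[receiverCount];
        for (var i = 0; i < receiverCount; i++)
        {
            receivers[i] = new SequenceReceiver();
        }
        var rejected = new List<int>();
        for (var number = 1; number <= count; number++)
        {
            var receiver = receivers[(number - 1) % receiverCount];
            if (!receiver.TryAccept(number))
            {
                rejected.Add(number);
            }
        }
        return rejected;
    }

    static void Main()
    {
        Console.WriteLine("1 receiver rejected: " + Deliver(1, 4).Count);
        Console.WriteLine("2 receivers rejected: " + Deliver(2, 4).Count);
    }
}
```

With one receiver all four messages are accepted. With two receivers, the second one never sees message 1, so everything after the first message is rejected.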
Message Bus with Session ID
Before we implement the sessionful channels, we need to send the current session ID between the service and the client through the message bus. So we need to amend the message structure.
    public class BusMessage
    {
        public string MessageID { get; private set; }
        public string SessionID { get; private set; }
        public string From { get; private set; }
        public string ReplyTo { get; private set; }
        public string Content { get; private set; }

        public BusMessage(string messageId, string sessionId, string fromChannelId, string replyToChannelId, string content)
        {
            MessageID = messageId;
            SessionID = sessionId;
            From = fromChannelId;
            ReplyTo = replyToChannelId;
            Content = content;
        }
    }
The IBus interface also changes so that it allows us to send the session ID.
    public interface IBus : IDisposable
    {
        string SendRequest(string message, string sessionId, bool fromClient, string from, string to = null);

        void SendReply(string message, string sessionId, bool fromClient, string replyTo);

        BusMessage Receive(bool fromClient, string replyTo);
    }
And so do the underlying message entity and the in-process message bus implementation.
    public class InProcMessageEntity
    {
        public Guid ID { get; set; }
        public string SessionID { get; set; }
        public string Content { get; set; }
        public bool FromClient { get; set; }
        public string From { get; set; }
        public string To { get; set; }

        public InProcMessageEntity()
            : this(string.Empty, string.Empty, false, string.Empty, string.Empty)
        {
        }

        public InProcMessageEntity(string content, string sessionId, bool fromClient, string from, string to)
        {
            ID = Guid.NewGuid();
            SessionID = sessionId;
            Content = content;
            FromClient = fromClient;
            From = from;
            To = to;
        }
    }

    public class InProcMessageBus : IBus
    {
        private readonly ConcurrentDictionary<Guid, InProcMessageEntity> _queue;
        private readonly object _lock;

        public InProcMessageBus()
        {
            _queue = new ConcurrentDictionary<Guid, InProcMessageEntity>();
            _lock = new object();
        }

        public string SendRequest(string message, string sessionId, bool fromClient, string from, string to = null)
        {
            var entity = new InProcMessageEntity(message, sessionId, fromClient, from, to);
            _queue.TryAdd(entity.ID, entity);
            return entity.ID.ToString();
        }

        public void SendReply(string message, string sessionId, bool fromClient, string replyTo)
        {
            var entity = new InProcMessageEntity(message, sessionId, fromClient, null, replyTo);
            _queue.TryAdd(entity.ID, entity);
        }

        public BusMessage Receive(bool fromClient, string replyTo)
        {
            InProcMessageEntity e = null;
            // poll until a message for this receiver shows up
            while (true)
            {
                lock (_lock)
                {
                    // pick a message from the right side that is addressed to us or to nobody in particular
                    var entity = _queue
                        .Where(kvp => kvp.Value.FromClient == fromClient && (kvp.Value.To == replyTo || string.IsNullOrWhiteSpace(kvp.Value.To)))
                        .FirstOrDefault();
                    if (entity.Key != Guid.Empty && entity.Value != null)
                    {
                        _queue.TryRemove(entity.Key, out e);
                    }
                }
                if (e == null)
                {
                    Thread.Sleep(100);
                }
                else
                {
                    return new BusMessage(e.ID.ToString(), e.SessionID, e.From, e.To, e.Content);
                }
            }
        }

        public void Dispose()
        {
        }
    }
As we have changed the message bus interface, some existing code needs to be changed. You can download the final code at the end of this post.
Sessionful Channels: Request Reply Mode
In WCF, if we want to implement our own sessionful channels, we need to implement the channel classes for each MEP (message exchange pattern). Each session channel has a property that returns the session object. And WCF has an interface that defines the session, which is ISession.
    namespace System.ServiceModel.Channels
    {
        // Summary:
        //     Defines the interface to establish a shared context among parties that exchange
        //     messages by providing an ID for the communication session.
        public interface ISession
        {
            // Summary:
            //     Gets the ID that uniquely identifies the session.
            //
            // Returns:
            //     The ID that uniquely identifies the session.
            string Id { get; }
        }
    }
As you can see, the ISession interface only defines a property that returns the session ID. This means, as I said before, that WCF doesn’t care how the session data is stored or how the sessionful communication is established.
And for the MEPs there are three sub session interfaces in WCF: IOutputSession, IInputSession and IDuplexSession. The first two interfaces are used in the sessionful datagram and request reply modes; IDuplexSession is used in the sessionful duplex mode.
    // Summary:
    //     Defines the interface for the session implemented on the sending side of
    //     a one-way communication between messaging endpoints.
    public interface IOutputSession : ISession
    {
    }

    // Summary:
    //     Defines the interface for the session implemented on the receiving side of
    //     a one-way communication between messaging endpoints.
    public interface IInputSession : ISession
    {
    }

    // Summary:
    //     Defines the interface for the session implemented on each side of a bi-directional
    //     communication between messaging endpoints.
    public interface IDuplexSession : IInputSession, IOutputSession, ISession
    {
        // Summary:
        //     Begins an asynchronous operation to terminate the outbound session.
        //
        // Parameters:
        //   callback:
        //     The System.AsyncCallback delegate.
        //
        //   state:
        //     An object that contains state information for this request.
        //
        // Returns:
        //     The System.IAsyncResult that references the asynchronous outbound session
        //     termination.
        IAsyncResult BeginCloseOutputSession(AsyncCallback callback, object state);
        //
        // Summary:
        //     Begins an asynchronous operation to terminate the outbound session with a
        //     specified timeout within which the operation must complete.
        //
        // Parameters:
        //   timeout:
        //     The System.TimeSpan that specifies the interval of time within which the
        //     operation must complete.
        //
        //   callback:
        //     The System.AsyncCallback delegate.
        //
        //   state:
        //     An object that contains state information for this request.
        //
        // Returns:
        //     The System.IAsyncResult that references the asynchronous outbound session
        //     termination.
        IAsyncResult BeginCloseOutputSession(TimeSpan timeout, AsyncCallback callback, object state);
        //
        // Summary:
        //     Terminates the outbound session that indicates that no more messages will
        //     be sent from this endpoint on the channel associated with the session.
        void CloseOutputSession();
        //
        // Summary:
        //     Terminates the outbound session that indicates that no more messages will
        //     be sent from this endpoint on the channel associated with the session within
        //     a specified interval of time.
        //
        // Parameters:
        //   timeout:
        //     The System.TimeSpan that specifies the interval of time within which the
        //     operation must complete.
        void CloseOutputSession(TimeSpan timeout);
        //
        // Summary:
        //     Completes an asynchronous operation to terminate the outbound session that
        //     indicates that no more messages will be sent from this endpoint on the channel
        //     associated with the session.
        //
        // Parameters:
        //   result:
        //     The System.IAsyncResult returned by a call to one of the Overload:System.ServiceModel.Channels.IDuplexSession.BeginCloseOutputSession
        //     methods.
        void EndCloseOutputSession(IAsyncResult result);
    }
So what we need to do is implement these sessions, the sessionful channels and the related channel factories and listeners.
To make our sample as simple as possible I just implemented the necessary member of these three session interfaces, which is the property that returns the session ID.
    internal class MessageBusOutputSession : IOutputSession
    {
        private string _id;

        public string Id
        {
            get
            {
                return _id;
            }
        }

        public MessageBusOutputSession(string id)
        {
            _id = id;
        }
    }

    internal class MessageBusInputSession : IInputSession
    {
        private string _id;

        public string Id
        {
            get
            {
                return _id;
            }
        }

        public MessageBusInputSession(string id)
        {
            _id = id;
        }
    }

    internal class MessageBusDuplexSession : IDuplexSession
    {
        private string _id;

        public MessageBusDuplexSession(string id)
        {
            _id = id;
        }

        public IAsyncResult BeginCloseOutputSession(TimeSpan timeout, AsyncCallback callback, object state)
        {
            throw new NotImplementedException();
        }

        public IAsyncResult BeginCloseOutputSession(AsyncCallback callback, object state)
        {
            throw new NotImplementedException();
        }

        public void CloseOutputSession(TimeSpan timeout)
        {
            throw new NotImplementedException();
        }

        public void CloseOutputSession()
        {
            throw new NotImplementedException();
        }

        public void EndCloseOutputSession(IAsyncResult result)
        {
            throw new NotImplementedException();
        }

        public string Id
        {
            get
            {
                return _id;
            }
        }
    }
And then, let’s implement the sessionful channels. The first pair is for the request reply mode: the request session channel and the reply session channel.
The request session channel inherits from the existing MessageBusRequestChannel class, so that we can leverage the existing message operations. It should also implement the IRequestSessionChannel interface. The IRequestSessionChannel interface needs a property of type IOutputSession, which we have implemented above. So the MessageBusRequestSessionChannel looks like this.
    public class MessageBusRequestSessionChannel : MessageBusRequestChannel, IRequestSessionChannel
    {
        private IOutputSession _session;

        public IOutputSession Session
        {
            get
            {
                return _session;
            }
        }

        public MessageBusRequestSessionChannel(
            BufferManager bufferManager, MessageEncoderFactory encoder, ChannelManagerBase parent,
            EndpointAddress remoteAddress, Uri via,
            IBus bus)
            : base(bufferManager, encoder, parent, remoteAddress, via, bus)
        {
            // the session ID is generated on the client side when the channel is created
            _session = new MessageBusOutputSession((new UniqueId()).ToString());
        }
    }
In our implementation, before the client sends the request message it attaches the current session ID from its IOutputSession object. So in the base class, before the request message is sent, we need to give the subclass a chance to set the session ID. A virtual method is a quick way to do this.
In the base class we add a virtual method that can update the session ID if needed. The base class then sends the request message with this session ID.
    public Message Request(Message message, TimeSpan timeout)
    {
        ThrowIfDisposedOrNotOpen();
        lock (_aLock)
        {
            // unbox the message into string that will be sent into the bus
            var content = GetStringFromWcfMessage(message, _remoteAddress);
            // apply the session from the sub class if needed
            var sessionId = string.Empty;
            OnBeforeRequest(ref sessionId);
            // send the message into bus
            var busMsgId = _bus.SendRequest(content, sessionId, true, null);
            // waiting for the reply message arrive from the bus
            var replyMsg = _bus.Receive(false, busMsgId);
            if (string.IsNullOrWhiteSpace(replyMsg.Content))
            {
                // this means this is a one way channel acknowledge from server
                // we just return null and do nothing
                return null;
            }
            else
            {
                // box the message from the bus message content and return back
                var reply = GetWcfMessageFromString(replyMsg.Content);
                return reply;
            }
        }
    }

    protected virtual void OnBeforeRequest(ref string sessionId)
    {
    }
Then in the sessionful request channel we can override the virtual method and set the session ID.
    protected override void OnBeforeRequest(ref string sessionId)
    {
        sessionId = _session.Id;
    }
Similarly, on the server side we create the MessageBusReplySessionChannel class, which is based on MessageBusReplyChannel and implements the IReplySessionChannel interface. On the server side, it needs to retrieve the session ID after it has received a message from the bus, so in the base class we need a virtual method as well. The base class MessageBusReplyChannel is changed like this.
    public MessageBusReplyChannel(
        BufferManager bufferManager, MessageEncoderFactory encoder, ChannelManagerBase parent,
        EndpointAddress localAddress,
        IBus bus)
        : base(bufferManager, encoder, parent)
    {
        _localAddress = localAddress;
        _bus = bus;
        _aLock = new object();

        _tryReceiveRequestDelegate = (TimeSpan t, out RequestContext rc) =>
        {
            rc = null;
            // receive the request message from the bus
            var busMsg = _bus.Receive(true, null);
            // box the wcf message
            var message = GetWcfMessageFromString(busMsg.Content);
            // initialize the request context and return
            rc = new MessageBusRequestContext(message, this, _localAddress, _bus, busMsg.MessageID);
            // give the sub class a chance to pick up the session ID
            OnAfterTryReceiveRequest(busMsg);
            return true;
        };
    }

    protected virtual void OnAfterTryReceiveRequest(BusMessage message)
    {
    }
And the sessionful channel MessageBusReplySessionChannel looks like this.
    public class MessageBusReplySessionChannel : MessageBusReplyChannel, IReplySessionChannel
    {
        private IInputSession _session;

        public IInputSession Session
        {
            get
            {
                return _session;
            }
        }

        public MessageBusReplySessionChannel(
            BufferManager bufferManager, MessageEncoderFactory encoderFactory, ChannelManagerBase parent,
            EndpointAddress localAddress,
            IBus bus)
            : base(bufferManager, encoderFactory, parent, localAddress, bus)
        {
        }

        protected override void OnAfterTryReceiveRequest(BusMessage message)
        {
            // take the session ID that travelled with the bus message
            _session = new MessageBusInputSession(message.SessionID);
        }
    }
The sessionful reply channel needs the IInputSession, instead of the IOutputSession that we are using in the request part.
Lastly, implement the related channel factory and channel listener, and modify the transport binding element to let it return the sessionful channels.
    public class MessageBusRequestSessionChannelFactory : MessageBusChannelFactoryBase<IRequestSessionChannel>
    {
        public MessageBusRequestSessionChannelFactory(MessageBusTransportBindingElement transportElement, BindingContext context)
            : base(transportElement, context)
        {
        }

        protected override IRequestSessionChannel CreateChannel(
            BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress remoteAddress,
            MessageBusChannelFactoryBase<IRequestSessionChannel> parent,
            Uri via,
            IBus bus)
        {
            return new MessageBusRequestSessionChannel(bufferManager, encoder, parent, remoteAddress, via, bus);
        }
    }

    public class MessageBusReplySessionChannelListener : MessageBusChannelListenerBase<IReplySessionChannel>
    {
        public MessageBusReplySessionChannelListener(MessageBusTransportBindingElement transportElement, BindingContext context)
            : base(transportElement, context)
        {
        }

        protected override IReplySessionChannel CreateChannel(
            BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress localAddress,
            MessageBusChannelListenerBase<IReplySessionChannel> parent,
            IBus bus)
        {
            return new MessageBusReplySessionChannel(bufferManager, encoder, parent, localAddress, bus);
        }
    }

    public override bool CanBuildChannelFactory<TChannel>(BindingContext context)
    {
        return typeof(TChannel) == typeof(IRequestChannel) ||
               typeof(TChannel) == typeof(IOutputChannel) ||
               typeof(TChannel) == typeof(IDuplexChannel) ||
               typeof(TChannel) == typeof(IRequestSessionChannel);
    }

    public override bool CanBuildChannelListener<TChannel>(BindingContext context)
    {
        return typeof(TChannel) == typeof(IReplyChannel) ||
               typeof(TChannel) == typeof(IInputChannel) ||
               typeof(TChannel) == typeof(IDuplexChannel) ||
               typeof(TChannel) == typeof(IReplySessionChannel);
    }

    public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context)
    {
        if (context == null)
        {
            throw new ArgumentNullException("context");
        }
        if (!CanBuildChannelFactory<TChannel>(context))
        {
            throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel factory.", typeof(TChannel).Name));
        }

        if (typeof(TChannel) == typeof(IRequestChannel))
        {
            return (IChannelFactory<TChannel>)(object)new MessageBusRequestChannelFactory(this, context);
        }
        else if (typeof(TChannel) == typeof(IOutputChannel))
        {
            return (IChannelFactory<TChannel>)(object)new MessageBusOutputChannelFactory(this, context);
        }
        else if (typeof(TChannel) == typeof(IDuplexChannel))
        {
            return (IChannelFactory<TChannel>)(object)new MessageBusDuplexChannelFactory(this, context);
        }
        else if (typeof(TChannel) == typeof(IRequestSessionChannel))
        {
            return (IChannelFactory<TChannel>)(object)new MessageBusRequestSessionChannelFactory(this, context);
        }
        else
        {
            throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel factory.", typeof(TChannel).Name));
        }
    }

    public override IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context)
    {
        if (context == null)
        {
            throw new ArgumentNullException("context");
        }
        if (!CanBuildChannelListener<TChannel>(context))
        {
            throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel listener.", typeof(TChannel).Name));
        }

        if (typeof(TChannel) == typeof(IReplyChannel))
        {
            return (IChannelListener<TChannel>)(object)new MessageBusReplyChannelListener(this, context);
        }
        else if (typeof(TChannel) == typeof(IInputChannel))
        {
            return (IChannelListener<TChannel>)(object)new MessageBusInputChannelListener(this, context);
        }
        else if (typeof(TChannel) == typeof(IDuplexChannel))
        {
            return (IChannelListener<TChannel>)(object)new MessageBusDuplexChannelListener(this, context);
        }
        else if (typeof(TChannel) == typeof(IReplySessionChannel))
        {
            return (IChannelListener<TChannel>)(object)new MessageBusReplySessionChannelListener(this, context);
        }
        else
        {
            throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel listener.", typeof(TChannel).Name));
        }
    }
Now we can test our sessionful request reply channel. Since our transport extension now supports multiple service instances running over the message bus, there’s no restriction that we must set the instance context mode to PerSession or Single for a sessionful channel. But as the requests may be processed by any service instance, we cannot use the fields of the service class to store the session state. What we should do now is retrieve the current session ID and get/set the intermediate value from a session store, which could be a distributed cache.

The image above demonstrates how the session ID is used in our new sessionful request reply mode.
- When the client channel is opened, a session ID is generated on the client side. Let’s say the session ID is ABCDE.
- The client invokes the service method Add with the value that it wants to add. The request is sent to the service with the session ID (ABCDE) attached.
- The service receives this message and puts the session ID into its session channel’s session object, so that the service business logic can retrieve this value through OperationContext.Current.SessionId.
- The service business code uses this session ID to find the value in the session store, adds the value and writes it back to the session store.
- The client sends the next two requests with the same session ID.
- The client requests the result with the session ID attached.
- The service uses the session ID to retrieve the value from the session store and replies.
Below is an in-process session store for test purposes.
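The steps above can be sketched without WCF at all. This is only an illustration under my own assumptions: SharedStore stands in for the session store (or a distributed cache) and StatelessWorker for a service instance; neither name comes from the transport extension. Every call goes through a brand new worker, yet the total survives because it is keyed by the session ID.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical shared store keyed by session ID (stands in for a distributed cache).
static class SharedStore
{
    public static readonly ConcurrentDictionary<string, int> Values =
        new ConcurrentDictionary<string, int>();
}

// Hypothetical service instance: it keeps no state of its own.
class StatelessWorker
{
    public void Add(string sessionId, int value)
    {
        // find the value in the store, add to it and write it back
        int current;
        SharedStore.Values.TryGetValue(sessionId, out current);
        SharedStore.Values[sessionId] = current + value;
    }

    public int GetResult(string sessionId)
    {
        int current;
        return SharedStore.Values.TryGetValue(sessionId, out current) ? current : 0;
    }
}

static class SessionFlowDemo
{
    // Sends every value through a fresh worker and reads the total
    // back through yet another worker.
    public static int Run(string sessionId, int[] values)
    {
        foreach (var value in values)
        {
            new StatelessWorker().Add(sessionId, value);
        }
        return new StatelessWorker().GetResult(sessionId);
    }

    static void Main()
    {
        Console.WriteLine(Run("ABCDE", new[] { 1, 3, 2 }));
        Console.WriteLine(Run("FGHIJ", new[] { 4, 5, 6 }));
    }
}
```

The two session IDs never see each other’s values, which is the same isolation the two clients get later in this post.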
    public class InProcSessionStore
    {
        #region Singleton

        private static InProcSessionStore _instance;

        public static InProcSessionStore Current
        {
            get
            {
                return _instance;
            }
        }

        static InProcSessionStore()
        {
            _instance = new InProcSessionStore();
        }

        private InProcSessionStore()
        {
            _dic = new ConcurrentDictionary<string, object>();
        }

        #endregion

        private ConcurrentDictionary<string, object> _dic;

        public void Set(string key, object value)
        {
            _dic.AddOrUpdate(key, value, (k, v) => value);
        }

        public T Get<T>(string key)
        {
            // returns default(T) when the key is missing or holds another type
            var value = default(T);
            object result;
            if (_dic.TryGetValue(key, out result) && result != null && result.GetType() == typeof(T))
            {
                value = (T)result;
            }
            return value;
        }
    }
And below is the new service contract and implementation class. You can see I set the instance context mode to PerCall, and I use OperationContext.Current.SessionId to get and set the intermediate value.
    [ServiceContract(Namespace = "http://wcf.shaunxu.me/", SessionMode = SessionMode.Required)]
    public interface ISampleService
    {
        [OperationContract(IsOneWay = true)]
        void Add(int value);

        [OperationContract(IsOneWay = false)]
        int GetResult();
    }

    [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
    public class SampleService : ISampleService
    {
        public void Add(int value)
        {
            //_current += value;
            var current = InProcSessionStore.Current.Get<int>(OperationContext.Current.SessionId);
            current += value;
            InProcSessionStore.Current.Set(OperationContext.Current.SessionId, current);
            Console.WriteLine("[{0}] SampleService.Add({1}), Current = {2}, SessionID = {3}", this.GetHashCode(), value, current, OperationContext.Current.SessionId);
        }

        public int GetResult()
        {
            var current = InProcSessionStore.Current.Get<int>(OperationContext.Current.SessionId);
            Console.WriteLine("[{0}] SampleService.GetResult(), SessionID = {1}", this.GetHashCode(), OperationContext.Current.SessionId);
            return current;
        }
    }
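One thing to keep in mind: the Get followed by Set in Add is a read-modify-write. In this sample a client sends one request at a time within its session, so it does no harm, but if several callers ever update the same key at the same moment, an update could get lost. As a hedged sketch (AtomicSessionCounter is a hypothetical class, not part of the downloadable code), the update can be done in one step with ConcurrentDictionary.AddOrUpdate.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical counter store; not part of the article's InProcSessionStore.
class AtomicSessionCounter
{
    private readonly ConcurrentDictionary<string, int> _values =
        new ConcurrentDictionary<string, int>();

    // Adds the value to the session's total in one atomic dictionary update.
    // The update delegate may run more than once under contention,
    // so it must not have side effects.
    public int Add(string sessionId, int value)
    {
        return _values.AddOrUpdate(sessionId, value, (key, old) => old + value);
    }

    public int Get(string sessionId)
    {
        int current;
        return _values.TryGetValue(sessionId, out current) ? current : 0;
    }
}

static class AtomicDemo
{
    // Runs many concurrent Add(1) calls against one session and returns the total.
    public static int Run(int calls)
    {
        var store = new AtomicSessionCounter();
        Parallel.For(0, calls, i => store.Add("ABCDE", 1));
        return store.Get("ABCDE");
    }

    static void Main()
    {
        Console.WriteLine(Run(1000));
    }
}
```

With a real distributed cache the equivalent would be whatever atomic increment or optimistic concurrency feature that cache offers.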
In the main function I created two services and two clients; each client should have its own session ID.
    static void Main(string[] args)
    {
        var bus = new InProcMessageBus();
        var address = "net.bus://localhost/sample";

        // establish the services
        var host1 = EstablishServiceHost<ISampleService, SampleService>(bus, address, SessionfulMode.Distributed);
        var host2 = EstablishServiceHost<ISampleService, SampleService>(bus, address, SessionfulMode.Distributed);

        // establish the client
        var client1 = EstablishClientProxy<ISampleService>(bus, address, SessionfulMode.Distributed);
        var client2 = EstablishClientProxy<ISampleService>(bus, address, SessionfulMode.Distributed);
        using (client1 as IDisposable)
        using (client2 as IDisposable)
        {
            client1.Add(1);
            client2.Add(4);

            client1.Add(3);
            client2.Add(5);

            client1.Add(2);
            client2.Add(6);

            var result1 = client1.GetResult();
            var result2 = client2.GetResult();
            Console.WriteLine("Client 1 Result: {0}", result1);
            Console.WriteLine("Client 2 Result: {0}", result2);
        }

        // close the service
        host1.Close();
        host2.Close();

        Console.ReadKey();
    }
From the following result we can see that, with the PerCall instance mode, each client request creates a new service instance. But the session ID stays the same within the same client, so we can use the session ID to get and set values in the session store.

Sessionful Channels: Datagram Mode and Duplex Mode
The sessionful datagram mode is similar to the sessionful request reply mode, but it inherits from our original input and output channels. Since it is sessionful, we also need some extra work to make it fulfill the WCF session requirement.
At the beginning of this post I described that in a WCF session, “Messages delivered during a session are processed in the order in which they are received.” This means the server side cannot receive and process the second request until the first one has been done. This was not a problem when we implemented the sessionful request reply channel, since the request reply mode ensures that the request channel waits for the reply message. So it is impossible for the next request to be sent before the first reply arrives.
But the datagram mode doesn’t have this restriction. The sender (output channel) returns without waiting for any reply from the server. So we need to add some more steps to make sure that the sessionful datagram mode follows the session requirement, which means the output channel must wait for the input channel’s reply, or I should say, the input channel’s acknowledgement.
The sessionful output channel inherits from our own MessageBusOutputChannel and implements the IOutputSessionChannel interface. To make it possible to rewrite the send logic we need to mark the Send method as virtual in the base class, so that the sessionful output channel can override it. We add a message ID to the output message, which is used to receive the acknowledgement, and send the message as usual. Then we wait for the acknowledge message. This ensures that the next operation will not be fired in this client channel until the acknowledgement arrives.
    public override void Send(Message message, TimeSpan timeout)
    {
        // add a message id if the message does not have one yet
        if (message.Headers.MessageId == null)
        {
            message.Headers.MessageId = new System.Xml.UniqueId();
        }
        // always wait on the id that is actually in the header,
        // since that is the one the server acknowledges
        var messageId = message.Headers.MessageId;
        // send message with session id
        var content = GetStringFromWcfMessage(message, RemoteAddress);
        _bus.SendRequest(content, _session.Id, true, ChannelID, null);
        // wait for the acknowledge message from the server side
        _bus.Receive(false, messageId.ToString());
    }
On the server side, the input channel also needs to override the receive method so that it can send the acknowledgement message back.
public override bool EndTryReceive(IAsyncResult result, out Message message)
{
    var ret = base.EndTryReceive(result, out message);
    // extract the message id and send the acknowledge message back to the client;
    // skip this when nothing was received or the message carries no id
    if (message != null && message.Headers.MessageId != null)
    {
        var messageId = message.Headers.MessageId;
        _bus.SendReply(string.Empty, _session.Id, false, messageId.ToString());
    }
    return ret;
}
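The send-then-wait-for-acknowledgement idea can be tried out without any WCF plumbing. The following is only a minimal sketch under my own assumptions: AckBus, SendAndWaitForAck and ReceiveAndAck are hypothetical names and are not part of the IBus interface used in this series. It shows that when the sender blocks until the receiver has acknowledged a message, the receiver sees the messages in the order they were sent.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for the message bus: one queue for requests
// and one small queue per message ID for acknowledgements.
public sealed class AckBus
{
    private readonly BlockingCollection<KeyValuePair<string, int>> _requests =
        new BlockingCollection<KeyValuePair<string, int>>();
    private readonly ConcurrentDictionary<string, BlockingCollection<bool>> _acks =
        new ConcurrentDictionary<string, BlockingCollection<bool>>();

    private BlockingCollection<bool> AckQueue(string id)
    {
        return _acks.GetOrAdd(id, key => new BlockingCollection<bool>());
    }

    // Sender side: enqueue the request, then block until it is acknowledged.
    public void SendAndWaitForAck(int value)
    {
        var id = Guid.NewGuid().ToString();
        _requests.Add(new KeyValuePair<string, int>(id, value));
        AckQueue(id).Take();
        BlockingCollection<bool> removed;
        _acks.TryRemove(id, out removed);
    }

    // Receiver side: take one request, then acknowledge it by its ID.
    public int ReceiveAndAck()
    {
        var request = _requests.Take();
        AckQueue(request.Key).Add(true);
        return request.Value;
    }
}

public static class AckDemo
{
    // Sends 1, 2, 3 from one sender and returns the order the receiver saw.
    public static List<int> Run()
    {
        var bus = new AckBus();
        var received = new List<int>();
        var receiver = Task.Run(() =>
        {
            for (var i = 0; i < 3; i++)
            {
                received.Add(bus.ReceiveAndAck());
            }
        });
        foreach (var value in new[] { 1, 2, 3 })
        {
            bus.SendAndWaitForAck(value);
        }
        receiver.Wait();
        return received;
    }
}
```

Note that, as in the channel code above, the acknowledgement is sent as soon as the message has been picked up, so what the sender waits for is receipt, not the end of the service operation.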
I won’t describe the other modifications here, for example the channel factory, listener and transport binding element. The full code can be found at the end of this post. Let’s jump straight to trying the sessionful datagram mode. The service contract and class look like this.
[ServiceContract(Namespace = "http://wcf.shaunxu.me/", SessionMode = SessionMode.Required)]
public interface ISampleService
{
    [OperationContract(IsOneWay = true)]
    void Add(int value);

    [OperationContract(IsOneWay = true)]
    void GetResult(string id);
}

[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
public class SampleService : ISampleService
{
    public void Add(int value)
    {
        //_current += value;
        var current = InProcSessionStore.Current.Get<int>(OperationContext.Current.SessionId);
        current += value;
        InProcSessionStore.Current.Set(OperationContext.Current.SessionId, current);
        Console.WriteLine("[{0}] SampleService.Add({1}), Current = {2}, SessionID = {3}", this.GetHashCode(), value, current, OperationContext.Current.SessionId);
    }

    public void GetResult(string id)
    {
        var current = InProcSessionStore.Current.Get<int>(OperationContext.Current.SessionId);
        Console.WriteLine("[{0}] SampleService.GetResult(), SessionID = {1}", this.GetHashCode(), OperationContext.Current.SessionId);
        InProcSessionStore.Current.Set(id, current);
    }
}
The main function changes as well.
static void Main(string[] args)
{
    var bus = new InProcMessageBus();
    var address = "net.bus://localhost/sample";

    // establish the services
    var host1 = EstablishServiceHost<ISampleService, SampleService>(bus, address, SessionfulMode.Distributed);
    var host2 = EstablishServiceHost<ISampleService, SampleService>(bus, address, SessionfulMode.Distributed);

    // establish the client
    var client1 = EstablishClientProxy<ISampleService>(bus, address, SessionfulMode.Distributed);
    var client2 = EstablishClientProxy<ISampleService>(bus, address, SessionfulMode.Distributed);
    using (client1 as IDisposable)
    using (client2 as IDisposable)
    {
        client1.Add(1);
        client2.Add(4);

        client1.Add(3);
        client2.Add(5);

        client1.Add(2);
        client2.Add(6);

        client1.GetResult("1");
        client2.GetResult("2");
        Console.WriteLine("Client 1 Result: {0}", InProcSessionStore.Current.Get<int>("1"));
        Console.WriteLine("Client 2 Result: {0}", InProcSessionStore.Current.Get<int>("2"));
    }

    // close the service
    host1.Close();
    host2.Close();

    Console.ReadKey();
}
Since in datagram mode I cannot return any value from the server side, the GetResult method copies the current value into the session store, so that the main function can read the result from there. The execution result would be like this. As you can see, the session ID was maintained during the whole message conversation.
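To make the role of the session store concrete, here is a small sketch of how a store keyed by session ID keeps two interleaved sessions apart. SketchSessionStore is a hypothetical class written for this illustration; it assumes that Get returns the default value for an unknown key, and the real InProcSessionStore in the attached source code may behave differently.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical sketch of a session store; not the InProcSessionStore from this series.
public sealed class SketchSessionStore
{
    private readonly ConcurrentDictionary<string, object> _items =
        new ConcurrentDictionary<string, object>();

    // Returns the stored value, or default(T) when the key is unknown (an assumption).
    public T Get<T>(string key)
    {
        object value;
        return _items.TryGetValue(key, out value) ? (T)value : default(T);
    }

    public void Set<T>(string key, T value)
    {
        _items[key] = value;
    }
}

public static class SessionStoreDemo
{
    // Mirrors the Add operation: read the running total for the session, add, write back.
    private static void Add(SketchSessionStore store, string sessionId, int value)
    {
        var current = store.Get<int>(sessionId);
        store.Set(sessionId, current + value);
    }

    // Interleaves calls from two sessions, as the main function above does with two clients.
    public static int[] Run()
    {
        var store = new SketchSessionStore();
        Add(store, "session-1", 1);
        Add(store, "session-2", 4);
        Add(store, "session-1", 3);
        Add(store, "session-2", 5);
        Add(store, "session-1", 2);
        Add(store, "session-2", 6);
        return new[] { store.Get<int>("session-1"), store.Get<int>("session-2") };
    }
}
```

In this sketch the two totals are 6 and 15, regardless of which service instance would have handled each call, because the state lives in the store and not in the service object.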

The last one is the duplex channel. It needs no special modification, so the sessionful channel looks like this.
public class MessageBusDuplexSessionChannel : MessageBusDuplexChannel, IDuplexSessionChannel
{
    private IDuplexSession _session;

    public IDuplexSession Session
    {
        get
        {
            return _session;
        }
    }

    public MessageBusDuplexSessionChannel(
        BufferManager bufferManager, MessageEncoderFactory encoder, ChannelManagerBase parent,
        EndpointAddress remoteAddress, Uri via,
        IBus bus,
        bool isClient)
        : base(bufferManager, encoder, remoteAddress, parent, via, bus, isClient)
    {
        _session = new MessageBusDuplexSession((new UniqueId()).ToString());
    }

    protected override void OnAfterTryReceive(BusMessage message)
    {
        _session = new MessageBusDuplexSession(message.SessionID);
    }

    protected override void OnBeforeSend(ref string sessionId)
    {
        sessionId = _session.Id;
    }
}
Summary
In this post I described the basics of the WCF session and how it differs from the ASP.NET session. The WCF session is more general than the ASP.NET session. It doesn’t care how the session ID is passed, and it doesn’t care how we handle the session state data. It only ensures that the messages within a session have the same session ID, at least on one side of the communication.
The session ID does not have to be the same on the server side and the client side. It only needs to be consistent on one side. Hence, in our transport extension, a single session can have one session ID on the server side and another on the client side.
Then we discussed what WCF itself does for sessions, and how it leverages the session mode and the instance context mode to let developers use local variables of the service class to store intermediate data. We also discussed why this is not suitable for our scaling-out requirement.
Then we implemented our own sessionful channels. Developers can use OperationContext.Current.SessionId as the key to get and set intermediate data in the session store, so that the state can scale out across the service instances.
By now I can say that we have done everything for the WCF transport extension. We have our own binding and transport binding element. We support all three WCF MEPs and the additional sessionful MEPs on top of our transport, with scalability on both server and client instances. But I have to note that the code I mentioned and attached at the end of each post is for research purposes only. Do NOT use it directly in production.
You will have noticed that all our tests are based on the in-process message bus, which cannot be used in real production at all. In the next post I’m going to use some real message buses by implementing their own IBus classes.
The source code can be downloaded here.
Hope this helps,
Shaun
All documents and related graphics, codes are provided "AS IS" without warranty of any kind.
Copyright © Shaun Ziyan Xu. This work is licensed under the Creative Commons License.
In the last post I demonstrated how to implement the datagram channel shape, and in the second and third posts I described the request-reply shape. In this post I will explain the last MEP in WCF, duplex, which is the most complex one.
Basis of the Duplex Channel Shape (MEP)
The MSDN documentation says: “The duplex MEP allows an arbitrary number of messages to be sent by a client and received in any order. The duplex MEP is like a phone conversation, where each word being spoken is a message. Because both sides can send and receive in this MEP, the interface implemented by the client and service channels is IDuplexChannel.”
This is easy to understand if we have one server instance and one client instance. You can assume that in duplex mode the client and the server can call each other freely. When the server calls the client, it grabs the client contract and invokes it through a proxy class, which is very similar to what we did to call a service from a client through ChannelFactory<T>. In fact, in duplex mode the client also plays the service role: when the server invokes a client callback, the server acts as a client and the client acts as a server.

The figure above outlines how a duplex communication works. At the beginning the client sends a request to the server. While the server side business logic is running, it invokes the client side contract twice and gets the results. We normally call these client side invocations “callbacks”. They can target the same client side method or different ones. Finally the server finishes the business logic and sends the reply back.
This figure only describes the scenario where all communication is in request-reply mode, which means the client request and the server callbacks all need a reply. In duplex mode we can also use the datagram mode. The server can invoke a datagram callback on the client, and the client can invoke a datagram service method as well.
But WCF will always use the duplex channel to send and receive messages if we define a callback contract in the service contract, even when no callback is invoked in the service implementation.
From the explanation above we might feel that the duplex channel has some concept of a connection. When the client sends the first request it somehow establishes a “connection” with the server, and the callback uses this “connection” to communicate with this client and send requests and replies back and forth. The built-in WCF transports do use a connection to implement the duplex channel. For example, with NET.TCP, WCF uses the TCP connection as the duplex connection. With WSDualHttpBinding it is a little more complex. Since the HTTP protocol is connection-less, it implements the duplex mode by introducing a second HTTP address, exposed by the client, through which the server handles the callback communication.

But in our case we only have a message bus which serves multiple service instances and clients. We want our duplex channel to be as scalable as possible, but we also need to ensure that once a client has sent the original request and a server instance has received it, all subsequent messages flow between those two: the server callback must not be handled by another client.

Duplex Channel
Having clarified the basics of the duplex channel and our goal in the message bus and scaling-out case, let’s have a look at the WCF duplex channel itself.
Unlike the channel shapes we introduced before, the duplex channel is created and used on both the server and the client side. If you dig into the definition of IDuplexChannel you will find that it has no extra members; it just inherits from IInputChannel and IOutputChannel. Once the duplex channel is established, no matter whether by the server side ChannelListener or the client side ChannelFactory, it will try to receive messages through its BeginTryReceive method. And no matter whether it is a client side request or callback reply, or a server side callback request or reply, the duplex channel always uses its Send method to send the message. This makes our implementation a little more complex.
In fact you could have one duplex channel class for the server and another for the client. But I prefer to use one channel class to support both sides since the main procedure is the same.
On the server side, the channel needs to receive two kinds of messages.
- Original client request message. This kind of message can be received by any server instance that is listening on the same endpoint (message bus queue).
- Callback reply. A server can only receive the reply to a callback request that it sent itself.
Listening for the first kind of message is easy, but the second one is trickier. It’s similar to what we did in request-reply mode, where the client should only receive the reply to a request it sent itself. But in request-reply mode the client sends the request message first and then waits for the reply, so we know the request message ID and can use it to pick the related reply message. In duplex mode, as I said before, both the client and the server first try to receive messages and only then send messages if needed. That means we need some mechanism that lets the channel identify the replies to its own requests BEFORE any request has been sent.

In duplex mode the restriction is that a channel must receive the reply to a request that it sent itself, and it cannot receive a reply to a request sent by another channel. Hence we just need a way to identify whether a message relates to a request sent from the same channel. With the key point figured out, the solution is simple. You may remember that in the last post, when I refactored the channel base class, I added a property named ChannelID, a GUID assigned automatically when a channel is initialized. This is the identity we will use to check whether a message should be received by this channel.
- When a duplex channel is created and begins to receive messages, it tries to receive any request message (client request or callback request), plus any reply message marked as being only for this channel.
- After the channel receives a request message, it extracts the SOAP message ID and the ChannelID the message came from, and saves them in a dictionary.
- When the channel sends a reply message, it takes the message ID from the reply message’s RelatesTo header, looks up the relevant ChannelID, and sends the message with this ChannelID to indicate that only that channel can receive it.
- When the channel sends a callback request, it takes the original request message ID from OperationContext.Current.RequestContext.RequestMessage, looks up the relevant ChannelID, and sends the message with this ID to indicate that only that client can receive the callback request and perform the client side logic.
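The bookkeeping described in the list above can be sketched in isolation. This is only an illustration under my own assumptions: ReplyRouter and its method names are hypothetical, and plain strings stand in for the SOAP message ID and the ChannelID. The point it shows is that a callback request only reads the mapping, because the final reply still needs it, while a reply removes it.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper: remembers which channel each request came from.
public sealed class ReplyRouter
{
    private readonly ConcurrentDictionary<string, string> _replyTos =
        new ConcurrentDictionary<string, string>();

    // Called when a request arrives: remember the channel that sent it.
    public void OnRequestReceived(string messageId, string fromChannelId)
    {
        _replyTos[messageId] = fromChannelId;
    }

    // Called when sending a callback request: look up the target channel
    // but keep the mapping. Returns null when the request is unknown.
    public string ResolveCallbackTarget(string originalRequestId)
    {
        string channelId;
        return _replyTos.TryGetValue(originalRequestId, out channelId) ? channelId : null;
    }

    // Called when sending a reply: look up the target channel and drop the
    // mapping, since the request is finished once the reply has gone out.
    public string ResolveReplyTarget(string relatesTo)
    {
        string channelId;
        return _replyTos.TryRemove(relatesTo, out channelId) ? channelId : null;
    }
}
```

A thread-safe dictionary is used here because a channel may receive and send on different threads at the same time.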

The figure above demonstrates what happens in our solution for a duplex communication with one callback request.
- The client and server channels each have their own channel ID.
- The client and the server start to receive incoming messages. The client side only receives messages sent to its channel ID. The server side can receive messages from any channel as well as messages addressed only to it.
- The client sends the request message with the channel ID it comes from, but with no target channel ID specified. This message can be received by any service instance channel.
- A service channel receives this message and saves the message ID and the “from” channel ID.
- The service channel starts trying to receive messages again.
- After running some business code, the service begins to send the callback request message to the client. From OperationContext.Current.RequestContext it retrieves the original request message ID, finds the channel ID it came from, and appends this channel ID to the callback request message, so that the message can only be received by that channel. It also appends its own channel ID.
- Only this client receives the callback request since the message has “To ChannelID = 1”. It saves the message ID and the “from” channel ID.
- The client channel starts trying to receive messages again.
- After running some business code, the client begins to send the callback reply message back to the service. From the reply’s RelatesTo header it retrieves the callback request message ID, finds the channel ID it came from, and appends this channel ID to the callback reply message, so that the message can only be received by that channel.
- Only this service receives the reply message since its channel ID equals the “To ChannelID” in the reply message.
- The service channel starts trying to receive messages again.
- After the business code has finished, the service begins to send the reply message back to the client. From the reply’s RelatesTo header it retrieves the original request message ID, finds the channel ID it came from, and appends this channel ID to the reply message, so that the message can only be received by that channel.
- Only this client receives the reply message since the message has “To ChannelID = 1”.
- The client channel starts trying to receive messages again.
- The duplex invocation is finished.
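The receive rule used throughout these steps boils down to a small predicate. The sketch below is my own reading of the steps above and not code from the message bus: ReceiveFilter and ShouldReceive are hypothetical names, and an empty target channel ID is assumed to mean that the message is an original client request.

```csharp
using System;

public static class ReceiveFilter
{
    // Decides whether a channel should pick up a message from the bus.
    // toChannelId: null or empty means the message is not addressed to any particular channel.
    public static bool ShouldReceive(bool isServer, string ownChannelId, string toChannelId)
    {
        if (string.IsNullOrEmpty(toChannelId))
        {
            // Unaddressed messages are original client requests, so only servers take them.
            return isServer;
        }
        // Addressed messages (callback requests and all replies) go to exactly one channel.
        return string.Equals(toChannelId, ownChannelId, StringComparison.Ordinal);
    }
}
```

For example, a server channel accepts an unaddressed request, while a client channel with ID "1" accepts only messages whose target is "1".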
Implement the Duplex Channel, Channel Factory and Channel Listener
With the solution clarified, the implementation is straightforward. Since I’m going to use one duplex channel class for both server and client, I need a local variable to indicate whether it’s running on the server side or the client side. I also need to save the server side address, and a dictionary that maps message IDs to channel IDs.
Since our channel will run in a multi-threaded environment, we should use ConcurrentDictionary from the .NET 4 System.Collections.Concurrent namespace to store the mapping between message IDs and channel IDs.
When implementing the receive delegate we pass the channel ID as a parameter, so that the channel receives messages that either have no “To Channel ID” specified or are addressed only to this channel.
After the channel receives a message we check whether it has a message ID. If it does, the message is a request (client request or server callback request). We then save the message ID and the “From Channel ID” in the local dictionary, so that when sending the reply we can find which channel ID it should be sent to.
There is also some logic for the local address and remote address properties, etc.
private readonly IBus _bus;
private readonly Uri _via;
private readonly EndpointAddress _serverAddress;
private readonly ConcurrentDictionary<UniqueId, string> _replyTos;
private readonly bool _isClient;

private delegate bool TryReceiveDelegate(TimeSpan timeout, out Message message);
private readonly TryReceiveDelegate _tryReceiveDelegate;

public MessageBusDuplexChannel(
    BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress remoteAddress,
    ChannelManagerBase parent, Uri via,
    IBus bus, bool isClient)
    : base(bufferManager, encoder, parent)
{
    _serverAddress = remoteAddress;
    _via = via;
    _bus = bus;
    _isClient = isClient;
    _replyTos = new ConcurrentDictionary<UniqueId, string>();

    _tryReceiveDelegate = (TimeSpan timeout, out Message message) =>
    {
        message = null;
        try
        {
            // listen the message bus based on the sticky mode:
            // channel: only receive the message that reply to this channel's id
            // scaling group: receive the message the reply to this channel's id and the scaling group id of this channel
            var requestMessage = _bus.Receive(!_isClient, ChannelID);
            if (requestMessage != null)
            {
                message = GetWcfMessageFromString(requestMessage.Content);
                // a message with a message id is a request: remember which channel sent it
                if (message.Headers.MessageId != null)
                {
                    _replyTos.AddOrUpdate(message.Headers.MessageId, requestMessage.From, (key, value) => requestMessage.From);
                }
            }
        }
        catch (Exception ex)
        {
            throw new CommunicationException(ex.Message, ex);
        }
        return true;
    };
}

public EndpointAddress LocalAddress
{
    get
    {
        if (_isClient)
        {
            return new EndpointAddress(EndpointAddress.AnonymousUri);
        }
        else
        {
            return _serverAddress;
        }
    }
}

public EndpointAddress RemoteAddress
{
    get
    {
        if (_isClient)
        {
            return _serverAddress;
        }
        else
        {
            return new EndpointAddress(EndpointAddress.AnonymousUri);
        }
    }
}

public Uri Via
{
    get
    {
        return _via;
    }
}

public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
{
    Message message;
    return _tryReceiveDelegate.BeginInvoke(timeout, out message, callback, state);
}

public bool EndTryReceive(IAsyncResult result, out Message message)
{
    var ret = _tryReceiveDelegate.EndInvoke(out message, result);
    return ret;
}
Next, let’s implement the send procedure. The Send method is used for both request and reply messages, which means the client side request, the server side callback request, the client side callback reply and the server side reply.
If the message has the RelatesTo header, it is a reply message, and the value of RelatesTo is the message ID of the request. We look up the channel ID in the dictionary and attach it to the message so that only that channel can receive the reply.
If there is no RelatesTo header, it is a request message. On the client side we don’t need to do anything but send the message to the bus with our channel ID attached, so that any server can tell which channel it came from. On the server side, this is a callback request, which must be sent to the client channel that fired the original request. As we know, we can get the original request from the OperationContext, so we can use the original request message ID to find the original channel ID in our dictionary.
public void Send(Message message, TimeSpan timeout)
{
    if (message.Headers.RelatesTo != null)
    {
        // when RelatesTo is not null this is a response message, which must be sent to the requesting channel.
        // once it has been sent the original request is finished, so we don't need to keep the mapping any more.
        // hence we remove the entry for the original message id, take the channel id it maps to,
        // and attach it to the bus message before sending.
        var replyTo = string.Empty;
        _replyTos.TryRemove(message.Headers.RelatesTo, out replyTo);
        if (!string.IsNullOrWhiteSpace(replyTo))
        {
            var content = GetStringFromWcfMessage(message, RemoteAddress);
            _bus.SendReply(content, _isClient, replyTo);
        }
        else
        {
            throw new CommunicationException(string.Format("Cannot find a valid ReplyTo for the message related to {0}.", message.Headers.RelatesTo));
        }
    }
    else
    {
        // on the server side, when performing the callback request we will firstly retrieve the original request message id,
        // then find the related client channel id. so that we can send the callback request back to the same client channel.
        var sendTo = string.Empty;
        if (!_isClient &&
            OperationContext.Current != null &&
            OperationContext.Current.RequestContext != null &&
            OperationContext.Current.RequestContext.RequestMessage != null &&
            OperationContext.Current.RequestContext.RequestMessage.Headers.MessageId != null)
        {
            var requestMessageId = OperationContext.Current.RequestContext.RequestMessage.Headers.MessageId;
            _replyTos.TryGetValue(requestMessageId, out sendTo);
        }
        var content = GetStringFromWcfMessage(message, RemoteAddress);
        _bus.SendRequest(content, _isClient, ChannelID, sendTo);
    }
}
To finish the implementation, just create the channel factory and the channel listener, and update the transport binding element so that it supports the duplex mode.
public class MessageBusDuplexChannelFactory : MessageBusChannelFactoryBase<IDuplexChannel>
{
    public MessageBusDuplexChannelFactory(MessageBusTransportBindingElement transportElement, BindingContext context)
        : base(transportElement, context)
    {
    }

    protected override IDuplexChannel CreateChannel(
        BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress remoteAddress,
        MessageBusChannelFactoryBase<IDuplexChannel> parent,
        Uri via,
        IBus bus)
    {
        // the factory runs on the client side, hence isClient = true
        return new MessageBusDuplexChannel(bufferManager, encoder, remoteAddress, parent, via, bus, true);
    }
}

public class MessageBusDuplexChannelListener : MessageBusChannelListenerBase<IDuplexChannel>
{
    public MessageBusDuplexChannelListener(MessageBusTransportBindingElement transportElement, BindingContext context)
        : base(transportElement, context)
    {
    }

    protected override IDuplexChannel CreateChannel(
        BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress localAddress,
        MessageBusChannelListenerBase<IDuplexChannel> parent,
        IBus bus)
    {
        // the listener runs on the server side, hence isClient = false
        return new MessageBusDuplexChannel(bufferManager, encoder, localAddress, parent, null, bus, false);
    }
}
public override bool CanBuildChannelFactory<TChannel>(BindingContext context)
{
    return typeof(TChannel) == typeof(IRequestChannel) ||
           typeof(TChannel) == typeof(IOutputChannel) ||
           typeof(TChannel) == typeof(IDuplexChannel);
}

public override bool CanBuildChannelListener<TChannel>(BindingContext context)
{
    return typeof(TChannel) == typeof(IReplyChannel) ||
           typeof(TChannel) == typeof(IInputChannel) ||
           typeof(TChannel) == typeof(IDuplexChannel);
}

public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context)
{
    if (context == null)
    {
        throw new ArgumentNullException("context");
    }
    if (!CanBuildChannelFactory<TChannel>(context))
    {
        throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel factory.", typeof(TChannel).Name));
    }

    if (typeof(TChannel) == typeof(IRequestChannel))
    {
        return (IChannelFactory<TChannel>)(object)new MessageBusRequestChannelFactory(this, context);
    }
    else if (typeof(TChannel) == typeof(IOutputChannel))
    {
        return (IChannelFactory<TChannel>)(object)new MessageBusOutputChannelFactory(this, context);
    }
    else if (typeof(TChannel) == typeof(IDuplexChannel))
    {
        return (IChannelFactory<TChannel>)(object)new MessageBusDuplexChannelFactory(this, context);
    }
    else
    {
        throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel factory.", typeof(TChannel).Name));
    }
}

public override IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context)
{
    if (context == null)
    {
        throw new ArgumentNullException("context");
    }
    if (!CanBuildChannelListener<TChannel>(context))
    {
        throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel listener.", typeof(TChannel).Name));
    }

    if (typeof(TChannel) == typeof(IReplyChannel))
    {
        return (IChannelListener<TChannel>)(object)new MessageBusReplyChannelListener(this, context);
    }
    else if (typeof(TChannel) == typeof(IInputChannel))
    {
        return (IChannelListener<TChannel>)(object)new MessageBusInputChannelListener(this, context);
    }
    else if (typeof(TChannel) == typeof(IDuplexChannel))
    {
        return (IChannelListener<TChannel>)(object)new MessageBusDuplexChannelListener(this, context);
    }
    else
    {
        throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel listener.", typeof(TChannel).Name));
    }
}
Test Our Duplex Channel
Update our test console application to verify that the duplex channel works. First we need to define and implement a service which contains the duplex callback. The service contract and the client side callback contract look like this.
[ServiceContract(Namespace = "http://wcf.shaunxu.me/", CallbackContract = typeof(ISampleCallback))]
public interface ISampleService
{
    [OperationContract]
    string Reverse(string content);
}

[ServiceContract(Namespace = "http://wcf.shaunxu.me/")]
public interface ISampleCallback
{
    [OperationContract]
    string ToUpper(string content);

    [OperationContract]
    string AddSpaces(string content);
}
The service has only one method, which reverses an input string. The client callback contract has two methods: one converts the input string to upper case, the other adds a space between the characters of the string. In the implementation, the service method invokes these two callback methods one after the other.
You will see that when the service and client methods are invoked I print the hash code of the current instance and the result, to demonstrate the duplex calling flow.
[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Multiple)]
public class SampleService : ISampleService
{
    public string Reverse(string content)
    {
        var callback = OperationContext.Current.GetCallbackChannel<ISampleCallback>();

        var result1 = new string(content.Reverse().ToArray());
        Console.WriteLine("Service {0}: Reverse {1} => {2}", OperationContext.Current.Host.GetHashCode(), content, result1);

        var result2 = callback.ToUpper(result1);
        Console.WriteLine("Service {0}: Callback.ToUpper {1} => {2}", OperationContext.Current.Host.GetHashCode(), result1, result2);
        var result3 = callback.AddSpaces(result2);
        Console.WriteLine("Service {0}: Callback.AddSpaces {1} => {2}", OperationContext.Current.Host.GetHashCode(), result2, result3);

        return result3;
    }
}

public class SampleCallback : ISampleCallback
{
    public string ToUpper(string content)
    {
        var result = content.ToUpper();
        Console.WriteLine("Client {0}: ToUpper {1} => {2}", this.GetHashCode(), content, result);
        return result;
    }

    public string AddSpaces(string content)
    {
        var result = string.Join(" ", content.Select(c => new string(c, 1)));
        Console.WriteLine("Client {0}: AddSpaces {1} => {2}", this.GetHashCode(), content, result);
        return result;
    }
}
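Make sure you add the ServiceBehavior attribute to the service implementation class and set the concurrency mode to Multiple or Reentrant. The service waits for the callback replies while it is still processing the original request, so with the default concurrency mode the application will fail at runtime.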
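To see what the three steps produce without any WCF plumbing, the string transformations can be chained as plain method calls. This is just a sketch for checking the expected result: DuplexFlowSketch is a hypothetical class, and in the real service ToUpper and AddSpaces run on the client through the callback channel instead of being called directly.

```csharp
using System;
using System.Linq;

public static class DuplexFlowSketch
{
    // What the service does itself.
    public static string Reverse(string content)
    {
        var chars = content.ToCharArray();
        Array.Reverse(chars);
        return new string(chars);
    }

    // What the first client callback does.
    public static string ToUpper(string content)
    {
        return content.ToUpperInvariant();
    }

    // What the second client callback does.
    public static string AddSpaces(string content)
    {
        return string.Join(" ", content.Select(c => c.ToString()));
    }

    // The whole flow in the order the service invokes it.
    public static string Run(string content)
    {
        return AddSpaces(ToUpper(Reverse(content)));
    }
}
```

For the input "hello" this flow returns "O L L E H", which is the value the client should get back from the Reverse call.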
Add another helper method to make it easy to create a duplex channel factory and a proxy.
static TChannel EstablishDuplexClientProxy<TChannel, TCallback>(IBus bus, string address) where TCallback : new()
{
    var binding = new MessageBusTransportBinding(bus);
    var callbackInstance = new InstanceContext(new TCallback());
    var factory = new DuplexChannelFactory<TChannel>(callbackInstance, binding, address);
    factory.Opened += (sender, e) =>
    {
        Console.WriteLine("Client connected to {0}", factory.Endpoint.ListenUri);
    };
    var proxy = factory.CreateChannel();
    return proxy;
}
Finally, in the main function we create some service instances and client proxies, then let the user select a client to send a request to the services. In this scaling-out mode any service can pick up the request and run the service logic, but all client callbacks made within that service method should be received by the client that sent the request.
static void Main(string[] args)
{
    var bus = new InProcMessageBus();
    var address = "net.bus://localhost/sample";

    // establish the services
    var host1 = EstablishServiceHost<ISampleService, SampleService>(bus, address);
    var host2 = EstablishServiceHost<ISampleService, SampleService>(bus, address);
    var host3 = EstablishServiceHost<ISampleService, SampleService>(bus, address);

    // establish the clients
    var clients = new List<ISampleService>()
    {
        EstablishDuplexClientProxy<ISampleService, SampleCallback>(bus, address),
        EstablishDuplexClientProxy<ISampleService, SampleCallback>(bus, address),
        EstablishDuplexClientProxy<ISampleService, SampleCallback>(bus, address)
    };

    // invoke the service
    Console.WriteLine("Which client do you want to use? (1|2|3, 0 to exit)");
    int choice;
    // any input that is not a valid client number (including 0) ends the loop
    while (int.TryParse(Console.ReadLine(), out choice) && choice >= 1 && choice <= clients.Count)
    {
        var proxy = clients[choice - 1];
        Console.WriteLine("Client ({0}): Say something...", proxy.GetHashCode());
        var content = Console.ReadLine();
        var result = proxy.Reverse(content);
        Console.WriteLine("Client ({0}): {1} => {2}", proxy.GetHashCode(), content, result);

        Console.WriteLine("Which client do you want to use? (1|2|3, 0 to exit)");
    }

    // close the clients
    foreach (var cli in clients)
    {
        ((ICommunicationObject)cli).Close();
    }

    // close the services
    host1.Close();
    host2.Close();
    host3.Close();
}
Let’s start the application and have a try.

As you can see, the client first sent the request to a service (66622070), which fired the client callback twice, and both callbacks were received by the same client callback instance (21647132). Then the second request was picked up by another service (20876819), and its callbacks all went to the same client (6451435).
Summary
The duplex mode is the most complex of the three WCF MEPs. Once the channel has been established, a duplex channel allows the service and the client to invoke each other freely. Our case is more complex still: in scaling-out mode the service may have more than one instance, so in fact we have an N:N channel shape.
In our solution we use the ChannelID as the identifier to enforce the two rules of duplex mode:
- The reply message must be received by the channel that sent the related request message.
- The callback request must be received by the client that fired the original request message.
The first rule is mandatory, but the second is optional. It is not strictly necessary that only the client that fired the duplex request receives the callback request from the server; any client that implements the callback contract should be able to handle it. You can try to implement this if you are interested.
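As a rough illustration of the ChannelID idea, here is a minimal sketch that routes messages to one inbox per channel ID. It is not the bus used in this series: the ChannelRoutedBus class and its SendTo and TryReceive methods are hypothetical names invented for this example, and the real IBus interface may look quite different.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical stand-in for the message bus: one inbox per ChannelID.
public sealed class ChannelRoutedBus
{
    private readonly ConcurrentDictionary<string, BlockingCollection<string>> _inboxes =
        new ConcurrentDictionary<string, BlockingCollection<string>>();

    private BlockingCollection<string> InboxOf(string channelId)
    {
        return _inboxes.GetOrAdd(channelId, id => new BlockingCollection<string>());
    }

    // Deliver a reply or callback to the channel that fired the original request.
    public void SendTo(string channelId, string content)
    {
        InboxOf(channelId).Add(content);
    }

    // Returns false if nothing arrives for this channel within the timeout.
    public bool TryReceive(string channelId, TimeSpan timeout, out string content)
    {
        return InboxOf(channelId).TryTake(out content, timeout);
    }
}

public static class ChannelRoutingDemo
{
    public static void Main()
    {
        var bus = new ChannelRoutedBus();
        var clientA = Guid.NewGuid().ToString();
        var clientB = Guid.NewGuid().ToString();

        // The service addresses its callback with the ChannelID of the caller.
        bus.SendTo(clientA, "callback for A");

        string received;
        var wait = TimeSpan.FromMilliseconds(100);
        Console.WriteLine("A got a message: {0}", bus.TryReceive(clientA, wait, out received));
        Console.WriteLine("B got a message: {0}", bus.TryReceive(clientB, wait, out received));
    }
}
```

Relaxing the second rule would amount to letting any client take the callback from a shared inbox instead of its own.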
We have now implemented all the WCF MEPs in our transport extension, and I can say we have done almost everything. We can scale out our service instances on top of our message bus, which supports the datagram, request-reply and duplex channel modes.
In the next post I will describe how to handle sessions in our transport extension and look at how the WCF session differs from the ASP.NET session.
You can download the source code here.
Hope this helps,
Shaun
All documents and related graphics, codes are provided "AS IS" without warranty of any kind.
Copyright © Shaun Ziyan Xu. This work is licensed under the Creative Commons License.
In the previous posts we talked about a transport extension that scales out our WCF services over a message bus. So far we have used the request-reply MEP as an example and implemented the asynchronous methods that let our transport support normal WCF usage: ServiceHost for service hosting and ChannelFactory for client invocation.
Request-reply is the most common MEP when using WCF, but WCF offers two others as well: datagram and duplex. In this post I will demonstrate how to implement the datagram MEP in our solution, with some code refactoring.
Datagram MEP
The datagram MEP is also known as the one-way, request-forget or fire-and-forget MEP. Unlike request-reply, in this mode the client sends the request to the server and does not wait for a reply. This means, on one hand, that the client is not blocked by the server reply. On the other hand, the client knows neither whether the server received the message nor whether the service processed it properly. This MEP is widely used in scenarios like logging and auditing.

On the client side, the OutputChannel sends the request message to the underlying transport, while on the server side the InputChannel receives it and WCF dispatches it to the service class and the related method.
In our case the OutputChannel just sends the message to the message bus and returns immediately without waiting for any response. The InputChannel on the server receives the message and processes the business logic without sending back any reply.
Hence, to implement the datagram MEP we need to:
- On the client side, implement a ChannelFactory that returns an OutputChannel, and implement the OutputChannel, which sends messages to the bus.
- On the server side, implement a ChannelListener that returns an InputChannel, and implement the InputChannel, which receives messages from the bus.
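The one-way flow described above can be sketched without any WCF types at all. In this minimal example a BlockingCollection stands in for the message bus; the DatagramSketch class and its Send and Receive methods are hypothetical names for illustration only, not part of the transport extension.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class DatagramSketch
{
    // Hypothetical stand-in for the message bus.
    private static readonly BlockingCollection<string> Bus = new BlockingCollection<string>();

    // Output side: enqueue the message and return immediately; no reply is awaited.
    public static void Send(string content)
    {
        Bus.Add(content);
    }

    // Input side: block until a message is available, then hand it to the caller.
    public static string Receive()
    {
        return Bus.Take();
    }

    public static void Main()
    {
        // The "server" processes whatever arrives and never sends anything back.
        var server = Task.Run(() =>
        {
            var message = Receive();
            Console.WriteLine("Server processed: " + message);
        });

        Send("log entry");
        Console.WriteLine("Client continues without waiting for a reply.");

        server.Wait();
    }
}
```

The two console lines may appear in either order, which is exactly the point: the client has no way to tell when, or whether, the server handled the message.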
Since we already created a ChannelFactory and a ChannelListener for the request-reply mode, and those channels work well with our message bus, we will do some code refactoring first.
ChannelBase, ChannelFactoryBase and ChannelListenerBase
We will have three base classes that cover the common procedures for channels, channel factories and channel listeners. The channel base class just receives the buffer manager and encoder factory and exposes the buffer manager and encoder as protected read-only properties. It also has a property named ChannelID, which will be used in the duplex MEP in the next post.
The request channel and reply channel contain a lot of code that converts a WCF message into a string and vice versa, using the buffer manager and encoder. Here we will create two methods for this so that channels can convert between string and message simply by invoking them.
public abstract class MessageBusChannelBase : ChannelBase
{
    private const int CST_MAXBUFFERSIZE = 64 * 1024;

    private readonly Guid _id;
    private readonly BufferManager _bufferManager;
    private readonly MessageEncoder _encoder;

    protected BufferManager BufferManager
    {
        get
        {
            return _bufferManager;
        }
    }

    protected MessageEncoder Encoder
    {
        get
        {
            return _encoder;
        }
    }

    public string ChannelID
    {
        get
        {
            return _id.ToString();
        }
    }

    protected MessageBusChannelBase(BufferManager bufferManager, MessageEncoderFactory encoder, ChannelManagerBase parent)
        : base(parent)
    {
        _id = Guid.NewGuid();

        _bufferManager = bufferManager;
        _encoder = encoder.CreateSessionEncoder();
    }

    // box a string taken from the bus into a WCF message
    internal Message GetWcfMessageFromString(string content)
    {
        var raw = Encoding.UTF8.GetBytes(content);
        var data = _bufferManager.TakeBuffer(raw.Length);
        Buffer.BlockCopy(raw, 0, data, 0, raw.Length);
        var buffer = new ArraySegment<byte>(data, 0, raw.Length);
        var message = _encoder.ReadMessage(buffer, _bufferManager);
        return message;
    }

    // unbox a WCF message into a string that can be sent to the bus
    internal string GetStringFromWcfMessage(Message message, EndpointAddress to)
    {
        ArraySegment<byte> buffer;
        string content;
        using (message)
        {
            to.ApplyTo(message);
            buffer = _encoder.WriteMessage(message, CST_MAXBUFFERSIZE, _bufferManager);
        }
        content = Encoding.UTF8.GetString(buffer.Array, buffer.Offset, buffer.Count);
        _bufferManager.ReturnBuffer(buffer.Array);
        return content;
    }

    protected override void OnAbort()
    {
    }

    protected override void OnClose(TimeSpan timeout)
    {
    }

    protected override void OnOpen(TimeSpan timeout)
    {
    }

    #region Not Implemented

    protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    protected override void OnEndClose(IAsyncResult result)
    {
        throw new NotImplementedException();
    }

    protected override void OnEndOpen(IAsyncResult result)
    {
        throw new NotImplementedException();
    }

    #endregion
}
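To see the two conversion helpers in isolation, here is a minimal round-trip sketch that turns a WCF message into a string and back again. It assumes the .NET Framework with a reference to System.ServiceModel, uses the default text encoder rather than whatever encoder the binding supplies, and the MessageStringRoundTrip class, the action URI and the pool sizes are made up for this example.

```csharp
using System;
using System.ServiceModel.Channels;
using System.Text;

public static class MessageStringRoundTrip
{
    private const int MaxMessageSize = 64 * 1024;

    public static string RoundTrip(string body)
    {
        // Illustrative pool size and buffer size; tune them for real workloads.
        var bufferManager = BufferManager.CreateBufferManager(512 * 1024, MaxMessageSize);
        var encoder = new TextMessageEncodingBindingElement().CreateMessageEncoderFactory().Encoder;

        // Message -> string: encode into a pooled buffer, then decode the bytes as UTF-8.
        string content;
        using (var outgoing = Message.CreateMessage(encoder.MessageVersion, "urn:sample/echo", body))
        {
            var written = encoder.WriteMessage(outgoing, MaxMessageSize, bufferManager);
            content = Encoding.UTF8.GetString(written.Array, written.Offset, written.Count);
            bufferManager.ReturnBuffer(written.Array);
        }

        // string -> Message: copy the bytes into a pooled buffer and let the encoder parse it.
        // The decoded message takes ownership of the buffer, so it is not returned here.
        var raw = Encoding.UTF8.GetBytes(content);
        var data = bufferManager.TakeBuffer(raw.Length);
        Buffer.BlockCopy(raw, 0, data, 0, raw.Length);
        var incoming = encoder.ReadMessage(new ArraySegment<byte>(data, 0, raw.Length), bufferManager);
        using (incoming)
        {
            return incoming.GetBody<string>();
        }
    }

    public static void Main()
    {
        Console.WriteLine(RoundTrip("hello bus"));
    }
}
```

The string in the middle is the full SOAP envelope, headers included, which is why the bus can carry it without knowing anything about WCF.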
The main responsibility of the channel factory and channel listener is to create the related channel object. All factories and listeners have very similar logic, differing only in which type of channel they create. So we can have two base classes that wrap almost all the complexity and let the subclasses take responsibility for creating the actual channel object.
public abstract class MessageBusChannelFactoryBase<TChannel> : ChannelFactoryBase<TChannel> where TChannel : class, IChannel
{
    private readonly MessageBusTransportBindingElement _transportElement;
    private readonly BufferManager _bufferManager;
    private readonly MessageEncoderFactory _encoder;

    private readonly IBus _bus;

    public IBus Bus
    {
        get
        {
            return _bus;
        }
    }

    protected MessageBusTransportBindingElement TransportElement
    {
        get
        {
            return _transportElement;
        }
    }

    protected MessageBusChannelFactoryBase(MessageBusTransportBindingElement transportElement, BindingContext context)
        : base(context.Binding)
    {
        _transportElement = transportElement;
        _bufferManager = BufferManager.CreateBufferManager(transportElement.MaxBufferPoolSize, int.MaxValue);
        var encodingElement = context.Binding.Elements.Find<MessageEncodingBindingElement>();
        if (encodingElement == null)
        {
            _encoder = (new TextMessageEncodingBindingElement()).CreateMessageEncoderFactory();
        }
        else
        {
            _encoder = encodingElement.CreateMessageEncoderFactory();
        }
        _bus = transportElement.Bus;
    }

    // the only member a concrete factory has to provide
    protected abstract TChannel CreateChannel(
        BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress remoteAddress,
        MessageBusChannelFactoryBase<TChannel> parent,
        Uri via,
        IBus bus);

    protected override void OnOpen(TimeSpan timeout)
    {
    }

    protected override void OnClosed()
    {
        base.OnClosed();

        _bufferManager.Clear();
        _bus.Dispose();
    }

    protected override TChannel OnCreateChannel(System.ServiceModel.EndpointAddress address, Uri via)
    {
        return CreateChannel(_bufferManager, _encoder, address, this, via, _bus);
    }

    protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    protected override void OnEndOpen(IAsyncResult result)
    {
        throw new NotImplementedException();
    }
}

public abstract class MessageBusChannelListenerBase<TChannel> : ChannelListenerBase<TChannel> where TChannel : class, IChannel
{
    private readonly MessageBusTransportBindingElement _transportElement;

    private readonly BufferManager _bufferManager;
    private readonly MessageEncoderFactory _encoder;
    private readonly string _scheme;
    private readonly Uri _uri;

    private readonly IBus _bus;

    private readonly InputQueue<TChannel> _channelQueue;
    private readonly object _currentChannelLock;

    private TChannel _currentChannel;

    public IBus Bus
    {
        get
        {
            return _bus;
        }
    }

    public override Uri Uri
    {
        get
        {
            return _uri;
        }
    }

    public string Scheme
    {
        get
        {
            return _scheme;
        }
    }

    protected MessageBusTransportBindingElement TransportElement
    {
        get
        {
            return _transportElement;
        }
    }

    protected MessageBusChannelListenerBase(MessageBusTransportBindingElement transportElement, BindingContext context)
        : base(context.Binding)
    {
        _transportElement = transportElement;
        _bufferManager = BufferManager.CreateBufferManager(transportElement.MaxBufferPoolSize, int.MaxValue);
        var encodingElement = context.Binding.Elements.Find<MessageEncodingBindingElement>();
        if (encodingElement == null)
        {
            _encoder = (new TextMessageEncodingBindingElement()).CreateMessageEncoderFactory();
        }
        else
        {
            _encoder = encodingElement.CreateMessageEncoderFactory();
        }
        _scheme = transportElement.Scheme;
        _uri = new Uri(context.ListenUriBaseAddress, context.ListenUriRelativeAddress);

        _bus = transportElement.Bus;

        _channelQueue = new InputQueue<TChannel>();
        _currentChannelLock = new object();
        _currentChannel = null;
    }

    protected override void OnAbort()
    {
        try
        {
            lock (ThisLock)
            {
                _channelQueue.Close();
            }
        }
        catch { }
    }

    protected override void OnClose(TimeSpan timeout)
    {
        try
        {
            lock (ThisLock)
            {
                _channelQueue.Close();
            }
        }
        catch { }
    }

    protected override void OnClosed()
    {
        base.OnClosed();

        try
        {
            _bufferManager.Clear();
            _bus.Dispose();
        }
        catch { }
    }

    // create the single current channel on demand (double-checked locking)
    private void EnsureChannelAvailable()
    {
        TChannel newChannel = null;
        bool channelCreated = false;

        if ((newChannel = _currentChannel) == null)
        {
            lock (_currentChannelLock)
            {
                if ((newChannel = _currentChannel) == null)
                {
                    newChannel = CreateChannel(_bufferManager, _encoder, new EndpointAddress(_uri), this, _bus);
                    newChannel.Closed += new EventHandler(OnChannelClosed);
                    _currentChannel = newChannel;
                    channelCreated = true;
                }
            }
        }

        if (channelCreated)
        {
            _channelQueue.EnqueueAndDispatch(newChannel);
        }
    }

    // the only member a concrete listener has to provide
    protected abstract TChannel CreateChannel(
        BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress localAddress,
        MessageBusChannelListenerBase<TChannel> parent,
        IBus bus);

    private void OnChannelClosed(object sender, EventArgs e)
    {
        var channel = sender as TChannel;
        lock (_currentChannelLock)
        {
            if (channel == _currentChannel)
            {
                _currentChannel = null;
            }
        }
    }

    protected override TChannel OnAcceptChannel(TimeSpan timeout)
    {
        if (!IsDisposed)
        {
            EnsureChannelAvailable();
        }

        TChannel channel = null;
        if (_channelQueue.Dequeue(timeout, out channel))
        {
            return channel;
        }
        else
        {
            throw new TimeoutException();
        }
    }

    protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state)
    {
        if (!IsDisposed)
        {
            EnsureChannelAvailable();
        }

        return _channelQueue.BeginDequeue(timeout, callback, state);
    }

    protected override TChannel OnEndAcceptChannel(IAsyncResult result)
    {
        TChannel channel;
        if (_channelQueue.EndDequeue(result, out channel))
        {
            return channel;
        }
        else
        {
            throw new TimeoutException();
        }
    }

    protected override void OnOpen(TimeSpan timeout)
    {
    }

    protected override IAsyncResult OnBeginWaitForChannel(TimeSpan timeout, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    protected override bool OnEndWaitForChannel(IAsyncResult result)
    {
        throw new NotImplementedException();
    }

    protected override bool OnWaitForChannel(TimeSpan timeout)
    {
        throw new NotImplementedException();
    }

    protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    protected override void OnEndClose(IAsyncResult result)
    {
        throw new NotImplementedException();
    }

    protected override void OnEndOpen(IAsyncResult result)
    {
        throw new NotImplementedException();
    }
}
As you can see, the channel factory base class and the channel listener base class each expose just one abstract method, CreateChannel, which gives the subclass a chance to instantiate the actual channel object. After these modifications our request and reply parts (done in the previous post) become simpler and clearer.
The RequestChannelFactory will be like this.
public class MessageBusRequestChannelFactory : MessageBusChannelFactoryBase<IRequestChannel>
{
    public MessageBusRequestChannelFactory(MessageBusTransportBindingElement transportElement, BindingContext context)
        : base(transportElement, context)
    {
    }

    protected override IRequestChannel CreateChannel(
        BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress remoteAddress,
        MessageBusChannelFactoryBase<IRequestChannel> parent,
        Uri via,
        IBus bus)
    {
        return new MessageBusRequestChannel(bufferManager, encoder, parent, remoteAddress, via, bus);
    }
}
And this is the ReplyChannelListener.
public class MessageBusReplyChannelListener : MessageBusChannelListenerBase<IReplyChannel>
{
    public MessageBusReplyChannelListener(MessageBusTransportBindingElement transportElement, BindingContext context)
        : base(transportElement, context)
    {
    }

    protected override IReplyChannel CreateChannel(
        BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress localAddress,
        MessageBusChannelListenerBase<IReplyChannel> parent,
        IBus bus)
    {
        return new MessageBusReplyChannel(bufferManager, encoder, localAddress, parent, bus);
    }
}
And below are the RequestChannel and ReplyChannel. I inherited them from the MessageBusChannelBase class and used the helper methods to convert between string and message.
public class MessageBusRequestChannel : MessageBusChannelBase, IRequestChannel
{
    private readonly IBus _bus;
    private readonly Uri _via;
    private readonly EndpointAddress _remoteAddress;
    private readonly object _aLock;

    public MessageBusRequestChannel(
        BufferManager bufferManager, MessageEncoderFactory encoder, ChannelManagerBase parent,
        EndpointAddress remoteAddress, Uri via, IBus bus)
        : base(bufferManager, encoder, parent)
    {
        _via = via;
        _remoteAddress = remoteAddress;
        _bus = bus;
        _aLock = new object();
    }

    public Uri Via
    {
        get
        {
            return _via;
        }
    }

    public IAsyncResult BeginRequest(Message message, TimeSpan timeout, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    public IAsyncResult BeginRequest(Message message, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    public Message EndRequest(IAsyncResult result)
    {
        throw new NotImplementedException();
    }

    public System.ServiceModel.EndpointAddress RemoteAddress
    {
        get
        {
            return _remoteAddress;
        }
    }

    public Message Request(Message message, TimeSpan timeout)
    {
        ThrowIfDisposedOrNotOpen();
        lock (_aLock)
        {
            // unbox the message into string that will be sent into the bus
            var content = GetStringFromWcfMessage(message, _remoteAddress);
            // send the message into bus
            var busMsgId = _bus.SendRequest(content, true, null);
            // waiting for the reply message arrive from the bus
            var replyMsg = _bus.Receive(false, busMsgId);
            // box the message from the bus message content and return back
            var reply = GetWcfMessageFromString(replyMsg.Content);
            return reply;
        }
    }

    public Message Request(Message message)
    {
        return Request(message, DefaultSendTimeout);
    }
}

public class MessageBusReplyChannel : MessageBusChannelBase, IReplyChannel
{
    private readonly EndpointAddress _localAddress;
    private readonly object _aLock;

    private readonly IBus _bus;

    private delegate bool TryReceiveRequestDelegate(TimeSpan timeout, out RequestContext context);
    private TryReceiveRequestDelegate _tryReceiveRequestDelegate;

    public MessageBusReplyChannel(
        BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress localAddress,
        ChannelManagerBase parent,
        IBus bus)
        : base(bufferManager, encoder, parent)
    {
        _localAddress = localAddress;
        _bus = bus;
        _aLock = new object();

        _tryReceiveRequestDelegate = (TimeSpan t, out RequestContext rc) =>
        {
            rc = null;
            // receive the request message from the bus
            var busMsg = _bus.Receive(true, null);
            // box the wcf message
            var message = GetWcfMessageFromString(busMsg.Content);
            // initialize the request context and return
            rc = new MessageBusRequestContext(message, this, _localAddress, _bus, busMsg.MessageID);
            return true;
        };
    }

    public System.ServiceModel.EndpointAddress LocalAddress
    {
        get
        {
            return _localAddress;
        }
    }

    public bool WaitForRequest(TimeSpan timeout)
    {
        return true;
    }

    public RequestContext ReceiveRequest()
    {
        return ReceiveRequest(DefaultReceiveTimeout);
    }

    public RequestContext ReceiveRequest(TimeSpan timeout)
    {
        ThrowIfDisposedOrNotOpen();
        lock (_aLock)
        {
            // receive the request message from the bus
            var busMsg = _bus.Receive(true, null);
            // box the wcf message
            var message = GetWcfMessageFromString(busMsg.Content);
            // initialize the request context and return
            return new MessageBusRequestContext(message, this, _localAddress, _bus, busMsg.MessageID);
        }
    }

    public IAsyncResult BeginTryReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state)
    {
        RequestContext context;
        return _tryReceiveRequestDelegate.BeginInvoke(timeout, out context, callback, state);
    }

    public bool EndTryReceiveRequest(IAsyncResult result, out RequestContext context)
    {
        var ret = _tryReceiveRequestDelegate.EndInvoke(out context, result);
        return ret;
    }

    public IAsyncResult BeginReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    public IAsyncResult BeginReceiveRequest(AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    public IAsyncResult BeginWaitForRequest(TimeSpan timeout, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    public RequestContext EndReceiveRequest(IAsyncResult result)
    {
        throw new NotImplementedException();
    }

    public bool EndWaitForRequest(IAsyncResult result)
    {
        throw new NotImplementedException();
    }

    public bool TryReceiveRequest(TimeSpan timeout, out RequestContext context)
    {
        throw new NotImplementedException();
    }
}
A small change can be made in the RequestContext as well, which now uses the message conversion methods too.
public class MessageBusRequestContext : RequestContext
{
    private bool _aborted;
    private readonly Message _message;
    private readonly MessageBusReplyChannel _parent;
    private readonly EndpointAddress _address;
    private readonly object _aLock;
    private readonly string _busMessageId;
    private readonly IBus _bus;

    private CommunicationState _state;

    public MessageBusRequestContext(
        Message message, MessageBusReplyChannel parent,
        EndpointAddress address,
        IBus bus,
        string relatedTo)
    {
        _aborted = false;
        _parent = parent;
        _message = message;
        _address = address;
        _busMessageId = relatedTo;
        _bus = bus;

        _aLock = new object();
        _state = CommunicationState.Opened;
    }

    public override void Abort()
    {
        lock (_aLock)
        {
            if (_aborted)
            {
                return;
            }
            _aborted = true;
            _state = CommunicationState.Faulted;
        }
    }

    public override IAsyncResult BeginReply(Message message, TimeSpan timeout, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    public override IAsyncResult BeginReply(Message message, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    public override void Close(TimeSpan timeout)
    {
        lock (_aLock)
        {
            _state = CommunicationState.Closed;
        }
    }

    public override void Close()
    {
        Close(TimeSpan.MaxValue);
    }

    public override void EndReply(IAsyncResult result)
    {
        throw new NotImplementedException();
    }

    public override void Reply(Message message, TimeSpan timeout)
    {
        // unbox the reply message to string
        var content = _parent.GetStringFromWcfMessage(message, _address);
        // send the reply into bus
        _bus.SendReply(content, false, _busMessageId);
    }

    public override void Reply(Message message)
    {
        Reply(message, TimeSpan.MaxValue);
    }

    public override Message RequestMessage
    {
        get
        {
            return _message;
        }
    }
}
Channel Factory, Listener and Channels for Datagram
After the refactoring it is easier to create the channel factory, listener and channels for the datagram MEP. First we create the client-side OutputChannelFactory, which only needs to implement CreateChannel from its base class (MessageBusChannelFactoryBase) and instantiate an IOutputChannel object.
public class MessageBusOutputChannelFactory : MessageBusChannelFactoryBase<IOutputChannel>
{
    public MessageBusOutputChannelFactory(MessageBusTransportBindingElement transportElement, BindingContext context)
        : base(transportElement, context)
    {
    }

    protected override IOutputChannel CreateChannel(
        BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress remoteAddress,
        MessageBusChannelFactoryBase<IOutputChannel> parent,
        Uri via,
        IBus bus)
    {
        return new MessageBusOutputChannel(bufferManager, encoder, parent, remoteAddress, via, bus);
    }
}
The MessageBusOutputChannel class we are going to create also inherits from MessageBusChannelBase and implements the IOutputChannel interface. There is only one important method to implement: Send. It is invoked when the client sends a message to the server. Notice that this method has no return value, which means, as the datagram MEP defines, the client does not care whether the service processes its message.
Since we already have the conversion method in the channel base class, we can simply convert the message into a string and pass it to the underlying message bus.
public virtual void Send(Message message, TimeSpan timeout)
{
    var content = GetStringFromWcfMessage(message, RemoteAddress);
    _bus.SendRequest(content, true, ChannelID, null);
}
The full code of the output channel is as follows.
public class MessageBusOutputChannel : MessageBusChannelBase, IOutputChannel
{
    private readonly IBus _bus;
    private readonly Uri _via;
    private readonly EndpointAddress _remoteAddress;

    public MessageBusOutputChannel(
        BufferManager bufferManager, MessageEncoderFactory encoder, ChannelManagerBase parent,
        EndpointAddress remoteAddress,
        Uri via, IBus bus)
        : base(bufferManager, encoder, parent)
    {
        _bus = bus;
        _via = via;
        _remoteAddress = remoteAddress;
    }

    public Uri Via
    {
        get
        {
            return _via;
        }
    }

    public EndpointAddress RemoteAddress
    {
        get
        {
            return _remoteAddress;
        }
    }

    public IAsyncResult BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    public IAsyncResult BeginSend(Message message, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    public void EndSend(IAsyncResult result)
    {
        throw new NotImplementedException();
    }

    public virtual void Send(Message message, TimeSpan timeout)
    {
        var content = GetStringFromWcfMessage(message, RemoteAddress);
        _bus.SendRequest(content, true, ChannelID, null);
    }

    public void Send(Message message)
    {
        Send(message, DefaultSendTimeout);
    }
}
On the server side, we similarly create the ChannelListener from the base class and only need to implement the code that creates the relevant InputChannel instance.
public class MessageBusInputChannelListener : MessageBusChannelListenerBase<IInputChannel>
{
    public MessageBusInputChannelListener(MessageBusTransportBindingElement transportElement, BindingContext context)
        : base(transportElement, context)
    {
    }

    protected override IInputChannel CreateChannel(
        BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress localAddress,
        MessageBusChannelListenerBase<IInputChannel> parent,
        IBus bus)
    {
        return new MessageBusInputChannel(bufferManager, encoder, parent, localAddress, bus);
    }
}
The InputChannel needs to receive the message sent from the client (OutputChannel) through its BeginTryReceive, EndTryReceive, BeginReceive and EndReceive methods. We will again create two delegates for them and use their asynchronous invoke methods.
public class MessageBusInputChannel : MessageBusChannelBase, IInputChannel
{
    private readonly IBus _bus;
    private readonly EndpointAddress _localAddress;
    private readonly object _aLock;

    private delegate bool TryReceiveDelegate(TimeSpan timeout, out Message message);
    private TryReceiveDelegate _tryReceiveDelegate;

    private delegate Message ReceiveDelegate(TimeSpan timeout);
    private ReceiveDelegate _receiveDelegate;

    public MessageBusInputChannel(
        BufferManager bufferManager, MessageEncoderFactory encoder, ChannelManagerBase parent,
        EndpointAddress localAddress,
        IBus bus)
        : base(bufferManager, encoder, parent)
    {
        _localAddress = localAddress;
        _bus = bus;
        _aLock = new object();

        _tryReceiveDelegate = (TimeSpan timeout, out Message message) =>
        {
            message = null;
            try
            {
                var requestMessage = _bus.Receive(true, null);
                if (requestMessage != null)
                {
                    message = GetWcfMessageFromString(requestMessage.Content);
                    OnTryReceive(requestMessage);
                }
            }
            catch (Exception ex)
            {
                throw new CommunicationException(ex.Message, ex);
            }
            return true;
        };

        _receiveDelegate = (TimeSpan timeout) =>
        {
            var requestMessage = _bus.Receive(false, ChannelID);
            return GetWcfMessageFromString(requestMessage.Content);
        };
    }

    public System.ServiceModel.EndpointAddress LocalAddress
    {
        get
        {
            return _localAddress;
        }
    }

    public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
    {
        return _receiveDelegate.BeginInvoke(timeout, callback, state);
    }

    public IAsyncResult BeginReceive(AsyncCallback callback, object state)
    {
        return BeginReceive(DefaultReceiveTimeout, callback, state);
    }

    public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
    {
        Message message;
        return _tryReceiveDelegate.BeginInvoke(timeout, out message, callback, state);
    }

    public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    public Message EndReceive(IAsyncResult result)
    {
        return _receiveDelegate.EndInvoke(result);
    }

    public virtual bool EndTryReceive(IAsyncResult result, out Message message)
    {
        var ret = _tryReceiveDelegate.EndInvoke(out message, result);
        return ret;
    }

    // hook for subclasses; intentionally empty here
    protected virtual void OnTryReceive(BusMessage message)
    {
    }

    public bool EndWaitForMessage(IAsyncResult result)
    {
        throw new NotImplementedException();
    }

    public Message Receive(TimeSpan timeout)
    {
        throw new NotImplementedException();
    }

    public Message Receive()
    {
        throw new NotImplementedException();
    }

    public bool TryReceive(TimeSpan timeout, out Message message)
    {
        throw new NotImplementedException();
    }

    public bool WaitForMessage(TimeSpan timeout)
    {
        throw new NotImplementedException();
    }
}
You might notice that we defined a virtual method named OnTryReceive and invoke it after the delegate gets a valid incoming message, although it does nothing yet. This method will be used later in the session implementation. You can ignore it for now.
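The delegate-based asynchronous pattern that the input channel relies on can be seen in isolation in the sketch below. It assumes the .NET Framework, since delegate BeginInvoke and EndInvoke are not supported on .NET Core and later. The DelegateApmSketch class is a hypothetical name, and the lambda returns a fixed string where the real channel would block on the bus.

```csharp
using System;

public static class DelegateApmSketch
{
    private delegate bool TryReceiveDelegate(TimeSpan timeout, out string message);

    // Lambdas with out parameters need explicit parameter types.
    private static readonly TryReceiveDelegate Receiver =
        (TimeSpan timeout, out string message) =>
        {
            message = "hello from the bus"; // stands in for a blocking bus read
            return true;
        };

    public static IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
    {
        string ignored; // the out value only becomes meaningful in EndInvoke
        return Receiver.BeginInvoke(timeout, out ignored, callback, state);
    }

    public static bool EndTryReceive(IAsyncResult result, out string message)
    {
        // Blocks until the delegate has finished, then yields its out value and result.
        return Receiver.EndInvoke(out message, result);
    }

    public static void Main()
    {
        var asyncResult = BeginTryReceive(TimeSpan.FromSeconds(1), null, null);
        string message;
        var succeeded = EndTryReceive(asyncResult, out message);
        Console.WriteLine("{0}: {1}", succeeded, message);
    }
}
```

Note that the timeout is passed through but never enforced, just as in the channel code, so a receive that never completes would block the thread-pool thread indefinitely.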
The last thing we need to do is update the transport binding element so that our transport supports the InputChannel and OutputChannel and returns the related ChannelFactory and ChannelListener.
public override bool CanBuildChannelFactory<TChannel>(BindingContext context)
{
    return typeof(TChannel) == typeof(IRequestChannel) ||
           typeof(TChannel) == typeof(IOutputChannel);
}

public override bool CanBuildChannelListener<TChannel>(BindingContext context)
{
    return typeof(TChannel) == typeof(IReplyChannel) ||
           typeof(TChannel) == typeof(IInputChannel);
}

public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context)
{
    if (context == null)
    {
        throw new ArgumentNullException("context");
    }
    if (!CanBuildChannelFactory<TChannel>(context))
    {
        throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel factory.", typeof(TChannel).Name));
    }

    if (typeof(TChannel) == typeof(IRequestChannel))
    {
        return (IChannelFactory<TChannel>)(object)new MessageBusRequestChannelFactory(this, context);
    }
    else if (typeof(TChannel) == typeof(IOutputChannel))
    {
        return (IChannelFactory<TChannel>)(object)new MessageBusOutputChannelFactory(this, context);
    }
    else
    {
        throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel factory.", typeof(TChannel).Name));
    }
}

public override IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context)
{
    if (context == null)
    {
        throw new ArgumentNullException("context");
    }
    if (!CanBuildChannelListener<TChannel>(context))
    {
        throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel listener.", typeof(TChannel).Name));
    }

    if (typeof(TChannel) == typeof(IReplyChannel))
    {
        return (IChannelListener<TChannel>)(object)new MessageBusReplyChannelListener(this, context);
    }
    else if (typeof(TChannel) == typeof(IInputChannel))
    {
        return (IChannelListener<TChannel>)(object)new MessageBusInputChannelListener(this, context);
    }
    else
    {
        throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel listener.", typeof(TChannel).Name));
    }
}
Now let’s test our datagram operation. Open the test console project and create a new service contract with one method that is marked IsOneWay = true.
[ServiceContract(Namespace = "http://wcf.shaunxu.me/")]
public interface ISampleService
{
    [OperationContract(IsOneWay = true)]
    void Ping();
}

public class SampleService : ISampleService
{
    public void Ping()
    {
        Console.WriteLine("Service {0}: PONG!", OperationContext.Current.Host.GetHashCode());
    }
}
And update the console Main function as well.
static void Main(string[] args)
{
    var bus = new InProcMessageBus();
    var address = "net.bus://localhost/sample";

    // establish the services
    var host1 = EstablishServiceHost<ISampleService, SampleService>(bus, address);
    var host2 = EstablishServiceHost<ISampleService, SampleService>(bus, address);
    var host3 = EstablishServiceHost<ISampleService, SampleService>(bus, address);

    // establish the client
    var cliBinding = new MessageBusTransportBinding(bus);
    var factory = new ChannelFactory<ISampleService>(cliBinding, address);
    factory.Opened += (sender, e) =>
    {
        Console.WriteLine("Client connected to {0}", factory.Endpoint.ListenUri);
    };
    var proxy = factory.CreateChannel();

    // invoke the service
    using (proxy as IDisposable)
    {
        Console.WriteLine("Press 'p' to ping the service, otherwise to exit.");
        var content = Console.ReadLine();
        while (string.Compare(content, "p", true) == 0)
        {
            proxy.Ping();
            content = Console.ReadLine();
        }
    }
}
Again, I’m using the in-process message bus, so the multiple service instances are hosted on threads instead of in separate processes or on separate machines. When we invoke the service several times we can see the datagram messages being sent across the three services.

IsOneWay = true Always Means Input-Output Channel?
Let’s modify our test service contract and service implementation a little bit and see what happens. Let’s add another method to the contract with IsOneWay = false, and implement it in the service class.
[ServiceContract(Namespace = "http://wcf.shaunxu.me/")]
public interface ISampleService
{
    [OperationContract(IsOneWay = true)]
    void Ping();

    [OperationContract(IsOneWay = false)]
    string Reverse(string content);
}

public class SampleService : ISampleService
{
    public void Ping()
    {
        Console.WriteLine("Service {0}: PONG!", OperationContext.Current.Host.GetHashCode());
    }

    public string Reverse(string content)
    {
        // Reverse() and ToArray() are LINQ extensions, so System.Linq must be imported
        var result = new string(content.Reverse().ToArray());
        Console.WriteLine("Service {0}: Reverse {1} => {2}", OperationContext.Current.Host.GetHashCode(), content, result);
        return result;
    }
}
Then press F5 to run our console application and try to invoke the Ping method. Now we find that the client sent the request, but the service method was not invoked and our client stopped responding.

More interestingly, if we debug the code we will find that when we invoke the Ping service method, which is IsOneWay = true, WCF tries to create a RequestChannel and a ReplyChannel for us.

Why did WCF give me request and reply channels when I invoked a service method with IsOneWay = true? Well, the channel WCF chooses is NOT based on the method you are invoking; it is based on the contract definition. The channel selection logic is complex, but put simply, WCF selects the channel shape that satisfies the requirements of the whole service contract. Kenny Wolf has a nice post that roughly summarizes this logic.
In our case, we have one method marked IsOneWay = true and another marked IsOneWay = false. So WCF will choose the request reply channels even when we are calling the method that is IsOneWay = true. The problem is that when a request reply channel handles a datagram message, the reply message from WCF is always null. This causes an exception in our RequestContext.Reply method, since it tries to send the reply message back to the client but the message is null.
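The rule described above can be sketched in a few lines. This is only a simplified illustration of the idea, not WCF's real selection code, which also takes sessions, callback contracts and the shapes the binding supports into account. The ChannelShape enum and the ChannelShapeSketch.Select method are hypothetical names made up for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical names for illustration only; these are not WCF types.
public enum ChannelShape { Datagram, RequestReply }

public static class ChannelShapeSketch
{
    // isOneWayFlags holds one entry per operation in the service contract:
    // true for IsOneWay = true, false for IsOneWay = false.
    public static ChannelShape Select(IEnumerable<bool> isOneWayFlags)
    {
        // Only a contract whose operations are ALL one way can be served by
        // input/output (datagram) channels. A single request reply operation
        // forces the request reply shape for every operation in the contract.
        return isOneWayFlags.All(flag => flag)
            ? ChannelShape.Datagram
            : ChannelShape.RequestReply;
    }
}
```

With this sketch, a contract containing only Ping maps to the datagram shape, while a contract containing both Ping and Reverse maps to request reply, which matches what we saw in the debugger.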

Thanks to Jiang Jinnan, who helped me a lot in figuring out this problem and suggested the solution. For more information about this WCF MVP you can visit his blog (in Chinese).
Once we know the root cause, the solution is very simple. In the RequestContext.Reply method we check whether the message is null. If it is, this is a datagram message and we need to send a blank message back to the client to acknowledge it, so that the client does not keep waiting for a reply.
public override void Reply(Message message, TimeSpan timeout)
{
    if (message == null)
    {
        // this means this is a one way message
        // we just need to send a blank message back to the client to acknowledge it
        _bus.SendReply(string.Empty, false, _busMessageId);
    }
    else
    {
        // unbox the reply message to string
        var content = _parent.GetStringFromWcfMessage(message, _address);
        // send the reply into bus
        _bus.SendReply(content, false, _busMessageId);
    }
}
In RequestChannel.Request we also need to check whether the reply message content is blank. If it is, we do nothing, since we know this is a datagram acknowledgement message from the server side.
public Message Request(Message message, TimeSpan timeout)
{
    ThrowIfDisposedOrNotOpen();
    lock (_aLock)
    {
        // unbox the message into string that will be sent into the bus
        var content = GetStringFromWcfMessage(message, _remoteAddress);
        // send the message into bus
        var busMsgId = _bus.SendRequest(content, true, null);
        // waiting for the reply message arrive from the bus
        var replyMsg = _bus.Receive(false, busMsgId);
        if (string.IsNullOrWhiteSpace(replyMsg.Content))
        {
            // this means this is a one way channel acknowledge from server
            // we just return null and do nothing
            return null;
        }
        else
        {
            // box the message from the bus message content and return back
            var reply = GetWcfMessageFromString(replyMsg.Content);
            return reply;
        }
    }
}
Now when we execute our test console we can see that the IsOneWay = true method is invoked correctly.

Summary
In this post I implemented the datagram MEP channel factory and listener, and the related input and output channels. I also explained a potential problem when IsOneWay = true and IsOneWay = false methods are mixed in one service contract, and how to solve it.
We have now finished two MEPs in our transport extension: request reply and datagram. Both of them support server-side scaling out; as you can see, our requests can be handled by multiple services, based on the pulling approach.
In the next post I will demonstrate the most complex and difficult part, the duplex mode. I will implement the duplex channels over our message bus infrastructure with server-side scalability, while sticking to the same client instance.
You can download the source code here.
Hope this helps,
Shaun
All documents and related graphics, codes are provided "AS IS" without warranty of any kind.
Copyright © Shaun Ziyan Xu. This work is licensed under the Creative Commons License.
In the previous post I demonstrated how to implement a very basic transport extension over an in-memory message bus that supports the request reply MEP. At the end of that post I created a console application and established the service and client directly through the channel listener (on the server side) and the channel factory (on the client side). But this is not the WCF usage that we are familiar with. If you have been using WCF for a while, the common pattern is to create a ServiceHost on the server side and a ChannelFactory<T> on the client side. Then we use ChannelFactory<T>.CreateChannel to get an instance of the service contract interface so we can invoke the service remotely as if it were local.
Using “Add Service Reference” in Visual Studio or “svcutil” works the same way as using ChannelFactory<T> on the client side. In fact Visual Studio calls “svcutil” and generates a client proxy for us, which is similar to the service contract but with some connection functions.
Based on the transport extension we have finished so far, we can already use ServiceHost and ChannelFactory<T>. But since we have only implemented the synchronous methods, for example ReceiveRequest, we must tell the WCF infrastructure to use the synchronous methods.
<endpointBehaviors>
    <behavior name="">
        <synchronousReceive />
    </behavior>
</endpointBehaviors>
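If you prefer code over configuration, the same behavior can be attached programmatically. The sketch below assumes a .NET Framework project that references System.ServiceModel; the IPingContract interface, the BasicHttpBinding and the address are placeholders I made up so the snippet stands on its own, and in our case the endpoint would use the message bus binding instead.

```csharp
using System;
using System.ServiceModel;
using System.ServiceModel.Description;

// Hypothetical contract, used only to build an endpoint for this sketch.
[ServiceContract]
public interface IPingContract
{
    [OperationContract]
    string Ping(string content);
}

public static class SynchronousReceiveSketch
{
    public static ServiceEndpoint BuildEndpoint()
    {
        var contract = ContractDescription.GetContract(typeof(IPingContract));
        // The binding and address are stand-ins; any binding would do here.
        var endpoint = new ServiceEndpoint(
            contract,
            new BasicHttpBinding(),
            new EndpointAddress("http://localhost/sample"));

        // Code equivalent of the <synchronousReceive /> configuration element:
        // it asks the dispatcher to use the synchronous receive methods.
        endpoint.Behaviors.Add(new SynchronousReceiveBehavior());
        return endpoint;
    }
}
```

When hosting with ServiceHost, the same Behaviors.Add call can be made on the ServiceEndpoint returned by AddServiceEndpoint before the host is opened.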
Now I will describe how to amend our transport extension to support asynchronous channel creation and message operations so that we can use ServiceHost and ChannelFactory<T>, which is the way we are familiar with. Again, this is not mandatory for a transport extension, since it works well even if it only supports the synchronous mode. But it is very important for maximizing performance and usability.
Client: ChannelFactory and RequestChannel
On the client side the transport binding element creates the channel factory, which initializes a request channel object in its CreateChannel method. All of them are invoked synchronously and there are no related asynchronous methods available. This means that in WCF the client operation will always be invoked synchronously.
Although we can generate a client proxy class with asynchronous methods from “svcutil”, this only affects the calling thread. The channel factory and channel will still be created and invoked synchronously on their own thread.

It’s the same for the client-side channel, the RequestChannel in our case. It only supports the synchronous method for sending the request message. So on the client side we don’t need to modify anything.
Server: ChannelListener
The server side is a little more complex. In WCF one service can handle multiple requests at the same time by using different threads. This means that on the server side, besides the synchronous methods, we must implement the asynchronous ones to support concurrency.
As we know, the ChannelListener creates a server-side channel when a request comes in. In our current code we just initialize a new ReplyChannel instance and return it in the OnAcceptChannel method. But if we are going to use the ServiceHost class to establish the service we must implement its asynchronous method pair, OnBeginAcceptChannel and OnEndAcceptChannel. One implementation is very simple: we create a delegate that points to the OnAcceptChannel method we wrote before, and use its asynchronous invoke methods to implement OnBeginAcceptChannel and OnEndAcceptChannel. We define the delegate and assign OnAcceptChannel to it in the class constructor, then call BeginInvoke in OnBeginAcceptChannel and EndInvoke in OnEndAcceptChannel.
public class MessageBusReplyChannelListener : ChannelListenerBase<IReplyChannel>
{
    ... ...

    private delegate IReplyChannel AcceptChannelDelegate(TimeSpan timeout);
    private AcceptChannelDelegate _acceptChannelDelegate;

    ... ...

    public MessageBusReplyChannelListener(MessageBusTransportBindingElement transportElement, BindingContext context)
        : base(context.Binding)
    {
        _encoderFactory = (new TextMessageEncodingBindingElement()).CreateMessageEncoderFactory();
        _bufferManager = BufferManager.CreateBufferManager(transportElement.MaxBufferPoolSize, int.MaxValue);
        _uri = new Uri(context.ListenUriBaseAddress, context.ListenUriRelativeAddress);
        _bus = transportElement.Bus;

        _acceptChannelDelegate = OnAcceptChannel;
    }

    ... ...

    protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state)
    {
        return _acceptChannelDelegate.BeginInvoke(timeout, callback, state);
    }

    protected override IReplyChannel OnEndAcceptChannel(IAsyncResult result)
    {
        return _acceptChannelDelegate.EndInvoke(result);
    }

    ... ...
}
This is OK and works when using the ServiceHost, and we will see that we use the same approach when we amend the ReplyChannel part. But this is not the best way of implementing the ChannelListener. If we do it like this, every incoming request causes the service to try to create a new ReplyChannel, regardless of whether the server has enough computing and memory resources available. In WCF we can define the throughput on a server so that all client requests are queued on the server, and the server creates the channels based on its availability. Microsoft gives us a very good class, named InputQueue<T>, that implements this queue with throughput control as well as asynchronous enqueue, dequeue and dispatch methods.
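To make the queueing idea concrete, here is a minimal sketch of a bounded producer and consumer queue. It is not InputQueue<T> and has none of its asynchronous reader and waiter machinery; it uses BlockingCollection<T> from the BCL as a stand-in, and the QueueSketch class, the Run method and the capacity of 2 are assumptions made only for this illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical illustration; not part of the transport extension.
public static class QueueSketch
{
    // Pushes five requests through a queue that holds at most two pending
    // items, and returns how many requests the consumer handled.
    public static int Run()
    {
        using (var pending = new BlockingCollection<string>(2))
        {
            int handled = 0;

            // The consumer plays the server: it takes the next request
            // only when it is ready to handle one.
            var consumer = Task.Run(() =>
            {
                foreach (var request in pending.GetConsumingEnumerable())
                {
                    handled++;
                }
            });

            // The producer plays the clients: Add blocks while the queue is
            // full, so requests wait instead of overloading the server.
            for (int i = 0; i < 5; i++)
            {
                pending.Add("request-" + i);
            }

            pending.CompleteAdding();
            consumer.Wait();
            return handled;
        }
    }
}
```

The design point is the same one the listener needs: incoming work waits in a queue, and the pace is set by the side that consumes it rather than by the side that produces it.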
InputQueue<T> is not publicly available in the BCL of the .NET Framework, although it’s very useful. But there are several ways to get it. One is to use a decompiler, such as ILSpy, to get its source code from System.ServiceModel.dll. It’s an internal class inside the System.ServiceModel.Channels namespace.
Alternatively, you can download the WCF samples and, after installing them, find this class in the folder \WCF\Extensibility\Transport\Udp\CS\UdpTransport\InputQueue.cs. Below is the source code of the class that I’m going to use in our example.
1: // ItemDequeuedCallback is called as an item is dequeued from the InputQueue. The
2: // InputQueue lock is not held during the callback. However, the user code is
3: // not notified of the item being available until the callback returns. If you
4: // are not sure if the callback blocks for a long time, then first call
5: // IOThreadScheduler.ScheduleCallback to get to a "safe" thread.
6: delegate void ItemDequeuedCallback();
7:
8: /// <summary>
9: /// Handles asynchronous interactions between producers and consumers.
10: /// Producers can dispatch available data to the input queue,
11: /// where it is dispatched to a waiting consumer or stored until a
12: /// consumer becomes available. Consumers can synchronously or asynchronously
13: /// request data from the queue, which is returned when data becomes
14: /// available.
15: /// </summary>
16: /// <typeparam name="T">The concrete type of the consumer objects that are waiting for data.</typeparam>
17: internal class InputQueue<T> : IDisposable where T : class
18: {
19: //Stores items that are waiting to be accessed.
20: private ItemQueue itemQueue;
21:
22: //Each IQueueReader represents some consumer that is waiting for
23: //items to appear in the queue. The readerQueue stores them
24: //in an ordered list so consumers get serviced in a FIFO manner.
25: private Queue<IQueueReader> readerQueue;
26:
27: //Each IQueueWaiter represents some waiter that is waiting for
28: //items to appear in the queue. When any item appears, all
29: //waiters are signaled.
30: private List<IQueueWaiter> waiterList;
31:
32: private static WaitCallback onInvokeDequeuedCallback;
33: private static WaitCallback onDispatchCallback;
34: private static WaitCallback completeOutstandingReadersCallback;
35: private static WaitCallback completeWaitersFalseCallback;
36: private static WaitCallback completeWaitersTrueCallback;
37:
38: //Represents the current state of the InputQueue.
39: //as it transitions through its lifecycle.
40: QueueState queueState;
41: enum QueueState
42: {
43: Open,
44: Shutdown,
45: Closed
46: }
47:
48: public InputQueue()
49: {
50: this.itemQueue = new ItemQueue();
51: this.readerQueue = new Queue<IQueueReader>();
52: this.waiterList = new List<IQueueWaiter>();
53: this.queueState = QueueState.Open;
54: }
55:
56: public int PendingCount
57: {
58: get
59: {
60: lock (ThisLock)
61: {
62: return itemQueue.ItemCount;
63: }
64: }
65: }
66:
67: object ThisLock
68: {
69: get { return itemQueue; }
70: }
71:
72: public IAsyncResult BeginDequeue(TimeSpan timeout, AsyncCallback callback, object state)
73: {
74: Item item = default(Item);
75:
76: lock (ThisLock)
77: {
78: if (queueState == QueueState.Open)
79: {
80: if (itemQueue.HasAvailableItem)
81: {
82: item = itemQueue.DequeueAvailableItem();
83: }
84: else
85: {
86: AsyncQueueReader reader = new AsyncQueueReader(this, timeout, callback, state);
87: readerQueue.Enqueue(reader);
88: return reader;
89: }
90: }
91: else if (queueState == QueueState.Shutdown)
92: {
93: if (itemQueue.HasAvailableItem)
94: {
95: item = itemQueue.DequeueAvailableItem();
96: }
97: else if (itemQueue.HasAnyItem)
98: {
99: AsyncQueueReader reader = new AsyncQueueReader(this, timeout, callback, state);
100: readerQueue.Enqueue(reader);
101: return reader;
102: }
103: }
104: }
105:
106: InvokeDequeuedCallback(item.DequeuedCallback);
107: return new TypedCompletedAsyncResult<T>(item.GetValue(), callback, state);
108: }
109:
110: public IAsyncResult BeginWaitForItem(TimeSpan timeout, AsyncCallback callback, object state)
111: {
112: lock (ThisLock)
113: {
114: if (queueState == QueueState.Open)
115: {
116: if (!itemQueue.HasAvailableItem)
117: {
118: AsyncQueueWaiter waiter = new AsyncQueueWaiter(timeout, callback, state);
119: waiterList.Add(waiter);
120: return waiter;
121: }
122: }
123: else if (queueState == QueueState.Shutdown)
124: {
125: if (!itemQueue.HasAvailableItem && itemQueue.HasAnyItem)
126: {
127: AsyncQueueWaiter waiter = new AsyncQueueWaiter(timeout, callback, state);
128: waiterList.Add(waiter);
129: return waiter;
130: }
131: }
132: }
133:
134: return new TypedCompletedAsyncResult<bool>(true, callback, state);
135: }
136:
137: static void CompleteOutstandingReadersCallback(object state)
138: {
139: IQueueReader[] outstandingReaders = (IQueueReader[])state;
140:
141: for (int i = 0; i < outstandingReaders.Length; i++)
142: {
143: outstandingReaders[i].Set(default(Item));
144: }
145: }
146:
147: static void CompleteWaitersFalseCallback(object state)
148: {
149: CompleteWaiters(false, (IQueueWaiter[])state);
150: }
151:
152: static void CompleteWaitersTrueCallback(object state)
153: {
154: CompleteWaiters(true, (IQueueWaiter[])state);
155: }
156:
157: static void CompleteWaiters(bool itemAvailable, IQueueWaiter[] waiters)
158: {
159: for (int i = 0; i < waiters.Length; i++)
160: {
161: waiters[i].Set(itemAvailable);
162: }
163: }
164:
165: static void CompleteWaitersLater(bool itemAvailable, IQueueWaiter[] waiters)
166: {
167: if (itemAvailable)
168: {
169: if (completeWaitersTrueCallback == null)
170: completeWaitersTrueCallback = new WaitCallback(CompleteWaitersTrueCallback);
171:
172: ThreadPool.QueueUserWorkItem(completeWaitersTrueCallback, waiters);
173: }
174: else
175: {
176: if (completeWaitersFalseCallback == null)
177: completeWaitersFalseCallback = new WaitCallback(CompleteWaitersFalseCallback);
178:
179: ThreadPool.QueueUserWorkItem(completeWaitersFalseCallback, waiters);
180: }
181: }
182:
183: void GetWaiters(out IQueueWaiter[] waiters)
184: {
185: if (waiterList.Count > 0)
186: {
187: waiters = waiterList.ToArray();
188: waiterList.Clear();
189: }
190: else
191: {
192: waiters = null;
193: }
194: }
195:
196: public void Close()
197: {
198: ((IDisposable)this).Dispose();
199: }
200:
201: public void Shutdown()
202: {
203: IQueueReader[] outstandingReaders = null;
204: lock (ThisLock)
205: {
206: if (queueState == QueueState.Shutdown)
207: return;
208:
209: if (queueState == QueueState.Closed)
210: return;
211:
212: this.queueState = QueueState.Shutdown;
213:
214: if (readerQueue.Count > 0 && this.itemQueue.ItemCount == 0)
215: {
216: outstandingReaders = new IQueueReader[readerQueue.Count];
217: readerQueue.CopyTo(outstandingReaders, 0);
218: readerQueue.Clear();
219: }
220: }
221:
222: if (outstandingReaders != null)
223: {
224: for (int i = 0; i < outstandingReaders.Length; i++)
225: {
226: outstandingReaders[i].Set(new Item((Exception)null, null));
227: }
228: }
229: }
230:
231: public T Dequeue(TimeSpan timeout)
232: {
233: T value;
234:
235: if (!this.Dequeue(timeout, out value))
236: {
237: throw new TimeoutException(string.Format("Dequeue timed out in {0}.", timeout));
238: }
239:
240: return value;
241: }
242:
243: public bool Dequeue(TimeSpan timeout, out T value)
244: {
245: WaitQueueReader reader = null;
246: Item item = new Item();
247:
248: lock (ThisLock)
249: {
250: if (queueState == QueueState.Open)
251: {
252: if (itemQueue.HasAvailableItem)
253: {
254: item = itemQueue.DequeueAvailableItem();
255: }
256: else
257: {
258: reader = new WaitQueueReader(this);
259: readerQueue.Enqueue(reader);
260: }
261: }
262: else if (queueState == QueueState.Shutdown)
263: {
264: if (itemQueue.HasAvailableItem)
265: {
266: item = itemQueue.DequeueAvailableItem();
267: }
268: else if (itemQueue.HasAnyItem)
269: {
270: reader = new WaitQueueReader(this);
271: readerQueue.Enqueue(reader);
272: }
273: else
274: {
275: value = default(T);
276: return true;
277: }
278: }
279: else // queueState == QueueState.Closed
280: {
281: value = default(T);
282: return true;
283: }
284: }
285:
286: if (reader != null)
287: {
288: return reader.Wait(timeout, out value);
289: }
290: else
291: {
292: InvokeDequeuedCallback(item.DequeuedCallback);
293: value = item.GetValue();
294: return true;
295: }
296: }
297:
298: public void Dispose()
299: {
300: Dispose(true);
301:
302: GC.SuppressFinalize(this);
303: }
304:
305: protected void Dispose(bool disposing)
306: {
307: if (disposing)
308: {
309: bool dispose = false;
310:
311: lock (ThisLock)
312: {
313: if (queueState != QueueState.Closed)
314: {
315: queueState = QueueState.Closed;
316: dispose = true;
317: }
318: }
319:
320: if (dispose)
321: {
322: while (readerQueue.Count > 0)
323: {
324: IQueueReader reader = readerQueue.Dequeue();
325: reader.Set(default(Item));
326: }
327:
328: while (itemQueue.HasAnyItem)
329: {
330: Item item = itemQueue.DequeueAnyItem();
331: item.Dispose();
332: InvokeDequeuedCallback(item.DequeuedCallback);
333: }
334: }
335: }
336: }
337:
338: public void Dispatch()
339: {
340: IQueueReader reader = null;
341: Item item = new Item();
342: IQueueReader[] outstandingReaders = null;
343: IQueueWaiter[] waiters = null;
344: bool itemAvailable = true;
345:
346: lock (ThisLock)
347: {
348: itemAvailable = !((queueState == QueueState.Closed) || (queueState == QueueState.Shutdown));
349: this.GetWaiters(out waiters);
350:
351: if (queueState != QueueState.Closed)
352: {
353: itemQueue.MakePendingItemAvailable();
354:
355: if (readerQueue.Count > 0)
356: {
357: item = itemQueue.DequeueAvailableItem();
358: reader = readerQueue.Dequeue();
359:
360: if (queueState == QueueState.Shutdown && readerQueue.Count > 0 && itemQueue.ItemCount == 0)
361: {
362: outstandingReaders = new IQueueReader[readerQueue.Count];
363: readerQueue.CopyTo(outstandingReaders, 0);
364: readerQueue.Clear();
365:
366: itemAvailable = false;
367: }
368: }
369: }
370: }
371:
372: if (outstandingReaders != null)
373: {
374: if (completeOutstandingReadersCallback == null)
375: completeOutstandingReadersCallback = new WaitCallback(CompleteOutstandingReadersCallback);
376:
377: ThreadPool.QueueUserWorkItem(completeOutstandingReadersCallback, outstandingReaders);
378: }
379:
380: if (waiters != null)
381: {
382: CompleteWaitersLater(itemAvailable, waiters);
383: }
384:
385: if (reader != null)
386: {
387: InvokeDequeuedCallback(item.DequeuedCallback);
388: reader.Set(item);
389: }
390: }
391:
392: //Ends an asynchronous Dequeue operation.
393: public T EndDequeue(IAsyncResult result)
394: {
395: T value;
396:
397: if (!this.EndDequeue(result, out value))
398: {
399: throw new TimeoutException("Asynchronous Dequeue operation timed out.");
400: }
401:
402: return value;
403: }
404:
405: public bool EndDequeue(IAsyncResult result, out T value)
406: {
407: TypedCompletedAsyncResult<T> typedResult = result as TypedCompletedAsyncResult<T>;
408:
409: if (typedResult != null)
410: {
411: value = TypedCompletedAsyncResult<T>.End(result);
412: return true;
413: }
414:
415: return AsyncQueueReader.End(result, out value);
416: }
417:
418: public bool EndWaitForItem(IAsyncResult result)
419: {
420: TypedCompletedAsyncResult<bool> typedResult = result as TypedCompletedAsyncResult<bool>;
421: if (typedResult != null)
422: {
423: return TypedCompletedAsyncResult<bool>.End(result);
424: }
425:
426: return AsyncQueueWaiter.End(result);
427: }
428:
429: public void EnqueueAndDispatch(T item)
430: {
431: EnqueueAndDispatch(item, null);
432: }
433:
434: public void EnqueueAndDispatch(T item, ItemDequeuedCallback dequeuedCallback)
435: {
436: EnqueueAndDispatch(item, dequeuedCallback, true);
437: }
438:
439: public void EnqueueAndDispatch(Exception exception, ItemDequeuedCallback dequeuedCallback, bool canDispatchOnThisThread)
440: {
441: Debug.Assert(exception != null, "exception parameter should not be null");
442: EnqueueAndDispatch(new Item(exception, dequeuedCallback), canDispatchOnThisThread);
443: }
444:
445: public void EnqueueAndDispatch(T item, ItemDequeuedCallback dequeuedCallback, bool canDispatchOnThisThread)
446: {
447: Debug.Assert(item != null, "item parameter should not be null");
448: EnqueueAndDispatch(new Item(item, dequeuedCallback), canDispatchOnThisThread);
449: }
450:
451: void EnqueueAndDispatch(Item item, bool canDispatchOnThisThread)
452: {
453: bool disposeItem = false;
454: IQueueReader reader = null;
455: bool dispatchLater = false;
456: IQueueWaiter[] waiters = null;
457: bool itemAvailable = true;
458:
459: lock (ThisLock)
460: {
461: itemAvailable = !((queueState == QueueState.Closed) || (queueState == QueueState.Shutdown));
462: this.GetWaiters(out waiters);
463:
464: if (queueState == QueueState.Open)
465: {
466: if (canDispatchOnThisThread)
467: {
468: if (readerQueue.Count == 0)
469: {
470: itemQueue.EnqueueAvailableItem(item);
471: }
472: else
473: {
474: reader = readerQueue.Dequeue();
475: }
476: }
477: else
478: {
479: if (readerQueue.Count == 0)
480: {
481: itemQueue.EnqueueAvailableItem(item);
482: }
483: else
484: {
485: itemQueue.EnqueuePendingItem(item);
486: dispatchLater = true;
487: }
488: }
489: }
490: else // queueState == QueueState.Closed || queueState == QueueState.Shutdown
491: {
492: disposeItem = true;
493: }
494: }
495:
496: if (waiters != null)
497: {
498: if (canDispatchOnThisThread)
499: {
500: CompleteWaiters(itemAvailable, waiters);
501: }
502: else
503: {
504: CompleteWaitersLater(itemAvailable, waiters);
505: }
506: }
507:
508: if (reader != null)
509: {
510: InvokeDequeuedCallback(item.DequeuedCallback);
511: reader.Set(item);
512: }
513:
514: if (dispatchLater)
515: {
516: if (onDispatchCallback == null)
517: {
518: onDispatchCallback = new WaitCallback(OnDispatchCallback);
519: }
520:
521: ThreadPool.QueueUserWorkItem(onDispatchCallback, this);
522: }
523: else if (disposeItem)
524: {
525: InvokeDequeuedCallback(item.DequeuedCallback);
526: item.Dispose();
527: }
528: }
529:
530: public bool EnqueueWithoutDispatch(T item, ItemDequeuedCallback dequeuedCallback)
531: {
532: Debug.Assert(item != null, "EnqueueWithoutDispatch: item parameter should not be null");
533: return EnqueueWithoutDispatch(new Item(item, dequeuedCallback));
534: }
535:
536: public bool EnqueueWithoutDispatch(Exception exception, ItemDequeuedCallback dequeuedCallback)
537: {
538: Debug.Assert(exception != null, "EnqueueWithoutDispatch: exception parameter should not be null");
539: return EnqueueWithoutDispatch(new Item(exception, dequeuedCallback));
540: }
541:
542: // This does not block, however, Dispatch() must be called later if this function
543: // returns true.
544: bool EnqueueWithoutDispatch(Item item)
545: {
546: lock (ThisLock)
547: {
548: // Open
549: if (queueState != QueueState.Closed && queueState != QueueState.Shutdown)
550: {
551: if (readerQueue.Count == 0)
552: {
553: itemQueue.EnqueueAvailableItem(item);
554: return false;
555: }
556: else
557: {
558: itemQueue.EnqueuePendingItem(item);
559: return true;
560: }
561: }
562: }
563:
564: item.Dispose();
565: InvokeDequeuedCallbackLater(item.DequeuedCallback);
566: return false;
567: }
568:
569: static void OnDispatchCallback(object state)
570: {
571: ((InputQueue<T>)state).Dispatch();
572: }
573:
574: static void InvokeDequeuedCallbackLater(ItemDequeuedCallback dequeuedCallback)
575: {
576: if (dequeuedCallback != null)
577: {
578: if (onInvokeDequeuedCallback == null)
579: {
580: onInvokeDequeuedCallback = OnInvokeDequeuedCallback;
581: }
582:
583: ThreadPool.QueueUserWorkItem(onInvokeDequeuedCallback, dequeuedCallback);
584: }
585: }
586:
587: static void InvokeDequeuedCallback(ItemDequeuedCallback dequeuedCallback)
588: {
589: if (dequeuedCallback != null)
590: {
591: dequeuedCallback();
592: }
593: }
594:
595: static void OnInvokeDequeuedCallback(object state)
596: {
597: ItemDequeuedCallback dequeuedCallback = (ItemDequeuedCallback)state;
598: dequeuedCallback();
599: }
600:
601: bool RemoveReader(IQueueReader reader)
602: {
603: lock (ThisLock)
604: {
605: if (queueState == QueueState.Open || queueState == QueueState.Shutdown)
606: {
607: bool removed = false;
608:
609: for (int i = readerQueue.Count; i > 0; i--)
610: {
611: IQueueReader temp = readerQueue.Dequeue();
612: if (Object.ReferenceEquals(temp, reader))
613: {
614: removed = true;
615: }
616: else
617: {
618: readerQueue.Enqueue(temp);
619: }
620: }
621:
622: return removed;
623: }
624: }
625:
626: return false;
627: }
628:
629: public bool WaitForItem(TimeSpan timeout)
630: {
631: WaitQueueWaiter waiter = null;
632: bool itemAvailable = false;
633:
634: lock (ThisLock)
635: {
636: if (queueState == QueueState.Open)
637: {
638: if (itemQueue.HasAvailableItem)
639: {
640: itemAvailable = true;
641: }
642: else
643: {
644: waiter = new WaitQueueWaiter();
645: waiterList.Add(waiter);
646: }
647: }
648: else if (queueState == QueueState.Shutdown)
649: {
650: if (itemQueue.HasAvailableItem)
651: {
652: itemAvailable = true;
653: }
654: else if (itemQueue.HasAnyItem)
655: {
656: waiter = new WaitQueueWaiter();
657: waiterList.Add(waiter);
658: }
659: else
660: {
661: return false;
662: }
663: }
664: else // queueState == QueueState.Closed
665: {
666: return true;
667: }
668: }
669:
670: if (waiter != null)
671: {
672: return waiter.Wait(timeout);
673: }
674: else
675: {
676: return itemAvailable;
677: }
678: }
679:
680: interface IQueueReader
681: {
682: void Set(Item item);
683: }
684:
685: interface IQueueWaiter
686: {
687: void Set(bool itemAvailable);
688: }
689:
690: class WaitQueueReader : IQueueReader
691: {
692: Exception exception;
693: InputQueue<T> inputQueue;
694: T item;
695: ManualResetEvent waitEvent;
696: object thisLock = new object();
697:
698: public WaitQueueReader(InputQueue<T> inputQueue)
699: {
700: this.inputQueue = inputQueue;
701: waitEvent = new ManualResetEvent(false);
702: }
703:
704: object ThisLock
705: {
706: get
707: {
708: return this.thisLock;
709: }
710: }
711:
712: public void Set(Item item)
713: {
714: lock (ThisLock)
715: {
716: Debug.Assert(this.item == null, "InputQueue.WaitQueueReader.Set: (this.item == null)");
717: Debug.Assert(this.exception == null, "InputQueue.WaitQueueReader.Set: (this.exception == null)");
718:
719: this.exception = item.Exception;
720: this.item = item.Value;
721: waitEvent.Set();
722: }
723: }
724:
725: public bool Wait(TimeSpan timeout, out T value)
726: {
727: bool isSafeToClose = false;
728: try
729: {
730: if (timeout == TimeSpan.MaxValue)
731: {
732: waitEvent.WaitOne();
733: }
734: else if (!waitEvent.WaitOne(timeout, false))
735: {
736: if (this.inputQueue.RemoveReader(this))
737: {
738: value = default(T);
739: isSafeToClose = true;
740: return false;
741: }
742: else
743: {
744: waitEvent.WaitOne();
745: }
746: }
747:
748: isSafeToClose = true;
749: }
750: finally
751: {
752: if (isSafeToClose)
753: {
754: waitEvent.Close();
755: }
756: }
757:
758: value = item;
759: return true;
760: }
761: }
762:
763: class AsyncQueueReader : AsyncResult, IQueueReader
764: {
765: static TimerCallback timerCallback = new TimerCallback(AsyncQueueReader.TimerCallback);
766:
767: bool expired;
768: InputQueue<T> inputQueue;
769: T item;
770: Timer timer;
771:
772: public AsyncQueueReader(InputQueue<T> inputQueue, TimeSpan timeout, AsyncCallback callback, object state)
773: : base(callback, state)
774: {
775: this.inputQueue = inputQueue;
776: if (timeout != TimeSpan.MaxValue)
777: {
778: this.timer = new Timer(timerCallback, this, timeout, TimeSpan.FromMilliseconds(-1));
779: }
780: }
781:
782: public static bool End(IAsyncResult result, out T value)
783: {
784: AsyncQueueReader readerResult = AsyncResult.End<AsyncQueueReader>(result);
785:
786: if (readerResult.expired)
787: {
788: value = default(T);
789: return false;
790: }
791: else
792: {
793: value = readerResult.item;
794: return true;
795: }
796: }
797:
798: static void TimerCallback(object state)
799: {
800: AsyncQueueReader thisPtr = (AsyncQueueReader)state;
801: if (thisPtr.inputQueue.RemoveReader(thisPtr))
802: {
803: thisPtr.expired = true;
804: thisPtr.Complete(false);
805: }
806: }
807:
808: public void Set(Item item)
809: {
810: this.item = item.Value;
811: if (this.timer != null)
812: {
813: this.timer.Change(-1, -1);
814: }
815: Complete(false, item.Exception);
816: }
817: }
818:
819: struct Item
820: {
821: T value;
822: Exception exception;
823: ItemDequeuedCallback dequeuedCallback;
824:
825: public Item(T value, ItemDequeuedCallback dequeuedCallback)
826: : this(value, null, dequeuedCallback)
827: {
828: }
829:
830: public Item(Exception exception, ItemDequeuedCallback dequeuedCallback)
831: : this(null, exception, dequeuedCallback)
832: {
833: }
834:
835: Item(T value, Exception exception, ItemDequeuedCallback dequeuedCallback)
836: {
837: this.value = value;
838: this.exception = exception;
839: this.dequeuedCallback = dequeuedCallback;
840: }
841:
842: public Exception Exception
843: {
844: get { return this.exception; }
845: }
846:
847: public T Value
848: {
849: get { return value; }
850: }
851:
852: public ItemDequeuedCallback DequeuedCallback
853: {
854: get { return dequeuedCallback; }
855: }
856:
857: public void Dispose()
858: {
859: if (value != null)
860: {
861: if (value is IDisposable)
862: {
863: ((IDisposable)value).Dispose();
864: }
865: else if (value is ICommunicationObject)
866: {
867: ((ICommunicationObject)value).Abort();
868: }
869: }
870: }
871:
872: public T GetValue()
873: {
874: if (this.exception != null)
875: {
876: throw this.exception;
877: }
878:
879: return this.value;
880: }
881: }
882:
883: class WaitQueueWaiter : IQueueWaiter
884: {
885: bool itemAvailable;
886: ManualResetEvent waitEvent;
887: object thisLock = new object();
888:
889: public WaitQueueWaiter()
890: {
891: waitEvent = new ManualResetEvent(false);
892: }
893:
894: object ThisLock
895: {
896: get
897: {
898: return this.thisLock;
899: }
900: }
901:
902: public void Set(bool itemAvailable)
903: {
904: lock (ThisLock)
905: {
906: this.itemAvailable = itemAvailable;
907: waitEvent.Set();
908: }
909: }
910:
911: public bool Wait(TimeSpan timeout)
912: {
913: if (timeout == TimeSpan.MaxValue)
914: {
915: waitEvent.WaitOne();
916: }
917: else if (!waitEvent.WaitOne(timeout, false))
918: {
919: return false;
920: }
921:
922: return this.itemAvailable;
923: }
924: }
925:
926: class AsyncQueueWaiter : AsyncResult, IQueueWaiter
927: {
928: static TimerCallback timerCallback = new TimerCallback(AsyncQueueWaiter.TimerCallback);
929: Timer timer;
930: bool itemAvailable;
931: object thisLock = new object();
932:
933: public AsyncQueueWaiter(TimeSpan timeout, AsyncCallback callback, object state)
934: : base(callback, state)
935: {
936: if (timeout != TimeSpan.MaxValue)
937: {
938: this.timer = new Timer(timerCallback, this, timeout, TimeSpan.FromMilliseconds(-1));
939: }
940: }
941:
942: object ThisLock
943: {
944: get
945: {
946: return this.thisLock;
947: }
948: }
949:
950: public static bool End(IAsyncResult result)
951: {
952: AsyncQueueWaiter waiterResult = AsyncResult.End<AsyncQueueWaiter>(result);
953: return waiterResult.itemAvailable;
954: }
955:
956: static void TimerCallback(object state)
957: {
958: AsyncQueueWaiter thisPtr = (AsyncQueueWaiter)state;
959: thisPtr.Complete(false);
960: }
961:
962: public void Set(bool itemAvailable)
963: {
964: bool timely;
965:
966: lock (ThisLock)
967: {
968: timely = (this.timer == null) || this.timer.Change(-1, -1);
969: this.itemAvailable = itemAvailable;
970: }
971:
972: if (timely)
973: {
974: Complete(false);
975: }
976: }
977: }
978:
979: class ItemQueue
980: {
981: Item[] items;
982: int head;
983: int pendingCount;
984: int totalCount;
985:
986: public ItemQueue()
987: {
988: items = new Item[1];
989: }
990:
991: public Item DequeueAvailableItem()
992: {
993: if (totalCount == pendingCount)
994: {
995: Debug.Assert(false, "ItemQueue does not contain any available items");
996: throw new Exception("Internal Error");
997: }
998: return DequeueItemCore();
999: }
1000:
1001: public Item DequeueAnyItem()
1002: {
1003: if (pendingCount == totalCount)
1004: pendingCount--;
1005: return DequeueItemCore();
1006: }
1007:
1008: void EnqueueItemCore(Item item)
1009: {
1010: if (totalCount == items.Length)
1011: {
1012: Item[] newItems = new Item[items.Length * 2];
1013: for (int i = 0; i < totalCount; i++)
1014: newItems[i] = items[(head + i) % items.Length];
1015: head = 0;
1016: items = newItems;
1017: }
1018: int tail = (head + totalCount) % items.Length;
1019: items[tail] = item;
1020: totalCount++;
1021: }
1022:
1023: Item DequeueItemCore()
1024: {
1025: if (totalCount == 0)
1026: {
1027: Debug.Assert(false, "ItemQueue does not contain any items");
1028: throw new Exception("Internal Error");
1029: }
1030: Item item = items[head];
1031: items[head] = new Item();
1032: totalCount--;
1033: head = (head + 1) % items.Length;
1034: return item;
1035: }
1036:
1037: public void EnqueuePendingItem(Item item)
1038: {
1039: EnqueueItemCore(item);
1040: pendingCount++;
1041: }
1042:
1043: public void EnqueueAvailableItem(Item item)
1044: {
1045: EnqueueItemCore(item);
1046: }
1047:
1048: public void MakePendingItemAvailable()
1049: {
1050: if (pendingCount == 0)
1051: {
1052: Debug.Assert(false, "ItemQueue does not contain any pending items");
1053: throw new Exception("Internal Error");
1054: }
1055: pendingCount--;
1056: }
1057:
1058: public bool HasAvailableItem
1059: {
1060: get { return totalCount > pendingCount; }
1061: }
1062:
1063: public bool HasAnyItem
1064: {
1065: get { return totalCount > 0; }
1066: }
1067:
1068: public int ItemCount
1069: {
1070: get { return totalCount; }
1071: }
1072: }
1073: }
To work with this InputQueue<T> we also need an asynchronous result class, which implements the IAsyncResult interface.
    /// <summary>
    /// A generic base class for IAsyncResult implementations
    /// that wraps a ManualResetEvent.
    /// </summary>
    abstract class AsyncResult : IAsyncResult
    {
        AsyncCallback callback;
        object state;
        bool completedSynchronously;
        bool endCalled;
        Exception exception;
        bool isCompleted;
        ManualResetEvent manualResetEvent;
        object thisLock;

        protected AsyncResult(AsyncCallback callback, object state)
        {
            this.callback = callback;
            this.state = state;
            this.thisLock = new object();
        }

        public object AsyncState
        {
            get
            {
                return state;
            }
        }

        public WaitHandle AsyncWaitHandle
        {
            get
            {
                if (manualResetEvent != null)
                {
                    return manualResetEvent;
                }

                lock (ThisLock)
                {
                    if (manualResetEvent == null)
                    {
                        manualResetEvent = new ManualResetEvent(isCompleted);
                    }
                }

                return manualResetEvent;
            }
        }

        public bool CompletedSynchronously
        {
            get
            {
                return completedSynchronously;
            }
        }

        public bool IsCompleted
        {
            get
            {
                return isCompleted;
            }
        }

        object ThisLock
        {
            get
            {
                return this.thisLock;
            }
        }

        // Call this version of complete when your asynchronous operation is complete. This will update the state
        // of the operation and notify the callback.
        protected void Complete(bool completedSynchronously)
        {
            if (isCompleted)
            {
                // It is a bug to call Complete twice.
                throw new InvalidOperationException("Cannot call Complete twice");
            }

            this.completedSynchronously = completedSynchronously;

            if (completedSynchronously)
            {
                // If we completedSynchronously, then there is no chance that the manualResetEvent was created so
                // we do not need to worry about a race condition.
                Debug.Assert(this.manualResetEvent == null, "No ManualResetEvent should be created for a synchronous AsyncResult.");
                this.isCompleted = true;
            }
            else
            {
                lock (ThisLock)
                {
                    this.isCompleted = true;
                    if (this.manualResetEvent != null)
                    {
                        this.manualResetEvent.Set();
                    }
                }
            }

            // If the callback throws, there is a bug in the callback implementation
            if (callback != null)
            {
                callback(this);
            }
        }

        // Call this version of complete if you raise an exception during processing. In addition to notifying
        // the callback, it will capture the exception and store it to be thrown during AsyncResult.End.
        protected void Complete(bool completedSynchronously, Exception exception)
        {
            this.exception = exception;
            Complete(completedSynchronously);
        }

        // End should be called when the End function for the asynchronous operation is complete. It
        // ensures the asynchronous operation is complete, and does some common validation.
        protected static TAsyncResult End<TAsyncResult>(IAsyncResult result)
            where TAsyncResult : AsyncResult
        {
            if (result == null)
            {
                throw new ArgumentNullException("result");
            }

            TAsyncResult asyncResult = result as TAsyncResult;

            if (asyncResult == null)
            {
                throw new ArgumentException("Invalid async result.", "result");
            }

            if (asyncResult.endCalled)
            {
                throw new InvalidOperationException("Async object already ended.");
            }

            asyncResult.endCalled = true;

            if (!asyncResult.isCompleted)
            {
                asyncResult.AsyncWaitHandle.WaitOne();
            }

            if (asyncResult.manualResetEvent != null)
            {
                asyncResult.manualResetEvent.Close();
            }

            if (asyncResult.exception != null)
            {
                throw asyncResult.exception;
            }

            return asyncResult;
        }
    }

    // An AsyncResult that completes as soon as it is instantiated.
    class CompletedAsyncResult : AsyncResult
    {
        public CompletedAsyncResult(AsyncCallback callback, object state)
            : base(callback, state)
        {
            Complete(true);
        }

        public static void End(IAsyncResult result)
        {
            AsyncResult.End<CompletedAsyncResult>(result);
        }
    }

    // A strongly typed AsyncResult
    abstract class TypedAsyncResult<T> : AsyncResult
    {
        T data;

        protected TypedAsyncResult(AsyncCallback callback, object state)
            : base(callback, state)
        {
        }

        public T Data
        {
            get { return data; }
        }

        protected void Complete(T data, bool completedSynchronously)
        {
            this.data = data;
            Complete(completedSynchronously);
        }

        public static T End(IAsyncResult result)
        {
            TypedAsyncResult<T> typedResult = AsyncResult.End<TypedAsyncResult<T>>(result);
            return typedResult.Data;
        }
    }

    // A strongly typed AsyncResult that completes as soon as it is instantiated.
    class TypedCompletedAsyncResult<T> : TypedAsyncResult<T>
    {
        public TypedCompletedAsyncResult(T data, AsyncCallback callback, object state)
            : base(callback, state)
        {
            Complete(data, true);
        }

        public new static T End(IAsyncResult result)
        {
            TypedCompletedAsyncResult<T> completedResult = result as TypedCompletedAsyncResult<T>;
            if (completedResult == null)
            {
                throw new ArgumentException("Invalid async result.", "result");
            }

            return TypedAsyncResult<T>.End(completedResult);
        }
    }
I’m not going to go any deeper into the InputQueue<T> and AsyncResult classes here. If you want to know more about them, there’s a very good blog post you can use as a reference.
With this new scaffolding in place, let’s amend our ChannelListener. First we define a field of type InputQueue<T>, where the type parameter is IReplyChannel. This means all reply channels will be queued in this class. We also need a field for the channel currently in use and an object for locking.
    private readonly InputQueue<IReplyChannel> _channelQueue;
    private readonly object _currentChannelLock;
    private IReplyChannel _currentChannel;
And initialize them in the constructor.
    public MessageBusReplyChannelListener(MessageBusTransportBindingElement transportElement, BindingContext context)
        : base(context.Binding)
    {
        _encoderFactory = (new TextMessageEncodingBindingElement()).CreateMessageEncoderFactory();
        _bufferManager = BufferManager.CreateBufferManager(transportElement.MaxBufferPoolSize, int.MaxValue);
        _uri = new Uri(context.ListenUriBaseAddress, context.ListenUriRelativeAddress);
        _bus = transportElement.Bus;

        _channelQueue = new InputQueue<IReplyChannel>();
        _currentChannelLock = new object();
        _currentChannel = null;
    }
Since the InputQueue<T> needs to run some cleanup when it terminates, we have to override the OnAbort and OnClose virtual methods to close the InputQueue<T>, and OnClosed to release the buffer manager and the bus.
    protected override void OnAbort()
    {
        try
        {
            lock (ThisLock)
            {
                _channelQueue.Close();
            }
        }
        catch { } // best effort: ignore failures while aborting
    }

    protected override void OnClose(TimeSpan timeout)
    {
        try
        {
            lock (ThisLock)
            {
                _channelQueue.Close();
            }
        }
        catch { } // best effort: ignore failures while closing
    }

    protected override void OnClosed()
    {
        base.OnClosed();

        try
        {
            // release the pooled buffers and the bus once the listener is closed
            _bufferManager.Clear();
            _bus.Dispose();
        }
        catch { } // best effort: ignore failures during cleanup
    }
Next we create a private method that ensures the current channel is available. It checks whether the _currentChannel field is null. If it is, the method creates a new channel through another private method called CreateChannel (included in the listing below), attaches the OnChannelClosed handler to the channel’s Closed event, and enqueues the channel into the InputQueue<IReplyChannel>. If it is not null, a channel object is already available and we don’t do anything here.
    private void EnsureChannelAvailable()
    {
        IReplyChannel newChannel = null;
        bool channelCreated = false;

        // first check without the lock, second check under the lock
        if ((newChannel = _currentChannel) == null)
        {
            lock (_currentChannelLock)
            {
                if ((newChannel = _currentChannel) == null)
                {
                    // _encoderFactory is the factory initialized in the constructor
                    newChannel = CreateChannel(_bufferManager, _encoderFactory, new EndpointAddress(_uri), this, _bus);
                    newChannel.Closed += new EventHandler(OnChannelClosed);
                    _currentChannel = newChannel;
                    channelCreated = true;
                }
            }
        }

        // enqueue outside the lock so that waiting accept calls are dispatched without holding it
        if (channelCreated)
        {
            _channelQueue.EnqueueAndDispatch(newChannel);
        }
    }

    private IReplyChannel CreateChannel(
        BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress localAddress, MessageBusReplyChannelListener parent, IBus bus)
    {
        // use the arguments rather than the fields so the method does what its signature says
        return new MessageBusReplyChannel(bufferManager, encoder, localAddress, parent, bus);
    }

    private void OnChannelClosed(object sender, EventArgs e)
    {
        var channel = sender as IReplyChannel;
        lock (_currentChannelLock)
        {
            // only forget the channel if it is still the current one
            if (channel == _currentChannel)
            {
                _currentChannel = null;
            }
        }
    }
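The check, lock, check-again shape of EnsureChannelAvailable can be tried in isolation. The following is only a minimal sketch under a few assumptions: FakeChannel and ChannelHolder are hypothetical stand-ins for IReplyChannel and the listener, a plain List<T> stands in for the InputQueue<T>, and the volatile keyword is an addition of this sketch that the listener code above does not use.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for IReplyChannel: it only exposes a Closed event.
public class FakeChannel
{
    public event EventHandler Closed;

    public void Close()
    {
        var handler = Closed;
        if (handler != null)
        {
            handler(this, EventArgs.Empty);
        }
    }
}

// Hypothetical stand-in for the channel listener.
public class ChannelHolder
{
    private readonly object _currentChannelLock = new object();
    private volatile FakeChannel _currentChannel; // volatile is an assumption of this sketch

    // Stand-in for InputQueue<IReplyChannel>: records every channel that was published.
    public readonly List<FakeChannel> Published = new List<FakeChannel>();

    public void EnsureChannelAvailable()
    {
        FakeChannel newChannel = null;
        bool channelCreated = false;

        // First check without the lock: usually a channel already exists.
        if (_currentChannel == null)
        {
            lock (_currentChannelLock)
            {
                // Second check under the lock: another thread may have created it meanwhile.
                if (_currentChannel == null)
                {
                    newChannel = new FakeChannel();
                    newChannel.Closed += OnChannelClosed;
                    _currentChannel = newChannel;
                    channelCreated = true;
                }
            }
        }

        // Publish outside the channel lock, as the listener does.
        if (channelCreated)
        {
            lock (Published)
            {
                Published.Add(newChannel);
            }
        }
    }

    private void OnChannelClosed(object sender, EventArgs e)
    {
        lock (_currentChannelLock)
        {
            // Only forget the channel if it is still the current one.
            if (ReferenceEquals(sender, _currentChannel))
            {
                _currentChannel = null;
            }
        }
    }
}
```

Calling EnsureChannelAvailable twice in a row publishes one channel only. A second channel is created only after the first one has raised Closed.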
Now we can use them to finish the asynchronous OnBeginAcceptChannel and OnEndAcceptChannel methods, and we will amend the synchronous OnAcceptChannel as well.
    protected override IReplyChannel OnAcceptChannel(TimeSpan timeout)
    {
        if (!IsDisposed)
        {
            EnsureChannelAvailable();
        }

        IReplyChannel channel = null;
        if (_channelQueue.Dequeue(timeout, out channel))
        {
            return channel;
        }
        else
        {
            throw new TimeoutException();
        }
    }

    protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state)
    {
        if (!IsDisposed)
        {
            EnsureChannelAvailable();
        }

        return _channelQueue.BeginDequeue(timeout, callback, state);
    }

    protected override IReplyChannel OnEndAcceptChannel(IAsyncResult result)
    {
        IReplyChannel channel;
        if (_channelQueue.EndDequeue(result, out channel))
        {
            return channel;
        }
        else
        {
            throw new TimeoutException();
        }
    }
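The contract of the synchronous accept path is "return the dequeued channel, or throw a TimeoutException when nothing arrives in time". Here is a minimal sketch of just that contract. It is not the InputQueue<T> implementation: the framework's BlockingCollection<T> is swapped in purely for illustration, and AcceptSketch, Enqueue and Accept are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical stand-in for the listener's accept path; strings play the role of channels.
public class AcceptSketch
{
    // BlockingCollection<T> replaces InputQueue<T> here purely for illustration.
    private readonly BlockingCollection<string> _channelQueue = new BlockingCollection<string>();

    public void Enqueue(string channel)
    {
        _channelQueue.Add(channel);
    }

    // Mirrors the shape of OnAcceptChannel: return the item, or throw when the timeout elapses.
    public string Accept(TimeSpan timeout)
    {
        string channel;
        if (_channelQueue.TryTake(out channel, timeout))
        {
            return channel;
        }

        throw new TimeoutException();
    }
}
```

The asynchronous pair follows the same rule, except that the wait happens between the Begin and End calls instead of blocking the caller.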
Server: ReplyChannel and RequestContext
The ReplyChannel created by the ChannelListener also needs to support the asynchronous methods in order to work with the ServiceHost. In this case we will simply use asynchronous delegate invocation. Since we already have the synchronous ReceiveRequest method, all we need to do is create a delegate from it and call the delegate’s BeginInvoke and EndInvoke methods in the BeginTryReceiveRequest and EndTryReceiveRequest methods.
    private delegate bool TryReceiveRequestDelegate(TimeSpan timeout, out RequestContext context);
    private TryReceiveRequestDelegate _tryReceiveRequestDelegate;
Initialize this delegate based on what we did in the ReceiveRequest method.
    public MessageBusReplyChannel(
        BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress address,
        MessageBusReplyChannelListener parent,
        IBus bus)
        : base(parent)
    {
        _bufferManager = bufferManager;
        _encoder = encoder.CreateSessionEncoder();

        _localAddress = address;
        _bus = bus;
        _aLock = new object();

        _tryReceiveRequestDelegate = (TimeSpan t, out RequestContext rc) =>
        {
            rc = null;
            // receive the request message from the bus
            var busMsg = _bus.Receive(true, null);
            // box the wcf message
            var raw = Encoding.UTF8.GetBytes(busMsg.Content);
            var data = _bufferManager.TakeBuffer(raw.Length);
            Buffer.BlockCopy(raw, 0, data, 0, raw.Length);
            var buffer = new ArraySegment<byte>(data, 0, raw.Length);
            var message = _encoder.ReadMessage(buffer, _bufferManager);
            // initialize the request context and return
            rc = new MessageBusRequestContext(message, this, _bufferManager, _encoder, _localAddress, _bus, busMsg.MessageID);
            return true;
        };
    }
And invoke it asynchronously in the BeginTryReceiveRequest and EndTryReceiveRequest methods.
    public IAsyncResult BeginTryReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state)
    {
        RequestContext context;
        return _tryReceiveRequestDelegate.BeginInvoke(timeout, out context, callback, state);
    }

    public bool EndTryReceiveRequest(IAsyncResult result, out RequestContext context)
    {
        var ret = _tryReceiveRequestDelegate.EndInvoke(out context, result);
        return ret;
    }
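Delegate BeginInvoke and EndInvoke work on the .NET Framework, which is where the ServiceHost runs. On .NET Core and later, calling BeginInvoke on a delegate throws a PlatformNotSupportedException. If you ever need the same Begin/End pair without asynchronous delegates, one option is to run the synchronous method on the thread pool and hand the Task back as the IAsyncResult. The sketch below assumes .NET 4.5 or later; ReceiveSketch and its members are hypothetical, and a string plays the role of the RequestContext.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for the reply channel; a string plays the role of RequestContext.
public class ReceiveSketch
{
    // Stand-in for the synchronous receive method with an out parameter.
    public bool TryReceive(TimeSpan timeout, out string context)
    {
        context = "request-context";
        return true;
    }

    public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
    {
        // The TaskCompletionSource exposes the caller's state object as AsyncState.
        var tcs = new TaskCompletionSource<Tuple<bool, string>>(state);

        Task.Run(() =>
        {
            try
            {
                string context;
                bool received = TryReceive(timeout, out context);
                tcs.SetResult(Tuple.Create(received, context));
            }
            catch (Exception ex)
            {
                // Store the failure so that the End method can rethrow it.
                tcs.SetException(ex);
            }

            if (callback != null)
            {
                callback(tcs.Task);
            }
        });

        return tcs.Task;
    }

    public bool EndTryReceive(IAsyncResult result, out string context)
    {
        var task = (Task<Tuple<bool, string>>)result;
        // GetResult blocks until completion and rethrows the stored exception, if any.
        Tuple<bool, string> outcome = task.GetAwaiter().GetResult();
        context = outcome.Item2;
        return outcome.Item1;
    }
}
```

The out parameter travels inside the Task result and is unpacked again in the End method, which is the same job EndInvoke does for the delegate version.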
The ReplyChannel is now complete, and we don’t need to change anything in the RequestContext. Let’s give it a try.
Use the ServiceHost and ChannelFactory Mode
Now we will test our transport extension in the way you are familiar with. There is no channel listener and no channel in our test code, and the service business logic is no longer mixed with the service hosting code. Let’s have a look.
First we create a service contract interface and a service class that implements it. Again, we use string reversal as the sample.
    [ServiceContract(Namespace="http://wcf.shaunxu.me/")]
    public interface ISampleService
    {
        [OperationContract]
        string Reverse(string content);
    }

    public class SampleService : ISampleService
    {
        public string Reverse(string content)
        {
            return new string(content.Reverse().ToArray());
        }
    }
In the main function we create and open the service by using the ServiceHost. Since we haven’t implemented any configuration extension, for now we have to set the binding and address in code.
    static void Main(string[] args)
    {
        var bus = new InProcMessageBus();
        var address = "net.bus://localhost/sample";

        // establish the service
        var host = new ServiceHost(typeof(SampleService));
        var svcBinding = new MessageBusTransportBinding(bus);
        host.AddServiceEndpoint(typeof(ISampleService), svcBinding, address);
        host.Opened += (sender, e) =>
        {
            Console.WriteLine("Service opened at {0}", host.Description.Endpoints[0].ListenUri);
        };
        host.Open();
    }
And we will use the ChannelFactory<TChannel> to create a client side proxy.
    static void Main(string[] args)
    {
        var bus = new InProcMessageBus();
        var address = "net.bus://localhost/sample";

        // establish the services
        ... ...

        // establish the client
        var cliBinding = new MessageBusTransportBinding(bus);
        var factory = new ChannelFactory<ISampleService>(cliBinding, address);
        factory.Opened += (sender, e) =>
        {
            Console.WriteLine("Client connected to {0}", factory.Endpoint.ListenUri);
        };
        var proxy = factory.CreateChannel();
    }
Finally, invoke the service through the proxy that the ChannelFactory generated for us.
    static void Main(string[] args)
    {
        var bus = new InProcMessageBus();
        var address = "net.bus://localhost/sample";

        // establish the service
        ... ...

        // establish the client
        ... ...

        // invoke the service
        using (proxy as IDisposable)
        {
            Console.WriteLine("Say something...");
            var content = Console.ReadLine();
            while (!string.IsNullOrEmpty(content))
            {
                var result = proxy.Reverse(content);
                Console.WriteLine("RESULT: {0} => {1}", content, result);
                Console.WriteLine("Say something...");
                content = Console.ReadLine();
            }
        }
    }
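The using (proxy as IDisposable) line deserves a word. The contract interface itself is not disposable, but the proxy object behind it is, so the cast lets the using block close it when we are done. The sketch below shows the idiom without WCF. IReverse, DisposableProxy and ProxySketch are hypothetical names used only for illustration.

```csharp
using System;

// Hypothetical contract: like ISampleService, it knows nothing about IDisposable.
public interface IReverse
{
    string Reverse(string content);
}

// Hypothetical proxy: implements the contract and IDisposable, as a channel proxy does.
public class DisposableProxy : IReverse, IDisposable
{
    public bool Disposed { get; private set; }

    public string Reverse(string content)
    {
        char[] chars = content.ToCharArray();
        Array.Reverse(chars);
        return new string(chars);
    }

    public void Dispose()
    {
        Disposed = true;
    }
}

public static class ProxySketch
{
    public static string Invoke(IReverse proxy, string content)
    {
        // "proxy as IDisposable" is null for a non-disposable object,
        // and a using block over null simply skips the Dispose call.
        using (proxy as IDisposable)
        {
            return proxy.Reverse(content);
        }
    }
}
```

After Invoke returns, the proxy has been disposed, so a real client would create a new channel from the factory for the next conversation.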
And this is a screenshot from running this code on my machine.

But don’t forget our goal: scaling out across server instances. Similar to what we did before, we will create more than one ServiceHost instance listening on the same endpoint, so that they can handle client requests at the same time. This simulates a cross-process or cross-machine scale-out scenario.
    class Program
    {
        static void Main(string[] args)
        {
            var bus = new InProcMessageBus();
            var address = "net.bus://localhost/sample";

            // establish the services
            var host1 = EstablishServiceHost<ISampleService, SampleService>(bus, address);
            var host2 = EstablishServiceHost<ISampleService, SampleService>(bus, address);
            var host3 = EstablishServiceHost<ISampleService, SampleService>(bus, address);

            // establish the client
            var cliBinding = new MessageBusTransportBinding(bus);
            var factory = new ChannelFactory<ISampleService>(cliBinding, address);
            factory.Opened += (sender, e) =>
            {
                Console.WriteLine("Client connected to {0}", factory.Endpoint.ListenUri);
            };
            var proxy = factory.CreateChannel();

            // invoke the service
            using (proxy as IDisposable)
            {
                Console.WriteLine("Say something...");
                var content = Console.ReadLine();
                while (!string.IsNullOrEmpty(content))
                {
                    var result = proxy.Reverse(content);
                    Console.WriteLine("RESULT: {0} => {1}", content, result);
                    Console.WriteLine("Say something...");
                    content = Console.ReadLine();
                }
            }
        }

        static ServiceHost EstablishServiceHost<TChannel, TService>(IBus bus, string address)
        {
            var host = new ServiceHost(typeof(TService));
            var binding = new MessageBusTransportBinding(bus);
            host.AddServiceEndpoint(typeof(TChannel), binding, address);
            host.Opened += (sender, e) =>
            {
                Console.WriteLine("Service ({0}) opened at {1}", host.GetHashCode(), host.Description.Endpoints[0].ListenUri);
            };
            host.Open();
            return host;
        }
    }
As you can see, I created three hosts and one client. To show which service host is handling a request, I added one line of code inside the service implementation class that prints the current service host’s hash code.
    public class SampleService : ISampleService
    {
        public string Reverse(string content)
        {
            Console.WriteLine("SERVICE: Invoked on the service host instance {0}.", OperationContext.Current.Host.GetHashCode());
            return new string(content.Reverse().ToArray());
        }
    }
So let’s have a look at what’s going on.

As you can see, I invoked the service method four times. The first three calls were handled by three different service hosts, which means the requests were scaled out across my service instances. The fourth call was handled by the first service host instance again.
Summary
In this post I described how to amend our extension so that we can use it in a better way for service definition, implementation, hosting and client invocation. Once we have implemented the relevant asynchronous methods in the ChannelListener and ReplyChannel, we can use a ServiceContract interface to define the service contract, and use the ServiceHost to let WCF create the service class instance and open the service. On the client side we can use the ChannelFactory<TChannel> to get a proxy for our ServiceContract interface. All of this is what we have been doing for a long time and are familiar with.
So far we have finished the basics of the transport extension, even though we have only implemented the request-reply MEP. But as I said in the first post, all the other MEPs can be implemented by introducing more ChannelListener, ChannelFactory and Channel classes.
But things are never as simple as we expect. In the next post I will show you how to implement the datagram MEP, along with some code refactoring and bug fixes.
You can download the source code of this post here.
Hope this helps,
Shaun
All documents and related graphics, codes are provided "AS IS" without warranty of any kind.
Copyright © Shaun Ziyan Xu. This work is licensed under the Creative Commons License.
In my previous post I introduced the architecture of a message bus based system, the dispatcher mode and the pulling mode. I also explained a bit about the channel model and transport extensibility of WCF. Then, in order to keep the following samples simple and easy to use, I created an in-process, in-memory message bus.
In this post I will demonstrate how to create a WCF transport extension over this in-memory message bus for the most common MEP, request-reply. Before we go to the implementation, let’s recap the WCF transport model.

The figure above shows the classes that introduced for the request-reply MEP.
First of all, we need a dedicated binding for this transport. As I said in the previous post, the binding is responsible for creating all necessary binding elements and adding them to the channel stack. There can be many binding elements in the stack, but the encoding and transport elements are mandatory. The encoding binding element defines how messages are serialized and deserialized, and the transport binding element must be the last one in the stack.
As I mentioned, we could leverage the WCF built-in CustomBinding, but for the sake of a complete example I will create our own binding in this series.
The transport binding element is the root of our transport extension. It is responsible for creating everything that follows and for passing all necessary information down to the next level. Depending on whether it runs on the service or the client side, the transport binding element creates an instance of the channel listener or the channel factory. The binding and the transport binding element are used on both the client and the server side.
On the client side, the channel factory is used to check whether the MEP is supported by this transport. If it is, the factory creates the related channel. For example, if the current MEP is request-reply and it is supported, the channel factory creates a request channel.
How many MEPs the transport supports is up to us. That is to say, we define which MEPs our transport supports. A transport doesn’t need to support all MEPs. For example, the basic HTTP transport doesn’t support the duplex MEP, and the MSMQ transport only supports the datagram MEP.
On the server side, the channel listener has a similar responsibility to the channel factory on the client side. It creates the related channel on the server side.
The channels, in this case the client-side request channel and the server-side reply channel, are responsible for sending and receiving messages over the underlying transport, which is our message bus in this case.
Binding
Our binding class will be responsible for creating all necessary binding elements. In WCF we can define a binding in code or in configuration. In this example I will only show how to do it in code. And in order to keep things as simple as possible, the binding will create only two elements:
- TextMessageEncodingBindingElement: This encodes the WCF message as plain text, which makes it very easy to debug and to see what the actual message is.
- MessageBusTransportBindingElement: This is the transport binding element, which we will create later.
As I’ve said, since the MessageBusTransportBindingElement is the transport binding element, it must be added after all the others.
    public class MessageBusTransportBinding : Binding
    {
        private readonly MessageEncodingBindingElement _messageElement;
        private readonly MessageBusTransportBindingElement _transportElement;

        public MessageBusTransportBinding()
            : base()
        {
            _messageElement = new TextMessageEncodingBindingElement();
            _transportElement = new MessageBusTransportBindingElement();
        }

        public override BindingElementCollection CreateBindingElements()
        {
            var elements = new BindingElementCollection();
            elements.Add(_messageElement);
            // the transport binding element must be the last one
            elements.Add(_transportElement);
            return elements.Clone();
        }
    }
Our MessageBusTransportBinding class inherits from the System.ServiceModel.Channels.Binding class, which is the base class for all bindings, such as basic HTTP, NET.TCP, etc. In the constructor we initialize the TextMessageEncodingBindingElement and MessageBusTransportBindingElement. When WCF creates this binding it invokes the CreateBindingElements method, and in this method we add these two binding elements.
To highlight it once again: the transport binding element, MessageBusTransportBindingElement in our example, MUST be added at the end of the list of binding elements.
Another property that a binding must implement is Scheme. If you are familiar with web development you will know that the scheme is the first part of a URL, which defines the protocol of the connection. WCF adopts this concept, and each binding must have a predefined scheme. Since the scheme is tied to the underlying transport, it is defined in the transport binding element. Hence in the binding layer we just need to return the transport binding element’s scheme.
public override string Scheme
{
    get
    {
        return _transportElement.Scheme;
    }
}
Since the transport binding element needs to operate on our message bus, we also need to pass the bus instance into the binding, which hands it down to the transport binding element. So the constructor of our binding takes the bus object as a parameter and should look like this.
public MessageBusTransportBinding(IBus bus)
    : base()
{
    _messageElement = new TextMessageEncodingBindingElement();
    _transportElement = new MessageBusTransportBindingElement(bus);
}
OK, now we have the binding ready. The full code is shown below, and the next step is to create our own transport binding element.
using System.ServiceModel.Channels;

public class MessageBusTransportBinding : Binding
{
    private readonly MessageEncodingBindingElement _messageElement;
    private readonly MessageBusTransportBindingElement _transportElement;

    public MessageBusTransportBinding(IBus bus)
        : base()
    {
        _messageElement = new TextMessageEncodingBindingElement();
        _transportElement = new MessageBusTransportBindingElement(bus);
    }

    public override BindingElementCollection CreateBindingElements()
    {
        var elements = new BindingElementCollection();
        elements.Add(_messageElement);
        // the transport binding element must be the last one
        elements.Add(_transportElement);
        return elements.Clone();
    }

    // the scheme is owned by the transport binding element
    public override string Scheme
    {
        get
        {
            return _transportElement.Scheme;
        }
    }
}
Transport Binding Element
The transport binding element inherits from the System.ServiceModel.Channels.TransportBindingElement base class and has three main responsibilities:
- Define the transport scheme.
- Determine what kind of MEP (message exchange pattern) it supports, which means what kind of channels it can create.
- Create the relevant channel factory and channel listener based on the current MEP.
Let’s implement them one by one. Firstly, we must define the scheme of this transport, as it will be the scheme of the endpoint address of every service that uses our transport. In our example I would like to use “net.bus” as the scheme, so services on our transport will use endpoint addresses like “net.bus://localhost/MySampleService”.
public class MessageBusTransportBindingElement : TransportBindingElement
{
    public const string CST_SCHEME = "net.bus";

    public override string Scheme
    {
        get
        {
            return CST_SCHEME;
        }
    }
}
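To see how the scheme shows up in an endpoint address, here is a small sketch that parses an address with System.Uri and checks its scheme. The SchemeDemo class and the UsesBusTransport helper are made up for this illustration and are not part of the transport itself.

```csharp
using System;

public static class SchemeDemo
{
    // Same value as CST_SCHEME in the transport binding element.
    public const string Scheme = "net.bus";

    // Returns true when the given endpoint address uses our transport's scheme.
    public static bool UsesBusTransport(string address)
    {
        var uri = new Uri(address);
        return string.Equals(uri.Scheme, Scheme, StringComparison.OrdinalIgnoreCase);
    }
}
```

For example, SchemeDemo.UsesBusTransport("net.bus://localhost/MySampleService") is true, while the same call with an http address is false.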
There are two generic methods we must implement to determine what kind of MEP the transport supports: CanBuildChannelFactory<TChannel> and CanBuildChannelListener<TChannel>. We check the type parameter TChannel and return true or false based on it. Since we currently support only the request-reply mode, they return true only if TChannel is IReplyChannel on the server side (CanBuildChannelListener) and IRequestChannel on the client side (CanBuildChannelFactory).
public override bool CanBuildChannelFactory<TChannel>(BindingContext context)
{
    return typeof(TChannel) == typeof(IRequestChannel);
}

public override bool CanBuildChannelListener<TChannel>(BindingContext context)
{
    return typeof(TChannel) == typeof(IReplyChannel);
}
The methods BuildChannelFactory<TChannel> and BuildChannelListener<TChannel> are where we create the proper factory and listener instances. BuildChannelFactory<TChannel> creates the channel factory used on the client side, based on TChannel, while BuildChannelListener<TChannel> creates the channel listener used on the server side.
public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context)
{
    if (context == null)
    {
        throw new ArgumentNullException("context");
    }
    if (!CanBuildChannelFactory<TChannel>(context))
    {
        throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel factory.", typeof(TChannel).Name));
    }

    if (typeof(TChannel) == typeof(IRequestChannel))
    {
        return (IChannelFactory<TChannel>)(object)new MessageBusRequestChannelFactory(this, context);
    }
    else
    {
        throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel factory.", typeof(TChannel).Name));
    }
}

public override IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context)
{
    if (context == null)
    {
        throw new ArgumentNullException("context");
    }
    if (!CanBuildChannelListener<TChannel>(context))
    {
        throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel listener.", typeof(TChannel).Name));
    }

    if (typeof(TChannel) == typeof(IReplyChannel))
    {
        return (IChannelListener<TChannel>)(object)new MessageBusReplyChannelListener(this, context);
    }
    else
    {
        throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel listener.", typeof(TChannel).Name));
    }
}
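The check-then-build pattern above can be tried out without WCF. The following is only a sketch under stated assumptions: IRequestShape, IDuplexShape, IFactory<TChannel>, RequestFactory and ShapeDispatcher are hypothetical stand-ins for the WCF channel interfaces and factory classes, so that the example compiles on its own.

```csharp
using System;

// Hypothetical stand-ins for the WCF channel shapes and the factory interface.
public interface IRequestShape { }
public interface IDuplexShape { }
public interface IFactory<TChannel> { }

public class RequestFactory : IFactory<IRequestShape> { }

public static class ShapeDispatcher
{
    // Mirrors CanBuildChannelFactory<TChannel>: only the request shape is supported.
    public static bool CanBuild<TChannel>()
    {
        return typeof(TChannel) == typeof(IRequestShape);
    }

    // Mirrors BuildChannelFactory<TChannel>: reject unsupported shapes, then create the factory.
    public static IFactory<TChannel> Build<TChannel>()
    {
        if (!CanBuild<TChannel>())
        {
            throw new ArgumentException(
                string.Format("Unsupported channel type: {0}.", typeof(TChannel).Name));
        }
        // Cast via object, as the article's code does; the check above
        // guarantees that the cast succeeds at run time.
        return (IFactory<TChannel>)(object)new RequestFactory();
    }
}
```

Calling ShapeDispatcher.Build<IRequestShape>() returns a RequestFactory, while ShapeDispatcher.Build<IDuplexShape>() throws an ArgumentException, which is the same behaviour WCF sees from our transport binding element for unsupported channel types.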
Since the bus object also needs to be passed into the channel factory and the channel listener, we add a private member to store it. The base class TransportBindingElement also has some other abstract members we must implement. These are fairly straightforward, so I won’t explain them here. The full code of our transport binding element looks like this.
using System;
using System.ServiceModel.Channels;

public class MessageBusTransportBindingElement : TransportBindingElement
{
    public const string CST_SCHEME = "net.bus";

    private readonly IBus _bus;

    public IBus Bus
    {
        get
        {
            return _bus;
        }
    }

    public override string Scheme
    {
        get
        {
            return CST_SCHEME;
        }
    }

    public MessageBusTransportBindingElement(IBus bus)
        : base()
    {
        _bus = bus;
    }

    // copy constructor used by Clone
    public MessageBusTransportBindingElement(MessageBusTransportBindingElement other)
        : base(other)
    {
        _bus = other._bus;
    }

    public override BindingElement Clone()
    {
        return new MessageBusTransportBindingElement(this);
    }

    public override bool CanBuildChannelFactory<TChannel>(BindingContext context)
    {
        return typeof(TChannel) == typeof(IRequestChannel);
    }

    public override bool CanBuildChannelListener<TChannel>(BindingContext context)
    {
        return typeof(TChannel) == typeof(IReplyChannel);
    }

    public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context)
    {
        if (context == null)
        {
            throw new ArgumentNullException("context");
        }
        if (!CanBuildChannelFactory<TChannel>(context))
        {
            throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel factory.", typeof(TChannel).Name));
        }

        if (typeof(TChannel) == typeof(IRequestChannel))
        {
            return (IChannelFactory<TChannel>)(object)new MessageBusRequestChannelFactory(this, context);
        }
        else
        {
            throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel factory.", typeof(TChannel).Name));
        }
    }

    public override IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context)
    {
        if (context == null)
        {
            throw new ArgumentNullException("context");
        }
        if (!CanBuildChannelListener<TChannel>(context))
        {
            throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel listener.", typeof(TChannel).Name));
        }

        if (typeof(TChannel) == typeof(IReplyChannel))
        {
            return (IChannelListener<TChannel>)(object)new MessageBusReplyChannelListener(this, context);
        }
        else
        {
            throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel listener.", typeof(TChannel).Name));
        }
    }
}
The binding and the transport binding element are created and used on both the server and the client. After the transport binding element, once the channel factory and channel listener have been created, our code splits into a server part and a client part. Some of the following classes run on the server side and others on the client side, depending on the MEP and the channel type.
Client: RequestChannelFactory and RequestChannel
The request channel factory is created by the transport binding element on the client side when the requested channel type is IRequestChannel. It inherits from the base class ChannelFactoryBase<IRequestChannel> and has only one responsibility: creating the request channel.
Besides this main responsibility we also need to do some extra work to make our life easier. Since the underlying channel will communicate with the actual bus, we need a MessageEncoderFactory and a BufferManager to read, write, encode and decode messages. The encoder factory is created from the message encoding binding element, and the buffer manager from the buffer settings of the transport binding element.
public class MessageBusRequestChannelFactory : ChannelFactoryBase<IRequestChannel>
{
    private readonly BufferManager _bufferManager;
    private readonly MessageEncoderFactory _encoderFactory;
    private readonly long _maxReceivedMessageSize;

    public MessageBusRequestChannelFactory(MessageBusTransportBindingElement transportElement, BindingContext context)
        : base(context.Binding)
    {
        _bufferManager = BufferManager.CreateBufferManager(transportElement.MaxBufferPoolSize, int.MaxValue);
        var encodingElement = context.Binding.Elements.Find<MessageEncodingBindingElement>();
        if (encodingElement == null)
        {
            // fall back to the text encoder when the binding has no encoding element
            _encoderFactory = (new TextMessageEncodingBindingElement()).CreateMessageEncoderFactory();
        }
        else
        {
            _encoderFactory = encodingElement.CreateMessageEncoderFactory();
        }
        _maxReceivedMessageSize = transportElement.MaxReceivedMessageSize;
    }
}
The MessageVersion property defines which SOAP version and addressing version our transport uses. Using the default value is fine. So that the inner channel can use the message bus, we also need to pass in the bus object from the transport binding element. The full code of our MessageBusRequestChannelFactory looks like this.
using System;
using System.ServiceModel;
using System.ServiceModel.Channels;

public class MessageBusRequestChannelFactory : ChannelFactoryBase<IRequestChannel>
{
    private readonly BufferManager _bufferManager;
    private readonly MessageEncoderFactory _encoderFactory;
    private readonly long _maxReceivedMessageSize;
    private readonly IBus _bus;

    public MessageBusRequestChannelFactory(MessageBusTransportBindingElement transportElement, BindingContext context)
        : base(context.Binding)
    {
        _bufferManager = BufferManager.CreateBufferManager(transportElement.MaxBufferPoolSize, int.MaxValue);
        var encodingElement = context.Binding.Elements.Find<MessageEncodingBindingElement>();
        if (encodingElement == null)
        {
            // fall back to the text encoder when the binding has no encoding element
            _encoderFactory = (new TextMessageEncodingBindingElement()).CreateMessageEncoderFactory();
        }
        else
        {
            _encoderFactory = encodingElement.CreateMessageEncoderFactory();
        }
        _maxReceivedMessageSize = transportElement.MaxReceivedMessageSize;
        _bus = transportElement.Bus;
    }

    public MessageVersion MessageVersion
    {
        get
        {
            return MessageVersion.Default;
        }
    }

    public long MaxReceivedMessageSize
    {
        get
        {
            return _maxReceivedMessageSize;
        }
    }

    protected override IRequestChannel OnCreateChannel(EndpointAddress address, Uri via)
    {
        return new MessageBusRequestChannel(_bufferManager, _encoderFactory, this, address, via, _bus);
    }

    protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    protected override void OnEndOpen(IAsyncResult result)
    {
        throw new NotImplementedException();
    }

    protected override void OnOpen(TimeSpan timeout)
    {
    }
}
Now we come to the last piece on the client side, the request channel. As the lowest level in the transport structure, the channel sends and receives the actual messages over the actual transport, which is our message bus in this case. All channels inherit from the ChannelBase class and implement the relevant channel interface. Since we are implementing a request channel, it inherits from ChannelBase and implements IRequestChannel.
public class MessageBusRequestChannel : ChannelBase, IRequestChannel
{
}
The ChannelBase abstract class requires us to implement some abstract methods, which are invoked when the channel is opened, closed and aborted. To keep the sample clear I only implement an empty OnOpen method, which is invoked when the channel is opened and does nothing.
public class MessageBusRequestChannel : ChannelBase, IRequestChannel
{
    protected override void OnAbort()
    {
        throw new NotImplementedException();
    }

    protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    protected override void OnClose(TimeSpan timeout)
    {
        throw new NotImplementedException();
    }

    protected override void OnEndClose(IAsyncResult result)
    {
        throw new NotImplementedException();
    }

    protected override void OnEndOpen(IAsyncResult result)
    {
        throw new NotImplementedException();
    }

    protected override void OnOpen(TimeSpan timeout)
    {
    }
}
The IRequestChannel interface defines the operations for sending a request message from the client to the server. To do that we need to read the WCF message and put it on the bus, so we add some private members that help us work with the message.
private readonly IBus _bus;
private readonly Uri _via;
private readonly EndpointAddress _remoteAddress;
private readonly object _aLock;

public MessageBusRequestChannel(
    BufferManager bufferManager, MessageEncoderFactory encoder, MessageBusRequestChannelFactory parent,
    EndpointAddress remoteAddress, Uri via, IBus bus)
    : base(parent)
{
    _via = via;
    _remoteAddress = remoteAddress;
    _bus = bus;
    _aLock = new object();
}
Then we need to implement the two Request methods, one with a timeout parameter and one without. The method without the timeout parameter can simply invoke the other one with the default send timeout.
public Message Request(Message message)
{
    return Request(message, DefaultSendTimeout);
}
The other method is where we send the client request message, which comes in through the method parameter, into the message bus. First of all we had better check that the current channel is available and opened, which is done by invoking a method of the base class.
public Message Request(Message message, TimeSpan timeout)
{
    ThrowIfDisposedOrNotOpen();
    // the remaining steps of the method are built up in the following paragraphs
}
And then what we need to do is to:
- Unbox the WCF message and convert it into a string, which can be sent into our message bus.
- Send the message into the bus.
- Wait for the reply message to come back from the server side.
- Box the reply into the WCF message format and return it.
Since we are using an in-memory bus, we don’t actually need to convert the WCF message into a string; we could put the message object directly on the bus. But in a real distributed scenario we cannot do this, as the server and the client are on different machines. So I’m converting the messages to strings to show how to use the buffer manager and the encoder to convert between a string and a WCF message.
We could simply get the content of a WCF message by using its ToString() method, but this is not the best way. In WCF, message encoding and decoding should go through the MessageEncoder, which comes from our binding, together with the BufferManager, which helps performance and memory usage by reusing buffers. In the constructor of our channel class (see the full code of the channel) we create the encoder from the encoder factory parameter. So in the Request method we use this encoder and the buffer manager to read the message and convert it into a string. For more information about the encoder and the buffer manager, have a look at the MSDN documentation for the MessageEncoder and BufferManager classes.
// unbox the message into string that will be sent into the bus
ArraySegment<byte> buffer;
string content;
using (message)
{
    _remoteAddress.ApplyTo(message);
    buffer = _encoder.WriteMessage(message, 64 * 1024, _bufferManager);
}
content = Encoding.UTF8.GetString(buffer.Array, buffer.Offset, buffer.Count);
_bufferManager.ReturnBuffer(buffer.Array);
Now that we have the message content, we can send it into the message bus by using the bus object that was passed in through the constructor. Since the request channel lives on the client side, when we invoke the send method of the bus we should tell it that the message comes from the client. Currently we don’t need to tell the server side where the request message came from, so we leave the last parameter as null.
The SendRequest method adds the message to the bus and returns a unique ID. The request channel then waits for the reply message that carries this ID to arrive on the bus. This unique ID ensures that each client receives only its own reply message, which is very important when multiple clients are communicating with the same server.

// send the message into bus
var busMsgId = _bus.SendRequest(content, true, null);
// waiting for the reply message arrive from the bus
var replyMsg = _bus.Receive(false, busMsgId);
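To make the role of the message ID more concrete, here is a minimal sketch of a bus that correlates replies with requests. It is not the IBus implementation used in this article: CorrelatedBus, BusMessage and their members are hypothetical names chosen for this illustration, and the method signatures are simplified.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical message type: the content plus the ID that ties a reply to its request.
public sealed class BusMessage
{
    public string MessageID { get; set; }
    public string Content { get; set; }
}

// Hypothetical in-memory bus that routes each reply to the matching request.
public sealed class CorrelatedBus
{
    private readonly BlockingCollection<BusMessage> _requests =
        new BlockingCollection<BusMessage>();
    private readonly ConcurrentDictionary<string, BlockingCollection<BusMessage>> _replies =
        new ConcurrentDictionary<string, BlockingCollection<BusMessage>>();

    // Client side: queue a request and get back the ID used to find its reply.
    public string SendRequest(string content)
    {
        var id = Guid.NewGuid().ToString();
        // Register the reply slot before the request becomes visible to the server.
        _replies[id] = new BlockingCollection<BusMessage>();
        _requests.Add(new BusMessage { MessageID = id, Content = content });
        return id;
    }

    // Server side: take the next request, blocking until one is available.
    public BusMessage ReceiveRequest()
    {
        return _requests.Take();
    }

    // Server side: publish a reply tagged with the ID of the request it answers.
    public void SendReply(string content, string relatedTo)
    {
        _replies[relatedTo].Add(new BusMessage { MessageID = relatedTo, Content = content });
    }

    // Client side: block until the reply for this particular request arrives.
    public BusMessage ReceiveReply(string relatedTo)
    {
        var slot = _replies[relatedTo];
        var reply = slot.Take();
        BlockingCollection<BusMessage> removed;
        _replies.TryRemove(relatedTo, out removed);
        return reply;
    }
}
```

Even if the server answers two pending requests in reverse order, each caller of ReceiveReply gets the reply that belongs to its own ID.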
Finally, when the request channel has received the related reply message, we create a WCF message from its content in a similar way and return it.
// box the message from the bus message content and return back
var raw = Encoding.UTF8.GetBytes(replyMsg.Content);
var data = _bufferManager.TakeBuffer(raw.Length);
Buffer.BlockCopy(raw, 0, data, 0, raw.Length);
buffer = new ArraySegment<byte>(data, 0, raw.Length);
var reply = _encoder.ReadMessage(buffer, _bufferManager);
return reply;
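The code above wraps the pooled array in an ArraySegment<byte> with an explicit count because a pooled buffer may be larger than the data copied into it. The sketch below shows the same round trip between a string and a pooled buffer. Note that it swaps in System.Buffers.ArrayPool<byte> as a stand-in for WCF’s BufferManager, only so that it runs without WCF (on the .NET Framework this type comes from the System.Buffers package); PooledTextCodec is a hypothetical name.

```csharp
using System;
using System.Buffers;
using System.Text;

public static class PooledTextCodec
{
    // Copies the text into a pooled buffer and decodes it again, reading only
    // the bytes that were actually written.
    public static string RoundTrip(string content)
    {
        byte[] raw = Encoding.UTF8.GetBytes(content);
        // The rented array may be longer than raw.Length.
        byte[] pooled = ArrayPool<byte>.Shared.Rent(raw.Length);
        try
        {
            Buffer.BlockCopy(raw, 0, pooled, 0, raw.Length);
            var segment = new ArraySegment<byte>(pooled, 0, raw.Length);
            return Encoding.UTF8.GetString(segment.Array, segment.Offset, segment.Count);
        }
        finally
        {
            // Give the buffer back to the pool, like BufferManager.ReturnBuffer.
            ArrayPool<byte>.Shared.Return(pooled);
        }
    }
}
```

Decoding the whole pooled array instead of the segment would append whatever bytes happen to follow the payload, which is why the offset and count are passed along.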
Now we have finished all the classes and operations on the client side for sending the request message and waiting for the reply message. The full code of the request channel looks like this.
using System;
using System.ServiceModel;
using System.ServiceModel.Channels;
using System.Text;

public class MessageBusRequestChannel : ChannelBase, IRequestChannel
{
    private readonly BufferManager _bufferManager;
    private readonly MessageEncoder _encoder;

    private readonly IBus _bus;
    private readonly Uri _via;
    private readonly EndpointAddress _remoteAddress;
    private readonly object _aLock;

    public MessageBusRequestChannel(
        BufferManager bufferManager, MessageEncoderFactory encoder, MessageBusRequestChannelFactory parent,
        EndpointAddress remoteAddress, Uri via, IBus bus)
        : base(parent)
    {
        _bufferManager = bufferManager;
        _encoder = encoder.CreateSessionEncoder();

        _via = via;
        _remoteAddress = remoteAddress;
        _bus = bus;
        _aLock = new object();
    }

    #region IRequestChannel

    public Uri Via
    {
        get
        {
            return _via;
        }
    }

    public IAsyncResult BeginRequest(Message message, TimeSpan timeout, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    public IAsyncResult BeginRequest(Message message, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    public Message EndRequest(IAsyncResult result)
    {
        throw new NotImplementedException();
    }

    public EndpointAddress RemoteAddress
    {
        // return the address passed in by the channel factory
        get { return _remoteAddress; }
    }

    public Message Request(Message message, TimeSpan timeout)
    {
        ThrowIfDisposedOrNotOpen();
        lock (_aLock)
        {
            // unbox the message into string that will be sent into the bus
            ArraySegment<byte> buffer;
            string content;
            using (message)
            {
                _remoteAddress.ApplyTo(message);
                buffer = _encoder.WriteMessage(message, 64 * 1024, _bufferManager);
            }
            content = Encoding.UTF8.GetString(buffer.Array, buffer.Offset, buffer.Count);
            _bufferManager.ReturnBuffer(buffer.Array);
            // send the message into bus
            var busMsgId = _bus.SendRequest(content, true, null);
            // waiting for the reply message arrive from the bus
            var replyMsg = _bus.Receive(false, busMsgId);
            // box the message from the bus message content and return back
            var raw = Encoding.UTF8.GetBytes(replyMsg.Content);
            var data = _bufferManager.TakeBuffer(raw.Length);
            Buffer.BlockCopy(raw, 0, data, 0, raw.Length);
            buffer = new ArraySegment<byte>(data, 0, raw.Length);
            var reply = _encoder.ReadMessage(buffer, _bufferManager);
            return reply;
        }
    }

    public Message Request(Message message)
    {
        return Request(message, DefaultSendTimeout);
    }

    #endregion

    #region ChannelBase

    protected override void OnAbort()
    {
        throw new NotImplementedException();
    }

    protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    protected override void OnClose(TimeSpan timeout)
    {
        throw new NotImplementedException();
    }

    protected override void OnEndClose(IAsyncResult result)
    {
        throw new NotImplementedException();
    }

    protected override void OnEndOpen(IAsyncResult result)
    {
        throw new NotImplementedException();
    }

    protected override void OnOpen(TimeSpan timeout)
    {
    }

    #endregion
}
Server: ChannelListener, ReplyChannel and RequestContext
In the previous section we finished everything on the client side needed to send a request message to the message bus and wait for the reply message. On the server side, the binding element creates a channel listener based on the MEP currently in use, and the listener creates a reply channel, which receives the incoming request message and initializes a request context that is responsible for sending the reply message back.
The channel listener is similar to the channel factory: it is initialized from the transport binding element, in the BuildChannelListener<TChannel> method.
The reply channel listener inherits from ChannelListenerBase<IReplyChannel>. So that the underlying channel can use the buffer manager, encoder and bus to read and write messages, we also need to pass them in from the binding element, just as we did in the channel factory.
public class MessageBusReplyChannelListener : ChannelListenerBase<IReplyChannel>
{
    private readonly BufferManager _bufferManager;
    private readonly MessageEncoderFactory _encoderFactory;
    private readonly Uri _uri;
    private readonly IBus _bus;

    public override Uri Uri
    {
        get
        {
            return _uri;
        }
    }

    public MessageBusReplyChannelListener(MessageBusTransportBindingElement transportElement, BindingContext context)
        : base(context.Binding)
    {
        _encoderFactory = (new TextMessageEncodingBindingElement()).CreateMessageEncoderFactory();
        _bufferManager = BufferManager.CreateBufferManager(transportElement.MaxBufferPoolSize, int.MaxValue);
        _uri = new Uri(context.ListenUriBaseAddress, context.ListenUriRelativeAddress);
        // keep the bus so that it can be handed to the reply channel
        _bus = transportElement.Bus;
    }
}
To keep the implementation minimal, the channel listener only needs to implement two methods: OnOpen and OnAcceptChannel. The OnOpen method is invoked when the listener is opened and begins to listen for channel requests. In this example we don’t need to do anything there. The OnAcceptChannel method is invoked when a channel is required, and returns a channel object that matches the current MEP. Since we are working in the request-reply mode, we return an IReplyChannel, with the buffer manager, encoder factory and bus passed through its constructor parameters.
protected override IReplyChannel OnAcceptChannel(TimeSpan timeout)
{
    var address = new EndpointAddress(Uri);
    return new MessageBusReplyChannel(_bufferManager, _encoderFactory, address, this, _bus);
}

protected override void OnOpen(TimeSpan timeout)
{
}
So the full code of the MessageBusReplyChannelListener looks like this.
using System;
using System.ServiceModel;
using System.ServiceModel.Channels;

public class MessageBusReplyChannelListener : ChannelListenerBase<IReplyChannel>
{
    private readonly BufferManager _bufferManager;
    private readonly MessageEncoderFactory _encoderFactory;
    private readonly Uri _uri;
    private readonly IBus _bus;

    public override Uri Uri
    {
        get
        {
            return _uri;
        }
    }

    public MessageBusReplyChannelListener(MessageBusTransportBindingElement transportElement, BindingContext context)
        : base(context.Binding)
    {
        _encoderFactory = (new TextMessageEncodingBindingElement()).CreateMessageEncoderFactory();
        _bufferManager = BufferManager.CreateBufferManager(transportElement.MaxBufferPoolSize, int.MaxValue);
        _uri = new Uri(context.ListenUriBaseAddress, context.ListenUriRelativeAddress);
        // keep the bus so that it can be handed to the reply channel
        _bus = transportElement.Bus;
    }

    protected override IReplyChannel OnAcceptChannel(TimeSpan timeout)
    {
        var address = new EndpointAddress(Uri);
        return new MessageBusReplyChannel(_bufferManager, _encoderFactory, address, this, _bus);
    }

    protected override void OnOpen(TimeSpan timeout)
    {
    }

    protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    protected override IReplyChannel OnEndAcceptChannel(IAsyncResult result)
    {
        throw new NotImplementedException();
    }

    protected override IAsyncResult OnBeginWaitForChannel(TimeSpan timeout, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    protected override bool OnEndWaitForChannel(IAsyncResult result)
    {
        throw new NotImplementedException();
    }

    protected override bool OnWaitForChannel(TimeSpan timeout)
    {
        throw new NotImplementedException();
    }

    protected override void OnAbort()
    {
        throw new NotImplementedException();
    }

    protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    protected override void OnClose(TimeSpan timeout)
    {
        throw new NotImplementedException();
    }

    protected override void OnEndClose(IAsyncResult result)
    {
        throw new NotImplementedException();
    }

    protected override void OnEndOpen(IAsyncResult result)
    {
        throw new NotImplementedException();
    }
}
The OnAcceptChannel method returns a MessageBusReplyChannel, which implements IReplyChannel and inherits from ChannelBase. As with the request channel, we do not need to do anything in the OnOpen method.
Just as the request channel gets them from the channel factory, the reply channel gets the buffer manager, encoder factory and bus from its parent channel listener.
private readonly BufferManager _bufferManager;
private readonly MessageEncoder _encoder;
private readonly EndpointAddress _localAddress;
private readonly object _aLock;

private readonly IBus _bus;

public MessageBusReplyChannel(
    BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress address,
    MessageBusReplyChannelListener parent,
    IBus bus)
    : base(parent)
{
    _bufferManager = bufferManager;
    _encoder = encoder.CreateSessionEncoder();

    _localAddress = address;
    _bus = bus;
    _aLock = new object();
}
There are three methods in IReplyChannel that are related to receiving messages: WaitForRequest, ReceiveRequest(TimeSpan) and ReceiveRequest. The WaitForRequest method returns a Boolean that indicates whether a request message has come in, and the two ReceiveRequest methods read the message and return a RequestContext. In our example the bus doesn’t support waiting for a request, which means that when we receive a request we block the current thread until the request message arrives. So in this case WaitForRequest always returns true.
The parameterless ReceiveRequest method invokes the other one with the default receive timeout.
public bool WaitForRequest(TimeSpan timeout)
{
    return true;
}

public RequestContext ReceiveRequest()
{
    return ReceiveRequest(DefaultReceiveTimeout);
}
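If the bus did offer a timed wait, WaitForRequest could honor its timeout instead of always returning true. The sketch below shows one possible way to do that on top of BlockingCollection<T>. It is only an illustration under that assumption: TimedRequestQueue is a hypothetical class and is not part of the IBus used in this article.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical request queue that supports waiting with a timeout.
public sealed class TimedRequestQueue
{
    private readonly BlockingCollection<string> _queue = new BlockingCollection<string>();
    private readonly object _sync = new object();
    private string _pending;
    private bool _hasPending;

    public void Add(string request)
    {
        _queue.Add(request);
    }

    // Returns true if a request is available within the timeout, false otherwise.
    // The request is parked in _pending so that the next Receive call returns it.
    public bool WaitForRequest(TimeSpan timeout)
    {
        lock (_sync)
        {
            if (_hasPending)
            {
                return true;
            }
            string request;
            if (_queue.TryTake(out request, timeout))
            {
                _pending = request;
                _hasPending = true;
                return true;
            }
            return false;
        }
    }

    // Returns the parked request if there is one, otherwise blocks until one arrives.
    public string Receive()
    {
        lock (_sync)
        {
            if (_hasPending)
            {
                var request = _pending;
                _pending = null;
                _hasPending = false;
                return request;
            }
            return _queue.Take();
        }
    }
}
```

The lock is held while waiting, which keeps the sketch short but only suits a single consumer, the same simplification the reply channel makes with its own lock.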
The other ReceiveRequest method is where we implement the receive operation. We first check that the current channel is available, and then receive a message from the bus by calling the Receive method of the IBus object. Once a message has been received, we construct a new RequestContext with the message and the message ID.
public RequestContext ReceiveRequest(TimeSpan timeout)
{
    ThrowIfDisposedOrNotOpen();
    lock (_aLock)
    {
        // receive the request message from the bus
        var busMsg = _bus.Receive(true, null);
        // box the wcf message
        var raw = Encoding.UTF8.GetBytes(busMsg.Content);
        var data = _bufferManager.TakeBuffer(raw.Length);
        Buffer.BlockCopy(raw, 0, data, 0, raw.Length);
        var buffer = new ArraySegment<byte>(data, 0, raw.Length);
        var message = _encoder.ReadMessage(buffer, _bufferManager);
        // initialize the request context and return
        return new MessageBusRequestContext(message, this, _bufferManager, _encoder, _localAddress, _bus, busMsg.MessageID);
    }
}
WCF takes this request context, routes the request to the proper service and executes your business code. The result of your business code comes back to the request context, wrapped in a WCF message, through the Reply method. What we need to do then is to send this message into the bus.
The MessageBusRequestContext inherits from RequestContext, which has many abstract members that need to be implemented. In order to send the reply message into the bus we also need the buffer manager, the encoder and the bus. The implementation of the MessageBusRequestContext, except for the Reply method, looks like this.
using System;
using System.ServiceModel;
using System.ServiceModel.Channels;

public class MessageBusRequestContext : RequestContext
{
    private bool _aborted;
    private readonly Message _message;
    private readonly MessageBusReplyChannel _parent;
    private readonly BufferManager _bufferManager;
    private readonly MessageEncoder _encoder;
    private readonly EndpointAddress _address;
    private readonly object _aLock;
    private readonly string _busMessageId;
    private readonly IBus _bus;

    private CommunicationState _state;

    public MessageBusRequestContext(
        Message message, MessageBusReplyChannel parent,
        BufferManager bufferManager, MessageEncoder encoder, EndpointAddress address,
        IBus bus,
        string relatedTo)
    {
        _aborted = false;
        _parent = parent;
        _message = message;
        _bufferManager = bufferManager;
        _encoder = encoder;
        _address = address;
        _busMessageId = relatedTo;
        _bus = bus;

        _aLock = new object();
        _state = CommunicationState.Opened;
    }

    public override void Abort()
    {
        lock (_aLock)
        {
            if (_aborted)
            {
                return;
            }
            _aborted = true;
            _state = CommunicationState.Faulted;
        }
    }

    public override IAsyncResult BeginReply(Message message, TimeSpan timeout, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    public override IAsyncResult BeginReply(Message message, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    public override void Close(TimeSpan timeout)
    {
        lock (_aLock)
        {
            _state = CommunicationState.Closed;
        }
    }

    public override void Close()
    {
        Close(TimeSpan.MaxValue);
    }

    public override void EndReply(IAsyncResult result)
    {
        throw new NotImplementedException();
    }

    public override void Reply(Message message, TimeSpan timeout)
    {
        // left empty for now; the Reply method is implemented separately
    }

    public override void Reply(Message message)
    {
        Reply(message, TimeSpan.MaxValue);
    }

    public override Message RequestMessage
    {
        get
        {
            return _message;
        }
    }
}
The reply message produced by the business logic arrives as the parameter of the Reply method. When the reply channel gets it, we need to send it into our message bus so that the client receives it as the return value of IRequestChannel.Request. So we first retrieve the content of the reply message and then send it into the message bus.
One thing needs to be highlighted: the reply must be received by the client instance that sent the original request. This is not a problem with HTTP or TCP, since the underlying connection ensures that the reply goes back to the proper client. But with a message bus, where many clients may be connected and sending request messages, we need a rule to make sure the reply is received by the correct client. If you remember the code that sends the request message, we generated a message ID there. After the client sends the request, it listens on the bus only for the message related to this ID.

On the server side, we attach this ID to the reply message as well. The workflow on both the server and the client side looks like this.

So in the RequestContext.Reply method, after getting the content from the WCF message, we send it to the bus together with the message ID passed in from the original incoming request message.
public override void Reply(Message message, TimeSpan timeout)
{
    // unbox the reply message to string
    ArraySegment<byte> buffer;
    string content;
    using (message)
    {
        _address.ApplyTo(message);
        buffer = _encoder.WriteMessage(message, 64 * 1024, _bufferManager);
    }
    content = Encoding.UTF8.GetString(buffer.Array, buffer.Offset, buffer.Count);
    _bufferManager.ReturnBuffer(buffer.Array);
    // send the reply into bus
    _bus.SendReply(content, false, _busMessageId);
}
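To make the correlation rule a bit more concrete, here is a minimal sketch that is independent of WCF. It assumes the client keeps an in-memory table of pending requests keyed by message ID. ReplyCorrelator, Register and Deliver are hypothetical names used only for illustration; the transport in this article relies on the bus to filter by message ID instead.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper: matches each reply to the request that caused it.
public class ReplyCorrelator
{
    // One pending entry per outstanding request, keyed by the request's message ID.
    private readonly ConcurrentDictionary<string, TaskCompletionSource<string>> _pending =
        new ConcurrentDictionary<string, TaskCompletionSource<string>>();

    // Called by the client right after it sends a request.
    public Task<string> Register(string messageId)
    {
        var tcs = new TaskCompletionSource<string>();
        if (!_pending.TryAdd(messageId, tcs))
        {
            throw new InvalidOperationException("Duplicate message ID: " + messageId);
        }
        return tcs.Task;
    }

    // Called when a reply arrives; returns false if nobody is waiting for this ID.
    public bool Deliver(string relatedMessageId, string content)
    {
        TaskCompletionSource<string> tcs;
        if (_pending.TryRemove(relatedMessageId, out tcs))
        {
            tcs.SetResult(content);
            return true;
        }
        return false;
    }
}
```

A reply carrying an unknown ID is simply reported as undelivered, so a client never picks up a reply that belongs to another client.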
Try: Using the Listener, Factory and Channels Directly
We have finished all the necessary parts and can start testing our transport. But in this article I don’t want to establish the server and client in the normal way, which means I will not use ServiceHost, ClientFactory, etc. We are going to use the underlying ChannelListener, ChannelFactory, IRequestChannel and IReplyChannel directly to demonstrate what happens inside WCF and our transport.
Let’s create a new console application and add a reference to the assembly that contains our transport extension. Since we are going to use our in-memory message bus, the server side and the client side must run in the same process and share the same bus instance. So we create the bus first.
static void Main(string[] args)
{
    var bus = new InProcMessageBus();
}
Then we create the server-side parts. On the server side we need to build a ChannelListener from the binding and retrieve an IReplyChannel through the listener’s AcceptChannel method.
static void Main(string[] args)
{
    var bus = new InProcMessageBus();

    // create and open the service listener
    var svcBinding = new MessageBusTransportBinding(bus);
    var address = new Uri("net.bus://localhost/sample");
    var listener = svcBinding.BuildChannelListener<IReplyChannel>(address, new BindingParameterCollection());
    listener.Open();
    // create channel and begin to accept
    var replyChannel = listener.AcceptChannel();
    replyChannel.Open();
}
As you can see we also defined the endpoint the service listens on, which is “net.bus://localhost/sample”. The scheme of the endpoint must be the same as the one we specified in the TransportBindingElement. In a similar way we create the client-side ChannelFactory and the IRequestChannel.
static void Main(string[] args)
{
    var bus = new InProcMessageBus();

    // create and open the service listener
    ... ...
    // create channel and begin to accept
    ... ...

    // create and open the client factory
    var cliBinding = new MessageBusTransportBinding(bus);
    var factory = cliBinding.BuildChannelFactory<IRequestChannel>();
    factory.Open();
    // create channel
    var requestChannel = factory.CreateChannel(new EndpointAddress(address));
    requestChannel.Open();
}
Here it gets a little tricky. Since the in-process message bus forces us to run the server and client code in the same process, we must run the server side in another thread. So we create a function that contains the server-side code, which does the following:
- Use IReplyChannel.WaitForRequest to determine whether a request message has arrived. In our sample the thread is blocked by the ReceiveRequest method anyway, so WaitForRequest always returns true.
- Use IReplyChannel.ReceiveRequest to get the request message wrapped in a RequestContext.
- Load the request content from the message and run the server-side business logic. In this example the service accepts a string and reverses it.
- Wrap the result in the reply message and invoke RequestContext.Reply to send it back to the message bus.
static void ServerSideProcess(object channel)
{
    var replyChannel = channel as IReplyChannel;

    while (replyChannel.WaitForRequest(TimeSpan.MaxValue))
    {
        using (var context = replyChannel.ReceiveRequest())
        {
            using (var message = context.RequestMessage)
            {
                Console.WriteLine("Processing request: {0}", message.Headers.Action);

                // execute the server side business logic
                var body = message.GetBody<string>();
                var result = new string(body.Reverse().ToArray());

                // reply to client
                var replyMessage = Message.CreateMessage(MessageVersion.Default, "http://sbx.igt.com/SampleService/ReverseResponse", result);
                context.Reply(replyMessage);
            }
        }
    }
}
Back in the Main method, we create a new thread that points to the method we have just finished and start it with the reply channel object.
static void Main(string[] args)
{
    var bus = new InProcMessageBus();

    // create and open the service listener
    ... ...
    // create channel and begin to accept
    ... ...

    // create and open the client factory
    ... ...
    // create channel
    ... ...

    // server side: waiting for the request message
    var serverThread = new Thread(new ParameterizedThreadStart(ServerSideProcess));
    serverThread.Start(replyChannel);
}
The next step is the client-side code. We let the user type any string in the console, wrap it into a request message and send it through IRequestChannel.Request. The call waits for the reply message, and then we display the result on the screen. The full code of this console application looks like this.
using System;
using System.Linq;
using System.ServiceModel;
using System.ServiceModel.Channels;
using System.Threading;
// plus a using for the namespace that contains MessageBusTransportBinding and InProcMessageBus

class Program
{
    static void Main(string[] args)
    {
        var bus = new InProcMessageBus();

        // create and open the service listener
        var svcBinding = new MessageBusTransportBinding(bus);
        var address = new Uri("net.bus://localhost/sample");
        var listener = svcBinding.BuildChannelListener<IReplyChannel>(address, new BindingParameterCollection());
        listener.Open();
        // create channel and begin to accept
        var replyChannel = listener.AcceptChannel();
        replyChannel.Open();

        // create and open the client factory
        var cliBinding = new MessageBusTransportBinding(bus);
        var factory = cliBinding.BuildChannelFactory<IRequestChannel>();
        factory.Open();
        // create channel
        var requestChannel = factory.CreateChannel(new EndpointAddress(address));
        requestChannel.Open();

        // server side: waiting for the request message
        var serverThread = new Thread(new ParameterizedThreadStart(ServerSideProcess));
        serverThread.Start(replyChannel);

        // client side: invoke the service
        while (true)
        {
            Console.Write("Say something: ");
            var text = Console.ReadLine();
            if (string.IsNullOrWhiteSpace(text))
            {
                break;
            }

            var requestMessage = Message.CreateMessage(MessageVersion.Default, "http://sbx.igt.com/SampleService/Reverse", text);
            var replyMessage = requestChannel.Request(requestMessage);
            using (replyMessage)
            {
                Console.WriteLine("Processing reply: {0}", replyMessage.Headers.Action);
                Console.WriteLine("Reply: {0}", replyMessage.GetBody<string>());
            }
        }
    }

    static void ServerSideProcess(object channel)
    {
        var replyChannel = channel as IReplyChannel;

        while (replyChannel.WaitForRequest(TimeSpan.MaxValue))
        {
            using (var context = replyChannel.ReceiveRequest())
            {
                using (var message = context.RequestMessage)
                {
                    Console.WriteLine("Processing request: {0}", message.Headers.Action);

                    // execute the server side business logic
                    var body = message.GetBody<string>();
                    var result = new string(body.Reverse().ToArray());

                    // reply to client
                    var replyMessage = Message.CreateMessage(MessageVersion.Default, "http://sbx.igt.com/SampleService/ReverseResponse", result);
                    context.Reply(replyMessage);
                }
            }
        }
    }
}
Let’s start the application and type some strings. You can see that each message was sent into the message bus, received by the service code, processed by the business logic and sent back to the client.

But this is not our final goal. What we want is a transport that lets multiple instances of the same service (scaling out) listen on the same endpoint and accept client requests. The sample code above ran just one service instance with one client. Now let’s tweak the application a bit.
I created a new method that establishes a new service listener and listens on the message bus in a thread, and in the Main method I started three of them. To show which service thread processes each request I also tweaked the server-side business logic.
class Program
{
    static void Main(string[] args)
    {
        var bus = new InProcMessageBus();
        var address = new Uri("net.bus://localhost/sample");

        // launch multiple services (in threads)
        LaunchService(bus, address, 1);
        LaunchService(bus, address, 2);
        LaunchService(bus, address, 3);

        // create and open the client factory
        var cliBinding = new MessageBusTransportBinding(bus);
        var factory = cliBinding.BuildChannelFactory<IRequestChannel>();
        factory.Open();
        // create channel
        var requestChannel = factory.CreateChannel(new EndpointAddress(address));
        requestChannel.Open();

        // client side: invoke the service
        while (true)
        {
            Console.Write("Say something: ");
            var text = Console.ReadLine();
            if (string.IsNullOrWhiteSpace(text))
            {
                break;
            }

            var requestMessage = Message.CreateMessage(MessageVersion.Default, "http://sbx.igt.com/SampleService/Reverse", text);
            var replyMessage = requestChannel.Request(requestMessage);
            using (replyMessage)
            {
                Console.WriteLine("Processing reply: {0}", replyMessage.Headers.Action);
                Console.WriteLine("Reply: {0}", replyMessage.GetBody<string>());
            }
        }
    }

    static void LaunchService(IBus bus, Uri address, int id)
    {
        // create and open the service listener
        var svcBinding = new MessageBusTransportBinding(bus);
        var listener = svcBinding.BuildChannelListener<IReplyChannel>(address, new BindingParameterCollection());
        listener.Open();
        // create channel and begin to accept
        var replyChannel = listener.AcceptChannel();
        replyChannel.Open();
        // server side: waiting for the request message
        var thread = new Thread((obj) =>
        {
            var tuple = obj as Tuple<IReplyChannel, int>;
            var channel = tuple.Item1;
            var svcId = tuple.Item2;

            while (channel.WaitForRequest(TimeSpan.MaxValue))
            {
                using (var context = channel.ReceiveRequest())
                {
                    using (var message = context.RequestMessage)
                    {
                        // {0} is the service ID, {1} is the action of the request
                        Console.WriteLine("[ID = {0}]: Processing request: {1}", svcId, message.Headers.Action);
                        // execute the server side business logic
                        var body = message.GetBody<string>();
                        var result = new string(body.Reverse().ToArray());
                        // reply to client
                        var replyMessage = Message.CreateMessage(MessageVersion.Default, "http://sbx.igt.com/SampleService/ReverseResponse", result);
                        context.Reply(replyMessage);
                    }
                }
            }
        });
        thread.Start(new Tuple<IReplyChannel, int>(replyChannel, id));
    }
}
Now if we execute the application and type some strings, we can see that each service thread is able to process client requests.

I use service threads to simulate multiple service instances because of the limitation of the in-memory message bus. With a standalone message bus, such as TIBCO EMS, Redis or Windows Azure Service Bus Queues & Topics, we could run the service in many processes, which would be more convincing.
Summary
In this article I described how to implement a WCF transport extension in the simplest way. We created the binding, which is the root of the extension, and then the transport binding element, which is the center of the transport. From the transport binding element we created the channel factory and the channel listener, depending on the MEP and on whether we are on the client or the server side. Finally, from the factory and the listener we created the channels, which are responsible for communicating with the transport, the message bus in our case.
After finishing the work above we created a sample application that uses the transport and the message bus. We didn’t define the service logic, service host and client proxy in the normal way. Instead we initialized the underlying factory, listener and channels directly.
Finally we simulated scaling out on the server side by running the service code in several threads, to prove that our message bus based pulling mode architecture works.
But if we review the sample application, we can see that it does not follow the normal WCF pattern. Our service business logic is mixed into the hosting and listening code. What we are used to in WCF is creating a service contract interface and a service implementation class, and then using the ServiceHost class to host it on a transport.
In the next post I will show how to modify our transport extension so that users can define and host services in the way we are familiar with.
Download the code here.
Hope this helps,
Shaun
All documents and related graphics, codes are provided "AS IS" without warranty of any kind.
Copyright © Shaun Ziyan Xu. This work is licensed under the Creative Commons License.
Cloud computing gives us more flexibility with computing resources: we can provision and deploy an application or service with multiple instances across multiple machines. As the number of service instances grows, balancing the incoming messages and the workload becomes a new challenge.
Currently there are two approaches for passing incoming messages to the service instances. I would like to call them dispatcher mode and pulling mode.
Dispatcher Mode
Dispatcher mode introduces a role that is responsible for finding the best service instance to process each request. The image below shows the shape of this mode.

Four clients communicate with the service through the underlying transport. For example, if we are using HTTP the clients might all connect to the same service URL. On the server side a dispatcher listens on this URL and retrieves all messages. When a message comes in, the dispatcher finds a proper service instance to process it. There are three mechanisms for finding the instance:
- Round-robin: The dispatcher always sends the message to the next instance. For example, if the dispatcher sent the last message to instance 2, the next message will be sent to instance 3, regardless of whether instance 3 is busy at that moment.
- Random: The dispatcher picks a service instance at random and, like round-robin, ignores whether the instance is busy.
- Sticky: The dispatcher sends all related messages to the same service instance. This approach is typically used when the service methods are stateful or session-based.
But as you can see, none of these approaches is really load balanced. Clients send messages at any time, and each message may take a different amount of time to process on the server side. This means that in some cases some service instances are very busy while others are almost idle. For example, with round-robin it could happen that most of the simple tasks are passed to instance 1 while the complex ones are sent to instance 3, even though instance 1 is idle.
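The imbalance is easy to reproduce with a few lines of code. The sketch below is only an illustration: DispatcherDemo, RoundRobinLoad and StickyInstance are hypothetical names, task durations are arbitrary time units, and the hash used for sticky routing is just one simple choice.

```csharp
using System;

// Hypothetical illustration of two dispatcher strategies.
public static class DispatcherDemo
{
    // Round-robin: assigns each task to the next instance in turn and returns
    // the total work (in arbitrary time units) that lands on each instance.
    public static int[] RoundRobinLoad(int[] taskDurations, int instanceCount)
    {
        var load = new int[instanceCount];
        for (var i = 0; i < taskDurations.Length; i++)
        {
            load[i % instanceCount] += taskDurations[i];
        }
        return load;
    }

    // Sticky: the same session key always maps to the same instance.
    public static int StickyInstance(string sessionKey, int instanceCount)
    {
        // A simple stable hash over the characters of the key.
        var hash = 0;
        foreach (var ch in sessionKey)
        {
            hash = unchecked(hash * 31 + ch);
        }
        return (hash & int.MaxValue) % instanceCount;
    }
}
```

With the durations { 1, 1, 10, 1, 1, 10, 1, 1, 10 } and three instances, RoundRobinLoad gives the third instance 30 units of work while the other two get only 3 units each, although every instance received the same number of messages.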

This brings some problems to our architecture. The first is that the response to the clients might take longer than it should. As shown in the figure above, messages 6 and 9 could be processed by instance 1 or instance 2, but in reality they were dispatched to the busy instance 3 because of the dispatcher’s round-robin rule.
Secondly, if many requests come from the clients in a very short period, service instances might be flooded with pending tasks and some of them might crash.
Third, if we host our service instances on a cloud platform, for example Windows Azure, the computing resource is billed by deployment period instead of actual CPU usage. This means any idle service instance is wasting our money!
Finally, the dispatcher becomes the bottleneck of our system, since all incoming messages must be routed through it. If we are using HTTP or TCP as the transport, the dispatcher would be a network load balancer. If we want more capacity we have to scale up the dispatcher, or buy a hardware load balancer, which is very expensive, in addition to scaling out the service instances.
Pulling Mode
Pulling mode doesn’t need a dispatcher to route the messages. All service instances listen on the same transport and try to retrieve the next suitable message whenever they are idle.

Since there is no dispatcher in pulling mode, it requires some features from the transport.
- The transport must support multiple client connections and multiple server listeners. HTTP and TCP do not allow multiple applications to listen on the same address and port, so they cannot be used in pulling mode directly.
- All messages in the transport must be FIFO, which means an older message must be received before a newer one.
- Message selection is a plus. This means both service and client can specify selection criteria and receive only the messages that match. This feature is not mandatory but is very useful when implementing the request-reply and duplex WCF channel modes. Otherwise we must keep an in-memory dictionary to store the reply messages. I will explain more about this in the following articles.
A message bus, or message queue, is the best candidate for the transport in pulling mode. First, it allows multiple applications to listen on the same queue, and it is FIFO. Some message buses also support message selection, such as TIBCO EMS and RabbitMQ. Others provide an in-memory dictionary that can store the reply messages, for example Redis.
The principle of pulling mode is to let the service instances manage themselves. Each instance tries to retrieve the next pending incoming message as soon as it has finished its current task. This gives us several benefits and solves the problems we met in dispatcher mode.
- Each incoming message is received by an instance that is free to process it, so the load is well balanced. It will not happen that some instances are busy while others are idle, since the idle ones will retrieve more tasks and become busy.
- Since all instances try their best to stay busy, we can use fewer instances than in dispatcher mode, which is more cost effective.
- Since there’s no dispatcher in the system, there is no bottleneck.
- When we add more service instances in dispatcher mode, we have to change something to let the dispatcher know about them. In pulling mode all service instances are self-managed, so no extra change is needed.
- If there are many incoming messages, the message bus queues them in the transport, so the service instances will not crash.
Those are the benefits of pulling mode, but it introduces some problems as well.
- Process tracking and debugging become more difficult. Since the service instances are self-managed, we cannot know in advance which instance will process a message, so we need more information to support debugging and tracking.
- Real-time response may not be supported. A service instance only picks up the next message after the current one is done, so if we have real-time requests this may not be a good solution.
Weighing the pros and cons above, pulling mode is the better solution for a distributed system architecture, because what we need most is scalability, cost effectiveness and self-management.
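The principle can be sketched with the standard BlockingCollection class acting as a stand-in for the message bus. This is only an in-process illustration under that assumption; PullingDemo and Run are hypothetical names, and a real deployment would use a standalone bus and separate processes.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical illustration of pulling mode inside one process.
public static class PullingDemo
{
    // Runs 'workerCount' workers that each pull the next message when idle.
    // Returns, per worker, the number of messages it processed.
    public static int[] Run(int[] taskDurationsMs, int workerCount)
    {
        // A FIFO queue shared by all workers plays the role of the bus.
        var queue = new BlockingCollection<int>(new ConcurrentQueue<int>());
        foreach (var d in taskDurationsMs)
        {
            queue.Add(d);
        }
        queue.CompleteAdding(); // no more messages will arrive in this demo

        var processed = new int[workerCount];
        var workers = Enumerable.Range(0, workerCount).Select(id => Task.Run(() =>
        {
            // GetConsumingEnumerable blocks until a message is available and
            // ends once the queue is marked complete and empty.
            foreach (var duration in queue.GetConsumingEnumerable())
            {
                Thread.Sleep(duration); // simulate the business logic
                processed[id]++;        // each worker only touches its own slot
            }
        })).ToArray();

        Task.WaitAll(workers);
        return processed;
    }
}
```

Every message is processed exactly once, and a worker that is stuck on a long task simply takes fewer messages while the others keep pulling. Nothing has to be told how many workers exist.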
WCF and WCF Transport Extensibility
Windows Communication Foundation (WCF) is a framework for building service-oriented applications. In the .NET world WCF is the best way to implement services. In this series I’m going to demonstrate how to implement pulling mode on top of a message bus by extending WCF.
I don’t want to go deep into every related area of WCF, but I will highlight its transport extensibility. When implementing an RPC foundation there are many aspects to deal with, for example message encoding, encryption, authentication, and sending and receiving messages. In WCF, each aspect is represented by a channel. A message passes through all the necessary channels and is finally sent to the underlying transport. On the other side the message is received from the transport and passes through the same channels until it reaches the business logic.
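The layering idea can be sketched without WCF at all. The types below (ILayer, EncodingLayer, XorLayer, TransportLayer) are hypothetical, heavily simplified stand-ins and not the real WCF channel interfaces. The XOR step is only a toy placeholder for an aspect such as encryption.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical, simplified stand-in for a channel: it handles one aspect
// and passes the payload to the next layer down.
public interface ILayer
{
    void Send(byte[] payload);
}

// Bottom of the stack: hands the bytes to the transport (here an in-memory list).
public class TransportLayer : ILayer
{
    public readonly List<byte[]> Wire = new List<byte[]>();

    public void Send(byte[] payload)
    {
        Wire.Add(payload);
    }
}

// A middle layer: a toy "encryption" that XORs every byte with a key.
public class XorLayer : ILayer
{
    private readonly ILayer _next;
    private readonly byte _key;

    public XorLayer(ILayer next, byte key)
    {
        _next = next;
        _key = key;
    }

    public void Send(byte[] payload)
    {
        var output = new byte[payload.Length];
        for (var i = 0; i < payload.Length; i++)
        {
            output[i] = (byte)(payload[i] ^ _key);
        }
        _next.Send(output);
    }
}

// Top of the stack: turns the application's string into bytes.
public class EncodingLayer
{
    private readonly ILayer _next;

    public EncodingLayer(ILayer next)
    {
        _next = next;
    }

    public void Send(string message)
    {
        _next.Send(Encoding.UTF8.GetBytes(message));
    }
}
```

Composing them as new EncodingLayer(new XorLayer(transport, 0x5A)) gives a small stack in which the transport is always the last layer to see the message, which is the property our own transport channel will rely on.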

This model is called the “Channel Stack” in WCF. The last channel in the channel stack must always be a transport channel, which is responsible for sending and receiving the messages. As we are going to run WCF over a message bus and implement the pulling mode scaling-out solution, we need to create our own transport channel so that the client and the service can exchange messages over our bus. Before we go deeper into the transport channel, let’s have a look at the message exchange patterns that WCF defines.
A message exchange pattern (MEP) defines how the client and the service exchange messages over the transport. WCF defines three basic MEPs: Datagram, Request-Reply and Duplex.
- Datagram: Also known as one-way or fire-and-forget mode. The message is sent from the client to the service and no reply is needed. The client doesn’t care about the result of the message at all.
- Request-Reply: A very commonly used pattern. The client sends the request message to the service and waits until the reply message comes back from the service.
- Duplex: The client sends a message to the service, and while the service is processing it the service can call back to the client. During the callback the service acts like a client while the client acts like a service.
In WCF, each MEP is represented by its associated channels.
| MEP | Channels |
| Datagram | IInputChannel, IOutputChannel |
| Request-Reply | IRequestChannel, IReplyChannel |
| Duplex | IDuplexChannel |
The channels are created by the ChannelListener on the server side and by the ChannelFactory on the client side. The ChannelListener and ChannelFactory are created by the TransportBindingElement. The TransportBindingElement is created by the Binding, which can be a newly defined binding or a custom binding.
For more information about the transport channel model, please refer to the MSDN documentation.
The figure below shows the transport channel objects when using the request-reply MEP.

And this is the datagram MEP.

And this is the duplex MEP.

After investigating the WCF transport architecture, the channel model and the MEPs, we can finally identify what we need to build for our message bus based transport layer:
- Binding: (Optional) Defines the channel elements in the channel stack and adds our transport binding element at the bottom of the stack. We can use the built-in CustomBinding as well.
- TransportBindingElement: Defines which MEPs are supported by our transport and creates the related ChannelListener and ChannelFactory. It also defines the scheme of the endpoint when this transport is used.
- ChannelListener: Creates the server-side channel based on the MEP it supports. We can have one ChannelListener that creates channels for all supported MEPs, or one ChannelListener per MEP. In this series I will use the second approach.
- ChannelFactory: Creates the client-side channel based on the MEP it supports. We can have one ChannelFactory that creates channels for all supported MEPs, or one ChannelFactory per MEP. In this series I will use the second approach.
- Channels: Based on the MEPs we want to support, we need to implement the corresponding channels. For example, if we want our transport to support Request-Reply mode we should implement IRequestChannel and IReplyChannel. In this series I will implement all three MEPs listed above one by one.
- Scaffold: To make our transport extension work we also need some scaffolding. For example, we need classes to send and receive messages through our message bus, and some code to read and write WCF messages. These are not strictly necessary but are very useful in our example.
Message Bus
Only one thing remains before we can begin to implement our scaling-out WCF transport: the message bus. As I mentioned above, the message bus must have certain features to fulfill all the WCF MEPs. In my company we will be using TIBCO EMS, which is an enterprise message bus product. As I said before, we can use any message bus product as long as it satisfies our requirements.
Here I would like to introduce an interface that separates the message bus from WCF. This allows us to implement the bus operations on top of whatever kind of bus we are going to use. The interface looks like this.
public interface IBus : IDisposable
{
    string SendRequest(string message, bool fromClient, string from, string to = null);

    void SendReply(string message, bool fromClient, string replyTo);

    BusMessage Receive(bool fromClient, string replyTo);
}
There are only three methods in the bus interface. Let me explain them one by one.
The SendRequest method is responsible for sending a request message into the bus. Its parameters are:
- message: The WCF message content.
- fromClient: Indicates whether this message came from the client.
- from: The ID of the channel this message was sent from. A channel ID is generated whenever any kind of channel is created, which will be explained in the following articles.
- to: The ID of the channel that should receive this message. In the Request-Reply and Duplex MEPs this is necessary, since the reply message must be received by the channel that sent the related request message.
The SendReply method is responsible for sending a reply message. It is very similar to the previous one but has no “from” parameter, because no MEP needs to reply to a reply message.
The Receive method is responsible for waiting for an incoming message, which can be either a request message or a specific reply message. It returns a BusMessage object, which carries the content together with some information about the channels involved. The code of the BusMessage class is
public class BusMessage
{
    public string MessageID { get; private set; }
    public string From { get; private set; }
    public string ReplyTo { get; private set; }
    public string Content { get; private set; }

    public BusMessage(string messageId, string fromChannelId, string replyToChannelId, string content)
    {
        MessageID = messageId;
        From = fromChannelId;
        ReplyTo = replyToChannelId;
        Content = content;
    }
}
Now let’s implement a message bus based on the IBus interface. Since I don’t want you to have to buy and install TIBCO EMS or any other message bus product, I will implement an in-process memory bus. This bus is for testing and sample purposes only, and it can only be used if the service and the client are in the same process. It is very straightforward.
public class InProcMessageBus : IBus
{
    private readonly ConcurrentDictionary<Guid, InProcMessageEntity> _queue;
    private readonly object _lock;

    public InProcMessageBus()
    {
        _queue = new ConcurrentDictionary<Guid, InProcMessageEntity>();
        _lock = new object();
    }

    public string SendRequest(string message, bool fromClient, string from, string to = null)
    {
        var entity = new InProcMessageEntity(message, fromClient, from, to);
        _queue.TryAdd(entity.ID, entity);
        return entity.ID.ToString();
    }

    public void SendReply(string message, bool fromClient, string replyTo)
    {
        var entity = new InProcMessageEntity(message, fromClient, null, replyTo);
        _queue.TryAdd(entity.ID, entity);
    }

    public BusMessage Receive(bool fromClient, string replyTo)
    {
        InProcMessageEntity e = null;
        while (true)
        {
            lock (_lock)
            {
                var entity = _queue
                    .Where(kvp => kvp.Value.FromClient == fromClient && (kvp.Value.To == replyTo || string.IsNullOrWhiteSpace(kvp.Value.To)))
                    .FirstOrDefault();
                if (entity.Key != Guid.Empty && entity.Value != null)
                {
                    _queue.TryRemove(entity.Key, out e);
                }
            }
            if (e == null)
            {
                Thread.Sleep(100);
            }
            else
            {
                return new BusMessage(e.ID.ToString(), e.From, e.To, e.Content);
            }
        }
    }

    public void Dispose()
    {
    }
}
The InProcMessageBus stores each message in an InProcMessageEntity object, which carries some extra information besides the WCF message itself.
public class InProcMessageEntity
{
    public Guid ID { get; set; }
    public string Content { get; set; }
    public bool FromClient { get; set; }
    public string From { get; set; }
    public string To { get; set; }

    public InProcMessageEntity()
        : this(string.Empty, false, string.Empty, string.Empty)
    {
    }

    public InProcMessageEntity(string content, bool fromClient, string from, string to)
    {
        ID = Guid.NewGuid();
        Content = content;
        FromClient = fromClient;
        From = from;
        To = to;
    }
}
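One caveat about the sample bus: ConcurrentDictionary makes no promise about enumeration order, so the in-process bus above is not strictly FIFO. That is fine for a demo, but if ordering matters, a store along the following lines could be used instead. This is only a sketch under that assumption. FifoMailbox, Add and TryTake are hypothetical names and are not used elsewhere in this series.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical FIFO store with message selection; illustration only.
public class FifoMailbox<T> where T : class
{
    private readonly LinkedList<T> _items = new LinkedList<T>();
    private readonly object _lock = new object();

    // Appends a message at the end, so older messages stay in front.
    public void Add(T item)
    {
        lock (_lock)
        {
            _items.AddLast(item);
        }
    }

    // Removes and returns the oldest item matching the selector,
    // or null if nothing matches.
    public T TryTake(Func<T, bool> selector)
    {
        lock (_lock)
        {
            for (var node = _items.First; node != null; node = node.Next)
            {
                if (selector(node.Value))
                {
                    _items.Remove(node);
                    return node.Value;
                }
            }
            return null;
        }
    }
}
```

The selector plays the role of the message selection feature described earlier: a client could pass a predicate that only accepts replies addressed to its own channel ID, while a service accepts any request.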
Summary
OK, now I have everything ready. The next step is to implement our WCF message bus transport extension.
In this post I described two scaling-out approaches on the service side, which matter especially on a cloud platform: dispatcher mode and pulling mode, and I compared their pros and cons. Then I introduced the WCF channel stack, the channel model and the transport extension points, and identified what we need to build to create our own WCF transport extension so that our WCF services can use pulling mode on top of a message bus. Finally I provided some classes that will be used in future posts, working against an in-process memory message bus for demonstration purposes only.
In the next post I will begin to implement the transport extension step by step.
Hope this helps,
Shaun
All documents and related graphics, codes are provided "AS IS" without warranty of any kind.
Copyright © Shaun Ziyan Xu. This work is licensed under the Creative Commons License.
This is the second post about SQL Azure Federation. In my first post I described a little of the theory of data partitioning, including the difference between horizontal partitioning and vertical partitioning. I also talked about the features that need to be built when implementing horizontal partitioning. And finally, I described some basic concepts of SQL Azure Federation.
In this post, I will demonstrate how to use SQL Azure Federation from SQL Server Management Studio (SSMS). I will also demonstrate the benefit of using SQL Azure Federation to build a multi-tenant data solution.
Create Federation and Tables
SQL Azure Federation is officially available in every Windows Azure data center. There is no additional registration or sign-up process: whenever we create a new server or database in SQL Azure, we can use SQL Azure Federation. For example, I created a new database on my SQL Azure server, which is located in the East Asia data center. Then I can open SSMS, connect to this SQL Azure server and use SQL Azure Federation.
Do not forget to set the SQL Azure firewall before you connect to the server from local tools such as SSMS.
In this post I would like to take a very simple scenario as the example. Assume we have a multi-tenant CRM system which contains accounts and contacts for each tenant. We also have some lookup data such as countries and titles, and one table that contains some metadata for each tenant. So the database diagram would look like this.

In SQL Azure Federation, if we decide to split some tables we must have the federation ready first, and then create the tables we want to federate on top of that federation. There is no way in SQL Azure Federation to convert an existing table between federated, reference and non-federated. So before we start to create the tables we need to define the federation.
Since we decided to use the tenant ID as the key when splitting the database, the federation distribution key should be of type INT, which is the same as the type of TenantID in the tables.
To create the federation, connect to the database in SSMS and execute the T-SQL below.
CREATE FEDERATION Tenant_Fed (TID INT RANGE)
GO
Nothing special seems to happen after we execute this T-SQL in SSMS. But behind the scenes the SQL Azure fabric created a database that represents our first federation member. The name of the federation member database is assigned by SQL Azure: it is a GUID prefixed with “system-”. And the original database, which we created and are connected to now, became the federation root.

After that, every table in our system should be one of these three types.
| Table Type | Description | Where |
|---|---|---|
| Federated Table | Tables that will be split based on the federation and their federation columns. | Federation Members |
| Reference Table | Tables that will not be split but are referenced by the federated tables. These tables are copied to all federation members. | Federation Members |
| Central Table | Tables that will not be split and are very rarely joined with the tables in the federation members. | Federation Root |
Federated tables and reference tables need to be created in a federation member, so that when we split this member SQL Azure Federation will create them in the new members for us. To connect to a federation member we need to execute the T-SQL below.
USE FEDERATION Tenant_Fed (TID = 0) WITH RESET, FILTERING = OFF
GO
If you are familiar with SQL Azure you probably already know that the USE statement cannot be used to switch databases in SQL Azure. But if we specify the FEDERATION keyword and the federation name, it will switch our connection to the federation member database that contains the distribution key value we specified (TID = 0). Once we are connected to this federation member we can retrieve its database name by using the db_name() function. As you can see, on my workstation the first federation member database was named “system-06d60081-6737-413e-85b1-df65cb55f1c9”.

Now we are connected to the federation member, and the next step is to create the federated tables. Based on our design the Account and Contact tables should be partitioned by their TenantID column, so let’s create them with the following T-SQL.
CREATE TABLE [dbo].[Account](
    [ID] [int] NOT NULL,
    [TenantID] [int] NOT NULL,
    [Name] [varchar](50) NOT NULL,
    [CountryID] [int] NOT NULL,
    CONSTRAINT [PK_Account] PRIMARY KEY CLUSTERED
    (
        [ID] ASC
    )WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF)
)
FEDERATED ON (TID = TenantID)
GO
The T-SQL above creates the Account table with its fields and primary key. At the end of the statement we declare that the TenantID column is the federated column of this table. This means that when SQL Azure Federation splits the table it will look at the value of TenantID to decide which rows go to which member.
But when we execute it we get an error saying that the primary key index could not be created, since the index doesn’t contain the federated column.

This is a limitation of SQL Azure Federation. The federated column of a federated table must be the clustered index key, or be part of it. Since we defined ID as the primary key of the Account table, to satisfy this limitation we must include TenantID in the primary key as well. So our T-SQL should look like this.
CREATE TABLE [dbo].[Account](
    [ID] [int] NOT NULL,
    [TenantID] [int] NOT NULL,
    [Name] [varchar](50) NOT NULL,
    [CountryID] [int] NOT NULL,
    CONSTRAINT [PK_Account] PRIMARY KEY CLUSTERED
    (
        [ID] ASC,
        [TenantID] ASC
    )WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF)
)
FEDERATED ON (TID = TenantID)
GO
Similarly, in the Contact table we need to add the TenantID column to the primary key, and we need to amend the foreign key to the Account table as well.
Note that the federation column does not have to have the same name in every federated table. When creating the tables we can map the federation key to columns with different names. For example, in the Account table we could link TID = TenantID while in the Contact table we link TID = Tenant_ID or TID = T_ID, etc.
The T-SQL for the Contact table should look like this.
CREATE TABLE [dbo].[Contact](
    [ID] [int] NOT NULL,
    [TenantID] [int] NOT NULL,
    [AccountID] [int] NOT NULL,
    [Name] [varchar](50) NOT NULL,
    [TitleID] [int] NOT NULL,
    CONSTRAINT [PK_Contact] PRIMARY KEY CLUSTERED
    (
        [ID] ASC,
        [TenantID] ASC
    )WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF)
)
FEDERATED ON (TID = [TenantID])
GO

-- The foreign key from Contact to Account includes the federation column.
-- The foreign keys to Country and Title are added later, once those
-- reference tables have been created.
ALTER TABLE [dbo].[Contact] WITH CHECK ADD CONSTRAINT [FK_Contact_Account] FOREIGN KEY([AccountID], [TenantID])
REFERENCES [dbo].[Account] ([ID], [TenantID])
GO
ALTER TABLE [dbo].[Contact] CHECK CONSTRAINT [FK_Contact_Account]
GO
Now we have the federated tables created in our first federation member. Next let’s create the reference tables. In this case the reference tables are Country and Title. They don’t have a column related to the federation distribution key, which is the tenant ID in our example, but they need to be queried along with the federated tables. For example, we might need to retrieve the account information of a tenant together with the country name. So they need to be added as reference tables.
Creating a reference table in a federation member is the same as creating a table in a normal database. There is no need to specify a federated column in the CREATE TABLE statement.
CREATE TABLE [dbo].[Title](
    [ID] [int] NOT NULL,
    [Title] [varchar](50) NOT NULL,
    CONSTRAINT [PK_Title] PRIMARY KEY CLUSTERED
    (
        [ID] ASC
    )WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF)
)
GO

CREATE TABLE [dbo].[Country](
    [ID] [int] NOT NULL,
    [Country] [varchar](50) NOT NULL,
    CONSTRAINT [PK_Country] PRIMARY KEY CLUSTERED
    (
        [ID] ASC
    )WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF)
)
GO

ALTER TABLE [dbo].[Account] WITH CHECK ADD CONSTRAINT [FK_Account_Country] FOREIGN KEY([CountryID])
REFERENCES [dbo].[Country] ([ID])
GO
ALTER TABLE [dbo].[Account] CHECK CONSTRAINT [FK_Account_Country]
GO

ALTER TABLE [dbo].[Contact] WITH CHECK ADD CONSTRAINT [FK_Contact_Title] FOREIGN KEY([TitleID])
REFERENCES [dbo].[Title] ([ID])
GO
ALTER TABLE [dbo].[Contact] CHECK CONSTRAINT [FK_Contact_Title]
GO
In the T-SQL above we also added the foreign keys between the reference tables and the federated tables. It’s possible to add foreign keys between federated tables, like the one we added between Account and Contact. It’s also possible to add a foreign key from a federated table to a reference table, like the key between Account and Country. But it’s not allowed to add a foreign key from a non-federated table to a federated table, and we cannot add a foreign key between Account and a table in the federation root.
Now we have all tables and keys ready in our first federation member. Let’s add some sample data.
INSERT INTO Country VALUES (1, 'China')
INSERT INTO Country VALUES (2, 'US')
INSERT INTO Country VALUES (3, 'UK')

INSERT INTO Title VALUES (1, 'Mr')
INSERT INTO Title VALUES (2, 'Ms')

INSERT INTO Account VALUES (1, 1, 'Tenant 1 - Account 1', 1)
INSERT INTO Account VALUES (2, 1, 'Tenant 1 - Account 2', 2)
INSERT INTO Account VALUES (3, 2, 'Tenant 2 - Account 3', 3)
INSERT INTO Account VALUES (4, 2, 'Tenant 2 - Account 4', 1)
INSERT INTO Account VALUES (5, 3, 'Tenant 3 - Account 5', 2)
INSERT INTO Account VALUES (6, 3, 'Tenant 3 - Account 6', 3)
INSERT INTO Account VALUES (7, 4, 'Tenant 4 - Account 7', 1)
INSERT INTO Account VALUES (8, 4, 'Tenant 4 - Account 8', 2)
INSERT INTO Account VALUES (9, 5, 'Tenant 5 - Account 9', 3)
INSERT INTO Account VALUES (10, 5, 'Tenant 5 - Account 10', 1)

INSERT INTO Contact VALUES (1, 1, 1, 'Tenant 1 - Account 1 - Contact 1', 1)
INSERT INTO Contact VALUES (2, 1, 1, 'Tenant 1 - Account 1 - Contact 2', 2)
INSERT INTO Contact VALUES (3, 1, 1, 'Tenant 1 - Account 1 - Contact 3', 1)
INSERT INTO Contact VALUES (4, 1, 2, 'Tenant 1 - Account 2 - Contact 4', 2)
INSERT INTO Contact VALUES (5, 1, 2, 'Tenant 1 - Account 2 - Contact 5', 1)
INSERT INTO Contact VALUES (6, 1, 2, 'Tenant 1 - Account 2 - Contact 6', 2)
INSERT INTO Contact VALUES (7, 2, 3, 'Tenant 2 - Account 3 - Contact 7', 1)
INSERT INTO Contact VALUES (8, 2, 3, 'Tenant 2 - Account 3 - Contact 8', 2)
INSERT INTO Contact VALUES (9, 2, 3, 'Tenant 2 - Account 3 - Contact 9', 1)
INSERT INTO Contact VALUES (10, 2, 4, 'Tenant 2 - Account 4 - Contact 10', 2)
INSERT INTO Contact VALUES (11, 2, 4, 'Tenant 2 - Account 4 - Contact 11', 1)
INSERT INTO Contact VALUES (12, 2, 4, 'Tenant 2 - Account 4 - Contact 12', 2)
INSERT INTO Contact VALUES (13, 3, 5, 'Tenant 3 - Account 5 - Contact 13', 1)
INSERT INTO Contact VALUES (14, 3, 5, 'Tenant 3 - Account 5 - Contact 14', 2)
INSERT INTO Contact VALUES (15, 3, 5, 'Tenant 3 - Account 5 - Contact 15', 1)
INSERT INTO Contact VALUES (16, 3, 6, 'Tenant 3 - Account 6 - Contact 16', 2)
INSERT INTO Contact VALUES (17, 3, 6, 'Tenant 3 - Account 6 - Contact 17', 1)
INSERT INTO Contact VALUES (18, 3, 6, 'Tenant 3 - Account 6 - Contact 18', 2)
INSERT INTO Contact VALUES (19, 4, 7, 'Tenant 4 - Account 7 - Contact 19', 1)
INSERT INTO Contact VALUES (20, 4, 7, 'Tenant 4 - Account 7 - Contact 20', 2)
INSERT INTO Contact VALUES (21, 4, 7, 'Tenant 4 - Account 7 - Contact 21', 1)
INSERT INTO Contact VALUES (22, 4, 8, 'Tenant 4 - Account 8 - Contact 22', 2)
INSERT INTO Contact VALUES (23, 4, 8, 'Tenant 4 - Account 8 - Contact 23', 1)
INSERT INTO Contact VALUES (24, 4, 8, 'Tenant 4 - Account 8 - Contact 24', 2)
INSERT INTO Contact VALUES (25, 5, 9, 'Tenant 5 - Account 9 - Contact 25', 1)
INSERT INTO Contact VALUES (26, 5, 9, 'Tenant 5 - Account 9 - Contact 26', 2)
INSERT INTO Contact VALUES (27, 5, 9, 'Tenant 5 - Account 9 - Contact 27', 1)
INSERT INTO Contact VALUES (28, 5, 10, 'Tenant 5 - Account 10 - Contact 28', 2)
INSERT INTO Contact VALUES (29, 5, 10, 'Tenant 5 - Account 10 - Contact 29', 1)
INSERT INTO Contact VALUES (30, 5, 10, 'Tenant 5 - Account 10 - Contact 30', 2)
After executing this T-SQL, all the data is in our first federation member. It contains two reference tables (Country, Title), two federated tables (Account, Contact), and some foreign keys between the reference tables and the federated tables.

We can query the data in this federation member with joins across the tables. For example, the T-SQL below lists all account and contact information.
SELECT Account.Name, Country.Country, Contact.Name, Title.Title
FROM Contact
LEFT JOIN Account ON AccountID = Account.ID
LEFT JOIN Country ON Account.CountryID = Country.ID
LEFT JOIN Title ON Contact.TitleID = Title.ID
Split Federation Member
The key feature of SQL Azure Federation is that it can split a federation member into two at a specified federation key value, without any downtime. Since we have inserted some data into our first federation member, let’s split it into two.
Splitting a federation member is very easy. We use the ALTER FEDERATION command and specify the distribution key value at which to split. For example, since the federation distribution key is TID (tenant ID), we will split all federated tables at the federated column value 3. This means all data with a tenant ID less than 3 will be in federation member 1, while the rest will be in federation member 2.
To split the federation we first need to connect to the federation root, and then execute the ALTER FEDERATION command specifying the boundary value.
USE FEDERATION ROOT WITH RESET
GO

ALTER FEDERATION Tenant_Fed SPLIT AT (TID = 3)
GO
When we execute this command the SQL Azure engine performs the operations listed below.
- First, it configures the federation metadata on the federation root to indicate that the federation should be split.
- Then it creates two databases for the new federation members. Please note that even though we already had a federation member when executing the split command, SQL Azure Federation will NOT reuse this member. It creates two new members.
- SQL Azure creates the tables, keys, constraints, etc. of the current federation member in the new members.
- It copies the records in the reference tables to both new members.
- It copies the records in the federated tables to the new members based on the boundary value specified. Records with a federation column value less than the boundary value are put into the low member, while those equal to or greater than the boundary value are put into the high member.
- Finally, SQL Azure re-maps the connections from the original federation member to the new members and drops the original member.
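To make the boundary rule in the list above concrete, here is a tiny in-memory model written in C#. It is only a sketch of the rule "less than the boundary goes low, equal or greater goes high". It is not how SQL Azure implements the split internally, and the class and method names (FederationSplitSketch, SplitAt) are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Illustrative model only: it mimics the row distribution rule of
// ALTER FEDERATION ... SPLIT AT, not the real engine behavior.
public static class FederationSplitSketch
{
    // Distributes the federation key values held by one member between the
    // two new members: values below the boundary go to the low member,
    // values equal to or above the boundary go to the high member.
    public static (List<int> Low, List<int> High) SplitAt(IEnumerable<int> tenantIds, int boundary)
    {
        var ids = tenantIds.ToList();
        var low = ids.Where(id => id < boundary).ToList();
        var high = ids.Where(id => id >= boundary).ToList();
        return (low, high);
    }
}
```

With the sample data in this post, calling FederationSplitSketch.SplitAt(new[] { 1, 2, 3, 4, 5 }, 3) puts tenants 1 and 2 in the low member and tenants 3, 4 and 5 in the high member.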
After the split command has finished we can connect to one of the federation members with the USE FEDERATION command. In SQL Azure Federation we cannot connect to a member database by specifying its database name. Instead we tell SQL Azure Federation the value of the distribution key we want to use, and it will route us to the right member database. For example, since we split our data at tenant ID = 3, if we want to read the data of tenant 4 we can use the T-SQL below.
USE FEDERATION Tenant_Fed (TID = 4) WITH RESET, FILTERING = OFF
GO
Then execute the query we ran before to see the data in this federation member.
SELECT Account.Name, Country.Country, Contact.Name, Title.Title
FROM Contact
LEFT JOIN Account ON AccountID = Account.ID
LEFT JOIN Country ON Account.CountryID = Country.ID
LEFT JOIN Title ON Contact.TitleID = Title.ID
As you can see, SSMS returned the data of this whole member, which is every tenant whose ID is equal to or greater than 3, the boundary value.

And if we want to see the data in the first member, we can specify TID = 0, 1 or 2, or any other value less than the boundary value.
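The routing idea can be sketched in a few lines of C#: given the boundary values in ascending order, the member that serves a key is simply the number of boundaries that are less than or equal to that key. Again this is only an illustrative model with hypothetical names (FederationRoutingSketch, MemberIndexFor). The real routing is done by SQL Azure when we issue USE FEDERATION.

```csharp
using System;
using System.Collections.Generic;

// Illustrative model of range-based routing, not the real SQL Azure logic.
public static class FederationRoutingSketch
{
    // sortedBoundaries: the split points in ascending order, for example
    // { 3 } after SPLIT AT (TID = 3).
    // Returns the zero-based index of the member whose range contains the key:
    // member 0 covers keys below the first boundary, member 1 covers keys from
    // the first boundary up to (but not including) the second, and so on.
    public static int MemberIndexFor(int key, IReadOnlyList<int> sortedBoundaries)
    {
        int index = 0;
        foreach (var boundary in sortedBoundaries)
        {
            if (key >= boundary)
            {
                index++;
            }
            else
            {
                break;
            }
        }
        return index;
    }
}
```

With a single boundary at 3, the keys 0, 1 and 2 map to member 0 while the keys 3, 4 and 5 map to member 1, which matches what we saw in SSMS.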

Connect to Atomic Unit
When we connect to a federation member we use the USE FEDERATION command. There’s a clause in this T-SQL statement: FILTERING = OFF. In the examples above we could select all data in the federation member even though we didn’t put any criteria in the WHERE clause of the SELECT command. This is because we used FILTERING = OFF, which sets the connection scope to the full range covered by the federation member that contains the specified key value. The connection behaves as if it were connected to the member through its physical federation member name (the database name).
SQL Azure Federation provides another valuable feature: with the FILTERING = ON clause, the connection scope is set to a single federation key instance (federation key value) in a federation member rather than to the full range of the member.

So let’s give it a try and see what happens if we set FILTERING = ON. This time we tell SSMS to use the federation with the key value 2 and FILTERING = ON, and then select the data without any criteria.
USE FEDERATION Tenant_Fed (TID = 2) WITH RESET, FILTERING = ON
GO

SELECT Account.Name, Country.Country, Contact.Name, Title.Title
FROM Contact
LEFT JOIN Account ON AccountID = Account.ID
LEFT JOIN Country ON Account.CountryID = Country.ID
LEFT JOIN Title ON Contact.TitleID = Title.ID
Now we can see that only the records with TenantID = 2 were returned. We didn’t provide anything in the WHERE clause of the SELECT command, but it worked as if we had specified WHERE TenantID = 2. This is the benefit of using FILTERING = ON in the USE FEDERATION command.

As mentioned in the previous post, in SQL Azure Federation all records in a federation member that relate to the same federation key value are called an Atomic Unit. In the case above we connected to the atomic unit with TenantID = 2 in the federation member, by using the FILTERING = ON clause.
This feature is very useful, especially when we need to migrate a single-tenant application to a multi-tenant one. As you know, when migrating we might need to add a tenant identity column to the tables that are tenant-aware, and this might force us to rewrite all related SQL scripts. For example, a SQL script that retrieves all data from the Order table might look like this in a single-tenant application.
SELECT [Order].ID, [Order].Name FROM [Order]
But once we implement the multi-tenant feature we have to add the TenantID column to this table to indicate which tenant each order belongs to. And accordingly we have to rewrite the SQL script so that it only returns the orders of the tenant of the user who is currently logged on.
SELECT [Order].ID, [Order].Name FROM [Order]
WHERE [Order].TenantID = @UserTenantID
Assume that we have 50 tables that are tenant-aware and 10 scripts for each table. Then we need to amend 50 * 10 = 500 SQL scripts, and almost all the changes are very similar.
But if we are using SQL Azure Federation we can make the tenant ID the federation distribution key and let the tables be split on their TenantID column. Then, when a user logs in under a tenant, we can invoke the USE FEDERATION statement with that TenantID value and FILTERING = ON before any data command. None of the following SQL scripts need to be changed, since SQL Azure Federation filters the connection scope to this tenant for us. This can be done very easily by using aspect-oriented programming (AOP).
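As a rough sketch of what such a hook could look like in C#, the helper below builds the USE FEDERATION statement for a tenant and sends it over an already opened ADO.NET connection before the unchanged queries run. The class and method names (TenantScopeSketch, BuildUseFederation, ScopeToTenant) are hypothetical, and the federation and key names are taken from this post. I send the statement as its own command because, as far as I know, USE FEDERATION has to be the only statement in its batch.

```csharp
using System;
using System.Data;
using System.Globalization;

public static class TenantScopeSketch
{
    // Builds the USE FEDERATION statement for one tenant. The federation and
    // key names are inserted as-is, so they must come from trusted
    // configuration and never from user input.
    public static string BuildUseFederation(string federationName, string keyName, int tenantId, bool filtering)
    {
        return string.Format(
            CultureInfo.InvariantCulture,
            "USE FEDERATION {0} ({1} = {2}) WITH RESET, FILTERING = {3}",
            federationName, keyName, tenantId, filtering ? "ON" : "OFF");
    }

    // Scopes an already opened connection to a single tenant (atomic unit).
    // Every query executed afterwards on the same connection only sees the
    // rows of that tenant in the federated tables.
    public static void ScopeToTenant(IDbConnection openConnection, int tenantId)
    {
        using (var command = openConnection.CreateCommand())
        {
            command.CommandText = BuildUseFederation("Tenant_Fed", "TID", tenantId, true);
            command.ExecuteNonQuery();
        }
    }
}
```

An AOP interceptor, or simply the data access base class, could call ScopeToTenant right after opening the connection, passing the tenant ID of the current user.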
Summary
In this post I demonstrated how to use SQL Azure Federation. We talked about how to create a federation, federation members and tables. And we demonstrated how to split a federation member, which is horizontal data partitioning without any downtime.
I also talked about the FILTERING = ON feature of the USE FEDERATION statement and its benefit in a multi-tenant solution.
SQL Azure Federation is a very powerful tool for building scale-out and/or multi-tenant applications. In the next few posts I would like to discuss the metadata system of the federation, the pricing, and how to split federation members based on data size or record count.
I would also like to introduce how to implement our own horizontal partitioning data access layer for when SQL Azure Federation is not available.
Hope this helps,
Shaun
As many of you may already know, I’m working at a global gaming and entertainment company, where I’m responsible for designing and implementing the next generation platform that will run on the cloud, and for designing the cloud platform as well. Currently one of our goals is to replace the Active Directory integrated security and identity solution in our product with a certificate-based solution. In short, we need to work with Active Directory Certificate Service to request and issue certificates for various clients, so that they can use these certificates to connect to our services securely from any device, such as a PC, smart phone or pad.
So the first thing we need to implement is a certificate service that can be used by any client. This service will talk to the Windows Active Directory Certificate Service (AD CS) through C#.
There are many articles for IT pros on how to install and configure AD CS, but very few on how to communicate with AD CS from C#. In this post I will describe and demonstrate how to work with AD CS via C#. I will not talk much about the theory of digital certificates, public key infrastructure and certificate authorities, but will focus on how to use them.
Basic Knowledge of Certificate
A certificate, also known as a public key certificate or digital certificate, is an electronic document which uses a digital signature to bind a public key to an identity. The identity could be anything. For example, it could represent a user, a device, a service or even a few lines of code.
A certificate can be used to sign data on behalf of the identity, and the signature can be verified by others. For example, a message signed with a certificate can be verified by the receiver, who can then tell whether the message is the original one or has been modified by someone else. A certificate can also be used for encryption and decryption. This is why we can bind a certificate to a website so that the data between the browser and the server is secured: it is encrypted and signed with the certificate.
The certificate authority (CA) is responsible for issuing certificates. On Unix people can use OpenSSL’s ca command or SuSE’s gensslcert to issue certificates. On Windows we can use the Active Directory Certificate Service.
In an enterprise there might be more than one CA, and normally they are organized hierarchically. The top level is the root CA, which has a certificate signed by itself. The certificates of all subordinate CAs should be requested from and signed by the root CA.

Each CA can receive certificate requests from clients and issue certificates. Normally the root CA is not reachable by the clients, since it holds the root CA certificate, which is very important. Clients send their certificate requests to a subordinate CA and get the certificate installed.

A certificate is associated with a key pair, which consists of a private key and a public key. To keep the private key secure, it should never leave the client when requesting and installing the certificate. The certificate request, in PKCS #10 or CMC format, is sent from the client to the CA. The subordinate CA receives the request and, based on its request handling settings and policies, either marks the request as pending so that an administrator can issue or deny it manually, or issues the certificate automatically. The certificate response is in PKCS #7 format, signed with the CA certificate. The client then verifies the response and combines it with the original private key into a full certificate.

So when we need to create a certificate, what we need to do is:
- Generate the key pair and the other information that needs to be sent to the CA.
- Generate the certificate request in PKCS #10 or CMC format, and submit it to the CA.
- Download the CA response.
- Combine the local key pair and the CA response, and install the full certificate on the client.
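As an aside, the first two steps can also be sketched with the managed CertificateRequest class (in System.Security.Cryptography.X509Certificates, available from .NET Framework 4.7.2 and .NET Core 2.0) instead of the CertEnroll COM component that the rest of this post uses. This is only a rough sketch under that assumption. The class and method names (ManagedCsrSketch, CreateBase64Request) are made up for illustration, and the sketch throws the private key away at the end, which a real client must not do since the key is needed later to install the CA response.

```csharp
using System;
using System.Security.Cryptography;
using System.Security.Cryptography.X509Certificates;

public static class ManagedCsrSketch
{
    // Generates a key pair locally and returns a base64 encoded PKCS #10
    // request. Only the request is meant to be sent to the CA; the private
    // key stays in this process.
    public static string CreateBase64Request(string subject)
    {
        // Step 1: generate the key pair (2048 bit RSA, like the sample later in this post).
        using (RSA rsa = RSA.Create(2048))
        {
            var request = new CertificateRequest(
                subject, rsa, HashAlgorithmName.SHA256, RSASignaturePadding.Pkcs1);

            // Ask for a certificate usable for digital signatures and client authentication.
            request.CertificateExtensions.Add(
                new X509KeyUsageExtension(X509KeyUsageFlags.DigitalSignature, false));
            request.CertificateExtensions.Add(
                new X509EnhancedKeyUsageExtension(
                    new OidCollection { new Oid("1.3.6.1.5.5.7.3.2") }, false));

            // Step 2: encode the PKCS #10 request (DER bytes) and convert it to base64.
            byte[] der = request.CreateSigningRequest();
            return Convert.ToBase64String(der);
        }
    }
}
```

Calling ManagedCsrSketch.CreateBase64Request("CN=shaunxu.me") returns the base64 text of the request. Submitting it to the CA and installing the response are not covered by this sketch.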
In Windows Server 2008 R2, AD CS introduced two new components named CES and CEP, which stand for Certificate Enrollment Service and Certificate Enrollment Policy. Clients can communicate with the CA through these web services. But in prior versions we have to use two COM components: CertCli and CertEnroll.
The CertCli component is responsible for connecting to the CA server to submit certificate requests and certificate renewal requests, and to look up the request IDs that the CA server holds. When connecting from another machine, CertCli uses DCOM to invoke the CA functionality. This means CertCli cannot be used from outside the domain or across a firewall.
The CertEnroll component is responsible for generating the PKCS messages, and for installing and exporting certificates. It doesn’t need to communicate with the CA directly.
Since in .NET we can wrap COM components and use them from managed code, we should be able to communicate with the CA by using them.
Generate Certificate Request Message to Standalone CA
There are two types of CA: enterprise CA and standalone CA. There are many differences between them but, to keep it simple, the main one for us is that a standalone CA cannot use certificate templates. In this post I will first demonstrate how to request a certificate from a standalone CA.
Requesting a certificate takes only two steps: first generate the request message, and then send the message to the CA. To generate a valid certificate request message we need to use the CertEnroll COM component, and to send the request we need to use the CertCli COM component. So let’s create a new console application and add these two COM components to its references.

To make sure the code runs successfully, it’s recommended to execute the sample code on the CA server, or at least on a server in the same domain as the CA server. I will explain more about this later.
There is a lot of information we need to specify or provide in order to build the request message. The first thing is to select a valid cryptographic service provider (CSP). There are many CSPs built into Windows. We can choose one of them, or we can just let the operating system retrieve all valid CSPs for us to use.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using CERTENROLLLib;
using CERTCLIENTLib;

namespace ShaunXu.ADCSviaCSharp
{
    class Program
    {
        private string CreateCertRequestMessage()
        {
            // Let the operating system list all available CSPs.
            var objCSPs = new CCspInformations();
            objCSPs.AddAvailableCsps();
            // The rest of the request message is built step by step below.
        }

        static void Main(string[] args)
        {
        }
    }
}
Then we will create the key pair of the certificate. In this step we need to specify the information below:
- Length: The length of the private key. Normally the key length should NOT be less than 1024 for security reasons.
- Key Spec: Defines how this key pair and the certificate will be used, for example for digital signature or key exchange.
- Key Usage: The allowed usages of the key, which should be consistent with the Key Spec we defined.
- Machine Context: Specifies whether the certificate will be used for the current user or for the machine.
- Export Policy: Specifies whether the private key can be exported from this machine or not.
- CSP Information: The valid CSPs for this key pair.
Once we have defined all the information listed above, we can invoke CX509PrivateKey.Create to let the operating system generate a key pair for us. It will be stored on the machine in a “magic” folder.
var objPrivateKey = new CX509PrivateKey();
objPrivateKey.Length = 2048;
objPrivateKey.KeySpec = X509KeySpec.XCN_AT_SIGNATURE;
objPrivateKey.KeyUsage = X509PrivateKeyUsageFlags.XCN_NCRYPT_ALLOW_ALL_USAGES;
objPrivateKey.MachineContext = false;
objPrivateKey.ExportPolicy = X509PrivateKeyExportFlags.XCN_NCRYPT_ALLOW_EXPORT_FLAG;
objPrivateKey.CspInformations = objCSPs;
objPrivateKey.Create();
The next step is to initialize the PKCS #10 object from the private key we have just created. We need to specify whether the certificate should be used for the current user or for the machine, which must match the value of Machine Context that we defined in the previous step. Since we will send the request to a standalone CA we will not specify a template here.
var objPkcs10 = new CX509CertificateRequestPkcs10();
objPkcs10.InitializeFromPrivateKey(
    X509CertificateEnrollmentContext.ContextUser,
    objPrivateKey,
    string.Empty);
Next, we add some extension information to the certificate. I will not go deep into these extensions. Just one thing: every extension in a certificate is identified by an Object Identifier (OID). So if we want to add an extension to the certificate we need to specify its OID rather than its name. For example, in the code below I added the “Client Authentication” enhanced key usage extension to the certificate by specifying its OID “1.3.6.1.5.5.7.3.2”.
var objExtensionKeyUsage = new CX509ExtensionKeyUsage();
objExtensionKeyUsage.InitializeEncode(
    CERTENROLLLib.X509KeyUsageFlags.XCN_CERT_DIGITAL_SIGNATURE_KEY_USAGE |
    CERTENROLLLib.X509KeyUsageFlags.XCN_CERT_NON_REPUDIATION_KEY_USAGE |
    CERTENROLLLib.X509KeyUsageFlags.XCN_CERT_KEY_ENCIPHERMENT_KEY_USAGE |
    CERTENROLLLib.X509KeyUsageFlags.XCN_CERT_DATA_ENCIPHERMENT_KEY_USAGE);
objPkcs10.X509Extensions.Add((CX509Extension)objExtensionKeyUsage);

var objObjectId = new CObjectId();
var objObjectIds = new CObjectIds();
var objX509ExtensionEnhancedKeyUsage = new CX509ExtensionEnhancedKeyUsage();
objObjectId.InitializeFromValue("1.3.6.1.5.5.7.3.2");
objObjectIds.Add(objObjectId);
objX509ExtensionEnhancedKeyUsage.InitializeEncode(objObjectIds);
objPkcs10.X509Extensions.Add((CX509Extension)objX509ExtensionEnhancedKeyUsage);
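Since raw OID strings are easy to mistype, it can help to keep the common enhanced key usage OIDs in one place. The small table below is just a convenience sketch. The class name EnhancedKeyUsageOids and the display names used as dictionary keys are my own choice, while the OID values are the standard ones defined for X.509 extended key usage.

```csharp
using System;
using System.Collections.Generic;

// A small lookup of well-known enhanced key usage OIDs.
public static class EnhancedKeyUsageOids
{
    public const string ServerAuthentication = "1.3.6.1.5.5.7.3.1";
    public const string ClientAuthentication = "1.3.6.1.5.5.7.3.2";
    public const string CodeSigning = "1.3.6.1.5.5.7.3.3";
    public const string SecureEmail = "1.3.6.1.5.5.7.3.4";

    // Display name to OID, so that calling code does not have to
    // repeat the dotted strings.
    public static readonly IReadOnlyDictionary<string, string> ByName =
        new Dictionary<string, string>
        {
            { "Server Authentication", ServerAuthentication },
            { "Client Authentication", ClientAuthentication },
            { "Code Signing", CodeSigning },
            { "Secure Email", SecureEmail }
        };
}
```

With this in place, the call in the code above could read objObjectId.InitializeFromValue(EnhancedKeyUsageOids.ClientAuthentication).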
Next, we will specify the subject of the certificate. As I mentioned earlier, a certificate can represent anything, so the subject carries the information about what is being identified by this certificate. The subject can contain the following fields:
- CN: Common Name
- C: Country (Must be 2 letter.)
- S: State
- L: Locality
- O: Organization
- OU: Organization Unit
- E: Email
We can define one or more fields when requesting the certificate, and they are combined in a format like this.
[Field_Name_1] = [Field_Value_1], [Field_Name_2] = [Field_Value_2], [Field_Name_3] = [Field_Value_3]
For example this is a valid subject with the CN, C, S, L, O and OU defined.
CN = UIX, OU = NAS, O = IGT, L = Reno, S = Nevada, C = US
To specify the subject in C# we need to provide the fields in the same format and set the result on the PKCS #10 object.
1: var objDN = new CX500DistinguishedName();
2: var subjectName = "CN = shaunxu.me, OU = ADCS, O = Blog, L = Beijng, S = Beijing, C = CN";
3: objDN.Encode(subjectName, X500NameFlags.XCN_CERT_NAME_STR_NONE);
4: objPkcs10.Subject = objDN;
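If the subject values come from user input, it may be safer to build the string from separate fields instead of concatenating it by hand. The following is a minimal sketch; SubjectNameBuilder is a hypothetical helper of mine, and quoting values that contain separators with double quotes is my assumption about what the X.500 string parser accepts, so please verify it against your own CA.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: builds "CN = ..., OU = ..." from (field, value) pairs.
public static class SubjectNameBuilder
{
    public static string Build(IEnumerable<KeyValuePair<string, string>> fields)
    {
        var parts = new List<string>();
        foreach (var field in fields)
        {
            // Skip fields that were left empty.
            if (string.IsNullOrWhiteSpace(field.Value))
            {
                continue;
            }
            // The country field must be a 2-letter code.
            if (field.Key == "C" && field.Value.Length != 2)
            {
                throw new ArgumentException("Country (C) must be a 2-letter code.");
            }
            parts.Add(field.Key + " = " + Quote(field.Value));
        }
        return string.Join(", ", parts);
    }

    // Assumption: values containing separators are wrapped in double quotes,
    // and embedded double quotes are doubled.
    private static string Quote(string value)
    {
        if (value.IndexOfAny(new[] { ',', '=', '+', '"', ';' }) < 0)
        {
            return value;
        }
        return "\"" + value.Replace("\"", "\"\"") + "\"";
    }
}
```

The returned string can then be passed to objDN.Encode in the same way as the hard-coded subject above.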
Finally we initialize the CertEnroll COM object by passing the PKCS #10 in and invoke its CreateRequest method to generate the certificate request in base64 format.
    var objEnroll = new CX509Enrollment();
    objEnroll.InitializeFromRequest(objPkcs10);
    var strRequest = objEnroll.CreateRequest(EncodingType.XCN_CRYPT_STRING_BASE64);
So the full code for certificate request generation would be like this.
    private string CreateCertRequestMessage()
    {
        var objCSPs = new CCspInformations();
        objCSPs.AddAvailableCsps();

        var objPrivateKey = new CX509PrivateKey();
        objPrivateKey.Length = 2048;
        objPrivateKey.KeySpec = X509KeySpec.XCN_AT_SIGNATURE;
        objPrivateKey.KeyUsage = X509PrivateKeyUsageFlags.XCN_NCRYPT_ALLOW_ALL_USAGES;
        objPrivateKey.MachineContext = false;
        objPrivateKey.ExportPolicy = X509PrivateKeyExportFlags.XCN_NCRYPT_ALLOW_EXPORT_FLAG;
        objPrivateKey.CspInformations = objCSPs;
        objPrivateKey.Create();

        var objPkcs10 = new CX509CertificateRequestPkcs10();
        objPkcs10.InitializeFromPrivateKey(
            X509CertificateEnrollmentContext.ContextUser,
            objPrivateKey,
            string.Empty);

        var objExtensionKeyUsage = new CX509ExtensionKeyUsage();
        objExtensionKeyUsage.InitializeEncode(
            CERTENROLLLib.X509KeyUsageFlags.XCN_CERT_DIGITAL_SIGNATURE_KEY_USAGE |
            CERTENROLLLib.X509KeyUsageFlags.XCN_CERT_NON_REPUDIATION_KEY_USAGE |
            CERTENROLLLib.X509KeyUsageFlags.XCN_CERT_KEY_ENCIPHERMENT_KEY_USAGE |
            CERTENROLLLib.X509KeyUsageFlags.XCN_CERT_DATA_ENCIPHERMENT_KEY_USAGE);
        objPkcs10.X509Extensions.Add((CX509Extension)objExtensionKeyUsage);

        var objObjectId = new CObjectId();
        var objObjectIds = new CObjectIds();
        var objX509ExtensionEnhancedKeyUsage = new CX509ExtensionEnhancedKeyUsage();
        objObjectId.InitializeFromValue("1.3.6.1.5.5.7.3.2");
        objObjectIds.Add(objObjectId);
        objX509ExtensionEnhancedKeyUsage.InitializeEncode(objObjectIds);
        objPkcs10.X509Extensions.Add((CX509Extension)objX509ExtensionEnhancedKeyUsage);

        var objDN = new CX500DistinguishedName();
        var subjectName = "CN = shaunxu.me, OU = ADCS, O = Blog, L = Beijng, S = Beijing, C = CN";
        objDN.Encode(subjectName, X500NameFlags.XCN_CERT_NAME_STR_NONE);
        objPkcs10.Subject = objDN;

        var objEnroll = new CX509Enrollment();
        objEnroll.InitializeFromRequest(objPkcs10);
        var strRequest = objEnroll.CreateRequest(EncodingType.XCN_CRYPT_STRING_BASE64);
        return strRequest;
    }
Send Certificate Request to CA
Sending the certificate request message we have just generated to a CA is easy. In fact, we could save the message into a text file, copy it to the CA server and request the certificate through the CA management portal. But if we are going to use C#, we need the CertCli COM library to send the message and to check the result by retrieving the disposition status and the request ID.
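For the manual route, the base64 message can be written to a text file. Below is a minimal sketch under a few assumptions: RequestFile is a hypothetical helper of mine, and the BEGIN/END NEW CERTIFICATE REQUEST lines are the header and footer that Windows tools commonly put around a PKCS #10 request. Please check what your CA portal expects before relying on it.

```csharp
using System;
using System.IO;
using System.Text;

// Hypothetical helper: wraps a base64 request in header/footer lines
// and saves it as a text file that can be copied to the CA server.
public static class RequestFile
{
    private const string Header = "-----BEGIN NEW CERTIFICATE REQUEST-----";
    private const string Footer = "-----END NEW CERTIFICATE REQUEST-----";

    public static string ToPem(string base64Request)
    {
        // Drop any line breaks or spaces the encoder may have inserted.
        var compact = new StringBuilder();
        foreach (var ch in base64Request)
        {
            if (!char.IsWhiteSpace(ch))
            {
                compact.Append(ch);
            }
        }

        // Re-wrap the payload at 64 characters per line.
        var pem = new StringBuilder();
        pem.Append(Header).Append('\n');
        for (int i = 0; i < compact.Length; i += 64)
        {
            int length = Math.Min(64, compact.Length - i);
            pem.Append(compact.ToString(i, length)).Append('\n');
        }
        pem.Append(Footer);
        return pem.ToString();
    }

    public static void Save(string path, string base64Request)
    {
        File.WriteAllText(path, ToPem(base64Request));
    }
}
```

For example, RequestFile.Save("request.req", CreateCertRequestMessage()) would produce a file that can be pasted into the portal; the file name is just an example.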
First we create a CertCli request object (CCertRequest) and invoke its Submit method, passing the certificate request message and the CA address. The CA address should be in one of these formats:
- [CA_SERVER_IP]\[CA_NAME]
- [CA_SERVER_NAME]\[CA_NAME]
The CA name can be found by logging on to the CA server and navigating to the Active Directory Certificate Services node in the Server Manager window. Right click the CA node and select Properties.

The code would be like this.
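    private static int SendCertificateRequest(string message)
    {
        // CR_IN_BASE64 and CR_IN_FORMATANY are constants defined in the full listing.
        var objCertRequest = new CCertRequest();
        var iDisposition = objCertRequest.Submit(
            CR_IN_BASE64 | CR_IN_FORMATANY,
            message,
            string.Empty,
            @"192.168.56.101\pal-CPAL-CA");
        // The disposition check and the return statement follow in the next snippet.
    }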
The return value of the Submit method indicates the disposition of the certificate request. Normally it is one of the following:
- 0x03: Issued. This means the certificate has been issued by the CA, so we can download it and install it on the local machine.
- 0x05: Under submission. This means the request is pending and the certificate administrator needs to issue it manually.
- Any other value: the request failed for some reason. We can use the CCertRequest.GetDispositionMessage method to retrieve the failure reason.
    switch (iDisposition)
    {
        case CR_DISP_ISSUED:
            Console.WriteLine("The certificate had been issued.");
            break;
        case CR_DISP_UNDER_SUBMISSION:
            Console.WriteLine("The certificate is still pending.");
            break;
        default:
            Console.WriteLine("The submission failed: " + objCertRequest.GetDispositionMessage());
            Console.WriteLine("Last status: " + objCertRequest.GetLastStatus().ToString());
            break;
    }
    return objCertRequest.GetRequestId();
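The disposition handling can also be pulled into a small helper that does not depend on COM, which makes it easier to reuse for both submitting and retrieving. This is only a sketch: the Disposition class and its method names are mine, and the numeric values are the CR_DISP_* codes as I understand them from the CertCli documentation.

```csharp
// Hypothetical helper: names for the CR_DISP_* disposition codes
// returned by CCertRequest.Submit and CCertRequest.RetrievePending.
public static class Disposition
{
    public const int Incomplete = 0x0;
    public const int Error = 0x1;
    public const int Denied = 0x2;
    public const int Issued = 0x3;
    public const int IssuedOutOfBand = 0x4;
    public const int UnderSubmission = 0x5;
    public const int Revoked = 0x6;

    // Returns a short human-readable description of a disposition code.
    public static string Describe(int disposition)
    {
        switch (disposition)
        {
            case Incomplete: return "Request did not complete";
            case Error: return "Request failed";
            case Denied: return "Request denied";
            case Issued: return "Certificate issued";
            case IssuedOutOfBand: return "Certificate issued separately";
            case UnderSubmission: return "Request pending";
            case Revoked: return "Certificate revoked";
            default: return "Unknown disposition " + disposition;
        }
    }

    // Only an issued certificate can be downloaded and installed.
    public static bool CanDownload(int disposition)
    {
        return disposition == Issued;
    }
}
```

In the switch above, the default branch could then print Disposition.Describe(iDisposition) next to the message returned by GetDispositionMessage.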
Download and Install Certificate
Once the certificate request has been sent, it is processed by the CA’s request handling module. By default, on a standalone CA all certificate requests stay in pending status and wait for the administrator to issue them manually. The administrator should go to the CA portal, select the Pending Requests node, right click the item and click Issue. (The administrator can click Deny to reject the request.)

Then, in the Issued Certificates node, we can see the issued certificate. The certificate cannot be downloaded and installed on the machine that requested it until it is in this status.

Back to the source code to download and install the full certificate. First of all, we use the CCertRequest.RetrievePending method to check the status of the certificate request we sent. If it has been issued, we download the response, which is in PKCS #7 format, to the local machine by using the CCertRequest.GetCertificate method.
    private static void DownloadAndInstallCert(int requestId)
    {
        var objCertRequest = new CCertRequest();
        var iDisposition = objCertRequest.RetrievePending(requestId, @"192.168.56.101\pal-CPAL-CA");

        if (iDisposition == CR_DISP_ISSUED)
        {
            var cert = objCertRequest.GetCertificate(CR_OUT_BASE64 | CR_OUT_CHAIN);
            // 'cert' is installed by the code in the next snippet, inside this if block.
        }
    }
Then we initialize the CertEnroll object for the current user context and install the response we have just retrieved.
    var objEnroll = new CX509Enrollment();
    objEnroll.Initialize(X509CertificateEnrollmentContext.ContextUser);
    objEnroll.InstallResponse(
        InstallResponseRestrictionFlags.AllowUntrustedRoot,
        cert,
        EncodingType.XCN_CRYPT_STRING_BASE64,
        null);
    Console.WriteLine("The certificate had been installed successfully.");
After the certificate has been downloaded and installed, we can check that it is in the current user certificate store. And from now on, since the certificate is in the store, we can use the X509Store and X509Certificate2 classes to export it and view its attributes such as subject, thumbprint, etc.
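As a quick check, the certificates in the current user store can be listed with the standard .NET classes. This is just a minimal sketch; StoreViewer is a hypothetical class name and the output format is my own choice.

```csharp
using System;
using System.Security.Cryptography.X509Certificates;

// Hypothetical helper: prints subject, thumbprint and validity period
// of every certificate in the current user's personal store.
public static class StoreViewer
{
    public static string Describe(X509Certificate2 cert)
    {
        return string.Format(
            "{0} | {1} | {2:yyyy-MM-dd HH:mm:ss} - {3:yyyy-MM-dd HH:mm:ss}",
            cert.Subject, cert.Thumbprint, cert.NotBefore, cert.NotAfter);
    }

    // Returns the number of certificates found in the store.
    public static int PrintCurrentUserCertificates()
    {
        var store = new X509Store(StoreName.My, StoreLocation.CurrentUser);
        store.Open(OpenFlags.ReadOnly);
        try
        {
            // Certificates returns a snapshot, so read it once.
            var certificates = store.Certificates;
            foreach (var cert in certificates)
            {
                Console.WriteLine(Describe(cert));
            }
            return certificates.Count;
        }
        finally
        {
            store.Close();
        }
    }
}
```

To export the public part of a certificate, cert.Export(X509ContentType.Cert) returns the DER bytes, which can be written to a .cer file.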

The full code is listed below.
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using CERTENROLLLib;
    using CERTCLIENTLib;

    namespace ShaunXu.ADCSviaCSharp
    {
        class Program
        {
            private static string CreateCertRequestMessage()
            {
                var objCSPs = new CCspInformations();
                objCSPs.AddAvailableCsps();

                var objPrivateKey = new CX509PrivateKey();
                objPrivateKey.Length = 2048;
                objPrivateKey.KeySpec = X509KeySpec.XCN_AT_SIGNATURE;
                objPrivateKey.KeyUsage = X509PrivateKeyUsageFlags.XCN_NCRYPT_ALLOW_ALL_USAGES;
                objPrivateKey.MachineContext = false;
                objPrivateKey.ExportPolicy = X509PrivateKeyExportFlags.XCN_NCRYPT_ALLOW_EXPORT_FLAG;
                objPrivateKey.CspInformations = objCSPs;
                objPrivateKey.Create();

                var objPkcs10 = new CX509CertificateRequestPkcs10();
                objPkcs10.InitializeFromPrivateKey(
                    X509CertificateEnrollmentContext.ContextUser,
                    objPrivateKey,
                    string.Empty);

                var objExtensionKeyUsage = new CX509ExtensionKeyUsage();
                objExtensionKeyUsage.InitializeEncode(
                    CERTENROLLLib.X509KeyUsageFlags.XCN_CERT_DIGITAL_SIGNATURE_KEY_USAGE |
                    CERTENROLLLib.X509KeyUsageFlags.XCN_CERT_NON_REPUDIATION_KEY_USAGE |
                    CERTENROLLLib.X509KeyUsageFlags.XCN_CERT_KEY_ENCIPHERMENT_KEY_USAGE |
                    CERTENROLLLib.X509KeyUsageFlags.XCN_CERT_DATA_ENCIPHERMENT_KEY_USAGE);
                objPkcs10.X509Extensions.Add((CX509Extension)objExtensionKeyUsage);

                var objObjectId = new CObjectId();
                var objObjectIds = new CObjectIds();
                var objX509ExtensionEnhancedKeyUsage = new CX509ExtensionEnhancedKeyUsage();
                objObjectId.InitializeFromValue("1.3.6.1.5.5.7.3.2");
                objObjectIds.Add(objObjectId);
                objX509ExtensionEnhancedKeyUsage.InitializeEncode(objObjectIds);
                objPkcs10.X509Extensions.Add((CX509Extension)objX509ExtensionEnhancedKeyUsage);

                var objDN = new CX500DistinguishedName();
                var subjectName = "CN = shaunxu.me, OU = ADCS, O = Blog, L = Beijng, S = Beijing, C = CN";
                objDN.Encode(subjectName, X500NameFlags.XCN_CERT_NAME_STR_NONE);
                objPkcs10.Subject = objDN;

                var objEnroll = new CX509Enrollment();
                objEnroll.InitializeFromRequest(objPkcs10);
                var strRequest = objEnroll.CreateRequest(EncodingType.XCN_CRYPT_STRING_BASE64);
                return strRequest;
            }

            private const int CC_DEFAULTCONFIG = 0;
            private const int CC_UIPICKCONFIG = 0x1;
            private const int CR_IN_BASE64 = 0x1;
            private const int CR_IN_FORMATANY = 0;
            private const int CR_IN_PKCS10 = 0x100;
            private const int CR_DISP_ISSUED = 0x3;
            private const int CR_DISP_UNDER_SUBMISSION = 0x5;
            private const int CR_OUT_BASE64 = 0x1;
            private const int CR_OUT_CHAIN = 0x100;

            private static int SendCertificateRequest(string message)
            {
                var objCertRequest = new CCertRequest();
                var iDisposition = objCertRequest.Submit(
                    CR_IN_BASE64 | CR_IN_FORMATANY,
                    message,
                    string.Empty,
                    @"192.168.56.101\pal-CPAL-CA");

                switch (iDisposition)
                {
                    case CR_DISP_ISSUED:
                        Console.WriteLine("The certificate had been issued.");
                        break;
                    case CR_DISP_UNDER_SUBMISSION:
                        Console.WriteLine("The certificate is still pending.");
                        break;
                    default:
                        Console.WriteLine("The submission failed: " + objCertRequest.GetDispositionMessage());
                        Console.WriteLine("Last status: " + objCertRequest.GetLastStatus().ToString());
                        break;
                }
                return objCertRequest.GetRequestId();
            }

            private static void DownloadAndInstallCert(int requestId)
            {
                var objCertRequest = new CCertRequest();
                var iDisposition = objCertRequest.RetrievePending(requestId, @"192.168.56.101\pal-CPAL-CA");

                if (iDisposition == CR_DISP_ISSUED)
                {
                    var cert = objCertRequest.GetCertificate(CR_OUT_BASE64 | CR_OUT_CHAIN);
                    var objEnroll = new CX509Enrollment();
                    objEnroll.Initialize(X509CertificateEnrollmentContext.ContextUser);
                    objEnroll.InstallResponse(
                        InstallResponseRestrictionFlags.AllowUntrustedRoot,
                        cert,
                        EncodingType.XCN_CRYPT_STRING_BASE64,
                        null);
                    Console.WriteLine("The certificate had been installed successfully.");
                }
            }

            static void Main(string[] args)
            {
                Console.WriteLine("Request a new certificate? (y|n)");
                if (Console.ReadLine() == "y")
                {
                    var request = CreateCertRequestMessage();
                    var id = SendCertificateRequest(request);
                    Console.WriteLine("Request ID: " + id.ToString());
                }

                Console.WriteLine("Download & install certificate? (y|n)");
                if (Console.ReadLine() == "y")
                {
                    Console.WriteLine("Request ID?");
                    var id = int.Parse(Console.ReadLine());
                    DownloadAndInstallCert(id);
                }
            }
        }
    }
Enterprise CA and Certificate Template
In the sections above we discussed how to use C# to communicate with AD CS running as a standalone CA. A standalone CA has some limitations compared with an enterprise CA. The biggest difference is that a standalone CA cannot use certificate templates.
When we implemented the certificate request function, we specified everything the certificate needs in the request. On a standalone CA there is no way to define what kind of information can be set by the request or what policy the request should follow. There is also no way to define a validity period for individual kinds of certificates. All certificates issued by a standalone CA have the same validity period, which is defined in the registry on the CA server. But if we are using an enterprise CA, we can define various rules and a validity period in each template.
The enterprise templates are stored in Active Directory, which means every CA in the AD can select which templates it uses. This is a good way to control the permission to issue certificates.
You can verify whether a CA is an enterprise CA by opening the CA portal. If there is a sub folder named Certificate Templates, it is an enterprise CA.

Let’s create a template and specify some rules. Click the Certificate Templates node under the Active Directory Certificate Services node and select the template named Computer. Right click the template and click Duplicate Template.
You cannot create a brand new template. Instead, you have to duplicate an existing one.

Select the Windows Server 2008 Enterprise version in the pop-up window and specify a template name. In the template properties window we can see that it is possible to define the validity period. All certificates requested and issued from this template will have the same validity period.

There are many other items we can define as well. For example, we can have “Client Authentication” in the application policies extension, which we specified in code in the previous sample.

We can also define what kind of value can be set as the subject of the certificates. This gives the certificate administrator a good way to control the content of the certificates. For example, if the certificate represents a domain user, the subject must be a valid AD user. But in this case we will let the request supply the subject, which means there is no control on the CA side.

Once we have created the template, we also need to issue it on this CA server, which means requests for it can be received and issued by this CA. Right click the Certificate Templates node, select New > Certificate Template to Issue, and select the template we have just created.

Now the template is ready for use. Next we will change our code to send the request to the enterprise CA with the template specified.
Send Request to Enterprise CA with Template
Sending the certificate request to an enterprise CA is very similar to what we did with the standalone CA. Previously, when we initialized the PKCS #10 request, we passed an empty string as the template name parameter. For the enterprise CA we specify which template we are going to use.
    var objPkcs10 = new CX509CertificateRequestPkcs10();
    objPkcs10.InitializeFromPrivateKey(
        X509CertificateEnrollmentContext.ContextUser,
        objPrivateKey,
        "ShaunXu");
It is not allowed to request a certificate from an enterprise CA without specifying a template, so we have to set one. On the other hand, a standalone CA does not accept a request that refers to a template.
It seems that we are finished, but if we just execute the code it throws an exception saying that the file exists when adding some extensions.

The exception message can be a little confusing. In fact, it is thrown because we defined something that has already been defined in the certificate template. If we dig into the source code, we can see that the exception occurs when we add the key usage extension.

And if we go back to the CA server and open the template we are using, we can see that the key usage is defined in the template. This means we should not specify it again in the code, that is, in the certificate request.

Hence we need to comment out the code that adds the key usage. We also need to comment out the enhanced key usage part, since it is defined in the template too. Because we let the request supply the subject name, we can still specify the subject information in the request. The method for generating the request message now looks like this.
    private static string CreateCertRequestMessage()
    {
        var objCSPs = new CCspInformations();
        objCSPs.AddAvailableCsps();

        var objPrivateKey = new CX509PrivateKey();
        objPrivateKey.Length = 2048;
        objPrivateKey.KeySpec = X509KeySpec.XCN_AT_SIGNATURE;
        objPrivateKey.KeyUsage = X509PrivateKeyUsageFlags.XCN_NCRYPT_ALLOW_ALL_USAGES;
        objPrivateKey.MachineContext = false;
        objPrivateKey.ExportPolicy = X509PrivateKeyExportFlags.XCN_NCRYPT_ALLOW_EXPORT_FLAG;
        objPrivateKey.CspInformations = objCSPs;
        objPrivateKey.Create();

        var objPkcs10 = new CX509CertificateRequestPkcs10();
        objPkcs10.InitializeFromPrivateKey(
            X509CertificateEnrollmentContext.ContextUser,
            objPrivateKey,
            "ShaunXu");

        //var objExtensionKeyUsage = new CX509ExtensionKeyUsage();
        //objExtensionKeyUsage.InitializeEncode(
        //    CERTENROLLLib.X509KeyUsageFlags.XCN_CERT_DIGITAL_SIGNATURE_KEY_USAGE |
        //    CERTENROLLLib.X509KeyUsageFlags.XCN_CERT_NON_REPUDIATION_KEY_USAGE |
        //    CERTENROLLLib.X509KeyUsageFlags.XCN_CERT_KEY_ENCIPHERMENT_KEY_USAGE |
        //    CERTENROLLLib.X509KeyUsageFlags.XCN_CERT_DATA_ENCIPHERMENT_KEY_USAGE);
        //objPkcs10.X509Extensions.Add((CX509Extension)objExtensionKeyUsage);

        //var objObjectId = new CObjectId();
        //var objObjectIds = new CObjectIds();
        //var objX509ExtensionEnhancedKeyUsage = new CX509ExtensionEnhancedKeyUsage();
        //objObjectId.InitializeFromValue("1.3.6.1.5.5.7.3.2");
        //objObjectIds.Add(objObjectId);
        //objX509ExtensionEnhancedKeyUsage.InitializeEncode(objObjectIds);
        //objPkcs10.X509Extensions.Add((CX509Extension)objX509ExtensionEnhancedKeyUsage);

        var objDN = new CX500DistinguishedName();
        var subjectName = "CN = entprise.shaunxu.me, OU = ADCS, O = Blog, L = Beijng, S = Beijing, C = CN";
        objDN.Encode(subjectName, X500NameFlags.XCN_CERT_NAME_STR_NONE);
        objPkcs10.Subject = objDN;

        var objEnroll = new CX509Enrollment();
        objEnroll.InitializeFromRequest(objPkcs10);
        var strRequest = objEnroll.CreateRequest(EncodingType.XCN_CRYPT_STRING_BASE64);
        return strRequest;
    }
It works well this time and the certificate is installed successfully. If we open the certificate store in MMC, we can see the new certificate with its template displayed.

Certificate Renewal
A certificate has a limited validity period. For example, the certificate we have just requested is valid from 2012-01-13 07:21:48 to 20124-01-12 07:21:48.

When the certificate is about to expire, the operating system can send the renewal request to the CA server automatically. But we can also request the renewal from our code.
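If we renew from code, we first need to decide when a certificate counts as “about to expire”. The sketch below shows one simple way to do it; RenewalPolicy is a hypothetical helper and the length of the renewal window is an assumption you would choose yourself.

```csharp
using System;

// Hypothetical helper: decides whether a certificate is close enough
// to its expiry date to be renewed.
public static class RenewalPolicy
{
    // True when 'now' is within 'window' of the expiry date, or already past it.
    public static bool ShouldRenew(DateTime notAfter, DateTime now, TimeSpan window)
    {
        return now >= notAfter - window;
    }
}
```

With an X509Certificate2 at hand this could be called as RenewalPolicy.ShouldRenew(certificate.NotAfter, DateTime.Now, TimeSpan.FromDays(30)), since NotAfter is expressed in local time.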
To send a certificate renewal message we must have the certificate installed in the certificate store, either the local machine store or the current user store. The first step is to find it by using X509Store.Certificates.Find.
    private static int Renew()
    {
        X509Certificate2 certificate = null;
        X509Store store = new X509Store(StoreLocation.CurrentUser);
        try
        {
            store.Open(OpenFlags.ReadWrite);
            // Find returns an empty collection when nothing matches, so [0] throws in that case.
            certificate = store.Certificates.Find(X509FindType.FindByThumbprint, "c1555218deed2c6dbe5101178617ef7628388a85", false)[0];
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.ToString());
        }
        finally
        {
            store.Close();
        }
        // The renewal request is built from 'certificate' and submitted in the next snippets.
    }
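The lookup can be made a bit more forgiving. Thumbprints copied from the certificate dialog often contain spaces or invisible characters, and indexing an empty result with [0] throws. Here is a minimal sketch with a hypothetical CertificateFinder helper that normalizes the thumbprint and returns null when nothing matches.

```csharp
using System;
using System.Linq;
using System.Security.Cryptography.X509Certificates;

// Hypothetical helper: looks up a certificate in the current user's
// personal store by thumbprint and returns null when it is not found.
public static class CertificateFinder
{
    // Keeps only the hex digits and upper-cases them.
    public static string NormalizeThumbprint(string raw)
    {
        return new string(raw.Where(Uri.IsHexDigit).ToArray()).ToUpperInvariant();
    }

    public static X509Certificate2 FindByThumbprint(string thumbprint)
    {
        var store = new X509Store(StoreName.My, StoreLocation.CurrentUser);
        store.Open(OpenFlags.ReadOnly);
        try
        {
            var matches = store.Certificates.Find(
                X509FindType.FindByThumbprint,
                NormalizeThumbprint(thumbprint),
                false);
            return matches.Count > 0 ? matches[0] : null;
        }
        finally
        {
            store.Close();
        }
    }
}
```

The caller can then check for null and stop before it tries to build the renewal request.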
The certificate renewal request is in PKCS #7 format. So in the next step we create a PKCS #7 object and initialize it from the certificate we have just found in the certificate store. When initializing, we specify in the parameters that this is a renewal request. We also specify that the new certificate will inherit the validity period and the key pair from the existing one.
    var objPkcs7 = new CX509CertificateRequestPkcs7();
    objPkcs7.InitializeFromCertificate(
        X509CertificateEnrollmentContext.ContextUser,
        true,
        Convert.ToBase64String(certificate.RawData),
        EncodingType.XCN_CRYPT_STRING_BASE64,
        // Combine the options with |; a bitwise AND of these two values would evaluate to 0 (InheritDefault).
        X509RequestInheritOptions.InheritPrivateKey | X509RequestInheritOptions.InheritValidityPeriodFlag);
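When several inherit options are meant to apply together, they have to be combined with a bitwise OR; a bitwise AND of two different option values yields 0. The sketch below illustrates this with a stand-in enum. InheritOptions is hypothetical, and its numeric values only mirror what I believe X509RequestInheritOptions uses, so treat them as assumptions.

```csharp
using System;

// Hypothetical stand-in for X509RequestInheritOptions; the numeric
// values are assumptions used only to illustrate the flag arithmetic.
[Flags]
public enum InheritOptions
{
    Default = 0x0,
    PrivateKey = 0x3,
    ValidityPeriod = 0x400
}

public static class FlagDemo
{
    // OR keeps both options: 0x3 | 0x400 = 0x403.
    public static InheritOptions Both()
    {
        return InheritOptions.PrivateKey | InheritOptions.ValidityPeriod;
    }

    // AND keeps only the shared bits: 0x3 & 0x400 = 0x0, i.e. Default.
    public static InheritOptions Intersection()
    {
        return InheritOptions.PrivateKey & InheritOptions.ValidityPeriod;
    }
}
```

So a call that passes the AND of the two options silently asks for the default behaviour instead of inheriting the key and the validity period.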
The following code is very similar to what we did to send the new request before: use CertEnroll to generate the request message, send it out with CertCli, and check the disposition status.
    var objEnroll = new CX509Enrollment();
    objEnroll.InitializeFromRequest(objPkcs7);
    var message = objEnroll.CreateRequest(EncodingType.XCN_CRYPT_STRING_BASE64);

    var objCertRequest = new CCertRequest();
    var iDisposition = objCertRequest.Submit(
        CR_IN_BASE64 | CR_IN_FORMATANY,
        message,
        string.Empty,
        @"192.168.56.101\pal-CPAL-CA");

    switch (iDisposition)
    {
        case CR_DISP_ISSUED:
            Console.WriteLine("The certificate had been issued.");
            break;
        case CR_DISP_UNDER_SUBMISSION:
            Console.WriteLine("The certificate is still pending.");
            break;
        default:
            Console.WriteLine("The submission failed: " + objCertRequest.GetDispositionMessage());
            Console.WriteLine("Last status: " + objCertRequest.GetLastStatus().ToString());
            break;
    }
    return objCertRequest.GetRequestId();
Once the request has been sent to the CA, it will be issued automatically or manually by the administrator, depending on the request handling policy. Downloading and installing the renewed certificate works the same as before, so we can simply reuse the method that downloads a new certificate.
The full code looks like this. Just note that I hard-coded my certificate thumbprint in the code.
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using CERTENROLLLib;
    using CERTCLIENTLib;
    using System.Security.Cryptography.X509Certificates;

    namespace ShaunXu.ADCSviaCSharp
    {
        class Program
        {
            private static string CreateCertRequestMessage()
            {
                var objCSPs = new CCspInformations();
                objCSPs.AddAvailableCsps();

                var objPrivateKey = new CX509PrivateKey();
                objPrivateKey.Length = 2048;
                objPrivateKey.KeySpec = X509KeySpec.XCN_AT_SIGNATURE;
                objPrivateKey.KeyUsage = X509PrivateKeyUsageFlags.XCN_NCRYPT_ALLOW_ALL_USAGES;
                objPrivateKey.MachineContext = false;
                objPrivateKey.ExportPolicy = X509PrivateKeyExportFlags.XCN_NCRYPT_ALLOW_EXPORT_FLAG;
                objPrivateKey.CspInformations = objCSPs;
                objPrivateKey.Create();

                var objPkcs10 = new CX509CertificateRequestPkcs10();
                objPkcs10.InitializeFromPrivateKey(
                    X509CertificateEnrollmentContext.ContextUser,
                    objPrivateKey,
                    "ShaunXu");

                //var objExtensionKeyUsage = new CX509ExtensionKeyUsage();
                //objExtensionKeyUsage.InitializeEncode(
                //    CERTENROLLLib.X509KeyUsageFlags.XCN_CERT_DIGITAL_SIGNATURE_KEY_USAGE |
                //    CERTENROLLLib.X509KeyUsageFlags.XCN_CERT_NON_REPUDIATION_KEY_USAGE |
                //    CERTENROLLLib.X509KeyUsageFlags.XCN_CERT_KEY_ENCIPHERMENT_KEY_USAGE |
                //    CERTENROLLLib.X509KeyUsageFlags.XCN_CERT_DATA_ENCIPHERMENT_KEY_USAGE);
                //objPkcs10.X509Extensions.Add((CX509Extension)objExtensionKeyUsage);

                //var objObjectId = new CObjectId();
                //var objObjectIds = new CObjectIds();
                //var objX509ExtensionEnhancedKeyUsage = new CX509ExtensionEnhancedKeyUsage();
                //objObjectId.InitializeFromValue("1.3.6.1.5.5.7.3.2");
                //objObjectIds.Add(objObjectId);
                //objX509ExtensionEnhancedKeyUsage.InitializeEncode(objObjectIds);
                //objPkcs10.X509Extensions.Add((CX509Extension)objX509ExtensionEnhancedKeyUsage);

                var objDN = new CX500DistinguishedName();
                var subjectName = "CN = entprise.shaunxu.me, OU = ADCS, O = Blog, L = Beijng, S = Beijing, C = CN";
                objDN.Encode(subjectName, X500NameFlags.XCN_CERT_NAME_STR_NONE);
                objPkcs10.Subject = objDN;

                var objEnroll = new CX509Enrollment();
                objEnroll.InitializeFromRequest(objPkcs10);
                var strRequest = objEnroll.CreateRequest(EncodingType.XCN_CRYPT_STRING_BASE64);
                return strRequest;
            }

            private const int CC_DEFAULTCONFIG = 0;
            private const int CC_UIPICKCONFIG = 0x1;
            private const int CR_IN_BASE64 = 0x1;
            private const int CR_IN_FORMATANY = 0;
            private const int CR_IN_PKCS10 = 0x100;
            private const int CR_DISP_ISSUED = 0x3;
            private const int CR_DISP_UNDER_SUBMISSION = 0x5;
            private const int CR_OUT_BASE64 = 0x1;
            private const int CR_OUT_CHAIN = 0x100;

            private static int SendCertificateRequest(string message)
            {
                var objCertRequest = new CCertRequest();
                var iDisposition = objCertRequest.Submit(
                    CR_IN_BASE64 | CR_IN_FORMATANY,
                    message,
                    string.Empty,
                    @"192.168.56.101\pal-CPAL-CA");

                switch (iDisposition)
                {
                    case CR_DISP_ISSUED:
                        Console.WriteLine("The certificate had been issued.");
                        break;
                    case CR_DISP_UNDER_SUBMISSION:
                        Console.WriteLine("The certificate is still pending.");
                        break;
                    default:
                        Console.WriteLine("The submission failed: " + objCertRequest.GetDispositionMessage());
                        Console.WriteLine("Last status: " + objCertRequest.GetLastStatus().ToString());
                        break;
                }
                return objCertRequest.GetRequestId();
            }

            private static void DownloadAndInstallCert(int requestId)
            {
                var objCertRequest = new CCertRequest();
                var iDisposition = objCertRequest.RetrievePending(requestId, @"192.168.56.101\pal-CPAL-CA");

                if (iDisposition == CR_DISP_ISSUED)
                {
                    var cert = objCertRequest.GetCertificate(CR_OUT_BASE64 | CR_OUT_CHAIN);
                    var objEnroll = new CX509Enrollment();
                    objEnroll.Initialize(X509CertificateEnrollmentContext.ContextUser);
                    objEnroll.InstallResponse(
                        InstallResponseRestrictionFlags.AllowUntrustedRoot,
                        cert,
                        EncodingType.XCN_CRYPT_STRING_BASE64,
                        null);
                    Console.WriteLine("The certificate had been installed successfully.");
                }
            }

            private static int Renew()
            {
                X509Certificate2 certificate = null;
                X509Store store = new X509Store(StoreLocation.CurrentUser);
                try
                {
                    store.Open(OpenFlags.ReadWrite);
                    certificate = store.Certificates.Find(X509FindType.FindByThumbprint, "c1555218deed2c6dbe5101178617ef7628388a85", false)[0];
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.ToString());
                }
                finally
                {
                    store.Close();
                }

                if (certificate == null)
                {
                    // The certificate was not found, so there is nothing to renew.
                    return -1;
                }

                var objPkcs7 = new CX509CertificateRequestPkcs7();
                objPkcs7.InitializeFromCertificate(
                    X509CertificateEnrollmentContext.ContextUser,
                    true,
                    Convert.ToBase64String(certificate.RawData),
                    EncodingType.XCN_CRYPT_STRING_BASE64,
                    // Combine the options with |; a bitwise AND of these two values would evaluate to 0 (InheritDefault).
                    X509RequestInheritOptions.InheritPrivateKey | X509RequestInheritOptions.InheritValidityPeriodFlag);

                var objEnroll = new CX509Enrollment();
                objEnroll.InitializeFromRequest(objPkcs7);
                var message = objEnroll.CreateRequest(EncodingType.XCN_CRYPT_STRING_BASE64);

                var objCertRequest = new CCertRequest();
                var iDisposition = objCertRequest.Submit(
                    CR_IN_BASE64 | CR_IN_FORMATANY,
                    message,
                    string.Empty,
                    @"192.168.56.101\pal-CPAL-CA");

                switch (iDisposition)
                {
                    case CR_DISP_ISSUED:
                        Console.WriteLine("The certificate had been issued.");
                        break;
                    case CR_DISP_UNDER_SUBMISSION:
                        Console.WriteLine("The certificate is still pending.");
                        break;
                    default:
                        Console.WriteLine("The submission failed: " + objCertRequest.GetDispositionMessage());
                        Console.WriteLine("Last status: " + objCertRequest.GetLastStatus().ToString());
                        break;
                }
                return objCertRequest.GetRequestId();
            }

            static void Main(string[] args)
            {
                Console.WriteLine("Request a new certificate? (y|n)");
                if (Console.ReadLine() == "y")
                {
                    var request = CreateCertRequestMessage();
                    var id = SendCertificateRequest(request);
                    Console.WriteLine("Request ID: " + id.ToString());
                }

                Console.WriteLine("Download & install certificate? (y|n)");
                if (Console.ReadLine() == "y")
                {
                    Console.WriteLine("Request ID?");
                    var id = int.Parse(Console.ReadLine());
                    DownloadAndInstallCert(id);
                }

                Console.WriteLine("Renew an existing certificate? (y|n)");
                if (Console.ReadLine() == "y")
                {
                    var id = Renew();
                    Console.WriteLine("Request ID: " + id.ToString());
                }

                Console.WriteLine("Download & install renewed certificate? (y|n)");
                if (Console.ReadLine() == "y")
                {
                    Console.WriteLine("Request ID?");
                    var id = int.Parse(Console.ReadLine());
                    DownloadAndInstallCert(id);
                }
            }
        }
    }
After executing the code and going back to the certificate store, we can see the certificate renewed by the CA; its validity period has changed from 2012-01-13 07:21:48 - 20124-01-12 7:21:48 to 2012-01-13 07:52:36 - 20124-01-12 07:52:36. The old certificate has been archived by the operating system automatically.

Request Certificate Out of Domain
As I mentioned before, the sample code in this post should be executed on the CA server itself, or at least on a server in the CA’s domain. This is a limitation of using CertCli and CertEnroll to communicate with the CA.
First of all, the CA is integrated with Active Directory, and by default only authenticated users can request certificates. Secondly, if we are using an enterprise CA, all templates are stored in the AD. When a client requests a new certificate with a template specified, it tries to retrieve the template information from the AD.

So before Windows Server 2008 R2 it was very difficult to communicate with the CA from a client outside the domain. This is why, as I mentioned at the beginning of this post, I’m working on a WCF web service that acts as a proxy to let clients (PC, laptop and mobile) outside the domain connect and request certificates.

But Windows Server 2008 R2 introduced a new AD CS component called Certificate Enrollment Web Services. Basically it includes two web services that wrap the LDAP calls and the DCOM calls, so that clients can communicate with them through HTTPS with WS-Trust.

Summary
When I began to work on this task, I found very little information on the internet about how to communicate with a CA from C#, or from code at all. I think this is because the CA is an IT Pro topic that focuses on installation and configuration, and IT Pros don’t care much about code. Communicating from C# is a development topic, but developers don’t care much about the CA since it is part of the IT infrastructure. So this topic sits in the middle of the two worlds, IT and development.
But I think that when we move enterprise applications to the cloud, most of them will need to migrate from the existing AD integrated architecture to a certificate-based architecture, which means replacing the existing security, authentication and identification parts.
In this post I introduced a little background knowledge about CA, especially AD CS. I demonstrated how to request, install and renew a certificate against a standalone CA and an enterprise CA from C# through COM. I also mentioned a little about the new Certificate Enrollment Web Services. Thanks to the great posts and articles I referred to recently, this and this.
There are still some topics I didn’t cover, for example the online revocation list, SCEP, OCSP, etc. We need them if we want to build a full, robust, online certificate solution.
Hope this helps,
Shaun
All documents and related graphics, codes are provided "AS IS" without warranty of any kind.
Copyright © Shaun Ziyan Xu. This work is licensed under the Creative Commons License.
SQL Azure Federation was publicly launched several weeks ago, and it is one of the most exciting features I have been looking forward to. This might be my first post about SQL Azure Federation, and hopefully not the last one.
Some Background
SQL Azure Federation was first mentioned around 2009. Microsoft said that there would be a feature in SQL Azure that allows users to split one database into many based on some sort of rules, while from the client’s perspective users can interact with their data as if it were in one database. This feature was called SQL Azure Federation. At TechED China 2010 I asked Zach, a technical evangelist at Microsoft focusing on the Windows Azure platform, when SQL Azure Federation would be available, even as a CTP for MVPs or partners. But at that moment Zach said he had not had a chance to take a look at it yet. I remember it was December 2010.
But the good news came in June 2011: Microsoft opened the nomination for the PE Program of SQL Azure Federation. I was very lucky to be approved to use this feature and provide some early feedback to Microsoft. During the PE program I attended several online meetings and had the chance to play with it in some of my projects. Cihan Biyikoglu, the program manager in the SQL Azure team, and his group gave me a lot of information and suggestions on how SQL Azure Federation works and how best to use it. During the PE program, Microsoft said that SQL Azure Federation would be available at the end of 2011.
On December 12th, 2011, SQL Azure Federation was launched with the SQL Azure Q4 2011 Service Release, along with some other cool features, which you can read about here.
What’s (Data) Federation
Federation is not a new concept or technology in computer science, but it can mean different things in different contexts, such as WS-Federation, Active Directory Federation Services, etc. In this and the following blog posts about SQL Azure Federation, the word “Federation” refers to Data Federation.
Federation, also known as data sharding or partitioning, is a database design principle whereby rows of a database table are held separately. It is also known as horizontal partitioning. Assume that we have a database with a table named Product which contains the product records, and that there are 10,000 records in it now. After a while the number of records rises to 10,000,000. If they are all in the same database it might cause performance problems. In this case, there are two solutions we can choose from. One is to upgrade the hardware of the database server, for example with more CPU cores, more memory and higher network bandwidth. This approach is called Scale Up. But it has a limit: we cannot add as many cores and as much memory as we want.

The other is to split the database across multiple databases and servers. Let’s say we divide the database and the Product table into 10 databases (servers), so that each database holds only 1,000,000 records in its Product table. We split the data volume across multiple servers, as well as the network load, CPU and memory usage. Furthermore, since we can have as many servers as we need, there is no such limit on extending our system with this approach. This is called Scale Out.

SQL Azure Federation implements this approach: it helps us split one database into many, which are called federation members, to increase performance.
Horizontal Partitioning and Vertical Partitioning
Let’s dig into the tables in the databases to be federated. If a table is so big that it introduces performance issues, like the Product table I mentioned previously with its 10,000,000 records, we need to split it across the databases. There are two approaches here as well: horizontal partitioning and vertical partitioning.
Horizontal partitioning is like using a knife to cut the table horizontally, which means splitting the table by rows. After the partitioning:
- All the tables have exactly the same schema.
- A database command written for one table works the same on any of the others.
- Each record represents the full information.
- All the information of a record can be retrieved within one query.
- Fan-out queries, like SUM, AVG, COUNT, etc., need to touch all databases (partitioned tables) and aggregate the results.
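
The fan-out point can be sketched in a few lines of Python. This is only an illustration, not SQL Azure code: the two lists stand in for two partitioned Product tables, and the names `partitions`, `fan_out_count` and `fan_out_average` are made up for this example.

```python
# Two in-memory "databases", each holding rows with the same Product schema.
# In a real system each list would be a table in a separate database.
partitions = [
    [{"id": 1, "price": 10.0}, {"id": 2, "price": 20.0}, {"id": 3, "price": 30.0}],
    [{"id": 4, "price": 40.0}, {"id": 5, "price": 50.0}],
]


def fan_out_count():
    """COUNT has to visit every partition and add up the partial counts."""
    return sum(len(rows) for rows in partitions)


def fan_out_average():
    """AVG collects a (sum, count) pair per partition, then combines them."""
    partials = [(sum(row["price"] for row in rows), len(rows)) for rows in partitions]
    total = sum(partial_sum for partial_sum, _ in partials)
    count = sum(partial_count for _, partial_count in partials)
    return total / count
```

Note that the partial results are sums and counts, not per-partition averages. Averaging the averages would give a wrong answer whenever the partitions differ in size.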

Vertical partitioning means splitting the table by columns. After the partitioning:
- Each table has a different schema.
- Queries differ from table to table, and some data redundancy may be introduced.
- Each record in a table represents only partial information.
- It is easy to implement COD (Cost Oriented Design) by moving columns to cheaper storage, for example moving the binary columns into Windows Azure Blob Storage.
- Multiple queries are needed to retrieve the full information.
- A fan-out query can normally be finished within one query.
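
Here is a minimal sketch of the same idea for vertical partitioning, again in plain Python rather than against a real database. The column names (`name`, `price`, `photo`) and the helper functions are assumptions made for this illustration.

```python
# One wide Product row; the photo column stands in for a large binary value.
product = {"id": 1, "name": "Bike", "price": 99.0, "photo": b"\x89PNG..."}


def split_vertically(row):
    """Split a row by columns. The key column is repeated in both parts,
    which is the kind of redundancy vertical partitioning introduces."""
    core = {"id": row["id"], "name": row["name"], "price": row["price"]}
    blob = {"id": row["id"], "photo": row["photo"]}
    return core, blob


def load_full(product_id, core_table, blob_table):
    """Reassembling the full record takes one lookup per vertical partition."""
    full = dict(core_table[product_id])
    full.update(blob_table[product_id])
    return full


core_row, blob_row = split_vertically(product)
core_table = {core_row["id"]: core_row}  # could stay in the database
blob_table = {blob_row["id"]: blob_row}  # could move to cheaper storage
```

Calling `load_full(1, core_table, blob_table)` rebuilds the original row, at the price of two lookups instead of one.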

SQL Azure Federation utilizes horizontal partitioning to split the tables across multiple databases. But it’s not as simple as I described above. When using horizontal partitioning, we first need to define the rule for how to divide the tables. In the picture above, the table is divided by ID: all records with an ID less than 4 go into one database, and the others (larger than 3) go into another.
But if we have related tables, for example a UserOrder table which has a UserID as well, we need to split that table by the same rule, to make sure that all records in those tables that refer to the same UserID are in the same partition. This makes JOIN queries quick and easy.
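
To make this concrete, here is a small Python sketch under a few assumptions: a User table next to the UserOrder table, the "ID less than 4" rule from the example, and two dictionaries playing the role of the two databases. None of the names come from SQL Azure itself.

```python
def partition_for(user_id):
    """The rule from the example: IDs below 4 go to database 0, the rest to database 1."""
    return 0 if user_id < 4 else 1


users = [{"UserID": 2, "Name": "A"}, {"UserID": 5, "Name": "B"}]
orders = [
    {"OrderID": 100, "UserID": 2},
    {"OrderID": 101, "UserID": 5},
    {"OrderID": 102, "UserID": 2},
]

# Each "database" holds its own slice of both tables.
databases = [{"User": [], "UserOrder": []}, {"User": [], "UserOrder": []}]
for user in users:
    databases[partition_for(user["UserID"])]["User"].append(user)
for order in orders:
    # The same rule, applied to the same key, keeps orders next to their user.
    databases[partition_for(order["UserID"])]["UserOrder"].append(order)


def orders_with_user(db):
    """A JOIN between User and UserOrder that never leaves one database."""
    names = {user["UserID"]: user["Name"] for user in db["User"]}
    return [(order["OrderID"], names[order["UserID"]]) for order in db["UserOrder"]]
```

Because both tables are routed by `partition_for` on UserID, `orders_with_user` can always find the matching user locally. If UserOrder were split by OrderID instead, the lookup in `names` could fail and the JOIN would have to cross databases.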

There are also some tables that are not related to the ID in this example, for instance countries, cities, etc. When we partition the database, these tables should not be split; they need to be copied to each database.

The last thing is that there might be some tables that represent global information, like system settings, metadata and schema data. They should be neither split nor copied into the databases. So they remain in the original database, which we can call the root database.

Now we have a full implementation of horizontal partitioning. We have the rule for how the data should be split. We ensure that all related records are stored in the same database node and that the lookup tables are copied across them. We also have the root database with the tables that store the global information. I can tell you that this is what SQL Azure Federation does for us, automatically and safely.
SQL Azure Federation Concepts
SQL Azure Federation introduces some new concepts around data partitioning: federation, federation distribution, federation member, root database, federation column, federated table, reference table, center table and atomic unit.
A federation is the rule for how to partition our data. There can be more than one federation in a system, but only one federation can apply to a particular table. For example, if we have a table with UserID and ProductID columns, we can apply a federation that splits the table by UserID, or one that splits it by ProductID, but we cannot apply both at the same time.
A federation includes:
- Federation Name: The name of the federation, which is used when altering it or connecting to it.
- Federation Distribution Name: The name of the key used to split the tables in this federation. For example, if we want to split the tables based on the UserID then we can name the federation distribution “userId”, “uid” or whatever.
- Federation Distribution Data Type: The data type of the federation distribution key. Currently SQL Azure Federation only supports int, bigint, uniqueidentifier and varbinary(n).
- Distribution Type: How SQL Azure Federation will split the data. There are many ways to split data, such as mod or consistent hashing, but currently SQL Azure Federation only supports “range”.
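
A “range” distribution can be pictured as a sorted list of split points. The sketch below is plain Python, not the SQL Azure implementation; the split values and the names `split_points`, `member_for` and `split_at` are hypothetical, and it assumes each range includes its lower bound and excludes its upper bound.

```python
from bisect import bisect_right, insort

# Hypothetical split points for a federation distributed on an int key.
# Member 0 holds keys below 100, member 1 holds 100..199, member 2 holds 200 and above.
split_points = [100, 200]


def member_for(key):
    """Return the index of the member whose range contains the key."""
    return bisect_right(split_points, key)


def split_at(value):
    """Add a new split point, turning one member's range into two."""
    if value in split_points:
        raise ValueError("already split at this value")
    insort(split_points, value)
```

With the initial split points, `member_for(99)` is 0 and `member_for(100)` is 1. After `split_at(150)` there are four ranges, and key 150 belongs to the new member at index 2.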
After we split the database into many based on the federation we specified, the smaller databases are called Federation Members. The original database, which may contain some metadata tables, is called the Root Database or Federation Root. The tables that are split across the federation members are Federated Tables.

The tables that represent lookup data can be copied to each federation member automatically by SQL Azure Federation. These are called Reference Tables. The remaining tables in the federation root are Center Tables.
As we discussed above, with horizontal partitioning the tables related to the same split key (here, the federation distribution name) should be put into the same databases (federation members). For example, if the records in the Product table with UserID = 3 are moved into federation member 1, then all records with UserID = 3 in the ProductDetails table should be moved into federation member 1 as well. In SQL Azure Federation, the group of records related to the same federation distribution value is called an Atomic Unit. This is very important and useful when using SQL Azure Federation, as I will explain in the coming few posts.
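
As a rough illustration of an atomic unit, the Python sketch below groups rows from the Product and ProductDetails tables by their UserID. The row contents and the `atomic_units` helper are invented for this example; only the table names and the UserID key come from the text.

```python
from collections import defaultdict

product = [
    {"UserID": 3, "ProductID": 1},
    {"UserID": 7, "ProductID": 2},
]
product_details = [
    {"UserID": 3, "Detail": "red"},
    {"UserID": 3, "Detail": "large"},
    {"UserID": 7, "Detail": "blue"},
]


def atomic_units(tables):
    """Group the rows of all federated tables by their distribution value.
    Each group is one atomic unit and must live in a single member."""
    units = defaultdict(lambda: defaultdict(list))
    for table_name, rows in tables.items():
        for row in rows:
            units[row["UserID"]][table_name].append(row)
    return units


units = atomic_units({"Product": product, "ProductDetails": product_details})
```

Here `units[3]` contains one Product row and two ProductDetails rows. Those three rows form one atomic unit, so they always end up in the same member.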

Summary
In this post I covered some basic information about data federation. I talked about the approaches we can use to partition our data. I also described the difference between horizontal partitioning and vertical partitioning, and the goal of horizontal partitioning. Finally I talked about the concepts of SQL Azure Federation.
In the next post I will demonstrate how to create a database and use SQL Azure Federation to split my original database into members.
Hope this helps,
Shaun