Shaun Xu

The Sheep-Pen of the Shaun


News

logo

Shaun, the author of this blog is a semi-geek, clumsy developer, passionate speaker and incapable architect with about 10 years’ experience in .NET and JavaScript. He hopes to prove that software development is art rather than manufacturing. He's into cloud computing platform and technologies (Windows Azure, Amazon and Aliyun) and right now, Shaun is being attracted by JavaScript (Angular.js and Node.js) and he likes it.

Shaun is working at Worktile Inc. as the chief architect for overall design and develop worktile, a web-based collaboration and task management tool, and lesschat, a real-time communication aggregation tool.

MVP

My Stats

  • Posts - 122
  • Comments - 579
  • Trackbacks - 0

Tag Cloud


Recent Comments


Recent Posts


Article Categories


Archives


Post Categories


Image Galleries


.NET


 

We are almost done everything about the WCF transport extension over the message bus, which makes our services can be scaled out by introducing more instances over machines and servers. We had finished the structure of our transport extension and implemented the request reply mode in the 2nd post, the datagram and duplex mode in the 4th and 5th post. As I have said at the end of the 5th post, currently we can use our transport extension. But there still something left. Although are not that major as the three MEPs implementation, sometimes they are very useful. In this post I will cover the first of them: session.

 

Session in ASP.NET and WCF

If you have the experience developing the ASP.NET application you should be very familiar with the session. Session is a very important feature included in the ASP.NET runtime, and all framework built on top could use it, such as the ASP.NET WebForm, ASP.NET MVC and ASP.NET Dynamic Data.

In ASP.NET, when the client sent the first request to the server, a session ID will be generated on the server side. And by default, there will be a dictionary item created in the web application process (w3wp.exe) to associate with this session ID. Then the session ID will be replied to the client and by default, it will be stored in the cookie. After that every request the client send to the server will pass the session ID within the request body, so that the server would be able to know which this request was come from.

In this way, ASP.NET runtime makes it possible on the server side to save some information per client. When we develop an ASP.NET website we can save and retrieve this information through the SessionState. In ASP.NET runtime it find our session in the dictionary in the server memory by the session ID it has.

What I mentioned above are based on the situation that specified to use the InProc session state mode. If we use the session server or SQL Server, the dictionary and the session object may not be located in the server’s memory.

image

So we can figure out that the ASP.NET session is mainly focus on:

  • ASP.NET session is always be initialized by the server.
  • ASP.NET session solves how to keep the relationship between the requests from the same client.
  • ASP.NET provides a general way to store the session data, but not forced.

Next, let’s have a look on the WCF session. WCF session is more general than ASP.NET session. In WCF, a session just means a connection of messages. It doesn’t care about how to pass the identity between the server and the client. It also doesn’t case about how to store the session at all. As the developer of transport extension we need to figure out how to pass the session ID through the transportation.

MSDN explained the WCF session as following:

  • They are explicitly initiated and terminated by the calling application.
  • Messages delivered during a session are processed in the order in which they are received.
  • Sessions correlate a group of messages into a conversation. The meaning of that correlation is an abstraction. For instance, one session-based channel may correlate messages based on a shared network connection while another session-based channel may correlate messages based on a shared tag in the message body. The features that can be derived from the session depend on the nature of the correlation.
  • There is no general data store associated with a WCF session.

In WCF, a session will be established when the client created the channel and will be terminated once the client channel was closed. Based on the service contract definition the client will ask the server if it needs a sessionful channel. If the transport support session it will create the session and have a session ID associate with all message until this channel is closed. Regarding how to store the shared data between the messages in a session, WCF doesn’t have any principal or guild. By default, WCF combines the session and the service instance lifecycle, so that the developer can utilize the service class local variants to store the intermediate data. But this is not mandatory. In the following sections you will see how we will do to make the session works under our scaling-out architecture.

Apply the Default WCF Session Behavior

By default WCF combines the service instance mode and session to make it easy to developer to use the session. There are three modes could be specified in the service contract:

  • Required: This service requires session.
  • Allow: This service allows the session to be establish, but not mandatory.
  • Not Allow: This service doesn’t allow session.

And there are three service instance context modes which takes the responsible for managing the service instance lifecycle:

  • PerCall: Each client request will make the service create a new instance and handle it.
  • PerSession: A service instance will be created once a session was establish. All requests in this session will be handled by this instance.
  • Single: The service instance will be created once the service was opened. All clients requests will be handled by this instance.

In MSDN there is a very good table that describes the relationship between the session, service instance mode and the default WCF session behavior.

In WCF it utilize a binding element to implement the default session behavior. This binding element takes the currently channel type and if it’s a sessionful channel, it will create some special channels on top of the underlying transport channels to handle the session. For example, if it found that we are going to use session of the datagram, then it will create the ReliableOutputSessionChannelOverRequest on top our own OutputChannel. And it will takes the responsible to create session, maintain the message order etc.

To use this build-in session binding element we can just add the ReliableSessionBindingElement into the binding element collection in our binding class. In MessageBusTransportBinding class I added a local variant which saves the ReliableSessionBindingElement, and with two constructors.

   1: public class MessageBusTransportBinding : Binding
   2: {
   3:     private readonly MessageEncodingBindingElement _messageElement;
   4:     private readonly MessageBusTransportBindingElement _transportElement;
   5:     private readonly ReliableSessionBindingElement _sessionElement;
   6:  
   7:     public MessageBusTransportBinding(IBus bus)
   8:         : this(bus, SessionfulMode.Distributed)
   9:     {
  10:     }
  11:  
  12:     public MessageBusTransportBinding(IBus bus, SessionfulMode sessionfulMode)
  13:         : base()
  14:     {
  15:         _messageElement = new TextMessageEncodingBindingElement();
  16:         _transportElement = new MessageBusTransportBindingElement(bus);
  17:         if (sessionfulMode == SessionfulMode.Standard)
  18:         {
  19:             _sessionElement = new ReliableSessionBindingElement();
  20:         }
  21:     }
  22:  
  23:     ... ...
  24:  
  25: }

And when getting the binding elements we will add it based on what we want.

   1: public override BindingElementCollection CreateBindingElements()
   2: {
   3:     var elements = new BindingElementCollection();
   4:     elements.Add(_messageElement);
   5:     if (_sessionElement != null)
   6:     {
   7:         elements.Add(_sessionElement);
   8:     }
   9:     // the transport binding element must be the last one
  10:     elements.Add(_transportElement);
  11:     return elements.Clone();
  12: }

And, that’s all. Simple? OK, let’s test our sessionful transport extension. Update our service contract and the implementation class. Here I defined the service behavior as required session and the service instance mode was per session.

   1: [ServiceContract(Namespace = "http://wcf.shaunxu.me/", SessionMode= SessionMode.Required)]
   2: public interface ISampleService
   3: {
   4:     [OperationContract(IsOneWay = true)]
   5:     void Add(int value);
   6:  
   7:     [OperationContract(IsOneWay = false)]
   8:     int GetResult();
   9: }
  10:  
  11: [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerSession)]
  12: public class SampleService : ISampleService
  13: {
  14:     private int _current;
  15:  
  16:     public void Add(int value)
  17:     {
  18:         _current += value;
  19:     }
  20:  
  21:     public int GetResult()
  22:     {
  23:         return _current;
  24:     }
  25: }

I added a helper method to establish a client proxy.

   1: static TChannel EstablishClientProxy<TChannel>(IBus bus, string address, SessionfulMode sessionfulMode)
   2: {
   3:     var binding = new MessageBusTransportBinding(bus, sessionfulMode);
   4:     var factory = new ChannelFactory<TChannel>(binding, address);
   5:     factory.Opened += (sender, e) =>
   6:     {
   7:         Console.WriteLine("Client connected to {0}", factory.Endpoint.ListenUri);
   8:     };
   9:     var proxy = factory.CreateChannel();
  10:     return proxy;
  11: }

Then we make our service hosted on a message bus and two client invoke it.

   1: static void Main(string[] args)
   2: {
   3:     var bus = new InProcMessageBus();
   4:     var address = "net.bus://localhost/sample";
   5:  
   6:     // establish the services
   7:     var host1 = EstablishServiceHost<ISampleService, SampleService>(bus, address, SessionfulMode.Standard);
   8:  
   9:     // establish the client
  10:     var client1 = EstablishClientProxy<ISampleService>(bus, address, SessionfulMode.Standard);
  11:     var client2 = EstablishClientProxy<ISampleService>(bus, address, SessionfulMode.Standard);
  12:     using (client1 as IDisposable)
  13:     using (client2 as IDisposable)
  14:     {
  15:         client1.Add(1);
  16:         client2.Add(4);
  17:         
  18:         client1.Add(3);
  19:         client2.Add(5);
  20:         
  21:         client1.Add(2);
  22:         client2.Add(6);
  23:  
  24:         var result1 = client1.GetResult();
  25:         var result2 = client2.GetResult();
  26:         Console.WriteLine("Client 1 Result: {0}", result1);
  27:         Console.WriteLine("Client 2 Result: {0}", result2);
  28:     }
  29:  
  30:     // close the service
  31:     host1.Close();
  32:  
  33:     Console.ReadKey();
  34: }

The execution result was like this. Since our service instance mode was per session, so all requests in the same session will use the same service instance. This is the reason we can use the service class local variant to store the intermediate data.

image

But if we added more service instance it will be failed when client invokes. This is because, under the default ReliableSessionBindingElement WCF need to track the request sequence and it’s not allowed that messages in one session was received by another service instance.

image

Hence in order to support the service scaling-out we must implement our own sessionful channels.

 

Message Bus with Session ID

Before we implement the sessionful channels, we need to send the current session ID between the service and client through the message bus. So need to amend the message structure.

   1: public class BusMessage
   2: {
   3:     public string MessageID { get; private set; }
   4:     public string SessionID { get; private set; }
   5:     public string From { get; private set; }
   6:     public string ReplyTo { get; private set; }
   7:     public string Content { get; private set; }
   8:  
   9:     public BusMessage(string messageId, string sessionId, string fromChannelId, string replyToChannelId, string content)
  10:     {
  11:         MessageID = messageId;
  12:         SessionID = sessionId;
  13:         From = fromChannelId;
  14:         ReplyTo = replyToChannelId;
  15:         Content = content;
  16:     }
  17: }

As well as the IBus interface which allows us to send the session ID.

   1: public interface IBus : IDisposable
   2: {
   3:     string SendRequest(string message, string sessionId, bool fromClient, string from, string to = null);
   4:  
   5:     void SendReply(string message, string sessionId, bool fromClient, string replyTo);
   6:  
   7:     BusMessage Receive(bool fromClient, string replyTo);
   8: }

And the underlying message entity and the in process message bus implementation.

   1: public class InProcMessageEntity
   2: {
   3:     public Guid ID { get; set; }
   4:     public string SessionID { get; set; }
   5:     public string Content { get; set; }
   6:     public bool FromClient { get; set; }
   7:     public string From { get; set; }
   8:     public string To { get; set; }
   9:  
  10:     public InProcMessageEntity()
  11:         : this(string.Empty, string.Empty, false, string.Empty, string.Empty)
  12:     {
  13:     }
  14:  
  15:     public InProcMessageEntity(string content, string sessionId, bool fromClient, string from, string to)
  16:     {
  17:         ID = Guid.NewGuid();
  18:         SessionID = sessionId;
  19:         Content = content;
  20:         FromClient = fromClient;
  21:         From = from;
  22:         To = to;
  23:     }
  24: }
   1: public class InProcMessageBus : IBus
   2: {
   3:     private readonly ConcurrentDictionary<Guid, InProcMessageEntity> _queue;
   4:     private readonly object _lock;
   5:  
   6:     public InProcMessageBus()
   7:     {
   8:         _queue = new ConcurrentDictionary<Guid, InProcMessageEntity>();
   9:         _lock = new object();
  10:     }
  11:  
  12:     public string SendRequest(string message, string sessionId, bool fromClient, string from, string to = null)
  13:     {
  14:         var entity = new InProcMessageEntity(message, sessionId, fromClient, from, to);
  15:         _queue.TryAdd(entity.ID, entity);
  16:         return entity.ID.ToString();
  17:     }
  18:  
  19:     public void SendReply(string message, string sessionId, bool fromClient, string replyTo)
  20:     {
  21:         var entity = new InProcMessageEntity(message, sessionId, fromClient, null, replyTo);
  22:         _queue.TryAdd(entity.ID, entity);
  23:     }
  24:  
  25:     public BusMessage Receive(bool fromClient, string replyTo)
  26:     {
  27:         InProcMessageEntity e = null;
  28:         while (true)
  29:         {
  30:             lock (_lock)
  31:             {
  32:                 var entity = _queue
  33:                     .Where(kvp => kvp.Value.FromClient == fromClient && (kvp.Value.To == replyTo || string.IsNullOrWhiteSpace(kvp.Value.To)))
  34:                     .FirstOrDefault();
  35:                 if (entity.Key != Guid.Empty && entity.Value != null)
  36:                 {
  37:                     _queue.TryRemove(entity.Key, out e);
  38:                 }
  39:             }
  40:             if (e == null)
  41:             {
  42:                 Thread.Sleep(100);
  43:             }
  44:             else
  45:             {
  46:                 return new BusMessage(e.ID.ToString(), e.SessionID, e.From, e.To, e.Content);
  47:             }
  48:         }
  49:     }
  50:  
  51:     public void Dispose()
  52:     {
  53:     }
  54: }

As we had changed the message bus interface, some existing code need to be changed. You can download the final code at end of this post.

 

Sessionful Channels: Request Reply Mode

In WCF if we want to implement our own sessionful channels we need to implement the channel classes for each MEP. Each session channel will have a property that returns the session object. And in WCF there is an interface to define the session which is ISession.

   1: namespace System.ServiceModel.Channels
   2: {
   3:     // Summary:
   4:     //     Defines the interface to establish a shared context among parties that exchange
   5:     //     messages by providing an ID for the communication session.
   6:     public interface ISession
   7:     {
   8:         // Summary:
   9:         //     Gets the ID that uniquely identifies the session.
  10:         //
  11:         // Returns:
  12:         //     The ID that uniquely identifies the session.
  13:         string Id { get; }
  14:     }
  15: }

As you can see the ISession interface only defines a property that returns the session ID. This means, as I said before, WCF doesn’t care about how the session data will be stored and how to establish the sessionful communication.

And regarding each MEP there will be three sub session interfaces in WCF: IOutputSession, IInputSession and IDuplexSession. First two interfaces will be used in the sessionful datagram and request reply mode, the IDuplexSession will be used in the sessionful duplex mode.

   1: // Summary:
   2: //     Defines the interface for the session implemented on the sending side of
   3: //     a one-way communication between messaging endpoints.
   4: public interface IOutputSession : ISession
   5: {
   6: }
   7:  
   8: // Summary:
   9: //     Defines the interface for the session implemented on the receiving side of
  10: //     a one-way communication between messaging endpoints.
  11: public interface IInputSession : ISession
  12: {
  13: }
  14:  
  15: // Summary:
  16: //     Defines the interface for the session implemented on each side of a bi-directional
  17: //     communication between messaging endpoints.
  18: public interface IDuplexSession : IInputSession, IOutputSession, ISession
  19: {
  20:     // Summary:
  21:     //     Begins an asynchronous operation to terminate the outbound session.
  22:     //
  23:     // Parameters:
  24:     //   callback:
  25:     //     The System.AsyncCallback delegate.
  26:     //
  27:     //   state:
  28:     //     An object that contains state information for this request.
  29:     //
  30:     // Returns:
  31:     //     The System.IAsyncResult that references the asynchronous outbound session
  32:     //     termination.
  33:     IAsyncResult BeginCloseOutputSession(AsyncCallback callback, object state);
  34:     //
  35:     // Summary:
  36:     //     Begins an asynchronous operation to terminate the outbound session with a
  37:     //     specified timeout within which the operation must complete.
  38:     //
  39:     // Parameters:
  40:     //   timeout:
  41:     //     The System.TimeSpan that specifies the interval of time within which the
  42:     //     operation must complete.
  43:     //
  44:     //   callback:
  45:     //     The System.AsyncCallback delegate.
  46:     //
  47:     //   state:
  48:     //     An object that contains state information for this request.
  49:     //
  50:     // Returns:
  51:     //     The System.IAsyncResult that references the asynchronous outbound session
  52:     //     termination.
  53:     IAsyncResult BeginCloseOutputSession(TimeSpan timeout, AsyncCallback callback, object state);
  54:     //
  55:     // Summary:
  56:     //     Terminates the outbound session that indicates that no more messages will
  57:     //     be sent from this endpoint on the channel associated with the session.
  58:     void CloseOutputSession();
  59:     //
  60:     // Summary:
  61:     //     Terminates the outbound session that indicates that no more messages will
  62:     //     be sent from this endpoint on the channel associated with the session within
  63:     //     a specified interval of time.
  64:     //
  65:     // Parameters:
  66:     //   timeout:
  67:     //     The System.TimeSpan that specifies the interval of time within which the
  68:     //     operation must complete.
  69:     void CloseOutputSession(TimeSpan timeout);
  70:     //
  71:     // Summary:
  72:     //     Completes an asynchronous operation to terminate the outbound session that
  73:     //     indicates that no more messages will be sent from this endpoint on the channel
  74:     //     associated with the session.
  75:     //
  76:     // Parameters:
  77:     //   result:
  78:     //     The System.IAsyncResult returned by a call to one of the Overload:System.ServiceModel.Channels.IDuplexSession.BeginCloseOutputSession
  79:     //     methods.
  80:     void EndCloseOutputSession(IAsyncResult result);
  81: }

So what we need to do is to implement these sessions, sessionful channels and the related channel factories and listeners.

To make our sample as simple as possible I just implemented the necessary members of these thress session interface, which is the property that returns the session ID.

   1: internal class MessageBusOutputSession : IOutputSession
   2: {
   3:     private string _id;
   4:  
   5:     public string Id
   6:     {
   7:         get
   8:         {
   9:             return _id;
  10:         }
  11:     }
  12:  
  13:     public MessageBusOutputSession(string id)
  14:     {
  15:         _id = id;
  16:     }
  17: }
  18:  
  19: internal class MessageBusInputSession : IInputSession
  20: {
  21:     private string _id;
  22:  
  23:     public string Id
  24:     {
  25:         get
  26:         {
  27:             return _id;
  28:         }
  29:     }
  30:  
  31:     public MessageBusInputSession(string id)
  32:     {
  33:         _id = id;
  34:     }
  35: }
  36:  
  37: internal class MessageBusDuplexSession : IDuplexSession
  38: {
  39:     private string _id;
  40:  
  41:     public MessageBusDuplexSession(string id)
  42:     {
  43:         _id = id;
  44:     }
  45:  
  46:     public IAsyncResult BeginCloseOutputSession(TimeSpan timeout, AsyncCallback callback, object state)
  47:     {
  48:         throw new NotImplementedException();
  49:     }
  50:  
  51:     public IAsyncResult BeginCloseOutputSession(AsyncCallback callback, object state)
  52:     {
  53:         throw new NotImplementedException();
  54:     }
  55:  
  56:     public void CloseOutputSession(TimeSpan timeout)
  57:     {
  58:         throw new NotImplementedException();
  59:     }
  60:  
  61:     public void CloseOutputSession()
  62:     {
  63:         throw new NotImplementedException();
  64:     }
  65:  
  66:     public void EndCloseOutputSession(IAsyncResult result)
  67:     {
  68:         throw new NotImplementedException();
  69:     }
  70:  
  71:     public string Id
  72:     {
  73:         get
  74:         {
  75:             return _id;
  76:         }
  77:     }
  78: }

And then, let’s implement the sessionful channels. The first one is the request reply mode, which are request session channel and reply session channel.

The request session channel will be inherited from the existing MessageBusRequestChannel class, so that we can leverage the existing message operations. And it should implement the IRequestSessionChannel interface. The IRequestSessionChannel  interface need a property of IOutputSession which we had implemented before. So the MessageBusRequestSessionChannel would be like this.

   1: public class MessageBusRequestSessionChannel : MessageBusRequestChannel, IRequestSessionChannel
   2: {
   3:     private IOutputSession _session;
   4:  
   5:     public IOutputSession Session
   6:     {
   7:         get
   8:         {
   9:             return _session;
  10:         }
  11:     }
  12:  
  13:     public MessageBusRequestSessionChannel(
  14:         BufferManager bufferManager, MessageEncoderFactory encoder, ChannelManagerBase parent, 
  15:         EndpointAddress remoteAddress, Uri via,
  16:         IBus bus)
  17:         : base(bufferManager, encoder, parent, remoteAddress, via, bus)
  18:     {
  19:         _session = new MessageBusOutputSession((new UniqueId()).ToString());
  20:     }
  21: }

In our implementation before the client send the request message it will attach the current session ID from its IOutputSession object. So in the base class before the request message was sent we need to provide a chance to let the session ID to be set. Use a virtual method should be a quick way.

In the base class we added a virtual method that can update the session ID, if needed. And then it will send the request message with this session ID.

   1: public Message Request(Message message, TimeSpan timeout)
   2: {
   3:     ThrowIfDisposedOrNotOpen();
   4:     lock (_aLock)
   5:     {
   6:         // unbox the message into string that will be sent into the bus
   7:         var content = GetStringFromWcfMessage(message,_remoteAddress);
   8:         // apply the session from the sub class if needed
   9:         var sessionId = string.Empty;
  10:         OnBeforeRequest(ref sessionId);
  11:         // send the message into bus
  12:         var busMsgId = _bus.SendRequest(content, sessionId, true, null);
  13:         // waiting for the reply message arrive from the bus
  14:         var replyMsg = _bus.Receive(false, busMsgId);
  15:         if (string.IsNullOrWhiteSpace(replyMsg.Content))
  16:         {
  17:             // this means this is a one way channel acknowledge from server
  18:             // we just return null and do nothing
  19:             return null;
  20:         }
  21:         else
  22:         {
  23:             // box the message from the bus message content and return back
  24:             var reply = GetWcfMessageFromString(replyMsg.Content);
  25:             return reply;
  26:         }
  27:     }
  28: }
  29:  
  30: protected virtual void OnBeforeRequest(ref string sessionId)
  31: {
  32: }

Then in the sessionful output channel we can override the virtual method and set the session ID.

   1: protected override void OnBeforeRequest(ref string sessionId)
   2: {
   3:     sessionId = _session.Id;
   4: }

Similarly, on the server side we create the MessageBusReplySessionChannel class which is based on the MessageBusReplyChannel, and implement the interface IReplySessionChannel. On the server side, it needs to retrieve the session ID after it received a message from the bus. So that in the base class we need a virtual method as well. The base class MessageBusReplyChannel would be changed like this.

   1: public MessageBusReplyChannel(
   2:     BufferManager bufferManager, MessageEncoderFactory encoder, ChannelManagerBase parent,
   3:     EndpointAddress localAddress,
   4:     IBus bus)
   5:     : base(bufferManager, encoder, parent)
   6: {
   7:     _localAddress = localAddress;
   8:     _bus = bus;
   9:     _aLock = new object();
  10:  
  11:     _tryReceiveRequestDelegate = (TimeSpan t, out RequestContext rc) =>
  12:     {
  13:         rc = null;
  14:         // receive the request message from the bus
  15:         var busMsg = _bus.Receive(true, null);
  16:         // box the wcf message
  17:         var message = GetWcfMessageFromString(busMsg.Content);
  18:         // initialize the request context and return
  19:         rc = new MessageBusRequestContext(message, this, _localAddress, _bus, busMsg.MessageID);
  20:         OnAfterTryReceiveRequest(busMsg);
  21:         return true;
  22:     };
  23: }
  24:  
  25: protected virtual void OnAfterTryReceiveRequest(BusMessage message)
  26: {
  27: }

And the sessionful channel MessageBusReplySessionChannel  would be like this.

   1: public class MessageBusReplySessionChannel : MessageBusReplyChannel, IReplySessionChannel
   2: {
   3:     private IInputSession _session;
   4:  
   5:     public IInputSession Session
   6:     {
   7:         get
   8:         {
   9:             return _session;
  10:         }
  11:     }
  12:  
  13:     public MessageBusReplySessionChannel(
  14:         BufferManager bufferManager, MessageEncoderFactory encoderFactory, ChannelManagerBase parent,
  15:         EndpointAddress localAddress,
  16:         IBus bus)
  17:         : base(bufferManager, encoderFactory, parent, localAddress, bus)
  18:     {
  19:     }
  20:  
  21:     protected override void OnAfterTryReceiveRequest(BusMessage message)
  22:     {
  23:         _session = new MessageBusInputSession(message.SessionID);
  24:     }
  25: }

The sessionful reply channel need the IInputSession, instead of the IOutputChannel that we are using in the request part.

Last one, implement the related channel factory and channel listener, and modify the transport binding element to let it return the sessionful channels.

   1: public class MessageBusRequestSessionChannelFactory : MessageBusChannelFactoryBase<IRequestSessionChannel>
   2: {
   3:     public MessageBusRequestSessionChannelFactory(MessageBusTransportBindingElement transportElement, BindingContext context)
   4:         : base(transportElement, context)
   5:     {
   6:     }
   7:  
   8:     protected override IRequestSessionChannel CreateChannel(
   9:         BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress remoteAddress,
  10:         MessageBusChannelFactoryBase<IRequestSessionChannel> parent, 
  11:         Uri via, 
  12:         IBus bus)
  13:     {
  14:         return new MessageBusRequestSessionChannel(bufferManager, encoder, parent, remoteAddress, via, bus);
  15:     }
  16: }
   1: public class MessageBusReplySessionChannelListener : MessageBusChannelListenerBase<IReplySessionChannel>
   2: {
   3:     public MessageBusReplySessionChannelListener(MessageBusTransportBindingElement transportElement, BindingContext context)
   4:         : base(transportElement, context)
   5:     {
   6:     }
   7:  
   8:     protected override IReplySessionChannel CreateChannel(
   9:         BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress localAddress,
  10:         MessageBusChannelListenerBase<IReplySessionChannel> parent,
  11:         IBus bus)
  12:     {
  13:         return new MessageBusReplySessionChannel(bufferManager, encoder, parent, localAddress, bus);
  14:     }
  15: }
   1: public override bool CanBuildChannelFactory<TChannel>(BindingContext context)
   2: {
   3:     return typeof(TChannel) == typeof(IRequestChannel) ||
   4:            typeof(TChannel) == typeof(IOutputChannel) ||
   5:            typeof(TChannel) == typeof(IDuplexChannel) ||
   6:            typeof(TChannel) == typeof(IRequestSessionChannel);
   7: }
   8:  
   9: public override bool CanBuildChannelListener<TChannel>(BindingContext context)
  10: {
  11:     return typeof(TChannel) == typeof(IReplyChannel) ||
  12:            typeof(TChannel) == typeof(IInputChannel) ||
  13:            typeof(TChannel) == typeof(IDuplexChannel) ||
  14:            typeof(TChannel) == typeof(IReplySessionChannel);
  15: }
  16:  
  17: public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context)
  18: {
  19:     if (context == null)
  20:     {
  21:         throw new ArgumentNullException("context");
  22:     }
  23:     if (!CanBuildChannelFactory<TChannel>(context))
  24:     {
  25:         throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel factory.", typeof(TChannel).Name));
  26:     }
  27:  
  28:     if (typeof(TChannel) == typeof(IRequestChannel))
  29:     {
  30:         return (IChannelFactory<TChannel>)(object)new MessageBusRequestChannelFactory(this, context);
  31:     }
  32:     else if (typeof(TChannel) == typeof(IOutputChannel))
  33:     {
  34:         return (IChannelFactory<TChannel>)(object)new MessageBusOutputChannelFactory(this, context);
  35:     }
  36:     else if (typeof(TChannel) == typeof(IDuplexChannel))
  37:     {
  38:         return (IChannelFactory<TChannel>)(object)new MessageBusDuplexChannelFactory(this, context);
  39:     }
  40:     else if (typeof(TChannel) == typeof(IRequestSessionChannel))
  41:     {
  42:         return (IChannelFactory<TChannel>)(object)new MessageBusRequestSessionChannelFactory(this, context);
  43:     }
  44:     else
  45:     {
  46:         throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel listener.", typeof(TChannel).Name));
  47:     }
  48:  
  49: }
  50:  
  51: public override IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context)
  52: {
  53:     if (context == null)
  54:     {
  55:         throw new ArgumentNullException("context");
  56:     }
  57:     if (!CanBuildChannelListener<TChannel>(context))
  58:     {
  59:         throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel listener.", typeof(TChannel).Name));
  60:     }
  61:  
  62:     if (typeof(TChannel) == typeof(IReplyChannel))
  63:     {
  64:         return (IChannelListener<TChannel>)(object)new MessageBusReplyChannelListener(this, context);
  65:     }
  66:     else if (typeof(TChannel) == typeof(IInputChannel))
  67:     {
  68:         return (IChannelListener<TChannel>)(object)new MessageBusInputChannelListener(this, context);
  69:     }
  70:     else if (typeof(TChannel) == typeof(IDuplexChannel))
  71:     {
  72:         return (IChannelListener<TChannel>)(object)new MessageBusDuplexChannelListener(this, context);
  73:     }
  74:     else if (typeof(TChannel) == typeof(IReplySessionChannel))
  75:     {
  76:         return (IChannelListener<TChannel>)(object)new MessageBusReplySessionChannelListener(this, context);
  77:     }
  78:     else
  79:     {
  80:         throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel listener.", typeof(TChannel).Name));
  81:     }
  82: }

Now we can test our sessionful request reply channel. Since currently our transport extension supports multiple service instances running over the message bus, so there’s no restrict that we must specify the instance context mode to PerSession or Single for sessionful channel. But as the requests may be processed by any service instances we cannot use the local variant of the service class to store the session state values. What we should do now is to retrieve the current session ID and get/set the intermediate value from the session store, maybe it’s a distributed cache.

image

The image above demonstrates how the session ID will be used when using our new sessionful request reply mode.

  • When client channel was opened, a session ID will be generated on the client side. Let’s say the session ID is ABCDE.
  • Client invoked the service method Add with the value that it wanted to add. The request will be sent to the service with the session ID (ABCDE) attached.
  • Service received this message and take the session ID into its session channel’s session object, so that the service business logic can retrieve this value by using the OperationContext.SessionID.
  • Service business code utilize this session ID to find the value from the session store, add the value and set back to the session store.
  • Client sent next two request with the same session ID.
  • Client request to get the result with the session ID attached.
  • Service utilized the session ID to retrieve the value from the session store and reply back.

Below is an in process session store for test purpose.

   1: public class InProcSessionStore
   2: {
   3:     #region Singleton
   4:  
   5:     private static InProcSessionStore _instance;
   6:  
   7:     public static InProcSessionStore Current
   8:     {
   9:         get
  10:         {
  11:             return _instance;
  12:         }
  13:     }
  14:  
  15:     static InProcSessionStore()
  16:     {
  17:         _instance = new InProcSessionStore();
  18:     }
  19:  
  20:     private InProcSessionStore()
  21:     {
  22:         _dic = new ConcurrentDictionary<string, object>();
  23:     }
  24:  
  25:     #endregion
  26:  
  27:     private ConcurrentDictionary<string, object> _dic;
  28:  
  29:     public void Set(string key, object value)
  30:     {
  31:         _dic.AddOrUpdate(key, value, (k, v) => value);
  32:     }
  33:  
  34:     public T Get<T>(string key)
  35:     {
  36:         var value = default(T);
  37:         object result;
  38:         if (_dic.TryGetValue(key, out result) && result != null && result.GetType() == typeof(T))
  39:         {
  40:             value = (T)result;
  41:         }
  42:         return value;
  43:     }
  44: }

And below is the new service contract and implementation class. You can see I specified the instance context mode to PerCall. And I use the OperationContext.SessionID to get and set the intermediate value.

   1: [ServiceContract(Namespace = "http://wcf.shaunxu.me/", SessionMode= SessionMode.Required)]
   2: public interface ISampleService
   3: {
   4:     [OperationContract(IsOneWay = true)]
   5:     void Add(int value);
   6:  
   7:     [OperationContract(IsOneWay = false)]
   8:     int GetResult();
   9: }
  10:  
  11: [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
  12: public class SampleService : ISampleService
  13: {
  14:     public void Add(int value)
  15:     {
  16:         //_current += value;
  17:         var current = InProcSessionStore.Current.Get<int>(OperationContext.Current.SessionId);
  18:         current += value;
  19:         InProcSessionStore.Current.Set(OperationContext.Current.SessionId, current);
  20:         Console.WriteLine("[{0}] SampleService.Add({1}), Current = {2}, SessionID = {3}", this.GetHashCode(), value, current, OperationContext.Current.SessionId);
  21:     }
  22:  
  23:     public int GetResult()
  24:     {
  25:         var current = InProcSessionStore.Current.Get<int>(OperationContext.Current.SessionId);
  26:         Console.WriteLine("[{0}] SampleService.GetResult(), SessionID = {1}", this.GetHashCode(), OperationContext.Current.SessionId);
  27:         return current;
  28:     }
  29: }

In the main function I created two services and two clients, each of the client should have its own session ID.

   1: static void Main(string[] args)
   2: {
   3:     var bus = new InProcMessageBus();
   4:     var address = "net.bus://localhost/sample";
   5:  
   6:     // establish the services
   7:     var host1 = EstablishServiceHost<ISampleService, SampleService>(bus, address, SessionfulMode.Distributed);
   8:     var host2 = EstablishServiceHost<ISampleService, SampleService>(bus, address, SessionfulMode.Distributed);
   9:  
  10:     // establish the client
  11:     var client1 = EstablishClientProxy<ISampleService>(bus, address, SessionfulMode.Distributed);
  12:     var client2 = EstablishClientProxy<ISampleService>(bus, address, SessionfulMode.Distributed);
  13:     using (client1 as IDisposable)
  14:     using (client2 as IDisposable)
  15:     {
  16:         client1.Add(1);
  17:         client2.Add(4);
  18:         
  19:         client1.Add(3);
  20:         client2.Add(5);
  21:         
  22:         client1.Add(2);
  23:         client2.Add(6);
  24:  
  25:         var result1 = client1.GetResult();
  26:         var result2 = client2.GetResult();
  27:         Console.WriteLine("Client 1 Result: {0}", result1);
  28:         Console.WriteLine("Client 2 Result: {0}", result2);
  29:     }
  30:  
  31:     // close the service
  32:     host1.Close();
  33:     host2.Close();
  34:  
  35:     Console.ReadKey();
  36: }

From the following result we can see that based on the PerCall instance mode each client request will create a new service instance. But the session ID would be the same within the same client. So that we can use the session ID to get and set values in the session store.

image

 

Sessionful Channels: Datagram Mode and Duplex Mode

The sessionful datagram mode would be similar as the sessionful request reply mode, but it inherits from our original input and output channel. Since it is sessionful, we also need some extra works to make it fulfill the WCF session requirement.

At the beginning of this post I described that in WCF session, “Messages delivered during a session are processed in the order in which they are received.”. This means on the server side it cannot receive and process the second request until the first one had been done. This is not a problem when we implemented the sessionful request reply channel, since the request reply mode ensure that the request channel must be waiting for the reply message. So that it would be impossible that the next request was sent before the first reply comes.

But the datagram mode doesn’t have this restriction. The sender (output channel) will be returned without waiting for anything replied from the server. Now we need to add some more procedures to make sure that the sessionful datagram mode follow the session requirement, which means the output channel must be waiting for the input channel’s reply, or I should say, input channel’s acknowledge.

The sessionful output will inherit from our own MessageBusOutputChannel and implement the interface IOutputSessionChannel. In order to make it possible to rewrite the send method we need to mark the Send method as virtual in the base class. So that in the sessionful output channel we can override it. We append the message ID into the output message for receiving purpose and send the message as usual. And then we receive the acknowledge message. This will ensure that the next operation will not be fired in this client channel until the reply comes.

   1: public override void Send(Message message, TimeSpan timeout)
   2: {
   3:     // add the message id if not
   4:     var messageId = new System.Xml.UniqueId();
   5:     if (message.Headers.MessageId == null)
   6:     {
   7:         message.Headers.MessageId = messageId;
   8:     }
   9:     // send message with session id
  10:     var content = GetStringFromWcfMessage(message, RemoteAddress);
  11:     _bus.SendRequest(content, _session.Id, true, ChannelID, null);
  12:     // wait for the acknowledge message from the server side
  13:     _bus.Receive(false, messageId.ToString());
  14: }

On the server side in the input channel we also need to override the receive method, so that it can send the acknowledge message back.

   1: public override bool EndTryReceive(IAsyncResult result, out Message message)
   2: {
   3:     var ret = base.EndTryReceive(result, out message);
   4:     // unbox the message id and send the acknowledge message back to the client
   5:     var messageId = message.Headers.MessageId;
   6:     _bus.SendReply(string.Empty, _session.Id, false, messageId.ToString());
   7:     return ret;
   8: }

I don’t want to describe the other modification here for example the channel factory, listener and transport binding element. The full code can be found at the end of this post. Let’s quick jump to have a try of the sessionful datagram mode. The service contract and class would be like this.

   1: [ServiceContract(Namespace = "http://wcf.shaunxu.me/", SessionMode= SessionMode.Required)]
   2: public interface ISampleService
   3: {
   4:     [OperationContract(IsOneWay = true)]
   5:     void Add(int value);
   6:  
   7:     [OperationContract(IsOneWay = true)]
   8:     void GetResult(string id);
   9: }
  10:  
  11: [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
  12: public class SampleService : ISampleService
  13: {
  14:     public void Add(int value)
  15:     {
  16:         //_current += value;
  17:         var current = InProcSessionStore.Current.Get<int>(OperationContext.Current.SessionId);
  18:         current += value;
  19:         InProcSessionStore.Current.Set(OperationContext.Current.SessionId, current);
  20:         Console.WriteLine("[{0}] SampleService.Add({1}), Current = {2}, SessionID = {3}", this.GetHashCode(), value, current, OperationContext.Current.SessionId);
  21:     }
  22:  
  23:     public void GetResult(string id)
  24:     {
  25:         var current = InProcSessionStore.Current.Get<int>(OperationContext.Current.SessionId);
  26:         Console.WriteLine("[{0}] SampleService.GetResult(), SessionID = {1}", this.GetHashCode(), OperationContext.Current.SessionId);
  27:         InProcSessionStore.Current.Set(id, current);
  28:     }
  29: }

And the main function would be changed as well.

   1: static void Main(string[] args)
   2: {
   3:     var bus = new InProcMessageBus();
   4:     var address = "net.bus://localhost/sample";
   5:  
   6:     // establish the services
   7:     var host1 = EstablishServiceHost<ISampleService, SampleService>(bus, address, SessionfulMode.Distributed);
   8:     var host2 = EstablishServiceHost<ISampleService, SampleService>(bus, address, SessionfulMode.Distributed);
   9:  
  10:     // establish the client
  11:     var client1 = EstablishClientProxy<ISampleService>(bus, address, SessionfulMode.Distributed);
  12:     var client2 = EstablishClientProxy<ISampleService>(bus, address, SessionfulMode.Distributed);
  13:     using (client1 as IDisposable)
  14:     using (client2 as IDisposable)
  15:     {
  16:         client1.Add(1);
  17:         client2.Add(4);
  18:         
  19:         client1.Add(3);
  20:         client2.Add(5);
  21:         
  22:         client1.Add(2);
  23:         client2.Add(6);
  24:  
  25:         client1.GetResult("1");
  26:         client2.GetResult("2");
  27:         Console.WriteLine("Client 1 Result: {0}", InProcSessionStore.Current.Get<int>("1"));
  28:         Console.WriteLine("Client 2 Result: {0}", InProcSessionStore.Current.Get<int>("2"));
  29:     }
  30:  
  31:     // close the service
  32:     host1.Close();
  33:     host2.Close();
  34:  
  35:     Console.ReadKey();
  36: }

Since in the datagram mode I cannot return any value from the server side so in the GetResult method I copy the current value into the session store so that in the main function I can get the result from there. The execution result would be like this. As you can see the session ID was maintained during the whole message conversation.

image

The last one would be the duplex channel. There’s no special modification so the sessionful channel would be like this.

   1: public class MessageBusDuplexSessionChannel : MessageBusDuplexChannel, IDuplexSessionChannel
   2: {
   3:     private IDuplexSession _session;
   4:  
   5:     public IDuplexSession Session
   6:     {
   7:         get
   8:         {
   9:             return _session;
  10:         }
  11:     }
  12:  
  13:     public MessageBusDuplexSessionChannel(
  14:         BufferManager bufferManager, MessageEncoderFactory encoder, ChannelManagerBase parent,
  15:         EndpointAddress remoteAddress, Uri via,
  16:         IBus bus,
  17:         bool isClient)
  18:         : base(bufferManager, encoder, remoteAddress, parent, via, bus, isClient)
  19:     {
  20:         _session = new MessageBusDuplexSession((new UniqueId()).ToString());
  21:     }
  22:  
  23:     protected override void OnAfterTryReceive(BusMessage message)
  24:     {
  25:         _session = new MessageBusDuplexSession(message.SessionID);
  26:     }
  27:  
  28:     protected override void OnBeforeSend(ref string sessionId)
  29:     {
  30:         sessionId = _session.Id;
  31:     }
  32: }

 

Summary

In this post I described the basis of the WCF session and how different it is to the ASP.NET session. WCF session is more general than ASP.NET session. It doesn’t care about how session ID should be passed, it doesn’t care how we should handle the session state data. It only ensure that the messages within a session should have the same session ID, at least on one side of the communication.

It’s not mandatory that the session ID must be the same on both server side and client side. It only needs that in one side the session ID should be the same. Hence we can have our transport extension that in one session there’s a session ID on server side, while another ID on client side.

And then we discussed what WCF itself works for session and how it leverage the session mode, instance context mode to make the developer utilize the service class local variant to store the intermediate data. And we also discussed why this is not suitable for our scaling-out requirement.

Then we implemented our own sessionful channels. Developer can use the OperationContext.SessionID as the key to get and set the intermediate data from the session store, so that it could be scaling-out across the service instances.

Till now I can say that we have done all things for WCF transport extension. We have our own binding and transport binding element. We support all three WCF MEP and the additional sessionful MEPs on top our transport, with the scalability on both server and client instances. But I have to note that, the code I mentioned and attached at the end of each posts are just for research purpose. Do NOT use it directly in your production.

For now you will notice that all our testing are based on the in process message bus, which is cannot be used in the real production at all. In the next post I’m going to use some real message bus, by implementing their own IBus classes.

 

The source code can be download here.

 

Hope this helps,

Shaun

All documents and related graphics, codes are provided "AS IS" without warranty of any kind.
Copyright © Shaun Ziyan Xu. This work is licensed under the Creative Commons License.

 

In our last post I demonstrated how to implement the datagram channel shape, and in the second and third post I described the request reply shape. In this post I will explain the last MEP in WCF, duplex, which is the most complex one.

 

Basis of the Duplex Channel Shape (MEP)

In the MSDN document it said that “The duplex MEP allows an arbitrary number of messages to be sent by a client and received in any order. The duplex MEP is like a phone conversation, where each word being spoken is a message. Because both sides can send and receive in this MEP, the interface implemented by the client and service channels is IDuplexChannel.”.

It would be very easy to understand if we have one server instance and one client instance. You can assume that when they are using duplex mode, the client and the server can call each other freely. When the server calling the client, it will grab the client contract and invoke by using a proxy class, which is very similar as what we did to call a service from a client through the ChannelFactory<T>. In fact in duplex mode the client also play as a service role, which means when server invoked a client callback, the server will be a client and the client will be a server.

image

The above figure outlined how a duplex communication works. At the beginning the client send a request to the server. And during the server side business logic processed, it invoked the client side contract and got the result. These two client side invoking we normally call them “callback”. They can be invoked to the same client side method or different. And finally the server finished the business logic and send the reply back.

This figure only described the scenario that all communication are request-reply mode, which means client request and server callback are all need the reply. In duplex mode we can also using the datagram mode.The server can invoke a datagram callback to the client, while the client can invoke a datagram service method as well.

But the WCF will always use the duplex channel to send and receive message if we defined the callback contract in the service contract, even though there’s no callback invoked in the service implementation.

From the explanation above we might feel that the duplex channel may have a concept of the connection. When the client send the first request it somehow established a “connection” with the server. And the callback will use this “connection” to communicate with this client and send the request, reply back and forth. In the build-in WCF transports it does utilizes the connection to implement the duplex channel. For example, in NET.TCP the WCF use the TCP connection as the duplex connection. In WsDualHttp it would be a little bit complexity. Since the HTTP protocol is connection-less, it implements the duplex mode by introducing another HTTP address from the client to the server to handle the callback communication.

image

But in our case we only have a message bus which serves multiple service instances and clients. In order to make our duplex channel scalable as much as possible, but also need the ensure that once a client send the original request and received by a server instance, all messages must be between them, the server callback should not be handled by another client.

image

 

Duplex Channel

After clarified the duplex channel basis and the goal in our message bus and scaling-out case let’s have a look on the WCF duplex channel itself.

Different from the channel shapes we introduced before, the duplex channel will be created and used on both server and client side. If you are dig into the definition of the IDuplexChannel you will find that there’s no extra member for it but just inherited from the IInputChannel and IOutputChannel. When the duplex channel was established, no mater from the server side ChannelListener or the client side ChannelFactory, it will try to receive the message by its BeginTryReceive method. And no matter the client side request or callback reply, or the server side callback request or reply, the duplex channel will always use its Send method to send the message. So this makes our implementation a little bit complexity.

In fact you can have a duplex channel class for server while another for client. But I prefer to use one channel class to support both server and client side since the main procedure are all same.

On the server side, the channel needs to receive two kind of messages.

  • Original client request message. This kind of message can be received by any server instance that listening on the same endpoint (message bus queue).
  • Callback reply. The server can only receive the callback reply message which requested by itself.

It would be very easy to listen the first kind of message but the second one would be annoying. It’s similar as what we have done in request-reply mode. In request reply mode the client should only receive the reply which the request was sent by itself. But in the request-reply the client will send the request message and then wait for the reply. In that case it’s possible for us to know the request message ID, and use this ID to pick the related reply message. But in duplex mode, as I have said before, both on client and server side it will firstly try to receive message then send the message if needed. That means we must have some mechanism to let the channel know the identity of the request message BEFORE it was sent.

image

In duplex mode the restrict is that, a channel must receive the reply message which the original request was sent by itself. Channel cannot receive the reply which request by another channel. Hence we just need to find a way to identity if the message was related with the request that sent from the same channel. After figured out the key point the solution should be simple. If you remembered in the last post when I refactoring the channel base class there’s a property named ChannelID, which is a GUID assigned automatically when a channel was initialized. This is the identity we will use to check if the message should be received by this channel.

  • When a duplex channel was created and began to receive message, it should try to receive the message which is a request (client request or callback request), and the reply message which have the property saying that it’s only for this channel.
  • After the channel received the message it will unbox the SOAP message ID and the ChannelID where it came from and save into a dictionary.
  • When the channel sent a reply message, it will find the message ID from the reply message’s RelatedTo field and get the relevant ChannelID, and send the message with this ChannelID to indicate that only this channel can receive this message.
  • When the channel sent a callback request, it will find the original request message ID from the OperationContext.RequestContext.RequestMessge and get the relevant ChannelID, and send the message with this ID to indicate that only this client can receive the callback request and perform the client side logic.

image

The figure above demonstrated what will happened for a duplex communication with one callback request in our solution.

  • The client and server channel has their own channel ID.
  • Client and server start to receive the incoming messages. On the client side it only receive the message that send to its channel ID. On the server side it can receive messages from any channels as well as the message that only to it.
  • Client send the request message with the channel ID where it comes from, but no target channel ID specified. This message can be received by any service instance channels.
  • A service channel received this message and saved the message ID and the from channel ID.
  • Service channel start to try to receive message again.
  • After processed some business code it began to send the callback request message back to the client. From the OperationContext.RequestContext it retrieved the original request message ID and find the channel ID it came from, and append this channel ID into the callback request message. Then this message can only be received by this channel. It also append the channel ID it is.
  • Only this client received this callback request since the message have the “To ChannelID = 1”. It saved the message ID and the from channel ID.
  • Client channel start to try to receive message again.
  • After processed some business code it began to send the callback reply message back to the service. From the OperationContext.RequestContext it retrieved the original callback request message ID and find the channel ID it came from, and append this channel ID into the callback reply message. Then this message can only be received by this channel.
  • Only this service received this reply message since its channel ID equals the “To ChannelID” in the reply message.
  • Service channel start to try to receive message again.
  • After processed the business code it began to send the reply message back to the client. From the OperationContext.RequestContext it retrieved the original request message ID and find the channel ID it came from, and append this channel ID into the reply message. Then this message can only be received by this channel.
  • Only this client received this reply message since the message have the “To ChannelID = 1”.
  • Client channel start to try to receive message again.
  • Finished the duplex invoke.

 

Implement the Duplex Channel, Channel Factory and Channel Listener

After clarified the solution the implementation would be straight forward. Since I’m going to using one duplex channel class for both server and client, I need a local variant to indicate if it’s running on the server side or client side. I also need to save the server side address and a dictionary to save the relationship between the message ID and the channel ID.

Since our channel would be executed in multi-thread mode we should use the ConcurrentDictionary in .NET 4 System.Collections.Concurrent namespace to store the relationship of the channel ID and message ID.

And when implementing the receive message delegate we will specify the channel ID in the parameter, so that it will receive the message that has no “To Channel ID” specified, or specified only to this channel.

After it received a message we will check if it has the message ID field. If yes this means this message is a request message (client request or server callback request). Then we need to save the message ID and the “From Channel ID” into the local dictionary, so that when sending the reply message we can find which channel ID it should send to.

And also some logic of the local address and remote address properties etc. as well.

   1: private readonly IBus _bus;
   2: private readonly Uri _via;
   3: private readonly EndpointAddress _serverAddress;
   4: private readonly ConcurrentDictionary<UniqueId, string> _replyTos;
   5: private readonly bool _isClient;
   6:  
   7: private delegate bool TryReceiveDelegate(TimeSpan timeout, out Message message);
   8: private readonly TryReceiveDelegate _tryReceiveDelegate;
   9:  
  10: public MessageBusDuplexChannel(
  11:     BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress remoteAddress,
  12:     ChannelManagerBase parent, Uri via,
  13:     IBus bus, bool isClient)
  14:     : base(bufferManager, encoder, parent)
  15: {
  16:     _serverAddress = remoteAddress;
  17:     _via = via;
  18:     _bus = bus;
  19:     _isClient = isClient;
  20:     _replyTos = new ConcurrentDictionary<UniqueId, string>();
  21:  
  22:     _tryReceiveDelegate = (TimeSpan timeout, out Message message) =>
  23:     {
  24:         message = null;
  25:         try
  26:         {
  27:             // listen the message bus based on the sticky mode: 
  28:             // channel: only receive the message that reply to this channel's id
  29:             // scaling gourp: receive the message the reply to this channel's id and the scaling group id of this channel
  30:             var requestMessage = _bus.Receive(!_isClient, ChannelID);
  31:             if (requestMessage != null)
  32:             {
  33:                 message = GetWcfMessageFromString(requestMessage.Content);
  34:                 if (message.Headers.MessageId != null)
  35:                 {
  36:                     _replyTos.AddOrUpdate(message.Headers.MessageId, requestMessage.From, (key, value) => requestMessage.From);
  37:                 }
  38:             }
  39:         }
  40:         catch (Exception ex)
  41:         {
  42:             throw new CommunicationException(ex.Message, ex);
  43:         }
  44:         return true;
  45:     };
  46: }
  47:  
  48: public EndpointAddress LocalAddress
  49: {
  50:     get 
  51:     {
  52:         if (_isClient)
  53:         {
  54:             return new EndpointAddress(EndpointAddress.AnonymousUri);
  55:         }
  56:         else
  57:         {
  58:             return _serverAddress;
  59:         }
  60:     }
  61: }
  62:  
  63: public EndpointAddress RemoteAddress
  64: {
  65:     get 
  66:     {
  67:         if (_isClient)
  68:         {
  69:             return _serverAddress;
  70:         }
  71:         else
  72:         {
  73:             return new EndpointAddress(EndpointAddress.AnonymousUri);
  74:         }
  75:     }
  76: }
  77:  
  78: public Uri Via
  79: {
  80:     get 
  81:     {
  82:         return _via;
  83:     }
  84: }
  85:  
  86: public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
  87: {
  88:     Message message;
  89:     return _tryReceiveDelegate.BeginInvoke(timeout, out message, callback, state);
  90: }
  91:  
  92: public bool EndTryReceive(IAsyncResult result, out Message message)
  93: {
  94:     var ret = _tryReceiveDelegate.EndInvoke(out message, result);
  95:     return ret;
  96: }

Next, let’s implement the send procedure. The send method will be used when send request message and reply message, which might be the client side request, server side callback request, client side callback reply and server side reply messages.

If the message has the RelatedTo field this means it’s a reply message, and the value of the RelatedTo is the message ID of the request. Then we will find the channel ID from the dictionary and appended to the message so that only this channel can receive this reply.

If there’s no RelatedTo field this means it’s a request message. On the client side we don’t need to do anything but send the message into the bus with the channel ID append, so that any servers can grab which channel it came. If on the server side, this means it’s a callback request which must be sent to the client channel that fired the original request. As we know we can get the original request from the OperationContext so we can find the original channel ID from the original request message ID from our dictionary.

   1: public void Send(Message message, TimeSpan timeout)
   2: {
   3:     if (message.Headers.RelatesTo != null)
   4:     {
   5:         // when relatesTo is not null it means this is a response message which must be send to the request channel
   6:         // and after sent out the original request had been finished so we don't need to store the original message any more
   7:         // hence we will remove and retrieve the original message id and append to the bus message and send out
   8:         var replyTo = string.Empty;
   9:         _replyTos.TryRemove(message.Headers.RelatesTo, out replyTo);
  10:         if (!string.IsNullOrWhiteSpace(replyTo))
  11:         {
  12:             var content = GetStringFromWcfMessage(message, RemoteAddress);
  13:             _bus.SendReply(content, _isClient, replyTo);
  14:         }
  15:         else
  16:         {
  17:             throw new CommunicationException(string.Format("Cannot find the ReplyTo valid for the message related to {0}.", message.Headers.RelatesTo));
  18:         }
  19:     }
  20:     else
  21:     {
  22:         // on the server side, when performing the callback request we will firstly retrieve the original request message id, 
  23:         // then find the related client channel id. so that we can send the callback request back to the same client channel.
  24:         var sendTo = string.Empty;
  25:         if (!_isClient &&
  26:             OperationContext.Current != null &&
  27:             OperationContext.Current.RequestContext != null &&
  28:             OperationContext.Current.RequestContext.RequestMessage != null &&
  29:             OperationContext.Current.RequestContext.RequestMessage.Headers.MessageId != null)
  30:         {
  31:             var requestMessageId = OperationContext.Current.RequestContext.RequestMessage.Headers.MessageId;
  32:             _replyTos.TryGetValue(requestMessageId, out sendTo);
  33:         }
  34:         var content = GetStringFromWcfMessage(message, RemoteAddress);
  35:         _bus.SendRequest(content, _isClient, ChannelID, sendTo);
  36:     }
  37: }

To finalize the implementation, just create the channel factory, channel listener and update the transport binding element to make it support duplex mode.

   1: public class MessageBusDuplexChannelFactory : MessageBusChannelFactoryBase<IDuplexChannel>
   2: {
   3:     public MessageBusDuplexChannelFactory(MessageBusTransportBindingElement transportElement, BindingContext context)
   4:         : base(transportElement, context)
   5:     {
   6:     }
   7:  
   8:     protected override IDuplexChannel CreateChannel(
   9:         BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress remoteAddress,
  10:         MessageBusChannelFactoryBase<IDuplexChannel> parent,
  11:         Uri via,
  12:         IBus bus)
  13:     {
  14:         return new MessageBusDuplexChannel(bufferManager, encoder, remoteAddress, parent, via, bus, true);
  15:     }
  16: }
   1: public class MessageBusDuplexChannelListener : MessageBusChannelListenerBase<IDuplexChannel>
   2: {
   3:     public MessageBusDuplexChannelListener(MessageBusTransportBindingElement transportElement, BindingContext context)
   4:         : base(transportElement, context)
   5:     {
   6:     }
   7:  
   8:     protected override IDuplexChannel CreateChannel(
   9:         BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress localAddress,
  10:         MessageBusChannelListenerBase<IDuplexChannel> parent, 
  11:         IBus bus)
  12:     {
  13:         return new MessageBusDuplexChannel(bufferManager, encoder, localAddress, parent, null, bus, false);
  14:     }
  15: }
   1: public override bool CanBuildChannelFactory<TChannel>(BindingContext context)
   2: {
   3:     return typeof(TChannel) == typeof(IRequestChannel) ||
   4:            typeof(TChannel) == typeof(IOutputChannel) ||
   5:            typeof(TChannel) == typeof(IDuplexChannel);
   6: }
   7:  
   8: public override bool CanBuildChannelListener<TChannel>(BindingContext context)
   9: {
  10:     return typeof(TChannel) == typeof(IReplyChannel) ||
  11:            typeof(TChannel) == typeof(IInputChannel) ||
  12:            typeof(TChannel) == typeof(IDuplexChannel);
  13: }
  14:  
  15: public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context)
  16: {
  17:     if (context == null)
  18:     {
  19:         throw new ArgumentNullException("context");
  20:     }
  21:     if (!CanBuildChannelFactory<TChannel>(context))
  22:     {
  23:         throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel factory.", typeof(TChannel).Name));
  24:     }
  25:  
  26:     if (typeof(TChannel) == typeof(IRequestChannel))
  27:     {
  28:         return (IChannelFactory<TChannel>)(object)new MessageBusRequestChannelFactory(this, context);
  29:     }
  30:     else if (typeof(TChannel) == typeof(IOutputChannel))
  31:     {
  32:         return (IChannelFactory<TChannel>)(object)new MessageBusOutputChannelFactory(this, context);
  33:     }
  34:     else if (typeof(TChannel) == typeof(IDuplexChannel))
  35:     {
  36:         return (IChannelFactory<TChannel>)(object)new MessageBusDuplexChannelFactory(this, context);
  37:     }
  38:     else
  39:     {
  40:         throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel listener.", typeof(TChannel).Name));
  41:     }
  42:  
  43: }
  44:  
  45: public override IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context)
  46: {
  47:     if (context == null)
  48:     {
  49:         throw new ArgumentNullException("context");
  50:     }
  51:     if (!CanBuildChannelListener<TChannel>(context))
  52:     {
  53:         throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel listener.", typeof(TChannel).Name));
  54:     }
  55:  
  56:     if (typeof(TChannel) == typeof(IReplyChannel))
  57:     {
  58:         return (IChannelListener<TChannel>)(object)new MessageBusReplyChannelListener(this, context);
  59:     }
  60:     else if (typeof(TChannel) == typeof(IInputChannel))
  61:     {
  62:         return (IChannelListener<TChannel>)(object)new MessageBusInputChannelListener(this, context);
  63:     }
  64:     else if (typeof(TChannel) == typeof(IDuplexChannel))
  65:     {
  66:         return (IChannelListener<TChannel>)(object)new MessageBusDuplexChannelListener(this, context);
  67:     }
  68:     else
  69:     {
  70:         throw new ArgumentException(string.Format("Unsupported channel type: {0} for the channel listener.", typeof(TChannel).Name));
  71:     }
  72: }

 

Test Our Duplex Channel

Update our test console application to verify the duplex channel works. First we need to define and implement a service which contains the duplex callback. The service contract and the client side callback contract would be like hits.

   1: [ServiceContract(Namespace = "http://wcf.shaunxu.me/", CallbackContract = typeof(ISampleCallback))]
   2: public interface ISampleService
   3: {
   4:     [OperationContract]
   5:     string Reverse(string content);
   6: }
   7:  
   8: [ServiceContract(Namespace = "http://wcf.shaunxu.me/")]
   9: public interface ISampleCallback
  10: {
  11:     [OperationContract]
  12:     string ToUpper(string content);
  13:  
  14:     [OperationContract]
  15:     string AddSpaces(string content);
  16: }

The service only has one method to reverse an input string. The client callback contract has two methods. One is to make the input string upper, the other is to add space between each of the chars of the string. When implementation, the service method will invoke these two callback methods one by one.

You will see that when the service and client method was invoked I printed the hash code of current instance and the result, to demonstrate the duplex calling flow.

   1: [ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Multiple)]
   2: public class SampleService : ISampleService
   3: {
   4:     public string Reverse(string content)
   5:     {
   6:         var callback = OperationContext.Current.GetCallbackChannel<ISampleCallback>();
   7:  
   8:         var result1 = new string(content.Reverse().ToArray());
   9:         Console.WriteLine("Service {0}: Reverse {1} => {2}", OperationContext.Current.Host.GetHashCode(), content, result1);
  10:  
  11:         var result2 = callback.ToUpper(result1);
  12:         Console.WriteLine("Service {0}: Callback.ToUpper {1} => {2}", OperationContext.Current.Host.GetHashCode(), result1, result2);
  13:         var result3 = callback.AddSpaces(result2);
  14:         Console.WriteLine("Service {0}: Callback.AddSpaces {1} => {2}", OperationContext.Current.Host.GetHashCode(), result2, result3);
  15:  
  16:         return result3;
  17:     }
  18: }
  19:  
  20: public class SampleCallback : ISampleCallback
  21: {
  22:     public string ToUpper(string content)
  23:     {
  24:         var result = content.ToUpper();
  25:         Console.WriteLine("Client {0}: ToUpper {1} => {2}", this.GetHashCode(), content, result);
  26:         return result;
  27:     }
  28:  
  29:     public string AddSpaces(string content)
  30:     {
  31:         var result = string.Join(" ", content.Select(c => new string(c, 1)));
  32:         Console.WriteLine("Client {0}: AddSpaces {1} => {2}", this.GetHashCode(), content, result);
  33:         return result;
  34:     }
  35: }

Make sure on the service implementation class you added the ServiceBehavior attribute and set the concurrency mode to multiple or reentrant, otherwise the application will be failed when executing.

Add another helper method to make it easy to create a duplex channel factory and proxy.

   1: static TChannel EstablishDuplexClientProxy<TChannel, TCallback>(IBus bus, string address) where TCallback : new()
   2: {
   3:     var binding = new MessageBusTransportBinding(bus);
   4:     var callbackInstance = new InstanceContext(new TCallback());
   5:     var factory = new DuplexChannelFactory<TChannel>(callbackInstance, binding, address);
   6:     factory.Opened += (sender, e) =>
   7:         {
   8:             Console.WriteLine("Client connected to {0}", factory.Endpoint.ListenUri);
   9:         };
  10:     var proxy = factory.CreateChannel();
  11:     return proxy;
  12: }

Finally in the main function we will create some service instances and client proxies, then let the user select a client to send request to the services. In this scaling-out mode all services could be able to pick this request and process the service logic, but within its method all client callback should be received by this client.

   1: static void Main(string[] args)
   2: {
   3:     var bus = new InProcMessageBus();
   4:     var address = "net.bus://localhost/sample";
   5:  
   6:     // establish the services
   7:     var host1 = EstablishServiceHost<ISampleService, SampleService>(bus, address);
   8:     var host2 = EstablishServiceHost<ISampleService, SampleService>(bus, address);
   9:     var host3 = EstablishServiceHost<ISampleService, SampleService>(bus, address);
  10:  
  11:     // establish the clients
  12:     var clients = new List<ISampleService>()
  13:     {
  14:         EstablishDuplexClientProxy<ISampleService, SampleCallback>(bus, address),
  15:         EstablishDuplexClientProxy<ISampleService, SampleCallback>(bus, address),
  16:         EstablishDuplexClientProxy<ISampleService, SampleCallback>(bus, address)
  17:     };
  18:  
  19:     // invoke the service
  20:     Console.WriteLine("Which client do you want to use? (1|2|3, 0 to exit)");
  21:     var idx = int.Parse(Console.ReadLine()) - 1;
  22:     while (idx >= 0 && idx <= clients.Count - 1)
  23:     {
  24:         var proxy = clients[idx];
  25:         Console.WriteLine("Client ({0}): Say something...", proxy.GetHashCode());
  26:         var content = Console.ReadLine();
  27:         var result = proxy.Reverse(content);
  28:         Console.WriteLine("Client ({0}): {1} => {2}", proxy.GetHashCode(), content, result);
  29:  
  30:         Console.WriteLine("Which client do you want to use? (1|2|3, 0 to exit)");
  31:         idx = int.Parse(Console.ReadLine()) - 1;
  32:     }
  33:  
  34:     clients.All((cli) =>
  35:         {
  36:             (cli as ICommunicationObject).Close();
  37:             return true;
  38:         });
  39: }

Let’s start the application and have a try.

image

As you can see, at first the client sent the request to the service (66622070), and it fired the client callback twice, which all received to the same client callback instance (21647132). Then the second request was picked by another service (20876819) and the callbacks were all went to the same client (6451435).

 

Summary

The duplex mode would be the most complex one in three of the WCF MEPs. The duplex channel allows the service and the client to be invoked freely and unlimited once the channel had been established. In our case it becomes more complexity. If the think about the scaling-out mode the service might have more than one instances, so in fact we have a N:N channel shape.

In our solution we utilize the ChannelID as the identifier to ensure the two rules of duplex mode:

  • The reply message must be received by the channel who sent the related request message
  • The callback request must be received by the client who fired the original request message.

The first rule is mandatory, but the second one is optional. In fact it’s not necessary that only the client who fired the duplex request can receive the callback request from the server. All clients if they implemented the callback contract, should be able to handle the callback request. You can try to implement if you are interested.

Now we had finished all the WCF MEPs in our transport extension. Here I can say we have almost done everything. We can scaling-out our service instances on top of our message bus which supports the datagram, request-reply and duplex channel mode.

In the next post I would describe how to handle the session in our transport extension and to see how different between the WCF session and ASP.NET session.

 

You can download the source code here.

 

Hope this helps,

Shaun

All documents and related graphics, codes are provided "AS IS" without warranty of any kind.
Copyright © Shaun Ziyan Xu. This work is licensed under the Creative Commons License.

 

In the previous posts we talked about the transport extension that scaling-out our WCF services over a message bus transportation. What we have done is to use request reply MEP as a example, and implement the asynchronous methods that makes our transport support the normal WCF usage – ServiceHost and ChannelFactory on service hosting and client invoking.

Request reply MEP is the most common mode when we are using WCF, but there are two MEPs available in WCF as well: datagram and duplex. In this post I will demonstrate how to implement the datagram MEP in our solution, with some code refectoring.

 

Datagram MEP

Datagram MEP, also known as the one-way, request-forget or fire-forget MEP. Different from the request reply, in this mode the client will send the request to the server and no need to wait for the reply. This means, on one hand, the client will not be blocked by the server reply. On the other hand, the client will neither know if the server received the message, nor the service works properly. This MEP is widely used in scenario like logging, auditing, etc..

image

On the client side, the OutputChannel will send the request message to the underlying transportation, while on the server side, the InputChannel will receive them and dispatched by WCF to the service class and related method.

The OutputChannel just sends the message to the message bus, in our case, and will not wait for any response. It will return back immediately. And the InputChannel on the server will receive the message and process the business logic without sending back any reply.

Hence in order to implement the datagram MEP what we need to do is,

  • Client side, implement a ChannelFactory that returns an OutputChannel, and implement the OutputChannel which can send message to the bus.
  • Server side, implement a ChannelListener that returns an InputChannel, and implement the InputChannel which can receive message from the bus.

Since we had created a ChannelFactory and ChannelListener for the request-reply mode, and the channels works well with our message bus before, then we will do some code refactoring first.

 

ChannelBase, ChannelFactoryBase and ChannelListenerBase

We will have three base classes that covers some common procedures for channels, channel factories and channel listeners. The channel base class will just receive the buffer manager, encoder factory and provide them as a protected readonly properties. We will also have a property named ChannelID, which will be used in the duplex MEP in the next post.

In the request channel and reply channel we have a lot of code that convert the WCF message into string and vice versa, which using the buffer manager and encoder. Here we will create two methods for them so that in the future the channels can convert between string and message by just invoking them.

   1: public abstract class MessageBusChannelBase : ChannelBase
   2: {
   3:     private const int CST_MAXBUFFERSIZE = 64 * 1024;
   4:  
   5:     private readonly Guid _id;
   6:     private readonly BufferManager _bufferManager;
   7:     private readonly MessageEncoder _encoder;
   8:  
   9:     protected BufferManager BufferManager
  10:     {
  11:         get
  12:         {
  13:             return _bufferManager;
  14:         }
  15:     }
  16:  
  17:     protected MessageEncoder Encoder
  18:     {
  19:         get
  20:         {
  21:             return _encoder;
  22:         }
  23:     }
  24:  
  25:     public string ChannelID
  26:     {
  27:         get
  28:         {
  29:             return _id.ToString();
  30:         }
  31:     }
  32:  
  33:     protected MessageBusChannelBase(BufferManager bufferManager, MessageEncoderFactory encoder, ChannelManagerBase parent)
  34:         : base(parent)
  35:     {
  36:         _id = Guid.NewGuid();
  37:  
  38:         _bufferManager = bufferManager;
  39:         _encoder = encoder.CreateSessionEncoder();
  40:     }
  41:  
  42:     internal Message GetWcfMessageFromString(string content)
  43:     {
  44:         var raw = Encoding.UTF8.GetBytes(content);
  45:         var data = _bufferManager.TakeBuffer(raw.Length);
  46:         Buffer.BlockCopy(raw, 0, data, 0, raw.Length);
  47:         var buffer = new ArraySegment<byte>(data, 0, raw.Length);
  48:         var message = _encoder.ReadMessage(buffer, _bufferManager);
  49:         return message;
  50:     }
  51:  
  52:     internal string GetStringFromWcfMessage(Message message, EndpointAddress to)
  53:     {
  54:         ArraySegment<byte> buffer;
  55:         string content;
  56:         using (message)
  57:         {
  58:             to.ApplyTo(message);
  59:             buffer = _encoder.WriteMessage(message, CST_MAXBUFFERSIZE, _bufferManager);
  60:         }
  61:         content = Encoding.UTF8.GetString(buffer.Array, buffer.Offset, buffer.Count);
  62:         _bufferManager.ReturnBuffer(buffer.Array);
  63:         return content;
  64:     }
  65:  
  66:     protected override void OnAbort()
  67:     {
  68:     }
  69:  
  70:     protected override void OnClose(TimeSpan timeout)
  71:     {
  72:     }
  73:  
  74:     protected override void OnOpen(TimeSpan timeout)
  75:     {
  76:     }
  77:  
  78:     #region Not Implemented
  79:  
  80:     protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
  81:     {
  82:         throw new NotImplementedException();
  83:     }
  84:  
  85:     protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
  86:     {
  87:         throw new NotImplementedException();
  88:     }
  89:  
  90:     protected override void OnEndClose(IAsyncResult result)
  91:     {
  92:         throw new NotImplementedException();
  93:     }
  94:  
  95:     protected override void OnEndOpen(IAsyncResult result)
  96:     {
  97:         throw new NotImplementedException();
  98:     }
  99:  
 100:     #endregion
 101: }

The main responsibility of the channel factory and channel listener is to create a related channel object. Each factories and listeners will has very similar logic, except which type of the channel it will create. So we can have two base classes to wrap almost all complexity and just let the sub classes to take the responsible to create the actual channel object.

   1: public abstract class MessageBusChannelFactoryBase<TChannel> : ChannelFactoryBase<TChannel> where TChannel : class, IChannel
   2: {
   3:     private readonly MessageBusTransportBindingElement _transportElement;
   4:     private readonly BufferManager _bufferManager;
   5:     private readonly MessageEncoderFactory _encoder;
   6:  
   7:     private readonly IBus _bus;
   8:  
   9:     public IBus Bus
  10:     {
  11:         get
  12:         {
  13:             return _bus;
  14:         }
  15:     }
  16:  
  17:     protected MessageBusTransportBindingElement TransportElement
  18:     {
  19:         get
  20:         {
  21:             return _transportElement;
  22:         }
  23:     }
  24:  
  25:     protected MessageBusChannelFactoryBase(MessageBusTransportBindingElement transportElement, BindingContext context)
  26:         : base(context.Binding)
  27:     {
  28:         _transportElement = transportElement;
  29:         _bufferManager = BufferManager.CreateBufferManager(transportElement.MaxBufferPoolSize, int.MaxValue);
  30:         var encodingElement = context.Binding.Elements.Find<MessageEncodingBindingElement>();
  31:         if (encodingElement == null)
  32:         {
  33:             _encoder = (new TextMessageEncodingBindingElement()).CreateMessageEncoderFactory();
  34:         }
  35:         else
  36:         {
  37:             _encoder = encodingElement.CreateMessageEncoderFactory();
  38:         }
  39:         _bus = transportElement.Bus;
  40:     }
  41:  
  42:     protected abstract TChannel CreateChannel(
  43:         BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress remoteAddress,
  44:         MessageBusChannelFactoryBase<TChannel> parent,
  45:         Uri via,
  46:         IBus bus);
  47:  
  48:     protected override void OnOpen(TimeSpan timeout)
  49:     {
  50:     }
  51:  
  52:     protected override void OnClosed()
  53:     {
  54:         base.OnClosed();
  55:  
  56:         _bufferManager.Clear();
  57:         _bus.Dispose();
  58:     }
  59:  
  60:     protected override TChannel OnCreateChannel(System.ServiceModel.EndpointAddress address, Uri via)
  61:     {
  62:         return CreateChannel(_bufferManager, _encoder, address, this, via, _bus);
  63:     }
  64:  
  65:     protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
  66:     {
  67:         throw new NotImplementedException();
  68:     }
  69:  
  70:     protected override void OnEndOpen(IAsyncResult result)
  71:     {
  72:         throw new NotImplementedException();
  73:     }
  74: }
   1: public abstract class MessageBusChannelListenerBase<TChannel> : ChannelListenerBase<TChannel> where TChannel : class, IChannel
   2: {
   3:     private readonly MessageBusTransportBindingElement _transportElement;
   4:  
   5:     private readonly BufferManager _bufferManager;
   6:     private readonly MessageEncoderFactory _encoder;
   7:     private readonly string _scheme;
   8:     private readonly Uri _uri;
   9:  
  10:     private readonly IBus _bus;
  11:  
  12:     private readonly InputQueue<TChannel> _channelQueue;
  13:     private readonly object _currentChannelLock;
  14:  
  15:     private TChannel _currentChannel;
  16:  
  17:     public IBus Bus
  18:     {
  19:         get
  20:         {
  21:             return _bus;
  22:         }
  23:     }
  24:  
  25:     public override Uri Uri
  26:     {
  27:         get
  28:         {
  29:             return _uri;
  30:         }
  31:     }
  32:  
  33:     public string Scheme
  34:     {
  35:         get
  36:         {
  37:             return _scheme;
  38:         }
  39:     }
  40:  
  41:     protected MessageBusTransportBindingElement TransportElement
  42:     {
  43:         get
  44:         {
  45:             return _transportElement;
  46:         }
  47:     }
  48:  
  49:     protected MessageBusChannelListenerBase(MessageBusTransportBindingElement transportElement, BindingContext context)
  50:         : base(context.Binding)
  51:     {
  52:         _transportElement = transportElement;
  53:         _bufferManager = BufferManager.CreateBufferManager(transportElement.MaxBufferPoolSize, int.MaxValue);
  54:         var encodingElement = context.Binding.Elements.Find<MessageEncodingBindingElement>();
  55:         if (encodingElement == null)
  56:         {
  57:             _encoder = (new TextMessageEncodingBindingElement()).CreateMessageEncoderFactory();
  58:         }
  59:         else
  60:         {
  61:             _encoder = encodingElement.CreateMessageEncoderFactory();
  62:         }
  63:         _scheme = transportElement.Scheme;
  64:         _uri = new Uri(context.ListenUriBaseAddress, context.ListenUriRelativeAddress);
  65:  
  66:         _bus = transportElement.Bus;
  67:  
  68:         _channelQueue = new InputQueue<TChannel>();
  69:         _currentChannelLock = new object();
  70:         _currentChannel = null;
  71:     }
  72:  
  73:     protected override void OnAbort()
  74:     {
  75:         try
  76:         {
  77:             lock (ThisLock)
  78:             {
  79:                 _channelQueue.Close();
  80:             }
  81:         }
  82:         catch { }
  83:     }
  84:  
  85:     protected override void OnClose(TimeSpan timeout)
  86:     {
  87:         try
  88:         {
  89:             lock (ThisLock)
  90:             {
  91:                 _channelQueue.Close();
  92:             }
  93:         }
  94:         catch { }
  95:     }
  96:  
  97:     protected override void OnClosed()
  98:     {
  99:         base.OnClosed();
 100:  
 101:         try
 102:         {
 103:             _bufferManager.Clear();
 104:             _bus.Dispose();
 105:         }
 106:         catch { }
 107:     }
 108:  
 109:     private void EnsureChannelAvailable()
 110:     {
 111:         TChannel newChannel = null;
 112:         bool channelCreated = false;
 113:  
 114:         if ((newChannel = _currentChannel) == null)
 115:         {
 116:             lock (_currentChannelLock)
 117:             {
 118:                 if ((newChannel = _currentChannel) == null)
 119:                 {
 120:                     newChannel = CreateChannel(_bufferManager, _encoder, new EndpointAddress(_uri), this, _bus);
 121:                     newChannel.Closed += new EventHandler(OnChannelClosed);
 122:                     _currentChannel = newChannel;
 123:                     channelCreated = true;
 124:                 }
 125:             }
 126:         }
 127:  
 128:         if (channelCreated)
 129:         {
 130:             _channelQueue.EnqueueAndDispatch(newChannel);
 131:         }
 132:     }
 133:  
 134:     protected abstract TChannel CreateChannel(
 135:         BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress localAddress,
 136:         MessageBusChannelListenerBase<TChannel> parent,
 137:         IBus bus);
 138:  
 139:     private void OnChannelClosed(object sender, EventArgs e)
 140:     {
 141:         var channel = sender as TChannel;
 142:         lock (_currentChannelLock)
 143:         {
 144:             if (channel == _currentChannel)
 145:             {
 146:                 _currentChannel = null;
 147:             }
 148:         }
 149:     }
 150:  
 151:     protected override TChannel OnAcceptChannel(TimeSpan timeout)
 152:     {
 153:         if (!IsDisposed)
 154:         {
 155:             EnsureChannelAvailable();
 156:         }
 157:  
 158:         TChannel channel = null;
 159:         if (_channelQueue.Dequeue(timeout, out channel))
 160:         {
 161:             return channel;
 162:         }
 163:         else
 164:         {
 165:             throw new TimeoutException();
 166:         }
 167:     }
 168:  
 169:     protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state)
 170:     {
 171:         if (!IsDisposed)
 172:         {
 173:             EnsureChannelAvailable();
 174:         }
 175:  
 176:         return _channelQueue.BeginDequeue(timeout, callback, state);
 177:     }
 178:  
 179:     protected override TChannel OnEndAcceptChannel(IAsyncResult result)
 180:     {
 181:         TChannel channel;
 182:         if (_channelQueue.EndDequeue(result, out channel))
 183:         {
 184:             return channel;
 185:         }
 186:         else
 187:         {
 188:             throw new TimeoutException();
 189:         }
 190:     }
 191:  
 192:     protected override void OnOpen(TimeSpan timeout)
 193:     {
 194:     }
 195:  
 196:     protected override IAsyncResult OnBeginWaitForChannel(TimeSpan timeout, AsyncCallback callback, object state)
 197:     {
 198:         throw new NotImplementedException();
 199:     }
 200:  
 201:     protected override bool OnEndWaitForChannel(IAsyncResult result)
 202:     {
 203:         throw new NotImplementedException();
 204:     }
 205:  
 206:     protected override bool OnWaitForChannel(TimeSpan timeout)
 207:     {
 208:         throw new NotImplementedException();
 209:     }
 210:  
 211:     protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
 212:     {
 213:         throw new NotImplementedException();
 214:     }
 215:  
 216:     protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
 217:     {
 218:         throw new NotImplementedException();
 219:     }
 220:  
 221:     protected override void OnEndClose(IAsyncResult result)
 222:     {
 223:         throw new NotImplementedException();
 224:     }
 225:  
 226:     protected override void OnEndOpen(IAsyncResult result)
 227:     {
 228:         throw new NotImplementedException();
 229:     }
 230: }

As you can see, the ChannelFactoryBase and the ChannelListenerBase I just expose an abstract method which is CreateChannel, that gives a chance to the sub class to instant the actual channel object. After these modification our request and reply part (that had been done in the previous post) could be turned simpler and clearer.

The RequestChannelFactory will be like this.

   1: public class MessageBusRequestChannelFactory : MessageBusChannelFactoryBase<IRequestChannel>
   2: {
   3:     public MessageBusRequestChannelFactory(MessageBusTransportBindingElement transportElement, BindingContext context)
   4:         : base(transportElement, context)
   5:     {
   6:     }
   7:  
   8:     protected override IRequestChannel CreateChannel(
   9:         BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress remoteAddress, 
  10:         MessageBusChannelFactoryBase<IRequestChannel> parent, 
  11:         Uri via, 
  12:         IBus bus)
  13:     {
  14:         return new MessageBusRequestChannel(bufferManager, encoder, parent, remoteAddress, via, bus);
  15:     }
  16: }

And this is the ReplyChannelListener.

   1: public class MessageBusReplyChannelListener : MessageBusChannelListenerBase<IReplyChannel>
   2: {
   3:     public MessageBusReplyChannelListener(MessageBusTransportBindingElement transportElement, BindingContext context)
   4:         : base(transportElement, context)
   5:     {
   6:     }
   7:  
   8:     protected override IReplyChannel CreateChannel(
   9:         BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress localAddress, 
  10:         MessageBusChannelListenerBase<IReplyChannel> parent, 
  11:         IBus bus)
  12:     {
  13:         return new MessageBusReplyChannel(bufferManager, encoder, localAddress, parent, bus);
  14:     }
  15: }

And below is the RequestChannel and ReplyChannel. I inherited them from the MessageBusChannelBase class and utilized the helper methods to convert between string and message.

   1: public class MessageBusRequestChannel : MessageBusChannelBase, IRequestChannel
   2: {
   3:     private readonly IBus _bus;
   4:     private readonly Uri _via;
   5:     private readonly EndpointAddress _remoteAddress;
   6:     private readonly object _aLock;
   7:  
   8:     public MessageBusRequestChannel(
   9:         BufferManager bufferManager, MessageEncoderFactory encoder, ChannelManagerBase parent,
  10:         EndpointAddress remoteAddress, Uri via, IBus bus)
  11:         : base(bufferManager, encoder, parent)
  12:     {
  13:         _via = via;
  14:         _remoteAddress = remoteAddress;
  15:         _bus = bus;
  16:         _aLock = new object();
  17:     }
  18:  
  19:     public Uri Via
  20:     {
  21:         get
  22:         {
  23:             return _via;
  24:         }
  25:     }
  26:  
  27:     public IAsyncResult BeginRequest(Message message, TimeSpan timeout, AsyncCallback callback, object state)
  28:     {
  29:         throw new NotImplementedException();
  30:     }
  31:  
  32:     public IAsyncResult BeginRequest(Message message, AsyncCallback callback, object state)
  33:     {
  34:         throw new NotImplementedException();
  35:     }
  36:  
  37:     public Message EndRequest(IAsyncResult result)
  38:     {
  39:         throw new NotImplementedException();
  40:     }
  41:  
  42:     public System.ServiceModel.EndpointAddress RemoteAddress
  43:     {
  44:         get 
  45:         {
  46:             return _remoteAddress;
  47:         }
  48:     }
  49:  
  50:     public Message Request(Message message, TimeSpan timeout)
  51:     {
  52:         ThrowIfDisposedOrNotOpen();
  53:         lock (_aLock)
  54:         {
  55:             // unbox the message into string that will be sent into the bus
  56:             var content = GetStringFromWcfMessage(message,_remoteAddress);
  57:             // send the message into bus
  58:             var busMsgId = _bus.SendRequest(content, true, null);
  59:             // waiting for the reply message arrive from the bus
  60:             var replyMsg = _bus.Receive(false, busMsgId);
  61:             // box the message from the bus message content and return back
  62:             var reply = GetWcfMessageFromString(replyMsg.Content);
  63:             return reply;
  64:         }
  65:     }
  66:  
  67:     public Message Request(Message message)
  68:     {
  69:         return Request(message, DefaultSendTimeout);
  70:     }
  71: }
   1: public class MessageBusReplyChannel : MessageBusChannelBase, IReplyChannel
   2: {
   3:     private readonly EndpointAddress _localAddress;
   4:     private readonly object _aLock;
   5:  
   6:     private readonly IBus _bus;
   7:  
   8:     private delegate bool TryReceiveRequestDelegate(TimeSpan timeout, out RequestContext context);
   9:     private TryReceiveRequestDelegate _tryReceiveRequestDelegate;
  10:  
  11:     public MessageBusReplyChannel(
  12:         BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress localAddress,
  13:         ChannelManagerBase parent,
  14:         IBus bus)
  15:         : base(bufferManager, encoder, parent)
  16:     {
  17:         _localAddress = localAddress;
  18:         _bus = bus;
  19:         _aLock = new object();
  20:  
  21:         _tryReceiveRequestDelegate = (TimeSpan t, out RequestContext rc) =>
  22:         {
  23:             rc = null;
  24:             // receive the request message from the bus
  25:             var busMsg = _bus.Receive(true, null);
  26:             // box the wcf message
  27:             var message = GetWcfMessageFromString(busMsg.Content);
  28:             // initialize the request context and return
  29:             rc = new MessageBusRequestContext(message, this, _localAddress, _bus, busMsg.MessageID);
  30:             return true;
  31:         };
  32:     }
  33:  
  34:     public System.ServiceModel.EndpointAddress LocalAddress
  35:     {
  36:         get
  37:         {
  38:             return _localAddress;
  39:         }
  40:     }
  41:  
  42:     public bool WaitForRequest(TimeSpan timeout)
  43:     {
  44:         return true;
  45:     }
  46:  
  47:     public RequestContext ReceiveRequest()
  48:     {
  49:         return ReceiveRequest(DefaultReceiveTimeout);
  50:     }
  51:  
  52:     public RequestContext ReceiveRequest(TimeSpan timeout)
  53:     {
  54:         ThrowIfDisposedOrNotOpen();
  55:         lock (_aLock)
  56:         {
  57:             // receive the request message from the bus
  58:             var busMsg = _bus.Receive(true, null);
  59:             // box the wcf message
  60:             var message = GetWcfMessageFromString(busMsg.Content);
  61:             // initialize the request context and return
  62:             return new MessageBusRequestContext(message, this, _localAddress, _bus, busMsg.MessageID);
  63:         }
  64:     }
  65:  
  66:     public IAsyncResult BeginTryReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state)
  67:     {
  68:         RequestContext context;
  69:         return _tryReceiveRequestDelegate.BeginInvoke(timeout, out context, callback, state);