/// <summary>
/// Base class for channel listeners of the message bus transport.
/// Keeps at most one "current" channel at a time: a channel is created on demand when a
/// caller asks to accept one, handed out through an internal queue, and replaced by a
/// fresh one only after it raises <c>Closed</c>.
/// </summary>
/// <typeparam name="TChannel">The channel shape produced by this listener.</typeparam>
public abstract class MessageBusChannelListenerBase<TChannel> : ChannelListenerBase<TChannel> where TChannel : class, IChannel
{
    private readonly MessageBusTransportBindingElement _transportElement;

    private readonly BufferManager _bufferManager;
    private readonly MessageEncoderFactory _encoder;
    private readonly string _scheme;
    private readonly Uri _uri;

    private readonly IBus _bus;

    private readonly InputQueue<TChannel> _channelQueue;
    private readonly object _currentChannelLock;

    // Delegates over the synchronous OnOpen/OnClose, used to expose them through the
    // Begin/End (APM) overrides without needing a custom IAsyncResult implementation.
    private readonly Action<TimeSpan> _openAction;
    private readonly Action<TimeSpan> _closeAction;

    // The channel most recently created by EnsureChannelAvailable; reset to null when it closes.
    private TChannel _currentChannel;

    /// <summary>Gets the bus taken from the transport binding element.</summary>
    public IBus Bus
    {
        get
        {
            return _bus;
        }
    }

    /// <summary>Gets the listen URI (base address combined with the relative address).</summary>
    public override Uri Uri
    {
        get
        {
            return _uri;
        }
    }

    /// <summary>Gets the URI scheme reported by the transport binding element.</summary>
    public string Scheme
    {
        get
        {
            return _scheme;
        }
    }

    /// <summary>Gets the transport binding element this listener was created from.</summary>
    protected MessageBusTransportBindingElement TransportElement
    {
        get
        {
            return _transportElement;
        }
    }

    /// <summary>
    /// Initializes the listener from the transport binding element and the binding context.
    /// </summary>
    /// <param name="transportElement">Supplies the buffer pool size, scheme and bus.</param>
    /// <param name="context">Supplies the binding (timeouts, encoder element) and the listen address.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="transportElement"/> or <paramref name="context"/> is null.
    /// </exception>
    protected MessageBusChannelListenerBase(MessageBusTransportBindingElement transportElement, BindingContext context)
        : base(GetDefaultTimeouts(context))
    {
        if (transportElement == null)
        {
            throw new ArgumentNullException("transportElement");
        }

        _transportElement = transportElement;
        _bufferManager = BufferManager.CreateBufferManager(transportElement.MaxBufferPoolSize, int.MaxValue);

        // Use the encoder configured on the binding; fall back to the text encoder when none is present.
        MessageEncodingBindingElement encodingElement = context.Binding.Elements.Find<MessageEncodingBindingElement>();
        _encoder = (encodingElement ?? new TextMessageEncodingBindingElement()).CreateMessageEncoderFactory();

        _scheme = transportElement.Scheme;
        _uri = new Uri(context.ListenUriBaseAddress, context.ListenUriRelativeAddress);

        _bus = transportElement.Bus;

        _channelQueue = new InputQueue<TChannel>();
        _currentChannelLock = new object();
        _currentChannel = null;

        // Creating the delegates does not invoke the methods; they are only run by OnBeginOpen/OnBeginClose.
        _openAction = OnOpen;
        _closeAction = OnClose;
    }

    /// <summary>
    /// Validates <paramref name="context"/> before it is dereferenced for the base constructor call.
    /// </summary>
    private static IDefaultCommunicationTimeouts GetDefaultTimeouts(BindingContext context)
    {
        if (context == null)
        {
            throw new ArgumentNullException("context");
        }

        return context.Binding;
    }

    /// <summary>Closes the channel queue; failures are ignored.</summary>
    protected override void OnAbort()
    {
        CloseChannelQueue();
    }

    /// <summary>Closes the channel queue; failures are ignored.</summary>
    /// <param name="timeout">Not used by this implementation.</param>
    protected override void OnClose(TimeSpan timeout)
    {
        CloseChannelQueue();
    }

    /// <summary>
    /// Shared shutdown step for <see cref="OnAbort"/> and <see cref="OnClose"/>: closes the
    /// channel queue under <c>ThisLock</c>.
    /// </summary>
    private void CloseChannelQueue()
    {
        try
        {
            lock (ThisLock)
            {
                _channelQueue.Close();
            }
        }
        catch
        {
            // Deliberately best-effort: shutting down must not surface queue errors to the caller.
        }
    }

    /// <summary>
    /// Runs the base notification, then releases the buffer pool and disposes the bus.
    /// Each cleanup step is guarded separately so a failure in one does not skip the other.
    /// </summary>
    protected override void OnClosed()
    {
        base.OnClosed();

        try
        {
            _bufferManager.Clear();
        }
        catch
        {
            // Best-effort cleanup; continue so the bus is still disposed.
        }

        try
        {
            if (_bus != null)
            {
                _bus.Dispose();
            }
        }
        catch
        {
            // Best-effort cleanup; nothing further to release.
        }
    }

    /// <summary>
    /// Creates the current channel if there is none and enqueues it for acceptance.
    /// A channel that already exists is not enqueued again.
    /// </summary>
    private void EnsureChannelAvailable()
    {
        if (_currentChannel != null)
        {
            return;
        }

        TChannel createdChannel = null;

        lock (_currentChannelLock)
        {
            // Re-check under the lock: another thread may have created the channel in the meantime.
            if (_currentChannel == null)
            {
                createdChannel = CreateChannel(_bufferManager, _encoder, new EndpointAddress(_uri), this, _bus);
                createdChannel.Closed += new EventHandler(OnChannelClosed);
                _currentChannel = createdChannel;
            }
        }

        // Enqueue outside the lock, and only when this call actually created the channel.
        if (createdChannel != null)
        {
            _channelQueue.EnqueueAndDispatch(createdChannel);
        }
    }

    /// <summary>
    /// Creates a channel of the concrete shape for this listener.
    /// </summary>
    /// <param name="bufferManager">The listener's buffer manager.</param>
    /// <param name="encoder">The message encoder factory selected in the constructor.</param>
    /// <param name="localAddress">Endpoint address built from the listen URI.</param>
    /// <param name="parent">The listener creating the channel.</param>
    /// <param name="bus">The bus taken from the transport binding element.</param>
    /// <returns>The new channel; <see cref="EnsureChannelAvailable"/> dereferences it, so it must not be null.</returns>
    protected abstract TChannel CreateChannel(
        BufferManager bufferManager, MessageEncoderFactory encoder, EndpointAddress localAddress,
        MessageBusChannelListenerBase<TChannel> parent,
        IBus bus);

    /// <summary>
    /// Clears the current-channel slot when the channel that raised <c>Closed</c> is the current one,
    /// so that the next accept call creates a new channel.
    /// </summary>
    private void OnChannelClosed(object sender, EventArgs e)
    {
        var closedChannel = sender as TChannel;
        lock (_currentChannelLock)
        {
            if (closedChannel == _currentChannel)
            {
                _currentChannel = null;
            }
        }
    }

    /// <summary>
    /// Makes sure a channel is available (unless the listener is disposed) and dequeues the next channel.
    /// </summary>
    /// <param name="timeout">How long to wait for a channel to be dequeued.</param>
    /// <returns>The dequeued channel.</returns>
    /// <exception cref="TimeoutException">The queue reported that no channel was dequeued in time.</exception>
    protected override TChannel OnAcceptChannel(TimeSpan timeout)
    {
        if (!IsDisposed)
        {
            EnsureChannelAvailable();
        }

        TChannel channel;
        if (!_channelQueue.Dequeue(timeout, out channel))
        {
            throw new TimeoutException();
        }

        return channel;
    }

    /// <summary>
    /// Asynchronous counterpart of <see cref="OnAcceptChannel"/>: makes sure a channel is available
    /// (unless the listener is disposed) and starts an asynchronous dequeue.
    /// </summary>
    protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state)
    {
        if (!IsDisposed)
        {
            EnsureChannelAvailable();
        }

        return _channelQueue.BeginDequeue(timeout, callback, state);
    }

    /// <summary>Completes an asynchronous accept started by <see cref="OnBeginAcceptChannel"/>.</summary>
    /// <returns>The dequeued channel.</returns>
    /// <exception cref="TimeoutException">The queue reported that no channel was dequeued in time.</exception>
    protected override TChannel OnEndAcceptChannel(IAsyncResult result)
    {
        TChannel channel;
        if (!_channelQueue.EndDequeue(result, out channel))
        {
            throw new TimeoutException();
        }

        return channel;
    }

    /// <summary>Nothing needs to be done to open this listener.</summary>
    protected override void OnOpen(TimeSpan timeout)
    {
    }

    /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
    protected override IAsyncResult OnBeginWaitForChannel(TimeSpan timeout, AsyncCallback callback, object state)
    {
        throw new NotImplementedException();
    }

    /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
    protected override bool OnEndWaitForChannel(IAsyncResult result)
    {
        throw new NotImplementedException();
    }

    /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
    protected override bool OnWaitForChannel(TimeSpan timeout)
    {
        throw new NotImplementedException();
    }

    /// <summary>
    /// Starts an asynchronous close by running <see cref="OnClose"/> through a delegate.
    /// Relies on delegate <c>BeginInvoke</c>, which is available on .NET Framework.
    /// </summary>
    protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
    {
        return _closeAction.BeginInvoke(timeout, callback, state);
    }

    /// <summary>
    /// Starts an asynchronous open by running <see cref="OnOpen"/> through a delegate.
    /// Relies on delegate <c>BeginInvoke</c>, which is available on .NET Framework.
    /// </summary>
    protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
    {
        return _openAction.BeginInvoke(timeout, callback, state);
    }

    /// <summary>Completes the asynchronous close started by <see cref="OnBeginClose"/>.</summary>
    protected override void OnEndClose(IAsyncResult result)
    {
        _closeAction.EndInvoke(result);
    }

    /// <summary>Completes the asynchronous open started by <see cref="OnBeginOpen"/>.</summary>
    protected override void OnEndOpen(IAsyncResult result)
    {
        _openAction.EndInvoke(result);
    }
}