1: // ItemDequeuedCallback is called as an item is dequeued from the InputQueue. The
2: // InputQueue lock is not held during the callback. However, the user code is
3: // not notified of the item being available until the callback returns. If you
4: // are not sure if the callback blocks for a long time, then first call
5: // IOThreadScheduler.ScheduleCallback to get to a "safe" thread.
6: delegate void ItemDequeuedCallback();
7:
8: /// <summary>
9: /// Handles asynchronous interactions between producers and consumers.
10: /// Producers can dispatch available data to the input queue,
11: /// where it is dispatched to a waiting consumer or stored until a
12: /// consumer becomes available. Consumers can synchronously or asynchronously
13: /// request data from the queue, which is returned when data becomes
14: /// available.
15: /// </summary>
16: /// <typeparam name="T">The concrete type of the consumer objects that are waiting for data.</typeparam>
17: internal class InputQueue<T> : IDisposable where T : class
18: {
19: //Stores items that are waiting to be accessed.
20: private ItemQueue itemQueue;
21:
22: //Each IQueueReader represents some consumer that is waiting for
23: //items to appear in the queue. The readerQueue stores them
24: //in an ordered list so consumers get serviced in a FIFO manner.
25: private Queue<IQueueReader> readerQueue;
26:
27: //Each IQueueWaiter represents some waiter that is waiting for
28: //items to appear in the queue. When any item appears, all
29: //waiters are signaled.
30: private List<IQueueWaiter> waiterList;
31:
32: private static WaitCallback onInvokeDequeuedCallback;
33: private static WaitCallback onDispatchCallback;
34: private static WaitCallback completeOutstandingReadersCallback;
35: private static WaitCallback completeWaitersFalseCallback;
36: private static WaitCallback completeWaitersTrueCallback;
37:
38: //Represents the current state of the InputQueue.
39: //as it transitions through its lifecycle.
40: QueueState queueState;
41: enum QueueState
42: {
43: Open,
44: Shutdown,
45: Closed
46: }
47:
48: public InputQueue()
49: {
50: this.itemQueue = new ItemQueue();
51: this.readerQueue = new Queue<IQueueReader>();
52: this.waiterList = new List<IQueueWaiter>();
53: this.queueState = QueueState.Open;
54: }
55:
56: public int PendingCount
57: {
58: get
59: {
60: lock (ThisLock)
61: {
62: return itemQueue.ItemCount;
63: }
64: }
65: }
66:
67: object ThisLock
68: {
69: get { return itemQueue; }
70: }
71:
72: public IAsyncResult BeginDequeue(TimeSpan timeout, AsyncCallback callback, object state)
73: {
74: Item item = default(Item);
75:
76: lock (ThisLock)
77: {
78: if (queueState == QueueState.Open)
79: {
80: if (itemQueue.HasAvailableItem)
81: {
82: item = itemQueue.DequeueAvailableItem();
83: }
84: else
85: {
86: AsyncQueueReader reader = new AsyncQueueReader(this, timeout, callback, state);
87: readerQueue.Enqueue(reader);
88: return reader;
89: }
90: }
91: else if (queueState == QueueState.Shutdown)
92: {
93: if (itemQueue.HasAvailableItem)
94: {
95: item = itemQueue.DequeueAvailableItem();
96: }
97: else if (itemQueue.HasAnyItem)
98: {
99: AsyncQueueReader reader = new AsyncQueueReader(this, timeout, callback, state);
100: readerQueue.Enqueue(reader);
101: return reader;
102: }
103: }
104: }
105:
106: InvokeDequeuedCallback(item.DequeuedCallback);
107: return new TypedCompletedAsyncResult<T>(item.GetValue(), callback, state);
108: }
109:
110: public IAsyncResult BeginWaitForItem(TimeSpan timeout, AsyncCallback callback, object state)
111: {
112: lock (ThisLock)
113: {
114: if (queueState == QueueState.Open)
115: {
116: if (!itemQueue.HasAvailableItem)
117: {
118: AsyncQueueWaiter waiter = new AsyncQueueWaiter(timeout, callback, state);
119: waiterList.Add(waiter);
120: return waiter;
121: }
122: }
123: else if (queueState == QueueState.Shutdown)
124: {
125: if (!itemQueue.HasAvailableItem && itemQueue.HasAnyItem)
126: {
127: AsyncQueueWaiter waiter = new AsyncQueueWaiter(timeout, callback, state);
128: waiterList.Add(waiter);
129: return waiter;
130: }
131: }
132: }
133:
134: return new TypedCompletedAsyncResult<bool>(true, callback, state);
135: }
136:
137: static void CompleteOutstandingReadersCallback(object state)
138: {
139: IQueueReader[] outstandingReaders = (IQueueReader[])state;
140:
141: for (int i = 0; i < outstandingReaders.Length; i++)
142: {
143: outstandingReaders[i].Set(default(Item));
144: }
145: }
146:
147: static void CompleteWaitersFalseCallback(object state)
148: {
149: CompleteWaiters(false, (IQueueWaiter[])state);
150: }
151:
152: static void CompleteWaitersTrueCallback(object state)
153: {
154: CompleteWaiters(true, (IQueueWaiter[])state);
155: }
156:
157: static void CompleteWaiters(bool itemAvailable, IQueueWaiter[] waiters)
158: {
159: for (int i = 0; i < waiters.Length; i++)
160: {
161: waiters[i].Set(itemAvailable);
162: }
163: }
164:
165: static void CompleteWaitersLater(bool itemAvailable, IQueueWaiter[] waiters)
166: {
167: if (itemAvailable)
168: {
169: if (completeWaitersTrueCallback == null)
170: completeWaitersTrueCallback = new WaitCallback(CompleteWaitersTrueCallback);
171:
172: ThreadPool.QueueUserWorkItem(completeWaitersTrueCallback, waiters);
173: }
174: else
175: {
176: if (completeWaitersFalseCallback == null)
177: completeWaitersFalseCallback = new WaitCallback(CompleteWaitersFalseCallback);
178:
179: ThreadPool.QueueUserWorkItem(completeWaitersFalseCallback, waiters);
180: }
181: }
182:
183: void GetWaiters(out IQueueWaiter[] waiters)
184: {
185: if (waiterList.Count > 0)
186: {
187: waiters = waiterList.ToArray();
188: waiterList.Clear();
189: }
190: else
191: {
192: waiters = null;
193: }
194: }
195:
196: public void Close()
197: {
198: ((IDisposable)this).Dispose();
199: }
200:
201: public void Shutdown()
202: {
203: IQueueReader[] outstandingReaders = null;
204: lock (ThisLock)
205: {
206: if (queueState == QueueState.Shutdown)
207: return;
208:
209: if (queueState == QueueState.Closed)
210: return;
211:
212: this.queueState = QueueState.Shutdown;
213:
214: if (readerQueue.Count > 0 && this.itemQueue.ItemCount == 0)
215: {
216: outstandingReaders = new IQueueReader[readerQueue.Count];
217: readerQueue.CopyTo(outstandingReaders, 0);
218: readerQueue.Clear();
219: }
220: }
221:
222: if (outstandingReaders != null)
223: {
224: for (int i = 0; i < outstandingReaders.Length; i++)
225: {
226: outstandingReaders[i].Set(new Item((Exception)null, null));
227: }
228: }
229: }
230:
231: public T Dequeue(TimeSpan timeout)
232: {
233: T value;
234:
235: if (!this.Dequeue(timeout, out value))
236: {
237: throw new TimeoutException(string.Format("Dequeue timed out in {0}.", timeout));
238: }
239:
240: return value;
241: }
242:
243: public bool Dequeue(TimeSpan timeout, out T value)
244: {
245: WaitQueueReader reader = null;
246: Item item = new Item();
247:
248: lock (ThisLock)
249: {
250: if (queueState == QueueState.Open)
251: {
252: if (itemQueue.HasAvailableItem)
253: {
254: item = itemQueue.DequeueAvailableItem();
255: }
256: else
257: {
258: reader = new WaitQueueReader(this);
259: readerQueue.Enqueue(reader);
260: }
261: }
262: else if (queueState == QueueState.Shutdown)
263: {
264: if (itemQueue.HasAvailableItem)
265: {
266: item = itemQueue.DequeueAvailableItem();
267: }
268: else if (itemQueue.HasAnyItem)
269: {
270: reader = new WaitQueueReader(this);
271: readerQueue.Enqueue(reader);
272: }
273: else
274: {
275: value = default(T);
276: return true;
277: }
278: }
279: else // queueState == QueueState.Closed
280: {
281: value = default(T);
282: return true;
283: }
284: }
285:
286: if (reader != null)
287: {
288: return reader.Wait(timeout, out value);
289: }
290: else
291: {
292: InvokeDequeuedCallback(item.DequeuedCallback);
293: value = item.GetValue();
294: return true;
295: }
296: }
297:
298: public void Dispose()
299: {
300: Dispose(true);
301:
302: GC.SuppressFinalize(this);
303: }
304:
305: protected void Dispose(bool disposing)
306: {
307: if (disposing)
308: {
309: bool dispose = false;
310:
311: lock (ThisLock)
312: {
313: if (queueState != QueueState.Closed)
314: {
315: queueState = QueueState.Closed;
316: dispose = true;
317: }
318: }
319:
320: if (dispose)
321: {
322: while (readerQueue.Count > 0)
323: {
324: IQueueReader reader = readerQueue.Dequeue();
325: reader.Set(default(Item));
326: }
327:
328: while (itemQueue.HasAnyItem)
329: {
330: Item item = itemQueue.DequeueAnyItem();
331: item.Dispose();
332: InvokeDequeuedCallback(item.DequeuedCallback);
333: }
334: }
335: }
336: }
337:
338: public void Dispatch()
339: {
340: IQueueReader reader = null;
341: Item item = new Item();
342: IQueueReader[] outstandingReaders = null;
343: IQueueWaiter[] waiters = null;
344: bool itemAvailable = true;
345:
346: lock (ThisLock)
347: {
348: itemAvailable = !((queueState == QueueState.Closed) || (queueState == QueueState.Shutdown));
349: this.GetWaiters(out waiters);
350:
351: if (queueState != QueueState.Closed)
352: {
353: itemQueue.MakePendingItemAvailable();
354:
355: if (readerQueue.Count > 0)
356: {
357: item = itemQueue.DequeueAvailableItem();
358: reader = readerQueue.Dequeue();
359:
360: if (queueState == QueueState.Shutdown && readerQueue.Count > 0 && itemQueue.ItemCount == 0)
361: {
362: outstandingReaders = new IQueueReader[readerQueue.Count];
363: readerQueue.CopyTo(outstandingReaders, 0);
364: readerQueue.Clear();
365:
366: itemAvailable = false;
367: }
368: }
369: }
370: }
371:
372: if (outstandingReaders != null)
373: {
374: if (completeOutstandingReadersCallback == null)
375: completeOutstandingReadersCallback = new WaitCallback(CompleteOutstandingReadersCallback);
376:
377: ThreadPool.QueueUserWorkItem(completeOutstandingReadersCallback, outstandingReaders);
378: }
379:
380: if (waiters != null)
381: {
382: CompleteWaitersLater(itemAvailable, waiters);
383: }
384:
385: if (reader != null)
386: {
387: InvokeDequeuedCallback(item.DequeuedCallback);
388: reader.Set(item);
389: }
390: }
391:
392: //Ends an asynchronous Dequeue operation.
393: public T EndDequeue(IAsyncResult result)
394: {
395: T value;
396:
397: if (!this.EndDequeue(result, out value))
398: {
399: throw new TimeoutException("Asynchronous Dequeue operation timed out.");
400: }
401:
402: return value;
403: }
404:
405: public bool EndDequeue(IAsyncResult result, out T value)
406: {
407: TypedCompletedAsyncResult<T> typedResult = result as TypedCompletedAsyncResult<T>;
408:
409: if (typedResult != null)
410: {
411: value = TypedCompletedAsyncResult<T>.End(result);
412: return true;
413: }
414:
415: return AsyncQueueReader.End(result, out value);
416: }
417:
418: public bool EndWaitForItem(IAsyncResult result)
419: {
420: TypedCompletedAsyncResult<bool> typedResult = result as TypedCompletedAsyncResult<bool>;
421: if (typedResult != null)
422: {
423: return TypedCompletedAsyncResult<bool>.End(result);
424: }
425:
426: return AsyncQueueWaiter.End(result);
427: }
428:
429: public void EnqueueAndDispatch(T item)
430: {
431: EnqueueAndDispatch(item, null);
432: }
433:
434: public void EnqueueAndDispatch(T item, ItemDequeuedCallback dequeuedCallback)
435: {
436: EnqueueAndDispatch(item, dequeuedCallback, true);
437: }
438:
439: public void EnqueueAndDispatch(Exception exception, ItemDequeuedCallback dequeuedCallback, bool canDispatchOnThisThread)
440: {
441: Debug.Assert(exception != null, "exception parameter should not be null");
442: EnqueueAndDispatch(new Item(exception, dequeuedCallback), canDispatchOnThisThread);
443: }
444:
445: public void EnqueueAndDispatch(T item, ItemDequeuedCallback dequeuedCallback, bool canDispatchOnThisThread)
446: {
447: Debug.Assert(item != null, "item parameter should not be null");
448: EnqueueAndDispatch(new Item(item, dequeuedCallback), canDispatchOnThisThread);
449: }
450:
451: void EnqueueAndDispatch(Item item, bool canDispatchOnThisThread)
452: {
453: bool disposeItem = false;
454: IQueueReader reader = null;
455: bool dispatchLater = false;
456: IQueueWaiter[] waiters = null;
457: bool itemAvailable = true;
458:
459: lock (ThisLock)
460: {
461: itemAvailable = !((queueState == QueueState.Closed) || (queueState == QueueState.Shutdown));
462: this.GetWaiters(out waiters);
463:
464: if (queueState == QueueState.Open)
465: {
466: if (canDispatchOnThisThread)
467: {
468: if (readerQueue.Count == 0)
469: {
470: itemQueue.EnqueueAvailableItem(item);
471: }
472: else
473: {
474: reader = readerQueue.Dequeue();
475: }
476: }
477: else
478: {
479: if (readerQueue.Count == 0)
480: {
481: itemQueue.EnqueueAvailableItem(item);
482: }
483: else
484: {
485: itemQueue.EnqueuePendingItem(item);
486: dispatchLater = true;
487: }
488: }
489: }
490: else // queueState == QueueState.Closed || queueState == QueueState.Shutdown
491: {
492: disposeItem = true;
493: }
494: }
495:
496: if (waiters != null)
497: {
498: if (canDispatchOnThisThread)
499: {
500: CompleteWaiters(itemAvailable, waiters);
501: }
502: else
503: {
504: CompleteWaitersLater(itemAvailable, waiters);
505: }
506: }
507:
508: if (reader != null)
509: {
510: InvokeDequeuedCallback(item.DequeuedCallback);
511: reader.Set(item);
512: }
513:
514: if (dispatchLater)
515: {
516: if (onDispatchCallback == null)
517: {
518: onDispatchCallback = new WaitCallback(OnDispatchCallback);
519: }
520:
521: ThreadPool.QueueUserWorkItem(onDispatchCallback, this);
522: }
523: else if (disposeItem)
524: {
525: InvokeDequeuedCallback(item.DequeuedCallback);
526: item.Dispose();
527: }
528: }
529:
530: public bool EnqueueWithoutDispatch(T item, ItemDequeuedCallback dequeuedCallback)
531: {
532: Debug.Assert(item != null, "EnqueueWithoutDispatch: item parameter should not be null");
533: return EnqueueWithoutDispatch(new Item(item, dequeuedCallback));
534: }
535:
536: public bool EnqueueWithoutDispatch(Exception exception, ItemDequeuedCallback dequeuedCallback)
537: {
538: Debug.Assert(exception != null, "EnqueueWithoutDispatch: exception parameter should not be null");
539: return EnqueueWithoutDispatch(new Item(exception, dequeuedCallback));
540: }
541:
542: // This does not block, however, Dispatch() must be called later if this function
543: // returns true.
544: bool EnqueueWithoutDispatch(Item item)
545: {
546: lock (ThisLock)
547: {
548: // Open
549: if (queueState != QueueState.Closed && queueState != QueueState.Shutdown)
550: {
551: if (readerQueue.Count == 0)
552: {
553: itemQueue.EnqueueAvailableItem(item);
554: return false;
555: }
556: else
557: {
558: itemQueue.EnqueuePendingItem(item);
559: return true;
560: }
561: }
562: }
563:
564: item.Dispose();
565: InvokeDequeuedCallbackLater(item.DequeuedCallback);
566: return false;
567: }
568:
569: static void OnDispatchCallback(object state)
570: {
571: ((InputQueue<T>)state).Dispatch();
572: }
573:
574: static void InvokeDequeuedCallbackLater(ItemDequeuedCallback dequeuedCallback)
575: {
576: if (dequeuedCallback != null)
577: {
578: if (onInvokeDequeuedCallback == null)
579: {
580: onInvokeDequeuedCallback = OnInvokeDequeuedCallback;
581: }
582:
583: ThreadPool.QueueUserWorkItem(onInvokeDequeuedCallback, dequeuedCallback);
584: }
585: }
586:
587: static void InvokeDequeuedCallback(ItemDequeuedCallback dequeuedCallback)
588: {
589: if (dequeuedCallback != null)
590: {
591: dequeuedCallback();
592: }
593: }
594:
595: static void OnInvokeDequeuedCallback(object state)
596: {
597: ItemDequeuedCallback dequeuedCallback = (ItemDequeuedCallback)state;
598: dequeuedCallback();
599: }
600:
601: bool RemoveReader(IQueueReader reader)
602: {
603: lock (ThisLock)
604: {
605: if (queueState == QueueState.Open || queueState == QueueState.Shutdown)
606: {
607: bool removed = false;
608:
609: for (int i = readerQueue.Count; i > 0; i--)
610: {
611: IQueueReader temp = readerQueue.Dequeue();
612: if (Object.ReferenceEquals(temp, reader))
613: {
614: removed = true;
615: }
616: else
617: {
618: readerQueue.Enqueue(temp);
619: }
620: }
621:
622: return removed;
623: }
624: }
625:
626: return false;
627: }
628:
629: public bool WaitForItem(TimeSpan timeout)
630: {
631: WaitQueueWaiter waiter = null;
632: bool itemAvailable = false;
633:
634: lock (ThisLock)
635: {
636: if (queueState == QueueState.Open)
637: {
638: if (itemQueue.HasAvailableItem)
639: {
640: itemAvailable = true;
641: }
642: else
643: {
644: waiter = new WaitQueueWaiter();
645: waiterList.Add(waiter);
646: }
647: }
648: else if (queueState == QueueState.Shutdown)
649: {
650: if (itemQueue.HasAvailableItem)
651: {
652: itemAvailable = true;
653: }
654: else if (itemQueue.HasAnyItem)
655: {
656: waiter = new WaitQueueWaiter();
657: waiterList.Add(waiter);
658: }
659: else
660: {
661: return false;
662: }
663: }
664: else // queueState == QueueState.Closed
665: {
666: return true;
667: }
668: }
669:
670: if (waiter != null)
671: {
672: return waiter.Wait(timeout);
673: }
674: else
675: {
676: return itemAvailable;
677: }
678: }
679:
680: interface IQueueReader
681: {
682: void Set(Item item);
683: }
684:
685: interface IQueueWaiter
686: {
687: void Set(bool itemAvailable);
688: }
689:
690: class WaitQueueReader : IQueueReader
691: {
692: Exception exception;
693: InputQueue<T> inputQueue;
694: T item;
695: ManualResetEvent waitEvent;
696: object thisLock = new object();
697:
698: public WaitQueueReader(InputQueue<T> inputQueue)
699: {
700: this.inputQueue = inputQueue;
701: waitEvent = new ManualResetEvent(false);
702: }
703:
704: object ThisLock
705: {
706: get
707: {
708: return this.thisLock;
709: }
710: }
711:
712: public void Set(Item item)
713: {
714: lock (ThisLock)
715: {
716: Debug.Assert(this.item == null, "InputQueue.WaitQueueReader.Set: (this.item == null)");
717: Debug.Assert(this.exception == null, "InputQueue.WaitQueueReader.Set: (this.exception == null)");
718:
719: this.exception = item.Exception;
720: this.item = item.Value;
721: waitEvent.Set();
722: }
723: }
724:
725: public bool Wait(TimeSpan timeout, out T value)
726: {
727: bool isSafeToClose = false;
728: try
729: {
730: if (timeout == TimeSpan.MaxValue)
731: {
732: waitEvent.WaitOne();
733: }
734: else if (!waitEvent.WaitOne(timeout, false))
735: {
736: if (this.inputQueue.RemoveReader(this))
737: {
738: value = default(T);
739: isSafeToClose = true;
740: return false;
741: }
742: else
743: {
744: waitEvent.WaitOne();
745: }
746: }
747:
748: isSafeToClose = true;
749: }
750: finally
751: {
752: if (isSafeToClose)
753: {
754: waitEvent.Close();
755: }
756: }
757:
758: value = item;
759: return true;
760: }
761: }
762:
763: class AsyncQueueReader : AsyncResult, IQueueReader
764: {
765: static TimerCallback timerCallback = new TimerCallback(AsyncQueueReader.TimerCallback);
766:
767: bool expired;
768: InputQueue<T> inputQueue;
769: T item;
770: Timer timer;
771:
772: public AsyncQueueReader(InputQueue<T> inputQueue, TimeSpan timeout, AsyncCallback callback, object state)
773: : base(callback, state)
774: {
775: this.inputQueue = inputQueue;
776: if (timeout != TimeSpan.MaxValue)
777: {
778: this.timer = new Timer(timerCallback, this, timeout, TimeSpan.FromMilliseconds(-1));
779: }
780: }
781:
782: public static bool End(IAsyncResult result, out T value)
783: {
784: AsyncQueueReader readerResult = AsyncResult.End<AsyncQueueReader>(result);
785:
786: if (readerResult.expired)
787: {
788: value = default(T);
789: return false;
790: }
791: else
792: {
793: value = readerResult.item;
794: return true;
795: }
796: }
797:
798: static void TimerCallback(object state)
799: {
800: AsyncQueueReader thisPtr = (AsyncQueueReader)state;
801: if (thisPtr.inputQueue.RemoveReader(thisPtr))
802: {
803: thisPtr.expired = true;
804: thisPtr.Complete(false);
805: }
806: }
807:
808: public void Set(Item item)
809: {
810: this.item = item.Value;
811: if (this.timer != null)
812: {
813: this.timer.Change(-1, -1);
814: }
815: Complete(false, item.Exception);
816: }
817: }
818:
819: struct Item
820: {
821: T value;
822: Exception exception;
823: ItemDequeuedCallback dequeuedCallback;
824:
825: public Item(T value, ItemDequeuedCallback dequeuedCallback)
826: : this(value, null, dequeuedCallback)
827: {
828: }
829:
830: public Item(Exception exception, ItemDequeuedCallback dequeuedCallback)
831: : this(null, exception, dequeuedCallback)
832: {
833: }
834:
835: Item(T value, Exception exception, ItemDequeuedCallback dequeuedCallback)
836: {
837: this.value = value;
838: this.exception = exception;
839: this.dequeuedCallback = dequeuedCallback;
840: }
841:
842: public Exception Exception
843: {
844: get { return this.exception; }
845: }
846:
847: public T Value
848: {
849: get { return value; }
850: }
851:
852: public ItemDequeuedCallback DequeuedCallback
853: {
854: get { return dequeuedCallback; }
855: }
856:
857: public void Dispose()
858: {
859: if (value != null)
860: {
861: if (value is IDisposable)
862: {
863: ((IDisposable)value).Dispose();
864: }
865: else if (value is ICommunicationObject)
866: {
867: ((ICommunicationObject)value).Abort();
868: }
869: }
870: }
871:
872: public T GetValue()
873: {
874: if (this.exception != null)
875: {
876: throw this.exception;
877: }
878:
879: return this.value;
880: }
881: }
882:
883: class WaitQueueWaiter : IQueueWaiter
884: {
885: bool itemAvailable;
886: ManualResetEvent waitEvent;
887: object thisLock = new object();
888:
889: public WaitQueueWaiter()
890: {
891: waitEvent = new ManualResetEvent(false);
892: }
893:
894: object ThisLock
895: {
896: get
897: {
898: return this.thisLock;
899: }
900: }
901:
902: public void Set(bool itemAvailable)
903: {
904: lock (ThisLock)
905: {
906: this.itemAvailable = itemAvailable;
907: waitEvent.Set();
908: }
909: }
910:
911: public bool Wait(TimeSpan timeout)
912: {
913: if (timeout == TimeSpan.MaxValue)
914: {
915: waitEvent.WaitOne();
916: }
917: else if (!waitEvent.WaitOne(timeout, false))
918: {
919: return false;
920: }
921:
922: return this.itemAvailable;
923: }
924: }
925:
926: class AsyncQueueWaiter : AsyncResult, IQueueWaiter
927: {
928: static TimerCallback timerCallback = new TimerCallback(AsyncQueueWaiter.TimerCallback);
929: Timer timer;
930: bool itemAvailable;
931: object thisLock = new object();
932:
933: public AsyncQueueWaiter(TimeSpan timeout, AsyncCallback callback, object state)
934: : base(callback, state)
935: {
936: if (timeout != TimeSpan.MaxValue)
937: {
938: this.timer = new Timer(timerCallback, this, timeout, TimeSpan.FromMilliseconds(-1));
939: }
940: }
941:
942: object ThisLock
943: {
944: get
945: {
946: return this.thisLock;
947: }
948: }
949:
950: public static bool End(IAsyncResult result)
951: {
952: AsyncQueueWaiter waiterResult = AsyncResult.End<AsyncQueueWaiter>(result);
953: return waiterResult.itemAvailable;
954: }
955:
956: static void TimerCallback(object state)
957: {
958: AsyncQueueWaiter thisPtr = (AsyncQueueWaiter)state;
959: thisPtr.Complete(false);
960: }
961:
962: public void Set(bool itemAvailable)
963: {
964: bool timely;
965:
966: lock (ThisLock)
967: {
968: timely = (this.timer == null) || this.timer.Change(-1, -1);
969: this.itemAvailable = itemAvailable;
970: }
971:
972: if (timely)
973: {
974: Complete(false);
975: }
976: }
977: }
978:
979: class ItemQueue
980: {
981: Item[] items;
982: int head;
983: int pendingCount;
984: int totalCount;
985:
986: public ItemQueue()
987: {
988: items = new Item[1];
989: }
990:
991: public Item DequeueAvailableItem()
992: {
993: if (totalCount == pendingCount)
994: {
995: Debug.Assert(false, "ItemQueue does not contain any available items");
996: throw new Exception("Internal Error");
997: }
998: return DequeueItemCore();
999: }
1000:
1001: public Item DequeueAnyItem()
1002: {
1003: if (pendingCount == totalCount)
1004: pendingCount--;
1005: return DequeueItemCore();
1006: }
1007:
1008: void EnqueueItemCore(Item item)
1009: {
1010: if (totalCount == items.Length)
1011: {
1012: Item[] newItems = new Item[items.Length * 2];
1013: for (int i = 0; i < totalCount; i++)
1014: newItems[i] = items[(head + i) % items.Length];
1015: head = 0;
1016: items = newItems;
1017: }
1018: int tail = (head + totalCount) % items.Length;
1019: items[tail] = item;
1020: totalCount++;
1021: }
1022:
1023: Item DequeueItemCore()
1024: {
1025: if (totalCount == 0)
1026: {
1027: Debug.Assert(false, "ItemQueue does not contain any items");
1028: throw new Exception("Internal Error");
1029: }
1030: Item item = items[head];
1031: items[head] = new Item();
1032: totalCount--;
1033: head = (head + 1) % items.Length;
1034: return item;
1035: }
1036:
1037: public void EnqueuePendingItem(Item item)
1038: {
1039: EnqueueItemCore(item);
1040: pendingCount++;
1041: }
1042:
1043: public void EnqueueAvailableItem(Item item)
1044: {
1045: EnqueueItemCore(item);
1046: }
1047:
1048: public void MakePendingItemAvailable()
1049: {
1050: if (pendingCount == 0)
1051: {
1052: Debug.Assert(false, "ItemQueue does not contain any pending items");
1053: throw new Exception("Internal Error");
1054: }
1055: pendingCount--;
1056: }
1057:
1058: public bool HasAvailableItem
1059: {
1060: get { return totalCount > pendingCount; }
1061: }
1062:
1063: public bool HasAnyItem
1064: {
1065: get { return totalCount > 0; }
1066: }
1067:
1068: public int ItemCount
1069: {
1070: get { return totalCount; }
1071: }
1072: }
1073: }