Intraprocess Message Queue
Tuesday, December 8, 2009 at 9:14PM Applications whose services are layered/modular are often composed of a set of collaborating tasks within a process. To simplify interfaces and design, minimize maintenance costs, and maximize reuse, these tasks should have the following properties.
- separate task objecs should have minimal dependencies on each other's data and methods
- the methods and data in a task should focus related set of functionality
To accomplish these properties, tasks often communicate by passing messages via generic method such as send() or put(), instead of calling methods directly. Messages can represent work requests, work results or other type of data to process.
When producer and consumer tasks are located in the same process, tasks often exchange messages using an intraprocess message queue. In this scenario the producer inserts messages into the message queue which is serviced by consumer tasks that remove and process the messages. In this article I develop an intraprocess message queue that facilitates this type of message passing communication between services. The sample application inserts messages at different intervals and the messages are processed on a different thread.
The ReactiveQueue class creates a thread for consumption of messages on the queue. This thread goes to sleep when the queue is empty. When a message is inserted using the Equeue method the consumer thread wakes up, removes and process all the messages from the queue. While the messages are being processed the producer is free to continue inserting messages. Once the consumer thread finishes processing messages it will check if the queue is empty, if the queue is empty it will go to sleep, if the queue is not empty it will acquire the synchronization lock, remove and process all messages. The code that illustrates this process in contained in the OnEnqueue method (Line# 113).
Here is the applications output. (Click on the image for large view)
Download the code source code.
1: using System.Collections.Generic;
2: using System.Threading;
3: using System.Linq;
4: using System;
5:
6: namespace Bo.Utilities
7: {
8: /// <summary>
9: /// Delegate DequeueEventHandler.
10: /// </summary>
11: /// <typeparam name="T"></typeparam>
12: /// <param name="item"></param>
13: public delegate void DequeueEventHandler<T>(T item);
14:
15: /// <summary>
16: /// A thread safe queue.
17: /// </summary>
18: /// <typeparam name="T"></typeparam>
19: public class ReactiveQueue<T> : IDisposable
20: {
21: private Queue<T> _queue;
22: private bool _closed;
23:
24: /// <summary>
25: /// Gets or sets a value indicating whether this <see
26: /// cref="ReactiveQueue<T>"/> is closed.
27: /// </summary>
28: /// <value><c>true</c> if closed; otherwise, <c>false</c>.</value>
29: public bool Closed
30: {
31: get
32: {
33: lock (_closeLock)
34: {
35: return _closed;
36: }
37: }
38: set
39: {
40: lock (_closeLock)
41: {
42: _closed = value;
43: }
44: }
45: }
46:
47: /// <summary>
48: /// Gets or sets the queue.
49: /// </summary>
50: /// <value>The queue.</value>
51: internal Queue<T> Queue
52: {
53: get { return _queue; }
54: set { _queue = value; }
55: }
56:
57: private object _syncLock = new object();
58: private object _closeLock = new object();
59: /// <summary>
60: /// Gets or sets the sync lock.
61: /// </summary>
62: /// <value>The sync lock.</value>
63: internal object SyncLock
64: {
65: get { return _syncLock; }
66: set { _syncLock = value; }
67: }
68:
69: /// <summary>
70: /// Initializes a new instance of the ReactiveQueue class.
71: /// </summary>
72: public ReactiveQueue()
73: {
74: _queue = new Queue<T>();
75: ThreadPool.QueueUserWorkItem(OnEnqueue);
76: }
77:
78: /// <summary>
79: /// Initializes a new instance of the ReactiveQueue class with the given
80: /// capacity.
81: /// </summary>
82: /// <param name="capacity">The capacity of the queue.</param>
83: public ReactiveQueue(int capacity)
84: : this()
85: {
86: _queue = new Queue<T>(capacity);
87: }
88:
89: /// <summary>
90: /// Enqueues the specified item.
91: /// </summary>
92: /// <param name="item">The item.</param>
93: public void Enqueue(T item)
94: {
95: if (!Closed)
96: {
97: lock (_syncLock)
98: {
99: _queue.Enqueue(item);
100: Monitor.PulseAll(_syncLock);
101: }
102: }
103: }
104:
105: /// <summary>
106: /// Occurs when [dequeue handler].
107: /// </summary>
108: public event DequeueEventHandler<T> DequeueHandler;
109:
110: /// <summary>
111: /// Occurs when Enqueue is called.
112: /// </summary>
113: protected void OnEnqueue(object obj)
114: {
115: Thread.CurrentThread.Name = "ReactiveQueue";
116: while (!Closed)
117: {
118: List<T> items = null;
119: lock (_syncLock)
120: {
121: if (_queue.Count == 0)
122: {
123: Monitor.Wait(_syncLock);
124: }
125:
126: if (DequeueHandler != null && _queue.Count > 0)
127: {
128: items = _queue.ToList();
129: _queue.Clear();
130: }
131: }
132:
133: if (DequeueHandler != null && items != null && items.Count > 0)
134: {
135: items.ForEach(ActionHandler);
136: }
137: }
138: }
139:
140: private void ActionHandler(T item)
141: {
142: DequeueHandler(item);
143: }
144: /// <summary>
145: /// Closes this instance.
146: /// </summary>
147: public void Close()
148: {
149: Dispose();
150: }
151:
152: #region IDisposable Members
153:
154: /// <summary>
155: /// Releases unmanaged and - optionally - managed resources
156: /// </summary>
157: /// <param name="disposing"><c>true</c> to release both managed and
158: /// unmanaged resources; <c>false</c> to release only unmanaged
159: /// resources.</param>
160: private void Dispose(bool disposing)
161: {
162: if (!_disposed)
163: {
164: if (disposing)
165: {
166: Closed = true;
167: lock (_syncLock)
168: {
169: Monitor.PulseAll(_syncLock);
170: }
171: }
172: }
173: _disposed = true;
174: }
175:
176: private bool _disposed = false;
177: /// <summary>
178: /// Performs application-defined tasks associated with freeing,
179: /// releasing, or resetting unmanaged resources.
180: /// </summary>
181: public void Dispose()
182: {
183: Dispose(true);
184: GC.SuppressFinalize(this);
185: }
186:
187: /// <summary>
188: /// Releases unmanaged resources and performs other cleanup operations
189: /// before the
190: /// <see cref="SharedMemoryReceiverChannel"/> is reclaimed by garbage
191: /// collection.
192: /// </summary>
193: ~ReactiveQueue()
194: {
195: Dispose(false);
196: }
197:
198: #endregion
199:
200:
201: }
202: }
203:
.Net,
c#,
message queue,
thread in
messaging 
Reader Comments