Tuesday, Dec 8, 2009

Intraprocess Message Queue

Applications whose services are layered/modular are often composed of a set of collaborating tasks within a process. To simplify interfaces and design, minimize maintenance costs, and maximize reuse, these tasks should have the following properties.

  • separate task objects should have minimal dependencies on each other's data and methods
  • the methods and data in a task should focus on a related set of functionality

To achieve these properties, tasks often communicate by passing messages via a generic method such as send() or put(), instead of calling each other's methods directly. Messages can represent work requests, work results, or other types of data to process.
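As a rough illustration of that idea, the sketch below shows a task that exposes only a generic Put method, so collaborators depend on the message type rather than on the task's own methods and data. The names WorkRequest, ITask and LoggingTask are hypothetical and are not part of the article's code.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical message type: a work request carrying a payload.
public sealed class WorkRequest
{
    public WorkRequest(string payload) { Payload = payload; }
    public string Payload { get; private set; }
}

// Hypothetical task interface: the only member a collaborator needs to know.
public interface ITask<TMessage>
{
    void Put(TMessage message);
}

// A task that records what it receives; callers never touch its internals.
public sealed class LoggingTask : ITask<WorkRequest>
{
    private readonly List<string> _log = new List<string>();

    // Read-only view of the handled requests, exposed for inspection.
    public IList<string> Log { get { return _log.AsReadOnly(); } }

    public void Put(WorkRequest message)
    {
        _log.Add("handled: " + message.Payload);
    }
}
```

A producer holding an ITask<WorkRequest> reference can call Put(new WorkRequest("resize image")) without knowing which concrete task sits behind the interface.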

When producer and consumer tasks are located in the same process, they often exchange messages using an intraprocess message queue. In this scenario the producer inserts messages into the message queue, which is serviced by consumer tasks that remove and process the messages. In this article I develop an intraprocess message queue that facilitates this type of message-passing communication between services. The sample application inserts messages at different intervals, and the messages are processed on a different thread.

The ReactiveQueue class runs a consumer loop on a thread-pool thread, queued with ThreadPool.QueueUserWorkItem in the constructor. This thread goes to sleep when the queue is empty. When a message is inserted using the Enqueue method, the consumer thread wakes up, then removes and processes all the messages in the queue. While the messages are being processed, the producer is free to continue inserting messages. Once the consumer thread finishes processing a batch, it acquires the synchronization lock and checks the queue again: if the queue is empty it goes back to sleep; otherwise it removes and processes all pending messages. The code that illustrates this process is contained in the OnEnqueue method (Line# 113).
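The sleep, wake and drain cycle described above can be condensed into a small self-contained sketch. MiniQueue is a hypothetical stand-in, not the article's ReactiveQueue: it assumes a dedicated background thread instead of a thread-pool thread, takes a single handler in its constructor instead of exposing an event, and drains any remaining items when closed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical, condensed stand-in for the article's ReactiveQueue<T>.
public sealed class MiniQueue<T>
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly object _sync = new object();
    private readonly Action<T> _handler;
    private readonly Thread _consumer;
    private bool _closed;

    public MiniQueue(Action<T> handler)
    {
        if (handler == null) throw new ArgumentNullException("handler");
        _handler = handler;
        _consumer = new Thread(Consume) { IsBackground = true, Name = "MiniQueue" };
        _consumer.Start();
    }

    // Producer side: add the item and wake the sleeping consumer.
    public void Enqueue(T item)
    {
        lock (_sync)
        {
            if (_closed) return;
            _items.Enqueue(item);
            Monitor.PulseAll(_sync);
        }
    }

    // Stop accepting items, wake the consumer, and wait for it to finish.
    // Must not be called from inside the handler, or Join would wait forever.
    public void Close()
    {
        lock (_sync)
        {
            _closed = true;
            Monitor.PulseAll(_sync);
        }
        _consumer.Join();
    }

    private void Consume()
    {
        while (true)
        {
            T[] batch;
            lock (_sync)
            {
                // Sleep while there is nothing to do; Wait releases the lock.
                while (_items.Count == 0 && !_closed)
                {
                    Monitor.Wait(_sync);
                }
                if (_items.Count == 0) return; // closed and fully drained
                batch = _items.ToArray();      // snapshot in FIFO order
                _items.Clear();
            }
            // Handlers run outside the lock, so producers are not blocked by them.
            foreach (T item in batch) _handler(item);
        }
    }
}
```

Under these assumptions, usage looks like: var q = new MiniQueue<string>(Console.WriteLine); q.Enqueue("hello"); q.Close();. Re-checking the condition in a while loop around Monitor.Wait means the consumer always confirms there is work (or a close request) before leaving the wait.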

Here is the application's output (screenshot).

Download the source code.

   1:  using System.Collections.Generic;
   2:  using System.Threading;
   3:  using System.Linq;
   4:  using System;
   5:   
   6:  namespace Bo.Utilities
   7:  {
   8:      /// <summary>
   9:      /// Delegate DequeueEventHandler.
  10:      /// </summary>
  11:      /// <typeparam name="T"></typeparam>
  12:      /// <param name="item"></param>
  13:      public delegate void DequeueEventHandler<T>(T item);
  14:   
  15:      /// <summary>
  16:      /// A thread safe queue.
  17:      /// </summary>
  18:      /// <typeparam name="T"></typeparam>
  19:      public class ReactiveQueue<T> : IDisposable
  20:      {
  21:          private Queue<T> _queue;
  22:          private bool _closed;
  23:   
  24:          /// <summary>
  25:          /// Gets or sets a value indicating whether this <see 
  26:          /// cref="ReactiveQueue&lt;T&gt;"/> is closed.
  27:          /// </summary>
  28:          /// <value><c>true</c> if closed; otherwise, <c>false</c>.</value>
  29:          public bool Closed
  30:          {
  31:              get
  32:              {
  33:                  lock (_closeLock)
  34:                  {
  35:                      return _closed;
  36:                  }
  37:              }
  38:              set
  39:              {
  40:                  lock (_closeLock)
  41:                  {
  42:                      _closed = value;
  43:                  }
  44:              }
  45:          }
  46:   
  47:          /// <summary>
  48:          /// Gets or sets the queue.
  49:          /// </summary>
  50:          /// <value>The queue.</value>
  51:          internal Queue<T> Queue
  52:          {
  53:              get { return _queue; }
  54:              set { _queue = value; }
  55:          }
  56:   
  57:          private  object _syncLock = new object();
  58:          private object _closeLock = new object();
  59:          /// <summary>
  60:          /// Gets or sets the sync lock.
  61:          /// </summary>
  62:          /// <value>The sync lock.</value>
  63:          internal object SyncLock
  64:          {
  65:              get { return _syncLock; }
  66:              set { _syncLock = value; }
  67:          }
  68:          
  69:          /// <summary>
  70:          /// Initializes a new instance of the ReactiveQueue class.
  71:          /// </summary>
  72:          public ReactiveQueue()
  73:          {
  74:              _queue = new Queue<T>();
  75:              ThreadPool.QueueUserWorkItem(OnEnqueue);           
  76:          }
  77:   
  78:          /// <summary>
  79:          /// Initializes a new instance of the ReactiveQueue class with the given 
  80:          /// capacity.
  81:          /// </summary>
  82:          /// <param name="capacity">The capacity of the queue.</param>
  83:          public ReactiveQueue(int capacity)
  84:              : this()
  85:          {
  86:              _queue = new Queue<T>(capacity);
  87:          }
  88:   
  89:          /// <summary>
  90:          /// Enqueues the specified item.
  91:          /// </summary>
  92:          /// <param name="item">The item.</param>
  93:          public void Enqueue(T item)
  94:          {
  95:              if (!Closed)
  96:              {
  97:                  lock (_syncLock)
  98:                  {
  99:                      _queue.Enqueue(item);
 100:                      Monitor.PulseAll(_syncLock);
 101:                  }
 102:              }
 103:          }
 104:   
 105:          /// <summary>
  106:          /// Occurs when an item is removed from the queue for processing.
 107:          /// </summary>
 108:          public event DequeueEventHandler<T> DequeueHandler;
 109:   
 110:          /// <summary>
  111:          /// Consumer loop: sleeps while the queue is empty, then drains and dispatches items.
 112:          /// </summary>
 113:          protected void OnEnqueue(object obj)
 114:          {
 115:              Thread.CurrentThread.Name = "ReactiveQueue";
 116:              while (!Closed)
 117:              {
 118:                  List<T> items = null;
 119:                  lock (_syncLock)
 120:                  {
  121:                      while (_queue.Count == 0 && !Closed) // re-check after every wake-up
 122:                      {
 123:                          Monitor.Wait(_syncLock);
 124:                      }
 125:                      
 126:                      if (DequeueHandler != null && _queue.Count > 0)
 127:                      {
 128:                          items = _queue.ToList();
 129:                          _queue.Clear();
 130:                      }                    
 131:                  }
 132:   
 133:                  if (DequeueHandler != null && items != null && items.Count > 0)
 134:                  {
 135:                      items.ForEach(ActionHandler);                     
 136:                  }                
 137:              }
 138:          }
 139:   
 140:          private void ActionHandler(T item)
 141:          {
  142:              var handler = DequeueHandler; if (handler != null) handler(item); // safe if a subscriber detaches mid-batch
 143:          }
 144:          /// <summary>
 145:          /// Closes this instance.
 146:          /// </summary>
 147:          public void Close()
 148:          {
 149:              Dispose();
 150:          }
 151:          
 152:          #region IDisposable Members
 153:   
 154:          /// <summary>
 155:          /// Releases unmanaged and - optionally - managed resources
 156:          /// </summary>
 157:          /// <param name="disposing"><c>true</c> to release both managed and 
 158:          /// unmanaged resources; <c>false</c> to release only unmanaged 
 159:          /// resources.</param>
 160:          private void Dispose(bool disposing)
 161:          {
 162:              if (!_disposed)
 163:              {
 164:                  if (disposing)
 165:                  {
 166:                      Closed = true;
 167:                      lock (_syncLock)
 168:                      {
 169:                          Monitor.PulseAll(_syncLock);
 170:                      }
 171:                  }
 172:              }
 173:              _disposed = true;
 174:          }
 175:   
 176:          private bool _disposed = false;
 177:          /// <summary>
 178:          /// Performs application-defined tasks associated with freeing, 
 179:          /// releasing, or resetting unmanaged resources.
 180:          /// </summary>
 181:          public void Dispose()
 182:          {
 183:              Dispose(true);
 184:              GC.SuppressFinalize(this);
 185:          }
 186:          
 187:          /// <summary>
 188:          /// Releases unmanaged resources and performs other cleanup operations 
 189:          /// before the
  190:          /// <see cref="ReactiveQueue&lt;T&gt;"/> is reclaimed by garbage
 191:          /// collection.
 192:          /// </summary>
 193:          ~ReactiveQueue()
 194:          {
 195:              Dispose(false);
 196:          }
 197:   
 198:          #endregion
 199:   
 200:   
 201:      }
 202:  }
 203:   


Reader Comments (1)

Great!! Is there a priority queue version of this for intraprocess messages?

June 8, 2013 | Unregistered Commenter kiquenet
