#region *   License   *
/*
    SimpleHelpers - TimedQueue

    Copyright © 2013 Khalid Salomão

    Permission is hereby granted, free of charge, to any person
    obtaining a copy of this software and associated documentation
    files (the “Software”), to deal in the Software without
    restriction, including without limitation the rights to use,
    copy, modify, merge, publish, distribute, sublicense, and/or sell
    copies of the Software, and to permit persons to whom the
    Software is furnished to do so, subject to the following
    conditions:

    The above copyright notice and this permission notice shall be
    included in all copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND,
    EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
    OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
    NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
    HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
    WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
    OTHER DEALINGS IN THE SOFTWARE.

    License: http://www.opensource.org/licenses/mit-license.php
    Website: https://github.com/khalidsalomao/SimpleHelpers.Net
 */
#endregion

using System;
using System.Collections.Generic;
using System.Linq;

namespace SimpleHelpers
{
    /// <summary>
    /// TimedQueueManager is a helper class that stores instances of TimedQueue by a key.
    /// </summary>
    /// <typeparam name="T">The type used on the TimedQueue.</typeparam>
    public class TimedQueueManager<T> where T : class
    {
        // one shared map per closed generic type; keys are compared ordinally
        private static readonly System.Collections.Concurrent.ConcurrentDictionary<string, TimedQueue<T>> m_map =
            new System.Collections.Concurrent.ConcurrentDictionary<string, TimedQueue<T>> (StringComparer.Ordinal);

        /// <summary>
        /// Gets all registered queue names.
        /// </summary>
        public static IEnumerable<string> Queues
        {
            get { return m_map.Keys; }
        }

        /// <summary>
        /// Configures a stored TimedQueue by its key.
        /// Note: if the queue does not exist, it will be created.
        /// </summary>
        /// <param name="key">The TimedQueue associated key.</param>
        /// <param name="timerStep">The interval between OnExecution calls by the internal timer thread.</param>
        /// <param name="action">The OnExecution action fired for every timer step.</param>
        /// <returns>The configured TimedQueue instance.</returns>
        public static TimedQueue<T> Configure (string key, TimeSpan timerStep, Action<IEnumerable<T>> action)
        {
            TimedQueue<T> q = Get (key);
            q.TimerStep = timerStep;
            q.OnExecution = action;
            return q;
        }

        /// <summary>
        /// Gets a stored TimedQueue by its key.
        /// Note: if the queue does not exist, it will be created.
        /// </summary>
        /// <param name="key">The TimedQueue associated key.</param>
        /// <returns>The TimedQueue registered under <paramref name="key"/>.</returns>
        public static TimedQueue<T> Get (string key)
        {
            // GetOrAdd guarantees that every caller observes the same stored instance,
            // unlike a TryGetValue + indexer assignment where concurrent callers could
            // each end up holding a different queue for the same key.
            // A queue created by a losing factory call is simply discarded; the default
            // constructor does not start a timer, so nothing needs to be disposed.
            return m_map.GetOrAdd (key, k => new TimedQueue<T> ());
        }

        /// <summary>
        /// Removes the specified TimedQueue by key.
        /// Note: the TimedQueue will safely be removed and disposed.
        /// If the queue was already removed, this will be a NOP.
        /// </summary>
        /// <param name="key">The TimedQueue associated key.</param>
        public static void Remove (string key)
        {
            TimedQueue<T> q;
            // dispose only the instance that was actually taken out of the map
            if (m_map.TryRemove (key, out q) && q != null)
            {
                q.Dispose ();
            }
        }

        /// <summary>
        /// Removes and disposes of all TimedQueues.
        /// Note: each TimedQueue will safely be removed and disposed.
        /// </summary>
        public static void Clear ()
        {
            // Keys returns a snapshot copy, so removing entries while iterating is safe
            foreach (var key in m_map.Keys)
                Remove (key);
        }
    }

    /// <summary>
    /// Simple lightweight queue that stores data in a concurrent queue and periodically processes the queued items.
    /// Useful for:
    /// * processing items in batches;
    /// * grouping data for later processing;
    /// * async processing (consumer/producer);
    /// * etc.
    /// Note: this nuget package contains C# source code and depends on System.Collections.Concurrent introduced in .Net 4.0.
    /// </summary>
    /// <typeparam name="T">The type of the queued items.</typeparam>
    public class TimedQueue<T> : IDisposable where T : class
    {
        private TimeSpan m_timerStep = TimeSpan.FromMilliseconds (1000);

        // not readonly: Clear () swaps in a fresh queue instance
        private System.Collections.Concurrent.ConcurrentQueue<T> m_queue = new System.Collections.Concurrent.ConcurrentQueue<T> ();

        /// <summary>
        /// Interval duration between OnExecution calls by the internal timer thread.
        /// Default value is 1000 milliseconds.
        /// Assigning a different value restarts the internal timer with the new interval.
        /// </summary>
        public TimeSpan TimerStep
        {
            get { return m_timerStep; }
            set
            {
                if (!m_timerStep.Equals (value))
                {
                    m_timerStep = value;
                    // recreate the timer so the new interval takes effect
                    StopMaintenance ();
                    StartMaintenance ();
                }
            }
        }

        /// <summary>
        /// Gets the number of queued items.
        /// </summary>
        public int Count
        {
            get { return m_queue.Count; }
        }

        /// <summary>
        /// Event fired for every timer step.
        /// Note: the IEnumerable must be consumed to clear the queued items.
        /// </summary>
        public Action<IEnumerable<T>> OnExecution { get; set; }

        /// <summary>
        /// Initializes a new instance of the <see cref="TimedQueue{T}"/> class.
        /// </summary>
        public TimedQueue ()
        {
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="TimedQueue{T}"/> class.
        /// </summary>
        /// <param name="timerStep">The interval between OnExecution calls by the internal timer thread.</param>
        /// <param name="action">The OnExecution action fired for every timer step.</param>
        public TimedQueue (TimeSpan timerStep, Action<IEnumerable<T>> action)
        {
            TimerStep = timerStep;
            OnExecution = action;
        }

        /// <summary>
        /// Puts the specified data in the timed queue for processing.
        /// Null items are ignored.
        /// </summary>
        /// <param name="data">The item to enqueue.</param>
        public void Put (T data)
        {
            if (data == null)
                return;
            m_queue.Enqueue (data);
            // make sure the timer is running (it is stopped while the queue stays idle)
            StartMaintenance ();
        }

        /// <summary>
        /// Clears this instance, discarding all queued items without firing OnExecution.
        /// </summary>
        public void Clear ()
        {
            System.Threading.Interlocked.Exchange (ref m_queue, new System.Collections.Concurrent.ConcurrentQueue<T> ());
        }

        /// <summary>
        /// Flushes the current queue by firing the event OnExecution.
        /// Note: the internal timer is stopped; it is started again by the next Put.
        /// If a timer step is executing at this moment, no additional step is run.
        /// </summary>
        public void Flush ()
        {
            StopMaintenance ();
            ExecuteMaintenance (null);
        }

        /// <summary>
        /// Flushes the current enqueued items and stops the internal timer.
        /// </summary>
        public void Dispose ()
        {
            Flush ();
            StopMaintenance ();
        }

        #region *   Scheduled Task  *

        private System.Threading.Timer m_scheduledTask = null;
        private readonly object m_lock = new object ();
        private int m_executing = 0;
        private int m_idleCounter = 0;

        /// <summary>
        /// Creates the internal timer if it is not already running.
        /// </summary>
        private void StartMaintenance ()
        {
            if (m_scheduledTask == null)
            {
                lock (m_lock)
                {
                    // re-check under the lock so only one timer is ever created
                    if (m_scheduledTask == null)
                    {
                        m_scheduledTask = new System.Threading.Timer (ExecuteMaintenance, null, m_timerStep, m_timerStep);
                    }
                }
            }
        }

        /// <summary>
        /// Disposes the internal timer, if any.
        /// </summary>
        private void StopMaintenance ()
        {
            lock (m_lock)
            {
                if (m_scheduledTask != null)
                    m_scheduledTask.Dispose ();
                m_scheduledTask = null;
            }
        }

        /// <summary>
        /// Timer callback: hands the queued items to OnExecution, or stops the timer
        /// when the queue stays empty or nobody is listening.
        /// </summary>
        /// <param name="state">Unused timer state.</param>
        private void ExecuteMaintenance (object state)
        {
            // skip this step if another one is still executing
            if (System.Threading.Interlocked.CompareExchange (ref m_executing, 1, 0) != 0)
                return;
            // try to fire OnExecution event
            try
            {
                // check for idle queue
                if (m_queue.Count == 0)
                {
                    // stop the timer once the idle counter has exceeded 2
                    // (the counter is only reset when items are handed to OnExecution)
                    if (m_idleCounter++ > 2)
                        StopMaintenance ();
                }
                else
                {
                    // read the handler once so that a concurrent OnExecution = null
                    // cannot slip in between the null check and the invocation
                    Action<IEnumerable<T>> handler = OnExecution;
                    if (handler != null)
                    {
                        // clear idle queue marker
                        m_idleCounter = 0;
                        // execute event handler
                        handler (TakeQueuedItems ());
                    }
                    else
                    {
                        // simply stop the timer if there is no event listening
                        StopMaintenance ();
                    }
                }
            }
            finally
            {
                System.Threading.Interlocked.Exchange (ref m_executing, 0);
            }
        }

        /// <summary>
        /// Lazily dequeues items until the queue is empty.
        /// </summary>
        private IEnumerable<T> TakeQueuedItems ()
        {
            T obj;
            while (m_queue.TryDequeue (out obj))
                yield return obj;
        }

        #endregion
    }
}