Merge branch 'master' of \\\pk2010\Pithos\
[pithos-ms-client] / trunk / Pithos.Core / Agents / Agent.cs
index 486d459..57b08a7 100644 (file)
@@ -6,12 +6,14 @@ using System.Linq;
 using System.Text;
 using System.Threading;
 using System.Threading.Tasks;
+using Pithos.Core.Agents;
 
 namespace Pithos.Core
 {
     public class Agent<TMessage> : IDisposable
-    {        
-        private readonly BlockingCollection<TMessage> _messages = new BlockingCollection<TMessage>();
+    {
+        private readonly ConcurrentQueue<TMessage> _queue;
+        private readonly BlockingCollection<TMessage> _messages;
         private readonly CancellationTokenSource _cancelSource = new CancellationTokenSource();
         public CancellationToken CancellationToken;
 
@@ -20,6 +22,8 @@ namespace Pithos.Core
 
         public Agent(Action<Agent<TMessage>> action)
         {
+            _queue=new ConcurrentQueue<TMessage>();
+            _messages = new BlockingCollection<TMessage>(_queue);
             _process = action;
             CancellationToken = _cancelSource.Token;
         }
@@ -132,6 +136,19 @@ namespace Pithos.Core
             return _messages;
         }
 
+        /// <summary>
+        /// Remove the first message that matches the predicate
+        /// </summary>
+        /// <param name="predicate">The condition to match</param>
+        /// <remarks>Removes the first message that matches the predicate by dequeing all 
+        /// messages and re-enqueing all except the first matching message</remarks>
+        public void Remove(Func<TMessage,bool> predicate)
+        {
+            //Can this work? Dequeue all items 
+            //and then enqueue everything except the filtered items
+
+            _queue.RemoveFirst(predicate);
+        }
 
         public Task LoopAsync(Task process, Action loop,Action<Exception> onError=null)
         {