Revision f3d080df trunk/Pithos.Core/Agents/Agent.cs

b/trunk/Pithos.Core/Agents/Agent.cs
6 6
using System.Text;
7 7
using System.Threading;
8 8
using System.Threading.Tasks;
9
using Pithos.Core.Agents;
9 10

  
10 11
namespace Pithos.Core
11 12
{
12 13
    public class Agent<TMessage> : IDisposable
13
    {        
14
        private readonly BlockingCollection<TMessage> _messages = new BlockingCollection<TMessage>();
14
    {
15
        private readonly ConcurrentQueue<TMessage> _queue;
16
        private readonly BlockingCollection<TMessage> _messages;
15 17
        private readonly CancellationTokenSource _cancelSource = new CancellationTokenSource();
16 18
        public CancellationToken CancellationToken;
17 19

  
......
20 22

  
21 23
        public Agent(Action<Agent<TMessage>> action)
22 24
        {
25
            _queue=new ConcurrentQueue<TMessage>();
26
            _messages = new BlockingCollection<TMessage>(_queue);
23 27
            _process = action;
24 28
            CancellationToken = _cancelSource.Token;
25 29
        }
......
132 136
            return _messages;
133 137
        }
134 138

  
139
        /// <summary>
140
        /// Remove the first message that matches the predicate
141
        /// </summary>
142
        /// <param name="predicate">The condition to match</param>
143
        /// <remarks>Removes the first message that matches the predicate by dequeing all 
144
        /// messages and re-enqueing all except the first matching message</remarks>
145
        public void Remove(Func<TMessage,bool> predicate)
146
        {
147
            //Can this work? Dequeue all items 
148
            //and then enqueue everything except the filtered items
149

  
150
            _queue.RemoveFirst(predicate);
151
        }
135 152

  
136 153
        public Task LoopAsync(Task process, Action loop,Action<Exception> onError=null)
137 154
        {

Also available in: Unified diff