Revision f3d080df trunk/Pithos.Core/Agents/Agent.cs
b/trunk/Pithos.Core/Agents/Agent.cs | ||
---|---|---|
6 | 6 |
using System.Text; |
7 | 7 |
using System.Threading; |
8 | 8 |
using System.Threading.Tasks; |
9 |
using Pithos.Core.Agents; |
|
9 | 10 |
|
10 | 11 |
namespace Pithos.Core |
11 | 12 |
{ |
12 | 13 |
public class Agent<TMessage> : IDisposable |
13 |
{ |
|
14 |
private readonly BlockingCollection<TMessage> _messages = new BlockingCollection<TMessage>(); |
|
14 |
{ |
|
15 |
private readonly ConcurrentQueue<TMessage> _queue; |
|
16 |
private readonly BlockingCollection<TMessage> _messages; |
|
15 | 17 |
private readonly CancellationTokenSource _cancelSource = new CancellationTokenSource(); |
16 | 18 |
public CancellationToken CancellationToken; |
17 | 19 |
|
... | ... | |
20 | 22 |
|
21 | 23 |
public Agent(Action<Agent<TMessage>> action) |
22 | 24 |
{ |
25 |
_queue=new ConcurrentQueue<TMessage>(); |
|
26 |
_messages = new BlockingCollection<TMessage>(_queue); |
|
23 | 27 |
_process = action; |
24 | 28 |
CancellationToken = _cancelSource.Token; |
25 | 29 |
} |
... | ... | |
132 | 136 |
return _messages; |
133 | 137 |
} |
134 | 138 |
|
139 |
/// <summary> |
|
140 |
/// Remove the first message that matches the predicate |
|
141 |
/// </summary> |
|
142 |
/// <param name="predicate">The condition to match</param> |
|
143 |
/// <remarks>Removes the first message that matches the predicate by dequeing all |
|
144 |
/// messages and re-enqueing all except the first matching message</remarks> |
|
145 |
public void Remove(Func<TMessage,bool> predicate) |
|
146 |
{ |
|
147 |
//Can this work? Dequeue all items |
|
148 |
//and then enqueue everything except the filtered items |
|
149 |
|
|
150 |
_queue.RemoveFirst(predicate); |
|
151 |
} |
|
135 | 152 |
|
136 | 153 |
public Task LoopAsync(Task process, Action loop,Action<Exception> onError=null) |
137 | 154 |
{ |
Also available in: Unified diff