using System.Text;
using System.Threading;
using System.Threading.Tasks;
+using Pithos.Core.Agents;
namespace Pithos.Core
{
public class Agent<TMessage> : IDisposable
- {
- private readonly BlockingCollection<TMessage> _messages = new BlockingCollection<TMessage>();
+ {
+ private readonly ConcurrentQueue<TMessage> _queue;
+ private readonly BlockingCollection<TMessage> _messages;
private readonly CancellationTokenSource _cancelSource = new CancellationTokenSource();
public CancellationToken CancellationToken;
/// <summary>
/// Creates an agent whose blocking message collection is backed by a dedicated concurrent queue.
/// </summary>
/// <param name="action">Delegate stored in <c>_process</c>; presumably the agent's processing
/// routine (where it is invoked is not shown here - confirm against the start logic).</param>
public Agent(Action<Agent<TMessage>> action)
{
+ _queue=new ConcurrentQueue<TMessage>();
+ _messages = new BlockingCollection<TMessage>(_queue); // keep a handle on the backing queue so Remove can reach it
_process = action;
CancellationToken = _cancelSource.Token; // expose the token of this agent's own cancellation source
}
return _messages;
}
+ /// <summary>
+ /// Removes the first message that matches the predicate.
+ /// </summary>
+ /// <param name="predicate">The condition a message must satisfy to be removed</param>
+ /// <remarks>Delegates to RemoveFirst on the backing queue, which presumably dequeues all
+ /// messages and re-enqueues all except the first matching message (helper not shown - confirm)</remarks>
+ public void Remove(Func<TMessage,bool> predicate)
+ {
+ //NOTE(review): this edits _queue directly, bypassing the BlockingCollection that wraps it;
+ //the BlockingCollection docs warn against that - confirm its item count stays consistent
+
+ _queue.RemoveFirst(predicate);
+ }
public Task LoopAsync(Task process, Action loop,Action<Exception> onError=null)
{