using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Pithos.Core.Agents;

namespace Pithos.Core
{
    /// <summary>
    /// A simple mailbox-style agent: messages are posted to an internal queue and a
    /// user-supplied processing action, started on a separate task, receives them.
    /// All tasks spawned by the agent share a single cancellation token.
    /// </summary>
    /// <typeparam name="TMessage">The type of message handled by the agent</typeparam>
    public class Agent<TMessage> : IDisposable
    {
        // Backing store of _messages; kept separately so Remove can operate on the queue directly.
        private readonly ConcurrentQueue<TMessage> _queue;
        private readonly BlockingCollection<TMessage> _messages;
        private readonly CancellationTokenSource _cancelSource = new CancellationTokenSource();

        /// <summary>
        /// The token that is signalled when the agent is stopped
        /// </summary>
        public CancellationToken CancellationToken;

        private readonly Action<Agent<TMessage>> _process;

        // 0 while the agent is live, 1 once Dispose has run. Makes Dispose idempotent.
        private int _disposed;

        /// <summary>
        /// Creates a new agent. The agent does not process anything until Start is called
        /// </summary>
        /// <param name="action">The message processing action, invoked with this agent when Start is called</param>
        public Agent(Action<Agent<TMessage>> action)
        {
            _queue = new ConcurrentQueue<TMessage>();
            _messages = new BlockingCollection<TMessage>(_queue);
            _process = action;
            CancellationToken = _cancelSource.Token;
        }

        /// <summary>
        /// Posts a message to the agent's queue
        /// </summary>
        /// <param name="message">The message to enqueue</param>
        /// <remarks>The underlying BlockingCollection rejects additions after Stop has been called</remarks>
        public void Post(TMessage message)
        {
            _messages.Add(message);
        }

        /// <summary>
        /// Receives a message asynchronously, optionally with a timeout.
        /// The returned task faults with a TimeoutException if no message could be taken
        /// </summary>
        /// <param name="timeout">Optional timeout in milliseconds. The default of -1 waits indefinitely</param>
        /// <returns>A Task that will return the message asynchronously</returns>
        public Task<TMessage> Receive(int timeout = -1)
        {
            return Task<TMessage>.Factory.StartNew(() =>
            {
                TMessage item;
                if (!_messages.TryTake(out item, timeout, CancellationToken))
                {
                    throw new TimeoutException();
                }
                return item;
            });
        }

        /// <summary>
        /// Receives a message asynchronously, optionally with a timeout.
        /// Unlike Receive, the returned task yields default(TMessage) if no message could be taken
        /// </summary>
        /// <param name="timeout">Optional timeout in milliseconds. The default of -1 waits indefinitely</param>
        /// <returns>A Task that will return the message, or default(TMessage), asynchronously</returns>
        public Task<TMessage> TryReceive(int timeout = -1)
        {
            return Task<TMessage>.Factory.StartNew(() =>
            {
                TMessage item;
                // The boolean result is deliberately ignored: on failure item is left at its default value
                _messages.TryTake(out item, timeout, CancellationToken);
                return item;
            });
        }

        /// <summary>
        /// Start the agent by running the processing action on a new task
        /// </summary>
        public void Start()
        {
            Task.Factory.StartNew(() => _process(this), CancellationToken);
        }

        /// <summary>
        /// Create and start a new agent for the specified type of message
        /// </summary>
        /// <param name="action">The message processing action</param>
        /// <returns>A started Agent</returns>
        public static Agent<TMessage> Start(Action<Agent<TMessage>> action)
        {
            var agent = new Agent<TMessage>(action);
            agent.Start();
            return agent;
        }

        /// <summary>
        /// Stops the agent
        /// </summary>
        public void Stop()
        {
            //Stop the message queue
            _messages.CompleteAdding();
            //And signal the cancellation
            _cancelSource.Cancel();
        }

        /// <summary>
        /// Execute an action asynchronously, using the agent's cancellation source
        /// </summary>
        /// <param name="action">The action to execute</param>
        public void DoAsync(Action action)
        {
            Contract.Requires(action != null);
            Task.Factory.StartNew(action, CancellationToken);
        }

        // No finalizer: the class owns only managed resources, so there is nothing
        // to release when the object is collected without having been disposed.

        /// <summary>
        /// Stops the agent and releases the message collection and the cancellation source
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Releases the agent's resources. Safe to call more than once
        /// </summary>
        /// <param name="disposing">True when called from Dispose(); nothing is released otherwise</param>
        protected void Dispose(bool disposing)
        {
            if (!disposing)
            {
                return;
            }

            // Only the first caller proceeds. Without this guard a second Dispose would call
            // Stop() on already-disposed objects and throw ObjectDisposedException.
            if (Interlocked.Exchange(ref _disposed, 1) != 0)
            {
                return;
            }

            Stop();
            _messages.Dispose();
            _cancelSource.Dispose();
        }

        /// <summary>
        /// Exposes the pending messages as an enumerable
        /// </summary>
        /// <returns>The underlying message collection. Enumerating it does not consume messages</returns>
        public IEnumerable<TMessage> GetEnumerable()
        {
            return _messages;
        }

        /// <summary>
        /// Remove the first message that matches the predicate
        /// </summary>
        /// <param name="predicate">The condition to match</param>
        /// <remarks>Delegates to the RemoveFirst extension, which is defined outside this file.
        /// NOTE(review): presumably it dequeues all messages and re-enqueues all except the
        /// first matching message - confirm against the extension's implementation</remarks>
        public void Remove(Func<TMessage, bool> predicate)
        {
            _queue.RemoveFirst(predicate);
        }

        /// <summary>
        /// Schedules the next iteration of a processing loop once the current step finishes
        /// </summary>
        /// <param name="process">The task that represents the current processing step</param>
        /// <param name="loop">The action that starts the next iteration</param>
        /// <param name="onError">Optional handler, called with the inner exception if the step faulted</param>
        /// <returns>A Task that completes after the next iteration has been scheduled and any fault handled</returns>
        public Task LoopAsync(Task process, Action loop, Action<Exception> onError = null)
        {
            Contract.Requires(process != null);
            Contract.Requires(loop != null);
            return process.ContinueWith(t =>
            {
                //Spawn the loop immediately
                Task.Factory.StartNew(loop, CancellationToken);
                //Then process possible exceptions
                HandleFault(t, onError);
            }, CancellationToken);
        }

        /// <summary>
        /// Schedules the next iteration of a processing loop once the current step finishes,
        /// passing the step's result through
        /// </summary>
        /// <typeparam name="T">The result type of the processing step</typeparam>
        /// <param name="process">The task that represents the current processing step</param>
        /// <param name="loop">The action that starts the next iteration</param>
        /// <param name="onError">Optional handler, called with the inner exception if the step faulted</param>
        /// <returns>A Task that yields the step's result if it ran to completion, default(T) otherwise</returns>
        public Task<T> LoopAsync<T>(Task<T> process, Action loop, Action<Exception> onError = null)
        {
            Contract.Requires(process != null);
            Contract.Requires(loop != null);
            return process.ContinueWith(t =>
            {
                //Spawn the loop immediately
                Task.Factory.StartNew(loop, CancellationToken);
                //Then process possible exceptions
                HandleFault(t, onError);
                // Reading Result on a faulted or cancelled task would throw, so only
                // pass the value through when the step actually succeeded.
                return t.Status == TaskStatus.RanToCompletion ? t.Result : default(T);
            }, CancellationToken);
        }

        // Shared fault handling for both LoopAsync overloads: stops the agent when the step
        // failed with an OperationCanceledException, then reports the exception to onError.
        private void HandleFault(Task task, Action<Exception> onError)
        {
            if (!task.IsFaulted)
            {
                return;
            }

            var ex = task.Exception.InnerException;
            if (ex is OperationCanceledException)
            {
                Stop();
            }
            if (onError != null)
            {
                onError(ex);
            }
        }
    }
}