2 using System.Collections.Concurrent;
3 using System.Collections.Generic;
4 using System.Diagnostics.Contracts;
7 using System.Threading;
8 using System.Threading.Tasks;
9 using Pithos.Core.Agents;
13 public class Agent<TMessage> : IDisposable
15 private readonly ConcurrentQueue<TMessage> _queue;
16 private readonly BlockingCollection<TMessage> _messages;
17 private readonly CancellationTokenSource _cancelSource = new CancellationTokenSource();
18 public CancellationToken CancellationToken;
20 private readonly Action<Agent<TMessage>> _process;
23 public Agent(Action<Agent<TMessage>> action)
25 _queue=new ConcurrentQueue<TMessage>();
26 _messages = new BlockingCollection<TMessage>(_queue);
28 CancellationToken = _cancelSource.Token;
31 public void Post(TMessage message)
33 _messages.Add(message);
37 /// Receives a message asynchronously, optionally with a timeout. Receive throws a TimeoutException if the timeout expires
39 /// <param name="timeout">Optional timeout in milliseconds. If provided, Receive fails with a TimeoutException if no message is available in the specified time</param>
40 /// <returns>A Task that will return the message asynchronously</returns>
41 public Task<TMessage> Receive(int timeout = -1)
43 return Task<TMessage>.Factory.StartNew(() =>
46 if (!_messages.TryTake(out item, timeout, CancellationToken))
47 throw new TimeoutException();
54 /// Receives a message asynchronously, optionally with a timeout. TryReceive returns an empty task if the timeout expires
56 /// <param name="timeout">Optional timeout in milliseconds. If provided, Receive returns an empty task</param>
57 /// <returns>A Task that will return the message asynchronously</returns>
58 public Task<TMessage> TryReceive(int timeout = -1)
60 return Task<TMessage>.Factory.StartNew(() =>
63 _messages.TryTake(out item, timeout, CancellationToken);
75 Task.Factory.StartNew(() => _process(this), CancellationToken);
80 /// Create and start a new agent for the specified type of message
82 /// <param name="action">The message processing action</param>
83 /// <returns>A started Agent</returns>
84 public static Agent<TMessage> Start(Action<Agent<TMessage>> action)
86 var agent = new Agent<TMessage>(action);
96 //Stop the message queue
97 _messages.CompleteAdding();
98 //And signal the cancellation
99 _cancelSource.Cancel();
103 /// Execute an action asynchronously, using the agent's cancellation source
105 /// <param name="action">The action to execute</param>
106 public void DoAsync(Action action)
108 Contract.Requires(action!=null);
109 Task.Factory.StartNew(action, CancellationToken);
118 public void Dispose()
121 GC.SuppressFinalize(this);
124 protected void Dispose(bool disposing)
130 _cancelSource.Dispose();
134 public IEnumerable<TMessage> GetEnumerable()
140 /// Remove the first message that matches the predicate
142 /// <param name="predicate">The condition to match</param>
143 /// <remarks>Removes the first message that matches the predicate by dequeing all
144 /// messages and re-enqueing all except the first matching message</remarks>
145 public void Remove(Func<TMessage,bool> predicate)
147 //Can this work? Dequeue all items
148 //and then enqueue everything except the filtered items
150 _queue.RemoveFirst(predicate);
153 public Task LoopAsync(Task process, Action loop,Action<Exception> onError=null)
155 Contract.Requires(process!=null);
156 Contract.Requires(loop!=null);
158 return process.ContinueWith(t =>
161 Task.Factory.StartNew(loop, CancellationToken);
165 var ex = t.Exception.InnerException;
166 if (ex is OperationCanceledException)
171 },CancellationToken);
174 public Task<T> LoopAsync<T>(Task<T> process, Action loop,Action<Exception> onError=null)
176 return process.ContinueWith(t =>
178 //Spawn the Loop immediatelly
179 Task.Factory.StartNew(loop,CancellationToken);
180 //Then process possible exceptions
183 var ex = t.Exception.InnerException;
184 if (ex is OperationCanceledException)
190 },CancellationToken);