2 /* -----------------------------------------------------------------------
3 * <copyright file="Agent.cs" company="GRNet">
5 * Copyright 2011-2012 GRNET S.A. All rights reserved.
7 * Redistribution and use in source and binary forms, with or
8 * without modification, are permitted provided that the following
11 * 1. Redistributions of source code must retain the above
12 * copyright notice, this list of conditions and the following
15 * 2. Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following
17 * disclaimer in the documentation and/or other materials
18 * provided with the distribution.
21 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
22 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
25 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
28 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 * POSSIBILITY OF SUCH DAMAGE.
34 * The views and conclusions contained in the software and
35 * documentation are those of the authors and should not be
36 * interpreted as representing official policies, either expressed
37 * or implied, of GRNET S.A.
39 * -----------------------------------------------------------------------
43 using System.Collections.Concurrent;
44 using System.Collections.Generic;
45 using System.Diagnostics.Contracts;
46 using System.Threading;
47 using System.Threading.Async;
48 using System.Threading.Tasks;
49 using Pithos.Core.Agents;
53 public class Agent<TMessage> : IDisposable
55 private readonly ConcurrentQueue<TMessage> _queue;
56 //private readonly AsyncCollection<TMessage> _messages;
57 private readonly AsyncCollection<TMessage> _messages;
58 private readonly CancellationTokenSource _cancelSource = new CancellationTokenSource();
59 public CancellationToken CancellationToken;
61 private readonly Action<Agent<TMessage>> _process;
64 public Agent(Action<Agent<TMessage>> action)
66 _queue=new ConcurrentQueue<TMessage>();
67 _messages = new AsyncCollection<TMessage>(_queue);
69 CancellationToken = _cancelSource.Token;
74 get { return _queue.IsEmpty; }
77 public void Post(TMessage message)
79 _messages.Add(message);
82 ConcurrentDictionary<TMessage,TaskCompletionSource<object>> _awaiters=new ConcurrentDictionary<TMessage,TaskCompletionSource<object>>();
84 public Task PostAndAwait(TMessage message)
86 var tcs = new TaskCompletionSource<object>();
87 _awaiters[message] = tcs;
93 /// Receives a message asynchronously, optionally with a timeout. Receive throws a TimeoutException if the timeout expires
95 /// <returns>A Task that will return the message asynchronously</returns>
96 public Task<TMessage> Receive()
98 return _messages.Take();
101 public void NotifyComplete(TMessage message)
103 TaskCompletionSource<object> tcs;
104 if (_awaiters.TryRemove(message,out tcs))
115 Task.Factory.StartNew(() => _process(this), CancellationToken);
120 /// Create and start a new agent for the specified type of message
122 /// <param name="action">The message processing action</param>
123 /// <returns>A started Agent</returns>
124 public static Agent<TMessage> Start(Action<Agent<TMessage>> action)
126 var agent = new Agent<TMessage>(action);
136 //Stop the message queue
137 //_messages.CompleteAdding();
138 //And signal the cancellation
139 _cancelSource.Cancel();
143 /// Execute an action asynchronously, using the agent's cancellation source
145 /// <param name="action">The action to execute</param>
146 public void DoAsync(Action action)
149 throw new ArgumentNullException("action");
150 Contract.EndContractBlock();
152 Task.Factory.StartNew(action, CancellationToken);
161 public void Dispose()
164 GC.SuppressFinalize(this);
167 protected void Dispose(bool disposing)
172 _cancelSource.Dispose();
176 public IEnumerable<TMessage> GetEnumerable()
182 /// Remove the first message that matches the predicate
184 /// <param name="predicate">The condition to match</param>
185 /// <remarks>Removes the first message that matches the predicate by dequeing all
186 /// messages and re-enqueing all except the first matching message</remarks>
187 public void Remove(Func<TMessage,bool> predicate)
189 //Can this work? Dequeue all items
190 //and then enqueue everything except the filtered items
192 _queue.RemoveFirst(predicate);
195 public Task LoopAsync(Task process, Action loop,Action<Exception> onError=null)
198 throw new ArgumentNullException("process");
200 throw new ArgumentNullException("loop");
201 Contract.EndContractBlock();
203 return process.ContinueWith(t =>
206 Task.Factory.StartNew(loop, CancellationToken);
210 var ex = t.Exception.InnerException;
211 if (ex is OperationCanceledException)
216 },CancellationToken);
219 public Task<T> LoopAsync<T>(Task<T> process, Action loop,Action<Exception> onError=null)
221 return process.ContinueWith(t =>
223 //Spawn the Loop immediatelly
224 Task.Factory.StartNew(loop,CancellationToken);
225 //Then process possible exceptions
228 var ex = t.Exception.InnerException;
229 if (ex is OperationCanceledException)