2 /* -----------------------------------------------------------------------
3 * <copyright file="Agent.cs" company="GRNet">
5 * Copyright 2011-2012 GRNET S.A. All rights reserved.
7 * Redistribution and use in source and binary forms, with or
8 * without modification, are permitted provided that the following
11 * 1. Redistributions of source code must retain the above
12 * copyright notice, this list of conditions and the following
15 * 2. Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following
17 * disclaimer in the documentation and/or other materials
18 * provided with the distribution.
21 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
22 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
25 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
28 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 * POSSIBILITY OF SUCH DAMAGE.
34 * The views and conclusions contained in the software and
35 * documentation are those of the authors and should not be
36 * interpreted as representing official policies, either expressed
37 * or implied, of GRNET S.A.
39 * -----------------------------------------------------------------------
43 using System.Collections.Concurrent;
44 using System.Collections.Generic;
45 using System.Diagnostics.Contracts;
48 using System.Threading;
49 using System.Threading.Tasks;
50 using Pithos.Core.Agents;
54 public class Agent<TMessage> : IDisposable
// Queue that backs the blocking collection; it is also read directly by the emptiness check and by Remove.
56 private readonly ConcurrentQueue<TMessage> _queue;
// Blocking wrapper around _queue, used for posting and for timed, cancellable takes.
57 private readonly BlockingCollection<TMessage> _messages;
// Source of the agent-wide cancellation signal.
58 private readonly CancellationTokenSource _cancelSource = new CancellationTokenSource();
// Token of _cancelSource; it is passed to the tasks and blocking takes started by the agent.
// NOTE(review): this is a public, writable field - confirm no caller assigns it before making it read-only.
59 public CancellationToken CancellationToken;
// The message-processing callback; it is invoked with the agent itself as its argument.
61 private readonly Action<Agent<TMessage>> _process;
/// <summary>Creates the agent's message storage and publishes its cancellation token.</summary>
/// <param name="action">The message processing action</param>
/// <remarks>NOTE(review): the assignment of <paramref name="action"/> to _process is not visible in this excerpt - confirm it happens in this constructor.</remarks>
64 public Agent(Action<Agent<TMessage>> action)
// The blocking collection is layered over _queue, so both fields refer to the same stored items.
66 _queue=new ConcurrentQueue<TMessage>();
67 _messages = new BlockingCollection<TMessage>(_queue);
// Expose the token of the private cancellation source.
69 CancellationToken = _cancelSource.Token;
// Getter: true when the underlying concurrent queue currently holds no messages.
// NOTE(review): the property declaration itself is not visible in this excerpt.
74 get { return _queue.IsEmpty; }
/// <summary>Adds a message to the agent's blocking collection.</summary>
/// <param name="message">The message to add</param>
77 public void Post(TMessage message)
// BlockingCollection.Add throws InvalidOperationException once CompleteAdding has been called on the collection.
79 _messages.Add(message);
83 /// Receives a message asynchronously, optionally with a timeout. The returned task fails with a TimeoutException if no message could be taken
85 /// <param name="timeout">Optional timeout in milliseconds, passed unchanged to TryTake (the default is -1). If the take does not succeed, the task fails with a TimeoutException</param>
86 /// <returns>A Task that will return the message asynchronously</returns>
87 public Task<TMessage> Receive(int timeout = -1)
// The take is performed inside a new task rather than on the caller's thread.
89 return Task<TMessage>.Factory.StartNew(() =>
// TryTake reports failure by returning false; that result is converted into an exception here.
92 if (!_messages.TryTake(out item, timeout, CancellationToken))
93 throw new TimeoutException();
100 /// Receives a message asynchronously, optionally with a timeout. Unlike Receive, a failed take is not turned into a TimeoutException: the result of TryTake is ignored
102 /// <param name="timeout">Optional timeout in milliseconds, passed unchanged to TryTake (the default is -1)</param>
103 /// <returns>A Task that will return the message asynchronously. NOTE(review): the value produced after an unsuccessful take is not visible in this excerpt - presumably the default value of TMessage; confirm</returns>
104 public Task<TMessage> TryReceive(int timeout = -1)
// The take is performed inside a new task rather than on the caller's thread.
106 return Task<TMessage>.Factory.StartNew(() =>
// The boolean result is discarded, so an unsuccessful take is not reported as an error here.
109 _messages.TryTake(out item, timeout, CancellationToken);
// Runs the processing action on a new task, handing it this agent; the task is created with the agent's cancellation token.
// NOTE(review): the enclosing method's signature is not visible in this excerpt - presumably the instance Start method; confirm.
121 Task.Factory.StartNew(() => _process(this), CancellationToken);
126 /// Create and start a new agent for the specified type of message
128 /// <param name="action">The message processing action, forwarded to the Agent constructor</param>
129 /// <returns>A started Agent</returns>
130 public static Agent<TMessage> Start(Action<Agent<TMessage>> action)
// NOTE(review): only the construction is visible in this excerpt; the start call and the return statement presumably follow - confirm.
132 var agent = new Agent<TMessage>(action);
// Shutdown sequence: close the collection for adding first, then raise the cancellation signal.
// NOTE(review): the enclosing method's signature is not visible in this excerpt - presumably Stop; confirm.
142 //Mark the message collection as complete so that no further messages are accepted
143 _messages.CompleteAdding();
144 //And signal the cancellation to everything that was given the agent's token
145 _cancelSource.Cancel();
149 /// Execute an action asynchronously on a new task, passing the agent's cancellation token to the task factory
151 /// <param name="action">The action to execute; the contract requires it to be non-null</param>
152 public void DoAsync(Action action)
154 Contract.Requires(action!=null);
// The created task is neither returned nor stored, so callers cannot wait on it or observe its outcome through this method.
155 Task.Factory.StartNew(action, CancellationToken);
/// <summary>Parameterless dispose entry point of the agent (the class implements IDisposable).</summary>
164 public void Dispose()
// NOTE(review): the call that performs the actual cleanup is not visible in this excerpt - presumably Dispose(true); confirm.
// Finalization is suppressed for this instance.
167 GC.SuppressFinalize(this);
/// <summary>Cleanup routine that disposes the agent's cancellation source.</summary>
/// <param name="disposing">NOTE(review): how this flag is used is not visible in this excerpt - presumably it guards the cleanup below; confirm</param>
170 protected void Dispose(bool disposing)
// Release the cancellation source owned by this agent.
// NOTE(review): _messages is a BlockingCollection, which is IDisposable, and its disposal is not visible here - verify it is released as well.
176 _cancelSource.Dispose();
/// <summary>Returns a sequence of TMessage items.</summary>
/// <remarks>NOTE(review): the body is not visible in this excerpt, so which collection is returned and whether enumerating it consumes messages must be confirmed.</remarks>
180 public IEnumerable<TMessage> GetEnumerable()
186 /// Remove the first message that matches the predicate
188 /// <param name="predicate">The condition to match</param>
189 /// <remarks>Removes the first message that matches the predicate by dequeuing all
190 /// messages and re-enqueuing all except the first matching message</remarks>
191 public void Remove(Func<TMessage,bool> predicate)
193 //Can this work? Dequeue all items
194 //and then enqueue everything except the filtered items
// The work is delegated to RemoveFirst, which is not defined in this excerpt (presumably an extension method on ConcurrentQueue - confirm).
// NOTE(review): this modifies _queue directly while it is also wrapped by _messages; the BlockingCollection documentation
// warns against changing the underlying collection directly, so verify that this cannot corrupt _messages.
196 _queue.RemoveFirst(predicate);
/// <summary>Attaches a continuation to <paramref name="process"/> that schedules <paramref name="loop"/> as a new task and then examines the exception of the finished task.</summary>
/// <param name="process">The task to continue from; the contract requires it to be non-null</param>
/// <param name="loop">The action scheduled as a new task from inside the continuation; the contract requires it to be non-null</param>
/// <param name="onError">Optional error callback. NOTE(review): its invocation is not visible in this excerpt - confirm how it is used</param>
/// <returns>The continuation task</returns>
199 public Task LoopAsync(Task process, Action loop,Action<Exception> onError=null)
201 Contract.Requires(process!=null);
202 Contract.Requires(loop!=null);
204 return process.ContinueWith(t =>
// 'loop' is scheduled before any error handling for the finished task takes place.
207 Task.Factory.StartNew(loop, CancellationToken);
// Take the inner exception of the task's aggregate exception.
// NOTE(review): t.Exception is null for a task that did not fault - confirm that a fault check (not visible here) guards this line.
211 var ex = t.Exception.InnerException;
// A cancellation exception is singled out; the reaction to it is not visible in this excerpt.
212 if (ex is OperationCanceledException)
// The continuation itself is registered with the agent's cancellation token.
217 },CancellationToken);
/// <summary>Generic overload of LoopAsync for tasks that produce a value: attaches a continuation to <paramref name="process"/> that schedules <paramref name="loop"/> as a new task and then examines the exception of the finished task.</summary>
/// <param name="process">The value-producing task to continue from</param>
/// <param name="loop">The action scheduled as a new task from inside the continuation</param>
/// <param name="onError">Optional error callback. NOTE(review): its invocation is not visible in this excerpt - confirm how it is used</param>
/// <returns>The continuation task. NOTE(review): the value it produces is not visible in this excerpt - confirm</returns>
220 public Task<T> LoopAsync<T>(Task<T> process, Action loop,Action<Exception> onError=null)
222 return process.ContinueWith(t =>
224 //Spawn the Loop immediately
225 Task.Factory.StartNew(loop,CancellationToken);
226 //Then process possible exceptions
// NOTE(review): t.Exception is null for a task that did not fault - confirm that a fault check (not visible here) guards this line.
229 var ex = t.Exception.InnerException;
// A cancellation exception is singled out; the reaction to it is not visible in this excerpt.
230 if (ex is OperationCanceledException)
// The continuation itself is registered with the agent's cancellation token.
236 },CancellationToken);