e749f3327a05b3ae771b76907c1fb34efd2acba2
[pithos-ms-client] / trunk%2FPithos.Core%2FAgents%2FAgent.cs
1 #region
2 /* -----------------------------------------------------------------------
3  * <copyright file="Agent.cs" company="GRNet">
4  * 
5  * Copyright 2011-2012 GRNET S.A. All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or
8  * without modification, are permitted provided that the following
9  * conditions are met:
10  *
11  *   1. Redistributions of source code must retain the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer.
14  *
15  *   2. Redistributions in binary form must reproduce the above
16  *      copyright notice, this list of conditions and the following
17  *      disclaimer in the documentation and/or other materials
18  *      provided with the distribution.
19  *
20  *
21  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
22  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
25  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
28  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  *
34  * The views and conclusions contained in the software and
35  * documentation are those of the authors and should not be
36  * interpreted as representing official policies, either expressed
37  * or implied, of GRNET S.A.
38  * </copyright>
39  * -----------------------------------------------------------------------
40  */
41 #endregion
42 using System;
43 using System.Collections.Concurrent;
44 using System.Collections.Generic;
45 using System.Diagnostics.Contracts;
46 using System.Threading;
47 using System.Threading.Async;
48 using System.Threading.Tasks;
49 using Pithos.Core.Agents;
50
51 namespace Pithos.Core
52 {
53     public class Agent<TMessage> : IDisposable
54     {
55         private readonly ConcurrentQueue<TMessage> _queue;
56         //private readonly AsyncCollection<TMessage> _messages;
57         private readonly AsyncCollection<TMessage> _messages;
58         private readonly CancellationTokenSource _cancelSource = new CancellationTokenSource();
59         public CancellationToken CancellationToken;
60
61         private readonly Action<Agent<TMessage>> _process;
62
63
64         public Agent(Action<Agent<TMessage>> action)
65         {
66             _queue=new ConcurrentQueue<TMessage>();
67             _messages = new AsyncCollection<TMessage>(_queue);            
68             _process = action;
69             CancellationToken = _cancelSource.Token;
70         }
71
72         public bool IsEmpty
73         {
74             get { return _queue.IsEmpty; }
75         }
76
77         public void Post(TMessage message)
78         {
79             _messages.Add(message);
80         }
81
82         ConcurrentDictionary<TMessage,TaskCompletionSource<object>> _awaiters=new ConcurrentDictionary<TMessage,TaskCompletionSource<object>>();
83
84         public Task PostAndAwait(TMessage message)
85         {            
86             var tcs = new TaskCompletionSource<object>();
87             _awaiters[message] = tcs;
88             Post(message);
89             return tcs.Task;
90         }
91
92         /// <summary>
93         /// Receives a message asynchronously, optionally with a timeout. Receive throws a TimeoutException if the timeout expires
94         /// </summary>
95         /// <returns>A Task that will return the message asynchronously</returns>
96         public  Task<TMessage> Receive()
97         {
98             return _messages.Take();
99         }
100
101         public void NotifyComplete(TMessage message)
102         {
103             TaskCompletionSource<object> tcs;
104             if (_awaiters.TryRemove(message,out tcs))
105                 tcs.SetResult(null);
106         }
107
108
109
110         /// <summary>
111         /// Start the agent
112         /// </summary>
113         public void Start()
114         {
115             Task.Factory.StartNew(() => _process(this), CancellationToken);            
116         }
117
118
119         /// <summary>
120         /// Create and start a new agent for the specified type of message
121         /// </summary>
122         /// <param name="action">The message processing action</param>
123         /// <returns>A started Agent</returns>
124         public static Agent<TMessage> Start(Action<Agent<TMessage>> action)
125         {
126             var agent = new Agent<TMessage>(action);
127             agent.Start();
128             return agent;
129         }
130
131         /// <summary>
132         /// Stops the agent 
133         /// </summary>
134         public void Stop()
135         {
136             //Stop the message queue
137             //_messages.CompleteAdding();
138             //And signal the cancellation
139             _cancelSource.Cancel();
140         }
141
142         /// <summary>
143         /// Execute an action asynchronously, using the agent's cancellation source
144         /// </summary>
145         /// <param name="action">The action to execute</param>
146         public void DoAsync(Action action)
147         {
148             Contract.Requires(action!=null);
149             Task.Factory.StartNew(action, CancellationToken);
150         }
151
152
153         ~Agent()
154         {
155             Dispose(false);
156         }
157
158         public void Dispose()
159         {
160             Dispose(true);
161             GC.SuppressFinalize(this);
162         }
163
164         protected void Dispose(bool disposing)
165         {
166             if (disposing)
167             {
168                 Stop();
169                 _cancelSource.Dispose();
170             }
171         }
172
173         public IEnumerable<TMessage> GetEnumerable()
174         {
175             return _queue;
176         }
177
178         /// <summary>
179         /// Remove the first message that matches the predicate
180         /// </summary>
181         /// <param name="predicate">The condition to match</param>
182         /// <remarks>Removes the first message that matches the predicate by dequeing all 
183         /// messages and re-enqueing all except the first matching message</remarks>
184         public void Remove(Func<TMessage,bool> predicate)
185         {
186             //Can this work? Dequeue all items 
187             //and then enqueue everything except the filtered items
188
189             _queue.RemoveFirst(predicate);
190         }
191
192         public Task LoopAsync(Task process, Action loop,Action<Exception> onError=null)
193         {
194             Contract.Requires(process!=null);
195             Contract.Requires(loop!=null);
196
197             return process.ContinueWith(t =>
198             {   
199              
200                 Task.Factory.StartNew(loop, CancellationToken);
201
202                 if (t.IsFaulted)
203                 {
204                     var ex = t.Exception.InnerException;
205                     if (ex is OperationCanceledException)
206                         Stop();
207                     if (onError != null)
208                         onError(ex);
209                 }
210             },CancellationToken);
211         }
212
213         public Task<T> LoopAsync<T>(Task<T> process, Action loop,Action<Exception> onError=null)
214         {
215             return process.ContinueWith(t =>
216             {   
217                 //Spawn the Loop immediatelly
218                 Task.Factory.StartNew(loop,CancellationToken);
219                 //Then process possible exceptions
220                 if (t.IsFaulted)
221                 {
222                     var ex = t.Exception.InnerException;
223                     if (ex is OperationCanceledException)
224                         Stop();
225                     if (onError != null)
226                         onError(ex);
227                 }
228                 return default(T);
229             });
230         }
231     }
232 }