SyncFiles method modified to detect conflicts as per #2096
[pithos-ms-client] / trunk / Pithos.Core / Agents / Agent.cs
1 #region
2 /* -----------------------------------------------------------------------
3  * <copyright file="Agent.cs" company="GRNet">
4  * 
5  * Copyright 2011-2012 GRNET S.A. All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or
8  * without modification, are permitted provided that the following
9  * conditions are met:
10  *
11  *   1. Redistributions of source code must retain the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer.
14  *
15  *   2. Redistributions in binary form must reproduce the above
16  *      copyright notice, this list of conditions and the following
17  *      disclaimer in the documentation and/or other materials
18  *      provided with the distribution.
19  *
20  *
21  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
22  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
25  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
28  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  *
34  * The views and conclusions contained in the software and
35  * documentation are those of the authors and should not be
36  * interpreted as representing official policies, either expressed
37  * or implied, of GRNET S.A.
38  * </copyright>
39  * -----------------------------------------------------------------------
40  */
41 #endregion
42 using System;
43 using System.Collections.Concurrent;
44 using System.Collections.Generic;
45 using System.Diagnostics.Contracts;
46 using System.Linq;
47 using System.Text;
48 using System.Threading;
49 using System.Threading.Tasks;
50 using Pithos.Core.Agents;
51
52 namespace Pithos.Core
53 {
54     public class Agent<TMessage> : IDisposable
55     {
56         private readonly ConcurrentQueue<TMessage> _queue;
57         private readonly BlockingCollection<TMessage> _messages;
58         private readonly CancellationTokenSource _cancelSource = new CancellationTokenSource();
59         public CancellationToken CancellationToken;
60
61         private readonly Action<Agent<TMessage>> _process;
62
63
64         public Agent(Action<Agent<TMessage>> action)
65         {
66             _queue=new ConcurrentQueue<TMessage>();
67             _messages = new BlockingCollection<TMessage>(_queue);
68             _process = action;
69             CancellationToken = _cancelSource.Token;
70         }
71
72         public bool IsEmpty
73         {
74             get { return _queue.IsEmpty; }
75         }
76
77         public void Post(TMessage message)
78         {
79             _messages.Add(message);
80         }
81
82         /// <summary>
83         /// Receives a message asynchronously, optionally with a timeout. Receive throws a TimeoutException if the timeout expires
84         /// </summary>
85         /// <param name="timeout">Optional timeout in milliseconds. If provided, Receive fails with a TimeoutException if no message is available in the specified time</param>
86         /// <returns>A Task that will return the message asynchronously</returns>
87         public Task<TMessage> Receive(int timeout = -1)
88         {
89             return Task<TMessage>.Factory.StartNew(() =>
90             {
91                 TMessage item;
92                 if (!_messages.TryTake(out item, timeout, CancellationToken))
93                     throw new TimeoutException();
94                 return item;
95             });
96         }
97
98
99         /// <summary>
100         /// Receives a message asynchronously, optionally with a timeout. TryReceive returns an empty task if the timeout expires
101         /// </summary>
102         /// <param name="timeout">Optional timeout in milliseconds. If provided, Receive returns an empty task</param>
103         /// <returns>A Task that will return the message asynchronously</returns>
104         public Task<TMessage> TryReceive(int timeout = -1)
105         {
106             return Task<TMessage>.Factory.StartNew(() =>
107             {
108                 TMessage item;
109                 _messages.TryTake(out item, timeout, CancellationToken);
110                 return item;
111             });
112         }
113
114
115
116         /// <summary>
117         /// Start the agent
118         /// </summary>
119         public void Start()
120         {
121             Task.Factory.StartNew(() => _process(this), CancellationToken);            
122         }
123
124
125         /// <summary>
126         /// Create and start a new agent for the specified type of message
127         /// </summary>
128         /// <param name="action">The message processing action</param>
129         /// <returns>A started Agent</returns>
130         public static Agent<TMessage> Start(Action<Agent<TMessage>> action)
131         {
132             var agent = new Agent<TMessage>(action);
133             agent.Start();
134             return agent;
135         }
136
137         /// <summary>
138         /// Stops the agent 
139         /// </summary>
140         public void Stop()
141         {
142             //Stop the message queue
143             _messages.CompleteAdding();
144             //And signal the cancellation
145             _cancelSource.Cancel();
146         }
147
148         /// <summary>
149         /// Execute an action asynchronously, using the agent's cancellation source
150         /// </summary>
151         /// <param name="action">The action to execute</param>
152         public void DoAsync(Action action)
153         {
154             Contract.Requires(action!=null);
155             Task.Factory.StartNew(action, CancellationToken);
156         }
157
158
159         ~Agent()
160         {
161             Dispose(false);
162         }
163
164         public void Dispose()
165         {
166             Dispose(true);
167             GC.SuppressFinalize(this);
168         }
169
170         protected void Dispose(bool disposing)
171         {
172             if (disposing)
173             {
174                 Stop();
175                 _messages.Dispose();
176                 _cancelSource.Dispose();
177             }
178         }
179
180         public IEnumerable<TMessage> GetEnumerable()
181         {
182             return _messages;
183         }
184
185         /// <summary>
186         /// Remove the first message that matches the predicate
187         /// </summary>
188         /// <param name="predicate">The condition to match</param>
189         /// <remarks>Removes the first message that matches the predicate by dequeing all 
190         /// messages and re-enqueing all except the first matching message</remarks>
191         public void Remove(Func<TMessage,bool> predicate)
192         {
193             //Can this work? Dequeue all items 
194             //and then enqueue everything except the filtered items
195
196             _queue.RemoveFirst(predicate);
197         }
198
199         public Task LoopAsync(Task process, Action loop,Action<Exception> onError=null)
200         {
201             Contract.Requires(process!=null);
202             Contract.Requires(loop!=null);
203
204             return process.ContinueWith(t =>
205             {   
206              
207                 Task.Factory.StartNew(loop, CancellationToken);
208
209                 if (t.IsFaulted)
210                 {
211                     var ex = t.Exception.InnerException;
212                     if (ex is OperationCanceledException)
213                         Stop();
214                     if (onError != null)
215                         onError(ex);
216                 }
217             },CancellationToken);
218         }
219
220         public Task<T> LoopAsync<T>(Task<T> process, Action loop,Action<Exception> onError=null)
221         {
222             return process.ContinueWith(t =>
223             {   
224                 //Spawn the Loop immediatelly
225                 Task.Factory.StartNew(loop,CancellationToken);
226                 //Then process possible exceptions
227                 if (t.IsFaulted)
228                 {
229                     var ex = t.Exception.InnerException;
230                     if (ex is OperationCanceledException)
231                         Stop();
232                     if (onError != null)
233                         onError(ex);
234                 }
235                 return default(T);
236             },CancellationToken);
237         }
238     }
239 }