Preliminary fix for #1999, incorrect deletions
[pithos-ms-client] / trunk / Pithos.Core / Agents / Agent.cs
1 using System;
2 using System.Collections.Concurrent;
3 using System.Collections.Generic;
4 using System.Diagnostics.Contracts;
5 using System.Linq;
6 using System.Text;
7 using System.Threading;
8 using System.Threading.Tasks;
9 using Pithos.Core.Agents;
10
11 namespace Pithos.Core
12 {
13     public class Agent<TMessage> : IDisposable
14     {
15         private readonly ConcurrentQueue<TMessage> _queue;
16         private readonly BlockingCollection<TMessage> _messages;
17         private readonly CancellationTokenSource _cancelSource = new CancellationTokenSource();
18         public CancellationToken CancellationToken;
19
20         private readonly Action<Agent<TMessage>> _process;
21
22
23         public Agent(Action<Agent<TMessage>> action)
24         {
25             _queue=new ConcurrentQueue<TMessage>();
26             _messages = new BlockingCollection<TMessage>(_queue);
27             _process = action;
28             CancellationToken = _cancelSource.Token;
29         }
30
31         public void Post(TMessage message)
32         {
33             _messages.Add(message);
34         }
35
36         /// <summary>
37         /// Receives a message asynchronously, optionally with a timeout. Receive throws a TimeoutException if the timeout expires
38         /// </summary>
39         /// <param name="timeout">Optional timeout in milliseconds. If provided, Receive fails with a TimeoutException if no message is available in the specified time</param>
40         /// <returns>A Task that will return the message asynchronously</returns>
41         public Task<TMessage> Receive(int timeout = -1)
42         {
43             return Task<TMessage>.Factory.StartNew(() =>
44             {
45                 TMessage item;
46                 if (!_messages.TryTake(out item, timeout, CancellationToken))
47                     throw new TimeoutException();
48                 return item;
49             });
50         }
51
52
53         /// <summary>
54         /// Receives a message asynchronously, optionally with a timeout. TryReceive returns an empty task if the timeout expires
55         /// </summary>
56         /// <param name="timeout">Optional timeout in milliseconds. If provided, Receive returns an empty task</param>
57         /// <returns>A Task that will return the message asynchronously</returns>
58         public Task<TMessage> TryReceive(int timeout = -1)
59         {
60             return Task<TMessage>.Factory.StartNew(() =>
61             {
62                 TMessage item;
63                 _messages.TryTake(out item, timeout, CancellationToken);
64                 return item;
65             });
66         }
67
68
69
70         /// <summary>
71         /// Start the agent
72         /// </summary>
73         public void Start()
74         {
75             Task.Factory.StartNew(() => _process(this), CancellationToken);            
76         }
77
78
79         /// <summary>
80         /// Create and start a new agent for the specified type of message
81         /// </summary>
82         /// <param name="action">The message processing action</param>
83         /// <returns>A started Agent</returns>
84         public static Agent<TMessage> Start(Action<Agent<TMessage>> action)
85         {
86             var agent = new Agent<TMessage>(action);
87             agent.Start();
88             return agent;
89         }
90
91         /// <summary>
92         /// Stops the agent 
93         /// </summary>
94         public void Stop()
95         {
96             //Stop the message queue
97             _messages.CompleteAdding();
98             //And signal the cancellation
99             _cancelSource.Cancel();
100         }
101
102         /// <summary>
103         /// Execute an action asynchronously, using the agent's cancellation source
104         /// </summary>
105         /// <param name="action">The action to execute</param>
106         public void DoAsync(Action action)
107         {
108             Contract.Requires(action!=null);
109             Task.Factory.StartNew(action, CancellationToken);
110         }
111
112
113         ~Agent()
114         {
115             Dispose(false);
116         }
117
118         public void Dispose()
119         {
120             Dispose(true);
121             GC.SuppressFinalize(this);
122         }
123
124         protected void Dispose(bool disposing)
125         {
126             if (disposing)
127             {
128                 Stop();
129                 _messages.Dispose();
130                 _cancelSource.Dispose();
131             }
132         }
133
134         public IEnumerable<TMessage> GetEnumerable()
135         {
136             return _messages;
137         }
138
139         /// <summary>
140         /// Remove the first message that matches the predicate
141         /// </summary>
142         /// <param name="predicate">The condition to match</param>
143         /// <remarks>Removes the first message that matches the predicate by dequeing all 
144         /// messages and re-enqueing all except the first matching message</remarks>
145         public void Remove(Func<TMessage,bool> predicate)
146         {
147             //Can this work? Dequeue all items 
148             //and then enqueue everything except the filtered items
149
150             _queue.RemoveFirst(predicate);
151         }
152
153         public Task LoopAsync(Task process, Action loop,Action<Exception> onError=null)
154         {
155             Contract.Requires(process!=null);
156             Contract.Requires(loop!=null);
157
158             return process.ContinueWith(t =>
159             {   
160              
161                 Task.Factory.StartNew(loop, CancellationToken);
162
163                 if (t.IsFaulted)
164                 {
165                     var ex = t.Exception.InnerException;
166                     if (ex is OperationCanceledException)
167                         Stop();
168                     if (onError != null)
169                         onError(ex);
170                 }
171             },CancellationToken);
172         }
173
174         public Task<T> LoopAsync<T>(Task<T> process, Action loop,Action<Exception> onError=null)
175         {
176             return process.ContinueWith(t =>
177             {   
178                 //Spawn the Loop immediatelly
179                 Task.Factory.StartNew(loop,CancellationToken);
180                 //Then process possible exceptions
181                 if (t.IsFaulted)
182                 {
183                     var ex = t.Exception.InnerException;
184                     if (ex is OperationCanceledException)
185                         Stop();
186                     if (onError != null)
187                         onError(ex);
188                 }
189                 return default(T);
190             },CancellationToken);
191         }
192     }
193 }