root / trunk / Pithos.Core / Agents / Agent.cs @ dccd340f
History | View | Annotate | Download (7.5 kB)
1 | 255f5f86 | Panagiotis Kanavos | #region |
---|---|---|---|
2 | 255f5f86 | Panagiotis Kanavos | /* ----------------------------------------------------------------------- |
3 | 255f5f86 | Panagiotis Kanavos | * <copyright file="Agent.cs" company="GRNet"> |
4 | 255f5f86 | Panagiotis Kanavos | * |
5 | 255f5f86 | Panagiotis Kanavos | * Copyright 2011-2012 GRNET S.A. All rights reserved. |
6 | 255f5f86 | Panagiotis Kanavos | * |
7 | 255f5f86 | Panagiotis Kanavos | * Redistribution and use in source and binary forms, with or |
8 | 255f5f86 | Panagiotis Kanavos | * without modification, are permitted provided that the following |
9 | 255f5f86 | Panagiotis Kanavos | * conditions are met: |
10 | 255f5f86 | Panagiotis Kanavos | * |
11 | 255f5f86 | Panagiotis Kanavos | * 1. Redistributions of source code must retain the above |
12 | 255f5f86 | Panagiotis Kanavos | * copyright notice, this list of conditions and the following |
13 | 255f5f86 | Panagiotis Kanavos | * disclaimer. |
14 | 255f5f86 | Panagiotis Kanavos | * |
15 | 255f5f86 | Panagiotis Kanavos | * 2. Redistributions in binary form must reproduce the above |
16 | 255f5f86 | Panagiotis Kanavos | * copyright notice, this list of conditions and the following |
17 | 255f5f86 | Panagiotis Kanavos | * disclaimer in the documentation and/or other materials |
18 | 255f5f86 | Panagiotis Kanavos | * provided with the distribution. |
19 | 255f5f86 | Panagiotis Kanavos | * |
20 | 255f5f86 | Panagiotis Kanavos | * |
21 | 255f5f86 | Panagiotis Kanavos | * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
22 | 255f5f86 | Panagiotis Kanavos | * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
23 | 255f5f86 | Panagiotis Kanavos | * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
24 | 255f5f86 | Panagiotis Kanavos | * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
25 | 255f5f86 | Panagiotis Kanavos | * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
26 | 255f5f86 | Panagiotis Kanavos | * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
27 | 255f5f86 | Panagiotis Kanavos | * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
28 | 255f5f86 | Panagiotis Kanavos | * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
29 | 255f5f86 | Panagiotis Kanavos | * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
30 | 255f5f86 | Panagiotis Kanavos | * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
31 | 255f5f86 | Panagiotis Kanavos | * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
32 | 255f5f86 | Panagiotis Kanavos | * POSSIBILITY OF SUCH DAMAGE. |
33 | 255f5f86 | Panagiotis Kanavos | * |
34 | 255f5f86 | Panagiotis Kanavos | * The views and conclusions contained in the software and |
35 | 255f5f86 | Panagiotis Kanavos | * documentation are those of the authors and should not be |
36 | 255f5f86 | Panagiotis Kanavos | * interpreted as representing official policies, either expressed |
37 | 255f5f86 | Panagiotis Kanavos | * or implied, of GRNET S.A. |
38 | 255f5f86 | Panagiotis Kanavos | * </copyright> |
39 | 255f5f86 | Panagiotis Kanavos | * ----------------------------------------------------------------------- |
40 | 255f5f86 | Panagiotis Kanavos | */ |
41 | 255f5f86 | Panagiotis Kanavos | #endregion |
42 | 255f5f86 | Panagiotis Kanavos | using System; |
43 | 9c4346c9 | Panagiotis Kanavos | using System.Collections.Concurrent; |
44 | 9c4346c9 | Panagiotis Kanavos | using System.Collections.Generic; |
45 | cfed7823 | Panagiotis Kanavos | using System.Diagnostics.Contracts; |
46 | 9c4346c9 | Panagiotis Kanavos | using System.Threading; |
47 | ec1a1baf | Panagiotis Kanavos | using System.Threading.Async; |
48 | 9c4346c9 | Panagiotis Kanavos | using System.Threading.Tasks; |
49 | f3d080df | Panagiotis Kanavos | using Pithos.Core.Agents; |
50 | 9c4346c9 | Panagiotis Kanavos | |
51 | 9c4346c9 | Panagiotis Kanavos | namespace Pithos.Core |
52 | 9c4346c9 | Panagiotis Kanavos | { |
53 | 9c4346c9 | Panagiotis Kanavos | public class Agent<TMessage> : IDisposable |
54 | f3d080df | Panagiotis Kanavos | { |
55 | f3d080df | Panagiotis Kanavos | private readonly ConcurrentQueue<TMessage> _queue; |
56 | 174bbb6e | Panagiotis Kanavos | //private readonly AsyncCollection<TMessage> _messages; |
57 | 174bbb6e | Panagiotis Kanavos | private readonly AsyncCollection<TMessage> _messages; |
58 | 9c4346c9 | Panagiotis Kanavos | private readonly CancellationTokenSource _cancelSource = new CancellationTokenSource(); |
59 | 9c4346c9 | Panagiotis Kanavos | public CancellationToken CancellationToken; |
60 | 9c4346c9 | Panagiotis Kanavos | |
61 | 9c4346c9 | Panagiotis Kanavos | private readonly Action<Agent<TMessage>> _process; |
62 | 9c4346c9 | Panagiotis Kanavos | |
63 | 9c4346c9 | Panagiotis Kanavos | |
64 | 9c4346c9 | Panagiotis Kanavos | public Agent(Action<Agent<TMessage>> action) |
65 | 9c4346c9 | Panagiotis Kanavos | { |
66 | f3d080df | Panagiotis Kanavos | _queue=new ConcurrentQueue<TMessage>(); |
67 | 174bbb6e | Panagiotis Kanavos | _messages = new AsyncCollection<TMessage>(_queue); |
68 | 9c4346c9 | Panagiotis Kanavos | _process = action; |
69 | 9c4346c9 | Panagiotis Kanavos | CancellationToken = _cancelSource.Token; |
70 | 9c4346c9 | Panagiotis Kanavos | } |
71 | 9c4346c9 | Panagiotis Kanavos | |
72 | aa7ac00e | Panagiotis Kanavos | public bool IsEmpty |
73 | aa7ac00e | Panagiotis Kanavos | { |
74 | aa7ac00e | Panagiotis Kanavos | get { return _queue.IsEmpty; } |
75 | aa7ac00e | Panagiotis Kanavos | } |
76 | aa7ac00e | Panagiotis Kanavos | |
77 | 9c4346c9 | Panagiotis Kanavos | public void Post(TMessage message) |
78 | 9c4346c9 | Panagiotis Kanavos | { |
79 | 9c4346c9 | Panagiotis Kanavos | _messages.Add(message); |
80 | 9c4346c9 | Panagiotis Kanavos | } |
81 | 9c4346c9 | Panagiotis Kanavos | |
82 | 174bbb6e | Panagiotis Kanavos | ConcurrentDictionary<TMessage,TaskCompletionSource<object>> _awaiters=new ConcurrentDictionary<TMessage,TaskCompletionSource<object>>(); |
83 | 174bbb6e | Panagiotis Kanavos | |
84 | 174bbb6e | Panagiotis Kanavos | public Task PostAndAwait(TMessage message) |
85 | 174bbb6e | Panagiotis Kanavos | { |
86 | 174bbb6e | Panagiotis Kanavos | var tcs = new TaskCompletionSource<object>(); |
87 | 174bbb6e | Panagiotis Kanavos | _awaiters[message] = tcs; |
88 | 174bbb6e | Panagiotis Kanavos | Post(message); |
89 | 174bbb6e | Panagiotis Kanavos | return tcs.Task; |
90 | 174bbb6e | Panagiotis Kanavos | } |
91 | 174bbb6e | Panagiotis Kanavos | |
92 | aba9e6d9 | Panagiotis Kanavos | /// <summary> |
93 | aba9e6d9 | Panagiotis Kanavos | /// Receives a message asynchronously, optionally with a timeout. Receive throws a TimeoutException if the timeout expires |
94 | aba9e6d9 | Panagiotis Kanavos | /// </summary> |
95 | aba9e6d9 | Panagiotis Kanavos | /// <returns>A Task that will return the message asynchronously</returns> |
96 | ec1a1baf | Panagiotis Kanavos | public Task<TMessage> Receive() |
97 | 9c4346c9 | Panagiotis Kanavos | { |
98 | ec1a1baf | Panagiotis Kanavos | return _messages.Take(); |
99 | 9c4346c9 | Panagiotis Kanavos | } |
100 | 9c4346c9 | Panagiotis Kanavos | |
101 | 174bbb6e | Panagiotis Kanavos | public void NotifyComplete(TMessage message) |
102 | 174bbb6e | Panagiotis Kanavos | { |
103 | 174bbb6e | Panagiotis Kanavos | TaskCompletionSource<object> tcs; |
104 | 174bbb6e | Panagiotis Kanavos | if (_awaiters.TryRemove(message,out tcs)) |
105 | 174bbb6e | Panagiotis Kanavos | tcs.SetResult(null); |
106 | 174bbb6e | Panagiotis Kanavos | } |
107 | 174bbb6e | Panagiotis Kanavos | |
108 | 9c4346c9 | Panagiotis Kanavos | |
109 | 9c4346c9 | Panagiotis Kanavos | |
110 | aba9e6d9 | Panagiotis Kanavos | /// <summary> |
111 | aba9e6d9 | Panagiotis Kanavos | /// Start the agent |
112 | aba9e6d9 | Panagiotis Kanavos | /// </summary> |
113 | 9c4346c9 | Panagiotis Kanavos | public void Start() |
114 | 9c4346c9 | Panagiotis Kanavos | { |
115 | a27aa447 | Panagiotis Kanavos | Task.Factory.StartNew(() => _process(this), CancellationToken); |
116 | 9c4346c9 | Panagiotis Kanavos | } |
117 | 9c4346c9 | Panagiotis Kanavos | |
118 | 9c4346c9 | Panagiotis Kanavos | |
119 | aba9e6d9 | Panagiotis Kanavos | /// <summary> |
120 | aba9e6d9 | Panagiotis Kanavos | /// Create and start a new agent for the specified type of message |
121 | aba9e6d9 | Panagiotis Kanavos | /// </summary> |
122 | aba9e6d9 | Panagiotis Kanavos | /// <param name="action">The message processing action</param> |
123 | aba9e6d9 | Panagiotis Kanavos | /// <returns>A started Agent</returns> |
124 | 9c4346c9 | Panagiotis Kanavos | public static Agent<TMessage> Start(Action<Agent<TMessage>> action) |
125 | 9c4346c9 | Panagiotis Kanavos | { |
126 | 9c4346c9 | Panagiotis Kanavos | var agent = new Agent<TMessage>(action); |
127 | 9c4346c9 | Panagiotis Kanavos | agent.Start(); |
128 | 9c4346c9 | Panagiotis Kanavos | return agent; |
129 | 9c4346c9 | Panagiotis Kanavos | } |
130 | 9c4346c9 | Panagiotis Kanavos | |
131 | aba9e6d9 | Panagiotis Kanavos | /// <summary> |
132 | aba9e6d9 | Panagiotis Kanavos | /// Stops the agent |
133 | aba9e6d9 | Panagiotis Kanavos | /// </summary> |
134 | 9c4346c9 | Panagiotis Kanavos | public void Stop() |
135 | 9c4346c9 | Panagiotis Kanavos | { |
136 | aba9e6d9 | Panagiotis Kanavos | //Stop the message queue |
137 | ec1a1baf | Panagiotis Kanavos | //_messages.CompleteAdding(); |
138 | aba9e6d9 | Panagiotis Kanavos | //And signal the cancellation |
139 | 9c4346c9 | Panagiotis Kanavos | _cancelSource.Cancel(); |
140 | 9c4346c9 | Panagiotis Kanavos | } |
141 | 9c4346c9 | Panagiotis Kanavos | |
142 | aba9e6d9 | Panagiotis Kanavos | /// <summary> |
143 | aba9e6d9 | Panagiotis Kanavos | /// Execute an action asynchronously, using the agent's cancellation source |
144 | aba9e6d9 | Panagiotis Kanavos | /// </summary> |
145 | aba9e6d9 | Panagiotis Kanavos | /// <param name="action">The action to execute</param> |
146 | 9c4346c9 | Panagiotis Kanavos | public void DoAsync(Action action) |
147 | 9c4346c9 | Panagiotis Kanavos | { |
148 | cfed7823 | Panagiotis Kanavos | Contract.Requires(action!=null); |
149 | 9c4346c9 | Panagiotis Kanavos | Task.Factory.StartNew(action, CancellationToken); |
150 | 9c4346c9 | Panagiotis Kanavos | } |
151 | 9c4346c9 | Panagiotis Kanavos | |
152 | 9c4346c9 | Panagiotis Kanavos | |
153 | 9c4346c9 | Panagiotis Kanavos | ~Agent() |
154 | 9c4346c9 | Panagiotis Kanavos | { |
155 | 9c4346c9 | Panagiotis Kanavos | Dispose(false); |
156 | 9c4346c9 | Panagiotis Kanavos | } |
157 | 9c4346c9 | Panagiotis Kanavos | |
158 | 9c4346c9 | Panagiotis Kanavos | public void Dispose() |
159 | 9c4346c9 | Panagiotis Kanavos | { |
160 | 9c4346c9 | Panagiotis Kanavos | Dispose(true); |
161 | 9c4346c9 | Panagiotis Kanavos | GC.SuppressFinalize(this); |
162 | 9c4346c9 | Panagiotis Kanavos | } |
163 | 9c4346c9 | Panagiotis Kanavos | |
164 | 9c4346c9 | Panagiotis Kanavos | protected void Dispose(bool disposing) |
165 | 9c4346c9 | Panagiotis Kanavos | { |
166 | 9c4346c9 | Panagiotis Kanavos | if (disposing) |
167 | 9c4346c9 | Panagiotis Kanavos | { |
168 | 9c4346c9 | Panagiotis Kanavos | Stop(); |
169 | 9c4346c9 | Panagiotis Kanavos | _cancelSource.Dispose(); |
170 | 9c4346c9 | Panagiotis Kanavos | } |
171 | 9c4346c9 | Panagiotis Kanavos | } |
172 | 9c4346c9 | Panagiotis Kanavos | |
173 | 9c4346c9 | Panagiotis Kanavos | public IEnumerable<TMessage> GetEnumerable() |
174 | 9c4346c9 | Panagiotis Kanavos | { |
175 | ec1a1baf | Panagiotis Kanavos | return _queue; |
176 | 9c4346c9 | Panagiotis Kanavos | } |
177 | a27aa447 | Panagiotis Kanavos | |
178 | f3d080df | Panagiotis Kanavos | /// <summary> |
179 | f3d080df | Panagiotis Kanavos | /// Remove the first message that matches the predicate |
180 | f3d080df | Panagiotis Kanavos | /// </summary> |
181 | f3d080df | Panagiotis Kanavos | /// <param name="predicate">The condition to match</param> |
182 | f3d080df | Panagiotis Kanavos | /// <remarks>Removes the first message that matches the predicate by dequeing all |
183 | f3d080df | Panagiotis Kanavos | /// messages and re-enqueing all except the first matching message</remarks> |
184 | f3d080df | Panagiotis Kanavos | public void Remove(Func<TMessage,bool> predicate) |
185 | f3d080df | Panagiotis Kanavos | { |
186 | f3d080df | Panagiotis Kanavos | //Can this work? Dequeue all items |
187 | f3d080df | Panagiotis Kanavos | //and then enqueue everything except the filtered items |
188 | f3d080df | Panagiotis Kanavos | |
189 | f3d080df | Panagiotis Kanavos | _queue.RemoveFirst(predicate); |
190 | f3d080df | Panagiotis Kanavos | } |
191 | aba9e6d9 | Panagiotis Kanavos | |
192 | a27aa447 | Panagiotis Kanavos | public Task LoopAsync(Task process, Action loop,Action<Exception> onError=null) |
193 | a27aa447 | Panagiotis Kanavos | { |
194 | cfed7823 | Panagiotis Kanavos | Contract.Requires(process!=null); |
195 | cfed7823 | Panagiotis Kanavos | Contract.Requires(loop!=null); |
196 | cfed7823 | Panagiotis Kanavos | |
197 | a27aa447 | Panagiotis Kanavos | return process.ContinueWith(t => |
198 | a27aa447 | Panagiotis Kanavos | { |
199 | a27aa447 | Panagiotis Kanavos | |
200 | a27aa447 | Panagiotis Kanavos | Task.Factory.StartNew(loop, CancellationToken); |
201 | a27aa447 | Panagiotis Kanavos | |
202 | a27aa447 | Panagiotis Kanavos | if (t.IsFaulted) |
203 | a27aa447 | Panagiotis Kanavos | { |
204 | a27aa447 | Panagiotis Kanavos | var ex = t.Exception.InnerException; |
205 | a27aa447 | Panagiotis Kanavos | if (ex is OperationCanceledException) |
206 | a27aa447 | Panagiotis Kanavos | Stop(); |
207 | a27aa447 | Panagiotis Kanavos | if (onError != null) |
208 | a27aa447 | Panagiotis Kanavos | onError(ex); |
209 | a27aa447 | Panagiotis Kanavos | } |
210 | a64c87c8 | Panagiotis Kanavos | },CancellationToken); |
211 | a64c87c8 | Panagiotis Kanavos | } |
212 | a27aa447 | Panagiotis Kanavos | |
213 | a64c87c8 | Panagiotis Kanavos | public Task<T> LoopAsync<T>(Task<T> process, Action loop,Action<Exception> onError=null) |
214 | a64c87c8 | Panagiotis Kanavos | { |
215 | a64c87c8 | Panagiotis Kanavos | return process.ContinueWith(t => |
216 | a64c87c8 | Panagiotis Kanavos | { |
217 | a64c87c8 | Panagiotis Kanavos | //Spawn the Loop immediatelly |
218 | a64c87c8 | Panagiotis Kanavos | Task.Factory.StartNew(loop,CancellationToken); |
219 | a64c87c8 | Panagiotis Kanavos | //Then process possible exceptions |
220 | a64c87c8 | Panagiotis Kanavos | if (t.IsFaulted) |
221 | a64c87c8 | Panagiotis Kanavos | { |
222 | a64c87c8 | Panagiotis Kanavos | var ex = t.Exception.InnerException; |
223 | a64c87c8 | Panagiotis Kanavos | if (ex is OperationCanceledException) |
224 | a64c87c8 | Panagiotis Kanavos | Stop(); |
225 | a64c87c8 | Panagiotis Kanavos | if (onError != null) |
226 | a64c87c8 | Panagiotis Kanavos | onError(ex); |
227 | a64c87c8 | Panagiotis Kanavos | } |
228 | a64c87c8 | Panagiotis Kanavos | return default(T); |
229 | 174bbb6e | Panagiotis Kanavos | }); |
230 | a27aa447 | Panagiotis Kanavos | } |
231 | 9c4346c9 | Panagiotis Kanavos | } |
232 | 9c4346c9 | Panagiotis Kanavos | } |