Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / Agent.cs @ 2341c603

History | View | Annotate | Download (7.7 kB)

1 255f5f86 Panagiotis Kanavos
#region
2 255f5f86 Panagiotis Kanavos
/* -----------------------------------------------------------------------
3 255f5f86 Panagiotis Kanavos
 * <copyright file="Agent.cs" company="GRNet">
4 255f5f86 Panagiotis Kanavos
 * 
5 255f5f86 Panagiotis Kanavos
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
6 255f5f86 Panagiotis Kanavos
 *
7 255f5f86 Panagiotis Kanavos
 * Redistribution and use in source and binary forms, with or
8 255f5f86 Panagiotis Kanavos
 * without modification, are permitted provided that the following
9 255f5f86 Panagiotis Kanavos
 * conditions are met:
10 255f5f86 Panagiotis Kanavos
 *
11 255f5f86 Panagiotis Kanavos
 *   1. Redistributions of source code must retain the above
12 255f5f86 Panagiotis Kanavos
 *      copyright notice, this list of conditions and the following
13 255f5f86 Panagiotis Kanavos
 *      disclaimer.
14 255f5f86 Panagiotis Kanavos
 *
15 255f5f86 Panagiotis Kanavos
 *   2. Redistributions in binary form must reproduce the above
16 255f5f86 Panagiotis Kanavos
 *      copyright notice, this list of conditions and the following
17 255f5f86 Panagiotis Kanavos
 *      disclaimer in the documentation and/or other materials
18 255f5f86 Panagiotis Kanavos
 *      provided with the distribution.
19 255f5f86 Panagiotis Kanavos
 *
20 255f5f86 Panagiotis Kanavos
 *
21 255f5f86 Panagiotis Kanavos
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
22 255f5f86 Panagiotis Kanavos
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23 255f5f86 Panagiotis Kanavos
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24 255f5f86 Panagiotis Kanavos
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
25 255f5f86 Panagiotis Kanavos
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 255f5f86 Panagiotis Kanavos
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 255f5f86 Panagiotis Kanavos
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
28 255f5f86 Panagiotis Kanavos
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 255f5f86 Panagiotis Kanavos
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 255f5f86 Panagiotis Kanavos
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 255f5f86 Panagiotis Kanavos
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 255f5f86 Panagiotis Kanavos
 * POSSIBILITY OF SUCH DAMAGE.
33 255f5f86 Panagiotis Kanavos
 *
34 255f5f86 Panagiotis Kanavos
 * The views and conclusions contained in the software and
35 255f5f86 Panagiotis Kanavos
 * documentation are those of the authors and should not be
36 255f5f86 Panagiotis Kanavos
 * interpreted as representing official policies, either expressed
37 255f5f86 Panagiotis Kanavos
 * or implied, of GRNET S.A.
38 255f5f86 Panagiotis Kanavos
 * </copyright>
39 255f5f86 Panagiotis Kanavos
 * -----------------------------------------------------------------------
40 255f5f86 Panagiotis Kanavos
 */
41 255f5f86 Panagiotis Kanavos
#endregion
42 255f5f86 Panagiotis Kanavos
using System;
43 9c4346c9 Panagiotis Kanavos
using System.Collections.Concurrent;
44 9c4346c9 Panagiotis Kanavos
using System.Collections.Generic;
45 cfed7823 Panagiotis Kanavos
using System.Diagnostics.Contracts;
46 9c4346c9 Panagiotis Kanavos
using System.Threading;
47 ec1a1baf Panagiotis Kanavos
using System.Threading.Async;
48 9c4346c9 Panagiotis Kanavos
using System.Threading.Tasks;
49 f3d080df Panagiotis Kanavos
using Pithos.Core.Agents;
50 9c4346c9 Panagiotis Kanavos
51 9c4346c9 Panagiotis Kanavos
namespace Pithos.Core
52 9c4346c9 Panagiotis Kanavos
{
53 9c4346c9 Panagiotis Kanavos
    public class Agent<TMessage> : IDisposable
54 f3d080df Panagiotis Kanavos
    {
55 f3d080df Panagiotis Kanavos
        private readonly ConcurrentQueue<TMessage> _queue;
56 174bbb6e Panagiotis Kanavos
        //private readonly AsyncCollection<TMessage> _messages;
57 174bbb6e Panagiotis Kanavos
        private readonly AsyncCollection<TMessage> _messages;
58 9c4346c9 Panagiotis Kanavos
        private readonly CancellationTokenSource _cancelSource = new CancellationTokenSource();
59 9c4346c9 Panagiotis Kanavos
        public CancellationToken CancellationToken;
60 9c4346c9 Panagiotis Kanavos
61 9c4346c9 Panagiotis Kanavos
        private readonly Action<Agent<TMessage>> _process;
62 9c4346c9 Panagiotis Kanavos
63 9c4346c9 Panagiotis Kanavos
64 9c4346c9 Panagiotis Kanavos
        public Agent(Action<Agent<TMessage>> action)
65 9c4346c9 Panagiotis Kanavos
        {
66 f3d080df Panagiotis Kanavos
            _queue=new ConcurrentQueue<TMessage>();
67 174bbb6e Panagiotis Kanavos
            _messages = new AsyncCollection<TMessage>(_queue);            
68 9c4346c9 Panagiotis Kanavos
            _process = action;
69 9c4346c9 Panagiotis Kanavos
            CancellationToken = _cancelSource.Token;
70 9c4346c9 Panagiotis Kanavos
        }
71 9c4346c9 Panagiotis Kanavos
72 aa7ac00e Panagiotis Kanavos
        public bool IsEmpty
73 aa7ac00e Panagiotis Kanavos
        {
74 aa7ac00e Panagiotis Kanavos
            get { return _queue.IsEmpty; }
75 aa7ac00e Panagiotis Kanavos
        }
76 aa7ac00e Panagiotis Kanavos
77 9c4346c9 Panagiotis Kanavos
        public void Post(TMessage message)
78 9c4346c9 Panagiotis Kanavos
        {
79 9c4346c9 Panagiotis Kanavos
            _messages.Add(message);
80 9c4346c9 Panagiotis Kanavos
        }
81 9c4346c9 Panagiotis Kanavos
82 174bbb6e Panagiotis Kanavos
        ConcurrentDictionary<TMessage,TaskCompletionSource<object>> _awaiters=new ConcurrentDictionary<TMessage,TaskCompletionSource<object>>();
83 174bbb6e Panagiotis Kanavos
84 174bbb6e Panagiotis Kanavos
        public Task PostAndAwait(TMessage message)
85 174bbb6e Panagiotis Kanavos
        {            
86 174bbb6e Panagiotis Kanavos
            var tcs = new TaskCompletionSource<object>();
87 174bbb6e Panagiotis Kanavos
            _awaiters[message] = tcs;
88 174bbb6e Panagiotis Kanavos
            Post(message);
89 174bbb6e Panagiotis Kanavos
            return tcs.Task;
90 174bbb6e Panagiotis Kanavos
        }
91 174bbb6e Panagiotis Kanavos
92 aba9e6d9 Panagiotis Kanavos
        /// <summary>
93 aba9e6d9 Panagiotis Kanavos
        /// Receives a message asynchronously, optionally with a timeout. Receive throws a TimeoutException if the timeout expires
94 aba9e6d9 Panagiotis Kanavos
        /// </summary>
95 aba9e6d9 Panagiotis Kanavos
        /// <returns>A Task that will return the message asynchronously</returns>
96 ec1a1baf Panagiotis Kanavos
        public  Task<TMessage> Receive()
97 9c4346c9 Panagiotis Kanavos
        {
98 ec1a1baf Panagiotis Kanavos
            return _messages.Take();
99 9c4346c9 Panagiotis Kanavos
        }
100 9c4346c9 Panagiotis Kanavos
101 174bbb6e Panagiotis Kanavos
        public void NotifyComplete(TMessage message)
102 174bbb6e Panagiotis Kanavos
        {
103 174bbb6e Panagiotis Kanavos
            TaskCompletionSource<object> tcs;
104 174bbb6e Panagiotis Kanavos
            if (_awaiters.TryRemove(message,out tcs))
105 174bbb6e Panagiotis Kanavos
                tcs.SetResult(null);
106 174bbb6e Panagiotis Kanavos
        }
107 174bbb6e Panagiotis Kanavos
108 9c4346c9 Panagiotis Kanavos
109 9c4346c9 Panagiotis Kanavos
110 aba9e6d9 Panagiotis Kanavos
        /// <summary>
111 aba9e6d9 Panagiotis Kanavos
        /// Start the agent
112 aba9e6d9 Panagiotis Kanavos
        /// </summary>
113 9c4346c9 Panagiotis Kanavos
        public void Start()
114 9c4346c9 Panagiotis Kanavos
        {
115 a27aa447 Panagiotis Kanavos
            Task.Factory.StartNew(() => _process(this), CancellationToken);            
116 9c4346c9 Panagiotis Kanavos
        }
117 9c4346c9 Panagiotis Kanavos
118 9c4346c9 Panagiotis Kanavos
119 aba9e6d9 Panagiotis Kanavos
        /// <summary>
120 aba9e6d9 Panagiotis Kanavos
        /// Create and start a new agent for the specified type of message
121 aba9e6d9 Panagiotis Kanavos
        /// </summary>
122 aba9e6d9 Panagiotis Kanavos
        /// <param name="action">The message processing action</param>
123 aba9e6d9 Panagiotis Kanavos
        /// <returns>A started Agent</returns>
124 9c4346c9 Panagiotis Kanavos
        public static Agent<TMessage> Start(Action<Agent<TMessage>> action)
125 9c4346c9 Panagiotis Kanavos
        {
126 9c4346c9 Panagiotis Kanavos
            var agent = new Agent<TMessage>(action);
127 9c4346c9 Panagiotis Kanavos
            agent.Start();
128 9c4346c9 Panagiotis Kanavos
            return agent;
129 9c4346c9 Panagiotis Kanavos
        }
130 9c4346c9 Panagiotis Kanavos
131 aba9e6d9 Panagiotis Kanavos
        /// <summary>
132 aba9e6d9 Panagiotis Kanavos
        /// Stops the agent 
133 aba9e6d9 Panagiotis Kanavos
        /// </summary>
134 9c4346c9 Panagiotis Kanavos
        public void Stop()
135 9c4346c9 Panagiotis Kanavos
        {
136 aba9e6d9 Panagiotis Kanavos
            //Stop the message queue
137 ec1a1baf Panagiotis Kanavos
            //_messages.CompleteAdding();
138 aba9e6d9 Panagiotis Kanavos
            //And signal the cancellation
139 9c4346c9 Panagiotis Kanavos
            _cancelSource.Cancel();
140 9c4346c9 Panagiotis Kanavos
        }
141 9c4346c9 Panagiotis Kanavos
142 aba9e6d9 Panagiotis Kanavos
        /// <summary>
143 aba9e6d9 Panagiotis Kanavos
        /// Execute an action asynchronously, using the agent's cancellation source
144 aba9e6d9 Panagiotis Kanavos
        /// </summary>
145 aba9e6d9 Panagiotis Kanavos
        /// <param name="action">The action to execute</param>
146 9c4346c9 Panagiotis Kanavos
        public void DoAsync(Action action)
147 9c4346c9 Panagiotis Kanavos
        {
148 d78d765c pkanavos
            if(action==null)
149 d78d765c pkanavos
                throw new ArgumentNullException("action");
150 d78d765c pkanavos
            Contract.EndContractBlock();
151 d78d765c pkanavos
152 9c4346c9 Panagiotis Kanavos
            Task.Factory.StartNew(action, CancellationToken);
153 9c4346c9 Panagiotis Kanavos
        }
154 9c4346c9 Panagiotis Kanavos
155 9c4346c9 Panagiotis Kanavos
156 9c4346c9 Panagiotis Kanavos
        ~Agent()
157 9c4346c9 Panagiotis Kanavos
        {
158 9c4346c9 Panagiotis Kanavos
            Dispose(false);
159 9c4346c9 Panagiotis Kanavos
        }
160 9c4346c9 Panagiotis Kanavos
161 9c4346c9 Panagiotis Kanavos
        public void Dispose()
162 9c4346c9 Panagiotis Kanavos
        {
163 9c4346c9 Panagiotis Kanavos
            Dispose(true);
164 9c4346c9 Panagiotis Kanavos
            GC.SuppressFinalize(this);
165 9c4346c9 Panagiotis Kanavos
        }
166 9c4346c9 Panagiotis Kanavos
167 9c4346c9 Panagiotis Kanavos
        protected void Dispose(bool disposing)
168 9c4346c9 Panagiotis Kanavos
        {
169 9c4346c9 Panagiotis Kanavos
            if (disposing)
170 9c4346c9 Panagiotis Kanavos
            {
171 9c4346c9 Panagiotis Kanavos
                Stop();
172 9c4346c9 Panagiotis Kanavos
                _cancelSource.Dispose();
173 9c4346c9 Panagiotis Kanavos
            }
174 9c4346c9 Panagiotis Kanavos
        }
175 9c4346c9 Panagiotis Kanavos
176 9c4346c9 Panagiotis Kanavos
        public IEnumerable<TMessage> GetEnumerable()
177 9c4346c9 Panagiotis Kanavos
        {
178 ec1a1baf Panagiotis Kanavos
            return _queue;
179 9c4346c9 Panagiotis Kanavos
        }
180 a27aa447 Panagiotis Kanavos
181 f3d080df Panagiotis Kanavos
        /// <summary>
182 f3d080df Panagiotis Kanavos
        /// Remove the first message that matches the predicate
183 f3d080df Panagiotis Kanavos
        /// </summary>
184 f3d080df Panagiotis Kanavos
        /// <param name="predicate">The condition to match</param>
185 f3d080df Panagiotis Kanavos
        /// <remarks>Removes the first message that matches the predicate by dequeing all 
186 f3d080df Panagiotis Kanavos
        /// messages and re-enqueing all except the first matching message</remarks>
187 f3d080df Panagiotis Kanavos
        public void Remove(Func<TMessage,bool> predicate)
188 f3d080df Panagiotis Kanavos
        {
189 f3d080df Panagiotis Kanavos
            //Can this work? Dequeue all items 
190 f3d080df Panagiotis Kanavos
            //and then enqueue everything except the filtered items
191 f3d080df Panagiotis Kanavos
192 f3d080df Panagiotis Kanavos
            _queue.RemoveFirst(predicate);
193 f3d080df Panagiotis Kanavos
        }
194 aba9e6d9 Panagiotis Kanavos
195 a27aa447 Panagiotis Kanavos
        public Task LoopAsync(Task process, Action loop,Action<Exception> onError=null)
196 a27aa447 Panagiotis Kanavos
        {
197 d78d765c pkanavos
            if(process==null)
198 d78d765c pkanavos
                throw new ArgumentNullException("process");
199 d78d765c pkanavos
            if(loop==null)
200 d78d765c pkanavos
                throw new ArgumentNullException("loop");
201 d78d765c pkanavos
            Contract.EndContractBlock();
202 cfed7823 Panagiotis Kanavos
203 a27aa447 Panagiotis Kanavos
            return process.ContinueWith(t =>
204 a27aa447 Panagiotis Kanavos
            {   
205 a27aa447 Panagiotis Kanavos
             
206 a27aa447 Panagiotis Kanavos
                Task.Factory.StartNew(loop, CancellationToken);
207 a27aa447 Panagiotis Kanavos
208 a27aa447 Panagiotis Kanavos
                if (t.IsFaulted)
209 a27aa447 Panagiotis Kanavos
                {
210 a27aa447 Panagiotis Kanavos
                    var ex = t.Exception.InnerException;
211 a27aa447 Panagiotis Kanavos
                    if (ex is OperationCanceledException)
212 a27aa447 Panagiotis Kanavos
                        Stop();
213 a27aa447 Panagiotis Kanavos
                    if (onError != null)
214 a27aa447 Panagiotis Kanavos
                        onError(ex);
215 a27aa447 Panagiotis Kanavos
                }
216 a64c87c8 Panagiotis Kanavos
            },CancellationToken);
217 a64c87c8 Panagiotis Kanavos
        }
218 a27aa447 Panagiotis Kanavos
219 a64c87c8 Panagiotis Kanavos
        public Task<T> LoopAsync<T>(Task<T> process, Action loop,Action<Exception> onError=null)
220 a64c87c8 Panagiotis Kanavos
        {
221 a64c87c8 Panagiotis Kanavos
            return process.ContinueWith(t =>
222 a64c87c8 Panagiotis Kanavos
            {   
223 a64c87c8 Panagiotis Kanavos
                //Spawn the Loop immediatelly
224 a64c87c8 Panagiotis Kanavos
                Task.Factory.StartNew(loop,CancellationToken);
225 a64c87c8 Panagiotis Kanavos
                //Then process possible exceptions
226 a64c87c8 Panagiotis Kanavos
                if (t.IsFaulted)
227 a64c87c8 Panagiotis Kanavos
                {
228 a64c87c8 Panagiotis Kanavos
                    var ex = t.Exception.InnerException;
229 a64c87c8 Panagiotis Kanavos
                    if (ex is OperationCanceledException)
230 a64c87c8 Panagiotis Kanavos
                        Stop();
231 a64c87c8 Panagiotis Kanavos
                    if (onError != null)
232 a64c87c8 Panagiotis Kanavos
                        onError(ex);
233 a64c87c8 Panagiotis Kanavos
                }
234 a64c87c8 Panagiotis Kanavos
                return default(T);
235 2341c603 pkanavos
            },CancellationToken);
236 a27aa447 Panagiotis Kanavos
        }
237 9c4346c9 Panagiotis Kanavos
    }
238 9c4346c9 Panagiotis Kanavos
}