Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / Agent.cs @ d78d765c

History | View | Annotate | Download (7.7 kB)

1
#region
2
/* -----------------------------------------------------------------------
3
 * <copyright file="Agent.cs" company="GRNet">
4
 * 
5
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
6
 *
7
 * Redistribution and use in source and binary forms, with or
8
 * without modification, are permitted provided that the following
9
 * conditions are met:
10
 *
11
 *   1. Redistributions of source code must retain the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer.
14
 *
15
 *   2. Redistributions in binary form must reproduce the above
16
 *      copyright notice, this list of conditions and the following
17
 *      disclaimer in the documentation and/or other materials
18
 *      provided with the distribution.
19
 *
20
 *
21
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
22
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
25
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
28
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32
 * POSSIBILITY OF SUCH DAMAGE.
33
 *
34
 * The views and conclusions contained in the software and
35
 * documentation are those of the authors and should not be
36
 * interpreted as representing official policies, either expressed
37
 * or implied, of GRNET S.A.
38
 * </copyright>
39
 * -----------------------------------------------------------------------
40
 */
41
#endregion
42
using System;
43
using System.Collections.Concurrent;
44
using System.Collections.Generic;
45
using System.Diagnostics.Contracts;
46
using System.Threading;
47
using System.Threading.Async;
48
using System.Threading.Tasks;
49
using Pithos.Core.Agents;
50

    
51
namespace Pithos.Core
52
{
53
    public class Agent<TMessage> : IDisposable
54
    {
55
        private readonly ConcurrentQueue<TMessage> _queue;
56
        //private readonly AsyncCollection<TMessage> _messages;
57
        private readonly AsyncCollection<TMessage> _messages;
58
        private readonly CancellationTokenSource _cancelSource = new CancellationTokenSource();
59
        public CancellationToken CancellationToken;
60

    
61
        private readonly Action<Agent<TMessage>> _process;
62

    
63

    
64
        public Agent(Action<Agent<TMessage>> action)
65
        {
66
            _queue=new ConcurrentQueue<TMessage>();
67
            _messages = new AsyncCollection<TMessage>(_queue);            
68
            _process = action;
69
            CancellationToken = _cancelSource.Token;
70
        }
71

    
72
        public bool IsEmpty
73
        {
74
            get { return _queue.IsEmpty; }
75
        }
76

    
77
        public void Post(TMessage message)
78
        {
79
            _messages.Add(message);
80
        }
81

    
82
        ConcurrentDictionary<TMessage,TaskCompletionSource<object>> _awaiters=new ConcurrentDictionary<TMessage,TaskCompletionSource<object>>();
83

    
84
        public Task PostAndAwait(TMessage message)
85
        {            
86
            var tcs = new TaskCompletionSource<object>();
87
            _awaiters[message] = tcs;
88
            Post(message);
89
            return tcs.Task;
90
        }
91

    
92
        /// <summary>
93
        /// Receives a message asynchronously, optionally with a timeout. Receive throws a TimeoutException if the timeout expires
94
        /// </summary>
95
        /// <returns>A Task that will return the message asynchronously</returns>
96
        public  Task<TMessage> Receive()
97
        {
98
            return _messages.Take();
99
        }
100

    
101
        public void NotifyComplete(TMessage message)
102
        {
103
            TaskCompletionSource<object> tcs;
104
            if (_awaiters.TryRemove(message,out tcs))
105
                tcs.SetResult(null);
106
        }
107

    
108

    
109

    
110
        /// <summary>
111
        /// Start the agent
112
        /// </summary>
113
        public void Start()
114
        {
115
            Task.Factory.StartNew(() => _process(this), CancellationToken);            
116
        }
117

    
118

    
119
        /// <summary>
120
        /// Create and start a new agent for the specified type of message
121
        /// </summary>
122
        /// <param name="action">The message processing action</param>
123
        /// <returns>A started Agent</returns>
124
        public static Agent<TMessage> Start(Action<Agent<TMessage>> action)
125
        {
126
            var agent = new Agent<TMessage>(action);
127
            agent.Start();
128
            return agent;
129
        }
130

    
131
        /// <summary>
132
        /// Stops the agent 
133
        /// </summary>
134
        public void Stop()
135
        {
136
            //Stop the message queue
137
            //_messages.CompleteAdding();
138
            //And signal the cancellation
139
            _cancelSource.Cancel();
140
        }
141

    
142
        /// <summary>
143
        /// Execute an action asynchronously, using the agent's cancellation source
144
        /// </summary>
145
        /// <param name="action">The action to execute</param>
146
        public void DoAsync(Action action)
147
        {
148
            if(action==null)
149
                throw new ArgumentNullException("action");
150
            Contract.EndContractBlock();
151

    
152
            Task.Factory.StartNew(action, CancellationToken);
153
        }
154

    
155

    
156
        ~Agent()
157
        {
158
            Dispose(false);
159
        }
160

    
161
        public void Dispose()
162
        {
163
            Dispose(true);
164
            GC.SuppressFinalize(this);
165
        }
166

    
167
        protected void Dispose(bool disposing)
168
        {
169
            if (disposing)
170
            {
171
                Stop();
172
                _cancelSource.Dispose();
173
            }
174
        }
175

    
176
        public IEnumerable<TMessage> GetEnumerable()
177
        {
178
            return _queue;
179
        }
180

    
181
        /// <summary>
182
        /// Remove the first message that matches the predicate
183
        /// </summary>
184
        /// <param name="predicate">The condition to match</param>
185
        /// <remarks>Removes the first message that matches the predicate by dequeing all 
186
        /// messages and re-enqueing all except the first matching message</remarks>
187
        public void Remove(Func<TMessage,bool> predicate)
188
        {
189
            //Can this work? Dequeue all items 
190
            //and then enqueue everything except the filtered items
191

    
192
            _queue.RemoveFirst(predicate);
193
        }
194

    
195
        public Task LoopAsync(Task process, Action loop,Action<Exception> onError=null)
196
        {
197
            if(process==null)
198
                throw new ArgumentNullException("process");
199
            if(loop==null)
200
                throw new ArgumentNullException("loop");
201
            Contract.EndContractBlock();
202

    
203
            return process.ContinueWith(t =>
204
            {   
205
             
206
                Task.Factory.StartNew(loop, CancellationToken);
207

    
208
                if (t.IsFaulted)
209
                {
210
                    var ex = t.Exception.InnerException;
211
                    if (ex is OperationCanceledException)
212
                        Stop();
213
                    if (onError != null)
214
                        onError(ex);
215
                }
216
            },CancellationToken);
217
        }
218

    
219
        public Task<T> LoopAsync<T>(Task<T> process, Action loop,Action<Exception> onError=null)
220
        {
221
            return process.ContinueWith(t =>
222
            {   
223
                //Spawn the Loop immediatelly
224
                Task.Factory.StartNew(loop,CancellationToken);
225
                //Then process possible exceptions
226
                if (t.IsFaulted)
227
                {
228
                    var ex = t.Exception.InnerException;
229
                    if (ex is OperationCanceledException)
230
                        Stop();
231
                    if (onError != null)
232
                        onError(ex);
233
                }
234
                return default(T);
235
            });
236
        }
237
    }
238
}