Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / Agent.cs @ aa7ac00e

History | View | Annotate | Download (8 kB)

1
#region
2
/* -----------------------------------------------------------------------
3
 * <copyright file="Agent.cs" company="GRNet">
4
 * 
5
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
6
 *
7
 * Redistribution and use in source and binary forms, with or
8
 * without modification, are permitted provided that the following
9
 * conditions are met:
10
 *
11
 *   1. Redistributions of source code must retain the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer.
14
 *
15
 *   2. Redistributions in binary form must reproduce the above
16
 *      copyright notice, this list of conditions and the following
17
 *      disclaimer in the documentation and/or other materials
18
 *      provided with the distribution.
19
 *
20
 *
21
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
22
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
25
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
28
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32
 * POSSIBILITY OF SUCH DAMAGE.
33
 *
34
 * The views and conclusions contained in the software and
35
 * documentation are those of the authors and should not be
36
 * interpreted as representing official policies, either expressed
37
 * or implied, of GRNET S.A.
38
 * </copyright>
39
 * -----------------------------------------------------------------------
40
 */
41
#endregion
42
using System;
43
using System.Collections.Concurrent;
44
using System.Collections.Generic;
45
using System.Diagnostics.Contracts;
46
using System.Linq;
47
using System.Text;
48
using System.Threading;
49
using System.Threading.Tasks;
50
using Pithos.Core.Agents;
51

    
52
namespace Pithos.Core
53
{
54
    public class Agent<TMessage> : IDisposable
55
    {
56
        private readonly ConcurrentQueue<TMessage> _queue;
57
        private readonly BlockingCollection<TMessage> _messages;
58
        private readonly CancellationTokenSource _cancelSource = new CancellationTokenSource();
59
        public CancellationToken CancellationToken;
60

    
61
        private readonly Action<Agent<TMessage>> _process;
62

    
63

    
64
        public Agent(Action<Agent<TMessage>> action)
65
        {
66
            _queue=new ConcurrentQueue<TMessage>();
67
            _messages = new BlockingCollection<TMessage>(_queue);
68
            _process = action;
69
            CancellationToken = _cancelSource.Token;
70
        }
71

    
72
        public bool IsEmpty
73
        {
74
            get { return _queue.IsEmpty; }
75
        }
76

    
77
        public void Post(TMessage message)
78
        {
79
            _messages.Add(message);
80
        }
81

    
82
        /// <summary>
83
        /// Receives a message asynchronously, optionally with a timeout. Receive throws a TimeoutException if the timeout expires
84
        /// </summary>
85
        /// <param name="timeout">Optional timeout in milliseconds. If provided, Receive fails with a TimeoutException if no message is available in the specified time</param>
86
        /// <returns>A Task that will return the message asynchronously</returns>
87
        public Task<TMessage> Receive(int timeout = -1)
88
        {
89
            return Task<TMessage>.Factory.StartNew(() =>
90
            {
91
                TMessage item;
92
                if (!_messages.TryTake(out item, timeout, CancellationToken))
93
                    throw new TimeoutException();
94
                return item;
95
            });
96
        }
97

    
98

    
99
        /// <summary>
100
        /// Receives a message asynchronously, optionally with a timeout. TryReceive returns an empty task if the timeout expires
101
        /// </summary>
102
        /// <param name="timeout">Optional timeout in milliseconds. If provided, Receive returns an empty task</param>
103
        /// <returns>A Task that will return the message asynchronously</returns>
104
        public Task<TMessage> TryReceive(int timeout = -1)
105
        {
106
            return Task<TMessage>.Factory.StartNew(() =>
107
            {
108
                TMessage item;
109
                _messages.TryTake(out item, timeout, CancellationToken);
110
                return item;
111
            });
112
        }
113

    
114

    
115

    
116
        /// <summary>
117
        /// Start the agent
118
        /// </summary>
119
        public void Start()
120
        {
121
            Task.Factory.StartNew(() => _process(this), CancellationToken);            
122
        }
123

    
124

    
125
        /// <summary>
126
        /// Create and start a new agent for the specified type of message
127
        /// </summary>
128
        /// <param name="action">The message processing action</param>
129
        /// <returns>A started Agent</returns>
130
        public static Agent<TMessage> Start(Action<Agent<TMessage>> action)
131
        {
132
            var agent = new Agent<TMessage>(action);
133
            agent.Start();
134
            return agent;
135
        }
136

    
137
        /// <summary>
138
        /// Stops the agent 
139
        /// </summary>
140
        public void Stop()
141
        {
142
            //Stop the message queue
143
            _messages.CompleteAdding();
144
            //And signal the cancellation
145
            _cancelSource.Cancel();
146
        }
147

    
148
        /// <summary>
149
        /// Execute an action asynchronously, using the agent's cancellation source
150
        /// </summary>
151
        /// <param name="action">The action to execute</param>
152
        public void DoAsync(Action action)
153
        {
154
            Contract.Requires(action!=null);
155
            Task.Factory.StartNew(action, CancellationToken);
156
        }
157

    
158

    
159
        ~Agent()
160
        {
161
            Dispose(false);
162
        }
163

    
164
        public void Dispose()
165
        {
166
            Dispose(true);
167
            GC.SuppressFinalize(this);
168
        }
169

    
170
        protected void Dispose(bool disposing)
171
        {
172
            if (disposing)
173
            {
174
                Stop();
175
                _messages.Dispose();
176
                _cancelSource.Dispose();
177
            }
178
        }
179

    
180
        public IEnumerable<TMessage> GetEnumerable()
181
        {
182
            return _messages;
183
        }
184

    
185
        /// <summary>
186
        /// Remove the first message that matches the predicate
187
        /// </summary>
188
        /// <param name="predicate">The condition to match</param>
189
        /// <remarks>Removes the first message that matches the predicate by dequeing all 
190
        /// messages and re-enqueing all except the first matching message</remarks>
191
        public void Remove(Func<TMessage,bool> predicate)
192
        {
193
            //Can this work? Dequeue all items 
194
            //and then enqueue everything except the filtered items
195

    
196
            _queue.RemoveFirst(predicate);
197
        }
198

    
199
        public Task LoopAsync(Task process, Action loop,Action<Exception> onError=null)
200
        {
201
            Contract.Requires(process!=null);
202
            Contract.Requires(loop!=null);
203

    
204
            return process.ContinueWith(t =>
205
            {   
206
             
207
                Task.Factory.StartNew(loop, CancellationToken);
208

    
209
                if (t.IsFaulted)
210
                {
211
                    var ex = t.Exception.InnerException;
212
                    if (ex is OperationCanceledException)
213
                        Stop();
214
                    if (onError != null)
215
                        onError(ex);
216
                }
217
            },CancellationToken);
218
        }
219

    
220
        public Task<T> LoopAsync<T>(Task<T> process, Action loop,Action<Exception> onError=null)
221
        {
222
            return process.ContinueWith(t =>
223
            {   
224
                //Spawn the Loop immediatelly
225
                Task.Factory.StartNew(loop,CancellationToken);
226
                //Then process possible exceptions
227
                if (t.IsFaulted)
228
                {
229
                    var ex = t.Exception.InnerException;
230
                    if (ex is OperationCanceledException)
231
                        Stop();
232
                    if (onError != null)
233
                        onError(ex);
234
                }
235
                return default(T);
236
            },CancellationToken);
237
        }
238
    }
239
}