Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / Agent.cs @ 255f5f86

History | View | Annotate | Download (7.9 kB)

1
#region
2
/* -----------------------------------------------------------------------
3
 * <copyright file="Agent.cs" company="GRNet">
4
 * 
5
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
6
 *
7
 * Redistribution and use in source and binary forms, with or
8
 * without modification, are permitted provided that the following
9
 * conditions are met:
10
 *
11
 *   1. Redistributions of source code must retain the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer.
14
 *
15
 *   2. Redistributions in binary form must reproduce the above
16
 *      copyright notice, this list of conditions and the following
17
 *      disclaimer in the documentation and/or other materials
18
 *      provided with the distribution.
19
 *
20
 *
21
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
22
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
25
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
28
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32
 * POSSIBILITY OF SUCH DAMAGE.
33
 *
34
 * The views and conclusions contained in the software and
35
 * documentation are those of the authors and should not be
36
 * interpreted as representing official policies, either expressed
37
 * or implied, of GRNET S.A.
38
 * </copyright>
39
 * -----------------------------------------------------------------------
40
 */
41
#endregion
42
using System;
43
using System.Collections.Concurrent;
44
using System.Collections.Generic;
45
using System.Diagnostics.Contracts;
46
using System.Linq;
47
using System.Text;
48
using System.Threading;
49
using System.Threading.Tasks;
50
using Pithos.Core.Agents;
51

    
52
namespace Pithos.Core
53
{
54
    public class Agent<TMessage> : IDisposable
55
    {
56
        private readonly ConcurrentQueue<TMessage> _queue;
57
        private readonly BlockingCollection<TMessage> _messages;
58
        private readonly CancellationTokenSource _cancelSource = new CancellationTokenSource();
59
        public CancellationToken CancellationToken;
60

    
61
        private readonly Action<Agent<TMessage>> _process;
62

    
63

    
64
        public Agent(Action<Agent<TMessage>> action)
65
        {
66
            _queue=new ConcurrentQueue<TMessage>();
67
            _messages = new BlockingCollection<TMessage>(_queue);
68
            _process = action;
69
            CancellationToken = _cancelSource.Token;
70
        }
71

    
72
        public void Post(TMessage message)
73
        {
74
            _messages.Add(message);
75
        }
76

    
77
        /// <summary>
78
        /// Receives a message asynchronously, optionally with a timeout. Receive throws a TimeoutException if the timeout expires
79
        /// </summary>
80
        /// <param name="timeout">Optional timeout in milliseconds. If provided, Receive fails with a TimeoutException if no message is available in the specified time</param>
81
        /// <returns>A Task that will return the message asynchronously</returns>
82
        public Task<TMessage> Receive(int timeout = -1)
83
        {
84
            return Task<TMessage>.Factory.StartNew(() =>
85
            {
86
                TMessage item;
87
                if (!_messages.TryTake(out item, timeout, CancellationToken))
88
                    throw new TimeoutException();
89
                return item;
90
            });
91
        }
92

    
93

    
94
        /// <summary>
95
        /// Receives a message asynchronously, optionally with a timeout. TryReceive returns an empty task if the timeout expires
96
        /// </summary>
97
        /// <param name="timeout">Optional timeout in milliseconds. If provided, Receive returns an empty task</param>
98
        /// <returns>A Task that will return the message asynchronously</returns>
99
        public Task<TMessage> TryReceive(int timeout = -1)
100
        {
101
            return Task<TMessage>.Factory.StartNew(() =>
102
            {
103
                TMessage item;
104
                _messages.TryTake(out item, timeout, CancellationToken);
105
                return item;
106
            });
107
        }
108

    
109

    
110

    
111
        /// <summary>
112
        /// Start the agent
113
        /// </summary>
114
        public void Start()
115
        {
116
            Task.Factory.StartNew(() => _process(this), CancellationToken);            
117
        }
118

    
119

    
120
        /// <summary>
121
        /// Create and start a new agent for the specified type of message
122
        /// </summary>
123
        /// <param name="action">The message processing action</param>
124
        /// <returns>A started Agent</returns>
125
        public static Agent<TMessage> Start(Action<Agent<TMessage>> action)
126
        {
127
            var agent = new Agent<TMessage>(action);
128
            agent.Start();
129
            return agent;
130
        }
131

    
132
        /// <summary>
133
        /// Stops the agent 
134
        /// </summary>
135
        public void Stop()
136
        {
137
            //Stop the message queue
138
            _messages.CompleteAdding();
139
            //And signal the cancellation
140
            _cancelSource.Cancel();
141
        }
142

    
143
        /// <summary>
144
        /// Execute an action asynchronously, using the agent's cancellation source
145
        /// </summary>
146
        /// <param name="action">The action to execute</param>
147
        public void DoAsync(Action action)
148
        {
149
            Contract.Requires(action!=null);
150
            Task.Factory.StartNew(action, CancellationToken);
151
        }
152

    
153

    
154
        ~Agent()
155
        {
156
            Dispose(false);
157
        }
158

    
159
        public void Dispose()
160
        {
161
            Dispose(true);
162
            GC.SuppressFinalize(this);
163
        }
164

    
165
        protected void Dispose(bool disposing)
166
        {
167
            if (disposing)
168
            {
169
                Stop();
170
                _messages.Dispose();
171
                _cancelSource.Dispose();
172
            }
173
        }
174

    
175
        public IEnumerable<TMessage> GetEnumerable()
176
        {
177
            return _messages;
178
        }
179

    
180
        /// <summary>
181
        /// Remove the first message that matches the predicate
182
        /// </summary>
183
        /// <param name="predicate">The condition to match</param>
184
        /// <remarks>Removes the first message that matches the predicate by dequeing all 
185
        /// messages and re-enqueing all except the first matching message</remarks>
186
        public void Remove(Func<TMessage,bool> predicate)
187
        {
188
            //Can this work? Dequeue all items 
189
            //and then enqueue everything except the filtered items
190

    
191
            _queue.RemoveFirst(predicate);
192
        }
193

    
194
        public Task LoopAsync(Task process, Action loop,Action<Exception> onError=null)
195
        {
196
            Contract.Requires(process!=null);
197
            Contract.Requires(loop!=null);
198

    
199
            return process.ContinueWith(t =>
200
            {   
201
             
202
                Task.Factory.StartNew(loop, CancellationToken);
203

    
204
                if (t.IsFaulted)
205
                {
206
                    var ex = t.Exception.InnerException;
207
                    if (ex is OperationCanceledException)
208
                        Stop();
209
                    if (onError != null)
210
                        onError(ex);
211
                }
212
            },CancellationToken);
213
        }
214

    
215
        public Task<T> LoopAsync<T>(Task<T> process, Action loop,Action<Exception> onError=null)
216
        {
217
            return process.ContinueWith(t =>
218
            {   
219
                //Spawn the Loop immediatelly
220
                Task.Factory.StartNew(loop,CancellationToken);
221
                //Then process possible exceptions
222
                if (t.IsFaulted)
223
                {
224
                    var ex = t.Exception.InnerException;
225
                    if (ex is OperationCanceledException)
226
                        Stop();
227
                    if (onError != null)
228
                        onError(ex);
229
                }
230
                return default(T);
231
            },CancellationToken);
232
        }
233
    }
234
}