Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / Agent.cs @ ec1a1baf

History | View | Annotate | Download (6.9 kB)

1
#region
2
/* -----------------------------------------------------------------------
3
 * <copyright file="Agent.cs" company="GRNet">
4
 * 
5
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
6
 *
7
 * Redistribution and use in source and binary forms, with or
8
 * without modification, are permitted provided that the following
9
 * conditions are met:
10
 *
11
 *   1. Redistributions of source code must retain the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer.
14
 *
15
 *   2. Redistributions in binary form must reproduce the above
16
 *      copyright notice, this list of conditions and the following
17
 *      disclaimer in the documentation and/or other materials
18
 *      provided with the distribution.
19
 *
20
 *
21
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
22
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
25
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
28
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32
 * POSSIBILITY OF SUCH DAMAGE.
33
 *
34
 * The views and conclusions contained in the software and
35
 * documentation are those of the authors and should not be
36
 * interpreted as representing official policies, either expressed
37
 * or implied, of GRNET S.A.
38
 * </copyright>
39
 * -----------------------------------------------------------------------
40
 */
41
#endregion
42
using System;
43
using System.Collections.Concurrent;
44
using System.Collections.Generic;
45
using System.Diagnostics.Contracts;
46
using System.Threading;
47
using System.Threading.Async;
48
using System.Threading.Tasks;
49
using Pithos.Core.Agents;
50

    
51
namespace Pithos.Core
52
{
53
    public class Agent<TMessage> : IDisposable
54
    {
55
        private readonly ConcurrentQueue<TMessage> _queue;
56
        private readonly AsyncProducerConsumerCollection<TMessage> _messages;
57
        private readonly CancellationTokenSource _cancelSource = new CancellationTokenSource();
58
        public CancellationToken CancellationToken;
59

    
60
        private readonly Action<Agent<TMessage>> _process;
61

    
62

    
63
        public Agent(Action<Agent<TMessage>> action)
64
        {
65
            _queue=new ConcurrentQueue<TMessage>();
66
            _messages = new AsyncProducerConsumerCollection<TMessage>(_queue);            
67
            _process = action;
68
            CancellationToken = _cancelSource.Token;
69
        }
70

    
71
        public bool IsEmpty
72
        {
73
            get { return _queue.IsEmpty; }
74
        }
75

    
76
        public void Post(TMessage message)
77
        {
78
            _messages.Add(message);
79
        }
80

    
81
        /// <summary>
82
        /// Receives a message asynchronously, optionally with a timeout. Receive throws a TimeoutException if the timeout expires
83
        /// </summary>
84
        /// <returns>A Task that will return the message asynchronously</returns>
85
        public  Task<TMessage> Receive()
86
        {
87
            return _messages.Take();
88
        }
89

    
90

    
91

    
92
        /// <summary>
93
        /// Start the agent
94
        /// </summary>
95
        public void Start()
96
        {
97
            Task.Factory.StartNew(() => _process(this), CancellationToken);            
98
        }
99

    
100

    
101
        /// <summary>
102
        /// Create and start a new agent for the specified type of message
103
        /// </summary>
104
        /// <param name="action">The message processing action</param>
105
        /// <returns>A started Agent</returns>
106
        public static Agent<TMessage> Start(Action<Agent<TMessage>> action)
107
        {
108
            var agent = new Agent<TMessage>(action);
109
            agent.Start();
110
            return agent;
111
        }
112

    
113
        /// <summary>
114
        /// Stops the agent 
115
        /// </summary>
116
        public void Stop()
117
        {
118
            //Stop the message queue
119
            //_messages.CompleteAdding();
120
            //And signal the cancellation
121
            _cancelSource.Cancel();
122
        }
123

    
124
        /// <summary>
125
        /// Execute an action asynchronously, using the agent's cancellation source
126
        /// </summary>
127
        /// <param name="action">The action to execute</param>
128
        public void DoAsync(Action action)
129
        {
130
            Contract.Requires(action!=null);
131
            Task.Factory.StartNew(action, CancellationToken);
132
        }
133

    
134

    
135
        ~Agent()
136
        {
137
            Dispose(false);
138
        }
139

    
140
        public void Dispose()
141
        {
142
            Dispose(true);
143
            GC.SuppressFinalize(this);
144
        }
145

    
146
        protected void Dispose(bool disposing)
147
        {
148
            if (disposing)
149
            {
150
                Stop();
151
                _messages.Dispose();
152
                _cancelSource.Dispose();
153
            }
154
        }
155

    
156
        public IEnumerable<TMessage> GetEnumerable()
157
        {
158
            return _queue;
159
        }
160

    
161
        /// <summary>
162
        /// Remove the first message that matches the predicate
163
        /// </summary>
164
        /// <param name="predicate">The condition to match</param>
165
        /// <remarks>Removes the first message that matches the predicate by dequeing all 
166
        /// messages and re-enqueing all except the first matching message</remarks>
167
        public void Remove(Func<TMessage,bool> predicate)
168
        {
169
            //Can this work? Dequeue all items 
170
            //and then enqueue everything except the filtered items
171

    
172
            _queue.RemoveFirst(predicate);
173
        }
174

    
175
        public Task LoopAsync(Task process, Action loop,Action<Exception> onError=null)
176
        {
177
            Contract.Requires(process!=null);
178
            Contract.Requires(loop!=null);
179

    
180
            return process.ContinueWith(t =>
181
            {   
182
             
183
                Task.Factory.StartNew(loop, CancellationToken);
184

    
185
                if (t.IsFaulted)
186
                {
187
                    var ex = t.Exception.InnerException;
188
                    if (ex is OperationCanceledException)
189
                        Stop();
190
                    if (onError != null)
191
                        onError(ex);
192
                }
193
            },CancellationToken);
194
        }
195

    
196
        public Task<T> LoopAsync<T>(Task<T> process, Action loop,Action<Exception> onError=null)
197
        {
198
            return process.ContinueWith(t =>
199
            {   
200
                //Spawn the Loop immediatelly
201
                Task.Factory.StartNew(loop,CancellationToken);
202
                //Then process possible exceptions
203
                if (t.IsFaulted)
204
                {
205
                    var ex = t.Exception.InnerException;
206
                    if (ex is OperationCanceledException)
207
                        Stop();
208
                    if (onError != null)
209
                        onError(ex);
210
                }
211
                return default(T);
212
            },CancellationToken);
213
        }
214
    }
215
}