Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / Agent.cs @ b9f5b594

History | View | Annotate | Download (6.1 kB)

1
using System;
2
using System.Collections.Concurrent;
3
using System.Collections.Generic;
4
using System.Diagnostics.Contracts;
5
using System.Linq;
6
using System.Text;
7
using System.Threading;
8
using System.Threading.Tasks;
9
using Pithos.Core.Agents;
10

    
11
namespace Pithos.Core
12
{
13
    public class Agent<TMessage> : IDisposable
14
    {
15
        private readonly ConcurrentQueue<TMessage> _queue;
16
        private readonly BlockingCollection<TMessage> _messages;
17
        private readonly CancellationTokenSource _cancelSource = new CancellationTokenSource();
18
        public CancellationToken CancellationToken;
19

    
20
        private readonly Action<Agent<TMessage>> _process;
21

    
22

    
23
        public Agent(Action<Agent<TMessage>> action)
24
        {
25
            _queue=new ConcurrentQueue<TMessage>();
26
            _messages = new BlockingCollection<TMessage>(_queue);
27
            _process = action;
28
            CancellationToken = _cancelSource.Token;
29
        }
30

    
31
        public void Post(TMessage message)
32
        {
33
            _messages.Add(message);
34
        }
35

    
36
        /// <summary>
37
        /// Receives a message asynchronously, optionally with a timeout. Receive throws a TimeoutException if the timeout expires
38
        /// </summary>
39
        /// <param name="timeout">Optional timeout in milliseconds. If provided, Receive fails with a TimeoutException if no message is available in the specified time</param>
40
        /// <returns>A Task that will return the message asynchronously</returns>
41
        public Task<TMessage> Receive(int timeout = -1)
42
        {
43
            return Task<TMessage>.Factory.StartNew(() =>
44
            {
45
                TMessage item;
46
                if (!_messages.TryTake(out item, timeout, CancellationToken))
47
                    throw new TimeoutException();
48
                return item;
49
            });
50
        }
51

    
52

    
53
        /// <summary>
54
        /// Receives a message asynchronously, optionally with a timeout. TryReceive returns an empty task if the timeout expires
55
        /// </summary>
56
        /// <param name="timeout">Optional timeout in milliseconds. If provided, Receive returns an empty task</param>
57
        /// <returns>A Task that will return the message asynchronously</returns>
58
        public Task<TMessage> TryReceive(int timeout = -1)
59
        {
60
            return Task<TMessage>.Factory.StartNew(() =>
61
            {
62
                TMessage item;
63
                _messages.TryTake(out item, timeout, CancellationToken);
64
                return item;
65
            });
66
        }
67

    
68

    
69

    
70
        /// <summary>
71
        /// Start the agent
72
        /// </summary>
73
        public void Start()
74
        {
75
            Task.Factory.StartNew(() => _process(this), CancellationToken);            
76
        }
77

    
78

    
79
        /// <summary>
80
        /// Create and start a new agent for the specified type of message
81
        /// </summary>
82
        /// <param name="action">The message processing action</param>
83
        /// <returns>A started Agent</returns>
84
        public static Agent<TMessage> Start(Action<Agent<TMessage>> action)
85
        {
86
            var agent = new Agent<TMessage>(action);
87
            agent.Start();
88
            return agent;
89
        }
90

    
91
        /// <summary>
92
        /// Stops the agent 
93
        /// </summary>
94
        public void Stop()
95
        {
96
            //Stop the message queue
97
            _messages.CompleteAdding();
98
            //And signal the cancellation
99
            _cancelSource.Cancel();
100
        }
101

    
102
        /// <summary>
103
        /// Execute an action asynchronously, using the agent's cancellation source
104
        /// </summary>
105
        /// <param name="action">The action to execute</param>
106
        public void DoAsync(Action action)
107
        {
108
            Contract.Requires(action!=null);
109
            Task.Factory.StartNew(action, CancellationToken);
110
        }
111

    
112

    
113
        ~Agent()
114
        {
115
            Dispose(false);
116
        }
117

    
118
        public void Dispose()
119
        {
120
            Dispose(true);
121
            GC.SuppressFinalize(this);
122
        }
123

    
124
        protected void Dispose(bool disposing)
125
        {
126
            if (disposing)
127
            {
128
                Stop();
129
                _messages.Dispose();
130
                _cancelSource.Dispose();
131
            }
132
        }
133

    
134
        public IEnumerable<TMessage> GetEnumerable()
135
        {
136
            return _messages;
137
        }
138

    
139
        /// <summary>
140
        /// Remove the first message that matches the predicate
141
        /// </summary>
142
        /// <param name="predicate">The condition to match</param>
143
        /// <remarks>Removes the first message that matches the predicate by dequeing all 
144
        /// messages and re-enqueing all except the first matching message</remarks>
145
        public void Remove(Func<TMessage,bool> predicate)
146
        {
147
            //Can this work? Dequeue all items 
148
            //and then enqueue everything except the filtered items
149

    
150
            _queue.RemoveFirst(predicate);
151
        }
152

    
153
        public Task LoopAsync(Task process, Action loop,Action<Exception> onError=null)
154
        {
155
            Contract.Requires(process!=null);
156
            Contract.Requires(loop!=null);
157

    
158
            return process.ContinueWith(t =>
159
            {   
160
             
161
                Task.Factory.StartNew(loop, CancellationToken);
162

    
163
                if (t.IsFaulted)
164
                {
165
                    var ex = t.Exception.InnerException;
166
                    if (ex is OperationCanceledException)
167
                        Stop();
168
                    if (onError != null)
169
                        onError(ex);
170
                }
171
            },CancellationToken);
172
        }
173

    
174
        public Task<T> LoopAsync<T>(Task<T> process, Action loop,Action<Exception> onError=null)
175
        {
176
            return process.ContinueWith(t =>
177
            {   
178
                //Spawn the Loop immediatelly
179
                Task.Factory.StartNew(loop,CancellationToken);
180
                //Then process possible exceptions
181
                if (t.IsFaulted)
182
                {
183
                    var ex = t.Exception.InnerException;
184
                    if (ex is OperationCanceledException)
185
                        Stop();
186
                    if (onError != null)
187
                        onError(ex);
188
                }
189
                return default(T);
190
            },CancellationToken);
191
        }
192
    }
193
}