Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / Agent.cs @ aba9e6d9

History | View | Annotate | Download (5.4 kB)

1
using System;
2
using System.Collections.Concurrent;
3
using System.Collections.Generic;
4
using System.Diagnostics.Contracts;
5
using System.Linq;
6
using System.Text;
7
using System.Threading;
8
using System.Threading.Tasks;
9

    
10
namespace Pithos.Core
11
{
12
    public class Agent<TMessage> : IDisposable
13
    {        
14
        private readonly BlockingCollection<TMessage> _messages = new BlockingCollection<TMessage>();
15
        private readonly CancellationTokenSource _cancelSource = new CancellationTokenSource();
16
        public CancellationToken CancellationToken;
17

    
18
        private readonly Action<Agent<TMessage>> _process;
19

    
20

    
21
        public Agent(Action<Agent<TMessage>> action)
22
        {
23
            _process = action;
24
            CancellationToken = _cancelSource.Token;
25
        }
26

    
27
        public void Post(TMessage message)
28
        {
29
            _messages.Add(message);
30
        }
31

    
32
        /// <summary>
33
        /// Receives a message asynchronously, optionally with a timeout. Receive throws a TimeoutException if the timeout expires
34
        /// </summary>
35
        /// <param name="timeout">Optional timeout in milliseconds. If provided, Receive fails with a TimeoutException if no message is available in the specified time</param>
36
        /// <returns>A Task that will return the message asynchronously</returns>
37
        public Task<TMessage> Receive(int timeout = -1)
38
        {
39
            return Task<TMessage>.Factory.StartNew(() =>
40
            {
41
                TMessage item;
42
                if (!_messages.TryTake(out item, timeout, CancellationToken))
43
                    throw new TimeoutException();
44
                return item;
45
            });
46
        }
47

    
48

    
49
        /// <summary>
50
        /// Receives a message asynchronously, optionally with a timeout. TryReceive returns an empty task if the timeout expires
51
        /// </summary>
52
        /// <param name="timeout">Optional timeout in milliseconds. If provided, Receive returns an empty task</param>
53
        /// <returns>A Task that will return the message asynchronously</returns>
54
        public Task<TMessage> TryReceive(int timeout = -1)
55
        {
56
            return Task<TMessage>.Factory.StartNew(() =>
57
            {
58
                TMessage item;
59
                _messages.TryTake(out item, timeout, CancellationToken);
60
                return item;
61
            });
62
        }
63

    
64

    
65

    
66
        /// <summary>
67
        /// Start the agent
68
        /// </summary>
69
        public void Start()
70
        {
71
            Task.Factory.StartNew(() => _process(this), CancellationToken);            
72
        }
73

    
74

    
75
        /// <summary>
76
        /// Create and start a new agent for the specified type of message
77
        /// </summary>
78
        /// <param name="action">The message processing action</param>
79
        /// <returns>A started Agent</returns>
80
        public static Agent<TMessage> Start(Action<Agent<TMessage>> action)
81
        {
82
            var agent = new Agent<TMessage>(action);
83
            agent.Start();
84
            return agent;
85
        }
86

    
87
        /// <summary>
88
        /// Stops the agent 
89
        /// </summary>
90
        public void Stop()
91
        {
92
            //Stop the message queue
93
            _messages.CompleteAdding();
94
            //And signal the cancellation
95
            _cancelSource.Cancel();
96
        }
97

    
98
        /// <summary>
99
        /// Execute an action asynchronously, using the agent's cancellation source
100
        /// </summary>
101
        /// <param name="action">The action to execute</param>
102
        public void DoAsync(Action action)
103
        {
104
            Contract.Requires(action!=null);
105
            Task.Factory.StartNew(action, CancellationToken);
106
        }
107

    
108

    
109
        ~Agent()
110
        {
111
            Dispose(false);
112
        }
113

    
114
        public void Dispose()
115
        {
116
            Dispose(true);
117
            GC.SuppressFinalize(this);
118
        }
119

    
120
        protected void Dispose(bool disposing)
121
        {
122
            if (disposing)
123
            {
124
                Stop();
125
                _messages.Dispose();
126
                _cancelSource.Dispose();
127
            }
128
        }
129

    
130
        public IEnumerable<TMessage> GetEnumerable()
131
        {
132
            return _messages;
133
        }
134

    
135

    
136
        public Task LoopAsync(Task process, Action loop,Action<Exception> onError=null)
137
        {
138
            Contract.Requires(process!=null);
139
            Contract.Requires(loop!=null);
140

    
141
            return process.ContinueWith(t =>
142
            {   
143
             
144
                Task.Factory.StartNew(loop, CancellationToken);
145

    
146
                if (t.IsFaulted)
147
                {
148
                    var ex = t.Exception.InnerException;
149
                    if (ex is OperationCanceledException)
150
                        Stop();
151
                    if (onError != null)
152
                        onError(ex);
153
                }
154
            },CancellationToken);
155
        }
156

    
157
        public Task<T> LoopAsync<T>(Task<T> process, Action loop,Action<Exception> onError=null)
158
        {
159
            return process.ContinueWith(t =>
160
            {   
161
                //Spawn the Loop immediatelly
162
                Task.Factory.StartNew(loop,CancellationToken);
163
                //Then process possible exceptions
164
                if (t.IsFaulted)
165
                {
166
                    var ex = t.Exception.InnerException;
167
                    if (ex is OperationCanceledException)
168
                        Stop();
169
                    if (onError != null)
170
                        onError(ex);
171
                }
172
                return default(T);
173
            },CancellationToken);
174
        }
175
    }
176
}