Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / Agent.cs @ ab2f6f79

History | View | Annotate | Download (5.4 kB)

1 9c4346c9 Panagiotis Kanavos
using System;
2 9c4346c9 Panagiotis Kanavos
using System.Collections.Concurrent;
3 9c4346c9 Panagiotis Kanavos
using System.Collections.Generic;
4 cfed7823 Panagiotis Kanavos
using System.Diagnostics.Contracts;
5 9c4346c9 Panagiotis Kanavos
using System.Linq;
6 9c4346c9 Panagiotis Kanavos
using System.Text;
7 9c4346c9 Panagiotis Kanavos
using System.Threading;
8 9c4346c9 Panagiotis Kanavos
using System.Threading.Tasks;
9 9c4346c9 Panagiotis Kanavos
10 9c4346c9 Panagiotis Kanavos
namespace Pithos.Core
11 9c4346c9 Panagiotis Kanavos
{
12 9c4346c9 Panagiotis Kanavos
    public class Agent<TMessage> : IDisposable
13 a27aa447 Panagiotis Kanavos
    {        
14 9c4346c9 Panagiotis Kanavos
        private readonly BlockingCollection<TMessage> _messages = new BlockingCollection<TMessage>();
15 9c4346c9 Panagiotis Kanavos
        private readonly CancellationTokenSource _cancelSource = new CancellationTokenSource();
16 9c4346c9 Panagiotis Kanavos
        public CancellationToken CancellationToken;
17 9c4346c9 Panagiotis Kanavos
18 9c4346c9 Panagiotis Kanavos
        private readonly Action<Agent<TMessage>> _process;
19 9c4346c9 Panagiotis Kanavos
20 9c4346c9 Panagiotis Kanavos
21 9c4346c9 Panagiotis Kanavos
        public Agent(Action<Agent<TMessage>> action)
22 9c4346c9 Panagiotis Kanavos
        {
23 9c4346c9 Panagiotis Kanavos
            _process = action;
24 9c4346c9 Panagiotis Kanavos
            CancellationToken = _cancelSource.Token;
25 9c4346c9 Panagiotis Kanavos
        }
26 9c4346c9 Panagiotis Kanavos
27 9c4346c9 Panagiotis Kanavos
        public void Post(TMessage message)
28 9c4346c9 Panagiotis Kanavos
        {
29 9c4346c9 Panagiotis Kanavos
            _messages.Add(message);
30 9c4346c9 Panagiotis Kanavos
        }
31 9c4346c9 Panagiotis Kanavos
32 aba9e6d9 Panagiotis Kanavos
        /// <summary>
33 aba9e6d9 Panagiotis Kanavos
        /// Receives a message asynchronously, optionally with a timeout. Receive throws a TimeoutException if the timeout expires
34 aba9e6d9 Panagiotis Kanavos
        /// </summary>
35 aba9e6d9 Panagiotis Kanavos
        /// <param name="timeout">Optional timeout in milliseconds. If provided, Receive fails with a TimeoutException if no message is available in the specified time</param>
36 aba9e6d9 Panagiotis Kanavos
        /// <returns>A Task that will return the message asynchronously</returns>
37 9c4346c9 Panagiotis Kanavos
        public Task<TMessage> Receive(int timeout = -1)
38 9c4346c9 Panagiotis Kanavos
        {
39 9c4346c9 Panagiotis Kanavos
            return Task<TMessage>.Factory.StartNew(() =>
40 9c4346c9 Panagiotis Kanavos
            {
41 9c4346c9 Panagiotis Kanavos
                TMessage item;
42 9c4346c9 Panagiotis Kanavos
                if (!_messages.TryTake(out item, timeout, CancellationToken))
43 9c4346c9 Panagiotis Kanavos
                    throw new TimeoutException();
44 9c4346c9 Panagiotis Kanavos
                return item;
45 9c4346c9 Panagiotis Kanavos
            });
46 9c4346c9 Panagiotis Kanavos
        }
47 9c4346c9 Panagiotis Kanavos
48 aba9e6d9 Panagiotis Kanavos
49 aba9e6d9 Panagiotis Kanavos
        /// <summary>
50 aba9e6d9 Panagiotis Kanavos
        /// Receives a message asynchronously, optionally with a timeout. TryReceive returns an empty task if the timeout expires
51 aba9e6d9 Panagiotis Kanavos
        /// </summary>
52 aba9e6d9 Panagiotis Kanavos
        /// <param name="timeout">Optional timeout in milliseconds. If provided, Receive returns an empty task</param>
53 aba9e6d9 Panagiotis Kanavos
        /// <returns>A Task that will return the message asynchronously</returns>
54 9c4346c9 Panagiotis Kanavos
        public Task<TMessage> TryReceive(int timeout = -1)
55 9c4346c9 Panagiotis Kanavos
        {
56 9c4346c9 Panagiotis Kanavos
            return Task<TMessage>.Factory.StartNew(() =>
57 9c4346c9 Panagiotis Kanavos
            {
58 9c4346c9 Panagiotis Kanavos
                TMessage item;
59 9c4346c9 Panagiotis Kanavos
                _messages.TryTake(out item, timeout, CancellationToken);
60 9c4346c9 Panagiotis Kanavos
                return item;
61 9c4346c9 Panagiotis Kanavos
            });
62 9c4346c9 Panagiotis Kanavos
        }
63 9c4346c9 Panagiotis Kanavos
64 9c4346c9 Panagiotis Kanavos
65 9c4346c9 Panagiotis Kanavos
66 aba9e6d9 Panagiotis Kanavos
        /// <summary>
67 aba9e6d9 Panagiotis Kanavos
        /// Start the agent
68 aba9e6d9 Panagiotis Kanavos
        /// </summary>
69 9c4346c9 Panagiotis Kanavos
        public void Start()
70 9c4346c9 Panagiotis Kanavos
        {
71 a27aa447 Panagiotis Kanavos
            Task.Factory.StartNew(() => _process(this), CancellationToken);            
72 9c4346c9 Panagiotis Kanavos
        }
73 9c4346c9 Panagiotis Kanavos
74 9c4346c9 Panagiotis Kanavos
75 aba9e6d9 Panagiotis Kanavos
        /// <summary>
76 aba9e6d9 Panagiotis Kanavos
        /// Create and start a new agent for the specified type of message
77 aba9e6d9 Panagiotis Kanavos
        /// </summary>
78 aba9e6d9 Panagiotis Kanavos
        /// <param name="action">The message processing action</param>
79 aba9e6d9 Panagiotis Kanavos
        /// <returns>A started Agent</returns>
80 9c4346c9 Panagiotis Kanavos
        public static Agent<TMessage> Start(Action<Agent<TMessage>> action)
81 9c4346c9 Panagiotis Kanavos
        {
82 9c4346c9 Panagiotis Kanavos
            var agent = new Agent<TMessage>(action);
83 9c4346c9 Panagiotis Kanavos
            agent.Start();
84 9c4346c9 Panagiotis Kanavos
            return agent;
85 9c4346c9 Panagiotis Kanavos
        }
86 9c4346c9 Panagiotis Kanavos
87 aba9e6d9 Panagiotis Kanavos
        /// <summary>
88 aba9e6d9 Panagiotis Kanavos
        /// Stops the agent 
89 aba9e6d9 Panagiotis Kanavos
        /// </summary>
90 9c4346c9 Panagiotis Kanavos
        public void Stop()
91 9c4346c9 Panagiotis Kanavos
        {
92 aba9e6d9 Panagiotis Kanavos
            //Stop the message queue
93 9c4346c9 Panagiotis Kanavos
            _messages.CompleteAdding();
94 aba9e6d9 Panagiotis Kanavos
            //And signal the cancellation
95 9c4346c9 Panagiotis Kanavos
            _cancelSource.Cancel();
96 9c4346c9 Panagiotis Kanavos
        }
97 9c4346c9 Panagiotis Kanavos
98 aba9e6d9 Panagiotis Kanavos
        /// <summary>
99 aba9e6d9 Panagiotis Kanavos
        /// Execute an action asynchronously, using the agent's cancellation source
100 aba9e6d9 Panagiotis Kanavos
        /// </summary>
101 aba9e6d9 Panagiotis Kanavos
        /// <param name="action">The action to execute</param>
102 9c4346c9 Panagiotis Kanavos
        public void DoAsync(Action action)
103 9c4346c9 Panagiotis Kanavos
        {
104 cfed7823 Panagiotis Kanavos
            Contract.Requires(action!=null);
105 9c4346c9 Panagiotis Kanavos
            Task.Factory.StartNew(action, CancellationToken);
106 9c4346c9 Panagiotis Kanavos
        }
107 9c4346c9 Panagiotis Kanavos
108 9c4346c9 Panagiotis Kanavos
109 9c4346c9 Panagiotis Kanavos
        ~Agent()
110 9c4346c9 Panagiotis Kanavos
        {
111 9c4346c9 Panagiotis Kanavos
            Dispose(false);
112 9c4346c9 Panagiotis Kanavos
        }
113 9c4346c9 Panagiotis Kanavos
114 9c4346c9 Panagiotis Kanavos
        public void Dispose()
115 9c4346c9 Panagiotis Kanavos
        {
116 9c4346c9 Panagiotis Kanavos
            Dispose(true);
117 9c4346c9 Panagiotis Kanavos
            GC.SuppressFinalize(this);
118 9c4346c9 Panagiotis Kanavos
        }
119 9c4346c9 Panagiotis Kanavos
120 9c4346c9 Panagiotis Kanavos
        protected void Dispose(bool disposing)
121 9c4346c9 Panagiotis Kanavos
        {
122 9c4346c9 Panagiotis Kanavos
            if (disposing)
123 9c4346c9 Panagiotis Kanavos
            {
124 9c4346c9 Panagiotis Kanavos
                Stop();
125 9c4346c9 Panagiotis Kanavos
                _messages.Dispose();
126 9c4346c9 Panagiotis Kanavos
                _cancelSource.Dispose();
127 9c4346c9 Panagiotis Kanavos
            }
128 9c4346c9 Panagiotis Kanavos
        }
129 9c4346c9 Panagiotis Kanavos
130 9c4346c9 Panagiotis Kanavos
        public IEnumerable<TMessage> GetEnumerable()
131 9c4346c9 Panagiotis Kanavos
        {
132 9c4346c9 Panagiotis Kanavos
            return _messages;
133 9c4346c9 Panagiotis Kanavos
        }
134 a27aa447 Panagiotis Kanavos
135 aba9e6d9 Panagiotis Kanavos
136 a27aa447 Panagiotis Kanavos
        public Task LoopAsync(Task process, Action loop,Action<Exception> onError=null)
137 a27aa447 Panagiotis Kanavos
        {
138 cfed7823 Panagiotis Kanavos
            Contract.Requires(process!=null);
139 cfed7823 Panagiotis Kanavos
            Contract.Requires(loop!=null);
140 cfed7823 Panagiotis Kanavos
141 a27aa447 Panagiotis Kanavos
            return process.ContinueWith(t =>
142 a27aa447 Panagiotis Kanavos
            {   
143 a27aa447 Panagiotis Kanavos
             
144 a27aa447 Panagiotis Kanavos
                Task.Factory.StartNew(loop, CancellationToken);
145 a27aa447 Panagiotis Kanavos
146 a27aa447 Panagiotis Kanavos
                if (t.IsFaulted)
147 a27aa447 Panagiotis Kanavos
                {
148 a27aa447 Panagiotis Kanavos
                    var ex = t.Exception.InnerException;
149 a27aa447 Panagiotis Kanavos
                    if (ex is OperationCanceledException)
150 a27aa447 Panagiotis Kanavos
                        Stop();
151 a27aa447 Panagiotis Kanavos
                    if (onError != null)
152 a27aa447 Panagiotis Kanavos
                        onError(ex);
153 a27aa447 Panagiotis Kanavos
                }
154 a64c87c8 Panagiotis Kanavos
            },CancellationToken);
155 a64c87c8 Panagiotis Kanavos
        }
156 a27aa447 Panagiotis Kanavos
157 a64c87c8 Panagiotis Kanavos
        public Task<T> LoopAsync<T>(Task<T> process, Action loop,Action<Exception> onError=null)
158 a64c87c8 Panagiotis Kanavos
        {
159 a64c87c8 Panagiotis Kanavos
            return process.ContinueWith(t =>
160 a64c87c8 Panagiotis Kanavos
            {   
161 a64c87c8 Panagiotis Kanavos
                //Spawn the Loop immediatelly
162 a64c87c8 Panagiotis Kanavos
                Task.Factory.StartNew(loop,CancellationToken);
163 a64c87c8 Panagiotis Kanavos
                //Then process possible exceptions
164 a64c87c8 Panagiotis Kanavos
                if (t.IsFaulted)
165 a64c87c8 Panagiotis Kanavos
                {
166 a64c87c8 Panagiotis Kanavos
                    var ex = t.Exception.InnerException;
167 a64c87c8 Panagiotis Kanavos
                    if (ex is OperationCanceledException)
168 a64c87c8 Panagiotis Kanavos
                        Stop();
169 a64c87c8 Panagiotis Kanavos
                    if (onError != null)
170 a64c87c8 Panagiotis Kanavos
                        onError(ex);
171 a64c87c8 Panagiotis Kanavos
                }
172 a64c87c8 Panagiotis Kanavos
                return default(T);
173 a64c87c8 Panagiotis Kanavos
            },CancellationToken);
174 a27aa447 Panagiotis Kanavos
        }
175 9c4346c9 Panagiotis Kanavos
    }
176 9c4346c9 Panagiotis Kanavos
}