root / trunk / Pithos.Core / Agents / Agent.cs @ ab2f6f79
History | View | Annotate | Download (5.4 kB)
1 | 9c4346c9 | Panagiotis Kanavos | using System; |
---|---|---|---|
2 | 9c4346c9 | Panagiotis Kanavos | using System.Collections.Concurrent; |
3 | 9c4346c9 | Panagiotis Kanavos | using System.Collections.Generic; |
4 | cfed7823 | Panagiotis Kanavos | using System.Diagnostics.Contracts; |
5 | 9c4346c9 | Panagiotis Kanavos | using System.Linq; |
6 | 9c4346c9 | Panagiotis Kanavos | using System.Text; |
7 | 9c4346c9 | Panagiotis Kanavos | using System.Threading; |
8 | 9c4346c9 | Panagiotis Kanavos | using System.Threading.Tasks; |
9 | 9c4346c9 | Panagiotis Kanavos | |
10 | 9c4346c9 | Panagiotis Kanavos | namespace Pithos.Core |
11 | 9c4346c9 | Panagiotis Kanavos | { |
12 | 9c4346c9 | Panagiotis Kanavos | public class Agent<TMessage> : IDisposable |
13 | a27aa447 | Panagiotis Kanavos | { |
14 | 9c4346c9 | Panagiotis Kanavos | private readonly BlockingCollection<TMessage> _messages = new BlockingCollection<TMessage>(); |
15 | 9c4346c9 | Panagiotis Kanavos | private readonly CancellationTokenSource _cancelSource = new CancellationTokenSource(); |
16 | 9c4346c9 | Panagiotis Kanavos | public CancellationToken CancellationToken; |
17 | 9c4346c9 | Panagiotis Kanavos | |
18 | 9c4346c9 | Panagiotis Kanavos | private readonly Action<Agent<TMessage>> _process; |
19 | 9c4346c9 | Panagiotis Kanavos | |
20 | 9c4346c9 | Panagiotis Kanavos | |
21 | 9c4346c9 | Panagiotis Kanavos | public Agent(Action<Agent<TMessage>> action) |
22 | 9c4346c9 | Panagiotis Kanavos | { |
23 | 9c4346c9 | Panagiotis Kanavos | _process = action; |
24 | 9c4346c9 | Panagiotis Kanavos | CancellationToken = _cancelSource.Token; |
25 | 9c4346c9 | Panagiotis Kanavos | } |
26 | 9c4346c9 | Panagiotis Kanavos | |
27 | 9c4346c9 | Panagiotis Kanavos | public void Post(TMessage message) |
28 | 9c4346c9 | Panagiotis Kanavos | { |
29 | 9c4346c9 | Panagiotis Kanavos | _messages.Add(message); |
30 | 9c4346c9 | Panagiotis Kanavos | } |
31 | 9c4346c9 | Panagiotis Kanavos | |
32 | aba9e6d9 | Panagiotis Kanavos | /// <summary> |
33 | aba9e6d9 | Panagiotis Kanavos | /// Receives a message asynchronously, optionally with a timeout. Receive throws a TimeoutException if the timeout expires |
34 | aba9e6d9 | Panagiotis Kanavos | /// </summary> |
35 | aba9e6d9 | Panagiotis Kanavos | /// <param name="timeout">Optional timeout in milliseconds. If provided, Receive fails with a TimeoutException if no message is available in the specified time</param> |
36 | aba9e6d9 | Panagiotis Kanavos | /// <returns>A Task that will return the message asynchronously</returns> |
37 | 9c4346c9 | Panagiotis Kanavos | public Task<TMessage> Receive(int timeout = -1) |
38 | 9c4346c9 | Panagiotis Kanavos | { |
39 | 9c4346c9 | Panagiotis Kanavos | return Task<TMessage>.Factory.StartNew(() => |
40 | 9c4346c9 | Panagiotis Kanavos | { |
41 | 9c4346c9 | Panagiotis Kanavos | TMessage item; |
42 | 9c4346c9 | Panagiotis Kanavos | if (!_messages.TryTake(out item, timeout, CancellationToken)) |
43 | 9c4346c9 | Panagiotis Kanavos | throw new TimeoutException(); |
44 | 9c4346c9 | Panagiotis Kanavos | return item; |
45 | 9c4346c9 | Panagiotis Kanavos | }); |
46 | 9c4346c9 | Panagiotis Kanavos | } |
47 | 9c4346c9 | Panagiotis Kanavos | |
48 | aba9e6d9 | Panagiotis Kanavos | |
49 | aba9e6d9 | Panagiotis Kanavos | /// <summary> |
50 | aba9e6d9 | Panagiotis Kanavos | /// Receives a message asynchronously, optionally with a timeout. TryReceive returns an empty task if the timeout expires |
51 | aba9e6d9 | Panagiotis Kanavos | /// </summary> |
52 | aba9e6d9 | Panagiotis Kanavos | /// <param name="timeout">Optional timeout in milliseconds. If provided, Receive returns an empty task</param> |
53 | aba9e6d9 | Panagiotis Kanavos | /// <returns>A Task that will return the message asynchronously</returns> |
54 | 9c4346c9 | Panagiotis Kanavos | public Task<TMessage> TryReceive(int timeout = -1) |
55 | 9c4346c9 | Panagiotis Kanavos | { |
56 | 9c4346c9 | Panagiotis Kanavos | return Task<TMessage>.Factory.StartNew(() => |
57 | 9c4346c9 | Panagiotis Kanavos | { |
58 | 9c4346c9 | Panagiotis Kanavos | TMessage item; |
59 | 9c4346c9 | Panagiotis Kanavos | _messages.TryTake(out item, timeout, CancellationToken); |
60 | 9c4346c9 | Panagiotis Kanavos | return item; |
61 | 9c4346c9 | Panagiotis Kanavos | }); |
62 | 9c4346c9 | Panagiotis Kanavos | } |
63 | 9c4346c9 | Panagiotis Kanavos | |
64 | 9c4346c9 | Panagiotis Kanavos | |
65 | 9c4346c9 | Panagiotis Kanavos | |
66 | aba9e6d9 | Panagiotis Kanavos | /// <summary> |
67 | aba9e6d9 | Panagiotis Kanavos | /// Start the agent |
68 | aba9e6d9 | Panagiotis Kanavos | /// </summary> |
69 | 9c4346c9 | Panagiotis Kanavos | public void Start() |
70 | 9c4346c9 | Panagiotis Kanavos | { |
71 | a27aa447 | Panagiotis Kanavos | Task.Factory.StartNew(() => _process(this), CancellationToken); |
72 | 9c4346c9 | Panagiotis Kanavos | } |
73 | 9c4346c9 | Panagiotis Kanavos | |
74 | 9c4346c9 | Panagiotis Kanavos | |
75 | aba9e6d9 | Panagiotis Kanavos | /// <summary> |
76 | aba9e6d9 | Panagiotis Kanavos | /// Create and start a new agent for the specified type of message |
77 | aba9e6d9 | Panagiotis Kanavos | /// </summary> |
78 | aba9e6d9 | Panagiotis Kanavos | /// <param name="action">The message processing action</param> |
79 | aba9e6d9 | Panagiotis Kanavos | /// <returns>A started Agent</returns> |
80 | 9c4346c9 | Panagiotis Kanavos | public static Agent<TMessage> Start(Action<Agent<TMessage>> action) |
81 | 9c4346c9 | Panagiotis Kanavos | { |
82 | 9c4346c9 | Panagiotis Kanavos | var agent = new Agent<TMessage>(action); |
83 | 9c4346c9 | Panagiotis Kanavos | agent.Start(); |
84 | 9c4346c9 | Panagiotis Kanavos | return agent; |
85 | 9c4346c9 | Panagiotis Kanavos | } |
86 | 9c4346c9 | Panagiotis Kanavos | |
87 | aba9e6d9 | Panagiotis Kanavos | /// <summary> |
88 | aba9e6d9 | Panagiotis Kanavos | /// Stops the agent |
89 | aba9e6d9 | Panagiotis Kanavos | /// </summary> |
90 | 9c4346c9 | Panagiotis Kanavos | public void Stop() |
91 | 9c4346c9 | Panagiotis Kanavos | { |
92 | aba9e6d9 | Panagiotis Kanavos | //Stop the message queue |
93 | 9c4346c9 | Panagiotis Kanavos | _messages.CompleteAdding(); |
94 | aba9e6d9 | Panagiotis Kanavos | //And signal the cancellation |
95 | 9c4346c9 | Panagiotis Kanavos | _cancelSource.Cancel(); |
96 | 9c4346c9 | Panagiotis Kanavos | } |
97 | 9c4346c9 | Panagiotis Kanavos | |
98 | aba9e6d9 | Panagiotis Kanavos | /// <summary> |
99 | aba9e6d9 | Panagiotis Kanavos | /// Execute an action asynchronously, using the agent's cancellation source |
100 | aba9e6d9 | Panagiotis Kanavos | /// </summary> |
101 | aba9e6d9 | Panagiotis Kanavos | /// <param name="action">The action to execute</param> |
102 | 9c4346c9 | Panagiotis Kanavos | public void DoAsync(Action action) |
103 | 9c4346c9 | Panagiotis Kanavos | { |
104 | cfed7823 | Panagiotis Kanavos | Contract.Requires(action!=null); |
105 | 9c4346c9 | Panagiotis Kanavos | Task.Factory.StartNew(action, CancellationToken); |
106 | 9c4346c9 | Panagiotis Kanavos | } |
107 | 9c4346c9 | Panagiotis Kanavos | |
108 | 9c4346c9 | Panagiotis Kanavos | |
109 | 9c4346c9 | Panagiotis Kanavos | ~Agent() |
110 | 9c4346c9 | Panagiotis Kanavos | { |
111 | 9c4346c9 | Panagiotis Kanavos | Dispose(false); |
112 | 9c4346c9 | Panagiotis Kanavos | } |
113 | 9c4346c9 | Panagiotis Kanavos | |
114 | 9c4346c9 | Panagiotis Kanavos | public void Dispose() |
115 | 9c4346c9 | Panagiotis Kanavos | { |
116 | 9c4346c9 | Panagiotis Kanavos | Dispose(true); |
117 | 9c4346c9 | Panagiotis Kanavos | GC.SuppressFinalize(this); |
118 | 9c4346c9 | Panagiotis Kanavos | } |
119 | 9c4346c9 | Panagiotis Kanavos | |
120 | 9c4346c9 | Panagiotis Kanavos | protected void Dispose(bool disposing) |
121 | 9c4346c9 | Panagiotis Kanavos | { |
122 | 9c4346c9 | Panagiotis Kanavos | if (disposing) |
123 | 9c4346c9 | Panagiotis Kanavos | { |
124 | 9c4346c9 | Panagiotis Kanavos | Stop(); |
125 | 9c4346c9 | Panagiotis Kanavos | _messages.Dispose(); |
126 | 9c4346c9 | Panagiotis Kanavos | _cancelSource.Dispose(); |
127 | 9c4346c9 | Panagiotis Kanavos | } |
128 | 9c4346c9 | Panagiotis Kanavos | } |
129 | 9c4346c9 | Panagiotis Kanavos | |
130 | 9c4346c9 | Panagiotis Kanavos | public IEnumerable<TMessage> GetEnumerable() |
131 | 9c4346c9 | Panagiotis Kanavos | { |
132 | 9c4346c9 | Panagiotis Kanavos | return _messages; |
133 | 9c4346c9 | Panagiotis Kanavos | } |
134 | a27aa447 | Panagiotis Kanavos | |
135 | aba9e6d9 | Panagiotis Kanavos | |
136 | a27aa447 | Panagiotis Kanavos | public Task LoopAsync(Task process, Action loop,Action<Exception> onError=null) |
137 | a27aa447 | Panagiotis Kanavos | { |
138 | cfed7823 | Panagiotis Kanavos | Contract.Requires(process!=null); |
139 | cfed7823 | Panagiotis Kanavos | Contract.Requires(loop!=null); |
140 | cfed7823 | Panagiotis Kanavos | |
141 | a27aa447 | Panagiotis Kanavos | return process.ContinueWith(t => |
142 | a27aa447 | Panagiotis Kanavos | { |
143 | a27aa447 | Panagiotis Kanavos | |
144 | a27aa447 | Panagiotis Kanavos | Task.Factory.StartNew(loop, CancellationToken); |
145 | a27aa447 | Panagiotis Kanavos | |
146 | a27aa447 | Panagiotis Kanavos | if (t.IsFaulted) |
147 | a27aa447 | Panagiotis Kanavos | { |
148 | a27aa447 | Panagiotis Kanavos | var ex = t.Exception.InnerException; |
149 | a27aa447 | Panagiotis Kanavos | if (ex is OperationCanceledException) |
150 | a27aa447 | Panagiotis Kanavos | Stop(); |
151 | a27aa447 | Panagiotis Kanavos | if (onError != null) |
152 | a27aa447 | Panagiotis Kanavos | onError(ex); |
153 | a27aa447 | Panagiotis Kanavos | } |
154 | a64c87c8 | Panagiotis Kanavos | },CancellationToken); |
155 | a64c87c8 | Panagiotis Kanavos | } |
156 | a27aa447 | Panagiotis Kanavos | |
157 | a64c87c8 | Panagiotis Kanavos | public Task<T> LoopAsync<T>(Task<T> process, Action loop,Action<Exception> onError=null) |
158 | a64c87c8 | Panagiotis Kanavos | { |
159 | a64c87c8 | Panagiotis Kanavos | return process.ContinueWith(t => |
160 | a64c87c8 | Panagiotis Kanavos | { |
161 | a64c87c8 | Panagiotis Kanavos | //Spawn the Loop immediatelly |
162 | a64c87c8 | Panagiotis Kanavos | Task.Factory.StartNew(loop,CancellationToken); |
163 | a64c87c8 | Panagiotis Kanavos | //Then process possible exceptions |
164 | a64c87c8 | Panagiotis Kanavos | if (t.IsFaulted) |
165 | a64c87c8 | Panagiotis Kanavos | { |
166 | a64c87c8 | Panagiotis Kanavos | var ex = t.Exception.InnerException; |
167 | a64c87c8 | Panagiotis Kanavos | if (ex is OperationCanceledException) |
168 | a64c87c8 | Panagiotis Kanavos | Stop(); |
169 | a64c87c8 | Panagiotis Kanavos | if (onError != null) |
170 | a64c87c8 | Panagiotis Kanavos | onError(ex); |
171 | a64c87c8 | Panagiotis Kanavos | } |
172 | a64c87c8 | Panagiotis Kanavos | return default(T); |
173 | a64c87c8 | Panagiotis Kanavos | },CancellationToken); |
174 | a27aa447 | Panagiotis Kanavos | } |
175 | 9c4346c9 | Panagiotis Kanavos | } |
176 | 9c4346c9 | Panagiotis Kanavos | } |