2 using System.Collections.Concurrent;
3 using System.Collections.Generic;
6 using System.Threading;
7 using System.Threading.Tasks;
// Generic agent type parameterised by its message type; it owns a blocking
// message collection and a cancellation source, and implements IDisposable.
11 public class Agent<TMessage> : IDisposable
// NOTE(review): no read or write of _queue is visible in this excerpt - possibly dead; confirm before removing.
13 private ConcurrentQueue<TMessage> _queue;
// Mailbox: Post adds to it; Receive and TryReceive take from it.
14 private readonly BlockingCollection<TMessage> _messages = new BlockingCollection<TMessage>();
// Source of the token that the constructor copies into the CancellationToken field.
15 private readonly CancellationTokenSource _cancelSource = new CancellationTokenSource();
// Public, non-readonly field (not a property); the constructor sets it to _cancelSource.Token.
16 public CancellationToken CancellationToken;
// Delegate that takes an agent instance; elsewhere in the class it is invoked with `this`.
18 private readonly Action<Agent<TMessage>> _process;
/// <summary>
/// Creates an agent and exposes the token of its private cancellation source.
/// </summary>
/// <param name="action">Delegate that accepts the agent; NOTE(review): presumably stored in _process on a line not shown in this excerpt - confirm.</param>
21 public Agent(Action<Agent<TMessage>> action)
// Publish the token so callers and spawned tasks can observe cancellation.
24 CancellationToken = _cancelSource.Token;
/// <summary>
/// Adds a message to the agent's mailbox.
/// </summary>
/// <param name="message">The message to enqueue.</param>
27 public void Post(TMessage message)
// BlockingCollection.Add throws InvalidOperationException once CompleteAdding has been called on the collection.
29 _messages.Add(message);
/// <summary>
/// Starts a task that takes the next message from the mailbox and fails if none arrives in time.
/// </summary>
/// <param name="timeout">Wait limit in milliseconds handed to TryTake; the default of -1 waits indefinitely.</param>
/// <returns>The task created by Task.Factory.StartNew. NOTE(review): the value it yields is produced on lines not shown here - presumably the taken message; confirm.</returns>
/// <exception cref="TimeoutException">Thrown inside the task (faulting it) when TryTake returns false.</exception>
32 public Task<TMessage> Receive(int timeout = -1)
34 return Task<TMessage>.Factory.StartNew(() =>
// The wait is also bound to CancellationToken, so it can end early on cancellation.
37 if (!_messages.TryTake(out item, timeout, CancellationToken))
38 throw new TimeoutException();
/// <summary>
/// Starts a task that attempts to take the next message from the mailbox.
/// </summary>
/// <param name="timeout">Wait limit in milliseconds handed to TryTake; the default of -1 waits indefinitely.</param>
/// <returns>The task created by Task.Factory.StartNew. NOTE(review): the yielded value is produced on lines not shown here - presumably the taken item, or the default value when nothing was taken; confirm.</returns>
43 public Task<TMessage> TryReceive(int timeout = -1)
45 return Task<TMessage>.Factory.StartNew(() =>
// Unlike Receive, the boolean result of TryTake is discarded here, so this line raises no TimeoutException.
48 _messages.TryTake(out item, timeout, CancellationToken);
58 Task.Factory.StartNew(() => _process(this), CancellationToken);
/// <summary>
/// Static factory that constructs an agent around the supplied delegate.
/// </summary>
/// <param name="action">Delegate passed straight to the Agent constructor.</param>
/// <returns>NOTE(review): the return statement is not visible in this excerpt - presumably the newly created agent; confirm.</returns>
63 public static Agent<TMessage> Start(Action<Agent<TMessage>> action)
65 var agent = new Agent<TMessage>(action);
72 _messages.CompleteAdding();
73 _cancelSource.Cancel();
/// <summary>
/// Schedules an arbitrary action on the task factory, associated with the agent's cancellation token.
/// </summary>
/// <param name="action">The work to run.</param>
76 public void DoAsync(Action action)
// The created task is not returned or stored: callers cannot await it or observe its exceptions through this method.
78 Task.Factory.StartNew(action, CancellationToken);
90 GC.SuppressFinalize(this);
/// <summary>
/// Releases resources owned by the agent; the visible portion disposes the cancellation token source.
/// </summary>
/// <param name="disposing">NOTE(review): how this flag is used is on lines not shown in this excerpt - confirm against the full method.</param>
93 protected void Dispose(bool disposing)
// NOTE(review): no disposal of _messages (BlockingCollection is IDisposable) is visible here - verify it happens on the hidden lines.
99 _cancelSource.Dispose();
/// <summary>
/// Walks the supplied sequence one message at a time.
/// </summary>
/// <param name="enumerable">Sequence of messages to iterate; it is enumerated once by the foreach below.</param>
103 public void AddFromEnumerable(IEnumerable<TMessage> enumerable)
// NOTE(review): the loop body is not visible in this excerpt - presumably each message is added to the mailbox; confirm.
105 foreach (var message in enumerable)
111 public IEnumerable<TMessage> GetEnumerable()