Synch fixes
[pithos-ms-client] / trunk / Pithos.Core / Agent.cs
1 using System;
2 using System.Collections.Concurrent;
3 using System.Collections.Generic;
4 using System.Linq;
5 using System.Text;
6 using System.Threading;
7 using System.Threading.Tasks;
8
namespace Pithos.Core
{
    /// <summary>
    /// A small mailbox-style agent: messages are posted into an internal
    /// <see cref="BlockingCollection{T}"/> and the processing delegate supplied
    /// at construction is run on a task, receiving this agent so that it can
    /// pull messages with <see cref="Receive"/> or <see cref="TryReceive"/>.
    /// </summary>
    /// <typeparam name="TMessage">Type of the messages held in the mailbox.</typeparam>
    public class Agent<TMessage> : IDisposable
    {
        // Mailbox holding the posted messages until they are received.
        private readonly BlockingCollection<TMessage> _messages = new BlockingCollection<TMessage>();

        // Source of the token used to cancel pending receives and started tasks.
        private readonly CancellationTokenSource _cancelSource = new CancellationTokenSource();

        /// <summary>
        /// Token that is cancelled when <see cref="Stop"/> is called.
        /// Kept as a public field so existing callers continue to work.
        /// </summary>
        public CancellationToken CancellationToken;

        // Delegate executed by Start(); it is handed this agent instance.
        private readonly Action<Agent<TMessage>> _process;

        // Set once Dispose has run so that repeated calls become no-ops.
        private bool _disposed;

        /// <summary>
        /// Creates an agent that will run <paramref name="action"/> when started.
        /// </summary>
        /// <param name="action">Processing delegate; receives this agent.</param>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
        public Agent(Action<Agent<TMessage>> action)
        {
            // Fail fast: a null delegate would otherwise only surface as a
            // NullReferenceException inside the fire-and-forget task in Start().
            if (action == null)
            {
                throw new ArgumentNullException("action");
            }

            _process = action;
            CancellationToken = _cancelSource.Token;
        }

        /// <summary>
        /// Adds a message to the mailbox.
        /// </summary>
        /// <param name="message">The message to enqueue.</param>
        public void Post(TMessage message)
        {
            _messages.Add(message);
        }

        /// <summary>
        /// Starts a task that takes the next message from the mailbox.
        /// </summary>
        /// <param name="timeout">
        /// Milliseconds to wait for a message; -1 (the default) waits indefinitely.
        /// </param>
        /// <returns>
        /// A task producing the message. The task fails with
        /// <see cref="TimeoutException"/> when no message could be taken.
        /// </returns>
        public Task<TMessage> Receive(int timeout = -1)
        {
            return Task<TMessage>.Factory.StartNew(() =>
            {
                TMessage item;
                if (!_messages.TryTake(out item, timeout, CancellationToken))
                {
                    throw new TimeoutException();
                }

                return item;
            });
        }

        /// <summary>
        /// Starts a task that tries to take the next message from the mailbox.
        /// </summary>
        /// <param name="timeout">
        /// Milliseconds to wait for a message; -1 (the default) waits indefinitely.
        /// </param>
        /// <returns>
        /// A task producing the message, or the default value of
        /// <typeparamref name="TMessage"/> when no message could be taken.
        /// </returns>
        public Task<TMessage> TryReceive(int timeout = -1)
        {
            return Task<TMessage>.Factory.StartNew(() =>
            {
                TMessage item;

                // The boolean result is intentionally ignored: on failure
                // TryTake leaves 'item' at default(TMessage), which is returned.
                _messages.TryTake(out item, timeout, CancellationToken);
                return item;
            });
        }

        /// <summary>
        /// Runs the processing delegate on a new task, passing it this agent.
        /// The task is not returned to the caller.
        /// </summary>
        public void Start()
        {
            Task.Factory.StartNew(() => _process(this), CancellationToken);
        }

        /// <summary>
        /// Creates an agent for <paramref name="action"/> and starts it immediately.
        /// </summary>
        /// <param name="action">Processing delegate; receives the new agent.</param>
        /// <returns>The started agent.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
        public static Agent<TMessage> Start(Action<Agent<TMessage>> action)
        {
            var agent = new Agent<TMessage>(action);
            agent.Start();
            return agent;
        }

        /// <summary>
        /// Marks the mailbox as complete for adding and cancels
        /// <see cref="CancellationToken"/>.
        /// </summary>
        public void Stop()
        {
            _messages.CompleteAdding();
            _cancelSource.Cancel();
        }

        /// <summary>
        /// Runs <paramref name="action"/> on a new task associated with the
        /// agent's <see cref="CancellationToken"/>.
        /// </summary>
        /// <param name="action">The work to run.</param>
        public void DoAsync(Action action)
        {
            Task.Factory.StartNew(action, CancellationToken);
        }

        /// <summary>
        /// Stops the agent and releases the mailbox and the cancellation source.
        /// Calling this method more than once is safe.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);

            // Harmless here (no finalizer), but keeps derived types that add
            // one from having to re-implement IDisposable.
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Releases the resources held by the agent.
        /// </summary>
        /// <param name="disposing">
        /// True when called from <see cref="Dispose()"/>; only then are the
        /// managed resources stopped and disposed.
        /// </param>
        protected virtual void Dispose(bool disposing)
        {
            // Stop() touches the mailbox and cancellation source, both of which
            // throw ObjectDisposedException once disposed; bail out on repeats.
            if (_disposed)
            {
                return;
            }

            _disposed = true;

            if (disposing)
            {
                try
                {
                    Stop();
                }
                finally
                {
                    // Release both resources even if Stop() throws (for example
                    // when a cancellation callback raises an exception).
                    _messages.Dispose();
                    _cancelSource.Dispose();
                }
            }
        }

        /// <summary>
        /// Posts every message of <paramref name="enumerable"/> to the mailbox, in order.
        /// </summary>
        /// <param name="enumerable">The messages to post.</param>
        /// <exception cref="ArgumentNullException"><paramref name="enumerable"/> is null.</exception>
        public void AddFromEnumerable(IEnumerable<TMessage> enumerable)
        {
            if (enumerable == null)
            {
                throw new ArgumentNullException("enumerable");
            }

            foreach (var message in enumerable)
            {
                Post(message);
            }
        }

        /// <summary>
        /// Exposes the mailbox as a sequence. Enumerating it does not remove
        /// messages (unlike <c>GetConsumingEnumerable</c>).
        /// </summary>
        /// <returns>The mailbox, typed as <see cref="IEnumerable{T}"/>.</returns>
        public IEnumerable<TMessage> GetEnumerable()
        {
            return _messages;
        }
    }
}