Revision 174bbb6e trunk/Pithos.Core/Agents/Agent.cs
b/trunk/Pithos.Core/Agents/Agent.cs | ||
---|---|---|
53 | 53 |
public class Agent<TMessage> : IDisposable |
54 | 54 |
{ |
55 | 55 |
private readonly ConcurrentQueue<TMessage> _queue; |
56 |
private readonly AsyncProducerConsumerCollection<TMessage> _messages; |
|
56 |
//private readonly AsyncCollection<TMessage> _messages; |
|
57 |
private readonly AsyncCollection<TMessage> _messages; |
|
57 | 58 |
private readonly CancellationTokenSource _cancelSource = new CancellationTokenSource(); |
58 | 59 |
public CancellationToken CancellationToken; |
59 | 60 |
|
... | ... | |
63 | 64 |
public Agent(Action<Agent<TMessage>> action) |
64 | 65 |
{ |
65 | 66 |
_queue=new ConcurrentQueue<TMessage>(); |
66 |
_messages = new AsyncProducerConsumerCollection<TMessage>(_queue);
|
|
67 |
_messages = new AsyncCollection<TMessage>(_queue); |
|
67 | 68 |
_process = action; |
68 | 69 |
CancellationToken = _cancelSource.Token; |
69 | 70 |
} |
... | ... | |
78 | 79 |
_messages.Add(message); |
79 | 80 |
} |
80 | 81 |
|
82 |
ConcurrentDictionary<TMessage,TaskCompletionSource<object>> _awaiters=new ConcurrentDictionary<TMessage,TaskCompletionSource<object>>(); |
|
83 |
|
|
84 |
public Task PostAndAwait(TMessage message) |
|
85 |
{ |
|
86 |
var tcs = new TaskCompletionSource<object>(); |
|
87 |
_awaiters[message] = tcs; |
|
88 |
Post(message); |
|
89 |
return tcs.Task; |
|
90 |
} |
|
91 |
|
|
81 | 92 |
/// <summary> |
82 | 93 |
/// Receives a message asynchronously, optionally with a timeout. Receive throws a TimeoutException if the timeout expires |
83 | 94 |
/// </summary> |
... | ... | |
87 | 98 |
return _messages.Take(); |
88 | 99 |
} |
89 | 100 |
|
101 |
public void NotifyComplete(TMessage message) |
|
102 |
{ |
|
103 |
TaskCompletionSource<object> tcs; |
|
104 |
if (_awaiters.TryRemove(message,out tcs)) |
|
105 |
tcs.SetResult(null); |
|
106 |
} |
|
107 |
|
|
90 | 108 |
|
91 | 109 |
|
92 | 110 |
/// <summary> |
... | ... | |
148 | 166 |
if (disposing) |
149 | 167 |
{ |
150 | 168 |
Stop(); |
151 |
_messages.Dispose(); |
|
152 | 169 |
_cancelSource.Dispose(); |
153 | 170 |
} |
154 | 171 |
} |
... | ... | |
209 | 226 |
onError(ex); |
210 | 227 |
} |
211 | 228 |
return default(T); |
212 |
},CancellationToken);
|
|
229 |
}); |
|
213 | 230 |
} |
214 | 231 |
} |
215 | 232 |
} |
Also available in: Unified diff