Revision 174bbb6e trunk/Pithos.Core/Agents/Agent.cs

b/trunk/Pithos.Core/Agents/Agent.cs
53 53
    public class Agent<TMessage> : IDisposable
54 54
    {
55 55
        private readonly ConcurrentQueue<TMessage> _queue;
56
        private readonly AsyncProducerConsumerCollection<TMessage> _messages;
56
        //private readonly AsyncCollection<TMessage> _messages;
57
        private readonly AsyncCollection<TMessage> _messages;
57 58
        private readonly CancellationTokenSource _cancelSource = new CancellationTokenSource();
58 59
        public CancellationToken CancellationToken;
59 60

  
......
63 64
        public Agent(Action<Agent<TMessage>> action)
64 65
        {
65 66
            _queue=new ConcurrentQueue<TMessage>();
66
            _messages = new AsyncProducerConsumerCollection<TMessage>(_queue);            
67
            _messages = new AsyncCollection<TMessage>(_queue);            
67 68
            _process = action;
68 69
            CancellationToken = _cancelSource.Token;
69 70
        }
......
78 79
            _messages.Add(message);
79 80
        }
80 81

  
82
        ConcurrentDictionary<TMessage,TaskCompletionSource<object>> _awaiters=new ConcurrentDictionary<TMessage,TaskCompletionSource<object>>();
83

  
84
        public Task PostAndAwait(TMessage message)
85
        {            
86
            var tcs = new TaskCompletionSource<object>();
87
            _awaiters[message] = tcs;
88
            Post(message);
89
            return tcs.Task;
90
        }
91

  
81 92
        /// <summary>
82 93
        /// Receives a message asynchronously, optionally with a timeout. Receive throws a TimeoutException if the timeout expires
83 94
        /// </summary>
......
87 98
            return _messages.Take();
88 99
        }
89 100

  
101
        public void NotifyComplete(TMessage message)
102
        {
103
            TaskCompletionSource<object> tcs;
104
            if (_awaiters.TryRemove(message,out tcs))
105
                tcs.SetResult(null);
106
        }
107

  
90 108

  
91 109

  
92 110
        /// <summary>
......
148 166
            if (disposing)
149 167
            {
150 168
                Stop();
151
                _messages.Dispose();
152 169
                _cancelSource.Dispose();
153 170
            }
154 171
        }
......
209 226
                        onError(ex);
210 227
                }
211 228
                return default(T);
212
            },CancellationToken);
229
            });
213 230
        }
214 231
    }
215 232
}

Also available in: Unified diff