#region
/* -----------------------------------------------------------------------
 *
 *
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 *   1. Redistributions of source code must retain the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer.
 *
 *   2. Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials
 *      provided with the distribution.
 *
 *
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 * The views and conclusions contained in the software and
 * documentation are those of the authors and should not be
 * interpreted as representing official policies, either expressed
 * or implied, of GRNET S.A.
 *
 * -----------------------------------------------------------------------
 */
#endregion
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Threading;
using System.Threading.Async;
using System.Threading.Tasks;
using Pithos.Core.Agents;

namespace Pithos.Core
{
    /// <summary>
    /// A message-processing agent: messages are posted to an internal queue and
    /// consumed by a processing action that is started on a separate task.
    /// </summary>
    /// <typeparam name="TMessage">The type of message handled by the agent</typeparam>
    public class Agent<TMessage> : IDisposable
    {
        //The raw queue is kept so that it can be inspected (IsEmpty, GetEnumerable, Remove)
        //while _messages wraps it for asynchronous consumption
        private readonly ConcurrentQueue<TMessage> _queue;
        private readonly AsyncCollection<TMessage> _messages;
        private readonly CancellationTokenSource _cancelSource = new CancellationTokenSource();

        /// <summary>
        /// The token of the agent's cancellation source. It is signalled by <see cref="Stop"/>
        /// </summary>
        public CancellationToken CancellationToken;

        private readonly Action<Agent<TMessage>> _process;

        //Completion sources of the messages posted through PostAndAwait, keyed by message
        private readonly ConcurrentDictionary<TMessage, TaskCompletionSource<object>> _awaiters =
            new ConcurrentDictionary<TMessage, TaskCompletionSource<object>>();

        //Set by the first Dispose call so that later calls become no-ops
        private bool _disposed;

        /// <summary>
        /// Creates a new agent. The agent does not process anything until <see cref="Start()"/> is called
        /// </summary>
        /// <param name="action">The message processing action. It receives the agent itself as its argument</param>
        public Agent(Action<Agent<TMessage>> action)
        {
            _queue = new ConcurrentQueue<TMessage>();
            _messages = new AsyncCollection<TMessage>(_queue);
            _process = action;
            CancellationToken = _cancelSource.Token;
        }

        /// <summary>
        /// Returns true if the underlying queue contains no messages
        /// </summary>
        public bool IsEmpty
        {
            get { return _queue.IsEmpty; }
        }

        /// <summary>
        /// Adds a message to the agent's message collection
        /// </summary>
        /// <param name="message">The message to post</param>
        public void Post(TMessage message)
        {
            _messages.Add(message);
        }

        /// <summary>
        /// Posts a message and returns a task that completes when
        /// <see cref="NotifyComplete"/> is called for the same message
        /// </summary>
        /// <param name="message">The message to post</param>
        /// <returns>A Task that completes once the message is reported as complete</returns>
        /// <remarks>The awaiter is stored under the message itself, so posting an equal message
        /// again before the first one completes replaces the earlier awaiter</remarks>
        public Task PostAndAwait(TMessage message)
        {
            var tcs = new TaskCompletionSource<object>();
            //Register the awaiter before posting, so that it is already in place
            //when the processing action calls NotifyComplete
            _awaiters[message] = tcs;
            Post(message);
            return tcs.Task;
        }

        /// <summary>
        /// Receives a message asynchronously
        /// </summary>
        /// <returns>A Task that will return the message asynchronously</returns>
        public Task<TMessage> Receive()
        {
            return _messages.Take();
        }

        /// <summary>
        /// Completes the task returned by <see cref="PostAndAwait"/> for the specified message.
        /// Does nothing if no awaiter is registered for the message
        /// </summary>
        /// <param name="message">The message whose processing has finished</param>
        public void NotifyComplete(TMessage message)
        {
            TaskCompletionSource<object> tcs;
            if (_awaiters.TryRemove(message, out tcs))
            {
                tcs.SetResult(null);
            }
        }

        /// <summary>
        /// Start the agent
        /// </summary>
        public void Start()
        {
            Task.Factory.StartNew(() => _process(this), CancellationToken);
        }

        /// <summary>
        /// Create and start a new agent for the specified type of message
        /// </summary>
        /// <param name="action">The message processing action</param>
        /// <returns>A started Agent</returns>
        public static Agent<TMessage> Start(Action<Agent<TMessage>> action)
        {
            var agent = new Agent<TMessage>(action);
            agent.Start();
            return agent;
        }

        /// <summary>
        /// Stops the agent
        /// </summary>
        public void Stop()
        {
            //Stop the message queue
            //_messages.CompleteAdding();
            //And signal the cancellation
            _cancelSource.Cancel();
        }

        /// <summary>
        /// Execute an action asynchronously, using the agent's cancellation source
        /// </summary>
        /// <param name="action">The action to execute</param>
        /// <exception cref="ArgumentNullException">action is null</exception>
        public void DoAsync(Action action)
        {
            if (action == null)
            {
                throw new ArgumentNullException("action");
            }
            Contract.EndContractBlock();

            Task.Factory.StartNew(action, CancellationToken);
        }

        ~Agent()
        {
            Dispose(false);
        }

        /// <summary>
        /// Stops the agent and releases its cancellation source.
        /// Calling Dispose more than once is safe
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Releases the agent's resources
        /// </summary>
        /// <param name="disposing">True when called from <see cref="Dispose()"/>,
        /// false when called from the finalizer</param>
        protected void Dispose(bool disposing)
        {
            if (!disposing)
            {
                return;
            }
            //Cancel() throws ObjectDisposedException on a disposed source,
            //so repeated Dispose calls must not reach Stop() again
            if (_disposed)
            {
                return;
            }
            _disposed = true;

            Stop();
            _cancelSource.Dispose();
        }

        /// <summary>
        /// Exposes the underlying message queue for enumeration
        /// </summary>
        /// <returns>The queued messages</returns>
        public IEnumerable<TMessage> GetEnumerable()
        {
            return _queue;
        }

        /// <summary>
        /// Remove the first message that matches the predicate
        /// </summary>
        /// <param name="predicate">The condition to match</param>
        /// <remarks>Removes the first message that matches the predicate by dequeing all
        /// messages and re-enqueing all except the first matching message</remarks>
        public void Remove(Func<TMessage, bool> predicate)
        {
            //Can this work? Dequeue all items
            //and then enqueue everything except the filtered items
            //NOTE(review): RemoveFirst is not defined in this file - presumably an
            //extension method from Pithos.Core.Agents; confirm its behaviour there
            _queue.RemoveFirst(predicate);
        }

        /// <summary>
        /// Schedules the next iteration of a processing loop once the specified task finishes
        /// </summary>
        /// <param name="process">The task whose completion triggers the next iteration</param>
        /// <param name="loop">The action that runs the next iteration</param>
        /// <param name="onError">Optional handler that receives the inner exception of a faulted task</param>
        /// <returns>The continuation task</returns>
        /// <exception cref="ArgumentNullException">process or loop is null</exception>
        public Task LoopAsync(Task process, Action loop, Action<Exception> onError = null)
        {
            if (process == null)
            {
                throw new ArgumentNullException("process");
            }
            if (loop == null)
            {
                throw new ArgumentNullException("loop");
            }
            Contract.EndContractBlock();

            return process.ContinueWith(t => ContinueLoop(t, loop, onError), CancellationToken);
        }

        /// <summary>
        /// Schedules the next iteration of a processing loop once the specified task finishes
        /// </summary>
        /// <typeparam name="T">The result type of the task</typeparam>
        /// <param name="process">The task whose completion triggers the next iteration</param>
        /// <param name="loop">The action that runs the next iteration</param>
        /// <param name="onError">Optional handler that receives the inner exception of a faulted task</param>
        /// <returns>A continuation task whose result is always default(T)</returns>
        /// <exception cref="ArgumentNullException">process or loop is null</exception>
        public Task<T> LoopAsync<T>(Task<T> process, Action loop, Action<Exception> onError = null)
        {
            //Validate eagerly, exactly like the non-generic overload
            if (process == null)
            {
                throw new ArgumentNullException("process");
            }
            if (loop == null)
            {
                throw new ArgumentNullException("loop");
            }
            Contract.EndContractBlock();

            return process.ContinueWith(t =>
            {
                ContinueLoop(t, loop, onError);
                return default(T);
            }, CancellationToken);
        }

        //Shared continuation body of both LoopAsync overloads
        private void ContinueLoop(Task completed, Action loop, Action<Exception> onError)
        {
            //Spawn the loop immediately
            Task.Factory.StartNew(loop, CancellationToken);

            //Then process possible exceptions
            if (!completed.IsFaulted)
            {
                return;
            }

            var ex = completed.Exception.InnerException;
            //A cancelled operation stops the agent
            if (ex is OperationCanceledException)
            {
                Stop();
            }
            if (onError != null)
            {
                onError(ex);
            }
        }
    }
}