-using System;
+#region
+/* -----------------------------------------------------------------------
+ * <copyright file="Agent.cs" company="GRNet">
+ *
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ * </copyright>
+ * -----------------------------------------------------------------------
+ */
+#endregion
+using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
+using Pithos.Core.Agents;
namespace Pithos.Core
{
public class Agent<TMessage> : IDisposable
- {
- private readonly BlockingCollection<TMessage> _messages = new BlockingCollection<TMessage>();
+ {
+ private readonly ConcurrentQueue<TMessage> _queue;
+ private readonly BlockingCollection<TMessage> _messages;
private readonly CancellationTokenSource _cancelSource = new CancellationTokenSource();
public CancellationToken CancellationToken;
public Agent(Action<Agent<TMessage>> action)
{
+ _queue=new ConcurrentQueue<TMessage>();
+ _messages = new BlockingCollection<TMessage>(_queue);
_process = action;
CancellationToken = _cancelSource.Token;
}
+ public bool IsEmpty
+ {
+ get { return _queue.IsEmpty; }
+ }
+
public void Post(TMessage message)
{
_messages.Add(message);
}
+ /// <summary>
+ /// Receives a message asynchronously, optionally with a timeout. Receive throws a TimeoutException if the timeout expires
+ /// </summary>
+ /// <param name="timeout">Optional timeout in milliseconds. If provided, Receive fails with a TimeoutException if no message is available in the specified time</param>
+ /// <returns>A Task that will return the message asynchronously</returns>
public Task<TMessage> Receive(int timeout = -1)
{
return Task<TMessage>.Factory.StartNew(() =>
});
}
+
+ /// <summary>
+ /// Receives a message asynchronously, optionally with a timeout. TryReceive returns an empty task if the timeout expires
+ /// </summary>
+ /// <param name="timeout">Optional timeout in milliseconds. If provided, Receive returns an empty task</param>
+ /// <returns>A Task that will return the message asynchronously</returns>
public Task<TMessage> TryReceive(int timeout = -1)
{
return Task<TMessage>.Factory.StartNew(() =>
-
+ /// <summary>
+ /// Start the agent
+ /// </summary>
public void Start()
{
Task.Factory.StartNew(() => _process(this), CancellationToken);
}
-
+ /// <summary>
+ /// Create and start a new agent for the specified type of message
+ /// </summary>
+ /// <param name="action">The message processing action</param>
+ /// <returns>A started Agent</returns>
public static Agent<TMessage> Start(Action<Agent<TMessage>> action)
{
var agent = new Agent<TMessage>(action);
return agent;
}
+ /// <summary>
+ /// Stops the agent
+ /// </summary>
public void Stop()
{
+ //Stop the message queue
_messages.CompleteAdding();
+ //And signal the cancellation
_cancelSource.Cancel();
}
+ /// <summary>
+ /// Execute an action asynchronously, using the agent's cancellation source
+ /// </summary>
+ /// <param name="action">The action to execute</param>
public void DoAsync(Action action)
{
Contract.Requires(action!=null);
return _messages;
}
+ /// <summary>
+ /// Remove the first message that matches the predicate
+ /// </summary>
+ /// <param name="predicate">The condition to match</param>
+ /// <remarks>Removes the first message that matches the predicate by dequeing all
+ /// messages and re-enqueing all except the first matching message</remarks>
+ public void Remove(Func<TMessage,bool> predicate)
+ {
+ //Can this work? Dequeue all items
+ //and then enqueue everything except the filtered items
+
+ _queue.RemoveFirst(predicate);
+ }
+
public Task LoopAsync(Task process, Action loop,Action<Exception> onError=null)
{
Contract.Requires(process!=null);