SyncFiles method modified to detect conflicts as per #2096
[pithos-ms-client] / trunk / Pithos.Core / Agents / Agent.cs
index faa5760..ec927a6 100644 (file)
@@ -1,4 +1,45 @@
-using System;
+#region
+/* -----------------------------------------------------------------------
+ * <copyright file="Agent.cs" company="GRNet">
+ * 
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ *   1. Redistributions of source code must retain the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer.
+ *
+ *   2. Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials
+ *      provided with the distribution.
+ *
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ * </copyright>
+ * -----------------------------------------------------------------------
+ */
+#endregion
+using System;
 using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Diagnostics.Contracts;
@@ -6,12 +47,14 @@ using System.Linq;
 using System.Text;
 using System.Threading;
 using System.Threading.Tasks;
+using Pithos.Core.Agents;
 
 namespace Pithos.Core
 {
     public class Agent<TMessage> : IDisposable
-    {        
-        private readonly BlockingCollection<TMessage> _messages = new BlockingCollection<TMessage>();
+    {
+        private readonly ConcurrentQueue<TMessage> _queue;
+        private readonly BlockingCollection<TMessage> _messages;
         private readonly CancellationTokenSource _cancelSource = new CancellationTokenSource();
         public CancellationToken CancellationToken;
 
@@ -20,15 +63,27 @@ namespace Pithos.Core
 
         public Agent(Action<Agent<TMessage>> action)
         {
+            _queue=new ConcurrentQueue<TMessage>();
+            _messages = new BlockingCollection<TMessage>(_queue);
             _process = action;
             CancellationToken = _cancelSource.Token;
         }
 
+        public bool IsEmpty
+        {
+            get { return _queue.IsEmpty; }
+        }
+
         public void Post(TMessage message)
         {
             _messages.Add(message);
         }
 
+        /// <summary>
+        /// Receives a message asynchronously, optionally with a timeout. Receive throws a TimeoutException if the timeout expires
+        /// </summary>
+        /// <param name="timeout">Optional timeout in milliseconds. If provided, Receive fails with a TimeoutException if no message is available in the specified time</param>
+        /// <returns>A Task that will return the message asynchronously</returns>
         public Task<TMessage> Receive(int timeout = -1)
         {
             return Task<TMessage>.Factory.StartNew(() =>
@@ -40,6 +95,12 @@ namespace Pithos.Core
             });
         }
 
+
+        /// <summary>
+        /// Receives a message asynchronously, optionally with a timeout. TryReceive returns an empty task if the timeout expires
+        /// </summary>
+        /// <param name="timeout">Optional timeout in milliseconds. If provided, Receive returns an empty task</param>
+        /// <returns>A Task that will return the message asynchronously</returns>
         public Task<TMessage> TryReceive(int timeout = -1)
         {
             return Task<TMessage>.Factory.StartNew(() =>
@@ -52,14 +113,20 @@ namespace Pithos.Core
 
 
 
-
+        /// <summary>
+        /// Start the agent
+        /// </summary>
         public void Start()
         {
             Task.Factory.StartNew(() => _process(this), CancellationToken);            
         }
 
 
-
+        /// <summary>
+        /// Create and start a new agent for the specified type of message
+        /// </summary>
+        /// <param name="action">The message processing action</param>
+        /// <returns>A started Agent</returns>
         public static Agent<TMessage> Start(Action<Agent<TMessage>> action)
         {
             var agent = new Agent<TMessage>(action);
@@ -67,12 +134,21 @@ namespace Pithos.Core
             return agent;
         }
 
+        /// <summary>
+        /// Stops the agent 
+        /// </summary>
         public void Stop()
         {
+            //Stop the message queue
             _messages.CompleteAdding();
+            //And signal the cancellation
             _cancelSource.Cancel();
         }
 
+        /// <summary>
+        /// Execute an action asynchronously, using the agent's cancellation source
+        /// </summary>
+        /// <param name="action">The action to execute</param>
         public void DoAsync(Action action)
         {
             Contract.Requires(action!=null);
@@ -106,6 +182,20 @@ namespace Pithos.Core
             return _messages;
         }
 
+        /// <summary>
+        /// Remove the first message that matches the predicate
+        /// </summary>
+        /// <param name="predicate">The condition to match</param>
+        /// <remarks>Removes the first message that matches the predicate by dequeing all 
+        /// messages and re-enqueing all except the first matching message</remarks>
+        public void Remove(Func<TMessage,bool> predicate)
+        {
+            //Can this work? Dequeue all items 
+            //and then enqueue everything except the filtered items
+
+            _queue.RemoveFirst(predicate);
+        }
+
         public Task LoopAsync(Task process, Action loop,Action<Exception> onError=null)
         {
             Contract.Requires(process!=null);