Replaced BlockingCollection in Agents/Agent.cs with AsyncProducerConsumerCollection...
author Panagiotis Kanavos <pkanavos@gmail.com>
Thu, 1 Mar 2012 13:37:22 +0000 (15:37 +0200)
committer Panagiotis Kanavos <pkanavos@gmail.com>
Thu, 1 Mar 2012 13:37:22 +0000 (15:37 +0200)
...when an agent stops (e.g. when a FileAgent stops because an account is removed).
Also cleaned up several related files.

Fixes #1785

trunk/Pithos.Client.WPF/Shell/ShellViewModel.cs
trunk/Pithos.Core/Agents/Agent.cs
trunk/Pithos.Core/Agents/CloudTransferAction.cs
trunk/Pithos.Core/Agents/NetworkAgent.cs
trunk/Pithos.Core/Agents/PollAgent.cs
trunk/Pithos.Core/PithosMonitor.cs

index b593617..f5c1ddb 100644 (file)
@@ -738,6 +738,7 @@ namespace Pithos.Client.WPF {
                        var accountInfo=_accounts.FirstOrDefault(account => account.UserName == accountName);
                        _accounts.TryRemove(accountInfo);
 
+                   _pollAgent.RemoveAccount(accountInfo);
                        PithosMonitor monitor;
                        if (Monitors.TryRemove(accountName, out monitor))
                        {
index ec927a6..0956bfc 100644 (file)
@@ -43,9 +43,8 @@ using System;
 using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Diagnostics.Contracts;
-using System.Linq;
-using System.Text;
 using System.Threading;
+using System.Threading.Async;
 using System.Threading.Tasks;
 using Pithos.Core.Agents;
 
@@ -54,7 +53,7 @@ namespace Pithos.Core
     public class Agent<TMessage> : IDisposable
     {
         private readonly ConcurrentQueue<TMessage> _queue;
-        private readonly BlockingCollection<TMessage> _messages;
+        private readonly AsyncProducerConsumerCollection<TMessage> _messages;
         private readonly CancellationTokenSource _cancelSource = new CancellationTokenSource();
         public CancellationToken CancellationToken;
 
@@ -64,7 +63,7 @@ namespace Pithos.Core
         public Agent(Action<Agent<TMessage>> action)
         {
             _queue=new ConcurrentQueue<TMessage>();
-            _messages = new BlockingCollection<TMessage>(_queue);
+            _messages = new AsyncProducerConsumerCollection<TMessage>(_queue);            
             _process = action;
             CancellationToken = _cancelSource.Token;
         }
@@ -82,33 +81,10 @@ namespace Pithos.Core
         /// <summary>
         /// Receives a message asynchronously, optionally with a timeout. Receive throws a TimeoutException if the timeout expires
         /// </summary>
-        /// <param name="timeout">Optional timeout in milliseconds. If provided, Receive fails with a TimeoutException if no message is available in the specified time</param>
         /// <returns>A Task that will return the message asynchronously</returns>
-        public Task<TMessage> Receive(int timeout = -1)
+        public  Task<TMessage> Receive()
         {
-            return Task<TMessage>.Factory.StartNew(() =>
-            {
-                TMessage item;
-                if (!_messages.TryTake(out item, timeout, CancellationToken))
-                    throw new TimeoutException();
-                return item;
-            });
-        }
-
-
-        /// <summary>
-        /// Receives a message asynchronously, optionally with a timeout. TryReceive returns an empty task if the timeout expires
-        /// </summary>
-        /// <param name="timeout">Optional timeout in milliseconds. If provided, Receive returns an empty task</param>
-        /// <returns>A Task that will return the message asynchronously</returns>
-        public Task<TMessage> TryReceive(int timeout = -1)
-        {
-            return Task<TMessage>.Factory.StartNew(() =>
-            {
-                TMessage item;
-                _messages.TryTake(out item, timeout, CancellationToken);
-                return item;
-            });
+            return _messages.Take();
         }
 
 
@@ -140,7 +116,7 @@ namespace Pithos.Core
         public void Stop()
         {
             //Stop the message queue
-            _messages.CompleteAdding();
+            //_messages.CompleteAdding();
             //And signal the cancellation
             _cancelSource.Cancel();
         }
@@ -179,7 +155,7 @@ namespace Pithos.Core
 
         public IEnumerable<TMessage> GetEnumerable()
         {
-            return _messages;
+            return _queue;
         }
 
         /// <summary>
index edfd729..5163c3c 100644 (file)
@@ -72,12 +72,7 @@ namespace Pithos.Core.Agents
 
 
         public Lazy<string> LocalHash { get; protected set; }
-        private Lazy<string> _topHash;
-        public Lazy<string> TopHash
-        {
-            get { return _topHash; }
-            set { _topHash = value; }
-        }
+        public Lazy<string> TopHash { get; set; }
 
 
         [ContractInvariantMethod]
@@ -126,7 +121,7 @@ namespace Pithos.Core.Agents
 
         public override string ToString()
         {
-            return String.Format("{0}:{1}->{2}", this.Action, this.LocalFile.FullName, this.CloudFile.Name);
+            return String.Format("{0}:{1}->{2}", Action, LocalFile.FullName, CloudFile.Name);
         }
 
         protected static ObjectInfo CreateObjectInfoFor(AccountInfo accountInfo, FileSystemInfo fileInfo)
@@ -208,7 +203,7 @@ namespace Pithos.Core.Agents
 
         public override string ToString()
         {
-            return String.Format("{0}: _ ->{1}", this.Action, this.CloudFile.Name);
+            return String.Format("{0}: _ ->{1}", Action, CloudFile.Name);
         }
 
     }
@@ -249,7 +244,7 @@ namespace Pithos.Core.Agents
 
         public override string ToString()
         {
-            return String.Format("{0}:{1}->{2}", this.Action, OldCloudFile.Name, CloudFile.Name);
+            return String.Format("{0}:{1}->{2}", Action, OldCloudFile.Name, CloudFile.Name);
         }
 
     }
index d051dbf..563f0f9 100644 (file)
  */
 #endregion
 
-//TODO: Now there is a UUID tag. This can be used for renames/moves
-
-
 using System;
-using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.ComponentModel.Composition;
 using System.Diagnostics;
 using System.Diagnostics.Contracts;
 using System.IO;
-using System.Linq;
 using System.Net;
 using System.Threading;
 using System.Threading.Tasks;
-using System.Threading.Tasks.Dataflow;
 using Castle.ActiveRecord;
 using Pithos.Interfaces;
 using Pithos.Network;
@@ -62,7 +56,6 @@ using log4net;
 
 namespace Pithos.Core.Agents
 {
-    //TODO: Ensure all network operations use exact casing. Pithos is case sensitive
     [Export]
     public class NetworkAgent
     {
@@ -78,8 +71,6 @@ namespace Pithos.Core.Agents
 
         private static readonly ILog Log = LogManager.GetLogger("NetworkAgent");
 
-        private readonly ConcurrentBag<AccountInfo> _accounts = new ConcurrentBag<AccountInfo>();
-
         [System.ComponentModel.Composition.Import]
         public IPithosSettings Settings { get; set; }
 
@@ -144,7 +135,7 @@ namespace Pithos.Core.Agents
                     {
                         //Clear the status of already deleted files to avoid reprocessing
                         if (action.LocalFile != null)
-                            this.StatusKeeper.ClearFileStatus(action.LocalFile.FullName);
+                            StatusKeeper.ClearFileStatus(action.LocalFile.FullName);
                     }
                     else
                     {
@@ -676,7 +667,7 @@ namespace Pithos.Core.Agents
                     var accountName = parts[1];
                     var oldName = accountInfo.UserName;
                     var absoluteUri = accountInfo.StorageUri.AbsoluteUri;
-                    var nameIndex = absoluteUri.IndexOf(oldName);
+                    var nameIndex = absoluteUri.IndexOf(oldName, StringComparison.Ordinal);
                     var root = absoluteUri.Substring(0, nameIndex);
 
                     accountInfo = new AccountInfo
@@ -852,11 +843,6 @@ namespace Pithos.Core.Agents
         }
 
 
-        public void AddAccount(AccountInfo accountInfo)
-        {            
-            if (!_accounts.Contains(accountInfo))
-                _accounts.Add(accountInfo);
-        }
     }
 
    
index 1a5d764..b14068c 100644 (file)
@@ -47,7 +47,6 @@ using System.Diagnostics.Contracts;
 using System.IO;\r
 using System.Threading;\r
 using System.Threading.Tasks;\r
-using System.Threading.Tasks.Dataflow;\r
 using Castle.ActiveRecord;\r
 using Pithos.Interfaces;\r
 using Pithos.Network;\r
@@ -58,7 +57,6 @@ namespace Pithos.Core.Agents
     using System;\r
     using System.Collections.Generic;\r
     using System.Linq;\r
-    using System.Text;\r
 \r
     /// <summary>\r
     /// PollAgent periodically polls the server to detect object changes. The agent retrieves a listing of all\r
@@ -87,8 +85,8 @@ namespace Pithos.Core.Agents
         //The Sync Event signals a manual synchronisation\r
         private readonly AsyncManualResetEvent _syncEvent = new AsyncManualResetEvent();\r
 \r
-        private ConcurrentDictionary<string, DateTime> _lastSeen = new ConcurrentDictionary<string, DateTime>();\r
-        private readonly ConcurrentBag<AccountInfo> _accounts = new ConcurrentBag<AccountInfo>();\r
+        private readonly ConcurrentDictionary<string, DateTime> _lastSeen = new ConcurrentDictionary<string, DateTime>();\r
+        private readonly ConcurrentDictionary<string, AccountInfo> _accounts = new ConcurrentDictionary<string,AccountInfo>();\r
 \r
 \r
         /// <summary>\r
@@ -111,7 +109,7 @@ namespace Pithos.Core.Agents
             UpdateStatus(PithosStatus.Syncing);\r
             StatusNotification.Notify(new PollNotification());\r
 \r
-            using (log4net.ThreadContext.Stacks["Retrieve Remote"].Push("All accounts"))\r
+            using (ThreadContext.Stacks["Retrieve Remote"].Push("All accounts"))\r
             {\r
                 //If this poll fails, we will retry with the same since value\r
                 var nextSince = since;\r
@@ -121,7 +119,7 @@ namespace Pithos.Core.Agents
                     //This is done to ensure there are no discrepancies due to clock differences\r
                     var current = DateTime.Now.AddSeconds(-1);\r
 \r
-                    var tasks = from accountInfo in _accounts\r
+                    var tasks = from accountInfo in _accounts.Values\r
                                 select ProcessAccountFiles(accountInfo, since);\r
 \r
                     await TaskEx.WhenAll(tasks.ToList());\r
@@ -195,7 +193,9 @@ namespace Pithos.Core.Agents
                 var client = new CloudFilesClient(accountInfo);\r
 \r
                 //We don't need to check the trash container\r
-                var containers = client.ListContainers(accountInfo.UserName).Where(c=>c.Name!="trash");\r
+                var containers = client.ListContainers(accountInfo.UserName)\r
+                    .Where(c=>c.Name!="trash")\r
+                    .ToList();\r
 \r
 \r
                 CreateContainerFolders(accountInfo, containers);\r
@@ -206,20 +206,12 @@ namespace Pithos.Core.Agents
                     await NetworkAgent.GetDeleteAwaiter();\r
                     //Get the poll time now. We may miss some deletions but it's better to keep a file that was deleted\r
                     //than delete a file that was created while we were executing the poll                    \r
-                    var pollTime = DateTime.Now;\r
 \r
                     //Get the list of server objects changed since the last check\r
                     //The name of the container is passed as state in order to create a dictionary of tasks in a subsequent step\r
                     var listObjects = (from container in containers\r
                                        select Task<IList<ObjectInfo>>.Factory.StartNew(_ =>\r
                                              client.ListObjects(accountInfo.UserName, container.Name, since), container.Name)).ToList();\r
-                    //BUG: Can't detect difference between no changes or no objects\r
-                    //ListObjects returns nothing if there are no changes since the last check time (since value)                    \r
-                    //TODO: Must detect the difference between no server objects and no change\r
-\r
-                    //NOTE: One option is to "mark" all result lists with their container name, or \r
-                    //rather the url of the container\r
-                    //Another option \r
 \r
                     var listShared = Task<IList<ObjectInfo>>.Factory.StartNew(_ => \r
                         client.ListSharedObjects(since), "shared");\r
@@ -263,7 +255,7 @@ namespace Pithos.Core.Agents
 \r
                         var differencer = _differencer.PostSnapshot(accountInfo, cleanRemotes);\r
 \r
-                        ProcessDeletedFiles(accountInfo, differencer.Deleted.FilterDirectlyBelow(SelectiveUris), pollTime);\r
+                        ProcessDeletedFiles(accountInfo, differencer.Deleted.FilterDirectlyBelow(SelectiveUris));\r
 \r
                         // @@@ NEED To add previous state here as well, To compare with previous hash\r
 \r
@@ -301,7 +293,7 @@ namespace Pithos.Core.Agents
             }\r
         }\r
 \r
-        AccountsDifferencer _differencer = new AccountsDifferencer();\r
+        readonly AccountsDifferencer _differencer = new AccountsDifferencer();\r
         private List<Uri> _selectiveUris=new List<Uri>();\r
 \r
         /// <summary>\r
@@ -309,8 +301,7 @@ namespace Pithos.Core.Agents
         /// </summary>\r
         /// <param name="accountInfo"></param>\r
         /// <param name="cloudFiles"></param>\r
-        /// <param name="pollTime"></param>\r
-        private void ProcessDeletedFiles(AccountInfo accountInfo, IEnumerable<ObjectInfo> cloudFiles, DateTime pollTime)\r
+        private void ProcessDeletedFiles(AccountInfo accountInfo, IEnumerable<ObjectInfo> cloudFiles)\r
         {\r
             if (accountInfo == null)\r
                 throw new ArgumentNullException("accountInfo");\r
@@ -504,19 +495,6 @@ namespace Pithos.Core.Agents
             }\r
         }\r
 \r
-        private void ProcessTrashedFiles(AccountInfo accountInfo, IEnumerable<ObjectInfo> trashObjects)\r
-        {\r
-            var fileAgent = FileAgent.GetFileAgent(accountInfo);\r
-            foreach (var trashObject in trashObjects)\r
-            {\r
-                var barePath = trashObject.RelativeUrlToFilePath(accountInfo.UserName);\r
-                //HACK: Assume only the "pithos" container is used. Must find out what happens when\r
-                //deleting a file from a different container\r
-                var relativePath = Path.Combine("pithos", barePath);\r
-                fileAgent.Delete(relativePath);\r
-            }\r
-        }\r
-\r
         /// <summary>\r
         /// Notify the UI to update the visual status\r
         /// </summary>\r
@@ -561,8 +539,14 @@ namespace Pithos.Core.Agents
 \r
         public void AddAccount(AccountInfo accountInfo)\r
         {\r
-            if (!_accounts.Contains(accountInfo))\r
-                _accounts.Add(accountInfo);\r
+            //Avoid adding a duplicate accountInfo\r
+            _accounts.TryAdd(accountInfo.UserName, accountInfo);\r
+        }\r
+\r
+        public void RemoveAccount(AccountInfo accountInfo)\r
+        {\r
+            AccountInfo account;\r
+            _accounts.TryRemove(accountInfo.UserName,out account);\r
         }\r
     }\r
 }\r
index 2495b10..9e74cb1 100644 (file)
@@ -361,8 +361,6 @@ namespace Pithos.Core
         private void StartNetworkAgent()
         {
 
-            NetworkAgent.AddAccount(_accountInfo);
-
             NetworkAgent.StatusNotification = StatusNotification;
                         
             NetworkAgent.Start();