Added processing of batch files
[pithos-ms-client] / trunk / Pithos.Core / Agents / PollAgent.cs
index b14068c..476f904 100644 (file)
@@ -45,6 +45,9 @@ using System.ComponentModel.Composition;
 using System.Diagnostics;\r
 using System.Diagnostics.Contracts;\r
 using System.IO;\r
+using System.Linq.Expressions;\r
+using System.Reflection;\r
+using System.Security.Cryptography;\r
 using System.Threading;\r
 using System.Threading.Tasks;\r
 using Castle.ActiveRecord;\r
@@ -58,6 +61,71 @@ namespace Pithos.Core.Agents
     using System.Collections.Generic;\r
     using System.Linq;\r
 \r
+    [DebuggerDisplay("{FilePath} C:{C} L:{L} S:{S}")]\r
+    public class StateTuple\r
+    {\r
+        public string FilePath { get; private set; }\r
+\r
+        public string MD5 { get; set; }\r
+\r
+        public string L\r
+        {\r
+            get { return FileState==null?null:FileState.Checksum; }\r
+        }\r
+\r
+        private string _c;\r
+        public string C\r
+        {\r
+            get { return _c; }\r
+            set {\r
+                _c = String.IsNullOrWhiteSpace(value) ? null : value;\r
+            }\r
+        }\r
+\r
+        public string S\r
+        {\r
+            get { return ObjectInfo == null ? null : ObjectInfo.X_Object_Hash; }\r
+        }\r
+\r
+        private FileSystemInfo _fileInfo;\r
+        private TreeHash _merkle;\r
+\r
+        public FileSystemInfo FileInfo\r
+        {\r
+            get { return _fileInfo; }\r
+            set\r
+            {\r
+                _fileInfo = value;\r
+                FilePath = value.FullName;\r
+            }\r
+        }\r
+\r
+        public FileState FileState { get; set; }\r
+        public ObjectInfo ObjectInfo{ get; set; }\r
+\r
+\r
+        public TreeHash Merkle\r
+        {\r
+            get {\r
+                return _merkle;\r
+            }\r
+            set {\r
+                _merkle = value;\r
+                C = _merkle.TopHash.ToHashString();\r
+            }\r
+        }\r
+\r
+        public StateTuple() { }\r
+\r
+        public StateTuple(FileSystemInfo info)\r
+        {\r
+            FileInfo = info;\r
+        }\r
+\r
+\r
+    }\r
+\r
+\r
     /// <summary>\r
     /// PollAgent periodically polls the server to detect object changes. The agent retrieves a listing of all\r
     /// objects and compares it with a previously cached version to detect differences. \r
@@ -67,7 +135,7 @@ namespace Pithos.Core.Agents
     [Export]\r
     public class PollAgent\r
     {\r
-        private static readonly ILog Log = LogManager.GetLogger("PollAgent");\r
+        private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);\r
 \r
         [System.ComponentModel.Composition.Import]\r
         public IStatusKeeper StatusKeeper { get; set; }\r
@@ -78,25 +146,72 @@ namespace Pithos.Core.Agents
         [System.ComponentModel.Composition.Import]\r
         public NetworkAgent NetworkAgent { get; set; }\r
 \r
+        [System.ComponentModel.Composition.Import]\r
+        public Selectives Selectives { get; set; }\r
+\r
         public IStatusNotification StatusNotification { get; set; }\r
 \r
+        private CancellationTokenSource _currentOperationCancellation = new CancellationTokenSource();\r
+\r
+        public void CancelCurrentOperation()\r
+        {\r
+            //What does it mean to cancel the current upload/download?\r
+            //Obviously, the current operation will be cancelled by throwing\r
+            //a cancellation exception.\r
+            //\r
+            //The default behavior is to retry any operations that throw.\r
+            //Obviously this is not what we want in this situation.\r
+            //The cancelled operation should NOT bea retried. \r
+            //\r
+            //This can be done by catching the cancellation exception\r
+            //and avoiding the retry.\r
+            //\r
+\r
+            //Have to reset the cancellation source - it is not possible to reset the source\r
+            //Have to prevent a case where an operation requests a token from the old source\r
+            var oldSource = Interlocked.Exchange(ref _currentOperationCancellation, new CancellationTokenSource());\r
+            oldSource.Cancel();\r
+\r
+        }\r
+\r
+        public bool Pause\r
+        {\r
+            get {\r
+                return _pause;\r
+            }\r
+            set {\r
+                _pause = value;                \r
+                if (!_pause)\r
+                    _unPauseEvent.Set();\r
+                else\r
+                {\r
+                    _unPauseEvent.Reset();\r
+                }\r
+            }\r
+        }\r
+\r
         private bool _firstPoll = true;\r
 \r
         //The Sync Event signals a manual synchronisation\r
         private readonly AsyncManualResetEvent _syncEvent = new AsyncManualResetEvent();\r
 \r
+        private readonly AsyncManualResetEvent _unPauseEvent = new AsyncManualResetEvent(true);\r
+\r
         private readonly ConcurrentDictionary<string, DateTime> _lastSeen = new ConcurrentDictionary<string, DateTime>();\r
-        private readonly ConcurrentDictionary<string, AccountInfo> _accounts = new ConcurrentDictionary<string,AccountInfo>();\r
+        private readonly ConcurrentDictionary<Uri, AccountInfo> _accounts = new ConcurrentDictionary<Uri,AccountInfo>();\r
 \r
 \r
         /// <summary>\r
         /// Start a manual synchronization\r
         /// </summary>\r
-        public void SynchNow()\r
+        public void SynchNow(IEnumerable<string> paths=null)\r
         {            \r
+            _batchQueue.Enqueue(paths);\r
             _syncEvent.Set();\r
         }\r
 \r
+        readonly ConcurrentQueue<IEnumerable<string>> _batchQueue=new ConcurrentQueue<IEnumerable<string>>();\r
+\r
         /// <summary>\r
         /// Remote files are polled periodically. Any changes are processed\r
         /// </summary>\r
@@ -104,10 +219,12 @@ namespace Pithos.Core.Agents
         /// <returns></returns>\r
         public async Task PollRemoteFiles(DateTime? since = null)\r
         {\r
+            if (Log.IsDebugEnabled)\r
+                Log.DebugFormat("Polling changes after [{0}]",since);\r
+\r
             Debug.Assert(Thread.CurrentThread.IsBackground, "Polling Ended up in the main thread!");\r
 \r
-            UpdateStatus(PithosStatus.Syncing);\r
-            StatusNotification.Notify(new PollNotification());\r
+            //GC.Collect();\r
 \r
             using (ThreadContext.Stacks["Retrieve Remote"].Push("All accounts"))\r
             {\r
@@ -115,18 +232,36 @@ namespace Pithos.Core.Agents
                 var nextSince = since;\r
                 try\r
                 {\r
-                    //Next time we will check for all changes since the current check minus 1 second\r
-                    //This is done to ensure there are no discrepancies due to clock differences\r
-                    var current = DateTime.Now.AddSeconds(-1);\r
+                    await _unPauseEvent.WaitAsync();\r
+                    UpdateStatus(PithosStatus.PollSyncing);\r
+\r
+                    var accountBatches=new Dictionary<Uri, IEnumerable<string>>();\r
+                    IEnumerable<string> batch = null;\r
+                    if (_batchQueue.TryDequeue(out batch) && batch != null)\r
+                        foreach (var account in _accounts.Values)\r
+                        {\r
+                            var accountBatch = batch.Where(path => path.IsAtOrBelow(account.AccountPath));\r
+                            accountBatches[account.AccountKey] = accountBatch;\r
+                        }\r
 \r
-                    var tasks = from accountInfo in _accounts.Values\r
-                                select ProcessAccountFiles(accountInfo, since);\r
 \r
-                    await TaskEx.WhenAll(tasks.ToList());\r
+                    IEnumerable<Task<DateTime?>> tasks = new List<Task<DateTime?>>();\r
+                    foreach(var accountInfo in _accounts.Values)\r
+                    {\r
+                        IEnumerable<string> accountBatch ;\r
+                        accountBatches.TryGetValue(accountInfo.AccountKey,out accountBatch);\r
+                        ProcessAccountFiles (accountInfo, accountBatch, since);\r
+                    }\r
+\r
+                    var nextTimes=await TaskEx.WhenAll(tasks.ToList());\r
 \r
                     _firstPoll = false;\r
                     //Reschedule the poll with the current timestamp as a "since" value\r
-                    nextSince = current;\r
+\r
+                    if (nextTimes.Length>0)\r
+                        nextSince = nextTimes.Min();\r
+                    if (Log.IsDebugEnabled)\r
+                        Log.DebugFormat("Next Poll at [{0}]",nextSince);\r
                 }\r
                 catch (Exception ex)\r
                 {\r
@@ -134,7 +269,7 @@ namespace Pithos.Core.Agents
                     //In case of failure retry with the same "since" value\r
                 }\r
 \r
-                UpdateStatus(PithosStatus.InSynch);\r
+                UpdateStatus(PithosStatus.PollComplete);\r
                 //The multiple try blocks are required because we can't have an await call\r
                 //inside a finally block\r
                 //TODO: Find a more elegant solution for reschedulling in the event of an exception\r
@@ -159,9 +294,13 @@ namespace Pithos.Core.Agents
         private async Task<DateTime?> WaitForScheduledOrManualPoll(DateTime? since)\r
         {\r
             var sync = _syncEvent.WaitAsync();\r
-            var wait = TaskEx.Delay(TimeSpan.FromSeconds(Settings.PollingInterval), NetworkAgent.CancellationToken);\r
+            var wait = TaskEx.Delay(TimeSpan.FromSeconds(Settings.PollingInterval));\r
+            \r
             var signaledTask = await TaskEx.WhenAny(sync, wait);\r
-\r
+            \r
+            //Pausing takes precedence over manual sync or awaiting\r
+            _unPauseEvent.Wait();\r
+            \r
             //Wait for network processing to finish before polling\r
             var pauseTask=NetworkAgent.ProceedEvent.WaitAsync();\r
             await TaskEx.WhenAll(signaledTask, pauseTask);\r
@@ -176,7 +315,7 @@ namespace Pithos.Core.Agents
             return since;\r
         }\r
 \r
-        public async Task ProcessAccountFiles(AccountInfo accountInfo, DateTime? since = null)\r
+        public async Task<DateTime?> ProcessAccountFiles(AccountInfo accountInfo, IEnumerable<string> accountBatch, DateTime? since = null)\r
         {\r
             if (accountInfo == null)\r
                 throw new ArgumentNullException("accountInfo");\r
@@ -185,8 +324,9 @@ namespace Pithos.Core.Agents
             Contract.EndContractBlock();\r
 \r
 \r
-            using (log4net.ThreadContext.Stacks["Retrieve Remote"].Push(accountInfo.UserName))\r
+            using (ThreadContext.Stacks["Retrieve Remote"].Push(accountInfo.UserName))\r
             {\r
+\r
                 await NetworkAgent.GetDeleteAwaiter();\r
 \r
                 Log.Info("Scheduled");\r
@@ -200,6 +340,11 @@ namespace Pithos.Core.Agents
 \r
                 CreateContainerFolders(accountInfo, containers);\r
 \r
+                //The nextSince time fallback time is the same as the current.\r
+                //If polling succeeds, the next Since time will be the smallest of the maximum modification times\r
+                //of the shared and account objects\r
+                var nextSince = since;\r
+\r
                 try\r
                 {\r
                     //Wait for any deletions to finish\r
@@ -218,17 +363,23 @@ namespace Pithos.Core.Agents
                     listObjects.Add(listShared);\r
                     var listTasks = await Task.Factory.WhenAll(listObjects.ToArray());\r
 \r
-                    using (log4net.ThreadContext.Stacks["SCHEDULE"].Push("Process Results"))\r
+                    using (ThreadContext.Stacks["SCHEDULE"].Push("Process Results"))\r
                     {\r
                         var dict = listTasks.ToDictionary(t => t.AsyncState);\r
 \r
                         //Get all non-trash objects. Remember, the container name is stored in AsyncState\r
-                        var remoteObjects = from objectList in listTasks\r
+                        var remoteObjects = (from objectList in listTasks\r
                                             where (string)objectList.AsyncState != "trash"\r
                                             from obj in objectList.Result\r
-                                            select obj;\r
+                                            orderby obj.Bytes ascending \r
+                                            select obj).ToList();\r
+                        \r
+                        //Get the latest remote object modification date, only if it is after\r
+                        //the original since date\r
+                        nextSince = GetLatestDateAfter(nextSince, remoteObjects);\r
 \r
                         var sharedObjects = dict["shared"].Result;\r
+                        nextSince = GetLatestDateBefore(nextSince, sharedObjects);\r
 \r
                         //DON'T process trashed files\r
                         //If some files are deleted and added again to a folder, they will be deleted\r
@@ -242,7 +393,7 @@ namespace Pithos.Core.Agents
                                         where\r
                                             !remoteObjects.Any(\r
                                                 info => info.Name == trash.Name && info.Hash == trash.Hash)\r
-                                        select trash;\r
+                                   8     select trash;\r
                         ProcessTrashedFiles(accountInfo, realTrash);\r
 */\r
 \r
@@ -253,48 +404,422 @@ namespace Pithos.Core.Agents
                                                                    StringComparison.InvariantCultureIgnoreCase)\r
                                             select info).ToList();\r
 \r
-                        var differencer = _differencer.PostSnapshot(accountInfo, cleanRemotes);\r
+                        if (_firstPoll)\r
+                            StatusKeeper.CleanupOrphanStates();\r
+                        StatusKeeper.CleanupStaleStates(accountInfo, cleanRemotes);\r
+                        \r
+                        //var differencer = _differencer.PostSnapshot(accountInfo, cleanRemotes);\r
+\r
+                        //var filterUris = Selectives.SelectiveUris[accountInfo.AccountKey];\r
+\r
 \r
-                        ProcessDeletedFiles(accountInfo, differencer.Deleted.FilterDirectlyBelow(SelectiveUris));\r
+                        //Get the local files here                        \r
+                        var agent = AgentLocator<FileAgent>.Get(accountInfo.AccountPath);\r
 \r
-                        // @@@ NEED To add previous state here as well, To compare with previous hash\r
+                        var files = LoadLocalFileTuples(accountInfo);\r
 \r
+                        var states = FileState.Queryable.ToList();                        \r
                         \r
 \r
-                        //Create a list of actions from the remote files\r
-                        var allActions = MovesToActions(accountInfo,differencer.Moved.FilterDirectlyBelow(SelectiveUris))\r
-                                        .Union(\r
-                                        ChangesToActions(accountInfo, differencer.Changed.FilterDirectlyBelow(SelectiveUris)))\r
-                                        .Union(\r
-                                        CreatesToActions(accountInfo, differencer.Created.FilterDirectlyBelow(SelectiveUris)));\r
+                        var infos = (from remote in cleanRemotes\r
+                                    let path = remote.RelativeUrlToFilePath(accountInfo.UserName)\r
+                                    let info=agent.GetFileSystemInfo(path)\r
+                                    select Tuple.Create(info.FullName,remote))\r
+                                    .ToList();\r
 \r
-                        //And remove those that are already being processed by the agent\r
-                        var distinctActions = allActions\r
-                            .Except(NetworkAgent.GetEnumerable(), new PithosMonitor.LocalFileComparer())\r
-                            .ToList();\r
+                        var token = _currentOperationCancellation.Token;\r
 \r
-                        //Queue all the actions\r
-                        foreach (var message in distinctActions)\r
+                        var tuples = MergeSources(infos, files, states).ToList();\r
+\r
+\r
+                        var stateTuples = accountBatch==null?tuples:tuples.Where(t => accountBatch.Contains(t.FilePath));\r
+                        foreach (var tuple in stateTuples)\r
                         {\r
-                            NetworkAgent.Post(message);\r
+                            await _unPauseEvent.WaitAsync();\r
+\r
+                            //Set the Merkle Hash\r
+                            SetMerkleHash(accountInfo, tuple);\r
+\r
+                            SyncSingleItem(accountInfo, tuple, agent, token);\r
+\r
                         }\r
 \r
+\r
+                        //On the first run\r
+/*\r
+                        if (_firstPoll)\r
+                        {\r
+                            MarkSuspectedDeletes(accountInfo, cleanRemotes);\r
+                        }\r
+*/\r
+\r
+\r
                         Log.Info("[LISTENER] End Processing");\r
                     }\r
                 }\r
                 catch (Exception ex)\r
                 {\r
                     Log.ErrorFormat("[FAIL] ListObjects for{0} in ProcessRemoteFiles with {1}", accountInfo.UserName, ex);\r
-                    return;\r
+                    return nextSince;\r
                 }\r
 \r
                 Log.Info("[LISTENER] Finished");\r
+                return nextSince;\r
+            }\r
+        }\r
+\r
+        private static void SetMerkleHash(AccountInfo accountInfo, StateTuple tuple)\r
+        {\r
+            //The Merkle hash for directories is that of an empty buffer\r
+            if (tuple.FileInfo is DirectoryInfo)\r
+                tuple.C = MERKLE_EMPTY;\r
+            else if (tuple.FileState != null && tuple.MD5 == tuple.FileState.ShortHash)\r
+            {\r
+                //If there is a state whose MD5 matches, load the merkle hash from the file state\r
+                //insteaf of calculating it\r
+                tuple.C = tuple.FileState.Checksum;                              \r
+            }\r
+            else\r
+            {\r
+                tuple.Merkle = Signature.CalculateTreeHash(tuple.FileInfo, accountInfo.BlockSize, accountInfo.BlockHash);\r
+                //tuple.C=tuple.Merkle.TopHash.ToHashString();                \r
+            }\r
+        }\r
+\r
+        private static List<Tuple<FileSystemInfo, string>> LoadLocalFileTuples(AccountInfo accountInfo)\r
+        {\r
+            using (ThreadContext.Stacks["Account Files Hashing"].Push(accountInfo.UserName))\r
+            {\r
+\r
+                var localInfos = AgentLocator<FileAgent>.Get(accountInfo.AccountPath).EnumerateFileSystemInfos();\r
+                //Use the queue to retry locked file hashing\r
+                var fileQueue = new Queue<FileSystemInfo>(localInfos);\r
+                var hasher = MD5.Create();\r
+\r
+                var results = new List<Tuple<FileSystemInfo, string>>();\r
+\r
+                while (fileQueue.Count > 0)\r
+                {\r
+                    var file = fileQueue.Dequeue();\r
+                    using (ThreadContext.Stacks["File"].Push(file.FullName))\r
+                    {\r
+                        /*\r
+                                                Signature.CalculateTreeHash(file, accountInfo.BlockSize,\r
+                                                                                                 accountInfo.BlockHash).\r
+                                                                         TopHash.ToHashString()\r
+                        */\r
+                        try\r
+                        {\r
+                            //Replace MD5 here, do the calc while syncing individual files\r
+                            string hash ;\r
+                            if (file is DirectoryInfo)\r
+                                hash = MERKLE_EMPTY;\r
+                            else\r
+                            {\r
+                                using (var stream = (file as FileInfo).OpenRead())\r
+                                {\r
+                                    hash = hasher.ComputeHash(stream).ToHashString();\r
+                                }\r
+                            }                            \r
+                            results.Add(Tuple.Create(file, hash));\r
+                        }\r
+                        catch (IOException exc)\r
+                        {\r
+                            Log.WarnFormat("[HASH] File in use, will retry [{0}]", exc);\r
+                            fileQueue.Enqueue(file);\r
+                        }\r
+                    }\r
+                }\r
+\r
+                return results;\r
+            }\r
+        }\r
+\r
+        private void SyncSingleItem(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, CancellationToken token)\r
+        {\r
+            Log.DebugFormat("Sync [{0}] C:[{1}] L:[{2}] S:[{3}]",tuple.FilePath,tuple.C,tuple.L,tuple.S);\r
+\r
+            var localFilePath = tuple.FilePath;\r
+            //Don't use the tuple info, it may have been deleted\r
+            var localInfo = FileInfoExtensions.FromPath(localFilePath);\r
+\r
+\r
+            // Local file unchanged? If both C and L are null, make sure it's because \r
+            //both the file is missing and the state checksum is not missing\r
+            if (tuple.C == tuple.L && (localInfo.Exists || tuple.FileState==null))\r
+            {\r
+                //No local changes\r
+                //Server unchanged?\r
+                if (tuple.S == tuple.L)\r
+                {\r
+                    // No server changes\r
+                    //Has the file been renamed on the server?\r
+                    MoveForServerMove(accountInfo, tuple);\r
+                }\r
+                else\r
+                {\r
+                    //Different from server\r
+                    if (Selectives.IsSelected(accountInfo, localFilePath))\r
+                    {\r
+                        //Does the server file exist?\r
+                        if (tuple.S == null)\r
+                        {\r
+                            //Server file doesn't exist\r
+                            //deleteObjectFromLocal()\r
+                            StatusKeeper.SetFileState(localFilePath, FileStatus.Deleted,\r
+                                                      FileOverlayStatus.Deleted, "");\r
+                            agent.Delete(localFilePath);\r
+                            //updateRecord(Remove C, L)\r
+                            StatusKeeper.ClearFileStatus(localFilePath);\r
+                        }\r
+                        else\r
+                        {\r
+                            //Server file exists\r
+                            //downloadServerObject() // Result: L = S\r
+                            //If the file has moved on the server, move it locally before downloading\r
+                            var targetPath=MoveForServerMove(accountInfo,tuple);\r
+\r
+                            StatusKeeper.SetFileState(targetPath, FileStatus.Modified,\r
+                                                      FileOverlayStatus.Modified, "");\r
+                            NetworkAgent.Downloader.DownloadCloudFile(accountInfo,\r
+                                                                            tuple.ObjectInfo,\r
+                                                                            targetPath,tuple.Merkle, token).Wait(token);\r
+                            //updateRecord( L = S )\r
+                            StatusKeeper.UpdateFileChecksum(targetPath, tuple.ObjectInfo.ETag,\r
+                                                            tuple.ObjectInfo.X_Object_Hash);\r
+\r
+                            StatusKeeper.StoreInfo(targetPath, tuple.ObjectInfo);\r
+\r
+/*\r
+                            StatusKeeper.SetFileState(targetPath, FileStatus.Unchanged,\r
+                                                      FileOverlayStatus.Normal, "");\r
+*/\r
+                        }\r
+                    }\r
+                }\r
+            }\r
+            else\r
+            {\r
+                //Local changes found\r
+\r
+                //Server unchanged?\r
+                if (tuple.S == tuple.L)\r
+                {\r
+                    //The FileAgent selective sync checks for new root folder files\r
+                    if (!agent.Ignore(localFilePath))\r
+                    {\r
+                        if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null)\r
+                        {\r
+                            //deleteObjectFromServer()\r
+                            DeleteCloudFile(accountInfo, tuple);\r
+                            //updateRecord( Remove L, S)                  \r
+                        }\r
+                        else\r
+                        {\r
+                            //uploadLocalObject() // Result: S = C, L = S                        \r
+                            var isUnselected = agent.IsUnselectedRootFolder(tuple.FilePath);\r
+\r
+                            //Debug.Assert(tuple.FileState !=null);\r
+                            var action = new CloudUploadAction(accountInfo, localInfo, tuple.FileState,\r
+                                                               accountInfo.BlockSize, accountInfo.BlockHash,\r
+                                                               "Poll", isUnselected);\r
+                            NetworkAgent.Uploader.UploadCloudFile(action, tuple.Merkle,token).Wait(token);\r
+\r
+                            //updateRecord( S = C )\r
+                            //State updated by the uploader\r
+                            \r
+                            if (isUnselected)\r
+                            {\r
+                                ProcessChildren(accountInfo, tuple, agent, token);\r
+                            }\r
+                        }\r
+                    }\r
+                }\r
+                else\r
+                {\r
+                    if (Selectives.IsSelected(accountInfo, localFilePath))\r
+                    {\r
+                        if (tuple.C == tuple.S)\r
+                        {\r
+                            // (Identical Changes) Result: L = S\r
+                            //doNothing()\r
+                            //Detect server moves\r
+                            var targetPath=MoveForServerMove(accountInfo, tuple);\r
+                            StatusKeeper.StoreInfo(targetPath,tuple.ObjectInfo);\r
+                        }\r
+                        else\r
+                        {\r
+                            if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null )\r
+                            {\r
+                                //deleteObjectFromServer()\r
+                                DeleteCloudFile(accountInfo, tuple);\r
+                                //updateRecord(Remove L, S)                  \r
+                            }\r
+                                //If both the local and server files are missing, the state is stale\r
+                            else if (!localInfo.Exists && ( tuple.S==null || tuple.ObjectInfo==null))\r
+                            {\r
+                                StatusKeeper.ClearFileStatus(localInfo.FullName);\r
+                            }\r
+                            else\r
+                            {\r
+                                ReportConflictForMismatch(localFilePath);\r
+                                //identifyAsConflict() // Manual action required\r
+                            }\r
+                        }\r
+                    }\r
+                }\r
+            }\r
+        }\r
 \r
+        private string MoveForServerMove(AccountInfo accountInfo, StateTuple tuple)\r
+        {\r
+            if (tuple.ObjectInfo == null)\r
+                return null;\r
+            var relativePath = tuple.ObjectInfo.RelativeUrlToFilePath(accountInfo.UserName);\r
+            var serverPath = Path.Combine(accountInfo.AccountPath, relativePath);\r
+            \r
+            //Compare Case Insensitive\r
+            if (String.Equals(tuple.FilePath ,serverPath,StringComparison.InvariantCultureIgnoreCase)) return serverPath;\r
+\r
+            if (tuple.FileInfo.Exists)\r
+            {                    \r
+                var fi = tuple.FileInfo as FileInfo;\r
+                if (fi != null)\r
+                    fi.MoveTo(serverPath);\r
+                var di = tuple.FileInfo as DirectoryInfo;\r
+                if (di != null)\r
+                    di.MoveTo(serverPath);\r
+                StatusKeeper.StoreInfo(serverPath, tuple.ObjectInfo);\r
+            }\r
+            else\r
+            {\r
+                Debug.Assert(false, "File does not exist");\r
             }\r
+            return serverPath;\r
+        }\r
+\r
+        private void DeleteCloudFile(AccountInfo accountInfo, StateTuple tuple)\r
+        {\r
+            StatusKeeper.SetFileState(tuple.FilePath, FileStatus.Deleted,\r
+                                      FileOverlayStatus.Deleted, "");\r
+            NetworkAgent.DeleteAgent.DeleteCloudFile(accountInfo, tuple.ObjectInfo);\r
+            StatusKeeper.ClearFileStatus(tuple.FilePath);\r
+        }\r
+\r
+        private void ProcessChildren(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, CancellationToken token)\r
+        {\r
+\r
+            var dirInfo = tuple.FileInfo as DirectoryInfo;\r
+            var folderTuples = from folder in dirInfo.EnumerateDirectories("*", SearchOption.AllDirectories)\r
+                               select new StateTuple(folder);\r
+            var fileTuples = from file in dirInfo.EnumerateFiles("*", SearchOption.AllDirectories)\r
+                             select new StateTuple(file);\r
+            \r
+            //Process folders first, to ensure folders appear on the sever as soon as possible\r
+            folderTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, token));\r
+            \r
+            fileTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, token));\r
+        }\r
+\r
+        private static IEnumerable<StateTuple> MergeSources(\r
+            IEnumerable<Tuple<string, ObjectInfo>> infos, \r
+            IEnumerable<Tuple<FileSystemInfo, string>> files, \r
+            IEnumerable<FileState> states)\r
+        {\r
+            var tuplesByPath = new Dictionary<string, StateTuple>();\r
+            foreach (var file in files)\r
+            {\r
+                var fsInfo = file.Item1;\r
+                var fileHash = fsInfo is DirectoryInfo? MERKLE_EMPTY:file.Item2;\r
+\r
+                tuplesByPath[fsInfo.FullName] = new StateTuple {FileInfo = fsInfo, MD5 = fileHash};\r
+            }\r
+            foreach (var state in states)\r
+            {\r
+                StateTuple hashTuple;\r
+                if (tuplesByPath.TryGetValue(state.FilePath, out hashTuple))\r
+                {\r
+                    hashTuple.FileState = state;\r
+                }\r
+                else\r
+                {\r
+                    var fsInfo = FileInfoExtensions.FromPath(state.FilePath);\r
+                    tuplesByPath[state.FilePath] = new StateTuple {FileInfo = fsInfo, FileState = state};\r
+                }\r
+            }\r
+\r
+            var tuplesByID = tuplesByPath.Values\r
+                .Where(tuple => tuple.FileState != null && tuple.FileState.ObjectID!=null)\r
+                .ToDictionary(tuple=>tuple.FileState.ObjectID,tuple=>tuple);//new Dictionary<Guid, StateTuple>();\r
+\r
+            foreach (var info in infos)\r
+            {\r
+                StateTuple hashTuple;\r
+                var filePath = info.Item1;\r
+                var objectInfo = info.Item2;\r
+                var objectID = objectInfo.UUID;\r
+\r
+                if (tuplesByID.TryGetValue(objectID, out hashTuple))\r
+                {\r
+                    hashTuple.ObjectInfo = objectInfo;                    \r
+                }\r
+                else if (tuplesByPath.TryGetValue(filePath, out hashTuple))\r
+                {\r
+                    hashTuple.ObjectInfo = objectInfo;\r
+                }\r
+                else\r
+                {\r
+                    var fsInfo = FileInfoExtensions.FromPath(filePath);\r
+                    var tuple = new StateTuple {FileInfo = fsInfo, ObjectInfo = objectInfo};\r
+                    tuplesByPath[filePath] = tuple;\r
+                    tuplesByID[objectInfo.UUID] = tuple;\r
+                }\r
+            }\r
+            return tuplesByPath.Values;\r
+        }\r
+\r
+        /// <summary>\r
+        /// Returns the latest LastModified date from the list of objects, but only if it is before\r
+        /// than the threshold value\r
+        /// </summary>\r
+        /// <param name="threshold"></param>\r
+        /// <param name="cloudObjects"></param>\r
+        /// <returns></returns>\r
+        private static DateTime? GetLatestDateBefore(DateTime? threshold, IList<ObjectInfo> cloudObjects)\r
+        {\r
+            DateTime? maxDate = null;\r
+            if (cloudObjects!=null &&  cloudObjects.Count > 0)\r
+                maxDate = cloudObjects.Max(obj => obj.Last_Modified);\r
+            if (maxDate == null || maxDate == DateTime.MinValue)\r
+                return threshold;\r
+            if (threshold == null || threshold == DateTime.MinValue || threshold > maxDate)\r
+                return maxDate;\r
+            return threshold;\r
+        }\r
+\r
+        /// <summary>\r
+        /// Returns the latest LastModified date from the list of objects, but only if it is after\r
+        /// the threshold value\r
+        /// </summary>\r
+        /// <param name="threshold"></param>\r
+        /// <param name="cloudObjects"></param>\r
+        /// <returns></returns>\r
+        private static DateTime? GetLatestDateAfter(DateTime? threshold, IList<ObjectInfo> cloudObjects)\r
+        {\r
+            DateTime? maxDate = null;\r
+            if (cloudObjects!=null &&  cloudObjects.Count > 0)\r
+                maxDate = cloudObjects.Max(obj => obj.Last_Modified);\r
+            if (maxDate == null || maxDate == DateTime.MinValue)\r
+                return threshold;\r
+            if (threshold == null || threshold == DateTime.MinValue || threshold < maxDate)\r
+                return maxDate;\r
+            return threshold;\r
         }\r
 \r
         readonly AccountsDifferencer _differencer = new AccountsDifferencer();\r
-        private List<Uri> _selectiveUris=new List<Uri>();\r
+        private Dictionary<Uri, List<Uri>> _selectiveUris = new Dictionary<Uri, List<Uri>>();\r
+        private bool _pause;\r
+        private static string MERKLE_EMPTY = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";\r
 \r
         /// <summary>\r
         /// Deletes local files that are not found in the list of cloud files\r
@@ -311,67 +836,98 @@ namespace Pithos.Core.Agents
                 throw new ArgumentNullException("cloudFiles");\r
             Contract.EndContractBlock();\r
 \r
-            //On the first run\r
-            if (_firstPoll)\r
+            var deletedFiles = new List<FileSystemInfo>();\r
+            foreach (var objectInfo in cloudFiles)\r
             {\r
-                //Only consider files that are not being modified, ie they are in the Unchanged state            \r
-                var deleteCandidates = FileState.Queryable.Where(state =>\r
-                    state.FilePath.StartsWith(accountInfo.AccountPath)\r
-                    && state.FileStatus == FileStatus.Unchanged).ToList();\r
-\r
+                if (Log.IsDebugEnabled)\r
+                    Log.DebugFormat("Handle deleted [{0}]", objectInfo.Uri);\r
+                var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);\r
+                var item = FileAgent.GetFileAgent(accountInfo).GetFileSystemInfo(relativePath);\r
+                if (Log.IsDebugEnabled)\r
+                    Log.DebugFormat("Will delete [{0}] for [{1}]", item.FullName, objectInfo.Uri);\r
+                if (item.Exists)\r
+                {\r
+                    if ((item.Attributes & FileAttributes.ReadOnly) == FileAttributes.ReadOnly)\r
+                    {\r
+                        item.Attributes = item.Attributes & ~FileAttributes.ReadOnly;\r
 \r
-                //TODO: filesToDelete must take into account the Others container            \r
-                var filesToDelete = (from deleteCandidate in deleteCandidates\r
-                                     let localFile = FileInfoExtensions.FromPath(deleteCandidate.FilePath)\r
-                                     let relativeFilePath = localFile.AsRelativeTo(accountInfo.AccountPath)\r
-                                     where\r
-                                         !cloudFiles.Any(r => r.RelativeUrlToFilePath(accountInfo.UserName) == relativeFilePath)\r
-                                     select localFile).ToList();\r
+                    }\r
 \r
 \r
+                    Log.DebugFormat("Deleting {0}", item.FullName);\r
 \r
-                //Set the status of missing files to Conflict\r
-                foreach (var item in filesToDelete)\r
-                {\r
-                    //Try to acquire a gate on the file, to take into account files that have been dequeued\r
-                    //and are being processed\r
-                    using (var gate = NetworkGate.Acquire(item.FullName, NetworkOperation.Deleting))\r
-                    {\r
-                        if (gate.Failed)\r
-                            continue;\r
-                        StatusKeeper.SetFileState(item.FullName, FileStatus.Conflict, FileOverlayStatus.Deleted);\r
-                    }\r
+                    var directory = item as DirectoryInfo;\r
+                    if (directory != null)\r
+                        directory.Delete(true);\r
+                    else\r
+                        item.Delete();\r
+                    Log.DebugFormat("Deleted [{0}] for [{1}]", item.FullName, objectInfo.Uri);\r
+                    DateTime lastDate;\r
+                    _lastSeen.TryRemove(item.FullName, out lastDate);\r
+                    deletedFiles.Add(item);\r
                 }\r
-                UpdateStatus(PithosStatus.HasConflicts);\r
-                StatusNotification.NotifyConflicts(filesToDelete, String.Format("{0} local files are missing from Pithos, possibly because they were deleted", filesToDelete.Count));\r
-                StatusNotification.NotifyForFiles(filesToDelete, String.Format("{0} files were deleted", filesToDelete.Count), TraceLevel.Info);\r
+                StatusKeeper.SetFileState(item.FullName, FileStatus.Deleted, FileOverlayStatus.Deleted, "File Deleted");\r
             }\r
-            else\r
+            Log.InfoFormat("[{0}] files were deleted", deletedFiles.Count);\r
+            StatusNotification.NotifyForFiles(deletedFiles, String.Format("{0} files were deleted", deletedFiles.Count),\r
+                                              TraceLevel.Info);\r
+\r
+        }\r
+\r
+        private void MarkSuspectedDeletes(AccountInfo accountInfo, IEnumerable<ObjectInfo> cloudFiles)\r
+        {\r
+//Only consider files that are not being modified, ie they are in the Unchanged state            \r
+            var deleteCandidates = FileState.Queryable.Where(state =>\r
+                                                             state.FilePath.StartsWith(accountInfo.AccountPath)\r
+                                                             && state.FileStatus == FileStatus.Unchanged).ToList();\r
+\r
+\r
+            //TODO: filesToDelete must take into account the Others container            \r
+            var filesToDelete = (from deleteCandidate in deleteCandidates\r
+                                 let localFile = FileInfoExtensions.FromPath(deleteCandidate.FilePath)\r
+                                 let relativeFilePath = localFile.AsRelativeTo(accountInfo.AccountPath)\r
+                                 where\r
+                                     !cloudFiles.Any(r => r.RelativeUrlToFilePath(accountInfo.UserName) == relativeFilePath)\r
+                                 select localFile).ToList();\r
+\r
+\r
+            //Set the status of missing files to Conflict\r
+            foreach (var item in filesToDelete)\r
             {\r
-                var deletedFiles = new List<FileSystemInfo>();\r
-                foreach (var objectInfo in cloudFiles)\r
+                //Try to acquire a gate on the file, to take into account files that have been dequeued\r
+                //and are being processed\r
+                using (var gate = NetworkGate.Acquire(item.FullName, NetworkOperation.Deleting))\r
                 {\r
-                    var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);\r
-                    var item = FileAgent.GetFileAgent(accountInfo).GetFileSystemInfo(relativePath);\r
-                    if (item.Exists)\r
-                    {\r
-                        if ((item.Attributes & FileAttributes.ReadOnly) == FileAttributes.ReadOnly)\r
-                        {\r
-                            item.Attributes = item.Attributes & ~FileAttributes.ReadOnly;\r
-\r
-                        }\r
-                        item.Delete();\r
-                        DateTime lastDate;\r
-                        _lastSeen.TryRemove(item.FullName, out lastDate);\r
-                        deletedFiles.Add(item);\r
-                    }\r
-                    StatusKeeper.SetFileState(item.FullName, FileStatus.Deleted, FileOverlayStatus.Deleted);\r
+                    if (gate.Failed)\r
+                        continue;\r
+                    StatusKeeper.SetFileState(item.FullName, FileStatus.Conflict, FileOverlayStatus.Deleted,\r
+                                              "Local file missing from server");\r
                 }\r
-                StatusNotification.NotifyForFiles(deletedFiles, String.Format("{0} files were deleted", deletedFiles.Count), TraceLevel.Info);\r
             }\r
+            UpdateStatus(PithosStatus.HasConflicts);\r
+            StatusNotification.NotifyConflicts(filesToDelete,\r
+                                               String.Format(\r
+                                                   "{0} local files are missing from Pithos, possibly because they were deleted",\r
+                                                   filesToDelete.Count));\r
+            StatusNotification.NotifyForFiles(filesToDelete, String.Format("{0} files were deleted", filesToDelete.Count),\r
+                                              TraceLevel.Info);\r
+        }\r
 \r
+        private void ReportConflictForMismatch(string localFilePath)\r
+        {\r
+            if (String.IsNullOrWhiteSpace(localFilePath))\r
+                throw new ArgumentNullException("localFilePath");\r
+            Contract.EndContractBlock();\r
+\r
+            StatusKeeper.SetFileState(localFilePath, FileStatus.Conflict, FileOverlayStatus.Conflict, "File changed at the server");\r
+            UpdateStatus(PithosStatus.HasConflicts);\r
+            var message = String.Format("Conflict detected for file {0}", localFilePath);\r
+            Log.Warn(message);\r
+            StatusNotification.NotifyChange(message, TraceLevel.Warning);\r
         }\r
 \r
+\r
+\r
         /// <summary>\r
         /// Creates a Sync action for each changed server file\r
         /// </summary>\r
@@ -395,7 +951,7 @@ namespace Pithos.Core.Agents
                 {\r
                     var localFile = fileAgent.GetFileSystemInfo(relativePath);\r
                     //We don't need to sync directories\r
-                    if (objectInfo.Content_Type == @"application/directory" && localFile is DirectoryInfo)\r
+                    if (objectInfo.IsDirectory && localFile is DirectoryInfo)\r
                         continue;\r
                     using (new SessionScope(FlushAction.Never))\r
                     {\r
@@ -405,13 +961,13 @@ namespace Pithos.Core.Agents
 \r
                         yield return new CloudAction(accountInfo, CloudActionType.MustSynch,\r
                                                      localFile, objectInfo, state, accountInfo.BlockSize,\r
-                                                     accountInfo.BlockHash);\r
+                                                     accountInfo.BlockHash,"Poll Changes");\r
                     }\r
                 }\r
                 else\r
                 {\r
                     //Remote files should be downloaded\r
-                    yield return new CloudDownloadAction(accountInfo, objectInfo);\r
+                    yield return new CloudDownloadAction(accountInfo, objectInfo,"Poll Changes");\r
                 }\r
             }\r
         }\r
@@ -446,17 +1002,17 @@ namespace Pithos.Core.Agents
                         //For each moved object we need to move both the local file and update                                                \r
                         yield return new CloudAction(accountInfo, CloudActionType.RenameLocal,\r
                                                      previousFile, objectInfo, state, accountInfo.BlockSize,\r
-                                                     accountInfo.BlockHash);\r
+                                                     accountInfo.BlockHash,"Poll Moves");\r
                         //For modified files, we need to download the changes as well\r
-                        if (objectInfo.Hash!=objectInfo.PreviousHash)\r
-                            yield return new CloudDownloadAction(accountInfo,objectInfo);\r
+                        if (objectInfo.X_Object_Hash != objectInfo.PreviousHash)\r
+                            yield return new CloudDownloadAction(accountInfo,objectInfo, "Poll Moves");\r
                     }\r
                 }\r
                 //If the previous file does not exist, we need to download it in the new location\r
                 else\r
                 {\r
                     //Remote files should be downloaded\r
-                    yield return new CloudDownloadAction(accountInfo, objectInfo);\r
+                    yield return new CloudDownloadAction(accountInfo, objectInfo, "Poll Moves");\r
                 }\r
             }\r
         }\r
@@ -479,19 +1035,26 @@ namespace Pithos.Core.Agents
             //over the remote files\r
             foreach (var objectInfo in creates)\r
             {\r
+                if (Log.IsDebugEnabled)\r
+                    Log.DebugFormat("[NEW INFO] {0}",objectInfo.Uri);\r
+\r
                 var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);\r
-                //If the object already exists, we probably have a conflict\r
+\r
+                //If the object already exists, we should check before uploading or downloading\r
                 if (fileAgent.Exists(relativePath))\r
                 {\r
-                    //If a directory object already exists, we don't need to perform any other action                    \r
-                    var localFile = fileAgent.GetFileSystemInfo(relativePath);\r
-                    StatusKeeper.SetFileState(localFile.FullName, FileStatus.Conflict, FileOverlayStatus.Conflict);\r
+                    var localFile= fileAgent.GetFileSystemInfo(relativePath);\r
+                    var state = StatusKeeper.GetStateByFilePath(localFile.WithProperCapitalization().FullName);\r
+                    yield return new CloudAction(accountInfo, CloudActionType.MustSynch,\r
+                                                     localFile, objectInfo, state, accountInfo.BlockSize,\r
+                                                     accountInfo.BlockHash,"Poll Creates");                    \r
                 }\r
                 else\r
                 {\r
                     //Remote files should be downloaded\r
-                    yield return new CloudDownloadAction(accountInfo, objectInfo);\r
+                    yield return new CloudDownloadAction(accountInfo, objectInfo,"Poll Creates");\r
                 }\r
+\r
             }\r
         }\r
 \r
@@ -503,8 +1066,8 @@ namespace Pithos.Core.Agents
         {\r
             try\r
             {\r
-                StatusKeeper.SetPithosStatus(status);\r
-                StatusNotification.Notify(new Notification());\r
+                StatusNotification.SetPithosStatus(status);\r
+                //StatusNotification.Notify(new Notification());\r
             }\r
             catch (Exception exc)\r
             {\r
@@ -526,27 +1089,87 @@ namespace Pithos.Core.Agents
             }\r
         }\r
 \r
-        public void SetSyncUris(Uri[] uris)\r
-        {            \r
-            SelectiveUris=uris.ToList();\r
+        public void AddAccount(AccountInfo accountInfo)\r
+        {\r
+            //Avoid adding a duplicate accountInfo\r
+            _accounts.TryAdd(accountInfo.AccountKey, accountInfo);\r
         }\r
 \r
-        protected List<Uri> SelectiveUris\r
+        public void RemoveAccount(AccountInfo accountInfo)\r
         {\r
-            get { return _selectiveUris;}\r
-            set { _selectiveUris = value; }\r
+            AccountInfo account;\r
+            _accounts.TryRemove(accountInfo.AccountKey, out account);\r
+\r
+            SnapshotDifferencer differencer;\r
+            _differencer.Differencers.TryRemove(accountInfo.AccountKey, out differencer);\r
         }\r
 \r
-        public void AddAccount(AccountInfo accountInfo)\r
+        public void SetSelectivePaths(AccountInfo accountInfo,Uri[] added, Uri[] removed)\r
         {\r
-            //Avoid adding a duplicate accountInfo\r
-            _accounts.TryAdd(accountInfo.UserName, accountInfo);\r
+            AbortRemovedPaths(accountInfo,removed);\r
+            //DownloadNewPaths(accountInfo,added);\r
         }\r
 \r
-        public void RemoveAccount(AccountInfo accountInfo)\r
+/*\r
+        private void DownloadNewPaths(AccountInfo accountInfo, Uri[] added)\r
         {\r
-            AccountInfo account;\r
-            _accounts.TryRemove(accountInfo.UserName,out account);\r
+            var client = new CloudFilesClient(accountInfo);\r
+            foreach (var folderUri in added)\r
+            {\r
+                try\r
+                {\r
+\r
+                    string account;\r
+                    string container;\r
+                    var segmentsCount = folderUri.Segments.Length;\r
+                    //Is this an account URL?\r
+                    if (segmentsCount < 3)\r
+                        continue;\r
+                    //Is this a container or  folder URL?\r
+                    if (segmentsCount == 3)\r
+                    {\r
+                        account = folderUri.Segments[1].TrimEnd('/');\r
+                        container = folderUri.Segments[2].TrimEnd('/');\r
+                    }\r
+                    else\r
+                    {\r
+                        account = folderUri.Segments[2].TrimEnd('/');\r
+                        container = folderUri.Segments[3].TrimEnd('/');\r
+                    }\r
+                    IList<ObjectInfo> items;\r
+                    if (segmentsCount > 3)\r
+                    {\r
+                        //List folder\r
+                        var folder = String.Join("", folderUri.Segments.Splice(4));\r
+                        items = client.ListObjects(account, container, folder);\r
+                    }\r
+                    else\r
+                    {\r
+                        //List container\r
+                        items = client.ListObjects(account, container);\r
+                    }\r
+                    var actions = CreatesToActions(accountInfo, items);\r
+                    foreach (var action in actions)\r
+                    {\r
+                        NetworkAgent.Post(action);\r
+                    }\r
+                }\r
+                catch (Exception exc)\r
+                {\r
+                    Log.WarnFormat("Listing of new selective path [{0}] failed with \r\n{1}", folderUri, exc);\r
+                }\r
+            }\r
+\r
+            //Need to get a listing of each of the URLs, then post them to the NetworkAgent\r
+            //CreatesToActions(accountInfo,)\r
+\r
+/*            NetworkAgent.Post();#1#\r
+        }\r
+*/\r
+\r
+        private void AbortRemovedPaths(AccountInfo accountInfo, Uri[] removed)\r
+        {\r
+            /*this.NetworkAgent.*/\r
         }\r
     }\r
 }\r