Change to Polling agent
authorpkanavos <pkanavos@gmail.com>
Thu, 31 May 2012 19:43:29 +0000 (22:43 +0300)
committerpkanavos <pkanavos@gmail.com>
Thu, 31 May 2012 19:43:29 +0000 (22:43 +0300)
20 files changed:
trunk/Pithos.Client.WPF/Preferences/LoginView.xaml.cs
trunk/Pithos.Client.WPF/Preferences/PreferencesViewModel.cs
trunk/Pithos.Client.WPF/Shell/ShellViewModel.cs
trunk/Pithos.Core/Agents/BlockExtensions.cs
trunk/Pithos.Core/Agents/CollectionExtensions.cs
trunk/Pithos.Core/Agents/DeleteAgent.cs
trunk/Pithos.Core/Agents/FileAgent.cs
trunk/Pithos.Core/Agents/NetworkAgent.cs
trunk/Pithos.Core/Agents/PollAgent.cs
trunk/Pithos.Core/Agents/SelectiveUris.cs
trunk/Pithos.Core/Agents/StatusAgent.cs
trunk/Pithos.Core/Agents/Uploader.cs
trunk/Pithos.Core/EnumerableExtensions.cs
trunk/Pithos.Core/FileState.cs
trunk/Pithos.Core/PithosMonitor.cs
trunk/Pithos.Interfaces/FileInfoExtensions.cs
trunk/Pithos.Network/BlockHashAlgorithms.cs
trunk/Pithos.Network/CloudFilesClient.cs
trunk/Pithos.Network/RestClient.cs
trunk/Pithos.Network/Signature.cs

index c0aa6a9..0df688d 100644 (file)
@@ -65,6 +65,7 @@ namespace Pithos.Client.WPF.Preferences
             _logoutUri = logoutUri;
 
             _loggingOut = true;
+            
             LoginBrowser.Navigate(logoutUri ?? loginUri);
         }
 
index a1000e8..08e4611 100644 (file)
@@ -620,7 +620,8 @@ namespace Pithos.Client.WPF.Preferences
             {\r
                 _currentAccount = value;\r
 \r
-                _currentAccount.PropertyChanged += (o, e) => NotifyOfPropertyChange(() => CanSelectiveSyncFolders);\r
+                if (_currentAccount!=null)\r
+                    _currentAccount.PropertyChanged += (o, e) => NotifyOfPropertyChange(() => CanSelectiveSyncFolders);\r
 \r
                 NotifyOfPropertyChange(() => CurrentAccount);\r
                 NotifyOfPropertyChange(() => CanRemoveAccount);\r
index c5bf87f..7700a81 100644 (file)
@@ -1024,7 +1024,8 @@ namespace Pithos.Client.WPF {
                        var account = Accounts.FirstOrDefault(acc => acc.AccountKey == message.Account.AccountKey);
                        if (account != null)
                        {
-                           this._pollAgent.SetSelectivePaths(account, message.Added, message.Removed);
+                           _pollAgent.SetSelectivePaths(account, message.Added, message.Removed);
+                    _pollAgent.SynchNow();
                        }
                    });
 
index 95b76d9..773754a 100644 (file)
@@ -83,7 +83,7 @@ namespace Pithos.Core.Agents
             if (String.IsNullOrWhiteSpace(algorithm))
                 throw new ArgumentNullException("algorithm");
             Contract.EndContractBlock();
-
+           info.Refresh();
            //The hash for directories is an empty string
            if (info is DirectoryInfo)
                 return String.Empty;
@@ -108,7 +108,7 @@ namespace Pithos.Core.Agents
            if(hasher== null)
                throw new ArgumentNullException("hasher");
            Contract.EndContractBlock();
-
+           info.Refresh();
            if (!info.Exists)
                return String.Empty;
 
index 7282511..bbc24b6 100644 (file)
@@ -56,6 +56,10 @@ namespace Pithos.Core.Agents
     /// </summary>\r
     public static class CollectionExtensions\r
     {\r
+\r
+        \r
+\r
+\r
         public static IEnumerable<T> Replace<T>(this IEnumerable<T> list,Func<T,bool> match, Func<T,IEnumerable<T>> generate)\r
         {\r
             foreach (var item in list)\r
index f7ce256..76e688b 100644 (file)
@@ -201,7 +201,7 @@ namespace Pithos.Core.Agents
             _deleteAgent.Post(action);
         }
 
-        private void DeleteCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile)
+        public void DeleteCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile)
         {
             if (accountInfo == null)
                 throw new ArgumentNullException("accountInfo");
index df3f327..1468761 100644 (file)
@@ -57,9 +57,12 @@ namespace Pithos.Core.Agents
     {
         private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
 
-        Agent<WorkflowState> _agent;
+        /*
+                Agent<WorkflowState> _agent;
+        */
         private FileSystemWatcher _watcher;
         private FileSystemWatcherAdapter _adapter;
+        private FileEventIdleBatch _eventIdleBatch;
 
         //[Import]
         public IStatusKeeper StatusKeeper { get; set; }
@@ -68,19 +71,24 @@ namespace Pithos.Core.Agents
         //[Import]
         public IPithosWorkflow Workflow { get; set; }
         //[Import]
-        public WorkflowAgent WorkflowAgent { get; set; }
+        //public WorkflowAgent WorkflowAgent { get; set; }
 
         private AccountInfo AccountInfo { get; set; }
 
         internal string RootPath { get;  set; }
         
-        private FileEventIdleBatch _eventIdleBatch;
-
         public TimeSpan IdleTimeout { get; set; }
 
+        public PollAgent PollAgent { get; set; }
 
         private void ProcessBatchedEvents(Dictionary<string, FileSystemEventArgs[]> fileEvents)
         {
+            PollAgent.SynchNow();
+        }
+
+/*
+        private void ProcessBatchedEvents(Dictionary<string, FileSystemEventArgs[]> fileEvents)
+        {
             StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing,String.Format("Uploading {0} files",fileEvents.Count));
             //Start with events that do not originate in one of the ignored folders
             var initialEvents = from evt in fileEvents
@@ -156,6 +164,7 @@ namespace Pithos.Core.Agents
             }
             StatusNotification.SetPithosStatus(PithosStatus.LocalComplete);
         }
+*/
 
         public void Start(AccountInfo accountInfo,string rootPath)
         {
@@ -167,14 +176,14 @@ namespace Pithos.Core.Agents
                 throw new ArgumentException("rootPath must be an absolute path","rootPath");
             if (IdleTimeout == null)
                 throw new InvalidOperationException("IdleTimeout must have a valid value");
-            Contract.EndContractBlock();
+                Contract.EndContractBlock();
 
             AccountInfo = accountInfo;
             RootPath = rootPath;
-
+            
             _eventIdleBatch = new FileEventIdleBatch((int)IdleTimeout.TotalMilliseconds, ProcessBatchedEvents);
 
-            _watcher = new FileSystemWatcher(rootPath) {IncludeSubdirectories = true,InternalBufferSize=8*4096};
+            _watcher = new FileSystemWatcher(rootPath) { IncludeSubdirectories = true, InternalBufferSize = 8 * 4096 };
             _adapter = new FileSystemWatcherAdapter(_watcher);
 
             _adapter.Changed += OnFileEvent;
@@ -184,6 +193,9 @@ namespace Pithos.Core.Agents
             _adapter.Moved += OnMoveEvent;
             _watcher.EnableRaisingEvents = true;
 
+/*            
+
+
 
             _agent = Agent<WorkflowState>.Start(inbox =>
             {
@@ -196,9 +208,10 @@ namespace Pithos.Core.Agents
                         Log.ErrorFormat("[ERROR] File Event Processing:\r{0}", ex));
                 };
                 loop();
-            });
+            });*/
         }
 
+/*
         private Task<object> Process(WorkflowState state)
         {
             if (state==null)
@@ -256,6 +269,7 @@ namespace Pithos.Core.Agents
                     _watcher.EnableRaisingEvents = !value;                
             }
         }
+*/
 
         public string CachePath { get; set; }
 
@@ -269,6 +283,7 @@ namespace Pithos.Core.Agents
         public Selectives Selectives { get; set; }
 
 
+/*
         public void Post(WorkflowState workflowState)
         {
             if (workflowState == null)
@@ -290,12 +305,14 @@ namespace Pithos.Core.Agents
                 _agent.Stop();
         }
 
+*/
         // Enumerate all files in the Pithos directory except those in the Fragment folder
         // and files with a .ignore extension
         public IEnumerable<string> EnumerateFiles(string searchPattern="*")
         {
             var monitoredFiles = from filePath in Directory.EnumerateFileSystemEntries(RootPath, searchPattern, SearchOption.AllDirectories)
                                  where !Ignore(filePath)
+                                 orderby filePath ascending 
                                  select filePath;
             return monitoredFiles;
         }
@@ -305,15 +322,33 @@ namespace Pithos.Core.Agents
             var rootDir = new DirectoryInfo(RootPath);
             var monitoredFiles = from file in rootDir.EnumerateFiles(searchPattern, SearchOption.AllDirectories)
                                  where !Ignore(file.FullName)
+                                 orderby file.FullName ascending 
                                  select file;
             return monitoredFiles;
         }                
 
+        public IEnumerable<FileSystemInfo> EnumerateFileSystemInfos(string searchPattern="*")
+        {
+            var rootDir = new DirectoryInfo(RootPath);
+            //Ensure folders appear first, to allow folder processing as soon as possilbe
+            var monitoredFiles = (from file in rootDir.EnumerateDirectories(searchPattern, SearchOption.AllDirectories)
+                                 where !Ignore(file.FullName)
+                                 orderby file.FullName ascending 
+                                 select file)
+                                 //Process small files first, leaving expensive large files for last
+                                 .Concat(from file in rootDir.EnumerateFiles(searchPattern, SearchOption.AllDirectories)
+                                         where !Ignore(file.FullName)
+                                         orderby file.Length ascending 
+                                         select file as FileSystemInfo);
+            return monitoredFiles;
+        }                
+
         public IEnumerable<string> EnumerateFilesAsRelativeUrls(string searchPattern="*")
         {
             var rootDir = new DirectoryInfo(RootPath);
             var monitoredFiles = from file in rootDir.EnumerateFiles(searchPattern, SearchOption.AllDirectories)
                                  where !Ignore(file.FullName)
+                                 orderby file.FullName ascending 
                                  select file.AsRelativeUrlTo(RootPath);
             return monitoredFiles;
         }                
@@ -323,6 +358,7 @@ namespace Pithos.Core.Agents
             var rootDir = new DirectoryInfo(RootPath);
             var monitoredFiles = from file in rootDir.EnumerateFileSystemInfos(searchPattern, SearchOption.AllDirectories)
                                  where !Ignore(file.FullName)
+                                 orderby file.FullName ascending 
                                  select file.AsRelativeUrlTo(RootPath);
             return monitoredFiles;
         }                
@@ -330,24 +366,27 @@ namespace Pithos.Core.Agents
 
         
 
-        private bool Ignore(string filePath)
+        public bool Ignore(string filePath)
         {
             if (IgnorePaths(filePath)) return true;
 
 
             //If selective sync is enabled, 
-            if (Selectives.IsSelectiveEnabled(AccountInfo.AccountKey) 
-                //propagate folder events 
-                && Directory.Exists(filePath)
-                //from the container root folder only. Note, in the first level below the account root path are the containers
-                && FoundBelowRoot(filePath, RootPath, 2))
+            if (IsUnselectedRootFolder(filePath))
                     return false;
             //Ignore if selective synchronization is defined, 
             //And the target file is not below any of the selective paths
             return !Selectives.IsSelected(AccountInfo, filePath);
         }
 
-        private bool IgnorePaths(string filePath)
+        public bool IsUnselectedRootFolder(string filePath)
+        {
+            return Selectives.IsSelectiveEnabled(AccountInfo.AccountKey) //propagate folder events 
+                   && Directory.Exists(filePath) //from the container root folder only. Note, in the first level below the account root path are the containers
+                   && FoundBelowRoot(filePath, RootPath, 2);
+        }
+
+        public bool IgnorePaths(string filePath)
         {
 //Ignore all first-level directories and files (ie at the container folders level)
             if (FoundBelowRoot(filePath, RootPath, 1))
@@ -415,18 +454,7 @@ namespace Pithos.Core.Agents
             return false;            
         }
 
-        //Post a Change message for all events except rename
-        void OnFileEvent(object sender, FileSystemEventArgs e)
-        {
-            //Ignore events that affect the cache folder
-            var filePath = e.FullPath;
-            if (Ignore(filePath)) 
-                return;
-            _eventIdleBatch.Post(e);
-        }
-
-
-/*
+        /*
         //Post a Change message for renames containing the old and new names
         void OnRenameEvent(object sender, RenamedEventArgs e)
         {
@@ -445,7 +473,17 @@ namespace Pithos.Core.Agents
                 TriggeringChange = e.ChangeType
             });
         }
-*/
+        */
+
+        //Post a Change message for all events except rename
+        void OnFileEvent(object sender, FileSystemEventArgs e)
+        {
+            //Ignore events that affect the cache folder
+            var filePath = e.FullPath;
+            if (Ignore(filePath))
+                return;
+            _eventIdleBatch.Post(e);
+        }
 
         //Post a Change message for moves containing the old and new names
         void OnMoveEvent(object sender, MovedEventArgs e)
index f292299..ece050a 100644 (file)
@@ -66,7 +66,7 @@ namespace Pithos.Core.Agents
         private Agent<CloudAction> _agent;
 
         [System.ComponentModel.Composition.Import]
-        private DeleteAgent DeleteAgent { get; set; }
+        public DeleteAgent DeleteAgent { get; set; }
 
         [System.ComponentModel.Composition.Import]
         public IStatusKeeper StatusKeeper { get; set; }
index ba1d799..d125987 100644 (file)
@@ -45,6 +45,7 @@ using System.ComponentModel.Composition;
 using System.Diagnostics;\r
 using System.Diagnostics.Contracts;\r
 using System.IO;\r
+using System.Linq.Expressions;\r
 using System.Reflection;\r
 using System.Threading;\r
 using System.Threading.Tasks;\r
@@ -59,6 +60,48 @@ namespace Pithos.Core.Agents
     using System.Collections.Generic;\r
     using System.Linq;\r
 \r
+    [DebuggerDisplay("{FilePath} C:{C} L:{L} S:{S}")]\r
+    public class StateTuple\r
+    {\r
+        public string FilePath { get; private set; }\r
+\r
+        public string L\r
+        {\r
+            get { return FileState==null?null:FileState.Checksum; }\r
+        }\r
+\r
+        public string C { get; set; }\r
+\r
+        public string S\r
+        {\r
+            get { return ObjectInfo== null ? null : ObjectInfo.Hash; }\r
+        }\r
+\r
+        private FileSystemInfo _fileInfo;\r
+        public FileSystemInfo FileInfo\r
+        {\r
+            get { return _fileInfo; }\r
+            set\r
+            {\r
+                _fileInfo = value;\r
+                FilePath = value.FullName;\r
+            }\r
+        }\r
+\r
+        public FileState FileState { get; set; }\r
+        public ObjectInfo ObjectInfo{ get; set; }\r
+\r
+        public StateTuple() { }\r
+\r
+        public StateTuple(FileSystemInfo info)\r
+        {\r
+            FileInfo = info;\r
+        }\r
+\r
+\r
+    }\r
+\r
+\r
     /// <summary>\r
     /// PollAgent periodically polls the server to detect object changes. The agent retrieves a listing of all\r
     /// objects and compares it with a previously cached version to detect differences. \r
@@ -84,6 +127,29 @@ namespace Pithos.Core.Agents
 \r
         public IStatusNotification StatusNotification { get; set; }\r
 \r
+        private CancellationTokenSource _currentOperationCancellation = new CancellationTokenSource();\r
+\r
+        public void CancelCurrentOperation()\r
+        {\r
+            //What does it mean to cancel the current upload/download?\r
+            //Obviously, the current operation will be cancelled by throwing\r
+            //a cancellation exception.\r
+            //\r
+            //The default behavior is to retry any operations that throw.\r
+            //Obviously this is not what we want in this situation.\r
+            //The cancelled operation should NOT bea retried. \r
+            //\r
+            //This can be done by catching the cancellation exception\r
+            //and avoiding the retry.\r
+            //\r
+\r
+            //Have to reset the cancellation source - it is not possible to reset the source\r
+            //Have to prevent a case where an operation requests a token from the old source\r
+            var oldSource = Interlocked.Exchange(ref _currentOperationCancellation, new CancellationTokenSource());\r
+            oldSource.Cancel();\r
+\r
+        }\r
+\r
         public bool Pause\r
         {\r
             get {\r
@@ -264,6 +330,7 @@ namespace Pithos.Core.Agents
                         var remoteObjects = (from objectList in listTasks\r
                                             where (string)objectList.AsyncState != "trash"\r
                                             from obj in objectList.Result\r
+                                            orderby obj.Bytes ascending \r
                                             select obj).ToList();\r
                         \r
                         //Get the latest remote object modification date, only if it is after\r
@@ -300,42 +367,46 @@ namespace Pithos.Core.Agents
                             StatusKeeper.CleanupOrphanStates();\r
                         StatusKeeper.CleanupStaleStates(accountInfo, cleanRemotes);\r
                         \r
-                        var differencer = _differencer.PostSnapshot(accountInfo, cleanRemotes);\r
+                        //var differencer = _differencer.PostSnapshot(accountInfo, cleanRemotes);\r
 \r
                         var filterUris = Selectives.SelectiveUris[accountInfo.AccountKey];\r
 \r
 \r
-                        //On the first run\r
-                        if (_firstPoll)\r
-                        {\r
-                            MarkSuspectedDeletes(accountInfo, cleanRemotes);\r
-                        }\r
-                        ProcessDeletedFiles(accountInfo, differencer.Deleted.FilterDirectlyBelow(filterUris));\r
+                        //Get the local files here                        \r
+                        var agent = AgentLocator<FileAgent>.Get(accountInfo.AccountPath);\r
 \r
-                        // @@@ NEED To add previous state here as well, To compare with previous hash\r
+                        var files = LoadLocalFileTuples(accountInfo);\r
 \r
-                        \r
+                        var states = FileState.Queryable.ToList();\r
+\r
+                        var infos = (from remote in cleanRemotes\r
+                                    let path = remote.RelativeUrlToFilePath(accountInfo.UserName)\r
+                                    let info=agent.GetFileSystemInfo(path)\r
+                                    select Tuple.Create(info.FullName,remote))\r
+                                    .ToList();\r
+\r
+                        var token = _currentOperationCancellation.Token;\r
+\r
+                        var tuples = MergeSources(infos, files, states).ToList();\r
 \r
-                        //Create a list of actions from the remote files\r
                         \r
-                        var allActions = MovesToActions(accountInfo,differencer.Moved.FilterDirectlyBelow(filterUris))\r
-                                        .Union(\r
-                                        ChangesToActions(accountInfo, differencer.Changed.FilterDirectlyBelow(filterUris)))\r
-                                        .Union(\r
-                                        CreatesToActions(accountInfo, differencer.Created.FilterDirectlyBelow(filterUris)));\r
-\r
-                        //And remove those that are already being processed by the agent\r
-                        var distinctActions = allActions\r
-                            .Except(NetworkAgent.GetEnumerable(), new LocalFileComparer())\r
-                            .ToList();\r
-\r
-                        await _unPauseEvent.WaitAsync();\r
-                        //Queue all the actions\r
-                        foreach (var message in distinctActions)\r
+                        foreach (var tuple in tuples)\r
                         {\r
-                            NetworkAgent.Post(message);\r
+                            await _unPauseEvent.WaitAsync();\r
+                            \r
+                            SyncSingleItem(accountInfo, tuple, agent, token);\r
                         }\r
 \r
+\r
+                        //On the first run\r
+/*\r
+                        if (_firstPoll)\r
+                        {\r
+                            MarkSuspectedDeletes(accountInfo, cleanRemotes);\r
+                        }\r
+*/\r
+\r
+\r
                         Log.Info("[LISTENER] End Processing");\r
                     }\r
                 }\r
@@ -350,6 +421,234 @@ namespace Pithos.Core.Agents
             }\r
         }\r
 \r
+        private static List<Tuple<FileSystemInfo, string>> LoadLocalFileTuples(AccountInfo accountInfo)\r
+        {\r
+            using (ThreadContext.Stacks["Account Files Hashing"].Push(accountInfo.UserName))\r
+            {\r
+\r
+                var localInfos = AgentLocator<FileAgent>.Get(accountInfo.AccountPath).EnumerateFileSystemInfos();\r
+                //Use the queue to retry locked file hashing\r
+                var fileQueue = new Queue<FileSystemInfo>(localInfos);\r
+\r
+                var results = new List<Tuple<FileSystemInfo, string>>();\r
+\r
+                while (fileQueue.Count > 0)\r
+                {\r
+                    var file = fileQueue.Dequeue();\r
+                    using (ThreadContext.Stacks["File"].Push(file.FullName))\r
+                    {\r
+                        try\r
+                        {\r
+                            var hash = (file is DirectoryInfo)\r
+                                           ? "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"\r
+                                           : Signature.CalculateTreeHash(file, accountInfo.BlockSize,\r
+                                                                         accountInfo.BlockHash)\r
+                                                 .\r
+                                                 TopHash.ToHashString();\r
+                            results.Add(Tuple.Create(file, hash));\r
+                        }\r
+                        catch (IOException exc)\r
+                        {\r
+                            Log.WarnFormat("[HASH] File in use, will retry [{0}]", exc);\r
+                            fileQueue.Enqueue(file);\r
+                        }\r
+                    }\r
+                }\r
+\r
+                return results;\r
+            }\r
+        }\r
+\r
+        private void SyncSingleItem(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, CancellationToken token)\r
+        {\r
+            Log.DebugFormat("Sync [{0}] C:[{1}] L:[{2}] S:[{3}]",tuple.FilePath,tuple.C,tuple.L,tuple.S);\r
+\r
+            var localFilePath = tuple.FilePath;\r
+            //Don't use the tuple info, it may have been deleted\r
+            var localInfo = FileInfoExtensions.FromPath(localFilePath);\r
+\r
+            // Local file unchanged? If both C and L are null, make sure it's because \r
+            //both the file is missing and the state checksum is not missing\r
+            if (tuple.C == tuple.L && (localInfo.Exists || tuple.FileState==null))\r
+            {\r
+                //No local changes\r
+                //Server unchanged?\r
+                if (tuple.S == tuple.L)\r
+                {\r
+                    // No server changes\r
+                    ;\r
+                }\r
+                else\r
+                {\r
+                    //Different from server\r
+                    if (Selectives.IsSelected(accountInfo, localFilePath))\r
+                    {\r
+                        //Does the server file exist?\r
+                        if (tuple.S == null)\r
+                        {\r
+                            //Server file doesn't exist\r
+                            //deleteObjectFromLocal()\r
+                            StatusKeeper.SetFileState(localFilePath, FileStatus.Deleted,\r
+                                                      FileOverlayStatus.Deleted, "");\r
+                            agent.Delete(localFilePath);\r
+                            //updateRecord(Remove C, L)\r
+                            StatusKeeper.ClearFileStatus(localFilePath);\r
+                        }\r
+                        else\r
+                        {\r
+                            //Server file exists\r
+                            //downloadServerObject() // Result: L = S\r
+                            StatusKeeper.SetFileState(localFilePath, FileStatus.Modified,\r
+                                                      FileOverlayStatus.Modified, "");\r
+                            NetworkAgent.Downloader.DownloadCloudFile(accountInfo,\r
+                                                                            tuple.ObjectInfo,\r
+                                                                            localFilePath, token).Wait(token);\r
+                            //updateRecord( L = S )\r
+                            StatusKeeper.UpdateFileChecksum(localFilePath, tuple.FileState==null?"":tuple.FileState.ShortHash,\r
+                                                            tuple.ObjectInfo.Hash);\r
+\r
+                            StatusKeeper.SetFileState(localFilePath, FileStatus.Unchanged,\r
+                                                      FileOverlayStatus.Normal, "");\r
+                        }\r
+                    }\r
+                }\r
+            }\r
+            else\r
+            {\r
+                //Local changes found\r
+\r
+                //Server unchanged?\r
+                if (tuple.S == tuple.L)\r
+                {\r
+                    //The FileAgent selective sync checks for new root folder files\r
+                    if (!agent.Ignore(localFilePath))\r
+                    {\r
+                        if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null)\r
+                        {\r
+                            //deleteObjectFromServer()\r
+                            DeleteCloudFile(accountInfo, tuple);\r
+                            //updateRecord( Remove L, S)                  \r
+                        }\r
+                        else\r
+                        {\r
+                            //uploadLocalObject() // Result: S = C, L = S                        \r
+                            var isUnselected = agent.IsUnselectedRootFolder(tuple.FilePath);\r
+\r
+                            //Debug.Assert(tuple.FileState !=null);\r
+                            var action = new CloudUploadAction(accountInfo, localInfo, tuple.FileState,\r
+                                                               accountInfo.BlockSize, accountInfo.BlockHash,\r
+                                                               "Poll", isUnselected);\r
+                            NetworkAgent.Uploader.UploadCloudFile(action, token).Wait(token);\r
+\r
+\r
+                            //updateRecord( S = C )\r
+                            StatusKeeper.SetFileState(localFilePath, FileStatus.Unchanged,\r
+                                                      FileOverlayStatus.Normal, "");\r
+                            if (isUnselected)\r
+                            {\r
+                                ProcessChildren(accountInfo, tuple, agent, token);\r
+                            }\r
+                        }\r
+                    }\r
+                }\r
+                else\r
+                {\r
+                    if (Selectives.IsSelected(accountInfo, localFilePath))\r
+                    {\r
+                        if (tuple.C == tuple.S)\r
+                        {\r
+                            // (Identical Changes) Result: L = S\r
+                            //doNothing()\r
+                            StatusKeeper.UpdateFileChecksum(localFilePath, tuple.FileState == null ? "" : tuple.FileState.ShortHash,\r
+                                                            tuple.ObjectInfo.Hash);\r
+                            StatusKeeper.SetFileState(localFilePath, FileStatus.Unchanged,\r
+                                                      FileOverlayStatus.Normal, "");\r
+                        }\r
+                        else\r
+                        {\r
+                            if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null )\r
+                            {\r
+                                //deleteObjectFromServer()\r
+                                DeleteCloudFile(accountInfo, tuple);\r
+                                //updateRecord(Remove L, S)                  \r
+                            }\r
+                            else\r
+                            {\r
+                                ReportConflictForMismatch(localFilePath);\r
+                                //identifyAsConflict() // Manual action required\r
+                            }\r
+                        }\r
+                    }\r
+                }\r
+            }\r
+        }\r
+\r
+        private void DeleteCloudFile(AccountInfo accountInfo, StateTuple tuple)\r
+        {\r
+            StatusKeeper.SetFileState(tuple.FilePath, FileStatus.Deleted,\r
+                                      FileOverlayStatus.Deleted, "");\r
+            NetworkAgent.DeleteAgent.DeleteCloudFile(accountInfo, tuple.ObjectInfo);\r
+            StatusKeeper.ClearFileStatus(tuple.FilePath);\r
+        }\r
+\r
+        private void ProcessChildren(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, CancellationToken token)\r
+        {\r
+\r
+            var dirInfo = tuple.FileInfo as DirectoryInfo;\r
+            var folderTuples = from folder in dirInfo.EnumerateDirectories("*", SearchOption.AllDirectories)\r
+                               select new StateTuple(folder);\r
+            var fileTuples = from file in dirInfo.EnumerateFiles("*", SearchOption.AllDirectories)\r
+                             select new StateTuple(file);\r
+            \r
+            //Process folders first, to ensure folders appear on the sever as soon as possible\r
+            folderTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, token));\r
+            \r
+            fileTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, token));\r
+        }\r
+\r
+        private static IEnumerable<StateTuple> MergeSources(\r
+            IEnumerable<Tuple<string, ObjectInfo>> infos, \r
+            IEnumerable<Tuple<FileSystemInfo, string>> files, \r
+            IEnumerable<FileState> states)\r
+        {\r
+            var dct = new Dictionary<string, StateTuple>();\r
+            foreach (var file in files)\r
+            {\r
+                var fsInfo = file.Item1;\r
+                var fileHash = file.Item2;\r
+                dct[fsInfo.FullName] = new StateTuple {FileInfo = fsInfo, C = fileHash};\r
+            }\r
+            foreach (var state in states)\r
+            {\r
+                StateTuple hashTuple;\r
+                if (dct.TryGetValue(state.FilePath, out hashTuple))\r
+                {\r
+                    hashTuple.FileState = state;\r
+                }\r
+                else\r
+                {\r
+                    var fsInfo = FileInfoExtensions.FromPath(state.FilePath);\r
+                    dct[state.FilePath] = new StateTuple {FileInfo = fsInfo, FileState = state};\r
+                }\r
+            }\r
+            foreach (var info in infos)\r
+            {\r
+                StateTuple hashTuple;\r
+                var filePath = info.Item1;\r
+                var objectInfo = info.Item2;\r
+                if (dct.TryGetValue(filePath, out hashTuple))\r
+                {\r
+                    hashTuple.ObjectInfo = objectInfo;\r
+                }\r
+                else\r
+                {\r
+                    var fsInfo = FileInfoExtensions.FromPath(filePath);\r
+                    dct[filePath] = new StateTuple {FileInfo = fsInfo, ObjectInfo = objectInfo};\r
+                }\r
+            }\r
+            return dct.Values;\r
+        }\r
+\r
         /// <summary>\r
         /// Returns the latest LastModified date from the list of objects, but only if it is before\r
         /// than the threshold value\r
@@ -388,7 +687,7 @@ namespace Pithos.Core.Agents
             return threshold;\r
         }\r
 \r
-        readonly AccountsDifferencer _differencer = new AccountsDifferencer();\r
+        //readonly AccountsDifferencer _differencer = new AccountsDifferencer();\r
         private Dictionary<Uri, List<Uri>> _selectiveUris = new Dictionary<Uri, List<Uri>>();\r
         private bool _pause;\r
 \r
@@ -484,6 +783,21 @@ namespace Pithos.Core.Agents
                                               TraceLevel.Info);\r
         }\r
 \r
+        private void ReportConflictForMismatch(string localFilePath)\r
+        {\r
+            if (String.IsNullOrWhiteSpace(localFilePath))\r
+                throw new ArgumentNullException("localFilePath");\r
+            Contract.EndContractBlock();\r
+\r
+            StatusKeeper.SetFileState(localFilePath, FileStatus.Conflict, FileOverlayStatus.Conflict, "File changed at the server");\r
+            UpdateStatus(PithosStatus.HasConflicts);\r
+            var message = String.Format("Conflict detected for file {0}", localFilePath);\r
+            Log.Warn(message);\r
+            StatusNotification.NotifyChange(message, TraceLevel.Warning);\r
+        }\r
+\r
+\r
+\r
         /// <summary>\r
         /// Creates a Sync action for each changed server file\r
         /// </summary>\r
@@ -655,8 +969,10 @@ namespace Pithos.Core.Agents
         {\r
             AccountInfo account;\r
             _accounts.TryRemove(accountInfo.AccountKey, out account);\r
+/*\r
             SnapshotDifferencer differencer;\r
             _differencer.Differencers.TryRemove(accountInfo.AccountKey, out differencer);\r
+*/\r
         }\r
 \r
         public void SetSelectivePaths(AccountInfo accountInfo,Uri[] added, Uri[] removed)\r
index 79a3f18..8276051 100644 (file)
@@ -83,8 +83,8 @@ namespace Pithos.Core.Agents
 \r
         public bool IsSelected(AccountInfo account,FileSystemInfo info)\r
         {\r
-            if (info is DirectoryInfo)\r
-                return true;\r
+            /*if (info is DirectoryInfo)\r
+                return true;*/\r
             return IsSelected(account,info.FullName);\r
         }\r
 \r
index 0f7f8a8..9074160 100644 (file)
@@ -667,7 +667,7 @@ namespace Pithos.Core.Agents
                     else
                     {
                         command.CommandText =
-                            "INSERT INTO FileState (Id,FilePath,Checksum,Version,VersionTimeStamp,ShortHash,FileStatus,OverlayStatus) VALUES (:id,:path,:checksum,:version,:versionTimeStamp,:shortHash,:fileStatus,:overlayStatus)";
+                            "INSERT INTO FileState (Id,FilePath,Checksum,Version,VersionTimeStamp,ShortHash,FileStatus,OverlayStatus,ObjectID) VALUES (:id,:path,:checksum,:version,:versionTimeStamp,:shortHash,:fileStatus,:overlayStatus,:objectID)";
                         command.Parameters.AddWithValue("id", Guid.NewGuid());
                     }
 
@@ -675,11 +675,10 @@ namespace Pithos.Core.Agents
                     command.Parameters.AddWithValue("checksum", objectInfo.Hash);
                     command.Parameters.AddWithValue("shortHash", "");
                     command.Parameters.AddWithValue("version", objectInfo.Version);
-                    command.Parameters.AddWithValue("versionTimeStamp",
-                                                    objectInfo.VersionTimestamp);
+                    command.Parameters.AddWithValue("versionTimeStamp", objectInfo.VersionTimestamp);
                     command.Parameters.AddWithValue("fileStatus", FileStatus.Unchanged);
-                    command.Parameters.AddWithValue("overlayStatus",
-                                                    FileOverlayStatus.Normal);
+                    command.Parameters.AddWithValue("overlayStatus", FileOverlayStatus.Normal);
+                    command.Parameters.AddWithValue("objectID",objectInfo.UUID);
 
                     var affected = command.ExecuteNonQuery();
                     return;
index 9a36adc..37c55be 100644 (file)
@@ -56,27 +56,29 @@ namespace Pithos.Core.Agents
                     //Try to load the action's local state, if it is empty\r
                     if (action.FileState == null)\r
                         action.FileState = StatusKeeper.GetStateByFilePath(fileInfo.FullName);\r
-                    if (action.FileState == null)\r
+                    if (action.FileState != null)\r
                     {\r
-                        Log.WarnFormat("File [{0}] has no local state. It was probably created by a download action", fileInfo.FullName);\r
-                        return;\r
-                    }\r
+                        /*\r
+                                                Log.WarnFormat("File [{0}] has no local state. It was probably created by a download action", fileInfo.FullName);\r
+                                                return;\r
+                        */\r
 \r
-                    var latestState = action.FileState;\r
 \r
-                    //Do not upload files in conflict\r
-                    if (latestState.FileStatus == FileStatus.Conflict)\r
-                    {\r
-                        Log.InfoFormat("Skipping file in conflict [{0}]", fileInfo.FullName);\r
-                        return;\r
-                    }\r
-                    //Do not upload files when we have no permission\r
-                    if (latestState.FileStatus == FileStatus.Forbidden)\r
-                    {\r
-                        Log.InfoFormat("Skipping forbidden file [{0}]", fileInfo.FullName);\r
-                        return;\r
-                    }\r
+                        var latestState = action.FileState;\r
 \r
+                        //Do not upload files in conflict\r
+                        if (latestState.FileStatus == FileStatus.Conflict)\r
+                        {\r
+                            Log.InfoFormat("Skipping file in conflict [{0}]", fileInfo.FullName);\r
+                            return;\r
+                        }\r
+                        //Do not upload files when we have no permission\r
+                        if (latestState.FileStatus == FileStatus.Forbidden)\r
+                        {\r
+                            Log.InfoFormat("Skipping forbidden file [{0}]", fileInfo.FullName);\r
+                            return;\r
+                        }\r
+                    }\r
                     //Are we targeting our own account or a sharer account?\r
                     var relativePath = fileInfo.AsRelativeTo(action.AccountInfo.AccountPath);\r
                     var accountInfo = relativePath.StartsWith(FolderConstants.OthersFolder) \r
index 2c459fe..2303d0d 100644 (file)
@@ -43,6 +43,7 @@ using System;
 using System.Collections.Generic;
 using System.Diagnostics.Contracts;
 using System.Linq;
+using System.Linq.Expressions;
 using System.Text;
 
 namespace Pithos.Core
index 3bafda0..5e2ef27 100644 (file)
@@ -72,6 +72,9 @@ namespace Pithos.Core
         public Guid Id { get; set; }
 
         
+        //[Property(Unique = true, UniqueKey = "IX_FileState_ObjectID")]
+        public string ObjectID { get; set; }
+
         [Property(Unique = true, UniqueKey = "IX_FileState_FilePath")]
         public string FilePath { get; set; }
 
index 7dbb947..ab6ba13 100644 (file)
@@ -111,14 +111,25 @@ namespace Pithos.Core
             set
             {
                 _workflowAgent = value;
-                FileAgent.WorkflowAgent = value;
+                //FileAgent.WorkflowAgent = value;
             }
         }
         
         [Import]
         public NetworkAgent NetworkAgent { get; set; }
+
+        private PollAgent _pollAgent;
+
         [Import]
-        public PollAgent PollAgent { get; set; }
+        public PollAgent PollAgent
+        {
+            get { return _pollAgent; }
+            set
+            {
+                _pollAgent = value;
+                FileAgent.PollAgent = value;
+            }
+        }
 
         private Selectives _selectives;
 
@@ -157,15 +168,15 @@ namespace Pithos.Core
 
 
 
-
-        public bool Pause
+        public bool Pause { get; set; }       
+        /*public bool Pause
         {
             get { return FileAgent.Pause; }
             set
             {
                 FileAgent.Pause = value;
             }
-        }
+        }*/
 
         private string _rootPath;
         public string RootPath
@@ -184,7 +195,7 @@ namespace Pithos.Core
 
         public PithosMonitor()
         {
-            FileAgent = new FileAgent();
+            FileAgent = new FileAgent();            
         }
         private bool _started;
 
@@ -405,11 +416,13 @@ namespace Pithos.Core
 
         public void Stop()
         {
+/*
             AgentLocator<FileAgent>.Remove(RootPath);
 
             if (FileAgent!=null)
                 FileAgent.Stop();
             FileAgent = null;
+*/
         }
 
 
index e22da1e..87d00bb 100644 (file)
@@ -121,6 +121,7 @@ namespace Pithos.Interfaces
             try
             {
 
+                dirInfo.Refresh();
 
                 if (dirInfo.Exists)
                     return Path.Combine(GetProperDirectoryCapitalization(parentDirInfo.FullName),
index f9ea743..70f0629 100644 (file)
@@ -246,6 +246,15 @@ namespace Pithos.Network
             var size = stream.Length;\r
             Log.DebugFormat("Hashing [{0}] size [{1}]", path, size);\r
 \r
+            //TODO: Handle zero-length files\r
+            if (size == 0)\r
+            {\r
+                var buf = new byte[0];\r
+                var hasher=HashAlgorithm.Create(algorithm);                \r
+                hashes[0]=hasher.ComputeHash(buf);\r
+                return hashes;\r
+            }\r
+\r
 \r
             var buffer = new byte[parallelism][];\r
             var hashers = new HashAlgorithm[parallelism];\r
@@ -263,6 +272,8 @@ namespace Pithos.Network
                 int read;\r
                 int bufIdx = 0;\r
                 long index = 0;\r
+\r
+\r
                 while ((read = await stream.ReadAsync(buffer[bufIdx], 0, blockSize)) > 0)\r
                 {\r
                     index += read;\r
@@ -274,25 +285,26 @@ namespace Pithos.Network
                     {\r
                         //var options = new ParallelOptions {MaxDegreeOfParallelism = parallelism};\r
                         Parallel.For(0, bufIdx + 1, idx =>\r
-                        {\r
-                            //This code was added for compatibility with the way Pithos calculates the last hash\r
-                            //We calculate the hash only up to the last non-null byte\r
-                            var lastByteIndex = Array.FindLastIndex(buffer[idx],\r
-                                                                    bufferCount[idx] - 1,\r
-                                                                    aByte => aByte != 0);\r
-\r
-                            var hasher = hashers[idx];\r
-                            var hash = hasher.ComputeHash(buffer[idx], 0, lastByteIndex + 1);\r
-                            var filePosition = indices[idx];\r
-                            /*\r
+                                                        {\r
+                                                            //This code was added for compatibility with the way Pithos calculates the last hash\r
+                                                            //We calculate the hash only up to the last non-null byte\r
+                                                            var lastByteIndex = Array.FindLastIndex(buffer[idx],\r
+                                                                                                    bufferCount[idx] - 1,\r
+                                                                                                    aByte => aByte != 0);\r
+\r
+                                                            var hasher = hashers[idx];\r
+                                                            var hash = hasher.ComputeHash(buffer[idx], 0,\r
+                                                                                          lastByteIndex + 1);\r
+                                                            var filePosition = indices[idx];\r
+                                                            /*\r
                                                         Trace.TraceInformation("Hashed [{0}] [{1}/{2}] [{3:p}]", path,\r
                                                                                 filePosition, size,\r
                                                                                 (double)filePosition / size);\r
                             */\r
-                            hashes[filePosition] = hash;\r
-                        });\r
+                                                            hashes[filePosition] = hash;\r
+                                                        });\r
                     }\r
-                    bufIdx = (bufIdx + 1) % parallelism;\r
+                    bufIdx = (bufIdx + 1)%parallelism;\r
                 }\r
             }\r
             finally\r
index 1e83328..61f1641 100644 (file)
@@ -1031,6 +1031,8 @@ namespace Pithos.Network
             //Send the tree hash as Json to the server            
             client.Headers[HttpRequestHeader.ContentType] = "application/octet-stream";
             var jsonHash = hash.ToJson();
+                        
+            client.Headers.Add("ETag",hash.MD5);
             var uploadTask=client.UploadStringTask(uri, "PUT", jsonHash);
             if (Log.IsDebugEnabled)
                 Log.DebugFormat("Hashes:\r\n{0}", jsonHash);
index 865e818..5fc2ea7 100644 (file)
@@ -135,6 +135,7 @@ namespace Pithos.Network
             TimedOut = false;
             var webRequest = base.GetWebRequest(address);            
             var request = (HttpWebRequest)webRequest;
+            request.CookieContainer=new CookieContainer();
             request.ServicePoint.ConnectionLimit = 50;
             if (IfModifiedSince.HasValue)
                 request.IfModifiedSince = IfModifiedSince.Value;
index 389dd79..5e73c18 100644 (file)
@@ -126,7 +126,7 @@ namespace Pithos.Network
             if (String.IsNullOrWhiteSpace(algorithm))
                 throw new ArgumentNullException("algorithm");
             Contract.EndContractBlock();
-
+            fileInfo.Refresh();
             if (fileInfo is DirectoryInfo || !fileInfo.Exists)
                 return TreeHash.Empty;