Multiple changes to enable delete detection, safer uploading
authorPanagiotis Kanavos <pkanavos@gmail.com>
Mon, 10 Oct 2011 20:41:06 +0000 (23:41 +0300)
committerPanagiotis Kanavos <pkanavos@gmail.com>
Mon, 10 Oct 2011 20:41:06 +0000 (23:41 +0300)
21 files changed:
trunk/Pithos.Core.Test/MockStatusKeeper.cs
trunk/Pithos.Core.Test/NetworkAgentTest.cs
trunk/Pithos.Core.Test/StatusCheckerTest.cs
trunk/Pithos.Core/Agents/BlockUpdater.cs
trunk/Pithos.Core/Agents/CloudAction.cs
trunk/Pithos.Core/Agents/FileAgent.cs
trunk/Pithos.Core/Agents/FileInfoExtensions.cs
trunk/Pithos.Core/Agents/NetworkAgent.cs
trunk/Pithos.Core/Agents/WorkflowAgent.cs
trunk/Pithos.Core/FileState.cs
trunk/Pithos.Core/IStatusKeeper.cs
trunk/Pithos.Core/InMemStatusChecker.cs
trunk/Pithos.Core/Pithos.Core.csproj
trunk/Pithos.Core/PithosMonitor.cs
trunk/Pithos.Core/PithosWorkflow.cs
trunk/Pithos.Core/StatusKeeper.cs
trunk/Pithos.Core/TaskExtensions.cs
trunk/Pithos.Network/CloudFilesClient.cs
trunk/Pithos.Network/ICloudClient.cs
trunk/Pithos.Network/RestClient.cs
trunk/Pithos.Network/Signature.cs

index c33ec8c..9cac001 100644 (file)
@@ -2,6 +2,7 @@
 using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Diagnostics.Contracts;
+using System.IO;
 using System.Linq;
 using System.Text;
 using System.Threading;
@@ -35,27 +36,26 @@ namespace Pithos.Core.Test
             return FileOverlayStatus.Unversioned;
         }
 
-        public IEnumerable<string> StoreUnversionedFiles(ParallelQuery<string> paths)
+        public void ProcessExistingFiles(IEnumerable<FileInfo> paths)
         {
 
             var newFiles = (from file in paths
-                            where !_overlayCache.ContainsKey(file)
+                            where !_overlayCache.ContainsKey(file.FullName)
                             select new
                             {
-                                FilePath = file,
+                                FilePath = file.FullName.ToLower(),
                                 OverlayStatus = FileOverlayStatus.Unversioned,
                                 FileStatus = FileStatus.Created,
                                 Checksum = Signature.CalculateMD5(file)
                             });
             var files = new ConcurrentBag<string>();
-            newFiles.ForAll(state =>
-            {
+            foreach (var state in newFiles)
+            {            
                 _overlayCache[state.FilePath] = state.OverlayStatus;
                 _statusCache[state.FilePath] = state.FileStatus;
                 _checksums[state.FilePath] = state.Checksum;
                 files.Add(state.FilePath);
-            });
-            return files.ToList();
+            }            
             
         }
 
index 521dd23..6094e09 100644 (file)
@@ -81,7 +81,7 @@ namespace Pithos.Core.Test
 
             agent.CloudClient.Authenticate("890329@vho.grnet.gr", "24989dce4e0fcb072f8cb60c8922be19");
 
-            var fileName = @"vlc-1.1.11-win32.exe";
+            var fileName = @"AccessDatabaseEngine_x64.exe";
 
             var filePath = Path.Combine(@"e:\pithos\", fileName);
             if (File.Exists(filePath))
index f223878..3f0ad1e 100644 (file)
@@ -17,7 +17,7 @@ namespace Pithos.Core.Test
                                      Tuple.Create(@"e:\pithos\0File4.txt", FileOverlayStatus.Modified)
                                     };
 
-            var checker = new InMemStatusChecker();
+            var checker = new MockStatusChecker();
 
             foreach (var file in files)
             {
@@ -42,7 +42,7 @@ namespace Pithos.Core.Test
                                      Tuple.Create(@"e:\pithos\0File4.txt", FileOverlayStatus.Modified)
                                     };
 
-            var checker = new InMemStatusChecker();
+            var checker = new MockStatusChecker();
 
             foreach (var file in files)
             {
@@ -68,7 +68,7 @@ namespace Pithos.Core.Test
                                 Tuple.Create(@"e:\pithos\0File4.txt", FileOverlayStatus.Modified)
                             };
 
-            var checker = new InMemStatusChecker();
+            var checker = new MockStatusChecker();
 
             foreach (var file in files)
             {
index f11f4e4..6b0d310 100644 (file)
@@ -4,6 +4,7 @@ using System.Collections.Generic;
 using System.Diagnostics.Contracts;
 using System.IO;
 using System.Linq;
+using System.Security.Cryptography;
 using System.Text;
 using System.Threading.Tasks;
 using Pithos.Network;
@@ -20,33 +21,81 @@ namespace Pithos.Core.Agents
         public TreeHash ServerHash { get; private set; }
 
         public string TempPath { get; private set; }
-        
+
 
         public BlockUpdater(string fragmentsPath, string filePath, string relativePath,TreeHash serverHash)
-        {            
+        {   
+            if (String.IsNullOrWhiteSpace(fragmentsPath))
+                throw new ArgumentNullException("fragmentsPath");
+            if (!Path.IsPathRooted(fragmentsPath))
+                throw new ArgumentException("The fragmentsPath must be rooted", "fragmentsPath");
+            
+            if (string.IsNullOrWhiteSpace(filePath))
+                throw new ArgumentNullException("filePath");
+            if (!Path.IsPathRooted(filePath))
+                throw new ArgumentException("The filePath must be rooted", "filePath");
+            
+            if (string.IsNullOrWhiteSpace(relativePath))
+                throw new ArgumentNullException("relativePath");
+            if (Path.IsPathRooted(relativePath))
+                throw new ArgumentException("The relativePath must NOT be rooted", "relativePath");
+
+            if (serverHash == null)
+                throw new ArgumentNullException("serverHash");
+            Contract.EndContractBlock();
+
             FragmentsPath=fragmentsPath;
             FilePath = filePath;
             RelativePath=relativePath;
             ServerHash = serverHash;
-
-            Start();
-        }
-
-        public void Start()
-        {
             //The file will be stored in a temporary location while downloading with an extension .download
             TempPath = Path.Combine(FragmentsPath, RelativePath + ".download");
+
             var directoryPath = Path.GetDirectoryName(TempPath);
             if (!Directory.Exists(directoryPath))
                 Directory.CreateDirectory(directoryPath);
+
+            LoadOrphans(directoryPath);
+        }
+
+        private void LoadOrphans(string directoryPath)
+        {
+            if (string.IsNullOrWhiteSpace(directoryPath))
+                throw new ArgumentNullException("directoryPath");
+            if (!Path.IsPathRooted(directoryPath))
+                throw new ArgumentException("The directoryPath must be rooted", "directoryPath");
+            Contract.EndContractBlock();
+
+            var fileNamename = Path.GetFileName(FilePath);
+            var orphans = Directory.GetFiles(directoryPath, fileNamename + ".*");
+            foreach (var orphan in orphans)
+            {
+                using (HashAlgorithm hasher = HashAlgorithm.Create(ServerHash.BlockHash))
+                {
+                    var buffer=File.ReadAllBytes(orphan);
+                    //The server truncates nulls before calculating hashes, have to do the same
+                    //Find the last non-null byte, starting from the end
+                    var lastByteIndex = Array.FindLastIndex(buffer, buffer.Length-1, aByte => aByte != 0);
+                    var binHash = hasher.ComputeHash(buffer,0,lastByteIndex);
+                    var hash = binHash.ToHashString();
+                    _orphanBlocks[hash] = orphan;
+                }
+            }
         }
 
 
         public void Commit()
-        {                       
+        {
+            if (String.IsNullOrWhiteSpace(FilePath))
+                throw new InvalidOperationException("FilePath is empty");
+            if (String.IsNullOrWhiteSpace(TempPath))
+                throw new InvalidOperationException("TempPath is empty");
+            Contract.EndContractBlock();
+
             //Copy the file to a temporary location. Changes will be made to the
             //temporary file, then it will replace the original file
-            File.Copy(FilePath, TempPath, true);
+            if (File.Exists(FilePath))
+                File.Copy(FilePath, TempPath, true);
 
             //Set the size of the file to the size specified in the treehash
             //This will also create an empty file if the file doesn't exist            
@@ -74,6 +123,12 @@ namespace Pithos.Core.Agents
 
         private void SwapFiles()
         {
+            if (String.IsNullOrWhiteSpace(FilePath))
+                throw new InvalidOperationException("FilePath is empty");
+            if (String.IsNullOrWhiteSpace(TempPath))
+                throw new InvalidOperationException("TempPath is empty");            
+            Contract.EndContractBlock();
+
             if (File.Exists(FilePath))
                 File.Replace(TempPath, FilePath, null, true);
             else
@@ -82,13 +137,17 @@ namespace Pithos.Core.Agents
 
         private void ClearBlocks()
         {
-            foreach (var block in _blocks)
+            //Get all the the block paths, orphan or not
+            var paths= _blocks.Select(pair => pair.Value)
+                          .Union(_orphanBlocks.Select(pair => pair.Value));
+            foreach (var filePath in paths)
             {
-                var filePath = block.Value;                
                 File.Delete(filePath);
             }
+
             File.Delete(TempPath);
             _blocks.Clear();
+            _orphanBlocks.Clear();
         }
 
         //Change the file's size, possibly truncating or adding to it
@@ -128,12 +187,27 @@ namespace Pithos.Core.Agents
         }*/
 
         ConcurrentDictionary<int,string> _blocks=new ConcurrentDictionary<int, string>();
+        ConcurrentDictionary<string, string> _orphanBlocks = new ConcurrentDictionary<string, string>();
+
+        public bool UseOrphan(int blockIndex, string blockHash)
+        {
+            string blockPath=null;
+            if (_orphanBlocks.TryGetValue(blockHash,out blockPath))
+            {
+                _blocks[blockIndex] = blockPath;
+                return true;
+            }
+            return false;
+        }
 
         public Task StoreBlock(int blockIndex,byte[] buffer)
         {
-            var blockPath = String.Format("{0}.{1:3}", TempPath, blockIndex);
+            var blockPath = String.Format("{0}.{1:000000}", TempPath, blockIndex);
             _blocks[blockIndex] = blockPath;
-            
+            //Remove any orphan files
+            if (File.Exists(blockPath))
+                File.Delete(blockPath);
+
             return FileAsync.WriteAllBytes(blockPath, buffer);
         }
 
index 90f2455..3a594cc 100644 (file)
@@ -39,10 +39,11 @@ namespace Pithos.Core.Agents
             OldPath = oldPath;
             NewFileName = newFileName;
             NewPath = newPath;
-            LocalHash = new Lazy<string>(() => Signature.CalculateMD5(NewFileName), LazyThreadSafetyMode.ExecutionAndPublication);
+            //This is a rename operation, a hash will not be used
+            LocalHash = new Lazy<string>(() => String.Empty, LazyThreadSafetyMode.ExecutionAndPublication);
         }
 
-        public CloudAction(CloudActionType action, FileInfo localFile, ObjectInfo cloudFile,FileState state)
+        public CloudAction(CloudActionType action, FileInfo localFile, ObjectInfo cloudFile,FileState state,int blockSize, string algorithm)
         {
             Action = action;
             LocalFile = localFile;
@@ -50,7 +51,7 @@ namespace Pithos.Core.Agents
             FileState = state;
             if (LocalFile != null)
             {
-                LocalHash = new Lazy<string>(() => Signature.CalculateMD5(LocalFile),
+                LocalHash = new Lazy<string>(() => LocalFile.CalculateHash(blockSize,algorithm),
                                              LazyThreadSafetyMode.ExecutionAndPublication);
             }
         }
index 3eacd5d..8b67103 100644 (file)
@@ -76,8 +76,15 @@ namespace Pithos.Core.Agents
             }
             catch (IOException exc)
             {
-                Trace.TraceWarning("File access error occured, retrying {0}\n{1}", state.Path, exc);
-                _agent.Post(state);
+                if (File.Exists(state.Path))
+                {
+                    Trace.TraceWarning("File access error occured, retrying {0}\n{1}", state.Path, exc);
+                    _agent.Post(state);
+                }
+                else
+                {
+                    Trace.TraceWarning("File {0} does not exist. Will be ignored\n{1}", state.Path, exc);
+                }
             }
             catch (Exception exc)
             {
@@ -162,6 +169,8 @@ namespace Pithos.Core.Agents
         {
             if (filePath.StartsWith(FragmentsPath))
                 return true;
+            if (_ignoreFiles.ContainsKey(filePath.ToLower()))
+                return true;
             return false;
         }
 
@@ -204,6 +213,8 @@ namespace Pithos.Core.Agents
             {WatcherChangeTypes.Renamed,FileStatus.Renamed}
         };
 
+        private Dictionary<string,string> _ignoreFiles=new Dictionary<string, string>();
+
         private WorkflowState UpdateFileStatus(WorkflowState state)
         {
             Debug.Assert(!state.Path.Contains("fragments"));
@@ -237,7 +248,7 @@ namespace Pithos.Core.Agents
                     this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);
                     break;
                 case FileStatus.Deleted:
-                    this.StatusKeeper.RemoveFileOverlayStatus(state.Path);
+                    //this.StatusKeeper.RemoveFileOverlayStatus(state.Path);
                     break;
                 case FileStatus.Renamed:
                     this.StatusKeeper.RemoveFileOverlayStatus(state.OldPath);
@@ -269,8 +280,8 @@ namespace Pithos.Core.Agents
             if (Directory.Exists(path))
                 return state;
 
-            string hash = Signature.CalculateMD5(path);
-
+            var info = new FileInfo(path);
+            string hash = info.CalculateHash(StatusKeeper.BlockSize,StatusKeeper.BlockHash);
             StatusKeeper.UpdateFileChecksum(path, hash);
 
             state.Hash = hash;
@@ -314,5 +325,15 @@ namespace Pithos.Core.Agents
             
         }
 
+        public void Delete(string relativePath)
+        {
+            var absolutePath = Path.Combine(RootPath, relativePath);
+            if (File.Exists(absolutePath))
+            {                   
+                File.Delete(absolutePath);
+                _ignoreFiles[absolutePath.ToLower()] = absolutePath.ToLower();                
+            }
+            StatusKeeper.ClearFileStatus(absolutePath);
+        }
     }
 }
index d730fab..b8608ad 100644 (file)
@@ -5,6 +5,7 @@ using System.Text;
 using System.IO;
 using System.Text.RegularExpressions;
 using System.Threading.Tasks;
+using Pithos.Network;
 
 namespace Pithos.Core.Agents
 {
@@ -87,5 +88,14 @@ namespace Pithos.Core.Agents
 
         }
 
+        public static string CalculateHash(this FileInfo info,int blockSize,string algorithm)
+        {
+            if (info.Length <= blockSize)
+                return Signature.CalculateMD5(info.FullName);
+            else
+                return Signature.CalculateTreeHash(info.FullName, blockSize, algorithm).TopHash.ToHashString();
+
+        }
+
     }
 }
index b3f6a46..ff7ca05 100644 (file)
@@ -5,6 +5,7 @@ using System.Diagnostics;
 using System.Diagnostics.Contracts;
 using System.IO;
 using System.Linq;
+using System.Net;
 using System.Text;
 using System.Threading;
 using System.Threading.Tasks;
@@ -213,14 +214,22 @@ namespace Pithos.Core.Agents
             //Get the list of server objects changed since the last check
             var listObjects = Task<IList<ObjectInfo>>.Factory.StartNewDelayed(10000,()=>
                             CloudClient.ListObjects(PithosContainer,since));
+            //Get the list of deleted objects since the last check
+            var listTrash= Task<IList<ObjectInfo>>.Factory.StartNewDelayed(10000,()=>
+                            CloudClient.ListObjects(TrashContainer,since));
+
+            var listAll = Task.Factory.TrackedSequence(() => listObjects, () => listTrash);
 
             //Next time we will check for all changes since the current check minus 1 second
             //This is done to ensure there are no discrepancies due to clock differences
             DateTime nextSince = DateTime.Now.AddSeconds(-1);
+
+
+
             
-            
 
-            var enqueueFiles = listObjects.ContinueWith(task =>
+
+            var enqueueFiles = listAll.ContinueWith(task =>
             {
                 if (task.IsFaulted)
                 {
@@ -232,7 +241,17 @@ namespace Pithos.Core.Agents
                 Trace.CorrelationManager.StartLogicalOperation("Listener");
                 Trace.TraceInformation("[LISTENER] Start Processing");
 
-                var remoteObjects = task.Result;
+                var trashObjects = ((Task<IList<ObjectInfo>>)task.Result[1]).Result;
+                var remoteObjects = ((Task<IList<ObjectInfo>>)task.Result[0]).Result;
+
+                
+                //Items with the same name, hash may be both in the container and the trash
+                //Don't delete items that exist in the container
+                var realTrash = from trash in trashObjects
+                                where !remoteObjects.Any(info => info.Hash == trash.Hash)
+                                select trash;
+                ProcessDeletedFiles(realTrash);
+
 
                 var remote=from info in remoteObjects
                                   let name=info.Name
@@ -247,17 +266,25 @@ namespace Pithos.Core.Agents
                 //over the remote files
                 foreach (var objectInfo in remote)
                 {
-                    var relativePath= objectInfo.Name.RelativeUrlToFilePath();// fileInfo.AsRelativeUrlTo(FileAgent.RootPath);
+                    var relativePath= objectInfo.Name.RelativeUrlToFilePath();
                     //and remove any matching objects from the list, adding them to the commonObjects list
                     if (FileAgent.Exists(relativePath))
                     {
                         var localFile = FileAgent.GetFileInfo(relativePath);
-                        var state=FileState.FindByFilePath(localFile.FullName);                        
-                        commonObjects.Add(Tuple.Create(objectInfo, localFile,state));
+                        var state = FileState.FindByFilePath(localFile.FullName);
+                        commonObjects.Add(Tuple.Create(objectInfo, localFile, state));
                     }
                     else
+                    {
                         //If there is no match we add them to the localFiles list
-                        remoteOnly.Add(objectInfo);
+                        //but only if the file is not marked for deletion
+                        var targetFile = Path.Combine(FileAgent.RootPath, relativePath);
+                        var fileStatus = StatusKeeper.GetFileStatus(targetFile);
+                        if (fileStatus!=FileStatus.Deleted)
+                            remoteOnly.Add(objectInfo);
+                        
+                        
+                    }
                 }
 
                 //At the end of the iteration, the *remote* list will contain the files that exist 
@@ -273,8 +300,11 @@ namespace Pithos.Core.Agents
                                        let localFile = pair.Item2
                                        let state=pair.Item3
                                        select new CloudAction(CloudActionType.MustSynch, 
-                                           localFile, objectInfo,state);
+                                           localFile, objectInfo,state,BlockSize,BlockHash);
                 
+                    
+
+
 
                 //Collect all the actions
                 var allActions = actionsForRemote.Union(actionsForCommon);
@@ -315,8 +345,16 @@ namespace Pithos.Core.Agents
             return loop;
         }
 
-        
-       
+        private void ProcessDeletedFiles(IEnumerable<ObjectInfo> trashObjects)
+        {
+            foreach (var trashObject in trashObjects)
+            {
+                var relativePath = trashObject.Name.RelativeUrlToFilePath();
+                //and remove any matching objects from the list, adding them to the commonObjects list
+                FileAgent.Delete(relativePath);                                
+            }
+        }
+
 
         private void RenameCloudFile(string oldFileName, string newPath, string newFileName)
         {
@@ -346,8 +384,7 @@ namespace Pithos.Core.Agents
             Contract.EndContractBlock();
 
             this.StatusKeeper.SetFileOverlayStatus(fileName, FileOverlayStatus.Modified);
-
-            CloudClient.MoveObject(PithosContainer, fileName, TrashContainer, fileName);
+            CloudClient.DeleteObject(PithosContainer, fileName, TrashContainer);
 
             this.StatusKeeper.ClearFileStatus(fileName);
             this.StatusKeeper.RemoveFileOverlayStatus(fileName);
@@ -358,6 +395,22 @@ namespace Pithos.Core.Agents
         {
             if (String.IsNullOrWhiteSpace(container))
                 throw new ArgumentNullException("container");
+            if (relativeUrl == null)
+                throw new ArgumentNullException("relativeUrl");
+            if (String.IsNullOrWhiteSpace(localPath))
+                throw new ArgumentNullException("localPath");
+            if (!Path.IsPathRooted(localPath))
+                throw new ArgumentException("The localPath must be rooted", "localPath");
+            Contract.EndContractBlock();
+            
+            var download=Task.Factory.Iterate(DownloadIterator(container, relativeUrl, localPath));
+            download.Wait();
+        }
+
+        private IEnumerable<Task> DownloadIterator(string container, Uri relativeUrl, string localPath)
+        {
+            if (String.IsNullOrWhiteSpace(container))
+                throw new ArgumentNullException("container");
             if (relativeUrl==null)
                 throw new ArgumentNullException("relativeUrl");
             if (String.IsNullOrWhiteSpace(localPath))
@@ -368,46 +421,37 @@ namespace Pithos.Core.Agents
 
             var url = relativeUrl.ToString();
             if (url.EndsWith(".ignore",StringComparison.InvariantCultureIgnoreCase))
-                return;
+                yield break;
 
             //Are we already downloading or uploading the file? 
             using (var gate=NetworkGate.Acquire(localPath, NetworkOperation.Downloading))
             {
                 if (gate.Failed)
-                    return;
+                    yield break;
                 //The file's hashmap will be stored in the same location with the extension .hashmap
                 //var hashPath = Path.Combine(FileAgent.FragmentsPath, relativePath + ".hashmap");
                 
                 //Retrieve the hashmap from the server
-                var getHashMap = CloudClient.GetHashMap(container,url);                
+                var getHashMap = CloudClient.GetHashMap(container,url);
+                yield return getHashMap;
                 
-                var downloadTask= getHashMap.ContinueWith(t =>
-                {
-                    var serverHash=t.Result;
-                    //If it's a small file
-                    return serverHash.Hashes.Count == 1 
-                        //Download it in one go
-                        ? DownloadEntireFile(container, relativeUrl, localPath) 
-                        //Otherwise download it block by block
-                        : DownloadWithBlocks(container, relativeUrl, localPath, serverHash);
-                });
+                var serverHash=getHashMap.Result;
+                //If it's a small file
+                var downloadTask=(serverHash.Hashes.Count == 1 )
+                    //Download it in one go
+                    ? DownloadEntireFile(container, relativeUrl, localPath) 
+                    //Otherwise download it block by block
+                    : DownloadWithBlocks(container, relativeUrl, localPath, serverHash);
+
+                yield return downloadTask;
 
-                
 
                 //Retrieve the object's metadata
-                var getInfo = downloadTask.ContinueWith(t =>
-                {
-                    t.PropagateExceptions();
-                    return CloudClient.GetObjectInfo(container, url);
-                });
+                var info=CloudClient.GetObjectInfo(container, url);
                 //And store it
-                var storeInfo = getInfo.ContinueWith(t =>
-                {
-                    t.PropagateExceptions();
-                    StatusKeeper.StoreInfo(localPath, t.Result);
-                });
-
-                storeInfo.Wait();
+                StatusKeeper.StoreInfo(localPath, info);
+                
+                //Notify listeners that a local file has changed
                 StatusNotification.NotifyChangedFile(localPath);
 
             }
@@ -448,7 +492,25 @@ namespace Pithos.Core.Agents
             return getObject;
         }
 
-        public Task DownloadWithBlocks(string container,Uri relativeUrl, string localPath,TreeHash serverHash)
+        //Download a file asynchronously using blocks
+        public Task DownloadWithBlocks(string container, Uri relativeUrl, string localPath, TreeHash serverHash)
+        {
+            if (String.IsNullOrWhiteSpace(container))
+                throw new ArgumentNullException("container");
+            if (relativeUrl == null)
+                throw new ArgumentNullException("relativeUrl");
+            if (String.IsNullOrWhiteSpace(localPath))
+                throw new ArgumentNullException("localPath");
+            if (!Path.IsPathRooted(localPath))
+                throw new ArgumentException("The localPath must be rooted", "localPath");
+            if (serverHash == null)
+                throw new ArgumentNullException("serverHash");
+            Contract.EndContractBlock();
+            
+            return Task.Factory.Iterate(BlockDownloadIterator(container, relativeUrl, localPath, serverHash));
+        }
+        
+        private IEnumerable<Task> BlockDownloadIterator(string container,Uri relativeUrl, string localPath,TreeHash serverHash)
         {
             if (String.IsNullOrWhiteSpace(container))
                 throw new ArgumentNullException("container");
@@ -468,52 +530,77 @@ namespace Pithos.Core.Agents
             var blockUpdater = new BlockUpdater(FileAgent.FragmentsPath, localPath, relativePath, serverHash);
 
             
-            return Task.Factory.StartNew(() =>
-            {
-                //Calculate the temp file's treehash
-                var treeHash = Signature.CalculateTreeHashAsync(localPath, this.BlockSize,BlockHash).Result;
+                        
+            //Calculate the file's treehash
+            var calcHash = Signature.CalculateTreeHashAsync(localPath, this.BlockSize,BlockHash);
+            yield return calcHash;                        
+            var treeHash = calcHash.Result;
                 
-                //And compare it with the server's hash
-                var upHashes = serverHash.GetHashesAsStrings();
-                var localHashes = treeHash.HashDictionary;
-                for (int i = 0; i < upHashes.Length; i++)
+            //And compare it with the server's hash
+            var upHashes = serverHash.GetHashesAsStrings();
+            var localHashes = treeHash.HashDictionary;
+            for (int i = 0; i < upHashes.Length; i++)
+            {
+                //For every non-matching hash
+                var upHash = upHashes[i];
+                if (!localHashes.ContainsKey(upHash))
                 {
-                    //For every non-matching hash
-                    if (!localHashes.ContainsKey(upHashes[i]))
+                    if (blockUpdater.UseOrphan(i, upHash))
                     {
-                        Trace.TraceInformation("[BLOCK GET] START {0} of {1} for {2}",i,upHashes.Length,localPath);
-                        var start = i*BlockSize;
-                        long? end = null;
-                        if (i < upHashes.Length - 1 )
-                            end= ((i + 1)*BlockSize) ;
+                        Trace.TraceInformation("[BLOCK GET] ORPHAN FOUND for {0} of {1} for {2}", i, upHashes.Length, localPath);
+                        continue;
+                    }
+                    Trace.TraceInformation("[BLOCK GET] START {0} of {1} for {2}",i,upHashes.Length,localPath);
+                    var start = i*BlockSize;
+                    //To download the last block just pass a null for the end of the range
+                    long? end = null;
+                    if (i < upHashes.Length - 1 )
+                        end= ((i + 1)*BlockSize) ;
                             
-                        //Get its block
-                        var block= CloudClient.GetBlock(container, relativeUrl,start, end);
+                    //Download the missing block
+                    var getBlock = CloudClient.GetBlock(container, relativeUrl, start, end);
+                    yield return getBlock;
+                    var block = getBlock.Result;
 
-                        var store=block.Then(b => blockUpdater.StoreBlock(i, b));
-                        store.Wait();
+                    //and store it
+                    yield return blockUpdater.StoreBlock(i, block);
+                    
 
-                        Trace.TraceInformation("[BLOCK GET] FINISH {0} of {1} for {2}", i, upHashes.Length, localPath);
-                    }
+                    Trace.TraceInformation("[BLOCK GET] FINISH {0} of {1} for {2}", i, upHashes.Length, localPath);
                 }
-                
-                blockUpdater.Commit();
-                Trace.TraceInformation("[BLOCK GET] COMPLETE {0}", localPath);
-            });
-        }
+            }
 
+            blockUpdater.Commit();
+            Trace.TraceInformation("[BLOCK GET] COMPLETE {0}", localPath);            
+        }
 
 
         private void UploadCloudFile(FileInfo fileInfo, string hash,string topHash)
         {
+            if (fileInfo == null)
+                throw new ArgumentNullException("fileInfo");
+            if (String.IsNullOrWhiteSpace(hash))
+                throw new ArgumentNullException("hash");
+            if (topHash == null)
+                throw new ArgumentNullException("topHash");
+            Contract.EndContractBlock();
+
+            var upload = Task.Factory.Iterate(UploadIterator(fileInfo, hash, topHash));
+            upload.Wait();
+        }
+
+        private IEnumerable<Task> UploadIterator(FileInfo fileInfo, string hash,string topHash)
+        {
             if (fileInfo==null)
                 throw new ArgumentNullException("fileInfo");
             if (String.IsNullOrWhiteSpace(hash))
                 throw new ArgumentNullException("hash");
+            if (topHash == null)
+                throw new ArgumentNullException("topHash");
             Contract.EndContractBlock();
 
             if (fileInfo.Extension.Equals("ignore", StringComparison.InvariantCultureIgnoreCase))
-                return;
+                yield break;
             
             var url = fileInfo.AsRelativeUrlTo(FileAgent.RootPath);
 
@@ -522,7 +609,7 @@ namespace Pithos.Core.Agents
             {
                 //Abort if the file is already being uploaded or downloaded
                 if (gate.Failed)
-                    return;
+                    yield break; 
 
 
                 //Even if GetObjectInfo times out, we can proceed with the upload            
@@ -535,12 +622,11 @@ namespace Pithos.Core.Agents
                     //but store any metadata changes 
                     this.StatusKeeper.StoreInfo(fullFileName, info);
                     Trace.TraceInformation("Skip upload of {0}, hashes match", fullFileName);
-                    return;
+                    yield break;
                 }
 
                 //Mark the file as modified while we upload it
-                var setStatus = Task.Factory.StartNew(() =>
-                        StatusKeeper.SetFileOverlayStatus(fullFileName, FileOverlayStatus.Modified));
+                StatusKeeper.SetFileOverlayStatus(fullFileName, FileOverlayStatus.Modified);
                 //And then upload it
 
                 //If the file is larger than the block size, try a hashmap PUT
@@ -548,19 +634,16 @@ namespace Pithos.Core.Agents
                 {
                     //To upload using a hashmap
                     //First, calculate the tree hash
-                    var treeHash = Signature.CalculateTreeHashAsync(fileInfo.FullName, BlockSize, BlockHash);                    
-                    
-                    var putHashMap = setStatus.ContinueWith(t=>
-                        UploadWithHashMap(fileInfo,url,treeHash));
+                    var treeHash = Signature.CalculateTreeHashAsync(fileInfo.FullName, BlockSize, BlockHash);
+                    yield return treeHash;
                     
-                    putHashMap.Wait();
+                    yield return Task.Factory.Iterate(UploadWithHashMap(fileInfo,url,treeHash));
+                                        
                 }
                 else
                 {
                     //Otherwise do a regular PUT
-                    var put = setStatus.ContinueWith(t =>
-                        CloudClient.PutObject(PithosContainer,url,fullFileName,hash));
-                    put.Wait();
+                    yield return CloudClient.PutObject(PithosContainer,url,fullFileName,hash);                    
                 }
                 //If everything succeeds, change the file and overlay status to normal
                 this.StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal);
@@ -570,47 +653,52 @@ namespace Pithos.Core.Agents
             StatusNotification.NotifyChangedFile(fullFileName);
         }
 
-        public void UploadWithHashMap(FileInfo fileInfo,string url,Task<TreeHash> treeHash)
+        public IEnumerable<Task> UploadWithHashMap(FileInfo fileInfo,string url,Task<TreeHash> treeHash)
         {
+            if(fileInfo==null)
+                throw new ArgumentNullException("fileInfo");
+            if (String.IsNullOrWhiteSpace(url))
+                throw new ArgumentNullException(url);
+            if (treeHash==null)
+                throw new ArgumentNullException("treeHash");
+            Contract.EndContractBlock();
+
             var fullFileName = fileInfo.FullName;
 
             //Send the hashmap to the server            
             var hashPut = CloudClient.PutHashMap(PithosContainer, url, treeHash.Result);
-            var missingHashes = hashPut.Result;
-            if (missingHashes.Count == 0)
-                return;
+            yield return hashPut;
 
-            var buffer = new byte[BlockSize];                      
-            foreach (var missingHash in missingHashes)
+            var missingHashes = hashPut.Result;
+            //If the server returns no missing hashes, we are done
+            while (missingHashes.Count > 0)
             {
-                int blockIndex = -1;
-                try
+
+                var buffer = new byte[BlockSize];
+                foreach (var missingHash in missingHashes)
                 {
                     //Find the proper block
-                    blockIndex = treeHash.Result.HashDictionary[missingHash];
+                    var blockIndex = treeHash.Result.HashDictionary[missingHash];
                     var offset = blockIndex*BlockSize;
 
                     var read = fileInfo.Read(buffer, offset, BlockSize);
-                    if (read > 0)
-                    {
-                        //Copy the actual block data out of the buffer
-                        var data = new byte[read];
-                        Buffer.BlockCopy(buffer, 0, data, 0, read);
-
-                        //And POST them
-                        CloudClient.PostBlock(PithosContainer, data).Wait();
-                        Trace.TraceInformation("[BLOCK] Block {0} of {1} uploaded", blockIndex,
-                            fullFileName);
-                    }
-                }
-                catch (Exception exc)
-                {
-                    Trace.TraceError("[ERROR] uploading block {0} of {1}\n{2}", blockIndex, fullFileName, exc);
+
+                    //And upload the block                
+                    var postBlock = CloudClient.PostBlock(PithosContainer, buffer, 0, read);
+
+                    //We have to handle possible exceptions in a continuation because
+                    //*yield return* can't appear inside a try block
+                    yield return postBlock.ContinueWith(t => 
+                        t.ReportExceptions(
+                            exc=>Trace.TraceError("[ERROR] uploading block {0} of {1}\n{2}",blockIndex, fullFileName, exc),
+                            ()=>Trace.TraceInformation("[BLOCK] Block {0} of {1} uploaded", blockIndex,fullFileName)));
                 }
-            }
 
-            UploadWithHashMap(fileInfo, url, treeHash);
-            
+                //Repeat until there are no more missing hashes
+                hashPut = CloudClient.PutHashMap(PithosContainer, url, treeHash.Result);
+                yield return hashPut;
+                missingHashes = hashPut.Result;
+            }
         }
 
 
index 52d8916..6553636 100644 (file)
@@ -51,22 +51,25 @@ namespace Pithos.Core.Agents
             string fileName = Path.GetFileName(path);
 
             //Bypass deleted files, unless the status is Deleted
-            if (!(File.Exists(path) || state.Status != FileStatus.Deleted))
+            if (!File.Exists(path) && state.Status != FileStatus.Deleted)
             {
                 state.Skip = true;
                 this.StatusKeeper.RemoveFileOverlayStatus(path);
                 return CompletedTask<object>.Default;
             }
             var fileState = FileState.FindByFilePath(path);
+            var blockHash = NetworkAgent.BlockHash;
+            var blockSize = NetworkAgent.BlockSize;
+
             switch (state.Status)
             {
                 case FileStatus.Created:
                 case FileStatus.Modified:
                     var info = new FileInfo(path);
-                    NetworkAgent.Post(new CloudAction(CloudActionType.UploadUnconditional, info, ObjectInfo.Empty, fileState));
+                    NetworkAgent.Post(new CloudAction(CloudActionType.UploadUnconditional, info, ObjectInfo.Empty, fileState,blockSize,blockHash));
                     break;
                 case FileStatus.Deleted:
-                    NetworkAgent.Post(new CloudAction(CloudActionType.DeleteCloud, null, new ObjectInfo { Name = fileName }, fileState));
+                    NetworkAgent.Post(new CloudAction(CloudActionType.DeleteCloud, null, new ObjectInfo { Name = fileName }, fileState, blockSize, blockHash));
                     break;
                 case FileStatus.Renamed:
                     NetworkAgent.Post(new CloudAction(CloudActionType.RenameCloud, state.OldFileName, state.OldPath, state.FileName, state.Path));
@@ -81,22 +84,15 @@ namespace Pithos.Core.Agents
         public void RestartInterruptedFiles()
         {
             
-            StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);
-            var interruptedStates = new[] { FileOverlayStatus.Unversioned, FileOverlayStatus.Modified };
-
-            var pendingEntries = (from state in FileState.Queryable
-                                 where interruptedStates.Contains(state.OverlayStatus) &&
-                                       !state.FilePath.StartsWith(FragmentsPath) &&
-                                       !state.FilePath.EndsWith(".ignore")
-                                 select state).ToList();
-            var staleEntries = from state in pendingEntries
-                                  where !File.Exists(state.FilePath)
-                                  select state;
-            var staleKeys = staleEntries.Select(state=>state.Id);
-            FileState.DeleteAll(staleKeys);
-
-            var validEntries = from state in pendingEntries.Except(staleEntries)
-                             where File.Exists(state.FilePath)
+            StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);            
+
+            var pendingEntries = from state in FileState.Queryable
+                                   where state.FileStatus != FileStatus.Unchanged &&
+                                         !state.FilePath.StartsWith(FragmentsPath.ToLower()) &&
+                                         !state.FilePath.EndsWith(".ignore")
+                                   select state;
+
+            var validEntries = from state in pendingEntries
                              select new WorkflowState
                              {
                                  Path = state.FilePath.ToLower(),
@@ -104,7 +100,7 @@ namespace Pithos.Core.Agents
                                  Hash = state.Checksum,
                                  Status = state.OverlayStatus == FileOverlayStatus.Unversioned ?
                                                    FileStatus.Created :
-                                                   FileStatus.Modified,
+                                                   state.FileStatus,
                                  TriggeringChange = state.OverlayStatus == FileOverlayStatus.Unversioned ?
                                                    WatcherChangeTypes.Created :
                                                    WatcherChangeTypes.Changed
index 212b72e..8d04ba3 100644 (file)
@@ -10,6 +10,7 @@ using System.Threading.Tasks;
 using Castle.ActiveRecord;
 using Castle.ActiveRecord.Framework;
 using NHibernate.Engine;
+using Pithos.Core.Agents;
 using Pithos.Interfaces;
 using Pithos.Network;
 
@@ -48,8 +49,10 @@ namespace Pithos.Core
         [Property]
         public string Checksum { get; set; }
 
+/*
         [Property]
         public string TopHash { get; set; }
+*/
 
         [Property]
         public long? Version { get; set; }
@@ -73,9 +76,24 @@ namespace Pithos.Core
        
         public static FileState FindByFilePath(string absolutePath)
         {
+            if (string.IsNullOrWhiteSpace(absolutePath))
+                throw new ArgumentNullException("absolutePath");
+            Contract.EndContractBlock();
             return Queryable.FirstOrDefault(s => s.FilePath == absolutePath.ToLower());
         }
 
+        public static void DeleteByFilePath(string absolutePath)
+        {
+            if(string.IsNullOrWhiteSpace(absolutePath))
+                throw new ArgumentNullException("absolutePath");
+            Contract.EndContractBlock();
+            
+            var stateKeys = from state in FileState.Queryable
+                            where state.FilePath == absolutePath.ToLower()
+                            select state.Id;
+            DeleteAll(stateKeys);                                           
+        }
+
         public static Task<FileState> CreateForAsync(string filePath,int blockSize,string algorithm)
         {
             if (blockSize <= 0)
@@ -107,17 +125,17 @@ namespace Pithos.Core
             
             //Skip updating the hash for folders
             if (Directory.Exists(FilePath))
-                return Task.Factory.FromResult(this); 
+                return Task.Factory.FromResult(this);
 
-            var results=Task.Factory.TrackedSequence(
-                () => Task.Factory.StartNew(() => Signature.CalculateMD5(FilePath)),
-                () => Signature.CalculateTreeHashAsync(FilePath, blockSize, algorithm)
-            );
+            var results = Task.Factory.StartNew(() =>
+            {
+                var info = new FileInfo(FilePath);
+                return info.CalculateHash(blockSize, algorithm);
+            });
 
-            var state=results.Then(hashes =>
+            var state=results.Then(hash =>
             {
-                Checksum = (hashes[0] as Task<string>).Result;
-                TopHash = (hashes[1] as Task<TreeHash>).Result.TopHash.ToHashString();
+                Checksum = hash;
                 return Task.Factory.FromResult(this);
             });
             
index f004cf2..080d898 100644 (file)
@@ -19,7 +19,7 @@ namespace Pithos.Core
         void ClearFileStatus(string path);
         void SetPithosStatus(PithosStatus status);
         FileOverlayStatus GetFileOverlayStatus(string path);
-        IEnumerable<string> StoreUnversionedFiles(ParallelQuery<string> paths);
+        void ProcessExistingFiles(IEnumerable<FileInfo> paths);
         void Stop();
         void SetFileState(string path, FileStatus fileStatus, FileOverlayStatus overlayStatus);
         void StoreInfo(string path, ObjectInfo objectInfo);
@@ -77,11 +77,10 @@ namespace Pithos.Core
             return default(FileOverlayStatus);
         }
 
-        public IEnumerable<string> StoreUnversionedFiles(ParallelQuery<string> paths)
+        public void ProcessExistingFiles(IEnumerable<FileInfo> paths)
         {
             Contract.Requires(paths!=null);
 
-            return default(IEnumerable<string>);
         }
 
         public void Stop()
index 8bf3d47..bedd4c9 100644 (file)
@@ -36,7 +36,7 @@ namespace Pithos.Core
             return FileOverlayStatus.Unversioned;
         }
 
-        public IEnumerable<string> StoreUnversionedFiles(ParallelQuery<string> paths)
+        public void StoreUnversionedFiles(IEnumerable<FileInfo> paths)
         {
 
             var newFiles = (from file in paths
index 6882c8a..0c607f2 100644 (file)
     <Compile Include="PithosMonitor.cs" />
     <Compile Include="PithosWorkflow.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
-    <Compile Include="InMemStatusChecker.cs" />
     <Compile Include="StatusInfo.cs" />
     <Compile Include="StatusService.cs" />
     <Compile Include="TaskExtensions.cs" />
index 155eb2b..9e93fdf 100644 (file)
@@ -176,12 +176,13 @@ namespace Pithos.Core
             try
             {
                 var fragmentsPath=Path.Combine(RootPath, FragmentsFolder);
+                var directory = new DirectoryInfo(path);
                 var files =
-                    from filePath in Directory.EnumerateFiles(path, "*", SearchOption.AllDirectories).AsParallel()
-                    where !filePath.StartsWith(fragmentsPath,StringComparison.InvariantCultureIgnoreCase) &&
-                            !filePath.EndsWith(".ignore",StringComparison.InvariantCultureIgnoreCase)
-                    select filePath.ToLower();
-                StatusKeeper.StoreUnversionedFiles(files);
+                    from file in directory.EnumerateFiles("*", SearchOption.AllDirectories)
+                    where !file.FullName.StartsWith(fragmentsPath,StringComparison.InvariantCultureIgnoreCase) &&
+                            !file.Extension.Equals("ignore",StringComparison.InvariantCultureIgnoreCase)
+                    select file;
+                StatusKeeper.ProcessExistingFiles(files);
                 
             }
             catch (Exception exc)
@@ -276,8 +277,17 @@ namespace Pithos.Core
                     return false;
                 if (x.LocalFile != null && y.LocalFile != null && !x.LocalFile.FullName.Equals(y.LocalFile.FullName))
                     return false;
-                if (x.CloudFile != null && y.CloudFile != null && !x.CloudFile.Hash.Equals(y.CloudFile.Hash))
-                    return false;
+                if (x.CloudFile != null && y.CloudFile != null )
+                {
+                    if (x.CloudFile.Hash == null & y.CloudFile.Hash != null)
+                        return false;
+                    if (x.CloudFile.Hash != null & y.CloudFile.Hash == null)
+                        return false;
+                    if (x.CloudFile.Hash == null & y.CloudFile.Hash == null)
+                        return (x.CloudFile.Name == y.CloudFile.Name);
+                    if (!x.CloudFile.Hash.Equals(y.CloudFile.Hash))
+                        return false;
+                }
                 if (x.CloudFile == null ^ y.CloudFile == null ||
                     x.LocalFile == null ^ y.LocalFile == null)
                     return false;
@@ -287,7 +297,7 @@ namespace Pithos.Core
             public override int GetHashCode(CloudAction obj)
             {
                 var hash1 = (obj.LocalFile == null) ? int.MaxValue : obj.LocalFile.FullName.GetHashCode();
-                var hash2 = (obj.CloudFile == null) ? int.MaxValue : obj.CloudFile.Hash.GetHashCode();
+                var hash2 = (obj.CloudFile == null) ? int.MaxValue : (obj.CloudFile.Hash ?? obj.CloudFile.Name).GetHashCode();
                 var hash3 = obj.Action.GetHashCode();
                 return hash1 ^ hash2 & hash3;
             }
index 68478c7..845632f 100644 (file)
@@ -81,7 +81,7 @@ namespace Pithos.Core
                                 var stream=File.OpenRead(path);
                                 return stream;                                
                             }
-                            catch (Exception ex)
+                            catch 
                             {
                                 Thread.Sleep(500);
                                 if (++counter > 10)
index 1b4b79b..52e903a 100644 (file)
@@ -15,6 +15,7 @@ using Castle.ActiveRecord;
 using Castle.ActiveRecord.Framework.Config;
 using NHibernate.Criterion;
 using NHibernate.Impl;
+using Pithos.Core.Agents;
 using Pithos.Interfaces;
 using Pithos.Network;
 
@@ -36,7 +37,7 @@ namespace Pithos.Core
             if (!Directory.Exists(_pithosDataPath))
                 Directory.CreateDirectory(_pithosDataPath);
 
-            File.Delete(Path.Combine(_pithosDataPath, "pithos.db"));
+            //File.Delete(Path.Combine(_pithosDataPath, "pithos.db"));
 
             var source = GetConfiguration(_pithosDataPath);
             ActiveRecordStarter.Initialize(source,typeof(FileState),typeof(FileTag));
@@ -121,45 +122,70 @@ namespace Pithos.Core
         }
        
 
-        public IEnumerable<string> StoreUnversionedFiles(ParallelQuery<string> paths)
+        public void ProcessExistingFiles(IEnumerable<FileInfo> existingFiles)
         {
-            var existingFiles = (from state in FileState.Queryable
-                                 select state.FilePath.ToLower()).ToList();
-
-            var newFiles = paths.Except(existingFiles.AsParallel());
-
+            if(existingFiles  ==null)
+                throw new ArgumentNullException("existingFiles");
+            Contract.EndContractBlock();
+            Dictionary<int, int> j;
             
-
-            newFiles.ForAll(file =>
+            //Find new or matching files with a left join to the stored states
+            var fileStates = FileState.Queryable;
+            var currentFiles=from file in existingFiles
+                      join state in fileStates on file.FullName.ToLower() equals state.FilePath.ToLower() into gs
+                      from substate in gs.DefaultIfEmpty()
+                               select new {File = file, State = substate};
+
+            //To get the deleted files we must get the states that have no corresponding
+            //files. 
+            //We can't use the File.Exists method inside a query, so we get all file paths from the states
+            var statePaths = (from state in fileStates
+                             select new {state.Id, state.FilePath}).ToList();
+            //and check each one
+            var missingStates= (from path in statePaths
+                               where !File.Exists(path.FilePath)
+                               select path.Id).ToList();
+            //Finally, retrieve the states that correspond to the deleted files            
+            var deletedFiles = from state in fileStates 
+                        where missingStates.Contains(state.Id)
+                        select new { File = default(FileInfo), State = state };
+
+            var pairs = currentFiles.Union(deletedFiles);
+
+            Parallel.ForEach(pairs, pair =>
             {
-                
-                var createState = FileState.CreateForAsync(file,this.BlockSize,this.BlockHash)
-                    .ContinueWith(state =>{                        
-                    _persistenceAgent.Post(state.Result.Create);
-                    return state.Result;
-                });
-
-                /*Func<Guid, Task<TreeHash>> treeBuilder = (stateId) => 
-                    Signature.CalculateTreeHashAsync(file, BlockSize, BlockHash)
-                    .ContinueWith(treeTask =>
+                var fileState = pair.State;
+                var file = pair.File;
+                if (fileState == null)
+                {
+                    //This is a new file
+                    var fullPath = pair.File.FullName.ToLower();
+                    var createState = FileState.CreateForAsync(fullPath, this.BlockSize, this.BlockHash);
+                    createState.ContinueWith(state => _persistenceAgent.Post(state.Result.Create));
+                }                
+                else if (file == null)
+                {
+                    //This file was deleted while we were down. We should mark it as deleted
+                    //We have to go through UpdateStatus here because the state object we are using
+                    //was created by a different ORM session.
+                    UpdateStatus(fileState.Id,state=> state.FileStatus = FileStatus.Deleted);                    
+                }
+                else
+                {
+                    //This file has a matching state. Need to check for possible changes
+                    var hashString = file.CalculateHash(BlockSize,BlockHash);
+                    //If the hashes don't match the file was changed
+                    if (fileState.Checksum != hashString)
                     {
-                        var treeHash = treeTask.Result;
-                        treeHash.FileId = stateId;
-                        return treeHash;
-                    });*/
-
-               /* var createTree=createState.ContinueWith(stateTask => 
-                    treeBuilder(stateTask.Result.Id))
-                    .Unwrap();
-
-                var saveTree=createTree.ContinueWith(treeTask =>
-                    treeTask.Result.Save(_pithosDataPath));*/
-            });
-
-            return newFiles;
-
+                        UpdateStatus(fileState.Id, state => state.FileStatus = FileStatus.Modified);
+                    }                    
+                }
+            });            
+         
         }
 
+       
+
 
         public string BlockHash { get; set; }
 
@@ -269,6 +295,30 @@ namespace Pithos.Core
                 
             });
         }
+        
+        /// <summary>
+        /// Sets the status of a specific state
+        /// </summary>
+        /// <param name="path"></param>
+        /// <param name="setter"></param>
+        private void UpdateStatus(Guid stateID, Action<FileState> setter)
+        {
+            _persistenceAgent.Post(() =>
+            {
+                using (new SessionScope())
+                {
+                    var state = FileState.Find(stateID);
+                    if (state == null)
+                    {
+                        Trace.TraceWarning("[NOFILE] Unable to set status for {0}.", stateID);
+                        return;
+                    }
+                    setter(state);
+                    state.Save();
+                }
+                
+            });
+        }
 
         public FileOverlayStatus GetFileOverlayStatus(string path)
         {
@@ -358,8 +408,6 @@ namespace Pithos.Core
                     state.Version = objectInfo.Version;
                     state.VersionTimeStamp = objectInfo.VersionTimestamp;
 
-                    if(objectInfo.Bytes>BlockSize)
-                        state.TopHash = objectInfo.Hash;
                     state.FileStatus = FileStatus.Unchanged;
                     state.OverlayStatus = FileOverlayStatus.Normal;
                     
@@ -397,8 +445,7 @@ namespace Pithos.Core
         public void ClearFileStatus(string path)
         {
             //TODO:SHOULDN'T need both clear file status and remove overlay status
-            _persistenceAgent.Post(()=>
-                FileState.DeleteAll(new[] { path.ToLower() }));   
+            _persistenceAgent.Post(()=> FileState.DeleteByFilePath(path));   
         }
 
         public void UpdateFileChecksum(string path, string checksum)
index aa1a807..b94dafb 100644 (file)
@@ -18,13 +18,13 @@ namespace Pithos.Core
         {
             return Then(first, next, CancellationToken.None);
         }
-        
-        public static Task<T2> Then<T1, T2>(this Task<T1> first, Func<T1, Task<T2>> next,CancellationToken cancellationToken)
+
+        public static Task<T2> Then<T1, T2>(this Task<T1> first, Func<T1, Task<T2>> next, CancellationToken cancellationToken)
         {
             if (first == null) throw new ArgumentNullException("first");
             if (next == null) throw new ArgumentNullException("next");
 
-            var tcs = new TaskCompletionSource<T2>();                        
+            var tcs = new TaskCompletionSource<T2>();
             first.ContinueWith(delegate
             {
                 if (first.IsFaulted) tcs.TrySetException(first.Exception.InnerExceptions);
@@ -44,16 +44,16 @@ namespace Pithos.Core
                     }
                     catch (Exception exc) { tcs.TrySetException(exc); }
                 }
-            },cancellationToken, TaskContinuationOptions.ExecuteSynchronously,TaskScheduler.Current);
+            }, cancellationToken, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Current);
             return tcs.Task;
-        }        
+        }
 
-        public static Task Then<T1>(this Task<T1> first, Func<T1, Task> next,CancellationToken cancellationToken)
+        public static Task Then<T1>(this Task<T1> first, Func<T1, Task> next, CancellationToken cancellationToken)
         {
             if (first == null) throw new ArgumentNullException("first");
             if (next == null) throw new ArgumentNullException("next");
 
-            var tcs = new TaskCompletionSource<object>();                        
+            var tcs = new TaskCompletionSource<object>();
             first.ContinueWith(delegate
             {
                 if (first.IsFaulted) tcs.TrySetException(first.Exception.InnerExceptions);
@@ -73,9 +73,26 @@ namespace Pithos.Core
                     }
                     catch (Exception exc) { tcs.TrySetException(exc); }
                 }
-            },cancellationToken, TaskContinuationOptions.ExecuteSynchronously,TaskScheduler.Current);
+            }, cancellationToken, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Current);
             return tcs.Task;
-        }        
+        }
+
+
+
+        public static void ReportExceptions(this Task task,Action<AggregateException> OnError,Action OnSuccess )
+        {
+            if (!task.IsCompleted) throw new InvalidOperationException("The task has not completed.");
+            if (task.IsFaulted)             
+                task.Exception.Handle(exc=>
+                                          {
+                                              OnError(task.Exception);
+                                              return true;
+                                          }); 
+            else
+            {
+                OnSuccess();
+            }
+        }
 
     }
-}
+}
\ No newline at end of file
index c7728e1..b01c7a3 100644 (file)
@@ -292,7 +292,7 @@ namespace Pithos.Network
                     }
 
                 }
-                catch(RetryException e)
+                catch(RetryException)
                 {
                     Trace.TraceWarning("[RETRY FAIL] GetObjectInfo for {0} failed.");
                     return ObjectInfo.Empty;
@@ -564,16 +564,20 @@ namespace Pithos.Network
         }
 
 
-        public Task PostBlock(string container,byte[] block)
+        public Task PostBlock(string container,byte[] block,int offset,int count)
         {
             if (String.IsNullOrWhiteSpace(container))
                 throw new ArgumentNullException("container");
             if (block == null)
                 throw new ArgumentNullException("block");
+            if (offset < 0 || offset >= block.Length)
+                throw new ArgumentOutOfRangeException("offset");
+            if (count < 0 || count > block.Length)
+                throw new ArgumentOutOfRangeException("count");
             if (String.IsNullOrWhiteSpace(Token))
                 throw new InvalidOperationException("Invalid Token");
             if (StorageUrl == null)
-                throw new InvalidOperationException("Invalid Storage Url");            
+                throw new InvalidOperationException("Invalid Storage Url");                        
             Contract.EndContractBlock();
 
             var builder = GetAddressBuilder(container, "");
@@ -795,19 +799,38 @@ namespace Pithos.Network
 
             using (var client = new RestClient(_baseClient))
             {
-                client.Headers.Add("X-Copy-From", sourceUrl);
+                client.Headers.Add("X-Move-From", sourceUrl);
                 client.PutWithRetry(targetUrl, 3);
 
                 var expectedCodes = new[] {HttpStatusCode.OK, HttpStatusCode.NoContent, HttpStatusCode.Created};
-                if (expectedCodes.Contains(client.StatusCode))
-                {
-                    this.DeleteObject(sourceContainer, oldObjectName);
-                }
-                else
+                if (!expectedCodes.Contains(client.StatusCode))
                     throw CreateWebException("MoveObject", client.StatusCode);
             }
         }
 
+        public void DeleteObject(string sourceContainer, string objectName, string targetContainer)
+        {            
+            if (String.IsNullOrWhiteSpace(sourceContainer))
+                throw new ArgumentNullException("sourceContainer", "The container property can't be empty");
+            if (String.IsNullOrWhiteSpace(objectName))
+                throw new ArgumentNullException("objectName", "The oldObjectName property can't be empty");
+            if (String.IsNullOrWhiteSpace(targetContainer))
+                throw new ArgumentNullException("targetContainer", "The container property can't be empty");
+
+            var targetUrl = targetContainer + "/" + objectName;
+            var sourceUrl = String.Format("/{0}/{1}", sourceContainer, objectName);
+
+            using (var client = new RestClient(_baseClient))
+            {
+                client.Headers.Add("X-Move-From", sourceUrl);
+                client.PutWithRetry(targetUrl, 3);
+
+                var expectedCodes = new[] {HttpStatusCode.OK, HttpStatusCode.NoContent, HttpStatusCode.Created,HttpStatusCode.NotFound};
+                if (!expectedCodes.Contains(client.StatusCode))
+                    throw CreateWebException("DeleteObject", client.StatusCode);
+            }
+        }
+
       
         private static WebException CreateWebException(string operation, HttpStatusCode statusCode)
         {
index 568b544..fde14dd 100644 (file)
@@ -31,6 +31,7 @@ namespace Pithos.Network
         
         Task GetObject(string container, string objectName, string fileName);
         Task PutObject(string container, string objectName, string fileName, string hash = null);
+        void DeleteObject(string container, string objectName, string trashContainer);
         void DeleteObject(string container, string objectName);
         void MoveObject(string sourceContainer, string oldObjectName, string targetContainer,string newObjectName);
         bool ObjectExists(string container,string objectName);
@@ -40,7 +41,7 @@ namespace Pithos.Network
 
         Task<TreeHash> GetHashMap(string container, string objectName);
         Task<IList<string>> PutHashMap(string container, string objectName, TreeHash hash);
-        Task PostBlock(string container,byte[] block);
+        Task PostBlock(string container,byte[] block,int offset,int count);
         Task<byte[]> GetBlock(string container, Uri relativeUrl, long start, long? end);
     }
 
@@ -153,12 +154,21 @@ namespace Pithos.Network
             return default(Task);
         }
 
-        public void DeleteObject(string container, string objectName)
+        public void DeleteObject(string container, string objectName, string trashContainer)
         {
             Contract.Requires(!String.IsNullOrWhiteSpace(Token));
             Contract.Requires(StorageUrl!=null);
             Contract.Requires(!String.IsNullOrWhiteSpace(container));
             Contract.Requires(!String.IsNullOrWhiteSpace(objectName));
+            Contract.Requires(!String.IsNullOrWhiteSpace(trashContainer));
+        }
+
+        public void DeleteObject(string container, string objectName)
+        {
+            Contract.Requires(!String.IsNullOrWhiteSpace(Token));
+            Contract.Requires(StorageUrl!=null);
+            Contract.Requires(!String.IsNullOrWhiteSpace(container));
+            Contract.Requires(!String.IsNullOrWhiteSpace(objectName));            
         }
 
         public void MoveObject(string sourceContainer, string oldObjectName, string targetContainer,string newObjectName)
@@ -220,13 +230,17 @@ namespace Pithos.Network
             return default(Task<IList<string>>);
         }
 
-        public Task PostBlock(string container,byte[] block)
+        public Task PostBlock(string container,byte[] block,int offset,int count)
         {
             Contract.Requires(!String.IsNullOrWhiteSpace(Token));
             Contract.Requires(StorageUrl != null);
             Contract.Requires(!String.IsNullOrWhiteSpace(container));
             Contract.Requires(block != null);
-
+            Contract.Requires(offset>=0);
+            Contract.Requires(offset < block.Length);
+            Contract.Requires(count>=0);
+            Contract.Requires(count <= block.Length);
+            
             return default(Task);
         }
 
index f3f060f..c58d469 100644 (file)
@@ -208,7 +208,8 @@ namespace Pithos.Network
                     ResponseHeaders.Clear();
 
                 TraceStart(method, uriString);
-
+                if (method == "PUT")
+                    request.ContentLength = 0;
                 var response = (HttpWebResponse)GetWebResponse(request);
                 StatusCode = response.StatusCode;
                 StatusDescription = response.StatusDescription;                
index a0a82e7..df1fb91 100644 (file)
@@ -100,31 +100,8 @@ namespace Pithos.Network
                 throw new ArgumentNullException("algorithm");
             Contract.EndContractBlock();
 
-            //DON'T calculate hashes for folders
-            if (Directory.Exists(filePath))
-                return null;
-
-
-            var list = new List<byte[]>();
-            using (var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, blockSize, false))
-            using (var hasher = HashAlgorithm.Create(algorithm))
-            {
-                var buffer = new byte[blockSize];
-                int read;
-                while ((read=stream.Read(buffer, 0, blockSize)) > 0)
-                {
-                    //This code was added for compatibility with the way Pithos calculates the last hash
-                    //We calculate the hash only up to the last non-null byte
-                    //TODO: Remove if the server starts using the full block instead of the trimmed block
-                    var lastByteIndex=Array.FindLastIndex(buffer,read-1, aByte => aByte != 0);
-                    
-                    var hash = hasher.ComputeHash(buffer, 0, lastByteIndex+1);
-                    list.Add(hash);                    
-                }
-                return new TreeHash(algorithm) { Hashes = list,                    
-                    BlockSize = blockSize, 
-                    Bytes = stream.Length};
-            }            
+            var hash=CalculateTreeHashAsync(filePath, blockSize, algorithm);
+            return hash.Result;
         }
         
         public static Task<TreeHash> CalculateTreeHashAsync(FileInfo fileInfo, int blockSize, string algorithm)