Replaced hashing algorithm with inplace version
author Panagiotis Kanavos <pkanavos@gmail.com>
Sat, 3 Mar 2012 19:37:39 +0000 (21:37 +0200)
committer Panagiotis Kanavos <pkanavos@gmail.com>
Sat, 3 Mar 2012 19:37:39 +0000 (21:37 +0200)
trunk/Pithos.Client.WPF/Shell/ShellViewModel.cs
trunk/Pithos.Core/Agents/NetworkAgent.cs
trunk/Pithos.Core/IPithosWorkflow.cs
trunk/Pithos.Network/AccountInfo.cs
trunk/Pithos.Network/BlockHashAlgorithms.cs
trunk/Pithos.Network/Signature.cs

index a12f712..05a794d 100644 (file)
@@ -409,10 +409,11 @@ namespace Pithos.Client.WPF {
 
                public void GoToSite(AccountInfo account)
                {
-                       Process.Start(account.SiteUri);
+                   var uri = account.SiteUri.Replace("http://","https://");            
+                   Process.Start(uri);
                }
 
-        /// <summary>
+           /// <summary>
         /// Open an explorer window to the target path's directory
         /// and select the file
         /// </summary>
index 85c40ad..47c4c37 100644 (file)
@@ -669,12 +669,24 @@ namespace Pithos.Core.Agents
             {
                 try
                 {
+
                     var accountInfo = action.AccountInfo;
 
                     var fileInfo = action.LocalFile;
 
                     if (fileInfo.Extension.Equals("ignore", StringComparison.InvariantCultureIgnoreCase))
                         return;
+                    //Do not upload files in conflict
+                    if (action.FileState.FileStatus == FileStatus.Conflict )
+                    {
+                        Log.InfoFormat("Skipping file in conflict [{0}]",fileInfo.FullName);
+                        return;
+                    }
+                    if (action.FileState.FileStatus == FileStatus.Forbidden)
+                    {
+                        Log.InfoFormat("Skipping forbidden file [{0}]",fileInfo.FullName);
+                        return;
+                    }
 
                     var relativePath = fileInfo.AsRelativeTo(accountInfo.AccountPath);
                     if (relativePath.StartsWith(FolderConstants.OthersFolder))
@@ -707,6 +719,8 @@ namespace Pithos.Core.Agents
 
                         var cloudFile = action.CloudFile;
                         var account = cloudFile.Account ?? accountInfo.UserName;
+                        try
+                        {
 
                         var client = new CloudFilesClient(accountInfo);
                         //Even if GetObjectInfo times out, we can proceed with the upload            
@@ -717,49 +731,60 @@ namespace Pithos.Core.Agents
                             return;
 
                         //TODO: Check how a directory hash is calculated -> All dirs seem to have the same hash
-                        if (fileInfo is DirectoryInfo)
-                        {
-                            //If the directory doesn't exist the Hash property will be empty
-                            if (String.IsNullOrWhiteSpace(info.Hash))
-                                //Go on and create the directory
-                                await
-                                    client.PutObject(account, cloudFile.Container, cloudFile.Name, fullFileName,
-                                                     String.Empty, "application/directory");
-                        }
-                        else
-                        {
+                            if (fileInfo is DirectoryInfo)
+                            {
+                                //If the directory doesn't exist the Hash property will be empty
+                                if (String.IsNullOrWhiteSpace(info.Hash))
+                                    //Go on and create the directory
+                                    await
+                                        client.PutObject(account, cloudFile.Container, cloudFile.Name, fullFileName,
+                                                         String.Empty, "application/directory");
+                            }
+                            else
+                            {
 
-                            var cloudHash = info.Hash.ToLower();
+                                var cloudHash = info.Hash.ToLower();
 
-                            var hash = action.LocalHash.Value;
-                            var topHash = action.TopHash.Value;
+                                var hash = action.LocalHash.Value;
+                                var topHash = action.TopHash.Value;
 
-                            //If the file hashes match, abort the upload
-                            if (hash == cloudHash || topHash == cloudHash)
-                            {
-                                //but store any metadata changes 
-                                StatusKeeper.StoreInfo(fullFileName, info);
-                                Log.InfoFormat("Skip upload of {0}, hashes match", fullFileName);
-                                return;
-                            }
+                                //If the file hashes match, abort the upload
+                                if (hash == cloudHash || topHash == cloudHash)
+                                {
+                                    //but store any metadata changes 
+                                    StatusKeeper.StoreInfo(fullFileName, info);
+                                    Log.InfoFormat("Skip upload of {0}, hashes match", fullFileName);
+                                    return;
+                                }
 
 
-                            //Mark the file as modified while we upload it
-                            StatusKeeper.SetFileOverlayStatus(fullFileName, FileOverlayStatus.Modified);
-                            //And then upload it
+                                //Mark the file as modified while we upload it
+                                StatusKeeper.SetFileOverlayStatus(fullFileName, FileOverlayStatus.Modified);
+                                //And then upload it
 
-                            //Upload even small files using the Hashmap. The server may already contain
-                            //the relevant block
+                                //Upload even small files using the Hashmap. The server may already contain
+                                //the relevant block
 
-                            //First, calculate the tree hash
-                            var treeHash = await Signature.CalculateTreeHashAsync(fullFileName, accountInfo.BlockSize,
-                                                                                  accountInfo.BlockHash, 2);
+                                //First, calculate the tree hash
+                                var treeHash = await Signature.CalculateTreeHashAsync(fullFileName, accountInfo.BlockSize,
+                                                                                      accountInfo.BlockHash, 2);
 
-                            await
-                                UploadWithHashMap(accountInfo, cloudFile, fileInfo as FileInfo, cloudFile.Name, treeHash);
+                                //TODO: If the upload fails with a 403, abort it and mark conflict
+
+                                await
+                                    UploadWithHashMap(accountInfo, cloudFile, fileInfo as FileInfo, cloudFile.Name, treeHash);
+                            }
+                            //If everything succeeds, change the file and overlay status to normal
+                            StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal);
+                        }
+                        catch (WebException exc)
+                        {
+                            var response=(exc.Response as HttpWebResponse);
+                            if (response.StatusCode == HttpStatusCode.Forbidden)
+                            {
+                                StatusKeeper.SetFileState(fileInfo.FullName,FileStatus.Forbidden, FileOverlayStatus.Conflict);                                
+                            }
                         }
-                        //If everything succeeds, change the file and overlay status to normal
-                        StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal);
                     }
                     //Notify the Shell to update the overlays
                     NativeMethods.RaiseChangeNotification(fullFileName);
index 18cc984..dd24888 100644 (file)
@@ -63,6 +63,7 @@ namespace Pithos.Core
         Renamed,
         Deleted,
         Conflict,
-        Unversioned
+        Unversioned,
+        Forbidden
     }
 }
\ No newline at end of file
index 3a95c60..2f40246 100644 (file)
@@ -76,7 +76,15 @@ namespace Pithos.Network
 
         }
 
-        public string SiteUri { get; set; }
+        private string _siteUri;
+        public string SiteUri
+        {
+            get { return _siteUri; }
+            set
+            {
+                _siteUri = value;
+            }
+        }
 
         public List<Group> Groups { get; set; }
     }
index 150e052..42df962 100644 (file)
@@ -50,9 +50,6 @@ using System.Threading.Tasks.Dataflow;
 namespace Pithos.Network\r
 {\r
     using System;\r
-    using System.Collections.Generic;\r
-    using System.Linq;\r
-    using System.Text;\r
 \r
     /// <summary>\r
     /// TODO: Update summary.\r
@@ -61,6 +58,7 @@ namespace Pithos.Network
     {\r
         private static readonly log4net.ILog Log = log4net.LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);\r
 \r
+/*\r
         public static Func<FileStream, int, string, ConcurrentDictionary<int, byte[]>, int, Task<ConcurrentDictionary<int, byte[]>>> CalculateBlockHash;\r
 \r
         public static Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesRecursiveAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0)\r
@@ -118,8 +116,6 @@ namespace Pithos.Network
             int read;\r
             while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)\r
             {\r
-                //TODO: identify the value of index\r
-\r
                 using (var hasher = HashAlgorithm.Create(algorithm))\r
                 {\r
                     //This code was added for compatibility with the way Pithos calculates the last hash\r
@@ -130,7 +126,7 @@ namespace Pithos.Network
                     hashes[index] = hash;\r
                 }\r
                 index += read;\r
-            };\r
+            }\r
             return hashes;\r
         }\r
         \r
@@ -150,7 +146,6 @@ namespace Pithos.Network
             var size = stream.Length;\r
             Log.DebugFormat("Hashing [{0}] size [{1}]",path,size);\r
             \r
-/*\r
             var options = new ExecutionDataflowBlockOptions {BoundedCapacity = parallelism,MaxDegreeOfParallelism=parallelism};\r
             var hashBlock=new ActionBlock<Tuple<int,byte[]>>(input=>\r
                               {\r
@@ -166,17 +161,49 @@ namespace Pithos.Network
                                       hashes[idx] = hash;\r
                                   }                                  \r
                               },options);\r
-*/\r
 \r
             var buffer = new byte[blockSize];\r
             int read;\r
             int index = 0;\r
+            while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)\r
+            {\r
+                var block = new byte[read];\r
+                Buffer.BlockCopy(buffer, 0, block, 0, read);\r
+                await hashBlock.SendAsync(Tuple.Create(index, block));\r
+                index += read;\r
+            }\r
+            \r
+\r
+            hashBlock.Complete();\r
+            await hashBlock.Completion;\r
+\r
+            return hashes;\r
+        } \r
+        \r
+        public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesInPlace(FileStream stream, int blockSize, string algorithm, int parallelism)\r
+        {\r
+            if (stream == null)\r
+                throw new ArgumentNullException("stream");\r
+            if (String.IsNullOrWhiteSpace(algorithm))\r
+                throw new ArgumentNullException("algorithm");\r
+            if (blockSize <= 0)\r
+                throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");\r
+            Contract.EndContractBlock();\r
+\r
+            var hashes = new ConcurrentDictionary<int, byte[]>();\r
+\r
+            var path = stream.Name;\r
+            var size = stream.Length;\r
+            Log.DebugFormat("Hashing [{0}] size [{1}]",path,size);\r
+            \r
+\r
+            var buffer = new byte[blockSize];\r
+            var index = 0;\r
             using (var hasher = HashAlgorithm.Create(algorithm))\r
             {\r
+                int read;\r
                 while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)\r
                 {\r
-                    //                var block = new byte[read];\r
-\r
                     //This code was added for compatibility with the way Pithos calculates the last hash\r
                     //We calculate the hash only up to the last non-null byte\r
                     var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);\r
@@ -186,26 +213,93 @@ namespace Pithos.Network
                     hashes[index] = hash;\r
                     index += read;\r
                 }\r
+            }\r
+            return hashes;\r
+        }\r
+*/\r
+\r
+        public static async Task<ConcurrentDictionary<long, byte[]>> CalculateBlockHashesInPlacePFor(FileStream stream, int blockSize, string algorithm, int parallelism)\r
+        {\r
+            if (stream == null)\r
+                throw new ArgumentNullException("stream");\r
+            if (String.IsNullOrWhiteSpace(algorithm))\r
+                throw new ArgumentNullException("algorithm");\r
+            if (blockSize <= 0)\r
+                throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");\r
+            Contract.EndContractBlock();\r
+\r
+            var hashes = new ConcurrentDictionary<long, byte[]>();\r
+\r
+            var path = stream.Name;\r
+            var size = stream.Length;\r
+            Log.DebugFormat("Hashing [{0}] size [{1}]", path, size);\r
 \r
-                /*\r
-                                Buffer.BlockCopy(buffer,0,block,0,read);\r
-                                await hashBlock.SendAsync(Tuple.Create(index, block));\r
-                */\r
-                \r
+\r
+            var buffer = new byte[parallelism][];\r
+            var hashers = new HashAlgorithm[parallelism];\r
+            for (var i = 0; i < parallelism; i++)\r
+            {\r
+                buffer[i] = new byte[blockSize];\r
+                hashers[i] = HashAlgorithm.Create(algorithm);\r
             }\r
-            \r
+            try\r
+            {\r
+                var indices = new long[parallelism];\r
+                var bufferCount = new int[parallelism];\r
 \r
-/*\r
-            hashBlock.Complete();\r
-            await hashBlock.Completion;\r
-*/\r
+                int read;\r
+                int bufIdx = 0;\r
+                long index = 0;\r
+                while ((read = await stream.ReadAsync(buffer[bufIdx], 0, blockSize)) > 0)\r
+                {\r
+                    index += read;\r
+                    indices[bufIdx] = index;\r
+                    bufferCount[bufIdx] = read;\r
+                    //If we have filled the last buffer or if we have read from the last block,\r
+                    //we can calculate the blocks in parallel\r
+                    if (bufIdx == parallelism - 1 || read < blockSize)\r
+                    {\r
+                        //var options = new ParallelOptions {MaxDegreeOfParallelism = parallelism};\r
+                        Parallel.For(0, bufIdx + 1, idx =>\r
+                        {\r
+                            //This code was added for compatibility with the way Pithos calculates the last hash\r
+                            //We calculate the hash only up to the last non-null byte\r
+                            var lastByteIndex = Array.FindLastIndex(buffer[idx],\r
+                                                                    bufferCount[idx] - 1,\r
+                                                                    aByte => aByte != 0);\r
+\r
+                            var hasher = hashers[idx];\r
+                            var hash = hasher.ComputeHash(buffer[idx], 0, lastByteIndex + 1);\r
+                            var filePosition = indices[idx];\r
+                            /*\r
+                                                        Trace.TraceInformation("Hashed [{0}] [{1}/{2}] [{3:p}]", path,\r
+                                                                                filePosition, size,\r
+                                                                                (double)filePosition / size);\r
+                            */\r
+                            hashes[filePosition] = hash;\r
+                        });\r
+                    }\r
+                    bufIdx = (bufIdx + 1) % parallelism;\r
+                }\r
+            }\r
+            finally\r
+            {\r
+                for (var i = 0; i < parallelism; i++)\r
+                {\r
+                    if (hashers[i] != null)\r
+                        hashers[i].Dispose();\r
+                }\r
+\r
+            }\r
 \r
             return hashes;\r
         }\r
 \r
         static BlockHashAlgorithms()\r
         {\r
+/*\r
             CalculateBlockHash = CalculateBlockHashesRecursiveAsync;\r
+*/\r
         }\r
     }\r
 }\r
index 296e810..e696bd3 100644 (file)
@@ -188,7 +188,7 @@ namespace Pithos.Network
             using (var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, blockSize, true))
             {
                 //Calculate the blocks asyncrhonously
-                var hashes = await BlockHashAlgorithms.CalculateBlockHashesAgentAsync(stream, blockSize, algorithm, parallelism);                
+                var hashes = await BlockHashAlgorithms.CalculateBlockHashesInPlacePFor(stream, blockSize, algorithm, parallelism);                
 
                 //And then proceed with creating and returning a TreeHash
                 var length = stream.Length;