Added check and failover of hash algorithms: OpenSSL > Cng > Default
[pithos-ms-client] / trunk / Pithos.Network / BlockHashAlgorithms.cs
index 84f582f..c129a34 100644 (file)
@@ -49,6 +49,7 @@ using System.Threading;
 using System.Threading.Tasks;\r
 using System.Threading.Tasks.Dataflow;\r
 \r
+\r
 namespace Pithos.Network\r
 {\r
     using System;\r
@@ -230,7 +231,8 @@ namespace Pithos.Network
             return _bufferMgr;\r
         }\r
 \r
-        public static async Task<ConcurrentDictionary<long, byte[]>> CalculateBlockHashesInPlacePFor(FileStream stream, int blockSize, string algorithm, int parallelism,IProgress<double> progress )\r
+        //public static async Task<ConcurrentDictionary<long, byte[]>> CalculateBlockHashesInPlacePFor(FileStream stream, int blockSize, string algorithm, int parallelism, Action<long, byte[], int> postAction,CancellationToken token, IProgress<double> progress)\r
+        public static async Task<ConcurrentDictionary<long, byte[]>> CalculateBlockHashesInPlacePFor(FileStream stream, int blockSize, string algorithm, byte parallelism, CancellationToken token, IProgress<HashProgress> progress)\r
         {\r
             if (stream == null)\r
                 throw new ArgumentNullException("stream");\r
@@ -250,51 +252,74 @@ namespace Pithos.Network
             if (size == 0)\r
             {\r
                 var buf = new byte[0];\r
-                var hasher=HashAlgorithm.Create(algorithm);                \r
-                hashes[0]=hasher.ComputeHash(buf);\r
-                return hashes;\r
+                using (var hasher = HashAlgorithm.Create(algorithm))\r
+                {\r
+                    hashes[0] = hasher.ComputeHash(buf);\r
+                    return hashes;\r
+                }\r
             }\r
 \r
+            var blocks = size<blockSize?1:size/blockSize;\r
+\r
+            var actualHashers = parallelism > blocks ? (byte)blocks : parallelism;\r
 \r
-            var buffer = new byte[parallelism][];\r
-            var hashers = new HashAlgorithm[parallelism];\r
-            var bufferManager = GetBufferManager(blockSize, parallelism);\r
-            for (var i = 0; i < parallelism; i++)\r
+            var buffer = new byte[actualHashers][];\r
+            var hashers = new HashAlgorithm[actualHashers];\r
+            var bufferManager = GetBufferManager(blockSize, actualHashers);\r
+            for (var i = 0; i < actualHashers; i++)\r
             {\r
                 buffer[i] = bufferManager.TakeBuffer(blockSize);// new byte[blockSize];\r
-                hashers[i] = HashAlgorithm.Create(algorithm);\r
+                hashers[i] = HashAlgorithm.Create(algorithm);                \r
             }\r
             try\r
             {\r
-                var indices = new long[parallelism];\r
-                var bufferCount = new int[parallelism];\r
+                var indices = new long[actualHashers];\r
+                var bufferCount = new int[actualHashers];\r
 \r
                 int read;\r
                 int bufIdx = 0;\r
                 long index = 0;\r
 \r
+                //long block = 0;\r
 \r
                 while ((read = await stream.ReadAsync(buffer[bufIdx], 0, blockSize).ConfigureAwait(false)) > 0)\r
                 {\r
                     index += read;\r
                     indices[bufIdx] = index;\r
                     bufferCount[bufIdx] = read;\r
+                    //postAction(block++, buffer[bufIdx], read);\r
+\r
                     //If we have filled the last buffer or if we have read from the last block,\r
                     //we can calculate the clocks in parallel\r
-                    if (bufIdx == parallelism - 1 || read < blockSize)\r
+                    if (bufIdx == actualHashers - 1 || read < blockSize)\r
                     {\r
-                        //var options = new ParallelOptions {MaxDegreeOfParallelism = parallelism};\r
-                        Parallel.For(0, bufIdx + 1, idx =>\r
+                        var options = new ParallelOptions {MaxDegreeOfParallelism = actualHashers,CancellationToken=token};\r
+\r
+                        Parallel.For(0, bufIdx + 1, options,(idx,state) =>\r
                                                         {\r
                                                             //This code was added for compatibility with the way Pithos calculates the last hash\r
                                                             //We calculate the hash only up to the last non-null byte\r
+                                                            options.CancellationToken.ThrowIfCancellationRequested();\r
                                                             var lastByteIndex = Array.FindLastIndex(buffer[idx],\r
                                                                                                     bufferCount[idx] - 1,\r
                                                                                                     aByte => aByte != 0);\r
 \r
                                                             var hasher = hashers[idx];\r
-                                                            var hash = hasher.ComputeHash(buffer[idx], 0,\r
-                                                                                          lastByteIndex + 1);\r
+\r
+                                                            byte[] hash=hasher.ComputeHash(buffer[idx],0,lastByteIndex+1);\r
+/*\r
+                                                            if (buffer[idx].Length == lastByteIndex || lastByteIndex==-1)\r
+                                                                hash = hasher.Digest(buffer[idx]);\r
+                                                            else\r
+                                                            {\r
+                                                                var buf=new byte[lastByteIndex+1];\r
+                                                                Buffer.BlockCopy(buffer[idx],0,buf,0,lastByteIndex+1);\r
+                                                                hash = hasher.Digest(buf);\r
+                                                            }\r
+*/\r
+\r
+                                                            \r
+                                                            \r
                                                             var filePosition = indices[idx];\r
                                                             /*\r
                                                         Trace.TraceInformation("Hashed [{0}] [{1}/{2}] [{3:p}]", path,\r
@@ -302,15 +327,17 @@ namespace Pithos.Network
                                                                                 (double)filePosition / size);\r
                             */\r
                                                             hashes[filePosition] = hash;\r
-                                                            progress.Report((long)hashes.Count*blockSize*1.0/stream.Length);\r
+                                                            //Do not report for files smaller than 4MB\r
+                                                            if (progress != null && stream.Length > 4*1024*1024)\r
+                                                                progress.Report(new HashProgress((long)hashes.Count*blockSize,stream.Length));\r
                                                         });\r
                     }\r
-                    bufIdx = (bufIdx + 1)%parallelism;\r
+                    bufIdx = (bufIdx +1)%actualHashers;\r
                 }\r
             }\r
             finally\r
             {\r
-                for (var i = 0; i < parallelism; i++)\r
+                for (var i = 0; i < actualHashers; i++)\r
                 {\r
                     if (hashers[i] != null)\r
                         hashers[i].Dispose();\r