Added check and failover of hash algorithms: OpenSSL > Cng > Default
[pithos-ms-client] / trunk / Pithos.Network / BlockHashAlgorithms.cs
index 150e052..c129a34 100644 (file)
@@ -44,15 +44,15 @@ using System.Diagnostics.Contracts;
 using System.IO;\r
 using System.Reflection;\r
 using System.Security.Cryptography;\r
+using System.ServiceModel.Channels;\r
+using System.Threading;\r
 using System.Threading.Tasks;\r
 using System.Threading.Tasks.Dataflow;\r
 \r
+\r
 namespace Pithos.Network\r
 {\r
     using System;\r
-    using System.Collections.Generic;\r
-    using System.Linq;\r
-    using System.Text;\r
 \r
     /// <summary>\r
     /// TODO: Update summary.\r
@@ -61,6 +61,7 @@ namespace Pithos.Network
     {\r
         private static readonly log4net.ILog Log = log4net.LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);\r
 \r
+/*\r
         public static Func<FileStream, int, string, ConcurrentDictionary<int, byte[]>, int, Task<ConcurrentDictionary<int, byte[]>>> CalculateBlockHash;\r
 \r
         public static Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesRecursiveAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0)\r
@@ -118,8 +119,6 @@ namespace Pithos.Network
             int read;\r
             while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)\r
             {\r
-                //TODO: identify the value of index\r
-\r
                 using (var hasher = HashAlgorithm.Create(algorithm))\r
                 {\r
                     //This code was added for compatibility with the way Pithos calculates the last hash\r
@@ -130,7 +129,7 @@ namespace Pithos.Network
                     hashes[index] = hash;\r
                 }\r
                 index += read;\r
-            };\r
+            }\r
             return hashes;\r
         }\r
         \r
@@ -150,7 +149,6 @@ namespace Pithos.Network
             var size = stream.Length;\r
             Log.DebugFormat("Hashing [{0}] size [{1}]",path,size);\r
             \r
-/*\r
             var options = new ExecutionDataflowBlockOptions {BoundedCapacity = parallelism,MaxDegreeOfParallelism=parallelism};\r
             var hashBlock=new ActionBlock<Tuple<int,byte[]>>(input=>\r
                               {\r
@@ -166,17 +164,49 @@ namespace Pithos.Network
                                       hashes[idx] = hash;\r
                                   }                                  \r
                               },options);\r
-*/\r
 \r
             var buffer = new byte[blockSize];\r
             int read;\r
             int index = 0;\r
+            while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)\r
+            {\r
+                var block = new byte[read];\r
+                Buffer.BlockCopy(buffer, 0, block, 0, read);\r
+                await hashBlock.SendAsync(Tuple.Create(index, block));\r
+                index += read;\r
+            }\r
+            \r
+\r
+            hashBlock.Complete();\r
+            await hashBlock.Completion;\r
+\r
+            return hashes;\r
+        } \r
+        \r
+        public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesInPlace(FileStream stream, int blockSize, string algorithm, int parallelism)\r
+        {\r
+            if (stream == null)\r
+                throw new ArgumentNullException("stream");\r
+            if (String.IsNullOrWhiteSpace(algorithm))\r
+                throw new ArgumentNullException("algorithm");\r
+            if (blockSize <= 0)\r
+                throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");\r
+            Contract.EndContractBlock();\r
+\r
+            var hashes = new ConcurrentDictionary<int, byte[]>();\r
+\r
+            var path = stream.Name;\r
+            var size = stream.Length;\r
+            Log.DebugFormat("Hashing [{0}] size [{1}]",path,size);\r
+            \r
+\r
+            var buffer = new byte[blockSize];\r
+            var index = 0;\r
             using (var hasher = HashAlgorithm.Create(algorithm))\r
             {\r
+                int read;\r
                 while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)\r
                 {\r
-                    //                var block = new byte[read];\r
-\r
                     //This code was added for compatibility with the way Pithos calculates the last hash\r
                     //We calculate the hash only up to the last non-null byte\r
                     var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);\r
@@ -186,26 +216,173 @@ namespace Pithos.Network
                     hashes[index] = hash;\r
                     index += read;\r
                 }\r
+            }\r
+            return hashes;\r
+        }\r
+*/\r
+\r
+        private static BufferManager _bufferMgr;\r
 \r
-                /*\r
-                                Buffer.BlockCopy(buffer,0,block,0,read);\r
-                                await hashBlock.SendAsync(Tuple.Create(index, block));\r
-                */\r
-                \r
+        /// <summary>\r
+        /// Returns the buffer manager shared by all hashing calls, creating it on first use.\r
+        /// </summary>\r
+        /// <remarks>\r
+        /// The pool is sized from the arguments of the first call only; later calls reuse it unchanged.\r
+        /// </remarks>\r
+        private static BufferManager GetBufferManager(int blockSize,int parallelism)\r
+        {\r
+            //Only build a pool when none exists yet. Passing CreateBufferManager straight to\r
+            //CompareExchange allocated (and then discarded) a new pool on every call.\r
+            if (_bufferMgr == null)\r
+            {\r
+                Interlocked.CompareExchange(ref _bufferMgr,\r
+                                            BufferManager.CreateBufferManager(parallelism*blockSize, blockSize),\r
+                                            null);\r
+            }\r
+            return _bufferMgr;\r
+        }\r
+\r
+        /// <summary>\r
+        /// Creates the hash algorithm named by <paramref name="algorithm"/>.\r
+        /// </summary>\r
+        /// <exception cref="ArgumentException"><see cref="HashAlgorithm.Create(string)"/> does not recognize the name.</exception>\r
+        private static HashAlgorithm CreateHasher(string algorithm)\r
+        {\r
+            var hasher = HashAlgorithm.Create(algorithm);\r
+            //HashAlgorithm.Create returns null for unknown names instead of throwing\r
+            if (hasher == null)\r
+                throw new ArgumentException(String.Format("Unknown hash algorithm [{0}]", algorithm), "algorithm");\r
+            return hasher;\r
+        }\r
+\r
+        /// <summary>\r
+        /// Calculates the hash of every block of a file, hashing up to <paramref name="parallelism"/> blocks at a time in parallel.\r
+        /// </summary>\r
+        /// <param name="stream">The file to hash. It is read from its current position to the end.</param>\r
+        /// <param name="blockSize">The block size in bytes. Must be greater than zero.</param>\r
+        /// <param name="algorithm">The hash algorithm name, as accepted by <see cref="HashAlgorithm.Create(string)"/>.</param>\r
+        /// <param name="parallelism">The maximum number of blocks hashed concurrently. Must be greater than zero.</param>\r
+        /// <param name="token">Cancels reading and hashing.</param>\r
+        /// <param name="progress">Optional progress sink. Progress is only reported for files larger than 4MB.</param>\r
+        /// <returns>\r
+        /// The block hashes, keyed by the number of bytes read up to and including each block\r
+        /// (the offset just past the block's end when the stream starts at position 0).\r
+        /// A zero-length file yields a single entry, under key 0, holding the hash of an empty buffer.\r
+        /// </returns>\r
+        /// <remarks>\r
+        /// For compatibility with the way Pithos calculates hashes, trailing zero bytes of a block are not hashed.\r
+        /// </remarks>\r
+        /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null, or <paramref name="algorithm"/> is null or blank.</exception>\r
+        /// <exception cref="ArgumentOutOfRangeException"><paramref name="blockSize"/> or <paramref name="parallelism"/> is not positive.</exception>\r
+        /// <exception cref="ArgumentException"><paramref name="algorithm"/> is not a recognized hash algorithm.</exception>\r
+        public static async Task<ConcurrentDictionary<long, byte[]>> CalculateBlockHashesInPlacePFor(FileStream stream, int blockSize, string algorithm, byte parallelism, CancellationToken token, IProgress<HashProgress> progress)\r
+        {\r
+            if (stream == null)\r
+                throw new ArgumentNullException("stream");\r
+            if (String.IsNullOrWhiteSpace(algorithm))\r
+                throw new ArgumentNullException("algorithm");\r
+            if (blockSize <= 0)\r
+                throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");\r
+            if (parallelism == 0)\r
+                throw new ArgumentOutOfRangeException("parallelism", "parallelism must be a value greater than zero ");\r
+            Contract.EndContractBlock();\r
+\r
+            var hashes = new ConcurrentDictionary<long, byte[]>();\r
+\r
+            var path = stream.Name;\r
+            var size = stream.Length;\r
+            Log.DebugFormat("Hashing [{0}] size [{1}]", path, size);\r
+\r
+            //A zero-length file has no blocks; return the hash of an empty buffer so callers still get one entry\r
+            if (size == 0)\r
+            {\r
+                using (var hasher = CreateHasher(algorithm))\r
+                {\r
+                    hashes[0] = hasher.ComputeHash(new byte[0]);\r
+                    return hashes;\r
+                }\r
             }\r
-            \r
 \r
+            //Never use more hashers than there are blocks; a partial last block counts as a block\r
+            var blocks = (size + blockSize - 1)/blockSize;\r
+            var actualHashers = parallelism > blocks ? (byte)blocks : parallelism;\r
+\r
+            var buffer = new byte[actualHashers][];\r
+            var hashers = new HashAlgorithm[actualHashers];\r
+            var bufferManager = GetBufferManager(blockSize, actualHashers);\r
+            try\r
+            {\r
+                //Acquire inside the try so that whatever was already taken is released if a later step throws\r
+                for (var i = 0; i < actualHashers; i++)\r
+                {\r
+                    buffer[i] = bufferManager.TakeBuffer(blockSize);\r
+                    hashers[i] = CreateHasher(algorithm);\r
+                }\r
+\r
+                var indices = new long[actualHashers];\r
+                var bufferCount = new int[actualHashers];\r
+                var options = new ParallelOptions {MaxDegreeOfParallelism = actualHashers, CancellationToken = token};\r
+\r
+                //Hashes the first 'count' buffers in parallel, each with its own hasher\r
+                Action<int> hashBuffers = count =>\r
+                {\r
+                    Parallel.For(0, count, options, idx =>\r
+                    {\r
+                        options.CancellationToken.ThrowIfCancellationRequested();\r
+                        //This code was added for compatibility with the way Pithos calculates the last hash\r
+                        //We calculate the hash only up to the last non-null byte\r
+                        var lastByteIndex = Array.FindLastIndex(buffer[idx], bufferCount[idx] - 1, aByte => aByte != 0);\r
+                        var hash = hashers[idx].ComputeHash(buffer[idx], 0, lastByteIndex + 1);\r
+                        hashes[indices[idx]] = hash;\r
+\r
+                        //Do not report for files smaller than 4MB\r
+                        if (progress != null && size > 4*1024*1024)\r
+                            progress.Report(new HashProgress((long)hashes.Count*blockSize, size));\r
+                    });\r
+                };\r
+\r
+                int read;\r
+                var pending = 0;  //Buffers filled since the last batch was hashed\r
+                long index = 0;\r
+                while ((read = await stream.ReadAsync(buffer[pending], 0, blockSize, token).ConfigureAwait(false)) > 0)\r
+                {\r
+                    //Each block is keyed by the number of bytes read up to and including it\r
+                    index += read;\r
+                    indices[pending] = index;\r
+                    bufferCount[pending] = read;\r
+                    pending++;\r
+\r
+                    if (pending == actualHashers)\r
+                    {\r
+                        hashBuffers(pending);\r
+                        pending = 0;\r
+                    }\r
+                }\r
-/*\r
-            hashBlock.Complete();\r
-            await hashBlock.Completion;\r
-*/\r
 \r
+                //The file can end on a block boundary before every buffer is filled;\r
+                //hash the leftovers too, otherwise the trailing blocks would be missing\r
+                if (pending > 0)\r
+                    hashBuffers(pending);\r
+            }\r
+            finally\r
+            {\r
+                for (var i = 0; i < actualHashers; i++)\r
+                {\r
+                    if (hashers[i] != null)\r
+                        hashers[i].Dispose();\r
+                    if (buffer[i] != null)\r
+                        bufferManager.ReturnBuffer(buffer[i]);\r
+                }\r
+            }\r
+\r
             return hashes;\r
         }\r
 \r
         static BlockHashAlgorithms()\r
         {\r
+/*\r
             CalculateBlockHash = CalculateBlockHashesRecursiveAsync;\r
+*/\r
         }\r
     }\r
 }\r