Modified TreeHash calculations to compute MD5 in parallel with SHA block hashing
author pkanavos <pkanavos@gmail.com>
Mon, 25 Jun 2012 17:14:50 +0000 (20:14 +0300)
committer pkanavos <pkanavos@gmail.com>
Mon, 25 Jun 2012 17:14:50 +0000 (20:14 +0300)
trunk/Pithos.Network/BlockHashAlgorithms.cs
trunk/Pithos.Network/MD5BlockCalculator.cs [new file with mode: 0644]
trunk/Pithos.Network/Pithos.Network.csproj
trunk/Pithos.Network/Signature.cs

index 84f582f..13b203b 100644 (file)
@@ -230,7 +230,7 @@ namespace Pithos.Network
             return _bufferMgr;\r
         }\r
 \r
-        public static async Task<ConcurrentDictionary<long, byte[]>> CalculateBlockHashesInPlacePFor(FileStream stream, int blockSize, string algorithm, int parallelism,IProgress<double> progress )\r
+        public static async Task<ConcurrentDictionary<long, byte[]>> CalculateBlockHashesInPlacePFor(FileStream stream, int blockSize, string algorithm, int parallelism, Action<long, byte[], int> postAction, IProgress<double> progress)\r
         {\r
             if (stream == null)\r
                 throw new ArgumentNullException("stream");\r
@@ -273,12 +273,15 @@ namespace Pithos.Network
                 int bufIdx = 0;\r
                 long index = 0;\r
 \r
+                long block = 0;\r
 \r
                 while ((read = await stream.ReadAsync(buffer[bufIdx], 0, blockSize).ConfigureAwait(false)) > 0)\r
                 {\r
                     index += read;\r
                     indices[bufIdx] = index;\r
                     bufferCount[bufIdx] = read;\r
+                    postAction(block++, buffer[bufIdx], read);\r
+\r
                     //If we have filled the last buffer or if we have read from the last block,\r
                     //we can calculate the blocks in parallel\r
                     if (bufIdx == parallelism - 1 || read < blockSize)\r
@@ -305,7 +308,7 @@ namespace Pithos.Network
                                                             progress.Report((long)hashes.Count*blockSize*1.0/stream.Length);\r
                                                         });\r
                     }\r
-                    bufIdx = (bufIdx + 1)%parallelism;\r
+                    bufIdx = (bufIdx +1)%parallelism;\r
                 }\r
             }\r
             finally\r
diff --git a/trunk/Pithos.Network/MD5BlockCalculator.cs b/trunk/Pithos.Network/MD5BlockCalculator.cs
new file mode 100644 (file)
index 0000000..9b230a2
--- /dev/null
@@ -0,0 +1,67 @@
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Security.Cryptography;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Threading.Tasks.Dataflow;
+
+namespace Pithos.Network
+{
+    /// <summary>
+    /// Computes a single MD5 hash over a sequence of numbered blocks that are
+    /// posted one at a time, so the MD5 can be calculated while the caller
+    /// is busy with other work on the same data.
+    /// </summary>
+    /// <remarks>
+    /// Blocks are hashed strictly in ascending index order starting from 0.
+    /// A block that arrives ahead of its turn is parked until all the blocks
+    /// before it have been hashed. Call <see cref="GetHash"/> once, after the
+    /// last block has been posted.
+    /// </remarks>
+    class MD5BlockCalculator:IDisposable
+    {
+        private HashAlgorithm _hasher = HashAlgorithm.Create("md5");
+
+        //Created with default options, so the block runs one ProcessBlock call
+        //at a time. This is why the fields below need no locking.
+        private ActionBlock<Tuple<long, byte[],int>>  _actionBlock;
+
+        //Blocks that arrived before their turn, keyed by block index
+        private readonly Dictionary<long, Tuple<long, byte[], int>> _pending =
+            new Dictionary<long, Tuple<long, byte[], int>>();
+
+        //Index of the next block that must be fed to the hasher
+        private long _currentBlock = 0;
+
+        public MD5BlockCalculator()
+        {
+            _actionBlock=new ActionBlock<Tuple<long, byte[],int>>(t=> ProcessBlock(t));
+        }
+
+        /// <summary>
+        /// Feeds a block to the hasher if it is the next expected one, followed
+        /// by any parked blocks that have now become consecutive.
+        /// </summary>
+        /// <param name="tuple">Block index, block data and number of valid bytes</param>
+        private void ProcessBlock(Tuple<long,byte[],int> tuple)
+        {
+            //A block that was already hashed can never become current again. Drop it
+            if (tuple.Item1 < _currentBlock)
+                return;
+
+            //Too early: park the block instead of re-posting it to the queue.
+            //Re-posting spins the CPU and silently loses the block once the
+            //queue has been marked complete.
+            if (tuple.Item1 > _currentBlock)
+            {
+                _pending[tuple.Item1] = tuple;
+                return;
+            }
+
+            var next = tuple;
+            while (next != null)
+            {
+                _hasher.TransformBlock(next.Item2, 0, next.Item3, null, 0);
+                _currentBlock++;
+                //TryGetValue sets next to null when no follow-up block is parked,
+                //which ends the loop
+                if (_pending.TryGetValue(_currentBlock, out next))
+                    _pending.Remove(_currentBlock);
+            }
+        }
+
+        /// <summary>
+        /// Queues a block for hashing.
+        /// </summary>
+        /// <param name="blockIndex">Zero-based position of the block in the file</param>
+        /// <param name="buffer">Buffer that holds the block's data</param>
+        /// <param name="size">Number of valid bytes at the start of <paramref name="buffer"/></param>
+        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null</exception>
+        /// <exception cref="ArgumentOutOfRangeException"><paramref name="size"/> is negative or larger than the buffer</exception>
+        /// <exception cref="InvalidOperationException">The block was rejected, eg. because GetHash was already called</exception>
+        public void PostBlock(long blockIndex,byte[] buffer,int size)
+        {
+            if (buffer == null)
+                throw new ArgumentNullException("buffer");
+            if (size < 0 || size > buffer.Length)
+                throw new ArgumentOutOfRangeException("size");
+
+            //Hashing happens later, on another thread. Callers are free to reuse
+            //their buffer as soon as this method returns, so take a snapshot of
+            //the data instead of keeping a reference to the caller's buffer.
+            var snapshot = new byte[size];
+            Buffer.BlockCopy(buffer, 0, snapshot, 0, size);
+
+            if (!_actionBlock.Post(Tuple.Create(blockIndex, snapshot, size)))
+                throw new InvalidOperationException("The block could not be queued for MD5 hashing");
+        }
+
+        /// <summary>
+        /// Waits for all the posted blocks to be hashed and returns the final MD5 hash.
+        /// No more blocks can be posted after this method is called.
+        /// </summary>
+        /// <returns>The MD5 hash, formatted by ToHashString</returns>
+        /// <exception cref="InvalidOperationException">A gap in the block indices left some blocks unhashed</exception>
+        public async Task<string> GetHash()
+        {
+            _actionBlock.Complete();
+            //Don't capture the caller's context. Callers that block on the
+            //returned task (eg. with .Result) would deadlock otherwise
+            await _actionBlock.Completion.ConfigureAwait(false);
+            Debug.Assert(_actionBlock.InputCount == 0);
+
+            //Parked blocks at this point mean an earlier block was never posted.
+            //A hash calculated without them would be wrong
+            if (_pending.Count > 0)
+                throw new InvalidOperationException("Cannot compute the MD5 hash because some blocks were never posted");
+
+            _hasher.TransformFinalBlock(new byte[0], 0, 0);
+            var hash=_hasher.Hash.ToHashString();
+            return hash;
+        }
+
+        public void Dispose()
+        {
+            Dispose(true);
+            GC.SuppressFinalize(this);
+        }
+
+        /// <summary>
+        /// Releases the underlying hash algorithm.
+        /// </summary>
+        /// <param name="disposing">True when called from Dispose()</param>
+        protected virtual void Dispose(bool disposing)
+        {
+            if (disposing && _hasher!=null)
+                _hasher.Dispose();
+            _hasher = null;
+        }
+    }
+}
index 3db3267..ff26b82 100644 (file)
     <Compile Include="CloudFilesClient.cs" />
     <Compile Include="ContainerInfo.cs" />
     <Compile Include="ICloudClient.cs" />
+    <Compile Include="MD5BlockCalculator.cs" />
     <Compile Include="NoModificationInfo.cs" />
     <Compile Include="RestClient.cs">
       <SubType>Component</SubType>
index 7016d02..68709a4 100644 (file)
@@ -198,8 +198,10 @@ namespace Pithos.Network
             //Calculate the hash of all blocks using a blockhash iterator
             using (var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, blockSize, true))
             {
+                var md5 = new MD5BlockCalculator();
+                Action<long, byte[], int> postAction = md5.PostBlock;
                 //Calculate the blocks asynchronously
-                var hashes = BlockHashAlgorithms.CalculateBlockHashesInPlacePFor(stream, blockSize, algorithm, parallelism,progress).Result;                
+                var hashes = BlockHashAlgorithms.CalculateBlockHashesInPlacePFor(stream, blockSize, algorithm, parallelism,postAction, progress).Result;                
 
                 //And then proceed with creating and returning a TreeHash
                 var length = stream.Length;
@@ -213,9 +215,13 @@ namespace Pithos.Network
                 };
 
                 string fileHash;
+
+                var md5Hash=md5.GetHash().Result;
+/*
                 var hasher = HashAlgorithm.Create("MD5");
                 stream.Position = 0;
-                treeHash.MD5= hasher.ComputeHash(stream).ToHashString();
+*/
+                treeHash.MD5= md5Hash;
 
                 return treeHash;
             }