From: pkanavos Date: Mon, 25 Jun 2012 17:14:50 +0000 (+0300) Subject: Modified TreeHash calculations to compute MD5 in parallel with SHA block hashing X-Git-Url: https://code.grnet.gr/git/pithos-ms-client/commitdiff_plain/db63c1aef86cb63432c067f48424c25cb89a5e2a Modified TreeHash calculations to compute MD5 in parallel with SHA block hashing --- diff --git a/trunk/Pithos.Network/BlockHashAlgorithms.cs b/trunk/Pithos.Network/BlockHashAlgorithms.cs index 84f582f..13b203b 100644 --- a/trunk/Pithos.Network/BlockHashAlgorithms.cs +++ b/trunk/Pithos.Network/BlockHashAlgorithms.cs @@ -230,7 +230,7 @@ namespace Pithos.Network return _bufferMgr; } - public static async Task> CalculateBlockHashesInPlacePFor(FileStream stream, int blockSize, string algorithm, int parallelism,IProgress progress ) + public static async Task> CalculateBlockHashesInPlacePFor(FileStream stream, int blockSize, string algorithm, int parallelism, Action postAction, IProgress progress) { if (stream == null) throw new ArgumentNullException("stream"); @@ -273,12 +273,15 @@ namespace Pithos.Network int bufIdx = 0; long index = 0; + long block = 0; while ((read = await stream.ReadAsync(buffer[bufIdx], 0, blockSize).ConfigureAwait(false)) > 0) { index += read; indices[bufIdx] = index; bufferCount[bufIdx] = read; + postAction(block++, buffer[bufIdx], read); + //If we have filled the last buffer or if we have read from the last block, //we can calculate the clocks in parallel if (bufIdx == parallelism - 1 || read < blockSize) @@ -305,7 +308,7 @@ namespace Pithos.Network progress.Report((long)hashes.Count*blockSize*1.0/stream.Length); }); } - bufIdx = (bufIdx + 1)%parallelism; + bufIdx = (bufIdx +1)%parallelism; } } finally diff --git a/trunk/Pithos.Network/MD5BlockCalculator.cs b/trunk/Pithos.Network/MD5BlockCalculator.cs new file mode 100644 index 0000000..9b230a2 --- /dev/null +++ b/trunk/Pithos.Network/MD5BlockCalculator.cs @@ -0,0 +1,67 @@ +using System; +using System.Collections.Generic; +using 
System.Diagnostics; +using System.Linq; +using System.Security.Cryptography; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; + +namespace Pithos.Network +{ + class MD5BlockCalculator:IDisposable /* Incrementally computes a single MD5 hash over the blocks posted through PostBlock; a block is fed to the hasher only when its index equals _currentBlock. */ + { + private HashAlgorithm _hasher = HashAlgorithm.Create("md5"); + + private ActionBlock> _actionBlock; + + private long _currentBlock = 0; /* index of the next block the hasher will accept */ + + public MD5BlockCalculator() + { + _actionBlock=new ActionBlock>(t=> ProcessBlock(t)); + } + + private void ProcessBlock(Tuple tuple) /* Item1 = block index, Item2 = buffer, Item3 = byte count (as built in PostBlock). Hashes the block if it is the expected one and advances _currentBlock; otherwise posts the same tuple back to the action block so it is seen again later. */ + { + if (tuple.Item1 == _currentBlock) + { + _hasher.TransformBlock(tuple.Item2, 0, tuple.Item3, null, 0); + Interlocked.Increment(ref _currentBlock); + } + else + { + _actionBlock.Post(tuple); /* NOTE(review): the result of Post is ignored; presumably a re-post made after Complete() is declined and the block is lost - confirm */ + } + } + + public void PostBlock(long blockIndex,byte[] buffer,int size) /* Queues the first size bytes of buffer as block number blockIndex. NOTE(review): the array is queued by reference, not copied - presumably the caller must leave it unchanged until it has been hashed; confirm against the caller */ + { + _actionBlock.Post(Tuple.Create(blockIndex, buffer, size)); + } + + public async Task GetHash() /* Calls Complete() on the action block, awaits its Completion, finalizes the MD5 and returns the digest converted by ToHashString (not defined in this class). */ + { + _actionBlock.Complete(); + await _actionBlock.Completion; + Debug.Assert(_actionBlock.InputCount == 0); + _hasher.TransformFinalBlock(new byte[0], 0, 0); /* empty final block: the data itself was already fed through TransformBlock */ + var hash=_hasher.Hash.ToHashString(); + return hash; + } + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + protected virtual void Dispose(bool disposing) /* Disposes the hasher when disposing is true and it is non-null; clears the field in either case. */ + { + if (disposing && _hasher!=null) + _hasher.Dispose(); + _hasher = null; + } + } +} diff --git a/trunk/Pithos.Network/Pithos.Network.csproj b/trunk/Pithos.Network/Pithos.Network.csproj index 3db3267..ff26b82 100644 --- a/trunk/Pithos.Network/Pithos.Network.csproj +++ b/trunk/Pithos.Network/Pithos.Network.csproj @@ -233,6 +233,7 @@ + Component diff --git a/trunk/Pithos.Network/Signature.cs b/trunk/Pithos.Network/Signature.cs index 7016d02..68709a4 100644 --- a/trunk/Pithos.Network/Signature.cs +++ b/trunk/Pithos.Network/Signature.cs @@ -198,8 +198,10 @@ namespace Pithos.Network //Calculate the hash of all blocks using a blockhash iterator using (var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, 
FileShare.ReadWrite, blockSize, true)) { + var md5 = new MD5BlockCalculator(); + Action postAction = md5.PostBlock; //Calculate the blocks asyncrhonously - var hashes = BlockHashAlgorithms.CalculateBlockHashesInPlacePFor(stream, blockSize, algorithm, parallelism,progress).Result; + var hashes = BlockHashAlgorithms.CalculateBlockHashesInPlacePFor(stream, blockSize, algorithm, parallelism,postAction, progress).Result; //And then proceed with creating and returning a TreeHash var length = stream.Length; @@ -213,9 +215,13 @@ namespace Pithos.Network }; string fileHash; + + var md5Hash=md5.GetHash().Result; +/* var hasher = HashAlgorithm.Create("MD5"); stream.Position = 0; - treeHash.MD5= hasher.ComputeHash(stream).ToHashString(); +*/ + treeHash.MD5= md5Hash; return treeHash; }