return _bufferMgr;\r
}\r
\r
- public static async Task<ConcurrentDictionary<long, byte[]>> CalculateBlockHashesInPlacePFor(FileStream stream, int blockSize, string algorithm, int parallelism,IProgress<double> progress )\r
+ public static async Task<ConcurrentDictionary<long, byte[]>> CalculateBlockHashesInPlacePFor(FileStream stream, int blockSize, string algorithm, int parallelism, Action<long, byte[], int> postAction, IProgress<double> progress)\r
{\r
if (stream == null)\r
throw new ArgumentNullException("stream");\r
int bufIdx = 0;\r
long index = 0;\r
\r
+ long block = 0;\r
\r
while ((read = await stream.ReadAsync(buffer[bufIdx], 0, blockSize).ConfigureAwait(false)) > 0)\r
{\r
index += read;\r
indices[bufIdx] = index;\r
bufferCount[bufIdx] = read;\r
+ postAction(block++, buffer[bufIdx], read);\r
+\r
//If we have filled the last buffer or if we have read from the last block,\r
//we can calculate the clocks in parallel\r
if (bufIdx == parallelism - 1 || read < blockSize)\r
progress.Report((long)hashes.Count*blockSize*1.0/stream.Length);\r
});\r
}\r
- bufIdx = (bufIdx + 1)%parallelism;\r
+ bufIdx = (bufIdx +1)%parallelism;\r
}\r
}\r
finally\r
--- /dev/null
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Security.Cryptography;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Threading.Tasks.Dataflow;
+
+namespace Pithos.Network
+{
+    /// <summary>
+    /// Computes the MD5 hash of data that is supplied block by block.
+    /// Every block carries a zero-based index; blocks are fed to the hasher strictly
+    /// in index order by a background <see cref="ActionBlock{T}"/>.
+    /// </summary>
+    class MD5BlockCalculator:IDisposable
+    {
+        private HashAlgorithm _hasher = HashAlgorithm.Create("md5");
+
+        private readonly ActionBlock<Tuple<long, byte[],int>> _actionBlock;
+
+        //Blocks that arrived before their turn, keyed by block index.
+        //Only touched by ProcessBlock (one message at a time) and by GetHash after completion.
+        private readonly Dictionary<long, Tuple<long, byte[], int>> _pendingBlocks =
+            new Dictionary<long, Tuple<long, byte[], int>>();
+
+        //Index of the next block that has to be fed to the hasher
+        private long _currentBlock = 0;
+
+        /// <summary>
+        /// Creates the calculator and the background block that performs the hashing.
+        /// </summary>
+        public MD5BlockCalculator()
+        {
+            //ProcessBlock mutates unsynchronized state (_hasher, _pendingBlocks, _currentBlock),
+            //so messages must be handled one at a time. State this explicitly instead of
+            //relying on the default.
+            var options = new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 1};
+            _actionBlock=new ActionBlock<Tuple<long, byte[],int>>(t=> ProcessBlock(t), options);
+        }
+
+        /// <summary>
+        /// Feeds a block to the hasher if it is the next expected one, otherwise parks it
+        /// until all the blocks that precede it have been hashed.
+        /// </summary>
+        /// <param name="tuple">Block index, block data and number of valid bytes in the data.</param>
+        private void ProcessBlock(Tuple<long,byte[],int> tuple)
+        {
+            if (tuple.Item1 != _currentBlock)
+            {
+                //Park blocks that arrived early. Re-posting them to the action block would
+                //busy-spin and would silently lose them once Complete() has been called.
+                //Indices below _currentBlock were already hashed, so duplicates are ignored.
+                if (tuple.Item1 > _currentBlock)
+                    _pendingBlocks[tuple.Item1] = tuple;
+                return;
+            }
+
+            _hasher.TransformBlock(tuple.Item2, 0, tuple.Item3, null, 0);
+            _currentBlock++;
+
+            //Hashing this block may have unblocked parked successors
+            Tuple<long, byte[], int> next;
+            while (_pendingBlocks.TryGetValue(_currentBlock, out next))
+            {
+                _pendingBlocks.Remove(_currentBlock);
+                _hasher.TransformBlock(next.Item2, 0, next.Item3, null, 0);
+                _currentBlock++;
+            }
+        }
+
+        /// <summary>
+        /// Queues a block for hashing. The data is copied, so the caller may reuse
+        /// <paramref name="buffer"/> as soon as this method returns.
+        /// </summary>
+        /// <param name="blockIndex">Zero-based position of the block in the hashed data.</param>
+        /// <param name="buffer">Buffer that holds the block data, starting at offset 0.</param>
+        /// <param name="size">Number of valid bytes in <paramref name="buffer"/>.</param>
+        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
+        /// <exception cref="ArgumentOutOfRangeException"><paramref name="size"/> is negative or larger than the buffer.</exception>
+        public void PostBlock(long blockIndex,byte[] buffer,int size)
+        {
+            if (buffer == null)
+                throw new ArgumentNullException("buffer");
+            if (size < 0 || size > buffer.Length)
+                throw new ArgumentOutOfRangeException("size");
+
+            //The block is hashed later on a background task. Keeping a reference to the
+            //caller's buffer would hash whatever the caller wrote into it in the meantime,
+            //so take a snapshot of the valid bytes.
+            var snapshot = new byte[size];
+            Buffer.BlockCopy(buffer, 0, snapshot, 0, size);
+            _actionBlock.Post(Tuple.Create(blockIndex, snapshot, size));
+        }
+
+        /// <summary>
+        /// Signals that no more blocks will be posted, waits for the queued blocks to be
+        /// hashed and returns the final hash.
+        /// </summary>
+        /// <returns>The hash bytes converted to a string by the ToHashString extension (defined outside this file).</returns>
+        public async Task<string> GetHash()
+        {
+            _actionBlock.Complete();
+            //Nothing below needs the caller's context. Not capturing it keeps callers
+            //that block on the returned task from deadlocking.
+            await _actionBlock.Completion.ConfigureAwait(false);
+            Debug.Assert(_actionBlock.InputCount == 0);
+            //A parked block at this point means that an earlier block was never posted
+            Debug.Assert(_pendingBlocks.Count == 0, "Blocks were posted with a gap in their indices");
+            _hasher.TransformFinalBlock(new byte[0], 0, 0);
+            var hash=_hasher.Hash.ToHashString();
+            return hash;
+        }
+
+        /// <summary>
+        /// Releases the underlying hash algorithm.
+        /// </summary>
+        public void Dispose()
+        {
+            Dispose(true);
+            GC.SuppressFinalize(this);
+        }
+
+        /// <summary>
+        /// Disposes the hash algorithm when called from <see cref="Dispose()"/>.
+        /// Safe to call more than once.
+        /// </summary>
+        /// <param name="disposing">True when called from Dispose(), false when called from a finalizer.</param>
+        protected virtual void Dispose(bool disposing)
+        {
+            if (disposing && _hasher!=null)
+                _hasher.Dispose();
+            _hasher = null;
+        }
+    }
+}
<Compile Include="CloudFilesClient.cs" />
<Compile Include="ContainerInfo.cs" />
<Compile Include="ICloudClient.cs" />
+ <Compile Include="MD5BlockCalculator.cs" />
<Compile Include="NoModificationInfo.cs" />
<Compile Include="RestClient.cs">
<SubType>Component</SubType>
//Calculate the hash of all blocks using a blockhash iterator
using (var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, blockSize, true))
{
+ var md5 = new MD5BlockCalculator();
+ Action<long, byte[], int> postAction = md5.PostBlock;
//Calculate the blocks asynchronously
- var hashes = BlockHashAlgorithms.CalculateBlockHashesInPlacePFor(stream, blockSize, algorithm, parallelism,progress).Result;
+ var hashes = BlockHashAlgorithms.CalculateBlockHashesInPlacePFor(stream, blockSize, algorithm, parallelism,postAction, progress).Result;
//And then proceed with creating and returning a TreeHash
var length = stream.Length;
};
string fileHash;
+
+ var md5Hash=md5.GetHash().Result;
+/*
var hasher = HashAlgorithm.Create("MD5");
stream.Position = 0;
- treeHash.MD5= hasher.ComputeHash(stream).ToHashString();
+*/
+ treeHash.MD5= md5Hash;
return treeHash;
}