using System.Threading.Tasks;\r
using System.Threading.Tasks.Dataflow;\r
\r
+\r
namespace Pithos.Network\r
{\r
using System;\r
return _bufferMgr;\r
}\r
\r
- public static async Task<ConcurrentDictionary<long, byte[]>> CalculateBlockHashesInPlacePFor(FileStream stream, int blockSize, string algorithm, int parallelism)\r
+ //public static async Task<ConcurrentDictionary<long, byte[]>> CalculateBlockHashesInPlacePFor(FileStream stream, int blockSize, string algorithm, int parallelism, Action<long, byte[], int> postAction,CancellationToken token, IProgress<double> progress)\r
+ public static async Task<ConcurrentDictionary<long, byte[]>> CalculateBlockHashesInPlacePFor(FileStream stream, int blockSize, string algorithm, byte parallelism, CancellationToken token, IProgress<HashProgress> progress)\r
{\r
if (stream == null)\r
throw new ArgumentNullException("stream");\r
var size = stream.Length;\r
Log.DebugFormat("Hashing [{0}] size [{1}]", path, size);\r
\r
+ //TODO: Handle zero-length files\r
+ if (size == 0)\r
+ {\r
+ var buf = new byte[0];\r
+ using (var hasher = HashAlgorithm.Create(algorithm))\r
+ {\r
+ hashes[0] = hasher.ComputeHash(buf);\r
+ return hashes;\r
+ }\r
+ }\r
+\r
+ var blocks = size<blockSize?1:size/blockSize;\r
\r
- var buffer = new byte[parallelism][];\r
- var hashers = new HashAlgorithm[parallelism];\r
- var bufferManager = GetBufferManager(blockSize, parallelism);\r
- for (var i = 0; i < parallelism; i++)\r
+ var actualHashers = parallelism > blocks ? (byte)blocks : parallelism;\r
+\r
+ var buffer = new byte[actualHashers][];\r
+ var hashers = new HashAlgorithm[actualHashers];\r
+ var bufferManager = GetBufferManager(blockSize, actualHashers);\r
+ for (var i = 0; i < actualHashers; i++)\r
{\r
buffer[i] = bufferManager.TakeBuffer(blockSize);// new byte[blockSize];\r
- hashers[i] = HashAlgorithm.Create(algorithm);\r
+ hashers[i] = HashAlgorithm.Create(algorithm); \r
}\r
try\r
{\r
- var indices = new long[parallelism];\r
- var bufferCount = new int[parallelism];\r
+ var indices = new long[actualHashers];\r
+ var bufferCount = new int[actualHashers];\r
\r
int read;\r
int bufIdx = 0;\r
long index = 0;\r
- while ((read = await stream.ReadAsync(buffer[bufIdx], 0, blockSize)) > 0)\r
+\r
+ //long block = 0;\r
+\r
+ while ((read = await stream.ReadAsync(buffer[bufIdx], 0, blockSize).ConfigureAwait(false)) > 0)\r
{\r
index += read;\r
indices[bufIdx] = index;\r
bufferCount[bufIdx] = read;\r
+ //postAction(block++, buffer[bufIdx], read);\r
+\r
//If we have filled the last buffer or if we have read from the last block,\r
//we can calculate the block hashes in parallel
- if (bufIdx == parallelism - 1 || read < blockSize)\r
+ if (bufIdx == actualHashers - 1 || read < blockSize)\r
{\r
- //var options = new ParallelOptions {MaxDegreeOfParallelism = parallelism};\r
- Parallel.For(0, bufIdx + 1, idx =>\r
- {\r
- //This code was added for compatibility with the way Pithos calculates the last hash\r
- //We calculate the hash only up to the last non-null byte\r
- var lastByteIndex = Array.FindLastIndex(buffer[idx],\r
- bufferCount[idx] - 1,\r
- aByte => aByte != 0);\r
-\r
- var hasher = hashers[idx];\r
- var hash = hasher.ComputeHash(buffer[idx], 0, lastByteIndex + 1);\r
- var filePosition = indices[idx];\r
- /*\r
+ var options = new ParallelOptions {MaxDegreeOfParallelism = actualHashers,CancellationToken=token};\r
+\r
+ Parallel.For(0, bufIdx + 1, options,(idx,state) =>\r
+ {\r
+ //This code was added for compatibility with the way Pithos calculates the last hash\r
+ //We calculate the hash only up to the last non-null byte\r
+ options.CancellationToken.ThrowIfCancellationRequested();\r
+ var lastByteIndex = Array.FindLastIndex(buffer[idx],\r
+ bufferCount[idx] - 1,\r
+ aByte => aByte != 0);\r
+\r
+ var hasher = hashers[idx];\r
+\r
+ byte[] hash=hasher.ComputeHash(buffer[idx],0,lastByteIndex+1);\r
+/*\r
+ if (buffer[idx].Length == lastByteIndex || lastByteIndex==-1)\r
+ hash = hasher.Digest(buffer[idx]);\r
+ else\r
+ {\r
+ var buf=new byte[lastByteIndex+1];\r
+ Buffer.BlockCopy(buffer[idx],0,buf,0,lastByteIndex+1);\r
+ hash = hasher.Digest(buf);\r
+ }\r
+*/\r
+\r
+ \r
+ \r
+ var filePosition = indices[idx];\r
+ /*\r
Trace.TraceInformation("Hashed [{0}] [{1}/{2}] [{3:p}]", path,\r
filePosition, size,\r
(double)filePosition / size);\r
*/\r
- hashes[filePosition] = hash;\r
- });\r
+ hashes[filePosition] = hash;\r
+ //Do not report for files smaller than 4MB\r
+ if (progress != null && stream.Length > 4*1024*1024)\r
+ progress.Report(new HashProgress((long)hashes.Count*blockSize,stream.Length));\r
+ });\r
}\r
- bufIdx = (bufIdx + 1) % parallelism;\r
+ bufIdx = (bufIdx +1)%actualHashers;\r
}\r
}\r
finally\r
{\r
- for (var i = 0; i < parallelism; i++)\r
+ for (var i = 0; i < actualHashers; i++)\r
{\r
if (hashers[i] != null)\r
hashers[i].Dispose();\r