-// -----------------------------------------------------------------------\r
-// <copyright file="BlockHashAlgorithms.cs" company="Microsoft">\r
-// TODO: Update copyright text.\r
-// </copyright>\r
-// -----------------------------------------------------------------------\r
-\r
+#region\r
+/* -----------------------------------------------------------------------\r
+ * <copyright file="BlockHashAlgorithms.cs" company="GRNet">\r
+ * \r
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.\r
+ *\r
+ * Redistribution and use in source and binary forms, with or\r
+ * without modification, are permitted provided that the following\r
+ * conditions are met:\r
+ *\r
+ * 1. Redistributions of source code must retain the above\r
+ * copyright notice, this list of conditions and the following\r
+ * disclaimer.\r
+ *\r
+ * 2. Redistributions in binary form must reproduce the above\r
+ * copyright notice, this list of conditions and the following\r
+ * disclaimer in the documentation and/or other materials\r
+ * provided with the distribution.\r
+ *\r
+ *\r
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS\r
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED\r
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR\r
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR\r
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\r
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\r
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF\r
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED\r
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT\r
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN\r
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE\r
+ * POSSIBILITY OF SUCH DAMAGE.\r
+ *\r
+ * The views and conclusions contained in the software and\r
+ * documentation are those of the authors and should not be\r
+ * interpreted as representing official policies, either expressed\r
+ * or implied, of GRNET S.A.\r
+ * </copyright>\r
+ * -----------------------------------------------------------------------\r
+ */\r
+#endregion\r
using System.Collections.Concurrent;\r
using System.Diagnostics.Contracts;\r
using System.IO;\r
+using System.Reflection;\r
using System.Security.Cryptography;\r
+using System.ServiceModel.Channels;\r
+using System.Threading;\r
using System.Threading.Tasks;\r
using System.Threading.Tasks.Dataflow;\r
\r
+\r
namespace Pithos.Network\r
{\r
using System;\r
- using System.Collections.Generic;\r
- using System.Linq;\r
- using System.Text;\r
\r
/// <summary>\r
/// TODO: Update summary.\r
/// </summary>\r
public static class BlockHashAlgorithms\r
{\r
+ private static readonly log4net.ILog Log = log4net.LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);\r
+\r
+/*\r
public static Func<FileStream, int, string, ConcurrentDictionary<int, byte[]>, int, Task<ConcurrentDictionary<int, byte[]>>> CalculateBlockHash;\r
\r
public static Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesRecursiveAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0)\r
int read;\r
while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)\r
{\r
- //TODO: identify the value of index\r
-\r
using (var hasher = HashAlgorithm.Create(algorithm))\r
{\r
//This code was added for compatibility with the way Pithos calculates the last hash\r
hashes[index] = hash;\r
}\r
index += read;\r
- };\r
+ }\r
return hashes;\r
}\r
\r
\r
var hashes = new ConcurrentDictionary<int, byte[]>();\r
\r
+ var path = stream.Name;\r
+ var size = stream.Length;\r
+ Log.DebugFormat("Hashing [{0}] size [{1}]",path,size);\r
+ \r
var options = new ExecutionDataflowBlockOptions {BoundedCapacity = parallelism,MaxDegreeOfParallelism=parallelism};\r
var hashBlock=new ActionBlock<Tuple<int,byte[]>>(input=>\r
{\r
while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)\r
{\r
var block = new byte[read];\r
- Buffer.BlockCopy(buffer,0,block,0,read);\r
+ Buffer.BlockCopy(buffer, 0, block, 0, read);\r
await hashBlock.SendAsync(Tuple.Create(index, block));\r
index += read;\r
- };\r
+ }\r
+ \r
\r
hashBlock.Complete();\r
await hashBlock.Completion;\r
\r
return hashes;\r
+ } \r
+ \r
+ public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesInPlace(FileStream stream, int blockSize, string algorithm, int parallelism)\r
+ {\r
+ if (stream == null)\r
+ throw new ArgumentNullException("stream");\r
+ if (String.IsNullOrWhiteSpace(algorithm))\r
+ throw new ArgumentNullException("algorithm");\r
+ if (blockSize <= 0)\r
+ throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");\r
+ Contract.EndContractBlock();\r
+\r
+ var hashes = new ConcurrentDictionary<int, byte[]>();\r
+\r
+ var path = stream.Name;\r
+ var size = stream.Length;\r
+ Log.DebugFormat("Hashing [{0}] size [{1}]",path,size);\r
+ \r
+\r
+ var buffer = new byte[blockSize];\r
+ var index = 0;\r
+ using (var hasher = HashAlgorithm.Create(algorithm))\r
+ {\r
+ int read;\r
+ while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)\r
+ {\r
+ //This code was added for compatibility with the way Pithos calculates the last hash\r
+ //We calculate the hash only up to the last non-null byte\r
+ var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);\r
+\r
+ var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1);\r
+ Log.DebugFormat("Hashed [{0}] [{1}/{2}] [{3:p}]", path, index,size,(double)index/size);\r
+ hashes[index] = hash;\r
+ index += read;\r
+ }\r
+ }\r
+ return hashes;\r
+ }\r
+*/\r
+\r
+ private static BufferManager _bufferMgr;\r
+\r
+ private static BufferManager GetBufferManager(int blockSize,int parallelism)\r
+ {\r
+ Interlocked.CompareExchange(ref _bufferMgr,\r
+ BufferManager.CreateBufferManager(parallelism*blockSize, blockSize),\r
+ null);\r
+ return _bufferMgr;\r
+ }\r
+\r
+ //public static async Task<ConcurrentDictionary<long, byte[]>> CalculateBlockHashesInPlacePFor(FileStream stream, int blockSize, string algorithm, int parallelism, Action<long, byte[], int> postAction,CancellationToken token, IProgress<double> progress)\r
+ public static async Task<ConcurrentDictionary<long, byte[]>> CalculateBlockHashesInPlacePFor(FileStream stream, int blockSize, string algorithm, byte parallelism, CancellationToken token, IProgress<HashProgress> progress)\r
+ {\r
+ if (stream == null)\r
+ throw new ArgumentNullException("stream");\r
+ if (String.IsNullOrWhiteSpace(algorithm))\r
+ throw new ArgumentNullException("algorithm");\r
+ if (blockSize <= 0)\r
+ throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");\r
+ Contract.EndContractBlock();\r
+\r
+ var hashes = new ConcurrentDictionary<long, byte[]>();\r
+\r
+ var path = stream.Name;\r
+ var size = stream.Length;\r
+ Log.DebugFormat("Hashing [{0}] size [{1}]", path, size);\r
+\r
+ //TODO: Handle zero-length files\r
+ if (size == 0)\r
+ {\r
+ var buf = new byte[0];\r
+ using (var hasher = HashAlgorithm.Create(algorithm))\r
+ {\r
+ hashes[0] = hasher.ComputeHash(buf);\r
+ return hashes;\r
+ }\r
+ }\r
+\r
+ var blocks = size<blockSize?1:size/blockSize;\r
+\r
+ var actualHashers = parallelism > blocks ? (byte)blocks : parallelism;\r
+\r
+ var buffer = new byte[actualHashers][];\r
+ var hashers = new HashAlgorithm[actualHashers];\r
+ var bufferManager = GetBufferManager(blockSize, actualHashers);\r
+ for (var i = 0; i < actualHashers; i++)\r
+ {\r
+ buffer[i] = bufferManager.TakeBuffer(blockSize);// new byte[blockSize];\r
+ hashers[i] = HashAlgorithm.Create(algorithm); \r
+ }\r
+ try\r
+ {\r
+ var indices = new long[actualHashers];\r
+ var bufferCount = new int[actualHashers];\r
+\r
+ int read;\r
+ int bufIdx = 0;\r
+ long index = 0;\r
+\r
+ //long block = 0;\r
+\r
+ while ((read = await stream.ReadAsync(buffer[bufIdx], 0, blockSize).ConfigureAwait(false)) > 0)\r
+ {\r
+ index += read;\r
+ indices[bufIdx] = index;\r
+ bufferCount[bufIdx] = read;\r
+ //postAction(block++, buffer[bufIdx], read);\r
+\r
+ //If we have filled the last buffer or if we have read from the last block,\r
+                    //we can calculate the blocks in parallel
+ if (bufIdx == actualHashers - 1 || read < blockSize)\r
+ {\r
+ var options = new ParallelOptions {MaxDegreeOfParallelism = actualHashers,CancellationToken=token};\r
+\r
+ Parallel.For(0, bufIdx + 1, options,(idx,state) =>\r
+ {\r
+ //This code was added for compatibility with the way Pithos calculates the last hash\r
+ //We calculate the hash only up to the last non-null byte\r
+ options.CancellationToken.ThrowIfCancellationRequested();\r
+ var lastByteIndex = Array.FindLastIndex(buffer[idx],\r
+ bufferCount[idx] - 1,\r
+ aByte => aByte != 0);\r
+\r
+ var hasher = hashers[idx];\r
+\r
+ byte[] hash=hasher.ComputeHash(buffer[idx],0,lastByteIndex+1);\r
+/*\r
+ if (buffer[idx].Length == lastByteIndex || lastByteIndex==-1)\r
+ hash = hasher.Digest(buffer[idx]);\r
+ else\r
+ {\r
+ var buf=new byte[lastByteIndex+1];\r
+ Buffer.BlockCopy(buffer[idx],0,buf,0,lastByteIndex+1);\r
+ hash = hasher.Digest(buf);\r
+ }\r
+*/\r
+\r
+ \r
+ \r
+ var filePosition = indices[idx];\r
+ /*\r
+ Trace.TraceInformation("Hashed [{0}] [{1}/{2}] [{3:p}]", path,\r
+ filePosition, size,\r
+ (double)filePosition / size);\r
+ */\r
+ hashes[filePosition] = hash;\r
+ //Do not report for files smaller than 4MB\r
+ if (progress != null && stream.Length > 4*1024*1024)\r
+ progress.Report(new HashProgress((long)hashes.Count*blockSize,stream.Length));\r
+ });\r
+ }\r
+ bufIdx = (bufIdx +1)%actualHashers;\r
+ }\r
+ }\r
+ finally\r
+ {\r
+ for (var i = 0; i < actualHashers; i++)\r
+ {\r
+ if (hashers[i] != null)\r
+ hashers[i].Dispose();\r
+ bufferManager.ReturnBuffer(buffer[i]);\r
+ }\r
+\r
+ }\r
+\r
+ return hashes;\r
}\r
\r
static BlockHashAlgorithms()\r
{\r
+/*\r
CalculateBlockHash = CalculateBlockHashesRecursiveAsync;\r
+*/\r
}\r
}\r
}\r