2 /* -----------------------------------------------------------------------
\r
3 * <copyright file="BlockHashAlgorithms.cs" company="GRNet">
\r
5 * Copyright 2011-2012 GRNET S.A. All rights reserved.
\r
7 * Redistribution and use in source and binary forms, with or
\r
8 * without modification, are permitted provided that the following
\r
9 * conditions are met:
\r
11 * 1. Redistributions of source code must retain the above
\r
12 * copyright notice, this list of conditions and the following
\r
15 * 2. Redistributions in binary form must reproduce the above
\r
16 * copyright notice, this list of conditions and the following
\r
17 * disclaimer in the documentation and/or other materials
\r
18 * provided with the distribution.
\r
21 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
\r
22 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
\r
23 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
\r
24 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
\r
25 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
\r
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
\r
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
\r
28 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
\r
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
\r
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
\r
31 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
\r
32 * POSSIBILITY OF SUCH DAMAGE.
\r
34 * The views and conclusions contained in the software and
\r
35 * documentation are those of the authors and should not be
\r
36 * interpreted as representing official policies, either expressed
\r
37 * or implied, of GRNET S.A.
\r
39 * -----------------------------------------------------------------------
\r
42 using System.Collections.Concurrent;
\r
43 using System.Diagnostics.Contracts;
\r
45 using System.Reflection;
\r
46 using System.Security.Cryptography;
\r
47 using System.ServiceModel.Channels;
\r
48 using System.Threading;
\r
49 using System.Threading.Tasks;
\r
50 using System.Threading.Tasks.Dataflow;
\r
53 namespace Pithos.Network
\r
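58 /// Static helpers that read a file stream in fixed-size blocks and compute one hash per block, offered in several sequential and parallel variants.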
\r
60 public static class BlockHashAlgorithms
\r
62 private static readonly log4net.ILog Log = log4net.LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
\r
65 public static Func<FileStream, int, string, ConcurrentDictionary<int, byte[]>, int, Task<ConcurrentDictionary<int, byte[]>>> CalculateBlockHash;
\r
/// <summary>
/// Hashes one block of the stream and recurses for the next one. A buffer of
/// <paramref name="blockSize"/> bytes is read, its hash is stored in <paramref name="hashes"/>
/// under <paramref name="index"/>, and the method calls itself with <c>index + 1</c> whenever
/// the read filled the whole buffer.
/// </summary>
/// <param name="stream">File stream to read from, starting at its current position.</param>
/// <param name="blockSize">Number of bytes requested per read; also the size of the buffer.</param>
/// <param name="algorithm">Name handed to <c>HashAlgorithm.Create</c>; null, empty or whitespace is rejected.</param>
/// <param name="hashes">Dictionary receiving one hash per block. Defaults to null; the body assigns a new dictionary.</param>
/// <param name="index">Key under which this call stores its hash. Defaults to 0.</param>
/// <returns>A task for the dictionary of hashes; after a short read the follow-up task simply yields <paramref name="hashes"/>.</returns>
/// <exception cref="ArgumentNullException">Thrown for <paramref name="stream"/> and for a blank <paramref name="algorithm"/>.</exception>
/// <exception cref="ArgumentOutOfRangeException">Thrown for <paramref name="blockSize"/> and <paramref name="index"/>.</exception>
67 public static Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesRecursiveAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0)
\r
// NOTE(review): this extract omits several original lines (braces and the conditions guarding
// the stream, blockSize and index throws). The comments here describe only what is visible.
70 throw new ArgumentNullException("stream");
\r
71 if (String.IsNullOrWhiteSpace(algorithm))
\r
72 throw new ArgumentNullException("algorithm");
\r
// Guard not visible; the message states blockSize must be greater than zero.
74 throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
\r
// Guard not visible; the message states index must be non-negative.
76 throw new ArgumentOutOfRangeException("index", "index must be a non-negative value");
\r
77 Contract.EndContractBlock();
\r
// The condition for this assignment is not visible; presumably it runs only when the caller passed no dictionary.
81 hashes = new ConcurrentDictionary<int, byte[]>();
\r
// Every call allocates its own buffer, so the recursive call started below never reads into this one.
83 var buffer = new byte[blockSize];
\r
84 return stream.ReadAsync(buffer, 0, blockSize).ContinueWith(t =>
\r
86 var read = t.Result;
\r
// A full buffer means there may be more data, so the next block's read is started before this
// block is hashed. Anything shorter is treated as the end of the stream.
88 var nextTask = read == blockSize
\r
89 ? CalculateBlockHashesRecursiveAsync(stream, blockSize, algorithm, hashes, index + 1)
\r
90 : Task.Factory.StartNew(() => hashes);
\r
// NOTE(review): original line 91 is missing from this extract; it may hold a guard such as a check on read.
92 using (var hasher = HashAlgorithm.Create(algorithm))
\r
94 //This code was added for compatibility with the way Pithos calculates the last hash
\r
95 //We calculate the hash only up to the last non-null byte
\r
// Searches backwards from the last byte read for a non-zero byte; trailing zero bytes are left out of the hash.
// NOTE(review): when read is 0 the start index is -1 - confirm FindLastIndex accepts that for a non-empty array.
96 var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);
\r
// lastByteIndex is -1 when no non-zero byte was found, which makes the hashed length 0.
98 var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1);
\r
99 hashes[index] = hash;
\r
/// <summary>
/// Reads the stream sequentially in chunks of <paramref name="blockSize"/> bytes and stores one
/// hash per chunk in <paramref name="hashes"/>, looping until a read returns no bytes.
/// </summary>
/// <param name="stream">File stream to read from; must not be null.</param>
/// <param name="blockSize">Number of bytes requested per read; must be greater than zero.</param>
/// <param name="algorithm">Name handed to <c>HashAlgorithm.Create</c>; must not be null, empty or whitespace.</param>
/// <param name="hashes">Dictionary receiving the hashes; a new one is created when null.</param>
/// <param name="index">Key used for the first stored hash. Defaults to 0.</param>
/// <returns>A task for the dictionary of hashes (the return statement itself is not visible in this extract).</returns>
/// <exception cref="ArgumentNullException"><paramref name="stream"/> is null or <paramref name="algorithm"/> is blank.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="blockSize"/> is zero or negative.</exception>
104 public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0)
\r
106 if (stream == null)
\r
107 throw new ArgumentNullException("stream");
\r
108 if (String.IsNullOrWhiteSpace(algorithm))
\r
109 throw new ArgumentNullException("algorithm");
\r
110 if (blockSize <= 0)
\r
111 throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
\r
// Unlike CalculateBlockHashesRecursiveAsync, no range check on index appears before the contract block ends.
112 Contract.EndContractBlock();
\r
115 if (hashes == null)
\r
116 hashes = new ConcurrentDictionary<int, byte[]>();
\r
// A single buffer is reused for every read.
118 var buffer = new byte[blockSize];
\r
// The loop ends when ReadAsync returns 0 (or less); the declaration of read is not visible in this extract.
120 while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)
\r
// A new hasher is created and disposed for each block.
122 using (var hasher = HashAlgorithm.Create(algorithm))
\r
124 //This code was added for compatibility with the way Pithos calculates the last hash
\r
125 //We calculate the hash only up to the last non-null byte
\r
// Only the bytes actually read are searched, so stale data beyond read - 1 from an earlier, longer read is ignored.
126 var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);
\r
// lastByteIndex is -1 for an all-zero block, which makes the hashed length 0.
128 var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1);
\r
// NOTE(review): the statement that advances index between iterations is missing from this extract;
// confirm whether the key is a block number or a byte offset.
129 hashes[index] = hash;
\r
/// <summary>
/// Reads the stream in chunks of <paramref name="blockSize"/> bytes and hands a private copy of
/// each chunk to a dataflow <c>ActionBlock</c> that hashes it and stores the result in a
/// dictionary. Waits for the block to finish after the last chunk has been sent.
/// </summary>
/// <param name="stream">File stream to read from; must not be null.</param>
/// <param name="blockSize">Number of bytes requested per read; must be greater than zero.</param>
/// <param name="algorithm">Name handed to <c>HashAlgorithm.Create</c>; must not be null, empty or whitespace.</param>
/// <param name="parallelism">Used for both BoundedCapacity and MaxDegreeOfParallelism of the dataflow options. No visible check validates it.</param>
/// <returns>A task for the dictionary of hashes (the return statement itself is not visible in this extract).</returns>
/// <exception cref="ArgumentNullException"><paramref name="stream"/> is null or <paramref name="algorithm"/> is blank.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="blockSize"/> is zero or negative.</exception>
136 public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAgentAsync(FileStream stream, int blockSize, string algorithm, int parallelism)
\r
138 if (stream == null)
\r
139 throw new ArgumentNullException("stream");
\r
140 if (String.IsNullOrWhiteSpace(algorithm))
\r
141 throw new ArgumentNullException("algorithm");
\r
142 if (blockSize <= 0)
\r
143 throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
\r
144 Contract.EndContractBlock();
\r
// Written to by the hashing delegate below, which may run on several threads at once.
146 var hashes = new ConcurrentDictionary<int, byte[]>();
\r
148 var path = stream.Name;
\r
149 var size = stream.Length;
\r
150 Log.DebugFormat("Hashing [{0}] size [{1}]",path,size);
\r
// NOTE(review): the line that closes the ActionBlock constructor is missing from this extract;
// presumably options is passed there - confirm.
152 var options = new ExecutionDataflowBlockOptions {BoundedCapacity = parallelism,MaxDegreeOfParallelism=parallelism};
\r
// Each message is a pair: the key to store the hash under and the bytes of one block.
153 var hashBlock=new ActionBlock<Tuple<int,byte[]>>(input=>
\r
155 int idx = input.Item1;
\r
156 byte[] block = input.Item2;
\r
// One hasher per message, so concurrent invocations never share a HashAlgorithm instance.
157 using (var hasher = HashAlgorithm.Create(algorithm))
\r
159 //This code was added for compatibility with the way Pithos calculates the last hash
\r
160 //We calculate the hash only up to the last non-null byte
\r
// The block array is sized to exactly the bytes read (see the copy in the read loop), so the search starts at its last element.
161 var lastByteIndex = Array.FindLastIndex(block, block.Length-1, aByte => aByte != 0);
\r
// lastByteIndex is -1 for an all-zero block, which makes the hashed length 0.
163 var hash = hasher.ComputeHash(block, 0, lastByteIndex + 1);
\r
164 hashes[idx] = hash;
\r
// Read buffer reused across iterations; its contents are copied out before being sent to the block.
168 var buffer = new byte[blockSize];
\r
// The declarations of read and index are not visible in this extract.
171 while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)
\r
// Copy only the bytes actually read so the next read can overwrite buffer while the copy is still being hashed.
173 var block = new byte[read];
\r
174 Buffer.BlockCopy(buffer, 0, block, 0, read);
\r
// The send is awaited before the next read is issued.
// NOTE(review): the statement that advances index is missing from this extract.
175 await hashBlock.SendAsync(Tuple.Create(index, block));
\r
// Signal that no more blocks will arrive, then wait for the queued ones to be hashed.
180 hashBlock.Complete();
\r
181 await hashBlock.Completion;
\r
/// <summary>
/// Reads the stream sequentially in chunks of <paramref name="blockSize"/> bytes and hashes each
/// chunk with a single hasher instance that is reused for the whole stream, logging progress
/// after every block.
/// </summary>
/// <param name="stream">File stream to read from; must not be null.</param>
/// <param name="blockSize">Number of bytes requested per read; must be greater than zero.</param>
/// <param name="algorithm">Name handed to <c>HashAlgorithm.Create</c>; must not be null, empty or whitespace.</param>
/// <param name="parallelism">Not referenced by any line visible in this extract.</param>
/// <returns>A task for the dictionary of hashes (the return statement itself is not visible in this extract).</returns>
/// <exception cref="ArgumentNullException"><paramref name="stream"/> is null or <paramref name="algorithm"/> is blank.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="blockSize"/> is zero or negative.</exception>
186 public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesInPlace(FileStream stream, int blockSize, string algorithm, int parallelism)
\r
188 if (stream == null)
\r
189 throw new ArgumentNullException("stream");
\r
190 if (String.IsNullOrWhiteSpace(algorithm))
\r
191 throw new ArgumentNullException("algorithm");
\r
192 if (blockSize <= 0)
\r
193 throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
\r
194 Contract.EndContractBlock();
\r
196 var hashes = new ConcurrentDictionary<int, byte[]>();
\r
198 var path = stream.Name;
\r
199 var size = stream.Length;
\r
200 Log.DebugFormat("Hashing [{0}] size [{1}]",path,size);
\r
// A single buffer is reused for every read.
203 var buffer = new byte[blockSize];
\r
// The hasher is created once, outside the read loop, and reused for every block.
205 using (var hasher = HashAlgorithm.Create(algorithm))
\r
// The declarations of read and index are not visible in this extract.
208 while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)
\r
210 //This code was added for compatibility with the way Pithos calculates the last hash
\r
211 //We calculate the hash only up to the last non-null byte
\r
// Only the bytes actually read are searched, so stale data from an earlier, longer read is ignored.
212 var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);
\r
// lastByteIndex is -1 for an all-zero block, which makes the hashed length 0.
214 var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1);
\r
// The log reports index against the file size, which suggests index is a byte offset - TODO confirm
// against the (missing) statement that advances it.
215 Log.DebugFormat("Hashed [{0}] [{1}/{2}] [{3:p}]", path, index,size,(double)index/size);
\r
216 hashes[index] = hash;
\r
/// <summary>
/// Buffer pool shared by all callers of <see cref="GetBufferManager"/>. It is created lazily
/// on the first call and is never replaced afterwards.
/// </summary>
private static BufferManager _bufferMgr;

/// <summary>
/// Returns the shared <see cref="BufferManager"/>, creating it on first use.
/// </summary>
/// <param name="blockSize">Maximum size in bytes of a single pooled buffer.</param>
/// <param name="parallelism">Number of block-sized buffers the pool should be able to hold.</param>
/// <returns>The shared buffer manager instance.</returns>
/// <remarks>
/// Only the arguments of the call that actually creates the pool determine its size; every
/// later call receives the existing instance regardless of the values it passes.
/// </remarks>
private static BufferManager GetBufferManager(int blockSize, int parallelism)
{
    // Fast path: once the pool exists, do not build a throw-away manager just to
    // have CompareExchange discard it.
    var existing = Volatile.Read(ref _bufferMgr);
    if (existing != null)
    {
        return existing;
    }

    // Widen before multiplying so a large block size cannot overflow Int32; the
    // pool-size parameter of CreateBufferManager is a long.
    var created = BufferManager.CreateBufferManager((long)parallelism * blockSize, blockSize);

    // Publish only if no other thread got there first. CompareExchange returns the previous
    // value: null means our instance was stored, anything else is the winner's instance.
    return Interlocked.CompareExchange(ref _bufferMgr, created, null) ?? created;
}
\r
234 //public static async Task<ConcurrentDictionary<long, byte[]>> CalculateBlockHashesInPlacePFor(FileStream stream, int blockSize, string algorithm, int parallelism, Action<long, byte[], int> postAction,CancellationToken token, IProgress<double> progress)
\r
/// <summary>
/// Hashes a file in batches. Up to <c>actualHashers</c> consecutive blocks are read into separate
/// pooled buffers; once the last buffer is filled or a short read is seen, the filled buffers are
/// hashed together with <c>Parallel.For</c>. Each hash is stored in the result dictionary under
/// the index value recorded when its block was read.
/// </summary>
/// <param name="stream">File stream to read from; must not be null.</param>
/// <param name="blockSize">Number of bytes requested per read and requested per pooled buffer; must be greater than zero.</param>
/// <param name="algorithm">Name handed to <c>HashAlgorithm.Create</c>; must not be null, empty or whitespace.</param>
/// <param name="parallelism">Upper bound on the number of buffers and hashers; capped at the number of blocks in the file.</param>
/// <param name="token">Cancellation token given to <c>Parallel.For</c> and checked at the start of every parallel iteration.</param>
/// <param name="progress">Optional progress sink; reports are sent only for streams longer than 4 MB.</param>
/// <returns>A task for the dictionary of hashes (the return statement itself is not visible in this extract).</returns>
/// <exception cref="ArgumentNullException"><paramref name="stream"/> is null or <paramref name="algorithm"/> is blank.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="blockSize"/> is zero or negative.</exception>
235 public static async Task<ConcurrentDictionary<long, byte[]>> CalculateBlockHashesInPlacePFor(FileStream stream, int blockSize, string algorithm, byte parallelism, CancellationToken token, IProgress<HashProgress> progress)
\r
237 if (stream == null)
\r
238 throw new ArgumentNullException("stream");
\r
239 if (String.IsNullOrWhiteSpace(algorithm))
\r
240 throw new ArgumentNullException("algorithm");
\r
241 if (blockSize <= 0)
\r
242 throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
\r
// NOTE(review): no visible check rejects a parallelism of 0, which would make actualHashers 0 below.
243 Contract.EndContractBlock();
\r
245 var hashes = new ConcurrentDictionary<long, byte[]>();
\r
247 var path = stream.Name;
\r
248 var size = stream.Length;
\r
249 Log.DebugFormat("Hashing [{0}] size [{1}]", path, size);
\r
251 //TODO: Handle zero-length files
\r
// Stores the hash of an empty array under key 0. The condition guarding this section and any
// early return are missing from this extract - presumably it runs only when size is 0.
254 var buf = new byte[0];
\r
255 using (var hasher = HashAlgorithm.Create(algorithm))
\r
257 hashes[0] = hasher.ComputeHash(buf);
\r
// Integer division: a partial trailing block is not counted, and a file shorter than one block counts as one.
262 var blocks = size<blockSize?1:size/blockSize;
\r
// The cast happens only when blocks is smaller than parallelism (a byte), so it cannot truncate.
264 var actualHashers = parallelism > blocks ? (byte)blocks : parallelism;
\r
// One buffer and one hasher per slot, so parallel iterations never share either.
266 var buffer = new byte[actualHashers][];
\r
267 var hashers = new HashAlgorithm[actualHashers];
\r
268 var bufferManager = GetBufferManager(blockSize, actualHashers);
\r
269 for (var i = 0; i < actualHashers; i++)
\r
271 buffer[i] = bufferManager.TakeBuffer(blockSize);// new byte[blockSize];
\r
272 hashers[i] = HashAlgorithm.Create(algorithm);
\r
// Per-slot bookkeeping: the dictionary key for the block held in the slot and how many bytes were read into it.
276 var indices = new long[actualHashers];
\r
277 var bufferCount = new int[actualHashers];
\r
// The declarations of read, index and bufIdx are missing from this extract.
// ConfigureAwait(false): the continuation does not need to resume on the captured context.
285 while ((read = await stream.ReadAsync(buffer[bufIdx], 0, blockSize).ConfigureAwait(false)) > 0)
\r
// NOTE(review): the statement that advances index is missing; the later name filePosition suggests a byte offset - confirm.
288 indices[bufIdx] = index;
\r
289 bufferCount[bufIdx] = read;
\r
290 //postAction(block++, buffer[bufIdx], read);
\r
292 //If we have filled the last buffer or if we have read from the last block,
\r
293 //we can calculate the blocks in parallel
\r
294 if (bufIdx == actualHashers - 1 || read < blockSize)
\r
296 var options = new ParallelOptions {MaxDegreeOfParallelism = actualHashers,CancellationToken=token};
\r
// Only slots 0..bufIdx were filled in this round, so only those are hashed.
298 Parallel.For(0, bufIdx + 1, options,(idx,state) =>
\r
300 //This code was added for compatibility with the way Pithos calculates the last hash
\r
301 //We calculate the hash only up to the last non-null byte
\r
302 options.CancellationToken.ThrowIfCancellationRequested();
\r
// Searches backwards from the last byte read into this slot; bytes beyond bufferCount[idx] are ignored.
303 var lastByteIndex = Array.FindLastIndex(buffer[idx],
\r
304 bufferCount[idx] - 1,
\r
305 aByte => aByte != 0);
\r
307 var hasher = hashers[idx];
\r
// NOTE(review): three assignments to hash are visible below (ComputeHash, Digest on the whole buffer,
// Digest on a trimmed copy). The lines that decide which of them are live (else branch, braces, possibly a
// block comment) are missing from this extract. Digest is presumably an extension method declared elsewhere - confirm.
309 byte[] hash=hasher.ComputeHash(buffer[idx],0,lastByteIndex+1);
\r
// NOTE(review): lastByteIndex is at most bufferCount[idx] - 1, so the Length == lastByteIndex comparison looks
// unreachable; possibly Length == lastByteIndex + 1 was intended - confirm.
311 if (buffer[idx].Length == lastByteIndex || lastByteIndex==-1)
\r
312 hash = hasher.Digest(buffer[idx]);
\r
// Copies the bytes up to and including the last non-zero byte into an exactly-sized array before hashing.
315 var buf=new byte[lastByteIndex+1];
\r
316 Buffer.BlockCopy(buffer[idx],0,buf,0,lastByteIndex+1);
\r
317 hash = hasher.Digest(buf);
\r
323 var filePosition = indices[idx];
\r
// This variant traces through System.Diagnostics.Trace rather than the log4net Log field.
325 Trace.TraceInformation("Hashed [{0}] [{1}/{2}] [{3:p}]", path,
\r
326 filePosition, size,
\r
327 (double)filePosition / size);
\r
329 hashes[filePosition] = hash;
\r
330 //Do not report for files smaller than 4MB
\r
331 if (progress != null && stream.Length > 4*1024*1024)
\r
// Progress is estimated as the number of hashes stored so far times the block size, against the stream length.
332 progress.Report(new HashProgress((long)hashes.Count*blockSize,stream.Length));
\r
// Move to the next slot, wrapping to slot 0 after the last one.
335 bufIdx = (bufIdx +1)%actualHashers;
\r
// Cleanup: dispose every hasher and hand every buffer back to the pool.
// NOTE(review): whether this loop sits in a finally block is not visible in this extract.
340 for (var i = 0; i < actualHashers; i++)
\r
342 if (hashers[i] != null)
\r
343 hashers[i].Dispose();
\r
344 bufferManager.ReturnBuffer(buffer[i]);
\r
/// <summary>
/// Type initializer: points the public <c>CalculateBlockHash</c> delegate at
/// <c>CalculateBlockHashesRecursiveAsync</c>.
/// </summary>
352 static BlockHashAlgorithms()
\r
// The delegate is a public, non-readonly static field, so callers can reassign it after this runs.
355 CalculateBlockHash = CalculateBlockHashesRecursiveAsync;
\r