2 /* -----------------------------------------------------------------------
\r
3 * <copyright file="BlockHashAlgorithms.cs" company="GRNet">
\r
5 * Copyright 2011-2012 GRNET S.A. All rights reserved.
\r
7 * Redistribution and use in source and binary forms, with or
\r
8 * without modification, are permitted provided that the following
\r
9 * conditions are met:
\r
11 * 1. Redistributions of source code must retain the above
\r
12 * copyright notice, this list of conditions and the following
\r
15 * 2. Redistributions in binary form must reproduce the above
\r
16 * copyright notice, this list of conditions and the following
\r
17 * disclaimer in the documentation and/or other materials
\r
18 * provided with the distribution.
\r
21 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
\r
22 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
\r
23 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
\r
24 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
\r
25 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
\r
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
\r
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
\r
28 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
\r
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
\r
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
\r
31 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
\r
32 * POSSIBILITY OF SUCH DAMAGE.
\r
34 * The views and conclusions contained in the software and
\r
35 * documentation are those of the authors and should not be
\r
36 * interpreted as representing official policies, either expressed
\r
37 * or implied, of GRNET S.A.
\r
39 * -----------------------------------------------------------------------
\r
42 using System.Collections.Concurrent;
\r
43 using System.Diagnostics.Contracts;
\r
45 using System.Reflection;
\r
46 using System.ServiceModel.Channels;
\r
47 using System.Threading;
\r
48 using System.Threading.Tasks;
\r
49 using System.Threading.Tasks.Dataflow;
\r
50 using OpenSSL.Crypto;
\r
52 namespace Pithos.Network
\r
57 /// TODO: Update summary.
\r
59 public static class BlockHashAlgorithms
\r
61 private static readonly log4net.ILog Log = log4net.LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
\r
64 public static Func<FileStream, int, string, ConcurrentDictionary<int, byte[]>, int, Task<ConcurrentDictionary<int, byte[]>>> CalculateBlockHash;
\r
66 public static Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesRecursiveAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0)
\r
69 throw new ArgumentNullException("stream");
\r
70 if (String.IsNullOrWhiteSpace(algorithm))
\r
71 throw new ArgumentNullException("algorithm");
\r
73 throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
\r
75 throw new ArgumentOutOfRangeException("index", "index must be a non-negative value");
\r
76 Contract.EndContractBlock();
\r
80 hashes = new ConcurrentDictionary<int, byte[]>();
\r
82 var buffer = new byte[blockSize];
\r
83 return stream.ReadAsync(buffer, 0, blockSize).ContinueWith(t =>
\r
85 var read = t.Result;
\r
87 var nextTask = read == blockSize
\r
88 ? CalculateBlockHashesRecursiveAsync(stream, blockSize, algorithm, hashes, index + 1)
\r
89 : Task.Factory.StartNew(() => hashes);
\r
91 using (var hasher = HashAlgorithm.Create(algorithm))
\r
93 //This code was added for compatibility with the way Pithos calculates the last hash
\r
94 //We calculate the hash only up to the last non-null byte
\r
95 var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);
\r
97 var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1);
\r
98 hashes[index] = hash;
\r
103 public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0)
\r
105 if (stream == null)
\r
106 throw new ArgumentNullException("stream");
\r
107 if (String.IsNullOrWhiteSpace(algorithm))
\r
108 throw new ArgumentNullException("algorithm");
\r
109 if (blockSize <= 0)
\r
110 throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
\r
111 Contract.EndContractBlock();
\r
114 if (hashes == null)
\r
115 hashes = new ConcurrentDictionary<int, byte[]>();
\r
117 var buffer = new byte[blockSize];
\r
119 while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)
\r
121 using (var hasher = HashAlgorithm.Create(algorithm))
\r
123 //This code was added for compatibility with the way Pithos calculates the last hash
\r
124 //We calculate the hash only up to the last non-null byte
\r
125 var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);
\r
127 var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1);
\r
128 hashes[index] = hash;
\r
135 public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAgentAsync(FileStream stream, int blockSize, string algorithm, int parallelism)
\r
137 if (stream == null)
\r
138 throw new ArgumentNullException("stream");
\r
139 if (String.IsNullOrWhiteSpace(algorithm))
\r
140 throw new ArgumentNullException("algorithm");
\r
141 if (blockSize <= 0)
\r
142 throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
\r
143 Contract.EndContractBlock();
\r
145 var hashes = new ConcurrentDictionary<int, byte[]>();
\r
147 var path = stream.Name;
\r
148 var size = stream.Length;
\r
149 Log.DebugFormat("Hashing [{0}] size [{1}]",path,size);
\r
151 var options = new ExecutionDataflowBlockOptions {BoundedCapacity = parallelism,MaxDegreeOfParallelism=parallelism};
\r
152 var hashBlock=new ActionBlock<Tuple<int,byte[]>>(input=>
\r
154 int idx = input.Item1;
\r
155 byte[] block = input.Item2;
\r
156 using (var hasher = HashAlgorithm.Create(algorithm))
\r
158 //This code was added for compatibility with the way Pithos calculates the last hash
\r
159 //We calculate the hash only up to the last non-null byte
\r
160 var lastByteIndex = Array.FindLastIndex(block, block.Length-1, aByte => aByte != 0);
\r
162 var hash = hasher.ComputeHash(block, 0, lastByteIndex + 1);
\r
163 hashes[idx] = hash;
\r
167 var buffer = new byte[blockSize];
\r
170 while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)
\r
172 var block = new byte[read];
\r
173 Buffer.BlockCopy(buffer, 0, block, 0, read);
\r
174 await hashBlock.SendAsync(Tuple.Create(index, block));
\r
179 hashBlock.Complete();
\r
180 await hashBlock.Completion;
\r
185 public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesInPlace(FileStream stream, int blockSize, string algorithm, int parallelism)
\r
187 if (stream == null)
\r
188 throw new ArgumentNullException("stream");
\r
189 if (String.IsNullOrWhiteSpace(algorithm))
\r
190 throw new ArgumentNullException("algorithm");
\r
191 if (blockSize <= 0)
\r
192 throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
\r
193 Contract.EndContractBlock();
\r
195 var hashes = new ConcurrentDictionary<int, byte[]>();
\r
197 var path = stream.Name;
\r
198 var size = stream.Length;
\r
199 Log.DebugFormat("Hashing [{0}] size [{1}]",path,size);
\r
202 var buffer = new byte[blockSize];
\r
204 using (var hasher = HashAlgorithm.Create(algorithm))
\r
207 while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)
\r
209 //This code was added for compatibility with the way Pithos calculates the last hash
\r
210 //We calculate the hash only up to the last non-null byte
\r
211 var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);
\r
213 var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1);
\r
214 Log.DebugFormat("Hashed [{0}] [{1}/{2}] [{3:p}]", path, index,size,(double)index/size);
\r
215 hashes[index] = hash;
\r
223 private static BufferManager _bufferMgr;
\r
225 private static BufferManager GetBufferManager(int blockSize,int parallelism)
\r
227 Interlocked.CompareExchange(ref _bufferMgr,
\r
228 BufferManager.CreateBufferManager(parallelism*blockSize, blockSize),
\r
233 //public static async Task<ConcurrentDictionary<long, byte[]>> CalculateBlockHashesInPlacePFor(FileStream stream, int blockSize, string algorithm, int parallelism, Action<long, byte[], int> postAction,CancellationToken token, IProgress<double> progress)
\r
234 public static async Task<ConcurrentDictionary<long, byte[]>> CalculateBlockHashesInPlacePFor(FileStream stream, int blockSize, string algorithm, byte parallelism, CancellationToken token, IProgress<HashProgress> progress)
\r
236 if (stream == null)
\r
237 throw new ArgumentNullException("stream");
\r
238 if (String.IsNullOrWhiteSpace(algorithm))
\r
239 throw new ArgumentNullException("algorithm");
\r
240 if (blockSize <= 0)
\r
241 throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
\r
242 Contract.EndContractBlock();
\r
244 var hashes = new ConcurrentDictionary<long, byte[]>();
\r
246 var path = stream.Name;
\r
247 var size = stream.Length;
\r
248 Log.DebugFormat("Hashing [{0}] size [{1}]", path, size);
\r
250 //TODO: Handle zero-length files
\r
253 var buf = new byte[0];
\r
254 using (var hasher = new MessageDigestContext(MessageDigest.CreateByName(algorithm)))
\r
257 hashes[0] = hasher.Digest(buf);
\r
262 var blocks = size<blockSize?1:size/blockSize;
\r
264 var actualHashers = parallelism > blocks ? (byte)blocks : parallelism;
\r
266 var buffer = new byte[actualHashers][];
\r
267 var hashers = new MessageDigestContext[actualHashers];
\r
268 var bufferManager = GetBufferManager(blockSize, actualHashers);
\r
269 for (var i = 0; i < actualHashers; i++)
\r
271 buffer[i] = bufferManager.TakeBuffer(blockSize);// new byte[blockSize];
\r
272 hashers[i] = new MessageDigestContext(MessageDigest.CreateByName(algorithm));
\r
277 var indices = new long[actualHashers];
\r
278 var bufferCount = new int[actualHashers];
\r
286 while ((read = await stream.ReadAsync(buffer[bufIdx], 0, blockSize).ConfigureAwait(false)) > 0)
\r
289 indices[bufIdx] = index;
\r
290 bufferCount[bufIdx] = read;
\r
291 //postAction(block++, buffer[bufIdx], read);
\r
293 //If we have filled the last buffer or if we have read from the last block,
\r
294 //we can calculate the clocks in parallel
\r
295 if (bufIdx == actualHashers - 1 || read < blockSize)
\r
297 var options = new ParallelOptions {MaxDegreeOfParallelism = actualHashers,CancellationToken=token};
\r
299 Parallel.For(0, bufIdx + 1, options,(idx,state) =>
\r
301 //This code was added for compatibility with the way Pithos calculates the last hash
\r
302 //We calculate the hash only up to the last non-null byte
\r
303 options.CancellationToken.ThrowIfCancellationRequested();
\r
304 var lastByteIndex = Array.FindLastIndex(buffer[idx],
\r
305 bufferCount[idx] - 1,
\r
306 aByte => aByte != 0);
\r
308 var hasher = hashers[idx];
\r
311 if (buffer[idx].Length == lastByteIndex || lastByteIndex==-1)
\r
312 hash = hasher.Digest(buffer[idx]);
\r
315 var buf=new byte[lastByteIndex+1];
\r
316 Buffer.BlockCopy(buffer[idx],0,buf,0,lastByteIndex+1);
\r
317 hash = hasher.Digest(buf);
\r
322 var filePosition = indices[idx];
\r
324 Trace.TraceInformation("Hashed [{0}] [{1}/{2}] [{3:p}]", path,
\r
325 filePosition, size,
\r
326 (double)filePosition / size);
\r
328 hashes[filePosition] = hash;
\r
329 //Do not report for files smaller than 4MB
\r
330 if (progress != null && stream.Length > 4*1024*1024)
\r
331 progress.Report(new HashProgress((long)hashes.Count*blockSize,stream.Length));
\r
334 bufIdx = (bufIdx +1)%actualHashers;
\r
339 for (var i = 0; i < actualHashers; i++)
\r
341 if (hashers[i] != null)
\r
342 hashers[i].Dispose();
\r
343 bufferManager.ReturnBuffer(buffer[i]);
\r
351 static BlockHashAlgorithms()
\r
354 CalculateBlockHash = CalculateBlockHashesRecursiveAsync;
\r