2 /* -----------------------------------------------------------------------
\r
3 * <copyright file="BlockHashAlgorithms.cs" company="GRNet">
\r
5 * Copyright 2011-2012 GRNET S.A. All rights reserved.
\r
7 * Redistribution and use in source and binary forms, with or
\r
8 * without modification, are permitted provided that the following
\r
9 * conditions are met:
\r
11 * 1. Redistributions of source code must retain the above
\r
12 * copyright notice, this list of conditions and the following
\r
15 * 2. Redistributions in binary form must reproduce the above
\r
16 * copyright notice, this list of conditions and the following
\r
17 * disclaimer in the documentation and/or other materials
\r
18 * provided with the distribution.
\r
21 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
\r
22 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
\r
23 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
\r
24 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
\r
25 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
\r
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
\r
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
\r
28 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
\r
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
\r
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
\r
31 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
\r
32 * POSSIBILITY OF SUCH DAMAGE.
\r
34 * The views and conclusions contained in the software and
\r
35 * documentation are those of the authors and should not be
\r
36 * interpreted as representing official policies, either expressed
\r
37 * or implied, of GRNET S.A.
\r
39 * -----------------------------------------------------------------------
\r
42 using System.Collections.Concurrent;
\r
43 using System.Diagnostics.Contracts;
\r
45 using System.Reflection;
\r
46 using System.Security.Cryptography;
\r
47 using System.ServiceModel.Channels;
\r
48 using System.Threading;
\r
49 using System.Threading.Tasks;
\r
50 using System.Threading.Tasks.Dataflow;
\r
52 namespace Pithos.Network
\r
57 /// TODO: Update summary.
\r
59 public static class BlockHashAlgorithms
\r
61 private static readonly log4net.ILog Log = log4net.LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
\r
64 public static Func<FileStream, int, string, ConcurrentDictionary<int, byte[]>, int, Task<ConcurrentDictionary<int, byte[]>>> CalculateBlockHash;
\r
66 public static Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesRecursiveAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0)
\r
69 throw new ArgumentNullException("stream");
\r
70 if (String.IsNullOrWhiteSpace(algorithm))
\r
71 throw new ArgumentNullException("algorithm");
\r
73 throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
\r
75 throw new ArgumentOutOfRangeException("index", "index must be a non-negative value");
\r
76 Contract.EndContractBlock();
\r
80 hashes = new ConcurrentDictionary<int, byte[]>();
\r
82 var buffer = new byte[blockSize];
\r
83 return stream.ReadAsync(buffer, 0, blockSize).ContinueWith(t =>
\r
85 var read = t.Result;
\r
87 var nextTask = read == blockSize
\r
88 ? CalculateBlockHashesRecursiveAsync(stream, blockSize, algorithm, hashes, index + 1)
\r
89 : Task.Factory.StartNew(() => hashes);
\r
91 using (var hasher = HashAlgorithm.Create(algorithm))
\r
93 //This code was added for compatibility with the way Pithos calculates the last hash
\r
94 //We calculate the hash only up to the last non-null byte
\r
95 var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);
\r
97 var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1);
\r
98 hashes[index] = hash;
\r
103 public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0)
\r
105 if (stream == null)
\r
106 throw new ArgumentNullException("stream");
\r
107 if (String.IsNullOrWhiteSpace(algorithm))
\r
108 throw new ArgumentNullException("algorithm");
\r
109 if (blockSize <= 0)
\r
110 throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
\r
111 Contract.EndContractBlock();
\r
114 if (hashes == null)
\r
115 hashes = new ConcurrentDictionary<int, byte[]>();
\r
117 var buffer = new byte[blockSize];
\r
119 while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)
\r
121 using (var hasher = HashAlgorithm.Create(algorithm))
\r
123 //This code was added for compatibility with the way Pithos calculates the last hash
\r
124 //We calculate the hash only up to the last non-null byte
\r
125 var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);
\r
127 var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1);
\r
128 hashes[index] = hash;
\r
135 public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAgentAsync(FileStream stream, int blockSize, string algorithm, int parallelism)
\r
137 if (stream == null)
\r
138 throw new ArgumentNullException("stream");
\r
139 if (String.IsNullOrWhiteSpace(algorithm))
\r
140 throw new ArgumentNullException("algorithm");
\r
141 if (blockSize <= 0)
\r
142 throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
\r
143 Contract.EndContractBlock();
\r
145 var hashes = new ConcurrentDictionary<int, byte[]>();
\r
147 var path = stream.Name;
\r
148 var size = stream.Length;
\r
149 Log.DebugFormat("Hashing [{0}] size [{1}]",path,size);
\r
151 var options = new ExecutionDataflowBlockOptions {BoundedCapacity = parallelism,MaxDegreeOfParallelism=parallelism};
\r
152 var hashBlock=new ActionBlock<Tuple<int,byte[]>>(input=>
\r
154 int idx = input.Item1;
\r
155 byte[] block = input.Item2;
\r
156 using (var hasher = HashAlgorithm.Create(algorithm))
\r
158 //This code was added for compatibility with the way Pithos calculates the last hash
\r
159 //We calculate the hash only up to the last non-null byte
\r
160 var lastByteIndex = Array.FindLastIndex(block, block.Length-1, aByte => aByte != 0);
\r
162 var hash = hasher.ComputeHash(block, 0, lastByteIndex + 1);
\r
163 hashes[idx] = hash;
\r
167 var buffer = new byte[blockSize];
\r
170 while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)
\r
172 var block = new byte[read];
\r
173 Buffer.BlockCopy(buffer, 0, block, 0, read);
\r
174 await hashBlock.SendAsync(Tuple.Create(index, block));
\r
179 hashBlock.Complete();
\r
180 await hashBlock.Completion;
\r
185 public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesInPlace(FileStream stream, int blockSize, string algorithm, int parallelism)
\r
187 if (stream == null)
\r
188 throw new ArgumentNullException("stream");
\r
189 if (String.IsNullOrWhiteSpace(algorithm))
\r
190 throw new ArgumentNullException("algorithm");
\r
191 if (blockSize <= 0)
\r
192 throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
\r
193 Contract.EndContractBlock();
\r
195 var hashes = new ConcurrentDictionary<int, byte[]>();
\r
197 var path = stream.Name;
\r
198 var size = stream.Length;
\r
199 Log.DebugFormat("Hashing [{0}] size [{1}]",path,size);
\r
202 var buffer = new byte[blockSize];
\r
204 using (var hasher = HashAlgorithm.Create(algorithm))
\r
207 while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)
\r
209 //This code was added for compatibility with the way Pithos calculates the last hash
\r
210 //We calculate the hash only up to the last non-null byte
\r
211 var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);
\r
213 var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1);
\r
214 Log.DebugFormat("Hashed [{0}] [{1}/{2}] [{3:p}]", path, index,size,(double)index/size);
\r
215 hashes[index] = hash;
\r
223 private static BufferManager _bufferMgr;
\r
225 private static BufferManager GetBufferManager(int blockSize,int parallelism)
\r
227 Interlocked.CompareExchange(ref _bufferMgr,
\r
228 BufferManager.CreateBufferManager(parallelism*blockSize, blockSize),
\r
233 public static async Task<ConcurrentDictionary<long, byte[]>> CalculateBlockHashesInPlacePFor(FileStream stream, int blockSize, string algorithm, int parallelism,IProgress<double> progress )
\r
235 if (stream == null)
\r
236 throw new ArgumentNullException("stream");
\r
237 if (String.IsNullOrWhiteSpace(algorithm))
\r
238 throw new ArgumentNullException("algorithm");
\r
239 if (blockSize <= 0)
\r
240 throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
\r
241 Contract.EndContractBlock();
\r
243 var hashes = new ConcurrentDictionary<long, byte[]>();
\r
245 var path = stream.Name;
\r
246 var size = stream.Length;
\r
247 Log.DebugFormat("Hashing [{0}] size [{1}]", path, size);
\r
249 //TODO: Handle zero-length files
\r
252 var buf = new byte[0];
\r
253 var hasher=HashAlgorithm.Create(algorithm);
\r
254 hashes[0]=hasher.ComputeHash(buf);
\r
259 var buffer = new byte[parallelism][];
\r
260 var hashers = new HashAlgorithm[parallelism];
\r
261 var bufferManager = GetBufferManager(blockSize, parallelism);
\r
262 for (var i = 0; i < parallelism; i++)
\r
264 buffer[i] = bufferManager.TakeBuffer(blockSize);// new byte[blockSize];
\r
265 hashers[i] = HashAlgorithm.Create(algorithm);
\r
269 var indices = new long[parallelism];
\r
270 var bufferCount = new int[parallelism];
\r
277 while ((read = await stream.ReadAsync(buffer[bufIdx], 0, blockSize).ConfigureAwait(false)) > 0)
\r
280 indices[bufIdx] = index;
\r
281 bufferCount[bufIdx] = read;
\r
282 //If we have filled the last buffer or if we have read from the last block,
\r
283 //we can calculate the clocks in parallel
\r
284 if (bufIdx == parallelism - 1 || read < blockSize)
\r
286 //var options = new ParallelOptions {MaxDegreeOfParallelism = parallelism};
\r
287 Parallel.For(0, bufIdx + 1, idx =>
\r
289 //This code was added for compatibility with the way Pithos calculates the last hash
\r
290 //We calculate the hash only up to the last non-null byte
\r
291 var lastByteIndex = Array.FindLastIndex(buffer[idx],
\r
292 bufferCount[idx] - 1,
\r
293 aByte => aByte != 0);
\r
295 var hasher = hashers[idx];
\r
296 var hash = hasher.ComputeHash(buffer[idx], 0,
\r
297 lastByteIndex + 1);
\r
298 var filePosition = indices[idx];
\r
300 Trace.TraceInformation("Hashed [{0}] [{1}/{2}] [{3:p}]", path,
\r
301 filePosition, size,
\r
302 (double)filePosition / size);
\r
304 hashes[filePosition] = hash;
\r
305 progress.Report((long)hashes.Count*blockSize*1.0/stream.Length);
\r
308 bufIdx = (bufIdx + 1)%parallelism;
\r
313 for (var i = 0; i < parallelism; i++)
\r
315 if (hashers[i] != null)
\r
316 hashers[i].Dispose();
\r
317 bufferManager.ReturnBuffer(buffer[i]);
\r
325 static BlockHashAlgorithms()
\r
328 CalculateBlockHash = CalculateBlockHashesRecursiveAsync;
\r