#region
/* -----------------------------------------------------------------------
 *
 *
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 *   1. Redistributions of source code must retain the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer.
 *
 *   2. Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials
 *      provided with the distribution.
 *
 *
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 * The views and conclusions contained in the software and
 * documentation are those of the authors and should not be
 * interpreted as representing official policies, either expressed
 * or implied, of GRNET S.A.
 *
 * -----------------------------------------------------------------------
 */
#endregion
using System.Collections.Concurrent;
using System.Diagnostics.Contracts;
using System.IO;
using System.Reflection;
using System.ServiceModel.Channels;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using OpenSSL.Crypto;

namespace Pithos.Network
{
    using System;

    /// <summary>
    /// Computes per-block hashes of a file. Trailing zero bytes of each block
    /// are excluded from the hashed data.
    /// </summary>
    public static class BlockHashAlgorithms
    {
        private static readonly log4net.ILog Log = log4net.LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);

        // Process-wide buffer pool, created lazily by GetBufferManager.
        private static BufferManager _bufferMgr;

        /// <summary>
        /// Returns the shared buffer pool, creating it on first use.
        /// </summary>
        /// <param name="blockSize">Maximum size of a single pooled buffer.</param>
        /// <param name="parallelism">Number of block-sized buffers the pool is sized for.</param>
        /// <returns>The shared <see cref="BufferManager"/>.</returns>
        /// <remarks>
        /// The pool is created once, with the sizes supplied by the first caller;
        /// arguments of later calls are ignored.
        /// </remarks>
        private static BufferManager GetBufferManager(int blockSize, int parallelism)
        {
            // Only build a manager when none is published yet. A racing thread may
            // build a second one; CompareExchange keeps whichever was stored first.
            if (_bufferMgr == null)
            {
                // Multiply as long so large block sizes cannot overflow int.
                var created = BufferManager.CreateBufferManager((long)parallelism * blockSize, blockSize);
                Interlocked.CompareExchange(ref _bufferMgr, created, null);
            }
            return _bufferMgr;
        }

        /// <summary>
        /// Reads <paramref name="stream"/> in blocks of <paramref name="blockSize"/> bytes and
        /// hashes them in parallel batches of up to <paramref name="parallelism"/> blocks.
        /// </summary>
        /// <param name="stream">The file to hash. Read from its current position.</param>
        /// <param name="blockSize">Size of each block in bytes. Must be greater than zero.</param>
        /// <param name="algorithm">Digest name passed to <c>MessageDigest.CreateByName</c>.</param>
        /// <param name="parallelism">Number of blocks buffered and hashed per batch. Must be greater than zero.</param>
        /// <param name="postAction">
        /// Optional callback invoked for every block right after it is read, with the zero-based
        /// block number, the buffer holding the block and the number of valid bytes in it.
        /// The buffer is reused for later reads, so the callback must not keep a reference to it.
        /// </param>
        /// <param name="token">Cancels the file reads and the parallel hashing loop.</param>
        /// <param name="progress">
        /// Optional sink that receives an approximate completed fraction (0..1) after each block is hashed.
        /// </param>
        /// <returns>
        /// The block hashes, keyed by the file offset just past each block (cumulative bytes read).
        /// A zero-length file yields a single entry with key 0 holding the hash of empty input.
        /// </returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="stream"/> is null or <paramref name="algorithm"/> is null or blank.
        /// </exception>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="blockSize"/> or <paramref name="parallelism"/> is not positive.
        /// </exception>
        /// <exception cref="OperationCanceledException"><paramref name="token"/> was cancelled.</exception>
        /// <remarks>
        /// Because the method is async, all of the exceptions above surface through the returned task.
        /// </remarks>
        public static async Task<ConcurrentDictionary<long, byte[]>> CalculateBlockHashesInPlacePFor(FileStream stream, int blockSize, string algorithm, int parallelism, Action<long, byte[], int> postAction, CancellationToken token, IProgress<double> progress)
        {
            if (stream == null)
                throw new ArgumentNullException("stream");
            if (String.IsNullOrWhiteSpace(algorithm))
                throw new ArgumentNullException("algorithm");
            if (blockSize <= 0)
                throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
            if (parallelism <= 0)
                throw new ArgumentOutOfRangeException("parallelism", "parallelism must be a value greater than zero");
            Contract.EndContractBlock();

            var hashes = new ConcurrentDictionary<long, byte[]>();
            var path = stream.Name;
            var size = stream.Length;
            Log.DebugFormat("Hashing [{0}] size [{1}]", path, size);

            // A zero-length file has no blocks to read; it gets one hash of empty input.
            if (size == 0)
            {
                using (var hasher = new MessageDigestContext(MessageDigest.CreateByName(algorithm)))
                {
                    hasher.Init();
                    hashes[0] = hasher.Digest(new byte[0]);
                    return hashes;
                }
            }

            var buffers = new byte[parallelism][];
            var hashers = new MessageDigestContext[parallelism];
            var bufferManager = GetBufferManager(blockSize, parallelism);
            try
            {
                // Acquire inside the try so a failure part-way through still releases
                // whatever was already taken.
                for (var i = 0; i < parallelism; i++)
                {
                    buffers[i] = bufferManager.TakeBuffer(blockSize);
                    hashers[i] = new MessageDigestContext(MessageDigest.CreateByName(algorithm));
                    hashers[i].Init();
                }

                var endOffsets = new long[parallelism];   // file offset just past the block in each slot
                var byteCounts = new int[parallelism];    // valid bytes in each slot

                var options = new ParallelOptions { MaxDegreeOfParallelism = parallelism, CancellationToken = token };

                // Hashes slots [0, count). Every slot has its own hasher, so iterations share no digest state.
                Action<int> hashPending = count => Parallel.For(0, count, options, idx =>
                {
                    token.ThrowIfCancellationRequested();
                    hashes[endOffsets[idx]] = HashTrimmedBlock(hashers[idx], buffers[idx], byteCounts[idx]);
                    if (progress != null)
                    {
                        // hashes.Count * blockSize overshoots on a short final block, hence the clamp.
                        progress.Report(Math.Min(1.0, (double)hashes.Count * blockSize / size));
                    }
                });

                var pending = 0;        // slots filled since the last batch was hashed
                long position = 0;      // total bytes read so far
                long blockNumber = 0;
                int read;
                while ((read = await stream.ReadAsync(buffers[pending], 0, blockSize, token).ConfigureAwait(false)) > 0)
                {
                    position += read;
                    endOffsets[pending] = position;
                    byteCounts[pending] = read;
                    if (postAction != null)
                        postAction(blockNumber, buffers[pending], read);
                    blockNumber++;
                    pending++;

                    // Hash the batch once every slot is full or a short read was seen.
                    if (pending == parallelism || read < blockSize)
                    {
                        hashPending(pending);
                        pending = 0;
                    }
                }

                // The file may end exactly on a block boundary with a partly filled batch;
                // those blocks still have to be hashed.
                if (pending > 0)
                    hashPending(pending);
            }
            finally
            {
                for (var i = 0; i < parallelism; i++)
                {
                    if (hashers[i] != null)
                        hashers[i].Dispose();
                    if (buffers[i] != null)
                        bufferManager.ReturnBuffer(buffers[i]);
                }
            }

            return hashes;
        }

        /// <summary>
        /// Hashes the first <paramref name="count"/> bytes of <paramref name="block"/>,
        /// excluding any trailing zero bytes.
        /// </summary>
        /// <param name="hasher">Digest context used to compute the hash.</param>
        /// <param name="block">Buffer holding the block; may be longer than <paramref name="count"/>.</param>
        /// <param name="count">Number of valid bytes in <paramref name="block"/>. Must be at least 1.</param>
        /// <returns>The digest of the trimmed data. An all-zero block hashes as empty input.</returns>
        private static byte[] HashTrimmedBlock(MessageDigestContext hasher, byte[] block, int count)
        {
            // Search backwards from the last valid byte; -1 means every valid byte is zero.
            var lastNonZero = Array.FindLastIndex(block, count - 1, aByte => aByte != 0);
            var length = lastNonZero + 1;

            // The buffer can be hashed directly only when all of it is significant.
            if (length == block.Length)
                return hasher.Digest(block);

            // Digest is given a whole array, so the significant prefix is copied out first.
            var trimmed = new byte[length];
            Buffer.BlockCopy(block, 0, trimmed, 0, length);
            return hasher.Digest(trimmed);
        }

        // Intentionally empty; kept so the class retains its explicit static constructor.
        static BlockHashAlgorithms()
        {
        }
    }
}