#region
/* -----------------------------------------------------------------------
*
*
* Copyright 2011-2012 GRNET S.A. All rights reserved.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* 1. Redistributions of source code must retain the above
* copyright notice, this list of conditions and the following
* disclaimer.
*
* 2. Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials
* provided with the distribution.
*
*
* THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
* The views and conclusions contained in the software and
* documentation are those of the authors and should not be
* interpreted as representing official policies, either expressed
* or implied, of GRNET S.A.
*
* -----------------------------------------------------------------------
*/
#endregion
using System.Collections.Concurrent;
using System.Diagnostics.Contracts;
using System.IO;
using System.Reflection;
using System.Security.Cryptography;
using System.ServiceModel.Channels;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace Pithos.Network
{
using System;
/// <summary>
/// Helpers that compute a hash for each fixed-size block of a file stream.
/// </summary>
public static class BlockHashAlgorithms
{
// Logger for this class, named after the class type itself.
private static readonly log4net.ILog Log =
    log4net.LogManager.GetLogger(typeof(BlockHashAlgorithms));
/*
public static Func, int, Task>> CalculateBlockHash;
public static Task> CalculateBlockHashesRecursiveAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary hashes = null, int index = 0)
{
if (stream == null)
throw new ArgumentNullException("stream");
if (String.IsNullOrWhiteSpace(algorithm))
throw new ArgumentNullException("algorithm");
if (blockSize <= 0)
throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
if (index < 0)
throw new ArgumentOutOfRangeException("index", "index must be a non-negative value");
Contract.EndContractBlock();
if (hashes == null)
hashes = new ConcurrentDictionary();
var buffer = new byte[blockSize];
return stream.ReadAsync(buffer, 0, blockSize).ContinueWith(t =>
{
var read = t.Result;
var nextTask = read == blockSize
? CalculateBlockHashesRecursiveAsync(stream, blockSize, algorithm, hashes, index + 1)
: Task.Factory.StartNew(() => hashes);
if (read > 0)
using (var hasher = HashAlgorithm.Create(algorithm))
{
//This code was added for compatibility with the way Pithos calculates the last hash
//We calculate the hash only up to the last non-null byte
var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);
var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1);
hashes[index] = hash;
}
return nextTask;
}).Unwrap();
}
public static async Task> CalculateBlockHashesAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary hashes = null, int index = 0)
{
if (stream == null)
throw new ArgumentNullException("stream");
if (String.IsNullOrWhiteSpace(algorithm))
throw new ArgumentNullException("algorithm");
if (blockSize <= 0)
throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
Contract.EndContractBlock();
if (hashes == null)
hashes = new ConcurrentDictionary();
var buffer = new byte[blockSize];
int read;
while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)
{
using (var hasher = HashAlgorithm.Create(algorithm))
{
//This code was added for compatibility with the way Pithos calculates the last hash
//We calculate the hash only up to the last non-null byte
var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);
var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1);
hashes[index] = hash;
}
index += read;
}
return hashes;
}
public static async Task> CalculateBlockHashesAgentAsync(FileStream stream, int blockSize, string algorithm, int parallelism)
{
if (stream == null)
throw new ArgumentNullException("stream");
if (String.IsNullOrWhiteSpace(algorithm))
throw new ArgumentNullException("algorithm");
if (blockSize <= 0)
throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
Contract.EndContractBlock();
var hashes = new ConcurrentDictionary();
var path = stream.Name;
var size = stream.Length;
Log.DebugFormat("Hashing [{0}] size [{1}]",path,size);
var options = new ExecutionDataflowBlockOptions {BoundedCapacity = parallelism,MaxDegreeOfParallelism=parallelism};
var hashBlock=new ActionBlock>(input=>
{
int idx = input.Item1;
byte[] block = input.Item2;
using (var hasher = HashAlgorithm.Create(algorithm))
{
//This code was added for compatibility with the way Pithos calculates the last hash
//We calculate the hash only up to the last non-null byte
var lastByteIndex = Array.FindLastIndex(block, block.Length-1, aByte => aByte != 0);
var hash = hasher.ComputeHash(block, 0, lastByteIndex + 1);
hashes[idx] = hash;
}
},options);
var buffer = new byte[blockSize];
int read;
int index = 0;
while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)
{
var block = new byte[read];
Buffer.BlockCopy(buffer, 0, block, 0, read);
await hashBlock.SendAsync(Tuple.Create(index, block));
index += read;
}
hashBlock.Complete();
await hashBlock.Completion;
return hashes;
}
public static async Task> CalculateBlockHashesInPlace(FileStream stream, int blockSize, string algorithm, int parallelism)
{
if (stream == null)
throw new ArgumentNullException("stream");
if (String.IsNullOrWhiteSpace(algorithm))
throw new ArgumentNullException("algorithm");
if (blockSize <= 0)
throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
Contract.EndContractBlock();
var hashes = new ConcurrentDictionary();
var path = stream.Name;
var size = stream.Length;
Log.DebugFormat("Hashing [{0}] size [{1}]",path,size);
var buffer = new byte[blockSize];
var index = 0;
using (var hasher = HashAlgorithm.Create(algorithm))
{
int read;
while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)
{
//This code was added for compatibility with the way Pithos calculates the last hash
//We calculate the hash only up to the last non-null byte
var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);
var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1);
Log.DebugFormat("Hashed [{0}] [{1}/{2}] [{3:p}]", path, index,size,(double)index/size);
hashes[index] = hash;
index += read;
}
}
return hashes;
}
*/
// Buffer pool shared by every hashing call; created lazily by GetBufferManager.
private static BufferManager _bufferMgr;

/// <summary>
/// Returns the shared <see cref="BufferManager"/>, creating it on first use.
/// </summary>
/// <param name="blockSize">Size in bytes of one block; becomes the pool's maximum buffer size.</param>
/// <param name="parallelism">Number of blocks held at once; the pool is capped at parallelism * blockSize bytes.</param>
/// <returns>
/// The shared buffer manager. Only the arguments of the call that creates it are used;
/// later calls return the existing instance regardless of their arguments.
/// </returns>
private static BufferManager GetBufferManager(int blockSize, int parallelism)
{
    // Fast path: do not build a throw-away BufferManager once the shared one exists.
    var existing = Volatile.Read(ref _bufferMgr);
    if (existing != null)
    {
        return existing;
    }

    // Widen before multiplying so large block sizes cannot overflow Int32.
    var created = BufferManager.CreateBufferManager((long)parallelism * blockSize, blockSize);

    // If another thread published its instance first, use that one and drop ours.
    return Interlocked.CompareExchange(ref _bufferMgr, created, null) ?? created;
}
/// <summary>
/// Hashes a file block by block. Up to <paramref name="parallelism"/> blocks are read
/// sequentially into pooled buffers and then hashed together with Parallel.For.
/// </summary>
/// <param name="stream">File to hash. Blocks are read starting at the stream's current position.</param>
/// <param name="blockSize">Size of one block in bytes; must be greater than zero.</param>
/// <param name="algorithm">Algorithm name handed to HashAlgorithm.Create.</param>
/// <param name="parallelism">Number of blocks buffered and hashed per batch; must be greater than zero.</param>
/// <returns>
/// The hash of every block, keyed by the total number of bytes read once that block was read
/// (i.e. the offset just past the block). A zero-length file produces one entry with key 0
/// holding the hash of an empty input.
/// </returns>
/// <exception cref="ArgumentNullException">stream is null, or algorithm is null or whitespace.</exception>
/// <exception cref="ArgumentOutOfRangeException">blockSize or parallelism is not greater than zero.</exception>
/// <exception cref="ArgumentException">algorithm does not name a known hash algorithm.</exception>
public static async Task<ConcurrentDictionary<long, byte[]>> CalculateBlockHashesInPlacePFor(FileStream stream, int blockSize, string algorithm, int parallelism)
{
    if (stream == null)
        throw new ArgumentNullException("stream");
    if (String.IsNullOrWhiteSpace(algorithm))
        throw new ArgumentNullException("algorithm");
    if (blockSize <= 0)
        throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
    if (parallelism <= 0)
        throw new ArgumentOutOfRangeException("parallelism", "parallelism must be a value greater than zero");
    Contract.EndContractBlock();

    var hashes = new ConcurrentDictionary<long, byte[]>();
    var path = stream.Name;
    var size = stream.Length;
    Log.DebugFormat("Hashing [{0}] size [{1}]", path, size);

    // A zero-length file has no blocks to read; record the hash of an empty input under key 0.
    if (size == 0)
    {
        using (var emptyHasher = CreateHasherOrThrow(algorithm))
        {
            hashes[0] = emptyHasher.ComputeHash(new byte[0]);
        }
        return hashes;
    }

    var buffer = new byte[parallelism][];
    var hashers = new HashAlgorithm[parallelism];
    var bufferManager = GetBufferManager(blockSize, parallelism);
    try
    {
        // Allocate inside the try so a failure part-way still releases what was taken.
        for (var i = 0; i < parallelism; i++)
        {
            buffer[i] = bufferManager.TakeBuffer(blockSize);
            hashers[i] = CreateHasherOrThrow(algorithm);
        }

        var indices = new long[parallelism];
        var bufferCount = new int[parallelism];

        // Hashes the first `count` buffered blocks in parallel; each slot has its own hasher.
        Action<int> hashBatch = count => Parallel.For(0, count, idx =>
        {
            hashes[indices[idx]] = HashTrimmedBlock(hashers[idx], buffer[idx], bufferCount[idx]);
        });

        var pending = 0;   // number of blocks read but not yet hashed
        long index = 0;    // total bytes read so far
        int read;
        while ((read = await ReadFullBlockAsync(stream, buffer[pending], blockSize).ConfigureAwait(false)) > 0)
        {
            index += read;
            indices[pending] = index;
            bufferCount[pending] = read;
            pending++;

            // Every slot is filled: hash the whole batch and start refilling from slot 0.
            if (pending == parallelism)
            {
                hashBatch(pending);
                pending = 0;
            }
        }

        // Hash the blocks still buffered at end of file. Without this, a file whose length is an
        // exact multiple of blockSize could lose its trailing blocks.
        if (pending > 0)
        {
            hashBatch(pending);
        }
    }
    finally
    {
        for (var i = 0; i < parallelism; i++)
        {
            if (hashers[i] != null)
                hashers[i].Dispose();
            if (buffer[i] != null)
                bufferManager.ReturnBuffer(buffer[i]);
        }
    }
    return hashes;
}

// Creates the named hash algorithm, failing with a clear error instead of a later null dereference.
private static HashAlgorithm CreateHasherOrThrow(string algorithm)
{
    var hasher = HashAlgorithm.Create(algorithm);
    if (hasher == null)
        throw new ArgumentException("Unknown hash algorithm: " + algorithm, "algorithm");
    return hasher;
}

// Reads until blockSize bytes are buffered or the stream ends; returns the number of bytes read.
// A single ReadAsync call may return fewer bytes than requested before the end of the stream.
private static async Task<int> ReadFullBlockAsync(Stream stream, byte[] block, int blockSize)
{
    var total = 0;
    while (total < blockSize)
    {
        var read = await stream.ReadAsync(block, total, blockSize - total).ConfigureAwait(false);
        if (read == 0)
            break;
        total += read;
    }
    return total;
}

// Hashes the first `count` bytes of a block, excluding trailing zero bytes.
private static byte[] HashTrimmedBlock(HashAlgorithm hasher, byte[] block, int count)
{
    //This code was added for compatibility with the way Pithos calculates the last hash
    //We calculate the hash only up to the last non-null byte
    var lastByteIndex = Array.FindLastIndex(block, count - 1, aByte => aByte != 0);
    return hasher.ComputeHash(block, 0, lastByteIndex + 1);
}
// Explicit static constructor with an empty body: its only statement, the assignment of the
// CalculateBlockHash delegate, is commented out along with the implementation it pointed to.
// NOTE(review): in C#, declaring a static constructor (even an empty one) affects when the
// type's static field initializers run — confirm nothing depends on that before deleting it.
static BlockHashAlgorithms()
{
/*
CalculateBlockHash = CalculateBlockHashesRecursiveAsync;
*/
}
}
}