2 /* -----------------------------------------------------------------------
\r
3 * <copyright file="BlockHashAlgorithms.cs" company="GRNet">
\r
5 * Copyright 2011-2012 GRNET S.A. All rights reserved.
\r
7 * Redistribution and use in source and binary forms, with or
\r
8 * without modification, are permitted provided that the following
\r
9 * conditions are met:
\r
11 * 1. Redistributions of source code must retain the above
\r
12 * copyright notice, this list of conditions and the following
\r
15 * 2. Redistributions in binary form must reproduce the above
\r
16 * copyright notice, this list of conditions and the following
\r
17 * disclaimer in the documentation and/or other materials
\r
18 * provided with the distribution.
\r
21 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
\r
22 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
\r
23 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
\r
24 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
\r
25 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
\r
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
\r
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
\r
28 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
\r
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
\r
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
\r
31 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
\r
32 * POSSIBILITY OF SUCH DAMAGE.
\r
34 * The views and conclusions contained in the software and
\r
35 * documentation are those of the authors and should not be
\r
36 * interpreted as representing official policies, either expressed
\r
37 * or implied, of GRNET S.A.
\r
39 * -----------------------------------------------------------------------
\r
42 using System.Collections.Concurrent;
\r
43 using System.Diagnostics.Contracts;
\r
45 using System.Security.Cryptography;
\r
46 using System.Threading.Tasks;
\r
47 using System.Threading.Tasks.Dataflow;
\r
49 namespace Pithos.Network
\r
52 using System.Collections.Generic;
\r
57 /// TODO: Update summary.
\r
59 public static class BlockHashAlgorithms
\r
61 public static Func<FileStream, int, string, ConcurrentDictionary<int, byte[]>, int, Task<ConcurrentDictionary<int, byte[]>>> CalculateBlockHash;
\r
63 public static Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesRecursiveAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0)
\r
66 throw new ArgumentNullException("stream");
\r
67 if (String.IsNullOrWhiteSpace(algorithm))
\r
68 throw new ArgumentNullException("algorithm");
\r
70 throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
\r
72 throw new ArgumentOutOfRangeException("index", "index must be a non-negative value");
\r
73 Contract.EndContractBlock();
\r
77 hashes = new ConcurrentDictionary<int, byte[]>();
\r
79 var buffer = new byte[blockSize];
\r
80 return stream.ReadAsync(buffer, 0, blockSize).ContinueWith(t =>
\r
82 var read = t.Result;
\r
84 var nextTask = read == blockSize
\r
85 ? CalculateBlockHashesRecursiveAsync(stream, blockSize, algorithm, hashes, index + 1)
\r
86 : Task.Factory.StartNew(() => hashes);
\r
88 using (var hasher = HashAlgorithm.Create(algorithm))
\r
90 //This code was added for compatibility with the way Pithos calculates the last hash
\r
91 //We calculate the hash only up to the last non-null byte
\r
92 var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);
\r
94 var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1);
\r
95 hashes[index] = hash;
\r
100 public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0)
\r
102 if (stream == null)
\r
103 throw new ArgumentNullException("stream");
\r
104 if (String.IsNullOrWhiteSpace(algorithm))
\r
105 throw new ArgumentNullException("algorithm");
\r
106 if (blockSize <= 0)
\r
107 throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
\r
108 Contract.EndContractBlock();
\r
111 if (hashes == null)
\r
112 hashes = new ConcurrentDictionary<int, byte[]>();
\r
114 var buffer = new byte[blockSize];
\r
116 while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)
\r
118 //TODO: identify the value of index
\r
120 using (var hasher = HashAlgorithm.Create(algorithm))
\r
122 //This code was added for compatibility with the way Pithos calculates the last hash
\r
123 //We calculate the hash only up to the last non-null byte
\r
124 var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);
\r
126 var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1);
\r
127 hashes[index] = hash;
\r
134 public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAgentAsync(FileStream stream, int blockSize, string algorithm, int parallelism)
\r
136 if (stream == null)
\r
137 throw new ArgumentNullException("stream");
\r
138 if (String.IsNullOrWhiteSpace(algorithm))
\r
139 throw new ArgumentNullException("algorithm");
\r
140 if (blockSize <= 0)
\r
141 throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
\r
142 Contract.EndContractBlock();
\r
144 var hashes = new ConcurrentDictionary<int, byte[]>();
\r
146 var options = new ExecutionDataflowBlockOptions {BoundedCapacity = parallelism,MaxDegreeOfParallelism=parallelism};
\r
147 var hashBlock=new ActionBlock<Tuple<int,byte[]>>(input=>
\r
149 int idx = input.Item1;
\r
150 byte[] block = input.Item2;
\r
151 using (var hasher = HashAlgorithm.Create(algorithm))
\r
153 //This code was added for compatibility with the way Pithos calculates the last hash
\r
154 //We calculate the hash only up to the last non-null byte
\r
155 var lastByteIndex = Array.FindLastIndex(block, block.Length-1, aByte => aByte != 0);
\r
157 var hash = hasher.ComputeHash(block, 0, lastByteIndex + 1);
\r
158 hashes[idx] = hash;
\r
162 var buffer = new byte[blockSize];
\r
165 while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)
\r
167 var block = new byte[read];
\r
168 Buffer.BlockCopy(buffer,0,block,0,read);
\r
169 await hashBlock.SendAsync(Tuple.Create(index, block));
\r
173 hashBlock.Complete();
\r
174 await hashBlock.Completion;
\r
179 static BlockHashAlgorithms()
\r
181 CalculateBlockHash = CalculateBlockHashesRecursiveAsync;
\r