root / trunk / Pithos.Network / BlockHashAlgorithms.cs @ df12ed4c
History | View | Annotate | Download (17 kB)
1 |
#region |
---|---|
2 |
/* ----------------------------------------------------------------------- |
3 |
* <copyright file="BlockHashAlgorithms.cs" company="GRNet"> |
4 |
* |
5 |
* Copyright 2011-2012 GRNET S.A. All rights reserved. |
6 |
* |
7 |
* Redistribution and use in source and binary forms, with or |
8 |
* without modification, are permitted provided that the following |
9 |
* conditions are met: |
10 |
* |
11 |
* 1. Redistributions of source code must retain the above |
12 |
* copyright notice, this list of conditions and the following |
13 |
* disclaimer. |
14 |
* |
15 |
* 2. Redistributions in binary form must reproduce the above |
16 |
* copyright notice, this list of conditions and the following |
17 |
* disclaimer in the documentation and/or other materials |
18 |
* provided with the distribution. |
19 |
* |
20 |
* |
21 |
* THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
22 |
* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
23 |
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
24 |
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
25 |
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
26 |
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
27 |
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
28 |
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
29 |
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
30 |
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
31 |
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
32 |
* POSSIBILITY OF SUCH DAMAGE. |
33 |
* |
34 |
* The views and conclusions contained in the software and |
35 |
* documentation are those of the authors and should not be |
36 |
* interpreted as representing official policies, either expressed |
37 |
* or implied, of GRNET S.A. |
38 |
* </copyright> |
39 |
* ----------------------------------------------------------------------- |
40 |
*/ |
41 |
#endregion |
42 |
using System.Collections.Concurrent; |
43 |
using System.Diagnostics.Contracts; |
44 |
using System.IO; |
45 |
using System.Reflection; |
46 |
using System.Security.Cryptography; |
47 |
using System.ServiceModel.Channels; |
48 |
using System.Threading; |
49 |
using System.Threading.Tasks; |
50 |
using System.Threading.Tasks.Dataflow; |
51 |
|
52 |
|
53 |
namespace Pithos.Network |
54 |
{ |
55 |
using System; |
56 |
|
57 |
/// <summary> |
58 |
/// TODO: Update summary. |
59 |
/// </summary> |
60 |
public static class BlockHashAlgorithms |
61 |
{ |
62 |
        // log4net logger scoped to this class (resolved from the declaring type of the static initializer).
        private static readonly log4net.ILog Log = log4net.LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
63 |
|
64 |
/* |
65 |
public static Func<FileStream, int, string, ConcurrentDictionary<int, byte[]>, int, Task<ConcurrentDictionary<int, byte[]>>> CalculateBlockHash; |
66 |
|
67 |
public static Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesRecursiveAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0) |
68 |
{ |
69 |
if (stream == null) |
70 |
throw new ArgumentNullException("stream"); |
71 |
if (String.IsNullOrWhiteSpace(algorithm)) |
72 |
throw new ArgumentNullException("algorithm"); |
73 |
if (blockSize <= 0) |
74 |
throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero "); |
75 |
if (index < 0) |
76 |
throw new ArgumentOutOfRangeException("index", "index must be a non-negative value"); |
77 |
Contract.EndContractBlock(); |
78 |
|
79 |
|
80 |
if (hashes == null) |
81 |
hashes = new ConcurrentDictionary<int, byte[]>(); |
82 |
|
83 |
var buffer = new byte[blockSize]; |
84 |
return stream.ReadAsync(buffer, 0, blockSize).ContinueWith(t => |
85 |
{ |
86 |
var read = t.Result; |
87 |
|
88 |
var nextTask = read == blockSize |
89 |
? CalculateBlockHashesRecursiveAsync(stream, blockSize, algorithm, hashes, index + 1) |
90 |
: Task.Factory.StartNew(() => hashes); |
91 |
if (read > 0) |
92 |
using (var hasher = HashAlgorithm.Create(algorithm)) |
93 |
{ |
94 |
//This code was added for compatibility with the way Pithos calculates the last hash |
95 |
//We calculate the hash only up to the last non-null byte |
96 |
var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0); |
97 |
|
98 |
var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1); |
99 |
hashes[index] = hash; |
100 |
} |
101 |
return nextTask; |
102 |
}).Unwrap(); |
103 |
} |
104 |
public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0) |
105 |
{ |
106 |
if (stream == null) |
107 |
throw new ArgumentNullException("stream"); |
108 |
if (String.IsNullOrWhiteSpace(algorithm)) |
109 |
throw new ArgumentNullException("algorithm"); |
110 |
if (blockSize <= 0) |
111 |
throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero "); |
112 |
Contract.EndContractBlock(); |
113 |
|
114 |
|
115 |
if (hashes == null) |
116 |
hashes = new ConcurrentDictionary<int, byte[]>(); |
117 |
|
118 |
var buffer = new byte[blockSize]; |
119 |
int read; |
120 |
while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0) |
121 |
{ |
122 |
using (var hasher = HashAlgorithm.Create(algorithm)) |
123 |
{ |
124 |
//This code was added for compatibility with the way Pithos calculates the last hash |
125 |
//We calculate the hash only up to the last non-null byte |
126 |
var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0); |
127 |
|
128 |
var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1); |
129 |
hashes[index] = hash; |
130 |
} |
131 |
index += read; |
132 |
} |
133 |
return hashes; |
134 |
} |
135 |
|
136 |
public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAgentAsync(FileStream stream, int blockSize, string algorithm, int parallelism) |
137 |
{ |
138 |
if (stream == null) |
139 |
throw new ArgumentNullException("stream"); |
140 |
if (String.IsNullOrWhiteSpace(algorithm)) |
141 |
throw new ArgumentNullException("algorithm"); |
142 |
if (blockSize <= 0) |
143 |
throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero "); |
144 |
Contract.EndContractBlock(); |
145 |
|
146 |
var hashes = new ConcurrentDictionary<int, byte[]>(); |
147 |
|
148 |
var path = stream.Name; |
149 |
var size = stream.Length; |
150 |
Log.DebugFormat("Hashing [{0}] size [{1}]",path,size); |
151 |
|
152 |
var options = new ExecutionDataflowBlockOptions {BoundedCapacity = parallelism,MaxDegreeOfParallelism=parallelism}; |
153 |
var hashBlock=new ActionBlock<Tuple<int,byte[]>>(input=> |
154 |
{ |
155 |
int idx = input.Item1; |
156 |
byte[] block = input.Item2; |
157 |
using (var hasher = HashAlgorithm.Create(algorithm)) |
158 |
{ |
159 |
//This code was added for compatibility with the way Pithos calculates the last hash |
160 |
//We calculate the hash only up to the last non-null byte |
161 |
var lastByteIndex = Array.FindLastIndex(block, block.Length-1, aByte => aByte != 0); |
162 |
|
163 |
var hash = hasher.ComputeHash(block, 0, lastByteIndex + 1); |
164 |
hashes[idx] = hash; |
165 |
} |
166 |
},options); |
167 |
|
168 |
var buffer = new byte[blockSize]; |
169 |
int read; |
170 |
int index = 0; |
171 |
while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0) |
172 |
{ |
173 |
var block = new byte[read]; |
174 |
Buffer.BlockCopy(buffer, 0, block, 0, read); |
175 |
await hashBlock.SendAsync(Tuple.Create(index, block)); |
176 |
index += read; |
177 |
} |
178 |
|
179 |
|
180 |
hashBlock.Complete(); |
181 |
await hashBlock.Completion; |
182 |
|
183 |
return hashes; |
184 |
} |
185 |
|
186 |
public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesInPlace(FileStream stream, int blockSize, string algorithm, int parallelism) |
187 |
{ |
188 |
if (stream == null) |
189 |
throw new ArgumentNullException("stream"); |
190 |
if (String.IsNullOrWhiteSpace(algorithm)) |
191 |
throw new ArgumentNullException("algorithm"); |
192 |
if (blockSize <= 0) |
193 |
throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero "); |
194 |
Contract.EndContractBlock(); |
195 |
|
196 |
var hashes = new ConcurrentDictionary<int, byte[]>(); |
197 |
|
198 |
var path = stream.Name; |
199 |
var size = stream.Length; |
200 |
Log.DebugFormat("Hashing [{0}] size [{1}]",path,size); |
201 |
|
202 |
|
203 |
var buffer = new byte[blockSize]; |
204 |
var index = 0; |
205 |
using (var hasher = HashAlgorithm.Create(algorithm)) |
206 |
{ |
207 |
int read; |
208 |
while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0) |
209 |
{ |
210 |
//This code was added for compatibility with the way Pithos calculates the last hash |
211 |
//We calculate the hash only up to the last non-null byte |
212 |
var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0); |
213 |
|
214 |
var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1); |
215 |
Log.DebugFormat("Hashed [{0}] [{1}/{2}] [{3:p}]", path, index,size,(double)index/size); |
216 |
hashes[index] = hash; |
217 |
index += read; |
218 |
} |
219 |
} |
220 |
return hashes; |
221 |
} |
222 |
*/ |
223 |
|
224 |
private static IBufferPool _bufferMgr; |
225 |
|
226 |
private static readonly object _syncRoot = new object(); |
227 |
|
228 |
|
229 |
private static IBufferPool GetBufferManager(int blockSize,int parallelism) |
230 |
{ |
231 |
lock (_syncRoot) |
232 |
{ |
233 |
if (_bufferMgr == null) |
234 |
_bufferMgr = new BufferPool(parallelism*blockSize, blockSize); |
235 |
} |
236 |
|
237 |
return _bufferMgr; |
238 |
} |
239 |
|
240 |
//public static async Task<ConcurrentDictionary<long, byte[]>> CalculateBlockHashesInPlacePFor(FileStream stream, int blockSize, string algorithm, int parallelism, Action<long, byte[], int> postAction,CancellationToken token, IProgress<double> progress) |
241 |
        /// <summary>
        /// Computes per-block hashes of <paramref name="stream"/> using a pipelined
        /// scheme: up to <paramref name="parallelism"/> pooled buffers are filled
        /// sequentially by async reads, then hashed in parallel once the buffer ring
        /// is full (or the final short read arrives).
        /// </summary>
        /// <param name="stream">Open file stream to hash; read from its current position.</param>
        /// <param name="blockSize">Block size in bytes; must be positive.</param>
        /// <param name="algorithm">Hash algorithm name passed to <see cref="HashAlgorithm.Create(string)"/> (e.g. "SHA256").</param>
        /// <param name="parallelism">Maximum number of concurrent hashers/buffers.</param>
        /// <param name="token">Cancels the parallel hashing loop (observed inside Parallel.For).</param>
        /// <param name="progress">Optional progress sink; only reported for files larger than 4MB.</param>
        /// <returns>
        /// Map from file position to block hash. NOTE: the key is the END offset of
        /// each block (cumulative bytes read AFTER the block), not its start offset —
        /// `index += read` happens before `indices[bufIdx] = index`.
        /// </returns>
        /// <exception cref="ArgumentNullException">stream is null or algorithm is blank.</exception>
        /// <exception cref="ArgumentOutOfRangeException">blockSize is not positive.</exception>
        public static async Task<ConcurrentDictionary<long, byte[]>> CalculateBlockHashesInPlacePFor(FileStream stream, int blockSize, string algorithm, byte parallelism, CancellationToken token, IProgress<HashProgress> progress)
        {
            if (stream == null)
                throw new ArgumentNullException("stream");
            if (String.IsNullOrWhiteSpace(algorithm))
                throw new ArgumentNullException("algorithm");
            if (blockSize <= 0)
                throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
            Contract.EndContractBlock();

            var hashes = new ConcurrentDictionary<long, byte[]>();

            var path = stream.Name;
            var size = stream.Length;
            Log.DebugFormat("Hashing [{0}] size [{1}]", path, size);

            //TODO: Handle zero-length files
            // Zero-length file: store the hash of the empty byte array at position 0.
            if (size == 0)
            {
                var buf = new byte[0];
                using (var hasher = HashAlgorithm.Create(algorithm))
                {
                    hashes[0] = hasher.ComputeHash(buf);
                    return hashes;
                }
            }

            // Integer division: a trailing partial block is NOT counted here; this
            // value only caps the number of hashers, so that is harmless.
            var blocks = size<blockSize?1:size/blockSize;

            // Never allocate more hashers/buffers than there are blocks to hash.
            var actualHashers = parallelism > blocks ? (byte)blocks : parallelism;

            var buffer = new byte[actualHashers][];
            var hashers = new HashAlgorithm[actualHashers];
            var bufferManager = GetBufferManager(blockSize, actualHashers);
            try
            {
                // One pooled buffer and one hasher per pipeline slot; hashers are
                // reused across flushes (each Parallel.For index touches only its own).
                for (var i = 0; i < actualHashers; i++)
                {
                    buffer[i] = bufferManager.TakeBuffer(blockSize);// new byte[blockSize];
                    hashers[i] = HashAlgorithm.Create(algorithm);
                }

                // Per-slot bookkeeping: file position (end offset) and bytes read.
                var indices = new long[actualHashers];
                var bufferCount = new int[actualHashers];

                int read;
                int bufIdx = 0;
                long index = 0;

                //long block = 0;

                while ((read = await stream.ReadAsync(buffer[bufIdx], 0, blockSize).ConfigureAwait(false)) > 0)
                {
                    // Advance BEFORE recording, so indices[] holds end-of-block offsets.
                    index += read;
                    indices[bufIdx] = index;
                    bufferCount[bufIdx] = read;
                    //postAction(block++, buffer[bufIdx], read);

                    //If we have filled the last buffer or if we have read from the last block,
                    //we can calculate the blocks in parallel
                    // NOTE(review): a short read is treated as end-of-file. If ReadAsync
                    // ever returned fewer than blockSize bytes mid-file, this flush fires
                    // early and the NEXT flush would re-hash slots 0..bufIdx with stale
                    // data — verify that short reads cannot occur before EOF here.
                    if (bufIdx == actualHashers - 1 || read < blockSize)
                    {
                        var options = new ParallelOptions {MaxDegreeOfParallelism = actualHashers,CancellationToken=token};

                        // Hash every filled slot (0..bufIdx) in parallel.
                        Parallel.For(0, bufIdx + 1, options,(idx,state) =>
                        {
                            //This code was added for compatibility with the way Pithos calculates the last hash
                            //We calculate the hash only up to the last non-null byte
                            options.CancellationToken.ThrowIfCancellationRequested();
                            var lastByteIndex = Array.FindLastIndex(buffer[idx],
                                                                    bufferCount[idx] - 1,
                                                                    aByte => aByte != 0);

                            var hasher = hashers[idx];

                            // lastByteIndex is -1 for an all-zero block, so this hashes
                            // zero bytes in that case (matches Pithos server behavior
                            // per the comment above).
                            byte[] hash=hasher.ComputeHash(buffer[idx],0,lastByteIndex+1);
                            /*
                            if (buffer[idx].Length == lastByteIndex || lastByteIndex==-1)
                                hash = hasher.Digest(buffer[idx]);
                            else
                            {
                                var buf=new byte[lastByteIndex+1];
                                Buffer.BlockCopy(buffer[idx],0,buf,0,lastByteIndex+1);
                                hash = hasher.Digest(buf);
                            }
                            */



                            var filePosition = indices[idx];
                            /*
                            Trace.TraceInformation("Hashed [{0}] [{1}/{2}] [{3:p}]", path,
                                                   filePosition, size,
                                                   (double)filePosition / size);
                            */
                            hashes[filePosition] = hash;
                            //Do not report for files smaller than 4MB
                            // NOTE(review): hashes.Count*blockSize can overshoot the file
                            // length on the final partial block — acceptable for a
                            // progress estimate, but confirm consumers tolerate it.
                            if (progress != null && stream.Length > 4*1024*1024)
                                progress.Report(new HashProgress((long)hashes.Count*blockSize,stream.Length));
                        });
                    }
                    // Cycle to the next pipeline slot (wraps back to 0 after a flush).
                    bufIdx = (bufIdx +1)%actualHashers;
                }
            }
            finally
            {
                // Always release hashers and return pooled buffers, even on
                // cancellation or I/O failure.
                for (var i = 0; i < actualHashers; i++)
                {
                    if (hashers[i] != null)
                        hashers[i].Dispose();
                    bufferManager.ReturnBuffer(buffer[i]);
                }

            }

            return hashes;
        }
358 |
|
359 |
        // Static initializer retained as a placeholder: it used to select the active
        // hashing strategy via a delegate, but that assignment was disabled together
        // with the legacy implementations it referenced.
        static BlockHashAlgorithms()
        {
            /*
            CalculateBlockHash = CalculateBlockHashesRecursiveAsync;
            */
        }
365 |
} |
366 |
} |