root / trunk / Pithos.Network / BlockHashAlgorithms.cs @ 6bcdd8e2
History | View | Annotate | Download (9.3 kB)
1 |
#region |
---|---|
2 |
/* ----------------------------------------------------------------------- |
3 |
* <copyright file="BlockHashAlgorithms.cs" company="GRNet"> |
4 |
* |
5 |
* Copyright 2011-2012 GRNET S.A. All rights reserved. |
6 |
* |
7 |
* Redistribution and use in source and binary forms, with or |
8 |
* without modification, are permitted provided that the following |
9 |
* conditions are met: |
10 |
* |
11 |
* 1. Redistributions of source code must retain the above |
12 |
* copyright notice, this list of conditions and the following |
13 |
* disclaimer. |
14 |
* |
15 |
* 2. Redistributions in binary form must reproduce the above |
16 |
* copyright notice, this list of conditions and the following |
17 |
* disclaimer in the documentation and/or other materials |
18 |
* provided with the distribution. |
19 |
* |
20 |
* |
21 |
* THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
22 |
* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
23 |
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
24 |
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
25 |
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
26 |
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
27 |
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
28 |
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
29 |
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
30 |
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
31 |
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
32 |
* POSSIBILITY OF SUCH DAMAGE. |
33 |
* |
34 |
* The views and conclusions contained in the software and |
35 |
* documentation are those of the authors and should not be |
36 |
* interpreted as representing official policies, either expressed |
37 |
* or implied, of GRNET S.A. |
38 |
* </copyright> |
39 |
* ----------------------------------------------------------------------- |
40 |
*/ |
41 |
#endregion |
42 |
using System.Collections.Concurrent; |
43 |
using System.Diagnostics.Contracts; |
44 |
using System.IO; |
45 |
using System.Reflection; |
46 |
using System.Security.Cryptography; |
47 |
using System.Threading.Tasks; |
48 |
using System.Threading.Tasks.Dataflow; |
49 |
|
50 |
namespace Pithos.Network |
51 |
{ |
52 |
using System; |
53 |
using System.Collections.Generic; |
54 |
using System.Linq; |
55 |
using System.Text; |
56 |
|
57 |
/// <summary> |
58 |
/// TODO: Update summary. |
59 |
/// </summary> |
60 |
public static class BlockHashAlgorithms |
61 |
{ |
62 |
private static readonly log4net.ILog Log = log4net.LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); |
63 |
|
64 |
public static Func<FileStream, int, string, ConcurrentDictionary<int, byte[]>, int, Task<ConcurrentDictionary<int, byte[]>>> CalculateBlockHash; |
65 |
|
66 |
public static Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesRecursiveAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0) |
67 |
{ |
68 |
if (stream == null) |
69 |
throw new ArgumentNullException("stream"); |
70 |
if (String.IsNullOrWhiteSpace(algorithm)) |
71 |
throw new ArgumentNullException("algorithm"); |
72 |
if (blockSize <= 0) |
73 |
throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero "); |
74 |
if (index < 0) |
75 |
throw new ArgumentOutOfRangeException("index", "index must be a non-negative value"); |
76 |
Contract.EndContractBlock(); |
77 |
|
78 |
|
79 |
if (hashes == null) |
80 |
hashes = new ConcurrentDictionary<int, byte[]>(); |
81 |
|
82 |
var buffer = new byte[blockSize]; |
83 |
return stream.ReadAsync(buffer, 0, blockSize).ContinueWith(t => |
84 |
{ |
85 |
var read = t.Result; |
86 |
|
87 |
var nextTask = read == blockSize |
88 |
? CalculateBlockHashesRecursiveAsync(stream, blockSize, algorithm, hashes, index + 1) |
89 |
: Task.Factory.StartNew(() => hashes); |
90 |
if (read > 0) |
91 |
using (var hasher = HashAlgorithm.Create(algorithm)) |
92 |
{ |
93 |
//This code was added for compatibility with the way Pithos calculates the last hash |
94 |
//We calculate the hash only up to the last non-null byte |
95 |
var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0); |
96 |
|
97 |
var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1); |
98 |
hashes[index] = hash; |
99 |
} |
100 |
return nextTask; |
101 |
}).Unwrap(); |
102 |
} |
103 |
public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0) |
104 |
{ |
105 |
if (stream == null) |
106 |
throw new ArgumentNullException("stream"); |
107 |
if (String.IsNullOrWhiteSpace(algorithm)) |
108 |
throw new ArgumentNullException("algorithm"); |
109 |
if (blockSize <= 0) |
110 |
throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero "); |
111 |
Contract.EndContractBlock(); |
112 |
|
113 |
|
114 |
if (hashes == null) |
115 |
hashes = new ConcurrentDictionary<int, byte[]>(); |
116 |
|
117 |
var buffer = new byte[blockSize]; |
118 |
int read; |
119 |
while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0) |
120 |
{ |
121 |
//TODO: identify the value of index |
122 |
|
123 |
using (var hasher = HashAlgorithm.Create(algorithm)) |
124 |
{ |
125 |
//This code was added for compatibility with the way Pithos calculates the last hash |
126 |
//We calculate the hash only up to the last non-null byte |
127 |
var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0); |
128 |
|
129 |
var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1); |
130 |
hashes[index] = hash; |
131 |
} |
132 |
index += read; |
133 |
}; |
134 |
return hashes; |
135 |
} |
136 |
|
137 |
public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAgentAsync(FileStream stream, int blockSize, string algorithm, int parallelism) |
138 |
{ |
139 |
if (stream == null) |
140 |
throw new ArgumentNullException("stream"); |
141 |
if (String.IsNullOrWhiteSpace(algorithm)) |
142 |
throw new ArgumentNullException("algorithm"); |
143 |
if (blockSize <= 0) |
144 |
throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero "); |
145 |
Contract.EndContractBlock(); |
146 |
|
147 |
var hashes = new ConcurrentDictionary<int, byte[]>(); |
148 |
|
149 |
var path = stream.Name; |
150 |
var size = stream.Length; |
151 |
Log.DebugFormat("Hashing [{0}] size [{1}]",path,size); |
152 |
|
153 |
/* |
154 |
var options = new ExecutionDataflowBlockOptions {BoundedCapacity = parallelism,MaxDegreeOfParallelism=parallelism}; |
155 |
var hashBlock=new ActionBlock<Tuple<int,byte[]>>(input=> |
156 |
{ |
157 |
int idx = input.Item1; |
158 |
byte[] block = input.Item2; |
159 |
using (var hasher = HashAlgorithm.Create(algorithm)) |
160 |
{ |
161 |
//This code was added for compatibility with the way Pithos calculates the last hash |
162 |
//We calculate the hash only up to the last non-null byte |
163 |
var lastByteIndex = Array.FindLastIndex(block, block.Length-1, aByte => aByte != 0); |
164 |
|
165 |
var hash = hasher.ComputeHash(block, 0, lastByteIndex + 1); |
166 |
hashes[idx] = hash; |
167 |
} |
168 |
},options); |
169 |
*/ |
170 |
|
171 |
var buffer = new byte[blockSize]; |
172 |
int read; |
173 |
int index = 0; |
174 |
using (var hasher = HashAlgorithm.Create(algorithm)) |
175 |
{ |
176 |
while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0) |
177 |
{ |
178 |
// var block = new byte[read]; |
179 |
|
180 |
//This code was added for compatibility with the way Pithos calculates the last hash |
181 |
//We calculate the hash only up to the last non-null byte |
182 |
var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0); |
183 |
|
184 |
var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1); |
185 |
Log.DebugFormat("Hashed [{0}] [{1}/{2}] [{3:p}]", path, index,size,(double)index/size); |
186 |
hashes[index] = hash; |
187 |
index += read; |
188 |
} |
189 |
|
190 |
/* |
191 |
Buffer.BlockCopy(buffer,0,block,0,read); |
192 |
await hashBlock.SendAsync(Tuple.Create(index, block)); |
193 |
*/ |
194 |
|
195 |
} |
196 |
|
197 |
|
198 |
/* |
199 |
hashBlock.Complete(); |
200 |
await hashBlock.Completion; |
201 |
*/ |
202 |
|
203 |
return hashes; |
204 |
} |
205 |
|
206 |
static BlockHashAlgorithms() |
207 |
{ |
208 |
CalculateBlockHash = CalculateBlockHashesRecursiveAsync; |
209 |
} |
210 |
} |
211 |
} |