Revision 9d2c0fc0 trunk/Pithos.Network/BlockHashAlgorithms.cs
b/trunk/Pithos.Network/BlockHashAlgorithms.cs | ||
---|---|---|
50 | 50 |
namespace Pithos.Network |
51 | 51 |
{ |
52 | 52 |
using System; |
53 |
using System.Collections.Generic; |
|
54 |
using System.Linq; |
|
55 |
using System.Text; |
|
56 | 53 |
|
57 | 54 |
/// <summary> |
58 | 55 |
/// TODO: Update summary. |
... | ... | |
61 | 58 |
{ |
62 | 59 |
private static readonly log4net.ILog Log = log4net.LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); |
63 | 60 |
|
61 |
/* |
|
64 | 62 |
public static Func<FileStream, int, string, ConcurrentDictionary<int, byte[]>, int, Task<ConcurrentDictionary<int, byte[]>>> CalculateBlockHash; |
65 | 63 |
|
66 | 64 |
public static Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesRecursiveAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0) |
... | ... | |
118 | 116 |
int read; |
119 | 117 |
while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0) |
120 | 118 |
{ |
121 |
//TODO: identify the value of index |
|
122 |
|
|
123 | 119 |
using (var hasher = HashAlgorithm.Create(algorithm)) |
124 | 120 |
{ |
125 | 121 |
//This code was added for compatibility with the way Pithos calculates the last hash |
... | ... | |
130 | 126 |
hashes[index] = hash; |
131 | 127 |
} |
132 | 128 |
index += read; |
133 |
};
|
|
129 |
} |
|
134 | 130 |
return hashes; |
135 | 131 |
} |
136 | 132 |
|
... | ... | |
150 | 146 |
var size = stream.Length; |
151 | 147 |
Log.DebugFormat("Hashing [{0}] size [{1}]",path,size); |
152 | 148 |
|
153 |
/* |
|
154 | 149 |
var options = new ExecutionDataflowBlockOptions {BoundedCapacity = parallelism,MaxDegreeOfParallelism=parallelism}; |
155 | 150 |
var hashBlock=new ActionBlock<Tuple<int,byte[]>>(input=> |
156 | 151 |
{ |
... | ... | |
166 | 161 |
hashes[idx] = hash; |
167 | 162 |
} |
168 | 163 |
},options); |
169 |
*/ |
|
170 | 164 |
|
171 | 165 |
var buffer = new byte[blockSize]; |
172 | 166 |
int read; |
173 | 167 |
int index = 0; |
168 |
while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0) |
|
169 |
{ |
|
170 |
var block = new byte[read]; |
|
171 |
Buffer.BlockCopy(buffer, 0, block, 0, read); |
|
172 |
await hashBlock.SendAsync(Tuple.Create(index, block)); |
|
173 |
index += read; |
|
174 |
} |
|
175 |
|
|
176 |
|
|
177 |
hashBlock.Complete(); |
|
178 |
await hashBlock.Completion; |
|
179 |
|
|
180 |
return hashes; |
|
181 |
} |
|
182 |
|
|
183 |
public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesInPlace(FileStream stream, int blockSize, string algorithm, int parallelism) |
|
184 |
{ |
|
185 |
if (stream == null) |
|
186 |
throw new ArgumentNullException("stream"); |
|
187 |
if (String.IsNullOrWhiteSpace(algorithm)) |
|
188 |
throw new ArgumentNullException("algorithm"); |
|
189 |
if (blockSize <= 0) |
|
190 |
throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero "); |
|
191 |
Contract.EndContractBlock(); |
|
192 |
|
|
193 |
var hashes = new ConcurrentDictionary<int, byte[]>(); |
|
194 |
|
|
195 |
var path = stream.Name; |
|
196 |
var size = stream.Length; |
|
197 |
Log.DebugFormat("Hashing [{0}] size [{1}]",path,size); |
|
198 |
|
|
199 |
|
|
200 |
var buffer = new byte[blockSize]; |
|
201 |
var index = 0; |
|
174 | 202 |
using (var hasher = HashAlgorithm.Create(algorithm)) |
175 | 203 |
{ |
204 |
int read; |
|
176 | 205 |
while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0) |
177 | 206 |
{ |
178 |
// var block = new byte[read]; |
|
179 |
|
|
180 | 207 |
//This code was added for compatibility with the way Pithos calculates the last hash |
181 | 208 |
//We calculate the hash only up to the last non-null byte |
182 | 209 |
var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0); |
... | ... | |
186 | 213 |
hashes[index] = hash; |
187 | 214 |
index += read; |
188 | 215 |
} |
216 |
} |
|
217 |
return hashes; |
|
218 |
} |
|
219 |
*/ |
|
220 |
|
|
221 |
public static async Task<ConcurrentDictionary<long, byte[]>> CalculateBlockHashesInPlacePFor(FileStream stream, int blockSize, string algorithm, int parallelism) |
|
222 |
{ |
|
223 |
if (stream == null) |
|
224 |
throw new ArgumentNullException("stream"); |
|
225 |
if (String.IsNullOrWhiteSpace(algorithm)) |
|
226 |
throw new ArgumentNullException("algorithm"); |
|
227 |
if (blockSize <= 0) |
|
228 |
throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero "); |
|
229 |
Contract.EndContractBlock(); |
|
230 |
|
|
231 |
var hashes = new ConcurrentDictionary<long, byte[]>(); |
|
232 |
|
|
233 |
var path = stream.Name; |
|
234 |
var size = stream.Length; |
|
235 |
Log.DebugFormat("Hashing [{0}] size [{1}]", path, size); |
|
189 | 236 |
|
190 |
/* |
|
191 |
Buffer.BlockCopy(buffer,0,block,0,read); |
|
192 |
await hashBlock.SendAsync(Tuple.Create(index, block)); |
|
193 |
*/ |
|
194 |
|
|
237 |
|
|
238 |
var buffer = new byte[parallelism][]; |
|
239 |
var hashers = new HashAlgorithm[parallelism]; |
|
240 |
for (var i = 0; i < parallelism; i++) |
|
241 |
{ |
|
242 |
buffer[i] = new byte[blockSize]; |
|
243 |
hashers[i] = HashAlgorithm.Create(algorithm); |
|
195 | 244 |
} |
196 |
|
|
245 |
try |
|
246 |
{ |
|
247 |
var indices = new long[parallelism]; |
|
248 |
var bufferCount = new int[parallelism]; |
|
197 | 249 |
|
198 |
/* |
|
199 |
hashBlock.Complete(); |
|
200 |
await hashBlock.Completion; |
|
201 |
*/ |
|
250 |
int read; |
|
251 |
int bufIdx = 0; |
|
252 |
long index = 0; |
|
253 |
while ((read = await stream.ReadAsync(buffer[bufIdx], 0, blockSize)) > 0) |
|
254 |
{ |
|
255 |
index += read; |
|
256 |
indices[bufIdx] = index; |
|
257 |
bufferCount[bufIdx] = read; |
|
258 |
//If we have filled the last buffer or if we have read from the last block, |
|
259 |
//we can calculate the clocks in parallel |
|
260 |
if (bufIdx == parallelism - 1 || read < blockSize) |
|
261 |
{ |
|
262 |
//var options = new ParallelOptions {MaxDegreeOfParallelism = parallelism}; |
|
263 |
Parallel.For(0, bufIdx + 1, idx => |
|
264 |
{ |
|
265 |
//This code was added for compatibility with the way Pithos calculates the last hash |
|
266 |
//We calculate the hash only up to the last non-null byte |
|
267 |
var lastByteIndex = Array.FindLastIndex(buffer[idx], |
|
268 |
bufferCount[idx] - 1, |
|
269 |
aByte => aByte != 0); |
|
270 |
|
|
271 |
var hasher = hashers[idx]; |
|
272 |
var hash = hasher.ComputeHash(buffer[idx], 0, lastByteIndex + 1); |
|
273 |
var filePosition = indices[idx]; |
|
274 |
/* |
|
275 |
Trace.TraceInformation("Hashed [{0}] [{1}/{2}] [{3:p}]", path, |
|
276 |
filePosition, size, |
|
277 |
(double)filePosition / size); |
|
278 |
*/ |
|
279 |
hashes[filePosition] = hash; |
|
280 |
}); |
|
281 |
} |
|
282 |
bufIdx = (bufIdx + 1) % parallelism; |
|
283 |
} |
|
284 |
} |
|
285 |
finally |
|
286 |
{ |
|
287 |
for (var i = 0; i < parallelism; i++) |
|
288 |
{ |
|
289 |
if (hashers[i] != null) |
|
290 |
hashers[i].Dispose(); |
|
291 |
} |
|
292 |
|
|
293 |
} |
|
202 | 294 |
|
203 | 295 |
return hashes; |
204 | 296 |
} |
205 | 297 |
|
206 | 298 |
static BlockHashAlgorithms() |
207 | 299 |
{ |
300 |
/* |
|
208 | 301 |
CalculateBlockHash = CalculateBlockHashesRecursiveAsync; |
302 |
*/ |
|
209 | 303 |
} |
210 | 304 |
} |
211 | 305 |
} |
Also available in: Unified diff