root / trunk / Pithos.Network / BlockHashAlgorithms.cs @ 422c9598
History | View | Annotate | Download (6.7 kB)
1 |
// ----------------------------------------------------------------------- |
---|---|
2 |
// <copyright file="BlockHashAlgorithms.cs" company="Microsoft"> |
3 |
// TODO: Update copyright text. |
4 |
// </copyright> |
5 |
// ----------------------------------------------------------------------- |
6 |
|
7 |
using System.Collections.Concurrent; |
8 |
using System.Diagnostics.Contracts; |
9 |
using System.IO; |
10 |
using System.Security.Cryptography; |
11 |
using System.Threading.Tasks; |
12 |
using System.Threading.Tasks.Dataflow; |
13 |
|
14 |
namespace Pithos.Network |
15 |
{ |
16 |
using System; |
17 |
using System.Collections.Generic; |
18 |
using System.Linq; |
19 |
using System.Text; |
20 |
|
21 |
/// <summary> |
22 |
/// TODO: Update summary. |
23 |
/// </summary> |
24 |
public static class BlockHashAlgorithms |
25 |
{ |
26 |
public static Func<FileStream, int, string, ConcurrentDictionary<int, byte[]>, int, Task<ConcurrentDictionary<int, byte[]>>> CalculateBlockHash; |
27 |
|
28 |
public static Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesRecursiveAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0) |
29 |
{ |
30 |
if (stream == null) |
31 |
throw new ArgumentNullException("stream"); |
32 |
if (String.IsNullOrWhiteSpace(algorithm)) |
33 |
throw new ArgumentNullException("algorithm"); |
34 |
if (blockSize <= 0) |
35 |
throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero "); |
36 |
if (index < 0) |
37 |
throw new ArgumentOutOfRangeException("index", "index must be a non-negative value"); |
38 |
Contract.EndContractBlock(); |
39 |
|
40 |
|
41 |
if (hashes == null) |
42 |
hashes = new ConcurrentDictionary<int, byte[]>(); |
43 |
|
44 |
var buffer = new byte[blockSize]; |
45 |
return stream.ReadAsync(buffer, 0, blockSize).ContinueWith(t => |
46 |
{ |
47 |
var read = t.Result; |
48 |
|
49 |
var nextTask = read == blockSize |
50 |
? CalculateBlockHashesRecursiveAsync(stream, blockSize, algorithm, hashes, index + 1) |
51 |
: Task.Factory.StartNew(() => hashes); |
52 |
if (read > 0) |
53 |
using (var hasher = HashAlgorithm.Create(algorithm)) |
54 |
{ |
55 |
//This code was added for compatibility with the way Pithos calculates the last hash |
56 |
//We calculate the hash only up to the last non-null byte |
57 |
var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0); |
58 |
|
59 |
var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1); |
60 |
hashes[index] = hash; |
61 |
} |
62 |
return nextTask; |
63 |
}).Unwrap(); |
64 |
} |
65 |
public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0) |
66 |
{ |
67 |
if (stream == null) |
68 |
throw new ArgumentNullException("stream"); |
69 |
if (String.IsNullOrWhiteSpace(algorithm)) |
70 |
throw new ArgumentNullException("algorithm"); |
71 |
if (blockSize <= 0) |
72 |
throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero "); |
73 |
Contract.EndContractBlock(); |
74 |
|
75 |
|
76 |
if (hashes == null) |
77 |
hashes = new ConcurrentDictionary<int, byte[]>(); |
78 |
|
79 |
var buffer = new byte[blockSize]; |
80 |
int read; |
81 |
while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0) |
82 |
{ |
83 |
//TODO: identify the value of index |
84 |
|
85 |
using (var hasher = HashAlgorithm.Create(algorithm)) |
86 |
{ |
87 |
//This code was added for compatibility with the way Pithos calculates the last hash |
88 |
//We calculate the hash only up to the last non-null byte |
89 |
var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0); |
90 |
|
91 |
var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1); |
92 |
hashes[index] = hash; |
93 |
} |
94 |
index += read; |
95 |
}; |
96 |
return hashes; |
97 |
} |
98 |
|
99 |
public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAgentAsync(FileStream stream, int blockSize, string algorithm, int parallelism) |
100 |
{ |
101 |
if (stream == null) |
102 |
throw new ArgumentNullException("stream"); |
103 |
if (String.IsNullOrWhiteSpace(algorithm)) |
104 |
throw new ArgumentNullException("algorithm"); |
105 |
if (blockSize <= 0) |
106 |
throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero "); |
107 |
Contract.EndContractBlock(); |
108 |
|
109 |
var hashes = new ConcurrentDictionary<int, byte[]>(); |
110 |
|
111 |
var options = new ExecutionDataflowBlockOptions {BoundedCapacity = parallelism,MaxDegreeOfParallelism=parallelism}; |
112 |
var hashBlock=new ActionBlock<Tuple<int,byte[]>>(input=> |
113 |
{ |
114 |
int idx = input.Item1; |
115 |
byte[] block = input.Item2; |
116 |
using (var hasher = HashAlgorithm.Create(algorithm)) |
117 |
{ |
118 |
//This code was added for compatibility with the way Pithos calculates the last hash |
119 |
//We calculate the hash only up to the last non-null byte |
120 |
var lastByteIndex = Array.FindLastIndex(block, block.Length-1, aByte => aByte != 0); |
121 |
|
122 |
var hash = hasher.ComputeHash(block, 0, lastByteIndex + 1); |
123 |
hashes[idx] = hash; |
124 |
} |
125 |
},options); |
126 |
|
127 |
var buffer = new byte[blockSize]; |
128 |
int read; |
129 |
int index = 0; |
130 |
while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0) |
131 |
{ |
132 |
var block = new byte[read]; |
133 |
Buffer.BlockCopy(buffer,0,block,0,read); |
134 |
await hashBlock.SendAsync(Tuple.Create(index, block)); |
135 |
index += read; |
136 |
}; |
137 |
|
138 |
hashBlock.Complete(); |
139 |
await hashBlock.Completion; |
140 |
|
141 |
return hashes; |
142 |
} |
143 |
|
144 |
static BlockHashAlgorithms() |
145 |
{ |
146 |
CalculateBlockHash = CalculateBlockHashesRecursiveAsync; |
147 |
} |
148 |
} |
149 |
} |