Added checking of, and fallback between, hash algorithm implementations: OpenSSL > CNG > Default
[pithos-ms-client] / trunk / Pithos.Network / BlockHashAlgorithms.cs
1 #region\r
2 /* -----------------------------------------------------------------------\r
3  * <copyright file="BlockHashAlgorithms.cs" company="GRNet">\r
4  * \r
5  * Copyright 2011-2012 GRNET S.A. All rights reserved.\r
6  *\r
7  * Redistribution and use in source and binary forms, with or\r
8  * without modification, are permitted provided that the following\r
9  * conditions are met:\r
10  *\r
11  *   1. Redistributions of source code must retain the above\r
12  *      copyright notice, this list of conditions and the following\r
13  *      disclaimer.\r
14  *\r
15  *   2. Redistributions in binary form must reproduce the above\r
16  *      copyright notice, this list of conditions and the following\r
17  *      disclaimer in the documentation and/or other materials\r
18  *      provided with the distribution.\r
19  *\r
20  *\r
21  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS\r
22  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED\r
23  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR\r
24  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR\r
25  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\r
26  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\r
27  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF\r
28  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED\r
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT\r
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN\r
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE\r
32  * POSSIBILITY OF SUCH DAMAGE.\r
33  *\r
34  * The views and conclusions contained in the software and\r
35  * documentation are those of the authors and should not be\r
36  * interpreted as representing official policies, either expressed\r
37  * or implied, of GRNET S.A.\r
38  * </copyright>\r
39  * -----------------------------------------------------------------------\r
40  */\r
41 #endregion\r
42 using System.Collections.Concurrent;\r
43 using System.Diagnostics.Contracts;\r
44 using System.IO;\r
45 using System.Reflection;\r
46 using System.Security.Cryptography;\r
47 using System.ServiceModel.Channels;\r
48 using System.Threading;\r
49 using System.Threading.Tasks;\r
50 using System.Threading.Tasks.Dataflow;\r
51 \r
52 \r
53 namespace Pithos.Network\r
54 {\r
55     using System;\r
56 \r
57     /// <summary>\r
58     /// TODO: Update summary.\r
59     /// </summary>\r
60     public static class BlockHashAlgorithms\r
61     {\r
62         private static readonly log4net.ILog Log = log4net.LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);\r
63 \r
64 /*\r
65         public static Func<FileStream, int, string, ConcurrentDictionary<int, byte[]>, int, Task<ConcurrentDictionary<int, byte[]>>> CalculateBlockHash;\r
66 \r
67         public static Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesRecursiveAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0)\r
68         {\r
69             if (stream == null)\r
70                 throw new ArgumentNullException("stream");\r
71             if (String.IsNullOrWhiteSpace(algorithm))\r
72                 throw new ArgumentNullException("algorithm");\r
73             if (blockSize <= 0)\r
74                 throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");\r
75             if (index < 0)\r
76                 throw new ArgumentOutOfRangeException("index", "index must be a non-negative value");\r
77             Contract.EndContractBlock();\r
78 \r
79 \r
80             if (hashes == null)\r
81                 hashes = new ConcurrentDictionary<int, byte[]>();\r
82 \r
83             var buffer = new byte[blockSize];\r
84             return stream.ReadAsync(buffer, 0, blockSize).ContinueWith(t =>\r
85             {\r
86                 var read = t.Result;\r
87 \r
88                 var nextTask = read == blockSize\r
89                                     ? CalculateBlockHashesRecursiveAsync(stream, blockSize, algorithm, hashes, index + 1)\r
90                                     : Task.Factory.StartNew(() => hashes);\r
91                 if (read > 0)\r
92                     using (var hasher = HashAlgorithm.Create(algorithm))\r
93                     {\r
94                         //This code was added for compatibility with the way Pithos calculates the last hash\r
95                         //We calculate the hash only up to the last non-null byte\r
96                         var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);\r
97 \r
98                         var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1);\r
99                         hashes[index] = hash;\r
100                     }\r
101                 return nextTask;\r
102             }).Unwrap();\r
103         }\r
104         public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0)\r
105         {\r
106             if (stream == null)\r
107                 throw new ArgumentNullException("stream");\r
108             if (String.IsNullOrWhiteSpace(algorithm))\r
109                 throw new ArgumentNullException("algorithm");\r
110             if (blockSize <= 0)\r
111                 throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");\r
112             Contract.EndContractBlock();\r
113 \r
114 \r
115             if (hashes == null)\r
116                 hashes = new ConcurrentDictionary<int, byte[]>();\r
117 \r
118             var buffer = new byte[blockSize];\r
119             int read;\r
120             while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)\r
121             {\r
122                 using (var hasher = HashAlgorithm.Create(algorithm))\r
123                 {\r
124                     //This code was added for compatibility with the way Pithos calculates the last hash\r
125                     //We calculate the hash only up to the last non-null byte\r
126                     var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);\r
127 \r
128                     var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1);\r
129                     hashes[index] = hash;\r
130                 }\r
131                 index += read;\r
132             }\r
133             return hashes;\r
134         }\r
135         \r
136         public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAgentAsync(FileStream stream, int blockSize, string algorithm, int parallelism)\r
137         {\r
138             if (stream == null)\r
139                 throw new ArgumentNullException("stream");\r
140             if (String.IsNullOrWhiteSpace(algorithm))\r
141                 throw new ArgumentNullException("algorithm");\r
142             if (blockSize <= 0)\r
143                 throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");\r
144             Contract.EndContractBlock();\r
145 \r
146             var hashes = new ConcurrentDictionary<int, byte[]>();\r
147 \r
148             var path = stream.Name;\r
149             var size = stream.Length;\r
150             Log.DebugFormat("Hashing [{0}] size [{1}]",path,size);\r
151             \r
152             var options = new ExecutionDataflowBlockOptions {BoundedCapacity = parallelism,MaxDegreeOfParallelism=parallelism};\r
153             var hashBlock=new ActionBlock<Tuple<int,byte[]>>(input=>\r
154                               {\r
155                                   int idx = input.Item1;\r
156                                   byte[] block = input.Item2;\r
157                                   using (var hasher = HashAlgorithm.Create(algorithm))\r
158                                   {\r
159                                       //This code was added for compatibility with the way Pithos calculates the last hash\r
160                                       //We calculate the hash only up to the last non-null byte\r
161                                       var lastByteIndex = Array.FindLastIndex(block, block.Length-1, aByte => aByte != 0);\r
162 \r
163                                       var hash = hasher.ComputeHash(block, 0, lastByteIndex + 1);\r
164                                       hashes[idx] = hash;\r
165                                   }                                  \r
166                               },options);\r
167 \r
168             var buffer = new byte[blockSize];\r
169             int read;\r
170             int index = 0;\r
171             while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)\r
172             {\r
173                 var block = new byte[read];\r
174                 Buffer.BlockCopy(buffer, 0, block, 0, read);\r
175                 await hashBlock.SendAsync(Tuple.Create(index, block));\r
176                 index += read;\r
177             }\r
178             \r
179 \r
180             hashBlock.Complete();\r
181             await hashBlock.Completion;\r
182 \r
183             return hashes;\r
184         } \r
185         \r
186         public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesInPlace(FileStream stream, int blockSize, string algorithm, int parallelism)\r
187         {\r
188             if (stream == null)\r
189                 throw new ArgumentNullException("stream");\r
190             if (String.IsNullOrWhiteSpace(algorithm))\r
191                 throw new ArgumentNullException("algorithm");\r
192             if (blockSize <= 0)\r
193                 throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");\r
194             Contract.EndContractBlock();\r
195 \r
196             var hashes = new ConcurrentDictionary<int, byte[]>();\r
197 \r
198             var path = stream.Name;\r
199             var size = stream.Length;\r
200             Log.DebugFormat("Hashing [{0}] size [{1}]",path,size);\r
201             \r
202 \r
203             var buffer = new byte[blockSize];\r
204             var index = 0;\r
205             using (var hasher = HashAlgorithm.Create(algorithm))\r
206             {\r
207                 int read;\r
208                 while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)\r
209                 {\r
210                     //This code was added for compatibility with the way Pithos calculates the last hash\r
211                     //We calculate the hash only up to the last non-null byte\r
212                     var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);\r
213 \r
214                     var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1);\r
215                     Log.DebugFormat("Hashed [{0}] [{1}/{2}] [{3:p}]", path, index,size,(double)index/size);\r
216                     hashes[index] = hash;\r
217                     index += read;\r
218                 }\r
219             }\r
220             return hashes;\r
221         }\r
222 */\r
223 \r
224         private static BufferManager _bufferMgr;\r
225 \r
226         private static BufferManager GetBufferManager(int blockSize,int parallelism)\r
227         {\r
228             Interlocked.CompareExchange(ref _bufferMgr,\r
229                                         BufferManager.CreateBufferManager(parallelism*blockSize, blockSize),\r
230                                         null);\r
231             return _bufferMgr;\r
232         }\r
233 \r
234         //public static async Task<ConcurrentDictionary<long, byte[]>> CalculateBlockHashesInPlacePFor(FileStream stream, int blockSize, string algorithm, int parallelism, Action<long, byte[], int> postAction,CancellationToken token, IProgress<double> progress)\r
235         public static async Task<ConcurrentDictionary<long, byte[]>> CalculateBlockHashesInPlacePFor(FileStream stream, int blockSize, string algorithm, byte parallelism, CancellationToken token, IProgress<HashProgress> progress)\r
236         {\r
237             if (stream == null)\r
238                 throw new ArgumentNullException("stream");\r
239             if (String.IsNullOrWhiteSpace(algorithm))\r
240                 throw new ArgumentNullException("algorithm");\r
241             if (blockSize <= 0)\r
242                 throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");\r
243             Contract.EndContractBlock();\r
244 \r
245             var hashes = new ConcurrentDictionary<long, byte[]>();\r
246 \r
247             var path = stream.Name;\r
248             var size = stream.Length;\r
249             Log.DebugFormat("Hashing [{0}] size [{1}]", path, size);\r
250 \r
251             //TODO: Handle zero-length files\r
252             if (size == 0)\r
253             {\r
254                 var buf = new byte[0];\r
255                 using (var hasher = HashAlgorithm.Create(algorithm))\r
256                 {\r
257                     hashes[0] = hasher.ComputeHash(buf);\r
258                     return hashes;\r
259                 }\r
260             }\r
261 \r
262             var blocks = size<blockSize?1:size/blockSize;\r
263 \r
264             var actualHashers = parallelism > blocks ? (byte)blocks : parallelism;\r
265 \r
266             var buffer = new byte[actualHashers][];\r
267             var hashers = new HashAlgorithm[actualHashers];\r
268             var bufferManager = GetBufferManager(blockSize, actualHashers);\r
269             for (var i = 0; i < actualHashers; i++)\r
270             {\r
271                 buffer[i] = bufferManager.TakeBuffer(blockSize);// new byte[blockSize];\r
272                 hashers[i] = HashAlgorithm.Create(algorithm);                \r
273             }\r
274             try\r
275             {\r
276                 var indices = new long[actualHashers];\r
277                 var bufferCount = new int[actualHashers];\r
278 \r
279                 int read;\r
280                 int bufIdx = 0;\r
281                 long index = 0;\r
282 \r
283                 //long block = 0;\r
284 \r
285                 while ((read = await stream.ReadAsync(buffer[bufIdx], 0, blockSize).ConfigureAwait(false)) > 0)\r
286                 {\r
287                     index += read;\r
288                     indices[bufIdx] = index;\r
289                     bufferCount[bufIdx] = read;\r
290                     //postAction(block++, buffer[bufIdx], read);\r
291 \r
292                     //If we have filled the last buffer or if we have read from the last block,\r
293                     //we can calculate the clocks in parallel\r
294                     if (bufIdx == actualHashers - 1 || read < blockSize)\r
295                     {\r
296                         var options = new ParallelOptions {MaxDegreeOfParallelism = actualHashers,CancellationToken=token};\r
297 \r
298                         Parallel.For(0, bufIdx + 1, options,(idx,state) =>\r
299                                                         {\r
300                                                             //This code was added for compatibility with the way Pithos calculates the last hash\r
301                                                             //We calculate the hash only up to the last non-null byte\r
302                                                             options.CancellationToken.ThrowIfCancellationRequested();\r
303                                                             var lastByteIndex = Array.FindLastIndex(buffer[idx],\r
304                                                                                                     bufferCount[idx] - 1,\r
305                                                                                                     aByte => aByte != 0);\r
306 \r
307                                                             var hasher = hashers[idx];\r
308 \r
309                                                             byte[] hash=hasher.ComputeHash(buffer[idx],0,lastByteIndex+1);\r
310 /*\r
311                                                             if (buffer[idx].Length == lastByteIndex || lastByteIndex==-1)\r
312                                                                 hash = hasher.Digest(buffer[idx]);\r
313                                                             else\r
314                                                             {\r
315                                                                 var buf=new byte[lastByteIndex+1];\r
316                                                                 Buffer.BlockCopy(buffer[idx],0,buf,0,lastByteIndex+1);\r
317                                                                 hash = hasher.Digest(buf);\r
318                                                             }\r
319 */\r
320 \r
321                                                             \r
322                                                             \r
323                                                             var filePosition = indices[idx];\r
324                                                             /*\r
325                                                         Trace.TraceInformation("Hashed [{0}] [{1}/{2}] [{3:p}]", path,\r
326                                                                                 filePosition, size,\r
327                                                                                 (double)filePosition / size);\r
328                             */\r
329                                                             hashes[filePosition] = hash;\r
330                                                             //Do not report for files smaller than 4MB\r
331                                                             if (progress != null && stream.Length > 4*1024*1024)\r
332                                                                 progress.Report(new HashProgress((long)hashes.Count*blockSize,stream.Length));\r
333                                                         });\r
334                     }\r
335                     bufIdx = (bufIdx +1)%actualHashers;\r
336                 }\r
337             }\r
338             finally\r
339             {\r
340                 for (var i = 0; i < actualHashers; i++)\r
341                 {\r
342                     if (hashers[i] != null)\r
343                         hashers[i].Dispose();\r
344                     bufferManager.ReturnBuffer(buffer[i]);\r
345                 }\r
346 \r
347             }\r
348 \r
349             return hashes;\r
350         }\r
351 \r
352         static BlockHashAlgorithms()\r
353         {\r
354 /*\r
355             CalculateBlockHash = CalculateBlockHashesRecursiveAsync;\r
356 */\r
357         }\r
358     }\r
359 }\r