Replaced .NET hashing with OpenSSL
[pithos-ms-client] / trunk / Pithos.Network / BlockHashAlgorithms.cs
1 #region\r
2 /* -----------------------------------------------------------------------\r
3  * <copyright file="BlockHashAlgorithms.cs" company="GRNet">\r
4  * \r
5  * Copyright 2011-2012 GRNET S.A. All rights reserved.\r
6  *\r
7  * Redistribution and use in source and binary forms, with or\r
8  * without modification, are permitted provided that the following\r
9  * conditions are met:\r
10  *\r
11  *   1. Redistributions of source code must retain the above\r
12  *      copyright notice, this list of conditions and the following\r
13  *      disclaimer.\r
14  *\r
15  *   2. Redistributions in binary form must reproduce the above\r
16  *      copyright notice, this list of conditions and the following\r
17  *      disclaimer in the documentation and/or other materials\r
18  *      provided with the distribution.\r
19  *\r
20  *\r
21  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS\r
22  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED\r
23  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR\r
24  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR\r
25  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\r
26  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\r
27  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF\r
28  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED\r
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT\r
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN\r
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE\r
32  * POSSIBILITY OF SUCH DAMAGE.\r
33  *\r
34  * The views and conclusions contained in the software and\r
35  * documentation are those of the authors and should not be\r
36  * interpreted as representing official policies, either expressed\r
37  * or implied, of GRNET S.A.\r
38  * </copyright>\r
39  * -----------------------------------------------------------------------\r
40  */\r
41 #endregion\r
42 using System.Collections.Concurrent;\r
43 using System.Diagnostics.Contracts;\r
44 using System.IO;\r
45 using System.Reflection;\r
46 using System.ServiceModel.Channels;\r
47 using System.Threading;\r
48 using System.Threading.Tasks;\r
49 using System.Threading.Tasks.Dataflow;\r
50 using OpenSSL.Crypto;\r
51 \r
52 namespace Pithos.Network\r
53 {\r
54     using System;\r
55 \r
    /// <summary>
    /// Helpers that split a file stream into fixed-size blocks and compute a hash for each block
    /// using OpenSSL message digests.
    /// </summary>
59     public static class BlockHashAlgorithms\r
60     {\r
61         private static readonly log4net.ILog Log = log4net.LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);\r
62 \r
63 /*\r
64         public static Func<FileStream, int, string, ConcurrentDictionary<int, byte[]>, int, Task<ConcurrentDictionary<int, byte[]>>> CalculateBlockHash;\r
65 \r
66         public static Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesRecursiveAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0)\r
67         {\r
68             if (stream == null)\r
69                 throw new ArgumentNullException("stream");\r
70             if (String.IsNullOrWhiteSpace(algorithm))\r
71                 throw new ArgumentNullException("algorithm");\r
72             if (blockSize <= 0)\r
73                 throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");\r
74             if (index < 0)\r
75                 throw new ArgumentOutOfRangeException("index", "index must be a non-negative value");\r
76             Contract.EndContractBlock();\r
77 \r
78 \r
79             if (hashes == null)\r
80                 hashes = new ConcurrentDictionary<int, byte[]>();\r
81 \r
82             var buffer = new byte[blockSize];\r
83             return stream.ReadAsync(buffer, 0, blockSize).ContinueWith(t =>\r
84             {\r
85                 var read = t.Result;\r
86 \r
87                 var nextTask = read == blockSize\r
88                                     ? CalculateBlockHashesRecursiveAsync(stream, blockSize, algorithm, hashes, index + 1)\r
89                                     : Task.Factory.StartNew(() => hashes);\r
90                 if (read > 0)\r
91                     using (var hasher = HashAlgorithm.Create(algorithm))\r
92                     {\r
93                         //This code was added for compatibility with the way Pithos calculates the last hash\r
94                         //We calculate the hash only up to the last non-null byte\r
95                         var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);\r
96 \r
97                         var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1);\r
98                         hashes[index] = hash;\r
99                     }\r
100                 return nextTask;\r
101             }).Unwrap();\r
102         }\r
103         public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0)\r
104         {\r
105             if (stream == null)\r
106                 throw new ArgumentNullException("stream");\r
107             if (String.IsNullOrWhiteSpace(algorithm))\r
108                 throw new ArgumentNullException("algorithm");\r
109             if (blockSize <= 0)\r
110                 throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");\r
111             Contract.EndContractBlock();\r
112 \r
113 \r
114             if (hashes == null)\r
115                 hashes = new ConcurrentDictionary<int, byte[]>();\r
116 \r
117             var buffer = new byte[blockSize];\r
118             int read;\r
119             while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)\r
120             {\r
121                 using (var hasher = HashAlgorithm.Create(algorithm))\r
122                 {\r
123                     //This code was added for compatibility with the way Pithos calculates the last hash\r
124                     //We calculate the hash only up to the last non-null byte\r
125                     var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);\r
126 \r
127                     var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1);\r
128                     hashes[index] = hash;\r
129                 }\r
130                 index += read;\r
131             }\r
132             return hashes;\r
133         }\r
134         \r
135         public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAgentAsync(FileStream stream, int blockSize, string algorithm, int parallelism)\r
136         {\r
137             if (stream == null)\r
138                 throw new ArgumentNullException("stream");\r
139             if (String.IsNullOrWhiteSpace(algorithm))\r
140                 throw new ArgumentNullException("algorithm");\r
141             if (blockSize <= 0)\r
142                 throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");\r
143             Contract.EndContractBlock();\r
144 \r
145             var hashes = new ConcurrentDictionary<int, byte[]>();\r
146 \r
147             var path = stream.Name;\r
148             var size = stream.Length;\r
149             Log.DebugFormat("Hashing [{0}] size [{1}]",path,size);\r
150             \r
151             var options = new ExecutionDataflowBlockOptions {BoundedCapacity = parallelism,MaxDegreeOfParallelism=parallelism};\r
152             var hashBlock=new ActionBlock<Tuple<int,byte[]>>(input=>\r
153                               {\r
154                                   int idx = input.Item1;\r
155                                   byte[] block = input.Item2;\r
156                                   using (var hasher = HashAlgorithm.Create(algorithm))\r
157                                   {\r
158                                       //This code was added for compatibility with the way Pithos calculates the last hash\r
159                                       //We calculate the hash only up to the last non-null byte\r
160                                       var lastByteIndex = Array.FindLastIndex(block, block.Length-1, aByte => aByte != 0);\r
161 \r
162                                       var hash = hasher.ComputeHash(block, 0, lastByteIndex + 1);\r
163                                       hashes[idx] = hash;\r
164                                   }                                  \r
165                               },options);\r
166 \r
167             var buffer = new byte[blockSize];\r
168             int read;\r
169             int index = 0;\r
170             while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)\r
171             {\r
172                 var block = new byte[read];\r
173                 Buffer.BlockCopy(buffer, 0, block, 0, read);\r
174                 await hashBlock.SendAsync(Tuple.Create(index, block));\r
175                 index += read;\r
176             }\r
177             \r
178 \r
179             hashBlock.Complete();\r
180             await hashBlock.Completion;\r
181 \r
182             return hashes;\r
183         } \r
184         \r
185         public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesInPlace(FileStream stream, int blockSize, string algorithm, int parallelism)\r
186         {\r
187             if (stream == null)\r
188                 throw new ArgumentNullException("stream");\r
189             if (String.IsNullOrWhiteSpace(algorithm))\r
190                 throw new ArgumentNullException("algorithm");\r
191             if (blockSize <= 0)\r
192                 throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");\r
193             Contract.EndContractBlock();\r
194 \r
195             var hashes = new ConcurrentDictionary<int, byte[]>();\r
196 \r
197             var path = stream.Name;\r
198             var size = stream.Length;\r
199             Log.DebugFormat("Hashing [{0}] size [{1}]",path,size);\r
200             \r
201 \r
202             var buffer = new byte[blockSize];\r
203             var index = 0;\r
204             using (var hasher = HashAlgorithm.Create(algorithm))\r
205             {\r
206                 int read;\r
207                 while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)\r
208                 {\r
209                     //This code was added for compatibility with the way Pithos calculates the last hash\r
210                     //We calculate the hash only up to the last non-null byte\r
211                     var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);\r
212 \r
213                     var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1);\r
214                     Log.DebugFormat("Hashed [{0}] [{1}/{2}] [{3:p}]", path, index,size,(double)index/size);\r
215                     hashes[index] = hash;\r
216                     index += read;\r
217                 }\r
218             }\r
219             return hashes;\r
220         }\r
221 */\r
222 \r
223         private static BufferManager _bufferMgr;\r
224 \r
225         private static BufferManager GetBufferManager(int blockSize,int parallelism)\r
226         {\r
227             Interlocked.CompareExchange(ref _bufferMgr,\r
228                                         BufferManager.CreateBufferManager(parallelism*blockSize, blockSize),\r
229                                         null);\r
230             return _bufferMgr;\r
231         }\r
232 \r
233         public static async Task<ConcurrentDictionary<long, byte[]>> CalculateBlockHashesInPlacePFor(FileStream stream, int blockSize, string algorithm, int parallelism, Action<long, byte[], int> postAction,CancellationToken token, IProgress<double> progress)\r
234         {\r
235             if (stream == null)\r
236                 throw new ArgumentNullException("stream");\r
237             if (String.IsNullOrWhiteSpace(algorithm))\r
238                 throw new ArgumentNullException("algorithm");\r
239             if (blockSize <= 0)\r
240                 throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");\r
241             Contract.EndContractBlock();\r
242 \r
243             var hashes = new ConcurrentDictionary<long, byte[]>();\r
244 \r
245             var path = stream.Name;\r
246             var size = stream.Length;\r
247             Log.DebugFormat("Hashing [{0}] size [{1}]", path, size);\r
248 \r
249             //TODO: Handle zero-length files\r
250             if (size == 0)\r
251             {\r
252                 var buf = new byte[0];\r
253                 using (var hasher = new MessageDigestContext(MessageDigest.CreateByName(algorithm)))\r
254                 {                    \r
255                     hasher.Init();\r
256                     hashes[0] = hasher.Digest(buf);\r
257                     return hashes;\r
258                 }\r
259             }\r
260 \r
261 \r
262             var buffer = new byte[parallelism][];\r
263             var hashers = new MessageDigestContext[parallelism];\r
264             var bufferManager = GetBufferManager(blockSize, parallelism);\r
265             for (var i = 0; i < parallelism; i++)\r
266             {\r
267                 buffer[i] = bufferManager.TakeBuffer(blockSize);// new byte[blockSize];\r
268                 hashers[i] = new MessageDigestContext(MessageDigest.CreateByName(algorithm));\r
269                 hashers[i].Init();\r
270             }\r
271             try\r
272             {\r
273                 var indices = new long[parallelism];\r
274                 var bufferCount = new int[parallelism];\r
275 \r
276                 int read;\r
277                 int bufIdx = 0;\r
278                 long index = 0;\r
279 \r
280                 long block = 0;\r
281 \r
282                 while ((read = await stream.ReadAsync(buffer[bufIdx], 0, blockSize).ConfigureAwait(false)) > 0)\r
283                 {\r
284                     index += read;\r
285                     indices[bufIdx] = index;\r
286                     bufferCount[bufIdx] = read;\r
287                     postAction(block++, buffer[bufIdx], read);\r
288 \r
289                     //If we have filled the last buffer or if we have read from the last block,\r
290                     //we can calculate the clocks in parallel\r
291                     if (bufIdx == parallelism - 1 || read < blockSize)\r
292                     {\r
293                         var options = new ParallelOptions {MaxDegreeOfParallelism = parallelism,CancellationToken=token};\r
294 \r
295                         Parallel.For(0, bufIdx + 1, options,(idx,state) =>\r
296                                                         {\r
297                                                             //This code was added for compatibility with the way Pithos calculates the last hash\r
298                                                             //We calculate the hash only up to the last non-null byte\r
299                                                             options.CancellationToken.ThrowIfCancellationRequested();\r
300                                                             var lastByteIndex = Array.FindLastIndex(buffer[idx],\r
301                                                                                                     bufferCount[idx] - 1,\r
302                                                                                                     aByte => aByte != 0);\r
303 \r
304                                                             var hasher = hashers[idx];\r
305 \r
306                                                             byte[] hash;\r
307                                                             if (buffer[idx].Length == lastByteIndex || lastByteIndex==-1)\r
308                                                                 hash = hasher.Digest(buffer[idx]);\r
309                                                             else\r
310                                                             {\r
311                                                                 var buf=new byte[lastByteIndex];\r
312                                                                 Buffer.BlockCopy(buffer[idx],0,buf,0,lastByteIndex);\r
313                                                                 hash = hasher.Digest(buf);\r
314                                                             }\r
315 \r
316                                                             \r
317                                                             \r
318                                                             var filePosition = indices[idx];\r
319                                                             /*\r
320                                                         Trace.TraceInformation("Hashed [{0}] [{1}/{2}] [{3:p}]", path,\r
321                                                                                 filePosition, size,\r
322                                                                                 (double)filePosition / size);\r
323                             */\r
324                                                             hashes[filePosition] = hash;\r
325                                                             progress.Report((long)hashes.Count*blockSize*1.0/stream.Length);\r
326                                                         });\r
327                     }\r
328                     bufIdx = (bufIdx +1)%parallelism;\r
329                 }\r
330             }\r
331             finally\r
332             {\r
333                 for (var i = 0; i < parallelism; i++)\r
334                 {\r
335                     if (hashers[i] != null)\r
336                         hashers[i].Dispose();\r
337                     bufferManager.ReturnBuffer(buffer[i]);\r
338                 }\r
339 \r
340             }\r
341 \r
342             return hashes;\r
343         }\r
344 \r
345         static BlockHashAlgorithms()\r
346         {\r
347 /*\r
348             CalculateBlockHash = CalculateBlockHashesRecursiveAsync;\r
349 */\r
350         }\r
351     }\r
352 }\r