Added check for last MD5 modification, to avoid redundant recalculation of MD5
[pithos-ms-client] / trunk / Pithos.Network / BlockHashAlgorithms.cs
1 #region\r
2 /* -----------------------------------------------------------------------\r
3  * <copyright file="BlockHashAlgorithms.cs" company="GRNet">\r
4  * \r
5  * Copyright 2011-2012 GRNET S.A. All rights reserved.\r
6  *\r
7  * Redistribution and use in source and binary forms, with or\r
8  * without modification, are permitted provided that the following\r
9  * conditions are met:\r
10  *\r
11  *   1. Redistributions of source code must retain the above\r
12  *      copyright notice, this list of conditions and the following\r
13  *      disclaimer.\r
14  *\r
15  *   2. Redistributions in binary form must reproduce the above\r
16  *      copyright notice, this list of conditions and the following\r
17  *      disclaimer in the documentation and/or other materials\r
18  *      provided with the distribution.\r
19  *\r
20  *\r
21  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS\r
22  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED\r
23  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR\r
24  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR\r
25  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\r
26  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\r
27  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF\r
28  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED\r
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT\r
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN\r
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE\r
32  * POSSIBILITY OF SUCH DAMAGE.\r
33  *\r
34  * The views and conclusions contained in the software and\r
35  * documentation are those of the authors and should not be\r
36  * interpreted as representing official policies, either expressed\r
37  * or implied, of GRNET S.A.\r
38  * </copyright>\r
39  * -----------------------------------------------------------------------\r
40  */\r
41 #endregion\r
42 using System.Collections.Concurrent;\r
43 using System.Diagnostics.Contracts;\r
44 using System.IO;\r
45 using System.Reflection;\r
46 using System.Security.Cryptography;\r
47 using System.ServiceModel.Channels;\r
48 using System.Threading;\r
49 using System.Threading.Tasks;\r
50 using System.Threading.Tasks.Dataflow;\r
51 \r
52 namespace Pithos.Network\r
53 {\r
54     using System;\r
55 \r
    /// <summary>
    /// Helpers that calculate the per-block hashes of a file stream.
    /// </summary>
59     public static class BlockHashAlgorithms\r
60     {\r
61         private static readonly log4net.ILog Log = log4net.LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);\r
62 \r
63 /*\r
64         public static Func<FileStream, int, string, ConcurrentDictionary<int, byte[]>, int, Task<ConcurrentDictionary<int, byte[]>>> CalculateBlockHash;\r
65 \r
66         public static Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesRecursiveAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0)\r
67         {\r
68             if (stream == null)\r
69                 throw new ArgumentNullException("stream");\r
70             if (String.IsNullOrWhiteSpace(algorithm))\r
71                 throw new ArgumentNullException("algorithm");\r
72             if (blockSize <= 0)\r
73                 throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");\r
74             if (index < 0)\r
75                 throw new ArgumentOutOfRangeException("index", "index must be a non-negative value");\r
76             Contract.EndContractBlock();\r
77 \r
78 \r
79             if (hashes == null)\r
80                 hashes = new ConcurrentDictionary<int, byte[]>();\r
81 \r
82             var buffer = new byte[blockSize];\r
83             return stream.ReadAsync(buffer, 0, blockSize).ContinueWith(t =>\r
84             {\r
85                 var read = t.Result;\r
86 \r
87                 var nextTask = read == blockSize\r
88                                     ? CalculateBlockHashesRecursiveAsync(stream, blockSize, algorithm, hashes, index + 1)\r
89                                     : Task.Factory.StartNew(() => hashes);\r
90                 if (read > 0)\r
91                     using (var hasher = HashAlgorithm.Create(algorithm))\r
92                     {\r
93                         //This code was added for compatibility with the way Pithos calculates the last hash\r
94                         //We calculate the hash only up to the last non-null byte\r
95                         var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);\r
96 \r
97                         var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1);\r
98                         hashes[index] = hash;\r
99                     }\r
100                 return nextTask;\r
101             }).Unwrap();\r
102         }\r
103         public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0)\r
104         {\r
105             if (stream == null)\r
106                 throw new ArgumentNullException("stream");\r
107             if (String.IsNullOrWhiteSpace(algorithm))\r
108                 throw new ArgumentNullException("algorithm");\r
109             if (blockSize <= 0)\r
110                 throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");\r
111             Contract.EndContractBlock();\r
112 \r
113 \r
114             if (hashes == null)\r
115                 hashes = new ConcurrentDictionary<int, byte[]>();\r
116 \r
117             var buffer = new byte[blockSize];\r
118             int read;\r
119             while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)\r
120             {\r
121                 using (var hasher = HashAlgorithm.Create(algorithm))\r
122                 {\r
123                     //This code was added for compatibility with the way Pithos calculates the last hash\r
124                     //We calculate the hash only up to the last non-null byte\r
125                     var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);\r
126 \r
127                     var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1);\r
128                     hashes[index] = hash;\r
129                 }\r
130                 index += read;\r
131             }\r
132             return hashes;\r
133         }\r
134         \r
135         public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAgentAsync(FileStream stream, int blockSize, string algorithm, int parallelism)\r
136         {\r
137             if (stream == null)\r
138                 throw new ArgumentNullException("stream");\r
139             if (String.IsNullOrWhiteSpace(algorithm))\r
140                 throw new ArgumentNullException("algorithm");\r
141             if (blockSize <= 0)\r
142                 throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");\r
143             Contract.EndContractBlock();\r
144 \r
145             var hashes = new ConcurrentDictionary<int, byte[]>();\r
146 \r
147             var path = stream.Name;\r
148             var size = stream.Length;\r
149             Log.DebugFormat("Hashing [{0}] size [{1}]",path,size);\r
150             \r
151             var options = new ExecutionDataflowBlockOptions {BoundedCapacity = parallelism,MaxDegreeOfParallelism=parallelism};\r
152             var hashBlock=new ActionBlock<Tuple<int,byte[]>>(input=>\r
153                               {\r
154                                   int idx = input.Item1;\r
155                                   byte[] block = input.Item2;\r
156                                   using (var hasher = HashAlgorithm.Create(algorithm))\r
157                                   {\r
158                                       //This code was added for compatibility with the way Pithos calculates the last hash\r
159                                       //We calculate the hash only up to the last non-null byte\r
160                                       var lastByteIndex = Array.FindLastIndex(block, block.Length-1, aByte => aByte != 0);\r
161 \r
162                                       var hash = hasher.ComputeHash(block, 0, lastByteIndex + 1);\r
163                                       hashes[idx] = hash;\r
164                                   }                                  \r
165                               },options);\r
166 \r
167             var buffer = new byte[blockSize];\r
168             int read;\r
169             int index = 0;\r
170             while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)\r
171             {\r
172                 var block = new byte[read];\r
173                 Buffer.BlockCopy(buffer, 0, block, 0, read);\r
174                 await hashBlock.SendAsync(Tuple.Create(index, block));\r
175                 index += read;\r
176             }\r
177             \r
178 \r
179             hashBlock.Complete();\r
180             await hashBlock.Completion;\r
181 \r
182             return hashes;\r
183         } \r
184         \r
185         public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesInPlace(FileStream stream, int blockSize, string algorithm, int parallelism)\r
186         {\r
187             if (stream == null)\r
188                 throw new ArgumentNullException("stream");\r
189             if (String.IsNullOrWhiteSpace(algorithm))\r
190                 throw new ArgumentNullException("algorithm");\r
191             if (blockSize <= 0)\r
192                 throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");\r
193             Contract.EndContractBlock();\r
194 \r
195             var hashes = new ConcurrentDictionary<int, byte[]>();\r
196 \r
197             var path = stream.Name;\r
198             var size = stream.Length;\r
199             Log.DebugFormat("Hashing [{0}] size [{1}]",path,size);\r
200             \r
201 \r
202             var buffer = new byte[blockSize];\r
203             var index = 0;\r
204             using (var hasher = HashAlgorithm.Create(algorithm))\r
205             {\r
206                 int read;\r
207                 while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)\r
208                 {\r
209                     //This code was added for compatibility with the way Pithos calculates the last hash\r
210                     //We calculate the hash only up to the last non-null byte\r
211                     var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);\r
212 \r
213                     var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1);\r
214                     Log.DebugFormat("Hashed [{0}] [{1}/{2}] [{3:p}]", path, index,size,(double)index/size);\r
215                     hashes[index] = hash;\r
216                     index += read;\r
217                 }\r
218             }\r
219             return hashes;\r
220         }\r
221 */\r
222 \r
223         private static BufferManager _bufferMgr;\r
224 \r
225         private static BufferManager GetBufferManager(int blockSize,int parallelism)\r
226         {\r
227             Interlocked.CompareExchange(ref _bufferMgr,\r
228                                         BufferManager.CreateBufferManager(parallelism*blockSize, blockSize),\r
229                                         null);\r
230             return _bufferMgr;\r
231         }\r
232 \r
233         public static async Task<ConcurrentDictionary<long, byte[]>> CalculateBlockHashesInPlacePFor(FileStream stream, int blockSize, string algorithm, int parallelism,IProgress<double> progress )\r
234         {\r
235             if (stream == null)\r
236                 throw new ArgumentNullException("stream");\r
237             if (String.IsNullOrWhiteSpace(algorithm))\r
238                 throw new ArgumentNullException("algorithm");\r
239             if (blockSize <= 0)\r
240                 throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");\r
241             Contract.EndContractBlock();\r
242 \r
243             var hashes = new ConcurrentDictionary<long, byte[]>();\r
244 \r
245             var path = stream.Name;\r
246             var size = stream.Length;\r
247             Log.DebugFormat("Hashing [{0}] size [{1}]", path, size);\r
248 \r
249             //TODO: Handle zero-length files\r
250             if (size == 0)\r
251             {\r
252                 var buf = new byte[0];\r
253                 var hasher=HashAlgorithm.Create(algorithm);                \r
254                 hashes[0]=hasher.ComputeHash(buf);\r
255                 return hashes;\r
256             }\r
257 \r
258 \r
259             var buffer = new byte[parallelism][];\r
260             var hashers = new HashAlgorithm[parallelism];\r
261             var bufferManager = GetBufferManager(blockSize, parallelism);\r
262             for (var i = 0; i < parallelism; i++)\r
263             {\r
264                 buffer[i] = bufferManager.TakeBuffer(blockSize);// new byte[blockSize];\r
265                 hashers[i] = HashAlgorithm.Create(algorithm);\r
266             }\r
267             try\r
268             {\r
269                 var indices = new long[parallelism];\r
270                 var bufferCount = new int[parallelism];\r
271 \r
272                 int read;\r
273                 int bufIdx = 0;\r
274                 long index = 0;\r
275 \r
276 \r
277                 while ((read = await stream.ReadAsync(buffer[bufIdx], 0, blockSize).ConfigureAwait(false)) > 0)\r
278                 {\r
279                     index += read;\r
280                     indices[bufIdx] = index;\r
281                     bufferCount[bufIdx] = read;\r
282                     //If we have filled the last buffer or if we have read from the last block,\r
283                     //we can calculate the clocks in parallel\r
284                     if (bufIdx == parallelism - 1 || read < blockSize)\r
285                     {\r
286                         //var options = new ParallelOptions {MaxDegreeOfParallelism = parallelism};\r
287                         Parallel.For(0, bufIdx + 1, idx =>\r
288                                                         {\r
289                                                             //This code was added for compatibility with the way Pithos calculates the last hash\r
290                                                             //We calculate the hash only up to the last non-null byte\r
291                                                             var lastByteIndex = Array.FindLastIndex(buffer[idx],\r
292                                                                                                     bufferCount[idx] - 1,\r
293                                                                                                     aByte => aByte != 0);\r
294 \r
295                                                             var hasher = hashers[idx];\r
296                                                             var hash = hasher.ComputeHash(buffer[idx], 0,\r
297                                                                                           lastByteIndex + 1);\r
298                                                             var filePosition = indices[idx];\r
299                                                             /*\r
300                                                         Trace.TraceInformation("Hashed [{0}] [{1}/{2}] [{3:p}]", path,\r
301                                                                                 filePosition, size,\r
302                                                                                 (double)filePosition / size);\r
303                             */\r
304                                                             hashes[filePosition] = hash;\r
305                                                             progress.Report((long)hashes.Count*blockSize*1.0/stream.Length);\r
306                                                         });\r
307                     }\r
308                     bufIdx = (bufIdx + 1)%parallelism;\r
309                 }\r
310             }\r
311             finally\r
312             {\r
313                 for (var i = 0; i < parallelism; i++)\r
314                 {\r
315                     if (hashers[i] != null)\r
316                         hashers[i].Dispose();\r
317                     bufferManager.ReturnBuffer(buffer[i]);\r
318                 }\r
319 \r
320             }\r
321 \r
322             return hashes;\r
323         }\r
324 \r
        /// <summary>
        /// Static constructor. It currently performs no work: its only statement,
        /// the assignment of the CalculateBlockHash delegate, is commented out.
        /// </summary>
        static BlockHashAlgorithms()
        {
/*
            CalculateBlockHash = CalculateBlockHashesRecursiveAsync;
*/
        }
331     }\r
332 }\r