Statistics
| Branch: | Revision:

root / trunk / Pithos.Network / BlockHashAlgorithms.cs @ df12ed4c

History | View | Annotate | Download (17 kB)

1
#region
2
/* -----------------------------------------------------------------------
3
 * <copyright file="BlockHashAlgorithms.cs" company="GRNet">
4
 * 
5
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
6
 *
7
 * Redistribution and use in source and binary forms, with or
8
 * without modification, are permitted provided that the following
9
 * conditions are met:
10
 *
11
 *   1. Redistributions of source code must retain the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer.
14
 *
15
 *   2. Redistributions in binary form must reproduce the above
16
 *      copyright notice, this list of conditions and the following
17
 *      disclaimer in the documentation and/or other materials
18
 *      provided with the distribution.
19
 *
20
 *
21
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
22
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
25
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
28
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32
 * POSSIBILITY OF SUCH DAMAGE.
33
 *
34
 * The views and conclusions contained in the software and
35
 * documentation are those of the authors and should not be
36
 * interpreted as representing official policies, either expressed
37
 * or implied, of GRNET S.A.
38
 * </copyright>
39
 * -----------------------------------------------------------------------
40
 */
41
#endregion
42
using System.Collections.Concurrent;
43
using System.Diagnostics.Contracts;
44
using System.IO;
45
using System.Reflection;
46
using System.Security.Cryptography;
47
using System.ServiceModel.Channels;
48
using System.Threading;
49
using System.Threading.Tasks;
50
using System.Threading.Tasks.Dataflow;
51

    
52

    
53
namespace Pithos.Network
54
{
55
    using System;
56

    
57
    /// <summary>
58
    /// TODO: Update summary.
59
    /// </summary>
60
    public static class BlockHashAlgorithms
61
    {
62
        private static readonly log4net.ILog Log = log4net.LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
63

    
64
/*
65
        public static Func<FileStream, int, string, ConcurrentDictionary<int, byte[]>, int, Task<ConcurrentDictionary<int, byte[]>>> CalculateBlockHash;
66

    
67
        public static Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesRecursiveAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0)
68
        {
69
            if (stream == null)
70
                throw new ArgumentNullException("stream");
71
            if (String.IsNullOrWhiteSpace(algorithm))
72
                throw new ArgumentNullException("algorithm");
73
            if (blockSize <= 0)
74
                throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
75
            if (index < 0)
76
                throw new ArgumentOutOfRangeException("index", "index must be a non-negative value");
77
            Contract.EndContractBlock();
78

    
79

    
80
            if (hashes == null)
81
                hashes = new ConcurrentDictionary<int, byte[]>();
82

    
83
            var buffer = new byte[blockSize];
84
            return stream.ReadAsync(buffer, 0, blockSize).ContinueWith(t =>
85
            {
86
                var read = t.Result;
87

    
88
                var nextTask = read == blockSize
89
                                    ? CalculateBlockHashesRecursiveAsync(stream, blockSize, algorithm, hashes, index + 1)
90
                                    : Task.Factory.StartNew(() => hashes);
91
                if (read > 0)
92
                    using (var hasher = HashAlgorithm.Create(algorithm))
93
                    {
94
                        //This code was added for compatibility with the way Pithos calculates the last hash
95
                        //We calculate the hash only up to the last non-null byte
96
                        var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);
97

    
98
                        var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1);
99
                        hashes[index] = hash;
100
                    }
101
                return nextTask;
102
            }).Unwrap();
103
        }
104
        public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0)
105
        {
106
            if (stream == null)
107
                throw new ArgumentNullException("stream");
108
            if (String.IsNullOrWhiteSpace(algorithm))
109
                throw new ArgumentNullException("algorithm");
110
            if (blockSize <= 0)
111
                throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
112
            Contract.EndContractBlock();
113

    
114

    
115
            if (hashes == null)
116
                hashes = new ConcurrentDictionary<int, byte[]>();
117

    
118
            var buffer = new byte[blockSize];
119
            int read;
120
            while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)
121
            {
122
                using (var hasher = HashAlgorithm.Create(algorithm))
123
                {
124
                    //This code was added for compatibility with the way Pithos calculates the last hash
125
                    //We calculate the hash only up to the last non-null byte
126
                    var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);
127

    
128
                    var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1);
129
                    hashes[index] = hash;
130
                }
131
                index += read;
132
            }
133
            return hashes;
134
        }
135
        
136
        public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAgentAsync(FileStream stream, int blockSize, string algorithm, int parallelism)
137
        {
138
            if (stream == null)
139
                throw new ArgumentNullException("stream");
140
            if (String.IsNullOrWhiteSpace(algorithm))
141
                throw new ArgumentNullException("algorithm");
142
            if (blockSize <= 0)
143
                throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
144
            Contract.EndContractBlock();
145

    
146
            var hashes = new ConcurrentDictionary<int, byte[]>();
147

    
148
            var path = stream.Name;
149
            var size = stream.Length;
150
            Log.DebugFormat("Hashing [{0}] size [{1}]",path,size);
151
            
152
            var options = new ExecutionDataflowBlockOptions {BoundedCapacity = parallelism,MaxDegreeOfParallelism=parallelism};
153
            var hashBlock=new ActionBlock<Tuple<int,byte[]>>(input=>
154
                              {
155
                                  int idx = input.Item1;
156
                                  byte[] block = input.Item2;
157
                                  using (var hasher = HashAlgorithm.Create(algorithm))
158
                                  {
159
                                      //This code was added for compatibility with the way Pithos calculates the last hash
160
                                      //We calculate the hash only up to the last non-null byte
161
                                      var lastByteIndex = Array.FindLastIndex(block, block.Length-1, aByte => aByte != 0);
162

    
163
                                      var hash = hasher.ComputeHash(block, 0, lastByteIndex + 1);
164
                                      hashes[idx] = hash;
165
                                  }                                  
166
                              },options);
167

    
168
            var buffer = new byte[blockSize];
169
            int read;
170
            int index = 0;
171
            while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)
172
            {
173
                var block = new byte[read];
174
                Buffer.BlockCopy(buffer, 0, block, 0, read);
175
                await hashBlock.SendAsync(Tuple.Create(index, block));
176
                index += read;
177
            }
178
            
179

    
180
            hashBlock.Complete();
181
            await hashBlock.Completion;
182

    
183
            return hashes;
184
        } 
185
        
186
        public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesInPlace(FileStream stream, int blockSize, string algorithm, int parallelism)
187
        {
188
            if (stream == null)
189
                throw new ArgumentNullException("stream");
190
            if (String.IsNullOrWhiteSpace(algorithm))
191
                throw new ArgumentNullException("algorithm");
192
            if (blockSize <= 0)
193
                throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
194
            Contract.EndContractBlock();
195

    
196
            var hashes = new ConcurrentDictionary<int, byte[]>();
197

    
198
            var path = stream.Name;
199
            var size = stream.Length;
200
            Log.DebugFormat("Hashing [{0}] size [{1}]",path,size);
201
            
202

    
203
            var buffer = new byte[blockSize];
204
            var index = 0;
205
            using (var hasher = HashAlgorithm.Create(algorithm))
206
            {
207
                int read;
208
                while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)
209
                {
210
                    //This code was added for compatibility with the way Pithos calculates the last hash
211
                    //We calculate the hash only up to the last non-null byte
212
                    var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);
213

    
214
                    var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1);
215
                    Log.DebugFormat("Hashed [{0}] [{1}/{2}] [{3:p}]", path, index,size,(double)index/size);
216
                    hashes[index] = hash;
217
                    index += read;
218
                }
219
            }
220
            return hashes;
221
        }
222
*/
223

    
224
        private static IBufferPool _bufferMgr;
225

    
226
        private static readonly object _syncRoot = new object();
227

    
228

    
229
        private static IBufferPool GetBufferManager(int blockSize,int parallelism)
230
        {
231
            lock (_syncRoot)
232
            {
233
                if (_bufferMgr == null)
234
                    _bufferMgr = new BufferPool(parallelism*blockSize, blockSize);
235
            }
236

    
237
            return _bufferMgr;
238
        }
239

    
240
        //public static async Task<ConcurrentDictionary<long, byte[]>> CalculateBlockHashesInPlacePFor(FileStream stream, int blockSize, string algorithm, int parallelism, Action<long, byte[], int> postAction,CancellationToken token, IProgress<double> progress)
241
        public static async Task<ConcurrentDictionary<long, byte[]>> CalculateBlockHashesInPlacePFor(FileStream stream, int blockSize, string algorithm, byte parallelism, CancellationToken token, IProgress<HashProgress> progress)
242
        {
243
            if (stream == null)
244
                throw new ArgumentNullException("stream");
245
            if (String.IsNullOrWhiteSpace(algorithm))
246
                throw new ArgumentNullException("algorithm");
247
            if (blockSize <= 0)
248
                throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
249
            Contract.EndContractBlock();
250

    
251
            var hashes = new ConcurrentDictionary<long, byte[]>();
252

    
253
            var path = stream.Name;
254
            var size = stream.Length;
255
            Log.DebugFormat("Hashing [{0}] size [{1}]", path, size);
256

    
257
            //TODO: Handle zero-length files
258
            if (size == 0)
259
            {
260
                var buf = new byte[0];
261
                using (var hasher = HashAlgorithm.Create(algorithm))
262
                {
263
                    hashes[0] = hasher.ComputeHash(buf);
264
                    return hashes;
265
                }
266
            }
267

    
268
            var blocks = size<blockSize?1:size/blockSize;
269

    
270
            var actualHashers = parallelism > blocks ? (byte)blocks : parallelism;
271

    
272
            var buffer = new byte[actualHashers][];
273
            var hashers = new HashAlgorithm[actualHashers];
274
            var bufferManager = GetBufferManager(blockSize, actualHashers);
275
            try
276
            {
277
                for (var i = 0; i < actualHashers; i++)
278
                {
279
                    buffer[i] = bufferManager.TakeBuffer(blockSize);// new byte[blockSize];
280
                    hashers[i] = HashAlgorithm.Create(algorithm);
281
                }
282

    
283
                var indices = new long[actualHashers];
284
                var bufferCount = new int[actualHashers];
285

    
286
                int read;
287
                int bufIdx = 0;
288
                long index = 0;
289

    
290
                //long block = 0;
291

    
292
                while ((read = await stream.ReadAsync(buffer[bufIdx], 0, blockSize).ConfigureAwait(false)) > 0)
293
                {
294
                    index += read;
295
                    indices[bufIdx] = index;
296
                    bufferCount[bufIdx] = read;
297
                    //postAction(block++, buffer[bufIdx], read);
298

    
299
                    //If we have filled the last buffer or if we have read from the last block,
300
                    //we can calculate the clocks in parallel
301
                    if (bufIdx == actualHashers - 1 || read < blockSize)
302
                    {
303
                        var options = new ParallelOptions {MaxDegreeOfParallelism = actualHashers,CancellationToken=token};
304

    
305
                        Parallel.For(0, bufIdx + 1, options,(idx,state) =>
306
                                                        {
307
                                                            //This code was added for compatibility with the way Pithos calculates the last hash
308
                                                            //We calculate the hash only up to the last non-null byte
309
                                                            options.CancellationToken.ThrowIfCancellationRequested();
310
                                                            var lastByteIndex = Array.FindLastIndex(buffer[idx],
311
                                                                                                    bufferCount[idx] - 1,
312
                                                                                                    aByte => aByte != 0);
313

    
314
                                                            var hasher = hashers[idx];
315

    
316
                                                            byte[] hash=hasher.ComputeHash(buffer[idx],0,lastByteIndex+1);
317
/*
318
                                                            if (buffer[idx].Length == lastByteIndex || lastByteIndex==-1)
319
                                                                hash = hasher.Digest(buffer[idx]);
320
                                                            else
321
                                                            {
322
                                                                var buf=new byte[lastByteIndex+1];
323
                                                                Buffer.BlockCopy(buffer[idx],0,buf,0,lastByteIndex+1);
324
                                                                hash = hasher.Digest(buf);
325
                                                            }
326
*/
327

    
328
                                                            
329
                                                            
330
                                                            var filePosition = indices[idx];
331
                                                            /*
332
                                                        Trace.TraceInformation("Hashed [{0}] [{1}/{2}] [{3:p}]", path,
333
                                                                                filePosition, size,
334
                                                                                (double)filePosition / size);
335
                            */
336
                                                            hashes[filePosition] = hash;
337
                                                            //Do not report for files smaller than 4MB
338
                                                            if (progress != null && stream.Length > 4*1024*1024)
339
                                                                progress.Report(new HashProgress((long)hashes.Count*blockSize,stream.Length));
340
                                                        });
341
                    }
342
                    bufIdx = (bufIdx +1)%actualHashers;
343
                }
344
            }
345
            finally
346
            {
347
                for (var i = 0; i < actualHashers; i++)
348
                {
349
                    if (hashers[i] != null)
350
                        hashers[i].Dispose();
351
                    bufferManager.ReturnBuffer(buffer[i]);
352
                }
353

    
354
            }
355

    
356
            return hashes;
357
        }
358

    
359
        static BlockHashAlgorithms()
360
        {
361
/*
362
            CalculateBlockHash = CalculateBlockHashesRecursiveAsync;
363
*/
364
        }
365
    }
366
}