Statistics
| Branch: | Revision:

root / trunk / Pithos.Network / BlockHashAlgorithms.cs @ 9d2c0fc0

History | View | Annotate | Download (13.6 kB)

1
#region
2
/* -----------------------------------------------------------------------
3
 * <copyright file="BlockHashAlgorithms.cs" company="GRNet">
4
 * 
5
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
6
 *
7
 * Redistribution and use in source and binary forms, with or
8
 * without modification, are permitted provided that the following
9
 * conditions are met:
10
 *
11
 *   1. Redistributions of source code must retain the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer.
14
 *
15
 *   2. Redistributions in binary form must reproduce the above
16
 *      copyright notice, this list of conditions and the following
17
 *      disclaimer in the documentation and/or other materials
18
 *      provided with the distribution.
19
 *
20
 *
21
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
22
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
25
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
28
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32
 * POSSIBILITY OF SUCH DAMAGE.
33
 *
34
 * The views and conclusions contained in the software and
35
 * documentation are those of the authors and should not be
36
 * interpreted as representing official policies, either expressed
37
 * or implied, of GRNET S.A.
38
 * </copyright>
39
 * -----------------------------------------------------------------------
40
 */
41
#endregion
42
using System.Collections.Concurrent;
43
using System.Diagnostics.Contracts;
44
using System.IO;
45
using System.Reflection;
46
using System.Security.Cryptography;
47
using System.Threading.Tasks;
48
using System.Threading.Tasks.Dataflow;
49

    
50
namespace Pithos.Network
51
{
52
    using System;
53

    
54
    /// <summary>
55
    /// Provides helpers for calculating the hashes of the fixed-size blocks of a file stream.
56
    /// </summary>
57
    public static class BlockHashAlgorithms
58
    {
59
        // log4net logger for this class. It is looked up by the type that declares the
        // currently executing method, which for this static field initializer is
        // BlockHashAlgorithms itself.
        private static readonly log4net.ILog Log = log4net.LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
60

    
61
/*
62
        public static Func<FileStream, int, string, ConcurrentDictionary<int, byte[]>, int, Task<ConcurrentDictionary<int, byte[]>>> CalculateBlockHash;
63

    
64
        public static Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesRecursiveAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0)
65
        {
66
            if (stream == null)
67
                throw new ArgumentNullException("stream");
68
            if (String.IsNullOrWhiteSpace(algorithm))
69
                throw new ArgumentNullException("algorithm");
70
            if (blockSize <= 0)
71
                throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
72
            if (index < 0)
73
                throw new ArgumentOutOfRangeException("index", "index must be a non-negative value");
74
            Contract.EndContractBlock();
75

    
76

    
77
            if (hashes == null)
78
                hashes = new ConcurrentDictionary<int, byte[]>();
79

    
80
            var buffer = new byte[blockSize];
81
            return stream.ReadAsync(buffer, 0, blockSize).ContinueWith(t =>
82
            {
83
                var read = t.Result;
84

    
85
                var nextTask = read == blockSize
86
                                    ? CalculateBlockHashesRecursiveAsync(stream, blockSize, algorithm, hashes, index + 1)
87
                                    : Task.Factory.StartNew(() => hashes);
88
                if (read > 0)
89
                    using (var hasher = HashAlgorithm.Create(algorithm))
90
                    {
91
                        //This code was added for compatibility with the way Pithos calculates the last hash
92
                        //We calculate the hash only up to the last non-null byte
93
                        var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);
94

    
95
                        var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1);
96
                        hashes[index] = hash;
97
                    }
98
                return nextTask;
99
            }).Unwrap();
100
        }
101
        public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0)
102
        {
103
            if (stream == null)
104
                throw new ArgumentNullException("stream");
105
            if (String.IsNullOrWhiteSpace(algorithm))
106
                throw new ArgumentNullException("algorithm");
107
            if (blockSize <= 0)
108
                throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
109
            Contract.EndContractBlock();
110

    
111

    
112
            if (hashes == null)
113
                hashes = new ConcurrentDictionary<int, byte[]>();
114

    
115
            var buffer = new byte[blockSize];
116
            int read;
117
            while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)
118
            {
119
                using (var hasher = HashAlgorithm.Create(algorithm))
120
                {
121
                    //This code was added for compatibility with the way Pithos calculates the last hash
122
                    //We calculate the hash only up to the last non-null byte
123
                    var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);
124

    
125
                    var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1);
126
                    hashes[index] = hash;
127
                }
128
                index += read;
129
            }
130
            return hashes;
131
        }
132
        
133
        public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAgentAsync(FileStream stream, int blockSize, string algorithm, int parallelism)
134
        {
135
            if (stream == null)
136
                throw new ArgumentNullException("stream");
137
            if (String.IsNullOrWhiteSpace(algorithm))
138
                throw new ArgumentNullException("algorithm");
139
            if (blockSize <= 0)
140
                throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
141
            Contract.EndContractBlock();
142

    
143
            var hashes = new ConcurrentDictionary<int, byte[]>();
144

    
145
            var path = stream.Name;
146
            var size = stream.Length;
147
            Log.DebugFormat("Hashing [{0}] size [{1}]",path,size);
148
            
149
            var options = new ExecutionDataflowBlockOptions {BoundedCapacity = parallelism,MaxDegreeOfParallelism=parallelism};
150
            var hashBlock=new ActionBlock<Tuple<int,byte[]>>(input=>
151
                              {
152
                                  int idx = input.Item1;
153
                                  byte[] block = input.Item2;
154
                                  using (var hasher = HashAlgorithm.Create(algorithm))
155
                                  {
156
                                      //This code was added for compatibility with the way Pithos calculates the last hash
157
                                      //We calculate the hash only up to the last non-null byte
158
                                      var lastByteIndex = Array.FindLastIndex(block, block.Length-1, aByte => aByte != 0);
159

    
160
                                      var hash = hasher.ComputeHash(block, 0, lastByteIndex + 1);
161
                                      hashes[idx] = hash;
162
                                  }                                  
163
                              },options);
164

    
165
            var buffer = new byte[blockSize];
166
            int read;
167
            int index = 0;
168
            while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)
169
            {
170
                var block = new byte[read];
171
                Buffer.BlockCopy(buffer, 0, block, 0, read);
172
                await hashBlock.SendAsync(Tuple.Create(index, block));
173
                index += read;
174
            }
175
            
176

    
177
            hashBlock.Complete();
178
            await hashBlock.Completion;
179

    
180
            return hashes;
181
        } 
182
        
183
        public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesInPlace(FileStream stream, int blockSize, string algorithm, int parallelism)
184
        {
185
            if (stream == null)
186
                throw new ArgumentNullException("stream");
187
            if (String.IsNullOrWhiteSpace(algorithm))
188
                throw new ArgumentNullException("algorithm");
189
            if (blockSize <= 0)
190
                throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
191
            Contract.EndContractBlock();
192

    
193
            var hashes = new ConcurrentDictionary<int, byte[]>();
194

    
195
            var path = stream.Name;
196
            var size = stream.Length;
197
            Log.DebugFormat("Hashing [{0}] size [{1}]",path,size);
198
            
199

    
200
            var buffer = new byte[blockSize];
201
            var index = 0;
202
            using (var hasher = HashAlgorithm.Create(algorithm))
203
            {
204
                int read;
205
                while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)
206
                {
207
                    //This code was added for compatibility with the way Pithos calculates the last hash
208
                    //We calculate the hash only up to the last non-null byte
209
                    var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);
210

    
211
                    var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1);
212
                    Log.DebugFormat("Hashed [{0}] [{1}/{2}] [{3:p}]", path, index,size,(double)index/size);
213
                    hashes[index] = hash;
214
                    index += read;
215
                }
216
            }
217
            return hashes;
218
        }
219
*/
220

    
221
        /// <summary>
        /// Reads <paramref name="stream"/> from its current position to the end in blocks of
        /// <paramref name="blockSize"/> bytes and hashes them in batches of up to
        /// <paramref name="parallelism"/> blocks using <c>Parallel.For</c>.
        /// </summary>
        /// <param name="stream">The file stream to hash. Must not be null.</param>
        /// <param name="blockSize">Size of each block in bytes. Must be greater than zero.</param>
        /// <param name="algorithm">Hash algorithm name passed to <c>HashAlgorithm.Create</c>.</param>
        /// <param name="parallelism">Number of blocks buffered and hashed together. Must be greater than zero.</param>
        /// <returns>
        /// A dictionary with one hash per block. Each key is the cumulative number of bytes read
        /// up to and including that block (i.e. the offset just past the end of the block,
        /// relative to where reading started), so keys increase in block order.
        /// Each hash covers the block only up to its last non-zero byte.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null, or <paramref name="algorithm"/> is null or whitespace.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="blockSize"/> or <paramref name="parallelism"/> is not positive.</exception>
        /// <exception cref="ArgumentException"><paramref name="algorithm"/> does not name a known hash algorithm.</exception>
        public static async Task<ConcurrentDictionary<long, byte[]>> CalculateBlockHashesInPlacePFor(FileStream stream, int blockSize, string algorithm, int parallelism)
        {
            if (stream == null)
                throw new ArgumentNullException("stream");
            if (String.IsNullOrWhiteSpace(algorithm))
                throw new ArgumentNullException("algorithm");
            if (blockSize <= 0)
                throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
            if (parallelism <= 0)
                throw new ArgumentOutOfRangeException("parallelism", "parallelism must be a value greater than zero");
            Contract.EndContractBlock();

            var hashes = new ConcurrentDictionary<long, byte[]>();

            var path = stream.Name;
            var size = stream.Length;
            Log.DebugFormat("Hashing [{0}] size [{1}]", path, size);

            var buffer = new byte[parallelism][];
            var hashers = new HashAlgorithm[parallelism];
            try
            {
                //Hashers are created inside the try block so that the ones already created
                //are disposed even if a later creation fails
                for (var i = 0; i < parallelism; i++)
                {
                    buffer[i] = new byte[blockSize];
                    hashers[i] = HashAlgorithm.Create(algorithm);
                    if (hashers[i] == null)
                        throw new ArgumentException("Unknown hash algorithm: " + algorithm, "algorithm");
                }

                var indices = new long[parallelism];
                var bufferCount = new int[parallelism];

                //Number of buffers that have been filled but not hashed yet
                var pending = 0;
                long index = 0;
                var endOfStream = false;
                while (!endOfStream)
                {
                    //A single ReadAsync call may return fewer bytes than requested before the end
                    //of the stream, so keep reading until the block is full or the stream ends
                    var read = 0;
                    int chunk;
                    do
                    {
                        chunk = await stream.ReadAsync(buffer[pending], read, blockSize - read);
                        read += chunk;
                    } while (chunk > 0 && read < blockSize);

                    //After the fill loop a partial (or empty) block can only mean end of stream
                    endOfStream = read < blockSize;

                    if (read > 0)
                    {
                        index += read;
                        indices[pending] = index;
                        bufferCount[pending] = read;
                        pending++;
                    }

                    //If we have filled the last buffer or reached the end of the stream,
                    //we can calculate the pending blocks in parallel. Flushing at end of stream
                    //also covers files whose size is an exact multiple of the block size
                    if (pending > 0 && (pending == parallelism || endOfStream))
                    {
                        HashPendingBlocks(buffer, bufferCount, indices, hashers, pending, hashes);
                        pending = 0;
                    }
                }
            }
            finally
            {
                for (var i = 0; i < parallelism; i++)
                {
                    if (hashers[i] != null)
                        hashers[i].Dispose();
                }
            }

            return hashes;
        }

        /// <summary>
        /// Hashes the first <paramref name="count"/> buffers in parallel, one hasher per buffer,
        /// and stores each hash in <paramref name="hashes"/> under the matching entry of <paramref name="positions"/>.
        /// </summary>
        private static void HashPendingBlocks(byte[][] buffers, int[] lengths, long[] positions, HashAlgorithm[] hashers, int count, ConcurrentDictionary<long, byte[]> hashes)
        {
            Parallel.For(0, count, idx =>
            {
                //This code was added for compatibility with the way Pithos calculates the last hash
                //We calculate the hash only up to the last non-null byte
                var lastByteIndex = Array.FindLastIndex(buffers[idx],
                                                        lengths[idx] - 1,
                                                        aByte => aByte != 0);

                //Each index uses its own hasher, so no hasher is shared between parallel iterations
                var hash = hashers[idx].ComputeHash(buffers[idx], 0, lastByteIndex + 1);
                hashes[positions[idx]] = hash;
            });
        }
297

    
298
        /// <summary>
        /// Type initializer. It currently performs no work; the only statement it ever
        /// contained is disabled and kept below for reference.
        /// </summary>
        static BlockHashAlgorithms()
        {
            // Disabled: CalculateBlockHash = CalculateBlockHashesRecursiveAsync;
        }
304
    }
305
}