Revision 9d2c0fc0 trunk/Pithos.Network/BlockHashAlgorithms.cs

b/trunk/Pithos.Network/BlockHashAlgorithms.cs
50 50
namespace Pithos.Network
51 51
{
52 52
    using System;
53
    using System.Collections.Generic;
54
    using System.Linq;
55
    using System.Text;
56 53

  
57 54
    /// <summary>
58 55
    /// TODO: Update summary.
......
61 58
    {
62 59
        private static readonly log4net.ILog Log = log4net.LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
63 60

  
61
/*
64 62
        public static Func<FileStream, int, string, ConcurrentDictionary<int, byte[]>, int, Task<ConcurrentDictionary<int, byte[]>>> CalculateBlockHash;
65 63

  
66 64
        public static Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesRecursiveAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0)
......
118 116
            int read;
119 117
            while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)
120 118
            {
121
                //TODO: identify the value of index
122

  
123 119
                using (var hasher = HashAlgorithm.Create(algorithm))
124 120
                {
125 121
                    //This code was added for compatibility with the way Pithos calculates the last hash
......
130 126
                    hashes[index] = hash;
131 127
                }
132 128
                index += read;
133
            };
129
            }
134 130
            return hashes;
135 131
        }
136 132
        
......
150 146
            var size = stream.Length;
151 147
            Log.DebugFormat("Hashing [{0}] size [{1}]",path,size);
152 148
            
153
/*
154 149
            var options = new ExecutionDataflowBlockOptions {BoundedCapacity = parallelism,MaxDegreeOfParallelism=parallelism};
155 150
            var hashBlock=new ActionBlock<Tuple<int,byte[]>>(input=>
156 151
                              {
......
166 161
                                      hashes[idx] = hash;
167 162
                                  }                                  
168 163
                              },options);
169
*/
170 164

  
171 165
            var buffer = new byte[blockSize];
172 166
            int read;
173 167
            int index = 0;
168
            while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)
169
            {
170
                var block = new byte[read];
171
                Buffer.BlockCopy(buffer, 0, block, 0, read);
172
                await hashBlock.SendAsync(Tuple.Create(index, block));
173
                index += read;
174
            }
175
            
176

  
177
            hashBlock.Complete();
178
            await hashBlock.Completion;
179

  
180
            return hashes;
181
        } 
182
        
183
        public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesInPlace(FileStream stream, int blockSize, string algorithm, int parallelism)
184
        {
185
            if (stream == null)
186
                throw new ArgumentNullException("stream");
187
            if (String.IsNullOrWhiteSpace(algorithm))
188
                throw new ArgumentNullException("algorithm");
189
            if (blockSize <= 0)
190
                throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
191
            Contract.EndContractBlock();
192

  
193
            var hashes = new ConcurrentDictionary<int, byte[]>();
194

  
195
            var path = stream.Name;
196
            var size = stream.Length;
197
            Log.DebugFormat("Hashing [{0}] size [{1}]",path,size);
198
            
199

  
200
            var buffer = new byte[blockSize];
201
            var index = 0;
174 202
            using (var hasher = HashAlgorithm.Create(algorithm))
175 203
            {
204
                int read;
176 205
                while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)
177 206
                {
178
                    //                var block = new byte[read];
179

  
180 207
                    //This code was added for compatibility with the way Pithos calculates the last hash
181 208
                    //We calculate the hash only up to the last non-null byte
182 209
                    var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);
......
186 213
                    hashes[index] = hash;
187 214
                    index += read;
188 215
                }
216
            }
217
            return hashes;
218
        }
219
*/
220

  
221
        public static async Task<ConcurrentDictionary<long, byte[]>> CalculateBlockHashesInPlacePFor(FileStream stream, int blockSize, string algorithm, int parallelism)
222
        {
223
            if (stream == null)
224
                throw new ArgumentNullException("stream");
225
            if (String.IsNullOrWhiteSpace(algorithm))
226
                throw new ArgumentNullException("algorithm");
227
            if (blockSize <= 0)
228
                throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
229
            Contract.EndContractBlock();
230

  
231
            var hashes = new ConcurrentDictionary<long, byte[]>();
232

  
233
            var path = stream.Name;
234
            var size = stream.Length;
235
            Log.DebugFormat("Hashing [{0}] size [{1}]", path, size);
189 236

  
190
                /*
191
                                Buffer.BlockCopy(buffer,0,block,0,read);
192
                                await hashBlock.SendAsync(Tuple.Create(index, block));
193
                */
194
                
237

  
238
            var buffer = new byte[parallelism][];
239
            var hashers = new HashAlgorithm[parallelism];
240
            for (var i = 0; i < parallelism; i++)
241
            {
242
                buffer[i] = new byte[blockSize];
243
                hashers[i] = HashAlgorithm.Create(algorithm);
195 244
            }
196
            
245
            try
246
            {
247
                var indices = new long[parallelism];
248
                var bufferCount = new int[parallelism];
197 249

  
198
/*
199
            hashBlock.Complete();
200
            await hashBlock.Completion;
201
*/
250
                int read;
251
                int bufIdx = 0;
252
                long index = 0;
253
                while ((read = await stream.ReadAsync(buffer[bufIdx], 0, blockSize)) > 0)
254
                {
255
                    index += read;
256
                    indices[bufIdx] = index;
257
                    bufferCount[bufIdx] = read;
258
                    //If we have filled the last buffer or if we have read from the last block,
259
                    //we can calculate the clocks in parallel
260
                    if (bufIdx == parallelism - 1 || read < blockSize)
261
                    {
262
                        //var options = new ParallelOptions {MaxDegreeOfParallelism = parallelism};
263
                        Parallel.For(0, bufIdx + 1, idx =>
264
                        {
265
                            //This code was added for compatibility with the way Pithos calculates the last hash
266
                            //We calculate the hash only up to the last non-null byte
267
                            var lastByteIndex = Array.FindLastIndex(buffer[idx],
268
                                                                    bufferCount[idx] - 1,
269
                                                                    aByte => aByte != 0);
270

  
271
                            var hasher = hashers[idx];
272
                            var hash = hasher.ComputeHash(buffer[idx], 0, lastByteIndex + 1);
273
                            var filePosition = indices[idx];
274
                            /*
275
                                                        Trace.TraceInformation("Hashed [{0}] [{1}/{2}] [{3:p}]", path,
276
                                                                                filePosition, size,
277
                                                                                (double)filePosition / size);
278
                            */
279
                            hashes[filePosition] = hash;
280
                        });
281
                    }
282
                    bufIdx = (bufIdx + 1) % parallelism;
283
                }
284
            }
285
            finally
286
            {
287
                for (var i = 0; i < parallelism; i++)
288
                {
289
                    if (hashers[i] != null)
290
                        hashers[i].Dispose();
291
                }
292

  
293
            }
202 294

  
203 295
            return hashes;
204 296
        }
205 297

  
206 298
        static BlockHashAlgorithms()
207 299
        {
300
/*
208 301
            CalculateBlockHash = CalculateBlockHashesRecursiveAsync;
302
*/
209 303
        }
210 304
    }
211 305
}

Also available in: Unified diff