Statistics
| Branch: | Revision:

root / trunk / Pithos.Network / BlockHashAlgorithms.cs @ 255f5f86

History | View | Annotate | Download (8.2 kB)

1
#region
2
/* -----------------------------------------------------------------------
3
 * <copyright file="BlockHashAlgorithms.cs" company="GRNet">
4
 * 
5
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
6
 *
7
 * Redistribution and use in source and binary forms, with or
8
 * without modification, are permitted provided that the following
9
 * conditions are met:
10
 *
11
 *   1. Redistributions of source code must retain the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer.
14
 *
15
 *   2. Redistributions in binary form must reproduce the above
16
 *      copyright notice, this list of conditions and the following
17
 *      disclaimer in the documentation and/or other materials
18
 *      provided with the distribution.
19
 *
20
 *
21
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
22
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
25
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
28
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32
 * POSSIBILITY OF SUCH DAMAGE.
33
 *
34
 * The views and conclusions contained in the software and
35
 * documentation are those of the authors and should not be
36
 * interpreted as representing official policies, either expressed
37
 * or implied, of GRNET S.A.
38
 * </copyright>
39
 * -----------------------------------------------------------------------
40
 */
41
#endregion
42
using System.Collections.Concurrent;
43
using System.Diagnostics.Contracts;
44
using System.IO;
45
using System.Security.Cryptography;
46
using System.Threading.Tasks;
47
using System.Threading.Tasks.Dataflow;
48

    
49
namespace Pithos.Network
50
{
51
    using System;
52
    using System.Collections.Generic;
53
    using System.Linq;
54
    using System.Text;
55

    
56
    /// <summary>
57
    /// TODO: Update summary.
58
    /// </summary>
59
    public static class BlockHashAlgorithms
60
    {
61
        public static Func<FileStream, int, string, ConcurrentDictionary<int, byte[]>, int, Task<ConcurrentDictionary<int, byte[]>>> CalculateBlockHash;
62

    
63
        public static Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesRecursiveAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0)
64
        {
65
            if (stream == null)
66
                throw new ArgumentNullException("stream");
67
            if (String.IsNullOrWhiteSpace(algorithm))
68
                throw new ArgumentNullException("algorithm");
69
            if (blockSize <= 0)
70
                throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
71
            if (index < 0)
72
                throw new ArgumentOutOfRangeException("index", "index must be a non-negative value");
73
            Contract.EndContractBlock();
74

    
75

    
76
            if (hashes == null)
77
                hashes = new ConcurrentDictionary<int, byte[]>();
78

    
79
            var buffer = new byte[blockSize];
80
            return stream.ReadAsync(buffer, 0, blockSize).ContinueWith(t =>
81
            {
82
                var read = t.Result;
83

    
84
                var nextTask = read == blockSize
85
                                    ? CalculateBlockHashesRecursiveAsync(stream, blockSize, algorithm, hashes, index + 1)
86
                                    : Task.Factory.StartNew(() => hashes);
87
                if (read > 0)
88
                    using (var hasher = HashAlgorithm.Create(algorithm))
89
                    {
90
                        //This code was added for compatibility with the way Pithos calculates the last hash
91
                        //We calculate the hash only up to the last non-null byte
92
                        var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);
93

    
94
                        var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1);
95
                        hashes[index] = hash;
96
                    }
97
                return nextTask;
98
            }).Unwrap();
99
        }
100
        public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0)
101
        {
102
            if (stream == null)
103
                throw new ArgumentNullException("stream");
104
            if (String.IsNullOrWhiteSpace(algorithm))
105
                throw new ArgumentNullException("algorithm");
106
            if (blockSize <= 0)
107
                throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
108
            Contract.EndContractBlock();
109

    
110

    
111
            if (hashes == null)
112
                hashes = new ConcurrentDictionary<int, byte[]>();
113

    
114
            var buffer = new byte[blockSize];
115
            int read;
116
            while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)
117
            {
118
                //TODO: identify the value of index
119

    
120
                using (var hasher = HashAlgorithm.Create(algorithm))
121
                {
122
                    //This code was added for compatibility with the way Pithos calculates the last hash
123
                    //We calculate the hash only up to the last non-null byte
124
                    var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);
125

    
126
                    var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1);
127
                    hashes[index] = hash;
128
                }
129
                index += read;
130
            };
131
            return hashes;
132
        }
133
        
134
        public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAgentAsync(FileStream stream, int blockSize, string algorithm, int parallelism)
135
        {
136
            if (stream == null)
137
                throw new ArgumentNullException("stream");
138
            if (String.IsNullOrWhiteSpace(algorithm))
139
                throw new ArgumentNullException("algorithm");
140
            if (blockSize <= 0)
141
                throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
142
            Contract.EndContractBlock();
143

    
144
            var hashes = new ConcurrentDictionary<int, byte[]>();
145

    
146
            var options = new ExecutionDataflowBlockOptions {BoundedCapacity = parallelism,MaxDegreeOfParallelism=parallelism};
147
            var hashBlock=new ActionBlock<Tuple<int,byte[]>>(input=>
148
                              {
149
                                  int idx = input.Item1;
150
                                  byte[] block = input.Item2;
151
                                  using (var hasher = HashAlgorithm.Create(algorithm))
152
                                  {
153
                                      //This code was added for compatibility with the way Pithos calculates the last hash
154
                                      //We calculate the hash only up to the last non-null byte
155
                                      var lastByteIndex = Array.FindLastIndex(block, block.Length-1, aByte => aByte != 0);
156

    
157
                                      var hash = hasher.ComputeHash(block, 0, lastByteIndex + 1);
158
                                      hashes[idx] = hash;
159
                                  }                                  
160
                              },options);
161

    
162
            var buffer = new byte[blockSize];
163
            int read;
164
            int index = 0;
165
            while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)
166
            {
167
                var block = new byte[read];
168
                Buffer.BlockCopy(buffer,0,block,0,read);
169
                await hashBlock.SendAsync(Tuple.Create(index, block));
170
                index += read;
171
            };
172

    
173
            hashBlock.Complete();
174
            await hashBlock.Completion;
175

    
176
            return hashes;
177
        }
178

    
179
        static BlockHashAlgorithms()
180
        {
181
            CalculateBlockHash = CalculateBlockHashesRecursiveAsync;
182
        }
183
    }
184
}