Statistics
| Branch: | Revision:

root / trunk / Pithos.Network / BlockHashAlgorithms.cs @ 422c9598

History | View | Annotate | Download (6.7 kB)

1
// -----------------------------------------------------------------------
2
// <copyright file="BlockHashAlgorithms.cs" company="Microsoft">
3
// TODO: Update copyright text.
4
// </copyright>
5
// -----------------------------------------------------------------------
6

    
7
using System.Collections.Concurrent;
8
using System.Diagnostics.Contracts;
9
using System.IO;
10
using System.Security.Cryptography;
11
using System.Threading.Tasks;
12
using System.Threading.Tasks.Dataflow;
13

    
14
namespace Pithos.Network
15
{
16
    using System;
17
    using System.Collections.Generic;
18
    using System.Linq;
19
    using System.Text;
20

    
21
    /// <summary>
22
    /// TODO: Update summary.
23
    /// </summary>
24
    public static class BlockHashAlgorithms
25
    {
26
        public static Func<FileStream, int, string, ConcurrentDictionary<int, byte[]>, int, Task<ConcurrentDictionary<int, byte[]>>> CalculateBlockHash;
27

    
28
        public static Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesRecursiveAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0)
29
        {
30
            if (stream == null)
31
                throw new ArgumentNullException("stream");
32
            if (String.IsNullOrWhiteSpace(algorithm))
33
                throw new ArgumentNullException("algorithm");
34
            if (blockSize <= 0)
35
                throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
36
            if (index < 0)
37
                throw new ArgumentOutOfRangeException("index", "index must be a non-negative value");
38
            Contract.EndContractBlock();
39

    
40

    
41
            if (hashes == null)
42
                hashes = new ConcurrentDictionary<int, byte[]>();
43

    
44
            var buffer = new byte[blockSize];
45
            return stream.ReadAsync(buffer, 0, blockSize).ContinueWith(t =>
46
            {
47
                var read = t.Result;
48

    
49
                var nextTask = read == blockSize
50
                                    ? CalculateBlockHashesRecursiveAsync(stream, blockSize, algorithm, hashes, index + 1)
51
                                    : Task.Factory.StartNew(() => hashes);
52
                if (read > 0)
53
                    using (var hasher = HashAlgorithm.Create(algorithm))
54
                    {
55
                        //This code was added for compatibility with the way Pithos calculates the last hash
56
                        //We calculate the hash only up to the last non-null byte
57
                        var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);
58

    
59
                        var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1);
60
                        hashes[index] = hash;
61
                    }
62
                return nextTask;
63
            }).Unwrap();
64
        }
65
        public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0)
66
        {
67
            if (stream == null)
68
                throw new ArgumentNullException("stream");
69
            if (String.IsNullOrWhiteSpace(algorithm))
70
                throw new ArgumentNullException("algorithm");
71
            if (blockSize <= 0)
72
                throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
73
            Contract.EndContractBlock();
74

    
75

    
76
            if (hashes == null)
77
                hashes = new ConcurrentDictionary<int, byte[]>();
78

    
79
            var buffer = new byte[blockSize];
80
            int read;
81
            while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)
82
            {
83
                //TODO: identify the value of index
84

    
85
                using (var hasher = HashAlgorithm.Create(algorithm))
86
                {
87
                    //This code was added for compatibility with the way Pithos calculates the last hash
88
                    //We calculate the hash only up to the last non-null byte
89
                    var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);
90

    
91
                    var hash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1);
92
                    hashes[index] = hash;
93
                }
94
                index += read;
95
            };
96
            return hashes;
97
        }
98
        
99
        public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAgentAsync(FileStream stream, int blockSize, string algorithm, int parallelism)
100
        {
101
            if (stream == null)
102
                throw new ArgumentNullException("stream");
103
            if (String.IsNullOrWhiteSpace(algorithm))
104
                throw new ArgumentNullException("algorithm");
105
            if (blockSize <= 0)
106
                throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
107
            Contract.EndContractBlock();
108

    
109
            var hashes = new ConcurrentDictionary<int, byte[]>();
110

    
111
            var options = new ExecutionDataflowBlockOptions {BoundedCapacity = parallelism,MaxDegreeOfParallelism=parallelism};
112
            var hashBlock=new ActionBlock<Tuple<int,byte[]>>(input=>
113
                              {
114
                                  int idx = input.Item1;
115
                                  byte[] block = input.Item2;
116
                                  using (var hasher = HashAlgorithm.Create(algorithm))
117
                                  {
118
                                      //This code was added for compatibility with the way Pithos calculates the last hash
119
                                      //We calculate the hash only up to the last non-null byte
120
                                      var lastByteIndex = Array.FindLastIndex(block, block.Length-1, aByte => aByte != 0);
121

    
122
                                      var hash = hasher.ComputeHash(block, 0, lastByteIndex + 1);
123
                                      hashes[idx] = hash;
124
                                  }                                  
125
                              },options);
126

    
127
            var buffer = new byte[blockSize];
128
            int read;
129
            int index = 0;
130
            while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)
131
            {
132
                var block = new byte[read];
133
                Buffer.BlockCopy(buffer,0,block,0,read);
134
                await hashBlock.SendAsync(Tuple.Create(index, block));
135
                index += read;
136
            };
137

    
138
            hashBlock.Complete();
139
            await hashBlock.Completion;
140

    
141
            return hashes;
142
        }
143

    
144
        static BlockHashAlgorithms()
145
        {
146
            CalculateBlockHash = CalculateBlockHashesRecursiveAsync;
147
        }
148
    }
149
}