--- /dev/null
+// -----------------------------------------------------------------------\r
+// <copyright file="BlockHashAlgorithms.cs" company="Microsoft">\r
+// TODO: Update copyright text.\r
+// </copyright>\r
+// -----------------------------------------------------------------------\r
+\r
+using System.Collections.Concurrent;\r
+using System.Diagnostics.Contracts;\r
+using System.IO;\r
+using System.Security.Cryptography;\r
+using System.Threading.Tasks;\r
+using System.Threading.Tasks.Dataflow;\r
+\r
+namespace Pithos.Network\r
+{\r
+ using System;\r
+ using System.Collections.Generic;\r
+ using System.Linq;\r
+ using System.Text;\r
+\r
+ /// <summary>\r
+ /// Alternative asynchronous strategies for splitting a file stream into fixed-size\r
+ /// blocks and hashing every block.\r
+ /// </summary>\r
+ /// <remarks>\r
+ /// All strategies return a dictionary that maps the zero-based block number to the hash\r
+ /// of that block. Trailing zero bytes of a block are not included in its hash.\r
+ /// </remarks>\r
+ public static class BlockHashAlgorithms\r
+ {\r
+     /// <summary>\r
+     /// The block hashing strategy callers can invoke without naming a specific implementation.\r
+     /// The static constructor points it at CalculateBlockHashesRecursiveAsync.\r
+     /// </summary>\r
+     public static Func<FileStream, int, string, ConcurrentDictionary<int, byte[]>, int, Task<ConcurrentDictionary<int, byte[]>>> CalculateBlockHash;\r
+\r
+     /// <summary>\r
+     /// Hashes the stream block by block using task continuations: each call reads one block,\r
+     /// starts reading the next block and then hashes the block it just read.\r
+     /// </summary>\r
+     /// <param name="stream">The stream to read, starting at its current position.</param>\r
+     /// <param name="blockSize">The size of each block in bytes. Must be greater than zero.</param>\r
+     /// <param name="algorithm">The hash algorithm name passed to HashAlgorithm.Create.</param>\r
+     /// <param name="hashes">An optional dictionary to add the hashes to. A new one is created when null.</param>\r
+     /// <param name="index">The block number assigned to the first block that is read.</param>\r
+     /// <returns>A task that yields the dictionary of block number to block hash.</returns>\r
+     /// <exception cref="ArgumentNullException">stream is null, or algorithm is null or whitespace.</exception>\r
+     /// <exception cref="ArgumentOutOfRangeException">blockSize is not positive or index is negative.</exception>\r
+     public static Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesRecursiveAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0)\r
+     {\r
+         if (stream == null)\r
+             throw new ArgumentNullException("stream");\r
+         if (String.IsNullOrWhiteSpace(algorithm))\r
+             throw new ArgumentNullException("algorithm");\r
+         if (blockSize <= 0)\r
+             throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");\r
+         if (index < 0)\r
+             throw new ArgumentOutOfRangeException("index", "index must be a non-negative value");\r
+         Contract.EndContractBlock();\r
+\r
+         if (hashes == null)\r
+             hashes = new ConcurrentDictionary<int, byte[]>();\r
+\r
+         //Every call owns its buffer because the next read starts before this block is hashed\r
+         var buffer = new byte[blockSize];\r
+         return ReadBlockAsync(stream, buffer).ContinueWith(t =>\r
+         {\r
+             var read = t.Result;\r
+\r
+             //ReadBlockAsync returns less than a full block only at the end of the stream,\r
+             //so a full block is the only case where more data may follow\r
+             var nextTask = read == blockSize\r
+                 ? CalculateBlockHashesRecursiveAsync(stream, blockSize, algorithm, hashes, index + 1)\r
+                 : Task.Factory.StartNew(() => hashes);\r
+             if (read > 0)\r
+                 hashes[index] = ComputeTrimmedHash(buffer, read, algorithm);\r
+             return nextTask;\r
+         }).Unwrap();\r
+     }\r
+\r
+     /// <summary>\r
+     /// Hashes the stream block by block in a sequential asynchronous loop that reuses one buffer.\r
+     /// </summary>\r
+     /// <param name="stream">The stream to read, starting at its current position.</param>\r
+     /// <param name="blockSize">The size of each block in bytes. Must be greater than zero.</param>\r
+     /// <param name="algorithm">The hash algorithm name passed to HashAlgorithm.Create.</param>\r
+     /// <param name="hashes">An optional dictionary to add the hashes to. A new one is created when null.</param>\r
+     /// <param name="index">The block number assigned to the first block that is read.</param>\r
+     /// <returns>A task that yields the dictionary of block number to block hash.</returns>\r
+     /// <remarks>\r
+     /// Keys are consecutive block numbers, the same as CalculateBlockHashesRecursiveAsync produces.\r
+     /// Because the method is async, argument errors are reported through the returned task.\r
+     /// </remarks>\r
+     public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0)\r
+     {\r
+         if (stream == null)\r
+             throw new ArgumentNullException("stream");\r
+         if (String.IsNullOrWhiteSpace(algorithm))\r
+             throw new ArgumentNullException("algorithm");\r
+         if (blockSize <= 0)\r
+             throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");\r
+         if (index < 0)\r
+             throw new ArgumentOutOfRangeException("index", "index must be a non-negative value");\r
+         Contract.EndContractBlock();\r
+\r
+         if (hashes == null)\r
+             hashes = new ConcurrentDictionary<int, byte[]>();\r
+\r
+         var buffer = new byte[blockSize];\r
+         int read;\r
+         while ((read = await ReadBlockAsync(stream, buffer)) > 0)\r
+         {\r
+             //Only the first "read" bytes belong to this block, the rest is left over from the previous one\r
+             hashes[index] = ComputeTrimmedHash(buffer, read, algorithm);\r
+             index++;\r
+         }\r
+         return hashes;\r
+     }\r
+\r
+     /// <summary>\r
+     /// Reads the stream sequentially and hashes the blocks on a dataflow ActionBlock,\r
+     /// so that several blocks can be hashed at the same time.\r
+     /// </summary>\r
+     /// <param name="stream">The stream to read, starting at its current position.</param>\r
+     /// <param name="blockSize">The size of each block in bytes. Must be greater than zero.</param>\r
+     /// <param name="algorithm">The hash algorithm name passed to HashAlgorithm.Create.</param>\r
+     /// <param name="parallelism">\r
+     /// Used both as the maximum degree of parallelism and as the bounded capacity of the\r
+     /// hashing block. Must be positive or DataflowBlockOptions.Unbounded.\r
+     /// </param>\r
+     /// <returns>A task that yields the dictionary of block number to block hash.</returns>\r
+     /// <remarks>\r
+     /// Keys are consecutive block numbers starting at zero.\r
+     /// Because the method is async, argument errors are reported through the returned task.\r
+     /// </remarks>\r
+     public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAgentAsync(FileStream stream, int blockSize, string algorithm, int parallelism)\r
+     {\r
+         if (stream == null)\r
+             throw new ArgumentNullException("stream");\r
+         if (String.IsNullOrWhiteSpace(algorithm))\r
+             throw new ArgumentNullException("algorithm");\r
+         if (blockSize <= 0)\r
+             throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");\r
+         //The option setters below reject the same values, this check only names the right parameter\r
+         if (parallelism < 1 && parallelism != DataflowBlockOptions.Unbounded)\r
+             throw new ArgumentOutOfRangeException("parallelism", "parallelism must be a positive value or DataflowBlockOptions.Unbounded");\r
+         Contract.EndContractBlock();\r
+\r
+         var hashes = new ConcurrentDictionary<int, byte[]>();\r
+\r
+         var options = new ExecutionDataflowBlockOptions { BoundedCapacity = parallelism, MaxDegreeOfParallelism = parallelism };\r
+         var hashBlock = new ActionBlock<Tuple<int, byte[]>>(input =>\r
+         {\r
+             var block = input.Item2;\r
+             hashes[input.Item1] = ComputeTrimmedHash(block, block.Length, algorithm);\r
+         }, options);\r
+\r
+         var buffer = new byte[blockSize];\r
+         int read;\r
+         int index = 0;\r
+         while ((read = await ReadBlockAsync(stream, buffer)) > 0)\r
+         {\r
+             //Posted blocks are hashed later, so each one needs its own copy of the data\r
+             var block = new byte[read];\r
+             Buffer.BlockCopy(buffer, 0, block, 0, read);\r
+\r
+             //A refused message means the hashing block has stopped accepting input, e.g. because it faulted.\r
+             //Stop reading and let the Completion task below report the failure\r
+             var accepted = await hashBlock.SendAsync(Tuple.Create(index, block));\r
+             if (!accepted)\r
+                 break;\r
+             index++;\r
+         }\r
+\r
+         hashBlock.Complete();\r
+         await hashBlock.Completion;\r
+\r
+         return hashes;\r
+     }\r
+\r
+     /// <summary>\r
+     /// Fills the buffer from the stream, issuing as many reads as needed.\r
+     /// </summary>\r
+     /// <returns>\r
+     /// The number of bytes placed in the buffer. This is less than the buffer length\r
+     /// only when the end of the stream was reached.\r
+     /// </returns>\r
+     private static async Task<int> ReadBlockAsync(Stream stream, byte[] buffer)\r
+     {\r
+         var total = 0;\r
+         int read;\r
+         //A single read may return fewer bytes than requested before the end of the stream\r
+         while (total < buffer.Length && (read = await stream.ReadAsync(buffer, total, buffer.Length - total)) > 0)\r
+             total += read;\r
+         return total;\r
+     }\r
+\r
+     /// <summary>\r
+     /// Hashes the first count bytes of the buffer, ignoring any trailing zero bytes.\r
+     /// </summary>\r
+     /// <exception cref="ArgumentException">HashAlgorithm.Create does not recognise the algorithm name.</exception>\r
+     private static byte[] ComputeTrimmedHash(byte[] buffer, int count, string algorithm)\r
+     {\r
+         using (var hasher = HashAlgorithm.Create(algorithm))\r
+         {\r
+             if (hasher == null)\r
+                 throw new ArgumentException("Unknown hash algorithm: " + algorithm, "algorithm");\r
+\r
+             //This code was added for compatibility with the way Pithos calculates the last hash\r
+             //We calculate the hash only up to the last non-null byte\r
+             //FindLastIndex returns -1 for an all-zero block, which hashes zero bytes\r
+             var lastByteIndex = Array.FindLastIndex(buffer, count - 1, aByte => aByte != 0);\r
+\r
+             return hasher.ComputeHash(buffer, 0, lastByteIndex + 1);\r
+         }\r
+     }\r
+\r
+     /// <summary>\r
+     /// Selects the recursive implementation as the default strategy.\r
+     /// </summary>\r
+     static BlockHashAlgorithms()\r
+     {\r
+         CalculateBlockHash = CalculateBlockHashesRecursiveAsync;\r
+     }\r
+ }\r
+}\r