Added SnapshotDifferencer.cs to calculate the difference between the current and...
[pithos-ms-client] / trunk / Pithos.Network / BlockHashAlgorithms.cs
diff --git a/trunk/Pithos.Network/BlockHashAlgorithms.cs b/trunk/Pithos.Network/BlockHashAlgorithms.cs
new file mode 100644 (file)
index 0000000..58757cd
--- /dev/null
@@ -0,0 +1,149 @@
+// -----------------------------------------------------------------------\r
+// <copyright file="BlockHashAlgorithms.cs" company="Microsoft">\r
+// TODO: Update copyright text.\r
+// </copyright>\r
+// -----------------------------------------------------------------------\r
+\r
+using System.Collections.Concurrent;\r
+using System.Diagnostics.Contracts;\r
+using System.IO;\r
+using System.Security.Cryptography;\r
+using System.Threading.Tasks;\r
+using System.Threading.Tasks.Dataflow;\r
+\r
+namespace Pithos.Network\r
+{\r
+    using System;\r
+    using System.Collections.Generic;\r
+    using System.Linq;\r
+    using System.Text;\r
+\r
+    /// <summary>\r
+    /// Calculates the hashes of the fixed-size blocks of a file. Every block is hashed only up to\r
+    /// its last non-zero byte, for compatibility with the way Pithos calculates block hashes.\r
+    /// </summary>\r
+    public static class BlockHashAlgorithms\r
+    {\r
+        /// <summary>The block hashing method to use. Set to CalculateBlockHashesRecursiveAsync by the static constructor.</summary>\r
+        public static Func<FileStream, int, string, ConcurrentDictionary<int, byte[]>, int, Task<ConcurrentDictionary<int, byte[]>>> CalculateBlockHash;\r
+\r
+        /// <summary>\r
+        /// Hashes the blocks of the stream using task continuations. The read of the next block is\r
+        /// started before the current block is hashed, so that hashing overlaps with the read.\r
+        /// </summary>\r
+        /// <param name="stream">The file to hash, read from its current position.</param>\r
+        /// <param name="blockSize">The block size in bytes. Must be greater than zero.</param>\r
+        /// <param name="algorithm">The hash algorithm name, as accepted by HashAlgorithm.Create.</param>\r
+        /// <param name="hashes">The dictionary that receives the hashes. A new one is created if null.</param>\r
+        /// <param name="index">The number (dictionary key) of the first block read. Must not be negative.</param>\r
+        /// <returns>A task that returns the block hashes, keyed by block number.</returns>\r
+        public static Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesRecursiveAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0)\r
+        {\r
+            if (stream == null)\r
+                throw new ArgumentNullException("stream");\r
+            if (String.IsNullOrWhiteSpace(algorithm))\r
+                throw new ArgumentNullException("algorithm");\r
+            if (blockSize <= 0)\r
+                throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");\r
+            if (index < 0)\r
+                throw new ArgumentOutOfRangeException("index", "index must be a non-negative value");\r
+            Contract.EndContractBlock();\r
+\r
+            if (hashes == null)\r
+                hashes = new ConcurrentDictionary<int, byte[]>();\r
+\r
+            var buffer = new byte[blockSize];\r
+            return ReadBlockAsync(stream, buffer).ContinueWith(t =>\r
+            {\r
+                var read = t.Result;\r
+\r
+                //ReadBlockAsync returns less than blockSize bytes only at the end of the stream,\r
+                //so a full block means that there may be more blocks to read\r
+                var nextTask = read == blockSize\r
+                                    ? CalculateBlockHashesRecursiveAsync(stream, blockSize, algorithm, hashes, index + 1)\r
+                                    : Task.Factory.StartNew(() => hashes);\r
+                if (read > 0)\r
+                    hashes[index] = ComputeBlockHash(buffer, read, algorithm);\r
+                return nextTask;\r
+            }).Unwrap();\r
+        }\r
+\r
+        /// <summary>\r
+        /// Hashes the blocks of the stream one after the other in an asynchronous loop. Takes the same\r
+        /// arguments and returns the same result as CalculateBlockHashesRecursiveAsync.\r
+        /// </summary>\r
+        public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0)\r
+        {\r
+            if (stream == null)\r
+                throw new ArgumentNullException("stream");\r
+            if (String.IsNullOrWhiteSpace(algorithm))\r
+                throw new ArgumentNullException("algorithm");\r
+            if (blockSize <= 0)\r
+                throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");\r
+            Contract.EndContractBlock();\r
+\r
+            if (hashes == null)\r
+                hashes = new ConcurrentDictionary<int, byte[]>();\r
+\r
+            var buffer = new byte[blockSize];\r
+            int read;\r
+            while ((read = await ReadBlockAsync(stream, buffer)) > 0)\r
+            {\r
+                hashes[index] = ComputeBlockHash(buffer, read, algorithm);\r
+                //The key is the block number, as in the recursive variant.\r
+                //A byte offset would overflow an int for files of 2GB or more\r
+                index++;\r
+            }\r
+            return hashes;\r
+        }\r
+\r
+        /// <summary>\r
+        /// Reads the blocks of the stream sequentially and hashes them in parallel on a dataflow block.\r
+        /// </summary>\r
+        /// <param name="stream">The file to hash, read from its current position.</param>\r
+        /// <param name="blockSize">The block size in bytes. Must be greater than zero.</param>\r
+        /// <param name="algorithm">The hash algorithm name, as accepted by HashAlgorithm.Create.</param>\r
+        /// <param name="parallelism">Used both as the capacity and as the degree of parallelism of the dataflow block.</param>\r
+        /// <returns>A task that returns the block hashes, keyed by block number starting at zero.</returns>\r
+        public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAgentAsync(FileStream stream, int blockSize, string algorithm, int parallelism)\r
+        {\r
+            if (stream == null)\r
+                throw new ArgumentNullException("stream");\r
+            if (String.IsNullOrWhiteSpace(algorithm))\r
+                throw new ArgumentNullException("algorithm");\r
+            if (blockSize <= 0)\r
+                throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");\r
+            Contract.EndContractBlock();\r
+\r
+            var hashes = new ConcurrentDictionary<int, byte[]>();\r
+\r
+            var options = new ExecutionDataflowBlockOptions {BoundedCapacity = parallelism,MaxDegreeOfParallelism=parallelism};\r
+            var hashBlock = new ActionBlock<Tuple<int, byte[]>>(input =>\r
+                              {\r
+                                  byte[] data = input.Item2;\r
+                                  hashes[input.Item1] = ComputeBlockHash(data, data.Length, algorithm);\r
+                              }, options);\r
+\r
+            var buffer = new byte[blockSize];\r
+            int read;\r
+            int index = 0;\r
+            while ((read = await ReadBlockAsync(stream, buffer)) > 0)\r
+            {\r
+                //The buffer is reused by the next read, so each block is posted as a separate copy\r
+                var block = new byte[read];\r
+                Buffer.BlockCopy(buffer, 0, block, 0, read);\r
+                //SendAsync returns false when the block no longer accepts input, eg. because hashing failed.\r
+                //In this case stop reading and let the await on Completion below rethrow the failure\r
+                if (!await hashBlock.SendAsync(Tuple.Create(index, block)))\r
+                    break;\r
+                index++;\r
+            }\r
+\r
+            hashBlock.Complete();\r
+            await hashBlock.Completion;\r
+\r
+            return hashes;\r
+        }\r
+\r
+        /// <summary>\r
+        /// Reads from the stream until the buffer is full or the stream ends and returns the number of bytes read.\r
+        /// A single ReadAsync call may return fewer bytes than requested even before the end of the stream.\r
+        /// </summary>\r
+        private static async Task<int> ReadBlockAsync(FileStream stream, byte[] buffer)\r
+        {\r
+            var total = 0;\r
+            int read;\r
+            while (total < buffer.Length && (read = await stream.ReadAsync(buffer, total, buffer.Length - total)) > 0)\r
+                total += read;\r
+            return total;\r
+        }\r
+\r
+        /// <summary>\r
+        /// Hashes the first count bytes of a block, up to the last non-zero byte.\r
+        /// </summary>\r
+        private static byte[] ComputeBlockHash(byte[] block, int count, string algorithm)\r
+        {\r
+            using (var hasher = HashAlgorithm.Create(algorithm))\r
+            {\r
+                //HashAlgorithm.Create returns null if it doesn't recognize the name\r
+                if (hasher == null)\r
+                    throw new ArgumentException("Unknown hash algorithm: " + algorithm, "algorithm");\r
+\r
+                //This code was added for compatibility with the way Pithos calculates the last hash\r
+                //We calculate the hash only up to the last non-null byte.\r
+                //If all bytes are null, FindLastIndex returns -1 and the hash of an empty input is returned\r
+                var lastByteIndex = Array.FindLastIndex(block, count - 1, aByte => aByte != 0);\r
+                return hasher.ComputeHash(block, 0, lastByteIndex + 1);\r
+            }\r
+        }\r
+\r
+        static BlockHashAlgorithms()\r
+        {\r
+            CalculateBlockHash = CalculateBlockHashesRecursiveAsync;\r
+        }\r
+    }\r
+}\r