Added SnapshotDifferencer.cs to calculate the difference between the current and...
author Panagiotis Kanavos <pkanavos@gmail.com>
Mon, 13 Feb 2012 21:00:50 +0000 (23:00 +0200)
committer Panagiotis Kanavos <pkanavos@gmail.com>
Mon, 13 Feb 2012 21:00:50 +0000 (23:00 +0200)
Changed hashing to use an ActionBlock with parallelism options.
Added parallelism property in Preferences

22 files changed:
trunk/Pithos.Client.WPF/Configuration/PithosSettings.cs
trunk/Pithos.Client.WPF/Preferences/PreferencesView.xaml
trunk/Pithos.Client.WPF/Properties/Settings.Designer.cs
trunk/Pithos.Client.WPF/Properties/Settings.settings
trunk/Pithos.Client.WPF/Shell/ShellViewModel.cs
trunk/Pithos.Client.WPF/app.config
trunk/Pithos.Core.Test/MockSettings.cs
trunk/Pithos.Core.Test/NetworkAgentTest.cs
trunk/Pithos.Core/Agents/NetworkAgent.cs
trunk/Pithos.Core/Agents/SnapshotDifferencer.cs
trunk/Pithos.Core/PithosMonitor.cs
trunk/Pithos.Interfaces/IPithosSettings.cs
trunk/Pithos.Interfaces/PithosSettingsData.cs
trunk/Pithos.Network.Test/CloudFilesClientTest.cs
trunk/Pithos.Network.Test/SignatureTest.cs
trunk/Pithos.Network/BlockHashAlgorithms.cs [new file with mode: 0644]
trunk/Pithos.Network/CloudFilesClient.cs
trunk/Pithos.Network/Pithos.Network.csproj
trunk/Pithos.Network/Signature.cs
trunk/Pithos.ShellExtensions.Test/TestPithosSettings.cs
trunk/Pithos.ShellExtensions/ShellSettings.cs
trunk/Pithos.ShellExtensions/TestPithosSettings.cs

index 1427786..aa59f66 100644 (file)
@@ -195,6 +195,12 @@ namespace Pithos.Client.WPF.Configuration
                 _settings.StartOnSystemStartup = value;
             }
         }
+
+        /// <summary>
+        /// Gets or sets the HashingParallelism setting by delegating to the wrapped settings object.
+        /// </summary>
+        /// <remarks>
+        /// NOTE(review): presumably the number of concurrent block hashing tasks - confirm against the hashing code.
+        /// </remarks>
+        public byte HashingParallelism
+        {
+            get { return _settings.HashingParallelism; }
+            set { _settings.HashingParallelism = value; }
+        }
 /*
         public override IEnumerable<string> GetDynamicMemberNames()
         {
index 0103bab..e919285 100644 (file)
                     <Button Content="Refresh Overlays" Name="RefreshOverlays" HorizontalAlignment="Left" Margin="5" Style="{StaticResource ButtonStyle}" Width="Auto" />
                     <TextBlock Text="Polling Interval (secs)" Margin="5"/>
                     <extToolkit:IntegerUpDown x:Name="Settings_PollingInterval" HorizontalAlignment="Left" Width="100" Margin="5,0" Watermark="Enter seconds" Minimum="10" />                    
+                    <TextBlock Text="Hashing Parallelism" Margin="5"/>
+                    <extToolkit:IntegerUpDown x:Name="Settings_HashingParallelism" HorizontalAlignment="Left" Width="100" Margin="5,0" Watermark="Enter number of tasks" Minimum="1" />                    
                 </StackPanel>
             </TabItem>
         </TabControl>
index d354094..aca6439 100644 (file)
@@ -214,7 +214,7 @@ namespace Pithos.Client.WPF.Properties {
         
         [global::System.Configuration.ApplicationScopedSettingAttribute()]
         [global::System.Diagnostics.DebuggerNonUserCodeAttribute()]
-        [global::System.Configuration.DefaultSettingValueAttribute("http://pithos.dev.grnet.gr/im/login")]
+        [global::System.Configuration.DefaultSettingValueAttribute("http://pithos.dev.grnet.gr/login")]
         public string PithosLoginUrl {
             get {
                 return ((string)(this["PithosLoginUrl"]));
@@ -294,5 +294,17 @@ namespace Pithos.Client.WPF.Properties {
                 this["ProxyDomain"] = value;
             }
         }
+        
+        [global::System.Configuration.UserScopedSettingAttribute()]
+        [global::System.Diagnostics.DebuggerNonUserCodeAttribute()]
+        [global::System.Configuration.DefaultSettingValueAttribute("1")]
+        public byte HashingParallelism {
+            get {
+                return ((byte)(this["HashingParallelism"]));
+            }
+            set {
+                this["HashingParallelism"] = value;
+            }
+        }
     }
 }
index a466a69..b39d93b 100644 (file)
@@ -51,7 +51,7 @@
       <Value Profile="(Default)">https://auth.api.rackspacecloud.com</Value>
     </Setting>
     <Setting Name="PithosLoginUrl" Type="System.String" Scope="Application">
-      <Value Profile="(Default)">http://pithos.dev.grnet.gr/im/login</Value>
+      <Value Profile="(Default)">http://pithos.dev.grnet.gr/login</Value>
     </Setting>
     <Setting Name="FeedbackUri" Type="System.String" Scope="Application">
       <Value Profile="(Default)">http://pithos.dev.grnet.gr/tools/feedback</Value>
@@ -74,5 +74,8 @@
     <Setting Name="ProxyDomain" Type="System.String" Scope="User">
       <Value Profile="(Default)" />
     </Setting>
+    <Setting Name="HashingParallelism" Type="System.Byte" Scope="User">
+      <Value Profile="(Default)">1</Value>
+    </Setting>
   </Settings>
 </SettingsFile>
\ No newline at end of file
index 025ae09..9240989 100644 (file)
@@ -79,6 +79,9 @@ namespace Pithos.Client.WPF {
                //Logging in the Pithos client is provided by log4net
                private static readonly log4net.ILog Log = log4net.LogManager.GetLogger("Pithos");
 
+        //Lazily initialized File Version info. This is done once and lazily to avoid blocking the UI
+        private Lazy<FileVersionInfo> _fileVersion;
+
                ///<summary>
                /// The Shell depends on MEF to provide implementations for windowManager, events, the status checker service and the settings
                ///</summary>
@@ -103,6 +106,12 @@ namespace Pithos.Client.WPF {
 
                                StatusMessage = "In Synch";
 
+                _fileVersion=  new Lazy<FileVersionInfo>(() =>
+                {
+                    Assembly assembly = Assembly.GetExecutingAssembly();
+                    var fileVersion = FileVersionInfo.GetVersionInfo(assembly.Location);
+                    return fileVersion;
+                });
                                _accounts.CollectionChanged += (sender, e) =>
                                                                                                   {
                                                                                                           NotifyOfPropertyChange(() => OpenFolderCaption);
@@ -473,7 +482,7 @@ namespace Pithos.Client.WPF {
                        }.ToDictionary(s => s.Status);
 
                readonly IWindowManager _windowManager;
-
+           
 
                ///<summary>
                /// Updates the visual status indicators of the application depending on status changes, e.g. icon, stat                
@@ -487,11 +496,9 @@ namespace Pithos.Client.WPF {
                                var info = _iconNames[pithosStatus];
                                StatusIcon = String.Format(@"../Images/{0}.ico", info.IconName);
 
-                               Assembly assembly = Assembly.GetExecutingAssembly();                               
-                               var fileVersion = FileVersionInfo.GetVersionInfo(assembly.Location);
 
 
-                               StatusMessage = String.Format("Pithos {0}\r\n{1}", fileVersion.FileVersion,info.StatusText);
+                               StatusMessage = String.Format("Pithos {0}\r\n{1}", _fileVersion.Value.FileVersion,info.StatusText);
                        }
                        
                        //_events.Publish(new Notification { Title = "Start", Message = "Start Monitoring", Level = TraceLevel.Info});
index 5ba4c02..216c85b 100644 (file)
@@ -81,6 +81,9 @@
       <setting name="ProxyDomain" serializeAs="String">
         <value />
       </setting>
+      <setting name="HashingParallelism" serializeAs="String">
+        <value>1</value>
+      </setting>
     </Pithos.Client.WPF.Properties.Settings>
   </userSettings>
   <connectionStrings>
         <value>https://auth.api.rackspacecloud.com</value>
       </setting>
       <setting name="PithosLoginUrl" serializeAs="String">
-        <value>http://pithos.dev.grnet.gr/im/login</value>
+        <value>http://pithos.dev.grnet.gr/login</value>
       </setting>
       <setting name="FeedbackUri" serializeAs="String">
         <value>http://pithos.dev.grnet.gr/tools/feedback</value>
index ee26ebb..0257e77 100644 (file)
@@ -39,6 +39,8 @@ namespace Pithos.Core.Test
 
         public int PollingInterval { get; set; }
 
+        public byte HashingParallelism { get; set; }
+
 
         public void Save()
         {
index 16d14d4..2ba1e37 100644 (file)
@@ -1,4 +1,5 @@
 using System;
+using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.IO;
 using System.Linq;
@@ -50,7 +51,7 @@ namespace Pithos.Core.Test
 
             client.DeleteObject(null, FolderConstants.PithosContainer, fileName);
 
-            var treeHash = await Signature.CalculateTreeHashAsync(filePath, accountInfo.BlockSize, accountInfo.BlockHash);
+            var treeHash = await Signature.CalculateTreeHashAsync(filePath, accountInfo.BlockSize, accountInfo.BlockHash, 2);
             var cloudFile = new ObjectInfo {Account = account, Container = "pithos"};
             var fileInfo = new FileInfo(filePath);
 
@@ -96,7 +97,7 @@ namespace Pithos.Core.Test
                 .Wait();
 
             Assert.IsTrue(File.Exists(filePath));
-            var treeHash = Signature.CalculateTreeHashAsync(filePath, accountInfo.BlockSize, accountInfo.BlockHash).Result;
+            var treeHash = Signature.CalculateTreeHashAsync(filePath, accountInfo.BlockSize, accountInfo.BlockHash, 2).Result;
 
             Assert.AreEqual(treeHash.TopHash, newHash.TopHash);
 
index 02c0ec7..b55c52b 100644 (file)
@@ -413,7 +413,7 @@ namespace Pithos.Core.Agents
                     cloudAction.TopHash =
                         new Lazy<string>(() => Signature.CalculateTreeHashAsync(localFile,
                                                                                 accountInfo.BlockSize,
-                                                                                accountInfo.BlockHash).Result
+                                                                                accountInfo.BlockHash, Settings.HashingParallelism).Result
                                                     .TopHash.ToHashString());
                 else
                 {
@@ -525,7 +525,10 @@ namespace Pithos.Core.Agents
                 _pauseAgent.Wait();
 
                 Log.Info("Scheduled");
-                var client=new CloudFilesClient(accountInfo);
+                var client = new CloudFilesClient(accountInfo)
+                                 {
+                                     Proxy = PithosMonitor.ProxyFromSettings(this.Settings)
+                                 };
 
                 var containers = client.ListContainers(accountInfo.UserName);
 
@@ -607,12 +610,22 @@ namespace Pithos.Core.Agents
                                                             StringComparison.InvariantCultureIgnoreCase)
                                      select info).ToList();
 
+                        //TODO: Introduced state here, must remove somehow
+                        //Must move all this elsewhere
+                        SnapshotDifferencer differencer;
+                        if (!_differencers.TryGetValue(accountInfo.UserName,out differencer))
+                        {
+                            differencer = new SnapshotDifferencer();
+                            _differencers[accountInfo.UserName] = differencer;
+                        }
+                        differencer.Post(cleanRemotes);                        
 
-
-                        ProcessDeletedFiles(accountInfo, cleanRemotes, pollTime);
+                        ProcessDeletedFiles(accountInfo, differencer.Deleted, pollTime);
 
                         //Create a list of actions from the remote files
-                        var allActions = ObjectsToActions(accountInfo, cleanRemotes);
+                        var allActions = ChangesToActions(accountInfo, differencer.Changed)
+                                        .Union(
+                                        CreatesToActions(accountInfo,differencer.Created));
 
                         
                         //var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
@@ -643,8 +656,12 @@ namespace Pithos.Core.Agents
         }
 
 
+        Dictionary<string, SnapshotDifferencer> _differencers= new Dictionary<string, SnapshotDifferencer>();
+
+/*
         Dictionary<string, List<ObjectInfo>> _currentSnapshot = new Dictionary<string, List<ObjectInfo>>();
         Dictionary<string, List<ObjectInfo>> _previousSnapshot = new Dictionary<string, List<ObjectInfo>>();
+*/
 
         /// <summary>
         /// Deletes local files that are not found in the list of cloud files
@@ -662,18 +679,6 @@ namespace Pithos.Core.Agents
                 throw new ArgumentNullException("cloudFiles");
             Contract.EndContractBlock();
 
-            if (_previousSnapshot.ContainsKey(accountInfo.UserName) && _currentSnapshot.ContainsKey(accountInfo.UserName))
-                _previousSnapshot[accountInfo.UserName] = _currentSnapshot[accountInfo.UserName] ?? new List<ObjectInfo>();
-            else
-            {
-                _previousSnapshot[accountInfo.UserName]=new List<ObjectInfo>();
-            }
-
-            _currentSnapshot[accountInfo.UserName] = cloudFiles.ToList();
-
-            var deletedObjects = _previousSnapshot[accountInfo.UserName].Except(_currentSnapshot[accountInfo.UserName], new ObjectInfoComparer()).ToList();
-
-            
             //On the first run
             if (_firstPoll)
             {
@@ -712,7 +717,7 @@ namespace Pithos.Core.Agents
             else
             {
                 var deletedFiles = new List<FileSystemInfo>();
-                foreach (var objectInfo in deletedObjects)
+                foreach (var objectInfo in cloudFiles)
                 {
                     var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
                     var item = GetFileAgent(accountInfo).GetFileSystemInfo(relativePath);
@@ -741,8 +746,7 @@ namespace Pithos.Core.Agents
                         }
 */
                     }
-                    StatusKeeper.ClearFileStatus(item.FullName);
-                    
+                    StatusKeeper.SetFileState(item.FullName, FileStatus.Deleted, FileOverlayStatus.Deleted);                    
                 }
                 StatusNotification.NotifyForFiles(deletedFiles, String.Format("{0} files were deleted", deletedFiles.Count), TraceLevel.Info);
             }
@@ -763,6 +767,75 @@ namespace Pithos.Core.Agents
         }
 
         //Creates an appropriate action for each server file
+        /// <summary>
+        /// Creates the appropriate action for each remote object that was reported as changed.
+        /// </summary>
+        /// <param name="accountInfo">The account whose objects are processed. May not be null.</param>
+        /// <param name="changes">The changed remote objects. May not be null.</param>
+        /// <returns>
+        /// A lazily evaluated sequence that contains a MustSynch action for each object that already exists
+        /// locally (directory objects that match a local directory are skipped) and a download action
+        /// for each object that does not exist locally.
+        /// </returns>
+        /// <exception cref="ArgumentNullException">
+        /// accountInfo or changes is null. Because this method is an iterator, the exception is thrown
+        /// when the returned sequence is first enumerated.
+        /// </exception>
+        private IEnumerable<CloudAction> ChangesToActions(AccountInfo accountInfo,IEnumerable<ObjectInfo> changes)
+        {
+            if (accountInfo==null)
+                throw new ArgumentNullException("accountInfo");
+            if (changes==null)
+                throw new ArgumentNullException("changes");
+            Contract.EndContractBlock();
+            var fileAgent = GetFileAgent(accountInfo);
+
+            //In order to avoid multiple iterations over the files, we iterate only once
+            //over the changed remote files
+            foreach (var objectInfo in changes)
+            {
+                var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
+                if (fileAgent.Exists(relativePath))
+                {
+                    //If a directory object already exists, we don't need to perform any other action                    
+                    var localFile = fileAgent.GetFileSystemInfo(relativePath);
+                    if (objectInfo.Content_Type == @"application/directory" && localFile is DirectoryInfo)
+                        continue;
+                    using (new SessionScope(FlushAction.Never))
+                    {
+                        var state =  StatusKeeper.GetStateByFilePath(localFile.FullName);
+                        //Remember when the file was last reported by the server
+                        _lastSeen[localFile.FullName] = DateTime.Now;
+                        //Common files should be checked on a per-case basis to detect differences, which is newer
+
+                        yield return new CloudAction(accountInfo, CloudActionType.MustSynch,
+                                                     localFile, objectInfo, state, accountInfo.BlockSize,
+                                                     accountInfo.BlockHash);
+                    }
+                }
+                else
+                {
+                    //Remote files should be downloaded
+                    yield return new CloudDownloadAction(accountInfo,objectInfo);
+                }
+            }            
+        }
+
+        /// <summary>
+        /// Creates the appropriate action for each remote object that was reported as newly created.
+        /// </summary>
+        /// <param name="accountInfo">The account whose objects are processed. May not be null.</param>
+        /// <param name="creates">The newly created remote objects. May not be null.</param>
+        /// <returns>
+        /// A lazily evaluated sequence that contains a download action for each object that does not exist locally.
+        /// Objects that already exist locally produce no action: files are marked as conflicts,
+        /// while directory objects that match a local directory are left untouched.
+        /// </returns>
+        /// <exception cref="ArgumentNullException">
+        /// accountInfo or creates is null. Because this method is an iterator, the exception is thrown
+        /// when the returned sequence is first enumerated.
+        /// </exception>
+        private IEnumerable<CloudAction> CreatesToActions(AccountInfo accountInfo,IEnumerable<ObjectInfo> creates)
+        {
+            if (accountInfo==null)
+                throw new ArgumentNullException("accountInfo");
+            if (creates==null)
+                throw new ArgumentNullException("creates");
+            Contract.EndContractBlock();
+            var fileAgent = GetFileAgent(accountInfo);
+
+            //In order to avoid multiple iterations over the files, we iterate only once
+            //over the created remote files
+            foreach (var objectInfo in creates)
+            {
+                var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
+                if (fileAgent.Exists(relativePath))
+                {
+                    var localFile = fileAgent.GetFileSystemInfo(relativePath);
+                    //If a directory object already exists, we don't need to perform any other action                    
+                    //This is the same check ChangesToActions performs, so directories are never flagged as conflicts
+                    if (objectInfo.Content_Type == @"application/directory" && localFile is DirectoryInfo)
+                        continue;
+                    //If the object already exists, we probably have a conflict
+                    StatusKeeper.SetFileState(localFile.FullName,FileStatus.Conflict,FileOverlayStatus.Conflict);
+                }
+                else
+                {
+                    //Remote files should be downloaded
+                    yield return new CloudDownloadAction(accountInfo,objectInfo);
+                }
+            }            
+        }
+
+        //Creates an appropriate action for each server file
+/*
         private IEnumerable<CloudAction> ObjectsToActions(AccountInfo accountInfo,IEnumerable<ObjectInfo> remote)
         {
             if (remote==null)
@@ -809,6 +882,7 @@ namespace Pithos.Core.Agents
                 }
             }            
         }
+*/
 
         private static FileAgent GetFileAgent(AccountInfo accountInfo)
         {
@@ -1063,7 +1137,7 @@ namespace Pithos.Core.Agents
             
                         
             //Calculate the file's treehash
-            var treeHash = await Signature.CalculateTreeHashAsync(localPath, serverHash.BlockSize, serverHash.BlockHash);
+            var treeHash = await Signature.CalculateTreeHashAsync(localPath, serverHash.BlockSize, serverHash.BlockHash, 2);
                 
             //And compare it with the server's hash
             var upHashes = serverHash.GetHashesAsStrings();
@@ -1201,7 +1275,7 @@ namespace Pithos.Core.Agents
 
                         //First, calculate the tree hash
                         var treeHash = await Signature.CalculateTreeHashAsync(fullFileName, accountInfo.BlockSize,
-                                                                              accountInfo.BlockHash);
+                                                                              accountInfo.BlockHash, 2);
 
                         await UploadWithHashMap(accountInfo, cloudFile, fileInfo as FileInfo, cloudFile.Name, treeHash);
                     }
index e90532f..17e59a1 100644 (file)
@@ -37,7 +37,9 @@ namespace Pithos.Core.Agents
         }\r
+        /// <summary>\r
+        /// Records a new snapshot: the snapshot that was current becomes the previous one and\r
+        /// <paramref name="list"/> (or an empty list when it is null) becomes the current one.\r
+        /// </summary>\r
+        /// <param name="list">The new snapshot. May be null.</param>\r
+        /// <returns>This same instance.</returns>\r
         public SnapshotDifferencer Post(IEnumerable<ObjectInfo> list)\r
         {\r
-            return new SnapshotDifferencer(_current,list);\r
+            _previous = _current;\r
+            _current = list ?? new List<ObjectInfo>();\r
+            return this;\r
         }\r
         \r
         public IEnumerable<ObjectInfo> Deleted\r
index 9563381..1fa7807 100644 (file)
@@ -171,7 +171,7 @@ namespace Pithos.Core
             
 
             CloudClient=new CloudFilesClient(UserName,ApiKey);
-            var proxy = ProxyFromSettings();            
+            var proxy = ProxyFromSettings(Settings);            
             CloudClient.Proxy = proxy;
             CloudClient.UsePithos = true;
             CloudClient.AuthenticationUrl = this.AuthenticationUrl;            
@@ -227,15 +227,16 @@ namespace Pithos.Core
 
         public string AuthenticationUrl { get; set; }
 
-        private WebProxy ProxyFromSettings()
+        //TODO: Move this method somewhere else
+        public static WebProxy ProxyFromSettings(IPithosSettings pithosSettings)
         {            
-            if (Settings.UseManualProxy)
+            if (pithosSettings.UseManualProxy)
             {
-                var proxy = new WebProxy(Settings.ProxyServer, Settings.ProxyPort);
+                var proxy = new WebProxy(pithosSettings.ProxyServer, pithosSettings.ProxyPort);
                 //If the proxy requires specific authentication settings, use them
-                if (Settings.ProxyAuthentication)
+                if (pithosSettings.ProxyAuthentication)
                 {
-                    proxy.Credentials=new NetworkCredential(Settings.ProxyUsername,Settings.ProxyPassword,Settings.ProxyDomain);
+                    proxy.Credentials=new NetworkCredential(pithosSettings.ProxyUsername,pithosSettings.ProxyPassword,pithosSettings.ProxyDomain);
                 }
                     //Otherwise, if there are generic authentication settings, use them
                 if (!String.IsNullOrWhiteSpace(CredentialCache.DefaultNetworkCredentials.UserName))
index a680105..0048bfe 100644 (file)
@@ -35,6 +35,7 @@ namespace Pithos.Interfaces
         bool ExtensionsActivated { get; set; }
 
         int PollingInterval { get; set; }
+        byte HashingParallelism { get; set; }
 
         void Save();
         void Reload();
index 18f8f39..478f67a 100644 (file)
@@ -66,6 +66,7 @@ namespace Pithos.Interfaces
         public bool ProxyAuthentication { get; set; }
         public bool ExtensionsActivated { get; set; }
         public int PollingInterval { get; set; }
+        public byte HashingParallelism { get; set; }
 
         public PithosSettingsData()
         {
index 407c8c9..a5c690a 100644 (file)
@@ -1,8 +1,10 @@
 using System;
+using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.IO;
 using System.Linq;
 using System.Text;
+using System.Threading.Tasks;
 using NUnit.Framework;
 
 namespace Pithos.Network.Test
@@ -22,7 +24,7 @@ namespace Pithos.Network.Test
                              };
             client.Authenticate();
             var fileName = @"vlc-1.1.11-win32.exe";
-            var treeHash=Signature.CalculateTreeHashAsync(Path.Combine(@"e:\pithos\" ,fileName), 4*1024*1024 , "sha256").Result;
+            var treeHash=Signature.CalculateTreeHashAsync(Path.Combine(@"e:\pithos\" ,fileName), 4*1024*1024 , "sha256", 2).Result;
             var result = client.PutHashMap(account, "pithos", fileName, treeHash).Result;
 
             Assert.AreEqual(0,result.Count);
index 98bf6b4..e2adcb2 100644 (file)
@@ -1,5 +1,7 @@
 using System;
+using System.Collections.Concurrent;
 using System.Collections.Generic;
+using System.Diagnostics;
 using System.IO;
 using System.Linq;
 using System.Security.Cryptography;
@@ -35,7 +37,7 @@ namespace Pithos.Core.Test
 
             var md5 = Signature.CalculateMD5(file);            
 
-            var hash1 = Signature.CalculateTreeHashAsync(file, (int) blockSize,"sha256").Result;
+            var hash1 = Signature.CalculateTreeHashAsync(file, (int) blockSize,"sha256", 2).Result;
             Assert.IsNotNull(hash1.Hashes);
             Assert.AreEqual(numBlocks, hash1.Hashes.Count());
 
@@ -60,7 +62,7 @@ namespace Pithos.Core.Test
 
             var md5 = Signature.CalculateMD5(file);
 
-            var hash1 = Signature.CalculateTreeHashAsync(file, (int) blockSize, "sha256").Result;
+            var hash1 = Signature.CalculateTreeHashAsync(file, (int) blockSize, "sha256", 2).Result;
             hash1.FileId = Guid.NewGuid();
             var task = hash1.Save(@"e:\")
                 .ContinueWith(_ => TreeHash.LoadTreeHash(@"e:\", hash1.FileId)).Unwrap();            
@@ -71,6 +73,46 @@ namespace Pithos.Core.Test
             }).Wait();
         }
 
+        /// <summary>
+        /// Hashes the same large file with 1, 2 and 3 parallel hashing tasks, traces the elapsed time
+        /// of each run and verifies that every run produces the same top hash.
+        /// </summary>
+        [Test]
+        public void TestLargeHashCalculation()
+        {
+
+            var file = "e:\\testFile.tmp";
+            if (!File.Exists(file))
+            {
+                //Create a 300MB buffer filled with random bytes and save it as the test file
+                byte[] buffer= new byte[300 * 1048576];
+                Random rnd=new Random();
+                rnd.NextBytes(buffer);                
+                File.WriteAllBytes(file,buffer);                
+            }
+
+            decimal blockSize = 4 * 1048576;
+
+            Trace.WriteLine("1");
+            var stopwatch = Stopwatch.StartNew();            
+            var hash1 = Signature.CalculateTreeHashAsync(file, (int)blockSize, "sha256", 1).Result;
+            stopwatch.Stop();
+            Trace.WriteLine(stopwatch.Elapsed);
+            
+            Trace.WriteLine("2");
+            stopwatch.Restart();            
+            var hash2 = Signature.CalculateTreeHashAsync(file, (int)blockSize, "sha256", 2).Result;
+            stopwatch.Stop();
+            Trace.WriteLine(stopwatch.Elapsed);
+
+            Trace.WriteLine("3");
+            stopwatch.Restart();
+            var hash3 = Signature.CalculateTreeHashAsync(file, (int)blockSize, "sha256", 3).Result;
+            stopwatch.Stop();
+            Trace.WriteLine(stopwatch.Elapsed);
+
+            //The top hash must not depend on the degree of parallelism.
+            //EqualTo compares the elements in order, while EquivalentTo would accept any permutation of them
+            Assert.That(hash2.TopHash,Is.EqualTo(hash1.TopHash));
+            Assert.That(hash3.TopHash,Is.EqualTo(hash1.TopHash));
+        }
+
         public static string BytesToStr(byte[] bytes)
         {
             var str = new StringBuilder();
@@ -106,7 +148,7 @@ namespace Pithos.Core.Test
             client.UsePithos = true;            
             client.Authenticate();
             var fileName = @"vlc-1.1.11-win32.exe";
-            var localHash= Signature.CalculateTreeHashAsync(Path.Combine(@"e:\pithos\", fileName), 4 * 1024 * 1024, "sha256").Result;
+            var localHash= Signature.CalculateTreeHashAsync(Path.Combine(@"e:\pithos\", fileName), 4 * 1024 * 1024, "sha256", 2).Result;
             var upHash= client.GetHashMap(fileName, account, "pithos").Result;
 
             Assert.AreEqual(upHash.TopHash, localHash.TopHash);
@@ -120,7 +162,7 @@ namespace Pithos.Core.Test
 
             var fileName = @"vlc-1.1.11-win32.exe";
             var syncHash= Signature.CalculateTreeHash(Path.Combine(@"e:\pithos\", fileName), 4 * 1024 * 1024, "sha256");
-            var asyncHash = Signature.CalculateTreeHashAsync(Path.Combine(@"e:\pithos\", fileName), 4 * 1024 * 1024, "sha256")
+            var asyncHash = Signature.CalculateTreeHashAsync(Path.Combine(@"e:\pithos\", fileName), 4 * 1024 * 1024, "sha256", 2)
                 .Result;
 
             Assert.AreEqual(syncHash.TopHash, asyncHash.TopHash);
diff --git a/trunk/Pithos.Network/BlockHashAlgorithms.cs b/trunk/Pithos.Network/BlockHashAlgorithms.cs
new file mode 100644 (file)
index 0000000..58757cd
--- /dev/null
@@ -0,0 +1,149 @@
+// -----------------------------------------------------------------------\r
+// <copyright file="BlockHashAlgorithms.cs" company="Microsoft">\r
+// TODO: Update copyright text.\r
+// </copyright>\r
+// -----------------------------------------------------------------------\r
+\r
+using System.Collections.Concurrent;\r
+using System.Diagnostics.Contracts;\r
+using System.IO;\r
+using System.Security.Cryptography;\r
+using System.Threading.Tasks;\r
+using System.Threading.Tasks.Dataflow;\r
+\r
+namespace Pithos.Network\r
+{\r
+    using System;\r
+    using System.Collections.Generic;\r
+    using System.Linq;\r
+    using System.Text;\r
+\r
+    /// <summary>\r
+    /// Alternative strategies for calculating the hash of every block of a file stream.\r
+    /// Every strategy returns a dictionary that maps the zero-based ordinal of a block to the hash of that block.\r
+    /// </summary>\r
+    public static class BlockHashAlgorithms\r
+    {\r
+        /// <summary>\r
+        /// The block hashing strategy. The static constructor sets it to\r
+        /// <see cref="CalculateBlockHashesRecursiveAsync"/>.\r
+        /// </summary>\r
+        public static Func<FileStream, int, string, ConcurrentDictionary<int, byte[]>, int, Task<ConcurrentDictionary<int, byte[]>>> CalculateBlockHash;\r
+\r
+        /// <summary>\r
+        /// Hashes the blocks of a stream by chaining one continuation per block.\r
+        /// </summary>\r
+        /// <param name="stream">The stream to read. May not be null.</param>\r
+        /// <param name="blockSize">The size of each block in bytes. Must be greater than zero.</param>\r
+        /// <param name="algorithm">The name of the hash algorithm, as accepted by HashAlgorithm.Create.</param>\r
+        /// <param name="hashes">The dictionary that receives the hashes. A new one is created when null.</param>\r
+        /// <param name="index">The ordinal assigned to the first block that is read. Must not be negative.</param>\r
+        /// <returns>A task that produces the dictionary of block hashes, keyed by block ordinal.</returns>\r
+        public static Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesRecursiveAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0)\r
+        {\r
+            if (stream == null)\r
+                throw new ArgumentNullException("stream");\r
+            if (String.IsNullOrWhiteSpace(algorithm))\r
+                throw new ArgumentNullException("algorithm");\r
+            if (blockSize <= 0)\r
+                throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");\r
+            if (index < 0)\r
+                throw new ArgumentOutOfRangeException("index", "index must be a non-negative value");\r
+            Contract.EndContractBlock();\r
+\r
+\r
+            if (hashes == null)\r
+                hashes = new ConcurrentDictionary<int, byte[]>();\r
+\r
+            var buffer = new byte[blockSize];\r
+            return stream.ReadAsync(buffer, 0, blockSize).ContinueWith(t =>\r
+            {\r
+                var read = t.Result;\r
+\r
+                //A full block means there may be more data, so the next read is started before this block is hashed\r
+                var nextTask = read == blockSize\r
+                                    ? CalculateBlockHashesRecursiveAsync(stream, blockSize, algorithm, hashes, index + 1)\r
+                                    : Task.Factory.StartNew(() => hashes);\r
+                if (read > 0)\r
+                    hashes[index] = HashBlock(buffer, read, algorithm);\r
+                return nextTask;\r
+            }).Unwrap();\r
+        }\r
+\r
+        /// <summary>\r
+        /// Hashes the blocks of a stream sequentially, awaiting each read.\r
+        /// </summary>\r
+        /// <param name="stream">The stream to read. May not be null.</param>\r
+        /// <param name="blockSize">The size of each block in bytes. Must be greater than zero.</param>\r
+        /// <param name="algorithm">The name of the hash algorithm, as accepted by HashAlgorithm.Create.</param>\r
+        /// <param name="hashes">The dictionary that receives the hashes. A new one is created when null.</param>\r
+        /// <param name="index">The ordinal assigned to the first block that is read. Must not be negative.</param>\r
+        /// <returns>A task that produces the dictionary of block hashes, keyed by block ordinal.</returns>\r
+        public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0)\r
+        {\r
+            if (stream == null)\r
+                throw new ArgumentNullException("stream");\r
+            if (String.IsNullOrWhiteSpace(algorithm))\r
+                throw new ArgumentNullException("algorithm");\r
+            if (blockSize <= 0)\r
+                throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");\r
+            if (index < 0)\r
+                throw new ArgumentOutOfRangeException("index", "index must be a non-negative value");\r
+            Contract.EndContractBlock();\r
+\r
+\r
+            if (hashes == null)\r
+                hashes = new ConcurrentDictionary<int, byte[]>();\r
+\r
+            var buffer = new byte[blockSize];\r
+            int read;\r
+            while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)\r
+            {\r
+                hashes[index] = HashBlock(buffer, read, algorithm);\r
+                //Advance by one block instead of by the number of bytes read. This keeps the keys consistent\r
+                //with CalculateBlockHashesRecursiveAsync and can't overflow an int for files larger than 2GB\r
+                index++;\r
+            }\r
+            return hashes;\r
+        }\r
+        \r
+        /// <summary>\r
+        /// Hashes the blocks of a stream using an ActionBlock, so that up to\r
+        /// <paramref name="parallelism"/> blocks are hashed concurrently while the stream is read sequentially.\r
+        /// </summary>\r
+        /// <param name="stream">The stream to read. May not be null.</param>\r
+        /// <param name="blockSize">The size of each block in bytes. Must be greater than zero.</param>\r
+        /// <param name="algorithm">The name of the hash algorithm, as accepted by HashAlgorithm.Create.</param>\r
+        /// <param name="parallelism">\r
+        /// The maximum number of blocks that are hashed concurrently, which is also the number of blocks buffered\r
+        /// by the ActionBlock. Must be greater than zero, or DataflowBlockOptions.Unbounded.\r
+        /// </param>\r
+        /// <returns>A task that produces the dictionary of block hashes, keyed by block ordinal.</returns>\r
+        public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAgentAsync(FileStream stream, int blockSize, string algorithm, int parallelism)\r
+        {\r
+            if (stream == null)\r
+                throw new ArgumentNullException("stream");\r
+            if (String.IsNullOrWhiteSpace(algorithm))\r
+                throw new ArgumentNullException("algorithm");\r
+            if (blockSize <= 0)\r
+                throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");\r
+            //The dataflow options reject these values anyway, fail early with a meaningful message\r
+            if (parallelism <= 0 && parallelism != DataflowBlockOptions.Unbounded)\r
+                throw new ArgumentOutOfRangeException("parallelism", "parallelism must be a value greater than zero ");\r
+            Contract.EndContractBlock();\r
+\r
+            var hashes = new ConcurrentDictionary<int, byte[]>();\r
+\r
+            //The bounded capacity throttles reading: SendAsync doesn't complete while the ActionBlock is full\r
+            var options = new ExecutionDataflowBlockOptions {BoundedCapacity = parallelism,MaxDegreeOfParallelism=parallelism};\r
+            var hashBlock=new ActionBlock<Tuple<int,byte[]>>(input=>\r
+                              {\r
+                                  int idx = input.Item1;\r
+                                  byte[] block = input.Item2;\r
+                                  hashes[idx] = HashBlock(block, block.Length, algorithm);\r
+                              },options);\r
+\r
+            var buffer = new byte[blockSize];\r
+            int read;\r
+            int index = 0;\r
+            while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)\r
+            {\r
+                //The read buffer is reused, so each block gets its own copy of the data\r
+                var block = new byte[read];\r
+                Buffer.BlockCopy(buffer,0,block,0,read);\r
+                //A declined message means the ActionBlock stopped accepting input, eg. because hashing failed.\r
+                //Stop reading, awaiting Completion below will surface the failure\r
+                var accepted = await hashBlock.SendAsync(Tuple.Create(index, block));\r
+                if (!accepted)\r
+                    break;\r
+                //Advance by one block instead of by the number of bytes read. This keeps the keys consistent\r
+                //with CalculateBlockHashesRecursiveAsync and can't overflow an int for files larger than 2GB\r
+                index++;\r
+            }\r
+\r
+            hashBlock.Complete();\r
+            await hashBlock.Completion;\r
+\r
+            return hashes;\r
+        }\r
+\r
+        /// <summary>\r
+        /// Hashes the first <paramref name="count"/> bytes of <paramref name="buffer"/>, ignoring any trailing zero bytes.\r
+        /// </summary>\r
+        /// <exception cref="ArgumentException">No hash algorithm is registered under the specified name.</exception>\r
+        private static byte[] HashBlock(byte[] buffer, int count, string algorithm)\r
+        {\r
+            using (var hasher = HashAlgorithm.Create(algorithm))\r
+            {\r
+                //HashAlgorithm.Create returns null for names it doesn't recognize\r
+                if (hasher == null)\r
+                    throw new ArgumentException("Unknown hash algorithm: " + algorithm, "algorithm");\r
+\r
+                //This code was added for compatibility with the way Pithos calculates the last hash\r
+                //We calculate the hash only up to the last non-null byte\r
+                var lastByteIndex = Array.FindLastIndex(buffer, count - 1, aByte => aByte != 0);\r
+\r
+                return hasher.ComputeHash(buffer, 0, lastByteIndex + 1);\r
+            }\r
+        }\r
+\r
+        static BlockHashAlgorithms()\r
+        {\r
+            CalculateBlockHash = CalculateBlockHashesRecursiveAsync;\r
+        }\r
+    }\r
+}\r
index c7fd04d..30c7644 100644 (file)
@@ -148,14 +148,13 @@ namespace Pithos.Network
             Contract.Ensures(StorageUrl != null);
             Contract.Ensures(_baseClient != null);
             Contract.Ensures(RootAddressUri != null);
-            Contract.EndContractBlock();
+            Contract.EndContractBlock();          
 
             _baseClient = new RestClient
             {
                 BaseAddress = accountInfo.StorageUri.ToString(),
                 Timeout = 10000,
                 Retries = 3,
-                Proxy=this.Proxy
             };
             StorageUrl = accountInfo.StorageUri;
             Token = accountInfo.Token;
index f2930f3..2e38f5e 100644 (file)
     <Reference Include="System" />
     <Reference Include="System.ComponentModel.Composition" />
     <Reference Include="System.Core" />
+    <Reference Include="System.ServiceModel" />
+    <Reference Include="System.Threading.Tasks.Dataflow">
+      <HintPath>..\Libraries\System.Threading.Tasks.Dataflow.dll</HintPath>
+    </Reference>
     <Reference Include="System.Xml.Linq" />
     <Reference Include="System.Data.DataSetExtensions" />
     <Reference Include="System.Data" />
   </ItemGroup>
   <ItemGroup>
     <Compile Include="AccountInfo.cs" />
+    <Compile Include="BlockHashAlgorithms.cs" />
     <Compile Include="CloudFilesClient.cs" />
     <Compile Include="ContainerInfo.cs" />
     <Compile Include="ICloudClient.cs" />
index 4d3df6e..fdfe011 100644 (file)
@@ -104,11 +104,11 @@ namespace Pithos.Network
                 throw new ArgumentNullException("algorithm");
             Contract.EndContractBlock();
 
-            var hash=CalculateTreeHashAsync(filePath, blockSize, algorithm);
+            var hash=CalculateTreeHashAsync(filePath, blockSize, algorithm, 2);
             return hash.Result;
         }
         
-        public static async Task<TreeHash> CalculateTreeHashAsync(FileInfo fileInfo, int blockSize, string algorithm)
+        public static async Task<TreeHash> CalculateTreeHashAsync(FileInfo fileInfo, int blockSize, string algorithm, byte parallelism)
         {
             if (fileInfo == null)
                 throw new ArgumentNullException("fileInfo");
@@ -119,12 +119,12 @@ namespace Pithos.Network
             if (String.IsNullOrWhiteSpace(algorithm))
                 throw new ArgumentNullException("algorithm");
             Contract.EndContractBlock();
-
-            return await CalculateTreeHashAsync(fileInfo.FullName, blockSize, algorithm);
+            
+            return await CalculateTreeHashAsync(fileInfo.FullName, blockSize, algorithm, parallelism);
         }
 
 
-        public static async Task<TreeHash> CalculateTreeHashAsync(string filePath, int blockSize,string algorithm)
+        public static async Task<TreeHash> CalculateTreeHashAsync(string filePath, int blockSize,string algorithm, int parallelism)
         {
             if (String.IsNullOrWhiteSpace(filePath))
                 throw new ArgumentNullException("filePath");
@@ -145,7 +145,7 @@ namespace Pithos.Network
             using (var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, blockSize, true))
             {
                 //Calculate the blocks asyncrhonously
-                var hashes = await CalculateBlockHashesAsync(stream, blockSize, algorithm);                
+                var hashes = await BlockHashAlgorithms.CalculateBlockHashesAgentAsync(stream, blockSize, algorithm, parallelism);                
 
                 //And then proceed with creating and returning a TreeHash
                 var length = stream.Length;
@@ -254,44 +254,6 @@ namespace Pithos.Network
             }
         }*/
 
-        private static Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes=null, int index = 0)
-        {
-            if (stream==null)
-                throw new ArgumentNullException("stream");
-            if (String.IsNullOrWhiteSpace(algorithm))
-                throw new ArgumentNullException("algorithm");
-            if (blockSize <= 0)
-                throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
-            if (index< 0)
-                throw new ArgumentOutOfRangeException("index", "index must be a non-negative value");
-            Contract.EndContractBlock();
-
-
-            if (hashes==null)
-                hashes= new ConcurrentDictionary<int, byte[]>();
-
-            var buffer = new byte[blockSize];
-            return stream.ReadAsync(buffer, 0, blockSize).ContinueWith(t =>
-            {
-                var read = t.Result;
-
-                var nextTask = read == blockSize
-                                    ? CalculateBlockHashesAsync(stream, blockSize, algorithm, hashes, index + 1) 
-                                    : Task.Factory.StartNew(() => hashes);
-                if (read>0)
-                    using (var hasher = HashAlgorithm.Create(algorithm))
-                    {
-                        //This code was added for compatibility with the way Pithos calculates the last hash
-                        //We calculate the hash only up to the last non-null byte
-                        //TODO: Remove if the server starts using the full block instead of the trimmed block
-                        var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);
-
-                        var hash = hasher.ComputeHash(buffer, 0, lastByteIndex+1);
-                        hashes[index]=hash;
-                    }
-                return nextTask;
-            }).Unwrap();
-        }
 
     
 
index 4bc4589..02ba4b2 100644 (file)
@@ -54,6 +54,8 @@ namespace Pithos.ShellExtensions.Test
 
         public int PollingInterval { get; set; }
 
+        public byte HashingParallelism{get; set; } // Plain auto-property, no validation. NOTE(review): presumably the degree of parallelism used for block hashing - confirm against IPithosSettings.
+
         public void Save()
         {
             
index 67971bd..6a15249 100644 (file)
@@ -167,6 +167,12 @@ namespace Pithos.ShellExtensions
             get { return _settings.Value.PollingInterval; }
             set { _settings.Value.PollingInterval = value; }
         }
+
+        public byte HashingParallelism // Pass-through to _settings.Value; this class keeps no copy of its own.
+        {
+            get { return _settings.Value.HashingParallelism; } // NOTE(review): presumably the degree of parallelism used for block hashing - confirm against IPithosSettings.
+            set { _settings.Value.HashingParallelism = value; }
+        }
         public void Save()
         {
            
index b621d84..552f38f 100644 (file)
@@ -52,6 +52,8 @@ namespace Pithos.ShellExtensions.Test
 
         public int PollingInterval { get; set; }
 
+        public byte HashingParallelism { get; set; } // Plain auto-property, no validation. NOTE(review): presumably the degree of parallelism used for block hashing - confirm against IPithosSettings.
+
 
         public bool ProxyAuthentication { get; set; }
         public void Save()