Changed hashing to use an ActionBlock with parallelism options.
Added parallelism property in Preferences
_settings.StartOnSystemStartup = value;
}
}
+
+ public byte HashingParallelism // Hashing parallelism preference; reads and writes the value held by the wrapped _settings object
+ {
+ get { return _settings.HashingParallelism; }
+ set { _settings.HashingParallelism = value; }
+ }
/*
public override IEnumerable<string> GetDynamicMemberNames()
{
<Button Content="Refresh Overlays" Name="RefreshOverlays" HorizontalAlignment="Left" Margin="5" Style="{StaticResource ButtonStyle}" Width="Auto" />
<TextBlock Text="Polling Interval (secs)" Margin="5"/>
<extToolkit:IntegerUpDown x:Name="Settings_PollingInterval" HorizontalAlignment="Left" Width="100" Margin="5,0" Watermark="Enter seconds" Minimum="10" />
+ <TextBlock Text="Hashing Parallelism" Margin="5"/>
+ <!-- The HashingParallelism setting is declared as System.Byte, so cap the input at 255 -->
+ <extToolkit:IntegerUpDown x:Name="Settings_HashingParallelism" HorizontalAlignment="Left" Width="100" Margin="5,0" Watermark="Enter number of tasks" Minimum="1" Maximum="255" />
</StackPanel>
</TabItem>
</TabControl>
[global::System.Configuration.ApplicationScopedSettingAttribute()]
[global::System.Diagnostics.DebuggerNonUserCodeAttribute()]
- [global::System.Configuration.DefaultSettingValueAttribute("http://pithos.dev.grnet.gr/im/login")]
+ [global::System.Configuration.DefaultSettingValueAttribute("http://pithos.dev.grnet.gr/login")]
public string PithosLoginUrl {
get {
return ((string)(this["PithosLoginUrl"]));
this["ProxyDomain"] = value;
}
}
+
+ [global::System.Configuration.UserScopedSettingAttribute()]
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute()]
+ [global::System.Configuration.DefaultSettingValueAttribute("1")]
+ public byte HashingParallelism { // User-scoped setting stored under the "HashingParallelism" key; the declared default value is "1"
+ get {
+ return ((byte)(this["HashingParallelism"]));
+ }
+ set {
+ this["HashingParallelism"] = value;
+ }
+ }
}
}
<Value Profile="(Default)">https://auth.api.rackspacecloud.com</Value>
</Setting>
<Setting Name="PithosLoginUrl" Type="System.String" Scope="Application">
- <Value Profile="(Default)">http://pithos.dev.grnet.gr/im/login</Value>
+ <Value Profile="(Default)">http://pithos.dev.grnet.gr/login</Value>
</Setting>
<Setting Name="FeedbackUri" Type="System.String" Scope="Application">
<Value Profile="(Default)">http://pithos.dev.grnet.gr/tools/feedback</Value>
<Setting Name="ProxyDomain" Type="System.String" Scope="User">
<Value Profile="(Default)" />
</Setting>
+ <Setting Name="HashingParallelism" Type="System.Byte" Scope="User">
+ <Value Profile="(Default)">1</Value>
+ </Setting>
</Settings>
</SettingsFile>
\ No newline at end of file
//Logging in the Pithos client is provided by log4net
private static readonly log4net.ILog Log = log4net.LogManager.GetLogger("Pithos");
+ //Lazily initialized File Version info. This is done once and lazily to avoid blocking the UI
+ private Lazy<FileVersionInfo> _fileVersion;
+
///<summary>
/// The Shell depends on MEF to provide implementations for windowManager, events, the status checker service and the settings
///</summary>
StatusMessage = "In Synch";
+ _fileVersion= new Lazy<FileVersionInfo>(() =>
+ {
+ Assembly assembly = Assembly.GetExecutingAssembly();
+ var fileVersion = FileVersionInfo.GetVersionInfo(assembly.Location);
+ return fileVersion;
+ });
_accounts.CollectionChanged += (sender, e) =>
{
NotifyOfPropertyChange(() => OpenFolderCaption);
}.ToDictionary(s => s.Status);
readonly IWindowManager _windowManager;
-
+
///<summary>
/// Updates the visual status indicators of the application depending on status changes, e.g. icon, stat
var info = _iconNames[pithosStatus];
StatusIcon = String.Format(@"../Images/{0}.ico", info.IconName);
- Assembly assembly = Assembly.GetExecutingAssembly();
- var fileVersion = FileVersionInfo.GetVersionInfo(assembly.Location);
- StatusMessage = String.Format("Pithos {0}\r\n{1}", fileVersion.FileVersion,info.StatusText);
+ StatusMessage = String.Format("Pithos {0}\r\n{1}", _fileVersion.Value.FileVersion,info.StatusText);
}
//_events.Publish(new Notification { Title = "Start", Message = "Start Monitoring", Level = TraceLevel.Info});
<setting name="ProxyDomain" serializeAs="String">
<value />
</setting>
+ <setting name="HashingParallelism" serializeAs="String">
+ <value>1</value>
+ </setting>
</Pithos.Client.WPF.Properties.Settings>
</userSettings>
<connectionStrings>
<value>https://auth.api.rackspacecloud.com</value>
</setting>
<setting name="PithosLoginUrl" serializeAs="String">
- <value>http://pithos.dev.grnet.gr/im/login</value>
+ <value>http://pithos.dev.grnet.gr/login</value>
</setting>
<setting name="FeedbackUri" serializeAs="String">
<value>http://pithos.dev.grnet.gr/tools/feedback</value>
public int PollingInterval { get; set; }
+ public byte HashingParallelism { get; set; }
+
public void Save()
{
using System;
+using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
client.DeleteObject(null, FolderConstants.PithosContainer, fileName);
- var treeHash = await Signature.CalculateTreeHashAsync(filePath, accountInfo.BlockSize, accountInfo.BlockHash);
+ var treeHash = await Signature.CalculateTreeHashAsync(filePath, accountInfo.BlockSize, accountInfo.BlockHash, 2);
var cloudFile = new ObjectInfo {Account = account, Container = "pithos"};
var fileInfo = new FileInfo(filePath);
.Wait();
Assert.IsTrue(File.Exists(filePath));
- var treeHash = Signature.CalculateTreeHashAsync(filePath, accountInfo.BlockSize, accountInfo.BlockHash).Result;
+ var treeHash = Signature.CalculateTreeHashAsync(filePath, accountInfo.BlockSize, accountInfo.BlockHash, 2).Result;
Assert.AreEqual(treeHash.TopHash, newHash.TopHash);
cloudAction.TopHash =
new Lazy<string>(() => Signature.CalculateTreeHashAsync(localFile,
accountInfo.BlockSize,
- accountInfo.BlockHash).Result
+ accountInfo.BlockHash, Settings.HashingParallelism).Result
.TopHash.ToHashString());
else
{
_pauseAgent.Wait();
Log.Info("Scheduled");
- var client=new CloudFilesClient(accountInfo);
+ var client = new CloudFilesClient(accountInfo)
+ {
+ Proxy = PithosMonitor.ProxyFromSettings(this.Settings)
+ };
var containers = client.ListContainers(accountInfo.UserName);
StringComparison.InvariantCultureIgnoreCase)
select info).ToList();
+ //TODO: Introduced state here, must remove somehow
+ //Must move all this elsewhere
+ SnapshotDifferencer differencer;
+ if (!_differencers.TryGetValue(accountInfo.UserName,out differencer))
+ {
+ differencer = new SnapshotDifferencer();
+ _differencers[accountInfo.UserName] = differencer;
+ }
+ differencer.Post(cleanRemotes);
-
- ProcessDeletedFiles(accountInfo, cleanRemotes, pollTime);
+ ProcessDeletedFiles(accountInfo, differencer.Deleted, pollTime);
//Create a list of actions from the remote files
- var allActions = ObjectsToActions(accountInfo, cleanRemotes);
+ var allActions = ChangesToActions(accountInfo, differencer.Changed)
+ .Union(
+ CreatesToActions(accountInfo,differencer.Created));
//var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
}
+ Dictionary<string, SnapshotDifferencer> _differencers= new Dictionary<string, SnapshotDifferencer>();
+
+/*
Dictionary<string, List<ObjectInfo>> _currentSnapshot = new Dictionary<string, List<ObjectInfo>>();
Dictionary<string, List<ObjectInfo>> _previousSnapshot = new Dictionary<string, List<ObjectInfo>>();
+*/
/// <summary>
/// Deletes local files that are not found in the list of cloud files
throw new ArgumentNullException("cloudFiles");
Contract.EndContractBlock();
- if (_previousSnapshot.ContainsKey(accountInfo.UserName) && _currentSnapshot.ContainsKey(accountInfo.UserName))
- _previousSnapshot[accountInfo.UserName] = _currentSnapshot[accountInfo.UserName] ?? new List<ObjectInfo>();
- else
- {
- _previousSnapshot[accountInfo.UserName]=new List<ObjectInfo>();
- }
-
- _currentSnapshot[accountInfo.UserName] = cloudFiles.ToList();
-
- var deletedObjects = _previousSnapshot[accountInfo.UserName].Except(_currentSnapshot[accountInfo.UserName], new ObjectInfoComparer()).ToList();
-
-
//On the first run
if (_firstPoll)
{
else
{
var deletedFiles = new List<FileSystemInfo>();
- foreach (var objectInfo in deletedObjects)
+ foreach (var objectInfo in cloudFiles)
{
var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
var item = GetFileAgent(accountInfo).GetFileSystemInfo(relativePath);
}
*/
}
- StatusKeeper.ClearFileStatus(item.FullName);
-
+ StatusKeeper.SetFileState(item.FullName, FileStatus.Deleted, FileOverlayStatus.Deleted);
}
StatusNotification.NotifyForFiles(deletedFiles, String.Format("{0} files were deleted", deletedFiles.Count), TraceLevel.Info);
}
}
//Creates an appropriate action for each server file
+        /// <summary>
+        /// Lazily produces one action for each remote object in <paramref name="changes"/>.
+        /// An object that also exists locally yields a MustSynch action, unless it is a directory
+        /// object whose local counterpart is already a directory, in which case it is skipped.
+        /// An object with no local counterpart yields a download action.
+        /// </summary>
+        /// <param name="accountInfo">The account the remote objects belong to</param>
+        /// <param name="changes">The remote objects to turn into actions</param>
+        /// <returns>A lazily evaluated sequence of actions</returns>
+        /// <exception cref="ArgumentNullException">changes is null. Because this method is an
+        /// iterator, the check runs when the sequence is first enumerated, not at call time.</exception>
+        private IEnumerable<CloudAction> ChangesToActions(AccountInfo accountInfo,IEnumerable<ObjectInfo> changes)
+        {
+            if (changes==null)
+                throw new ArgumentNullException("changes");
+            Contract.EndContractBlock();
+            var fileAgent = GetFileAgent(accountInfo);
+
+            //The remote objects are iterated only once
+            foreach (var objectInfo in changes)
+            {
+                var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
+                if (fileAgent.Exists(relativePath))
+                {
+                    //If a directory object already exists, we don't need to perform any other action
+                    var localFile = fileAgent.GetFileSystemInfo(relativePath);
+                    if (objectInfo.Content_Type == @"application/directory" && localFile is DirectoryInfo)
+                        continue;
+                    //NOTE(review): the session scope stays open while the consumer processes the yielded action
+                    using (new SessionScope(FlushAction.Never))
+                    {
+                        var state = StatusKeeper.GetStateByFilePath(localFile.FullName);
+                        _lastSeen[localFile.FullName] = DateTime.Now;
+                        //Common files should be checked on a per-case basis to detect differences, which is newer
+
+                        yield return new CloudAction(accountInfo, CloudActionType.MustSynch,
+                                                     localFile, objectInfo, state, accountInfo.BlockSize,
+                                                     accountInfo.BlockHash);
+                    }
+                }
+                else
+                {
+                    //Remote files should be downloaded
+                    yield return new CloudDownloadAction(accountInfo,objectInfo);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Lazily produces a download action for each remote object in <paramref name="creates"/>
+        /// that has no local counterpart. An object that already exists locally yields no action;
+        /// its local path is marked as being in conflict instead.
+        /// </summary>
+        /// <param name="accountInfo">The account the remote objects belong to</param>
+        /// <param name="creates">The remote objects to turn into actions</param>
+        /// <returns>A lazily evaluated sequence of download actions</returns>
+        /// <exception cref="ArgumentNullException">creates is null. Because this method is an
+        /// iterator, the check runs when the sequence is first enumerated, not at call time.</exception>
+        private IEnumerable<CloudAction> CreatesToActions(AccountInfo accountInfo,IEnumerable<ObjectInfo> creates)
+        {
+            if (creates==null)
+                throw new ArgumentNullException("creates");
+            Contract.EndContractBlock();
+            var fileAgent = GetFileAgent(accountInfo);
+
+            //The remote objects are iterated only once
+            foreach (var objectInfo in creates)
+            {
+                var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
+                if (fileAgent.Exists(relativePath))
+                {
+                    //If the object already exists, we probably have a conflict
+                    //NOTE(review): a directory object that already exists locally as a directory is
+                    //also flagged as a conflict here - confirm whether it should be skipped instead
+                    var localFile = fileAgent.GetFileSystemInfo(relativePath);
+                    StatusKeeper.SetFileState(localFile.FullName,FileStatus.Conflict,FileOverlayStatus.Conflict);
+                }
+                else
+                {
+                    //Remote files should be downloaded
+                    yield return new CloudDownloadAction(accountInfo,objectInfo);
+                }
+            }
+        }
+
+ //Creates an appropriate action for each server file
+/*
private IEnumerable<CloudAction> ObjectsToActions(AccountInfo accountInfo,IEnumerable<ObjectInfo> remote)
{
if (remote==null)
}
}
}
+*/
private static FileAgent GetFileAgent(AccountInfo accountInfo)
{
//Calculate the file's treehash
- var treeHash = await Signature.CalculateTreeHashAsync(localPath, serverHash.BlockSize, serverHash.BlockHash);
+ var treeHash = await Signature.CalculateTreeHashAsync(localPath, serverHash.BlockSize, serverHash.BlockHash, 2);
//And compare it with the server's hash
var upHashes = serverHash.GetHashesAsStrings();
//First, calculate the tree hash
var treeHash = await Signature.CalculateTreeHashAsync(fullFileName, accountInfo.BlockSize,
- accountInfo.BlockHash);
+ accountInfo.BlockHash, 2);
await UploadWithHashMap(accountInfo, cloudFile, fileInfo as FileInfo, cloudFile.Name, treeHash);
}
}\r
public SnapshotDifferencer Post(IEnumerable<ObjectInfo> list)\r
{\r
- return new SnapshotDifferencer(_current,list);\r
+ _previous = _current;\r
+ _current = list ?? new List<ObjectInfo>();\r
+ return this;\r
}\r
\r
public IEnumerable<ObjectInfo> Deleted\r
CloudClient=new CloudFilesClient(UserName,ApiKey);
- var proxy = ProxyFromSettings();
+ var proxy = ProxyFromSettings(Settings);
CloudClient.Proxy = proxy;
CloudClient.UsePithos = true;
CloudClient.AuthenticationUrl = this.AuthenticationUrl;
public string AuthenticationUrl { get; set; }
- private WebProxy ProxyFromSettings()
+ //TODO: Move this method somewhere else
+ public static WebProxy ProxyFromSettings(IPithosSettings pithosSettings)
{
- if (Settings.UseManualProxy)
+ if (pithosSettings.UseManualProxy)
{
- var proxy = new WebProxy(Settings.ProxyServer, Settings.ProxyPort);
+ var proxy = new WebProxy(pithosSettings.ProxyServer, pithosSettings.ProxyPort);
//If the proxy requires specific authentication settings, use them
- if (Settings.ProxyAuthentication)
+ if (pithosSettings.ProxyAuthentication)
{
- proxy.Credentials=new NetworkCredential(Settings.ProxyUsername,Settings.ProxyPassword,Settings.ProxyDomain);
+ proxy.Credentials=new NetworkCredential(pithosSettings.ProxyUsername,pithosSettings.ProxyPassword,pithosSettings.ProxyDomain);
}
//Otherwise, if there are generic authentication settings, use them
if (!String.IsNullOrWhiteSpace(CredentialCache.DefaultNetworkCredentials.UserName))
bool ExtensionsActivated { get; set; }
int PollingInterval { get; set; }
+ byte HashingParallelism { get; set; }
void Save();
void Reload();
public bool ProxyAuthentication { get; set; }
public bool ExtensionsActivated { get; set; }
public int PollingInterval { get; set; }
+ public byte HashingParallelism { get; set; }
public PithosSettingsData()
{
using System;
+using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
+using System.Threading.Tasks;
using NUnit.Framework;
namespace Pithos.Network.Test
};
client.Authenticate();
var fileName = @"vlc-1.1.11-win32.exe";
- var treeHash=Signature.CalculateTreeHashAsync(Path.Combine(@"e:\pithos\" ,fileName), 4*1024*1024 , "sha256").Result;
+ var treeHash=Signature.CalculateTreeHashAsync(Path.Combine(@"e:\pithos\" ,fileName), 4*1024*1024 , "sha256", 2).Result;
var result = client.PutHashMap(account, "pithos", fileName, treeHash).Result;
Assert.AreEqual(0,result.Count);
using System;
+using System.Collections.Concurrent;
using System.Collections.Generic;
+using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Security.Cryptography;
var md5 = Signature.CalculateMD5(file);
- var hash1 = Signature.CalculateTreeHashAsync(file, (int) blockSize,"sha256").Result;
+ var hash1 = Signature.CalculateTreeHashAsync(file, (int) blockSize,"sha256", 2).Result;
Assert.IsNotNull(hash1.Hashes);
Assert.AreEqual(numBlocks, hash1.Hashes.Count());
var md5 = Signature.CalculateMD5(file);
- var hash1 = Signature.CalculateTreeHashAsync(file, (int) blockSize, "sha256").Result;
+ var hash1 = Signature.CalculateTreeHashAsync(file, (int) blockSize, "sha256", 2).Result;
hash1.FileId = Guid.NewGuid();
var task = hash1.Save(@"e:\")
.ContinueWith(_ => TreeHash.LoadTreeHash(@"e:\", hash1.FileId)).Unwrap();
}).Wait();
}
+        /// <summary>
+        /// Hashes the same large file with 1, 2 and 3 hashing tasks, traces how long each run takes
+        /// and verifies that the degree of parallelism does not change the resulting top hash.
+        /// </summary>
+        [Test]
+        public void TestLargeHashCalculation()
+        {
+            var file = "e:\\testFile.tmp";
+            if (!File.Exists(file))
+            {
+                //Create a 300MB file filled with random bytes
+                byte[] buffer = new byte[300 * 1048576];
+                Random rnd = new Random();
+                rnd.NextBytes(buffer);
+                File.WriteAllBytes(file, buffer);
+            }
+
+            const int blockSize = 4 * 1048576;
+
+            Trace.WriteLine("1");
+            var stopwatch = Stopwatch.StartNew();
+            var hash1 = Signature.CalculateTreeHashAsync(file, blockSize, "sha256", 1).Result;
+            stopwatch.Stop();
+            Trace.WriteLine(stopwatch.Elapsed);
+
+            Trace.WriteLine("2");
+            stopwatch.Restart();
+            var hash2 = Signature.CalculateTreeHashAsync(file, blockSize, "sha256", 2).Result;
+            stopwatch.Stop();
+            Trace.WriteLine(stopwatch.Elapsed);
+
+            Trace.WriteLine("3");
+            stopwatch.Restart();
+            var hash3 = Signature.CalculateTreeHashAsync(file, blockSize, "sha256", 3).Result;
+            stopwatch.Stop();
+            Trace.WriteLine(stopwatch.Elapsed);
+
+            //Every run must produce the same top hash. AreEqual is order-sensitive,
+            //unlike Is.EquivalentTo, which would accept a permutation of the hash
+            Assert.AreEqual(hash1.TopHash, hash2.TopHash);
+            Assert.AreEqual(hash2.TopHash, hash3.TopHash);
+        }
+
public static string BytesToStr(byte[] bytes)
{
var str = new StringBuilder();
client.UsePithos = true;
client.Authenticate();
var fileName = @"vlc-1.1.11-win32.exe";
- var localHash= Signature.CalculateTreeHashAsync(Path.Combine(@"e:\pithos\", fileName), 4 * 1024 * 1024, "sha256").Result;
+ var localHash= Signature.CalculateTreeHashAsync(Path.Combine(@"e:\pithos\", fileName), 4 * 1024 * 1024, "sha256", 2).Result;
var upHash= client.GetHashMap(fileName, account, "pithos").Result;
Assert.AreEqual(upHash.TopHash, localHash.TopHash);
var fileName = @"vlc-1.1.11-win32.exe";
var syncHash= Signature.CalculateTreeHash(Path.Combine(@"e:\pithos\", fileName), 4 * 1024 * 1024, "sha256");
- var asyncHash = Signature.CalculateTreeHashAsync(Path.Combine(@"e:\pithos\", fileName), 4 * 1024 * 1024, "sha256")
+ var asyncHash = Signature.CalculateTreeHashAsync(Path.Combine(@"e:\pithos\", fileName), 4 * 1024 * 1024, "sha256", 2)
.Result;
Assert.AreEqual(syncHash.TopHash, asyncHash.TopHash);
--- /dev/null
+// -----------------------------------------------------------------------\r
+// <copyright file="BlockHashAlgorithms.cs" company="Microsoft">\r
+// TODO: Update copyright text.\r
+// </copyright>\r
+// -----------------------------------------------------------------------\r
+\r
+using System.Collections.Concurrent;\r
+using System.Diagnostics.Contracts;\r
+using System.IO;\r
+using System.Security.Cryptography;\r
+using System.Threading.Tasks;\r
+using System.Threading.Tasks.Dataflow;\r
+\r
+namespace Pithos.Network\r
+{\r
+ using System;\r
+ using System.Collections.Generic;\r
+ using System.Linq;\r
+ using System.Text;\r
+\r
+    /// <summary>\r
+    /// Strategies for hashing a file stream block by block. Every strategy reads the stream in\r
+    /// chunks of up to <c>blockSize</c> bytes and stores one hash per chunk in a dictionary\r
+    /// keyed by the ordinal of the block, so that sorting by key restores the block order.\r
+    /// </summary>\r
+    public static class BlockHashAlgorithms\r
+    {\r
+        /// <summary>\r
+        /// Replaceable block hashing entry point. The static constructor points it at\r
+        /// <see cref="CalculateBlockHashesRecursiveAsync"/>.\r
+        /// </summary>\r
+        public static Func<FileStream, int, string, ConcurrentDictionary<int, byte[]>, int, Task<ConcurrentDictionary<int, byte[]>>> CalculateBlockHash;\r
+\r
+        /// <summary>\r
+        /// Hashes the stream one block at a time by chaining task continuations: each continuation\r
+        /// starts reading the next block before it hashes the current one.\r
+        /// </summary>\r
+        /// <param name="stream">The stream to read from</param>\r
+        /// <param name="blockSize">The size of each block in bytes. Must be greater than zero</param>\r
+        /// <param name="algorithm">The name of the hash algorithm, as understood by HashAlgorithm.Create</param>\r
+        /// <param name="hashes">The dictionary that receives the hashes. A new one is created when null</param>\r
+        /// <param name="index">The key assigned to the next block read. Must not be negative</param>\r
+        /// <returns>A task that completes with the dictionary of block hashes</returns>\r
+        /// <exception cref="ArgumentNullException">stream is null, or algorithm is null or blank</exception>\r
+        /// <exception cref="ArgumentOutOfRangeException">blockSize is not positive or index is negative</exception>\r
+        public static Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesRecursiveAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0)\r
+        {\r
+            if (stream == null)\r
+                throw new ArgumentNullException("stream");\r
+            if (String.IsNullOrWhiteSpace(algorithm))\r
+                throw new ArgumentNullException("algorithm");\r
+            if (blockSize <= 0)\r
+                throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");\r
+            if (index < 0)\r
+                throw new ArgumentOutOfRangeException("index", "index must be a non-negative value");\r
+            Contract.EndContractBlock();\r
+\r
+            if (hashes == null)\r
+                hashes = new ConcurrentDictionary<int, byte[]>();\r
+\r
+            var buffer = new byte[blockSize];\r
+            return stream.ReadAsync(buffer, 0, blockSize).ContinueWith(t =>\r
+            {\r
+                var read = t.Result;\r
+\r
+                //A full block means there may be more data, so the next read is started\r
+                //before this block is hashed. A short read ends the chain\r
+                var nextTask = read == blockSize\r
+                    ? CalculateBlockHashesRecursiveAsync(stream, blockSize, algorithm, hashes, index + 1)\r
+                    : Task.Factory.StartNew(() => hashes);\r
+                if (read > 0)\r
+                    hashes[index] = HashTrimmedBlock(algorithm, buffer, read);\r
+                return nextTask;\r
+            }).Unwrap();\r
+        }\r
+\r
+        /// <summary>\r
+        /// Hashes the stream sequentially, one block after the other, reusing a single read buffer.\r
+        /// </summary>\r
+        /// <param name="stream">The stream to read from</param>\r
+        /// <param name="blockSize">The size of each block in bytes. Must be greater than zero</param>\r
+        /// <param name="algorithm">The name of the hash algorithm, as understood by HashAlgorithm.Create</param>\r
+        /// <param name="hashes">The dictionary that receives the hashes. A new one is created when null</param>\r
+        /// <param name="index">The key assigned to the first block read. Later blocks get consecutive keys</param>\r
+        /// <returns>A task that completes with the dictionary of block hashes. Argument errors\r
+        /// surface through the returned task because the method is async</returns>\r
+        public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0)\r
+        {\r
+            if (stream == null)\r
+                throw new ArgumentNullException("stream");\r
+            if (String.IsNullOrWhiteSpace(algorithm))\r
+                throw new ArgumentNullException("algorithm");\r
+            if (blockSize <= 0)\r
+                throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");\r
+            if (index < 0)\r
+                throw new ArgumentOutOfRangeException("index", "index must be a non-negative value");\r
+            Contract.EndContractBlock();\r
+\r
+            if (hashes == null)\r
+                hashes = new ConcurrentDictionary<int, byte[]>();\r
+\r
+            var buffer = new byte[blockSize];\r
+            int read;\r
+            while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)\r
+            {\r
+                hashes[index] = HashTrimmedBlock(algorithm, buffer, read);\r
+                //Key by block ordinal, like the recursive variant. Keying by byte offset\r
+                //would overflow an int for files of 2GB or more\r
+                index++;\r
+            }\r
+            return hashes;\r
+        }\r
+\r
+        /// <summary>\r
+        /// Reads the stream sequentially and hashes the blocks on an ActionBlock, so that up to\r
+        /// <paramref name="parallelism"/> blocks are hashed at the same time. The ActionBlock's\r
+        /// bounded capacity is also set to <paramref name="parallelism"/>, so reading waits\r
+        /// while the hashing block is full.\r
+        /// </summary>\r
+        /// <param name="stream">The stream to read from</param>\r
+        /// <param name="blockSize">The size of each block in bytes. Must be greater than zero</param>\r
+        /// <param name="algorithm">The name of the hash algorithm, as understood by HashAlgorithm.Create</param>\r
+        /// <param name="parallelism">The maximum number of blocks hashed concurrently. Must be\r
+        /// positive, or -1 for the Dataflow "unbounded" setting</param>\r
+        /// <returns>A task that completes with the dictionary of block hashes, keyed by the\r
+        /// zero-based ordinal of each block. Argument and hashing errors surface through the task</returns>\r
+        public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAgentAsync(FileStream stream, int blockSize, string algorithm, int parallelism)\r
+        {\r
+            if (stream == null)\r
+                throw new ArgumentNullException("stream");\r
+            if (String.IsNullOrWhiteSpace(algorithm))\r
+                throw new ArgumentNullException("algorithm");\r
+            if (blockSize <= 0)\r
+                throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");\r
+            //The Dataflow options accept only positive values or -1 (unbounded)\r
+            if (parallelism < 1 && parallelism != -1)\r
+                throw new ArgumentOutOfRangeException("parallelism", "parallelism must be a value greater than zero, or -1 for unbounded");\r
+            Contract.EndContractBlock();\r
+\r
+            var hashes = new ConcurrentDictionary<int, byte[]>();\r
+\r
+            var options = new ExecutionDataflowBlockOptions { BoundedCapacity = parallelism, MaxDegreeOfParallelism = parallelism };\r
+            var hashBlock = new ActionBlock<Tuple<int, byte[]>>(input =>\r
+            {\r
+                var block = input.Item2;\r
+                hashes[input.Item1] = HashTrimmedBlock(algorithm, block, block.Length);\r
+            }, options);\r
+\r
+            var buffer = new byte[blockSize];\r
+            int read;\r
+            int blockIndex = 0;\r
+            while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)\r
+            {\r
+                //Each message gets its own copy because the read buffer is reused by the next read\r
+                var block = new byte[read];\r
+                Buffer.BlockCopy(buffer, 0, block, 0, read);\r
+                //A declined message means the hashing block has faulted: stop reading,\r
+                //the failure is rethrown when Completion is awaited below\r
+                if (!await hashBlock.SendAsync(Tuple.Create(blockIndex, block)))\r
+                    break;\r
+                //Key by block ordinal, like the recursive variant. Keying by byte offset\r
+                //would overflow an int for files of 2GB or more\r
+                blockIndex++;\r
+            }\r
+\r
+            hashBlock.Complete();\r
+            await hashBlock.Completion;\r
+\r
+            return hashes;\r
+        }\r
+\r
+        /// <summary>\r
+        /// Hashes the first <paramref name="count"/> bytes of the buffer, ignoring trailing zero bytes.\r
+        /// </summary>\r
+        /// <exception cref="ArgumentException">No hash algorithm is registered under the given name</exception>\r
+        private static byte[] HashTrimmedBlock(string algorithm, byte[] buffer, int count)\r
+        {\r
+            using (var hasher = HashAlgorithm.Create(algorithm))\r
+            {\r
+                if (hasher == null)\r
+                    throw new ArgumentException("Unknown hash algorithm: " + algorithm, "algorithm");\r
+\r
+                //This code was added for compatibility with the way Pithos calculates the last hash\r
+                //We calculate the hash only up to the last non-null byte\r
+                var lastByteIndex = Array.FindLastIndex(buffer, count - 1, aByte => aByte != 0);\r
+\r
+                return hasher.ComputeHash(buffer, 0, lastByteIndex + 1);\r
+            }\r
+        }\r
+\r
+        static BlockHashAlgorithms()\r
+        {\r
+            CalculateBlockHash = CalculateBlockHashesRecursiveAsync;\r
+        }\r
+    }\r
+}\r
Contract.Ensures(StorageUrl != null);
Contract.Ensures(_baseClient != null);
Contract.Ensures(RootAddressUri != null);
- Contract.EndContractBlock();
+ Contract.EndContractBlock();
_baseClient = new RestClient
{
BaseAddress = accountInfo.StorageUri.ToString(),
Timeout = 10000,
Retries = 3,
- Proxy=this.Proxy
};
StorageUrl = accountInfo.StorageUri;
Token = accountInfo.Token;
<Reference Include="System" />
<Reference Include="System.ComponentModel.Composition" />
<Reference Include="System.Core" />
+ <Reference Include="System.ServiceModel" />
+ <Reference Include="System.Threading.Tasks.Dataflow">
+ <HintPath>..\Libraries\System.Threading.Tasks.Dataflow.dll</HintPath>
+ </Reference>
<Reference Include="System.Xml.Linq" />
<Reference Include="System.Data.DataSetExtensions" />
<Reference Include="System.Data" />
</ItemGroup>
<ItemGroup>
<Compile Include="AccountInfo.cs" />
+ <Compile Include="BlockHashAlgorithms.cs" />
<Compile Include="CloudFilesClient.cs" />
<Compile Include="ContainerInfo.cs" />
<Compile Include="ICloudClient.cs" />
throw new ArgumentNullException("algorithm");
Contract.EndContractBlock();
- var hash=CalculateTreeHashAsync(filePath, blockSize, algorithm);
+ var hash=CalculateTreeHashAsync(filePath, blockSize, algorithm, 2);
return hash.Result;
}
- public static async Task<TreeHash> CalculateTreeHashAsync(FileInfo fileInfo, int blockSize, string algorithm)
+ public static async Task<TreeHash> CalculateTreeHashAsync(FileInfo fileInfo, int blockSize, string algorithm, byte parallelism)
{
if (fileInfo == null)
throw new ArgumentNullException("fileInfo");
if (String.IsNullOrWhiteSpace(algorithm))
throw new ArgumentNullException("algorithm");
Contract.EndContractBlock();
-
- return await CalculateTreeHashAsync(fileInfo.FullName, blockSize, algorithm);
+
+ return await CalculateTreeHashAsync(fileInfo.FullName, blockSize, algorithm, parallelism);
}
- public static async Task<TreeHash> CalculateTreeHashAsync(string filePath, int blockSize,string algorithm)
+ public static async Task<TreeHash> CalculateTreeHashAsync(string filePath, int blockSize,string algorithm, int parallelism)
{
if (String.IsNullOrWhiteSpace(filePath))
throw new ArgumentNullException("filePath");
using (var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, blockSize, true))
{
//Calculate the blocks asyncrhonously
- var hashes = await CalculateBlockHashesAsync(stream, blockSize, algorithm);
+ var hashes = await BlockHashAlgorithms.CalculateBlockHashesAgentAsync(stream, blockSize, algorithm, parallelism);
//And then proceed with creating and returning a TreeHash
var length = stream.Length;
}
}*/
- private static Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes=null, int index = 0)
- {
- if (stream==null)
- throw new ArgumentNullException("stream");
- if (String.IsNullOrWhiteSpace(algorithm))
- throw new ArgumentNullException("algorithm");
- if (blockSize <= 0)
- throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");
- if (index< 0)
- throw new ArgumentOutOfRangeException("index", "index must be a non-negative value");
- Contract.EndContractBlock();
-
-
- if (hashes==null)
- hashes= new ConcurrentDictionary<int, byte[]>();
-
- var buffer = new byte[blockSize];
- return stream.ReadAsync(buffer, 0, blockSize).ContinueWith(t =>
- {
- var read = t.Result;
-
- var nextTask = read == blockSize
- ? CalculateBlockHashesAsync(stream, blockSize, algorithm, hashes, index + 1)
- : Task.Factory.StartNew(() => hashes);
- if (read>0)
- using (var hasher = HashAlgorithm.Create(algorithm))
- {
- //This code was added for compatibility with the way Pithos calculates the last hash
- //We calculate the hash only up to the last non-null byte
- //TODO: Remove if the server starts using the full block instead of the trimmed block
- var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);
-
- var hash = hasher.ComputeHash(buffer, 0, lastByteIndex+1);
- hashes[index]=hash;
- }
- return nextTask;
- }).Unwrap();
- }
public int PollingInterval { get; set; }
+ public byte HashingParallelism{get; set; }
+
public void Save()
{
get { return _settings.Value.PollingInterval; }
set { _settings.Value.PollingInterval = value; }
}
+
+ public byte HashingParallelism // Hashing parallelism setting; reads and writes the value on _settings.Value
+ {
+ get { return _settings.Value.HashingParallelism; }
+ set { _settings.Value.HashingParallelism = value; }
+ }
public void Save()
{
public int PollingInterval { get; set; }
+ public byte HashingParallelism { get; set; }
+
public bool ProxyAuthentication { get; set; }
public void Save()