<Compile Include="Converters\DummyConverter.cs" />
<Compile Include="Converters\EmptyToVisibilityConverter.cs" />
<Compile Include="Converters\EnumTypeConverter.cs" />
+ <Compile Include="Converters\ImageNameToImageConverter.cs" />
<Compile Include="Converters\NullToVisibilityConverter.cs" />
<Compile Include="Converters\SingleLineConverter.cs" />
<Compile Include="Diagnostics\log4netForwarder.cs" />
<ItemGroup>
<Resource Include="Images\pithos_logo-icon.ico" />
</ItemGroup>
- <ItemGroup />
+ <ItemGroup>
+ <Resource Include="Images\Pause.png" />
+ </ItemGroup>
+ <ItemGroup>
+ <Resource Include="Images\Play.png" />
+ </ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.
xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
- Title="Pithos+ Status" Height="100" Width="250" WindowStyle="ToolWindow" Topmost="True" Icon="/PithosPlus;component/Images/PithosTaskbar.png" WindowStartupLocation="CenterOwner">
+ Title="Pithos+ Status" Height="100" Width="250" WindowStyle="ToolWindow" Topmost="True" Icon="/PithosPlus;component/Images/PithosTaskbar.png" WindowStartupLocation="CenterOwner" xmlns:my="clr-namespace:Pithos.Client.WPF.Converters">
+ <Window.Resources>
+ <my:ImageNameToImageConverter x:Key="PathToImage" />
+ </Window.Resources>
<Border HorizontalAlignment="Stretch"
VerticalAlignment="Stretch" CornerRadius="2">
<Grid>
<Grid.ColumnDefinitions>
<ColumnDefinition Width="Auto"/>
<ColumnDefinition/>
+ <ColumnDefinition Width="Auto"/>
</Grid.ColumnDefinitions>
- <Image Grid.Column="0" Width="48" Height="48" HorizontalAlignment="Right" Margin="0,5,5,0" VerticalAlignment="Top" Source="/PithosPlus;component/Images/PithosTaskbar.png" Stretch="Fill" Opacity="0.7" ToolTip="Close Balloon" />
+ <Grid.RowDefinitions>
+ <RowDefinition />
+ <RowDefinition Height="Auto"/>
+ </Grid.RowDefinitions>
+ <Image Grid.Column="0" Grid.RowSpan="2" Width="48" Height="48" HorizontalAlignment="Right" Margin="0,5,5,0" VerticalAlignment="Top" Source="/PithosPlus;component/Images/PithosTaskbar.png" Stretch="Fill" Opacity="0.7" ToolTip="Close Balloon" />
<TextBlock TextWrapping="Wrap" x:Name="Status"
HorizontalAlignment="Stretch"
- VerticalAlignment="Stretch" Grid.Column="1" Margin="2,5,2,2"/>
+ VerticalAlignment="Stretch" Grid.Column="1" Grid.RowSpan="2" Margin="2,5,2,2"/>
+ <Button Grid.Column="2" Grid.Row="1" x:Name="ToggleSyncing"
+ ToolTip="{Binding PauseToolTip}" >
+ <Image Source="{Binding Path=PauseImage, Converter={StaticResource PathToImage},FallbackValue='Pause.png'}"
+ Stretch="Uniform" Height="16"/>
+ </Button>
</Grid>
</Border>
</Window>
if (e.PropertyName=="StatusMessage")
NotifyOfPropertyChange(()=>Status);
}
+
+ private string _pauseImage="../Images/Pause.png";
+ public string PauseImage
+ {
+ get { return _pauseImage; }
+ set
+ {
+ _pauseImage = value;
+ NotifyOfPropertyChange(()=>PauseImage);
+ }
+ }
+
+ private string _pauseToolTip="Pause Syncing";
+ public string PauseToolTip
+ {
+ get { return _pauseToolTip; }
+ set
+ {
+ _pauseToolTip = value;
+ NotifyOfPropertyChange(()=>PauseToolTip);
+ }
+ }
+
+ public void ToggleSyncing()
+ {
+ Shell.ToggleSynching();
+ PauseImage = Shell.IsPaused
+ ? "../Images/Play.png"
+ : "../Images/Pause.png";
+ PauseToolTip = Shell.IsPaused
+ ? "Resume Syncing"
+ : "Pause Syncing";
+ }
}
}
<MenuItem x:Name="StatusMessage" Header="{Binding Path=StatusMessage, Converter={StaticResource SingleConverter}}" MaxWidth="200" cal:Message.Attach="CurrentSyncStatus" ToolTip="{Binding TooltipMessage}"/>
<Separator Visibility="{Binding Path=HasAccounts, Converter={StaticResource BooleanToVisible}}"/>
<MenuItem Header="{Binding PauseSyncCaption}" x:Name="ToggleSynching" cal:Message.Attach="ToggleSynching" Visibility="{Binding Path=HasAccounts, Converter={StaticResource BooleanToVisible}}"/>
+ <MenuItem Header="Cancel current operation" x:Name="CancelCurrentOperation" cal:Message.Attach="CancelCurrentOperation" />
<Separator />
<MenuItem x:Name="ShowConflicts" Header="Show Conflicts" Visibility="{Binding Path=HasConflicts, Converter={StaticResource BooleanToVisible}}" cal:Message.Attach="ShowConflicts" />
<MenuItem Header="Preferences ..." x:Name="ShowPreferences" cal:Message.Attach="ShowPreferences" />
private static readonly log4net.ILog Log = log4net.LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
private readonly PollAgent _pollAgent;
+ private readonly NetworkAgent _networkAgent;
private MiniStatusViewModel _miniStatus;
/// The PithosSettings class encapsulates the app's settings to abstract their storage mechanism (App settings, a database or registry)
///</remarks>
[ImportingConstructor]
- public ShellViewModel(IWindowManager windowManager, IEventAggregator events, IStatusChecker statusChecker, PithosSettings settings,PollAgent pollAgent)
+ public ShellViewModel(IWindowManager windowManager, IEventAggregator events, IStatusChecker statusChecker, PithosSettings settings,PollAgent pollAgent,NetworkAgent networkAgent)
{
try
{
_events.Subscribe(this);
_pollAgent = pollAgent;
+ _networkAgent = networkAgent;
Settings = settings;
Proxy.SetFromSettings(settings);
#region Commands
+ public void CancelCurrentOperation()
+ {
+ _networkAgent.CancelCurrentOperation();
+ }
+
public void ShowPreferences()
{
ShowPreferences(null);
return newInfo;
}
+ private bool _isPaused;
+ public bool IsPaused
+ {
+ get { return _isPaused; }
+ set
+ {
+ _isPaused = value;
+ PauseSyncCaption = IsPaused ? "Resume syncing" : "Pause syncing";
+ var iconKey = IsPaused ? "TraySyncPaused" : "TrayInSynch";
+ StatusIcon = String.Format(@"../Images/{0}.ico", iconKey);
+
+ NotifyOfPropertyChange(() => IsPaused);
+ }
+ }
- public void ToggleSynching()
+ public void ToggleSynching()
{
- bool isPaused=false;
- foreach (var pair in Monitors)
+ IsPaused=!IsPaused;
+ foreach (var monitor in Monitors.Values)
{
- var monitor = pair.Value;
- monitor.Pause = !monitor.Pause;
- isPaused = monitor.Pause;
+ monitor.Pause = IsPaused ;
}
-
+ _pollAgent.Pause = IsPaused;
+ _networkAgent.Pause = IsPaused;
+
- PauseSyncCaption = isPaused ? "Resume syncing" : "Pause syncing";
- var iconKey = isPaused? "TraySyncPaused" : "TrayInSynch";
- StatusIcon = String.Format(@"../Images/{0}.ico", iconKey);
}
public void ExitPithos()
var parts = item.Name.Split('/');
- //Dont't add files
- if (!item.IsDirectory)
+ if (item.IsDirectory)
{
+ AddParentNodes(lookups, rootNodes, parts, item);
+
+ //Store each item using its current path
+ var newNode = new DirectoryRecord {DisplayName = parts.Last(), ObjectInfo = item};
+ AddNode(rootNodes, lookups, newNode);
+ }
+ else
+ {
+ //Dont't add files
//But check to ensure that we DO have it's parent on record
- //It it exist
+ //If it exist
if (lookups.TryGetValue(parentPath, out parent))
{
//Just continue
//Since this is not a directory, we won't add the item itself
AddParentNodes(lookups, rootNodes, parts, item);
}
- else
- {
- AddParentNodes(lookups, rootNodes, parts, item);
-
- //Store each item using its current path
- var newNode = new DirectoryRecord {DisplayName = parts.Last(), ObjectInfo = item};
- AddNode(rootNodes, lookups, newNode);
- }
-
}
return rootNodes;
}
using System.IO;
using System.Linq;
using System.Text;
+using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;
using Pithos.Core.Agents;
var fileInfo = new FileInfo(filePath);
var uploader = new Uploader();
- uploader.UploadWithHashMap(accountInfo,cloudFile,fileInfo,fileName,treeHash);
+ uploader.UploadWithHashMap(accountInfo,cloudFile,fileInfo,fileName,treeHash, CancellationToken.None);
var newHash = await client.GetHashMap(null, FolderConstants.PithosContainer, fileName);
var newHash = client.GetHashMap(null, FolderConstants.PithosContainer, fileName).Result;
var downloader = new Downloader();
- downloader.DownloadWithBlocks(accountInfo, client, cloudFile, new Uri(fileName, UriKind.Relative), filePath, newHash)
+ downloader.DownloadWithBlocks(accountInfo, client, cloudFile, new Uri(fileName, UriKind.Relative), filePath, newHash, CancellationToken.None)
.Wait();
Assert.IsTrue(File.Exists(filePath));
onError(ex);
}
return default(T);
- });
+ },CancellationToken);
}
}
}
using System;
+using System.Collections.Generic;
using System.ComponentModel.Composition;
using System.Diagnostics.Contracts;
using System.IO;
+using System.Linq;
using System.Reflection;
+using System.Threading;
using System.Threading.Tasks;
using Pithos.Interfaces;
using Pithos.Network;
public IStatusNotification StatusNotification { get; set; }
+/*
+ private CancellationTokenSource _cts=new CancellationTokenSource();
+
+ public void SignalStop()
+ {
+ _cts.Cancel();
+ }
+*/
+
+
//Download a file.
- public async Task DownloadCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile, string filePath)
+ public async Task DownloadCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile, string filePath,CancellationToken cancellationToken)
{
if (accountInfo == null)
throw new ArgumentNullException("accountInfo");
if (!Path.IsPathRooted(filePath))
throw new ArgumentException("The filePath must be rooted", "filePath");
Contract.EndContractBlock();
+ using (ThreadContext.Stacks["Operation"].Push("DownloadCloudFile"))
+ {
+ // var cancellationToken=_cts.Token;// .ThrowIfCancellationRequested();
- using (ThreadContext.Stacks["Operation"].Push("DownloadCloudFile"))
- {
+ cancellationToken.ThrowIfCancellationRequested();
+ await UnpauseEvent.WaitAsync();
- var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath);
- var relativeUrl = new Uri(cloudFile.Name, UriKind.Relative);
- var url = relativeUrl.ToString();
- if (cloudFile.Name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase))
- return;
+ var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath);
+ var relativeUrl = new Uri(cloudFile.Name, UriKind.Relative);
+ var url = relativeUrl.ToString();
+ if (cloudFile.Name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase))
+ return;
- //Are we already downloading or uploading the file?
- using (var gate = NetworkGate.Acquire(localPath, NetworkOperation.Downloading))
- {
- if (gate.Failed)
+ if (!Selectives.IsSelected(cloudFile))
return;
- var client = new CloudFilesClient(accountInfo);
- var account = cloudFile.Account;
- var container = cloudFile.Container;
- if (cloudFile.IsDirectory)
+ //Are we already downloading or uploading the file?
+ using (var gate = NetworkGate.Acquire(localPath, NetworkOperation.Downloading))
{
- if (!Directory.Exists(localPath))
- try
- {
- Directory.CreateDirectory(localPath);
- if (Log.IsDebugEnabled)
- Log.DebugFormat("Created Directory [{0}]", localPath);
- }
- catch (IOException)
- {
- var localInfo = new FileInfo(localPath);
- if (localInfo.Exists && localInfo.Length == 0)
+ if (gate.Failed)
+ return;
+
+ var client = new CloudFilesClient(accountInfo);
+ var account = cloudFile.Account;
+ var container = cloudFile.Container;
+
+ if (cloudFile.IsDirectory)
+ {
+ if (!Directory.Exists(localPath))
+ try
{
- Log.WarnFormat("Malformed directory object detected for [{0}]", localPath);
- localInfo.Delete();
Directory.CreateDirectory(localPath);
if (Log.IsDebugEnabled)
Log.DebugFormat("Created Directory [{0}]", localPath);
}
- }
- }
- else
- {
- var isChanged = IsObjectChanged(cloudFile, localPath);
- if (isChanged)
+ catch (IOException)
+ {
+ var localInfo = new FileInfo(localPath);
+ if (localInfo.Exists && localInfo.Length == 0)
+ {
+ Log.WarnFormat("Malformed directory object detected for [{0}]", localPath);
+ localInfo.Delete();
+ Directory.CreateDirectory(localPath);
+ if (Log.IsDebugEnabled)
+ Log.DebugFormat("Created Directory [{0}]", localPath);
+ }
+ }
+ }
+ else
{
- //Retrieve the hashmap from the server
- var serverHash = await client.GetHashMap(account, container, url);
- //If it's a small file
- if (serverHash.Hashes.Count == 1)
- //Download it in one go
- await
- DownloadEntireFileAsync(accountInfo, client, cloudFile, relativeUrl, localPath);
- //Otherwise download it block by block
- else
- await
- DownloadWithBlocks(accountInfo, client, cloudFile, relativeUrl, localPath,
- serverHash);
-
- if (!cloudFile.IsWritable(accountInfo.UserName))
+ var isChanged = IsObjectChanged(cloudFile, localPath);
+ if (isChanged)
{
- var attributes = File.GetAttributes(localPath);
- File.SetAttributes(localPath, attributes | FileAttributes.ReadOnly);
+ //Retrieve the hashmap from the server
+ var serverHash = await client.GetHashMap(account, container, url);
+ //If it's a small file
+ if (serverHash.Hashes.Count == 1)
+ //Download it in one go
+ await
+ DownloadEntireFileAsync(accountInfo, client, cloudFile, relativeUrl, localPath,cancellationToken);
+ //Otherwise download it block by block
+ else
+ await
+ DownloadWithBlocks(accountInfo, client, cloudFile, relativeUrl, localPath,
+ serverHash,cancellationToken);
+
+ if (!cloudFile.IsWritable(accountInfo.UserName))
+ {
+ var attributes = File.GetAttributes(localPath);
+ File.SetAttributes(localPath, attributes | FileAttributes.ReadOnly);
+ }
}
}
- }
- //Now we can store the object's metadata without worrying about ghost status entries
- StatusKeeper.StoreInfo(localPath, cloudFile);
+ //Now we can store the object's metadata without worrying about ghost status entries
+ StatusKeeper.StoreInfo(localPath, cloudFile);
+ }
}
- }
+
}
//Download a file asynchronously using blocks
- public async Task DownloadWithBlocks(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath, TreeHash serverHash)
+ public async Task DownloadWithBlocks(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath, TreeHash serverHash, CancellationToken cancellationToken)
{
if (client == null)
throw new ArgumentNullException("client");
throw new ArgumentException("cloudFile is a directory, not a file", "cloudFile");
Contract.EndContractBlock();
+ cancellationToken.ThrowIfCancellationRequested();
+ await UnpauseEvent.WaitAsync();
+
var fileAgent = GetFileAgent(accountInfo);
var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath);
var relativePath = relativeUrl.RelativeUriToFilePath();
var blockUpdater = new BlockUpdater(fileAgent.CachePath, localPath, relativePath, serverHash);
-
StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing, String.Format("Calculating hashmap for {0} before download", Path.GetFileName(localPath)));
//Calculate the file's treehash
+
+ //TODO: Should pass cancellation token here
var treeHash = await Signature.CalculateTreeHashAsync(localPath, serverHash.BlockSize, serverHash.BlockHash, 2);
//And compare it with the server's hash
ReportDownloadProgress(Path.GetFileName(localPath), 0, upHashes.Length, cloudFile.Bytes);
for (int i = 0; i < upHashes.Length; i++)
{
+ cancellationToken.ThrowIfCancellationRequested();
+ await UnpauseEvent.WaitAsync();
+
//For every non-matching hash
var upHash = upHashes[i];
if (!localHashes.ContainsKey(upHash))
if (i < upHashes.Length - 1)
end = ((i + 1) * serverHash.BlockSize);
+ //TODO: Pass token here
//Download the missing block
- var block = await client.GetBlock(cloudFile.Account, cloudFile.Container, relativeUrl, start, end);
+ var block = await client.GetBlock(cloudFile.Account, cloudFile.Container, relativeUrl, start, end,cancellationToken);
//and store it
blockUpdater.StoreBlock(i, block);
}
//Download a small file with a single GET operation
- private async Task DownloadEntireFileAsync(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath)
+ private async Task DownloadEntireFileAsync(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath, CancellationToken cancellationToken)
{
if (client == null)
throw new ArgumentNullException("client");
throw new ArgumentException("cloudFile is a directory, not a file", "cloudFile");
Contract.EndContractBlock();
+ cancellationToken.ThrowIfCancellationRequested();
+ await UnpauseEvent.WaitAsync();
+
var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath);
StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing, String.Format("Downloading {0}", Path.GetFileName(localPath)));
StatusNotification.Notify(new CloudNotification { Data = cloudFile });
if (!Directory.Exists(tempFolder))
Directory.CreateDirectory(tempFolder);
+ //TODO: Should pass the token here
+
//Download the object to the temporary location
- await client.GetObject(cloudFile.Account, cloudFile.Container, relativeUrl.ToString(), tempPath);
+ await client.GetObject(cloudFile.Account, cloudFile.Container, relativeUrl.ToString(), tempPath,cancellationToken);
//Create the local folder if it doesn't exist (necessary for shared objects)
var localFolder = Path.GetDirectoryName(localPath);
return AgentLocator<FileAgent>.Get(accountInfo.AccountPath);
}
+ [Import]
+ public Selectives Selectives { get; set; }
+ public AsyncManualResetEvent UnpauseEvent { get; set; }
}
}
public string CachePath { get; set; }
- private List<string> _selectivePaths = new List<string>();
+ /*private List<string> _selectivePaths = new List<string>();
public List<string> SelectivePaths
{
get { return _selectivePaths; }
set { _selectivePaths = value; }
}
+*/
+ public Selectives Selectives { get; set; }
public void Post(WorkflowState workflowState)
return true;
//Ignore if selective synchronization is defined,
- return SelectivePaths.Count > 0
//And the target file is not below any of the selective paths
- && !SelectivePaths.Any(filePath.IsAtOrDirectlyBelow);
+ return !Selectives.IsSelected(AccountInfo, filePath);
}
/* private static bool FoundInRoot(string filePath, string rootPath)
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.IO;
+using System.Linq;
using System.Net;
using System.Reflection;
using System.Threading;
[System.ComponentModel.Composition.Import]
public IPithosSettings Settings { get; set; }
+ private Uploader _uploader;
+
[System.ComponentModel.Composition.Import]
- public Uploader Uploader { get; set; }
+ public Uploader Uploader
+ {
+ get { return _uploader; }
+ set
+ {
+ _uploader = value;
+ _uploader.UnpauseEvent = _unPauseEvent;
+ }
+ }
+
+ private Downloader _downloader;
[System.ComponentModel.Composition.Import]
- public Downloader Downloader { get; set; }
+ public Downloader Downloader
+ {
+ get { return _downloader; }
+ set
+ {
+ _downloader = value;
+ _downloader.UnpauseEvent = _unPauseEvent;
+ }
+ }
+ [System.ComponentModel.Composition.Import]
+ public Selectives Selectives { get; set; }
+
//The Proceed signals the poll agent that it can proceed with polling.
//Essentially it stops the poll agent to give priority to the network agent
//Initially the event is signalled because we don't need to pause
private readonly AsyncManualResetEvent _proceedEvent = new AsyncManualResetEvent(true);
+ private Agents.Selectives _selectives;
+ private bool _pause;
public AsyncManualResetEvent ProceedEvent
{
get { return _proceedEvent; }
}
+ private readonly AsyncManualResetEvent _unPauseEvent = new AsyncManualResetEvent(true);
+
+ private CancellationTokenSource _currentOperationCancellation=new CancellationTokenSource();
+
+ public void CancelCurrentOperation()
+ {
+ //What does it mean to cancel the current upload/download?
+ //Obviously, the current operation will be cancelled by throwing
+ //a cancellation exception.
+ //
+ //The default behavior is to retry any operations that throw.
+ //Obviously this is not what we want in this situation.
+ //The cancelled operation should NOT bea retried.
+ //
+ //This can be done by catching the cancellation exception
+ //and avoiding the retry.
+ //
+
+ //Have to reset the cancellation source - it is not possible to reset the source
+ //Have to prevent a case where an operation requests a token from the old source
+ var oldSource = Interlocked.Exchange(ref _currentOperationCancellation, new CancellationTokenSource());
+ oldSource.Cancel();
+
+ }
public void Start()
{
loop = () =>
{
DeleteAgent.ProceedEvent.Wait();
+ _unPauseEvent.Wait();
var message = inbox.Receive();
var process=message.Then(Process,inbox.CancellationToken);
inbox.LoopAsync(process, loop);
{
case CloudActionType.UploadUnconditional:
//Abort if the file was deleted before we reached this point
- await Uploader.UploadCloudFile(action);
+ await Uploader.UploadCloudFile(action,CurrentOperationCancelToken);
break;
case CloudActionType.DownloadUnconditional:
- await Downloader.DownloadCloudFile(accountInfo, cloudFile, downloadPath);
+ await Downloader.DownloadCloudFile(accountInfo, cloudFile, downloadPath, CurrentOperationCancelToken);
break;
case CloudActionType.RenameCloud:
var moveAction = (CloudMoveAction)action;
case CloudActionType.MustSynch:
if (!File.Exists(downloadPath) && !Directory.Exists(downloadPath))
{
- await Downloader.DownloadCloudFile(accountInfo, cloudFile, downloadPath);
+ await Downloader.DownloadCloudFile(accountInfo, cloudFile, downloadPath, CurrentOperationCancelToken);
}
else
{
}
}
*/
- catch (OperationCanceledException)
- {
- throw;
+ catch (OperationCanceledException ex)
+ {
+ Log.WarnFormat("Cancelling [{0}]",ex);
}
catch (DirectoryNotFoundException)
{
}
}
+ private CancellationToken CurrentOperationCancelToken
+ {
+ get { return _currentOperationCancellation.Token; }
+ }
+
private void UpdateStatus(PithosStatus status)
{
// If the previous tophash matches the local tophash, the file was only changed on the server.
if (localHash == previousCloudHash)
{
- await Downloader.DownloadCloudFile(accountInfo, cloudFile, downloadPath);
+ await Downloader.DownloadCloudFile(accountInfo, cloudFile, downloadPath, CurrentOperationCancelToken);
}
else
{
get { return _agent.CancellationToken; }
}
+ public bool Pause
+ {
+ get {
+ return _pause;
+ }
+ set {
+ _pause = value;
+ if (_pause)
+ _unPauseEvent.Reset();
+ else
+ {
+ _unPauseEvent.Set();
+ }
+ }
+ }
private void RenameCloudFile(AccountInfo accountInfo,CloudMoveAction action)
}
+
}
[System.ComponentModel.Composition.Import]\r
public NetworkAgent NetworkAgent { get; set; }\r
\r
+ [System.ComponentModel.Composition.Import]\r
+ public Selectives Selectives { get; set; }\r
+\r
public IStatusNotification StatusNotification { get; set; }\r
\r
+ public bool Pause\r
+ {\r
+ get {\r
+ return _pause;\r
+ }\r
+ set {\r
+ _pause = value; \r
+ if (!_pause)\r
+ _unPauseEvent.Set();\r
+ else\r
+ {\r
+ _unPauseEvent.Reset();\r
+ }\r
+ }\r
+ }\r
+\r
private bool _firstPoll = true;\r
\r
//The Sync Event signals a manual synchronisation\r
private readonly AsyncManualResetEvent _syncEvent = new AsyncManualResetEvent();\r
\r
+ private readonly AsyncManualResetEvent _unPauseEvent = new AsyncManualResetEvent(true);\r
+\r
private readonly ConcurrentDictionary<string, DateTime> _lastSeen = new ConcurrentDictionary<string, DateTime>();\r
private readonly ConcurrentDictionary<Uri, AccountInfo> _accounts = new ConcurrentDictionary<Uri,AccountInfo>();\r
\r
var nextSince = since;\r
try\r
{\r
+ await _unPauseEvent.WaitAsync();\r
UpdateStatus(PithosStatus.PollSyncing);\r
\r
var tasks = from accountInfo in _accounts.Values\r
{\r
var sync = _syncEvent.WaitAsync();\r
var wait = TaskEx.Delay(TimeSpan.FromSeconds(Settings.PollingInterval), NetworkAgent.CancellationToken);\r
+ \r
var signaledTask = await TaskEx.WhenAny(sync, wait);\r
-\r
+ \r
+ //Pausing takes precedence over manual sync or awaiting\r
+ _unPauseEvent.Wait();\r
+ \r
//Wait for network processing to finish before polling\r
var pauseTask=NetworkAgent.ProceedEvent.WaitAsync();\r
await TaskEx.WhenAll(signaledTask, pauseTask);\r
\r
var differencer = _differencer.PostSnapshot(accountInfo, cleanRemotes);\r
\r
- var filterUris = SelectiveUris[accountInfo.AccountKey];\r
+ var filterUris = Selectives.SelectiveUris[accountInfo.AccountKey];\r
\r
ProcessDeletedFiles(accountInfo, differencer.Deleted.FilterDirectlyBelow(filterUris));\r
\r
.Except(NetworkAgent.GetEnumerable(), new LocalFileComparer())\r
.ToList();\r
\r
+ await _unPauseEvent.WaitAsync();\r
//Queue all the actions\r
foreach (var message in distinctActions)\r
{\r
\r
readonly AccountsDifferencer _differencer = new AccountsDifferencer();\r
private Dictionary<Uri, List<Uri>> _selectiveUris = new Dictionary<Uri, List<Uri>>();\r
+ private bool _pause;\r
\r
/// <summary>\r
/// Deletes local files that are not found in the list of cloud files\r
}\r
}\r
\r
- public void SetSyncUris(Uri accountKey, Uri[] uris)\r
- { \r
- SelectiveUris[accountKey]=uris.ToList();\r
- }\r
-\r
- protected Dictionary<Uri,List<Uri>> SelectiveUris\r
- {\r
- get { return _selectiveUris;}\r
- set { _selectiveUris = value; }\r
- }\r
-\r
public void AddAccount(AccountInfo accountInfo)\r
{\r
//Avoid adding a duplicate accountInfo\r
--- /dev/null
+using System;
+using System.Collections.Generic;
+using System.ComponentModel.Composition;
+using System.IO;
+using System.Linq;
+using System.Text;
+using Pithos.Interfaces;
+using Pithos.Network;
+
+namespace Pithos.Core.Agents
+{
+ [Export(typeof(Selectives))]
+ public class Selectives
+ {
+
+ public Dictionary<Uri, List<Uri>> SelectiveUris { get; private set; }
+
+ private Dictionary<Uri, List<string>> SelectivePaths { get; set; }
+
+ public Selectives()
+ {
+ SelectiveUris = new Dictionary<Uri, List<Uri>>();
+ SelectivePaths = new Dictionary<Uri, List<string>>();
+ }
+
+ public void SetSelectedUris(AccountInfo account,List<Uri> uris)
+ {
+ SelectiveUris[account.AccountKey] = uris;
+ SelectivePaths[account.AccountKey] = UrisToFilePaths(account,uris);
+ }
+
+ public bool IsSelected(ObjectInfo info)
+ {
+ List<Uri> filterUris;
+ return !SelectiveUris.TryGetValue(info.AccountKey, out filterUris)
+ || filterUris.Count ==0
+ || filterUris.Any(f => info.Uri.IsAtOrDirectlyBelow(f));
+ }
+
+ public bool IsSelected(AccountInfo account,FileSystemInfo info)
+ {
+ return IsSelected(account,info.FullName);
+ }
+
+ public bool IsSelected(AccountInfo account, string fullPath)
+ {
+ List<string> paths;
+ return !SelectivePaths.TryGetValue(account.AccountKey, out paths)
+ || paths.Count == 0
+ || paths.Any(fullPath.IsAtOrDirectlyBelow);
+ }
+
+ /// <summary>
+ /// Return a list of absolute filepaths from a list of Uris
+ /// </summary>
+ /// <param name="uris"></param>
+ /// <returns></returns>
+ private List<string> UrisToFilePaths(AccountInfo account,IEnumerable<Uri> uris)
+ {
+ if (uris == null)
+ return new List<string>();
+
+ var accountPath = account.AccountPath;
+ var storageUrl = account.StorageUri.ToString();
+ var own = (from uri in uris
+ where uri.ToString().StartsWith(storageUrl)
+ let relativePath = account.StorageUri.MakeRelativeUri(uri).RelativeUriToFilePath()
+ //Trim the account name
+ select Path.Combine(accountPath, relativePath.After(account.UserName + '\\'))).ToList();
+ var others = (from uri in uris
+ where !uri.ToString().StartsWith(storageUrl)
+ let relativePath = account.StorageUri.MakeRelativeUri(uri).RelativeUriToFilePath()
+ //Trim the account name
+ select Path.Combine(accountPath, "others-shared", relativePath)).ToList();
+ return own.Union(others).ToList();
+ }
+
+ }
+}
using System;
+using System.Collections.Generic;
using System.ComponentModel.Composition;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.IO;
+using System.Linq;
using System.Net;
using System.Reflection;
+using System.Threading;
using System.Threading.Tasks;
using Pithos.Interfaces;
using Pithos.Network;
public IStatusNotification StatusNotification { get; set; }
- public async Task UploadCloudFile(CloudAction action)
+
+ //CancellationTokenSource _cts = new CancellationTokenSource();
+ /*public void SignalStop()
+ {
+ _cts.Cancel();
+ }*/
+
+ public async Task UploadCloudFile(CloudAction action,CancellationToken cancellationToken)
{
if (action == null)
throw new ArgumentNullException("action");
{
try
{
+ await UnpauseEvent.WaitAsync();
+
var fileInfo = action.LocalFile;
if (fileInfo.Extension.Equals("ignore", StringComparison.InvariantCultureIgnoreCase))
return;
+ if (Selectives.IsSelected(action.AccountInfo, fileInfo))
+ return;
+
//Try to load the action's local state, if it is empty
if (action.FileState == null)
action.FileState = StatusKeeper.GetStateByFilePath(fileInfo.FullName);
}
+ await UnpauseEvent.WaitAsync();
+
if (fileInfo is DirectoryInfo)
{
//If the directory doesn't exist the Hash property will be empty
//Upload even small files using the Hashmap. The server may already contain
//the relevant block
- await UploadWithHashMap(accountInfo, cloudFile, fileInfo as FileInfo, cloudFile.Name, treeHash);
+
+
+ await UploadWithHashMap(accountInfo, cloudFile, fileInfo as FileInfo, cloudFile.Name, treeHash,cancellationToken);
}
//If everything succeeds, change the file and overlay status to normal
StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal, "");
}
- public async Task UploadWithHashMap(AccountInfo accountInfo, ObjectInfo cloudFile, FileInfo fileInfo, string url, TreeHash treeHash)
+ public async Task UploadWithHashMap(AccountInfo accountInfo, ObjectInfo cloudFile, FileInfo fileInfo, string url, TreeHash treeHash, CancellationToken token)
{
if (accountInfo == null)
throw new ArgumentNullException("accountInfo");
throw new ArgumentException("Invalid container", "cloudFile");
Contract.EndContractBlock();
+
using (StatusNotification.GetNotifier("Uploading {0}", "Finished Uploading {0}", fileInfo.Name))
{
-
+ token.ThrowIfCancellationRequested();
+ await UnpauseEvent.WaitAsync();
+
var fullFileName = fileInfo.GetProperCapitalization();
var account = cloudFile.Account ?? accountInfo.UserName;
var container = cloudFile.Container;
+
var client = new CloudFilesClient(accountInfo);
//Send the hashmap to the server
var missingHashes = await client.PutHashMap(account, container, url, treeHash);
while (missingHashes.Count > 0)
{
+ token.ThrowIfCancellationRequested();
+ await UnpauseEvent.WaitAsync();
+
var buffer = new byte[accountInfo.BlockSize];
foreach (var missingHash in missingHashes)
{
+ token.ThrowIfCancellationRequested();
+ await UnpauseEvent.WaitAsync();
+
//Find the proper block
var blockIndex = treeHash.HashDictionary[missingHash];
long offset = blockIndex*accountInfo.BlockSize;
ReportUploadProgress(fileInfo.Name, block++, missingHashes.Count, fileInfo.Length);
}
+ token.ThrowIfCancellationRequested();
//Repeat until there are no more missing hashes
missingHashes = await client.PutHashMap(account, container, url, treeHash);
}
}
return false;
}
+
+ [Import]
+ public Selectives Selectives { get; set; }
+
+ public AsyncManualResetEvent UnpauseEvent { get; set; }
}
}
[System.ComponentModel.Composition.Import]
public IPithosSettings Settings { get; set; }
- private List<string> _selectivePaths = new List<string>();
- public List<string> SelectivePaths
- {
- get { return _selectivePaths; }
- set { _selectivePaths = value; }
- }
+ [System.ComponentModel.Composition.Import]
+ public Selectives Selectives { get; set; }
public WorkflowAgent()
{
return;*/
//TODO: Need to handle folder renames
- //If there are selective sync paths defined
- if (SelectivePaths.Count > 0
- //And the target file is not below any of the selective paths
- && !SelectivePaths.Any(workflowState.Path.IsAtOrDirectlyBelow))
- //abort the post
+
+ if (!Selectives.IsSelected(workflowState.AccountInfo, workflowState.Path))
{
- Log.InfoFormat("File skipped, not under a selected folder [{0}] ",workflowState.Path);
+ Log.InfoFormat("File skipped, not under a selected folder [{0}] ", workflowState.Path);
return;
}
-
Debug.Assert(workflowState.Path.StartsWith(workflowState.AccountInfo.AccountPath, StringComparison.InvariantCultureIgnoreCase), "File from wrong account posted");
_agent.Post(workflowState);
<Compile Include="Agents\Notifier.cs" />
<Compile Include="Agents\ObjectInfoComparer.cs" />
<Compile Include="Agents\PollAgent.cs" />
+ <Compile Include="Agents\SelectiveUris.cs" />
<Compile Include="Agents\SnapshotDifferencer.cs" />
<Compile Include="Agents\Uploader.cs" />
<Compile Include="Agents\WorkflowAgent.cs" />
}
+
+
private IPithosWorkflow _workflow;
[Import]
[Import]
public NetworkAgent NetworkAgent { get; set; }
[Import]
- public PollAgent PollAgent { get; set; }
+ public PollAgent PollAgent { get; set; }
+
+ private Selectives _selectives;
+
+ [Import]
+ public Selectives Selectives
+ {
+ get { return _selectives; }
+ set
+ {
+ _selectives = value;
+ FileAgent.Selectives = value;
+ }
+ }
public string UserName { get; set; }
private string _apiKey;
//Convert the uris to paths
var selectivePaths = UrisToFilePaths(uris);
- FileAgent.SelectivePaths=selectivePaths;
- WorkflowAgent.SelectivePaths = selectivePaths;
- PollAgent.SetSyncUris(_accountInfo.AccountKey,uris);
-
+ var selectiveUri = uris.ToList();
+ this.Selectives.SetSelectedUris(_accountInfo,selectiveUri);
+
var removedPaths = UrisToFilePaths(removed);
UnversionSelectivePaths(removedPaths);
using System.Diagnostics.Contracts;
using System.Linq;
using System.Text;
+using System.Threading;
using NUnit.Framework;
using Pithos.Interfaces;
using System.IO;
string downloadFile = "test2.txt";
- client.GetObject(null, "Shares", testFileName, downloadFile)
+ client.GetObject(null, "Shares", testFileName, downloadFile,CancellationToken.None)
.Wait();
Assert.IsTrue(File.Exists(downloadFile));
using System.Reflection;
using System.Security.Cryptography;
using System.Text;
+using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Pithos.Interfaces;
/// <remarks>This method should have no timeout or a very long one</remarks>
//Asynchronously download the object specified by *objectName* in a specific *container* to
// a local file
- public Task GetObject(string account, string container, string objectName, string fileName)
+ public async Task GetObject(string account, string container, string objectName, string fileName,CancellationToken cancellationToken)
{
if (String.IsNullOrWhiteSpace(container))
throw new ArgumentNullException("container", "The container property can't be empty");
//object to avoid concurrency errors.
//
//Download operations take a long time therefore they have no timeout.
- var client = new RestClient(_baseClient) { Timeout = 0 };
- if (!String.IsNullOrWhiteSpace(account))
- client.BaseAddress = GetAccountUrl(account);
+ using(var client = new RestClient(_baseClient) { Timeout = 0 })
+ {
+ if (!String.IsNullOrWhiteSpace(account))
+ client.BaseAddress = GetAccountUrl(account);
- //The container and objectName are relative names. They are joined with the client's
- //BaseAddress to create the object's absolute address
- var builder = client.GetAddressBuilder(container, objectName);
- var uri = builder.Uri;
+ //The container and objectName are relative names. They are joined with the client's
+ //BaseAddress to create the object's absolute address
+ var builder = client.GetAddressBuilder(container, objectName);
+ var uri = builder.Uri;
- //Download progress is reported to the Trace log
- Log.InfoFormat("[GET] START {0}", objectName);
- client.DownloadProgressChanged += (sender, args) =>
- Log.InfoFormat("[GET PROGRESS] {0} {1}% {2} of {3}",
- fileName, args.ProgressPercentage,
- args.BytesReceived,
- args.TotalBytesToReceive);
+ //Download progress is reported to the Trace log
+ Log.InfoFormat("[GET] START {0}", objectName);
+ client.DownloadProgressChanged += (sender, args) =>
+ Log.InfoFormat("[GET PROGRESS] {0} {1}% {2} of {3}",
+ fileName, args.ProgressPercentage,
+ args.BytesReceived,
+ args.TotalBytesToReceive);
+
+ //Start downloading the object asynchronously
+ await client.DownloadFileTaskAsync(uri, fileName,cancellationToken);
- //Start downloading the object asynchronously
- var downloadTask = client.DownloadFileTask(uri, fileName);
-
- //Once the download completes
- return downloadTask.ContinueWith(download =>
- {
- //Delete the local client object
- client.Dispose();
- //And report failure or completion
- if (download.IsFaulted)
- {
- Log.ErrorFormat("[GET] FAIL for {0} with \r{1}", objectName,
- download.Exception);
- }
- else
- {
- Log.InfoFormat("[GET] END {0}", objectName);
- }
- });
+ //Once the download completes
+ //Delete the local client object
+ }
+ //And report failure or completion
}
catch (Exception exc)
{
- Log.ErrorFormat("[GET] END {0} with {1}", objectName, exc);
+ Log.ErrorFormat("[GET] FAIL {0} with {1}", objectName, exc);
throw;
}
+ Log.InfoFormat("[GET] END {0}", objectName);
}
}
-
- public Task<byte[]> GetBlock(string account, string container, Uri relativeUrl, long start, long? end)
+
+ public async Task<byte[]> GetBlock(string account, string container, Uri relativeUrl, long start, long? end, CancellationToken cancellationToken)
{
if (String.IsNullOrWhiteSpace(Token))
throw new InvalidOperationException("Invalid Token");
throw new InvalidOperationException("Invalid Storage Url");
if (String.IsNullOrWhiteSpace(container))
throw new ArgumentNullException("container");
- if (relativeUrl== null)
+ if (relativeUrl == null)
throw new ArgumentNullException("relativeUrl");
- if (end.HasValue && end<0)
+ if (end.HasValue && end < 0)
throw new ArgumentOutOfRangeException("end");
- if (start<0)
+ if (start < 0)
throw new ArgumentOutOfRangeException("start");
Contract.EndContractBlock();
-
//Don't use a timeout because putting the hashmap may be a long process
- var client = new RestClient(_baseClient) {Timeout = 0, RangeFrom = start, RangeTo = end};
- if (!String.IsNullOrWhiteSpace(account))
- client.BaseAddress = GetAccountUrl(account);
+ using (var client = new RestClient(_baseClient) {Timeout = 0, RangeFrom = start, RangeTo = end})
+ {
+ if (!String.IsNullOrWhiteSpace(account))
+ client.BaseAddress = GetAccountUrl(account);
- var builder = client.GetAddressBuilder(container, relativeUrl.ToString());
- var uri = builder.Uri;
+ var builder = client.GetAddressBuilder(container, relativeUrl.ToString());
+ var uri = builder.Uri;
- return client.DownloadDataTask(uri)
- .ContinueWith(t=>
- {
- client.Dispose();
- return t.Result;
- });
+ var result = await client.DownloadDataTaskAsync(uri, cancellationToken);
+ return result;
+ }
}
//Don't use a timeout because putting the hashmap may be a long process
using (var client = new RestClient(_baseClient) { Timeout = 0 })
- {
+ {
if (!String.IsNullOrWhiteSpace(account))
client.BaseAddress = GetAccountUrl(account);
var buffer = new byte[count];
Buffer.BlockCopy(block, offset, buffer, 0, count);
//Send the block
- await client.UploadDataTask(uri, "POST", buffer);
+ await client.UploadDataTaskAsync(uri, "POST", buffer);
Log.InfoFormat("[BLOCK POST] END");
}
}
{
Log.InfoFormat("Completed {0}", fileName);
}
- };
+ };
if (contentType=="application/directory")
await client.UploadDataTaskAsync(uri, "PUT", new byte[0]);
else
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Net;
+using System.Threading;
using System.Threading.Tasks;
using Pithos.Interfaces;
#endregion
#region Object operations
- Task GetObject(string account, string container, string objectName, string fileName);
+ Task GetObject(string account, string container, string objectName, string fileName,CancellationToken cancellationToken);
Task PutObject(string account, string container, string objectName, string fileName, string hash = null, string contentType = "application/octet-stream");
void DeleteObject(string account, string container, string objectName);
//void DeleteObject(string container, string objectName, string account = null);
Task<TreeHash> GetHashMap(string account, string container, string objectName);
Task<IList<string>> PutHashMap(string account, string container, string objectName, TreeHash hash);
Task PostBlock(string account, string container, byte[] block, int offset, int count);
- Task<byte[]> GetBlock(string account, string container, Uri relativeUrl, long start, long? end);
+ Task<byte[]> GetBlock(string account, string container, Uri relativeUrl, long start, long? end,CancellationToken cancellationToken);
#endregion
#region Sharing operations
Contract.Requires(!String.IsNullOrWhiteSpace(container));
}
- public Task GetObject(string account, string container, string objectName, string fileName)
+ public Task GetObject(string account, string container, string objectName, string fileName,CancellationToken cancellationToken)
{
Contract.Requires(!String.IsNullOrWhiteSpace(Token));
return default(Task);
}
- public Task<byte[]> GetBlock(string account, string container, Uri relativeUrl, long start, long? end)
+ public Task<byte[]> GetBlock(string account, string container, Uri relativeUrl, long start, long? end,CancellationToken cancellationToken)
{
Contract.Requires(!String.IsNullOrWhiteSpace(Token));