From aa7ac00e76724d9f37e85051b96137d2110183da Mon Sep 17 00:00:00 2001 From: Panagiotis Kanavos Date: Mon, 20 Feb 2012 16:58:09 +0200 Subject: [PATCH] Extracted polling functionality to a separate PollAgent.cs The PollAgent pauses polling while network operations are in progress Modified deleted file processing to ignore files in the Trash folder. --- trunk/Pithos.Client.WPF/Shell/ShellViewModel.cs | 2 +- trunk/Pithos.Core/Agents/Agent.cs | 5 + trunk/Pithos.Core/Agents/NetworkAgent.cs | 366 ++---------------- trunk/Pithos.Core/Agents/PollAgent.cs | 461 +++++++++++++++++++++++ trunk/Pithos.Core/Pithos.Core.csproj | 1 + trunk/Pithos.Core/PithosMonitor.cs | 8 +- 6 files changed, 498 insertions(+), 345 deletions(-) create mode 100644 trunk/Pithos.Core/Agents/PollAgent.cs diff --git a/trunk/Pithos.Client.WPF/Shell/ShellViewModel.cs b/trunk/Pithos.Client.WPF/Shell/ShellViewModel.cs index 0bc650f..98e4627 100644 --- a/trunk/Pithos.Client.WPF/Shell/ShellViewModel.cs +++ b/trunk/Pithos.Client.WPF/Shell/ShellViewModel.cs @@ -465,7 +465,7 @@ namespace Pithos.Client.WPF { public void SynchNow() { - var agent = IoC.Get(); + var agent = IoC.Get(); agent.SynchNow(); } diff --git a/trunk/Pithos.Core/Agents/Agent.cs b/trunk/Pithos.Core/Agents/Agent.cs index f6ad241..ec927a6 100644 --- a/trunk/Pithos.Core/Agents/Agent.cs +++ b/trunk/Pithos.Core/Agents/Agent.cs @@ -69,6 +69,11 @@ namespace Pithos.Core CancellationToken = _cancelSource.Token; } + public bool IsEmpty + { + get { return _queue.IsEmpty; } + } + public void Post(TMessage message) { _messages.Add(message); diff --git a/trunk/Pithos.Core/Agents/NetworkAgent.cs b/trunk/Pithos.Core/Agents/NetworkAgent.cs index a1001af..167886b 100644 --- a/trunk/Pithos.Core/Agents/NetworkAgent.cs +++ b/trunk/Pithos.Core/Agents/NetworkAgent.cs @@ -83,16 +83,18 @@ namespace Pithos.Core.Agents [System.ComponentModel.Composition.Import] public IPithosSettings Settings { get; set; } - private bool _firstPoll = true; - - //The Sync Event signals a manual synchronisation - private readonly AsyncManualResetEvent _syncEvent=new AsyncManualResetEvent(); + //The Pause event stops the poll agent to give priority to the network agent + //Initially the event is signalled because we don't need to pause + private readonly AsyncManualResetEvent _pauseEvent = new AsyncManualResetEvent(true); + + public AsyncManualResetEvent PauseEvent + { + get { return _pauseEvent; } + } - private ConcurrentDictionary _lastSeen = new ConcurrentDictionary(); public void Start() { - _firstPoll = true; _agent = Agent.Start(inbox => { Action loop = null; @@ -116,8 +118,8 @@ namespace Pithos.Core.Agents throw new ArgumentException("The action.AccountInfo is empty","action"); Contract.EndContractBlock(); - UpdateStatus(PithosStatus.Syncing); - var accountInfo = action.AccountInfo; + + using (log4net.ThreadContext.Stacks["NETWORK"].Push("PROCESS")) { @@ -127,7 +129,11 @@ namespace Pithos.Core.Agents var downloadPath = action.GetDownloadPath(); try - { + { + _pauseEvent.Reset(); + UpdateStatus(PithosStatus.Syncing); + var accountInfo = action.AccountInfo; + if (action.Action == CloudActionType.DeleteCloud) { //Redirect deletes to the delete agent @@ -200,7 +206,9 @@ namespace Pithos.Core.Agents } finally { - UpdateStatus(PithosStatus.InSynch); + if (_agent.IsEmpty) + _pauseEvent.Set(); + UpdateStatus(PithosStatus.InSynch); } } } @@ -337,351 +345,25 @@ namespace Pithos.Core.Agents } - /// - /// Start a manual synchronization - /// - public void SynchNow() - { - _syncEvent.Set(); - } - - //Remote files are polled periodically. Any changes are processed - public async Task PollRemoteFiles(DateTime? since = null) + public IEnumerable GetEnumerable() { - Debug.Assert(Thread.CurrentThread.IsBackground,"Polling Ended up in the main thread!"); - - UpdateStatus(PithosStatus.Syncing); - StatusNotification.Notify(new PollNotification()); - - using (log4net.ThreadContext.Stacks["Retrieve Remote"].Push("All accounts")) - { - //If this poll fails, we will retry with the same since value - var nextSince = since; - try - { - //Next time we will check for all changes since the current check minus 1 second - //This is done to ensure there are no discrepancies due to clock differences - var current = DateTime.Now.AddSeconds(-1); - - var tasks = from accountInfo in _accounts - select ProcessAccountFiles(accountInfo, since); - - await TaskEx.WhenAll(tasks.ToList()); - - _firstPoll = false; - //Reschedule the poll with the current timestamp as a "since" value - nextSince = current; - } - catch (Exception ex) - { - Log.ErrorFormat("Error while processing accounts\r\n{0}",ex); - //In case of failure retry with the same "since" value - } - - UpdateStatus(PithosStatus.InSynch); - //Wait for the polling interval to pass or the Sync event to be signalled - nextSince = await WaitForScheduledOrManualPoll(nextSince); - - PollRemoteFiles(nextSince); - - } + return _agent.GetEnumerable(); } - /// - /// Wait for the polling period to expire or a manual sync request - /// - /// - /// - private async Task WaitForScheduledOrManualPoll(DateTime? since) + public Task GetDeleteAwaiter() { - var sync=_syncEvent.WaitAsync(); - var wait = TaskEx.Delay(TimeSpan.FromSeconds(Settings.PollingInterval), _agent.CancellationToken); - var signaledTask = await TaskEx.WhenAny(sync, wait); - - //If polling is signalled by SynchNow, ignore the since tag - if (signaledTask is Task) - return null; - return since; + return _deleteAgent.PauseEvent.WaitAsync(); } - - public async Task ProcessAccountFiles(AccountInfo accountInfo,DateTime? since=null) - { - if (accountInfo==null) - throw new ArgumentNullException("accountInfo"); - if (String.IsNullOrWhiteSpace(accountInfo.AccountPath)) - throw new ArgumentException("The AccountInfo.AccountPath is empty","accountInfo"); - Contract.EndContractBlock(); - - - using (log4net.ThreadContext.Stacks["Retrieve Remote"].Push(accountInfo.UserName)) - { - await _deleteAgent.PauseEvent.WaitAsync(); - - Log.Info("Scheduled"); - var client = new CloudFilesClient(accountInfo); - - var containers = client.ListContainers(accountInfo.UserName); - - - CreateContainerFolders(accountInfo, containers); - - try - { - await _deleteAgent.PauseEvent.WaitAsync(); - //Get the poll time now. We may miss some deletions but it's better to keep a file that was deleted - //than delete a file that was created while we were executing the poll - var pollTime = DateTime.Now; - - //Get the list of server objects changed since the last check - //The name of the container is passed as state in order to create a dictionary of tasks in a subsequent step - var listObjects = (from container in containers - select Task>.Factory.StartNew(_ => - client.ListObjects(accountInfo.UserName,container.Name, since),container.Name)).ToList(); - - var listShared = Task>.Factory.StartNew(_ => client.ListSharedObjects(since), "shared"); - listObjects.Add(listShared); - var listTasks = await Task.Factory.WhenAll(listObjects.ToArray()); - - using (log4net.ThreadContext.Stacks["SCHEDULE"].Push("Process Results")) - { - var dict = listTasks.ToDictionary(t => t.AsyncState); - - //Get all non-trash objects. Remember, the container name is stored in AsyncState - var remoteObjects = from objectList in listTasks - where (string) objectList.AsyncState != "trash" - from obj in objectList.Result - select obj; - - var trashObjects = dict["trash"].Result; - var sharedObjects = dict["shared"].Result; - - //Items with the same name, hash may be both in the container and the trash - //Don't delete items that exist in the container - var realTrash = from trash in trashObjects - where - !remoteObjects.Any( - info => info.Name == trash.Name && info.Hash == trash.Hash) - select trash; - ProcessTrashedFiles(accountInfo, realTrash); - - var cleanRemotes = (from info in remoteObjects.Union(sharedObjects) - let name = info.Name - where !name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase) && - !name.StartsWith(FolderConstants.CacheFolder + "/", - StringComparison.InvariantCultureIgnoreCase) - select info).ToList(); - - var differencer = _differencer.PostSnapshot(accountInfo, cleanRemotes); - - ProcessDeletedFiles(accountInfo, differencer.Deleted, pollTime); - - //Create a list of actions from the remote files - var allActions = ChangesToActions(accountInfo, differencer.Changed) - .Union( - CreatesToActions(accountInfo,differencer.Created)); - - //And remove those that are already being processed by the agent - var distinctActions = allActions - .Except(_agent.GetEnumerable(), new PithosMonitor.LocalFileComparer()) - .ToList(); - - //Queue all the actions - foreach (var message in distinctActions) - { - Post(message); - } - - Log.Info("[LISTENER] End Processing"); - } - } - catch (Exception ex) - { - Log.ErrorFormat("[FAIL] ListObjects for{0} in ProcessRemoteFiles with {1}", accountInfo.UserName, ex); - return; - } - - Log.Info("[LISTENER] Finished"); - - } - } - - AccountsDifferencer _differencer= new AccountsDifferencer(); - - /// - /// Deletes local files that are not found in the list of cloud files - /// - /// - /// - /// - private void ProcessDeletedFiles(AccountInfo accountInfo, IEnumerable cloudFiles, DateTime pollTime) + public CancellationToken CancellationToken { - if (accountInfo == null) - throw new ArgumentNullException("accountInfo"); - if (String.IsNullOrWhiteSpace(accountInfo.AccountPath)) - throw new ArgumentException("The AccountInfo.AccountPath is empty", "accountInfo"); - if (cloudFiles == null) - throw new ArgumentNullException("cloudFiles"); - Contract.EndContractBlock(); - - //On the first run - if (_firstPoll) - { - //Only consider files that are not being modified, ie they are in the Unchanged state - var deleteCandidates = FileState.Queryable.Where(state => - state.FilePath.StartsWith(accountInfo.AccountPath) - && state.FileStatus == FileStatus.Unchanged).ToList(); - - - //TODO: filesToDelete must take into account the Others container - var filesToDelete = (from deleteCandidate in deleteCandidates - let localFile = FileInfoExtensions.FromPath(deleteCandidate.FilePath) - let relativeFilePath = localFile.AsRelativeTo(accountInfo.AccountPath) - where - !cloudFiles.Any(r => r.RelativeUrlToFilePath(accountInfo.UserName) == relativeFilePath) - select localFile).ToList(); - - - - //Set the status of missing files to Conflict - foreach (var item in filesToDelete) - { - //Try to acquire a gate on the file, to take into account files that have been dequeued - //and are being processed - using (var gate = NetworkGate.Acquire(item.FullName, NetworkOperation.Deleting)) - { - if (gate.Failed) - continue; - StatusKeeper.SetFileState(item.FullName, FileStatus.Conflict, FileOverlayStatus.Deleted); - } - } - UpdateStatus(PithosStatus.HasConflicts); - StatusNotification.NotifyConflicts(filesToDelete, String.Format("{0} local files are missing from Pithos, possibly because they were deleted",filesToDelete.Count)); - StatusNotification.NotifyForFiles(filesToDelete, String.Format("{0} files were deleted", filesToDelete.Count), TraceLevel.Info); - } - else - { - var deletedFiles = new List(); - foreach (var objectInfo in cloudFiles) - { - var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName); - var item = GetFileAgent(accountInfo).GetFileSystemInfo(relativePath); - if (item.Exists) - { - if ((item.Attributes & FileAttributes.ReadOnly) == FileAttributes.ReadOnly) - { - item.Attributes = item.Attributes & ~FileAttributes.ReadOnly; - - } - item.Delete(); - DateTime lastDate; - _lastSeen.TryRemove(item.FullName, out lastDate); - deletedFiles.Add(item); - } - StatusKeeper.SetFileState(item.FullName, FileStatus.Deleted, FileOverlayStatus.Deleted); - } - StatusNotification.NotifyForFiles(deletedFiles, String.Format("{0} files were deleted", deletedFiles.Count), TraceLevel.Info); - } - - } - - private static void CreateContainerFolders(AccountInfo accountInfo, IEnumerable containers) - { - var containerPaths = from container in containers - let containerPath = Path.Combine(accountInfo.AccountPath, container.Name) - where container.Name != FolderConstants.TrashContainer && !Directory.Exists(containerPath) - select containerPath; - - foreach (var path in containerPaths) - { - Directory.CreateDirectory(path); - } - } - - //Creates an appropriate action for each server file - private IEnumerable ChangesToActions(AccountInfo accountInfo,IEnumerable changes) - { - if (changes==null) - throw new ArgumentNullException(); - Contract.EndContractBlock(); - var fileAgent = GetFileAgent(accountInfo); - - //In order to avoid multiple iterations over the files, we iterate only once - //over the remote files - foreach (var objectInfo in changes) - { - var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName); - //and remove any matching objects from the list, adding them to the commonObjects list - if (fileAgent.Exists(relativePath)) - { - //If a directory object already exists, we don't need to perform any other action - var localFile = fileAgent.GetFileSystemInfo(relativePath); - if (objectInfo.Content_Type == @"application/directory" && localFile is DirectoryInfo) - continue; - using (new SessionScope(FlushAction.Never)) - { - var state = StatusKeeper.GetStateByFilePath(localFile.FullName); - _lastSeen[localFile.FullName] = DateTime.Now; - //Common files should be checked on a per-case basis to detect differences, which is newer - - yield return new CloudAction(accountInfo, CloudActionType.MustSynch, - localFile, objectInfo, state, accountInfo.BlockSize, - accountInfo.BlockHash); - } - } - else - { - //Remote files should be downloaded - yield return new CloudDownloadAction(accountInfo,objectInfo); - } - } + get { return _agent.CancellationToken; } } - private IEnumerable CreatesToActions(AccountInfo accountInfo,IEnumerable creates) - { - if (creates==null) - throw new ArgumentNullException(); - Contract.EndContractBlock(); - var fileAgent = GetFileAgent(accountInfo); - - //In order to avoid multiple iterations over the files, we iterate only once - //over the remote files - foreach (var objectInfo in creates) - { - var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName); - //and remove any matching objects from the list, adding them to the commonObjects list - if (fileAgent.Exists(relativePath)) - { - //If the object already exists, we probably have a conflict - //If a directory object already exists, we don't need to perform any other action - var localFile = fileAgent.GetFileSystemInfo(relativePath); - StatusKeeper.SetFileState(localFile.FullName,FileStatus.Conflict,FileOverlayStatus.Conflict); - } - else - { - //Remote files should be downloaded - yield return new CloudDownloadAction(accountInfo,objectInfo); - } - } - } - - private static FileAgent GetFileAgent(AccountInfo accountInfo) { return AgentLocator.Get(accountInfo.AccountPath); } - private void ProcessTrashedFiles(AccountInfo accountInfo,IEnumerable trashObjects) - { - var fileAgent = GetFileAgent(accountInfo); - foreach (var trashObject in trashObjects) - { - var barePath = trashObject.RelativeUrlToFilePath(accountInfo.UserName); - //HACK: Assume only the "pithos" container is used. Must find out what happens when - //deleting a file from a different container - var relativePath = Path.Combine("pithos", barePath); - fileAgent.Delete(relativePath); - } - } private void RenameCloudFile(AccountInfo accountInfo,CloudMoveAction action) diff --git a/trunk/Pithos.Core/Agents/PollAgent.cs b/trunk/Pithos.Core/Agents/PollAgent.cs new file mode 100644 index 0000000..583dd5b --- /dev/null +++ b/trunk/Pithos.Core/Agents/PollAgent.cs @@ -0,0 +1,461 @@ +#region +/* ----------------------------------------------------------------------- + * + * + * Copyright 2011-2012 GRNET S.A. All rights reserved. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the following + * disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * + * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS + * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and + * documentation are those of the authors and should not be + * interpreted as representing official policies, either expressed + * or implied, of GRNET S.A. + * + * ----------------------------------------------------------------------- + */ +#endregion + +using System.Collections.Concurrent; +using System.ComponentModel.Composition; +using System.Diagnostics; +using System.Diagnostics.Contracts; +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; +using Castle.ActiveRecord; +using Pithos.Interfaces; +using Pithos.Network; +using log4net; + +namespace Pithos.Core.Agents +{ + using System; + using System.Collections.Generic; + using System.Linq; + using System.Text; + + /// + /// PollAgent periodically polls the server to detect object changes. The agent retrieves a listing of all + /// objects and compares it with a previously cached version to detect differences. + /// New files are downloaded, missing files are deleted from the local file system and common files are compared + /// to determine the appropriate action + /// + [Export] + public class PollAgent + { + private static readonly ILog Log = LogManager.GetLogger("PollAgent"); + + [System.ComponentModel.Composition.Import] + public IStatusKeeper StatusKeeper { get; set; } + + [System.ComponentModel.Composition.Import] + public IPithosSettings Settings { get; set; } + + [System.ComponentModel.Composition.Import] + public NetworkAgent NetworkAgent { get; set; } + + public IStatusNotification StatusNotification { get; set; } + + private bool _firstPoll = true; + + //The Sync Event signals a manual synchronisation + private readonly AsyncManualResetEvent _syncEvent = new AsyncManualResetEvent(); + + private ConcurrentDictionary _lastSeen = new ConcurrentDictionary(); + private readonly ConcurrentBag _accounts = new ConcurrentBag(); + + + /// + /// Start a manual synchronization + /// + public void SynchNow() + { + _syncEvent.Set(); + } + + //Remote files are polled periodically. Any changes are processed + public async Task PollRemoteFiles(DateTime? since = null) + { + Debug.Assert(Thread.CurrentThread.IsBackground, "Polling Ended up in the main thread!"); + + UpdateStatus(PithosStatus.Syncing); + StatusNotification.Notify(new PollNotification()); + + using (log4net.ThreadContext.Stacks["Retrieve Remote"].Push("All accounts")) + { + //If this poll fails, we will retry with the same since value + var nextSince = since; + try + { + //Next time we will check for all changes since the current check minus 1 second + //This is done to ensure there are no discrepancies due to clock differences + var current = DateTime.Now.AddSeconds(-1); + + var tasks = from accountInfo in _accounts + select ProcessAccountFiles(accountInfo, since); + + await TaskEx.WhenAll(tasks.ToList()); + + _firstPoll = false; + //Reschedule the poll with the current timestamp as a "since" value + nextSince = current; + } + catch (Exception ex) + { + Log.ErrorFormat("Error while processing accounts\r\n{0}", ex); + //In case of failure retry with the same "since" value + } + + UpdateStatus(PithosStatus.InSynch); + //Wait for the polling interval to pass or the Sync event to be signalled + nextSince = await WaitForScheduledOrManualPoll(nextSince); + + TaskEx.Run(()=>PollRemoteFiles(nextSince)); + + } + } + + /// + /// Wait for the polling period to expire or a manual sync request + /// + /// + /// + private async Task WaitForScheduledOrManualPoll(DateTime? since) + { + var sync = _syncEvent.WaitAsync(); + var wait = TaskEx.Delay(TimeSpan.FromSeconds(Settings.PollingInterval), NetworkAgent.CancellationToken); + var signaledTask = await TaskEx.WhenAny(sync, wait); + + //Wait for network processing to finish before polling + var pauseTask=NetworkAgent.PauseEvent.WaitAsync(); + await TaskEx.WhenAll(signaledTask, pauseTask); + + //If polling is signalled by SynchNow, ignore the since tag + if (sync.IsCompleted) + { + //TODO: Must convert to AutoReset + _syncEvent.Reset(); + return null; + } + return since; + } + + public async Task ProcessAccountFiles(AccountInfo accountInfo, DateTime? since = null) + { + if (accountInfo == null) + throw new ArgumentNullException("accountInfo"); + if (String.IsNullOrWhiteSpace(accountInfo.AccountPath)) + throw new ArgumentException("The AccountInfo.AccountPath is empty", "accountInfo"); + Contract.EndContractBlock(); + + + using (log4net.ThreadContext.Stacks["Retrieve Remote"].Push(accountInfo.UserName)) + { + await NetworkAgent.GetDeleteAwaiter(); + + Log.Info("Scheduled"); + var client = new CloudFilesClient(accountInfo); + + var containers = client.ListContainers(accountInfo.UserName); + + + CreateContainerFolders(accountInfo, containers); + + try + { + await NetworkAgent.GetDeleteAwaiter(); + //Get the poll time now. We may miss some deletions but it's better to keep a file that was deleted + //than delete a file that was created while we were executing the poll + var pollTime = DateTime.Now; + + //Get the list of server objects changed since the last check + //The name of the container is passed as state in order to create a dictionary of tasks in a subsequent step + var listObjects = (from container in containers + select Task>.Factory.StartNew(_ => + client.ListObjects(accountInfo.UserName, container.Name, since), container.Name)).ToList(); + + var listShared = Task>.Factory.StartNew(_ => client.ListSharedObjects(since), "shared"); + listObjects.Add(listShared); + var listTasks = await Task.Factory.WhenAll(listObjects.ToArray()); + + using (log4net.ThreadContext.Stacks["SCHEDULE"].Push("Process Results")) + { + var dict = listTasks.ToDictionary(t => t.AsyncState); + + //Get all non-trash objects. Remember, the container name is stored in AsyncState + var remoteObjects = from objectList in listTasks + where (string)objectList.AsyncState != "trash" + from obj in objectList.Result + select obj; + + var trashObjects = dict["trash"].Result; + var sharedObjects = dict["shared"].Result; + + //DON'T process trashed files + //If some files are deleted and added again to a folder, they will be deleted + //even though they are new. + //We would have to check file dates and hashes to ensure that a trashed file + //can be deleted safely from the local hard drive. + /* + //Items with the same name, hash may be both in the container and the trash + //Don't delete items that exist in the container + var realTrash = from trash in trashObjects + where + !remoteObjects.Any( + info => info.Name == trash.Name && info.Hash == trash.Hash) + select trash; + ProcessTrashedFiles(accountInfo, realTrash); +*/ + + var cleanRemotes = (from info in remoteObjects.Union(sharedObjects) + let name = info.Name + where !name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase) && + !name.StartsWith(FolderConstants.CacheFolder + "/", + StringComparison.InvariantCultureIgnoreCase) + select info).ToList(); + + var differencer = _differencer.PostSnapshot(accountInfo, cleanRemotes); + + ProcessDeletedFiles(accountInfo, differencer.Deleted, pollTime); + + //Create a list of actions from the remote files + var allActions = ChangesToActions(accountInfo, differencer.Changed) + .Union( + CreatesToActions(accountInfo, differencer.Created)); + + //And remove those that are already being processed by the agent + var distinctActions = allActions + .Except(NetworkAgent.GetEnumerable(), new PithosMonitor.LocalFileComparer()) + .ToList(); + + //Queue all the actions + foreach (var message in distinctActions) + { + NetworkAgent.Post(message); + } + + Log.Info("[LISTENER] End Processing"); + } + } + catch (Exception ex) + { + Log.ErrorFormat("[FAIL] ListObjects for{0} in ProcessRemoteFiles with {1}", accountInfo.UserName, ex); + return; + } + + Log.Info("[LISTENER] Finished"); + + } + } + + AccountsDifferencer _differencer = new AccountsDifferencer(); + + /// + /// Deletes local files that are not found in the list of cloud files + /// + /// + /// + /// + private void ProcessDeletedFiles(AccountInfo accountInfo, IEnumerable cloudFiles, DateTime pollTime) + { + if (accountInfo == null) + throw new ArgumentNullException("accountInfo"); + if (String.IsNullOrWhiteSpace(accountInfo.AccountPath)) + throw new ArgumentException("The AccountInfo.AccountPath is empty", "accountInfo"); + if (cloudFiles == null) + throw new ArgumentNullException("cloudFiles"); + Contract.EndContractBlock(); + + //On the first run + if (_firstPoll) + { + //Only consider files that are not being modified, ie they are in the Unchanged state + var deleteCandidates = FileState.Queryable.Where(state => + state.FilePath.StartsWith(accountInfo.AccountPath) + && state.FileStatus == FileStatus.Unchanged).ToList(); + + + //TODO: filesToDelete must take into account the Others container + var filesToDelete = (from deleteCandidate in deleteCandidates + let localFile = FileInfoExtensions.FromPath(deleteCandidate.FilePath) + let relativeFilePath = localFile.AsRelativeTo(accountInfo.AccountPath) + where + !cloudFiles.Any(r => r.RelativeUrlToFilePath(accountInfo.UserName) == relativeFilePath) + select localFile).ToList(); + + + + //Set the status of missing files to Conflict + foreach (var item in filesToDelete) + { + //Try to acquire a gate on the file, to take into account files that have been dequeued + //and are being processed + using (var gate = NetworkGate.Acquire(item.FullName, NetworkOperation.Deleting)) + { + if (gate.Failed) + continue; + StatusKeeper.SetFileState(item.FullName, FileStatus.Conflict, FileOverlayStatus.Deleted); + } + } + UpdateStatus(PithosStatus.HasConflicts); + StatusNotification.NotifyConflicts(filesToDelete, String.Format("{0} local files are missing from Pithos, possibly because they were deleted", filesToDelete.Count)); + StatusNotification.NotifyForFiles(filesToDelete, String.Format("{0} files were deleted", filesToDelete.Count), TraceLevel.Info); + } + else + { + var deletedFiles = new List(); + foreach (var objectInfo in cloudFiles) + { + var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName); + var item = GetFileAgent(accountInfo).GetFileSystemInfo(relativePath); + if (item.Exists) + { + if ((item.Attributes & FileAttributes.ReadOnly) == FileAttributes.ReadOnly) + { + item.Attributes = item.Attributes & ~FileAttributes.ReadOnly; + + } + item.Delete(); + DateTime lastDate; + _lastSeen.TryRemove(item.FullName, out lastDate); + deletedFiles.Add(item); + } + StatusKeeper.SetFileState(item.FullName, FileStatus.Deleted, FileOverlayStatus.Deleted); + } + StatusNotification.NotifyForFiles(deletedFiles, String.Format("{0} files were deleted", deletedFiles.Count), TraceLevel.Info); + } + + } + + //Creates an appropriate action for each server file + private IEnumerable ChangesToActions(AccountInfo accountInfo, IEnumerable changes) + { + if (changes == null) + throw new ArgumentNullException(); + Contract.EndContractBlock(); + var fileAgent = GetFileAgent(accountInfo); + + //In order to avoid multiple iterations over the files, we iterate only once + //over the remote files + foreach (var objectInfo in changes) + { + var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName); + //and remove any matching objects from the list, adding them to the commonObjects list + if (fileAgent.Exists(relativePath)) + { + //If a directory object already exists, we don't need to perform any other action + var localFile = fileAgent.GetFileSystemInfo(relativePath); + if (objectInfo.Content_Type == @"application/directory" && localFile is DirectoryInfo) + continue; + using (new SessionScope(FlushAction.Never)) + { + var state = StatusKeeper.GetStateByFilePath(localFile.FullName); + _lastSeen[localFile.FullName] = DateTime.Now; + //Common files should be checked on a per-case basis to detect differences, which is newer + + yield return new CloudAction(accountInfo, CloudActionType.MustSynch, + localFile, objectInfo, state, accountInfo.BlockSize, + accountInfo.BlockHash); + } + } + else + { + //Remote files should be downloaded + yield return new CloudDownloadAction(accountInfo, objectInfo); + } + } + } + + private IEnumerable CreatesToActions(AccountInfo accountInfo, IEnumerable creates) + { + if (creates == null) + throw new ArgumentNullException(); + Contract.EndContractBlock(); + var fileAgent = GetFileAgent(accountInfo); + + //In order to avoid multiple iterations over the files, we iterate only once + //over the remote files + foreach (var objectInfo in creates) + { + var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName); + //and remove any matching objects from the list, adding them to the commonObjects list + if (fileAgent.Exists(relativePath)) + { + //If the object already exists, we probably have a conflict + //If a directory object already exists, we don't need to perform any other action + var localFile = fileAgent.GetFileSystemInfo(relativePath); + StatusKeeper.SetFileState(localFile.FullName, FileStatus.Conflict, FileOverlayStatus.Conflict); + } + else + { + //Remote files should be downloaded + yield return new CloudDownloadAction(accountInfo, objectInfo); + } + } + } + + private static FileAgent GetFileAgent(AccountInfo accountInfo) + { + return AgentLocator.Get(accountInfo.AccountPath); + } + + private void ProcessTrashedFiles(AccountInfo accountInfo, IEnumerable trashObjects) + { + var fileAgent = GetFileAgent(accountInfo); + foreach (var trashObject in trashObjects) + { + var barePath = trashObject.RelativeUrlToFilePath(accountInfo.UserName); + //HACK: Assume only the "pithos" container is used. Must find out what happens when + //deleting a file from a different container + var relativePath = Path.Combine("pithos", barePath); + fileAgent.Delete(relativePath); + } + } + + private void UpdateStatus(PithosStatus status) + { + StatusKeeper.SetPithosStatus(status); + StatusNotification.Notify(new Notification()); + } + + private static void CreateContainerFolders(AccountInfo accountInfo, IEnumerable containers) + { + var containerPaths = from container in containers + let containerPath = Path.Combine(accountInfo.AccountPath, container.Name) + where container.Name != FolderConstants.TrashContainer && !Directory.Exists(containerPath) + select containerPath; + + foreach (var path in containerPaths) + { + Directory.CreateDirectory(path); + } + } + + } +} diff --git a/trunk/Pithos.Core/Pithos.Core.csproj b/trunk/Pithos.Core/Pithos.Core.csproj index 570560f..f156832 100644 --- a/trunk/Pithos.Core/Pithos.Core.csproj +++ b/trunk/Pithos.Core/Pithos.Core.csproj @@ -388,6 +388,7 @@ + diff --git a/trunk/Pithos.Core/PithosMonitor.cs b/trunk/Pithos.Core/PithosMonitor.cs index a7bbfd5..c9fae62 100644 --- a/trunk/Pithos.Core/PithosMonitor.cs +++ b/trunk/Pithos.Core/PithosMonitor.cs @@ -121,7 +121,9 @@ namespace Pithos.Core } [Import] - public NetworkAgent NetworkAgent { get; set; } + public NetworkAgent NetworkAgent { get; set; } + [Import] + public PollAgent PollAgent { get; set; } public string UserName { get; set; } private string _apiKey; @@ -369,7 +371,9 @@ namespace Pithos.Core NetworkAgent.Start(); - NetworkAgent.PollRemoteFiles(); + PollAgent.StatusNotification = StatusNotification; + + PollAgent.PollRemoteFiles(); } //Make sure a hidden cache folder exists to store partial downloads -- 1.7.10.4