#region
/* -----------------------------------------------------------------------
 *
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 *   1. Redistributions of source code must retain the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer.
 *
 *   2. Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials
 *      provided with the distribution.
 *
 *
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 * The views and conclusions contained in the software and
 * documentation are those of the authors and should not be
 * interpreted as representing official policies, either expressed
 * or implied, of GRNET S.A.
 *
 * -----------------------------------------------------------------------
 */
#endregion

using System.Collections.Concurrent;
using System.ComponentModel.Composition;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Castle.ActiveRecord;
using Pithos.Interfaces;
using Pithos.Network;
using log4net;

namespace Pithos.Core.Agents
{
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;

    /// <summary>
    /// PollAgent periodically polls the server to detect object changes. The agent retrieves a listing of all
    /// objects and compares it with a previously cached version to detect differences.
    /// New files are downloaded, missing files are deleted from the local file system and common files are compared
    /// to determine the appropriate action.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the generic type arguments in this file (ObjectInfo, ContainerInfo, CloudAction,
    /// FileSystemInfo, Uri, ...) were reconstructed from how the values are used inside this class.
    /// Confirm them against the declarations in Pithos.Interfaces / Pithos.Network / Pithos.Core.
    /// </remarks>
    [Export]
    public class PollAgent
    {
        private static readonly ILog Log = LogManager.GetLogger("PollAgent");

        [System.ComponentModel.Composition.Import]
        public IStatusKeeper StatusKeeper { get; set; }

        [System.ComponentModel.Composition.Import]
        public IPithosSettings Settings { get; set; }

        [System.ComponentModel.Composition.Import]
        public NetworkAgent NetworkAgent { get; set; }

        //Not imported through composition; it must be assigned by the owner before polling starts
        public IStatusNotification StatusNotification { get; set; }

        //True until the first poll over all accounts completes without an exception
        private bool _firstPoll = true;

        //The Sync Event signals a manual synchronisation
        private readonly AsyncManualResetEvent _syncEvent = new AsyncManualResetEvent();

        //Maps the full path of a local file to the last time a matching remote change was seen
        private readonly ConcurrentDictionary<string, DateTime> _lastSeen = new ConcurrentDictionary<string, DateTime>();

        //The accounts that are polled on each cycle
        private readonly ConcurrentBag<AccountInfo> _accounts = new ConcurrentBag<AccountInfo>();

        private readonly AccountsDifferencer _differencer = new AccountsDifferencer();

        private List<Uri> _selectiveUris = new List<Uri>();

        /// <summary>
        /// Start a manual synchronization
        /// </summary>
        public void SynchNow()
        {
            _syncEvent.Set();
        }

        /// <summary>
        /// Remote files are polled periodically. Any changes are processed.
        /// After each cycle the method waits for the polling interval (or a manual sync request)
        /// and then schedules itself again.
        /// </summary>
        /// <param name="since">Only changes after this time are requested. Null requests a full listing.</param>
        /// <returns>A task that completes when this polling cycle ends and the next one has been scheduled</returns>
        public async Task PollRemoteFiles(DateTime? since = null)
        {
            Debug.Assert(Thread.CurrentThread.IsBackground, "Polling Ended up in the main thread!");

            UpdateStatus(PithosStatus.Syncing);
            StatusNotification.Notify(new PollNotification());

            using (log4net.ThreadContext.Stacks["Retrieve Remote"].Push("All accounts"))
            {
                //If this poll fails, we will retry with the same since value
                var nextSince = since;
                try
                {
                    //Next time we will check for all changes since the current check minus 1 second
                    //This is done to ensure there are no discrepancies due to clock differences
                    var current = DateTime.Now.AddSeconds(-1);

                    var tasks = from accountInfo in _accounts
                                select ProcessAccountFiles(accountInfo, since);

                    await TaskEx.WhenAll(tasks.ToList());

                    _firstPoll = false;
                    //Reschedule the poll with the current timestamp as a "since" value
                    nextSince = current;
                }
                catch (Exception ex)
                {
                    Log.ErrorFormat("Error while processing accounts\r\n{0}", ex);
                    //In case of failure retry with the same "since" value
                }

                UpdateStatus(PithosStatus.InSynch);

                //The multiple try blocks are required because we can't have an await call
                //inside a finally block
                //TODO: Find a more elegant solution for rescheduling in the event of an exception
                try
                {
                    //Wait for the polling interval to pass or the Sync event to be signalled
                    nextSince = await WaitForScheduledOrManualPoll(nextSince);
                }
                finally
                {
                    //Ensure polling is scheduled even in case of error
                    //NOTE(review): if the wait above fails because NetworkAgent.CancellationToken was cancelled,
                    //the next cycle is still scheduled here and its delay would presumably end immediately - confirm
                    //whether polling should stop on cancellation instead
                    TaskEx.Run(() => PollRemoteFiles(nextSince));
                }
            }
        }

        /// <summary>
        /// Wait for the polling period to expire or a manual sync request
        /// </summary>
        /// <param name="since">The "since" value to use for a scheduled poll</param>
        /// <returns>
        /// Null if the wait ended because of a manual sync request, otherwise the unchanged
        /// <paramref name="since"/> value
        /// </returns>
        private async Task<DateTime?> WaitForScheduledOrManualPoll(DateTime? since)
        {
            var sync = _syncEvent.WaitAsync();
            var wait = TaskEx.Delay(TimeSpan.FromSeconds(Settings.PollingInterval), NetworkAgent.CancellationToken);
            var signaledTask = await TaskEx.WhenAny(sync, wait);

            //Wait for network processing to finish before polling
            var pauseTask = NetworkAgent.ProceedEvent.WaitAsync();
            await TaskEx.WhenAll(signaledTask, pauseTask);

            //If polling is signalled by SynchNow, ignore the since tag
            if (sync.IsCompleted)
            {
                //TODO: Must convert to AutoReset
                _syncEvent.Reset();
                return null;
            }
            return since;
        }

        /// <summary>
        /// Lists the remote objects of a single account, compares them with the previous snapshot and
        /// posts the resulting actions to the NetworkAgent. Local files whose remote objects were deleted
        /// are handled by <see cref="ProcessDeletedFiles"/>.
        /// </summary>
        /// <param name="accountInfo">The account to process</param>
        /// <param name="since">Only changes after this time are requested. Null requests a full listing.</param>
        /// <returns>A task that completes when all actions for the account have been posted</returns>
        /// <exception cref="ArgumentNullException"><paramref name="accountInfo"/> is null</exception>
        /// <exception cref="ArgumentException">The account has an empty AccountPath</exception>
        public async Task ProcessAccountFiles(AccountInfo accountInfo, DateTime? since = null)
        {
            if (accountInfo == null)
                throw new ArgumentNullException("accountInfo");
            if (String.IsNullOrWhiteSpace(accountInfo.AccountPath))
                throw new ArgumentException("The AccountInfo.AccountPath is empty", "accountInfo");
            Contract.EndContractBlock();

            using (log4net.ThreadContext.Stacks["Retrieve Remote"].Push(accountInfo.UserName))
            {
                await NetworkAgent.GetDeleteAwaiter();

                Log.Info("Scheduled");
                var client = new CloudFilesClient(accountInfo);

                //We don't need to check the trash container
                var containers = client.ListContainers(accountInfo.UserName).Where(c => c.Name != "trash");

                CreateContainerFolders(accountInfo, containers);

                try
                {
                    //Wait for any deletions to finish
                    await NetworkAgent.GetDeleteAwaiter();
                    //Get the poll time now. We may miss some deletions but it's better to keep a file that was deleted
                    //than delete a file that was created while we were executing the poll
                    var pollTime = DateTime.Now;

                    //Get the list of server objects changed since the last check
                    //The name of the container is passed as state in order to create a dictionary of tasks in a subsequent step
                    var listObjects = (from container in containers
                                       select Task<IList<ObjectInfo>>.Factory.StartNew(_ =>
                                             client.ListObjects(accountInfo.UserName, container.Name, since), container.Name)).ToList();

                    //BUG: Can't detect difference between no changes or no objects
                    //ListObjects returns nothing if there are no changes since the last check time (since value)
                    //TODO: Must detect the difference between no server objects and no change

                    //NOTE: One option is to "mark" all result lists with their container name, or
                    //rather the url of the container
                    //Another option

                    var listShared = Task<IList<ObjectInfo>>.Factory.StartNew(_ =>
                                             client.ListSharedObjects(since), "shared");
                    listObjects.Add(listShared);
                    var listTasks = await Task.Factory.WhenAll(listObjects.ToArray());

                    using (log4net.ThreadContext.Stacks["SCHEDULE"].Push("Process Results"))
                    {
                        var dict = listTasks.ToDictionary(t => t.AsyncState);

                        //Get all non-trash objects. Remember, the container name is stored in AsyncState
                        var remoteObjects = from objectList in listTasks
                                            where (string)objectList.AsyncState != "trash"
                                            from obj in objectList.Result
                                            select obj;

                        var sharedObjects = dict["shared"].Result;

                        //DON'T process trashed files
                        //If some files are deleted and added again to a folder, they will be deleted
                        //even though they are new.
                        //We would have to check file dates and hashes to ensure that a trashed file
                        //can be deleted safely from the local hard drive.
                        /*
                        //Items with the same name, hash may be both in the container and the trash
                        //Don't delete items that exist in the container
                        var realTrash = from trash in trashObjects
                                        where
                                            !remoteObjects.Any(
                                                info => info.Name == trash.Name && info.Hash == trash.Hash)
                                        select trash;
                        ProcessTrashedFiles(accountInfo, realTrash);
                        */

                        //Skip ".ignore" objects and anything stored below the cache folder
                        var cleanRemotes = (from info in remoteObjects.Union(sharedObjects)
                                            let name = info.Name
                                            where !name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase) &&
                                                  !name.StartsWith(FolderConstants.CacheFolder + "/",
                                                                   StringComparison.InvariantCultureIgnoreCase)
                                            select info).ToList();

                        var differencer = _differencer.PostSnapshot(accountInfo, cleanRemotes);

                        ProcessDeletedFiles(accountInfo, differencer.Deleted.FilterDirectlyBelow(SelectiveUris), pollTime);

                        // @@@ NEED To add previous state here as well, To compare with previous hash

                        //Create a list of actions from the remote files
                        var allActions = ChangesToActions(accountInfo, differencer.Changed.FilterDirectlyBelow(SelectiveUris))
                                        .Union(
                                        CreatesToActions(accountInfo, differencer.Created.FilterDirectlyBelow(SelectiveUris)));

                        //And remove those that are already being processed by the agent
                        var distinctActions = allActions
                            .Except(NetworkAgent.GetEnumerable(), new PithosMonitor.LocalFileComparer())
                            .ToList();

                        //Queue all the actions
                        foreach (var message in distinctActions)
                        {
                            NetworkAgent.Post(message);
                        }

                        Log.Info("[LISTENER] End Processing");
                    }
                }
                catch (Exception ex)
                {
                    //Listing or processing failed for this account only; the error is logged and swallowed here
                    Log.ErrorFormat("[FAIL] ListObjects for {0} in ProcessAccountFiles with {1}", accountInfo.UserName, ex);
                    return;
                }

                Log.Info("[LISTENER] Finished");
            }
        }

        /// <summary>
        /// Handles local files whose remote objects no longer exist.
        /// On the first poll the affected files are only marked as conflicts; on later polls the local
        /// files that match the deleted remote objects are removed from disk.
        /// </summary>
        /// <param name="accountInfo">The account the files belong to</param>
        /// <param name="cloudFiles">
        /// On the first poll this is compared against the stored file states to find missing files;
        /// on later polls each entry is treated as a deleted remote object
        /// </param>
        /// <param name="pollTime">The time the poll started. Currently not used by this method.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="accountInfo"/> or <paramref name="cloudFiles"/> is null
        /// </exception>
        /// <exception cref="ArgumentException">The account has an empty AccountPath</exception>
        private void ProcessDeletedFiles(AccountInfo accountInfo, IEnumerable<ObjectInfo> cloudFiles, DateTime pollTime)
        {
            if (accountInfo == null)
                throw new ArgumentNullException("accountInfo");
            if (String.IsNullOrWhiteSpace(accountInfo.AccountPath))
                throw new ArgumentException("The AccountInfo.AccountPath is empty", "accountInfo");
            if (cloudFiles == null)
                throw new ArgumentNullException("cloudFiles");
            Contract.EndContractBlock();

            //On the first run
            if (_firstPoll)
            {
                //Only consider files that are not being modified, ie they are in the Unchanged state
                var deleteCandidates = FileState.Queryable.Where(state =>
                    state.FilePath.StartsWith(accountInfo.AccountPath)
                    && state.FileStatus == FileStatus.Unchanged).ToList();

                //TODO: filesToDelete must take into account the Others container
                var filesToDelete = (from deleteCandidate in deleteCandidates
                                     let localFile = FileInfoExtensions.FromPath(deleteCandidate.FilePath)
                                     let relativeFilePath = localFile.AsRelativeTo(accountInfo.AccountPath)
                                     where
                                         !cloudFiles.Any(r => r.RelativeUrlToFilePath(accountInfo.UserName) == relativeFilePath)
                                     select localFile).ToList();

                //Set the status of missing files to Conflict
                foreach (var item in filesToDelete)
                {
                    //Try to acquire a gate on the file, to take into account files that have been dequeued
                    //and are being processed
                    using (var gate = NetworkGate.Acquire(item.FullName, NetworkOperation.Deleting))
                    {
                        if (gate.Failed)
                            continue;
                        StatusKeeper.SetFileState(item.FullName, FileStatus.Conflict, FileOverlayStatus.Deleted);
                    }
                }

                //Only report conflicts when there actually are missing files. Otherwise every first poll
                //would switch the status to HasConflicts and raise notifications about zero files.
                if (filesToDelete.Count > 0)
                {
                    UpdateStatus(PithosStatus.HasConflicts);
                    StatusNotification.NotifyConflicts(filesToDelete, String.Format("{0} local files are missing from Pithos, possibly because they were deleted", filesToDelete.Count));
                    //NOTE(review): nothing is deleted in this branch, the files are only marked as conflicts -
                    //confirm that this "deleted" notification is intended
                    StatusNotification.NotifyForFiles(filesToDelete, String.Format("{0} files were deleted", filesToDelete.Count), TraceLevel.Info);
                }
            }
            else
            {
                var deletedFiles = new List<FileSystemInfo>();
                foreach (var objectInfo in cloudFiles)
                {
                    var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
                    var item = FileAgent.GetFileAgent(accountInfo).GetFileSystemInfo(relativePath);
                    if (item.Exists)
                    {
                        //A read-only entry can't be deleted, so clear the flag first
                        if ((item.Attributes & FileAttributes.ReadOnly) == FileAttributes.ReadOnly)
                        {
                            item.Attributes = item.Attributes & ~FileAttributes.ReadOnly;
                        }
                        //NOTE(review): if item is a non-empty directory this call presumably throws and
                        //aborts the rest of the loop - confirm whether a recursive delete is needed
                        item.Delete();
                        DateTime lastDate;
                        _lastSeen.TryRemove(item.FullName, out lastDate);
                        deletedFiles.Add(item);
                    }
                    //The state is updated even if the local file was already gone
                    StatusKeeper.SetFileState(item.FullName, FileStatus.Deleted, FileOverlayStatus.Deleted);
                }
                StatusNotification.NotifyForFiles(deletedFiles, String.Format("{0} files were deleted", deletedFiles.Count), TraceLevel.Info);
            }
        }

        /// <summary>
        /// Creates an appropriate action for each changed server file.
        /// Arguments are validated immediately; the actions themselves are produced lazily
        /// while the returned sequence is enumerated.
        /// </summary>
        /// <param name="accountInfo">The account the objects belong to</param>
        /// <param name="changes">The remote objects reported as changed</param>
        /// <returns>A lazily evaluated sequence of actions</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="accountInfo"/> or <paramref name="changes"/> is null
        /// </exception>
        private IEnumerable<CloudAction> ChangesToActions(AccountInfo accountInfo, IEnumerable<ObjectInfo> changes)
        {
            //The checks live in this non-iterator wrapper so that they run when the method is called
            //and not when the result is first enumerated
            if (accountInfo == null)
                throw new ArgumentNullException("accountInfo");
            if (changes == null)
                throw new ArgumentNullException("changes");
            Contract.EndContractBlock();

            return EnumerateChangeActions(accountInfo, changes);
        }

        //Iterator behind ChangesToActions: a sync action for files that exist locally, a download for the rest
        private IEnumerable<CloudAction> EnumerateChangeActions(AccountInfo accountInfo, IEnumerable<ObjectInfo> changes)
        {
            var fileAgent = FileAgent.GetFileAgent(accountInfo);

            //In order to avoid multiple iterations over the files, we iterate only once
            //over the remote files
            foreach (var objectInfo in changes)
            {
                var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
                if (fileAgent.Exists(relativePath))
                {
                    //If a directory object already exists, we don't need to perform any other action
                    var localFile = fileAgent.GetFileSystemInfo(relativePath);
                    if (objectInfo.Content_Type == @"application/directory" && localFile is DirectoryInfo)
                        continue;
                    using (new SessionScope(FlushAction.Never))
                    {
                        var state = StatusKeeper.GetStateByFilePath(localFile.FullName);
                        _lastSeen[localFile.FullName] = DateTime.Now;
                        //Common files should be checked on a per-case basis to detect differences, which is newer

                        yield return new CloudAction(accountInfo, CloudActionType.MustSynch,
                                                     localFile, objectInfo, state, accountInfo.BlockSize,
                                                     accountInfo.BlockHash);
                    }
                }
                else
                {
                    //Remote files should be downloaded
                    yield return new CloudDownloadAction(accountInfo, objectInfo);
                }
            }
        }

        /// <summary>
        /// Creates a download action for each newly created server file that does not exist locally.
        /// Files that already exist locally are marked as conflicts instead.
        /// Arguments are validated immediately; the actions themselves are produced lazily
        /// while the returned sequence is enumerated.
        /// </summary>
        /// <param name="accountInfo">The account the objects belong to</param>
        /// <param name="creates">The remote objects reported as created</param>
        /// <returns>A lazily evaluated sequence of download actions</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="accountInfo"/> or <paramref name="creates"/> is null
        /// </exception>
        private IEnumerable<CloudAction> CreatesToActions(AccountInfo accountInfo, IEnumerable<ObjectInfo> creates)
        {
            //The checks live in this non-iterator wrapper so that they run when the method is called
            //and not when the result is first enumerated
            if (accountInfo == null)
                throw new ArgumentNullException("accountInfo");
            if (creates == null)
                throw new ArgumentNullException("creates");
            Contract.EndContractBlock();

            return EnumerateCreateActions(accountInfo, creates);
        }

        //Iterator behind CreatesToActions: flags existing local files as conflicts, downloads the rest
        private IEnumerable<CloudAction> EnumerateCreateActions(AccountInfo accountInfo, IEnumerable<ObjectInfo> creates)
        {
            var fileAgent = FileAgent.GetFileAgent(accountInfo);

            //In order to avoid multiple iterations over the files, we iterate only once
            //over the remote files
            foreach (var objectInfo in creates)
            {
                var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
                if (fileAgent.Exists(relativePath))
                {
                    //If the object already exists, we probably have a conflict
                    var localFile = fileAgent.GetFileSystemInfo(relativePath);
                    StatusKeeper.SetFileState(localFile.FullName, FileStatus.Conflict, FileOverlayStatus.Conflict);
                }
                else
                {
                    //Remote files should be downloaded
                    yield return new CloudDownloadAction(accountInfo, objectInfo);
                }
            }
        }

        /// <summary>
        /// Deletes the local copies of trashed remote objects.
        /// Not called from the active code in this class; the only call site is commented out
        /// in <see cref="ProcessAccountFiles"/>.
        /// </summary>
        /// <param name="accountInfo">The account the objects belong to</param>
        /// <param name="trashObjects">The objects found in the trash container</param>
        private void ProcessTrashedFiles(AccountInfo accountInfo, IEnumerable<ObjectInfo> trashObjects)
        {
            var fileAgent = FileAgent.GetFileAgent(accountInfo);
            foreach (var trashObject in trashObjects)
            {
                var barePath = trashObject.RelativeUrlToFilePath(accountInfo.UserName);
                //HACK: Assume only the "pithos" container is used. Must find out what happens when
                //deleting a file from a different container
                var relativePath = Path.Combine("pithos", barePath);
                fileAgent.Delete(relativePath);
            }
        }

        /// <summary>
        /// Notify the UI to update the visual status
        /// </summary>
        /// <param name="status">The new overall status</param>
        private void UpdateStatus(PithosStatus status)
        {
            try
            {
                StatusKeeper.SetPithosStatus(status);
                StatusNotification.Notify(new Notification());
            }
            catch (Exception exc)
            {
                //Failure is not critical, just log it
                Log.Warn("Error while updating status", exc);
            }
        }

        /// <summary>
        /// Creates a local folder below the account path for every container that doesn't have one yet.
        /// The trash container is skipped.
        /// </summary>
        /// <param name="accountInfo">The account whose AccountPath is used as the root folder</param>
        /// <param name="containers">The remote containers of the account</param>
        private static void CreateContainerFolders(AccountInfo accountInfo, IEnumerable<ContainerInfo> containers)
        {
            var containerPaths = from container in containers
                                 let containerPath = Path.Combine(accountInfo.AccountPath, container.Name)
                                 where container.Name != FolderConstants.TrashContainer && !Directory.Exists(containerPath)
                                 select containerPath;

            foreach (var path in containerPaths)
            {
                Directory.CreateDirectory(path);
            }
        }

        /// <summary>
        /// Replaces the list of Uris used to filter the deleted, changed and created remote objects
        /// </summary>
        /// <param name="uris">The new selective sync Uris</param>
        /// <exception cref="ArgumentNullException"><paramref name="uris"/> is null</exception>
        public void SetSyncUris(Uri[] uris)
        {
            if (uris == null)
                throw new ArgumentNullException("uris");
            Contract.EndContractBlock();

            SelectiveUris = uris.ToList();
        }

        /// <summary>
        /// The Uris passed to FilterDirectlyBelow when remote differences are processed
        /// </summary>
        protected List<Uri> SelectiveUris
        {
            get { return _selectiveUris; }
            set { _selectiveUris = value; }
        }

        /// <summary>
        /// Registers an account for polling. An account that is already registered is not added again.
        /// </summary>
        /// <param name="accountInfo">The account to poll</param>
        /// <exception cref="ArgumentNullException"><paramref name="accountInfo"/> is null</exception>
        public void AddAccount(AccountInfo accountInfo)
        {
            //A null entry would make ProcessAccountFiles throw on every subsequent polling cycle
            if (accountInfo == null)
                throw new ArgumentNullException("accountInfo");
            Contract.EndContractBlock();

            if (!_accounts.Contains(accountInfo))
                _accounts.Add(accountInfo);
        }
    }
}