#region
/* -----------------------------------------------------------------------
 *
 *
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 *   1. Redistributions of source code must retain the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer.
 *
 *   2. Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials
 *      provided with the distribution.
 *
 *
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 * The views and conclusions contained in the software and
 * documentation are those of the authors and should not be
 * interpreted as representing official policies, either expressed
 * or implied, of GRNET S.A.
 *
 * -----------------------------------------------------------------------
 */
#endregion
using System.Collections.Concurrent;
using System.ComponentModel.Composition;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.IO;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Castle.ActiveRecord;
using Pithos.Interfaces;
using Pithos.Network;
using log4net;

namespace Pithos.Core.Agents
{
    using System;
    using System.Collections.Generic;
    using System.Linq;

    /// <summary>
    /// PollAgent periodically polls the server to detect object changes. The agent retrieves a listing of all
    /// objects and compares it with a previously cached version to detect differences.
    /// New files are downloaded, missing files are deleted from the local file system and common files are compared
    /// to determine the appropriate action
    /// </summary>
    [Export]
    public class PollAgent
    {
        private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);

        [System.ComponentModel.Composition.Import]
        public IStatusKeeper StatusKeeper { get; set; }

        [System.ComponentModel.Composition.Import]
        public IPithosSettings Settings { get; set; }

        [System.ComponentModel.Composition.Import]
        public NetworkAgent NetworkAgent { get; set; }

        [System.ComponentModel.Composition.Import]
        public Selectives Selectives { get; set; }

        public IStatusNotification StatusNotification { get; set; }

        /// <summary>
        /// Pauses or resumes polling. Setting it to false signals the un-pause event; setting it to true
        /// resets the event, so code awaiting it waits until polling is resumed.
        /// </summary>
        public bool Pause
        {
            get { return _pause; }
            set
            {
                _pause = value;
                if (!_pause)
                    _unPauseEvent.Set();
                else
                {
                    _unPauseEvent.Reset();
                }
            }
        }

        //Cleared after the first successful round of account processing in PollRemoteFiles
        private bool _firstPoll = true;

        //The Sync Event signals a manual synchronisation
        private readonly AsyncManualResetEvent _syncEvent = new AsyncManualResetEvent();

        //Signalled while polling is not paused (initially signalled)
        private readonly AsyncManualResetEvent _unPauseEvent = new AsyncManualResetEvent(true);

        //Local full path -> time the poller last produced an action for it
        private readonly ConcurrentDictionary<string, DateTime> _lastSeen = new ConcurrentDictionary<string, DateTime>();

        //Registered accounts, keyed by AccountInfo.AccountKey
        private readonly ConcurrentDictionary<Uri, AccountInfo> _accounts = new ConcurrentDictionary<Uri, AccountInfo>();

        /// <summary>
        /// Start a manual synchronization
        /// </summary>
        public void SynchNow()
        {
            _syncEvent.Set();
        }

        /// <summary>
        /// Remote files are polled periodically. Any changes are processed
        /// </summary>
        /// <param name="since">Only changes after this time are requested; null requests full listings</param>
        /// <returns>A task that completes when this poll round has scheduled the next one</returns>
        public async Task PollRemoteFiles(DateTime? since = null)
        {
            if (Log.IsDebugEnabled)
                Log.DebugFormat("Polling changes after [{0}]", since);

            Debug.Assert(Thread.CurrentThread.IsBackground, "Polling Ended up in the main thread!");

            //NOTE(review): this forces a full blocking collection on every poll - confirm it is still needed
            GC.Collect();

            using (ThreadContext.Stacks["Retrieve Remote"].Push("All accounts"))
            {
                //If this poll fails, we will retry with the same since value
                var nextSince = since;
                try
                {
                    await _unPauseEvent.WaitAsync();
                    UpdateStatus(PithosStatus.PollSyncing);

                    var tasks = from accountInfo in _accounts.Values
                                select ProcessAccountFiles(accountInfo, since);

                    var nextTimes = await TaskEx.WhenAll(tasks.ToList());

                    _firstPoll = false;
                    //Reschedule the poll with the current timestamp as a "since" value

                    if (nextTimes.Length > 0)
                        nextSince = nextTimes.Min();
                    if (Log.IsDebugEnabled)
                        Log.DebugFormat("Next Poll at [{0}]", nextSince);
                }
                catch (Exception ex)
                {
                    Log.ErrorFormat("Error while processing accounts\r\n{0}", ex);
                    //In case of failure retry with the same "since" value
                }

                UpdateStatus(PithosStatus.PollComplete);
                //The multiple try blocks are required because we can't have an await call
                //inside a finally block
                //TODO: Find a more elegant solution for reschedulling in the event of an exception
                try
                {
                    //Wait for the polling interval to pass or the Sync event to be signalled
                    nextSince = await WaitForScheduledOrManualPoll(nextSince);
                }
                finally
                {
                    //Ensure polling is scheduled even in case of error
                    TaskEx.Run(() => PollRemoteFiles(nextSince));
                }
            }
        }

        /// <summary>
        /// Wait for the polling period to expire or a manual sync request
        /// </summary>
        /// <param name="since">The since value to use for the next poll if it is not a manual sync</param>
        /// <returns>null when the wait ended because of SynchNow (forcing a full listing), otherwise <paramref name="since"/></returns>
        private async Task<DateTime?> WaitForScheduledOrManualPoll(DateTime? since)
        {
            var sync = _syncEvent.WaitAsync();
            var wait = TaskEx.Delay(TimeSpan.FromSeconds(Settings.PollingInterval), NetworkAgent.CancellationToken);
            var signaledTask = await TaskEx.WhenAny(sync, wait);

            //Pausing takes precedence over manual sync or awaiting.
            //Await the event instead of blocking on it, so a paused agent does not hold a thread hostage
            await _unPauseEvent.WaitAsync();

            //Wait for network processing to finish before polling
            var pauseTask = NetworkAgent.ProceedEvent.WaitAsync();
            await TaskEx.WhenAll(signaledTask, pauseTask);

            //If polling is signalled by SynchNow, ignore the since tag
            if (sync.IsCompleted)
            {
                //TODO: Must convert to AutoReset
                _syncEvent.Reset();
                return null;
            }
            return since;
        }

        /// <summary>
        /// Lists the account's containers (except "trash") and shared objects changed since <paramref name="since"/>,
        /// diffs them against the previous snapshot and posts the resulting actions to the NetworkAgent.
        /// </summary>
        /// <param name="accountInfo">The account to process</param>
        /// <param name="since">Only objects changed after this time are listed; null requests full listings</param>
        /// <returns>The since value to use for this account's next poll. On failure this is <paramref name="since"/>,
        /// so the same window is listed again.</returns>
        /// <exception cref="ArgumentNullException">accountInfo is null</exception>
        /// <exception cref="ArgumentException">accountInfo.AccountPath is empty</exception>
        public async Task<DateTime?> ProcessAccountFiles(AccountInfo accountInfo, DateTime? since = null)
        {
            if (accountInfo == null)
                throw new ArgumentNullException("accountInfo");
            if (String.IsNullOrWhiteSpace(accountInfo.AccountPath))
                throw new ArgumentException("The AccountInfo.AccountPath is empty", "accountInfo");
            Contract.EndContractBlock();

            using (ThreadContext.Stacks["Retrieve Remote"].Push(accountInfo.UserName))
            {
                await NetworkAgent.GetDeleteAwaiter();

                Log.Info("Scheduled");
                var client = new CloudFilesClient(accountInfo);

                //We don't need to check the trash container
                var containers = client.ListContainers(accountInfo.UserName)
                    .Where(c => c.Name != "trash")
                    .ToList();

                CreateContainerFolders(accountInfo, containers);

                //The nextSince time fallback time is the same as the current.
                //If polling succeeds, the next Since time will be the smallest of the maximum modification times
                //of the shared and account objects
                var nextSince = since;
                try
                {
                    //Wait for any deletions to finish
                    await NetworkAgent.GetDeleteAwaiter();
                    //Get the poll time now. We may miss some deletions but it's better to keep a file that was deleted
                    //than delete a file that was created while we were executing the poll

                    //Get the list of server objects changed since the last check
                    //The name of the container is passed as state in order to identify each listing in a subsequent step
                    var listObjects = (from container in containers
                                       select Task<IList<ObjectInfo>>.Factory.StartNew(_ =>
                                           client.ListObjects(accountInfo.UserName, container.Name, since), container.Name)).ToList();

                    var listShared = Task<IList<ObjectInfo>>.Factory.StartNew(_ =>
                        client.ListSharedObjects(since), "shared");
                    listObjects.Add(listShared);
                    var listTasks = await Task.Factory.WhenAll(listObjects.ToArray());

                    using (ThreadContext.Stacks["SCHEDULE"].Push("Process Results"))
                    {
                        //Get all non-trash objects. Remember, the container name is stored in AsyncState
                        var remoteObjects = (from objectList in listTasks
                                             where (string)objectList.AsyncState != "trash"
                                             from obj in objectList.Result
                                             select obj).ToList();

                        //Get the latest remote object modification date, only if it is after
                        //the original since date
                        nextSince = GetLatestDateAfter(nextSince, remoteObjects);

                        //Read the shared listing from its own task. Looking it up by its "shared" state in a
                        //dictionary keyed by container name throws a duplicate-key ArgumentException
                        //when the account also has a container named "shared".
                        var sharedObjects = listShared.Result;
                        nextSince = GetLatestDateBefore(nextSince, sharedObjects);

                        //DON'T process trashed files
                        //If some files are deleted and added again to a folder, they will be deleted
                        //even though they are new.
                        //We would have to check file dates and hashes to ensure that a trashed file
                        //can be deleted safely from the local hard drive.
                        /*
                        //Items with the same name, hash may be both in the container and the trash
                        //Don't delete items that exist in the container
                        var realTrash = from trash in trashObjects
                                        where
                                            !remoteObjects.Any(
                                                info => info.Name == trash.Name && info.Hash == trash.Hash)
                                        select trash;
                        ProcessTrashedFiles(accountInfo, realTrash);
                        */

                        var cleanRemotes = (from info in remoteObjects.Union(sharedObjects)
                                            let name = info.Name ?? ""
                                            where !name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase) &&
                                                  !name.StartsWith(FolderConstants.CacheFolder + "/",
                                                                   StringComparison.InvariantCultureIgnoreCase)
                                            select info).ToList();

                        if (_firstPoll)
                            StatusKeeper.CleanupOrphanStates();
                        StatusKeeper.CleanupStaleStates(accountInfo, cleanRemotes);

                        var differencer = _differencer.PostSnapshot(accountInfo, cleanRemotes);

                        var filterUris = Selectives.SelectiveUris[accountInfo.AccountKey];

                        //On the first run
                        if (_firstPoll)
                        {
                            MarkSuspectedDeletes(accountInfo, cleanRemotes);
                        }

                        ProcessDeletedFiles(accountInfo, differencer.Deleted.FilterDirectlyBelow(filterUris));

                        // @@@ NEED To add previous state here as well, To compare with previous hash

                        //Create a list of actions from the remote files
                        var allActions = MovesToActions(accountInfo, differencer.Moved.FilterDirectlyBelow(filterUris))
                            .Union(
                                ChangesToActions(accountInfo, differencer.Changed.FilterDirectlyBelow(filterUris)))
                            .Union(
                                CreatesToActions(accountInfo, differencer.Created.FilterDirectlyBelow(filterUris)));

                        //And remove those that are already being processed by the agent
                        var distinctActions = allActions
                            .Except(NetworkAgent.GetEnumerable(), new LocalFileComparer())
                            .ToList();

                        await _unPauseEvent.WaitAsync();
                        //Queue all the actions
                        foreach (var message in distinctActions)
                        {
                            NetworkAgent.Post(message);
                        }

                        Log.Info("[LISTENER] End Processing");
                    }
                }
                catch (Exception ex)
                {
                    Log.ErrorFormat("[FAIL] ListObjects for {0} in ProcessRemoteFiles with {1}", accountInfo.UserName, ex);
                    //nextSince may already have been advanced past changes that were never processed.
                    //Fall back to the original value so the next poll lists the same window again.
                    return since;
                }

                Log.Info("[LISTENER] Finished");
                return nextSince;
            }
        }

        /// <summary>
        /// Returns the latest Last_Modified value of the objects, or null when the list is null or empty
        /// or when that latest value is DateTime.MinValue
        /// </summary>
        private static DateTime? GetMaxLastModified(IList<ObjectInfo> cloudObjects)
        {
            if (cloudObjects == null || cloudObjects.Count == 0)
                return null;

            DateTime? maxDate = cloudObjects.Max(obj => obj.Last_Modified);
            if (maxDate == DateTime.MinValue)
                return null;
            return maxDate;
        }

        /// <summary>
        /// Returns the latest LastModified date from the list of objects, but only if it is before
        /// than the threshold value
        /// </summary>
        /// <param name="threshold">The current since value; null or DateTime.MinValue means "no value"</param>
        /// <param name="cloudObjects">The objects to examine; may be null or empty</param>
        /// <returns>The smaller of the threshold and the objects' latest date, ignoring missing values</returns>
        private static DateTime? GetLatestDateBefore(DateTime? threshold, IList<ObjectInfo> cloudObjects)
        {
            var maxDate = GetMaxLastModified(cloudObjects);
            if (maxDate == null)
                return threshold;
            if (threshold == null || threshold == DateTime.MinValue || threshold > maxDate)
                return maxDate;
            return threshold;
        }

        /// <summary>
        /// Returns the latest LastModified date from the list of objects, but only if it is after
        /// the threshold value
        /// </summary>
        /// <param name="threshold">The current since value; null or DateTime.MinValue means "no value"</param>
        /// <param name="cloudObjects">The objects to examine; may be null or empty</param>
        /// <returns>The larger of the threshold and the objects' latest date, ignoring missing values</returns>
        private static DateTime? GetLatestDateAfter(DateTime? threshold, IList<ObjectInfo> cloudObjects)
        {
            var maxDate = GetMaxLastModified(cloudObjects);
            if (maxDate == null)
                return threshold;
            if (threshold == null || threshold == DateTime.MinValue || threshold < maxDate)
                return maxDate;
            return threshold;
        }

        readonly AccountsDifferencer _differencer = new AccountsDifferencer();
        private bool _pause;

        /// <summary>
        /// Deletes the local counterpart of each given remote object and marks its state as Deleted
        /// </summary>
        /// <param name="accountInfo">The account that owns the objects</param>
        /// <param name="cloudFiles">Remote objects whose local files/folders should be removed</param>
        /// <exception cref="ArgumentNullException">accountInfo or cloudFiles is null</exception>
        /// <exception cref="ArgumentException">accountInfo.AccountPath is empty</exception>
        private void ProcessDeletedFiles(AccountInfo accountInfo, IEnumerable<ObjectInfo> cloudFiles)
        {
            if (accountInfo == null)
                throw new ArgumentNullException("accountInfo");
            if (String.IsNullOrWhiteSpace(accountInfo.AccountPath))
                throw new ArgumentException("The AccountInfo.AccountPath is empty", "accountInfo");
            if (cloudFiles == null)
                throw new ArgumentNullException("cloudFiles");
            Contract.EndContractBlock();

            var fileAgent = FileAgent.GetFileAgent(accountInfo);
            var deletedFiles = new List<FileSystemInfo>();
            foreach (var objectInfo in cloudFiles)
            {
                if (Log.IsDebugEnabled)
                    Log.DebugFormat("Handle deleted [{0}]", objectInfo.Uri);
                var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
                var item = fileAgent.GetFileSystemInfo(relativePath);
                if (Log.IsDebugEnabled)
                    Log.DebugFormat("Will delete [{0}] for [{1}]", item.FullName, objectInfo.Uri);
                if (item.Exists)
                {
                    //Clear the read-only flag, otherwise the delete fails
                    if ((item.Attributes & FileAttributes.ReadOnly) == FileAttributes.ReadOnly)
                    {
                        item.Attributes = item.Attributes & ~FileAttributes.ReadOnly;
                    }

                    Log.DebugFormat("Deleting {0}", item.FullName);

                    var directory = item as DirectoryInfo;
                    if (directory != null)
                        directory.Delete(true);
                    else
                        item.Delete();
                    Log.DebugFormat("Deleted [{0}] for [{1}]", item.FullName, objectInfo.Uri);
                    DateTime lastDate;
                    _lastSeen.TryRemove(item.FullName, out lastDate);
                    deletedFiles.Add(item);
                }
                StatusKeeper.SetFileState(item.FullName, FileStatus.Deleted, FileOverlayStatus.Deleted, "File Deleted");
            }
            Log.InfoFormat("[{0}] files were deleted", deletedFiles.Count);
            StatusNotification.NotifyForFiles(deletedFiles, String.Format("{0} files were deleted", deletedFiles.Count), TraceLevel.Info);
        }

        /// <summary>
        /// Marks as Conflict every Unchanged local file under the account path that has no matching remote object.
        /// Files whose network gate cannot be acquired are skipped and not reported.
        /// </summary>
        /// <param name="accountInfo">The account to check</param>
        /// <param name="cloudFiles">The complete set of remote objects for the account</param>
        private void MarkSuspectedDeletes(AccountInfo accountInfo, IEnumerable<ObjectInfo> cloudFiles)
        {
            //Only consider files that are not being modified, ie they are in the Unchanged state
            var deleteCandidates = FileState.Queryable.Where(state =>
                state.FilePath.StartsWith(accountInfo.AccountPath)
                && state.FileStatus == FileStatus.Unchanged).ToList();

            //Relative paths of all remote objects, computed once instead of once per candidate
            var remotePaths = new HashSet<string>(
                cloudFiles.Select(r => r.RelativeUrlToFilePath(accountInfo.UserName)));

            //TODO: filesToDelete must take into account the Others container
            var filesToDelete = (from deleteCandidate in deleteCandidates
                                 let localFile = FileInfoExtensions.FromPath(deleteCandidate.FilePath)
                                 let relativeFilePath = localFile.AsRelativeTo(accountInfo.AccountPath)
                                 where !remotePaths.Contains(relativeFilePath)
                                 select localFile).ToList();

            //Set the status of missing files to Conflict
            var markedFiles = new List<FileSystemInfo>();
            foreach (var item in filesToDelete)
            {
                //Try to acquire a gate on the file, to take into account files that have been dequeued
                //and are being processed
                using (var gate = NetworkGate.Acquire(item.FullName, NetworkOperation.Deleting))
                {
                    if (gate.Failed)
                        continue;
                    StatusKeeper.SetFileState(item.FullName, FileStatus.Conflict, FileOverlayStatus.Deleted, "Local file missing from server");
                }
                markedFiles.Add(item);
            }

            //Only report files that were actually marked; nothing to report means no conflicts
            if (markedFiles.Count == 0)
                return;

            UpdateStatus(PithosStatus.HasConflicts);
            StatusNotification.NotifyConflicts(markedFiles, String.Format("{0} local files are missing from Pithos, possibly because they were deleted", markedFiles.Count));
            StatusNotification.NotifyForFiles(markedFiles, String.Format("{0} files were deleted", markedFiles.Count), TraceLevel.Info);
        }

        /// <summary>
        /// Creates a Sync action for each changed server file
        /// </summary>
        /// <param name="accountInfo">The account that owns the objects</param>
        /// <param name="changes">Remote objects reported as changed</param>
        /// <returns>A MustSynch action for objects that exist locally, a download action otherwise.
        /// Existing directories are skipped. Argument validation runs on first enumeration.</returns>
        private IEnumerable<CloudAction> ChangesToActions(AccountInfo accountInfo, IEnumerable<ObjectInfo> changes)
        {
            if (changes == null)
                throw new ArgumentNullException("changes");
            Contract.EndContractBlock();
            var fileAgent = FileAgent.GetFileAgent(accountInfo);

            //In order to avoid multiple iterations over the files, we iterate only once
            //over the remote files
            foreach (var objectInfo in changes)
            {
                var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
                //If a directory object already exists, we may need to sync it
                if (fileAgent.Exists(relativePath))
                {
                    var localFile = fileAgent.GetFileSystemInfo(relativePath);
                    //We don't need to sync directories
                    if (objectInfo.IsDirectory && localFile is DirectoryInfo)
                        continue;
                    using (new SessionScope(FlushAction.Never))
                    {
                        var state = StatusKeeper.GetStateByFilePath(localFile.FullName);
                        _lastSeen[localFile.FullName] = DateTime.Now;
                        //Common files should be checked on a per-case basis to detect differences, which is newer

                        yield return new CloudAction(accountInfo, CloudActionType.MustSynch,
                                                     localFile, objectInfo, state, accountInfo.BlockSize,
                                                     accountInfo.BlockHash, "Poll Changes");
                    }
                }
                else
                {
                    //Remote files should be downloaded
                    yield return new CloudDownloadAction(accountInfo, objectInfo, "Poll Changes");
                }
            }
        }

        /// <summary>
        /// Creates a Local Move action for each moved server file
        /// </summary>
        /// <param name="accountInfo">The account that owns the objects</param>
        /// <param name="moves">Remote objects reported as moved (Previous holds the old object)</param>
        /// <returns>A RenameLocal action (plus a download when the hash changed) when the previous file exists
        /// locally, a download action otherwise. Argument validation runs on first enumeration.</returns>
        private IEnumerable<CloudAction> MovesToActions(AccountInfo accountInfo, IEnumerable<ObjectInfo> moves)
        {
            if (moves == null)
                throw new ArgumentNullException("moves");
            Contract.EndContractBlock();
            var fileAgent = FileAgent.GetFileAgent(accountInfo);

            //In order to avoid multiple iterations over the files, we iterate only once
            //over the remote files
            foreach (var objectInfo in moves)
            {
                var previousRelativepath = objectInfo.Previous.RelativeUrlToFilePath(accountInfo.UserName);
                //If the previous file already exists, we can execute a Move operation
                if (fileAgent.Exists(previousRelativepath))
                {
                    var previousFile = fileAgent.GetFileSystemInfo(previousRelativepath);
                    using (new SessionScope(FlushAction.Never))
                    {
                        var state = StatusKeeper.GetStateByFilePath(previousFile.FullName);
                        _lastSeen[previousFile.FullName] = DateTime.Now;

                        //For each moved object we need to move both the local file and update
                        yield return new CloudAction(accountInfo, CloudActionType.RenameLocal,
                                                     previousFile, objectInfo, state, accountInfo.BlockSize,
                                                     accountInfo.BlockHash, "Poll Moves");
                        //For modified files, we need to download the changes as well
                        if (objectInfo.Hash != objectInfo.PreviousHash)
                            yield return new CloudDownloadAction(accountInfo, objectInfo, "Poll Moves");
                    }
                }
                //If the previous file does not exist, we need to download it in the new location
                else
                {
                    //Remote files should be downloaded
                    yield return new CloudDownloadAction(accountInfo, objectInfo, "Poll Moves");
                }
            }
        }

        /// <summary>
        /// Creates a download action for each new server file
        /// </summary>
        /// <param name="accountInfo">The account that owns the objects</param>
        /// <param name="creates">Remote objects reported as created</param>
        /// <returns>A MustSynch action for objects that already exist locally, a download action otherwise.
        /// Argument validation runs on first enumeration.</returns>
        private IEnumerable<CloudAction> CreatesToActions(AccountInfo accountInfo, IEnumerable<ObjectInfo> creates)
        {
            if (creates == null)
                throw new ArgumentNullException("creates");
            Contract.EndContractBlock();
            var fileAgent = FileAgent.GetFileAgent(accountInfo);

            //In order to avoid multiple iterations over the files, we iterate only once
            //over the remote files
            foreach (var objectInfo in creates)
            {
                if (Log.IsDebugEnabled)
                    Log.DebugFormat("[NEW INFO] {0}", objectInfo.Uri);

                var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);

                //If the object already exists, we should check before uploading or downloading
                if (fileAgent.Exists(relativePath))
                {
                    var localFile = fileAgent.GetFileSystemInfo(relativePath);
                    var state = StatusKeeper.GetStateByFilePath(localFile.WithProperCapitalization().FullName);
                    yield return new CloudAction(accountInfo, CloudActionType.MustSynch,
                                                 localFile, objectInfo, state, accountInfo.BlockSize,
                                                 accountInfo.BlockHash, "Poll Creates");
                }
                else
                {
                    //Remote files should be downloaded
                    yield return new CloudDownloadAction(accountInfo, objectInfo, "Poll Creates");
                }
            }
        }

        /// <summary>
        /// Notify the UI to update the visual status
        /// </summary>
        /// <param name="status">The status to display</param>
        private void UpdateStatus(PithosStatus status)
        {
            try
            {
                StatusNotification.SetPithosStatus(status);
                //StatusNotification.Notify(new Notification());
            }
            catch (Exception exc)
            {
                //Failure is not critical, just log it
                Log.Warn("Error while updating status", exc);
            }
        }

        /// <summary>
        /// Creates a local folder under the account path for every container (except the trash container)
        /// whose folder does not exist yet
        /// </summary>
        private static void CreateContainerFolders(AccountInfo accountInfo, IEnumerable<ContainerInfo> containers)
        {
            var containerPaths = from container in containers
                                 let containerPath = Path.Combine(accountInfo.AccountPath, container.Name)
                                 where container.Name != FolderConstants.TrashContainer && !Directory.Exists(containerPath)
                                 select containerPath;

            foreach (var path in containerPaths)
            {
                Directory.CreateDirectory(path);
            }
        }

        /// <summary>
        /// Registers an account for polling. An account whose key is already registered is ignored.
        /// </summary>
        public void AddAccount(AccountInfo accountInfo)
        {
            //Avoid adding a duplicate accountInfo
            _accounts.TryAdd(accountInfo.AccountKey, accountInfo);
        }

        /// <summary>
        /// Unregisters an account and discards its snapshot differencer
        /// </summary>
        public void RemoveAccount(AccountInfo accountInfo)
        {
            AccountInfo account;
            _accounts.TryRemove(accountInfo.AccountKey, out account);
            SnapshotDifferencer differencer;
            _differencer.Differencers.TryRemove(accountInfo.AccountKey, out differencer);
        }

        /// <summary>
        /// Applies a change of the selective-sync paths: removed paths are aborted, added paths are downloaded
        /// </summary>
        public void SetSelectivePaths(AccountInfo accountInfo, Uri[] added, Uri[] removed)
        {
            AbortRemovedPaths(accountInfo, removed);
            DownloadNewPaths(accountInfo, added);
        }

        /// <summary>
        /// Lists each newly selected container or folder URI and posts a create action for every object found.
        /// A listing failure is logged and does not stop the remaining URIs.
        /// </summary>
        private void DownloadNewPaths(AccountInfo accountInfo, Uri[] added)
        {
            var client = new CloudFilesClient(accountInfo);
            foreach (var folderUri in added)
            {
                try
                {
                    string account;
                    string container;
                    var segmentsCount = folderUri.Segments.Length;
                    //Is this an account URL?
                    if (segmentsCount < 3)
                        continue;
                    //Is this a container or folder URL?
                    //NOTE(review): the 3-segment branch reads the account from Segments[1] while longer URIs
                    //read it from Segments[2] - confirm the expected URI layout
                    if (segmentsCount == 3)
                    {
                        account = folderUri.Segments[1].TrimEnd('/');
                        container = folderUri.Segments[2].TrimEnd('/');
                    }
                    else
                    {
                        account = folderUri.Segments[2].TrimEnd('/');
                        container = folderUri.Segments[3].TrimEnd('/');
                    }
                    IList<ObjectInfo> items;
                    if (segmentsCount > 3)
                    {
                        //List folder
                        var folder = String.Join("", folderUri.Segments.Splice(4));
                        items = client.ListObjects(account, container, folder);
                    }
                    else
                    {
                        //List container
                        items = client.ListObjects(account, container);
                    }
                    var actions = CreatesToActions(accountInfo, items);
                    foreach (var action in actions)
                    {
                        NetworkAgent.Post(action);
                    }
                }
                catch (Exception exc)
                {
                    Log.WarnFormat("Listing of new selective path [{0}] failed with \r\n{1}", folderUri, exc);
                }
            }

            //Need to get a listing of each of the URLs, then post them to the NetworkAgent
            //CreatesToActions(accountInfo,)

            /*            NetworkAgent.Post();*/
        }

        /// <summary>
        /// Intended to abort processing of de-selected paths. Not implemented yet: currently does nothing.
        /// </summary>
        private void AbortRemovedPaths(AccountInfo accountInfo, Uri[] removed)
        {
            /*this.NetworkAgent.*/
        }
    }
}