#region
/* -----------------------------------------------------------------------
 *
 *
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 * 1. Redistributions of source code must retain the above
 *    copyright notice, this list of conditions and the following
 *    disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above
 *    copyright notice, this list of conditions and the following
 *    disclaimer in the documentation and/or other materials
 *    provided with the distribution.
 *
 *
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 * The views and conclusions contained in the software and
 * documentation are those of the authors and should not be
 * interpreted as representing official policies, either expressed
 * or implied, of GRNET S.A.
 *
 * -----------------------------------------------------------------------
 */
#endregion

using System.Collections.Concurrent;
using System.ComponentModel.Composition;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.IO;
using System.Linq.Expressions;
using System.Reflection;
using System.Security.Cryptography;
using System.Threading;
using System.Threading.Tasks;
using Castle.ActiveRecord;
using Pithos.Interfaces;
using Pithos.Network;
using log4net;

namespace Pithos.Core.Agents
{
    using System;
    using System.Collections.Generic;
    using System.Linq;

    [DebuggerDisplay("{FilePath} C:{C} L:{L} S:{S}")]
    public class StateTuple
    {
        public string FilePath { get; private set; }

        //Short (whole-file) MD5 hash of the local file, used to decide whether the Merkle hash must be recalculated
        public string MD5 { get; set; }

        //L: the checksum last recorded in the local state database
        public string L
        {
            get { return FileState == null ? null : FileState.Checksum; }
        }

        private string _c;

        //C: the checksum of the file as it currently exists on disk
        public string C
        {
            get { return _c; }
            set { _c = String.IsNullOrWhiteSpace(value) ? null : value; }
        }

        //S: the hash reported by the server for this object
        public string S
        {
            get { return ObjectInfo == null ? null : ObjectInfo.X_Object_Hash; }
        }

        private FileSystemInfo _fileInfo;
        private TreeHash _merkle;

        public FileSystemInfo FileInfo
        {
            get { return _fileInfo; }
            set
            {
                _fileInfo = value;
                FilePath = value.FullName;
            }
        }

        public FileState FileState { get; set; }

        public ObjectInfo ObjectInfo { get; set; }

        public TreeHash Merkle
        {
            get { return _merkle; }
            set
            {
                _merkle = value;
                C = _merkle.TopHash.ToHashString();
            }
        }

        public StateTuple()
        {
        }

        public StateTuple(FileSystemInfo info)
        {
            FileInfo = info;
        }
    }
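
    // Illustrative sketch (commented out, not part of the build): how the C/L/S triple of a
    // StateTuple maps to a sync decision. The labels below are hypothetical; the authoritative
    // logic is PollAgent.SyncSingleItem further down in this file.
    /*
    private static string DescribeSyncDecision(StateTuple tuple)
    {
        if (tuple.C == tuple.L)                             //No local change since the last recorded state
            return tuple.S == tuple.L ? "NoChange"          //Server matches too
                 : tuple.S == null    ? "DeleteLocal"       //Object gone from the server
                                      : "Download";         //Server has newer content
        if (tuple.S == tuple.L)                             //Only the local file changed
            return "Upload";
        return tuple.C == tuple.S ? "AlreadyInSync"         //Same change on both sides
                                  : "Conflict";             //Both sides changed differently
    }
    */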

    /// <summary>
    /// PollAgent periodically polls the server to detect object changes. The agent retrieves a listing of all
    /// objects and compares it with a previously cached version to detect differences.
    /// New files are downloaded, missing files are deleted from the local file system and common files are compared
    /// to determine the appropriate action.
    /// </summary>
    [Export]
    public class PollAgent
    {
        private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);

        [System.ComponentModel.Composition.Import]
        public IStatusKeeper StatusKeeper { get; set; }

        [System.ComponentModel.Composition.Import]
        public IPithosSettings Settings { get; set; }

        [System.ComponentModel.Composition.Import]
        public NetworkAgent NetworkAgent { get; set; }

        [System.ComponentModel.Composition.Import]
        public Selectives Selectives { get; set; }

        public IStatusNotification StatusNotification { get; set; }

        private CancellationTokenSource _currentOperationCancellation = new CancellationTokenSource();

        public void CancelCurrentOperation()
        {
            //What does it mean to cancel the current upload/download?
            //Obviously, the current operation will be cancelled by throwing
            //a cancellation exception.
            //
            //The default behavior is to retry any operations that throw.
            //Obviously this is not what we want in this situation.
            //The cancelled operation should NOT be retried.
            //
            //This can be done by catching the cancellation exception
            //and avoiding the retry.
            //
            //Have to replace the cancellation source - it is not possible to reset an existing source.
            //Have to prevent a case where an operation requests a token from the old source.
            var oldSource = Interlocked.Exchange(ref _currentOperationCancellation, new CancellationTokenSource());
            oldSource.Cancel();
        }

        public bool Pause
        {
            get { return _pause; }
            set
            {
                _pause = value;
                if (!_pause)
                    _unPauseEvent.Set();
                else
                {
                    _unPauseEvent.Reset();
                }
            }
        }

        private bool _firstPoll = true;

        //The Sync Event signals a manual synchronisation
        private readonly AsyncManualResetEvent _syncEvent = new AsyncManualResetEvent();

        private readonly AsyncManualResetEvent _unPauseEvent = new AsyncManualResetEvent(true);

        private readonly ConcurrentDictionary<string, DateTime> _lastSeen = new ConcurrentDictionary<string, DateTime>();
        private readonly ConcurrentDictionary<Uri, AccountInfo> _accounts = new ConcurrentDictionary<Uri, AccountInfo>();

        /// <summary>
        /// Start a manual synchronization
        /// </summary>
        public void SynchNow(IEnumerable<string> paths = null)
        {
            //_batchQueue.Enqueue(paths);
            _syncEvent.Set();
        }

        readonly ConcurrentQueue<IEnumerable<string>> _batchQueue = new ConcurrentQueue<IEnumerable<string>>();
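
        // Illustrative sketch (commented out): how a long-running transfer is expected to consume the
        // cancellation token produced above. The token is captured once before the operation starts, so
        // CancelCurrentOperation(), which swaps in a fresh CancellationTokenSource, only affects transfers
        // that began before the swap. DownloadSomethingAsync is a hypothetical placeholder, not a real
        // NetworkAgent member.
        /*
        private async Task TransferWithCancellation()
        {
            var token = _currentOperationCancellation.Token;    //snapshot the current source's token
            try
            {
                await DownloadSomethingAsync(token);
            }
            catch (OperationCanceledException)
            {
                //Swallow the cancellation instead of letting the generic retry logic re-queue the
                //operation; a cancelled transfer must NOT be retried.
            }
        }
        */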

        /// <summary>
        /// Remote files are polled periodically. Any changes are processed
        /// </summary>
        /// <param name="since"></param>
        /// <returns></returns>
        public async Task PollRemoteFiles(DateTime? since = null)
        {
            if (Log.IsDebugEnabled)
                Log.DebugFormat("Polling changes after [{0}]", since);

            Debug.Assert(Thread.CurrentThread.IsBackground, "Polling Ended up in the main thread!");
            //GC.Collect();

            using (ThreadContext.Stacks["Retrieve Remote"].Push("All accounts"))
            {
                //If this poll fails, we will retry with the same since value
                var nextSince = since;
                try
                {
                    await _unPauseEvent.WaitAsync();
                    UpdateStatus(PithosStatus.PollSyncing);

                    var accountBatches = new Dictionary<Uri, IEnumerable<string>>();
                    IEnumerable<string> batch = null;
                    if (_batchQueue.TryDequeue(out batch) && batch != null)
                        foreach (var account in _accounts.Values)
                        {
                            var accountBatch = batch.Where(path => path.IsAtOrBelow(account.AccountPath));
                            accountBatches[account.AccountKey] = accountBatch;
                        }

                    //Collect the per-account tasks so their next-poll timestamps can be awaited below
                    var tasks = new List<Task<DateTime?>>();
                    foreach (var accountInfo in _accounts.Values)
                    {
                        IEnumerable<string> accountBatch;
                        accountBatches.TryGetValue(accountInfo.AccountKey, out accountBatch);
                        tasks.Add(ProcessAccountFiles(accountInfo, accountBatch, since));
                    }

                    var nextTimes = await TaskEx.WhenAll(tasks.ToList());

                    _firstPoll = false;
                    //Reschedule the poll with the current timestamp as a "since" value
                    if (nextTimes.Length > 0)
                        nextSince = nextTimes.Min();
                    if (Log.IsDebugEnabled)
                        Log.DebugFormat("Next Poll at [{0}]", nextSince);
                }
                catch (Exception ex)
                {
                    Log.ErrorFormat("Error while processing accounts\r\n{0}", ex);
                    //In case of failure retry with the same "since" value
                }

                UpdateStatus(PithosStatus.PollComplete);

                //The multiple try blocks are required because we can't have an await call
                //inside a finally block
                //TODO: Find a more elegant solution for rescheduling in the event of an exception
                try
                {
                    //Wait for the polling interval to pass or the Sync event to be signalled
                    nextSince = await WaitForScheduledOrManualPoll(nextSince);
                }
                finally
                {
                    //Ensure polling is scheduled even in case of error
                    TaskEx.Run(() => PollRemoteFiles(nextSince));
                }
            }
        }

        /// <summary>
        /// Wait for the polling period to expire or a manual sync request
        /// </summary>
        /// <param name="since"></param>
        /// <returns></returns>
        private async Task<DateTime?> WaitForScheduledOrManualPoll(DateTime? since)
        {
            var sync = _syncEvent.WaitAsync();
            var wait = TaskEx.Delay(TimeSpan.FromSeconds(Settings.PollingInterval));
            var signaledTask = await TaskEx.WhenAny(sync, wait);

            //Pausing takes precedence over manual sync or awaiting
            _unPauseEvent.Wait();

            //Wait for network processing to finish before polling
            var pauseTask = NetworkAgent.ProceedEvent.WaitAsync();
            await TaskEx.WhenAll(signaledTask, pauseTask);

            //If polling is signalled by SynchNow, ignore the since tag
            if (sync.IsCompleted)
            {
                //TODO: Must convert to AutoReset
                _syncEvent.Reset();
                return null;
            }
            return since;
        }
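
        // Illustrative example (values hypothetical) of how the next "since" value is chosen in
        // PollRemoteFiles above: it is the minimum of the timestamps returned per account, so the
        // slowest account is never asked for changes "after" a point it has not reached yet.
        /*
        var perAccountNextTimes = new DateTime?[]
        {
            new DateTime(2012, 5, 1, 10, 5, 0),     //account A: latest change seen at 10:05
            new DateTime(2012, 5, 1, 10, 2, 0)      //account B: latest change seen at 10:02
        };
        var nextSince = perAccountNextTimes.Min();  //10:02 - a change on B at 10:03 is still picked up
        */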

        public async Task<DateTime?> ProcessAccountFiles(AccountInfo accountInfo, IEnumerable<string> accountBatch, DateTime? since = null)
        {
            if (accountInfo == null)
                throw new ArgumentNullException("accountInfo");
            if (String.IsNullOrWhiteSpace(accountInfo.AccountPath))
                throw new ArgumentException("The AccountInfo.AccountPath is empty", "accountInfo");
            Contract.EndContractBlock();

            using (ThreadContext.Stacks["Retrieve Remote"].Push(accountInfo.UserName))
            {
                await NetworkAgent.GetDeleteAwaiter();

                Log.Info("Scheduled");
                var client = new CloudFilesClient(accountInfo);

                //We don't need to check the trash container
                var containers = client.ListContainers(accountInfo.UserName)
                    .Where(c => c.Name != "trash")
                    .ToList();

                CreateContainerFolders(accountInfo, containers);

                //The nextSince fallback time is the same as the current.
                //If polling succeeds, the next Since time will be the smallest of the maximum modification times
                //of the shared and account objects
                var nextSince = since;

                try
                {
                    //Wait for any deletions to finish
                    await NetworkAgent.GetDeleteAwaiter();
                    //Get the poll time now. We may miss some deletions but it's better to keep a file that was deleted
                    //than delete a file that was created while we were executing the poll

                    //Get the list of server objects changed since the last check
                    //The name of the container is passed as state in order to create a dictionary of tasks in a subsequent step
                    var listObjects = (from container in containers
                                       select Task<IList<ObjectInfo>>.Factory.StartNew(
                                           _ => client.ListObjects(accountInfo.UserName, container.Name, since),
                                           container.Name)).ToList();

                    var listShared = Task<IList<ObjectInfo>>.Factory.StartNew(
                        _ => client.ListSharedObjects(since), "shared");
                    listObjects.Add(listShared);
                    var listTasks = await Task.Factory.WhenAll(listObjects.ToArray());

                    using (ThreadContext.Stacks["SCHEDULE"].Push("Process Results"))
                    {
                        var dict = listTasks.ToDictionary(t => t.AsyncState);

                        //Get all non-trash objects. Remember, the container name is stored in AsyncState
                        var remoteObjects = (from objectList in listTasks
                                             where (string) objectList.AsyncState != "trash"
                                             from obj in objectList.Result
                                             orderby obj.Bytes ascending
                                             select obj).ToList();

                        //Get the latest remote object modification date, only if it is after
                        //the original since date
                        nextSince = GetLatestDateAfter(nextSince, remoteObjects);

                        var sharedObjects = dict["shared"].Result;
                        nextSince = GetLatestDateBefore(nextSince, sharedObjects);

                        //DON'T process trashed files
                        //If some files are deleted and added again to a folder, they will be deleted
                        //even though they are new.
                        //We would have to check file dates and hashes to ensure that a trashed file
                        //can be deleted safely from the local hard drive.
                        /*
                        //Items with the same name, hash may be both in the container and the trash
                        //Don't delete items that exist in the container
                        var realTrash = from trash in trashObjects
                                        where !remoteObjects.Any(
                                            info => info.Name == trash.Name && info.Hash == trash.Hash)
                                        select trash;
                        ProcessTrashedFiles(accountInfo, realTrash);
                        */

                        var cleanRemotes = (from info in remoteObjects.Union(sharedObjects)
                                            let name = info.Name ?? ""
                                            where !name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase) &&
                                                  !name.StartsWith(FolderConstants.CacheFolder + "/", StringComparison.InvariantCultureIgnoreCase)
                                            select info).ToList();

                        if (_firstPoll)
                            StatusKeeper.CleanupOrphanStates();

                        StatusKeeper.CleanupStaleStates(accountInfo, cleanRemotes);

                        //var differencer = _differencer.PostSnapshot(accountInfo, cleanRemotes);
                        //var filterUris = Selectives.SelectiveUris[accountInfo.AccountKey];

                        //Get the local files here
                        var agent = AgentLocator<FileAgent>.Get(accountInfo.AccountPath);
                        var files = LoadLocalFileTuples(accountInfo);

                        var states = FileState.Queryable.ToList();

                        var infos = (from remote in cleanRemotes
                                     let path = remote.RelativeUrlToFilePath(accountInfo.UserName)
                                     let info = agent.GetFileSystemInfo(path)
                                     select Tuple.Create(info.FullName, remote))
                            .ToList();

                        var token = _currentOperationCancellation.Token;

                        var tuples = MergeSources(infos, files, states).ToList();

                        var stateTuples = accountBatch == null
                            ? tuples
                            : tuples.Where(t => accountBatch.Contains(t.FilePath));

                        foreach (var tuple in stateTuples)
                        {
                            await _unPauseEvent.WaitAsync();

                            //Set the Merkle Hash
                            SetMerkleHash(accountInfo, tuple);

                            SyncSingleItem(accountInfo, tuple, agent, token);
                        }

                        //On the first run
                        /*
                        if (_firstPoll)
                        {
                            MarkSuspectedDeletes(accountInfo, cleanRemotes);
                        }
                        */

                        Log.Info("[LISTENER] End Processing");
                    }
                }
                catch (Exception ex)
                {
                    Log.ErrorFormat("[FAIL] ListObjects for {0} in ProcessRemoteFiles with {1}", accountInfo.UserName, ex);
                    return nextSince;
                }

                Log.Info("[LISTENER] Finished");
                return nextSince;
            }
        }
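
        // Illustrative sketch (commented out): the per-container listings in ProcessAccountFiles are
        // started with the container name as the task's AsyncState, which is what allows the results
        // to be looked up by name ("shared", etc.) once all listings have completed. Simplified here
        // with a hypothetical fetchContainer delegate:
        /*
        Func<string, IList<ObjectInfo>> fetchContainer = name => client.ListObjects(accountInfo.UserName, name, since);
        var listings = containerNames
            .Select(name => Task<IList<ObjectInfo>>.Factory.StartNew(state => fetchContainer((string) state), name))
            .ToList();
        //...after awaiting all of them:
        var byContainer = listings.ToDictionary(t => (string) t.AsyncState, t => t.Result);
        */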

        private static void SetMerkleHash(AccountInfo accountInfo, StateTuple tuple)
        {
            //The Merkle hash for directories is that of an empty buffer
            if (tuple.FileInfo is DirectoryInfo)
                tuple.C = MERKLE_EMPTY;
            else if (tuple.FileState != null && tuple.MD5 == tuple.FileState.ShortHash)
            {
                //If there is a state whose MD5 matches, load the merkle hash from the file state
                //instead of calculating it
                tuple.C = tuple.FileState.Checksum;
            }
            else
            {
                tuple.Merkle = Signature.CalculateTreeHash(tuple.FileInfo, accountInfo.BlockSize, accountInfo.BlockHash);
                //tuple.C=tuple.Merkle.TopHash.ToHashString();
            }
        }

        private List<Tuple<FileSystemInfo, string>> LoadLocalFileTuples(AccountInfo accountInfo)
        {
            using (ThreadContext.Stacks["Account Files Hashing"].Push(accountInfo.UserName))
            {
                var localInfos = AgentLocator<FileAgent>.Get(accountInfo.AccountPath).EnumerateFileSystemInfos();

                //Use the queue to retry locked file hashing
                var fileQueue = new Queue<FileSystemInfo>(localInfos);

                var hasher = MD5.Create();
                var results = new List<Tuple<FileSystemInfo, string>>();
                while (fileQueue.Count > 0)
                {
                    var file = fileQueue.Dequeue();
                    using (ThreadContext.Stacks["File"].Push(file.FullName))
                    {
                        /*
                        Signature.CalculateTreeHash(file, accountInfo.BlockSize, accountInfo.BlockHash).
                            TopHash.ToHashString()
                        */
                        try
                        {
                            //Replace MD5 here, do the calc while syncing individual files
                            string hash;
                            if (file is DirectoryInfo)
                                hash = MERKLE_EMPTY;
                            else
                            {
                                using (StatusNotification.GetNotifier("Hashing ", "{0}", file.Name))
                                using (var stream = (file as FileInfo).OpenRead())
                                {
                                    hash = hasher.ComputeHash(stream).ToHashString();
                                }
                            }
                            results.Add(Tuple.Create(file, hash));
                        }
                        catch (IOException exc)
                        {
                            Log.WarnFormat("[HASH] File in use, will retry [{0}]", exc);
                            fileQueue.Enqueue(file);
                        }
                    }
                }

                return results;
            }
        }
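
        // Illustrative note on the two-level hashing above: LoadLocalFileTuples computes only a cheap
        // whole-file MD5 per item; the expensive block-based Merkle hash is computed later, in
        // SetMerkleHash, and only when the MD5 no longer matches the ShortHash recorded in the local
        // FileState. Example (values hypothetical):
        //   stored state : ShortHash = "9a0364b9...", Checksum = "c3ab8ff1..."
        //   on-disk MD5  = "9a0364b9..."  =>  reuse the stored Checksum, skip CalculateTreeHash
        //   on-disk MD5  = "b6d81b36..."  =>  file changed, recompute the tree hash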

        private void SyncSingleItem(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, CancellationToken token)
        {
            Log.DebugFormat("Sync [{0}] C:[{1}] L:[{2}] S:[{3}]", tuple.FilePath, tuple.C, tuple.L, tuple.S);

            var localFilePath = tuple.FilePath;

            //Don't use the tuple info, it may have been deleted
            var localInfo = FileInfoExtensions.FromPath(localFilePath);

            var isUnselectedRootFolder = agent.IsUnselectedRootFolder(tuple.FilePath);

            //Unselected root folders that have not yet been uploaded should be uploaded and added to the
            //selective folders
            if (!Selectives.IsSelected(accountInfo, localFilePath) &&
                !(isUnselectedRootFolder && tuple.ObjectInfo == null))
                return;

            // Local file unchanged? If both C and L are null, make sure it's because
            // both the file is missing and the state checksum is not missing
            if (tuple.C == tuple.L /*&& (localInfo.Exists || tuple.FileState == null)*/)
            {
                //No local changes
                //Server unchanged?
                if (tuple.S == tuple.L)
                {
                    // No server changes
                    //Has the file been renamed on the server?
                    MoveForServerMove(accountInfo, tuple);
                }
                else
                {
                    //Different from server
                    //Does the server file exist?
                    if (tuple.S == null)
                    {
                        //Server file doesn't exist
                        //deleteObjectFromLocal()
                        StatusKeeper.SetFileState(localFilePath, FileStatus.Deleted, FileOverlayStatus.Deleted, "");
                        agent.Delete(localFilePath);
                        //updateRecord(Remove C, L)
                        StatusKeeper.ClearFileStatus(localFilePath);
                    }
                    else
                    {
                        //Server file exists
                        //downloadServerObject() // Result: L = S
                        //If the file has moved on the server, move it locally before downloading
                        var targetPath = MoveForServerMove(accountInfo, tuple);

                        StatusKeeper.SetFileState(targetPath, FileStatus.Modified, FileOverlayStatus.Modified, "");

                        NetworkAgent.Downloader.DownloadCloudFile(accountInfo, tuple.ObjectInfo, targetPath, tuple.Merkle, token)
                            .Wait(token);
                        //updateRecord( L = S )
                        StatusKeeper.UpdateFileChecksum(targetPath, tuple.ObjectInfo.ETag, tuple.ObjectInfo.X_Object_Hash);

                        StatusKeeper.StoreInfo(targetPath, tuple.ObjectInfo);
                        /*
                        StatusKeeper.SetFileState(targetPath, FileStatus.Unchanged, FileOverlayStatus.Normal, "");
                        */
                    }
                }
            }
            else
            {
                //Local changes found

                //Server unchanged?
                if (tuple.S == tuple.L)
                {
                    //The FileAgent selective sync checks for new root folder files
                    if (!agent.Ignore(localFilePath))
                    {
                        if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null)
                        {
                            //deleteObjectFromServer()
                            DeleteCloudFile(accountInfo, tuple);
                            //updateRecord( Remove L, S)
                        }
                        else
                        {
                            //uploadLocalObject() // Result: S = C, L = S

                            //Debug.Assert(tuple.FileState !=null);
                            var action = new CloudUploadAction(accountInfo, localInfo, tuple.FileState,
                                                               accountInfo.BlockSize, accountInfo.BlockHash,
                                                               "Poll", isUnselectedRootFolder);

                            NetworkAgent.Uploader.UploadCloudFile(action, tuple.Merkle, token).Wait(token);

                            //updateRecord( S = C )
                            //State updated by the uploader

                            if (isUnselectedRootFolder)
                            {
                                ProcessChildren(accountInfo, tuple, agent, token);
                            }
                        }
                    }
                }
                else
                {
                    if (tuple.C == tuple.S)
                    {
                        // (Identical Changes) Result: L = S
                        //doNothing()

                        //Detect server moves
                        var targetPath = MoveForServerMove(accountInfo, tuple);
                        StatusKeeper.StoreInfo(targetPath, tuple.ObjectInfo);
                    }
                    else
                    {
                        if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null)
                        {
                            //deleteObjectFromServer()
                            DeleteCloudFile(accountInfo, tuple);
                            //updateRecord(Remove L, S)
                        }
                        //If both the local and server files are missing, the state is stale
                        else if (!localInfo.Exists && (tuple.S == null || tuple.ObjectInfo == null))
                        {
                            StatusKeeper.ClearFileStatus(localInfo.FullName);
                        }
                        else
                        {
                            ReportConflictForMismatch(localFilePath);
                            //identifyAsConflict() // Manual action required
                        }
                    }
                }
            }
        }
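
        // Illustrative example (paths hypothetical) of the rename detection performed by
        // MoveForServerMove below: the object matched an existing state by UUID, but the path derived
        // from the server listing differs from the local path, so the file is moved instead of being
        // deleted and re-downloaded.
        //   tuple.FilePath : C:\Users\alice\Pithos\docs\old-name.txt
        //   serverPath     : C:\Users\alice\Pithos\docs\new-name.txt
        //   => FileInfo.MoveTo(serverPath), then StatusKeeper.StoreInfo(serverPath, tuple.ObjectInfo)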

        private string MoveForServerMove(AccountInfo accountInfo, StateTuple tuple)
        {
            if (tuple.ObjectInfo == null)
                return null;
            var relativePath = tuple.ObjectInfo.RelativeUrlToFilePath(accountInfo.UserName);
            var serverPath = Path.Combine(accountInfo.AccountPath, relativePath);

            //Compare Case Insensitive
            if (String.Equals(tuple.FilePath, serverPath, StringComparison.InvariantCultureIgnoreCase))
                return serverPath;

            if (tuple.FileInfo.Exists)
            {
                var fi = tuple.FileInfo as FileInfo;
                if (fi != null)
                    fi.MoveTo(serverPath);
                var di = tuple.FileInfo as DirectoryInfo;
                if (di != null)
                    di.MoveTo(serverPath);
                StatusKeeper.StoreInfo(serverPath, tuple.ObjectInfo);
            }
            else
            {
                Debug.Assert(false, "File does not exist");
            }
            return serverPath;
        }

        private void DeleteCloudFile(AccountInfo accountInfo, StateTuple tuple)
        {
            StatusKeeper.SetFileState(tuple.FilePath, FileStatus.Deleted, FileOverlayStatus.Deleted, "");
            NetworkAgent.DeleteAgent.DeleteCloudFile(accountInfo, tuple.ObjectInfo);
            StatusKeeper.ClearFileStatus(tuple.FilePath);
        }

        private void ProcessChildren(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, CancellationToken token)
        {
            var dirInfo = tuple.FileInfo as DirectoryInfo;
            var folderTuples = from folder in dirInfo.EnumerateDirectories("*", SearchOption.AllDirectories)
                               select new StateTuple(folder);
            var fileTuples = from file in dirInfo.EnumerateFiles("*", SearchOption.AllDirectories)
                             select new StateTuple(file);

            //Process folders first, to ensure folders appear on the server as soon as possible
            folderTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, token));

            fileTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, token));
        }

        private static IEnumerable<StateTuple> MergeSources(
            IEnumerable<Tuple<string, ObjectInfo>> infos,
            IEnumerable<Tuple<FileSystemInfo, string>> files,
            IEnumerable<FileState> states)
        {
            var tuplesByPath = new Dictionary<string, StateTuple>();
            foreach (var file in files)
            {
                var fsInfo = file.Item1;
                var fileHash = fsInfo is DirectoryInfo ? MERKLE_EMPTY : file.Item2;

                tuplesByPath[fsInfo.FullName] = new StateTuple { FileInfo = fsInfo, MD5 = fileHash };
            }

            foreach (var state in states)
            {
                StateTuple hashTuple;
                if (tuplesByPath.TryGetValue(state.FilePath, out hashTuple))
                {
                    hashTuple.FileState = state;
                }
                else
                {
                    var fsInfo = FileInfoExtensions.FromPath(state.FilePath);
                    tuplesByPath[state.FilePath] = new StateTuple { FileInfo = fsInfo, FileState = state };
                }
            }

            var tuplesByID = tuplesByPath.Values
                .Where(tuple => tuple.FileState != null && tuple.FileState.ObjectID != null)
                .ToDictionary(tuple => tuple.FileState.ObjectID, tuple => tuple);

            foreach (var info in infos)
            {
                StateTuple hashTuple;
                var filePath = info.Item1;
                var objectInfo = info.Item2;
                var objectID = objectInfo.UUID;

                if (tuplesByID.TryGetValue(objectID, out hashTuple))
                {
                    hashTuple.ObjectInfo = objectInfo;
                }
                else if (tuplesByPath.TryGetValue(filePath, out hashTuple))
                {
                    hashTuple.ObjectInfo = objectInfo;
                }
                else
                {
                    var fsInfo = FileInfoExtensions.FromPath(filePath);
                    var tuple = new StateTuple { FileInfo = fsInfo, ObjectInfo = objectInfo };
                    tuplesByPath[filePath] = tuple;
                    tuplesByID[objectInfo.UUID] = tuple;
                }
            }
            return tuplesByPath.Values;
        }
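
        // Illustrative example (values hypothetical) of the three-way join performed by MergeSources
        // above, merging local files, stored FileStates and remote objects into a single StateTuple:
        //   local file    : ...\docs\a.txt                  (MD5 = m1)
        //   stored state  : ...\docs\a.txt, ObjectID = 7f3a (links the local path to the server UUID)
        //   remote object : UUID = 7f3a, name = docs/b.txt
        // The UUID match takes precedence over the path match, so all three land in the SAME tuple,
        // which later lets SyncSingleItem treat this as a server-side rename rather than a delete of
        // a.txt plus a download of b.txt.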

        /// <summary>
        /// Returns the latest LastModified date from the list of objects, but only if it is earlier
        /// than the threshold value
        /// </summary>
        /// <param name="threshold"></param>
        /// <param name="cloudObjects"></param>
        /// <returns></returns>
        private static DateTime? GetLatestDateBefore(DateTime? threshold, IList<ObjectInfo> cloudObjects)
        {
            DateTime? maxDate = null;
            if (cloudObjects != null && cloudObjects.Count > 0)
                maxDate = cloudObjects.Max(obj => obj.Last_Modified);
            if (maxDate == null || maxDate == DateTime.MinValue)
                return threshold;
            if (threshold == null || threshold == DateTime.MinValue || threshold > maxDate)
                return maxDate;
            return threshold;
        }

        /// <summary>
        /// Returns the latest LastModified date from the list of objects, but only if it is after
        /// the threshold value
        /// </summary>
        /// <param name="threshold"></param>
        /// <param name="cloudObjects"></param>
        /// <returns></returns>
        private static DateTime? GetLatestDateAfter(DateTime? threshold, IList<ObjectInfo> cloudObjects)
        {
            DateTime? maxDate = null;
            if (cloudObjects != null && cloudObjects.Count > 0)
                maxDate = cloudObjects.Max(obj => obj.Last_Modified);
            if (maxDate == null || maxDate == DateTime.MinValue)
                return threshold;
            if (threshold == null || threshold == DateTime.MinValue || threshold < maxDate)
                return maxDate;
            return threshold;
        }
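
        // Illustrative example (times hypothetical) of the two helpers above, starting from
        // threshold = 10:00:
        //   account objects, max(Last_Modified) = 10:07  =>  GetLatestDateAfter  returns 10:07
        //   shared objects,  max(Last_Modified) = 09:55  =>  GetLatestDateBefore returns 09:55
        //   shared objects,  max(Last_Modified) = 10:07  =>  GetLatestDateBefore returns 10:00
        // i.e. account listings can only move the polling window forward, while shared listings can
        // only pull it back, never push it past the account's own maximum.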

        readonly AccountsDifferencer _differencer = new AccountsDifferencer();
        private Dictionary<Uri, List<Uri>> _selectiveUris = new Dictionary<Uri, List<Uri>>();
        private bool _pause;

        private static string MERKLE_EMPTY = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";

        /// <summary>
        /// Deletes local files that are not found in the list of cloud files
        /// </summary>
        /// <param name="accountInfo"></param>
        /// <param name="cloudFiles"></param>
        private void ProcessDeletedFiles(AccountInfo accountInfo, IEnumerable<ObjectInfo> cloudFiles)
        {
            if (accountInfo == null)
                throw new ArgumentNullException("accountInfo");
            if (String.IsNullOrWhiteSpace(accountInfo.AccountPath))
                throw new ArgumentException("The AccountInfo.AccountPath is empty", "accountInfo");
            if (cloudFiles == null)
                throw new ArgumentNullException("cloudFiles");
            Contract.EndContractBlock();

            var deletedFiles = new List<FileSystemInfo>();
            foreach (var objectInfo in cloudFiles)
            {
                if (Log.IsDebugEnabled)
                    Log.DebugFormat("Handle deleted [{0}]", objectInfo.Uri);
                var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
                var item = FileAgent.GetFileAgent(accountInfo).GetFileSystemInfo(relativePath);
                if (Log.IsDebugEnabled)
                    Log.DebugFormat("Will delete [{0}] for [{1}]", item.FullName, objectInfo.Uri);
                if (item.Exists)
                {
                    if ((item.Attributes & FileAttributes.ReadOnly) == FileAttributes.ReadOnly)
                    {
                        item.Attributes = item.Attributes & ~FileAttributes.ReadOnly;
                    }
                    Log.DebugFormat("Deleting {0}", item.FullName);

                    var directory = item as DirectoryInfo;
                    if (directory != null)
                        directory.Delete(true);
                    else
                        item.Delete();
                    Log.DebugFormat("Deleted [{0}] for [{1}]", item.FullName, objectInfo.Uri);
                    DateTime lastDate;
                    _lastSeen.TryRemove(item.FullName, out lastDate);
                    deletedFiles.Add(item);
                }
                StatusKeeper.SetFileState(item.FullName, FileStatus.Deleted, FileOverlayStatus.Deleted, "File Deleted");
            }
            Log.InfoFormat("[{0}] files were deleted", deletedFiles.Count);
            StatusNotification.NotifyForFiles(deletedFiles, String.Format("{0} files were deleted", deletedFiles.Count),
                                              TraceLevel.Info);
        }

        private void MarkSuspectedDeletes(AccountInfo accountInfo, IEnumerable<ObjectInfo> cloudFiles)
        {
            //Only consider files that are not being modified, ie they are in the Unchanged state
            var deleteCandidates = FileState.Queryable.Where(state =>
                state.FilePath.StartsWith(accountInfo.AccountPath)
                && state.FileStatus == FileStatus.Unchanged).ToList();

            //TODO: filesToDelete must take into account the Others container
            var filesToDelete = (from deleteCandidate in deleteCandidates
                                 let localFile = FileInfoExtensions.FromPath(deleteCandidate.FilePath)
                                 let relativeFilePath = localFile.AsRelativeTo(accountInfo.AccountPath)
                                 where !cloudFiles.Any(r => r.RelativeUrlToFilePath(accountInfo.UserName) == relativeFilePath)
                                 select localFile).ToList();

            //Set the status of missing files to Conflict
            foreach (var item in filesToDelete)
            {
                //Try to acquire a gate on the file, to take into account files that have been dequeued
                //and are being processed
                using (var gate = NetworkGate.Acquire(item.FullName, NetworkOperation.Deleting))
                {
                    if (gate.Failed)
                        continue;
                    StatusKeeper.SetFileState(item.FullName, FileStatus.Conflict, FileOverlayStatus.Deleted,
                                              "Local file missing from server");
                }
            }
            UpdateStatus(PithosStatus.HasConflicts);
            StatusNotification.NotifyConflicts(filesToDelete,
                                               String.Format("{0} local files are missing from Pithos, possibly because they were deleted",
                                                             filesToDelete.Count));
            StatusNotification.NotifyForFiles(filesToDelete, String.Format("{0} files were deleted", filesToDelete.Count),
                                              TraceLevel.Info);
        }
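
        // Illustrative sketch (commented out): the NetworkGate pattern used in MarkSuspectedDeletes
        // above - acquire a named gate for the file and skip it if another network operation already
        // holds it, so a file that is still being processed is not re-flagged mid-operation.
        /*
        using (var gate = NetworkGate.Acquire(path, NetworkOperation.Deleting))
        {
            if (gate.Failed)
                return;     //someone is already working on this file; it will be revisited on the next poll
            //...safe to update the file's state here...
        }
        */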

        private void ReportConflictForMismatch(string localFilePath)
        {
            if (String.IsNullOrWhiteSpace(localFilePath))
                throw new ArgumentNullException("localFilePath");
            Contract.EndContractBlock();

            StatusKeeper.SetFileState(localFilePath, FileStatus.Conflict, FileOverlayStatus.Conflict, "File changed at the server");
            UpdateStatus(PithosStatus.HasConflicts);
            var message = String.Format("Conflict detected for file {0}", localFilePath);
            Log.Warn(message);
            StatusNotification.NotifyChange(message, TraceLevel.Warning);
        }

        /// <summary>
        /// Creates a Sync action for each changed server file
        /// </summary>
        /// <param name="accountInfo"></param>
        /// <param name="changes"></param>
        /// <returns></returns>
        private IEnumerable<CloudAction> ChangesToActions(AccountInfo accountInfo, IEnumerable<ObjectInfo> changes)
        {
            if (changes == null)
                throw new ArgumentNullException();
            Contract.EndContractBlock();
            var fileAgent = FileAgent.GetFileAgent(accountInfo);

            //In order to avoid multiple iterations over the files, we iterate only once
            //over the remote files
            foreach (var objectInfo in changes)
            {
                var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
                //If a directory object already exists, we may need to sync it
                if (fileAgent.Exists(relativePath))
                {
                    var localFile = fileAgent.GetFileSystemInfo(relativePath);
                    //We don't need to sync directories
                    if (objectInfo.IsDirectory && localFile is DirectoryInfo)
                        continue;
                    using (new SessionScope(FlushAction.Never))
                    {
                        var state = StatusKeeper.GetStateByFilePath(localFile.FullName);
                        _lastSeen[localFile.FullName] = DateTime.Now;
                        //Common files should be checked on a per-case basis to detect differences and determine which is newer
                        yield return new CloudAction(accountInfo, CloudActionType.MustSynch,
                                                     localFile, objectInfo, state, accountInfo.BlockSize,
                                                     accountInfo.BlockHash, "Poll Changes");
                    }
                }
                else
                {
                    //Remote files should be downloaded
                    yield return new CloudDownloadAction(accountInfo, objectInfo, "Poll Changes");
                }
            }
        }

        /// <summary>
        /// Creates a Local Move action for each moved server file
        /// </summary>
        /// <param name="accountInfo"></param>
        /// <param name="moves"></param>
        /// <returns></returns>
        private IEnumerable<CloudAction> MovesToActions(AccountInfo accountInfo, IEnumerable<ObjectInfo> moves)
        {
            if (moves == null)
                throw new ArgumentNullException();
            Contract.EndContractBlock();
            var fileAgent = FileAgent.GetFileAgent(accountInfo);

            //In order to avoid multiple iterations over the files, we iterate only once
            //over the remote files
            foreach (var objectInfo in moves)
            {
                var previousRelativepath = objectInfo.Previous.RelativeUrlToFilePath(accountInfo.UserName);
                //If the previous file already exists, we can execute a Move operation
                if (fileAgent.Exists(previousRelativepath))
                {
                    var previousFile = fileAgent.GetFileSystemInfo(previousRelativepath);
                    using (new SessionScope(FlushAction.Never))
                    {
                        var state = StatusKeeper.GetStateByFilePath(previousFile.FullName);
                        _lastSeen[previousFile.FullName] = DateTime.Now;

                        //For each moved object we need to move both the local file and update
                        yield return new CloudAction(accountInfo, CloudActionType.RenameLocal,
                                                     previousFile, objectInfo, state, accountInfo.BlockSize,
                                                     accountInfo.BlockHash, "Poll Moves");
                        //For modified files, we need to download the changes as well
                        if (objectInfo.X_Object_Hash != objectInfo.PreviousHash)
                            yield return new CloudDownloadAction(accountInfo, objectInfo, "Poll Moves");
                    }
                }
                //If the previous file does not exist, we need to download it in the new location
                else
                {
                    //Remote files should be downloaded
                    yield return new CloudDownloadAction(accountInfo, objectInfo, "Poll Moves");
                }
            }
        }

        /// <summary>
        /// Creates a download action for each new server file
        /// </summary>
        /// <param name="accountInfo"></param>
        /// <param name="creates"></param>
        /// <returns></returns>
        private IEnumerable<CloudAction> CreatesToActions(AccountInfo accountInfo, IEnumerable<ObjectInfo> creates)
        {
            if (creates == null)
                throw new ArgumentNullException();
            Contract.EndContractBlock();
            var fileAgent = FileAgent.GetFileAgent(accountInfo);

            //In order to avoid multiple iterations over the files, we iterate only once
            //over the remote files
            foreach (var objectInfo in creates)
            {
                if (Log.IsDebugEnabled)
                    Log.DebugFormat("[NEW INFO] {0}", objectInfo.Uri);

                var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);

                //If the object already exists, we should check before uploading or downloading
                if (fileAgent.Exists(relativePath))
                {
                    var localFile = fileAgent.GetFileSystemInfo(relativePath);
                    var state = StatusKeeper.GetStateByFilePath(localFile.WithProperCapitalization().FullName);
                    yield return new CloudAction(accountInfo, CloudActionType.MustSynch,
                                                 localFile, objectInfo, state, accountInfo.BlockSize,
                                                 accountInfo.BlockHash, "Poll Creates");
                }
                else
                {
                    //Remote files should be downloaded
                    yield return new CloudDownloadAction(accountInfo, objectInfo, "Poll Creates");
                }
            }
        }

        /// <summary>
        /// Notify the UI to update the visual status
        /// </summary>
        /// <param name="status"></param>
        private void UpdateStatus(PithosStatus status)
        {
            try
            {
                StatusNotification.SetPithosStatus(status);
                //StatusNotification.Notify(new Notification());
            }
            catch (Exception exc)
            {
                //Failure is not critical, just log it
                Log.Warn("Error while updating status", exc);
            }
        }

        private static void CreateContainerFolders(AccountInfo accountInfo, IEnumerable<ContainerInfo> containers)
        {
            var containerPaths = from container in containers
                                 let containerPath = Path.Combine(accountInfo.AccountPath, container.Name)
                                 where container.Name != FolderConstants.TrashContainer && !Directory.Exists(containerPath)
                                 select containerPath;

            foreach (var path in containerPaths)
            {
                Directory.CreateDirectory(path);
            }
        }

        public void AddAccount(AccountInfo accountInfo)
        {
            //Avoid adding a duplicate accountInfo
            _accounts.TryAdd(accountInfo.AccountKey, accountInfo);
        }

        public void RemoveAccount(AccountInfo accountInfo)
        {
            AccountInfo account;
            _accounts.TryRemove(accountInfo.AccountKey, out account);

            SnapshotDifferencer differencer;
            _differencer.Differencers.TryRemove(accountInfo.AccountKey, out differencer);
        }

        public void SetSelectivePaths(AccountInfo accountInfo, Uri[] added, Uri[] removed)
        {
            AbortRemovedPaths(accountInfo, removed);
            //DownloadNewPaths(accountInfo,added);
        }

        /*
        private void DownloadNewPaths(AccountInfo accountInfo, Uri[] added)
        {
            var client = new CloudFilesClient(accountInfo);
            foreach (var folderUri in added)
            {
                try
                {
                    string account;
                    string container;
                    var segmentsCount = folderUri.Segments.Length;
                    //Is this an account URL?
                    if (segmentsCount < 3)
                        continue;
                    //Is this a container or folder URL?
                    if (segmentsCount == 3)
                    {
                        account = folderUri.Segments[1].TrimEnd('/');
                        container = folderUri.Segments[2].TrimEnd('/');
                    }
                    else
                    {
                        account = folderUri.Segments[2].TrimEnd('/');
                        container = folderUri.Segments[3].TrimEnd('/');
                    }
                    IList<ObjectInfo> items;
                    if (segmentsCount > 3)
                    {
                        //List folder
                        var folder = String.Join("", folderUri.Segments.Splice(4));
                        items = client.ListObjects(account, container, folder);
                    }
                    else
                    {
                        //List container
                        items = client.ListObjects(account, container);
                    }
                    var actions = CreatesToActions(accountInfo, items);
                    foreach (var action in actions)
                    {
                        NetworkAgent.Post(action);
                    }
                }
                catch (Exception exc)
                {
                    Log.WarnFormat("Listing of new selective path [{0}] failed with \r\n{1}", folderUri, exc);
                }
            }
            //Need to get a listing of each of the URLs, then post them to the NetworkAgent
            //CreatesToActions(accountInfo,)
            /* NetworkAgent.Post();#1#
        }
        */
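
        // Illustrative example of the URI segment parsing in the commented-out DownloadNewPaths above
        // (the URL and the "v1" prefix are hypothetical):
        //   https://pithos.example.com/v1/alice/photos/2012/
        //   Segments: "/", "v1/", "alice/", "photos/", "2012/"   (Length = 5, i.e. > 3)
        //   => account = "alice", container = "photos", folder = "2012/"
        // With only three segments (https://pithos.example.com/alice/photos/) the account and
        // container are taken from Segments[1] and Segments[2] instead.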

        private void AbortRemovedPaths(AccountInfo accountInfo, Uri[] removed)
        {
            /*this.NetworkAgent.*/
        }
    }
}