X-Git-Url: https://code.grnet.gr/git/pithos-ms-client/blobdiff_plain/6f03d6e1b8b682db85b9515d423a57ad89d1fd93..88e2300a132f903fe892721360f12ec0ebb0824a:/trunk/Pithos.Core/Agents/PollAgent.cs diff --git a/trunk/Pithos.Core/Agents/PollAgent.cs b/trunk/Pithos.Core/Agents/PollAgent.cs index 3d3cb1b..6d5d89b 100644 --- a/trunk/Pithos.Core/Agents/PollAgent.cs +++ b/trunk/Pithos.Core/Agents/PollAgent.cs @@ -48,7 +48,6 @@ using System.IO; using System.Reflection; using System.Threading; using System.Threading.Tasks; -using Castle.ActiveRecord; using Pithos.Interfaces; using Pithos.Network; using log4net; @@ -59,6 +58,13 @@ namespace Pithos.Core.Agents using System.Collections.Generic; using System.Linq; + /*public class PollRequest + { + public DateTime? Since { get; set; } + public IEnumerable Batch { get; set; } + }*/ + + /// /// PollAgent periodically polls the server to detect object changes. The agent retrieves a listing of all /// objects and compares it with a previously cached version to detect differences. @@ -79,58 +85,185 @@ namespace Pithos.Core.Agents [System.ComponentModel.Composition.Import] public NetworkAgent NetworkAgent { get; set; } + [System.ComponentModel.Composition.Import] + public Selectives Selectives { get; set; } + public IStatusNotification StatusNotification { get; set; } + private CancellationTokenSource _currentOperationCancellation = new CancellationTokenSource(); + + public void CancelCurrentOperation() + { + //What does it mean to cancel the current upload/download? + //Obviously, the current operation will be cancelled by throwing + //a cancellation exception. + // + //The default behavior is to retry any operations that throw. + //Obviously this is not what we want in this situation. + //The cancelled operation should NOT bea retried. + // + //This can be done by catching the cancellation exception + //and avoiding the retry. + // + + //Have to reset the cancellation source - it is not possible to reset the source + //Have to prevent a case where an operation requests a token from the old source + var oldSource = Interlocked.Exchange(ref _currentOperationCancellation, new CancellationTokenSource()); + oldSource.Cancel(); + + } + + public bool Pause + { + get { + return _pause; + } + set { + _pause = value; + if (!_pause) + _unPauseEvent.Set(); + else + { + _unPauseEvent.Reset(); + } + } + } + + public CancellationToken CancellationToken + { + get { return _currentOperationCancellation.Token; } + } + private bool _firstPoll = true; //The Sync Event signals a manual synchronisation private readonly AsyncManualResetEvent _syncEvent = new AsyncManualResetEvent(); + private readonly AsyncManualResetEvent _unPauseEvent = new AsyncManualResetEvent(true); + private readonly ConcurrentDictionary _lastSeen = new ConcurrentDictionary(); - private readonly ConcurrentDictionary _accounts = new ConcurrentDictionary(); + private readonly ConcurrentDictionary _accounts = new ConcurrentDictionary(); + + //private readonly ActionBlock _pollAction; + readonly HashSet _knownContainers = new HashSet(); + /// /// Start a manual synchronization /// - public void SynchNow() - { - _syncEvent.Set(); + public void SynchNow(IEnumerable paths=null) + { + _batchQueue.Enqueue(paths); + _syncEvent.SetAsync(); + + //_pollAction.Post(new PollRequest {Batch = paths}); + } + + /// + /// Start a manual synchronization + /// + public Task SynchNowAsync(IEnumerable paths=null) + { + _batchQueue.Enqueue(paths); + return _syncEvent.SetAsync(); + + //_pollAction.Post(new PollRequest {Batch = paths}); + } + + readonly ConcurrentQueue> _batchQueue=new ConcurrentQueue>(); + + ConcurrentDictionary _moves=new ConcurrentDictionary(); + + public void PostMove(MovedEventArgs args) + { + TaskEx.Run(() => _moves.AddOrUpdate(args.OldFullPath, args,(s,e)=>e)); } + + private bool _hasConnection; + /// /// Remote files are polled periodically. Any changes are processed /// /// /// - public async Task PollRemoteFiles(DateTime? since = null) + public async Task PollRemoteFiles(DateTimeOffset? since = null) { if (Log.IsDebugEnabled) Log.DebugFormat("Polling changes after [{0}]",since); Debug.Assert(Thread.CurrentThread.IsBackground, "Polling Ended up in the main thread!"); - + //GC.Collect(); + + using (ThreadContext.Stacks["Retrieve Remote"].Push("All accounts")) { //If this poll fails, we will retry with the same since value - var nextSince = since; + DateTimeOffset? nextSince = since; try { + _unPauseEvent.Wait(); UpdateStatus(PithosStatus.PollSyncing); - var tasks = from accountInfo in _accounts.Values - select ProcessAccountFiles(accountInfo, since); + if (!NetworkAgent.IsConnectedToInternet) + { + if (_hasConnection) + { + StatusNotification.Notify(new Notification + { + Level = TraceLevel.Error, + Title = "Internet Connection problem", + Message ="Internet connectivity was lost. Synchronization will continue when connectivity is restored" + }); + } + _hasConnection = false; + } + else + { + if (!_hasConnection) + { + StatusNotification.Notify(new Notification + { + Level = TraceLevel.Info, + Title = "Internet Connection", + Message = "Internet connectivity restored." + }); + } + _hasConnection = true; + + var accountBatches = new Dictionary>(); + IEnumerable batch = null; + if (_batchQueue.TryDequeue(out batch) && batch != null) + foreach (var account in _accounts.Values) + { + var accountBatch = batch.Where(path => path.IsAtOrBelow(account.AccountPath)); + accountBatches[account.AccountKey] = accountBatch; + } + + var moves = Interlocked.Exchange(ref _moves, new ConcurrentDictionary()); - var nextTimes=await TaskEx.WhenAll(tasks.ToList()); + var tasks = new List>(); + foreach (var accountInfo in _accounts.Values) + { + IEnumerable accountBatch; + accountBatches.TryGetValue(accountInfo.AccountKey, out accountBatch); + var t = ProcessAccountFiles(accountInfo, accountBatch, moves, since); + tasks.Add(t); + } - _firstPoll = false; - //Reschedule the poll with the current timestamp as a "since" value + var taskList = tasks.ToList(); + var nextTimes = await TaskEx.WhenAll(taskList).ConfigureAwait(false); - if (nextTimes.Length>0) - nextSince = nextTimes.Min(); - if (Log.IsDebugEnabled) - Log.DebugFormat("Next Poll at [{0}]",nextSince); + _firstPoll = false; + //Reschedule the poll with the current timestamp as a "since" value + + if (nextTimes.Length > 0) + nextSince = nextTimes.Min(); + if (Log.IsDebugEnabled) + Log.DebugFormat("Next Poll for changes since [{0}]", nextSince); + } } catch (Exception ex) { @@ -145,12 +278,15 @@ namespace Pithos.Core.Agents try { //Wait for the polling interval to pass or the Sync event to be signalled - nextSince = await WaitForScheduledOrManualPoll(nextSince); + nextSince = await WaitForScheduledOrManualPoll(nextSince).ConfigureAwait(false); } finally { //Ensure polling is scheduled even in case of error - TaskEx.Run(() => PollRemoteFiles(nextSince)); +#pragma warning disable 4014 + TaskEx.Run(()=>PollRemoteFiles(nextSince)); +#pragma warning restore 4014 + //_pollAction.Post(new PollRequest {Since = nextSince}); } } } @@ -160,27 +296,35 @@ namespace Pithos.Core.Agents /// /// /// - private async Task WaitForScheduledOrManualPoll(DateTime? since) + private async Task WaitForScheduledOrManualPoll(DateTimeOffset? since) { var sync = _syncEvent.WaitAsync(); - var wait = TaskEx.Delay(TimeSpan.FromSeconds(Settings.PollingInterval), NetworkAgent.CancellationToken); - var signaledTask = await TaskEx.WhenAny(sync, wait); + var delay = TimeSpan.FromSeconds(Settings.PollingInterval); + if (Log.IsDebugEnabled) + Log.DebugFormat("Next Poll at [{0}]", DateTime.Now.Add(delay)); + var wait = TaskEx.Delay(delay); + var signaledTask = await TaskEx.WhenAny(sync, wait).ConfigureAwait(false); + + //Pausing takes precedence over manual sync or awaiting + _unPauseEvent.Wait(); + //Wait for network processing to finish before polling var pauseTask=NetworkAgent.ProceedEvent.WaitAsync(); - await TaskEx.WhenAll(signaledTask, pauseTask); + await TaskEx.WhenAll(signaledTask, pauseTask).ConfigureAwait(false); //If polling is signalled by SynchNow, ignore the since tag if (sync.IsCompleted) - { - //TODO: Must convert to AutoReset + { _syncEvent.Reset(); return null; } return since; } - public async Task ProcessAccountFiles(AccountInfo accountInfo, DateTime? since = null) + + + public async Task ProcessAccountFiles(AccountInfo accountInfo, IEnumerable accountBatch, ConcurrentDictionary moves, DateTimeOffset? since = null) { if (accountInfo == null) throw new ArgumentNullException("accountInfo"); @@ -189,17 +333,18 @@ namespace Pithos.Core.Agents Contract.EndContractBlock(); - using (log4net.ThreadContext.Stacks["Retrieve Remote"].Push(accountInfo.UserName)) + using (ThreadContext.Stacks["Retrieve Remote"].Push(accountInfo.UserName)) { - await NetworkAgent.GetDeleteAwaiter(); + await NetworkAgent.GetDeleteAwaiter().ConfigureAwait(false); Log.Info("Scheduled"); var client = new CloudFilesClient(accountInfo); //We don't need to check the trash container - var containers = client.ListContainers(accountInfo.UserName) - .Where(c=>c.Name!="trash") + var allContainers=await client.ListContainers(accountInfo.UserName).ConfigureAwait(false); + var containers = allContainers + .Where(c=>c.Name.ToString()!="trash") .ToList(); @@ -208,42 +353,52 @@ namespace Pithos.Core.Agents //The nextSince time fallback time is the same as the current. //If polling succeeds, the next Since time will be the smallest of the maximum modification times //of the shared and account objects - var nextSince = since; + DateTimeOffset? nextSince = since; try { //Wait for any deletions to finish - await NetworkAgent.GetDeleteAwaiter(); + await NetworkAgent.GetDeleteAwaiter().ConfigureAwait(false); //Get the poll time now. We may miss some deletions but it's better to keep a file that was deleted //than delete a file that was created while we were executing the poll + var token = _currentOperationCancellation.Token; + //Get the list of server objects changed since the last check //The name of the container is passed as state in order to create a dictionary of tasks in a subsequent step var listObjects = (from container in containers select Task>.Factory.StartNew(_ => - client.ListObjects(accountInfo.UserName, container.Name, since), container.Name)).ToList(); - - var listShared = Task>.Factory.StartNew(_ => - client.ListSharedObjects(since), "shared"); + client.ListObjects(accountInfo.UserName, container.Name, since), container.Name,token)).ToList(); + + var selectiveEnabled = Selectives.IsSelectiveEnabled(accountInfo.AccountKey); + var listShared = selectiveEnabled? + Task>.Factory.StartNew(_ => + client.ListSharedObjects(_knownContainers,since), "shared",token) + :Task.Factory.FromResult((IList) new List(),"shared"); + listObjects.Add(listShared); - var listTasks = await Task.Factory.WhenAll(listObjects.ToArray()); + var listTasks = await Task.Factory.WhenAll(listObjects.ToArray()).ConfigureAwait(false); - using (log4net.ThreadContext.Stacks["SCHEDULE"].Push("Process Results")) + using (ThreadContext.Stacks["SCHEDULE"].Push("Process Results")) { + + //In case of cancellation, retry for the current date + if (token.IsCancellationRequested) return since; + var dict = listTasks.ToDictionary(t => t.AsyncState); //Get all non-trash objects. Remember, the container name is stored in AsyncState var remoteObjects = (from objectList in listTasks - where (string)objectList.AsyncState != "trash" + where objectList.AsyncState.ToString() != "trash" from obj in objectList.Result + orderby obj.Bytes ascending select obj).ToList(); //Get the latest remote object modification date, only if it is after - //the original since date + //the original since date nextSince = GetLatestDateAfter(nextSince, remoteObjects); var sharedObjects = dict["shared"].Result; - nextSince = GetLatestDateBefore(nextSince, sharedObjects); //DON'T process trashed files //If some files are deleted and added again to a folder, they will be deleted @@ -257,49 +412,93 @@ namespace Pithos.Core.Agents where !remoteObjects.Any( info => info.Name == trash.Name && info.Hash == trash.Hash) - select trash; + 8 select trash; ProcessTrashedFiles(accountInfo, realTrash); */ var cleanRemotes = (from info in remoteObjects.Union(sharedObjects) - let name = info.Name??"" + let name = info.Name.ToUnescapedString()??"" where !name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase) && !name.StartsWith(FolderConstants.CacheFolder + "/", StringComparison.InvariantCultureIgnoreCase) select info).ToList(); + //In case of cancellation, retry for the current date + if (token.IsCancellationRequested) return since; + + if (_firstPoll) + StatusKeeper.CleanupOrphanStates(); + var differencer = _differencer.PostSnapshot(accountInfo, cleanRemotes); + var currentRemotes = differencer.Current.ToList(); - ProcessDeletedFiles(accountInfo, differencer.Deleted.FilterDirectlyBelow(SelectiveUris)); + //In case of cancellation, retry for the current date + if (token.IsCancellationRequested) return since; + + StatusKeeper.CleanupStaleStates(accountInfo, currentRemotes); - // @@@ NEED To add previous state here as well, To compare with previous hash + //var filterUris = Selectives.SelectiveUris[accountInfo.AccountKey]; - + //May have to wait if the FileAgent has asked for a Pause, due to local changes + await _unPauseEvent.WaitAsync().ConfigureAwait(false); + + //In case of cancellation, retry for the current date + if (token.IsCancellationRequested) return since; + + //Get the local files here + var agent = AgentLocator.Get(accountInfo.AccountPath); + var files = LoadLocalFileTuples(accountInfo, accountBatch); - //Create a list of actions from the remote files - var allActions = MovesToActions(accountInfo,differencer.Moved.FilterDirectlyBelow(SelectiveUris)) - .Union( - ChangesToActions(accountInfo, differencer.Changed.FilterDirectlyBelow(SelectiveUris))) - .Union( - CreatesToActions(accountInfo, differencer.Created.FilterDirectlyBelow(SelectiveUris))); - //And remove those that are already being processed by the agent - var distinctActions = allActions - .Except(NetworkAgent.GetEnumerable(), new PithosMonitor.LocalFileComparer()) - .ToList(); + + //WARNING: GetFileSystemInfo may create state entries. + //TODO: Find a different way to create the tuples and block long filenames + var infos = (from remote in currentRemotes + let path = remote.RelativeUrlToFilePath(accountInfo.UserName) + let info=agent.GetFileSystemInfo(path) + where info != null + select Tuple.Create(info.FullName,remote)) + .ToList(); - //Queue all the actions - foreach (var message in distinctActions) + var states = StatusKeeper.GetAllStates(); + + var tupleBuilder = new TupleBuilder(CancellationToken,StatusKeeper,StatusNotification,Settings); + + var tuples = tupleBuilder.MergeSources(infos, files, states,moves).ToList(); + + var processedPaths = new HashSet(); + //Process only the changes in the batch file, if one exists + var stateTuples = accountBatch==null?tuples:tuples.Where(t => accountBatch.Contains(t.FilePath)); + foreach (var tuple in stateTuples.Where(s=>!s.Locked)) + { + await _unPauseEvent.WaitAsync().ConfigureAwait(false); + + //In case of cancellation, retry for the current date + if (token.IsCancellationRequested) return since; + + //Set the Merkle Hash + //SetMerkleHash(accountInfo, tuple); + + await SyncSingleItem(accountInfo, tuple, agent, moves,processedPaths,token).ConfigureAwait(false); + + } + + + //On the first run +/* + if (_firstPoll) { - NetworkAgent.Post(message); + MarkSuspectedDeletes(accountInfo, cleanRemotes); } +*/ + Log.Info("[LISTENER] End Processing"); } } catch (Exception ex) { - Log.ErrorFormat("[FAIL] ListObjects for{0} in ProcessRemoteFiles with {1}", accountInfo.UserName, ex); + Log.ErrorFormat("[FAIL] ListObjects for {0} in ProcessRemoteFiles with {1}", accountInfo.UserName, ex); return nextSince; } @@ -307,255 +506,533 @@ namespace Pithos.Core.Agents return nextSince; } } +/* - /// - /// Returns the latest LastModified date from the list of objects, but only if it is before - /// than the threshold value - /// - /// - /// - /// - private static DateTime? GetLatestDateBefore(DateTime? threshold, IList cloudObjects) + private static void SetMerkleHash(AccountInfo accountInfo, StateTuple tuple) { - DateTime? maxDate = null; - if (cloudObjects!=null && cloudObjects.Count > 0) - maxDate = cloudObjects.Max(obj => obj.Last_Modified); - if (maxDate == null || maxDate == DateTime.MinValue) - return threshold; - if (threshold == null || threshold == DateTime.MinValue || threshold > maxDate) - return maxDate; - return threshold; + //The Merkle hash for directories is that of an empty buffer + if (tuple.FileInfo is DirectoryInfo) + tuple.C = MERKLE_EMPTY; + else if (tuple.FileState != null && tuple.MD5 == tuple.FileState.ETag) + { + //If there is a state whose MD5 matches, load the merkle hash from the file state + //insteaf of calculating it + tuple.C = tuple.FileState.Checksum; + } + else + { + tuple.Merkle = Signature.CalculateTreeHashAsync((FileInfo)tuple.FileInfo, accountInfo.BlockSize, accountInfo.BlockHash,1,progress); + //tuple.C=tuple.Merkle.TopHash.ToHashString(); + } + } +*/ + + private IEnumerable LoadLocalFileTuples(AccountInfo accountInfo,IEnumerable batch ) + { + using (ThreadContext.Stacks["Account Files Hashing"].Push(accountInfo.UserName)) + { + var batchPaths = (batch==null)?new List():batch.ToList(); + IEnumerable localInfos=AgentLocator.Get(accountInfo.AccountPath) + .EnumerateFileSystemInfos(); + if (batchPaths.Count>0) + localInfos= localInfos.Where(fi => batchPaths.Contains(fi.FullName)); + + return localInfos; + } } /// - /// Returns the latest LastModified date from the list of objects, but only if it is after - /// the threshold value + /// Wait and Pause the agent while waiting /// - /// - /// + /// /// - private static DateTime? GetLatestDateAfter(DateTime? threshold, IList cloudObjects) + private async Task PauseFor(int backoff) { - DateTime? maxDate = null; - if (cloudObjects!=null && cloudObjects.Count > 0) - maxDate = cloudObjects.Max(obj => obj.Last_Modified); - if (maxDate == null || maxDate == DateTime.MinValue) - return threshold; - if (threshold == null || threshold == DateTime.MinValue || threshold < maxDate) - return maxDate; - return threshold; - } - readonly AccountsDifferencer _differencer = new AccountsDifferencer(); - private List _selectiveUris=new List(); + Pause = true; + await TaskEx.Delay(backoff).ConfigureAwait(false); + Pause = false; + } - /// - /// Deletes local files that are not found in the list of cloud files - /// - /// - /// - private void ProcessDeletedFiles(AccountInfo accountInfo, IEnumerable cloudFiles) + private async Task SyncSingleItem(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, ConcurrentDictionary moves,HashSet processedPaths, CancellationToken token) { - if (accountInfo == null) - throw new ArgumentNullException("accountInfo"); - if (String.IsNullOrWhiteSpace(accountInfo.AccountPath)) - throw new ArgumentException("The AccountInfo.AccountPath is empty", "accountInfo"); - if (cloudFiles == null) - throw new ArgumentNullException("cloudFiles"); - Contract.EndContractBlock(); + Log.DebugFormat("Sync [{0}] C:[{1}] L:[{2}] S:[{3}]", tuple.FilePath, tuple.C, tuple.L, tuple.S); - //On the first run - if (_firstPoll) + //If the processed paths already contain the current path, exit + if (!processedPaths.Add(tuple.FilePath)) + return; + + try { - //Only consider files that are not being modified, ie they are in the Unchanged state - var deleteCandidates = FileState.Queryable.Where(state => - state.FilePath.StartsWith(accountInfo.AccountPath) - && state.FileStatus == FileStatus.Unchanged).ToList(); + bool isInferredParent = tuple.ObjectInfo != null && tuple.ObjectInfo.UUID.StartsWith("00000000-0000-0000"); + var localFilePath = tuple.FilePath; + //Don't use the tuple info, it may have been deleted + var localInfo = FileInfoExtensions.FromPath(localFilePath); - //TODO: filesToDelete must take into account the Others container - var filesToDelete = (from deleteCandidate in deleteCandidates - let localFile = FileInfoExtensions.FromPath(deleteCandidate.FilePath) - let relativeFilePath = localFile.AsRelativeTo(accountInfo.AccountPath) - where - !cloudFiles.Any(r => r.RelativeUrlToFilePath(accountInfo.UserName) == relativeFilePath) - select localFile).ToList(); + var isUnselectedRootFolder = agent.IsUnselectedRootFolder(tuple.FilePath); + //Unselected root folders that have not yet been uploaded should be uploaded and added to the + //selective folders - //Set the status of missing files to Conflict - foreach (var item in filesToDelete) + if (!Selectives.IsSelected(accountInfo, localFilePath) && + !(isUnselectedRootFolder && tuple.ObjectInfo == null)) + return; + + // Local file unchanged? If both C and L are null, make sure it's because + //both the file is missing and the state checksum is not missing + if (tuple.C == tuple.L /*&& (localInfo.Exists || tuple.FileState == null)*/) { - //Try to acquire a gate on the file, to take into account files that have been dequeued - //and are being processed - using (var gate = NetworkGate.Acquire(item.FullName, NetworkOperation.Deleting)) + //No local changes + //Server unchanged? + if (tuple.S == tuple.L) { - if (gate.Failed) - continue; - StatusKeeper.SetFileState(item.FullName, FileStatus.Conflict, FileOverlayStatus.Deleted); + // No server changes + //Has the file been renamed locally? + if (!await MoveForLocalMove(accountInfo,tuple)) + //Has the file been renamed on the server? + MoveForServerMove(accountInfo, tuple); } - } - UpdateStatus(PithosStatus.HasConflicts); - StatusNotification.NotifyConflicts(filesToDelete, String.Format("{0} local files are missing from Pithos, possibly because they were deleted", filesToDelete.Count)); - StatusNotification.NotifyForFiles(filesToDelete, String.Format("{0} files were deleted", filesToDelete.Count), TraceLevel.Info); - } - else - { - var deletedFiles = new List(); - foreach (var objectInfo in cloudFiles) - { - Log.DebugFormat("Handle deleted [{0}]",objectInfo.Uri); - var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName); - var item = FileAgent.GetFileAgent(accountInfo).GetFileSystemInfo(relativePath); - Log.DebugFormat("Will delete [{0}] for [{1}]", item.FullName,objectInfo.Uri); - if (item.Exists) + else { - if ((item.Attributes & FileAttributes.ReadOnly) == FileAttributes.ReadOnly) + //Different from server + //Does the server file exist? + if (tuple.S == null) + { + //Server file doesn't exist + //deleteObjectFromLocal() + using ( + StatusNotification.GetNotifier("Deleting local {0}", "Deleted local {0}",true, + localInfo.Name)) + { + DeleteLocalFile(agent, localFilePath); + } + } + else { - item.Attributes = item.Attributes & ~FileAttributes.ReadOnly; + //Server file exists + //downloadServerObject() // Result: L = S + //If the file has moved on the server, move it locally before downloading + using ( + StatusNotification.GetNotifier("Downloading {0}", "Downloaded {0}",true, + localInfo.Name)) + { + var targetPath = MoveForServerMove(accountInfo, tuple); + if (targetPath != null) + { + + await DownloadCloudFile(accountInfo, tuple, token, targetPath).ConfigureAwait(false); + + AddOwnFolderToSelectives(accountInfo, tuple, targetPath); + } + } + } + } + } + else + { + + //Local changes found + //Server unchanged? + if (tuple.S == tuple.L) + { + //The FileAgent selective sync checks for new root folder files + if (!agent.Ignore(localFilePath)) + { + if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null) + { + //deleteObjectFromServer() + DeleteCloudFile(accountInfo, tuple); + //updateRecord( Remove L, S) + } + else + { + //uploadLocalObject() // Result: S = C, L = S + var progress = new Progress(d => + StatusNotification.Notify(new StatusNotification(String.Format("Merkle Hashing for Upload {0:p} of {1}", d.Percentage, localInfo.Name)))); + + //Is it an unselected root folder + var isCreation = isUnselectedRootFolder ||//or a new folder under a selected parent? + (localInfo is DirectoryInfo && Selectives.IsSelected(accountInfo, localInfo) && tuple.FileState == null && tuple.ObjectInfo == null); + + + //Is this a result of a FILE move with no modifications? Then try to move it, + //to avoid an expensive hash + if (!await MoveForLocalMove(accountInfo, tuple)) + { + await UploadLocalFile(accountInfo, tuple, token, isCreation, localInfo,processedPaths, progress).ConfigureAwait(false); + } + + //updateRecord( S = C ) + //State updated by the uploader + + if (isCreation ) + { + ProcessChildren(accountInfo, tuple, agent, moves,processedPaths,token); + } + } + } + } + else + { + if (tuple.C == tuple.S) + { + // (Identical Changes) Result: L = S + //doNothing() + + //Don't update anything for nonexistend server files + if (tuple.S != null) + { + //Detect server moves + var targetPath = MoveForServerMove(accountInfo, tuple); + if (targetPath != null) + { + Debug.Assert(tuple.Merkle != null); + StatusKeeper.StoreInfo(targetPath, tuple.ObjectInfo, tuple.Merkle); + + AddOwnFolderToSelectives(accountInfo, tuple, targetPath); + } + } + else + { + //At this point, C==S==NULL and we have a stale state (L) + //Log the stale tuple for investigation + Log.WarnFormat("Stale tuple detected FilePathPath:[{0}], State:[{1}], LocalFile:[{2}]", tuple.FilePath, tuple.FileState, tuple.FileInfo); + + //And remove it + if (!String.IsNullOrWhiteSpace(tuple.FilePath)) + StatusKeeper.ClearFileStatus(tuple.FilePath); + } } - var directory = item as DirectoryInfo; - if (directory!=null) - directory.Delete(true); else - item.Delete(); - Log.DebugFormat("Deleted [{0}] for [{1}]", item.FullName, objectInfo.Uri); - DateTime lastDate; - _lastSeen.TryRemove(item.FullName, out lastDate); - deletedFiles.Add(item); + { + if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null) + { + //deleteObjectFromServer() + DeleteCloudFile(accountInfo, tuple); + //updateRecord(Remove L, S) + } + //If both the local and server files are missing, the state is stale + else if (!localInfo.Exists && (tuple.S == null || tuple.ObjectInfo == null)) + { + StatusKeeper.ClearFileStatus(localInfo.FullName); + } + else + { + ReportConflictForMismatch(localFilePath); + //identifyAsConflict() // Manual action required + } + } } - StatusKeeper.SetFileState(item.FullName, FileStatus.Deleted, FileOverlayStatus.Deleted); } - Log.InfoFormat("[{0}] files were deleted",deletedFiles.Count); - StatusNotification.NotifyForFiles(deletedFiles, String.Format("{0} files were deleted", deletedFiles.Count), TraceLevel.Info); } + catch (Exception exc) + { + //In case of error log and retry with the next poll + Log.ErrorFormat("[SYNC] Failed for file {0}. Will Retry.\r\n{1}",tuple.FilePath,exc); + } + } + private void DeleteLocalFile(FileAgent agent, string localFilePath) + { + StatusKeeper.SetFileState(localFilePath, FileStatus.Deleted, + FileOverlayStatus.Deleted, ""); + using (NetworkGate.Acquire(localFilePath, NetworkOperation.Deleting)) + { + agent.Delete(localFilePath); + } + //updateRecord(Remove C, L) + StatusKeeper.ClearFileStatus(localFilePath); } - /// - /// Creates a Sync action for each changed server file - /// - /// - /// - /// - private IEnumerable ChangesToActions(AccountInfo accountInfo, IEnumerable changes) + private async Task DownloadCloudFile(AccountInfo accountInfo, StateTuple tuple, CancellationToken token, string targetPath) + { + //Don't create a new state for non-existent files + if (File.Exists(targetPath) || Directory.Exists(targetPath)) + StatusKeeper.SetFileState(targetPath, FileStatus.Modified, FileOverlayStatus.Modified,""); + + var finalHash=await + NetworkAgent.Downloader.DownloadCloudFile(accountInfo, tuple.ObjectInfo, targetPath, + token) + .ConfigureAwait(false); + //updateRecord( L = S ) + StatusKeeper.UpdateFileChecksum(targetPath, tuple.ObjectInfo.ETag, + finalHash); + + StatusKeeper.StoreInfo(targetPath, tuple.ObjectInfo,finalHash); + } + + private async Task UploadLocalFile(AccountInfo accountInfo, StateTuple tuple, CancellationToken token, + bool isUnselectedRootFolder, FileSystemInfo localInfo, HashSet processedPaths, IProgress progress) { - if (changes == null) - throw new ArgumentNullException(); - Contract.EndContractBlock(); - var fileAgent = FileAgent.GetFileAgent(accountInfo); + var action = new CloudUploadAction(accountInfo, localInfo, tuple.FileState, + accountInfo.BlockSize, accountInfo.BlockHash, + "Poll", isUnselectedRootFolder, token, progress,tuple.Merkle); - //In order to avoid multiple iterations over the files, we iterate only once - //over the remote files - foreach (var objectInfo in changes) + using (StatusNotification.GetNotifier("Uploading {0}", "Uploaded {0}",true, + localInfo.Name)) { - var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName); - //If a directory object already exists, we may need to sync it - if (fileAgent.Exists(relativePath)) + await NetworkAgent.Uploader.UploadCloudFile(action, token).ConfigureAwait(false); + } + + if (isUnselectedRootFolder) + { + var dirActions =( + from dir in ((DirectoryInfo) localInfo).EnumerateDirectories("*", SearchOption.AllDirectories) + let subAction = new CloudUploadAction(accountInfo, dir, null, + accountInfo.BlockSize, accountInfo.BlockHash, + "Poll", true, token, progress) + select subAction).ToList(); + foreach (var dirAction in dirActions) { - var localFile = fileAgent.GetFileSystemInfo(relativePath); - //We don't need to sync directories - if (objectInfo.Content_Type == @"application/directory" && localFile is DirectoryInfo) - continue; - using (new SessionScope(FlushAction.Never)) - { - var state = StatusKeeper.GetStateByFilePath(localFile.FullName); - _lastSeen[localFile.FullName] = DateTime.Now; - //Common files should be checked on a per-case basis to detect differences, which is newer + processedPaths.Add(dirAction.LocalFile.FullName); + } + + await TaskEx.WhenAll(dirActions.Select(a=>NetworkAgent.Uploader.UploadCloudFile(a,token)).ToArray()); + } + } - yield return new CloudAction(accountInfo, CloudActionType.MustSynch, - localFile, objectInfo, state, accountInfo.BlockSize, - accountInfo.BlockHash); - } + private async Task MoveForLocalMove(AccountInfo accountInfo, StateTuple tuple) + { + //Is the previous path missing? + if (String.IsNullOrWhiteSpace(tuple.OldFullPath)) + return false; + //Has the file locally, in which case it should be uploaded rather than moved? + if (tuple.OldChecksum != tuple.Merkle.TopHash.ToHashString()) + return false; + + var relativePath = tuple.ObjectInfo.RelativeUrlToFilePath(accountInfo.UserName); + var serverPath = Path.Combine(accountInfo.AccountPath, relativePath); + //Has the file been renamed on the server? + if (!tuple.OldFullPath.Equals(serverPath)) + { + ReportConflictForDoubleRename(tuple.FilePath); + return false; + } + + try + { + + var client = new CloudFilesClient(accountInfo); + var objectInfo = CloudAction.CreateObjectInfoFor(accountInfo, tuple.FileInfo); + objectInfo.X_Object_Hash = tuple.Merkle.TopHash.ToHashString(); + var containerPath = Path.Combine(accountInfo.AccountPath, objectInfo.Container.ToUnescapedString()); + //TODO: SImplify these multiple conversions from and to Uris + var oldName = tuple.OldFullPath.AsRelativeTo(containerPath); + //Then execute a move instead of an upload + using (StatusNotification.GetNotifier("Moving {0}", "Moved {0}", true,tuple.FileInfo.Name)) + { + await client.MoveObject(objectInfo.Account, objectInfo.Container, oldName.Replace('\\','/').ToEscapedUri(), + objectInfo.Container, objectInfo.Name).ConfigureAwait(false); + StatusKeeper.MoveFileState(tuple.OldFullPath, tuple.FilePath, objectInfo, tuple.Merkle); + //StatusKeeper.StoreInfo(tuple.FilePath,objectInfo,tuple.Merkle); + //StatusKeeper.ClearFolderStatus(tuple.FilePath); } - else + return true; + } + catch (Exception exc) + { + Log.ErrorFormat("[MOVE] Failed for [{0}],:\r\n{1}", tuple.FilePath, exc); + //Return false to force an upload of the file + return false; + } + + } + + private void AddOwnFolderToSelectives(AccountInfo accountInfo, StateTuple tuple, string targetPath) + { + //Not for shared folders + if (tuple.ObjectInfo.IsShared==true) + return; + //Also ensure that any newly created folders are added to the selectives, if the original folder was selected + var containerPath = Path.Combine(accountInfo.AccountPath, tuple.ObjectInfo.Container.ToUnescapedString()); + + //If this is a root folder encountered for the first time + if (tuple.L == null && Directory.Exists(tuple.FileInfo.FullName) + && (tuple.FileInfo.FullName.IsAtOrBelow(containerPath))) + { + + var relativePath = tuple.ObjectInfo.RelativeUrlToFilePath(accountInfo.UserName); + var initialPath = Path.Combine(accountInfo.AccountPath, relativePath); + + //var hasMoved = true;// !initialPath.Equals(targetPath); + //If the new path is under a selected folder, add it to the selectives as well + if (Selectives.IsSelected(accountInfo, initialPath)) { - //Remote files should be downloaded - yield return new CloudDownloadAction(accountInfo, objectInfo); + Selectives.AddUri(accountInfo, tuple.ObjectInfo.Uri); + Selectives.Save(accountInfo); } } } - /// - /// Creates a Local Move action for each moved server file - /// - /// - /// - /// - private IEnumerable MovesToActions(AccountInfo accountInfo, IEnumerable moves) + private string MoveForServerMove(AccountInfo accountInfo, StateTuple tuple) { - if (moves == null) - throw new ArgumentNullException(); - Contract.EndContractBlock(); - var fileAgent = FileAgent.GetFileAgent(accountInfo); + if (tuple.ObjectInfo == null) + return null; + var relativePath = tuple.ObjectInfo.RelativeUrlToFilePath(accountInfo.UserName); + var serverPath = Path.Combine(accountInfo.AccountPath, relativePath); + + //Compare Case Insensitive + if (String.Equals(tuple.FilePath ,serverPath,StringComparison.InvariantCultureIgnoreCase)) + return serverPath; - //In order to avoid multiple iterations over the files, we iterate only once - //over the remote files - foreach (var objectInfo in moves) + //Has the file been renamed locally? + if (!String.IsNullOrWhiteSpace(tuple.OldFullPath) && !tuple.OldFullPath.Equals(tuple.FilePath)) { - var previousRelativepath = objectInfo.Previous.RelativeUrlToFilePath(accountInfo.UserName); - //If the previous file already exists, we can execute a Move operation - if (fileAgent.Exists(previousRelativepath)) + ReportConflictForDoubleRename(tuple.FilePath); + return null; + } + + tuple.FileInfo.Refresh(); + //The file/folder may not exist if it was moved because its parent moved + if (!tuple.FileInfo.Exists) + { + var target=FileInfoExtensions.FromPath(serverPath); + if (!target.Exists) { - var previousFile = fileAgent.GetFileSystemInfo(previousRelativepath); - using (new SessionScope(FlushAction.Never)) - { - var state = StatusKeeper.GetStateByFilePath(previousFile.FullName); - _lastSeen[previousFile.FullName] = DateTime.Now; - - //For each moved object we need to move both the local file and update - yield return new CloudAction(accountInfo, CloudActionType.RenameLocal, - previousFile, objectInfo, state, accountInfo.BlockSize, - accountInfo.BlockHash); - //For modified files, we need to download the changes as well - if (objectInfo.Hash!=objectInfo.PreviousHash) - yield return new CloudDownloadAction(accountInfo,objectInfo); - } + Log.ErrorFormat("No source or target found while trying to move {0} to {1}", tuple.FileInfo.FullName, serverPath); } - //If the previous file does not exist, we need to download it in the new location - else + return serverPath; + } + + using (StatusNotification.GetNotifier("Moving local {0}", "Moved local {0}", true,Path.GetFileName(tuple.FilePath))) + using(NetworkGate.Acquire(tuple.FilePath,NetworkOperation.Renaming)) + { + + var fi = tuple.FileInfo as FileInfo; + if (fi != null) + { + var targetFile = new FileInfo(serverPath); + if (!targetFile.Directory.Exists) + targetFile.Directory.Create(); + fi.MoveTo(serverPath); + } + var di = tuple.FileInfo as DirectoryInfo; + if (di != null) { - //Remote files should be downloaded - yield return new CloudDownloadAction(accountInfo, objectInfo); + var targetDir = new DirectoryInfo(serverPath); + if (!targetDir.Parent.Exists) + targetDir.Parent.Create(); + di.MoveTo(serverPath); } } + + StatusKeeper.StoreInfo(serverPath, tuple.ObjectInfo); + + return serverPath; + } + + private void DeleteCloudFile(AccountInfo accountInfo, StateTuple tuple) + { + using (StatusNotification.GetNotifier("Deleting server {0}", "Deleted server {0}", true,Path.GetFileName(tuple.FilePath))) + { + + StatusKeeper.SetFileState(tuple.FilePath, FileStatus.Deleted, + FileOverlayStatus.Deleted, ""); + NetworkAgent.DeleteAgent.DeleteCloudFile(accountInfo, tuple.ObjectInfo); + StatusKeeper.ClearFileStatus(tuple.FilePath); + } + } + + private void ProcessChildren(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, ConcurrentDictionary moves,HashSet processedPaths,CancellationToken token) + { + + var dirInfo = tuple.FileInfo as DirectoryInfo; + var folderTuples = from folder in dirInfo.EnumerateDirectories("*", SearchOption.AllDirectories) + select new StateTuple(folder){C=Signature.MERKLE_EMPTY}; + + var fileTuples = from file in dirInfo.EnumerateFiles("*", SearchOption.AllDirectories) + let state=StatusKeeper.GetStateByFilePath(file.FullName) + select new StateTuple(file){ + Merkle=StatusAgent.CalculateTreeHash(file,accountInfo,state, + Settings.HashingParallelism,token,null) + }; + + //Process folders first, to ensure folders appear on the sever as soon as possible + folderTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, moves, processedPaths,token).Wait()); + + fileTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, moves,processedPaths, token).Wait()); } + + + + + /// - /// Creates a download action for each new server file + /// Returns the latest LastModified date from the list of objects, but only if it is before + /// than the threshold value + /// + /// + /// + /// + private static DateTimeOffset? GetLatestDateBefore(DateTime? threshold, IList cloudObjects) + { + DateTimeOffset? maxDate = null; + if (cloudObjects!=null && cloudObjects.Count > 0) + maxDate = cloudObjects.Max(obj => obj.Last_Modified); + if (!maxDate.HasValue) + return threshold; + if (!threshold.HasValue|| threshold > maxDate) + return maxDate; + return threshold; + } + + /// + /// Returns the latest LastModified date from the list of objects, but only if it is after + /// the threshold value /// - /// - /// + /// + /// /// - private IEnumerable CreatesToActions(AccountInfo accountInfo, IEnumerable creates) + private static DateTimeOffset? GetLatestDateAfter(DateTimeOffset? threshold, IList cloudObjects) + { + DateTimeOffset? maxDate = null; + if (cloudObjects!=null && cloudObjects.Count > 0) + maxDate = cloudObjects.Max(obj => obj.Last_Modified); + if (!maxDate.HasValue) + return threshold; + if (!threshold.HasValue|| threshold < maxDate) + return maxDate; + return threshold; + } + + readonly AccountsDifferencer _differencer = new AccountsDifferencer(); + private bool _pause; + + + + + private void ReportConflictForMismatch(string localFilePath) { - if (creates == null) - throw new ArgumentNullException(); + if (String.IsNullOrWhiteSpace(localFilePath)) + throw new ArgumentNullException("localFilePath"); Contract.EndContractBlock(); - var fileAgent = FileAgent.GetFileAgent(accountInfo); - //In order to avoid multiple iterations over the files, we iterate only once - //over the remote files - foreach (var objectInfo in creates) - { - var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName); - //If the object already exists, we probably have a conflict - if (fileAgent.Exists(relativePath)) - { - //If a directory object already exists, we don't need to perform any other action - var localFile = fileAgent.GetFileSystemInfo(relativePath); - StatusKeeper.SetFileState(localFile.FullName, FileStatus.Conflict, FileOverlayStatus.Conflict); - } - else - { - //Remote files should be downloaded - yield return new CloudDownloadAction(accountInfo, objectInfo); - } - } + StatusKeeper.SetFileState(localFilePath, FileStatus.Conflict, FileOverlayStatus.Conflict, "File changed at the server"); + UpdateStatus(PithosStatus.HasConflicts); + var message = String.Format("Conflict detected for file {0}", localFilePath); + Log.Warn(message); + StatusNotification.NotifyChange(message, TraceLevel.Warning); } + private void ReportConflictForDoubleRename(string localFilePath) + { + if (String.IsNullOrWhiteSpace(localFilePath)) + throw new ArgumentNullException("localFilePath"); + Contract.EndContractBlock(); + + StatusKeeper.SetFileState(localFilePath, FileStatus.Conflict, FileOverlayStatus.Conflict, "File renamed both locally and on the server"); + UpdateStatus(PithosStatus.HasConflicts); + var message = String.Format("Double rename conflict detected for file {0}", localFilePath); + Log.Warn(message); + StatusNotification.NotifyChange(message, TraceLevel.Warning); + } + + /// /// Notify the UI to update the visual status /// @@ -577,8 +1054,8 @@ namespace Pithos.Core.Agents private static void CreateContainerFolders(AccountInfo accountInfo, IEnumerable containers) { var containerPaths = from container in containers - let containerPath = Path.Combine(accountInfo.AccountPath, container.Name) - where container.Name != FolderConstants.TrashContainer && !Directory.Exists(containerPath) + let containerPath = Path.Combine(accountInfo.AccountPath, container.Name.ToUnescapedString()) + where container.Name.ToString() != FolderConstants.TrashContainer && !Directory.Exists(containerPath) select containerPath; foreach (var path in containerPaths) @@ -587,27 +1064,23 @@ namespace Pithos.Core.Agents } } - public void SetSyncUris(Uri[] uris) - { - SelectiveUris=uris.ToList(); - } - - protected List SelectiveUris - { - get { return _selectiveUris;} - set { _selectiveUris = value; } - } - public void AddAccount(AccountInfo accountInfo) { //Avoid adding a duplicate accountInfo - _accounts.TryAdd(accountInfo.UserName, accountInfo); + _accounts.TryAdd(accountInfo.AccountKey, accountInfo); } public void RemoveAccount(AccountInfo accountInfo) { + if (accountInfo == null) + return; + AccountInfo account; - _accounts.TryRemove(accountInfo.UserName,out account); + _accounts.TryRemove(accountInfo.AccountKey, out account); + + SnapshotDifferencer differencer; + _differencer.Differencers.TryRemove(accountInfo.AccountKey, out differencer); } + } }