X-Git-Url: https://code.grnet.gr/git/pithos-ms-client/blobdiff_plain/759bd3c4ac30f60c4af5dcadf0ed4dded58f9535..88e2300a132f903fe892721360f12ec0ebb0824a:/trunk/Pithos.Core/Agents/PollAgent.cs diff --git a/trunk/Pithos.Core/Agents/PollAgent.cs b/trunk/Pithos.Core/Agents/PollAgent.cs index b1e1d0e..6d5d89b 100644 --- a/trunk/Pithos.Core/Agents/PollAgent.cs +++ b/trunk/Pithos.Core/Agents/PollAgent.cs @@ -45,10 +45,9 @@ using System.ComponentModel.Composition; using System.Diagnostics; using System.Diagnostics.Contracts; using System.IO; +using System.Reflection; using System.Threading; using System.Threading.Tasks; -using System.Threading.Tasks.Dataflow; -using Castle.ActiveRecord; using Pithos.Interfaces; using Pithos.Network; using log4net; @@ -58,7 +57,13 @@ namespace Pithos.Core.Agents using System; using System.Collections.Generic; using System.Linq; - using System.Text; + + /*public class PollRequest + { + public DateTime? Since { get; set; } + public IEnumerable Batch { get; set; } + }*/ + /// /// PollAgent periodically polls the server to detect object changes. The agent retrieves a listing of all @@ -69,7 +74,7 @@ namespace Pithos.Core.Agents [Export] public class PollAgent { - private static readonly ILog Log = LogManager.GetLogger("PollAgent"); + private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); [System.ComponentModel.Composition.Import] public IStatusKeeper StatusKeeper { get; set; } @@ -80,51 +85,185 @@ namespace Pithos.Core.Agents [System.ComponentModel.Composition.Import] public NetworkAgent NetworkAgent { get; set; } + [System.ComponentModel.Composition.Import] + public Selectives Selectives { get; set; } + public IStatusNotification StatusNotification { get; set; } + private CancellationTokenSource _currentOperationCancellation = new CancellationTokenSource(); + + public void CancelCurrentOperation() + { + //What does it mean to cancel the current upload/download? 
+ //Obviously, the current operation will be cancelled by throwing + //a cancellation exception. + // + //The default behavior is to retry any operations that throw. + //Obviously this is not what we want in this situation. + //The cancelled operation should NOT bea retried. + // + //This can be done by catching the cancellation exception + //and avoiding the retry. + // + + //Have to reset the cancellation source - it is not possible to reset the source + //Have to prevent a case where an operation requests a token from the old source + var oldSource = Interlocked.Exchange(ref _currentOperationCancellation, new CancellationTokenSource()); + oldSource.Cancel(); + + } + + public bool Pause + { + get { + return _pause; + } + set { + _pause = value; + if (!_pause) + _unPauseEvent.Set(); + else + { + _unPauseEvent.Reset(); + } + } + } + + public CancellationToken CancellationToken + { + get { return _currentOperationCancellation.Token; } + } + private bool _firstPoll = true; //The Sync Event signals a manual synchronisation private readonly AsyncManualResetEvent _syncEvent = new AsyncManualResetEvent(); - private ConcurrentDictionary _lastSeen = new ConcurrentDictionary(); - private readonly ConcurrentBag _accounts = new ConcurrentBag(); + private readonly AsyncManualResetEvent _unPauseEvent = new AsyncManualResetEvent(true); + + private readonly ConcurrentDictionary _lastSeen = new ConcurrentDictionary(); + private readonly ConcurrentDictionary _accounts = new ConcurrentDictionary(); + + //private readonly ActionBlock _pollAction; + readonly HashSet _knownContainers = new HashSet(); + /// /// Start a manual synchronization /// - public void SynchNow() - { - _syncEvent.Set(); + public void SynchNow(IEnumerable paths=null) + { + _batchQueue.Enqueue(paths); + _syncEvent.SetAsync(); + + //_pollAction.Post(new PollRequest {Batch = paths}); } - //Remote files are polled periodically. Any changes are processed - public async Task PollRemoteFiles(DateTime? 
since = null) + /// + /// Start a manual synchronization + /// + public Task SynchNowAsync(IEnumerable paths=null) { + _batchQueue.Enqueue(paths); + return _syncEvent.SetAsync(); + + //_pollAction.Post(new PollRequest {Batch = paths}); + } + + readonly ConcurrentQueue> _batchQueue=new ConcurrentQueue>(); + + ConcurrentDictionary _moves=new ConcurrentDictionary(); + + public void PostMove(MovedEventArgs args) + { + TaskEx.Run(() => _moves.AddOrUpdate(args.OldFullPath, args,(s,e)=>e)); + } + + + private bool _hasConnection; + + /// + /// Remote files are polled periodically. Any changes are processed + /// + /// + /// + public async Task PollRemoteFiles(DateTimeOffset? since = null) + { + if (Log.IsDebugEnabled) + Log.DebugFormat("Polling changes after [{0}]",since); + Debug.Assert(Thread.CurrentThread.IsBackground, "Polling Ended up in the main thread!"); - UpdateStatus(PithosStatus.Syncing); - StatusNotification.Notify(new PollNotification()); + //GC.Collect(); - using (log4net.ThreadContext.Stacks["Retrieve Remote"].Push("All accounts")) + + using (ThreadContext.Stacks["Retrieve Remote"].Push("All accounts")) { //If this poll fails, we will retry with the same since value - var nextSince = since; + DateTimeOffset? nextSince = since; try { - //Next time we will check for all changes since the current check minus 1 second - //This is done to ensure there are no discrepancies due to clock differences - var current = DateTime.Now.AddSeconds(-1); + _unPauseEvent.Wait(); + UpdateStatus(PithosStatus.PollSyncing); - var tasks = from accountInfo in _accounts - select ProcessAccountFiles(accountInfo, since); + if (!NetworkAgent.IsConnectedToInternet) + { + if (_hasConnection) + { + StatusNotification.Notify(new Notification + { + Level = TraceLevel.Error, + Title = "Internet Connection problem", + Message ="Internet connectivity was lost. 
Synchronization will continue when connectivity is restored" + }); + } + _hasConnection = false; + } + else + { + if (!_hasConnection) + { + StatusNotification.Notify(new Notification + { + Level = TraceLevel.Info, + Title = "Internet Connection", + Message = "Internet connectivity restored." + }); + } + _hasConnection = true; + + var accountBatches = new Dictionary>(); + IEnumerable batch = null; + if (_batchQueue.TryDequeue(out batch) && batch != null) + foreach (var account in _accounts.Values) + { + var accountBatch = batch.Where(path => path.IsAtOrBelow(account.AccountPath)); + accountBatches[account.AccountKey] = accountBatch; + } - await TaskEx.WhenAll(tasks.ToList()); + var moves = Interlocked.Exchange(ref _moves, new ConcurrentDictionary()); - _firstPoll = false; - //Reschedule the poll with the current timestamp as a "since" value - nextSince = current; + var tasks = new List>(); + foreach (var accountInfo in _accounts.Values) + { + IEnumerable accountBatch; + accountBatches.TryGetValue(accountInfo.AccountKey, out accountBatch); + var t = ProcessAccountFiles(accountInfo, accountBatch, moves, since); + tasks.Add(t); + } + + var taskList = tasks.ToList(); + var nextTimes = await TaskEx.WhenAll(taskList).ConfigureAwait(false); + + _firstPoll = false; + //Reschedule the poll with the current timestamp as a "since" value + + if (nextTimes.Length > 0) + nextSince = nextTimes.Min(); + if (Log.IsDebugEnabled) + Log.DebugFormat("Next Poll for changes since [{0}]", nextSince); + } } catch (Exception ex) { @@ -132,12 +271,23 @@ namespace Pithos.Core.Agents //In case of failure retry with the same "since" value } - UpdateStatus(PithosStatus.InSynch); - //Wait for the polling interval to pass or the Sync event to be signalled - nextSince = await WaitForScheduledOrManualPoll(nextSince); - - TaskEx.Run(()=>PollRemoteFiles(nextSince)); - + UpdateStatus(PithosStatus.PollComplete); + //The multiple try blocks are required because we can't have an await call + //inside a 
finally block + //TODO: Find a more elegant solution for reschedulling in the event of an exception + try + { + //Wait for the polling interval to pass or the Sync event to be signalled + nextSince = await WaitForScheduledOrManualPoll(nextSince).ConfigureAwait(false); + } + finally + { + //Ensure polling is scheduled even in case of error +#pragma warning disable 4014 + TaskEx.Run(()=>PollRemoteFiles(nextSince)); +#pragma warning restore 4014 + //_pollAction.Post(new PollRequest {Since = nextSince}); + } } } @@ -146,27 +296,35 @@ namespace Pithos.Core.Agents /// /// /// - private async Task WaitForScheduledOrManualPoll(DateTime? since) + private async Task WaitForScheduledOrManualPoll(DateTimeOffset? since) { var sync = _syncEvent.WaitAsync(); - var wait = TaskEx.Delay(TimeSpan.FromSeconds(Settings.PollingInterval), NetworkAgent.CancellationToken); - var signaledTask = await TaskEx.WhenAny(sync, wait); - + var delay = TimeSpan.FromSeconds(Settings.PollingInterval); + if (Log.IsDebugEnabled) + Log.DebugFormat("Next Poll at [{0}]", DateTime.Now.Add(delay)); + var wait = TaskEx.Delay(delay); + + var signaledTask = await TaskEx.WhenAny(sync, wait).ConfigureAwait(false); + + //Pausing takes precedence over manual sync or awaiting + _unPauseEvent.Wait(); + //Wait for network processing to finish before polling var pauseTask=NetworkAgent.ProceedEvent.WaitAsync(); - await TaskEx.WhenAll(signaledTask, pauseTask); + await TaskEx.WhenAll(signaledTask, pauseTask).ConfigureAwait(false); //If polling is signalled by SynchNow, ignore the since tag if (sync.IsCompleted) - { - //TODO: Must convert to AutoReset + { _syncEvent.Reset(); return null; } return since; } - public async Task ProcessAccountFiles(AccountInfo accountInfo, DateTime? since = null) + + + public async Task ProcessAccountFiles(AccountInfo accountInfo, IEnumerable accountBatch, ConcurrentDictionary moves, DateTimeOffset? 
since = null) { if (accountInfo == null) throw new ArgumentNullException("accountInfo"); @@ -175,47 +333,71 @@ namespace Pithos.Core.Agents Contract.EndContractBlock(); - using (log4net.ThreadContext.Stacks["Retrieve Remote"].Push(accountInfo.UserName)) + using (ThreadContext.Stacks["Retrieve Remote"].Push(accountInfo.UserName)) { - await NetworkAgent.GetDeleteAwaiter(); + + await NetworkAgent.GetDeleteAwaiter().ConfigureAwait(false); Log.Info("Scheduled"); var client = new CloudFilesClient(accountInfo); - var containers = client.ListContainers(accountInfo.UserName); + //We don't need to check the trash container + var allContainers=await client.ListContainers(accountInfo.UserName).ConfigureAwait(false); + var containers = allContainers + .Where(c=>c.Name.ToString()!="trash") + .ToList(); CreateContainerFolders(accountInfo, containers); + //The nextSince time fallback time is the same as the current. + //If polling succeeds, the next Since time will be the smallest of the maximum modification times + //of the shared and account objects + DateTimeOffset? nextSince = since; + try { //Wait for any deletions to finish - await NetworkAgent.GetDeleteAwaiter(); + await NetworkAgent.GetDeleteAwaiter().ConfigureAwait(false); //Get the poll time now. 
We may miss some deletions but it's better to keep a file that was deleted //than delete a file that was created while we were executing the poll - var pollTime = DateTime.Now; + + var token = _currentOperationCancellation.Token; //Get the list of server objects changed since the last check //The name of the container is passed as state in order to create a dictionary of tasks in a subsequent step var listObjects = (from container in containers select Task>.Factory.StartNew(_ => - client.ListObjects(accountInfo.UserName, container.Name, since), container.Name)).ToList(); - - var listShared = Task>.Factory.StartNew(_ => client.ListSharedObjects(since), "shared"); + client.ListObjects(accountInfo.UserName, container.Name, since), container.Name,token)).ToList(); + + var selectiveEnabled = Selectives.IsSelectiveEnabled(accountInfo.AccountKey); + var listShared = selectiveEnabled? + Task>.Factory.StartNew(_ => + client.ListSharedObjects(_knownContainers,since), "shared",token) + :Task.Factory.FromResult((IList) new List(),"shared"); + listObjects.Add(listShared); - var listTasks = await Task.Factory.WhenAll(listObjects.ToArray()); + var listTasks = await Task.Factory.WhenAll(listObjects.ToArray()).ConfigureAwait(false); - using (log4net.ThreadContext.Stacks["SCHEDULE"].Push("Process Results")) + using (ThreadContext.Stacks["SCHEDULE"].Push("Process Results")) { + + //In case of cancellation, retry for the current date + if (token.IsCancellationRequested) return since; + var dict = listTasks.ToDictionary(t => t.AsyncState); //Get all non-trash objects. 
Remember, the container name is stored in AsyncState - var remoteObjects = from objectList in listTasks - where (string)objectList.AsyncState != "trash" + var remoteObjects = (from objectList in listTasks + where objectList.AsyncState.ToString() != "trash" from obj in objectList.Result - select obj; + orderby obj.Bytes ascending + select obj).ToList(); + + //Get the latest remote object modification date, only if it is after + //the original since date + nextSince = GetLatestDateAfter(nextSince, remoteObjects); - var trashObjects = dict["trash"].Result; var sharedObjects = dict["shared"].Result; //DON'T process trashed files @@ -230,226 +412,650 @@ namespace Pithos.Core.Agents where !remoteObjects.Any( info => info.Name == trash.Name && info.Hash == trash.Hash) - select trash; + 8 select trash; ProcessTrashedFiles(accountInfo, realTrash); */ var cleanRemotes = (from info in remoteObjects.Union(sharedObjects) - let name = info.Name + let name = info.Name.ToUnescapedString()??"" where !name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase) && !name.StartsWith(FolderConstants.CacheFolder + "/", StringComparison.InvariantCultureIgnoreCase) select info).ToList(); + //In case of cancellation, retry for the current date + if (token.IsCancellationRequested) return since; + + if (_firstPoll) + StatusKeeper.CleanupOrphanStates(); + var differencer = _differencer.PostSnapshot(accountInfo, cleanRemotes); + var currentRemotes = differencer.Current.ToList(); - ProcessDeletedFiles(accountInfo, differencer.Deleted.FilterBelow(SelectiveUris), pollTime); + //In case of cancellation, retry for the current date + if (token.IsCancellationRequested) return since; + + StatusKeeper.CleanupStaleStates(accountInfo, currentRemotes); - // @@@ NEED To add previous state here as well, To compare with previous hash + //var filterUris = Selectives.SelectiveUris[accountInfo.AccountKey]; - + //May have to wait if the FileAgent has asked for a Pause, due to local changes + await 
_unPauseEvent.WaitAsync().ConfigureAwait(false); + + //In case of cancellation, retry for the current date + if (token.IsCancellationRequested) return since; + + //Get the local files here + var agent = AgentLocator.Get(accountInfo.AccountPath); + var files = LoadLocalFileTuples(accountInfo, accountBatch); + + + + //WARNING: GetFileSystemInfo may create state entries. + //TODO: Find a different way to create the tuples and block long filenames + var infos = (from remote in currentRemotes + let path = remote.RelativeUrlToFilePath(accountInfo.UserName) + let info=agent.GetFileSystemInfo(path) + where info != null + select Tuple.Create(info.FullName,remote)) + .ToList(); + + var states = StatusKeeper.GetAllStates(); + + var tupleBuilder = new TupleBuilder(CancellationToken,StatusKeeper,StatusNotification,Settings); + + var tuples = tupleBuilder.MergeSources(infos, files, states,moves).ToList(); + + var processedPaths = new HashSet(); + //Process only the changes in the batch file, if one exists + var stateTuples = accountBatch==null?tuples:tuples.Where(t => accountBatch.Contains(t.FilePath)); + foreach (var tuple in stateTuples.Where(s=>!s.Locked)) + { + await _unPauseEvent.WaitAsync().ConfigureAwait(false); - //Create a list of actions from the remote files - var allActions = ChangesToActions(accountInfo, differencer.Changed.FilterBelow(SelectiveUris)) - .Union( - CreatesToActions(accountInfo, differencer.Created.FilterBelow(SelectiveUris))); + //In case of cancellation, retry for the current date + if (token.IsCancellationRequested) return since; - //And remove those that are already being processed by the agent - var distinctActions = allActions - .Except(NetworkAgent.GetEnumerable(), new PithosMonitor.LocalFileComparer()) - .ToList(); + //Set the Merkle Hash + //SetMerkleHash(accountInfo, tuple); - //Queue all the actions - foreach (var message in distinctActions) + await SyncSingleItem(accountInfo, tuple, agent, moves,processedPaths,token).ConfigureAwait(false); 
+ + } + + + //On the first run +/* + if (_firstPoll) { - NetworkAgent.Post(message); + MarkSuspectedDeletes(accountInfo, cleanRemotes); } +*/ + Log.Info("[LISTENER] End Processing"); } } catch (Exception ex) { - Log.ErrorFormat("[FAIL] ListObjects for{0} in ProcessRemoteFiles with {1}", accountInfo.UserName, ex); - return; + Log.ErrorFormat("[FAIL] ListObjects for {0} in ProcessRemoteFiles with {1}", accountInfo.UserName, ex); + return nextSince; } Log.Info("[LISTENER] Finished"); + return nextSince; + } + } +/* + private static void SetMerkleHash(AccountInfo accountInfo, StateTuple tuple) + { + //The Merkle hash for directories is that of an empty buffer + if (tuple.FileInfo is DirectoryInfo) + tuple.C = MERKLE_EMPTY; + else if (tuple.FileState != null && tuple.MD5 == tuple.FileState.ETag) + { + //If there is a state whose MD5 matches, load the merkle hash from the file state + //insteaf of calculating it + tuple.C = tuple.FileState.Checksum; + } + else + { + tuple.Merkle = Signature.CalculateTreeHashAsync((FileInfo)tuple.FileInfo, accountInfo.BlockSize, accountInfo.BlockHash,1,progress); + //tuple.C=tuple.Merkle.TopHash.ToHashString(); } } +*/ - AccountsDifferencer _differencer = new AccountsDifferencer(); - private List _selectiveUris=new List(); + private IEnumerable LoadLocalFileTuples(AccountInfo accountInfo,IEnumerable batch ) + { + using (ThreadContext.Stacks["Account Files Hashing"].Push(accountInfo.UserName)) + { + var batchPaths = (batch==null)?new List():batch.ToList(); + IEnumerable localInfos=AgentLocator.Get(accountInfo.AccountPath) + .EnumerateFileSystemInfos(); + if (batchPaths.Count>0) + localInfos= localInfos.Where(fi => batchPaths.Contains(fi.FullName)); + + return localInfos; + } + } /// - /// Deletes local files that are not found in the list of cloud files + /// Wait and Pause the agent while waiting /// - /// - /// - /// - private void ProcessDeletedFiles(AccountInfo accountInfo, IEnumerable cloudFiles, DateTime pollTime) + /// + /// + 
private async Task PauseFor(int backoff) { - if (accountInfo == null) - throw new ArgumentNullException("accountInfo"); - if (String.IsNullOrWhiteSpace(accountInfo.AccountPath)) - throw new ArgumentException("The AccountInfo.AccountPath is empty", "accountInfo"); - if (cloudFiles == null) - throw new ArgumentNullException("cloudFiles"); - Contract.EndContractBlock(); - //On the first run - if (_firstPoll) + Pause = true; + await TaskEx.Delay(backoff).ConfigureAwait(false); + Pause = false; + } + + private async Task SyncSingleItem(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, ConcurrentDictionary moves,HashSet processedPaths, CancellationToken token) + { + Log.DebugFormat("Sync [{0}] C:[{1}] L:[{2}] S:[{3}]", tuple.FilePath, tuple.C, tuple.L, tuple.S); + + //If the processed paths already contain the current path, exit + if (!processedPaths.Add(tuple.FilePath)) + return; + + try { - //Only consider files that are not being modified, ie they are in the Unchanged state - var deleteCandidates = FileState.Queryable.Where(state => - state.FilePath.StartsWith(accountInfo.AccountPath) - && state.FileStatus == FileStatus.Unchanged).ToList(); + bool isInferredParent = tuple.ObjectInfo != null && tuple.ObjectInfo.UUID.StartsWith("00000000-0000-0000"); + var localFilePath = tuple.FilePath; + //Don't use the tuple info, it may have been deleted + var localInfo = FileInfoExtensions.FromPath(localFilePath); - //TODO: filesToDelete must take into account the Others container - var filesToDelete = (from deleteCandidate in deleteCandidates - let localFile = FileInfoExtensions.FromPath(deleteCandidate.FilePath) - let relativeFilePath = localFile.AsRelativeTo(accountInfo.AccountPath) - where - !cloudFiles.Any(r => r.RelativeUrlToFilePath(accountInfo.UserName) == relativeFilePath) - select localFile).ToList(); + var isUnselectedRootFolder = agent.IsUnselectedRootFolder(tuple.FilePath); + //Unselected root folders that have not yet been uploaded should be uploaded and 
added to the + //selective folders + + if (!Selectives.IsSelected(accountInfo, localFilePath) && + !(isUnselectedRootFolder && tuple.ObjectInfo == null)) + return; - //Set the status of missing files to Conflict - foreach (var item in filesToDelete) + // Local file unchanged? If both C and L are null, make sure it's because + //both the file is missing and the state checksum is not missing + if (tuple.C == tuple.L /*&& (localInfo.Exists || tuple.FileState == null)*/) { - //Try to acquire a gate on the file, to take into account files that have been dequeued - //and are being processed - using (var gate = NetworkGate.Acquire(item.FullName, NetworkOperation.Deleting)) + //No local changes + //Server unchanged? + if (tuple.S == tuple.L) { - if (gate.Failed) - continue; - StatusKeeper.SetFileState(item.FullName, FileStatus.Conflict, FileOverlayStatus.Deleted); + // No server changes + //Has the file been renamed locally? + if (!await MoveForLocalMove(accountInfo,tuple)) + //Has the file been renamed on the server? + MoveForServerMove(accountInfo, tuple); } - } - UpdateStatus(PithosStatus.HasConflicts); - StatusNotification.NotifyConflicts(filesToDelete, String.Format("{0} local files are missing from Pithos, possibly because they were deleted", filesToDelete.Count)); - StatusNotification.NotifyForFiles(filesToDelete, String.Format("{0} files were deleted", filesToDelete.Count), TraceLevel.Info); - } - else - { - var deletedFiles = new List(); - foreach (var objectInfo in cloudFiles) - { - var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName); - var item = FileAgent.GetFileAgent(accountInfo).GetFileSystemInfo(relativePath); - if (item.Exists) + else { - if ((item.Attributes & FileAttributes.ReadOnly) == FileAttributes.ReadOnly) + //Different from server + //Does the server file exist? 
+ if (tuple.S == null) + { + //Server file doesn't exist + //deleteObjectFromLocal() + using ( + StatusNotification.GetNotifier("Deleting local {0}", "Deleted local {0}",true, + localInfo.Name)) + { + DeleteLocalFile(agent, localFilePath); + } + } + else { - item.Attributes = item.Attributes & ~FileAttributes.ReadOnly; + //Server file exists + //downloadServerObject() // Result: L = S + //If the file has moved on the server, move it locally before downloading + using ( + StatusNotification.GetNotifier("Downloading {0}", "Downloaded {0}",true, + localInfo.Name)) + { + var targetPath = MoveForServerMove(accountInfo, tuple); + if (targetPath != null) + { + + await DownloadCloudFile(accountInfo, tuple, token, targetPath).ConfigureAwait(false); + + AddOwnFolderToSelectives(accountInfo, tuple, targetPath); + } + } + } + } + } + else + { + //Local changes found + + //Server unchanged? + if (tuple.S == tuple.L) + { + //The FileAgent selective sync checks for new root folder files + if (!agent.Ignore(localFilePath)) + { + if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null) + { + //deleteObjectFromServer() + DeleteCloudFile(accountInfo, tuple); + //updateRecord( Remove L, S) + } + else + { + //uploadLocalObject() // Result: S = C, L = S + var progress = new Progress(d => + StatusNotification.Notify(new StatusNotification(String.Format("Merkle Hashing for Upload {0:p} of {1}", d.Percentage, localInfo.Name)))); + + //Is it an unselected root folder + var isCreation = isUnselectedRootFolder ||//or a new folder under a selected parent? + (localInfo is DirectoryInfo && Selectives.IsSelected(accountInfo, localInfo) && tuple.FileState == null && tuple.ObjectInfo == null); + + + //Is this a result of a FILE move with no modifications? 
Then try to move it, + //to avoid an expensive hash + if (!await MoveForLocalMove(accountInfo, tuple)) + { + await UploadLocalFile(accountInfo, tuple, token, isCreation, localInfo,processedPaths, progress).ConfigureAwait(false); + } + + //updateRecord( S = C ) + //State updated by the uploader + + if (isCreation ) + { + ProcessChildren(accountInfo, tuple, agent, moves,processedPaths,token); + } + } + } + } + else + { + if (tuple.C == tuple.S) + { + // (Identical Changes) Result: L = S + //doNothing() + + //Don't update anything for nonexistend server files + if (tuple.S != null) + { + //Detect server moves + var targetPath = MoveForServerMove(accountInfo, tuple); + if (targetPath != null) + { + Debug.Assert(tuple.Merkle != null); + StatusKeeper.StoreInfo(targetPath, tuple.ObjectInfo, tuple.Merkle); + + AddOwnFolderToSelectives(accountInfo, tuple, targetPath); + } + } + else + { + //At this point, C==S==NULL and we have a stale state (L) + //Log the stale tuple for investigation + Log.WarnFormat("Stale tuple detected FilePathPath:[{0}], State:[{1}], LocalFile:[{2}]", tuple.FilePath, tuple.FileState, tuple.FileInfo); + + //And remove it + if (!String.IsNullOrWhiteSpace(tuple.FilePath)) + StatusKeeper.ClearFileStatus(tuple.FilePath); + } + } + else + { + if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null) + { + //deleteObjectFromServer() + DeleteCloudFile(accountInfo, tuple); + //updateRecord(Remove L, S) + } + //If both the local and server files are missing, the state is stale + else if (!localInfo.Exists && (tuple.S == null || tuple.ObjectInfo == null)) + { + StatusKeeper.ClearFileStatus(localInfo.FullName); + } + else + { + ReportConflictForMismatch(localFilePath); + //identifyAsConflict() // Manual action required + } } - item.Delete(); - DateTime lastDate; - _lastSeen.TryRemove(item.FullName, out lastDate); - deletedFiles.Add(item); } - StatusKeeper.SetFileState(item.FullName, FileStatus.Deleted, FileOverlayStatus.Deleted); } - 
StatusNotification.NotifyForFiles(deletedFiles, String.Format("{0} files were deleted", deletedFiles.Count), TraceLevel.Info); } + catch (Exception exc) + { + //In case of error log and retry with the next poll + Log.ErrorFormat("[SYNC] Failed for file {0}. Will Retry.\r\n{1}",tuple.FilePath,exc); + } + } + private void DeleteLocalFile(FileAgent agent, string localFilePath) + { + StatusKeeper.SetFileState(localFilePath, FileStatus.Deleted, + FileOverlayStatus.Deleted, ""); + using (NetworkGate.Acquire(localFilePath, NetworkOperation.Deleting)) + { + agent.Delete(localFilePath); + } + //updateRecord(Remove C, L) + StatusKeeper.ClearFileStatus(localFilePath); } - //Creates an appropriate action for each server file - private IEnumerable ChangesToActions(AccountInfo accountInfo, IEnumerable changes) + private async Task DownloadCloudFile(AccountInfo accountInfo, StateTuple tuple, CancellationToken token, string targetPath) + { + //Don't create a new state for non-existent files + if (File.Exists(targetPath) || Directory.Exists(targetPath)) + StatusKeeper.SetFileState(targetPath, FileStatus.Modified, FileOverlayStatus.Modified,""); + + var finalHash=await + NetworkAgent.Downloader.DownloadCloudFile(accountInfo, tuple.ObjectInfo, targetPath, + token) + .ConfigureAwait(false); + //updateRecord( L = S ) + StatusKeeper.UpdateFileChecksum(targetPath, tuple.ObjectInfo.ETag, + finalHash); + + StatusKeeper.StoreInfo(targetPath, tuple.ObjectInfo,finalHash); + } + + private async Task UploadLocalFile(AccountInfo accountInfo, StateTuple tuple, CancellationToken token, + bool isUnselectedRootFolder, FileSystemInfo localInfo, HashSet processedPaths, IProgress progress) { - if (changes == null) - throw new ArgumentNullException(); - Contract.EndContractBlock(); - var fileAgent = FileAgent.GetFileAgent(accountInfo); + var action = new CloudUploadAction(accountInfo, localInfo, tuple.FileState, + accountInfo.BlockSize, accountInfo.BlockHash, + "Poll", isUnselectedRootFolder, token, 
progress,tuple.Merkle); + + using (StatusNotification.GetNotifier("Uploading {0}", "Uploaded {0}",true, + localInfo.Name)) + { + await NetworkAgent.Uploader.UploadCloudFile(action, token).ConfigureAwait(false); + } - //In order to avoid multiple iterations over the files, we iterate only once - //over the remote files - foreach (var objectInfo in changes) + if (isUnselectedRootFolder) { - var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName); - //and remove any matching objects from the list, adding them to the commonObjects list - if (fileAgent.Exists(relativePath)) + var dirActions =( + from dir in ((DirectoryInfo) localInfo).EnumerateDirectories("*", SearchOption.AllDirectories) + let subAction = new CloudUploadAction(accountInfo, dir, null, + accountInfo.BlockSize, accountInfo.BlockHash, + "Poll", true, token, progress) + select subAction).ToList(); + foreach (var dirAction in dirActions) { - //If a directory object already exists, we don't need to perform any other action - var localFile = fileAgent.GetFileSystemInfo(relativePath); - if (objectInfo.Content_Type == @"application/directory" && localFile is DirectoryInfo) - continue; - using (new SessionScope(FlushAction.Never)) - { - var state = StatusKeeper.GetStateByFilePath(localFile.FullName); - _lastSeen[localFile.FullName] = DateTime.Now; - //Common files should be checked on a per-case basis to detect differences, which is newer + processedPaths.Add(dirAction.LocalFile.FullName); + } + + await TaskEx.WhenAll(dirActions.Select(a=>NetworkAgent.Uploader.UploadCloudFile(a,token)).ToArray()); + } + } - yield return new CloudAction(accountInfo, CloudActionType.MustSynch, - localFile, objectInfo, state, accountInfo.BlockSize, - accountInfo.BlockHash); - } + private async Task MoveForLocalMove(AccountInfo accountInfo, StateTuple tuple) + { + //Is the previous path missing? 
+ if (String.IsNullOrWhiteSpace(tuple.OldFullPath)) + return false; + //Has the file locally, in which case it should be uploaded rather than moved? + if (tuple.OldChecksum != tuple.Merkle.TopHash.ToHashString()) + return false; + + var relativePath = tuple.ObjectInfo.RelativeUrlToFilePath(accountInfo.UserName); + var serverPath = Path.Combine(accountInfo.AccountPath, relativePath); + //Has the file been renamed on the server? + if (!tuple.OldFullPath.Equals(serverPath)) + { + ReportConflictForDoubleRename(tuple.FilePath); + return false; + } + + try + { + + var client = new CloudFilesClient(accountInfo); + var objectInfo = CloudAction.CreateObjectInfoFor(accountInfo, tuple.FileInfo); + objectInfo.X_Object_Hash = tuple.Merkle.TopHash.ToHashString(); + var containerPath = Path.Combine(accountInfo.AccountPath, objectInfo.Container.ToUnescapedString()); + //TODO: SImplify these multiple conversions from and to Uris + var oldName = tuple.OldFullPath.AsRelativeTo(containerPath); + //Then execute a move instead of an upload + using (StatusNotification.GetNotifier("Moving {0}", "Moved {0}", true,tuple.FileInfo.Name)) + { + await client.MoveObject(objectInfo.Account, objectInfo.Container, oldName.Replace('\\','/').ToEscapedUri(), + objectInfo.Container, objectInfo.Name).ConfigureAwait(false); + StatusKeeper.MoveFileState(tuple.OldFullPath, tuple.FilePath, objectInfo, tuple.Merkle); + //StatusKeeper.StoreInfo(tuple.FilePath,objectInfo,tuple.Merkle); + //StatusKeeper.ClearFolderStatus(tuple.FilePath); } - else + return true; + } + catch (Exception exc) + { + Log.ErrorFormat("[MOVE] Failed for [{0}],:\r\n{1}", tuple.FilePath, exc); + //Return false to force an upload of the file + return false; + } + + } + + private void AddOwnFolderToSelectives(AccountInfo accountInfo, StateTuple tuple, string targetPath) + { + //Not for shared folders + if (tuple.ObjectInfo.IsShared==true) + return; + //Also ensure that any newly created folders are added to the selectives, if the original 
folder was selected + var containerPath = Path.Combine(accountInfo.AccountPath, tuple.ObjectInfo.Container.ToUnescapedString()); + + //If this is a root folder encountered for the first time + if (tuple.L == null && Directory.Exists(tuple.FileInfo.FullName) + && (tuple.FileInfo.FullName.IsAtOrBelow(containerPath))) + { + + var relativePath = tuple.ObjectInfo.RelativeUrlToFilePath(accountInfo.UserName); + var initialPath = Path.Combine(accountInfo.AccountPath, relativePath); + + //var hasMoved = true;// !initialPath.Equals(targetPath); + //If the new path is under a selected folder, add it to the selectives as well + if (Selectives.IsSelected(accountInfo, initialPath)) { - //Remote files should be downloaded - yield return new CloudDownloadAction(accountInfo, objectInfo); + Selectives.AddUri(accountInfo, tuple.ObjectInfo.Uri); + Selectives.Save(accountInfo); } } } - private IEnumerable CreatesToActions(AccountInfo accountInfo, IEnumerable creates) + private string MoveForServerMove(AccountInfo accountInfo, StateTuple tuple) { - if (creates == null) - throw new ArgumentNullException(); - Contract.EndContractBlock(); - var fileAgent = FileAgent.GetFileAgent(accountInfo); + if (tuple.ObjectInfo == null) + return null; + var relativePath = tuple.ObjectInfo.RelativeUrlToFilePath(accountInfo.UserName); + var serverPath = Path.Combine(accountInfo.AccountPath, relativePath); + + //Compare Case Insensitive + if (String.Equals(tuple.FilePath ,serverPath,StringComparison.InvariantCultureIgnoreCase)) + return serverPath; + + //Has the file been renamed locally? 
+ if (!String.IsNullOrWhiteSpace(tuple.OldFullPath) && !tuple.OldFullPath.Equals(tuple.FilePath)) + { + ReportConflictForDoubleRename(tuple.FilePath); + return null; + } - //In order to avoid multiple iterations over the files, we iterate only once - //over the remote files - foreach (var objectInfo in creates) + tuple.FileInfo.Refresh(); + //The file/folder may not exist if it was moved because its parent moved + if (!tuple.FileInfo.Exists) { - var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName); - //and remove any matching objects from the list, adding them to the commonObjects list - if (fileAgent.Exists(relativePath)) + var target=FileInfoExtensions.FromPath(serverPath); + if (!target.Exists) { - //If the object already exists, we probably have a conflict - //If a directory object already exists, we don't need to perform any other action - var localFile = fileAgent.GetFileSystemInfo(relativePath); - StatusKeeper.SetFileState(localFile.FullName, FileStatus.Conflict, FileOverlayStatus.Conflict); + Log.ErrorFormat("No source or target found while trying to move {0} to {1}", tuple.FileInfo.FullName, serverPath); } - else + return serverPath; + } + + using (StatusNotification.GetNotifier("Moving local {0}", "Moved local {0}", true,Path.GetFileName(tuple.FilePath))) + using(NetworkGate.Acquire(tuple.FilePath,NetworkOperation.Renaming)) + { + + var fi = tuple.FileInfo as FileInfo; + if (fi != null) + { + var targetFile = new FileInfo(serverPath); + if (!targetFile.Directory.Exists) + targetFile.Directory.Create(); + fi.MoveTo(serverPath); + } + var di = tuple.FileInfo as DirectoryInfo; + if (di != null) { - //Remote files should be downloaded - yield return new CloudDownloadAction(accountInfo, objectInfo); + var targetDir = new DirectoryInfo(serverPath); + if (!targetDir.Parent.Exists) + targetDir.Parent.Create(); + di.MoveTo(serverPath); } } + + StatusKeeper.StoreInfo(serverPath, tuple.ObjectInfo); + + return serverPath; } - private void 
ProcessTrashedFiles(AccountInfo accountInfo, IEnumerable trashObjects) + private void DeleteCloudFile(AccountInfo accountInfo, StateTuple tuple) { - var fileAgent = FileAgent.GetFileAgent(accountInfo); - foreach (var trashObject in trashObjects) + using (StatusNotification.GetNotifier("Deleting server {0}", "Deleted server {0}", true,Path.GetFileName(tuple.FilePath))) { - var barePath = trashObject.RelativeUrlToFilePath(accountInfo.UserName); - //HACK: Assume only the "pithos" container is used. Must find out what happens when - //deleting a file from a different container - var relativePath = Path.Combine("pithos", barePath); - fileAgent.Delete(relativePath); + + StatusKeeper.SetFileState(tuple.FilePath, FileStatus.Deleted, + FileOverlayStatus.Deleted, ""); + NetworkAgent.DeleteAgent.DeleteCloudFile(accountInfo, tuple.ObjectInfo); + StatusKeeper.ClearFileStatus(tuple.FilePath); } } + private void ProcessChildren(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, ConcurrentDictionary moves,HashSet processedPaths,CancellationToken token) + { + + var dirInfo = tuple.FileInfo as DirectoryInfo; + var folderTuples = from folder in dirInfo.EnumerateDirectories("*", SearchOption.AllDirectories) + select new StateTuple(folder){C=Signature.MERKLE_EMPTY}; + + var fileTuples = from file in dirInfo.EnumerateFiles("*", SearchOption.AllDirectories) + let state=StatusKeeper.GetStateByFilePath(file.FullName) + select new StateTuple(file){ + Merkle=StatusAgent.CalculateTreeHash(file,accountInfo,state, + Settings.HashingParallelism,token,null) + }; + + //Process folders first, to ensure folders appear on the server as soon as possible + folderTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, moves, processedPaths,token).Wait()); + + fileTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, moves,processedPaths, token).Wait()); + } + + + + + + + + /// + /// Returns the latest LastModified date from the list of objects, but only if it is before + /// than
the threshold value + /// + /// + /// + /// + private static DateTimeOffset? GetLatestDateBefore(DateTime? threshold, IList cloudObjects) + { + DateTimeOffset? maxDate = null; + if (cloudObjects!=null && cloudObjects.Count > 0) + maxDate = cloudObjects.Max(obj => obj.Last_Modified); + if (!maxDate.HasValue) + return threshold; + if (!threshold.HasValue|| threshold > maxDate) + return maxDate; + return threshold; + } + + /// + /// Returns the latest LastModified date from the list of objects, but only if it is after + /// the threshold value + /// + /// + /// + /// + private static DateTimeOffset? GetLatestDateAfter(DateTimeOffset? threshold, IList cloudObjects) + { + DateTimeOffset? maxDate = null; + if (cloudObjects!=null && cloudObjects.Count > 0) + maxDate = cloudObjects.Max(obj => obj.Last_Modified); + if (!maxDate.HasValue) + return threshold; + if (!threshold.HasValue|| threshold < maxDate) + return maxDate; + return threshold; + } + + readonly AccountsDifferencer _differencer = new AccountsDifferencer(); + private bool _pause; + + + + + private void ReportConflictForMismatch(string localFilePath) + { + if (String.IsNullOrWhiteSpace(localFilePath)) + throw new ArgumentNullException("localFilePath"); + Contract.EndContractBlock(); + + StatusKeeper.SetFileState(localFilePath, FileStatus.Conflict, FileOverlayStatus.Conflict, "File changed at the server"); + UpdateStatus(PithosStatus.HasConflicts); + var message = String.Format("Conflict detected for file {0}", localFilePath); + Log.Warn(message); + StatusNotification.NotifyChange(message, TraceLevel.Warning); + } + + private void ReportConflictForDoubleRename(string localFilePath) + { + if (String.IsNullOrWhiteSpace(localFilePath)) + throw new ArgumentNullException("localFilePath"); + Contract.EndContractBlock(); + + StatusKeeper.SetFileState(localFilePath, FileStatus.Conflict, FileOverlayStatus.Conflict, "File renamed both locally and on the server"); + UpdateStatus(PithosStatus.HasConflicts); + var message = 
String.Format("Double rename conflict detected for file {0}", localFilePath); + Log.Warn(message); + StatusNotification.NotifyChange(message, TraceLevel.Warning); + } + + + /// + /// Notify the UI to update the visual status + /// + /// private void UpdateStatus(PithosStatus status) { - StatusKeeper.SetPithosStatus(status); - StatusNotification.Notify(new Notification()); + try + { + StatusNotification.SetPithosStatus(status); + //StatusNotification.Notify(new Notification()); + } + catch (Exception exc) + { + //Failure is not critical, just log it + Log.Warn("Error while updating status", exc); + } } private static void CreateContainerFolders(AccountInfo accountInfo, IEnumerable containers) { var containerPaths = from container in containers - let containerPath = Path.Combine(accountInfo.AccountPath, container.Name) - where container.Name != FolderConstants.TrashContainer && !Directory.Exists(containerPath) + let containerPath = Path.Combine(accountInfo.AccountPath, container.Name.ToUnescapedString()) + where container.Name.ToString() != FolderConstants.TrashContainer && !Directory.Exists(containerPath) select containerPath; foreach (var path in containerPaths) @@ -458,16 +1064,23 @@ namespace Pithos.Core.Agents } } - public void SetSyncUris(string[] uris) + public void AddAccount(AccountInfo accountInfo) { - var selectiveUris = uris.Select(uri => new Uri(uri)); - SelectiveUris=selectiveUris.ToList(); + //Avoid adding a duplicate accountInfo + _accounts.TryAdd(accountInfo.AccountKey, accountInfo); } - protected List SelectiveUris + public void RemoveAccount(AccountInfo accountInfo) { - get { return _selectiveUris;} - set { _selectiveUris = value; } + if (accountInfo == null) + return; + + AccountInfo account; + _accounts.TryRemove(accountInfo.AccountKey, out account); + + SnapshotDifferencer differencer; + _differencer.Differencers.TryRemove(accountInfo.AccountKey, out differencer); } + } }