#region /* ----------------------------------------------------------------------- * * * Copyright 2011-2012 GRNET S.A. All rights reserved. * * Redistribution and use in source and binary forms, with or * without modification, are permitted provided that the following * conditions are met: * * 1. Redistributions of source code must retain the above * copyright notice, this list of conditions and the following * disclaimer. * * 2. Redistributions in binary form must reproduce the above * copyright notice, this list of conditions and the following * disclaimer in the documentation and/or other materials * provided with the distribution. * * * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. * * The views and conclusions contained in the software and * documentation are those of the authors and should not be * interpreted as representing official policies, either expressed * or implied, of GRNET S.A. 
* * ----------------------------------------------------------------------- */
#endregion
using System.Collections.Concurrent;
using System.ComponentModel.Composition;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.IO;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Pithos.Interfaces;
using Pithos.Network;
using log4net;

namespace Pithos.Core.Agents
{
    using System;
    using System.Collections.Generic;
    using System.Linq;

    /*public class PollRequest
    {
        public DateTime? Since { get; set; }
        public IEnumerable Batch { get; set; }
    }*/

    // NOTE(review): the generic type arguments in this copy of the file appear to have been
    // stripped (e.g. "ConcurrentQueue>" and "new ConcurrentDictionary()" below). Restore them
    // from version control before compiling; the declarations are reproduced as found.

    /// <summary>
    /// PollAgent periodically polls the server to detect object changes. The agent retrieves a listing of all
    /// objects and compares it with a previously cached version to detect differences.
    /// New files are downloaded, missing files are deleted from the local file system and common files are compared
    /// to determine the appropriate action
    /// </summary>
    [Export]
    public class PollAgent
    {
        private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);

        [System.ComponentModel.Composition.Import]
        public IStatusKeeper StatusKeeper { get; set; }

        [System.ComponentModel.Composition.Import]
        public IPithosSettings Settings { get; set; }

        [System.ComponentModel.Composition.Import]
        public NetworkAgent NetworkAgent { get; set; }

        [System.ComponentModel.Composition.Import]
        public Selectives Selectives { get; set; }

        public IStatusNotification StatusNotification { get; set; }

        //Source of the token handed out by the CancellationToken property.
        //CancelCurrentOperation replaces it with a new source instead of resetting it
        private CancellationTokenSource _currentOperationCancellation = new CancellationTokenSource();

        /// <summary>
        /// Cancels the current cancellation source and atomically installs a fresh one,
        /// so that operations started afterwards receive a token that is not cancelled.
        /// </summary>
        public void CancelCurrentOperation()
        {
            //What does it mean to cancel the current upload/download?
            //Obviously, the current operation will be cancelled by throwing
            //a cancellation exception.
            //
            //The default behavior is to retry any operations that throw.
            //Obviously this is not what we want in this situation.
            //The cancelled operation should NOT be retried.
            //
            //This can be done by catching the cancellation exception
            //and avoiding the retry.
            //
            //Have to reset the cancellation source - it is not possible to reset the source,
            //so it is replaced with a new one.
            //Have to prevent a case where an operation requests a token from the old source,
            //hence the atomic exchange before cancelling the old source
            var oldSource = Interlocked.Exchange(ref _currentOperationCancellation, new CancellationTokenSource());
            oldSource.Cancel();
        }

        /// <summary>
        /// Pauses or resumes the agent. Setting it to false sets _unPauseEvent,
        /// setting it to true resets it. The polling code waits on _unPauseEvent
        /// before doing work.
        /// </summary>
        public bool Pause
        {
            get
            {
                return _pause;
            }
            set
            {
                _pause = value;
                if (!_pause)
                    _unPauseEvent.Set();
                else
                {
                    _unPauseEvent.Reset();
                }
            }
        }

        /// <summary>
        /// The token of the current cancellation source. After CancelCurrentOperation
        /// this returns the token of the newly installed source.
        /// </summary>
        public CancellationToken CancellationToken
        {
            get { return _currentOperationCancellation.Token; }
        }

        //True until a poll of all accounts finishes without throwing (see PollRemoteFiles);
        //ProcessAccountFiles uses it to call StatusKeeper.CleanupOrphanStates on the first poll
        private bool _firstPoll = true;

        //The Sync Event signals a manual synchronisation
        private readonly AsyncManualResetEvent _syncEvent = new AsyncManualResetEvent();

        //Waited on before polling work; the constructor argument presumably means "initially set"
        //(i.e. the agent starts un-paused) - TODO confirm against AsyncManualResetEvent
        private readonly AsyncManualResetEvent _unPauseEvent = new AsyncManualResetEvent(true);

        //NOTE(review): _lastSeen is not referenced in the visible part of this file
        private readonly ConcurrentDictionary _lastSeen = new ConcurrentDictionary();

        //The accounts to poll; PollRemoteFiles iterates its Values as AccountInfo instances
        private readonly ConcurrentDictionary _accounts = new ConcurrentDictionary();

        //private readonly ActionBlock _pollAction;

        //Passed to CloudFilesClient.ListSharedObjects when listing shared objects
        readonly HashSet _knownContainers = new HashSet();

        /// <summary>
        /// Start a manual synchronization
        /// </summary>
        /// <param name="paths">
        /// Optional batch of local paths to restrict the next poll to; null processes all files
        /// </param>
        public void SynchNow(IEnumerable paths=null)
        {
            _batchQueue.Enqueue(paths);
            _syncEvent.Set();
            //_pollAction.Post(new PollRequest {Batch = paths});
        }

        //Batches queued by SynchNow. PollRemoteFiles dequeues at most one batch per poll
        readonly ConcurrentQueue> _batchQueue=new ConcurrentQueue>();

        //Local moves keyed by their old full path. PollRemoteFiles swaps in a fresh dictionary
        //at the start of each poll and passes the old one to MergeSources
        ConcurrentDictionary _moves=new ConcurrentDictionary();

        /// <summary>
        /// Records a local move event so that the next poll can match moved files with their old paths.
        /// </summary>
        /// <param name="args">The move event; keyed by its OldFullPath</param>
        public void PostMove(MovedEventArgs args)
        {
            //The update factory returns the existing value, so if a move with the same old path
            //is already recorded, the first recorded move is kept
            TaskEx.Run(() => _moves.AddOrUpdate(args.OldFullPath, args,(s,e)=>e));
        }

        /// <summary>
        /// Remote files are polled periodically. Any changes are processed.
        /// Polls every account, waits for the next scheduled or manual poll and then
        /// schedules itself again on a new task.
        /// </summary>
        /// <param name="since">
        /// Passed to the server listing calls; presumably only objects modified after this time are
        /// listed, and null lists everything - TODO confirm against CloudFilesClient
        /// </param>
        public void PollRemoteFiles(DateTime?
since = null)
        {
            if (Log.IsDebugEnabled)
                Log.DebugFormat("Polling changes after [{0}]",since);

            Debug.Assert(Thread.CurrentThread.IsBackground, "Polling Ended up in the main thread!");

            //GC.Collect();

            using (ThreadContext.Stacks["Retrieve Remote"].Push("All accounts"))
            {
                //If this poll fails, we will retry with the same since value
                var nextSince = since;
                try
                {
                    //Block while the agent is paused
                    _unPauseEvent.Wait();
                    UpdateStatus(PithosStatus.PollSyncing);

                    //Split the pending manual-sync batch (if any) per account
                    var accountBatches=new Dictionary>();
                    IEnumerable batch = null;
                    if (_batchQueue.TryDequeue(out batch) && batch != null)
                        foreach (var account in _accounts.Values)
                        {
                            var accountBatch = batch.Where(path => path.IsAtOrBelow(account.AccountPath));
                            accountBatches[account.AccountKey] = accountBatch;
                        }

                    //Take the moves recorded so far; moves posted from now on go to a fresh dictionary
                    var moves=Interlocked.Exchange(ref _moves, new ConcurrentDictionary());

                    var tasks = new List>();
                    foreach(var accountInfo in _accounts.Values)
                    {
                        IEnumerable accountBatch ;
                        accountBatches.TryGetValue(accountInfo.AccountKey,out accountBatch);
                        var t=ProcessAccountFiles (accountInfo, accountBatch, moves,since);
                        tasks.Add(t);
                    }

                    var nextTimes=TaskEx.WhenAll(tasks.ToList()).Result;

                    _firstPoll = false;
                    //Reschedule the poll using the earliest of the per-account "since" values
                    if (nextTimes.Length>0)
                        nextSince = nextTimes.Min();
                    if (Log.IsDebugEnabled)
                        Log.DebugFormat("Next Poll for changes since [{0}]",nextSince);
                }
                catch (Exception ex)
                {
                    Log.ErrorFormat("Error while processing accounts\r\n{0}", ex);
                    //In case of failure retry with the same "since" value
                }

                UpdateStatus(PithosStatus.PollComplete);
                //The multiple try blocks are required because we can't have an await call
                //inside a finally block
                //TODO: Find a more elegant solution for rescheduling in the event of an exception
                try
                {
                    //Wait for the polling interval to pass or the Sync event to be signalled
                    nextSince = WaitForScheduledOrManualPoll(nextSince).Result;
                }
                catch (Exception ex)
                {
                    //Don't let the exception escape: this method is re-run from a fire-and-forget
                    //TaskEx.Run task (see finally), where an unobserved task exception is lost or,
                    //under the .NET 4.0 escalation policy, terminates the process.
                    //Keep the current "since" value and poll again.
                    Log.ErrorFormat("Error while waiting for the next poll\r\n{0}", ex);
                }
                finally
                {
                    //Ensure polling is scheduled even in case of error
                    TaskEx.Run(()=>PollRemoteFiles(nextSince));
                    //_pollAction.Post(new PollRequest {Since = nextSince});
                }
            }
        }

        /// <summary>
        /// Wait for the polling period to expire or a manual sync request, then for the agent to be
        /// un-paused and for the network agent's ProceedEvent.
        /// </summary>
        /// <param name="since">The value returned when the sync event has not been signalled</param>
        /// <returns>
        /// <paramref name="since"/>, or null if the sync event (set by SynchNow) has been signalled
        /// by the time the waits complete
        /// </returns>
        private async Task WaitForScheduledOrManualPoll(DateTime? since)
        {
            var sync = _syncEvent.WaitAsync();
            var delay = TimeSpan.FromSeconds(Settings.PollingInterval);
            if (Log.IsDebugEnabled)
                Log.DebugFormat("Next Poll at [{0}]", DateTime.Now.Add(delay));
            var wait = TaskEx.Delay(delay);

            var signaledTask = await TaskEx.WhenAny(sync, wait).ConfigureAwait(false);

            //Pausing takes precedence over manual sync or awaiting.
            //Await the event instead of blocking a thread-pool thread with Wait()
            await _unPauseEvent.WaitAsync().ConfigureAwait(false);

            //Wait for network processing to finish before polling
            var pauseTask=NetworkAgent.ProceedEvent.WaitAsync();

            await TaskEx.WhenAll(signaledTask, pauseTask).ConfigureAwait(false);

            //If polling is signalled by SynchNow, ignore the since tag
            if (sync.IsCompleted)
            {
                _syncEvent.Reset();
                return null;
            }
            return since;
        }

        /// <summary>
        /// Lists the remote objects of one account, merges them with the local files and the stored
        /// states and synchronizes every resulting item.
        /// </summary>
        /// <param name="accountInfo">The account to process; its AccountPath must not be empty</param>
        /// <param name="accountBatch">Optional set of local paths to restrict processing to; null means all</param>
        /// <param name="moves">Local moves recorded since the previous poll</param>
        /// <param name="since">Passed to the listing calls</param>
        /// <returns>The "since" value to use for this account's next poll</returns>
        public async Task ProcessAccountFiles(AccountInfo accountInfo, IEnumerable accountBatch, ConcurrentDictionary moves, DateTime? since = null)
        {
            if (accountInfo == null)
                throw new ArgumentNullException("accountInfo");
            if (String.IsNullOrWhiteSpace(accountInfo.AccountPath))
                throw new ArgumentException("The AccountInfo.AccountPath is empty", "accountInfo");
            Contract.EndContractBlock();

            using (ThreadContext.Stacks["Retrieve Remote"].Push(accountInfo.UserName))
            {
                await NetworkAgent.GetDeleteAwaiter().ConfigureAwait(false);

                Log.Info("Scheduled");
                var client = new CloudFilesClient(accountInfo);

                //We don't need to check the trash container
                var allContainers=await client.ListContainers(accountInfo.UserName).ConfigureAwait(false);
                var containers = allContainers
                    .Where(c=>c.Name.ToString()!="trash")
                    .ToList();

                CreateContainerFolders(accountInfo, containers);

                //The nextSince time fallback time is the same as the current.
//If polling succeeds, the next Since time is the latest modification time among the listed
                //objects (container and shared listings), if it is later than since - see GetLatestDateAfter below
                var nextSince = since;
                try
                {
                    //Wait for any deletions to finish
                    await NetworkAgent.GetDeleteAwaiter().ConfigureAwait(false);
                    //NOTE(review): the next two lines describe capturing a poll time, which no longer happens here
                    //Get the poll time now. We may miss some deletions but it's better to keep a file that was deleted
                    //than delete a file that was created while we were executing the poll

                    //Get the list of server objects changed since the last check
                    //The name of the container is passed as state in order to create a dictionary of tasks in a subsequent step
                    //(the "Task>" below is where a stripped generic argument list used to be)
                    var listObjects = (from container in containers
                                       select Task>.Factory.StartNew(_ =>
                                             client.ListObjects(accountInfo.UserName, container.Name, since), container.Name)).ToList();

                    //Shared objects are listed by a task whose state is "shared"
                    var listShared = Task>.Factory.StartNew(_ =>
                        client.ListSharedObjects(_knownContainers,since), "shared");
                    listObjects.Add(listShared);
                    var listTasks = await Task.Factory.WhenAll(listObjects.ToArray()).ConfigureAwait(false);

                    using (ThreadContext.Stacks["SCHEDULE"].Push("Process Results"))
                    {
                        //Listing tasks keyed by their AsyncState (container name or "shared")
                        var dict = listTasks.ToDictionary(t => t.AsyncState);

                        //Get all non-trash objects. Remember, the container name is stored in AsyncState
                        //(this includes the objects of the "shared" listing)
                        var remoteObjects = (from objectList in listTasks
                                             where (string)objectList.AsyncState.ToString() != "trash"
                                             from obj in objectList.Result
                                             orderby obj.Bytes ascending
                                             select obj).ToList();

                        //Get the latest remote object modification date, only if it is after
                        //the original since date
                        nextSince = GetLatestDateAfter(nextSince, remoteObjects);

                        var sharedObjects = dict["shared"].Result;

                        //DON'T process trashed files
                        //If some files are deleted and added again to a folder, they will be deleted
                        //even though they are new.
                        //We would have to check file dates and hashes to ensure that a trashed file
                        //can be deleted safely from the local hard drive.
                        /*
                        //Items with the same name, hash may be both in the container and the trash
                        //Don't delete items that exist in the container
                        var realTrash = from trash in trashObjects
                                        where
                                            !remoteObjects.Any(
                                                info => info.Name == trash.Name && info.Hash == trash.Hash)
                                        select trash;
                        ProcessTrashedFiles(accountInfo, realTrash);
                        */

                        //Skip ".ignore" objects and anything under the cache folder
                        var cleanRemotes = (from info in remoteObjects.Union(sharedObjects)
                                            let name = info.Name.ToUnescapedString()??""
                                            where !name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase) &&
                                                  !name.StartsWith(FolderConstants.CacheFolder + "/",
                                                                   StringComparison.InvariantCultureIgnoreCase)
                                            select info).ToList();

                        if (_firstPoll)
                            StatusKeeper.CleanupOrphanStates();

                        var differencer = _differencer.PostSnapshot(accountInfo, cleanRemotes);
                        var currentRemotes = differencer.Current.ToList();
                        StatusKeeper.CleanupStaleStates(accountInfo, currentRemotes);

                        //var filterUris = Selectives.SelectiveUris[accountInfo.AccountKey];

                        //May have to wait if the FileAgent has asked for a Pause, due to local changes
                        await _unPauseEvent.WaitAsync().ConfigureAwait(false);

                        //Get the local files here
                        var agent = AgentLocator.Get(accountInfo.AccountPath);
                        var files = LoadLocalFileTuples(accountInfo, accountBatch);

                        var states = StatusKeeper.GetAllStates();

                        //Pair each current remote object with the full local path it maps to
                        var infos = (from remote in currentRemotes
                                     let path = remote.RelativeUrlToFilePath(accountInfo.UserName)
                                     let info=agent.GetFileSystemInfo(path)
                                     select Tuple.Create(info.FullName,remote))
                            .ToList();

                        var token = _currentOperationCancellation.Token;

                        var tuples = MergeSources(infos, files, states,moves).ToList();

                        //Paths already handled in this pass; SyncSingleItem skips them
                        var processedPaths = new HashSet();
                        //Process only the changes in the batch file, if one exists
                        //NOTE(review): accountBatch is a deferred query, so Contains re-evaluates it for every tuple
                        var stateTuples = accountBatch==null?tuples:tuples.Where(t => accountBatch.Contains(t.FilePath));
                        //Locked tuples (UpdateHashes hit an IOException) are skipped in this pass
                        foreach (var tuple in stateTuples.Where(s=>!s.Locked))
                        {
                            await _unPauseEvent.WaitAsync().ConfigureAwait(false);

                            //Set the Merkle Hash
                            //SetMerkleHash(accountInfo, tuple);

                            await SyncSingleItem(accountInfo, tuple, agent, moves,processedPaths,token).ConfigureAwait(false);
                        }

                        //On the first run
                        /*
                        if (_firstPoll)
                        {
                            MarkSuspectedDeletes(accountInfo, cleanRemotes);
                        }
                        */

                        Log.Info("[LISTENER] End Processing");
                    }
                }
                catch (Exception ex)
                {
                    //Log and return the value computed so far (initially the incoming since)
                    Log.ErrorFormat("[FAIL] ListObjects for {0} in ProcessRemoteFiles with {1}", accountInfo.UserName, ex);
                    return nextSince;
                }

                Log.Info("[LISTENER] Finished");
                return nextSince;
            }
        }

        /*
        private static void SetMerkleHash(AccountInfo accountInfo, StateTuple tuple)
        {
            //The Merkle hash for directories is that of an empty buffer
            if (tuple.FileInfo is DirectoryInfo)
                tuple.C = MERKLE_EMPTY;
            else if (tuple.FileState != null && tuple.MD5 == tuple.FileState.ETag)
            {
                //If there is a state whose MD5 matches, load the merkle hash from the file state
                //instead of calculating it
                tuple.C = tuple.FileState.Checksum;
            }
            else
            {
                tuple.Merkle = Signature.CalculateTreeHashAsync((FileInfo)tuple.FileInfo, accountInfo.BlockSize, accountInfo.BlockHash,1,progress);
                //tuple.C=tuple.Merkle.TopHash.ToHashString();
            }
        }
        */

        /// <summary>
        /// Returns the file system entries of the account folder, restricted to the paths in
        /// <paramref name="batch"/> when a non-empty batch is supplied.
        /// </summary>
        /// <remarks>
        /// The Where filter is deferred, so it runs when the caller enumerates the result,
        /// after the logging context pushed here has been popped.
        /// </remarks>
        private IEnumerable LoadLocalFileTuples(AccountInfo accountInfo,IEnumerable batch )
        {
            using (ThreadContext.Stacks["Account Files Hashing"].Push(accountInfo.UserName))
            {
                var batchPaths = (batch==null)?new List():batch.ToList();
                IEnumerable localInfos=AgentLocator.Get(accountInfo.AccountPath)
                                                    .EnumerateFileSystemInfos();
                if (batchPaths.Count>0)
                    localInfos= localInfos.Where(fi => batchPaths.Contains(fi.FullName));

                return localInfos;
            }
        }

        /// <summary>
        /// Wait and Pause the agent while waiting
        /// </summary>
        /// <param name="backoff">Delay in milliseconds</param>
        private async Task PauseFor(int backoff)
        {
            Pause = true;
            await TaskEx.Delay(backoff).ConfigureAwait(false);
            Pause = false;
        }

        /// <summary>
        /// Synchronizes a single item by comparing the hashes recorded on its tuple. As the comments in
        /// the body use them: C is the current local file, L the last state stored locally and
        /// S the server object. Paths already in <paramref name="processedPaths"/> are skipped; any
        /// exception is logged and the item is left for the next poll.
        /// </summary>
        private async Task SyncSingleItem(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, ConcurrentDictionary moves,HashSet processedPaths, CancellationToken token)
        {
            Log.DebugFormat("Sync [{0}] C:[{1}] L:[{2}] S:[{3}]", tuple.FilePath, tuple.C, tuple.L, tuple.S);

            //If the processed paths already contain the current path, exit
            if
(!processedPaths.Add(tuple.FilePath))
                return;

            try
            {
                //NOTE(review): isInferredParent is not used anywhere below
                bool isInferredParent = tuple.ObjectInfo != null && tuple.ObjectInfo.UUID.StartsWith("00000000-0000-0000");

                var localFilePath = tuple.FilePath;
                //Don't use the tuple info, it may have been deleted
                var localInfo = FileInfoExtensions.FromPath(localFilePath);

                var isUnselectedRootFolder = agent.IsUnselectedRootFolder(tuple.FilePath);

                //Unselected root folders that have not yet been uploaded should be uploaded and added to the
                //selective folders; every other unselected path is skipped
                if (!Selectives.IsSelected(accountInfo, localFilePath) && !(isUnselectedRootFolder && tuple.ObjectInfo == null))
                    return;

                // Local file unchanged? If both C and L are null, make sure it's because
                //both the file is missing and the state checksum is not missing
                //(that extra check is currently commented out below)
                if (tuple.C == tuple.L /*&& (localInfo.Exists || tuple.FileState == null)*/)
                {
                    //No local changes
                    //Server unchanged?
                    if (tuple.S == tuple.L)
                    {
                        // No server changes
                        //Has the file been renamed locally?
                        if (!MoveForLocalMove(accountInfo,tuple))
                            //Has the file been renamed on the server?
                            MoveForServerMove(accountInfo, tuple);
                    }
                    else
                    {
                        //Different from server
                        //Does the server file exist?
                        if (tuple.S == null)
                        {
                            //Server file doesn't exist
                            //deleteObjectFromLocal()
                            using (
                                StatusNotification.GetNotifier("Deleting local {0}", "Deleted local {0}",
                                                               localInfo.Name))
                            {
                                DeleteLocalFile(agent, localFilePath);
                            }
                        }
                        else
                        {
                            //Server file exists
                            //downloadServerObject() // Result: L = S
                            //If the file has moved on the server, move it locally before downloading
                            using (
                                StatusNotification.GetNotifier("Downloading {0}", "Downloaded {0}",
                                                               localInfo.Name))
                            {
                                var targetPath = MoveForServerMove(accountInfo, tuple);
                                if (targetPath != null)
                                {
                                    await DownloadCloudFile(accountInfo, tuple, token, targetPath).ConfigureAwait(false);

                                    AddOwnFolderToSelectives(accountInfo, tuple, targetPath);
                                }
                            }
                            /*
                                StatusKeeper.SetFileState(targetPath, FileStatus.Unchanged,
                                                          FileOverlayStatus.Normal, "");
                                */
                        }
                    }

                }
                else
                {
                    //Local changes found

                    //Server unchanged?
                    if (tuple.S == tuple.L)
                    {
                        //The FileAgent selective sync checks for new root folder files
                        if (!agent.Ignore(localFilePath))
                        {
                            if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null)
                            {
                                //The local file is gone but the server object exists
                                //deleteObjectFromServer()
                                DeleteCloudFile(accountInfo, tuple);
                                //updateRecord( Remove L, S)
                            }
                            else
                            {
                                //uploadLocalObject() // Result: S = C, L = S

                                var progress = new Progress(d =>
                                    StatusNotification.Notify(new StatusNotification(String.Format("Merkle Hashing for Upload {0:p} of {1}", d, localInfo.Name))));

                                //Is it an unselected root folder
                                var isCreation = isUnselectedRootFolder ||//or a new folder under a selected parent?
                                        (localInfo is DirectoryInfo && Selectives.IsSelected(accountInfo, localInfo) && tuple.FileState == null && tuple.ObjectInfo == null);

                                //Is this a result of a FILE move with no modifications? Then try to move it,
                                //to avoid an expensive hash
                                if (!MoveForLocalMove(accountInfo, tuple))
                                {
                                    await UploadLocalFile(accountInfo, tuple, token, isCreation, localInfo,processedPaths, progress).ConfigureAwait(false);
                                }

                                //updateRecord( S = C )
                                //State updated by the uploader

                                //A newly created folder: synchronize everything below it as well
                                if (isCreation )
                                {
                                    ProcessChildren(accountInfo, tuple, agent, moves,processedPaths,token);
                                }
                            }
                        }
                    }
                    else
                    {
                        if (tuple.C == tuple.S)
                        {
                            // (Identical Changes) Result: L = S
                            //doNothing()

                            //Don't update anything for nonexistent server files
                            if (tuple.S != null)
                            {
                                //Detect server moves
                                var targetPath = MoveForServerMove(accountInfo, tuple);
                                if (targetPath != null)
                                {
                                    Debug.Assert(tuple.Merkle != null);
                                    StatusKeeper.StoreInfo(targetPath, tuple.ObjectInfo, tuple.Merkle);

                                    AddOwnFolderToSelectives(accountInfo, tuple, targetPath);
                                }
                            }
                            else
                            {
                                //At this point, C==S==NULL and we have a stale state (L)
                                //Log the stale tuple for investigation
                                Log.WarnFormat("Stale tuple detected FilePathPath:[{0}], State:[{1}], LocalFile:[{2}]", tuple.FilePath, tuple.FileState, tuple.FileInfo);

                                //And remove it
                                if (!String.IsNullOrWhiteSpace(tuple.FilePath))
                                    StatusKeeper.ClearFileStatus(tuple.FilePath);
                            }
                        }
                        else
                        {
                            //Both sides changed, differently
                            if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null)
                            {
                                //deleteObjectFromServer()
                                DeleteCloudFile(accountInfo, tuple);
                                //updateRecord(Remove L, S)
                            }
                                //If both the local and server files are missing, the state is stale
                            else if (!localInfo.Exists && (tuple.S == null || tuple.ObjectInfo == null))
                            {
                                StatusKeeper.ClearFileStatus(localInfo.FullName);
                            }
                            else
                            {
                                ReportConflictForMismatch(localFilePath);
                                //identifyAsConflict() // Manual action required
                            }
                        }
                    }
                }
            }
            catch (Exception exc)
            {
                //In case of error log and retry with the next poll
                Log.ErrorFormat("[SYNC] Failed for file {0}. Will Retry.\r\n{1}",tuple.FilePath,exc);
            }
        }

        /// <summary>
        /// Marks the local path as deleted, deletes it through the FileAgent while holding the
        /// network gate for the path, and then clears its stored state.
        /// </summary>
        private void DeleteLocalFile(FileAgent agent, string localFilePath)
        {
            StatusKeeper.SetFileState(localFilePath, FileStatus.Deleted,
                                      FileOverlayStatus.Deleted, "");
            using (NetworkGate.Acquire(localFilePath, NetworkOperation.Deleting))
            {
                agent.Delete(localFilePath);
            }
            //updateRecord(Remove C, L)
            StatusKeeper.ClearFileStatus(localFilePath);
        }

        /// <summary>
        /// Downloads the tuple's server object to <paramref name="targetPath"/> and records the
        /// server ETag and the hash returned by the downloader in the stored state.
        /// </summary>
        private async Task DownloadCloudFile(AccountInfo accountInfo, StateTuple tuple, CancellationToken token, string targetPath)
        {
            StatusKeeper.SetFileState(targetPath, FileStatus.Modified, FileOverlayStatus.Modified, "");

            var finalHash=await NetworkAgent.Downloader.DownloadCloudFile(accountInfo, tuple.ObjectInfo, targetPath, token)
                .ConfigureAwait(false);
            //updateRecord( L = S )
            StatusKeeper.UpdateFileChecksum(targetPath, tuple.ObjectInfo.ETag, finalHash);

            StatusKeeper.StoreInfo(targetPath, tuple.ObjectInfo,finalHash);
        }

        /// <summary>
        /// Uploads a local file or folder. When <paramref name="isUnselectedRootFolder"/> is true,
        /// every sub-folder of the folder is uploaded too and added to <paramref name="processedPaths"/>.
        /// </summary>
        /// <param name="isUnselectedRootFolder">
        /// SyncSingleItem passes its "isCreation" flag here: true for unselected root folders and for
        /// new folders under a selected parent
        /// </param>
        private async Task UploadLocalFile(AccountInfo accountInfo, StateTuple tuple, CancellationToken token, bool isUnselectedRootFolder, FileSystemInfo localInfo, HashSet processedPaths, Progress progress)
        {
            var action = new CloudUploadAction(accountInfo, localInfo, tuple.FileState,
                                               accountInfo.BlockSize,
                                               accountInfo.BlockHash, "Poll", isUnselectedRootFolder, token, progress,tuple.Merkle);
            using (StatusNotification.GetNotifier("Uploading {0}", "Uploaded {0}", localInfo.Name))
            {
                await
NetworkAgent.Uploader.UploadCloudFile(action, token).ConfigureAwait(false);
            }

            if (isUnselectedRootFolder)
            {
                //Create every sub-folder on the server as well, and mark them as processed so that
                //SyncSingleItem skips them during this poll
                var dirActions =(
                                    from dir in ((DirectoryInfo) localInfo).EnumerateDirectories("*", SearchOption.AllDirectories)
                                    let subAction = new CloudUploadAction(accountInfo, dir, null, accountInfo.BlockSize, accountInfo.BlockHash,
                                                                          "Poll", true, token, progress)
                                    select subAction).ToList();
                foreach (var dirAction in dirActions)
                {
                    processedPaths.Add(dirAction.LocalFile.FullName);
                }

                //ConfigureAwait(false), like every other await in this class
                await TaskEx.WhenAll(dirActions.Select(a=>NetworkAgent.Uploader.UploadCloudFile(a,token)).ToArray()).ConfigureAwait(false);
            }
        }

        /// <summary>
        /// Tries to turn a local FILE move into a server-side move, avoiding a re-upload.
        /// </summary>
        /// <returns>
        /// true if the server object was moved. false if the item is a directory, has no recorded old
        /// path, has no known server object, was modified after the move, was also renamed on the
        /// server (a double-rename conflict is reported), or the server move failed; callers then
        /// fall back to an upload or a server-move check.
        /// </returns>
        private bool MoveForLocalMove(AccountInfo accountInfo, StateTuple tuple)
        {
            //Directories are not moved here
            if (tuple.FileInfo is DirectoryInfo)
                return false;

            //Is the previous path missing?
            if (String.IsNullOrWhiteSpace(tuple.OldFullPath))
                return false;

            //Without a known server object there is nothing to move on the server.
            //Bail out instead of dereferencing a null ObjectInfo below: the resulting exception
            //would abort SyncSingleItem for this item instead of letting it fall back to an upload
            if (tuple.ObjectInfo == null)
                return false;

            //Has the file changed locally, in which case it should be uploaded rather than moved?
            if (tuple.OldChecksum != tuple.Merkle.TopHash.ToHashString())
                return false;

            var relativePath = tuple.ObjectInfo.RelativeUrlToFilePath(accountInfo.UserName);
            var serverPath = Path.Combine(accountInfo.AccountPath, relativePath);

            //Has the file been renamed on the server?
            if (!tuple.OldFullPath.Equals(serverPath))
            {
                ReportConflictForDoubleRename(tuple.FilePath);
                return false;
            }

            try
            {
                var client = new CloudFilesClient(accountInfo);
                var objectInfo = CloudAction.CreateObjectInfoFor(accountInfo, tuple.FileInfo);
                var containerPath = Path.Combine(accountInfo.AccountPath, objectInfo.Container.ToUnescapedString());
                //TODO: Simplify these multiple conversions from and to Uris
                var oldName = tuple.OldFullPath.AsRelativeTo(containerPath);
                //Then execute a move instead of an upload
                using (StatusNotification.GetNotifier("Moving {0}", "Moved {0}", tuple.FileInfo.Name))
                {
                    client.MoveObject(objectInfo.Account, objectInfo.Container, oldName.ToEscapedUri(),
                                                          objectInfo.Container, objectInfo.Name);
                }
                return true;
            }
            catch (Exception exc)
            {
                Log.ErrorFormat("[MOVE] Failed for [{0}],:\r\n{1}", tuple.FilePath, exc);
                //Return false to force an upload of the file
                return false;
            }
        }

        /// <summary>
        /// Adds a folder seen for the first time (no stored L) to the selective sync list when the
        /// path of its server object is itself selected. Shared objects are skipped.
        /// </summary>
        /// <param name="targetPath">Currently unused</param>
        private void AddOwnFolderToSelectives(AccountInfo accountInfo, StateTuple tuple, string targetPath)
        {
            //Not for shared folders
            if (tuple.ObjectInfo.IsShared==true)
                return;
            //Also ensure that any newly created folders are added to the selectives, if the original folder was selected
            var containerPath = Path.Combine(accountInfo.AccountPath, tuple.ObjectInfo.Container.ToUnescapedString());

            //If this is a root folder encountered for the first time
            if (tuple.L == null && Directory.Exists(tuple.FileInfo.FullName)
                && (tuple.FileInfo.FullName.IsAtOrBelow(containerPath)))
            {
                var relativePath = tuple.ObjectInfo.RelativeUrlToFilePath(accountInfo.UserName);
                var initialPath = Path.Combine(accountInfo.AccountPath, relativePath);

                //var hasMoved = true;// !initialPath.Equals(targetPath);
                //If the new path is under a selected folder, add it to the selectives as well
                if (Selectives.IsSelected(accountInfo, initialPath))
                {
                    Selectives.AddUri(accountInfo, tuple.ObjectInfo.Uri);
                    Selectives.Save(accountInfo);
                }
            }
        }

        /// <summary>
        /// Moves the local file or folder to the local path that corresponds to its server object,
        /// when the two differ (the comparison ignores case).
        /// </summary>
        /// <returns>
        /// The local path of the server object, or null when there is no server object or the
        /// item was also renamed locally (a double-rename conflict is reported).
        /// </returns>
        private string MoveForServerMove(AccountInfo accountInfo, StateTuple tuple)
        {
            if (tuple.ObjectInfo == null)
                return null;
            var relativePath = tuple.ObjectInfo.RelativeUrlToFilePath(accountInfo.UserName);
            var serverPath = Path.Combine(accountInfo.AccountPath, relativePath);

            //Compare Case Insensitive
            if (String.Equals(tuple.FilePath ,serverPath,StringComparison.InvariantCultureIgnoreCase))
                return serverPath;

            //Has the file been renamed locally?
            if (!String.IsNullOrWhiteSpace(tuple.OldFullPath) && !tuple.OldFullPath.Equals(tuple.FilePath))
            {
                ReportConflictForDoubleRename(tuple.FilePath);
                return null;
            }

            tuple.FileInfo.Refresh();
            //The file/folder may not exist if it was moved because its parent moved
            if (!tuple.FileInfo.Exists)
            {
                var target=FileInfoExtensions.FromPath(serverPath);
                if (!target.Exists)
                {
                    Log.ErrorFormat("No source or target found while trying to move {0} to {1}", tuple.FileInfo.FullName, serverPath);
                }
                return serverPath;
            }

            using (StatusNotification.GetNotifier("Moving local {0}", "Moved local {0}", Path.GetFileName(tuple.FilePath)))
            using(NetworkGate.Acquire(tuple.FilePath,NetworkOperation.Renaming))
            {
                var fi = tuple.FileInfo as FileInfo;
                if (fi != null)
                {
                    //Create the target's parent folder if needed before moving the file
                    var targetFile = new FileInfo(serverPath);
                    if (!targetFile.Directory.Exists)
                        targetFile.Directory.Create();
                    fi.MoveTo(serverPath);
                }
                var di = tuple.FileInfo as DirectoryInfo;
                if (di != null)
                {
                    var targetDir = new DirectoryInfo(serverPath);
                    if (!targetDir.Parent.Exists)
                        targetDir.Parent.Create();
                    di.MoveTo(serverPath);
                }
            }

            StatusKeeper.StoreInfo(serverPath, tuple.ObjectInfo);

            return serverPath;
        }

        /// <summary>
        /// Marks the tuple's path as deleted, deletes its server object through the delete agent
        /// and clears the stored state.
        /// </summary>
        private void DeleteCloudFile(AccountInfo accountInfo, StateTuple tuple)
        {
            using (StatusNotification.GetNotifier("Deleting server {0}", "Deleted server {0}", Path.GetFileName(tuple.FilePath)))
            {
                StatusKeeper.SetFileState(tuple.FilePath, FileStatus.Deleted,
                                          FileOverlayStatus.Deleted, "");
                NetworkAgent.DeleteAgent.DeleteCloudFile(accountInfo, tuple.ObjectInfo);
                StatusKeeper.ClearFileStatus(tuple.FilePath);
            }
        }

        /// <summary>
        /// Synchronizes everything below a newly created folder: all sub-folders first, then all files.
        /// </summary>
        private void ProcessChildren(AccountInfo accountInfo, StateTuple tuple, FileAgent
agent, ConcurrentDictionary moves,HashSet processedPaths,CancellationToken token)
        {

            var dirInfo = tuple.FileInfo as DirectoryInfo;
            //Folder tuples get the empty Merkle hash as their local checksum (C)
            var folderTuples = from folder in dirInfo.EnumerateDirectories("*", SearchOption.AllDirectories)
                               select new StateTuple(folder){C=Signature.MERKLE_EMPTY};

            //File tuples get their tree hash from StatusAgent.CalculateTreeHash, given the stored state.
            //The query is deferred, so hashing presumably happens as ApplyAction enumerates it
            var fileTuples = from file in dirInfo.EnumerateFiles("*", SearchOption.AllDirectories)
                             let state=StatusKeeper.GetStateByFilePath(file.FullName)
                             select new StateTuple(file){
                                        Merkle=StatusAgent.CalculateTreeHash(file,accountInfo,state,
                                        Settings.HashingParallelism,token,null)
                                    };

            //Process folders first, to ensure folders appear on the server as soon as possible
            //NOTE(review): .Wait() blocks the calling thread on each async SyncSingleItem call
            folderTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, moves, processedPaths,token).Wait());

            fileTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, moves,processedPaths, token).Wait());
        }

        /* Disabled code (not compiled):
         *
                //Use the queue to retry locked file hashing
                var fileQueue = new ConcurrentQueue(localInfos);
                var results = new List>();

                var backoff = 0;
                while (fileQueue.Count > 0)
                {
                    FileSystemInfo file;
                    fileQueue.TryDequeue(out file);
                    using (ThreadContext.Stacks["File"].Push(file.FullName))
                    {
                        try
                        {
                            //Replace MD5 here, do the calc while syncing individual files
                            string hash ;
                            if (file is DirectoryInfo)
                                hash = MD5_EMPTY;
                            else
                            {
                                //Wait in case the FileAgent has requested a Pause
                                await _unPauseEvent.WaitAsync().ConfigureAwait(false);

                                using (StatusNotification.GetNotifier("Hashing {0}", "", file.Name))
                                {
                                    hash = ((FileInfo)file).ComputeShortHash(StatusNotification);
                                    backoff = 0;
                                }
                            }
                            results.Add(Tuple.Create(file, hash));
                        }
                        catch (IOException exc)
                        {
                            Log.WarnFormat("[HASH] File in use, will retry [{0}]", exc);
                            fileQueue.Enqueue(file);
                            //If this is the only enqueued file
                            if (fileQueue.Count != 1) continue;

                            //Increase delay
                            if (backoff<60000)
                                backoff += 10000;
                            //Pause Polling for the specified time
                        }
                        if (backoff>0)
                            await PauseFor(backoff).ConfigureAwait(false);
                    }
                }

            return results;
        */

        /// <summary>
        /// Builds one StateTuple per path by merging three sources: the local file system entries
        /// (<paramref name="files"/>), the stored states (<paramref name="states"/>) and the remote
        /// objects paired with their local paths (<paramref name="infos"/>). Moves recorded in
        /// <paramref name="moves"/> are attached to the affected tuples (old/new full paths).
        /// Tuples whose state has no checksum, that have a server object and whose local entry is
        /// missing are marked as conflicts and left out of the result.
        /// </summary>
        private IEnumerable MergeSources(IEnumerable> infos, IEnumerable files, List states, ConcurrentDictionary moves)
        {
            var tuplesByPath = new Dictionary();
            foreach (var info in files)
            {
                var tuple = new StateTuple(info);
                //Is this the target of a move event?
                var moveArg = moves.Values.FirstOrDefault(arg => info.FullName.Equals(arg.FullPath, StringComparison.InvariantCultureIgnoreCase)
                    || info.FullName.IsAtOrBelow(arg.FullPath));
                if (moveArg != null)
                {
                    //Record where the entry (or the folder containing it) was moved from,
                    //and the checksum stored for that old path, if any
                    tuple.NewFullPath = info.FullName;
                    var relativePath = info.AsRelativeTo(moveArg.FullPath);
                    tuple.OldFullPath = Path.Combine(moveArg.OldFullPath, relativePath);
                    tuple.OldChecksum = states.FirstOrDefault(st => st.FilePath.Equals(tuple.OldFullPath, StringComparison.InvariantCultureIgnoreCase))
                        .NullSafe(st => st.Checksum);
                }

                tuplesByPath[tuple.FilePath] = tuple;
            }

            //For files that have state
            foreach (var state in states)
            {
                StateTuple hashTuple;

                if (tuplesByPath.TryGetValue(state.FilePath, out hashTuple))
                {
                    hashTuple.FileState = state;
                    UpdateHashes(hashTuple);
                }
                //The state's path is the source of a recorded move: attach the state to the moved entry
                else if (moves.ContainsKey(state.FilePath) && tuplesByPath.TryGetValue(moves[state.FilePath].FullPath, out hashTuple))
                {
                    hashTuple.FileState = state;
                    UpdateHashes(hashTuple);
                }
                else
                {
                    //A state whose path was not among the enumerated local entries
                    var fsInfo = FileInfoExtensions.FromPath(state.FilePath);
                    hashTuple = new StateTuple {FileInfo = fsInfo, FileState = state};

                    //Is the source of a moved item?
                    var moveArg = moves.Values.FirstOrDefault(arg => state.FilePath.Equals(arg.OldFullPath,StringComparison.InvariantCultureIgnoreCase)
                        || state.FilePath.IsAtOrBelow(arg.OldFullPath));
                    if (moveArg != null)
                    {
                        var relativePath = state.FilePath.AsRelativeTo(moveArg.OldFullPath);
                        hashTuple.NewFullPath = Path.Combine(moveArg.FullPath,relativePath);
                        hashTuple.OldFullPath = state.FilePath;
                        //Do we have the old MD5?
//hashTuple.OldMD5 = state.LastMD5;
                    }

                    tuplesByPath[state.FilePath] = hashTuple;
                }
            }
            //for files that don't have state
            foreach (var tuple in tuplesByPath.Values.Where(t => t.FileState == null))
            {
                UpdateHashes(tuple);
            }

            //Index the tuples that have a server object ID. Several states may carry the same ObjectID;
            //ToDictionary would throw on the duplicate key and abort processing of the whole account
            //for this poll, so keep the first tuple for each ID instead
            var tuplesByID = tuplesByPath.Values
                .Where(tuple => tuple.FileState != null && tuple.FileState.ObjectID!=null)
                .GroupBy(tuple => tuple.FileState.ObjectID)
                .ToDictionary(group => group.Key, group => group.First());

            foreach (var info in infos)
            {
                StateTuple hashTuple;
                var filePath = info.Item1;
                var objectInfo = info.Item2;
                var objectID = objectInfo.UUID;

                //Match the remote object by its ID first, then by its local path
                if (objectID != _emptyGuid && tuplesByID.TryGetValue(objectID, out hashTuple))
                {
                    hashTuple.ObjectInfo = objectInfo;
                }
                else if (tuplesByPath.TryGetValue(filePath, out hashTuple))
                {
                    hashTuple.ObjectInfo = objectInfo;
                }
                else
                {
                    //A remote object with neither a local entry nor a state
                    var fsInfo = FileInfoExtensions.FromPath(filePath);
                    hashTuple= new StateTuple {FileInfo = fsInfo, ObjectInfo = objectInfo};
                    tuplesByPath[filePath] = hashTuple;

                    if (objectInfo.UUID!=_emptyGuid)
                        tuplesByID[objectInfo.UUID] = hashTuple;
                }
            }

            var tuples = tuplesByPath.Values;
            //Materialized once so that the exclusion below and the conflict loop use the same set
            var brokenTuples = (from tuple in tuples
                                where tuple.FileState != null && tuple.FileState.Checksum == null
                                      && tuple.ObjectInfo != null && (tuple.FileInfo==null || !tuple.FileInfo.Exists)
                                select tuple).ToList();
            var actualTuples = tuples.Except(brokenTuples);
            Debug.Assert(actualTuples.All(t => t.HashesValid()));

            foreach (var tuple in brokenTuples)
            {
                StatusKeeper.SetFileState(tuple.FileState.FilePath,
                    FileStatus.Conflict, FileOverlayStatus.Conflict, "FileState without checksum encountered for server object missing from disk");
            }

            return actualTuples;
        }

        /// <summary>
        /// Update the tuple with the file's hashes, avoiding calculation if the file is unchanged
        /// </summary>
        /// <param name="hashTuple">The tuple whose Merkle (and therefore C) is set</param>
        /// <remarks>
        /// Only files are hashed; any other entry gets the empty tree hash.
        /// For a file, the last write time and length are compared with the values stored in its state.
        /// If they match (write times within one second of each other, in either direction), the stored
        /// hashes are reused, unless the stored top hash contradicts the file being empty/non-empty or
        /// the stored block hashes are missing. Otherwise the tree hash is recalculated and stored
        /// (RecalculateTreehash). If the file cannot be read (IOException) the tuple is marked Locked.
        /// </remarks>
        private void UpdateHashes(StateTuple hashTuple)
        {
            try
            {
                var state = hashTuple.NullSafe(s => s.FileState);
                var storedHash = state.NullSafe(s => s.Checksum);
                var storedHashes = state.NullSafe(s => s.Hashes);
                var storedDate = state.NullSafe(s => s.LastWriteDate) ?? DateTime.MinValue;
                var storedLength = state.NullSafe(s => s.LastLength);

                var merkle=TreeHash.Empty;

                if (hashTuple.FileInfo is FileInfo)
                {
                    var file = (FileInfo)hashTuple.FileInfo.WithProperCapitalization();

                    //Attributes unchanged?
                    //LastWriteTime is only accurate to the second.
                    //Use the absolute difference: a write time that moved BACKWARDS (e.g. an older copy
                    //restored over the file) is a change too, while a signed comparison would treat
                    //every earlier timestamp as "unchanged"
                    var unchangedAttributes = (file.LastWriteTime - storedDate).Duration() < TimeSpan.FromSeconds(1)
                        && storedLength == file.Length;

                    //Attributes appear unchanged but the file length doesn't match the stored hash ?
                    //(an empty file must have the empty top hash and vice versa)
                    var nonEmptyMismatch = unchangedAttributes &&
                        (file.Length == 0 ^ storedHash== Signature.MERKLE_EMPTY);

                    //Missing hashes for NON-EMPTY hash ?
                    var missingHashes = storedHash != Signature.MERKLE_EMPTY &&
                        String.IsNullOrWhiteSpace(storedHashes);

                    //Recalculate if the attributes changed, if they are unchanged but the stored top hash
                    //doesn't match the file's (non-)emptiness, or if the block hashes are missing
                    //for a non-empty top hash.
                    //
                    //Otherwise we load the hashes from state
                    if (!unchangedAttributes || nonEmptyMismatch || missingHashes)
                        merkle = RecalculateTreehash(file);
                    else
                    {
                        merkle=TreeHash.Parse(hashTuple.FileState.Hashes);
                    }
                }

                //Setting Merkle also updates C
                hashTuple.Merkle = merkle;
            }
            catch (IOException)
            {
                hashTuple.Locked = true;
            }
        }

        /// <summary>
        /// Recalculate a file's tree hash and store it through StatusKeeper.UpdateFileHashes
        /// </summary>
        /// <param name="file">The file to hash</param>
        /// <returns>The newly calculated tree hash</returns>
        private TreeHash RecalculateTreehash(FileInfo file)
        {
            var progress = new Progress(d =>StatusNotification.Notify(
                                                new StatusNotification(String.Format("Hashing {0} of {1}", d, file.Name))));
            var merkle = Signature.CalculateTreeHash(file, StatusKeeper.BlockSize, StatusKeeper.BlockHash,CancellationToken, progress);
            StatusKeeper.UpdateFileHashes(file.FullName, merkle);
            return merkle;
        }

        /// <summary>
        /// Returns the EARLIER of <paramref name="threshold"/> and the latest Last_Modified date found in
        /// <paramref name="cloudObjects"/>. A missing value (null or DateTime.MinValue) on one side
        /// yields the other side.
        /// </summary>
        /// <param name="threshold">The current "since" value; may be null</param>
        /// <param name="cloudObjects">The listed objects; may be null or empty</param>
        /// <returns>The earlier of the two dates, as described above</returns>
        private static DateTime? GetLatestDateBefore(DateTime? threshold, IList cloudObjects)
        {
            DateTime? maxDate = null;
            if (cloudObjects!=null &&  cloudObjects.Count > 0)
                maxDate = cloudObjects.Max(obj => obj.Last_Modified);
            if (maxDate == null || maxDate == DateTime.MinValue)
                return threshold;
            if (threshold == null || threshold == DateTime.MinValue || threshold > maxDate)
                return maxDate;
            return threshold;
        }

        /// <summary>
        /// Returns the LATER of <paramref name="threshold"/> and the latest Last_Modified date found in
        /// <paramref name="cloudObjects"/>. A missing value (null or DateTime.MinValue) on one side
        /// yields the other side.
        /// </summary>
        /// <param name="threshold">The current "since" value; may be null</param>
        /// <param name="cloudObjects">The listed objects; may be null or empty</param>
        /// <returns>The later of the two dates, as described above</returns>
        private static DateTime? GetLatestDateAfter(DateTime? threshold, IList cloudObjects)
        {
            DateTime? maxDate = null;
            if (cloudObjects!=null &&  cloudObjects.Count > 0)
                maxDate = cloudObjects.Max(obj => obj.Last_Modified);
            if (maxDate == null || maxDate == DateTime.MinValue)
                return threshold;
            if (threshold == null || threshold == DateTime.MinValue || threshold < maxDate)
                return maxDate;
            return threshold;
        }

        //Used by ProcessAccountFiles (PostSnapshot) to obtain the current remote object list per account
        readonly AccountsDifferencer _differencer = new AccountsDifferencer();
        //Backing field of the Pause property
        private bool _pause;
        //String form of Guid.Empty; remote objects with this UUID are not matched by ID in MergeSources
        private readonly string _emptyGuid = Guid.Empty.ToString();

        /// <summary>
        /// Marks the local file as conflicting because it was changed both locally and on the server,
        /// updates the agent status and notifies the user.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="localFilePath"/> is null, empty or whitespace</exception>
        private void ReportConflictForMismatch(string localFilePath)
        {
            if (String.IsNullOrWhiteSpace(localFilePath))
                throw new ArgumentNullException("localFilePath");
            Contract.EndContractBlock();

            StatusKeeper.SetFileState(localFilePath, FileStatus.Conflict, FileOverlayStatus.Conflict, "File changed at the server");
            UpdateStatus(PithosStatus.HasConflicts);
            var message = String.Format("Conflict detected for file {0}", localFilePath);
            Log.Warn(message);
            StatusNotification.NotifyChange(message, TraceLevel.Warning);
        }

        /// <summary>
        /// Marks the local file as conflicting because it was renamed both locally and on the server.
        /// </summary>
        private void ReportConflictForDoubleRename(string localFilePath)
        {
            if (String.IsNullOrWhiteSpace(localFilePath))
                throw new ArgumentNullException("localFilePath");
            Contract.EndContractBlock();

            StatusKeeper.SetFileState(localFilePath, FileStatus.Conflict, FileOverlayStatus.Conflict, "File renamed both locally and on the server");
            UpdateStatus(PithosStatus.HasConflicts);
            var message = String.Format("Double rename conflict detected for file {0}", localFilePath);
            Log.Warn(message);
StatusNotification.NotifyChange(message, TraceLevel.Warning); } /// /// Notify the UI to update the visual status /// /// private void UpdateStatus(PithosStatus status) { try { StatusNotification.SetPithosStatus(status); //StatusNotification.Notify(new Notification()); } catch (Exception exc) { //Failure is not critical, just log it Log.Warn("Error while updating status", exc); } } private static void CreateContainerFolders(AccountInfo accountInfo, IEnumerable containers) { var containerPaths = from container in containers let containerPath = Path.Combine(accountInfo.AccountPath, container.Name.ToUnescapedString()) where container.Name.ToString() != FolderConstants.TrashContainer && !Directory.Exists(containerPath) select containerPath; foreach (var path in containerPaths) { Directory.CreateDirectory(path); } } public void AddAccount(AccountInfo accountInfo) { //Avoid adding a duplicate accountInfo _accounts.TryAdd(accountInfo.AccountKey, accountInfo); } public void RemoveAccount(AccountInfo accountInfo) { AccountInfo account; _accounts.TryRemove(accountInfo.AccountKey, out account); SnapshotDifferencer differencer; _differencer.Differencers.TryRemove(accountInfo.AccountKey, out differencer); } } }