X-Git-Url: https://code.grnet.gr/git/pithos-ms-client/blobdiff_plain/c875d683752bde605a05350e90f0d603d55958fc..88e2300a132f903fe892721360f12ec0ebb0824a:/trunk/Pithos.Core/Agents/PollAgent.cs diff --git a/trunk/Pithos.Core/Agents/PollAgent.cs b/trunk/Pithos.Core/Agents/PollAgent.cs index 28c4219..6d5d89b 100644 --- a/trunk/Pithos.Core/Agents/PollAgent.cs +++ b/trunk/Pithos.Core/Agents/PollAgent.cs @@ -45,13 +45,9 @@ using System.ComponentModel.Composition; using System.Diagnostics; using System.Diagnostics.Contracts; using System.IO; -using System.Linq.Expressions; using System.Reflection; -using System.Security.Cryptography; using System.Threading; using System.Threading.Tasks; -using System.Threading.Tasks.Dataflow; -using Castle.ActiveRecord; using Pithos.Interfaces; using Pithos.Network; using log4net; @@ -159,7 +155,18 @@ namespace Pithos.Core.Agents public void SynchNow(IEnumerable paths=null) { _batchQueue.Enqueue(paths); - _syncEvent.Set(); + _syncEvent.SetAsync(); + + //_pollAction.Post(new PollRequest {Batch = paths}); + } + + /// + /// Start a manual synchronization + /// + public Task SynchNowAsync(IEnumerable paths=null) + { + _batchQueue.Enqueue(paths); + return _syncEvent.SetAsync(); //_pollAction.Post(new PollRequest {Batch = paths}); } @@ -173,12 +180,15 @@ namespace Pithos.Core.Agents TaskEx.Run(() => _moves.AddOrUpdate(args.OldFullPath, args,(s,e)=>e)); } + + private bool _hasConnection; + /// /// Remote files are polled periodically. Any changes are processed /// /// /// - public void PollRemoteFiles(DateTime? since = null) + public async Task PollRemoteFiles(DateTimeOffset? since = null) { if (Log.IsDebugEnabled) Log.DebugFormat("Polling changes after [{0}]",since); @@ -187,44 +197,73 @@ namespace Pithos.Core.Agents //GC.Collect(); + using (ThreadContext.Stacks["Retrieve Remote"].Push("All accounts")) { //If this poll fails, we will retry with the same since value - var nextSince = since; + DateTimeOffset? nextSince = since; try { _unPauseEvent.Wait(); UpdateStatus(PithosStatus.PollSyncing); - var accountBatches=new Dictionary>(); - IEnumerable batch = null; - if (_batchQueue.TryDequeue(out batch) && batch != null) - foreach (var account in _accounts.Values) + if (!NetworkAgent.IsConnectedToInternet) + { + if (_hasConnection) { - var accountBatch = batch.Where(path => path.IsAtOrBelow(account.AccountPath)); - accountBatches[account.AccountKey] = accountBatch; + StatusNotification.Notify(new Notification + { + Level = TraceLevel.Error, + Title = "Internet Connection problem", + Message ="Internet connectivity was lost. Synchronization will continue when connectivity is restored" + }); } - - var moves=Interlocked.Exchange(ref _moves, new ConcurrentDictionary()); - - var tasks = new List>(); - foreach(var accountInfo in _accounts.Values) - { - IEnumerable accountBatch ; - accountBatches.TryGetValue(accountInfo.AccountKey,out accountBatch); - var t=ProcessAccountFiles (accountInfo, accountBatch, moves,since); - tasks.Add(t); + _hasConnection = false; } + else + { + if (!_hasConnection) + { + StatusNotification.Notify(new Notification + { + Level = TraceLevel.Info, + Title = "Internet Connection", + Message = "Internet connectivity restored." + }); + } + _hasConnection = true; - var nextTimes=TaskEx.WhenAll(tasks.ToList()).Result; + var accountBatches = new Dictionary>(); + IEnumerable batch = null; + if (_batchQueue.TryDequeue(out batch) && batch != null) + foreach (var account in _accounts.Values) + { + var accountBatch = batch.Where(path => path.IsAtOrBelow(account.AccountPath)); + accountBatches[account.AccountKey] = accountBatch; + } - _firstPoll = false; - //Reschedule the poll with the current timestamp as a "since" value + var moves = Interlocked.Exchange(ref _moves, new ConcurrentDictionary()); - if (nextTimes.Length>0) - nextSince = nextTimes.Min(); - if (Log.IsDebugEnabled) - Log.DebugFormat("Next Poll at [{0}]",nextSince); + var tasks = new List>(); + foreach (var accountInfo in _accounts.Values) + { + IEnumerable accountBatch; + accountBatches.TryGetValue(accountInfo.AccountKey, out accountBatch); + var t = ProcessAccountFiles(accountInfo, accountBatch, moves, since); + tasks.Add(t); + } + + var taskList = tasks.ToList(); + var nextTimes = await TaskEx.WhenAll(taskList).ConfigureAwait(false); + + _firstPoll = false; + //Reschedule the poll with the current timestamp as a "since" value + + if (nextTimes.Length > 0) + nextSince = nextTimes.Min(); + if (Log.IsDebugEnabled) + Log.DebugFormat("Next Poll for changes since [{0}]", nextSince); + } } catch (Exception ex) { @@ -239,12 +278,14 @@ namespace Pithos.Core.Agents try { //Wait for the polling interval to pass or the Sync event to be signalled - nextSince = WaitForScheduledOrManualPoll(nextSince).Result; + nextSince = await WaitForScheduledOrManualPoll(nextSince).ConfigureAwait(false); } finally { //Ensure polling is scheduled even in case of error +#pragma warning disable 4014 TaskEx.Run(()=>PollRemoteFiles(nextSince)); +#pragma warning restore 4014 //_pollAction.Post(new PollRequest {Since = nextSince}); } } @@ -255,10 +296,13 @@ namespace Pithos.Core.Agents /// /// /// - private async Task WaitForScheduledOrManualPoll(DateTime? since) + private async Task WaitForScheduledOrManualPoll(DateTimeOffset? since) { var sync = _syncEvent.WaitAsync(); - var wait = TaskEx.Delay(TimeSpan.FromSeconds(Settings.PollingInterval)); + var delay = TimeSpan.FromSeconds(Settings.PollingInterval); + if (Log.IsDebugEnabled) + Log.DebugFormat("Next Poll at [{0}]", DateTime.Now.Add(delay)); + var wait = TaskEx.Delay(delay); var signaledTask = await TaskEx.WhenAny(sync, wait).ConfigureAwait(false); @@ -280,7 +324,7 @@ namespace Pithos.Core.Agents - public async Task ProcessAccountFiles(AccountInfo accountInfo, IEnumerable accountBatch, ConcurrentDictionary moves, DateTime? since = null) + public async Task ProcessAccountFiles(AccountInfo accountInfo, IEnumerable accountBatch, ConcurrentDictionary moves, DateTimeOffset? since = null) { if (accountInfo == null) throw new ArgumentNullException("accountInfo"); @@ -298,8 +342,9 @@ namespace Pithos.Core.Agents var client = new CloudFilesClient(accountInfo); //We don't need to check the trash container - var containers = client.ListContainers(accountInfo.UserName) - .Where(c=>c.Name!="trash") + var allContainers=await client.ListContainers(accountInfo.UserName).ConfigureAwait(false); + var containers = allContainers + .Where(c=>c.Name.ToString()!="trash") .ToList(); @@ -308,7 +353,7 @@ namespace Pithos.Core.Agents //The nextSince time fallback time is the same as the current. //If polling succeeds, the next Since time will be the smallest of the maximum modification times //of the shared and account objects - var nextSince = since; + DateTimeOffset? nextSince = since; try { @@ -317,24 +362,34 @@ namespace Pithos.Core.Agents //Get the poll time now. We may miss some deletions but it's better to keep a file that was deleted //than delete a file that was created while we were executing the poll + var token = _currentOperationCancellation.Token; + //Get the list of server objects changed since the last check //The name of the container is passed as state in order to create a dictionary of tasks in a subsequent step var listObjects = (from container in containers select Task>.Factory.StartNew(_ => - client.ListObjects(accountInfo.UserName, container.Name, since), container.Name)).ToList(); + client.ListObjects(accountInfo.UserName, container.Name, since), container.Name,token)).ToList(); - var listShared = Task>.Factory.StartNew(_ => - client.ListSharedObjects(_knownContainers,since), "shared"); + var selectiveEnabled = Selectives.IsSelectiveEnabled(accountInfo.AccountKey); + var listShared = selectiveEnabled? + Task>.Factory.StartNew(_ => + client.ListSharedObjects(_knownContainers,since), "shared",token) + :Task.Factory.FromResult((IList) new List(),"shared"); + listObjects.Add(listShared); var listTasks = await Task.Factory.WhenAll(listObjects.ToArray()).ConfigureAwait(false); using (ThreadContext.Stacks["SCHEDULE"].Push("Process Results")) { + + //In case of cancellation, retry for the current date + if (token.IsCancellationRequested) return since; + var dict = listTasks.ToDictionary(t => t.AsyncState); //Get all non-trash objects. Remember, the container name is stored in AsyncState var remoteObjects = (from objectList in listTasks - where (string)objectList.AsyncState != "trash" + where objectList.AsyncState.ToString() != "trash" from obj in objectList.Result orderby obj.Bytes ascending select obj).ToList(); @@ -362,17 +417,24 @@ namespace Pithos.Core.Agents */ var cleanRemotes = (from info in remoteObjects.Union(sharedObjects) - let name = info.Name??"" + let name = info.Name.ToUnescapedString()??"" where !name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase) && !name.StartsWith(FolderConstants.CacheFolder + "/", StringComparison.InvariantCultureIgnoreCase) select info).ToList(); + //In case of cancellation, retry for the current date + if (token.IsCancellationRequested) return since; + if (_firstPoll) StatusKeeper.CleanupOrphanStates(); var differencer = _differencer.PostSnapshot(accountInfo, cleanRemotes); var currentRemotes = differencer.Current.ToList(); + + //In case of cancellation, retry for the current date + if (token.IsCancellationRequested) return since; + StatusKeeper.CleanupStaleStates(accountInfo, currentRemotes); //var filterUris = Selectives.SelectiveUris[accountInfo.AccountKey]; @@ -380,32 +442,44 @@ namespace Pithos.Core.Agents //May have to wait if the FileAgent has asked for a Pause, due to local changes await _unPauseEvent.WaitAsync().ConfigureAwait(false); + //In case of cancellation, retry for the current date + if (token.IsCancellationRequested) return since; + //Get the local files here var agent = AgentLocator.Get(accountInfo.AccountPath); var files = LoadLocalFileTuples(accountInfo, accountBatch); - var states = FileState.Queryable.ToList(); - + + + //WARNING: GetFileSystemInfo may create state entries. + //TODO: Find a different way to create the tuples and block long filenames var infos = (from remote in currentRemotes let path = remote.RelativeUrlToFilePath(accountInfo.UserName) let info=agent.GetFileSystemInfo(path) + where info != null select Tuple.Create(info.FullName,remote)) .ToList(); - var token = _currentOperationCancellation.Token; + var states = StatusKeeper.GetAllStates(); + + var tupleBuilder = new TupleBuilder(CancellationToken,StatusKeeper,StatusNotification,Settings); - var tuples = MergeSources(infos, files, states,moves).ToList(); + var tuples = tupleBuilder.MergeSources(infos, files, states,moves).ToList(); + var processedPaths = new HashSet(); //Process only the changes in the batch file, if one exists var stateTuples = accountBatch==null?tuples:tuples.Where(t => accountBatch.Contains(t.FilePath)); foreach (var tuple in stateTuples.Where(s=>!s.Locked)) { await _unPauseEvent.WaitAsync().ConfigureAwait(false); + //In case of cancellation, retry for the current date + if (token.IsCancellationRequested) return since; + //Set the Merkle Hash //SetMerkleHash(accountInfo, tuple); - await SyncSingleItem(accountInfo, tuple, agent, moves,token).ConfigureAwait(false); + await SyncSingleItem(accountInfo, tuple, agent, moves,processedPaths,token).ConfigureAwait(false); } @@ -480,10 +554,14 @@ namespace Pithos.Core.Agents Pause = false; } - private async Task SyncSingleItem(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, ConcurrentDictionary moves, CancellationToken token) + private async Task SyncSingleItem(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, ConcurrentDictionary moves,HashSet processedPaths, CancellationToken token) { Log.DebugFormat("Sync [{0}] C:[{1}] L:[{2}] S:[{3}]", tuple.FilePath, tuple.C, tuple.L, tuple.S); + //If the processed paths already contain the current path, exit + if (!processedPaths.Add(tuple.FilePath)) + return; + try { bool isInferredParent = tuple.ObjectInfo != null && tuple.ObjectInfo.UUID.StartsWith("00000000-0000-0000"); @@ -511,8 +589,10 @@ namespace Pithos.Core.Agents if (tuple.S == tuple.L) { // No server changes - //Has the file been renamed on the server? - MoveForServerMove(accountInfo, tuple); + //Has the file been renamed locally? + if (!await MoveForLocalMove(accountInfo,tuple)) + //Has the file been renamed on the server? + MoveForServerMove(accountInfo, tuple); } else { @@ -523,17 +603,10 @@ namespace Pithos.Core.Agents //Server file doesn't exist //deleteObjectFromLocal() using ( - StatusNotification.GetNotifier("Deleting local {0}", "Deleted local {0}", - Path.GetFileName(localFilePath))) + StatusNotification.GetNotifier("Deleting local {0}", "Deleted local {0}",true, + localInfo.Name)) { - StatusKeeper.SetFileState(localFilePath, FileStatus.Deleted, - FileOverlayStatus.Deleted, ""); - using (NetworkGate.Acquire(localFilePath, NetworkOperation.Deleting)) - { - agent.Delete(localFilePath); - } - //updateRecord(Remove C, L) - StatusKeeper.ClearFileStatus(localFilePath); + DeleteLocalFile(agent, localFilePath); } } else @@ -542,31 +615,18 @@ namespace Pithos.Core.Agents //downloadServerObject() // Result: L = S //If the file has moved on the server, move it locally before downloading using ( - StatusNotification.GetNotifier("Downloading {0}", "Downloaded {0}", - Path.GetFileName(localFilePath))) + StatusNotification.GetNotifier("Downloading {0}", "Downloaded {0}",true, + localInfo.Name)) { var targetPath = MoveForServerMove(accountInfo, tuple); + if (targetPath != null) + { - StatusKeeper.SetFileState(targetPath, FileStatus.Modified, FileOverlayStatus.Modified, - ""); - - await - NetworkAgent.Downloader.DownloadCloudFile(accountInfo, tuple.ObjectInfo, targetPath, - token) - .ConfigureAwait(false); - //updateRecord( L = S ) - StatusKeeper.UpdateFileChecksum(targetPath, tuple.ObjectInfo.ETag, - tuple.ObjectInfo.X_Object_Hash); - - StatusKeeper.StoreInfo(targetPath, tuple.ObjectInfo); + await DownloadCloudFile(accountInfo, tuple, token, targetPath).ConfigureAwait(false); - AddOwnFolderToSelectives(accountInfo, tuple, targetPath); + AddOwnFolderToSelectives(accountInfo, tuple, targetPath); + } } - - /* - StatusKeeper.SetFileState(targetPath, FileStatus.Unchanged, - FileOverlayStatus.Normal, ""); - */ } } } @@ -590,26 +650,27 @@ namespace Pithos.Core.Agents else { //uploadLocalObject() // Result: S = C, L = S - var progress = new Progress(d => - StatusNotification.Notify(new StatusNotification(String.Format("Merkle Hashing for Upload {0:p} of {1}", d, localInfo.Name)))); - - //Debug.Assert(tuple.FileState !=null); - var action = new CloudUploadAction(accountInfo, localInfo, tuple.FileState, - accountInfo.BlockSize, accountInfo.BlockHash, - "Poll", isUnselectedRootFolder,token,progress); - using ( - StatusNotification.GetNotifier("Uploading {0}", "Uploaded {0}", - Path.GetFileName(localFilePath))) + var progress = new Progress(d => + StatusNotification.Notify(new StatusNotification(String.Format("Merkle Hashing for Upload {0:p} of {1}", d.Percentage, localInfo.Name)))); + + //Is it an unselected root folder + var isCreation = isUnselectedRootFolder ||//or a new folder under a selected parent? + (localInfo is DirectoryInfo && Selectives.IsSelected(accountInfo, localInfo) && tuple.FileState == null && tuple.ObjectInfo == null); + + + //Is this a result of a FILE move with no modifications? Then try to move it, + //to avoid an expensive hash + if (!await MoveForLocalMove(accountInfo, tuple)) { - await NetworkAgent.Uploader.UploadCloudFile(action, token).ConfigureAwait(false); + await UploadLocalFile(accountInfo, tuple, token, isCreation, localInfo,processedPaths, progress).ConfigureAwait(false); } //updateRecord( S = C ) //State updated by the uploader - - if (isUnselectedRootFolder) - { - ProcessChildren(accountInfo, tuple, agent, moves,token); + + if (isCreation ) + { + ProcessChildren(accountInfo, tuple, agent, moves,processedPaths,token); } } } @@ -626,9 +687,13 @@ namespace Pithos.Core.Agents { //Detect server moves var targetPath = MoveForServerMove(accountInfo, tuple); - StatusKeeper.StoreInfo(targetPath, tuple.ObjectInfo); - - AddOwnFolderToSelectives(accountInfo, tuple, targetPath); + if (targetPath != null) + { + Debug.Assert(tuple.Merkle != null); + StatusKeeper.StoreInfo(targetPath, tuple.ObjectInfo, tuple.Merkle); + + AddOwnFolderToSelectives(accountInfo, tuple, targetPath); + } } else { @@ -667,18 +732,122 @@ namespace Pithos.Core.Agents { //In case of error log and retry with the next poll Log.ErrorFormat("[SYNC] Failed for file {0}. Will Retry.\r\n{1}",tuple.FilePath,exc); + } + } + + private void DeleteLocalFile(FileAgent agent, string localFilePath) + { + StatusKeeper.SetFileState(localFilePath, FileStatus.Deleted, + FileOverlayStatus.Deleted, ""); + using (NetworkGate.Acquire(localFilePath, NetworkOperation.Deleting)) + { + agent.Delete(localFilePath); + } + //updateRecord(Remove C, L) + StatusKeeper.ClearFileStatus(localFilePath); + } + + private async Task DownloadCloudFile(AccountInfo accountInfo, StateTuple tuple, CancellationToken token, string targetPath) + { + //Don't create a new state for non-existent files + if (File.Exists(targetPath) || Directory.Exists(targetPath)) + StatusKeeper.SetFileState(targetPath, FileStatus.Modified, FileOverlayStatus.Modified,""); + + var finalHash=await + NetworkAgent.Downloader.DownloadCloudFile(accountInfo, tuple.ObjectInfo, targetPath, + token) + .ConfigureAwait(false); + //updateRecord( L = S ) + StatusKeeper.UpdateFileChecksum(targetPath, tuple.ObjectInfo.ETag, + finalHash); + + StatusKeeper.StoreInfo(targetPath, tuple.ObjectInfo,finalHash); + } + private async Task UploadLocalFile(AccountInfo accountInfo, StateTuple tuple, CancellationToken token, + bool isUnselectedRootFolder, FileSystemInfo localInfo, HashSet processedPaths, IProgress progress) + { + var action = new CloudUploadAction(accountInfo, localInfo, tuple.FileState, + accountInfo.BlockSize, accountInfo.BlockHash, + "Poll", isUnselectedRootFolder, token, progress,tuple.Merkle); + + using (StatusNotification.GetNotifier("Uploading {0}", "Uploaded {0}",true, + localInfo.Name)) + { + await NetworkAgent.Uploader.UploadCloudFile(action, token).ConfigureAwait(false); + } + + if (isUnselectedRootFolder) + { + var dirActions =( + from dir in ((DirectoryInfo) localInfo).EnumerateDirectories("*", SearchOption.AllDirectories) + let subAction = new CloudUploadAction(accountInfo, dir, null, + accountInfo.BlockSize, accountInfo.BlockHash, + "Poll", true, token, progress) + select subAction).ToList(); + foreach (var dirAction in dirActions) + { + processedPaths.Add(dirAction.LocalFile.FullName); + } + await TaskEx.WhenAll(dirActions.Select(a=>NetworkAgent.Uploader.UploadCloudFile(a,token)).ToArray()); } } + private async Task MoveForLocalMove(AccountInfo accountInfo, StateTuple tuple) + { + //Is the previous path missing? + if (String.IsNullOrWhiteSpace(tuple.OldFullPath)) + return false; + //Has the file locally, in which case it should be uploaded rather than moved? + if (tuple.OldChecksum != tuple.Merkle.TopHash.ToHashString()) + return false; + + var relativePath = tuple.ObjectInfo.RelativeUrlToFilePath(accountInfo.UserName); + var serverPath = Path.Combine(accountInfo.AccountPath, relativePath); + //Has the file been renamed on the server? + if (!tuple.OldFullPath.Equals(serverPath)) + { + ReportConflictForDoubleRename(tuple.FilePath); + return false; + } + + try + { + + var client = new CloudFilesClient(accountInfo); + var objectInfo = CloudAction.CreateObjectInfoFor(accountInfo, tuple.FileInfo); + objectInfo.X_Object_Hash = tuple.Merkle.TopHash.ToHashString(); + var containerPath = Path.Combine(accountInfo.AccountPath, objectInfo.Container.ToUnescapedString()); + //TODO: SImplify these multiple conversions from and to Uris + var oldName = tuple.OldFullPath.AsRelativeTo(containerPath); + //Then execute a move instead of an upload + using (StatusNotification.GetNotifier("Moving {0}", "Moved {0}", true,tuple.FileInfo.Name)) + { + await client.MoveObject(objectInfo.Account, objectInfo.Container, oldName.Replace('\\','/').ToEscapedUri(), + objectInfo.Container, objectInfo.Name).ConfigureAwait(false); + StatusKeeper.MoveFileState(tuple.OldFullPath, tuple.FilePath, objectInfo, tuple.Merkle); + //StatusKeeper.StoreInfo(tuple.FilePath,objectInfo,tuple.Merkle); + //StatusKeeper.ClearFolderStatus(tuple.FilePath); + } + return true; + } + catch (Exception exc) + { + Log.ErrorFormat("[MOVE] Failed for [{0}],:\r\n{1}", tuple.FilePath, exc); + //Return false to force an upload of the file + return false; + } + + } + private void AddOwnFolderToSelectives(AccountInfo accountInfo, StateTuple tuple, string targetPath) { //Not for shared folders if (tuple.ObjectInfo.IsShared==true) return; //Also ensure that any newly created folders are added to the selectives, if the original folder was selected - var containerPath = Path.Combine(accountInfo.AccountPath, tuple.ObjectInfo.Container); + var containerPath = Path.Combine(accountInfo.AccountPath, tuple.ObjectInfo.Container.ToUnescapedString()); //If this is a root folder encountered for the first time if (tuple.L == null && Directory.Exists(tuple.FileInfo.FullName) @@ -709,30 +878,55 @@ namespace Pithos.Core.Agents if (String.Equals(tuple.FilePath ,serverPath,StringComparison.InvariantCultureIgnoreCase)) return serverPath; - if (tuple.FileInfo.Exists) + //Has the file been renamed locally? + if (!String.IsNullOrWhiteSpace(tuple.OldFullPath) && !tuple.OldFullPath.Equals(tuple.FilePath)) + { + ReportConflictForDoubleRename(tuple.FilePath); + return null; + } + + tuple.FileInfo.Refresh(); + //The file/folder may not exist if it was moved because its parent moved + if (!tuple.FileInfo.Exists) { - using (StatusNotification.GetNotifier("Moving local {0}", "Moved local {0}", Path.GetFileName(tuple.FilePath))) - using(NetworkGate.Acquire(tuple.FilePath,NetworkOperation.Renaming)) + var target=FileInfoExtensions.FromPath(serverPath); + if (!target.Exists) { - var fi = tuple.FileInfo as FileInfo; - if (fi != null) - fi.MoveTo(serverPath); - var di = tuple.FileInfo as DirectoryInfo; - if (di != null) - di.MoveTo(serverPath); + Log.ErrorFormat("No source or target found while trying to move {0} to {1}", tuple.FileInfo.FullName, serverPath); } - StatusKeeper.StoreInfo(serverPath, tuple.ObjectInfo); + return serverPath; } - else + + using (StatusNotification.GetNotifier("Moving local {0}", "Moved local {0}", true,Path.GetFileName(tuple.FilePath))) + using(NetworkGate.Acquire(tuple.FilePath,NetworkOperation.Renaming)) { - Debug.Assert(false, "File does not exist"); + + var fi = tuple.FileInfo as FileInfo; + if (fi != null) + { + var targetFile = new FileInfo(serverPath); + if (!targetFile.Directory.Exists) + targetFile.Directory.Create(); + fi.MoveTo(serverPath); + } + var di = tuple.FileInfo as DirectoryInfo; + if (di != null) + { + var targetDir = new DirectoryInfo(serverPath); + if (!targetDir.Parent.Exists) + targetDir.Parent.Create(); + di.MoveTo(serverPath); + } } + + StatusKeeper.StoreInfo(serverPath, tuple.ObjectInfo); + return serverPath; } private void DeleteCloudFile(AccountInfo accountInfo, StateTuple tuple) { - using (StatusNotification.GetNotifier("Deleting server {0}", "Deleted server {0}", Path.GetFileName(tuple.FilePath))) + using (StatusNotification.GetNotifier("Deleting server {0}", "Deleted server {0}", true,Path.GetFileName(tuple.FilePath))) { StatusKeeper.SetFileState(tuple.FilePath, FileStatus.Deleted, @@ -742,167 +936,31 @@ namespace Pithos.Core.Agents } } - private void ProcessChildren(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, ConcurrentDictionary moves,CancellationToken token) + private void ProcessChildren(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, ConcurrentDictionary moves,HashSet processedPaths,CancellationToken token) { var dirInfo = tuple.FileInfo as DirectoryInfo; var folderTuples = from folder in dirInfo.EnumerateDirectories("*", SearchOption.AllDirectories) - select new StateTuple(folder); + select new StateTuple(folder){C=Signature.MERKLE_EMPTY}; + var fileTuples = from file in dirInfo.EnumerateFiles("*", SearchOption.AllDirectories) - select new StateTuple(file); + let state=StatusKeeper.GetStateByFilePath(file.FullName) + select new StateTuple(file){ + Merkle=StatusAgent.CalculateTreeHash(file,accountInfo,state, + Settings.HashingParallelism,token,null) + }; //Process folders first, to ensure folders appear on the sever as soon as possible - folderTuples.ApplyAction(async t =>await SyncSingleItem(accountInfo, t, agent, moves, token).ConfigureAwait(false)); + folderTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, moves, processedPaths,token).Wait()); - fileTuples.ApplyAction(async t => await SyncSingleItem(accountInfo, t, agent, moves, token).ConfigureAwait(false)); + fileTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, moves,processedPaths, token).Wait()); } - /* - * //Use the queue to retry locked file hashing - var fileQueue = new ConcurrentQueue(localInfos); - - var results = new List>(); - var backoff = 0; - while (fileQueue.Count > 0) - { - FileSystemInfo file; - fileQueue.TryDequeue(out file); - using (ThreadContext.Stacks["File"].Push(file.FullName)) - { - try - { - //Replace MD5 here, do the calc while syncing individual files - string hash ; - if (file is DirectoryInfo) - hash = MD5_EMPTY; - else - { - //Wait in case the FileAgent has requested a Pause - await _unPauseEvent.WaitAsync().ConfigureAwait(false); - - using (StatusNotification.GetNotifier("Hashing {0}", "", file.Name)) - { - hash = ((FileInfo)file).ComputeShortHash(StatusNotification); - backoff = 0; - } - } - results.Add(Tuple.Create(file, hash)); - } - catch (IOException exc) - { - Log.WarnFormat("[HASH] File in use, will retry [{0}]", exc); - fileQueue.Enqueue(file); - //If this is the only enqueued file - if (fileQueue.Count != 1) continue; - - - //Increase delay - if (backoff<60000) - backoff += 10000; - //Pause Polling for the specified time - } - if (backoff>0) - await PauseFor(backoff).ConfigureAwait(false); - } - } - return results; - - */ - private IEnumerable MergeSources(IEnumerable> infos, IEnumerable files, IEnumerable states, ConcurrentDictionary moves) - { - var tuplesByPath = files.ToDictionary(f => f.FullName, f => new StateTuple {FileInfo = f}); new Dictionary(); - - //For files that have state - foreach (var state in states) - { - StateTuple hashTuple; - if (tuplesByPath.TryGetValue(state.FilePath, out hashTuple)) - { - hashTuple.FileState = state; - UpdateMD5(hashTuple); - } - else - { - var fsInfo = FileInfoExtensions.FromPath(state.FilePath); - hashTuple = new StateTuple {FileInfo = fsInfo, FileState = state}; - tuplesByPath[state.FilePath] = hashTuple; - } - } - //for files that don't have state - foreach (var tuple in tuplesByPath.Values.Where(t => t.FileState == null)) - { - UpdateMD5(tuple); - } - - var tuplesByID = tuplesByPath.Values - .Where(tuple => tuple.FileState != null && tuple.FileState.ObjectID!=null) - .ToDictionary(tuple=>tuple.FileState.ObjectID,tuple=>tuple);//new Dictionary(); - - foreach (var info in infos) - { - StateTuple hashTuple; - var filePath = info.Item1; - var objectInfo = info.Item2; - var objectID = objectInfo.UUID; - - if (objectID != _emptyGuid && tuplesByID.TryGetValue(objectID, out hashTuple)) - { - hashTuple.ObjectInfo = objectInfo; - } - else if (tuplesByPath.TryGetValue(filePath, out hashTuple)) - { - hashTuple.ObjectInfo = objectInfo; - } - else - { - - - var fsInfo = FileInfoExtensions.FromPath(filePath); - hashTuple= new StateTuple {FileInfo = fsInfo, ObjectInfo = objectInfo}; - tuplesByPath[filePath] = hashTuple; - - if (objectInfo.UUID!=_emptyGuid) - tuplesByID[objectInfo.UUID] = hashTuple; - } - } - Debug.Assert(tuplesByPath.Values.All(t => t.HashesValid())); - return tuplesByPath.Values; - } + - private void UpdateMD5(StateTuple hashTuple) - { - - try - { - var hash = Signature.MD5_EMPTY; - if (hashTuple.FileInfo is FileInfo) - { - var file = hashTuple.FileInfo as FileInfo; - var stateDate = hashTuple.NullSafe(h => h.FileState).NullSafe(s => s.LastWriteDate) ?? - DateTime.MinValue; - if (file.LastWriteTime - stateDate < TimeSpan.FromSeconds(1) && - hashTuple.FileState.LastLength == file.Length) - { - hash = hashTuple.FileState.LastMD5; - } - else - { - //Modified, must calculate hash - hash = file.ComputeShortHash(StatusNotification); - StatusKeeper.UpdateLastMD5(file, hash); - } - } - hashTuple.C = hash; - hashTuple.MD5 = hash; - } - catch (IOException) - { - hashTuple.Locked = true; - } - } /// /// Returns the latest LastModified date from the list of objects, but only if it is before @@ -911,14 +969,14 @@ namespace Pithos.Core.Agents /// /// /// - private static DateTime? GetLatestDateBefore(DateTime? threshold, IList cloudObjects) + private static DateTimeOffset? GetLatestDateBefore(DateTime? threshold, IList cloudObjects) { - DateTime? maxDate = null; + DateTimeOffset? maxDate = null; if (cloudObjects!=null && cloudObjects.Count > 0) maxDate = cloudObjects.Max(obj => obj.Last_Modified); - if (maxDate == null || maxDate == DateTime.MinValue) + if (!maxDate.HasValue) return threshold; - if (threshold == null || threshold == DateTime.MinValue || threshold > maxDate) + if (!threshold.HasValue|| threshold > maxDate) return maxDate; return threshold; } @@ -930,21 +988,21 @@ namespace Pithos.Core.Agents /// /// /// - private static DateTime? GetLatestDateAfter(DateTime? threshold, IList cloudObjects) + private static DateTimeOffset? GetLatestDateAfter(DateTimeOffset? threshold, IList cloudObjects) { - DateTime? maxDate = null; + DateTimeOffset? maxDate = null; if (cloudObjects!=null && cloudObjects.Count > 0) maxDate = cloudObjects.Max(obj => obj.Last_Modified); - if (maxDate == null || maxDate == DateTime.MinValue) + if (!maxDate.HasValue) return threshold; - if (threshold == null || threshold == DateTime.MinValue || threshold < maxDate) + if (!threshold.HasValue|| threshold < maxDate) return maxDate; return threshold; } readonly AccountsDifferencer _differencer = new AccountsDifferencer(); private bool _pause; - private readonly string _emptyGuid = Guid.Empty.ToString(); + @@ -961,6 +1019,19 @@ namespace Pithos.Core.Agents StatusNotification.NotifyChange(message, TraceLevel.Warning); } + private void ReportConflictForDoubleRename(string localFilePath) + { + if (String.IsNullOrWhiteSpace(localFilePath)) + throw new ArgumentNullException("localFilePath"); + Contract.EndContractBlock(); + + StatusKeeper.SetFileState(localFilePath, FileStatus.Conflict, FileOverlayStatus.Conflict, "File renamed both locally and on the server"); + UpdateStatus(PithosStatus.HasConflicts); + var message = String.Format("Double rename conflict detected for file {0}", localFilePath); + Log.Warn(message); + StatusNotification.NotifyChange(message, TraceLevel.Warning); + } + /// /// Notify the UI to update the visual status @@ -983,8 +1054,8 @@ namespace Pithos.Core.Agents private static void CreateContainerFolders(AccountInfo accountInfo, IEnumerable containers) { var containerPaths = from container in containers - let containerPath = Path.Combine(accountInfo.AccountPath, container.Name) - where container.Name != FolderConstants.TrashContainer && !Directory.Exists(containerPath) + let containerPath = Path.Combine(accountInfo.AccountPath, container.Name.ToUnescapedString()) + where container.Name.ToString() != FolderConstants.TrashContainer && !Directory.Exists(containerPath) select containerPath; foreach (var path in containerPaths) @@ -1001,6 +1072,9 @@ namespace Pithos.Core.Agents public void RemoveAccount(AccountInfo accountInfo) { + if (accountInfo == null) + return; + AccountInfo account; _accounts.TryRemove(accountInfo.AccountKey, out account);