2 /* -----------------------------------------------------------------------
\r
3 * <copyright file="PollAgent.cs" company="GRNet">
\r
5 * Copyright 2011-2012 GRNET S.A. All rights reserved.
\r
7 * Redistribution and use in source and binary forms, with or
\r
8 * without modification, are permitted provided that the following
\r
9 * conditions are met:
\r
11 * 1. Redistributions of source code must retain the above
\r
12 * copyright notice, this list of conditions and the following
\r
15 * 2. Redistributions in binary form must reproduce the above
\r
16 * copyright notice, this list of conditions and the following
\r
17 * disclaimer in the documentation and/or other materials
\r
18 * provided with the distribution.
\r
21 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
\r
22 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
\r
23 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
\r
24 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
\r
25 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
\r
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
\r
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
\r
28 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
\r
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
\r
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
\r
31 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
\r
32 * POSSIBILITY OF SUCH DAMAGE.
\r
34 * The views and conclusions contained in the software and
\r
35 * documentation are those of the authors and should not be
\r
36 * interpreted as representing official policies, either expressed
\r
37 * or implied, of GRNET S.A.
\r
39 * -----------------------------------------------------------------------
\r
43 using System.Collections.Concurrent;
\r
44 using System.ComponentModel.Composition;
\r
45 using System.Diagnostics;
\r
46 using System.Diagnostics.Contracts;
\r
48 using System.Reflection;
\r
49 using System.Threading;
\r
50 using System.Threading.Tasks;
\r
51 using Pithos.Interfaces;
\r
52 using Pithos.Network;
\r
55 namespace Pithos.Core.Agents
\r
58 using System.Collections.Generic;
\r
61 /*public class PollRequest
\r
63 public DateTime? Since { get; set; }
\r
64 public IEnumerable<string> Batch { get; set; }
\r
69 /// PollAgent periodically polls the server to detect object changes. The agent retrieves a listing of all
\r
70 /// objects and compares it with a previously cached version to detect differences.
\r
71 /// New files are downloaded, missing files are deleted from the local file system and common files are compared
\r
72 /// to determine the appropriate action
\r
75 public class PollAgent
\r
77 private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
\r
79 [System.ComponentModel.Composition.Import]
\r
80 public IStatusKeeper StatusKeeper { get; set; }
\r
82 [System.ComponentModel.Composition.Import]
\r
83 public IPithosSettings Settings { get; set; }
\r
85 [System.ComponentModel.Composition.Import]
\r
86 public NetworkAgent NetworkAgent { get; set; }
\r
88 [System.ComponentModel.Composition.Import]
\r
89 public Selectives Selectives { get; set; }
\r
91 public IStatusNotification StatusNotification { get; set; }
\r
93 private CancellationTokenSource _currentOperationCancellation = new CancellationTokenSource();
\r
/// <summary>
/// Cancels the operation that currently holds a token from <see cref="CancellationToken"/>.
/// </summary>
/// <remarks>
/// What does it mean to cancel the current upload/download? The running operation is
/// cancelled by throwing a cancellation exception. The default behavior is to retry any
/// operation that throws, which is not wanted here: a cancelled operation should NOT be
/// retried. This can be done by catching the cancellation exception and avoiding the retry.
/// </remarks>
public void CancelCurrentOperation()
{
    // A cancelled source cannot be reset, so a brand new source is swapped in atomically
    // and only the source that was taken out is cancelled. Token requests made after the
    // swap are served by the new source instead of the old one.
    var freshSource = new CancellationTokenSource();
    var retiredSource = Interlocked.Exchange(ref _currentOperationCancellation, freshSource);
    retiredSource.Cancel();
}
\r
124 _unPauseEvent.Set();
\r
127 _unPauseEvent.Reset();
\r
/// <summary>
/// Gets a token issued by the cancellation source that is installed at the time of the call.
/// </summary>
public CancellationToken CancellationToken
{
    get
    {
        var activeSource = _currentOperationCancellation;
        return activeSource.Token;
    }
}
\r
137 private bool _firstPoll = true;
\r
139 //The Sync Event signals a manual synchronisation
\r
140 private readonly AsyncManualResetEvent _syncEvent = new AsyncManualResetEvent();
\r
142 private readonly AsyncManualResetEvent _unPauseEvent = new AsyncManualResetEvent(true);
\r
144 private readonly ConcurrentDictionary<string, DateTime> _lastSeen = new ConcurrentDictionary<string, DateTime>();
\r
145 private readonly ConcurrentDictionary<Uri, AccountInfo> _accounts = new ConcurrentDictionary<Uri,AccountInfo>();
\r
147 //private readonly ActionBlock<PollRequest> _pollAction;
\r
149 readonly HashSet<string> _knownContainers = new HashSet<string>();
\r
153 /// Start a manual synchronization
\r
/// <param name="paths">Paths that make up the manual batch; may be null (the default).</param>
155 public void SynchNow(IEnumerable<string> paths=null)
\r
// Queue the batch as-is (null included). PollRemoteFiles dequeues it and splits the
// paths per account.
157 _batchQueue.Enqueue(paths);
\r
// NOTE(review): source lines 158-159 are not part of this listing. Presumably the sync
// event is signalled there to wake the waiting poll - confirm against the full file.
160 //_pollAction.Post(new PollRequest {Batch = paths});
\r
163 readonly ConcurrentQueue<IEnumerable<string>> _batchQueue=new ConcurrentQueue<IEnumerable<string>>();
\r
165 ConcurrentDictionary<string,MovedEventArgs> _moves=new ConcurrentDictionary<string, MovedEventArgs>();
\r
/// <summary>
/// Records a move event so that the next poll can pick it up.
/// </summary>
/// <param name="args">The move event; it is stored under its OldFullPath.</param>
public void PostMove(MovedEventArgs args)
{
    // The dictionary is updated on a separate task. When the old path already has an
    // entry, the update callback hands back the existing value, so that entry is kept.
    Func<MovedEventArgs> recordMove =
        () => _moves.AddOrUpdate(args.OldFullPath, args, (oldPath, existing) => existing);
    TaskEx.Run(recordMove);
}
\r
// NOTE(review): this listing omits every source line shorter than about 19 characters
// (braces, try/finally keywords, short statements). Comments added below describe only
// the lines that are visible.
173 /// Remote files are polled periodically. Any changes are processed
\r
175 /// <param name="since">Lower bound handed to ProcessAccountFiles for every account; null by default.</param>
\r
176 /// <returns></returns>
\r
177 public void PollRemoteFiles(DateTime? since = null)
\r
179 if (Log.IsDebugEnabled)
\r
180 Log.DebugFormat("Polling changes after [{0}]",since);
\r
182 Debug.Assert(Thread.CurrentThread.IsBackground, "Polling Ended up in the main thread!");
\r
186 using (ThreadContext.Stacks["Retrieve Remote"].Push("All accounts"))
\r
188 //If this poll fails, we will retry with the same since value
\r
189 var nextSince = since;
\r
// Blocks this thread while the agent is paused
192 _unPauseEvent.Wait();
\r
193 UpdateStatus(PithosStatus.PollSyncing);
\r
// Split a manually requested batch (if one is queued) into per-account path filters.
// The per-account sequences are lazy Where queries over the dequeued batch.
195 var accountBatches=new Dictionary<Uri, IEnumerable<string>>();
\r
196 IEnumerable<string> batch = null;
\r
197 if (_batchQueue.TryDequeue(out batch) && batch != null)
\r
198 foreach (var account in _accounts.Values)
\r
200 var accountBatch = batch.Where(path => path.IsAtOrBelow(account.AccountPath));
\r
201 accountBatches[account.AccountKey] = accountBatch;
\r
// Atomically take the moves recorded so far and install an empty dictionary for
// moves that are posted while this poll runs
204 var moves=Interlocked.Exchange(ref _moves, new ConcurrentDictionary<string, MovedEventArgs>());
\r
206 var tasks = new List<Task<DateTime?>>();
\r
207 foreach(var accountInfo in _accounts.Values)
\r
// accountBatch stays null when no batch was queued for this account
209 IEnumerable<string> accountBatch ;
\r
210 accountBatches.TryGetValue(accountInfo.AccountKey,out accountBatch);
\r
211 var t=ProcessAccountFiles (accountInfo, accountBatch, moves,since);
\r
// NOTE(review): the statement that adds t to tasks (source line 212) is not visible here.
// .Result blocks this thread until every account task has finished.
215 var nextTimes=TaskEx.WhenAll(tasks.ToList()).Result;
\r
217 _firstPoll = false;
\r
218 //Reschedule the poll with the current timestamp as a "since" value
\r
// The smallest of the per-account values becomes the next since value
220 if (nextTimes.Length>0)
\r
221 nextSince = nextTimes.Min();
\r
222 if (Log.IsDebugEnabled)
\r
223 Log.DebugFormat("Next Poll for changes since [{0}]",nextSince);
\r
225 catch (Exception ex)
\r
227 Log.ErrorFormat("Error while processing accounts\r\n{0}", ex);
\r
228 //In case of failure retry with the same "since" value
\r
231 UpdateStatus(PithosStatus.PollComplete);
\r
232 //The multiple try blocks are required because we can't have an await call
\r
233 //inside a finally block
\r
234 //TODO: Find a more elegant solution for rescheduling in the event of an exception
\r
237 //Wait for the polling interval to pass or the Sync event to be signalled
\r
238 nextSince = WaitForScheduledOrManualPoll(nextSince).Result;
\r
242 //Ensure polling is scheduled even in case of error
// The next poll is queued as a new task instead of being called on this stack
243 TaskEx.Run(()=>PollRemoteFiles(nextSince));
\r
244 //_pollAction.Post(new PollRequest {Since = nextSince});
\r
250 /// Wait for the polling period to expire or a manual sync request
\r
252 /// <param name="since">The since value the caller intends to use for the next poll.</param>
\r
253 /// <returns>NOTE(review): the return statements are not part of this listing; the comment near the end suggests the since value is dropped when a manual sync fired - confirm.</returns>
\r
254 private async Task<DateTime?> WaitForScheduledOrManualPoll(DateTime? since)
\r
256 var sync = _syncEvent.WaitAsync();
\r
// Settings.PollingInterval is interpreted as seconds
257 var delay = TimeSpan.FromSeconds(Settings.PollingInterval);
\r
258 if (Log.IsDebugEnabled)
\r
259 Log.DebugFormat("Next Poll at [{0}]", DateTime.Now.Add(delay));
\r
260 var wait = TaskEx.Delay(delay);
\r
// Continue as soon as either the manual sync event or the timer completes
262 var signaledTask = await TaskEx.WhenAny(sync, wait).ConfigureAwait(false);
\r
264 //Pausing takes precedence over manual sync or awaiting
// NOTE(review): this is a blocking Wait() inside an async method, unlike the
// WaitAsync() calls used on the same event elsewhere in this class
265 _unPauseEvent.Wait();
\r
267 //Wait for network processing to finish before polling
\r
268 var pauseTask=NetworkAgent.ProceedEvent.WaitAsync();
\r
269 await TaskEx.WhenAll(signaledTask, pauseTask).ConfigureAwait(false);
\r
271 //If polling is signalled by SynchNow, ignore the since tag
\r
272 if (sync.IsCompleted)
\r
// Re-arm the event so that the next manual sync request can be detected
274 _syncEvent.Reset();
\r
// Polls a single account: lists its containers and their objects, merges the remote
// listing with the local files and the stored states, then syncs each resulting tuple.
// accountInfo  - the account to poll; must not be null and must have a non-empty AccountPath
// accountBatch - optional paths that restrict processing; null means process everything
// moves        - move events handed over by PollRemoteFiles
// since        - passed to the object listing calls
// NOTE(review): this listing omits every source line shorter than about 19 characters
// (braces, try, return statements), so the returned value is not visible here.
282 public async Task<DateTime?> ProcessAccountFiles(AccountInfo accountInfo, IEnumerable<string> accountBatch, ConcurrentDictionary<string, MovedEventArgs> moves, DateTime? since = null)
\r
284 if (accountInfo == null)
\r
285 throw new ArgumentNullException("accountInfo");
\r
286 if (String.IsNullOrWhiteSpace(accountInfo.AccountPath))
\r
287 throw new ArgumentException("The AccountInfo.AccountPath is empty", "accountInfo");
\r
288 Contract.EndContractBlock();
\r
291 using (ThreadContext.Stacks["Retrieve Remote"].Push(accountInfo.UserName))
\r
294 await NetworkAgent.GetDeleteAwaiter().ConfigureAwait(false);
\r
296 Log.Info("Scheduled");
\r
297 var client = new CloudFilesClient(accountInfo);
\r
299 //We don't need to check the trash container
\r
300 var allContainers=await client.ListContainers(accountInfo.UserName).ConfigureAwait(false);
\r
301 var containers = allContainers
\r
302 .Where(c=>c.Name.ToString()!="trash")
\r
306 CreateContainerFolders(accountInfo, containers);
\r
308 //The nextSince time fallback time is the same as the current.
\r
309 //If polling succeeds, the next Since time will be the smallest of the maximum modification times
\r
310 //of the shared and account objects
\r
311 var nextSince = since;
\r
315 //Wait for any deletions to finish
\r
316 await NetworkAgent.GetDeleteAwaiter().ConfigureAwait(false);
\r
317 //Get the poll time now. We may miss some deletions but it's better to keep a file that was deleted
\r
318 //than delete a file that was created while we were executing the poll
\r
320 //Get the list of server objects changed since the last check
\r
321 //The name of the container is passed as state in order to create a dictionary of tasks in a subsequent step
\r
322 var listObjects = (from container in containers
\r
323 select Task<IList<ObjectInfo>>.Factory.StartNew(_ =>
\r
324 client.ListObjects(accountInfo.UserName, container.Name, since), container.Name)).ToList();
\r
// The shared listing is started as one more task, keyed by the state value "shared"
326 var listShared = Task<IList<ObjectInfo>>.Factory.StartNew(_ =>
\r
327 client.ListSharedObjects(_knownContainers,since), "shared");
\r
328 listObjects.Add(listShared);
\r
329 var listTasks = await Task.Factory.WhenAll(listObjects.ToArray()).ConfigureAwait(false);
\r
331 using (ThreadContext.Stacks["SCHEDULE"].Push("Process Results"))
\r
// Index the finished listing tasks by the state object they were started with
333 var dict = listTasks.ToDictionary(t => t.AsyncState);
\r
335 //Get all non-trash objects. Remember, the container name is stored in AsyncState
\r
336 var remoteObjects = (from objectList in listTasks
\r
337 where (string)objectList.AsyncState.ToString() != "trash"
\r
338 from obj in objectList.Result
\r
339 orderby obj.Bytes ascending
\r
340 select obj).ToList();
\r
342 //Get the latest remote object modification date, only if it is after
\r
343 //the original since date
\r
344 nextSince = GetLatestDateAfter(nextSince, remoteObjects);
\r
346 var sharedObjects = dict["shared"].Result;
\r
348 //DON'T process trashed files
\r
349 //If some files are deleted and added again to a folder, they will be deleted
\r
350 //even though they are new.
\r
351 //We would have to check file dates and hashes to ensure that a trashed file
\r
352 //can be deleted safely from the local hard drive.
\r
// NOTE(review): trashObjects has no visible declaration in this listing. Given the
// "DON'T process trashed files" remark above, the trash query below may sit inside a
// block comment whose delimiters are among the omitted lines - confirm.
354 //Items with the same name, hash may be both in the container and the trash
\r
355 //Don't delete items that exist in the container
\r
356 var realTrash = from trash in trashObjects
\r
358 !remoteObjects.Any(
\r
359 info => info.Name == trash.Name && info.Hash == trash.Hash)
\r
361 ProcessTrashedFiles(accountInfo, realTrash);
\r
// Drop remote entries whose name ends in ".ignore" or starts with the cache folder prefix
364 var cleanRemotes = (from info in remoteObjects.Union(sharedObjects)
\r
365 let name = info.Name.ToUnescapedString()??""
\r
366 where !name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase) &&
\r
367 !name.StartsWith(FolderConstants.CacheFolder + "/",
\r
368 StringComparison.InvariantCultureIgnoreCase)
\r
369 select info).ToList();
\r
372 StatusKeeper.CleanupOrphanStates();
\r
374 var differencer = _differencer.PostSnapshot(accountInfo, cleanRemotes);
\r
375 var currentRemotes = differencer.Current.ToList();
\r
376 StatusKeeper.CleanupStaleStates(accountInfo, currentRemotes);
\r
378 //var filterUris = Selectives.SelectiveUris[accountInfo.AccountKey];
\r
380 //May have to wait if the FileAgent has asked for a Pause, due to local changes
\r
381 await _unPauseEvent.WaitAsync().ConfigureAwait(false);
\r
383 //Get the local files here
\r
384 var agent = AgentLocator<FileAgent>.Get(accountInfo.AccountPath);
\r
385 var files = LoadLocalFileTuples(accountInfo, accountBatch);
\r
388 var states = StatusKeeper.GetAllStates();
\r
// Pair every current remote object with the full local path it maps to
390 var infos = (from remote in currentRemotes
\r
391 let path = remote.RelativeUrlToFilePath(accountInfo.UserName)
\r
392 let info=agent.GetFileSystemInfo(path)
\r
393 select Tuple.Create(info.FullName,remote))
\r
// The token is captured once, so every item of this poll shares the same token
396 var token = _currentOperationCancellation.Token;
\r
398 var tuples = MergeSources(infos, files, states,moves).ToList();
\r
400 var processedPaths = new HashSet<string>();
\r
401 //Process only the changes in the batch file, if one exists
// NOTE(review): accountBatch is an IEnumerable, so Contains is a linear scan that is
// repeated for every tuple; the batch built by PollRemoteFiles is a lazy query
402 var stateTuples = accountBatch==null?tuples:tuples.Where(t => accountBatch.Contains(t.FilePath));
\r
403 foreach (var tuple in stateTuples.Where(s=>!s.Locked))
\r
// Honour a pause request between items
405 await _unPauseEvent.WaitAsync().ConfigureAwait(false);
\r
407 //Set the Merkle Hash
\r
408 //SetMerkleHash(accountInfo, tuple);
\r
410 await SyncSingleItem(accountInfo, tuple, agent, moves,processedPaths,token).ConfigureAwait(false);
\r
419 MarkSuspectedDeletes(accountInfo, cleanRemotes);
\r
424 Log.Info("[LISTENER] End Processing");
\r
427 catch (Exception ex)
\r
429 Log.ErrorFormat("[FAIL] ListObjects for {0} in ProcessRemoteFiles with {1}", accountInfo.UserName, ex);
\r
433 Log.Info("[LISTENER] Finished");
\r
439 private static void SetMerkleHash(AccountInfo accountInfo, StateTuple tuple)
\r
441 //The Merkle hash for directories is that of an empty buffer
\r
442 if (tuple.FileInfo is DirectoryInfo)
\r
443 tuple.C = MERKLE_EMPTY;
\r
444 else if (tuple.FileState != null && tuple.MD5 == tuple.FileState.ETag)
\r
446 //If there is a state whose MD5 matches, load the merkle hash from the file state
\r
447 //instead of calculating it
\r
448 tuple.C = tuple.FileState.Checksum;
\r
452 tuple.Merkle = Signature.CalculateTreeHashAsync((FileInfo)tuple.FileInfo, accountInfo.BlockSize, accountInfo.BlockHash,1,progress);
\r
453 //tuple.C=tuple.Merkle.TopHash.ToHashString();
\r
/// <summary>
/// Enumerates the local file system entries of an account, optionally restricted to a batch of paths.
/// </summary>
/// <param name="accountInfo">Account whose AccountPath selects the file agent and whose UserName labels the log context.</param>
/// <param name="batch">Full paths to keep; null or an empty sequence means that no filter is applied.</param>
/// <returns>
/// The entries reported by the account's file agent. When a non-empty batch is given, only
/// entries whose FullName appears in the batch are kept; the filter is a LINQ Where, so it is
/// evaluated when the result is enumerated.
/// </returns>
private IEnumerable<FileSystemInfo> LoadLocalFileTuples(AccountInfo accountInfo,IEnumerable<string> batch )
{
    using (ThreadContext.Stacks["Account Files Hashing"].Push(accountInfo.UserName))
    {
        //A set gives constant-time membership tests, where a list was scanned once per entry.
        //The default string comparer is used, exactly as List<string>.Contains did, so the
        //same entries are selected
        var batchPaths = (batch == null)
                             ? new HashSet<string>()
                             : new HashSet<string>(batch);

        IEnumerable<FileSystemInfo> localInfos = AgentLocator<FileAgent>.Get(accountInfo.AccountPath)
                                                                        .EnumerateFileSystemInfos();
        if (batchPaths.Count > 0)
            localInfos = localInfos.Where(fi => batchPaths.Contains(fi.FullName));

        return localInfos;
    }
}
\r
473 /// Wait and Pause the agent while waiting
\r
475 /// <param name="backoff">Delay handed to TaskEx.Delay; as an int it is a number of milliseconds.</param>
\r
476 /// <returns>A task that completes once the delay has elapsed.</returns>
\r
477 private async Task PauseFor(int backoff)
\r
// NOTE(review): the short statements around the delay (source lines 479-480 and 482)
// are not part of this listing; presumably they switch the pause flag on and off - confirm.
481 await TaskEx.Delay(backoff).ConfigureAwait(false);
\r
// Reconciles one StateTuple. Judging by the inline comments, the decision is driven by
// three values on the tuple: C (current local file), L (last stored state) and S (server):
//   C == L and S == L : nothing changed; only local/server renames are applied
//   C == L and S != L : server changed; delete locally when S is null, otherwise download
//   C != L and S == L : local changed; delete on the server or upload
//   C != L and S != L : both changed; identical changes are recorded, otherwise the
//                       visible lines delete on the server, clear stale state or report
//                       a conflict
// Any exception is logged and the item is left for the next poll.
// NOTE(review): this listing omits every source line shorter than about 19 characters
// (braces, else, return), so the exact branch nesting cannot be read from here.
485 private async Task SyncSingleItem(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, ConcurrentDictionary<string, MovedEventArgs> moves,HashSet<string> processedPaths, CancellationToken token)
\r
487 Log.DebugFormat("Sync [{0}] C:[{1}] L:[{2}] S:[{3}]", tuple.FilePath, tuple.C, tuple.L, tuple.S);
\r
489 //If the processed paths already contain the current path, exit
// HashSet.Add returns false when the path is already present
490 if (!processedPaths.Add(tuple.FilePath))
\r
495 bool isInferredParent = tuple.ObjectInfo != null && tuple.ObjectInfo.UUID.StartsWith("00000000-0000-0000");
\r
497 var localFilePath = tuple.FilePath;
\r
498 //Don't use the tuple info, it may have been deleted
\r
499 var localInfo = FileInfoExtensions.FromPath(localFilePath);
\r
502 var isUnselectedRootFolder = agent.IsUnselectedRootFolder(tuple.FilePath);
\r
504 //Unselected root folders that have not yet been uploaded should be uploaded and added to the
\r
505 //selective folders
\r
507 if (!Selectives.IsSelected(accountInfo, localFilePath) &&
\r
508 !(isUnselectedRootFolder && tuple.ObjectInfo == null))
\r
511 // Local file unchanged? If both C and L are null, make sure it's because
\r
512 //both the file is missing and the state checksum is not missing
\r
513 if (tuple.C == tuple.L /*&& (localInfo.Exists || tuple.FileState == null)*/)
\r
516 //Server unchanged?
\r
517 if (tuple.S == tuple.L)
\r
519 // No server changes
\r
520 //Has the file been renamed locally?
\r
521 if (!MoveForLocalMove(accountInfo,tuple))
\r
522 //Has the file been renamed on the server?
\r
523 MoveForServerMove(accountInfo, tuple);
\r
527 //Different from server
\r
528 //Does the server file exist?
\r
529 if (tuple.S == null)
\r
531 //Server file doesn't exist
\r
532 //deleteObjectFromLocal()
\r
534 StatusNotification.GetNotifier("Deleting local {0}", "Deleted local {0}",
\r
537 DeleteLocalFile(agent, localFilePath);
\r
542 //Server file exists
\r
543 //downloadServerObject() // Result: L = S
\r
544 //If the file has moved on the server, move it locally before downloading
\r
546 StatusNotification.GetNotifier("Downloading {0}", "Downloaded {0}",
\r
549 var targetPath = MoveForServerMove(accountInfo, tuple);
\r
550 if (targetPath != null)
\r
553 await DownloadCloudFile(accountInfo, tuple, token, targetPath).ConfigureAwait(false);
\r
555 AddOwnFolderToSelectives(accountInfo, tuple, targetPath);
\r
560 StatusKeeper.SetFileState(targetPath, FileStatus.Unchanged,
\r
561 FileOverlayStatus.Normal, "");
\r
569 //Local changes found
\r
571 //Server unchanged?
\r
572 if (tuple.S == tuple.L)
\r
574 //The FileAgent selective sync checks for new root folder files
\r
575 if (!agent.Ignore(localFilePath))
\r
577 if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null)
\r
579 //deleteObjectFromServer()
\r
580 DeleteCloudFile(accountInfo, tuple);
\r
581 //updateRecord( Remove L, S)
\r
585 //uploadLocalObject() // Result: S = C, L = S
\r
586 var progress = new Progress<double>(d =>
\r
587 StatusNotification.Notify(new StatusNotification(String.Format("Merkle Hashing for Upload {0:p} of {1}", d, localInfo.Name))));
\r
589 //Is it an unselected root folder
\r
590 var isCreation = isUnselectedRootFolder ||//or a new folder under a selected parent?
\r
591 (localInfo is DirectoryInfo && Selectives.IsSelected(accountInfo, localInfo) && tuple.FileState == null && tuple.ObjectInfo == null);
\r
594 //Is this a result of a FILE move with no modifications? Then try to move it,
\r
595 //to avoid an expensive hash
\r
596 if (!MoveForLocalMove(accountInfo, tuple))
\r
598 await UploadLocalFile(accountInfo, tuple, token, isCreation, localInfo,processedPaths, progress).ConfigureAwait(false);
\r
601 //updateRecord( S = C )
\r
602 //State updated by the uploader
\r
// NOTE(review): the condition guarding this call is not visible in this listing
606 ProcessChildren(accountInfo, tuple, agent, moves,processedPaths,token);
\r
613 if (tuple.C == tuple.S)
\r
615 // (Identical Changes) Result: L = S
\r
618 //Don't update anything for nonexistent server files
\r
619 if (tuple.S != null)
\r
621 //Detect server moves
\r
622 var targetPath = MoveForServerMove(accountInfo, tuple);
\r
623 if (targetPath != null)
\r
625 Debug.Assert(tuple.Merkle != null);
\r
626 StatusKeeper.StoreInfo(targetPath, tuple.ObjectInfo, tuple.Merkle);
\r
628 AddOwnFolderToSelectives(accountInfo, tuple, targetPath);
\r
633 //At this point, C==S==NULL and we have a stale state (L)
\r
634 //Log the stale tuple for investigation
\r
635 Log.WarnFormat("Stale tuple detected FilePathPath:[{0}], State:[{1}], LocalFile:[{2}]", tuple.FilePath, tuple.FileState, tuple.FileInfo);
\r
638 if (!String.IsNullOrWhiteSpace(tuple.FilePath))
\r
639 StatusKeeper.ClearFileStatus(tuple.FilePath);
\r
644 if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null)
\r
646 //deleteObjectFromServer()
\r
647 DeleteCloudFile(accountInfo, tuple);
\r
648 //updateRecord(Remove L, S)
\r
650 //If both the local and server files are missing, the state is stale
\r
651 else if (!localInfo.Exists && (tuple.S == null || tuple.ObjectInfo == null))
\r
653 StatusKeeper.ClearFileStatus(localInfo.FullName);
\r
657 ReportConflictForMismatch(localFilePath);
\r
658 //identifyAsConflict() // Manual action required
\r
664 catch (Exception exc)
\r
666 //In case of error log and retry with the next poll
\r
667 Log.ErrorFormat("[SYNC] Failed for file {0}. Will Retry.\r\n{1}",tuple.FilePath,exc);
\r
/// <summary>
/// Deletes a local file through the file agent and removes its stored status.
/// </summary>
/// <param name="agent">File agent that performs the deletion.</param>
/// <param name="localFilePath">Path of the local file to delete.</param>
private void DeleteLocalFile(FileAgent agent, string localFilePath)
{
    // Flag the path as deleted before the file system is touched
    StatusKeeper.SetFileState(localFilePath, FileStatus.Deleted, FileOverlayStatus.Deleted, "");

    // The deletion runs while the network gate for this path is held
    using (NetworkGate.Acquire(localFilePath, NetworkOperation.Deleting))
        agent.Delete(localFilePath);

    // updateRecord(Remove C, L)
    StatusKeeper.ClearFileStatus(localFilePath);
}
\r
// Downloads the server object of a tuple to targetPath and records the result in the
// status store.
// NOTE(review): several argument lines (source lines 686-687, 690, 694-695) are not part
// of this listing, so the full argument lists cannot be read from here.
683 private async Task DownloadCloudFile(AccountInfo accountInfo, StateTuple tuple, CancellationToken token, string targetPath)
\r
// Flag the target as modified before the download starts
685 StatusKeeper.SetFileState(targetPath, FileStatus.Modified, FileOverlayStatus.Modified,
\r
688 var finalHash=await
\r
689 NetworkAgent.Downloader.DownloadCloudFile(accountInfo, tuple.ObjectInfo, targetPath,
\r
691 .ConfigureAwait(false);
\r
692 //updateRecord( L = S )
\r
693 StatusKeeper.UpdateFileChecksum(targetPath, tuple.ObjectInfo.ETag,
\r
// Store the server object info together with the hash returned by the downloader
696 StatusKeeper.StoreInfo(targetPath, tuple.ObjectInfo,finalHash);
\r
// Uploads a local file or folder. When isUnselectedRootFolder is true, every
// subdirectory is uploaded as well and its path is added to processedPaths.
// NOTE(review): the notifier arguments (source lines 707-708) and the declaration of
// dirActions (lines 713-714) are not part of this listing.
699 private async Task UploadLocalFile(AccountInfo accountInfo, StateTuple tuple, CancellationToken token,
\r
700 bool isUnselectedRootFolder, FileSystemInfo localInfo, HashSet<string> processedPaths, Progress<double> progress)
\r
702 var action = new CloudUploadAction(accountInfo, localInfo, tuple.FileState,
\r
703 accountInfo.BlockSize, accountInfo.BlockHash,
\r
704 "Poll", isUnselectedRootFolder, token, progress,tuple.Merkle);
\r
706 using (StatusNotification.GetNotifier("Uploading {0}", "Uploaded {0}",
\r
709 await NetworkAgent.Uploader.UploadCloudFile(action, token).ConfigureAwait(false);
\r
712 if (isUnselectedRootFolder)
\r
// One upload action per subdirectory at any depth, created with a null file state
715 from dir in ((DirectoryInfo) localInfo).EnumerateDirectories("*", SearchOption.AllDirectories)
\r
716 let subAction = new CloudUploadAction(accountInfo, dir, null,
\r
717 accountInfo.BlockSize, accountInfo.BlockHash,
\r
718 "Poll", true, token, progress)
\r
719 select subAction).ToList();
\r
720 foreach (var dirAction in dirActions)
\r
// Register the folder as processed so that the poll loop skips it
722 processedPaths.Add(dirAction.LocalFile.FullName);
\r
// NOTE(review): unlike the other awaits in this class, this one has no ConfigureAwait(false)
725 await TaskEx.WhenAll(dirActions.Select(a=>NetworkAgent.Uploader.UploadCloudFile(a,token)).ToArray());
\r
// Tries to turn a local rename into a server-side move so that the file does not have
// to be uploaded again.
// NOTE(review): the return statements and the try keyword are not part of this listing.
// Callers negate the result and fall back to an upload or a server-move check, so the
// method presumably returns true only when the server move was executed - confirm.
729 private bool MoveForLocalMove(AccountInfo accountInfo, StateTuple tuple)
\r
731 //Is the file a directory or previous path missing?
\r
732 if (tuple.FileInfo is DirectoryInfo)
\r
734 //Is the previous path missing?
\r
735 if (String.IsNullOrWhiteSpace(tuple.OldFullPath))
\r
737 //Has the file changed locally, in which case it should be uploaded rather than moved?
\r
738 if (tuple.OldChecksum != tuple.Merkle.TopHash.ToHashString())
\r
// NOTE(review): tuple.ObjectInfo is dereferenced here without a visible null check
741 var relativePath = tuple.ObjectInfo.RelativeUrlToFilePath(accountInfo.UserName);
\r
742 var serverPath = Path.Combine(accountInfo.AccountPath, relativePath);
\r
743 //Has the file been renamed on the server?
\r
744 if (!tuple.OldFullPath.Equals(serverPath))
\r
746 ReportConflictForDoubleRename(tuple.FilePath);
\r
753 var client = new CloudFilesClient(accountInfo);
\r
754 var objectInfo = CloudAction.CreateObjectInfoFor(accountInfo, tuple.FileInfo);
\r
755 var containerPath = Path.Combine(accountInfo.AccountPath, objectInfo.Container.ToUnescapedString());
\r
756 //TODO: Simplify these multiple conversions from and to Uris
\r
// The old object name is the old full path expressed relative to the container folder
757 var oldName = tuple.OldFullPath.AsRelativeTo(containerPath);
\r
758 //Then execute a move instead of an upload
\r
759 using (StatusNotification.GetNotifier("Moving {0}", "Moved {0}", tuple.FileInfo.Name))
\r
// Source and target container are the same; only the object name changes
761 client.MoveObject(objectInfo.Account, objectInfo.Container, oldName.ToEscapedUri(),
\r
762 objectInfo.Container, objectInfo.Name);
\r
766 catch (Exception exc)
\r
768 Log.ErrorFormat("[MOVE] Failed for [{0}],:\r\n{1}", tuple.FilePath, exc);
\r
769 //Return false to force an upload of the file
\r
/// <summary>
/// Adds a newly encountered folder of the user's own account to the selective folders,
/// provided that the path it maps to is already selected.
/// </summary>
/// <param name="accountInfo">Account that owns the folder.</param>
/// <param name="tuple">Tuple that carries the local file info and the server object info.</param>
/// <param name="targetPath">Not read by this method.</param>
private void AddOwnFolderToSelectives(AccountInfo accountInfo, StateTuple tuple, string targetPath)
{
    var remote = tuple.ObjectInfo;

    //Not for shared folders
    if (remote.IsShared == true)
        return;

    //Also ensure that any newly created folders are added to the selectives, if the original folder was selected
    var containerPath = Path.Combine(accountInfo.AccountPath, remote.Container.ToUnescapedString());

    //Continue only for a root folder encountered for the first time: L is null, the
    //directory exists and it is located at or below the container folder
    if (!(tuple.L == null))
        return;

    var localFullName = tuple.FileInfo.FullName;
    if (!Directory.Exists(localFullName))
        return;
    if (!localFullName.IsAtOrBelow(containerPath))
        return;

    var relativePath = remote.RelativeUrlToFilePath(accountInfo.UserName);
    var initialPath = Path.Combine(accountInfo.AccountPath, relativePath);

    //If the new path is under a selected folder, add it to the selectives as well
    if (!Selectives.IsSelected(accountInfo, initialPath))
        return;

    Selectives.AddUri(accountInfo, remote.Uri);
    Selectives.Save(accountInfo);
}
\r
// Applies a server-side rename locally: when the path derived from the server object
// differs from the tuple's current local path, the local file or folder is moved there.
// NOTE(review): the return statements and the null checks on fi/di are not part of this
// listing. Callers treat a null result as "no usable target path" - confirm the exact
// return values against the full file.
801 private string MoveForServerMove(AccountInfo accountInfo, StateTuple tuple)
\r
803 if (tuple.ObjectInfo == null)
\r
805 var relativePath = tuple.ObjectInfo.RelativeUrlToFilePath(accountInfo.UserName);
\r
806 var serverPath = Path.Combine(accountInfo.AccountPath, relativePath);
\r
808 //Compare Case Insensitive
\r
809 if (String.Equals(tuple.FilePath ,serverPath,StringComparison.InvariantCultureIgnoreCase))
\r
812 //Has the file been renamed locally?
\r
813 if (!String.IsNullOrWhiteSpace(tuple.OldFullPath) && !tuple.OldFullPath.Equals(tuple.FilePath))
\r
815 ReportConflictForDoubleRename(tuple.FilePath);
\r
// Refresh the cached file system info before testing for existence
819 tuple.FileInfo.Refresh();
\r
820 //The file/folder may not exist if it was moved because its parent moved
\r
821 if (!tuple.FileInfo.Exists)
\r
823 var target=FileInfoExtensions.FromPath(serverPath);
\r
824 if (!target.Exists)
\r
826 Log.ErrorFormat("No source or target found while trying to move {0} to {1}", tuple.FileInfo.FullName, serverPath);
\r
// The move runs while the network gate for the current path is held
831 using (StatusNotification.GetNotifier("Moving local {0}", "Moved local {0}", Path.GetFileName(tuple.FilePath)))
\r
832 using(NetworkGate.Acquire(tuple.FilePath,NetworkOperation.Renaming))
\r
835 var fi = tuple.FileInfo as FileInfo;
\r
// Create the target's parent directory when it does not exist yet, then move the file
838 var targetFile = new FileInfo(serverPath);
\r
839 if (!targetFile.Directory.Exists)
\r
840 targetFile.Directory.Create();
\r
841 fi.MoveTo(serverPath);
\r
843 var di = tuple.FileInfo as DirectoryInfo;
\r
// Same for a directory: create the missing parent, then move the directory
846 var targetDir = new DirectoryInfo(serverPath);
\r
847 if (!targetDir.Parent.Exists)
\r
848 targetDir.Parent.Create();
\r
849 di.MoveTo(serverPath);
\r
// Record the server object info under the new local path
853 StatusKeeper.StoreInfo(serverPath, tuple.ObjectInfo);
\r
/// <summary>
/// Deletes the server object of a tuple and removes the stored status of its local path.
/// </summary>
/// <param name="accountInfo">Account that owns the object.</param>
/// <param name="tuple">Tuple whose ObjectInfo is deleted and whose FilePath status is cleared.</param>
private void DeleteCloudFile(AccountInfo accountInfo, StateTuple tuple)
{
    var localPath = tuple.FilePath;
    var displayName = Path.GetFileName(localPath);

    using (StatusNotification.GetNotifier("Deleting server {0}", "Deleted server {0}", displayName))
    {
        // Flag the path as deleted, ask the delete agent to remove the server object,
        // then drop the stored status
        StatusKeeper.SetFileState(localPath, FileStatus.Deleted, FileOverlayStatus.Deleted, "");
        NetworkAgent.DeleteAgent.DeleteCloudFile(accountInfo, tuple.ObjectInfo);
        StatusKeeper.ClearFileStatus(localPath);
    }
}
\r
/// <summary>
/// Syncs everything below a folder tuple: all subfolders first, then all files.
/// </summary>
/// <param name="accountInfo">Account that owns the folder.</param>
/// <param name="tuple">Tuple of the parent folder; nothing happens when its FileInfo is not a directory.</param>
/// <param name="agent">File agent passed on to SyncSingleItem.</param>
/// <param name="moves">Move events passed on to SyncSingleItem.</param>
/// <param name="processedPaths">Paths handled so far in this poll, passed on to SyncSingleItem.</param>
/// <param name="token">Cancellation token used for hashing and passed on to SyncSingleItem.</param>
private void ProcessChildren(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, ConcurrentDictionary<string, MovedEventArgs> moves,HashSet<string> processedPaths,CancellationToken token)
{
    var dirInfo = tuple.FileInfo as DirectoryInfo;
    //Only directories have children. Without this guard a file tuple would fail with a
    //NullReferenceException on the enumerations below
    if (dirInfo == null)
        return;

    //Both queries are lazy: tuples are created (and file hashes calculated) one at a
    //time, while ApplyAction walks the sequence
    var folderTuples = from folder in dirInfo.EnumerateDirectories("*", SearchOption.AllDirectories)
                       select new StateTuple(folder){C=Signature.MERKLE_EMPTY};

    var fileTuples = from file in dirInfo.EnumerateFiles("*", SearchOption.AllDirectories)
                     let state=StatusKeeper.GetStateByFilePath(file.FullName)
                     select new StateTuple(file){
                         Merkle=StatusAgent.CalculateTreeHash(file,accountInfo,state,
                                                              Settings.HashingParallelism,token,null)
                     };

    //Process folders first, to ensure folders appear on the server as soon as possible.
    //Each item is synced to completion (blocking Wait) before the next one starts
    folderTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, moves, processedPaths,token).Wait());

    fileTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, moves,processedPaths, token).Wait());
}
\r
892 * //Use the queue to retry locked file hashing
\r
893 var fileQueue = new ConcurrentQueue<FileSystemInfo>(localInfos);
\r
896 var results = new List<Tuple<FileSystemInfo, string>>();
\r
898 while (fileQueue.Count > 0)
\r
900 FileSystemInfo file;
\r
901 fileQueue.TryDequeue(out file);
\r
902 using (ThreadContext.Stacks["File"].Push(file.FullName))
\r
906 //Replace MD5 here, do the calc while syncing individual files
\r
908 if (file is DirectoryInfo)
\r
912 //Wait in case the FileAgent has requested a Pause
\r
913 await _unPauseEvent.WaitAsync().ConfigureAwait(false);
\r
915 using (StatusNotification.GetNotifier("Hashing {0}", "", file.Name))
\r
917 hash = ((FileInfo)file).ComputeShortHash(StatusNotification);
\r
921 results.Add(Tuple.Create(file, hash));
\r
923 catch (IOException exc)
\r
925 Log.WarnFormat("[HASH] File in use, will retry [{0}]", exc);
\r
926 fileQueue.Enqueue(file);
\r
927 //If this is the only enqueued file
\r
928 if (fileQueue.Count != 1) continue;
\r
934 //Pause Polling for the specified time
\r
937 await PauseFor(backoff).ConfigureAwait(false);
\r
/// <summary>
/// Merges the local files, the stored file states and the server objects into one
/// state tuple per path
/// </summary>
/// <param name="infos">Pairs of a local file path and the server object that corresponds to it</param>
/// <param name="files">The file system entries found on disk</param>
/// <param name="states">The stored file states</param>
/// <param name="moves">The recorded move events. They are looked up by a state's file path and
/// are also matched against their old and new full paths</param>
/// <returns>The merged tuples, without the broken ones, ie. tuples whose state has no checksum
/// while the server object exists but the local file is missing</returns>
/// <remarks>
/// Broken tuples are marked as conflicts through the StatusKeeper instead of being returned.
/// </remarks>
private IEnumerable<StateTuple> MergeSources(IEnumerable<Tuple<string, ObjectInfo>> infos, IEnumerable<FileSystemInfo> files, List<FileState> states, ConcurrentDictionary<string, MovedEventArgs> moves)
{
    var tuplesByPath = new Dictionary<string, StateTuple>();

    //Start with one tuple for each file found on disk
    foreach (var info in files)
    {
        var tuple = new StateTuple(info);
        //Is this the target of a move event?
        var moveArg = moves.Values.FirstOrDefault(arg => info.FullName.Equals(arg.FullPath, StringComparison.InvariantCultureIgnoreCase)
            || info.FullName.IsAtOrBelow(arg.FullPath));
        if (moveArg != null)
        {
            //Reconstruct the path the file had before the move and pick up the checksum stored for it
            tuple.NewFullPath = info.FullName;
            var relativePath = info.AsRelativeTo(moveArg.FullPath);
            tuple.OldFullPath = Path.Combine(moveArg.OldFullPath, relativePath);
            tuple.OldChecksum = states.FirstOrDefault(st => st.FilePath.Equals(tuple.OldFullPath, StringComparison.InvariantCultureIgnoreCase))
                .NullSafe(st => st.Checksum);
        }

        tuplesByPath[tuple.FilePath] = tuple;
    }

    //For files that have state
    foreach (var state in states)
    {
        StateTuple hashTuple;
        MovedEventArgs stateMove;

        if (tuplesByPath.TryGetValue(state.FilePath, out hashTuple))
        {
            hashTuple.FileState = state;
            UpdateHashes(hashTuple);
        }
        //TryGetValue instead of ContainsKey + indexer: the dictionary is concurrent, so the entry
        //could be removed between the two calls and the indexer would throw KeyNotFoundException
        else if (moves.TryGetValue(state.FilePath, out stateMove) && tuplesByPath.TryGetValue(stateMove.FullPath, out hashTuple))
        {
            hashTuple.FileState = state;
            UpdateHashes(hashTuple);
        }
        else
        {
            //There is a state but no tuple was created from the files on disk for it
            var fsInfo = FileInfoExtensions.FromPath(state.FilePath);
            hashTuple = new StateTuple {FileInfo = fsInfo, FileState = state};

            //Is the source of a moved item?
            var moveArg = moves.Values.FirstOrDefault(arg => state.FilePath.Equals(arg.OldFullPath, StringComparison.InvariantCultureIgnoreCase)
                || state.FilePath.IsAtOrBelow(arg.OldFullPath));
            if (moveArg != null)
            {
                var relativePath = state.FilePath.AsRelativeTo(moveArg.OldFullPath);
                hashTuple.NewFullPath = Path.Combine(moveArg.FullPath, relativePath);
                hashTuple.OldFullPath = state.FilePath;
            }

            tuplesByPath[state.FilePath] = hashTuple;
        }
    }

    //for files that don't have state
    foreach (var tuple in tuplesByPath.Values.Where(t => t.FileState == null))
    {
        UpdateHashes(tuple);
    }

    //Index the tuples that have a stored object ID, to match server objects by ID first
    var tuplesByID = tuplesByPath.Values
        .Where(tuple => tuple.FileState != null && tuple.FileState.ObjectID != null)
        .ToDictionary(tuple => tuple.FileState.ObjectID, tuple => tuple);

    foreach (var info in infos)
    {
        StateTuple hashTuple;
        var filePath = info.Item1;
        var objectInfo = info.Item2;
        var objectID = objectInfo.UUID;

        if (objectID != _emptyGuid && tuplesByID.TryGetValue(objectID, out hashTuple))
        {
            hashTuple.ObjectInfo = objectInfo;
        }
        else if (tuplesByPath.TryGetValue(filePath, out hashTuple))
        {
            hashTuple.ObjectInfo = objectInfo;
        }
        else
        {
            //The server object has neither a local file nor a state
            var fsInfo = FileInfoExtensions.FromPath(filePath);
            hashTuple = new StateTuple {FileInfo = fsInfo, ObjectInfo = objectInfo};
            tuplesByPath[filePath] = hashTuple;

            if (objectInfo.UUID != _emptyGuid)
                tuplesByID[objectInfo.UUID] = hashTuple;
        }
    }

    //Materialize both sets once. LINQ queries are deferred, so leaving them lazy would re-run
    //the filter and the set difference on every enumeration, including the caller's
    var tuples = tuplesByPath.Values;
    var brokenTuples = (from tuple in tuples
                        where tuple.FileState != null && tuple.FileState.Checksum == null
                              && tuple.ObjectInfo != null && (tuple.FileInfo == null || !tuple.FileInfo.Exists)
                        select tuple).ToList();
    var actualTuples = tuples.Except(brokenTuples).ToList();
    Debug.Assert(actualTuples.All(t => t.HashesValid()));

    foreach (var tuple in brokenTuples)
    {
        StatusKeeper.SetFileState(tuple.FileState.FilePath,
            FileStatus.Conflict, FileOverlayStatus.Conflict, "FileState without checksum encountered for server object missing from disk");
    }

    return actualTuples;
}
\r
/// <summary>
/// Update the tuple with the file's hashes, avoiding calculation if the file is unchanged
/// </summary>
/// <param name="hashTuple">The tuple to update. Its Merkle property receives the tree hash,
/// or its Locked flag is set if an IOException occurs</param>
/// <remarks>
/// The function first compares the file's length and last write time with the values stored in
/// the tuple's FileState. The tree hash is recalculated if they differ, if the stored checksum
/// disagrees with the file being empty, or if a non-empty checksum has no stored hashes.
/// Otherwise the stored hashes are parsed and reused.
/// Tuples whose FileInfo is not a file get the empty tree hash.
/// </remarks>
private void UpdateHashes(StateTuple hashTuple)
{
    try
    {
        var state = hashTuple.NullSafe(s => s.FileState);
        var storedHash = state.NullSafe(s => s.Checksum);
        var storedHashes = state.NullSafe(s => s.Hashes);
        var storedDate = state.NullSafe(s => s.LastWriteDate) ?? DateTime.MinValue;
        var storedLength = state.NullSafe(s => s.LastLength);

        var merkle = TreeHash.Empty;

        if (hashTuple.FileInfo is FileInfo)
        {
            var file = (FileInfo)hashTuple.FileInfo.WithProperCapitalization();

            //Attributes unchanged?
            //LastWriteTime is only accurate to the second, so allow a difference of up to one second.
            //The absolute difference is used: a write time that moved BACKWARDS (eg. the file was
            //replaced by an older copy of the same length) is a change too, while a signed
            //comparison would treat any negative difference as "unchanged"
            var unchangedAttributes = (file.LastWriteTime - storedDate).Duration() < TimeSpan.FromSeconds(1)
                && storedLength == file.Length;

            //Attributes appear unchanged but the file length doesn't match the stored hash ?
            var nonEmptyMismatch = unchangedAttributes &&
                (file.Length == 0 ^ storedHash == Signature.MERKLE_EMPTY);

            //Missing hashes for NON-EMPTY hash ?
            var missingHashes = storedHash != Signature.MERKLE_EMPTY &&
                String.IsNullOrWhiteSpace(storedHashes);

            //If the attributes have changed,
            //or they are unchanged but the Merkle doesn't match the size,
            //or the hashes are missing but the tophash is NOT empty, we need to recalculate
            //
            //Otherwise we load the hashes from state
            if (!unchangedAttributes || nonEmptyMismatch || missingHashes)
            {
                merkle = RecalculateTreehash(file);
            }
            else
            {
                merkle = TreeHash.Parse(hashTuple.FileState.Hashes);
            }
        }

        hashTuple.Merkle = merkle;
    }
    catch (IOException)
    {
        //The file could not be read, mark the tuple as locked instead of failing
        hashTuple.Locked = true;
    }
}
\r
/// <summary>
/// Calculates the tree hash of a file, stores it through the StatusKeeper and returns it
/// </summary>
/// <param name="file">The file to hash</param>
/// <returns>The newly calculated tree hash</returns>
private TreeHash RecalculateTreehash(FileInfo file)
{
    //Each progress report is forwarded as a status notification
    Action<double> reportProgress = done =>
    {
        var text = String.Format("Hashing {0} of {1}", done, file.Name);
        StatusNotification.Notify(new StatusNotification(text));
    };
    var hashingProgress = new Progress<double>(reportProgress);

    var treeHash = Signature.CalculateTreeHash(
        file,
        StatusKeeper.BlockSize,
        StatusKeeper.BlockHash,
        CancellationToken,
        hashingProgress);

    StatusKeeper.UpdateFileHashes(file.FullName, treeHash);
    return treeHash;
}
\r
/// <summary>
/// Returns the latest Last_Modified date of the objects, but only if it is earlier than
/// the threshold value. Otherwise the threshold is returned.
/// </summary>
/// <param name="threshold">The date to compare against. Null or DateTime.MinValue means there is no threshold</param>
/// <param name="cloudObjects">The objects to examine. May be null or empty</param>
/// <returns>The threshold if there are no usable object dates or the threshold is not later
/// than the latest date, the latest object date otherwise</returns>
private static DateTime? GetLatestDateBefore(DateTime? threshold, IList<ObjectInfo> cloudObjects)
{
    DateTime? latest = null;
    if (cloudObjects != null && cloudObjects.Count > 0)
    {
        latest = cloudObjects.Max(obj => obj.Last_Modified);
    }

    //Nothing to compare with, keep the threshold
    var latestMissing = !latest.HasValue || latest.Value == DateTime.MinValue;
    if (latestMissing)
    {
        return threshold;
    }

    var thresholdMissing = !threshold.HasValue || threshold.Value == DateTime.MinValue;
    return (thresholdMissing || threshold > latest)
               ? latest
               : threshold;
}
\r
/// <summary>
/// Returns the latest Last_Modified date of the objects, but only if it is later than
/// the threshold value. Otherwise the threshold is returned.
/// </summary>
/// <param name="threshold">The date to compare against. Null or DateTime.MinValue means there is no threshold</param>
/// <param name="cloudObjects">The objects to examine. May be null or empty</param>
/// <returns>The threshold if there are no usable object dates or the threshold is not earlier
/// than the latest date, the latest object date otherwise</returns>
private static DateTime? GetLatestDateAfter(DateTime? threshold, IList<ObjectInfo> cloudObjects)
{
    DateTime? latest = null;
    if (cloudObjects != null && cloudObjects.Count > 0)
    {
        latest = cloudObjects.Max(obj => obj.Last_Modified);
    }

    //Nothing to compare with, keep the threshold
    var latestMissing = !latest.HasValue || latest.Value == DateTime.MinValue;
    if (latestMissing)
    {
        return threshold;
    }

    var thresholdMissing = !threshold.HasValue || threshold.Value == DateTime.MinValue;
    return (thresholdMissing || threshold < latest)
               ? latest
               : threshold;
}
\r
//Holds the differencers of the accounts. RemoveAccount removes an account's entry from its Differencers collection
readonly AccountsDifferencer _differencer = new AccountsDifferencer();
//NOTE(review): presumably marks polling as paused - the code that reads or writes it is outside this section, confirm
private bool _pause;
//The string form of Guid.Empty. MergeSources compares it with ObjectInfo.UUID to skip objects without a real ID
private readonly string _emptyGuid = Guid.Empty.ToString();
\r
/// <summary>
/// Marks a local file as conflicting because it changed at the server, switches the overall
/// status to HasConflicts, then logs and notifies a warning
/// </summary>
/// <param name="localFilePath">The path of the conflicting local file. Must not be null, empty or whitespace</param>
/// <exception cref="ArgumentNullException">localFilePath is null, empty or whitespace</exception>
private void ReportConflictForMismatch(string localFilePath)
{
    //Legacy Code Contracts precondition, the if-throw form must precede EndContractBlock
    if (String.IsNullOrWhiteSpace(localFilePath))
        throw new ArgumentNullException("localFilePath");
    Contract.EndContractBlock();

    const string reason = "File changed at the server";
    StatusKeeper.SetFileState(localFilePath, FileStatus.Conflict, FileOverlayStatus.Conflict, reason);
    UpdateStatus(PithosStatus.HasConflicts);

    var warning = String.Format("Conflict detected for file {0}", localFilePath);
    Log.Warn(warning);
    StatusNotification.NotifyChange(warning, TraceLevel.Warning);
}
\r
/// <summary>
/// Marks a local file as conflicting because it was renamed both locally and on the server,
/// switches the overall status to HasConflicts, then logs and notifies a warning
/// </summary>
/// <param name="localFilePath">The path of the conflicting local file. Must not be null, empty or whitespace</param>
/// <exception cref="ArgumentNullException">localFilePath is null, empty or whitespace</exception>
private void ReportConflictForDoubleRename(string localFilePath)
{
    //Legacy Code Contracts precondition, the if-throw form must precede EndContractBlock
    if (String.IsNullOrWhiteSpace(localFilePath))
        throw new ArgumentNullException("localFilePath");
    Contract.EndContractBlock();

    const string reason = "File renamed both locally and on the server";
    StatusKeeper.SetFileState(localFilePath, FileStatus.Conflict, FileOverlayStatus.Conflict, reason);
    UpdateStatus(PithosStatus.HasConflicts);

    var warning = String.Format("Double rename conflict detected for file {0}", localFilePath);
    Log.Warn(warning);
    StatusNotification.NotifyChange(warning, TraceLevel.Warning);
}
\r
/// <summary>
/// Passes the new status to the status notification service
/// </summary>
/// <param name="status">The status to set</param>
/// <remarks>
/// Any exception thrown while setting the status is logged as a warning and swallowed
/// </remarks>
private void UpdateStatus(PithosStatus status)
{
    try
    {
        StatusNotification.SetPithosStatus(status);
    }
    catch (Exception error)
    {
        //A failed status update is not critical, so it is only logged
        Log.Warn("Error while updating status", error);
    }
}
\r
/// <summary>
/// Creates a folder under the account's path for each container that doesn't have one yet.
/// The trash container is skipped.
/// </summary>
/// <param name="accountInfo">The account whose AccountPath is the parent of the container folders</param>
/// <param name="containers">The containers that need a folder</param>
private static void CreateContainerFolders(AccountInfo accountInfo, IEnumerable<ContainerInfo> containers)
{
    foreach (var container in containers)
    {
        var folderPath = Path.Combine(accountInfo.AccountPath, container.Name.ToUnescapedString());

        //No folder is created for the trash container
        if (container.Name.ToString() == FolderConstants.TrashContainer)
            continue;
        if (Directory.Exists(folderPath))
            continue;

        Directory.CreateDirectory(folderPath);
    }
}
\r
/// <summary>
/// Registers an account under its AccountKey
/// </summary>
/// <param name="accountInfo">The account to register</param>
public void AddAccount(AccountInfo accountInfo)
{
    //TryAdd is used to avoid adding a duplicate accountInfo, its result is ignored
    var accountKey = accountInfo.AccountKey;
    _accounts.TryAdd(accountKey, accountInfo);
}
\r
/// <summary>
/// Removes an account and the differencer stored under the same AccountKey
/// </summary>
/// <param name="accountInfo">The account to remove</param>
public void RemoveAccount(AccountInfo accountInfo)
{
    //The removed values are not needed, the out variables only satisfy the TryRemove signatures
    AccountInfo removedAccount;
    SnapshotDifferencer removedDifferencer;

    _accounts.TryRemove(accountInfo.AccountKey, out removedAccount);
    _differencer.Differencers.TryRemove(accountInfo.AccountKey, out removedDifferencer);
}
\r