2 /* -----------------------------------------------------------------------
\r
3 * <copyright file="PollAgent.cs" company="GRNet">
\r
5 * Copyright 2011-2012 GRNET S.A. All rights reserved.
\r
7 * Redistribution and use in source and binary forms, with or
\r
8 * without modification, are permitted provided that the following
\r
9 * conditions are met:
\r
11 * 1. Redistributions of source code must retain the above
\r
12 * copyright notice, this list of conditions and the following
\r
15 * 2. Redistributions in binary form must reproduce the above
\r
16 * copyright notice, this list of conditions and the following
\r
17 * disclaimer in the documentation and/or other materials
\r
18 * provided with the distribution.
\r
21 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
\r
22 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
\r
23 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
\r
24 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
\r
25 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
\r
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
\r
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
\r
28 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
\r
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
\r
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
\r
31 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
\r
32 * POSSIBILITY OF SUCH DAMAGE.
\r
34 * The views and conclusions contained in the software and
\r
35 * documentation are those of the authors and should not be
\r
36 * interpreted as representing official policies, either expressed
\r
37 * or implied, of GRNET S.A.
\r
39 * -----------------------------------------------------------------------
\r
43 using System.Collections.Concurrent;
\r
44 using System.ComponentModel.Composition;
\r
45 using System.Diagnostics;
\r
46 using System.Diagnostics.Contracts;
\r
48 using System.Linq.Expressions;
\r
49 using System.Reflection;
\r
50 using System.Security.Cryptography;
\r
51 using System.Threading;
\r
52 using System.Threading.Tasks;
\r
53 using System.Threading.Tasks.Dataflow;
\r
54 using Castle.ActiveRecord;
\r
55 using Pithos.Interfaces;
\r
56 using Pithos.Network;
\r
59 namespace Pithos.Core.Agents
\r
62 using System.Collections.Generic;
\r
/// <summary>
/// Describes one poll request: a "since" timestamp for a scheduled poll and/or a batch
/// of specific paths to synchronize.
/// </summary>
public class PollRequest
{
    /// <summary>
    /// Timestamp intended to be forwarded as the poll's "since" value.
    /// </summary>
    public DateTime? Since { get; set; }

    /// <summary>
    /// Specific paths to process for this request, or null when no batch is attached.
    /// </summary>
    public IEnumerable<string> Batch { get; set; }
}
\r
71 [DebuggerDisplay("{FilePath} C:{C} L:{L} S:{S}")]
\r
72 public class StateTuple
\r
74 public string FilePath { get; private set; }
\r
76 public string MD5 { get; set; }
\r
80 get { return FileState==null?null:FileState.Checksum; }
\r
88 _c = String.IsNullOrWhiteSpace(value) ? null : value;
\r
94 get { return ObjectInfo == null ? null : ObjectInfo.X_Object_Hash; }
\r
97 private FileSystemInfo _fileInfo;
\r
98 private TreeHash _merkle;
\r
100 public FileSystemInfo FileInfo
\r
102 get { return _fileInfo; }
\r
106 FilePath = value.FullName;
\r
110 public FileState FileState { get; set; }
\r
111 public ObjectInfo ObjectInfo{ get; set; }
\r
114 public TreeHash Merkle
\r
121 C = _merkle.TopHash.ToHashString();
\r
125 public StateTuple() { }
\r
127 public StateTuple(FileSystemInfo info)
\r
137 /// PollAgent periodically polls the server to detect object changes. The agent retrieves a listing of all
\r
138 /// objects and compares it with a previously cached version to detect differences.
\r
139 /// New files are downloaded, missing files are deleted from the local file system and common files are compared
\r
140 /// to determine the appropriate action
\r
143 public class PollAgent
\r
145 private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
\r
147 [System.ComponentModel.Composition.Import]
\r
148 public IStatusKeeper StatusKeeper { get; set; }
\r
150 [System.ComponentModel.Composition.Import]
\r
151 public IPithosSettings Settings { get; set; }
\r
153 [System.ComponentModel.Composition.Import]
\r
154 public NetworkAgent NetworkAgent { get; set; }
\r
156 [System.ComponentModel.Composition.Import]
\r
157 public Selectives Selectives { get; set; }
\r
159 public IStatusNotification StatusNotification { get; set; }
\r
161 private CancellationTokenSource _currentOperationCancellation = new CancellationTokenSource();
\r
/// <summary>
/// Cancels the operation(s) currently running with the shared operation cancellation token.
/// </summary>
/// <remarks>
/// Cancellation surfaces as a cancellation exception inside the running operation. Failed
/// operations are retried by default, but a cancelled operation should NOT be retried;
/// the intended handling is to catch the cancellation exception and skip the retry.
/// A CancellationTokenSource cannot be reset, so a fresh source is swapped into the field
/// atomically before the old one is cancelled. This prevents an operation from obtaining
/// a token from the old, already-cancelled source.
/// </remarks>
public void CancelCurrentOperation()
{
    var replacement = new CancellationTokenSource();
    var cancelledSource = Interlocked.Exchange(ref _currentOperationCancellation, replacement);
    cancelledSource.Cancel();
}
\r
192 _unPauseEvent.Set();
\r
195 _unPauseEvent.Reset();
\r
200 private bool _firstPoll = true;
\r
202 //The Sync Event signals a manual synchronisation
\r
203 private readonly AsyncManualResetEvent _syncEvent = new AsyncManualResetEvent();
\r
205 private readonly AsyncManualResetEvent _unPauseEvent = new AsyncManualResetEvent(true);
\r
207 private readonly ConcurrentDictionary<string, DateTime> _lastSeen = new ConcurrentDictionary<string, DateTime>();
\r
208 private readonly ConcurrentDictionary<Uri, AccountInfo> _accounts = new ConcurrentDictionary<Uri,AccountInfo>();
\r
210 //private readonly ActionBlock<PollRequest> _pollAction;
\r
214 //_pollAction=new ActionBlock<PollRequest>(p=>ProcessPollRequest(p));
\r
218 /*private void ProcessPollRequest(PollRequest request)
\r
221 if (request.Since == null && request.Batch != null)
\r
223 _batchQueue.Enqueue(request.Batch);
\r
228 PollRemoteFiles(request.Since).Wait();
\r
232 /// Start a manual synchronization
\r
234 public void SynchNow(IEnumerable<string> paths=null)
\r
236 _batchQueue.Enqueue(paths);
\r
239 //_pollAction.Post(new PollRequest {Batch = paths});
\r
242 readonly ConcurrentQueue<IEnumerable<string>> _batchQueue=new ConcurrentQueue<IEnumerable<string>>();
\r
245 /// Remote files are polled periodically. Any changes are processed
\r
247 /// <param name="since"></param>
\r
248 /// <returns></returns>
\r
249 public void PollRemoteFiles(DateTime? since = null)
\r
251 if (Log.IsDebugEnabled)
\r
252 Log.DebugFormat("Polling changes after [{0}]",since);
\r
254 Debug.Assert(Thread.CurrentThread.IsBackground, "Polling Ended up in the main thread!");
\r
258 using (ThreadContext.Stacks["Retrieve Remote"].Push("All accounts"))
\r
260 //If this poll fails, we will retry with the same since value
\r
261 var nextSince = since;
\r
264 _unPauseEvent.Wait();
\r
265 UpdateStatus(PithosStatus.PollSyncing);
\r
267 var accountBatches=new Dictionary<Uri, IEnumerable<string>>();
\r
268 IEnumerable<string> batch = null;
\r
269 if (_batchQueue.TryDequeue(out batch) && batch != null)
\r
270 foreach (var account in _accounts.Values)
\r
272 var accountBatch = batch.Where(path => path.IsAtOrBelow(account.AccountPath));
\r
273 accountBatches[account.AccountKey] = accountBatch;
\r
276 var tasks = new List<Task<DateTime?>>();
\r
277 foreach(var accountInfo in _accounts.Values)
\r
279 IEnumerable<string> accountBatch ;
\r
280 accountBatches.TryGetValue(accountInfo.AccountKey,out accountBatch);
\r
281 var t=ProcessAccountFiles (accountInfo, accountBatch, since);
\r
285 var nextTimes=TaskEx.WhenAll(tasks.ToList()).Result;
\r
287 _firstPoll = false;
\r
288 //Reschedule the poll with the current timestamp as a "since" value
\r
290 if (nextTimes.Length>0)
\r
291 nextSince = nextTimes.Min();
\r
292 if (Log.IsDebugEnabled)
\r
293 Log.DebugFormat("Next Poll at [{0}]",nextSince);
\r
295 catch (Exception ex)
\r
297 Log.ErrorFormat("Error while processing accounts\r\n{0}", ex);
\r
298 //In case of failure retry with the same "since" value
\r
301 UpdateStatus(PithosStatus.PollComplete);
\r
302 //The multiple try blocks are required because we can't have an await call
\r
303 //inside a finally block
\r
304 //TODO: Find a more elegant solution for reschedulling in the event of an exception
\r
307 //Wait for the polling interval to pass or the Sync event to be signalled
\r
308 nextSince = WaitForScheduledOrManualPoll(nextSince).Result;
\r
312 //Ensure polling is scheduled even in case of error
\r
313 PollRemoteFiles(nextSince);
\r
314 //_pollAction.Post(new PollRequest {Since = nextSince});
\r
320 /// Wait for the polling period to expire or a manual sync request
\r
322 /// <param name="since"></param>
\r
323 /// <returns></returns>
\r
324 private async Task<DateTime?> WaitForScheduledOrManualPoll(DateTime? since)
\r
326 var sync = _syncEvent.WaitAsync();
\r
327 var wait = TaskEx.Delay(TimeSpan.FromSeconds(Settings.PollingInterval));
\r
329 var signaledTask = await TaskEx.WhenAny(sync, wait);
\r
331 //Pausing takes precedence over manual sync or awaiting
\r
332 _unPauseEvent.Wait();
\r
334 //Wait for network processing to finish before polling
\r
335 var pauseTask=NetworkAgent.ProceedEvent.WaitAsync();
\r
336 await TaskEx.WhenAll(signaledTask, pauseTask);
\r
338 //If polling is signalled by SynchNow, ignore the since tag
\r
339 if (sync.IsCompleted)
\r
341 //TODO: Must convert to AutoReset
\r
342 _syncEvent.Reset();
\r
348 public async Task<DateTime?> ProcessAccountFiles(AccountInfo accountInfo, IEnumerable<string> accountBatch, DateTime? since = null)
\r
350 if (accountInfo == null)
\r
351 throw new ArgumentNullException("accountInfo");
\r
352 if (String.IsNullOrWhiteSpace(accountInfo.AccountPath))
\r
353 throw new ArgumentException("The AccountInfo.AccountPath is empty", "accountInfo");
\r
354 Contract.EndContractBlock();
\r
357 using (ThreadContext.Stacks["Retrieve Remote"].Push(accountInfo.UserName))
\r
360 await NetworkAgent.GetDeleteAwaiter();
\r
362 Log.Info("Scheduled");
\r
363 var client = new CloudFilesClient(accountInfo);
\r
365 //We don't need to check the trash container
\r
366 var containers = client.ListContainers(accountInfo.UserName)
\r
367 .Where(c=>c.Name!="trash")
\r
371 CreateContainerFolders(accountInfo, containers);
\r
373 //The nextSince time fallback time is the same as the current.
\r
374 //If polling succeeds, the next Since time will be the smallest of the maximum modification times
\r
375 //of the shared and account objects
\r
376 var nextSince = since;
\r
380 //Wait for any deletions to finish
\r
381 await NetworkAgent.GetDeleteAwaiter();
\r
382 //Get the poll time now. We may miss some deletions but it's better to keep a file that was deleted
\r
383 //than delete a file that was created while we were executing the poll
\r
385 //Get the list of server objects changed since the last check
\r
386 //The name of the container is passed as state in order to create a dictionary of tasks in a subsequent step
\r
387 var listObjects = (from container in containers
\r
388 select Task<IList<ObjectInfo>>.Factory.StartNew(_ =>
\r
389 client.ListObjects(accountInfo.UserName, container.Name, since), container.Name)).ToList();
\r
391 var listShared = Task<IList<ObjectInfo>>.Factory.StartNew(_ =>
\r
392 client.ListSharedObjects(since), "shared");
\r
393 listObjects.Add(listShared);
\r
394 var listTasks = await Task.Factory.WhenAll(listObjects.ToArray());
\r
396 using (ThreadContext.Stacks["SCHEDULE"].Push("Process Results"))
\r
398 var dict = listTasks.ToDictionary(t => t.AsyncState);
\r
400 //Get all non-trash objects. Remember, the container name is stored in AsyncState
\r
401 var remoteObjects = (from objectList in listTasks
\r
402 where (string)objectList.AsyncState != "trash"
\r
403 from obj in objectList.Result
\r
404 orderby obj.Bytes ascending
\r
405 select obj).ToList();
\r
407 //Get the latest remote object modification date, only if it is after
\r
408 //the original since date
\r
409 nextSince = GetLatestDateAfter(nextSince, remoteObjects);
\r
411 var sharedObjects = dict["shared"].Result;
\r
413 //DON'T process trashed files
\r
414 //If some files are deleted and added again to a folder, they will be deleted
\r
415 //even though they are new.
\r
416 //We would have to check file dates and hashes to ensure that a trashed file
\r
417 //can be deleted safely from the local hard drive.
\r
419 //Items with the same name, hash may be both in the container and the trash
\r
420 //Don't delete items that exist in the container
\r
421 var realTrash = from trash in trashObjects
\r
423 !remoteObjects.Any(
\r
424 info => info.Name == trash.Name && info.Hash == trash.Hash)
\r
426 ProcessTrashedFiles(accountInfo, realTrash);
\r
429 var cleanRemotes = (from info in remoteObjects.Union(sharedObjects)
\r
430 let name = info.Name??""
\r
431 where !name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase) &&
\r
432 !name.StartsWith(FolderConstants.CacheFolder + "/",
\r
433 StringComparison.InvariantCultureIgnoreCase)
\r
434 select info).ToList();
\r
437 StatusKeeper.CleanupOrphanStates();
\r
439 var differencer = _differencer.PostSnapshot(accountInfo, cleanRemotes);
\r
440 var currentRemotes = differencer.Current.ToList();
\r
441 StatusKeeper.CleanupStaleStates(accountInfo, currentRemotes);
\r
443 //var filterUris = Selectives.SelectiveUris[accountInfo.AccountKey];
\r
445 //May have to wait if the FileAgent has asked for a Pause, due to local changes
\r
446 await _unPauseEvent.WaitAsync();
\r
448 //Get the local files here
\r
449 var agent = AgentLocator<FileAgent>.Get(accountInfo.AccountPath);
\r
450 //TODO: Pass the batch here as well
\r
451 var files = await LoadLocalFileTuples(accountInfo, accountBatch);
\r
453 var states = FileState.Queryable.ToList();
\r
455 var infos = (from remote in currentRemotes
\r
456 let path = remote.RelativeUrlToFilePath(accountInfo.UserName)
\r
457 let info=agent.GetFileSystemInfo(path)
\r
458 select Tuple.Create(info.FullName,remote))
\r
461 var token = _currentOperationCancellation.Token;
\r
463 var tuples = MergeSources(infos, files, states).ToList();
\r
465 //Process only the changes in the batch file, if one exists
\r
466 var stateTuples = accountBatch==null?tuples:tuples.Where(t => accountBatch.Contains(t.FilePath));
\r
467 foreach (var tuple in stateTuples)
\r
469 await _unPauseEvent.WaitAsync();
\r
471 //Set the Merkle Hash
\r
472 SetMerkleHash(accountInfo, tuple);
\r
474 SyncSingleItem(accountInfo, tuple, agent, token);
\r
483 MarkSuspectedDeletes(accountInfo, cleanRemotes);
\r
488 Log.Info("[LISTENER] End Processing");
\r
491 catch (Exception ex)
\r
493 Log.ErrorFormat("[FAIL] ListObjects for{0} in ProcessRemoteFiles with {1}", accountInfo.UserName, ex);
\r
497 Log.Info("[LISTENER] Finished");
\r
/// <summary>
/// Fills in the current (local) Merkle hash of a state tuple.
/// </summary>
/// <param name="accountInfo">Supplies the block size and block hash algorithm used for tree hashing.</param>
/// <param name="tuple">The tuple whose hash is set.</param>
/// <remarks>
/// Directories get the hash of an empty buffer. If the tuple's MD5 equals the stored state's
/// ShortHash, the stored Checksum is reused instead of rehashing. Otherwise the tree hash is
/// computed from the file and assigned to <c>tuple.Merkle</c>; the Merkle property appears to
/// derive C from the top hash (TODO confirm against StateTuple.Merkle).
/// </remarks>
private static void SetMerkleHash(AccountInfo accountInfo, StateTuple tuple)
{
    //The Merkle hash for directories is that of an empty buffer
    if (tuple.FileInfo is DirectoryInfo)
    {
        tuple.C = MERKLE_EMPTY;
    }
    else if (tuple.FileState != null && tuple.MD5 == tuple.FileState.ShortHash)
    {
        //If there is a state whose MD5 matches, load the merkle hash from the file state
        //instead of calculating it
        tuple.C = tuple.FileState.Checksum;
    }
    else
    {
        //Hash on the calling thread. Wrapping this synchronous call in TaskEx.Run(...).Result
        //only occupied an extra thread-pool thread while blocking this one, and wrapped any
        //failure in an AggregateException.
        tuple.Merkle = Signature.CalculateTreeHash(tuple.FileInfo, accountInfo.BlockSize, accountInfo.BlockHash);
    }
}
\r
520 private async Task<List<Tuple<FileSystemInfo, string>>> LoadLocalFileTuples(AccountInfo accountInfo,IEnumerable<string> batch )
\r
522 using (ThreadContext.Stacks["Account Files Hashing"].Push(accountInfo.UserName))
\r
524 var batchPaths = (batch==null)?new List<string>():batch.ToList();
\r
525 IEnumerable<FileSystemInfo> localInfos=AgentLocator<FileAgent>.Get(accountInfo.AccountPath)
\r
526 .EnumerateFileSystemInfos();
\r
527 if (batchPaths.Count>0)
\r
528 localInfos= localInfos.Where(fi => batchPaths.Contains(fi.FullName));
\r
530 //Use the queue to retry locked file hashing
\r
531 var fileQueue = new Queue<FileSystemInfo>(localInfos);
\r
532 var hasher = MD5.Create();
\r
534 var results = new List<Tuple<FileSystemInfo, string>>();
\r
536 while (fileQueue.Count > 0)
\r
538 var file = fileQueue.Dequeue();
\r
539 using (ThreadContext.Stacks["File"].Push(file.FullName))
\r
542 Signature.CalculateTreeHash(file, accountInfo.BlockSize,
\r
543 accountInfo.BlockHash).
\r
544 TopHash.ToHashString()
\r
548 //Replace MD5 here, do the calc while syncing individual files
\r
550 if (file is DirectoryInfo)
\r
551 hash = MERKLE_EMPTY;
\r
554 //Wait in case the FileAgent has requested a Pause
\r
555 await _unPauseEvent.WaitAsync();
\r
557 using (StatusNotification.GetNotifier("Hashing {0}", "Finished hashing {0}", file.Name))
\r
558 using (var stream = (file as FileInfo).OpenRead())
\r
560 hash = hasher.ComputeHash(stream).ToHashString();
\r
564 results.Add(Tuple.Create(file, hash));
\r
566 catch (IOException exc)
\r
568 Log.WarnFormat("[HASH] File in use, will retry [{0}]", exc);
\r
569 fileQueue.Enqueue(file);
\r
570 //If this is the only enqueued file
\r
571 if (fileQueue.Count != 1) continue;
\r
577 //Pause Polling for the specified time
\r
580 await PauseFor(backoff);
\r
589 /// Wait and Pause the agent while waiting
\r
591 /// <param name="backoff"></param>
\r
592 /// <returns></returns>
\r
593 private async Task PauseFor(int backoff)
\r
597 await TaskEx.Delay(backoff);
\r
601 private void SyncSingleItem(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, CancellationToken token)
\r
603 Log.DebugFormat("Sync [{0}] C:[{1}] L:[{2}] S:[{3}]",tuple.FilePath,tuple.C,tuple.L,tuple.S);
\r
605 var localFilePath = tuple.FilePath;
\r
606 //Don't use the tuple info, it may have been deleted
\r
607 var localInfo = FileInfoExtensions.FromPath(localFilePath);
\r
610 var isUnselectedRootFolder = agent.IsUnselectedRootFolder(tuple.FilePath);
\r
612 //Unselected root folders that have not yet been uploaded should be uploaded and added to the
\r
613 //selective folders
\r
615 if (!Selectives.IsSelected(accountInfo, localFilePath) && !(isUnselectedRootFolder && tuple.ObjectInfo==null) )
\r
618 // Local file unchanged? If both C and L are null, make sure it's because
\r
619 //both the file is missing and the state checksum is not missing
\r
620 if (tuple.C == tuple.L /*&& (localInfo.Exists || tuple.FileState == null)*/)
\r
623 //Server unchanged?
\r
624 if (tuple.S == tuple.L)
\r
626 // No server changes
\r
627 //Has the file been renamed on the server?
\r
628 MoveForServerMove(accountInfo, tuple);
\r
632 //Different from server
\r
633 //Does the server file exist?
\r
634 if (tuple.S == null)
\r
636 //Server file doesn't exist
\r
637 //deleteObjectFromLocal()
\r
638 StatusKeeper.SetFileState(localFilePath, FileStatus.Deleted,
\r
639 FileOverlayStatus.Deleted, "");
\r
640 agent.Delete(localFilePath);
\r
641 //updateRecord(Remove C, L)
\r
642 StatusKeeper.ClearFileStatus(localFilePath);
\r
646 //Server file exists
\r
647 //downloadServerObject() // Result: L = S
\r
648 //If the file has moved on the server, move it locally before downloading
\r
649 var targetPath = MoveForServerMove(accountInfo, tuple);
\r
651 StatusKeeper.SetFileState(targetPath, FileStatus.Modified,
\r
652 FileOverlayStatus.Modified, "");
\r
653 NetworkAgent.Downloader.DownloadCloudFile(accountInfo,
\r
655 targetPath, tuple.Merkle, token).Wait(token);
\r
656 //updateRecord( L = S )
\r
657 StatusKeeper.UpdateFileChecksum(targetPath, tuple.ObjectInfo.ETag,
\r
658 tuple.ObjectInfo.X_Object_Hash);
\r
660 StatusKeeper.StoreInfo(targetPath, tuple.ObjectInfo);
\r
663 StatusKeeper.SetFileState(targetPath, FileStatus.Unchanged,
\r
664 FileOverlayStatus.Normal, "");
\r
672 //Local changes found
\r
674 //Server unchanged?
\r
675 if (tuple.S == tuple.L)
\r
677 //The FileAgent selective sync checks for new root folder files
\r
678 if (!agent.Ignore(localFilePath))
\r
680 if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null)
\r
682 //deleteObjectFromServer()
\r
683 DeleteCloudFile(accountInfo, tuple);
\r
684 //updateRecord( Remove L, S)
\r
688 //uploadLocalObject() // Result: S = C, L = S
\r
690 //Debug.Assert(tuple.FileState !=null);
\r
691 var action = new CloudUploadAction(accountInfo, localInfo, tuple.FileState,
\r
692 accountInfo.BlockSize, accountInfo.BlockHash,
\r
693 "Poll", isUnselectedRootFolder);
\r
694 NetworkAgent.Uploader.UploadCloudFile(action, tuple.Merkle, token).Wait(token);
\r
696 //updateRecord( S = C )
\r
697 //State updated by the uploader
\r
699 if (isUnselectedRootFolder)
\r
701 ProcessChildren(accountInfo, tuple, agent, token);
\r
708 if (tuple.C == tuple.S)
\r
710 // (Identical Changes) Result: L = S
\r
712 //Detect server moves
\r
713 var targetPath = MoveForServerMove(accountInfo, tuple);
\r
714 StatusKeeper.StoreInfo(targetPath, tuple.ObjectInfo);
\r
718 if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null)
\r
720 //deleteObjectFromServer()
\r
721 DeleteCloudFile(accountInfo, tuple);
\r
722 //updateRecord(Remove L, S)
\r
724 //If both the local and server files are missing, the state is stale
\r
725 else if (!localInfo.Exists && (tuple.S == null || tuple.ObjectInfo == null))
\r
727 StatusKeeper.ClearFileStatus(localInfo.FullName);
\r
731 ReportConflictForMismatch(localFilePath);
\r
732 //identifyAsConflict() // Manual action required
\r
739 private string MoveForServerMove(AccountInfo accountInfo, StateTuple tuple)
\r
741 if (tuple.ObjectInfo == null)
\r
743 var relativePath = tuple.ObjectInfo.RelativeUrlToFilePath(accountInfo.UserName);
\r
744 var serverPath = Path.Combine(accountInfo.AccountPath, relativePath);
\r
746 //Compare Case Insensitive
\r
747 if (String.Equals(tuple.FilePath ,serverPath,StringComparison.InvariantCultureIgnoreCase)) return serverPath;
\r
749 if (tuple.FileInfo.Exists)
\r
751 var fi = tuple.FileInfo as FileInfo;
\r
753 fi.MoveTo(serverPath);
\r
754 var di = tuple.FileInfo as DirectoryInfo;
\r
756 di.MoveTo(serverPath);
\r
757 StatusKeeper.StoreInfo(serverPath, tuple.ObjectInfo);
\r
761 Debug.Assert(false, "File does not exist");
\r
/// <summary>
/// Deletes the server object tracked by a tuple and clears the tuple's stored state.
/// </summary>
/// <param name="accountInfo">The account that owns the object.</param>
/// <param name="tuple">Supplies the local path (for state bookkeeping) and the remote object to delete.</param>
/// <remarks>
/// The state is first marked Deleted (both status and overlay), the delete is handed to the
/// network DeleteAgent, and the state record is then cleared.
/// </remarks>
private void DeleteCloudFile(AccountInfo accountInfo, StateTuple tuple)
{
    var filePath = tuple.FilePath;
    var remoteObject = tuple.ObjectInfo;

    StatusKeeper.SetFileState(filePath, FileStatus.Deleted, FileOverlayStatus.Deleted, "");
    NetworkAgent.DeleteAgent.DeleteCloudFile(accountInfo, remoteObject);
    StatusKeeper.ClearFileStatus(filePath);
}
\r
774 private void ProcessChildren(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, CancellationToken token)
\r
777 var dirInfo = tuple.FileInfo as DirectoryInfo;
\r
778 var folderTuples = from folder in dirInfo.EnumerateDirectories("*", SearchOption.AllDirectories)
\r
779 select new StateTuple(folder);
\r
780 var fileTuples = from file in dirInfo.EnumerateFiles("*", SearchOption.AllDirectories)
\r
781 select new StateTuple(file);
\r
783 //Process folders first, to ensure folders appear on the sever as soon as possible
\r
784 folderTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, token));
\r
786 fileTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, token));
\r
/// <summary>
/// Merges the remote listing, the hashed local files and the stored file states into one
/// <see cref="StateTuple"/> per path key.
/// </summary>
/// <param name="infos">Pairs of (local file path, remote object) for the current server listing.</param>
/// <param name="files">Pairs of (local file system entry, content hash) for the local files.</param>
/// <param name="states">Stored file states.</param>
/// <returns>The merged tuples.</returns>
/// <remarks>
/// Local files and states are matched by path. Remote objects are matched first by object ID
/// (state ObjectID against object UUID), so an object whose path differs from the local one
/// can still pair with the existing tuple, and then by path. Unmatched remote objects get a
/// new tuple. Objects without a UUID can only be matched by path, because Dictionary lookups
/// and inserts throw ArgumentNullException on a null key.
/// </remarks>
private static IEnumerable<StateTuple> MergeSources(
    IEnumerable<Tuple<string, ObjectInfo>> infos,
    IEnumerable<Tuple<FileSystemInfo, string>> files,
    IEnumerable<FileState> states)
{
    var tuplesByPath = new Dictionary<string, StateTuple>();
    foreach (var file in files)
    {
        var fsInfo = file.Item1;
        //Directories always carry the empty-buffer hash
        var fileHash = fsInfo is DirectoryInfo ? MERKLE_EMPTY : file.Item2;

        tuplesByPath[fsInfo.FullName] = new StateTuple {FileInfo = fsInfo, MD5 = fileHash};
    }

    foreach (var state in states)
    {
        StateTuple hashTuple;
        if (tuplesByPath.TryGetValue(state.FilePath, out hashTuple))
        {
            hashTuple.FileState = state;
        }
        else
        {
            //State without a hashed local file, e.g. the local file no longer exists
            var fsInfo = FileInfoExtensions.FromPath(state.FilePath);
            tuplesByPath[state.FilePath] = new StateTuple {FileInfo = fsInfo, FileState = state};
        }
    }

    var tuplesByID = tuplesByPath.Values
        .Where(tuple => tuple.FileState != null && tuple.FileState.ObjectID != null)
        .ToDictionary(tuple => tuple.FileState.ObjectID, tuple => tuple);

    foreach (var info in infos)
    {
        var filePath = info.Item1;
        var objectInfo = info.Item2;
        var objectID = objectInfo.UUID;

        StateTuple hashTuple;
        if (objectID != null && tuplesByID.TryGetValue(objectID, out hashTuple))
        {
            hashTuple.ObjectInfo = objectInfo;
        }
        else if (tuplesByPath.TryGetValue(filePath, out hashTuple))
        {
            hashTuple.ObjectInfo = objectInfo;
        }
        else
        {
            var fsInfo = FileInfoExtensions.FromPath(filePath);
            var tuple = new StateTuple {FileInfo = fsInfo, ObjectInfo = objectInfo};
            tuplesByPath[filePath] = tuple;
            //A null key would throw; such objects stay reachable by path only
            if (objectID != null)
                tuplesByID[objectID] = tuple;
        }
    }
    return tuplesByPath.Values;
}
\r
847 /// Returns the latest LastModified date from the list of objects, but only if it is before
\r
848 /// than the threshold value
\r
850 /// <param name="threshold"></param>
\r
851 /// <param name="cloudObjects"></param>
\r
852 /// <returns></returns>
\r
853 private static DateTime? GetLatestDateBefore(DateTime? threshold, IList<ObjectInfo> cloudObjects)
\r
855 DateTime? maxDate = null;
\r
856 if (cloudObjects!=null && cloudObjects.Count > 0)
\r
857 maxDate = cloudObjects.Max(obj => obj.Last_Modified);
\r
858 if (maxDate == null || maxDate == DateTime.MinValue)
\r
860 if (threshold == null || threshold == DateTime.MinValue || threshold > maxDate)
\r
866 /// Returns the latest LastModified date from the list of objects, but only if it is after
\r
867 /// the threshold value
\r
869 /// <param name="threshold"></param>
\r
870 /// <param name="cloudObjects"></param>
\r
871 /// <returns></returns>
\r
872 private static DateTime? GetLatestDateAfter(DateTime? threshold, IList<ObjectInfo> cloudObjects)
\r
874 DateTime? maxDate = null;
\r
875 if (cloudObjects!=null && cloudObjects.Count > 0)
\r
876 maxDate = cloudObjects.Max(obj => obj.Last_Modified);
\r
877 if (maxDate == null || maxDate == DateTime.MinValue)
\r
879 if (threshold == null || threshold == DateTime.MinValue || threshold < maxDate)
\r
//Receives each account's cleaned remote listing via PostSnapshot; its Current listing
//is what the poll synchronizes against.
readonly AccountsDifferencer _differencer = new AccountsDifferencer();
private Dictionary<Uri, List<Uri>> _selectiveUris = new Dictionary<Uri, List<Uri>>();
private bool _pause;

/// <summary>
/// Hex-encoded hash of an empty buffer, used as the Merkle hash of directories.
/// The value is the SHA-256 digest of zero bytes. It is a constant so it cannot be
/// reassigned by accident.
/// </summary>
private const string MERKLE_EMPTY = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
\r
890 /// Deletes local files that are not found in the list of cloud files
\r
892 /// <param name="accountInfo"></param>
\r
893 /// <param name="cloudFiles"></param>
\r
894 private void ProcessDeletedFiles(AccountInfo accountInfo, IEnumerable<ObjectInfo> cloudFiles)
\r
896 if (accountInfo == null)
\r
897 throw new ArgumentNullException("accountInfo");
\r
898 if (String.IsNullOrWhiteSpace(accountInfo.AccountPath))
\r
899 throw new ArgumentException("The AccountInfo.AccountPath is empty", "accountInfo");
\r
900 if (cloudFiles == null)
\r
901 throw new ArgumentNullException("cloudFiles");
\r
902 Contract.EndContractBlock();
\r
904 var deletedFiles = new List<FileSystemInfo>();
\r
905 foreach (var objectInfo in cloudFiles)
\r
907 if (Log.IsDebugEnabled)
\r
908 Log.DebugFormat("Handle deleted [{0}]", objectInfo.Uri);
\r
909 var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
\r
910 var item = FileAgent.GetFileAgent(accountInfo).GetFileSystemInfo(relativePath);
\r
911 if (Log.IsDebugEnabled)
\r
912 Log.DebugFormat("Will delete [{0}] for [{1}]", item.FullName, objectInfo.Uri);
\r
915 if ((item.Attributes & FileAttributes.ReadOnly) == FileAttributes.ReadOnly)
\r
917 item.Attributes = item.Attributes & ~FileAttributes.ReadOnly;
\r
922 Log.DebugFormat("Deleting {0}", item.FullName);
\r
924 var directory = item as DirectoryInfo;
\r
925 if (directory != null)
\r
926 directory.Delete(true);
\r
929 Log.DebugFormat("Deleted [{0}] for [{1}]", item.FullName, objectInfo.Uri);
\r
931 _lastSeen.TryRemove(item.FullName, out lastDate);
\r
932 deletedFiles.Add(item);
\r
934 StatusKeeper.SetFileState(item.FullName, FileStatus.Deleted, FileOverlayStatus.Deleted, "File Deleted");
\r
936 Log.InfoFormat("[{0}] files were deleted", deletedFiles.Count);
\r
937 StatusNotification.NotifyForFiles(deletedFiles, String.Format("{0} files were deleted", deletedFiles.Count),
\r
942 private void MarkSuspectedDeletes(AccountInfo accountInfo, IEnumerable<ObjectInfo> cloudFiles)
\r
944 //Only consider files that are not being modified, ie they are in the Unchanged state
\r
945 var deleteCandidates = FileState.Queryable.Where(state =>
\r
946 state.FilePath.StartsWith(accountInfo.AccountPath)
\r
947 && state.FileStatus == FileStatus.Unchanged).ToList();
\r
950 //TODO: filesToDelete must take into account the Others container
\r
951 var filesToDelete = (from deleteCandidate in deleteCandidates
\r
952 let localFile = FileInfoExtensions.FromPath(deleteCandidate.FilePath)
\r
953 let relativeFilePath = localFile.AsRelativeTo(accountInfo.AccountPath)
\r
955 !cloudFiles.Any(r => r.RelativeUrlToFilePath(accountInfo.UserName) == relativeFilePath)
\r
956 select localFile).ToList();
\r
959 //Set the status of missing files to Conflict
\r
960 foreach (var item in filesToDelete)
\r
962 //Try to acquire a gate on the file, to take into account files that have been dequeued
\r
963 //and are being processed
\r
964 using (var gate = NetworkGate.Acquire(item.FullName, NetworkOperation.Deleting))
\r
968 StatusKeeper.SetFileState(item.FullName, FileStatus.Conflict, FileOverlayStatus.Deleted,
\r
969 "Local file missing from server");
\r
972 UpdateStatus(PithosStatus.HasConflicts);
\r
973 StatusNotification.NotifyConflicts(filesToDelete,
\r
975 "{0} local files are missing from Pithos, possibly because they were deleted",
\r
976 filesToDelete.Count));
\r
977 StatusNotification.NotifyForFiles(filesToDelete, String.Format("{0} files were deleted", filesToDelete.Count),
\r
981 private void ReportConflictForMismatch(string localFilePath)
\r
983 if (String.IsNullOrWhiteSpace(localFilePath))
\r
984 throw new ArgumentNullException("localFilePath");
\r
985 Contract.EndContractBlock();
\r
987 StatusKeeper.SetFileState(localFilePath, FileStatus.Conflict, FileOverlayStatus.Conflict, "File changed at the server");
\r
988 UpdateStatus(PithosStatus.HasConflicts);
\r
989 var message = String.Format("Conflict detected for file {0}", localFilePath);
\r
991 StatusNotification.NotifyChange(message, TraceLevel.Warning);
\r
997 /// Creates a Sync action for each changed server file
\r
999 /// <param name="accountInfo"></param>
\r
1000 /// <param name="changes"></param>
\r
1001 /// <returns></returns>
\r
1002 private IEnumerable<CloudAction> ChangesToActions(AccountInfo accountInfo, IEnumerable<ObjectInfo> changes)
\r
1004 if (changes == null)
\r
1005 throw new ArgumentNullException();
\r
1006 Contract.EndContractBlock();
\r
1007 var fileAgent = FileAgent.GetFileAgent(accountInfo);
\r
1009 //In order to avoid multiple iterations over the files, we iterate only once
\r
1010 //over the remote files
\r
1011 foreach (var objectInfo in changes)
\r
1013 var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
\r
1014 //If a directory object already exists, we may need to sync it
\r
1015 if (fileAgent.Exists(relativePath))
\r
1017 var localFile = fileAgent.GetFileSystemInfo(relativePath);
\r
1018 //We don't need to sync directories
\r
1019 if (objectInfo.IsDirectory && localFile is DirectoryInfo)
\r
1021 using (new SessionScope(FlushAction.Never))
\r
1023 var state = StatusKeeper.GetStateByFilePath(localFile.FullName);
\r
1024 _lastSeen[localFile.FullName] = DateTime.Now;
\r
1025 //Common files should be checked on a per-case basis to detect differences, which is newer
\r
1027 yield return new CloudAction(accountInfo, CloudActionType.MustSynch,
\r
1028 localFile, objectInfo, state, accountInfo.BlockSize,
\r
1029 accountInfo.BlockHash,"Poll Changes");
\r
1034 //Remote files should be downloaded
\r
1035 yield return new CloudDownloadAction(accountInfo, objectInfo,"Poll Changes");
\r
1041 /// Creates a Local Move action for each moved server file
\r
1043 /// <param name="accountInfo"></param>
\r
1044 /// <param name="moves"></param>
\r
1045 /// <returns></returns>
\r
1046 private IEnumerable<CloudAction> MovesToActions(AccountInfo accountInfo, IEnumerable<ObjectInfo> moves)
\r
1048 if (moves == null)
\r
1049 throw new ArgumentNullException();
\r
1050 Contract.EndContractBlock();
\r
1051 var fileAgent = FileAgent.GetFileAgent(accountInfo);
\r
1053 //In order to avoid multiple iterations over the files, we iterate only once
\r
1054 //over the remote files
\r
1055 foreach (var objectInfo in moves)
\r
1057 var previousRelativepath = objectInfo.Previous.RelativeUrlToFilePath(accountInfo.UserName);
\r
1058 //If the previous file already exists, we can execute a Move operation
\r
1059 if (fileAgent.Exists(previousRelativepath))
\r
1061 var previousFile = fileAgent.GetFileSystemInfo(previousRelativepath);
\r
1062 using (new SessionScope(FlushAction.Never))
\r
1064 var state = StatusKeeper.GetStateByFilePath(previousFile.FullName);
\r
1065 _lastSeen[previousFile.FullName] = DateTime.Now;
\r
1067 //For each moved object we need to move both the local file and update
\r
1068 yield return new CloudAction(accountInfo, CloudActionType.RenameLocal,
\r
1069 previousFile, objectInfo, state, accountInfo.BlockSize,
\r
1070 accountInfo.BlockHash,"Poll Moves");
\r
1071 //For modified files, we need to download the changes as well
\r
1072 if (objectInfo.X_Object_Hash != objectInfo.PreviousHash)
\r
1073 yield return new CloudDownloadAction(accountInfo,objectInfo, "Poll Moves");
\r
1076 //If the previous file does not exist, we need to download it in the new location
\r
1079 //Remote files should be downloaded
\r
1080 yield return new CloudDownloadAction(accountInfo, objectInfo, "Poll Moves");
\r
1087 /// Creates a download action for each new server file
\r
1089 /// <param name="accountInfo"></param>
\r
1090 /// <param name="creates"></param>
\r
1091 /// <returns></returns>
\r
1092 private IEnumerable<CloudAction> CreatesToActions(AccountInfo accountInfo, IEnumerable<ObjectInfo> creates)
\r
1094 if (creates == null)
\r
1095 throw new ArgumentNullException();
\r
1096 Contract.EndContractBlock();
\r
1097 var fileAgent = FileAgent.GetFileAgent(accountInfo);
\r
1099 //In order to avoid multiple iterations over the files, we iterate only once
\r
1100 //over the remote files
\r
1101 foreach (var objectInfo in creates)
\r
1103 if (Log.IsDebugEnabled)
\r
1104 Log.DebugFormat("[NEW INFO] {0}",objectInfo.Uri);
\r
1106 var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
\r
1108 //If the object already exists, we should check before uploading or downloading
\r
1109 if (fileAgent.Exists(relativePath))
\r
1111 var localFile= fileAgent.GetFileSystemInfo(relativePath);
\r
1112 var state = StatusKeeper.GetStateByFilePath(localFile.WithProperCapitalization().FullName);
\r
1113 yield return new CloudAction(accountInfo, CloudActionType.MustSynch,
\r
1114 localFile, objectInfo, state, accountInfo.BlockSize,
\r
1115 accountInfo.BlockHash,"Poll Creates");
\r
1119 //Remote files should be downloaded
\r
1120 yield return new CloudDownloadAction(accountInfo, objectInfo,"Poll Creates");
\r
1127 /// Notify the UI to update the visual status
\r
1129 /// <param name="status"></param>
\r
1130 private void UpdateStatus(PithosStatus status)
\r
1134 StatusNotification.SetPithosStatus(status);
\r
1135 //StatusNotification.Notify(new Notification());
\r
1137 catch (Exception exc)
\r
1139 //Failure is not critical, just log it
\r
1140 Log.Warn("Error while updating status", exc);
\r
1144 private static void CreateContainerFolders(AccountInfo accountInfo, IEnumerable<ContainerInfo> containers)
\r
1146 var containerPaths = from container in containers
\r
1147 let containerPath = Path.Combine(accountInfo.AccountPath, container.Name)
\r
1148 where container.Name != FolderConstants.TrashContainer && !Directory.Exists(containerPath)
\r
1149 select containerPath;
\r
1151 foreach (var path in containerPaths)
\r
1153 Directory.CreateDirectory(path);
\r
1157 public void AddAccount(AccountInfo accountInfo)
\r
1159 //Avoid adding a duplicate accountInfo
\r
1160 _accounts.TryAdd(accountInfo.AccountKey, accountInfo);
\r
1163 public void RemoveAccount(AccountInfo accountInfo)
\r
1165 AccountInfo account;
\r
1166 _accounts.TryRemove(accountInfo.AccountKey, out account);
\r
1168 SnapshotDifferencer differencer;
\r
1169 _differencer.Differencers.TryRemove(accountInfo.AccountKey, out differencer);
\r
1172 public void SetSelectivePaths(AccountInfo accountInfo,Uri[] added, Uri[] removed)
\r
1174 AbortRemovedPaths(accountInfo,removed);
\r
1175 //DownloadNewPaths(accountInfo,added);
\r
1179 private void DownloadNewPaths(AccountInfo accountInfo, Uri[] added)
\r
1181 var client = new CloudFilesClient(accountInfo);
\r
1182 foreach (var folderUri in added)
\r
1189 var segmentsCount = folderUri.Segments.Length;
\r
1190 //Is this an account URL?
\r
1191 if (segmentsCount < 3)
\r
1193 //Is this a container or folder URL?
\r
1194 if (segmentsCount == 3)
\r
1196 account = folderUri.Segments[1].TrimEnd('/');
\r
1197 container = folderUri.Segments[2].TrimEnd('/');
\r
1201 account = folderUri.Segments[2].TrimEnd('/');
\r
1202 container = folderUri.Segments[3].TrimEnd('/');
\r
1204 IList<ObjectInfo> items;
\r
1205 if (segmentsCount > 3)
\r
1208 var folder = String.Join("", folderUri.Segments.Splice(4));
\r
1209 items = client.ListObjects(account, container, folder);
\r
1214 items = client.ListObjects(account, container);
\r
1216 var actions = CreatesToActions(accountInfo, items);
\r
1217 foreach (var action in actions)
\r
1219 NetworkAgent.Post(action);
\r
1222 catch (Exception exc)
\r
1224 Log.WarnFormat("Listing of new selective path [{0}] failed with \r\n{1}", folderUri, exc);
\r
1228 //Need to get a listing of each of the URLs, then post them to the NetworkAgent
\r
1229 //CreatesToActions(accountInfo,)
\r
1231 /* NetworkAgent.Post();#1#
\r
1235 private void AbortRemovedPaths(AccountInfo accountInfo, Uri[] removed)
\r
1237 /*this.NetworkAgent.*/
\r