2 /* -----------------------------------------------------------------------
\r
3 * <copyright file="PollAgent.cs" company="GRNet">
\r
5 * Copyright 2011-2012 GRNET S.A. All rights reserved.
\r
7 * Redistribution and use in source and binary forms, with or
\r
8 * without modification, are permitted provided that the following
\r
9 * conditions are met:
\r
11 * 1. Redistributions of source code must retain the above
\r
12 * copyright notice, this list of conditions and the following
\r
15 * 2. Redistributions in binary form must reproduce the above
\r
16 * copyright notice, this list of conditions and the following
\r
17 * disclaimer in the documentation and/or other materials
\r
18 * provided with the distribution.
\r
21 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
\r
22 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
\r
23 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
\r
24 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
\r
25 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
\r
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
\r
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
\r
28 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
\r
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
\r
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
\r
31 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
\r
32 * POSSIBILITY OF SUCH DAMAGE.
\r
34 * The views and conclusions contained in the software and
\r
35 * documentation are those of the authors and should not be
\r
36 * interpreted as representing official policies, either expressed
\r
37 * or implied, of GRNET S.A.
\r
39 * -----------------------------------------------------------------------
\r
43 using System.Collections.Concurrent;
\r
44 using System.ComponentModel.Composition;
\r
45 using System.Diagnostics;
\r
46 using System.Diagnostics.Contracts;
\r
48 using System.Linq.Expressions;
\r
49 using System.Reflection;
\r
50 using System.Security.Cryptography;
\r
51 using System.Threading;
\r
52 using System.Threading.Tasks;
\r
53 using Castle.ActiveRecord;
\r
54 using Pithos.Interfaces;
\r
55 using Pithos.Network;
\r
58 namespace Pithos.Core.Agents
\r
61 using System.Collections.Generic;
\r
64 [DebuggerDisplay("{FilePath} C:{C} L:{L} S:{S}")]
\r
65 public class StateTuple
\r
67 public string FilePath { get; private set; }
\r
69 public string MD5 { get; set; }
\r
73 get { return FileState==null?null:FileState.Checksum; }
\r
81 _c = String.IsNullOrWhiteSpace(value) ? null : value;
\r
87 get { return ObjectInfo == null ? null : ObjectInfo.X_Object_Hash; }
\r
90 private FileSystemInfo _fileInfo;
\r
91 private TreeHash _merkle;
\r
93 public FileSystemInfo FileInfo
\r
95 get { return _fileInfo; }
\r
99 FilePath = value.FullName;
\r
103 public FileState FileState { get; set; }
\r
104 public ObjectInfo ObjectInfo{ get; set; }
\r
107 public TreeHash Merkle
\r
114 C = _merkle.TopHash.ToHashString();
\r
118 public StateTuple() { }
\r
120 public StateTuple(FileSystemInfo info)
\r
130 /// PollAgent periodically polls the server to detect object changes. The agent retrieves a listing of all
\r
131 /// objects and compares it with a previously cached version to detect differences.
\r
132 /// New files are downloaded, missing files are deleted from the local file system and common files are compared
\r
133 /// to determine the appropriate action
\r
136 public class PollAgent
\r
138 private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
\r
140 [System.ComponentModel.Composition.Import]
\r
141 public IStatusKeeper StatusKeeper { get; set; }
\r
143 [System.ComponentModel.Composition.Import]
\r
144 public IPithosSettings Settings { get; set; }
\r
146 [System.ComponentModel.Composition.Import]
\r
147 public NetworkAgent NetworkAgent { get; set; }
\r
149 [System.ComponentModel.Composition.Import]
\r
150 public Selectives Selectives { get; set; }
\r
152 public IStatusNotification StatusNotification { get; set; }
\r
154 private CancellationTokenSource _currentOperationCancellation = new CancellationTokenSource();
\r
/// <summary>
/// Cancels whatever upload/download is currently using a token from the
/// shared cancellation source.
/// </summary>
public void CancelCurrentOperation()
{
    //Cancelling makes the running operation throw a cancellation exception.
    //Operations that throw are retried by default, which is NOT wanted here:
    //whoever catches the cancellation exception must skip the retry.

    //A cancelled source cannot be reset, so a fresh one is swapped in atomically.
    //Doing the swap before the cancel ensures that an operation asking for a token
    //at this moment gets one from the new source instead of the cancelled one.
    var replacement = new CancellationTokenSource();
    var previous = Interlocked.Exchange(ref _currentOperationCancellation, replacement);
    previous.Cancel();
}
\r
185 _unPauseEvent.Set();
\r
188 _unPauseEvent.Reset();
\r
193 private bool _firstPoll = true;
\r
195 //The Sync Event signals a manual synchronisation
\r
196 private readonly AsyncManualResetEvent _syncEvent = new AsyncManualResetEvent();
\r
198 private readonly AsyncManualResetEvent _unPauseEvent = new AsyncManualResetEvent(true);
\r
200 private readonly ConcurrentDictionary<string, DateTime> _lastSeen = new ConcurrentDictionary<string, DateTime>();
\r
201 private readonly ConcurrentDictionary<Uri, AccountInfo> _accounts = new ConcurrentDictionary<Uri,AccountInfo>();
\r
205 /// Start a manual synchronization
\r
207 public void SynchNow(IEnumerable<string> paths=null)
\r
209 //_batchQueue.Enqueue(paths);
\r
213 readonly ConcurrentQueue<IEnumerable<string>> _batchQueue=new ConcurrentQueue<IEnumerable<string>>();
\r
/// <summary>
/// Runs one polling pass over every registered account and then schedules the next pass.
/// Remote files are polled periodically. Any changes are processed.
/// </summary>
/// <param name="since">Handed to <see cref="ProcessAccountFiles"/> as the time after which
/// changes are requested. If the pass fails, the same value is reused for the next pass.</param>
/// <returns>A task that completes once the next polling pass has been queued.</returns>
public async Task PollRemoteFiles(DateTime? since = null)
{
    if (Log.IsDebugEnabled)
        Log.DebugFormat("Polling changes after [{0}]",since);

    Debug.Assert(Thread.CurrentThread.IsBackground, "Polling Ended up in the main thread!");

    using (ThreadContext.Stacks["Retrieve Remote"].Push("All accounts"))
    {
        //If this poll fails, we will retry with the same since value
        var nextSince = since;
        try
        {
            await _unPauseEvent.WaitAsync();
            UpdateStatus(PithosStatus.PollSyncing);

            //Split a queued manual batch (if any) into one list of paths per account.
            //The lists are materialised so that the filter runs once, with the account
            //of the current iteration, instead of on every later enumeration.
            var accountBatches=new Dictionary<Uri, IEnumerable<string>>();
            IEnumerable<string> batch;
            if (_batchQueue.TryDequeue(out batch) && batch != null)
            {
                foreach (var account in _accounts.Values)
                {
                    var accountPath = account.AccountPath;
                    accountBatches[account.AccountKey] = batch.Where(path => path.IsAtOrBelow(accountPath)).ToList();
                }
            }

            //Keep every account's task: without this the pass would not wait for the
            //processing to finish, failures would go unobserved and nextSince could
            //never be derived from the results.
            var tasks = new List<Task<DateTime?>>();
            foreach(var accountInfo in _accounts.Values)
            {
                IEnumerable<string> accountBatch;
                accountBatches.TryGetValue(accountInfo.AccountKey,out accountBatch);
                tasks.Add(ProcessAccountFiles(accountInfo, accountBatch, since));
            }

            var nextTimes=await TaskEx.WhenAll(tasks);

            _firstPoll = false;
            //Reschedule the poll with the current timestamp as a "since" value

            if (nextTimes.Length>0)
                nextSince = nextTimes.Min();
            if (Log.IsDebugEnabled)
                Log.DebugFormat("Next Poll at [{0}]",nextSince);
        }
        catch (Exception ex)
        {
            Log.ErrorFormat("Error while processing accounts\r\n{0}", ex);
            //In case of failure retry with the same "since" value
        }

        UpdateStatus(PithosStatus.PollComplete);
        //The multiple try blocks are required because we can't have an await call
        //inside a finally block
        //TODO: Find a more elegant solution for rescheduling in the event of an exception
        try
        {
            //Wait for the polling interval to pass or the Sync event to be signalled
            nextSince = await WaitForScheduledOrManualPoll(nextSince);
        }
        finally
        {
            //Ensure polling is scheduled even in case of error
            TaskEx.Run(() => PollRemoteFiles(nextSince));
        }
    }
}
\r
290 /// Wait for the polling period to expire or a manual sync request
\r
292 /// <param name="since"></param>
\r
293 /// <returns></returns>
\r
294 private async Task<DateTime?> WaitForScheduledOrManualPoll(DateTime? since)
\r
296 var sync = _syncEvent.WaitAsync();
\r
297 var wait = TaskEx.Delay(TimeSpan.FromSeconds(Settings.PollingInterval));
\r
299 var signaledTask = await TaskEx.WhenAny(sync, wait);
\r
301 //Pausing takes precedence over manual sync or awaiting
\r
302 _unPauseEvent.Wait();
\r
304 //Wait for network processing to finish before polling
\r
305 var pauseTask=NetworkAgent.ProceedEvent.WaitAsync();
\r
306 await TaskEx.WhenAll(signaledTask, pauseTask);
\r
308 //If polling is signalled by SynchNow, ignore the since tag
\r
309 if (sync.IsCompleted)
\r
311 //TODO: Must convert to AutoReset
\r
312 _syncEvent.Reset();
\r
318 public async Task<DateTime?> ProcessAccountFiles(AccountInfo accountInfo, IEnumerable<string> accountBatch, DateTime? since = null)
\r
320 if (accountInfo == null)
\r
321 throw new ArgumentNullException("accountInfo");
\r
322 if (String.IsNullOrWhiteSpace(accountInfo.AccountPath))
\r
323 throw new ArgumentException("The AccountInfo.AccountPath is empty", "accountInfo");
\r
324 Contract.EndContractBlock();
\r
327 using (ThreadContext.Stacks["Retrieve Remote"].Push(accountInfo.UserName))
\r
330 await NetworkAgent.GetDeleteAwaiter();
\r
332 Log.Info("Scheduled");
\r
333 var client = new CloudFilesClient(accountInfo);
\r
335 //We don't need to check the trash container
\r
336 var containers = client.ListContainers(accountInfo.UserName)
\r
337 .Where(c=>c.Name!="trash")
\r
341 CreateContainerFolders(accountInfo, containers);
\r
343 //The fallback nextSince value is the same as the current since value.
\r
344 //If polling succeeds, the next Since time will be the smallest of the maximum modification times
\r
345 //of the shared and account objects
\r
346 var nextSince = since;
\r
350 //Wait for any deletions to finish
\r
351 await NetworkAgent.GetDeleteAwaiter();
\r
352 //Get the poll time now. We may miss some deletions but it's better to keep a file that was deleted
\r
353 //than delete a file that was created while we were executing the poll
\r
355 //Get the list of server objects changed since the last check
\r
356 //The name of the container is passed as state in order to create a dictionary of tasks in a subsequent step
\r
357 var listObjects = (from container in containers
\r
358 select Task<IList<ObjectInfo>>.Factory.StartNew(_ =>
\r
359 client.ListObjects(accountInfo.UserName, container.Name, since), container.Name)).ToList();
\r
361 var listShared = Task<IList<ObjectInfo>>.Factory.StartNew(_ =>
\r
362 client.ListSharedObjects(since), "shared");
\r
363 listObjects.Add(listShared);
\r
364 var listTasks = await Task.Factory.WhenAll(listObjects.ToArray());
\r
366 using (ThreadContext.Stacks["SCHEDULE"].Push("Process Results"))
\r
368 var dict = listTasks.ToDictionary(t => t.AsyncState);
\r
370 //Get all non-trash objects. Remember, the container name is stored in AsyncState
\r
371 var remoteObjects = (from objectList in listTasks
\r
372 where (string)objectList.AsyncState != "trash"
\r
373 from obj in objectList.Result
\r
374 orderby obj.Bytes ascending
\r
375 select obj).ToList();
\r
377 //Get the latest remote object modification date, only if it is after
\r
378 //the original since date
\r
379 nextSince = GetLatestDateAfter(nextSince, remoteObjects);
\r
381 var sharedObjects = dict["shared"].Result;
\r
382 nextSince = GetLatestDateBefore(nextSince, sharedObjects);
\r
384 //DON'T process trashed files
\r
385 //If some files are deleted and added again to a folder, they will be deleted
\r
386 //even though they are new.
\r
387 //We would have to check file dates and hashes to ensure that a trashed file
\r
388 //can be deleted safely from the local hard drive.
\r
390 //Items with the same name and hash may be in both the container and the trash
\r
391 //Don't delete items that exist in the container
\r
392 var realTrash = from trash in trashObjects
\r
394 !remoteObjects.Any(
\r
395 info => info.Name == trash.Name && info.Hash == trash.Hash)
\r
397 ProcessTrashedFiles(accountInfo, realTrash);
\r
400 var cleanRemotes = (from info in remoteObjects.Union(sharedObjects)
\r
401 let name = info.Name??""
\r
402 where !name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase) &&
\r
403 !name.StartsWith(FolderConstants.CacheFolder + "/",
\r
404 StringComparison.InvariantCultureIgnoreCase)
\r
405 select info).ToList();
\r
408 StatusKeeper.CleanupOrphanStates();
\r
409 StatusKeeper.CleanupStaleStates(accountInfo, cleanRemotes);
\r
411 //var differencer = _differencer.PostSnapshot(accountInfo, cleanRemotes);
\r
413 //var filterUris = Selectives.SelectiveUris[accountInfo.AccountKey];
\r
416 //Get the local files here
\r
417 var agent = AgentLocator<FileAgent>.Get(accountInfo.AccountPath);
\r
419 var files = LoadLocalFileTuples(accountInfo);
\r
421 var states = FileState.Queryable.ToList();
\r
424 var infos = (from remote in cleanRemotes
\r
425 let path = remote.RelativeUrlToFilePath(accountInfo.UserName)
\r
426 let info=agent.GetFileSystemInfo(path)
\r
427 select Tuple.Create(info.FullName,remote))
\r
430 var token = _currentOperationCancellation.Token;
\r
432 var tuples = MergeSources(infos, files, states).ToList();
\r
435 var stateTuples = accountBatch==null?tuples:tuples.Where(t => accountBatch.Contains(t.FilePath));
\r
436 foreach (var tuple in stateTuples)
\r
438 await _unPauseEvent.WaitAsync();
\r
440 //Set the Merkle Hash
\r
441 SetMerkleHash(accountInfo, tuple);
\r
443 SyncSingleItem(accountInfo, tuple, agent, token);
\r
452 MarkSuspectedDeletes(accountInfo, cleanRemotes);
\r
457 Log.Info("[LISTENER] End Processing");
\r
460 catch (Exception ex)
\r
462 Log.ErrorFormat("[FAIL] ListObjects for{0} in ProcessRemoteFiles with {1}", accountInfo.UserName, ex);
\r
466 Log.Info("[LISTENER] Finished");
\r
/// <summary>
/// Fills in the tree (Merkle) hash information of <paramref name="tuple"/>.
/// </summary>
/// <param name="accountInfo">Supplies the block size and block hash used when the hash has to be calculated.</param>
/// <param name="tuple">The tuple whose C or Merkle value is set.</param>
private static void SetMerkleHash(AccountInfo accountInfo, StateTuple tuple)
{
    //Directories all share the hash of an empty buffer
    if (tuple.FileInfo is DirectoryInfo)
    {
        tuple.C = MERKLE_EMPTY;
        return;
    }

    //When a stored state exists and its short hash equals the tuple's MD5,
    //reuse the stored checksum instead of calculating the tree hash again
    var storedState = tuple.FileState;
    var canReuseChecksum = storedState != null && tuple.MD5 == storedState.ShortHash;
    if (canReuseChecksum)
    {
        tuple.C = storedState.Checksum;
        return;
    }

    //Otherwise calculate the tree hash from the file itself
    tuple.Merkle = Signature.CalculateTreeHash(tuple.FileInfo, accountInfo.BlockSize, accountInfo.BlockHash);
}
\r
489 private static List<Tuple<FileSystemInfo, string>> LoadLocalFileTuples(AccountInfo accountInfo)
\r
491 using (ThreadContext.Stacks["Account Files Hashing"].Push(accountInfo.UserName))
\r
494 var localInfos = AgentLocator<FileAgent>.Get(accountInfo.AccountPath).EnumerateFileSystemInfos();
\r
495 //Use the queue to retry locked file hashing
\r
496 var fileQueue = new Queue<FileSystemInfo>(localInfos);
\r
497 var hasher = MD5.Create();
\r
499 var results = new List<Tuple<FileSystemInfo, string>>();
\r
501 while (fileQueue.Count > 0)
\r
503 var file = fileQueue.Dequeue();
\r
504 using (ThreadContext.Stacks["File"].Push(file.FullName))
\r
507 Signature.CalculateTreeHash(file, accountInfo.BlockSize,
\r
508 accountInfo.BlockHash).
\r
509 TopHash.ToHashString()
\r
513 //Replace MD5 here, do the calc while syncing individual files
\r
515 if (file is DirectoryInfo)
\r
516 hash = MERKLE_EMPTY;
\r
519 using (StatusNotification.GetNotifier("Hashing ", "{0}", file.Name))
\r
520 using (var stream = (file as FileInfo).OpenRead())
\r
522 hash = hasher.ComputeHash(stream).ToHashString();
\r
525 results.Add(Tuple.Create(file, hash));
\r
527 catch (IOException exc)
\r
529 Log.WarnFormat("[HASH] File in use, will retry [{0}]", exc);
\r
530 fileQueue.Enqueue(file);
\r
539 private void SyncSingleItem(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, CancellationToken token)
\r
541 Log.DebugFormat("Sync [{0}] C:[{1}] L:[{2}] S:[{3}]",tuple.FilePath,tuple.C,tuple.L,tuple.S);
\r
543 var localFilePath = tuple.FilePath;
\r
544 //Don't use the tuple info, it may have been deleted
\r
545 var localInfo = FileInfoExtensions.FromPath(localFilePath);
\r
548 var isUnselectedRootFolder = agent.IsUnselectedRootFolder(tuple.FilePath);
\r
550 //Unselected root folders that have not yet been uploaded should be uploaded and added to the
\r
551 //selective folders
\r
553 if (!Selectives.IsSelected(accountInfo, localFilePath) && !(isUnselectedRootFolder && tuple.ObjectInfo==null) )
\r
556 // Local file unchanged? If both C and L are null, make sure it's because
\r
557 //both the file is missing and the state checksum is not missing
\r
558 if (tuple.C == tuple.L /*&& (localInfo.Exists || tuple.FileState == null)*/)
\r
561 //Server unchanged?
\r
562 if (tuple.S == tuple.L)
\r
564 // No server changes
\r
565 //Has the file been renamed on the server?
\r
566 MoveForServerMove(accountInfo, tuple);
\r
570 //Different from server
\r
571 //Does the server file exist?
\r
572 if (tuple.S == null)
\r
574 //Server file doesn't exist
\r
575 //deleteObjectFromLocal()
\r
576 StatusKeeper.SetFileState(localFilePath, FileStatus.Deleted,
\r
577 FileOverlayStatus.Deleted, "");
\r
578 agent.Delete(localFilePath);
\r
579 //updateRecord(Remove C, L)
\r
580 StatusKeeper.ClearFileStatus(localFilePath);
\r
584 //Server file exists
\r
585 //downloadServerObject() // Result: L = S
\r
586 //If the file has moved on the server, move it locally before downloading
\r
587 var targetPath = MoveForServerMove(accountInfo, tuple);
\r
589 StatusKeeper.SetFileState(targetPath, FileStatus.Modified,
\r
590 FileOverlayStatus.Modified, "");
\r
591 NetworkAgent.Downloader.DownloadCloudFile(accountInfo,
\r
593 targetPath, tuple.Merkle, token).Wait(token);
\r
594 //updateRecord( L = S )
\r
595 StatusKeeper.UpdateFileChecksum(targetPath, tuple.ObjectInfo.ETag,
\r
596 tuple.ObjectInfo.X_Object_Hash);
\r
598 StatusKeeper.StoreInfo(targetPath, tuple.ObjectInfo);
\r
601 StatusKeeper.SetFileState(targetPath, FileStatus.Unchanged,
\r
602 FileOverlayStatus.Normal, "");
\r
610 //Local changes found
\r
612 //Server unchanged?
\r
613 if (tuple.S == tuple.L)
\r
615 //The FileAgent selective sync checks for new root folder files
\r
616 if (!agent.Ignore(localFilePath))
\r
618 if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null)
\r
620 //deleteObjectFromServer()
\r
621 DeleteCloudFile(accountInfo, tuple);
\r
622 //updateRecord( Remove L, S)
\r
626 //uploadLocalObject() // Result: S = C, L = S
\r
628 //Debug.Assert(tuple.FileState !=null);
\r
629 var action = new CloudUploadAction(accountInfo, localInfo, tuple.FileState,
\r
630 accountInfo.BlockSize, accountInfo.BlockHash,
\r
631 "Poll", isUnselectedRootFolder);
\r
632 NetworkAgent.Uploader.UploadCloudFile(action, tuple.Merkle, token).Wait(token);
\r
634 //updateRecord( S = C )
\r
635 //State updated by the uploader
\r
637 if (isUnselectedRootFolder)
\r
639 ProcessChildren(accountInfo, tuple, agent, token);
\r
646 if (tuple.C == tuple.S)
\r
648 // (Identical Changes) Result: L = S
\r
650 //Detect server moves
\r
651 var targetPath = MoveForServerMove(accountInfo, tuple);
\r
652 StatusKeeper.StoreInfo(targetPath, tuple.ObjectInfo);
\r
656 if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null)
\r
658 //deleteObjectFromServer()
\r
659 DeleteCloudFile(accountInfo, tuple);
\r
660 //updateRecord(Remove L, S)
\r
662 //If both the local and server files are missing, the state is stale
\r
663 else if (!localInfo.Exists && (tuple.S == null || tuple.ObjectInfo == null))
\r
665 StatusKeeper.ClearFileStatus(localInfo.FullName);
\r
669 ReportConflictForMismatch(localFilePath);
\r
670 //identifyAsConflict() // Manual action required
\r
677 private string MoveForServerMove(AccountInfo accountInfo, StateTuple tuple)
\r
679 if (tuple.ObjectInfo == null)
\r
681 var relativePath = tuple.ObjectInfo.RelativeUrlToFilePath(accountInfo.UserName);
\r
682 var serverPath = Path.Combine(accountInfo.AccountPath, relativePath);
\r
684 //Compare Case Insensitive
\r
685 if (String.Equals(tuple.FilePath ,serverPath,StringComparison.InvariantCultureIgnoreCase)) return serverPath;
\r
687 if (tuple.FileInfo.Exists)
\r
689 var fi = tuple.FileInfo as FileInfo;
\r
691 fi.MoveTo(serverPath);
\r
692 var di = tuple.FileInfo as DirectoryInfo;
\r
694 di.MoveTo(serverPath);
\r
695 StatusKeeper.StoreInfo(serverPath, tuple.ObjectInfo);
\r
699 Debug.Assert(false, "File does not exist");
\r
/// <summary>
/// Asks the delete agent to remove the tuple's server object and drops the
/// stored status of the corresponding local path.
/// </summary>
/// <param name="accountInfo">The account that owns the object.</param>
/// <param name="tuple">Provides the local path and the server object to delete.</param>
private void DeleteCloudFile(AccountInfo accountInfo, StateTuple tuple)
{
    var localPath = tuple.FilePath;

    //Flag the file as deleted before the server call is issued
    StatusKeeper.SetFileState(localPath, FileStatus.Deleted, FileOverlayStatus.Deleted, "");

    NetworkAgent.DeleteAgent.DeleteCloudFile(accountInfo, tuple.ObjectInfo);

    //The status record is no longer needed once the delete has been issued
    StatusKeeper.ClearFileStatus(localPath);
}
\r
/// <summary>
/// Synchronises every folder and file below the directory described by
/// <paramref name="tuple"/>, at any depth.
/// </summary>
/// <param name="accountInfo">The account the items belong to.</param>
/// <param name="tuple">The tuple of the parent folder. Nothing is done if it does not describe a directory.</param>
/// <param name="agent">The file agent passed on to <see cref="SyncSingleItem"/>.</param>
/// <param name="token">The cancellation token passed on to <see cref="SyncSingleItem"/>.</param>
private void ProcessChildren(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, CancellationToken token)
{
    var dirInfo = tuple.FileInfo as DirectoryInfo;
    //Only a directory has children. Without this check a tuple that describes
    //a file (or has no FileInfo) would fail with a NullReferenceException below.
    if (dirInfo == null)
    {
        Log.WarnFormat("[{0}] is not a directory, no children to process", tuple.FilePath);
        return;
    }

    var folderTuples = from folder in dirInfo.EnumerateDirectories("*", SearchOption.AllDirectories)
                       select new StateTuple(folder);
    var fileTuples = from file in dirInfo.EnumerateFiles("*", SearchOption.AllDirectories)
                     select new StateTuple(file);

    //Process folders first, to ensure folders appear on the server as soon as possible
    folderTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, token));

    fileTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, token));
}
\r
/// <summary>
/// Combines local files, stored file states and remote objects into one
/// <see cref="StateTuple"/> per item.
/// </summary>
/// <param name="infos">Pairs of local file path and remote object.</param>
/// <param name="files">Pairs of local file system entry and its hash.</param>
/// <param name="states">The stored file states.</param>
/// <returns>The merged tuples, one per distinct file path.</returns>
private static IEnumerable<StateTuple> MergeSources(
    IEnumerable<Tuple<string, ObjectInfo>> infos,
    IEnumerable<Tuple<FileSystemInfo, string>> files,
    IEnumerable<FileState> states)
{
    //Start with one tuple per local file system entry
    var tuplesByPath = new Dictionary<string, StateTuple>();
    foreach (var file in files)
    {
        var fsInfo = file.Item1;
        var fileHash = fsInfo is DirectoryInfo? MERKLE_EMPTY:file.Item2;

        tuplesByPath[fsInfo.FullName] = new StateTuple {FileInfo = fsInfo, MD5 = fileHash};
    }

    //Attach each stored state to the tuple with the same path, or create a
    //tuple for states whose path was not among the local files
    foreach (var state in states)
    {
        StateTuple hashTuple;
        if (tuplesByPath.TryGetValue(state.FilePath, out hashTuple))
        {
            hashTuple.FileState = state;
        }
        else
        {
            var fsInfo = FileInfoExtensions.FromPath(state.FilePath);
            tuplesByPath[state.FilePath] = new StateTuple {FileInfo = fsInfo, FileState = state};
        }
    }

    //Index the tuples that carry an object ID, so remote objects can be
    //matched by ID before falling back to the path
    var tuplesByID = tuplesByPath.Values
        .Where(tuple => tuple.FileState != null && tuple.FileState.ObjectID!=null)
        .ToDictionary(tuple=>tuple.FileState.ObjectID,tuple=>tuple);

    foreach (var info in infos)
    {
        StateTuple hashTuple;
        var filePath = info.Item1;
        var objectInfo = info.Item2;
        var objectID = objectInfo.UUID;

        //A dictionary lookup with a null key throws, so objects without
        //a UUID are matched by path only
        if (objectID != null && tuplesByID.TryGetValue(objectID, out hashTuple))
        {
            hashTuple.ObjectInfo = objectInfo;
        }
        else if (tuplesByPath.TryGetValue(filePath, out hashTuple))
        {
            hashTuple.ObjectInfo = objectInfo;
        }
        else
        {
            var fsInfo = FileInfoExtensions.FromPath(filePath);
            var tuple = new StateTuple {FileInfo = fsInfo, ObjectInfo = objectInfo};
            tuplesByPath[filePath] = tuple;
            if (objectID != null)
                tuplesByID[objectID] = tuple;
        }
    }
    return tuplesByPath.Values;
}
\r
785 /// Returns the latest LastModified date from the list of objects, but only if it is before
\r
786 /// the threshold value
\r
788 /// <param name="threshold"></param>
\r
789 /// <param name="cloudObjects"></param>
\r
790 /// <returns></returns>
\r
791 private static DateTime? GetLatestDateBefore(DateTime? threshold, IList<ObjectInfo> cloudObjects)
\r
793 DateTime? maxDate = null;
\r
794 if (cloudObjects!=null && cloudObjects.Count > 0)
\r
795 maxDate = cloudObjects.Max(obj => obj.Last_Modified);
\r
796 if (maxDate == null || maxDate == DateTime.MinValue)
\r
798 if (threshold == null || threshold == DateTime.MinValue || threshold > maxDate)
\r
804 /// Returns the latest LastModified date from the list of objects, but only if it is after
\r
805 /// the threshold value
\r
807 /// <param name="threshold"></param>
\r
808 /// <param name="cloudObjects"></param>
\r
809 /// <returns></returns>
\r
810 private static DateTime? GetLatestDateAfter(DateTime? threshold, IList<ObjectInfo> cloudObjects)
\r
812 DateTime? maxDate = null;
\r
813 if (cloudObjects!=null && cloudObjects.Count > 0)
\r
814 maxDate = cloudObjects.Max(obj => obj.Last_Modified);
\r
815 if (maxDate == null || maxDate == DateTime.MinValue)
\r
817 if (threshold == null || threshold == DateTime.MinValue || threshold < maxDate)
\r
822 readonly AccountsDifferencer _differencer = new AccountsDifferencer();
\r
823 private Dictionary<Uri, List<Uri>> _selectiveUris = new Dictionary<Uri, List<Uri>>();
\r
824 private bool _pause;
\r
825 private static string MERKLE_EMPTY = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
\r
828 /// Deletes local files that are not found in the list of cloud files
\r
830 /// <param name="accountInfo"></param>
\r
831 /// <param name="cloudFiles"></param>
\r
832 private void ProcessDeletedFiles(AccountInfo accountInfo, IEnumerable<ObjectInfo> cloudFiles)
\r
834 if (accountInfo == null)
\r
835 throw new ArgumentNullException("accountInfo");
\r
836 if (String.IsNullOrWhiteSpace(accountInfo.AccountPath))
\r
837 throw new ArgumentException("The AccountInfo.AccountPath is empty", "accountInfo");
\r
838 if (cloudFiles == null)
\r
839 throw new ArgumentNullException("cloudFiles");
\r
840 Contract.EndContractBlock();
\r
842 var deletedFiles = new List<FileSystemInfo>();
\r
843 foreach (var objectInfo in cloudFiles)
\r
845 if (Log.IsDebugEnabled)
\r
846 Log.DebugFormat("Handle deleted [{0}]", objectInfo.Uri);
\r
847 var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
\r
848 var item = FileAgent.GetFileAgent(accountInfo).GetFileSystemInfo(relativePath);
\r
849 if (Log.IsDebugEnabled)
\r
850 Log.DebugFormat("Will delete [{0}] for [{1}]", item.FullName, objectInfo.Uri);
\r
853 if ((item.Attributes & FileAttributes.ReadOnly) == FileAttributes.ReadOnly)
\r
855 item.Attributes = item.Attributes & ~FileAttributes.ReadOnly;
\r
860 Log.DebugFormat("Deleting {0}", item.FullName);
\r
862 var directory = item as DirectoryInfo;
\r
863 if (directory != null)
\r
864 directory.Delete(true);
\r
867 Log.DebugFormat("Deleted [{0}] for [{1}]", item.FullName, objectInfo.Uri);
\r
869 _lastSeen.TryRemove(item.FullName, out lastDate);
\r
870 deletedFiles.Add(item);
\r
872 StatusKeeper.SetFileState(item.FullName, FileStatus.Deleted, FileOverlayStatus.Deleted, "File Deleted");
\r
874 Log.InfoFormat("[{0}] files were deleted", deletedFiles.Count);
\r
875 StatusNotification.NotifyForFiles(deletedFiles, String.Format("{0} files were deleted", deletedFiles.Count),
\r
880 private void MarkSuspectedDeletes(AccountInfo accountInfo, IEnumerable<ObjectInfo> cloudFiles)
\r
882 //Only consider files that are not being modified, ie they are in the Unchanged state
\r
883 var deleteCandidates = FileState.Queryable.Where(state =>
\r
884 state.FilePath.StartsWith(accountInfo.AccountPath)
\r
885 && state.FileStatus == FileStatus.Unchanged).ToList();
\r
888 //TODO: filesToDelete must take into account the Others container
\r
889 var filesToDelete = (from deleteCandidate in deleteCandidates
\r
890 let localFile = FileInfoExtensions.FromPath(deleteCandidate.FilePath)
\r
891 let relativeFilePath = localFile.AsRelativeTo(accountInfo.AccountPath)
\r
893 !cloudFiles.Any(r => r.RelativeUrlToFilePath(accountInfo.UserName) == relativeFilePath)
\r
894 select localFile).ToList();
\r
897 //Set the status of missing files to Conflict
\r
898 foreach (var item in filesToDelete)
\r
900 //Try to acquire a gate on the file, to take into account files that have been dequeued
\r
901 //and are being processed
\r
902 using (var gate = NetworkGate.Acquire(item.FullName, NetworkOperation.Deleting))
\r
906 StatusKeeper.SetFileState(item.FullName, FileStatus.Conflict, FileOverlayStatus.Deleted,
\r
907 "Local file missing from server");
\r
910 UpdateStatus(PithosStatus.HasConflicts);
\r
911 StatusNotification.NotifyConflicts(filesToDelete,
\r
913 "{0} local files are missing from Pithos, possibly because they were deleted",
\r
914 filesToDelete.Count));
\r
915 StatusNotification.NotifyForFiles(filesToDelete, String.Format("{0} files were deleted", filesToDelete.Count),
\r
/// <summary>
/// Flags a local file as conflicting and notifies the UI about it.
/// </summary>
/// <param name="localFilePath">Path of the conflicting local file. Must not be null, empty or whitespace.</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="localFilePath"/> is null, empty or whitespace.</exception>
private void ReportConflictForMismatch(string localFilePath)
{
    if (String.IsNullOrWhiteSpace(localFilePath))
    {
        throw new ArgumentNullException("localFilePath");
    }
    Contract.EndContractBlock();

    //Record the conflict on the file itself, then raise the overall status
    StatusKeeper.SetFileState(localFilePath, FileStatus.Conflict, FileOverlayStatus.Conflict, "File changed at the server");
    UpdateStatus(PithosStatus.HasConflicts);

    //Finally tell the user which file is affected
    StatusNotification.NotifyChange(
        String.Format("Conflict detected for file {0}", localFilePath),
        TraceLevel.Warning);
}
\r
935 /// Creates a Sync action for each changed server file
\r
937 /// <param name="accountInfo"></param>
\r
938 /// <param name="changes"></param>
\r
939 /// <returns></returns>
\r
940 private IEnumerable<CloudAction> ChangesToActions(AccountInfo accountInfo, IEnumerable<ObjectInfo> changes)
\r
942 if (changes == null)
\r
943 throw new ArgumentNullException();
\r
944 Contract.EndContractBlock();
\r
945 var fileAgent = FileAgent.GetFileAgent(accountInfo);
\r
947 //In order to avoid multiple iterations over the files, we iterate only once
\r
948 //over the remote files
\r
949 foreach (var objectInfo in changes)
\r
951 var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
\r
952 //If a directory object already exists, we may need to sync it
\r
953 if (fileAgent.Exists(relativePath))
\r
955 var localFile = fileAgent.GetFileSystemInfo(relativePath);
\r
956 //We don't need to sync directories
\r
957 if (objectInfo.IsDirectory && localFile is DirectoryInfo)
\r
959 using (new SessionScope(FlushAction.Never))
\r
961 var state = StatusKeeper.GetStateByFilePath(localFile.FullName);
\r
962 _lastSeen[localFile.FullName] = DateTime.Now;
\r
963 //Common files should be checked on a per-case basis to detect differences and decide which copy is newer
\r
965 yield return new CloudAction(accountInfo, CloudActionType.MustSynch,
\r
966 localFile, objectInfo, state, accountInfo.BlockSize,
\r
967 accountInfo.BlockHash,"Poll Changes");
\r
972 //Remote files should be downloaded
\r
973 yield return new CloudDownloadAction(accountInfo, objectInfo,"Poll Changes");
\r
/// <summary>
/// Creates a Local Move action for each moved server file
/// </summary>
/// <param name="accountInfo">The account whose file agent, user name, block size and block hash are used to build the actions</param>
/// <param name="moves">The moved server objects. Each object's Previous value is used to locate the old local path</param>
/// <returns>
/// For each moved object whose previous local path still exists, a RenameLocal action,
/// followed by a download action when the object's hash differs from its previous hash.
/// For each moved object whose previous local path is missing, a single download action.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="moves"/> is null</exception>
private IEnumerable<CloudAction> MovesToActions(AccountInfo accountInfo, IEnumerable<ObjectInfo> moves)
{
    //Name the offending parameter so that the failure is self-explanatory
    if (moves == null)
        throw new ArgumentNullException("moves");
    Contract.EndContractBlock();
    var fileAgent = FileAgent.GetFileAgent(accountInfo);

    //In order to avoid multiple iterations over the files, we iterate only once
    //over the remote files
    foreach (var objectInfo in moves)
    {
        var previousRelativepath = objectInfo.Previous.RelativeUrlToFilePath(accountInfo.UserName);
        //If the previous file already exists, we can execute a Move operation
        if (fileAgent.Exists(previousRelativepath))
        {
            var previousFile = fileAgent.GetFileSystemInfo(previousRelativepath);
            using (new SessionScope(FlushAction.Never))
            {
                var state = StatusKeeper.GetStateByFilePath(previousFile.FullName);
                //Record when the file in its old location was last encountered
                _lastSeen[previousFile.FullName] = DateTime.Now;

                //For each moved object we need to rename the local file
                yield return new CloudAction(accountInfo, CloudActionType.RenameLocal,
                                             previousFile, objectInfo, state, accountInfo.BlockSize,
                                             accountInfo.BlockHash,"Poll Moves");
                //For modified files, we need to download the changes as well
                if (objectInfo.X_Object_Hash != objectInfo.PreviousHash)
                    yield return new CloudDownloadAction(accountInfo,objectInfo, "Poll Moves");
            }
        }
        //If the previous file does not exist, we need to download it in the new location
        else
        {
            //Remote files should be downloaded
            yield return new CloudDownloadAction(accountInfo, objectInfo, "Poll Moves");
        }
    }
}
\r
/// <summary>
/// Creates a download action for each new server file
/// </summary>
/// <param name="accountInfo">The account whose file agent, user name, block size and block hash are used to build the actions</param>
/// <param name="creates">The newly created server objects</param>
/// <returns>
/// For each object whose local path already exists, a MustSynch action carrying the local file
/// and its stored state. For every other object, a download action.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="creates"/> is null</exception>
private IEnumerable<CloudAction> CreatesToActions(AccountInfo accountInfo, IEnumerable<ObjectInfo> creates)
{
    //Name the offending parameter so that the failure is self-explanatory
    if (creates == null)
        throw new ArgumentNullException("creates");
    Contract.EndContractBlock();
    var fileAgent = FileAgent.GetFileAgent(accountInfo);

    //In order to avoid multiple iterations over the files, we iterate only once
    //over the remote files
    foreach (var objectInfo in creates)
    {
        if (Log.IsDebugEnabled)
            Log.DebugFormat("[NEW INFO] {0}",objectInfo.Uri);

        var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);

        //If the object already exists, we should check before uploading or downloading
        if (fileAgent.Exists(relativePath))
        {
            var localFile= fileAgent.GetFileSystemInfo(relativePath);
            //The state lookup uses the path as returned by WithProperCapitalization
            var state = StatusKeeper.GetStateByFilePath(localFile.WithProperCapitalization().FullName);
            yield return new CloudAction(accountInfo, CloudActionType.MustSynch,
                                         localFile, objectInfo, state, accountInfo.BlockSize,
                                         accountInfo.BlockHash,"Poll Creates");
        }
        else
        {
            //Remote files should be downloaded
            yield return new CloudDownloadAction(accountInfo, objectInfo,"Poll Creates");
        }
    }
}
\r
/// <summary>
/// Pushes the given status to the status notification service so that the UI
/// can refresh its visual state
/// </summary>
/// <param name="status">The status to publish</param>
private void UpdateStatus(PithosStatus status)
{
    try
    {
        StatusNotification.SetPithosStatus(status);
    }
    catch (Exception failure)
    {
        //A failed notification must not interrupt the caller, so it is only logged
        Log.Warn("Error while updating status", failure);
    }
}
\r
/// <summary>
/// Creates a local folder under the account path for every container that
/// does not have one yet. The trash container is skipped.
/// </summary>
/// <param name="accountInfo">The account whose AccountPath is the parent of the container folders</param>
/// <param name="containers">The containers to create folders for</param>
private static void CreateContainerFolders(AccountInfo accountInfo, IEnumerable<ContainerInfo> containers)
{
    //The chain is evaluated lazily, so each path is checked right before it is created
    var missingFolders = containers
        .Select(container => new
                                 {
                                     container.Name,
                                     FolderPath = Path.Combine(accountInfo.AccountPath, container.Name)
                                 })
        .Where(entry => entry.Name != FolderConstants.TrashContainer && !Directory.Exists(entry.FolderPath))
        .Select(entry => entry.FolderPath);

    foreach (var folderPath in missingFolders)
    {
        Directory.CreateDirectory(folderPath);
    }
}
\r
/// <summary>
/// Registers an account with the agent, keyed by its AccountKey
/// </summary>
/// <param name="accountInfo">The account to register</param>
public void AddAccount(AccountInfo accountInfo)
{
    var accountKey = accountInfo.AccountKey;
    //TryAdd is used so that an account is not added a second time under the same key
    _accounts.TryAdd(accountKey, accountInfo);
}
\r
/// <summary>
/// Unregisters an account from the agent and drops the snapshot differencer
/// stored under the same account key
/// </summary>
/// <param name="accountInfo">The account to remove</param>
public void RemoveAccount(AccountInfo accountInfo)
{
    var accountKey = accountInfo.AccountKey;

    //The removed values are not needed, the out variables exist only to satisfy TryRemove
    AccountInfo removedAccount;
    _accounts.TryRemove(accountKey, out removedAccount);

    SnapshotDifferencer removedDifferencer;
    _differencer.Differencers.TryRemove(accountKey, out removedDifferencer);
}
\r
/// <summary>
/// Applies a change to the selective paths of an account
/// </summary>
/// <param name="accountInfo">The account whose selective paths changed</param>
/// <param name="added">The newly selected paths. Currently not acted upon</param>
/// <param name="removed">The paths that are no longer selected</param>
public void SetSelectivePaths(AccountInfo accountInfo,Uri[] added, Uri[] removed)
{
    //Only the removed paths are processed, the download of added paths is disabled:
    //DownloadNewPaths(accountInfo,added);
    AbortRemovedPaths(accountInfo, removed);
}
\r
1117 private void DownloadNewPaths(AccountInfo accountInfo, Uri[] added)
\r
1119 var client = new CloudFilesClient(accountInfo);
\r
1120 foreach (var folderUri in added)
\r
1127 var segmentsCount = folderUri.Segments.Length;
\r
1128 //Is this an account URL?
\r
1129 if (segmentsCount < 3)
\r
1131 //Is this a container or folder URL?
\r
1132 if (segmentsCount == 3)
\r
1134 account = folderUri.Segments[1].TrimEnd('/');
\r
1135 container = folderUri.Segments[2].TrimEnd('/');
\r
1139 account = folderUri.Segments[2].TrimEnd('/');
\r
1140 container = folderUri.Segments[3].TrimEnd('/');
\r
1142 IList<ObjectInfo> items;
\r
1143 if (segmentsCount > 3)
\r
1146 var folder = String.Join("", folderUri.Segments.Splice(4));
\r
1147 items = client.ListObjects(account, container, folder);
\r
1152 items = client.ListObjects(account, container);
\r
1154 var actions = CreatesToActions(accountInfo, items);
\r
1155 foreach (var action in actions)
\r
1157 NetworkAgent.Post(action);
\r
1160 catch (Exception exc)
\r
1162 Log.WarnFormat("Listing of new selective path [{0}] failed with \r\n{1}", folderUri, exc);
\r
1166 //Need to get a listing of each of the URLs, then post them to the NetworkAgent
\r
1167 //CreatesToActions(accountInfo,)
\r
1169 /* NetworkAgent.Post();#1#
\r
1173 private void AbortRemovedPaths(AccountInfo accountInfo, Uri[] removed)
\r
1175 /*this.NetworkAgent.*/
\r