2 /* -----------------------------------------------------------------------
\r
3 * <copyright file="PollAgent.cs" company="GRNet">
\r
5 * Copyright 2011-2012 GRNET S.A. All rights reserved.
\r
7 * Redistribution and use in source and binary forms, with or
\r
8 * without modification, are permitted provided that the following
\r
9 * conditions are met:
\r
11 * 1. Redistributions of source code must retain the above
\r
12 * copyright notice, this list of conditions and the following
\r
15 * 2. Redistributions in binary form must reproduce the above
\r
16 * copyright notice, this list of conditions and the following
\r
17 * disclaimer in the documentation and/or other materials
\r
18 * provided with the distribution.
\r
21 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
\r
22 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
\r
23 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
\r
24 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
\r
25 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
\r
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
\r
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
\r
28 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
\r
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
\r
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
\r
31 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
\r
32 * POSSIBILITY OF SUCH DAMAGE.
\r
34 * The views and conclusions contained in the software and
\r
35 * documentation are those of the authors and should not be
\r
36 * interpreted as representing official policies, either expressed
\r
37 * or implied, of GRNET S.A.
\r
39 * -----------------------------------------------------------------------
\r
43 using System.Collections.Concurrent;
\r
44 using System.ComponentModel.Composition;
\r
45 using System.Diagnostics;
\r
46 using System.Diagnostics.Contracts;
\r
48 using System.Linq.Expressions;
\r
49 using System.Reflection;
\r
50 using System.Security.Cryptography;
\r
51 using System.Threading;
\r
52 using System.Threading.Tasks;
\r
53 using Castle.ActiveRecord;
\r
54 using Pithos.Interfaces;
\r
55 using Pithos.Network;
\r
58 namespace Pithos.Core.Agents
\r
61 using System.Collections.Generic;
\r
64 [DebuggerDisplay("{FilePath} C:{C} L:{L} S:{S}")]
\r
65 public class StateTuple
\r
67 public string FilePath { get; private set; }
\r
69 public string MD5 { get; set; }
\r
73 get { return FileState==null?null:FileState.Checksum; }
\r
81 _c = String.IsNullOrWhiteSpace(value) ? null : value;
\r
87 get { return ObjectInfo == null ? null : ObjectInfo.X_Object_Hash; }
\r
90 private FileSystemInfo _fileInfo;
\r
91 private TreeHash _merkle;
\r
93 public FileSystemInfo FileInfo
\r
95 get { return _fileInfo; }
\r
99 FilePath = value.FullName;
\r
103 public FileState FileState { get; set; }
\r
104 public ObjectInfo ObjectInfo{ get; set; }
\r
107 public TreeHash Merkle
\r
114 C = _merkle.TopHash.ToHashString();
\r
118 public StateTuple() { }
\r
120 public StateTuple(FileSystemInfo info)
\r
130 /// PollAgent periodically polls the server to detect object changes. The agent retrieves a listing of all
\r
131 /// objects and compares it with a previously cached version to detect differences.
\r
132 /// New files are downloaded, missing files are deleted from the local file system and common files are compared
\r
133 /// to determine the appropriate action
\r
136 public class PollAgent
\r
138 private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
\r
140 [System.ComponentModel.Composition.Import]
\r
141 public IStatusKeeper StatusKeeper { get; set; }
\r
143 [System.ComponentModel.Composition.Import]
\r
144 public IPithosSettings Settings { get; set; }
\r
146 [System.ComponentModel.Composition.Import]
\r
147 public NetworkAgent NetworkAgent { get; set; }
\r
149 [System.ComponentModel.Composition.Import]
\r
150 public Selectives Selectives { get; set; }
\r
152 public IStatusNotification StatusNotification { get; set; }
\r
154 private CancellationTokenSource _currentOperationCancellation = new CancellationTokenSource();
\r
156 public void CancelCurrentOperation()
\r
158 //What does it mean to cancel the current upload/download?
\r
159 //Obviously, the current operation will be cancelled by throwing
\r
160 //a cancellation exception.
\r
162 //The default behavior is to retry any operations that throw.
\r
163 //Obviously this is not what we want in this situation.
\r
164 //The cancelled operation should NOT bea retried.
\r
166 //This can be done by catching the cancellation exception
\r
167 //and avoiding the retry.
\r
170 //Have to reset the cancellation source - it is not possible to reset the source
\r
171 //Have to prevent a case where an operation requests a token from the old source
\r
172 var oldSource = Interlocked.Exchange(ref _currentOperationCancellation, new CancellationTokenSource());
\r
173 oldSource.Cancel();
\r
185 _unPauseEvent.Set();
\r
188 _unPauseEvent.Reset();
\r
193 private bool _firstPoll = true;
\r
195 //The Sync Event signals a manual synchronisation
\r
196 private readonly AsyncManualResetEvent _syncEvent = new AsyncManualResetEvent();
\r
198 private readonly AsyncManualResetEvent _unPauseEvent = new AsyncManualResetEvent(true);
\r
200 private readonly ConcurrentDictionary<string, DateTime> _lastSeen = new ConcurrentDictionary<string, DateTime>();
\r
201 private readonly ConcurrentDictionary<Uri, AccountInfo> _accounts = new ConcurrentDictionary<Uri,AccountInfo>();
\r
205 /// Start a manual synchronization
\r
207 public void SynchNow(IEnumerable<string> paths=null)
\r
209 _batchQueue.Enqueue(paths);
\r
213 readonly ConcurrentQueue<IEnumerable<string>> _batchQueue=new ConcurrentQueue<IEnumerable<string>>();
\r
216 /// Remote files are polled periodically. Any changes are processed
\r
218 /// <param name="since"></param>
\r
219 /// <returns></returns>
\r
220 public async Task PollRemoteFiles(DateTime? since = null)
\r
222 if (Log.IsDebugEnabled)
\r
223 Log.DebugFormat("Polling changes after [{0}]",since);
\r
225 Debug.Assert(Thread.CurrentThread.IsBackground, "Polling Ended up in the main thread!");
\r
229 using (ThreadContext.Stacks["Retrieve Remote"].Push("All accounts"))
\r
231 //If this poll fails, we will retry with the same since value
\r
232 var nextSince = since;
\r
235 await _unPauseEvent.WaitAsync();
\r
236 UpdateStatus(PithosStatus.PollSyncing);
\r
238 var accountBatches=new Dictionary<Uri, IEnumerable<string>>();
\r
239 IEnumerable<string> batch = null;
\r
240 if (_batchQueue.TryDequeue(out batch) && batch != null)
\r
241 foreach (var account in _accounts.Values)
\r
243 var accountBatch = batch.Where(path => path.IsAtOrBelow(account.AccountPath));
\r
244 accountBatches[account.AccountKey] = accountBatch;
\r
248 IEnumerable<Task<DateTime?>> tasks = new List<Task<DateTime?>>();
\r
249 foreach(var accountInfo in _accounts.Values)
\r
251 IEnumerable<string> accountBatch ;
\r
252 accountBatches.TryGetValue(accountInfo.AccountKey,out accountBatch);
\r
253 ProcessAccountFiles (accountInfo, accountBatch, since);
\r
256 var nextTimes=await TaskEx.WhenAll(tasks.ToList());
\r
258 _firstPoll = false;
\r
259 //Reschedule the poll with the current timestamp as a "since" value
\r
261 if (nextTimes.Length>0)
\r
262 nextSince = nextTimes.Min();
\r
263 if (Log.IsDebugEnabled)
\r
264 Log.DebugFormat("Next Poll at [{0}]",nextSince);
\r
266 catch (Exception ex)
\r
268 Log.ErrorFormat("Error while processing accounts\r\n{0}", ex);
\r
269 //In case of failure retry with the same "since" value
\r
272 UpdateStatus(PithosStatus.PollComplete);
\r
273 //The multiple try blocks are required because we can't have an await call
\r
274 //inside a finally block
\r
275 //TODO: Find a more elegant solution for reschedulling in the event of an exception
\r
278 //Wait for the polling interval to pass or the Sync event to be signalled
\r
279 nextSince = await WaitForScheduledOrManualPoll(nextSince);
\r
283 //Ensure polling is scheduled even in case of error
\r
284 TaskEx.Run(() => PollRemoteFiles(nextSince));
\r
290 /// Wait for the polling period to expire or a manual sync request
\r
292 /// <param name="since"></param>
\r
293 /// <returns></returns>
\r
294 private async Task<DateTime?> WaitForScheduledOrManualPoll(DateTime? since)
\r
296 var sync = _syncEvent.WaitAsync();
\r
297 var wait = TaskEx.Delay(TimeSpan.FromSeconds(Settings.PollingInterval));
\r
299 var signaledTask = await TaskEx.WhenAny(sync, wait);
\r
301 //Pausing takes precedence over manual sync or awaiting
\r
302 _unPauseEvent.Wait();
\r
304 //Wait for network processing to finish before polling
\r
305 var pauseTask=NetworkAgent.ProceedEvent.WaitAsync();
\r
306 await TaskEx.WhenAll(signaledTask, pauseTask);
\r
308 //If polling is signalled by SynchNow, ignore the since tag
\r
309 if (sync.IsCompleted)
\r
311 //TODO: Must convert to AutoReset
\r
312 _syncEvent.Reset();
\r
318 public async Task<DateTime?> ProcessAccountFiles(AccountInfo accountInfo, IEnumerable<string> accountBatch, DateTime? since = null)
\r
320 if (accountInfo == null)
\r
321 throw new ArgumentNullException("accountInfo");
\r
322 if (String.IsNullOrWhiteSpace(accountInfo.AccountPath))
\r
323 throw new ArgumentException("The AccountInfo.AccountPath is empty", "accountInfo");
\r
324 Contract.EndContractBlock();
\r
327 using (ThreadContext.Stacks["Retrieve Remote"].Push(accountInfo.UserName))
\r
330 await NetworkAgent.GetDeleteAwaiter();
\r
332 Log.Info("Scheduled");
\r
333 var client = new CloudFilesClient(accountInfo);
\r
335 //We don't need to check the trash container
\r
336 var containers = client.ListContainers(accountInfo.UserName)
\r
337 .Where(c=>c.Name!="trash")
\r
341 CreateContainerFolders(accountInfo, containers);
\r
343 //The nextSince time fallback time is the same as the current.
\r
344 //If polling succeeds, the next Since time will be the smallest of the maximum modification times
\r
345 //of the shared and account objects
\r
346 var nextSince = since;
\r
350 //Wait for any deletions to finish
\r
351 await NetworkAgent.GetDeleteAwaiter();
\r
352 //Get the poll time now. We may miss some deletions but it's better to keep a file that was deleted
\r
353 //than delete a file that was created while we were executing the poll
\r
355 //Get the list of server objects changed since the last check
\r
356 //The name of the container is passed as state in order to create a dictionary of tasks in a subsequent step
\r
357 var listObjects = (from container in containers
\r
358 select Task<IList<ObjectInfo>>.Factory.StartNew(_ =>
\r
359 client.ListObjects(accountInfo.UserName, container.Name, since), container.Name)).ToList();
\r
361 var listShared = Task<IList<ObjectInfo>>.Factory.StartNew(_ =>
\r
362 client.ListSharedObjects(since), "shared");
\r
363 listObjects.Add(listShared);
\r
364 var listTasks = await Task.Factory.WhenAll(listObjects.ToArray());
\r
366 using (ThreadContext.Stacks["SCHEDULE"].Push("Process Results"))
\r
368 var dict = listTasks.ToDictionary(t => t.AsyncState);
\r
370 //Get all non-trash objects. Remember, the container name is stored in AsyncState
\r
371 var remoteObjects = (from objectList in listTasks
\r
372 where (string)objectList.AsyncState != "trash"
\r
373 from obj in objectList.Result
\r
374 orderby obj.Bytes ascending
\r
375 select obj).ToList();
\r
377 //Get the latest remote object modification date, only if it is after
\r
378 //the original since date
\r
379 nextSince = GetLatestDateAfter(nextSince, remoteObjects);
\r
381 var sharedObjects = dict["shared"].Result;
\r
382 nextSince = GetLatestDateBefore(nextSince, sharedObjects);
\r
384 //DON'T process trashed files
\r
385 //If some files are deleted and added again to a folder, they will be deleted
\r
386 //even though they are new.
\r
387 //We would have to check file dates and hashes to ensure that a trashed file
\r
388 //can be deleted safely from the local hard drive.
\r
390 //Items with the same name, hash may be both in the container and the trash
\r
391 //Don't delete items that exist in the container
\r
392 var realTrash = from trash in trashObjects
\r
394 !remoteObjects.Any(
\r
395 info => info.Name == trash.Name && info.Hash == trash.Hash)
\r
397 ProcessTrashedFiles(accountInfo, realTrash);
\r
400 var cleanRemotes = (from info in remoteObjects.Union(sharedObjects)
\r
401 let name = info.Name??""
\r
402 where !name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase) &&
\r
403 !name.StartsWith(FolderConstants.CacheFolder + "/",
\r
404 StringComparison.InvariantCultureIgnoreCase)
\r
405 select info).ToList();
\r
408 StatusKeeper.CleanupOrphanStates();
\r
409 StatusKeeper.CleanupStaleStates(accountInfo, cleanRemotes);
\r
411 //var differencer = _differencer.PostSnapshot(accountInfo, cleanRemotes);
\r
413 //var filterUris = Selectives.SelectiveUris[accountInfo.AccountKey];
\r
416 //Get the local files here
\r
417 var agent = AgentLocator<FileAgent>.Get(accountInfo.AccountPath);
\r
419 var files = LoadLocalFileTuples(accountInfo);
\r
421 var states = FileState.Queryable.ToList();
\r
424 var infos = (from remote in cleanRemotes
\r
425 let path = remote.RelativeUrlToFilePath(accountInfo.UserName)
\r
426 let info=agent.GetFileSystemInfo(path)
\r
427 select Tuple.Create(info.FullName,remote))
\r
430 var token = _currentOperationCancellation.Token;
\r
432 var tuples = MergeSources(infos, files, states).ToList();
\r
435 var stateTuples = accountBatch==null?tuples:tuples.Where(t => accountBatch.Contains(t.FilePath));
\r
436 foreach (var tuple in stateTuples)
\r
438 await _unPauseEvent.WaitAsync();
\r
440 //Set the Merkle Hash
\r
441 SetMerkleHash(accountInfo, tuple);
\r
443 SyncSingleItem(accountInfo, tuple, agent, token);
\r
452 MarkSuspectedDeletes(accountInfo, cleanRemotes);
\r
457 Log.Info("[LISTENER] End Processing");
\r
460 catch (Exception ex)
\r
462 Log.ErrorFormat("[FAIL] ListObjects for{0} in ProcessRemoteFiles with {1}", accountInfo.UserName, ex);
\r
466 Log.Info("[LISTENER] Finished");
\r
471 private static void SetMerkleHash(AccountInfo accountInfo, StateTuple tuple)
\r
473 //The Merkle hash for directories is that of an empty buffer
\r
474 if (tuple.FileInfo is DirectoryInfo)
\r
475 tuple.C = MERKLE_EMPTY;
\r
476 else if (tuple.FileState != null && tuple.MD5 == tuple.FileState.ShortHash)
\r
478 //If there is a state whose MD5 matches, load the merkle hash from the file state
\r
479 //insteaf of calculating it
\r
480 tuple.C = tuple.FileState.Checksum;
\r
484 tuple.Merkle = Signature.CalculateTreeHash(tuple.FileInfo, accountInfo.BlockSize, accountInfo.BlockHash);
\r
485 //tuple.C=tuple.Merkle.TopHash.ToHashString();
\r
489 private static List<Tuple<FileSystemInfo, string>> LoadLocalFileTuples(AccountInfo accountInfo)
\r
491 using (ThreadContext.Stacks["Account Files Hashing"].Push(accountInfo.UserName))
\r
494 var localInfos = AgentLocator<FileAgent>.Get(accountInfo.AccountPath).EnumerateFileSystemInfos();
\r
495 //Use the queue to retry locked file hashing
\r
496 var fileQueue = new Queue<FileSystemInfo>(localInfos);
\r
497 var hasher = MD5.Create();
\r
499 var results = new List<Tuple<FileSystemInfo, string>>();
\r
501 while (fileQueue.Count > 0)
\r
503 var file = fileQueue.Dequeue();
\r
504 using (ThreadContext.Stacks["File"].Push(file.FullName))
\r
507 Signature.CalculateTreeHash(file, accountInfo.BlockSize,
\r
508 accountInfo.BlockHash).
\r
509 TopHash.ToHashString()
\r
513 //Replace MD5 here, do the calc while syncing individual files
\r
515 if (file is DirectoryInfo)
\r
516 hash = MERKLE_EMPTY;
\r
519 using (var stream = (file as FileInfo).OpenRead())
\r
521 hash = hasher.ComputeHash(stream).ToHashString();
\r
524 results.Add(Tuple.Create(file, hash));
\r
526 catch (IOException exc)
\r
528 Log.WarnFormat("[HASH] File in use, will retry [{0}]", exc);
\r
529 fileQueue.Enqueue(file);
\r
538 private void SyncSingleItem(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, CancellationToken token)
\r
540 Log.DebugFormat("Sync [{0}] C:[{1}] L:[{2}] S:[{3}]",tuple.FilePath,tuple.C,tuple.L,tuple.S);
\r
542 var localFilePath = tuple.FilePath;
\r
543 //Don't use the tuple info, it may have been deleted
\r
544 var localInfo = FileInfoExtensions.FromPath(localFilePath);
\r
547 var isUnselectedRootFolder = agent.IsUnselectedRootFolder(tuple.FilePath);
\r
549 //Unselected root folders that have not yet been uploaded should be uploaded and added to the
\r
550 //selective folders
\r
552 if (!Selectives.IsSelected(accountInfo, localFilePath) && !(isUnselectedRootFolder && tuple.ObjectInfo==null) )
\r
555 // Local file unchanged? If both C and L are null, make sure it's because
\r
556 //both the file is missing and the state checksum is not missing
\r
557 if (tuple.C == tuple.L /*&& (localInfo.Exists || tuple.FileState == null)*/)
\r
560 //Server unchanged?
\r
561 if (tuple.S == tuple.L)
\r
563 // No server changes
\r
564 //Has the file been renamed on the server?
\r
565 MoveForServerMove(accountInfo, tuple);
\r
569 //Different from server
\r
570 //Does the server file exist?
\r
571 if (tuple.S == null)
\r
573 //Server file doesn't exist
\r
574 //deleteObjectFromLocal()
\r
575 StatusKeeper.SetFileState(localFilePath, FileStatus.Deleted,
\r
576 FileOverlayStatus.Deleted, "");
\r
577 agent.Delete(localFilePath);
\r
578 //updateRecord(Remove C, L)
\r
579 StatusKeeper.ClearFileStatus(localFilePath);
\r
583 //Server file exists
\r
584 //downloadServerObject() // Result: L = S
\r
585 //If the file has moved on the server, move it locally before downloading
\r
586 var targetPath = MoveForServerMove(accountInfo, tuple);
\r
588 StatusKeeper.SetFileState(targetPath, FileStatus.Modified,
\r
589 FileOverlayStatus.Modified, "");
\r
590 NetworkAgent.Downloader.DownloadCloudFile(accountInfo,
\r
592 targetPath, tuple.Merkle, token).Wait(token);
\r
593 //updateRecord( L = S )
\r
594 StatusKeeper.UpdateFileChecksum(targetPath, tuple.ObjectInfo.ETag,
\r
595 tuple.ObjectInfo.X_Object_Hash);
\r
597 StatusKeeper.StoreInfo(targetPath, tuple.ObjectInfo);
\r
600 StatusKeeper.SetFileState(targetPath, FileStatus.Unchanged,
\r
601 FileOverlayStatus.Normal, "");
\r
609 //Local changes found
\r
611 //Server unchanged?
\r
612 if (tuple.S == tuple.L)
\r
614 //The FileAgent selective sync checks for new root folder files
\r
615 if (!agent.Ignore(localFilePath))
\r
617 if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null)
\r
619 //deleteObjectFromServer()
\r
620 DeleteCloudFile(accountInfo, tuple);
\r
621 //updateRecord( Remove L, S)
\r
625 //uploadLocalObject() // Result: S = C, L = S
\r
627 //Debug.Assert(tuple.FileState !=null);
\r
628 var action = new CloudUploadAction(accountInfo, localInfo, tuple.FileState,
\r
629 accountInfo.BlockSize, accountInfo.BlockHash,
\r
630 "Poll", isUnselectedRootFolder);
\r
631 NetworkAgent.Uploader.UploadCloudFile(action, tuple.Merkle, token).Wait(token);
\r
633 //updateRecord( S = C )
\r
634 //State updated by the uploader
\r
636 if (isUnselectedRootFolder)
\r
638 ProcessChildren(accountInfo, tuple, agent, token);
\r
645 if (tuple.C == tuple.S)
\r
647 // (Identical Changes) Result: L = S
\r
649 //Detect server moves
\r
650 var targetPath = MoveForServerMove(accountInfo, tuple);
\r
651 StatusKeeper.StoreInfo(targetPath, tuple.ObjectInfo);
\r
655 if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null)
\r
657 //deleteObjectFromServer()
\r
658 DeleteCloudFile(accountInfo, tuple);
\r
659 //updateRecord(Remove L, S)
\r
661 //If both the local and server files are missing, the state is stale
\r
662 else if (!localInfo.Exists && (tuple.S == null || tuple.ObjectInfo == null))
\r
664 StatusKeeper.ClearFileStatus(localInfo.FullName);
\r
668 ReportConflictForMismatch(localFilePath);
\r
669 //identifyAsConflict() // Manual action required
\r
676 private string MoveForServerMove(AccountInfo accountInfo, StateTuple tuple)
\r
678 if (tuple.ObjectInfo == null)
\r
680 var relativePath = tuple.ObjectInfo.RelativeUrlToFilePath(accountInfo.UserName);
\r
681 var serverPath = Path.Combine(accountInfo.AccountPath, relativePath);
\r
683 //Compare Case Insensitive
\r
684 if (String.Equals(tuple.FilePath ,serverPath,StringComparison.InvariantCultureIgnoreCase)) return serverPath;
\r
686 if (tuple.FileInfo.Exists)
\r
688 var fi = tuple.FileInfo as FileInfo;
\r
690 fi.MoveTo(serverPath);
\r
691 var di = tuple.FileInfo as DirectoryInfo;
\r
693 di.MoveTo(serverPath);
\r
694 StatusKeeper.StoreInfo(serverPath, tuple.ObjectInfo);
\r
698 Debug.Assert(false, "File does not exist");
\r
703 private void DeleteCloudFile(AccountInfo accountInfo, StateTuple tuple)
\r
705 StatusKeeper.SetFileState(tuple.FilePath, FileStatus.Deleted,
\r
706 FileOverlayStatus.Deleted, "");
\r
707 NetworkAgent.DeleteAgent.DeleteCloudFile(accountInfo, tuple.ObjectInfo);
\r
708 StatusKeeper.ClearFileStatus(tuple.FilePath);
\r
711 private void ProcessChildren(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, CancellationToken token)
\r
714 var dirInfo = tuple.FileInfo as DirectoryInfo;
\r
715 var folderTuples = from folder in dirInfo.EnumerateDirectories("*", SearchOption.AllDirectories)
\r
716 select new StateTuple(folder);
\r
717 var fileTuples = from file in dirInfo.EnumerateFiles("*", SearchOption.AllDirectories)
\r
718 select new StateTuple(file);
\r
720 //Process folders first, to ensure folders appear on the sever as soon as possible
\r
721 folderTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, token));
\r
723 fileTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, token));
\r
726 private static IEnumerable<StateTuple> MergeSources(
\r
727 IEnumerable<Tuple<string, ObjectInfo>> infos,
\r
728 IEnumerable<Tuple<FileSystemInfo, string>> files,
\r
729 IEnumerable<FileState> states)
\r
731 var tuplesByPath = new Dictionary<string, StateTuple>();
\r
732 foreach (var file in files)
\r
734 var fsInfo = file.Item1;
\r
735 var fileHash = fsInfo is DirectoryInfo? MERKLE_EMPTY:file.Item2;
\r
737 tuplesByPath[fsInfo.FullName] = new StateTuple {FileInfo = fsInfo, MD5 = fileHash};
\r
739 foreach (var state in states)
\r
741 StateTuple hashTuple;
\r
742 if (tuplesByPath.TryGetValue(state.FilePath, out hashTuple))
\r
744 hashTuple.FileState = state;
\r
748 var fsInfo = FileInfoExtensions.FromPath(state.FilePath);
\r
749 tuplesByPath[state.FilePath] = new StateTuple {FileInfo = fsInfo, FileState = state};
\r
753 var tuplesByID = tuplesByPath.Values
\r
754 .Where(tuple => tuple.FileState != null && tuple.FileState.ObjectID!=null)
\r
755 .ToDictionary(tuple=>tuple.FileState.ObjectID,tuple=>tuple);//new Dictionary<Guid, StateTuple>();
\r
757 foreach (var info in infos)
\r
759 StateTuple hashTuple;
\r
760 var filePath = info.Item1;
\r
761 var objectInfo = info.Item2;
\r
762 var objectID = objectInfo.UUID;
\r
764 if (tuplesByID.TryGetValue(objectID, out hashTuple))
\r
766 hashTuple.ObjectInfo = objectInfo;
\r
768 else if (tuplesByPath.TryGetValue(filePath, out hashTuple))
\r
770 hashTuple.ObjectInfo = objectInfo;
\r
774 var fsInfo = FileInfoExtensions.FromPath(filePath);
\r
775 var tuple = new StateTuple {FileInfo = fsInfo, ObjectInfo = objectInfo};
\r
776 tuplesByPath[filePath] = tuple;
\r
777 tuplesByID[objectInfo.UUID] = tuple;
\r
780 return tuplesByPath.Values;
\r
784 /// Returns the latest LastModified date from the list of objects, but only if it is before
\r
785 /// than the threshold value
\r
787 /// <param name="threshold"></param>
\r
788 /// <param name="cloudObjects"></param>
\r
789 /// <returns></returns>
\r
790 private static DateTime? GetLatestDateBefore(DateTime? threshold, IList<ObjectInfo> cloudObjects)
\r
792 DateTime? maxDate = null;
\r
793 if (cloudObjects!=null && cloudObjects.Count > 0)
\r
794 maxDate = cloudObjects.Max(obj => obj.Last_Modified);
\r
795 if (maxDate == null || maxDate == DateTime.MinValue)
\r
797 if (threshold == null || threshold == DateTime.MinValue || threshold > maxDate)
\r
803 /// Returns the latest LastModified date from the list of objects, but only if it is after
\r
804 /// the threshold value
\r
806 /// <param name="threshold"></param>
\r
807 /// <param name="cloudObjects"></param>
\r
808 /// <returns></returns>
\r
809 private static DateTime? GetLatestDateAfter(DateTime? threshold, IList<ObjectInfo> cloudObjects)
\r
811 DateTime? maxDate = null;
\r
812 if (cloudObjects!=null && cloudObjects.Count > 0)
\r
813 maxDate = cloudObjects.Max(obj => obj.Last_Modified);
\r
814 if (maxDate == null || maxDate == DateTime.MinValue)
\r
816 if (threshold == null || threshold == DateTime.MinValue || threshold < maxDate)
\r
821 readonly AccountsDifferencer _differencer = new AccountsDifferencer();
\r
822 private Dictionary<Uri, List<Uri>> _selectiveUris = new Dictionary<Uri, List<Uri>>();
\r
823 private bool _pause;
\r
824 private static string MERKLE_EMPTY = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
\r
827 /// Deletes local files that are not found in the list of cloud files
\r
829 /// <param name="accountInfo"></param>
\r
830 /// <param name="cloudFiles"></param>
\r
831 private void ProcessDeletedFiles(AccountInfo accountInfo, IEnumerable<ObjectInfo> cloudFiles)
\r
833 if (accountInfo == null)
\r
834 throw new ArgumentNullException("accountInfo");
\r
835 if (String.IsNullOrWhiteSpace(accountInfo.AccountPath))
\r
836 throw new ArgumentException("The AccountInfo.AccountPath is empty", "accountInfo");
\r
837 if (cloudFiles == null)
\r
838 throw new ArgumentNullException("cloudFiles");
\r
839 Contract.EndContractBlock();
\r
841 var deletedFiles = new List<FileSystemInfo>();
\r
842 foreach (var objectInfo in cloudFiles)
\r
844 if (Log.IsDebugEnabled)
\r
845 Log.DebugFormat("Handle deleted [{0}]", objectInfo.Uri);
\r
846 var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
\r
847 var item = FileAgent.GetFileAgent(accountInfo).GetFileSystemInfo(relativePath);
\r
848 if (Log.IsDebugEnabled)
\r
849 Log.DebugFormat("Will delete [{0}] for [{1}]", item.FullName, objectInfo.Uri);
\r
852 if ((item.Attributes & FileAttributes.ReadOnly) == FileAttributes.ReadOnly)
\r
854 item.Attributes = item.Attributes & ~FileAttributes.ReadOnly;
\r
859 Log.DebugFormat("Deleting {0}", item.FullName);
\r
861 var directory = item as DirectoryInfo;
\r
862 if (directory != null)
\r
863 directory.Delete(true);
\r
866 Log.DebugFormat("Deleted [{0}] for [{1}]", item.FullName, objectInfo.Uri);
\r
868 _lastSeen.TryRemove(item.FullName, out lastDate);
\r
869 deletedFiles.Add(item);
\r
871 StatusKeeper.SetFileState(item.FullName, FileStatus.Deleted, FileOverlayStatus.Deleted, "File Deleted");
\r
873 Log.InfoFormat("[{0}] files were deleted", deletedFiles.Count);
\r
874 StatusNotification.NotifyForFiles(deletedFiles, String.Format("{0} files were deleted", deletedFiles.Count),
\r
879 private void MarkSuspectedDeletes(AccountInfo accountInfo, IEnumerable<ObjectInfo> cloudFiles)
\r
881 //Only consider files that are not being modified, ie they are in the Unchanged state
\r
882 var deleteCandidates = FileState.Queryable.Where(state =>
\r
883 state.FilePath.StartsWith(accountInfo.AccountPath)
\r
884 && state.FileStatus == FileStatus.Unchanged).ToList();
\r
887 //TODO: filesToDelete must take into account the Others container
\r
888 var filesToDelete = (from deleteCandidate in deleteCandidates
\r
889 let localFile = FileInfoExtensions.FromPath(deleteCandidate.FilePath)
\r
890 let relativeFilePath = localFile.AsRelativeTo(accountInfo.AccountPath)
\r
892 !cloudFiles.Any(r => r.RelativeUrlToFilePath(accountInfo.UserName) == relativeFilePath)
\r
893 select localFile).ToList();
\r
896 //Set the status of missing files to Conflict
\r
897 foreach (var item in filesToDelete)
\r
899 //Try to acquire a gate on the file, to take into account files that have been dequeued
\r
900 //and are being processed
\r
901 using (var gate = NetworkGate.Acquire(item.FullName, NetworkOperation.Deleting))
\r
905 StatusKeeper.SetFileState(item.FullName, FileStatus.Conflict, FileOverlayStatus.Deleted,
\r
906 "Local file missing from server");
\r
909 UpdateStatus(PithosStatus.HasConflicts);
\r
910 StatusNotification.NotifyConflicts(filesToDelete,
\r
912 "{0} local files are missing from Pithos, possibly because they were deleted",
\r
913 filesToDelete.Count));
\r
914 StatusNotification.NotifyForFiles(filesToDelete, String.Format("{0} files were deleted", filesToDelete.Count),
\r
918 private void ReportConflictForMismatch(string localFilePath)
\r
920 if (String.IsNullOrWhiteSpace(localFilePath))
\r
921 throw new ArgumentNullException("localFilePath");
\r
922 Contract.EndContractBlock();
\r
924 StatusKeeper.SetFileState(localFilePath, FileStatus.Conflict, FileOverlayStatus.Conflict, "File changed at the server");
\r
925 UpdateStatus(PithosStatus.HasConflicts);
\r
926 var message = String.Format("Conflict detected for file {0}", localFilePath);
\r
928 StatusNotification.NotifyChange(message, TraceLevel.Warning);
\r
934 /// Creates a Sync action for each changed server file
\r
936 /// <param name="accountInfo"></param>
\r
937 /// <param name="changes"></param>
\r
938 /// <returns></returns>
\r
939 private IEnumerable<CloudAction> ChangesToActions(AccountInfo accountInfo, IEnumerable<ObjectInfo> changes)
\r
941 if (changes == null)
\r
942 throw new ArgumentNullException();
\r
943 Contract.EndContractBlock();
\r
944 var fileAgent = FileAgent.GetFileAgent(accountInfo);
\r
946 //In order to avoid multiple iterations over the files, we iterate only once
\r
947 //over the remote files
\r
948 foreach (var objectInfo in changes)
\r
950 var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
\r
951 //If a directory object already exists, we may need to sync it
\r
952 if (fileAgent.Exists(relativePath))
\r
954 var localFile = fileAgent.GetFileSystemInfo(relativePath);
\r
955 //We don't need to sync directories
\r
956 if (objectInfo.IsDirectory && localFile is DirectoryInfo)
\r
958 using (new SessionScope(FlushAction.Never))
\r
960 var state = StatusKeeper.GetStateByFilePath(localFile.FullName);
\r
961 _lastSeen[localFile.FullName] = DateTime.Now;
\r
962 //Common files should be checked on a per-case basis to detect differences, which is newer
\r
964 yield return new CloudAction(accountInfo, CloudActionType.MustSynch,
\r
965 localFile, objectInfo, state, accountInfo.BlockSize,
\r
966 accountInfo.BlockHash,"Poll Changes");
\r
971 //Remote files should be downloaded
\r
972 yield return new CloudDownloadAction(accountInfo, objectInfo,"Poll Changes");
\r
/// <summary>
/// Creates a Local Move action for each moved server file
/// </summary>
/// <remarks>
/// Arguments are validated when this method is called. If the checks lived inside the
/// iterator block, they would only run on the first MoveNext() of the returned sequence.
/// </remarks>
/// <param name="accountInfo">The account the moved objects belong to</param>
/// <param name="moves">Server objects whose <c>Previous</c> location differs from their current one</param>
/// <returns>A lazily evaluated sequence of rename and/or download actions</returns>
/// <exception cref="ArgumentNullException"><paramref name="accountInfo"/> or <paramref name="moves"/> is null</exception>
private IEnumerable<CloudAction> MovesToActions(AccountInfo accountInfo, IEnumerable<ObjectInfo> moves)
{
    if (accountInfo == null)
        throw new ArgumentNullException("accountInfo");
    if (moves == null)
        throw new ArgumentNullException("moves");
    Contract.EndContractBlock();

    return MovesToActionsIterator(accountInfo, moves);
}

/// <summary>
/// Iterator body of MovesToActions. Assumes its arguments were already validated by the caller.
/// </summary>
private IEnumerable<CloudAction> MovesToActionsIterator(AccountInfo accountInfo, IEnumerable<ObjectInfo> moves)
{
    var fileAgent = FileAgent.GetFileAgent(accountInfo);

    //In order to avoid multiple iterations over the files, we iterate only once
    //over the remote files
    foreach (var objectInfo in moves)
    {
        var previousRelativepath = objectInfo.Previous.RelativeUrlToFilePath(accountInfo.UserName);
        //If the previous file already exists, we can execute a Move operation
        if (fileAgent.Exists(previousRelativepath))
        {
            var previousFile = fileAgent.GetFileSystemInfo(previousRelativepath);
            CloudAction renameAction;
            using (new SessionScope(FlushAction.Never))
            {
                var state = StatusKeeper.GetStateByFilePath(previousFile.FullName);
                _lastSeen[previousFile.FullName] = DateTime.Now;

                //For each moved object we need to move both the local file and update
                renameAction = new CloudAction(accountInfo, CloudActionType.RenameLocal,
                                               previousFile, objectInfo, state, accountInfo.BlockSize,
                                               accountInfo.BlockHash, "Poll Moves");
            }
            //Yield only after the session scope is disposed, so the scope is not kept
            //open while the consumer processes the action
            yield return renameAction;

            //For modified files, we need to download the changes as well
            if (objectInfo.X_Object_Hash != objectInfo.PreviousHash)
                yield return new CloudDownloadAction(accountInfo, objectInfo, "Poll Moves");
        }
        else
        {
            //If the previous file does not exist, we need to download it in the new location
            yield return new CloudDownloadAction(accountInfo, objectInfo, "Poll Moves");
        }
    }
}
\r
/// <summary>
/// Creates a download action for each new server file
/// </summary>
/// <remarks>
/// Arguments are validated when this method is called. If the checks lived inside the
/// iterator block, they would only run on the first MoveNext() of the returned sequence.
/// </remarks>
/// <param name="accountInfo">The account the new objects belong to</param>
/// <param name="creates">Server objects reported as newly created</param>
/// <returns>A lazily evaluated sequence with one action per object: a synch check when a
/// local file already exists at the object's path, otherwise a download</returns>
/// <exception cref="ArgumentNullException"><paramref name="accountInfo"/> or <paramref name="creates"/> is null</exception>
private IEnumerable<CloudAction> CreatesToActions(AccountInfo accountInfo, IEnumerable<ObjectInfo> creates)
{
    if (accountInfo == null)
        throw new ArgumentNullException("accountInfo");
    if (creates == null)
        throw new ArgumentNullException("creates");
    Contract.EndContractBlock();

    return CreatesToActionsIterator(accountInfo, creates);
}

/// <summary>
/// Iterator body of CreatesToActions. Assumes its arguments were already validated by the caller.
/// </summary>
private IEnumerable<CloudAction> CreatesToActionsIterator(AccountInfo accountInfo, IEnumerable<ObjectInfo> creates)
{
    var fileAgent = FileAgent.GetFileAgent(accountInfo);

    //In order to avoid multiple iterations over the files, we iterate only once
    //over the remote files
    foreach (var objectInfo in creates)
    {
        if (Log.IsDebugEnabled)
            Log.DebugFormat("[NEW INFO] {0}",objectInfo.Uri);

        var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);

        //If the object already exists, we should check before uploading or downloading
        if (fileAgent.Exists(relativePath))
        {
            var localFile = fileAgent.GetFileSystemInfo(relativePath);
            var state = StatusKeeper.GetStateByFilePath(localFile.WithProperCapitalization().FullName);
            yield return new CloudAction(accountInfo, CloudActionType.MustSynch,
                                         localFile, objectInfo, state, accountInfo.BlockSize,
                                         accountInfo.BlockHash,"Poll Creates");
        }
        else
        {
            //Remote files should be downloaded
            yield return new CloudDownloadAction(accountInfo, objectInfo,"Poll Creates");
        }
    }
}
\r
/// <summary>
/// Notify the UI to update the visual status.
/// This is best-effort: any exception raised by the notification sink is logged as a warning and swallowed.
/// </summary>
/// <param name="status">The status the UI should display</param>
private void UpdateStatus(PithosStatus status)
{
    try
    {
        StatusNotification.SetPithosStatus(status);
    }
    catch (Exception error)
    {
        //A failed UI refresh is not critical for polling, so only record it
        Log.Warn("Error while updating status", error);
    }
}
\r
/// <summary>
/// Creates a local folder under the account path for each container that does not
/// have one yet. The trash container is skipped.
/// </summary>
/// <param name="accountInfo">Account whose AccountPath is the root for the container folders</param>
/// <param name="containers">The account's containers</param>
private static void CreateContainerFolders(AccountInfo accountInfo, IEnumerable<ContainerInfo> containers)
{
    foreach (var container in containers)
    {
        var folderPath = Path.Combine(accountInfo.AccountPath, container.Name);

        //The trash container never gets a local folder
        if (container.Name == FolderConstants.TrashContainer)
            continue;

        if (Directory.Exists(folderPath))
            continue;

        Directory.CreateDirectory(folderPath);
    }
}
\r
/// <summary>
/// Registers an account, keyed by its AccountKey.
/// If an account with the same key is already registered, the existing entry is kept.
/// </summary>
/// <param name="accountInfo">The account to register</param>
public void AddAccount(AccountInfo accountInfo)
{
    var accountKey = accountInfo.AccountKey;
    //TryAdd does not overwrite an existing key, which avoids registering a duplicate account
    _accounts.TryAdd(accountKey, accountInfo);
}
\r
/// <summary>
/// Unregisters an account and discards the snapshot differencer stored under the same key.
/// Keys that are not present are ignored.
/// </summary>
/// <param name="accountInfo">The account to unregister</param>
public void RemoveAccount(AccountInfo accountInfo)
{
    var accountKey = accountInfo.AccountKey;

    AccountInfo removedAccount;
    _accounts.TryRemove(accountKey, out removedAccount);

    SnapshotDifferencer removedDifferencer;
    _differencer.Differencers.TryRemove(accountKey, out removedDifferencer);
}
\r
/// <summary>
/// Applies a change to the account's selective-sync paths.
/// Only the removed paths are acted on. Downloading the newly added paths is
/// currently disabled, so <paramref name="added"/> is ignored.
/// </summary>
/// <param name="accountInfo">The affected account</param>
/// <param name="added">Paths added to the selection (currently unused)</param>
/// <param name="removed">Paths removed from the selection</param>
public void SetSelectivePaths(AccountInfo accountInfo,Uri[] added, Uri[] removed)
{
    AbortRemovedPaths(accountInfo, removed);
}
\r
1116 private void DownloadNewPaths(AccountInfo accountInfo, Uri[] added)
\r
1118 var client = new CloudFilesClient(accountInfo);
\r
1119 foreach (var folderUri in added)
\r
1126 var segmentsCount = folderUri.Segments.Length;
\r
1127 //Is this an account URL?
\r
1128 if (segmentsCount < 3)
\r
1130 //Is this a container or folder URL?
\r
1131 if (segmentsCount == 3)
\r
1133 account = folderUri.Segments[1].TrimEnd('/');
\r
1134 container = folderUri.Segments[2].TrimEnd('/');
\r
1138 account = folderUri.Segments[2].TrimEnd('/');
\r
1139 container = folderUri.Segments[3].TrimEnd('/');
\r
1141 IList<ObjectInfo> items;
\r
1142 if (segmentsCount > 3)
\r
1145 var folder = String.Join("", folderUri.Segments.Splice(4));
\r
1146 items = client.ListObjects(account, container, folder);
\r
1151 items = client.ListObjects(account, container);
\r
1153 var actions = CreatesToActions(accountInfo, items);
\r
1154 foreach (var action in actions)
\r
1156 NetworkAgent.Post(action);
\r
1159 catch (Exception exc)
\r
1161 Log.WarnFormat("Listing of new selective path [{0}] failed with \r\n{1}", folderUri, exc);
\r
1165 //Need to get a listing of each of the URLs, then post them to the NetworkAgent
\r
1166 //CreatesToActions(accountInfo,)
\r
1168 /* NetworkAgent.Post();#1#
\r
1172 private void AbortRemovedPaths(AccountInfo accountInfo, Uri[] removed)
\r
1174 /*this.NetworkAgent.*/
\r