2 /* -----------------------------------------------------------------------
\r
3 * <copyright file="PollAgent.cs" company="GRNet">
\r
5 * Copyright 2011-2012 GRNET S.A. All rights reserved.
\r
7 * Redistribution and use in source and binary forms, with or
\r
8 * without modification, are permitted provided that the following
\r
9 * conditions are met:
\r
11 * 1. Redistributions of source code must retain the above
\r
12 * copyright notice, this list of conditions and the following
\r
15 * 2. Redistributions in binary form must reproduce the above
\r
16 * copyright notice, this list of conditions and the following
\r
17 * disclaimer in the documentation and/or other materials
\r
18 * provided with the distribution.
\r
21 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
\r
22 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
\r
23 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
\r
24 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
\r
25 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
\r
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
\r
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
\r
28 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
\r
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
\r
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
\r
31 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
\r
32 * POSSIBILITY OF SUCH DAMAGE.
\r
34 * The views and conclusions contained in the software and
\r
35 * documentation are those of the authors and should not be
\r
36 * interpreted as representing official policies, either expressed
\r
37 * or implied, of GRNET S.A.
\r
39 * -----------------------------------------------------------------------
\r
43 using System.Collections.Concurrent;
\r
44 using System.ComponentModel.Composition;
\r
45 using System.Diagnostics;
\r
46 using System.Diagnostics.Contracts;
\r
48 using System.Linq.Expressions;
\r
49 using System.Reflection;
\r
50 using System.Security.Cryptography;
\r
51 using System.Threading;
\r
52 using System.Threading.Tasks;
\r
53 using Castle.ActiveRecord;
\r
54 using Pithos.Interfaces;
\r
55 using Pithos.Network;
\r
58 namespace Pithos.Core.Agents
\r
61 using System.Collections.Generic;
\r
64 [DebuggerDisplay("{FilePath} C:{C} L:{L} S:{S}")]
\r
65 public class StateTuple
\r
67 public string FilePath { get; private set; }
\r
69 public string MD5 { get; set; }
\r
73 get { return FileState==null?null:FileState.Checksum; }
\r
76 public string C { get; set; }
\r
80 get { return ObjectInfo == null ? null : ObjectInfo.X_Object_Hash; }
\r
83 private FileSystemInfo _fileInfo;
\r
84 public FileSystemInfo FileInfo
\r
86 get { return _fileInfo; }
\r
90 FilePath = value.FullName;
\r
94 public FileState FileState { get; set; }
\r
95 public ObjectInfo ObjectInfo{ get; set; }
\r
97 public StateTuple() { }
\r
99 public StateTuple(FileSystemInfo info)
\r
109 /// PollAgent periodically polls the server to detect object changes. The agent retrieves a listing of all
\r
110 /// objects and compares it with a previously cached version to detect differences.
\r
111 /// New files are downloaded, missing files are deleted from the local file system and common files are compared
\r
112 /// to determine the appropriate action
\r
115 public class PollAgent
\r
117 private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
\r
119 [System.ComponentModel.Composition.Import]
\r
120 public IStatusKeeper StatusKeeper { get; set; }
\r
122 [System.ComponentModel.Composition.Import]
\r
123 public IPithosSettings Settings { get; set; }
\r
125 [System.ComponentModel.Composition.Import]
\r
126 public NetworkAgent NetworkAgent { get; set; }
\r
128 [System.ComponentModel.Composition.Import]
\r
129 public Selectives Selectives { get; set; }
\r
131 public IStatusNotification StatusNotification { get; set; }
\r
133 private CancellationTokenSource _currentOperationCancellation = new CancellationTokenSource();
\r
135 public void CancelCurrentOperation()
\r
137 //What does it mean to cancel the current upload/download?
\r
138 //Obviously, the current operation will be cancelled by throwing
\r
139 //a cancellation exception.
\r
141 //The default behavior is to retry any operations that throw.
\r
142 //Obviously this is not what we want in this situation.
\r
143 //The cancelled operation should NOT bea retried.
\r
145 //This can be done by catching the cancellation exception
\r
146 //and avoiding the retry.
\r
149 //Have to reset the cancellation source - it is not possible to reset the source
\r
150 //Have to prevent a case where an operation requests a token from the old source
\r
151 var oldSource = Interlocked.Exchange(ref _currentOperationCancellation, new CancellationTokenSource());
\r
152 oldSource.Cancel();
\r
164 _unPauseEvent.Set();
\r
167 _unPauseEvent.Reset();
\r
172 private bool _firstPoll = true;
\r
174 //The Sync Event signals a manual synchronisation
\r
175 private readonly AsyncManualResetEvent _syncEvent = new AsyncManualResetEvent();
\r
177 private readonly AsyncManualResetEvent _unPauseEvent = new AsyncManualResetEvent(true);
\r
179 private readonly ConcurrentDictionary<string, DateTime> _lastSeen = new ConcurrentDictionary<string, DateTime>();
\r
180 private readonly ConcurrentDictionary<Uri, AccountInfo> _accounts = new ConcurrentDictionary<Uri,AccountInfo>();
\r
184 /// Start a manual synchronization
\r
186 public void SynchNow()
\r
193 /// Remote files are polled periodically. Any changes are processed
\r
195 /// <param name="since"></param>
\r
196 /// <returns></returns>
\r
197 public async Task PollRemoteFiles(DateTime? since = null)
\r
199 if (Log.IsDebugEnabled)
\r
200 Log.DebugFormat("Polling changes after [{0}]",since);
\r
202 Debug.Assert(Thread.CurrentThread.IsBackground, "Polling Ended up in the main thread!");
\r
206 using (ThreadContext.Stacks["Retrieve Remote"].Push("All accounts"))
\r
208 //If this poll fails, we will retry with the same since value
\r
209 var nextSince = since;
\r
212 await _unPauseEvent.WaitAsync();
\r
213 UpdateStatus(PithosStatus.PollSyncing);
\r
215 var tasks = from accountInfo in _accounts.Values
\r
216 select ProcessAccountFiles(accountInfo, since);
\r
218 var nextTimes=await TaskEx.WhenAll(tasks.ToList());
\r
220 _firstPoll = false;
\r
221 //Reschedule the poll with the current timestamp as a "since" value
\r
223 if (nextTimes.Length>0)
\r
224 nextSince = nextTimes.Min();
\r
225 if (Log.IsDebugEnabled)
\r
226 Log.DebugFormat("Next Poll at [{0}]",nextSince);
\r
228 catch (Exception ex)
\r
230 Log.ErrorFormat("Error while processing accounts\r\n{0}", ex);
\r
231 //In case of failure retry with the same "since" value
\r
234 UpdateStatus(PithosStatus.PollComplete);
\r
235 //The multiple try blocks are required because we can't have an await call
\r
236 //inside a finally block
\r
237 //TODO: Find a more elegant solution for reschedulling in the event of an exception
\r
240 //Wait for the polling interval to pass or the Sync event to be signalled
\r
241 nextSince = await WaitForScheduledOrManualPoll(nextSince);
\r
245 //Ensure polling is scheduled even in case of error
\r
246 TaskEx.Run(() => PollRemoteFiles(nextSince));
\r
252 /// Wait for the polling period to expire or a manual sync request
\r
254 /// <param name="since"></param>
\r
255 /// <returns></returns>
\r
256 private async Task<DateTime?> WaitForScheduledOrManualPoll(DateTime? since)
\r
258 var sync = _syncEvent.WaitAsync();
\r
259 var wait = TaskEx.Delay(TimeSpan.FromSeconds(Settings.PollingInterval), NetworkAgent.CancellationToken);
\r
261 var signaledTask = await TaskEx.WhenAny(sync, wait);
\r
263 //Pausing takes precedence over manual sync or awaiting
\r
264 _unPauseEvent.Wait();
\r
266 //Wait for network processing to finish before polling
\r
267 var pauseTask=NetworkAgent.ProceedEvent.WaitAsync();
\r
268 await TaskEx.WhenAll(signaledTask, pauseTask);
\r
270 //If polling is signalled by SynchNow, ignore the since tag
\r
271 if (sync.IsCompleted)
\r
273 //TODO: Must convert to AutoReset
\r
274 _syncEvent.Reset();
\r
280 public async Task<DateTime?> ProcessAccountFiles(AccountInfo accountInfo, DateTime? since = null)
\r
282 if (accountInfo == null)
\r
283 throw new ArgumentNullException("accountInfo");
\r
284 if (String.IsNullOrWhiteSpace(accountInfo.AccountPath))
\r
285 throw new ArgumentException("The AccountInfo.AccountPath is empty", "accountInfo");
\r
286 Contract.EndContractBlock();
\r
289 using (ThreadContext.Stacks["Retrieve Remote"].Push(accountInfo.UserName))
\r
292 await NetworkAgent.GetDeleteAwaiter();
\r
294 Log.Info("Scheduled");
\r
295 var client = new CloudFilesClient(accountInfo);
\r
297 //We don't need to check the trash container
\r
298 var containers = client.ListContainers(accountInfo.UserName)
\r
299 .Where(c=>c.Name!="trash")
\r
303 CreateContainerFolders(accountInfo, containers);
\r
305 //The nextSince time fallback time is the same as the current.
\r
306 //If polling succeeds, the next Since time will be the smallest of the maximum modification times
\r
307 //of the shared and account objects
\r
308 var nextSince = since;
\r
312 //Wait for any deletions to finish
\r
313 await NetworkAgent.GetDeleteAwaiter();
\r
314 //Get the poll time now. We may miss some deletions but it's better to keep a file that was deleted
\r
315 //than delete a file that was created while we were executing the poll
\r
317 //Get the list of server objects changed since the last check
\r
318 //The name of the container is passed as state in order to create a dictionary of tasks in a subsequent step
\r
319 var listObjects = (from container in containers
\r
320 select Task<IList<ObjectInfo>>.Factory.StartNew(_ =>
\r
321 client.ListObjects(accountInfo.UserName, container.Name, since), container.Name)).ToList();
\r
323 var listShared = Task<IList<ObjectInfo>>.Factory.StartNew(_ =>
\r
324 client.ListSharedObjects(since), "shared");
\r
325 listObjects.Add(listShared);
\r
326 var listTasks = await Task.Factory.WhenAll(listObjects.ToArray());
\r
328 using (ThreadContext.Stacks["SCHEDULE"].Push("Process Results"))
\r
330 var dict = listTasks.ToDictionary(t => t.AsyncState);
\r
332 //Get all non-trash objects. Remember, the container name is stored in AsyncState
\r
333 var remoteObjects = (from objectList in listTasks
\r
334 where (string)objectList.AsyncState != "trash"
\r
335 from obj in objectList.Result
\r
336 orderby obj.Bytes ascending
\r
337 select obj).ToList();
\r
339 //Get the latest remote object modification date, only if it is after
\r
340 //the original since date
\r
341 nextSince = GetLatestDateAfter(nextSince, remoteObjects);
\r
343 var sharedObjects = dict["shared"].Result;
\r
344 nextSince = GetLatestDateBefore(nextSince, sharedObjects);
\r
346 //DON'T process trashed files
\r
347 //If some files are deleted and added again to a folder, they will be deleted
\r
348 //even though they are new.
\r
349 //We would have to check file dates and hashes to ensure that a trashed file
\r
350 //can be deleted safely from the local hard drive.
\r
352 //Items with the same name, hash may be both in the container and the trash
\r
353 //Don't delete items that exist in the container
\r
354 var realTrash = from trash in trashObjects
\r
356 !remoteObjects.Any(
\r
357 info => info.Name == trash.Name && info.Hash == trash.Hash)
\r
359 ProcessTrashedFiles(accountInfo, realTrash);
\r
362 var cleanRemotes = (from info in remoteObjects.Union(sharedObjects)
\r
363 let name = info.Name??""
\r
364 where !name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase) &&
\r
365 !name.StartsWith(FolderConstants.CacheFolder + "/",
\r
366 StringComparison.InvariantCultureIgnoreCase)
\r
367 select info).ToList();
\r
370 StatusKeeper.CleanupOrphanStates();
\r
371 StatusKeeper.CleanupStaleStates(accountInfo, cleanRemotes);
\r
373 //var differencer = _differencer.PostSnapshot(accountInfo, cleanRemotes);
\r
375 var filterUris = Selectives.SelectiveUris[accountInfo.AccountKey];
\r
378 //Get the local files here
\r
379 var agent = AgentLocator<FileAgent>.Get(accountInfo.AccountPath);
\r
381 var files = LoadLocalFileTuples(accountInfo);
\r
383 var states = FileState.Queryable.ToList();
\r
385 var infos = (from remote in cleanRemotes
\r
386 let path = remote.RelativeUrlToFilePath(accountInfo.UserName)
\r
387 let info=agent.GetFileSystemInfo(path)
\r
388 select Tuple.Create(info.FullName,remote))
\r
391 var token = _currentOperationCancellation.Token;
\r
393 var tuples = MergeSources(infos, files, states).ToList();
\r
396 foreach (var tuple in tuples)
\r
398 await _unPauseEvent.WaitAsync();
\r
400 //Set the Merkle Hash
\r
401 SetMerkleHash(accountInfo, tuple);
\r
403 SyncSingleItem(accountInfo, tuple, agent, token);
\r
412 MarkSuspectedDeletes(accountInfo, cleanRemotes);
\r
417 Log.Info("[LISTENER] End Processing");
\r
420 catch (Exception ex)
\r
422 Log.ErrorFormat("[FAIL] ListObjects for{0} in ProcessRemoteFiles with {1}", accountInfo.UserName, ex);
\r
426 Log.Info("[LISTENER] Finished");
\r
431 private static void SetMerkleHash(AccountInfo accountInfo, StateTuple tuple)
\r
433 //The Merkle hash for directories is that of an empty buffer
\r
434 if (tuple.FileInfo is DirectoryInfo)
\r
435 tuple.C = MERKLE_EMPTY;
\r
436 else if (tuple.FileState != null && tuple.MD5 == tuple.FileState.ShortHash)
\r
438 //If there is a state whose MD5 matches, load the merkle hash fromthe file state
\r
439 //insteaf of calculating it
\r
440 tuple.C = tuple.FileState.Checksum;
\r
444 tuple.C=Signature.CalculateTreeHash(tuple.FileInfo, accountInfo.BlockSize, accountInfo.BlockHash)
\r
445 .TopHash.ToHashString();
\r
449 private static List<Tuple<FileSystemInfo, string>> LoadLocalFileTuples(AccountInfo accountInfo)
\r
451 using (ThreadContext.Stacks["Account Files Hashing"].Push(accountInfo.UserName))
\r
454 var localInfos = AgentLocator<FileAgent>.Get(accountInfo.AccountPath).EnumerateFileSystemInfos();
\r
455 //Use the queue to retry locked file hashing
\r
456 var fileQueue = new Queue<FileSystemInfo>(localInfos);
\r
457 var hasher = MD5.Create();
\r
459 var results = new List<Tuple<FileSystemInfo, string>>();
\r
461 while (fileQueue.Count > 0)
\r
463 var file = fileQueue.Dequeue();
\r
464 using (ThreadContext.Stacks["File"].Push(file.FullName))
\r
467 Signature.CalculateTreeHash(file, accountInfo.BlockSize,
\r
468 accountInfo.BlockHash).
\r
469 TopHash.ToHashString()
\r
473 //Replace MD5 here, do the calc while syncing individual files
\r
475 if (file is DirectoryInfo)
\r
476 hash = MERKLE_EMPTY;
\r
479 using (var stream = (file as FileInfo).OpenRead())
\r
481 hash = hasher.ComputeHash(stream).ToHashString();
\r
484 results.Add(Tuple.Create(file, hash));
\r
486 catch (IOException exc)
\r
488 Log.WarnFormat("[HASH] File in use, will retry [{0}]", exc);
\r
489 fileQueue.Enqueue(file);
\r
498 private void SyncSingleItem(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, CancellationToken token)
\r
500 Log.DebugFormat("Sync [{0}] C:[{1}] L:[{2}] S:[{3}]",tuple.FilePath,tuple.C,tuple.L,tuple.S);
\r
502 var localFilePath = tuple.FilePath;
\r
503 //Don't use the tuple info, it may have been deleted
\r
504 var localInfo = FileInfoExtensions.FromPath(localFilePath);
\r
506 // Local file unchanged? If both C and L are null, make sure it's because
\r
507 //both the file is missing and the state checksum is not missing
\r
508 if (tuple.C == tuple.L && (localInfo.Exists || tuple.FileState==null))
\r
511 //Server unchanged?
\r
512 if (tuple.S == tuple.L)
\r
514 // No server changes
\r
519 //Different from server
\r
520 if (Selectives.IsSelected(accountInfo, localFilePath))
\r
522 //Does the server file exist?
\r
523 if (tuple.S == null)
\r
525 //Server file doesn't exist
\r
526 //deleteObjectFromLocal()
\r
527 StatusKeeper.SetFileState(localFilePath, FileStatus.Deleted,
\r
528 FileOverlayStatus.Deleted, "");
\r
529 agent.Delete(localFilePath);
\r
530 //updateRecord(Remove C, L)
\r
531 StatusKeeper.ClearFileStatus(localFilePath);
\r
535 //Server file exists
\r
536 //downloadServerObject() // Result: L = S
\r
537 StatusKeeper.SetFileState(localFilePath, FileStatus.Modified,
\r
538 FileOverlayStatus.Modified, "");
\r
539 NetworkAgent.Downloader.DownloadCloudFile(accountInfo,
\r
541 localFilePath, token).Wait(token);
\r
542 //updateRecord( L = S )
\r
543 StatusKeeper.UpdateFileChecksum(localFilePath, tuple.ObjectInfo.ETag,
\r
544 tuple.ObjectInfo.X_Object_Hash);
\r
546 StatusKeeper.SetFileState(localFilePath, FileStatus.Unchanged,
\r
547 FileOverlayStatus.Normal, "");
\r
554 //Local changes found
\r
556 //Server unchanged?
\r
557 if (tuple.S == tuple.L)
\r
559 //The FileAgent selective sync checks for new root folder files
\r
560 if (!agent.Ignore(localFilePath))
\r
562 if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null)
\r
564 //deleteObjectFromServer()
\r
565 DeleteCloudFile(accountInfo, tuple);
\r
566 //updateRecord( Remove L, S)
\r
570 //uploadLocalObject() // Result: S = C, L = S
\r
571 var isUnselected = agent.IsUnselectedRootFolder(tuple.FilePath);
\r
573 //Debug.Assert(tuple.FileState !=null);
\r
574 var action = new CloudUploadAction(accountInfo, localInfo, tuple.FileState,
\r
575 accountInfo.BlockSize, accountInfo.BlockHash,
\r
576 "Poll", isUnselected);
\r
577 NetworkAgent.Uploader.UploadCloudFile(action, token).Wait(token);
\r
580 //updateRecord( S = C )
\r
581 StatusKeeper.SetFileState(localFilePath, FileStatus.Unchanged,
\r
582 FileOverlayStatus.Normal, "");
\r
585 ProcessChildren(accountInfo, tuple, agent, token);
\r
592 if (Selectives.IsSelected(accountInfo, localFilePath))
\r
594 if (tuple.C == tuple.S)
\r
596 // (Identical Changes) Result: L = S
\r
598 StatusKeeper.UpdateFileChecksum(localFilePath, tuple.ObjectInfo.ETag,
\r
599 tuple.ObjectInfo.X_Object_Hash);
\r
600 StatusKeeper.SetFileState(localFilePath, FileStatus.Unchanged,
\r
601 FileOverlayStatus.Normal, "");
\r
605 if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null )
\r
607 //deleteObjectFromServer()
\r
608 DeleteCloudFile(accountInfo, tuple);
\r
609 //updateRecord(Remove L, S)
\r
613 ReportConflictForMismatch(localFilePath);
\r
614 //identifyAsConflict() // Manual action required
\r
622 private void DeleteCloudFile(AccountInfo accountInfo, StateTuple tuple)
\r
624 StatusKeeper.SetFileState(tuple.FilePath, FileStatus.Deleted,
\r
625 FileOverlayStatus.Deleted, "");
\r
626 NetworkAgent.DeleteAgent.DeleteCloudFile(accountInfo, tuple.ObjectInfo);
\r
627 StatusKeeper.ClearFileStatus(tuple.FilePath);
\r
630 private void ProcessChildren(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, CancellationToken token)
\r
633 var dirInfo = tuple.FileInfo as DirectoryInfo;
\r
634 var folderTuples = from folder in dirInfo.EnumerateDirectories("*", SearchOption.AllDirectories)
\r
635 select new StateTuple(folder);
\r
636 var fileTuples = from file in dirInfo.EnumerateFiles("*", SearchOption.AllDirectories)
\r
637 select new StateTuple(file);
\r
639 //Process folders first, to ensure folders appear on the sever as soon as possible
\r
640 folderTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, token));
\r
642 fileTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, token));
\r
645 private static IEnumerable<StateTuple> MergeSources(
\r
646 IEnumerable<Tuple<string, ObjectInfo>> infos,
\r
647 IEnumerable<Tuple<FileSystemInfo, string>> files,
\r
648 IEnumerable<FileState> states)
\r
650 var dct = new Dictionary<string, StateTuple>();
\r
651 foreach (var file in files)
\r
653 var fsInfo = file.Item1;
\r
654 var fileHash = fsInfo is DirectoryInfo? MERKLE_EMPTY:file.Item2;
\r
656 dct[fsInfo.FullName] = new StateTuple {FileInfo = fsInfo, MD5 = fileHash};
\r
658 foreach (var state in states)
\r
660 StateTuple hashTuple;
\r
661 if (dct.TryGetValue(state.FilePath, out hashTuple))
\r
663 hashTuple.FileState = state;
\r
667 var fsInfo = FileInfoExtensions.FromPath(state.FilePath);
\r
668 dct[state.FilePath] = new StateTuple {FileInfo = fsInfo, FileState = state};
\r
671 foreach (var info in infos)
\r
673 StateTuple hashTuple;
\r
674 var filePath = info.Item1;
\r
675 var objectInfo = info.Item2;
\r
676 if (dct.TryGetValue(filePath, out hashTuple))
\r
678 hashTuple.ObjectInfo = objectInfo;
\r
682 var fsInfo = FileInfoExtensions.FromPath(filePath);
\r
683 dct[filePath] = new StateTuple {FileInfo = fsInfo, ObjectInfo = objectInfo};
\r
690 /// Returns the latest LastModified date from the list of objects, but only if it is before
\r
691 /// than the threshold value
\r
693 /// <param name="threshold"></param>
\r
694 /// <param name="cloudObjects"></param>
\r
695 /// <returns></returns>
\r
696 private static DateTime? GetLatestDateBefore(DateTime? threshold, IList<ObjectInfo> cloudObjects)
\r
698 DateTime? maxDate = null;
\r
699 if (cloudObjects!=null && cloudObjects.Count > 0)
\r
700 maxDate = cloudObjects.Max(obj => obj.Last_Modified);
\r
701 if (maxDate == null || maxDate == DateTime.MinValue)
\r
703 if (threshold == null || threshold == DateTime.MinValue || threshold > maxDate)
\r
709 /// Returns the latest LastModified date from the list of objects, but only if it is after
\r
710 /// the threshold value
\r
712 /// <param name="threshold"></param>
\r
713 /// <param name="cloudObjects"></param>
\r
714 /// <returns></returns>
\r
715 private static DateTime? GetLatestDateAfter(DateTime? threshold, IList<ObjectInfo> cloudObjects)
\r
717 DateTime? maxDate = null;
\r
718 if (cloudObjects!=null && cloudObjects.Count > 0)
\r
719 maxDate = cloudObjects.Max(obj => obj.Last_Modified);
\r
720 if (maxDate == null || maxDate == DateTime.MinValue)
\r
722 if (threshold == null || threshold == DateTime.MinValue || threshold < maxDate)
\r
727 //readonly AccountsDifferencer _differencer = new AccountsDifferencer();
\r
728 private Dictionary<Uri, List<Uri>> _selectiveUris = new Dictionary<Uri, List<Uri>>();
\r
729 private bool _pause;
\r
730 private static string MERKLE_EMPTY = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
\r
733 /// Deletes local files that are not found in the list of cloud files
\r
735 /// <param name="accountInfo"></param>
\r
736 /// <param name="cloudFiles"></param>
\r
737 private void ProcessDeletedFiles(AccountInfo accountInfo, IEnumerable<ObjectInfo> cloudFiles)
\r
739 if (accountInfo == null)
\r
740 throw new ArgumentNullException("accountInfo");
\r
741 if (String.IsNullOrWhiteSpace(accountInfo.AccountPath))
\r
742 throw new ArgumentException("The AccountInfo.AccountPath is empty", "accountInfo");
\r
743 if (cloudFiles == null)
\r
744 throw new ArgumentNullException("cloudFiles");
\r
745 Contract.EndContractBlock();
\r
747 var deletedFiles = new List<FileSystemInfo>();
\r
748 foreach (var objectInfo in cloudFiles)
\r
750 if (Log.IsDebugEnabled)
\r
751 Log.DebugFormat("Handle deleted [{0}]", objectInfo.Uri);
\r
752 var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
\r
753 var item = FileAgent.GetFileAgent(accountInfo).GetFileSystemInfo(relativePath);
\r
754 if (Log.IsDebugEnabled)
\r
755 Log.DebugFormat("Will delete [{0}] for [{1}]", item.FullName, objectInfo.Uri);
\r
758 if ((item.Attributes & FileAttributes.ReadOnly) == FileAttributes.ReadOnly)
\r
760 item.Attributes = item.Attributes & ~FileAttributes.ReadOnly;
\r
765 Log.DebugFormat("Deleting {0}", item.FullName);
\r
767 var directory = item as DirectoryInfo;
\r
768 if (directory != null)
\r
769 directory.Delete(true);
\r
772 Log.DebugFormat("Deleted [{0}] for [{1}]", item.FullName, objectInfo.Uri);
\r
774 _lastSeen.TryRemove(item.FullName, out lastDate);
\r
775 deletedFiles.Add(item);
\r
777 StatusKeeper.SetFileState(item.FullName, FileStatus.Deleted, FileOverlayStatus.Deleted, "File Deleted");
\r
779 Log.InfoFormat("[{0}] files were deleted", deletedFiles.Count);
\r
780 StatusNotification.NotifyForFiles(deletedFiles, String.Format("{0} files were deleted", deletedFiles.Count),
\r
785 private void MarkSuspectedDeletes(AccountInfo accountInfo, IEnumerable<ObjectInfo> cloudFiles)
\r
787 //Only consider files that are not being modified, ie they are in the Unchanged state
\r
788 var deleteCandidates = FileState.Queryable.Where(state =>
\r
789 state.FilePath.StartsWith(accountInfo.AccountPath)
\r
790 && state.FileStatus == FileStatus.Unchanged).ToList();
\r
793 //TODO: filesToDelete must take into account the Others container
\r
794 var filesToDelete = (from deleteCandidate in deleteCandidates
\r
795 let localFile = FileInfoExtensions.FromPath(deleteCandidate.FilePath)
\r
796 let relativeFilePath = localFile.AsRelativeTo(accountInfo.AccountPath)
\r
798 !cloudFiles.Any(r => r.RelativeUrlToFilePath(accountInfo.UserName) == relativeFilePath)
\r
799 select localFile).ToList();
\r
802 //Set the status of missing files to Conflict
\r
803 foreach (var item in filesToDelete)
\r
805 //Try to acquire a gate on the file, to take into account files that have been dequeued
\r
806 //and are being processed
\r
807 using (var gate = NetworkGate.Acquire(item.FullName, NetworkOperation.Deleting))
\r
811 StatusKeeper.SetFileState(item.FullName, FileStatus.Conflict, FileOverlayStatus.Deleted,
\r
812 "Local file missing from server");
\r
815 UpdateStatus(PithosStatus.HasConflicts);
\r
816 StatusNotification.NotifyConflicts(filesToDelete,
\r
818 "{0} local files are missing from Pithos, possibly because they were deleted",
\r
819 filesToDelete.Count));
\r
820 StatusNotification.NotifyForFiles(filesToDelete, String.Format("{0} files were deleted", filesToDelete.Count),
\r
824 private void ReportConflictForMismatch(string localFilePath)
\r
826 if (String.IsNullOrWhiteSpace(localFilePath))
\r
827 throw new ArgumentNullException("localFilePath");
\r
828 Contract.EndContractBlock();
\r
830 StatusKeeper.SetFileState(localFilePath, FileStatus.Conflict, FileOverlayStatus.Conflict, "File changed at the server");
\r
831 UpdateStatus(PithosStatus.HasConflicts);
\r
832 var message = String.Format("Conflict detected for file {0}", localFilePath);
\r
834 StatusNotification.NotifyChange(message, TraceLevel.Warning);
\r
840 /// Creates a Sync action for each changed server file
\r
842 /// <param name="accountInfo"></param>
\r
843 /// <param name="changes"></param>
\r
844 /// <returns></returns>
\r
845 private IEnumerable<CloudAction> ChangesToActions(AccountInfo accountInfo, IEnumerable<ObjectInfo> changes)
\r
847 if (changes == null)
\r
848 throw new ArgumentNullException();
\r
849 Contract.EndContractBlock();
\r
850 var fileAgent = FileAgent.GetFileAgent(accountInfo);
\r
852 //In order to avoid multiple iterations over the files, we iterate only once
\r
853 //over the remote files
\r
854 foreach (var objectInfo in changes)
\r
856 var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
\r
857 //If a directory object already exists, we may need to sync it
\r
858 if (fileAgent.Exists(relativePath))
\r
860 var localFile = fileAgent.GetFileSystemInfo(relativePath);
\r
861 //We don't need to sync directories
\r
862 if (objectInfo.IsDirectory && localFile is DirectoryInfo)
\r
864 using (new SessionScope(FlushAction.Never))
\r
866 var state = StatusKeeper.GetStateByFilePath(localFile.FullName);
\r
867 _lastSeen[localFile.FullName] = DateTime.Now;
\r
868 //Common files should be checked on a per-case basis to detect differences, which is newer
\r
870 yield return new CloudAction(accountInfo, CloudActionType.MustSynch,
\r
871 localFile, objectInfo, state, accountInfo.BlockSize,
\r
872 accountInfo.BlockHash,"Poll Changes");
\r
877 //Remote files should be downloaded
\r
878 yield return new CloudDownloadAction(accountInfo, objectInfo,"Poll Changes");
\r
884 /// Creates a Local Move action for each moved server file
\r
886 /// <param name="accountInfo"></param>
\r
887 /// <param name="moves"></param>
\r
888 /// <returns></returns>
\r
889 private IEnumerable<CloudAction> MovesToActions(AccountInfo accountInfo, IEnumerable<ObjectInfo> moves)
\r
892 throw new ArgumentNullException();
\r
893 Contract.EndContractBlock();
\r
894 var fileAgent = FileAgent.GetFileAgent(accountInfo);
\r
896 //In order to avoid multiple iterations over the files, we iterate only once
\r
897 //over the remote files
\r
898 foreach (var objectInfo in moves)
\r
900 var previousRelativepath = objectInfo.Previous.RelativeUrlToFilePath(accountInfo.UserName);
\r
901 //If the previous file already exists, we can execute a Move operation
\r
902 if (fileAgent.Exists(previousRelativepath))
\r
904 var previousFile = fileAgent.GetFileSystemInfo(previousRelativepath);
\r
905 using (new SessionScope(FlushAction.Never))
\r
907 var state = StatusKeeper.GetStateByFilePath(previousFile.FullName);
\r
908 _lastSeen[previousFile.FullName] = DateTime.Now;
\r
910 //For each moved object we need to move both the local file and update
\r
911 yield return new CloudAction(accountInfo, CloudActionType.RenameLocal,
\r
912 previousFile, objectInfo, state, accountInfo.BlockSize,
\r
913 accountInfo.BlockHash,"Poll Moves");
\r
914 //For modified files, we need to download the changes as well
\r
915 if (objectInfo.X_Object_Hash != objectInfo.PreviousHash)
\r
916 yield return new CloudDownloadAction(accountInfo,objectInfo, "Poll Moves");
\r
919 //If the previous file does not exist, we need to download it in the new location
\r
922 //Remote files should be downloaded
\r
923 yield return new CloudDownloadAction(accountInfo, objectInfo, "Poll Moves");
\r
930 /// Creates a download action for each new server file
\r
932 /// <param name="accountInfo"></param>
\r
933 /// <param name="creates"></param>
\r
934 /// <returns></returns>
\r
935 private IEnumerable<CloudAction> CreatesToActions(AccountInfo accountInfo, IEnumerable<ObjectInfo> creates)
\r
937 if (creates == null)
\r
938 throw new ArgumentNullException();
\r
939 Contract.EndContractBlock();
\r
940 var fileAgent = FileAgent.GetFileAgent(accountInfo);
\r
942 //In order to avoid multiple iterations over the files, we iterate only once
\r
943 //over the remote files
\r
944 foreach (var objectInfo in creates)
\r
946 if (Log.IsDebugEnabled)
\r
947 Log.DebugFormat("[NEW INFO] {0}",objectInfo.Uri);
\r
949 var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
\r
951 //If the object already exists, we should check before uploading or downloading
\r
952 if (fileAgent.Exists(relativePath))
\r
954 var localFile= fileAgent.GetFileSystemInfo(relativePath);
\r
955 var state = StatusKeeper.GetStateByFilePath(localFile.WithProperCapitalization().FullName);
\r
956 yield return new CloudAction(accountInfo, CloudActionType.MustSynch,
\r
957 localFile, objectInfo, state, accountInfo.BlockSize,
\r
958 accountInfo.BlockHash,"Poll Creates");
\r
962 //Remote files should be downloaded
\r
963 yield return new CloudDownloadAction(accountInfo, objectInfo,"Poll Creates");
\r
970 /// Notify the UI to update the visual status
\r
972 /// <param name="status"></param>
\r
973 private void UpdateStatus(PithosStatus status)
\r
977 StatusNotification.SetPithosStatus(status);
\r
978 //StatusNotification.Notify(new Notification());
\r
980 catch (Exception exc)
\r
982 //Failure is not critical, just log it
\r
983 Log.Warn("Error while updating status", exc);
\r
987 private static void CreateContainerFolders(AccountInfo accountInfo, IEnumerable<ContainerInfo> containers)
\r
989 var containerPaths = from container in containers
\r
990 let containerPath = Path.Combine(accountInfo.AccountPath, container.Name)
\r
991 where container.Name != FolderConstants.TrashContainer && !Directory.Exists(containerPath)
\r
992 select containerPath;
\r
994 foreach (var path in containerPaths)
\r
996 Directory.CreateDirectory(path);
\r
1000 public void AddAccount(AccountInfo accountInfo)
\r
1002 //Avoid adding a duplicate accountInfo
\r
1003 _accounts.TryAdd(accountInfo.AccountKey, accountInfo);
\r
1006 public void RemoveAccount(AccountInfo accountInfo)
\r
1008 AccountInfo account;
\r
1009 _accounts.TryRemove(accountInfo.AccountKey, out account);
\r
1011 SnapshotDifferencer differencer;
\r
1012 _differencer.Differencers.TryRemove(accountInfo.AccountKey, out differencer);
\r
1016 public void SetSelectivePaths(AccountInfo accountInfo,Uri[] added, Uri[] removed)
\r
1018 AbortRemovedPaths(accountInfo,removed);
\r
1019 DownloadNewPaths(accountInfo,added);
\r
1022 private void DownloadNewPaths(AccountInfo accountInfo, Uri[] added)
\r
1024 var client = new CloudFilesClient(accountInfo);
\r
1025 foreach (var folderUri in added)
\r
1032 var segmentsCount = folderUri.Segments.Length;
\r
1033 //Is this an account URL?
\r
1034 if (segmentsCount < 3)
\r
1036 //Is this a container or folder URL?
\r
1037 if (segmentsCount == 3)
\r
1039 account = folderUri.Segments[1].TrimEnd('/');
\r
1040 container = folderUri.Segments[2].TrimEnd('/');
\r
1044 account = folderUri.Segments[2].TrimEnd('/');
\r
1045 container = folderUri.Segments[3].TrimEnd('/');
\r
1047 IList<ObjectInfo> items;
\r
1048 if (segmentsCount > 3)
\r
1051 var folder = String.Join("", folderUri.Segments.Splice(4));
\r
1052 items = client.ListObjects(account, container, folder);
\r
1057 items = client.ListObjects(account, container);
\r
1059 var actions = CreatesToActions(accountInfo, items);
\r
1060 foreach (var action in actions)
\r
1062 NetworkAgent.Post(action);
\r
1065 catch (Exception exc)
\r
1067 Log.WarnFormat("Listing of new selective path [{0}] failed with \r\n{1}", folderUri, exc);
\r
1071 //Need to get a listing of each of the URLs, then post them to the NetworkAgent
\r
1072 //CreatesToActions(accountInfo,)
\r
1074 /* NetworkAgent.Post();*/
\r
1077 private void AbortRemovedPaths(AccountInfo accountInfo, Uri[] removed)
\r
1079 /*this.NetworkAgent.*/
\r