2 /* -----------------------------------------------------------------------
\r
3 * <copyright file="PollAgent.cs" company="GRNet">
\r
5 * Copyright 2011-2012 GRNET S.A. All rights reserved.
\r
7 * Redistribution and use in source and binary forms, with or
\r
8 * without modification, are permitted provided that the following
\r
9 * conditions are met:
\r
11 * 1. Redistributions of source code must retain the above
\r
12 * copyright notice, this list of conditions and the following
\r
15 * 2. Redistributions in binary form must reproduce the above
\r
16 * copyright notice, this list of conditions and the following
\r
17 * disclaimer in the documentation and/or other materials
\r
18 * provided with the distribution.
\r
21 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
\r
22 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
\r
23 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
\r
24 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
\r
25 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
\r
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
\r
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
\r
28 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
\r
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
\r
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
\r
31 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
\r
32 * POSSIBILITY OF SUCH DAMAGE.
\r
34 * The views and conclusions contained in the software and
\r
35 * documentation are those of the authors and should not be
\r
36 * interpreted as representing official policies, either expressed
\r
37 * or implied, of GRNET S.A.
\r
39 * -----------------------------------------------------------------------
\r
43 using System.Collections.Concurrent;
\r
44 using System.ComponentModel.Composition;
\r
45 using System.Diagnostics;
\r
46 using System.Diagnostics.Contracts;
\r
48 using System.Linq.Expressions;
\r
49 using System.Reflection;
\r
50 using System.Security.Cryptography;
\r
51 using System.Threading;
\r
52 using System.Threading.Tasks;
\r
53 using System.Threading.Tasks.Dataflow;
\r
54 using Castle.ActiveRecord;
\r
55 using Pithos.Interfaces;
\r
56 using Pithos.Network;
\r
59 namespace Pithos.Core.Agents
\r
62 using System.Collections.Generic;
\r
65 /*public class PollRequest
\r
67 public DateTime? Since { get; set; }
\r
68 public IEnumerable<string> Batch { get; set; }
\r
73 /// PollAgent periodically polls the server to detect object changes. The agent retrieves a listing of all
\r
74 /// objects and compares it with a previously cached version to detect differences.
\r
75 /// New files are downloaded, missing files are deleted from the local file system and common files are compared
\r
76 /// to determine the appropriate action
\r
79 public class PollAgent
\r
81 private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
\r
83 [System.ComponentModel.Composition.Import]
\r
84 public IStatusKeeper StatusKeeper { get; set; }
\r
86 [System.ComponentModel.Composition.Import]
\r
87 public IPithosSettings Settings { get; set; }
\r
89 [System.ComponentModel.Composition.Import]
\r
90 public NetworkAgent NetworkAgent { get; set; }
\r
92 [System.ComponentModel.Composition.Import]
\r
93 public Selectives Selectives { get; set; }
\r
95 public IStatusNotification StatusNotification { get; set; }
\r
97 private CancellationTokenSource _currentOperationCancellation = new CancellationTokenSource();
\r
99 public void CancelCurrentOperation()
\r
101 //What does it mean to cancel the current upload/download?
\r
102 //Obviously, the current operation will be cancelled by throwing
\r
103 //a cancellation exception.
\r
105 //The default behavior is to retry any operations that throw.
\r
106 //Obviously this is not what we want in this situation.
\r
107 //The cancelled operation should NOT bea retried.
\r
109 //This can be done by catching the cancellation exception
\r
110 //and avoiding the retry.
\r
113 //Have to reset the cancellation source - it is not possible to reset the source
\r
114 //Have to prevent a case where an operation requests a token from the old source
\r
115 var oldSource = Interlocked.Exchange(ref _currentOperationCancellation, new CancellationTokenSource());
\r
116 oldSource.Cancel();
\r
128 _unPauseEvent.Set();
\r
131 _unPauseEvent.Reset();
\r
136 private bool _firstPoll = true;
\r
138 //The Sync Event signals a manual synchronisation
\r
139 private readonly AsyncManualResetEvent _syncEvent = new AsyncManualResetEvent();
\r
141 private readonly AsyncManualResetEvent _unPauseEvent = new AsyncManualResetEvent(true);
\r
143 private readonly ConcurrentDictionary<string, DateTime> _lastSeen = new ConcurrentDictionary<string, DateTime>();
\r
144 private readonly ConcurrentDictionary<Uri, AccountInfo> _accounts = new ConcurrentDictionary<Uri,AccountInfo>();
\r
146 //private readonly ActionBlock<PollRequest> _pollAction;
\r
148 HashSet<string> _knwonContainers = new HashSet<string>();
\r
152 /// Start a manual synchronization
\r
154 public void SynchNow(IEnumerable<string> paths=null)
\r
156 _batchQueue.Enqueue(paths);
\r
159 //_pollAction.Post(new PollRequest {Batch = paths});
\r
162 readonly ConcurrentQueue<IEnumerable<string>> _batchQueue=new ConcurrentQueue<IEnumerable<string>>();
\r
165 /// Remote files are polled periodically. Any changes are processed
\r
167 /// <param name="since"></param>
\r
168 /// <returns></returns>
\r
169 public void PollRemoteFiles(DateTime? since = null)
\r
171 if (Log.IsDebugEnabled)
\r
172 Log.DebugFormat("Polling changes after [{0}]",since);
\r
174 Debug.Assert(Thread.CurrentThread.IsBackground, "Polling Ended up in the main thread!");
\r
178 using (ThreadContext.Stacks["Retrieve Remote"].Push("All accounts"))
\r
180 //If this poll fails, we will retry with the same since value
\r
181 var nextSince = since;
\r
184 _unPauseEvent.Wait();
\r
185 UpdateStatus(PithosStatus.PollSyncing);
\r
187 var accountBatches=new Dictionary<Uri, IEnumerable<string>>();
\r
188 IEnumerable<string> batch = null;
\r
189 if (_batchQueue.TryDequeue(out batch) && batch != null)
\r
190 foreach (var account in _accounts.Values)
\r
192 var accountBatch = batch.Where(path => path.IsAtOrBelow(account.AccountPath));
\r
193 accountBatches[account.AccountKey] = accountBatch;
\r
196 var tasks = new List<Task<DateTime?>>();
\r
197 foreach(var accountInfo in _accounts.Values)
\r
199 IEnumerable<string> accountBatch ;
\r
200 accountBatches.TryGetValue(accountInfo.AccountKey,out accountBatch);
\r
201 var t=ProcessAccountFiles (accountInfo, accountBatch, since);
\r
205 var nextTimes=TaskEx.WhenAll(tasks.ToList()).Result;
\r
207 _firstPoll = false;
\r
208 //Reschedule the poll with the current timestamp as a "since" value
\r
210 if (nextTimes.Length>0)
\r
211 nextSince = nextTimes.Min();
\r
212 if (Log.IsDebugEnabled)
\r
213 Log.DebugFormat("Next Poll at [{0}]",nextSince);
\r
215 catch (Exception ex)
\r
217 Log.ErrorFormat("Error while processing accounts\r\n{0}", ex);
\r
218 //In case of failure retry with the same "since" value
\r
221 UpdateStatus(PithosStatus.PollComplete);
\r
222 //The multiple try blocks are required because we can't have an await call
\r
223 //inside a finally block
\r
224 //TODO: Find a more elegant solution for reschedulling in the event of an exception
\r
227 //Wait for the polling interval to pass or the Sync event to be signalled
\r
228 nextSince = WaitForScheduledOrManualPoll(nextSince).Result;
\r
232 //Ensure polling is scheduled even in case of error
\r
233 TaskEx.Run(()=>PollRemoteFiles(nextSince));
\r
234 //_pollAction.Post(new PollRequest {Since = nextSince});
\r
240 /// Wait for the polling period to expire or a manual sync request
\r
242 /// <param name="since"></param>
\r
243 /// <returns></returns>
\r
244 private async Task<DateTime?> WaitForScheduledOrManualPoll(DateTime? since)
\r
246 var sync = _syncEvent.WaitAsync();
\r
247 var wait = TaskEx.Delay(TimeSpan.FromSeconds(Settings.PollingInterval));
\r
249 var signaledTask = await TaskEx.WhenAny(sync, wait).ConfigureAwait(false);
\r
251 //Pausing takes precedence over manual sync or awaiting
\r
252 _unPauseEvent.Wait();
\r
254 //Wait for network processing to finish before polling
\r
255 var pauseTask=NetworkAgent.ProceedEvent.WaitAsync();
\r
256 await TaskEx.WhenAll(signaledTask, pauseTask).ConfigureAwait(false);
\r
258 //If polling is signalled by SynchNow, ignore the since tag
\r
259 if (sync.IsCompleted)
\r
261 //TODO: Must convert to AutoReset
\r
262 _syncEvent.Reset();
\r
270 public async Task<DateTime?> ProcessAccountFiles(AccountInfo accountInfo, IEnumerable<string> accountBatch, DateTime? since = null)
\r
272 if (accountInfo == null)
\r
273 throw new ArgumentNullException("accountInfo");
\r
274 if (String.IsNullOrWhiteSpace(accountInfo.AccountPath))
\r
275 throw new ArgumentException("The AccountInfo.AccountPath is empty", "accountInfo");
\r
276 Contract.EndContractBlock();
\r
279 using (ThreadContext.Stacks["Retrieve Remote"].Push(accountInfo.UserName))
\r
282 await NetworkAgent.GetDeleteAwaiter().ConfigureAwait(false);
\r
284 Log.Info("Scheduled");
\r
285 var client = new CloudFilesClient(accountInfo);
\r
287 //We don't need to check the trash container
\r
288 var containers = client.ListContainers(accountInfo.UserName)
\r
289 .Where(c=>c.Name!="trash")
\r
293 CreateContainerFolders(accountInfo, containers);
\r
295 //The nextSince time fallback time is the same as the current.
\r
296 //If polling succeeds, the next Since time will be the smallest of the maximum modification times
\r
297 //of the shared and account objects
\r
298 var nextSince = since;
\r
302 //Wait for any deletions to finish
\r
303 await NetworkAgent.GetDeleteAwaiter().ConfigureAwait(false);
\r
304 //Get the poll time now. We may miss some deletions but it's better to keep a file that was deleted
\r
305 //than delete a file that was created while we were executing the poll
\r
307 //Get the list of server objects changed since the last check
\r
308 //The name of the container is passed as state in order to create a dictionary of tasks in a subsequent step
\r
309 var listObjects = (from container in containers
\r
310 select Task<IList<ObjectInfo>>.Factory.StartNew(_ =>
\r
311 client.ListObjects(accountInfo.UserName, container.Name, since), container.Name)).ToList();
\r
313 var listShared = Task<IList<ObjectInfo>>.Factory.StartNew(_ =>
\r
314 client.ListSharedObjects(_knwonContainers,since), "shared");
\r
315 listObjects.Add(listShared);
\r
316 var listTasks = await Task.Factory.WhenAll(listObjects.ToArray()).ConfigureAwait(false);
\r
318 using (ThreadContext.Stacks["SCHEDULE"].Push("Process Results"))
\r
320 var dict = listTasks.ToDictionary(t => t.AsyncState);
\r
322 //Get all non-trash objects. Remember, the container name is stored in AsyncState
\r
323 var remoteObjects = (from objectList in listTasks
\r
324 where (string)objectList.AsyncState != "trash"
\r
325 from obj in objectList.Result
\r
326 orderby obj.Bytes ascending
\r
327 select obj).ToList();
\r
329 //Get the latest remote object modification date, only if it is after
\r
330 //the original since date
\r
331 nextSince = GetLatestDateAfter(nextSince, remoteObjects);
\r
333 var sharedObjects = dict["shared"].Result;
\r
335 //DON'T process trashed files
\r
336 //If some files are deleted and added again to a folder, they will be deleted
\r
337 //even though they are new.
\r
338 //We would have to check file dates and hashes to ensure that a trashed file
\r
339 //can be deleted safely from the local hard drive.
\r
341 //Items with the same name, hash may be both in the container and the trash
\r
342 //Don't delete items that exist in the container
\r
343 var realTrash = from trash in trashObjects
\r
345 !remoteObjects.Any(
\r
346 info => info.Name == trash.Name && info.Hash == trash.Hash)
\r
348 ProcessTrashedFiles(accountInfo, realTrash);
\r
351 var cleanRemotes = (from info in remoteObjects.Union(sharedObjects)
\r
352 let name = info.Name??""
\r
353 where !name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase) &&
\r
354 !name.StartsWith(FolderConstants.CacheFolder + "/",
\r
355 StringComparison.InvariantCultureIgnoreCase)
\r
356 select info).ToList();
\r
359 StatusKeeper.CleanupOrphanStates();
\r
361 var differencer = _differencer.PostSnapshot(accountInfo, cleanRemotes);
\r
362 var currentRemotes = differencer.Current.ToList();
\r
363 StatusKeeper.CleanupStaleStates(accountInfo, currentRemotes);
\r
365 //var filterUris = Selectives.SelectiveUris[accountInfo.AccountKey];
\r
367 //May have to wait if the FileAgent has asked for a Pause, due to local changes
\r
368 await _unPauseEvent.WaitAsync().ConfigureAwait(false);
\r
370 //Get the local files here
\r
371 var agent = AgentLocator<FileAgent>.Get(accountInfo.AccountPath);
\r
372 var files = await LoadLocalFileTuples(accountInfo, accountBatch);
\r
374 var states = FileState.Queryable.ToList();
\r
376 var infos = (from remote in currentRemotes
\r
377 let path = remote.RelativeUrlToFilePath(accountInfo.UserName)
\r
378 let info=agent.GetFileSystemInfo(path)
\r
379 select Tuple.Create(info.FullName,remote))
\r
382 var token = _currentOperationCancellation.Token;
\r
384 var tuples = MergeSources(infos, files, states).ToList();
\r
386 //Process only the changes in the batch file, if one exists
\r
387 var stateTuples = accountBatch==null?tuples:tuples.Where(t => accountBatch.Contains(t.FilePath));
\r
388 foreach (var tuple in stateTuples)
\r
390 await _unPauseEvent.WaitAsync().ConfigureAwait(false);
\r
392 //Set the Merkle Hash
\r
393 //SetMerkleHash(accountInfo, tuple);
\r
395 await SyncSingleItem(accountInfo, tuple, agent, token).ConfigureAwait(false);
\r
404 MarkSuspectedDeletes(accountInfo, cleanRemotes);
\r
409 Log.Info("[LISTENER] End Processing");
\r
412 catch (Exception ex)
\r
414 Log.ErrorFormat("[FAIL] ListObjects for {0} in ProcessRemoteFiles with {1}", accountInfo.UserName, ex);
\r
418 Log.Info("[LISTENER] Finished");
\r
423 private static void SetMerkleHash(AccountInfo accountInfo, StateTuple tuple)
\r
425 //The Merkle hash for directories is that of an empty buffer
\r
426 if (tuple.FileInfo is DirectoryInfo)
\r
427 tuple.C = MERKLE_EMPTY;
\r
428 else if (tuple.FileState != null && tuple.MD5 == tuple.FileState.ShortHash)
\r
430 //If there is a state whose MD5 matches, load the merkle hash from the file state
\r
431 //insteaf of calculating it
\r
432 tuple.C = tuple.FileState.Checksum;
\r
436 tuple.Merkle = Signature.CalculateTreeHashAsync((FileInfo)tuple.FileInfo, accountInfo.BlockSize, accountInfo.BlockHash,1);
\r
437 //tuple.C=tuple.Merkle.TopHash.ToHashString();
\r
441 private async Task<List<Tuple<FileSystemInfo, string>>> LoadLocalFileTuples(AccountInfo accountInfo,IEnumerable<string> batch )
\r
443 using (ThreadContext.Stacks["Account Files Hashing"].Push(accountInfo.UserName))
\r
445 var batchPaths = (batch==null)?new List<string>():batch.ToList();
\r
446 IEnumerable<FileSystemInfo> localInfos=AgentLocator<FileAgent>.Get(accountInfo.AccountPath)
\r
447 .EnumerateFileSystemInfos();
\r
448 if (batchPaths.Count>0)
\r
449 localInfos= localInfos.Where(fi => batchPaths.Contains(fi.FullName));
\r
451 //Use the queue to retry locked file hashing
\r
452 var fileQueue = new ConcurrentQueue<FileSystemInfo>(localInfos);
\r
455 var results = new List<Tuple<FileSystemInfo, string>>();
\r
457 while (fileQueue.Count > 0)
\r
459 FileSystemInfo file;
\r
460 fileQueue.TryDequeue(out file);
\r
461 using (ThreadContext.Stacks["File"].Push(file.FullName))
\r
465 //Replace MD5 here, do the calc while syncing individual files
\r
467 if (file is DirectoryInfo)
\r
471 //Wait in case the FileAgent has requested a Pause
\r
472 await _unPauseEvent.WaitAsync().ConfigureAwait(false);
\r
474 using (StatusNotification.GetNotifier("Hashing {0}", "", file.Name))
\r
476 hash = ((FileInfo)file).ComputeShortHash(StatusNotification);
\r
480 results.Add(Tuple.Create(file, hash));
\r
482 catch (IOException exc)
\r
484 Log.WarnFormat("[HASH] File in use, will retry [{0}]", exc);
\r
485 fileQueue.Enqueue(file);
\r
486 //If this is the only enqueued file
\r
487 if (fileQueue.Count != 1) continue;
\r
493 //Pause Polling for the specified time
\r
496 await PauseFor(backoff).ConfigureAwait(false);
\r
505 /// Wait and Pause the agent while waiting
\r
507 /// <param name="backoff"></param>
\r
508 /// <returns></returns>
\r
509 private async Task PauseFor(int backoff)
\r
513 await TaskEx.Delay(backoff).ConfigureAwait(false);
\r
517 private async Task SyncSingleItem(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, CancellationToken token)
\r
519 Log.DebugFormat("Sync [{0}] C:[{1}] L:[{2}] S:[{3}]",tuple.FilePath,tuple.C,tuple.L,tuple.S);
\r
521 var localFilePath = tuple.FilePath;
\r
522 //Don't use the tuple info, it may have been deleted
\r
523 var localInfo = FileInfoExtensions.FromPath(localFilePath);
\r
526 var isUnselectedRootFolder = agent.IsUnselectedRootFolder(tuple.FilePath);
\r
528 //Unselected root folders that have not yet been uploaded should be uploaded and added to the
\r
529 //selective folders
\r
531 if (!Selectives.IsSelected(accountInfo, localFilePath) && !(isUnselectedRootFolder && tuple.ObjectInfo==null) )
\r
534 // Local file unchanged? If both C and L are null, make sure it's because
\r
535 //both the file is missing and the state checksum is not missing
\r
536 if (tuple.C == tuple.L /*&& (localInfo.Exists || tuple.FileState == null)*/)
\r
539 //Server unchanged?
\r
540 if (tuple.S == tuple.L)
\r
542 // No server changes
\r
543 //Has the file been renamed on the server?
\r
544 MoveForServerMove(accountInfo, tuple);
\r
548 //Different from server
\r
549 //Does the server file exist?
\r
550 if (tuple.S == null)
\r
552 //Server file doesn't exist
\r
553 //deleteObjectFromLocal()
\r
554 using (StatusNotification.GetNotifier("Deleting local {0}", "Deleted local {0}", Path.GetFileName(localFilePath)))
\r
556 StatusKeeper.SetFileState(localFilePath, FileStatus.Deleted,
\r
557 FileOverlayStatus.Deleted, "");
\r
558 agent.Delete(localFilePath);
\r
559 //updateRecord(Remove C, L)
\r
560 StatusKeeper.ClearFileStatus(localFilePath);
\r
565 //Server file exists
\r
566 //downloadServerObject() // Result: L = S
\r
567 //If the file has moved on the server, move it locally before downloading
\r
568 using (StatusNotification.GetNotifier("Downloading {0}", "Downloaded {0}", Path.GetFileName(localFilePath)))
\r
570 var targetPath = MoveForServerMove(accountInfo, tuple);
\r
572 StatusKeeper.SetFileState(targetPath, FileStatus.Modified,FileOverlayStatus.Modified, "");
\r
574 await NetworkAgent.Downloader.DownloadCloudFile(accountInfo, tuple.ObjectInfo, targetPath, token)
\r
575 .ConfigureAwait(false);
\r
576 //updateRecord( L = S )
\r
577 StatusKeeper.UpdateFileChecksum(targetPath, tuple.ObjectInfo.ETag,
\r
578 tuple.ObjectInfo.X_Object_Hash);
\r
580 StatusKeeper.StoreInfo(targetPath, tuple.ObjectInfo);
\r
584 StatusKeeper.SetFileState(targetPath, FileStatus.Unchanged,
\r
585 FileOverlayStatus.Normal, "");
\r
593 //Local changes found
\r
595 //Server unchanged?
\r
596 if (tuple.S == tuple.L)
\r
598 //The FileAgent selective sync checks for new root folder files
\r
599 if (!agent.Ignore(localFilePath))
\r
601 if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null)
\r
603 //deleteObjectFromServer()
\r
604 DeleteCloudFile(accountInfo, tuple);
\r
605 //updateRecord( Remove L, S)
\r
609 //uploadLocalObject() // Result: S = C, L = S
\r
611 //Debug.Assert(tuple.FileState !=null);
\r
612 var action = new CloudUploadAction(accountInfo, localInfo, tuple.FileState,
\r
613 accountInfo.BlockSize, accountInfo.BlockHash,
\r
614 "Poll", isUnselectedRootFolder);
\r
615 using (StatusNotification.GetNotifier("Uploading {0}", "Uploaded {0}", Path.GetFileName(localFilePath)))
\r
617 await NetworkAgent.Uploader.UploadCloudFile(action, token).ConfigureAwait(false);
\r
620 //updateRecord( S = C )
\r
621 //State updated by the uploader
\r
623 if (isUnselectedRootFolder)
\r
625 ProcessChildren(accountInfo, tuple, agent, token);
\r
632 if (tuple.C == tuple.S)
\r
634 // (Identical Changes) Result: L = S
\r
636 //Detect server moves
\r
637 var targetPath = MoveForServerMove(accountInfo, tuple);
\r
638 StatusKeeper.StoreInfo(targetPath, tuple.ObjectInfo);
\r
642 if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null)
\r
644 //deleteObjectFromServer()
\r
645 DeleteCloudFile(accountInfo, tuple);
\r
646 //updateRecord(Remove L, S)
\r
648 //If both the local and server files are missing, the state is stale
\r
649 else if (!localInfo.Exists && (tuple.S == null || tuple.ObjectInfo == null))
\r
651 StatusKeeper.ClearFileStatus(localInfo.FullName);
\r
655 ReportConflictForMismatch(localFilePath);
\r
656 //identifyAsConflict() // Manual action required
\r
663 private string MoveForServerMove(AccountInfo accountInfo, StateTuple tuple)
\r
665 if (tuple.ObjectInfo == null)
\r
667 var relativePath = tuple.ObjectInfo.RelativeUrlToFilePath(accountInfo.UserName);
\r
668 var serverPath = Path.Combine(accountInfo.AccountPath, relativePath);
\r
670 //Compare Case Insensitive
\r
671 if (String.Equals(tuple.FilePath ,serverPath,StringComparison.InvariantCultureIgnoreCase)) return serverPath;
\r
673 if (tuple.FileInfo.Exists)
\r
675 using (StatusNotification.GetNotifier("Moving local {0}", "Moved local {0}", Path.GetFileName(tuple.FilePath)))
\r
677 var fi = tuple.FileInfo as FileInfo;
\r
679 fi.MoveTo(serverPath);
\r
680 var di = tuple.FileInfo as DirectoryInfo;
\r
682 di.MoveTo(serverPath);
\r
683 StatusKeeper.StoreInfo(serverPath, tuple.ObjectInfo);
\r
688 Debug.Assert(false, "File does not exist");
\r
693 private void DeleteCloudFile(AccountInfo accountInfo, StateTuple tuple)
\r
695 using (StatusNotification.GetNotifier("Deleting server {0}", "Deleted server {0}", Path.GetFileName(tuple.FilePath)))
\r
698 StatusKeeper.SetFileState(tuple.FilePath, FileStatus.Deleted,
\r
699 FileOverlayStatus.Deleted, "");
\r
700 NetworkAgent.DeleteAgent.DeleteCloudFile(accountInfo, tuple.ObjectInfo);
\r
701 StatusKeeper.ClearFileStatus(tuple.FilePath);
\r
705 private void ProcessChildren(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, CancellationToken token)
\r
708 var dirInfo = tuple.FileInfo as DirectoryInfo;
\r
709 var folderTuples = from folder in dirInfo.EnumerateDirectories("*", SearchOption.AllDirectories)
\r
710 select new StateTuple(folder);
\r
711 var fileTuples = from file in dirInfo.EnumerateFiles("*", SearchOption.AllDirectories)
\r
712 select new StateTuple(file);
\r
714 //Process folders first, to ensure folders appear on the sever as soon as possible
\r
715 folderTuples.ApplyAction(async t =>await SyncSingleItem(accountInfo, t, agent, token).ConfigureAwait(false));
\r
717 fileTuples.ApplyAction(async t => await SyncSingleItem(accountInfo, t, agent, token).ConfigureAwait(false));
\r
720 private static IEnumerable<StateTuple> MergeSources(
\r
721 IEnumerable<Tuple<string, ObjectInfo>> infos,
\r
722 IEnumerable<Tuple<FileSystemInfo, string>> files,
\r
723 IEnumerable<FileState> states)
\r
725 var tuplesByPath = new Dictionary<string, StateTuple>();
\r
726 foreach (var file in files)
\r
728 var fsInfo = file.Item1;
\r
729 var fileHash = fsInfo is DirectoryInfo? MD5_EMPTY:file.Item2;
\r
731 tuplesByPath[fsInfo.FullName] = new StateTuple {FileInfo = fsInfo, C=fileHash,MD5 = fileHash};
\r
733 foreach (var state in states)
\r
735 StateTuple hashTuple;
\r
736 if (tuplesByPath.TryGetValue(state.FilePath, out hashTuple))
\r
738 hashTuple.FileState = state;
\r
742 var fsInfo = FileInfoExtensions.FromPath(state.FilePath);
\r
743 hashTuple = new StateTuple {FileInfo = fsInfo, FileState = state};
\r
744 tuplesByPath[state.FilePath] = hashTuple;
\r
748 var tuplesByID = tuplesByPath.Values
\r
749 .Where(tuple => tuple.FileState != null && tuple.FileState.ObjectID!=null)
\r
750 .ToDictionary(tuple=>tuple.FileState.ObjectID,tuple=>tuple);//new Dictionary<Guid, StateTuple>();
\r
752 foreach (var info in infos)
\r
754 StateTuple hashTuple;
\r
755 var filePath = info.Item1;
\r
756 var objectInfo = info.Item2;
\r
757 var objectID = objectInfo.UUID;
\r
759 if (tuplesByID.TryGetValue(objectID, out hashTuple))
\r
761 hashTuple.ObjectInfo = objectInfo;
\r
763 else if (tuplesByPath.TryGetValue(filePath, out hashTuple))
\r
765 hashTuple.ObjectInfo = objectInfo;
\r
769 var fsInfo = FileInfoExtensions.FromPath(filePath);
\r
770 hashTuple= new StateTuple {FileInfo = fsInfo, ObjectInfo = objectInfo};
\r
771 tuplesByPath[filePath] = hashTuple;
\r
772 tuplesByID[objectInfo.UUID] = hashTuple;
\r
775 Debug.Assert(tuplesByPath.Values.All(t => t.HashesValid()));
\r
776 return tuplesByPath.Values;
\r
780 /// Returns the latest LastModified date from the list of objects, but only if it is before
\r
781 /// than the threshold value
\r
783 /// <param name="threshold"></param>
\r
784 /// <param name="cloudObjects"></param>
\r
785 /// <returns></returns>
\r
786 private static DateTime? GetLatestDateBefore(DateTime? threshold, IList<ObjectInfo> cloudObjects)
\r
788 DateTime? maxDate = null;
\r
789 if (cloudObjects!=null && cloudObjects.Count > 0)
\r
790 maxDate = cloudObjects.Max(obj => obj.Last_Modified);
\r
791 if (maxDate == null || maxDate == DateTime.MinValue)
\r
793 if (threshold == null || threshold == DateTime.MinValue || threshold > maxDate)
\r
799 /// Returns the latest LastModified date from the list of objects, but only if it is after
\r
800 /// the threshold value
\r
802 /// <param name="threshold"></param>
\r
803 /// <param name="cloudObjects"></param>
\r
804 /// <returns></returns>
\r
805 private static DateTime? GetLatestDateAfter(DateTime? threshold, IList<ObjectInfo> cloudObjects)
\r
807 DateTime? maxDate = null;
\r
808 if (cloudObjects!=null && cloudObjects.Count > 0)
\r
809 maxDate = cloudObjects.Max(obj => obj.Last_Modified);
\r
810 if (maxDate == null || maxDate == DateTime.MinValue)
\r
812 if (threshold == null || threshold == DateTime.MinValue || threshold < maxDate)
\r
817 readonly AccountsDifferencer _differencer = new AccountsDifferencer();
\r
818 private bool _pause;
\r
820 const string MERKLE_EMPTY = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
\r
821 const string MD5_EMPTY = "d41d8cd98f00b204e9800998ecf8427e";
\r
824 private void ReportConflictForMismatch(string localFilePath)
\r
826 if (String.IsNullOrWhiteSpace(localFilePath))
\r
827 throw new ArgumentNullException("localFilePath");
\r
828 Contract.EndContractBlock();
\r
830 StatusKeeper.SetFileState(localFilePath, FileStatus.Conflict, FileOverlayStatus.Conflict, "File changed at the server");
\r
831 UpdateStatus(PithosStatus.HasConflicts);
\r
832 var message = String.Format("Conflict detected for file {0}", localFilePath);
\r
834 StatusNotification.NotifyChange(message, TraceLevel.Warning);
\r
839 /// Notify the UI to update the visual status
\r
841 /// <param name="status"></param>
\r
842 private void UpdateStatus(PithosStatus status)
\r
846 StatusNotification.SetPithosStatus(status);
\r
847 //StatusNotification.Notify(new Notification());
\r
849 catch (Exception exc)
\r
851 //Failure is not critical, just log it
\r
852 Log.Warn("Error while updating status", exc);
\r
856 private static void CreateContainerFolders(AccountInfo accountInfo, IEnumerable<ContainerInfo> containers)
\r
858 var containerPaths = from container in containers
\r
859 let containerPath = Path.Combine(accountInfo.AccountPath, container.Name)
\r
860 where container.Name != FolderConstants.TrashContainer && !Directory.Exists(containerPath)
\r
861 select containerPath;
\r
863 foreach (var path in containerPaths)
\r
865 Directory.CreateDirectory(path);
\r
869 public void AddAccount(AccountInfo accountInfo)
\r
871 //Avoid adding a duplicate accountInfo
\r
872 _accounts.TryAdd(accountInfo.AccountKey, accountInfo);
\r
875 public void RemoveAccount(AccountInfo accountInfo)
\r
877 AccountInfo account;
\r
878 _accounts.TryRemove(accountInfo.AccountKey, out account);
\r
880 SnapshotDifferencer differencer;
\r
881 _differencer.Differencers.TryRemove(accountInfo.AccountKey, out differencer);
\r