X-Git-Url: https://code.grnet.gr/git/pithos-ms-client/blobdiff_plain/a27aa4479e60923c850cab41a5b16704805dbb08..9c6d31931196c90864d509843bfac0e62dc965bc:/trunk/Pithos.Core/Agents/NetworkAgent.cs diff --git a/trunk/Pithos.Core/Agents/NetworkAgent.cs b/trunk/Pithos.Core/Agents/NetworkAgent.cs index b8c9dcc..6ce712a 100644 --- a/trunk/Pithos.Core/Agents/NetworkAgent.cs +++ b/trunk/Pithos.Core/Agents/NetworkAgent.cs @@ -1,131 +1,425 @@ -using System; +// ----------------------------------------------------------------------- +// +// Copyright 2011 GRNET S.A. All rights reserved. +// +// Redistribution and use in source and binary forms, with or +// without modification, are permitted provided that the following +// conditions are met: +// +// 1. Redistributions of source code must retain the above +// copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following +// disclaimer in the documentation and/or other materials +// provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS +// OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF +// USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED +// AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +// LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +// ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. +// +// The views and conclusions contained in the software and +// documentation are those of the authors and should not be +// interpreted as representing official policies, either expressed +// or implied, of GRNET S.A. +// +// ----------------------------------------------------------------------- + +using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.ComponentModel.Composition; using System.Diagnostics; using System.Diagnostics.Contracts; using System.IO; using System.Linq; -using System.Text; +using System.Net; using System.Threading; using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; +using Castle.ActiveRecord; using Pithos.Interfaces; using Pithos.Network; +using log4net; namespace Pithos.Core.Agents { + //TODO: Ensure all network operations use exact casing. Pithos is case sensitive [Export] public class NetworkAgent { private Agent _agent; - [Import] + //A separate agent is used to execute delete actions immediatelly; + private ActionBlock _deleteAgent; + readonly ConcurrentDictionary _deletedFiles=new ConcurrentDictionary(); + + + private readonly ManualResetEventSlim _pauseAgent = new ManualResetEventSlim(true); + + [System.ComponentModel.Composition.Import] public IStatusKeeper StatusKeeper { get; set; } public IStatusNotification StatusNotification { get; set; } - [Import] - public ICloudClient CloudClient { get; set; } - [Import] - public FileAgent FileAgent {get;set;} + private static readonly ILog Log = LogManager.GetLogger("NetworkAgent"); - /* - [Import] - public IPithosWorkflow Workflow { get; set; } -*/ + private readonly ConcurrentBag _accounts = new ConcurrentBag(); + [System.ComponentModel.Composition.Import] + public IPithosSettings Settings { get; set; } - public string PithosContainer { get; set; } - public string TrashContainer { get; private set; } + private bool _firstPoll = true; + private TaskCompletionSource _tcs; - public int BlockSize { get; set; } - public string BlockHash { get; set; } - - - public void Start(string pithosContainer, string trashContainer, int blockSize, string blockHash) + public void Start() { - if (String.IsNullOrWhiteSpace(pithosContainer)) - throw new ArgumentNullException("pithosContainer"); - if (String.IsNullOrWhiteSpace(trashContainer)) - throw new ArgumentNullException("trashContainer"); - Contract.EndContractBlock(); - - PithosContainer = pithosContainer; - TrashContainer = trashContainer; - BlockSize = blockSize; - BlockHash = blockHash; - - + _firstPoll = true; _agent = Agent.Start(inbox => { Action loop = null; loop = () => { + _pauseAgent.Wait(); var message = inbox.Receive(); - -/* - var process=Process(message); + var process=message.Then(Process,inbox.CancellationToken); inbox.LoopAsync(process, loop); + }; + loop(); + }); + + _deleteAgent = new ActionBlock(message =>ProcessDelete(message),new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism=4}); + /* + Action loop = null; + loop = () => + { + var message = inbox.Receive(); + var process = message.Then(ProcessDelete,inbox.CancellationToken); + inbox.LoopAsync(process, loop); + }; + loop(); */ -/* - process1.ContinueWith(t => + } + + private async Task Process(CloudAction action) + { + if (action == null) + throw new ArgumentNullException("action"); + if (action.AccountInfo==null) + throw new ArgumentException("The action.AccountInfo is empty","action"); + Contract.EndContractBlock(); + + var accountInfo = action.AccountInfo; + + using (log4net.ThreadContext.Stacks["NETWORK"].Push("PROCESS")) + { + Log.InfoFormat("[ACTION] Start Processing {0}", action); + + var cloudFile = action.CloudFile; + var downloadPath = action.GetDownloadPath(); + + try + { + + if (action.Action == CloudActionType.DeleteCloud) { - inbox.DoAsync(loop); - if (t.IsFaulted) + //Redirect deletes to the delete agent + _deleteAgent.Post((CloudDeleteAction)action); + } + if (IsDeletedFile(action)) + { + //Clear the status of already deleted files to avoid reprocessing + if (action.LocalFile != null) + this.StatusKeeper.ClearFileStatus(action.LocalFile.FullName); + } + else + { + switch (action.Action) { - var ex = t.Exception.InnerException; - if (ex is OperationCanceledException) - inbox.Stop(); + case CloudActionType.UploadUnconditional: + //Abort if the file was deleted before we reached this point + await UploadCloudFile(action); + break; + case CloudActionType.DownloadUnconditional: + await DownloadCloudFile(accountInfo, cloudFile, downloadPath); + break; + case CloudActionType.RenameCloud: + var moveAction = (CloudMoveAction) action; + RenameCloudFile(accountInfo, moveAction); + break; + case CloudActionType.MustSynch: + if (!File.Exists(downloadPath) && !Directory.Exists(downloadPath)) + { + await DownloadCloudFile(accountInfo, cloudFile, downloadPath); + } + else + { + await SyncFiles(accountInfo, action); + } + break; } - }); -*/ - //inbox.DoAsync(loop); + } + Log.InfoFormat("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile, + action.CloudFile.Name); + } + catch (WebException exc) + { + Log.ErrorFormat("[WEB ERROR] {0} : {1} -> {2} due to exception\r\n{3}", action.Action, action.LocalFile, action.CloudFile, exc); + } + catch (OperationCanceledException) + { + throw; + } + catch (DirectoryNotFoundException) + { + Log.ErrorFormat("{0} : {1} -> {2} failed because the directory was not found.\n Rescheduling a delete", + action.Action, action.LocalFile, action.CloudFile); + //Post a delete action for the missing file + Post(new CloudDeleteAction(action)); + } + catch (FileNotFoundException) + { + Log.ErrorFormat("{0} : {1} -> {2} failed because the file was not found.\n Rescheduling a delete", + action.Action, action.LocalFile, action.CloudFile); + //Post a delete action for the missing file + Post(new CloudDeleteAction(action)); + } + catch (Exception exc) + { + Log.ErrorFormat("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}", + action.Action, action.LocalFile, action.CloudFile, exc); + _agent.Post(action); + } + } + } - var process = message.ContinueWith(t => - { - var action = t.Result; - - Process(action); - inbox.DoAsync(loop); - }); + /// + /// Processes cloud delete actions + /// + /// The delete action to execute + /// + /// + /// When a file/folder is deleted locally, we must delete it ASAP from the server and block any download + /// operations that may be in progress. + /// + /// A separate agent is used to process deletes because the main agent may be busy with a long operation. + /// + /// + private async Task ProcessDelete(CloudDeleteAction action) + { + if (action == null) + throw new ArgumentNullException("action"); + if (action.AccountInfo==null) + throw new ArgumentException("The action.AccountInfo is empty","action"); + Contract.EndContractBlock(); + + var accountInfo = action.AccountInfo; - process.ContinueWith(t => + using (log4net.ThreadContext.Stacks["NETWORK"].Push("PROCESS")) + { + Log.InfoFormat("[ACTION] Start Processing {0}", action); + + var cloudFile = action.CloudFile; + + try + { + //Acquire a lock on the deleted file to prevent uploading/downloading operations from the normal + //agent + using (var gate = NetworkGate.Acquire(action.LocalFile.FullName, NetworkOperation.Deleting)) { - inbox.DoAsync(loop); - if (t.IsFaulted) - { - var ex = t.Exception.InnerException; - if (ex is OperationCanceledException) - inbox.Stop(); - } - }); + //Add the file URL to the deleted files list + var key = GetFileKey(action.CloudFile); + _deletedFiles[key] = DateTime.Now; - }; - loop(); - }); + _pauseAgent.Reset(); + // and then delete the file from the server + DeleteCloudFile(accountInfo, cloudFile); + + Log.InfoFormat("[ACTION] End Delete {0}:{1}->{2}", action.Action, action.LocalFile, + action.CloudFile.Name); + } + } + catch (WebException exc) + { + Log.ErrorFormat("[WEB ERROR] {0} : {1} -> {2} due to exception\r\n{3}", action.Action, action.LocalFile, action.CloudFile, exc); + } + catch (OperationCanceledException) + { + throw; + } + catch (DirectoryNotFoundException) + { + Log.ErrorFormat("{0} : {1} -> {2} failed because the directory was not found.\n Rescheduling a delete", + action.Action, action.LocalFile, action.CloudFile); + //Repost a delete action for the missing file + _deleteAgent.Post(action); + } + catch (FileNotFoundException) + { + Log.ErrorFormat("{0} : {1} -> {2} failed because the file was not found.\n Rescheduling a delete", + action.Action, action.LocalFile, action.CloudFile); + //Post a delete action for the missing file + _deleteAgent.Post(action); + } + catch (Exception exc) + { + Log.ErrorFormat("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}", + action.Action, action.LocalFile, action.CloudFile, exc); + + _deleteAgent.Post(action); + } + finally + { + if (_deleteAgent.InputCount == 0) + _pauseAgent.Set(); + + } + } + } + + private static string GetFileKey(ObjectInfo info) + { + var key = String.Format("{0}/{1}/{2}", info.Account, info.Container,info.Name); + return key; } + private async Task SyncFiles(AccountInfo accountInfo,CloudAction action) + { + if (accountInfo == null) + throw new ArgumentNullException("accountInfo"); + if (action==null) + throw new ArgumentNullException("action"); + if (action.LocalFile==null) + throw new ArgumentException("The action's local file is not specified","action"); + if (!Path.IsPathRooted(action.LocalFile.FullName)) + throw new ArgumentException("The action's local file path must be absolute","action"); + if (action.CloudFile== null) + throw new ArgumentException("The action's cloud file is not specified", "action"); + Contract.EndContractBlock(); + + var localFile = action.LocalFile; + var cloudFile = action.CloudFile; + var downloadPath=action.LocalFile.GetProperCapitalization(); + + var cloudHash = cloudFile.Hash.ToLower(); + var localHash = action.LocalHash.Value.ToLower(); + var topHash = action.TopHash.Value.ToLower(); + + //Not enough to compare only the local hashes, also have to compare the tophashes + + //If any of the hashes match, we are done + if ((cloudHash == localHash || cloudHash == topHash)) + { + Log.InfoFormat("Skipping {0}, hashes match",downloadPath); + return; + } + + //The hashes DON'T match. We need to sync + var lastLocalTime = localFile.LastWriteTime; + var lastUpTime = cloudFile.Last_Modified; + + //If the local file is newer upload it + if (lastUpTime <= lastLocalTime) + { + //It probably means it was changed while the app was down + UploadCloudFile(action); + } + else + { + //It the cloud file has a later date, it was modified by another user or computer. + //We need to check the local file's status + var status = StatusKeeper.GetFileStatus(downloadPath); + switch (status) + { + case FileStatus.Unchanged: + //If the local file's status is Unchanged, we can go on and download the newer cloud file + await DownloadCloudFile(accountInfo,cloudFile,downloadPath); + break; + case FileStatus.Modified: + //If the local file is Modified, we may have a conflict. In this case we should mark the file as Conflict + //We can't ensure that a file modified online since the last time will appear as Modified, unless we + //index all files before we start listening. + case FileStatus.Created: + //If the local file is Created, it means that the local and cloud files aren't related, + // yet they have the same name. + + //In both cases we must mark the file as in conflict + ReportConflict(downloadPath); + break; + default: + //Other cases should never occur. Mark them as Conflict as well but log a warning + ReportConflict(downloadPath); + Log.WarnFormat("Unexcepted status {0} for file {1}->{2}", status, + downloadPath, action.CloudFile.Name); + break; + } + } + } + + private void ReportConflict(string downloadPath) + { + if (String.IsNullOrWhiteSpace(downloadPath)) + throw new ArgumentNullException("downloadPath"); + Contract.EndContractBlock(); + + StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict); + var message = String.Format("Conflict detected for file {0}", downloadPath); + Log.Warn(message); + StatusNotification.NotifyChange(message, TraceLevel.Warning); + } public void Post(CloudAction cloudAction) { if (cloudAction == null) throw new ArgumentNullException("cloudAction"); + if (cloudAction.AccountInfo==null) + throw new ArgumentException("The CloudAction.AccountInfo is empty","cloudAction"); Contract.EndContractBlock(); - + + _pauseAgent.Wait(); + //If the action targets a local file, add a treehash calculation - if (cloudAction.LocalFile != null) + if (!(cloudAction is CloudDeleteAction) && cloudAction.LocalFile as FileInfo != null) { - cloudAction.TopHash = new Lazy(() => Signature.CalculateTreeHashAsync(cloudAction.LocalFile, - BlockSize, BlockHash).Result - .TopHash.ToHashString()); - + var accountInfo = cloudAction.AccountInfo; + var localFile = (FileInfo) cloudAction.LocalFile; + if (localFile.Length > accountInfo.BlockSize) + cloudAction.TopHash = + new Lazy(() => Signature.CalculateTreeHashAsync(localFile, + accountInfo.BlockSize, + accountInfo.BlockHash).Result + .TopHash.ToHashString()); + else + { + cloudAction.TopHash = new Lazy(() => cloudAction.LocalHash.Value); + } + } + else + { + //The hash for a directory is the empty string + cloudAction.TopHash = new Lazy(() => String.Empty); } - _agent.Post(cloudAction); + + if (cloudAction is CloudDeleteAction) + _deleteAgent.Post((CloudDeleteAction)cloudAction); + else + _agent.Post(cloudAction); } - class ObjectInfoByNameComparer:IEqualityComparer + /* class ObjectInfoByNameComparer:IEqualityComparer { public bool Equals(ObjectInfo x, ObjectInfo y) { @@ -136,572 +430,816 @@ namespace Pithos.Core.Agents { return obj.Name.ToLower().GetHashCode(); } - } + }*/ - public Task ProcessRemoteFiles(string accountPath,DateTime? since=null) - { + public void SynchNow() + { + if (_tcs!=null) + _tcs.SetResult(true); + else + { + //TODO: This may be OK for testing purposes, but we have no guarantee that it will + //work properly in production + PollRemoteFiles(repeat:false); + } + } - Trace.CorrelationManager.StartLogicalOperation(); - Trace.TraceInformation("[LISTENER] Scheduled"); - var listObjects = Task.Factory.StartNewDelayed(10000).ContinueWith(t => - CloudClient.ListObjects(PithosContainer,since)); + //Remote files are polled periodically. Any changes are processed + public async Task PollRemoteFiles(DateTime? since = null,bool repeat=true) + { - DateTime nextSince = DateTime.Now.AddSeconds(-1); + _tcs = new TaskCompletionSource(); + var wait = TaskEx.Delay(TimeSpan.FromSeconds(Settings.PollingInterval), _agent.CancellationToken); + var signaledTask=await TaskEx.WhenAny(_tcs.Task,wait); + //If polling is signalled by SynchNow, ignore the since tag + if (signaledTask is Task) + since = null; - var enqueueFiles = listObjects.ContinueWith(task => + using (log4net.ThreadContext.Stacks["Retrieve Remote"].Push("All accounts")) { - if (task.IsFaulted) + + try { - //ListObjects failed at this point, need to reschedule - Trace.TraceError("[FAIL] ListObjects in ProcessRemoteFiles with {0}", task.Exception); - ProcessRemoteFiles(accountPath, since); - return; - } - Trace.CorrelationManager.StartLogicalOperation("Listener"); - Trace.TraceInformation("[LISTENER] Start Processing"); + //Next time we will check for all changes since the current check minus 1 second + //This is done to ensure there are no discrepancies due to clock differences + DateTime nextSince = DateTime.Now.AddSeconds(-1); - var remoteObjects = task.Result; + var tasks = from accountInfo in _accounts + select ProcessAccountFiles(accountInfo, since); - var remote=from info in remoteObjects - let name=info.Name - where !name.EndsWith(".ignore",StringComparison.InvariantCultureIgnoreCase) && - !name.StartsWith("fragments/",StringComparison.InvariantCultureIgnoreCase) - select info; + await TaskEx.WhenAll(tasks.ToList()); - var commonObjects = new List>(); - var remoteOnly = new List(); - - //In order to avoid multiple iterations over the files, we iterate only once - //over the remote files - foreach (var objectInfo in remote) + _firstPoll = false; + if (repeat) + PollRemoteFiles(nextSince); + } + catch (Exception ex) { - var relativePath= objectInfo.Name.RelativeUrlToFilePath();// fileInfo.AsRelativeUrlTo(FileAgent.RootPath); - //and remove any matching objects from the list, adding them to the commonObjects list - if (FileAgent.Exists(relativePath)) - { - var localFile = FileAgent.GetFileInfo(relativePath); - var state=FileState.FindByFilePath(localFile.FullName); - commonObjects.Add(Tuple.Create(objectInfo, localFile,state)); - } - else - //If there is no match we add them to the localFiles list - remoteOnly.Add(objectInfo); + Log.ErrorFormat("Error while processing accounts\r\n{0}",ex); + //In case of failure retry with the same parameter + PollRemoteFiles(since); } + + + } + } - //At the end of the iteration, the *remote* list will contain the files that exist - //only on the server + public async Task ProcessAccountFiles(AccountInfo accountInfo,DateTime? since=null) + { + if (accountInfo==null) + throw new ArgumentNullException("accountInfo"); + if (String.IsNullOrWhiteSpace(accountInfo.AccountPath)) + throw new ArgumentException("The AccountInfo.AccountPath is empty","accountInfo"); + Contract.EndContractBlock(); - //Remote files should be downloaded - var actionsForRemote = from upFile in remoteOnly - select new CloudAction(CloudActionType.DownloadUnconditional,upFile); + using (log4net.ThreadContext.Stacks["Retrieve Remote"].Push(accountInfo.UserName)) + { + Log.Info("Scheduled"); + var client=new CloudFilesClient(accountInfo); - //Common files should be checked on a per-case basis to detect differences, which is newer - var actionsForCommon = from pair in commonObjects - let objectInfo = pair.Item1 - let localFile = pair.Item2 - let state=pair.Item3 - select new CloudAction(CloudActionType.MustSynch, - localFile, objectInfo,state); + var containers = client.ListContainers(accountInfo.UserName); + CreateContainerFolders(accountInfo, containers); + + try + { + _pauseAgent.Wait(); + //Get the poll time now. We may miss some deletions but it's better to keep a file that was deleted + //than delete a file that was created while we were executing the poll + var pollTime = DateTime.Now; + + //Get the list of server objects changed since the last check + //The name of the container is passed as state in order to create a dictionary of tasks in a subsequent step + var listObjects = from container in containers + select Task>.Factory.StartNew(_ => + client.ListObjects(accountInfo.UserName,container.Name, since),container.Name); + - //Collect all the actions - var allActions = actionsForRemote.Union(actionsForCommon); + var listTasks = await Task.Factory.WhenAll(listObjects.ToArray()); - //And remove those that are already being processed by the agent - var distinctActions =allActions - .Except(_agent.GetEnumerable(), new PithosMonitor.LocalFileComparer()) - .ToList(); + using (log4net.ThreadContext.Stacks["SCHEDULE"].Push("Process Results")) + { + var dict = listTasks.ToDictionary(t => t.AsyncState); + + //Get all non-trash objects. Remember, the container name is stored in AsyncState + var remoteObjects = from objectList in listTasks + where (string) objectList.AsyncState != "trash" + from obj in objectList.Result + select obj; + + //TODO: Change the way deleted objects are detected. + //The list operation returns all existing objects so we could detect deleted remote objects + //by detecting objects that exist only locally. There are several cases where this is NOT the case: + //1. The first time the application runs, as there may be files that were added while + // the application was down. + //2. An object that is currently being uploaded will not appear in the remote list + // until the upload finishes. + // SOLUTION 1: Check the upload/download queue for the file + // SOLUTION 2: Check the SQLite states for the file's entry. If it is being uploaded, + // or its last modification was after the current poll, don't delete it. This way we don't + // delete objects whose upload finished too late to be included in the list. + //We need to detect and protect against such situations + //TODO: Does FileState have a LastModification field? + //TODO: How do we update the LastModification field? Do we need to add SQLite triggers? + // Do we need to use a proper SQLite schema? + // We can create a trigger with + // CREATE TRIGGER IF NOT EXISTS update_last_modified UPDATE ON FileState FOR EACH ROW + // BEGIN + // UPDATE FileState SET LastModification=datetime('now') WHERE Id=old.Id; + // END; + // + //NOTE: Some files may have been deleted remotely while the application was down. + // We DO have to delete those files. Checking the trash makes it easy to detect them, + // Otherwise, we can't be really sure whether we need to upload or delete + // the local-only files. + // SOLUTION 1: Ask the user when such a local-only file is detected during the first poll. + // SOLUTION 2: Mark conflict and ask the user as in #1 + + var trashObjects = dict["trash"].Result; + //var sharedObjects = ((Task>) task.Result[2]).Result; + + //Items with the same name, hash may be both in the container and the trash + //Don't delete items that exist in the container + var realTrash = from trash in trashObjects + where + !remoteObjects.Any( + info => info.Name == trash.Name && info.Hash == trash.Hash) + select trash; + ProcessTrashedFiles(accountInfo, realTrash); + + + var cleanRemotes = (from info in remoteObjects + //.Union(sharedObjects) + let name = info.Name + where !name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase) && + !name.StartsWith(FolderConstants.CacheFolder + "/", + StringComparison.InvariantCultureIgnoreCase) + select info).ToList(); + + + + ProcessDeletedFiles(accountInfo, cleanRemotes, pollTime); + + //Create a list of actions from the remote files + var allActions = ObjectsToActions(accountInfo, cleanRemotes); + + + //var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName); + + //And remove those that are already being processed by the agent + var distinctActions = allActions + .Except(_agent.GetEnumerable(), new PithosMonitor.LocalFileComparer()) + .ToList(); + + //Queue all the actions + foreach (var message in distinctActions) + { + Post(message); + } - //Queue all the actions - foreach (var message in distinctActions) + Log.Info("[LISTENER] End Processing"); + } + } + catch (Exception ex) { - Post(message); + Log.ErrorFormat("[FAIL] ListObjects for{0} in ProcessRemoteFiles with {1}", accountInfo.UserName, ex); + return; } + Log.Info("[LISTENER] Finished"); - if(remoteOnly.Count>0) - StatusNotification.NotifyChange(String.Format("Processing {0} new files", remoteOnly.Count)); - - Trace.TraceInformation("[LISTENER] End Processing"); - Trace.CorrelationManager.StopLogicalOperation(); + } + } - }); + /// + /// Deletes local files that are not found in the list of cloud files + /// + /// + /// + /// + private void ProcessDeletedFiles(AccountInfo accountInfo, IEnumerable cloudFiles, DateTime pollTime) + { + if (accountInfo == null) + throw new ArgumentNullException("accountInfo"); + if (String.IsNullOrWhiteSpace(accountInfo.AccountPath)) + throw new ArgumentException("The AccountInfo.AccountPath is empty", "accountInfo"); + if (cloudFiles == null) + throw new ArgumentNullException("cloudFiles"); + Contract.EndContractBlock(); - var loop = enqueueFiles.ContinueWith(t => + //Check the Modified date to ensure that were just created and haven't been uploaded yet + //NOTE: The NHibernate LINQ provider doesn't support custom functions so we need to break the query + //in two steps + //NOTE: DON'T return files that are already in conflict. The first poll would mark them as + //"In Conflict" but subsequent polls would delete them + var deleteCandidates = (from state in FileState.Queryable + where + state.Modified <= pollTime + && state.FilePath.StartsWith(accountInfo.AccountPath) + && state.FileStatus != FileStatus.Conflict + select state).ToList(); + + var filesToDelete = (from deleteCandidate in deleteCandidates + let localFile = FileInfoExtensions.FromPath(deleteCandidate.FilePath) + let relativeFilePath = localFile.AsRelativeTo(accountInfo.AccountPath) + where !cloudFiles.Any(r => Path.Combine(r.Container, r.Name) == relativeFilePath) + select localFile).ToList(); + + //On the first run + if (_firstPoll) { - if (t.IsFaulted) + //Set the status of missing files to Conflict + foreach (var item in filesToDelete) { - Trace.TraceError("[LISTENER] Exception: {0}", t.Exception); + StatusKeeper.SetFileState(item.FullName, FileStatus.Conflict, FileOverlayStatus.Deleted); } - else + StatusNotification.NotifyConflicts(filesToDelete, String.Format("{0} local files are missing from Pithos, possibly because they were deleted",filesToDelete.Count)); + } + else + { + foreach (var item in filesToDelete) { - Trace.TraceInformation("[LISTENER] Finished"); + item.Delete(); + StatusKeeper.ClearFileStatus(item.FullName); } - ProcessRemoteFiles(accountPath, nextSince); - - }); - return loop; + StatusNotification.NotifyForFiles(filesToDelete, String.Format("{0} files were deleted",filesToDelete.Count),TraceLevel.Info); + } + } - - private Task Process(Task action) + private static void CreateContainerFolders(AccountInfo accountInfo, IEnumerable containers) { - return action.ContinueWith(t=> Process(t.Result)); - } + var containerPaths = from container in containers + let containerPath = Path.Combine(accountInfo.AccountPath, container.Name) + where container.Name != FolderConstants.TrashContainer && !Directory.Exists(containerPath) + select containerPath; + foreach (var path in containerPaths) + { + Directory.CreateDirectory(path); + } + } - private void Process(CloudAction action) + //Creates an appropriate action for each server file + private IEnumerable ObjectsToActions(AccountInfo accountInfo,IEnumerable remote) { - if (action==null) - throw new ArgumentNullException("action"); + if (remote==null) + throw new ArgumentNullException(); Contract.EndContractBlock(); + var fileAgent = GetFileAgent(accountInfo); - Trace.TraceInformation("[ACTION] Start Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name); - var localFile = action.LocalFile; - var cloudFile = action.CloudFile; - var downloadPath = (cloudFile == null) ? String.Empty - : Path.Combine(FileAgent.RootPath, cloudFile.Name.RelativeUrlToFilePath()); - - try + //In order to avoid multiple iterations over the files, we iterate only once + //over the remote files + foreach (var objectInfo in remote) { - switch (action.Action) + var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName); + //and remove any matching objects from the list, adding them to the commonObjects list + + if (fileAgent.Exists(relativePath)) { - case CloudActionType.UploadUnconditional: - UploadCloudFile(localFile, action.LocalHash.Value,action.TopHash.Value); - break; - case CloudActionType.DownloadUnconditional: - DownloadCloudFile(PithosContainer, new Uri(cloudFile.Name,UriKind.Relative), downloadPath); - break; - case CloudActionType.DeleteCloud: - DeleteCloudFile(cloudFile.Name); - break; - case CloudActionType.RenameCloud: - RenameCloudFile(action.OldFileName, action.NewPath, action.NewFileName); - break; - case CloudActionType.MustSynch: - if (File.Exists(downloadPath)) - { - var cloudHash = cloudFile.Hash; - var localHash = action.LocalHash.Value; - var topHash = action.TopHash.Value; - //Not enough to compare only the local hashes, also have to compare the tophashes - if (!cloudHash.Equals(localHash, StringComparison.InvariantCultureIgnoreCase) && - !cloudHash.Equals(topHash, StringComparison.InvariantCultureIgnoreCase)) - { - var lastLocalTime = localFile.LastWriteTime; - var lastUpTime = cloudFile.Last_Modified; - if (lastUpTime <= lastLocalTime) - { - //Local change while the app was down or Files in conflict - //Maybe need to store version as well, to check who has the latest version + //If a directory object already exists, we don't need to perform any other action + var localFile = fileAgent.GetFileSystemInfo(relativePath); + if (objectInfo.Content_Type == @"application/directory" && localFile is DirectoryInfo) + continue; + using (new SessionScope(FlushAction.Never)) + { + var state = StatusKeeper.GetStateByFilePath(localFile.FullName); + //FileState.FindByFilePath(localFile.FullName); + //Common files should be checked on a per-case basis to detect differences, which is newer - //StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict); - UploadCloudFile(localFile, action.LocalHash.Value,action.TopHash.Value); - } - else - { - var status = StatusKeeper.GetFileStatus(downloadPath); - switch (status) - { - case FileStatus.Unchanged: - //It he cloud file has a later date, it was modified by another user or computer. - //If the local file's status is Unchanged, we should go on and download the cloud file - DownloadCloudFile(PithosContainer, new Uri(action.CloudFile.Name,UriKind.Relative), downloadPath); - break; - case FileStatus.Modified: - //If the local file is Modified, we may have a conflict. In this case we should mark the file as Conflict - //We can't ensure that a file modified online since the last time will appear as Modified, unless we - //index all files before we start listening. - StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict); - break; - case FileStatus.Created: - //If the local file is Created, it means that the local and cloud files aren't related yet have the same name - //In this case we must mark the file as in conflict - //Other cases should never occur. Mark them as Conflict as well but log a warning - StatusKeeper.SetFileOverlayStatus(downloadPath,FileOverlayStatus.Conflict); - break; - default: - //If the local file is Created, it means that the local and cloud files aren't related yet have the same name - //In this case we must mark the file as in conflict - //Other cases should never occur. Mark them as Conflict as well but log a warning - StatusKeeper.SetFileOverlayStatus(downloadPath,FileOverlayStatus.Conflict); - Trace.TraceWarning("Unexcepted status {0} for file {1}->{2}",status,downloadPath,action.CloudFile.Name); - break; - } - } - } - } - else - DownloadCloudFile(PithosContainer, new Uri(action.CloudFile.Name,UriKind.Relative), downloadPath); - break; + yield return new CloudAction(accountInfo, CloudActionType.MustSynch, + localFile, objectInfo, state, accountInfo.BlockSize, + accountInfo.BlockHash); + } } - Trace.TraceInformation("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name); - } - catch (OperationCanceledException) - { - throw; - } - catch (Exception exc) - { - Trace.TraceError("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}", - action.Action, action.LocalFile, action.CloudFile, exc); + else + { + //If there is no match we add them to the localFiles list + //but only if the file is not marked for deletion + var targetFile = Path.Combine(accountInfo.AccountPath, relativePath); + var fileStatus = StatusKeeper.GetFileStatus(targetFile); + if (fileStatus != FileStatus.Deleted) + { + //Remote files should be downloaded + yield return new CloudDownloadAction(accountInfo,objectInfo); + } + } + } + } - _agent.Post(action); - } + private static FileAgent GetFileAgent(AccountInfo accountInfo) + { + return AgentLocator.Get(accountInfo.AccountPath); + } + private void ProcessTrashedFiles(AccountInfo accountInfo,IEnumerable trashObjects) + { + var fileAgent = GetFileAgent(accountInfo); + foreach (var trashObject in trashObjects) + { + var barePath = trashObject.RelativeUrlToFilePath(accountInfo.UserName); + //HACK: Assume only the "pithos" container is used. Must find out what happens when + //deleting a file from a different container + var relativePath = Path.Combine("pithos", barePath); + fileAgent.Delete(relativePath); + } } - - private void RenameCloudFile(string oldFileName, string newPath, string newFileName) + private void RenameCloudFile(AccountInfo accountInfo,CloudMoveAction action) { - if (String.IsNullOrWhiteSpace(oldFileName)) - throw new ArgumentNullException("oldFileName"); - if (String.IsNullOrWhiteSpace(oldFileName)) - throw new ArgumentNullException("newPath"); - if (String.IsNullOrWhiteSpace(oldFileName)) - throw new ArgumentNullException("newFileName"); + if (accountInfo==null) + throw new ArgumentNullException("accountInfo"); + if (action==null) + throw new ArgumentNullException("action"); + if (action.CloudFile==null) + throw new ArgumentException("CloudFile","action"); + if (action.LocalFile==null) + throw new ArgumentException("LocalFile","action"); + if (action.OldLocalFile==null) + throw new ArgumentException("OldLocalFile","action"); + if (action.OldCloudFile==null) + throw new ArgumentException("OldCloudFile","action"); Contract.EndContractBlock(); + + + var newFilePath = action.LocalFile.FullName; + + //How do we handle concurrent renames and deletes/uploads/downloads? + //* A conflicting upload means that a file was renamed before it had a chance to finish uploading + // This should never happen as the network agent executes only one action at a time + //* A conflicting download means that the file was modified on the cloud. While we can go on and complete + // the rename, there may be a problem if the file is downloaded in blocks, as subsequent block requests for the + // same name will fail. + // This should never happen as the network agent executes only one action at a time. + //* A conflicting delete can happen if the rename was followed by a delete action that didn't have the chance + // to remove the rename from the queue. + // We can probably ignore this case. It will result in an error which should be ignored + + //The local file is already renamed - this.StatusKeeper.SetFileOverlayStatus(newPath, FileOverlayStatus.Modified); + StatusKeeper.SetFileOverlayStatus(newFilePath, FileOverlayStatus.Modified); + - CloudClient.MoveObject(PithosContainer, oldFileName, PithosContainer, newFileName); + var account = action.CloudFile.Account ?? accountInfo.UserName; + var container = action.CloudFile.Container; + + var client = new CloudFilesClient(accountInfo); + //TODO: What code is returned when the source file doesn't exist? + client.MoveObject(account, container, action.OldCloudFile.Name, container, action.CloudFile.Name); - this.StatusKeeper.SetFileStatus(newPath, FileStatus.Unchanged); - this.StatusKeeper.SetFileOverlayStatus(newPath, FileOverlayStatus.Normal); - NativeMethods.RaiseChangeNotification(newPath); + StatusKeeper.SetFileStatus(newFilePath, FileStatus.Unchanged); + StatusKeeper.SetFileOverlayStatus(newFilePath, FileOverlayStatus.Normal); + NativeMethods.RaiseChangeNotification(newFilePath); } - private void DeleteCloudFile(string fileName) - { - if (String.IsNullOrWhiteSpace(fileName)) - throw new ArgumentNullException("fileName"); - if (Path.IsPathRooted(fileName)) - throw new ArgumentException("The fileName should not be rooted","fileName"); + private void DeleteCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile) + { + if (accountInfo == null) + throw new ArgumentNullException("accountInfo"); + if (cloudFile==null) + throw new ArgumentNullException("cloudFile"); + + if (String.IsNullOrWhiteSpace(cloudFile.Container)) + throw new ArgumentException("Invalid container", "cloudFile"); Contract.EndContractBlock(); + + var fileAgent = GetFileAgent(accountInfo); + + using ( log4net.ThreadContext.Stacks["DeleteCloudFile"].Push("Delete")) + { + var fileName= cloudFile.RelativeUrlToFilePath(accountInfo.UserName); + var info = fileAgent.GetFileSystemInfo(fileName); + var fullPath = info.FullName.ToLower(); - this.StatusKeeper.SetFileOverlayStatus(fileName, FileOverlayStatus.Modified); + StatusKeeper.SetFileOverlayStatus(fullPath, FileOverlayStatus.Modified); - CloudClient.MoveObject(PithosContainer, fileName, TrashContainer, fileName); + var account = cloudFile.Account ?? accountInfo.UserName; + var container = cloudFile.Container ;//?? FolderConstants.PithosContainer; - this.StatusKeeper.ClearFileStatus(fileName); - this.StatusKeeper.RemoveFileOverlayStatus(fileName); + var client = new CloudFilesClient(accountInfo); + client.DeleteObject(account, container, cloudFile.Name); + + StatusKeeper.ClearFileStatus(fullPath); + } } //Download a file. - private void DownloadCloudFile(string container, Uri relativeUrl, string localPath) + private async Task DownloadCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile , string filePath) { - if (String.IsNullOrWhiteSpace(container)) - throw new ArgumentNullException("container"); - if (relativeUrl==null) - throw new ArgumentNullException("relativeUrl"); - if (String.IsNullOrWhiteSpace(localPath)) - throw new ArgumentNullException("localPath"); - if (!Path.IsPathRooted(localPath)) - throw new ArgumentException("The localPath must be rooted", "localPath"); + if (accountInfo == null) + throw new ArgumentNullException("accountInfo"); + if (cloudFile == null) + throw new ArgumentNullException("cloudFile"); + if (String.IsNullOrWhiteSpace(cloudFile.Account)) + throw new ArgumentNullException("cloudFile"); + if (String.IsNullOrWhiteSpace(cloudFile.Container)) + throw new ArgumentNullException("cloudFile"); + if (String.IsNullOrWhiteSpace(filePath)) + throw new ArgumentNullException("filePath"); + if (!Path.IsPathRooted(filePath)) + throw new ArgumentException("The filePath must be rooted", "filePath"); Contract.EndContractBlock(); + var localPath = Interfaces.FileInfoExtensions.GetProperFilePathCapitalization(filePath); + var relativeUrl = new Uri(cloudFile.Name, UriKind.Relative); + var url = relativeUrl.ToString(); - if (url.EndsWith(".ignore",StringComparison.InvariantCultureIgnoreCase)) + if (cloudFile.Name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase)) return; + //Are we already downloading or uploading the file? using (var gate=NetworkGate.Acquire(localPath, NetworkOperation.Downloading)) { if (gate.Failed) return; //The file's hashmap will be stored in the same location with the extension .hashmap - //var hashPath = Path.Combine(FileAgent.FragmentsPath, relativePath + ".hashmap"); + //var hashPath = Path.Combine(FileAgent.CachePath, relativePath + ".hashmap"); - //Retrieve the hashmap from the server - var getHashMap = CloudClient.GetHashMap(container,url); - var downloadTask= getHashMap.ContinueWith(t => + var client = new CloudFilesClient(accountInfo); + var account = cloudFile.Account; + var container = cloudFile.Container; + + if (cloudFile.Content_Type == @"application/directory") + { + if (!Directory.Exists(localPath)) + Directory.CreateDirectory(localPath); + } + else { - var serverHash=t.Result; + //Retrieve the hashmap from the server + var serverHash = await client.GetHashMap(account, container, url); //If it's a small file - return serverHash.Hashes.Count == 1 + if (serverHash.Hashes.Count == 1) //Download it in one go - ? DownloadEntireFile(container, relativeUrl, localPath) + await + DownloadEntireFileAsync(accountInfo, client, cloudFile, relativeUrl, localPath, serverHash); //Otherwise download it block by block - : DownloadWithBlocks(container, relativeUrl, localPath, serverHash); - }); - - - - //Retrieve the object's metadata - var getInfo = downloadTask.ContinueWith(t => - CloudClient.GetObjectInfo(container, url)); - //And store it - var storeInfo = getInfo.ContinueWith(t => - StatusKeeper.StoreInfo(localPath, t.Result)); + else + await DownloadWithBlocks(accountInfo, client, cloudFile, relativeUrl, localPath, serverHash); - storeInfo.Wait(); - StatusNotification.NotifyChangedFile(localPath); + if (cloudFile.AllowedTo == "read") + { + var attributes = File.GetAttributes(localPath); + File.SetAttributes(localPath, attributes | FileAttributes.ReadOnly); + } + } + //Now we can store the object's metadata without worrying about ghost status entries + StatusKeeper.StoreInfo(localPath, cloudFile); + } } //Download a small file with a single GET operation - private Task DownloadEntireFile(string container, Uri relativeUrl, string localPath) + private async Task DownloadEntireFileAsync(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath,TreeHash serverHash) { - if (String.IsNullOrWhiteSpace(container)) - throw new ArgumentNullException("container"); + if (client == null) + throw new ArgumentNullException("client"); + if (cloudFile==null) + throw new ArgumentNullException("cloudFile"); if (relativeUrl == null) throw new ArgumentNullException("relativeUrl"); - if (String.IsNullOrWhiteSpace(localPath)) - throw new ArgumentNullException("localPath"); - if (!Path.IsPathRooted(localPath)) - throw new ArgumentException("The localPath must be rooted", "localPath"); + if (String.IsNullOrWhiteSpace(filePath)) + throw new ArgumentNullException("filePath"); + if (!Path.IsPathRooted(filePath)) + throw new ArgumentException("The localPath must be rooted", "filePath"); Contract.EndContractBlock(); + var localPath = Pithos.Interfaces.FileInfoExtensions.GetProperFilePathCapitalization(filePath); + //If the file already exists + if (File.Exists(localPath)) + { + //First check with MD5 as this is a small file + var localMD5 = Signature.CalculateMD5(localPath); + var cloudHash=serverHash.TopHash.ToHashString(); + if (localMD5==cloudHash) + return; + //Then check with a treehash + var localTreeHash = Signature.CalculateTreeHash(localPath, serverHash.BlockSize, serverHash.BlockHash); + var localHash = localTreeHash.TopHash.ToHashString(); + if (localHash==cloudHash) + return; + } + + var fileAgent = GetFileAgent(accountInfo); //Calculate the relative file path for the new file var relativePath = relativeUrl.RelativeUriToFilePath(); //The file will be stored in a temporary location while downloading with an extension .download - var tempPath = Path.Combine(FileAgent.FragmentsPath, relativePath + ".download"); + var tempPath = Path.Combine(fileAgent.CachePath, relativePath + ".download"); //Make sure the target folder exists. DownloadFileTask will not create the folder - var directoryPath = Path.GetDirectoryName(tempPath); - if (!Directory.Exists(directoryPath)) - Directory.CreateDirectory(directoryPath); + var tempFolder = Path.GetDirectoryName(tempPath); + if (!Directory.Exists(tempFolder)) + Directory.CreateDirectory(tempFolder); //Download the object to the temporary location - var getObject = CloudClient.GetObject(container, relativeUrl.ToString(), tempPath).ContinueWith(t => - { - //And move it to its actual location once downloading is finished - if (File.Exists(localPath)) - File.Replace(tempPath,localPath,null,true); - else - File.Move(tempPath,localPath); - }); - return getObject; + await client.GetObject(cloudFile.Account, cloudFile.Container, relativeUrl.ToString(), tempPath); + + //Create the local folder if it doesn't exist (necessary for shared objects) + var localFolder = Path.GetDirectoryName(localPath); + if (!Directory.Exists(localFolder)) + Directory.CreateDirectory(localFolder); + //And move it to its actual location once downloading is finished + if (File.Exists(localPath)) + File.Replace(tempPath,localPath,null,true); + else + File.Move(tempPath,localPath); + //Notify listeners that a local file has changed + StatusNotification.NotifyChangedFile(localPath); + + } - public Task DownloadWithBlocks(string container,Uri relativeUrl, string localPath,TreeHash serverHash) + //Download a file asynchronously using blocks + public async Task DownloadWithBlocks(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath, TreeHash serverHash) { - if (String.IsNullOrWhiteSpace(container)) - throw new ArgumentNullException("container"); + if (client == null) + throw new ArgumentNullException("client"); + if (cloudFile == null) + throw new ArgumentNullException("cloudFile"); if (relativeUrl == null) throw new ArgumentNullException("relativeUrl"); - if (String.IsNullOrWhiteSpace(localPath)) - throw new ArgumentNullException("localPath"); - if (!Path.IsPathRooted(localPath)) - throw new ArgumentException("The localPath must be rooted", "localPath"); - if(serverHash==null) + if (String.IsNullOrWhiteSpace(filePath)) + throw new ArgumentNullException("filePath"); + if (!Path.IsPathRooted(filePath)) + throw new ArgumentException("The filePath must be rooted", "filePath"); + if (serverHash == null) throw new ArgumentNullException("serverHash"); Contract.EndContractBlock(); - + + var fileAgent = GetFileAgent(accountInfo); + var localPath = Interfaces.FileInfoExtensions.GetProperFilePathCapitalization(filePath); + //Calculate the relative file path for the new file var relativePath = relativeUrl.RelativeUriToFilePath(); - //The file will be stored in a temporary location while downloading with an extension .download - var tempPath = Path.Combine(FileAgent.FragmentsPath, relativePath + ".download"); - var directoryPath = Path.GetDirectoryName(tempPath); - if (!Directory.Exists(directoryPath)) - Directory.CreateDirectory(directoryPath); - - //If the local file exists we should make a copy of it to the - //fragments folder, unless a newer temp copy already exists, which - //means there is an interrupted download - if (ShouldCopy(localPath, tempPath)) - File.Copy(localPath, tempPath, true); + var blockUpdater = new BlockUpdater(fileAgent.CachePath, localPath, relativePath, serverHash); - //Set the size of the file to the size specified in the treehash - //This will also create an empty file if the file doesn't exist - SetFileSize(tempPath, serverHash.Bytes); - - return Task.Factory.StartNew(() => - { - //Calculate the temp file's treehash - var treeHash = Signature.CalculateTreeHashAsync(tempPath, this.BlockSize,BlockHash).Result; + + + //Calculate the file's treehash + var treeHash = await Signature.CalculateTreeHashAsync(localPath, serverHash.BlockSize, serverHash.BlockHash); - //And compare it with the server's hash - var upHashes = serverHash.GetHashesAsStrings(); - var localHashes = treeHash.HashDictionary; - for (int i = 0; i < upHashes.Length; i++) + //And compare it with the server's hash + var upHashes = serverHash.GetHashesAsStrings(); + var localHashes = treeHash.HashDictionary; + for (int i = 0; i < upHashes.Length; i++) + { + //For every non-matching hash + var upHash = upHashes[i]; + if (!localHashes.ContainsKey(upHash)) { - //For every non-matching hash - if (!localHashes.ContainsKey(upHashes[i])) + if (blockUpdater.UseOrphan(i, upHash)) { - Trace.TraceInformation("[BLOCK GET] START {0} of {1} for {2}",i,upHashes.Length,localPath); - var start = i*BlockSize; - long? end = null; - if (i < upHashes.Length - 1 ) - end= ((i + 1)*BlockSize) ; + Log.InfoFormat("[BLOCK GET] ORPHAN FOUND for {0} of {1} for {2}", i, upHashes.Length, localPath); + continue; + } + Log.InfoFormat("[BLOCK GET] START {0} of {1} for {2}", i, upHashes.Length, localPath); + var start = i*serverHash.BlockSize; + //To download the last block just pass a null for the end of the range + long? end = null; + if (i < upHashes.Length - 1 ) + end= ((i + 1)*serverHash.BlockSize) ; - //Get its block - var blockTask = CloudClient.GetBlock(container, relativeUrl, - start, end); + //Download the missing block + var block = await client.GetBlock(cloudFile.Account, cloudFile.Container, relativeUrl, start, end); + + //and store it + blockUpdater.StoreBlock(i, block); - blockTask.ContinueWith(b => - { - //And apply it to the temp file - var buffer = b.Result; - var stream =FileAsync.OpenWrite(tempPath); - stream.Seek(start,SeekOrigin.Begin); - return stream.WriteAsync(buffer, 0, buffer.Length) - .ContinueWith(s => stream.Close()); - - }).Unwrap() - .Wait(); - Trace.TraceInformation("[BLOCK GET] FINISH {0} of {1} for {2}", i, upHashes.Length, localPath); - } + Log.InfoFormat("[BLOCK GET] FINISH {0} of {1} for {2}", i, upHashes.Length, localPath); } - + } - //Replace the existing file with the temp - if (File.Exists(localPath)) - File.Replace(tempPath, localPath, null, true); - else - File.Move(tempPath, localPath); - Trace.TraceInformation("[BLOCK GET] COMPLETE {0}", localPath); - }); + //Want to avoid notifications if no changes were made + var hasChanges = blockUpdater.HasBlocks; + blockUpdater.Commit(); + + if (hasChanges) + //Notify listeners that a local file has changed + StatusNotification.NotifyChangedFile(localPath); + + Log.InfoFormat("[BLOCK GET] COMPLETE {0}", localPath); } - //Change the file's size, possibly truncating or adding to it - private static void SetFileSize(string filePath, long fileSize) + + private async Task UploadCloudFile(CloudAction action) { - if (String.IsNullOrWhiteSpace(filePath)) - throw new ArgumentNullException("filePath"); - if (!Path.IsPathRooted(filePath)) - throw new ArgumentException("The filePath must be rooted", "filePath"); - if (fileSize<0) - throw new ArgumentOutOfRangeException("fileSize"); + if (action == null) + throw new ArgumentNullException("action"); Contract.EndContractBlock(); - using (var stream = File.Open(filePath, FileMode.OpenOrCreate, FileAccess.Write)) + try { - stream.SetLength(fileSize); - } - } + var accountInfo = action.AccountInfo; - //Check whether we should copy the local file to a temp path - private static bool ShouldCopy(string localPath, string tempPath) - { - //No need to copy if there is no file - if (!File.Exists(localPath)) - return false; + var fileInfo = action.LocalFile; - //If there is no temp file, go ahead and copy - if (!File.Exists(tempPath)) - return true; + if (fileInfo.Extension.Equals("ignore", StringComparison.InvariantCultureIgnoreCase)) + return; + + var relativePath = fileInfo.AsRelativeTo(accountInfo.AccountPath); + if (relativePath.StartsWith(FolderConstants.OthersFolder)) + { + var parts = relativePath.Split('\\'); + var accountName = parts[1]; + var oldName = accountInfo.UserName; + var absoluteUri = accountInfo.StorageUri.AbsoluteUri; + var nameIndex = absoluteUri.IndexOf(oldName); + var root = absoluteUri.Substring(0, nameIndex); + + accountInfo = new AccountInfo + { + UserName = accountName, + AccountPath = Path.Combine(accountInfo.AccountPath, parts[0], parts[1]), + StorageUri = new Uri(root + accountName), + BlockHash = accountInfo.BlockHash, + BlockSize = accountInfo.BlockSize, + Token = accountInfo.Token + }; + } - //If there is a temp file and is newer than the actual file, don't copy - var localLastWrite = File.GetLastWriteTime(localPath); - var tempLastWrite = File.GetLastWriteTime(tempPath); - - //This could mean there is an interrupted download in progress - return (tempLastWrite < localLastWrite); - } - private void UploadCloudFile(FileInfo fileInfo, string hash,string topHash) - { - if (fileInfo==null) - throw new ArgumentNullException("fileInfo"); - if (String.IsNullOrWhiteSpace(hash)) - throw new ArgumentNullException("hash"); - Contract.EndContractBlock(); + var fullFileName = fileInfo.GetProperCapitalization(); + using (var gate = NetworkGate.Acquire(fullFileName, NetworkOperation.Uploading)) + { + //Abort if the file is already being uploaded or downloaded + if (gate.Failed) + return; - if (fileInfo.Extension.Equals("ignore", StringComparison.InvariantCultureIgnoreCase)) - return; - - var url = fileInfo.AsRelativeUrlTo(FileAgent.RootPath); + var cloudFile = action.CloudFile; + var account = cloudFile.Account ?? accountInfo.UserName; - var fullFileName = fileInfo.FullName; - using(var gate=NetworkGate.Acquire(fullFileName,NetworkOperation.Uploading)) - { - //Abort if the file is already being uploaded or downloaded - if (gate.Failed) - return; + var client = new CloudFilesClient(accountInfo); + //Even if GetObjectInfo times out, we can proceed with the upload + var info = client.GetObjectInfo(account, cloudFile.Container, cloudFile.Name); + //If this is a read-only file, do not upload changes + if (info.AllowedTo == "read") + return; + + //TODO: Check how a directory hash is calculated -> All dirs seem to have the same hash + if (fileInfo is DirectoryInfo) + { + //If the directory doesn't exist the Hash property will be empty + if (String.IsNullOrWhiteSpace(info.Hash)) + //Go on and create the directory + await client.PutObject(account, cloudFile.Container, cloudFile.Name, fullFileName, String.Empty, "application/directory"); + } + else + { - //Even if GetObjectInfo times out, we can proceed with the upload - var info = CloudClient.GetObjectInfo(PithosContainer, url); + var cloudHash = info.Hash.ToLower(); - //If the file hashes match, abort the upload - if (hash.Equals(info.Hash, StringComparison.InvariantCultureIgnoreCase) || - topHash.Equals(info.Hash, StringComparison.InvariantCultureIgnoreCase)) - { - //but store any metadata changes - this.StatusKeeper.StoreInfo(fullFileName, info); - Trace.TraceInformation("Skip upload of {0}, hashes match", fullFileName); - return; - } + var hash = action.LocalHash.Value; + var topHash = action.TopHash.Value; - //Mark the file as modified while we upload it - var setStatus = Task.Factory.StartNew(() => - StatusKeeper.SetFileOverlayStatus(fullFileName, FileOverlayStatus.Modified)); - //And then upload it + //If the file hashes match, abort the upload + if (hash == cloudHash || topHash == cloudHash) + { + //but store any metadata changes + StatusKeeper.StoreInfo(fullFileName, info); + Log.InfoFormat("Skip upload of {0}, hashes match", fullFileName); + return; + } - //If the file is larger than the block size, try a hashmap PUT - if (fileInfo.Length > BlockSize ) - { - //To upload using a hashmap - //First, calculate the tree hash - var treeHash = Signature.CalculateTreeHashAsync(fileInfo.FullName, BlockSize, BlockHash); - - var putHashMap = setStatus.ContinueWith(t=> - UploadWithHashMap(fileInfo,url,treeHash)); - - putHashMap.Wait(); - } - else - { - //Otherwise do a regular PUT - var put = setStatus.ContinueWith(t => - CloudClient.PutObject(PithosContainer,url,fullFileName,hash)); - put.Wait(); + + //Mark the file as modified while we upload it + StatusKeeper.SetFileOverlayStatus(fullFileName, FileOverlayStatus.Modified); + //And then upload it + + //Upload even small files using the Hashmap. The server may already contain + //the relevant block + + //First, calculate the tree hash + var treeHash = await Signature.CalculateTreeHashAsync(fullFileName, accountInfo.BlockSize, + accountInfo.BlockHash); + + await UploadWithHashMap(accountInfo, cloudFile, fileInfo as FileInfo, cloudFile.Name, treeHash); + } + //If everything succeeds, change the file and overlay status to normal + StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal); } - //If everything succeeds, change the file and overlay status to normal - this.StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal); + //Notify the Shell to update the overlays + NativeMethods.RaiseChangeNotification(fullFileName); + StatusNotification.NotifyChangedFile(fullFileName); + } + catch (AggregateException ex) + { + var exc = ex.InnerException as WebException; + if (exc == null) + throw ex.InnerException; + if (HandleUploadWebException(action, exc)) + return; + throw; + } + catch (WebException ex) + { + if (HandleUploadWebException(action, ex)) + return; + throw; + } + catch (Exception ex) + { + Log.Error("Unexpected error while uploading file", ex); + throw; + } + + } + + private bool IsDeletedFile(CloudAction action) + { + var key = GetFileKey(action.CloudFile); + DateTime entryDate; + if (_deletedFiles.TryGetValue(key, out entryDate)) + { + //If the delete entry was created after this action, abort the action + if (entryDate > action.Created) + return true; + //Otherwise, remove the stale entry + _deletedFiles.TryRemove(key, out entryDate); + } + return false; + } + + private bool HandleUploadWebException(CloudAction action, WebException exc) + { + var response = exc.Response as HttpWebResponse; + if (response == null) + throw exc; + if (response.StatusCode == HttpStatusCode.Unauthorized) + { + Log.Error("Not allowed to upload file", exc); + var message = String.Format("Not allowed to uplad file {0}", action.LocalFile.FullName); + StatusKeeper.SetFileState(action.LocalFile.FullName, FileStatus.Unchanged, FileOverlayStatus.Normal); + StatusNotification.NotifyChange(message, TraceLevel.Warning); + return true; } - //Notify the Shell to update the overlays - NativeMethods.RaiseChangeNotification(fullFileName); - StatusNotification.NotifyChangedFile(fullFileName); + return false; } - public void UploadWithHashMap(FileInfo fileInfo,string url,Task treeHash) + public async Task UploadWithHashMap(AccountInfo accountInfo,ObjectInfo cloudFile,FileInfo fileInfo,string url,TreeHash treeHash) { - var fullFileName = fileInfo.FullName; + if (accountInfo == null) + throw new ArgumentNullException("accountInfo"); + if (cloudFile==null) + throw new ArgumentNullException("cloudFile"); + if (fileInfo == null) + throw new ArgumentNullException("fileInfo"); + if (String.IsNullOrWhiteSpace(url)) + throw new ArgumentNullException(url); + if (treeHash==null) + throw new ArgumentNullException("treeHash"); + if (String.IsNullOrWhiteSpace(cloudFile.Container) ) + throw new ArgumentException("Invalid container","cloudFile"); + Contract.EndContractBlock(); - //Send the hashmap to the server - var hashPut = CloudClient.PutHashMap(PithosContainer, url, treeHash.Result); - var missingHashes = hashPut.Result; - if (missingHashes.Count == 0) - return; + var fullFileName = fileInfo.GetProperCapitalization(); + + var account = cloudFile.Account ?? accountInfo.UserName; + var container = cloudFile.Container ; - var buffer = new byte[BlockSize]; - foreach (var missingHash in missingHashes) + var client = new CloudFilesClient(accountInfo); + //Send the hashmap to the server + var missingHashes = await client.PutHashMap(account, container, url, treeHash); + //If the server returns no missing hashes, we are done + while (missingHashes.Count > 0) { - int blockIndex = -1; - try + + var buffer = new byte[accountInfo.BlockSize]; + foreach (var missingHash in missingHashes) { //Find the proper block - blockIndex = treeHash.Result.HashDictionary[missingHash]; - var offset = blockIndex*BlockSize; + var blockIndex = treeHash.HashDictionary[missingHash]; + var offset = blockIndex*accountInfo.BlockSize; - var read = fileInfo.Read(buffer, offset, BlockSize); - if (read > 0) + var read = fileInfo.Read(buffer, offset, accountInfo.BlockSize); + + try { - //Copy the actual block data out of the buffer - var data = new byte[read]; - Buffer.BlockCopy(buffer, 0, data, 0, read); - - //And POST them - CloudClient.PostBlock(PithosContainer, data).Wait(); - Trace.TraceInformation("[BLOCK] Block {0} of {1} uploaded", blockIndex, - fullFileName); + //And upload the block + await client.PostBlock(account, container, buffer, 0, read); + Log.InfoFormat("[BLOCK] Block {0} of {1} uploaded", blockIndex, fullFileName); } + catch (Exception exc) + { + Log.ErrorFormat("[ERROR] uploading block {0} of {1}\n{2}", blockIndex, fullFileName, exc); + } + } - catch (Exception exc) - { - Trace.TraceError("[ERROR] uploading block {0} of {1}\n{2}", blockIndex, fullFileName, exc); - } - } - UploadWithHashMap(fileInfo, url, treeHash); - + //Repeat until there are no more missing hashes + missingHashes = await client.PutHashMap(account, container, url, treeHash); + } } + public void AddAccount(AccountInfo accountInfo) + { + if (!_accounts.Contains(accountInfo)) + _accounts.Add(accountInfo); + } }