#region
/* -----------------------------------------------------------------------
 *
 *
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 *   1. Redistributions of source code must retain the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer.
 *
 *   2. Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials
 *      provided with the distribution.
 *
 *
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 * The views and conclusions contained in the software and
 * documentation are those of the authors and should not be
 * interpreted as representing official policies, either expressed
 * or implied, of GRNET S.A.
 *
 * -----------------------------------------------------------------------
 */
#endregion
using System;
using System.Collections.Generic;
using System.ComponentModel.Composition;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.IO;
using System.Net;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Castle.ActiveRecord;
using Pithos.Interfaces;
using Pithos.Network;
using log4net;

namespace Pithos.Core.Agents
{
    /// <summary>
    /// Receives <see cref="CloudAction"/> messages through an internal agent queue and
    /// dispatches each one to the uploader, the downloader, the delete agent or one of
    /// the rename/sync helpers of this class.
    /// </summary>
    [Export]
    public class NetworkAgent
    {
        private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);

        // The queue that feeds Process. It stays null until Start() is called.
        private Agent<CloudAction> _agent;

        // NOTE(review): the Import attributes are fully qualified although
        // System.ComponentModel.Composition is imported - presumably to avoid an
        // ambiguity with an Import attribute from another imported namespace. Confirm
        // before shortening them.
        [System.ComponentModel.Composition.Import]
        private DeleteAgent DeleteAgent { get; set; }

        [System.ComponentModel.Composition.Import]
        public IStatusKeeper StatusKeeper { get; set; }

        private IStatusNotification _statusNotification;

        /// <summary>
        /// The notification sink used by this agent. Setting it also forwards the same
        /// value to the delete agent, the uploader and the downloader.
        /// </summary>
        public IStatusNotification StatusNotification
        {
            get { return _statusNotification; }
            set
            {
                _statusNotification = value;
                DeleteAgent.StatusNotification = value;
                Uploader.StatusNotification = value;
                Downloader.StatusNotification = value;
            }
        }

        [System.ComponentModel.Composition.Import]
        public IPithosSettings Settings { get; set; }

        [System.ComponentModel.Composition.Import]
        public Uploader Uploader { get; set; }

        [System.ComponentModel.Composition.Import]
        public Downloader Downloader { get; set; }

        //The Proceed signals the poll agent that it can proceed with polling.
        //Essentially it stops the poll agent to give priority to the network agent
        //Initially the event is signalled because we don't need to pause
        private readonly AsyncManualResetEvent _proceedEvent = new AsyncManualResetEvent(true);

        /// <summary>
        /// The event that is reset while an action is being processed and set again
        /// once the queue is found empty after processing.
        /// </summary>
        public AsyncManualResetEvent ProceedEvent
        {
            get { return _proceedEvent; }
        }

        /// <summary>
        /// Creates the internal agent and starts its receive loop.
        /// Does nothing if the agent has already been created.
        /// </summary>
        public void Start()
        {
            if (_agent != null)
                return;

            if (Log.IsDebugEnabled)
                Log.Debug("Starting Network Agent");

            _agent = Agent<CloudAction>.Start(inbox =>
            {
                Action loop = null;
                loop = () =>
                {
                    //Wait for the delete agent's proceed event before taking the next message
                    DeleteAgent.ProceedEvent.Wait();
                    var message = inbox.Receive();
                    var process = message.Then(Process, inbox.CancellationToken);
                    //NOTE(review): LoopAsync presumably re-invokes loop once process completes - confirm
                    inbox.LoopAsync(process, loop);
                };
                loop();
            });
        }

        /// <summary>
        /// Executes a single cloud action.
        /// </summary>
        /// <param name="action">The action to execute. Its AccountInfo must not be null.</param>
        /// <returns>A task that completes when the action has been handled.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
        /// <exception cref="ArgumentException">The action has no AccountInfo.</exception>
        /// <exception cref="OperationCanceledException">Rethrown as-is when raised during processing.</exception>
        /// <remarks>
        /// Missing files or directories cause a <see cref="CloudDeleteAction"/> to be posted.
        /// Any other exception is logged and the action is posted to the queue again.
        /// NOTE(review): the requeue is unbounded, a permanently failing action will be retried forever.
        /// </remarks>
        private async Task Process(CloudAction action)
        {
            if (action == null)
                throw new ArgumentNullException("action");
            if (action.AccountInfo == null)
                throw new ArgumentException("The action.AccountInfo is empty", "action");
            Contract.EndContractBlock();

            using (ThreadContext.Stacks["Operation"].Push(action.ToString()))
            {
                var cloudFile = action.CloudFile;
                var downloadPath = action.GetDownloadPath();

                try
                {
                    StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing, "Processing");
                    _proceedEvent.Reset();

                    var accountInfo = action.AccountInfo;

                    if (action.Action == CloudActionType.DeleteCloud)
                    {
                        //Redirect deletes to the delete agent
                        DeleteAgent.Post((CloudDeleteAction)action);
                    }

                    if (DeleteAgent.IsDeletedFile(action))
                    {
                        //Clear the status of already deleted files to avoid reprocessing
                        if (action.LocalFile != null)
                            StatusKeeper.ClearFileStatus(action.LocalFile.FullName);
                    }
                    else
                    {
                        //Files known to be deleted never reach this point, they are
                        //filtered out by the IsDeletedFile check above
                        switch (action.Action)
                        {
                            case CloudActionType.UploadUnconditional:
                                await Uploader.UploadCloudFile(action);
                                break;
                            case CloudActionType.DownloadUnconditional:
                                await Downloader.DownloadCloudFile(accountInfo, cloudFile, downloadPath);
                                break;
                            case CloudActionType.RenameCloud:
                                var moveAction = (CloudMoveAction)action;
                                RenameCloudFile(accountInfo, moveAction);
                                break;
                            case CloudActionType.RenameLocal:
                                RenameLocalFile(accountInfo, action);
                                break;
                            case CloudActionType.MustSynch:
                                if (!File.Exists(downloadPath) && !Directory.Exists(downloadPath))
                                {
                                    await Downloader.DownloadCloudFile(accountInfo, cloudFile, downloadPath);
                                }
                                else
                                {
                                    await SyncFiles(accountInfo, action);
                                }
                                break;
                        }
                    }

                    //Read the cloud file defensively: a NullReferenceException thrown here would
                    //be caught by the catch-all below and requeue an action that already completed
                    var processedFile = action.CloudFile;
                    Log.InfoFormat("End Processing {0}:{1}->{2}", action.Action, action.LocalFile,
                                   processedFile == null ? null : processedFile.Name);
                }
/*
                catch (WebException exc)
                {
                    Log.ErrorFormat("[WEB ERROR] {0} : {1} -> {2} due to exception\r\n{3}", action.Action, action.LocalFile, action.CloudFile, exc);

                    //Actions that resulted in server errors should be retried
                    var response = exc.Response as HttpWebResponse;
                    if (response != null && response.StatusCode >= HttpStatusCode.InternalServerError)
                    {
                        _agent.Post(action);
                        Log.WarnFormat("[REQUEUE] {0} : {1} -> {2}", action.Action, action.LocalFile, action.CloudFile);
                    }
                }
*/
                catch (OperationCanceledException)
                {
                    throw;
                }
                catch (DirectoryNotFoundException)
                {
                    Log.ErrorFormat("{0} : {1} -> {2}  failed because the directory was not found.\n Rescheduling a delete",
                        action.Action, action.LocalFile, action.CloudFile);
                    //Post a delete action for the missing file
                    Post(new CloudDeleteAction(action));
                }
                catch (FileNotFoundException)
                {
                    Log.ErrorFormat("{0} : {1} -> {2}  failed because the file was not found.\n Rescheduling a delete",
                        action.Action, action.LocalFile, action.CloudFile);
                    //Post a delete action for the missing file
                    Post(new CloudDeleteAction(action));
                }
                catch (Exception exc)
                {
                    Log.ErrorFormat("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
                                    action.Action, action.LocalFile, action.CloudFile, exc);

                    _agent.Post(action);
                }
                finally
                {
                    //Only signal the proceed event when there is nothing left in the queue
                    if (_agent.IsEmpty)
                        _proceedEvent.Set();
                    UpdateStatus(PithosStatus.LocalComplete);
                }
            }
        }

        /// <summary>
        /// Forwards the given status to the status notification sink.
        /// </summary>
        private void UpdateStatus(PithosStatus status)
        {
            StatusNotification.SetPithosStatus(status);
            //StatusNotification.Notify(new Notification());
        }

        /// <summary>
        /// Moves the local file that corresponds to the previous version of the action's
        /// cloud file to the location that corresponds to its current version.
        /// </summary>
        /// <exception cref="ArgumentNullException">An argument is null.</exception>
        /// <exception cref="ArgumentException">
        /// The action has no local file, the local path is not rooted, or there is no cloud file.
        /// </exception>
        private void RenameLocalFile(AccountInfo accountInfo, CloudAction action)
        {
            if (accountInfo == null)
                throw new ArgumentNullException("accountInfo");
            if (action == null)
                throw new ArgumentNullException("action");
            if (action.LocalFile == null)
                throw new ArgumentException("The action's local file is not specified", "action");
            if (!Path.IsPathRooted(action.LocalFile.FullName))
                throw new ArgumentException("The action's local file path must be absolute", "action");
            if (action.CloudFile == null)
                throw new ArgumentException("The action's cloud file is not specified", "action");
            Contract.EndContractBlock();

            using (ThreadContext.Stacks["Operation"].Push("RenameLocalFile"))
            {
                //We assume that the local file already exists, otherwise the poll agent
                //would have issued a download request

                var currentInfo = action.CloudFile;
                var previousInfo = action.CloudFile.Previous;
                var fileAgent = FileAgent.GetFileAgent(accountInfo);

                var previousRelativepath = previousInfo.RelativeUrlToFilePath(accountInfo.UserName);
                var previousFile = fileAgent.GetFileSystemInfo(previousRelativepath);

                //In every case we need to move the local file first
                MoveLocalFile(accountInfo, previousFile, fileAgent, currentInfo);
            }
        }

        /// <summary>
        /// Moves a local file or directory to the path derived from <paramref name="currentInfo"/>
        /// and updates the stored file state accordingly. Both the old and the new path are
        /// guarded with a renaming network gate for the duration of the move.
        /// </summary>
        private void MoveLocalFile(AccountInfo accountInfo, FileSystemInfo previousFile, FileAgent fileAgent,
                                   ObjectInfo currentInfo)
        {
            var currentRelativepath = currentInfo.RelativeUrlToFilePath(accountInfo.UserName);
            var newPath = Path.Combine(fileAgent.RootPath, currentRelativepath);

            var isFile = (previousFile is FileInfo);
            var previousFullPath = isFile
                ? FileInfoExtensions.GetProperFilePathCapitalization(previousFile.FullName)
                : FileInfoExtensions.GetProperDirectoryCapitalization(previousFile.FullName);

            using (NetworkGate.Acquire(previousFullPath, NetworkOperation.Renaming))
            using (NetworkGate.Acquire(newPath, NetworkOperation.Renaming))
            using (new SessionScope(FlushAction.Auto))
            {
                //Direct casts instead of unchecked "as" casts: an unexpected type fails with
                //a clear InvalidCastException rather than a NullReferenceException
                if (isFile)
                    ((FileInfo)previousFile).MoveTo(newPath);
                else
                    ((DirectoryInfo)previousFile).MoveTo(newPath);

                //NOTE(review): state is dereferenced without a null check - confirm that
                //GetStateByFilePath never returns null for a file that was just moved
                var state = StatusKeeper.GetStateByFilePath(previousFullPath);
                state.FilePath = newPath;
                state.SaveCopy();
                StatusKeeper.SetFileState(previousFullPath, FileStatus.Deleted, FileOverlayStatus.Deleted, "Deleted");
            }
        }

        /// <summary>
        /// Decides what to do with an object that exists both locally and on the server by
        /// comparing the local hash with the current and the previous server hash:
        /// nothing if the current hashes match, a download if only the server changed,
        /// a conflict report otherwise.
        /// </summary>
        /// <exception cref="ArgumentNullException">An argument is null.</exception>
        /// <exception cref="ArgumentException">
        /// The action has no local file, the local path is not rooted, or there is no cloud file.
        /// </exception>
        private async Task SyncFiles(AccountInfo accountInfo, CloudAction action)
        {
            if (accountInfo == null)
                throw new ArgumentNullException("accountInfo");
            if (action == null)
                throw new ArgumentNullException("action");
            if (action.LocalFile == null)
                throw new ArgumentException("The action's local file is not specified", "action");
            if (!Path.IsPathRooted(action.LocalFile.FullName))
                throw new ArgumentException("The action's local file path must be absolute", "action");
            if (action.CloudFile == null)
                throw new ArgumentException("The action's cloud file is not specified", "action");
            Contract.EndContractBlock();

            using (ThreadContext.Stacks["Operation"].Push("SyncFiles"))
            {
                //var localFile = action.LocalFile;
                var cloudFile = action.CloudFile;
                var downloadPath = action.LocalFile.GetProperCapitalization();

                //Hashes are machine-readable values, normalise them independently of the current culture
                //NOTE(review): Hash and PreviousHash are dereferenced without null checks - confirm
                //that both are always populated for objects that reach this method
                var cloudHash = cloudFile.Hash.ToLowerInvariant();
                var previousCloudHash = cloudFile.PreviousHash.ToLowerInvariant();
                var localHash = action.TreeHash.Value.TopHash.ToHashString();// LocalHash.Value.ToLower();
                //var topHash = action.TopHash.Value.ToLower();

                //At this point we know that an object has changed on the server and that a local
                //file already exists. We need to decide whether the file has only changed on
                //the server or there is a conflicting change on the client.
                //

                //If the hashes match, we are done
                if (cloudFile != ObjectInfo.Empty && cloudHash == localHash)
                {
                    Log.InfoFormat("Skipping {0}, hashes match", downloadPath);
                    return;
                }

                //The hashes DON'T match. We need to sync

                // If the previous tophash matches the local tophash, the file was only changed on the server.
                if (localHash == previousCloudHash)
                {
                    await Downloader.DownloadCloudFile(accountInfo, cloudFile, downloadPath);
                }
                else
                {
                    //If the previous and local hash don't match, there was a local change
                    //that was not uploaded to the server. We have a conflict
                    ReportConflictForMismatch(downloadPath);
                }
            }
        }

        /// <summary>
        /// Marks the file as conflicted, switches the overall status to HasConflicts and
        /// logs and publishes a warning message.
        /// </summary>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="downloadPath"/> is null, empty or whitespace.
        /// </exception>
        private void ReportConflictForMismatch(string downloadPath)
        {
            if (String.IsNullOrWhiteSpace(downloadPath))
                throw new ArgumentNullException("downloadPath");
            Contract.EndContractBlock();

            StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict);
            UpdateStatus(PithosStatus.HasConflicts);
            var message = String.Format("Conflict detected for file {0}", downloadPath);
            Log.Warn(message);
            StatusNotification.NotifyChange(message, TraceLevel.Warning);
        }

        /// <summary>
        /// Queues an action for processing. Delete actions are posted to the delete agent,
        /// everything else to this agent's own queue. The call first waits on the delete
        /// agent's proceed event.
        /// </summary>
        /// <param name="cloudAction">The action to queue. Its AccountInfo must not be null.</param>
        /// <exception cref="ArgumentNullException"><paramref name="cloudAction"/> is null.</exception>
        /// <exception cref="ArgumentException">The action has no AccountInfo.</exception>
        public void Post(CloudAction cloudAction)
        {
            if (cloudAction == null)
                throw new ArgumentNullException("cloudAction");
            if (cloudAction.AccountInfo == null)
                throw new ArgumentException("The CloudAction.AccountInfo is empty", "cloudAction");
            Contract.EndContractBlock();

            DeleteAgent.ProceedEvent.Wait();
/*

            //If the action targets a local file, add a treehash calculation
            if (!(cloudAction is CloudDeleteAction) && cloudAction.LocalFile as FileInfo != null)
            {
                var accountInfo = cloudAction.AccountInfo;
                var localFile = (FileInfo) cloudAction.LocalFile;

                if (localFile.Length > accountInfo.BlockSize)
                    cloudAction.TopHash =
                        new Lazy<string>(() => Signature.CalculateTreeHashAsync(localFile,
                                                                                accountInfo.BlockSize,
                                                                                accountInfo.BlockHash, Settings.HashingParallelism).Result
                                                    .TopHash.ToHashString());
                else
                {
                    cloudAction.TopHash = new Lazy<string>(() => cloudAction.LocalHash.Value);
                }
            }
            else
            {
                //The hash for a directory is the empty string
                cloudAction.TopHash = new Lazy<string>(() => String.Empty);
            }
*/

            if (cloudAction is CloudDeleteAction)
                DeleteAgent.Post((CloudDeleteAction)cloudAction);
            else
                _agent.Post(cloudAction);
        }

        /// <summary>
        /// Returns the enumerable exposed by the internal agent.
        /// </summary>
        public IEnumerable<CloudAction> GetEnumerable()
        {
            return _agent.GetEnumerable();
        }

        /// <summary>
        /// Returns a task that waits asynchronously on the delete agent's proceed event.
        /// </summary>
        public Task GetDeleteAwaiter()
        {
            return DeleteAgent.ProceedEvent.WaitAsync();
        }

        /// <summary>
        /// The cancellation token of the internal agent.
        /// </summary>
        public CancellationToken CancellationToken
        {
            get { return _agent.CancellationToken; }
        }

        /// <summary>
        /// Moves an object on the server from the action's old cloud name to its new one,
        /// within the same container, and updates the status of the already renamed local file.
        /// </summary>
        /// <exception cref="ArgumentNullException">An argument is null.</exception>
        /// <exception cref="ArgumentException">
        /// One of CloudFile, LocalFile, OldLocalFile or OldCloudFile is missing from the action.
        /// </exception>
        private void RenameCloudFile(AccountInfo accountInfo, CloudMoveAction action)
        {
            if (accountInfo == null)
                throw new ArgumentNullException("accountInfo");
            if (action == null)
                throw new ArgumentNullException("action");
            if (action.CloudFile == null)
                throw new ArgumentException("CloudFile", "action");
            if (action.LocalFile == null)
                throw new ArgumentException("LocalFile", "action");
            if (action.OldLocalFile == null)
                throw new ArgumentException("OldLocalFile", "action");
            if (action.OldCloudFile == null)
                throw new ArgumentException("OldCloudFile", "action");
            Contract.EndContractBlock();

            using (ThreadContext.Stacks["Operation"].Push("RenameCloudFile"))
            {
                var newFilePath = action.LocalFile.FullName;

                //How do we handle concurrent renames and deletes/uploads/downloads?
                //* A conflicting upload means that a file was renamed before it had a chance to finish uploading
                //  This should never happen as the network agent executes only one action at a time
                //* A conflicting download means that the file was modified on the cloud. While we can go on and complete
                //  the rename, there may be a problem if the file is downloaded in blocks, as subsequent block requests for the
                //  same name will fail.
                //  This should never happen as the network agent executes only one action at a time.
                //* A conflicting delete can happen if the rename was followed by a delete action that didn't have the chance
                //  to remove the rename from the queue.
                //  We can probably ignore this case. It will result in an error which should be ignored

                //The local file is already renamed
                StatusKeeper.SetFileOverlayStatus(newFilePath, FileOverlayStatus.Modified);

                var account = action.CloudFile.Account ?? accountInfo.UserName;
                var container = action.CloudFile.Container;

                var client = new CloudFilesClient(accountInfo);
                //TODO: What code is returned when the source file doesn't exist?
                client.MoveObject(account, container, action.OldCloudFile.Name, container, action.CloudFile.Name);

                StatusKeeper.SetFileStatus(newFilePath, FileStatus.Unchanged);
                StatusKeeper.SetFileOverlayStatus(newFilePath, FileOverlayStatus.Normal);
                NativeMethods.RaiseChangeNotification(newFilePath);
            }
        }
    }
}