X-Git-Url: https://code.grnet.gr/git/pithos-ms-client/blobdiff_plain/5120f3cb5e1678e7a142e00f463b502427f60c88..e394ef0fa66d41c247eeebef35ccf6228024665e:/trunk/Pithos.Core/Agents/WorkflowAgent.cs diff --git a/trunk/Pithos.Core/Agents/WorkflowAgent.cs b/trunk/Pithos.Core/Agents/WorkflowAgent.cs index 3dba9a5..7b796ec 100644 --- a/trunk/Pithos.Core/Agents/WorkflowAgent.cs +++ b/trunk/Pithos.Core/Agents/WorkflowAgent.cs @@ -1,13 +1,57 @@ -using System; +#region +/* ----------------------------------------------------------------------- + * + * + * Copyright 2011-2012 GRNET S.A. All rights reserved. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the following + * disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * + * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS + * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and + * documentation are those of the authors and should not be + * interpreted as representing official policies, either expressed + * or implied, of GRNET S.A. + * + * ----------------------------------------------------------------------- + */ +#endregion +using System; using System.Collections.Generic; using System.ComponentModel.Composition; using System.Diagnostics; using System.Diagnostics.Contracts; using System.IO; using System.Linq; +using System.Reflection; using System.Text; using System.Threading.Tasks; +using Castle.ActiveRecord; using Pithos.Interfaces; +using Pithos.Network; using log4net; namespace Pithos.Core.Agents @@ -15,22 +59,22 @@ namespace Pithos.Core.Agents [Export] public class WorkflowAgent { - Agent _agent; + private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); + + readonly Agent _agent; public IStatusNotification StatusNotification { get; set; } - [Import] + [System.ComponentModel.Composition.Import] public IStatusKeeper StatusKeeper { get; set; } - //We should avoid processing files stored in the Fragments folder - //The Full path to the fragments folder is stored in FragmentsPath - public string FragmentsPath { get; set; } - - [Import] + [System.ComponentModel.Composition.Import] public NetworkAgent NetworkAgent { get; set; } - private static readonly ILog Log = LogManager.GetLogger("WorkflowAgent"); + [System.ComponentModel.Composition.Import] + public IPithosSettings Settings { get; set; } + - public void Start() + public WorkflowAgent() { _agent = Agent.Start(inbox => { @@ -48,101 +92,182 @@ namespace Pithos.Core.Agents private Task Process(WorkflowState state) { + var accountInfo = state.AccountInfo; using (log4net.ThreadContext.Stacks["Workflow"].Push("Process")) { - if (Log.IsDebugEnabled) Log.DebugFormat("State {0} {1} {2}", state.FileName,state.Status,state.TriggeringChange); - - if (state.Skip) + try { - if (Log.IsDebugEnabled) Log.DebugFormat("Skipping {0}",state.FileName); - - return CompletedTask.Default; - } - string path = state.Path.ToLower(); - //Bypass deleted files, unless the status is Deleted - if (!File.Exists(path) && state.Status != FileStatus.Deleted) - { - state.Skip = true; - this.StatusKeeper.ClearFileStatus(path); - - if (Log.IsDebugEnabled) Log.DebugFormat("Skipped missing {0}", state.FileName); + if (Log.IsDebugEnabled) + Log.DebugFormat("State {0} {1} {2}", state.FileName, state.Status, state.TriggeringChange); + + if (state.Skip) + { + if (Log.IsDebugEnabled) Log.DebugFormat("Skipping {0}", state.FileName); + + return CompletedTask.Default; + } + + var info = Directory.Exists(state.Path) ? (FileSystemInfo)new DirectoryInfo(state.Path) : new FileInfo(state.Path); + + //Bypass deleted files, unless the status is Deleted + if (!info.Exists && state.Status != FileStatus.Deleted) + { + state.Skip = true; + this.StatusKeeper.ClearFileStatus(state.Path); + + if (Log.IsDebugEnabled) Log.DebugFormat("Skipped missing {0}", state.FileName); + + return CompletedTask.Default; + } + + using (new SessionScope(FlushAction.Never)) + { + + var fileState = StatusKeeper.GetStateByFilePath(state.Path); + + switch (state.Status) + { + case FileStatus.Created: + case FileStatus.Modified: + NetworkAgent.Post(new CloudUploadAction(accountInfo, info, fileState, + accountInfo.BlockSize, + accountInfo.BlockHash)); + break; + case FileStatus.Deleted: + DeleteChildObjects(state, fileState); + NetworkAgent.Post(new CloudDeleteAction(accountInfo, info, fileState)); + break; + case FileStatus.Renamed: + if (state.OldPath == null) + { + //We reach this point only if the app closed before propagating a rename to the server + Log.WarnFormat("Unfinished rename [{0}]",state.Path); + StatusKeeper.SetFileState(state.Path,FileStatus.Conflict,FileOverlayStatus.Conflict); + break; + } + FileSystemInfo oldInfo = Directory.Exists(state.OldPath) + ? (FileSystemInfo) new DirectoryInfo(state.OldPath) + : new FileInfo(state.OldPath); + FileSystemInfo newInfo = Directory.Exists(state.Path) + ? (FileSystemInfo) new DirectoryInfo(state.Path) + : new FileInfo(state.Path); + NetworkAgent.Post(new CloudMoveAction(accountInfo, CloudActionType.RenameCloud, + oldInfo, + newInfo)); + //TODO: Do I have to move children as well or will Pithos handle this? + //Need to find all children of the OLD filepath + //MoveChildObjects(state); + break; + } + } return CompletedTask.Default; } - var fileState = FileState.FindByFilePath(path); - var blockHash = NetworkAgent.BlockHash; - var blockSize = NetworkAgent.BlockSize; - var info = new FileInfo(path); - - switch (state.Status) + catch (Exception ex) { - case FileStatus.Created: - case FileStatus.Modified: - NetworkAgent.Post(new CloudUploadAction(info, fileState, blockSize, blockHash)); - break; - case FileStatus.Deleted: - string fileName = info.AsRelativeUrlTo(NetworkAgent.FileAgent.RootPath); - NetworkAgent.Post(new CloudDeleteAction(fileName, fileState)); - break; - case FileStatus.Renamed: - NetworkAgent.Post(new CloudMoveAction(CloudActionType.RenameCloud, state.OldFileName, - state.OldPath, state.FileName, state.Path)); - break; + Log.Error(ex.ToString()); + throw; } + } + } + - return CompletedTask.Default; + private void DeleteChildObjects(WorkflowState state, FileState fileState) + { + if (fileState != null) + { + var children = StatusKeeper.GetChildren(fileState); + foreach (var child in children) + { + var childInfo = child.IsFolder + ? (FileSystemInfo) new DirectoryInfo(child.FilePath) + : new FileInfo(child.FilePath); + NetworkAgent.Post(new CloudDeleteAction(state.AccountInfo, childInfo, child)); + } } } + /*private void MoveChildObjects(WorkflowState state) + { + var oldFileState = StatusKeeper.GetStateByFilePath(state.OldPath); + if (oldFileState != null) + { + var children = StatusKeeper.GetChildren(oldFileState); + foreach (var child in children) + { + var newPath = Path.Combine(state.Path, child.FilePath.Substring(state.OldPath.Length+1)); + + var oldMoveInfo = child.IsFolder + ? (FileSystemInfo) new DirectoryInfo(child.FilePath) + : new FileInfo(child.FilePath); + var newMoveInfo = child.IsFolder + ? (FileSystemInfo) new DirectoryInfo(newPath) + : new FileInfo(newPath); + //The new file path will be created by trimming the old root path + //and substituting the new root path + + NetworkAgent.Post(new CloudMoveAction(state.AccountInfo, CloudActionType.RenameCloud, + oldMoveInfo, newMoveInfo)); + } + } + }*/ - public void RestartInterruptedFiles() + //Starts interrupted files for a specific account + public void RestartInterruptedFiles(AccountInfo accountInfo) { - StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose); - using (log4net.ThreadContext.Stacks["Workflow"].Push("Restart")) + using (log4net.ThreadContext.Stacks["Operation"].Push("RestartInterrupted")) { if (Log.IsDebugEnabled) Log.Debug("Starting interrupted files"); + + var cachePath = Path.Combine(accountInfo.AccountPath, FolderConstants.CacheFolder) + .ToLower(); + + + - var pendingEntries = from state in FileState.Queryable + var account = accountInfo; + var pendingEntries = (from state in FileState.Queryable where state.FileStatus != FileStatus.Unchanged && - !state.FilePath.StartsWith(FragmentsPath.ToLower()) && - !state.FilePath.EndsWith(".ignore") - select state; + !state.FilePath.StartsWith(cachePath) && + !state.FilePath.EndsWith(".ignore") && + state.FilePath.StartsWith(account.AccountPath) + select state).ToList(); + if (pendingEntries.Count>0) + StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose); + + var pendingStates = pendingEntries + .Select(state => new WorkflowState(account, state)) + .ToList(); + if (Log.IsDebugEnabled) - Log.DebugFormat("Found {0} interrupted files",pendingEntries.Count()); - - var validEntries = from state in pendingEntries - select new WorkflowState - { - Path = state.FilePath.ToLower(), - FileName = Path.GetFileName(state.FilePath).ToLower(), - Hash = state.Checksum, - Status = state.OverlayStatus == FileOverlayStatus.Unversioned - ? FileStatus.Created - : state.FileStatus, - TriggeringChange = - state.OverlayStatus == FileOverlayStatus.Unversioned - ? WatcherChangeTypes.Created - : WatcherChangeTypes.Changed - }; - foreach (var entry in validEntries) - { - Post(entry); - } + Log.DebugFormat("Found {0} interrupted files", pendingStates.Count); + + pendingStates.ForEach(Post); } - } + } + - public void Post(WorkflowState workflowState) { if (Log.IsDebugEnabled) Log.DebugFormat("Posted {0} {1} {2}", workflowState.Path, workflowState.Status, workflowState.TriggeringChange); + + //Remove invalid state + //For now, ignore paths + /* if (Directory.Exists(workflowState.Path)) + return;*/ + //TODO: Need to handle folder renames + + Debug.Assert(workflowState.Path.StartsWith(workflowState.AccountInfo.AccountPath, StringComparison.InvariantCultureIgnoreCase), "File from wrong account posted"); + _agent.Post(workflowState); - } + } + } }