#region
/* -----------------------------------------------------------------------
 *
 *
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 *   1. Redistributions of source code must retain the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer.
 *
 *   2. Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials
 *      provided with the distribution.
 *
 *
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 * The views and conclusions contained in the software and
 * documentation are those of the authors and should not be
 * interpreted as representing official policies, either expressed
 * or implied, of GRNET S.A.
 *
 * -----------------------------------------------------------------------
 */
#endregion
using System;
using System.Collections.Generic;
using System.ComponentModel.Composition;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
using Castle.ActiveRecord;
using Pithos.Interfaces;
using Pithos.Network;
using log4net;

namespace Pithos.Core.Agents
{
    /// <summary>
    /// Receives <see cref="WorkflowState"/> messages through an internal agent and, for each one,
    /// posts the matching upload, delete or move action to <see cref="NetworkAgent"/>.
    /// </summary>
    [Export]
    public class WorkflowAgent
    {
        private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);

        // NOTE(review): generic type arguments (e.g. on Agent, Task, CompletedTask) may have been
        // lost when this file was extracted; the identifiers are kept exactly as they were found.
        readonly Agent _agent;

        /// <summary>Sink used to report user-visible status changes. Not imported; must be set by the owner.</summary>
        public IStatusNotification StatusNotification { get; set; }

        /// <summary>Store used to look up, set and clear per-file state.</summary>
        [System.ComponentModel.Composition.Import]
        public IStatusKeeper StatusKeeper { get; set; }

        /// <summary>Agent that receives the cloud actions produced by this workflow.</summary>
        [System.ComponentModel.Composition.Import]
        public NetworkAgent NetworkAgent { get; set; }

        /// <summary>Application settings. Not read by any member of this class.</summary>
        [System.ComponentModel.Composition.Import]
        public IPithosSettings Settings { get; set; }


        /// <summary>
        /// Creates the agent and starts its receive loop: every received message is handed to
        /// <see cref="Process"/>, and failures are logged by the error callback.
        /// </summary>
        public WorkflowAgent()
        {
            _agent = Agent.Start(inbox =>
            {
                // NOTE(review): this declaration sits in a part of the file that was not visible
                // when the file was reconstructed; it is inferred from the assignment below - confirm.
                Action loop = null;
                loop = () =>
                {
                    var message = inbox.Receive();
                    var process = message.Then(Process, inbox.CancellationToken);
                    inbox.LoopAsync(process, loop, ex =>
                    {
                        // Reading Result on a faulted or cancelled task rethrows, which would
                        // replace the error being reported with a second exception raised from
                        // inside this handler. Only read the file name when the receive succeeded.
                        var fileName = message.Status == TaskStatus.RanToCompletion
                                           ? message.Result.FileName
                                           : "<unknown>";
                        Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", fileName, ex);
                    });
                };
                loop();
            });
        }

        /// <summary>
        /// Builds a <see cref="DirectoryInfo"/> when <paramref name="path"/> is an existing directory,
        /// otherwise a <see cref="FileInfo"/> (which is also what a missing path yields).
        /// </summary>
        private static FileSystemInfo GetFileSystemInfo(string path)
        {
            return Directory.Exists(path)
                       ? (FileSystemInfo) new DirectoryInfo(path)
                       : new FileInfo(path);
        }

        /// <summary>
        /// Translates a single workflow message into a cloud action posted to <see cref="NetworkAgent"/>.
        /// </summary>
        /// <param name="state">The message to process.</param>
        /// <returns><c>CompletedTask.Default</c> on every path that does not throw.</returns>
        /// <remarks>
        /// Messages flagged <c>Skip</c> are ignored. A message whose path no longer exists, and whose
        /// status is not <c>Deleted</c>, is flagged <c>Skip</c> and its stored status is cleared.
        /// Any exception is logged and rethrown.
        /// </remarks>
        private Task Process(WorkflowState state)
        {
            var accountInfo = state.AccountInfo;
            using (log4net.ThreadContext.Stacks["Workflow"].Push("Process"))
            {
                try
                {

                    if (Log.IsDebugEnabled)
                        Log.DebugFormat("State {0} {1} {2}", state.FileName, state.Status, state.TriggeringChange);

                    if (state.Skip)
                    {
                        if (Log.IsDebugEnabled) Log.DebugFormat("Skipping {0}", state.FileName);

                        return CompletedTask.Default;
                    }

                    var info = GetFileSystemInfo(state.Path);

                    //Bypass deleted files, unless the status is Deleted
                    if (!info.Exists && state.Status != FileStatus.Deleted)
                    {
                        state.Skip = true;
                        this.StatusKeeper.ClearFileStatus(state.Path);

                        if (Log.IsDebugEnabled) Log.DebugFormat("Skipped missing {0}", state.FileName);

                        return CompletedTask.Default;
                    }

                    using (new SessionScope(FlushAction.Never))
                    {

                        var fileState = StatusKeeper.GetStateByFilePath(state.Path);

                        switch (state.Status)
                        {
                            case FileStatus.Created:
                            case FileStatus.Modified:
                                NetworkAgent.Post(new CloudUploadAction(accountInfo, info, fileState,
                                                                        accountInfo.BlockSize,
                                                                        accountInfo.BlockHash));
                                break;
                            case FileStatus.Deleted:
                                //Children are posted for deletion before the parent itself
                                DeleteChildObjects(state, fileState);
                                NetworkAgent.Post(new CloudDeleteAction(accountInfo, info, fileState));
                                break;
                            case FileStatus.Renamed:
                                if (state.OldPath == null)
                                {
                                    //We reach this point only if the app closed before propagating a rename to the server
                                    Log.WarnFormat("Unfinished rename [{0}]",state.Path);
                                    StatusKeeper.SetFileState(state.Path,FileStatus.Conflict,FileOverlayStatus.Conflict, "Rename without old path");
                                    break;
                                }
                                FileSystemInfo oldInfo = GetFileSystemInfo(state.OldPath);
                                FileSystemInfo newInfo = GetFileSystemInfo(state.Path);
                                NetworkAgent.Post(new CloudMoveAction(accountInfo, CloudActionType.RenameCloud,
                                                                      oldInfo,
                                                                      newInfo));
                                //TODO: Do I have to move children as well or will Pithos handle this?
                                //Need to find all children of the OLD filepath
                                //MoveChildObjects(state);
                                break;
                        }
                    }

                    return CompletedTask.Default;
                }
                catch (Exception ex)
                {
                    Log.Error(ex.ToString());
                    throw;
                }
            }
        }


        /// <summary>
        /// Posts a delete action for every child that <see cref="StatusKeeper"/> reports for
        /// <paramref name="fileState"/>. Does nothing when <paramref name="fileState"/> is null.
        /// </summary>
        /// <param name="state">The message being processed; supplies the account.</param>
        /// <param name="fileState">The stored state of the deleted item, or null if none was found.</param>
        private void DeleteChildObjects(WorkflowState state, FileState fileState)
        {
            if (fileState == null)
                return;

            var children = StatusKeeper.GetChildren(fileState);
            foreach (var child in children)
            {
                //The stored IsFolder flag decides the info type here, not the file system
                var childInfo = child.IsFolder
                                    ? (FileSystemInfo) new DirectoryInfo(child.FilePath)
                                    : new FileInfo(child.FilePath);
                NetworkAgent.Post(new CloudDeleteAction(state.AccountInfo, childInfo, child));
            }
        }

        /*private void MoveChildObjects(WorkflowState state)
        {
            var oldFileState = StatusKeeper.GetStateByFilePath(state.OldPath);
            if (oldFileState != null)
            {
                var children = StatusKeeper.GetChildren(oldFileState);
                foreach (var child in children)
                {
                    var newPath = Path.Combine(state.Path, child.FilePath.Substring(state.OldPath.Length+1));

                    var oldMoveInfo = child.IsFolder
                                          ? (FileSystemInfo) new DirectoryInfo(child.FilePath)
                                          : new FileInfo(child.FilePath);
                    var newMoveInfo = child.IsFolder
                                          ? (FileSystemInfo) new DirectoryInfo(newPath)
                                          : new FileInfo(newPath);
                    //The new file path will be created by trimming the old root path
                    //and substituting the new root path

                    NetworkAgent.Post(new CloudMoveAction(state.AccountInfo, CloudActionType.RenameCloud,
                                                          oldMoveInfo, newMoveInfo));
                }
            }
        }*/


        /// <summary>
        /// Re-posts every stored file state of the given account that is still pending, i.e. whose
        /// status is not Unchanged, Forbidden or Conflict, skipping the cache folder and
        /// <c>.ignore</c> files. Orphan states are cleaned up first.
        /// </summary>
        /// <param name="accountInfo">The account whose interrupted files should be restarted.</param>
        /// <exception cref="ArgumentNullException"><paramref name="accountInfo"/> is null.</exception>
        public void RestartInterruptedFiles(AccountInfo accountInfo)
        {
            if (accountInfo == null)
                throw new ArgumentNullException("accountInfo");

            StatusKeeper.CleanupOrphanStates();
            using (log4net.ThreadContext.Stacks["Operation"].Push("RestartInterrupted"))
            {
                if (Log.IsDebugEnabled)
                    Log.Debug("Starting interrupted files");

                var cachePath = Path.Combine(accountInfo.AccountPath, FolderConstants.CacheFolder)
                    .ToLower();

                var account = accountInfo;
                var pendingEntries = (from state in FileState.Queryable
                                      where state.FileStatus != FileStatus.Unchanged &&
                                            state.FileStatus != FileStatus.Forbidden &&
                                            state.FileStatus != FileStatus.Conflict &&
                                            !state.FilePath.StartsWith(cachePath) &&
                                            !state.FilePath.EndsWith(".ignore") &&
                                            state.FilePath.StartsWith(account.AccountPath)
                                      select state).ToList();
                if (pendingEntries.Count>0)
                    StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);

                var pendingStates = pendingEntries
                    .Select(state => new WorkflowState(account, state))
                    .ToList();

                if (Log.IsDebugEnabled)
                    Log.DebugFormat("Found {0} interrupted files", pendingStates.Count);

                pendingStates.ForEach(Post);
            }
        }


        /// <summary>
        /// Queues a workflow message on the internal agent.
        /// </summary>
        /// <param name="workflowState">The message to queue. In debug builds its path is asserted
        /// to lie under the account path.</param>
        /// <exception cref="ArgumentNullException"><paramref name="workflowState"/> is null.</exception>
        public void Post(WorkflowState workflowState)
        {
            if (workflowState == null)
                throw new ArgumentNullException("workflowState");

            if (Log.IsDebugEnabled)
                Log.DebugFormat("Posted {0} {1} {2}", workflowState.Path, workflowState.Status, workflowState.TriggeringChange);

            //Remove invalid state
            //For now, ignore paths
            /* if (Directory.Exists(workflowState.Path))
                return;*/
            //TODO: Need to handle folder renames

            Debug.Assert(workflowState.Path.StartsWith(workflowState.AccountInfo.AccountPath, StringComparison.InvariantCultureIgnoreCase), "File from wrong account posted");

            _agent.Post(workflowState);
        }

    }
}