#region /* ----------------------------------------------------------------------- * * * Copyright 2011-2012 GRNET S.A. All rights reserved. * * Redistribution and use in source and binary forms, with or * without modification, are permitted provided that the following * conditions are met: * * 1. Redistributions of source code must retain the above * copyright notice, this list of conditions and the following * disclaimer. * * 2. Redistributions in binary form must reproduce the above * copyright notice, this list of conditions and the following * disclaimer in the documentation and/or other materials * provided with the distribution. * * * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. * * The views and conclusions contained in the software and * documentation are those of the authors and should not be * interpreted as representing official policies, either expressed * or implied, of GRNET S.A. * * ----------------------------------------------------------------------- */ #endregion using System; using System.Collections.Generic; using System.ComponentModel.Composition; using System.Diagnostics; using System.Diagnostics.Contracts; using System.IO; using System.Linq; using System.Reflection; using System.Text; using System.Threading.Tasks; using Castle.ActiveRecord; using Pithos.Interfaces; using Pithos.Network; using log4net; namespace Pithos.Core.Agents { [Export] public class WorkflowAgent { private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); readonly Agent _agent; public IStatusNotification StatusNotification { get; set; } [System.ComponentModel.Composition.Import] public IStatusKeeper StatusKeeper { get; set; } [System.ComponentModel.Composition.Import] public NetworkAgent NetworkAgent { get; set; } [System.ComponentModel.Composition.Import] public IPithosSettings Settings { get; set; } public WorkflowAgent() { _agent = Agent.Start(inbox => { Action loop = null; loop = () => { var message = inbox.Receive(); var process = message.Then(Process, inbox.CancellationToken); inbox.LoopAsync(process,loop,ex=> Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex)); }; loop(); }); } private Task Process(WorkflowState state) { var accountInfo = state.AccountInfo; using (log4net.ThreadContext.Stacks["Workflow"].Push("Process")) { try { if (Log.IsDebugEnabled) Log.DebugFormat("State {0} {1} {2}", state.FileName, state.Status, state.TriggeringChange); if (state.Skip) { if (Log.IsDebugEnabled) Log.DebugFormat("Skipping {0}", state.FileName); return CompletedTask.Default; } var info = Directory.Exists(state.Path) ? (FileSystemInfo)new DirectoryInfo(state.Path) : new FileInfo(state.Path); //Bypass deleted files, unless the status is Deleted if (!info.Exists && state.Status != FileStatus.Deleted) { state.Skip = true; this.StatusKeeper.ClearFileStatus(state.Path); if (Log.IsDebugEnabled) Log.DebugFormat("Skipped missing {0}", state.FileName); return CompletedTask.Default; } using (new SessionScope(FlushAction.Never)) { var fileState = StatusKeeper.GetStateByFilePath(state.Path); switch (state.Status) { case FileStatus.Created: case FileStatus.Modified: NetworkAgent.Post(new CloudUploadAction(accountInfo, info, fileState, accountInfo.BlockSize, accountInfo.BlockHash)); break; case FileStatus.Deleted: DeleteChildObjects(state, fileState); NetworkAgent.Post(new CloudDeleteAction(accountInfo, info, fileState)); break; case FileStatus.Renamed: if (state.OldPath == null) { //We reach this point only if the app closed before propagating a rename to the server Log.WarnFormat("Unfinished rename [{0}]",state.Path); StatusKeeper.SetFileState(state.Path,FileStatus.Conflict,FileOverlayStatus.Conflict, "Rename without old path"); break; } FileSystemInfo oldInfo = Directory.Exists(state.OldPath) ? (FileSystemInfo) new DirectoryInfo(state.OldPath) : new FileInfo(state.OldPath); FileSystemInfo newInfo = Directory.Exists(state.Path) ? (FileSystemInfo) new DirectoryInfo(state.Path) : new FileInfo(state.Path); NetworkAgent.Post(new CloudMoveAction(accountInfo, CloudActionType.RenameCloud, oldInfo, newInfo)); //TODO: Do I have to move children as well or will Pithos handle this? //Need to find all children of the OLD filepath //MoveChildObjects(state); break; } } return CompletedTask.Default; } catch (Exception ex) { Log.Error(ex.ToString()); throw; } } } private void DeleteChildObjects(WorkflowState state, FileState fileState) { if (fileState != null) { var children = StatusKeeper.GetChildren(fileState); foreach (var child in children) { var childInfo = child.IsFolder ? (FileSystemInfo) new DirectoryInfo(child.FilePath) : new FileInfo(child.FilePath); NetworkAgent.Post(new CloudDeleteAction(state.AccountInfo, childInfo, child)); } } } /*private void MoveChildObjects(WorkflowState state) { var oldFileState = StatusKeeper.GetStateByFilePath(state.OldPath); if (oldFileState != null) { var children = StatusKeeper.GetChildren(oldFileState); foreach (var child in children) { var newPath = Path.Combine(state.Path, child.FilePath.Substring(state.OldPath.Length+1)); var oldMoveInfo = child.IsFolder ? (FileSystemInfo) new DirectoryInfo(child.FilePath) : new FileInfo(child.FilePath); var newMoveInfo = child.IsFolder ? (FileSystemInfo) new DirectoryInfo(newPath) : new FileInfo(newPath); //The new file path will be created by trimming the old root path //and substituting the new root path NetworkAgent.Post(new CloudMoveAction(state.AccountInfo, CloudActionType.RenameCloud, oldMoveInfo, newMoveInfo)); } } }*/ //Starts interrupted files for a specific account public void RestartInterruptedFiles(AccountInfo accountInfo) { StatusKeeper.CleanupOrphanStates(); using (log4net.ThreadContext.Stacks["Operation"].Push("RestartInterrupted")) { if (Log.IsDebugEnabled) Log.Debug("Starting interrupted files"); var cachePath = Path.Combine(accountInfo.AccountPath, FolderConstants.CacheFolder) .ToLower(); var account = accountInfo; var pendingEntries = (from state in FileState.Queryable where state.FileStatus != FileStatus.Unchanged && state.FileStatus != FileStatus.Forbidden && state.FileStatus != FileStatus.Conflict && !state.FilePath.StartsWith(cachePath) && !state.FilePath.EndsWith(".ignore") && state.FilePath.StartsWith(account.AccountPath) select state).ToList(); if (pendingEntries.Count>0) StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose); var pendingStates = pendingEntries .Select(state => new WorkflowState(account, state)) .ToList(); if (Log.IsDebugEnabled) Log.DebugFormat("Found {0} interrupted files", pendingStates.Count); pendingStates.ForEach(Post); } } public void Post(WorkflowState workflowState) { if (Log.IsDebugEnabled) Log.DebugFormat("Posted {0} {1} {2}", workflowState.Path, workflowState.Status, workflowState.TriggeringChange); //Remove invalid state //For now, ignore paths /* if (Directory.Exists(workflowState.Path)) return;*/ //TODO: Need to handle folder renames Debug.Assert(workflowState.Path.StartsWith(workflowState.AccountInfo.AccountPath, StringComparison.InvariantCultureIgnoreCase), "File from wrong account posted"); _agent.Post(workflowState); } } }