using System; using System.Collections.Generic; using System.ComponentModel.Composition; using System.Diagnostics; using System.Diagnostics.Contracts; using System.IO; using System.Linq; using System.Text; using System.Threading.Tasks; using Pithos.Interfaces; using Pithos.Network; using log4net; namespace Pithos.Core.Agents { [Export] public class WorkflowAgent { Agent _agent; public IStatusNotification StatusNotification { get; set; } [Import] public IStatusKeeper StatusKeeper { get; set; } [Import] public NetworkAgent NetworkAgent { get; set; } private static readonly ILog Log = LogManager.GetLogger("WorkflowAgent"); public void Start() { _agent = Agent.Start(inbox => { Action loop = null; loop = () => { var message = inbox.Receive(); var process = message.Then(Process, inbox.CancellationToken); inbox.LoopAsync(process,loop,ex=> Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex)); }; loop(); }); } private Task Process(WorkflowState state) { var accountInfo = state.AccountInfo; using (log4net.ThreadContext.Stacks["Workflow"].Push("Process")) { try { if (Log.IsDebugEnabled) Log.DebugFormat("State {0} {1} {2}", state.FileName, state.Status, state.TriggeringChange); if (state.Skip) { if (Log.IsDebugEnabled) Log.DebugFormat("Skipping {0}", state.FileName); return CompletedTask.Default; } string path = state.Path.ToLower(); FileSystemInfo info = Directory.Exists(path) ? (FileSystemInfo) new DirectoryInfo(path) : new FileInfo(path); //Bypass deleted files, unless the status is Deleted if (!info.Exists && state.Status != FileStatus.Deleted) { state.Skip = true; this.StatusKeeper.ClearFileStatus(path); if (Log.IsDebugEnabled) Log.DebugFormat("Skipped missing {0}", state.FileName); return CompletedTask.Default; } var fileState = FileState.FindByFilePath(path); switch (state.Status) { case FileStatus.Created: case FileStatus.Modified: NetworkAgent.Post(new CloudUploadAction(accountInfo, info, fileState, accountInfo.BlockSize, accountInfo.BlockHash)); break; case FileStatus.Deleted: NetworkAgent.Post(new CloudDeleteAction(accountInfo, info, fileState)); break; case FileStatus.Renamed: FileSystemInfo oldInfo = Directory.Exists(state.OldPath) ? (FileSystemInfo)new DirectoryInfo(state.OldPath) : new FileInfo(state.OldPath); FileSystemInfo newInfo = Directory.Exists(state.Path) ? (FileSystemInfo)new DirectoryInfo(state.Path) : new FileInfo(state.Path); NetworkAgent.Post(new CloudMoveAction(accountInfo, CloudActionType.RenameCloud, oldInfo, newInfo)); break; } return CompletedTask.Default; } catch (Exception ex) { Log.Error(ex.ToString()); throw; } } } //Starts interrupted files for a specific account public void RestartInterruptedFiles(AccountInfo accountInfo) { StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose); using (log4net.ThreadContext.Stacks["Workflow"].Push("Restart")) { if (Log.IsDebugEnabled) Log.Debug("Starting interrupted files"); var cachePath = Path.Combine(accountInfo.AccountPath, FolderConstants.CacheFolder) .ToLower(); var account = accountInfo; var pendingEntries = from state in FileState.Queryable where state.FileStatus != FileStatus.Unchanged && !state.FilePath.StartsWith(cachePath) && !state.FilePath.EndsWith(".ignore") && state.FilePath.StartsWith(account.AccountPath) select state; var pendingStates = new List(); foreach (var state in pendingEntries) { pendingStates.Add(new WorkflowState(account, state)); } if (Log.IsDebugEnabled) Log.DebugFormat("Found {0} interrupted files", pendingStates.Count); foreach (var entry in pendingStates) { Post(entry); } } } public void Post(WorkflowState workflowState) { if (Log.IsDebugEnabled) Log.DebugFormat("Posted {0} {1} {2}", workflowState.Path, workflowState.Status, workflowState.TriggeringChange); //Remove invalid state //For now, ignore paths /* if (Directory.Exists(workflowState.Path)) return;*/ //TODO: Need to handle folder renames Debug.Assert(workflowState.Path.StartsWith(workflowState.AccountInfo.AccountPath, StringComparison.InvariantCultureIgnoreCase), "File from wrong account posted"); _agent.Post(workflowState); } } }