using System; using System.Collections.Generic; using System.ComponentModel.Composition; using System.Diagnostics; using System.Diagnostics.Contracts; using System.IO; using System.Linq; using System.Text; using System.Threading.Tasks; using Pithos.Interfaces; using Pithos.Network; using log4net; namespace Pithos.Core.Agents { [Export] public class WorkflowAgent { Agent _agent; public IStatusNotification StatusNotification { get; set; } [Import] public IStatusKeeper StatusKeeper { get; set; } [Import] public NetworkAgent NetworkAgent { get; set; } private static readonly ILog Log = LogManager.GetLogger("WorkflowAgent"); public void Start() { _agent = Agent.Start(inbox => { Action loop = null; loop = () => { var message = inbox.Receive(); var process = message.Then(Process, inbox.CancellationToken); inbox.LoopAsync(process,loop,ex=> Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex)); }; loop(); }); } private Task Process(WorkflowState state) { var accountInfo = state.AccountInfo; using (log4net.ThreadContext.Stacks["Workflow"].Push("Process")) { if (Log.IsDebugEnabled) Log.DebugFormat("State {0} {1} {2}", state.FileName,state.Status,state.TriggeringChange); if (state.Skip) { if (Log.IsDebugEnabled) Log.DebugFormat("Skipping {0}",state.FileName); return CompletedTask.Default; } string path = state.Path.ToLower(); //Bypass deleted files, unless the status is Deleted if (!File.Exists(path) && state.Status != FileStatus.Deleted) { state.Skip = true; this.StatusKeeper.ClearFileStatus(path); if (Log.IsDebugEnabled) Log.DebugFormat("Skipped missing {0}", state.FileName); return CompletedTask.Default; } var fileState = FileState.FindByFilePath(path); var info = new FileInfo(path); switch (state.Status) { case FileStatus.Created: case FileStatus.Modified: NetworkAgent.Post(new CloudUploadAction(accountInfo,info, fileState, accountInfo.BlockSize, accountInfo.BlockHash)); break; case FileStatus.Deleted: NetworkAgent.Post(new CloudDeleteAction(accountInfo,info, fileState)); break; case FileStatus.Renamed: NetworkAgent.Post(new CloudMoveAction(accountInfo,CloudActionType.RenameCloud, new FileInfo(state.OldPath), new FileInfo(state.Path))); break; } return CompletedTask.Default; } } //Starts interrupted files for a specific account public void RestartInterruptedFiles(AccountInfo accountInfo) { StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose); using (log4net.ThreadContext.Stacks["Workflow"].Push("Restart")) { if (Log.IsDebugEnabled) Log.Debug("Starting interrupted files"); var cachePath = Path.Combine(accountInfo.AccountPath, FolderConstants.CacheFolder) .ToLower(); var account = accountInfo; var pendingEntries = from state in FileState.Queryable where state.FileStatus != FileStatus.Unchanged && !state.FilePath.StartsWith(cachePath) && !state.FilePath.EndsWith(".ignore") && state.FilePath.StartsWith(account.AccountPath) select state; var pendingStates = new List(); foreach (var state in pendingEntries) { pendingStates.Add(new WorkflowState(account, state)); } if (Log.IsDebugEnabled) Log.DebugFormat("Found {0} interrupted files", pendingStates.Count); foreach (var entry in pendingStates) { //Remove invalid state if (Directory.Exists(entry.Path)) { Debug.Assert(false, "State has path instead of file", entry.Path); StatusKeeper.ClearFileStatus(entry.Path); return; } else Post(entry); } } } public void Post(WorkflowState workflowState) { if (Log.IsDebugEnabled) Log.DebugFormat("Posted {0} {1} {2}", workflowState.Path, workflowState.Status, workflowState.TriggeringChange); //Remove invalid state Debug.Assert(!Directory.Exists(workflowState.Path), "State has path instead of file", workflowState.Path); Debug.Assert(workflowState.Path.StartsWith(workflowState.AccountInfo.AccountPath, StringComparison.InvariantCultureIgnoreCase), "File from wrong account posted"); _agent.Post(workflowState); } } }