2 using System.Collections.Generic;
3 using System.ComponentModel.Composition;
4 using System.Diagnostics;
5 using System.Diagnostics.Contracts;
9 using System.Threading.Tasks;
10 using Pithos.Interfaces;
14 namespace Pithos.Core.Agents
// Agent that accepts WorkflowState messages (see Post) and, in Process, turns each one
// into an upload / delete / move action that is posted to NetworkAgent.
// NOTE(review): braces and any attributes on these members are not visible in this excerpt.
17 public class WorkflowAgent
// Underlying message agent; assigned from Agent<WorkflowState>.Start and fed by Post.
19 Agent<WorkflowState> _agent;
// Used by RestartInterruptedFiles to announce that interrupted files are being restarted.
21 public IStatusNotification StatusNotification { get; set; }
// Used by Process to clear the stored status of a path whose file no longer exists.
23 public IStatusKeeper StatusKeeper { get; set; }
// Destination for the CloudUploadAction / CloudDeleteAction / CloudMoveAction objects built by Process.
26 public NetworkAgent NetworkAgent { get; set; }
// Logger shared by all instances, registered under the name "WorkflowAgent".
28 private static readonly ILog Log = LogManager.GetLogger("WorkflowAgent");
// Creates the agent's message loop and stores the started agent in _agent.
// NOTE(review): the enclosing method header and the definition of `loop` are not visible
// in this excerpt — presumably `loop` is the action that re-enters this receive step; confirm.
32 _agent = Agent<WorkflowState>.Start(inbox =>
// Take the next queued message; its Result is read by the error handler below.
37 var message = inbox.Receive();
// Chain Process onto the received message, passing along the inbox's cancellation token.
38 var process = message.Then(Process, inbox.CancellationToken);
// Hand the chained work to LoopAsync together with `loop` and an error callback that
// logs the file name of the failing message and the exception.
39 inbox.LoopAsync(process,loop,ex=>
40 Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex));
// Handles a single WorkflowState: skips it when appropriate, otherwise posts the cloud
// action matching state.Status to NetworkAgent.
//   state   - the workflow message to process; its AccountInfo, Path, OldPath, FileName,
//             Status and TriggeringChange are read here.
//   returns - CompletedTask<object>.Default on every return path visible in this excerpt.
// NOTE(review): braces, the skip condition, the switch header, the break statements and the
// try/catch lines are not visible in this excerpt.
46 private Task<object> Process(WorkflowState state)
48 var accountInfo = state.AccountInfo;
// Tag log output produced inside this scope with "Process" on the "Workflow" context stack.
49 using (log4net.ThreadContext.Stacks["Workflow"].Push("Process"))
54 if (Log.IsDebugEnabled)
55 Log.DebugFormat("State {0} {1} {2}", state.FileName, state.Status, state.TriggeringChange);
// Early exit for states that should be skipped (the guarding condition is not visible here).
59 if (Log.IsDebugEnabled) Log.DebugFormat("Skipping {0}", state.FileName);
61 return CompletedTask<object>.Default;
// The lower-cased path is what is used for the existence check, ClearFileStatus and FindByFilePath.
// NOTE(review): ToLower() is culture-sensitive and the Renamed branch below uses the
// un-lowered state.Path / state.OldPath — confirm both are intended.
63 string path = state.Path.ToLower();
// Pick DirectoryInfo when a directory exists at the path, FileInfo otherwise.
67 FileSystemInfo info = Directory.Exists(path) ? (FileSystemInfo) new DirectoryInfo(path) : new FileInfo(path);
69 //Bypass deleted files, unless the status is Deleted
70 if (!info.Exists && state.Status != FileStatus.Deleted)
// The file is gone and this is not a delete notification: drop its stored status and stop.
73 this.StatusKeeper.ClearFileStatus(path);
75 if (Log.IsDebugEnabled) Log.DebugFormat("Skipped missing {0}", state.FileName);
77 return CompletedTask<object>.Default;
// Look up the stored FileState for this path; it is passed to the upload and delete actions.
80 var fileState = FileState.FindByFilePath(path);
// Created and Modified are handled identically: post an upload action using the
// account's block size and block hash settings.
85 case FileStatus.Created:
86 case FileStatus.Modified:
87 NetworkAgent.Post(new CloudUploadAction(accountInfo, info, fileState, accountInfo.BlockSize,
88 accountInfo.BlockHash));
// Deleted: post a delete action for the same item.
90 case FileStatus.Deleted:
91 NetworkAgent.Post(new CloudDeleteAction(accountInfo, info, fileState));
// Renamed: build file-system info objects for the old and new locations and post a
// RenameCloud move action (its remaining arguments are on a line not visible here).
93 case FileStatus.Renamed:
94 FileSystemInfo oldInfo = Directory.Exists(state.OldPath) ? (FileSystemInfo)new DirectoryInfo(state.OldPath) : new FileInfo(state.OldPath);
95 FileSystemInfo newInfo = Directory.Exists(state.Path) ? (FileSystemInfo)new DirectoryInfo(state.Path) : new FileInfo(state.Path);
96 NetworkAgent.Post(new CloudMoveAction(accountInfo, CloudActionType.RenameCloud,
102 return CompletedTask<object>.Default;
// Logs the full exception text — presumably inside a catch block whose header is not visible; confirm.
106 Log.Error(ex.ToString());
113 //Starts interrupted files for a specific account
// Finds the stored FileState entries of the given account that are not Unchanged and
// wraps each one in a WorkflowState for reprocessing.
//   accountInfo - the account whose AccountPath is used to select entries.
// NOTE(review): braces, the query's select clause and the body of the final loop are not
// visible in this excerpt.
114 public void RestartInterruptedFiles(AccountInfo accountInfo)
// Announce the restart at Verbose trace level.
117 StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);
// Tag log output produced inside this scope with "Restart" on the "Workflow" context stack.
119 using (log4net.ThreadContext.Stacks["Workflow"].Push("Restart"))
121 if (Log.IsDebugEnabled)
122 Log.Debug("Starting interrupted files");
// Path of the account's cache folder; entries under it are excluded below.
// NOTE(review): this statement continues on a line not visible here (no terminating ';').
124 var cachePath = Path.Combine(accountInfo.AccountPath, FolderConstants.CacheFolder)
// Local alias for the account, used by the query and by the WorkflowState constructor below.
129 var account = accountInfo;
// Select entries that are not Unchanged, are outside the cache folder, do not end in
// ".ignore" and lie under this account's path.
130 var pendingEntries = from state in FileState.Queryable
131 where state.FileStatus != FileStatus.Unchanged &&
132 !state.FilePath.StartsWith(cachePath) &&
133 !state.FilePath.EndsWith(".ignore") &&
134 state.FilePath.StartsWith(account.AccountPath)
// Copy the query results into a list of WorkflowState objects before acting on them.
136 var pendingStates = new List<WorkflowState>();
137 foreach (var state in pendingEntries)
139 pendingStates.Add(new WorkflowState(account, state));
141 if (Log.IsDebugEnabled)
142 Log.DebugFormat("Found {0} interrupted files", pendingStates.Count);
// Presumably each collected entry is re-posted to this agent here — loop body not visible; confirm.
145 foreach (var entry in pendingStates)
// Queues a workflow state on the underlying agent for processing.
//   workflowState - the state to enqueue; in debug builds its Path is asserted to start with
//                   its account's AccountPath (case-insensitive, invariant culture).
// NOTE(review): the block comment opened below is closed on a line not visible in this
// excerpt; the directory check inside it is disabled code.
154 public void Post(WorkflowState workflowState)
156 if (Log.IsDebugEnabled)
157 Log.DebugFormat("Posted {0} {1} {2}", workflowState.Path, workflowState.Status, workflowState.TriggeringChange);
159 //Remove invalid state
160 //For now, ignore paths
161 /* if (Directory.Exists(workflowState.Path))
163 //TODO: Need to handle folder renames
165 Debug.Assert(workflowState.Path.StartsWith(workflowState.AccountInfo.AccountPath, StringComparison.InvariantCultureIgnoreCase), "File from wrong account posted");
167 _agent.Post(workflowState);