2 using System.Collections.Generic;
3 using System.ComponentModel.Composition;
4 using System.Diagnostics;
5 using System.Diagnostics.Contracts;
9 using System.Threading.Tasks;
10 using Pithos.Interfaces;
14 namespace Pithos.Core.Agents
17 public class WorkflowAgent
19 Agent<WorkflowState> _agent;
21 public IStatusNotification StatusNotification { get; set; }
23 public IStatusKeeper StatusKeeper { get; set; }
25 //We should avoid processing files stored in the Fragments folder
26 //The Full path to the fragments folder is stored in FragmentsPath
27 //public string FragmentsPath { get; set; }
30 public NetworkAgent NetworkAgent { get; set; }
32 private static readonly ILog Log = LogManager.GetLogger("WorkflowAgent");
36 _agent = Agent<WorkflowState>.Start(inbox =>
41 var message = inbox.Receive();
42 var process = message.Then(Process, inbox.CancellationToken);
43 inbox.LoopAsync(process,loop,ex=>
44 Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex));
50 private Task<object> Process(WorkflowState state)
52 var accountInfo = state.AccountInfo;
53 using (log4net.ThreadContext.Stacks["Workflow"].Push("Process"))
55 if (Log.IsDebugEnabled) Log.DebugFormat("State {0} {1} {2}", state.FileName,state.Status,state.TriggeringChange);
59 if (Log.IsDebugEnabled) Log.DebugFormat("Skipping {0}",state.FileName);
61 return CompletedTask<object>.Default;
63 string path = state.Path.ToLower();
65 //Bypass deleted files, unless the status is Deleted
66 if (!File.Exists(path) && state.Status != FileStatus.Deleted)
69 this.StatusKeeper.ClearFileStatus(path);
71 if (Log.IsDebugEnabled) Log.DebugFormat("Skipped missing {0}", state.FileName);
73 return CompletedTask<object>.Default;
76 var fileState = FileState.FindByFilePath(path);
77 var info = new FileInfo(path);
82 case FileStatus.Created:
83 case FileStatus.Modified:
84 NetworkAgent.Post(new CloudUploadAction(accountInfo,info, fileState, accountInfo.BlockSize,
85 accountInfo.BlockHash));
87 case FileStatus.Deleted:
88 string fileName = info.AsRelativeUrlTo(accountInfo.AccountPath);
89 NetworkAgent.Post(new CloudDeleteAction(accountInfo,fileName, fileState));
91 case FileStatus.Renamed:
92 NetworkAgent.Post(new CloudMoveAction(accountInfo,CloudActionType.RenameCloud, state.OldFileName,
93 state.OldPath, state.FileName, state.Path));
97 return CompletedTask<object>.Default;
102 //Starts interrupted files for a specific account
103 public void RestartInterruptedFiles(AccountInfo accountInfo)
106 StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);
108 using (log4net.ThreadContext.Stacks["Workflow"].Push("Restart"))
110 if (Log.IsDebugEnabled)
111 Log.Debug("Starting interrupted files");
113 var fragmentsPath = Path.Combine(accountInfo.AccountPath, FolderConstants.FragmentsFolder)
117 var pendingEntries = from state in FileState.Queryable
118 where state.FileStatus != FileStatus.Unchanged &&
119 !state.FilePath.StartsWith(fragmentsPath) &&
120 !state.FilePath.EndsWith(".ignore") &&
121 state.FilePath.StartsWith(accountInfo.AccountPath)
123 if (Log.IsDebugEnabled)
124 Log.DebugFormat("Found {0} interrupted files",pendingEntries.Count());
126 var validEntries = from state in pendingEntries
127 select new WorkflowState
129 AccountInfo=accountInfo,
130 Path = state.FilePath.ToLower(),
131 FileName = Path.GetFileName(state.FilePath).ToLower(),
132 Hash = state.Checksum,
133 Status = state.OverlayStatus == FileOverlayStatus.Unversioned
137 state.OverlayStatus == FileOverlayStatus.Unversioned
138 ? WatcherChangeTypes.Created
139 : WatcherChangeTypes.Changed
141 foreach (var entry in validEntries)
150 public void Post(WorkflowState workflowState)
152 if (Log.IsDebugEnabled)
153 Log.DebugFormat("Posted {0} {1} {2}", workflowState.Path, workflowState.Status, workflowState.TriggeringChange);
154 _agent.Post(workflowState);