2 using System.Collections.Generic;
3 using System.ComponentModel.Composition;
4 using System.Diagnostics;
5 using System.Diagnostics.Contracts;
9 using System.Threading.Tasks;
10 using Pithos.Interfaces;
13 namespace Pithos.Core.Agents
// Agent that accepts WorkflowState messages (see Post) and translates them into
// cloud actions that are posted to the NetworkAgent (see Process).
16 public class WorkflowAgent
// Underlying message agent; assigned when the agent is started and fed by Post.
18 Agent<WorkflowState> _agent;
// Receives user-visible progress notifications (used by RestartInterruptedFiles).
20 public IStatusNotification StatusNotification { get; set; }
// Used by Process to clear the stored status of files that are missing on disk.
22 public IStatusKeeper StatusKeeper { get; set; }
24 //We should avoid processing files stored in the Fragments folder
25 //The full path to the fragments folder is stored in FragmentsPath
26 public string FragmentsPath { get; set; }
// Target of the upload/delete/move actions created by Process; also supplies
// the BlockHash and BlockSize values passed to upload actions.
29 public NetworkAgent NetworkAgent { get; set; }
// log4net logger for this class, registered under the name "WorkflowAgent".
31 private static readonly ILog Log = LogManager.GetLogger("WorkflowAgent");
// Create the agent; the delegate passed to Start is given the agent's inbox.
35 _agent = Agent<WorkflowState>.Start(inbox =>
// Take the next WorkflowState message from the inbox and chain Process onto it,
// passing along the inbox's cancellation token.
40 var message = inbox.Receive();
41 var process = message.Then(Process, inbox.CancellationToken);
// Hand the processing task, the `loop` callback and an error handler to LoopAsync.
// Presumably LoopAsync re-invokes `loop` after `process` completes - TODO confirm.
// The error handler logs the failing file name together with the exception.
// NOTE(review): the handler reads message.Result; confirm the receive task can
// never be faulted or cancelled at that point, otherwise the handler itself throws.
42 inbox.LoopAsync(process,loop,ex=>
43 Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex));
// Handles a single WorkflowState: depending on state.Status it posts an upload,
// delete or move action to the NetworkAgent.
// Every visible return path yields CompletedTask<object>.Default.
49 private Task<object> Process(WorkflowState state)
// Push "Process" on the log4net "Workflow" context stack for the duration of the call.
51 using (log4net.ThreadContext.Stacks["Workflow"].Push("Process"))
53 if (Log.IsDebugEnabled) Log.DebugFormat("State {0} {1} {2}", state.FileName,state.Status,state.TriggeringChange);
// Early exit: the state is logged as skipped and nothing is posted.
57 if (Log.IsDebugEnabled) Log.DebugFormat("Skipping {0}",state.FileName);
59 return CompletedTask<object>.Default;
// All later lookups use the lower-cased path.
// NOTE(review): ToLower() is culture-sensitive; confirm whether ToLowerInvariant() is intended.
61 string path = state.Path.ToLower();
63 //Bypass deleted files, unless the status is Deleted
64 if (!File.Exists(path) && state.Status != FileStatus.Deleted)
// The file no longer exists on disk: drop its stored status and stop here.
67 this.StatusKeeper.ClearFileStatus(path);
69 if (Log.IsDebugEnabled) Log.DebugFormat("Skipped missing {0}", state.FileName);
71 return CompletedTask<object>.Default;
// Gather the inputs for the cloud actions: the stored FileState for this path,
// the NetworkAgent's block hash / block size values, and a FileInfo for the file.
73 var fileState = FileState.FindByFilePath(path);
74 var blockHash = NetworkAgent.BlockHash;
75 var blockSize = NetworkAgent.BlockSize;
76 var info = new FileInfo(path);
// Created and Modified files are both handled by posting an upload action.
80 case FileStatus.Created:
81 case FileStatus.Modified:
82 NetworkAgent.Post(new CloudUploadAction(info, fileState, blockSize, blockHash));
// Deleted files: the delete action is given the name produced by AsRelativeUrlTo
// against the FileAgent's RootPath.
84 case FileStatus.Deleted:
85 string fileName = info.AsRelativeUrlTo(NetworkAgent.FileAgent.RootPath);
86 NetworkAgent.Post(new CloudDeleteAction(fileName, fileState));
// Renamed files: post a RenameCloud move action carrying the old and new name/path.
88 case FileStatus.Renamed:
89 NetworkAgent.Post(new CloudMoveAction(CloudActionType.RenameCloud, state.OldFileName,
90 state.OldPath, state.FileName, state.Path));
94 return CompletedTask<object>.Default;
// Finds stored file states that are not Unchanged and rebuilds a WorkflowState
// for each of them so that they can be processed again.
100 public void RestartInterruptedFiles()
// Tell the status listener that interrupted files are being restarted.
103 StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);
// Push "Restart" on the log4net "Workflow" context stack for the duration of the call.
105 using (log4net.ThreadContext.Stacks["Workflow"].Push("Restart"))
107 if (Log.IsDebugEnabled)
108 Log.Debug("Starting interrupted files");
// Select every stored state whose FileStatus is not Unchanged, excluding paths
// under the (lower-cased) fragments folder and paths ending in ".ignore".
// NOTE(review): FragmentsPath.ToLower() sits inside the query expression; confirm
// the FileState.Queryable provider handles it and that FragmentsPath is never null.
110 var pendingEntries = from state in FileState.Queryable
111 where state.FileStatus != FileStatus.Unchanged &&
112 !state.FilePath.StartsWith(FragmentsPath.ToLower()) &&
113 !state.FilePath.EndsWith(".ignore")
// NOTE(review): Count() presumably runs the query an extra time (debug logging only) - confirm acceptable.
115 if (Log.IsDebugEnabled)
116 Log.DebugFormat("Found {0} interrupted files",pendingEntries.Count());
// Project each pending entry to a WorkflowState with lower-cased path and file
// name and the stored checksum as Hash.
118 var validEntries = from state in pendingEntries
119 select new WorkflowState
121 Path = state.FilePath.ToLower(),
122 FileName = Path.GetFileName(state.FilePath).ToLower(),
123 Hash = state.Checksum,
// Status is chosen by whether the overlay status is Unversioned.
124 Status = state.OverlayStatus == FileOverlayStatus.Unversioned
// Unversioned entries map to WatcherChangeTypes.Created, all others to WatcherChangeTypes.Changed.
128 state.OverlayStatus == FileOverlayStatus.Unversioned
129 ? WatcherChangeTypes.Created
130 : WatcherChangeTypes.Changed
// Walk the rebuilt workflow states (the query is evaluated here).
132 foreach (var entry in validEntries)
// Queues a workflow state on the agent. When debug logging is enabled, the
// state's path, status and triggering change are logged first.
141 public void Post(WorkflowState workflowState)
143 if (Log.IsDebugEnabled)
144 Log.DebugFormat("Posted {0} {1} {2}", workflowState.Path, workflowState.Status, workflowState.TriggeringChange);
// Forward the message to the underlying agent's inbox.
145 _agent.Post(workflowState);