2 using System.Collections.Generic;
3 using System.ComponentModel.Composition;
4 using System.Diagnostics;
5 using System.Diagnostics.Contracts;
9 using System.Threading.Tasks;
10 using System.Threading.Tasks.Dataflow;
11 using Pithos.Interfaces;
15 namespace Pithos.Core.Agents
18 public class WorkflowAgent
20 private ActionBlock<WorkflowState> _agent;
22 public IStatusNotification StatusNotification { get; set; }
24 public IStatusKeeper StatusKeeper { get; set; }
27 public NetworkAgent NetworkAgent { get; set; }
29 public ActionBlock<WorkflowState> Agent
31 get { return _agent; }
34 private static readonly ILog Log = LogManager.GetLogger("WorkflowAgent");
38 /*_agent = new ActionBlock<WorkflowState>(message =>
43 //var message = inbox.Receive();
45 var process = message.Then(Process, inbox.CancellationToken);
46 inbox.LoopAsync(process,loop,ex=>
47 Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex));
51 _agent = new ActionBlock<WorkflowState>(async message =>
55 var action=await TaskEx.Run(()=>Process(message));
57 NetworkAgent.Post(action);
61 Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", message.FileName, ex);
66 private CloudAction Process(WorkflowState state)
68 var accountInfo = state.AccountInfo;
69 using (log4net.ThreadContext.Stacks["Workflow"].Push("Process"))
71 if (Log.IsDebugEnabled) Log.DebugFormat("State {0} {1} {2}", state.FileName,state.Status,state.TriggeringChange);
75 if (Log.IsDebugEnabled) Log.DebugFormat("Skipping {0}",state.FileName);
79 string path = state.Path.ToLower();
81 //Bypass deleted files, unless the status is Deleted
82 if (!File.Exists(path) && state.Status != FileStatus.Deleted)
85 this.StatusKeeper.ClearFileStatus(path);
87 if (Log.IsDebugEnabled) Log.DebugFormat("Skipped missing {0}", state.FileName);
92 var fileState = FileState.FindByFilePath(path);
93 var info = new FileInfo(path);
98 case FileStatus.Created:
99 case FileStatus.Modified:
100 return new CloudUploadAction(accountInfo,info, fileState, accountInfo.BlockSize,
101 accountInfo.BlockHash);
102 case FileStatus.Deleted:
103 return new CloudDeleteAction(accountInfo,info, fileState);
104 case FileStatus.Renamed:
105 return new CloudMoveAction(accountInfo,CloudActionType.RenameCloud,
106 new FileInfo(state.OldPath),
107 new FileInfo(state.Path));
114 //Starts interrupted files for a specific account
115 public void RestartInterruptedFiles(AccountInfo accountInfo)
118 StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);
120 using (log4net.ThreadContext.Stacks["Workflow"].Push("Restart"))
122 if (Log.IsDebugEnabled)
123 Log.Debug("Starting interrupted files");
125 var cachePath = Path.Combine(accountInfo.AccountPath, FolderConstants.CacheFolder)
129 var pendingEntries = from state in FileState.Queryable
130 where state.FileStatus != FileStatus.Unchanged &&
131 !state.FilePath.StartsWith(cachePath) &&
132 !state.FilePath.EndsWith(".ignore") &&
133 state.FilePath.StartsWith(accountInfo.AccountPath)
135 if (Log.IsDebugEnabled)
136 Log.DebugFormat("Found {0} interrupted files",pendingEntries.Count());
138 var validEntries = from state in pendingEntries
139 select new WorkflowState
141 AccountInfo=accountInfo,
142 Path = state.FilePath.ToLower(),
143 FileName = Path.GetFileName(state.FilePath).ToLower(),
144 Hash = state.Checksum,
145 Status = state.OverlayStatus == FileOverlayStatus.Unversioned
149 state.OverlayStatus == FileOverlayStatus.Unversioned
150 ? WatcherChangeTypes.Created
151 : WatcherChangeTypes.Changed
153 foreach (var entry in validEntries)
162 public void Post(WorkflowState workflowState)
164 if (Log.IsDebugEnabled)
165 Log.DebugFormat("Posted {0} {1} {2}", workflowState.Path, workflowState.Status, workflowState.TriggeringChange);
166 Agent.Post(workflowState);