using System.Linq;
using System.Text;
using System.Threading.Tasks;
-using System.Threading.Tasks.Dataflow;
using Pithos.Interfaces;
using Pithos.Network;
using log4net;
[Export]
public class WorkflowAgent
{
- private ActionBlock<WorkflowState> _agent;
+ Agent<WorkflowState> _agent;
public IStatusNotification StatusNotification { get; set; }
[Import]
[Import]
public NetworkAgent NetworkAgent { get; set; }
- public ActionBlock<WorkflowState> Agent
- {
- get { return _agent; }
- }
-
private static readonly ILog Log = LogManager.GetLogger("WorkflowAgent");
public void Start()
{
- /*_agent = new ActionBlock<WorkflowState>(message =>
+ _agent = Agent<WorkflowState>.Start(inbox =>
{
Action loop = null;
loop = () =>
{
- //var message = inbox.Receive();
- Process(message);
+ var message = inbox.Receive();
var process = message.Then(Process, inbox.CancellationToken);
inbox.LoopAsync(process,loop,ex=>
Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex));
};
loop();
- });*/
- _agent = new ActionBlock<WorkflowState>(async message =>
- {
- try
- {
- var action=await TaskEx.Run(()=>Process(message));
- if (action!=null)
- NetworkAgent.Post(action);
- }
- catch (Exception ex)
- {
- Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", message.FileName, ex);
- }
- });
+ });
}
- private CloudAction Process(WorkflowState state)
+ private Task<object> Process(WorkflowState state)
{
var accountInfo = state.AccountInfo;
using (log4net.ThreadContext.Stacks["Workflow"].Push("Process"))
{
if (Log.IsDebugEnabled) Log.DebugFormat("Skipping {0}",state.FileName);
- return null;
+ return CompletedTask<object>.Default;
}
string path = state.Path.ToLower();
if (Log.IsDebugEnabled) Log.DebugFormat("Skipped missing {0}", state.FileName);
- return null;
+ return CompletedTask<object>.Default;
}
var fileState = FileState.FindByFilePath(path);
{
case FileStatus.Created:
case FileStatus.Modified:
- return new CloudUploadAction(accountInfo,info, fileState, accountInfo.BlockSize,
- accountInfo.BlockHash);
+ NetworkAgent.Post(new CloudUploadAction(accountInfo,info, fileState, accountInfo.BlockSize,
+ accountInfo.BlockHash));
+ break;
case FileStatus.Deleted:
- return new CloudDeleteAction(accountInfo,info, fileState);
+ NetworkAgent.Post(new CloudDeleteAction(accountInfo,info, fileState));
+ break;
case FileStatus.Renamed:
- return new CloudMoveAction(accountInfo,CloudActionType.RenameCloud,
- new FileInfo(state.OldPath),
- new FileInfo(state.Path));
+ NetworkAgent.Post(new CloudMoveAction(accountInfo,CloudActionType.RenameCloud, new FileInfo(state.OldPath),
+ new FileInfo(state.Path)));
+ break;
}
- return null;
+
+ return CompletedTask<object>.Default;
}
}
.ToLower();
+
+ var account = accountInfo;
var pendingEntries = from state in FileState.Queryable
where state.FileStatus != FileStatus.Unchanged &&
!state.FilePath.StartsWith(cachePath) &&
!state.FilePath.EndsWith(".ignore") &&
- state.FilePath.StartsWith(accountInfo.AccountPath)
+ state.FilePath.StartsWith(account.AccountPath)
select state;
+ var pendingStates = new List<WorkflowState>();
+ foreach (var state in pendingEntries)
+ {
+ pendingStates.Add(new WorkflowState(account, state));
+ }
if (Log.IsDebugEnabled)
- Log.DebugFormat("Found {0} interrupted files",pendingEntries.Count());
-
- var validEntries = from state in pendingEntries
- select new WorkflowState
- {
- AccountInfo=accountInfo,
- Path = state.FilePath.ToLower(),
- FileName = Path.GetFileName(state.FilePath).ToLower(),
- Hash = state.Checksum,
- Status = state.OverlayStatus == FileOverlayStatus.Unversioned
- ? FileStatus.Created
- : state.FileStatus,
- TriggeringChange =
- state.OverlayStatus == FileOverlayStatus.Unversioned
- ? WatcherChangeTypes.Created
- : WatcherChangeTypes.Changed
- };
- foreach (var entry in validEntries)
- {
- Post(entry);
+ Log.DebugFormat("Found {0} interrupted files", pendingStates.Count);
+
+
+ foreach (var entry in pendingStates)
+ {
+ //Remove invalid state
+ if (Directory.Exists(entry.Path))
+ {
+ Debug.Assert(false, "State has path instead of file", entry.Path);
+ StatusKeeper.ClearFileStatus(entry.Path);
+ return;
+ }
+ else
+ Post(entry);
}
}
- }
+ }
+
-
public void Post(WorkflowState workflowState)
{
if (Log.IsDebugEnabled)
Log.DebugFormat("Posted {0} {1} {2}", workflowState.Path, workflowState.Status, workflowState.TriggeringChange);
- Agent.Post(workflowState);
- }
+
+ //Remove invalid state
+ Debug.Assert(!Directory.Exists(workflowState.Path), "State has path instead of file", workflowState.Path);
+
+ Debug.Assert(workflowState.Path.StartsWith(workflowState.AccountInfo.AccountPath, StringComparison.InvariantCultureIgnoreCase), "File from wrong account posted");
+
+ _agent.Post(workflowState);
+ }
+
}
}