using System.Text;
using System.Threading.Tasks;
using Pithos.Interfaces;
+using log4net;
namespace Pithos.Core.Agents
{
[Import]
public NetworkAgent NetworkAgent { get; set; }
+ private static readonly ILog Log = LogManager.GetLogger("WorkflowAgent"); // log4net logger shared by all instances (static), obtained under the name "WorkflowAgent"
+
public void Start()
{
_agent = Agent<WorkflowState>.Start(inbox =>
var message = inbox.Receive();
var process = message.Then(Process, inbox.CancellationToken);
inbox.LoopAsync(process,loop,ex=>
- Trace.TraceError("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex));
+ Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex));
};
loop();
});
private Task<object> Process(WorkflowState state)
{
- if (state.Skip)
- return CompletedTask<object>.Default;
- string path = state.Path.ToLower();
-
- //Bypass deleted files, unless the status is Deleted
- if (!File.Exists(path) && state.Status != FileStatus.Deleted)
+ using (log4net.ThreadContext.Stacks["Workflow"].Push("Process")) // Maps one WorkflowState to at most one action posted to NetworkAgent; "Process" stays on the "Workflow" log context stack until this block exits
{
- state.Skip = true;
- this.StatusKeeper.ClearFileStatus(path);
- return CompletedTask<object>.Default;
- }
- var fileState = FileState.FindByFilePath(path);
- var blockHash = NetworkAgent.BlockHash;
- var blockSize = NetworkAgent.BlockSize;
- var info = new FileInfo(path);
+ if (Log.IsDebugEnabled) Log.DebugFormat("State {0} {1} {2}", state.FileName,state.Status,state.TriggeringChange); // trace every state that reaches this method
- switch (state.Status)
- {
- case FileStatus.Created:
- case FileStatus.Modified:
- NetworkAgent.Post(new CloudAction(CloudActionType.UploadUnconditional, info, ObjectInfo.Empty, fileState,blockSize,blockHash));
- break;
- case FileStatus.Deleted:
- string fileName = info.AsRelativeUrlTo(NetworkAgent.FileAgent.RootPath);
- NetworkAgent.Post(new CloudAction(CloudActionType.DeleteCloud, null, new ObjectInfo { Name = fileName }, fileState, blockSize, blockHash));
- break;
- case FileStatus.Renamed:
- NetworkAgent.Post(new CloudMoveAction(CloudActionType.RenameCloud, state.OldFileName, state.OldPath, state.FileName, state.Path));
- break;
- }
+ if (state.Skip) // the state was flagged to be skipped: post nothing
+ {
+ if (Log.IsDebugEnabled) Log.DebugFormat("Skipping {0}",state.FileName);
+
+ return CompletedTask<object>.Default;
+ }
+ string path = state.Path.ToLower(); // all checks and lookups below use the lower-cased path (NOTE(review): ToLower() is culture-sensitive - confirm this matches how paths are stored)
+
+ //Bypass files that no longer exist on disk, unless the change being processed is the deletion itself
+ if (!File.Exists(path) && state.Status != FileStatus.Deleted)
+ {
+ state.Skip = true; // flag the state so the Skip check above short-circuits if it is processed again
+ this.StatusKeeper.ClearFileStatus(path); // presumably drops the status kept for the missing file - verify against StatusKeeper
+
+ if (Log.IsDebugEnabled) Log.DebugFormat("Skipped missing {0}", state.FileName);
+
+ return CompletedTask<object>.Default;
+ }
+ var fileState = FileState.FindByFilePath(path);
+ var blockHash = NetworkAgent.BlockHash;
+ var blockSize = NetworkAgent.BlockSize;
+ var info = new FileInfo(path);
+ // Each status handled below posts exactly one action to NetworkAgent; any other status posts nothing
+ switch (state.Status)
+ {
+ case FileStatus.Created:
+ case FileStatus.Modified: // created and modified files both result in an upload action
+ NetworkAgent.Post(new CloudUploadAction(info, fileState, blockSize, blockHash));
+ break;
+ case FileStatus.Deleted:
+ string fileName = info.AsRelativeUrlTo(NetworkAgent.FileAgent.RootPath);
+ NetworkAgent.Post(new CloudDeleteAction(fileName, fileState));
+ break;
+ case FileStatus.Renamed: // note: the move action receives state.Path / state.OldPath as given, not the lower-cased local 'path'
+ NetworkAgent.Post(new CloudMoveAction(CloudActionType.RenameCloud, state.OldFileName,
+ state.OldPath, state.FileName, state.Path));
+ break;
+ }
- return CompletedTask<object>.Default;
+ return CompletedTask<object>.Default; // nothing is awaited in this method: the work is handed to NetworkAgent through Post
+ }
}
+/// <summary>
+/// Re-posts a <see cref="WorkflowState"/> for every stored FileState whose status is not
+/// Unchanged, i.e. the "interrupted files" named in the status notification.
+/// </summary>
+/// <remarks>
+/// Entries whose path starts with the lower-cased FragmentsPath, or ends in ".ignore", are left out.
+/// Entries whose overlay status is Unversioned are re-posted as Created; all others keep their stored status.
+/// The pending query is executed exactly once, so the count written to the debug log and the
+/// entries that are posted always come from the same result set.
+/// </remarks>
public void RestartInterruptedFiles()
{
- StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);
-
- var pendingEntries = from state in FileState.Queryable
- where state.FileStatus != FileStatus.Unchanged &&
- !state.FilePath.StartsWith(FragmentsPath.ToLower()) &&
- !state.FilePath.EndsWith(".ignore")
- select state;
-
- var validEntries = from state in pendingEntries
- select new WorkflowState
- {
- Path = state.FilePath.ToLower(),
- FileName = Path.GetFileName(state.FilePath).ToLower(),
- Hash = state.Checksum,
- Status = state.OverlayStatus == FileOverlayStatus.Unversioned ?
- FileStatus.Created :
- state.FileStatus,
- TriggeringChange = state.OverlayStatus == FileOverlayStatus.Unversioned ?
- WatcherChangeTypes.Created :
- WatcherChangeTypes.Changed
- };
- foreach (var entry in validEntries)
- {
- Post(entry);
- }
+    StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);
+    // Everything logged inside this block carries "Restart" on the "Workflow" context stack.
+    using (log4net.ThreadContext.Stacks["Workflow"].Push("Restart"))
+    {
+        if (Log.IsDebugEnabled)
+            Log.Debug("Starting interrupted files");
+
+        // Materialise the query once. Left deferred, the Count() used for logging and the
+        // loop below would each run it again, which costs a second round trip and lets the
+        // logged count drift from what is actually posted.
+        var pendingEntries = (from state in FileState.Queryable
+                              where state.FileStatus != FileStatus.Unchanged &&
+                                    !state.FilePath.StartsWith(FragmentsPath.ToLower()) &&
+                                    !state.FilePath.EndsWith(".ignore")
+                              select state).ToList();
+        if (Log.IsDebugEnabled)
+            Log.DebugFormat("Found {0} interrupted files", pendingEntries.Count);
+
+        foreach (var pending in pendingEntries)
+        {
+            // Unversioned entries are treated as brand-new files.
+            var isUnversioned = pending.OverlayStatus == FileOverlayStatus.Unversioned;
+
+            Post(new WorkflowState
+            {
+                Path = pending.FilePath.ToLower(),
+                FileName = Path.GetFileName(pending.FilePath).ToLower(),
+                Hash = pending.Checksum,
+                Status = isUnversioned ? FileStatus.Created : pending.FileStatus,
+                TriggeringChange = isUnversioned
+                    ? WatcherChangeTypes.Created
+                    : WatcherChangeTypes.Changed
+            });
+        }
+    }
}
public void Post(WorkflowState workflowState)
{
+ if (Log.IsDebugEnabled) // debug trace of what is being queued; the state itself is handed to _agent below
+ Log.DebugFormat("Posted {0} {1} {2}", workflowState.Path, workflowState.Status, workflowState.TriggeringChange);
_agent.Post(workflowState);
}
}