2 using System.Collections.Generic;
3 using System.ComponentModel.Composition;
4 using System.Diagnostics;
5 using System.Diagnostics.Contracts;
9 using System.Threading.Tasks;
10 using Pithos.Interfaces;
12 namespace Pithos.Core.Agents
15 public class WorkflowAgent
17 Agent<WorkflowState> _agent;
19 public IStatusNotification StatusNotification { get; set; }
21 public IStatusKeeper StatusKeeper { get; set; }
23 //We should avoid processing files stored in the Fragments folder
24 //The Full path to the fragments folder is stored in FragmentsPath
25 public string FragmentsPath { get; set; }
28 public NetworkAgent NetworkAgent { get; set; }
32 _agent = Agent<WorkflowState>.Start(inbox =>
37 var message = inbox.Receive();
38 var process = message.Then(Process, inbox.CancellationToken);
39 inbox.LoopAsync(process,loop,ex=>
40 Trace.TraceError("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex));
46 private Task<object> Process(WorkflowState state)
49 return CompletedTask<object>.Default;
50 string path = state.Path.ToLower();
51 string fileName = Path.GetFileName(path);
53 //Bypass deleted files, unless the status is Deleted
54 if (!File.Exists(path) && state.Status != FileStatus.Deleted)
57 this.StatusKeeper.RemoveFileOverlayStatus(path);
58 return CompletedTask<object>.Default;
60 var fileState = FileState.FindByFilePath(path);
61 var blockHash = NetworkAgent.BlockHash;
62 var blockSize = NetworkAgent.BlockSize;
66 case FileStatus.Created:
67 case FileStatus.Modified:
68 var info = new FileInfo(path);
69 NetworkAgent.Post(new CloudAction(CloudActionType.UploadUnconditional, info, ObjectInfo.Empty, fileState,blockSize,blockHash));
71 case FileStatus.Deleted:
72 NetworkAgent.Post(new CloudAction(CloudActionType.DeleteCloud, null, new ObjectInfo { Name = fileName }, fileState, blockSize, blockHash));
74 case FileStatus.Renamed:
75 NetworkAgent.Post(new CloudAction(CloudActionType.RenameCloud, state.OldFileName, state.OldPath, state.FileName, state.Path));
79 return CompletedTask<object>.Default;
84 public void RestartInterruptedFiles()
87 StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);
89 var pendingEntries = from state in FileState.Queryable
90 where state.FileStatus != FileStatus.Unchanged &&
91 !state.FilePath.StartsWith(FragmentsPath.ToLower()) &&
92 !state.FilePath.EndsWith(".ignore")
95 var validEntries = from state in pendingEntries
96 select new WorkflowState
98 Path = state.FilePath.ToLower(),
99 FileName = Path.GetFileName(state.FilePath).ToLower(),
100 Hash = state.Checksum,
101 Status = state.OverlayStatus == FileOverlayStatus.Unversioned ?
104 TriggeringChange = state.OverlayStatus == FileOverlayStatus.Unversioned ?
105 WatcherChangeTypes.Created :
106 WatcherChangeTypes.Changed
108 foreach (var entry in validEntries)
117 public void Post(WorkflowState workflowState)
119 _agent.Post(workflowState);