root / trunk / Pithos.Core / Agents / WorkflowAgent.cs @ 5e31048f
History | View | Annotate | Download (6.6 kB)
1 |
using System; |
---|---|
2 |
using System.Collections.Generic; |
3 |
using System.ComponentModel.Composition; |
4 |
using System.Diagnostics; |
5 |
using System.Diagnostics.Contracts; |
6 |
using System.IO; |
7 |
using System.Linq; |
8 |
using System.Text; |
9 |
using System.Threading.Tasks; |
10 |
using System.Threading.Tasks.Dataflow; |
11 |
using Pithos.Interfaces; |
12 |
using Pithos.Network; |
13 |
using log4net; |
14 |
|
15 |
namespace Pithos.Core.Agents |
16 |
{ |
17 |
[Export] |
18 |
public class WorkflowAgent |
19 |
{ |
20 |
private ActionBlock<WorkflowState> _agent; |
21 |
|
22 |
public IStatusNotification StatusNotification { get; set; } |
23 |
[Import] |
24 |
public IStatusKeeper StatusKeeper { get; set; } |
25 |
|
26 |
[Import] |
27 |
public NetworkAgent NetworkAgent { get; set; } |
28 |
|
29 |
public ActionBlock<WorkflowState> Agent |
30 |
{ |
31 |
get { return _agent; } |
32 |
} |
33 |
|
34 |
private static readonly ILog Log = LogManager.GetLogger("WorkflowAgent"); |
35 |
|
36 |
public void Start() |
37 |
{ |
38 |
/*_agent = new ActionBlock<WorkflowState>(message => |
39 |
{ |
40 |
Action loop = null; |
41 |
loop = () => |
42 |
{ |
43 |
//var message = inbox.Receive(); |
44 |
Process(message); |
45 |
var process = message.Then(Process, inbox.CancellationToken); |
46 |
inbox.LoopAsync(process,loop,ex=> |
47 |
Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex)); |
48 |
}; |
49 |
loop(); |
50 |
});*/ |
51 |
_agent = new ActionBlock<WorkflowState>(async message => |
52 |
{ |
53 |
try |
54 |
{ |
55 |
var action=await TaskEx.Run(()=>Process(message)); |
56 |
if (action!=null) |
57 |
NetworkAgent.Post(action); |
58 |
} |
59 |
catch (Exception ex) |
60 |
{ |
61 |
Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", message.FileName, ex); |
62 |
} |
63 |
}); |
64 |
} |
65 |
|
66 |
private CloudAction Process(WorkflowState state) |
67 |
{ |
68 |
var accountInfo = state.AccountInfo; |
69 |
using (log4net.ThreadContext.Stacks["Workflow"].Push("Process")) |
70 |
{ |
71 |
if (Log.IsDebugEnabled) Log.DebugFormat("State {0} {1} {2}", state.FileName,state.Status,state.TriggeringChange); |
72 |
|
73 |
if (state.Skip) |
74 |
{ |
75 |
if (Log.IsDebugEnabled) Log.DebugFormat("Skipping {0}",state.FileName); |
76 |
|
77 |
return null; |
78 |
} |
79 |
string path = state.Path.ToLower(); |
80 |
|
81 |
//Bypass deleted files, unless the status is Deleted |
82 |
if (!File.Exists(path) && state.Status != FileStatus.Deleted) |
83 |
{ |
84 |
state.Skip = true; |
85 |
this.StatusKeeper.ClearFileStatus(path); |
86 |
|
87 |
if (Log.IsDebugEnabled) Log.DebugFormat("Skipped missing {0}", state.FileName); |
88 |
|
89 |
return null; |
90 |
} |
91 |
|
92 |
var fileState = FileState.FindByFilePath(path); |
93 |
var info = new FileInfo(path); |
94 |
|
95 |
|
96 |
switch (state.Status) |
97 |
{ |
98 |
case FileStatus.Created: |
99 |
case FileStatus.Modified: |
100 |
return new CloudUploadAction(accountInfo,info, fileState, accountInfo.BlockSize, |
101 |
accountInfo.BlockHash); |
102 |
case FileStatus.Deleted: |
103 |
return new CloudDeleteAction(accountInfo,info, fileState); |
104 |
case FileStatus.Renamed: |
105 |
return new CloudMoveAction(accountInfo,CloudActionType.RenameCloud, |
106 |
new FileInfo(state.OldPath), |
107 |
new FileInfo(state.Path)); |
108 |
} |
109 |
return null; |
110 |
} |
111 |
} |
112 |
|
113 |
|
114 |
//Starts interrupted files for a specific account |
115 |
public void RestartInterruptedFiles(AccountInfo accountInfo) |
116 |
{ |
117 |
|
118 |
StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose); |
119 |
|
120 |
using (log4net.ThreadContext.Stacks["Workflow"].Push("Restart")) |
121 |
{ |
122 |
if (Log.IsDebugEnabled) |
123 |
Log.Debug("Starting interrupted files"); |
124 |
|
125 |
var cachePath = Path.Combine(accountInfo.AccountPath, FolderConstants.CacheFolder) |
126 |
.ToLower(); |
127 |
|
128 |
|
129 |
var pendingEntries = from state in FileState.Queryable |
130 |
where state.FileStatus != FileStatus.Unchanged && |
131 |
!state.FilePath.StartsWith(cachePath) && |
132 |
!state.FilePath.EndsWith(".ignore") && |
133 |
state.FilePath.StartsWith(accountInfo.AccountPath) |
134 |
select state; |
135 |
if (Log.IsDebugEnabled) |
136 |
Log.DebugFormat("Found {0} interrupted files",pendingEntries.Count()); |
137 |
|
138 |
var validEntries = from state in pendingEntries |
139 |
select new WorkflowState |
140 |
{ |
141 |
AccountInfo=accountInfo, |
142 |
Path = state.FilePath.ToLower(), |
143 |
FileName = Path.GetFileName(state.FilePath).ToLower(), |
144 |
Hash = state.Checksum, |
145 |
Status = state.OverlayStatus == FileOverlayStatus.Unversioned |
146 |
? FileStatus.Created |
147 |
: state.FileStatus, |
148 |
TriggeringChange = |
149 |
state.OverlayStatus == FileOverlayStatus.Unversioned |
150 |
? WatcherChangeTypes.Created |
151 |
: WatcherChangeTypes.Changed |
152 |
}; |
153 |
foreach (var entry in validEntries) |
154 |
{ |
155 |
Post(entry); |
156 |
} |
157 |
} |
158 |
} |
159 |
|
160 |
|
161 |
|
162 |
public void Post(WorkflowState workflowState) |
163 |
{ |
164 |
if (Log.IsDebugEnabled) |
165 |
Log.DebugFormat("Posted {0} {1} {2}", workflowState.Path, workflowState.Status, workflowState.TriggeringChange); |
166 |
Agent.Post(workflowState); |
167 |
} |
168 |
} |
169 |
} |