Revision 5e31048f trunk/Pithos.Core/Agents/FileAgent.cs
b/trunk/Pithos.Core/Agents/FileAgent.cs | ||
---|---|---|
7 | 7 |
using System.Linq; |
8 | 8 |
using System.Text; |
9 | 9 |
using System.Threading.Tasks; |
10 |
using System.Threading.Tasks.Dataflow; |
|
10 | 11 |
using Pithos.Interfaces; |
11 | 12 |
using Pithos.Network; |
12 | 13 |
using log4net; |
... | ... | |
17 | 18 |
[Export] |
18 | 19 |
public class FileAgent |
19 | 20 |
{ |
20 |
Agent<WorkflowState> _agent; |
|
21 |
//Agent<WorkflowState> _agent; |
|
22 |
TransformBlock<WorkflowState, WorkflowState> _agent; |
|
21 | 23 |
private FileSystemWatcher _watcher; |
22 | 24 |
|
23 | 25 |
[Import] |
... | ... | |
33 | 35 |
|
34 | 36 |
private static readonly ILog Log = LogManager.GetLogger("FileAgent"); |
35 | 37 |
|
36 |
public void Start(AccountInfo accountInfo,string rootPath)
|
|
38 |
public FileAgent()
|
|
37 | 39 |
{ |
38 |
if (accountInfo==null) |
|
39 |
throw new ArgumentNullException("accountInfo"); |
|
40 |
if (String.IsNullOrWhiteSpace(rootPath)) |
|
41 |
throw new ArgumentNullException("rootPath"); |
|
42 |
if (!Path.IsPathRooted(rootPath)) |
|
43 |
throw new ArgumentException("rootPath must be an absolute path","rootPath"); |
|
44 |
Contract.EndContractBlock(); |
|
45 |
|
|
46 |
AccountInfo = accountInfo; |
|
47 |
RootPath = rootPath; |
|
48 |
_watcher = new FileSystemWatcher(rootPath); |
|
49 |
_watcher.IncludeSubdirectories = true; |
|
50 |
_watcher.Changed += OnFileEvent; |
|
51 |
_watcher.Created += OnFileEvent; |
|
52 |
_watcher.Deleted += OnFileEvent; |
|
53 |
_watcher.Renamed += OnRenameEvent; |
|
54 |
_watcher.EnableRaisingEvents = true; |
|
55 |
|
|
56 |
|
|
57 |
_agent = Agent<WorkflowState>.Start(inbox => |
|
58 |
{ |
|
59 |
Action loop = null; |
|
60 |
loop = () => |
|
61 |
{ |
|
62 |
var message = inbox.Receive(); |
|
63 |
var process=message.Then(Process,inbox.CancellationToken); |
|
64 |
|
|
65 |
inbox.LoopAsync(process,loop,ex=> |
|
66 |
Log.ErrorFormat("[ERROR] File Event Processing:\r{0}", ex)); |
|
67 |
}; |
|
68 |
loop(); |
|
69 |
}); |
|
40 |
_agent = new TransformBlock<WorkflowState, WorkflowState>(async message => |
|
41 |
await TaskEx.Run(()=>Process(message))); |
|
70 | 42 |
} |
71 | 43 |
|
72 |
private Task<object> Process(WorkflowState state)
|
|
44 |
private WorkflowState Process(WorkflowState message)
|
|
73 | 45 |
{ |
74 |
if (state==null) |
|
75 |
throw new ArgumentNullException("state"); |
|
76 |
Contract.EndContractBlock(); |
|
77 |
|
|
78 |
Debug.Assert(!Ignore(state.Path)); |
|
79 |
|
|
80 |
var networkState = NetworkGate.GetNetworkState(state.Path); |
|
81 |
//Skip if the file is already being downloaded or uploaded and |
|
82 |
//the change is create or modify |
|
83 |
if (networkState != NetworkOperation.None && |
|
84 |
( |
|
85 |
state.TriggeringChange == WatcherChangeTypes.Created || |
|
86 |
state.TriggeringChange == WatcherChangeTypes.Changed |
|
87 |
)) |
|
88 |
return CompletedTask<object>.Default; |
|
46 |
Debug.Assert(!Ignore(message.Path)); |
|
89 | 47 |
|
90 | 48 |
try |
91 | 49 |
{ |
92 |
UpdateFileStatus(state); |
|
93 |
UpdateOverlayStatus(state); |
|
94 |
UpdateFileChecksum(state); |
|
95 |
WorkflowAgent.Post(state); |
|
50 |
//Skip if the file is already being downloaded or uploaded and |
|
51 |
//the change is create or modify |
|
52 |
var networkState = NetworkGate.GetNetworkState(message.Path); |
|
53 |
if (networkState != NetworkOperation.None && |
|
54 |
(message.TriggeringChange == WatcherChangeTypes.Created || |
|
55 |
message.TriggeringChange == WatcherChangeTypes.Changed)) |
|
56 |
return null; |
|
57 |
|
|
58 |
UpdateFileStatus(message); |
|
59 |
UpdateOverlayStatus(message); |
|
60 |
UpdateFileChecksum(message); |
|
61 |
return message; |
|
96 | 62 |
} |
97 | 63 |
catch (IOException exc) |
98 | 64 |
{ |
99 |
if (File.Exists(state.Path))
|
|
65 |
if (File.Exists(message.Path))
|
|
100 | 66 |
{ |
101 |
Log.WarnFormat("File access error occured, retrying {0}\n{1}", state.Path, exc);
|
|
102 |
_agent.Post(state);
|
|
67 |
Log.WarnFormat("File access error occured, retrying {0}\n{1}", message.Path, exc);
|
|
68 |
_agent.Post(message);
|
|
103 | 69 |
} |
104 | 70 |
else |
105 | 71 |
{ |
106 |
Log.WarnFormat("File {0} does not exist. Will be ignored\n{1}", state.Path, exc);
|
|
72 |
Log.WarnFormat("File {0} does not exist. Will be ignored\n{1}", message.Path, exc);
|
|
107 | 73 |
} |
108 | 74 |
} |
109 | 75 |
catch (Exception exc) |
110 | 76 |
{ |
111 |
Log.WarnFormat("Error occured while indexing{0. The file will be skipped}\n{1}", state.Path, exc); |
|
77 |
Log.WarnFormat("Error occured while indexing{0}. The file will be skipped}\n{1}", |
|
78 |
message.Path, exc); |
|
112 | 79 |
} |
113 |
return CompletedTask<object>.Default; |
|
114 |
} |
|
115 | 80 |
|
81 |
return null; |
|
82 |
} |
|
116 | 83 |
|
117 |
/* |
|
118 |
private Task Process(Task<WorkflowState> action) |
|
84 |
public void Start(AccountInfo accountInfo,string rootPath) |
|
119 | 85 |
{ |
120 |
return action.ContinueWith(t => Process(t.Result)); |
|
121 |
} |
|
122 |
*/ |
|
86 |
if (accountInfo==null) |
|
87 |
throw new ArgumentNullException("accountInfo"); |
|
88 |
if (String.IsNullOrWhiteSpace(rootPath)) |
|
89 |
throw new ArgumentNullException("rootPath"); |
|
90 |
if (!Path.IsPathRooted(rootPath)) |
|
91 |
throw new ArgumentException("rootPath must be an absolute path","rootPath"); |
|
92 |
Contract.EndContractBlock(); |
|
93 |
|
|
94 |
_agent.LinkTo(WorkflowAgent.Agent); |
|
123 | 95 |
|
96 |
AccountInfo = accountInfo; |
|
97 |
RootPath = rootPath; |
|
98 |
_watcher = new FileSystemWatcher(rootPath); |
|
99 |
_watcher.IncludeSubdirectories = true; |
|
100 |
_watcher.Changed += OnFileEvent; |
|
101 |
_watcher.Created += OnFileEvent; |
|
102 |
_watcher.Deleted += OnFileEvent; |
|
103 |
_watcher.Renamed += OnRenameEvent; |
|
104 |
_watcher.EnableRaisingEvents = true; |
|
105 |
|
|
106 |
} |
|
124 | 107 |
|
125 | 108 |
public bool Pause |
126 | 109 |
{ |
... | ... | |
164 | 147 |
_watcher = null; |
165 | 148 |
|
166 | 149 |
if (_agent!=null) |
167 |
_agent.Stop();
|
|
150 |
_agent.Complete();
|
|
168 | 151 |
} |
169 | 152 |
|
170 | 153 |
// Enumerate all files in the Pithos directory except those in the Fragment folder |
Also available in: Unified diff