Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / FileAgent.cs @ 77e10b4f

History | View | Annotate | Download (13.2 kB)

1
using System;
2
using System.Collections.Generic;
3
using System.ComponentModel.Composition;
4
using System.Diagnostics;
5
using System.Diagnostics.Contracts;
6
using System.IO;
7
using System.Linq;
8
using System.Text;
9
using System.Threading.Tasks;
10
using Pithos.Interfaces;
11
using Pithos.Network;
12
using log4net;
13
using log4net.Core;
14

    
15
namespace Pithos.Core.Agents
16
{
17
    [Export]
18
    public class FileAgent
19
    {
20
        Agent<WorkflowState> _agent;
21
        private FileSystemWatcher _watcher;
22

    
23
        [Import]
24
        public IStatusKeeper StatusKeeper { get; set; }
25
        [Import]
26
        public IPithosWorkflow Workflow { get; set; }
27
        [Import]
28
        public WorkflowAgent WorkflowAgent { get; set; }
29

    
30
        private AccountInfo AccountInfo { get; set; }
31

    
32
        private string RootPath { get;  set; }
33

    
34
        private static readonly ILog Log = LogManager.GetLogger("FileAgent");
35

    
36
        public void Start(AccountInfo accountInfo,string rootPath)
37
        {
38
            if (accountInfo==null)
39
                throw new ArgumentNullException("accountInfo");
40
            if (String.IsNullOrWhiteSpace(rootPath))
41
                throw new ArgumentNullException("rootPath");
42
            if (!Path.IsPathRooted(rootPath))
43
                throw new ArgumentException("rootPath must be an absolute path","rootPath");
44
            Contract.EndContractBlock();
45

    
46
            AccountInfo = accountInfo;
47
            RootPath = rootPath;
48
            _watcher = new FileSystemWatcher(rootPath);
49
            _watcher.IncludeSubdirectories = true;            
50
            _watcher.Changed += OnFileEvent;
51
            _watcher.Created += OnFileEvent;
52
            _watcher.Deleted += OnFileEvent;
53
            _watcher.Renamed += OnRenameEvent;
54
            _watcher.EnableRaisingEvents = true;
55

    
56

    
57
            _agent = Agent<WorkflowState>.Start(inbox =>
58
            {
59
                Action loop = null;
60
                loop = () =>
61
                {
62
                    var message = inbox.Receive();
63
                    var process=message.Then(Process,inbox.CancellationToken);
64

    
65
                    inbox.LoopAsync(process,loop,ex=>
66
                        Log.ErrorFormat("[ERROR] File Event Processing:\r{0}", ex));
67
                };
68
                loop();
69
            });
70
        }
71

    
72
        private Task<object> Process(WorkflowState state)
73
        {
74
            if (state==null)
75
                throw new ArgumentNullException("state");
76
            Contract.EndContractBlock();
77

    
78
            Debug.Assert(!Ignore(state.Path));
79

    
80
            var networkState = NetworkGate.GetNetworkState(state.Path);
81
            //Skip if the file is already being downloaded or uploaded and 
82
            //the change is create or modify
83
            if (networkState != NetworkOperation.None &&
84
                (
85
                    state.TriggeringChange == WatcherChangeTypes.Created ||
86
                    state.TriggeringChange == WatcherChangeTypes.Changed
87
                ))
88
                return CompletedTask<object>.Default;
89

    
90
            try
91
            {
92
                UpdateFileStatus(state);
93
                UpdateOverlayStatus(state);
94
                UpdateFileChecksum(state);
95
                WorkflowAgent.Post(state);
96
            }
97
            catch (IOException exc)
98
            {
99
                if (File.Exists(state.Path))
100
                {
101
                    Log.WarnFormat("File access error occured, retrying {0}\n{1}", state.Path, exc);
102
                    _agent.Post(state);
103
                }
104
                else
105
                {
106
                    Log.WarnFormat("File {0} does not exist. Will be ignored\n{1}", state.Path, exc);
107
                }
108
            }
109
            catch (Exception exc)
110
            {
111
                Log.WarnFormat("Error occured while indexing{0. The file will be skipped}\n{1}", state.Path, exc);
112
            }
113
            return CompletedTask<object>.Default;
114
        }
115

    
116

    
117
/*
118
        private Task Process(Task<WorkflowState> action)
119
        {
120
            return action.ContinueWith(t => Process(t.Result));
121
        }
122
*/
123

    
124

    
125
        public bool Pause
126
        {
127
            get { return _watcher == null || !_watcher.EnableRaisingEvents; }
128
            set
129
            {
130
                if (_watcher != null)
131
                    _watcher.EnableRaisingEvents = !value;                
132
            }
133
        }
134

    
135
        public string CachePath { get; set; }
136

    
137
        private List<string> _selectivePaths = new List<string>();
138
        public List<string> SelectivePaths
139
        {
140
            get { return _selectivePaths; }
141
            set { _selectivePaths = value; }
142
        }
143

    
144

    
145
        public void Post(WorkflowState workflowState)
146
        {
147
            if (workflowState == null)
148
                throw new ArgumentNullException("workflowState");
149
            Contract.EndContractBlock();
150

    
151
            _agent.Post(workflowState);
152
        }
153

    
154
        public void Stop()
155
        {
156
            if (_watcher != null)
157
            {
158
                _watcher.Changed -= OnFileEvent;
159
                _watcher.Created -= OnFileEvent;
160
                _watcher.Deleted -= OnFileEvent;
161
                _watcher.Renamed -= OnRenameEvent;
162
                _watcher.Dispose();
163
            }
164
            _watcher = null;
165

    
166
            if (_agent!=null)
167
                _agent.Stop();
168
        }
169

    
170
        // Enumerate all files in the Pithos directory except those in the Fragment folder
171
        // and files with a .ignore extension
172
        public IEnumerable<string> EnumerateFiles(string searchPattern="*")
173
        {
174
            var monitoredFiles = from filePath in Directory.EnumerateFileSystemEntries(RootPath, searchPattern, SearchOption.AllDirectories)
175
                                 where !Ignore(filePath)
176
                                 select filePath;
177
            return monitoredFiles;
178
        }
179

    
180
        public IEnumerable<FileInfo> EnumerateFileInfos(string searchPattern="*")
181
        {
182
            var rootDir = new DirectoryInfo(RootPath);
183
            var monitoredFiles = from file in rootDir.EnumerateFiles(searchPattern, SearchOption.AllDirectories)
184
                                 where !Ignore(file.FullName)
185
                                 select file;
186
            return monitoredFiles;
187
        }                
188

    
189
        public IEnumerable<string> EnumerateFilesAsRelativeUrls(string searchPattern="*")
190
        {
191
            var rootDir = new DirectoryInfo(RootPath);
192
            var monitoredFiles = from file in rootDir.EnumerateFiles(searchPattern, SearchOption.AllDirectories)
193
                                 where !Ignore(file.FullName)
194
                                 select file.AsRelativeUrlTo(RootPath);
195
            return monitoredFiles;
196
        }                
197

    
198

    
199
        
200

    
201
        private bool Ignore(string filePath)
202
        {
203
            if (filePath.StartsWith(CachePath))
204
                return true;
205
            if (_ignoreFiles.ContainsKey(filePath.ToLower()))
206
                return true;
207
            return false;
208
        }
209

    
210
        //Post a Change message for all events except rename
211
        void OnFileEvent(object sender, FileSystemEventArgs e)
212
        {
213
            //Ignore events that affect the cache folder
214
            var filePath = e.FullPath;
215
            if (Ignore(filePath)) 
216
                return;           
217
            
218
            _agent.Post(new WorkflowState{AccountInfo=AccountInfo, Path = filePath, FileName = e.Name, TriggeringChange = e.ChangeType });
219
        }
220

    
221

    
222
        //Post a Change message for renames containing the old and new names
223
        void OnRenameEvent(object sender, RenamedEventArgs e)
224
        {
225
            var oldFullPath = e.OldFullPath;
226
            var fullPath = e.FullPath;
227
            if (Ignore(oldFullPath) || Ignore(fullPath))
228
                return;
229

    
230
            _agent.Post(new WorkflowState
231
            {
232
                AccountInfo=AccountInfo,
233
                OldPath = oldFullPath,
234
                OldFileName = e.OldName,
235
                Path = fullPath,
236
                FileName = e.Name,
237
                TriggeringChange = e.ChangeType
238
            });
239
        }
240

    
241

    
242

    
243
        private Dictionary<WatcherChangeTypes, FileStatus> _statusDict = new Dictionary<WatcherChangeTypes, FileStatus>
244
        {
245
            {WatcherChangeTypes.Created,FileStatus.Created},
246
            {WatcherChangeTypes.Changed,FileStatus.Modified},
247
            {WatcherChangeTypes.Deleted,FileStatus.Deleted},
248
            {WatcherChangeTypes.Renamed,FileStatus.Renamed}
249
        };
250

    
251
        private Dictionary<string,string> _ignoreFiles=new Dictionary<string, string>();
252

    
253
        private WorkflowState UpdateFileStatus(WorkflowState state)
254
        {
255
            if (state==null)
256
                throw new ArgumentNullException("state");
257
            if (String.IsNullOrWhiteSpace(state.Path))
258
                throw new ArgumentException("The state's Path can't be empty","state");
259
            Contract.EndContractBlock();
260

    
261
            var path = state.Path;
262
            var status = _statusDict[state.TriggeringChange];
263
            var oldStatus = Workflow.StatusKeeper.GetFileStatus(path);
264
            if (status == oldStatus)
265
            {
266
                state.Status = status;
267
                state.Skip = true;
268
                return state;
269
            }
270
            if (state.Status == FileStatus.Renamed)
271
                Workflow.ClearFileStatus(path);
272

    
273
            state.Status = Workflow.SetFileStatus(path, status);
274
            return state;
275
        }
276

    
277
        private WorkflowState UpdateOverlayStatus(WorkflowState state)
278
        {
279
            if (state==null)
280
                throw new ArgumentNullException("state");
281
            Contract.EndContractBlock();
282

    
283
            if (state.Skip)
284
                return state;
285

    
286
            switch (state.Status)
287
            {
288
                case FileStatus.Created:
289
                case FileStatus.Modified:
290
                    this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);
291
                    break;
292
                case FileStatus.Deleted:
293
                    //this.StatusAgent.RemoveFileOverlayStatus(state.Path);
294
                    break;
295
                case FileStatus.Renamed:
296
                    this.StatusKeeper.ClearFileStatus(state.OldPath);
297
                    this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);
298
                    break;
299
                case FileStatus.Unchanged:
300
                    this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Normal);
301
                    break;
302
            }
303

    
304
            if (state.Status == FileStatus.Deleted)
305
                NativeMethods.RaiseChangeNotification(Path.GetDirectoryName(state.Path));
306
            else
307
                NativeMethods.RaiseChangeNotification(state.Path);
308
            return state;
309
        }
310

    
311

    
312
        private WorkflowState UpdateFileChecksum(WorkflowState state)
313
        {
314
            if (state.Skip)
315
                return state;
316

    
317
            if (state.Status == FileStatus.Deleted)
318
                return state;
319

    
320
            var path = state.Path;
321
            //Skip calculation for folders
322
            if (Directory.Exists(path))
323
                return state;
324

    
325
            var info = new FileInfo(path);
326
            string hash = info.CalculateHash(StatusKeeper.BlockSize,StatusKeeper.BlockHash);
327
            StatusKeeper.UpdateFileChecksum(path, hash);
328

    
329
            state.Hash = hash;
330
            return state;
331
        }
332

    
333
        //Does the file exist in the container's local folder?
334
        public bool Exists(string relativePath)
335
        {
336
            if (String.IsNullOrWhiteSpace(relativePath))
337
                throw new ArgumentNullException("relativePath");
338
            //A RootPath must be set before calling this method
339
            if (String.IsNullOrWhiteSpace(RootPath))
340
                throw new InvalidOperationException("RootPath was not set");
341
            Contract.EndContractBlock();
342
            //Create the absolute path by combining the RootPath with the relativePath
343
            var absolutePath=Path.Combine(RootPath, relativePath);
344
            //Is this a valid file?
345
            if (File.Exists(absolutePath))
346
                return true;
347
            //Or a directory?
348
            if (Directory.Exists(absolutePath))
349
                return true;
350
            //Fail if it is neither
351
            return false;
352
        }
353

    
354
        public FileInfo GetFileInfo(string relativePath)
355
        {
356
            if (String.IsNullOrWhiteSpace(relativePath))
357
                throw new ArgumentNullException("relativePath");
358
            //A RootPath must be set before calling this method
359
            if (String.IsNullOrWhiteSpace(RootPath))
360
                throw new InvalidOperationException("RootPath was not set");            
361
            Contract.EndContractBlock();            
362

    
363
            var absolutePath = Path.Combine(RootPath, relativePath);
364
//            Debug.Assert(File.Exists(absolutePath),String.Format("Path {0} doesn't exist",absolutePath));
365

    
366
            return new FileInfo(absolutePath);
367
            
368
        }
369

    
370
        public void Delete(string relativePath)
371
        {
372
            var absolutePath = Path.Combine(RootPath, relativePath);
373
            if (File.Exists(absolutePath))
374
            {                   
375
                File.Delete(absolutePath);
376
                _ignoreFiles[absolutePath.ToLower()] = absolutePath.ToLower();                
377
            }
378
            StatusKeeper.ClearFileStatus(absolutePath);
379
        }
380
    }
381
}