Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / FileAgent.cs @ cfed7823

History | View | Annotate | Download (12.6 kB)

1
using System;
2
using System.Collections.Generic;
3
using System.ComponentModel.Composition;
4
using System.Diagnostics;
5
using System.Diagnostics.Contracts;
6
using System.IO;
7
using System.Linq;
8
using System.Text;
9
using System.Threading.Tasks;
10
using Pithos.Interfaces;
11
using Pithos.Network;
12

    
13
namespace Pithos.Core.Agents
14
{
15
    [Export]
16
    public class FileAgent
17
    {
18
        Agent<WorkflowState> _agent;
19
        private FileSystemWatcher _watcher;
20

    
21
        [Import]
22
        public IStatusKeeper StatusKeeper { get; set; }
23
        [Import]
24
        public IPithosWorkflow Workflow { get; set; }
25
        [Import]
26
        public WorkflowAgent WorkflowAgent { get; set; }
27

    
28
        public string RootPath { get; private set; }
29

    
30

    
31
        public void Start(string rootPath)
32
        {
33
            if (String.IsNullOrWhiteSpace(rootPath))
34
                throw new ArgumentNullException("rootPath");
35
            if (!Path.IsPathRooted(rootPath))
36
                throw new ArgumentException("rootPath must be an absolute path","rootPath");
37
            Contract.EndContractBlock();
38

    
39
            RootPath = rootPath;
40
            _watcher = new FileSystemWatcher(rootPath);
41
            _watcher.IncludeSubdirectories = true;            
42
            _watcher.Changed += OnFileEvent;
43
            _watcher.Created += OnFileEvent;
44
            _watcher.Deleted += OnFileEvent;
45
            _watcher.Renamed += OnRenameEvent;
46
            _watcher.EnableRaisingEvents = true;
47

    
48

    
49
            _agent = Agent<WorkflowState>.Start(inbox =>
50
            {
51
                Action loop = null;
52
                loop = () =>
53
                {
54
                    var message = inbox.Receive();
55
                    var process=message.Then(Process,inbox.CancellationToken);
56

    
57
                    inbox.LoopAsync(process,loop,ex=>
58
                        Trace.TraceError("[ERROR] File Event Processing:\r{0}", ex));
59
                };
60
                loop();
61
            });
62
        }
63

    
64
        private Task<object> Process(WorkflowState state)
65
        {
66
            if (state==null)
67
                throw new ArgumentNullException("state");
68
            Contract.EndContractBlock();
69

    
70
            Debug.Assert(!Ignore(state.Path));
71

    
72
            var networkState = NetworkGate.GetNetworkState(state.Path);
73
            //Skip if the file is already being downloaded or uploaded and 
74
            //the change is create or modify
75
            if (networkState != NetworkOperation.None &&
76
                (
77
                    state.TriggeringChange == WatcherChangeTypes.Created ||
78
                    state.TriggeringChange == WatcherChangeTypes.Changed
79
                ))
80
                return CompletedTask<object>.Default;
81

    
82
            try
83
            {
84
                UpdateFileStatus(state);
85
                UpdateOverlayStatus(state);
86
                UpdateFileChecksum(state);
87
                WorkflowAgent.Post(state);
88
            }
89
            catch (IOException exc)
90
            {
91
                if (File.Exists(state.Path))
92
                {
93
                    Trace.TraceWarning("File access error occured, retrying {0}\n{1}", state.Path, exc);
94
                    _agent.Post(state);
95
                }
96
                else
97
                {
98
                    Trace.TraceWarning("File {0} does not exist. Will be ignored\n{1}", state.Path, exc);
99
                }
100
            }
101
            catch (Exception exc)
102
            {
103
                Trace.TraceWarning("Error occured while indexing{0. The file will be skipped}\n{1}", state.Path, exc);
104
            }
105
            return CompletedTask<object>.Default;
106
        }
107

    
108

    
109
/*
110
        private Task Process(Task<WorkflowState> action)
111
        {
112
            return action.ContinueWith(t => Process(t.Result));
113
        }
114
*/
115

    
116

    
117
        public bool Pause
118
        {
119
            get { return _watcher == null || !_watcher.EnableRaisingEvents; }
120
            set
121
            {
122
                if (_watcher != null)
123
                    _watcher.EnableRaisingEvents = !value;                
124
            }
125
        }
126

    
127
        public string FragmentsPath { get; set; }
128

    
129
        public void Post(WorkflowState workflowState)
130
        {
131
            if (workflowState == null)
132
                throw new ArgumentNullException("workflowState");
133
            Contract.EndContractBlock();
134

    
135
            _agent.Post(workflowState);
136
        }
137

    
138
        public void Stop()
139
        {
140
            if (_watcher != null)
141
            {
142
                _watcher.Changed -= OnFileEvent;
143
                _watcher.Created -= OnFileEvent;
144
                _watcher.Deleted -= OnFileEvent;
145
                _watcher.Renamed -= OnRenameEvent;
146
                _watcher.Dispose();
147
            }
148
            _watcher = null;
149

    
150
            if (_agent!=null)
151
                _agent.Stop();
152
        }
153

    
154
        // Enumerate all files in the Pithos directory except those in the Fragment folder
155
        // and files with a .ignore extension
156
        public IEnumerable<string> EnumerateFiles(string searchPattern="*")
157
        {
158
            var monitoredFiles = from filePath in Directory.EnumerateFileSystemEntries(RootPath, searchPattern, SearchOption.AllDirectories)
159
                                 where !Ignore(filePath)
160
                                 select filePath;
161
            return monitoredFiles;
162
        }
163

    
164
        public IEnumerable<FileInfo> EnumerateFileInfos(string searchPattern="*")
165
        {
166
            var rootDir = new DirectoryInfo(RootPath);
167
            var monitoredFiles = from file in rootDir.EnumerateFiles(searchPattern, SearchOption.AllDirectories)
168
                                 where !Ignore(file.FullName)
169
                                 select file;
170
            return monitoredFiles;
171
        }                
172

    
173
        public IEnumerable<string> EnumerateFilesAsRelativeUrls(string searchPattern="*")
174
        {
175
            var rootDir = new DirectoryInfo(RootPath);
176
            var monitoredFiles = from file in rootDir.EnumerateFiles(searchPattern, SearchOption.AllDirectories)
177
                                 where !Ignore(file.FullName)
178
                                 select file.AsRelativeUrlTo(RootPath);
179
            return monitoredFiles;
180
        }                
181

    
182

    
183
        
184

    
185
        private bool Ignore(string filePath)
186
        {
187
            if (filePath.StartsWith(FragmentsPath))
188
                return true;
189
            if (_ignoreFiles.ContainsKey(filePath.ToLower()))
190
                return true;
191
            return false;
192
        }
193

    
194
        //Post a Change message for all events except rename
195
        void OnFileEvent(object sender, FileSystemEventArgs e)
196
        {
197
            //Ignore events that affect the Fragments folder
198
            var filePath = e.FullPath;
199
            if (Ignore(filePath)) 
200
                return;
201
            _agent.Post(new WorkflowState { Path = filePath, FileName = e.Name, TriggeringChange = e.ChangeType });
202
        }
203

    
204

    
205
        //Post a Change message for renames containing the old and new names
206
        void OnRenameEvent(object sender, RenamedEventArgs e)
207
        {
208
            var oldFullPath = e.OldFullPath;
209
            var fullPath = e.FullPath;
210
            if (Ignore(oldFullPath) || Ignore(fullPath))
211
                return;
212

    
213
            _agent.Post(new WorkflowState
214
            {
215
                OldPath = oldFullPath,
216
                OldFileName = e.OldName,
217
                Path = fullPath,
218
                FileName = e.Name,
219
                TriggeringChange = e.ChangeType
220
            });
221
        }
222

    
223

    
224

    
225
        private Dictionary<WatcherChangeTypes, FileStatus> _statusDict = new Dictionary<WatcherChangeTypes, FileStatus>
226
        {
227
            {WatcherChangeTypes.Created,FileStatus.Created},
228
            {WatcherChangeTypes.Changed,FileStatus.Modified},
229
            {WatcherChangeTypes.Deleted,FileStatus.Deleted},
230
            {WatcherChangeTypes.Renamed,FileStatus.Renamed}
231
        };
232

    
233
        private Dictionary<string,string> _ignoreFiles=new Dictionary<string, string>();
234

    
235
        private WorkflowState UpdateFileStatus(WorkflowState state)
236
        {
237
            if (state==null)
238
                throw new ArgumentNullException("state");
239
            if (String.IsNullOrWhiteSpace(state.Path))
240
                throw new ArgumentException("The state's Path can't be empty","state");
241
            Contract.EndContractBlock();
242

    
243
            var path = state.Path;
244
            var status = _statusDict[state.TriggeringChange];
245
            var oldStatus = Workflow.StatusKeeper.GetFileStatus(path);
246
            if (status == oldStatus)
247
            {
248
                state.Status = status;
249
                state.Skip = true;
250
                return state;
251
            }
252
            if (state.Status == FileStatus.Renamed)
253
                Workflow.ClearFileStatus(path);
254

    
255
            state.Status = Workflow.SetFileStatus(path, status);
256
            return state;
257
        }
258

    
259
        private WorkflowState UpdateOverlayStatus(WorkflowState state)
260
        {
261
            if (state==null)
262
                throw new ArgumentNullException("state");
263
            Contract.EndContractBlock();
264

    
265
            if (state.Skip)
266
                return state;
267

    
268
            switch (state.Status)
269
            {
270
                case FileStatus.Created:
271
                case FileStatus.Modified:
272
                    this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);
273
                    break;
274
                case FileStatus.Deleted:
275
                    //this.StatusAgent.RemoveFileOverlayStatus(state.Path);
276
                    break;
277
                case FileStatus.Renamed:
278
                    this.StatusKeeper.ClearFileStatus(state.OldPath);
279
                    this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);
280
                    break;
281
                case FileStatus.Unchanged:
282
                    this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Normal);
283
                    break;
284
            }
285

    
286
            if (state.Status == FileStatus.Deleted)
287
                NativeMethods.RaiseChangeNotification(Path.GetDirectoryName(state.Path));
288
            else
289
                NativeMethods.RaiseChangeNotification(state.Path);
290
            return state;
291
        }
292

    
293

    
294
        private WorkflowState UpdateFileChecksum(WorkflowState state)
295
        {
296
            if (state.Skip)
297
                return state;
298

    
299
            if (state.Status == FileStatus.Deleted)
300
                return state;
301

    
302
            var path = state.Path;
303
            //Skip calculation for folders
304
            if (Directory.Exists(path))
305
                return state;
306

    
307
            var info = new FileInfo(path);
308
            string hash = info.CalculateHash(StatusKeeper.BlockSize,StatusKeeper.BlockHash);
309
            StatusKeeper.UpdateFileChecksum(path, hash);
310

    
311
            state.Hash = hash;
312
            return state;
313
        }
314

    
315
        //Does the file exist in the container's local folder?
316
        public bool Exists(string relativePath)
317
        {
318
            if (String.IsNullOrWhiteSpace(relativePath))
319
                throw new ArgumentNullException("relativePath");
320
            //A RootPath must be set before calling this method
321
            if (String.IsNullOrWhiteSpace(RootPath))
322
                throw new InvalidOperationException("RootPath was not set");
323
            Contract.EndContractBlock();
324
            //Create the absolute path by combining the RootPath with the relativePath
325
            var absolutePath=Path.Combine(RootPath, relativePath);
326
            //Is this a valid file?
327
            if (File.Exists(absolutePath))
328
                return true;
329
            //Or a directory?
330
            if (Directory.Exists(absolutePath))
331
                return true;
332
            //Fail if it is neither
333
            return false;
334
        }
335

    
336
        public FileInfo GetFileInfo(string relativePath)
337
        {
338
            if (String.IsNullOrWhiteSpace(relativePath))
339
                throw new ArgumentNullException("relativePath");
340
            //A RootPath must be set before calling this method
341
            if (String.IsNullOrWhiteSpace(RootPath))
342
                throw new InvalidOperationException("RootPath was not set");            
343
            Contract.EndContractBlock();            
344

    
345
            var absolutePath = Path.Combine(RootPath, relativePath);
346
//            Debug.Assert(File.Exists(absolutePath),String.Format("Path {0} doesn't exist",absolutePath));
347

    
348
            return new FileInfo(absolutePath);
349
            
350
        }
351

    
352
        public void Delete(string relativePath)
353
        {
354
            var absolutePath = Path.Combine(RootPath, relativePath);
355
            if (File.Exists(absolutePath))
356
            {                   
357
                File.Delete(absolutePath);
358
                _ignoreFiles[absolutePath.ToLower()] = absolutePath.ToLower();                
359
            }
360
            StatusKeeper.ClearFileStatus(absolutePath);
361
        }
362
    }
363
}