Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / FileAgent.cs @ c53aa229

History | View | Annotate | Download (13 kB)

1
using System;
2
using System.Collections.Generic;
3
using System.ComponentModel.Composition;
4
using System.Diagnostics;
5
using System.Diagnostics.Contracts;
6
using System.IO;
7
using System.Linq;
8
using System.Text;
9
using System.Threading.Tasks;
10
using Pithos.Interfaces;
11
using Pithos.Network;
12
using log4net;
13
using log4net.Core;
14

    
15
namespace Pithos.Core.Agents
16
{
17
    [Export,PartCreationPolicy(CreationPolicy.NonShared)]
18
    public class FileAgent
19
    {
20
        Agent<WorkflowState> _agent;
21
        private FileSystemWatcher _watcher;
22

    
23
        [Import]
24
        public IStatusKeeper StatusKeeper { get; set; }
25
        [Import]
26
        public IPithosWorkflow Workflow { get; set; }
27
        [Import]
28
        public WorkflowAgent WorkflowAgent { get; set; }
29

    
30
        private AccountInfo AccountInfo { get; set; }
31

    
32
        private string RootPath { get;  set; }
33

    
34
        private static readonly ILog Log = LogManager.GetLogger("FileAgent");
35

    
36
        public void Start(AccountInfo accountInfo,string rootPath)
37
        {
38
            if (accountInfo==null)
39
                throw new ArgumentNullException("accountInfo");
40
            if (String.IsNullOrWhiteSpace(rootPath))
41
                throw new ArgumentNullException("rootPath");
42
            if (!Path.IsPathRooted(rootPath))
43
                throw new ArgumentException("rootPath must be an absolute path","rootPath");
44
            Contract.EndContractBlock();
45

    
46
            AccountInfo = accountInfo;
47
            RootPath = rootPath;
48
            _watcher = new FileSystemWatcher(rootPath);
49
            _watcher.IncludeSubdirectories = true;            
50
            _watcher.Changed += OnFileEvent;
51
            _watcher.Created += OnFileEvent;
52
            _watcher.Deleted += OnFileEvent;
53
            _watcher.Renamed += OnRenameEvent;
54
            _watcher.EnableRaisingEvents = true;
55

    
56

    
57
            _agent = Agent<WorkflowState>.Start(inbox =>
58
            {
59
                Action loop = null;
60
                loop = () =>
61
                {
62
                    var message = inbox.Receive();
63
                    var process=message.Then(Process,inbox.CancellationToken);
64

    
65
                    inbox.LoopAsync(process,loop,ex=>
66
                        Log.ErrorFormat("[ERROR] File Event Processing:\r{0}", ex));
67
                };
68
                loop();
69
            });
70
        }
71

    
72
        private Task<object> Process(WorkflowState state)
73
        {
74
            if (state==null)
75
                throw new ArgumentNullException("state");
76
            Contract.EndContractBlock();
77

    
78
            Debug.Assert(!Ignore(state.Path));
79

    
80
            var networkState = NetworkGate.GetNetworkState(state.Path);
81
            //Skip if the file is already being downloaded or uploaded and 
82
            //the change is create or modify
83
            if (networkState != NetworkOperation.None &&
84
                (
85
                    state.TriggeringChange == WatcherChangeTypes.Created ||
86
                    state.TriggeringChange == WatcherChangeTypes.Changed
87
                ))
88
                return CompletedTask<object>.Default;
89

    
90
            try
91
            {
92
                UpdateFileStatus(state);
93
                UpdateOverlayStatus(state);
94
                UpdateFileChecksum(state);
95
                WorkflowAgent.Post(state);
96
            }
97
            catch (IOException exc)
98
            {
99
                if (File.Exists(state.Path))
100
                {
101
                    Log.WarnFormat("File access error occured, retrying {0}\n{1}", state.Path, exc);
102
                    _agent.Post(state);
103
                }
104
                else
105
                {
106
                    Log.WarnFormat("File {0} does not exist. Will be ignored\n{1}", state.Path, exc);
107
                }
108
            }
109
            catch (Exception exc)
110
            {
111
                Log.WarnFormat("Error occured while indexing{0. The file will be skipped}\n{1}", state.Path, exc);
112
            }
113
            return CompletedTask<object>.Default;
114
        }
115

    
116

    
117
/*
118
        private Task Process(Task<WorkflowState> action)
119
        {
120
            return action.ContinueWith(t => Process(t.Result));
121
        }
122
*/
123

    
124

    
125
        public bool Pause
126
        {
127
            get { return _watcher == null || !_watcher.EnableRaisingEvents; }
128
            set
129
            {
130
                if (_watcher != null)
131
                    _watcher.EnableRaisingEvents = !value;                
132
            }
133
        }
134

    
135
        public string FragmentsPath { get; set; }
136

    
137
        public void Post(WorkflowState workflowState)
138
        {
139
            if (workflowState == null)
140
                throw new ArgumentNullException("workflowState");
141
            Contract.EndContractBlock();
142

    
143
            _agent.Post(workflowState);
144
        }
145

    
146
        public void Stop()
147
        {
148
            if (_watcher != null)
149
            {
150
                _watcher.Changed -= OnFileEvent;
151
                _watcher.Created -= OnFileEvent;
152
                _watcher.Deleted -= OnFileEvent;
153
                _watcher.Renamed -= OnRenameEvent;
154
                _watcher.Dispose();
155
            }
156
            _watcher = null;
157

    
158
            if (_agent!=null)
159
                _agent.Stop();
160
        }
161

    
162
        // Enumerate all files in the Pithos directory except those in the Fragment folder
163
        // and files with a .ignore extension
164
        public IEnumerable<string> EnumerateFiles(string searchPattern="*")
165
        {
166
            var monitoredFiles = from filePath in Directory.EnumerateFileSystemEntries(RootPath, searchPattern, SearchOption.AllDirectories)
167
                                 where !Ignore(filePath)
168
                                 select filePath;
169
            return monitoredFiles;
170
        }
171

    
172
        public IEnumerable<FileInfo> EnumerateFileInfos(string searchPattern="*")
173
        {
174
            var rootDir = new DirectoryInfo(RootPath);
175
            var monitoredFiles = from file in rootDir.EnumerateFiles(searchPattern, SearchOption.AllDirectories)
176
                                 where !Ignore(file.FullName)
177
                                 select file;
178
            return monitoredFiles;
179
        }                
180

    
181
        public IEnumerable<string> EnumerateFilesAsRelativeUrls(string searchPattern="*")
182
        {
183
            var rootDir = new DirectoryInfo(RootPath);
184
            var monitoredFiles = from file in rootDir.EnumerateFiles(searchPattern, SearchOption.AllDirectories)
185
                                 where !Ignore(file.FullName)
186
                                 select file.AsRelativeUrlTo(RootPath);
187
            return monitoredFiles;
188
        }                
189

    
190

    
191
        
192

    
193
        private bool Ignore(string filePath)
194
        {
195
            if (filePath.StartsWith(FragmentsPath))
196
                return true;
197
            if (_ignoreFiles.ContainsKey(filePath.ToLower()))
198
                return true;
199
            return false;
200
        }
201

    
202
        //Post a Change message for all events except rename
203
        void OnFileEvent(object sender, FileSystemEventArgs e)
204
        {
205
            //Ignore events that affect the Fragments folder
206
            var filePath = e.FullPath;
207
            if (Ignore(filePath)) 
208
                return;
209
            _agent.Post(new WorkflowState(AccountInfo) { Path = filePath, FileName = e.Name, TriggeringChange = e.ChangeType });
210
        }
211

    
212

    
213
        //Post a Change message for renames containing the old and new names
214
        void OnRenameEvent(object sender, RenamedEventArgs e)
215
        {
216
            var oldFullPath = e.OldFullPath;
217
            var fullPath = e.FullPath;
218
            if (Ignore(oldFullPath) || Ignore(fullPath))
219
                return;
220

    
221
            _agent.Post(new WorkflowState(AccountInfo)
222
            {
223
                OldPath = oldFullPath,
224
                OldFileName = e.OldName,
225
                Path = fullPath,
226
                FileName = e.Name,
227
                TriggeringChange = e.ChangeType
228
            });
229
        }
230

    
231

    
232

    
233
        private Dictionary<WatcherChangeTypes, FileStatus> _statusDict = new Dictionary<WatcherChangeTypes, FileStatus>
234
        {
235
            {WatcherChangeTypes.Created,FileStatus.Created},
236
            {WatcherChangeTypes.Changed,FileStatus.Modified},
237
            {WatcherChangeTypes.Deleted,FileStatus.Deleted},
238
            {WatcherChangeTypes.Renamed,FileStatus.Renamed}
239
        };
240

    
241
        private Dictionary<string,string> _ignoreFiles=new Dictionary<string, string>();
242

    
243
        private WorkflowState UpdateFileStatus(WorkflowState state)
244
        {
245
            if (state==null)
246
                throw new ArgumentNullException("state");
247
            if (String.IsNullOrWhiteSpace(state.Path))
248
                throw new ArgumentException("The state's Path can't be empty","state");
249
            Contract.EndContractBlock();
250

    
251
            var path = state.Path;
252
            var status = _statusDict[state.TriggeringChange];
253
            var oldStatus = Workflow.StatusKeeper.GetFileStatus(path);
254
            if (status == oldStatus)
255
            {
256
                state.Status = status;
257
                state.Skip = true;
258
                return state;
259
            }
260
            if (state.Status == FileStatus.Renamed)
261
                Workflow.ClearFileStatus(path);
262

    
263
            state.Status = Workflow.SetFileStatus(path, status);
264
            return state;
265
        }
266

    
267
        private WorkflowState UpdateOverlayStatus(WorkflowState state)
268
        {
269
            if (state==null)
270
                throw new ArgumentNullException("state");
271
            Contract.EndContractBlock();
272

    
273
            if (state.Skip)
274
                return state;
275

    
276
            switch (state.Status)
277
            {
278
                case FileStatus.Created:
279
                case FileStatus.Modified:
280
                    this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);
281
                    break;
282
                case FileStatus.Deleted:
283
                    //this.StatusAgent.RemoveFileOverlayStatus(state.Path);
284
                    break;
285
                case FileStatus.Renamed:
286
                    this.StatusKeeper.ClearFileStatus(state.OldPath);
287
                    this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);
288
                    break;
289
                case FileStatus.Unchanged:
290
                    this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Normal);
291
                    break;
292
            }
293

    
294
            if (state.Status == FileStatus.Deleted)
295
                NativeMethods.RaiseChangeNotification(Path.GetDirectoryName(state.Path));
296
            else
297
                NativeMethods.RaiseChangeNotification(state.Path);
298
            return state;
299
        }
300

    
301

    
302
        private WorkflowState UpdateFileChecksum(WorkflowState state)
303
        {
304
            if (state.Skip)
305
                return state;
306

    
307
            if (state.Status == FileStatus.Deleted)
308
                return state;
309

    
310
            var path = state.Path;
311
            //Skip calculation for folders
312
            if (Directory.Exists(path))
313
                return state;
314

    
315
            var info = new FileInfo(path);
316
            string hash = info.CalculateHash(StatusKeeper.BlockSize,StatusKeeper.BlockHash);
317
            StatusKeeper.UpdateFileChecksum(path, hash);
318

    
319
            state.Hash = hash;
320
            return state;
321
        }
322

    
323
        //Does the file exist in the container's local folder?
324
        public bool Exists(string relativePath)
325
        {
326
            if (String.IsNullOrWhiteSpace(relativePath))
327
                throw new ArgumentNullException("relativePath");
328
            //A RootPath must be set before calling this method
329
            if (String.IsNullOrWhiteSpace(RootPath))
330
                throw new InvalidOperationException("RootPath was not set");
331
            Contract.EndContractBlock();
332
            //Create the absolute path by combining the RootPath with the relativePath
333
            var absolutePath=Path.Combine(RootPath, relativePath);
334
            //Is this a valid file?
335
            if (File.Exists(absolutePath))
336
                return true;
337
            //Or a directory?
338
            if (Directory.Exists(absolutePath))
339
                return true;
340
            //Fail if it is neither
341
            return false;
342
        }
343

    
344
        public FileInfo GetFileInfo(string relativePath)
345
        {
346
            if (String.IsNullOrWhiteSpace(relativePath))
347
                throw new ArgumentNullException("relativePath");
348
            //A RootPath must be set before calling this method
349
            if (String.IsNullOrWhiteSpace(RootPath))
350
                throw new InvalidOperationException("RootPath was not set");            
351
            Contract.EndContractBlock();            
352

    
353
            var absolutePath = Path.Combine(RootPath, relativePath);
354
//            Debug.Assert(File.Exists(absolutePath),String.Format("Path {0} doesn't exist",absolutePath));
355

    
356
            return new FileInfo(absolutePath);
357
            
358
        }
359

    
360
        public void Delete(string relativePath)
361
        {
362
            var absolutePath = Path.Combine(RootPath, relativePath);
363
            if (File.Exists(absolutePath))
364
            {                   
365
                File.Delete(absolutePath);
366
                _ignoreFiles[absolutePath.ToLower()] = absolutePath.ToLower();                
367
            }
368
            StatusKeeper.ClearFileStatus(absolutePath);
369
        }
370
    }
371
}