Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / FileAgent.cs @ 5e31048f

History | View | Annotate | Download (12.8 kB)

1
using System;
2
using System.Collections.Generic;
3
using System.ComponentModel.Composition;
4
using System.Diagnostics;
5
using System.Diagnostics.Contracts;
6
using System.IO;
7
using System.Linq;
8
using System.Text;
9
using System.Threading.Tasks;
10
using System.Threading.Tasks.Dataflow;
11
using Pithos.Interfaces;
12
using Pithos.Network;
13
using log4net;
14
using log4net.Core;
15

    
16
namespace Pithos.Core.Agents
17
{
18
    [Export]
19
    public class FileAgent
20
    {
21
        //Agent<WorkflowState> _agent;
22
        TransformBlock<WorkflowState, WorkflowState> _agent;
23
        private FileSystemWatcher _watcher;
24

    
25
        [Import]
26
        public IStatusKeeper StatusKeeper { get; set; }
27
        [Import]
28
        public IPithosWorkflow Workflow { get; set; }
29
        [Import]
30
        public WorkflowAgent WorkflowAgent { get; set; }
31

    
32
        private AccountInfo AccountInfo { get; set; }
33

    
34
        private string RootPath { get;  set; }
35

    
36
        private static readonly ILog Log = LogManager.GetLogger("FileAgent");
37

    
38
        public FileAgent()
39
        {
40
            _agent = new TransformBlock<WorkflowState, WorkflowState>(async message => 
41
                await TaskEx.Run(()=>Process(message)));
42
        }
43

    
44
        private WorkflowState Process(WorkflowState message)
45
        {
46
            Debug.Assert(!Ignore(message.Path));
47

    
48
            try
49
            {
50
                //Skip if the file is already being downloaded or uploaded and 
51
                //the change is create or modify
52
                var networkState = NetworkGate.GetNetworkState(message.Path);
53
                if (networkState != NetworkOperation.None &&
54
                    (message.TriggeringChange == WatcherChangeTypes.Created ||
55
                     message.TriggeringChange == WatcherChangeTypes.Changed))
56
                    return null;
57

    
58
                UpdateFileStatus(message);
59
                UpdateOverlayStatus(message);
60
                UpdateFileChecksum(message);
61
                return message;
62
            }
63
            catch (IOException exc)
64
            {
65
                if (File.Exists(message.Path))
66
                {
67
                    Log.WarnFormat("File access error occured, retrying {0}\n{1}", message.Path, exc);
68
                    _agent.Post(message);
69
                }
70
                else
71
                {
72
                    Log.WarnFormat("File {0} does not exist. Will be ignored\n{1}", message.Path, exc);
73
                }
74
            }
75
            catch (Exception exc)
76
            {
77
                Log.WarnFormat("Error occured while indexing{0}. The file will be skipped}\n{1}",
78
                               message.Path, exc);
79
            }
80

    
81
            return null;
82
        }
83

    
84
        public void Start(AccountInfo accountInfo,string rootPath)
85
        {
86
            if (accountInfo==null)
87
                throw new ArgumentNullException("accountInfo");
88
            if (String.IsNullOrWhiteSpace(rootPath))
89
                throw new ArgumentNullException("rootPath");
90
            if (!Path.IsPathRooted(rootPath))
91
                throw new ArgumentException("rootPath must be an absolute path","rootPath");
92
            Contract.EndContractBlock();
93

    
94
            _agent.LinkTo(WorkflowAgent.Agent);            
95

    
96
            AccountInfo = accountInfo;
97
            RootPath = rootPath;
98
            _watcher = new FileSystemWatcher(rootPath);
99
            _watcher.IncludeSubdirectories = true;            
100
            _watcher.Changed += OnFileEvent;
101
            _watcher.Created += OnFileEvent;
102
            _watcher.Deleted += OnFileEvent;
103
            _watcher.Renamed += OnRenameEvent;
104
            _watcher.EnableRaisingEvents = true;
105
           
106
        }
107

    
108
        public bool Pause
109
        {
110
            get { return _watcher == null || !_watcher.EnableRaisingEvents; }
111
            set
112
            {
113
                if (_watcher != null)
114
                    _watcher.EnableRaisingEvents = !value;                
115
            }
116
        }
117

    
118
        public string CachePath { get; set; }
119

    
120
        private List<string> _selectivePaths = new List<string>();
121
        public List<string> SelectivePaths
122
        {
123
            get { return _selectivePaths; }
124
            set { _selectivePaths = value; }
125
        }
126

    
127

    
128
        public void Post(WorkflowState workflowState)
129
        {
130
            if (workflowState == null)
131
                throw new ArgumentNullException("workflowState");
132
            Contract.EndContractBlock();
133

    
134
            _agent.Post(workflowState);
135
        }
136

    
137
        public void Stop()
138
        {
139
            if (_watcher != null)
140
            {
141
                _watcher.Changed -= OnFileEvent;
142
                _watcher.Created -= OnFileEvent;
143
                _watcher.Deleted -= OnFileEvent;
144
                _watcher.Renamed -= OnRenameEvent;
145
                _watcher.Dispose();
146
            }
147
            _watcher = null;
148

    
149
            if (_agent!=null)
150
                _agent.Complete();
151
        }
152

    
153
        // Enumerate all files in the Pithos directory except those in the Fragment folder
154
        // and files with a .ignore extension
155
        public IEnumerable<string> EnumerateFiles(string searchPattern="*")
156
        {
157
            var monitoredFiles = from filePath in Directory.EnumerateFileSystemEntries(RootPath, searchPattern, SearchOption.AllDirectories)
158
                                 where !Ignore(filePath)
159
                                 select filePath;
160
            return monitoredFiles;
161
        }
162

    
163
        public IEnumerable<FileInfo> EnumerateFileInfos(string searchPattern="*")
164
        {
165
            var rootDir = new DirectoryInfo(RootPath);
166
            var monitoredFiles = from file in rootDir.EnumerateFiles(searchPattern, SearchOption.AllDirectories)
167
                                 where !Ignore(file.FullName)
168
                                 select file;
169
            return monitoredFiles;
170
        }                
171

    
172
        public IEnumerable<string> EnumerateFilesAsRelativeUrls(string searchPattern="*")
173
        {
174
            var rootDir = new DirectoryInfo(RootPath);
175
            var monitoredFiles = from file in rootDir.EnumerateFiles(searchPattern, SearchOption.AllDirectories)
176
                                 where !Ignore(file.FullName)
177
                                 select file.AsRelativeUrlTo(RootPath);
178
            return monitoredFiles;
179
        }                
180

    
181

    
182
        
183

    
184
        private bool Ignore(string filePath)
185
        {
186
            if (filePath.StartsWith(CachePath))
187
                return true;
188
            if (_ignoreFiles.ContainsKey(filePath.ToLower()))
189
                return true;
190
            return false;
191
        }
192

    
193
        //Post a Change message for all events except rename
194
        void OnFileEvent(object sender, FileSystemEventArgs e)
195
        {
196
            //Ignore events that affect the cache folder
197
            var filePath = e.FullPath;
198
            if (Ignore(filePath)) 
199
                return;           
200
            
201
            _agent.Post(new WorkflowState{AccountInfo=AccountInfo, Path = filePath, FileName = e.Name, TriggeringChange = e.ChangeType });
202
        }
203

    
204

    
205
        //Post a Change message for renames containing the old and new names
206
        void OnRenameEvent(object sender, RenamedEventArgs e)
207
        {
208
            var oldFullPath = e.OldFullPath;
209
            var fullPath = e.FullPath;
210
            if (Ignore(oldFullPath) || Ignore(fullPath))
211
                return;
212

    
213
            _agent.Post(new WorkflowState
214
            {
215
                AccountInfo=AccountInfo,
216
                OldPath = oldFullPath,
217
                OldFileName = e.OldName,
218
                Path = fullPath,
219
                FileName = e.Name,
220
                TriggeringChange = e.ChangeType
221
            });
222
        }
223

    
224

    
225

    
226
        private Dictionary<WatcherChangeTypes, FileStatus> _statusDict = new Dictionary<WatcherChangeTypes, FileStatus>
227
        {
228
            {WatcherChangeTypes.Created,FileStatus.Created},
229
            {WatcherChangeTypes.Changed,FileStatus.Modified},
230
            {WatcherChangeTypes.Deleted,FileStatus.Deleted},
231
            {WatcherChangeTypes.Renamed,FileStatus.Renamed}
232
        };
233

    
234
        private Dictionary<string,string> _ignoreFiles=new Dictionary<string, string>();
235

    
236
        private WorkflowState UpdateFileStatus(WorkflowState state)
237
        {
238
            if (state==null)
239
                throw new ArgumentNullException("state");
240
            if (String.IsNullOrWhiteSpace(state.Path))
241
                throw new ArgumentException("The state's Path can't be empty","state");
242
            Contract.EndContractBlock();
243

    
244
            var path = state.Path;
245
            var status = _statusDict[state.TriggeringChange];
246
            var oldStatus = Workflow.StatusKeeper.GetFileStatus(path);
247
            if (status == oldStatus)
248
            {
249
                state.Status = status;
250
                state.Skip = true;
251
                return state;
252
            }
253
            if (state.Status == FileStatus.Renamed)
254
                Workflow.ClearFileStatus(path);
255

    
256
            state.Status = Workflow.SetFileStatus(path, status);
257
            return state;
258
        }
259

    
260
        private WorkflowState UpdateOverlayStatus(WorkflowState state)
261
        {
262
            if (state==null)
263
                throw new ArgumentNullException("state");
264
            Contract.EndContractBlock();
265

    
266
            if (state.Skip)
267
                return state;
268

    
269
            switch (state.Status)
270
            {
271
                case FileStatus.Created:
272
                case FileStatus.Modified:
273
                    this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);
274
                    break;
275
                case FileStatus.Deleted:
276
                    //this.StatusAgent.RemoveFileOverlayStatus(state.Path);
277
                    break;
278
                case FileStatus.Renamed:
279
                    this.StatusKeeper.ClearFileStatus(state.OldPath);
280
                    this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);
281
                    break;
282
                case FileStatus.Unchanged:
283
                    this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Normal);
284
                    break;
285
            }
286

    
287
            if (state.Status == FileStatus.Deleted)
288
                NativeMethods.RaiseChangeNotification(Path.GetDirectoryName(state.Path));
289
            else
290
                NativeMethods.RaiseChangeNotification(state.Path);
291
            return state;
292
        }
293

    
294

    
295
        private WorkflowState UpdateFileChecksum(WorkflowState state)
296
        {
297
            if (state.Skip)
298
                return state;
299

    
300
            if (state.Status == FileStatus.Deleted)
301
                return state;
302

    
303
            var path = state.Path;
304
            //Skip calculation for folders
305
            if (Directory.Exists(path))
306
                return state;
307

    
308
            var info = new FileInfo(path);
309
            string hash = info.CalculateHash(StatusKeeper.BlockSize,StatusKeeper.BlockHash);
310
            StatusKeeper.UpdateFileChecksum(path, hash);
311

    
312
            state.Hash = hash;
313
            return state;
314
        }
315

    
316
        //Does the file exist in the container's local folder?
317
        public bool Exists(string relativePath)
318
        {
319
            if (String.IsNullOrWhiteSpace(relativePath))
320
                throw new ArgumentNullException("relativePath");
321
            //A RootPath must be set before calling this method
322
            if (String.IsNullOrWhiteSpace(RootPath))
323
                throw new InvalidOperationException("RootPath was not set");
324
            Contract.EndContractBlock();
325
            //Create the absolute path by combining the RootPath with the relativePath
326
            var absolutePath=Path.Combine(RootPath, relativePath);
327
            //Is this a valid file?
328
            if (File.Exists(absolutePath))
329
                return true;
330
            //Or a directory?
331
            if (Directory.Exists(absolutePath))
332
                return true;
333
            //Fail if it is neither
334
            return false;
335
        }
336

    
337
        public FileInfo GetFileInfo(string relativePath)
338
        {
339
            if (String.IsNullOrWhiteSpace(relativePath))
340
                throw new ArgumentNullException("relativePath");
341
            //A RootPath must be set before calling this method
342
            if (String.IsNullOrWhiteSpace(RootPath))
343
                throw new InvalidOperationException("RootPath was not set");            
344
            Contract.EndContractBlock();            
345

    
346
            var absolutePath = Path.Combine(RootPath, relativePath);
347
//            Debug.Assert(File.Exists(absolutePath),String.Format("Path {0} doesn't exist",absolutePath));
348

    
349
            return new FileInfo(absolutePath);
350
            
351
        }
352

    
353
        public void Delete(string relativePath)
354
        {
355
            var absolutePath = Path.Combine(RootPath, relativePath);
356
            if (File.Exists(absolutePath))
357
            {                   
358
                File.Delete(absolutePath);
359
                _ignoreFiles[absolutePath.ToLower()] = absolutePath.ToLower();                
360
            }
361
            StatusKeeper.ClearFileStatus(absolutePath);
362
        }
363
    }
364
}