Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / FileAgent.cs @ 79f92570

History | View | Annotate | Download (21.7 kB)

1
#region
2
/* -----------------------------------------------------------------------
3
 * <copyright file="FileAgent.cs" company="GRNet">
4
 * 
5
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
6
 *
7
 * Redistribution and use in source and binary forms, with or
8
 * without modification, are permitted provided that the following
9
 * conditions are met:
10
 *
11
 *   1. Redistributions of source code must retain the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer.
14
 *
15
 *   2. Redistributions in binary form must reproduce the above
16
 *      copyright notice, this list of conditions and the following
17
 *      disclaimer in the documentation and/or other materials
18
 *      provided with the distribution.
19
 *
20
 *
21
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
22
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
25
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
28
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32
 * POSSIBILITY OF SUCH DAMAGE.
33
 *
34
 * The views and conclusions contained in the software and
35
 * documentation are those of the authors and should not be
36
 * interpreted as representing official policies, either expressed
37
 * or implied, of GRNET S.A.
38
 * </copyright>
39
 * -----------------------------------------------------------------------
40
 */
41
#endregion
42
using System;
43
using System.Collections.Generic;
44
using System.Diagnostics.Contracts;
45
using System.IO;
46
using System.Linq;
47
using System.Reflection;
48
using System.Threading.Tasks;
49
using Pithos.Interfaces;
50
using Pithos.Network;
51
using log4net;
52

    
53
namespace Pithos.Core.Agents
54
{
55
//    [Export]
56
    public class FileAgent
57
    {
58
        private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
59

    
60
        Agent<WorkflowState> _agent;
61
        private FileSystemWatcher _watcher;
62
        private FileSystemWatcherAdapter _adapter;
63

    
64
        //[Import]
65
        public IStatusKeeper StatusKeeper { get; set; }
66

    
67
        public IStatusNotification StatusNotification { get; set; }
68
        //[Import]
69
        public IPithosWorkflow Workflow { get; set; }
70
        //[Import]
71
        public WorkflowAgent WorkflowAgent { get; set; }
72

    
73
        private AccountInfo AccountInfo { get; set; }
74

    
75
        internal string RootPath { get;  set; }
76
        
77
        private FileEventIdleBatch _eventIdleBatch;
78

    
79
        public FileAgent(int idleTimeout)
80
        {            
81
            _eventIdleBatch=new FileEventIdleBatch(idleTimeout,ProcessBatchedEvents);
82
        }
83

    
84
        private void ProcessBatchedEvents(Dictionary<string, FileSystemEventArgs[]> fileEvents)
85
        {
86
            StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing,String.Format("Uploading {0} files",fileEvents.Count));
87
            foreach (var fileEvent in fileEvents)
88
            {
89
                var filePath = fileEvent.Key;
90
                var changes = fileEvent.Value;
91
                
92
                if (Ignore(filePath)) continue;
93
                                
94
                foreach (var change in changes)
95
                {
96
                    if (change.ChangeType == WatcherChangeTypes.Renamed)
97
                    {
98
                        var rename = (RenamedEventArgs) change;
99
                        _agent.Post(new WorkflowState
100
                                        {
101
                                            AccountInfo = AccountInfo,
102
                                            OldPath = rename.OldFullPath,
103
                                            OldFileName = rename.OldName,
104
                                            Path = rename.FullPath,
105
                                            FileName = rename.Name,
106
                                            TriggeringChange = rename.ChangeType
107
                                        });
108
                    }
109
                    else
110
                        _agent.Post(new WorkflowState
111
                        {
112
                            AccountInfo = AccountInfo,
113
                            Path = change.FullPath,
114
                            FileName = change.Name,
115
                            TriggeringChange = change.ChangeType
116
                        });                        
117
                }
118
            }
119
            StatusNotification.SetPithosStatus(PithosStatus.LocalComplete);
120
        }
121

    
122
        public void Start(AccountInfo accountInfo,string rootPath)
123
        {
124
            if (accountInfo==null)
125
                throw new ArgumentNullException("accountInfo");
126
            if (String.IsNullOrWhiteSpace(rootPath))
127
                throw new ArgumentNullException("rootPath");
128
            if (!Path.IsPathRooted(rootPath))
129
                throw new ArgumentException("rootPath must be an absolute path","rootPath");
130
            Contract.EndContractBlock();
131

    
132
            AccountInfo = accountInfo;
133
            RootPath = rootPath;
134
            _watcher = new FileSystemWatcher(rootPath) {IncludeSubdirectories = true,InternalBufferSize=8*4096};
135
            _adapter = new FileSystemWatcherAdapter(_watcher);
136

    
137
            _adapter.Changed += OnFileEvent;
138
            _adapter.Created += OnFileEvent;
139
            _adapter.Deleted += OnFileEvent;
140
            //_adapter.Renamed += OnRenameEvent;
141
            _adapter.Moved += OnMoveEvent;
142
            _watcher.EnableRaisingEvents = true;
143

    
144

    
145
            _agent = Agent<WorkflowState>.Start(inbox =>
146
            {
147
                Action loop = null;
148
                loop = () =>
149
                {
150
                    var message = inbox.Receive();
151
                    var process=message.Then(Process,inbox.CancellationToken);                    
152
                    inbox.LoopAsync(process,loop,ex=>
153
                        Log.ErrorFormat("[ERROR] File Event Processing:\r{0}", ex));
154
                };
155
                loop();
156
            });
157
        }
158

    
159
        private Task<object> Process(WorkflowState state)
160
        {
161
            if (state==null)
162
                throw new ArgumentNullException("state");
163
            Contract.EndContractBlock();
164

    
165
            if (Ignore(state.Path))
166
                return CompletedTask<object>.Default;
167

    
168
            var networkState = NetworkGate.GetNetworkState(state.Path);
169
            //Skip if the file is already being downloaded or uploaded and 
170
            //the change is create or modify
171
            if (networkState != NetworkOperation.None &&
172
                (
173
                    state.TriggeringChange == WatcherChangeTypes.Created ||
174
                    state.TriggeringChange == WatcherChangeTypes.Changed
175
                ))
176
                return CompletedTask<object>.Default;
177

    
178
            try
179
            {
180
                //StatusKeeper.EnsureFileState(state.Path);
181
                
182
                UpdateFileStatus(state);
183
                UpdateOverlayStatus(state);
184
                UpdateFileChecksum(state);
185
                WorkflowAgent.Post(state);
186
            }
187
            catch (IOException exc)
188
            {
189
                if (File.Exists(state.Path))
190
                {
191
                    Log.WarnFormat("File access error occured, retrying {0}\n{1}", state.Path, exc);
192
                    _agent.Post(state);
193
                }
194
                else
195
                {
196
                    Log.WarnFormat("File {0} does not exist. Will be ignored\n{1}", state.Path, exc);
197
                }
198
            }
199
            catch (Exception exc)
200
            {
201
                Log.WarnFormat("Error occured while indexing{0}. The file will be skipped\n{1}",
202
                               state.Path, exc);
203
            }
204
            return CompletedTask<object>.Default;
205
        }
206

    
207
        public bool Pause
208
        {
209
            get { return _watcher == null || !_watcher.EnableRaisingEvents; }
210
            set
211
            {
212
                if (_watcher != null)
213
                    _watcher.EnableRaisingEvents = !value;                
214
            }
215
        }
216

    
217
        public string CachePath { get; set; }
218

    
219
        private List<string> _selectivePaths = new List<string>();
220
        public List<string> SelectivePaths
221
        {
222
            get { return _selectivePaths; }
223
            set { _selectivePaths = value; }
224
        }
225

    
226

    
227
        public void Post(WorkflowState workflowState)
228
        {
229
            if (workflowState == null)
230
                throw new ArgumentNullException("workflowState");
231
            Contract.EndContractBlock();
232

    
233
            _agent.Post(workflowState);
234
        }
235

    
236
        public void Stop()
237
        {
238
            if (_watcher != null)
239
            {
240
                _watcher.Dispose();
241
            }
242
            _watcher = null;
243

    
244
            if (_agent!=null)
245
                _agent.Stop();
246
        }
247

    
248
        // Enumerate all files in the Pithos directory except those in the Fragment folder
249
        // and files with a .ignore extension
250
        public IEnumerable<string> EnumerateFiles(string searchPattern="*")
251
        {
252
            var monitoredFiles = from filePath in Directory.EnumerateFileSystemEntries(RootPath, searchPattern, SearchOption.AllDirectories)
253
                                 where !Ignore(filePath)
254
                                 select filePath;
255
            return monitoredFiles;
256
        }
257

    
258
        public IEnumerable<FileInfo> EnumerateFileInfos(string searchPattern="*")
259
        {
260
            var rootDir = new DirectoryInfo(RootPath);
261
            var monitoredFiles = from file in rootDir.EnumerateFiles(searchPattern, SearchOption.AllDirectories)
262
                                 where !Ignore(file.FullName)
263
                                 select file;
264
            return monitoredFiles;
265
        }                
266

    
267
        public IEnumerable<string> EnumerateFilesAsRelativeUrls(string searchPattern="*")
268
        {
269
            var rootDir = new DirectoryInfo(RootPath);
270
            var monitoredFiles = from file in rootDir.EnumerateFiles(searchPattern, SearchOption.AllDirectories)
271
                                 where !Ignore(file.FullName)
272
                                 select file.AsRelativeUrlTo(RootPath);
273
            return monitoredFiles;
274
        }                
275

    
276
        public IEnumerable<string> EnumerateFilesSystemInfosAsRelativeUrls(string searchPattern="*")
277
        {
278
            var rootDir = new DirectoryInfo(RootPath);
279
            var monitoredFiles = from file in rootDir.EnumerateFileSystemInfos(searchPattern, SearchOption.AllDirectories)
280
                                 where !Ignore(file.FullName)
281
                                 select file.AsRelativeUrlTo(RootPath);
282
            return monitoredFiles;
283
        }                
284

    
285

    
286
        
287

    
288
        private bool Ignore(string filePath)
289
        {
290
            //Ignore all first-level directories and files (ie at the container folders level)
291
            if (FoundBelowRoot(filePath, RootPath,1))
292
                return true;
293

    
294
            //Ignore first-level items under the "others" folder (ie at the accounts folders level).
295
            var othersPath = Path.Combine(RootPath, FolderConstants.OthersFolder);
296
            if (FoundBelowRoot(filePath, othersPath,1))
297
                return true;
298

    
299
            //Ignore second-level (container) folders under the "others" folder (ie at the container folders level). 
300
            if (FoundBelowRoot(filePath, othersPath,2))
301
                return true;            
302

    
303

    
304
            //Ignore anything happening in the cache path
305
            if (filePath.StartsWith(CachePath))
306
                return true;
307
            if (_ignoreFiles.ContainsKey(filePath.ToLower()))
308
                return true;
309

    
310
            //Ignore if selective synchronization is defined, 
311
            return SelectivePaths.Count > 0 
312
                //And the target file is not below any of the selective paths
313
                && !SelectivePaths.Any(filePath.IsAtOrDirectlyBelow);
314
        }
315

    
316
/*        private static bool FoundInRoot(string filePath, string rootPath)
317
        {
318
            //var rootDirectory = new DirectoryInfo(rootPath);
319

    
320
            //If the paths are equal, return true
321
            if (filePath.Equals(rootPath, StringComparison.InvariantCultureIgnoreCase))
322
                return true;
323

    
324
            //If the filepath is below the root path
325
            if (filePath.StartsWith(rootPath,StringComparison.InvariantCulture))
326
            {
327
                //Get the relative path
328
                var relativePath = filePath.Substring(rootPath.Length + 1);
329
                //If the relativePath does NOT contains a path separator, we found a match
330
                return (!relativePath.Contains(@"\"));
331
            }
332

    
333
            //If the filepath is not under the root path, return false
334
            return false;            
335
        }*/
336

    
337

    
338
        private static bool FoundBelowRoot(string filePath, string rootPath,int level)
339
        {
340
            //var rootDirectory = new DirectoryInfo(rootPath);
341

    
342
            //If the paths are equal, return true
343
            if (filePath.Equals(rootPath, StringComparison.InvariantCultureIgnoreCase))
344
                return true;
345

    
346
            //If the filepath is below the root path
347
            if (filePath.StartsWith(rootPath,StringComparison.InvariantCulture))
348
            {
349
                //Get the relative path
350
                var relativePath = filePath.Substring(rootPath.Length + 1);
351
                //If the relativePath does NOT contains a path separator, we found a match
352
                var levels=relativePath.ToCharArray().Count(c=>c=='\\')+1;                
353
                return levels==level;
354
            }
355

    
356
            //If the filepath is not under the root path, return false
357
            return false;            
358
        }
359

    
360
        //Post a Change message for all events except rename
361
        void OnFileEvent(object sender, FileSystemEventArgs e)
362
        {
363
            //Ignore events that affect the cache folder
364
            var filePath = e.FullPath;
365
            if (Ignore(filePath)) 
366
                return;
367
            _eventIdleBatch.Post(e);
368
        }
369

    
370

    
371
/*
372
        //Post a Change message for renames containing the old and new names
373
        void OnRenameEvent(object sender, RenamedEventArgs e)
374
        {
375
            var oldFullPath = e.OldFullPath;
376
            var fullPath = e.FullPath;
377
            if (Ignore(oldFullPath) || Ignore(fullPath))
378
                return;
379

    
380
            _agent.Post(new WorkflowState
381
            {
382
                AccountInfo=AccountInfo,
383
                OldPath = oldFullPath,
384
                OldFileName = e.OldName,
385
                Path = fullPath,
386
                FileName = e.Name,
387
                TriggeringChange = e.ChangeType
388
            });
389
        }
390
*/
391

    
392
        //Post a Change message for moves containing the old and new names
393
        void OnMoveEvent(object sender, MovedEventArgs e)
394
        {
395
            var oldFullPath = e.OldFullPath;
396
            var fullPath = e.FullPath;
397
            if (Ignore(oldFullPath) || Ignore(fullPath))
398
                return;
399

    
400
            _eventIdleBatch.Post(e);
401
        }
402

    
403

    
404

    
405
        private Dictionary<WatcherChangeTypes, FileStatus> _statusDict = new Dictionary<WatcherChangeTypes, FileStatus>
406
                                                                             {
407
            {WatcherChangeTypes.Created,FileStatus.Created},
408
            {WatcherChangeTypes.Changed,FileStatus.Modified},
409
            {WatcherChangeTypes.Deleted,FileStatus.Deleted},
410
            {WatcherChangeTypes.Renamed,FileStatus.Renamed}
411
        };
412

    
413
        private Dictionary<string, string> _ignoreFiles=new Dictionary<string, string>();
414

    
415
        private WorkflowState UpdateFileStatus(WorkflowState state)
416
        {
417
            if (state==null)
418
                throw new ArgumentNullException("state");
419
            if (String.IsNullOrWhiteSpace(state.Path))
420
                throw new ArgumentException("The state's Path can't be empty","state");
421
            Contract.EndContractBlock();
422

    
423
            var path = state.Path;
424
            var status = _statusDict[state.TriggeringChange];
425
            var oldStatus = Workflow.StatusKeeper.GetFileStatus(path);
426
            if (status == oldStatus)
427
            {
428
                state.Status = status;
429
                state.Skip = true;
430
                return state;
431
            }
432
            if (state.Status == FileStatus.Renamed)
433
                Workflow.ClearFileStatus(path);
434

    
435
            state.Status = Workflow.SetFileStatus(path, status);
436
            return state;
437
        }
438

    
439
        private WorkflowState UpdateOverlayStatus(WorkflowState state)
440
        {
441
            if (state==null)
442
                throw new ArgumentNullException("state");
443
            Contract.EndContractBlock();
444

    
445
            if (state.Skip)
446
                return state;
447

    
448
            switch (state.Status)
449
            {
450
                case FileStatus.Created:
451
                    this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified,state.ShortHash);
452
                    break;
453
                case FileStatus.Modified:
454
                    this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified,state.ShortHash);
455
                    break;
456
                case FileStatus.Deleted:
457
                    //this.StatusAgent.RemoveFileOverlayStatus(state.Path);
458
                    break;
459
                case FileStatus.Renamed:
460
                    this.StatusKeeper.ClearFileStatus(state.OldPath);
461
                    this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified,state.ShortHash);
462
                    break;
463
                case FileStatus.Unchanged:
464
                    this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Normal,state.ShortHash);
465
                    break;
466
            }
467

    
468
            if (state.Status == FileStatus.Deleted)
469
                NativeMethods.RaiseChangeNotification(Path.GetDirectoryName(state.Path));
470
            else
471
                NativeMethods.RaiseChangeNotification(state.Path);
472
            return state;
473
        }
474

    
475

    
476
        private WorkflowState UpdateFileChecksum(WorkflowState state)
477
        {
478
            if (state.Skip)
479
                return state;
480

    
481
            if (state.Status == FileStatus.Deleted)
482
                return state;
483

    
484
            var path = state.Path;
485
            //Skip calculation for folders
486
            if (Directory.Exists(path))
487
                return state;
488

    
489

    
490
            var info = new FileInfo(path);
491
            StatusNotification.Notify(new StatusNotification(String.Format("Hashing [{0}]",info.Name)));
492

    
493
            var shortHash = info.ComputeShortHash(); 
494
            
495
            string merkleHash = info.CalculateHash(StatusKeeper.BlockSize,StatusKeeper.BlockHash);
496
            StatusKeeper.UpdateFileChecksum(path,shortHash, merkleHash);
497

    
498
            state.Hash = merkleHash;
499
            return state;
500
        }
501

    
502
        //Does the file exist in the container's local folder?
503
        public bool Exists(string relativePath)
504
        {
505
            if (String.IsNullOrWhiteSpace(relativePath))
506
                throw new ArgumentNullException("relativePath");
507
            //A RootPath must be set before calling this method
508
            if (String.IsNullOrWhiteSpace(RootPath))
509
                throw new InvalidOperationException("RootPath was not set");
510
            Contract.EndContractBlock();
511
            //Create the absolute path by combining the RootPath with the relativePath
512
            var absolutePath=Path.Combine(RootPath, relativePath);
513
            //Is this a valid file?
514
            if (File.Exists(absolutePath))
515
                return true;
516
            //Or a directory?
517
            if (Directory.Exists(absolutePath))
518
                return true;
519
            //Fail if it is neither
520
            return false;
521
        }
522

    
523
        public static FileAgent GetFileAgent(AccountInfo accountInfo)
524
        {
525
            return GetFileAgent(accountInfo.AccountPath);
526
        }
527

    
528
        public static FileAgent GetFileAgent(string rootPath)
529
        {
530
            return AgentLocator<FileAgent>.Get(rootPath.ToLower());
531
        }
532

    
533

    
534
        public FileSystemInfo GetFileSystemInfo(string relativePath)
535
        {
536
            if (String.IsNullOrWhiteSpace(relativePath))
537
                throw new ArgumentNullException("relativePath");
538
            //A RootPath must be set before calling this method
539
            if (String.IsNullOrWhiteSpace(RootPath))
540
                throw new InvalidOperationException("RootPath was not set");            
541
            Contract.EndContractBlock();            
542

    
543
            var absolutePath = Path.Combine(RootPath, relativePath);
544

    
545
            if (Directory.Exists(absolutePath))
546
                return new DirectoryInfo(absolutePath).WithProperCapitalization();
547
            else
548
                return new FileInfo(absolutePath).WithProperCapitalization();
549
            
550
        }
551

    
552
        public void Delete(string relativePath)
553
        {
554
            var absolutePath = Path.Combine(RootPath, relativePath).ToLower();
555
            if (File.Exists(absolutePath))
556
            {    
557
                try
558
                {
559
                    File.Delete(absolutePath);
560
                }
561
                //The file may have been deleted by another thread. Just ignore the relevant exception
562
                catch (FileNotFoundException) { }
563
            }
564
            else if (Directory.Exists(absolutePath))
565
            {
566
                try
567
                {
568
                    Directory.Delete(absolutePath, true);
569
                }
570
                //The directory may have been deleted by another thread. Just ignore the relevant exception
571
                catch (DirectoryNotFoundException){}                
572
            }
573
        
574
            //_ignoreFiles[absolutePath] = absolutePath;                
575
            StatusKeeper.ClearFileStatus(absolutePath);
576
        }
577
    }
578
}