Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / FileAgent.cs @ ee950288

History | View | Annotate | Download (23 kB)

1
#region
2
/* -----------------------------------------------------------------------
3
 * <copyright file="FileAgent.cs" company="GRNet">
4
 * 
5
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
6
 *
7
 * Redistribution and use in source and binary forms, with or
8
 * without modification, are permitted provided that the following
9
 * conditions are met:
10
 *
11
 *   1. Redistributions of source code must retain the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer.
14
 *
15
 *   2. Redistributions in binary form must reproduce the above
16
 *      copyright notice, this list of conditions and the following
17
 *      disclaimer in the documentation and/or other materials
18
 *      provided with the distribution.
19
 *
20
 *
21
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
22
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
25
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
28
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32
 * POSSIBILITY OF SUCH DAMAGE.
33
 *
34
 * The views and conclusions contained in the software and
35
 * documentation are those of the authors and should not be
36
 * interpreted as representing official policies, either expressed
37
 * or implied, of GRNET S.A.
38
 * </copyright>
39
 * -----------------------------------------------------------------------
40
 */
41
#endregion
42
using System;
43
using System.Collections.Generic;
44
using System.Diagnostics.Contracts;
45
using System.IO;
46
using System.Linq;
47
using System.Reflection;
48
using System.Threading.Tasks;
49
using Pithos.Interfaces;
50
using Pithos.Network;
51
using log4net;
52

    
53
namespace Pithos.Core.Agents
54
{
55
//    [Export]
56
    public class FileAgent
57
    {
58
        private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
59

    
60
        Agent<WorkflowState> _agent;
61
        private FileSystemWatcher _watcher;
62
        private FileSystemWatcherAdapter _adapter;
63

    
64
        //[Import]
65
        public IStatusKeeper StatusKeeper { get; set; }
66

    
67
        public IStatusNotification StatusNotification { get; set; }
68
        //[Import]
69
        public IPithosWorkflow Workflow { get; set; }
70
        //[Import]
71
        public WorkflowAgent WorkflowAgent { get; set; }
72

    
73
        private AccountInfo AccountInfo { get; set; }
74

    
75
        internal string RootPath { get;  set; }
76
        
77
        private FileEventIdleBatch _eventIdleBatch;
78

    
79
        public TimeSpan IdleTimeout { get; set; }
80

    
81

    
82
        private void ProcessBatchedEvents(Dictionary<string, FileSystemEventArgs[]> fileEvents)
83
        {
84
            StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing,String.Format("Uploading {0} files",fileEvents.Count));
85
            foreach (var fileEvent in fileEvents)
86
            {
87
                var filePath = fileEvent.Key;
88
                var changes = fileEvent.Value;
89
                
90
                if (Ignore(filePath)) continue;
91
                                
92
                foreach (var change in changes)
93
                {
94
                    if (change.ChangeType == WatcherChangeTypes.Renamed)
95
                    {
96
                        var rename = (MovedEventArgs) change;
97
                        _agent.Post(new WorkflowState(change)
98
                                        {
99
                                            AccountInfo = AccountInfo,
100
                                            OldPath = rename.OldFullPath,
101
                                            OldFileName = Path.GetFileName(rename.OldName),
102
                                            Path = rename.FullPath,
103
                                            FileName = Path.GetFileName(rename.Name),
104
                                            TriggeringChange = rename.ChangeType
105
                                        });
106
                    }
107
                    else
108
                        _agent.Post(new WorkflowState(change)
109
                        {
110
                            AccountInfo = AccountInfo,
111
                            Path = change.FullPath,
112
                            FileName = Path.GetFileName(change.Name),
113
                            TriggeringChange = change.ChangeType
114
                        });                        
115
                }
116
            }
117
            StatusNotification.SetPithosStatus(PithosStatus.LocalComplete);
118
        }
119

    
120
        public void Start(AccountInfo accountInfo,string rootPath)
121
        {
122
            if (accountInfo==null)
123
                throw new ArgumentNullException("accountInfo");
124
            if (String.IsNullOrWhiteSpace(rootPath))
125
                throw new ArgumentNullException("rootPath");
126
            if (!Path.IsPathRooted(rootPath))
127
                throw new ArgumentException("rootPath must be an absolute path","rootPath");
128
            if (IdleTimeout == null)
129
                throw new InvalidOperationException("IdleTimeout must have a valid value");
130
            Contract.EndContractBlock();
131

    
132
            AccountInfo = accountInfo;
133
            RootPath = rootPath;
134

    
135
            _eventIdleBatch = new FileEventIdleBatch((int)IdleTimeout.TotalMilliseconds, ProcessBatchedEvents);
136

    
137
            _watcher = new FileSystemWatcher(rootPath) {IncludeSubdirectories = true,InternalBufferSize=8*4096};
138
            _adapter = new FileSystemWatcherAdapter(_watcher);
139

    
140
            _adapter.Changed += OnFileEvent;
141
            _adapter.Created += OnFileEvent;
142
            _adapter.Deleted += OnFileEvent;
143
            //_adapter.Renamed += OnRenameEvent;
144
            _adapter.Moved += OnMoveEvent;
145
            _watcher.EnableRaisingEvents = true;
146

    
147

    
148
            _agent = Agent<WorkflowState>.Start(inbox =>
149
            {
150
                Action loop = null;
151
                loop = () =>
152
                {
153
                    var message = inbox.Receive();
154
                    var process=message.Then(Process,inbox.CancellationToken);                    
155
                    inbox.LoopAsync(process,loop,ex=>
156
                        Log.ErrorFormat("[ERROR] File Event Processing:\r{0}", ex));
157
                };
158
                loop();
159
            });
160
        }
161

    
162
        private Task<object> Process(WorkflowState state)
163
        {
164
            if (state==null)
165
                throw new ArgumentNullException("state");
166
            Contract.EndContractBlock();
167

    
168
            if (Ignore(state.Path))
169
                return CompletedTask<object>.Default;
170

    
171
            var networkState = NetworkGate.GetNetworkState(state.Path);
172
            //Skip if the file is already being downloaded or uploaded and 
173
            //the change is create or modify
174
            if (networkState != NetworkOperation.None &&
175
                (
176
                    state.TriggeringChange == WatcherChangeTypes.Created ||
177
                    state.TriggeringChange == WatcherChangeTypes.Changed
178
                ))
179
                return CompletedTask<object>.Default;
180

    
181
            try
182
            {
183
                //StatusKeeper.EnsureFileState(state.Path);
184
                
185
                UpdateFileStatus(state);
186
                UpdateOverlayStatus(state);
187
                UpdateFileChecksum(state);
188
                WorkflowAgent.Post(state);
189
            }
190
            catch (IOException exc)
191
            {
192
                if (File.Exists(state.Path))
193
                {
194
                    Log.WarnFormat("File access error occured, retrying {0}\n{1}", state.Path, exc);
195
                    _agent.Post(state);
196
                }
197
                else
198
                {
199
                    Log.WarnFormat("File {0} does not exist. Will be ignored\n{1}", state.Path, exc);
200
                }
201
            }
202
            catch (Exception exc)
203
            {
204
                Log.WarnFormat("Error occured while indexing{0}. The file will be skipped\n{1}",
205
                               state.Path, exc);
206
            }
207
            return CompletedTask<object>.Default;
208
        }
209

    
210
        public bool Pause
211
        {
212
            get { return _watcher == null || !_watcher.EnableRaisingEvents; }
213
            set
214
            {
215
                if (_watcher != null)
216
                    _watcher.EnableRaisingEvents = !value;                
217
            }
218
        }
219

    
220
        public string CachePath { get; set; }
221

    
222
        /*private List<string> _selectivePaths = new List<string>();
223
        public List<string> SelectivePaths
224
        {
225
            get { return _selectivePaths; }
226
            set { _selectivePaths = value; }
227
        }
228
*/
229
        public Selectives Selectives { get; set; }
230

    
231

    
232
        public void Post(WorkflowState workflowState)
233
        {
234
            if (workflowState == null)
235
                throw new ArgumentNullException("workflowState");
236
            Contract.EndContractBlock();
237

    
238
            _agent.Post(workflowState);
239
        }
240

    
241
        public void Stop()
242
        {
243
            if (_watcher != null)
244
            {
245
                _watcher.Dispose();
246
            }
247
            _watcher = null;
248

    
249
            if (_agent!=null)
250
                _agent.Stop();
251
        }
252

    
253
        // Enumerate all files in the Pithos directory except those in the Fragment folder
254
        // and files with a .ignore extension
255
        public IEnumerable<string> EnumerateFiles(string searchPattern="*")
256
        {
257
            var monitoredFiles = from filePath in Directory.EnumerateFileSystemEntries(RootPath, searchPattern, SearchOption.AllDirectories)
258
                                 where !Ignore(filePath)
259
                                 select filePath;
260
            return monitoredFiles;
261
        }
262

    
263
        public IEnumerable<FileInfo> EnumerateFileInfos(string searchPattern="*")
264
        {
265
            var rootDir = new DirectoryInfo(RootPath);
266
            var monitoredFiles = from file in rootDir.EnumerateFiles(searchPattern, SearchOption.AllDirectories)
267
                                 where !Ignore(file.FullName)
268
                                 select file;
269
            return monitoredFiles;
270
        }                
271

    
272
        public IEnumerable<string> EnumerateFilesAsRelativeUrls(string searchPattern="*")
273
        {
274
            var rootDir = new DirectoryInfo(RootPath);
275
            var monitoredFiles = from file in rootDir.EnumerateFiles(searchPattern, SearchOption.AllDirectories)
276
                                 where !Ignore(file.FullName)
277
                                 select file.AsRelativeUrlTo(RootPath);
278
            return monitoredFiles;
279
        }                
280

    
281
        public IEnumerable<string> EnumerateFilesSystemInfosAsRelativeUrls(string searchPattern="*")
282
        {
283
            var rootDir = new DirectoryInfo(RootPath);
284
            var monitoredFiles = from file in rootDir.EnumerateFileSystemInfos(searchPattern, SearchOption.AllDirectories)
285
                                 where !Ignore(file.FullName)
286
                                 select file.AsRelativeUrlTo(RootPath);
287
            return monitoredFiles;
288
        }                
289

    
290

    
291
        
292

    
293
        private bool Ignore(string filePath)
294
        {
295
            if (IgnorePaths(filePath)) return true;
296

    
297

    
298
            //If selective sync is enabled, propagate folder events
299
            if (Selectives.IsSelectiveEnabled(AccountInfo.AccountKey) && Directory.Exists(filePath))
300
                return false;
301
            //Ignore if selective synchronization is defined, 
302
            //And the target file is not below any of the selective paths
303
            return !Selectives.IsSelected(AccountInfo, filePath);
304
        }
305

    
306
        private bool IgnorePaths(string filePath)
307
        {
308
//Ignore all first-level directories and files (ie at the container folders level)
309
            if (FoundBelowRoot(filePath, RootPath, 1))
310
                return true;
311

    
312
            //Ignore first-level items under the "others" folder (ie at the accounts folders level).
313
            var othersPath = Path.Combine(RootPath, FolderConstants.OthersFolder);
314
            if (FoundBelowRoot(filePath, othersPath, 1))
315
                return true;
316

    
317
            //Ignore second-level (container) folders under the "others" folder (ie at the container folders level). 
318
            if (FoundBelowRoot(filePath, othersPath, 2))
319
                return true;
320

    
321

    
322
            //Ignore anything happening in the cache path
323
            if (filePath.StartsWith(CachePath))
324
                return true;
325
            if (_ignoreFiles.ContainsKey(filePath.ToLower()))
326
                return true;
327
            
328
            //If selective sync is enabled, propagate folder events
329
            if (Selectives.IsSelectiveEnabled(AccountInfo.AccountKey) && Directory.Exists(filePath))
330
                return false;
331
            //Ignore if selective synchronization is defined, 
332
            //And the target file is not below any of the selective paths
333
            return !Selectives.IsSelected(AccountInfo, filePath);
334
        }
335

    
336
/*        private static bool FoundInRoot(string filePath, string rootPath)
337
        {
338
            //var rootDirectory = new DirectoryInfo(rootPath);
339

    
340
            //If the paths are equal, return true
341
            if (filePath.Equals(rootPath, StringComparison.InvariantCultureIgnoreCase))
342
                return true;
343

    
344
            //If the filepath is below the root path
345
            if (filePath.StartsWith(rootPath,StringComparison.InvariantCulture))
346
            {
347
                //Get the relative path
348
                var relativePath = filePath.Substring(rootPath.Length + 1);
349
                //If the relativePath does NOT contains a path separator, we found a match
350
                return (!relativePath.Contains(@"\"));
351
            }
352

    
353
            //If the filepath is not under the root path, return false
354
            return false;            
355
        }*/
356

    
357

    
358
        private static bool FoundBelowRoot(string filePath, string rootPath,int level)
359
        {
360
            //var rootDirectory = new DirectoryInfo(rootPath);
361

    
362
            //If the paths are equal, return true
363
            if (filePath.Equals(rootPath, StringComparison.InvariantCultureIgnoreCase))
364
                return true;
365

    
366
            //If the filepath is below the root path
367
            if (filePath.StartsWith(rootPath,StringComparison.InvariantCulture))
368
            {
369
                //Get the relative path
370
                var relativePath = filePath.Substring(rootPath.Length + 1);
371
                //If the relativePath does NOT contains a path separator, we found a match
372
                var levels=relativePath.ToCharArray().Count(c=>c=='\\')+1;                
373
                return levels==level;
374
            }
375

    
376
            //If the filepath is not under the root path, return false
377
            return false;            
378
        }
379

    
380
        //Post a Change message for all events except rename
381
        void OnFileEvent(object sender, FileSystemEventArgs e)
382
        {
383
            //Ignore events that affect the cache folder
384
            var filePath = e.FullPath;
385
            if (Ignore(filePath)) 
386
                return;
387
            _eventIdleBatch.Post(e);
388
        }
389

    
390

    
391
/*
392
        //Post a Change message for renames containing the old and new names
393
        void OnRenameEvent(object sender, RenamedEventArgs e)
394
        {
395
            var oldFullPath = e.OldFullPath;
396
            var fullPath = e.FullPath;
397
            if (Ignore(oldFullPath) || Ignore(fullPath))
398
                return;
399

    
400
            _agent.Post(new WorkflowState
401
            {
402
                AccountInfo=AccountInfo,
403
                OldPath = oldFullPath,
404
                OldFileName = e.OldName,
405
                Path = fullPath,
406
                FileName = e.Name,
407
                TriggeringChange = e.ChangeType
408
            });
409
        }
410
*/
411

    
412
        //Post a Change message for moves containing the old and new names
413
        void OnMoveEvent(object sender, MovedEventArgs e)
414
        {
415
            var oldFullPath = e.OldFullPath;
416
            var fullPath = e.FullPath;
417
            
418

    
419
            //If the source path is one of the ignored folders, ignore
420
            if (IgnorePaths(oldFullPath)) 
421
                return;
422

    
423
            //TODO: Must prevent move propagation if the source folder is blocked by selective sync
424
            //Ignore takes into account Selective Sync
425
            if (Ignore(fullPath))
426
                return;
427

    
428
            _eventIdleBatch.Post(e);
429
        }
430

    
431

    
432

    
433
        private Dictionary<WatcherChangeTypes, FileStatus> _statusDict = new Dictionary<WatcherChangeTypes, FileStatus>
434
                                                                             {
435
            {WatcherChangeTypes.Created,FileStatus.Created},
436
            {WatcherChangeTypes.Changed,FileStatus.Modified},
437
            {WatcherChangeTypes.Deleted,FileStatus.Deleted},
438
            {WatcherChangeTypes.Renamed,FileStatus.Renamed}
439
        };
440

    
441
        private Dictionary<string, string> _ignoreFiles=new Dictionary<string, string>();
442

    
443
        private WorkflowState UpdateFileStatus(WorkflowState state)
444
        {
445
            if (state==null)
446
                throw new ArgumentNullException("state");
447
            if (String.IsNullOrWhiteSpace(state.Path))
448
                throw new ArgumentException("The state's Path can't be empty","state");
449
            Contract.EndContractBlock();
450

    
451
            var path = state.Path;
452
            var status = _statusDict[state.TriggeringChange];
453
            var oldStatus = Workflow.StatusKeeper.GetFileStatus(path);
454
            if (status == oldStatus)
455
            {
456
                state.Status = status;
457
                state.Skip = true;
458
                return state;
459
            }
460
            if (state.Status == FileStatus.Renamed)
461
                Workflow.ClearFileStatus(path);
462

    
463
            state.Status = Workflow.SetFileStatus(path, status);
464
            return state;
465
        }
466

    
467
        private WorkflowState UpdateOverlayStatus(WorkflowState state)
468
        {
469
            if (state==null)
470
                throw new ArgumentNullException("state");
471
            Contract.EndContractBlock();
472

    
473
            if (state.Skip)
474
                return state;
475

    
476
            switch (state.Status)
477
            {
478
                case FileStatus.Created:
479
                    this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified,state.ShortHash);
480
                    break;
481
                case FileStatus.Modified:
482
                    this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified,state.ShortHash);
483
                    break;
484
                case FileStatus.Deleted:
485
                    //this.StatusAgent.RemoveFileOverlayStatus(state.Path);
486
                    break;
487
                case FileStatus.Renamed:
488
                    this.StatusKeeper.ClearFileStatus(state.OldPath);
489
                    this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified,state.ShortHash);
490
                    break;
491
                case FileStatus.Unchanged:
492
                    this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Normal,state.ShortHash);
493
                    break;
494
            }
495

    
496
            if (state.Status == FileStatus.Deleted)
497
                NativeMethods.RaiseChangeNotification(Path.GetDirectoryName(state.Path));
498
            else
499
                NativeMethods.RaiseChangeNotification(state.Path);
500
            return state;
501
        }
502

    
503

    
504
        private WorkflowState UpdateFileChecksum(WorkflowState state)
505
        {
506
            if (state.Skip)
507
                return state;
508

    
509
            if (state.Status == FileStatus.Deleted)
510
                return state;
511

    
512
            var path = state.Path;
513
            //Skip calculation for folders
514
            if (Directory.Exists(path))
515
                return state;
516

    
517
            var info = new FileInfo(path);
518

    
519
            using (StatusNotification.GetNotifier("Hashing {0}", "Finished Hashing {0}", info.Name))
520
            {
521

    
522
                var shortHash = info.ComputeShortHash();
523

    
524
                string merkleHash = info.CalculateHash(StatusKeeper.BlockSize, StatusKeeper.BlockHash);
525
                StatusKeeper.UpdateFileChecksum(path, shortHash, merkleHash);
526

    
527
                state.Hash = merkleHash;
528
                return state;
529
            }
530
        }
531

    
532
        //Does the file exist in the container's local folder?
533
        public bool Exists(string relativePath)
534
        {
535
            if (String.IsNullOrWhiteSpace(relativePath))
536
                throw new ArgumentNullException("relativePath");
537
            //A RootPath must be set before calling this method
538
            if (String.IsNullOrWhiteSpace(RootPath))
539
                throw new InvalidOperationException("RootPath was not set");
540
            Contract.EndContractBlock();
541
            //Create the absolute path by combining the RootPath with the relativePath
542
            var absolutePath=Path.Combine(RootPath, relativePath);
543
            //Is this a valid file?
544
            if (File.Exists(absolutePath))
545
                return true;
546
            //Or a directory?
547
            if (Directory.Exists(absolutePath))
548
                return true;
549
            //Fail if it is neither
550
            return false;
551
        }
552

    
553
        public static FileAgent GetFileAgent(AccountInfo accountInfo)
554
        {
555
            return GetFileAgent(accountInfo.AccountPath);
556
        }
557

    
558
        public static FileAgent GetFileAgent(string rootPath)
559
        {
560
            return AgentLocator<FileAgent>.Get(rootPath.ToLower());
561
        }
562

    
563

    
564
        public FileSystemInfo GetFileSystemInfo(string relativePath)
565
        {
566
            if (String.IsNullOrWhiteSpace(relativePath))
567
                throw new ArgumentNullException("relativePath");
568
            //A RootPath must be set before calling this method
569
            if (String.IsNullOrWhiteSpace(RootPath))
570
                throw new InvalidOperationException("RootPath was not set");            
571
            Contract.EndContractBlock();            
572

    
573
            var absolutePath = Path.Combine(RootPath, relativePath);
574

    
575
            if (Directory.Exists(absolutePath))
576
                return new DirectoryInfo(absolutePath).WithProperCapitalization();
577
            else
578
                return new FileInfo(absolutePath).WithProperCapitalization();
579
            
580
        }
581

    
582
        public void Delete(string relativePath)
583
        {
584
            var absolutePath = Path.Combine(RootPath, relativePath).ToLower();
585
            if (Log.IsDebugEnabled)
586
                Log.DebugFormat("Deleting {0}", absolutePath);
587
            if (File.Exists(absolutePath))
588
            {    
589
                try
590
                {
591
                    File.Delete(absolutePath);
592
                }
593
                //The file may have been deleted by another thread. Just ignore the relevant exception
594
                catch (FileNotFoundException) { }
595
            }
596
            else if (Directory.Exists(absolutePath))
597
            {
598
                try
599
                {
600
                    Directory.Delete(absolutePath, true);
601
                }
602
                //The directory may have been deleted by another thread. Just ignore the relevant exception
603
                catch (DirectoryNotFoundException){}                
604
            }
605
        
606
            //_ignoreFiles[absolutePath] = absolutePath;                
607
            StatusKeeper.ClearFileStatus(absolutePath);
608
        }
609
    }
610
}