Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / PithosMonitor.cs @ b5061ac8

History | View | Annotate | Download (24.3 kB)

1
using System;
2
using System.Collections.Concurrent;
3
using System.Collections.Generic;
4
using System.ComponentModel.Composition;
5
using System.Diagnostics;
6
using System.Diagnostics.Contracts;
7
using System.IO;
8
using System.Linq;
9
using System.Net.NetworkInformation;
10
using System.Security.Cryptography;
11
using System.ServiceModel.Description;
12
using System.Text;
13
using System.Threading;
14
using System.Threading.Tasks;
15
using Microsoft.WindowsAPICodePack.Net;
16
using Pithos.Interfaces;
17
using System.ServiceModel;
18

    
19
namespace Pithos.Core
20
{
21
    [Export(typeof(PithosMonitor))]
22
    public class PithosMonitor:IDisposable
23
    {
24
        [Import]
25
        public IPithosSettings Settings{get;set;}
26

    
27
        [Import]
28
        public IStatusKeeper StatusKeeper { get; set; }
29

    
30
        [Import]
31
        public IPithosWorkflow Workflow { get; set; }
32

    
33
        [Import]
34
        public ICloudClient CloudClient { get; set; }
35

    
36
        [Import]
37
        public ICloudClient CloudListeningClient { get; set; }
38

    
39
        private ServiceHost _statusService { get; set; }
40

    
41
        private FileSystemWatcher _watcher;
42

    
43
        public bool Pause
44
        {
45
            get { return _watcher == null || !_watcher.EnableRaisingEvents; }
46
            set
47
            {
48
                if (_watcher!=null)
49
                    _watcher.EnableRaisingEvents = !value;
50
                if (value)
51
                {
52
                    StatusKeeper.SetPithosStatus(PithosStatus.SyncPaused);
53
                }
54
                else
55
                {
56
                    StatusKeeper.SetPithosStatus(PithosStatus.InSynch);
57
                }
58
            }
59
        }
60

    
61

    
62
        CancellationTokenSource _cancellationSource;
63

    
64
        readonly BlockingCollection<WorkflowState> _fileEvents = new BlockingCollection<WorkflowState>();
65
        readonly BlockingCollection<WorkflowState> _uploadEvents = new BlockingCollection<WorkflowState>();
66

    
67
        
68

    
69
        public void Start()
70
        {
71

    
72
            if (_cancellationSource != null)
73
            {
74
                if (!_cancellationSource.IsCancellationRequested)
75
                    return;
76
            }
77
            _cancellationSource = new CancellationTokenSource();
78

    
79
            string path = Settings.PithosPath;
80
            var proxyUri = ProxyFromSettings();            
81
            CloudClient.Proxy = proxyUri;
82
            IndexLocalFiles(path);
83
            StartMonitoringFiles(path);
84

    
85
            StartStatusService();
86

    
87
            StartNetwork();
88
        }
89

    
90
        private Uri ProxyFromSettings()
91
        {            
92
            if (Settings.UseManualProxy)
93
            {
94
                var proxyUri = new UriBuilder
95
                                   {
96
                                       Host = Settings.ProxyServer, 
97
                                       Port = Settings.ProxyPort
98
                                   };
99
                if (Settings.ProxyAuthentication)
100
                {
101
                    proxyUri.UserName = Settings.ProxyUsername;
102
                    proxyUri.Password = Settings.ProxyPassword;
103
                }
104
                return proxyUri.Uri;
105
            }
106
            return null;
107
        }
108

    
109
        private void IndexLocalFiles(string path)
110
        {
111
            Trace.TraceInformation("[START] Inxed Local");
112
            try
113
            {
114
                var files =
115
                    from filePath in Directory.EnumerateFiles(path, "*", SearchOption.AllDirectories).AsParallel()
116
                    select filePath;
117
                StatusKeeper.StoreUnversionedFiles(files);
118

    
119
                var newFiles= FileState.FindAllByProperty("OverlayStatus", FileOverlayStatus.Unversioned)
120
                            .Select(state=>state.FilePath);
121
                foreach (var newFile in newFiles)
122
                {
123
                    _uploadEvents.Add(new WorkflowState
124
                                          {
125
                                              Path = newFile,
126
                                              FileName = Path.GetFileName(newFile),
127
                                              TriggeringChange = WatcherChangeTypes.Created
128
                                          });
129
                }
130

    
131
            }
132
            catch (Exception exc)
133
            {
134
                Trace.TraceError("[ERROR] Index Local - {0}", exc);
135
            }
136
            finally
137
            {
138
                Trace.TraceInformation("[END] Inxed Local");
139
            }
140
        }
141

    
142
        private void StartStatusService()
143
        {
144
            // Create a ServiceHost for the CalculatorService type and provide the base address.
145
            var baseAddress = new Uri("net.pipe://localhost/pithos");
146
            _statusService = new ServiceHost(typeof(StatusService), baseAddress);
147
            
148
            var binding = new NetNamedPipeBinding(NetNamedPipeSecurityMode.None);
149
            
150
            _statusService.AddServiceEndpoint(typeof(IStatusService), binding, "net.pipe://localhost/pithos/statuscache");
151
            _statusService.AddServiceEndpoint(typeof (ISettingsService), binding, "net.pipe://localhost/pithos/settings");
152

    
153

    
154
            //// Add a mex endpoint
155
            var smb = new ServiceMetadataBehavior
156
                          {
157
                              HttpGetEnabled = true, 
158
                              HttpGetUrl = new Uri("http://localhost:30000/pithos/mex")
159
                          };
160
            _statusService.Description.Behaviors.Add(smb);
161

    
162

    
163
            _statusService.Open();
164
        }
165

    
166
        private void StopStatusService()
167
        {
168
            if (_statusService == null)
169
                return;
170

    
171
            if (_statusService.State == CommunicationState.Faulted)
172
                _statusService.Abort();
173
            else if (_statusService.State != CommunicationState.Closed)
174
                _statusService.Close();
175
            _statusService = null;
176

    
177
        }
178

    
179

    
180
        private void StartNetwork()
181
        {
182

    
183
            bool connected = NetworkListManager.IsConnectedToInternet;
184
            //If we are not connected retry later
185
            if (!connected)
186
            {
187
                Task.Factory.StartNewDelayed(10000, StartNetwork);
188
                return;
189
            }
190

    
191
            try
192
            {
193
                
194
                CloudClient.Authenticate(Settings.UserName, Settings.ApiKey);
195

    
196
                StartListening(Settings.PithosPath);
197
                StartSending();
198
            }
199
            catch (Exception)
200
            {
201
                //Faild to authenticate due to network or account error
202
                //Retry after a while
203
                Task.Factory.StartNewDelayed(10000, StartNetwork);
204
            }
205
        }
206

    
207
        internal enum CloudActionType
208
        {
209
            Upload=0,
210
            Download,
211
            UploadUnconditional,
212
            DownloadUnconditional,
213
            DeleteLocal,
214
            DeleteCloud
215
        }
216

    
217
        internal class ListenerAction
218
        {
219
            public CloudActionType Action { get; set; }
220
            public FileInfo LocalFile { get; set; }
221
            public ObjectInfo CloudFile { get; set; }
222

    
223
            public Lazy<string> LocalHash { get; set; }
224

    
225
            public ListenerAction(CloudActionType action, FileInfo localFile, ObjectInfo cloudFile)
226
            {
227
                Action = action;
228
                LocalFile = localFile;
229
                CloudFile = cloudFile;
230
                LocalHash=new Lazy<string>(()=>Signature.CalculateHash(LocalFile.FullName),LazyThreadSafetyMode.ExecutionAndPublication);
231
            }
232
            
233
        }
234

    
235
        internal class LocalFileComparer:EqualityComparer<ListenerAction>
236
        {
237
            public override bool Equals(ListenerAction x, ListenerAction y)
238
            {
239
                if (x.Action != y.Action)
240
                    return false;
241
                if (x.LocalFile != null && y.LocalFile != null && !x.LocalFile.FullName.Equals(y.LocalFile.FullName))
242
                    return false;
243
                if (x.CloudFile != null && y.CloudFile != null && !x.CloudFile.Hash.Equals(y.CloudFile.Hash))
244
                    return false;
245
                if (x.CloudFile == null ^ y.CloudFile == null ||
246
                    x.LocalFile == null ^ y.LocalFile == null)
247
                    return false;
248
                return true;
249
            }
250

    
251
            public override int GetHashCode(ListenerAction obj)
252
            {
253
                var hash1 = (obj.LocalFile == null) ? int.MaxValue : obj.LocalFile.FullName.GetHashCode();
254
                var hash2 = (obj.CloudFile == null) ? int.MaxValue : obj.CloudFile.Hash.GetHashCode();
255
                var hash3 = obj.Action.GetHashCode();
256
                return hash1 ^ hash2 & hash3;
257
            }
258
        }
259

    
260
        private BlockingCollection<ListenerAction> _listenerActions=new BlockingCollection<ListenerAction>();
261

    
262
        private Timer timer;
263

    
264
        private void StartListening(string accountPath)
265
        {
266
            
267
            ProcessRemoteFiles(accountPath);
268

    
269
            Task.Factory.StartNew(ProcessListenerActions);
270
                        
271
        }
272

    
273
        private Task ProcessRemoteFiles(string accountPath)
274
        {
275
            Trace.TraceInformation("[LISTENER] Scheduled");    
276
            return Task.Factory.StartNewDelayed(10000)
277
                .ContinueWith(t=>CloudClient.ListObjects("PITHOS"))
278
                .ContinueWith(task =>
279
                                  {
280
                                      Trace.TraceInformation("[LISTENER] Start Processing");
281

    
282
                                      var remoteObjects = task.Result;
283
/*
284
                                      if (remoteObjects.Count == 0)
285
                                          return;
286
*/
287

    
288
                                      var pithosDir = new DirectoryInfo(accountPath);
289
                                      
290
                                      var remoteFiles = from info in remoteObjects
291
                                                    select info.Name;
292
                                      
293
                                      var onlyLocal = from localFile in pithosDir.EnumerateFiles()
294
                                                      where !remoteFiles.Contains(localFile.Name) 
295
                                                      select new ListenerAction(CloudActionType.UploadUnconditional, localFile,null);
296
                                      
297
                                      
298
                                    
299

    
300
                                      var localNames =pithosDir.EnumerateFiles().Select(info => info.Name);
301
                                      var onlyRemote = from upFile in remoteObjects
302
                                                       where !localNames.Contains(upFile.Name)
303
                                                       select new ListenerAction(CloudActionType.DownloadUnconditional,null,upFile);
304

    
305

    
306
                                      var commonObjects = from  upFile in remoteObjects
307
                                                            join  localFile in pithosDir.EnumerateFiles()
308
                                                                on upFile.Name equals localFile.Name 
309
                                                            select new ListenerAction(CloudActionType.Download, localFile, upFile);
310

    
311
                                      var uniques =
312
                                          onlyLocal.Union(onlyRemote).Union(commonObjects)
313
                                              .Except(_listenerActions,new LocalFileComparer());
314
                                      
315
                                      _listenerActions.AddFromEnumerable(uniques, false);
316

    
317
                                      Trace.TraceInformation("[LISTENER] End Processing");
318
                                      
319
                                  }
320
                ).ContinueWith(t=>
321
                {
322
                    if (t.IsFaulted)
323
                    {
324
                        Trace.TraceError("[LISTENER] Exception: {0}",t.Exception);                                           
325
                    }
326
                    else
327
                    {
328
                        Trace.TraceInformation("[LISTENER] Finished");                                           
329
                    }                    
330
                    ProcessRemoteFiles(accountPath);
331
                });
332
        }
333

    
334
        private void ProcessListenerActions()
335
        {
336
            foreach(var action in _listenerActions.GetConsumingEnumerable())
337
            {
338
                Trace.TraceInformation("[ACTION] Start Processing {0}:{1}->{2}",action.Action,action.LocalFile,action.CloudFile);
339
                var localFile = action.LocalFile;
340
                var cloudFile = action.CloudFile;
341
                var downloadPath = (cloudFile == null)? String.Empty
342
                                        : Path.Combine(Settings.PithosPath,cloudFile.Name);
343
                try
344
                {
345
                    switch (action.Action)
346
                    {
347
                        case CloudActionType.UploadUnconditional:
348

    
349
                            UploadCloudFile(localFile.Name,localFile.Length,localFile.FullName,action.LocalHash.Value);
350
                            break;
351
                        case CloudActionType.DownloadUnconditional:
352
                            DownloadCloudFile("PITHOS",cloudFile.Name,downloadPath);
353
                            break;
354
                        case CloudActionType.Download:
355
                            if (File.Exists(downloadPath))
356
                            {
357
                                if (cloudFile.Hash !=action.LocalHash.Value)
358
                                {
359
                                    var lastLocalTime =localFile.LastWriteTime;
360
                                    var lastUpTime =cloudFile.Last_Modified;
361
                                    if (lastUpTime <=lastLocalTime)
362
                                    {
363
                                        //Files in conflict
364
                                        StatusKeeper.SetFileOverlayStatus(downloadPath,FileOverlayStatus.Conflict);
365
                                    }
366
                                    else
367
                                        DownloadCloudFile("PITHOS",action.CloudFile.Name,downloadPath);
368
                                }
369
                            }
370
                            else
371
                                DownloadCloudFile("PITHOS",action.CloudFile.Name,downloadPath);
372
                            break;
373
                    }
374
                    Trace.TraceInformation("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile);
375
                }
376
                catch (Exception exc)
377
                {
378
                    Trace.TraceError("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
379
                                    action.Action, action.LocalFile,action.CloudFile,exc);
380
                    Trace.TraceError(exc.ToString());
381

    
382
                    _listenerActions.Add(action);
383
                }
384
            }
385
        }
386

    
387
        private void DownloadCloudFile(string container, string fileName, string localPath)
388
        {
389
            using (var upstream = CloudClient.GetObject(container, fileName))
390
            using (var fileStream = File.OpenWrite(localPath))
391
            {
392
                upstream.CopyTo(fileStream);
393
            }
394
        }
395

    
396
        private void StartMonitoringFiles(string path)
397
        {
398
            _watcher = new FileSystemWatcher(path);
399
            _watcher.Changed += OnFileEvent;
400
            _watcher.Created += OnFileEvent;
401
            _watcher.Deleted += OnFileEvent;
402
            _watcher.Renamed += OnRenameEvent;
403
            _watcher.EnableRaisingEvents = true;
404

    
405
            Task.Factory.StartNew(() =>
406
                                      {
407
                                          foreach (var state in _fileEvents.GetConsumingEnumerable())
408
                                          {
409
                                              try
410
                                              {
411
                                                  UpdateFileStatus(state);
412
                                                  UpdateOverlayStatus(state);
413
                                                  UpdateFileChecksum(state);
414
                                                  _uploadEvents.Add(state);                                                  
415
                                              }
416
                                              catch (OperationCanceledException)
417
                                              {
418
                                                  throw;
419
                                              }
420
                                              catch(Exception ex)
421
                                              {}
422
                                          }
423
                                          
424
                                      },_cancellationSource.Token);
425
        }
426

    
427
        private void StartSending()
428
        {
429
            Task.Factory.StartNew(() =>
430
                                      {
431
                                          foreach (var state in _uploadEvents.GetConsumingEnumerable())
432
                                          {
433
                                              try
434
                                              {
435
                                                  SynchToCloud(state);
436
                                              }
437
                                              catch (OperationCanceledException)
438
                                              {
439
                                                  throw;
440
                                              }
441
                                              catch(Exception ex)
442
                                              {}
443
                                          }
444
                                          
445
                                      },_cancellationSource.Token);
446
        }
447

    
448

    
449
        private WorkflowState SynchToCloud(WorkflowState state)
450
        {
451
            if (state.Skip)
452
                return state;
453
            string path = state.Path;
454
            string fileName = Path.GetFileName(path);
455

    
456
            switch(state.Status)
457
            {
458
                case FileStatus.Created:
459
                case FileStatus.Modified:
460
                    var info = new FileInfo(path);
461
                    long fileSize = info.Length;
462
                    UploadCloudFile(fileName, fileSize, path,state.Hash);
463
                    break;
464
                case FileStatus.Deleted:
465
                    DeleteCloudFile(fileName);
466
                    break;
467
                case FileStatus.Renamed:
468
                    RenameCloudFile(state);
469
                    break;
470
            }
471
            return state;
472
        }
473

    
474
        private void RenameCloudFile(WorkflowState state)
475
        {
476
            this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Synch);
477

    
478

    
479

    
480
            CloudClient.MoveObject("PITHOS", state.OldFileName, state.FileName);
481

    
482
            this.StatusKeeper.SetFileStatus(state.Path, FileStatus.Unchanged);
483
            this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Normal);
484
            Workflow.RaiseChangeNotification(state.Path);
485
        }
486

    
487
        private void DeleteCloudFile(string fileName)
488
        {
489
            Contract.Requires(!Path.IsPathRooted(fileName));
490

    
491
            this.StatusKeeper.SetFileOverlayStatus(fileName, FileOverlayStatus.Synch);
492
            CloudClient.DeleteObject("PITHOS", fileName);
493
            this.StatusKeeper.ClearFileStatus(fileName);
494
            this.StatusKeeper.RemoveFileOverlayStatus(fileName);            
495
        }
496

    
497
        private void UploadCloudFile(string fileName, long fileSize, string path,string hash)
498
        {
499
            Contract.Requires(!Path.IsPathRooted(fileName));
500
            //Even if GetObjectInfo times out, we can proceed with the upload
501
            var info=CloudClient.GetObjectInfo("PITHOS", fileName);
502
            if ( hash != info.Hash)
503
            {
504
                this.StatusKeeper.SetFileOverlayStatus(path, FileOverlayStatus.Synch);
505

    
506
                CloudClient.PutObject("PITHOS", fileName, path, hash).Wait();
507
            }
508
            this.StatusKeeper.SetFileStatus(path,FileStatus.Unchanged);
509
            this.StatusKeeper.SetFileOverlayStatus(path,FileOverlayStatus.Normal);
510
            Workflow.RaiseChangeNotification(path);
511
        }
512

    
513
        private Dictionary<WatcherChangeTypes, FileStatus> _statusDict = new Dictionary<WatcherChangeTypes, FileStatus>
514
        {
515
            {WatcherChangeTypes.Created,FileStatus.Created},
516
            {WatcherChangeTypes.Changed,FileStatus.Modified},
517
            {WatcherChangeTypes.Deleted,FileStatus.Deleted},
518
            {WatcherChangeTypes.Renamed,FileStatus.Renamed}
519
        };
520

    
521
        private WorkflowState UpdateFileStatus(WorkflowState  state)
522
        {
523
            string path = state.Path;
524
            FileStatus status = _statusDict[state.TriggeringChange];
525
            var oldStatus = Workflow.StatusKeeper.GetFileStatus(path);
526
            if (status == oldStatus)
527
            {
528
                state.Status = status;
529
                state.Skip = true;
530
                return state;
531
            }
532
            if (state.Status == FileStatus.Renamed)
533
                Workflow.ClearFileStatus(path);                
534

    
535
            state.Status = Workflow.SetFileStatus(path, status);                
536
            return state;
537
        }
538

    
539
        private WorkflowState UpdateOverlayStatus(WorkflowState state)
540
        {            
541
            if (state.Skip)
542
                return state;
543

    
544
            switch (state.Status)
545
            {
546
                case FileStatus.Created:
547
                case FileStatus.Modified:
548
                    this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);
549
                    break;
550
                case FileStatus.Deleted:
551
                    this.StatusKeeper.RemoveFileOverlayStatus(state.Path);
552
                    break;
553
                case FileStatus.Renamed:
554
                    this.StatusKeeper.RemoveFileOverlayStatus(state.OldPath);
555
                    this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);
556
                    break;
557
                case FileStatus.Unchanged:
558
                    this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Normal);
559
                    break;
560
            }
561

    
562
            if (state.Status==FileStatus.Deleted)
563
                Workflow.RaiseChangeNotification(Path.GetDirectoryName(state.Path));
564
            else
565
                Workflow.RaiseChangeNotification(state.Path);
566
            return state;
567
        }
568

    
569

    
570
        private WorkflowState UpdateFileChecksum(WorkflowState state)
571
        {
572
            if (state.Skip)
573
                return state;
574

    
575
            if (state.Status == FileStatus.Deleted)
576
                return state;
577

    
578
            string path = state.Path;
579
            string hash = Signature.CalculateHash(path);
580

    
581
            StatusKeeper.UpdateFileChecksum(path, hash);
582

    
583
            state.Hash = hash;
584
            return state;
585
        }
586

    
587
       
588

    
589
        private FileSystemEventArgs CalculateSignature(FileSystemEventArgs arg)
590
        {
591
            Debug.WriteLine(String.Format("{0} {1} {2}", arg.ChangeType, arg.Name, arg.FullPath), "INFO");
592
            return arg;
593
        }
594

    
595
        void OnFileEvent(object sender, FileSystemEventArgs e)
596
        {
597
            _fileEvents.Add(new WorkflowState{Path=e.FullPath,FileName = e.Name,TriggeringChange=e.ChangeType});            
598
        }
599

    
600
        void OnRenameEvent(object sender, RenamedEventArgs e)
601
        {
602
            _fileEvents.Add(new WorkflowState { OldPath=e.OldFullPath,OldFileName=e.OldName,
603
                Path = e.FullPath, FileName = e.Name, TriggeringChange = e.ChangeType });
604
        }
605

    
606
        public void Stop()
607
        {
608
            if (_watcher != null)
609
            {
610
                _watcher.Changed -= OnFileEvent;
611
                _watcher.Created -= OnFileEvent;
612
                _watcher.Deleted -= OnFileEvent;
613
                _watcher.Renamed -= OnRenameEvent;
614
                _watcher.Dispose();
615
            }
616
            _watcher = null;
617
            _fileEvents.CompleteAdding();
618
            if (timer != null)
619
                timer.Dispose();
620
            timer = null;
621
            StopStatusService();
622
        }
623

    
624

    
625
        ~PithosMonitor()
626
        {
627
            Dispose(false);
628
        }
629

    
630
        public void Dispose()
631
        {
632
            Dispose(true);
633
            GC.SuppressFinalize(this);
634
        }
635

    
636
        protected virtual void Dispose(bool disposing)
637
        {
638
            if (disposing)
639
            {
640
                Stop();
641
            }
642
        }
643

    
644

    
645
    }
646
}