Extracted Authentication URL to application settings
[pithos-ms-client] / trunk / Pithos.Core / PithosMonitor.cs
1 using System;
2 using System.Collections.Concurrent;
3 using System.Collections.Generic;
4 using System.ComponentModel.Composition;
5 using System.Diagnostics;
6 using System.Diagnostics.Contracts;
7 using System.IO;
8 using System.Linq;
9 using System.Net.NetworkInformation;
10 using System.Security.Cryptography;
11 using System.ServiceModel.Description;
12 using System.Text;
13 using System.Threading;
14 using System.Threading.Tasks;
15 using Castle.ActiveRecord.Queries;
16 using Microsoft.WindowsAPICodePack.Net;
17 using Pithos.Interfaces;
18 using System.ServiceModel;
19
20 namespace Pithos.Core
21 {
22     [Export(typeof(PithosMonitor))]
23     public class PithosMonitor:IDisposable
24     {
25         private const string PithosContainer = "pithos";
26         private const string TrashContainer = "trash";
27
28         [Import]
29         public IPithosSettings Settings{get;set;}
30
31         [Import]
32         public IStatusKeeper StatusKeeper { get; set; }
33
34         [Import]
35         public IPithosWorkflow Workflow { get; set; }
36
37         [Import]
38         public ICloudClient CloudClient { get; set; }
39
40         [Import]
41         public ICloudClient CloudListeningClient { get; set; }
42
43         public string UserName { get; set; }
44         public string ApiKey { get; set; }
45
46         private ServiceHost _statusService { get; set; }
47
48         private FileSystemWatcher _watcher;
49
50         public bool Pause
51         {
52             get { return _watcher == null || !_watcher.EnableRaisingEvents; }
53             set
54             {
55                 if (_watcher!=null)
56                     _watcher.EnableRaisingEvents = !value;
57                 if (value)
58                 {
59                     StatusKeeper.SetPithosStatus(PithosStatus.SyncPaused);
60                 }
61                 else
62                 {
63                     StatusKeeper.SetPithosStatus(PithosStatus.InSynch);
64                 }
65             }
66         }
67
68         public string RootPath { get; set; }
69
70
71         CancellationTokenSource _cancellationSource;
72
73         readonly BlockingCollection<WorkflowState> _fileEvents = new BlockingCollection<WorkflowState>();
74         readonly BlockingCollection<WorkflowState> _uploadEvents = new BlockingCollection<WorkflowState>();
75
76         
77
78         public void Start()
79         {
80
81             if (_cancellationSource != null)
82             {
83                 if (!_cancellationSource.IsCancellationRequested)
84                     return;
85             }
86             _cancellationSource = new CancellationTokenSource();
87
88             var proxyUri = ProxyFromSettings();            
89             CloudClient.Proxy = proxyUri;
90             CloudClient.UsePithos = this.UsePithos;
91             EnsurePithosContainers();
92             StatusKeeper.StartProcessing(_cancellationSource.Token);
93             IndexLocalFiles(RootPath);
94             StartMonitoringFiles(RootPath);
95
96             StartStatusService();
97
98             StartNetwork();
99         }
100
101         private void EnsurePithosContainers()
102         {
103             CloudClient.UsePithos = this.UsePithos;
104             CloudClient.AuthenticationUrl = this.AuthenticationUrl;
105             CloudClient.Authenticate(UserName, ApiKey);
106
107             var pithosContainers = new[] {PithosContainer, TrashContainer};
108             foreach (var container in pithosContainers)
109             {
110                 if (!CloudClient.ContainerExists(container))
111                     CloudClient.CreateContainer(container);                
112             }
113         }
114
115         public string AuthenticationUrl { get; set; }
116
117         private Uri ProxyFromSettings()
118         {            
119             if (Settings.UseManualProxy)
120             {
121                 var proxyUri = new UriBuilder
122                                    {
123                                        Host = Settings.ProxyServer, 
124                                        Port = Settings.ProxyPort
125                                    };
126                 if (Settings.ProxyAuthentication)
127                 {
128                     proxyUri.UserName = Settings.ProxyUsername;
129                     proxyUri.Password = Settings.ProxyPassword;
130                 }
131                 return proxyUri.Uri;
132             }
133             return null;
134         }
135
136         private void IndexLocalFiles(string path)
137         {
138             Trace.TraceInformation("[START] Inxed Local");
139             try
140             {
141                 var files =
142                     from filePath in Directory.EnumerateFiles(path, "*", SearchOption.AllDirectories).AsParallel()
143                     select filePath;
144                 StatusKeeper.StoreUnversionedFiles(files);
145
146                 RestartInterruptedFiles();
147             }
148             catch (Exception exc)
149             {
150                 Trace.TraceError("[ERROR] Index Local - {0}", exc);
151             }
152             finally
153             {
154                 Trace.TraceInformation("[END] Inxed Local");
155             }
156         }
157
158         private void RestartInterruptedFiles()
159         {
160             var interruptedStates = new[] { FileOverlayStatus.Unversioned, FileOverlayStatus.Modified };
161             var filesQuery = from state in FileState.Queryable
162                              where interruptedStates.Contains(state.OverlayStatus)
163                              select new WorkflowState
164                                       {
165                                           Path = state.FilePath.ToLower(),
166                                           FileName = Path.GetFileName(state.FilePath).ToLower(),
167                                           Status=state.OverlayStatus==FileOverlayStatus.Unversioned?
168                                                             FileStatus.Created:
169                                                             FileStatus.Modified,
170                                           TriggeringChange = state.OverlayStatus==FileOverlayStatus.Unversioned?
171                                                             WatcherChangeTypes.Created:
172                                                             WatcherChangeTypes.Changed
173                                       };
174             _uploadEvents.AddFromEnumerable(filesQuery,false);           
175             
176         }
177
178         private void StartStatusService()
179         {
180             // Create a ServiceHost for the CalculatorService type and provide the base address.
181             var baseAddress = new Uri("net.pipe://localhost/pithos");
182             _statusService = new ServiceHost(typeof(StatusService), baseAddress);
183             
184             var binding = new NetNamedPipeBinding(NetNamedPipeSecurityMode.None);
185             
186             _statusService.AddServiceEndpoint(typeof(IStatusService), binding, "net.pipe://localhost/pithos/statuscache");
187             _statusService.AddServiceEndpoint(typeof (ISettingsService), binding, "net.pipe://localhost/pithos/settings");
188
189
190             //// Add a mex endpoint
191             var smb = new ServiceMetadataBehavior
192                           {
193                               HttpGetEnabled = true, 
194                               HttpGetUrl = new Uri("http://localhost:30000/pithos/mex")
195                           };
196             _statusService.Description.Behaviors.Add(smb);
197
198
199             _statusService.Open();
200         }
201
202         private void StopStatusService()
203         {
204             if (_statusService == null)
205                 return;
206
207             if (_statusService.State == CommunicationState.Faulted)
208                 _statusService.Abort();
209             else if (_statusService.State != CommunicationState.Closed)
210                 _statusService.Close();
211             _statusService = null;
212
213         }
214
215
216         private void StartNetwork()
217         {
218
219             bool connected = NetworkListManager.IsConnectedToInternet;
220             //If we are not connected retry later
221             if (!connected)
222             {
223                 Task.Factory.StartNewDelayed(10000, StartNetwork);
224                 return;
225             }
226
227             try
228             {
229                 CloudClient.UsePithos = this.UsePithos;
230                 CloudClient.AuthenticationUrl = this.AuthenticationUrl;
231                 CloudClient.Authenticate(UserName, ApiKey);
232
233                 StartListening(RootPath);
234                 StartSending();
235             }
236             catch (Exception)
237             {
238                 //Faild to authenticate due to network or account error
239                 //Retry after a while
240                 Task.Factory.StartNewDelayed(10000, StartNetwork);
241             }
242         }
243
244         public bool UsePithos { get; set; }
245
246         internal enum CloudActionType
247         {
248             Upload=0,
249             Download,
250             UploadUnconditional,
251             DownloadUnconditional,
252             DeleteLocal,
253             DeleteCloud
254         }
255
256         internal class ListenerAction
257         {
258             public CloudActionType Action { get; set; }
259             public FileInfo LocalFile { get; set; }
260             public ObjectInfo CloudFile { get; set; }
261
262             public Lazy<string> LocalHash { get; set; }
263
264             public ListenerAction(CloudActionType action, FileInfo localFile, ObjectInfo cloudFile)
265             {
266                 Action = action;
267                 LocalFile = localFile;
268                 CloudFile = cloudFile;
269                 LocalHash=new Lazy<string>(()=>Signature.CalculateHash(LocalFile.FullName),LazyThreadSafetyMode.ExecutionAndPublication);
270             }
271             
272         }
273
274         internal class LocalFileComparer:EqualityComparer<ListenerAction>
275         {
276             public override bool Equals(ListenerAction x, ListenerAction y)
277             {
278                 if (x.Action != y.Action)
279                     return false;
280                 if (x.LocalFile != null && y.LocalFile != null && !x.LocalFile.FullName.Equals(y.LocalFile.FullName))
281                     return false;
282                 if (x.CloudFile != null && y.CloudFile != null && !x.CloudFile.Hash.Equals(y.CloudFile.Hash))
283                     return false;
284                 if (x.CloudFile == null ^ y.CloudFile == null ||
285                     x.LocalFile == null ^ y.LocalFile == null)
286                     return false;
287                 return true;
288             }
289
290             public override int GetHashCode(ListenerAction obj)
291             {
292                 var hash1 = (obj.LocalFile == null) ? int.MaxValue : obj.LocalFile.FullName.GetHashCode();
293                 var hash2 = (obj.CloudFile == null) ? int.MaxValue : obj.CloudFile.Hash.GetHashCode();
294                 var hash3 = obj.Action.GetHashCode();
295                 return hash1 ^ hash2 & hash3;
296             }
297         }
298
299         private BlockingCollection<ListenerAction> _networkActions=new BlockingCollection<ListenerAction>();
300
301         private Timer timer;
302
303         private void StartListening(string accountPath)
304         {
305             
306             ProcessRemoteFiles(accountPath);
307
308             Task.Factory.StartNew(ProcessListenerActions);
309                         
310         }
311
312         private Task ProcessRemoteFiles(string accountPath)
313         {
314             Trace.TraceInformation("[LISTENER] Scheduled");    
315             return Task.Factory.StartNewDelayed(10000)
316                 .ContinueWith(t=>CloudClient.ListObjects(PithosContainer))
317                 .ContinueWith(task =>
318                                   {
319                                       Trace.TraceInformation("[LISTENER] Start Processing");
320
321                                       var remoteObjects = task.Result;
322 /*
323                                       if (remoteObjects.Count == 0)
324                                           return;
325 */
326
327                                       var pithosDir = new DirectoryInfo(accountPath);
328                                       
329                                       var remoteFiles = from info in remoteObjects
330                                                     select info.Name.ToLower();
331                                       
332                                       var onlyLocal = from localFile in pithosDir.EnumerateFiles()
333                                                       where !remoteFiles.Contains(localFile.Name.ToLower()) 
334                                                       select new ListenerAction(CloudActionType.UploadUnconditional, localFile,null);
335
336
337
338                                       var localNames = from info in pithosDir.EnumerateFiles()
339                                                            select info.Name.ToLower();
340
341                                       var onlyRemote = from upFile in remoteObjects
342                                                        where !localNames.Contains(upFile.Name.ToLower())
343                                                        select new ListenerAction(CloudActionType.DownloadUnconditional,null,upFile);
344
345
346                                       var commonObjects = from  upFile in remoteObjects
347                                                             join  localFile in pithosDir.EnumerateFiles()
348                                                                 on upFile.Name.ToLower() equals localFile.Name.ToLower() 
349                                                             select new ListenerAction(CloudActionType.Download, localFile, upFile);
350
351                                       var uniques =
352                                           onlyLocal.Union(onlyRemote).Union(commonObjects)
353                                               .Except(_networkActions,new LocalFileComparer());
354                                       
355                                       _networkActions.AddFromEnumerable(uniques, false);
356
357                                       Trace.TraceInformation("[LISTENER] End Processing");
358                                       
359                                   }
360                 ).ContinueWith(t=>
361                 {
362                     if (t.IsFaulted)
363                     {
364                         Trace.TraceError("[LISTENER] Exception: {0}",t.Exception);                                           
365                     }
366                     else
367                     {
368                         Trace.TraceInformation("[LISTENER] Finished");                                           
369                     }                    
370                     ProcessRemoteFiles(accountPath);
371                 });
372         }
373
374         private void ProcessListenerActions()
375         {
376             foreach(var action in _networkActions.GetConsumingEnumerable())
377             {
378                 Trace.TraceInformation("[ACTION] Start Processing {0}:{1}->{2}",action.Action,action.LocalFile,action.CloudFile.Name);
379                 var localFile = action.LocalFile;
380                 var cloudFile = action.CloudFile;
381                 var downloadPath = (cloudFile == null)? String.Empty
382                                         : Path.Combine(RootPath,cloudFile.Name);
383                 try
384                 {
385                     switch (action.Action)
386                     {
387                         case CloudActionType.UploadUnconditional:
388
389                             UploadCloudFile(localFile.Name,localFile.Length,localFile.FullName,action.LocalHash.Value);
390                             break;
391                         case CloudActionType.DownloadUnconditional:
392                             DownloadCloudFile(PithosContainer,cloudFile.Name,downloadPath);
393                             break;
394                         case CloudActionType.Download:
395                             if (File.Exists(downloadPath))
396                             {
397                                 if (cloudFile.Hash !=action.LocalHash.Value)
398                                 {
399                                     var lastLocalTime =localFile.LastWriteTime;
400                                     var lastUpTime =cloudFile.Last_Modified;
401                                     if (lastUpTime <=lastLocalTime)
402                                     {
403                                         //Files in conflict
404                                         StatusKeeper.SetFileOverlayStatus(downloadPath,FileOverlayStatus.Conflict);
405                                     }
406                                     else
407                                         DownloadCloudFile(PithosContainer,action.CloudFile.Name,downloadPath);
408                                 }
409                             }
410                             else
411                                 DownloadCloudFile(PithosContainer,action.CloudFile.Name,downloadPath);
412                             break;
413                     }
414                     Trace.TraceInformation("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name);
415                 }
416                 catch (Exception exc)
417                 {
418                     Trace.TraceError("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
419                                     action.Action, action.LocalFile,action.CloudFile,exc);                    
420
421                     _networkActions.Add(action);
422                 }
423             }
424         }
425
426       
427
428         private void StartMonitoringFiles(string path)
429         {
430             _watcher = new FileSystemWatcher(path);
431             _watcher.Changed += OnFileEvent;
432             _watcher.Created += OnFileEvent;
433             _watcher.Deleted += OnFileEvent;
434             _watcher.Renamed += OnRenameEvent;
435             _watcher.EnableRaisingEvents = true;
436
437             Task.Factory.StartNew(ProcesFileEvents,_cancellationSource.Token);
438         }
439
440         private void ProcesFileEvents()
441         {
442             foreach (var state in _fileEvents.GetConsumingEnumerable())
443             {
444                 try
445                 {
446                     var networkState=StatusKeeper.GetNetworkState(state.Path);
447                     //Skip if the file is already being downloaded or uploaded and 
448                     //the change is create or modify
449                     if (networkState != NetworkState.None && 
450                         (
451                             state.TriggeringChange==WatcherChangeTypes.Created ||
452                             state.TriggeringChange==WatcherChangeTypes.Changed
453                         ))
454                         continue;
455                     UpdateFileStatus(state);
456                     UpdateOverlayStatus(state);
457                     UpdateFileChecksum(state);
458                     _uploadEvents.Add(state);
459                 }
460                 catch (OperationCanceledException exc)
461                 {
462                     Trace.TraceError("[ERROR] File Event Processing:\r{0}", exc);
463                     throw;
464                 }
465                 catch (Exception exc)
466                 {
467                     Trace.TraceError("[ERROR] File Event Processing:\r{0}",exc);
468                 }
469             }
470         }
471
472         private void StartSending()
473         {
474             Task.Factory.StartNew(() =>
475                                       {
476                                           foreach (var state in _uploadEvents.GetConsumingEnumerable())
477                                           {
478                                               try
479                                               {
480                                                   SynchToCloud(state);
481                                               }
482                                               catch (OperationCanceledException)
483                                               {
484                                                   throw;
485                                               }
486                                               catch(Exception ex)
487                                               {
488                                                   Trace.TraceError("[ERROR] Synch for {0}:\r{1}",state.FileName,ex);
489                                               }
490                                           }
491                                           
492                                       },_cancellationSource.Token);
493         }
494
495
496         private WorkflowState SynchToCloud(WorkflowState state)
497         {
498             if (state.Skip)
499                 return state;
500             string path = state.Path.ToLower();            
501             string fileName = Path.GetFileName(path);
502
503             //Bypass deleted files, unless the status is Deleted
504             if (!(File.Exists(path) || state.Status == FileStatus.Deleted))
505             {
506                 state.Skip = true;
507                 this.StatusKeeper.RemoveFileOverlayStatus(path);
508                 return state;
509             }
510
511             switch(state.Status)
512             {
513                 case FileStatus.Created:
514                 case FileStatus.Modified:
515                     var info = new FileInfo(path);
516                     long fileSize = info.Length;
517                     UploadCloudFile(fileName, fileSize, path,state.Hash);
518                     break;
519                 case FileStatus.Deleted:
520                     DeleteCloudFile(fileName);
521                     break;
522                 case FileStatus.Renamed:
523                     RenameCloudFile(state);
524                     break;
525             }
526             return state;
527         }
528
529         private void RenameCloudFile(WorkflowState state)
530         {
531             this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);
532
533
534
535             CloudClient.MoveObject(PithosContainer, state.OldFileName,PithosContainer, state.FileName);
536
537             this.StatusKeeper.SetFileStatus(state.Path, FileStatus.Unchanged);
538             this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Normal);
539             Workflow.RaiseChangeNotification(state.Path);
540         }
541
542         private void DeleteCloudFile(string fileName)
543         {
544             Contract.Requires(!Path.IsPathRooted(fileName));
545
546             this.StatusKeeper.SetFileOverlayStatus(fileName, FileOverlayStatus.Modified);
547
548             CloudClient.MoveObject(PithosContainer,fileName,TrashContainer,fileName);
549             this.StatusKeeper.ClearFileStatus(fileName);
550             this.StatusKeeper.RemoveFileOverlayStatus(fileName);            
551         }
552
553         private void DownloadCloudFile(string container, string fileName, string localPath)
554         {
555             StatusKeeper.SetNetworkState(localPath,NetworkState.Downloading);
556             CloudClient.GetObject(container, fileName, localPath)
557             .ContinueWith(t=>
558                 CloudClient.GetObjectInfo(container,fileName))
559             .ContinueWith(t=>
560                 StatusKeeper.StoreInfo(fileName,t.Result))
561             .ContinueWith(t=>
562                 StatusKeeper.SetNetworkState(localPath,NetworkState.None))
563             .Wait();
564         }
565
566         private void UploadCloudFile(string fileName, long fileSize, string path,string hash)
567         {
568             Contract.Requires(!Path.IsPathRooted(fileName));
569
570             StatusKeeper.SetNetworkState(fileName,NetworkState.Uploading);
571             
572             //Even if GetObjectInfo times out, we can proceed with the upload            
573             var info = CloudClient.GetObjectInfo(PithosContainer, fileName);
574             Task.Factory.StartNew(() =>
575             {
576                 if (hash != info.Hash)
577                 {
578                     Task.Factory.StartNew(() => 
579                         this.StatusKeeper.SetFileOverlayStatus(path, FileOverlayStatus.Modified))
580                     .ContinueWith(t => 
581                         CloudClient.PutObject(PithosContainer, fileName, path, hash));
582                 }
583                 else
584                 {
585                     this.StatusKeeper.StoreInfo(path,info);
586                 }
587             }
588             )
589             .ContinueWith(t => 
590                 this.StatusKeeper.SetFileState(path, FileStatus.Unchanged, FileOverlayStatus.Normal))
591                 .ContinueWith(t=>
592                     this.StatusKeeper.SetNetworkState(path,NetworkState.None))
593             .Wait();
594             Workflow.RaiseChangeNotification(path);
595         }
596
597         private Dictionary<WatcherChangeTypes, FileStatus> _statusDict = new Dictionary<WatcherChangeTypes, FileStatus>
598         {
599             {WatcherChangeTypes.Created,FileStatus.Created},
600             {WatcherChangeTypes.Changed,FileStatus.Modified},
601             {WatcherChangeTypes.Deleted,FileStatus.Deleted},
602             {WatcherChangeTypes.Renamed,FileStatus.Renamed}
603         };
604
605         private WorkflowState UpdateFileStatus(WorkflowState  state)
606         {
607             string path = state.Path;
608             FileStatus status = _statusDict[state.TriggeringChange];
609             var oldStatus = Workflow.StatusKeeper.GetFileStatus(path);
610             if (status == oldStatus)
611             {
612                 state.Status = status;
613                 state.Skip = true;
614                 return state;
615             }
616             if (state.Status == FileStatus.Renamed)
617                 Workflow.ClearFileStatus(path);                
618
619             state.Status = Workflow.SetFileStatus(path, status);                
620             return state;
621         }
622
623         private WorkflowState UpdateOverlayStatus(WorkflowState state)
624         {            
625             if (state.Skip)
626                 return state;
627
628             switch (state.Status)
629             {
630                 case FileStatus.Created:
631                 case FileStatus.Modified:
632                     this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);
633                     break;
634                 case FileStatus.Deleted:
635                     this.StatusKeeper.RemoveFileOverlayStatus(state.Path);
636                     break;
637                 case FileStatus.Renamed:
638                     this.StatusKeeper.RemoveFileOverlayStatus(state.OldPath);
639                     this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);
640                     break;
641                 case FileStatus.Unchanged:
642                     this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Normal);
643                     break;
644             }
645
646             if (state.Status==FileStatus.Deleted)
647                 Workflow.RaiseChangeNotification(Path.GetDirectoryName(state.Path));
648             else
649                 Workflow.RaiseChangeNotification(state.Path);
650             return state;
651         }
652
653
654         private WorkflowState UpdateFileChecksum(WorkflowState state)
655         {
656             if (state.Skip)
657                 return state;
658
659             if (state.Status == FileStatus.Deleted)
660                 return state;
661
662             string path = state.Path;
663             string hash = Signature.CalculateHash(path);
664
665             StatusKeeper.UpdateFileChecksum(path, hash);
666
667             state.Hash = hash;
668             return state;
669         }
670
671        
672
673         private FileSystemEventArgs CalculateSignature(FileSystemEventArgs arg)
674         {
675             Debug.WriteLine(String.Format("{0} {1} {2}", arg.ChangeType, arg.Name, arg.FullPath), "INFO");
676             return arg;
677         }
678
679         void OnFileEvent(object sender, FileSystemEventArgs e)
680         {
681             _fileEvents.Add(new WorkflowState{Path=e.FullPath,FileName = e.Name,TriggeringChange=e.ChangeType});            
682         }
683
684         void OnRenameEvent(object sender, RenamedEventArgs e)
685         {
686             _fileEvents.Add(new WorkflowState { OldPath=e.OldFullPath,OldFileName=e.OldName,
687                 Path = e.FullPath, FileName = e.Name, TriggeringChange = e.ChangeType });
688         }
689
690         public void Stop()
691         {
692             if (_watcher != null)
693             {
694                 _watcher.Changed -= OnFileEvent;
695                 _watcher.Created -= OnFileEvent;
696                 _watcher.Deleted -= OnFileEvent;
697                 _watcher.Renamed -= OnRenameEvent;
698                 _watcher.Dispose();
699             }
700             _watcher = null;
701             _fileEvents.CompleteAdding();
702             if (timer != null)
703                 timer.Dispose();
704             timer = null;
705             StopStatusService();
706         }
707
708
709         ~PithosMonitor()
710         {
711             Dispose(false);
712         }
713
714         public void Dispose()
715         {
716             Dispose(true);
717             GC.SuppressFinalize(this);
718         }
719
720         protected virtual void Dispose(bool disposing)
721         {
722             if (disposing)
723             {
724                 Stop();
725             }
726         }
727
728
729     }
730 }