Added SQLite status storage
[pithos-ms-client] / trunk / Pithos.Core / PithosMonitor.cs
1 using System;
2 using System.Collections.Concurrent;
3 using System.Collections.Generic;
4 using System.ComponentModel.Composition;
5 using System.Diagnostics;
6 using System.Diagnostics.Contracts;
7 using System.IO;
8 using System.Linq;
9 using System.Net.NetworkInformation;
10 using System.Security.Cryptography;
11 using System.Text;
12 using System.Threading;
13 using System.Threading.Tasks;
14 using Microsoft.WindowsAPICodePack.Net;
15 using Pithos.Interfaces;
16 using System.ServiceModel;
17
18 namespace Pithos.Core
19 {
20     [Export(typeof(PithosMonitor))]
21     public class PithosMonitor:IDisposable
22     {
23         [Import]
24         public IPithosSettings Settings{get;set;}
25
26         [Import]
27         public IStatusKeeper StatusKeeper { get; set; }
28
29         [Import]
30         public IPithosWorkflow Workflow { get; set; }
31
32         [Import]
33         public ICloudClient CloudClient { get; set; }
34
35         [Import]
36         public ICloudClient CloudListeningClient { get; set; }
37
38         private ServiceHost _statusService { get; set; }
39
40         private FileSystemWatcher _watcher;
41
42         public bool Pause
43         {
44             get { return _watcher == null || !_watcher.EnableRaisingEvents; }
45             set
46             {
47                 if (_watcher!=null)
48                     _watcher.EnableRaisingEvents = !value;                
49             }
50         }
51
52
53         CancellationTokenSource _cancellationSource;
54
55         readonly BlockingCollection<WorkflowState> _fileEvents = new BlockingCollection<WorkflowState>();
56         readonly BlockingCollection<WorkflowState> _uploadEvents = new BlockingCollection<WorkflowState>();
57
58         
59
60         public void Start()
61         {
62
63             if (_cancellationSource != null)
64             {
65                 if (!_cancellationSource.IsCancellationRequested)
66                     return;
67             }
68             _cancellationSource = new CancellationTokenSource();
69
70             string path = Settings.PithosPath;
71             StartMonitoringFiles(path);
72
73             StartStatusService();
74
75             StartNetwork();
76         }
77
78         private void StartStatusService()
79         {
80             Uri baseAddress = new Uri("http://localhost:30000/pithos/statuscache");
81             string address = "net.pipe://localhost/pithos/statuscache";
82
83             // Create a ServiceHost for the CalculatorService type and provide the base address.
84             _statusService= new ServiceHost(typeof(StatusService), baseAddress);
85             
86             NetNamedPipeBinding binding = new NetNamedPipeBinding(NetNamedPipeSecurityMode.None);
87             _statusService.AddServiceEndpoint(typeof(IStatusService), binding, address);
88             _statusService.Open();
89         }
90
91         private void StopStatusService()
92         {
93             if (_statusService == null)
94                 return;
95
96             if (_statusService.State == CommunicationState.Faulted)
97                 _statusService.Abort();
98             else if (_statusService.State != CommunicationState.Closed)
99                 _statusService.Close();
100             _statusService = null;
101
102         }
103
104
105         private void StartNetwork()
106         {
107
108             bool connected = NetworkListManager.IsConnectedToInternet;
109             //If we are not connected retry later
110             if (!connected)
111             {
112                 Task.Factory.StartNewDelayed(10000, StartNetwork);
113                 return;
114             }
115
116             try
117             {
118                 CloudClient.Authenticate(Settings.UserName, Settings.ApiKey);
119
120                 StartListening();
121                 StartSending();
122             }
123             catch (Exception)
124             {
125                 //Faild to authenticate due to network or account error
126                 //Retry after a while
127                 Task.Factory.StartNewDelayed(10000, StartNetwork);
128             }
129         }
130
131         internal enum CloudActionType
132         {
133             Upload=0,
134             Download,
135             UploadUnconditional,
136             DownloadUnconditional,
137             DeleteLocal,
138             DeleteCloud
139         }
140
141         internal class ListenerAction
142         {
143             public CloudActionType Action { get; set; }
144             public FileInfo LocalFile { get; set; }
145             public ObjectInfo CloudFile { get; set; }
146
147             public Lazy<string> LocalHash { get; set; }
148
149             public ListenerAction(CloudActionType action, FileInfo localFile, ObjectInfo cloudFile)
150             {
151                 Action = action;
152                 LocalFile = localFile;
153                 CloudFile = cloudFile;
154                 LocalHash=new Lazy<string>(()=>CalculateHash(LocalFile.FullName),LazyThreadSafetyMode.ExecutionAndPublication);
155             }
156             
157         }
158
159         internal class LocalFileComparer:EqualityComparer<ListenerAction>
160         {
161             public override bool Equals(ListenerAction x, ListenerAction y)
162             {
163                 if (x.Action != y.Action)
164                     return false;
165                 if (x.LocalFile != null && y.LocalFile != null && !x.LocalFile.FullName.Equals(y.LocalFile.FullName))
166                     return false;
167                 if (x.CloudFile != null && y.CloudFile != null && !x.CloudFile.Hash.Equals(y.CloudFile.Hash))
168                     return false;
169                 if (x.CloudFile == null ^ y.CloudFile == null ||
170                     x.LocalFile == null ^ y.LocalFile == null)
171                     return false;
172                 return true;
173             }
174
175             public override int GetHashCode(ListenerAction obj)
176             {
177                 var hash1 = (obj.LocalFile == null) ? int.MaxValue : obj.LocalFile.FullName.GetHashCode();
178                 var hash2 = (obj.CloudFile == null) ? int.MaxValue : obj.CloudFile.Hash.GetHashCode();
179                 var hash3 = obj.Action.GetHashCode();
180                 return hash1 ^ hash2 & hash3;
181             }
182         }
183
184         private BlockingCollection<ListenerAction> _listenerActions=new BlockingCollection<ListenerAction>();
185
186         private Timer timer;
187
188         private void StartListening()
189         {
190             
191             Func<Task> listener = ()=>Task.Factory.StartNew(()=>CloudClient.ListObjects("PITHOS"))
192                 .ContinueWith(task =>
193                                   {
194                                       
195                                       var objects = task.Result;
196                                       if (objects.Count == 0)
197                                           return;
198
199                                       var pithosDir = new DirectoryInfo(Settings.PithosPath);
200                                       
201                                       var upFiles = from info in objects
202                                                     select info.Name;
203                                       
204                                       var onlyLocal = from localFile in pithosDir.EnumerateFiles()
205                                                       where !upFiles.Contains(localFile.Name) 
206                                                       select new ListenerAction(CloudActionType.UploadUnconditional, localFile,null);
207                                       
208                                       
209                                     
210
211                                       var localNames =pithosDir.EnumerateFiles().Select(info => info.Name);
212                                       var onlyRemote = from upFile in objects
213                                                        where !localNames.Contains(upFile.Name)
214                                                        select new ListenerAction(CloudActionType.DownloadUnconditional,null,upFile);
215
216
217                                       var existingObjects = from  upFile in objects
218                                                             join  localFile in pithosDir.EnumerateFiles()
219                                                             on upFile.Name equals localFile.Name 
220                                                        select new ListenerAction(CloudActionType.Download, localFile, upFile);
221
222                                       var uniques =
223                                           onlyLocal.Union(onlyRemote).Union(existingObjects)
224                                           .Except(_listenerActions,new LocalFileComparer());
225
226                                       _listenerActions.AddFromEnumerable(uniques, false);
227                                      
228                                  }
229                 );
230
231             Task.Factory.StartNew(() =>
232                                       {
233                                           foreach (var action in _listenerActions.GetConsumingEnumerable())
234                                           {
235                                               var localFile = action.LocalFile;
236                                               var cloudFile = action.CloudFile;
237                                               var downloadPath = (cloudFile==null)? String.Empty:Path.Combine(Settings.PithosPath,cloudFile.Name);
238                                               try
239                                               {
240                                                   switch (action.Action)
241                                                   {
242                                                       case CloudActionType.UploadUnconditional:
243
244                                                           UploadCloudFile(localFile.Name, localFile.Length,
245                                                                           localFile.FullName, action.LocalHash.Value);
246                                                           break;
247                                                       case CloudActionType.DownloadUnconditional:
248                                                           DownloadCloudFile("PITHOS", cloudFile.Name, downloadPath);
249                                                           break;
250                                                       case CloudActionType.Download:
251                                                           if (File.Exists(downloadPath))
252                                                           {
253                                                               if (cloudFile.Hash != action.LocalHash.Value)
254                                                               {
255                                                                   var lastLocalTime = localFile.LastWriteTime;
256                                                                   var lastUpTime = cloudFile.Last_Modified;
257                                                                   if (lastUpTime <= lastLocalTime)
258                                                                   {
259                                                                       //Files in conflict
260                                                                       StatusKeeper.SetFileOverlayStatus(downloadPath,
261                                                                                                         FileOverlayStatus
262                                                                                                             .Conflict);
263                                                                   }
264                                                                   else
265                                                                       DownloadCloudFile("PITHOS", action.CloudFile.Name,
266                                                                                         downloadPath);
267                                                               }
268                                                           }
269                                                           else
270                                                               DownloadCloudFile("PITHOS", action.CloudFile.Name,
271                                                                                 downloadPath);
272                                                           break;
273                                                   }
274                                               }
275                                               catch (Exception exc)
276                                               {
277                                                   Debug.WriteLine("Processing of {0}:{1}->{2} failed. Putting it back in the queue",action.Action,action.LocalFile,action.CloudFile);
278                                                   Debug.WriteLine(exc.ToString());
279                                                   _listenerActions.Add(action);
280                                               }
281                                           }
282                                       }
283                 );
284             
285             timer = new Timer(o => listener(), null, TimeSpan.Zero, TimeSpan.FromSeconds(10));
286             
287         }
288
289         private void DownloadCloudFile(string container, string fileName, string localPath)
290         {
291             using (var upstream = CloudClient.GetObject(container, fileName))
292             using (var fileStream = File.OpenWrite(localPath))
293             {
294                 upstream.CopyTo(fileStream);
295             }
296         }
297
298         private void StartMonitoringFiles(string path)
299         {
300             _watcher = new FileSystemWatcher(path);
301             _watcher.Changed += OnFileEvent;
302             _watcher.Created += OnFileEvent;
303             _watcher.Deleted += OnFileEvent;
304             _watcher.Renamed += OnRenameEvent;
305             _watcher.EnableRaisingEvents = true;
306
307             Task.Factory.StartNew(() =>
308                                       {
309                                           foreach (var state in _fileEvents.GetConsumingEnumerable())
310                                           {
311                                               try
312                                               {
313                                                   UpdateFileStatus(state);
314                                                   UpdateOverlayStatus(state);
315                                                   UpdateFileChecksum(state);
316                                                   _uploadEvents.Add(state);                                                  
317                                               }
318                                               catch (OperationCanceledException)
319                                               {
320                                                   throw;
321                                               }
322                                               catch(Exception ex)
323                                               {}
324                                           }
325                                           
326                                       },_cancellationSource.Token);
327         }
328
329         private void StartSending()
330         {
331             Task.Factory.StartNew(() =>
332                                       {
333                                           foreach (var state in _uploadEvents.GetConsumingEnumerable())
334                                           {
335                                               try
336                                               {
337                                                   SynchToCloud(state);
338                                               }
339                                               catch (OperationCanceledException)
340                                               {
341                                                   throw;
342                                               }
343                                               catch(Exception ex)
344                                               {}
345                                           }
346                                           
347                                       },_cancellationSource.Token);
348         }
349
350
351         private WorkflowState SynchToCloud(WorkflowState state)
352         {
353             if (state.Skip)
354                 return state;
355             string path = state.Path;
356             string fileName = Path.GetFileName(path);
357
358             switch(state.Status)
359             {
360                 case FileStatus.Created:
361                 case FileStatus.Modified:
362                     var info = new FileInfo(path);
363                     long fileSize = info.Length;
364                     UploadCloudFile(fileName, fileSize, path,state.Hash);
365                     break;
366                 case FileStatus.Deleted:
367                     DeleteCloudFile(fileName);
368                     break;
369                 case FileStatus.Renamed:
370                     RenameCloudFile(state);
371                     break;
372             }
373             return state;
374         }
375
376         private void RenameCloudFile(WorkflowState state)
377         {
378             this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Synch);
379
380
381
382             CloudClient.MoveObject("PITHOS", state.OldFileName, state.FileName);
383
384             this.StatusKeeper.SetFileStatus(state.Path, FileStatus.Unchanged);
385             this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Normal);
386             Workflow.RaiseChangeNotification(state.Path);
387         }
388
389         private void DeleteCloudFile(string fileName)
390         {
391             Contract.Requires(!Path.IsPathRooted(fileName));
392
393             this.StatusKeeper.SetFileOverlayStatus(fileName, FileOverlayStatus.Synch);
394             CloudClient.DeleteObject("PITHOS", fileName);
395             this.StatusKeeper.ClearFileStatus(fileName);
396             this.StatusKeeper.RemoveFileOverlayStatus(fileName);            
397         }
398
399         private void UploadCloudFile(string fileName, long fileSize, string path,string hash)
400         {
401             Contract.Requires(!Path.IsPathRooted(fileName));
402             //Even if GetObjectInfo times out, we can proceed with the upload
403             var info=CloudClient.GetObjectInfo("PITHOS", fileName);
404             if ( hash != info.Hash)
405             {
406                 this.StatusKeeper.SetFileOverlayStatus(path, FileOverlayStatus.Synch);
407                 
408                     CloudClient.PutObject("PITHOS", fileName, path);
409                 
410             }
411             this.StatusKeeper.SetFileStatus(path,FileStatus.Unchanged);
412             this.StatusKeeper.SetFileOverlayStatus(path,FileOverlayStatus.Normal);
413             Workflow.RaiseChangeNotification(path);
414         }
415
416         private Dictionary<WatcherChangeTypes, FileStatus> _statusDict = new Dictionary<WatcherChangeTypes, FileStatus>
417         {
418             {WatcherChangeTypes.Created,FileStatus.Created},
419             {WatcherChangeTypes.Changed,FileStatus.Modified},
420             {WatcherChangeTypes.Deleted,FileStatus.Deleted},
421             {WatcherChangeTypes.Renamed,FileStatus.Renamed}
422         };
423
424         private WorkflowState UpdateFileStatus(WorkflowState  state)
425         {
426             string path = state.Path;
427             FileStatus status = _statusDict[state.TriggeringChange];
428             var oldStatus = Workflow.StatusKeeper.GetFileStatus(path);
429             if (status == oldStatus)
430             {
431                 state.Status = status;
432                 state.Skip = true;
433                 return state;
434             }
435             if (state.Status == FileStatus.Renamed)
436                 Workflow.ClearFileStatus(path);                
437
438             state.Status = Workflow.SetFileStatus(path, status);                
439             return state;
440         }
441
442         private WorkflowState UpdateOverlayStatus(WorkflowState state)
443         {            
444             if (state.Skip)
445                 return state;
446
447             switch (state.Status)
448             {
449                 case FileStatus.Created:
450                 case FileStatus.Modified:
451                     this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);
452                     break;
453                 case FileStatus.Deleted:
454                     this.StatusKeeper.RemoveFileOverlayStatus(state.Path);
455                     break;
456                 case FileStatus.Renamed:
457                     this.StatusKeeper.RemoveFileOverlayStatus(state.OldPath);
458                     this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);
459                     break;
460                 case FileStatus.Unchanged:
461                     this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Normal);
462                     break;
463             }
464
465             if (state.Status==FileStatus.Deleted)
466                 Workflow.RaiseChangeNotification(Path.GetDirectoryName(state.Path));
467             else
468                 Workflow.RaiseChangeNotification(state.Path);
469             return state;
470         }
471
472
473         private WorkflowState UpdateFileChecksum(WorkflowState state)
474         {
475             if (state.Skip)
476                 return state;
477
478             if (state.Status == FileStatus.Deleted)
479                 return state;
480
481             string path = state.Path;
482             string hash = CalculateHash(path);
483
484             StatusKeeper.UpdateFileChecksum(path, hash);
485
486             state.Hash = hash;
487             return state;
488         }
489
490         private static string CalculateHash(string path)
491         {
492             string hash;
493             using (var hasher = MD5.Create())
494             using (var stream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096, true))
495             {
496                 var hashBytes = hasher.ComputeHash(stream);
497                 var hashBuilder = new StringBuilder();
498                 foreach (byte b in hasher.ComputeHash(stream))
499                     hashBuilder.Append(b.ToString("x2").ToLower());
500                 hash = hashBuilder.ToString();
501
502             }
503             return hash;
504         }
505
506         private FileSystemEventArgs CalculateSignature(FileSystemEventArgs arg)
507         {
508             Debug.WriteLine(String.Format("{0} {1} {2}", arg.ChangeType, arg.Name, arg.FullPath), "INFO");
509             return arg;
510         }
511
512         void OnFileEvent(object sender, FileSystemEventArgs e)
513         {
514             _fileEvents.Add(new WorkflowState{Path=e.FullPath,FileName = e.Name,TriggeringChange=e.ChangeType});            
515         }
516
517         void OnRenameEvent(object sender, RenamedEventArgs e)
518         {
519             _fileEvents.Add(new WorkflowState { OldPath=e.OldFullPath,OldFileName=e.OldName,
520                 Path = e.FullPath, FileName = e.Name, TriggeringChange = e.ChangeType });
521         }
522
523         public void Stop()
524         {
525             if (_watcher != null)
526             {
527                 _watcher.Changed -= OnFileEvent;
528                 _watcher.Created -= OnFileEvent;
529                 _watcher.Deleted -= OnFileEvent;
530                 _watcher.Renamed -= OnRenameEvent;
531                 _watcher.Dispose();
532             }
533             _watcher = null;
534             _fileEvents.CompleteAdding();
535             if (timer != null)
536                 timer.Dispose();
537             timer = null;
538             StopStatusService();
539         }
540
541
542         ~PithosMonitor()
543         {
544             Dispose(false);
545         }
546
547         public void Dispose()
548         {
549             Dispose(true);
550             GC.SuppressFinalize(this);
551         }
552
553         protected virtual void Dispose(bool disposing)
554         {
555             if (disposing)
556             {
557                 Stop();
558             }
559         }
560
561
562     }
563 }