2 using System.Collections.Concurrent;
3 using System.Collections.Generic;
4 using System.ComponentModel.Composition;
5 using System.Diagnostics;
6 using System.Diagnostics.Contracts;
9 using System.Net.NetworkInformation;
10 using System.Security.Cryptography;
11 using System.ServiceModel.Description;
13 using System.Threading;
14 using System.Threading.Tasks;
15 using Castle.ActiveRecord.Queries;
16 using Microsoft.WindowsAPICodePack.Net;
17 using Pithos.Interfaces;
18 using System.ServiceModel;
22 [Export(typeof(PithosMonitor))]
23 public class PithosMonitor:IDisposable
25 private const string PithosContainer = "pithos";
26 private const string TrashContainer = "trash";
29 public IPithosSettings Settings{get;set;}
32 public IStatusKeeper StatusKeeper { get; set; }
35 public IPithosWorkflow Workflow { get; set; }
38 public ICloudClient CloudClient { get; set; }
41 public ICloudClient CloudListeningClient { get; set; }
43 public string UserName { get; set; }
44 public string ApiKey { get; set; }
46 private ServiceHost _statusService { get; set; }
48 private FileSystemWatcher _watcher;
52 get { return _watcher == null || !_watcher.EnableRaisingEvents; }
56 _watcher.EnableRaisingEvents = !value;
59 StatusKeeper.SetPithosStatus(PithosStatus.SyncPaused);
63 StatusKeeper.SetPithosStatus(PithosStatus.InSynch);
68 public string RootPath { get; set; }
71 CancellationTokenSource _cancellationSource;
73 readonly BlockingCollection<WorkflowState> _fileEvents = new BlockingCollection<WorkflowState>();
74 readonly BlockingCollection<WorkflowState> _uploadEvents = new BlockingCollection<WorkflowState>();
81 if (_cancellationSource != null)
83 if (!_cancellationSource.IsCancellationRequested)
86 _cancellationSource = new CancellationTokenSource();
88 var proxyUri = ProxyFromSettings();
89 CloudClient.Proxy = proxyUri;
90 CloudClient.UsePithos = this.UsePithos;
91 EnsurePithosContainers();
92 StatusKeeper.StartProcessing(_cancellationSource.Token);
93 IndexLocalFiles(RootPath);
94 StartMonitoringFiles(RootPath);
101 private void EnsurePithosContainers()
103 CloudClient.UsePithos = this.UsePithos;
104 CloudClient.AuthenticationUrl = this.AuthenticationUrl;
105 CloudClient.Authenticate(UserName, ApiKey);
107 var pithosContainers = new[] {PithosContainer, TrashContainer};
108 foreach (var container in pithosContainers)
110 if (!CloudClient.ContainerExists(container))
111 CloudClient.CreateContainer(container);
115 public string AuthenticationUrl { get; set; }
117 private Uri ProxyFromSettings()
119 if (Settings.UseManualProxy)
121 var proxyUri = new UriBuilder
123 Host = Settings.ProxyServer,
124 Port = Settings.ProxyPort
126 if (Settings.ProxyAuthentication)
128 proxyUri.UserName = Settings.ProxyUsername;
129 proxyUri.Password = Settings.ProxyPassword;
136 private void IndexLocalFiles(string path)
138 Trace.TraceInformation("[START] Inxed Local");
142 from filePath in Directory.EnumerateFiles(path, "*", SearchOption.AllDirectories).AsParallel()
144 StatusKeeper.StoreUnversionedFiles(files);
146 RestartInterruptedFiles();
148 catch (Exception exc)
150 Trace.TraceError("[ERROR] Index Local - {0}", exc);
154 Trace.TraceInformation("[END] Inxed Local");
158 private void RestartInterruptedFiles()
160 var interruptedStates = new[] { FileOverlayStatus.Unversioned, FileOverlayStatus.Modified };
161 var filesQuery = from state in FileState.Queryable
162 where interruptedStates.Contains(state.OverlayStatus)
163 select new WorkflowState
165 Path = state.FilePath.ToLower(),
166 FileName = Path.GetFileName(state.FilePath).ToLower(),
167 Status=state.OverlayStatus==FileOverlayStatus.Unversioned?
170 TriggeringChange = state.OverlayStatus==FileOverlayStatus.Unversioned?
171 WatcherChangeTypes.Created:
172 WatcherChangeTypes.Changed
174 _uploadEvents.AddFromEnumerable(filesQuery,false);
178 private void StartStatusService()
180 // Create a ServiceHost for the CalculatorService type and provide the base address.
181 var baseAddress = new Uri("net.pipe://localhost/pithos");
182 _statusService = new ServiceHost(typeof(StatusService), baseAddress);
184 var binding = new NetNamedPipeBinding(NetNamedPipeSecurityMode.None);
186 _statusService.AddServiceEndpoint(typeof(IStatusService), binding, "net.pipe://localhost/pithos/statuscache");
187 _statusService.AddServiceEndpoint(typeof (ISettingsService), binding, "net.pipe://localhost/pithos/settings");
190 //// Add a mex endpoint
191 var smb = new ServiceMetadataBehavior
193 HttpGetEnabled = true,
194 HttpGetUrl = new Uri("http://localhost:30000/pithos/mex")
196 _statusService.Description.Behaviors.Add(smb);
199 _statusService.Open();
202 private void StopStatusService()
204 if (_statusService == null)
207 if (_statusService.State == CommunicationState.Faulted)
208 _statusService.Abort();
209 else if (_statusService.State != CommunicationState.Closed)
210 _statusService.Close();
211 _statusService = null;
216 private void StartNetwork()
219 bool connected = NetworkListManager.IsConnectedToInternet;
220 //If we are not connected retry later
223 Task.Factory.StartNewDelayed(10000, StartNetwork);
229 CloudClient.UsePithos = this.UsePithos;
230 CloudClient.AuthenticationUrl = this.AuthenticationUrl;
231 CloudClient.Authenticate(UserName, ApiKey);
233 StartListening(RootPath);
238 //Faild to authenticate due to network or account error
239 //Retry after a while
240 Task.Factory.StartNewDelayed(10000, StartNetwork);
244 public bool UsePithos { get; set; }
246 internal enum CloudActionType
251 DownloadUnconditional,
256 internal class ListenerAction
258 public CloudActionType Action { get; set; }
259 public FileInfo LocalFile { get; set; }
260 public ObjectInfo CloudFile { get; set; }
262 public Lazy<string> LocalHash { get; set; }
264 public ListenerAction(CloudActionType action, FileInfo localFile, ObjectInfo cloudFile)
267 LocalFile = localFile;
268 CloudFile = cloudFile;
269 LocalHash=new Lazy<string>(()=>Signature.CalculateHash(LocalFile.FullName),LazyThreadSafetyMode.ExecutionAndPublication);
274 internal class LocalFileComparer:EqualityComparer<ListenerAction>
276 public override bool Equals(ListenerAction x, ListenerAction y)
278 if (x.Action != y.Action)
280 if (x.LocalFile != null && y.LocalFile != null && !x.LocalFile.FullName.Equals(y.LocalFile.FullName))
282 if (x.CloudFile != null && y.CloudFile != null && !x.CloudFile.Hash.Equals(y.CloudFile.Hash))
284 if (x.CloudFile == null ^ y.CloudFile == null ||
285 x.LocalFile == null ^ y.LocalFile == null)
290 public override int GetHashCode(ListenerAction obj)
292 var hash1 = (obj.LocalFile == null) ? int.MaxValue : obj.LocalFile.FullName.GetHashCode();
293 var hash2 = (obj.CloudFile == null) ? int.MaxValue : obj.CloudFile.Hash.GetHashCode();
294 var hash3 = obj.Action.GetHashCode();
295 return hash1 ^ hash2 & hash3;
299 private BlockingCollection<ListenerAction> _networkActions=new BlockingCollection<ListenerAction>();
303 private void StartListening(string accountPath)
306 ProcessRemoteFiles(accountPath);
308 Task.Factory.StartNew(ProcessListenerActions);
312 private Task ProcessRemoteFiles(string accountPath)
314 Trace.TraceInformation("[LISTENER] Scheduled");
315 return Task.Factory.StartNewDelayed(10000)
316 .ContinueWith(t=>CloudClient.ListObjects(PithosContainer))
317 .ContinueWith(task =>
319 Trace.TraceInformation("[LISTENER] Start Processing");
321 var remoteObjects = task.Result;
323 if (remoteObjects.Count == 0)
327 var pithosDir = new DirectoryInfo(accountPath);
329 var remoteFiles = from info in remoteObjects
330 select info.Name.ToLower();
332 var onlyLocal = from localFile in pithosDir.EnumerateFiles()
333 where !remoteFiles.Contains(localFile.Name.ToLower())
334 select new ListenerAction(CloudActionType.UploadUnconditional, localFile,null);
338 var localNames = from info in pithosDir.EnumerateFiles()
339 select info.Name.ToLower();
341 var onlyRemote = from upFile in remoteObjects
342 where !localNames.Contains(upFile.Name.ToLower())
343 select new ListenerAction(CloudActionType.DownloadUnconditional,null,upFile);
346 var commonObjects = from upFile in remoteObjects
347 join localFile in pithosDir.EnumerateFiles()
348 on upFile.Name.ToLower() equals localFile.Name.ToLower()
349 select new ListenerAction(CloudActionType.Download, localFile, upFile);
352 onlyLocal.Union(onlyRemote).Union(commonObjects)
353 .Except(_networkActions,new LocalFileComparer());
355 _networkActions.AddFromEnumerable(uniques, false);
357 Trace.TraceInformation("[LISTENER] End Processing");
364 Trace.TraceError("[LISTENER] Exception: {0}",t.Exception);
368 Trace.TraceInformation("[LISTENER] Finished");
370 ProcessRemoteFiles(accountPath);
374 private void ProcessListenerActions()
376 foreach(var action in _networkActions.GetConsumingEnumerable())
378 Trace.TraceInformation("[ACTION] Start Processing {0}:{1}->{2}",action.Action,action.LocalFile,action.CloudFile.Name);
379 var localFile = action.LocalFile;
380 var cloudFile = action.CloudFile;
381 var downloadPath = (cloudFile == null)? String.Empty
382 : Path.Combine(RootPath,cloudFile.Name);
385 switch (action.Action)
387 case CloudActionType.UploadUnconditional:
389 UploadCloudFile(localFile.Name,localFile.Length,localFile.FullName,action.LocalHash.Value);
391 case CloudActionType.DownloadUnconditional:
392 DownloadCloudFile(PithosContainer,cloudFile.Name,downloadPath);
394 case CloudActionType.Download:
395 if (File.Exists(downloadPath))
397 if (cloudFile.Hash !=action.LocalHash.Value)
399 var lastLocalTime =localFile.LastWriteTime;
400 var lastUpTime =cloudFile.Last_Modified;
401 if (lastUpTime <=lastLocalTime)
404 StatusKeeper.SetFileOverlayStatus(downloadPath,FileOverlayStatus.Conflict);
407 DownloadCloudFile(PithosContainer,action.CloudFile.Name,downloadPath);
411 DownloadCloudFile(PithosContainer,action.CloudFile.Name,downloadPath);
414 Trace.TraceInformation("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name);
416 catch (Exception exc)
418 Trace.TraceError("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
419 action.Action, action.LocalFile,action.CloudFile,exc);
421 _networkActions.Add(action);
428 private void StartMonitoringFiles(string path)
430 _watcher = new FileSystemWatcher(path);
431 _watcher.Changed += OnFileEvent;
432 _watcher.Created += OnFileEvent;
433 _watcher.Deleted += OnFileEvent;
434 _watcher.Renamed += OnRenameEvent;
435 _watcher.EnableRaisingEvents = true;
437 Task.Factory.StartNew(ProcesFileEvents,_cancellationSource.Token);
440 private void ProcesFileEvents()
442 foreach (var state in _fileEvents.GetConsumingEnumerable())
446 var networkState=StatusKeeper.GetNetworkState(state.Path);
447 //Skip if the file is already being downloaded or uploaded and
448 //the change is create or modify
449 if (networkState != NetworkState.None &&
451 state.TriggeringChange==WatcherChangeTypes.Created ||
452 state.TriggeringChange==WatcherChangeTypes.Changed
455 UpdateFileStatus(state);
456 UpdateOverlayStatus(state);
457 UpdateFileChecksum(state);
458 _uploadEvents.Add(state);
460 catch (OperationCanceledException exc)
462 Trace.TraceError("[ERROR] File Event Processing:\r{0}", exc);
465 catch (Exception exc)
467 Trace.TraceError("[ERROR] File Event Processing:\r{0}",exc);
472 private void StartSending()
474 Task.Factory.StartNew(() =>
476 foreach (var state in _uploadEvents.GetConsumingEnumerable())
482 catch (OperationCanceledException)
488 Trace.TraceError("[ERROR] Synch for {0}:\r{1}",state.FileName,ex);
492 },_cancellationSource.Token);
496 private WorkflowState SynchToCloud(WorkflowState state)
500 string path = state.Path.ToLower();
501 string fileName = Path.GetFileName(path);
503 //Bypass deleted files, unless the status is Deleted
504 if (!(File.Exists(path) || state.Status == FileStatus.Deleted))
507 this.StatusKeeper.RemoveFileOverlayStatus(path);
513 case FileStatus.Created:
514 case FileStatus.Modified:
515 var info = new FileInfo(path);
516 long fileSize = info.Length;
517 UploadCloudFile(fileName, fileSize, path,state.Hash);
519 case FileStatus.Deleted:
520 DeleteCloudFile(fileName);
522 case FileStatus.Renamed:
523 RenameCloudFile(state);
529 private void RenameCloudFile(WorkflowState state)
531 this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);
535 CloudClient.MoveObject(PithosContainer, state.OldFileName,PithosContainer, state.FileName);
537 this.StatusKeeper.SetFileStatus(state.Path, FileStatus.Unchanged);
538 this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Normal);
539 Workflow.RaiseChangeNotification(state.Path);
542 private void DeleteCloudFile(string fileName)
544 Contract.Requires(!Path.IsPathRooted(fileName));
546 this.StatusKeeper.SetFileOverlayStatus(fileName, FileOverlayStatus.Modified);
548 CloudClient.MoveObject(PithosContainer,fileName,TrashContainer,fileName);
549 this.StatusKeeper.ClearFileStatus(fileName);
550 this.StatusKeeper.RemoveFileOverlayStatus(fileName);
553 private void DownloadCloudFile(string container, string fileName, string localPath)
555 StatusKeeper.SetNetworkState(localPath,NetworkState.Downloading);
556 CloudClient.GetObject(container, fileName, localPath)
558 CloudClient.GetObjectInfo(container,fileName))
560 StatusKeeper.StoreInfo(fileName,t.Result))
562 StatusKeeper.SetNetworkState(localPath,NetworkState.None))
566 private void UploadCloudFile(string fileName, long fileSize, string path,string hash)
568 Contract.Requires(!Path.IsPathRooted(fileName));
570 StatusKeeper.SetNetworkState(fileName,NetworkState.Uploading);
572 //Even if GetObjectInfo times out, we can proceed with the upload
573 var info = CloudClient.GetObjectInfo(PithosContainer, fileName);
574 Task.Factory.StartNew(() =>
576 if (hash != info.Hash)
578 Task.Factory.StartNew(() =>
579 this.StatusKeeper.SetFileOverlayStatus(path, FileOverlayStatus.Modified))
581 CloudClient.PutObject(PithosContainer, fileName, path, hash));
585 this.StatusKeeper.StoreInfo(path,info);
590 this.StatusKeeper.SetFileState(path, FileStatus.Unchanged, FileOverlayStatus.Normal))
592 this.StatusKeeper.SetNetworkState(path,NetworkState.None))
594 Workflow.RaiseChangeNotification(path);
597 private Dictionary<WatcherChangeTypes, FileStatus> _statusDict = new Dictionary<WatcherChangeTypes, FileStatus>
599 {WatcherChangeTypes.Created,FileStatus.Created},
600 {WatcherChangeTypes.Changed,FileStatus.Modified},
601 {WatcherChangeTypes.Deleted,FileStatus.Deleted},
602 {WatcherChangeTypes.Renamed,FileStatus.Renamed}
605 private WorkflowState UpdateFileStatus(WorkflowState state)
607 string path = state.Path;
608 FileStatus status = _statusDict[state.TriggeringChange];
609 var oldStatus = Workflow.StatusKeeper.GetFileStatus(path);
610 if (status == oldStatus)
612 state.Status = status;
616 if (state.Status == FileStatus.Renamed)
617 Workflow.ClearFileStatus(path);
619 state.Status = Workflow.SetFileStatus(path, status);
623 private WorkflowState UpdateOverlayStatus(WorkflowState state)
628 switch (state.Status)
630 case FileStatus.Created:
631 case FileStatus.Modified:
632 this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);
634 case FileStatus.Deleted:
635 this.StatusKeeper.RemoveFileOverlayStatus(state.Path);
637 case FileStatus.Renamed:
638 this.StatusKeeper.RemoveFileOverlayStatus(state.OldPath);
639 this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);
641 case FileStatus.Unchanged:
642 this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Normal);
646 if (state.Status==FileStatus.Deleted)
647 Workflow.RaiseChangeNotification(Path.GetDirectoryName(state.Path));
649 Workflow.RaiseChangeNotification(state.Path);
654 private WorkflowState UpdateFileChecksum(WorkflowState state)
659 if (state.Status == FileStatus.Deleted)
662 string path = state.Path;
663 string hash = Signature.CalculateHash(path);
665 StatusKeeper.UpdateFileChecksum(path, hash);
673 private FileSystemEventArgs CalculateSignature(FileSystemEventArgs arg)
675 Debug.WriteLine(String.Format("{0} {1} {2}", arg.ChangeType, arg.Name, arg.FullPath), "INFO");
679 void OnFileEvent(object sender, FileSystemEventArgs e)
681 _fileEvents.Add(new WorkflowState{Path=e.FullPath,FileName = e.Name,TriggeringChange=e.ChangeType});
684 void OnRenameEvent(object sender, RenamedEventArgs e)
686 _fileEvents.Add(new WorkflowState { OldPath=e.OldFullPath,OldFileName=e.OldName,
687 Path = e.FullPath, FileName = e.Name, TriggeringChange = e.ChangeType });
692 if (_watcher != null)
694 _watcher.Changed -= OnFileEvent;
695 _watcher.Created -= OnFileEvent;
696 _watcher.Deleted -= OnFileEvent;
697 _watcher.Renamed -= OnRenameEvent;
701 _fileEvents.CompleteAdding();
714 public void Dispose()
717 GC.SuppressFinalize(this);
720 protected virtual void Dispose(bool disposing)