using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.ComponentModel.Composition;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.IO;
using System.Linq;
using System.Net.NetworkInformation;
using System.Security.Cryptography;
using System.ServiceModel;
using System.ServiceModel.Description;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Castle.ActiveRecord.Queries;
using Microsoft.WindowsAPICodePack.Net;
using Pithos.Interfaces;

namespace Pithos.Core
{
    /// <summary>
    /// Watches a local folder (<see cref="RootPath"/>) with a <see cref="FileSystemWatcher"/>,
    /// queues the resulting changes for upload through <see cref="CloudClient"/>, periodically
    /// lists the remote "pithos" container to queue download/upload actions, and hosts a
    /// named-pipe status service.
    /// </summary>
    [Export(typeof(PithosMonitor))]
    public class PithosMonitor : IDisposable
    {
        private const string PithosContainer = "pithos";
        private const string TrashContainer = "trash";

        [Import]
        public IPithosSettings Settings { get; set; }

        [Import]
        public IStatusKeeper StatusKeeper { get; set; }

        [Import]
        public IPithosWorkflow Workflow { get; set; }

        [Import]
        public ICloudClient CloudClient { get; set; }

        [Import]
        public ICloudClient CloudListeningClient { get; set; }

        public string UserName { get; set; }
        public string ApiKey { get; set; }

        private ServiceHost _statusService { get; set; }

        private FileSystemWatcher _watcher;

        /// <summary>
        /// True when there is no watcher or the watcher is not raising events.
        /// Setting the value toggles the watcher (when one exists) and publishes
        /// <c>SyncPaused</c> or <c>InSynch</c> through <see cref="StatusKeeper"/>.
        /// </summary>
        public bool Pause
        {
            get
            {
                return _watcher == null || !_watcher.EnableRaisingEvents;
            }
            set
            {
                if (_watcher != null)
                    _watcher.EnableRaisingEvents = !value;

                if (value)
                {
                    StatusKeeper.SetPithosStatus(PithosStatus.SyncPaused);
                }
                else
                {
                    StatusKeeper.SetPithosStatus(PithosStatus.InSynch);
                }
            }
        }

        public string RootPath { get; set; }

        CancellationTokenSource _cancellationSource;

        // Raw file system events, consumed by ProcesFileEvents.
        readonly BlockingCollection<WorkflowState> _fileEvents = new BlockingCollection<WorkflowState>();
        // Events that passed status/overlay/checksum processing, consumed by StartSending.
        readonly BlockingCollection<WorkflowState> _uploadEvents = new BlockingCollection<WorkflowState>();

        /// <summary>
        /// Configures the cloud client, ensures the containers exist, indexes the local
        /// files and starts file monitoring, the status service and the network loops.
        /// Returns immediately when a previous call created a cancellation source that
        /// has not been cancelled.
        /// </summary>
        public void Start()
        {
            if (_cancellationSource != null)
            {
                if (!_cancellationSource.IsCancellationRequested)
                    return;
            }
            _cancellationSource = new CancellationTokenSource();

            var proxyUri = ProxyFromSettings();
            CloudClient.Proxy = proxyUri;
            CloudClient.UsePithos = this.UsePithos;
            EnsurePithosContainers();
            StatusKeeper.StartProcessing(_cancellationSource.Token);
            IndexLocalFiles(RootPath);
            StartMonitoringFiles(RootPath);

            StartStatusService();

            StartNetwork();
        }

        /// <summary>
        /// Authenticates and creates the "pithos" and "trash" containers when the
        /// client reports that they do not exist.
        /// </summary>
        private void EnsurePithosContainers()
        {
            CloudClient.UsePithos = this.UsePithos;
            CloudClient.AuthenticationUrl = this.AuthenticationUrl;
            CloudClient.Authenticate(UserName, ApiKey);

            var pithosContainers = new[] { PithosContainer, TrashContainer };
            foreach (var container in pithosContainers)
            {
                if (!CloudClient.ContainerExists(container))
                    CloudClient.CreateContainer(container);
            }
        }

        public string AuthenticationUrl { get; set; }

        /// <summary>
        /// Builds the proxy URI from the manual proxy settings, including credentials
        /// when proxy authentication is enabled. Returns null when no manual proxy is
        /// configured.
        /// </summary>
        private Uri ProxyFromSettings()
        {
            if (Settings.UseManualProxy)
            {
                var proxyUri = new UriBuilder
                {
                    Host = Settings.ProxyServer,
                    Port = Settings.ProxyPort
                };
                if (Settings.ProxyAuthentication)
                {
                    proxyUri.UserName = Settings.ProxyUsername;
                    proxyUri.Password = Settings.ProxyPassword;
                }
                return proxyUri.Uri;
            }
            return null;
        }

        /// <summary>
        /// Hands every file under <paramref name="path"/> (recursively) to the status
        /// keeper as unversioned and re-queues interrupted files. Any exception is
        /// traced and swallowed so that startup continues.
        /// </summary>
        private void IndexLocalFiles(string path)
        {
            Trace.TraceInformation("[START] Inxed Local");
            try
            {
                var files =
                    from filePath in Directory.EnumerateFiles(path, "*", SearchOption.AllDirectories).AsParallel()
                    select filePath;
                StatusKeeper.StoreUnversionedFiles(files);

                RestartInterruptedFiles();
            }
            catch (Exception exc)
            {
                Trace.TraceError("[ERROR] Index Local - {0}", exc);
            }
            finally
            {
                Trace.TraceInformation("[END] Inxed Local");
            }
        }

        /// <summary>
        /// Queues an upload event for every stored file state whose overlay status is
        /// Unversioned (queued as Created) or Modified (queued as Modified/Changed).
        /// </summary>
        private void RestartInterruptedFiles()
        {
            var interruptedStates = new[] { FileOverlayStatus.Unversioned, FileOverlayStatus.Modified };
            var filesQuery = from state in FileState.Queryable
                             where interruptedStates.Contains(state.OverlayStatus)
                             select new WorkflowState
                             {
                                 Path = state.FilePath.ToLower(),
                                 FileName = Path.GetFileName(state.FilePath).ToLower(),
                                 Status = state.OverlayStatus == FileOverlayStatus.Unversioned ?
                                                    FileStatus.Created :
                                                    FileStatus.Modified,
                                 TriggeringChange = state.OverlayStatus == FileOverlayStatus.Unversioned ?
                                                    WatcherChangeTypes.Created :
                                                    WatcherChangeTypes.Changed
                             };
            _uploadEvents.AddFromEnumerable(filesQuery, false);
        }

        /// <summary>
        /// Opens a WCF host for <c>StatusService</c> with named-pipe endpoints for the
        /// status cache and settings contracts, plus a metadata behavior.
        /// </summary>
        private void StartStatusService()
        {
            // Create a ServiceHost for the StatusService type and provide the base address.
            var baseAddress = new Uri("net.pipe://localhost/pithos");
            _statusService = new ServiceHost(typeof(StatusService), baseAddress);

            var binding = new NetNamedPipeBinding(NetNamedPipeSecurityMode.None);

            _statusService.AddServiceEndpoint(typeof(IStatusService), binding, "net.pipe://localhost/pithos/statuscache");
            _statusService.AddServiceEndpoint(typeof(ISettingsService), binding, "net.pipe://localhost/pithos/settings");

            // Add a mex endpoint
            var smb = new ServiceMetadataBehavior
            {
                HttpGetEnabled = true,
                HttpGetUrl = new Uri("http://localhost:30000/pithos/mex")
            };
            _statusService.Description.Behaviors.Add(smb);

            _statusService.Open();
        }

        /// <summary>
        /// Aborts the status service host when it is faulted, closes it when it is not
        /// already closed, and clears the reference. Does nothing when no host exists.
        /// </summary>
        private void StopStatusService()
        {
            if (_statusService == null)
                return;

            if (_statusService.State == CommunicationState.Faulted)
                _statusService.Abort();
            else if (_statusService.State != CommunicationState.Closed)
                _statusService.Close();
            _statusService = null;
        }

        /// <summary>
        /// Authenticates and starts the listener and sender loops. When there is no
        /// internet connection, or when anything in the startup sequence throws, the
        /// method reschedules itself after 10 seconds.
        /// </summary>
        private void StartNetwork()
        {
            bool connected = NetworkListManager.IsConnectedToInternet;
            //If we are not connected retry later
            if (!connected)
            {
                Task.Factory.StartNewDelayed(10000, StartNetwork);
                return;
            }

            try
            {
                CloudClient.UsePithos = this.UsePithos;
                CloudClient.AuthenticationUrl = this.AuthenticationUrl;
                CloudClient.Authenticate(UserName, ApiKey);

                StartListening(RootPath);
                StartSending();
            }
            catch (Exception exc)
            {
                //Failed to authenticate due to network or account error
                //Log the reason (previously swallowed silently) and retry after a while
                Trace.TraceError("[ERROR] Start Network - {0}", exc);
                Task.Factory.StartNewDelayed(10000, StartNetwork);
            }
        }

        public bool UsePithos { get; set; }

        internal enum CloudActionType
        {
            Upload = 0,
            Download,
            UploadUnconditional,
            DownloadUnconditional,
            DeleteLocal,
            DeleteCloud
        }

        /// <summary>
        /// A pending synchronization action pairing a local file with a cloud object.
        /// <see cref="LocalFile"/> is null for actions created for objects that exist
        /// only remotely. <see cref="LocalHash"/> is computed lazily from
        /// <c>LocalFile.FullName</c>, so it must only be read when a local file is set.
        /// </summary>
        internal class ListenerAction
        {
            public CloudActionType Action { get; set; }
            public FileInfo LocalFile { get; set; }
            public ObjectInfo CloudFile { get; set; }

            public Lazy<string> LocalHash { get; set; }

            public ListenerAction(CloudActionType action, FileInfo localFile, ObjectInfo cloudFile)
            {
                Action = action;
                LocalFile = localFile;
                CloudFile = cloudFile;
                LocalHash = new Lazy<string>(() => Signature.CalculateHash(LocalFile.FullName), LazyThreadSafetyMode.ExecutionAndPublication);
            }
        }

        /// <summary>
        /// Treats two actions as equal when they have the same action type, the same
        /// local file path (or both have none) and the same cloud object name and hash
        /// (or both have none). Null arguments and null names/hashes are tolerated.
        /// </summary>
        internal class LocalFileComparer : EqualityComparer<ListenerAction>
        {
            public override bool Equals(ListenerAction x, ListenerAction y)
            {
                if (ReferenceEquals(x, y))
                    return true;
                if (x == null || y == null)
                    return false;

                if (x.Action != y.Action)
                    return false;

                // One side has a file/object and the other does not
                if ((x.LocalFile == null) != (y.LocalFile == null))
                    return false;
                if ((x.CloudFile == null) != (y.CloudFile == null))
                    return false;

                if (x.LocalFile != null &&
                    !String.Equals(x.LocalFile.FullName, y.LocalFile.FullName, StringComparison.Ordinal))
                    return false;

                if (x.CloudFile != null)
                {
                    // The name is compared as well as the hash: two different remote
                    // objects with identical content share a hash and, when no local
                    // file is attached, would otherwise be indistinguishable.
                    if (!String.Equals(x.CloudFile.Name, y.CloudFile.Name, StringComparison.Ordinal))
                        return false;
                    if (!String.Equals(x.CloudFile.Hash, y.CloudFile.Hash, StringComparison.Ordinal))
                        return false;
                }
                return true;
            }

            public override int GetHashCode(ListenerAction obj)
            {
                if (obj == null)
                    return 0;

                unchecked
                {
                    // Combines exactly the fields Equals compares
                    var hash = 17;
                    hash = hash * 31 + obj.Action.GetHashCode();
                    hash = hash * 31 + (obj.LocalFile == null ? 0 : HashOf(obj.LocalFile.FullName));
                    if (obj.CloudFile != null)
                    {
                        hash = hash * 31 + HashOf(obj.CloudFile.Name);
                        hash = hash * 31 + HashOf(obj.CloudFile.Hash);
                    }
                    return hash;
                }
            }

            // Null-safe string hash
            private static int HashOf(string value)
            {
                return value == null ? 0 : value.GetHashCode();
            }
        }

        private readonly BlockingCollection<ListenerAction> _networkActions = new BlockingCollection<ListenerAction>();

        // Never assigned in this class; only disposed in Stop.
        private Timer timer;

        /// <summary>
        /// Schedules the first remote listing pass and starts the task that consumes
        /// the queued listener actions.
        /// </summary>
        private void StartListening(string accountPath)
        {
            ProcessRemoteFiles(accountPath);

            Task.Factory.StartNew(ProcessListenerActions);
        }

        /// <summary>
        /// After a 10 second delay, lists the remote container, compares it by file name
        /// (case-insensitively) with the files directly inside <paramref name="accountPath"/>
        /// and queues: an unconditional upload for local-only files, an unconditional
        /// download for remote-only objects and a conditional download for names present
        /// on both sides. Actions equal to ones already queued are skipped. The method
        /// reschedules itself when the pass ends, whether it succeeded or faulted.
        /// </summary>
        private Task ProcessRemoteFiles(string accountPath)
        {
            Trace.TraceInformation("[LISTENER] Scheduled");
            return Task.Factory.StartNewDelayed(10000)
                .ContinueWith(t => CloudClient.ListObjects(PithosContainer))
                .ContinueWith(task =>
                {
                    Trace.TraceInformation("[LISTENER] Start Processing");

                    var remoteObjects = task.Result;

                    var pithosDir = new DirectoryInfo(accountPath);

                    // Read the directory once so that all three comparisons below
                    // work on the same snapshot.
                    var localFiles = pithosDir.EnumerateFiles().ToList();

                    var remoteNames = new HashSet<string>(
                        from info in remoteObjects select info.Name,
                        StringComparer.OrdinalIgnoreCase);
                    var localNames = new HashSet<string>(
                        from info in localFiles select info.Name,
                        StringComparer.OrdinalIgnoreCase);

                    var onlyLocal = from localFile in localFiles
                                    where !remoteNames.Contains(localFile.Name)
                                    select new ListenerAction(CloudActionType.UploadUnconditional, localFile, ObjectInfo.Empty);

                    // There is no local file yet for these objects. A null LocalFile is
                    // used because new FileInfo("") throws ArgumentException.
                    var onlyRemote = from upFile in remoteObjects
                                     where !localNames.Contains(upFile.Name)
                                     select new ListenerAction(CloudActionType.DownloadUnconditional, null, upFile);

                    var commonObjects = remoteObjects.Join(
                        localFiles,
                        upFile => upFile.Name,
                        localFile => localFile.Name,
                        (upFile, localFile) => new ListenerAction(CloudActionType.Download, localFile, upFile),
                        StringComparer.OrdinalIgnoreCase);

                    var uniques = onlyLocal.Union(onlyRemote).Union(commonObjects)
                        .Except(_networkActions, new LocalFileComparer())
                        .ToList();

                    _networkActions.AddFromEnumerable(uniques, false);

                    Trace.TraceInformation("[LISTENER] End Processing");
                })
                .ContinueWith(t =>
                {
                    if (t.IsFaulted)
                    {
                        Trace.TraceError("[LISTENER] Exception: {0}", t.Exception);
                    }
                    else
                    {
                        Trace.TraceInformation("[LISTENER] Finished");
                    }
                    ProcessRemoteFiles(accountPath);
                });
        }

        /// <summary>
        /// Consumes queued listener actions. Only UploadUnconditional,
        /// DownloadUnconditional and Download are handled; other action types fall
        /// through the switch. An action whose processing throws is traced and added
        /// back to the queue.
        /// </summary>
        private void ProcessListenerActions()
        {
            foreach (var action in _networkActions.GetConsumingEnumerable())
            {
                var localFile = action.LocalFile;
                var cloudFile = action.CloudFile;
                // CloudFile may be null (see downloadPath below), so the name used for
                // tracing is resolved null-safely; an exception here would escape the
                // try block and end the consumer loop.
                var cloudName = (cloudFile == null) ? String.Empty : cloudFile.Name;

                Trace.TraceInformation("[ACTION] Start Processing {0}:{1}->{2}", action.Action, action.LocalFile, cloudName);

                var downloadPath = (cloudFile == null) ? String.Empty
                                        : Path.Combine(RootPath, cloudFile.Name);
                try
                {
                    switch (action.Action)
                    {
                        case CloudActionType.UploadUnconditional:
                            UploadCloudFile(localFile.Name, localFile.Length, localFile.FullName, action.LocalHash.Value);
                            break;
                        case CloudActionType.DownloadUnconditional:
                            DownloadCloudFile(PithosContainer, cloudFile.Name, downloadPath);
                            break;
                        case CloudActionType.Download:
                            if (File.Exists(downloadPath))
                            {
                                if (cloudFile.Hash != action.LocalHash.Value)
                                {
                                    var lastLocalTime = localFile.LastWriteTime;
                                    var lastUpTime = cloudFile.Last_Modified;
                                    if (lastUpTime <= lastLocalTime)
                                    {
                                        //Files in conflict
                                        StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict);
                                    }
                                    else
                                        DownloadCloudFile(PithosContainer, action.CloudFile.Name, downloadPath);
                                }
                            }
                            else
                                DownloadCloudFile(PithosContainer, action.CloudFile.Name, downloadPath);
                            break;
                    }
                    Trace.TraceInformation("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile, cloudName);
                }
                catch (Exception exc)
                {
                    Trace.TraceError("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
                                    action.Action, action.LocalFile, action.CloudFile, exc);

                    // NOTE(review): the action is retried immediately with no delay or
                    // retry limit - confirm this is acceptable for permanent failures.
                    _networkActions.Add(action);
                }
            }
        }

        /// <summary>
        /// Creates the watcher for <paramref name="path"/>, subscribes to its events,
        /// enables it and starts the task that processes the queued file events.
        /// </summary>
        private void StartMonitoringFiles(string path)
        {
            _watcher = new FileSystemWatcher(path);
            _watcher.Changed += OnFileEvent;
            _watcher.Created += OnFileEvent;
            _watcher.Deleted += OnFileEvent;
            _watcher.Renamed += OnRenameEvent;
            _watcher.EnableRaisingEvents = true;

            Task.Factory.StartNew(ProcesFileEvents, _cancellationSource.Token);
        }

        /// <summary>
        /// Consumes raw file events, updates file status, overlay status and checksum
        /// for each one and forwards it to the upload queue. Create/change events for a
        /// file whose network state is not None are skipped.
        /// </summary>
        private void ProcesFileEvents()
        {
            foreach (var state in _fileEvents.GetConsumingEnumerable())
            {
                try
                {
                    var networkState = StatusKeeper.GetNetworkState(state.Path);
                    //Skip if the file is already being downloaded or uploaded and
                    //the change is create or modify
                    if (networkState != NetworkState.None &&
                        (
                            state.TriggeringChange == WatcherChangeTypes.Created ||
                            state.TriggeringChange == WatcherChangeTypes.Changed
                        ))
                        continue;

                    UpdateFileStatus(state);
                    UpdateOverlayStatus(state);
                    UpdateFileChecksum(state);
                    _uploadEvents.Add(state);
                }
                catch (OperationCanceledException exc)
                {
                    Trace.TraceError("[ERROR] File Event Processing:\r{0}", exc);
                    throw;
                }
                catch (Exception exc)
                {
                    Trace.TraceError("[ERROR] File Event Processing:\r{0}", exc);
                }
            }
        }

        /// <summary>
        /// Starts the task that consumes the upload queue and synchronizes each entry
        /// to the cloud. Failures other than cancellation are traced and skipped.
        /// </summary>
        private void StartSending()
        {
            Task.Factory.StartNew(() =>
            {
                foreach (var state in _uploadEvents.GetConsumingEnumerable())
                {
                    try
                    {
                        SynchToCloud(state);
                    }
                    catch (OperationCanceledException)
                    {
                        throw;
                    }
                    catch (Exception ex)
                    {
                        Trace.TraceError("[ERROR] Synch for {0}:\r{1}", state.FileName, ex);
                    }
                }
            }, _cancellationSource.Token);
        }

        /// <summary>
        /// Applies the change described by <paramref name="state"/> to the cloud:
        /// upload for Created/Modified, move to trash for Deleted, move for Renamed.
        /// States flagged Skip are returned untouched; a file that no longer exists and
        /// is not marked Deleted is flagged Skip and its overlay status removed.
        /// </summary>
        private WorkflowState SynchToCloud(WorkflowState state)
        {
            if (state.Skip)
                return state;

            string path = state.Path.ToLower();
            string fileName = Path.GetFileName(path);

            //Bypass deleted files, unless the status is Deleted
            if (!(File.Exists(path) || state.Status == FileStatus.Deleted))
            {
                state.Skip = true;
                this.StatusKeeper.RemoveFileOverlayStatus(path);
                return state;
            }

            switch (state.Status)
            {
                case FileStatus.Created:
                case FileStatus.Modified:
                    var info = new FileInfo(path);
                    long fileSize = info.Length;
                    UploadCloudFile(fileName, fileSize, path, state.Hash);
                    break;
                case FileStatus.Deleted:
                    DeleteCloudFile(fileName);
                    break;
                case FileStatus.Renamed:
                    RenameCloudFile(state);
                    break;
            }
            return state;
        }

        /// <summary>
        /// Moves the cloud object from the old file name to the new one inside the
        /// "pithos" container and marks the local file Unchanged/Normal.
        /// </summary>
        private void RenameCloudFile(WorkflowState state)
        {
            this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);

            CloudClient.MoveObject(PithosContainer, state.OldFileName, PithosContainer, state.FileName);

            this.StatusKeeper.SetFileStatus(state.Path, FileStatus.Unchanged);
            this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Normal);
            Workflow.RaiseChangeNotification(state.Path);
        }

        /// <summary>
        /// Moves the cloud object to the "trash" container and clears the file's
        /// status and overlay entries. <paramref name="fileName"/> must not be rooted.
        /// </summary>
        private void DeleteCloudFile(string fileName)
        {
            Contract.Requires(!Path.IsPathRooted(fileName));

            this.StatusKeeper.SetFileOverlayStatus(fileName, FileOverlayStatus.Modified);

            CloudClient.MoveObject(PithosContainer, fileName, TrashContainer, fileName);

            this.StatusKeeper.ClearFileStatus(fileName);
            this.StatusKeeper.RemoveFileOverlayStatus(fileName);
        }

        /// <summary>
        /// Downloads <paramref name="fileName"/> from <paramref name="container"/> to
        /// <paramref name="localPath"/>, then stores the object's info and resets the
        /// network state. Returns without doing anything when the file's network state
        /// is not None. Blocks until the continuation chain has finished.
        /// </summary>
        private void DownloadCloudFile(string container, string fileName, string localPath)
        {
            // The state is tracked under the local path: that is the key used when it
            // is set and cleared below, so it must also be the key that is checked.
            var state = StatusKeeper.GetNetworkState(localPath);
            //Abort if the file is already being uploaded or downloaded
            if (state != NetworkState.None)
                return;

            StatusKeeper.SetNetworkState(localPath, NetworkState.Downloading);
            // NOTE(review): the continuations do not inspect the antecedent task, so a
            // failed download does not appear to surface to the caller - confirm intent.
            CloudClient.GetObject(container, fileName, localPath)
                .ContinueWith(t => CloudClient.GetObjectInfo(container, fileName))
                .ContinueWith(t => StatusKeeper.StoreInfo(fileName, t.Result))
                .ContinueWith(t => StatusKeeper.SetNetworkState(localPath, NetworkState.None))
                .Wait();
        }

        /// <summary>
        /// Uploads the file at <paramref name="path"/> as <paramref name="fileName"/>
        /// when <paramref name="hash"/> differs from the hash of the existing cloud
        /// object; otherwise only stores the cloud object's info. Afterwards the file is
        /// marked Unchanged/Normal, its network state is reset and a change notification
        /// is raised. Returns without doing anything when the file's network state is
        /// not None. <paramref name="fileName"/> must not be rooted.
        /// </summary>
        private void UploadCloudFile(string fileName, long fileSize, string path, string hash)
        {
            Contract.Requires(!Path.IsPathRooted(fileName));

            // The network state is keyed by the full path throughout. Previously it was
            // set under fileName but cleared under path, so the Uploading state was
            // never cleared and later uploads of the same file were skipped.
            var state = StatusKeeper.GetNetworkState(path);
            //Abort if the file is already being uploaded or downloaded
            if (state != NetworkState.None)
                return;

            StatusKeeper.SetNetworkState(path, NetworkState.Uploading);

            //Even if GetObjectInfo times out, we can proceed with the upload
            var info = CloudClient.GetObjectInfo(PithosContainer, fileName);
            Task.Factory.StartNew(() =>
            {
                if (hash != info.Hash)
                {
                    // Run in sequence inside this task so that the continuations below
                    // cannot reset the status before the upload has been issued.
                    this.StatusKeeper.SetFileOverlayStatus(path, FileOverlayStatus.Modified);
                    CloudClient.PutObject(PithosContainer, fileName, path, hash);
                }
                else
                {
                    this.StatusKeeper.StoreInfo(path, info);
                }
            })
            .ContinueWith(t =>
                this.StatusKeeper.SetFileState(path, FileStatus.Unchanged, FileOverlayStatus.Normal))
            .ContinueWith(t =>
                this.StatusKeeper.SetNetworkState(path, NetworkState.None))
            .Wait();
            Workflow.RaiseChangeNotification(path);
        }

        // Maps a watcher change type to the file status recorded for it.
        private readonly Dictionary<WatcherChangeTypes, FileStatus> _statusDict = new Dictionary<WatcherChangeTypes, FileStatus>
        {
            { WatcherChangeTypes.Created, FileStatus.Created },
            { WatcherChangeTypes.Changed, FileStatus.Modified },
            { WatcherChangeTypes.Deleted, FileStatus.Deleted },
            { WatcherChangeTypes.Renamed, FileStatus.Renamed }
        };

        /// <summary>
        /// Translates the triggering change into a file status. When it equals the
        /// status already recorded for the path the state is flagged Skip; otherwise the
        /// new status is recorded through the workflow.
        /// </summary>
        private WorkflowState UpdateFileStatus(WorkflowState state)
        {
            string path = state.Path;
            FileStatus status = _statusDict[state.TriggeringChange];
            var oldStatus = Workflow.StatusKeeper.GetFileStatus(path);
            if (status == oldStatus)
            {
                state.Status = status;
                state.Skip = true;
                return state;
            }
            // NOTE(review): state.Status has not been assigned from the triggering
            // change at this point; the check may have been meant to test `status` -
            // confirm before changing.
            if (state.Status == FileStatus.Renamed)
                Workflow.ClearFileStatus(path);

            state.Status = Workflow.SetFileStatus(path, status);
            return state;
        }

        /// <summary>
        /// Updates the overlay status that corresponds to the state's file status and
        /// raises a change notification (for the parent directory when the file was
        /// deleted). Does nothing for states flagged Skip.
        /// </summary>
        private WorkflowState UpdateOverlayStatus(WorkflowState state)
        {
            if (state.Skip)
                return state;

            switch (state.Status)
            {
                case FileStatus.Created:
                case FileStatus.Modified:
                    this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);
                    break;
                case FileStatus.Deleted:
                    this.StatusKeeper.RemoveFileOverlayStatus(state.Path);
                    break;
                case FileStatus.Renamed:
                    this.StatusKeeper.RemoveFileOverlayStatus(state.OldPath);
                    this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);
                    break;
                case FileStatus.Unchanged:
                    this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Normal);
                    break;
            }

            if (state.Status == FileStatus.Deleted)
                Workflow.RaiseChangeNotification(Path.GetDirectoryName(state.Path));
            else
                Workflow.RaiseChangeNotification(state.Path);
            return state;
        }

        /// <summary>
        /// Calculates the file's hash, records it in the status keeper and stores it on
        /// the state. Does nothing for states flagged Skip or Deleted.
        /// </summary>
        private WorkflowState UpdateFileChecksum(WorkflowState state)
        {
            if (state.Skip)
                return state;

            if (state.Status == FileStatus.Deleted)
                return state;

            string path = state.Path;
            string hash = Signature.CalculateHash(path);

            StatusKeeper.UpdateFileChecksum(path, hash);

            state.Hash = hash;
            return state;
        }

        // Writes the event details to the debug output and returns the argument unchanged.
        private FileSystemEventArgs CalculateSignature(FileSystemEventArgs arg)
        {
            Debug.WriteLine(String.Format("{0} {1} {2}", arg.ChangeType, arg.Name, arg.FullPath), "INFO");
            return arg;
        }

        // Queues a created/changed/deleted watcher event for processing.
        void OnFileEvent(object sender, FileSystemEventArgs e)
        {
            _fileEvents.Add(new WorkflowState { Path = e.FullPath, FileName = e.Name, TriggeringChange = e.ChangeType });
        }

        // Queues a rename watcher event, keeping both the old and the new path/name.
        void OnRenameEvent(object sender, RenamedEventArgs e)
        {
            _fileEvents.Add(new WorkflowState
            {
                OldPath = e.OldFullPath,
                OldFileName = e.OldName,
                Path = e.FullPath,
                FileName = e.Name,
                TriggeringChange = e.ChangeType
            });
        }

        /// <summary>
        /// Unsubscribes from and disposes the watcher, marks the file event queue as
        /// complete, disposes the timer and shuts down the status service.
        /// </summary>
        public void Stop()
        {
            // NOTE(review): the cancellation source is not cancelled here and the
            // upload/network queues are not completed, so Start returns early after
            // Stop and the sender/listener loops keep waiting - confirm intent.
            if (_watcher != null)
            {
                _watcher.Changed -= OnFileEvent;
                _watcher.Created -= OnFileEvent;
                _watcher.Deleted -= OnFileEvent;
                _watcher.Renamed -= OnRenameEvent;
                _watcher.Dispose();
            }
            _watcher = null;
            _fileEvents.CompleteAdding();
            if (timer != null)
                timer.Dispose();
            timer = null;
            StopStatusService();
        }

        ~PithosMonitor()
        {
            Dispose(false);
        }

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Stops the monitor when called from <see cref="Dispose()"/>; does nothing
        /// when called from the finalizer.
        /// </summary>
        protected virtual void Dispose(bool disposing)
        {
            if (disposing)
            {
                Stop();
            }
        }
    }
}