using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.ComponentModel.Composition;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.IO;
using System.Linq;
using System.Net.NetworkInformation;
using System.Security.Cryptography;
using System.ServiceModel;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.WindowsAPICodePack.Net;
using Pithos.Interfaces;

namespace Pithos.Core
{
    /// <summary>
    /// Watches the local Pithos folder with a <see cref="FileSystemWatcher"/>, polls the
    /// "PITHOS" cloud container on a timer, and pushes the resulting work items through
    /// blocking queues that are drained by background tasks.
    /// </summary>
    [Export(typeof(PithosMonitor))]
    public class PithosMonitor : IDisposable
    {
        [Import]
        public IPithosSettings Settings { get; set; }

        [Import]
        public IStatusKeeper StatusKeeper { get; set; }

        [Import]
        public IPithosWorkflow Workflow { get; set; }

        [Import]
        public ICloudClient CloudClient { get; set; }

        [Import]
        public ICloudClient CloudListeningClient { get; set; }

        private ServiceHost _statusService { get; set; }

        private FileSystemWatcher _watcher;

        /// <summary>
        /// Gets or sets whether file-system monitoring is paused.
        /// Reads as <c>true</c> when there is no watcher or the watcher is not raising events;
        /// setting it has no effect when there is no watcher.
        /// </summary>
        public bool Pause
        {
            get { return _watcher == null || !_watcher.EnableRaisingEvents; }
            set
            {
                if (_watcher != null)
                {
                    _watcher.EnableRaisingEvents = !value;
                }
            }
        }

        private CancellationTokenSource _cancellationSource;

        // Raw file-system events waiting to be classified and hashed.
        private readonly BlockingCollection<WorkflowState> _fileEvents = new BlockingCollection<WorkflowState>();

        // Classified events waiting to be synchronised to the cloud.
        private readonly BlockingCollection<WorkflowState> _uploadEvents = new BlockingCollection<WorkflowState>();

        /// <summary>
        /// Starts file monitoring, the status service and the network workers.
        /// Does nothing when a previous call created a cancellation source that has not been cancelled.
        /// </summary>
        public void Start()
        {
            if (_cancellationSource != null && !_cancellationSource.IsCancellationRequested)
            {
                return;
            }

            _cancellationSource = new CancellationTokenSource();

            string path = Settings.PithosPath;
            StartMonitoringFiles(path);

            StartStatusService();

            StartNetwork();
        }

        /// <summary>
        /// Hosts <c>StatusService</c> and exposes <c>IStatusService</c> on a named-pipe endpoint.
        /// </summary>
        private void StartStatusService()
        {
            Uri baseAddress = new Uri("http://localhost:30000/pithos/statuscache");
            string address = "net.pipe://localhost/pithos/statuscache";

            // Create a ServiceHost for the StatusService type and provide the base address.
            _statusService = new ServiceHost(typeof(StatusService), baseAddress);

            NetNamedPipeBinding binding = new NetNamedPipeBinding(NetNamedPipeSecurityMode.None);

            _statusService.AddServiceEndpoint(typeof(IStatusService), binding, address);
            _statusService.Open();
        }

        /// <summary>
        /// Shuts the status service host down: aborts it when faulted, closes it when not
        /// already closed, then drops the reference.
        /// </summary>
        private void StopStatusService()
        {
            if (_statusService == null)
            {
                return;
            }

            if (_statusService.State == CommunicationState.Faulted)
            {
                _statusService.Abort();
            }
            else if (_statusService.State != CommunicationState.Closed)
            {
                _statusService.Close();
            }

            _statusService = null;
        }

        /// <summary>
        /// Authenticates against the cloud and starts the listening and sending workers.
        /// When there is no internet connection, or anything in that sequence throws,
        /// the whole method is rescheduled to run again after 10 seconds.
        /// </summary>
        private void StartNetwork()
        {
            bool connected = NetworkListManager.IsConnectedToInternet;

            // If we are not connected retry later
            if (!connected)
            {
                Task.Factory.StartNewDelayed(10000, StartNetwork);
                return;
            }

            try
            {
                CloudClient.Authenticate(Settings.UserName, Settings.ApiKey);

                StartListening();
                StartSending();
            }
            catch (Exception exc)
            {
                // Failed to authenticate due to network or account error.
                // Log the reason instead of dropping it silently, then retry after a while.
                Debug.WriteLine(exc.ToString());
                Task.Factory.StartNewDelayed(10000, StartNetwork);
            }
        }

        /// <summary>Kinds of work item the cloud listener can enqueue.</summary>
        internal enum CloudActionType
        {
            Upload = 0,
            Download,
            UploadUnconditional,
            DownloadUnconditional,
            DeleteLocal,
            DeleteCloud
        }

        /// <summary>
        /// A unit of work produced by comparing the cloud listing with the local folder.
        /// </summary>
        internal class ListenerAction
        {
            public CloudActionType Action { get; set; }
            public FileInfo LocalFile { get; set; }
            public ObjectInfo CloudFile { get; set; }

            /// <summary>
            /// Hash of <see cref="LocalFile"/>, computed on first access.
            /// Accessing the value when <see cref="LocalFile"/> is null will throw.
            /// </summary>
            public Lazy<string> LocalHash { get; set; }

            public ListenerAction(CloudActionType action, FileInfo localFile, ObjectInfo cloudFile)
            {
                Action = action;
                LocalFile = localFile;
                CloudFile = cloudFile;
                LocalHash = new Lazy<string>(
                    () => CalculateHash(LocalFile.FullName),
                    LazyThreadSafetyMode.ExecutionAndPublication);
            }
        }

        /// <summary>
        /// Treats two actions as equal when they have the same action type, the same local
        /// full path (when both have a local file), the same cloud hash (when both have a
        /// cloud file) and agree on which of the two files are missing.
        /// </summary>
        internal class LocalFileComparer : EqualityComparer<ListenerAction>
        {
            public override bool Equals(ListenerAction x, ListenerAction y)
            {
                if (x.Action != y.Action)
                {
                    return false;
                }

                if (x.LocalFile != null && y.LocalFile != null &&
                    !x.LocalFile.FullName.Equals(y.LocalFile.FullName))
                {
                    return false;
                }

                if (x.CloudFile != null && y.CloudFile != null &&
                    !x.CloudFile.Hash.Equals(y.CloudFile.Hash))
                {
                    return false;
                }

                // Exactly one side missing a cloud or local file means they differ.
                if (((x.CloudFile == null) ^ (y.CloudFile == null)) ||
                    ((x.LocalFile == null) ^ (y.LocalFile == null)))
                {
                    return false;
                }

                return true;
            }

            public override int GetHashCode(ListenerAction obj)
            {
                var localHash = (obj.LocalFile == null) ? int.MaxValue : obj.LocalFile.FullName.GetHashCode();
                var cloudHash = (obj.CloudFile == null) ? int.MaxValue : obj.CloudFile.Hash.GetHashCode();
                var actionHash = obj.Action.GetHashCode();

                // The previous "hash1 ^ hash2 & hash3" parsed as hash1 ^ (hash2 & hash3), which
                // discarded the cloud hash whenever the action's numeric value was 0.
                // Combine all three components; wrap-around on overflow is intended.
                unchecked
                {
                    var hash = 17;
                    hash = hash * 31 + localHash;
                    hash = hash * 31 + cloudHash;
                    hash = hash * 31 + actionHash;
                    return hash;
                }
            }
        }

        // Work items produced by the cloud listener and drained by ProcessListenerActions.
        private BlockingCollection<ListenerAction> _listenerActions = new BlockingCollection<ListenerAction>();

        private Timer _timer;

        /// <summary>
        /// Starts the consumer that processes listener actions and a timer that, immediately
        /// and then every 10 seconds, lists the "PITHOS" container and enqueues any actions
        /// that are not already queued.
        /// </summary>
        private void StartListening()
        {
            Func<Task> listener = () =>
                Task.Factory.StartNew(() => CloudClient.ListObjects("PITHOS"))
                    .ContinueWith(task =>
                    {
                        // Observe a failed listing here; reading task.Result would rethrow and
                        // leave the continuation's own exception unobserved.
                        if (task.IsFaulted)
                        {
                            Debug.WriteLine(task.Exception.ToString());
                            return;
                        }

                        var objects = task.Result;
                        if (objects.Count == 0)
                        {
                            return;
                        }

                        var pithosDir = new DirectoryInfo(Settings.PithosPath);

                        // Snapshot the directory once instead of re-enumerating it for every
                        // remote object, and use sets for the name lookups.
                        var localFiles = pithosDir.EnumerateFiles().ToList();
                        var remoteNames = new HashSet<string>(objects.Select(info => info.Name));
                        var localNames = new HashSet<string>(localFiles.Select(info => info.Name));

                        var onlyLocal = from localFile in localFiles
                                        where !remoteNames.Contains(localFile.Name)
                                        select new ListenerAction(CloudActionType.UploadUnconditional, localFile, null);

                        var onlyRemote = from upFile in objects
                                         where !localNames.Contains(upFile.Name)
                                         select new ListenerAction(CloudActionType.DownloadUnconditional, null, upFile);

                        var existingObjects = from upFile in objects
                                              join localFile in localFiles on upFile.Name equals localFile.Name
                                              select new ListenerAction(CloudActionType.Download, localFile, upFile);

                        // Skip anything equivalent to an action that is still waiting in the queue.
                        var uniques = onlyLocal.Union(onlyRemote).Union(existingObjects)
                            .Except(_listenerActions, new LocalFileComparer());

                        _listenerActions.AddFromEnumerable(uniques, false);
                    });

            Task.Factory.StartNew(() => ProcessListenerActions());

            _timer = new Timer(o => listener(), null, TimeSpan.Zero, TimeSpan.FromSeconds(10));
        }

        /// <summary>
        /// Drains the listener queue. An action that throws is logged and put back in the queue.
        /// </summary>
        private void ProcessListenerActions()
        {
            foreach (var action in _listenerActions.GetConsumingEnumerable())
            {
                try
                {
                    ProcessListenerAction(action);
                }
                catch (Exception exc)
                {
                    Debug.WriteLine("Processing of {0}:{1}->{2} failed. Putting it back in the queue", action.Action, action.LocalFile, action.CloudFile);
                    Debug.WriteLine(exc.ToString());
                    _listenerActions.Add(action);
                }
            }
        }

        /// <summary>
        /// Executes one listener action. Action types other than the three handled below are ignored.
        /// </summary>
        private void ProcessListenerAction(ListenerAction action)
        {
            var localFile = action.LocalFile;
            var cloudFile = action.CloudFile;
            var downloadPath = (cloudFile == null)
                ? String.Empty
                : Path.Combine(Settings.PithosPath, cloudFile.Name);

            switch (action.Action)
            {
                case CloudActionType.UploadUnconditional:
                    UploadCloudFile(localFile.Name, localFile.Length, localFile.FullName, action.LocalHash.Value);
                    break;

                case CloudActionType.DownloadUnconditional:
                    DownloadCloudFile("PITHOS", cloudFile.Name, downloadPath);
                    break;

                case CloudActionType.Download:
                    if (!File.Exists(downloadPath))
                    {
                        DownloadCloudFile("PITHOS", action.CloudFile.Name, downloadPath);
                        break;
                    }

                    // Matching hashes: nothing to do.
                    if (cloudFile.Hash != action.LocalHash.Value)
                    {
                        var lastLocalTime = localFile.LastWriteTime;
                        var lastUpTime = cloudFile.Last_Modified;
                        if (lastUpTime <= lastLocalTime)
                        {
                            // Files in conflict
                            StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict);
                        }
                        else
                        {
                            DownloadCloudFile("PITHOS", action.CloudFile.Name, downloadPath);
                        }
                    }
                    break;
            }
        }

        /// <summary>
        /// Copies a cloud object to <paramref name="localPath"/>, replacing any existing file.
        /// </summary>
        /// <param name="container">Cloud container that holds the object.</param>
        /// <param name="fileName">Name of the object inside the container.</param>
        /// <param name="localPath">Destination path on disk.</param>
        private void DownloadCloudFile(string container, string fileName, string localPath)
        {
            using (var upstream = CloudClient.GetObject(container, fileName))
            // FileMode.Create truncates an existing file. File.OpenWrite does not, so a shorter
            // download used to leave the tail of the previous content in place.
            using (var fileStream = new FileStream(localPath, FileMode.Create, FileAccess.Write, FileShare.None))
            {
                upstream.CopyTo(fileStream);
            }
        }

        /// <summary>
        /// Creates the file-system watcher for <paramref name="path"/> and starts the task that
        /// classifies, marks and hashes each queued event before handing it to the upload queue.
        /// </summary>
        private void StartMonitoringFiles(string path)
        {
            _watcher = new FileSystemWatcher(path);
            _watcher.Changed += OnFileEvent;
            _watcher.Created += OnFileEvent;
            _watcher.Deleted += OnFileEvent;
            _watcher.Renamed += OnRenameEvent;
            _watcher.EnableRaisingEvents = true;

            Task.Factory.StartNew(() =>
            {
                foreach (var state in _fileEvents.GetConsumingEnumerable())
                {
                    try
                    {
                        UpdateFileStatus(state);
                        UpdateOverlayStatus(state);
                        UpdateFileChecksum(state);
                        _uploadEvents.Add(state);
                    }
                    catch (OperationCanceledException)
                    {
                        throw;
                    }
                    catch (Exception ex)
                    {
                        // Keep the loop alive for the next event, but leave a trace of the failure.
                        Debug.WriteLine(ex.ToString());
                    }
                }
            }, _cancellationSource.Token);
        }

        /// <summary>
        /// Starts the task that synchronises each queued upload event to the cloud.
        /// </summary>
        private void StartSending()
        {
            Task.Factory.StartNew(() =>
            {
                foreach (var state in _uploadEvents.GetConsumingEnumerable())
                {
                    try
                    {
                        SynchToCloud(state);
                    }
                    catch (OperationCanceledException)
                    {
                        throw;
                    }
                    catch (Exception ex)
                    {
                        // Keep the loop alive for the next event, but leave a trace of the failure.
                        Debug.WriteLine(ex.ToString());
                    }
                }
            }, _cancellationSource.Token);
        }

        /// <summary>
        /// Applies a local change to the cloud: upload for created/modified, delete for deleted,
        /// move for renamed. States flagged <c>Skip</c> are returned untouched.
        /// </summary>
        /// <returns>The same <paramref name="state"/> instance.</returns>
        private WorkflowState SynchToCloud(WorkflowState state)
        {
            if (state.Skip)
            {
                return state;
            }

            string path = state.Path;
            string fileName = Path.GetFileName(path);

            switch (state.Status)
            {
                case FileStatus.Created:
                case FileStatus.Modified:
                    var info = new FileInfo(path);
                    long fileSize = info.Length;
                    UploadCloudFile(fileName, fileSize, path, state.Hash);
                    break;

                case FileStatus.Deleted:
                    DeleteCloudFile(fileName);
                    break;

                case FileStatus.Renamed:
                    RenameCloudFile(state);
                    break;
            }

            return state;
        }

        /// <summary>
        /// Moves the cloud object from the old to the new file name and resets the local
        /// status and overlay of the new path.
        /// </summary>
        private void RenameCloudFile(WorkflowState state)
        {
            StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Synch);

            CloudClient.MoveObject("PITHOS", state.OldFileName, state.FileName);

            StatusKeeper.SetFileStatus(state.Path, FileStatus.Unchanged);
            StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Normal);
            Workflow.RaiseChangeNotification(state.Path);
        }

        /// <summary>
        /// Deletes the cloud object and clears the stored status and overlay for it.
        /// </summary>
        /// <param name="fileName">Object name; must not be a rooted path.</param>
        private void DeleteCloudFile(string fileName)
        {
            Contract.Requires(!Path.IsPathRooted(fileName));

            // NOTE(review): the status keeper is addressed by bare file name here, while the
            // upload and rename paths pass the full local path - confirm which key it expects.
            StatusKeeper.SetFileOverlayStatus(fileName, FileOverlayStatus.Synch);

            CloudClient.DeleteObject("PITHOS", fileName);

            StatusKeeper.ClearFileStatus(fileName);
            StatusKeeper.RemoveFileOverlayStatus(fileName);
        }

        /// <summary>
        /// Uploads the file when its hash differs from the cloud object's hash, then marks
        /// the local file as unchanged and raises a change notification.
        /// </summary>
        /// <param name="fileName">Object name; must not be a rooted path.</param>
        /// <param name="fileSize">Size of the local file (not used by the current implementation).</param>
        /// <param name="path">Full local path of the file.</param>
        /// <param name="hash">Hash of the local file content.</param>
        private void UploadCloudFile(string fileName, long fileSize, string path, string hash)
        {
            Contract.Requires(!Path.IsPathRooted(fileName));

            // Even if GetObjectInfo times out, we can proceed with the upload
            var info = CloudClient.GetObjectInfo("PITHOS", fileName);
            if (hash != info.Hash)
            {
                StatusKeeper.SetFileOverlayStatus(path, FileOverlayStatus.Synch);

                CloudClient.PutObject("PITHOS", fileName, path);
            }

            StatusKeeper.SetFileStatus(path, FileStatus.Unchanged);
            StatusKeeper.SetFileOverlayStatus(path, FileOverlayStatus.Normal);
            Workflow.RaiseChangeNotification(path);
        }

        // Maps a watcher change type to the file status recorded for it.
        private Dictionary<WatcherChangeTypes, FileStatus> _statusDict = new Dictionary<WatcherChangeTypes, FileStatus>
        {
            { WatcherChangeTypes.Created, FileStatus.Created },
            { WatcherChangeTypes.Changed, FileStatus.Modified },
            { WatcherChangeTypes.Deleted, FileStatus.Deleted },
            { WatcherChangeTypes.Renamed, FileStatus.Renamed }
        };

        /// <summary>
        /// Translates the triggering change into a file status and records it.
        /// When the stored status already equals the new one, the state is flagged <c>Skip</c>.
        /// </summary>
        /// <returns>The same <paramref name="state"/> instance.</returns>
        private WorkflowState UpdateFileStatus(WorkflowState state)
        {
            string path = state.Path;
            FileStatus status = _statusDict[state.TriggeringChange];
            var oldStatus = Workflow.StatusKeeper.GetFileStatus(path);
            if (status == oldStatus)
            {
                state.Status = status;
                state.Skip = true;
                return state;
            }

            // NOTE(review): state.Status has not been assigned by this method at this point and
            // the event handlers in this class do not set it; this probably meant to test
            // "status" (and possibly clear state.OldPath) - confirm before changing.
            if (state.Status == FileStatus.Renamed)
            {
                Workflow.ClearFileStatus(path);
            }

            state.Status = Workflow.SetFileStatus(path, status);
            return state;
        }

        /// <summary>
        /// Updates the overlay for the state's path according to its status and raises a
        /// change notification (on the containing directory for deletions).
        /// States flagged <c>Skip</c> are returned untouched.
        /// </summary>
        /// <returns>The same <paramref name="state"/> instance.</returns>
        private WorkflowState UpdateOverlayStatus(WorkflowState state)
        {
            if (state.Skip)
            {
                return state;
            }

            switch (state.Status)
            {
                case FileStatus.Created:
                case FileStatus.Modified:
                    StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);
                    break;

                case FileStatus.Deleted:
                    StatusKeeper.RemoveFileOverlayStatus(state.Path);
                    break;

                case FileStatus.Renamed:
                    StatusKeeper.RemoveFileOverlayStatus(state.OldPath);
                    StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);
                    break;

                case FileStatus.Unchanged:
                    StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Normal);
                    break;
            }

            if (state.Status == FileStatus.Deleted)
            {
                Workflow.RaiseChangeNotification(Path.GetDirectoryName(state.Path));
            }
            else
            {
                Workflow.RaiseChangeNotification(state.Path);
            }

            return state;
        }

        /// <summary>
        /// Hashes the file behind the state, stores the hash in the status keeper and on the
        /// state. Skipped and deleted states are returned untouched.
        /// </summary>
        /// <returns>The same <paramref name="state"/> instance.</returns>
        private WorkflowState UpdateFileChecksum(WorkflowState state)
        {
            if (state.Skip)
            {
                return state;
            }

            if (state.Status == FileStatus.Deleted)
            {
                return state;
            }

            string path = state.Path;
            string hash = CalculateHash(path);

            StatusKeeper.UpdateFileChecksum(path, hash);

            state.Hash = hash;
            return state;
        }

        /// <summary>
        /// Computes the MD5 hash of a file's content.
        /// </summary>
        /// <param name="path">Path of the file to hash.</param>
        /// <returns>The hash as a lowercase hexadecimal string.</returns>
        private static string CalculateHash(string path)
        {
            using (var hasher = MD5.Create())
            using (var stream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096, true))
            {
                // Hash the stream exactly once. A second ComputeHash on the same stream starts at
                // end-of-file and yields the hash of empty input, which is what used to be returned.
                var hashBytes = hasher.ComputeHash(stream);

                var hashBuilder = new StringBuilder(hashBytes.Length * 2);
                foreach (byte b in hashBytes)
                {
                    hashBuilder.Append(b.ToString("x2"));
                }

                return hashBuilder.ToString();
            }
        }

        /// <summary>
        /// Writes the event's change type, name and full path to the debug output and returns the event.
        /// </summary>
        private FileSystemEventArgs CalculateSignature(FileSystemEventArgs arg)
        {
            Debug.WriteLine(String.Format("{0} {1} {2}", arg.ChangeType, arg.Name, arg.FullPath), "INFO");
            return arg;
        }

        /// <summary>Queues a created/changed/deleted watcher event.</summary>
        void OnFileEvent(object sender, FileSystemEventArgs e)
        {
            _fileEvents.Add(new WorkflowState
            {
                Path = e.FullPath,
                FileName = e.Name,
                TriggeringChange = e.ChangeType
            });
        }

        /// <summary>Queues a rename watcher event together with the old path and name.</summary>
        void OnRenameEvent(object sender, RenamedEventArgs e)
        {
            _fileEvents.Add(new WorkflowState
            {
                OldPath = e.OldFullPath,
                OldFileName = e.OldName,
                Path = e.FullPath,
                FileName = e.Name,
                TriggeringChange = e.ChangeType
            });
        }

        /// <summary>
        /// Detaches and disposes the watcher, marks the file-event queue as complete,
        /// disposes the polling timer and stops the status service.
        /// </summary>
        public void Stop()
        {
            if (_watcher != null)
            {
                _watcher.Changed -= OnFileEvent;
                _watcher.Created -= OnFileEvent;
                _watcher.Deleted -= OnFileEvent;
                _watcher.Renamed -= OnRenameEvent;
                _watcher.Dispose();
            }
            _watcher = null;

            // NOTE(review): the cancellation source is not cancelled and the upload/listener
            // queues are not completed here, so their consumer tasks keep waiting - confirm
            // whether that is intended before adding a restart path.
            _fileEvents.CompleteAdding();

            if (_timer != null)
            {
                _timer.Dispose();
            }
            _timer = null;

            StopStatusService();
        }

        ~PithosMonitor()
        {
            Dispose(false);
        }

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Stops the monitor when called from <see cref="Dispose()"/>; does nothing when called
        /// from the finalizer.
        /// </summary>
        protected virtual void Dispose(bool disposing)
        {
            if (disposing)
            {
                Stop();
            }
        }
    }
}