X-Git-Url: https://code.grnet.gr/git/pithos-ms-client/blobdiff_plain/2967267201ba09a2d4029d9c5ddb125642ae98d2..7d915c34d5a6ec34fe8b5b821f89630eae739a32:/trunk/Pithos.Core/PithosMonitor.cs?ds=inline diff --git a/trunk/Pithos.Core/PithosMonitor.cs b/trunk/Pithos.Core/PithosMonitor.cs index 77674a0..831c082 100644 --- a/trunk/Pithos.Core/PithosMonitor.cs +++ b/trunk/Pithos.Core/PithosMonitor.cs @@ -8,16 +8,22 @@ using System.IO; using System.Linq; using System.Net.NetworkInformation; using System.Security.Cryptography; +using System.ServiceModel.Description; using System.Text; using System.Threading; using System.Threading.Tasks; +using Castle.ActiveRecord.Queries; +using Microsoft.WindowsAPICodePack.Net; using Pithos.Interfaces; +using System.ServiceModel; namespace Pithos.Core { [Export(typeof(PithosMonitor))] public class PithosMonitor:IDisposable { + private const string PithosContainer = "pithos"; + private const string TrashContainer = "trash"; [Import] public IPithosSettings Settings{get;set;} @@ -34,6 +40,11 @@ namespace Pithos.Core [Import] public ICloudClient CloudListeningClient { get; set; } + public string UserName { get; set; } + public string ApiKey { get; set; } + + private ServiceHost _statusService { get; set; } + private FileSystemWatcher _watcher; public bool Pause @@ -42,10 +53,20 @@ namespace Pithos.Core set { if (_watcher!=null) - _watcher.EnableRaisingEvents = !value; + _watcher.EnableRaisingEvents = !value; + if (value) + { + StatusKeeper.SetPithosStatus(PithosStatus.SyncPaused); + } + else + { + StatusKeeper.SetPithosStatus(PithosStatus.InSynch); + } } } + public string RootPath { get; set; } + CancellationTokenSource _cancellationSource; @@ -64,15 +85,138 @@ namespace Pithos.Core } _cancellationSource = new CancellationTokenSource(); - string path = Settings.PithosPath; - StartMonitoringFiles(path); + var proxyUri = ProxyFromSettings(); + CloudClient.Proxy = proxyUri; + CloudClient.UsePithos = this.UsePithos; + EnsurePithosContainers(); + StatusKeeper.StartProcessing(_cancellationSource.Token); + IndexLocalFiles(RootPath); + StartMonitoringFiles(RootPath); + + StartStatusService(); StartNetwork(); } + private void EnsurePithosContainers() + { + CloudClient.UsePithos = this.UsePithos; + CloudClient.AuthenticationUrl = this.AuthenticationUrl; + CloudClient.Authenticate(UserName, ApiKey); + + var pithosContainers = new[] {PithosContainer, TrashContainer}; + foreach (var container in pithosContainers) + { + if (!CloudClient.ContainerExists(container)) + CloudClient.CreateContainer(container); + } + } + + public string AuthenticationUrl { get; set; } + + private Uri ProxyFromSettings() + { + if (Settings.UseManualProxy) + { + var proxyUri = new UriBuilder + { + Host = Settings.ProxyServer, + Port = Settings.ProxyPort + }; + if (Settings.ProxyAuthentication) + { + proxyUri.UserName = Settings.ProxyUsername; + proxyUri.Password = Settings.ProxyPassword; + } + return proxyUri.Uri; + } + return null; + } + + private void IndexLocalFiles(string path) + { + Trace.TraceInformation("[START] Inxed Local"); + try + { + var files = + from filePath in Directory.EnumerateFiles(path, "*", SearchOption.AllDirectories).AsParallel() + select filePath; + StatusKeeper.StoreUnversionedFiles(files); + + RestartInterruptedFiles(); + } + catch (Exception exc) + { + Trace.TraceError("[ERROR] Index Local - {0}", exc); + } + finally + { + Trace.TraceInformation("[END] Inxed Local"); + } + } + + private void RestartInterruptedFiles() + { + var interruptedStates = new[] { FileOverlayStatus.Unversioned, FileOverlayStatus.Modified }; + var filesQuery = from state in FileState.Queryable + where interruptedStates.Contains(state.OverlayStatus) + select new WorkflowState + { + Path = state.FilePath.ToLower(), + FileName = Path.GetFileName(state.FilePath).ToLower(), + Status=state.OverlayStatus==FileOverlayStatus.Unversioned? + FileStatus.Created: + FileStatus.Modified, + TriggeringChange = state.OverlayStatus==FileOverlayStatus.Unversioned? + WatcherChangeTypes.Created: + WatcherChangeTypes.Changed + }; + _uploadEvents.AddFromEnumerable(filesQuery,false); + + } + + private void StartStatusService() + { + // Create a ServiceHost for the CalculatorService type and provide the base address. + var baseAddress = new Uri("net.pipe://localhost/pithos"); + _statusService = new ServiceHost(typeof(StatusService), baseAddress); + + var binding = new NetNamedPipeBinding(NetNamedPipeSecurityMode.None); + + _statusService.AddServiceEndpoint(typeof(IStatusService), binding, "net.pipe://localhost/pithos/statuscache"); + _statusService.AddServiceEndpoint(typeof (ISettingsService), binding, "net.pipe://localhost/pithos/settings"); + + + //// Add a mex endpoint + var smb = new ServiceMetadataBehavior + { + HttpGetEnabled = true, + HttpGetUrl = new Uri("http://localhost:30000/pithos/mex") + }; + _statusService.Description.Behaviors.Add(smb); + + + _statusService.Open(); + } + + private void StopStatusService() + { + if (_statusService == null) + return; + + if (_statusService.State == CommunicationState.Faulted) + _statusService.Abort(); + else if (_statusService.State != CommunicationState.Closed) + _statusService.Close(); + _statusService = null; + + } + + private void StartNetwork() { - bool connected = NetworkInterface.GetIsNetworkAvailable(); + + bool connected = NetworkListManager.IsConnectedToInternet; //If we are not connected retry later if (!connected) { @@ -82,9 +226,11 @@ namespace Pithos.Core try { - CloudClient.Authenticate(Settings.UserName, Settings.ApiKey); + CloudClient.UsePithos = this.UsePithos; + CloudClient.AuthenticationUrl = this.AuthenticationUrl; + CloudClient.Authenticate(UserName, ApiKey); - StartListening(); + StartListening(RootPath); StartSending(); } catch (Exception) @@ -95,6 +241,8 @@ namespace Pithos.Core } } + public bool UsePithos { get; set; } + internal enum CloudActionType { Upload=0, @@ -118,7 +266,7 @@ namespace Pithos.Core Action = action; LocalFile = localFile; CloudFile = cloudFile; - LocalHash=new Lazy(()=>CalculateHash(LocalFile.FullName),LazyThreadSafetyMode.ExecutionAndPublication); + LocalHash=new Lazy(()=>Signature.CalculateHash(LocalFile.FullName),LazyThreadSafetyMode.ExecutionAndPublication); } } @@ -148,107 +296,134 @@ namespace Pithos.Core } } - private BlockingCollection _listenerActions=new BlockingCollection(); + private BlockingCollection _networkActions=new BlockingCollection(); private Timer timer; - private void StartListening() + private void StartListening(string accountPath) { - Func listener = ()=>Task.Factory.StartNew(()=>CloudClient.ListObjects("PITHOS")) + ProcessRemoteFiles(accountPath); + + Task.Factory.StartNew(ProcessListenerActions); + + } + + private Task ProcessRemoteFiles(string accountPath) + { + Trace.TraceInformation("[LISTENER] Scheduled"); + return Task.Factory.StartNewDelayed(10000) + .ContinueWith(t=>CloudClient.ListObjects(PithosContainer)) .ContinueWith(task => { - - var objects = task.Result; - if (objects.Count == 0) + Trace.TraceInformation("[LISTENER] Start Processing"); + + var remoteObjects = task.Result; +/* + if (remoteObjects.Count == 0) return; +*/ - var pithosDir = new DirectoryInfo(Settings.PithosPath); + var pithosDir = new DirectoryInfo(accountPath); - var upFiles = from info in objects - select info.Name; + var remoteFiles = from info in remoteObjects + select info.Name.ToLower(); var onlyLocal = from localFile in pithosDir.EnumerateFiles() - where !upFiles.Contains(localFile.Name) - select new ListenerAction(CloudActionType.UploadUnconditional, localFile,null); - - - + where !remoteFiles.Contains(localFile.Name.ToLower()) + select new ListenerAction(CloudActionType.UploadUnconditional, localFile,ObjectInfo.Empty); - var localNames =pithosDir.EnumerateFiles().Select(info => info.Name); - var onlyRemote = from upFile in objects - where !localNames.Contains(upFile.Name) - select new ListenerAction(CloudActionType.DownloadUnconditional,null,upFile); - var existingObjects = from upFile in objects - join localFile in pithosDir.EnumerateFiles() - on upFile.Name equals localFile.Name - select new ListenerAction(CloudActionType.Download, localFile, upFile); + var localNames = from info in pithosDir.EnumerateFiles() + select info.Name.ToLower(); - var uniques = - onlyLocal.Union(onlyRemote).Union(existingObjects) - .Except(_listenerActions,new LocalFileComparer()); + var onlyRemote = from upFile in remoteObjects + where !localNames.Contains(upFile.Name.ToLower()) + select new ListenerAction(CloudActionType.DownloadUnconditional,new FileInfo(""), upFile); - _listenerActions.AddFromEnumerable(uniques, false); - - } - ); - Task.Factory.StartNew(() => - { - foreach (var action in _listenerActions.GetConsumingEnumerable()) - { - var localFile = action.LocalFile; - var cloudFile = action.CloudFile; - var downloadPath = (cloudFile==null)? String.Empty:Path.Combine(Settings.PithosPath,cloudFile.Name); + var commonObjects = from upFile in remoteObjects + join localFile in pithosDir.EnumerateFiles() + on upFile.Name.ToLower() equals localFile.Name.ToLower() + select new ListenerAction(CloudActionType.Download, localFile, upFile); - switch(action.Action) - { - case CloudActionType.UploadUnconditional: - - UploadCloudFile(localFile.Name, localFile.Length, localFile.FullName, action.LocalHash.Value); - break; - case CloudActionType.DownloadUnconditional: - DownloadCloudFile("PITHOS", cloudFile.Name, downloadPath); - break; - case CloudActionType.Download: - if (File.Exists(downloadPath)) - { - if (cloudFile.Hash != action.LocalHash.Value) - { - var lastLocalTime=localFile.LastWriteTime; - var lastUpTime=cloudFile.Last_Modified; - if(lastUpTime<=lastLocalTime) - { - //Files in conflict - StatusKeeper.SetFileOverlayStatus(downloadPath,FileOverlayStatus.Conflict); - } - else - DownloadCloudFile("PITHOS", action.CloudFile.Name, downloadPath); - } - } - else - DownloadCloudFile("PITHOS", action.CloudFile.Name, downloadPath); - break; - } - } - } - ); - - timer = new Timer(o => listener(), null, TimeSpan.Zero, TimeSpan.FromSeconds(10)); - + var uniques = + onlyLocal.Union(onlyRemote).Union(commonObjects) + .Except(_networkActions,new LocalFileComparer()); + + _networkActions.AddFromEnumerable(uniques, false); + + Trace.TraceInformation("[LISTENER] End Processing"); + + } + ).ContinueWith(t=> + { + if (t.IsFaulted) + { + Trace.TraceError("[LISTENER] Exception: {0}",t.Exception); + } + else + { + Trace.TraceInformation("[LISTENER] Finished"); + } + ProcessRemoteFiles(accountPath); + }); } - private void DownloadCloudFile(string container, string fileName, string localPath) + private void ProcessListenerActions() { - using (var upstream = CloudClient.GetObject(container, fileName)) - using (var fileStream = File.OpenWrite(localPath)) + foreach(var action in _networkActions.GetConsumingEnumerable()) { - upstream.CopyTo(fileStream); + Trace.TraceInformation("[ACTION] Start Processing {0}:{1}->{2}",action.Action,action.LocalFile,action.CloudFile.Name); + var localFile = action.LocalFile; + var cloudFile = action.CloudFile; + var downloadPath = (cloudFile == null)? String.Empty + : Path.Combine(RootPath,cloudFile.Name); + try + { + switch (action.Action) + { + case CloudActionType.UploadUnconditional: + UploadCloudFile(localFile.Name,localFile.Length,localFile.FullName,action.LocalHash.Value); + break; + case CloudActionType.DownloadUnconditional: + DownloadCloudFile(PithosContainer,cloudFile.Name,downloadPath); + break; + case CloudActionType.Download: + if (File.Exists(downloadPath)) + { + if (cloudFile.Hash !=action.LocalHash.Value) + { + var lastLocalTime =localFile.LastWriteTime; + var lastUpTime =cloudFile.Last_Modified; + if (lastUpTime <=lastLocalTime) + { + //Files in conflict + StatusKeeper.SetFileOverlayStatus(downloadPath,FileOverlayStatus.Conflict); + } + else + DownloadCloudFile(PithosContainer,action.CloudFile.Name,downloadPath); + } + } + else + DownloadCloudFile(PithosContainer,action.CloudFile.Name,downloadPath); + break; + } + Trace.TraceInformation("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name); + } + catch (Exception exc) + { + Trace.TraceError("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}", + action.Action, action.LocalFile,action.CloudFile,exc); + + _networkActions.Add(action); + } } } + + private void StartMonitoringFiles(string path) { _watcher = new FileSystemWatcher(path); @@ -258,26 +433,39 @@ namespace Pithos.Core _watcher.Renamed += OnRenameEvent; _watcher.EnableRaisingEvents = true; - Task.Factory.StartNew(() => - { - foreach (var state in _fileEvents.GetConsumingEnumerable()) - { - try - { - UpdateFileStatus(state); - UpdateOverlayStatus(state); - UpdateFileChecksum(state); - _uploadEvents.Add(state); - } - catch (OperationCanceledException) - { - throw; - } - catch(Exception ex) - {} - } - - },_cancellationSource.Token); + Task.Factory.StartNew(ProcesFileEvents,_cancellationSource.Token); + } + + private void ProcesFileEvents() + { + foreach (var state in _fileEvents.GetConsumingEnumerable()) + { + try + { + var networkState=StatusKeeper.GetNetworkState(state.Path); + //Skip if the file is already being downloaded or uploaded and + //the change is create or modify + if (networkState != NetworkState.None && + ( + state.TriggeringChange==WatcherChangeTypes.Created || + state.TriggeringChange==WatcherChangeTypes.Changed + )) + continue; + UpdateFileStatus(state); + UpdateOverlayStatus(state); + UpdateFileChecksum(state); + _uploadEvents.Add(state); + } + catch (OperationCanceledException exc) + { + Trace.TraceError("[ERROR] File Event Processing:\r{0}", exc); + throw; + } + catch (Exception exc) + { + Trace.TraceError("[ERROR] File Event Processing:\r{0}",exc); + } + } } private void StartSending() @@ -295,7 +483,9 @@ namespace Pithos.Core throw; } catch(Exception ex) - {} + { + Trace.TraceError("[ERROR] Synch for {0}:\r{1}",state.FileName,ex); + } } },_cancellationSource.Token); @@ -306,9 +496,17 @@ namespace Pithos.Core { if (state.Skip) return state; - string path = state.Path; + string path = state.Path.ToLower(); string fileName = Path.GetFileName(path); + //Bypass deleted files, unless the status is Deleted + if (!(File.Exists(path) || state.Status == FileStatus.Deleted)) + { + state.Skip = true; + this.StatusKeeper.RemoveFileOverlayStatus(path); + return state; + } + switch(state.Status) { case FileStatus.Created: @@ -329,11 +527,11 @@ namespace Pithos.Core private void RenameCloudFile(WorkflowState state) { - this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Synch); + this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified); - CloudClient.MoveObject("PITHOS", state.OldFileName, state.FileName); + CloudClient.MoveObject(PithosContainer, state.OldFileName,PithosContainer, state.FileName); this.StatusKeeper.SetFileStatus(state.Path, FileStatus.Unchanged); this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Normal); @@ -344,27 +542,62 @@ namespace Pithos.Core { Contract.Requires(!Path.IsPathRooted(fileName)); - this.StatusKeeper.SetFileOverlayStatus(fileName, FileOverlayStatus.Synch); - CloudClient.DeleteObject("PITHOS", fileName); + this.StatusKeeper.SetFileOverlayStatus(fileName, FileOverlayStatus.Modified); + + CloudClient.MoveObject(PithosContainer,fileName,TrashContainer,fileName); this.StatusKeeper.ClearFileStatus(fileName); this.StatusKeeper.RemoveFileOverlayStatus(fileName); } + private void DownloadCloudFile(string container, string fileName, string localPath) + { + var state = StatusKeeper.GetNetworkState(fileName); + //Abort if the file is already being uploaded or downloaded + if (state != NetworkState.None) + return; + + StatusKeeper.SetNetworkState(localPath,NetworkState.Downloading); + CloudClient.GetObject(container, fileName, localPath) + .ContinueWith(t=> + CloudClient.GetObjectInfo(container,fileName)) + .ContinueWith(t=> + StatusKeeper.StoreInfo(fileName,t.Result)) + .ContinueWith(t=> + StatusKeeper.SetNetworkState(localPath,NetworkState.None)) + .Wait(); + } + private void UploadCloudFile(string fileName, long fileSize, string path,string hash) { Contract.Requires(!Path.IsPathRooted(fileName)); - //Even if GetObjectInfo times out, we can proceed with the upload - var info=CloudClient.GetObjectInfo("PITHOS", fileName); - if ( hash != info.Hash) + var state=StatusKeeper.GetNetworkState(fileName); + //Abort if the file is already being uploaded or downloaded + if (state != NetworkState.None) + return; + + StatusKeeper.SetNetworkState(fileName,NetworkState.Uploading); + + //Even if GetObjectInfo times out, we can proceed with the upload + var info = CloudClient.GetObjectInfo(PithosContainer, fileName); + Task.Factory.StartNew(() => { - this.StatusKeeper.SetFileOverlayStatus(path, FileOverlayStatus.Synch); - using (var stream = File.OpenRead(path)) + if (hash != info.Hash) { - CloudClient.PutObject("PITHOS", fileName, stream, fileSize); + Task.Factory.StartNew(() => + this.StatusKeeper.SetFileOverlayStatus(path, FileOverlayStatus.Modified)) + .ContinueWith(t => + CloudClient.PutObject(PithosContainer, fileName, path, hash)); } - } - this.StatusKeeper.SetFileStatus(path,FileStatus.Unchanged); - this.StatusKeeper.SetFileOverlayStatus(path,FileOverlayStatus.Normal); + else + { + this.StatusKeeper.StoreInfo(path,info); + } + }) + .ContinueWith(t => + this.StatusKeeper.SetFileState(path, FileStatus.Unchanged, FileOverlayStatus.Normal)) + .ContinueWith(t=> + this.StatusKeeper.SetNetworkState(path,NetworkState.None)) + .Wait(); Workflow.RaiseChangeNotification(path); } @@ -434,7 +667,7 @@ namespace Pithos.Core return state; string path = state.Path; - string hash = CalculateHash(path); + string hash = Signature.CalculateHash(path); StatusKeeper.UpdateFileChecksum(path, hash); @@ -442,21 +675,7 @@ namespace Pithos.Core return state; } - private static string CalculateHash(string path) - { - string hash; - using (var hasher = MD5.Create()) - using (var stream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096, true)) - { - var hashBytes = hasher.ComputeHash(stream); - var hashBuilder = new StringBuilder(); - foreach (byte b in hasher.ComputeHash(stream)) - hashBuilder.Append(b.ToString("x2").ToLower()); - hash = hashBuilder.ToString(); - - } - return hash; - } + private FileSystemEventArgs CalculateSignature(FileSystemEventArgs arg) { @@ -490,8 +709,10 @@ namespace Pithos.Core if (timer != null) timer.Dispose(); timer = null; + StopStatusService(); } + ~PithosMonitor() { Dispose(false);