Changed the retry function in PithosClient to use the TPL
[pithos-ms-client] / trunk / Pithos.Core / PithosMonitor.cs
index 2d53387..831c082 100644 (file)
@@ -12,6 +12,7 @@ using System.ServiceModel.Description;
 using System.Text;
 using System.Threading;
 using System.Threading.Tasks;
+using Castle.ActiveRecord.Queries;
 using Microsoft.WindowsAPICodePack.Net;
 using Pithos.Interfaces;
 using System.ServiceModel;
@@ -21,6 +22,9 @@ namespace Pithos.Core
     [Export(typeof(PithosMonitor))]
     public class PithosMonitor:IDisposable
     {
+        private const string PithosContainer = "pithos"; // Container that is listed, uploaded to and downloaded from by this monitor
+        private const string TrashContainer = "trash"; // Container that deleted objects are moved into
+
         [Import]
         public IPithosSettings Settings{get;set;}
 
@@ -36,6 +40,9 @@ namespace Pithos.Core
         [Import]
         public ICloudClient CloudListeningClient { get; set; }
 
+        public string UserName { get; set; } // Account name passed to CloudClient.Authenticate
+        public string ApiKey { get; set; } // Key passed to CloudClient.Authenticate together with UserName
+
         private ServiceHost _statusService { get; set; }
 
         private FileSystemWatcher _watcher;
@@ -58,6 +65,8 @@ namespace Pithos.Core
             }
         }
 
+        public string RootPath { get; set; } // Local folder that is indexed, watched for changes and used as the base of download paths
+
 
         CancellationTokenSource _cancellationSource;
 
@@ -76,16 +85,35 @@ namespace Pithos.Core
             }
             _cancellationSource = new CancellationTokenSource();
 
-            string path = Settings.PithosPath;
             var proxyUri = ProxyFromSettings();            
             CloudClient.Proxy = proxyUri;
-            StartMonitoringFiles(path);
+            CloudClient.UsePithos = this.UsePithos;
+            EnsurePithosContainers();
+            StatusKeeper.StartProcessing(_cancellationSource.Token);
+            IndexLocalFiles(RootPath);
+            StartMonitoringFiles(RootPath);
 
             StartStatusService();
 
             StartNetwork();
         }
 
+        /// <summary>
+        /// Configures and authenticates the cloud client, then creates the
+        /// pithos and trash containers if either of them is missing.
+        /// </summary>
+        private void EnsurePithosContainers()
+        {
+            CloudClient.UsePithos = this.UsePithos;
+            CloudClient.AuthenticationUrl = this.AuthenticationUrl;
+            CloudClient.Authenticate(UserName, ApiKey);
+
+            foreach (var requiredContainer in new[] {PithosContainer, TrashContainer})
+            {
+                //Nothing to do for containers that are already there
+                if (CloudClient.ContainerExists(requiredContainer))
+                    continue;
+
+                CloudClient.CreateContainer(requiredContainer);
+            }
+        }
+
+        public string AuthenticationUrl { get; set; } // Copied to CloudClient.AuthenticationUrl before each Authenticate call
+
         private Uri ProxyFromSettings()
         {            
             if (Settings.UseManualProxy)
@@ -105,6 +133,48 @@ namespace Pithos.Core
             return null;
         }
 
+        /// <summary>
+        /// Hands every file found under <paramref name="path"/> (all subdirectories included)
+        /// to the status keeper and then re-queues the files whose processing was interrupted.
+        /// </summary>
+        /// <param name="path">Root folder to scan.</param>
+        /// <remarks>Failures are logged through <see cref="Trace"/> and are not rethrown.</remarks>
+        private void IndexLocalFiles(string path)
+        {
+            Trace.TraceInformation("[START] Index Local");
+            try
+            {
+                //Same ParallelQuery<string> as before, without the no-op "select filePath" projection
+                var files = Directory.EnumerateFiles(path, "*", SearchOption.AllDirectories).AsParallel();
+                StatusKeeper.StoreUnversionedFiles(files);
+
+                RestartInterruptedFiles();
+            }
+            catch (Exception exc)
+            {
+                Trace.TraceError("[ERROR] Index Local - {0}", exc);
+            }
+            finally
+            {
+                Trace.TraceInformation("[END] Index Local");
+            }
+        }
+
+        private void RestartInterruptedFiles() // Re-queues files whose stored overlay status is Unversioned or Modified
+        {
+            var interruptedStates = new[] { FileOverlayStatus.Unversioned, FileOverlayStatus.Modified };
+            var filesQuery = from state in FileState.Queryable // NOTE(review): presumably translated by a LINQ provider - confirm before reshaping this query
+                             where interruptedStates.Contains(state.OverlayStatus)
+                             select new WorkflowState
+                                      {
+                                          Path = state.FilePath.ToLower(), // path and file name are lower-cased
+                                          FileName = Path.GetFileName(state.FilePath).ToLower(),
+                                          Status=state.OverlayStatus==FileOverlayStatus.Unversioned? // Unversioned maps to Created, Modified maps to Modified
+                                                            FileStatus.Created:
+                                                            FileStatus.Modified,
+                                          TriggeringChange = state.OverlayStatus==FileOverlayStatus.Unversioned? // same mapping expressed as a watcher change type
+                                                            WatcherChangeTypes.Created:
+                                                            WatcherChangeTypes.Changed
+                                      };
+            _uploadEvents.AddFromEnumerable(filesQuery,false);           // adds the resulting states to the upload queue
+            
+        }
+
         private void StartStatusService()
         {
             // Create a ServiceHost for the CalculatorService type and provide the base address.
@@ -156,10 +226,11 @@ namespace Pithos.Core
 
             try
             {
-                
-                CloudClient.Authenticate(Settings.UserName, Settings.ApiKey);
+                CloudClient.UsePithos = this.UsePithos;
+                CloudClient.AuthenticationUrl = this.AuthenticationUrl;
+                CloudClient.Authenticate(UserName, ApiKey);
 
-                StartListening();
+                StartListening(RootPath);
                 StartSending();
             }
             catch (Exception)
@@ -170,6 +241,8 @@ namespace Pithos.Core
             }
         }
 
+        public bool UsePithos { get; set; } // Copied to CloudClient.UsePithos before authenticating
+
         internal enum CloudActionType
         {
             Upload=0,
@@ -193,7 +266,7 @@ namespace Pithos.Core
                 Action = action;
                 LocalFile = localFile;
                 CloudFile = cloudFile;
-                LocalHash=new Lazy<string>(()=>CalculateHash(LocalFile.FullName),LazyThreadSafetyMode.ExecutionAndPublication);
+                LocalHash=new Lazy<string>(()=>Signature.CalculateHash(LocalFile.FullName),LazyThreadSafetyMode.ExecutionAndPublication);
             }
             
         }
@@ -223,120 +296,134 @@ namespace Pithos.Core
             }
         }
 
-        private BlockingCollection<ListenerAction> _listenerActions=new BlockingCollection<ListenerAction>();
+        private BlockingCollection<ListenerAction> _networkActions=new BlockingCollection<ListenerAction>();
 
         private Timer timer;
 
-        private void StartListening()
+        private void StartListening(string accountPath) // accountPath: local folder handed to ProcessRemoteFiles
         {
             
-            Func<Task> listener = ()=>Task.Factory.StartNew(()=>CloudClient.ListObjects("PITHOS"))
+            ProcessRemoteFiles(accountPath); // starts the self-rescheduling remote listing; the returned Task is not observed here
+
+            Task.Factory.StartNew(ProcessListenerActions); // consumer loop for the queued network actions
+                        
+        }
+
+        /// <summary>
+        /// After the StartNewDelayed(10000) delay, lists the objects of the pithos container,
+        /// compares them by lower-cased name with the files directly under
+        /// <paramref name="accountPath"/> and queues an action for every local-only, remote-only
+        /// and common file that is not already queued. When the pass ends, successfully or
+        /// not, the outcome is traced and the method schedules itself again.
+        /// </summary>
+        /// <param name="accountPath">Local folder that is compared against the container.</param>
+        /// <returns>The task that represents this pass, including the rescheduling step.</returns>
+        private Task ProcessRemoteFiles(string accountPath)
+        {
+            Trace.TraceInformation("[LISTENER] Scheduled");    
+            return Task.Factory.StartNewDelayed(10000)
+                .ContinueWith(t=>CloudClient.ListObjects(PithosContainer))
                 .ContinueWith(task =>
                                   {
-                                      
-                                      var objects = task.Result;
-                                      if (objects.Count == 0)
+                                      Trace.TraceInformation("[LISTENER] Start Processing");
+
+                                      var remoteObjects = task.Result;
+/*
+                                      if (remoteObjects.Count == 0)
                                           return;
+*/
 
-                                      var pithosDir = new DirectoryInfo(Settings.PithosPath);
+                                      var pithosDir = new DirectoryInfo(accountPath);
                                       
-                                      var upFiles = from info in objects
-                                                    select info.Name;
+                                      var remoteFiles = from info in remoteObjects
+                                                    select info.Name.ToLower();
                                       
                                       var onlyLocal = from localFile in pithosDir.EnumerateFiles()
-                                                      where !upFiles.Contains(localFile.Name) 
-                                                      select new ListenerAction(CloudActionType.UploadUnconditional, localFile,null);
-                                      
-                                      
-                                    
+                                                      where !remoteFiles.Contains(localFile.Name.ToLower()) 
+                                                      select new ListenerAction(CloudActionType.UploadUnconditional, localFile,ObjectInfo.Empty);
+
+
 
-                                      var localNames =pithosDir.EnumerateFiles().Select(info => info.Name);
-                                      var onlyRemote = from upFile in objects
-                                                       where !localNames.Contains(upFile.Name)
-                                                       select new ListenerAction(CloudActionType.DownloadUnconditional,null,upFile);
+                                      var localNames = from info in pithosDir.EnumerateFiles()
+                                                           select info.Name.ToLower();
 
+                                      //new FileInfo("") throws ArgumentException, which faulted the whole pass as soon as
+                                      //a remote-only object was enumerated. Point LocalFile at the path the object will be
+                                      //downloaded to instead (the file does not have to exist for FileInfo to be created).
+                                      var onlyRemote = from upFile in remoteObjects
+                                                       where !localNames.Contains(upFile.Name.ToLower())
+                                                       select new ListenerAction(CloudActionType.DownloadUnconditional,new FileInfo(Path.Combine(accountPath,upFile.Name)), upFile);
 
-                                      var existingObjects = from  upFile in objects
+
+                                      var commonObjects = from  upFile in remoteObjects
                                                             join  localFile in pithosDir.EnumerateFiles()
-                                                            on upFile.Name equals localFile.Name 
-                                                       select new ListenerAction(CloudActionType.Download, localFile, upFile);
+                                                                on upFile.Name.ToLower() equals localFile.Name.ToLower() 
+                                                            select new ListenerAction(CloudActionType.Download, localFile, upFile);
 
                                       var uniques =
-                                          onlyLocal.Union(onlyRemote).Union(existingObjects)
-                                          .Except(_listenerActions,new LocalFileComparer());
-
-                                      _listenerActions.AddFromEnumerable(uniques, false);
-                                     
-                                 }
-                );
+                                          onlyLocal.Union(onlyRemote).Union(commonObjects)
+                                              .Except(_networkActions,new LocalFileComparer());
+                                      
+                                      _networkActions.AddFromEnumerable(uniques, false);
 
-            Task.Factory.StartNew(() =>
-                                      {
-                                          foreach (var action in _listenerActions.GetConsumingEnumerable())
-                                          {
-                                              var localFile = action.LocalFile;
-                                              var cloudFile = action.CloudFile;
-                                              var downloadPath = (cloudFile==null)? String.Empty:Path.Combine(Settings.PithosPath,cloudFile.Name);
-                                              try
-                                              {
-                                                  switch (action.Action)
-                                                  {
-                                                      case CloudActionType.UploadUnconditional:
-
-                                                          UploadCloudFile(localFile.Name, localFile.Length,
-                                                                          localFile.FullName, action.LocalHash.Value);
-                                                          break;
-                                                      case CloudActionType.DownloadUnconditional:
-                                                          DownloadCloudFile("PITHOS", cloudFile.Name, downloadPath);
-                                                          break;
-                                                      case CloudActionType.Download:
-                                                          if (File.Exists(downloadPath))
-                                                          {
-                                                              if (cloudFile.Hash != action.LocalHash.Value)
-                                                              {
-                                                                  var lastLocalTime = localFile.LastWriteTime;
-                                                                  var lastUpTime = cloudFile.Last_Modified;
-                                                                  if (lastUpTime <= lastLocalTime)
-                                                                  {
-                                                                      //Files in conflict
-                                                                      StatusKeeper.SetFileOverlayStatus(downloadPath,
-                                                                                                        FileOverlayStatus
-                                                                                                            .Conflict);
-                                                                  }
-                                                                  else
-                                                                      DownloadCloudFile("PITHOS", action.CloudFile.Name,
-                                                                                        downloadPath);
-                                                              }
-                                                          }
-                                                          else
-                                                              DownloadCloudFile("PITHOS", action.CloudFile.Name,
-                                                                                downloadPath);
-                                                          break;
-                                                  }
-                                              }
-                                              catch (Exception exc)
-                                              {
-                                                  Debug.WriteLine("Processing of {0}:{1}->{2} failed. Putting it back in the queue",action.Action,action.LocalFile,action.CloudFile);
-                                                  Debug.WriteLine(exc.ToString());
-                                                  _listenerActions.Add(action);
-                                              }
-                                          }
-                                      }
-                );
-            
-            timer = new Timer(o => listener(), null, TimeSpan.Zero, TimeSpan.FromSeconds(10));
-            
+                                      Trace.TraceInformation("[LISTENER] End Processing");
+                                      
+                                  }
+                ).ContinueWith(t=>
+                {
+                    if (t.IsFaulted)
+                    {
+                        Trace.TraceError("[LISTENER] Exception: {0}",t.Exception);                                           
+                    }
+                    else
+                    {
+                        Trace.TraceInformation("[LISTENER] Finished");                                           
+                    }                    
+                    //Schedule the next pass whether this one succeeded or not
+                    ProcessRemoteFiles(accountPath);
+                });
         }
 
-        private void DownloadCloudFile(string container, string fileName, string localPath)
+        /// <summary>
+        /// Consumes the queued network actions one by one and performs the matching upload,
+        /// download or conflict marking. An action whose processing throws is traced and
+        /// added back to the queue.
+        /// </summary>
+        private void ProcessListenerActions()
         {
-            using (var upstream = CloudClient.GetObject(container, fileName))
-            using (var fileStream = File.OpenWrite(localPath))
+            foreach(var action in _networkActions.GetConsumingEnumerable())
             {
-                upstream.CopyTo(fileStream);
+                var localFile = action.LocalFile;
+                var cloudFile = action.CloudFile;
+                //CloudFile is treated as optional below, so never dereference it unguarded:
+                //an exception thrown here, outside the try block, would end the consumer loop
+                var cloudName = (cloudFile == null)? String.Empty : cloudFile.Name;
+                var downloadPath = (cloudFile == null)? String.Empty
+                                        : Path.Combine(RootPath,cloudFile.Name);
+                Trace.TraceInformation("[ACTION] Start Processing {0}:{1}->{2}",action.Action,action.LocalFile,cloudName);
+                try
+                {
+                    switch (action.Action)
+                    {
+                        case CloudActionType.UploadUnconditional:
+                            UploadCloudFile(localFile.Name,localFile.Length,localFile.FullName,action.LocalHash.Value);
+                            break;
+                        case CloudActionType.DownloadUnconditional:
+                            DownloadCloudFile(PithosContainer,cloudFile.Name,downloadPath);
+                            break;
+                        case CloudActionType.Download:
+                            if (File.Exists(downloadPath))
+                            {
+                                //Only act when the contents differ
+                                if (cloudFile.Hash !=action.LocalHash.Value)
+                                {
+                                    var lastLocalTime =localFile.LastWriteTime;
+                                    var lastUpTime =cloudFile.Last_Modified;
+                                    if (lastUpTime <=lastLocalTime)
+                                    {
+                                        //Files in conflict
+                                        StatusKeeper.SetFileOverlayStatus(downloadPath,FileOverlayStatus.Conflict);
+                                    }
+                                    else
+                                        DownloadCloudFile(PithosContainer,action.CloudFile.Name,downloadPath);
+                                }
+                            }
+                            else
+                                DownloadCloudFile(PithosContainer,action.CloudFile.Name,downloadPath);
+                            break;
+                    }
+                    Trace.TraceInformation("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile, cloudName);
+                }
+                catch (Exception exc)
+                {
+                    Trace.TraceError("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
+                                    action.Action, action.LocalFile,action.CloudFile,exc);                    
+
+                    _networkActions.Add(action);
+                }
             }
         }
 
+      
+
         private void StartMonitoringFiles(string path)
         {
             _watcher = new FileSystemWatcher(path);
@@ -346,26 +433,39 @@ namespace Pithos.Core
             _watcher.Renamed += OnRenameEvent;
             _watcher.EnableRaisingEvents = true;
 
-            Task.Factory.StartNew(() =>
-                                      {
-                                          foreach (var state in _fileEvents.GetConsumingEnumerable())
-                                          {
-                                              try
-                                              {
-                                                  UpdateFileStatus(state);
-                                                  UpdateOverlayStatus(state);
-                                                  UpdateFileChecksum(state);
-                                                  _uploadEvents.Add(state);                                                  
-                                              }
-                                              catch (OperationCanceledException)
-                                              {
-                                                  throw;
-                                              }
-                                              catch(Exception ex)
-                                              {}
-                                          }
-                                          
-                                      },_cancellationSource.Token);
+            Task.Factory.StartNew(ProcesFileEvents,_cancellationSource.Token);
+        }
+
+        /// <summary>
+        /// Consumes queued file events until the monitor's cancellation token is signalled.
+        /// For each event it updates the file status, overlay status and checksum and then
+        /// queues the event for upload. Created/changed events for files that are currently
+        /// being uploaded or downloaded are skipped.
+        /// </summary>
+        private void ProcesFileEvents()
+        {
+            //Enumerate with the token the task was started with; without it the loop keeps
+            //blocking on the queue after cancellation and never ends
+            foreach (var state in _fileEvents.GetConsumingEnumerable(_cancellationSource.Token))
+            {
+                try
+                {
+                    var networkState=StatusKeeper.GetNetworkState(state.Path);
+                    //Skip if the file is already being downloaded or uploaded and 
+                    //the change is create or modify
+                    if (networkState != NetworkState.None && 
+                        (
+                            state.TriggeringChange==WatcherChangeTypes.Created ||
+                            state.TriggeringChange==WatcherChangeTypes.Changed
+                        ))
+                        continue;
+                    UpdateFileStatus(state);
+                    UpdateOverlayStatus(state);
+                    UpdateFileChecksum(state);
+                    _uploadEvents.Add(state);
+                }
+                catch (OperationCanceledException exc)
+                {
+                    //Cancellation must end the loop, so it is traced and rethrown
+                    Trace.TraceError("[ERROR] File Event Processing:\r{0}", exc);
+                    throw;
+                }
+                catch (Exception exc)
+                {
+                    //Any other failure only affects the current event
+                    Trace.TraceError("[ERROR] File Event Processing:\r{0}",exc);
+                }
+            }
         }
 
         private void StartSending()
@@ -383,7 +483,9 @@ namespace Pithos.Core
                                                   throw;
                                               }
                                               catch(Exception ex)
-                                              {}
+                                              {
+                                                  Trace.TraceError("[ERROR] Synch for {0}:\r{1}",state.FileName,ex);
+                                              }
                                           }
                                           
                                       },_cancellationSource.Token);
@@ -394,9 +496,17 @@ namespace Pithos.Core
         {
             if (state.Skip)
                 return state;
-            string path = state.Path;
+            string path = state.Path.ToLower();            
             string fileName = Path.GetFileName(path);
 
+            //Bypass deleted files, unless the status is Deleted
+            if (!(File.Exists(path) || state.Status == FileStatus.Deleted))
+            {
+                state.Skip = true;
+                this.StatusKeeper.RemoveFileOverlayStatus(path);
+                return state;
+            }
+
             switch(state.Status)
             {
                 case FileStatus.Created:
@@ -417,11 +527,11 @@ namespace Pithos.Core
 
         private void RenameCloudFile(WorkflowState state)
         {
-            this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Synch);
+            this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);
 
 
 
-            CloudClient.MoveObject("PITHOS", state.OldFileName, state.FileName);
+            CloudClient.MoveObject(PithosContainer, state.OldFileName,PithosContainer, state.FileName);
 
             this.StatusKeeper.SetFileStatus(state.Path, FileStatus.Unchanged);
             this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Normal);
@@ -432,26 +542,62 @@ namespace Pithos.Core
         {
             Contract.Requires(!Path.IsPathRooted(fileName));
 
-            this.StatusKeeper.SetFileOverlayStatus(fileName, FileOverlayStatus.Synch);
-            CloudClient.DeleteObject("PITHOS", fileName);
+            this.StatusKeeper.SetFileOverlayStatus(fileName, FileOverlayStatus.Modified);
+
+            CloudClient.MoveObject(PithosContainer,fileName,TrashContainer,fileName);
             this.StatusKeeper.ClearFileStatus(fileName);
             this.StatusKeeper.RemoveFileOverlayStatus(fileName);            
         }
 
+        /// <summary>
+        /// Downloads object <paramref name="fileName"/> of <paramref name="container"/> to
+        /// <paramref name="localPath"/> and stores the object's info afterwards. Does nothing
+        /// when a transfer is already recorded for <paramref name="localPath"/>.
+        /// </summary>
+        /// <param name="container">Container that holds the object.</param>
+        /// <param name="fileName">Name of the object inside the container.</param>
+        /// <param name="localPath">Path of the local file to write.</param>
+        /// <remarks>
+        /// A failed download now surfaces as an exception (an AggregateException when it comes
+        /// from the download task) instead of being swallowed by the continuation chain.
+        /// The network state is reset in either case.
+        /// </remarks>
+        private void DownloadCloudFile(string container, string fileName, string localPath)
+        {
+            //Read the state under the same key it is written with below; checking it
+            //under fileName could never observe the Downloading state set for localPath
+            var state = StatusKeeper.GetNetworkState(localPath);
+            //Abort if the file is already being uploaded or downloaded
+            if (state != NetworkState.None)
+                return;
+
+            StatusKeeper.SetNetworkState(localPath,NetworkState.Downloading);
+            try
+            {
+                CloudClient.GetObject(container, fileName, localPath).Wait();
+
+                var info = CloudClient.GetObjectInfo(container, fileName);
+                //NOTE(review): the info is stored under fileName while UploadCloudFile stores it under
+                //the local path - confirm which key StoreInfo expects
+                StatusKeeper.StoreInfo(fileName, info);
+            }
+            finally
+            {
+                StatusKeeper.SetNetworkState(localPath,NetworkState.None);
+            }
+        }
+
         private void UploadCloudFile(string fileName, long fileSize, string path,string hash)
         {
             Contract.Requires(!Path.IsPathRooted(fileName));
-            //Even if GetObjectInfo times out, we can proceed with the upload
-            var info=CloudClient.GetObjectInfo("PITHOS", fileName);
-            if ( hash != info.Hash)
+            var state=StatusKeeper.GetNetworkState(fileName);
+            //Abort if the file is already being uploaded or downloaded
+            if (state != NetworkState.None)
+                return;
+
+            StatusKeeper.SetNetworkState(fileName,NetworkState.Uploading);
+            
+            //Even if GetObjectInfo times out, we can proceed with the upload            
+            var info = CloudClient.GetObjectInfo(PithosContainer, fileName);
+            Task.Factory.StartNew(() =>
             {
-                this.StatusKeeper.SetFileOverlayStatus(path, FileOverlayStatus.Synch);
-                
-                    CloudClient.PutObject("PITHOS", fileName, path);
-                
-            }
-            this.StatusKeeper.SetFileStatus(path,FileStatus.Unchanged);
-            this.StatusKeeper.SetFileOverlayStatus(path,FileOverlayStatus.Normal);
+                if (hash != info.Hash)
+                {
+                    Task.Factory.StartNew(() => 
+                        this.StatusKeeper.SetFileOverlayStatus(path, FileOverlayStatus.Modified))
+                    .ContinueWith(t => 
+                        CloudClient.PutObject(PithosContainer, fileName, path, hash));
+                }
+                else
+                {
+                    this.StatusKeeper.StoreInfo(path,info);
+                }
+            })
+            .ContinueWith(t => 
+                this.StatusKeeper.SetFileState(path, FileStatus.Unchanged, FileOverlayStatus.Normal))
+                .ContinueWith(t=>
+                    this.StatusKeeper.SetNetworkState(path,NetworkState.None))
+            .Wait();
             Workflow.RaiseChangeNotification(path);
         }
 
@@ -521,7 +667,7 @@ namespace Pithos.Core
                 return state;
 
             string path = state.Path;
-            string hash = CalculateHash(path);
+            string hash = Signature.CalculateHash(path);
 
             StatusKeeper.UpdateFileChecksum(path, hash);
 
@@ -529,21 +675,7 @@ namespace Pithos.Core
             return state;
         }
 
-        private static string CalculateHash(string path)
-        {
-            string hash;
-            using (var hasher = MD5.Create())
-            using (var stream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096, true))
-            {
-                var hashBytes = hasher.ComputeHash(stream);
-                var hashBuilder = new StringBuilder();
-                foreach (byte b in hasher.ComputeHash(stream))
-                    hashBuilder.Append(b.ToString("x2").ToLower());
-                hash = hashBuilder.ToString();
-
-            }
-            return hash;
-        }
+       
 
         private FileSystemEventArgs CalculateSignature(FileSystemEventArgs arg)
         {