Changed the retry function in PithosClient to use the TPL
[pithos-ms-client] / trunk / Pithos.Core / PithosMonitor.cs
index 7a2ea09..831c082 100644 (file)
@@ -12,6 +12,7 @@ using System.ServiceModel.Description;
 using System.Text;
 using System.Threading;
 using System.Threading.Tasks;
+using Castle.ActiveRecord.Queries;
 using Microsoft.WindowsAPICodePack.Net;
 using Pithos.Interfaces;
 using System.ServiceModel;
@@ -21,6 +22,9 @@ namespace Pithos.Core
     [Export(typeof(PithosMonitor))]
     public class PithosMonitor:IDisposable
     {
+        private const string PithosContainer = "pithos";
+        private const string TrashContainer = "trash";
+
         [Import]
         public IPithosSettings Settings{get;set;}
 
@@ -36,6 +40,9 @@ namespace Pithos.Core
         [Import]
         public ICloudClient CloudListeningClient { get; set; }
 
+        public string UserName { get; set; }
+        public string ApiKey { get; set; }
+
         private ServiceHost _statusService { get; set; }
 
         private FileSystemWatcher _watcher;
@@ -58,6 +65,8 @@ namespace Pithos.Core
             }
         }
 
+        public string RootPath { get; set; }
+
 
         CancellationTokenSource _cancellationSource;
 
@@ -76,17 +85,35 @@ namespace Pithos.Core
             }
             _cancellationSource = new CancellationTokenSource();
 
-            string path = Settings.PithosPath;
             var proxyUri = ProxyFromSettings();            
             CloudClient.Proxy = proxyUri;
-            IndexLocalFiles(path);
-            StartMonitoringFiles(path);
+            CloudClient.UsePithos = this.UsePithos;
+            EnsurePithosContainers();
+            StatusKeeper.StartProcessing(_cancellationSource.Token);
+            IndexLocalFiles(RootPath);
+            StartMonitoringFiles(RootPath);
 
             StartStatusService();
 
             StartNetwork();
         }
 
+        /// <summary>
+        /// Authenticates with the cloud client and creates the "pithos" and "trash"
+        /// containers when the client reports that they do not exist.
+        /// </summary>
+        private void EnsurePithosContainers()
+        {
+            CloudClient.UsePithos = this.UsePithos;
+            CloudClient.AuthenticationUrl = this.AuthenticationUrl;
+            CloudClient.Authenticate(UserName, ApiKey);
+
+            string[] requiredContainers = {PithosContainer, TrashContainer};
+            for (var index = 0; index < requiredContainers.Length; index++)
+            {
+                var containerName = requiredContainers[index];
+                // Nothing to do for containers that are already there
+                if (CloudClient.ContainerExists(containerName))
+                    continue;
+
+                CloudClient.CreateContainer(containerName);
+            }
+        }
+
+        public string AuthenticationUrl { get; set; }
+
         private Uri ProxyFromSettings()
         {            
             if (Settings.UseManualProxy)
@@ -116,18 +143,7 @@ namespace Pithos.Core
                     select filePath;
                 StatusKeeper.StoreUnversionedFiles(files);
 
-                var newFiles= FileState.FindAllByProperty("OverlayStatus", FileOverlayStatus.Unversioned)
-                            .Select(state=>state.FilePath);
-                foreach (var newFile in newFiles)
-                {
-                    _uploadEvents.Add(new WorkflowState
-                                          {
-                                              Path = newFile,
-                                              FileName = Path.GetFileName(newFile),
-                                              TriggeringChange = WatcherChangeTypes.Created
-                                          });
-                }
-
+                RestartInterruptedFiles();
             }
             catch (Exception exc)
             {
@@ -139,6 +155,26 @@ namespace Pithos.Core
             }
         }
 
+        /// <summary>
+        /// Adds to <c>_uploadEvents</c> one <c>WorkflowState</c> for every stored
+        /// file state whose overlay status is Unversioned or Modified.
+        /// </summary>
+        /// <remarks>
+        /// Unversioned entries are queued with a Created status and trigger; all
+        /// other matching entries are queued as Modified/Changed. The path and the
+        /// file name of each queued entry are lower-cased.
+        /// </remarks>
+        private void RestartInterruptedFiles()
+        {
+            var pendingStatuses = new[] { FileOverlayStatus.Unversioned, FileOverlayStatus.Modified };
+
+            // Filter first, then project each match to the workflow item to queue
+            var pendingUploads = FileState.Queryable
+                .Where(entry => pendingStatuses.Contains(entry.OverlayStatus))
+                .Select(entry => new WorkflowState
+                                     {
+                                         Path = entry.FilePath.ToLower(),
+                                         FileName = Path.GetFileName(entry.FilePath).ToLower(),
+                                         Status = entry.OverlayStatus == FileOverlayStatus.Unversioned
+                                                      ? FileStatus.Created
+                                                      : FileStatus.Modified,
+                                         TriggeringChange = entry.OverlayStatus == FileOverlayStatus.Unversioned
+                                                                ? WatcherChangeTypes.Created
+                                                                : WatcherChangeTypes.Changed
+                                     });
+
+            _uploadEvents.AddFromEnumerable(pendingUploads, false);
+        }
+
         private void StartStatusService()
         {
             // Create a ServiceHost for the CalculatorService type and provide the base address.
@@ -190,10 +226,11 @@ namespace Pithos.Core
 
             try
             {
-                
-                CloudClient.Authenticate(Settings.UserName, Settings.ApiKey);
+                CloudClient.UsePithos = this.UsePithos;
+                CloudClient.AuthenticationUrl = this.AuthenticationUrl;
+                CloudClient.Authenticate(UserName, ApiKey);
 
-                StartListening(Settings.PithosPath);
+                StartListening(RootPath);
                 StartSending();
             }
             catch (Exception)
@@ -204,6 +241,8 @@ namespace Pithos.Core
             }
         }
 
+        public bool UsePithos { get; set; }
+
         internal enum CloudActionType
         {
             Upload=0,
@@ -257,7 +296,7 @@ namespace Pithos.Core
             }
         }
 
-        private BlockingCollection<ListenerAction> _listenerActions=new BlockingCollection<ListenerAction>();
+        private BlockingCollection<ListenerAction> _networkActions=new BlockingCollection<ListenerAction>();
 
         private Timer timer;
 
@@ -274,7 +313,7 @@ namespace Pithos.Core
         {
             Trace.TraceInformation("[LISTENER] Scheduled");    
             return Task.Factory.StartNewDelayed(10000)
-                .ContinueWith(t=>CloudClient.ListObjects("PITHOS"))
+                .ContinueWith(t=>CloudClient.ListObjects(PithosContainer))
                 .ContinueWith(task =>
                                   {
                                       Trace.TraceInformation("[LISTENER] Start Processing");
@@ -288,31 +327,32 @@ namespace Pithos.Core
                                       var pithosDir = new DirectoryInfo(accountPath);
                                       
                                       var remoteFiles = from info in remoteObjects
-                                                    select info.Name;
+                                                    select info.Name.ToLower();
                                       
                                       var onlyLocal = from localFile in pithosDir.EnumerateFiles()
-                                                      where !remoteFiles.Contains(localFile.Name) 
-                                                      select new ListenerAction(CloudActionType.UploadUnconditional, localFile,null);
-                                      
-                                      
-                                    
+                                                      where !remoteFiles.Contains(localFile.Name.ToLower()) 
+                                                      select new ListenerAction(CloudActionType.UploadUnconditional, localFile,ObjectInfo.Empty);
+
+
+
+                                      var localNames = from info in pithosDir.EnumerateFiles()
+                                                           select info.Name.ToLower();
 
-                                      var localNames =pithosDir.EnumerateFiles().Select(info => info.Name);
                                       var onlyRemote = from upFile in remoteObjects
-                                                       where !localNames.Contains(upFile.Name)
-                                                       select new ListenerAction(CloudActionType.DownloadUnconditional,null,upFile);
+                                                       where !localNames.Contains(upFile.Name.ToLower())
+                                                       select new ListenerAction(CloudActionType.DownloadUnconditional,new FileInfo(""), upFile);
 
 
                                       var commonObjects = from  upFile in remoteObjects
                                                             join  localFile in pithosDir.EnumerateFiles()
-                                                                on upFile.Name equals localFile.Name 
+                                                                on upFile.Name.ToLower() equals localFile.Name.ToLower() 
                                                             select new ListenerAction(CloudActionType.Download, localFile, upFile);
 
                                       var uniques =
                                           onlyLocal.Union(onlyRemote).Union(commonObjects)
-                                              .Except(_listenerActions,new LocalFileComparer());
+                                              .Except(_networkActions,new LocalFileComparer());
                                       
-                                      _listenerActions.AddFromEnumerable(uniques, false);
+                                      _networkActions.AddFromEnumerable(uniques, false);
 
                                       Trace.TraceInformation("[LISTENER] End Processing");
                                       
@@ -333,23 +373,22 @@ namespace Pithos.Core
 
         private void ProcessListenerActions()
         {
-            foreach(var action in _listenerActions.GetConsumingEnumerable())
+            foreach(var action in _networkActions.GetConsumingEnumerable())
             {
-                Trace.TraceInformation("[ACTION] Start Processing {0}:{1}->{2}",action.Action,action.LocalFile,action.CloudFile);
+                Trace.TraceInformation("[ACTION] Start Processing {0}:{1}->{2}",action.Action,action.LocalFile,action.CloudFile.Name);
                 var localFile = action.LocalFile;
                 var cloudFile = action.CloudFile;
                 var downloadPath = (cloudFile == null)? String.Empty
-                                        : Path.Combine(Settings.PithosPath,cloudFile.Name);
+                                        : Path.Combine(RootPath,cloudFile.Name);
                 try
                 {
                     switch (action.Action)
                     {
                         case CloudActionType.UploadUnconditional:
-
                             UploadCloudFile(localFile.Name,localFile.Length,localFile.FullName,action.LocalHash.Value);
                             break;
                         case CloudActionType.DownloadUnconditional:
-                            DownloadCloudFile("PITHOS",cloudFile.Name,downloadPath);
+                            DownloadCloudFile(PithosContainer,cloudFile.Name,downloadPath);
                             break;
                         case CloudActionType.Download:
                             if (File.Exists(downloadPath))
@@ -364,34 +403,26 @@ namespace Pithos.Core
                                         StatusKeeper.SetFileOverlayStatus(downloadPath,FileOverlayStatus.Conflict);
                                     }
                                     else
-                                        DownloadCloudFile("PITHOS",action.CloudFile.Name,downloadPath);
+                                        DownloadCloudFile(PithosContainer,action.CloudFile.Name,downloadPath);
                                 }
                             }
                             else
-                                DownloadCloudFile("PITHOS",action.CloudFile.Name,downloadPath);
+                                DownloadCloudFile(PithosContainer,action.CloudFile.Name,downloadPath);
                             break;
                     }
-                    Trace.TraceInformation("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile);
+                    Trace.TraceInformation("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name);
                 }
                 catch (Exception exc)
                 {
                     Trace.TraceError("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
-                                    action.Action, action.LocalFile,action.CloudFile,exc);
-                    Trace.TraceError(exc.ToString());
+                                    action.Action, action.LocalFile,action.CloudFile,exc);                    
 
-                    _listenerActions.Add(action);
+                    _networkActions.Add(action);
                 }
             }
         }
 
-        private void DownloadCloudFile(string container, string fileName, string localPath)
-        {
-            using (var upstream = CloudClient.GetObject(container, fileName))
-            using (var fileStream = File.OpenWrite(localPath))
-            {
-                upstream.CopyTo(fileStream);
-            }
-        }
+      
 
         private void StartMonitoringFiles(string path)
         {
@@ -402,26 +433,39 @@ namespace Pithos.Core
             _watcher.Renamed += OnRenameEvent;
             _watcher.EnableRaisingEvents = true;
 
-            Task.Factory.StartNew(() =>
-                                      {
-                                          foreach (var state in _fileEvents.GetConsumingEnumerable())
-                                          {
-                                              try
-                                              {
-                                                  UpdateFileStatus(state);
-                                                  UpdateOverlayStatus(state);
-                                                  UpdateFileChecksum(state);
-                                                  _uploadEvents.Add(state);                                                  
-                                              }
-                                              catch (OperationCanceledException)
-                                              {
-                                                  throw;
-                                              }
-                                              catch(Exception ex)
-                                              {}
-                                          }
-                                          
-                                      },_cancellationSource.Token);
+            Task.Factory.StartNew(ProcesFileEvents,_cancellationSource.Token);
+        }
+
+        /// <summary>
+        /// Consumes the queued file events, passes each one through
+        /// <c>UpdateFileStatus</c>, <c>UpdateOverlayStatus</c> and
+        /// <c>UpdateFileChecksum</c> and then adds it to <c>_uploadEvents</c>.
+        /// </summary>
+        /// <remarks>
+        /// Created/Changed events are dropped while the file's network state is
+        /// anything other than <c>NetworkState.None</c>. Every exception is traced;
+        /// only <see cref="OperationCanceledException"/> is rethrown.
+        /// </remarks>
+        private void ProcesFileEvents()
+        {
+            foreach (var fileEvent in _fileEvents.GetConsumingEnumerable())
+            {
+                try
+                {
+                    var transferState = StatusKeeper.GetNetworkState(fileEvent.Path);
+                    //Process the event unless the file is already being downloaded
+                    //or uploaded and the change is create or modify
+                    if (transferState == NetworkState.None ||
+                        (fileEvent.TriggeringChange != WatcherChangeTypes.Created &&
+                         fileEvent.TriggeringChange != WatcherChangeTypes.Changed))
+                    {
+                        UpdateFileStatus(fileEvent);
+                        UpdateOverlayStatus(fileEvent);
+                        UpdateFileChecksum(fileEvent);
+                        _uploadEvents.Add(fileEvent);
+                    }
+                }
+                catch (Exception error)
+                {
+                    Trace.TraceError("[ERROR] File Event Processing:\r{0}", error);
+                    //Cancellation leaves the loop; any other failure only skips this event
+                    if (error is OperationCanceledException)
+                        throw;
+                }
+            }
+        }
 
         private void StartSending()
@@ -439,7 +483,9 @@ namespace Pithos.Core
                                                   throw;
                                               }
                                               catch(Exception ex)
-                                              {}
+                                              {
+                                                  Trace.TraceError("[ERROR] Synch for {0}:\r{1}",state.FileName,ex);
+                                              }
                                           }
                                           
                                       },_cancellationSource.Token);
@@ -450,9 +496,17 @@ namespace Pithos.Core
         {
             if (state.Skip)
                 return state;
-            string path = state.Path;
+            string path = state.Path.ToLower();            
             string fileName = Path.GetFileName(path);
 
+            //Bypass deleted files, unless the status is Deleted
+            if (!(File.Exists(path) || state.Status == FileStatus.Deleted))
+            {
+                state.Skip = true;
+                this.StatusKeeper.RemoveFileOverlayStatus(path);
+                return state;
+            }
+
             switch(state.Status)
             {
                 case FileStatus.Created:
@@ -473,11 +527,11 @@ namespace Pithos.Core
 
         private void RenameCloudFile(WorkflowState state)
         {
-            this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Synch);
+            this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);
 
 
 
-            CloudClient.MoveObject("PITHOS", state.OldFileName, state.FileName);
+            CloudClient.MoveObject(PithosContainer, state.OldFileName,PithosContainer, state.FileName);
 
             this.StatusKeeper.SetFileStatus(state.Path, FileStatus.Unchanged);
             this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Normal);
@@ -488,25 +542,62 @@ namespace Pithos.Core
         {
             Contract.Requires(!Path.IsPathRooted(fileName));
 
-            this.StatusKeeper.SetFileOverlayStatus(fileName, FileOverlayStatus.Synch);
-            CloudClient.DeleteObject("PITHOS", fileName);
+            this.StatusKeeper.SetFileOverlayStatus(fileName, FileOverlayStatus.Modified);
+
+            CloudClient.MoveObject(PithosContainer,fileName,TrashContainer,fileName);
             this.StatusKeeper.ClearFileStatus(fileName);
             this.StatusKeeper.RemoveFileOverlayStatus(fileName);            
         }
 
+        /// <summary>
+        /// Downloads the object <paramref name="fileName"/> from
+        /// <paramref name="container"/> to <paramref name="localPath"/> and, once
+        /// the transfer has finished, stores the object's info in the status keeper.
+        /// </summary>
+        /// <param name="container">Name of the container that holds the object.</param>
+        /// <param name="fileName">Name of the object inside the container.</param>
+        /// <param name="localPath">Path of the local file the object is written to.</param>
+        /// <remarks>
+        /// Returns without doing anything when the status keeper reports a network
+        /// state other than <c>NetworkState.None</c> for <paramref name="fileName"/>.
+        /// A failure of the download or of the info lookup is propagated to the
+        /// caller; the network state of <paramref name="localPath"/> is reset to
+        /// <c>NetworkState.None</c> in either case.
+        /// </remarks>
+        private void DownloadCloudFile(string container, string fileName, string localPath)
+        {
+            //NOTE(review): the busy check is keyed by fileName while the state below
+            //is recorded under localPath - confirm which key StatusKeeper expects
+            var state = StatusKeeper.GetNetworkState(fileName);
+            //Abort if the file is already being uploaded or downloaded
+            if (state != NetworkState.None)
+                return;
+
+            StatusKeeper.SetNetworkState(localPath,NetworkState.Downloading);
+            try
+            {
+                //Wait on the download itself so that a failed transfer throws here
+                //instead of being hidden behind unconditional continuations
+                CloudClient.GetObject(container, fileName, localPath).Wait();
+
+                //Only reached after a successful download
+                var info = CloudClient.GetObjectInfo(container, fileName);
+                StatusKeeper.StoreInfo(fileName, info);
+            }
+            finally
+            {
+                //Never leave the file marked as downloading, even on failure
+                StatusKeeper.SetNetworkState(localPath,NetworkState.None);
+            }
+        }
+
         private void UploadCloudFile(string fileName, long fileSize, string path,string hash)
         {
             Contract.Requires(!Path.IsPathRooted(fileName));
-            //Even if GetObjectInfo times out, we can proceed with the upload
-            var info=CloudClient.GetObjectInfo("PITHOS", fileName);
-            if ( hash != info.Hash)
-            {
-                this.StatusKeeper.SetFileOverlayStatus(path, FileOverlayStatus.Synch);
+            var state=StatusKeeper.GetNetworkState(fileName);
+            //Abort if the file is already being uploaded or downloaded
+            if (state != NetworkState.None)
+                return;
 
-                CloudClient.PutObject("PITHOS", fileName, path, hash).Wait();
-            }
-            this.StatusKeeper.SetFileStatus(path,FileStatus.Unchanged);
-            this.StatusKeeper.SetFileOverlayStatus(path,FileOverlayStatus.Normal);
+            StatusKeeper.SetNetworkState(fileName,NetworkState.Uploading);
+            
+            //Even if GetObjectInfo times out, we can proceed with the upload            
+            var info = CloudClient.GetObjectInfo(PithosContainer, fileName);
+            Task.Factory.StartNew(() =>
+            {
+                if (hash != info.Hash)
+                {
+                    Task.Factory.StartNew(() => 
+                        this.StatusKeeper.SetFileOverlayStatus(path, FileOverlayStatus.Modified))
+                    .ContinueWith(t => 
+                        CloudClient.PutObject(PithosContainer, fileName, path, hash));
+                }
+                else
+                {
+                    this.StatusKeeper.StoreInfo(path,info);
+                }
+            })
+            .ContinueWith(t => 
+                this.StatusKeeper.SetFileState(path, FileStatus.Unchanged, FileOverlayStatus.Normal))
+                .ContinueWith(t=>
+                    this.StatusKeeper.SetNetworkState(path,NetworkState.None))
+            .Wait();
             Workflow.RaiseChangeNotification(path);
         }