Changed the retry function in PithosClient to use the TPL
[pithos-ms-client] / trunk / Pithos.Core / PithosMonitor.cs
index 425d0fe..831c082 100644 (file)
@@ -22,6 +22,9 @@ namespace Pithos.Core
     [Export(typeof(PithosMonitor))]
     public class PithosMonitor:IDisposable
     {
+        private const string PithosContainer = "pithos";
+        private const string TrashContainer = "trash";
+
         [Import]
         public IPithosSettings Settings{get;set;}
 
@@ -84,7 +87,9 @@ namespace Pithos.Core
 
             var proxyUri = ProxyFromSettings();            
             CloudClient.Proxy = proxyUri;
-            //CloudClient.UsePithos = true;
+            CloudClient.UsePithos = this.UsePithos;
+            EnsurePithosContainers();
+            StatusKeeper.StartProcessing(_cancellationSource.Token);
             IndexLocalFiles(RootPath);
             StartMonitoringFiles(RootPath);
 
@@ -93,6 +98,22 @@ namespace Pithos.Core
             StartNetwork();
         }
 
+        private void EnsurePithosContainers()
+        {
+            CloudClient.UsePithos = this.UsePithos;
+            CloudClient.AuthenticationUrl = this.AuthenticationUrl;
+            CloudClient.Authenticate(UserName, ApiKey);
+
+            var pithosContainers = new[] {PithosContainer, TrashContainer};
+            foreach (var container in pithosContainers)
+            {
+                if (!CloudClient.ContainerExists(container))
+                    CloudClient.CreateContainer(container);                
+            }
+        }
+
+        public string AuthenticationUrl { get; set; }
+
         private Uri ProxyFromSettings()
         {            
             if (Settings.UseManualProxy)
@@ -136,7 +157,7 @@ namespace Pithos.Core
 
         private void RestartInterruptedFiles()
         {
-            var interruptedStates = new[] { FileOverlayStatus.Unversioned, FileOverlayStatus.Synch };
+            var interruptedStates = new[] { FileOverlayStatus.Unversioned, FileOverlayStatus.Modified };
             var filesQuery = from state in FileState.Queryable
                              where interruptedStates.Contains(state.OverlayStatus)
                              select new WorkflowState
@@ -205,7 +226,8 @@ namespace Pithos.Core
 
             try
             {
-                CloudClient.UsePithos = false;
+                CloudClient.UsePithos = this.UsePithos;
+                CloudClient.AuthenticationUrl = this.AuthenticationUrl;
                 CloudClient.Authenticate(UserName, ApiKey);
 
                 StartListening(RootPath);
@@ -219,6 +241,8 @@ namespace Pithos.Core
             }
         }
 
+        public bool UsePithos { get; set; }
+
         internal enum CloudActionType
         {
             Upload=0,
@@ -272,7 +296,7 @@ namespace Pithos.Core
             }
         }
 
-        private BlockingCollection<ListenerAction> _listenerActions=new BlockingCollection<ListenerAction>();
+        private BlockingCollection<ListenerAction> _networkActions=new BlockingCollection<ListenerAction>();
 
         private Timer timer;
 
@@ -289,7 +313,7 @@ namespace Pithos.Core
         {
             Trace.TraceInformation("[LISTENER] Scheduled");    
             return Task.Factory.StartNewDelayed(10000)
-                .ContinueWith(t=>CloudClient.ListObjects("PITHOS"))
+                .ContinueWith(t=>CloudClient.ListObjects(PithosContainer))
                 .ContinueWith(task =>
                                   {
                                       Trace.TraceInformation("[LISTENER] Start Processing");
@@ -303,31 +327,32 @@ namespace Pithos.Core
                                       var pithosDir = new DirectoryInfo(accountPath);
                                       
                                       var remoteFiles = from info in remoteObjects
-                                                    select info.Name;
+                                                    select info.Name.ToLower();
                                       
                                       var onlyLocal = from localFile in pithosDir.EnumerateFiles()
-                                                      where !remoteFiles.Contains(localFile.Name) 
-                                                      select new ListenerAction(CloudActionType.UploadUnconditional, localFile,null);
-                                      
-                                      
-                                    
+                                                      where !remoteFiles.Contains(localFile.Name.ToLower()) 
+                                                      select new ListenerAction(CloudActionType.UploadUnconditional, localFile,ObjectInfo.Empty);
+
+
+
+                                      var localNames = from info in pithosDir.EnumerateFiles()
+                                                           select info.Name.ToLower();
 
-                                      var localNames =pithosDir.EnumerateFiles().Select(info => info.Name);
                                       var onlyRemote = from upFile in remoteObjects
-                                                       where !localNames.Contains(upFile.Name)
-                                                       select new ListenerAction(CloudActionType.DownloadUnconditional,null,upFile);
+                                                       where !localNames.Contains(upFile.Name.ToLower())
+                                                       select new ListenerAction(CloudActionType.DownloadUnconditional,new FileInfo(""), upFile);
 
 
                                       var commonObjects = from  upFile in remoteObjects
                                                             join  localFile in pithosDir.EnumerateFiles()
-                                                                on upFile.Name equals localFile.Name 
+                                                                on upFile.Name.ToLower() equals localFile.Name.ToLower() 
                                                             select new ListenerAction(CloudActionType.Download, localFile, upFile);
 
                                       var uniques =
                                           onlyLocal.Union(onlyRemote).Union(commonObjects)
-                                              .Except(_listenerActions,new LocalFileComparer());
+                                              .Except(_networkActions,new LocalFileComparer());
                                       
-                                      _listenerActions.AddFromEnumerable(uniques, false);
+                                      _networkActions.AddFromEnumerable(uniques, false);
 
                                       Trace.TraceInformation("[LISTENER] End Processing");
                                       
@@ -348,9 +373,9 @@ namespace Pithos.Core
 
         private void ProcessListenerActions()
         {
-            foreach(var action in _listenerActions.GetConsumingEnumerable())
+            foreach(var action in _networkActions.GetConsumingEnumerable())
             {
-                Trace.TraceInformation("[ACTION] Start Processing {0}:{1}->{2}",action.Action,action.LocalFile,action.CloudFile);
+                Trace.TraceInformation("[ACTION] Start Processing {0}:{1}->{2}",action.Action,action.LocalFile,action.CloudFile.Name);
                 var localFile = action.LocalFile;
                 var cloudFile = action.CloudFile;
                 var downloadPath = (cloudFile == null)? String.Empty
@@ -360,11 +385,10 @@ namespace Pithos.Core
                     switch (action.Action)
                     {
                         case CloudActionType.UploadUnconditional:
-
                             UploadCloudFile(localFile.Name,localFile.Length,localFile.FullName,action.LocalHash.Value);
                             break;
                         case CloudActionType.DownloadUnconditional:
-                            DownloadCloudFile("PITHOS",cloudFile.Name,downloadPath);
+                            DownloadCloudFile(PithosContainer,cloudFile.Name,downloadPath);
                             break;
                         case CloudActionType.Download:
                             if (File.Exists(downloadPath))
@@ -379,33 +403,26 @@ namespace Pithos.Core
                                         StatusKeeper.SetFileOverlayStatus(downloadPath,FileOverlayStatus.Conflict);
                                     }
                                     else
-                                        DownloadCloudFile("PITHOS",action.CloudFile.Name,downloadPath);
+                                        DownloadCloudFile(PithosContainer,action.CloudFile.Name,downloadPath);
                                 }
                             }
                             else
-                                DownloadCloudFile("PITHOS",action.CloudFile.Name,downloadPath);
+                                DownloadCloudFile(PithosContainer,action.CloudFile.Name,downloadPath);
                             break;
                     }
-                    Trace.TraceInformation("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile);
+                    Trace.TraceInformation("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name);
                 }
                 catch (Exception exc)
                 {
                     Trace.TraceError("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
                                     action.Action, action.LocalFile,action.CloudFile,exc);                    
 
-                    _listenerActions.Add(action);
+                    _networkActions.Add(action);
                 }
             }
         }
 
-        private void DownloadCloudFile(string container, string fileName, string localPath)
-        {
-            using (var upstream = CloudClient.GetObject(container, fileName))
-            using (var fileStream = File.OpenWrite(localPath))
-            {
-                upstream.CopyTo(fileStream);
-            }
-        }
+      
 
         private void StartMonitoringFiles(string path)
         {
@@ -416,26 +433,39 @@ namespace Pithos.Core
             _watcher.Renamed += OnRenameEvent;
             _watcher.EnableRaisingEvents = true;
 
-            Task.Factory.StartNew(() =>
-                                      {
-                                          foreach (var state in _fileEvents.GetConsumingEnumerable())
-                                          {
-                                              try
-                                              {
-                                                  UpdateFileStatus(state);
-                                                  UpdateOverlayStatus(state);
-                                                  UpdateFileChecksum(state);
-                                                  _uploadEvents.Add(state);                                                  
-                                              }
-                                              catch (OperationCanceledException)
-                                              {
-                                                  throw;
-                                              }
-                                              catch(Exception ex)
-                                              {}
-                                          }
-                                          
-                                      },_cancellationSource.Token);
+            Task.Factory.StartNew(ProcesFileEvents,_cancellationSource.Token);
+        }
+
+        private void ProcesFileEvents()
+        {
+            foreach (var state in _fileEvents.GetConsumingEnumerable())
+            {
+                try
+                {
+                    var networkState=StatusKeeper.GetNetworkState(state.Path);
+                    //Skip if the file is already being downloaded or uploaded and 
+                    //the change is create or modify
+                    if (networkState != NetworkState.None && 
+                        (
+                            state.TriggeringChange==WatcherChangeTypes.Created ||
+                            state.TriggeringChange==WatcherChangeTypes.Changed
+                        ))
+                        continue;
+                    UpdateFileStatus(state);
+                    UpdateOverlayStatus(state);
+                    UpdateFileChecksum(state);
+                    _uploadEvents.Add(state);
+                }
+                catch (OperationCanceledException exc)
+                {
+                    Trace.TraceError("[ERROR] File Event Processing:\r{0}", exc);
+                    throw;
+                }
+                catch (Exception exc)
+                {
+                    Trace.TraceError("[ERROR] File Event Processing:\r{0}",exc);
+                }
+            }
         }
 
         private void StartSending()
@@ -466,9 +496,17 @@ namespace Pithos.Core
         {
             if (state.Skip)
                 return state;
-            string path = state.Path;
+            string path = state.Path.ToLower();            
             string fileName = Path.GetFileName(path);
 
+            //Bypass deleted files, unless the status is Deleted
+            if (!(File.Exists(path) || state.Status == FileStatus.Deleted))
+            {
+                state.Skip = true;
+                this.StatusKeeper.RemoveFileOverlayStatus(path);
+                return state;
+            }
+
             switch(state.Status)
             {
                 case FileStatus.Created:
@@ -489,11 +527,11 @@ namespace Pithos.Core
 
         private void RenameCloudFile(WorkflowState state)
         {
-            this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Synch);
+            this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);
 
 
 
-            CloudClient.MoveObject("PITHOS", state.OldFileName, state.FileName);
+            CloudClient.MoveObject(PithosContainer, state.OldFileName,PithosContainer, state.FileName);
 
             this.StatusKeeper.SetFileStatus(state.Path, FileStatus.Unchanged);
             this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Normal);
@@ -504,32 +542,61 @@ namespace Pithos.Core
         {
             Contract.Requires(!Path.IsPathRooted(fileName));
 
-            this.StatusKeeper.SetFileOverlayStatus(fileName, FileOverlayStatus.Synch);
-            CloudClient.DeleteObject("PITHOS", fileName);
+            this.StatusKeeper.SetFileOverlayStatus(fileName, FileOverlayStatus.Modified);
+
+            CloudClient.MoveObject(PithosContainer,fileName,TrashContainer,fileName);
             this.StatusKeeper.ClearFileStatus(fileName);
             this.StatusKeeper.RemoveFileOverlayStatus(fileName);            
         }
 
+        private void DownloadCloudFile(string container, string fileName, string localPath)
+        {
+            var state = StatusKeeper.GetNetworkState(fileName);
+            //Abort if the file is already being uploaded or downloaded
+            if (state != NetworkState.None)
+                return;
+
+            StatusKeeper.SetNetworkState(localPath,NetworkState.Downloading);
+            CloudClient.GetObject(container, fileName, localPath)
+            .ContinueWith(t=>
+                CloudClient.GetObjectInfo(container,fileName))
+            .ContinueWith(t=>
+                StatusKeeper.StoreInfo(fileName,t.Result))
+            .ContinueWith(t=>
+                StatusKeeper.SetNetworkState(localPath,NetworkState.None))
+            .Wait();
+        }
+
         private void UploadCloudFile(string fileName, long fileSize, string path,string hash)
         {
             Contract.Requires(!Path.IsPathRooted(fileName));
-            //Even if GetObjectInfo times out, we can proceed with the upload
-            var info = CloudClient.GetObjectInfo("PITHOS", fileName);
+            var state=StatusKeeper.GetNetworkState(fileName);
+            //Abort if the file is already being uploaded or downloaded
+            if (state != NetworkState.None)
+                return;
+
+            StatusKeeper.SetNetworkState(fileName,NetworkState.Uploading);
+            
+            //Even if GetObjectInfo times out, we can proceed with the upload            
+            var info = CloudClient.GetObjectInfo(PithosContainer, fileName);
             Task.Factory.StartNew(() =>
             {
                 if (hash != info.Hash)
                 {
                     Task.Factory.StartNew(() => 
-                        this.StatusKeeper.SetFileOverlayStatus(path, FileOverlayStatus.Synch))
+                        this.StatusKeeper.SetFileOverlayStatus(path, FileOverlayStatus.Modified))
                     .ContinueWith(t => 
-                        CloudClient.PutObject("PITHOS", fileName, path, hash));
+                        CloudClient.PutObject(PithosContainer, fileName, path, hash));
+                }
+                else
+                {
+                    this.StatusKeeper.StoreInfo(path,info);
                 }
-            }
-            )
-            .ContinueWith(t =>{
-                        this.StatusKeeper.SetFileStatus(path, FileStatus.Unchanged);
-                        this.StatusKeeper.SetFileOverlayStatus(path, FileOverlayStatus.Normal);
             })
+            .ContinueWith(t => 
+                this.StatusKeeper.SetFileState(path, FileStatus.Unchanged, FileOverlayStatus.Normal))
+                .ContinueWith(t=>
+                    this.StatusKeeper.SetNetworkState(path,NetworkState.None))
             .Wait();
             Workflow.RaiseChangeNotification(path);
         }