Multiple changes to enable delete detection, safer uploading
[pithos-ms-client] / trunk / Pithos.Core / Agents / WorkflowAgent.cs
index beead9d..6553636 100644 (file)
@@ -6,6 +6,7 @@ using System.Diagnostics.Contracts;
 using System.IO;
 using System.Linq;
 using System.Text;
+using System.Threading.Tasks;
 using Pithos.Interfaces;
 
 namespace Pithos.Core.Agents
@@ -34,99 +35,83 @@ namespace Pithos.Core.Agents
                 loop = () =>
                 {
                     var message = inbox.Receive();
-                    var process = message.ContinueWith(t =>
-                    {
-                        var state = t.Result;
-                        Process(state);
-                        inbox.DoAsync(loop);
-                    });
-                    process.ContinueWith(t =>
-                    {
-                        inbox.DoAsync(loop);
-                        if (t.IsFaulted)
-                        {
-                            var ex = t.Exception.InnerException;
-                            if (ex is OperationCanceledException)
-                                inbox.Stop();
-                            Trace.TraceError("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex);
-                        }
-                    });
-
+                    var process = message.Then(Process, inbox.CancellationToken);                        
+                    inbox.LoopAsync(process,loop,ex=>
+                            Trace.TraceError("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex));
                 };
                 loop();
             });
         }
 
-        public void RestartInterruptedFiles()
-        {
-            
-            StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);
-            var interruptedStates = new[] { FileOverlayStatus.Unversioned, FileOverlayStatus.Modified };
-
-            var pendingEntries = (from state in FileState.Queryable
-                                 where interruptedStates.Contains(state.OverlayStatus) &&
-                                       !state.FilePath.StartsWith(FragmentsPath) &&
-                                       !state.FilePath.EndsWith(".ignore")
-                                 select state).ToList();
-            var staleEntries = from state in pendingEntries
-                                  where !File.Exists(state.FilePath)
-                                  select state;
-            var staleKeys = staleEntries.Select(state=>state.Id);
-            FileState.DeleteAll(staleKeys);
-
-            var validEntries = from state in pendingEntries.Except(staleEntries)
-                             where File.Exists(state.FilePath)
-                             select new WorkflowState
-                             {
-                                 Path = state.FilePath.ToLower(),
-                                 FileName = Path.GetFileName(state.FilePath).ToLower(),
-                                 Hash = state.Checksum,
-                                 Status = state.OverlayStatus == FileOverlayStatus.Unversioned ?
-                                                   FileStatus.Created :
-                                                   FileStatus.Modified,
-                                 TriggeringChange = state.OverlayStatus == FileOverlayStatus.Unversioned ?
-                                                   WatcherChangeTypes.Created :
-                                                   WatcherChangeTypes.Changed
-                             };
-
-            _agent.AddFromEnumerable(validEntries);
-
-
-        }       
-
-        private void Process(WorkflowState state)
+        private Task<object> Process(WorkflowState state) // Turns one workflow state into a CloudAction posted to NetworkAgent; returns a Task so the receive loop can chain it with message.Then(Process, ...).
         {
             if (state.Skip)
-                return;
+                return CompletedTask<object>.Default; // Nothing to do for states already flagged as skipped (presumably Default is an already-completed task - confirm against CompletedTask<T>).
             string path = state.Path.ToLower();
             string fileName = Path.GetFileName(path); // Only used as the ObjectInfo name in the Deleted case below.
 
             //Bypass files that no longer exist on disk, unless the status is Deleted
-            if (!(File.Exists(path) || state.Status != FileStatus.Deleted))
+            if (!File.Exists(path) && state.Status != FileStatus.Deleted) // A missing file is only expected when the state itself reports a deletion.
             {
                 state.Skip = true;
                 this.StatusKeeper.RemoveFileOverlayStatus(path);
-                return;
+                return CompletedTask<object>.Default;
             }
+            var fileState = FileState.FindByFilePath(path); // Stored state for this path; handed to the upload/delete actions below.
+            var blockHash = NetworkAgent.BlockHash; // Block hash and size are read from NetworkAgent and passed along with each upload/delete action.
+            var blockSize = NetworkAgent.BlockSize;
 
             switch (state.Status)
             {
                 case FileStatus.Created:
                 case FileStatus.Modified: // Created and Modified share one path: both post an UploadUnconditional action.
                     var info = new FileInfo(path);
-                    NetworkAgent.Post(new CloudAction(CloudActionType.UploadUnconditional, info, ObjectInfo.Empty));
+                    NetworkAgent.Post(new CloudAction(CloudActionType.UploadUnconditional, info, ObjectInfo.Empty, fileState,blockSize,blockHash));
                     break;
                 case FileStatus.Deleted: // No local file info is passed (null); the cloud object is identified by file name only.
-                    NetworkAgent.Post(new CloudAction(CloudActionType.DeleteCloud, null, new ObjectInfo {Name=fileName}));                    
+                    NetworkAgent.Post(new CloudAction(CloudActionType.DeleteCloud, null, new ObjectInfo { Name = fileName }, fileState, blockSize, blockHash));
                     break;
                 case FileStatus.Renamed: // Uses the old/new name and path carried by the state; fileState and block settings are not passed here.
-                    NetworkAgent.Post(new CloudAction(CloudActionType.RenameCloud, state.OldFileName,state.OldPath,state.FileName,state.Path));                    
+                    NetworkAgent.Post(new CloudAction(CloudActionType.RenameCloud, state.OldFileName, state.OldPath, state.FileName, state.Path));
                     break;
             } // Any other status falls through without posting an action.
 
-            return;
+            return CompletedTask<object>.Default;
         }
 
+
+
+        public void RestartInterruptedFiles() // Re-posts a WorkflowState for every stored FileState whose FileStatus is not Unchanged, skipping fragment files and ".ignore" files.
+        {
+            
+            StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);            
+
+            var pendingEntries = from state in FileState.Queryable // Query over the stored file states; it is only enumerated by the foreach below.
+                                   where state.FileStatus != FileStatus.Unchanged &&
+                                         !state.FilePath.StartsWith(FragmentsPath.ToLower()) && // Presumably stored paths are lower-case, hence the lower-cased prefix - TODO confirm.
+                                         !state.FilePath.EndsWith(".ignore")
+                                   select state;
+
+            var validEntries = from state in pendingEntries // Projects each pending state into the message type consumed by Post.
+                             select new WorkflowState
+                             {
+                                 Path = state.FilePath.ToLower(),
+                                 FileName = Path.GetFileName(state.FilePath).ToLower(),
+                                 Hash = state.Checksum,
+                                 Status = state.OverlayStatus == FileOverlayStatus.Unversioned ? // Unversioned entries are treated as newly created; all others keep their stored FileStatus.
+                                                   FileStatus.Created :
+                                                   state.FileStatus,
+                                 TriggeringChange = state.OverlayStatus == FileOverlayStatus.Unversioned ? // NOTE(review): every non-unversioned entry gets Changed, whatever its stored FileStatus - confirm this is intended.
+                                                   WatcherChangeTypes.Created :
+                                                   WatcherChangeTypes.Changed
+                             };
+            foreach (var entry in validEntries) // Each entry is posted individually through Post.
+            {
+                Post(entry);
+            }            
+
+        }       
+
        
 
         public void Post(WorkflowState workflowState)