Removed Dataflow code
[pithos-ms-client] / trunk / Pithos.Core / Agents / WorkflowAgent.cs
index ba0f3a6..bc412b1 100644 (file)
@@ -7,7 +7,6 @@ using System.IO;
 using System.Linq;
 using System.Text;
 using System.Threading.Tasks;
-using System.Threading.Tasks.Dataflow;
 using Pithos.Interfaces;
 using Pithos.Network;
 using log4net;
@@ -17,7 +16,7 @@ namespace Pithos.Core.Agents
     [Export]
     public class WorkflowAgent
     {
-        private ActionBlock<WorkflowState> _agent;
+        Agent<WorkflowState> _agent;
                 
         public IStatusNotification StatusNotification { get; set; }
         [Import]
@@ -26,44 +25,25 @@ namespace Pithos.Core.Agents
         [Import]
         public NetworkAgent NetworkAgent { get; set; }
 
-        public ActionBlock<WorkflowState> Agent
-        {
-            get { return _agent; }
-        }
-
         private static readonly ILog Log = LogManager.GetLogger("WorkflowAgent");
 
         public void Start()
         {
-            /*_agent = new ActionBlock<WorkflowState>(message =>
+            _agent = Agent<WorkflowState>.Start(inbox =>
             {
                 Action loop = null;
                 loop = () =>
                 {
-                    //var message = inbox.Receive();
-                    Process(message);
+                    var message = inbox.Receive();
                     var process = message.Then(Process, inbox.CancellationToken);                        
                     inbox.LoopAsync(process,loop,ex=>
                             Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex));
                 };
                 loop();
-            });*/
-            _agent = new ActionBlock<WorkflowState>(async message =>
-            {
-                try
-                {
-                    var action=await TaskEx.Run(()=>Process(message));
-                    if (action!=null)
-                        NetworkAgent.Post(action);
-                }
-                catch (Exception ex)
-                {
-                    Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", message.FileName, ex);                    
-                }
-            });            
+            });
         }
 
-        private CloudAction Process(WorkflowState state)
+        private Task<object> Process(WorkflowState state)
         {
             var accountInfo = state.AccountInfo;
             using (log4net.ThreadContext.Stacks["Workflow"].Push("Process"))
@@ -74,7 +54,7 @@ namespace Pithos.Core.Agents
                 {
                     if (Log.IsDebugEnabled) Log.DebugFormat("Skipping {0}",state.FileName);
                     
-                    return null;
+                    return CompletedTask<object>.Default;
                 }
                 string path = state.Path.ToLower();
 
@@ -86,7 +66,7 @@ namespace Pithos.Core.Agents
                     
                     if (Log.IsDebugEnabled) Log.DebugFormat("Skipped missing {0}", state.FileName);
 
-                    return null;
+                    return CompletedTask<object>.Default;
                 }
 
                 var fileState = FileState.FindByFilePath(path);
@@ -97,16 +77,19 @@ namespace Pithos.Core.Agents
                 {
                     case FileStatus.Created:
                     case FileStatus.Modified:
-                        return new CloudUploadAction(accountInfo,info, fileState, accountInfo.BlockSize,
-                            accountInfo.BlockHash);
+                        NetworkAgent.Post(new CloudUploadAction(accountInfo,info, fileState, accountInfo.BlockSize,
+                            accountInfo.BlockHash));
+                        break;
                     case FileStatus.Deleted:                       
-                        return new CloudDeleteAction(accountInfo,info, fileState);
+                        NetworkAgent.Post(new CloudDeleteAction(accountInfo,info, fileState));
+                        break;
                     case FileStatus.Renamed:
-                        return new CloudMoveAction(accountInfo,CloudActionType.RenameCloud, 
-                            new FileInfo(state.OldPath),
-                            new FileInfo(state.Path));
+                        NetworkAgent.Post(new CloudMoveAction(accountInfo,CloudActionType.RenameCloud, new FileInfo(state.OldPath),
+                                                              new FileInfo(state.Path)));
+                        break;
                 }
-                return null;
+
+                return CompletedTask<object>.Default;
             }
         }
 
@@ -126,44 +109,52 @@ namespace Pithos.Core.Agents
                     .ToLower();
 
 
+
+                var account = accountInfo;
                 var pendingEntries = from state in FileState.Queryable
                                      where state.FileStatus != FileStatus.Unchanged &&
                                            !state.FilePath.StartsWith(cachePath) &&
                                            !state.FilePath.EndsWith(".ignore") &&
-                                           state.FilePath.StartsWith(accountInfo.AccountPath)
+                                           state.FilePath.StartsWith(account.AccountPath)
                                      select state;
+                var pendingStates = new List<WorkflowState>();
+                foreach (var state in pendingEntries)
+                {
+                    pendingStates.Add(new WorkflowState(account, state));
+                }
                 if (Log.IsDebugEnabled)
-                    Log.DebugFormat("Found {0} interrupted files",pendingEntries.Count());
-
-                var validEntries = from state in pendingEntries
-                                   select new WorkflowState
-                                              {
-                                                  AccountInfo=accountInfo,
-                                                  Path = state.FilePath.ToLower(),
-                                                  FileName = Path.GetFileName(state.FilePath).ToLower(),
-                                                  Hash = state.Checksum,
-                                                  Status = state.OverlayStatus == FileOverlayStatus.Unversioned
-                                                               ? FileStatus.Created
-                                                               : state.FileStatus,
-                                                  TriggeringChange =
-                                                      state.OverlayStatus == FileOverlayStatus.Unversioned
-                                                          ? WatcherChangeTypes.Created
-                                                          : WatcherChangeTypes.Changed
-                                              };
-                foreach (var entry in validEntries)
-                {                    
-                    Post(entry);
+                    Log.DebugFormat("Found {0} interrupted files", pendingStates.Count);
+
+
+                foreach (var entry in pendingStates)
+                {
+                    //A state whose path is a directory is invalid: clear it and skip it.
+                    if (Directory.Exists(entry.Path))
+                    {
+                        Debug.Assert(false, "State has path instead of file", entry.Path);
+                        StatusKeeper.ClearFileStatus(entry.Path);
+                        //continue, not return: the remaining entries must still be posted
+                        continue;
+                    }
+                    Post(entry);
                 }
             }
-        }       
+        }
+
 
-       
 
         public void Post(WorkflowState workflowState)
         {
             if (Log.IsDebugEnabled)
                 Log.DebugFormat("Posted {0} {1} {2}", workflowState.Path, workflowState.Status, workflowState.TriggeringChange);
-            Agent.Post(workflowState);
-        }
+
+            //Remove invalid state            
+            Debug.Assert(!Directory.Exists(workflowState.Path), "State has path instead of file", workflowState.Path);
+
+            Debug.Assert(workflowState.Path.StartsWith(workflowState.AccountInfo.AccountPath, StringComparison.InvariantCultureIgnoreCase), "File from wrong account posted");
+
+            _agent.Post(workflowState);
+        }     
+
     }
 }