Convert ActiveRecord update code to direct ADO calls to reduce locks
[pithos-ms-client] / trunk / Pithos.Core / Agents / WorkflowAgent.cs
index 6553636..1112076 100644 (file)
@@ -1,4 +1,41 @@
-using System;
+// -----------------------------------------------------------------------
+// <copyright file="WorkflowAgent.cs" company="GRNET">
+// Copyright 2011 GRNET S.A. All rights reserved.
+// 
+// Redistribution and use in source and binary forms, with or
+// without modification, are permitted provided that the following
+// conditions are met:
+// 
+//   1. Redistributions of source code must retain the above
+//      copyright notice, this list of conditions and the following
+//      disclaimer.
+// 
+//   2. Redistributions in binary form must reproduce the above
+//      copyright notice, this list of conditions and the following
+//      disclaimer in the documentation and/or other materials
+//      provided with the distribution.
+// 
+// THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+// OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+// USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+// AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+// LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+// ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+// POSSIBILITY OF SUCH DAMAGE.
+// 
+// The views and conclusions contained in the software and
+// documentation are those of the authors and should not be
+// interpreted as representing official policies, either expressed
+// or implied, of GRNET S.A.
+// </copyright>
+// -----------------------------------------------------------------------
+
+using System;
 using System.Collections.Generic;
 using System.ComponentModel.Composition;
 using System.Diagnostics;
@@ -7,7 +44,10 @@ using System.IO;
 using System.Linq;
 using System.Text;
 using System.Threading.Tasks;
+using Castle.ActiveRecord;
 using Pithos.Interfaces;
+using Pithos.Network;
+using log4net;
 
 namespace Pithos.Core.Agents
 {
@@ -17,16 +57,14 @@ namespace Pithos.Core.Agents
         Agent<WorkflowState> _agent;
                 
         public IStatusNotification StatusNotification { get; set; }
-        [Import]
+        [System.ComponentModel.Composition.Import]
         public IStatusKeeper StatusKeeper { get; set; }
 
-        //We should avoid processing files stored in the Fragments folder
-        //The Full path to the fragments folder is stored in FragmentsPath
-        public string FragmentsPath { get; set; }
-
-        [Import]
+        [System.ComponentModel.Composition.Import]
         public NetworkAgent NetworkAgent { get; set; }
 
+        private static readonly ILog Log = LogManager.GetLogger("WorkflowAgent");
+
         public void Start()
         {
             _agent = Agent<WorkflowState>.Start(inbox =>
@@ -37,7 +75,7 @@ namespace Pithos.Core.Agents
                     var message = inbox.Receive();
                     var process = message.Then(Process, inbox.CancellationToken);                        
                     inbox.LoopAsync(process,loop,ex=>
-                            Trace.TraceError("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex));
+                            Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex));
                 };
                 loop();
             });
@@ -45,78 +83,136 @@ namespace Pithos.Core.Agents
 
         private Task<object> Process(WorkflowState state)
         {
-            if (state.Skip)
-                return CompletedTask<object>.Default;
-            string path = state.Path.ToLower();
-            string fileName = Path.GetFileName(path);
-
-            //Bypass deleted files, unless the status is Deleted
-            if (!File.Exists(path) && state.Status != FileStatus.Deleted)
+            var accountInfo = state.AccountInfo;
+            using (log4net.ThreadContext.Stacks["Workflow"].Push("Process"))
             {
-                state.Skip = true;
-                this.StatusKeeper.RemoveFileOverlayStatus(path);
-                return CompletedTask<object>.Default;
-            }
-            var fileState = FileState.FindByFilePath(path);
-            var blockHash = NetworkAgent.BlockHash;
-            var blockSize = NetworkAgent.BlockSize;
+                try
+                {
 
-            switch (state.Status)
-            {
-                case FileStatus.Created:
-                case FileStatus.Modified:
-                    var info = new FileInfo(path);
-                    NetworkAgent.Post(new CloudAction(CloudActionType.UploadUnconditional, info, ObjectInfo.Empty, fileState,blockSize,blockHash));
-                    break;
-                case FileStatus.Deleted:
-                    NetworkAgent.Post(new CloudAction(CloudActionType.DeleteCloud, null, new ObjectInfo { Name = fileName }, fileState, blockSize, blockHash));
-                    break;
-                case FileStatus.Renamed:
-                    NetworkAgent.Post(new CloudAction(CloudActionType.RenameCloud, state.OldFileName, state.OldPath, state.FileName, state.Path));
-                    break;
+                    if (Log.IsDebugEnabled)
+                        Log.DebugFormat("State {0} {1} {2}", state.FileName, state.Status, state.TriggeringChange);
+
+                    if (state.Skip)
+                    {
+                        if (Log.IsDebugEnabled) Log.DebugFormat("Skipping {0}", state.FileName);
+
+                        return CompletedTask<object>.Default;
+                    }
+                    string path = state.Path.ToLower();
+
+
+
+                    FileSystemInfo info = Directory.Exists(path) ? (FileSystemInfo) new DirectoryInfo(path) : new FileInfo(path);
+
+                    //Skip entries whose path no longer exists on disk, unless the change being processed is the deletion itself
+                    if (!info.Exists && state.Status != FileStatus.Deleted)
+                    {
+                        state.Skip = true;
+                        this.StatusKeeper.ClearFileStatus(path);
+
+                        if (Log.IsDebugEnabled) Log.DebugFormat("Skipped missing {0}", state.FileName);
+
+                        return CompletedTask<object>.Default;
+                    }
+
+                    using (new SessionScope(FlushAction.Never))
+                    {
+
+                        var fileState = FileState.FindByFilePath(path);
+
+
+                        switch (state.Status)
+                        {
+                            case FileStatus.Created:
+                            case FileStatus.Modified:
+                                NetworkAgent.Post(new CloudUploadAction(accountInfo, info, fileState,
+                                                                        accountInfo.BlockSize,
+                                                                        accountInfo.BlockHash));
+                                break;
+                            case FileStatus.Deleted:
+                                NetworkAgent.Post(new CloudDeleteAction(accountInfo, info, fileState));
+                                break;
+                            case FileStatus.Renamed:
+                                FileSystemInfo oldInfo = Directory.Exists(state.OldPath)
+                                                             ? (FileSystemInfo) new DirectoryInfo(state.OldPath)
+                                                             : new FileInfo(state.OldPath);
+                                FileSystemInfo newInfo = Directory.Exists(state.Path)
+                                                             ? (FileSystemInfo) new DirectoryInfo(state.Path)
+                                                             : new FileInfo(state.Path);
+                                NetworkAgent.Post(new CloudMoveAction(accountInfo, CloudActionType.RenameCloud,
+                                                                      oldInfo,
+                                                                      newInfo));
+                                break;
+                        }
+                    }
+
+                    return CompletedTask<object>.Default;
+                }
+                catch (Exception ex)
+                {
+                    Log.Error(ex.ToString());
+                    throw;
+                }
             }
-
-            return CompletedTask<object>.Default;
         }
 
 
-
-        public void RestartInterruptedFiles()
+        //Restarts processing of interrupted files for a specific account
+        public void RestartInterruptedFiles(AccountInfo accountInfo)
         {
             
-            StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);            
-
-            var pendingEntries = from state in FileState.Queryable
-                                   where state.FileStatus != FileStatus.Unchanged &&
-                                         !state.FilePath.StartsWith(FragmentsPath.ToLower()) &&
-                                         !state.FilePath.EndsWith(".ignore")
-                                   select state;
-
-            var validEntries = from state in pendingEntries
-                             select new WorkflowState
-                             {
-                                 Path = state.FilePath.ToLower(),
-                                 FileName = Path.GetFileName(state.FilePath).ToLower(),
-                                 Hash = state.Checksum,
-                                 Status = state.OverlayStatus == FileOverlayStatus.Unversioned ?
-                                                   FileStatus.Created :
-                                                   state.FileStatus,
-                                 TriggeringChange = state.OverlayStatus == FileOverlayStatus.Unversioned ?
-                                                   WatcherChangeTypes.Created :
-                                                   WatcherChangeTypes.Changed
-                             };
-            foreach (var entry in validEntries)
+            StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);
+
+            using (log4net.ThreadContext.Stacks["Workflow"].Push("Restart"))
             {
-                Post(entry);
-            }            
+                if (Log.IsDebugEnabled)
+                    Log.Debug("Starting interrupted files");
+
+                var cachePath = Path.Combine(accountInfo.AccountPath, FolderConstants.CacheFolder)
+                    .ToLower();
+
+
+
+                var account = accountInfo;
+                var pendingEntries = from state in FileState.Queryable
+                                     where state.FileStatus != FileStatus.Unchanged &&
+                                           !state.FilePath.StartsWith(cachePath) &&
+                                           !state.FilePath.EndsWith(".ignore") &&
+                                           state.FilePath.StartsWith(account.AccountPath)
+                                     select state;
+                var pendingStates = new List<WorkflowState>();
+                foreach (var state in pendingEntries)
+                {
+                        pendingStates.Add(new WorkflowState(account, state));
+                }
+                if (Log.IsDebugEnabled)
+                    Log.DebugFormat("Found {0} interrupted files", pendingStates.Count);
+
+
+                foreach (var entry in pendingStates)
+                {
+                       Post(entry);
+                }
+            }
+        }
 
-        }       
 
-       
 
         public void Post(WorkflowState workflowState)
         {
+            if (Log.IsDebugEnabled)
+                Log.DebugFormat("Posted {0} {1} {2}", workflowState.Path, workflowState.Status, workflowState.TriggeringChange);
+
+            //Remove invalid state            
+            //The directory check below is disabled, so directory paths are posted as well
+           /* if (Directory.Exists(workflowState.Path))
+                return;*/
+            //TODO: Need to handle folder renames            
+
+            Debug.Assert(workflowState.Path.StartsWith(workflowState.AccountInfo.AccountPath, StringComparison.InvariantCultureIgnoreCase), "File from wrong account posted");
+
             _agent.Post(workflowState);
-        }
+        }     
+
     }
 }