X-Git-Url: https://code.grnet.gr/git/pithos-ms-client/blobdiff_plain/0af3141d74dcd7b7a75fcaa51414e52691c1f5fe..e81dd1f6fd93a8c094dff04db580385029fc8ee4:/trunk/Pithos.Core/Agents/WorkflowAgent.cs?ds=sidebyside diff --git a/trunk/Pithos.Core/Agents/WorkflowAgent.cs b/trunk/Pithos.Core/Agents/WorkflowAgent.cs index 6553636..1112076 100644 --- a/trunk/Pithos.Core/Agents/WorkflowAgent.cs +++ b/trunk/Pithos.Core/Agents/WorkflowAgent.cs @@ -1,4 +1,41 @@ -using System; +// ----------------------------------------------------------------------- +// +// Copyright 2011 GRNET S.A. All rights reserved. +// +// Redistribution and use in source and binary forms, with or +// without modification, are permitted provided that the following +// conditions are met: +// +// 1. Redistributions of source code must retain the above +// copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following +// disclaimer in the documentation and/or other materials +// provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS +// OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF +// USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED +// AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +// LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +// ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. +// +// The views and conclusions contained in the software and +// documentation are those of the authors and should not be +// interpreted as representing official policies, either expressed +// or implied, of GRNET S.A. +// +// ----------------------------------------------------------------------- + +using System; using System.Collections.Generic; using System.ComponentModel.Composition; using System.Diagnostics; @@ -7,7 +44,10 @@ using System.IO; using System.Linq; using System.Text; using System.Threading.Tasks; +using Castle.ActiveRecord; using Pithos.Interfaces; +using Pithos.Network; +using log4net; namespace Pithos.Core.Agents { @@ -17,16 +57,14 @@ namespace Pithos.Core.Agents Agent _agent; public IStatusNotification StatusNotification { get; set; } - [Import] + [System.ComponentModel.Composition.Import] public IStatusKeeper StatusKeeper { get; set; } - //We should avoid processing files stored in the Fragments folder - //The Full path to the fragments folder is stored in FragmentsPath - public string FragmentsPath { get; set; } - - [Import] + [System.ComponentModel.Composition.Import] public NetworkAgent NetworkAgent { get; set; } + private static readonly ILog Log = LogManager.GetLogger("WorkflowAgent"); + public void Start() { _agent = Agent.Start(inbox => @@ -37,7 +75,7 @@ namespace Pithos.Core.Agents var message = inbox.Receive(); var process = message.Then(Process, inbox.CancellationToken); inbox.LoopAsync(process,loop,ex=> - Trace.TraceError("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex)); + Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex)); }; loop(); }); @@ -45,78 +83,136 @@ namespace Pithos.Core.Agents private Task Process(WorkflowState state) { - if (state.Skip) - return CompletedTask.Default; - string path = state.Path.ToLower(); - string fileName = Path.GetFileName(path); - - //Bypass deleted files, unless the status is Deleted - if (!File.Exists(path) && state.Status != FileStatus.Deleted) + var accountInfo = state.AccountInfo; + using (log4net.ThreadContext.Stacks["Workflow"].Push("Process")) { - state.Skip = true; - this.StatusKeeper.RemoveFileOverlayStatus(path); - return CompletedTask.Default; - } - var fileState = FileState.FindByFilePath(path); - var blockHash = NetworkAgent.BlockHash; - var blockSize = NetworkAgent.BlockSize; + try + { - switch (state.Status) - { - case FileStatus.Created: - case FileStatus.Modified: - var info = new FileInfo(path); - NetworkAgent.Post(new CloudAction(CloudActionType.UploadUnconditional, info, ObjectInfo.Empty, fileState,blockSize,blockHash)); - break; - case FileStatus.Deleted: - NetworkAgent.Post(new CloudAction(CloudActionType.DeleteCloud, null, new ObjectInfo { Name = fileName }, fileState, blockSize, blockHash)); - break; - case FileStatus.Renamed: - NetworkAgent.Post(new CloudAction(CloudActionType.RenameCloud, state.OldFileName, state.OldPath, state.FileName, state.Path)); - break; + if (Log.IsDebugEnabled) + Log.DebugFormat("State {0} {1} {2}", state.FileName, state.Status, state.TriggeringChange); + + if (state.Skip) + { + if (Log.IsDebugEnabled) Log.DebugFormat("Skipping {0}", state.FileName); + + return CompletedTask.Default; + } + string path = state.Path.ToLower(); + + + + FileSystemInfo info = Directory.Exists(path) ? (FileSystemInfo) new DirectoryInfo(path) : new FileInfo(path); + + //Bypass deleted files, unless the status is Deleted + if (!info.Exists && state.Status != FileStatus.Deleted) + { + state.Skip = true; + this.StatusKeeper.ClearFileStatus(path); + + if (Log.IsDebugEnabled) Log.DebugFormat("Skipped missing {0}", state.FileName); + + return CompletedTask.Default; + } + + using (new SessionScope(FlushAction.Never)) + { + + var fileState = FileState.FindByFilePath(path); + + + switch (state.Status) + { + case FileStatus.Created: + case FileStatus.Modified: + NetworkAgent.Post(new CloudUploadAction(accountInfo, info, fileState, + accountInfo.BlockSize, + accountInfo.BlockHash)); + break; + case FileStatus.Deleted: + NetworkAgent.Post(new CloudDeleteAction(accountInfo, info, fileState)); + break; + case FileStatus.Renamed: + FileSystemInfo oldInfo = Directory.Exists(state.OldPath) + ? (FileSystemInfo) new DirectoryInfo(state.OldPath) + : new FileInfo(state.OldPath); + FileSystemInfo newInfo = Directory.Exists(state.Path) + ? (FileSystemInfo) new DirectoryInfo(state.Path) + : new FileInfo(state.Path); + NetworkAgent.Post(new CloudMoveAction(accountInfo, CloudActionType.RenameCloud, + oldInfo, + newInfo)); + break; + } + } + + return CompletedTask.Default; + } + catch (Exception ex) + { + Log.Error(ex.ToString()); + throw; + } } - - return CompletedTask.Default; } - - public void RestartInterruptedFiles() + //Starts interrupted files for a specific account + public void RestartInterruptedFiles(AccountInfo accountInfo) { - StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose); - - var pendingEntries = from state in FileState.Queryable - where state.FileStatus != FileStatus.Unchanged && - !state.FilePath.StartsWith(FragmentsPath.ToLower()) && - !state.FilePath.EndsWith(".ignore") - select state; - - var validEntries = from state in pendingEntries - select new WorkflowState - { - Path = state.FilePath.ToLower(), - FileName = Path.GetFileName(state.FilePath).ToLower(), - Hash = state.Checksum, - Status = state.OverlayStatus == FileOverlayStatus.Unversioned ? - FileStatus.Created : - state.FileStatus, - TriggeringChange = state.OverlayStatus == FileOverlayStatus.Unversioned ? - WatcherChangeTypes.Created : - WatcherChangeTypes.Changed - }; - foreach (var entry in validEntries) + StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose); + + using (log4net.ThreadContext.Stacks["Workflow"].Push("Restart")) { - Post(entry); - } + if (Log.IsDebugEnabled) + Log.Debug("Starting interrupted files"); + + var cachePath = Path.Combine(accountInfo.AccountPath, FolderConstants.CacheFolder) + .ToLower(); + + + + var account = accountInfo; + var pendingEntries = from state in FileState.Queryable + where state.FileStatus != FileStatus.Unchanged && + !state.FilePath.StartsWith(cachePath) && + !state.FilePath.EndsWith(".ignore") && + state.FilePath.StartsWith(account.AccountPath) + select state; + var pendingStates = new List(); + foreach (var state in pendingEntries) + { + pendingStates.Add(new WorkflowState(account, state)); + } + if (Log.IsDebugEnabled) + Log.DebugFormat("Found {0} interrupted files", pendingStates.Count); + + + foreach (var entry in pendingStates) + { + Post(entry); + } + } + } - } - public void Post(WorkflowState workflowState) { + if (Log.IsDebugEnabled) + Log.DebugFormat("Posted {0} {1} {2}", workflowState.Path, workflowState.Status, workflowState.TriggeringChange); + + //Remove invalid state + //For now, ignore paths + /* if (Directory.Exists(workflowState.Path)) + return;*/ + //TODO: Need to handle folder renames + + Debug.Assert(workflowState.Path.StartsWith(workflowState.AccountInfo.AccountPath, StringComparison.InvariantCultureIgnoreCase), "File from wrong account posted"); + _agent.Post(workflowState); - } + } + } }