-using System;
+#region
+/* -----------------------------------------------------------------------
+ * <copyright file="WorkflowAgent.cs" company="GRNet">
+ *
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ * </copyright>
+ * -----------------------------------------------------------------------
+ */
+#endregion
+using System;
using System.Collections.Generic;
using System.ComponentModel.Composition;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.IO;
using System.Linq;
+using System.Reflection;
using System.Text;
+using System.Threading.Tasks;
+using Castle.ActiveRecord;
using Pithos.Interfaces;
+using Pithos.Network;
+using log4net;
namespace Pithos.Core.Agents
{
[Export]
public class WorkflowAgent
{
- Agent<WorkflowState> _agent;
+ //log4net logger for this class, named after the declaring type
+ private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+
+ //Agent that receives the WorkflowState messages passed to Post; assigned once in the constructor
+ readonly Agent<WorkflowState> _agent;
+ //Used by RestartInterruptedFiles to announce that interrupted files are being reprocessed
public IStatusNotification StatusNotification { get; set; }
- [Import]
+ //The Import attribute is fully qualified below; presumably to avoid a clash with an
+ //Import type coming from the Castle.ActiveRecord namespace this file imports - TODO confirm
+ [System.ComponentModel.Composition.Import]
public IStatusKeeper StatusKeeper { get; set; }
- //We should avoid processing files stored in the Fragments folder
- //The Full path to the fragments folder is stored in FragmentsPath
- public string FragmentsPath { get; set; }
-
- [Import]
+ //Destination of the cloud upload/delete/move actions created by this class
+ [System.ComponentModel.Composition.Import]
public NetworkAgent NetworkAgent { get; set; }
- public void Start()
+ [System.ComponentModel.Composition.Import]
+ public IPithosSettings Settings { get; set; }
+
+
+ /// <summary>
+ /// Creates the agent and starts its receive loop. Every received
+ /// <see cref="WorkflowState"/> is passed to <see cref="Process"/>, and the error
+ /// callback logs the failure together with the file name of the message.
+ /// </summary>
+ /// <remarks>
+ /// NOTE(review): no declaration of <c>loop</c> is visible in this view of the file - confirm it exists.
+ /// </remarks>
+ public WorkflowAgent()
{
_agent = Agent<WorkflowState>.Start(inbox =>
{
loop = () =>
{
var message = inbox.Receive();
- var process = message.ContinueWith(t =>
- {
- var state = t.Result;
- Process(state);
- inbox.DoAsync(loop);
- });
- process.ContinueWith(t =>
- {
- inbox.DoAsync(loop);
- if (t.IsFaulted)
- {
- var ex = t.Exception.InnerException;
- if (ex is OperationCanceledException)
- inbox.Stop();
- Trace.TraceError("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex);
- }
- });
-
+ //Chain Process onto the received message, passing the inbox's cancellation token
+ var process = message.Then(Process, inbox.CancellationToken);
+ //Presumably LoopAsync schedules loop again once process finishes and calls the
+ //callback on failure - TODO confirm against the Agent implementation.
+ //NOTE(review): the callback reads message.Result; if the receive task itself
+ //faulted or was cancelled, that read would throw inside the error handler - confirm
+ inbox.LoopAsync(process,loop,ex=>
+ Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex));
};
loop();
});
}
- public void RestartInterruptedFiles()
+ /// <summary>
+ /// Handles one queued <see cref="WorkflowState"/>: looks up the stored state of its path and
+ /// posts the matching cloud action (upload, delete or move) to <see cref="NetworkAgent"/>.
+ /// </summary>
+ /// <param name="state">The change to process. Its Skip flag is set to true here when the
+ /// path no longer exists and the status is not Deleted.</param>
+ /// <returns>CompletedTask&lt;object&gt;.Default on every path that does not throw.</returns>
+ /// <remarks>Any exception raised while processing is logged and then rethrown.</remarks>
+ private Task<object> Process(WorkflowState state)
{
-
- StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);
- var interruptedStates = new[] { FileOverlayStatus.Unversioned, FileOverlayStatus.Modified };
-
- var pendingEntries = (from state in FileState.Queryable
- where interruptedStates.Contains(state.OverlayStatus) &&
- !state.FilePath.StartsWith(FragmentsPath) &&
- !state.FilePath.EndsWith(".ignore")
- select state).ToList();
- var staleEntries = from state in pendingEntries
- where !File.Exists(state.FilePath)
- select state;
- var staleKeys = staleEntries.Select(state=>state.Id);
- FileState.DeleteAll(staleKeys);
-
- var validEntries = from state in pendingEntries.Except(staleEntries)
- where File.Exists(state.FilePath)
- select new WorkflowState
- {
- Path = state.FilePath.ToLower(),
- FileName = Path.GetFileName(state.FilePath).ToLower(),
- Hash = state.Checksum,
- Status = state.OverlayStatus == FileOverlayStatus.Unversioned ?
- FileStatus.Created :
- FileStatus.Modified,
- TriggeringChange = state.OverlayStatus == FileOverlayStatus.Unversioned ?
- WatcherChangeTypes.Created :
- WatcherChangeTypes.Changed
- };
- foreach (var entry in validEntries)
+ var accountInfo = state.AccountInfo;
+ //Push "Process" on the log4net "Workflow" context stack for the duration of this block
+ using (log4net.ThreadContext.Stacks["Workflow"].Push("Process"))
{
- Post(entry);
- }
+ try
+ {
- }
+ if (Log.IsDebugEnabled)
+ Log.DebugFormat("State {0} {1} {2}", state.FileName, state.Status, state.TriggeringChange);
- private void Process(WorkflowState state)
- {
- if (state.Skip)
- return;
- string path = state.Path.ToLower();
- string fileName = Path.GetFileName(path);
+ if (state.Skip)
+ {
+ if (Log.IsDebugEnabled) Log.DebugFormat("Skipping {0}", state.FileName);
+
+ return CompletedTask<object>.Default;
+ }
+
+ //Describe the path as a directory when one exists there, otherwise as a file
+ var info = Directory.Exists(state.Path) ? (FileSystemInfo)new DirectoryInfo(state.Path) : new FileInfo(state.Path);
+
+ //Bypass paths that no longer exist on disk, unless the status is Deleted
+ if (!info.Exists && state.Status != FileStatus.Deleted)
+ {
+ state.Skip = true;
+ this.StatusKeeper.ClearFileStatus(state.Path);
+
+ if (Log.IsDebugEnabled) Log.DebugFormat("Skipped missing {0}", state.FileName);
+
+ return CompletedTask<object>.Default;
+ }
+
+ //NOTE(review): FlushAction.Never presumably stops this ActiveRecord scope from
+ //flushing changes automatically - TODO confirm
+ using (new SessionScope(FlushAction.Never))
+ {
+
+ var fileState = StatusKeeper.GetStateByFilePath(state.Path);
+
+ //Statuses without a case below post no action and fall through to the return
+ switch (state.Status)
+ {
+ case FileStatus.Created:
+ case FileStatus.Modified:
+ NetworkAgent.Post(new CloudUploadAction(accountInfo, info, fileState,
+ accountInfo.BlockSize,
+ accountInfo.BlockHash));
+ break;
+ case FileStatus.Deleted:
+ //Post deletes for the stored children first, then for the item itself
+ DeleteChildObjects(state, fileState);
+ NetworkAgent.Post(new CloudDeleteAction(accountInfo, info, fileState));
+ break;
+ case FileStatus.Renamed:
+ if (state.OldPath == null)
+ {
+ //We reach this point only if the app closed before propagating a rename to the server
+ Log.WarnFormat("Unfinished rename [{0}]",state.Path);
+ StatusKeeper.SetFileState(state.Path,FileStatus.Conflict,FileOverlayStatus.Conflict, "Rename without old path");
+ break;
+ }
+ FileSystemInfo oldInfo = Directory.Exists(state.OldPath)
+ ? (FileSystemInfo) new DirectoryInfo(state.OldPath)
+ : new FileInfo(state.OldPath);
+ FileSystemInfo newInfo = Directory.Exists(state.Path)
+ ? (FileSystemInfo) new DirectoryInfo(state.Path)
+ : new FileInfo(state.Path);
+ NetworkAgent.Post(new CloudMoveAction(accountInfo, CloudActionType.RenameCloud,
+ oldInfo,
+ newInfo));
+ //TODO: Decide whether children must be moved as well or whether Pithos handles this.
+ //Moving them would require finding all children of the OLD file path.
+ //MoveChildObjects(state);
+ break;
+ }
+ }
+
+ return CompletedTask<object>.Default;
+ }
+ catch (Exception ex)
+ {
+ //Log the full exception text, then rethrow with the original stack trace
+ Log.Error(ex.ToString());
+ throw;
+ }
+ }
+ }
- //Bypass deleted files, unless the status is Deleted
- if (!(File.Exists(path) || state.Status != FileStatus.Deleted))
+
+ /// <summary>
+ /// Posts a <c>CloudDeleteAction</c> to <see cref="NetworkAgent"/> for every child that
+ /// <c>StatusKeeper.GetChildren</c> returns for the given file state.
+ /// </summary>
+ /// <param name="state">Workflow state whose AccountInfo is attached to each delete action.</param>
+ /// <param name="fileState">Stored state of the deleted item; nothing is posted when it is null.</param>
+ private void DeleteChildObjects(WorkflowState state, FileState fileState)
+ {
+ if (fileState != null)
{
- state.Skip = true;
- this.StatusKeeper.RemoveFileOverlayStatus(path);
- return;
+ var children = StatusKeeper.GetChildren(fileState);
+ foreach (var child in children)
+ {
+ //Pick the info type from the stored IsFolder flag rather than from the disk
+ var childInfo = child.IsFolder
+ ? (FileSystemInfo) new DirectoryInfo(child.FilePath)
+ : new FileInfo(child.FilePath);
+ NetworkAgent.Post(new CloudDeleteAction(state.AccountInfo, childInfo, child));
+ }
}
- var fileState = FileState.FindByFilePath(path);
- switch (state.Status)
+ }
+
+ /*private void MoveChildObjects(WorkflowState state)
+ {
+ var oldFileState = StatusKeeper.GetStateByFilePath(state.OldPath);
+ if (oldFileState != null)
{
- case FileStatus.Created:
- case FileStatus.Modified:
- var info = new FileInfo(path);
- NetworkAgent.Post(new CloudAction(CloudActionType.UploadUnconditional, info, ObjectInfo.Empty,fileState));
- break;
- case FileStatus.Deleted:
- NetworkAgent.Post(new CloudAction(CloudActionType.DeleteCloud, null, new ObjectInfo {Name=fileName},fileState));
- break;
- case FileStatus.Renamed:
- NetworkAgent.Post(new CloudAction(CloudActionType.RenameCloud, state.OldFileName,state.OldPath,state.FileName,state.Path));
- break;
+ var children = StatusKeeper.GetChildren(oldFileState);
+ foreach (var child in children)
+ {
+ var newPath = Path.Combine(state.Path, child.FilePath.Substring(state.OldPath.Length+1));
+
+ var oldMoveInfo = child.IsFolder
+ ? (FileSystemInfo) new DirectoryInfo(child.FilePath)
+ : new FileInfo(child.FilePath);
+ var newMoveInfo = child.IsFolder
+ ? (FileSystemInfo) new DirectoryInfo(newPath)
+ : new FileInfo(newPath);
+ //The new file path will be created by trimming the old root path
+ //and substituting the new root path
+
+ NetworkAgent.Post(new CloudMoveAction(state.AccountInfo, CloudActionType.RenameCloud,
+ oldMoveInfo, newMoveInfo));
+ }
}
+ }*/
+
+
+ /// <summary>
+ /// Re-posts the pending file states of a single account so that they are processed again.
+ /// </summary>
+ /// <param name="accountInfo">Account whose AccountPath selects the file states to restart.</param>
+ /// <remarks>
+ /// Selects states whose FileStatus is not Unchanged, Forbidden or Conflict, whose path
+ /// starts with the account path, does not start with the account's cache folder path and
+ /// does not end with ".ignore". Each match is wrapped in a new WorkflowState and passed to Post.
+ /// </remarks>
+ public void RestartInterruptedFiles(AccountInfo accountInfo)
+ {
+
+ StatusKeeper.CleanupOrphanStates();
+ using (log4net.ThreadContext.Stacks["Operation"].Push("RestartInterrupted"))
+ {
+ if (Log.IsDebugEnabled)
+ Log.Debug("Starting interrupted files");
- return;
+ //NOTE(review): the cache path is lower-cased here while account.AccountPath is used
+ //as-is in the query below - confirm the casing of the stored FilePath values
+ var cachePath = Path.Combine(accountInfo.AccountPath, FolderConstants.CacheFolder)
+ .ToLower();
+
+
+
+
+ var account = accountInfo;
+ var pendingEntries = (from state in FileState.Queryable
+ where state.FileStatus != FileStatus.Unchanged &&
+ state.FileStatus != FileStatus.Forbidden &&
+ state.FileStatus != FileStatus.Conflict &&
+ !state.FilePath.StartsWith(cachePath) &&
+ !state.FilePath.EndsWith(".ignore") &&
+ state.FilePath.StartsWith(account.AccountPath)
+ select state).ToList();
+ //Notify only when there is something to restart
+ if (pendingEntries.Count>0)
+ StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);
+
+ var pendingStates = pendingEntries
+ .Select(state => new WorkflowState(account, state))
+ .ToList();
+
+ if (Log.IsDebugEnabled)
+ Log.DebugFormat("Found {0} interrupted files", pendingStates.Count);
+
+ pendingStates.ForEach(Post);
+ }
}
-
+
+ /// <summary>
+ /// Queues a workflow state on the agent created in the constructor.
+ /// </summary>
+ /// <param name="workflowState">State to queue. A Debug.Assert checks that its Path starts
+ /// with the AccountPath of its AccountInfo, ignoring case.</param>
public void Post(WorkflowState workflowState)
{
+ if (Log.IsDebugEnabled)
+ Log.DebugFormat("Posted {0} {1} {2}", workflowState.Path, workflowState.Status, workflowState.TriggeringChange);
+
+ //The disabled check below would return early for paths that are existing directories.
+ //While it stays commented out, directory paths are posted like any other path.
+ /* if (Directory.Exists(workflowState.Path))
+ return;*/
+ //TODO: Need to handle folder renames
+
+ Debug.Assert(workflowState.Path.StartsWith(workflowState.AccountInfo.AccountPath, StringComparison.InvariantCultureIgnoreCase), "File from wrong account posted");
+
_agent.Post(workflowState);
- }
+ }
+
}
}