1 // -----------------------------------------------------------------------
2 // <copyright file="WorkflowAgent.cs" company="GRNET">
3 // Copyright 2011 GRNET S.A. All rights reserved.
5 // Redistribution and use in source and binary forms, with or
6 // without modification, are permitted provided that the following
9 // 1. Redistributions of source code must retain the above
10 // copyright notice, this list of conditions and the following
13 // 2. Redistributions in binary form must reproduce the above
14 // copyright notice, this list of conditions and the following
15 // disclaimer in the documentation and/or other materials
16 // provided with the distribution.
18 // THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19 // OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20 // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25 // USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26 // AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27 // LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28 // ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 // POSSIBILITY OF SUCH DAMAGE.
31 // The views and conclusions contained in the software and
32 // documentation are those of the authors and should not be
33 // interpreted as representing official policies, either expressed
34 // or implied, of GRNET S.A.
36 // -----------------------------------------------------------------------
39 using System.Collections.Generic;
40 using System.ComponentModel.Composition;
41 using System.Diagnostics;
42 using System.Diagnostics.Contracts;
46 using System.Threading.Tasks;
47 using Castle.ActiveRecord;
48 using Pithos.Interfaces;
52 namespace Pithos.Core.Agents
// Receives WorkflowState messages through an Agent<WorkflowState> and, for each
// one, posts a matching cloud action (upload / delete / move) to NetworkAgent.
55 public class WorkflowAgent
// Message-processing agent; assigned from Agent<WorkflowState>.Start and fed by Post().
57 Agent<WorkflowState> _agent;
// Used by RestartInterruptedFiles to report that interrupted files are being restarted.
// NOTE(review): no import attribute is visible on this property in this view -
// confirm how it gets populated.
59 public IStatusNotification StatusNotification { get; set; }
// Imported through System.ComponentModel.Composition; used by Process to clear
// and look up the stored state of a file path.
60 [System.ComponentModel.Composition.Import]
61 public IStatusKeeper StatusKeeper { get; set; }
// Imported through System.ComponentModel.Composition; the cloud actions built
// in Process are posted to this agent.
63 [System.ComponentModel.Composition.Import]
64 public NetworkAgent NetworkAgent { get; set; }
// Logger shared by all instances, registered under the name "WorkflowAgent".
66 private static readonly ILog Log = LogManager.GetLogger("WorkflowAgent");
// Creates the agent and stores it in _agent. The delegate passed to
// Agent<WorkflowState>.Start is given the agent's inbox.
// NOTE(review): the enclosing method header and the declaration of `loop` are
// not visible in this view - confirm against the full file.
70 _agent = Agent<WorkflowState>.Start(inbox =>
// Take the next message from the inbox.
75 var message = inbox.Receive();
// Chain Process onto the received message, passing the inbox's cancellation token.
76 var process = message.Then(Process, inbox.CancellationToken);
// Hand the chained work, the `loop` delegate and an error callback to LoopAsync.
// The callback logs the file name of the received state together with the exception.
// NOTE(review): the callback reads message.Result; if `message` itself can fail,
// reading Result here may throw instead of logging - confirm.
77 inbox.LoopAsync(process,loop,ex=>
78 Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex));
// Handles a single WorkflowState: skips entries that should not be processed,
// clears the stored status of files that no longer exist, and otherwise posts
// the cloud action that matches state.Status to NetworkAgent.
// Every return statement visible here returns CompletedTask<object>.Default.
84 private Task<object> Process(WorkflowState state)
86 var accountInfo = state.AccountInfo;
// Push "Process" on the "Workflow" logging context stack for the duration of the using scope.
87 using (log4net.ThreadContext.Stacks["Workflow"].Push("Process"))
92 if (Log.IsDebugEnabled)
93 Log.DebugFormat("State {0} {1} {2}", state.FileName, state.Status, state.TriggeringChange);
// Skip branch: log and return without posting anything.
// NOTE(review): the condition guarding this branch is not visible in this view.
97 if (Log.IsDebugEnabled) Log.DebugFormat("Skipping {0}", state.FileName);
99 return CompletedTask<object>.Default;
// Lower-cased copy of the path; used below for the file-system check and for
// the StatusKeeper calls.
// NOTE(review): ToLower() is culture-sensitive and this assumes paths can be
// compared case-insensitively - confirm this is intended.
101 string path = state.Path.ToLower();
// Use a DirectoryInfo when a directory exists at the path, otherwise a FileInfo.
105 FileSystemInfo info = Directory.Exists(path) ? (FileSystemInfo) new DirectoryInfo(path) : new FileInfo(path);
107 //Bypass deleted files, unless the status is Deleted
108 if (!info.Exists && state.Status != FileStatus.Deleted)
// The entry no longer exists on disk, so drop its stored status and stop.
111 this.StatusKeeper.ClearFileStatus(path);
113 if (Log.IsDebugEnabled) Log.DebugFormat("Skipped missing {0}", state.FileName);
115 return CompletedTask<object>.Default;
// Open a session scope created with FlushAction.Never around the state lookup and dispatch.
118 using (new SessionScope(FlushAction.Never))
// Stored state for this path, passed along to the upload and delete actions.
121 var fileState = StatusKeeper.GetStateByFilePath(path);
// Dispatch on the kind of change.
123 switch (state.Status)
// Created and Modified are handled identically: post an upload action that
// carries the account's block size and block hash.
125 case FileStatus.Created:
126 case FileStatus.Modified:
127 NetworkAgent.Post(new CloudUploadAction(accountInfo, info, fileState,
128 accountInfo.BlockSize,
129 accountInfo.BlockHash));
// Deleted: post a delete action for the entry.
131 case FileStatus.Deleted:
132 NetworkAgent.Post(new CloudDeleteAction(accountInfo, info, fileState));
// Renamed: build file-system infos for the old and the new path, then post a
// move action of type RenameCloud. Note that state.OldPath and state.Path are
// used here as-is, not the lower-cased `path` computed above.
134 case FileStatus.Renamed:
135 FileSystemInfo oldInfo = Directory.Exists(state.OldPath)
136 ? (FileSystemInfo) new DirectoryInfo(state.OldPath)
137 : new FileInfo(state.OldPath);
138 FileSystemInfo newInfo = Directory.Exists(state.Path)
139 ? (FileSystemInfo) new DirectoryInfo(state.Path)
140 : new FileInfo(state.Path);
// NOTE(review): the remaining constructor arguments are not visible in this view.
141 NetworkAgent.Post(new CloudMoveAction(accountInfo, CloudActionType.RenameCloud,
148 return CompletedTask<object>.Default;
// Error path: the exception text is written to the log.
// NOTE(review): the catch clause and whatever follows this line (rethrow or
// return) are not visible in this view.
152 Log.Error(ex.ToString());
159 //Starts interrupted files for a specific account
// Collects the stored FileState entries of the given account that are not
// Unchanged and wraps each of them in a WorkflowState.
160 public void RestartInterruptedFiles(AccountInfo accountInfo)
// Report the restart at verbose trace level.
163 StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);
// Push "Restart" on the "Workflow" logging context stack for the duration of the using scope.
165 using (log4net.ThreadContext.Stacks["Workflow"].Push("Restart"))
167 if (Log.IsDebugEnabled)
168 Log.Debug("Starting interrupted files");
// Cache folder inside the account folder; entries under it are excluded below.
// NOTE(review): the end of this statement is not visible in this view (no
// terminating semicolon on this line) - confirm how the path is finished.
170 var cachePath = Path.Combine(accountInfo.AccountPath, FolderConstants.CacheFolder)
// Local alias of the account, used by the query and the loop below.
175 var account = accountInfo;
// Pending entries: status is not Unchanged, path is not under the cache folder,
// path does not end with ".ignore", and path starts with the account's folder.
// NOTE(review): the select clause of this query is not visible in this view.
176 var pendingEntries = from state in FileState.Queryable
177 where state.FileStatus != FileStatus.Unchanged &&
178 !state.FilePath.StartsWith(cachePath) &&
179 !state.FilePath.EndsWith(".ignore") &&
180 state.FilePath.StartsWith(account.AccountPath)
// Copy the query results into a list of WorkflowState objects before using them.
182 var pendingStates = new List<WorkflowState>();
183 foreach (var state in pendingEntries)
185 pendingStates.Add(new WorkflowState(account, state));
187 if (Log.IsDebugEnabled)
188 Log.DebugFormat("Found {0} interrupted files", pendingStates.Count);
// NOTE(review): the body of this loop is not visible in this view; presumably
// each entry is handed to Post - confirm.
191 foreach (var entry in pendingStates)
// Queues a WorkflowState on the agent: logs it at debug level and forwards it
// to _agent.Post.
200 public void Post(WorkflowState workflowState)
202 if (Log.IsDebugEnabled)
203 Log.DebugFormat("Posted {0} {1} {2}", workflowState.Path, workflowState.Status, workflowState.TriggeringChange);
// The directory check below is commented out, so no filtering on directories
// happens in the visible code.
// NOTE(review): the end of that block comment is not visible in this view -
// confirm where it closes in the full file.
205 //Remove invalid state
206 //For now, ignore paths
207 /* if (Directory.Exists(workflowState.Path))
209 //TODO: Need to handle folder renames
// Debug-only check that the posted path starts with the account's folder
// (culture-invariant, case-insensitive comparison).
211 Debug.Assert(workflowState.Path.StartsWith(workflowState.AccountInfo.AccountPath, StringComparison.InvariantCultureIgnoreCase), "File from wrong account posted");
// Hand the state to the agent for processing.
213 _agent.Post(workflowState);