a6cd3c271e7f4b766ffbc4cf0f7229f191af5216
[pithos-ms-client] / trunk%2FPithos.Core%2FAgents%2FWorkflowAgent.cs
1 #region
2 /* -----------------------------------------------------------------------
3  * <copyright file="WorkflowAgent.cs" company="GRNet">
4  * 
5  * Copyright 2011-2012 GRNET S.A. All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or
8  * without modification, are permitted provided that the following
9  * conditions are met:
10  *
11  *   1. Redistributions of source code must retain the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer.
14  *
15  *   2. Redistributions in binary form must reproduce the above
16  *      copyright notice, this list of conditions and the following
17  *      disclaimer in the documentation and/or other materials
18  *      provided with the distribution.
19  *
20  *
21  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
22  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
25  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
28  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  *
34  * The views and conclusions contained in the software and
35  * documentation are those of the authors and should not be
36  * interpreted as representing official policies, either expressed
37  * or implied, of GRNET S.A.
38  * </copyright>
39  * -----------------------------------------------------------------------
40  */
41 #endregion
42 using System;
43 using System.Collections.Generic;
44 using System.ComponentModel.Composition;
45 using System.Diagnostics;
46 using System.Diagnostics.Contracts;
47 using System.IO;
48 using System.Linq;
49 using System.Reflection;
50 using System.Text;
51 using System.Threading.Tasks;
52 using Castle.ActiveRecord;
53 using Pithos.Interfaces;
54 using Pithos.Network;
55 using log4net;
56
57 namespace Pithos.Core.Agents
58 {
59     [Export]
60     public class WorkflowAgent
61     {
62         private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
63
64         readonly Agent<WorkflowState> _agent;
65                 
66         public IStatusNotification StatusNotification { get; set; }
67         [System.ComponentModel.Composition.Import]
68         public IStatusKeeper StatusKeeper { get; set; }
69
70         [System.ComponentModel.Composition.Import]
71         public NetworkAgent NetworkAgent { get; set; }
72
73         [System.ComponentModel.Composition.Import]
74         public IPithosSettings Settings { get; set; }
75
76
77         public WorkflowAgent()
78         {
79             _agent = Agent<WorkflowState>.Start(inbox =>
80             {
81                 Action loop = null;
82                 loop = () =>
83                 {
84                     var message = inbox.Receive();
85                     var process = message.Then(Process, inbox.CancellationToken);                        
86                     inbox.LoopAsync(process,loop,ex=>
87                             Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex));
88                 };
89                 loop();
90             });
91         }
92
93         private Task<object> Process(WorkflowState state)
94         {
95             var accountInfo = state.AccountInfo;
96             using (log4net.ThreadContext.Stacks["Workflow"].Push("Process"))
97             {
98                 try
99                 {
100
101                     if (Log.IsDebugEnabled)
102                         Log.DebugFormat("State {0} {1} {2}", state.FileName, state.Status, state.TriggeringChange);
103
104                     if (state.Skip)
105                     {
106                         if (Log.IsDebugEnabled) Log.DebugFormat("Skipping {0}", state.FileName);
107
108                         return CompletedTask<object>.Default;
109                     }                    
110
111                     var info = Directory.Exists(state.Path) ? (FileSystemInfo)new DirectoryInfo(state.Path) : new FileInfo(state.Path);
112
113                     //Bypass deleted files, unless the status is Deleted
114                     if (!info.Exists && state.Status != FileStatus.Deleted)
115                     {
116                         state.Skip = true;
117                         this.StatusKeeper.ClearFileStatus(state.Path);
118
119                         if (Log.IsDebugEnabled) Log.DebugFormat("Skipped missing {0}", state.FileName);
120
121                         return CompletedTask<object>.Default;
122                     }
123
124                     using (new SessionScope(FlushAction.Never))
125                     {
126
127                         var fileState = StatusKeeper.GetStateByFilePath(state.Path);
128
129                         switch (state.Status)
130                         {
131                             case FileStatus.Created:
132                             case FileStatus.Modified:
133                                 NetworkAgent.Post(new CloudUploadAction(accountInfo, info, fileState,
134                                                                         accountInfo.BlockSize,
135                                                                         accountInfo.BlockHash));
136                                 break;
137                             case FileStatus.Deleted:
138                                 DeleteChildObjects(state, fileState);
139                                 NetworkAgent.Post(new CloudDeleteAction(accountInfo, info, fileState));
140                                 break;
141                             case FileStatus.Renamed:
142                                 if (state.OldPath == null)
143                                 {
144                                     //We reach this point only if the app closed before propagating a rename to the server
145                                     Log.WarnFormat("Unfinished rename [{0}]",state.Path);
146                                     StatusKeeper.SetFileState(state.Path,FileStatus.Conflict,FileOverlayStatus.Conflict);
147                                     break;
148                                 }
149                                 FileSystemInfo oldInfo = Directory.Exists(state.OldPath)
150                                                              ? (FileSystemInfo) new DirectoryInfo(state.OldPath)
151                                                              : new FileInfo(state.OldPath);
152                                 FileSystemInfo newInfo = Directory.Exists(state.Path)
153                                                              ? (FileSystemInfo) new DirectoryInfo(state.Path)
154                                                              : new FileInfo(state.Path);
155                                 NetworkAgent.Post(new CloudMoveAction(accountInfo, CloudActionType.RenameCloud,
156                                                                       oldInfo,
157                                                                       newInfo));                                
158                                 //TODO: Do I have to move children as well or will Pithos handle this?
159                                //Need to find all children of the OLD filepath
160                                 //MoveChildObjects(state);
161                                 break;
162                         }
163                     }
164
165                     return CompletedTask<object>.Default;
166                 }
167                 catch (Exception ex)
168                 {
169                     Log.Error(ex.ToString());
170                     throw;
171                 }
172             }
173         }
174
175
176         private void DeleteChildObjects(WorkflowState state, FileState fileState)
177         {
178             if (fileState != null)
179             {
180                 var children = StatusKeeper.GetChildren(fileState);
181                 foreach (var child in children)
182                 {
183                     var childInfo = child.IsFolder
184                                         ? (FileSystemInfo) new DirectoryInfo(child.FilePath)
185                                         : new FileInfo(child.FilePath);
186                     NetworkAgent.Post(new CloudDeleteAction(state.AccountInfo, childInfo, child));
187                 }
188             }
189         }
190
191         /*private void MoveChildObjects(WorkflowState state)
192         {
193             var oldFileState = StatusKeeper.GetStateByFilePath(state.OldPath);
194             if (oldFileState != null)
195             {
196                 var children = StatusKeeper.GetChildren(oldFileState);
197                 foreach (var child in children)
198                 {
199                     var newPath = Path.Combine(state.Path, child.FilePath.Substring(state.OldPath.Length+1));
200
201                     var oldMoveInfo = child.IsFolder
202                                           ? (FileSystemInfo) new DirectoryInfo(child.FilePath)
203                                           : new FileInfo(child.FilePath);
204                     var newMoveInfo = child.IsFolder
205                                           ? (FileSystemInfo) new DirectoryInfo(newPath)
206                                           : new FileInfo(newPath);
207                     //The new file path will be created by trimming the old root path
208                     //and substituting the new root path
209
210                     NetworkAgent.Post(new CloudMoveAction(state.AccountInfo, CloudActionType.RenameCloud,
211                                                           oldMoveInfo, newMoveInfo));
212                 }
213             }
214         }*/
215
216
217         //Starts interrupted files for a specific account
218         public void RestartInterruptedFiles(AccountInfo accountInfo)
219         {
220             
221             StatusKeeper.CleanupOrphanStates();
222             using (log4net.ThreadContext.Stacks["Operation"].Push("RestartInterrupted"))
223             {
224                 if (Log.IsDebugEnabled)
225                     Log.Debug("Starting interrupted files");
226
227                 var cachePath = Path.Combine(accountInfo.AccountPath, FolderConstants.CacheFolder)
228                     .ToLower();
229
230
231                 
232                 
233                 var account = accountInfo;
234                 var pendingEntries = (from state in FileState.Queryable
235                                      where state.FileStatus != FileStatus.Unchanged &&
236                                            !state.FilePath.StartsWith(cachePath) &&
237                                            !state.FilePath.EndsWith(".ignore") &&
238                                            state.FilePath.StartsWith(account.AccountPath)                                            
239                                      select state).ToList();
240                 if (pendingEntries.Count>0)
241                     StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);
242
243                 var pendingStates = pendingEntries
244                     .Select(state => new WorkflowState(account, state))
245                     .ToList();
246
247                 if (Log.IsDebugEnabled)
248                     Log.DebugFormat("Found {0} interrupted files", pendingStates.Count);
249
250                 pendingStates.ForEach(Post);
251             }
252         }
253
254
255
256         public void Post(WorkflowState workflowState)
257         {
258             if (Log.IsDebugEnabled)
259                 Log.DebugFormat("Posted {0} {1} {2}", workflowState.Path, workflowState.Status, workflowState.TriggeringChange);
260
261             //Remove invalid state            
262             //For now, ignore paths
263            /* if (Directory.Exists(workflowState.Path))
264                 return;*/
265             //TODO: Need to handle folder renames            
266
267             Debug.Assert(workflowState.Path.StartsWith(workflowState.AccountInfo.AccountPath, StringComparison.InvariantCultureIgnoreCase), "File from wrong account posted");
268
269             _agent.Post(workflowState);
270         }     
271
272     }
273 }