Further changes to reduce locking and switch to WAL journal mode for SQLite
[pithos-ms-client] / trunk / Pithos.Core / Agents / WorkflowAgent.cs
1 // -----------------------------------------------------------------------
2 // <copyright file="WorkflowAgent.cs" company="GRNET">
3 // Copyright 2011 GRNET S.A. All rights reserved.
4 // 
5 // Redistribution and use in source and binary forms, with or
6 // without modification, are permitted provided that the following
7 // conditions are met:
8 // 
9 //   1. Redistributions of source code must retain the above
10 //      copyright notice, this list of conditions and the following
11 //      disclaimer.
12 // 
13 //   2. Redistributions in binary form must reproduce the above
14 //      copyright notice, this list of conditions and the following
15 //      disclaimer in the documentation and/or other materials
16 //      provided with the distribution.
17 // 
18 // THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19 // OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20 // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25 // USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26 // AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27 // LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28 // ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 // POSSIBILITY OF SUCH DAMAGE.
30 // 
31 // The views and conclusions contained in the software and
32 // documentation are those of the authors and should not be
33 // interpreted as representing official policies, either expressed
34 // or implied, of GRNET S.A.
35 // </copyright>
36 // -----------------------------------------------------------------------
37
38 using System;
39 using System.Collections.Generic;
40 using System.ComponentModel.Composition;
41 using System.Diagnostics;
42 using System.Diagnostics.Contracts;
43 using System.IO;
44 using System.Linq;
45 using System.Text;
46 using System.Threading.Tasks;
47 using Castle.ActiveRecord;
48 using Pithos.Interfaces;
49 using Pithos.Network;
50 using log4net;
51
52 namespace Pithos.Core.Agents
53 {
54     [Export]
55     public class WorkflowAgent
56     {
57         Agent<WorkflowState> _agent;
58                 
59         public IStatusNotification StatusNotification { get; set; }
60         [System.ComponentModel.Composition.Import]
61         public IStatusKeeper StatusKeeper { get; set; }
62
63         [System.ComponentModel.Composition.Import]
64         public NetworkAgent NetworkAgent { get; set; }
65
66         private static readonly ILog Log = LogManager.GetLogger("WorkflowAgent");
67
68         public void Start()
69         {
70             _agent = Agent<WorkflowState>.Start(inbox =>
71             {
72                 Action loop = null;
73                 loop = () =>
74                 {
75                     var message = inbox.Receive();
76                     var process = message.Then(Process, inbox.CancellationToken);                        
77                     inbox.LoopAsync(process,loop,ex=>
78                             Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex));
79                 };
80                 loop();
81             });
82         }
83
84         private Task<object> Process(WorkflowState state)
85         {
86             var accountInfo = state.AccountInfo;
87             using (log4net.ThreadContext.Stacks["Workflow"].Push("Process"))
88             {
89                 try
90                 {
91
92                     if (Log.IsDebugEnabled)
93                         Log.DebugFormat("State {0} {1} {2}", state.FileName, state.Status, state.TriggeringChange);
94
95                     if (state.Skip)
96                     {
97                         if (Log.IsDebugEnabled) Log.DebugFormat("Skipping {0}", state.FileName);
98
99                         return CompletedTask<object>.Default;
100                     }
101                     string path = state.Path.ToLower();
102
103
104
105                     FileSystemInfo info = Directory.Exists(path) ? (FileSystemInfo) new DirectoryInfo(path) : new FileInfo(path);
106
107                     //Bypass deleted files, unless the status is Deleted
108                     if (!info.Exists && state.Status != FileStatus.Deleted)
109                     {
110                         state.Skip = true;
111                         this.StatusKeeper.ClearFileStatus(path);
112
113                         if (Log.IsDebugEnabled) Log.DebugFormat("Skipped missing {0}", state.FileName);
114
115                         return CompletedTask<object>.Default;
116                     }
117
118                     using (new SessionScope(FlushAction.Never))
119                     {
120
121                         var fileState = StatusKeeper.GetStateByFilePath(path);
122
123                         switch (state.Status)
124                         {
125                             case FileStatus.Created:
126                             case FileStatus.Modified:
127                                 NetworkAgent.Post(new CloudUploadAction(accountInfo, info, fileState,
128                                                                         accountInfo.BlockSize,
129                                                                         accountInfo.BlockHash));
130                                 break;
131                             case FileStatus.Deleted:
132                                 NetworkAgent.Post(new CloudDeleteAction(accountInfo, info, fileState));
133                                 break;
134                             case FileStatus.Renamed:
135                                 FileSystemInfo oldInfo = Directory.Exists(state.OldPath)
136                                                              ? (FileSystemInfo) new DirectoryInfo(state.OldPath)
137                                                              : new FileInfo(state.OldPath);
138                                 FileSystemInfo newInfo = Directory.Exists(state.Path)
139                                                              ? (FileSystemInfo) new DirectoryInfo(state.Path)
140                                                              : new FileInfo(state.Path);
141                                 NetworkAgent.Post(new CloudMoveAction(accountInfo, CloudActionType.RenameCloud,
142                                                                       oldInfo,
143                                                                       newInfo));
144                                 break;
145                         }
146                     }
147
148                     return CompletedTask<object>.Default;
149                 }
150                 catch (Exception ex)
151                 {
152                     Log.Error(ex.ToString());
153                     throw;
154                 }
155             }
156         }
157
158
159         //Starts interrupted files for a specific account
160         public void RestartInterruptedFiles(AccountInfo accountInfo)
161         {
162             
163             StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);
164
165             using (log4net.ThreadContext.Stacks["Workflow"].Push("Restart"))
166             {
167                 if (Log.IsDebugEnabled)
168                     Log.Debug("Starting interrupted files");
169
170                 var cachePath = Path.Combine(accountInfo.AccountPath, FolderConstants.CacheFolder)
171                     .ToLower();
172
173
174
175                 var account = accountInfo;
176                 var pendingEntries = from state in FileState.Queryable
177                                      where state.FileStatus != FileStatus.Unchanged &&
178                                            !state.FilePath.StartsWith(cachePath) &&
179                                            !state.FilePath.EndsWith(".ignore") &&
180                                            state.FilePath.StartsWith(account.AccountPath)
181                                      select state;
182                 var pendingStates = new List<WorkflowState>();
183                 foreach (var state in pendingEntries)
184                 {
185                         pendingStates.Add(new WorkflowState(account, state));
186                 }
187                 if (Log.IsDebugEnabled)
188                     Log.DebugFormat("Found {0} interrupted files", pendingStates.Count);
189
190
191                 foreach (var entry in pendingStates)
192                 {
193                        Post(entry);
194                 }
195             }
196         }
197
198
199
200         public void Post(WorkflowState workflowState)
201         {
202             if (Log.IsDebugEnabled)
203                 Log.DebugFormat("Posted {0} {1} {2}", workflowState.Path, workflowState.Status, workflowState.TriggeringChange);
204
205             //Remove invalid state            
206             //For now, ignore paths
207            /* if (Directory.Exists(workflowState.Path))
208                 return;*/
209             //TODO: Need to handle folder renames            
210
211             Debug.Assert(workflowState.Path.StartsWith(workflowState.AccountInfo.AccountPath, StringComparison.InvariantCultureIgnoreCase), "File from wrong account posted");
212
213             _agent.Post(workflowState);
214         }     
215
216     }
217 }