Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / WorkflowAgent.cs @ 8f44fd3a

History | View | Annotate | Download (12.5 kB)

1
#region
2
/* -----------------------------------------------------------------------
3
 * <copyright file="WorkflowAgent.cs" company="GRNet">
4
 * 
5
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
6
 *
7
 * Redistribution and use in source and binary forms, with or
8
 * without modification, are permitted provided that the following
9
 * conditions are met:
10
 *
11
 *   1. Redistributions of source code must retain the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer.
14
 *
15
 *   2. Redistributions in binary form must reproduce the above
16
 *      copyright notice, this list of conditions and the following
17
 *      disclaimer in the documentation and/or other materials
18
 *      provided with the distribution.
19
 *
20
 *
21
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
22
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
25
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
28
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32
 * POSSIBILITY OF SUCH DAMAGE.
33
 *
34
 * The views and conclusions contained in the software and
35
 * documentation are those of the authors and should not be
36
 * interpreted as representing official policies, either expressed
37
 * or implied, of GRNET S.A.
38
 * </copyright>
39
 * -----------------------------------------------------------------------
40
 */
41
#endregion
42
using System;
43
using System.Collections.Generic;
44
using System.ComponentModel.Composition;
45
using System.Diagnostics;
46
using System.Diagnostics.Contracts;
47
using System.IO;
48
using System.Linq;
49
using System.Reflection;
50
using System.Text;
51
using System.Threading.Tasks;
52
using Castle.ActiveRecord;
53
using Pithos.Interfaces;
54
using Pithos.Network;
55
using log4net;
56

    
57
namespace Pithos.Core.Agents
58
{
59
    [Export]
60
    public class WorkflowAgent
61
    {
62
        private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
63

    
64
        readonly Agent<WorkflowState> _agent;
65
                
66
        public IStatusNotification StatusNotification { get; set; }
67
        [System.ComponentModel.Composition.Import]
68
        public IStatusKeeper StatusKeeper { get; set; }
69

    
70
        [System.ComponentModel.Composition.Import]
71
        public NetworkAgent NetworkAgent { get; set; }
72

    
73
        [System.ComponentModel.Composition.Import]
74
        public IPithosSettings Settings { get; set; }
75

    
76
        private List<string> _selectivePaths = new List<string>();
77
        public List<string> SelectivePaths
78
        {
79
            get { return _selectivePaths; }
80
            set { _selectivePaths = value; }
81
        }
82

    
83
        public WorkflowAgent()
84
        {
85
            _agent = Agent<WorkflowState>.Start(inbox =>
86
            {
87
                Action loop = null;
88
                loop = () =>
89
                {
90
                    var message = inbox.Receive();
91
                    var process = message.Then(Process, inbox.CancellationToken);                        
92
                    inbox.LoopAsync(process,loop,ex=>
93
                            Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex));
94
                };
95
                loop();
96
            });
97
        }
98

    
99
        private Task<object> Process(WorkflowState state)
100
        {
101
            var accountInfo = state.AccountInfo;
102
            using (log4net.ThreadContext.Stacks["Workflow"].Push("Process"))
103
            {
104
                try
105
                {
106

    
107
                    if (Log.IsDebugEnabled)
108
                        Log.DebugFormat("State {0} {1} {2}", state.FileName, state.Status, state.TriggeringChange);
109

    
110
                    if (state.Skip)
111
                    {
112
                        if (Log.IsDebugEnabled) Log.DebugFormat("Skipping {0}", state.FileName);
113

    
114
                        return CompletedTask<object>.Default;
115
                    }                    
116

    
117
                    var info = Directory.Exists(state.Path) ? (FileSystemInfo)new DirectoryInfo(state.Path) : new FileInfo(state.Path);
118

    
119
                    //Bypass deleted files, unless the status is Deleted
120
                    if (!info.Exists && state.Status != FileStatus.Deleted)
121
                    {
122
                        state.Skip = true;
123
                        this.StatusKeeper.ClearFileStatus(state.Path);
124

    
125
                        if (Log.IsDebugEnabled) Log.DebugFormat("Skipped missing {0}", state.FileName);
126

    
127
                        return CompletedTask<object>.Default;
128
                    }
129

    
130
                    using (new SessionScope(FlushAction.Never))
131
                    {
132

    
133
                        var fileState = StatusKeeper.GetStateByFilePath(state.Path);
134

    
135
                        switch (state.Status)
136
                        {
137
                            case FileStatus.Created:
138
                            case FileStatus.Modified:
139
                                NetworkAgent.Post(new CloudUploadAction(accountInfo, info, fileState,
140
                                                                        accountInfo.BlockSize,
141
                                                                        accountInfo.BlockHash));
142
                                break;
143
                            case FileStatus.Deleted:
144
                                DeleteChildObjects(state, fileState);
145
                                NetworkAgent.Post(new CloudDeleteAction(accountInfo, info, fileState));
146
                                break;
147
                            case FileStatus.Renamed:
148
                                if (state.OldPath == null)
149
                                {
150
                                    //We reach this point only if the app closed before propagating a rename to the server
151
                                    Log.WarnFormat("Unfinished rename [{0}]",state.Path);
152
                                    StatusKeeper.SetFileState(state.Path,FileStatus.Conflict,FileOverlayStatus.Conflict, "Rename without old path");
153
                                    break;
154
                                }
155
                                FileSystemInfo oldInfo = Directory.Exists(state.OldPath)
156
                                                             ? (FileSystemInfo) new DirectoryInfo(state.OldPath)
157
                                                             : new FileInfo(state.OldPath);
158
                                FileSystemInfo newInfo = Directory.Exists(state.Path)
159
                                                             ? (FileSystemInfo) new DirectoryInfo(state.Path)
160
                                                             : new FileInfo(state.Path);
161
                                NetworkAgent.Post(new CloudMoveAction(accountInfo, CloudActionType.RenameCloud,
162
                                                                      oldInfo,
163
                                                                      newInfo));                                
164
                                //TODO: Do I have to move children as well or will Pithos handle this?
165
                               //Need to find all children of the OLD filepath
166
                                //MoveChildObjects(state);
167
                                break;
168
                        }
169
                    }
170

    
171
                    return CompletedTask<object>.Default;
172
                }
173
                catch (Exception ex)
174
                {
175
                    Log.Error(ex.ToString());
176
                    throw;
177
                }
178
            }
179
        }
180

    
181

    
182
        private void DeleteChildObjects(WorkflowState state, FileState fileState)
183
        {
184
            if (fileState != null)
185
            {
186
                var children = StatusKeeper.GetChildren(fileState);
187
                foreach (var child in children)
188
                {
189
                    var childInfo = child.IsFolder
190
                                        ? (FileSystemInfo) new DirectoryInfo(child.FilePath)
191
                                        : new FileInfo(child.FilePath);
192
                    NetworkAgent.Post(new CloudDeleteAction(state.AccountInfo, childInfo, child));
193
                }
194
            }
195
        }
196

    
197
        /*private void MoveChildObjects(WorkflowState state)
198
        {
199
            var oldFileState = StatusKeeper.GetStateByFilePath(state.OldPath);
200
            if (oldFileState != null)
201
            {
202
                var children = StatusKeeper.GetChildren(oldFileState);
203
                foreach (var child in children)
204
                {
205
                    var newPath = Path.Combine(state.Path, child.FilePath.Substring(state.OldPath.Length+1));
206

    
207
                    var oldMoveInfo = child.IsFolder
208
                                          ? (FileSystemInfo) new DirectoryInfo(child.FilePath)
209
                                          : new FileInfo(child.FilePath);
210
                    var newMoveInfo = child.IsFolder
211
                                          ? (FileSystemInfo) new DirectoryInfo(newPath)
212
                                          : new FileInfo(newPath);
213
                    //The new file path will be created by trimming the old root path
214
                    //and substituting the new root path
215

    
216
                    NetworkAgent.Post(new CloudMoveAction(state.AccountInfo, CloudActionType.RenameCloud,
217
                                                          oldMoveInfo, newMoveInfo));
218
                }
219
            }
220
        }*/
221

    
222

    
223
        //Starts interrupted files for a specific account
224
        public void RestartInterruptedFiles(AccountInfo accountInfo)
225
        {
226
            
227
            StatusKeeper.CleanupOrphanStates();
228
            using (log4net.ThreadContext.Stacks["Operation"].Push("RestartInterrupted"))
229
            {
230
                if (Log.IsDebugEnabled)
231
                    Log.Debug("Starting interrupted files");
232

    
233
                var cachePath = Path.Combine(accountInfo.AccountPath, FolderConstants.CacheFolder)
234
                    .ToLower();
235

    
236

    
237
                
238
                
239
                var account = accountInfo;
240
                var pendingEntries = (from state in FileState.Queryable
241
                                     where state.FileStatus != FileStatus.Unchanged &&
242
                                            state.FileStatus != FileStatus.Forbidden &&
243
                                            state.FileStatus != FileStatus.Conflict &&
244
                                           !state.FilePath.StartsWith(cachePath) &&
245
                                           !state.FilePath.EndsWith(".ignore") &&
246
                                           state.FilePath.StartsWith(account.AccountPath)                                            
247
                                     select state).ToList();
248
                if (pendingEntries.Count>0)
249
                    StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);
250

    
251
                var pendingStates = pendingEntries
252
                    .Select(state => new WorkflowState(account, state))
253
                    .ToList();
254
                
255
                                
256
                if (Log.IsDebugEnabled)
257
                    Log.DebugFormat("Found {0} interrupted files", pendingStates.Count);
258

    
259
                pendingStates.ForEach(Post);
260
            }
261
        }
262

    
263

    
264

    
265
        public void Post(WorkflowState workflowState)
266
        {
267
            if (Log.IsDebugEnabled)
268
                Log.DebugFormat("Posted {0} {1} {2}", workflowState.Path, workflowState.Status, workflowState.TriggeringChange);
269

    
270
            //Remove invalid state            
271
            //For now, ignore paths
272
           /* if (Directory.Exists(workflowState.Path))
273
                return;*/
274
            //TODO: Need to handle folder renames            
275

    
276
            //If there are selective sync paths defined
277
            if (SelectivePaths.Count > 0
278
                //And the target file is not below any of the selective paths
279
                && !SelectivePaths.Any(workflowState.Path.IsAtOrDirectlyBelow))
280
            //abort the post
281
            {
282
                Log.InfoFormat("File skipped, not under a selected folder [{0}] ",workflowState.Path);
283
                return;
284
            }
285

    
286

    
287
            Debug.Assert(workflowState.Path.StartsWith(workflowState.AccountInfo.AccountPath, StringComparison.InvariantCultureIgnoreCase), "File from wrong account posted");
288

    
289
            _agent.Post(workflowState);
290
        }     
291

    
292
    }
293
}