Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / WorkflowAgent.cs @ 2e3aee00

History | View | Annotate | Download (10.8 kB)

1
#region
2
/* -----------------------------------------------------------------------
3
 * <copyright file="WorkflowAgent.cs" company="GRNet">
4
 * 
5
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
6
 *
7
 * Redistribution and use in source and binary forms, with or
8
 * without modification, are permitted provided that the following
9
 * conditions are met:
10
 *
11
 *   1. Redistributions of source code must retain the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer.
14
 *
15
 *   2. Redistributions in binary form must reproduce the above
16
 *      copyright notice, this list of conditions and the following
17
 *      disclaimer in the documentation and/or other materials
18
 *      provided with the distribution.
19
 *
20
 *
21
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
22
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
25
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
28
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32
 * POSSIBILITY OF SUCH DAMAGE.
33
 *
34
 * The views and conclusions contained in the software and
35
 * documentation are those of the authors and should not be
36
 * interpreted as representing official policies, either expressed
37
 * or implied, of GRNET S.A.
38
 * </copyright>
39
 * -----------------------------------------------------------------------
40
 */
41
#endregion
42
using System;
43
using System.Collections.Generic;
44
using System.ComponentModel.Composition;
45
using System.Diagnostics;
46
using System.Diagnostics.Contracts;
47
using System.IO;
48
using System.Linq;
49
using System.Text;
50
using System.Threading.Tasks;
51
using Castle.ActiveRecord;
52
using Pithos.Interfaces;
53
using Pithos.Network;
54
using log4net;
55

    
56
namespace Pithos.Core.Agents
57
{
58
    [Export]
59
    public class WorkflowAgent
60
    {
61
        readonly Agent<WorkflowState> _agent;
62
                
63
        public IStatusNotification StatusNotification { get; set; }
64
        [System.ComponentModel.Composition.Import]
65
        public IStatusKeeper StatusKeeper { get; set; }
66

    
67
        [System.ComponentModel.Composition.Import]
68
        public NetworkAgent NetworkAgent { get; set; }
69

    
70
        private static readonly ILog Log = LogManager.GetLogger("WorkflowAgent");
71

    
72
        public WorkflowAgent()
73
        {
74
            _agent = Agent<WorkflowState>.Start(inbox =>
75
            {
76
                Action loop = null;
77
                loop = () =>
78
                {
79
                    var message = inbox.Receive();
80
                    var process = message.Then(Process, inbox.CancellationToken);                        
81
                    inbox.LoopAsync(process,loop,ex=>
82
                            Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex));
83
                };
84
                loop();
85
            });
86
        }
87

    
88
        private Task<object> Process(WorkflowState state)
89
        {
90
            var accountInfo = state.AccountInfo;
91
            using (log4net.ThreadContext.Stacks["Workflow"].Push("Process"))
92
            {
93
                try
94
                {
95

    
96
                    if (Log.IsDebugEnabled)
97
                        Log.DebugFormat("State {0} {1} {2}", state.FileName, state.Status, state.TriggeringChange);
98

    
99
                    if (state.Skip)
100
                    {
101
                        if (Log.IsDebugEnabled) Log.DebugFormat("Skipping {0}", state.FileName);
102

    
103
                        return CompletedTask<object>.Default;
104
                    }                    
105

    
106
                    var info = Directory.Exists(state.Path) ? (FileSystemInfo)new DirectoryInfo(state.Path) : new FileInfo(state.Path);
107

    
108
                    //Bypass deleted files, unless the status is Deleted
109
                    if (!info.Exists && state.Status != FileStatus.Deleted)
110
                    {
111
                        state.Skip = true;
112
                        this.StatusKeeper.ClearFileStatus(state.Path);
113

    
114
                        if (Log.IsDebugEnabled) Log.DebugFormat("Skipped missing {0}", state.FileName);
115

    
116
                        return CompletedTask<object>.Default;
117
                    }
118

    
119
                    using (new SessionScope(FlushAction.Never))
120
                    {
121

    
122
                        var fileState = StatusKeeper.GetStateByFilePath(state.Path);
123

    
124
                        switch (state.Status)
125
                        {
126
                            case FileStatus.Created:
127
                            case FileStatus.Modified:
128
                                NetworkAgent.Post(new CloudUploadAction(accountInfo, info, fileState,
129
                                                                        accountInfo.BlockSize,
130
                                                                        accountInfo.BlockHash));
131
                                break;
132
                            case FileStatus.Deleted:
133
                                DeleteChildObjects(state, fileState);
134
                                NetworkAgent.Post(new CloudDeleteAction(accountInfo, info, fileState));
135
                                break;
136
                            case FileStatus.Renamed:
137
                                FileSystemInfo oldInfo = Directory.Exists(state.OldPath)
138
                                                             ? (FileSystemInfo) new DirectoryInfo(state.OldPath)
139
                                                             : new FileInfo(state.OldPath);
140
                                FileSystemInfo newInfo = Directory.Exists(state.Path)
141
                                                             ? (FileSystemInfo) new DirectoryInfo(state.Path)
142
                                                             : new FileInfo(state.Path);
143
                                NetworkAgent.Post(new CloudMoveAction(accountInfo, CloudActionType.RenameCloud,
144
                                                                      oldInfo,
145
                                                                      newInfo));                                
146
                                //TODO: Do I have to move children as well or will Pithos handle this?
147
                               //Need to find all children of the OLD filepath
148
                                //MoveChildObjects(state);
149
                                break;
150
                        }
151
                    }
152

    
153
                    return CompletedTask<object>.Default;
154
                }
155
                catch (Exception ex)
156
                {
157
                    Log.Error(ex.ToString());
158
                    throw;
159
                }
160
            }
161
        }
162

    
163

    
164
        private void DeleteChildObjects(WorkflowState state, FileState fileState)
165
        {
166
            if (fileState != null)
167
            {
168
                var children = StatusKeeper.GetChildren(fileState);
169
                foreach (var child in children)
170
                {
171
                    var childInfo = child.IsFolder
172
                                        ? (FileSystemInfo) new DirectoryInfo(child.FilePath)
173
                                        : new FileInfo(child.FilePath);
174
                    NetworkAgent.Post(new CloudDeleteAction(state.AccountInfo, childInfo, child));
175
                }
176
            }
177
        }
178

    
179
        /*private void MoveChildObjects(WorkflowState state)
180
        {
181
            var oldFileState = StatusKeeper.GetStateByFilePath(state.OldPath);
182
            if (oldFileState != null)
183
            {
184
                var children = StatusKeeper.GetChildren(oldFileState);
185
                foreach (var child in children)
186
                {
187
                    var newPath = Path.Combine(state.Path, child.FilePath.Substring(state.OldPath.Length+1));
188

    
189
                    var oldMoveInfo = child.IsFolder
190
                                          ? (FileSystemInfo) new DirectoryInfo(child.FilePath)
191
                                          : new FileInfo(child.FilePath);
192
                    var newMoveInfo = child.IsFolder
193
                                          ? (FileSystemInfo) new DirectoryInfo(newPath)
194
                                          : new FileInfo(newPath);
195
                    //The new file path will be created by trimming the old root path
196
                    //and substituting the new root path
197

    
198
                    NetworkAgent.Post(new CloudMoveAction(state.AccountInfo, CloudActionType.RenameCloud,
199
                                                          oldMoveInfo, newMoveInfo));
200
                }
201
            }
202
        }*/
203

    
204

    
205
        //Starts interrupted files for a specific account
206
        public void RestartInterruptedFiles(AccountInfo accountInfo)
207
        {
208
            
209

    
210
            using (log4net.ThreadContext.Stacks["Workflow"].Push("Restart"))
211
            {
212
                if (Log.IsDebugEnabled)
213
                    Log.Debug("Starting interrupted files");
214

    
215
                var cachePath = Path.Combine(accountInfo.AccountPath, FolderConstants.CacheFolder)
216
                    .ToLower();
217

    
218

    
219

    
220
                var account = accountInfo;
221
                var pendingEntries = (from state in FileState.Queryable
222
                                     where state.FileStatus != FileStatus.Unchanged &&
223
                                           !state.FilePath.StartsWith(cachePath) &&
224
                                           !state.FilePath.EndsWith(".ignore") &&
225
                                           state.FilePath.StartsWith(account.AccountPath)
226
                                     select state).ToList();
227
                if (pendingEntries.Count>0)
228
                    StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);
229

    
230
                var pendingStates = pendingEntries
231
                    .Select(state => new WorkflowState(account, state))
232
                    .ToList();
233

    
234
                if (Log.IsDebugEnabled)
235
                    Log.DebugFormat("Found {0} interrupted files", pendingStates.Count);
236

    
237
                pendingStates.ForEach(Post);
238
            }
239
        }
240

    
241

    
242

    
243
        public void Post(WorkflowState workflowState)
244
        {
245
            if (Log.IsDebugEnabled)
246
                Log.DebugFormat("Posted {0} {1} {2}", workflowState.Path, workflowState.Status, workflowState.TriggeringChange);
247

    
248
            //Remove invalid state            
249
            //For now, ignore paths
250
           /* if (Directory.Exists(workflowState.Path))
251
                return;*/
252
            //TODO: Need to handle folder renames            
253

    
254
            Debug.Assert(workflowState.Path.StartsWith(workflowState.AccountInfo.AccountPath, StringComparison.InvariantCultureIgnoreCase), "File from wrong account posted");
255

    
256
            _agent.Post(workflowState);
257
        }     
258

    
259
    }
260
}