Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / WorkflowAgent.cs @ 70f12b36

History | View | Annotate | Download (10.9 kB)

1
// -----------------------------------------------------------------------
2
// <copyright file="WorkflowAgent.cs" company="GRNET">
3
// Copyright 2011 GRNET S.A. All rights reserved.
4
// 
5
// Redistribution and use in source and binary forms, with or
6
// without modification, are permitted provided that the following
7
// conditions are met:
8
// 
9
//   1. Redistributions of source code must retain the above
10
//      copyright notice, this list of conditions and the following
11
//      disclaimer.
12
// 
13
//   2. Redistributions in binary form must reproduce the above
14
//      copyright notice, this list of conditions and the following
15
//      disclaimer in the documentation and/or other materials
16
//      provided with the distribution.
17
// 
18
// THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19
// OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25
// USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26
// AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27
// LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28
// ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29
// POSSIBILITY OF SUCH DAMAGE.
30
// 
31
// The views and conclusions contained in the software and
32
// documentation are those of the authors and should not be
33
// interpreted as representing official policies, either expressed
34
// or implied, of GRNET S.A.
35
// </copyright>
36
// -----------------------------------------------------------------------
37

    
38
using System;
39
using System.Collections.Generic;
40
using System.ComponentModel.Composition;
41
using System.Diagnostics;
42
using System.Diagnostics.Contracts;
43
using System.IO;
44
using System.Linq;
45
using System.Text;
46
using System.Threading.Tasks;
47
using Castle.ActiveRecord;
48
using Pithos.Interfaces;
49
using Pithos.Network;
50
using log4net;
51

    
52
namespace Pithos.Core.Agents
53
{
54
    [Export]
55
    public class WorkflowAgent
56
    {
57
        readonly Agent<WorkflowState> _agent;
58
                
59
        public IStatusNotification StatusNotification { get; set; }
60
        [System.ComponentModel.Composition.Import]
61
        public IStatusKeeper StatusKeeper { get; set; }
62

    
63
        [System.ComponentModel.Composition.Import]
64
        public NetworkAgent NetworkAgent { get; set; }
65

    
66
        private static readonly ILog Log = LogManager.GetLogger("WorkflowAgent");
67

    
68
        public WorkflowAgent()
69
        {
70
            _agent = Agent<WorkflowState>.Start(inbox =>
71
            {
72
                Action loop = null;
73
                loop = () =>
74
                {
75
                    var message = inbox.Receive();
76
                    var process = message.Then(Process, inbox.CancellationToken);                        
77
                    inbox.LoopAsync(process,loop,ex=>
78
                            Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex));
79
                };
80
                loop();
81
            });
82
        }
83

    
84
        private Task<object> Process(WorkflowState state)
85
        {
86
            var accountInfo = state.AccountInfo;
87
            using (log4net.ThreadContext.Stacks["Workflow"].Push("Process"))
88
            {
89
                try
90
                {
91

    
92
                    if (Log.IsDebugEnabled)
93
                        Log.DebugFormat("State {0} {1} {2}", state.FileName, state.Status, state.TriggeringChange);
94

    
95
                    if (state.Skip)
96
                    {
97
                        if (Log.IsDebugEnabled) Log.DebugFormat("Skipping {0}", state.FileName);
98

    
99
                        return CompletedTask<object>.Default;
100
                    }                    
101

    
102
                    var info = Directory.Exists(state.Path) ? (FileSystemInfo)new DirectoryInfo(state.Path) : new FileInfo(state.Path);
103

    
104
                    //Bypass deleted files, unless the status is Deleted
105
                    if (!info.Exists && state.Status != FileStatus.Deleted)
106
                    {
107
                        state.Skip = true;
108
                        this.StatusKeeper.ClearFileStatus(state.Path);
109

    
110
                        if (Log.IsDebugEnabled) Log.DebugFormat("Skipped missing {0}", state.FileName);
111

    
112
                        return CompletedTask<object>.Default;
113
                    }
114

    
115
                    using (new SessionScope(FlushAction.Never))
116
                    {
117

    
118
                        var fileState = StatusKeeper.GetStateByFilePath(state.Path);
119

    
120
                        switch (state.Status)
121
                        {
122
                            case FileStatus.Created:
123
                            case FileStatus.Modified:
124
                                NetworkAgent.Post(new CloudUploadAction(accountInfo, info, fileState,
125
                                                                        accountInfo.BlockSize,
126
                                                                        accountInfo.BlockHash));
127
                                break;
128
                            case FileStatus.Deleted:
129
                                DeleteChildObjects(state, fileState);
130
                                NetworkAgent.Post(new CloudDeleteAction(accountInfo, info, fileState));
131
                                break;
132
                            case FileStatus.Renamed:
133
                                FileSystemInfo oldInfo = Directory.Exists(state.OldPath)
134
                                                             ? (FileSystemInfo) new DirectoryInfo(state.OldPath)
135
                                                             : new FileInfo(state.OldPath);
136
                                FileSystemInfo newInfo = Directory.Exists(state.Path)
137
                                                             ? (FileSystemInfo) new DirectoryInfo(state.Path)
138
                                                             : new FileInfo(state.Path);
139
                                NetworkAgent.Post(new CloudMoveAction(accountInfo, CloudActionType.RenameCloud,
140
                                                                      oldInfo,
141
                                                                      newInfo));                                
142
                                //TODO: Do I have to move children as well or will Pithos handle this?
143
                               //Need to find all children of the OLD filepath
144
                                MoveChildObjects(state);
145
                                break;
146
                        }
147
                    }
148

    
149
                    return CompletedTask<object>.Default;
150
                }
151
                catch (Exception ex)
152
                {
153
                    Log.Error(ex.ToString());
154
                    throw;
155
                }
156
            }
157
        }
158

    
159
        private void DeleteChildObjects(WorkflowState state, FileState fileState)
160
        {
161
            if (fileState != null)
162
            {
163
                var children = StatusKeeper.GetChildren(fileState);
164
                foreach (var child in children)
165
                {
166
                    var childInfo = child.IsFolder
167
                                        ? (FileSystemInfo) new DirectoryInfo(child.FilePath)
168
                                        : new FileInfo(child.FilePath);
169
                    NetworkAgent.Post(new CloudDeleteAction(state.AccountInfo, childInfo, child));
170
                }
171
            }
172
        }
173

    
174
        private void MoveChildObjects(WorkflowState state)
175
        {
176
            var oldFileState = StatusKeeper.GetStateByFilePath(state.OldPath);
177
            if (oldFileState != null)
178
            {
179
                var children = StatusKeeper.GetChildren(oldFileState);
180
                foreach (var child in children)
181
                {
182
                    var newPath = Path.Combine(state.Path, child.FilePath.Substring(state.OldPath.Length+1));
183

    
184
                    var oldMoveInfo = child.IsFolder
185
                                          ? (FileSystemInfo) new DirectoryInfo(child.FilePath)
186
                                          : new FileInfo(child.FilePath);
187
                    var newMoveInfo = child.IsFolder
188
                                          ? (FileSystemInfo) new DirectoryInfo(newPath)
189
                                          : new FileInfo(newPath);
190
                    //The new file path will be created by trimming the old root path
191
                    //and substituting the new root path
192

    
193
                    NetworkAgent.Post(new CloudMoveAction(state.AccountInfo, CloudActionType.RenameCloud,
194
                                                          oldMoveInfo, newMoveInfo));
195
                }
196
            }
197
        }
198

    
199

    
200
        //Starts interrupted files for a specific account
201
        public void RestartInterruptedFiles(AccountInfo accountInfo)
202
        {
203
            
204
            StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);
205

    
206
            using (log4net.ThreadContext.Stacks["Workflow"].Push("Restart"))
207
            {
208
                if (Log.IsDebugEnabled)
209
                    Log.Debug("Starting interrupted files");
210

    
211
                var cachePath = Path.Combine(accountInfo.AccountPath, FolderConstants.CacheFolder)
212
                    .ToLower();
213

    
214

    
215

    
216
                var account = accountInfo;
217
                var pendingEntries = from state in FileState.Queryable
218
                                     where state.FileStatus != FileStatus.Unchanged &&
219
                                           !state.FilePath.StartsWith(cachePath) &&
220
                                           !state.FilePath.EndsWith(".ignore") &&
221
                                           state.FilePath.StartsWith(account.AccountPath)
222
                                     select state;
223
                var pendingStates = new List<WorkflowState>();
224
                foreach (var state in pendingEntries)
225
                {
226
                        pendingStates.Add(new WorkflowState(account, state));
227
                }
228
                if (Log.IsDebugEnabled)
229
                    Log.DebugFormat("Found {0} interrupted files", pendingStates.Count);
230

    
231

    
232
                foreach (var entry in pendingStates)
233
                {
234
                       Post(entry);
235
                }
236
            }
237
        }
238

    
239

    
240

    
241
        public void Post(WorkflowState workflowState)
242
        {
243
            if (Log.IsDebugEnabled)
244
                Log.DebugFormat("Posted {0} {1} {2}", workflowState.Path, workflowState.Status, workflowState.TriggeringChange);
245

    
246
            //Remove invalid state            
247
            //For now, ignore paths
248
           /* if (Directory.Exists(workflowState.Path))
249
                return;*/
250
            //TODO: Need to handle folder renames            
251

    
252
            Debug.Assert(workflowState.Path.StartsWith(workflowState.AccountInfo.AccountPath, StringComparison.InvariantCultureIgnoreCase), "File from wrong account posted");
253

    
254
            _agent.Post(workflowState);
255
        }     
256

    
257
    }
258
}