Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / WorkflowAgent.cs @ f3d080df

History | View | Annotate | Download (8.1 kB)

1
// -----------------------------------------------------------------------
2
// <copyright file="WorkflowAgent.cs" company="GRNET">
3
// Copyright 2011 GRNET S.A. All rights reserved.
4
// 
5
// Redistribution and use in source and binary forms, with or
6
// without modification, are permitted provided that the following
7
// conditions are met:
8
// 
9
//   1. Redistributions of source code must retain the above
10
//      copyright notice, this list of conditions and the following
11
//      disclaimer.
12
// 
13
//   2. Redistributions in binary form must reproduce the above
14
//      copyright notice, this list of conditions and the following
15
//      disclaimer in the documentation and/or other materials
16
//      provided with the distribution.
17
// 
18
// THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19
// OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25
// USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26
// AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27
// LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28
// ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29
// POSSIBILITY OF SUCH DAMAGE.
30
// 
31
// The views and conclusions contained in the software and
32
// documentation are those of the authors and should not be
33
// interpreted as representing official policies, either expressed
34
// or implied, of GRNET S.A.
35
// </copyright>
36
// -----------------------------------------------------------------------
37

    
38
using System;
39
using System.Collections.Generic;
40
using System.ComponentModel.Composition;
41
using System.Diagnostics;
42
using System.Diagnostics.Contracts;
43
using System.IO;
44
using System.Linq;
45
using System.Text;
46
using System.Threading.Tasks;
47
using Pithos.Interfaces;
48
using Pithos.Network;
49
using log4net;
50

    
51
namespace Pithos.Core.Agents
52
{
53
    [Export]
54
    public class WorkflowAgent
55
    {
56
        Agent<WorkflowState> _agent;
57
                
58
        public IStatusNotification StatusNotification { get; set; }
59
        [Import]
60
        public IStatusKeeper StatusKeeper { get; set; }
61

    
62
        [Import]
63
        public NetworkAgent NetworkAgent { get; set; }
64

    
65
        private static readonly ILog Log = LogManager.GetLogger("WorkflowAgent");
66

    
67
        public void Start()
68
        {
69
            _agent = Agent<WorkflowState>.Start(inbox =>
70
            {
71
                Action loop = null;
72
                loop = () =>
73
                {
74
                    var message = inbox.Receive();
75
                    var process = message.Then(Process, inbox.CancellationToken);                        
76
                    inbox.LoopAsync(process,loop,ex=>
77
                            Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex));
78
                };
79
                loop();
80
            });
81
        }
82

    
83
        private Task<object> Process(WorkflowState state)
84
        {
85
            var accountInfo = state.AccountInfo;
86
            using (log4net.ThreadContext.Stacks["Workflow"].Push("Process"))
87
            {
88
                try
89
                {
90

    
91
                    if (Log.IsDebugEnabled)
92
                        Log.DebugFormat("State {0} {1} {2}", state.FileName, state.Status, state.TriggeringChange);
93

    
94
                    if (state.Skip)
95
                    {
96
                        if (Log.IsDebugEnabled) Log.DebugFormat("Skipping {0}", state.FileName);
97

    
98
                        return CompletedTask<object>.Default;
99
                    }
100
                    string path = state.Path.ToLower();
101

    
102

    
103

    
104
                    FileSystemInfo info = Directory.Exists(path) ? (FileSystemInfo) new DirectoryInfo(path) : new FileInfo(path);
105

    
106
                    //Bypass deleted files, unless the status is Deleted
107
                    if (!info.Exists && state.Status != FileStatus.Deleted)
108
                    {
109
                        state.Skip = true;
110
                        this.StatusKeeper.ClearFileStatus(path);
111

    
112
                        if (Log.IsDebugEnabled) Log.DebugFormat("Skipped missing {0}", state.FileName);
113

    
114
                        return CompletedTask<object>.Default;
115
                    }
116

    
117
                    var fileState = FileState.FindByFilePath(path);
118

    
119

    
120
                    switch (state.Status)
121
                    {
122
                        case FileStatus.Created:
123
                        case FileStatus.Modified:
124
                            NetworkAgent.Post(new CloudUploadAction(accountInfo, info, fileState, accountInfo.BlockSize,
125
                                                                    accountInfo.BlockHash));
126
                            break;
127
                        case FileStatus.Deleted:
128
                            NetworkAgent.Post(new CloudDeleteAction(accountInfo, info, fileState));
129
                            break;
130
                        case FileStatus.Renamed:
131
                            FileSystemInfo oldInfo = Directory.Exists(state.OldPath) ? (FileSystemInfo)new DirectoryInfo(state.OldPath) : new FileInfo(state.OldPath);
132
                            FileSystemInfo newInfo = Directory.Exists(state.Path) ? (FileSystemInfo)new DirectoryInfo(state.Path) : new FileInfo(state.Path);
133
                            NetworkAgent.Post(new CloudMoveAction(accountInfo, CloudActionType.RenameCloud,
134
                                                                  oldInfo,
135
                                                                  newInfo));
136
                            break;
137
                    }
138

    
139
                    return CompletedTask<object>.Default;
140
                }
141
                catch (Exception ex)
142
                {
143
                    Log.Error(ex.ToString());
144
                    throw;
145
                }
146
            }
147
        }
148

    
149

    
150
        //Starts interrupted files for a specific account
151
        public void RestartInterruptedFiles(AccountInfo accountInfo)
152
        {
153
            
154
            StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);
155

    
156
            using (log4net.ThreadContext.Stacks["Workflow"].Push("Restart"))
157
            {
158
                if (Log.IsDebugEnabled)
159
                    Log.Debug("Starting interrupted files");
160

    
161
                var cachePath = Path.Combine(accountInfo.AccountPath, FolderConstants.CacheFolder)
162
                    .ToLower();
163

    
164

    
165

    
166
                var account = accountInfo;
167
                var pendingEntries = from state in FileState.Queryable
168
                                     where state.FileStatus != FileStatus.Unchanged &&
169
                                           !state.FilePath.StartsWith(cachePath) &&
170
                                           !state.FilePath.EndsWith(".ignore") &&
171
                                           state.FilePath.StartsWith(account.AccountPath)
172
                                     select state;
173
                var pendingStates = new List<WorkflowState>();
174
                foreach (var state in pendingEntries)
175
                {
176
                        pendingStates.Add(new WorkflowState(account, state));
177
                }
178
                if (Log.IsDebugEnabled)
179
                    Log.DebugFormat("Found {0} interrupted files", pendingStates.Count);
180

    
181

    
182
                foreach (var entry in pendingStates)
183
                {
184
                       Post(entry);
185
                }
186
            }
187
        }
188

    
189

    
190

    
191
        public void Post(WorkflowState workflowState)
192
        {
193
            if (Log.IsDebugEnabled)
194
                Log.DebugFormat("Posted {0} {1} {2}", workflowState.Path, workflowState.Status, workflowState.TriggeringChange);
195

    
196
            //Remove invalid state            
197
            //For now, ignore paths
198
           /* if (Directory.Exists(workflowState.Path))
199
                return;*/
200
            //TODO: Need to handle folder renames            
201

    
202
            Debug.Assert(workflowState.Path.StartsWith(workflowState.AccountInfo.AccountPath, StringComparison.InvariantCultureIgnoreCase), "File from wrong account posted");
203

    
204
            _agent.Post(workflowState);
205
        }     
206

    
207
    }
208
}