Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / WorkflowAgent.cs @ 4f6d51d4

History | View | Annotate | Download (6.4 kB)

1
using System;
2
using System.Collections.Generic;
3
using System.ComponentModel.Composition;
4
using System.Diagnostics;
5
using System.Diagnostics.Contracts;
6
using System.IO;
7
using System.Linq;
8
using System.Text;
9
using System.Threading.Tasks;
10
using Pithos.Interfaces;
11
using Pithos.Network;
12
using log4net;
13

    
14
namespace Pithos.Core.Agents
15
{
16
    [Export]
17
    public class WorkflowAgent
18
    {
19
        Agent<WorkflowState> _agent;
20
                
21
        public IStatusNotification StatusNotification { get; set; }
22
        [Import]
23
        public IStatusKeeper StatusKeeper { get; set; }
24

    
25
        [Import]
26
        public NetworkAgent NetworkAgent { get; set; }
27

    
28
        private static readonly ILog Log = LogManager.GetLogger("WorkflowAgent");
29

    
30
        public void Start()
31
        {
32
            _agent = Agent<WorkflowState>.Start(inbox =>
33
            {
34
                Action loop = null;
35
                loop = () =>
36
                {
37
                    var message = inbox.Receive();
38
                    var process = message.Then(Process, inbox.CancellationToken);                        
39
                    inbox.LoopAsync(process,loop,ex=>
40
                            Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex));
41
                };
42
                loop();
43
            });
44
        }
45

    
46
        private Task<object> Process(WorkflowState state)
47
        {
48
            var accountInfo = state.AccountInfo;
49
            using (log4net.ThreadContext.Stacks["Workflow"].Push("Process"))
50
            {
51
                try
52
                {
53

    
54
                    if (Log.IsDebugEnabled)
55
                        Log.DebugFormat("State {0} {1} {2}", state.FileName, state.Status, state.TriggeringChange);
56

    
57
                    if (state.Skip)
58
                    {
59
                        if (Log.IsDebugEnabled) Log.DebugFormat("Skipping {0}", state.FileName);
60

    
61
                        return CompletedTask<object>.Default;
62
                    }
63
                    string path = state.Path.ToLower();
64

    
65

    
66

    
67
                    FileSystemInfo info = Directory.Exists(path) ? (FileSystemInfo) new DirectoryInfo(path) : new FileInfo(path);
68

    
69
                    //Bypass deleted files, unless the status is Deleted
70
                    if (!info.Exists && state.Status != FileStatus.Deleted)
71
                    {
72
                        state.Skip = true;
73
                        this.StatusKeeper.ClearFileStatus(path);
74

    
75
                        if (Log.IsDebugEnabled) Log.DebugFormat("Skipped missing {0}", state.FileName);
76

    
77
                        return CompletedTask<object>.Default;
78
                    }
79

    
80
                    var fileState = FileState.FindByFilePath(path);
81

    
82

    
83
                    switch (state.Status)
84
                    {
85
                        case FileStatus.Created:
86
                        case FileStatus.Modified:
87
                            NetworkAgent.Post(new CloudUploadAction(accountInfo, info, fileState, accountInfo.BlockSize,
88
                                                                    accountInfo.BlockHash));
89
                            break;
90
                        case FileStatus.Deleted:
91
                            NetworkAgent.Post(new CloudDeleteAction(accountInfo, info, fileState));
92
                            break;
93
                        case FileStatus.Renamed:
94
                            FileSystemInfo oldInfo = Directory.Exists(state.OldPath) ? (FileSystemInfo)new DirectoryInfo(state.OldPath) : new FileInfo(state.OldPath);
95
                            FileSystemInfo newInfo = Directory.Exists(state.Path) ? (FileSystemInfo)new DirectoryInfo(state.Path) : new FileInfo(state.Path);
96
                            NetworkAgent.Post(new CloudMoveAction(accountInfo, CloudActionType.RenameCloud,
97
                                                                  oldInfo,
98
                                                                  newInfo));
99
                            break;
100
                    }
101

    
102
                    return CompletedTask<object>.Default;
103
                }
104
                catch (Exception ex)
105
                {
106
                    Log.Error(ex.ToString());
107
                    throw;
108
                }
109
            }
110
        }
111

    
112

    
113
        //Starts interrupted files for a specific account
114
        public void RestartInterruptedFiles(AccountInfo accountInfo)
115
        {
116
            
117
            StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);
118

    
119
            using (log4net.ThreadContext.Stacks["Workflow"].Push("Restart"))
120
            {
121
                if (Log.IsDebugEnabled)
122
                    Log.Debug("Starting interrupted files");
123

    
124
                var cachePath = Path.Combine(accountInfo.AccountPath, FolderConstants.CacheFolder)
125
                    .ToLower();
126

    
127

    
128

    
129
                var account = accountInfo;
130
                var pendingEntries = from state in FileState.Queryable
131
                                     where state.FileStatus != FileStatus.Unchanged &&
132
                                           !state.FilePath.StartsWith(cachePath) &&
133
                                           !state.FilePath.EndsWith(".ignore") &&
134
                                           state.FilePath.StartsWith(account.AccountPath)
135
                                     select state;
136
                var pendingStates = new List<WorkflowState>();
137
                foreach (var state in pendingEntries)
138
                {
139
                        pendingStates.Add(new WorkflowState(account, state));
140
                }
141
                if (Log.IsDebugEnabled)
142
                    Log.DebugFormat("Found {0} interrupted files", pendingStates.Count);
143

    
144

    
145
                foreach (var entry in pendingStates)
146
                {
147
                       Post(entry);
148
                }
149
            }
150
        }
151

    
152

    
153

    
154
        public void Post(WorkflowState workflowState)
155
        {
156
            if (Log.IsDebugEnabled)
157
                Log.DebugFormat("Posted {0} {1} {2}", workflowState.Path, workflowState.Status, workflowState.TriggeringChange);
158

    
159
            //Remove invalid state            
160
            //For now, ignore paths
161
           /* if (Directory.Exists(workflowState.Path))
162
                return;*/
163
            //TODO: Need to handle folder renames            
164

    
165
            Debug.Assert(workflowState.Path.StartsWith(workflowState.AccountInfo.AccountPath, StringComparison.InvariantCultureIgnoreCase), "File from wrong account posted");
166

    
167
            _agent.Post(workflowState);
168
        }     
169

    
170
    }
171
}