Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / WorkflowAgent.cs @ 77e10b4f

History | View | Annotate | Download (6.2 kB)

1
using System;
2
using System.Collections.Generic;
3
using System.ComponentModel.Composition;
4
using System.Diagnostics;
5
using System.Diagnostics.Contracts;
6
using System.IO;
7
using System.Linq;
8
using System.Text;
9
using System.Threading.Tasks;
10
using Pithos.Interfaces;
11
using Pithos.Network;
12
using log4net;
13

    
14
namespace Pithos.Core.Agents
15
{
16
    [Export]
17
    public class WorkflowAgent
18
    {
19
        Agent<WorkflowState> _agent;
20
                
21
        public IStatusNotification StatusNotification { get; set; }
22
        [Import]
23
        public IStatusKeeper StatusKeeper { get; set; }
24

    
25
        [Import]
26
        public NetworkAgent NetworkAgent { get; set; }
27

    
28
        private static readonly ILog Log = LogManager.GetLogger("WorkflowAgent");
29

    
30
        public void Start()
31
        {
32
            _agent = Agent<WorkflowState>.Start(inbox =>
33
            {
34
                Action loop = null;
35
                loop = () =>
36
                {
37
                    var message = inbox.Receive();
38
                    var process = message.Then(Process, inbox.CancellationToken);                        
39
                    inbox.LoopAsync(process,loop,ex=>
40
                            Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex));
41
                };
42
                loop();
43
            });
44
        }
45

    
46
        private Task<object> Process(WorkflowState state)
47
        {
48
            var accountInfo = state.AccountInfo;
49
            using (log4net.ThreadContext.Stacks["Workflow"].Push("Process"))
50
            {
51
                if (Log.IsDebugEnabled) Log.DebugFormat("State {0} {1} {2}", state.FileName,state.Status,state.TriggeringChange);
52

    
53
                if (state.Skip)
54
                {
55
                    if (Log.IsDebugEnabled) Log.DebugFormat("Skipping {0}",state.FileName);
56
                    
57
                    return CompletedTask<object>.Default;
58
                }
59
                string path = state.Path.ToLower();
60

    
61
                //Bypass deleted files, unless the status is Deleted
62
                if (!File.Exists(path) && state.Status != FileStatus.Deleted)
63
                {
64
                    state.Skip = true;
65
                    this.StatusKeeper.ClearFileStatus(path);
66
                    
67
                    if (Log.IsDebugEnabled) Log.DebugFormat("Skipped missing {0}", state.FileName);
68

    
69
                    return CompletedTask<object>.Default;
70
                }
71

    
72
                var fileState = FileState.FindByFilePath(path);
73
                var info = new FileInfo(path);
74

    
75

    
76
                switch (state.Status)
77
                {
78
                    case FileStatus.Created:
79
                    case FileStatus.Modified:
80
                        NetworkAgent.Post(new CloudUploadAction(accountInfo,info, fileState, accountInfo.BlockSize,
81
                            accountInfo.BlockHash));
82
                        break;
83
                    case FileStatus.Deleted:
84
                        string fileName = info.AsRelativeUrlTo(accountInfo.AccountPath);
85
                        NetworkAgent.Post(new CloudDeleteAction(accountInfo,fileName, fileState));
86
                        break;
87
                    case FileStatus.Renamed:
88
                        NetworkAgent.Post(new CloudMoveAction(accountInfo,CloudActionType.RenameCloud, state.OldFileName,
89
                                                              state.OldPath, state.FileName, state.Path));
90
                        break;
91
                }
92

    
93
                return CompletedTask<object>.Default;
94
            }
95
        }
96

    
97

    
98
        //Starts interrupted files for a specific account
99
        public void RestartInterruptedFiles(AccountInfo accountInfo)
100
        {
101
            
102
            StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);
103

    
104
            using (log4net.ThreadContext.Stacks["Workflow"].Push("Restart"))
105
            {
106
                if (Log.IsDebugEnabled)
107
                    Log.Debug("Starting interrupted files");
108

    
109
                var cachePath = Path.Combine(accountInfo.AccountPath, FolderConstants.CacheFolder)
110
                    .ToLower();
111

    
112

    
113
                var pendingEntries = from state in FileState.Queryable
114
                                     where state.FileStatus != FileStatus.Unchanged &&
115
                                           !state.FilePath.StartsWith(cachePath) &&
116
                                           !state.FilePath.EndsWith(".ignore") &&
117
                                           state.FilePath.StartsWith(accountInfo.AccountPath)
118
                                     select state;
119
                if (Log.IsDebugEnabled)
120
                    Log.DebugFormat("Found {0} interrupted files",pendingEntries.Count());
121

    
122
                var validEntries = from state in pendingEntries
123
                                   select new WorkflowState
124
                                              {
125
                                                  AccountInfo=accountInfo,
126
                                                  Path = state.FilePath.ToLower(),
127
                                                  FileName = Path.GetFileName(state.FilePath).ToLower(),
128
                                                  Hash = state.Checksum,
129
                                                  Status = state.OverlayStatus == FileOverlayStatus.Unversioned
130
                                                               ? FileStatus.Created
131
                                                               : state.FileStatus,
132
                                                  TriggeringChange =
133
                                                      state.OverlayStatus == FileOverlayStatus.Unversioned
134
                                                          ? WatcherChangeTypes.Created
135
                                                          : WatcherChangeTypes.Changed
136
                                              };
137
                foreach (var entry in validEntries)
138
                {                    
139
                    Post(entry);
140
                }
141
            }
142
        }       
143

    
144
       
145

    
146
        public void Post(WorkflowState workflowState)
147
        {
148
            if (Log.IsDebugEnabled)
149
                Log.DebugFormat("Posted {0} {1} {2}", workflowState.Path, workflowState.Status, workflowState.TriggeringChange);
150
            _agent.Post(workflowState);
151
        }
152
    }
153
}