Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / WorkflowAgent.cs @ 5e31048f

History | View | Annotate | Download (6.6 kB)

1
using System;
2
using System.Collections.Generic;
3
using System.ComponentModel.Composition;
4
using System.Diagnostics;
5
using System.Diagnostics.Contracts;
6
using System.IO;
7
using System.Linq;
8
using System.Text;
9
using System.Threading.Tasks;
10
using System.Threading.Tasks.Dataflow;
11
using Pithos.Interfaces;
12
using Pithos.Network;
13
using log4net;
14

    
15
namespace Pithos.Core.Agents
16
{
17
    [Export]
18
    public class WorkflowAgent
19
    {
20
        private ActionBlock<WorkflowState> _agent;
21
                
22
        public IStatusNotification StatusNotification { get; set; }
23
        [Import]
24
        public IStatusKeeper StatusKeeper { get; set; }
25

    
26
        [Import]
27
        public NetworkAgent NetworkAgent { get; set; }
28

    
29
        public ActionBlock<WorkflowState> Agent
30
        {
31
            get { return _agent; }
32
        }
33

    
34
        private static readonly ILog Log = LogManager.GetLogger("WorkflowAgent");
35

    
36
        public void Start()
37
        {
38
            /*_agent = new ActionBlock<WorkflowState>(message =>
39
            {
40
                Action loop = null;
41
                loop = () =>
42
                {
43
                    //var message = inbox.Receive();
44
                    Process(message);
45
                    var process = message.Then(Process, inbox.CancellationToken);                        
46
                    inbox.LoopAsync(process,loop,ex=>
47
                            Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex));
48
                };
49
                loop();
50
            });*/
51
            _agent = new ActionBlock<WorkflowState>(async message =>
52
            {
53
                try
54
                {
55
                    var action=await TaskEx.Run(()=>Process(message));
56
                    if (action!=null)
57
                        NetworkAgent.Post(action);
58
                }
59
                catch (Exception ex)
60
                {
61
                    Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", message.FileName, ex);                    
62
                }
63
            });            
64
        }
65

    
66
        private CloudAction Process(WorkflowState state)
67
        {
68
            var accountInfo = state.AccountInfo;
69
            using (log4net.ThreadContext.Stacks["Workflow"].Push("Process"))
70
            {
71
                if (Log.IsDebugEnabled) Log.DebugFormat("State {0} {1} {2}", state.FileName,state.Status,state.TriggeringChange);
72

    
73
                if (state.Skip)
74
                {
75
                    if (Log.IsDebugEnabled) Log.DebugFormat("Skipping {0}",state.FileName);
76
                    
77
                    return null;
78
                }
79
                string path = state.Path.ToLower();
80

    
81
                //Bypass deleted files, unless the status is Deleted
82
                if (!File.Exists(path) && state.Status != FileStatus.Deleted)
83
                {
84
                    state.Skip = true;
85
                    this.StatusKeeper.ClearFileStatus(path);
86
                    
87
                    if (Log.IsDebugEnabled) Log.DebugFormat("Skipped missing {0}", state.FileName);
88

    
89
                    return null;
90
                }
91

    
92
                var fileState = FileState.FindByFilePath(path);
93
                var info = new FileInfo(path);
94

    
95

    
96
                switch (state.Status)
97
                {
98
                    case FileStatus.Created:
99
                    case FileStatus.Modified:
100
                        return new CloudUploadAction(accountInfo,info, fileState, accountInfo.BlockSize,
101
                            accountInfo.BlockHash);
102
                    case FileStatus.Deleted:                       
103
                        return new CloudDeleteAction(accountInfo,info, fileState);
104
                    case FileStatus.Renamed:
105
                        return new CloudMoveAction(accountInfo,CloudActionType.RenameCloud, 
106
                            new FileInfo(state.OldPath),
107
                            new FileInfo(state.Path));
108
                }
109
                return null;
110
            }
111
        }
112

    
113

    
114
        //Starts interrupted files for a specific account
115
        public void RestartInterruptedFiles(AccountInfo accountInfo)
116
        {
117
            
118
            StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);
119

    
120
            using (log4net.ThreadContext.Stacks["Workflow"].Push("Restart"))
121
            {
122
                if (Log.IsDebugEnabled)
123
                    Log.Debug("Starting interrupted files");
124

    
125
                var cachePath = Path.Combine(accountInfo.AccountPath, FolderConstants.CacheFolder)
126
                    .ToLower();
127

    
128

    
129
                var pendingEntries = from state in FileState.Queryable
130
                                     where state.FileStatus != FileStatus.Unchanged &&
131
                                           !state.FilePath.StartsWith(cachePath) &&
132
                                           !state.FilePath.EndsWith(".ignore") &&
133
                                           state.FilePath.StartsWith(accountInfo.AccountPath)
134
                                     select state;
135
                if (Log.IsDebugEnabled)
136
                    Log.DebugFormat("Found {0} interrupted files",pendingEntries.Count());
137

    
138
                var validEntries = from state in pendingEntries
139
                                   select new WorkflowState
140
                                              {
141
                                                  AccountInfo=accountInfo,
142
                                                  Path = state.FilePath.ToLower(),
143
                                                  FileName = Path.GetFileName(state.FilePath).ToLower(),
144
                                                  Hash = state.Checksum,
145
                                                  Status = state.OverlayStatus == FileOverlayStatus.Unversioned
146
                                                               ? FileStatus.Created
147
                                                               : state.FileStatus,
148
                                                  TriggeringChange =
149
                                                      state.OverlayStatus == FileOverlayStatus.Unversioned
150
                                                          ? WatcherChangeTypes.Created
151
                                                          : WatcherChangeTypes.Changed
152
                                              };
153
                foreach (var entry in validEntries)
154
                {                    
155
                    Post(entry);
156
                }
157
            }
158
        }       
159

    
160
       
161

    
162
        public void Post(WorkflowState workflowState)
163
        {
164
            if (Log.IsDebugEnabled)
165
                Log.DebugFormat("Posted {0} {1} {2}", workflowState.Path, workflowState.Status, workflowState.TriggeringChange);
166
            Agent.Post(workflowState);
167
        }
168
    }
169
}