Revision 5120f3cb trunk/Pithos.Core/Agents/WorkflowAgent.cs

b/trunk/Pithos.Core/Agents/WorkflowAgent.cs
8 8
using System.Text;
9 9
using System.Threading.Tasks;
10 10
using Pithos.Interfaces;
11
using log4net;
11 12

  
12 13
namespace Pithos.Core.Agents
13 14
{
......
27 28
        [Import]
28 29
        public NetworkAgent NetworkAgent { get; set; }
29 30

  
31
        private static readonly ILog Log = LogManager.GetLogger("WorkflowAgent");
32

  
30 33
        public void Start()
31 34
        {
32 35
            _agent = Agent<WorkflowState>.Start(inbox =>
......
37 40
                    var message = inbox.Receive();
38 41
                    var process = message.Then(Process, inbox.CancellationToken);                        
39 42
                    inbox.LoopAsync(process,loop,ex=>
40
                            Trace.TraceError("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex));
43
                            Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex));
41 44
                };
42 45
                loop();
43 46
            });
......
45 48

  
46 49
        private Task<object> Process(WorkflowState state)
47 50
        {
48
            if (state.Skip)
49
                return CompletedTask<object>.Default;
50
            string path = state.Path.ToLower();            
51

  
52
            //Bypass deleted files, unless the status is Deleted
53
            if (!File.Exists(path) && state.Status != FileStatus.Deleted)
51
            using (log4net.ThreadContext.Stacks["Workflow"].Push("Process"))
54 52
            {
55
                state.Skip = true;
56
                this.StatusKeeper.ClearFileStatus(path);
57
                return CompletedTask<object>.Default;
58
            }
59
            var fileState = FileState.FindByFilePath(path);
60
            var blockHash = NetworkAgent.BlockHash;
61
            var blockSize = NetworkAgent.BlockSize;
62
            var info = new FileInfo(path);
53
                if (Log.IsDebugEnabled) Log.DebugFormat("State {0} {1} {2}", state.FileName,state.Status,state.TriggeringChange);
63 54

  
64
            switch (state.Status)
65
            {
66
                case FileStatus.Created:
67
                case FileStatus.Modified:
68
                    NetworkAgent.Post(new CloudAction(CloudActionType.UploadUnconditional, info, ObjectInfo.Empty, fileState,blockSize,blockHash));
69
                    break;
70
                case FileStatus.Deleted:
71
                    string fileName = info.AsRelativeUrlTo(NetworkAgent.FileAgent.RootPath);                    
72
                    NetworkAgent.Post(new CloudAction(CloudActionType.DeleteCloud, null, new ObjectInfo { Name = fileName }, fileState, blockSize, blockHash));
73
                    break;
74
                case FileStatus.Renamed:
75
                    NetworkAgent.Post(new CloudMoveAction(CloudActionType.RenameCloud, state.OldFileName, state.OldPath, state.FileName, state.Path));
76
                    break;
77
            }
55
                if (state.Skip)
56
                {
57
                    if (Log.IsDebugEnabled) Log.DebugFormat("Skipping {0}",state.FileName);
58
                    
59
                    return CompletedTask<object>.Default;
60
                }
61
                string path = state.Path.ToLower();
62

  
63
                //Bypass deleted files, unless the status is Deleted
64
                if (!File.Exists(path) && state.Status != FileStatus.Deleted)
65
                {
66
                    state.Skip = true;
67
                    this.StatusKeeper.ClearFileStatus(path);
68
                    
69
                    if (Log.IsDebugEnabled) Log.DebugFormat("Skipped missing {0}", state.FileName);
70

  
71
                    return CompletedTask<object>.Default;
72
                }
73
                var fileState = FileState.FindByFilePath(path);
74
                var blockHash = NetworkAgent.BlockHash;
75
                var blockSize = NetworkAgent.BlockSize;
76
                var info = new FileInfo(path);
77

  
78
                switch (state.Status)
79
                {
80
                    case FileStatus.Created:
81
                    case FileStatus.Modified:
82
                        NetworkAgent.Post(new CloudUploadAction(info, fileState, blockSize, blockHash));
83
                        break;
84
                    case FileStatus.Deleted:
85
                        string fileName = info.AsRelativeUrlTo(NetworkAgent.FileAgent.RootPath);
86
                        NetworkAgent.Post(new CloudDeleteAction(fileName, fileState));
87
                        break;
88
                    case FileStatus.Renamed:
89
                        NetworkAgent.Post(new CloudMoveAction(CloudActionType.RenameCloud, state.OldFileName,
90
                                                              state.OldPath, state.FileName, state.Path));
91
                        break;
92
                }
78 93

  
79
            return CompletedTask<object>.Default;
94
                return CompletedTask<object>.Default;
95
            }
80 96
        }
81 97

  
82 98

  
......
84 100
        public void RestartInterruptedFiles()
85 101
        {
86 102
            
87
            StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);            
88

  
89
            var pendingEntries = from state in FileState.Queryable
90
                                   where state.FileStatus != FileStatus.Unchanged &&
91
                                         !state.FilePath.StartsWith(FragmentsPath.ToLower()) &&
92
                                         !state.FilePath.EndsWith(".ignore")
93
                                   select state;
94

  
95
            var validEntries = from state in pendingEntries
96
                             select new WorkflowState
97
                             {
98
                                 Path = state.FilePath.ToLower(),
99
                                 FileName = Path.GetFileName(state.FilePath).ToLower(),
100
                                 Hash = state.Checksum,
101
                                 Status = state.OverlayStatus == FileOverlayStatus.Unversioned ?
102
                                                   FileStatus.Created :
103
                                                   state.FileStatus,
104
                                 TriggeringChange = state.OverlayStatus == FileOverlayStatus.Unversioned ?
105
                                                   WatcherChangeTypes.Created :
106
                                                   WatcherChangeTypes.Changed
107
                             };
108
            foreach (var entry in validEntries)
109
            {
110
                Post(entry);
111
            }            
103
            StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);
112 104

  
105
            using (log4net.ThreadContext.Stacks["Workflow"].Push("Restart"))
106
            {
107
                if (Log.IsDebugEnabled)
108
                    Log.Debug("Starting interrupted files");
109
                
110
                var pendingEntries = from state in FileState.Queryable
111
                                     where state.FileStatus != FileStatus.Unchanged &&
112
                                           !state.FilePath.StartsWith(FragmentsPath.ToLower()) &&
113
                                           !state.FilePath.EndsWith(".ignore")
114
                                     select state;
115
                if (Log.IsDebugEnabled)
116
                    Log.DebugFormat("Found {0} interrupted files",pendingEntries.Count());
117

  
118
                var validEntries = from state in pendingEntries
119
                                   select new WorkflowState
120
                                              {
121
                                                  Path = state.FilePath.ToLower(),
122
                                                  FileName = Path.GetFileName(state.FilePath).ToLower(),
123
                                                  Hash = state.Checksum,
124
                                                  Status = state.OverlayStatus == FileOverlayStatus.Unversioned
125
                                                               ? FileStatus.Created
126
                                                               : state.FileStatus,
127
                                                  TriggeringChange =
128
                                                      state.OverlayStatus == FileOverlayStatus.Unversioned
129
                                                          ? WatcherChangeTypes.Created
130
                                                          : WatcherChangeTypes.Changed
131
                                              };
132
                foreach (var entry in validEntries)
133
                {                    
134
                    Post(entry);
135
                }
136
            }
113 137
        }       
114 138

  
115 139
       
116 140

  
117 141
        public void Post(WorkflowState workflowState)
118 142
        {
143
            if (Log.IsDebugEnabled)
144
                Log.DebugFormat("Posted {0} {1} {2}", workflowState.Path, workflowState.Status, workflowState.TriggeringChange);
119 145
            _agent.Post(workflowState);
120 146
        }
121 147
    }

Also available in: Unified diff