Logging changes, first changes to multi account support
[pithos-ms-client] / trunk / Pithos.Core / Agents / WorkflowAgent.cs
1 using System;
2 using System.Collections.Generic;
3 using System.ComponentModel.Composition;
4 using System.Diagnostics;
5 using System.Diagnostics.Contracts;
6 using System.IO;
7 using System.Linq;
8 using System.Text;
9 using System.Threading.Tasks;
10 using Pithos.Interfaces;
11 using log4net;
12
namespace Pithos.Core.Agents
{
    /// <summary>
    /// Receives <see cref="WorkflowState"/> messages that describe changes to local files
    /// and posts the matching cloud action (upload, delete or move) to the
    /// <see cref="NetworkAgent"/>.
    /// </summary>
    [Export]
    public class WorkflowAgent
    {
        // Message loop created by Start(); stays null until Start() has been called.
        Agent<WorkflowState> _agent;

        /// <summary>
        /// Sink for user-visible status messages. Unlike the other services of this class
        /// it is not marked with [Import], so it may be left unassigned.
        /// </summary>
        public IStatusNotification StatusNotification { get; set; }

        [Import]
        public IStatusKeeper StatusKeeper { get; set; }

        //We should avoid processing files stored in the Fragments folder
        //The full path to the fragments folder is stored in FragmentsPath
        public string FragmentsPath { get; set; }

        [Import]
        public NetworkAgent NetworkAgent { get; set; }

        private static readonly ILog Log = LogManager.GetLogger("WorkflowAgent");

        /// <summary>
        /// Creates the agent and starts its receive loop: every received message is handed
        /// to <see cref="Process"/>, after which the loop schedules itself again.
        /// </summary>
        public void Start()
        {
            _agent = Agent<WorkflowState>.Start(inbox =>
            {
                Action loop = null;
                loop = () =>
                {
                    var message = inbox.Receive();
                    var process = message.Then(Process, inbox.CancellationToken);
                    inbox.LoopAsync(process, loop, ex =>
                    {
                        // Reading Result of a receive task that did not complete successfully
                        // throws from inside this handler and hides the original error,
                        // so the file name is only read when the message actually arrived.
                        var fileName = message.Status == TaskStatus.RanToCompletion
                                           ? message.Result.FileName
                                           : "<unknown file>";
                        Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", fileName, ex);
                    });
                };
                loop();
            });
        }

        /// <summary>
        /// Translates a single workflow message into a cloud action and posts it to the
        /// <see cref="NetworkAgent"/>.
        /// </summary>
        /// <param name="state">The message describing the changed file.</param>
        /// <returns>
        /// Always <c>CompletedTask&lt;object&gt;.Default</c>; the method only posts the action
        /// and does not wait for it.
        /// </returns>
        private Task<object> Process(WorkflowState state)
        {
            using (log4net.ThreadContext.Stacks["Workflow"].Push("Process"))
            {
                if (Log.IsDebugEnabled)
                    Log.DebugFormat("State {0} {1} {2}", state.FileName, state.Status, state.TriggeringChange);

                if (state.Skip)
                {
                    if (Log.IsDebugEnabled)
                        Log.DebugFormat("Skipping {0}", state.FileName);

                    return CompletedTask<object>.Default;
                }

                // NOTE(review): ToLower() is culture-sensitive. It is left unchanged because other
                // components presumably store paths lower-cased the same way - confirm before
                // switching to ToLowerInvariant().
                string path = state.Path.ToLower();

                //Bypass deleted files, unless the status is Deleted
                if (!File.Exists(path) && state.Status != FileStatus.Deleted)
                {
                    state.Skip = true;
                    this.StatusKeeper.ClearFileStatus(path);

                    if (Log.IsDebugEnabled)
                        Log.DebugFormat("Skipped missing {0}", state.FileName);

                    return CompletedTask<object>.Default;
                }

                var fileState = FileState.FindByFilePath(path);
                var blockHash = NetworkAgent.BlockHash;
                var blockSize = NetworkAgent.BlockSize;
                var info = new FileInfo(path);

                // Statuses that are not listed below post nothing.
                switch (state.Status)
                {
                    case FileStatus.Created:
                    case FileStatus.Modified:
                        NetworkAgent.Post(new CloudUploadAction(info, fileState, blockSize, blockHash));
                        break;
                    case FileStatus.Deleted:
                        string fileName = info.AsRelativeUrlTo(NetworkAgent.FileAgent.RootPath);
                        NetworkAgent.Post(new CloudDeleteAction(fileName, fileState));
                        break;
                    case FileStatus.Renamed:
                        // The move action receives the paths exactly as they arrived in the message,
                        // not the lower-cased copy used above.
                        NetworkAgent.Post(new CloudMoveAction(CloudActionType.RenameCloud, state.OldFileName,
                                                              state.OldPath, state.FileName, state.Path));
                        break;
                }

                return CompletedTask<object>.Default;
            }
        }

        /// <summary>
        /// Re-posts every stored file state whose status is not Unchanged, skipping files
        /// inside the fragments folder and files ending in ".ignore".
        /// </summary>
        public void RestartInterruptedFiles()
        {
            // StatusNotification is not imported, so it may never have been assigned.
            if (StatusNotification != null)
                StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);

            using (log4net.ThreadContext.Stacks["Workflow"].Push("Restart"))
            {
                if (Log.IsDebugEnabled)
                    Log.Debug("Starting interrupted files");

                var pendingQuery = FileState.Queryable.Where(state =>
                    state.FileStatus != FileStatus.Unchanged &&
                    !state.FilePath.EndsWith(".ignore"));

                // Apply the fragments filter only when a fragments folder is configured:
                // a null path cannot be lower-cased and an empty prefix matches every path,
                // which would exclude all files.
                if (!String.IsNullOrEmpty(FragmentsPath))
                {
                    var fragmentsPath = FragmentsPath.ToLower();
                    pendingQuery = pendingQuery.Where(state => !state.FilePath.StartsWith(fragmentsPath));
                }

                // Run the query once so the debug count and the loop below share the same results.
                var pendingEntries = pendingQuery.ToList();

                if (Log.IsDebugEnabled)
                    Log.DebugFormat("Found {0} interrupted files", pendingEntries.Count);

                foreach (var entry in pendingEntries)
                {
                    // Unversioned entries are re-posted as newly created files;
                    // all others keep their stored status and are reported as changed.
                    var unversioned = entry.OverlayStatus == FileOverlayStatus.Unversioned;

                    Post(new WorkflowState
                             {
                                 Path = entry.FilePath.ToLower(),
                                 FileName = Path.GetFileName(entry.FilePath).ToLower(),
                                 Hash = entry.Checksum,
                                 Status = unversioned ? FileStatus.Created : entry.FileStatus,
                                 TriggeringChange = unversioned
                                                        ? WatcherChangeTypes.Created
                                                        : WatcherChangeTypes.Changed
                             });
                }
            }
        }

        /// <summary>
        /// Queues a workflow message on the agent created by <see cref="Start"/>.
        /// </summary>
        /// <param name="workflowState">The message to queue.</param>
        public void Post(WorkflowState workflowState)
        {
            if (Log.IsDebugEnabled)
                Log.DebugFormat("Posted {0} {1} {2}", workflowState.Path, workflowState.Status, workflowState.TriggeringChange);
            _agent.Post(workflowState);
        }
    }
}