Added Permissions, Tags
[pithos-ms-client] / trunk / Pithos.Core / Agents / WorkflowAgent.cs
1 using System;
2 using System.Collections.Generic;
3 using System.ComponentModel.Composition;
4 using System.Diagnostics;
5 using System.Diagnostics.Contracts;
6 using System.IO;
7 using System.Linq;
8 using System.Text;
9 using System.Threading.Tasks;
10 using Pithos.Interfaces;
11 using Pithos.Network;
12 using log4net;
13
14 namespace Pithos.Core.Agents
15 {
16     [Export]
17     public class WorkflowAgent
18     {
19         Agent<WorkflowState> _agent;
20                 
21         public IStatusNotification StatusNotification { get; set; }
22         [Import]
23         public IStatusKeeper StatusKeeper { get; set; }
24
25         [Import]
26         public NetworkAgent NetworkAgent { get; set; }
27
28         private static readonly ILog Log = LogManager.GetLogger("WorkflowAgent");
29
30         public void Start()
31         {
32             _agent = Agent<WorkflowState>.Start(inbox =>
33             {
34                 Action loop = null;
35                 loop = () =>
36                 {
37                     var message = inbox.Receive();
38                     var process = message.Then(Process, inbox.CancellationToken);                        
39                     inbox.LoopAsync(process,loop,ex=>
40                             Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex));
41                 };
42                 loop();
43             });
44         }
45
46         private Task<object> Process(WorkflowState state)
47         {
48             var accountInfo = state.AccountInfo;
49             using (log4net.ThreadContext.Stacks["Workflow"].Push("Process"))
50             {
51                 if (Log.IsDebugEnabled) Log.DebugFormat("State {0} {1} {2}", state.FileName,state.Status,state.TriggeringChange);
52
53                 if (state.Skip)
54                 {
55                     if (Log.IsDebugEnabled) Log.DebugFormat("Skipping {0}",state.FileName);
56                     
57                     return CompletedTask<object>.Default;
58                 }
59                 string path = state.Path.ToLower();
60
61                 //Bypass deleted files, unless the status is Deleted
62                 if (!File.Exists(path) && state.Status != FileStatus.Deleted)
63                 {
64                     state.Skip = true;
65                     this.StatusKeeper.ClearFileStatus(path);
66                     
67                     if (Log.IsDebugEnabled) Log.DebugFormat("Skipped missing {0}", state.FileName);
68
69                     return CompletedTask<object>.Default;
70                 }
71
72                 var fileState = FileState.FindByFilePath(path);
73                 var info = new FileInfo(path);
74
75
76                 switch (state.Status)
77                 {
78                     case FileStatus.Created:
79                     case FileStatus.Modified:
80                         NetworkAgent.Post(new CloudUploadAction(accountInfo,info, fileState, accountInfo.BlockSize,
81                             accountInfo.BlockHash));
82                         break;
83                     case FileStatus.Deleted:                       
84                         NetworkAgent.Post(new CloudDeleteAction(accountInfo,info, fileState));
85                         break;
86                     case FileStatus.Renamed:
87                         NetworkAgent.Post(new CloudMoveAction(accountInfo,CloudActionType.RenameCloud, new FileInfo(state.OldPath),
88                                                               new FileInfo(state.Path)));
89                         break;
90                 }
91
92                 return CompletedTask<object>.Default;
93             }
94         }
95
96
97         //Starts interrupted files for a specific account
98         public void RestartInterruptedFiles(AccountInfo accountInfo)
99         {
100             
101             StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);
102
103             using (log4net.ThreadContext.Stacks["Workflow"].Push("Restart"))
104             {
105                 if (Log.IsDebugEnabled)
106                     Log.Debug("Starting interrupted files");
107
108                 var cachePath = Path.Combine(accountInfo.AccountPath, FolderConstants.CacheFolder)
109                     .ToLower();
110
111
112                 var pendingEntries = from state in FileState.Queryable
113                                      where state.FileStatus != FileStatus.Unchanged &&
114                                            !state.FilePath.StartsWith(cachePath) &&
115                                            !state.FilePath.EndsWith(".ignore") &&
116                                            state.FilePath.StartsWith(accountInfo.AccountPath)
117                                      select state;
118                 if (Log.IsDebugEnabled)
119                     Log.DebugFormat("Found {0} interrupted files",pendingEntries.Count());
120
121                 var validEntries = from state in pendingEntries
122                                    select new WorkflowState
123                                               {
124                                                   AccountInfo=accountInfo,
125                                                   Path = state.FilePath.ToLower(),
126                                                   FileName = Path.GetFileName(state.FilePath).ToLower(),
127                                                   Hash = state.Checksum,
128                                                   Status = state.OverlayStatus == FileOverlayStatus.Unversioned
129                                                                ? FileStatus.Created
130                                                                : state.FileStatus,
131                                                   TriggeringChange =
132                                                       state.OverlayStatus == FileOverlayStatus.Unversioned
133                                                           ? WatcherChangeTypes.Created
134                                                           : WatcherChangeTypes.Changed
135                                               };
136                 foreach (var entry in validEntries)
137                 {                    
138                     Post(entry);
139                 }
140             }
141         }       
142
143        
144
145         public void Post(WorkflowState workflowState)
146         {
147             if (Log.IsDebugEnabled)
148                 Log.DebugFormat("Posted {0} {1} {2}", workflowState.Path, workflowState.Status, workflowState.TriggeringChange);
149             _agent.Post(workflowState);
150         }
151     }
152 }