Added directory object creation whenever a new directory is created
[pithos-ms-client] / trunk / Pithos.Core / Agents / WorkflowAgent.cs
1 using System;
2 using System.Collections.Generic;
3 using System.ComponentModel.Composition;
4 using System.Diagnostics;
5 using System.Diagnostics.Contracts;
6 using System.IO;
7 using System.Linq;
8 using System.Text;
9 using System.Threading.Tasks;
10 using Pithos.Interfaces;
11 using Pithos.Network;
12 using log4net;
13
namespace Pithos.Core.Agents
{
    /// <summary>
    /// Receives <see cref="WorkflowState"/> messages that describe a change to a local
    /// path and posts the matching upload, delete or move action to the
    /// <see cref="NetworkAgent"/>.
    /// </summary>
    [Export]
    public class WorkflowAgent
    {
        private static readonly ILog Log = LogManager.GetLogger("WorkflowAgent");

        // Created by Start(); Post() refuses to run until it is available.
        private Agent<WorkflowState> _agent;

        /// <summary>
        /// Receives progress messages. Unlike the other dependencies this property is
        /// not marked with [Import], so it stays null unless the owner assigns it.
        /// </summary>
        public IStatusNotification StatusNotification { get; set; }

        [Import]
        public IStatusKeeper StatusKeeper { get; set; }

        [Import]
        public NetworkAgent NetworkAgent { get; set; }

        /// <summary>
        /// Starts the agent and its receive loop: each received state is passed to
        /// <see cref="Process"/> and the loop is re-armed through <c>LoopAsync</c>.
        /// </summary>
        public void Start()
        {
            _agent = Agent<WorkflowState>.Start(inbox =>
            {
                Action loop = null;
                loop = () =>
                {
                    var message = inbox.Receive();
                    var process = message.Then(Process, inbox.CancellationToken);
                    inbox.LoopAsync(process, loop, ex =>
                    {
                        // Assuming Receive returns a Task<WorkflowState>: reading Result on a
                        // faulted or cancelled task throws, which would replace the error being
                        // reported with a second one. Only read it when the receive succeeded.
                        var fileName = message.Status == TaskStatus.RanToCompletion
                                           ? message.Result.FileName
                                           : "<unknown>";
                        Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", fileName, ex);
                    });
                };
                loop();
            });
        }

        /// <summary>
        /// Wraps <paramref name="path"/> in a <see cref="DirectoryInfo"/> when a directory
        /// currently exists at that location and in a <see cref="FileInfo"/> otherwise.
        /// </summary>
        private static FileSystemInfo GetFileSystemInfo(string path)
        {
            if (Directory.Exists(path))
                return new DirectoryInfo(path);
            return new FileInfo(path);
        }

        /// <summary>
        /// Translates a single workflow state into a network action.
        /// </summary>
        /// <param name="state">The state to process. States flagged with Skip are ignored.</param>
        /// <returns>An already completed task; the work itself is only queued on the NetworkAgent.</returns>
        /// <remarks>Any exception is logged and then rethrown to the caller.</remarks>
        private Task<object> Process(WorkflowState state)
        {
            var accountInfo = state.AccountInfo;
            using (log4net.ThreadContext.Stacks["Workflow"].Push("Process"))
            {
                try
                {
                    if (Log.IsDebugEnabled)
                        Log.DebugFormat("State {0} {1} {2}", state.FileName, state.Status, state.TriggeringChange);

                    if (state.Skip)
                    {
                        if (Log.IsDebugEnabled) Log.DebugFormat("Skipping {0}", state.FileName);

                        return CompletedTask<object>.Default;
                    }

                    // NOTE(review): ToLower() is culture-sensitive. It is kept as-is because the
                    // status lookups below presumably expect exactly this form - confirm before
                    // switching to ToLowerInvariant().
                    string path = state.Path.ToLower();

                    FileSystemInfo info = GetFileSystemInfo(path);

                    //Bypass deleted files, unless the status is Deleted
                    if (!info.Exists && state.Status != FileStatus.Deleted)
                    {
                        state.Skip = true;
                        this.StatusKeeper.ClearFileStatus(path);

                        if (Log.IsDebugEnabled) Log.DebugFormat("Skipped missing {0}", state.FileName);

                        return CompletedTask<object>.Default;
                    }

                    var fileState = FileState.FindByFilePath(path);

                    switch (state.Status)
                    {
                        case FileStatus.Created:
                        case FileStatus.Modified:
                            NetworkAgent.Post(new CloudUploadAction(accountInfo, info, fileState, accountInfo.BlockSize,
                                                                    accountInfo.BlockHash));
                            break;
                        case FileStatus.Deleted:
                            NetworkAgent.Post(new CloudDeleteAction(accountInfo, info, fileState));
                            break;
                        case FileStatus.Renamed:
                        {
                            FileSystemInfo newInfo = GetFileSystemInfo(state.Path);

                            // After a rename nothing exists at the old path any more, so probing it
                            // alone would describe every renamed directory as a file. Treat the old
                            // entry as a directory when either path is one.
                            bool oldIsDirectory = newInfo is DirectoryInfo || Directory.Exists(state.OldPath);
                            FileSystemInfo oldInfo = oldIsDirectory
                                                         ? (FileSystemInfo) new DirectoryInfo(state.OldPath)
                                                         : new FileInfo(state.OldPath);

                            NetworkAgent.Post(new CloudMoveAction(accountInfo, CloudActionType.RenameCloud,
                                                                  oldInfo,
                                                                  newInfo));
                            break;
                        }
                    }

                    return CompletedTask<object>.Default;
                }
                catch (Exception ex)
                {
                    Log.Error(ex.ToString());
                    throw;
                }
            }
        }

        /// <summary>
        /// Re-posts every stored file state of the given account that is not Unchanged,
        /// leaving out entries under the account's cache folder and entries whose path
        /// ends in ".ignore".
        /// </summary>
        /// <param name="accountInfo">The account whose pending entries are restarted.</param>
        /// <exception cref="ArgumentNullException"><paramref name="accountInfo"/> is null.</exception>
        public void RestartInterruptedFiles(AccountInfo accountInfo)
        {
            if (accountInfo == null)
                throw new ArgumentNullException("accountInfo");

            // StatusNotification is not imported and may never have been assigned;
            // a missing listener must not prevent the restart itself.
            if (StatusNotification != null)
                StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);

            using (log4net.ThreadContext.Stacks["Workflow"].Push("Restart"))
            {
                if (Log.IsDebugEnabled)
                    Log.Debug("Starting interrupted files");

                var cachePath = Path.Combine(accountInfo.AccountPath, FolderConstants.CacheFolder)
                    .ToLower();

                var account = accountInfo;
                var pendingEntries = from state in FileState.Queryable
                                     where state.FileStatus != FileStatus.Unchanged &&
                                           !state.FilePath.StartsWith(cachePath) &&
                                           !state.FilePath.EndsWith(".ignore") &&
                                           state.FilePath.StartsWith(account.AccountPath)
                                     select state;

                // Read all matching entries into a list before posting anything, so the
                // count can be logged and the query is finished when posting begins.
                var pendingStates = new List<WorkflowState>();
                foreach (var state in pendingEntries)
                {
                    pendingStates.Add(new WorkflowState(account, state));
                }

                if (Log.IsDebugEnabled)
                    Log.DebugFormat("Found {0} interrupted files", pendingStates.Count);

                foreach (var entry in pendingStates)
                {
                    Post(entry);
                }
            }
        }

        /// <summary>
        /// Queues a workflow state on the agent started by <see cref="Start"/>.
        /// </summary>
        /// <param name="workflowState">The state to queue.</param>
        /// <exception cref="ArgumentNullException"><paramref name="workflowState"/> is null.</exception>
        /// <exception cref="InvalidOperationException"><see cref="Start"/> has not been called yet.</exception>
        public void Post(WorkflowState workflowState)
        {
            if (workflowState == null)
                throw new ArgumentNullException("workflowState");
            if (_agent == null)
                throw new InvalidOperationException("The workflow agent must be started before states can be posted");

            if (Log.IsDebugEnabled)
                Log.DebugFormat("Posted {0} {1} {2}", workflowState.Path, workflowState.Status, workflowState.TriggeringChange);

            //TODO: Need to handle folder renames

            Debug.Assert(workflowState.Path.StartsWith(workflowState.AccountInfo.AccountPath, StringComparison.InvariantCultureIgnoreCase), "File from wrong account posted");

            _agent.Post(workflowState);
        }
    }
}