Large changes to accommodate multiple users
[pithos-ms-client] / trunk / Pithos.Core / Agents / WorkflowAgent.cs
1 using System;
2 using System.Collections.Generic;
3 using System.ComponentModel.Composition;
4 using System.Diagnostics;
5 using System.Diagnostics.Contracts;
6 using System.IO;
7 using System.Linq;
8 using System.Text;
9 using System.Threading.Tasks;
10 using Pithos.Interfaces;
11 using Pithos.Network;
12 using log4net;
13
14 namespace Pithos.Core.Agents
15 {
16     [Export]
17     public class WorkflowAgent
18     {
19         Agent<WorkflowState> _agent;
20                 
21         public IStatusNotification StatusNotification { get; set; }
22         [Import]
23         public IStatusKeeper StatusKeeper { get; set; }
24
25         //We should avoid processing files stored in the Fragments folder
26         //The Full path to the fragments folder is stored in FragmentsPath
27         //public string FragmentsPath { get; set; }
28
29         [Import]
30         public NetworkAgent NetworkAgent { get; set; }
31
32         private static readonly ILog Log = LogManager.GetLogger("WorkflowAgent");
33
34         public void Start()
35         {
36             _agent = Agent<WorkflowState>.Start(inbox =>
37             {
38                 Action loop = null;
39                 loop = () =>
40                 {
41                     var message = inbox.Receive();
42                     var process = message.Then(Process, inbox.CancellationToken);                        
43                     inbox.LoopAsync(process,loop,ex=>
44                             Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex));
45                 };
46                 loop();
47             });
48         }
49
50         private Task<object> Process(WorkflowState state)
51         {
52             var accountInfo = state.AccountInfo;
53             using (log4net.ThreadContext.Stacks["Workflow"].Push("Process"))
54             {
55                 if (Log.IsDebugEnabled) Log.DebugFormat("State {0} {1} {2}", state.FileName,state.Status,state.TriggeringChange);
56
57                 if (state.Skip)
58                 {
59                     if (Log.IsDebugEnabled) Log.DebugFormat("Skipping {0}",state.FileName);
60                     
61                     return CompletedTask<object>.Default;
62                 }
63                 string path = state.Path.ToLower();
64
65                 //Bypass deleted files, unless the status is Deleted
66                 if (!File.Exists(path) && state.Status != FileStatus.Deleted)
67                 {
68                     state.Skip = true;
69                     this.StatusKeeper.ClearFileStatus(path);
70                     
71                     if (Log.IsDebugEnabled) Log.DebugFormat("Skipped missing {0}", state.FileName);
72
73                     return CompletedTask<object>.Default;
74                 }
75                 var fileState = FileState.FindByFilePath(path);
76                 var info = new FileInfo(path);
77
78                 switch (state.Status)
79                 {
80                     case FileStatus.Created:
81                     case FileStatus.Modified:
82                         NetworkAgent.Post(new CloudUploadAction(accountInfo,info, fileState, accountInfo.BlockSize,
83                             accountInfo.BlockHash));
84                         break;
85                     case FileStatus.Deleted:
86                         string fileName = info.AsRelativeUrlTo(accountInfo.AccountPath);
87                         NetworkAgent.Post(new CloudDeleteAction(accountInfo,fileName, fileState));
88                         break;
89                     case FileStatus.Renamed:
90                         NetworkAgent.Post(new CloudMoveAction(accountInfo,CloudActionType.RenameCloud, state.OldFileName,
91                                                               state.OldPath, state.FileName, state.Path));
92                         break;
93                 }
94
95                 return CompletedTask<object>.Default;
96             }
97         }
98
99
100         //Starts interrupted files for a specific account
101         public void RestartInterruptedFiles(AccountInfo accountInfo)
102         {
103             
104             StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);
105
106             using (log4net.ThreadContext.Stacks["Workflow"].Push("Restart"))
107             {
108                 if (Log.IsDebugEnabled)
109                     Log.Debug("Starting interrupted files");
110
111                 var fragmentsPath = Path.Combine(accountInfo.AccountPath, FolderConstants.FragmentsFolder)
112                     .ToLower();
113
114
115                 var pendingEntries = from state in FileState.Queryable
116                                      where state.FileStatus != FileStatus.Unchanged &&
117                                            !state.FilePath.StartsWith(fragmentsPath) &&
118                                            !state.FilePath.EndsWith(".ignore") &&
119                                            state.FilePath.StartsWith(accountInfo.AccountPath)
120                                      select state;
121                 if (Log.IsDebugEnabled)
122                     Log.DebugFormat("Found {0} interrupted files",pendingEntries.Count());
123
124                 var validEntries = from state in pendingEntries
125                                    select new WorkflowState(accountInfo)
126                                               {
127                                                   Path = state.FilePath.ToLower(),
128                                                   FileName = Path.GetFileName(state.FilePath).ToLower(),
129                                                   Hash = state.Checksum,
130                                                   Status = state.OverlayStatus == FileOverlayStatus.Unversioned
131                                                                ? FileStatus.Created
132                                                                : state.FileStatus,
133                                                   TriggeringChange =
134                                                       state.OverlayStatus == FileOverlayStatus.Unversioned
135                                                           ? WatcherChangeTypes.Created
136                                                           : WatcherChangeTypes.Changed
137                                               };
138                 foreach (var entry in validEntries)
139                 {                    
140                     Post(entry);
141                 }
142             }
143         }       
144
145        
146
147         public void Post(WorkflowState workflowState)
148         {
149             if (Log.IsDebugEnabled)
150                 Log.DebugFormat("Posted {0} {1} {2}", workflowState.Path, workflowState.Status, workflowState.TriggeringChange);
151             _agent.Post(workflowState);
152         }
153     }
154 }