Some warning fixes and change of some agents from a hand-coded Agent to Dataflow...
[pithos-ms-client] / trunk / Pithos.Core / Agents / WorkflowAgent.cs
1 using System;
2 using System.Collections.Generic;
3 using System.ComponentModel.Composition;
4 using System.Diagnostics;
5 using System.Diagnostics.Contracts;
6 using System.IO;
7 using System.Linq;
8 using System.Text;
9 using System.Threading.Tasks;
10 using System.Threading.Tasks.Dataflow;
11 using Pithos.Interfaces;
12 using Pithos.Network;
13 using log4net;
14
namespace Pithos.Core.Agents
{
    /// <summary>
    /// Converts <see cref="WorkflowState"/> messages into <see cref="CloudAction"/>
    /// instances and posts them to the <see cref="NetworkAgent"/>.
    /// </summary>
    /// <remarks>
    /// Messages are queued on a dataflow <see cref="ActionBlock{T}"/> that is created by
    /// <see cref="Start"/>. <see cref="Start"/> must therefore be called before
    /// <see cref="Post"/> or <see cref="RestartInterruptedFiles"/>.
    /// </remarks>
    [Export]
    public class WorkflowAgent
    {
        private static readonly ILog Log = LogManager.GetLogger("WorkflowAgent");

        // Created by Start(); null until then.
        private ActionBlock<WorkflowState> _agent;

        /// <summary>
        /// Optional sink for status messages. It carries no [Import] attribute, so it has to
        /// be assigned by the owner of this agent and may be null.
        /// </summary>
        public IStatusNotification StatusNotification { get; set; }

        /// <summary>Status store used to clear the status of files that no longer exist.</summary>
        [Import]
        public IStatusKeeper StatusKeeper { get; set; }

        /// <summary>Agent that receives the cloud actions produced by this workflow.</summary>
        [Import]
        public NetworkAgent NetworkAgent { get; set; }

        /// <summary>The underlying dataflow block, or null if <see cref="Start"/> has not run yet.</summary>
        public ActionBlock<WorkflowState> Agent
        {
            get { return _agent; }
        }

        /// <summary>
        /// Creates the dataflow block that processes posted workflow states.
        /// </summary>
        /// <remarks>
        /// Each message is converted to a cloud action on a separate task via
        /// <see cref="Process"/>; non-null actions are posted to <see cref="NetworkAgent"/>.
        /// Failures are logged and swallowed so that one bad message does not fault the block.
        /// Calling this method again replaces the previous block.
        /// </remarks>
        public void Start()
        {
            _agent = new ActionBlock<WorkflowState>(async message =>
            {
                try
                {
                    var action = await TaskEx.Run(() => Process(message));
                    if (action != null)
                        NetworkAgent.Post(action);
                }
                catch (Exception ex)
                {
                    // The handler itself must never throw: an exception escaping this lambda
                    // would fault the block and stop all further processing. Guard against a
                    // null message, which is what would have caused the failure in that case.
                    var fileName = message == null ? "(null)" : message.FileName;
                    Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", fileName, ex);
                }
            });
        }

        /// <summary>
        /// Maps a workflow state to the cloud action that should be executed for it.
        /// </summary>
        /// <param name="state">The state to process.</param>
        /// <returns>
        /// An upload action for created/modified files, a delete action for deleted files,
        /// a move action for renamed files, or null when the state is skipped, the file is
        /// missing (and not marked deleted), or the status requires no action.
        /// </returns>
        private CloudAction Process(WorkflowState state)
        {
            var accountInfo = state.AccountInfo;
            using (log4net.ThreadContext.Stacks["Workflow"].Push("Process"))
            {
                if (Log.IsDebugEnabled)
                    Log.DebugFormat("State {0} {1} {2}", state.FileName, state.Status, state.TriggeringChange);

                if (state.Skip)
                {
                    if (Log.IsDebugEnabled)
                        Log.DebugFormat("Skipping {0}", state.FileName);

                    return null;
                }

                // Paths are lowercased before being used for status and state lookups.
                string path = state.Path.ToLower();

                //Bypass deleted files, unless the status is Deleted
                if (!File.Exists(path) && state.Status != FileStatus.Deleted)
                {
                    state.Skip = true;
                    this.StatusKeeper.ClearFileStatus(path);

                    if (Log.IsDebugEnabled)
                        Log.DebugFormat("Skipped missing {0}", state.FileName);

                    return null;
                }

                var fileState = FileState.FindByFilePath(path);
                var info = new FileInfo(path);

                switch (state.Status)
                {
                    case FileStatus.Created:
                    case FileStatus.Modified:
                        return new CloudUploadAction(accountInfo, info, fileState,
                            accountInfo.BlockSize, accountInfo.BlockHash);
                    case FileStatus.Deleted:
                        return new CloudDeleteAction(accountInfo, info, fileState);
                    case FileStatus.Renamed:
                        return new CloudMoveAction(accountInfo, CloudActionType.RenameCloud,
                            new FileInfo(state.OldPath),
                            new FileInfo(state.Path));
                    default:
                        // Any other status requires no cloud action.
                        return null;
                }
            }
        }

        /// <summary>
        /// Re-posts every stored file state of the given account whose status is not
        /// <c>Unchanged</c>, skipping entries under the cache folder and ".ignore" files.
        /// </summary>
        /// <param name="accountInfo">The account whose interrupted files should be restarted.</param>
        /// <exception cref="ArgumentNullException"><paramref name="accountInfo"/> is null.</exception>
        /// <exception cref="InvalidOperationException">
        /// <see cref="Start"/> has not been called and there is at least one entry to post.
        /// </exception>
        public void RestartInterruptedFiles(AccountInfo accountInfo)
        {
            if (accountInfo == null)
                throw new ArgumentNullException("accountInfo");

            // StatusNotification is not imported, so it may legitimately be missing;
            // a missing notifier must not prevent the restart itself.
            if (StatusNotification != null)
                StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);

            using (log4net.ThreadContext.Stacks["Workflow"].Push("Restart"))
            {
                if (Log.IsDebugEnabled)
                    Log.Debug("Starting interrupted files");

                // Both prefixes are lowercased so they are compared against stored paths the
                // same way. Previously only the cache path was lowercased, so an account
                // folder containing upper-case characters could fail the StartsWith filter.
                // NOTE(review): assumes stored FilePath values are lowercase, as the rest of
                // this class lowercases paths before lookups - confirm against FileState.
                // Plain local strings are also simpler for the query provider than a
                // property access on a captured object.
                var accountPath = accountInfo.AccountPath.ToLower();
                var cachePath = Path.Combine(accountInfo.AccountPath, FolderConstants.CacheFolder)
                    .ToLower();

                // Materialise once: the result is both counted (for logging) and enumerated,
                // which would otherwise execute the query twice.
                var pendingEntries = (from state in FileState.Queryable
                                      where state.FileStatus != FileStatus.Unchanged &&
                                            !state.FilePath.StartsWith(cachePath) &&
                                            !state.FilePath.EndsWith(".ignore") &&
                                            state.FilePath.StartsWith(accountPath)
                                      select state).ToList();

                if (Log.IsDebugEnabled)
                    Log.DebugFormat("Found {0} interrupted files", pendingEntries.Count);

                foreach (var pending in pendingEntries)
                {
                    // Unversioned entries are treated as newly created files.
                    var isUnversioned = pending.OverlayStatus == FileOverlayStatus.Unversioned;

                    Post(new WorkflowState
                    {
                        AccountInfo = accountInfo,
                        Path = pending.FilePath.ToLower(),
                        FileName = Path.GetFileName(pending.FilePath).ToLower(),
                        Hash = pending.Checksum,
                        Status = isUnversioned ? FileStatus.Created : pending.FileStatus,
                        TriggeringChange = isUnversioned
                            ? WatcherChangeTypes.Created
                            : WatcherChangeTypes.Changed
                    });
                }
            }
        }

        /// <summary>
        /// Queues a workflow state for processing.
        /// </summary>
        /// <param name="workflowState">The state to queue.</param>
        /// <exception cref="ArgumentNullException"><paramref name="workflowState"/> is null.</exception>
        /// <exception cref="InvalidOperationException"><see cref="Start"/> has not been called.</exception>
        public void Post(WorkflowState workflowState)
        {
            if (workflowState == null)
                throw new ArgumentNullException("workflowState");

            var agent = _agent;
            if (agent == null)
                throw new InvalidOperationException("WorkflowAgent.Start() must be called before posting messages.");

            if (Log.IsDebugEnabled)
                Log.DebugFormat("Posted {0} {1} {2}", workflowState.Path, workflowState.Status, workflowState.TriggeringChange);

            // Post returns false when the block declines the message (e.g. after completion);
            // surface that instead of dropping the message silently.
            if (!agent.Post(workflowState))
                Log.WarnFormat("Workflow agent declined {0}", workflowState.Path);
        }
    }
}