Changes for directories
[pithos-ms-client] / trunk / Pithos.Core / Agents / WorkflowAgent.cs
1 // -----------------------------------------------------------------------
2 // <copyright file="WorkflowAgent.cs" company="GRNET">
3 // Copyright 2011 GRNET S.A. All rights reserved.
4 // 
5 // Redistribution and use in source and binary forms, with or
6 // without modification, are permitted provided that the following
7 // conditions are met:
8 // 
9 //   1. Redistributions of source code must retain the above
10 //      copyright notice, this list of conditions and the following
11 //      disclaimer.
12 // 
13 //   2. Redistributions in binary form must reproduce the above
14 //      copyright notice, this list of conditions and the following
15 //      disclaimer in the documentation and/or other materials
16 //      provided with the distribution.
17 // 
18 // THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19 // OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20 // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25 // USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26 // AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27 // LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28 // ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 // POSSIBILITY OF SUCH DAMAGE.
30 // 
31 // The views and conclusions contained in the software and
32 // documentation are those of the authors and should not be
33 // interpreted as representing official policies, either expressed
34 // or implied, of GRNET S.A.
35 // </copyright>
36 // -----------------------------------------------------------------------
37
38 using System;
39 using System.Collections.Generic;
40 using System.ComponentModel.Composition;
41 using System.Diagnostics;
42 using System.Diagnostics.Contracts;
43 using System.IO;
44 using System.Linq;
45 using System.Text;
46 using System.Threading.Tasks;
47 using Pithos.Interfaces;
48 using Pithos.Network;
49 using log4net;
50
51 namespace Pithos.Core.Agents
52 {
53     [Export]
54     public class WorkflowAgent
55     {
56         Agent<WorkflowState> _agent;
57                 
58         public IStatusNotification StatusNotification { get; set; }
59         [Import]
60         public IStatusKeeper StatusKeeper { get; set; }
61
62         [Import]
63         public NetworkAgent NetworkAgent { get; set; }
64
65         private static readonly ILog Log = LogManager.GetLogger("WorkflowAgent");
66
67         public void Start()
68         {
69             _agent = Agent<WorkflowState>.Start(inbox =>
70             {
71                 Action loop = null;
72                 loop = () =>
73                 {
74                     var message = inbox.Receive();
75                     var process = message.Then(Process, inbox.CancellationToken);                        
76                     inbox.LoopAsync(process,loop,ex=>
77                             Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex));
78                 };
79                 loop();
80             });
81         }
82
83         private Task<object> Process(WorkflowState state)
84         {
85             var accountInfo = state.AccountInfo;
86             using (log4net.ThreadContext.Stacks["Workflow"].Push("Process"))
87             {
88                 try
89                 {
90
91                     if (Log.IsDebugEnabled)
92                         Log.DebugFormat("State {0} {1} {2}", state.FileName, state.Status, state.TriggeringChange);
93
94                     if (state.Skip)
95                     {
96                         if (Log.IsDebugEnabled) Log.DebugFormat("Skipping {0}", state.FileName);
97
98                         return CompletedTask<object>.Default;
99                     }
100                     string path = state.Path.ToLower();
101
102
103
104                     FileSystemInfo info = Directory.Exists(path) ? (FileSystemInfo) new DirectoryInfo(path) : new FileInfo(path);
105
106                     //Bypass deleted files, unless the status is Deleted
107                     if (!info.Exists && state.Status != FileStatus.Deleted)
108                     {
109                         state.Skip = true;
110                         this.StatusKeeper.ClearFileStatus(path);
111
112                         if (Log.IsDebugEnabled) Log.DebugFormat("Skipped missing {0}", state.FileName);
113
114                         return CompletedTask<object>.Default;
115                     }
116
117                     var fileState = FileState.FindByFilePath(path);
118
119
120                     switch (state.Status)
121                     {
122                         case FileStatus.Created:
123                         case FileStatus.Modified:
124                             NetworkAgent.Post(new CloudUploadAction(accountInfo, info, fileState, accountInfo.BlockSize,
125                                                                     accountInfo.BlockHash));
126                             break;
127                         case FileStatus.Deleted:
128                             NetworkAgent.Post(new CloudDeleteAction(accountInfo, info, fileState));
129                             break;
130                         case FileStatus.Renamed:
131                             FileSystemInfo oldInfo = Directory.Exists(state.OldPath) ? (FileSystemInfo)new DirectoryInfo(state.OldPath) : new FileInfo(state.OldPath);
132                             FileSystemInfo newInfo = Directory.Exists(state.Path) ? (FileSystemInfo)new DirectoryInfo(state.Path) : new FileInfo(state.Path);
133                             NetworkAgent.Post(new CloudMoveAction(accountInfo, CloudActionType.RenameCloud,
134                                                                   oldInfo,
135                                                                   newInfo));
136                             break;
137                     }
138
139                     return CompletedTask<object>.Default;
140                 }
141                 catch (Exception ex)
142                 {
143                     Log.Error(ex.ToString());
144                     throw;
145                 }
146             }
147         }
148
149
150         //Starts interrupted files for a specific account
151         public void RestartInterruptedFiles(AccountInfo accountInfo)
152         {
153             
154             StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);
155
156             using (log4net.ThreadContext.Stacks["Workflow"].Push("Restart"))
157             {
158                 if (Log.IsDebugEnabled)
159                     Log.Debug("Starting interrupted files");
160
161                 var cachePath = Path.Combine(accountInfo.AccountPath, FolderConstants.CacheFolder)
162                     .ToLower();
163
164
165
166                 var account = accountInfo;
167                 var pendingEntries = from state in FileState.Queryable
168                                      where state.FileStatus != FileStatus.Unchanged &&
169                                            !state.FilePath.StartsWith(cachePath) &&
170                                            !state.FilePath.EndsWith(".ignore") &&
171                                            state.FilePath.StartsWith(account.AccountPath)
172                                      select state;
173                 var pendingStates = new List<WorkflowState>();
174                 foreach (var state in pendingEntries)
175                 {
176                         pendingStates.Add(new WorkflowState(account, state));
177                 }
178                 if (Log.IsDebugEnabled)
179                     Log.DebugFormat("Found {0} interrupted files", pendingStates.Count);
180
181
182                 foreach (var entry in pendingStates)
183                 {
184                        Post(entry);
185                 }
186             }
187         }
188
189
190
191         public void Post(WorkflowState workflowState)
192         {
193             if (Log.IsDebugEnabled)
194                 Log.DebugFormat("Posted {0} {1} {2}", workflowState.Path, workflowState.Status, workflowState.TriggeringChange);
195
196             //Remove invalid state            
197             //For now, ignore paths
198            /* if (Directory.Exists(workflowState.Path))
199                 return;*/
200             //TODO: Need to handle folder renames            
201
202             Debug.Assert(workflowState.Path.StartsWith(workflowState.AccountInfo.AccountPath, StringComparison.InvariantCultureIgnoreCase), "File from wrong account posted");
203
204             _agent.Post(workflowState);
205         }     
206
207     }
208 }