4551602c3ddb73b41a08bf94345fee04c670f495
[pithos-ms-client] / trunk%2FPithos.Core%2FAgents%2FNetworkAgent.cs
1 #region
2 /* -----------------------------------------------------------------------
3  * <copyright file="NetworkAgent.cs" company="GRNet">
4  * 
5  * Copyright 2011-2012 GRNET S.A. All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or
8  * without modification, are permitted provided that the following
9  * conditions are met:
10  *
11  *   1. Redistributions of source code must retain the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer.
14  *
15  *   2. Redistributions in binary form must reproduce the above
16  *      copyright notice, this list of conditions and the following
17  *      disclaimer in the documentation and/or other materials
18  *      provided with the distribution.
19  *
20  *
21  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
22  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
25  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
28  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  *
34  * The views and conclusions contained in the software and
35  * documentation are those of the authors and should not be
36  * interpreted as representing official policies, either expressed
37  * or implied, of GRNET S.A.
38  * </copyright>
39  * -----------------------------------------------------------------------
40  */
41 #endregion
42
43 using System;
44 using System.Collections.Generic;
45 using System.ComponentModel.Composition;
46 using System.Diagnostics;
47 using System.Diagnostics.Contracts;
48 using System.IO;
49 using System.Net;
50 using System.Reflection;
51 using System.Threading;
52 using System.Threading.Tasks;
53 using Castle.ActiveRecord;
54 using Pithos.Interfaces;
55 using Pithos.Network;
56 using log4net;
57
58 namespace Pithos.Core.Agents
59 {
60     [Export]
61     public class NetworkAgent
62     {
63         private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
64
65         private Agent<CloudAction> _agent;
66
67         [System.ComponentModel.Composition.Import]
68         private DeleteAgent DeleteAgent { get; set; }
69
70         [System.ComponentModel.Composition.Import]
71         public IStatusKeeper StatusKeeper { get; set; }
72
73         private IStatusNotification _statusNotification;
74         public IStatusNotification StatusNotification
75         {
76             get { return _statusNotification; }
77             set
78             {
79                 _statusNotification = value;
80                 DeleteAgent.StatusNotification = value;
81                 Uploader.StatusNotification = value;
82                 Downloader.StatusNotification = value;
83             }
84         }
85
86
87         [System.ComponentModel.Composition.Import]
88         public IPithosSettings Settings { get; set; }
89
90         [System.ComponentModel.Composition.Import]
91         public Uploader Uploader { get; set; }
92
93         [System.ComponentModel.Composition.Import]
94         public Downloader Downloader { get; set; }
95
96         //The Proceed signals the poll agent that it can proceed with polling. 
97         //Essentially it stops the poll agent to give priority to the network agent
98         //Initially the event is signalled because we don't need to pause
99         private readonly AsyncManualResetEvent _proceedEvent = new AsyncManualResetEvent(true);
100
101         public AsyncManualResetEvent ProceedEvent
102         {
103             get { return _proceedEvent; }
104         }
105
106
107         public void Start()
108         {
109             if (_agent != null)
110                 return;
111
112             if (Log.IsDebugEnabled)
113                 Log.Debug("Starting Network Agent");
114
115             _agent = Agent<CloudAction>.Start(inbox =>
116             {
117                 Action loop = null;
118                 loop = () =>
119                 {
120                     DeleteAgent.ProceedEvent.Wait();
121                     var message = inbox.Receive();
122                     var process=message.Then(Process,inbox.CancellationToken);
123                     inbox.LoopAsync(process, loop);
124                 };
125                 loop();
126             });
127
128         }
129
130         private async Task Process(CloudAction action)
131         {
132             if (action == null)
133                 throw new ArgumentNullException("action");
134             if (action.AccountInfo==null)
135                 throw new ArgumentException("The action.AccountInfo is empty","action");
136             Contract.EndContractBlock();
137
138
139
140
141             using (ThreadContext.Stacks["Operation"].Push(action.ToString()))
142             {                
143
144                 var cloudFile = action.CloudFile;
145                 var downloadPath = action.GetDownloadPath();
146
147                 try
148                 {
149                     StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing,"Processing");
150                     _proceedEvent.Reset();
151                     
152                     var accountInfo = action.AccountInfo;
153
154                     if (action.Action == CloudActionType.DeleteCloud)
155                     {                        
156                         //Redirect deletes to the delete agent 
157                         DeleteAgent.Post((CloudDeleteAction)action);
158                     }
159                     if (DeleteAgent.IsDeletedFile(action))
160                     {
161                         //Clear the status of already deleted files to avoid reprocessing
162                         if (action.LocalFile != null)
163                             StatusKeeper.ClearFileStatus(action.LocalFile.FullName);
164                     }
165                     else
166                     {
167                         switch (action.Action)
168                         {
169                             case CloudActionType.UploadUnconditional:
170                                 //Abort if the file was deleted before we reached this point
171                                 await Uploader.UploadCloudFile(action);
172                                 break;
173                             case CloudActionType.DownloadUnconditional:
174                                 await Downloader.DownloadCloudFile(accountInfo, cloudFile, downloadPath);
175                                 break;
176                             case CloudActionType.RenameCloud:
177                                 var moveAction = (CloudMoveAction)action;
178                                 RenameCloudFile(accountInfo, moveAction);
179                                 break;
180                             case CloudActionType.RenameLocal:
181                                 RenameLocalFile(accountInfo, action);
182                                 break;
183                             case CloudActionType.MustSynch:
184                                 if (!File.Exists(downloadPath) && !Directory.Exists(downloadPath))
185                                 {
186                                     await Downloader.DownloadCloudFile(accountInfo, cloudFile, downloadPath);
187                                 }
188                                 else
189                                 {
190                                     await SyncFiles(accountInfo, action);
191                                 }
192                                 break;
193                         }
194                     }
195                     Log.InfoFormat("End Processing {0}:{1}->{2}", action.Action, action.LocalFile,
196                                            action.CloudFile.Name);
197                 }
198 /*
199                 catch (WebException exc)
200                 {                    
201                     Log.ErrorFormat("[WEB ERROR] {0} : {1} -> {2} due to exception\r\n{3}", action.Action, action.LocalFile, action.CloudFile, exc);
202                     
203                     
204                     //Actions that resulted in server errors should be retried                    
205                     var response = exc.Response as HttpWebResponse;
206                     if (response != null && response.StatusCode >= HttpStatusCode.InternalServerError)
207                     {
208                         _agent.Post(action);
209                         Log.WarnFormat("[REQUEUE] {0} : {1} -> {2}", action.Action, action.LocalFile, action.CloudFile);
210                     }
211                 }
212 */
213                 catch (OperationCanceledException)
214                 {
215                     throw;
216                 }
217                 catch (DirectoryNotFoundException)
218                 {
219                     Log.ErrorFormat("{0} : {1} -> {2}  failed because the directory was not found.\n Rescheduling a delete",
220                         action.Action, action.LocalFile, action.CloudFile);
221                     //Post a delete action for the missing file
222                     Post(new CloudDeleteAction(action));
223                 }
224                 catch (FileNotFoundException)
225                 {
226                     Log.ErrorFormat("{0} : {1} -> {2}  failed because the file was not found.\n Rescheduling a delete",
227                         action.Action, action.LocalFile, action.CloudFile);
228                     //Post a delete action for the missing file
229                     Post(new CloudDeleteAction(action));
230                 }
231                 catch (Exception exc)
232                 {
233                     Log.ErrorFormat("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
234                                      action.Action, action.LocalFile, action.CloudFile, exc);
235
236                     _agent.Post(action);
237                 }
238                 finally
239                 {
240                     if (_agent.IsEmpty)
241                         _proceedEvent.Set();
242                     UpdateStatus(PithosStatus.LocalComplete);                                        
243                 }
244             }
245         }
246
247
248         private void UpdateStatus(PithosStatus status)
249         {
250             StatusNotification.SetPithosStatus(status);
251             //StatusNotification.Notify(new Notification());
252         }
253
254         private void RenameLocalFile(AccountInfo accountInfo, CloudAction action)
255         {
256             if (accountInfo == null)
257                 throw new ArgumentNullException("accountInfo");
258             if (action == null)
259                 throw new ArgumentNullException("action");
260             if (action.LocalFile == null)
261                 throw new ArgumentException("The action's local file is not specified", "action");
262             if (!Path.IsPathRooted(action.LocalFile.FullName))
263                 throw new ArgumentException("The action's local file path must be absolute", "action");
264             if (action.CloudFile == null)
265                 throw new ArgumentException("The action's cloud file is not specified", "action");
266             Contract.EndContractBlock();
267             using (ThreadContext.Stacks["Operation"].Push("RenameLocalFile"))
268             {
269
270                 //We assume that the local file already exists, otherwise the poll agent
271                 //would have issued a download request
272
273                 var currentInfo = action.CloudFile;
274                 var previousInfo = action.CloudFile.Previous;
275                 var fileAgent = FileAgent.GetFileAgent(accountInfo);
276
277                 var previousRelativepath = previousInfo.RelativeUrlToFilePath(accountInfo.UserName);
278                 var previousFile = fileAgent.GetFileSystemInfo(previousRelativepath);
279
280                 //In every case we need to move the local file first
281                 MoveLocalFile(accountInfo, previousFile, fileAgent, currentInfo);
282             }
283         }
284
285         private void MoveLocalFile(AccountInfo accountInfo, FileSystemInfo previousFile, FileAgent fileAgent,
286                                    ObjectInfo currentInfo)
287         {
288             var currentRelativepath = currentInfo.RelativeUrlToFilePath(accountInfo.UserName);
289             var newPath = Path.Combine(fileAgent.RootPath, currentRelativepath);
290
291             var isFile= (previousFile is FileInfo);
292             var previousFullPath = isFile? 
293                 FileInfoExtensions.GetProperFilePathCapitalization(previousFile.FullName):
294                 FileInfoExtensions.GetProperDirectoryCapitalization(previousFile.FullName);                
295             
296             using (NetworkGate.Acquire(previousFullPath, NetworkOperation.Renaming))
297             using (NetworkGate.Acquire(newPath,NetworkOperation.Renaming)) 
298             using (new SessionScope(FlushAction.Auto))
299             {
300                 if (isFile)
301                     (previousFile as FileInfo).MoveTo(newPath);
302                 else
303                 {
304                     (previousFile as DirectoryInfo).MoveTo(newPath);
305                 }
306                 var state = StatusKeeper.GetStateByFilePath(previousFullPath);
307                 state.FilePath = newPath;
308                 state.SaveCopy();
309                 StatusKeeper.SetFileState(previousFullPath,FileStatus.Deleted,FileOverlayStatus.Deleted);
310             }            
311         }
312
313         private async Task SyncFiles(AccountInfo accountInfo,CloudAction action)
314         {
315             if (accountInfo == null)
316                 throw new ArgumentNullException("accountInfo");
317             if (action==null)
318                 throw new ArgumentNullException("action");
319             if (action.LocalFile==null)
320                 throw new ArgumentException("The action's local file is not specified","action");
321             if (!Path.IsPathRooted(action.LocalFile.FullName))
322                 throw new ArgumentException("The action's local file path must be absolute","action");
323             if (action.CloudFile== null)
324                 throw new ArgumentException("The action's cloud file is not specified", "action");
325             Contract.EndContractBlock();
326             using (ThreadContext.Stacks["Operation"].Push("SyncFiles"))
327             {
328
329                 //var localFile = action.LocalFile;
330                 var cloudFile = action.CloudFile;
331                 var downloadPath = action.LocalFile.GetProperCapitalization();
332
333                 var cloudHash = cloudFile.Hash.ToLower();
334                 var previousCloudHash = cloudFile.PreviousHash.ToLower();
335                 var localHash = action.TreeHash.Value.TopHash.ToHashString();// LocalHash.Value.ToLower();
336                 //var topHash = action.TopHash.Value.ToLower();
337
338                 //At this point we know that an object has changed on the server and that a local
339                 //file already exists. We need to decide whether the file has only changed on 
340                 //the server or there is a conflicting change on the client.
341                 //
342
343                 //If the hashes match, we are done
344                 if (cloudFile != ObjectInfo.Empty && cloudHash == localHash)
345                 {
346                     Log.InfoFormat("Skipping {0}, hashes match", downloadPath);
347                     return;
348                 }
349
350                 //The hashes DON'T match. We need to sync
351
352                 // If the previous tophash matches the local tophash, the file was only changed on the server. 
353                 if (localHash == previousCloudHash)
354                 {
355                     await Downloader.DownloadCloudFile(accountInfo, cloudFile, downloadPath);
356                 }
357                 else
358                 {
359                     //If the previous and local hash don't match, there was a local conflict
360                     //that was not uploaded to the server. We have a conflict
361                     ReportConflict(downloadPath);
362                 }
363             }
364         }
365
366         private void ReportConflict(string downloadPath)
367         {
368             if (String.IsNullOrWhiteSpace(downloadPath))
369                 throw new ArgumentNullException("downloadPath");
370             Contract.EndContractBlock();
371
372             StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict);
373             UpdateStatus(PithosStatus.HasConflicts);
374             var message = String.Format("Conflict detected for file {0}", downloadPath);
375             Log.Warn(message);
376             StatusNotification.NotifyChange(message, TraceLevel.Warning);
377         }
378
379         public void Post(CloudAction cloudAction)
380         {
381             if (cloudAction == null)
382                 throw new ArgumentNullException("cloudAction");
383             if (cloudAction.AccountInfo==null)
384                 throw new ArgumentException("The CloudAction.AccountInfo is empty","cloudAction");
385             Contract.EndContractBlock();
386
387             DeleteAgent.ProceedEvent.Wait();
388 /*
389
390             //If the action targets a local file, add a treehash calculation
391             if (!(cloudAction is CloudDeleteAction) && cloudAction.LocalFile as FileInfo != null)
392             {
393                 var accountInfo = cloudAction.AccountInfo;
394                 var localFile = (FileInfo) cloudAction.LocalFile;
395
396                 if (localFile.Length > accountInfo.BlockSize)
397                     cloudAction.TopHash =
398                         new Lazy<string>(() => Signature.CalculateTreeHashAsync(localFile,
399                                                                                 accountInfo.BlockSize,
400                                                                                 accountInfo.BlockHash, Settings.HashingParallelism).Result
401                                                     .TopHash.ToHashString());
402                 else
403                 {
404                     cloudAction.TopHash = new Lazy<string>(() => cloudAction.LocalHash.Value);
405                 }
406
407             }
408             else
409             {
410                 //The hash for a directory is the empty string
411                 cloudAction.TopHash = new Lazy<string>(() => String.Empty);
412             }
413 */
414             
415             if (cloudAction is CloudDeleteAction)
416                 DeleteAgent.Post((CloudDeleteAction)cloudAction);
417             else
418                 _agent.Post(cloudAction);
419         }
420        
421
422         public IEnumerable<CloudAction> GetEnumerable()
423         {
424             return _agent.GetEnumerable();
425         }
426
427         public Task GetDeleteAwaiter()
428         {
429             return DeleteAgent.ProceedEvent.WaitAsync();
430         }
431         public CancellationToken CancellationToken
432         {
433             get { return _agent.CancellationToken; }
434         }
435
436
437
438         private void RenameCloudFile(AccountInfo accountInfo,CloudMoveAction action)
439         {
440             if (accountInfo==null)
441                 throw new ArgumentNullException("accountInfo");
442             if (action==null)
443                 throw new ArgumentNullException("action");
444             if (action.CloudFile==null)
445                 throw new ArgumentException("CloudFile","action");
446             if (action.LocalFile==null)
447                 throw new ArgumentException("LocalFile","action");
448             if (action.OldLocalFile==null)
449                 throw new ArgumentException("OldLocalFile","action");
450             if (action.OldCloudFile==null)
451                 throw new ArgumentException("OldCloudFile","action");
452             Contract.EndContractBlock();
453
454             using (ThreadContext.Stacks["Operation"].Push("RenameCloudFile"))
455             {
456
457                 var newFilePath = action.LocalFile.FullName;
458
459                 //How do we handle concurrent renames and deletes/uploads/downloads?
460                 //* A conflicting upload means that a file was renamed before it had a chance to finish uploading
461                 //  This should never happen as the network agent executes only one action at a time
462                 //* A conflicting download means that the file was modified on the cloud. While we can go on and complete
463                 //  the rename, there may be a problem if the file is downloaded in blocks, as subsequent block requests for the 
464                 //  same name will fail.
465                 //  This should never happen as the network agent executes only one action at a time.
466                 //* A conflicting delete can happen if the rename was followed by a delete action that didn't have the chance
467                 //  to remove the rename from the queue.
468                 //  We can probably ignore this case. It will result in an error which should be ignored            
469
470
471                 //The local file is already renamed
472                 StatusKeeper.SetFileOverlayStatus(newFilePath, FileOverlayStatus.Modified);
473
474
475                 var account = action.CloudFile.Account ?? accountInfo.UserName;
476                 var container = action.CloudFile.Container;
477
478                 var client = new CloudFilesClient(accountInfo);
479                 //TODO: What code is returned when the source file doesn't exist?
480                 client.MoveObject(account, container, action.OldCloudFile.Name, container, action.CloudFile.Name);
481
482                 StatusKeeper.SetFileStatus(newFilePath, FileStatus.Unchanged);
483                 StatusKeeper.SetFileOverlayStatus(newFilePath, FileOverlayStatus.Normal);
484                 NativeMethods.RaiseChangeNotification(newFilePath);
485             }
486         }
487
488
489
490     }
491
492    
493
494
495 }