Fix to upload both folder and file contents, when uploading a new unselected root...
[pithos-ms-client] / trunk / Pithos.Core / Agents / NetworkAgent.cs
1 #region
2 /* -----------------------------------------------------------------------
3  * <copyright file="NetworkAgent.cs" company="GRNet">
4  * 
5  * Copyright 2011-2012 GRNET S.A. All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or
8  * without modification, are permitted provided that the following
9  * conditions are met:
10  *
11  *   1. Redistributions of source code must retain the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer.
14  *
15  *   2. Redistributions in binary form must reproduce the above
16  *      copyright notice, this list of conditions and the following
17  *      disclaimer in the documentation and/or other materials
18  *      provided with the distribution.
19  *
20  *
21  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
22  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
25  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
28  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  *
34  * The views and conclusions contained in the software and
35  * documentation are those of the authors and should not be
36  * interpreted as representing official policies, either expressed
37  * or implied, of GRNET S.A.
38  * </copyright>
39  * -----------------------------------------------------------------------
40  */
41 #endregion
42
43 using System;
44 using System.Collections.Generic;
45 using System.ComponentModel.Composition;
46 using System.Diagnostics;
47 using System.Diagnostics.Contracts;
48 using System.IO;
49 using System.Linq;
50 using System.Net;
51 using System.Reflection;
52 using System.Threading;
53 using System.Threading.Tasks;
54 using Castle.ActiveRecord;
55 using Pithos.Interfaces;
56 using Pithos.Network;
57 using log4net;
58
59 namespace Pithos.Core.Agents
60 {
61     [Export]
62     public class NetworkAgent
63     {
64         private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
65
66         private Agent<CloudAction> _agent;
67
68         [System.ComponentModel.Composition.Import]
69         private DeleteAgent DeleteAgent { get; set; }
70
71         [System.ComponentModel.Composition.Import]
72         public IStatusKeeper StatusKeeper { get; set; }
73
74         private IStatusNotification _statusNotification;
75         public IStatusNotification StatusNotification
76         {
77             get { return _statusNotification; }
78             set
79             {
80                 _statusNotification = value;
81                 DeleteAgent.StatusNotification = value;
82                 Uploader.StatusNotification = value;
83                 Downloader.StatusNotification = value;
84             }
85         }
86
87
88         [System.ComponentModel.Composition.Import]
89         public IPithosSettings Settings { get; set; }
90
91         private Uploader _uploader;
92
93         [System.ComponentModel.Composition.Import]
94         public Uploader Uploader
95         {
96             get { return _uploader; }
97             set
98             {
99                 _uploader = value;
100                 _uploader.UnpauseEvent = _unPauseEvent;                
101             }
102         }
103
104         private Downloader _downloader;
105
106         [System.ComponentModel.Composition.Import]
107         public Downloader Downloader
108         {
109             get { return _downloader; }
110             set
111             {
112                 _downloader = value;
113                 _downloader.UnpauseEvent = _unPauseEvent;
114             }
115         }
116
117         [System.ComponentModel.Composition.Import]
118         public Selectives Selectives { get; set; }
119         
120         //The Proceed signals the poll agent that it can proceed with polling. 
121         //Essentially it stops the poll agent to give priority to the network agent
122         //Initially the event is signalled because we don't need to pause
123         private readonly AsyncManualResetEvent _proceedEvent = new AsyncManualResetEvent(true);
124         private Agents.Selectives _selectives;
125         private bool _pause;
126
127         public AsyncManualResetEvent ProceedEvent
128         {
129             get { return _proceedEvent; }
130         }
131
132         private readonly AsyncManualResetEvent _unPauseEvent = new AsyncManualResetEvent(true);
133
134         private CancellationTokenSource _currentOperationCancellation=new CancellationTokenSource();
135
136         public void CancelCurrentOperation()
137         {
138             //What does it mean to cancel the current upload/download?
139             //Obviously, the current operation will be cancelled by throwing
140             //a cancellation exception.
141             //
142             //The default behavior is to retry any operations that throw.
143             //Obviously this is not what we want in this situation.
144             //The cancelled operation should NOT bea retried. 
145             //
146             //This can be done by catching the cancellation exception
147             //and avoiding the retry.
148             //
149
150             //Have to reset the cancellation source - it is not possible to reset the source
151             //Have to prevent a case where an operation requests a token from the old source
152             var oldSource = Interlocked.Exchange(ref _currentOperationCancellation, new CancellationTokenSource());
153             oldSource.Cancel();
154             
155         }
156
157         public void Start()
158         {
159             if (_agent != null)
160                 return;
161
162             if (Log.IsDebugEnabled)
163                 Log.Debug("Starting Network Agent");
164
165             _agent = Agent<CloudAction>.Start(inbox =>
166             {
167                 Action loop = null;
168                 loop = () =>
169                 {
170                     DeleteAgent.ProceedEvent.Wait();
171                     _unPauseEvent.Wait();
172                     var message = inbox.Receive();
173                     var process=message.Then(Process,inbox.CancellationToken);
174                     inbox.LoopAsync(process, loop);
175                 };
176                 loop();
177             });
178
179         }
180
181         private async Task Process(CloudAction action)
182         {
183             if (action == null)
184                 throw new ArgumentNullException("action");
185             if (action.AccountInfo==null)
186                 throw new ArgumentException("The action.AccountInfo is empty","action");
187             Contract.EndContractBlock();
188
189
190
191
192             using (ThreadContext.Stacks["Operation"].Push(action.ToString()))
193             {                
194
195                 var cloudFile = action.CloudFile;
196                 var downloadPath = action.GetDownloadPath();
197
198                 try
199                 {
200                     StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing,"Processing");
201                     _proceedEvent.Reset();
202                     
203                     var accountInfo = action.AccountInfo;
204
205                     if (action.Action == CloudActionType.DeleteCloud)
206                     {                        
207                         //Redirect deletes to the delete agent 
208                         DeleteAgent.Post((CloudDeleteAction)action);
209                     }
210                     if (DeleteAgent.IsDeletedFile(action))
211                     {
212                         //Clear the status of already deleted files to avoid reprocessing
213                         if (action.LocalFile != null)
214                             StatusKeeper.ClearFileStatus(action.LocalFile.FullName);
215                     }
216                     else
217                     {
218                         switch (action.Action)
219                         {
220                             case CloudActionType.UploadUnconditional:
221                                 //Abort if the file was deleted before we reached this point
222                                 var uploadAction = (CloudUploadAction) action;
223                                 ProcessChildUploads(uploadAction);
224                                 await Uploader.UploadCloudFile(uploadAction ,CurrentOperationCancelToken);
225                                 break;
226                             case CloudActionType.DownloadUnconditional:
227                                 await Downloader.DownloadCloudFile(accountInfo, cloudFile, downloadPath, CurrentOperationCancelToken);
228                                 break;
229                             case CloudActionType.RenameCloud:
230                                 var moveAction = (CloudMoveAction)action;
231                                 RenameCloudFile(accountInfo, moveAction);
232                                 break;
233                             case CloudActionType.RenameLocal:
234                                 RenameLocalFile(accountInfo, action);
235                                 break;
236                             case CloudActionType.MustSynch:
237                                 if (!File.Exists(downloadPath) && !Directory.Exists(downloadPath))
238                                 {
239                                     await Downloader.DownloadCloudFile(accountInfo, cloudFile, downloadPath, CurrentOperationCancelToken);
240                                 }
241                                 else
242                                 {
243                                     await SyncFiles(accountInfo, action);
244                                 }
245                                 break;
246                         }
247                     }
248                     Log.InfoFormat("End Processing {0}:{1}->{2}", action.Action, action.LocalFile,
249                                            action.CloudFile.Name);
250                 }
251 /*
252                 catch (WebException exc)
253                 {                    
254                     Log.ErrorFormat("[WEB ERROR] {0} : {1} -> {2} due to exception\r\n{3}", action.Action, action.LocalFile, action.CloudFile, exc);
255                     
256                     
257                     //Actions that resulted in server errors should be retried                    
258                     var response = exc.Response as HttpWebResponse;
259                     if (response != null && response.StatusCode >= HttpStatusCode.InternalServerError)
260                     {
261                         _agent.Post(action);
262                         Log.WarnFormat("[REQUEUE] {0} : {1} -> {2}", action.Action, action.LocalFile, action.CloudFile);
263                     }
264                 }
265 */
266                 catch (OperationCanceledException ex)
267                 {                    
268                     Log.WarnFormat("Cancelling [{0}]",ex);
269                 }
270                 catch (DirectoryNotFoundException)
271                 {
272                     Log.ErrorFormat("{0} : {1} -> {2}  failed because the directory was not found.\n Rescheduling a delete",
273                         action.Action, action.LocalFile, action.CloudFile);
274                     //Post a delete action for the missing file
275                     Post(new CloudDeleteAction(action));
276                 }
277                 catch (FileNotFoundException)
278                 {
279                     Log.ErrorFormat("{0} : {1} -> {2}  failed because the file was not found.\n Rescheduling a delete",
280                         action.Action, action.LocalFile, action.CloudFile);
281                     //Post a delete action for the missing file
282                     Post(new CloudDeleteAction(action));
283                 }
284                 catch (Exception exc)
285                 {
286                     Log.ErrorFormat("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
287                                      action.Action, action.LocalFile, action.CloudFile, exc);
288
289                     _agent.Post(action);
290                 }
291                 finally
292                 {
293                     if (_agent.IsEmpty)
294                         _proceedEvent.Set();
295                     UpdateStatus(PithosStatus.LocalComplete);                                        
296                 }
297             }
298         }
299
300         private void ProcessChildUploads(CloudUploadAction uploadAction)
301         {
302             if (!uploadAction.IsCreation || !(uploadAction.LocalFile is DirectoryInfo)) 
303                 return;
304
305             var dirInfo = uploadAction.LocalFile as DirectoryInfo;
306
307             var account = uploadAction.AccountInfo;
308             var folderActions = from info in dirInfo.EnumerateDirectories("*", SearchOption.AllDirectories)                          
309                           select
310                               new CloudUploadAction(account, info, null, account.BlockSize, account.BlockHash,
311                                                     uploadAction, true);
312             var fileActions = from info in dirInfo.EnumerateFiles("*", SearchOption.AllDirectories)                          
313                           select
314                               new CloudUploadAction(account, info, null, account.BlockSize, account.BlockHash,
315                                                     uploadAction, true);            
316             //Post folder actions first, to ensure the selective folders are updated
317             folderActions.ApplyAction(PostUploadAction);
318             fileActions.ApplyAction(PostUploadAction);            
319         }
320
321         private void PostUploadAction(CloudUploadAction action)
322         {
323             var state = StatusKeeper.GetStateByFilePath(action.LocalFile.FullName);
324             if (state != null)
325                 state.Delete();
326             //StatusKeeper.SetFileState(action.LocalFile.FullName,FileStatus.Created,FileOverlayStatus.Normal,String.Empty);
327             state = FileState.CreateFor(action.LocalFile);
328             //StatusKeeper.SetFileStatus();
329             state.FileStatus = FileStatus.Created;
330             state.OverlayStatus = FileOverlayStatus.Normal;
331             state.Create();
332             action.FileState = state;
333             Post(action);
334         }
335
336         private CancellationToken CurrentOperationCancelToken
337         {
338             get { return _currentOperationCancellation.Token; }
339         }
340
341
342         private void UpdateStatus(PithosStatus status)
343         {
344             StatusNotification.SetPithosStatus(status);
345             //StatusNotification.Notify(new Notification());
346         }
347
348         private void RenameLocalFile(AccountInfo accountInfo, CloudAction action)
349         {
350             if (accountInfo == null)
351                 throw new ArgumentNullException("accountInfo");
352             if (action == null)
353                 throw new ArgumentNullException("action");
354             if (action.LocalFile == null)
355                 throw new ArgumentException("The action's local file is not specified", "action");
356             if (!Path.IsPathRooted(action.LocalFile.FullName))
357                 throw new ArgumentException("The action's local file path must be absolute", "action");
358             if (action.CloudFile == null)
359                 throw new ArgumentException("The action's cloud file is not specified", "action");
360             Contract.EndContractBlock();
361             using (ThreadContext.Stacks["Operation"].Push("RenameLocalFile"))
362             {
363
364                 //We assume that the local file already exists, otherwise the poll agent
365                 //would have issued a download request
366
367                 var currentInfo = action.CloudFile;
368                 var previousInfo = action.CloudFile.Previous;
369                 var fileAgent = FileAgent.GetFileAgent(accountInfo);
370
371                 var previousRelativepath = previousInfo.RelativeUrlToFilePath(accountInfo.UserName);
372                 var previousFile = fileAgent.GetFileSystemInfo(previousRelativepath);
373
374                 //In every case we need to move the local file first
375                 MoveLocalFile(accountInfo, previousFile, fileAgent, currentInfo);
376             }
377         }
378
379         private void MoveLocalFile(AccountInfo accountInfo, FileSystemInfo previousFile, FileAgent fileAgent,
380                                    ObjectInfo currentInfo)
381         {
382             var currentRelativepath = currentInfo.RelativeUrlToFilePath(accountInfo.UserName);
383             var newPath = Path.Combine(fileAgent.RootPath, currentRelativepath);
384
385             var isFile= (previousFile is FileInfo);
386             var previousFullPath = isFile? 
387                 FileInfoExtensions.GetProperFilePathCapitalization(previousFile.FullName):
388                 FileInfoExtensions.GetProperDirectoryCapitalization(previousFile.FullName);                
389             
390             using (NetworkGate.Acquire(previousFullPath, NetworkOperation.Renaming))
391             using (NetworkGate.Acquire(newPath,NetworkOperation.Renaming)) 
392             using (new SessionScope(FlushAction.Auto))
393             {
394                 if (isFile)
395                     (previousFile as FileInfo).MoveTo(newPath);
396                 else
397                 {
398                     (previousFile as DirectoryInfo).MoveTo(newPath);
399                 }
400                 var state = StatusKeeper.GetStateByFilePath(previousFullPath);
401                 state.FilePath = newPath;
402                 state.SaveCopy();
403                 StatusKeeper.SetFileState(previousFullPath,FileStatus.Deleted,FileOverlayStatus.Deleted, "Deleted");
404             }            
405         }
406
407         private async Task SyncFiles(AccountInfo accountInfo,CloudAction action)
408         {
409             if (accountInfo == null)
410                 throw new ArgumentNullException("accountInfo");
411             if (action==null)
412                 throw new ArgumentNullException("action");
413             if (action.LocalFile==null)
414                 throw new ArgumentException("The action's local file is not specified","action");
415             if (!Path.IsPathRooted(action.LocalFile.FullName))
416                 throw new ArgumentException("The action's local file path must be absolute","action");
417             if (action.CloudFile== null)
418                 throw new ArgumentException("The action's cloud file is not specified", "action");
419             Contract.EndContractBlock();
420             using (ThreadContext.Stacks["Operation"].Push("SyncFiles"))
421             {
422
423                 //var localFile = action.LocalFile;
424                 var cloudFile = action.CloudFile;
425                 var downloadPath = action.LocalFile.GetProperCapitalization();
426
427                 var cloudHash = cloudFile.Hash.ToLower();
428                 var previousCloudHash = cloudFile.PreviousHash == null?null: cloudFile.PreviousHash.ToLower();
429                 var localHash = action.TreeHash.Value.TopHash.ToHashString();// LocalHash.Value.ToLower();
430                 //var topHash = action.TopHash.Value.ToLower();
431
432                 if(cloudFile.IsDirectory && action.LocalFile is DirectoryInfo)
433                 {
434                     Log.InfoFormat("Skipping folder {0} , exists in server", downloadPath);
435                     return;
436                 }
437
438                 //At this point we know that an object has changed on the server and that a local
439                 //file already exists. We need to decide whether the file has only changed on 
440                 //the server or there is a conflicting change on the client.
441                 //
442
443                 //If the hashes match, we are done
444                 if (cloudFile != ObjectInfo.Empty && cloudHash == localHash)
445                 {
446                     Log.InfoFormat("Skipping {0}, hashes match", downloadPath);
447                     return;
448                 }
449
450                 //If the local and remote files have 0 length their hashes will not match
451                 if (!cloudFile.IsDirectory && cloudFile.Bytes==0 && action.LocalFile is FileInfo && (action.LocalFile as FileInfo).Length==0 )
452                 {
453                     Log.InfoFormat("Skipping {0}, files are empty", downloadPath);
454                     return;
455                 }
456
457                 //The hashes DON'T match. We need to sync
458
459                 // If the previous tophash matches the local tophash, the file was only changed on the server. 
460                 if (localHash == previousCloudHash)
461                 {
462                     await Downloader.DownloadCloudFile(accountInfo, cloudFile, downloadPath, CurrentOperationCancelToken);
463                 }
464                 else
465                 {
466                     //If the previous and local hash don't match, there was a local conflict
467                     //that was not uploaded to the server. We have a conflict
468                     ReportConflictForMismatch(downloadPath);
469                 }
470             }
471         }
472
473         private void ReportConflictForMismatch(string downloadPath)
474         {
475             if (String.IsNullOrWhiteSpace(downloadPath))
476                 throw new ArgumentNullException("downloadPath");
477             Contract.EndContractBlock();
478
479             StatusKeeper.SetFileState(downloadPath,FileStatus.Conflict, FileOverlayStatus.Conflict,"File changed at the server");
480             UpdateStatus(PithosStatus.HasConflicts);
481             var message = String.Format("Conflict detected for file {0}", downloadPath);
482             Log.Warn(message);
483             StatusNotification.NotifyChange(message, TraceLevel.Warning);
484         }
485
486         public void Post(CloudAction cloudAction)
487         {
488             if (cloudAction == null)
489                 throw new ArgumentNullException("cloudAction");
490             if (cloudAction.AccountInfo==null)
491                 throw new ArgumentException("The CloudAction.AccountInfo is empty","cloudAction");
492             Contract.EndContractBlock();
493
494             DeleteAgent.ProceedEvent.Wait();
495             
496             if (cloudAction is CloudDeleteAction)
497                 DeleteAgent.Post((CloudDeleteAction)cloudAction);
498             else
499                 _agent.Post(cloudAction);
500         }
501        
502
503         public IEnumerable<CloudAction> GetEnumerable()
504         {
505             return _agent.GetEnumerable();
506         }
507
508         public Task GetDeleteAwaiter()
509         {
510             return DeleteAgent.ProceedEvent.WaitAsync();
511         }
512         public CancellationToken CancellationToken
513         {
514             get { return _agent.CancellationToken; }
515         }
516
517         public bool Pause
518         {
519             get {
520                 return _pause;
521             }
522             set {
523                 _pause = value;
524                 if (_pause)
525                     _unPauseEvent.Reset();
526                 else
527                 {
528                     _unPauseEvent.Set();
529                 }
530             }
531         }
532
533
534         private void RenameCloudFile(AccountInfo accountInfo,CloudMoveAction action)
535         {
536             if (accountInfo==null)
537                 throw new ArgumentNullException("accountInfo");
538             if (action==null)
539                 throw new ArgumentNullException("action");
540             if (action.CloudFile==null)
541                 throw new ArgumentException("CloudFile","action");
542             if (action.LocalFile==null)
543                 throw new ArgumentException("LocalFile","action");
544             if (action.OldLocalFile==null)
545                 throw new ArgumentException("OldLocalFile","action");
546             if (action.OldCloudFile==null)
547                 throw new ArgumentException("OldCloudFile","action");
548             Contract.EndContractBlock();
549
550             using (ThreadContext.Stacks["Operation"].Push("RenameCloudFile"))
551             {
552
553                 var newFilePath = action.LocalFile.FullName;
554
555                 //How do we handle concurrent renames and deletes/uploads/downloads?
556                 //* A conflicting upload means that a file was renamed before it had a chance to finish uploading
557                 //  This should never happen as the network agent executes only one action at a time
558                 //* A conflicting download means that the file was modified on the cloud. While we can go on and complete
559                 //  the rename, there may be a problem if the file is downloaded in blocks, as subsequent block requests for the 
560                 //  same name will fail.
561                 //  This should never happen as the network agent executes only one action at a time.
562                 //* A conflicting delete can happen if the rename was followed by a delete action that didn't have the chance
563                 //  to remove the rename from the queue.
564                 //  We can probably ignore this case. It will result in an error which should be ignored            
565
566
567                 //The local file is already renamed
568                 StatusKeeper.SetFileOverlayStatus(newFilePath, FileOverlayStatus.Modified).Wait();
569
570
571                 var account = action.CloudFile.Account ?? accountInfo.UserName;
572                 var container = action.CloudFile.Container;
573
574                 var client = new CloudFilesClient(accountInfo);
575                 //TODO: What code is returned when the source file doesn't exist?
576                 client.MoveObject(account, container, action.OldCloudFile.Name, container, action.CloudFile.Name);
577
578                 StatusKeeper.SetFileStatus(newFilePath, FileStatus.Unchanged);
579                 StatusKeeper.SetFileOverlayStatus(newFilePath, FileOverlayStatus.Normal).Wait();
580                 NativeMethods.RaiseChangeNotification(newFilePath);
581             }
582         }
583
584
585         
586
587     }
588
589    
590
591
592 }