Modified selective sync to propagate the creation of new local directories and their...
[pithos-ms-client] / trunk / Pithos.Core / Agents / NetworkAgent.cs
1 #region
2 /* -----------------------------------------------------------------------
3  * <copyright file="NetworkAgent.cs" company="GRNet">
4  * 
5  * Copyright 2011-2012 GRNET S.A. All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or
8  * without modification, are permitted provided that the following
9  * conditions are met:
10  *
11  *   1. Redistributions of source code must retain the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer.
14  *
15  *   2. Redistributions in binary form must reproduce the above
16  *      copyright notice, this list of conditions and the following
17  *      disclaimer in the documentation and/or other materials
18  *      provided with the distribution.
19  *
20  *
21  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
22  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
25  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
28  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  *
34  * The views and conclusions contained in the software and
35  * documentation are those of the authors and should not be
36  * interpreted as representing official policies, either expressed
37  * or implied, of GRNET S.A.
38  * </copyright>
39  * -----------------------------------------------------------------------
40  */
41 #endregion
42
43 using System;
44 using System.Collections.Generic;
45 using System.ComponentModel.Composition;
46 using System.Diagnostics;
47 using System.Diagnostics.Contracts;
48 using System.IO;
49 using System.Linq;
50 using System.Net;
51 using System.Reflection;
52 using System.Threading;
53 using System.Threading.Tasks;
54 using Castle.ActiveRecord;
55 using Pithos.Interfaces;
56 using Pithos.Network;
57 using log4net;
58
59 namespace Pithos.Core.Agents
60 {
61     [Export]
62     public class NetworkAgent
63     {
64         private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
65
66         private Agent<CloudAction> _agent;
67
68         [System.ComponentModel.Composition.Import]
69         private DeleteAgent DeleteAgent { get; set; }
70
71         [System.ComponentModel.Composition.Import]
72         public IStatusKeeper StatusKeeper { get; set; }
73
74         private IStatusNotification _statusNotification;
75         public IStatusNotification StatusNotification
76         {
77             get { return _statusNotification; }
78             set
79             {
80                 _statusNotification = value;
81                 DeleteAgent.StatusNotification = value;
82                 Uploader.StatusNotification = value;
83                 Downloader.StatusNotification = value;
84             }
85         }
86
87
88         [System.ComponentModel.Composition.Import]
89         public IPithosSettings Settings { get; set; }
90
91         private Uploader _uploader;
92
93         [System.ComponentModel.Composition.Import]
94         public Uploader Uploader
95         {
96             get { return _uploader; }
97             set
98             {
99                 _uploader = value;
100                 _uploader.UnpauseEvent = _unPauseEvent;                
101             }
102         }
103
104         private Downloader _downloader;
105
106         [System.ComponentModel.Composition.Import]
107         public Downloader Downloader
108         {
109             get { return _downloader; }
110             set
111             {
112                 _downloader = value;
113                 _downloader.UnpauseEvent = _unPauseEvent;
114             }
115         }
116
117         [System.ComponentModel.Composition.Import]
118         public Selectives Selectives { get; set; }
119         
120         //The Proceed signals the poll agent that it can proceed with polling. 
121         //Essentially it stops the poll agent to give priority to the network agent
122         //Initially the event is signalled because we don't need to pause
123         private readonly AsyncManualResetEvent _proceedEvent = new AsyncManualResetEvent(true);
124         private Agents.Selectives _selectives;
125         private bool _pause;
126
127         public AsyncManualResetEvent ProceedEvent
128         {
129             get { return _proceedEvent; }
130         }
131
132         private readonly AsyncManualResetEvent _unPauseEvent = new AsyncManualResetEvent(true);
133
134         private CancellationTokenSource _currentOperationCancellation=new CancellationTokenSource();
135
136         public void CancelCurrentOperation()
137         {
138             //What does it mean to cancel the current upload/download?
139             //Obviously, the current operation will be cancelled by throwing
140             //a cancellation exception.
141             //
142             //The default behavior is to retry any operations that throw.
143             //Obviously this is not what we want in this situation.
144             //The cancelled operation should NOT bea retried. 
145             //
146             //This can be done by catching the cancellation exception
147             //and avoiding the retry.
148             //
149
150             //Have to reset the cancellation source - it is not possible to reset the source
151             //Have to prevent a case where an operation requests a token from the old source
152             var oldSource = Interlocked.Exchange(ref _currentOperationCancellation, new CancellationTokenSource());
153             oldSource.Cancel();
154             
155         }
156
157         public void Start()
158         {
159             if (_agent != null)
160                 return;
161
162             if (Log.IsDebugEnabled)
163                 Log.Debug("Starting Network Agent");
164
165             _agent = Agent<CloudAction>.Start(inbox =>
166             {
167                 Action loop = null;
168                 loop = () =>
169                 {
170                     DeleteAgent.ProceedEvent.Wait();
171                     _unPauseEvent.Wait();
172                     var message = inbox.Receive();
173                     var process=message.Then(Process,inbox.CancellationToken);
174                     inbox.LoopAsync(process, loop);
175                 };
176                 loop();
177             });
178
179         }
180
181         private async Task Process(CloudAction action)
182         {
183             if (action == null)
184                 throw new ArgumentNullException("action");
185             if (action.AccountInfo==null)
186                 throw new ArgumentException("The action.AccountInfo is empty","action");
187             Contract.EndContractBlock();
188
189
190
191
192             using (ThreadContext.Stacks["Operation"].Push(action.ToString()))
193             {                
194
195                 var cloudFile = action.CloudFile;
196                 var downloadPath = action.GetDownloadPath();
197
198                 try
199                 {
200                     StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing,"Processing");
201                     _proceedEvent.Reset();
202                     
203                     var accountInfo = action.AccountInfo;
204
205                     if (action.Action == CloudActionType.DeleteCloud)
206                     {                        
207                         //Redirect deletes to the delete agent 
208                         DeleteAgent.Post((CloudDeleteAction)action);
209                     }
210                     if (DeleteAgent.IsDeletedFile(action))
211                     {
212                         //Clear the status of already deleted files to avoid reprocessing
213                         if (action.LocalFile != null)
214                             StatusKeeper.ClearFileStatus(action.LocalFile.FullName);
215                     }
216                     else
217                     {
218                         switch (action.Action)
219                         {
220                             case CloudActionType.UploadUnconditional:
221                                 //Abort if the file was deleted before we reached this point
222                                 var uploadAction = (CloudUploadAction) action;
223                                 ProcessChildUploads(uploadAction);
224                                 await Uploader.UploadCloudFile(uploadAction ,CurrentOperationCancelToken);
225                                 break;
226                             case CloudActionType.DownloadUnconditional:
227                                 await Downloader.DownloadCloudFile(accountInfo, cloudFile, downloadPath, CurrentOperationCancelToken);
228                                 break;
229                             case CloudActionType.RenameCloud:
230                                 var moveAction = (CloudMoveAction)action;
231                                 RenameCloudFile(accountInfo, moveAction);
232                                 break;
233                             case CloudActionType.RenameLocal:
234                                 RenameLocalFile(accountInfo, action);
235                                 break;
236                             case CloudActionType.MustSynch:
237                                 if (!File.Exists(downloadPath) && !Directory.Exists(downloadPath))
238                                 {
239                                     await Downloader.DownloadCloudFile(accountInfo, cloudFile, downloadPath, CurrentOperationCancelToken);
240                                 }
241                                 else
242                                 {
243                                     await SyncFiles(accountInfo, action);
244                                 }
245                                 break;
246                         }
247                     }
248                     Log.InfoFormat("End Processing {0}:{1}->{2}", action.Action, action.LocalFile,
249                                            action.CloudFile.Name);
250                 }
251 /*
252                 catch (WebException exc)
253                 {                    
254                     Log.ErrorFormat("[WEB ERROR] {0} : {1} -> {2} due to exception\r\n{3}", action.Action, action.LocalFile, action.CloudFile, exc);
255                     
256                     
257                     //Actions that resulted in server errors should be retried                    
258                     var response = exc.Response as HttpWebResponse;
259                     if (response != null && response.StatusCode >= HttpStatusCode.InternalServerError)
260                     {
261                         _agent.Post(action);
262                         Log.WarnFormat("[REQUEUE] {0} : {1} -> {2}", action.Action, action.LocalFile, action.CloudFile);
263                     }
264                 }
265 */
266                 catch (OperationCanceledException ex)
267                 {                    
268                     Log.WarnFormat("Cancelling [{0}]",ex);
269                 }
270                 catch (DirectoryNotFoundException)
271                 {
272                     Log.ErrorFormat("{0} : {1} -> {2}  failed because the directory was not found.\n Rescheduling a delete",
273                         action.Action, action.LocalFile, action.CloudFile);
274                     //Post a delete action for the missing file
275                     Post(new CloudDeleteAction(action));
276                 }
277                 catch (FileNotFoundException)
278                 {
279                     Log.ErrorFormat("{0} : {1} -> {2}  failed because the file was not found.\n Rescheduling a delete",
280                         action.Action, action.LocalFile, action.CloudFile);
281                     //Post a delete action for the missing file
282                     Post(new CloudDeleteAction(action));
283                 }
284                 catch (Exception exc)
285                 {
286                     Log.ErrorFormat("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
287                                      action.Action, action.LocalFile, action.CloudFile, exc);
288
289                     _agent.Post(action);
290                 }
291                 finally
292                 {
293                     if (_agent.IsEmpty)
294                         _proceedEvent.Set();
295                     UpdateStatus(PithosStatus.LocalComplete);                                        
296                 }
297             }
298         }
299
300         private void ProcessChildUploads(CloudUploadAction uploadAction)
301         {
302             if (!uploadAction.IsCreation || !(uploadAction.LocalFile is DirectoryInfo)) 
303                 return;
304
305             var dirInfo = uploadAction.LocalFile as DirectoryInfo;
306
307             var account = uploadAction.AccountInfo;
308             var actions = from file in dirInfo.EnumerateFiles("*", SearchOption.AllDirectories)
309                           select
310                               new CloudUploadAction(account, file, null, account.BlockSize, account.BlockHash,
311                                                     uploadAction, true);
312             foreach (var action in actions)
313             {
314                 var state=StatusKeeper.GetStateByFilePath(action.LocalFile.FullName);
315                 if (state!=null)
316                     state.Delete();
317                 //StatusKeeper.SetFileState(action.LocalFile.FullName,FileStatus.Created,FileOverlayStatus.Normal,String.Empty);
318                 state=FileState.CreateFor(action.LocalFile);
319                 //StatusKeeper.SetFileStatus();
320                 state.FileStatus = FileStatus.Created;
321                 state.OverlayStatus=FileOverlayStatus.Normal;
322                 state.Create();
323                 action.FileState = state;
324                 Post(action);
325             }
326         }
327
328         private CancellationToken CurrentOperationCancelToken
329         {
330             get { return _currentOperationCancellation.Token; }
331         }
332
333
334         private void UpdateStatus(PithosStatus status)
335         {
336             StatusNotification.SetPithosStatus(status);
337             //StatusNotification.Notify(new Notification());
338         }
339
340         private void RenameLocalFile(AccountInfo accountInfo, CloudAction action)
341         {
342             if (accountInfo == null)
343                 throw new ArgumentNullException("accountInfo");
344             if (action == null)
345                 throw new ArgumentNullException("action");
346             if (action.LocalFile == null)
347                 throw new ArgumentException("The action's local file is not specified", "action");
348             if (!Path.IsPathRooted(action.LocalFile.FullName))
349                 throw new ArgumentException("The action's local file path must be absolute", "action");
350             if (action.CloudFile == null)
351                 throw new ArgumentException("The action's cloud file is not specified", "action");
352             Contract.EndContractBlock();
353             using (ThreadContext.Stacks["Operation"].Push("RenameLocalFile"))
354             {
355
356                 //We assume that the local file already exists, otherwise the poll agent
357                 //would have issued a download request
358
359                 var currentInfo = action.CloudFile;
360                 var previousInfo = action.CloudFile.Previous;
361                 var fileAgent = FileAgent.GetFileAgent(accountInfo);
362
363                 var previousRelativepath = previousInfo.RelativeUrlToFilePath(accountInfo.UserName);
364                 var previousFile = fileAgent.GetFileSystemInfo(previousRelativepath);
365
366                 //In every case we need to move the local file first
367                 MoveLocalFile(accountInfo, previousFile, fileAgent, currentInfo);
368             }
369         }
370
371         private void MoveLocalFile(AccountInfo accountInfo, FileSystemInfo previousFile, FileAgent fileAgent,
372                                    ObjectInfo currentInfo)
373         {
374             var currentRelativepath = currentInfo.RelativeUrlToFilePath(accountInfo.UserName);
375             var newPath = Path.Combine(fileAgent.RootPath, currentRelativepath);
376
377             var isFile= (previousFile is FileInfo);
378             var previousFullPath = isFile? 
379                 FileInfoExtensions.GetProperFilePathCapitalization(previousFile.FullName):
380                 FileInfoExtensions.GetProperDirectoryCapitalization(previousFile.FullName);                
381             
382             using (NetworkGate.Acquire(previousFullPath, NetworkOperation.Renaming))
383             using (NetworkGate.Acquire(newPath,NetworkOperation.Renaming)) 
384             using (new SessionScope(FlushAction.Auto))
385             {
386                 if (isFile)
387                     (previousFile as FileInfo).MoveTo(newPath);
388                 else
389                 {
390                     (previousFile as DirectoryInfo).MoveTo(newPath);
391                 }
392                 var state = StatusKeeper.GetStateByFilePath(previousFullPath);
393                 state.FilePath = newPath;
394                 state.SaveCopy();
395                 StatusKeeper.SetFileState(previousFullPath,FileStatus.Deleted,FileOverlayStatus.Deleted, "Deleted");
396             }            
397         }
398
399         private async Task SyncFiles(AccountInfo accountInfo,CloudAction action)
400         {
401             if (accountInfo == null)
402                 throw new ArgumentNullException("accountInfo");
403             if (action==null)
404                 throw new ArgumentNullException("action");
405             if (action.LocalFile==null)
406                 throw new ArgumentException("The action's local file is not specified","action");
407             if (!Path.IsPathRooted(action.LocalFile.FullName))
408                 throw new ArgumentException("The action's local file path must be absolute","action");
409             if (action.CloudFile== null)
410                 throw new ArgumentException("The action's cloud file is not specified", "action");
411             Contract.EndContractBlock();
412             using (ThreadContext.Stacks["Operation"].Push("SyncFiles"))
413             {
414
415                 //var localFile = action.LocalFile;
416                 var cloudFile = action.CloudFile;
417                 var downloadPath = action.LocalFile.GetProperCapitalization();
418
419                 var cloudHash = cloudFile.Hash.ToLower();
420                 var previousCloudHash = cloudFile.PreviousHash == null?null: cloudFile.PreviousHash.ToLower();
421                 var localHash = action.TreeHash.Value.TopHash.ToHashString();// LocalHash.Value.ToLower();
422                 //var topHash = action.TopHash.Value.ToLower();
423
424                 if(cloudFile.IsDirectory && action.LocalFile is DirectoryInfo)
425                 {
426                     Log.InfoFormat("Skipping folder {0} , exists in server", downloadPath);
427                     return;
428                 }
429
430                 //At this point we know that an object has changed on the server and that a local
431                 //file already exists. We need to decide whether the file has only changed on 
432                 //the server or there is a conflicting change on the client.
433                 //
434
435                 //If the hashes match, we are done
436                 if (cloudFile != ObjectInfo.Empty && cloudHash == localHash)
437                 {
438                     Log.InfoFormat("Skipping {0}, hashes match", downloadPath);
439                     return;
440                 }
441
442                 //If the local and remote files have 0 length their hashes will not match
443                 if (!cloudFile.IsDirectory && cloudFile.Bytes==0 && action.LocalFile is FileInfo && (action.LocalFile as FileInfo).Length==0 )
444                 {
445                     Log.InfoFormat("Skipping {0}, files are empty", downloadPath);
446                     return;
447                 }
448
449                 //The hashes DON'T match. We need to sync
450
451                 // If the previous tophash matches the local tophash, the file was only changed on the server. 
452                 if (localHash == previousCloudHash)
453                 {
454                     await Downloader.DownloadCloudFile(accountInfo, cloudFile, downloadPath, CurrentOperationCancelToken);
455                 }
456                 else
457                 {
458                     //If the previous and local hash don't match, there was a local conflict
459                     //that was not uploaded to the server. We have a conflict
460                     ReportConflictForMismatch(downloadPath);
461                 }
462             }
463         }
464
465         private void ReportConflictForMismatch(string downloadPath)
466         {
467             if (String.IsNullOrWhiteSpace(downloadPath))
468                 throw new ArgumentNullException("downloadPath");
469             Contract.EndContractBlock();
470
471             StatusKeeper.SetFileState(downloadPath,FileStatus.Conflict, FileOverlayStatus.Conflict,"File changed at the server");
472             UpdateStatus(PithosStatus.HasConflicts);
473             var message = String.Format("Conflict detected for file {0}", downloadPath);
474             Log.Warn(message);
475             StatusNotification.NotifyChange(message, TraceLevel.Warning);
476         }
477
478         public void Post(CloudAction cloudAction)
479         {
480             if (cloudAction == null)
481                 throw new ArgumentNullException("cloudAction");
482             if (cloudAction.AccountInfo==null)
483                 throw new ArgumentException("The CloudAction.AccountInfo is empty","cloudAction");
484             Contract.EndContractBlock();
485
486             DeleteAgent.ProceedEvent.Wait();
487             
488             if (cloudAction is CloudDeleteAction)
489                 DeleteAgent.Post((CloudDeleteAction)cloudAction);
490             else
491                 _agent.Post(cloudAction);
492         }
493        
494
495         public IEnumerable<CloudAction> GetEnumerable()
496         {
497             return _agent.GetEnumerable();
498         }
499
500         public Task GetDeleteAwaiter()
501         {
502             return DeleteAgent.ProceedEvent.WaitAsync();
503         }
504         public CancellationToken CancellationToken
505         {
506             get { return _agent.CancellationToken; }
507         }
508
509         public bool Pause
510         {
511             get {
512                 return _pause;
513             }
514             set {
515                 _pause = value;
516                 if (_pause)
517                     _unPauseEvent.Reset();
518                 else
519                 {
520                     _unPauseEvent.Set();
521                 }
522             }
523         }
524
525
526         private void RenameCloudFile(AccountInfo accountInfo,CloudMoveAction action)
527         {
528             if (accountInfo==null)
529                 throw new ArgumentNullException("accountInfo");
530             if (action==null)
531                 throw new ArgumentNullException("action");
532             if (action.CloudFile==null)
533                 throw new ArgumentException("CloudFile","action");
534             if (action.LocalFile==null)
535                 throw new ArgumentException("LocalFile","action");
536             if (action.OldLocalFile==null)
537                 throw new ArgumentException("OldLocalFile","action");
538             if (action.OldCloudFile==null)
539                 throw new ArgumentException("OldCloudFile","action");
540             Contract.EndContractBlock();
541
542             using (ThreadContext.Stacks["Operation"].Push("RenameCloudFile"))
543             {
544
545                 var newFilePath = action.LocalFile.FullName;
546
547                 //How do we handle concurrent renames and deletes/uploads/downloads?
548                 //* A conflicting upload means that a file was renamed before it had a chance to finish uploading
549                 //  This should never happen as the network agent executes only one action at a time
550                 //* A conflicting download means that the file was modified on the cloud. While we can go on and complete
551                 //  the rename, there may be a problem if the file is downloaded in blocks, as subsequent block requests for the 
552                 //  same name will fail.
553                 //  This should never happen as the network agent executes only one action at a time.
554                 //* A conflicting delete can happen if the rename was followed by a delete action that didn't have the chance
555                 //  to remove the rename from the queue.
556                 //  We can probably ignore this case. It will result in an error which should be ignored            
557
558
559                 //The local file is already renamed
560                 StatusKeeper.SetFileOverlayStatus(newFilePath, FileOverlayStatus.Modified).Wait();
561
562
563                 var account = action.CloudFile.Account ?? accountInfo.UserName;
564                 var container = action.CloudFile.Container;
565
566                 var client = new CloudFilesClient(accountInfo);
567                 //TODO: What code is returned when the source file doesn't exist?
568                 client.MoveObject(account, container, action.OldCloudFile.Name, container, action.CloudFile.Name);
569
570                 StatusKeeper.SetFileStatus(newFilePath, FileStatus.Unchanged);
571                 StatusKeeper.SetFileOverlayStatus(newFilePath, FileOverlayStatus.Normal).Wait();
572                 NativeMethods.RaiseChangeNotification(newFilePath);
573             }
574         }
575
576
577         
578
579     }
580
581    
582
583
584 }