Replaced Merkle hash with MD5 for change checking
[pithos-ms-client] / trunk / Pithos.Core / Agents / NetworkAgent.cs
1 #region
2 /* -----------------------------------------------------------------------
3  * <copyright file="NetworkAgent.cs" company="GRNet">
4  * 
5  * Copyright 2011-2012 GRNET S.A. All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or
8  * without modification, are permitted provided that the following
9  * conditions are met:
10  *
11  *   1. Redistributions of source code must retain the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer.
14  *
15  *   2. Redistributions in binary form must reproduce the above
16  *      copyright notice, this list of conditions and the following
17  *      disclaimer in the documentation and/or other materials
18  *      provided with the distribution.
19  *
20  *
21  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
22  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
25  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
28  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  *
34  * The views and conclusions contained in the software and
35  * documentation are those of the authors and should not be
36  * interpreted as representing official policies, either expressed
37  * or implied, of GRNET S.A.
38  * </copyright>
39  * -----------------------------------------------------------------------
40  */
41 #endregion
42
43 using System;
44 using System.Collections.Generic;
45 using System.ComponentModel.Composition;
46 using System.Diagnostics;
47 using System.Diagnostics.Contracts;
48 using System.IO;
49 using System.Linq;
50 using System.Net;
51 using System.Reflection;
52 using System.Threading;
53 using System.Threading.Tasks;
54 using Castle.ActiveRecord;
55 using Pithos.Interfaces;
56 using Pithos.Network;
57 using log4net;
58
namespace Pithos.Core.Agents
{
    /// <summary>
    /// MEF-exported hub that wires together the delete, upload and download agents.
    /// </summary>
    /// <remarks>
    /// In its current form the class:
    /// <list type="bullet">
    /// <item>hands the shared un-pause event to the <c>Uploader</c> and <c>Downloader</c>,</item>
    /// <item>forwards the <c>StatusNotification</c> sink to all three agents,</item>
    /// <item>owns the cancellation source for the operation that is currently running,</item>
    /// <item>contains the local/cloud rename helpers.</item>
    /// </list>
    /// The former action-queue loop (<c>Start</c>, <c>Process</c>, <c>Post</c>, ...) is kept below as
    /// commented-out code; the private rename/conflict helpers were only called from that code.
    /// </remarks>
    [Export]
    public class NetworkAgent
    {
        private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);

        //private Agent<CloudAction> _agent;

        // NOTE(review): the Import attributes are fully qualified, presumably to avoid a clash with
        // an Import attribute coming from one of the other imported namespaces - confirm before shortening.

        private DeleteAgent _deleteAgent;

        /// <summary>
        /// The agent that handles deletions. A status notifier that was assigned earlier is
        /// forwarded to the new agent.
        /// </summary>
        [System.ComponentModel.Composition.Import]
        public DeleteAgent DeleteAgent
        {
            get { return _deleteAgent; }
            set
            {
                _deleteAgent = value;
                if (value != null && _statusNotification != null)
                {
                    value.StatusNotification = _statusNotification;
                }
            }
        }

        [System.ComponentModel.Composition.Import]
        public IStatusKeeper StatusKeeper { get; set; }

        private IStatusNotification _statusNotification;

        /// <summary>
        /// The status notification sink. Setting it forwards the value to every agent that
        /// has already been assigned; agents assigned later pick it up in their own setters.
        /// </summary>
        public IStatusNotification StatusNotification
        {
            get { return _statusNotification; }
            set
            {
                _statusNotification = value;

                //The agents are injected through property imports, so any of them may
                //still be missing if this setter runs before composition has finished
                if (_deleteAgent != null)
                {
                    _deleteAgent.StatusNotification = value;
                }
                if (_uploader != null)
                {
                    _uploader.StatusNotification = value;
                }
                if (_downloader != null)
                {
                    _downloader.StatusNotification = value;
                }
            }
        }


        [System.ComponentModel.Composition.Import]
        public IPithosSettings Settings { get; set; }

        private Uploader _uploader;

        /// <summary>
        /// The agent that performs uploads. It receives the shared un-pause event and, if one
        /// was already assigned, the current status notifier.
        /// </summary>
        [System.ComponentModel.Composition.Import]
        public Uploader Uploader
        {
            get { return _uploader; }
            set
            {
                _uploader = value;
                if (value == null)
                {
                    return;
                }

                value.UnpauseEvent = _unPauseEvent;
                if (_statusNotification != null)
                {
                    value.StatusNotification = _statusNotification;
                }
            }
        }

        private Downloader _downloader;

        /// <summary>
        /// The agent that performs downloads. It receives the shared un-pause event and, if one
        /// was already assigned, the current status notifier.
        /// </summary>
        [System.ComponentModel.Composition.Import]
        public Downloader Downloader
        {
            get { return _downloader; }
            set
            {
                _downloader = value;
                if (value == null)
                {
                    return;
                }

                value.UnpauseEvent = _unPauseEvent;
                if (_statusNotification != null)
                {
                    value.StatusNotification = _statusNotification;
                }
            }
        }

        [System.ComponentModel.Composition.Import]
        public Selectives Selectives { get; set; }

        //The Proceed signals the poll agent that it can proceed with polling. 
        //Essentially it stops the poll agent to give priority to the network agent
        //Initially the event is signalled because we don't need to pause
        private readonly AsyncManualResetEvent _proceedEvent = new AsyncManualResetEvent(true);
        private bool _pause;

        /// <summary>
        /// The event described above; created in the signalled state.
        /// </summary>
        public AsyncManualResetEvent ProceedEvent
        {
            get { return _proceedEvent; }
        }

        //Shared with the Uploader and Downloader through their UnpauseEvent property.
        //Reset while Pause is true, set otherwise
        private readonly AsyncManualResetEvent _unPauseEvent = new AsyncManualResetEvent(true);

        //Source of the token handed out by CurrentOperationCancelToken.
        //Replaced wholesale on every cancellation, see CancelCurrentOperation
        private CancellationTokenSource _currentOperationCancellation = new CancellationTokenSource();

        /// <summary>
        /// Cancels whatever operation is currently observing <see cref="CurrentOperationCancelToken"/>.
        /// </summary>
        public void CancelCurrentOperation()
        {
            //What does it mean to cancel the current upload/download?
            //Obviously, the current operation will be cancelled by throwing
            //a cancellation exception.
            //
            //The default behavior is to retry any operations that throw.
            //Obviously this is not what we want in this situation.
            //The cancelled operation should NOT be retried.
            //
            //This can be done by catching the cancellation exception
            //and avoiding the retry.
            //

            //A cancelled source cannot be reset, so it has to be replaced by a new one.
            //The swap happens atomically BEFORE the old source is cancelled, to prevent
            //a case where an operation requests a token from the already cancelled source
            var oldSource = Interlocked.Exchange(ref _currentOperationCancellation, new CancellationTokenSource());
            oldSource.Cancel();
        }

        /*public void Start()
        {
            if (_agent != null)
                return;

            if (Log.IsDebugEnabled)
                Log.Debug("Starting Network Agent");

            _agent = Agent<CloudAction>.Start(inbox =>
            {
                Action loop = null;
                loop = () =>
                {
                    DeleteAgent.ProceedEvent.Wait();
                    _unPauseEvent.Wait();
                    var message = inbox.Receive();
                    var process=message.Then(Process,inbox.CancellationToken);
                    inbox.LoopAsync(process, loop);
                };
                loop();
            });

        }*/

/*
        private async Task Process(CloudAction action)
        {
            if (action == null)
                throw new ArgumentNullException("action");
            if (action.AccountInfo==null)
                throw new ArgumentException("The action.AccountInfo is empty","action");
            Contract.EndContractBlock();




            using (ThreadContext.Stacks["Operation"].Push(action.ToString()))
            {                

                var cloudFile = action.CloudFile;
                var downloadPath = action.GetDownloadPath();

                try
                {
                    StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing,"Processing");
                    _proceedEvent.Reset();
                    
                    var accountInfo = action.AccountInfo;

                    if (action.Action == CloudActionType.DeleteCloud)
                    {                        
                        //Redirect deletes to the delete agent 
                        DeleteAgent.Post((CloudDeleteAction)action);
                    }
                    if (DeleteAgent.IsDeletedFile(action))
                    {
                        //Clear the status of already deleted files to avoid reprocessing
                        if (action.LocalFile != null)
                            StatusKeeper.ClearFileStatus(action.LocalFile.FullName);
                    }
                    else
                    {
                        switch (action.Action)
                        {
                            case CloudActionType.UploadUnconditional:
                                //Abort if the file was deleted before we reached this point
                                var uploadAction = (CloudUploadAction) action;
                                ProcessChildUploads(uploadAction);
                                await Uploader.UploadCloudFile(uploadAction ,CurrentOperationCancelToken);
                                break;
                            case CloudActionType.DownloadUnconditional:
                                await Downloader.DownloadCloudFile(accountInfo, cloudFile, downloadPath, CurrentOperationCancelToken);
                                break;
                            case CloudActionType.RenameCloud:
                                var moveAction = (CloudMoveAction)action;
                                RenameCloudFile(accountInfo, moveAction);
                                break;
                            case CloudActionType.RenameLocal:
                                RenameLocalFile(accountInfo, action);
                                break;
                            case CloudActionType.MustSynch:
                                if (!File.Exists(downloadPath) && !Directory.Exists(downloadPath))
                                {
                                    await Downloader.DownloadCloudFile(accountInfo, cloudFile, downloadPath, CurrentOperationCancelToken);
                                }
                                else
                                {
                                    await SyncFiles(accountInfo, action);
                                }
                                break;
                        }
                    }
                    Log.InfoFormat("End Processing {0}:{1}->{2}", action.Action, action.LocalFile,
                                           action.CloudFile.Name);
                }
/*
                catch (WebException exc)
                {                    
                    Log.ErrorFormat("[WEB ERROR] {0} : {1} -> {2} due to exception\r\n{3}", action.Action, action.LocalFile, action.CloudFile, exc);
                    
                    
                    //Actions that resulted in server errors should be retried                    
                    var response = exc.Response as HttpWebResponse;
                    if (response != null && response.StatusCode >= HttpStatusCode.InternalServerError)
                    {
                        _agent.Post(action);
                        Log.WarnFormat("[REQUEUE] {0} : {1} -> {2}", action.Action, action.LocalFile, action.CloudFile);
                    }
                }
#1#
                catch (OperationCanceledException ex)
                {                    
                    Log.WarnFormat("Cancelling [{0}]",ex);
                }
                catch (DirectoryNotFoundException)
                {
                    Log.ErrorFormat("{0} : {1} -> {2}  failed because the directory was not found.\n Rescheduling a delete",
                        action.Action, action.LocalFile, action.CloudFile);
                    //Post a delete action for the missing file
                    Post(new CloudDeleteAction(action));
                }
                catch (FileNotFoundException)
                {
                    Log.ErrorFormat("{0} : {1} -> {2}  failed because the file was not found.\n Rescheduling a delete",
                        action.Action, action.LocalFile, action.CloudFile);
                    //Post a delete action for the missing file
                    Post(new CloudDeleteAction(action));
                }
                catch (Exception exc)
                {
                    Log.ErrorFormat("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
                                     action.Action, action.LocalFile, action.CloudFile, exc);

                    _agent.Post(action);
                }
                finally
                {
                    if (_agent.IsEmpty)
                        _proceedEvent.Set();
                    UpdateStatus(PithosStatus.LocalComplete);                                        
                }
            }
        }
*/

    /*    private void ProcessChildUploads(CloudUploadAction uploadAction)
        {
            if (!uploadAction.IsCreation || !(uploadAction.LocalFile is DirectoryInfo)) 
                return;

            var dirInfo = uploadAction.LocalFile as DirectoryInfo;

            var account = uploadAction.AccountInfo;
            var folderActions = from info in dirInfo.EnumerateDirectories("*", SearchOption.AllDirectories)                          
                          select
                              new CloudUploadAction(account, info, null, account.BlockSize, account.BlockHash,
                                                    uploadAction, true);
            var fileActions = from info in dirInfo.EnumerateFiles("*", SearchOption.AllDirectories)                          
                          select
                              new CloudUploadAction(account, info, null, account.BlockSize, account.BlockHash,
                                                    uploadAction, true);            
            //Post folder actions first, to ensure the selective folders are updated
            folderActions.ApplyAction(PostUploadAction);
            fileActions.ApplyAction(PostUploadAction);            
        }
*/
/*
        private void PostUploadAction(CloudUploadAction action)
        {
            var state = StatusKeeper.GetStateByFilePath(action.LocalFile.FullName);
            if (state != null)
                state.Delete();
            //StatusKeeper.SetFileState(action.LocalFile.FullName,FileStatus.Created,FileOverlayStatus.Normal,String.Empty);
            state = FileState.CreateFor(action.LocalFile);
            //StatusKeeper.SetFileStatus();
            state.FileStatus = FileStatus.Created;
            state.OverlayStatus = FileOverlayStatus.Normal;
            state.Create();
            action.FileState = state;
            Post(action);
        }
*/

        /// <summary>
        /// Token of the current cancellation source. A new source (and therefore a new token)
        /// is installed every time <see cref="CancelCurrentOperation"/> is called.
        /// </summary>
        public CancellationToken CurrentOperationCancelToken
        {
            get { return _currentOperationCancellation.Token; }
        }


        /// <summary>
        /// Forwards the overall status to the status notification sink.
        /// </summary>
        /// <param name="status">The status to report.</param>
        private void UpdateStatus(PithosStatus status)
        {
            StatusNotification.SetPithosStatus(status);
            //StatusNotification.Notify(new Notification());
        }

        /// <summary>
        /// Moves the local counterpart of a cloud object from the location of the object's
        /// previous version to the location of its current version.
        /// </summary>
        /// <param name="accountInfo">The account that owns the file.</param>
        /// <param name="action">
        /// The action to process. Its local file must be set and rooted and its cloud file
        /// must carry a <c>Previous</c> version.
        /// </param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="accountInfo"/> or <paramref name="action"/> is null.
        /// </exception>
        /// <exception cref="ArgumentException">
        /// The action has no local file, a relative local path, no cloud file or no previous
        /// cloud file version.
        /// </exception>
        private void RenameLocalFile(AccountInfo accountInfo, CloudAction action)
        {
            if (accountInfo == null)
                throw new ArgumentNullException("accountInfo");
            if (action == null)
                throw new ArgumentNullException("action");
            if (action.LocalFile == null)
                throw new ArgumentException("The action's local file is not specified", "action");
            if (!Path.IsPathRooted(action.LocalFile.FullName))
                throw new ArgumentException("The action's local file path must be absolute", "action");
            if (action.CloudFile == null)
                throw new ArgumentException("The action's cloud file is not specified", "action");
            //Without the previous version there is no way to locate the file that must be moved
            if (action.CloudFile.Previous == null)
                throw new ArgumentException("The action's cloud file has no previous version", "action");
            Contract.EndContractBlock();

            using (ThreadContext.Stacks["Operation"].Push("RenameLocalFile"))
            {
                //We assume that the local file already exists, otherwise the poll agent
                //would have issued a download request

                var currentInfo = action.CloudFile;
                var previousInfo = currentInfo.Previous;
                var fileAgent = FileAgent.GetFileAgent(accountInfo);

                var previousRelativepath = previousInfo.RelativeUrlToFilePath(accountInfo.UserName);
                var previousFile = fileAgent.GetFileSystemInfo(previousRelativepath);

                //In every case we need to move the local file first
                MoveLocalFile(accountInfo, previousFile, fileAgent, currentInfo);
            }
        }

        /// <summary>
        /// Moves a local file or directory to the path that corresponds to
        /// <paramref name="currentInfo"/> and transfers its stored state to the new path.
        /// </summary>
        /// <param name="accountInfo">The account whose user name is used to build the relative path.</param>
        /// <param name="previousFile">The file or directory at its old location.</param>
        /// <param name="fileAgent">The file agent that supplies the root path.</param>
        /// <param name="currentInfo">The cloud object that determines the new location.</param>
        /// <exception cref="ArgumentNullException">Any of the arguments is null.</exception>
        private void MoveLocalFile(AccountInfo accountInfo, FileSystemInfo previousFile, FileAgent fileAgent,
                                   ObjectInfo currentInfo)
        {
            if (accountInfo == null)
                throw new ArgumentNullException("accountInfo");
            if (previousFile == null)
                throw new ArgumentNullException("previousFile");
            if (fileAgent == null)
                throw new ArgumentNullException("fileAgent");
            if (currentInfo == null)
                throw new ArgumentNullException("currentInfo");
            Contract.EndContractBlock();

            var currentRelativepath = currentInfo.RelativeUrlToFilePath(accountInfo.UserName);
            var newPath = Path.Combine(fileAgent.RootPath, currentRelativepath);

            //Anything that is not a FileInfo is treated as a directory
            var previousAsFile = previousFile as FileInfo;
            var isFile = previousAsFile != null;
            var previousFullPath = isFile
                ? FileInfoExtensions.GetProperFilePathCapitalization(previousFile.FullName)
                : FileInfoExtensions.GetProperDirectoryCapitalization(previousFile.FullName);

            //Both the old and the new path are gated for the duration of the move
            using (NetworkGate.Acquire(previousFullPath, NetworkOperation.Renaming))
            using (NetworkGate.Acquire(newPath, NetworkOperation.Renaming))
            using (new SessionScope(FlushAction.Auto))
            {
                if (isFile)
                {
                    previousAsFile.MoveTo(newPath);
                }
                else
                {
                    ((DirectoryInfo) previousFile).MoveTo(newPath);
                }

                var state = StatusKeeper.GetStateByFilePath(previousFullPath);
                if (state == null)
                {
                    //No state is stored for the old path, so there is nothing to transfer.
                    //The move itself has already succeeded at this point
                    Log.WarnFormat("No stored state found for {0} while moving it to {1}", previousFullPath, newPath);
                    return;
                }

                //Store a copy of the state under the new path, then mark the old path as deleted
                state.FilePath = newPath;
                state.SaveCopy();
                StatusKeeper.SetFileState(previousFullPath, FileStatus.Deleted, FileOverlayStatus.Deleted, "Deleted");
            }
        }

/*        private async Task SyncFiles(AccountInfo accountInfo,CloudAction action)
        {
            if (accountInfo == null)
                throw new ArgumentNullException("accountInfo");
            if (action==null)
                throw new ArgumentNullException("action");
            if (action.LocalFile==null)
                throw new ArgumentException("The action's local file is not specified","action");
            if (!Path.IsPathRooted(action.LocalFile.FullName))
                throw new ArgumentException("The action's local file path must be absolute","action");
            if (action.CloudFile== null)
                throw new ArgumentException("The action's cloud file is not specified", "action");
            Contract.EndContractBlock();
            using (ThreadContext.Stacks["Operation"].Push("SyncFiles"))
            {

                //var localFile = action.LocalFile;
                var cloudFile = action.CloudFile;
                var downloadPath = action.LocalFile.GetProperCapitalization();

                var cloudHash = cloudFile.X_Object_Hash.ToLower();
                var previousCloudHash = cloudFile.PreviousHash == null?null: cloudFile.PreviousHash.ToLower();
                var localHash = action.TreeHash.Value.TopHash.ToHashString();// LocalHash.Value.ToLower();
                //var topHash = action.TopHash.Value.ToLower();

                if(cloudFile.IsDirectory && action.LocalFile is DirectoryInfo)
                {
                    Log.InfoFormat("Skipping folder {0} , exists in server", downloadPath);
                    return;
                }

                //At this point we know that an object has changed on the server and that a local
                //file already exists. We need to decide whether the file has only changed on 
                //the server or there is a conflicting change on the client.
                //

                //If the hashes match, we are done
                if (cloudFile != ObjectInfo.Empty && cloudHash == localHash)
                {
                    Log.InfoFormat("Skipping {0}, hashes match", downloadPath);
                    return;
                }

                //If the local and remote files have 0 length their hashes will not match
                if (!cloudFile.IsDirectory && cloudFile.Bytes==0 && action.LocalFile is FileInfo && (action.LocalFile as FileInfo).Length==0 )
                {
                    Log.InfoFormat("Skipping {0}, files are empty", downloadPath);
                    return;
                }

                //The hashes DON'T match. We need to sync

                // If the previous tophash matches the local tophash, the file was only changed on the server. 
                if (localHash == previousCloudHash)
                {
                    await Downloader.DownloadCloudFile(accountInfo, cloudFile, downloadPath CurrentOperationCancelToken);
                }
                else
                {
                    //If the previous and local hash don't match, there was a local conflict
                    //that was not uploaded to the server. We have a conflict
                    ReportConflictForMismatch(downloadPath);
                }
            }
        }*/

        /// <summary>
        /// Flags a file as conflicting, switches the overall status to
        /// <c>PithosStatus.HasConflicts</c>, logs a warning and notifies the user.
        /// </summary>
        /// <param name="downloadPath">Path of the conflicting file.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="downloadPath"/> is null, empty or whitespace.
        /// </exception>
        private void ReportConflictForMismatch(string downloadPath)
        {
            if (String.IsNullOrWhiteSpace(downloadPath))
                throw new ArgumentNullException("downloadPath");
            Contract.EndContractBlock();

            StatusKeeper.SetFileState(downloadPath, FileStatus.Conflict, FileOverlayStatus.Conflict, "File changed at the server");
            UpdateStatus(PithosStatus.HasConflicts);

            var message = String.Format("Conflict detected for file {0}", downloadPath);
            Log.Warn(message);
            StatusNotification.NotifyChange(message, TraceLevel.Warning);
        }

/*
        public void Post(CloudAction cloudAction)
        {
            if (cloudAction == null)
                throw new ArgumentNullException("cloudAction");
            if (cloudAction.AccountInfo==null)
                throw new ArgumentException("The CloudAction.AccountInfo is empty","cloudAction");
            Contract.EndContractBlock();

            DeleteAgent.ProceedEvent.Wait();
            
            if (cloudAction is CloudDeleteAction)
                DeleteAgent.Post((CloudDeleteAction)cloudAction);
            else
                _agent.Post(cloudAction);
        }
*/


/*
        public IEnumerable<CloudAction> GetEnumerable()
        {
            return _agent.GetEnumerable();
        }
*/

        /// <summary>
        /// Returns the task produced by waiting asynchronously on the delete agent's proceed event.
        /// </summary>
        public Task GetDeleteAwaiter()
        {
            return DeleteAgent.ProceedEvent.WaitAsync();
        }
/*
        public CancellationToken CancellationToken
        {
            get { return _agent.CancellationToken; }
        }
*/

        /// <summary>
        /// Gets or sets the paused flag. Setting it to true resets the un-pause event shared
        /// with the uploader and downloader; setting it to false sets the event again.
        /// </summary>
        public bool Pause
        {
            get { return _pause; }
            set
            {
                _pause = value;
                if (value)
                {
                    _unPauseEvent.Reset();
                }
                else
                {
                    _unPauseEvent.Set();
                }
            }
        }


        /// <summary>
        /// Renames an object on the server after its local file has already been renamed.
        /// </summary>
        /// <param name="accountInfo">The account used to create the cloud client.</param>
        /// <param name="action">The move action holding both the old and the new local and cloud files.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="accountInfo"/> or <paramref name="action"/> is null.
        /// </exception>
        /// <exception cref="ArgumentException">
        /// One of the action's old or new local/cloud files is missing.
        /// </exception>
        private void RenameCloudFile(AccountInfo accountInfo, CloudMoveAction action)
        {
            if (accountInfo == null)
                throw new ArgumentNullException("accountInfo");
            if (action == null)
                throw new ArgumentNullException("action");
            if (action.CloudFile == null)
                throw new ArgumentException("CloudFile", "action");
            if (action.LocalFile == null)
                throw new ArgumentException("LocalFile", "action");
            if (action.OldLocalFile == null)
                throw new ArgumentException("OldLocalFile", "action");
            if (action.OldCloudFile == null)
                throw new ArgumentException("OldCloudFile", "action");
            Contract.EndContractBlock();

            using (ThreadContext.Stacks["Operation"].Push("RenameCloudFile"))
            {
                var newFilePath = action.LocalFile.FullName;

                //How do we handle concurrent renames and deletes/uploads/downloads?
                //* A conflicting upload means that a file was renamed before it had a chance to finish uploading
                //  This should never happen as the network agent executes only one action at a time
                //* A conflicting download means that the file was modified on the cloud. While we can go on and complete
                //  the rename, there may be a problem if the file is downloaded in blocks, as subsequent block requests for the 
                //  same name will fail.
                //  This should never happen as the network agent executes only one action at a time.
                //* A conflicting delete can happen if the rename was followed by a delete action that didn't have the chance
                //  to remove the rename from the queue.
                //  We can probably ignore this case. It will result in an error which should be ignored            

                //The local file is already renamed.
                //The overlay update is waited on, so the Modified overlay is in place before the server call
                StatusKeeper.SetFileOverlayStatus(newFilePath, FileOverlayStatus.Modified).Wait();

                //Fall back to the account's user name when the cloud file does not specify an account
                var account = action.CloudFile.Account ?? accountInfo.UserName;
                var container = action.CloudFile.Container;

                var client = new CloudFilesClient(accountInfo);
                //TODO: What code is returned when the source file doesn't exist?
                client.MoveObject(account, container, action.OldCloudFile.Name, container, action.CloudFile.Name);

                //The move succeeded: restore the normal status and overlay, then raise a change notification for the path
                StatusKeeper.SetFileStatus(newFilePath, FileStatus.Unchanged);
                StatusKeeper.SetFileOverlayStatus(newFilePath, FileOverlayStatus.Normal).Wait();
                NativeMethods.RaiseChangeNotification(newFilePath);
            }
        }

    }

}