Replaced Merkle hash with MD5 for change checking
[pithos-ms-client] / trunk / Pithos.Core / Agents / NetworkAgent.cs
index 0bdddd1..47e0ddc 100644 (file)
@@ -46,6 +46,7 @@ using System.ComponentModel.Composition;
 using System.Diagnostics;
 using System.Diagnostics.Contracts;
 using System.IO;
+using System.Linq;
 using System.Net;
 using System.Reflection;
 using System.Threading;
@@ -62,10 +63,10 @@ namespace Pithos.Core.Agents
     {
         private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
 
-        private Agent<CloudAction> _agent;
+        //private Agent<CloudAction> _agent;
 
         [System.ComponentModel.Composition.Import]
-        private DeleteAgent DeleteAgent { get; set; }
+        public DeleteAgent DeleteAgent { get; set; }
 
         [System.ComponentModel.Composition.Import]
         public IStatusKeeper StatusKeeper { get; set; }
@@ -87,24 +88,72 @@ namespace Pithos.Core.Agents
         [System.ComponentModel.Composition.Import]
         public IPithosSettings Settings { get; set; }
 
+        private Uploader _uploader;
+
         [System.ComponentModel.Composition.Import]
-        public Uploader Uploader { get; set; }
+        public Uploader Uploader
+        {
+            get { return _uploader; }
+            set
+            {
+                _uploader = value;
+                _uploader.UnpauseEvent = _unPauseEvent;                
+            }
+        }
+
+        private Downloader _downloader;
 
         [System.ComponentModel.Composition.Import]
-        public Downloader Downloader { get; set; }
+        public Downloader Downloader
+        {
+            get { return _downloader; }
+            set
+            {
+                _downloader = value;
+                _downloader.UnpauseEvent = _unPauseEvent;
+            }
+        }
 
+        [System.ComponentModel.Composition.Import]
+        public Selectives Selectives { get; set; }
+        
         //The Proceed signals the poll agent that it can proceed with polling. 
         //Essentially it stops the poll agent to give priority to the network agent
         //Initially the event is signalled because we don't need to pause
         private readonly AsyncManualResetEvent _proceedEvent = new AsyncManualResetEvent(true);
+        private bool _pause;
 
         public AsyncManualResetEvent ProceedEvent
         {
             get { return _proceedEvent; }
         }
 
+        private readonly AsyncManualResetEvent _unPauseEvent = new AsyncManualResetEvent(true);
+
+        private CancellationTokenSource _currentOperationCancellation=new CancellationTokenSource();
+
+        public void CancelCurrentOperation()
+        {
+            //What does it mean to cancel the current upload/download?
+            //Obviously, the current operation will be cancelled by throwing
+            //a cancellation exception.
+            //
+            //The default behavior is to retry any operations that throw.
+            //Obviously this is not what we want in this situation.
+            //The cancelled operation should NOT bea retried. 
+            //
+            //This can be done by catching the cancellation exception
+            //and avoiding the retry.
+            //
+
+            //Have to reset the cancellation source - it is not possible to reset the source
+            //Have to prevent a case where an operation requests a token from the old source
+            var oldSource = Interlocked.Exchange(ref _currentOperationCancellation, new CancellationTokenSource());
+            oldSource.Cancel();
+            
+        }
 
-        public void Start()
+        /*public void Start()
         {
             if (_agent != null)
                 return;
@@ -118,6 +167,7 @@ namespace Pithos.Core.Agents
                 loop = () =>
                 {
                     DeleteAgent.ProceedEvent.Wait();
+                    _unPauseEvent.Wait();
                     var message = inbox.Receive();
                     var process=message.Then(Process,inbox.CancellationToken);
                     inbox.LoopAsync(process, loop);
@@ -125,8 +175,9 @@ namespace Pithos.Core.Agents
                 loop();
             });
 
-        }
+        }*/
 
+/*
         private async Task Process(CloudAction action)
         {
             if (action == null)
@@ -168,10 +219,12 @@ namespace Pithos.Core.Agents
                         {
                             case CloudActionType.UploadUnconditional:
                                 //Abort if the file was deleted before we reached this point
-                                await Uploader.UploadCloudFile(action);
+                                var uploadAction = (CloudUploadAction) action;
+                                ProcessChildUploads(uploadAction);
+                                await Uploader.UploadCloudFile(uploadAction ,CurrentOperationCancelToken);
                                 break;
                             case CloudActionType.DownloadUnconditional:
-                                await Downloader.DownloadCloudFile(accountInfo, cloudFile, downloadPath);
+                                await Downloader.DownloadCloudFile(accountInfo, cloudFile, downloadPath, CurrentOperationCancelToken);
                                 break;
                             case CloudActionType.RenameCloud:
                                 var moveAction = (CloudMoveAction)action;
@@ -183,7 +236,7 @@ namespace Pithos.Core.Agents
                             case CloudActionType.MustSynch:
                                 if (!File.Exists(downloadPath) && !Directory.Exists(downloadPath))
                                 {
-                                    await Downloader.DownloadCloudFile(accountInfo, cloudFile, downloadPath);
+                                    await Downloader.DownloadCloudFile(accountInfo, cloudFile, downloadPath, CurrentOperationCancelToken);
                                 }
                                 else
                                 {
@@ -209,10 +262,10 @@ namespace Pithos.Core.Agents
                         Log.WarnFormat("[REQUEUE] {0} : {1} -> {2}", action.Action, action.LocalFile, action.CloudFile);
                     }
                 }
-*/
-                catch (OperationCanceledException)
-                {
-                    throw;
+#1#
+                catch (OperationCanceledException ex)
+                {                    
+                    Log.WarnFormat("Cancelling [{0}]",ex);
                 }
                 catch (DirectoryNotFoundException)
                 {
@@ -243,6 +296,50 @@ namespace Pithos.Core.Agents
                 }
             }
         }
+*/
+
+    /*    private void ProcessChildUploads(CloudUploadAction uploadAction)
+        {
+            if (!uploadAction.IsCreation || !(uploadAction.LocalFile is DirectoryInfo)) 
+                return;
+
+            var dirInfo = uploadAction.LocalFile as DirectoryInfo;
+
+            var account = uploadAction.AccountInfo;
+            var folderActions = from info in dirInfo.EnumerateDirectories("*", SearchOption.AllDirectories)                          
+                          select
+                              new CloudUploadAction(account, info, null, account.BlockSize, account.BlockHash,
+                                                    uploadAction, true);
+            var fileActions = from info in dirInfo.EnumerateFiles("*", SearchOption.AllDirectories)                          
+                          select
+                              new CloudUploadAction(account, info, null, account.BlockSize, account.BlockHash,
+                                                    uploadAction, true);            
+            //Post folder actions first, to ensure the selective folders are updated
+            folderActions.ApplyAction(PostUploadAction);
+            fileActions.ApplyAction(PostUploadAction);            
+        }
+*/
+/*
+        private void PostUploadAction(CloudUploadAction action)
+        {
+            var state = StatusKeeper.GetStateByFilePath(action.LocalFile.FullName);
+            if (state != null)
+                state.Delete();
+            //StatusKeeper.SetFileState(action.LocalFile.FullName,FileStatus.Created,FileOverlayStatus.Normal,String.Empty);
+            state = FileState.CreateFor(action.LocalFile);
+            //StatusKeeper.SetFileStatus();
+            state.FileStatus = FileStatus.Created;
+            state.OverlayStatus = FileOverlayStatus.Normal;
+            state.Create();
+            action.FileState = state;
+            Post(action);
+        }
+*/
+
+        public CancellationToken CurrentOperationCancelToken
+        {
+            get { return _currentOperationCancellation.Token; }
+        }
 
 
         private void UpdateStatus(PithosStatus status)
@@ -310,7 +407,7 @@ namespace Pithos.Core.Agents
             }            
         }
 
-        private async Task SyncFiles(AccountInfo accountInfo,CloudAction action)
+/*        private async Task SyncFiles(AccountInfo accountInfo,CloudAction action)
         {
             if (accountInfo == null)
                 throw new ArgumentNullException("accountInfo");
@@ -330,11 +427,17 @@ namespace Pithos.Core.Agents
                 var cloudFile = action.CloudFile;
                 var downloadPath = action.LocalFile.GetProperCapitalization();
 
-                var cloudHash = cloudFile.Hash.ToLower();
-                var previousCloudHash = cloudFile.PreviousHash.ToLower();
+                var cloudHash = cloudFile.X_Object_Hash.ToLower();
+                var previousCloudHash = cloudFile.PreviousHash == null?null: cloudFile.PreviousHash.ToLower();
                 var localHash = action.TreeHash.Value.TopHash.ToHashString();// LocalHash.Value.ToLower();
                 //var topHash = action.TopHash.Value.ToLower();
 
+                if(cloudFile.IsDirectory && action.LocalFile is DirectoryInfo)
+                {
+                    Log.InfoFormat("Skipping folder {0} , exists in server", downloadPath);
+                    return;
+                }
+
                 //At this point we know that an object has changed on the server and that a local
                 //file already exists. We need to decide whether the file has only changed on 
                 //the server or there is a conflicting change on the client.
@@ -347,12 +450,19 @@ namespace Pithos.Core.Agents
                     return;
                 }
 
+                //If the local and remote files have 0 length their hashes will not match
+                if (!cloudFile.IsDirectory && cloudFile.Bytes==0 && action.LocalFile is FileInfo && (action.LocalFile as FileInfo).Length==0 )
+                {
+                    Log.InfoFormat("Skipping {0}, files are empty", downloadPath);
+                    return;
+                }
+
                 //The hashes DON'T match. We need to sync
 
                 // If the previous tophash matches the local tophash, the file was only changed on the server. 
                 if (localHash == previousCloudHash)
                 {
-                    await Downloader.DownloadCloudFile(accountInfo, cloudFile, downloadPath);
+                    await Downloader.DownloadCloudFile(accountInfo, cloudFile, downloadPath CurrentOperationCancelToken);
                 }
                 else
                 {
@@ -361,7 +471,7 @@ namespace Pithos.Core.Agents
                     ReportConflictForMismatch(downloadPath);
                 }
             }
-        }
+        }*/
 
         private void ReportConflictForMismatch(string downloadPath)
         {
@@ -369,13 +479,14 @@ namespace Pithos.Core.Agents
                 throw new ArgumentNullException("downloadPath");
             Contract.EndContractBlock();
 
-            StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict);
+            StatusKeeper.SetFileState(downloadPath,FileStatus.Conflict, FileOverlayStatus.Conflict,"File changed at the server");
             UpdateStatus(PithosStatus.HasConflicts);
             var message = String.Format("Conflict detected for file {0}", downloadPath);
             Log.Warn(message);
             StatusNotification.NotifyChange(message, TraceLevel.Warning);
         }
 
+/*
         public void Post(CloudAction cloudAction)
         {
             if (cloudAction == null)
@@ -385,54 +496,48 @@ namespace Pithos.Core.Agents
             Contract.EndContractBlock();
 
             DeleteAgent.ProceedEvent.Wait();
-/*
-
-            //If the action targets a local file, add a treehash calculation
-            if (!(cloudAction is CloudDeleteAction) && cloudAction.LocalFile as FileInfo != null)
-            {
-                var accountInfo = cloudAction.AccountInfo;
-                var localFile = (FileInfo) cloudAction.LocalFile;
-
-                if (localFile.Length > accountInfo.BlockSize)
-                    cloudAction.TopHash =
-                        new Lazy<string>(() => Signature.CalculateTreeHashAsync(localFile,
-                                                                                accountInfo.BlockSize,
-                                                                                accountInfo.BlockHash, Settings.HashingParallelism).Result
-                                                    .TopHash.ToHashString());
-                else
-                {
-                    cloudAction.TopHash = new Lazy<string>(() => cloudAction.LocalHash.Value);
-                }
-
-            }
-            else
-            {
-                //The hash for a directory is the empty string
-                cloudAction.TopHash = new Lazy<string>(() => String.Empty);
-            }
-*/
             
             if (cloudAction is CloudDeleteAction)
                 DeleteAgent.Post((CloudDeleteAction)cloudAction);
             else
                 _agent.Post(cloudAction);
         }
+*/
        
 
+/*
         public IEnumerable<CloudAction> GetEnumerable()
         {
             return _agent.GetEnumerable();
         }
+*/
 
         public Task GetDeleteAwaiter()
         {
             return DeleteAgent.ProceedEvent.WaitAsync();
         }
+/*
         public CancellationToken CancellationToken
         {
             get { return _agent.CancellationToken; }
         }
+*/
 
+        public bool Pause
+        {
+            get {
+                return _pause;
+            }
+            set {
+                _pause = value;
+                if (_pause)
+                    _unPauseEvent.Reset();
+                else
+                {
+                    _unPauseEvent.Set();
+                }
+            }
+        }
 
 
         private void RenameCloudFile(AccountInfo accountInfo,CloudMoveAction action)
@@ -469,7 +574,7 @@ namespace Pithos.Core.Agents
 
 
                 //The local file is already renamed
-                StatusKeeper.SetFileOverlayStatus(newFilePath, FileOverlayStatus.Modified);
+                StatusKeeper.SetFileOverlayStatus(newFilePath, FileOverlayStatus.Modified).Wait();
 
 
                 var account = action.CloudFile.Account ?? accountInfo.UserName;
@@ -480,12 +585,13 @@ namespace Pithos.Core.Agents
                 client.MoveObject(account, container, action.OldCloudFile.Name, container, action.CloudFile.Name);
 
                 StatusKeeper.SetFileStatus(newFilePath, FileStatus.Unchanged);
-                StatusKeeper.SetFileOverlayStatus(newFilePath, FileOverlayStatus.Normal);
+                StatusKeeper.SetFileOverlayStatus(newFilePath, FileOverlayStatus.Normal).Wait();
                 NativeMethods.RaiseChangeNotification(newFilePath);
             }
         }
 
 
+        
 
     }