Added BlockUpdater.cs to perform block updates in a separate class. Will include...
authorPanagiotis Kanavos <pkanavos@gmail.com>
Sun, 9 Oct 2011 16:45:17 +0000 (19:45 +0300)
committerPanagiotis Kanavos <pkanavos@gmail.com>
Sun, 9 Oct 2011 16:45:17 +0000 (19:45 +0300)
Simplified agent loop code

trunk/Pithos.Core/Agents/Agent.cs
trunk/Pithos.Core/Agents/BlockUpdater.cs [new file with mode: 0644]
trunk/Pithos.Core/Agents/FileAgent.cs
trunk/Pithos.Core/Agents/NetworkAgent.cs
trunk/Pithos.Core/Agents/WorkflowAgent.cs
trunk/Pithos.Core/FileState.cs
trunk/Pithos.Core/Pithos.Core.csproj
trunk/Pithos.Core/TaskExtensions.cs [new file with mode: 0644]
trunk/Pithos.Network/CloudFilesClient.cs
trunk/Pithos.Network/Signature.cs
trunk/Pithos.Network/TreeHash.cs

index 8ac7a98..fc5f0e0 100644 (file)
@@ -99,16 +99,6 @@ namespace Pithos.Core
             }
         }
 
-/*
-        public void AddFromEnumerable(IEnumerable<TMessage> enumerable)
-        {
-            foreach (var message in enumerable)
-            {
-                Post(message);
-            }
-        }
-*/
-
         public IEnumerable<TMessage> GetEnumerable()
         {
             return _messages;
@@ -129,8 +119,26 @@ namespace Pithos.Core
                     if (onError != null)
                         onError(ex);
                 }
-            });
+            },CancellationToken);
+        }
 
+        //Continues the processing loop once the process task completes.
+        //The continuation first starts loop as a new task and then, if process faulted,
+        //passes the inner exception to onError (when one is supplied). Stop() is called
+        //before onError if that exception is an OperationCanceledException.
+        //Both the continuation and the loop task are created with CancellationToken.
+        //The returned task always yields default(T); the result of process is not forwarded.
+        public Task<T> LoopAsync<T>(Task<T> process, Action loop,Action<Exception> onError=null)
+        {
+            return process.ContinueWith(t =>
+            {   
+                //Spawn the Loop immediately
+                Task.Factory.StartNew(loop,CancellationToken);
+                //Then process possible exceptions
+                if (t.IsFaulted)
+                {
+                    var ex = t.Exception.InnerException;
+                    if (ex is OperationCanceledException)
+                        Stop();
+                    if (onError != null)
+                        onError(ex);
+                }
+                return default(T);
+            },CancellationToken);
         }
     }
 }
diff --git a/trunk/Pithos.Core/Agents/BlockUpdater.cs b/trunk/Pithos.Core/Agents/BlockUpdater.cs
new file mode 100644 (file)
index 0000000..7548be5
--- /dev/null
@@ -0,0 +1,157 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Diagnostics.Contracts;
+using System.IO;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Pithos.Network;
+
+namespace Pithos.Core.Agents
+{
+    //Collects the blocks downloaded for a single file and applies them in one step.
+    //Each block passed to StoreBlock is written to its own file next to TempPath.
+    //Commit copies the local file (if any) to TempPath, resizes it to ServerHash.Bytes,
+    //writes every stored block at its offset, swaps the result into FilePath and
+    //deletes the block files.
+    class BlockUpdater
+    {
+        public string FilePath { get; private set; }
+        public string RelativePath { get; private set; }
+
+        public string FragmentsPath { get; private set; }
+
+        public TreeHash ServerHash { get; private set; }
+
+        public string TempPath { get; private set; }
+
+
+        //Stores the paths and the server hash and immediately calls Start to
+        //calculate TempPath and create its directory
+        public BlockUpdater(string fragmentsPath, string filePath, string relativePath,TreeHash serverHash)
+        {
+            FragmentsPath=fragmentsPath;
+            FilePath = filePath;
+            RelativePath=relativePath;
+            ServerHash = serverHash;
+
+            Start();
+        }
+
+        //Calculates TempPath and ensures that its directory exists
+        public void Start()
+        {
+            //The file will be stored in a temporary location while downloading with an extension .download
+            TempPath = Path.Combine(FragmentsPath, RelativePath + ".download");
+            var directoryPath = Path.GetDirectoryName(TempPath);
+            if (!Directory.Exists(directoryPath))
+                Directory.CreateDirectory(directoryPath);
+        }
+
+
+        //Applies all stored blocks to a temporary copy of the file, replaces the
+        //original file with the result and removes the block files
+        public void Commit()
+        {
+            //Copy the file to a temporary location. Changes will be made to the
+            //temporary file, then it will replace the original file.
+            //A file that is downloaded for the first time has no local copy yet and
+            //File.Copy would throw a FileNotFoundException, so copy only if it exists
+            if (File.Exists(FilePath))
+                File.Copy(FilePath, TempPath, true);
+
+            //Set the size of the file to the size specified in the treehash
+            //This will also create an empty file if the file doesn't exist
+            SetFileSize(TempPath, ServerHash.Bytes);
+
+            //Update the temporary file with the data from the blocks
+            using (var stream = File.OpenWrite(TempPath))
+            {
+                foreach (var block in _blocks)
+                {
+                    var blockPath = block.Value;
+                    var blockIndex = block.Key;
+                    using (var blockStream = File.OpenRead(blockPath))
+                    {
+                        //Widen to long before multiplying so that the offset cannot
+                        //overflow for large files if BlockSize is a 32-bit value
+                        var offset = (long)blockIndex*ServerHash.BlockSize;
+                        stream.Seek(offset, SeekOrigin.Begin);
+                        blockStream.CopyTo(stream);
+                    }
+                }
+            }
+            SwapFiles();
+
+            ClearBlocks();
+        }
+
+        //Moves the temporary file over the original, replacing it if it already exists
+        private void SwapFiles()
+        {
+            if (File.Exists(FilePath))
+                File.Replace(TempPath, FilePath, null, true);
+            else
+                File.Move(TempPath, FilePath);
+        }
+
+        //Deletes the stored block files and the temporary file and forgets the blocks
+        private void ClearBlocks()
+        {
+            foreach (var block in _blocks)
+            {
+                var filePath = block.Value;
+                File.Delete(filePath);
+            }
+            File.Delete(TempPath);
+            _blocks.Clear();
+        }
+
+        //Change the file's size, possibly truncating or adding to it
+        private void SetFileSize(string filePath, long fileSize)
+        {
+            if (String.IsNullOrWhiteSpace(filePath))
+                throw new ArgumentNullException("filePath");
+            if (!Path.IsPathRooted(filePath))
+                throw new ArgumentException("The filePath must be rooted", "filePath");
+            if (fileSize < 0)
+                throw new ArgumentOutOfRangeException("fileSize");
+            Contract.EndContractBlock();
+
+            using (var stream = File.Open(filePath, FileMode.OpenOrCreate, FileAccess.Write))
+            {
+                stream.SetLength(fileSize);
+            }
+        }
+
+       /* //Check whether we should copy the local file to a temp path        
+        private  bool ShouldCopy(string localPath, string tempPath)
+        {
+            //No need to copy if there is no file
+            if (!File.Exists(localPath))
+                return false;
+
+            //If there is no temp file, go ahead and copy
+            if (!File.Exists(tempPath))
+                return true;
+
+            //If there is a temp file and is newer than the actual file, don't copy
+            var localLastWrite = File.GetLastWriteTime(localPath);
+            var tempLastWrite = File.GetLastWriteTime(tempPath);
+
+            //This could mean there is an interrupted download in progress
+            return (tempLastWrite < localLastWrite);
+        }*/
+
+        //Maps the index of each stored block to the path of the file that holds its data
+        private readonly ConcurrentDictionary<int,string> _blocks=new ConcurrentDictionary<int, string>();
+
+        //Writes the contents of a block to its own file and remembers the file's path.
+        //Returns the task created by FileAsync.WriteAllBytes
+        public Task StoreBlock(int blockIndex,byte[] buffer)
+        {
+            //The block file is named after the temp file, with the zero-padded block index
+            //as a suffix. The padding must use the 000 custom format: a bare digit such as
+            //{1:3} is emitted as a literal and would give every block the same path
+            var blockPath = String.Format("{0}.{1:000}", TempPath, blockIndex);
+            _blocks[blockIndex] = blockPath;
+
+            return FileAsync.WriteAllBytes(blockPath, buffer);
+        }
+
+        //Writes count bytes from the start of buffer to the file, beginning at offset.
+        //The stream is closed when the write finishes, whether it succeeded or not
+        private Task WriteAsync(string filePath, byte[] buffer, int offset, int count)
+        {
+            var stream = FileAsync.OpenWrite(filePath);
+            try
+            {
+                stream.Seek(offset, SeekOrigin.Begin);
+                var write = stream.WriteAsync(buffer, 0, count);
+                return write.ContinueWith(s =>
+                {
+                    stream.Close();
+                    //The write has already completed, so this doesn't block. It rethrows
+                    //a failed write so that the error reaches the returned task
+                    s.Wait();
+                });
+            }
+            catch (Exception ex)
+            {
+                stream.Close();
+                return Task.Factory.FromException(ex);
+            }
+        }
+
+    }
+}
index f73a33f..3eacd5d 100644 (file)
@@ -6,6 +6,7 @@ using System.Diagnostics.Contracts;
 using System.IO;
 using System.Linq;
 using System.Text;
+using System.Threading.Tasks;
 using Pithos.Interfaces;
 using Pithos.Network;
 
@@ -43,30 +44,57 @@ namespace Pithos.Core.Agents
                 loop = () =>
                 {
                     var message = inbox.Receive();
-                    var process = message.ContinueWith(t =>
-                    {
-                        var state = t.Result;
-                        Process(state);
-                        inbox.DoAsync(loop);
-                    });
-
-                    process.ContinueWith(t =>
-                    {
-                        inbox.DoAsync(loop);
-                        if (t.IsFaulted)
-                        {
-                            var ex = t.Exception.InnerException;
-                            if (ex is OperationCanceledException)
-                                inbox.Stop();
-                            Trace.TraceError("[ERROR] File Event Processing:\r{0}", ex);
-                        }
-                    });
+                    var process=message.Then(Process,inbox.CancellationToken);
 
+                    inbox.LoopAsync(process,loop,ex=>
+                        Trace.TraceError("[ERROR] File Event Processing:\r{0}", ex));
                 };
                 loop();
             });
         }
 
+        //Processes a single file event: updates the file's status, overlay status and
+        //checksum, then posts the state to the WorkflowAgent.
+        //Events with a Created or Changed trigger are skipped while NetworkGate reports
+        //a network operation for the file.
+        //An IOException posts the state back to this agent to be retried; any other
+        //exception is logged and the file is skipped.
+        //Returns CompletedTask<object>.Default in every case
+        private Task<object> Process(WorkflowState state)
+        {
+            Debug.Assert(!Ignore(state.Path));
+
+            var networkState = NetworkGate.GetNetworkState(state.Path);
+            //Skip if the file is already being downloaded or uploaded and 
+            //the change is create or modify
+            if (networkState != NetworkOperation.None &&
+                (
+                    state.TriggeringChange == WatcherChangeTypes.Created ||
+                    state.TriggeringChange == WatcherChangeTypes.Changed
+                ))
+                return CompletedTask<object>.Default;
+
+            try
+            {
+                UpdateFileStatus(state);
+                UpdateOverlayStatus(state);
+                UpdateFileChecksum(state);
+                WorkflowAgent.Post(state);
+            }
+            catch (IOException exc)
+            {
+                Trace.TraceWarning("File access error occured, retrying {0}\n{1}", state.Path, exc);
+                _agent.Post(state);
+            }
+            catch (Exception exc)
+            {
+                //The placeholders must be well formed, otherwise formatting the message
+                //fails inside this handler
+                Trace.TraceWarning("Error occured while indexing {0}. The file will be skipped\n{1}", state.Path, exc);
+            }
+            return CompletedTask<object>.Default;
+        }
+
+
+/*
+        private Task Process(Task<WorkflowState> action)
+        {
+            return action.ContinueWith(t => Process(t.Result));
+        }
+*/
+
+
         public bool Pause
         {
             get { return _watcher == null || !_watcher.EnableRaisingEvents; }
@@ -167,37 +195,6 @@ namespace Pithos.Core.Agents
         }
 
 
-        private void Process(WorkflowState state)
-        {
-            Debug.Assert(!Ignore(state.Path));            
-
-            var networkState = NetworkGate.GetNetworkState(state.Path);
-            //Skip if the file is already being downloaded or uploaded and 
-            //the change is create or modify
-            if (networkState != NetworkOperation.None &&
-                (
-                    state.TriggeringChange == WatcherChangeTypes.Created ||
-                    state.TriggeringChange == WatcherChangeTypes.Changed
-                ))
-                return;
-
-            try
-            {
-                UpdateFileStatus(state);
-                UpdateOverlayStatus(state);
-                UpdateFileChecksum(state);
-                WorkflowAgent.Post(state);
-            }
-            catch (IOException exc)
-            {
-                Trace.TraceWarning("File access error occured, retrying {0}\n{1}", state.Path, exc);
-                _agent.Post(state);
-            }
-            catch (Exception exc)
-            {
-                Trace.TraceWarning("Error occured while indexing{0. The file will be skipped}\n{1}", state.Path, exc);                
-            }
-        }
 
         private Dictionary<WatcherChangeTypes, FileStatus> _statusDict = new Dictionary<WatcherChangeTypes, FileStatus>
         {
@@ -295,7 +292,7 @@ namespace Pithos.Core.Agents
             if (File.Exists(absolutePath))
                 return true;
             //Or a directory?
-            if (Directory.Exists(RootPath))
+            if (Directory.Exists(absolutePath))
                 return true;
             //Fail if it is neither
             return false;
index b8c9dcc..699daa1 100644 (file)
@@ -61,50 +61,112 @@ namespace Pithos.Core.Agents
                 loop = () =>
                 {
                     var message = inbox.Receive();
-
-/*
-                    var process=Process(message);
+                    var process=message.Then(Process,inbox.CancellationToken);
                     inbox.LoopAsync(process, loop);
-*/
-
-/*
-                    process1.ContinueWith(t =>
-                    {
-                        inbox.DoAsync(loop);
-                        if (t.IsFaulted)
-                        {
-                            var ex = t.Exception.InnerException;
-                            if (ex is OperationCanceledException)
-                                inbox.Stop();
-                        }
-                    });
-*/
-                    //inbox.DoAsync(loop);
+                };
+                loop();
+            });
+        }
 
+        //Executes a single cloud action (upload, download, delete, rename or synch).
+        //An OperationCanceledException is rethrown; any other exception is logged and
+        //the action is posted back to the agent to be retried.
+        //Returns CompletedTask<object>.Default when no exception escapes
+        private Task<object> Process(CloudAction action)
+        {
+            if (action == null)
+                throw new ArgumentNullException("action");
+            Contract.EndContractBlock();
 
-                    var process = message.ContinueWith(t =>
-                    {
-                        var action = t.Result;
-                        
-                        Process(action);
-                        inbox.DoAsync(loop);
-                    });
+            var localFile = action.LocalFile;
+            var cloudFile = action.CloudFile;
+            //CloudFile may be null (see the downloadPath check below), so the name used
+            //for tracing is resolved once, without dereferencing a null reference
+            var cloudName = (cloudFile == null) ? String.Empty : cloudFile.Name;
+            Trace.TraceInformation("[ACTION] Start Processing {0}:{1}->{2}", action.Action, action.LocalFile, cloudName);
+            var downloadPath = (cloudFile == null) ? String.Empty
+                                    : Path.Combine(FileAgent.RootPath, cloudFile.Name.RelativeUrlToFilePath());
 
-                    process.ContinueWith(t =>
-                    {
-                        inbox.DoAsync(loop);
-                        if (t.IsFaulted)
+            try
+            {
+                switch (action.Action)
+                {
+                    case CloudActionType.UploadUnconditional:
+                        UploadCloudFile(localFile, action.LocalHash.Value, action.TopHash.Value);
+                        break;
+                    case CloudActionType.DownloadUnconditional:
+                        DownloadCloudFile(PithosContainer, new Uri(cloudFile.Name, UriKind.Relative), downloadPath);
+                        break;
+                    case CloudActionType.DeleteCloud:
+                        DeleteCloudFile(cloudFile.Name);
+                        break;
+                    case CloudActionType.RenameCloud:
+                        RenameCloudFile(action.OldFileName, action.NewPath, action.NewFileName);
+                        break;
+                    case CloudActionType.MustSynch:
+                        if (File.Exists(downloadPath))
                         {
-                            var ex = t.Exception.InnerException;
-                            if (ex is OperationCanceledException)
-                                inbox.Stop();
-                        }
-                    });
+                            var cloudHash = cloudFile.Hash;
+                            var localHash = action.LocalHash.Value;
+                            var topHash = action.TopHash.Value;
+                            //Not enough to compare only the local hashes, also have to compare the tophashes
+                            if (!cloudHash.Equals(localHash, StringComparison.InvariantCultureIgnoreCase) &&
+                                !cloudHash.Equals(topHash, StringComparison.InvariantCultureIgnoreCase))
+                            {
+                                var lastLocalTime = localFile.LastWriteTime;
+                                var lastUpTime = cloudFile.Last_Modified;
+                                if (lastUpTime <= lastLocalTime)
+                                {
+                                    //Local change while the app was down or Files in conflict
+                                    //Maybe need to store version as well, to check who has the latest version
 
+                                    //StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict);
+                                    UploadCloudFile(localFile, action.LocalHash.Value, action.TopHash.Value);
+                                }
+                                else
+                                {
+                                    var status = StatusKeeper.GetFileStatus(downloadPath);
+                                    switch (status)
+                                    {
+                                        case FileStatus.Unchanged:
+                                            //If the cloud file has a later date, it was modified by another user or computer.
+                                            //If the local file's status is Unchanged, we should go on and download the cloud file
+                                            DownloadCloudFile(PithosContainer, new Uri(action.CloudFile.Name, UriKind.Relative), downloadPath);
+                                            break;
+                                        case FileStatus.Modified:
+                                            //If the local file is Modified, we may have a conflict. In this case we should mark the file as Conflict
+                                            //We can't ensure that a file modified online since the last time will appear as Modified, unless we 
+                                            //index all files before we start listening.
+                                            StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict);
+                                            break;
+                                        case FileStatus.Created:
+                                            //If the local file is Created, it means that the local and cloud files aren't related yet have the same name
+                                            //In this case we must mark the file as in conflict
+                                            StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict);
+                                            break;
+                                        default:
+                                            //Other cases should never occur. Mark them as Conflict as well but log a warning
+                                            StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict);
+                                            Trace.TraceWarning("Unexcepted status {0} for file {1}->{2}", status, downloadPath, action.CloudFile.Name);
+                                            break;
+                                    }
+                                }
+                            }
+                        }
+                        else
+                            DownloadCloudFile(PithosContainer, new Uri(action.CloudFile.Name, UriKind.Relative), downloadPath);
+                        break;
+                }
+                //Use the resolved name: a null CloudFile here would throw after the action
+                //has already run and cause it to be requeued
+                Trace.TraceInformation("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile, cloudName);
+            }
+            catch (OperationCanceledException)
+            {
+                throw;
+            }
+            catch (Exception exc)
+            {
+                Trace.TraceError("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
+                                action.Action, action.LocalFile, action.CloudFile, exc);
 
-                };
-                loop();
-            });
+                _agent.Post(action);
+            }
+            return CompletedTask<object>.Default;
         }
 
 
@@ -138,15 +200,25 @@ namespace Pithos.Core.Agents
             }
         }
 
+        //Remote files are polled periodically. Any changes are processed
         public Task ProcessRemoteFiles(string accountPath,DateTime? since=null)
-        {            
+        {   
+            if (String.IsNullOrWhiteSpace(accountPath))
+                throw new ArgumentNullException(accountPath);
+            Contract.EndContractBlock();
 
             Trace.CorrelationManager.StartLogicalOperation();
             Trace.TraceInformation("[LISTENER] Scheduled");
-            var listObjects = Task.Factory.StartNewDelayed(10000).ContinueWith(t =>
-                CloudClient.ListObjects(PithosContainer,since));
+            
+            //Get the list of server objects changed since the last check
+            var listObjects = Task<IList<ObjectInfo>>.Factory.StartNewDelayed(10000,()=>
+                            CloudClient.ListObjects(PithosContainer,since));
 
+            //Next time we will check for all changes since the current check minus 1 second
+            //This is done to ensure there are no discrepancies due to clock differences
             DateTime nextSince = DateTime.Now.AddSeconds(-1);
+            
+            
 
             var enqueueFiles = listObjects.ContinueWith(task =>
             {
@@ -244,114 +316,7 @@ namespace Pithos.Core.Agents
         }
 
         
-        private Task Process(Task<CloudAction> action)
-        {
-            return action.ContinueWith(t=> Process(t.Result));
-        }
-
-
-        private void Process(CloudAction action)
-        {
-            if (action==null)
-                throw new ArgumentNullException("action");
-            Contract.EndContractBlock();
-
-            Trace.TraceInformation("[ACTION] Start Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name);
-            var localFile = action.LocalFile;
-            var cloudFile = action.CloudFile;
-            var downloadPath = (cloudFile == null) ? String.Empty
-                                    : Path.Combine(FileAgent.RootPath, cloudFile.Name.RelativeUrlToFilePath());
-            
-            try
-            {
-                switch (action.Action)
-                {
-                    case CloudActionType.UploadUnconditional:
-                        UploadCloudFile(localFile, action.LocalHash.Value,action.TopHash.Value);
-                        break;
-                    case CloudActionType.DownloadUnconditional:
-                        DownloadCloudFile(PithosContainer, new Uri(cloudFile.Name,UriKind.Relative), downloadPath);
-                        break;
-                    case CloudActionType.DeleteCloud:
-                        DeleteCloudFile(cloudFile.Name);
-                        break;
-                    case CloudActionType.RenameCloud:
-                        RenameCloudFile(action.OldFileName, action.NewPath, action.NewFileName);
-                        break;
-                    case CloudActionType.MustSynch:
-                        if (File.Exists(downloadPath))
-                        {                            
-                            var cloudHash = cloudFile.Hash;
-                            var localHash = action.LocalHash.Value;
-                            var topHash = action.TopHash.Value;
-                            //Not enough to compare only the local hashes, also have to compare the tophashes
-                            if (!cloudHash.Equals(localHash, StringComparison.InvariantCultureIgnoreCase) &&
-                                !cloudHash.Equals(topHash, StringComparison.InvariantCultureIgnoreCase))
-                            {
-                                var lastLocalTime = localFile.LastWriteTime;
-                                var lastUpTime = cloudFile.Last_Modified;
-                                if (lastUpTime <= lastLocalTime)
-                                {
-                                    //Local change while the app was down or Files in conflict
-                                    //Maybe need to store version as well, to check who has the latest version
-
-                                    //StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict);
-                                    UploadCloudFile(localFile, action.LocalHash.Value,action.TopHash.Value);
-                                }
-                                else
-                                {
-                                    var status = StatusKeeper.GetFileStatus(downloadPath);
-                                    switch (status)
-                                    {
-                                        case FileStatus.Unchanged:
-                                            //It he cloud file has a later date, it was modified by another user or computer.
-                                            //If the local file's status is Unchanged, we should go on and download the cloud file
-                                            DownloadCloudFile(PithosContainer, new Uri(action.CloudFile.Name,UriKind.Relative), downloadPath);
-                                            break;
-                                        case FileStatus.Modified:
-                                            //If the local file is Modified, we may have a conflict. In this case we should mark the file as Conflict
-                                            //We can't ensure that a file modified online since the last time will appear as Modified, unless we 
-                                            //index all files before we start listening.
-                                            StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict);
-                                            break;
-                                        case FileStatus.Created:
-                                            //If the local file is Created, it means that the local and cloud files aren't related yet have the same name
-                                            //In this case we must mark the file as in conflict
-                                            //Other cases should never occur. Mark them as Conflict as well but log a warning
-                                            StatusKeeper.SetFileOverlayStatus(downloadPath,FileOverlayStatus.Conflict);
-                                            break;
-                                        default:
-                                            //If the local file is Created, it means that the local and cloud files aren't related yet have the same name
-                                            //In this case we must mark the file as in conflict
-                                            //Other cases should never occur. Mark them as Conflict as well but log a warning
-                                            StatusKeeper.SetFileOverlayStatus(downloadPath,FileOverlayStatus.Conflict);
-                                            Trace.TraceWarning("Unexcepted status {0} for file {1}->{2}",status,downloadPath,action.CloudFile.Name);
-                                            break;
-                                    }
-                                }
-                            }
-                        }
-                        else
-                            DownloadCloudFile(PithosContainer, new Uri(action.CloudFile.Name,UriKind.Relative), downloadPath);
-                        break;
-                }
-                Trace.TraceInformation("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name);
-            }
-            catch (OperationCanceledException)
-            {
-                throw;
-            }
-            catch (Exception exc)
-            {
-                Trace.TraceError("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
-                                action.Action, action.LocalFile, action.CloudFile, exc);
-
-                _agent.Post(action);
-            }
-
-        }
-
-        
+       
 
         private void RenameCloudFile(string oldFileName, string newPath, string newFileName)
         {
@@ -489,28 +454,16 @@ namespace Pithos.Core.Agents
                 throw new ArgumentNullException("serverHash");
             Contract.EndContractBlock();
 
+            
             //Calculate the relative file path for the new file
             var relativePath = relativeUrl.RelativeUriToFilePath();
-            //The file will be stored in a temporary location while downloading with an extension .download
-            var tempPath = Path.Combine(FileAgent.FragmentsPath, relativePath + ".download");
-            var directoryPath = Path.GetDirectoryName(tempPath);
-            if (!Directory.Exists(directoryPath))
-                Directory.CreateDirectory(directoryPath);
-            
-            //If the local file exists we should make a copy of it to the
-            //fragments folder, unless a newer temp copy already exists, which
-            //means there is an interrupted download
-            if (ShouldCopy(localPath, tempPath))
-                File.Copy(localPath, tempPath, true);    
-
-            //Set the size of the file to the size specified in the treehash
-            //This will also create an empty file if the file doesn't exist            
-            SetFileSize(tempPath, serverHash.Bytes);
+            var blockUpdater = new BlockUpdater(FileAgent.FragmentsPath, localPath, relativePath, serverHash);
 
+            
             return Task.Factory.StartNew(() =>
             {
                 //Calculate the temp file's treehash
-                var treeHash = Signature.CalculateTreeHashAsync(tempPath, this.BlockSize,BlockHash).Result;
+                var treeHash = Signature.CalculateTreeHashAsync(localPath, this.BlockSize,BlockHash).Result;
                 
                 //And compare it with the server's hash
                 var upHashes = serverHash.GetHashesAsStrings();
@@ -527,70 +480,21 @@ namespace Pithos.Core.Agents
                             end= ((i + 1)*BlockSize) ;
                             
                         //Get its block
-                        var blockTask = CloudClient.GetBlock(container, relativeUrl,
-                                                            start, end);
+                        var block= CloudClient.GetBlock(container, relativeUrl,start, end);
+
+                        var store=block.Then(b => blockUpdater.StoreBlock(i, b));
+                        store.Wait();
 
-                        blockTask.ContinueWith(b =>
-                        {
-                            //And apply it to the temp file
-                            var buffer = b.Result;
-                            var stream =FileAsync.OpenWrite(tempPath);
-                            stream.Seek(start,SeekOrigin.Begin);
-                            return stream.WriteAsync(buffer, 0, buffer.Length)
-                                .ContinueWith(s => stream.Close());
-
-                        }).Unwrap()
-                        .Wait();
                         Trace.TraceInformation("[BLOCK GET] FINISH {0} of {1} for {2}", i, upHashes.Length, localPath);
                     }
-
                 }
                 
-
-                //Replace the existing file with the temp
-                if (File.Exists(localPath))
-                    File.Replace(tempPath, localPath, null, true);
-                else
-                    File.Move(tempPath, localPath);
+                blockUpdater.Commit();
                 Trace.TraceInformation("[BLOCK GET] COMPLETE {0}", localPath);
             });
         }
 
-        //Change the file's size, possibly truncating or adding to it
-        private static void SetFileSize(string filePath, long fileSize)
-        {
-            if (String.IsNullOrWhiteSpace(filePath))
-                throw new ArgumentNullException("filePath");
-            if (!Path.IsPathRooted(filePath))
-                throw new ArgumentException("The filePath must be rooted", "filePath");
-            if (fileSize<0)
-                throw new ArgumentOutOfRangeException("fileSize");
-            Contract.EndContractBlock();
-
-            using (var stream = File.Open(filePath, FileMode.OpenOrCreate, FileAccess.Write))
-            {
-                stream.SetLength(fileSize);
-            }
-        }
-
-        //Check whether we should copy the local file to a temp path        
-        private static bool ShouldCopy(string localPath, string tempPath)
-        {
-            //No need to copy if there is no file
-            if (!File.Exists(localPath))            
-                return false;
 
-            //If there is no temp file, go ahead and copy
-            if (!File.Exists(tempPath))                
-                return true;
-
-            //If there is a temp file and is newer than the actual file, don't copy
-            var localLastWrite = File.GetLastWriteTime(localPath);
-            var tempLastWrite = File.GetLastWriteTime(tempPath);
-            
-            //This could mean there is an interrupted download in progress
-            return (tempLastWrite < localLastWrite);
-        }
 
         private void UploadCloudFile(FileInfo fileInfo, string hash,string topHash)
         {
index d237958..52d8916 100644 (file)
@@ -6,6 +6,7 @@ using System.Diagnostics.Contracts;
 using System.IO;
 using System.Linq;
 using System.Text;
+using System.Threading.Tasks;
 using Pithos.Interfaces;
 
 namespace Pithos.Core.Agents
@@ -34,29 +35,49 @@ namespace Pithos.Core.Agents
                 loop = () =>
                 {
                     var message = inbox.Receive();
-                    var process = message.ContinueWith(t =>
-                    {
-                        var state = t.Result;
-                        Process(state);
-                        inbox.DoAsync(loop);
-                    });
-                    process.ContinueWith(t =>
-                    {
-                        inbox.DoAsync(loop);
-                        if (t.IsFaulted)
-                        {
-                            var ex = t.Exception.InnerException;
-                            if (ex is OperationCanceledException)
-                                inbox.Stop();
-                            Trace.TraceError("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex);
-                        }
-                    });
-
+                    var process = message.Then(Process, inbox.CancellationToken);                        
+                    inbox.LoopAsync(process,loop,ex=>
+                            Trace.TraceError("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex));
                 };
                 loop();
             });
         }
 
+        //Translates one file event into the matching CloudAction and posts it to the NetworkAgent.
+        //Returns CompletedTask<object>.Default on every path, so the method can be chained with Then.
+        private Task<object> Process(WorkflowState state)
+        {
+            if (state.Skip)
+                return CompletedTask<object>.Default;
+            string path = state.Path.ToLower();
+            string fileName = Path.GetFileName(path);
+
+            //Bypass files that no longer exist, unless the status is Deleted:
+            //a deletion has to reach the DeleteCloud case below even though the local file is gone
+            if (!File.Exists(path) && state.Status != FileStatus.Deleted)
+            {
+                state.Skip = true;
+                this.StatusKeeper.RemoveFileOverlayStatus(path);
+                return CompletedTask<object>.Default;
+            }
+            var fileState = FileState.FindByFilePath(path);
+            switch (state.Status)
+            {
+                case FileStatus.Created:
+                case FileStatus.Modified:
+                    var info = new FileInfo(path);
+                    NetworkAgent.Post(new CloudAction(CloudActionType.UploadUnconditional, info, ObjectInfo.Empty, fileState));
+                    break;
+                case FileStatus.Deleted:
+                    NetworkAgent.Post(new CloudAction(CloudActionType.DeleteCloud, null, new ObjectInfo { Name = fileName }, fileState));
+                    break;
+                case FileStatus.Renamed:
+                    NetworkAgent.Post(new CloudAction(CloudActionType.RenameCloud, state.OldFileName, state.OldPath, state.FileName, state.Path));
+                    break;
+            }
+            return CompletedTask<object>.Default;
+        }
+
         public void RestartInterruptedFiles()
         {
             
@@ -95,39 +116,6 @@ namespace Pithos.Core.Agents
 
         }       
 
-        private void Process(WorkflowState state)
-        {
-            if (state.Skip)
-                return;
-            string path = state.Path.ToLower();
-            string fileName = Path.GetFileName(path);
-
-            //Bypass deleted files, unless the status is Deleted
-            if (!(File.Exists(path) || state.Status != FileStatus.Deleted))
-            {
-                state.Skip = true;
-                this.StatusKeeper.RemoveFileOverlayStatus(path);
-                return;
-            }
-            var fileState = FileState.FindByFilePath(path);
-            switch (state.Status)
-            {
-                case FileStatus.Created:
-                case FileStatus.Modified:
-                    var info = new FileInfo(path);
-                    NetworkAgent.Post(new CloudAction(CloudActionType.UploadUnconditional, info, ObjectInfo.Empty,fileState));
-                    break;
-                case FileStatus.Deleted:
-                    NetworkAgent.Post(new CloudAction(CloudActionType.DeleteCloud, null, new ObjectInfo {Name=fileName},fileState));                    
-                    break;
-                case FileStatus.Renamed:
-                    NetworkAgent.Post(new CloudAction(CloudActionType.RenameCloud, state.OldFileName,state.OldPath,state.FileName,state.Path));                    
-                    break;
-            }
-
-            return;
-        }
-
        
 
         public void Post(WorkflowState workflowState)
index f004d92..212b72e 100644 (file)
@@ -104,24 +104,24 @@ namespace Pithos.Core
             if (String.IsNullOrWhiteSpace(algorithm))
                 throw new ArgumentNullException("algorithm");
             Contract.EndContractBlock();
-
+            
             //Skip updating the hash for folders
             if (Directory.Exists(FilePath))
-                return Task.Factory.StartNew(() => this);
+                return Task.Factory.FromResult(this); 
 
             var results=Task.Factory.TrackedSequence(
                 () => Task.Factory.StartNew(() => Signature.CalculateMD5(FilePath)),
                 () => Signature.CalculateTreeHashAsync(FilePath, blockSize, algorithm)
-                );
+            );
 
-            results.ContinueWith(t =>
+            var state=results.Then(hashes =>
             {
-                var hashes = t.Result;
                 Checksum = (hashes[0] as Task<string>).Result;
-                TopHash = (hashes[0] as Task<TreeHash>).Result.TopHash.ToHashString();
+                TopHash = (hashes[1] as Task<TreeHash>).Result.TopHash.ToHashString();
+                return Task.Factory.FromResult(this);
             });
             
-            return results.ContinueWith(t => this);
+            return state;
         }
     }
 
index 960fb1d..6882c8a 100644 (file)
   </ItemGroup>
   <ItemGroup>
     <Compile Include="Agents\Agent.cs" />
+    <Compile Include="Agents\BlockUpdater.cs" />
     <Compile Include="Agents\CloudAction.cs" />
     <Compile Include="Agents\FileAgent.cs" />
     <Compile Include="Agents\FileInfoExtensions.cs" />
     <Compile Include="InMemStatusChecker.cs" />
     <Compile Include="StatusInfo.cs" />
     <Compile Include="StatusService.cs" />
+    <Compile Include="TaskExtensions.cs" />
     <Compile Include="WorkflowState.cs" />
   </ItemGroup>
   <ItemGroup>
diff --git a/trunk/Pithos.Core/TaskExtensions.cs b/trunk/Pithos.Core/TaskExtensions.cs
new file mode 100644 (file)
index 0000000..aa1a807
--- /dev/null
@@ -0,0 +1,81 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Pithos.Core
+{
+    //Continuation helpers: chain a task-returning step after a Task<T1> and surface the outcome
+    //of the whole chain (result, fault or cancellation) through a single returned task
+    static class TaskExtensions
+    {
+        //Chains next after first; same as the three-argument overload with CancellationToken.None
+        public static Task<T2> Then<T1, T2>(this Task<T1> first, Func<T1, Task<T2>> next)
+        {
+            return Then(first, next, CancellationToken.None);
+        }
+
+        //Chains next after first; same as the three-argument overload with CancellationToken.None
+        public static Task Then<T1>(this Task<T1> first, Func<T1, Task> next)
+        {
+            return Then(first, next, CancellationToken.None);
+        }
+
+        //Passes the result of first to next and returns a task that completes with the result of
+        //the task next produces. The returned task is faulted when first or that task faults or when
+        //next throws; it is cancelled when first or that task is cancelled, when next returns null,
+        //or when cancellationToken is already signalled by the time first completes.
+        //Throws ArgumentNullException when first or next is null.
+        public static Task<T2> Then<T1, T2>(this Task<T1> first, Func<T1, Task<T2>> next,CancellationToken cancellationToken)
+        {
+            return Chain<T1, Task<T2>, T2>(first, next, t => t.Result, cancellationToken);
+        }
+
+        //Same contract as the generic overload for a next step that yields no value;
+        //the returned task completes with a null result
+        public static Task Then<T1>(this Task<T1> first, Func<T1, Task> next,CancellationToken cancellationToken)
+        {
+            return Chain<T1, Task, object>(first, next, t => null, cancellationToken);
+        }
+
+        //Shared implementation of the Then overloads. getResult extracts the value to publish from
+        //the task returned by next, once that task has run to completion
+        private static Task<TResult> Chain<T1, TNext, TResult>(Task<T1> first, Func<T1, TNext> next,
+            Func<TNext, TResult> getResult, CancellationToken cancellationToken) where TNext : Task
+        {
+            if (first == null) throw new ArgumentNullException("first");
+            if (next == null) throw new ArgumentNullException("next");
+
+            var tcs = new TaskCompletionSource<TResult>();
+            //The token is checked inside the continuation instead of being passed to ContinueWith:
+            //a continuation cancelled through its token never runs, which left tcs.Task pending forever
+            first.ContinueWith(delegate
+            {
+                if (cancellationToken.IsCancellationRequested) tcs.TrySetCanceled();
+                else if (first.IsFaulted) tcs.TrySetException(first.Exception.InnerExceptions);
+                else if (first.IsCanceled) tcs.TrySetCanceled();
+                else
+                {
+                    try
+                    {
+                        var t = next(first.Result);
+                        //A null task from next is reported as cancellation
+                        if (t == null) tcs.TrySetCanceled();
+                        else t.ContinueWith(delegate
+                        {
+                            if (t.IsFaulted) tcs.TrySetException(t.Exception.InnerExceptions);
+                            else if (t.IsCanceled) tcs.TrySetCanceled();
+                            else tcs.TrySetResult(getResult(t));
+                        }, TaskContinuationOptions.ExecuteSynchronously);
+                    }
+                    //An exception thrown synchronously by next faults the returned task
+                    catch (Exception exc) { tcs.TrySetException(exc); }
+                }
+            }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Current);
+            return tcs.Task;
+        }
+
+    }
+}
index 6c5ba94..c7728e1 100644 (file)
@@ -587,51 +587,29 @@ namespace Pithos.Network
 
             Trace.TraceInformation("[BLOCK POST] START");
 
-            //Not much point to this as the server will timeout if we try to get the 
-            //hashmap too quickly.
-            var tcs = new TaskCompletionSource<bool>();
-
-            client.UploadProgressChanged += (sender, args) =>
-            {
+            client.UploadProgressChanged += (sender, args) => 
                 Trace.TraceInformation("[BLOCK POST PROGRESS] {0}% {1} of {2}",
-                                        args.ProgressPercentage, args.BytesSent,
-                                        args.TotalBytesToSend);
-                if (args.BytesSent == args.TotalBytesToSend)
-                    tcs.SetResult(false);                
-            };
+                                    args.ProgressPercentage, args.BytesSent,
+                                    args.TotalBytesToSend);
+            client.UploadFileCompleted += (sender, args) => 
+                Trace.TraceInformation("[BLOCK POST PROGRESS] Completed ");
 
-            client.UploadFileCompleted += (sender, args) =>
+            
+            //Send the block
+            var uploadTask = client.UploadDataTask(uri, "POST", block)
+            .ContinueWith(upload =>
             {
-                if (args.Error != null)
-                    tcs.SetException(args.Error);
-                else
-                {
-                    Trace.TraceInformation("[BLOCK POST PROGRESS] Completed ");
-                    tcs.TrySetResult(true);
-                }
-            };
-
-
+                client.Dispose();
 
-            client.UploadDataTask(uri, "POST", block);
-
-            //Send the block
-            var uploadTask = tcs.Task
-                .ContinueWith(upload =>
+                if (upload.IsFaulted)
                 {
-                    client.Dispose();
-
-                    if (upload.IsFaulted)
-                    {
-                        var exception = upload.Exception.InnerException;
-                        Trace.TraceError("[BLOCK POST] FAIL with \r{0}", exception);                        
-                        throw exception;
-                    }
-                    else
-                    {
-                        Trace.TraceInformation("[BLOCK POST] END");                        
-                    }
-                });
+                    var exception = upload.Exception.InnerException;
+                    Trace.TraceError("[BLOCK POST] FAIL with \r{0}", exception);                        
+                    throw exception;
+                }
+                    
+                Trace.TraceInformation("[BLOCK POST] END");
+            });
             return uploadTask;            
         }
 
index bd77ad6..a0a82e7 100644 (file)
@@ -154,6 +154,9 @@ namespace Pithos.Network
             //DON'T calculate hashes for folders
             if (Directory.Exists(filePath))
                 return Task.Factory.StartNew(()=>new TreeHash(algorithm));
+            //The hash of a non-existent file is the empty hash
+            if (!File.Exists(filePath))
+                return Task.Factory.StartNew(()=>new TreeHash(algorithm));
 
             //Calculate the hash of all blocks using a blockhash iterator
             var treeHash =Iterate<TreeHash>(BlockHashIterator(filePath, blockSize, algorithm));
index a49fa14..2f036b1 100644 (file)
@@ -86,6 +86,8 @@ namespace Pithos.Network
                 {
                     var blocks =
                         new ConcurrentDictionary<string, int>();
+                    if (Hashes == null)
+                        return blocks;
 
                     var blockIndex = 0;
                     foreach (var hash in this.Hashes)