Revision 437abfca trunk/Pithos.Core/Agents/NetworkAgent.cs

b/trunk/Pithos.Core/Agents/NetworkAgent.cs
782 782
        }
783 783

  
784 784

  
785
        private void UploadCloudFile(CloudAction action)
785
        private async void UploadCloudFile(CloudAction action)
786 786
        {
787 787
            if (action == null)
788 788
                throw new ArgumentNullException("action");           
......
790 790

  
791 791
            try
792 792
            {
793
                var upload = Task.Factory.Iterate(UploadIterator(action));
794
                upload.Wait();
795
            }                
796
            catch (AggregateException ex)
797
            {                
798
                var exc = ex.InnerException as WebException;
799
                if (exc==null)
800
                    throw ex.InnerException;
801
                var response = exc.Response as HttpWebResponse;
802
                if (response==null)
803
                    throw exc;
804
                if (response.StatusCode == HttpStatusCode.Unauthorized)
805
                {
806
                    Log.Error("Not allowed to upload file", exc);
807
                    var message = String.Format("Not allowed to uplad file {0}",action.LocalFile.FullName);
808
                    StatusKeeper.SetFileState(action.LocalFile.FullName,FileStatus.Unchanged,FileOverlayStatus.Normal);
809
                    StatusNotification.NotifyChange(message,TraceLevel.Warning);
810
                    return;
811
                }
812
                throw;
813
            }
814
        }
793
                if (action == null)
794
                    throw new ArgumentNullException("action");            
795
                Contract.EndContractBlock();
815 796

  
816
        private IEnumerable<Task> UploadIterator(CloudAction action)
817
        {
818
            if (action == null)
819
                throw new ArgumentNullException("action");            
820
            Contract.EndContractBlock();
821

  
822
            var accountInfo=action.AccountInfo;
797
                var accountInfo=action.AccountInfo;
823 798
            
824
            var fileInfo=action.LocalFile;                        
799
                var fileInfo=action.LocalFile;                        
825 800

  
826
            if (fileInfo.Extension.Equals("ignore", StringComparison.InvariantCultureIgnoreCase))
827
                yield break;
801
                if (fileInfo.Extension.Equals("ignore", StringComparison.InvariantCultureIgnoreCase))
802
                    return;
828 803

  
829
            var relativePath = fileInfo.AsRelativeTo(accountInfo.AccountPath);
830
            if (relativePath.StartsWith(FolderConstants.OthersFolder))
831
            {
832
                var parts=relativePath.Split('\\');
833
                var accountName = parts[1];
834
                var oldName = accountInfo.UserName;
835
                var absoluteUri = accountInfo.StorageUri.AbsoluteUri;
836
                var nameIndex=absoluteUri.IndexOf(oldName);
837
                var root=absoluteUri.Substring(0, nameIndex);
838

  
839
                accountInfo = new AccountInfo
804
                var relativePath = fileInfo.AsRelativeTo(accountInfo.AccountPath);
805
                if (relativePath.StartsWith(FolderConstants.OthersFolder))
840 806
                {
841
                    UserName = accountName,
842
                    AccountPath = Path.Combine(accountInfo.AccountPath, parts[0], parts[1]),
843
                    StorageUri = new Uri(root + accountName),
844
                    BlockHash=accountInfo.BlockHash,
845
                    BlockSize=accountInfo.BlockSize,
846
                    Token=accountInfo.Token
847
                };
848
            }
807
                    var parts=relativePath.Split('\\');
808
                    var accountName = parts[1];
809
                    var oldName = accountInfo.UserName;
810
                    var absoluteUri = accountInfo.StorageUri.AbsoluteUri;
811
                    var nameIndex=absoluteUri.IndexOf(oldName);
812
                    var root=absoluteUri.Substring(0, nameIndex);
813

  
814
                    accountInfo = new AccountInfo
815
                    {
816
                        UserName = accountName,
817
                        AccountPath = Path.Combine(accountInfo.AccountPath, parts[0], parts[1]),
818
                        StorageUri = new Uri(root + accountName),
819
                        BlockHash=accountInfo.BlockHash,
820
                        BlockSize=accountInfo.BlockSize,
821
                        Token=accountInfo.Token
822
                    };
823
                }
849 824

  
850 825

  
851
            var fullFileName = fileInfo.FullName;
852
            using(var gate=NetworkGate.Acquire(fullFileName,NetworkOperation.Uploading))
853
            {
854
                //Abort if the file is already being uploaded or downloaded
855
                if (gate.Failed)
856
                    yield break;
826
                var fullFileName = fileInfo.FullName;
827
                using(var gate=NetworkGate.Acquire(fullFileName,NetworkOperation.Uploading))
828
                {
829
                    //Abort if the file is already being uploaded or downloaded
830
                    if (gate.Failed)
831
                        return;
857 832

  
858
                var cloudFile = action.CloudFile;
859
                var account = cloudFile.Account ?? accountInfo.UserName;
833
                    var cloudFile = action.CloudFile;
834
                    var account = cloudFile.Account ?? accountInfo.UserName;
860 835

  
861
                var client = new CloudFilesClient(accountInfo);
862
                //Even if GetObjectInfo times out, we can proceed with the upload            
863
                var info = client.GetObjectInfo(account, cloudFile.Container, cloudFile.Name);                
864
                var cloudHash = info.Hash.ToLower();
836
                    var client = new CloudFilesClient(accountInfo);
837
                    //Even if GetObjectInfo times out, we can proceed with the upload            
838
                    var info = client.GetObjectInfo(account, cloudFile.Container, cloudFile.Name);                
839
                    var cloudHash = info.Hash.ToLower();
865 840

  
866
                var hash = action.LocalHash.Value;
867
                var topHash = action.TopHash.Value;
841
                    var hash = action.LocalHash.Value;
842
                    var topHash = action.TopHash.Value;
868 843

  
869
                //If the file hashes match, abort the upload
870
                if (hash == cloudHash  || topHash ==cloudHash)
871
                {
872
                    //but store any metadata changes 
873
                    this.StatusKeeper.StoreInfo(fullFileName, info);
874
                    Log.InfoFormat("Skip upload of {0}, hashes match", fullFileName);
875
                    yield break;
876
                }
844
                    //If the file hashes match, abort the upload
845
                    if (hash == cloudHash  || topHash ==cloudHash)
846
                    {
847
                        //but store any metadata changes 
848
                        this.StatusKeeper.StoreInfo(fullFileName, info);
849
                        Log.InfoFormat("Skip upload of {0}, hashes match", fullFileName);
850
                        return;
851
                    }
877 852

  
878
                if (info.AllowedTo=="read")
879
                    yield break;
853
                    if (info.AllowedTo=="read")
854
                        return;
880 855

  
881
                //Mark the file as modified while we upload it
882
                StatusKeeper.SetFileOverlayStatus(fullFileName, FileOverlayStatus.Modified);
883
                //And then upload it
856
                    //Mark the file as modified while we upload it
857
                    StatusKeeper.SetFileOverlayStatus(fullFileName, FileOverlayStatus.Modified);
858
                    //And then upload it
884 859

  
885
                //Upload even small files using the Hashmap. The server may already containt
886
                //the relevant folder
860
                    //Upload even small files using the Hashmap. The server may already containt
861
                    //the relevant folder
887 862

  
888
                //First, calculate the tree hash
889
                var treeHash = Signature.CalculateTreeHashAsync(fileInfo.FullName, accountInfo.BlockSize,
890
                    accountInfo.BlockHash);
891
                yield return treeHash;
863
                    //First, calculate the tree hash
864
                    var treeHash = await Signature.CalculateTreeHashAsync(fileInfo.FullName, accountInfo.BlockSize,
865
                        accountInfo.BlockHash);                
892 866
                    
893
                yield return Task.Factory.Iterate(UploadWithHashMap(accountInfo,cloudFile,fileInfo,cloudFile.Name,treeHash));
867
                    UploadWithHashMap(accountInfo,cloudFile,fileInfo,cloudFile.Name,treeHash);
894 868

  
895
                //If everything succeeds, change the file and overlay status to normal
896
                this.StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal);
869
                    //If everything succeeds, change the file and overlay status to normal
870
                    this.StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal);
871
                }
872
                //Notify the Shell to update the overlays
873
                NativeMethods.RaiseChangeNotification(fullFileName);
874
                StatusNotification.NotifyChangedFile(fullFileName);
897 875
            }
898
            //Notify the Shell to update the overlays
899
            NativeMethods.RaiseChangeNotification(fullFileName);
900
            StatusNotification.NotifyChangedFile(fullFileName);
876
            catch (AggregateException ex)
877
            {
878
                var exc = ex.InnerException as WebException;
879
                if (exc == null)
880
                    throw ex.InnerException;
881
                var response = exc.Response as HttpWebResponse;
882
                if (response == null)
883
                    throw exc;
884
                if (response.StatusCode == HttpStatusCode.Unauthorized)
885
                {
886
                    Log.Error("Not allowed to upload file", exc);
887
                    var message = String.Format("Not allowed to uplad file {0}", action.LocalFile.FullName);
888
                    StatusKeeper.SetFileState(action.LocalFile.FullName, FileStatus.Unchanged, FileOverlayStatus.Normal);
889
                    StatusNotification.NotifyChange(message, TraceLevel.Warning);
890
                    return;
891
                }
892
                throw;
893
            }
894

  
901 895
        }
902 896

  
903
        public IEnumerable<Task> UploadWithHashMap(AccountInfo accountInfo,ObjectInfo cloudFile,FileInfo fileInfo,string url,Task<TreeHash> treeHash)
897
        public async void UploadWithHashMap(AccountInfo accountInfo,ObjectInfo cloudFile,FileInfo fileInfo,string url,TreeHash treeHash)
904 898
        {
905 899
            if (accountInfo == null)
906 900
                throw new ArgumentNullException("accountInfo");
......
923 917

  
924 918
            var client = new CloudFilesClient(accountInfo);
925 919
            //Send the hashmap to the server            
926
            var hashPut = client.PutHashMap(account, container, url, treeHash.Result);
927
            yield return hashPut;
928

  
929
            var missingHashes = hashPut.Result;
920
            var missingHashes =  await client.PutHashMap(account, container, url, treeHash);
930 921
            //If the server returns no missing hashes, we are done
931 922
            while (missingHashes.Count > 0)
932 923
            {
......
935 926
                foreach (var missingHash in missingHashes)
936 927
                {
937 928
                    //Find the proper block
938
                    var blockIndex = treeHash.Result.HashDictionary[missingHash];
929
                    var blockIndex = treeHash.HashDictionary[missingHash];
939 930
                    var offset = blockIndex*accountInfo.BlockSize;
940 931

  
941 932
                    var read = fileInfo.Read(buffer, offset, accountInfo.BlockSize);
942 933

  
943
                    //And upload the block                
944
                    var postBlock = client.PostBlock(account, container, buffer, 0, read);
934
                    try
935
                    {
936
                        //And upload the block                
937
                        await client.PostBlock(account, container, buffer, 0, read);
938
                        Log.InfoFormat("[BLOCK] Block {0} of {1} uploaded", blockIndex, fullFileName);
939
                    }
940
                    catch (Exception exc)
941
                    {
942
                        Log.ErrorFormat("[ERROR] uploading block {0} of {1}\n{2}", blockIndex, fullFileName, exc);
943
                    }
945 944

  
946
                    //We have to handle possible exceptions in a continuation because
947
                    //*yield return* can't appear inside a try block
948
                    yield return postBlock.ContinueWith(t => 
949
                        t.ReportExceptions(
950
                            exc => Log.ErrorFormat("[ERROR] uploading block {0} of {1}\n{2}", blockIndex, fullFileName, exc),
951
                            ()=>Log.InfoFormat("[BLOCK] Block {0} of {1} uploaded", blockIndex,fullFileName)));
952 945
                }
953 946

  
954
                //Repeat until there are no more missing hashes
955
                hashPut = client.PutHashMap(account, container, url, treeHash.Result);
956
                yield return hashPut;
957
                missingHashes = hashPut.Result;
947
                //Repeat until there are no more missing hashes                
948
                missingHashes = await client.PutHashMap(account, container, url, treeHash);
958 949
            }
959 950
        }
960 951

  

Also available in: Unified diff