Revision 437abfca trunk/Pithos.Core/Agents/NetworkAgent.cs
b/trunk/Pithos.Core/Agents/NetworkAgent.cs | ||
---|---|---|
782 | 782 |
} |
783 | 783 |
|
784 | 784 |
|
785 |
private void UploadCloudFile(CloudAction action) |
|
785 |
private async void UploadCloudFile(CloudAction action)
|
|
786 | 786 |
{ |
787 | 787 |
if (action == null) |
788 | 788 |
throw new ArgumentNullException("action"); |
... | ... | |
790 | 790 |
|
791 | 791 |
try |
792 | 792 |
{ |
793 |
var upload = Task.Factory.Iterate(UploadIterator(action)); |
|
794 |
upload.Wait(); |
|
795 |
} |
|
796 |
catch (AggregateException ex) |
|
797 |
{ |
|
798 |
var exc = ex.InnerException as WebException; |
|
799 |
if (exc==null) |
|
800 |
throw ex.InnerException; |
|
801 |
var response = exc.Response as HttpWebResponse; |
|
802 |
if (response==null) |
|
803 |
throw exc; |
|
804 |
if (response.StatusCode == HttpStatusCode.Unauthorized) |
|
805 |
{ |
|
806 |
Log.Error("Not allowed to upload file", exc); |
|
807 |
var message = String.Format("Not allowed to uplad file {0}",action.LocalFile.FullName); |
|
808 |
StatusKeeper.SetFileState(action.LocalFile.FullName,FileStatus.Unchanged,FileOverlayStatus.Normal); |
|
809 |
StatusNotification.NotifyChange(message,TraceLevel.Warning); |
|
810 |
return; |
|
811 |
} |
|
812 |
throw; |
|
813 |
} |
|
814 |
} |
|
793 |
if (action == null) |
|
794 |
throw new ArgumentNullException("action"); |
|
795 |
Contract.EndContractBlock(); |
|
815 | 796 |
|
816 |
private IEnumerable<Task> UploadIterator(CloudAction action) |
|
817 |
{ |
|
818 |
if (action == null) |
|
819 |
throw new ArgumentNullException("action"); |
|
820 |
Contract.EndContractBlock(); |
|
821 |
|
|
822 |
var accountInfo=action.AccountInfo; |
|
797 |
var accountInfo=action.AccountInfo; |
|
823 | 798 |
|
824 |
var fileInfo=action.LocalFile; |
|
799 |
var fileInfo=action.LocalFile;
|
|
825 | 800 |
|
826 |
if (fileInfo.Extension.Equals("ignore", StringComparison.InvariantCultureIgnoreCase)) |
|
827 |
yield break;
|
|
801 |
if (fileInfo.Extension.Equals("ignore", StringComparison.InvariantCultureIgnoreCase))
|
|
802 |
return;
|
|
828 | 803 |
|
829 |
var relativePath = fileInfo.AsRelativeTo(accountInfo.AccountPath); |
|
830 |
if (relativePath.StartsWith(FolderConstants.OthersFolder)) |
|
831 |
{ |
|
832 |
var parts=relativePath.Split('\\'); |
|
833 |
var accountName = parts[1]; |
|
834 |
var oldName = accountInfo.UserName; |
|
835 |
var absoluteUri = accountInfo.StorageUri.AbsoluteUri; |
|
836 |
var nameIndex=absoluteUri.IndexOf(oldName); |
|
837 |
var root=absoluteUri.Substring(0, nameIndex); |
|
838 |
|
|
839 |
accountInfo = new AccountInfo |
|
804 |
var relativePath = fileInfo.AsRelativeTo(accountInfo.AccountPath); |
|
805 |
if (relativePath.StartsWith(FolderConstants.OthersFolder)) |
|
840 | 806 |
{ |
841 |
UserName = accountName, |
|
842 |
AccountPath = Path.Combine(accountInfo.AccountPath, parts[0], parts[1]), |
|
843 |
StorageUri = new Uri(root + accountName), |
|
844 |
BlockHash=accountInfo.BlockHash, |
|
845 |
BlockSize=accountInfo.BlockSize, |
|
846 |
Token=accountInfo.Token |
|
847 |
}; |
|
848 |
} |
|
807 |
var parts=relativePath.Split('\\'); |
|
808 |
var accountName = parts[1]; |
|
809 |
var oldName = accountInfo.UserName; |
|
810 |
var absoluteUri = accountInfo.StorageUri.AbsoluteUri; |
|
811 |
var nameIndex=absoluteUri.IndexOf(oldName); |
|
812 |
var root=absoluteUri.Substring(0, nameIndex); |
|
813 |
|
|
814 |
accountInfo = new AccountInfo |
|
815 |
{ |
|
816 |
UserName = accountName, |
|
817 |
AccountPath = Path.Combine(accountInfo.AccountPath, parts[0], parts[1]), |
|
818 |
StorageUri = new Uri(root + accountName), |
|
819 |
BlockHash=accountInfo.BlockHash, |
|
820 |
BlockSize=accountInfo.BlockSize, |
|
821 |
Token=accountInfo.Token |
|
822 |
}; |
|
823 |
} |
|
849 | 824 |
|
850 | 825 |
|
851 |
var fullFileName = fileInfo.FullName; |
|
852 |
using(var gate=NetworkGate.Acquire(fullFileName,NetworkOperation.Uploading)) |
|
853 |
{ |
|
854 |
//Abort if the file is already being uploaded or downloaded |
|
855 |
if (gate.Failed) |
|
856 |
yield break;
|
|
826 |
var fullFileName = fileInfo.FullName;
|
|
827 |
using(var gate=NetworkGate.Acquire(fullFileName,NetworkOperation.Uploading))
|
|
828 |
{
|
|
829 |
//Abort if the file is already being uploaded or downloaded
|
|
830 |
if (gate.Failed)
|
|
831 |
return;
|
|
857 | 832 |
|
858 |
var cloudFile = action.CloudFile; |
|
859 |
var account = cloudFile.Account ?? accountInfo.UserName; |
|
833 |
var cloudFile = action.CloudFile;
|
|
834 |
var account = cloudFile.Account ?? accountInfo.UserName;
|
|
860 | 835 |
|
861 |
var client = new CloudFilesClient(accountInfo); |
|
862 |
//Even if GetObjectInfo times out, we can proceed with the upload |
|
863 |
var info = client.GetObjectInfo(account, cloudFile.Container, cloudFile.Name); |
|
864 |
var cloudHash = info.Hash.ToLower(); |
|
836 |
var client = new CloudFilesClient(accountInfo);
|
|
837 |
//Even if GetObjectInfo times out, we can proceed with the upload
|
|
838 |
var info = client.GetObjectInfo(account, cloudFile.Container, cloudFile.Name);
|
|
839 |
var cloudHash = info.Hash.ToLower();
|
|
865 | 840 |
|
866 |
var hash = action.LocalHash.Value; |
|
867 |
var topHash = action.TopHash.Value; |
|
841 |
var hash = action.LocalHash.Value;
|
|
842 |
var topHash = action.TopHash.Value;
|
|
868 | 843 |
|
869 |
//If the file hashes match, abort the upload |
|
870 |
if (hash == cloudHash || topHash ==cloudHash) |
|
871 |
{ |
|
872 |
//but store any metadata changes |
|
873 |
this.StatusKeeper.StoreInfo(fullFileName, info); |
|
874 |
Log.InfoFormat("Skip upload of {0}, hashes match", fullFileName); |
|
875 |
yield break;
|
|
876 |
} |
|
844 |
//If the file hashes match, abort the upload
|
|
845 |
if (hash == cloudHash || topHash ==cloudHash)
|
|
846 |
{
|
|
847 |
//but store any metadata changes
|
|
848 |
this.StatusKeeper.StoreInfo(fullFileName, info);
|
|
849 |
Log.InfoFormat("Skip upload of {0}, hashes match", fullFileName);
|
|
850 |
return;
|
|
851 |
}
|
|
877 | 852 |
|
878 |
if (info.AllowedTo=="read") |
|
879 |
yield break;
|
|
853 |
if (info.AllowedTo=="read")
|
|
854 |
return;
|
|
880 | 855 |
|
881 |
//Mark the file as modified while we upload it |
|
882 |
StatusKeeper.SetFileOverlayStatus(fullFileName, FileOverlayStatus.Modified); |
|
883 |
//And then upload it |
|
856 |
//Mark the file as modified while we upload it
|
|
857 |
StatusKeeper.SetFileOverlayStatus(fullFileName, FileOverlayStatus.Modified);
|
|
858 |
//And then upload it
|
|
884 | 859 |
|
885 |
//Upload even small files using the Hashmap. The server may already containt |
|
886 |
//the relevant folder |
|
860 |
//Upload even small files using the Hashmap. The server may already containt
|
|
861 |
//the relevant folder
|
|
887 | 862 |
|
888 |
//First, calculate the tree hash |
|
889 |
var treeHash = Signature.CalculateTreeHashAsync(fileInfo.FullName, accountInfo.BlockSize, |
|
890 |
accountInfo.BlockHash); |
|
891 |
yield return treeHash; |
|
863 |
//First, calculate the tree hash |
|
864 |
var treeHash = await Signature.CalculateTreeHashAsync(fileInfo.FullName, accountInfo.BlockSize, |
|
865 |
accountInfo.BlockHash); |
|
892 | 866 |
|
893 |
yield return Task.Factory.Iterate(UploadWithHashMap(accountInfo,cloudFile,fileInfo,cloudFile.Name,treeHash));
|
|
867 |
UploadWithHashMap(accountInfo,cloudFile,fileInfo,cloudFile.Name,treeHash);
|
|
894 | 868 |
|
895 |
//If everything succeeds, change the file and overlay status to normal |
|
896 |
this.StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal); |
|
869 |
//If everything succeeds, change the file and overlay status to normal |
|
870 |
this.StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal); |
|
871 |
} |
|
872 |
//Notify the Shell to update the overlays |
|
873 |
NativeMethods.RaiseChangeNotification(fullFileName); |
|
874 |
StatusNotification.NotifyChangedFile(fullFileName); |
|
897 | 875 |
} |
898 |
//Notify the Shell to update the overlays |
|
899 |
NativeMethods.RaiseChangeNotification(fullFileName); |
|
900 |
StatusNotification.NotifyChangedFile(fullFileName); |
|
876 |
catch (AggregateException ex) |
|
877 |
{ |
|
878 |
var exc = ex.InnerException as WebException; |
|
879 |
if (exc == null) |
|
880 |
throw ex.InnerException; |
|
881 |
var response = exc.Response as HttpWebResponse; |
|
882 |
if (response == null) |
|
883 |
throw exc; |
|
884 |
if (response.StatusCode == HttpStatusCode.Unauthorized) |
|
885 |
{ |
|
886 |
Log.Error("Not allowed to upload file", exc); |
|
887 |
var message = String.Format("Not allowed to uplad file {0}", action.LocalFile.FullName); |
|
888 |
StatusKeeper.SetFileState(action.LocalFile.FullName, FileStatus.Unchanged, FileOverlayStatus.Normal); |
|
889 |
StatusNotification.NotifyChange(message, TraceLevel.Warning); |
|
890 |
return; |
|
891 |
} |
|
892 |
throw; |
|
893 |
} |
|
894 |
|
|
901 | 895 |
} |
902 | 896 |
|
903 |
public IEnumerable<Task> UploadWithHashMap(AccountInfo accountInfo,ObjectInfo cloudFile,FileInfo fileInfo,string url,Task<TreeHash> treeHash)
|
|
897 |
public async void UploadWithHashMap(AccountInfo accountInfo,ObjectInfo cloudFile,FileInfo fileInfo,string url,TreeHash treeHash)
|
|
904 | 898 |
{ |
905 | 899 |
if (accountInfo == null) |
906 | 900 |
throw new ArgumentNullException("accountInfo"); |
... | ... | |
923 | 917 |
|
924 | 918 |
var client = new CloudFilesClient(accountInfo); |
925 | 919 |
//Send the hashmap to the server |
926 |
var hashPut = client.PutHashMap(account, container, url, treeHash.Result); |
|
927 |
yield return hashPut; |
|
928 |
|
|
929 |
var missingHashes = hashPut.Result; |
|
920 |
var missingHashes = await client.PutHashMap(account, container, url, treeHash); |
|
930 | 921 |
//If the server returns no missing hashes, we are done |
931 | 922 |
while (missingHashes.Count > 0) |
932 | 923 |
{ |
... | ... | |
935 | 926 |
foreach (var missingHash in missingHashes) |
936 | 927 |
{ |
937 | 928 |
//Find the proper block |
938 |
var blockIndex = treeHash.Result.HashDictionary[missingHash];
|
|
929 |
var blockIndex = treeHash.HashDictionary[missingHash]; |
|
939 | 930 |
var offset = blockIndex*accountInfo.BlockSize; |
940 | 931 |
|
941 | 932 |
var read = fileInfo.Read(buffer, offset, accountInfo.BlockSize); |
942 | 933 |
|
943 |
//And upload the block |
|
944 |
var postBlock = client.PostBlock(account, container, buffer, 0, read); |
|
934 |
try |
|
935 |
{ |
|
936 |
//And upload the block |
|
937 |
await client.PostBlock(account, container, buffer, 0, read); |
|
938 |
Log.InfoFormat("[BLOCK] Block {0} of {1} uploaded", blockIndex, fullFileName); |
|
939 |
} |
|
940 |
catch (Exception exc) |
|
941 |
{ |
|
942 |
Log.ErrorFormat("[ERROR] uploading block {0} of {1}\n{2}", blockIndex, fullFileName, exc); |
|
943 |
} |
|
945 | 944 |
|
946 |
//We have to handle possible exceptions in a continuation because |
|
947 |
//*yield return* can't appear inside a try block |
|
948 |
yield return postBlock.ContinueWith(t => |
|
949 |
t.ReportExceptions( |
|
950 |
exc => Log.ErrorFormat("[ERROR] uploading block {0} of {1}\n{2}", blockIndex, fullFileName, exc), |
|
951 |
()=>Log.InfoFormat("[BLOCK] Block {0} of {1} uploaded", blockIndex,fullFileName))); |
|
952 | 945 |
} |
953 | 946 |
|
954 |
//Repeat until there are no more missing hashes |
|
955 |
hashPut = client.PutHashMap(account, container, url, treeHash.Result); |
|
956 |
yield return hashPut; |
|
957 |
missingHashes = hashPut.Result; |
|
947 |
//Repeat until there are no more missing hashes |
|
948 |
missingHashes = await client.PutHashMap(account, container, url, treeHash); |
|
958 | 949 |
} |
959 | 950 |
} |
960 | 951 |
|
Also available in: Unified diff