X-Git-Url: https://code.grnet.gr/git/pithos-ms-client/blobdiff_plain/cbefd298922d7c0b603d65c1dfbd1fda788250f3..f037bf1f2ddef7e40f8dbd115465bfb5ebd238e4:/trunk/Pithos.Core/Agents/Uploader.cs diff --git a/trunk/Pithos.Core/Agents/Uploader.cs b/trunk/Pithos.Core/Agents/Uploader.cs index 094d2f9..222fbb8 100644 --- a/trunk/Pithos.Core/Agents/Uploader.cs +++ b/trunk/Pithos.Core/Agents/Uploader.cs @@ -1,10 +1,8 @@ using System; -using System.Collections.Generic; using System.ComponentModel.Composition; using System.Diagnostics; using System.Diagnostics.Contracts; using System.IO; -using System.Linq; using System.Net; using System.Reflection; using System.Threading; @@ -23,7 +21,9 @@ namespace Pithos.Core.Agents [Import] private IStatusKeeper StatusKeeper { get; set; } - + [Import] + private IPithosSettings Settings { get; set; } + public IStatusNotification StatusNotification { get; set; } @@ -33,7 +33,7 @@ namespace Pithos.Core.Agents _cts.Cancel(); }*/ - public async Task UploadCloudFile(CloudAction action,CancellationToken cancellationToken) + public async Task UploadCloudFile(CloudUploadAction action,CancellationToken cancellationToken) { if (action == null) throw new ArgumentNullException("action"); @@ -43,40 +43,60 @@ namespace Pithos.Core.Agents { try { - await UnpauseEvent.WaitAsync(); + await UnpauseEvent.WaitAsync().ConfigureAwait(false); var fileInfo = action.LocalFile; if (fileInfo.Extension.Equals("ignore", StringComparison.InvariantCultureIgnoreCase)) return; - if (!Selectives.IsSelected(action.AccountInfo, fileInfo)) + if (!Selectives.IsSelected(action.AccountInfo, fileInfo) && !action.IsCreation) return; + //Try to load the action's local state, if it is empty if (action.FileState == null) action.FileState = StatusKeeper.GetStateByFilePath(fileInfo.FullName); - if (action.FileState == null) + + TreeHash localTreeHash; + using (StatusNotification.GetNotifier("Merkle Hashing for Upload {0}", "Merkle Hashed for Upload {0}", fileInfo.Name)) { - Log.WarnFormat("File [{0}] has no local state. It was probably created by a download action", fileInfo.FullName); - return; - } + //TODO: Load the stored treehash if appropriate + //TODO: WHO updates LastMD5? - var latestState = action.FileState; + var progress = new Progress(d => StatusNotification.Notify( + new StatusNotification(String.Format("Merkle Hashing for Upload {0:p} of {1}", d, fileInfo.Name)))); - //Do not upload files in conflict - if (latestState.FileStatus == FileStatus.Conflict) - { - Log.InfoFormat("Skipping file in conflict [{0}]", fileInfo.FullName); - return; + //If the action's Treehash is already calculated, use it instead of reprocessing + localTreeHash = action.TreeHash.IsValueCreated + ? action.TreeHash.Value + : StatusAgent.CalculateTreeHash(fileInfo, action.AccountInfo, action.FileState, Settings.HashingParallelism, cancellationToken, progress); } - //Do not upload files when we have no permission - if (latestState.FileStatus == FileStatus.Forbidden) + + + if (action.FileState != null) { - Log.InfoFormat("Skipping forbidden file [{0}]", fileInfo.FullName); - return; - } + /* + Log.WarnFormat("File [{0}] has no local state. It was probably created by a download action", fileInfo.FullName); + return; + */ + + var latestState = action.FileState; + + //Do not upload files in conflict + if (latestState.FileStatus == FileStatus.Conflict) + { + Log.InfoFormat("Skipping file in conflict [{0}]", fileInfo.FullName); + return; + } + //Do not upload files when we have no permission + if (latestState.FileStatus == FileStatus.Forbidden) + { + Log.InfoFormat("Skipping forbidden file [{0}]", fileInfo.FullName); + return; + } + } //Are we targeting our own account or a sharer account? var relativePath = fileInfo.AsRelativeTo(action.AccountInfo.AccountPath); var accountInfo = relativePath.StartsWith(FolderConstants.OthersFolder) @@ -105,68 +125,116 @@ namespace Pithos.Core.Agents //If this a shared file if (!cloudFile.Account.Equals(action.AccountInfo.UserName,StringComparison.InvariantCultureIgnoreCase)) { - //If this is a read-only file, do not upload changes + +/* if (!cloudInfo.IsWritable(action.AccountInfo.UserName)) { MakeFileReadOnly(fullFileName); + StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal, ""); return; } +*/ - //If the file is new, can we upload it? - if ( !cloudInfo.Exists && !client.CanUpload(account, cloudFile)) + //If this is a read-only file, do not upload changes + if ( !cloudInfo.IsWritable(action.AccountInfo.UserName) || + //If the file is new, but we can't upload it + (!cloudInfo.Exists && !client.CanUpload(account, cloudFile)) ) { MakeFileReadOnly(fullFileName); + StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal, ""); return; } } - await UnpauseEvent.WaitAsync(); + await UnpauseEvent.WaitAsync().ConfigureAwait(false); - if (fileInfo is DirectoryInfo) + fileInfo.Refresh(); + //Does the file still exist or was it deleted/renamed? + if (fileInfo.Exists) { - //If the directory doesn't exist the Hash property will be empty - if (String.IsNullOrWhiteSpace(cloudInfo.Hash)) - //Go on and create the directory - await client.PutObject(account, cloudFile.Container, cloudFile.Name, fullFileName, - String.Empty, "application/directory"); - } - else - { - - var cloudHash = cloudInfo.Hash.ToLower(); - - string topHash; - TreeHash treeHash; - using(StatusNotification.GetNotifier("Hashing {0} for Upload", "Finished hashing {0}",fileInfo.Name)) + if (fileInfo is DirectoryInfo) { - treeHash = action.TreeHash.Value; - topHash = treeHash.TopHash.ToHashString(); + //If the directory doesn't exist the Hash property will be empty + if (String.IsNullOrWhiteSpace(cloudInfo.X_Object_Hash)) + //Go on and create the directory + await + client.PutObject(account, cloudFile.Container, cloudFile.Name, fullFileName, + Signature.MERKLE_EMPTY, ObjectInfo.CONTENT_TYPE_DIRECTORY); + //If the upload is in response to a Folder create with Selective Sync enabled + if (action.IsCreation) + { + //Add the folder to the Selected URls + var selectiveUri = client.RootAddressUri.Combine(cloudFile.Uri); + Selectives.AddUri(accountInfo, selectiveUri); + Selectives.Save(accountInfo); + } } - - //If the file hashes match, abort the upload - if (cloudInfo != ObjectInfo.Empty && topHash == cloudHash) + else { - //but store any metadata changes - StatusKeeper.StoreInfo(fullFileName, cloudInfo); - Log.InfoFormat("Skip upload of {0}, hashes match", fullFileName); - return; - } + var cloudHash = cloudInfo.X_Object_Hash.ToLower(); - //Mark the file as modified while we upload it - StatusKeeper.SetFileOverlayStatus(fullFileName, FileOverlayStatus.Modified); - //And then upload it + string topHash; + TreeHash treeHash; + using ( + StatusNotification.GetNotifier("Hashing {0} for Upload", "Finished hashing {0}", + fileInfo.Name)) + { + treeHash = localTreeHash ?? action.TreeHash.Value; + topHash = treeHash.TopHash.ToHashString(); + } - //Upload even small files using the Hashmap. The server may already contain - //the relevant block - - await UploadWithHashMap(accountInfo, cloudFile, fileInfo as FileInfo, cloudFile.Name, treeHash,cancellationToken); + //If the file hashes match, abort the upload + if (cloudInfo != ObjectInfo.Empty && (topHash == cloudHash)) + { + //but store any metadata changes + StatusKeeper.StoreInfo(fullFileName, cloudInfo,treeHash); + Log.InfoFormat("Skip upload of {0}, hashes match", fullFileName); + return; + } + + + //Mark the file as modified while we upload it + StatusKeeper.SetFileOverlayStatus(fullFileName, FileOverlayStatus.Modified); + //And then upload it + + //Upload even small files using the Hashmap. The server may already contain + //the relevant block + + + + await + UploadWithHashMap(accountInfo, cloudFile, fileInfo as FileInfo, cloudFile.Name, + treeHash, cancellationToken).ConfigureAwait(false); + } + + var currentInfo = client.GetObjectInfo(cloudFile.Account, cloudFile.Container, + cloudFile.Name); + + StatusKeeper.StoreInfo(fullFileName, currentInfo, localTreeHash); + //Ensure the status is cleared + StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, + FileOverlayStatus.Normal, ""); +/* + //If there is no stored ObjectID in the file state, add it + //TODO: Why not just update everything, then change the state? + if (action.FileState == null || action.FileState.ObjectID == null) + { + + } + else + //If everything succeeds, change the file and overlay status to normal + StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, + FileOverlayStatus.Normal, ""); +*/ + } + else + { + StatusKeeper.ClearFileStatus(fullFileName); } - //If everything succeeds, change the file and overlay status to normal - StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal, ""); } catch (WebException exc) { @@ -210,13 +278,14 @@ namespace Pithos.Core.Agents } } + private static void MakeFileReadOnly(string fullFileName) { var attributes = File.GetAttributes(fullFileName); //Do not make any modifications if not necessary if (attributes.HasFlag(FileAttributes.ReadOnly)) return; - File.SetAttributes(fullFileName, attributes | FileAttributes.ReadOnly); + File.SetAttributes(fullFileName, attributes | FileAttributes.ReadOnly); } private static AccountInfo GetSharerAccount(string relativePath, AccountInfo accountInfo) @@ -241,7 +310,7 @@ namespace Pithos.Core.Agents } - public async Task UploadWithHashMap(AccountInfo accountInfo, ObjectInfo cloudFile, FileInfo fileInfo, string url, TreeHash treeHash, CancellationToken token) + public async Task UploadWithHashMap(AccountInfo accountInfo, ObjectInfo cloudFile, FileInfo fileInfo, Uri uri, TreeHash treeHash, CancellationToken token) { if (accountInfo == null) throw new ArgumentNullException("accountInfo"); @@ -249,89 +318,104 @@ namespace Pithos.Core.Agents throw new ArgumentNullException("cloudFile"); if (fileInfo == null) throw new ArgumentNullException("fileInfo"); - if (String.IsNullOrWhiteSpace(url)) - throw new ArgumentNullException(url); + if (uri==null) + throw new ArgumentNullException("uri"); if (treeHash == null) throw new ArgumentNullException("treeHash"); - if (String.IsNullOrWhiteSpace(cloudFile.Container)) + if (cloudFile.Container==null) throw new ArgumentException("Invalid container", "cloudFile"); + if (cloudFile.Container.IsAbsoluteUri) + throw new ArgumentException("Container URI must be relative", "cloudFile"); Contract.EndContractBlock(); - - using (StatusNotification.GetNotifier("Uploading {0}", "Finished Uploading {0}", fileInfo.Name)) - { - if (await WaitOrAbort(cloudFile, token)) - return; - var fullFileName = fileInfo.GetProperCapitalization(); + if (await WaitOrAbort(accountInfo, cloudFile, token).ConfigureAwait(false)) + return; - var account = cloudFile.Account ?? accountInfo.UserName; - var container = cloudFile.Container; + var fullFileName = fileInfo.GetProperCapitalization(); + var account = cloudFile.Account ?? accountInfo.UserName; + var container = cloudFile.Container; - var client = new CloudFilesClient(accountInfo); - //Send the hashmap to the server - var missingHashes = await client.PutHashMap(account, container, url, treeHash); - int block = 0; - ReportUploadProgress(fileInfo.Name, block++, missingHashes.Count, fileInfo.Length); - //If the server returns no missing hashes, we are done - while (missingHashes.Count > 0) - { + int block = 0; - if (await WaitOrAbort(cloudFile, token)) - return; + var client = new CloudFilesClient(accountInfo); + //Send the hashmap to the server + var missingHashes = await client.PutHashMap(account, container, uri, treeHash).ConfigureAwait(false); + ReportUploadProgress(fileInfo.Name, block, 0, missingHashes.Count, fileInfo.Length); + //If the server returns no missing hashes, we are done + client.UploadProgressChanged += (sender, args) => + ReportUploadProgress(fileInfo.Name, block, args.ProgressPercentage, + missingHashes.Count, fileInfo.Length); - var buffer = new byte[accountInfo.BlockSize]; - foreach (var missingHash in missingHashes) - { - if (await WaitOrAbort(cloudFile, token)) - return; + while (missingHashes.Count > 0) + { + block = 0; + + if (await WaitOrAbort(accountInfo, cloudFile, token).ConfigureAwait(false)) + return; - //Find the proper block - var blockIndex = treeHash.HashDictionary[missingHash]; - long offset = blockIndex*accountInfo.BlockSize; - var read = fileInfo.Read(buffer, offset, accountInfo.BlockSize); + var buffer = new byte[accountInfo.BlockSize]; + foreach (var missingHash in missingHashes) + { + if (await WaitOrAbort(accountInfo, cloudFile, token).ConfigureAwait(false)) + return; - try - { - //And upload the block - await client.PostBlock(account, container, buffer, 0, read); - Log.InfoFormat("[BLOCK] Block {0} of {1} uploaded", blockIndex, fullFileName); - } - catch (Exception exc) - { - Log.Error(String.Format("Uploading block {0} of {1}", blockIndex, fullFileName), exc); - } - ReportUploadProgress(fileInfo.Name, block++, missingHashes.Count, fileInfo.Length); - } - token.ThrowIfCancellationRequested(); - //Repeat until there are no more missing hashes - missingHashes = await client.PutHashMap(account, container, url, treeHash); + //Find the proper block + long blockIndex = treeHash.HashDictionary[missingHash]; + long offset = blockIndex*accountInfo.BlockSize; + Debug.Assert(offset >= 0, + String.Format("Negative Offset! BlockIndex {0} BlockSize {1}", blockIndex, + accountInfo.BlockSize)); + + var read = fileInfo.Read(buffer, offset, accountInfo.BlockSize); + + try + { + //And upload the block + await client.PostBlock(account, container, buffer, 0, read,missingHash, token).ConfigureAwait(false); + token.ThrowIfCancellationRequested(); + Log.InfoFormat("[BLOCK] Block {0} of {1} uploaded", blockIndex, fullFileName); + } + catch (TaskCanceledException) + { + throw new OperationCanceledException(token); + } + catch (Exception exc) + { + Log.Error(String.Format("Uploading block {0} of {1}", blockIndex, fullFileName), exc); + } + ReportUploadProgress(fileInfo.Name, block++, 100, missingHashes.Count, fileInfo.Length); } - ReportUploadProgress(fileInfo.Name, missingHashes.Count, missingHashes.Count, fileInfo.Length); + token.ThrowIfCancellationRequested(); + //Repeat until there are no more missing hashes + missingHashes = await client.PutHashMap(account, container, uri, treeHash).ConfigureAwait(false); } + + ReportUploadProgress(fileInfo.Name, missingHashes.Count, 0, missingHashes.Count, fileInfo.Length); + } - private async Task WaitOrAbort(ObjectInfo cloudFile, CancellationToken token) + private async Task WaitOrAbort(AccountInfo account,ObjectInfo cloudFile, CancellationToken token) { token.ThrowIfCancellationRequested(); - await UnpauseEvent.WaitAsync(); - var shouldAbort = !Selectives.IsSelected(cloudFile); + await UnpauseEvent.WaitAsync().ConfigureAwait(false); + var shouldAbort = !Selectives.IsSelected(account,cloudFile); if (shouldAbort) Log.InfoFormat("Aborting [{0}]",cloudFile.Uri); return shouldAbort; } - private void ReportUploadProgress(string fileName, int block, int totalBlocks, long fileSize) + private void ReportUploadProgress(string fileName, int block, int blockPercentage, int totalBlocks, long fileSize) { StatusNotification.Notify(totalBlocks == 0 - ? new ProgressNotification(fileName, "Uploading", 1, 1, fileSize) - : new ProgressNotification(fileName, "Uploading", block, totalBlocks, fileSize)); + ? new ProgressNotification(fileName, "Uploading", 1,blockPercentage, 1, fileSize) + : new ProgressNotification(fileName, "Uploading", block, blockPercentage, totalBlocks, fileSize)); }