2 using System.ComponentModel.Composition;
\r
3 using System.Diagnostics;
\r
4 using System.Diagnostics.Contracts;
\r
7 using System.Reflection;
\r
8 using System.Threading;
\r
9 using System.Threading.Tasks;
\r
10 using Pithos.Interfaces;
\r
11 using Pithos.Network;
\r
14 namespace Pithos.Core.Agents
\r
16 [Export(typeof(Uploader))]
\r
17 public class Uploader
\r
19 private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
\r
22 private IStatusKeeper StatusKeeper { get; set; }
\r
25 private IPithosSettings Settings { get; set; }
\r
27 public IStatusNotification StatusNotification { get; set; }
\r
30 //CancellationTokenSource _cts = new CancellationTokenSource();
\r
31 /*public void SignalStop()
\r
36 public async Task UploadCloudFile(CloudUploadAction action,CancellationToken cancellationToken)
\r
39 throw new ArgumentNullException("action");
\r
40 Contract.EndContractBlock();
\r
42 using (ThreadContext.Stacks["Operation"].Push("UploadCloudFile"))
\r
46 await UnpauseEvent.WaitAsync().ConfigureAwait(false);
\r
48 var fileInfo = action.LocalFile;
\r
50 if (fileInfo.Extension.Equals("ignore", StringComparison.InvariantCultureIgnoreCase))
\r
53 if (!Selectives.IsSelected(action.AccountInfo, fileInfo) && !action.IsCreation)
\r
57 //Try to load the action's local state, if it is empty
\r
58 if (action.FileState == null)
\r
59 action.FileState = StatusKeeper.GetStateByFilePath(fileInfo.FullName);
\r
61 TreeHash localTreeHash;
\r
62 var isLarge = ((fileInfo as FileInfo).NullSafe(f => f.Length) >= action.AccountInfo.BlockSize);
\r
63 using (StatusNotification.GetNotifier("Merkle Hashing for Upload {0}", "Merkle Hashed for Upload {0}",isLarge, fileInfo.Name))
\r
65 //TODO: Load the stored treehash if appropriate
\r
66 //TODO: WHO updates LastMD5?
\r
68 var progress = new Progress<HashProgress>(d => StatusNotification.Notify(
\r
69 new StatusNotification(String.Format("Merkle Hashing for Upload {0:p} of {1}", d.Percentage, fileInfo.Name))));
\r
71 //If the action's Treehash is already calculated, use it instead of reprocessing
\r
72 localTreeHash = action.TreeHash.IsValueCreated
\r
73 ? action.TreeHash.Value
\r
74 : StatusAgent.CalculateTreeHash(fileInfo, action.AccountInfo, action.FileState, Settings.HashingParallelism, cancellationToken, progress);
\r
78 if (action.FileState != null)
\r
81 Log.WarnFormat("File [{0}] has no local state. It was probably created by a download action", fileInfo.FullName);
\r
86 var latestState = action.FileState;
\r
88 //Do not upload files in conflict
\r
89 if (latestState.FileStatus == FileStatus.Conflict)
\r
91 Log.InfoFormat("Skipping file in conflict [{0}]", fileInfo.FullName);
\r
94 //Do not upload files when we have no permission
\r
95 if (latestState.FileStatus == FileStatus.Forbidden)
\r
97 Log.InfoFormat("Skipping forbidden file [{0}]", fileInfo.FullName);
\r
101 //Are we targeting our own account or a sharer account?
\r
102 var relativePath = fileInfo.AsRelativeTo(action.AccountInfo.AccountPath);
\r
103 var accountInfo = relativePath.StartsWith(FolderConstants.OthersFolder)
\r
104 ? GetSharerAccount(relativePath, action.AccountInfo)
\r
105 : action.AccountInfo;
\r
109 var fullFileName = fileInfo.GetProperCapitalization();
\r
110 using (var gate = NetworkGate.Acquire(fullFileName, NetworkOperation.Uploading))
\r
112 //Abort if the file is already being uploaded or downloaded
\r
116 var cloudFile = action.CloudFile;
\r
117 var account = cloudFile.Account ?? accountInfo.UserName;
\r
121 var client = new CloudFilesClient(accountInfo);
\r
123 //Even if GetObjectInfo times out, we can proceed with the upload
\r
124 var cloudInfo = await client.GetObjectInfo(account, cloudFile.Container, cloudFile.Name).ConfigureAwait(false);
\r
126 //If this a shared file
\r
127 if (!cloudFile.Account.Equals(action.AccountInfo.UserName,StringComparison.InvariantCultureIgnoreCase))
\r
131 if (!cloudInfo.IsWritable(action.AccountInfo.UserName))
\r
133 MakeFileReadOnly(fullFileName);
\r
134 StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal, "");
\r
139 //If this is a read-only file, do not upload changes
\r
140 if ( !cloudInfo.IsWritable(action.AccountInfo.UserName) ||
\r
141 //If the file is new, but we can't upload it
\r
142 (!cloudInfo.Exists && !await client.CanUpload(account, cloudFile)) )
\r
144 MakeFileReadOnly(fullFileName);
\r
145 StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal, "");
\r
151 await UnpauseEvent.WaitAsync().ConfigureAwait(false);
\r
153 fileInfo.Refresh();
\r
154 //Does the file still exist or was it deleted/renamed?
\r
155 if (fileInfo.Exists)
\r
157 if (fileInfo is DirectoryInfo)
\r
159 //If the directory doesn't exist the Hash property will be empty
\r
160 if (String.IsNullOrWhiteSpace(cloudInfo.X_Object_Hash))
\r
161 //Go on and create the directory
\r
163 client.PutObject(account, cloudFile.Container, cloudFile.Name, fullFileName,
\r
164 Signature.MERKLE_EMPTY, ObjectInfo.CONTENT_TYPE_DIRECTORY);
\r
165 //If the upload is in response to a Folder create with Selective Sync enabled
\r
166 if (action.IsCreation)
\r
168 //Add the folder to the Selected URls
\r
169 var selectiveUri = client.RootAddressUri.Combine(cloudFile.Uri);
\r
170 Selectives.AddUri(accountInfo, selectiveUri);
\r
171 Selectives.Save(accountInfo);
\r
177 var cloudHash = cloudInfo.X_Object_Hash.ToLower();
\r
181 isLarge=(fileInfo as FileInfo).NullSafe(f => f.Length) >= action.AccountInfo.BlockSize;
\r
183 StatusNotification.GetNotifier("Hashing {0} for Upload", "Finished hashing {0}",isLarge,fileInfo.Name))
\r
185 treeHash = localTreeHash ?? action.TreeHash.Value;
\r
186 topHash = treeHash.TopHash.ToHashString();
\r
191 //If the file hashes match, abort the upload
\r
192 if (cloudInfo != ObjectInfo.Empty && (topHash == cloudHash))
\r
194 //but store any metadata changes
\r
195 StatusKeeper.StoreInfo(fullFileName, cloudInfo,treeHash);
\r
196 Log.InfoFormat("Skip upload of {0}, hashes match", fullFileName);
\r
201 //Mark the file as modified while we upload it
\r
202 StatusKeeper.SetFileOverlayStatus(fullFileName, FileOverlayStatus.Modified);
\r
203 //And then upload it
\r
205 //Upload even small files using the Hashmap. The server may already contain
\r
206 //the relevant block
\r
211 UploadWithHashMap(accountInfo, cloudFile, fileInfo as FileInfo, cloudFile.Name,
\r
212 treeHash, cancellationToken).ConfigureAwait(false);
\r
215 var currentInfo =await client.GetObjectInfo(cloudFile.Account, cloudFile.Container,
\r
216 cloudFile.Name).ConfigureAwait(false);
\r
218 StatusKeeper.StoreInfo(fullFileName, currentInfo, localTreeHash);
\r
219 //Ensure the status is cleared
\r
220 StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged,
\r
221 FileOverlayStatus.Normal, "");
\r
223 //If there is no stored ObjectID in the file state, add it
\r
224 //TODO: Why not just update everything, then change the state?
\r
225 if (action.FileState == null || action.FileState.ObjectID == null)
\r
230 //If everything succeeds, change the file and overlay status to normal
\r
231 StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged,
\r
232 FileOverlayStatus.Normal, "");
\r
237 StatusKeeper.ClearFileStatus(fullFileName);
\r
240 catch(FileNotFoundException exc)
\r
242 //The file to upload disappeared while uploading. Just abort it
\r
243 StatusKeeper.ClearFileStatus(fullFileName);
\r
245 catch (WebException exc)
\r
247 var response = (exc.Response as HttpWebResponse);
\r
248 if (response == null)
\r
250 if (response.StatusCode == HttpStatusCode.Forbidden)
\r
252 StatusKeeper.SetFileState(fileInfo.FullName, FileStatus.Forbidden, FileOverlayStatus.Conflict, "Forbidden");
\r
253 MakeFileReadOnly(fullFileName);
\r
256 //In any other case, propagate the error
\r
260 //Notify the Shell to update the overlays
\r
261 NativeMethods.RaiseChangeNotification(fullFileName);
\r
262 StatusNotification.NotifyChangedFile(fullFileName);
\r
264 catch (AggregateException ex)
\r
266 var exc = ex.InnerException as WebException;
\r
268 throw ex.InnerException;
\r
269 if (HandleUploadWebException(action, exc))
\r
273 catch (WebException ex)
\r
275 if (HandleUploadWebException(action, ex))
\r
279 catch (Exception ex)
\r
281 Log.Error("Unexpected error while uploading file", ex);
\r
288 private static void MakeFileReadOnly(string fullFileName)
\r
290 var attributes = File.GetAttributes(fullFileName);
\r
291 //Do not make any modifications if not necessary
\r
292 if (attributes.HasFlag(FileAttributes.ReadOnly))
\r
294 File.SetAttributes(fullFileName, attributes | FileAttributes.ReadOnly);
\r
297 private static AccountInfo GetSharerAccount(string relativePath, AccountInfo accountInfo)
\r
299 var parts = relativePath.Split('\\');
\r
300 var accountName = parts[1];
\r
301 var oldName = accountInfo.UserName;
\r
302 var absoluteUri = accountInfo.StorageUri.AbsoluteUri;
\r
303 var nameIndex = absoluteUri.IndexOf(oldName, StringComparison.Ordinal);
\r
304 var root = absoluteUri.Substring(0, nameIndex);
\r
306 accountInfo = new AccountInfo
\r
308 UserName = accountName,
\r
309 AccountPath = Path.Combine(accountInfo.AccountPath, parts[0], parts[1]),
\r
310 StorageUri = new Uri(root + accountName),
\r
311 BlockHash = accountInfo.BlockHash,
\r
312 BlockSize = accountInfo.BlockSize,
\r
313 Token = accountInfo.Token
\r
315 return accountInfo;
\r
319 public async Task UploadWithHashMap(AccountInfo accountInfo, ObjectInfo cloudFile, FileInfo fileInfo, Uri uri, TreeHash treeHash, CancellationToken token)
\r
321 if (accountInfo == null)
\r
322 throw new ArgumentNullException("accountInfo");
\r
323 if (cloudFile == null)
\r
324 throw new ArgumentNullException("cloudFile");
\r
325 if (fileInfo == null)
\r
326 throw new ArgumentNullException("fileInfo");
\r
328 throw new ArgumentNullException("uri");
\r
329 if (treeHash == null)
\r
330 throw new ArgumentNullException("treeHash");
\r
331 if (cloudFile.Container==null)
\r
332 throw new ArgumentException("Invalid container", "cloudFile");
\r
333 if (cloudFile.Container.IsAbsoluteUri)
\r
334 throw new ArgumentException("Container URI must be relative", "cloudFile");
\r
335 Contract.EndContractBlock();
\r
338 if (await WaitOrAbort(accountInfo, cloudFile, token).ConfigureAwait(false))
\r
341 var fullFileName = fileInfo.GetProperCapitalization();
\r
343 var account = cloudFile.Account ?? accountInfo.UserName;
\r
344 var container = cloudFile.Container;
\r
348 var client = new CloudFilesClient(accountInfo);
\r
349 //Send the hashmap to the server
\r
350 var missingHashes = await client.PutHashMap(account, container, uri, treeHash).ConfigureAwait(false);
\r
351 ReportUploadProgress(fileInfo.Name, block, 0, missingHashes.Count, fileInfo.Length);
\r
352 //If the server returns no missing hashes, we are done
\r
354 client.UploadProgressChanged += (sender, args) =>
\r
355 ReportUploadProgress(fileInfo.Name, block, args.ProgressPercentage,
\r
356 missingHashes.Count, fileInfo.Length);
\r
359 while (missingHashes.Count > 0)
\r
363 if (await WaitOrAbort(accountInfo, cloudFile, token).ConfigureAwait(false))
\r
367 var buffer = new byte[accountInfo.BlockSize];
\r
368 foreach (var missingHash in missingHashes)
\r
370 if (await WaitOrAbort(accountInfo, cloudFile, token).ConfigureAwait(false))
\r
374 //Find the proper block
\r
375 long blockIndex = treeHash.HashDictionary[missingHash];
\r
376 long offset = blockIndex*accountInfo.BlockSize;
\r
377 Debug.Assert(offset >= 0,
\r
378 String.Format("Negative Offset! BlockIndex {0} BlockSize {1}", blockIndex,
\r
379 accountInfo.BlockSize));
\r
382 var read = fileInfo.Read(buffer, offset, accountInfo.BlockSize);
\r
385 //And upload the block
\r
386 await client.PostBlock(account, container, buffer, 0, read,missingHash, token).ConfigureAwait(false);
\r
387 token.ThrowIfCancellationRequested();
\r
388 Log.InfoFormat("[BLOCK] Block {0} of {1} uploaded", blockIndex, fullFileName);
\r
393 //And upload the block
\r
394 await client.PostBlock(account, container, fileInfo.FullName, offset, accountInfo.BlockSize,missingHash, token).ConfigureAwait(false);
\r
395 token.ThrowIfCancellationRequested();
\r
396 Log.InfoFormat("[BLOCK] Block {0} of {1} uploaded", blockIndex, fullFileName);
\r
398 catch(FileNotFoundException exc)
\r
400 //The file disappeared before we had a chance to upload it (perhaps it was deleted).
\r
401 //In this case we should abort the upload without updating the local state
\r
402 Log.ErrorFormat("The file to upload is missing: [{0}]\n{1}",fileInfo.FullName,exc);
\r
405 catch (TaskCanceledException)
\r
407 throw new OperationCanceledException(token);
\r
409 catch (Exception exc)
\r
411 //If we couldn't read the block, abort the upload because the contents are sure to change and the hashmap will be invalid
\r
412 if (exc.InnerException is System.IO.IOException)
\r
413 throw exc.InnerException;
\r
414 Log.Error(String.Format("Uploading block {0} of {1}", blockIndex, fullFileName), exc);
\r
416 ReportUploadProgress(fileInfo.Name, block++, 100, missingHashes.Count, fileInfo.Length);
\r
419 token.ThrowIfCancellationRequested();
\r
420 //Repeat until there are no more missing hashes
\r
421 missingHashes = await client.PutHashMap(account, container, uri, treeHash).ConfigureAwait(false);
\r
424 ReportUploadProgress(fileInfo.Name, missingHashes.Count, 0, missingHashes.Count, fileInfo.Length);
\r
428 private async Task<bool> WaitOrAbort(AccountInfo account,ObjectInfo cloudFile, CancellationToken token)
\r
430 token.ThrowIfCancellationRequested();
\r
431 await UnpauseEvent.WaitAsync().ConfigureAwait(false);
\r
432 var shouldAbort = !Selectives.IsSelected(account,cloudFile);
\r
434 Log.InfoFormat("Aborting [{0}]",cloudFile.Uri);
\r
435 return shouldAbort;
\r
438 private void ReportUploadProgress(string fileName, int block, int blockPercentage, int totalBlocks, long fileSize)
\r
440 StatusNotification.Notify(totalBlocks == 0
\r
441 ? new ProgressNotification(fileName, "Uploading", 1,blockPercentage, 1, fileSize)
\r
442 : new ProgressNotification(fileName, "Uploading", block, blockPercentage, totalBlocks, fileSize));
\r
446 private bool HandleUploadWebException(CloudAction action, WebException exc)
\r
448 var response = exc.Response as HttpWebResponse;
\r
449 if (response == null)
\r
451 if (response.StatusCode == HttpStatusCode.Unauthorized)
\r
453 Log.Error("Not allowed to upload file", exc);
\r
454 var message = String.Format("Not allowed to uplad file {0}", action.LocalFile.FullName);
\r
455 StatusKeeper.SetFileState(action.LocalFile.FullName, FileStatus.Unchanged, FileOverlayStatus.Normal, "");
\r
456 StatusNotification.NotifyChange(message, TraceLevel.Warning);
\r
463 public Selectives Selectives { get; set; }
\r
465 public AsyncManualResetEvent UnpauseEvent { get; set; }
\r