Revision a27aa447 trunk/Pithos.Core/Agents/NetworkAgent.cs
b/trunk/Pithos.Core/Agents/NetworkAgent.cs | ||
---|---|---|
6 | 6 |
using System.IO; |
7 | 7 |
using System.Linq; |
8 | 8 |
using System.Text; |
9 |
using System.Threading; |
|
9 | 10 |
using System.Threading.Tasks; |
10 | 11 |
using Pithos.Interfaces; |
11 | 12 |
using Pithos.Network; |
... | ... | |
33 | 34 |
*/ |
34 | 35 |
|
35 | 36 |
|
36 |
private string _pithosContainer;
|
|
37 |
private string _trashContainer;
|
|
37 |
public string PithosContainer { get; set; }
|
|
38 |
public string TrashContainer { get; private set; }
|
|
38 | 39 |
|
39 |
private int _blockSize;
|
|
40 |
private string _blockHash;
|
|
40 |
public int BlockSize { get; set; }
|
|
41 |
public string BlockHash { get; set; }
|
|
41 | 42 |
|
42 | 43 |
|
43 | 44 |
public void Start(string pithosContainer, string trashContainer, int blockSize, string blockHash) |
... | ... | |
48 | 49 |
throw new ArgumentNullException("trashContainer"); |
49 | 50 |
Contract.EndContractBlock(); |
50 | 51 |
|
51 |
_pithosContainer = pithosContainer;
|
|
52 |
_trashContainer = trashContainer;
|
|
53 |
_blockSize = blockSize;
|
|
54 |
_blockHash = blockHash;
|
|
52 |
PithosContainer = pithosContainer;
|
|
53 |
TrashContainer = trashContainer;
|
|
54 |
BlockSize = blockSize;
|
|
55 |
BlockHash = blockHash;
|
|
55 | 56 |
|
56 | 57 |
|
57 | 58 |
_agent = Agent<CloudAction>.Start(inbox => |
... | ... | |
60 | 61 |
loop = () => |
61 | 62 |
{ |
62 | 63 |
var message = inbox.Receive(); |
64 |
|
|
65 |
/* |
|
66 |
var process=Process(message); |
|
67 |
inbox.LoopAsync(process, loop); |
|
68 |
*/ |
|
69 |
|
|
70 |
/* |
|
71 |
process1.ContinueWith(t => |
|
72 |
{ |
|
73 |
inbox.DoAsync(loop); |
|
74 |
if (t.IsFaulted) |
|
75 |
{ |
|
76 |
var ex = t.Exception.InnerException; |
|
77 |
if (ex is OperationCanceledException) |
|
78 |
inbox.Stop(); |
|
79 |
} |
|
80 |
}); |
|
81 |
*/ |
|
82 |
//inbox.DoAsync(loop); |
|
83 |
|
|
84 |
|
|
63 | 85 |
var process = message.ContinueWith(t => |
64 | 86 |
{ |
65 | 87 |
var action = t.Result; |
... | ... | |
67 | 89 |
Process(action); |
68 | 90 |
inbox.DoAsync(loop); |
69 | 91 |
}); |
92 |
|
|
70 | 93 |
process.ContinueWith(t => |
71 | 94 |
{ |
72 | 95 |
inbox.DoAsync(loop); |
... | ... | |
78 | 101 |
} |
79 | 102 |
}); |
80 | 103 |
|
104 |
|
|
81 | 105 |
}; |
82 | 106 |
loop(); |
83 | 107 |
}); |
... | ... | |
89 | 113 |
if (cloudAction == null) |
90 | 114 |
throw new ArgumentNullException("cloudAction"); |
91 | 115 |
Contract.EndContractBlock(); |
116 |
|
|
117 |
//If the action targets a local file, add a treehash calculation |
|
118 |
if (cloudAction.LocalFile != null) |
|
119 |
{ |
|
120 |
cloudAction.TopHash = new Lazy<string>(() => Signature.CalculateTreeHashAsync(cloudAction.LocalFile, |
|
121 |
BlockSize, BlockHash).Result |
|
122 |
.TopHash.ToHashString()); |
|
92 | 123 |
|
124 |
} |
|
93 | 125 |
_agent.Post(cloudAction); |
94 | 126 |
} |
95 | 127 |
|
... | ... | |
112 | 144 |
Trace.CorrelationManager.StartLogicalOperation(); |
113 | 145 |
Trace.TraceInformation("[LISTENER] Scheduled"); |
114 | 146 |
var listObjects = Task.Factory.StartNewDelayed(10000).ContinueWith(t => |
115 |
CloudClient.ListObjects(_pithosContainer,since));
|
|
147 |
CloudClient.ListObjects(PithosContainer,since));
|
|
116 | 148 |
|
117 | 149 |
DateTime nextSince = DateTime.Now.AddSeconds(-1); |
118 | 150 |
|
... | ... | |
181 | 213 |
.ToList(); |
182 | 214 |
|
183 | 215 |
//Queue all the actions |
184 |
_agent.AddFromEnumerable(distinctActions); |
|
216 |
foreach (var message in distinctActions) |
|
217 |
{ |
|
218 |
Post(message); |
|
219 |
} |
|
220 |
|
|
185 | 221 |
|
186 | 222 |
if(remoteOnly.Count>0) |
187 | 223 |
StatusNotification.NotifyChange(String.Format("Processing {0} new files", remoteOnly.Count)); |
... | ... | |
208 | 244 |
} |
209 | 245 |
|
210 | 246 |
|
247 |
private Task Process(Task<CloudAction> action) |
|
248 |
{ |
|
249 |
return action.ContinueWith(t=> Process(t.Result)); |
|
250 |
} |
|
211 | 251 |
|
212 | 252 |
|
213 | 253 |
private void Process(CloudAction action) |
... | ... | |
230 | 270 |
UploadCloudFile(localFile, action.LocalHash.Value,action.TopHash.Value); |
231 | 271 |
break; |
232 | 272 |
case CloudActionType.DownloadUnconditional: |
233 |
DownloadCloudFile(_pithosContainer, new Uri(cloudFile.Name,UriKind.Relative), downloadPath);
|
|
273 |
DownloadCloudFile(PithosContainer, new Uri(cloudFile.Name,UriKind.Relative), downloadPath);
|
|
234 | 274 |
break; |
235 | 275 |
case CloudActionType.DeleteCloud: |
236 | 276 |
DeleteCloudFile(cloudFile.Name); |
... | ... | |
266 | 306 |
case FileStatus.Unchanged: |
267 | 307 |
//It he cloud file has a later date, it was modified by another user or computer. |
268 | 308 |
//If the local file's status is Unchanged, we should go on and download the cloud file |
269 |
DownloadCloudFile(_pithosContainer, new Uri(action.CloudFile.Name,UriKind.Relative), downloadPath);
|
|
309 |
DownloadCloudFile(PithosContainer, new Uri(action.CloudFile.Name,UriKind.Relative), downloadPath);
|
|
270 | 310 |
break; |
271 | 311 |
case FileStatus.Modified: |
272 | 312 |
//If the local file is Modified, we may have a conflict. In this case we should mark the file as Conflict |
... | ... | |
292 | 332 |
} |
293 | 333 |
} |
294 | 334 |
else |
295 |
DownloadCloudFile(_pithosContainer, new Uri(action.CloudFile.Name,UriKind.Relative), downloadPath);
|
|
335 |
DownloadCloudFile(PithosContainer, new Uri(action.CloudFile.Name,UriKind.Relative), downloadPath);
|
|
296 | 336 |
break; |
297 | 337 |
} |
298 | 338 |
Trace.TraceInformation("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name); |
... | ... | |
325 | 365 |
//The local file is already renamed |
326 | 366 |
this.StatusKeeper.SetFileOverlayStatus(newPath, FileOverlayStatus.Modified); |
327 | 367 |
|
328 |
CloudClient.MoveObject(_pithosContainer, oldFileName, _pithosContainer, newFileName);
|
|
368 |
CloudClient.MoveObject(PithosContainer, oldFileName, PithosContainer, newFileName);
|
|
329 | 369 |
|
330 | 370 |
this.StatusKeeper.SetFileStatus(newPath, FileStatus.Unchanged); |
331 | 371 |
this.StatusKeeper.SetFileOverlayStatus(newPath, FileOverlayStatus.Normal); |
... | ... | |
342 | 382 |
|
343 | 383 |
this.StatusKeeper.SetFileOverlayStatus(fileName, FileOverlayStatus.Modified); |
344 | 384 |
|
345 |
CloudClient.MoveObject(_pithosContainer, fileName, _trashContainer, fileName); |
|
385 |
CloudClient.MoveObject(PithosContainer, fileName, TrashContainer, fileName); |
|
386 |
|
|
346 | 387 |
this.StatusKeeper.ClearFileStatus(fileName); |
347 | 388 |
this.StatusKeeper.RemoveFileOverlayStatus(fileName); |
348 | 389 |
} |
... | ... | |
354 | 395 |
throw new ArgumentNullException("container"); |
355 | 396 |
if (relativeUrl==null) |
356 | 397 |
throw new ArgumentNullException("relativeUrl"); |
357 |
|
|
358 | 398 |
if (String.IsNullOrWhiteSpace(localPath)) |
359 | 399 |
throw new ArgumentNullException("localPath"); |
360 | 400 |
if (!Path.IsPathRooted(localPath)) |
... | ... | |
370 | 410 |
{ |
371 | 411 |
if (gate.Failed) |
372 | 412 |
return; |
373 |
//Calculate the relative file path for the new file |
|
374 |
var relativePath = relativeUrl.RelativeUriToFilePath(); |
|
375 |
//The file will be stored in a temporary location while downloading with an extension .download |
|
376 |
var tempPath = Path.Combine(FileAgent.FragmentsPath, relativePath + ".download"); |
|
377 | 413 |
//The file's hashmap will be stored in the same location with the extension .hashmap |
378 |
var hashPath = Path.Combine(FileAgent.FragmentsPath, relativePath + ".hashmap"); |
|
414 |
//var hashPath = Path.Combine(FileAgent.FragmentsPath, relativePath + ".hashmap");
|
|
379 | 415 |
|
380 | 416 |
//Retrieve the hashmap from the server |
381 |
var getHashMap = CloudClient.GetHashMap(container,url); |
|
382 |
//And save it |
|
383 |
var saveHashMap = getHashMap.ContinueWith(t => |
|
417 |
var getHashMap = CloudClient.GetHashMap(container,url); |
|
418 |
var downloadTask= getHashMap.ContinueWith(t => |
|
384 | 419 |
{ |
385 |
var treeHash = t.Result; |
|
386 |
treeHash.Save(hashPath); |
|
420 |
var serverHash=t.Result; |
|
421 |
//If it's a small file |
|
422 |
return serverHash.Hashes.Count == 1 |
|
423 |
//Download it in one go |
|
424 |
? DownloadEntireFile(container, relativeUrl, localPath) |
|
425 |
//Otherwise download it block by block |
|
426 |
: DownloadWithBlocks(container, relativeUrl, localPath, serverHash); |
|
387 | 427 |
}); |
388 | 428 |
|
389 |
//Make sure the target folder exists. DownloadFileTask will not create the folder |
|
390 |
var directoryPath=Path.GetDirectoryName(tempPath); |
|
391 |
if (!Directory.Exists(directoryPath)) |
|
392 |
Directory.CreateDirectory(directoryPath); |
|
393 |
|
|
394 |
//Download the object to the temporary location |
|
395 |
var getObject = CloudClient.GetObject(container, url, tempPath).ContinueWith(t => |
|
396 |
{ |
|
397 |
//And move it to its actual location once downloading is finished |
|
398 |
if (File.Exists(localPath)) |
|
399 |
File.Replace(tempPath,localPath,null,true); |
|
400 |
else |
|
401 |
File.Move(tempPath,localPath); |
|
402 |
}); |
|
403 | 429 |
|
430 |
|
|
404 | 431 |
//Retrieve the object's metadata |
405 |
var getInfo = saveHashMap.ContinueWith(t => getObject).ContinueWith(t =>
|
|
432 |
var getInfo = downloadTask.ContinueWith(t =>
|
|
406 | 433 |
CloudClient.GetObjectInfo(container, url)); |
407 | 434 |
//And store it |
408 | 435 |
var storeInfo = getInfo.ContinueWith(t => |
... | ... | |
414 | 441 |
} |
415 | 442 |
} |
416 | 443 |
|
444 |
//Download a small file with a single GET operation |
|
445 |
private Task DownloadEntireFile(string container, Uri relativeUrl, string localPath) |
|
446 |
{ |
|
447 |
if (String.IsNullOrWhiteSpace(container)) |
|
448 |
throw new ArgumentNullException("container"); |
|
449 |
if (relativeUrl == null) |
|
450 |
throw new ArgumentNullException("relativeUrl"); |
|
451 |
if (String.IsNullOrWhiteSpace(localPath)) |
|
452 |
throw new ArgumentNullException("localPath"); |
|
453 |
if (!Path.IsPathRooted(localPath)) |
|
454 |
throw new ArgumentException("The localPath must be rooted", "localPath"); |
|
455 |
Contract.EndContractBlock(); |
|
456 |
|
|
457 |
//Calculate the relative file path for the new file |
|
458 |
var relativePath = relativeUrl.RelativeUriToFilePath(); |
|
459 |
//The file will be stored in a temporary location while downloading with an extension .download |
|
460 |
var tempPath = Path.Combine(FileAgent.FragmentsPath, relativePath + ".download"); |
|
461 |
//Make sure the target folder exists. DownloadFileTask will not create the folder |
|
462 |
var directoryPath = Path.GetDirectoryName(tempPath); |
|
463 |
if (!Directory.Exists(directoryPath)) |
|
464 |
Directory.CreateDirectory(directoryPath); |
|
465 |
|
|
466 |
//Download the object to the temporary location |
|
467 |
var getObject = CloudClient.GetObject(container, relativeUrl.ToString(), tempPath).ContinueWith(t => |
|
468 |
{ |
|
469 |
//And move it to its actual location once downloading is finished |
|
470 |
if (File.Exists(localPath)) |
|
471 |
File.Replace(tempPath,localPath,null,true); |
|
472 |
else |
|
473 |
File.Move(tempPath,localPath); |
|
474 |
}); |
|
475 |
return getObject; |
|
476 |
} |
|
477 |
|
|
478 |
public Task DownloadWithBlocks(string container,Uri relativeUrl, string localPath,TreeHash serverHash) |
|
479 |
{ |
|
480 |
if (String.IsNullOrWhiteSpace(container)) |
|
481 |
throw new ArgumentNullException("container"); |
|
482 |
if (relativeUrl == null) |
|
483 |
throw new ArgumentNullException("relativeUrl"); |
|
484 |
if (String.IsNullOrWhiteSpace(localPath)) |
|
485 |
throw new ArgumentNullException("localPath"); |
|
486 |
if (!Path.IsPathRooted(localPath)) |
|
487 |
throw new ArgumentException("The localPath must be rooted", "localPath"); |
|
488 |
if(serverHash==null) |
|
489 |
throw new ArgumentNullException("serverHash"); |
|
490 |
Contract.EndContractBlock(); |
|
491 |
|
|
492 |
//Calculate the relative file path for the new file |
|
493 |
var relativePath = relativeUrl.RelativeUriToFilePath(); |
|
494 |
//The file will be stored in a temporary location while downloading with an extension .download |
|
495 |
var tempPath = Path.Combine(FileAgent.FragmentsPath, relativePath + ".download"); |
|
496 |
var directoryPath = Path.GetDirectoryName(tempPath); |
|
497 |
if (!Directory.Exists(directoryPath)) |
|
498 |
Directory.CreateDirectory(directoryPath); |
|
499 |
|
|
500 |
//If the local file exists we should make a copy of it to the |
|
501 |
//fragments folder, unless a newer temp copy already exists, which |
|
502 |
//means there is an interrupted download |
|
503 |
if (ShouldCopy(localPath, tempPath)) |
|
504 |
File.Copy(localPath, tempPath, true); |
|
505 |
|
|
506 |
//Set the size of the file to the size specified in the treehash |
|
507 |
//This will also create an empty file if the file doesn't exist |
|
508 |
SetFileSize(tempPath, serverHash.Bytes); |
|
509 |
|
|
510 |
return Task.Factory.StartNew(() => |
|
511 |
{ |
|
512 |
//Calculate the temp file's treehash |
|
513 |
var treeHash = Signature.CalculateTreeHashAsync(tempPath, this.BlockSize,BlockHash).Result; |
|
514 |
|
|
515 |
//And compare it with the server's hash |
|
516 |
var upHashes = serverHash.GetHashesAsStrings(); |
|
517 |
var localHashes = treeHash.HashDictionary; |
|
518 |
for (int i = 0; i < upHashes.Length; i++) |
|
519 |
{ |
|
520 |
//For every non-matching hash |
|
521 |
if (!localHashes.ContainsKey(upHashes[i])) |
|
522 |
{ |
|
523 |
Trace.TraceInformation("[BLOCK GET] START {0} of {1} for {2}",i,upHashes.Length,localPath); |
|
524 |
var start = i*BlockSize; |
|
525 |
long? end = null; |
|
526 |
if (i < upHashes.Length - 1 ) |
|
527 |
end= ((i + 1)*BlockSize) ; |
|
528 |
|
|
529 |
//Get its block |
|
530 |
var blockTask = CloudClient.GetBlock(container, relativeUrl, |
|
531 |
start, end); |
|
532 |
|
|
533 |
blockTask.ContinueWith(b => |
|
534 |
{ |
|
535 |
//And apply it to the temp file |
|
536 |
var buffer = b.Result; |
|
537 |
var stream =FileAsync.OpenWrite(tempPath); |
|
538 |
stream.Seek(start,SeekOrigin.Begin); |
|
539 |
return stream.WriteAsync(buffer, 0, buffer.Length) |
|
540 |
.ContinueWith(s => stream.Close()); |
|
541 |
|
|
542 |
}).Unwrap() |
|
543 |
.Wait(); |
|
544 |
Trace.TraceInformation("[BLOCK GET] FINISH {0} of {1} for {2}", i, upHashes.Length, localPath); |
|
545 |
} |
|
546 |
|
|
547 |
} |
|
548 |
|
|
549 |
|
|
550 |
//Replace the existing file with the temp |
|
551 |
if (File.Exists(localPath)) |
|
552 |
File.Replace(tempPath, localPath, null, true); |
|
553 |
else |
|
554 |
File.Move(tempPath, localPath); |
|
555 |
Trace.TraceInformation("[BLOCK GET] COMPLETE {0}", localPath); |
|
556 |
}); |
|
557 |
} |
|
558 |
|
|
559 |
//Change the file's size, possibly truncating or adding to it |
|
560 |
private static void SetFileSize(string filePath, long fileSize) |
|
561 |
{ |
|
562 |
if (String.IsNullOrWhiteSpace(filePath)) |
|
563 |
throw new ArgumentNullException("filePath"); |
|
564 |
if (!Path.IsPathRooted(filePath)) |
|
565 |
throw new ArgumentException("The filePath must be rooted", "filePath"); |
|
566 |
if (fileSize<0) |
|
567 |
throw new ArgumentOutOfRangeException("fileSize"); |
|
568 |
Contract.EndContractBlock(); |
|
569 |
|
|
570 |
using (var stream = File.Open(filePath, FileMode.OpenOrCreate, FileAccess.Write)) |
|
571 |
{ |
|
572 |
stream.SetLength(fileSize); |
|
573 |
} |
|
574 |
} |
|
575 |
|
|
576 |
//Check whether we should copy the local file to a temp path |
|
577 |
private static bool ShouldCopy(string localPath, string tempPath) |
|
578 |
{ |
|
579 |
//No need to copy if there is no file |
|
580 |
if (!File.Exists(localPath)) |
|
581 |
return false; |
|
582 |
|
|
583 |
//If there is no temp file, go ahead and copy |
|
584 |
if (!File.Exists(tempPath)) |
|
585 |
return true; |
|
586 |
|
|
587 |
//If there is a temp file and is newer than the actual file, don't copy |
|
588 |
var localLastWrite = File.GetLastWriteTime(localPath); |
|
589 |
var tempLastWrite = File.GetLastWriteTime(tempPath); |
|
590 |
|
|
591 |
//This could mean there is an interrupted download in progress |
|
592 |
return (tempLastWrite < localLastWrite); |
|
593 |
} |
|
594 |
|
|
417 | 595 |
private void UploadCloudFile(FileInfo fileInfo, string hash,string topHash) |
418 | 596 |
{ |
419 | 597 |
if (fileInfo==null) |
... | ... | |
436 | 614 |
|
437 | 615 |
|
438 | 616 |
//Even if GetObjectInfo times out, we can proceed with the upload |
439 |
var info = CloudClient.GetObjectInfo(_pithosContainer, url);
|
|
617 |
var info = CloudClient.GetObjectInfo(PithosContainer, url);
|
|
440 | 618 |
|
441 | 619 |
//If the file hashes match, abort the upload |
442 | 620 |
if (hash.Equals(info.Hash, StringComparison.InvariantCultureIgnoreCase) || |
... | ... | |
448 | 626 |
return; |
449 | 627 |
} |
450 | 628 |
|
451 |
//If the file was last modified by a hashmap operation, its hash will be a treehash |
|
452 |
//We must load the local file's treehash and check it against the object's hash |
|
453 |
//The hash will be stored under the fragments path, in the same relative path as to |
|
454 |
//the pithos root path |
|
455 |
var relativePath = fileInfo.AsRelativeTo(FileAgent.RootPath); |
|
456 |
/* |
|
457 |
var hashPath = Path.Combine(FileAgent.FragmentsPath, relativePath) + ".hashmap"; |
|
458 |
//Load the hash or calculate a new one |
|
459 |
var hashFileExists = File.Exists(hashPath); |
|
460 |
var treeHash = hashFileExists ? |
|
461 |
TreeHash.LoadTreeHash(hashPath, Guid.NewGuid()).Result |
|
462 |
: Signature.CalculateTreeHash(fileInfo.FullName, _blockSize, _blockHash); |
|
463 |
//If this is a new treehash, store it |
|
464 |
if (!hashFileExists) |
|
465 |
treeHash.Save(hashPath); |
|
466 |
|
|
467 |
var topHash = treeHash.TopHash.ToHashString(); |
|
468 |
if (topHash.Equals(info.Hash, StringComparison.InvariantCultureIgnoreCase)) |
|
629 |
//Mark the file as modified while we upload it |
|
630 |
var setStatus = Task.Factory.StartNew(() => |
|
631 |
StatusKeeper.SetFileOverlayStatus(fullFileName, FileOverlayStatus.Modified)); |
|
632 |
//And then upload it |
|
633 |
|
|
634 |
//If the file is larger than the block size, try a hashmap PUT |
|
635 |
if (fileInfo.Length > BlockSize ) |
|
469 | 636 |
{ |
470 |
this.StatusKeeper.StoreInfo(fileInfo.FullName, info); |
|
471 |
Trace.TraceInformation("Skip upload of {0}, treehashes match", fileInfo.FullName); |
|
472 |
return; |
|
637 |
//To upload using a hashmap |
|
638 |
//First, calculate the tree hash |
|
639 |
var treeHash = Signature.CalculateTreeHashAsync(fileInfo.FullName, BlockSize, BlockHash); |
|
640 |
|
|
641 |
var putHashMap = setStatus.ContinueWith(t=> |
|
642 |
UploadWithHashMap(fileInfo,url,treeHash)); |
|
643 |
|
|
644 |
putHashMap.Wait(); |
|
473 | 645 |
} |
474 |
*/ |
|
475 |
|
|
476 |
|
|
477 |
|
|
478 |
|
|
479 |
//Does the object exist or not? |
|
480 |
|
|
481 |
//Calculate or load the file's hashmap |
|
482 |
|
|
483 |
/* |
|
484 |
if (fileInfo.Bytes > _blockSize) |
|
485 |
{ |
|
486 |
var hashPath = Path.Combine(FileAgent.FragmentsPath, relativePath) + ".hashmap"; |
|
487 |
|
|
488 |
//Load the existing hashPath, if we have one, or calculate a new one |
|
489 |
var treeHash = File.Exists(hashPath) ? |
|
490 |
TreeHash.LoadTreeHash(hashPath, Guid.NewGuid()).Result |
|
491 |
: Signature.CalculateTreeHash(fileInfo.FullName, _blockSize , _blockHash); |
|
492 |
|
|
493 |
//Do the Hashmap Put |
|
494 |
Task<string> putHashMap=CloudClient.PutHashMap(_pithosContainer,url, treeHash); |
|
495 |
putHashMap.Wait(); |
|
496 |
}*/ |
|
497 |
|
|
498 |
// |
|
499 |
|
|
500 |
|
|
501 |
var putOrUpdate = Task.Factory.StartNew(() => |
|
646 |
else |
|
502 | 647 |
{ |
503 |
//Mark the file as modified while we upload it |
|
504 |
var setStatus = Task.Factory.StartNew(() => |
|
505 |
StatusKeeper.SetFileOverlayStatus(fullFileName,FileOverlayStatus.Modified)); |
|
506 |
//And then upload it |
|
648 |
//Otherwise do a regular PUT |
|
507 | 649 |
var put = setStatus.ContinueWith(t => |
508 |
CloudClient.PutObject(_pithosContainer,url,fullFileName, hash));
|
|
509 |
});
|
|
510 |
putOrUpdate.Wait();
|
|
650 |
CloudClient.PutObject(PithosContainer,url,fullFileName,hash));
|
|
651 |
put.Wait();
|
|
652 |
}
|
|
511 | 653 |
//If everything succeeds, change the file and overlay status to normal |
512 | 654 |
this.StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal); |
513 | 655 |
} |
... | ... | |
516 | 658 |
StatusNotification.NotifyChangedFile(fullFileName); |
517 | 659 |
} |
518 | 660 |
|
661 |
public void UploadWithHashMap(FileInfo fileInfo,string url,Task<TreeHash> treeHash) |
|
662 |
{ |
|
663 |
var fullFileName = fileInfo.FullName; |
|
664 |
|
|
665 |
//Send the hashmap to the server |
|
666 |
var hashPut = CloudClient.PutHashMap(PithosContainer, url, treeHash.Result); |
|
667 |
var missingHashes = hashPut.Result; |
|
668 |
if (missingHashes.Count == 0) |
|
669 |
return; |
|
670 |
|
|
671 |
var buffer = new byte[BlockSize]; |
|
672 |
foreach (var missingHash in missingHashes) |
|
673 |
{ |
|
674 |
int blockIndex = -1; |
|
675 |
try |
|
676 |
{ |
|
677 |
//Find the proper block |
|
678 |
blockIndex = treeHash.Result.HashDictionary[missingHash]; |
|
679 |
var offset = blockIndex*BlockSize; |
|
680 |
|
|
681 |
var read = fileInfo.Read(buffer, offset, BlockSize); |
|
682 |
if (read > 0) |
|
683 |
{ |
|
684 |
//Copy the actual block data out of the buffer |
|
685 |
var data = new byte[read]; |
|
686 |
Buffer.BlockCopy(buffer, 0, data, 0, read); |
|
687 |
|
|
688 |
//And POST them |
|
689 |
CloudClient.PostBlock(PithosContainer, data).Wait(); |
|
690 |
Trace.TraceInformation("[BLOCK] Block {0} of {1} uploaded", blockIndex, |
|
691 |
fullFileName); |
|
692 |
} |
|
693 |
} |
|
694 |
catch (Exception exc) |
|
695 |
{ |
|
696 |
Trace.TraceError("[ERROR] uploading block {0} of {1}\n{2}", blockIndex, fullFileName, exc); |
|
697 |
} |
|
698 |
} |
|
699 |
|
|
700 |
UploadWithHashMap(fileInfo, url, treeHash); |
|
701 |
|
|
702 |
} |
|
703 |
|
|
519 | 704 |
|
520 | 705 |
} |
521 | 706 |
|
Also available in: Unified diff