2 using System.Collections.Generic;
3 using System.ComponentModel.Composition;
4 using System.Diagnostics;
5 using System.Diagnostics.Contracts;
10 using System.Threading;
11 using System.Threading.Tasks;
12 using Pithos.Interfaces;
16 namespace Pithos.Core.Agents
19 public class NetworkAgent
// Agent that queues CloudAction messages; it is assigned in Start and fed by Post.
21 private Agent<CloudAction> _agent;
// Used by this class to read and update file status / overlay status and to store object metadata.
24 public IStatusKeeper StatusKeeper { get; set; }
// Receives the NotifyChange / NotifyChangedFile notifications raised by this class.
26 public IStatusNotification StatusNotification { get; set; }
// Client used for every server call made here (list, get, put, move and delete objects).
28 public ICloudClient CloudClient { get; set; }
// Supplies RootPath and FragmentsPath plus local file lookups (Exists, GetFileInfo, Delete).
31 public FileAgent FileAgent {get;set;}
// NOTE(review): not referenced anywhere in the visible part of this class.
35 public IPithosWorkflow Workflow { get; set; }
// Container used when a cloud file does not name one; assigned in Start.
39 public string PithosContainer { get; set; }
// Container passed to DeleteObject and polled for deleted objects; assigned in Start.
40 public string TrashContainer { get; private set; }
// NOTE(review): never assigned in the visible code, so it appears to stay null - confirm.
41 public IList<string> Containers { get; private set; }
// Block size (compared against file lengths) and block hash setting; both are assigned in Start
// and passed to Signature.CalculateTreeHashAsync.
43 public int BlockSize { get; set; }
44 public string BlockHash { get; set; }
// Logger shared by all instances, registered under the name "NetworkAgent".
46 private static readonly ILog Log = LogManager.GetLogger("NetworkAgent");
// Stores the container names and block settings on the instance, then starts the agent
// whose inbox loop chains Process onto every received message.
// Throws ArgumentNullException when pithosContainer or trashContainer is null, empty or whitespace.
49 public void Start(string pithosContainer, string trashContainer, int blockSize, string blockHash)
51 if (String.IsNullOrWhiteSpace(pithosContainer))
52 throw new ArgumentNullException("pithosContainer");
53 if (String.IsNullOrWhiteSpace(trashContainer))
54 throw new ArgumentNullException("trashContainer");
55 Contract.EndContractBlock();
57 PithosContainer = pithosContainer;
58 TrashContainer = trashContainer;
// NOTE(review): blockSize and blockHash are stored without validation - confirm callers
// always supply a usable size and hash setting.
59 BlockSize = blockSize;
60 BlockHash = blockHash;
// Create the agent; the lambda receives the agent's inbox.
63 _agent = Agent<CloudAction>.Start(inbox =>
// Take the next message, run Process on it (passing the inbox's cancellation token), then loop again.
// NOTE(review): two Process overloads exist in this class; presumably the CloudAction one is bound here - confirm.
68 var message = inbox.Receive();
69 var process=message.Then(Process,inbox.CancellationToken);
70 inbox.LoopAsync(process, loop);
// Executes one CloudAction: depending on action.Action it uploads, downloads, deletes,
// renames or synchronises the file the action describes, then returns CompletedTask<object>.Default.
// A FileNotFoundException is logged and a CloudDeleteAction for the same cloud file is posted;
// any other exception is logged with a "[REQUEUE]" prefix.
// Throws ArgumentNullException("action") from the guard at the top.
76 private Task<object> Process(CloudAction action)
79 throw new ArgumentNullException("action");
80 Contract.EndContractBlock();
// Tag log entries written while this action is processed.
82 using (log4net.ThreadContext.Stacks["NETWORK"].Push("PROCESS"))
// NOTE(review): action.CloudFile is dereferenced here (and for account/container below), yet the
// downloadPath computation treats cloudFile == null as possible - confirm which is intended.
84 Log.InfoFormat("[ACTION] Start Processing {0}:{1}->{2}", action.Action, action.LocalFile,
85 action.CloudFile.Name);
87 var localFile = action.LocalFile;
88 var cloudFile = action.CloudFile;
// Local target path: the cloud file's relative path placed under the FileAgent root.
89 var downloadPath = (cloudFile == null)
91 : Path.Combine(FileAgent.RootPath, cloudFile.RelativeUrlToFilePath(CloudClient.UserName));
// Fall back to the current user's account and the default container when the cloud file names none.
95 var account = action.CloudFile.Account ?? CloudClient.UserName;
96 var container = action.CloudFile.Container ?? PithosContainer;
98 switch (action.Action)
100 case CloudActionType.UploadUnconditional:
// Reading .Value forces evaluation of the lazy hashes.
101 UploadCloudFile(account, container, localFile, action.LocalHash.Value, action.TopHash.Value);
103 case CloudActionType.DownloadUnconditional:
105 DownloadCloudFile(account, container, new Uri(cloudFile.Name, UriKind.Relative),
108 case CloudActionType.DeleteCloud:
109 DeleteCloudFile(account, container, cloudFile.Name);
111 case CloudActionType.RenameCloud:
// A RenameCloud action is expected to be a CloudMoveAction; the cast throws InvalidCastException otherwise.
112 var moveAction = (CloudMoveAction)action;
113 RenameCloudFile(account, container, moveAction.OldFileName, moveAction.NewPath,
114 moveAction.NewFileName);
116 case CloudActionType.MustSynch:
// No local copy at the target path: simply download the cloud file.
118 if (!File.Exists(downloadPath))
120 var cloudUri = new Uri(action.CloudFile.Name, UriKind.Relative);
121 DownloadCloudFile(account, container, cloudUri, downloadPath);
129 Log.InfoFormat("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile,
130 action.CloudFile.Name);
132 catch (OperationCanceledException)
136 catch (FileNotFoundException exc)
// NOTE(review): the format string has three placeholders but four arguments are passed,
// so exc is never written to the log.
138 Log.ErrorFormat("{0} : {1} -> {2} failed because the file was not found.\n Rescheduling a delete",
139 action.Action, action.LocalFile, action.CloudFile, exc);
// The local file is gone, so queue a delete for the corresponding cloud file.
140 Post(new CloudDeleteAction(action.CloudFile,action.FileState));
142 catch (Exception exc)
144 Log.ErrorFormat("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
145 action.Action, action.LocalFile, action.CloudFile, exc);
149 return CompletedTask<object>.Default;
/// <summary>
/// Reconciles a local file with the cloud object it is paired with.
/// Nothing is done when the cloud hash equals the local hash or the local top hash.
/// Otherwise the local file is uploaded when it is at least as recent as the cloud copy;
/// when the cloud copy is newer it is downloaded if the local file is Unchanged, and the
/// file is reported as a conflict for every other status.
/// </summary>
/// <param name="action">Action carrying the local file, the cloud file and the lazy hashes.</param>
/// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
/// <exception cref="ArgumentException">
/// The action has no local file, its local path is not rooted, or it has no cloud file.
/// </exception>
private void SyncFiles(CloudAction action)
{
    if (action == null)
        throw new ArgumentNullException("action");
    if (action.LocalFile == null)
        throw new ArgumentException("The action's local file is not specified", "action");
    if (!Path.IsPathRooted(action.LocalFile.FullName))
        throw new ArgumentException("The action's local file path must be absolute", "action");
    if (action.CloudFile == null)
        throw new ArgumentException("The action's cloud file is not specified", "action");
    Contract.EndContractBlock();

    var localFile = action.LocalFile;
    var cloudFile = action.CloudFile;
    var downloadPath = action.LocalFile.FullName.ToLower();

    //Use the current user's account and the default container when the cloud file
    //names none, exactly as Process does. UploadCloudFile and DownloadCloudFile
    //reject a null account.
    var account = cloudFile.Account ?? CloudClient.UserName;
    var container = cloudFile.Container ?? PithosContainer;

    var cloudUri = new Uri(cloudFile.Name, UriKind.Relative);
    var cloudHash = cloudFile.Hash;

    //Comparing only the local hash is not enough, the top hash has to be compared too.
    //The comparison is ordinal and case-insensitive, which is culture independent and
    //tolerates a missing cloud hash. The top hash is evaluated only when the local
    //hash does not already match, because computing it may be expensive.
    var hashesMatch =
        String.Equals(cloudHash, action.LocalHash.Value, StringComparison.OrdinalIgnoreCase) ||
        String.Equals(cloudHash, action.TopHash.Value, StringComparison.OrdinalIgnoreCase);

    //If any of the hashes match, we are done
    if (hashesMatch)
    {
        Log.InfoFormat("Skipping {0}, hashes match", downloadPath);
        return;
    }

    //The hashes DON'T match. We need to sync
    var lastLocalTime = localFile.LastWriteTime;
    var lastUpTime = cloudFile.Last_Modified;

    //If the local file is newer upload it
    if (lastUpTime <= lastLocalTime)
    {
        //It probably means it was changed while the app was down
        UploadCloudFile(account, container, localFile, action.LocalHash.Value,
                        action.TopHash.Value);
    }
    else
    {
        //If the cloud file has a later date, it was modified by another user or computer.
        //We need to check the local file's status
        var status = StatusKeeper.GetFileStatus(downloadPath);
        switch (status)
        {
            case FileStatus.Unchanged:
                //The local file is untouched, so the newer cloud file can simply be downloaded
                DownloadCloudFile(account, container, cloudUri, downloadPath);
                break;
            case FileStatus.Modified:
                //A locally modified file may conflict with the newer cloud version.
                //We can't ensure that a file modified online since the last time will
                //appear as Modified, unless we index all files before we start listening.
            case FileStatus.Created:
                //A locally created file is unrelated to the cloud file, yet has the same name.
                //In both cases we must mark the file as in conflict
                ReportConflict(downloadPath);
                break;
            default:
                //Other cases should never occur. Mark them as Conflict as well but log a warning
                ReportConflict(downloadPath);
                Log.WarnFormat("Unexpected status {0} for file {1}->{2}", status,
                               downloadPath, action.CloudFile.Name);
                break;
        }
    }
}
/// <summary>
/// Flags a local file as conflicting: sets its overlay status to Conflict and
/// sends a warning-level notification naming the file.
/// </summary>
/// <param name="downloadPath">Path of the conflicting local file.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="downloadPath"/> is null, empty or whitespace.
/// </exception>
private void ReportConflict(string downloadPath)
{
    if (String.IsNullOrWhiteSpace(downloadPath))
    {
        throw new ArgumentNullException("downloadPath");
    }
    Contract.EndContractBlock();

    //Update the overlay first, then tell the listeners
    StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict);

    StatusNotification.NotifyChange(
        String.Format("Conflict detected for file {0}", downloadPath),
        TraceLevel.Warning);
}
// Executes a CloudMoveAction by renaming the object on the server, logging the start and
// the end of the operation, and returns CompletedTask<object>.Default.
// Exceptions other than OperationCanceledException are logged with a "[REQUEUE]" prefix.
// Throws ArgumentNullException("action") from the guard at the top.
243 private Task<object> Process(CloudMoveAction action)
246 throw new ArgumentNullException("action");
247 Contract.EndContractBlock();
249 Log.InfoFormat("[ACTION] Start Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name);
// NOTE(review): this call passes three arguments, while the only RenameCloudFile visible in this
// file takes five (account, container, oldFileName, newPath, newFileName). Unless another
// overload exists outside this view, this does not compile - confirm and pass account/container.
253 RenameCloudFile(action.OldFileName, action.NewPath, action.NewFileName);
254 Log.InfoFormat("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name);
256 catch (OperationCanceledException)
260 catch (Exception exc)
262 Log.ErrorFormat("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
263 action.Action, action.OldFileName, action.NewFileName, exc);
267 return CompletedTask<object>.Default;
// Queues an action on the agent. When the action targets a local file, its TopHash is
// first replaced with a lazily evaluated value.
// Throws ArgumentNullException when cloudAction is null.
272 public void Post(CloudAction cloudAction)
274 if (cloudAction == null)
275 throw new ArgumentNullException("cloudAction");
276 Contract.EndContractBlock();
278 //If the action targets a local file, add a treehash calculation
279 if (cloudAction.LocalFile != null)
// Files longer than one block get a tree hash that is calculated on first access.
// NOTE(review): .Result blocks whichever thread first reads TopHash.Value until the
// asynchronous calculation has finished.
282 if (cloudAction.LocalFile.Length>BlockSize)
283 cloudAction.TopHash = new Lazy<string>(() => Signature.CalculateTreeHashAsync(cloudAction.LocalFile,
284 BlockSize, BlockHash).Result
285 .TopHash.ToHashString());
// Presumably the else branch, for files that fit in a single block: the top hash is
// simply the local hash - confirm against the full source.
288 cloudAction.TopHash=new Lazy<string>(()=> cloudAction.LocalHash.Value);
// NOTE(review): _agent is only assigned in Start, so calling Post before Start dereferences null.
292 _agent.Post(cloudAction);
/// <summary>
/// Compares <see cref="ObjectInfo"/> instances by their Name, ignoring case.
/// Equals and GetHashCode both go through the same invariant-culture, case-insensitive
/// comparer, so two objects that compare equal always produce the same hash code.
/// </summary>
class ObjectInfoByNameComparer:IEqualityComparer<ObjectInfo>
{
    //Single comparer shared by Equals and GetHashCode to keep the two consistent
    private static readonly StringComparer NameComparer = StringComparer.InvariantCultureIgnoreCase;

    /// <summary>
    /// Returns true when both arguments are the same instance (or both null), or when
    /// their names are equal ignoring case. A null argument never equals a non-null one.
    /// </summary>
    public bool Equals(ObjectInfo x, ObjectInfo y)
    {
        if (ReferenceEquals(x, y))
            return true;
        if (ReferenceEquals(x, null) || ReferenceEquals(y, null))
            return false;
        return NameComparer.Equals(x.Name, y.Name);
    }

    /// <summary>
    /// Returns a case-insensitive hash of the object's name, or 0 when the name is null.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="obj"/> is null.</exception>
    public int GetHashCode(ObjectInfo obj)
    {
        if (ReferenceEquals(obj, null))
            throw new ArgumentNullException("obj");
        return obj.Name == null ? 0 : NameComparer.GetHashCode(obj.Name);
    }
}
308 //Remote files are polled periodically. Any changes are processed
// Polls the server: lists the objects of the default container, of the trash container and
// the shared objects (optionally only those changed since a given time), processes deletions,
// queues an action for each remaining remote file and finally calls itself again so that
// polling continues.
// accountPath - must not be null, empty or whitespace.
// since       - passed to the listing calls; null by default.
309 public Task ProcessRemoteFiles(string accountPath,DateTime? since=null)
311 if (String.IsNullOrWhiteSpace(accountPath))
// NOTE(review): the single-string ArgumentNullException constructor expects the parameter NAME;
// this passes the (null or blank) value instead - "accountPath" was probably intended.
312 throw new ArgumentNullException(accountPath);
313 Contract.EndContractBlock();
315 using (log4net.ThreadContext.Stacks["SCHEDULE"].Push("Retrieve Remote"))
317 Log.Info("[LISTENER] Scheduled");
// Each listing is started through StartNewDelayed with a delay argument of 10000
// (presumably milliseconds - confirm).
319 //Get the list of server objects changed since the last check
320 var listObjects = Task<IList<ObjectInfo>>.Factory.StartNewDelayed(10000, () =>
321 CloudClient.ListObjects(CloudClient.UserName, PithosContainer, since));
322 //Get the list of deleted objects since the last check
323 var listTrash = Task<IList<ObjectInfo>>.Factory.StartNewDelayed(10000, () =>
324 CloudClient.ListObjects(CloudClient.UserName, TrashContainer, since));
// Objects shared with the current user
326 var listShared = Task<IList<ObjectInfo>>.Factory.StartNewDelayed(10000, () =>
327 CloudClient.ListSharedObjects(since));
329 var listAll = Task.Factory.TrackedSequence(
334 //Next time we will check for all changes since the current check minus 1 second
335 //This is done to ensure there are no discrepancies due to clock differences
// NOTE(review): DateTime.Now is local time - confirm the listing calls expect local rather than UTC.
336 DateTime nextSince = DateTime.Now.AddSeconds(-1);
339 var enqueueFiles = listAll.ContinueWith(task =>
343 //ListObjects failed at this point, need to reschedule
344 Log.ErrorFormat("[FAIL] ListObjects in ProcessRemoteFiles with {0}", task.Exception);
// Retry with the same 'since' so that no changes are skipped.
345 ProcessRemoteFiles(accountPath, since);
348 using (log4net.ThreadContext.Stacks["SCHEDULE"].Push("Process Results"))
// task.Result holds the three listing tasks in the order container, trash, shared.
350 var remoteObjects = ((Task<IList<ObjectInfo>>) task.Result[0]).Result;
351 var trashObjects = ((Task<IList<ObjectInfo>>) task.Result[1]).Result;
352 var sharedObjects = ((Task<IList<ObjectInfo>>) task.Result[2]).Result;
354 //Items with the same name, hash may be both in the container and the trash
355 //Don't delete items that exist in the container
// A trash entry is kept only when no container object has the same Hash (names are not compared here).
356 var realTrash = from trash in trashObjects
357 where !remoteObjects.Any(info => info.Hash == trash.Hash)
359 ProcessDeletedFiles(realTrash);
// Skip objects whose name ends with ".ignore" or starts with "fragments/".
362 var remote = from info in remoteObjects.Union(sharedObjects)
364 where !name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase) &&
365 !name.StartsWith("fragments/", StringComparison.InvariantCultureIgnoreCase)
368 //Create a list of actions from the remote files
369 var allActions = ObjectsToActions(remote);
371 //And remove those that are already being processed by the agent
372 var distinctActions = allActions
373 .Except(_agent.GetEnumerable(), new PithosMonitor.LocalFileComparer())
376 //Queue all the actions
377 foreach (var message in distinctActions)
382 //Report the number of new files
// Only unconditional downloads are counted as new files.
383 var remoteCount = distinctActions.Count(action=>
384 action.Action==CloudActionType.DownloadUnconditional);
385 if ( remoteCount > 0)
386 StatusNotification.NotifyChange(String.Format("Processing {0} new files", remoteCount));
388 Log.Info("[LISTENER] End Processing");
// Second continuation: log the outcome and schedule the next poll from nextSince.
392 var loop = enqueueFiles.ContinueWith(t =>
396 Log.Error("[LISTENER] Exception", t.Exception);
400 Log.Info("[LISTENER] Finished");
402 ProcessRemoteFiles(accountPath, nextSince);
409 //Creates an appropriate action for each server file
/// <summary>
/// Lazily produces one action per remote object: a MustSynch action when a local file
/// already exists at the object's relative path, or a download action when it does not
/// and the target path is not marked as Deleted.
/// </summary>
/// <param name="remote">Remote objects to convert; enumerated only once.</param>
/// <returns>A deferred sequence of actions.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="remote"/> is null. The check runs immediately, not on first enumeration.
/// </exception>
private IEnumerable<CloudAction> ObjectsToActions(IEnumerable<ObjectInfo> remote)
{
    //Validation lives in this non-iterator wrapper: inside an iterator body the
    //guard would not run until the caller started enumerating the result.
    if (remote == null)
        throw new ArgumentNullException("remote");
    Contract.EndContractBlock();

    return ObjectsToActionsCore(remote);
}

//Iterator behind ObjectsToActions; walks the remote objects exactly once.
private IEnumerable<CloudAction> ObjectsToActionsCore(IEnumerable<ObjectInfo> remote)
{
    foreach (var objectInfo in remote)
    {
        var relativePath = objectInfo.RelativeUrlToFilePath(CloudClient.UserName);
        if (FileAgent.Exists(relativePath))
        {
            //The file exists on both sides: it has to be checked on a per-case
            //basis to detect differences and decide which copy is newer
            var localFile = FileAgent.GetFileInfo(relativePath);
            var state = FileState.FindByFilePath(localFile.FullName);
            yield return new CloudAction(CloudActionType.MustSynch,
                                         localFile, objectInfo, state, BlockSize, BlockHash);
        }
        else
        {
            //There is no local copy. Download the remote file,
            //but only if the local path is not marked for deletion
            var targetFile = Path.Combine(FileAgent.RootPath, relativePath);
            var fileStatus = StatusKeeper.GetFileStatus(targetFile);
            if (fileStatus != FileStatus.Deleted)
            {
                yield return new CloudDownloadAction(objectInfo);
            }
        }
    }
}
/// <summary>
/// Deletes, through the FileAgent, the local file that corresponds to each trashed object.
/// </summary>
/// <param name="trashObjects">Objects whose local counterparts should be removed.</param>
private void ProcessDeletedFiles(IEnumerable<ObjectInfo> trashObjects)
{
    //Map every trashed object to its path relative to the current user's root.
    //The projection is lazy, so each path is computed right before its delete.
    var doomedPaths = trashObjects.Select(
        trashed => trashed.RelativeUrlToFilePath(CloudClient.UserName));

    foreach (var doomedPath in doomedPaths)
    {
        FileAgent.Delete(doomedPath);
    }
}
/// <summary>
/// Moves an object to a new name inside the same container after the local file has
/// already been renamed, updating the status of the new local path around the call.
/// </summary>
/// <param name="account">Account that owns the object.</param>
/// <param name="container">Container holding the object; it is both source and target of the move.</param>
/// <param name="oldFileName">Current name of the object on the server.</param>
/// <param name="newPath">Path of the renamed local file; used for status updates and the shell notification.</param>
/// <param name="newFileName">New name of the object on the server.</param>
/// <exception cref="ArgumentNullException">Any argument is null, empty or whitespace.</exception>
private void RenameCloudFile(string account, string container,string oldFileName, string newPath, string newFileName)
{
    if (String.IsNullOrWhiteSpace(account))
        throw new ArgumentNullException("account");
    if (String.IsNullOrWhiteSpace(container))
        throw new ArgumentNullException("container");
    if (String.IsNullOrWhiteSpace(oldFileName))
        throw new ArgumentNullException("oldFileName");
    //Each guard tests the argument it names. Previously the newPath and newFileName
    //guards re-tested oldFileName and so could never fire.
    if (String.IsNullOrWhiteSpace(newPath))
        throw new ArgumentNullException("newPath");
    if (String.IsNullOrWhiteSpace(newFileName))
        throw new ArgumentNullException("newFileName");
    Contract.EndContractBlock();

    //The local file is already renamed. Show it as modified while the server catches up
    this.StatusKeeper.SetFileOverlayStatus(newPath, FileOverlayStatus.Modified);

    CloudClient.MoveObject(account, container, oldFileName, container, newFileName);

    //The move succeeded: the file is in sync again
    this.StatusKeeper.SetFileStatus(newPath, FileStatus.Unchanged);
    this.StatusKeeper.SetFileOverlayStatus(newPath, FileOverlayStatus.Normal);
    NativeMethods.RaiseChangeNotification(newPath);
}
/// <summary>
/// Deletes an object on the server, passing the trash container to the client, and
/// clears the status stored for the corresponding local file.
/// </summary>
/// <param name="account">Account that owns the object.</param>
/// <param name="container">Container holding the object.</param>
/// <param name="fileName">Relative name of the object; must not be a rooted path.</param>
/// <exception cref="ArgumentNullException">An argument is null, empty or whitespace.</exception>
/// <exception cref="ArgumentException"><paramref name="fileName"/> is a rooted path.</exception>
private void DeleteCloudFile(string account,string container, string fileName)
{
    if (String.IsNullOrWhiteSpace(account))
        throw new ArgumentNullException("account");
    //The container guard used to appear twice (copy-paste); one check is enough
    if (String.IsNullOrWhiteSpace(container))
        throw new ArgumentNullException("container");
    if (String.IsNullOrWhiteSpace(fileName))
        throw new ArgumentNullException("fileName");
    if (Path.IsPathRooted(fileName))
        throw new ArgumentException("The fileName should not be rooted","fileName");
    Contract.EndContractBlock();

    using ( log4net.ThreadContext.Stacks["DeleteCloudFile"].Push("Delete"))
    {
        //Status entries are keyed by the lower-cased full path of the local file
        var info = FileAgent.GetFileInfo(fileName);
        var path = info.FullName.ToLower();
        this.StatusKeeper.SetFileOverlayStatus(path, FileOverlayStatus.Modified);

        CloudClient.DeleteObject(account, container, fileName, TrashContainer);

        this.StatusKeeper.ClearFileStatus(path);
    }
}
// Validates its arguments and starts the download of one cloud object to localPath by
// running DownloadIterator through Task.Factory.Iterate.
// Throws ArgumentNullException for a blank account, container or localPath or a null
// relativeUrl, and ArgumentException when localPath is not rooted.
508 private void DownloadCloudFile(string account,string container, Uri relativeUrl, string localPath)
510 if (String.IsNullOrWhiteSpace(account))
511 throw new ArgumentNullException("account");
512 if (String.IsNullOrWhiteSpace(container))
513 throw new ArgumentNullException("container");
514 if (relativeUrl == null)
515 throw new ArgumentNullException("relativeUrl");
516 if (String.IsNullOrWhiteSpace(localPath))
517 throw new ArgumentNullException("localPath");
518 if (!Path.IsPathRooted(localPath))
519 throw new ArgumentException("The localPath must be rooted", "localPath");
520 Contract.EndContractBlock();
// NOTE(review): the task is kept in a local; whether it is waited on before the method
// returns cannot be seen from the visible lines - confirm.
522 var download=Task.Factory.Iterate(DownloadIterator(account,container, relativeUrl, localPath));
// Task sequence that downloads one cloud object: it fetches the object's hashmap, downloads
// the file either in one request or block by block, then stores the object's metadata and
// notifies listeners that the local file changed.
// This method is an iterator (it contains yield return), so the guard clauses below run
// when enumeration starts, not when the method is called.
526 private IEnumerable<Task> DownloadIterator(string account,string container, Uri relativeUrl, string localPath)
528 if (String.IsNullOrWhiteSpace(account))
529 throw new ArgumentNullException("account");
530 if (String.IsNullOrWhiteSpace(container))
531 throw new ArgumentNullException("container");
532 if (relativeUrl==null)
533 throw new ArgumentNullException("relativeUrl");
534 if (String.IsNullOrWhiteSpace(localPath))
535 throw new ArgumentNullException("localPath");
536 if (!Path.IsPathRooted(localPath))
537 throw new ArgumentException("The localPath must be rooted", "localPath");
538 Contract.EndContractBlock();
540 var url = relativeUrl.ToString();
// Objects whose URL ends with ".ignore" get special handling (presumably they are skipped - confirm).
541 if (url.EndsWith(".ignore",StringComparison.InvariantCultureIgnoreCase))
544 //Are we already downloading or uploading the file?
545 using (var gate=NetworkGate.Acquire(localPath, NetworkOperation.Downloading))
549 //The file's hashmap will be stored in the same location with the extension .hashmap
550 //var hashPath = Path.Combine(FileAgent.FragmentsPath, relativePath + ".hashmap");
552 //Retrieve the hashmap from the server
553 var getHashMap = CloudClient.GetHashMap(account, container, url);
554 yield return getHashMap;
556 var serverHash=getHashMap.Result;
// A hashmap with exactly one hash is treated as a small file.
557 //If it's a small file
558 var downloadTask=(serverHash.Hashes.Count == 1 )
559 //Download it in one go
560 ? DownloadEntireFile(account,container, relativeUrl, localPath)
561 //Otherwise download it block by block
562 : DownloadWithBlocks(account,container, relativeUrl, localPath, serverHash);
564 yield return downloadTask;
567 //Retrieve the object's metadata
568 var info=CloudClient.GetObjectInfo(account, container, url);
// Store the server metadata for the local path.
570 StatusKeeper.StoreInfo(localPath, info);
572 //Notify listeners that a local file has changed
573 StatusNotification.NotifyChangedFile(localPath);
578 //Download a small file with a single GET operation
/// <summary>
/// Downloads a whole object with a single GET into a ".download" file under the
/// fragments folder, then replaces or creates the file at <paramref name="localPath"/>.
/// </summary>
/// <param name="account">Account that owns the object.</param>
/// <param name="container">Container holding the object.</param>
/// <param name="relativeUrl">Relative URL of the object.</param>
/// <param name="localPath">Rooted path of the final local file.</param>
/// <returns>A task that completes once the downloaded file is in its final location.</returns>
/// <exception cref="ArgumentNullException">
/// account, container or localPath is blank, or relativeUrl is null.
/// </exception>
/// <exception cref="ArgumentException"><paramref name="localPath"/> is not rooted.</exception>
private Task DownloadEntireFile(string account,string container, Uri relativeUrl, string localPath)
{
    if (String.IsNullOrWhiteSpace(account))
    {
        throw new ArgumentNullException("account");
    }
    if (String.IsNullOrWhiteSpace(container))
    {
        throw new ArgumentNullException("container");
    }
    if (relativeUrl == null)
    {
        throw new ArgumentNullException("relativeUrl");
    }
    if (String.IsNullOrWhiteSpace(localPath))
    {
        throw new ArgumentNullException("localPath");
    }
    if (!Path.IsPathRooted(localPath))
    {
        throw new ArgumentException("The localPath must be rooted", "localPath");
    }
    Contract.EndContractBlock();

    //The object is first written to <fragments>/<relative path>.download
    var stagingPath = Path.Combine(FileAgent.FragmentsPath,
                                   relativeUrl.RelativeUriToFilePath() + ".download");

    //The download itself does not create the target folder, so create it when missing
    var stagingFolder = Path.GetDirectoryName(stagingPath);
    if (!Directory.Exists(stagingFolder))
    {
        Directory.CreateDirectory(stagingFolder);
    }

    var fetch = CloudClient.GetObject(account, container, relativeUrl.ToString(), stagingPath);

    //Once the GET has finished, surface any failure and put the file in its final place
    return fetch.ContinueWith(t =>
    {
        t.PropagateExceptions();
        if (File.Exists(localPath))
        {
            File.Replace(stagingPath, localPath, null, true);
        }
        else
        {
            File.Move(stagingPath, localPath);
        }
    });
}
615 //Download a file asynchronously using blocks
/// <summary>
/// Downloads an object block by block by running BlockDownloadIterator as a task sequence.
/// </summary>
/// <param name="account">Account that owns the object.</param>
/// <param name="container">Container holding the object.</param>
/// <param name="relativeUrl">Relative URL of the object.</param>
/// <param name="localPath">Rooted path of the local file.</param>
/// <param name="serverHash">Tree hash of the object as reported by the server.</param>
/// <returns>The task created by Task.Factory.Iterate for the block sequence.</returns>
/// <exception cref="ArgumentNullException">
/// account, container or localPath is blank, or relativeUrl or serverHash is null.
/// </exception>
/// <exception cref="ArgumentException"><paramref name="localPath"/> is not rooted.</exception>
public Task DownloadWithBlocks(string account,string container, Uri relativeUrl, string localPath, TreeHash serverHash)
{
    if (String.IsNullOrWhiteSpace(account))
    {
        throw new ArgumentNullException("account");
    }
    if (String.IsNullOrWhiteSpace(container))
    {
        throw new ArgumentNullException("container");
    }
    if (relativeUrl == null)
    {
        throw new ArgumentNullException("relativeUrl");
    }
    if (String.IsNullOrWhiteSpace(localPath))
    {
        throw new ArgumentNullException("localPath");
    }
    if (!Path.IsPathRooted(localPath))
    {
        throw new ArgumentException("The localPath must be rooted", "localPath");
    }
    if (serverHash == null)
    {
        throw new ArgumentNullException("serverHash");
    }
    Contract.EndContractBlock();

    //Build the lazy block sequence, then let the task factory drive it
    var blockSteps = BlockDownloadIterator(account, container, relativeUrl, localPath, serverHash);
    return Task.Factory.Iterate(blockSteps);
}
// Task sequence that brings the local file in line with the server's tree hash: it hashes
// the local file, and for every server block hash that the local file does not contain it
// either reuses an orphan block through the BlockUpdater or downloads and stores the block,
// then commits the BlockUpdater.
// This method is an iterator (it contains yield return), so the guard clauses below run
// when enumeration starts, not when the method is called.
635 private IEnumerable<Task> BlockDownloadIterator(string account,string container,Uri relativeUrl, string localPath,TreeHash serverHash)
637 if (String.IsNullOrWhiteSpace(account))
638 throw new ArgumentNullException("account");
639 if (String.IsNullOrWhiteSpace(container))
640 throw new ArgumentNullException("container");
641 if (relativeUrl == null)
642 throw new ArgumentNullException("relativeUrl");
643 if (String.IsNullOrWhiteSpace(localPath))
644 throw new ArgumentNullException("localPath");
645 if (!Path.IsPathRooted(localPath))
646 throw new ArgumentException("The localPath must be rooted", "localPath");
648 throw new ArgumentNullException("serverHash");
649 Contract.EndContractBlock();
652 //Calculate the relative file path for the new file
653 var relativePath = relativeUrl.RelativeUriToFilePath();
654 var blockUpdater = new BlockUpdater(FileAgent.FragmentsPath, localPath, relativePath, serverHash);
658 //Calculate the file's treehash
659 var calcHash = Signature.CalculateTreeHashAsync(localPath, this.BlockSize,BlockHash);
660 yield return calcHash;
661 var treeHash = calcHash.Result;
663 //And compare it with the server's hash
664 var upHashes = serverHash.GetHashesAsStrings();
665 var localHashes = treeHash.HashDictionary;
666 for (int i = 0; i < upHashes.Length; i++)
668 //For every non-matching hash
669 var upHash = upHashes[i];
670 if (!localHashes.ContainsKey(upHash))
// First try to satisfy the block from an orphan already known to the BlockUpdater.
672 if (blockUpdater.UseOrphan(i, upHash))
674 Log.InfoFormat("[BLOCK GET] ORPHAN FOUND for {0} of {1} for {2}", i, upHashes.Length, localPath);
677 Log.InfoFormat("[BLOCK GET] START {0} of {1} for {2}", i, upHashes.Length, localPath);
// NOTE(review): i and BlockSize are both int, so this product (and the one for 'end' below)
// is computed in 32-bit arithmetic and can overflow for offsets beyond int.MaxValue unless
// the code is compiled in a checked context - consider widening to long.
678 var start = i*BlockSize;
679 //To download the last block just pass a null for the end of the range
681 if (i < upHashes.Length - 1 )
682 end= ((i + 1)*BlockSize) ;
684 //Download the missing block
685 var getBlock = CloudClient.GetBlock(account, container, relativeUrl, start, end);
686 yield return getBlock;
687 var block = getBlock.Result;
// Hand the downloaded block to the BlockUpdater and wait for the store to finish.
690 yield return blockUpdater.StoreBlock(i, block);
693 Log.InfoFormat("[BLOCK GET] FINISH {0} of {1} for {2}", i, upHashes.Length, localPath);
// All blocks have been handled: let the BlockUpdater finalise the file.
697 blockUpdater.Commit();
698 Log.InfoFormat("[BLOCK GET] COMPLETE {0}", localPath);
// Validates its arguments and starts the upload of one local file by running UploadIterator
// through Task.Factory.Iterate. Both hashes are lower-cased before they are handed over.
// Throws ArgumentNullException("account" / "container" / "fileInfo" / "hash" / "topHash")
// from the guards below.
702 private void UploadCloudFile(string account,string container,FileInfo fileInfo, string hash,string topHash)
704 if (String.IsNullOrWhiteSpace(account))
705 throw new ArgumentNullException("account");
706 if (String.IsNullOrWhiteSpace(container))
707 throw new ArgumentNullException("container");
708 if (fileInfo == null)
709 throw new ArgumentNullException("fileInfo");
710 if (String.IsNullOrWhiteSpace(hash))
711 throw new ArgumentNullException("hash");
713 throw new ArgumentNullException("topHash");
714 Contract.EndContractBlock();
// NOTE(review): the task is kept in a local; whether it is waited on before the method
// returns cannot be seen from the visible lines - confirm.
716 var upload = Task.Factory.Iterate(UploadIterator(account,container,fileInfo, hash.ToLower(), topHash.ToLower()));
// Task sequence that uploads one local file: it compares the file's hashes with the hash
// reported by the server, and when they differ uploads the file - through a hashmap upload
// when the file is longer than BlockSize, otherwise with a regular PUT - and finally resets
// the file's status and notifies the shell and the listeners.
// This method is an iterator (it contains yield return), so the guard clauses below run
// when enumeration starts, not when the method is called.
720 private IEnumerable<Task> UploadIterator(string account,string container,FileInfo fileInfo, string hash,string topHash)
722 if (String.IsNullOrWhiteSpace(account))
723 throw new ArgumentNullException("account");
724 if (String.IsNullOrWhiteSpace(container))
725 throw new ArgumentNullException("container");
726 if (fileInfo == null)
727 throw new ArgumentNullException("fileInfo");
728 if (String.IsNullOrWhiteSpace(hash))
729 throw new ArgumentNullException("hash");
731 throw new ArgumentNullException("topHash");
732 Contract.EndContractBlock();
// NOTE(review): assuming fileInfo is System.IO.FileInfo, Extension includes the leading
// period (".ignore"), so a comparison against "ignore" can never succeed - ".ignore" was
// probably intended. Confirm and fix.
734 if (fileInfo.Extension.Equals("ignore", StringComparison.InvariantCultureIgnoreCase))
737 var url = fileInfo.AsRelativeUrlTo(FileAgent.RootPath);
739 var fullFileName = fileInfo.FullName;
740 using(var gate=NetworkGate.Acquire(fullFileName,NetworkOperation.Uploading))
742 //Abort if the file is already being uploaded or downloaded
747 //Even if GetObjectInfo times out, we can proceed with the upload
// NOTE(review): info.Hash is dereferenced unconditionally on the next line; if GetObjectInfo
// can return null or an object without a hash, this throws instead of proceeding - confirm.
748 var info = CloudClient.GetObjectInfo(account, container, url);
749 var cloudHash = info.Hash.ToLower();
751 //If the file hashes match, abort the upload
752 if (hash == cloudHash || topHash ==cloudHash)
754 //but store any metadata changes
755 this.StatusKeeper.StoreInfo(fullFileName, info);
756 Log.InfoFormat("Skip upload of {0}, hashes match", fullFileName);
760 //Mark the file as modified while we upload it
761 StatusKeeper.SetFileOverlayStatus(fullFileName, FileOverlayStatus.Modified);
764 //If the file is larger than the block size, try a hashmap PUT
765 if (fileInfo.Length > BlockSize )
767 //To upload using a hashmap
768 //First, calculate the tree hash
// treeHash is the calculation task; it is yielded first, then passed on to UploadWithHashMap.
769 var treeHash = Signature.CalculateTreeHashAsync(fileInfo.FullName, BlockSize, BlockHash);
770 yield return treeHash;
772 yield return Task.Factory.Iterate(UploadWithHashMap(account,container,fileInfo,url,treeHash));
777 //Otherwise do a regular PUT
778 yield return CloudClient.PutObject(account, container, url, fullFileName, hash);
780 //If everything succeeds, change the file and overlay status to normal
781 this.StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal);
783 //Notify the Shell to update the overlays
784 NativeMethods.RaiseChangeNotification(fullFileName);
785 StatusNotification.NotifyChangedFile(fullFileName);
788 public IEnumerable<Task> UploadWithHashMap(string account,string container,FileInfo fileInfo,string url,Task<TreeHash> treeHash)
790 if (String.IsNullOrWhiteSpace(account))
791 throw new ArgumentNullException("account");
792 if (String.IsNullOrWhiteSpace(container))
793 throw new ArgumentNullException("container");
794 if (fileInfo == null)
795 throw new ArgumentNullException("fileInfo");
796 if (String.IsNullOrWhiteSpace(url))
797 throw new ArgumentNullException(url);
799 throw new ArgumentNullException("treeHash");
800 Contract.EndContractBlock();
802 var fullFileName = fileInfo.FullName;
804 //Send the hashmap to the server
805 var hashPut = CloudClient.PutHashMap(account, container, url, treeHash.Result);
806 yield return hashPut;
808 var missingHashes = hashPut.Result;
809 //If the server returns no missing hashes, we are done
810 while (missingHashes.Count > 0)
813 var buffer = new byte[BlockSize];
814 foreach (var missingHash in missingHashes)
816 //Find the proper block
817 var blockIndex = treeHash.Result.HashDictionary[missingHash];
818 var offset = blockIndex*BlockSize;
820 var read = fileInfo.Read(buffer, offset, BlockSize);
822 //And upload the block
823 var postBlock = CloudClient.PostBlock(account, container, buffer, 0, read);
825 //We have to handle possible exceptions in a continuation because
826 //*yield return* can't appear inside a try block
827 yield return postBlock.ContinueWith(t =>
829 exc => Log.ErrorFormat("[ERROR] uploading block {0} of {1}\n{2}", blockIndex, fullFileName, exc),
830 ()=>Log.InfoFormat("[BLOCK] Block {0} of {1} uploaded", blockIndex,fullFileName)));
833 //Repeat until there are no more missing hashes
834 hashPut = CloudClient.PutHashMap(account, container, url, treeHash.Result);
835 yield return hashPut;
836 missingHashes = hashPut.Result;