using System; using System.ComponentModel.Composition; using System.Diagnostics.Contracts; using System.IO; using System.Reflection; using System.Threading; using System.Threading.Tasks; using Pithos.Interfaces; using Pithos.Network; using log4net; namespace Pithos.Core.Agents { [Export(typeof(Downloader))] public class Downloader { private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); [Import] private IStatusKeeper StatusKeeper { get; set; } [Import] private IPithosSettings Settings { get; set; } public IStatusNotification StatusNotification { get; set; } /* private CancellationTokenSource _cts=new CancellationTokenSource(); public void SignalStop() { _cts.Cancel(); } */ //Download a file. public async Task DownloadCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile, string filePath,CancellationToken cancellationToken) { if (accountInfo == null) throw new ArgumentNullException("accountInfo"); if (cloudFile == null) throw new ArgumentNullException("cloudFile"); if (String.IsNullOrWhiteSpace(cloudFile.Account)) throw new ArgumentNullException("cloudFile"); if (cloudFile.Container==null) throw new ArgumentNullException("cloudFile"); if (cloudFile.Container.IsAbsoluteUri) throw new ArgumentNullException("cloudFile"); if (String.IsNullOrWhiteSpace(filePath)) throw new ArgumentNullException("filePath"); if (!Path.IsPathRooted(filePath)) throw new ArgumentException("The filePath must be rooted", "filePath"); Contract.EndContractBlock(); using (ThreadContext.Stacks["Operation"].Push("DownloadCloudFile")) { // var cancellationToken=_cts.Token;// .ThrowIfCancellationRequested(); //The file's treehash after download completes. For directories, the treehash is always the empty hash var finalHash = TreeHash.Empty; if (await WaitOrAbort(accountInfo,cloudFile, cancellationToken).ConfigureAwait(false)) return finalHash; var fileName = Path.GetFileName(filePath); var info = FileInfoExtensions.FromPath(filePath).WithProperCapitalization(); TreeHash localTreeHash; using (StatusNotification.GetNotifier("Hashing for Download {0}", "Hashed for Download {0}", fileName)) { var state = StatusKeeper.GetStateByFilePath(filePath); var progress = new Progress(d => StatusNotification.Notify(new StatusNotification(String.Format("Hashing for Download {0} of {1}", d, fileName)))); localTreeHash = StatusAgent.CalculateTreeHash(info, accountInfo, state, Settings.HashingParallelism, cancellationToken, progress); } var localPath = info.FullName; var relativeUrl = cloudFile.Name; var url = relativeUrl.ToString(); if (url.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase)) return finalHash; if (!Selectives.IsSelected(accountInfo,cloudFile)) return finalHash; //Are we already downloading or uploading the file? using (var gate = NetworkGate.Acquire(localPath, NetworkOperation.Downloading)) { if (gate.Failed) return finalHash; var client = new CloudFilesClient(accountInfo); var account = cloudFile.Account; var container = cloudFile.Container; if (cloudFile.IsDirectory) { if (!Directory.Exists(localPath)) try { Directory.CreateDirectory(localPath); if (Log.IsDebugEnabled) Log.DebugFormat("Created Directory [{0}]", localPath); } catch (IOException) { var localInfo = new FileInfo(localPath); if (localInfo.Exists && localInfo.Length == 0) { Log.WarnFormat("Malformed directory object detected for [{0}]", localPath); localInfo.Delete(); Directory.CreateDirectory(localPath); if (Log.IsDebugEnabled) Log.DebugFormat("Created Directory [{0}]", localPath); } } } else { var isChanged = IsObjectChanged(cloudFile, localPath,localTreeHash); if (isChanged) { //Retrieve the hashmap from the server var serverHash = await client.GetHashMap(account, container, relativeUrl).ConfigureAwait(false); //If it's a small file if (serverHash.Hashes.Count == 1) //Download it in one go await DownloadEntireFileAsync(accountInfo, client, cloudFile, relativeUrl, localPath,cancellationToken); //Otherwise download it block by block else await DownloadWithBlocks(accountInfo, client, cloudFile, relativeUrl, localPath,localTreeHash, serverHash,cancellationToken); if (!cloudFile.IsWritable(accountInfo.UserName)) { var attributes = File.GetAttributes(localPath); File.SetAttributes(localPath, attributes | FileAttributes.ReadOnly); } //Once download completes, the final hash will be equal to the server hash finalHash = serverHash; } } //Now we can store the object's metadata without worrying about ghost status entries StatusKeeper.StoreInfo(localPath, cloudFile,finalHash); } return finalHash; } } //Download a file asynchronously using blocks public async Task DownloadWithBlocks(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath, TreeHash localTreeHash,TreeHash serverHash, CancellationToken cancellationToken) { if (client == null) throw new ArgumentNullException("client"); if (cloudFile == null) throw new ArgumentNullException("cloudFile"); if (relativeUrl == null) throw new ArgumentNullException("relativeUrl"); if (String.IsNullOrWhiteSpace(filePath)) throw new ArgumentNullException("filePath"); if (!Path.IsPathRooted(filePath)) throw new ArgumentException("The filePath must be rooted", "filePath"); if (serverHash == null) throw new ArgumentNullException("serverHash"); if (cloudFile.IsDirectory) throw new ArgumentException("cloudFile is a directory, not a file", "cloudFile"); Contract.EndContractBlock(); if (await WaitOrAbort(accountInfo, cloudFile, cancellationToken).ConfigureAwait(false)) return; var fileAgent = GetFileAgent(accountInfo); var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath); //Calculate the relative file path for the new file var relativePath = relativeUrl.RelativeUriToFilePath(); var blockUpdater = new BlockUpdater(fileAgent.CachePath, localPath, relativePath, serverHash); StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing, String.Format("Calculating hashmap for {0} before download", Path.GetFileName(localPath))); //Calculate the file's treehash var fileName = Path.GetFileName(localPath); var progress = new Progress(d => StatusNotification.Notify(new StatusNotification(String.Format("Hashing for Download {0} of {1}", d, fileName)))); var treeHash = localTreeHash ?? Signature.CalculateTreeHashAsync(localPath, (int)serverHash.BlockSize, serverHash.BlockHash, Settings.HashingParallelism,cancellationToken,progress); //And compare it with the server's hash var upHashes = serverHash.GetHashesAsStrings(); var localHashes = treeHash.HashDictionary; ReportDownloadProgress(Path.GetFileName(localPath), 0,0, upHashes.Length, cloudFile.Bytes); long i = 0; client.DownloadProgressChanged += (sender, args) => ReportDownloadProgress(Path.GetFileName(localPath), i, args.ProgressPercentage, upHashes.Length, cloudFile.Bytes); for (i = 0; i < upHashes.Length; i++) { if (await WaitOrAbort(accountInfo, cloudFile, cancellationToken).ConfigureAwait(false)) return; //For every non-matching hash var upHash = upHashes[i]; if (!localHashes.ContainsKey(upHash)) { StatusNotification.Notify(new CloudNotification { Data = cloudFile }); ReportDownloadProgress(Path.GetFileName(localPath), i, 0,upHashes.Length, cloudFile.Bytes); if (blockUpdater.UseOrphan(i, upHash)) { Log.InfoFormat("[BLOCK GET] ORPHAN FOUND for {0} of {1} for {2}", i, upHashes.Length, localPath); continue; } Log.InfoFormat("[BLOCK GET] START {0} of {1} for {2}", i, upHashes.Length, localPath); long start = i * serverHash.BlockSize; //To download the last block just pass a null for the end of the range long? end = null; if (i < upHashes.Length - 1) end = ((i + 1) * serverHash.BlockSize); //Download the missing block byte[] block = await client.GetBlock(cloudFile.Account, cloudFile.Container, relativeUrl, start, end, cancellationToken).ConfigureAwait(false); //and store it blockUpdater.StoreBlock(i, block); Log.InfoFormat("[BLOCK GET] FINISH {0} of {1} for {2}", i, upHashes.Length, localPath); } ReportDownloadProgress(Path.GetFileName(localPath), i, 100,upHashes.Length, cloudFile.Bytes); } //Want to avoid notifications if no changes were made var hasChanges = blockUpdater.HasBlocks; blockUpdater.Commit(); if (hasChanges) //Notify listeners that a local file has changed StatusNotification.NotifyChangedFile(localPath); Log.InfoFormat("[BLOCK GET] COMPLETE {0}", localPath); } //Download a small file with a single GET operation private async Task DownloadEntireFileAsync(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath, CancellationToken cancellationToken) { if (client == null) throw new ArgumentNullException("client"); if (cloudFile == null) throw new ArgumentNullException("cloudFile"); if (relativeUrl == null) throw new ArgumentNullException("relativeUrl"); if (String.IsNullOrWhiteSpace(filePath)) throw new ArgumentNullException("filePath"); if (!Path.IsPathRooted(filePath)) throw new ArgumentException("The localPath must be rooted", "filePath"); if (cloudFile.IsDirectory) throw new ArgumentException("cloudFile is a directory, not a file", "cloudFile"); Contract.EndContractBlock(); if (await WaitOrAbort(accountInfo, cloudFile, cancellationToken).ConfigureAwait(false)) return; var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath); StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing, String.Format("Downloading {0}", Path.GetFileName(localPath))); StatusNotification.Notify(new CloudNotification { Data = cloudFile }); ReportDownloadProgress(Path.GetFileName(localPath), 1, 0,1, cloudFile.Bytes); var fileAgent = GetFileAgent(accountInfo); //Calculate the relative file path for the new file var relativePath = relativeUrl.RelativeUriToFilePath(); //The file will be stored in a temporary location while downloading with an extension .download var tempPath = Path.Combine(fileAgent.CachePath, relativePath + ".download"); //Make sure the target folder exists. DownloadFileTask will not create the folder var tempFolder = Path.GetDirectoryName(tempPath); if (!Directory.Exists(tempFolder)) Directory.CreateDirectory(tempFolder); //Download the object to the temporary location await client.GetObject(cloudFile.Account, cloudFile.Container, relativeUrl, tempPath, cancellationToken).ConfigureAwait(false); //Create the local folder if it doesn't exist (necessary for shared objects) var localFolder = Path.GetDirectoryName(localPath); if (!Directory.Exists(localFolder)) try { Directory.CreateDirectory(localFolder); } catch (IOException) { //A file may already exist that has the same name as the new folder. //This may be an artifact of the way Pithos handles directories var fileInfo = new FileInfo(localFolder); if (fileInfo.Exists && fileInfo.Length == 0) { Log.WarnFormat("Malformed directory object detected for [{0}]", localFolder); fileInfo.Delete(); Directory.CreateDirectory(localFolder); } else throw; } //And move it to its actual location once downloading is finished if (File.Exists(localPath)) File.Replace(tempPath, localPath, null, true); else File.Move(tempPath, localPath); //Notify listeners that a local file has changed StatusNotification.NotifyChangedFile(localPath); } private void ReportDownloadProgress(string fileName, long block, int blockPercentage,int totalBlocks, long fileSize) { StatusNotification.Notify(totalBlocks == 0 ? new ProgressNotification(fileName, "Downloading", 1, blockPercentage,1, fileSize) : new ProgressNotification(fileName, "Downloading", block, blockPercentage, totalBlocks, fileSize)); } private bool IsObjectChanged(ObjectInfo cloudFile, string localPath,TreeHash localTreeHash) { //If the target is a directory, there are no changes to download if (Directory.Exists(localPath)) return false; //If the file doesn't exist, we have a chagne if (!File.Exists(localPath)) return true; //If there is no stored state, we have a change var localState = StatusKeeper.GetStateByFilePath(localPath); if (localState == null) return true; var localHash= localTreeHash.TopHash.ToHashString(); //If the file is different from the stored state, we have a change if (localState.Checksum != localHash) return true; //If the top hashes differ, we have a change return (localState.Checksum != cloudFile.X_Object_Hash); } private static FileAgent GetFileAgent(AccountInfo accountInfo) { return AgentLocator.Get(accountInfo.AccountPath); } private async Task WaitOrAbort(AccountInfo account,ObjectInfo cloudFile, CancellationToken token) { token.ThrowIfCancellationRequested(); await UnpauseEvent.WaitAsync().ConfigureAwait(false); var shouldAbort = !Selectives.IsSelected(account,cloudFile); if (shouldAbort) Log.InfoFormat("Aborting [{0}]", cloudFile.Uri); return shouldAbort; } [Import] public Selectives Selectives { get; set; } public AsyncManualResetEvent UnpauseEvent { get; set; } } }