using System; using System.Collections.Generic; using System.ComponentModel.Composition; using System.Diagnostics.Contracts; using System.IO; using System.Linq; using System.Reflection; using System.Threading; using System.Threading.Tasks; using Pithos.Interfaces; using Pithos.Network; using log4net; namespace Pithos.Core.Agents { [Export(typeof(Downloader))] public class Downloader { private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); [Import] private IStatusKeeper StatusKeeper { get; set; } public IStatusNotification StatusNotification { get; set; } /* private CancellationTokenSource _cts=new CancellationTokenSource(); public void SignalStop() { _cts.Cancel(); } */ //Download a file. public async Task DownloadCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile, string filePath,CancellationToken cancellationToken) { if (accountInfo == null) throw new ArgumentNullException("accountInfo"); if (cloudFile == null) throw new ArgumentNullException("cloudFile"); if (String.IsNullOrWhiteSpace(cloudFile.Account)) throw new ArgumentNullException("cloudFile"); if (String.IsNullOrWhiteSpace(cloudFile.Container)) throw new ArgumentNullException("cloudFile"); if (String.IsNullOrWhiteSpace(filePath)) throw new ArgumentNullException("filePath"); if (!Path.IsPathRooted(filePath)) throw new ArgumentException("The filePath must be rooted", "filePath"); Contract.EndContractBlock(); using (ThreadContext.Stacks["Operation"].Push("DownloadCloudFile")) { // var cancellationToken=_cts.Token;// .ThrowIfCancellationRequested(); cancellationToken.ThrowIfCancellationRequested(); await UnpauseEvent.WaitAsync(); var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath); var relativeUrl = new Uri(cloudFile.Name, UriKind.Relative); var url = relativeUrl.ToString(); if (cloudFile.Name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase)) return; if (!Selectives.IsSelected(cloudFile)) return; //Are we already downloading or uploading the file? using (var gate = NetworkGate.Acquire(localPath, NetworkOperation.Downloading)) { if (gate.Failed) return; var client = new CloudFilesClient(accountInfo); var account = cloudFile.Account; var container = cloudFile.Container; if (cloudFile.IsDirectory) { if (!Directory.Exists(localPath)) try { Directory.CreateDirectory(localPath); if (Log.IsDebugEnabled) Log.DebugFormat("Created Directory [{0}]", localPath); } catch (IOException) { var localInfo = new FileInfo(localPath); if (localInfo.Exists && localInfo.Length == 0) { Log.WarnFormat("Malformed directory object detected for [{0}]", localPath); localInfo.Delete(); Directory.CreateDirectory(localPath); if (Log.IsDebugEnabled) Log.DebugFormat("Created Directory [{0}]", localPath); } } } else { var isChanged = IsObjectChanged(cloudFile, localPath); if (isChanged) { //Retrieve the hashmap from the server var serverHash = await client.GetHashMap(account, container, url); //If it's a small file if (serverHash.Hashes.Count == 1) //Download it in one go await DownloadEntireFileAsync(accountInfo, client, cloudFile, relativeUrl, localPath,cancellationToken); //Otherwise download it block by block else await DownloadWithBlocks(accountInfo, client, cloudFile, relativeUrl, localPath, serverHash,cancellationToken); if (!cloudFile.IsWritable(accountInfo.UserName)) { var attributes = File.GetAttributes(localPath); File.SetAttributes(localPath, attributes | FileAttributes.ReadOnly); } } } //Now we can store the object's metadata without worrying about ghost status entries StatusKeeper.StoreInfo(localPath, cloudFile); } } } //Download a file asynchronously using blocks public async Task DownloadWithBlocks(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath, TreeHash serverHash, CancellationToken cancellationToken) { if (client == null) throw new ArgumentNullException("client"); if (cloudFile == null) throw new ArgumentNullException("cloudFile"); if (relativeUrl == null) throw new ArgumentNullException("relativeUrl"); if (String.IsNullOrWhiteSpace(filePath)) throw new ArgumentNullException("filePath"); if (!Path.IsPathRooted(filePath)) throw new ArgumentException("The filePath must be rooted", "filePath"); if (serverHash == null) throw new ArgumentNullException("serverHash"); if (cloudFile.IsDirectory) throw new ArgumentException("cloudFile is a directory, not a file", "cloudFile"); Contract.EndContractBlock(); cancellationToken.ThrowIfCancellationRequested(); await UnpauseEvent.WaitAsync(); var fileAgent = GetFileAgent(accountInfo); var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath); //Calculate the relative file path for the new file var relativePath = relativeUrl.RelativeUriToFilePath(); var blockUpdater = new BlockUpdater(fileAgent.CachePath, localPath, relativePath, serverHash); StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing, String.Format("Calculating hashmap for {0} before download", Path.GetFileName(localPath))); //Calculate the file's treehash //TODO: Should pass cancellation token here var treeHash = await Signature.CalculateTreeHashAsync(localPath, serverHash.BlockSize, serverHash.BlockHash, 2); //And compare it with the server's hash var upHashes = serverHash.GetHashesAsStrings(); var localHashes = treeHash.HashDictionary; ReportDownloadProgress(Path.GetFileName(localPath), 0, upHashes.Length, cloudFile.Bytes); for (int i = 0; i < upHashes.Length; i++) { cancellationToken.ThrowIfCancellationRequested(); await UnpauseEvent.WaitAsync(); //For every non-matching hash var upHash = upHashes[i]; if (!localHashes.ContainsKey(upHash)) { StatusNotification.Notify(new CloudNotification { Data = cloudFile }); if (blockUpdater.UseOrphan(i, upHash)) { Log.InfoFormat("[BLOCK GET] ORPHAN FOUND for {0} of {1} for {2}", i, upHashes.Length, localPath); continue; } Log.InfoFormat("[BLOCK GET] START {0} of {1} for {2}", i, upHashes.Length, localPath); var start = i * serverHash.BlockSize; //To download the last block just pass a null for the end of the range long? end = null; if (i < upHashes.Length - 1) end = ((i + 1) * serverHash.BlockSize); //TODO: Pass token here //Download the missing block var block = await client.GetBlock(cloudFile.Account, cloudFile.Container, relativeUrl, start, end,cancellationToken); //and store it blockUpdater.StoreBlock(i, block); Log.InfoFormat("[BLOCK GET] FINISH {0} of {1} for {2}", i, upHashes.Length, localPath); } ReportDownloadProgress(Path.GetFileName(localPath), i, upHashes.Length, cloudFile.Bytes); } //Want to avoid notifications if no changes were made var hasChanges = blockUpdater.HasBlocks; blockUpdater.Commit(); if (hasChanges) //Notify listeners that a local file has changed StatusNotification.NotifyChangedFile(localPath); Log.InfoFormat("[BLOCK GET] COMPLETE {0}", localPath); } //Download a small file with a single GET operation private async Task DownloadEntireFileAsync(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath, CancellationToken cancellationToken) { if (client == null) throw new ArgumentNullException("client"); if (cloudFile == null) throw new ArgumentNullException("cloudFile"); if (relativeUrl == null) throw new ArgumentNullException("relativeUrl"); if (String.IsNullOrWhiteSpace(filePath)) throw new ArgumentNullException("filePath"); if (!Path.IsPathRooted(filePath)) throw new ArgumentException("The localPath must be rooted", "filePath"); if (cloudFile.IsDirectory) throw new ArgumentException("cloudFile is a directory, not a file", "cloudFile"); Contract.EndContractBlock(); cancellationToken.ThrowIfCancellationRequested(); await UnpauseEvent.WaitAsync(); var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath); StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing, String.Format("Downloading {0}", Path.GetFileName(localPath))); StatusNotification.Notify(new CloudNotification { Data = cloudFile }); var fileAgent = GetFileAgent(accountInfo); //Calculate the relative file path for the new file var relativePath = relativeUrl.RelativeUriToFilePath(); //The file will be stored in a temporary location while downloading with an extension .download var tempPath = Path.Combine(fileAgent.CachePath, relativePath + ".download"); //Make sure the target folder exists. DownloadFileTask will not create the folder var tempFolder = Path.GetDirectoryName(tempPath); if (!Directory.Exists(tempFolder)) Directory.CreateDirectory(tempFolder); //TODO: Should pass the token here //Download the object to the temporary location await client.GetObject(cloudFile.Account, cloudFile.Container, relativeUrl.ToString(), tempPath,cancellationToken); //Create the local folder if it doesn't exist (necessary for shared objects) var localFolder = Path.GetDirectoryName(localPath); if (!Directory.Exists(localFolder)) try { Directory.CreateDirectory(localFolder); } catch (IOException) { //A file may already exist that has the same name as the new folder. //This may be an artifact of the way Pithos handles directories var fileInfo = new FileInfo(localFolder); if (fileInfo.Exists && fileInfo.Length == 0) { Log.WarnFormat("Malformed directory object detected for [{0}]", localFolder); fileInfo.Delete(); Directory.CreateDirectory(localFolder); } else throw; } //And move it to its actual location once downloading is finished if (File.Exists(localPath)) File.Replace(tempPath, localPath, null, true); else File.Move(tempPath, localPath); //Notify listeners that a local file has changed StatusNotification.NotifyChangedFile(localPath); } private void ReportDownloadProgress(string fileName, int block, int totalBlocks, long fileSize) { StatusNotification.Notify(totalBlocks == 0 ? new ProgressNotification(fileName, "Downloading", 1, 1, fileSize) : new ProgressNotification(fileName, "Downloading", block, totalBlocks, fileSize)); } private bool IsObjectChanged(ObjectInfo cloudFile, string localPath) { //If the target is a directory, there are no changes to download if (Directory.Exists(localPath)) return false; //If the file doesn't exist, we have a chagne if (!File.Exists(localPath)) return true; //If there is no stored state, we have a change var localState = StatusKeeper.GetStateByFilePath(localPath); if (localState == null) return true; var info = new FileInfo(localPath); var shortHash = info.ComputeShortHash(); //If the file is different from the stored state, we have a change if (localState.ShortHash != shortHash) return true; //If the top hashes differ, we have a change return (localState.Checksum != cloudFile.Hash); } private static FileAgent GetFileAgent(AccountInfo accountInfo) { return AgentLocator.Get(accountInfo.AccountPath); } [Import] public Selectives Selectives { get; set; } public AsyncManualResetEvent UnpauseEvent { get; set; } } }