2 using System.Collections.Generic;
\r
3 using System.ComponentModel.Composition;
\r
4 using System.Diagnostics.Contracts;
\r
7 using System.Reflection;
\r
8 using System.Threading;
\r
9 using System.Threading.Tasks;
\r
10 using Pithos.Interfaces;
\r
11 using Pithos.Network;
\r
14 namespace Pithos.Core.Agents
\r
/// <summary>
/// Downloads cloud objects to the local file system, either with a single GET
/// or block by block. Exported for composition under its own type.
/// </summary>
// NOTE(review): this listing carries embedded line numbers with gaps; lines that are
// not shown (braces, possibly attributes) are not documented here.
16 [Export(typeof(Downloader))]
\r
17 public class Downloader
\r
// Logger obtained from LogManager for the type that declares this member.
19 private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
\r
// Used by this class to look up stored file state (GetStateByFilePath) and to
// store object metadata after a download (StoreInfo).
// NOTE(review): how this private property gets assigned is not visible here - confirm.
22 private IStatusKeeper StatusKeeper { get; set; }
\r
// Receives the status, progress and changed-file notifications raised during downloads.
25 public IStatusNotification StatusNotification { get; set; }
\r
// Cancellation source owned by this instance. In the visible code it is referenced
// only from a commented-out line in DownloadCloudFile; presumably SignalStop uses it - confirm.
28 private CancellationTokenSource _cts=new CancellationTokenSource();
\r
/// <summary>
/// Signals the downloader to stop.
/// </summary>
// NOTE(review): the body of this method is not visible in this listing; presumably it
// cancels _cts - confirm against the full file.
30 public void SignalStop()
\r
/// <summary>
/// Downloads the cloud object described by <paramref name="cloudFile"/> to the rooted
/// local path <paramref name="filePath"/>. Directory objects lead to a local directory
/// being created; for file objects the hashmap is fetched from the server and the content
/// is downloaded in one request (single hash) or block by block. Afterwards the object's
/// metadata is stored through StatusKeeper.
/// </summary>
/// <param name="accountInfo">Account used to create the client and to check selection and write access.</param>
/// <param name="cloudFile">The cloud object to download; its Account and Container must not be blank.</param>
/// <param name="filePath">Rooted local target path.</param>
/// <param name="cancellationToken">Token checked by WaitOrAbort and passed on to the download helpers.</param>
/// <exception cref="ArgumentNullException">
/// accountInfo or cloudFile is null, cloudFile.Account or cloudFile.Container is blank, or filePath is blank.
/// </exception>
/// <exception cref="ArgumentException">filePath is not rooted.</exception>
// NOTE(review): several statements of this method (braces, the statements guarded by some
// of the checks below, any else/return lines) are not visible in this listing. Comments on
// those spots are hedged and should be confirmed against the full file.
38 public async Task DownloadCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile, string filePath,CancellationToken cancellationToken)
\r
40 if (accountInfo == null)
\r
41 throw new ArgumentNullException("accountInfo");
\r
42 if (cloudFile == null)
\r
43 throw new ArgumentNullException("cloudFile");
\r
// A blank Account or Container is reported against the cloudFile argument.
44 if (String.IsNullOrWhiteSpace(cloudFile.Account))
\r
45 throw new ArgumentNullException("cloudFile");
\r
46 if (String.IsNullOrWhiteSpace(cloudFile.Container))
\r
47 throw new ArgumentNullException("cloudFile");
\r
48 if (String.IsNullOrWhiteSpace(filePath))
\r
49 throw new ArgumentNullException("filePath");
\r
50 if (!Path.IsPathRooted(filePath))
\r
51 throw new ArgumentException("The filePath must be rooted", "filePath");
\r
// Marks the if/throw checks above as the method's precondition block for code contracts.
52 Contract.EndContractBlock();
\r
// Pushes "DownloadCloudFile" onto the "Operation" ThreadContext stack; the using scope
// disposes the value returned by Push.
53 using (ThreadContext.Stacks["Operation"].Push("DownloadCloudFile"))
\r
55 // var cancellationToken=_cts.Token;// .ThrowIfCancellationRequested();
\r
// WaitOrAbort throws if cancellation was requested, waits on UnpauseEvent and returns
// true when the object is not selected. The guarded statement is not visible here
// (presumably an early return - confirm).
57 if (await WaitOrAbort(accountInfo,cloudFile, cancellationToken))
\r
// Presumably normalizes the casing of the path to match the file system - confirm.
61 var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath);
\r
62 var relativeUrl = new Uri(cloudFile.Name, UriKind.Relative);
\r
// String form of the relative URL, passed to GetHashMap below.
64 var url = relativeUrl.ToString();
\r
// Objects whose name ends in ".ignore" (case-insensitive, invariant culture) get special
// handling; the guarded statement is not visible (presumably they are skipped - confirm).
65 if (cloudFile.Name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase))
\r
// Objects that are not selected for this account also get special handling; the guarded
// statement is not visible (presumably they are skipped - confirm).
68 if (!Selectives.IsSelected(accountInfo,cloudFile))
\r
72 //Are we already downloading or uploading the file?
\r
// The gate is acquired for localPath with the Downloading operation and disposed at the
// end of the using scope. How a failed acquisition is handled is not visible here.
73 using (var gate = NetworkGate.Acquire(localPath, NetworkOperation.Downloading))
\r
78 var client = new CloudFilesClient(accountInfo);
\r
79 var account = cloudFile.Account;
\r
80 var container = cloudFile.Container;
\r
// Directory objects: make sure a local directory exists at localPath.
82 if (cloudFile.IsDirectory)
\r
84 if (!Directory.Exists(localPath))
\r
87 Directory.CreateDirectory(localPath);
\r
88 if (Log.IsDebugEnabled)
\r
89 Log.DebugFormat("Created Directory [{0}]", localPath);
\r
// A zero-length file sitting at the directory's path is logged as a malformed directory
// object and a directory is created in its place.
// NOTE(review): the statement between the warning and CreateDirectory is not visible;
// presumably it removes the empty file first - confirm.
93 var localInfo = new FileInfo(localPath);
\r
94 if (localInfo.Exists && localInfo.Length == 0)
\r
96 Log.WarnFormat("Malformed directory object detected for [{0}]", localPath);
\r
98 Directory.CreateDirectory(localPath);
\r
99 if (Log.IsDebugEnabled)
\r
100 Log.DebugFormat("Created Directory [{0}]", localPath);
\r
// Compares the local file and its stored state with the cloud object (see IsObjectChanged).
// How isChanged gates the download below is not visible in this listing.
106 var isChanged = IsObjectChanged(cloudFile, localPath);
\r
109 //Retrieve the hashmap from the server
\r
110 var serverHash = await client.GetHashMap(account, container, url);
\r
111 //If it's a small file
\r
112 if (serverHash.Hashes.Count == 1)
\r
113 //Download it in one go
\r
// NOTE(review): no await is visible on the two download calls below. If the full file
// really omits it, the attribute change and StoreInfo further down would run before the
// download has finished and the gate would be released early - confirm.
115 DownloadEntireFileAsync(accountInfo, client, cloudFile, relativeUrl, localPath,cancellationToken);
\r
116 //Otherwise download it block by block
\r
119 DownloadWithBlocks(accountInfo, client, cloudFile, relativeUrl, localPath,
\r
120 serverHash,cancellationToken);
\r
// Objects the account's user may not write are marked read-only locally.
122 if (!cloudFile.IsWritable(accountInfo.UserName))
\r
124 var attributes = File.GetAttributes(localPath);
\r
125 File.SetAttributes(localPath, attributes | FileAttributes.ReadOnly);
\r
130 //Now we can store the object's metadata without worrying about ghost status entries
\r
131 StatusKeeper.StoreInfo(localPath, cloudFile);
\r
138 //Download a file asynchronously using blocks
\r
/// <summary>
/// Downloads a file object block by block. The local file's tree hash is calculated with
/// the server's block size and hash algorithm, and only blocks whose server hash is not
/// found among the local hashes are fetched (or taken from an orphan block when
/// BlockUpdater.UseOrphan succeeds). The collected blocks are then committed.
/// </summary>
/// <param name="accountInfo">Account used for the selection check and to locate the FileAgent.</param>
/// <param name="client">Client used to fetch the missing blocks.</param>
/// <param name="cloudFile">The cloud object to download; must not be a directory.</param>
/// <param name="relativeUrl">Relative URL of the object; also converted to the relative file path.</param>
/// <param name="filePath">Rooted local target path.</param>
/// <param name="serverHash">The server's hashmap for the object (block size, block hash, hashes).</param>
/// <param name="cancellationToken">Token checked by WaitOrAbort and passed to GetBlock.</param>
/// <exception cref="ArgumentNullException">client, cloudFile, relativeUrl or serverHash is null, or filePath is blank.</exception>
/// <exception cref="ArgumentException">filePath is not rooted, or cloudFile is a directory.</exception>
// NOTE(review): accountInfo is not null-checked here although GetFileAgent dereferences it.
// NOTE(review): some statements of this method are not visible in this listing (braces,
// the statements guarded by the WaitOrAbort checks, the declaration of end, the guard
// before NotifyChangedFile); comments on those spots are hedged.
139 public async Task DownloadWithBlocks(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath, TreeHash serverHash, CancellationToken cancellationToken)
\r
141 if (client == null)
\r
142 throw new ArgumentNullException("client");
\r
143 if (cloudFile == null)
\r
144 throw new ArgumentNullException("cloudFile");
\r
145 if (relativeUrl == null)
\r
146 throw new ArgumentNullException("relativeUrl");
\r
147 if (String.IsNullOrWhiteSpace(filePath))
\r
148 throw new ArgumentNullException("filePath");
\r
149 if (!Path.IsPathRooted(filePath))
\r
150 throw new ArgumentException("The filePath must be rooted", "filePath");
\r
151 if (serverHash == null)
\r
152 throw new ArgumentNullException("serverHash");
\r
153 if (cloudFile.IsDirectory)
\r
154 throw new ArgumentException("cloudFile is a directory, not a file", "cloudFile");
\r
// Marks the if/throw checks above as the method's precondition block for code contracts.
155 Contract.EndContractBlock();
\r
// Throws on cancellation, waits while paused, returns true when the object is not
// selected; the guarded statement is not visible (presumably an early return - confirm).
157 if (await WaitOrAbort(accountInfo,cloudFile, cancellationToken))
\r
160 var fileAgent = GetFileAgent(accountInfo);
\r
// Presumably normalizes the casing of the path to match the file system - confirm.
161 var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath);
\r
163 //Calculate the relative file path for the new file
\r
164 var relativePath = relativeUrl.RelativeUriToFilePath();
\r
// The updater is given the agent's cache path, the target path, the relative path and
// the server hashmap; it collects blocks (StoreBlock/UseOrphan) until Commit is called.
165 var blockUpdater = new BlockUpdater(fileAgent.CachePath, localPath, relativePath, serverHash);
\r
167 StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing, String.Format("Calculating hashmap for {0} before download", Path.GetFileName(localPath)));
\r
168 //Calculate the file's treehash
\r
170 //TODO: Should pass cancellation token here
\r
// NOTE(review): the meaning of the last argument (2) is not visible here - confirm.
171 var treeHash = await Signature.CalculateTreeHashAsync(localPath, (int)serverHash.BlockSize, serverHash.BlockHash, 2);
\r
173 //And compare it with the server's hash
\r
174 var upHashes = serverHash.GetHashesAsStrings();
\r
175 var localHashes = treeHash.HashDictionary;
\r
// Initial progress report: block 0 of upHashes.Length.
176 ReportDownloadProgress(Path.GetFileName(localPath), 0, upHashes.Length, cloudFile.Bytes);
\r
177 for (var i = 0; i < upHashes.Length; i++)
\r
// Re-check cancellation/pause/selection before every block; the guarded statement is
// not visible (presumably the method returns - confirm).
179 if (await WaitOrAbort(accountInfo,cloudFile, cancellationToken))
\r
182 //For every non-matching hash
\r
183 var upHash = upHashes[i];
\r
184 if (!localHashes.ContainsKey(upHash))
\r
186 StatusNotification.Notify(new CloudNotification { Data = cloudFile });
\r
// UseOrphan returning true is logged as an orphan block having been found for this index.
// The statements that follow the log call are not visible (presumably the download of
// this block is skipped - confirm).
188 if (blockUpdater.UseOrphan(i, upHash))
\r
190 Log.InfoFormat("[BLOCK GET] ORPHAN FOUND for {0} of {1} for {2}", i, upHashes.Length, localPath);
\r
193 Log.InfoFormat("[BLOCK GET] START {0} of {1} for {2}", i, upHashes.Length, localPath);
\r
// Byte offset at which block i starts.
194 var start = i * serverHash.BlockSize;
\r
195 //To download the last block just pass a null for the end of the range
\r
// The declaration of end is not visible in this listing; for every block except the last
// it is set to the offset at which the next block starts.
197 if (i < upHashes.Length - 1)
\r
198 end = ((i + 1) * serverHash.BlockSize);
\r
200 //TODO: Pass token here
\r
// NOTE(review): the TODO above looks stale - the GetBlock call below already receives cancellationToken.
201 //Download the missing block
\r
202 var block = await client.GetBlock(cloudFile.Account, cloudFile.Container, relativeUrl, start, end,cancellationToken);
\r
205 blockUpdater.StoreBlock(i, block);
\r
208 Log.InfoFormat("[BLOCK GET] FINISH {0} of {1} for {2}", i, upHashes.Length, localPath);
\r
210 ReportDownloadProgress(Path.GetFileName(localPath), i, upHashes.Length, cloudFile.Bytes);
\r
213 //Want to avoid notifications if no changes were made
\r
// HasBlocks is read before Commit so the value reflects the blocks collected above.
214 var hasChanges = blockUpdater.HasBlocks;
\r
215 blockUpdater.Commit();
\r
218 //Notify listeners that a local file has changed
\r
// NOTE(review): the guard on this call is not visible; presumably it runs only when hasChanges is true - confirm.
219 StatusNotification.NotifyChangedFile(localPath);
\r
221 Log.InfoFormat("[BLOCK GET] COMPLETE {0}", localPath);
\r
224 //Download a small file with a single GET operation
\r
/// <summary>
/// Downloads a file object with one GetObject call into a ".download" file under the
/// FileAgent's cache path, makes sure the target folder exists, then replaces or moves
/// the temporary file to its final location and raises a changed-file notification.
/// </summary>
/// <param name="accountInfo">Account used for the selection check and to locate the FileAgent.</param>
/// <param name="client">Client used to fetch the object.</param>
/// <param name="cloudFile">The cloud object to download; must not be a directory.</param>
/// <param name="relativeUrl">Relative URL of the object; also converted to the relative file path.</param>
/// <param name="filePath">Rooted local target path.</param>
/// <param name="cancellationToken">Token checked by WaitOrAbort and passed to GetObject.</param>
/// <exception cref="ArgumentNullException">client, cloudFile or relativeUrl is null, or filePath is blank.</exception>
/// <exception cref="ArgumentException">filePath is not rooted, or cloudFile is a directory.</exception>
// NOTE(review): accountInfo is not null-checked here although GetFileAgent dereferences it.
// NOTE(review): some statements of this method are not visible in this listing (braces,
// the try that pairs with the catch below, the else before File.Move); comments on those
// spots are hedged.
225 private async Task DownloadEntireFileAsync(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath, CancellationToken cancellationToken)
\r
227 if (client == null)
\r
228 throw new ArgumentNullException("client");
\r
229 if (cloudFile == null)
\r
230 throw new ArgumentNullException("cloudFile");
\r
231 if (relativeUrl == null)
\r
232 throw new ArgumentNullException("relativeUrl");
\r
233 if (String.IsNullOrWhiteSpace(filePath))
\r
234 throw new ArgumentNullException("filePath");
\r
// NOTE(review): the message below says "localPath" while the parameter is named filePath.
235 if (!Path.IsPathRooted(filePath))
\r
236 throw new ArgumentException("The localPath must be rooted", "filePath");
\r
237 if (cloudFile.IsDirectory)
\r
238 throw new ArgumentException("cloudFile is a directory, not a file", "cloudFile");
\r
// Marks the if/throw checks above as the method's precondition block for code contracts.
239 Contract.EndContractBlock();
\r
// Throws on cancellation, waits while paused, returns true when the object is not
// selected; the guarded statement is not visible (presumably an early return - confirm).
241 if (await WaitOrAbort(accountInfo,cloudFile, cancellationToken))
\r
// Presumably normalizes the casing of the path to match the file system - confirm.
244 var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath);
\r
245 StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing, String.Format("Downloading {0}", Path.GetFileName(localPath)));
\r
246 StatusNotification.Notify(new CloudNotification { Data = cloudFile });
\r
248 var fileAgent = GetFileAgent(accountInfo);
\r
249 //Calculate the relative file path for the new file
\r
250 var relativePath = relativeUrl.RelativeUriToFilePath();
\r
251 //The file will be stored in a temporary location while downloading with an extension .download
\r
252 var tempPath = Path.Combine(fileAgent.CachePath, relativePath + ".download");
\r
253 //Make sure the target folder exists. DownloadFileTask will not create the folder
\r
254 var tempFolder = Path.GetDirectoryName(tempPath);
\r
255 if (!Directory.Exists(tempFolder))
\r
256 Directory.CreateDirectory(tempFolder);
\r
258 //TODO: Should pass the token here
\r
// NOTE(review): the TODO above looks stale - the GetObject call below already receives cancellationToken.
260 //Download the object to the temporary location
\r
261 await client.GetObject(cloudFile.Account, cloudFile.Container, relativeUrl.ToString(), tempPath,cancellationToken);
\r
263 //Create the local folder if it doesn't exist (necessary for shared objects)
\r
264 var localFolder = Path.GetDirectoryName(localPath);
\r
265 if (!Directory.Exists(localFolder))
\r
// The try that pairs with the catch below is not visible in this listing.
268 Directory.CreateDirectory(localFolder);
\r
270 catch (IOException)
\r
272 //A file may already exist that has the same name as the new folder.
\r
273 //This may be an artifact of the way Pithos handles directories
\r
274 var fileInfo = new FileInfo(localFolder);
\r
275 if (fileInfo.Exists && fileInfo.Length == 0)
\r
// NOTE(review): the statement between the warning and CreateDirectory is not visible;
// presumably it removes the empty file first - confirm.
277 Log.WarnFormat("Malformed directory object detected for [{0}]", localFolder);
\r
279 Directory.CreateDirectory(localFolder);
\r
284 //And move it to its actual location once downloading is finished
\r
// An existing target is replaced without a backup file (null) and with
// ignoreMetadataErrors set to true.
285 if (File.Exists(localPath))
\r
286 File.Replace(tempPath, localPath, null, true);
\r
// NOTE(review): no else is visible before the Move below; presumably it is the
// alternative for a target that does not exist yet - confirm.
288 File.Move(tempPath, localPath);
\r
289 //Notify listeners that a local file has changed
\r
290 StatusNotification.NotifyChangedFile(localPath);
\r
/// <summary>
/// Sends a "Downloading" ProgressNotification for <paramref name="fileName"/> through
/// StatusNotification.
/// </summary>
/// <param name="fileName">Name reported in the notification.</param>
/// <param name="block">Index of the current block.</param>
/// <param name="totalBlocks">Total number of blocks; 0 is reported as 1 of 1.</param>
/// <param name="fileSize">Size passed on to the notification.</param>
296 private void ReportDownloadProgress(string fileName, int block, int totalBlocks, long fileSize)
\r
// With no blocks at all the progress is reported as 1 of 1 instead of block/0.
298 StatusNotification.Notify(totalBlocks == 0
\r
299 ? new ProgressNotification(fileName, "Downloading", 1, 1, fileSize)
\r
300 : new ProgressNotification(fileName, "Downloading", block, totalBlocks, fileSize));
\r
/// <summary>
/// Decides whether the cloud object differs from what is stored locally at
/// <paramref name="localPath"/>, using the file system, the state stored by StatusKeeper,
/// the file's short hash and finally the stored checksum against the cloud hash.
/// </summary>
/// <param name="cloudFile">The cloud object whose Hash is compared with the stored checksum.</param>
/// <param name="localPath">Local path of the file or directory.</param>
/// <returns>
/// The last visible statement returns whether the stored checksum differs from cloudFile.Hash.
/// </returns>
// NOTE(review): the return statements of the earlier checks are not visible in this
// listing; per the existing comments a directory means "no change" and each of the other
// checks means "change" - confirm against the full file.
303 private bool IsObjectChanged(ObjectInfo cloudFile, string localPath)
\r
305 //If the target is a directory, there are no changes to download
\r
306 if (Directory.Exists(localPath))
\r
308 //If the file doesn't exist, we have a change
\r
309 if (!File.Exists(localPath))
\r
311 //If there is no stored state, we have a change
\r
312 var localState = StatusKeeper.GetStateByFilePath(localPath);
\r
313 if (localState == null)
\r
// Hash of the file as it is on disk right now, compared with the hash kept in the stored state.
316 var info = new FileInfo(localPath);
\r
317 var shortHash = info.ComputeShortHash();
\r
318 //If the file is different from the stored state, we have a change
\r
319 if (localState.ShortHash != shortHash)
\r
321 //If the top hashes differ, we have a change
\r
322 return (localState.Checksum != cloudFile.Hash);
\r
/// <summary>
/// Returns the FileAgent that AgentLocator provides for the account's AccountPath.
/// </summary>
/// <param name="accountInfo">Account whose AccountPath is used as the lookup key; dereferenced without a null check.</param>
/// <returns>The result of AgentLocator&lt;FileAgent&gt;.Get for that path.</returns>
325 private static FileAgent GetFileAgent(AccountInfo accountInfo)
\r
327 return AgentLocator<FileAgent>.Get(accountInfo.AccountPath);
\r
/// <summary>
/// Checkpoint used before and during downloads: throws if cancellation was requested,
/// waits on UnpauseEvent, then reports whether the object is no longer selected.
/// </summary>
/// <param name="account">Account passed to Selectives.IsSelected.</param>
/// <param name="cloudFile">Object passed to Selectives.IsSelected; its Uri is used in the log message.</param>
/// <param name="token">Token checked with ThrowIfCancellationRequested.</param>
/// <returns>true when Selectives.IsSelected returns false for the object, otherwise false.</returns>
/// <exception cref="OperationCanceledException">Cancellation was requested on <paramref name="token"/>.</exception>
330 private async Task<bool> WaitOrAbort(AccountInfo account,ObjectInfo cloudFile, CancellationToken token)
\r
332 token.ThrowIfCancellationRequested();
\r
// UnpauseEvent and Selectives are used without null checks, so both must be assigned
// before any download starts.
333 await UnpauseEvent.WaitAsync();
\r
334 var shouldAbort = !Selectives.IsSelected(account,cloudFile);
\r
// NOTE(review): the guard on this log call is not visible in this listing; presumably
// it is only written when shouldAbort is true - confirm.
336 Log.InfoFormat("Aborting [{0}]", cloudFile.Uri);
\r
337 return shouldAbort;
\r
// Selection filter consulted through IsSelected(account, cloudFile) by DownloadCloudFile
// and WaitOrAbort. It is dereferenced without a null check, so it must be assigned before use.
341 public Selectives Selectives { get; set; }
\r
// Event awaited (WaitAsync) by WaitOrAbort before each step of a download. It is
// dereferenced without a null check, so it must be assigned before use.
343 public AsyncManualResetEvent UnpauseEvent { get; set; }
\r