root / trunk / Pithos.Core / Agents / Downloader.cs @ bf4471a3
History | View | Annotate | Download (17.8 kB)
1 |
using System; |
---|---|
2 |
using System.Collections.Generic; |
3 |
using System.ComponentModel.Composition; |
4 |
using System.Diagnostics.Contracts; |
5 |
using System.IO; |
6 |
using System.Linq; |
7 |
using System.Reflection; |
8 |
using System.Threading; |
9 |
using System.Threading.Tasks; |
10 |
using Pithos.Interfaces; |
11 |
using Pithos.Network; |
12 |
using log4net; |
13 |
|
14 |
namespace Pithos.Core.Agents |
15 |
{ |
16 |
[Export(typeof(Downloader))] |
17 |
public class Downloader |
18 |
{ |
19 |
private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); |
20 |
|
21 |
[Import] |
22 |
private IStatusKeeper StatusKeeper { get; set; } |
23 |
|
24 |
|
25 |
public IStatusNotification StatusNotification { get; set; } |
26 |
|
27 |
/* |
28 |
private CancellationTokenSource _cts=new CancellationTokenSource(); |
29 |
|
30 |
public void SignalStop() |
31 |
{ |
32 |
_cts.Cancel(); |
33 |
} |
34 |
*/ |
35 |
|
36 |
|
37 |
//Download a file. |
38 |
public async Task DownloadCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile, string filePath,CancellationToken cancellationToken) |
39 |
{ |
40 |
if (accountInfo == null) |
41 |
throw new ArgumentNullException("accountInfo"); |
42 |
if (cloudFile == null) |
43 |
throw new ArgumentNullException("cloudFile"); |
44 |
if (String.IsNullOrWhiteSpace(cloudFile.Account)) |
45 |
throw new ArgumentNullException("cloudFile"); |
46 |
if (String.IsNullOrWhiteSpace(cloudFile.Container)) |
47 |
throw new ArgumentNullException("cloudFile"); |
48 |
if (String.IsNullOrWhiteSpace(filePath)) |
49 |
throw new ArgumentNullException("filePath"); |
50 |
if (!Path.IsPathRooted(filePath)) |
51 |
throw new ArgumentException("The filePath must be rooted", "filePath"); |
52 |
Contract.EndContractBlock(); |
53 |
using (ThreadContext.Stacks["Operation"].Push("DownloadCloudFile")) |
54 |
{ |
55 |
// var cancellationToken=_cts.Token;// .ThrowIfCancellationRequested(); |
56 |
|
57 |
if (await WaitOrAbort(accountInfo,cloudFile, cancellationToken).ConfigureAwait(false)) |
58 |
return; |
59 |
|
60 |
var fileName = Path.GetFileName(filePath); |
61 |
var progress = new Progress<double>(d => |
62 |
StatusNotification.Notify(new StatusNotification(String.Format("Hashing for Download {0} of {1}", d, fileName)))); |
63 |
|
64 |
|
65 |
TreeHash localTreeHash; |
66 |
|
67 |
using (StatusNotification.GetNotifier("Hashing for Download {0}", "Hashed for Download {0}", fileName)) |
68 |
{ |
69 |
|
70 |
localTreeHash = Signature.CalculateTreeHashAsync(filePath, |
71 |
accountInfo.BlockSize, |
72 |
accountInfo.BlockHash, 1,progress); |
73 |
} |
74 |
|
75 |
var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath); |
76 |
var relativeUrl = new Uri(cloudFile.Name, UriKind.Relative); |
77 |
|
78 |
var url = relativeUrl.ToString(); |
79 |
if (cloudFile.Name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase)) |
80 |
return; |
81 |
|
82 |
if (!Selectives.IsSelected(accountInfo,cloudFile)) |
83 |
return; |
84 |
|
85 |
|
86 |
//Are we already downloading or uploading the file? |
87 |
using (var gate = NetworkGate.Acquire(localPath, NetworkOperation.Downloading)) |
88 |
{ |
89 |
if (gate.Failed) |
90 |
return; |
91 |
|
92 |
var client = new CloudFilesClient(accountInfo); |
93 |
var account = cloudFile.Account; |
94 |
var container = cloudFile.Container; |
95 |
|
96 |
if (cloudFile.IsDirectory) |
97 |
{ |
98 |
if (!Directory.Exists(localPath)) |
99 |
try |
100 |
{ |
101 |
Directory.CreateDirectory(localPath); |
102 |
if (Log.IsDebugEnabled) |
103 |
Log.DebugFormat("Created Directory [{0}]", localPath); |
104 |
} |
105 |
catch (IOException) |
106 |
{ |
107 |
var localInfo = new FileInfo(localPath); |
108 |
if (localInfo.Exists && localInfo.Length == 0) |
109 |
{ |
110 |
Log.WarnFormat("Malformed directory object detected for [{0}]", localPath); |
111 |
localInfo.Delete(); |
112 |
Directory.CreateDirectory(localPath); |
113 |
if (Log.IsDebugEnabled) |
114 |
Log.DebugFormat("Created Directory [{0}]", localPath); |
115 |
} |
116 |
} |
117 |
} |
118 |
else |
119 |
{ |
120 |
var isChanged = IsObjectChanged(cloudFile, localPath); |
121 |
if (isChanged) |
122 |
{ |
123 |
//Retrieve the hashmap from the server |
124 |
var serverHash = await client.GetHashMap(account, container, url).ConfigureAwait(false); |
125 |
//If it's a small file |
126 |
if (serverHash.Hashes.Count == 1) |
127 |
//Download it in one go |
128 |
await |
129 |
DownloadEntireFileAsync(accountInfo, client, cloudFile, relativeUrl, localPath,cancellationToken); |
130 |
//Otherwise download it block by block |
131 |
else |
132 |
await |
133 |
DownloadWithBlocks(accountInfo, client, cloudFile, relativeUrl, localPath,localTreeHash, |
134 |
serverHash,cancellationToken); |
135 |
|
136 |
if (!cloudFile.IsWritable(accountInfo.UserName)) |
137 |
{ |
138 |
var attributes = File.GetAttributes(localPath); |
139 |
File.SetAttributes(localPath, attributes | FileAttributes.ReadOnly); |
140 |
} |
141 |
} |
142 |
} |
143 |
|
144 |
//Now we can store the object's metadata without worrying about ghost status entries |
145 |
StatusKeeper.StoreInfo(localPath, cloudFile); |
146 |
|
147 |
} |
148 |
} |
149 |
|
150 |
} |
151 |
|
152 |
//Download a file asynchronously using blocks |
153 |
public async Task DownloadWithBlocks(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath, TreeHash localTreeHash,TreeHash serverHash, CancellationToken cancellationToken) |
154 |
{ |
155 |
if (client == null) |
156 |
throw new ArgumentNullException("client"); |
157 |
if (cloudFile == null) |
158 |
throw new ArgumentNullException("cloudFile"); |
159 |
if (relativeUrl == null) |
160 |
throw new ArgumentNullException("relativeUrl"); |
161 |
if (String.IsNullOrWhiteSpace(filePath)) |
162 |
throw new ArgumentNullException("filePath"); |
163 |
if (!Path.IsPathRooted(filePath)) |
164 |
throw new ArgumentException("The filePath must be rooted", "filePath"); |
165 |
if (serverHash == null) |
166 |
throw new ArgumentNullException("serverHash"); |
167 |
if (cloudFile.IsDirectory) |
168 |
throw new ArgumentException("cloudFile is a directory, not a file", "cloudFile"); |
169 |
Contract.EndContractBlock(); |
170 |
|
171 |
if (await WaitOrAbort(accountInfo, cloudFile, cancellationToken).ConfigureAwait(false)) |
172 |
return; |
173 |
|
174 |
var fileAgent = GetFileAgent(accountInfo); |
175 |
var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath); |
176 |
|
177 |
//Calculate the relative file path for the new file |
178 |
var relativePath = relativeUrl.RelativeUriToFilePath(); |
179 |
var blockUpdater = new BlockUpdater(fileAgent.CachePath, localPath, relativePath, serverHash); |
180 |
|
181 |
StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing, String.Format("Calculating hashmap for {0} before download", Path.GetFileName(localPath))); |
182 |
//Calculate the file's treehash |
183 |
|
184 |
var fileName = Path.GetFileName(localPath); |
185 |
var progress = new Progress<double>(d => |
186 |
StatusNotification.Notify(new StatusNotification(String.Format("Hashing for Download {0} of {1}", d, fileName)))); |
187 |
|
188 |
//TODO: Should pass cancellation token here |
189 |
var treeHash = localTreeHash ?? Signature.CalculateTreeHashAsync(localPath, (int)serverHash.BlockSize, serverHash.BlockHash, 2,progress); |
190 |
|
191 |
//And compare it with the server's hash |
192 |
var upHashes = serverHash.GetHashesAsStrings(); |
193 |
var localHashes = treeHash.HashDictionary; |
194 |
ReportDownloadProgress(Path.GetFileName(localPath), 0,0, upHashes.Length, cloudFile.Bytes); |
195 |
|
196 |
|
197 |
long i = 0; |
198 |
client.DownloadProgressChanged += (sender, args) => |
199 |
ReportDownloadProgress(Path.GetFileName(localPath), i, args.ProgressPercentage, upHashes.Length, cloudFile.Bytes); |
200 |
|
201 |
|
202 |
for (i = 0; i < upHashes.Length; i++) |
203 |
{ |
204 |
if (await WaitOrAbort(accountInfo, cloudFile, cancellationToken).ConfigureAwait(false)) |
205 |
return; |
206 |
|
207 |
//For every non-matching hash |
208 |
var upHash = upHashes[i]; |
209 |
if (!localHashes.ContainsKey(upHash)) |
210 |
{ |
211 |
StatusNotification.Notify(new CloudNotification { Data = cloudFile }); |
212 |
ReportDownloadProgress(Path.GetFileName(localPath), i, 0,upHashes.Length, cloudFile.Bytes); |
213 |
|
214 |
if (blockUpdater.UseOrphan(i, upHash)) |
215 |
{ |
216 |
Log.InfoFormat("[BLOCK GET] ORPHAN FOUND for {0} of {1} for {2}", i, upHashes.Length, localPath); |
217 |
continue; |
218 |
} |
219 |
Log.InfoFormat("[BLOCK GET] START {0} of {1} for {2}", i, upHashes.Length, localPath); |
220 |
long start = i * serverHash.BlockSize; |
221 |
//To download the last block just pass a null for the end of the range |
222 |
long? end = null; |
223 |
if (i < upHashes.Length - 1) |
224 |
end = ((i + 1) * serverHash.BlockSize); |
225 |
|
226 |
//TODO: Pass token here |
227 |
//Download the missing block |
228 |
byte[] block = await client.GetBlock(cloudFile.Account, cloudFile.Container, relativeUrl, start, end, cancellationToken).ConfigureAwait(false); |
229 |
|
230 |
//and store it |
231 |
blockUpdater.StoreBlock(i, block); |
232 |
|
233 |
|
234 |
Log.InfoFormat("[BLOCK GET] FINISH {0} of {1} for {2}", i, upHashes.Length, localPath); |
235 |
} |
236 |
ReportDownloadProgress(Path.GetFileName(localPath), i, 100,upHashes.Length, cloudFile.Bytes); |
237 |
} |
238 |
|
239 |
//Want to avoid notifications if no changes were made |
240 |
var hasChanges = blockUpdater.HasBlocks; |
241 |
blockUpdater.Commit(); |
242 |
|
243 |
if (hasChanges) |
244 |
//Notify listeners that a local file has changed |
245 |
StatusNotification.NotifyChangedFile(localPath); |
246 |
|
247 |
Log.InfoFormat("[BLOCK GET] COMPLETE {0}", localPath); |
248 |
} |
249 |
|
250 |
//Download a small file with a single GET operation |
251 |
private async Task DownloadEntireFileAsync(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath, CancellationToken cancellationToken) |
252 |
{ |
253 |
if (client == null) |
254 |
throw new ArgumentNullException("client"); |
255 |
if (cloudFile == null) |
256 |
throw new ArgumentNullException("cloudFile"); |
257 |
if (relativeUrl == null) |
258 |
throw new ArgumentNullException("relativeUrl"); |
259 |
if (String.IsNullOrWhiteSpace(filePath)) |
260 |
throw new ArgumentNullException("filePath"); |
261 |
if (!Path.IsPathRooted(filePath)) |
262 |
throw new ArgumentException("The localPath must be rooted", "filePath"); |
263 |
if (cloudFile.IsDirectory) |
264 |
throw new ArgumentException("cloudFile is a directory, not a file", "cloudFile"); |
265 |
Contract.EndContractBlock(); |
266 |
|
267 |
if (await WaitOrAbort(accountInfo, cloudFile, cancellationToken).ConfigureAwait(false)) |
268 |
return; |
269 |
|
270 |
var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath); |
271 |
StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing, String.Format("Downloading {0}", Path.GetFileName(localPath))); |
272 |
StatusNotification.Notify(new CloudNotification { Data = cloudFile }); |
273 |
ReportDownloadProgress(Path.GetFileName(localPath), 1, 0,1, cloudFile.Bytes); |
274 |
|
275 |
var fileAgent = GetFileAgent(accountInfo); |
276 |
//Calculate the relative file path for the new file |
277 |
var relativePath = relativeUrl.RelativeUriToFilePath(); |
278 |
//The file will be stored in a temporary location while downloading with an extension .download |
279 |
var tempPath = Path.Combine(fileAgent.CachePath, relativePath + ".download"); |
280 |
//Make sure the target folder exists. DownloadFileTask will not create the folder |
281 |
var tempFolder = Path.GetDirectoryName(tempPath); |
282 |
if (!Directory.Exists(tempFolder)) |
283 |
Directory.CreateDirectory(tempFolder); |
284 |
|
285 |
//TODO: Should pass the token here |
286 |
|
287 |
//Download the object to the temporary location |
288 |
await client.GetObject(cloudFile.Account, cloudFile.Container, relativeUrl.ToString(), tempPath, cancellationToken).ConfigureAwait(false); |
289 |
|
290 |
//Create the local folder if it doesn't exist (necessary for shared objects) |
291 |
var localFolder = Path.GetDirectoryName(localPath); |
292 |
if (!Directory.Exists(localFolder)) |
293 |
try |
294 |
{ |
295 |
Directory.CreateDirectory(localFolder); |
296 |
} |
297 |
catch (IOException) |
298 |
{ |
299 |
//A file may already exist that has the same name as the new folder. |
300 |
//This may be an artifact of the way Pithos handles directories |
301 |
var fileInfo = new FileInfo(localFolder); |
302 |
if (fileInfo.Exists && fileInfo.Length == 0) |
303 |
{ |
304 |
Log.WarnFormat("Malformed directory object detected for [{0}]", localFolder); |
305 |
fileInfo.Delete(); |
306 |
Directory.CreateDirectory(localFolder); |
307 |
} |
308 |
else |
309 |
throw; |
310 |
} |
311 |
//And move it to its actual location once downloading is finished |
312 |
if (File.Exists(localPath)) |
313 |
File.Replace(tempPath, localPath, null, true); |
314 |
else |
315 |
File.Move(tempPath, localPath); |
316 |
//Notify listeners that a local file has changed |
317 |
StatusNotification.NotifyChangedFile(localPath); |
318 |
|
319 |
|
320 |
} |
321 |
|
322 |
|
323 |
private void ReportDownloadProgress(string fileName, long block, int blockPercentage,int totalBlocks, long fileSize) |
324 |
{ |
325 |
StatusNotification.Notify(totalBlocks == 0 |
326 |
? new ProgressNotification(fileName, "Downloading", 1, blockPercentage,1, fileSize) |
327 |
: new ProgressNotification(fileName, "Downloading", block, blockPercentage, totalBlocks, fileSize)); |
328 |
} |
329 |
|
330 |
private bool IsObjectChanged(ObjectInfo cloudFile, string localPath) |
331 |
{ |
332 |
//If the target is a directory, there are no changes to download |
333 |
if (Directory.Exists(localPath)) |
334 |
return false; |
335 |
//If the file doesn't exist, we have a chagne |
336 |
if (!File.Exists(localPath)) |
337 |
return true; |
338 |
//If there is no stored state, we have a change |
339 |
var localState = StatusKeeper.GetStateByFilePath(localPath); |
340 |
if (localState == null) |
341 |
return true; |
342 |
|
343 |
var info = new FileInfo(localPath); |
344 |
var etag = info.ComputeShortHash(StatusNotification); |
345 |
//If the file is different from the stored state, we have a change |
346 |
if (localState.ETag != etag) |
347 |
return true; |
348 |
//If the top hashes differ, we have a change |
349 |
return (localState.Checksum != cloudFile.X_Object_Hash); |
350 |
} |
351 |
|
352 |
private static FileAgent GetFileAgent(AccountInfo accountInfo) |
353 |
{ |
354 |
return AgentLocator<FileAgent>.Get(accountInfo.AccountPath); |
355 |
} |
356 |
|
357 |
private async Task<bool> WaitOrAbort(AccountInfo account,ObjectInfo cloudFile, CancellationToken token) |
358 |
{ |
359 |
token.ThrowIfCancellationRequested(); |
360 |
await UnpauseEvent.WaitAsync().ConfigureAwait(false); |
361 |
var shouldAbort = !Selectives.IsSelected(account,cloudFile); |
362 |
if (shouldAbort) |
363 |
Log.InfoFormat("Aborting [{0}]", cloudFile.Uri); |
364 |
return shouldAbort; |
365 |
} |
366 |
|
367 |
[Import] |
368 |
public Selectives Selectives { get; set; } |
369 |
|
370 |
public AsyncManualResetEvent UnpauseEvent { get; set; } |
371 |
} |
372 |
} |