root / trunk / Pithos.Core / Agents / Downloader.cs @ 43dd02a8
History | View | Annotate | Download (16.2 kB)
1 |
using System; |
---|---|
2 |
using System.Collections.Generic; |
3 |
using System.ComponentModel.Composition; |
4 |
using System.Diagnostics.Contracts; |
5 |
using System.IO; |
6 |
using System.Linq; |
7 |
using System.Reflection; |
8 |
using System.Threading; |
9 |
using System.Threading.Tasks; |
10 |
using Pithos.Interfaces; |
11 |
using Pithos.Network; |
12 |
using log4net; |
13 |
|
14 |
namespace Pithos.Core.Agents |
15 |
{ |
16 |
[Export(typeof(Downloader))] |
17 |
public class Downloader |
18 |
{ |
19 |
private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); |
20 |
|
21 |
[Import] |
22 |
private IStatusKeeper StatusKeeper { get; set; } |
23 |
|
24 |
|
25 |
public IStatusNotification StatusNotification { get; set; } |
26 |
|
27 |
/* |
28 |
private CancellationTokenSource _cts=new CancellationTokenSource(); |
29 |
|
30 |
public void SignalStop() |
31 |
{ |
32 |
_cts.Cancel(); |
33 |
} |
34 |
*/ |
35 |
|
36 |
|
37 |
//Download a file. |
38 |
public async Task DownloadCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile, string filePath,CancellationToken cancellationToken) |
39 |
{ |
40 |
if (accountInfo == null) |
41 |
throw new ArgumentNullException("accountInfo"); |
42 |
if (cloudFile == null) |
43 |
throw new ArgumentNullException("cloudFile"); |
44 |
if (String.IsNullOrWhiteSpace(cloudFile.Account)) |
45 |
throw new ArgumentNullException("cloudFile"); |
46 |
if (String.IsNullOrWhiteSpace(cloudFile.Container)) |
47 |
throw new ArgumentNullException("cloudFile"); |
48 |
if (String.IsNullOrWhiteSpace(filePath)) |
49 |
throw new ArgumentNullException("filePath"); |
50 |
if (!Path.IsPathRooted(filePath)) |
51 |
throw new ArgumentException("The filePath must be rooted", "filePath"); |
52 |
Contract.EndContractBlock(); |
53 |
using (ThreadContext.Stacks["Operation"].Push("DownloadCloudFile")) |
54 |
{ |
55 |
// var cancellationToken=_cts.Token;// .ThrowIfCancellationRequested(); |
56 |
|
57 |
if (await WaitOrAbort(accountInfo,cloudFile, cancellationToken)) |
58 |
return; |
59 |
|
60 |
|
61 |
var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath); |
62 |
var relativeUrl = new Uri(cloudFile.Name, UriKind.Relative); |
63 |
|
64 |
var url = relativeUrl.ToString(); |
65 |
if (cloudFile.Name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase)) |
66 |
return; |
67 |
|
68 |
if (!Selectives.IsSelected(accountInfo,cloudFile)) |
69 |
return; |
70 |
|
71 |
|
72 |
//Are we already downloading or uploading the file? |
73 |
using (var gate = NetworkGate.Acquire(localPath, NetworkOperation.Downloading)) |
74 |
{ |
75 |
if (gate.Failed) |
76 |
return; |
77 |
|
78 |
var client = new CloudFilesClient(accountInfo); |
79 |
var account = cloudFile.Account; |
80 |
var container = cloudFile.Container; |
81 |
|
82 |
if (cloudFile.IsDirectory) |
83 |
{ |
84 |
if (!Directory.Exists(localPath)) |
85 |
try |
86 |
{ |
87 |
Directory.CreateDirectory(localPath); |
88 |
if (Log.IsDebugEnabled) |
89 |
Log.DebugFormat("Created Directory [{0}]", localPath); |
90 |
} |
91 |
catch (IOException) |
92 |
{ |
93 |
var localInfo = new FileInfo(localPath); |
94 |
if (localInfo.Exists && localInfo.Length == 0) |
95 |
{ |
96 |
Log.WarnFormat("Malformed directory object detected for [{0}]", localPath); |
97 |
localInfo.Delete(); |
98 |
Directory.CreateDirectory(localPath); |
99 |
if (Log.IsDebugEnabled) |
100 |
Log.DebugFormat("Created Directory [{0}]", localPath); |
101 |
} |
102 |
} |
103 |
} |
104 |
else |
105 |
{ |
106 |
var isChanged = IsObjectChanged(cloudFile, localPath); |
107 |
if (isChanged) |
108 |
{ |
109 |
//Retrieve the hashmap from the server |
110 |
var serverHash = await client.GetHashMap(account, container, url); |
111 |
//If it's a small file |
112 |
if (serverHash.Hashes.Count == 1) |
113 |
//Download it in one go |
114 |
await |
115 |
DownloadEntireFileAsync(accountInfo, client, cloudFile, relativeUrl, localPath,cancellationToken); |
116 |
//Otherwise download it block by block |
117 |
else |
118 |
await |
119 |
DownloadWithBlocks(accountInfo, client, cloudFile, relativeUrl, localPath, |
120 |
serverHash,cancellationToken); |
121 |
|
122 |
if (!cloudFile.IsWritable(accountInfo.UserName)) |
123 |
{ |
124 |
var attributes = File.GetAttributes(localPath); |
125 |
File.SetAttributes(localPath, attributes | FileAttributes.ReadOnly); |
126 |
} |
127 |
} |
128 |
} |
129 |
|
130 |
//Now we can store the object's metadata without worrying about ghost status entries |
131 |
StatusKeeper.StoreInfo(localPath, cloudFile); |
132 |
|
133 |
} |
134 |
} |
135 |
|
136 |
} |
137 |
|
138 |
//Download a file asynchronously using blocks |
139 |
public async Task DownloadWithBlocks(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath, TreeHash serverHash, CancellationToken cancellationToken) |
140 |
{ |
141 |
if (client == null) |
142 |
throw new ArgumentNullException("client"); |
143 |
if (cloudFile == null) |
144 |
throw new ArgumentNullException("cloudFile"); |
145 |
if (relativeUrl == null) |
146 |
throw new ArgumentNullException("relativeUrl"); |
147 |
if (String.IsNullOrWhiteSpace(filePath)) |
148 |
throw new ArgumentNullException("filePath"); |
149 |
if (!Path.IsPathRooted(filePath)) |
150 |
throw new ArgumentException("The filePath must be rooted", "filePath"); |
151 |
if (serverHash == null) |
152 |
throw new ArgumentNullException("serverHash"); |
153 |
if (cloudFile.IsDirectory) |
154 |
throw new ArgumentException("cloudFile is a directory, not a file", "cloudFile"); |
155 |
Contract.EndContractBlock(); |
156 |
|
157 |
if (await WaitOrAbort(accountInfo,cloudFile, cancellationToken)) |
158 |
return; |
159 |
|
160 |
var fileAgent = GetFileAgent(accountInfo); |
161 |
var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath); |
162 |
|
163 |
//Calculate the relative file path for the new file |
164 |
var relativePath = relativeUrl.RelativeUriToFilePath(); |
165 |
var blockUpdater = new BlockUpdater(fileAgent.CachePath, localPath, relativePath, serverHash); |
166 |
|
167 |
StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing, String.Format("Calculating hashmap for {0} before download", Path.GetFileName(localPath))); |
168 |
//Calculate the file's treehash |
169 |
|
170 |
//TODO: Should pass cancellation token here |
171 |
var treeHash = await Signature.CalculateTreeHashAsync(localPath, (int)serverHash.BlockSize, serverHash.BlockHash, 2); |
172 |
|
173 |
//And compare it with the server's hash |
174 |
var upHashes = serverHash.GetHashesAsStrings(); |
175 |
var localHashes = treeHash.HashDictionary; |
176 |
ReportDownloadProgress(Path.GetFileName(localPath), 0, upHashes.Length, cloudFile.Bytes); |
177 |
for (var i = 0; i < upHashes.Length; i++) |
178 |
{ |
179 |
if (await WaitOrAbort(accountInfo,cloudFile, cancellationToken)) |
180 |
return; |
181 |
|
182 |
//For every non-matching hash |
183 |
var upHash = upHashes[i]; |
184 |
if (!localHashes.ContainsKey(upHash)) |
185 |
{ |
186 |
StatusNotification.Notify(new CloudNotification { Data = cloudFile }); |
187 |
ReportDownloadProgress(Path.GetFileName(localPath), i, upHashes.Length, cloudFile.Bytes); |
188 |
|
189 |
if (blockUpdater.UseOrphan(i, upHash)) |
190 |
{ |
191 |
Log.InfoFormat("[BLOCK GET] ORPHAN FOUND for {0} of {1} for {2}", i, upHashes.Length, localPath); |
192 |
continue; |
193 |
} |
194 |
Log.InfoFormat("[BLOCK GET] START {0} of {1} for {2}", i, upHashes.Length, localPath); |
195 |
var start = i * serverHash.BlockSize; |
196 |
//To download the last block just pass a null for the end of the range |
197 |
long? end = null; |
198 |
if (i < upHashes.Length - 1) |
199 |
end = ((i + 1) * serverHash.BlockSize); |
200 |
|
201 |
//TODO: Pass token here |
202 |
//Download the missing block |
203 |
var block = await client.GetBlock(cloudFile.Account, cloudFile.Container, relativeUrl, start, end,cancellationToken); |
204 |
|
205 |
//and store it |
206 |
blockUpdater.StoreBlock(i, block); |
207 |
|
208 |
|
209 |
Log.InfoFormat("[BLOCK GET] FINISH {0} of {1} for {2}", i, upHashes.Length, localPath); |
210 |
} |
211 |
ReportDownloadProgress(Path.GetFileName(localPath), i, upHashes.Length, cloudFile.Bytes); |
212 |
} |
213 |
|
214 |
//Want to avoid notifications if no changes were made |
215 |
var hasChanges = blockUpdater.HasBlocks; |
216 |
blockUpdater.Commit(); |
217 |
|
218 |
if (hasChanges) |
219 |
//Notify listeners that a local file has changed |
220 |
StatusNotification.NotifyChangedFile(localPath); |
221 |
|
222 |
Log.InfoFormat("[BLOCK GET] COMPLETE {0}", localPath); |
223 |
} |
224 |
|
225 |
//Download a small file with a single GET operation |
226 |
private async Task DownloadEntireFileAsync(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath, CancellationToken cancellationToken) |
227 |
{ |
228 |
if (client == null) |
229 |
throw new ArgumentNullException("client"); |
230 |
if (cloudFile == null) |
231 |
throw new ArgumentNullException("cloudFile"); |
232 |
if (relativeUrl == null) |
233 |
throw new ArgumentNullException("relativeUrl"); |
234 |
if (String.IsNullOrWhiteSpace(filePath)) |
235 |
throw new ArgumentNullException("filePath"); |
236 |
if (!Path.IsPathRooted(filePath)) |
237 |
throw new ArgumentException("The localPath must be rooted", "filePath"); |
238 |
if (cloudFile.IsDirectory) |
239 |
throw new ArgumentException("cloudFile is a directory, not a file", "cloudFile"); |
240 |
Contract.EndContractBlock(); |
241 |
|
242 |
if (await WaitOrAbort(accountInfo,cloudFile, cancellationToken)) |
243 |
return; |
244 |
|
245 |
var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath); |
246 |
StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing, String.Format("Downloading {0}", Path.GetFileName(localPath))); |
247 |
StatusNotification.Notify(new CloudNotification { Data = cloudFile }); |
248 |
ReportDownloadProgress(Path.GetFileName(localPath), 1, 1, cloudFile.Bytes); |
249 |
|
250 |
var fileAgent = GetFileAgent(accountInfo); |
251 |
//Calculate the relative file path for the new file |
252 |
var relativePath = relativeUrl.RelativeUriToFilePath(); |
253 |
//The file will be stored in a temporary location while downloading with an extension .download |
254 |
var tempPath = Path.Combine(fileAgent.CachePath, relativePath + ".download"); |
255 |
//Make sure the target folder exists. DownloadFileTask will not create the folder |
256 |
var tempFolder = Path.GetDirectoryName(tempPath); |
257 |
if (!Directory.Exists(tempFolder)) |
258 |
Directory.CreateDirectory(tempFolder); |
259 |
|
260 |
//TODO: Should pass the token here |
261 |
|
262 |
//Download the object to the temporary location |
263 |
await client.GetObject(cloudFile.Account, cloudFile.Container, relativeUrl.ToString(), tempPath,cancellationToken); |
264 |
|
265 |
//Create the local folder if it doesn't exist (necessary for shared objects) |
266 |
var localFolder = Path.GetDirectoryName(localPath); |
267 |
if (!Directory.Exists(localFolder)) |
268 |
try |
269 |
{ |
270 |
Directory.CreateDirectory(localFolder); |
271 |
} |
272 |
catch (IOException) |
273 |
{ |
274 |
//A file may already exist that has the same name as the new folder. |
275 |
//This may be an artifact of the way Pithos handles directories |
276 |
var fileInfo = new FileInfo(localFolder); |
277 |
if (fileInfo.Exists && fileInfo.Length == 0) |
278 |
{ |
279 |
Log.WarnFormat("Malformed directory object detected for [{0}]", localFolder); |
280 |
fileInfo.Delete(); |
281 |
Directory.CreateDirectory(localFolder); |
282 |
} |
283 |
else |
284 |
throw; |
285 |
} |
286 |
//And move it to its actual location once downloading is finished |
287 |
if (File.Exists(localPath)) |
288 |
File.Replace(tempPath, localPath, null, true); |
289 |
else |
290 |
File.Move(tempPath, localPath); |
291 |
//Notify listeners that a local file has changed |
292 |
StatusNotification.NotifyChangedFile(localPath); |
293 |
|
294 |
|
295 |
} |
296 |
|
297 |
|
298 |
private void ReportDownloadProgress(string fileName, int block, int totalBlocks, long fileSize) |
299 |
{ |
300 |
StatusNotification.Notify(totalBlocks == 0 |
301 |
? new ProgressNotification(fileName, "Downloading", 1, 1, fileSize) |
302 |
: new ProgressNotification(fileName, "Downloading", block, totalBlocks, fileSize)); |
303 |
} |
304 |
|
305 |
private bool IsObjectChanged(ObjectInfo cloudFile, string localPath) |
306 |
{ |
307 |
//If the target is a directory, there are no changes to download |
308 |
if (Directory.Exists(localPath)) |
309 |
return false; |
310 |
//If the file doesn't exist, we have a chagne |
311 |
if (!File.Exists(localPath)) |
312 |
return true; |
313 |
//If there is no stored state, we have a change |
314 |
var localState = StatusKeeper.GetStateByFilePath(localPath); |
315 |
if (localState == null) |
316 |
return true; |
317 |
|
318 |
var info = new FileInfo(localPath); |
319 |
var shortHash = info.ComputeShortHash(); |
320 |
//If the file is different from the stored state, we have a change |
321 |
if (localState.ShortHash != shortHash) |
322 |
return true; |
323 |
//If the top hashes differ, we have a change |
324 |
return (localState.Checksum != cloudFile.Hash); |
325 |
} |
326 |
|
327 |
private static FileAgent GetFileAgent(AccountInfo accountInfo) |
328 |
{ |
329 |
return AgentLocator<FileAgent>.Get(accountInfo.AccountPath); |
330 |
} |
331 |
|
332 |
private async Task<bool> WaitOrAbort(AccountInfo account,ObjectInfo cloudFile, CancellationToken token) |
333 |
{ |
334 |
token.ThrowIfCancellationRequested(); |
335 |
await UnpauseEvent.WaitAsync(); |
336 |
var shouldAbort = !Selectives.IsSelected(account,cloudFile); |
337 |
if (shouldAbort) |
338 |
Log.InfoFormat("Aborting [{0}]", cloudFile.Uri); |
339 |
return shouldAbort; |
340 |
} |
341 |
|
342 |
[Import] |
343 |
public Selectives Selectives { get; set; } |
344 |
|
345 |
public AsyncManualResetEvent UnpauseEvent { get; set; } |
346 |
} |
347 |
} |