root / trunk / Pithos.Core / Agents / Downloader.cs @ 2341c603
History | View | Annotate | Download (15.3 kB)
1 |
using System; |
---|---|
2 |
using System.Collections.Generic; |
3 |
using System.ComponentModel.Composition; |
4 |
using System.Diagnostics.Contracts; |
5 |
using System.IO; |
6 |
using System.Linq; |
7 |
using System.Reflection; |
8 |
using System.Threading; |
9 |
using System.Threading.Tasks; |
10 |
using Pithos.Interfaces; |
11 |
using Pithos.Network; |
12 |
using log4net; |
13 |
|
14 |
namespace Pithos.Core.Agents |
15 |
{ |
16 |
[Export(typeof(Downloader))] |
17 |
public class Downloader |
18 |
{ |
19 |
private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); |
20 |
|
21 |
[Import] |
22 |
private IStatusKeeper StatusKeeper { get; set; } |
23 |
|
24 |
|
25 |
public IStatusNotification StatusNotification { get; set; } |
26 |
|
27 |
/* |
28 |
private CancellationTokenSource _cts=new CancellationTokenSource(); |
29 |
|
30 |
public void SignalStop() |
31 |
{ |
32 |
_cts.Cancel(); |
33 |
} |
34 |
*/ |
35 |
|
36 |
|
37 |
//Download a file. |
38 |
public async Task DownloadCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile, string filePath,CancellationToken cancellationToken) |
39 |
{ |
40 |
if (accountInfo == null) |
41 |
throw new ArgumentNullException("accountInfo"); |
42 |
if (cloudFile == null) |
43 |
throw new ArgumentNullException("cloudFile"); |
44 |
if (String.IsNullOrWhiteSpace(cloudFile.Account)) |
45 |
throw new ArgumentNullException("cloudFile"); |
46 |
if (String.IsNullOrWhiteSpace(cloudFile.Container)) |
47 |
throw new ArgumentNullException("cloudFile"); |
48 |
if (String.IsNullOrWhiteSpace(filePath)) |
49 |
throw new ArgumentNullException("filePath"); |
50 |
if (!Path.IsPathRooted(filePath)) |
51 |
throw new ArgumentException("The filePath must be rooted", "filePath"); |
52 |
Contract.EndContractBlock(); |
53 |
using (ThreadContext.Stacks["Operation"].Push("DownloadCloudFile")) |
54 |
{ |
55 |
// var cancellationToken=_cts.Token;// .ThrowIfCancellationRequested(); |
56 |
|
57 |
cancellationToken.ThrowIfCancellationRequested(); |
58 |
await UnpauseEvent.WaitAsync(); |
59 |
|
60 |
|
61 |
var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath); |
62 |
var relativeUrl = new Uri(cloudFile.Name, UriKind.Relative); |
63 |
|
64 |
var url = relativeUrl.ToString(); |
65 |
if (cloudFile.Name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase)) |
66 |
return; |
67 |
|
68 |
if (!Selectives.IsSelected(cloudFile)) |
69 |
return; |
70 |
|
71 |
|
72 |
//Are we already downloading or uploading the file? |
73 |
using (var gate = NetworkGate.Acquire(localPath, NetworkOperation.Downloading)) |
74 |
{ |
75 |
if (gate.Failed) |
76 |
return; |
77 |
|
78 |
var client = new CloudFilesClient(accountInfo); |
79 |
var account = cloudFile.Account; |
80 |
var container = cloudFile.Container; |
81 |
|
82 |
if (cloudFile.IsDirectory) |
83 |
{ |
84 |
if (!Directory.Exists(localPath)) |
85 |
try |
86 |
{ |
87 |
Directory.CreateDirectory(localPath); |
88 |
if (Log.IsDebugEnabled) |
89 |
Log.DebugFormat("Created Directory [{0}]", localPath); |
90 |
} |
91 |
catch (IOException) |
92 |
{ |
93 |
var localInfo = new FileInfo(localPath); |
94 |
if (localInfo.Exists && localInfo.Length == 0) |
95 |
{ |
96 |
Log.WarnFormat("Malformed directory object detected for [{0}]", localPath); |
97 |
localInfo.Delete(); |
98 |
Directory.CreateDirectory(localPath); |
99 |
if (Log.IsDebugEnabled) |
100 |
Log.DebugFormat("Created Directory [{0}]", localPath); |
101 |
} |
102 |
} |
103 |
} |
104 |
else |
105 |
{ |
106 |
var isChanged = IsObjectChanged(cloudFile, localPath); |
107 |
if (isChanged) |
108 |
{ |
109 |
//Retrieve the hashmap from the server |
110 |
var serverHash = await client.GetHashMap(account, container, url); |
111 |
//If it's a small file |
112 |
if (serverHash.Hashes.Count == 1) |
113 |
//Download it in one go |
114 |
await |
115 |
DownloadEntireFileAsync(accountInfo, client, cloudFile, relativeUrl, localPath,cancellationToken); |
116 |
//Otherwise download it block by block |
117 |
else |
118 |
await |
119 |
DownloadWithBlocks(accountInfo, client, cloudFile, relativeUrl, localPath, |
120 |
serverHash,cancellationToken); |
121 |
|
122 |
if (!cloudFile.IsWritable(accountInfo.UserName)) |
123 |
{ |
124 |
var attributes = File.GetAttributes(localPath); |
125 |
File.SetAttributes(localPath, attributes | FileAttributes.ReadOnly); |
126 |
} |
127 |
} |
128 |
} |
129 |
|
130 |
//Now we can store the object's metadata without worrying about ghost status entries |
131 |
StatusKeeper.StoreInfo(localPath, cloudFile); |
132 |
|
133 |
} |
134 |
} |
135 |
|
136 |
} |
137 |
|
138 |
//Download a file asynchronously using blocks |
139 |
public async Task DownloadWithBlocks(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath, TreeHash serverHash, CancellationToken cancellationToken) |
140 |
{ |
141 |
if (client == null) |
142 |
throw new ArgumentNullException("client"); |
143 |
if (cloudFile == null) |
144 |
throw new ArgumentNullException("cloudFile"); |
145 |
if (relativeUrl == null) |
146 |
throw new ArgumentNullException("relativeUrl"); |
147 |
if (String.IsNullOrWhiteSpace(filePath)) |
148 |
throw new ArgumentNullException("filePath"); |
149 |
if (!Path.IsPathRooted(filePath)) |
150 |
throw new ArgumentException("The filePath must be rooted", "filePath"); |
151 |
if (serverHash == null) |
152 |
throw new ArgumentNullException("serverHash"); |
153 |
if (cloudFile.IsDirectory) |
154 |
throw new ArgumentException("cloudFile is a directory, not a file", "cloudFile"); |
155 |
Contract.EndContractBlock(); |
156 |
|
157 |
cancellationToken.ThrowIfCancellationRequested(); |
158 |
await UnpauseEvent.WaitAsync(); |
159 |
|
160 |
var fileAgent = GetFileAgent(accountInfo); |
161 |
var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath); |
162 |
|
163 |
//Calculate the relative file path for the new file |
164 |
var relativePath = relativeUrl.RelativeUriToFilePath(); |
165 |
var blockUpdater = new BlockUpdater(fileAgent.CachePath, localPath, relativePath, serverHash); |
166 |
|
167 |
StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing, String.Format("Calculating hashmap for {0} before download", Path.GetFileName(localPath))); |
168 |
//Calculate the file's treehash |
169 |
|
170 |
//TODO: Should pass cancellation token here |
171 |
var treeHash = await Signature.CalculateTreeHashAsync(localPath, serverHash.BlockSize, serverHash.BlockHash, 2); |
172 |
|
173 |
//And compare it with the server's hash |
174 |
var upHashes = serverHash.GetHashesAsStrings(); |
175 |
var localHashes = treeHash.HashDictionary; |
176 |
ReportDownloadProgress(Path.GetFileName(localPath), 0, upHashes.Length, cloudFile.Bytes); |
177 |
for (int i = 0; i < upHashes.Length; i++) |
178 |
{ |
179 |
cancellationToken.ThrowIfCancellationRequested(); |
180 |
await UnpauseEvent.WaitAsync(); |
181 |
|
182 |
//For every non-matching hash |
183 |
var upHash = upHashes[i]; |
184 |
if (!localHashes.ContainsKey(upHash)) |
185 |
{ |
186 |
StatusNotification.Notify(new CloudNotification { Data = cloudFile }); |
187 |
|
188 |
if (blockUpdater.UseOrphan(i, upHash)) |
189 |
{ |
190 |
Log.InfoFormat("[BLOCK GET] ORPHAN FOUND for {0} of {1} for {2}", i, upHashes.Length, localPath); |
191 |
continue; |
192 |
} |
193 |
Log.InfoFormat("[BLOCK GET] START {0} of {1} for {2}", i, upHashes.Length, localPath); |
194 |
var start = i * serverHash.BlockSize; |
195 |
//To download the last block just pass a null for the end of the range |
196 |
long? end = null; |
197 |
if (i < upHashes.Length - 1) |
198 |
end = ((i + 1) * serverHash.BlockSize); |
199 |
|
200 |
//TODO: Pass token here |
201 |
//Download the missing block |
202 |
var block = await client.GetBlock(cloudFile.Account, cloudFile.Container, relativeUrl, start, end,cancellationToken); |
203 |
|
204 |
//and store it |
205 |
blockUpdater.StoreBlock(i, block); |
206 |
|
207 |
|
208 |
Log.InfoFormat("[BLOCK GET] FINISH {0} of {1} for {2}", i, upHashes.Length, localPath); |
209 |
} |
210 |
ReportDownloadProgress(Path.GetFileName(localPath), i, upHashes.Length, cloudFile.Bytes); |
211 |
} |
212 |
|
213 |
//Want to avoid notifications if no changes were made |
214 |
var hasChanges = blockUpdater.HasBlocks; |
215 |
blockUpdater.Commit(); |
216 |
|
217 |
if (hasChanges) |
218 |
//Notify listeners that a local file has changed |
219 |
StatusNotification.NotifyChangedFile(localPath); |
220 |
|
221 |
Log.InfoFormat("[BLOCK GET] COMPLETE {0}", localPath); |
222 |
} |
223 |
|
224 |
//Download a small file with a single GET operation |
225 |
private async Task DownloadEntireFileAsync(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath, CancellationToken cancellationToken) |
226 |
{ |
227 |
if (client == null) |
228 |
throw new ArgumentNullException("client"); |
229 |
if (cloudFile == null) |
230 |
throw new ArgumentNullException("cloudFile"); |
231 |
if (relativeUrl == null) |
232 |
throw new ArgumentNullException("relativeUrl"); |
233 |
if (String.IsNullOrWhiteSpace(filePath)) |
234 |
throw new ArgumentNullException("filePath"); |
235 |
if (!Path.IsPathRooted(filePath)) |
236 |
throw new ArgumentException("The localPath must be rooted", "filePath"); |
237 |
if (cloudFile.IsDirectory) |
238 |
throw new ArgumentException("cloudFile is a directory, not a file", "cloudFile"); |
239 |
Contract.EndContractBlock(); |
240 |
|
241 |
cancellationToken.ThrowIfCancellationRequested(); |
242 |
await UnpauseEvent.WaitAsync(); |
243 |
|
244 |
var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath); |
245 |
StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing, String.Format("Downloading {0}", Path.GetFileName(localPath))); |
246 |
StatusNotification.Notify(new CloudNotification { Data = cloudFile }); |
247 |
|
248 |
var fileAgent = GetFileAgent(accountInfo); |
249 |
//Calculate the relative file path for the new file |
250 |
var relativePath = relativeUrl.RelativeUriToFilePath(); |
251 |
//The file will be stored in a temporary location while downloading with an extension .download |
252 |
var tempPath = Path.Combine(fileAgent.CachePath, relativePath + ".download"); |
253 |
//Make sure the target folder exists. DownloadFileTask will not create the folder |
254 |
var tempFolder = Path.GetDirectoryName(tempPath); |
255 |
if (!Directory.Exists(tempFolder)) |
256 |
Directory.CreateDirectory(tempFolder); |
257 |
|
258 |
//TODO: Should pass the token here |
259 |
|
260 |
//Download the object to the temporary location |
261 |
await client.GetObject(cloudFile.Account, cloudFile.Container, relativeUrl.ToString(), tempPath,cancellationToken); |
262 |
|
263 |
//Create the local folder if it doesn't exist (necessary for shared objects) |
264 |
var localFolder = Path.GetDirectoryName(localPath); |
265 |
if (!Directory.Exists(localFolder)) |
266 |
try |
267 |
{ |
268 |
Directory.CreateDirectory(localFolder); |
269 |
} |
270 |
catch (IOException) |
271 |
{ |
272 |
//A file may already exist that has the same name as the new folder. |
273 |
//This may be an artifact of the way Pithos handles directories |
274 |
var fileInfo = new FileInfo(localFolder); |
275 |
if (fileInfo.Exists && fileInfo.Length == 0) |
276 |
{ |
277 |
Log.WarnFormat("Malformed directory object detected for [{0}]", localFolder); |
278 |
fileInfo.Delete(); |
279 |
Directory.CreateDirectory(localFolder); |
280 |
} |
281 |
else |
282 |
throw; |
283 |
} |
284 |
//And move it to its actual location once downloading is finished |
285 |
if (File.Exists(localPath)) |
286 |
File.Replace(tempPath, localPath, null, true); |
287 |
else |
288 |
File.Move(tempPath, localPath); |
289 |
//Notify listeners that a local file has changed |
290 |
StatusNotification.NotifyChangedFile(localPath); |
291 |
|
292 |
|
293 |
} |
294 |
|
295 |
|
296 |
private void ReportDownloadProgress(string fileName, int block, int totalBlocks, long fileSize) |
297 |
{ |
298 |
StatusNotification.Notify(totalBlocks == 0 |
299 |
? new ProgressNotification(fileName, "Downloading", 1, 1, fileSize) |
300 |
: new ProgressNotification(fileName, "Downloading", block, totalBlocks, fileSize)); |
301 |
} |
302 |
|
303 |
private bool IsObjectChanged(ObjectInfo cloudFile, string localPath) |
304 |
{ |
305 |
//If the target is a directory, there are no changes to download |
306 |
if (Directory.Exists(localPath)) |
307 |
return false; |
308 |
//If the file doesn't exist, we have a chagne |
309 |
if (!File.Exists(localPath)) |
310 |
return true; |
311 |
//If there is no stored state, we have a change |
312 |
var localState = StatusKeeper.GetStateByFilePath(localPath); |
313 |
if (localState == null) |
314 |
return true; |
315 |
|
316 |
var info = new FileInfo(localPath); |
317 |
var shortHash = info.ComputeShortHash(); |
318 |
//If the file is different from the stored state, we have a change |
319 |
if (localState.ShortHash != shortHash) |
320 |
return true; |
321 |
//If the top hashes differ, we have a change |
322 |
return (localState.Checksum != cloudFile.Hash); |
323 |
} |
324 |
|
325 |
private static FileAgent GetFileAgent(AccountInfo accountInfo) |
326 |
{ |
327 |
return AgentLocator<FileAgent>.Get(accountInfo.AccountPath); |
328 |
} |
329 |
|
330 |
[Import] |
331 |
public Selectives Selectives { get; set; } |
332 |
|
333 |
public AsyncManualResetEvent UnpauseEvent { get; set; } |
334 |
} |
335 |
} |