Revision 2341c603 trunk/Pithos.Core/Agents/Downloader.cs
b/trunk/Pithos.Core/Agents/Downloader.cs | ||
---|---|---|
1 | 1 |
using System; |
2 |
using System.Collections.Generic; |
|
2 | 3 |
using System.ComponentModel.Composition; |
3 | 4 |
using System.Diagnostics.Contracts; |
4 | 5 |
using System.IO; |
6 |
using System.Linq; |
|
5 | 7 |
using System.Reflection; |
8 |
using System.Threading; |
|
6 | 9 |
using System.Threading.Tasks; |
7 | 10 |
using Pithos.Interfaces; |
8 | 11 |
using Pithos.Network; |
... | ... | |
21 | 24 |
|
22 | 25 |
public IStatusNotification StatusNotification { get; set; } |
23 | 26 |
|
27 |
/* |
|
28 |
private CancellationTokenSource _cts=new CancellationTokenSource(); |
|
29 |
|
|
30 |
public void SignalStop() |
|
31 |
{ |
|
32 |
_cts.Cancel(); |
|
33 |
} |
|
34 |
*/ |
|
35 |
|
|
36 |
|
|
24 | 37 |
//Download a file. |
25 |
public async Task DownloadCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile, string filePath) |
|
38 |
public async Task DownloadCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile, string filePath,CancellationToken cancellationToken)
|
|
26 | 39 |
{ |
27 | 40 |
if (accountInfo == null) |
28 | 41 |
throw new ArgumentNullException("accountInfo"); |
... | ... | |
37 | 50 |
if (!Path.IsPathRooted(filePath)) |
38 | 51 |
throw new ArgumentException("The filePath must be rooted", "filePath"); |
39 | 52 |
Contract.EndContractBlock(); |
53 |
using (ThreadContext.Stacks["Operation"].Push("DownloadCloudFile")) |
|
54 |
{ |
|
55 |
// var cancellationToken=_cts.Token;// .ThrowIfCancellationRequested(); |
|
40 | 56 |
|
41 |
using (ThreadContext.Stacks["Operation"].Push("DownloadCloudFile"))
|
|
42 |
{
|
|
57 |
cancellationToken.ThrowIfCancellationRequested();
|
|
58 |
await UnpauseEvent.WaitAsync();
|
|
43 | 59 |
|
44 |
var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath); |
|
45 |
var relativeUrl = new Uri(cloudFile.Name, UriKind.Relative); |
|
46 | 60 |
|
47 |
var url = relativeUrl.ToString(); |
|
48 |
if (cloudFile.Name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase)) |
|
49 |
return; |
|
61 |
var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath); |
|
62 |
var relativeUrl = new Uri(cloudFile.Name, UriKind.Relative); |
|
50 | 63 |
|
64 |
var url = relativeUrl.ToString(); |
|
65 |
if (cloudFile.Name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase)) |
|
66 |
return; |
|
51 | 67 |
|
52 |
//Are we already downloading or uploading the file? |
|
53 |
using (var gate = NetworkGate.Acquire(localPath, NetworkOperation.Downloading)) |
|
54 |
{ |
|
55 |
if (gate.Failed) |
|
68 |
if (!Selectives.IsSelected(cloudFile)) |
|
56 | 69 |
return; |
57 | 70 |
|
58 |
var client = new CloudFilesClient(accountInfo); |
|
59 |
var account = cloudFile.Account; |
|
60 |
var container = cloudFile.Container; |
|
61 | 71 |
|
62 |
if (cloudFile.IsDirectory) |
|
72 |
//Are we already downloading or uploading the file? |
|
73 |
using (var gate = NetworkGate.Acquire(localPath, NetworkOperation.Downloading)) |
|
63 | 74 |
{ |
64 |
if (!Directory.Exists(localPath))
|
|
65 |
try
|
|
66 |
{ |
|
67 |
Directory.CreateDirectory(localPath);
|
|
68 |
if (Log.IsDebugEnabled)
|
|
69 |
Log.DebugFormat("Created Directory [{0}]", localPath);
|
|
70 |
} |
|
71 |
catch (IOException)
|
|
72 |
{
|
|
73 |
var localInfo = new FileInfo(localPath);
|
|
74 |
if (localInfo.Exists && localInfo.Length == 0)
|
|
75 |
if (gate.Failed)
|
|
76 |
return;
|
|
77 |
|
|
78 |
var client = new CloudFilesClient(accountInfo);
|
|
79 |
var account = cloudFile.Account;
|
|
80 |
var container = cloudFile.Container;
|
|
81 |
|
|
82 |
if (cloudFile.IsDirectory)
|
|
83 |
{ |
|
84 |
if (!Directory.Exists(localPath))
|
|
85 |
try
|
|
75 | 86 |
{ |
76 |
Log.WarnFormat("Malformed directory object detected for [{0}]", localPath); |
|
77 |
localInfo.Delete(); |
|
78 | 87 |
Directory.CreateDirectory(localPath); |
79 | 88 |
if (Log.IsDebugEnabled) |
80 | 89 |
Log.DebugFormat("Created Directory [{0}]", localPath); |
81 | 90 |
} |
82 |
} |
|
83 |
} |
|
84 |
else |
|
85 |
{ |
|
86 |
var isChanged = IsObjectChanged(cloudFile, localPath); |
|
87 |
if (isChanged) |
|
91 |
catch (IOException) |
|
92 |
{ |
|
93 |
var localInfo = new FileInfo(localPath); |
|
94 |
if (localInfo.Exists && localInfo.Length == 0) |
|
95 |
{ |
|
96 |
Log.WarnFormat("Malformed directory object detected for [{0}]", localPath); |
|
97 |
localInfo.Delete(); |
|
98 |
Directory.CreateDirectory(localPath); |
|
99 |
if (Log.IsDebugEnabled) |
|
100 |
Log.DebugFormat("Created Directory [{0}]", localPath); |
|
101 |
} |
|
102 |
} |
|
103 |
} |
|
104 |
else |
|
88 | 105 |
{ |
89 |
//Retrieve the hashmap from the server |
|
90 |
var serverHash = await client.GetHashMap(account, container, url); |
|
91 |
//If it's a small file |
|
92 |
if (serverHash.Hashes.Count == 1) |
|
93 |
//Download it in one go |
|
94 |
await |
|
95 |
DownloadEntireFileAsync(accountInfo, client, cloudFile, relativeUrl, localPath); |
|
96 |
//Otherwise download it block by block |
|
97 |
else |
|
98 |
await |
|
99 |
DownloadWithBlocks(accountInfo, client, cloudFile, relativeUrl, localPath, |
|
100 |
serverHash); |
|
101 |
|
|
102 |
if (!cloudFile.IsWritable(accountInfo.UserName)) |
|
106 |
var isChanged = IsObjectChanged(cloudFile, localPath); |
|
107 |
if (isChanged) |
|
103 | 108 |
{ |
104 |
var attributes = File.GetAttributes(localPath); |
|
105 |
File.SetAttributes(localPath, attributes | FileAttributes.ReadOnly); |
|
109 |
//Retrieve the hashmap from the server |
|
110 |
var serverHash = await client.GetHashMap(account, container, url); |
|
111 |
//If it's a small file |
|
112 |
if (serverHash.Hashes.Count == 1) |
|
113 |
//Download it in one go |
|
114 |
await |
|
115 |
DownloadEntireFileAsync(accountInfo, client, cloudFile, relativeUrl, localPath,cancellationToken); |
|
116 |
//Otherwise download it block by block |
|
117 |
else |
|
118 |
await |
|
119 |
DownloadWithBlocks(accountInfo, client, cloudFile, relativeUrl, localPath, |
|
120 |
serverHash,cancellationToken); |
|
121 |
|
|
122 |
if (!cloudFile.IsWritable(accountInfo.UserName)) |
|
123 |
{ |
|
124 |
var attributes = File.GetAttributes(localPath); |
|
125 |
File.SetAttributes(localPath, attributes | FileAttributes.ReadOnly); |
|
126 |
} |
|
106 | 127 |
} |
107 | 128 |
} |
108 |
} |
|
109 | 129 |
|
110 |
//Now we can store the object's metadata without worrying about ghost status entries |
|
111 |
StatusKeeper.StoreInfo(localPath, cloudFile); |
|
130 |
//Now we can store the object's metadata without worrying about ghost status entries
|
|
131 |
StatusKeeper.StoreInfo(localPath, cloudFile);
|
|
112 | 132 |
|
133 |
} |
|
113 | 134 |
} |
114 |
} |
|
135 |
|
|
115 | 136 |
} |
116 | 137 |
|
117 | 138 |
//Download a file asynchronously using blocks |
118 |
public async Task DownloadWithBlocks(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath, TreeHash serverHash) |
|
139 |
public async Task DownloadWithBlocks(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath, TreeHash serverHash, CancellationToken cancellationToken)
|
|
119 | 140 |
{ |
120 | 141 |
if (client == null) |
121 | 142 |
throw new ArgumentNullException("client"); |
... | ... | |
133 | 154 |
throw new ArgumentException("cloudFile is a directory, not a file", "cloudFile"); |
134 | 155 |
Contract.EndContractBlock(); |
135 | 156 |
|
157 |
cancellationToken.ThrowIfCancellationRequested(); |
|
158 |
await UnpauseEvent.WaitAsync(); |
|
159 |
|
|
136 | 160 |
var fileAgent = GetFileAgent(accountInfo); |
137 | 161 |
var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath); |
138 | 162 |
|
... | ... | |
140 | 164 |
var relativePath = relativeUrl.RelativeUriToFilePath(); |
141 | 165 |
var blockUpdater = new BlockUpdater(fileAgent.CachePath, localPath, relativePath, serverHash); |
142 | 166 |
|
143 |
|
|
144 | 167 |
StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing, String.Format("Calculating hashmap for {0} before download", Path.GetFileName(localPath))); |
145 | 168 |
//Calculate the file's treehash |
169 |
|
|
170 |
//TODO: Should pass cancellation token here |
|
146 | 171 |
var treeHash = await Signature.CalculateTreeHashAsync(localPath, serverHash.BlockSize, serverHash.BlockHash, 2); |
147 | 172 |
|
148 | 173 |
//And compare it with the server's hash |
... | ... | |
151 | 176 |
ReportDownloadProgress(Path.GetFileName(localPath), 0, upHashes.Length, cloudFile.Bytes); |
152 | 177 |
for (int i = 0; i < upHashes.Length; i++) |
153 | 178 |
{ |
179 |
cancellationToken.ThrowIfCancellationRequested(); |
|
180 |
await UnpauseEvent.WaitAsync(); |
|
181 |
|
|
154 | 182 |
//For every non-matching hash |
155 | 183 |
var upHash = upHashes[i]; |
156 | 184 |
if (!localHashes.ContainsKey(upHash)) |
... | ... | |
169 | 197 |
if (i < upHashes.Length - 1) |
170 | 198 |
end = ((i + 1) * serverHash.BlockSize); |
171 | 199 |
|
200 |
//TODO: Pass token here |
|
172 | 201 |
//Download the missing block |
173 |
var block = await client.GetBlock(cloudFile.Account, cloudFile.Container, relativeUrl, start, end); |
|
202 |
var block = await client.GetBlock(cloudFile.Account, cloudFile.Container, relativeUrl, start, end,cancellationToken);
|
|
174 | 203 |
|
175 | 204 |
//and store it |
176 | 205 |
blockUpdater.StoreBlock(i, block); |
... | ... | |
193 | 222 |
} |
194 | 223 |
|
195 | 224 |
//Download a small file with a single GET operation |
196 |
private async Task DownloadEntireFileAsync(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath) |
|
225 |
private async Task DownloadEntireFileAsync(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath, CancellationToken cancellationToken)
|
|
197 | 226 |
{ |
198 | 227 |
if (client == null) |
199 | 228 |
throw new ArgumentNullException("client"); |
... | ... | |
209 | 238 |
throw new ArgumentException("cloudFile is a directory, not a file", "cloudFile"); |
210 | 239 |
Contract.EndContractBlock(); |
211 | 240 |
|
241 |
cancellationToken.ThrowIfCancellationRequested(); |
|
242 |
await UnpauseEvent.WaitAsync(); |
|
243 |
|
|
212 | 244 |
var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath); |
213 | 245 |
StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing, String.Format("Downloading {0}", Path.GetFileName(localPath))); |
214 | 246 |
StatusNotification.Notify(new CloudNotification { Data = cloudFile }); |
... | ... | |
223 | 255 |
if (!Directory.Exists(tempFolder)) |
224 | 256 |
Directory.CreateDirectory(tempFolder); |
225 | 257 |
|
258 |
//TODO: Should pass the token here |
|
259 |
|
|
226 | 260 |
//Download the object to the temporary location |
227 |
await client.GetObject(cloudFile.Account, cloudFile.Container, relativeUrl.ToString(), tempPath); |
|
261 |
await client.GetObject(cloudFile.Account, cloudFile.Container, relativeUrl.ToString(), tempPath,cancellationToken);
|
|
228 | 262 |
|
229 | 263 |
//Create the local folder if it doesn't exist (necessary for shared objects) |
230 | 264 |
var localFolder = Path.GetDirectoryName(localPath); |
... | ... | |
293 | 327 |
return AgentLocator<FileAgent>.Get(accountInfo.AccountPath); |
294 | 328 |
} |
295 | 329 |
|
330 |
[Import] |
|
331 |
public Selectives Selectives { get; set; } |
|
296 | 332 |
|
333 |
public AsyncManualResetEvent UnpauseEvent { get; set; } |
|
297 | 334 |
} |
298 | 335 |
} |
Also available in: Unified diff