Revision 2341c603 trunk/Pithos.Core/Agents/Uploader.cs
b/trunk/Pithos.Core/Agents/Uploader.cs | ||
---|---|---|
1 | 1 |
using System; |
2 |
using System.Collections.Generic; |
|
2 | 3 |
using System.ComponentModel.Composition; |
3 | 4 |
using System.Diagnostics; |
4 | 5 |
using System.Diagnostics.Contracts; |
5 | 6 |
using System.IO; |
7 |
using System.Linq; |
|
6 | 8 |
using System.Net; |
7 | 9 |
using System.Reflection; |
10 |
using System.Threading; |
|
8 | 11 |
using System.Threading.Tasks; |
9 | 12 |
using Pithos.Interfaces; |
10 | 13 |
using Pithos.Network; |
... | ... | |
23 | 26 |
|
24 | 27 |
public IStatusNotification StatusNotification { get; set; } |
25 | 28 |
|
26 |
public async Task UploadCloudFile(CloudAction action) |
|
29 |
|
|
30 |
//CancellationTokenSource _cts = new CancellationTokenSource(); |
|
31 |
/*public void SignalStop() |
|
32 |
{ |
|
33 |
_cts.Cancel(); |
|
34 |
}*/ |
|
35 |
|
|
36 |
public async Task UploadCloudFile(CloudAction action,CancellationToken cancellationToken) |
|
27 | 37 |
{ |
28 | 38 |
if (action == null) |
29 | 39 |
throw new ArgumentNullException("action"); |
... | ... | |
33 | 43 |
{ |
34 | 44 |
try |
35 | 45 |
{ |
46 |
await UnpauseEvent.WaitAsync(); |
|
47 |
|
|
36 | 48 |
var fileInfo = action.LocalFile; |
37 | 49 |
|
38 | 50 |
if (fileInfo.Extension.Equals("ignore", StringComparison.InvariantCultureIgnoreCase)) |
39 | 51 |
return; |
40 | 52 |
|
53 |
if (Selectives.IsSelected(action.AccountInfo, fileInfo)) |
|
54 |
return; |
|
55 |
|
|
41 | 56 |
//Try to load the action's local state, if it is empty |
42 | 57 |
if (action.FileState == null) |
43 | 58 |
action.FileState = StatusKeeper.GetStateByFilePath(fileInfo.FullName); |
... | ... | |
106 | 121 |
|
107 | 122 |
} |
108 | 123 |
|
124 |
await UnpauseEvent.WaitAsync(); |
|
125 |
|
|
109 | 126 |
if (fileInfo is DirectoryInfo) |
110 | 127 |
{ |
111 | 128 |
//If the directory doesn't exist the Hash property will be empty |
... | ... | |
144 | 161 |
//Upload even small files using the Hashmap. The server may already contain |
145 | 162 |
//the relevant block |
146 | 163 |
|
147 |
await UploadWithHashMap(accountInfo, cloudFile, fileInfo as FileInfo, cloudFile.Name, treeHash); |
|
164 |
|
|
165 |
|
|
166 |
await UploadWithHashMap(accountInfo, cloudFile, fileInfo as FileInfo, cloudFile.Name, treeHash,cancellationToken); |
|
148 | 167 |
} |
149 | 168 |
//If everything succeeds, change the file and overlay status to normal |
150 | 169 |
StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal, ""); |
... | ... | |
222 | 241 |
} |
223 | 242 |
|
224 | 243 |
|
225 |
public async Task UploadWithHashMap(AccountInfo accountInfo, ObjectInfo cloudFile, FileInfo fileInfo, string url, TreeHash treeHash) |
|
244 |
public async Task UploadWithHashMap(AccountInfo accountInfo, ObjectInfo cloudFile, FileInfo fileInfo, string url, TreeHash treeHash, CancellationToken token)
|
|
226 | 245 |
{ |
227 | 246 |
if (accountInfo == null) |
228 | 247 |
throw new ArgumentNullException("accountInfo"); |
... | ... | |
238 | 257 |
throw new ArgumentException("Invalid container", "cloudFile"); |
239 | 258 |
Contract.EndContractBlock(); |
240 | 259 |
|
260 |
|
|
241 | 261 |
using (StatusNotification.GetNotifier("Uploading {0}", "Finished Uploading {0}", fileInfo.Name)) |
242 | 262 |
{ |
243 |
|
|
263 |
token.ThrowIfCancellationRequested(); |
|
264 |
await UnpauseEvent.WaitAsync(); |
|
265 |
|
|
244 | 266 |
var fullFileName = fileInfo.GetProperCapitalization(); |
245 | 267 |
|
246 | 268 |
var account = cloudFile.Account ?? accountInfo.UserName; |
247 | 269 |
var container = cloudFile.Container; |
248 | 270 |
|
271 |
|
|
249 | 272 |
var client = new CloudFilesClient(accountInfo); |
250 | 273 |
//Send the hashmap to the server |
251 | 274 |
var missingHashes = await client.PutHashMap(account, container, url, treeHash); |
... | ... | |
255 | 278 |
while (missingHashes.Count > 0) |
256 | 279 |
{ |
257 | 280 |
|
281 |
token.ThrowIfCancellationRequested(); |
|
282 |
await UnpauseEvent.WaitAsync(); |
|
283 |
|
|
258 | 284 |
var buffer = new byte[accountInfo.BlockSize]; |
259 | 285 |
foreach (var missingHash in missingHashes) |
260 | 286 |
{ |
287 |
token.ThrowIfCancellationRequested(); |
|
288 |
await UnpauseEvent.WaitAsync(); |
|
289 |
|
|
261 | 290 |
//Find the proper block |
262 | 291 |
var blockIndex = treeHash.HashDictionary[missingHash]; |
263 | 292 |
long offset = blockIndex*accountInfo.BlockSize; |
... | ... | |
277 | 306 |
ReportUploadProgress(fileInfo.Name, block++, missingHashes.Count, fileInfo.Length); |
278 | 307 |
} |
279 | 308 |
|
309 |
token.ThrowIfCancellationRequested(); |
|
280 | 310 |
//Repeat until there are no more missing hashes |
281 | 311 |
missingHashes = await client.PutHashMap(account, container, url, treeHash); |
282 | 312 |
} |
... | ... | |
308 | 338 |
} |
309 | 339 |
return false; |
310 | 340 |
} |
341 |
|
|
342 |
[Import] |
|
343 |
public Selectives Selectives { get; set; } |
|
344 |
|
|
345 |
public AsyncManualResetEvent UnpauseEvent { get; set; } |
|
311 | 346 |
} |
312 | 347 |
} |
Also available in: Unified diff