root / trunk / Pithos.Core / Agents / NetworkAgent.cs @ cfed7823
History | View | Annotate | Download (37.8 kB)
1 |
using System; |
---|---|
2 |
using System.Collections.Generic; |
3 |
using System.ComponentModel.Composition; |
4 |
using System.Diagnostics; |
5 |
using System.Diagnostics.Contracts; |
6 |
using System.IO; |
7 |
using System.Linq; |
8 |
using System.Net; |
9 |
using System.Text; |
10 |
using System.Threading; |
11 |
using System.Threading.Tasks; |
12 |
using Pithos.Interfaces; |
13 |
using Pithos.Network; |
14 |
using log4net; |
15 |
|
16 |
namespace Pithos.Core.Agents |
17 |
{ |
18 |
[Export] |
19 |
public class NetworkAgent |
20 |
{ |
21 |
private Agent<CloudAction> _agent; |
22 |
|
23 |
[Import] |
24 |
public IStatusKeeper StatusKeeper { get; set; } |
25 |
|
26 |
public IStatusNotification StatusNotification { get; set; } |
27 |
[Import] |
28 |
public ICloudClient CloudClient { get; set; } |
29 |
|
30 |
[Import] |
31 |
public FileAgent FileAgent {get;set;} |
32 |
|
33 |
/* |
34 |
[Import] |
35 |
public IPithosWorkflow Workflow { get; set; } |
36 |
*/ |
37 |
|
38 |
|
39 |
public string PithosContainer { get; set; } |
40 |
public string TrashContainer { get; private set; } |
41 |
public IList<string> Containers { get; private set; } |
42 |
|
43 |
public int BlockSize { get; set; } |
44 |
public string BlockHash { get; set; } |
45 |
|
46 |
private static readonly ILog Log = LogManager.GetLogger(typeof(NetworkAgent)); |
47 |
|
48 |
|
49 |
public void Start(string pithosContainer, string trashContainer, int blockSize, string blockHash) |
50 |
{ |
51 |
if (String.IsNullOrWhiteSpace(pithosContainer)) |
52 |
throw new ArgumentNullException("pithosContainer"); |
53 |
if (String.IsNullOrWhiteSpace(trashContainer)) |
54 |
throw new ArgumentNullException("trashContainer"); |
55 |
Contract.EndContractBlock(); |
56 |
|
57 |
PithosContainer = pithosContainer; |
58 |
TrashContainer = trashContainer; |
59 |
BlockSize = blockSize; |
60 |
BlockHash = blockHash; |
61 |
|
62 |
|
63 |
_agent = Agent<CloudAction>.Start(inbox => |
64 |
{ |
65 |
Action loop = null; |
66 |
loop = () => |
67 |
{ |
68 |
var message = inbox.Receive(); |
69 |
var process=message.Then(Process,inbox.CancellationToken); |
70 |
inbox.LoopAsync(process, loop); |
71 |
}; |
72 |
loop(); |
73 |
}); |
74 |
} |
75 |
|
76 |
private Task<object> Process(CloudAction action) |
77 |
{ |
78 |
if (action == null) |
79 |
throw new ArgumentNullException("action"); |
80 |
Contract.EndContractBlock(); |
81 |
|
82 |
using (log4net.LogicalThreadContext.Stacks["NETWORK"].Push("PROCESS")) |
83 |
{ |
84 |
Log.InfoFormat("[ACTION] Start Processing {0}:{1}->{2}", action.Action, action.LocalFile, |
85 |
action.CloudFile.Name); |
86 |
|
87 |
var localFile = action.LocalFile; |
88 |
var cloudFile = action.CloudFile; |
89 |
var downloadPath = (cloudFile == null) |
90 |
? String.Empty |
91 |
: Path.Combine(FileAgent.RootPath, cloudFile.RelativeUrlToFilePath(CloudClient.UserName)); |
92 |
|
93 |
try |
94 |
{ |
95 |
var account = action.CloudFile.Account ?? CloudClient.UserName; |
96 |
var container = action.CloudFile.Container ?? PithosContainer; |
97 |
|
98 |
switch (action.Action) |
99 |
{ |
100 |
case CloudActionType.UploadUnconditional: |
101 |
UploadCloudFile(account, container, localFile, action.LocalHash.Value, action.TopHash.Value); |
102 |
break; |
103 |
case CloudActionType.DownloadUnconditional: |
104 |
|
105 |
DownloadCloudFile(account, container, new Uri(cloudFile.Name, UriKind.Relative), |
106 |
downloadPath); |
107 |
break; |
108 |
case CloudActionType.DeleteCloud: |
109 |
DeleteCloudFile(account, container, cloudFile.Name); |
110 |
break; |
111 |
case CloudActionType.RenameCloud: |
112 |
var moveAction = (CloudMoveAction)action; |
113 |
RenameCloudFile(account, container, moveAction.OldFileName, moveAction.NewPath, |
114 |
moveAction.NewFileName); |
115 |
break; |
116 |
case CloudActionType.MustSynch: |
117 |
|
118 |
if (!File.Exists(downloadPath)) |
119 |
{ |
120 |
var cloudUri = new Uri(action.CloudFile.Name, UriKind.Relative); |
121 |
DownloadCloudFile(account, container, cloudUri, downloadPath); |
122 |
} |
123 |
else |
124 |
{ |
125 |
SyncFiles(action); |
126 |
} |
127 |
break; |
128 |
} |
129 |
Log.InfoFormat("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile, |
130 |
action.CloudFile.Name); |
131 |
} |
132 |
catch (OperationCanceledException) |
133 |
{ |
134 |
throw; |
135 |
} |
136 |
catch (System.IO.FileNotFoundException exc) |
137 |
{ |
138 |
Log.ErrorFormat("{0} : {1} -> {2} failed because the file was not found.\n Rescheduling a delete", |
139 |
action.Action, action.LocalFile, action.CloudFile, exc); |
140 |
Post(new CloudAction(CloudActionType.DeleteCloud,action.CloudFile)); |
141 |
} |
142 |
catch (Exception exc) |
143 |
{ |
144 |
Log.ErrorFormat("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}", |
145 |
action.Action, action.LocalFile, action.CloudFile, exc); |
146 |
|
147 |
_agent.Post(action); |
148 |
} |
149 |
return CompletedTask<object>.Default; |
150 |
} |
151 |
} |
152 |
|
153 |
private void SyncFiles(CloudAction action) |
154 |
{ |
155 |
if (action==null) |
156 |
throw new ArgumentNullException("action"); |
157 |
if (action.LocalFile==null) |
158 |
throw new ArgumentException("The action's local file is not specified","action"); |
159 |
if (!Path.IsPathRooted(action.LocalFile.FullName)) |
160 |
throw new ArgumentException("The action's local file path must be absolute","action"); |
161 |
if (action.CloudFile== null) |
162 |
throw new ArgumentException("The action's cloud file is not specified", "action"); |
163 |
Contract.EndContractBlock(); |
164 |
|
165 |
var localFile = action.LocalFile; |
166 |
var cloudFile = action.CloudFile; |
167 |
var downloadPath=action.LocalFile.FullName.ToLower(); |
168 |
|
169 |
var account = cloudFile.Account; |
170 |
//Use "pithos" by default if no container is specified |
171 |
var container = cloudFile.Container ?? PithosContainer; |
172 |
|
173 |
var cloudUri = new Uri(cloudFile.Name, UriKind.Relative); |
174 |
var cloudHash = cloudFile.Hash.ToLower(); |
175 |
var localHash = action.LocalHash.Value.ToLower(); |
176 |
var topHash = action.TopHash.Value.ToLower(); |
177 |
|
178 |
//Not enough to compare only the local hashes, also have to compare the tophashes |
179 |
|
180 |
//If any of the hashes match, we are done |
181 |
if ((cloudHash == localHash || cloudHash == topHash)) |
182 |
{ |
183 |
Log.InfoFormat("Skipping {0}, hashes match",downloadPath); |
184 |
return; |
185 |
} |
186 |
|
187 |
//The hashes DON'T match. We need to sync |
188 |
var lastLocalTime = localFile.LastWriteTime; |
189 |
var lastUpTime = cloudFile.Last_Modified; |
190 |
|
191 |
//If the local file is newer upload it |
192 |
if (lastUpTime <= lastLocalTime) |
193 |
{ |
194 |
//It probably means it was changed while the app was down |
195 |
UploadCloudFile(account, container, localFile, action.LocalHash.Value, |
196 |
action.TopHash.Value); |
197 |
} |
198 |
else |
199 |
{ |
200 |
//It the cloud file has a later date, it was modified by another user or computer. |
201 |
//We need to check the local file's status |
202 |
var status = StatusKeeper.GetFileStatus(downloadPath); |
203 |
switch (status) |
204 |
{ |
205 |
case FileStatus.Unchanged: |
206 |
//If the local file's status is Unchanged, we can go on and download the newer cloud file |
207 |
DownloadCloudFile(account, container,cloudUri,downloadPath); |
208 |
break; |
209 |
case FileStatus.Modified: |
210 |
//If the local file is Modified, we may have a conflict. In this case we should mark the file as Conflict |
211 |
//We can't ensure that a file modified online since the last time will appear as Modified, unless we |
212 |
//index all files before we start listening. |
213 |
case FileStatus.Created: |
214 |
//If the local file is Created, it means that the local and cloud files aren't related, |
215 |
// yet they have the same name. |
216 |
|
217 |
//In both cases we must mark the file as in conflict |
218 |
ReportConflict(downloadPath); |
219 |
break; |
220 |
default: |
221 |
//Other cases should never occur. Mark them as Conflict as well but log a warning |
222 |
ReportConflict(downloadPath); |
223 |
Log.WarnFormat("Unexcepted status {0} for file {1}->{2}", status, |
224 |
downloadPath, action.CloudFile.Name); |
225 |
break; |
226 |
} |
227 |
} |
228 |
} |
229 |
|
230 |
private void ReportConflict(string downloadPath) |
231 |
{ |
232 |
if (String.IsNullOrWhiteSpace(downloadPath)) |
233 |
throw new ArgumentNullException("downloadPath"); |
234 |
Contract.EndContractBlock(); |
235 |
|
236 |
StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict); |
237 |
var message = String.Format("Conflict detected for file {0}", downloadPath); |
238 |
Log.Warn(message); |
239 |
StatusNotification.NotifyChange(message, TraceLevel.Warning); |
240 |
} |
241 |
|
242 |
/* |
243 |
private Task<object> Process(CloudMoveAction action) |
244 |
{ |
245 |
if (action == null) |
246 |
throw new ArgumentNullException("action"); |
247 |
Contract.EndContractBlock(); |
248 |
|
249 |
Trace.TraceInformation("[ACTION] Start Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name); |
250 |
|
251 |
try |
252 |
{ |
253 |
RenameCloudFile(action.OldFileName, action.NewPath, action.NewFileName); |
254 |
Trace.TraceInformation("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name); |
255 |
} |
256 |
catch (OperationCanceledException) |
257 |
{ |
258 |
throw; |
259 |
} |
260 |
catch (Exception exc) |
261 |
{ |
262 |
Trace.TraceError("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}", |
263 |
action.Action, action.OldFileName, action.NewFileName, exc); |
264 |
|
265 |
_agent.Post(action); |
266 |
} |
267 |
return CompletedTask<object>.Default; |
268 |
} |
269 |
*/ |
270 |
|
271 |
|
272 |
public void Post(CloudAction cloudAction) |
273 |
{ |
274 |
if (cloudAction == null) |
275 |
throw new ArgumentNullException("cloudAction"); |
276 |
Contract.EndContractBlock(); |
277 |
|
278 |
//If the action targets a local file, add a treehash calculation |
279 |
if (cloudAction.LocalFile != null) |
280 |
{ |
281 |
|
282 |
if (cloudAction.LocalFile.Length>BlockSize) |
283 |
cloudAction.TopHash = new Lazy<string>(() => Signature.CalculateTreeHashAsync(cloudAction.LocalFile, |
284 |
BlockSize, BlockHash).Result |
285 |
.TopHash.ToHashString()); |
286 |
else |
287 |
{ |
288 |
cloudAction.TopHash=new Lazy<string>(()=> cloudAction.LocalHash.Value); |
289 |
} |
290 |
|
291 |
} |
292 |
_agent.Post(cloudAction); |
293 |
} |
294 |
|
295 |
class ObjectInfoByNameComparer:IEqualityComparer<ObjectInfo> |
296 |
{ |
297 |
public bool Equals(ObjectInfo x, ObjectInfo y) |
298 |
{ |
299 |
return x.Name.Equals(y.Name,StringComparison.InvariantCultureIgnoreCase); |
300 |
} |
301 |
|
302 |
public int GetHashCode(ObjectInfo obj) |
303 |
{ |
304 |
return obj.Name.ToLower().GetHashCode(); |
305 |
} |
306 |
} |
307 |
|
308 |
//Remote files are polled periodically. Any changes are processed |
309 |
public Task ProcessRemoteFiles(string accountPath,DateTime? since=null) |
310 |
{ |
311 |
if (String.IsNullOrWhiteSpace(accountPath)) |
312 |
throw new ArgumentNullException(accountPath); |
313 |
Contract.EndContractBlock(); |
314 |
|
315 |
using (log4net.LogicalThreadContext.Stacks["SCHEDULE"].Push("Retrieve Remote")) |
316 |
{ |
317 |
Log.Info("[LISTENER] Scheduled"); |
318 |
|
319 |
//Get the list of server objects changed since the last check |
320 |
var listObjects = Task<IList<ObjectInfo>>.Factory.StartNewDelayed(10000, () => |
321 |
CloudClient.ListObjects(CloudClient.UserName, PithosContainer, since)); |
322 |
//Get the list of deleted objects since the last check |
323 |
var listTrash = Task<IList<ObjectInfo>>.Factory.StartNewDelayed(10000, () => |
324 |
CloudClient.ListObjects(CloudClient.UserName, TrashContainer, since)); |
325 |
|
326 |
var listShared = Task<IList<ObjectInfo>>.Factory.StartNewDelayed(10000, () => |
327 |
CloudClient.ListSharedObjects(since)); |
328 |
|
329 |
var listAll = Task.Factory.TrackedSequence( |
330 |
() => listObjects, |
331 |
() => listTrash, |
332 |
() => listShared); |
333 |
|
334 |
//Next time we will check for all changes since the current check minus 1 second |
335 |
//This is done to ensure there are no discrepancies due to clock differences |
336 |
DateTime nextSince = DateTime.Now.AddSeconds(-1); |
337 |
|
338 |
|
339 |
var enqueueFiles = listAll.ContinueWith(task => |
340 |
{ |
341 |
if (task.IsFaulted) |
342 |
{ |
343 |
//ListObjects failed at this point, need to reschedule |
344 |
Log.ErrorFormat("[FAIL] ListObjects in ProcessRemoteFiles with {0}", task.Exception); |
345 |
ProcessRemoteFiles(accountPath, since); |
346 |
return; |
347 |
} |
348 |
using (log4net.LogicalThreadContext.Stacks["SCHEDULE"].Push("Process Results")) |
349 |
{ |
350 |
var remoteObjects = ((Task<IList<ObjectInfo>>) task.Result[0]).Result; |
351 |
var trashObjects = ((Task<IList<ObjectInfo>>) task.Result[1]).Result; |
352 |
var sharedObjects = ((Task<IList<ObjectInfo>>) task.Result[2]).Result; |
353 |
|
354 |
//Items with the same name, hash may be both in the container and the trash |
355 |
//Don't delete items that exist in the container |
356 |
var realTrash = from trash in trashObjects |
357 |
where !remoteObjects.Any(info => info.Hash == trash.Hash) |
358 |
select trash; |
359 |
ProcessDeletedFiles(realTrash); |
360 |
|
361 |
|
362 |
var remote = from info in remoteObjects.Union(sharedObjects) |
363 |
let name = info.Name |
364 |
where !name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase) && |
365 |
!name.StartsWith("fragments/", StringComparison.InvariantCultureIgnoreCase) |
366 |
select info; |
367 |
|
368 |
//Create a list of actions from the remote files |
369 |
var allActions = ObjectsToActions(remote); |
370 |
|
371 |
//And remove those that are already being processed by the agent |
372 |
var distinctActions = allActions |
373 |
.Except(_agent.GetEnumerable(), new PithosMonitor.LocalFileComparer()) |
374 |
.ToList(); |
375 |
|
376 |
//Queue all the actions |
377 |
foreach (var message in distinctActions) |
378 |
{ |
379 |
Post(message); |
380 |
} |
381 |
|
382 |
//Report the number of new files |
383 |
var remoteCount = distinctActions.Count(action=> |
384 |
action.Action==CloudActionType.DownloadUnconditional); |
385 |
if ( remoteCount > 0) |
386 |
StatusNotification.NotifyChange(String.Format("Processing {0} new files", remoteCount)); |
387 |
|
388 |
Log.Info("[LISTENER] End Processing"); |
389 |
} |
390 |
}); |
391 |
|
392 |
var loop = enqueueFiles.ContinueWith(t => |
393 |
{ |
394 |
if (t.IsFaulted) |
395 |
{ |
396 |
Log.Error("[LISTENER] Exception", t.Exception); |
397 |
} |
398 |
else |
399 |
{ |
400 |
Log.Info("[LISTENER] Finished"); |
401 |
} |
402 |
ProcessRemoteFiles(accountPath, nextSince); |
403 |
|
404 |
}); |
405 |
return loop; |
406 |
} |
407 |
} |
408 |
|
409 |
//Creates an appropriate action for each server file |
410 |
private IEnumerable<CloudAction> ObjectsToActions(IEnumerable<ObjectInfo> remote) |
411 |
{ |
412 |
if (remote==null) |
413 |
throw new ArgumentNullException(); |
414 |
Contract.EndContractBlock(); |
415 |
|
416 |
//In order to avoid multiple iterations over the files, we iterate only once |
417 |
//over the remote files |
418 |
foreach (var objectInfo in remote) |
419 |
{ |
420 |
var relativePath = objectInfo.RelativeUrlToFilePath(CloudClient.UserName); |
421 |
//and remove any matching objects from the list, adding them to the commonObjects list |
422 |
if (FileAgent.Exists(relativePath)) |
423 |
{ |
424 |
var localFile = FileAgent.GetFileInfo(relativePath); |
425 |
var state = FileState.FindByFilePath(localFile.FullName); |
426 |
//Common files should be checked on a per-case basis to detect differences, which is newer |
427 |
|
428 |
yield return new CloudAction(CloudActionType.MustSynch, |
429 |
localFile, objectInfo, state, BlockSize, BlockHash); |
430 |
} |
431 |
else |
432 |
{ |
433 |
//If there is no match we add them to the localFiles list |
434 |
//but only if the file is not marked for deletion |
435 |
var targetFile = Path.Combine(FileAgent.RootPath, relativePath); |
436 |
var fileStatus = StatusKeeper.GetFileStatus(targetFile); |
437 |
if (fileStatus != FileStatus.Deleted) |
438 |
{ |
439 |
//Remote files should be downloaded |
440 |
yield return new CloudAction(CloudActionType.DownloadUnconditional, objectInfo); |
441 |
} |
442 |
} |
443 |
} |
444 |
} |
445 |
|
446 |
private void ProcessDeletedFiles(IEnumerable<ObjectInfo> trashObjects) |
447 |
{ |
448 |
foreach (var trashObject in trashObjects) |
449 |
{ |
450 |
var relativePath = trashObject.RelativeUrlToFilePath(CloudClient.UserName); |
451 |
//and remove any matching objects from the list, adding them to the commonObjects list |
452 |
FileAgent.Delete(relativePath); |
453 |
} |
454 |
} |
455 |
|
456 |
|
457 |
private void RenameCloudFile(string account, string container,string oldFileName, string newPath, string newFileName) |
458 |
{ |
459 |
if (String.IsNullOrWhiteSpace(account)) |
460 |
throw new ArgumentNullException("account"); |
461 |
if (String.IsNullOrWhiteSpace(container)) |
462 |
throw new ArgumentNullException("container"); |
463 |
if (String.IsNullOrWhiteSpace(oldFileName)) |
464 |
throw new ArgumentNullException("oldFileName"); |
465 |
if (String.IsNullOrWhiteSpace(oldFileName)) |
466 |
throw new ArgumentNullException("newPath"); |
467 |
if (String.IsNullOrWhiteSpace(oldFileName)) |
468 |
throw new ArgumentNullException("newFileName"); |
469 |
Contract.EndContractBlock(); |
470 |
//The local file is already renamed |
471 |
this.StatusKeeper.SetFileOverlayStatus(newPath, FileOverlayStatus.Modified); |
472 |
|
473 |
CloudClient.MoveObject(account, container, oldFileName, container, newFileName); |
474 |
|
475 |
this.StatusKeeper.SetFileStatus(newPath, FileStatus.Unchanged); |
476 |
this.StatusKeeper.SetFileOverlayStatus(newPath, FileOverlayStatus.Normal); |
477 |
NativeMethods.RaiseChangeNotification(newPath); |
478 |
} |
479 |
|
480 |
private void DeleteCloudFile(string account,string container, string fileName) |
481 |
{ |
482 |
if (String.IsNullOrWhiteSpace(account)) |
483 |
throw new ArgumentNullException("account"); |
484 |
if (String.IsNullOrWhiteSpace(container)) |
485 |
throw new ArgumentNullException("container"); |
486 |
if (String.IsNullOrWhiteSpace(container)) |
487 |
throw new ArgumentNullException("container"); |
488 |
|
489 |
if (String.IsNullOrWhiteSpace(fileName)) |
490 |
throw new ArgumentNullException("fileName"); |
491 |
if (Path.IsPathRooted(fileName)) |
492 |
throw new ArgumentException("The fileName should not be rooted","fileName"); |
493 |
Contract.EndContractBlock(); |
494 |
|
495 |
using ( log4net.LogicalThreadContext.Stacks["DeleteCloudFile"].Push("Delete")) |
496 |
{ |
497 |
var info = FileAgent.GetFileInfo(fileName); |
498 |
var path = info.FullName.ToLower(); |
499 |
this.StatusKeeper.SetFileOverlayStatus(path, FileOverlayStatus.Modified); |
500 |
|
501 |
CloudClient.DeleteObject(account, container, fileName, TrashContainer); |
502 |
|
503 |
this.StatusKeeper.ClearFileStatus(path); |
504 |
} |
505 |
} |
506 |
|
507 |
//Download a file. |
508 |
private void DownloadCloudFile(string account,string container, Uri relativeUrl, string localPath) |
509 |
{ |
510 |
if (String.IsNullOrWhiteSpace(account)) |
511 |
throw new ArgumentNullException("account"); |
512 |
if (String.IsNullOrWhiteSpace(container)) |
513 |
throw new ArgumentNullException("container"); |
514 |
if (relativeUrl == null) |
515 |
throw new ArgumentNullException("relativeUrl"); |
516 |
if (String.IsNullOrWhiteSpace(localPath)) |
517 |
throw new ArgumentNullException("localPath"); |
518 |
if (!Path.IsPathRooted(localPath)) |
519 |
throw new ArgumentException("The localPath must be rooted", "localPath"); |
520 |
Contract.EndContractBlock(); |
521 |
|
522 |
var download=Task.Factory.Iterate(DownloadIterator(account,container, relativeUrl, localPath)); |
523 |
download.Wait(); |
524 |
} |
525 |
|
526 |
private IEnumerable<Task> DownloadIterator(string account,string container, Uri relativeUrl, string localPath) |
527 |
{ |
528 |
if (String.IsNullOrWhiteSpace(account)) |
529 |
throw new ArgumentNullException("account"); |
530 |
if (String.IsNullOrWhiteSpace(container)) |
531 |
throw new ArgumentNullException("container"); |
532 |
if (relativeUrl==null) |
533 |
throw new ArgumentNullException("relativeUrl"); |
534 |
if (String.IsNullOrWhiteSpace(localPath)) |
535 |
throw new ArgumentNullException("localPath"); |
536 |
if (!Path.IsPathRooted(localPath)) |
537 |
throw new ArgumentException("The localPath must be rooted", "localPath"); |
538 |
Contract.EndContractBlock(); |
539 |
|
540 |
var url = relativeUrl.ToString(); |
541 |
if (url.EndsWith(".ignore",StringComparison.InvariantCultureIgnoreCase)) |
542 |
yield break; |
543 |
|
544 |
//Are we already downloading or uploading the file? |
545 |
using (var gate=NetworkGate.Acquire(localPath, NetworkOperation.Downloading)) |
546 |
{ |
547 |
if (gate.Failed) |
548 |
yield break; |
549 |
//The file's hashmap will be stored in the same location with the extension .hashmap |
550 |
//var hashPath = Path.Combine(FileAgent.FragmentsPath, relativePath + ".hashmap"); |
551 |
|
552 |
//Retrieve the hashmap from the server |
553 |
var getHashMap = CloudClient.GetHashMap(account, container, url); |
554 |
yield return getHashMap; |
555 |
|
556 |
var serverHash=getHashMap.Result; |
557 |
//If it's a small file |
558 |
var downloadTask=(serverHash.Hashes.Count == 1 ) |
559 |
//Download it in one go |
560 |
? DownloadEntireFile(account,container, relativeUrl, localPath) |
561 |
//Otherwise download it block by block |
562 |
: DownloadWithBlocks(account,container, relativeUrl, localPath, serverHash); |
563 |
|
564 |
yield return downloadTask; |
565 |
|
566 |
|
567 |
//Retrieve the object's metadata |
568 |
var info=CloudClient.GetObjectInfo(account, container, url); |
569 |
//And store it |
570 |
StatusKeeper.StoreInfo(localPath, info); |
571 |
|
572 |
//Notify listeners that a local file has changed |
573 |
StatusNotification.NotifyChangedFile(localPath); |
574 |
|
575 |
} |
576 |
} |
577 |
|
578 |
//Download a small file with a single GET operation |
579 |
private Task DownloadEntireFile(string account,string container, Uri relativeUrl, string localPath) |
580 |
{ |
581 |
if (String.IsNullOrWhiteSpace(account)) |
582 |
throw new ArgumentNullException("account"); |
583 |
if (String.IsNullOrWhiteSpace(container)) |
584 |
throw new ArgumentNullException("container"); |
585 |
if (relativeUrl == null) |
586 |
throw new ArgumentNullException("relativeUrl"); |
587 |
if (String.IsNullOrWhiteSpace(localPath)) |
588 |
throw new ArgumentNullException("localPath"); |
589 |
if (!Path.IsPathRooted(localPath)) |
590 |
throw new ArgumentException("The localPath must be rooted", "localPath"); |
591 |
Contract.EndContractBlock(); |
592 |
|
593 |
//Calculate the relative file path for the new file |
594 |
var relativePath = relativeUrl.RelativeUriToFilePath(); |
595 |
//The file will be stored in a temporary location while downloading with an extension .download |
596 |
var tempPath = Path.Combine(FileAgent.FragmentsPath, relativePath + ".download"); |
597 |
//Make sure the target folder exists. DownloadFileTask will not create the folder |
598 |
var directoryPath = Path.GetDirectoryName(tempPath); |
599 |
if (!Directory.Exists(directoryPath)) |
600 |
Directory.CreateDirectory(directoryPath); |
601 |
|
602 |
//Download the object to the temporary location |
603 |
var getObject = CloudClient.GetObject(account, container, relativeUrl.ToString(), tempPath).ContinueWith(t => |
604 |
{ |
605 |
t.PropagateExceptions(); |
606 |
//And move it to its actual location once downloading is finished |
607 |
if (File.Exists(localPath)) |
608 |
File.Replace(tempPath,localPath,null,true); |
609 |
else |
610 |
File.Move(tempPath,localPath); |
611 |
}); |
612 |
return getObject; |
613 |
} |
614 |
|
615 |
//Download a file asynchronously using blocks |
616 |
public Task DownloadWithBlocks(string account,string container, Uri relativeUrl, string localPath, TreeHash serverHash) |
617 |
{ |
618 |
if (String.IsNullOrWhiteSpace(account)) |
619 |
throw new ArgumentNullException("account"); |
620 |
if (String.IsNullOrWhiteSpace(container)) |
621 |
throw new ArgumentNullException("container"); |
622 |
if (relativeUrl == null) |
623 |
throw new ArgumentNullException("relativeUrl"); |
624 |
if (String.IsNullOrWhiteSpace(localPath)) |
625 |
throw new ArgumentNullException("localPath"); |
626 |
if (!Path.IsPathRooted(localPath)) |
627 |
throw new ArgumentException("The localPath must be rooted", "localPath"); |
628 |
if (serverHash == null) |
629 |
throw new ArgumentNullException("serverHash"); |
630 |
Contract.EndContractBlock(); |
631 |
|
632 |
return Task.Factory.Iterate(BlockDownloadIterator(account,container, relativeUrl, localPath, serverHash)); |
633 |
} |
634 |
|
635 |
private IEnumerable<Task> BlockDownloadIterator(string account,string container,Uri relativeUrl, string localPath,TreeHash serverHash) |
636 |
{ |
637 |
if (String.IsNullOrWhiteSpace(account)) |
638 |
throw new ArgumentNullException("account"); |
639 |
if (String.IsNullOrWhiteSpace(container)) |
640 |
throw new ArgumentNullException("container"); |
641 |
if (relativeUrl == null) |
642 |
throw new ArgumentNullException("relativeUrl"); |
643 |
if (String.IsNullOrWhiteSpace(localPath)) |
644 |
throw new ArgumentNullException("localPath"); |
645 |
if (!Path.IsPathRooted(localPath)) |
646 |
throw new ArgumentException("The localPath must be rooted", "localPath"); |
647 |
if(serverHash==null) |
648 |
throw new ArgumentNullException("serverHash"); |
649 |
Contract.EndContractBlock(); |
650 |
|
651 |
|
652 |
//Calculate the relative file path for the new file |
653 |
var relativePath = relativeUrl.RelativeUriToFilePath(); |
654 |
var blockUpdater = new BlockUpdater(FileAgent.FragmentsPath, localPath, relativePath, serverHash); |
655 |
|
656 |
|
657 |
|
658 |
//Calculate the file's treehash |
659 |
var calcHash = Signature.CalculateTreeHashAsync(localPath, this.BlockSize,BlockHash); |
660 |
yield return calcHash; |
661 |
var treeHash = calcHash.Result; |
662 |
|
663 |
//And compare it with the server's hash |
664 |
var upHashes = serverHash.GetHashesAsStrings(); |
665 |
var localHashes = treeHash.HashDictionary; |
666 |
for (int i = 0; i < upHashes.Length; i++) |
667 |
{ |
668 |
//For every non-matching hash |
669 |
var upHash = upHashes[i]; |
670 |
if (!localHashes.ContainsKey(upHash)) |
671 |
{ |
672 |
if (blockUpdater.UseOrphan(i, upHash)) |
673 |
{ |
674 |
Log.InfoFormat("[BLOCK GET] ORPHAN FOUND for {0} of {1} for {2}", i, upHashes.Length, localPath); |
675 |
continue; |
676 |
} |
677 |
Log.InfoFormat("[BLOCK GET] START {0} of {1} for {2}", i, upHashes.Length, localPath); |
678 |
var start = i*BlockSize; |
679 |
//To download the last block just pass a null for the end of the range |
680 |
long? end = null; |
681 |
if (i < upHashes.Length - 1 ) |
682 |
end= ((i + 1)*BlockSize) ; |
683 |
|
684 |
//Download the missing block |
685 |
var getBlock = CloudClient.GetBlock(account, container, relativeUrl, start, end); |
686 |
yield return getBlock; |
687 |
var block = getBlock.Result; |
688 |
|
689 |
//and store it |
690 |
yield return blockUpdater.StoreBlock(i, block); |
691 |
|
692 |
|
693 |
Log.InfoFormat("[BLOCK GET] FINISH {0} of {1} for {2}", i, upHashes.Length, localPath); |
694 |
} |
695 |
} |
696 |
|
697 |
blockUpdater.Commit(); |
698 |
Log.InfoFormat("[BLOCK GET] COMPLETE {0}", localPath); |
699 |
} |
700 |
|
701 |
|
702 |
private void UploadCloudFile(string account,string container,FileInfo fileInfo, string hash,string topHash) |
703 |
{ |
704 |
if (String.IsNullOrWhiteSpace(account)) |
705 |
throw new ArgumentNullException("account"); |
706 |
if (String.IsNullOrWhiteSpace(container)) |
707 |
throw new ArgumentNullException("container"); |
708 |
if (fileInfo == null) |
709 |
throw new ArgumentNullException("fileInfo"); |
710 |
if (String.IsNullOrWhiteSpace(hash)) |
711 |
throw new ArgumentNullException("hash"); |
712 |
if (topHash == null) |
713 |
throw new ArgumentNullException("topHash"); |
714 |
Contract.EndContractBlock(); |
715 |
|
716 |
var upload = Task.Factory.Iterate(UploadIterator(account,container,fileInfo, hash, topHash)); |
717 |
upload.Wait(); |
718 |
} |
719 |
|
720 |
private IEnumerable<Task> UploadIterator(string account,string container,FileInfo fileInfo, string hash,string topHash) |
721 |
{ |
722 |
if (String.IsNullOrWhiteSpace(account)) |
723 |
throw new ArgumentNullException("account"); |
724 |
if (String.IsNullOrWhiteSpace(container)) |
725 |
throw new ArgumentNullException("container"); |
726 |
if (fileInfo == null) |
727 |
throw new ArgumentNullException("fileInfo"); |
728 |
if (String.IsNullOrWhiteSpace(hash)) |
729 |
throw new ArgumentNullException("hash"); |
730 |
if (topHash == null) |
731 |
throw new ArgumentNullException("topHash"); |
732 |
Contract.EndContractBlock(); |
733 |
|
734 |
if (fileInfo.Extension.Equals("ignore", StringComparison.InvariantCultureIgnoreCase)) |
735 |
yield break; |
736 |
|
737 |
var url = fileInfo.AsRelativeUrlTo(FileAgent.RootPath); |
738 |
|
739 |
var fullFileName = fileInfo.FullName; |
740 |
using(var gate=NetworkGate.Acquire(fullFileName,NetworkOperation.Uploading)) |
741 |
{ |
742 |
//Abort if the file is already being uploaded or downloaded |
743 |
if (gate.Failed) |
744 |
yield break; |
745 |
|
746 |
|
747 |
//Even if GetObjectInfo times out, we can proceed with the upload |
748 |
var info = CloudClient.GetObjectInfo(account, container, url); |
749 |
|
750 |
//If the file hashes match, abort the upload |
751 |
if (hash.Equals(info.Hash, StringComparison.InvariantCultureIgnoreCase) || |
752 |
topHash.Equals(info.Hash, StringComparison.InvariantCultureIgnoreCase)) |
753 |
{ |
754 |
//but store any metadata changes |
755 |
this.StatusKeeper.StoreInfo(fullFileName, info); |
756 |
Log.InfoFormat("Skip upload of {0}, hashes match", fullFileName); |
757 |
yield break; |
758 |
} |
759 |
|
760 |
//Mark the file as modified while we upload it |
761 |
StatusKeeper.SetFileOverlayStatus(fullFileName, FileOverlayStatus.Modified); |
762 |
//And then upload it |
763 |
|
764 |
//If the file is larger than the block size, try a hashmap PUT |
765 |
if (fileInfo.Length > BlockSize ) |
766 |
{ |
767 |
//To upload using a hashmap |
768 |
//First, calculate the tree hash |
769 |
var treeHash = Signature.CalculateTreeHashAsync(fileInfo.FullName, BlockSize, BlockHash); |
770 |
yield return treeHash; |
771 |
|
772 |
yield return Task.Factory.Iterate(UploadWithHashMap(account,container,fileInfo,url,treeHash)); |
773 |
|
774 |
} |
775 |
else |
776 |
{ |
777 |
//Otherwise do a regular PUT |
778 |
yield return CloudClient.PutObject(account, container, url, fullFileName, hash); |
779 |
} |
780 |
//If everything succeeds, change the file and overlay status to normal |
781 |
this.StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal); |
782 |
} |
783 |
//Notify the Shell to update the overlays |
784 |
NativeMethods.RaiseChangeNotification(fullFileName); |
785 |
StatusNotification.NotifyChangedFile(fullFileName); |
786 |
} |
787 |
|
788 |
public IEnumerable<Task> UploadWithHashMap(string account,string container,FileInfo fileInfo,string url,Task<TreeHash> treeHash) |
789 |
{ |
790 |
if (String.IsNullOrWhiteSpace(account)) |
791 |
throw new ArgumentNullException("account"); |
792 |
if (String.IsNullOrWhiteSpace(container)) |
793 |
throw new ArgumentNullException("container"); |
794 |
if (fileInfo == null) |
795 |
throw new ArgumentNullException("fileInfo"); |
796 |
if (String.IsNullOrWhiteSpace(url)) |
797 |
throw new ArgumentNullException(url); |
798 |
if (treeHash==null) |
799 |
throw new ArgumentNullException("treeHash"); |
800 |
Contract.EndContractBlock(); |
801 |
|
802 |
var fullFileName = fileInfo.FullName; |
803 |
|
804 |
//Send the hashmap to the server |
805 |
var hashPut = CloudClient.PutHashMap(account, container, url, treeHash.Result); |
806 |
yield return hashPut; |
807 |
|
808 |
var missingHashes = hashPut.Result; |
809 |
//If the server returns no missing hashes, we are done |
810 |
while (missingHashes.Count > 0) |
811 |
{ |
812 |
|
813 |
var buffer = new byte[BlockSize]; |
814 |
foreach (var missingHash in missingHashes) |
815 |
{ |
816 |
//Find the proper block |
817 |
var blockIndex = treeHash.Result.HashDictionary[missingHash]; |
818 |
var offset = blockIndex*BlockSize; |
819 |
|
820 |
var read = fileInfo.Read(buffer, offset, BlockSize); |
821 |
|
822 |
//And upload the block |
823 |
var postBlock = CloudClient.PostBlock(account, container, buffer, 0, read); |
824 |
|
825 |
//We have to handle possible exceptions in a continuation because |
826 |
//*yield return* can't appear inside a try block |
827 |
yield return postBlock.ContinueWith(t => |
828 |
t.ReportExceptions( |
829 |
exc => Log.ErrorFormat("[ERROR] uploading block {0} of {1}\n{2}", blockIndex, fullFileName, exc), |
830 |
()=>Log.InfoFormat("[BLOCK] Block {0} of {1} uploaded", blockIndex,fullFileName))); |
831 |
} |
832 |
|
833 |
//Repeat until there are no more missing hashes |
834 |
hashPut = CloudClient.PutHashMap(account, container, url, treeHash.Result); |
835 |
yield return hashPut; |
836 |
missingHashes = hashPut.Result; |
837 |
} |
838 |
} |
839 |
|
840 |
|
841 |
} |
842 |
|
843 |
|
844 |
|
845 |
|
846 |
} |