Revision c945b450 trunk/Pithos.Core/Agents/PollAgent.cs
b/trunk/Pithos.Core/Agents/PollAgent.cs | ||
---|---|---|
45 | 45 |
using System.Diagnostics; |
46 | 46 |
using System.Diagnostics.Contracts; |
47 | 47 |
using System.IO; |
48 |
using System.Linq.Expressions; |
|
48 | 49 |
using System.Reflection; |
49 | 50 |
using System.Threading; |
50 | 51 |
using System.Threading.Tasks; |
... | ... | |
59 | 60 |
using System.Collections.Generic; |
60 | 61 |
using System.Linq; |
61 | 62 |
|
63 |
[DebuggerDisplay("{FilePath} C:{C} L:{L} S:{S}")] |
|
64 |
public class StateTuple |
|
65 |
{ |
|
66 |
public string FilePath { get; private set; } |
|
67 |
|
|
68 |
public string L |
|
69 |
{ |
|
70 |
get { return FileState==null?null:FileState.Checksum; } |
|
71 |
} |
|
72 |
|
|
73 |
public string C { get; set; } |
|
74 |
|
|
75 |
public string S |
|
76 |
{ |
|
77 |
get { return ObjectInfo== null ? null : ObjectInfo.Hash; } |
|
78 |
} |
|
79 |
|
|
80 |
private FileSystemInfo _fileInfo; |
|
81 |
public FileSystemInfo FileInfo |
|
82 |
{ |
|
83 |
get { return _fileInfo; } |
|
84 |
set |
|
85 |
{ |
|
86 |
_fileInfo = value; |
|
87 |
FilePath = value.FullName; |
|
88 |
} |
|
89 |
} |
|
90 |
|
|
91 |
public FileState FileState { get; set; } |
|
92 |
public ObjectInfo ObjectInfo{ get; set; } |
|
93 |
|
|
94 |
public StateTuple() { } |
|
95 |
|
|
96 |
public StateTuple(FileSystemInfo info) |
|
97 |
{ |
|
98 |
FileInfo = info; |
|
99 |
} |
|
100 |
|
|
101 |
|
|
102 |
} |
|
103 |
|
|
104 |
|
|
62 | 105 |
/// <summary> |
63 | 106 |
/// PollAgent periodically polls the server to detect object changes. The agent retrieves a listing of all |
64 | 107 |
/// objects and compares it with a previously cached version to detect differences. |
... | ... | |
84 | 127 |
|
85 | 128 |
public IStatusNotification StatusNotification { get; set; } |
86 | 129 |
|
130 |
private CancellationTokenSource _currentOperationCancellation = new CancellationTokenSource(); |
|
131 |
|
|
132 |
public void CancelCurrentOperation() |
|
133 |
{ |
|
134 |
//What does it mean to cancel the current upload/download? |
|
135 |
//Obviously, the current operation will be cancelled by throwing |
|
136 |
//a cancellation exception. |
|
137 |
// |
|
138 |
//The default behavior is to retry any operations that throw. |
|
139 |
//Obviously this is not what we want in this situation. |
|
140 |
//The cancelled operation should NOT bea retried. |
|
141 |
// |
|
142 |
//This can be done by catching the cancellation exception |
|
143 |
//and avoiding the retry. |
|
144 |
// |
|
145 |
|
|
146 |
//Have to reset the cancellation source - it is not possible to reset the source |
|
147 |
//Have to prevent a case where an operation requests a token from the old source |
|
148 |
var oldSource = Interlocked.Exchange(ref _currentOperationCancellation, new CancellationTokenSource()); |
|
149 |
oldSource.Cancel(); |
|
150 |
|
|
151 |
} |
|
152 |
|
|
87 | 153 |
public bool Pause |
88 | 154 |
{ |
89 | 155 |
get { |
... | ... | |
264 | 330 |
var remoteObjects = (from objectList in listTasks |
265 | 331 |
where (string)objectList.AsyncState != "trash" |
266 | 332 |
from obj in objectList.Result |
333 |
orderby obj.Bytes ascending |
|
267 | 334 |
select obj).ToList(); |
268 | 335 |
|
269 | 336 |
//Get the latest remote object modification date, only if it is after |
... | ... | |
300 | 367 |
StatusKeeper.CleanupOrphanStates(); |
301 | 368 |
StatusKeeper.CleanupStaleStates(accountInfo, cleanRemotes); |
302 | 369 |
|
303 |
var differencer = _differencer.PostSnapshot(accountInfo, cleanRemotes); |
|
370 |
//var differencer = _differencer.PostSnapshot(accountInfo, cleanRemotes);
|
|
304 | 371 |
|
305 | 372 |
var filterUris = Selectives.SelectiveUris[accountInfo.AccountKey]; |
306 | 373 |
|
307 | 374 |
|
308 |
//On the first run |
|
309 |
if (_firstPoll) |
|
310 |
{ |
|
311 |
MarkSuspectedDeletes(accountInfo, cleanRemotes); |
|
312 |
} |
|
313 |
ProcessDeletedFiles(accountInfo, differencer.Deleted.FilterDirectlyBelow(filterUris)); |
|
375 |
//Get the local files here |
|
376 |
var agent = AgentLocator<FileAgent>.Get(accountInfo.AccountPath); |
|
314 | 377 |
|
315 |
// @@@ NEED To add previous state here as well, To compare with previous hash
|
|
378 |
var files = LoadLocalFileTuples(accountInfo);
|
|
316 | 379 |
|
317 |
|
|
380 |
var states = FileState.Queryable.ToList(); |
|
381 |
|
|
382 |
var infos = (from remote in cleanRemotes |
|
383 |
let path = remote.RelativeUrlToFilePath(accountInfo.UserName) |
|
384 |
let info=agent.GetFileSystemInfo(path) |
|
385 |
select Tuple.Create(info.FullName,remote)) |
|
386 |
.ToList(); |
|
387 |
|
|
388 |
var token = _currentOperationCancellation.Token; |
|
389 |
|
|
390 |
var tuples = MergeSources(infos, files, states).ToList(); |
|
318 | 391 |
|
319 |
//Create a list of actions from the remote files |
|
320 | 392 |
|
321 |
var allActions = MovesToActions(accountInfo,differencer.Moved.FilterDirectlyBelow(filterUris)) |
|
322 |
.Union( |
|
323 |
ChangesToActions(accountInfo, differencer.Changed.FilterDirectlyBelow(filterUris))) |
|
324 |
.Union( |
|
325 |
CreatesToActions(accountInfo, differencer.Created.FilterDirectlyBelow(filterUris))); |
|
326 |
|
|
327 |
//And remove those that are already being processed by the agent |
|
328 |
var distinctActions = allActions |
|
329 |
.Except(NetworkAgent.GetEnumerable(), new LocalFileComparer()) |
|
330 |
.ToList(); |
|
331 |
|
|
332 |
await _unPauseEvent.WaitAsync(); |
|
333 |
//Queue all the actions |
|
334 |
foreach (var message in distinctActions) |
|
393 |
foreach (var tuple in tuples) |
|
335 | 394 |
{ |
336 |
NetworkAgent.Post(message); |
|
395 |
await _unPauseEvent.WaitAsync(); |
|
396 |
|
|
397 |
SyncSingleItem(accountInfo, tuple, agent, token); |
|
337 | 398 |
} |
338 | 399 |
|
400 |
|
|
401 |
//On the first run |
|
402 |
/* |
|
403 |
if (_firstPoll) |
|
404 |
{ |
|
405 |
MarkSuspectedDeletes(accountInfo, cleanRemotes); |
|
406 |
} |
|
407 |
*/ |
|
408 |
|
|
409 |
|
|
339 | 410 |
Log.Info("[LISTENER] End Processing"); |
340 | 411 |
} |
341 | 412 |
} |
... | ... | |
350 | 421 |
} |
351 | 422 |
} |
352 | 423 |
|
424 |
private static List<Tuple<FileSystemInfo, string>> LoadLocalFileTuples(AccountInfo accountInfo) |
|
425 |
{ |
|
426 |
using (ThreadContext.Stacks["Account Files Hashing"].Push(accountInfo.UserName)) |
|
427 |
{ |
|
428 |
|
|
429 |
var localInfos = AgentLocator<FileAgent>.Get(accountInfo.AccountPath).EnumerateFileSystemInfos(); |
|
430 |
//Use the queue to retry locked file hashing |
|
431 |
var fileQueue = new Queue<FileSystemInfo>(localInfos); |
|
432 |
|
|
433 |
var results = new List<Tuple<FileSystemInfo, string>>(); |
|
434 |
|
|
435 |
while (fileQueue.Count > 0) |
|
436 |
{ |
|
437 |
var file = fileQueue.Dequeue(); |
|
438 |
using (ThreadContext.Stacks["File"].Push(file.FullName)) |
|
439 |
{ |
|
440 |
try |
|
441 |
{ |
|
442 |
var hash = (file is DirectoryInfo) |
|
443 |
? "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" |
|
444 |
: Signature.CalculateTreeHash(file, accountInfo.BlockSize, |
|
445 |
accountInfo.BlockHash) |
|
446 |
. |
|
447 |
TopHash.ToHashString(); |
|
448 |
results.Add(Tuple.Create(file, hash)); |
|
449 |
} |
|
450 |
catch (IOException exc) |
|
451 |
{ |
|
452 |
Log.WarnFormat("[HASH] File in use, will retry [{0}]", exc); |
|
453 |
fileQueue.Enqueue(file); |
|
454 |
} |
|
455 |
} |
|
456 |
} |
|
457 |
|
|
458 |
return results; |
|
459 |
} |
|
460 |
} |
|
461 |
|
|
462 |
private void SyncSingleItem(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, CancellationToken token) |
|
463 |
{ |
|
464 |
Log.DebugFormat("Sync [{0}] C:[{1}] L:[{2}] S:[{3}]",tuple.FilePath,tuple.C,tuple.L,tuple.S); |
|
465 |
|
|
466 |
var localFilePath = tuple.FilePath; |
|
467 |
//Don't use the tuple info, it may have been deleted |
|
468 |
var localInfo = FileInfoExtensions.FromPath(localFilePath); |
|
469 |
|
|
470 |
// Local file unchanged? If both C and L are null, make sure it's because |
|
471 |
//both the file is missing and the state checksum is not missing |
|
472 |
if (tuple.C == tuple.L && (localInfo.Exists || tuple.FileState==null)) |
|
473 |
{ |
|
474 |
//No local changes |
|
475 |
//Server unchanged? |
|
476 |
if (tuple.S == tuple.L) |
|
477 |
{ |
|
478 |
// No server changes |
|
479 |
; |
|
480 |
} |
|
481 |
else |
|
482 |
{ |
|
483 |
//Different from server |
|
484 |
if (Selectives.IsSelected(accountInfo, localFilePath)) |
|
485 |
{ |
|
486 |
//Does the server file exist? |
|
487 |
if (tuple.S == null) |
|
488 |
{ |
|
489 |
//Server file doesn't exist |
|
490 |
//deleteObjectFromLocal() |
|
491 |
StatusKeeper.SetFileState(localFilePath, FileStatus.Deleted, |
|
492 |
FileOverlayStatus.Deleted, ""); |
|
493 |
agent.Delete(localFilePath); |
|
494 |
//updateRecord(Remove C, L) |
|
495 |
StatusKeeper.ClearFileStatus(localFilePath); |
|
496 |
} |
|
497 |
else |
|
498 |
{ |
|
499 |
//Server file exists |
|
500 |
//downloadServerObject() // Result: L = S |
|
501 |
StatusKeeper.SetFileState(localFilePath, FileStatus.Modified, |
|
502 |
FileOverlayStatus.Modified, ""); |
|
503 |
NetworkAgent.Downloader.DownloadCloudFile(accountInfo, |
|
504 |
tuple.ObjectInfo, |
|
505 |
localFilePath, token).Wait(token); |
|
506 |
//updateRecord( L = S ) |
|
507 |
StatusKeeper.UpdateFileChecksum(localFilePath, tuple.FileState==null?"":tuple.FileState.ShortHash, |
|
508 |
tuple.ObjectInfo.Hash); |
|
509 |
|
|
510 |
StatusKeeper.SetFileState(localFilePath, FileStatus.Unchanged, |
|
511 |
FileOverlayStatus.Normal, ""); |
|
512 |
} |
|
513 |
} |
|
514 |
} |
|
515 |
} |
|
516 |
else |
|
517 |
{ |
|
518 |
//Local changes found |
|
519 |
|
|
520 |
//Server unchanged? |
|
521 |
if (tuple.S == tuple.L) |
|
522 |
{ |
|
523 |
//The FileAgent selective sync checks for new root folder files |
|
524 |
if (!agent.Ignore(localFilePath)) |
|
525 |
{ |
|
526 |
if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null) |
|
527 |
{ |
|
528 |
//deleteObjectFromServer() |
|
529 |
DeleteCloudFile(accountInfo, tuple); |
|
530 |
//updateRecord( Remove L, S) |
|
531 |
} |
|
532 |
else |
|
533 |
{ |
|
534 |
//uploadLocalObject() // Result: S = C, L = S |
|
535 |
var isUnselected = agent.IsUnselectedRootFolder(tuple.FilePath); |
|
536 |
|
|
537 |
//Debug.Assert(tuple.FileState !=null); |
|
538 |
var action = new CloudUploadAction(accountInfo, localInfo, tuple.FileState, |
|
539 |
accountInfo.BlockSize, accountInfo.BlockHash, |
|
540 |
"Poll", isUnselected); |
|
541 |
NetworkAgent.Uploader.UploadCloudFile(action, token).Wait(token); |
|
542 |
|
|
543 |
|
|
544 |
//updateRecord( S = C ) |
|
545 |
StatusKeeper.SetFileState(localFilePath, FileStatus.Unchanged, |
|
546 |
FileOverlayStatus.Normal, ""); |
|
547 |
if (isUnselected) |
|
548 |
{ |
|
549 |
ProcessChildren(accountInfo, tuple, agent, token); |
|
550 |
} |
|
551 |
} |
|
552 |
} |
|
553 |
} |
|
554 |
else |
|
555 |
{ |
|
556 |
if (Selectives.IsSelected(accountInfo, localFilePath)) |
|
557 |
{ |
|
558 |
if (tuple.C == tuple.S) |
|
559 |
{ |
|
560 |
// (Identical Changes) Result: L = S |
|
561 |
//doNothing() |
|
562 |
StatusKeeper.UpdateFileChecksum(localFilePath, tuple.FileState == null ? "" : tuple.FileState.ShortHash, |
|
563 |
tuple.ObjectInfo.Hash); |
|
564 |
StatusKeeper.SetFileState(localFilePath, FileStatus.Unchanged, |
|
565 |
FileOverlayStatus.Normal, ""); |
|
566 |
} |
|
567 |
else |
|
568 |
{ |
|
569 |
if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null ) |
|
570 |
{ |
|
571 |
//deleteObjectFromServer() |
|
572 |
DeleteCloudFile(accountInfo, tuple); |
|
573 |
//updateRecord(Remove L, S) |
|
574 |
} |
|
575 |
else |
|
576 |
{ |
|
577 |
ReportConflictForMismatch(localFilePath); |
|
578 |
//identifyAsConflict() // Manual action required |
|
579 |
} |
|
580 |
} |
|
581 |
} |
|
582 |
} |
|
583 |
} |
|
584 |
} |
|
585 |
|
|
586 |
private void DeleteCloudFile(AccountInfo accountInfo, StateTuple tuple) |
|
587 |
{ |
|
588 |
StatusKeeper.SetFileState(tuple.FilePath, FileStatus.Deleted, |
|
589 |
FileOverlayStatus.Deleted, ""); |
|
590 |
NetworkAgent.DeleteAgent.DeleteCloudFile(accountInfo, tuple.ObjectInfo); |
|
591 |
StatusKeeper.ClearFileStatus(tuple.FilePath); |
|
592 |
} |
|
593 |
|
|
594 |
private void ProcessChildren(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, CancellationToken token) |
|
595 |
{ |
|
596 |
|
|
597 |
var dirInfo = tuple.FileInfo as DirectoryInfo; |
|
598 |
var folderTuples = from folder in dirInfo.EnumerateDirectories("*", SearchOption.AllDirectories) |
|
599 |
select new StateTuple(folder); |
|
600 |
var fileTuples = from file in dirInfo.EnumerateFiles("*", SearchOption.AllDirectories) |
|
601 |
select new StateTuple(file); |
|
602 |
|
|
603 |
//Process folders first, to ensure folders appear on the sever as soon as possible |
|
604 |
folderTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, token)); |
|
605 |
|
|
606 |
fileTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, token)); |
|
607 |
} |
|
608 |
|
|
609 |
private static IEnumerable<StateTuple> MergeSources( |
|
610 |
IEnumerable<Tuple<string, ObjectInfo>> infos, |
|
611 |
IEnumerable<Tuple<FileSystemInfo, string>> files, |
|
612 |
IEnumerable<FileState> states) |
|
613 |
{ |
|
614 |
var dct = new Dictionary<string, StateTuple>(); |
|
615 |
foreach (var file in files) |
|
616 |
{ |
|
617 |
var fsInfo = file.Item1; |
|
618 |
var fileHash = file.Item2; |
|
619 |
dct[fsInfo.FullName] = new StateTuple {FileInfo = fsInfo, C = fileHash}; |
|
620 |
} |
|
621 |
foreach (var state in states) |
|
622 |
{ |
|
623 |
StateTuple hashTuple; |
|
624 |
if (dct.TryGetValue(state.FilePath, out hashTuple)) |
|
625 |
{ |
|
626 |
hashTuple.FileState = state; |
|
627 |
} |
|
628 |
else |
|
629 |
{ |
|
630 |
var fsInfo = FileInfoExtensions.FromPath(state.FilePath); |
|
631 |
dct[state.FilePath] = new StateTuple {FileInfo = fsInfo, FileState = state}; |
|
632 |
} |
|
633 |
} |
|
634 |
foreach (var info in infos) |
|
635 |
{ |
|
636 |
StateTuple hashTuple; |
|
637 |
var filePath = info.Item1; |
|
638 |
var objectInfo = info.Item2; |
|
639 |
if (dct.TryGetValue(filePath, out hashTuple)) |
|
640 |
{ |
|
641 |
hashTuple.ObjectInfo = objectInfo; |
|
642 |
} |
|
643 |
else |
|
644 |
{ |
|
645 |
var fsInfo = FileInfoExtensions.FromPath(filePath); |
|
646 |
dct[filePath] = new StateTuple {FileInfo = fsInfo, ObjectInfo = objectInfo}; |
|
647 |
} |
|
648 |
} |
|
649 |
return dct.Values; |
|
650 |
} |
|
651 |
|
|
353 | 652 |
/// <summary> |
354 | 653 |
/// Returns the latest LastModified date from the list of objects, but only if it is before |
355 | 654 |
/// than the threshold value |
... | ... | |
388 | 687 |
return threshold; |
389 | 688 |
} |
390 | 689 |
|
391 |
readonly AccountsDifferencer _differencer = new AccountsDifferencer(); |
|
690 |
//readonly AccountsDifferencer _differencer = new AccountsDifferencer();
|
|
392 | 691 |
private Dictionary<Uri, List<Uri>> _selectiveUris = new Dictionary<Uri, List<Uri>>(); |
393 | 692 |
private bool _pause; |
394 | 693 |
|
... | ... | |
484 | 783 |
TraceLevel.Info); |
485 | 784 |
} |
486 | 785 |
|
786 |
private void ReportConflictForMismatch(string localFilePath) |
|
787 |
{ |
|
788 |
if (String.IsNullOrWhiteSpace(localFilePath)) |
|
789 |
throw new ArgumentNullException("localFilePath"); |
|
790 |
Contract.EndContractBlock(); |
|
791 |
|
|
792 |
StatusKeeper.SetFileState(localFilePath, FileStatus.Conflict, FileOverlayStatus.Conflict, "File changed at the server"); |
|
793 |
UpdateStatus(PithosStatus.HasConflicts); |
|
794 |
var message = String.Format("Conflict detected for file {0}", localFilePath); |
|
795 |
Log.Warn(message); |
|
796 |
StatusNotification.NotifyChange(message, TraceLevel.Warning); |
|
797 |
} |
|
798 |
|
|
799 |
|
|
800 |
|
|
487 | 801 |
/// <summary> |
488 | 802 |
/// Creates a Sync action for each changed server file |
489 | 803 |
/// </summary> |
... | ... | |
655 | 969 |
{ |
656 | 970 |
AccountInfo account; |
657 | 971 |
_accounts.TryRemove(accountInfo.AccountKey, out account); |
972 |
/* |
|
658 | 973 |
SnapshotDifferencer differencer; |
659 | 974 |
_differencer.Differencers.TryRemove(accountInfo.AccountKey, out differencer); |
975 |
*/ |
|
660 | 976 |
} |
661 | 977 |
|
662 | 978 |
public void SetSelectivePaths(AccountInfo accountInfo,Uri[] added, Uri[] removed) |
Also available in: Unified diff