Revision a9faac18 trunk/Pithos.Core/Agents/NetworkAgent.cs
b/trunk/Pithos.Core/Agents/NetworkAgent.cs | ||
---|---|---|
62 | 62 |
|
63 | 63 |
//A separate agent is used to execute delete actions immediatelly; |
64 | 64 |
private ActionBlock<CloudDeleteAction> _deleteAgent; |
65 |
|
|
66 |
//TODO: CHECK |
|
65 | 67 |
readonly ConcurrentDictionary<string,DateTime> _deletedFiles=new ConcurrentDictionary<string, DateTime>(); |
66 | 68 |
|
67 | 69 |
|
... | ... | |
80 | 82 |
|
81 | 83 |
private bool _firstPoll = true; |
82 | 84 |
|
83 |
private TaskCompletionSource<bool> _tcs; |
|
84 |
private readonly AsyncManualResetEvent _pauseAgent = new AsyncManualResetEvent(true); |
|
85 |
//The Sync Event signals a manual synchronisation |
|
86 |
private readonly AsyncManualResetEvent _syncEvent=new AsyncManualResetEvent(); |
|
87 |
//The Pause event stops the network agent to give priority to the deletion agent |
|
88 |
//Initially the event is signalled because we don't need to pause |
|
89 |
private readonly AsyncManualResetEvent _pauseEvent = new AsyncManualResetEvent(true); |
|
85 | 90 |
|
86 | 91 |
private ConcurrentDictionary<string, DateTime> _lastSeen = new ConcurrentDictionary<string, DateTime>(); |
87 | 92 |
|
... | ... | |
93 | 98 |
Action loop = null; |
94 | 99 |
loop = () => |
95 | 100 |
{ |
96 |
_pauseAgent.Wait();
|
|
101 |
_pauseEvent.Wait();
|
|
97 | 102 |
var message = inbox.Receive(); |
98 | 103 |
var process=message.Then(Process,inbox.CancellationToken); |
99 | 104 |
inbox.LoopAsync(process, loop); |
... | ... | |
102 | 107 |
}); |
103 | 108 |
|
104 | 109 |
_deleteAgent = new ActionBlock<CloudDeleteAction>(message =>ProcessDelete(message),new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism=4}); |
105 |
/* |
|
106 |
Action loop = null; |
|
107 |
loop = () => |
|
108 |
{ |
|
109 |
var message = inbox.Receive(); |
|
110 |
var process = message.Then(ProcessDelete,inbox.CancellationToken); |
|
111 |
inbox.LoopAsync(process, loop); |
|
112 |
}; |
|
113 |
loop(); |
|
114 |
*/ |
|
115 |
|
|
116 | 110 |
} |
117 | 111 |
|
118 | 112 |
private async Task Process(CloudAction action) |
... | ... | |
230 | 224 |
/// A separate agent is used to process deletes because the main agent may be busy with a long operation. |
231 | 225 |
/// </para> |
232 | 226 |
/// </remarks> |
233 |
private async Task ProcessDelete(CloudDeleteAction action)
|
|
227 |
private void ProcessDelete(CloudDeleteAction action)
|
|
234 | 228 |
{ |
235 | 229 |
if (action == null) |
236 | 230 |
throw new ArgumentNullException("action"); |
... | ... | |
257 | 251 |
var key = GetFileKey(action.CloudFile); |
258 | 252 |
_deletedFiles[key] = DateTime.Now; |
259 | 253 |
|
260 |
_pauseAgent.Reset();
|
|
254 |
_pauseEvent.Reset();
|
|
261 | 255 |
// and then delete the file from the server |
262 | 256 |
DeleteCloudFile(accountInfo, cloudFile); |
263 | 257 |
|
... | ... | |
298 | 292 |
{ |
299 | 293 |
//Set the event when all delete actions are processed |
300 | 294 |
if (_deleteAgent.InputCount == 0) |
301 |
_pauseAgent.Set();
|
|
295 |
_pauseEvent.Set();
|
|
302 | 296 |
|
303 | 297 |
} |
304 | 298 |
} |
... | ... | |
404 | 398 |
throw new ArgumentException("The CloudAction.AccountInfo is empty","cloudAction"); |
405 | 399 |
Contract.EndContractBlock(); |
406 | 400 |
|
407 |
_pauseAgent.Wait();
|
|
401 |
_pauseEvent.Wait();
|
|
408 | 402 |
|
409 | 403 |
//If the action targets a local file, add a treehash calculation |
410 | 404 |
if (!(cloudAction is CloudDeleteAction) && cloudAction.LocalFile as FileInfo != null) |
... | ... | |
433 | 427 |
else |
434 | 428 |
_agent.Post(cloudAction); |
435 | 429 |
} |
430 |
|
|
436 | 431 |
|
437 |
/* class ObjectInfoByNameComparer:IEqualityComparer<ObjectInfo> |
|
438 |
{ |
|
439 |
public bool Equals(ObjectInfo x, ObjectInfo y) |
|
440 |
{ |
|
441 |
return x.Name.Equals(y.Name,StringComparison.InvariantCultureIgnoreCase); |
|
442 |
} |
|
443 |
|
|
444 |
public int GetHashCode(ObjectInfo obj) |
|
445 |
{ |
|
446 |
return obj.Name.ToLower().GetHashCode(); |
|
447 |
} |
|
448 |
}*/ |
|
449 |
|
|
432 |
/// <summary> |
|
433 |
/// Start a manual synchronization |
|
434 |
/// </summary> |
|
450 | 435 |
public void SynchNow() |
451 |
{ |
|
452 |
if (_tcs!=null) |
|
453 |
_tcs.TrySetResult(true); |
|
454 |
else |
|
455 |
{ |
|
456 |
//TODO: This may be OK for testing purposes, but we have no guarantee that it will |
|
457 |
//work properly in production |
|
458 |
PollRemoteFiles(repeat:false); |
|
459 |
} |
|
436 |
{ |
|
437 |
_syncEvent.Set(); |
|
460 | 438 |
} |
461 | 439 |
|
462 | 440 |
//Remote files are polled periodically. Any changes are processed |
463 |
public async Task PollRemoteFiles(DateTime? since = null,bool repeat=true)
|
|
441 |
public async Task PollRemoteFiles(DateTime? since = null) |
|
464 | 442 |
{ |
465 | 443 |
UpdateStatus(PithosStatus.Syncing); |
466 | 444 |
StatusNotification.Notify(new PollNotification()); |
... | ... | |
473 | 451 |
{ |
474 | 452 |
//Next time we will check for all changes since the current check minus 1 second |
475 | 453 |
//This is done to ensure there are no discrepancies due to clock differences |
476 |
DateTime current = DateTime.Now.AddSeconds(-1);
|
|
454 |
var current = DateTime.Now.AddSeconds(-1);
|
|
477 | 455 |
|
478 | 456 |
var tasks = from accountInfo in _accounts |
479 | 457 |
select ProcessAccountFiles(accountInfo, since); |
... | ... | |
482 | 460 |
|
483 | 461 |
_firstPoll = false; |
484 | 462 |
//Reschedule the poll with the current timestamp as a "since" value |
485 |
if (repeat) |
|
486 |
nextSince = current; |
|
487 |
else |
|
488 |
return; |
|
463 |
nextSince = current; |
|
489 | 464 |
} |
490 | 465 |
catch (Exception ex) |
491 | 466 |
{ |
... | ... | |
494 | 469 |
} |
495 | 470 |
|
496 | 471 |
UpdateStatus(PithosStatus.InSynch); |
497 |
//Wait for the polling interval to pass or the Manual flat to be toggled
|
|
472 |
//Wait for the polling interval to pass or the Sync event to be signalled
|
|
498 | 473 |
nextSince = await WaitForScheduledOrManualPoll(nextSince); |
499 | 474 |
|
500 | 475 |
PollRemoteFiles(nextSince); |
... | ... | |
502 | 477 |
} |
503 | 478 |
} |
504 | 479 |
|
480 |
/// <summary> |
|
481 |
/// Wait for the polling period to expire or a manual sync request |
|
482 |
/// </summary> |
|
483 |
/// <param name="since"></param> |
|
484 |
/// <returns></returns> |
|
505 | 485 |
private async Task<DateTime?> WaitForScheduledOrManualPoll(DateTime? since) |
506 |
{
|
|
507 |
_tcs = new TaskCompletionSource<bool>();
|
|
486 |
{ |
|
487 |
var sync=_syncEvent.WaitAsync();
|
|
508 | 488 |
var wait = TaskEx.Delay(TimeSpan.FromSeconds(Settings.PollingInterval), _agent.CancellationToken); |
509 |
var signaledTask = await TaskEx.WhenAny(_tcs.Task, wait); |
|
489 |
var signaledTask = await TaskEx.WhenAny(sync, wait); |
|
490 |
|
|
510 | 491 |
//If polling is signalled by SynchNow, ignore the since tag |
511 | 492 |
if (signaledTask is Task<bool>) |
512 | 493 |
return null; |
... | ... | |
524 | 505 |
|
525 | 506 |
using (log4net.ThreadContext.Stacks["Retrieve Remote"].Push(accountInfo.UserName)) |
526 | 507 |
{ |
527 |
await _pauseAgent.WaitAsync();
|
|
508 |
await _pauseEvent.WaitAsync();
|
|
528 | 509 |
|
529 | 510 |
Log.Info("Scheduled"); |
530 | 511 |
var client = new CloudFilesClient(accountInfo) |
... | ... | |
539 | 520 |
|
540 | 521 |
try |
541 | 522 |
{ |
542 |
await _pauseAgent.WaitAsync();
|
|
523 |
await _pauseEvent.WaitAsync();
|
|
543 | 524 |
//Get the poll time now. We may miss some deletions but it's better to keep a file that was deleted |
544 | 525 |
//than delete a file that was created while we were executing the poll |
545 | 526 |
var pollTime = DateTime.Now; |
... | ... | |
564 | 545 |
from obj in objectList.Result |
565 | 546 |
select obj; |
566 | 547 |
|
567 |
//TODO: Change the way deleted objects are detected. |
|
568 |
//The list operation returns all existing objects so we could detect deleted remote objects |
|
569 |
//by detecting objects that exist only locally. There are several cases where this is NOT the case: |
|
570 |
//1. The first time the application runs, as there may be files that were added while |
|
571 |
// the application was down. |
|
572 |
//2. An object that is currently being uploaded will not appear in the remote list |
|
573 |
// until the upload finishes. |
|
574 |
// SOLUTION 1: Check the upload/download queue for the file |
|
575 |
// SOLUTION 2: Check the SQLite states for the file's entry. If it is being uploaded, |
|
576 |
// or its last modification was after the current poll, don't delete it. This way we don't |
|
577 |
// delete objects whose upload finished too late to be included in the list. |
|
578 |
//We need to detect and protect against such situations |
|
579 |
//TODO: Does FileState have a LastModification field? |
|
580 |
//TODO: How do we update the LastModification field? Do we need to add SQLite triggers? |
|
581 |
// Do we need to use a proper SQLite schema? |
|
582 |
// We can create a trigger with |
|
583 |
// CREATE TRIGGER IF NOT EXISTS update_last_modified UPDATE ON FileState FOR EACH ROW |
|
584 |
// BEGIN |
|
585 |
// UPDATE FileState SET LastModification=datetime('now') WHERE Id=old.Id; |
|
586 |
// END; |
|
587 |
// |
|
588 |
//NOTE: Some files may have been deleted remotely while the application was down. |
|
589 |
// We DO have to delete those files. Checking the trash makes it easy to detect them, |
|
590 |
// Otherwise, we can't be really sure whether we need to upload or delete |
|
591 |
// the local-only files. |
|
592 |
// SOLUTION 1: Ask the user when such a local-only file is detected during the first poll. |
|
593 |
// SOLUTION 2: Mark conflict and ask the user as in #1 |
|
594 |
|
|
595 | 548 |
var trashObjects = dict["trash"].Result; |
596 | 549 |
var sharedObjects = dict["shared"].Result; |
597 | 550 |
|
... | ... | |
604 | 557 |
select trash; |
605 | 558 |
ProcessTrashedFiles(accountInfo, realTrash); |
606 | 559 |
|
607 |
|
|
608 | 560 |
var cleanRemotes = (from info in remoteObjects.Union(sharedObjects) |
609 | 561 |
let name = info.Name |
610 | 562 |
where !name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase) && |
... | ... | |
620 | 572 |
var allActions = ChangesToActions(accountInfo, differencer.Changed) |
621 | 573 |
.Union( |
622 | 574 |
CreatesToActions(accountInfo,differencer.Created)); |
623 |
|
|
624 | 575 |
|
625 |
//var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName); |
|
626 |
|
|
627 | 576 |
//And remove those that are already being processed by the agent |
628 | 577 |
var distinctActions = allActions |
629 | 578 |
.Except(_agent.GetEnumerable(), new PithosMonitor.LocalFileComparer()) |
... | ... | |
651 | 600 |
|
652 | 601 |
AccountsDifferencer _differencer= new AccountsDifferencer(); |
653 | 602 |
|
654 |
/* |
|
655 |
Dictionary<string, List<ObjectInfo>> _currentSnapshot = new Dictionary<string, List<ObjectInfo>>(); |
|
656 |
Dictionary<string, List<ObjectInfo>> _previousSnapshot = new Dictionary<string, List<ObjectInfo>>(); |
|
657 |
*/ |
|
658 |
|
|
659 | 603 |
/// <summary> |
660 | 604 |
/// Deletes local files that are not found in the list of cloud files |
661 | 605 |
/// </summary> |
... | ... | |
716 | 660 |
var item = GetFileAgent(accountInfo).GetFileSystemInfo(relativePath); |
717 | 661 |
if (item.Exists) |
718 | 662 |
{ |
719 |
//Try to acquire a gate on the file, to take into account files that have been dequeued |
|
720 |
//and are being processed |
|
721 |
//TODO: The gate is not enough. Perhaps we need to keep a journal of processed files and check against |
|
722 |
//that as well. |
|
723 |
/* |
|
724 |
using (var gate = NetworkGate.Acquire(item.FullName, NetworkOperation.Deleting)) |
|
663 |
if ((item.Attributes & FileAttributes.ReadOnly) == FileAttributes.ReadOnly) |
|
725 | 664 |
{ |
726 |
if (gate.Failed) |
|
727 |
continue; |
|
728 |
*/ |
|
729 |
if ((item.Attributes & FileAttributes.ReadOnly) == FileAttributes.ReadOnly) |
|
730 |
{ |
|
731 |
item.Attributes = item.Attributes & ~FileAttributes.ReadOnly; |
|
732 |
|
|
733 |
} |
|
734 |
item.Delete(); |
|
735 |
DateTime lastDate; |
|
736 |
_lastSeen.TryRemove(item.FullName, out lastDate); |
|
737 |
deletedFiles.Add(item); |
|
738 |
/* |
|
665 |
item.Attributes = item.Attributes & ~FileAttributes.ReadOnly; |
|
666 |
|
|
739 | 667 |
} |
740 |
*/ |
|
668 |
item.Delete(); |
|
669 |
DateTime lastDate; |
|
670 |
_lastSeen.TryRemove(item.FullName, out lastDate); |
|
671 |
deletedFiles.Add(item); |
|
741 | 672 |
} |
742 | 673 |
StatusKeeper.SetFileState(item.FullName, FileStatus.Deleted, FileOverlayStatus.Deleted); |
743 | 674 |
} |
... | ... | |
783 | 714 |
{ |
784 | 715 |
var state = StatusKeeper.GetStateByFilePath(localFile.FullName); |
785 | 716 |
_lastSeen[localFile.FullName] = DateTime.Now; |
786 |
//FileState.FindByFilePath(localFile.FullName); |
|
787 | 717 |
//Common files should be checked on a per-case basis to detect differences, which is newer |
788 | 718 |
|
789 | 719 |
yield return new CloudAction(accountInfo, CloudActionType.MustSynch, |
... | ... | |
827 | 757 |
} |
828 | 758 |
} |
829 | 759 |
|
830 |
//Creates an appropriate action for each server file |
|
831 |
/* |
|
832 |
private IEnumerable<CloudAction> ObjectsToActions(AccountInfo accountInfo,IEnumerable<ObjectInfo> remote) |
|
833 |
{ |
|
834 |
if (remote==null) |
|
835 |
throw new ArgumentNullException(); |
|
836 |
Contract.EndContractBlock(); |
|
837 |
var fileAgent = GetFileAgent(accountInfo); |
|
838 |
|
|
839 |
//In order to avoid multiple iterations over the files, we iterate only once |
|
840 |
//over the remote files |
|
841 |
foreach (var objectInfo in remote) |
|
842 |
{ |
|
843 |
var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName); |
|
844 |
//and remove any matching objects from the list, adding them to the commonObjects list |
|
845 |
|
|
846 |
if (fileAgent.Exists(relativePath)) |
|
847 |
{ |
|
848 |
//If a directory object already exists, we don't need to perform any other action |
|
849 |
var localFile = fileAgent.GetFileSystemInfo(relativePath); |
|
850 |
if (objectInfo.Content_Type == @"application/directory" && localFile is DirectoryInfo) |
|
851 |
continue; |
|
852 |
using (new SessionScope(FlushAction.Never)) |
|
853 |
{ |
|
854 |
var state = StatusKeeper.GetStateByFilePath(localFile.FullName); |
|
855 |
_lastSeen[localFile.FullName] = DateTime.Now; |
|
856 |
//FileState.FindByFilePath(localFile.FullName); |
|
857 |
//Common files should be checked on a per-case basis to detect differences, which is newer |
|
858 |
|
|
859 |
yield return new CloudAction(accountInfo, CloudActionType.MustSynch, |
|
860 |
localFile, objectInfo, state, accountInfo.BlockSize, |
|
861 |
accountInfo.BlockHash); |
|
862 |
} |
|
863 |
} |
|
864 |
else |
|
865 |
{ |
|
866 |
//If there is no match we add them to the localFiles list |
|
867 |
//but only if the file is not marked for deletion |
|
868 |
var targetFile = Path.Combine(accountInfo.AccountPath, relativePath); |
|
869 |
var fileStatus = StatusKeeper.GetFileStatus(targetFile); |
|
870 |
if (fileStatus != FileStatus.Deleted) |
|
871 |
{ |
|
872 |
//Remote files should be downloaded |
|
873 |
yield return new CloudDownloadAction(accountInfo,objectInfo); |
|
874 |
} |
|
875 |
} |
|
876 |
} |
|
877 |
} |
|
878 |
*/ |
|
879 | 760 |
|
880 | 761 |
private static FileAgent GetFileAgent(AccountInfo accountInfo) |
881 | 762 |
{ |
... | ... | |
1005 | 886 |
{ |
1006 | 887 |
if (gate.Failed) |
1007 | 888 |
return; |
1008 |
//The file's hashmap will be stored in the same location with the extension .hashmap |
|
1009 |
//var hashPath = Path.Combine(FileAgent.CachePath, relativePath + ".hashmap"); |
|
1010 | 889 |
|
1011 | 890 |
var client = new CloudFilesClient(accountInfo); |
1012 | 891 |
var account = cloudFile.Account; |
Also available in: Unified diff