Revision a9faac18 trunk/Pithos.Core/Agents/NetworkAgent.cs

b/trunk/Pithos.Core/Agents/NetworkAgent.cs
62 62

  
63 63
        //A separate agent is used to execute delete actions immediatelly;
64 64
        private ActionBlock<CloudDeleteAction> _deleteAgent;
65

  
66
        //TODO: CHECK
65 67
        readonly ConcurrentDictionary<string,DateTime> _deletedFiles=new ConcurrentDictionary<string, DateTime>();
66 68

  
67 69

  
......
80 82

  
81 83
        private bool _firstPoll = true;
82 84
        
83
        private TaskCompletionSource<bool> _tcs;
84
        private readonly AsyncManualResetEvent _pauseAgent = new AsyncManualResetEvent(true);
85
        //The Sync Event signals a manual synchronisation
86
        private readonly AsyncManualResetEvent _syncEvent=new AsyncManualResetEvent();
87
        //The Pause event stops the network agent to give priority to the deletion agent
88
        //Initially the event is signalled because we don't need to pause
89
        private readonly AsyncManualResetEvent _pauseEvent = new AsyncManualResetEvent(true);
85 90

  
86 91
        private ConcurrentDictionary<string, DateTime> _lastSeen = new ConcurrentDictionary<string, DateTime>();
87 92

  
......
93 98
                Action loop = null;
94 99
                loop = () =>
95 100
                {
96
                    _pauseAgent.Wait();
101
                    _pauseEvent.Wait();
97 102
                    var message = inbox.Receive();
98 103
                    var process=message.Then(Process,inbox.CancellationToken);
99 104
                    inbox.LoopAsync(process, loop);
......
102 107
            });
103 108

  
104 109
            _deleteAgent = new ActionBlock<CloudDeleteAction>(message =>ProcessDelete(message),new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism=4});
105
            /*
106
                Action loop = null;
107
                loop = () =>
108
                            {
109
                                var message = inbox.Receive();
110
                                var process = message.Then(ProcessDelete,inbox.CancellationToken);
111
                                inbox.LoopAsync(process, loop);
112
                            };
113
                loop();
114
*/
115

  
116 110
        }
117 111

  
118 112
        private async Task Process(CloudAction action)
......
230 224
        /// A separate agent is used to process deletes because the main agent may be busy with a long operation.
231 225
        /// </para>
232 226
        /// </remarks>
233
        private async Task ProcessDelete(CloudDeleteAction action)
227
        private void ProcessDelete(CloudDeleteAction action)
234 228
        {
235 229
            if (action == null)
236 230
                throw new ArgumentNullException("action");
......
257 251
                        var key = GetFileKey(action.CloudFile);
258 252
                        _deletedFiles[key] = DateTime.Now;
259 253

  
260
                        _pauseAgent.Reset();
254
                        _pauseEvent.Reset();
261 255
                        // and then delete the file from the server
262 256
                                DeleteCloudFile(accountInfo, cloudFile);
263 257

  
......
298 292
                {
299 293
                    //Set the event when all delete actions are processed
300 294
                    if (_deleteAgent.InputCount == 0)
301
                        _pauseAgent.Set();
295
                        _pauseEvent.Set();
302 296

  
303 297
                }
304 298
            }
......
404 398
                throw new ArgumentException("The CloudAction.AccountInfo is empty","cloudAction");
405 399
            Contract.EndContractBlock();
406 400

  
407
            _pauseAgent.Wait();
401
            _pauseEvent.Wait();
408 402

  
409 403
            //If the action targets a local file, add a treehash calculation
410 404
            if (!(cloudAction is CloudDeleteAction) && cloudAction.LocalFile as FileInfo != null)
......
433 427
            else
434 428
                _agent.Post(cloudAction);
435 429
        }
430
       
436 431

  
437
       /* class ObjectInfoByNameComparer:IEqualityComparer<ObjectInfo>
438
        {
439
            public bool Equals(ObjectInfo x, ObjectInfo y)
440
            {
441
                return x.Name.Equals(y.Name,StringComparison.InvariantCultureIgnoreCase);
442
            }
443

  
444
            public int GetHashCode(ObjectInfo obj)
445
            {
446
                return obj.Name.ToLower().GetHashCode();
447
            }
448
        }*/
449

  
432
        /// <summary>
433
        /// Start a manual synchronization
434
        /// </summary>
450 435
        public void SynchNow()
451
        {             
452
            if (_tcs!=null)
453
                _tcs.TrySetResult(true);
454
            else
455
            {
456
                //TODO: This may be OK for testing purposes, but we have no guarantee that it will
457
                //work properly in production
458
                PollRemoteFiles(repeat:false);
459
            }
436
        {       
437
            _syncEvent.Set();
460 438
        }
461 439

  
462 440
        //Remote files are polled periodically. Any changes are processed
463
        public async Task PollRemoteFiles(DateTime? since = null,bool repeat=true)
441
        public async Task PollRemoteFiles(DateTime? since = null)
464 442
        {
465 443
            UpdateStatus(PithosStatus.Syncing);
466 444
            StatusNotification.Notify(new PollNotification());
......
473 451
                {
474 452
                    //Next time we will check for all changes since the current check minus 1 second
475 453
                    //This is done to ensure there are no discrepancies due to clock differences
476
                    DateTime current = DateTime.Now.AddSeconds(-1);
454
                    var current = DateTime.Now.AddSeconds(-1);
477 455

  
478 456
                    var tasks = from accountInfo in _accounts
479 457
                                select ProcessAccountFiles(accountInfo, since);
......
482 460
                                        
483 461
                    _firstPoll = false;
484 462
                    //Reschedule the poll with the current timestamp as a "since" value
485
                    if (repeat)
486
                        nextSince = current;
487
                    else
488
                        return;
463
                    nextSince = current;
489 464
                }
490 465
                catch (Exception ex)
491 466
                {
......
494 469
                }
495 470
                
496 471
                UpdateStatus(PithosStatus.InSynch);
497
                //Wait for the polling interval to pass or the Manual flat to be toggled
472
                //Wait for the polling interval to pass or the Sync event to be signalled
498 473
                nextSince = await WaitForScheduledOrManualPoll(nextSince);
499 474

  
500 475
                PollRemoteFiles(nextSince);
......
502 477
            }
503 478
        }
504 479

  
480
        /// <summary>
481
        /// Wait for the polling period to expire or a manual sync request
482
        /// </summary>
483
        /// <param name="since"></param>
484
        /// <returns></returns>
505 485
        private async Task<DateTime?> WaitForScheduledOrManualPoll(DateTime? since)
506
        {            
507
            _tcs = new TaskCompletionSource<bool>();
486
        {
487
            var sync=_syncEvent.WaitAsync();
508 488
            var wait = TaskEx.Delay(TimeSpan.FromSeconds(Settings.PollingInterval), _agent.CancellationToken);
509
            var signaledTask = await TaskEx.WhenAny(_tcs.Task, wait);
489
            var signaledTask = await TaskEx.WhenAny(sync, wait);            
490
            
510 491
            //If polling is signalled by SynchNow, ignore the since tag
511 492
            if (signaledTask is Task<bool>)
512 493
                return null;
......
524 505

  
525 506
            using (log4net.ThreadContext.Stacks["Retrieve Remote"].Push(accountInfo.UserName))
526 507
            {
527
                await _pauseAgent.WaitAsync();
508
                await _pauseEvent.WaitAsync();
528 509

  
529 510
                Log.Info("Scheduled");
530 511
                var client = new CloudFilesClient(accountInfo)
......
539 520

  
540 521
                try
541 522
                {
542
                    await _pauseAgent.WaitAsync();
523
                    await _pauseEvent.WaitAsync();
543 524
                    //Get the poll time now. We may miss some deletions but it's better to keep a file that was deleted
544 525
                    //than delete a file that was created while we were executing the poll                    
545 526
                    var pollTime = DateTime.Now;
......
564 545
                                            from obj in objectList.Result
565 546
                                            select obj;
566 547

  
567
                        //TODO: Change the way deleted objects are detected.
568
                        //The list operation returns all existing objects so we could detect deleted remote objects
569
                        //by detecting objects that exist only locally. There are several cases where this is NOT the case:
570
                        //1.    The first time the application runs, as there may be files that were added while 
571
                        //      the application was down.
572
                        //2.    An object that is currently being uploaded will not appear in the remote list
573
                        //      until the upload finishes.
574
                        //      SOLUTION 1: Check the upload/download queue for the file
575
                        //      SOLUTION 2: Check the SQLite states for the file's entry. If it is being uploaded, 
576
                        //          or its last modification was after the current poll, don't delete it. This way we don't
577
                        //          delete objects whose upload finished too late to be included in the list.
578
                        //We need to detect and protect against such situations
579
                        //TODO: Does FileState have a LastModification field?
580
                        //TODO: How do we update the LastModification field? Do we need to add SQLite triggers?
581
                        //      Do we need to use a proper SQLite schema?
582
                        //      We can create a trigger with 
583
                        // CREATE TRIGGER IF NOT EXISTS update_last_modified UPDATE ON FileState FOR EACH ROW
584
                        //  BEGIN
585
                        //      UPDATE FileState SET LastModification=datetime('now')  WHERE Id=old.Id;
586
                        //  END;
587
                        //
588
                        //NOTE: Some files may have been deleted remotely while the application was down. 
589
                        //  We DO have to delete those files. Checking the trash makes it easy to detect them,
590
                        //  Otherwise, we can't be really sure whether we need to upload or delete 
591
                        //  the local-only files.
592
                        //  SOLUTION 1: Ask the user when such a local-only file is detected during the first poll.
593
                        //  SOLUTION 2: Mark conflict and ask the user as in #1
594

  
595 548
                        var trashObjects = dict["trash"].Result;
596 549
                        var sharedObjects = dict["shared"].Result;
597 550

  
......
604 557
                                        select trash;
605 558
                        ProcessTrashedFiles(accountInfo, realTrash);
606 559

  
607

  
608 560
                        var cleanRemotes = (from info in remoteObjects.Union(sharedObjects)
609 561
                                     let name = info.Name
610 562
                                     where !name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase) &&
......
620 572
                        var allActions = ChangesToActions(accountInfo, differencer.Changed)
621 573
                                        .Union(
622 574
                                        CreatesToActions(accountInfo,differencer.Created));
623

  
624 575
                        
625
                        //var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
626

  
627 576
                        //And remove those that are already being processed by the agent
628 577
                        var distinctActions = allActions
629 578
                            .Except(_agent.GetEnumerable(), new PithosMonitor.LocalFileComparer())
......
651 600

  
652 601
        AccountsDifferencer _differencer= new AccountsDifferencer();
653 602

  
654
/*
655
        Dictionary<string, List<ObjectInfo>> _currentSnapshot = new Dictionary<string, List<ObjectInfo>>();
656
        Dictionary<string, List<ObjectInfo>> _previousSnapshot = new Dictionary<string, List<ObjectInfo>>();
657
*/
658

  
659 603
        /// <summary>
660 604
        /// Deletes local files that are not found in the list of cloud files
661 605
        /// </summary>
......
716 660
                    var item = GetFileAgent(accountInfo).GetFileSystemInfo(relativePath);
717 661
                    if (item.Exists)
718 662
                    {
719
                        //Try to acquire a gate on the file, to take into account files that have been dequeued
720
                        //and are being processed
721
                        //TODO: The gate is not enough. Perhaps we need to keep a journal of processed files and check against
722
                        //that as well.
723
/*
724
                        using (var gate = NetworkGate.Acquire(item.FullName, NetworkOperation.Deleting))
663
                        if ((item.Attributes & FileAttributes.ReadOnly) == FileAttributes.ReadOnly)
725 664
                        {
726
                            if (gate.Failed)
727
                                continue;
728
*/
729
                            if ((item.Attributes & FileAttributes.ReadOnly) == FileAttributes.ReadOnly)
730
                            {
731
                                item.Attributes = item.Attributes & ~FileAttributes.ReadOnly;
732

  
733
                            }
734
                            item.Delete();
735
                            DateTime lastDate;
736
                            _lastSeen.TryRemove(item.FullName, out lastDate);
737
                            deletedFiles.Add(item);
738
/*
665
                            item.Attributes = item.Attributes & ~FileAttributes.ReadOnly;
666

  
739 667
                        }
740
*/
668
                        item.Delete();
669
                        DateTime lastDate;
670
                        _lastSeen.TryRemove(item.FullName, out lastDate);
671
                        deletedFiles.Add(item);
741 672
                    }
742 673
                    StatusKeeper.SetFileState(item.FullName, FileStatus.Deleted, FileOverlayStatus.Deleted);                    
743 674
                }
......
783 714
                    {
784 715
                        var state =  StatusKeeper.GetStateByFilePath(localFile.FullName);
785 716
                        _lastSeen[localFile.FullName] = DateTime.Now;
786
                        //FileState.FindByFilePath(localFile.FullName);
787 717
                        //Common files should be checked on a per-case basis to detect differences, which is newer
788 718

  
789 719
                        yield return new CloudAction(accountInfo, CloudActionType.MustSynch,
......
827 757
            }            
828 758
        }
829 759

  
830
        //Creates an appropriate action for each server file
831
/*
832
        private IEnumerable<CloudAction> ObjectsToActions(AccountInfo accountInfo,IEnumerable<ObjectInfo> remote)
833
        {
834
            if (remote==null)
835
                throw new ArgumentNullException();
836
            Contract.EndContractBlock();
837
            var fileAgent = GetFileAgent(accountInfo);
838

  
839
            //In order to avoid multiple iterations over the files, we iterate only once
840
            //over the remote files
841
            foreach (var objectInfo in remote)
842
            {
843
                var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
844
                //and remove any matching objects from the list, adding them to the commonObjects list
845
                
846
                if (fileAgent.Exists(relativePath))
847
                {
848
                    //If a directory object already exists, we don't need to perform any other action                    
849
                    var localFile = fileAgent.GetFileSystemInfo(relativePath);
850
                    if (objectInfo.Content_Type == @"application/directory" && localFile is DirectoryInfo)
851
                        continue;
852
                    using (new SessionScope(FlushAction.Never))
853
                    {
854
                        var state =  StatusKeeper.GetStateByFilePath(localFile.FullName);
855
                        _lastSeen[localFile.FullName] = DateTime.Now;
856
                        //FileState.FindByFilePath(localFile.FullName);
857
                        //Common files should be checked on a per-case basis to detect differences, which is newer
858

  
859
                        yield return new CloudAction(accountInfo, CloudActionType.MustSynch,
860
                                                     localFile, objectInfo, state, accountInfo.BlockSize,
861
                                                     accountInfo.BlockHash);
862
                    }
863
                }
864
                else
865
                {
866
                    //If there is no match we add them to the localFiles list
867
                    //but only if the file is not marked for deletion
868
                    var targetFile = Path.Combine(accountInfo.AccountPath, relativePath);
869
                    var fileStatus = StatusKeeper.GetFileStatus(targetFile);
870
                    if (fileStatus != FileStatus.Deleted)
871
                    {
872
                        //Remote files should be downloaded
873
                        yield return new CloudDownloadAction(accountInfo,objectInfo);
874
                    }
875
                }
876
            }            
877
        }
878
*/
879 760

  
880 761
        private static FileAgent GetFileAgent(AccountInfo accountInfo)
881 762
        {
......
1005 886
            {
1006 887
                if (gate.Failed)
1007 888
                    return;
1008
                //The file's hashmap will be stored in the same location with the extension .hashmap
1009
                //var hashPath = Path.Combine(FileAgent.CachePath, relativePath + ".hashmap");
1010 889
                
1011 890
                var client = new CloudFilesClient(accountInfo);
1012 891
                var account = cloudFile.Account;

Also available in: Unified diff