Revision c945b450 trunk/Pithos.Core/Agents/PollAgent.cs

b/trunk/Pithos.Core/Agents/PollAgent.cs
45 45
using System.Diagnostics;
46 46
using System.Diagnostics.Contracts;
47 47
using System.IO;
48
using System.Linq.Expressions;
48 49
using System.Reflection;
49 50
using System.Threading;
50 51
using System.Threading.Tasks;
......
59 60
    using System.Collections.Generic;
60 61
    using System.Linq;
61 62

  
63
    [DebuggerDisplay("{FilePath} C:{C} L:{L} S:{S}")]
64
    public class StateTuple
65
    {
66
        public string FilePath { get; private set; }
67

  
68
        public string L
69
        {
70
            get { return FileState==null?null:FileState.Checksum; }
71
        }
72

  
73
        public string C { get; set; }
74

  
75
        public string S
76
        {
77
            get { return ObjectInfo== null ? null : ObjectInfo.Hash; }
78
        }
79

  
80
        private FileSystemInfo _fileInfo;
81
        public FileSystemInfo FileInfo
82
        {
83
            get { return _fileInfo; }
84
            set
85
            {
86
                _fileInfo = value;
87
                FilePath = value.FullName;
88
            }
89
        }
90

  
91
        public FileState FileState { get; set; }
92
        public ObjectInfo ObjectInfo{ get; set; }
93

  
94
        public StateTuple() { }
95

  
96
        public StateTuple(FileSystemInfo info)
97
        {
98
            FileInfo = info;
99
        }
100

  
101

  
102
    }
103

  
104

  
62 105
    /// <summary>
63 106
    /// PollAgent periodically polls the server to detect object changes. The agent retrieves a listing of all
64 107
    /// objects and compares it with a previously cached version to detect differences. 
......
84 127

  
85 128
        public IStatusNotification StatusNotification { get; set; }
86 129

  
130
        private CancellationTokenSource _currentOperationCancellation = new CancellationTokenSource();
131

  
132
        public void CancelCurrentOperation()
133
        {
134
            //What does it mean to cancel the current upload/download?
135
            //Obviously, the current operation will be cancelled by throwing
136
            //a cancellation exception.
137
            //
138
            //The default behavior is to retry any operations that throw.
139
            //Obviously this is not what we want in this situation.
140
            //The cancelled operation should NOT bea retried. 
141
            //
142
            //This can be done by catching the cancellation exception
143
            //and avoiding the retry.
144
            //
145

  
146
            //Have to reset the cancellation source - it is not possible to reset the source
147
            //Have to prevent a case where an operation requests a token from the old source
148
            var oldSource = Interlocked.Exchange(ref _currentOperationCancellation, new CancellationTokenSource());
149
            oldSource.Cancel();
150

  
151
        }
152

  
87 153
        public bool Pause
88 154
        {
89 155
            get {
......
264 330
                        var remoteObjects = (from objectList in listTasks
265 331
                                            where (string)objectList.AsyncState != "trash"
266 332
                                            from obj in objectList.Result
333
                                            orderby obj.Bytes ascending 
267 334
                                            select obj).ToList();
268 335
                        
269 336
                        //Get the latest remote object modification date, only if it is after
......
300 367
                            StatusKeeper.CleanupOrphanStates();
301 368
                        StatusKeeper.CleanupStaleStates(accountInfo, cleanRemotes);
302 369
                        
303
                        var differencer = _differencer.PostSnapshot(accountInfo, cleanRemotes);
370
                        //var differencer = _differencer.PostSnapshot(accountInfo, cleanRemotes);
304 371

  
305 372
                        var filterUris = Selectives.SelectiveUris[accountInfo.AccountKey];
306 373

  
307 374

  
308
                        //On the first run
309
                        if (_firstPoll)
310
                        {
311
                            MarkSuspectedDeletes(accountInfo, cleanRemotes);
312
                        }
313
                        ProcessDeletedFiles(accountInfo, differencer.Deleted.FilterDirectlyBelow(filterUris));
375
                        //Get the local files here                        
376
                        var agent = AgentLocator<FileAgent>.Get(accountInfo.AccountPath);
314 377

  
315
                        // @@@ NEED To add previous state here as well, To compare with previous hash
378
                        var files = LoadLocalFileTuples(accountInfo);
316 379

  
317
                        
380
                        var states = FileState.Queryable.ToList();
381

  
382
                        var infos = (from remote in cleanRemotes
383
                                    let path = remote.RelativeUrlToFilePath(accountInfo.UserName)
384
                                    let info=agent.GetFileSystemInfo(path)
385
                                    select Tuple.Create(info.FullName,remote))
386
                                    .ToList();
387

  
388
                        var token = _currentOperationCancellation.Token;
389

  
390
                        var tuples = MergeSources(infos, files, states).ToList();
318 391

  
319
                        //Create a list of actions from the remote files
320 392
                        
321
                        var allActions = MovesToActions(accountInfo,differencer.Moved.FilterDirectlyBelow(filterUris))
322
                                        .Union(
323
                                        ChangesToActions(accountInfo, differencer.Changed.FilterDirectlyBelow(filterUris)))
324
                                        .Union(
325
                                        CreatesToActions(accountInfo, differencer.Created.FilterDirectlyBelow(filterUris)));
326

  
327
                        //And remove those that are already being processed by the agent
328
                        var distinctActions = allActions
329
                            .Except(NetworkAgent.GetEnumerable(), new LocalFileComparer())
330
                            .ToList();
331

  
332
                        await _unPauseEvent.WaitAsync();
333
                        //Queue all the actions
334
                        foreach (var message in distinctActions)
393
                        foreach (var tuple in tuples)
335 394
                        {
336
                            NetworkAgent.Post(message);
395
                            await _unPauseEvent.WaitAsync();
396
                            
397
                            SyncSingleItem(accountInfo, tuple, agent, token);
337 398
                        }
338 399

  
400

  
401
                        //On the first run
402
/*
403
                        if (_firstPoll)
404
                        {
405
                            MarkSuspectedDeletes(accountInfo, cleanRemotes);
406
                        }
407
*/
408

  
409

  
339 410
                        Log.Info("[LISTENER] End Processing");
340 411
                    }
341 412
                }
......
350 421
            }
351 422
        }
352 423

  
424
        private static List<Tuple<FileSystemInfo, string>> LoadLocalFileTuples(AccountInfo accountInfo)
425
        {
426
            using (ThreadContext.Stacks["Account Files Hashing"].Push(accountInfo.UserName))
427
            {
428

  
429
                var localInfos = AgentLocator<FileAgent>.Get(accountInfo.AccountPath).EnumerateFileSystemInfos();
430
                //Use the queue to retry locked file hashing
431
                var fileQueue = new Queue<FileSystemInfo>(localInfos);
432

  
433
                var results = new List<Tuple<FileSystemInfo, string>>();
434

  
435
                while (fileQueue.Count > 0)
436
                {
437
                    var file = fileQueue.Dequeue();
438
                    using (ThreadContext.Stacks["File"].Push(file.FullName))
439
                    {
440
                        try
441
                        {
442
                            var hash = (file is DirectoryInfo)
443
                                           ? "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
444
                                           : Signature.CalculateTreeHash(file, accountInfo.BlockSize,
445
                                                                         accountInfo.BlockHash)
446
                                                 .
447
                                                 TopHash.ToHashString();
448
                            results.Add(Tuple.Create(file, hash));
449
                        }
450
                        catch (IOException exc)
451
                        {
452
                            Log.WarnFormat("[HASH] File in use, will retry [{0}]", exc);
453
                            fileQueue.Enqueue(file);
454
                        }
455
                    }
456
                }
457

  
458
                return results;
459
            }
460
        }
461

  
462
        private void SyncSingleItem(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, CancellationToken token)
463
        {
464
            Log.DebugFormat("Sync [{0}] C:[{1}] L:[{2}] S:[{3}]",tuple.FilePath,tuple.C,tuple.L,tuple.S);
465

  
466
            var localFilePath = tuple.FilePath;
467
            //Don't use the tuple info, it may have been deleted
468
            var localInfo = FileInfoExtensions.FromPath(localFilePath);
469

  
470
            // Local file unchanged? If both C and L are null, make sure it's because 
471
            //both the file is missing and the state checksum is not missing
472
            if (tuple.C == tuple.L && (localInfo.Exists || tuple.FileState==null))
473
            {
474
                //No local changes
475
                //Server unchanged?
476
                if (tuple.S == tuple.L)
477
                {
478
                    // No server changes
479
                    ;
480
                }
481
                else
482
                {
483
                    //Different from server
484
                    if (Selectives.IsSelected(accountInfo, localFilePath))
485
                    {
486
                        //Does the server file exist?
487
                        if (tuple.S == null)
488
                        {
489
                            //Server file doesn't exist
490
                            //deleteObjectFromLocal()
491
                            StatusKeeper.SetFileState(localFilePath, FileStatus.Deleted,
492
                                                      FileOverlayStatus.Deleted, "");
493
                            agent.Delete(localFilePath);
494
                            //updateRecord(Remove C, L)
495
                            StatusKeeper.ClearFileStatus(localFilePath);
496
                        }
497
                        else
498
                        {
499
                            //Server file exists
500
                            //downloadServerObject() // Result: L = S
501
                            StatusKeeper.SetFileState(localFilePath, FileStatus.Modified,
502
                                                      FileOverlayStatus.Modified, "");
503
                            NetworkAgent.Downloader.DownloadCloudFile(accountInfo,
504
                                                                            tuple.ObjectInfo,
505
                                                                            localFilePath, token).Wait(token);
506
                            //updateRecord( L = S )
507
                            StatusKeeper.UpdateFileChecksum(localFilePath, tuple.FileState==null?"":tuple.FileState.ShortHash,
508
                                                            tuple.ObjectInfo.Hash);
509

  
510
                            StatusKeeper.SetFileState(localFilePath, FileStatus.Unchanged,
511
                                                      FileOverlayStatus.Normal, "");
512
                        }
513
                    }
514
                }
515
            }
516
            else
517
            {
518
                //Local changes found
519

  
520
                //Server unchanged?
521
                if (tuple.S == tuple.L)
522
                {
523
                    //The FileAgent selective sync checks for new root folder files
524
                    if (!agent.Ignore(localFilePath))
525
                    {
526
                        if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null)
527
                        {
528
                            //deleteObjectFromServer()
529
                            DeleteCloudFile(accountInfo, tuple);
530
                            //updateRecord( Remove L, S)                  
531
                        }
532
                        else
533
                        {
534
                            //uploadLocalObject() // Result: S = C, L = S                        
535
                            var isUnselected = agent.IsUnselectedRootFolder(tuple.FilePath);
536

  
537
                            //Debug.Assert(tuple.FileState !=null);
538
                            var action = new CloudUploadAction(accountInfo, localInfo, tuple.FileState,
539
                                                               accountInfo.BlockSize, accountInfo.BlockHash,
540
                                                               "Poll", isUnselected);
541
                            NetworkAgent.Uploader.UploadCloudFile(action, token).Wait(token);
542

  
543

  
544
                            //updateRecord( S = C )
545
                            StatusKeeper.SetFileState(localFilePath, FileStatus.Unchanged,
546
                                                      FileOverlayStatus.Normal, "");
547
                            if (isUnselected)
548
                            {
549
                                ProcessChildren(accountInfo, tuple, agent, token);
550
                            }
551
                        }
552
                    }
553
                }
554
                else
555
                {
556
                    if (Selectives.IsSelected(accountInfo, localFilePath))
557
                    {
558
                        if (tuple.C == tuple.S)
559
                        {
560
                            // (Identical Changes) Result: L = S
561
                            //doNothing()
562
                            StatusKeeper.UpdateFileChecksum(localFilePath, tuple.FileState == null ? "" : tuple.FileState.ShortHash,
563
                                                            tuple.ObjectInfo.Hash);
564
                            StatusKeeper.SetFileState(localFilePath, FileStatus.Unchanged,
565
                                                      FileOverlayStatus.Normal, "");
566
                        }
567
                        else
568
                        {
569
                            if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null )
570
                            {
571
                                //deleteObjectFromServer()
572
                                DeleteCloudFile(accountInfo, tuple);
573
                                //updateRecord(Remove L, S)                  
574
                            }
575
                            else
576
                            {
577
                                ReportConflictForMismatch(localFilePath);
578
                                //identifyAsConflict() // Manual action required
579
                            }
580
                        }
581
                    }
582
                }
583
            }
584
        }
585

  
586
        private void DeleteCloudFile(AccountInfo accountInfo, StateTuple tuple)
587
        {
588
            StatusKeeper.SetFileState(tuple.FilePath, FileStatus.Deleted,
589
                                      FileOverlayStatus.Deleted, "");
590
            NetworkAgent.DeleteAgent.DeleteCloudFile(accountInfo, tuple.ObjectInfo);
591
            StatusKeeper.ClearFileStatus(tuple.FilePath);
592
        }
593

  
594
        private void ProcessChildren(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, CancellationToken token)
595
        {
596

  
597
            var dirInfo = tuple.FileInfo as DirectoryInfo;
598
            var folderTuples = from folder in dirInfo.EnumerateDirectories("*", SearchOption.AllDirectories)
599
                               select new StateTuple(folder);
600
            var fileTuples = from file in dirInfo.EnumerateFiles("*", SearchOption.AllDirectories)
601
                             select new StateTuple(file);
602
            
603
            //Process folders first, to ensure folders appear on the sever as soon as possible
604
            folderTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, token));
605
            
606
            fileTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, token));
607
        }
608

  
609
        private static IEnumerable<StateTuple> MergeSources(
610
            IEnumerable<Tuple<string, ObjectInfo>> infos, 
611
            IEnumerable<Tuple<FileSystemInfo, string>> files, 
612
            IEnumerable<FileState> states)
613
        {
614
            var dct = new Dictionary<string, StateTuple>();
615
            foreach (var file in files)
616
            {
617
                var fsInfo = file.Item1;
618
                var fileHash = file.Item2;
619
                dct[fsInfo.FullName] = new StateTuple {FileInfo = fsInfo, C = fileHash};
620
            }
621
            foreach (var state in states)
622
            {
623
                StateTuple hashTuple;
624
                if (dct.TryGetValue(state.FilePath, out hashTuple))
625
                {
626
                    hashTuple.FileState = state;
627
                }
628
                else
629
                {
630
                    var fsInfo = FileInfoExtensions.FromPath(state.FilePath);
631
                    dct[state.FilePath] = new StateTuple {FileInfo = fsInfo, FileState = state};
632
                }
633
            }
634
            foreach (var info in infos)
635
            {
636
                StateTuple hashTuple;
637
                var filePath = info.Item1;
638
                var objectInfo = info.Item2;
639
                if (dct.TryGetValue(filePath, out hashTuple))
640
                {
641
                    hashTuple.ObjectInfo = objectInfo;
642
                }
643
                else
644
                {
645
                    var fsInfo = FileInfoExtensions.FromPath(filePath);
646
                    dct[filePath] = new StateTuple {FileInfo = fsInfo, ObjectInfo = objectInfo};
647
                }
648
            }
649
            return dct.Values;
650
        }
651

  
353 652
        /// <summary>
354 653
        /// Returns the latest LastModified date from the list of objects, but only if it is before
355 654
        /// than the threshold value
......
388 687
            return threshold;
389 688
        }
390 689

  
391
        readonly AccountsDifferencer _differencer = new AccountsDifferencer();
690
        //readonly AccountsDifferencer _differencer = new AccountsDifferencer();
392 691
        private Dictionary<Uri, List<Uri>> _selectiveUris = new Dictionary<Uri, List<Uri>>();
393 692
        private bool _pause;
394 693

  
......
484 783
                                              TraceLevel.Info);
485 784
        }
486 785

  
786
        private void ReportConflictForMismatch(string localFilePath)
787
        {
788
            if (String.IsNullOrWhiteSpace(localFilePath))
789
                throw new ArgumentNullException("localFilePath");
790
            Contract.EndContractBlock();
791

  
792
            StatusKeeper.SetFileState(localFilePath, FileStatus.Conflict, FileOverlayStatus.Conflict, "File changed at the server");
793
            UpdateStatus(PithosStatus.HasConflicts);
794
            var message = String.Format("Conflict detected for file {0}", localFilePath);
795
            Log.Warn(message);
796
            StatusNotification.NotifyChange(message, TraceLevel.Warning);
797
        }
798

  
799

  
800

  
487 801
        /// <summary>
488 802
        /// Creates a Sync action for each changed server file
489 803
        /// </summary>
......
655 969
        {
656 970
            AccountInfo account;
657 971
            _accounts.TryRemove(accountInfo.AccountKey, out account);
972
/*
658 973
            SnapshotDifferencer differencer;
659 974
            _differencer.Differencers.TryRemove(accountInfo.AccountKey, out differencer);
975
*/
660 976
        }
661 977

  
662 978
        public void SetSelectivePaths(AccountInfo accountInfo,Uri[] added, Uri[] removed)

Also available in: Unified diff