Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / PollAgent.cs @ 2115e2a5

History | View | Annotate | Download (49.8 kB)

1
#region
2
/* -----------------------------------------------------------------------
3
 * <copyright file="PollAgent.cs" company="GRNet">
4
 * 
5
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
6
 *
7
 * Redistribution and use in source and binary forms, with or
8
 * without modification, are permitted provided that the following
9
 * conditions are met:
10
 *
11
 *   1. Redistributions of source code must retain the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer.
14
 *
15
 *   2. Redistributions in binary form must reproduce the above
16
 *      copyright notice, this list of conditions and the following
17
 *      disclaimer in the documentation and/or other materials
18
 *      provided with the distribution.
19
 *
20
 *
21
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
22
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
25
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
28
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32
 * POSSIBILITY OF SUCH DAMAGE.
33
 *
34
 * The views and conclusions contained in the software and
35
 * documentation are those of the authors and should not be
36
 * interpreted as representing official policies, either expressed
37
 * or implied, of GRNET S.A.
38
 * </copyright>
39
 * -----------------------------------------------------------------------
40
 */
41
#endregion
42

    
43
using System.Collections.Concurrent;
44
using System.ComponentModel.Composition;
45
using System.Diagnostics;
46
using System.Diagnostics.Contracts;
47
using System.IO;
48
using System.Reflection;
49
using System.Threading;
50
using System.Threading.Tasks;
51
using Pithos.Interfaces;
52
using Pithos.Network;
53
using log4net;
54

    
55
namespace Pithos.Core.Agents
56
{
57
    using System;
58
    using System.Collections.Generic;
59
    using System.Linq;
60

    
61
    /*public class PollRequest
62
    {
63
        public DateTime? Since { get; set; }
64
        public IEnumerable<string> Batch { get; set; }
65
    }*/
66

    
67

    
68
    /// <summary>
69
    /// PollAgent periodically polls the server to detect object changes. The agent retrieves a listing of all
70
    /// objects and compares it with a previously cached version to detect differences. 
71
    /// New files are downloaded, missing files are deleted from the local file system and common files are compared
72
    /// to determine the appropriate action
73
    /// </summary>
74
    [Export]
75
    public class PollAgent
76
    {
77
        private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
78

    
79
        [System.ComponentModel.Composition.Import]
80
        public IStatusKeeper StatusKeeper { get; set; }
81

    
82
        [System.ComponentModel.Composition.Import]
83
        public IPithosSettings Settings { get; set; }
84

    
85
        [System.ComponentModel.Composition.Import]
86
        public NetworkAgent NetworkAgent { get; set; }
87

    
88
        [System.ComponentModel.Composition.Import]
89
        public Selectives Selectives { get; set; }
90

    
91
        public IStatusNotification StatusNotification { get; set; }
92

    
93
        private CancellationTokenSource _currentOperationCancellation = new CancellationTokenSource();
94

    
95
        public void CancelCurrentOperation()
96
        {
97
            //What does it mean to cancel the current upload/download?
98
            //Obviously, the current operation will be cancelled by throwing
99
            //a cancellation exception.
100
            //
101
            //The default behavior is to retry any operations that throw.
102
            //Obviously this is not what we want in this situation.
103
            //The cancelled operation should NOT bea retried. 
104
            //
105
            //This can be done by catching the cancellation exception
106
            //and avoiding the retry.
107
            //
108

    
109
            //Have to reset the cancellation source - it is not possible to reset the source
110
            //Have to prevent a case where an operation requests a token from the old source
111
            var oldSource = Interlocked.Exchange(ref _currentOperationCancellation, new CancellationTokenSource());
112
            oldSource.Cancel();
113

    
114
        }
115

    
116
        public bool Pause
117
        {
118
            get {
119
                return _pause;
120
            }
121
            set {
122
                _pause = value;                
123
                if (!_pause)
124
                    _unPauseEvent.Set();
125
                else
126
                {
127
                    _unPauseEvent.Reset();
128
                }
129
            }
130
        }
131

    
132
        public CancellationToken CancellationToken
133
        {
134
            get { return _currentOperationCancellation.Token; }
135
        }
136

    
137
        private bool _firstPoll = true;
138

    
139
        //The Sync Event signals a manual synchronisation
140
        private readonly AsyncManualResetEvent _syncEvent = new AsyncManualResetEvent();
141

    
142
        private readonly AsyncManualResetEvent _unPauseEvent = new AsyncManualResetEvent(true);
143

    
144
        private readonly ConcurrentDictionary<string, DateTime> _lastSeen = new ConcurrentDictionary<string, DateTime>();
145
        private readonly ConcurrentDictionary<Uri, AccountInfo> _accounts = new ConcurrentDictionary<Uri,AccountInfo>();
146

    
147
        //private readonly ActionBlock<PollRequest>  _pollAction;
148

    
149
        readonly HashSet<string> _knownContainers = new HashSet<string>();
150

    
151
        
152
        /// <summary>
153
        /// Start a manual synchronization
154
        /// </summary>
155
        public void SynchNow(IEnumerable<string> paths=null)
156
        {
157
            _batchQueue.Enqueue(paths);
158
            _syncEvent.SetAsync();                
159

    
160
            //_pollAction.Post(new PollRequest {Batch = paths});
161
        }
162

    
163
        /// <summary>
164
        /// Start a manual synchronization
165
        /// </summary>
166
        public Task SynchNowAsync(IEnumerable<string> paths=null)
167
        {
168
            _batchQueue.Enqueue(paths);
169
            return _syncEvent.SetAsync();                
170

    
171
            //_pollAction.Post(new PollRequest {Batch = paths});
172
        }
173

    
174
        readonly ConcurrentQueue<IEnumerable<string>> _batchQueue=new ConcurrentQueue<IEnumerable<string>>();
175

    
176
        ConcurrentDictionary<string,MovedEventArgs> _moves=new ConcurrentDictionary<string, MovedEventArgs>(); 
177

    
178
        public void PostMove(MovedEventArgs args)
179
        {
180
            TaskEx.Run(() => _moves.AddOrUpdate(args.OldFullPath, args,(s,e)=>e));            
181
        }
182

    
183

    
184
        private bool _hasConnection;
185

    
186
        /// <summary>
187
        /// Remote files are polled periodically. Any changes are processed
188
        /// </summary>
189
        /// <param name="since"></param>
190
        /// <returns></returns>
191
        public  async Task PollRemoteFiles(DateTimeOffset? since = null)
192
        {
193
            if (Log.IsDebugEnabled)
194
                Log.DebugFormat("Polling changes after [{0}]",since);
195

    
196
            Debug.Assert(Thread.CurrentThread.IsBackground, "Polling Ended up in the main thread!");
197

    
198
            //GC.Collect();
199

    
200
            
201
            using (ThreadContext.Stacks["Retrieve Remote"].Push("All accounts"))
202
            {
203
                //If this poll fails, we will retry with the same since value
204
                DateTimeOffset? nextSince = since;
205
                try
206
                {
207
                    _unPauseEvent.Wait();
208
                    UpdateStatus(PithosStatus.PollSyncing);
209

    
210
                    if (!NetworkAgent.IsConnectedToInternet)
211
                    {
212
                        if (_hasConnection)
213
                        {
214
                            StatusNotification.Notify(new Notification
215
                            {
216
                                Level = TraceLevel.Error,
217
                                Title = "Internet Connection problem",
218
                                Message ="Internet connectivity was lost. Synchronization will continue when connectivity is restored"
219
                            });
220
                        }
221
                        _hasConnection = false;
222
                    }
223
                    else
224
                    {
225
                        if (!_hasConnection)
226
                        {
227
                            StatusNotification.Notify(new Notification
228
                            {
229
                                Level = TraceLevel.Info,
230
                                Title = "Internet Connection",
231
                                Message = "Internet connectivity restored."
232
                            });
233
                        }
234
                        _hasConnection = true;
235

    
236
                        var accountBatches = new Dictionary<Uri, IEnumerable<string>>();
237
                        IEnumerable<string> batch = null;
238
                        if (_batchQueue.TryDequeue(out batch) && batch != null)
239
                            foreach (var account in _accounts.Values)
240
                            {
241
                                var accountBatch = batch.Where(path => path.IsAtOrBelow(account.AccountPath));
242
                                accountBatches[account.AccountKey] = accountBatch;
243
                            }
244

    
245
                        var moves = Interlocked.Exchange(ref _moves, new ConcurrentDictionary<string, MovedEventArgs>());
246

    
247
                        var tasks = new List<Task<DateTimeOffset?>>();
248
                        foreach (var accountInfo in _accounts.Values)
249
                        {
250
                            IEnumerable<string> accountBatch;
251
                            accountBatches.TryGetValue(accountInfo.AccountKey, out accountBatch);
252
                            var t = ProcessAccountFiles(accountInfo, accountBatch, moves, since);
253
                            tasks.Add(t);
254
                        }
255

    
256
                        var taskList = tasks.ToList();
257
                        var nextTimes = await TaskEx.WhenAll(taskList).ConfigureAwait(false);
258

    
259
                        _firstPoll = false;
260
                        //Reschedule the poll with the current timestamp as a "since" value
261

    
262
                        if (nextTimes.Length > 0)
263
                            nextSince = nextTimes.Min();
264
                        if (Log.IsDebugEnabled)
265
                            Log.DebugFormat("Next Poll for changes since [{0}]", nextSince);
266
                    }
267
                }
268
                catch (Exception ex)
269
                {
270
                    Log.ErrorFormat("Error while processing accounts\r\n{0}", ex);
271
                    //In case of failure retry with the same "since" value
272
                }
273

    
274
                UpdateStatus(PithosStatus.PollComplete);
275
                //The multiple try blocks are required because we can't have an await call
276
                //inside a finally block
277
                //TODO: Find a more elegant solution for reschedulling in the event of an exception
278
                try
279
                {
280
                    //Wait for the polling interval to pass or the Sync event to be signalled
281
                    nextSince = await WaitForScheduledOrManualPoll(nextSince).ConfigureAwait(false);
282
                }
283
                finally
284
                {
285
                    //Ensure polling is scheduled even in case of error
286
#pragma warning disable 4014
287
                    TaskEx.Run(()=>PollRemoteFiles(nextSince));
288
#pragma warning restore 4014
289
                    //_pollAction.Post(new PollRequest {Since = nextSince});
290
                }
291
            }
292
        }
293

    
294
        /// <summary>
295
        /// Wait for the polling period to expire or a manual sync request
296
        /// </summary>
297
        /// <param name="since"></param>
298
        /// <returns></returns>
299
        private async Task<DateTimeOffset?> WaitForScheduledOrManualPoll(DateTimeOffset? since)
300
        {
301
            var sync = _syncEvent.WaitAsync();
302
            var delay = TimeSpan.FromSeconds(Settings.PollingInterval);
303
            if (Log.IsDebugEnabled)
304
                Log.DebugFormat("Next Poll at [{0}]", DateTime.Now.Add(delay));
305
            var wait = TaskEx.Delay(delay);
306

    
307
            var signaledTask = await TaskEx.WhenAny(sync, wait).ConfigureAwait(false);
308
            
309
            //Pausing takes precedence over manual sync or awaiting
310
            _unPauseEvent.Wait();
311
            
312
            //Wait for network processing to finish before polling
313
            var pauseTask=NetworkAgent.ProceedEvent.WaitAsync();
314
            await TaskEx.WhenAll(signaledTask, pauseTask).ConfigureAwait(false);
315

    
316
            //If polling is signalled by SynchNow, ignore the since tag
317
            if (sync.IsCompleted)
318
            {                
319
                _syncEvent.Reset();
320
                return null;
321
            }
322
            return since;
323
        }
324

    
325
        
326

    
327
        public async Task<DateTimeOffset?> ProcessAccountFiles(AccountInfo accountInfo, IEnumerable<string> accountBatch, ConcurrentDictionary<string, MovedEventArgs> moves, DateTimeOffset? since = null)
328
        {
329
            if (accountInfo == null)
330
                throw new ArgumentNullException("accountInfo");
331
            if (String.IsNullOrWhiteSpace(accountInfo.AccountPath))
332
                throw new ArgumentException("The AccountInfo.AccountPath is empty", "accountInfo");
333
            Contract.EndContractBlock();
334

    
335

    
336
            using (ThreadContext.Stacks["Retrieve Remote"].Push(accountInfo.UserName))
337
            {
338

    
339
                await NetworkAgent.GetDeleteAwaiter().ConfigureAwait(false);
340

    
341
                Log.Info("Scheduled");
342
                var client = new CloudFilesClient(accountInfo);
343

    
344
                //We don't need to check the trash container
345
                var allContainers=await client.ListContainers(accountInfo.UserName).ConfigureAwait(false);
346
                var containers = allContainers
347
                    .Where(c=>c.Name.ToString()!="trash")
348
                    .ToList();
349

    
350

    
351
                CreateContainerFolders(accountInfo, containers);
352

    
353
                //The nextSince time fallback time is the same as the current.
354
                //If polling succeeds, the next Since time will be the smallest of the maximum modification times
355
                //of the shared and account objects
356
                DateTimeOffset? nextSince = since;
357

    
358
                try
359
                {
360
                    //Wait for any deletions to finish
361
                    await NetworkAgent.GetDeleteAwaiter().ConfigureAwait(false);
362
                    //Get the poll time now. We may miss some deletions but it's better to keep a file that was deleted
363
                    //than delete a file that was created while we were executing the poll                    
364

    
365
                    var token = _currentOperationCancellation.Token;
366

    
367
                    //Get the list of server objects changed since the last check
368
                    //The name of the container is passed as state in order to create a dictionary of tasks in a subsequent step
369
                    var listObjects = (from container in containers
370
                                       select Task<IList<ObjectInfo>>.Factory.StartNew(_ =>
371
                                             client.ListObjects(accountInfo.UserName, container.Name, since), container.Name,token)).ToList();
372

    
373
                    var selectiveEnabled = Selectives.IsSelectiveEnabled(accountInfo.AccountKey);
374
                    var listShared = selectiveEnabled?
375
                                Task<IList<ObjectInfo>>.Factory.StartNew(_ => 
376
                                    client.ListSharedObjects(_knownContainers,since), "shared",token)
377
                                :Task.Factory.FromResult((IList<ObjectInfo>) new List<ObjectInfo>(),"shared");
378
                    
379
                    listObjects.Add(listShared);
380
                    var listTasks = await Task.Factory.WhenAll(listObjects.ToArray()).ConfigureAwait(false);
381

    
382
                    using (ThreadContext.Stacks["SCHEDULE"].Push("Process Results"))
383
                    {
384

    
385
                        //In case of cancellation, retry for the current date
386
                        if (token.IsCancellationRequested) return since;
387

    
388
                        var dict = listTasks.ToDictionary(t => t.AsyncState);
389

    
390
                        //Get all non-trash objects. Remember, the container name is stored in AsyncState
391
                        var remoteObjects = (from objectList in listTasks
392
                                            where objectList.AsyncState.ToString() != "trash"
393
                                            from obj in objectList.Result
394
                                            orderby obj.Bytes ascending 
395
                                            select obj).ToList();
396
                        
397
                        //Get the latest remote object modification date, only if it is after
398
                        //the original since date                        
399
                        nextSince = GetLatestDateAfter(nextSince, remoteObjects);
400

    
401
                        var sharedObjects = dict["shared"].Result;
402

    
403
                        //DON'T process trashed files
404
                        //If some files are deleted and added again to a folder, they will be deleted
405
                        //even though they are new.
406
                        //We would have to check file dates and hashes to ensure that a trashed file
407
                        //can be deleted safely from the local hard drive.
408
                        /*
409
                        //Items with the same name, hash may be both in the container and the trash
410
                        //Don't delete items that exist in the container
411
                        var realTrash = from trash in trashObjects
412
                                        where
413
                                            !remoteObjects.Any(
414
                                                info => info.Name == trash.Name && info.Hash == trash.Hash)
415
                                   8     select trash;
416
                        ProcessTrashedFiles(accountInfo, realTrash);
417
*/
418

    
419
                        var cleanRemotes = (from info in remoteObjects.Union(sharedObjects)
420
                                            let name = info.Name.ToUnescapedString()??""
421
                                            where !name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase) &&
422
                                                  !name.StartsWith(FolderConstants.CacheFolder + "/",
423
                                                                   StringComparison.InvariantCultureIgnoreCase)
424
                                            select info).ToList();
425

    
426
                        //In case of cancellation, retry for the current date
427
                        if (token.IsCancellationRequested) return since;
428

    
429
                        if (_firstPoll)
430
                            StatusKeeper.CleanupOrphanStates();
431
                        
432
                        var differencer = _differencer.PostSnapshot(accountInfo, cleanRemotes);
433
                        var currentRemotes = differencer.Current.ToList();
434

    
435
                        //In case of cancellation, retry for the current date
436
                        if (token.IsCancellationRequested) return since;
437
                        
438
                        StatusKeeper.CleanupStaleStates(accountInfo, currentRemotes);
439

    
440
                        //var filterUris = Selectives.SelectiveUris[accountInfo.AccountKey];
441

    
442
                        //May have to wait if the FileAgent has asked for a Pause, due to local changes
443
                        await _unPauseEvent.WaitAsync().ConfigureAwait(false);
444

    
445
                        //In case of cancellation, retry for the current date
446
                        if (token.IsCancellationRequested) return since;
447

    
448
                        //Get the local files here                        
449
                        var agent = AgentLocator<FileAgent>.Get(accountInfo.AccountPath);                                                
450
                        var files = LoadLocalFileTuples(accountInfo, accountBatch);
451

    
452

    
453
                                                
454
                        //WARNING: GetFileSystemInfo may create state entries.
455
                        //TODO: Find a different way to create the tuples and block long filenames
456
                        var infos = (from remote in currentRemotes
457
                                    let path = remote.RelativeUrlToFilePath(accountInfo.UserName)
458
                                    let info=agent.GetFileSystemInfo(path)
459
                                    where info != null
460
                                    select Tuple.Create(info.FullName,remote))
461
                                    .ToList();
462

    
463
                        var states = StatusKeeper.GetAllStates();
464

    
465
                        var tupleBuilder = new TupleBuilder(CancellationToken,StatusKeeper,StatusNotification,Settings);
466

    
467
                        var tuples = tupleBuilder.MergeSources(infos, files, states,moves).ToList();
468

    
469
                        var processedPaths = new HashSet<string>();
470
                        //Process only the changes in the batch file, if one exists
471
                        var stateTuples = accountBatch==null?tuples:tuples.Where(t => accountBatch.Contains(t.FilePath));
472
                        foreach (var tuple in stateTuples.Where(s=>!s.Locked))
473
                        {
474
                            await _unPauseEvent.WaitAsync().ConfigureAwait(false);
475

    
476
                            //In case of cancellation, retry for the current date
477
                            if (token.IsCancellationRequested) return since;
478

    
479
                            //Set the Merkle Hash
480
                            //SetMerkleHash(accountInfo, tuple);
481

    
482
                            await SyncSingleItem(accountInfo, tuple, agent, moves,processedPaths,token).ConfigureAwait(false);
483

    
484
                        }
485

    
486

    
487
                        //On the first run
488
/*
489
                        if (_firstPoll)
490
                        {
491
                            MarkSuspectedDeletes(accountInfo, cleanRemotes);
492
                        }
493
*/
494

    
495

    
496
                        Log.Info("[LISTENER] End Processing");
497
                    }
498
                }
499
                catch (Exception ex)
500
                {
501
                    Log.ErrorFormat("[FAIL] ListObjects for {0} in ProcessRemoteFiles with {1}", accountInfo.UserName, ex);
502
                    return nextSince;
503
                }
504

    
505
                Log.Info("[LISTENER] Finished");
506
                return nextSince;
507
            }
508
        }
509
/*
510

    
511
        private static void SetMerkleHash(AccountInfo accountInfo, StateTuple tuple)
512
        {
513
            //The Merkle hash for directories is that of an empty buffer
514
            if (tuple.FileInfo is DirectoryInfo)
515
                tuple.C = MERKLE_EMPTY;
516
            else if (tuple.FileState != null && tuple.MD5 == tuple.FileState.ETag)
517
            {
518
                //If there is a state whose MD5 matches, load the merkle hash from the file state
519
                //insteaf of calculating it
520
                tuple.C = tuple.FileState.Checksum;                              
521
            }
522
            else
523
            {
524
                tuple.Merkle = Signature.CalculateTreeHashAsync((FileInfo)tuple.FileInfo, accountInfo.BlockSize, accountInfo.BlockHash,1,progress);
525
                //tuple.C=tuple.Merkle.TopHash.ToHashString();                
526
            }
527
        }
528
*/
529

    
530
        private IEnumerable<FileSystemInfo> LoadLocalFileTuples(AccountInfo accountInfo,IEnumerable<string> batch )
531
        {
532
            using (ThreadContext.Stacks["Account Files Hashing"].Push(accountInfo.UserName))
533
            {
534
                var batchPaths = (batch==null)?new List<string>():batch.ToList();
535
                IEnumerable<FileSystemInfo> localInfos=AgentLocator<FileAgent>.Get(accountInfo.AccountPath)
536
                                                        .EnumerateFileSystemInfos();
537
                if (batchPaths.Count>0)
538
                    localInfos= localInfos.Where(fi => batchPaths.Contains(fi.FullName));
539

    
540
                return localInfos;
541
            }
542
        }
543

    
544
        /// <summary>
545
        /// Wait and Pause the agent while waiting
546
        /// </summary>
547
        /// <param name="backoff"></param>
548
        /// <returns></returns>
549
        private async Task PauseFor(int backoff)
550
        {
551

    
552
            Pause = true;
553
            await TaskEx.Delay(backoff).ConfigureAwait(false);
554
            Pause = false;
555
        }
556

    
557
        private async Task SyncSingleItem(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, ConcurrentDictionary<string, MovedEventArgs> moves,HashSet<string> processedPaths, CancellationToken token)
558
        {
559
            Log.DebugFormat("Sync [{0}] C:[{1}] L:[{2}] S:[{3}]", tuple.FilePath, tuple.C, tuple.L, tuple.S);
560

    
561
            //If the processed paths already contain the current path, exit
562
            if (!processedPaths.Add(tuple.FilePath))
563
                return;
564

    
565
            try
566
            {
567
                bool isInferredParent = tuple.ObjectInfo != null && tuple.ObjectInfo.UUID.StartsWith("00000000-0000-0000");
568

    
569
                var localFilePath = tuple.FilePath;
570
                //Don't use the tuple info, it may have been deleted
571
                var localInfo = FileInfoExtensions.FromPath(localFilePath);
572

    
573

    
574
                var isUnselectedRootFolder = agent.IsUnselectedRootFolder(tuple.FilePath);
575

    
576
                //Unselected root folders that have not yet been uploaded should be uploaded and added to the 
577
                //selective folders
578

    
579
                if (!Selectives.IsSelected(accountInfo, localFilePath) &&
580
                    !(isUnselectedRootFolder && tuple.ObjectInfo == null))
581
                    return;
582

    
583
                // Local file unchanged? If both C and L are null, make sure it's because 
584
                //both the file is missing and the state checksum is not missing
585
                if (tuple.C == tuple.L /*&& (localInfo.Exists || tuple.FileState == null)*/)
586
                {
587
                    //No local changes
588
                    //Server unchanged?
589
                    if (tuple.S == tuple.L)
590
                    {
591
                        // No server changes
592
                        //Has the file been renamed locally?
593
                        if (!await MoveForLocalMove(accountInfo,tuple))
594
                            //Has the file been renamed on the server?
595
                            MoveForServerMove(accountInfo, tuple);
596
                    }
597
                    else
598
                    {
599
                        //Different from server
600
                        //Does the server file exist?
601
                        if (tuple.S == null)
602
                        {
603
                            //Server file doesn't exist
604
                            //deleteObjectFromLocal()
605
                            using (
606
                                StatusNotification.GetNotifier("Deleting local {0}", "Deleted local {0}",true,
607
                                                               localInfo.Name))
608
                            {
609
                                DeleteLocalFile(agent, localFilePath);
610
                            }
611
                        }
612
                        else
613
                        {
614
                            //Server file exists
615
                            //downloadServerObject() // Result: L = S
616
                            //If the file has moved on the server, move it locally before downloading
617
                            using (
618
                                StatusNotification.GetNotifier("Downloading {0}", "Downloaded {0}",true,
619
                                                               localInfo.Name))
620
                            {
621
                                var targetPath = MoveForServerMove(accountInfo, tuple);
622
                                if (targetPath != null)
623
                                {
624

    
625
                                    await DownloadCloudFile(accountInfo, tuple, token, targetPath).ConfigureAwait(false);
626

    
627
                                    AddOwnFolderToSelectives(accountInfo, tuple, targetPath);
628
                                }
629
                            }
630
                        }
631
                    }
632
                }
633
                else
634
                {                   
635

    
636
                    //Local changes found
637

    
638
                    //Server unchanged?
639
                    if (tuple.S == tuple.L)
640
                    {
641
                        //The FileAgent selective sync checks for new root folder files
642
                        if (!agent.Ignore(localFilePath))
643
                        {
644
                            if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null)
645
                            {
646
                                //deleteObjectFromServer()
647
                                await DeleteCloudFile(accountInfo, tuple);
648
                                //updateRecord( Remove L, S)                  
649
                            }
650
                            else
651
                            {
652
                                //uploadLocalObject() // Result: S = C, L = S                        
653
                                var progress = new Progress<HashProgress>(d =>
654
                                    StatusNotification.Notify(new StatusNotification(String.Format("Merkle Hashing for Upload {0:p} of {1}", d.Percentage, localInfo.Name))));
655

    
656
                                //Is it an unselected root folder
657
                                var isCreation = isUnselectedRootFolder ||//or a new folder under a selected parent?
658
                                        (localInfo is DirectoryInfo && Selectives.IsSelected(accountInfo, localInfo) && tuple.FileState == null && tuple.ObjectInfo == null);
659

    
660

    
661
                                //Is this a result of a FILE move with no modifications? Then try to move it,
662
                                //to avoid an expensive hash
663
                                if (!await MoveForLocalMove(accountInfo, tuple))
664
                                {
665
                                    await UploadLocalFile(accountInfo, tuple, token, isCreation, localInfo,processedPaths, progress).ConfigureAwait(false);
666
                                }
667

    
668
                                //updateRecord( S = C )
669
                                //State updated by the uploader
670
                                
671
                                if (isCreation )
672
                                {                                    
673
                                    ProcessChildren(accountInfo, tuple, agent, moves,processedPaths,token);
674
                                }
675
                            }
676
                        }
677
                    }
678
                    else
679
                    {
680
                        if (tuple.C == tuple.S)
681
                        {
682
                            // (Identical Changes) Result: L = S
683
                            //doNothing()
684
                            
685
                            //Don't update anything for nonexistend server files
686
                            if (tuple.S != null)
687
                            {
688
                                //Detect server moves
689
                                var targetPath = MoveForServerMove(accountInfo, tuple);
690
                                if (targetPath != null)
691
                                {
692
                                    Debug.Assert(tuple.Merkle != null);
693
                                    StatusKeeper.StoreInfo(targetPath, tuple.ObjectInfo, tuple.Merkle);
694

    
695
                                    AddOwnFolderToSelectives(accountInfo, tuple, targetPath);
696
                                }
697
                            }
698
                            else
699
                            {
700
                                //At this point, C==S==NULL and we have a stale state (L)
701
                                //Log the stale tuple for investigation
702
                                Log.WarnFormat("Stale tuple detected FilePathPath:[{0}], State:[{1}], LocalFile:[{2}]", tuple.FilePath, tuple.FileState, tuple.FileInfo);
703

    
704
                                //And remove it
705
                                if (!String.IsNullOrWhiteSpace(tuple.FilePath))
706
                                    StatusKeeper.ClearFileStatus(tuple.FilePath);
707
                            }
708
                        }
709
                        else
710
                        {
711
                            if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null)
712
                            {
713
                                //deleteObjectFromServer()
714
                                await DeleteCloudFile(accountInfo, tuple);
715
                                //updateRecord(Remove L, S)                  
716
                            }
717
                                //If both the local and server files are missing, the state is stale
718
                            else if (!localInfo.Exists && (tuple.S == null || tuple.ObjectInfo == null))
719
                            {
720
                                StatusKeeper.ClearFileStatus(localInfo.FullName);
721
                            }
722
                            else
723
                            {
724
                                ReportConflictForMismatch(localFilePath);
725
                                //identifyAsConflict() // Manual action required
726
                            }
727
                        }
728
                    }
729
                }
730
            }
731
            catch (Exception exc)
732
            {
733
                //In case of error log and retry with the next poll
734
                Log.ErrorFormat("[SYNC] Failed for file {0}. Will Retry.\r\n{1}",tuple.FilePath,exc);
735
            }
736
        }
737

    
738
        private void DeleteLocalFile(FileAgent agent, string localFilePath)
739
        {
740
            StatusKeeper.SetFileState(localFilePath, FileStatus.Deleted,
741
                                      FileOverlayStatus.Deleted, "");
742
            using (NetworkGate.Acquire(localFilePath, NetworkOperation.Deleting))
743
            {
744
                agent.Delete(localFilePath);
745
            }
746
            //updateRecord(Remove C, L)
747
            StatusKeeper.ClearFileStatus(localFilePath);
748
        }
749

    
750
        private async Task DownloadCloudFile(AccountInfo accountInfo, StateTuple tuple, CancellationToken token, string targetPath)
751
        {                        
752
            //Don't create a new state for non-existent files
753
            if (File.Exists(targetPath) || Directory.Exists(targetPath))
754
                StatusKeeper.SetFileState(targetPath, FileStatus.Modified, FileOverlayStatus.Modified,"");
755

    
756
            var finalHash=await
757
                NetworkAgent.Downloader.DownloadCloudFile(accountInfo, tuple.ObjectInfo, targetPath,
758
                                                          token)
759
                    .ConfigureAwait(false);
760
            //updateRecord( L = S )
761
            StatusKeeper.UpdateFileChecksum(targetPath, tuple.ObjectInfo.ETag,
762
                                            finalHash);
763

    
764
            StatusKeeper.StoreInfo(targetPath, tuple.ObjectInfo,finalHash);
765
        }
766

    
767
        private async Task UploadLocalFile(AccountInfo accountInfo, StateTuple tuple, CancellationToken token,
768
                                     bool isUnselectedRootFolder, FileSystemInfo localInfo, HashSet<string> processedPaths, IProgress<HashProgress> progress)
769
        {
770
            var action = new CloudUploadAction(accountInfo, localInfo, tuple.FileState,
771
                                               accountInfo.BlockSize, accountInfo.BlockHash,
772
                                               "Poll", isUnselectedRootFolder, token, progress,tuple.Merkle);            
773

    
774
            using (StatusNotification.GetNotifier("Uploading {0}", "Uploaded {0}",true,
775
                                                  localInfo.Name))
776
            {
777
                await NetworkAgent.Uploader.UploadCloudFile(action, token).ConfigureAwait(false);
778
            }
779

    
780
            if (isUnselectedRootFolder)
781
            {
782
                var dirActions =(
783
                    from dir in ((DirectoryInfo) localInfo).EnumerateDirectories("*", SearchOption.AllDirectories)
784
                    let subAction = new CloudUploadAction(accountInfo, dir, null,
785
                                                          accountInfo.BlockSize, accountInfo.BlockHash,
786
                                                          "Poll", true, token, progress)
787
                    select subAction).ToList();
788
                foreach (var dirAction in dirActions)
789
                {
790
                    processedPaths.Add(dirAction.LocalFile.FullName);
791
                }
792
                
793
                await TaskEx.WhenAll(dirActions.Select(a=>NetworkAgent.Uploader.UploadCloudFile(a,token)).ToArray());
794
            }
795
        }
796

    
797
        private async Task<bool> MoveForLocalMove(AccountInfo accountInfo, StateTuple tuple)
798
        {
799
            //Is the previous path missing?
800
            if (String.IsNullOrWhiteSpace(tuple.OldFullPath))
801
                return false;
802
            //Has the file locally, in which case it should be uploaded rather than moved?
803
            if (tuple.OldChecksum != tuple.Merkle.TopHash.ToHashString())
804
                return false;
805

    
806
            var relativePath = tuple.ObjectInfo.RelativeUrlToFilePath(accountInfo.UserName);
807
            var serverPath = Path.Combine(accountInfo.AccountPath, relativePath);
808
            //Has the file been renamed on the server?
809
            if (!tuple.OldFullPath.Equals(serverPath))
810
            {
811
                ReportConflictForDoubleRename(tuple.FilePath);
812
                return false;
813
            }
814

    
815
            try
816
            {
817

    
818
                var client = new CloudFilesClient(accountInfo);
819
                var objectInfo = CloudAction.CreateObjectInfoFor(accountInfo, tuple.FileInfo);
820
                objectInfo.X_Object_Hash = tuple.Merkle.TopHash.ToHashString();
821
                var containerPath = Path.Combine(accountInfo.AccountPath, objectInfo.Container.ToUnescapedString());
822
                //TODO: SImplify these multiple conversions from and to Uris
823
                var oldName = tuple.OldFullPath.AsRelativeTo(containerPath);
824
                //Then execute a move instead of an upload
825
                using (StatusNotification.GetNotifier("Moving {0}", "Moved {0}", true,tuple.FileInfo.Name))
826
                {
827
                    await client.MoveObject(objectInfo.Account, objectInfo.Container, oldName.Replace('\\','/').ToEscapedUri(),
828
                                                          objectInfo.Container, objectInfo.Name).ConfigureAwait(false);
829
                    StatusKeeper.MoveFileState(tuple.OldFullPath, tuple.FilePath, objectInfo, tuple.Merkle);
830
                    //StatusKeeper.StoreInfo(tuple.FilePath,objectInfo,tuple.Merkle);
831
                    //StatusKeeper.ClearFolderStatus(tuple.FilePath);
832
                }
833
                return true;
834
            }
835
            catch (Exception exc)
836
            {
837
                Log.ErrorFormat("[MOVE] Failed for [{0}],:\r\n{1}", tuple.FilePath, exc);
838
                //Return false to force an upload of the file
839
                return false;
840
            }
841

    
842
        }
843

    
844
        private void AddOwnFolderToSelectives(AccountInfo accountInfo, StateTuple tuple, string targetPath)
845
        {
846
            //Not for shared folders
847
            if (tuple.ObjectInfo.IsShared==true)
848
                return;
849
            //Also ensure that any newly created folders are added to the selectives, if the original folder was selected
850
            var containerPath = Path.Combine(accountInfo.AccountPath, tuple.ObjectInfo.Container.ToUnescapedString());
851

    
852
            //If this is a root folder encountered for the first time
853
            if (tuple.L == null && Directory.Exists(tuple.FileInfo.FullName) 
854
                && (tuple.FileInfo.FullName.IsAtOrBelow(containerPath)))
855
            {
856
                
857
                var relativePath = tuple.ObjectInfo.RelativeUrlToFilePath(accountInfo.UserName);
858
                var initialPath = Path.Combine(accountInfo.AccountPath, relativePath);
859

    
860
                //var hasMoved = true;// !initialPath.Equals(targetPath);
861
                //If the new path is under a selected folder, add it to the selectives as well
862
                if (Selectives.IsSelected(accountInfo, initialPath))
863
                {
864
                    Selectives.AddUri(accountInfo, tuple.ObjectInfo.Uri);
865
                    Selectives.Save(accountInfo);
866
                }
867
            }
868
        }
869

    
870
        private string MoveForServerMove(AccountInfo accountInfo, StateTuple tuple)
871
        {
872
            if (tuple.ObjectInfo == null)
873
                return null;
874
            var relativePath = tuple.ObjectInfo.RelativeUrlToFilePath(accountInfo.UserName);
875
            var serverPath = Path.Combine(accountInfo.AccountPath, relativePath);
876
            
877
            //Compare Case Insensitive
878
            if (String.Equals(tuple.FilePath ,serverPath,StringComparison.InvariantCultureIgnoreCase)) 
879
                return serverPath;
880

    
881
            //Has the file been renamed locally?
882
            if (!String.IsNullOrWhiteSpace(tuple.OldFullPath) &&  !tuple.OldFullPath.Equals(tuple.FilePath))
883
            {
884
                ReportConflictForDoubleRename(tuple.FilePath);
885
                return null;
886
            }
887

    
888
            tuple.FileInfo.Refresh();
889
            //The file/folder may not exist if it was moved because its parent moved
890
            if (!tuple.FileInfo.Exists)
891
            {
892
                var target=FileInfoExtensions.FromPath(serverPath);
893
                if (!target.Exists)
894
                {
895
                    Log.ErrorFormat("No source or target found while trying to move {0} to {1}", tuple.FileInfo.FullName, serverPath);
896
                }
897
                return serverPath;
898
            }
899

    
900
            using (StatusNotification.GetNotifier("Moving local {0}", "Moved local {0}", true,Path.GetFileName(tuple.FilePath)))
901
            using(NetworkGate.Acquire(tuple.FilePath,NetworkOperation.Renaming))
902
            {
903
                    
904
                var fi = tuple.FileInfo as FileInfo;
905
                if (fi != null)
906
                {
907
                    var targetFile = new FileInfo(serverPath);
908
                    if (!targetFile.Directory.Exists)
909
                        targetFile.Directory.Create();
910
                    fi.MoveTo(serverPath);
911
                }
912
                var di = tuple.FileInfo as DirectoryInfo;
913
                if (di != null)
914
                {
915
                    var targetDir = new DirectoryInfo(serverPath);
916
                    if (!targetDir.Parent.Exists)
917
                        targetDir.Parent.Create();
918
                    di.MoveTo(serverPath);
919
                }
920
            }
921

    
922
            StatusKeeper.StoreInfo(serverPath, tuple.ObjectInfo);
923

    
924
            return serverPath;
925
        }
926

    
927
        private async Task DeleteCloudFile(AccountInfo accountInfo, StateTuple tuple)
928
        {
929
            using (StatusNotification.GetNotifier("Deleting server {0}", "Deleted server {0}", true,Path.GetFileName(tuple.FilePath)))
930
            {
931

    
932
                StatusKeeper.SetFileState(tuple.FilePath, FileStatus.Deleted,
933
                                          FileOverlayStatus.Deleted, "");
934
                await NetworkAgent.DeleteAgent.DeleteCloudFile(accountInfo, tuple.ObjectInfo);
935
                StatusKeeper.ClearFileStatus(tuple.FilePath);
936
            }
937
        }
938

    
939
        private void ProcessChildren(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, ConcurrentDictionary<string, MovedEventArgs> moves,HashSet<string> processedPaths,CancellationToken token)
940
        {
941

    
942
            var dirInfo = tuple.FileInfo as DirectoryInfo;
943
            var folderTuples = from folder in dirInfo.EnumerateDirectories("*", SearchOption.AllDirectories)
944
                               select new StateTuple(folder){C=Signature.MERKLE_EMPTY};
945
            
946
            var fileTuples = from file in dirInfo.EnumerateFiles("*", SearchOption.AllDirectories)
947
                             let state=StatusKeeper.GetStateByFilePath(file.FullName)
948
                             select new StateTuple(file){
949
                                            Merkle=StatusAgent.CalculateTreeHash(file,accountInfo,state,
950
                                            Settings.HashingParallelism,token,null)
951
                                        };
952
            
953
            //Process folders first, to ensure folders appear on the sever as soon as possible
954
            folderTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, moves, processedPaths,token).Wait());
955
            
956
            fileTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, moves,processedPaths, token).Wait());
957
        }
958

    
959

    
960

    
961

    
962
        
963

    
964

    
965
        /// <summary>
966
        /// Returns the latest LastModified date from the list of objects, but only if it is before
967
        /// than the threshold value
968
        /// </summary>
969
        /// <param name="threshold"></param>
970
        /// <param name="cloudObjects"></param>
971
        /// <returns></returns>
972
        private static DateTimeOffset? GetLatestDateBefore(DateTime? threshold, IList<ObjectInfo> cloudObjects)
973
        {
974
            DateTimeOffset? maxDate = null;
975
            if (cloudObjects!=null &&  cloudObjects.Count > 0)
976
                maxDate = cloudObjects.Max(obj => obj.Last_Modified);
977
            if (!maxDate.HasValue)
978
                return threshold;
979
            if (!threshold.HasValue|| threshold > maxDate)
980
                return maxDate;
981
            return threshold;
982
        }
983

    
984
        /// <summary>
985
        /// Returns the latest LastModified date from the list of objects, but only if it is after
986
        /// the threshold value
987
        /// </summary>
988
        /// <param name="threshold"></param>
989
        /// <param name="cloudObjects"></param>
990
        /// <returns></returns>
991
        private static DateTimeOffset? GetLatestDateAfter(DateTimeOffset? threshold, IList<ObjectInfo> cloudObjects)
992
        {
993
            DateTimeOffset? maxDate = null;
994
            if (cloudObjects!=null &&  cloudObjects.Count > 0)
995
                maxDate = cloudObjects.Max(obj => obj.Last_Modified);
996
            if (!maxDate.HasValue)
997
                return threshold;
998
            if (!threshold.HasValue|| threshold < maxDate)
999
                return maxDate;
1000
            return threshold;
1001
        }
1002

    
1003
        readonly AccountsDifferencer _differencer = new AccountsDifferencer();
1004
        private bool _pause;
1005
        
1006

    
1007

    
1008

    
1009
        private void ReportConflictForMismatch(string localFilePath)
1010
        {
1011
            if (String.IsNullOrWhiteSpace(localFilePath))
1012
                throw new ArgumentNullException("localFilePath");
1013
            Contract.EndContractBlock();
1014

    
1015
            StatusKeeper.SetFileState(localFilePath, FileStatus.Conflict, FileOverlayStatus.Conflict, "File changed at the server");
1016
            UpdateStatus(PithosStatus.HasConflicts);
1017
            var message = String.Format("Conflict detected for file {0}", localFilePath);
1018
            Log.Warn(message);
1019
            StatusNotification.NotifyChange(message, TraceLevel.Warning);
1020
        }
1021

    
1022
        private void ReportConflictForDoubleRename(string localFilePath)
1023
        {
1024
            if (String.IsNullOrWhiteSpace(localFilePath))
1025
                throw new ArgumentNullException("localFilePath");
1026
            Contract.EndContractBlock();
1027

    
1028
            StatusKeeper.SetFileState(localFilePath, FileStatus.Conflict, FileOverlayStatus.Conflict, "File renamed both locally and on the server");
1029
            UpdateStatus(PithosStatus.HasConflicts);
1030
            var message = String.Format("Double rename conflict detected for file {0}", localFilePath);
1031
            Log.Warn(message);
1032
            StatusNotification.NotifyChange(message, TraceLevel.Warning);
1033
        }
1034

    
1035

    
1036
        /// <summary>
1037
        /// Notify the UI to update the visual status
1038
        /// </summary>
1039
        /// <param name="status"></param>
1040
        private void UpdateStatus(PithosStatus status)
1041
        {
1042
            try
1043
            {
1044
                StatusNotification.SetPithosStatus(status);
1045
                //StatusNotification.Notify(new Notification());
1046
            }
1047
            catch (Exception exc)
1048
            {
1049
                //Failure is not critical, just log it
1050
                Log.Warn("Error while updating status", exc);
1051
            }
1052
        }
1053

    
1054
        private static void CreateContainerFolders(AccountInfo accountInfo, IEnumerable<ContainerInfo> containers)
1055
        {
1056
            var containerPaths = from container in containers
1057
                                 let containerPath = Path.Combine(accountInfo.AccountPath, container.Name.ToUnescapedString())
1058
                                 where container.Name.ToString() != FolderConstants.TrashContainer && !Directory.Exists(containerPath)
1059
                                 select containerPath;
1060

    
1061
            foreach (var path in containerPaths)
1062
            {
1063
                Directory.CreateDirectory(path);
1064
            }
1065
        }
1066

    
1067
        public void AddAccount(AccountInfo accountInfo)
1068
        {
1069
            //Avoid adding a duplicate accountInfo
1070
            _accounts.TryAdd(accountInfo.AccountKey, accountInfo);
1071
        }
1072

    
1073
        public void RemoveAccount(AccountInfo accountInfo)
1074
        {
1075
            if (accountInfo == null)
1076
                return;
1077

    
1078
            AccountInfo account;
1079
            _accounts.TryRemove(accountInfo.AccountKey, out account);
1080

    
1081
            SnapshotDifferencer differencer;
1082
            _differencer.Differencers.TryRemove(accountInfo.AccountKey, out differencer);
1083
        }
1084
       
1085
    }
1086
}