Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / PollAgent.cs @ f4f39ccb

History | View | Annotate | Download (44.9 kB)

1
#region
2
/* -----------------------------------------------------------------------
3
 * <copyright file="PollAgent.cs" company="GRNet">
4
 * 
5
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
6
 *
7
 * Redistribution and use in source and binary forms, with or
8
 * without modification, are permitted provided that the following
9
 * conditions are met:
10
 *
11
 *   1. Redistributions of source code must retain the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer.
14
 *
15
 *   2. Redistributions in binary form must reproduce the above
16
 *      copyright notice, this list of conditions and the following
17
 *      disclaimer in the documentation and/or other materials
18
 *      provided with the distribution.
19
 *
20
 *
21
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
22
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
25
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
28
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32
 * POSSIBILITY OF SUCH DAMAGE.
33
 *
34
 * The views and conclusions contained in the software and
35
 * documentation are those of the authors and should not be
36
 * interpreted as representing official policies, either expressed
37
 * or implied, of GRNET S.A.
38
 * </copyright>
39
 * -----------------------------------------------------------------------
40
 */
41
#endregion
42

    
43
using System.Collections.Concurrent;
44
using System.ComponentModel.Composition;
45
using System.Diagnostics;
46
using System.Diagnostics.Contracts;
47
using System.IO;
48
using System.Linq.Expressions;
49
using System.Reflection;
50
using System.Security.Cryptography;
51
using System.Threading;
52
using System.Threading.Tasks;
53
using System.Threading.Tasks.Dataflow;
54
using Castle.ActiveRecord;
55
using Pithos.Interfaces;
56
using Pithos.Network;
57
using log4net;
58

    
59
namespace Pithos.Core.Agents
60
{
61
    using System;
62
    using System.Collections.Generic;
63
    using System.Linq;
64

    
65
    /*public class PollRequest
66
    {
67
        public DateTime? Since { get; set; }
68
        public IEnumerable<string> Batch { get; set; }
69
    }*/
70

    
71

    
72
    /// <summary>
73
    /// PollAgent periodically polls the server to detect object changes. The agent retrieves a listing of all
74
    /// objects and compares it with a previously cached version to detect differences. 
75
    /// New files are downloaded, missing files are deleted from the local file system and common files are compared
76
    /// to determine the appropriate action
77
    /// </summary>
78
    [Export]
79
    public class PollAgent
80
    {
81
        private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
82

    
83
        [System.ComponentModel.Composition.Import]
84
        public IStatusKeeper StatusKeeper { get; set; }
85

    
86
        [System.ComponentModel.Composition.Import]
87
        public IPithosSettings Settings { get; set; }
88

    
89
        [System.ComponentModel.Composition.Import]
90
        public NetworkAgent NetworkAgent { get; set; }
91

    
92
        [System.ComponentModel.Composition.Import]
93
        public Selectives Selectives { get; set; }
94

    
95
        public IStatusNotification StatusNotification { get; set; }
96

    
97
        private CancellationTokenSource _currentOperationCancellation = new CancellationTokenSource();
98

    
99
        public void CancelCurrentOperation()
100
        {
101
            //What does it mean to cancel the current upload/download?
102
            //Obviously, the current operation will be cancelled by throwing
103
            //a cancellation exception.
104
            //
105
            //The default behavior is to retry any operations that throw.
106
            //Obviously this is not what we want in this situation.
107
            //The cancelled operation should NOT bea retried. 
108
            //
109
            //This can be done by catching the cancellation exception
110
            //and avoiding the retry.
111
            //
112

    
113
            //Have to reset the cancellation source - it is not possible to reset the source
114
            //Have to prevent a case where an operation requests a token from the old source
115
            var oldSource = Interlocked.Exchange(ref _currentOperationCancellation, new CancellationTokenSource());
116
            oldSource.Cancel();
117

    
118
        }
119

    
120
        public bool Pause
121
        {
122
            get {
123
                return _pause;
124
            }
125
            set {
126
                _pause = value;                
127
                if (!_pause)
128
                    _unPauseEvent.Set();
129
                else
130
                {
131
                    _unPauseEvent.Reset();
132
                }
133
            }
134
        }
135

    
136
        private bool _firstPoll = true;
137

    
138
        //The Sync Event signals a manual synchronisation
139
        private readonly AsyncManualResetEvent _syncEvent = new AsyncManualResetEvent();
140

    
141
        private readonly AsyncManualResetEvent _unPauseEvent = new AsyncManualResetEvent(true);
142

    
143
        private readonly ConcurrentDictionary<string, DateTime> _lastSeen = new ConcurrentDictionary<string, DateTime>();
144
        private readonly ConcurrentDictionary<Uri, AccountInfo> _accounts = new ConcurrentDictionary<Uri,AccountInfo>();
145

    
146
        //private readonly ActionBlock<PollRequest>  _pollAction;
147

    
148
        readonly HashSet<string> _knownContainers = new HashSet<string>();
149

    
150
        
151
        /// <summary>
152
        /// Start a manual synchronization
153
        /// </summary>
154
        public void SynchNow(IEnumerable<string> paths=null)
155
        {
156
            _batchQueue.Enqueue(paths);
157
            _syncEvent.Set();                
158

    
159
            //_pollAction.Post(new PollRequest {Batch = paths});
160
        }
161

    
162
        readonly ConcurrentQueue<IEnumerable<string>> _batchQueue=new ConcurrentQueue<IEnumerable<string>>();
163

    
164
        /// <summary>
165
        /// Remote files are polled periodically. Any changes are processed
166
        /// </summary>
167
        /// <param name="since"></param>
168
        /// <returns></returns>
169
        public  void PollRemoteFiles(DateTime? since = null)
170
        {
171
            if (Log.IsDebugEnabled)
172
                Log.DebugFormat("Polling changes after [{0}]",since);
173

    
174
            Debug.Assert(Thread.CurrentThread.IsBackground, "Polling Ended up in the main thread!");
175

    
176
            //GC.Collect();
177

    
178
            using (ThreadContext.Stacks["Retrieve Remote"].Push("All accounts"))
179
            {
180
                //If this poll fails, we will retry with the same since value
181
                var nextSince = since;
182
                try
183
                {
184
                    _unPauseEvent.Wait();
185
                    UpdateStatus(PithosStatus.PollSyncing);
186

    
187
                    var accountBatches=new Dictionary<Uri, IEnumerable<string>>();
188
                    IEnumerable<string> batch = null;
189
                    if (_batchQueue.TryDequeue(out batch) && batch != null)
190
                        foreach (var account in _accounts.Values)
191
                        {
192
                            var accountBatch = batch.Where(path => path.IsAtOrBelow(account.AccountPath));
193
                            accountBatches[account.AccountKey] = accountBatch;
194
                        }
195

    
196
                    var tasks = new List<Task<DateTime?>>();
197
                    foreach(var accountInfo in _accounts.Values)
198
                    {
199
                        IEnumerable<string> accountBatch ;
200
                        accountBatches.TryGetValue(accountInfo.AccountKey,out accountBatch);
201
                        var t=ProcessAccountFiles (accountInfo, accountBatch, since);
202
                        tasks.Add(t);
203
                    }
204

    
205
                    var nextTimes=TaskEx.WhenAll(tasks.ToList()).Result;
206

    
207
                    _firstPoll = false;
208
                    //Reschedule the poll with the current timestamp as a "since" value
209

    
210
                    if (nextTimes.Length>0)
211
                        nextSince = nextTimes.Min();
212
                    if (Log.IsDebugEnabled)
213
                        Log.DebugFormat("Next Poll at [{0}]",nextSince);
214
                }
215
                catch (Exception ex)
216
                {
217
                    Log.ErrorFormat("Error while processing accounts\r\n{0}", ex);
218
                    //In case of failure retry with the same "since" value
219
                }
220

    
221
                UpdateStatus(PithosStatus.PollComplete);
222
                //The multiple try blocks are required because we can't have an await call
223
                //inside a finally block
224
                //TODO: Find a more elegant solution for reschedulling in the event of an exception
225
                try
226
                {
227
                    //Wait for the polling interval to pass or the Sync event to be signalled
228
                    nextSince = WaitForScheduledOrManualPoll(nextSince).Result;
229
                }
230
                finally
231
                {
232
                    //Ensure polling is scheduled even in case of error
233
                    TaskEx.Run(()=>PollRemoteFiles(nextSince));
234
                    //_pollAction.Post(new PollRequest {Since = nextSince});
235
                }
236
            }
237
        }
238

    
239
        /// <summary>
240
        /// Wait for the polling period to expire or a manual sync request
241
        /// </summary>
242
        /// <param name="since"></param>
243
        /// <returns></returns>
244
        private async Task<DateTime?> WaitForScheduledOrManualPoll(DateTime? since)
245
        {
246
            var sync = _syncEvent.WaitAsync();
247
            var wait = TaskEx.Delay(TimeSpan.FromSeconds(Settings.PollingInterval));
248

    
249
            var signaledTask = await TaskEx.WhenAny(sync, wait).ConfigureAwait(false);
250
            
251
            //Pausing takes precedence over manual sync or awaiting
252
            _unPauseEvent.Wait();
253
            
254
            //Wait for network processing to finish before polling
255
            var pauseTask=NetworkAgent.ProceedEvent.WaitAsync();
256
            await TaskEx.WhenAll(signaledTask, pauseTask).ConfigureAwait(false);
257

    
258
            //If polling is signalled by SynchNow, ignore the since tag
259
            if (sync.IsCompleted)
260
            {                
261
                _syncEvent.Reset();
262
                return null;
263
            }
264
            return since;
265
        }
266

    
267
        
268

    
269
        public async Task<DateTime?> ProcessAccountFiles(AccountInfo accountInfo, IEnumerable<string> accountBatch, DateTime? since = null)
270
        {
271
            if (accountInfo == null)
272
                throw new ArgumentNullException("accountInfo");
273
            if (String.IsNullOrWhiteSpace(accountInfo.AccountPath))
274
                throw new ArgumentException("The AccountInfo.AccountPath is empty", "accountInfo");
275
            Contract.EndContractBlock();
276

    
277

    
278
            using (ThreadContext.Stacks["Retrieve Remote"].Push(accountInfo.UserName))
279
            {
280

    
281
                await NetworkAgent.GetDeleteAwaiter().ConfigureAwait(false);
282

    
283
                Log.Info("Scheduled");
284
                var client = new CloudFilesClient(accountInfo);
285

    
286
                //We don't need to check the trash container
287
                var containers = client.ListContainers(accountInfo.UserName)
288
                    .Where(c=>c.Name!="trash")
289
                    .ToList();
290

    
291

    
292
                CreateContainerFolders(accountInfo, containers);
293

    
294
                //The nextSince time fallback time is the same as the current.
295
                //If polling succeeds, the next Since time will be the smallest of the maximum modification times
296
                //of the shared and account objects
297
                var nextSince = since;
298

    
299
                try
300
                {
301
                    //Wait for any deletions to finish
302
                    await NetworkAgent.GetDeleteAwaiter().ConfigureAwait(false);
303
                    //Get the poll time now. We may miss some deletions but it's better to keep a file that was deleted
304
                    //than delete a file that was created while we were executing the poll                    
305

    
306
                    //Get the list of server objects changed since the last check
307
                    //The name of the container is passed as state in order to create a dictionary of tasks in a subsequent step
308
                    var listObjects = (from container in containers
309
                                       select Task<IList<ObjectInfo>>.Factory.StartNew(_ =>
310
                                             client.ListObjects(accountInfo.UserName, container.Name, since), container.Name)).ToList();
311

    
312
                    var listShared = Task<IList<ObjectInfo>>.Factory.StartNew(_ => 
313
                        client.ListSharedObjects(_knownContainers,since), "shared");
314
                    listObjects.Add(listShared);
315
                    var listTasks = await Task.Factory.WhenAll(listObjects.ToArray()).ConfigureAwait(false);
316

    
317
                    using (ThreadContext.Stacks["SCHEDULE"].Push("Process Results"))
318
                    {
319
                        var dict = listTasks.ToDictionary(t => t.AsyncState);
320

    
321
                        //Get all non-trash objects. Remember, the container name is stored in AsyncState
322
                        var remoteObjects = (from objectList in listTasks
323
                                            where (string)objectList.AsyncState != "trash"
324
                                            from obj in objectList.Result
325
                                            orderby obj.Bytes ascending 
326
                                            select obj).ToList();
327
                        
328
                        //Get the latest remote object modification date, only if it is after
329
                        //the original since date                        
330
                        nextSince = GetLatestDateAfter(nextSince, remoteObjects);
331

    
332
                        var sharedObjects = dict["shared"].Result;
333

    
334
                        //DON'T process trashed files
335
                        //If some files are deleted and added again to a folder, they will be deleted
336
                        //even though they are new.
337
                        //We would have to check file dates and hashes to ensure that a trashed file
338
                        //can be deleted safely from the local hard drive.
339
                        /*
340
                        //Items with the same name, hash may be both in the container and the trash
341
                        //Don't delete items that exist in the container
342
                        var realTrash = from trash in trashObjects
343
                                        where
344
                                            !remoteObjects.Any(
345
                                                info => info.Name == trash.Name && info.Hash == trash.Hash)
346
                                   8     select trash;
347
                        ProcessTrashedFiles(accountInfo, realTrash);
348
*/
349

    
350
                        var cleanRemotes = (from info in remoteObjects.Union(sharedObjects)
351
                                            let name = info.Name??""
352
                                            where !name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase) &&
353
                                                  !name.StartsWith(FolderConstants.CacheFolder + "/",
354
                                                                   StringComparison.InvariantCultureIgnoreCase)
355
                                            select info).ToList();
356

    
357
                        if (_firstPoll)
358
                            StatusKeeper.CleanupOrphanStates();
359
                        
360
                        var differencer = _differencer.PostSnapshot(accountInfo, cleanRemotes);
361
                        var currentRemotes = differencer.Current.ToList();
362
                        StatusKeeper.CleanupStaleStates(accountInfo, currentRemotes);
363

    
364
                        //var filterUris = Selectives.SelectiveUris[accountInfo.AccountKey];
365

    
366
                        //May have to wait if the FileAgent has asked for a Pause, due to local changes
367
                        await _unPauseEvent.WaitAsync().ConfigureAwait(false);
368

    
369
                        //Get the local files here                        
370
                        var agent = AgentLocator<FileAgent>.Get(accountInfo.AccountPath);                                                
371
                        var files = LoadLocalFileTuples(accountInfo, accountBatch);
372

    
373
                        var states = FileState.Queryable.ToList();                        
374
                        
375
                        var infos = (from remote in currentRemotes
376
                                    let path = remote.RelativeUrlToFilePath(accountInfo.UserName)
377
                                    let info=agent.GetFileSystemInfo(path)
378
                                    select Tuple.Create(info.FullName,remote))
379
                                    .ToList();
380

    
381
                        var token = _currentOperationCancellation.Token;
382

    
383
                        var tuples = MergeSources(infos, files, states).ToList();
384

    
385
                        //Process only the changes in the batch file, if one exists
386
                        var stateTuples = accountBatch==null?tuples:tuples.Where(t => accountBatch.Contains(t.FilePath));
387
                        foreach (var tuple in stateTuples.Where(s=>!s.Locked))
388
                        {
389
                            await _unPauseEvent.WaitAsync().ConfigureAwait(false);
390

    
391
                            //Set the Merkle Hash
392
                            //SetMerkleHash(accountInfo, tuple);
393

    
394
                            await SyncSingleItem(accountInfo, tuple, agent, token).ConfigureAwait(false);
395

    
396
                        }
397

    
398

    
399
                        //On the first run
400
/*
401
                        if (_firstPoll)
402
                        {
403
                            MarkSuspectedDeletes(accountInfo, cleanRemotes);
404
                        }
405
*/
406

    
407

    
408
                        Log.Info("[LISTENER] End Processing");
409
                    }
410
                }
411
                catch (Exception ex)
412
                {
413
                    Log.ErrorFormat("[FAIL] ListObjects for {0} in ProcessRemoteFiles with {1}", accountInfo.UserName, ex);
414
                    return nextSince;
415
                }
416

    
417
                Log.Info("[LISTENER] Finished");
418
                return nextSince;
419
            }
420
        }
421
/*
422

    
423
        private static void SetMerkleHash(AccountInfo accountInfo, StateTuple tuple)
424
        {
425
            //The Merkle hash for directories is that of an empty buffer
426
            if (tuple.FileInfo is DirectoryInfo)
427
                tuple.C = MERKLE_EMPTY;
428
            else if (tuple.FileState != null && tuple.MD5 == tuple.FileState.ETag)
429
            {
430
                //If there is a state whose MD5 matches, load the merkle hash from the file state
431
                //insteaf of calculating it
432
                tuple.C = tuple.FileState.Checksum;                              
433
            }
434
            else
435
            {
436
                tuple.Merkle = Signature.CalculateTreeHashAsync((FileInfo)tuple.FileInfo, accountInfo.BlockSize, accountInfo.BlockHash,1,progress);
437
                //tuple.C=tuple.Merkle.TopHash.ToHashString();                
438
            }
439
        }
440
*/
441

    
442
        private IEnumerable<FileSystemInfo> LoadLocalFileTuples(AccountInfo accountInfo,IEnumerable<string> batch )
443
        {
444
            using (ThreadContext.Stacks["Account Files Hashing"].Push(accountInfo.UserName))
445
            {
446
                var batchPaths = (batch==null)?new List<string>():batch.ToList();
447
                IEnumerable<FileSystemInfo> localInfos=AgentLocator<FileAgent>.Get(accountInfo.AccountPath)
448
                                                        .EnumerateFileSystemInfos();
449
                if (batchPaths.Count>0)
450
                    localInfos= localInfos.Where(fi => batchPaths.Contains(fi.FullName));
451

    
452
                return localInfos;
453
            }
454
        }
455

    
456
        /// <summary>
457
        /// Wait and Pause the agent while waiting
458
        /// </summary>
459
        /// <param name="backoff"></param>
460
        /// <returns></returns>
461
        private async Task PauseFor(int backoff)
462
        {
463

    
464
            Pause = true;
465
            await TaskEx.Delay(backoff).ConfigureAwait(false);
466
            Pause = false;
467
        }
468

    
469
        private async Task SyncSingleItem(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, CancellationToken token)
470
        {
471
            Log.DebugFormat("Sync [{0}] C:[{1}] L:[{2}] S:[{3}]", tuple.FilePath, tuple.C, tuple.L, tuple.S);
472

    
473
            try
474
            {
475
                bool isInferredParent = tuple.ObjectInfo != null && tuple.ObjectInfo.UUID.StartsWith("00000000-0000-0000");
476

    
477
                var localFilePath = tuple.FilePath;
478
                //Don't use the tuple info, it may have been deleted
479
                var localInfo = FileInfoExtensions.FromPath(localFilePath);
480

    
481

    
482
                var isUnselectedRootFolder = agent.IsUnselectedRootFolder(tuple.FilePath);
483

    
484
                //Unselected root folders that have not yet been uploaded should be uploaded and added to the 
485
                //selective folders
486

    
487
                if (!Selectives.IsSelected(accountInfo, localFilePath) &&
488
                    !(isUnselectedRootFolder && tuple.ObjectInfo == null))
489
                    return;
490

    
491
                // Local file unchanged? If both C and L are null, make sure it's because 
492
                //both the file is missing and the state checksum is not missing
493
                if (tuple.C == tuple.L /*&& (localInfo.Exists || tuple.FileState == null)*/)
494
                {
495
                    //No local changes
496
                    //Server unchanged?
497
                    if (tuple.S == tuple.L)
498
                    {
499
                        // No server changes
500
                        //Has the file been renamed on the server?
501
                        MoveForServerMove(accountInfo, tuple);
502
                    }
503
                    else
504
                    {
505
                        //Different from server
506
                        //Does the server file exist?
507
                        if (tuple.S == null)
508
                        {
509
                            //Server file doesn't exist
510
                            //deleteObjectFromLocal()
511
                            using (
512
                                StatusNotification.GetNotifier("Deleting local {0}", "Deleted local {0}",
513
                                                               Path.GetFileName(localFilePath)))
514
                            {
515
                                StatusKeeper.SetFileState(localFilePath, FileStatus.Deleted,
516
                                                          FileOverlayStatus.Deleted, "");
517
                                using (NetworkGate.Acquire(localFilePath, NetworkOperation.Deleting))
518
                                {
519
                                    agent.Delete(localFilePath);
520
                                }
521
                                //updateRecord(Remove C, L)
522
                                StatusKeeper.ClearFileStatus(localFilePath);
523
                            }
524
                        }
525
                        else
526
                        {
527
                            //Server file exists
528
                            //downloadServerObject() // Result: L = S
529
                            //If the file has moved on the server, move it locally before downloading
530
                            using (
531
                                StatusNotification.GetNotifier("Downloading {0}", "Downloaded {0}",
532
                                                               Path.GetFileName(localFilePath)))
533
                            {
534
                                var targetPath = MoveForServerMove(accountInfo, tuple);
535

    
536
                                StatusKeeper.SetFileState(targetPath, FileStatus.Modified, FileOverlayStatus.Modified,
537
                                                          "");
538

    
539
                                await
540
                                    NetworkAgent.Downloader.DownloadCloudFile(accountInfo, tuple.ObjectInfo, targetPath,
541
                                                                              token)
542
                                        .ConfigureAwait(false);
543
                                //updateRecord( L = S )
544
                                StatusKeeper.UpdateFileChecksum(targetPath, tuple.ObjectInfo.ETag,
545
                                                                tuple.ObjectInfo.X_Object_Hash);
546

    
547
                                StatusKeeper.StoreInfo(targetPath, tuple.ObjectInfo);
548

    
549
                                AddRootFolderToSelectives(accountInfo, tuple, targetPath);
550
                            }
551

    
552
                            /*
553
                                                            StatusKeeper.SetFileState(targetPath, FileStatus.Unchanged,
554
                                                                                      FileOverlayStatus.Normal, "");
555
                                */
556
                        }
557
                    }
558
                }
559
                else
560
                {                   
561

    
562
                    //Local changes found
563

    
564
                    //Server unchanged?
565
                    if (tuple.S == tuple.L)
566
                    {
567
                        //The FileAgent selective sync checks for new root folder files
568
                        if (!agent.Ignore(localFilePath))
569
                        {
570
                            if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null)
571
                            {
572
                                //deleteObjectFromServer()
573
                                DeleteCloudFile(accountInfo, tuple);
574
                                //updateRecord( Remove L, S)                  
575
                            }
576
                            else
577
                            {
578
                                //uploadLocalObject() // Result: S = C, L = S                        
579
                                var progress = new Progress<double>(d =>
580
                                    StatusNotification.Notify(new StatusNotification(String.Format("Merkle Hashing for Upload {0:p} of {1}", d, localInfo.Name))));
581

    
582
                                //Debug.Assert(tuple.FileState !=null);
583
                                var action = new CloudUploadAction(accountInfo, localInfo, tuple.FileState,
584
                                                                   accountInfo.BlockSize, accountInfo.BlockHash,
585
                                                                   "Poll", isUnselectedRootFolder,progress);
586
                                using (
587
                                    StatusNotification.GetNotifier("Uploading {0}", "Uploaded {0}",
588
                                                                   Path.GetFileName(localFilePath)))
589
                                {
590
                                    await NetworkAgent.Uploader.UploadCloudFile(action, token).ConfigureAwait(false);
591
                                }
592

    
593
                                //updateRecord( S = C )
594
                                //State updated by the uploader
595

    
596
                                if (isUnselectedRootFolder)
597
                                {
598
                                    ProcessChildren(accountInfo, tuple, agent, token);
599
                                }
600
                            }
601
                        }
602
                    }
603
                    else
604
                    {
605
                        if (tuple.C == tuple.S)
606
                        {
607
                            // (Identical Changes) Result: L = S
608
                            //doNothing()
609
                            
610
                            //Don't update anything for nonexistend server files
611
                            if (tuple.S != null)
612
                            {
613
                                //Detect server moves
614
                                var targetPath = MoveForServerMove(accountInfo, tuple);
615
                                StatusKeeper.StoreInfo(targetPath, tuple.ObjectInfo);
616
                                
617
                                AddRootFolderToSelectives(accountInfo, tuple, targetPath);
618
                            }
619
                            else
620
                            {
621
                                //At this point, C==S==NULL and we have a stale state (L)
622
                                //Log the stale tuple for investigation
623
                                Log.WarnFormat("Stale tuple detected FilePathPath:[{0}], State:[{1}], LocalFile:[{2}]", tuple.FilePath, tuple.FileState, tuple.FileInfo);
624

    
625
                                //And remove it
626
                                if (!String.IsNullOrWhiteSpace(tuple.FilePath))
627
                                    StatusKeeper.ClearFileStatus(tuple.FilePath);
628
                            }
629
                        }
630
                        else
631
                        {
632
                            if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null)
633
                            {
634
                                //deleteObjectFromServer()
635
                                DeleteCloudFile(accountInfo, tuple);
636
                                //updateRecord(Remove L, S)                  
637
                            }
638
                                //If both the local and server files are missing, the state is stale
639
                            else if (!localInfo.Exists && (tuple.S == null || tuple.ObjectInfo == null))
640
                            {
641
                                StatusKeeper.ClearFileStatus(localInfo.FullName);
642
                            }
643
                            else
644
                            {
645
                                ReportConflictForMismatch(localFilePath);
646
                                //identifyAsConflict() // Manual action required
647
                            }
648
                        }
649
                    }
650
                }
651
            }
652
            catch (Exception exc)
653
            {
654
                //In case of error log and retry with the next poll
655
                Log.ErrorFormat("[SYNC] Failed for file {0}. Will Retry.\r\n{1}",tuple.FilePath,exc);
656

    
657
                
658
            }
659
        }
660

    
661
        private void AddRootFolderToSelectives(AccountInfo accountInfo, StateTuple tuple, string targetPath)
662
        {
663
            //Also ensure that any newly created folders are added to the selectives, if the original folder was selected
664
            var containerPath = Path.Combine(accountInfo.AccountPath, tuple.ObjectInfo.Container);
665

    
666
            //If this is a root folder encountered for the first time
667
            if (tuple.L == null && Directory.Exists(tuple.FileInfo.FullName) 
668
                && (tuple.FileInfo.FullName.IsAtOrDirectlyBelow(containerPath)))
669
            {
670
                
671
                var relativePath = tuple.ObjectInfo.RelativeUrlToFilePath(accountInfo.UserName);
672
                var initialPath = Path.Combine(accountInfo.AccountPath, relativePath);
673

    
674
                var hasMoved = true;// !initialPath.Equals(targetPath);
675
                if (hasMoved && Selectives.IsSelected(accountInfo, initialPath))
676
                {
677
                    Selectives.AddUri(accountInfo, tuple.ObjectInfo.Uri);
678
                    Selectives.Save(accountInfo);
679
                }
680
            }
681
        }
682

    
683
        private string MoveForServerMove(AccountInfo accountInfo, StateTuple tuple)
684
        {
685
            if (tuple.ObjectInfo == null)
686
                return null;
687
            var relativePath = tuple.ObjectInfo.RelativeUrlToFilePath(accountInfo.UserName);
688
            var serverPath = Path.Combine(accountInfo.AccountPath, relativePath);
689
            
690
            //Compare Case Insensitive
691
            if (String.Equals(tuple.FilePath ,serverPath,StringComparison.InvariantCultureIgnoreCase)) 
692
                return serverPath;
693

    
694
            if (tuple.FileInfo.Exists)
695
            {
696
                using (StatusNotification.GetNotifier("Moving local {0}", "Moved local {0}", Path.GetFileName(tuple.FilePath)))
697
                using(NetworkGate.Acquire(tuple.FilePath,NetworkOperation.Renaming))
698
                {
699
                    var fi = tuple.FileInfo as FileInfo;
700
                    if (fi != null)
701
                        fi.MoveTo(serverPath);
702
                    var di = tuple.FileInfo as DirectoryInfo;
703
                    if (di != null)
704
                        di.MoveTo(serverPath);
705
                }
706
                StatusKeeper.StoreInfo(serverPath, tuple.ObjectInfo);
707
            }
708
            else
709
            {
710
                Debug.Assert(false, "File does not exist");
711
            }
712
            return serverPath;
713
        }
714

    
715
        private void DeleteCloudFile(AccountInfo accountInfo, StateTuple tuple)
716
        {
717
            using (StatusNotification.GetNotifier("Deleting server {0}", "Deleted server {0}", Path.GetFileName(tuple.FilePath)))
718
            {
719

    
720
                StatusKeeper.SetFileState(tuple.FilePath, FileStatus.Deleted,
721
                                          FileOverlayStatus.Deleted, "");
722
                NetworkAgent.DeleteAgent.DeleteCloudFile(accountInfo, tuple.ObjectInfo);
723
                StatusKeeper.ClearFileStatus(tuple.FilePath);
724
            }
725
        }
726

    
727
        private void ProcessChildren(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, CancellationToken token)
728
        {
729

    
730
            var dirInfo = tuple.FileInfo as DirectoryInfo;
731
            var folderTuples = from folder in dirInfo.EnumerateDirectories("*", SearchOption.AllDirectories)
732
                               select new StateTuple(folder);
733
            var fileTuples = from file in dirInfo.EnumerateFiles("*", SearchOption.AllDirectories)
734
                             select new StateTuple(file);
735
            
736
            //Process folders first, to ensure folders appear on the sever as soon as possible
737
            folderTuples.ApplyAction(async t =>await SyncSingleItem(accountInfo, t, agent, token).ConfigureAwait(false));
738
            
739
            fileTuples.ApplyAction(async t => await SyncSingleItem(accountInfo, t, agent, token).ConfigureAwait(false));
740
        }
741

    
742

    
743
        /*
744
         *                 //Use the queue to retry locked file hashing
745
                var fileQueue = new ConcurrentQueue<FileSystemInfo>(localInfos);
746
                
747

    
748
                var results = new List<Tuple<FileSystemInfo, string>>();
749
                var backoff = 0;
750
                while (fileQueue.Count > 0)
751
                {
752
                    FileSystemInfo file;
753
                    fileQueue.TryDequeue(out file);
754
                    using (ThreadContext.Stacks["File"].Push(file.FullName))
755
                    {
756
                        try
757
                        {
758
                            //Replace MD5 here, do the calc while syncing individual files
759
                            string hash ;
760
                            if (file is DirectoryInfo)
761
                                hash = MD5_EMPTY;
762
                            else
763
                            {
764
                                //Wait in case the FileAgent has requested a Pause
765
                                await _unPauseEvent.WaitAsync().ConfigureAwait(false);
766
                                
767
                                using (StatusNotification.GetNotifier("Hashing {0}", "", file.Name))
768
                                {
769
                                    hash = ((FileInfo)file).ComputeShortHash(StatusNotification);
770
                                    backoff = 0;
771
                                }
772
                            }                            
773
                            results.Add(Tuple.Create(file, hash));
774
                        }
775
                        catch (IOException exc)
776
                        {
777
                            Log.WarnFormat("[HASH] File in use, will retry [{0}]", exc);
778
                            fileQueue.Enqueue(file);
779
                            //If this is the only enqueued file                            
780
                            if (fileQueue.Count != 1) continue;
781
                            
782
                            
783
                            //Increase delay
784
                            if (backoff<60000)
785
                                backoff += 10000;
786
                            //Pause Polling for the specified time
787
                        }
788
                        if (backoff>0)
789
                            await PauseFor(backoff).ConfigureAwait(false);
790
                    }
791
                }
792

    
793
                return results;
794

    
795
         */
796
        private IEnumerable<StateTuple> MergeSources(
797
            IEnumerable<Tuple<string, ObjectInfo>> infos, 
798
            IEnumerable<FileSystemInfo> files, 
799
            IEnumerable<FileState> states)
800
        {
801
            var tuplesByPath = files.ToDictionary(f => f.FullName, f => new StateTuple {FileInfo = f}); new Dictionary<string, StateTuple>();
802

    
803
            //For files that have state
804
            foreach (var state in states)
805
            {
806
                StateTuple hashTuple;
807
                if (tuplesByPath.TryGetValue(state.FilePath, out hashTuple))
808
                {
809
                    hashTuple.FileState = state;
810
                    UpdateMD5(hashTuple);
811
                }
812
                else
813
                {
814
                    var fsInfo = FileInfoExtensions.FromPath(state.FilePath);
815
                    hashTuple = new StateTuple {FileInfo = fsInfo, FileState = state};
816
                    tuplesByPath[state.FilePath] = hashTuple;
817
                }
818
            }
819
            //for files that don't have state
820
            foreach (var tuple in tuplesByPath.Values.Where(t => t.FileState == null))
821
            {
822
                UpdateMD5(tuple);
823
            }
824

    
825
            var tuplesByID = tuplesByPath.Values
826
                .Where(tuple => tuple.FileState != null && tuple.FileState.ObjectID!=null)
827
                .ToDictionary(tuple=>tuple.FileState.ObjectID,tuple=>tuple);//new Dictionary<Guid, StateTuple>();
828

    
829
            foreach (var info in infos)
830
            {
831
                StateTuple hashTuple;
832
                var filePath = info.Item1;
833
                var objectInfo = info.Item2;
834
                var objectID = objectInfo.UUID;
835

    
836
                if (objectID != _emptyGuid &&  tuplesByID.TryGetValue(objectID, out hashTuple))
837
                {
838
                    hashTuple.ObjectInfo = objectInfo;                    
839
                }
840
                else if (tuplesByPath.TryGetValue(filePath, out hashTuple))
841
                {
842
                    hashTuple.ObjectInfo = objectInfo;
843
                }
844
                else
845
                {
846

    
847
                    
848
                    var fsInfo = FileInfoExtensions.FromPath(filePath);
849
                    hashTuple= new StateTuple {FileInfo = fsInfo, ObjectInfo = objectInfo};
850
                    tuplesByPath[filePath] = hashTuple;
851
                    
852
                    if (objectInfo.UUID!=_emptyGuid)
853
                        tuplesByID[objectInfo.UUID] = hashTuple;
854
                }
855
            }
856
            Debug.Assert(tuplesByPath.Values.All(t => t.HashesValid()));
857
            return tuplesByPath.Values;
858
        }
859

    
860
        private void  UpdateMD5(StateTuple hashTuple)
861
        {
862
            
863
            try
864
            {
865
                var hash = Signature.MD5_EMPTY;
866
                if (hashTuple.FileInfo is FileInfo)
867
                {
868
                    var file = hashTuple.FileInfo as FileInfo;
869
                    var stateDate = hashTuple.NullSafe(h => h.FileState).NullSafe(s => s.LastWriteDate) ??
870
                                    DateTime.MinValue;
871
                    if (file.LastWriteTime - stateDate < TimeSpan.FromSeconds(1) &&
872
                        hashTuple.FileState.LastLength == file.Length)
873
                    {
874
                        hash = hashTuple.FileState.LastMD5;
875
                    }
876
                    else
877
                    {
878
                        //Modified, must calculate hash
879
                        hash = file.ComputeShortHash(StatusNotification);
880
                        StatusKeeper.UpdateLastMD5(file, hash);
881
                    }
882
                }
883
                hashTuple.C = hash;
884
                hashTuple.MD5 = hash;
885
            }
886
            catch (IOException)
887
            {
888
                hashTuple.Locked = true;
889
            }            
890
        }
891

    
892
        /// <summary>
893
        /// Returns the latest LastModified date from the list of objects, but only if it is before
894
        /// than the threshold value
895
        /// </summary>
896
        /// <param name="threshold"></param>
897
        /// <param name="cloudObjects"></param>
898
        /// <returns></returns>
899
        private static DateTime? GetLatestDateBefore(DateTime? threshold, IList<ObjectInfo> cloudObjects)
900
        {
901
            DateTime? maxDate = null;
902
            if (cloudObjects!=null &&  cloudObjects.Count > 0)
903
                maxDate = cloudObjects.Max(obj => obj.Last_Modified);
904
            if (maxDate == null || maxDate == DateTime.MinValue)
905
                return threshold;
906
            if (threshold == null || threshold == DateTime.MinValue || threshold > maxDate)
907
                return maxDate;
908
            return threshold;
909
        }
910

    
911
        /// <summary>
912
        /// Returns the latest LastModified date from the list of objects, but only if it is after
913
        /// the threshold value
914
        /// </summary>
915
        /// <param name="threshold"></param>
916
        /// <param name="cloudObjects"></param>
917
        /// <returns></returns>
918
        private static DateTime? GetLatestDateAfter(DateTime? threshold, IList<ObjectInfo> cloudObjects)
919
        {
920
            DateTime? maxDate = null;
921
            if (cloudObjects!=null &&  cloudObjects.Count > 0)
922
                maxDate = cloudObjects.Max(obj => obj.Last_Modified);
923
            if (maxDate == null || maxDate == DateTime.MinValue)
924
                return threshold;
925
            if (threshold == null || threshold == DateTime.MinValue || threshold < maxDate)
926
                return maxDate;
927
            return threshold;
928
        }
929

    
930
        readonly AccountsDifferencer _differencer = new AccountsDifferencer();
931
        private bool _pause;
932
        private readonly string _emptyGuid = Guid.Empty.ToString();
933

    
934

    
935

    
936
        private void ReportConflictForMismatch(string localFilePath)
937
        {
938
            if (String.IsNullOrWhiteSpace(localFilePath))
939
                throw new ArgumentNullException("localFilePath");
940
            Contract.EndContractBlock();
941

    
942
            StatusKeeper.SetFileState(localFilePath, FileStatus.Conflict, FileOverlayStatus.Conflict, "File changed at the server");
943
            UpdateStatus(PithosStatus.HasConflicts);
944
            var message = String.Format("Conflict detected for file {0}", localFilePath);
945
            Log.Warn(message);
946
            StatusNotification.NotifyChange(message, TraceLevel.Warning);
947
        }
948

    
949

    
950
        /// <summary>
951
        /// Notify the UI to update the visual status
952
        /// </summary>
953
        /// <param name="status"></param>
954
        private void UpdateStatus(PithosStatus status)
955
        {
956
            try
957
            {
958
                StatusNotification.SetPithosStatus(status);
959
                //StatusNotification.Notify(new Notification());
960
            }
961
            catch (Exception exc)
962
            {
963
                //Failure is not critical, just log it
964
                Log.Warn("Error while updating status", exc);
965
            }
966
        }
967

    
968
        private static void CreateContainerFolders(AccountInfo accountInfo, IEnumerable<ContainerInfo> containers)
969
        {
970
            var containerPaths = from container in containers
971
                                 let containerPath = Path.Combine(accountInfo.AccountPath, container.Name)
972
                                 where container.Name != FolderConstants.TrashContainer && !Directory.Exists(containerPath)
973
                                 select containerPath;
974

    
975
            foreach (var path in containerPaths)
976
            {
977
                Directory.CreateDirectory(path);
978
            }
979
        }
980

    
981
        public void AddAccount(AccountInfo accountInfo)
982
        {
983
            //Avoid adding a duplicate accountInfo
984
            _accounts.TryAdd(accountInfo.AccountKey, accountInfo);
985
        }
986

    
987
        public void RemoveAccount(AccountInfo accountInfo)
988
        {
989
            AccountInfo account;
990
            _accounts.TryRemove(accountInfo.AccountKey, out account);
991

    
992
            SnapshotDifferencer differencer;
993
            _differencer.Differencers.TryRemove(accountInfo.AccountKey, out differencer);
994
        }
995
       
996
    }
997
}