Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / PollAgent.cs @ e4067290

History | View | Annotate | Download (49.4 kB)

1
#region
2
/* -----------------------------------------------------------------------
3
 * <copyright file="PollAgent.cs" company="GRNet">
4
 * 
5
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
6
 *
7
 * Redistribution and use in source and binary forms, with or
8
 * without modification, are permitted provided that the following
9
 * conditions are met:
10
 *
11
 *   1. Redistributions of source code must retain the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer.
14
 *
15
 *   2. Redistributions in binary form must reproduce the above
16
 *      copyright notice, this list of conditions and the following
17
 *      disclaimer in the documentation and/or other materials
18
 *      provided with the distribution.
19
 *
20
 *
21
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
22
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
25
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
28
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32
 * POSSIBILITY OF SUCH DAMAGE.
33
 *
34
 * The views and conclusions contained in the software and
35
 * documentation are those of the authors and should not be
36
 * interpreted as representing official policies, either expressed
37
 * or implied, of GRNET S.A.
38
 * </copyright>
39
 * -----------------------------------------------------------------------
40
 */
41
#endregion
42

    
43
using System.Collections.Concurrent;
44
using System.ComponentModel.Composition;
45
using System.Diagnostics;
46
using System.Diagnostics.Contracts;
47
using System.IO;
48
using System.Linq.Expressions;
49
using System.Reflection;
50
using System.Security.Cryptography;
51
using System.Threading;
52
using System.Threading.Tasks;
53
using Castle.ActiveRecord;
54
using Pithos.Interfaces;
55
using Pithos.Network;
56
using log4net;
57

    
58
namespace Pithos.Core.Agents
59
{
60
    using System;
61
    using System.Collections.Generic;
62
    using System.Linq;
63

    
64
    [DebuggerDisplay("{FilePath} C:{C} L:{L} S:{S}")]
65
    public class StateTuple
66
    {
67
        public string FilePath { get; private set; }
68

    
69
        public string MD5 { get; set; }
70

    
71
        public string L
72
        {
73
            get { return FileState==null?null:FileState.Checksum; }
74
        }
75

    
76
        public string C { get; set; }
77

    
78
        public string S
79
        {
80
            get { return ObjectInfo == null ? null : ObjectInfo.X_Object_Hash; }
81
        }
82

    
83
        private FileSystemInfo _fileInfo;
84
        public FileSystemInfo FileInfo
85
        {
86
            get { return _fileInfo; }
87
            set
88
            {
89
                _fileInfo = value;
90
                FilePath = value.FullName;
91
            }
92
        }
93

    
94
        public FileState FileState { get; set; }
95
        public ObjectInfo ObjectInfo{ get; set; }
96

    
97
        public StateTuple() { }
98

    
99
        public StateTuple(FileSystemInfo info)
100
        {
101
            FileInfo = info;
102
        }
103

    
104

    
105
    }
106

    
107

    
108
    /// <summary>
109
    /// PollAgent periodically polls the server to detect object changes. The agent retrieves a listing of all
110
    /// objects and compares it with a previously cached version to detect differences. 
111
    /// New files are downloaded, missing files are deleted from the local file system and common files are compared
112
    /// to determine the appropriate action
113
    /// </summary>
114
    [Export]
115
    public class PollAgent
116
    {
117
        private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
118

    
119
        [System.ComponentModel.Composition.Import]
120
        public IStatusKeeper StatusKeeper { get; set; }
121

    
122
        [System.ComponentModel.Composition.Import]
123
        public IPithosSettings Settings { get; set; }
124

    
125
        [System.ComponentModel.Composition.Import]
126
        public NetworkAgent NetworkAgent { get; set; }
127

    
128
        [System.ComponentModel.Composition.Import]
129
        public Selectives Selectives { get; set; }
130

    
131
        public IStatusNotification StatusNotification { get; set; }
132

    
133
        private CancellationTokenSource _currentOperationCancellation = new CancellationTokenSource();
134

    
135
        public void CancelCurrentOperation()
136
        {
137
            //What does it mean to cancel the current upload/download?
138
            //Obviously, the current operation will be cancelled by throwing
139
            //a cancellation exception.
140
            //
141
            //The default behavior is to retry any operations that throw.
142
            //Obviously this is not what we want in this situation.
143
            //The cancelled operation should NOT bea retried. 
144
            //
145
            //This can be done by catching the cancellation exception
146
            //and avoiding the retry.
147
            //
148

    
149
            //Have to reset the cancellation source - it is not possible to reset the source
150
            //Have to prevent a case where an operation requests a token from the old source
151
            var oldSource = Interlocked.Exchange(ref _currentOperationCancellation, new CancellationTokenSource());
152
            oldSource.Cancel();
153

    
154
        }
155

    
156
        public bool Pause
157
        {
158
            get {
159
                return _pause;
160
            }
161
            set {
162
                _pause = value;                
163
                if (!_pause)
164
                    _unPauseEvent.Set();
165
                else
166
                {
167
                    _unPauseEvent.Reset();
168
                }
169
            }
170
        }
171

    
172
        private bool _firstPoll = true;
173

    
174
        //The Sync Event signals a manual synchronisation
175
        private readonly AsyncManualResetEvent _syncEvent = new AsyncManualResetEvent();
176

    
177
        private readonly AsyncManualResetEvent _unPauseEvent = new AsyncManualResetEvent(true);
178

    
179
        private readonly ConcurrentDictionary<string, DateTime> _lastSeen = new ConcurrentDictionary<string, DateTime>();
180
        private readonly ConcurrentDictionary<Uri, AccountInfo> _accounts = new ConcurrentDictionary<Uri,AccountInfo>();
181

    
182

    
183
        /// <summary>
184
        /// Start a manual synchronization
185
        /// </summary>
186
        public void SynchNow()
187
        {            
188
            _syncEvent.Set();
189
        }
190

    
191

    
192
        /// <summary>
193
        /// Remote files are polled periodically. Any changes are processed
194
        /// </summary>
195
        /// <param name="since"></param>
196
        /// <returns></returns>
197
        public async Task PollRemoteFiles(DateTime? since = null)
198
        {
199
            if (Log.IsDebugEnabled)
200
                Log.DebugFormat("Polling changes after [{0}]",since);
201

    
202
            Debug.Assert(Thread.CurrentThread.IsBackground, "Polling Ended up in the main thread!");
203

    
204
            //GC.Collect();
205

    
206
            using (ThreadContext.Stacks["Retrieve Remote"].Push("All accounts"))
207
            {
208
                //If this poll fails, we will retry with the same since value
209
                var nextSince = since;
210
                try
211
                {
212
                    await _unPauseEvent.WaitAsync();
213
                    UpdateStatus(PithosStatus.PollSyncing);
214

    
215
                    var tasks = from accountInfo in _accounts.Values
216
                                select ProcessAccountFiles(accountInfo, since);
217

    
218
                    var nextTimes=await TaskEx.WhenAll(tasks.ToList());
219

    
220
                    _firstPoll = false;
221
                    //Reschedule the poll with the current timestamp as a "since" value
222

    
223
                    if (nextTimes.Length>0)
224
                        nextSince = nextTimes.Min();
225
                    if (Log.IsDebugEnabled)
226
                        Log.DebugFormat("Next Poll at [{0}]",nextSince);
227
                }
228
                catch (Exception ex)
229
                {
230
                    Log.ErrorFormat("Error while processing accounts\r\n{0}", ex);
231
                    //In case of failure retry with the same "since" value
232
                }
233

    
234
                UpdateStatus(PithosStatus.PollComplete);
235
                //The multiple try blocks are required because we can't have an await call
236
                //inside a finally block
237
                //TODO: Find a more elegant solution for reschedulling in the event of an exception
238
                try
239
                {
240
                    //Wait for the polling interval to pass or the Sync event to be signalled
241
                    nextSince = await WaitForScheduledOrManualPoll(nextSince);
242
                }
243
                finally
244
                {
245
                    //Ensure polling is scheduled even in case of error
246
                    TaskEx.Run(() => PollRemoteFiles(nextSince));                        
247
                }
248
            }
249
        }
250

    
251
        /// <summary>
252
        /// Wait for the polling period to expire or a manual sync request
253
        /// </summary>
254
        /// <param name="since"></param>
255
        /// <returns></returns>
256
        private async Task<DateTime?> WaitForScheduledOrManualPoll(DateTime? since)
257
        {
258
            var sync = _syncEvent.WaitAsync();
259
            var wait = TaskEx.Delay(TimeSpan.FromSeconds(Settings.PollingInterval), NetworkAgent.CancellationToken);
260
            
261
            var signaledTask = await TaskEx.WhenAny(sync, wait);
262
            
263
            //Pausing takes precedence over manual sync or awaiting
264
            _unPauseEvent.Wait();
265
            
266
            //Wait for network processing to finish before polling
267
            var pauseTask=NetworkAgent.ProceedEvent.WaitAsync();
268
            await TaskEx.WhenAll(signaledTask, pauseTask);
269

    
270
            //If polling is signalled by SynchNow, ignore the since tag
271
            if (sync.IsCompleted)
272
            {
273
                //TODO: Must convert to AutoReset
274
                _syncEvent.Reset();
275
                return null;
276
            }
277
            return since;
278
        }
279

    
280
        public async Task<DateTime?> ProcessAccountFiles(AccountInfo accountInfo, DateTime? since = null)
281
        {
282
            if (accountInfo == null)
283
                throw new ArgumentNullException("accountInfo");
284
            if (String.IsNullOrWhiteSpace(accountInfo.AccountPath))
285
                throw new ArgumentException("The AccountInfo.AccountPath is empty", "accountInfo");
286
            Contract.EndContractBlock();
287

    
288

    
289
            using (ThreadContext.Stacks["Retrieve Remote"].Push(accountInfo.UserName))
290
            {
291

    
292
                await NetworkAgent.GetDeleteAwaiter();
293

    
294
                Log.Info("Scheduled");
295
                var client = new CloudFilesClient(accountInfo);
296

    
297
                //We don't need to check the trash container
298
                var containers = client.ListContainers(accountInfo.UserName)
299
                    .Where(c=>c.Name!="trash")
300
                    .ToList();
301

    
302

    
303
                CreateContainerFolders(accountInfo, containers);
304

    
305
                //The nextSince time fallback time is the same as the current.
306
                //If polling succeeds, the next Since time will be the smallest of the maximum modification times
307
                //of the shared and account objects
308
                var nextSince = since;
309

    
310
                try
311
                {
312
                    //Wait for any deletions to finish
313
                    await NetworkAgent.GetDeleteAwaiter();
314
                    //Get the poll time now. We may miss some deletions but it's better to keep a file that was deleted
315
                    //than delete a file that was created while we were executing the poll                    
316

    
317
                    //Get the list of server objects changed since the last check
318
                    //The name of the container is passed as state in order to create a dictionary of tasks in a subsequent step
319
                    var listObjects = (from container in containers
320
                                       select Task<IList<ObjectInfo>>.Factory.StartNew(_ =>
321
                                             client.ListObjects(accountInfo.UserName, container.Name, since), container.Name)).ToList();
322

    
323
                    var listShared = Task<IList<ObjectInfo>>.Factory.StartNew(_ => 
324
                        client.ListSharedObjects(since), "shared");
325
                    listObjects.Add(listShared);
326
                    var listTasks = await Task.Factory.WhenAll(listObjects.ToArray());
327

    
328
                    using (ThreadContext.Stacks["SCHEDULE"].Push("Process Results"))
329
                    {
330
                        var dict = listTasks.ToDictionary(t => t.AsyncState);
331

    
332
                        //Get all non-trash objects. Remember, the container name is stored in AsyncState
333
                        var remoteObjects = (from objectList in listTasks
334
                                            where (string)objectList.AsyncState != "trash"
335
                                            from obj in objectList.Result
336
                                            orderby obj.Bytes ascending 
337
                                            select obj).ToList();
338
                        
339
                        //Get the latest remote object modification date, only if it is after
340
                        //the original since date
341
                        nextSince = GetLatestDateAfter(nextSince, remoteObjects);
342

    
343
                        var sharedObjects = dict["shared"].Result;
344
                        nextSince = GetLatestDateBefore(nextSince, sharedObjects);
345

    
346
                        //DON'T process trashed files
347
                        //If some files are deleted and added again to a folder, they will be deleted
348
                        //even though they are new.
349
                        //We would have to check file dates and hashes to ensure that a trashed file
350
                        //can be deleted safely from the local hard drive.
351
                        /*
352
                        //Items with the same name, hash may be both in the container and the trash
353
                        //Don't delete items that exist in the container
354
                        var realTrash = from trash in trashObjects
355
                                        where
356
                                            !remoteObjects.Any(
357
                                                info => info.Name == trash.Name && info.Hash == trash.Hash)
358
                                        select trash;
359
                        ProcessTrashedFiles(accountInfo, realTrash);
360
*/
361

    
362
                        var cleanRemotes = (from info in remoteObjects.Union(sharedObjects)
363
                                            let name = info.Name??""
364
                                            where !name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase) &&
365
                                                  !name.StartsWith(FolderConstants.CacheFolder + "/",
366
                                                                   StringComparison.InvariantCultureIgnoreCase)
367
                                            select info).ToList();
368

    
369
                        if (_firstPoll)
370
                            StatusKeeper.CleanupOrphanStates();
371
                        StatusKeeper.CleanupStaleStates(accountInfo, cleanRemotes);
372
                        
373
                        //var differencer = _differencer.PostSnapshot(accountInfo, cleanRemotes);
374

    
375
                        var filterUris = Selectives.SelectiveUris[accountInfo.AccountKey];
376

    
377

    
378
                        //Get the local files here                        
379
                        var agent = AgentLocator<FileAgent>.Get(accountInfo.AccountPath);
380

    
381
                        var files = LoadLocalFileTuples(accountInfo);
382

    
383
                        var states = FileState.Queryable.ToList();
384

    
385
                        var infos = (from remote in cleanRemotes
386
                                    let path = remote.RelativeUrlToFilePath(accountInfo.UserName)
387
                                    let info=agent.GetFileSystemInfo(path)
388
                                    select Tuple.Create(info.FullName,remote))
389
                                    .ToList();
390

    
391
                        var token = _currentOperationCancellation.Token;
392

    
393
                        var tuples = MergeSources(infos, files, states).ToList();
394

    
395
                        
396
                        foreach (var tuple in tuples)
397
                        {
398
                            await _unPauseEvent.WaitAsync();
399

    
400
                            //Set the Merkle Hash
401
                            SetMerkleHash(accountInfo, tuple);
402

    
403
                            SyncSingleItem(accountInfo, tuple, agent, token);
404

    
405
                        }
406

    
407

    
408
                        //On the first run
409
/*
410
                        if (_firstPoll)
411
                        {
412
                            MarkSuspectedDeletes(accountInfo, cleanRemotes);
413
                        }
414
*/
415

    
416

    
417
                        Log.Info("[LISTENER] End Processing");
418
                    }
419
                }
420
                catch (Exception ex)
421
                {
422
                    Log.ErrorFormat("[FAIL] ListObjects for{0} in ProcessRemoteFiles with {1}", accountInfo.UserName, ex);
423
                    return nextSince;
424
                }
425

    
426
                Log.Info("[LISTENER] Finished");
427
                return nextSince;
428
            }
429
        }
430

    
431
        private static void SetMerkleHash(AccountInfo accountInfo, StateTuple tuple)
432
        {
433
            //The Merkle hash for directories is that of an empty buffer
434
            if (tuple.FileInfo is DirectoryInfo)
435
                tuple.C = MERKLE_EMPTY;
436
            else if (tuple.FileState != null && tuple.MD5 == tuple.FileState.ShortHash)
437
            {
438
                //If there is a state whose MD5 matches, load the merkle hash fromthe file state
439
                //insteaf of calculating it
440
                tuple.C = tuple.FileState.Checksum;                              
441
            }
442
            else
443
            {
444
                tuple.C=Signature.CalculateTreeHash(tuple.FileInfo, accountInfo.BlockSize, accountInfo.BlockHash)
445
                                   .TopHash.ToHashString();
446
            }
447
        }
448

    
449
        private static List<Tuple<FileSystemInfo, string>> LoadLocalFileTuples(AccountInfo accountInfo)
450
        {
451
            using (ThreadContext.Stacks["Account Files Hashing"].Push(accountInfo.UserName))
452
            {
453

    
454
                var localInfos = AgentLocator<FileAgent>.Get(accountInfo.AccountPath).EnumerateFileSystemInfos();
455
                //Use the queue to retry locked file hashing
456
                var fileQueue = new Queue<FileSystemInfo>(localInfos);
457
                var hasher = MD5.Create();
458

    
459
                var results = new List<Tuple<FileSystemInfo, string>>();
460

    
461
                while (fileQueue.Count > 0)
462
                {
463
                    var file = fileQueue.Dequeue();
464
                    using (ThreadContext.Stacks["File"].Push(file.FullName))
465
                    {
466
                        /*
467
                                                Signature.CalculateTreeHash(file, accountInfo.BlockSize,
468
                                                                                                 accountInfo.BlockHash).
469
                                                                         TopHash.ToHashString()
470
                        */
471
                        try
472
                        {
473
                            //Replace MD5 here, do the calc while syncing individual files
474
                            string hash ;
475
                            if (file is DirectoryInfo)
476
                                hash = MERKLE_EMPTY;
477
                            else
478
                            {
479
                                using (var stream = (file as FileInfo).OpenRead())
480
                                {
481
                                    hash = hasher.ComputeHash(stream).ToHashString();
482
                                }
483
                            }                            
484
                            results.Add(Tuple.Create(file, hash));
485
                        }
486
                        catch (IOException exc)
487
                        {
488
                            Log.WarnFormat("[HASH] File in use, will retry [{0}]", exc);
489
                            fileQueue.Enqueue(file);
490
                        }
491
                    }
492
                }
493

    
494
                return results;
495
            }
496
        }
497

    
498
        private void SyncSingleItem(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, CancellationToken token)
499
        {
500
            Log.DebugFormat("Sync [{0}] C:[{1}] L:[{2}] S:[{3}]",tuple.FilePath,tuple.C,tuple.L,tuple.S);
501

    
502
            var localFilePath = tuple.FilePath;
503
            //Don't use the tuple info, it may have been deleted
504
            var localInfo = FileInfoExtensions.FromPath(localFilePath);
505

    
506

    
507
            // Local file unchanged? If both C and L are null, make sure it's because 
508
            //both the file is missing and the state checksum is not missing
509
            if (tuple.C == tuple.L && (localInfo.Exists || tuple.FileState==null))
510
            {
511
                //No local changes
512
                //Server unchanged?
513
                if (tuple.S == tuple.L)
514
                {
515
                    // No server changes
516
                    //Has the file been renamed on the server?
517
                    MoveForServerMove(accountInfo, tuple);
518
                }
519
                else
520
                {
521
                    //Different from server
522
                    if (Selectives.IsSelected(accountInfo, localFilePath))
523
                    {
524
                        //Does the server file exist?
525
                        if (tuple.S == null)
526
                        {
527
                            //Server file doesn't exist
528
                            //deleteObjectFromLocal()
529
                            StatusKeeper.SetFileState(localFilePath, FileStatus.Deleted,
530
                                                      FileOverlayStatus.Deleted, "");
531
                            agent.Delete(localFilePath);
532
                            //updateRecord(Remove C, L)
533
                            StatusKeeper.ClearFileStatus(localFilePath);
534
                        }
535
                        else
536
                        {
537
                            //Server file exists
538
                            //downloadServerObject() // Result: L = S
539
                            //If the file has moved on the server, move it locally before downloading
540
                            var targetPath=MoveForServerMove(accountInfo,tuple);
541

    
542
                            StatusKeeper.SetFileState(targetPath, FileStatus.Modified,
543
                                                      FileOverlayStatus.Modified, "");
544
                            NetworkAgent.Downloader.DownloadCloudFile(accountInfo,
545
                                                                            tuple.ObjectInfo,
546
                                                                            targetPath, token).Wait(token);
547
                            //updateRecord( L = S )
548
                            StatusKeeper.UpdateFileChecksum(targetPath, tuple.ObjectInfo.ETag,
549
                                                            tuple.ObjectInfo.X_Object_Hash);
550

    
551
                            StatusKeeper.StoreInfo(targetPath, tuple.ObjectInfo);
552

    
553
/*
554
                            StatusKeeper.SetFileState(targetPath, FileStatus.Unchanged,
555
                                                      FileOverlayStatus.Normal, "");
556
*/
557
                        }
558
                    }
559
                }
560
            }
561
            else
562
            {
563
                //Local changes found
564

    
565
                //Server unchanged?
566
                if (tuple.S == tuple.L)
567
                {
568
                    //The FileAgent selective sync checks for new root folder files
569
                    if (!agent.Ignore(localFilePath))
570
                    {
571
                        if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null)
572
                        {
573
                            //deleteObjectFromServer()
574
                            DeleteCloudFile(accountInfo, tuple);
575
                            //updateRecord( Remove L, S)                  
576
                        }
577
                        else
578
                        {
579
                            //uploadLocalObject() // Result: S = C, L = S                        
580
                            var isUnselected = agent.IsUnselectedRootFolder(tuple.FilePath);
581

    
582
                            //Debug.Assert(tuple.FileState !=null);
583
                            var action = new CloudUploadAction(accountInfo, localInfo, tuple.FileState,
584
                                                               accountInfo.BlockSize, accountInfo.BlockHash,
585
                                                               "Poll", isUnselected);
586
                            NetworkAgent.Uploader.UploadCloudFile(action, token).Wait(token);
587

    
588
                            //updateRecord( S = C )
589
                            //State updated by the uploader
590
                            
591
                            if (isUnselected)
592
                            {
593
                                ProcessChildren(accountInfo, tuple, agent, token);
594
                            }
595
                        }
596
                    }
597
                }
598
                else
599
                {
600
                    if (Selectives.IsSelected(accountInfo, localFilePath))
601
                    {
602
                        if (tuple.C == tuple.S)
603
                        {
604
                            // (Identical Changes) Result: L = S
605
                            //doNothing()
606
                            //Detect server moves
607
                            var targetPath=MoveForServerMove(accountInfo, tuple);
608
                            StatusKeeper.StoreInfo(targetPath,tuple.ObjectInfo);
609
                        }
610
                        else
611
                        {
612
                            if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null )
613
                            {
614
                                //deleteObjectFromServer()
615
                                DeleteCloudFile(accountInfo, tuple);
616
                                //updateRecord(Remove L, S)                  
617
                            }
618
                            else
619
                            {
620
                                ReportConflictForMismatch(localFilePath);
621
                                //identifyAsConflict() // Manual action required
622
                            }
623
                        }
624
                    }
625
                }
626
            }
627
        }
628

    
629
        private string MoveForServerMove(AccountInfo accountInfo, StateTuple tuple)
630
        {
631
            var relativePath = tuple.ObjectInfo.RelativeUrlToFilePath(accountInfo.UserName);
632
            var serverPath = Path.Combine(accountInfo.AccountPath, relativePath);
633

    
634
            if (tuple.FilePath == serverPath) return serverPath;
635

    
636
            if (tuple.FileInfo.Exists)
637
            {                    
638
                var fi = tuple.FileInfo as FileInfo;
639
                if (fi != null)
640
                    fi.MoveTo(serverPath);
641
                var di = tuple.FileInfo as DirectoryInfo;
642
                if (di != null)
643
                    di.MoveTo(serverPath);
644
                StatusKeeper.StoreInfo(serverPath, tuple.ObjectInfo);
645
            }
646
            else
647
            {
648
                Debug.Assert(false, "File does not exist");
649
            }
650
            return serverPath;
651
        }
652

    
653
        private void DeleteCloudFile(AccountInfo accountInfo, StateTuple tuple)
654
        {
655
            StatusKeeper.SetFileState(tuple.FilePath, FileStatus.Deleted,
656
                                      FileOverlayStatus.Deleted, "");
657
            NetworkAgent.DeleteAgent.DeleteCloudFile(accountInfo, tuple.ObjectInfo);
658
            StatusKeeper.ClearFileStatus(tuple.FilePath);
659
        }
660

    
661
        private void ProcessChildren(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, CancellationToken token)
662
        {
663

    
664
            var dirInfo = tuple.FileInfo as DirectoryInfo;
665
            var folderTuples = from folder in dirInfo.EnumerateDirectories("*", SearchOption.AllDirectories)
666
                               select new StateTuple(folder);
667
            var fileTuples = from file in dirInfo.EnumerateFiles("*", SearchOption.AllDirectories)
668
                             select new StateTuple(file);
669
            
670
            //Process folders first, to ensure folders appear on the sever as soon as possible
671
            folderTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, token));
672
            
673
            fileTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, token));
674
        }
675

    
676
        private static IEnumerable<StateTuple> MergeSources(
677
            IEnumerable<Tuple<string, ObjectInfo>> infos, 
678
            IEnumerable<Tuple<FileSystemInfo, string>> files, 
679
            IEnumerable<FileState> states)
680
        {
681
            var tuplesByPath = new Dictionary<string, StateTuple>();
682
            foreach (var file in files)
683
            {
684
                var fsInfo = file.Item1;
685
                var fileHash = fsInfo is DirectoryInfo? MERKLE_EMPTY:file.Item2;
686

    
687
                tuplesByPath[fsInfo.FullName] = new StateTuple {FileInfo = fsInfo, MD5 = fileHash};
688
            }
689
            foreach (var state in states)
690
            {
691
                StateTuple hashTuple;
692
                if (tuplesByPath.TryGetValue(state.FilePath, out hashTuple))
693
                {
694
                    hashTuple.FileState = state;
695
                }
696
                else
697
                {
698
                    var fsInfo = FileInfoExtensions.FromPath(state.FilePath);
699
                    tuplesByPath[state.FilePath] = new StateTuple {FileInfo = fsInfo, FileState = state};
700
                }
701
            }
702

    
703
            var tuplesByID = tuplesByPath.Values
704
                .Where(tuple => tuple.FileState != null && tuple.FileState.ObjectID!=null)
705
                .ToDictionary(tuple=>tuple.FileState.ObjectID,tuple=>tuple);//new Dictionary<Guid, StateTuple>();
706

    
707
            foreach (var info in infos)
708
            {
709
                StateTuple hashTuple;
710
                var filePath = info.Item1;
711
                var objectInfo = info.Item2;
712
                var objectID = objectInfo.UUID;
713

    
714
                if (tuplesByID.TryGetValue(objectID, out hashTuple))
715
                {
716
                    hashTuple.ObjectInfo = objectInfo;                    
717
                }
718
                else if (tuplesByPath.TryGetValue(filePath, out hashTuple))
719
                {
720
                    hashTuple.ObjectInfo = objectInfo;
721
                }
722
                else
723
                {
724
                    var fsInfo = FileInfoExtensions.FromPath(filePath);
725
                    var tuple = new StateTuple {FileInfo = fsInfo, ObjectInfo = objectInfo};
726
                    tuplesByPath[filePath] = tuple;
727
                    tuplesByID[objectInfo.UUID] = tuple;
728
                }
729
            }
730
            return tuplesByPath.Values;
731
        }
732

    
733
        /// <summary>
734
        /// Returns the latest LastModified date from the list of objects, but only if it is before
735
        /// than the threshold value
736
        /// </summary>
737
        /// <param name="threshold"></param>
738
        /// <param name="cloudObjects"></param>
739
        /// <returns></returns>
740
        private static DateTime? GetLatestDateBefore(DateTime? threshold, IList<ObjectInfo> cloudObjects)
741
        {
742
            DateTime? maxDate = null;
743
            if (cloudObjects!=null &&  cloudObjects.Count > 0)
744
                maxDate = cloudObjects.Max(obj => obj.Last_Modified);
745
            if (maxDate == null || maxDate == DateTime.MinValue)
746
                return threshold;
747
            if (threshold == null || threshold == DateTime.MinValue || threshold > maxDate)
748
                return maxDate;
749
            return threshold;
750
        }
751

    
752
        /// <summary>
753
        /// Returns the latest LastModified date from the list of objects, but only if it is after
754
        /// the threshold value
755
        /// </summary>
756
        /// <param name="threshold"></param>
757
        /// <param name="cloudObjects"></param>
758
        /// <returns></returns>
759
        private static DateTime? GetLatestDateAfter(DateTime? threshold, IList<ObjectInfo> cloudObjects)
760
        {
761
            DateTime? maxDate = null;
762
            if (cloudObjects!=null &&  cloudObjects.Count > 0)
763
                maxDate = cloudObjects.Max(obj => obj.Last_Modified);
764
            if (maxDate == null || maxDate == DateTime.MinValue)
765
                return threshold;
766
            if (threshold == null || threshold == DateTime.MinValue || threshold < maxDate)
767
                return maxDate;
768
            return threshold;
769
        }
770

    
771
        //readonly AccountsDifferencer _differencer = new AccountsDifferencer();
772
        private Dictionary<Uri, List<Uri>> _selectiveUris = new Dictionary<Uri, List<Uri>>();
773
        private bool _pause;
774
        private static string MERKLE_EMPTY = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
775

    
776
        /// <summary>
777
        /// Deletes local files that are not found in the list of cloud files
778
        /// </summary>
779
        /// <param name="accountInfo"></param>
780
        /// <param name="cloudFiles"></param>
781
        private void ProcessDeletedFiles(AccountInfo accountInfo, IEnumerable<ObjectInfo> cloudFiles)
782
        {
783
            if (accountInfo == null)
784
                throw new ArgumentNullException("accountInfo");
785
            if (String.IsNullOrWhiteSpace(accountInfo.AccountPath))
786
                throw new ArgumentException("The AccountInfo.AccountPath is empty", "accountInfo");
787
            if (cloudFiles == null)
788
                throw new ArgumentNullException("cloudFiles");
789
            Contract.EndContractBlock();
790

    
791
            var deletedFiles = new List<FileSystemInfo>();
792
            foreach (var objectInfo in cloudFiles)
793
            {
794
                if (Log.IsDebugEnabled)
795
                    Log.DebugFormat("Handle deleted [{0}]", objectInfo.Uri);
796
                var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
797
                var item = FileAgent.GetFileAgent(accountInfo).GetFileSystemInfo(relativePath);
798
                if (Log.IsDebugEnabled)
799
                    Log.DebugFormat("Will delete [{0}] for [{1}]", item.FullName, objectInfo.Uri);
800
                if (item.Exists)
801
                {
802
                    if ((item.Attributes & FileAttributes.ReadOnly) == FileAttributes.ReadOnly)
803
                    {
804
                        item.Attributes = item.Attributes & ~FileAttributes.ReadOnly;
805

    
806
                    }
807

    
808

    
809
                    Log.DebugFormat("Deleting {0}", item.FullName);
810

    
811
                    var directory = item as DirectoryInfo;
812
                    if (directory != null)
813
                        directory.Delete(true);
814
                    else
815
                        item.Delete();
816
                    Log.DebugFormat("Deleted [{0}] for [{1}]", item.FullName, objectInfo.Uri);
817
                    DateTime lastDate;
818
                    _lastSeen.TryRemove(item.FullName, out lastDate);
819
                    deletedFiles.Add(item);
820
                }
821
                StatusKeeper.SetFileState(item.FullName, FileStatus.Deleted, FileOverlayStatus.Deleted, "File Deleted");
822
            }
823
            Log.InfoFormat("[{0}] files were deleted", deletedFiles.Count);
824
            StatusNotification.NotifyForFiles(deletedFiles, String.Format("{0} files were deleted", deletedFiles.Count),
825
                                              TraceLevel.Info);
826

    
827
        }
828

    
829
        private void MarkSuspectedDeletes(AccountInfo accountInfo, IEnumerable<ObjectInfo> cloudFiles)
830
        {
831
//Only consider files that are not being modified, ie they are in the Unchanged state            
832
            var deleteCandidates = FileState.Queryable.Where(state =>
833
                                                             state.FilePath.StartsWith(accountInfo.AccountPath)
834
                                                             && state.FileStatus == FileStatus.Unchanged).ToList();
835

    
836

    
837
            //TODO: filesToDelete must take into account the Others container            
838
            var filesToDelete = (from deleteCandidate in deleteCandidates
839
                                 let localFile = FileInfoExtensions.FromPath(deleteCandidate.FilePath)
840
                                 let relativeFilePath = localFile.AsRelativeTo(accountInfo.AccountPath)
841
                                 where
842
                                     !cloudFiles.Any(r => r.RelativeUrlToFilePath(accountInfo.UserName) == relativeFilePath)
843
                                 select localFile).ToList();
844

    
845

    
846
            //Set the status of missing files to Conflict
847
            foreach (var item in filesToDelete)
848
            {
849
                //Try to acquire a gate on the file, to take into account files that have been dequeued
850
                //and are being processed
851
                using (var gate = NetworkGate.Acquire(item.FullName, NetworkOperation.Deleting))
852
                {
853
                    if (gate.Failed)
854
                        continue;
855
                    StatusKeeper.SetFileState(item.FullName, FileStatus.Conflict, FileOverlayStatus.Deleted,
856
                                              "Local file missing from server");
857
                }
858
            }
859
            UpdateStatus(PithosStatus.HasConflicts);
860
            StatusNotification.NotifyConflicts(filesToDelete,
861
                                               String.Format(
862
                                                   "{0} local files are missing from Pithos, possibly because they were deleted",
863
                                                   filesToDelete.Count));
864
            StatusNotification.NotifyForFiles(filesToDelete, String.Format("{0} files were deleted", filesToDelete.Count),
865
                                              TraceLevel.Info);
866
        }
867

    
868
        private void ReportConflictForMismatch(string localFilePath)
869
        {
870
            if (String.IsNullOrWhiteSpace(localFilePath))
871
                throw new ArgumentNullException("localFilePath");
872
            Contract.EndContractBlock();
873

    
874
            StatusKeeper.SetFileState(localFilePath, FileStatus.Conflict, FileOverlayStatus.Conflict, "File changed at the server");
875
            UpdateStatus(PithosStatus.HasConflicts);
876
            var message = String.Format("Conflict detected for file {0}", localFilePath);
877
            Log.Warn(message);
878
            StatusNotification.NotifyChange(message, TraceLevel.Warning);
879
        }
880

    
881

    
882

    
883
        /// <summary>
884
        /// Creates a Sync action for each changed server file
885
        /// </summary>
886
        /// <param name="accountInfo"></param>
887
        /// <param name="changes"></param>
888
        /// <returns></returns>
889
        private IEnumerable<CloudAction> ChangesToActions(AccountInfo accountInfo, IEnumerable<ObjectInfo> changes)
890
        {
891
            if (changes == null)
892
                throw new ArgumentNullException();
893
            Contract.EndContractBlock();
894
            var fileAgent = FileAgent.GetFileAgent(accountInfo);
895

    
896
            //In order to avoid multiple iterations over the files, we iterate only once
897
            //over the remote files
898
            foreach (var objectInfo in changes)
899
            {
900
                var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
901
                //If a directory object already exists, we may need to sync it
902
                if (fileAgent.Exists(relativePath))
903
                {
904
                    var localFile = fileAgent.GetFileSystemInfo(relativePath);
905
                    //We don't need to sync directories
906
                    if (objectInfo.IsDirectory && localFile is DirectoryInfo)
907
                        continue;
908
                    using (new SessionScope(FlushAction.Never))
909
                    {
910
                        var state = StatusKeeper.GetStateByFilePath(localFile.FullName);
911
                        _lastSeen[localFile.FullName] = DateTime.Now;
912
                        //Common files should be checked on a per-case basis to detect differences, which is newer
913

    
914
                        yield return new CloudAction(accountInfo, CloudActionType.MustSynch,
915
                                                     localFile, objectInfo, state, accountInfo.BlockSize,
916
                                                     accountInfo.BlockHash,"Poll Changes");
917
                    }
918
                }
919
                else
920
                {
921
                    //Remote files should be downloaded
922
                    yield return new CloudDownloadAction(accountInfo, objectInfo,"Poll Changes");
923
                }
924
            }
925
        }
926

    
927
        /// <summary>
928
        /// Creates a Local Move action for each moved server file
929
        /// </summary>
930
        /// <param name="accountInfo"></param>
931
        /// <param name="moves"></param>
932
        /// <returns></returns>
933
        private IEnumerable<CloudAction> MovesToActions(AccountInfo accountInfo, IEnumerable<ObjectInfo> moves)
934
        {
935
            if (moves == null)
936
                throw new ArgumentNullException();
937
            Contract.EndContractBlock();
938
            var fileAgent = FileAgent.GetFileAgent(accountInfo);
939

    
940
            //In order to avoid multiple iterations over the files, we iterate only once
941
            //over the remote files
942
            foreach (var objectInfo in moves)
943
            {
944
                var previousRelativepath = objectInfo.Previous.RelativeUrlToFilePath(accountInfo.UserName);
945
                //If the previous file already exists, we can execute a Move operation
946
                if (fileAgent.Exists(previousRelativepath))
947
                {
948
                    var previousFile = fileAgent.GetFileSystemInfo(previousRelativepath);
949
                    using (new SessionScope(FlushAction.Never))
950
                    {
951
                        var state = StatusKeeper.GetStateByFilePath(previousFile.FullName);
952
                        _lastSeen[previousFile.FullName] = DateTime.Now;
953

    
954
                        //For each moved object we need to move both the local file and update                                                
955
                        yield return new CloudAction(accountInfo, CloudActionType.RenameLocal,
956
                                                     previousFile, objectInfo, state, accountInfo.BlockSize,
957
                                                     accountInfo.BlockHash,"Poll Moves");
958
                        //For modified files, we need to download the changes as well
959
                        if (objectInfo.X_Object_Hash != objectInfo.PreviousHash)
960
                            yield return new CloudDownloadAction(accountInfo,objectInfo, "Poll Moves");
961
                    }
962
                }
963
                //If the previous file does not exist, we need to download it in the new location
964
                else
965
                {
966
                    //Remote files should be downloaded
967
                    yield return new CloudDownloadAction(accountInfo, objectInfo, "Poll Moves");
968
                }
969
            }
970
        }
971

    
972

    
973
        /// <summary>
974
        /// Creates a download action for each new server file
975
        /// </summary>
976
        /// <param name="accountInfo"></param>
977
        /// <param name="creates"></param>
978
        /// <returns></returns>
979
        private IEnumerable<CloudAction> CreatesToActions(AccountInfo accountInfo, IEnumerable<ObjectInfo> creates)
980
        {
981
            if (creates == null)
982
                throw new ArgumentNullException();
983
            Contract.EndContractBlock();
984
            var fileAgent = FileAgent.GetFileAgent(accountInfo);
985

    
986
            //In order to avoid multiple iterations over the files, we iterate only once
987
            //over the remote files
988
            foreach (var objectInfo in creates)
989
            {
990
                if (Log.IsDebugEnabled)
991
                    Log.DebugFormat("[NEW INFO] {0}",objectInfo.Uri);
992

    
993
                var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
994

    
995
                //If the object already exists, we should check before uploading or downloading
996
                if (fileAgent.Exists(relativePath))
997
                {
998
                    var localFile= fileAgent.GetFileSystemInfo(relativePath);
999
                    var state = StatusKeeper.GetStateByFilePath(localFile.WithProperCapitalization().FullName);
1000
                    yield return new CloudAction(accountInfo, CloudActionType.MustSynch,
1001
                                                     localFile, objectInfo, state, accountInfo.BlockSize,
1002
                                                     accountInfo.BlockHash,"Poll Creates");                    
1003
                }
1004
                else
1005
                {
1006
                    //Remote files should be downloaded
1007
                    yield return new CloudDownloadAction(accountInfo, objectInfo,"Poll Creates");
1008
                }
1009

    
1010
            }
1011
        }
1012

    
1013
        /// <summary>
1014
        /// Notify the UI to update the visual status
1015
        /// </summary>
1016
        /// <param name="status"></param>
1017
        private void UpdateStatus(PithosStatus status)
1018
        {
1019
            try
1020
            {
1021
                StatusNotification.SetPithosStatus(status);
1022
                //StatusNotification.Notify(new Notification());
1023
            }
1024
            catch (Exception exc)
1025
            {
1026
                //Failure is not critical, just log it
1027
                Log.Warn("Error while updating status", exc);
1028
            }
1029
        }
1030

    
1031
        private static void CreateContainerFolders(AccountInfo accountInfo, IEnumerable<ContainerInfo> containers)
1032
        {
1033
            var containerPaths = from container in containers
1034
                                 let containerPath = Path.Combine(accountInfo.AccountPath, container.Name)
1035
                                 where container.Name != FolderConstants.TrashContainer && !Directory.Exists(containerPath)
1036
                                 select containerPath;
1037

    
1038
            foreach (var path in containerPaths)
1039
            {
1040
                Directory.CreateDirectory(path);
1041
            }
1042
        }
1043

    
1044
        public void AddAccount(AccountInfo accountInfo)
1045
        {
1046
            //Avoid adding a duplicate accountInfo
1047
            _accounts.TryAdd(accountInfo.AccountKey, accountInfo);
1048
        }
1049

    
1050
        public void RemoveAccount(AccountInfo accountInfo)
1051
        {
1052
            AccountInfo account;
1053
            _accounts.TryRemove(accountInfo.AccountKey, out account);
1054
/*
1055
            SnapshotDifferencer differencer;
1056
            _differencer.Differencers.TryRemove(accountInfo.AccountKey, out differencer);
1057
*/
1058
        }
1059

    
1060
        public void SetSelectivePaths(AccountInfo accountInfo,Uri[] added, Uri[] removed)
1061
        {
1062
            AbortRemovedPaths(accountInfo,removed);
1063
            DownloadNewPaths(accountInfo,added);
1064
        }
1065

    
1066
        private void DownloadNewPaths(AccountInfo accountInfo, Uri[] added)
1067
        {
1068
            var client = new CloudFilesClient(accountInfo);
1069
            foreach (var folderUri in added)
1070
            {
1071
                try
1072
                {
1073

    
1074
                    string account;
1075
                    string container;
1076
                    var segmentsCount = folderUri.Segments.Length;
1077
                    //Is this an account URL?
1078
                    if (segmentsCount < 3)
1079
                        continue;
1080
                    //Is this a container or  folder URL?
1081
                    if (segmentsCount == 3)
1082
                    {
1083
                        account = folderUri.Segments[1].TrimEnd('/');
1084
                        container = folderUri.Segments[2].TrimEnd('/');
1085
                    }
1086
                    else
1087
                    {
1088
                        account = folderUri.Segments[2].TrimEnd('/');
1089
                        container = folderUri.Segments[3].TrimEnd('/');
1090
                    }
1091
                    IList<ObjectInfo> items;
1092
                    if (segmentsCount > 3)
1093
                    {
1094
                        //List folder
1095
                        var folder = String.Join("", folderUri.Segments.Splice(4));
1096
                        items = client.ListObjects(account, container, folder);
1097
                    }
1098
                    else
1099
                    {
1100
                        //List container
1101
                        items = client.ListObjects(account, container);
1102
                    }
1103
                    var actions = CreatesToActions(accountInfo, items);
1104
                    foreach (var action in actions)
1105
                    {
1106
                        NetworkAgent.Post(action);
1107
                    }
1108
                }
1109
                catch (Exception exc)
1110
                {
1111
                    Log.WarnFormat("Listing of new selective path [{0}] failed with \r\n{1}", folderUri, exc);
1112
                }
1113
            }
1114

    
1115
            //Need to get a listing of each of the URLs, then post them to the NetworkAgent
1116
            //CreatesToActions(accountInfo,)
1117

    
1118
/*            NetworkAgent.Post();*/
1119
        }
1120

    
1121
        private void AbortRemovedPaths(AccountInfo accountInfo, Uri[] removed)
1122
        {
1123
            /*this.NetworkAgent.*/
1124
        }
1125
    }
1126
}