Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / PollAgent.cs @ 1cc1e8c5

History | View | Annotate | Download (47.7 kB)

1
#region
2
/* -----------------------------------------------------------------------
3
 * <copyright file="PollAgent.cs" company="GRNet">
4
 * 
5
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
6
 *
7
 * Redistribution and use in source and binary forms, with or
8
 * without modification, are permitted provided that the following
9
 * conditions are met:
10
 *
11
 *   1. Redistributions of source code must retain the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer.
14
 *
15
 *   2. Redistributions in binary form must reproduce the above
16
 *      copyright notice, this list of conditions and the following
17
 *      disclaimer in the documentation and/or other materials
18
 *      provided with the distribution.
19
 *
20
 *
21
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
22
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
25
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
28
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32
 * POSSIBILITY OF SUCH DAMAGE.
33
 *
34
 * The views and conclusions contained in the software and
35
 * documentation are those of the authors and should not be
36
 * interpreted as representing official policies, either expressed
37
 * or implied, of GRNET S.A.
38
 * </copyright>
39
 * -----------------------------------------------------------------------
40
 */
41
#endregion
42

    
43
using System.Collections.Concurrent;
44
using System.ComponentModel.Composition;
45
using System.Diagnostics;
46
using System.Diagnostics.Contracts;
47
using System.IO;
48
using System.Linq.Expressions;
49
using System.Reflection;
50
using System.Security.Cryptography;
51
using System.Threading;
52
using System.Threading.Tasks;
53
using Castle.ActiveRecord;
54
using Pithos.Interfaces;
55
using Pithos.Network;
56
using log4net;
57

    
58
namespace Pithos.Core.Agents
59
{
60
    using System;
61
    using System.Collections.Generic;
62
    using System.Linq;
63

    
64
    /// <summary>
    /// Collects what is known about a single path during a poll: the local file system entry,
    /// the stored <see cref="FileState"/> and the server's <see cref="ObjectInfo"/>.
    /// The short property names C, L and S are the ones used by the debugger display and the sync log.
    /// </summary>
    [DebuggerDisplay("{FilePath} C:{C} L:{L} S:{S}")]
    public class StateTuple
    {
        private FileSystemInfo _fileInfo;

        /// <summary>
        /// Full path of the item. It is assigned whenever <see cref="FileInfo"/> is set.
        /// </summary>
        public string FilePath { get; private set; }

        /// <summary>
        /// Hash string supplied together with the local file when the tuple is built
        /// </summary>
        public string MD5 { get; set; }

        /// <summary>
        /// The checksum stored in <see cref="FileState"/>, or null if there is no stored state
        /// </summary>
        public string L
        {
            get { return FileState == null ? null : FileState.Checksum; }
        }

        /// <summary>
        /// The hash calculated for the current local item
        /// </summary>
        public string C { get; set; }

        /// <summary>
        /// The X_Object_Hash of <see cref="ObjectInfo"/>, or null if there is no server object
        /// </summary>
        public string S
        {
            get { return ObjectInfo == null ? null : ObjectInfo.X_Object_Hash; }
        }

        /// <summary>
        /// The local file system entry. Setting it also updates <see cref="FilePath"/>.
        /// </summary>
        public FileSystemInfo FileInfo
        {
            get { return _fileInfo; }
            set
            {
                _fileInfo = value;
                //A null entry clears the path. Dereferencing it would throw AFTER the field
                //was already replaced, leaving the tuple in an inconsistent state
                FilePath = value == null ? null : value.FullName;
            }
        }

        public FileState FileState { get; set; }

        public ObjectInfo ObjectInfo { get; set; }

        public StateTuple() { }

        /// <summary>
        /// Creates a tuple for a local file system entry
        /// </summary>
        /// <param name="info">The local file or folder</param>
        public StateTuple(FileSystemInfo info)
        {
            FileInfo = info;
        }
    }
106

    
107

    
108
    /// <summary>
109
    /// PollAgent periodically polls the server to detect object changes. The agent retrieves a listing of all
110
    /// objects and compares it with a previously cached version to detect differences. 
111
    /// New files are downloaded, missing files are deleted from the local file system and common files are compared
112
    /// to determine the appropriate action
113
    /// </summary>
114
    [Export]
115
    public class PollAgent
116
    {
117
        private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
118

    
119
        [System.ComponentModel.Composition.Import]
120
        public IStatusKeeper StatusKeeper { get; set; }
121

    
122
        [System.ComponentModel.Composition.Import]
123
        public IPithosSettings Settings { get; set; }
124

    
125
        [System.ComponentModel.Composition.Import]
126
        public NetworkAgent NetworkAgent { get; set; }
127

    
128
        [System.ComponentModel.Composition.Import]
129
        public Selectives Selectives { get; set; }
130

    
131
        public IStatusNotification StatusNotification { get; set; }
132

    
133
        private CancellationTokenSource _currentOperationCancellation = new CancellationTokenSource();
134

    
135
        /// <summary>
        /// Cancels the current upload/download by replacing the cancellation source
        /// and cancelling the one that was in use until now.
        /// </summary>
        public void CancelCurrentOperation()
        {
            //Cancelling makes the current operation throw a cancellation exception.
            //Operations that throw are retried by default, which is NOT what should
            //happen to a cancelled operation: the cancellation exception has to be
            //caught and the retry avoided.

            //A cancellation source can't be reset, so a new one takes its place.
            //The swap is atomic to prevent an operation from requesting a token from the old source
            var replacement = new CancellationTokenSource();
            var cancelledSource = Interlocked.Exchange(ref _currentOperationCancellation, replacement);
            cancelledSource.Cancel();
        }
155

    
156
        /// <summary>
        /// Gets or sets the paused flag. Setting it to true resets the un-pause event,
        /// setting it to false signals the event.
        /// </summary>
        public bool Pause
        {
            get { return _pause; }
            set
            {
                _pause = value;
                if (_pause)
                {
                    _unPauseEvent.Reset();
                }
                else
                {
                    _unPauseEvent.Set();
                }
            }
        }
171

    
172
        private bool _firstPoll = true;
173

    
174
        //The Sync Event signals a manual synchronisation
175
        private readonly AsyncManualResetEvent _syncEvent = new AsyncManualResetEvent();
176

    
177
        private readonly AsyncManualResetEvent _unPauseEvent = new AsyncManualResetEvent(true);
178

    
179
        private readonly ConcurrentDictionary<string, DateTime> _lastSeen = new ConcurrentDictionary<string, DateTime>();
180
        private readonly ConcurrentDictionary<Uri, AccountInfo> _accounts = new ConcurrentDictionary<Uri,AccountInfo>();
181

    
182

    
183
        /// <summary>
        /// Requests a manual synchronization by signalling the sync event
        /// </summary>
        public void SynchNow()
        {
            _syncEvent.Set();
        }
190

    
191

    
192
        /// <summary>
        /// Polls all registered accounts, waits for the polling interval or a manual sync request
        /// and then schedules the next poll.
        /// </summary>
        /// <param name="since">The timestamp handed to ProcessAccountFiles for every account</param>
        /// <returns>A task that completes once the next poll has been scheduled</returns>
        public async Task PollRemoteFiles(DateTime? since = null)
        {
            if (Log.IsDebugEnabled)
            {
                Log.DebugFormat("Polling changes after [{0}]",since);
            }

            Debug.Assert(Thread.CurrentThread.IsBackground, "Polling Ended up in the main thread!");

            using (ThreadContext.Stacks["Retrieve Remote"].Push("All accounts"))
            {
                //Starts as the current value, so that a failed poll is retried with the same timestamp
                var nextSince = since;
                try
                {
                    await _unPauseEvent.WaitAsync();
                    UpdateStatus(PithosStatus.PollSyncing);

                    var accountPolls = _accounts.Values
                        .Select(account => ProcessAccountFiles(account, since))
                        .ToList();

                    var accountTimes = await TaskEx.WhenAll(accountPolls);

                    _firstPoll = false;

                    //The next poll uses the smallest timestamp returned by the accounts
                    if (accountTimes.Length > 0)
                    {
                        nextSince = accountTimes.Min();
                    }
                    if (Log.IsDebugEnabled)
                    {
                        Log.DebugFormat("Next Poll at [{0}]",nextSince);
                    }
                }
                catch (Exception ex)
                {
                    //nextSince still holds the original value at this point
                    Log.ErrorFormat("Error while processing accounts\r\n{0}", ex);
                }

                UpdateStatus(PithosStatus.PollComplete);

                //A second try block is needed because an await call can't appear inside a finally block
                //TODO: Find a more elegant solution for rescheduling in the event of an exception
                try
                {
                    //Returns when the polling interval passes or the Sync event is signalled
                    nextSince = await WaitForScheduledOrManualPoll(nextSince);
                }
                finally
                {
                    //The next poll is scheduled even if waiting failed
                    TaskEx.Run(() => PollRemoteFiles(nextSince));
                }
            }
        }
250

    
251
        /// <summary>
        /// Waits for the polling period to expire or for a manual sync request, then waits
        /// until polling is un-paused and the network agent allows processing to proceed.
        /// </summary>
        /// <param name="since">The timestamp to use for the next scheduled poll</param>
        /// <returns>
        /// null if the sync event was signalled, so that the "since" value is ignored,
        /// otherwise the unmodified <paramref name="since"/> value
        /// </returns>
        private async Task<DateTime?> WaitForScheduledOrManualPoll(DateTime? since)
        {
            var sync = _syncEvent.WaitAsync();
            var wait = TaskEx.Delay(TimeSpan.FromSeconds(Settings.PollingInterval), NetworkAgent.CancellationToken);

            var signaledTask = await TaskEx.WhenAny(sync, wait);

            //Pausing takes precedence over manual sync or awaiting.
            //The event is awaited, as everywhere else in this class, instead of blocking
            //a thread for as long as polling remains paused
            await _unPauseEvent.WaitAsync();

            //Wait for network processing to finish before polling
            var pauseTask = NetworkAgent.ProceedEvent.WaitAsync();
            await TaskEx.WhenAll(signaledTask, pauseTask);

            //If polling is signalled by SynchNow, ignore the since tag
            if (sync.IsCompleted)
            {
                //TODO: Must convert to AutoReset
                _syncEvent.Reset();
                return null;
            }
            return since;
        }
279

    
280
        /// <summary>
        /// Polls a single account: lists the objects of its containers and the shared objects,
        /// merges them with the local files and the stored file states and synchronizes each item.
        /// </summary>
        /// <param name="accountInfo">The account to poll. Its AccountPath must not be empty</param>
        /// <param name="since">The timestamp passed to the object listing calls</param>
        /// <returns>
        /// The "since" value for the next poll. If listing or processing fails, the original
        /// <paramref name="since"/> is returned so that the next poll retries with the same value
        /// </returns>
        /// <exception cref="ArgumentNullException">accountInfo is null</exception>
        /// <exception cref="ArgumentException">accountInfo.AccountPath is null, empty or whitespace</exception>
        public async Task<DateTime?> ProcessAccountFiles(AccountInfo accountInfo, DateTime? since = null)
        {
            if (accountInfo == null)
                throw new ArgumentNullException("accountInfo");
            if (String.IsNullOrWhiteSpace(accountInfo.AccountPath))
                throw new ArgumentException("The AccountInfo.AccountPath is empty", "accountInfo");
            Contract.EndContractBlock();

            using (ThreadContext.Stacks["Retrieve Remote"].Push(accountInfo.UserName))
            {
                await NetworkAgent.GetDeleteAwaiter();

                Log.Info("Scheduled");
                var client = new CloudFilesClient(accountInfo);

                //We don't need to check the trash container
                var containers = client.ListContainers(accountInfo.UserName)
                    .Where(c=>c.Name!="trash")
                    .ToList();

                CreateContainerFolders(accountInfo, containers);

                //The nextSince fallback time is the same as the current.
                //If polling succeeds, the next Since time will be the smallest of the maximum modification times
                //of the shared and account objects
                var nextSince = since;

                try
                {
                    //Wait for any deletions to finish
                    await NetworkAgent.GetDeleteAwaiter();
                    //Get the poll time now. We may miss some deletions but it's better to keep a file that was deleted
                    //than delete a file that was created while we were executing the poll

                    //Get the list of server objects changed since the last check
                    //The name of the container is passed as state in order to create a dictionary of tasks in a subsequent step
                    var listObjects = (from container in containers
                                       select Task<IList<ObjectInfo>>.Factory.StartNew(_ =>
                                             client.ListObjects(accountInfo.UserName, container.Name, since), container.Name)).ToList();

                    var listShared = Task<IList<ObjectInfo>>.Factory.StartNew(_ =>
                        client.ListSharedObjects(since), "shared");
                    listObjects.Add(listShared);
                    var listTasks = await Task.Factory.WhenAll(listObjects.ToArray());

                    using (ThreadContext.Stacks["SCHEDULE"].Push("Process Results"))
                    {
                        var dict = listTasks.ToDictionary(t => t.AsyncState);

                        //Get all non-trash objects. Remember, the container name is stored in AsyncState
                        var remoteObjects = (from objectList in listTasks
                                            where (string)objectList.AsyncState != "trash"
                                            from obj in objectList.Result
                                            orderby obj.Bytes ascending
                                            select obj).ToList();

                        //Get the latest remote object modification date, only if it is after
                        //the original since date
                        nextSince = GetLatestDateAfter(nextSince, remoteObjects);

                        var sharedObjects = dict["shared"].Result;
                        nextSince = GetLatestDateBefore(nextSince, sharedObjects);

                        //DON'T process trashed files
                        //If some files are deleted and added again to a folder, they will be deleted
                        //even though they are new.
                        //We would have to check file dates and hashes to ensure that a trashed file
                        //can be deleted safely from the local hard drive.

                        //Skip ".ignore" objects and anything inside the cache folder
                        var cleanRemotes = (from info in remoteObjects.Union(sharedObjects)
                                            let name = info.Name??""
                                            where !name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase) &&
                                                  !name.StartsWith(FolderConstants.CacheFolder + "/",
                                                                   StringComparison.InvariantCultureIgnoreCase)
                                            select info).ToList();

                        if (_firstPoll)
                            StatusKeeper.CleanupOrphanStates();
                        StatusKeeper.CleanupStaleStates(accountInfo, cleanRemotes);

                        //NOTE(review): the value is not used below. The lookup is kept because the
                        //indexer may throw for an unknown account key - confirm before removing
                        var filterUris = Selectives.SelectiveUris[accountInfo.AccountKey];

                        //Get the local files here
                        var agent = AgentLocator<FileAgent>.Get(accountInfo.AccountPath);

                        var files = LoadLocalFileTuples(accountInfo);

                        var states = FileState.Queryable.ToList();

                        //Pair each remote object with the full local path it maps to
                        var infos = (from remote in cleanRemotes
                                    let path = remote.RelativeUrlToFilePath(accountInfo.UserName)
                                    let info=agent.GetFileSystemInfo(path)
                                    select Tuple.Create(info.FullName,remote))
                                    .ToList();

                        var token = _currentOperationCancellation.Token;

                        var tuples = MergeSources(infos, files, states).ToList();

                        foreach (var tuple in tuples)
                        {
                            await _unPauseEvent.WaitAsync();

                            //Set the Merkle Hash
                            SetMerkleHash(accountInfo, tuple);

                            SyncSingleItem(accountInfo, tuple, agent, token);
                        }

                        Log.Info("[LISTENER] End Processing");
                    }
                }
                catch (Exception ex)
                {
                    Log.ErrorFormat("[FAIL] ListObjects for{0} in ProcessRemoteFiles with {1}", accountInfo.UserName, ex);
                    //nextSince may already have been advanced before the failure.
                    //Fall back to the original value so that the next poll covers the same period again
                    return since;
                }

                Log.Info("[LISTENER] Finished");
                return nextSince;
            }
        }
430

    
431
        /// <summary>
        /// Fills the C hash of a tuple
        /// </summary>
        /// <param name="accountInfo">Supplies the block size and block hash used for the calculation</param>
        /// <param name="tuple">The tuple whose C property is set</param>
        private static void SetMerkleHash(AccountInfo accountInfo, StateTuple tuple)
        {
            //Directories get the Merkle hash of an empty buffer
            if (tuple.FileInfo is DirectoryInfo)
            {
                tuple.C = MERKLE_EMPTY;
                return;
            }

            //When the stored state has the same MD5, reuse its checksum
            //instead of calculating the tree hash again
            var state = tuple.FileState;
            if (state != null && tuple.MD5 == state.ShortHash)
            {
                tuple.C = state.Checksum;
                return;
            }

            var treeHash = Signature.CalculateTreeHash(tuple.FileInfo, accountInfo.BlockSize, accountInfo.BlockHash);
            tuple.C = treeHash.TopHash.ToHashString();
        }
448

    
449
        /// <summary>
        /// Enumerates the local file system entries of an account and pairs each one with a hash:
        /// the MD5 of the contents for files, MERKLE_EMPTY for directories.
        /// </summary>
        /// <param name="accountInfo">The account whose AccountPath is enumerated</param>
        /// <returns>One tuple per entry that could be hashed</returns>
        private static List<Tuple<FileSystemInfo, string>> LoadLocalFileTuples(AccountInfo accountInfo)
        {
            using (ThreadContext.Stacks["Account Files Hashing"].Push(accountInfo.UserName))
            {
                var localInfos = AgentLocator<FileAgent>.Get(accountInfo.AccountPath).EnumerateFileSystemInfos();
                //Use the queue to retry locked file hashing
                var fileQueue = new Queue<FileSystemInfo>(localInfos);

                var results = new List<Tuple<FileSystemInfo, string>>();

                //The hasher is disposable, release it once all files are processed
                using (var hasher = MD5.Create())
                {
                    while (fileQueue.Count > 0)
                    {
                        var file = fileQueue.Dequeue();
                        using (ThreadContext.Stacks["File"].Push(file.FullName))
                        {
                            try
                            {
                                //Replace MD5 here, do the calc while syncing individual files
                                string hash;
                                if (file is DirectoryInfo)
                                    hash = MERKLE_EMPTY;
                                else
                                {
                                    using (var stream = ((FileInfo)file).OpenRead())
                                    {
                                        hash = hasher.ComputeHash(stream).ToHashString();
                                    }
                                }
                                results.Add(Tuple.Create(file, hash));
                            }
                            //Both "not found" exceptions derive from IOException. A file that was deleted
                            //after the enumeration must be skipped, retrying it would never succeed
                            //and the loop would never end
                            catch (FileNotFoundException exc)
                            {
                                Log.WarnFormat("[HASH] File no longer exists, skipping [{0}]", exc);
                            }
                            catch (DirectoryNotFoundException exc)
                            {
                                Log.WarnFormat("[HASH] Folder no longer exists, skipping [{0}]", exc);
                            }
                            catch (IOException exc)
                            {
                                Log.WarnFormat("[HASH] File in use, will retry [{0}]", exc);
                                fileQueue.Enqueue(file);
                            }
                        }
                    }
                }

                return results;
            }
        }
497

    
498
        /// <summary>
        /// Synchronizes a single item by comparing its current local hash (C), the stored
        /// checksum (L) and the server hash (S) and then downloading, uploading, deleting
        /// or reporting a conflict.
        /// </summary>
        /// <param name="accountInfo">The account that owns the item</param>
        /// <param name="tuple">The merged local, stored and server information for the item</param>
        /// <param name="agent">The file agent used for local deletions and selective sync checks</param>
        /// <param name="token">Cancellation token passed to uploads and downloads</param>
        private void SyncSingleItem(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, CancellationToken token)
        {
            Log.DebugFormat("Sync [{0}] C:[{1}] L:[{2}] S:[{3}]",tuple.FilePath,tuple.C,tuple.L,tuple.S);

            var localFilePath = tuple.FilePath;
            //Read the entry again from its path, the one stored in the tuple may have been deleted
            var localInfo = FileInfoExtensions.FromPath(localFilePath);

            //The local item counts as changed when C differs from L, or when C and L match
            //only because the file is missing while a stored state still exists
            var locallyChanged = tuple.C != tuple.L || (!localInfo.Exists && tuple.FileState != null);
            var serverUnchanged = tuple.S == tuple.L;

            if (!locallyChanged)
            {
                //Nothing changed on either side
                if (serverUnchanged)
                    return;
                if (!Selectives.IsSelected(accountInfo, localFilePath))
                    return;

                if (tuple.S == null)
                {
                    //Server file doesn't exist
                    //deleteObjectFromLocal()
                    StatusKeeper.SetFileState(localFilePath, FileStatus.Deleted,
                                              FileOverlayStatus.Deleted, "");
                    agent.Delete(localFilePath);
                    //updateRecord(Remove C, L)
                    StatusKeeper.ClearFileStatus(localFilePath);
                    return;
                }

                //Server file exists
                //downloadServerObject() // Result: L = S
                StatusKeeper.SetFileState(localFilePath, FileStatus.Modified,
                                          FileOverlayStatus.Modified, "");
                var download = NetworkAgent.Downloader.DownloadCloudFile(accountInfo,
                                                                         tuple.ObjectInfo,
                                                                         localFilePath, token);
                download.Wait(token);
                //updateRecord( L = S )
                StatusKeeper.UpdateFileChecksum(localFilePath, tuple.ObjectInfo.ETag,
                                                tuple.ObjectInfo.X_Object_Hash);
                StatusKeeper.SetFileState(localFilePath, FileStatus.Unchanged,
                                          FileOverlayStatus.Normal, "");
                return;
            }

            //Local changes found from this point on

            if (serverUnchanged)
            {
                //The FileAgent selective sync checks for new root folder files
                if (agent.Ignore(localFilePath))
                    return;

                if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null)
                {
                    //deleteObjectFromServer()
                    //updateRecord( Remove L, S)
                    DeleteCloudFile(accountInfo, tuple);
                    return;
                }

                //uploadLocalObject() // Result: S = C, L = S
                var isUnselected = agent.IsUnselectedRootFolder(tuple.FilePath);

                var action = new CloudUploadAction(accountInfo, localInfo, tuple.FileState,
                                                   accountInfo.BlockSize, accountInfo.BlockHash,
                                                   "Poll", isUnselected);
                NetworkAgent.Uploader.UploadCloudFile(action, token).Wait(token);

                //updateRecord( S = C )
                StatusKeeper.SetFileState(localFilePath, FileStatus.Unchanged,
                                          FileOverlayStatus.Normal, "");
                if (isUnselected)
                {
                    ProcessChildren(accountInfo, tuple, agent, token);
                }
                return;
            }

            //Both the local item and the server object changed

            if (!Selectives.IsSelected(accountInfo, localFilePath))
                return;

            if (tuple.C == tuple.S)
            {
                // (Identical Changes) Result: L = S
                StatusKeeper.UpdateFileChecksum(localFilePath, tuple.ObjectInfo.ETag,
                                                tuple.ObjectInfo.X_Object_Hash);
                StatusKeeper.SetFileState(localFilePath, FileStatus.Unchanged,
                                          FileOverlayStatus.Normal, "");
                return;
            }

            if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null)
            {
                //deleteObjectFromServer()
                //updateRecord(Remove L, S)
                DeleteCloudFile(accountInfo, tuple);
            }
            else
            {
                //identifyAsConflict() // Manual action required
                ReportConflictForMismatch(localFilePath);
            }
        }
621

    
622
        /// <summary>
        /// Marks the item as deleted, asks the delete agent to delete the server object
        /// and clears the stored status of the item.
        /// </summary>
        /// <param name="accountInfo">The account that owns the object</param>
        /// <param name="tuple">The tuple that holds the local path and the server object</param>
        private void DeleteCloudFile(AccountInfo accountInfo, StateTuple tuple)
        {
            var path = tuple.FilePath;

            StatusKeeper.SetFileState(path, FileStatus.Deleted, FileOverlayStatus.Deleted, "");
            NetworkAgent.DeleteAgent.DeleteCloudFile(accountInfo, tuple.ObjectInfo);
            StatusKeeper.ClearFileStatus(path);
        }
629

    
630
        /// <summary>
        /// Synchronizes all folders and files found below the folder of a tuple.
        /// </summary>
        /// <param name="accountInfo">The account that owns the folder</param>
        /// <param name="tuple">The tuple of the parent folder. Nothing happens if it isn't a directory</param>
        /// <param name="agent">The file agent passed on to SyncSingleItem</param>
        /// <param name="token">Cancellation token passed on to SyncSingleItem</param>
        private void ProcessChildren(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, CancellationToken token)
        {
            var dirInfo = tuple.FileInfo as DirectoryInfo;
            //Only a directory has children to process
            if (dirInfo == null)
                return;

            var folderTuples = from folder in dirInfo.EnumerateDirectories("*", SearchOption.AllDirectories)
                               select new StateTuple(folder);
            var fileTuples = from file in dirInfo.EnumerateFiles("*", SearchOption.AllDirectories)
                             select new StateTuple(file);

            //A new tuple has no C, L or S value. SyncSingleItem would treat it as unchanged
            //and skip it, so the local hash must be set before each child is synchronized

            //Process folders first, to ensure folders appear on the server as soon as possible
            folderTuples.ApplyAction(t =>
            {
                SetMerkleHash(accountInfo, t);
                SyncSingleItem(accountInfo, t, agent, token);
            });

            fileTuples.ApplyAction(t =>
            {
                SetMerkleHash(accountInfo, t);
                SyncSingleItem(accountInfo, t, agent, token);
            });
        }
644

    
645
        /// <summary>
        /// Combines the local files, the stored file states and the remote objects into one
        /// StateTuple per path. Entries that refer to the same path end up in the same tuple.
        /// </summary>
        /// <param name="infos">Pairs of a local path and the remote object that maps to it</param>
        /// <param name="files">Pairs of a local file system entry and its hash</param>
        /// <param name="states">The stored file states</param>
        /// <returns>The merged tuples</returns>
        private static IEnumerable<StateTuple> MergeSources(
            IEnumerable<Tuple<string, ObjectInfo>> infos, 
            IEnumerable<Tuple<FileSystemInfo, string>> files, 
            IEnumerable<FileState> states)
        {
            var merged = new Dictionary<string, StateTuple>();

            //Local entries come first. Directories always get the empty Merkle hash
            foreach (var file in files)
            {
                var entry = file.Item1;
                string hash;
                if (entry is DirectoryInfo)
                    hash = MERKLE_EMPTY;
                else
                    hash = file.Item2;

                merged[entry.FullName] = new StateTuple {FileInfo = entry, MD5 = hash};
            }

            //Attach each stored state to the tuple of its path, or create a tuple for it
            foreach (var state in states)
            {
                StateTuple existing;
                if (!merged.TryGetValue(state.FilePath, out existing))
                {
                    var entry = FileInfoExtensions.FromPath(state.FilePath);
                    merged[state.FilePath] = new StateTuple {FileInfo = entry, FileState = state};
                    continue;
                }
                existing.FileState = state;
            }

            //Attach each remote object in the same way
            foreach (var info in infos)
            {
                var path = info.Item1;
                var remote = info.Item2;

                StateTuple existing;
                if (!merged.TryGetValue(path, out existing))
                {
                    var entry = FileInfoExtensions.FromPath(path);
                    merged[path] = new StateTuple {FileInfo = entry, ObjectInfo = remote};
                    continue;
                }
                existing.ObjectInfo = remote;
            }

            return merged.Values;
        }
688

    
689
        /// <summary>
        /// Returns the earlier of the threshold and the latest Last_Modified date found in the
        /// list of objects. A value that is null or DateTime.MinValue counts as missing
        /// </summary>
        /// <param name="threshold">The date to compare against, may be null</param>
        /// <param name="cloudObjects">The objects to inspect, may be null or empty</param>
        /// <returns>
        /// The threshold when the list provides no usable date, the latest date when the threshold 
        /// is missing or later than it, otherwise the threshold
        /// </returns>
        private static DateTime? GetLatestDateBefore(DateTime? threshold, IList<ObjectInfo> cloudObjects)
        {
            DateTime? latest = null;
            var hasObjects = cloudObjects != null && cloudObjects.Count > 0;
            if (hasObjects)
            {
                latest = cloudObjects.Max(obj => obj.Last_Modified);
            }

            //Without a usable date from the list, the threshold is the only candidate
            if (latest == null || latest == DateTime.MinValue)
            {
                return threshold;
            }

            var thresholdMissing = threshold == null || threshold == DateTime.MinValue;
            return (thresholdMissing || threshold > latest) ? latest : threshold;
        }
707

    
708
        /// <summary>
        /// Returns the later of the threshold and the latest Last_Modified date found in the
        /// list of objects. A value that is null or DateTime.MinValue counts as missing
        /// </summary>
        /// <param name="threshold">The date to compare against, may be null</param>
        /// <param name="cloudObjects">The objects to inspect, may be null or empty</param>
        /// <returns>
        /// The threshold when the list provides no usable date, the latest date when the threshold 
        /// is missing or earlier than it, otherwise the threshold
        /// </returns>
        private static DateTime? GetLatestDateAfter(DateTime? threshold, IList<ObjectInfo> cloudObjects)
        {
            DateTime? latest = null;
            var hasObjects = cloudObjects != null && cloudObjects.Count > 0;
            if (hasObjects)
            {
                latest = cloudObjects.Max(obj => obj.Last_Modified);
            }

            //Without a usable date from the list, the threshold is the only candidate
            if (latest == null || latest == DateTime.MinValue)
            {
                return threshold;
            }

            var thresholdMissing = threshold == null || threshold == DateTime.MinValue;
            return (thresholdMissing || threshold < latest) ? latest : threshold;
        }
726

    
727
        //readonly AccountsDifferencer _differencer = new AccountsDifferencer();

        //NOTE(review): presumably maps a root Uri to its selected Uris - confirm against the code that fills it
        private Dictionary<Uri, List<Uri>> _selectiveUris = new Dictionary<Uri, List<Uri>>();
        private bool _pause;

        //SHA-256 digest of empty input. MergeSources uses it as the hash of every directory.
        //Declared readonly because the value is a fixed constant that must never be reassigned
        private static readonly string MERKLE_EMPTY = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
731

    
732
        /// <summary>
        /// Deletes the local file or folder that corresponds to each of the supplied cloud objects,
        /// marks each one as Deleted in the status store and reports how many items were removed
        /// </summary>
        /// <param name="accountInfo">The account that owns the files. Its AccountPath must not be empty</param>
        /// <param name="cloudFiles">The cloud objects whose local counterparts must be deleted</param>
        /// <exception cref="ArgumentNullException">accountInfo or cloudFiles is null</exception>
        /// <exception cref="ArgumentException">The AccountPath of accountInfo is empty</exception>
        private void ProcessDeletedFiles(AccountInfo accountInfo, IEnumerable<ObjectInfo> cloudFiles)
        {
            if (accountInfo == null)
                throw new ArgumentNullException("accountInfo");
            if (String.IsNullOrWhiteSpace(accountInfo.AccountPath))
                throw new ArgumentException("The AccountInfo.AccountPath is empty", "accountInfo");
            if (cloudFiles == null)
                throw new ArgumentNullException("cloudFiles");
            Contract.EndContractBlock();

            //The agent depends only on the account, so look it up once instead of once per object
            var fileAgent = FileAgent.GetFileAgent(accountInfo);

            var deletedFiles = new List<FileSystemInfo>();
            foreach (var objectInfo in cloudFiles)
            {
                if (Log.IsDebugEnabled)
                    Log.DebugFormat("Handle deleted [{0}]", objectInfo.Uri);
                var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
                var item = fileAgent.GetFileSystemInfo(relativePath);
                if (Log.IsDebugEnabled)
                    Log.DebugFormat("Will delete [{0}] for [{1}]", item.FullName, objectInfo.Uri);
                if (item.Exists)
                {
                    //Clear the read-only flag before attempting the delete
                    if ((item.Attributes & FileAttributes.ReadOnly) == FileAttributes.ReadOnly)
                    {
                        item.Attributes = item.Attributes & ~FileAttributes.ReadOnly;
                    }

                    if (Log.IsDebugEnabled)
                        Log.DebugFormat("Deleting {0}", item.FullName);

                    //Folders are deleted together with their contents
                    var directory = item as DirectoryInfo;
                    if (directory != null)
                        directory.Delete(true);
                    else
                        item.Delete();
                    if (Log.IsDebugEnabled)
                        Log.DebugFormat("Deleted [{0}] for [{1}]", item.FullName, objectInfo.Uri);
                    DateTime lastDate;
                    _lastSeen.TryRemove(item.FullName, out lastDate);
                    deletedFiles.Add(item);
                }
                //The state is set to Deleted even if the local item was already missing
                StatusKeeper.SetFileState(item.FullName, FileStatus.Deleted, FileOverlayStatus.Deleted, "File Deleted");
            }
            Log.InfoFormat("[{0}] files were deleted", deletedFiles.Count);
            StatusNotification.NotifyForFiles(deletedFiles, String.Format("{0} files were deleted", deletedFiles.Count),
                                              TraceLevel.Info);
        }
784

    
785
        /// <summary>
        /// Finds local files in the Unchanged state that have no counterpart among the supplied
        /// cloud objects, marks them as conflicts and notifies the user about them
        /// </summary>
        /// <param name="accountInfo">The account whose AccountPath and UserName scope the comparison</param>
        /// <param name="cloudFiles">The objects currently known to exist on the server</param>
        private void MarkSuspectedDeletes(AccountInfo accountInfo, IEnumerable<ObjectInfo> cloudFiles)
        {
            //Only consider files that are not being modified, ie they are in the Unchanged state
            var deleteCandidates = FileState.Queryable.Where(state =>
                                                             state.FilePath.StartsWith(accountInfo.AccountPath)
                                                             && state.FileStatus == FileStatus.Unchanged).ToList();

            //Convert the server objects to relative file paths once, so that each candidate needs 
            //a single set lookup instead of a scan over the entire server listing
            var cloudPaths = new HashSet<string>(
                cloudFiles.Select(r => r.RelativeUrlToFilePath(accountInfo.UserName)));

            //TODO: filesToDelete must take into account the Others container
            var filesToDelete = (from deleteCandidate in deleteCandidates
                                 let localFile = FileInfoExtensions.FromPath(deleteCandidate.FilePath)
                                 let relativeFilePath = localFile.AsRelativeTo(accountInfo.AccountPath)
                                 where !cloudPaths.Contains(relativeFilePath)
                                 select localFile).ToList();

            //When nothing is missing there is no conflict to flag and nothing to report
            if (filesToDelete.Count == 0)
                return;

            //Set the status of missing files to Conflict
            foreach (var item in filesToDelete)
            {
                //Try to acquire a gate on the file, to take into account files that have been dequeued
                //and are being processed
                using (var gate = NetworkGate.Acquire(item.FullName, NetworkOperation.Deleting))
                {
                    if (gate.Failed)
                        continue;
                    StatusKeeper.SetFileState(item.FullName, FileStatus.Conflict, FileOverlayStatus.Deleted,
                                              "Local file missing from server");
                }
            }
            UpdateStatus(PithosStatus.HasConflicts);
            StatusNotification.NotifyConflicts(filesToDelete,
                                               String.Format(
                                                   "{0} local files are missing from Pithos, possibly because they were deleted",
                                                   filesToDelete.Count));
            StatusNotification.NotifyForFiles(filesToDelete, String.Format("{0} files were deleted", filesToDelete.Count),
                                              TraceLevel.Info);
        }
823

    
824
        /// <summary>
        /// Marks a local file as conflicting, switches the overall status to HasConflicts
        /// and both logs and notifies a warning
        /// </summary>
        /// <param name="localFilePath">Path of the conflicting local file. Must not be blank</param>
        /// <exception cref="ArgumentNullException">localFilePath is null, empty or whitespace</exception>
        private void ReportConflictForMismatch(string localFilePath)
        {
            if (String.IsNullOrWhiteSpace(localFilePath))
            {
                throw new ArgumentNullException("localFilePath");
            }
            Contract.EndContractBlock();

            //The same text is used for the log entry and for the notification
            var warning = String.Format("Conflict detected for file {0}", localFilePath);

            StatusKeeper.SetFileState(localFilePath, FileStatus.Conflict, FileOverlayStatus.Conflict, "File changed at the server");
            UpdateStatus(PithosStatus.HasConflicts);

            Log.Warn(warning);
            StatusNotification.NotifyChange(warning, TraceLevel.Warning);
        }
836

    
837

    
838

    
839
        /// <summary>
        /// Creates a Sync action for each changed server file
        /// </summary>
        /// <remarks>
        /// This is an iterator: under standard iterator semantics the body, including the null
        /// check, runs only once the result is enumerated
        /// </remarks>
        /// <param name="accountInfo">The account the changed objects belong to</param>
        /// <param name="changes">The server objects that were reported as changed</param>
        /// <returns>
        /// A MustSynch action for each object that already exists locally and a download action 
        /// for each object that does not. Folders that already exist locally yield nothing
        /// </returns>
        /// <exception cref="ArgumentNullException">changes is null</exception>
        private IEnumerable<CloudAction> ChangesToActions(AccountInfo accountInfo, IEnumerable<ObjectInfo> changes)
        {
            if (changes == null)
                throw new ArgumentNullException("changes");
            Contract.EndContractBlock();
            var fileAgent = FileAgent.GetFileAgent(accountInfo);

            //In order to avoid multiple iterations over the files, we iterate only once
            //over the remote files
            foreach (var objectInfo in changes)
            {
                var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
                //If a local item already exists at that path, we may need to sync it
                if (fileAgent.Exists(relativePath))
                {
                    var localFile = fileAgent.GetFileSystemInfo(relativePath);
                    //We don't need to sync directories
                    if (objectInfo.IsDirectory && localFile is DirectoryInfo)
                        continue;
                    using (new SessionScope(FlushAction.Never))
                    {
                        var state = StatusKeeper.GetStateByFilePath(localFile.FullName);
                        _lastSeen[localFile.FullName] = DateTime.Now;
                        //Common files should be checked on a per-case basis to detect differences, which is newer

                        yield return new CloudAction(accountInfo, CloudActionType.MustSynch,
                                                     localFile, objectInfo, state, accountInfo.BlockSize,
                                                     accountInfo.BlockHash,"Poll Changes");
                    }
                }
                else
                {
                    //Remote files should be downloaded
                    yield return new CloudDownloadAction(accountInfo, objectInfo,"Poll Changes");
                }
            }
        }
882

    
883
        /// <summary>
        /// Creates a Local Move action for each moved server file
        /// </summary>
        /// <remarks>
        /// This is an iterator: under standard iterator semantics the body, including the null
        /// check, runs only once the result is enumerated
        /// </remarks>
        /// <param name="accountInfo">The account the moved objects belong to</param>
        /// <param name="moves">The moved server objects. The Previous property of each one is read</param>
        /// <returns>
        /// A RenameLocal action, followed by a download action if the hash changed, for each object 
        /// whose previous file exists locally. A download action alone for every other object
        /// </returns>
        /// <exception cref="ArgumentNullException">moves is null</exception>
        private IEnumerable<CloudAction> MovesToActions(AccountInfo accountInfo, IEnumerable<ObjectInfo> moves)
        {
            if (moves == null)
                throw new ArgumentNullException("moves");
            Contract.EndContractBlock();
            var fileAgent = FileAgent.GetFileAgent(accountInfo);

            //In order to avoid multiple iterations over the files, we iterate only once
            //over the remote files
            foreach (var objectInfo in moves)
            {
                var previousRelativepath = objectInfo.Previous.RelativeUrlToFilePath(accountInfo.UserName);
                //If the previous file already exists, we can execute a Move operation
                if (fileAgent.Exists(previousRelativepath))
                {
                    var previousFile = fileAgent.GetFileSystemInfo(previousRelativepath);
                    using (new SessionScope(FlushAction.Never))
                    {
                        var state = StatusKeeper.GetStateByFilePath(previousFile.FullName);
                        _lastSeen[previousFile.FullName] = DateTime.Now;

                        //For each moved object, emit a RenameLocal action that starts from the previous local file
                        yield return new CloudAction(accountInfo, CloudActionType.RenameLocal,
                                                     previousFile, objectInfo, state, accountInfo.BlockSize,
                                                     accountInfo.BlockHash,"Poll Moves");
                        //For modified files, we need to download the changes as well
                        if (objectInfo.X_Object_Hash != objectInfo.PreviousHash)
                            yield return new CloudDownloadAction(accountInfo,objectInfo, "Poll Moves");
                    }
                }
                //If the previous file does not exist, we need to download it in the new location
                else
                {
                    //Remote files should be downloaded
                    yield return new CloudDownloadAction(accountInfo, objectInfo, "Poll Moves");
                }
            }
        }
927

    
928

    
929
        /// <summary>
        /// Creates a download action for each new server file, or a sync action if a local
        /// item already exists at the same path
        /// </summary>
        /// <remarks>
        /// This is an iterator: under standard iterator semantics the body, including the null
        /// check, runs only once the result is enumerated
        /// </remarks>
        /// <param name="accountInfo">The account the new objects belong to</param>
        /// <param name="creates">The server objects that were reported as new</param>
        /// <returns>
        /// A MustSynch action for each object that already exists locally and a download action 
        /// for each object that does not
        /// </returns>
        /// <exception cref="ArgumentNullException">creates is null</exception>
        private IEnumerable<CloudAction> CreatesToActions(AccountInfo accountInfo, IEnumerable<ObjectInfo> creates)
        {
            if (creates == null)
                throw new ArgumentNullException("creates");
            Contract.EndContractBlock();
            var fileAgent = FileAgent.GetFileAgent(accountInfo);

            //In order to avoid multiple iterations over the files, we iterate only once
            //over the remote files
            foreach (var objectInfo in creates)
            {
                if (Log.IsDebugEnabled)
                    Log.DebugFormat("[NEW INFO] {0}",objectInfo.Uri);

                var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);

                //If the object already exists, we should check before uploading or downloading
                if (fileAgent.Exists(relativePath))
                {
                    var localFile= fileAgent.GetFileSystemInfo(relativePath);
                    //The stored state is looked up using the properly capitalized path
                    var state = StatusKeeper.GetStateByFilePath(localFile.WithProperCapitalization().FullName);
                    yield return new CloudAction(accountInfo, CloudActionType.MustSynch,
                                                     localFile, objectInfo, state, accountInfo.BlockSize,
                                                     accountInfo.BlockHash,"Poll Creates");
                }
                else
                {
                    //Remote files should be downloaded
                    yield return new CloudDownloadAction(accountInfo, objectInfo,"Poll Creates");
                }
            }
        }
968

    
969
        /// <summary>
        /// Forwards the new overall status to the status notification service, so that
        /// the UI can update its visual state
        /// </summary>
        /// <param name="status">The status to publish</param>
        private void UpdateStatus(PithosStatus status)
        {
            try
            {
                StatusNotification.SetPithosStatus(status);
            }
            catch (Exception updateError)
            {
                //Not critical: the failure is logged and otherwise ignored
                Log.Warn("Error while updating status", updateError);
            }
        }
986

    
987
        /// <summary>
        /// Creates a local folder under the account path for every container that does not
        /// have one yet. The trash container never gets a folder
        /// </summary>
        /// <param name="accountInfo">The account whose AccountPath is the parent of the folders</param>
        /// <param name="containers">The containers to create folders for</param>
        private static void CreateContainerFolders(AccountInfo accountInfo, IEnumerable<ContainerInfo> containers)
        {
            foreach (var container in containers)
            {
                var containerPath = Path.Combine(accountInfo.AccountPath, container.Name);

                //The trash container is skipped before the disk is even checked
                if (container.Name == FolderConstants.TrashContainer)
                    continue;
                if (Directory.Exists(containerPath))
                    continue;

                Directory.CreateDirectory(containerPath);
            }
        }
999

    
1000
        /// <summary>
        /// Registers an account with the agent, keyed by its AccountKey
        /// </summary>
        /// <param name="accountInfo">The account to register</param>
        public void AddAccount(AccountInfo accountInfo)
        {
            //TryAdd is used so that an account whose key is already registered is not added again
            var accountKey = accountInfo.AccountKey;
            _accounts.TryAdd(accountKey, accountInfo);
        }
1005

    
1006
        /// <summary>
        /// Unregisters the account stored under the AccountKey of the supplied account
        /// </summary>
        /// <param name="accountInfo">The account to unregister</param>
        public void RemoveAccount(AccountInfo accountInfo)
        {
            var accountKey = accountInfo.AccountKey;

            //The removed entry itself is not needed, only the removal
            AccountInfo removedAccount;
            _accounts.TryRemove(accountKey, out removedAccount);
/*
            SnapshotDifferencer differencer;
            _differencer.Differencers.TryRemove(accountInfo.AccountKey, out differencer);
*/
        }
1015

    
1016
        /// <summary>
        /// Applies a change of the selective synchronization paths of an account
        /// </summary>
        /// <param name="accountInfo">The account whose selective paths changed</param>
        /// <param name="added">The URLs that were added to the selection</param>
        /// <param name="removed">The URLs that were removed from the selection</param>
        public void SetSelectivePaths(AccountInfo accountInfo,Uri[] added, Uri[] removed)
        {
            //Removed paths are handled first, then the contents of the added paths are requested
            AbortRemovedPaths(accountInfo, removed);
            DownloadNewPaths(accountInfo, added);
        }
1021

    
1022
        /// <summary>
        /// Lists the contents of each newly selected container or folder URL and posts the
        /// resulting actions to the network agent
        /// </summary>
        /// <remarks>
        /// A failure while processing one URL is logged as a warning and does not prevent 
        /// the remaining URLs from being processed
        /// </remarks>
        /// <param name="accountInfo">The account used to create the client and the actions</param>
        /// <param name="added">The newly selected URLs. A null or empty array is ignored</param>
        private void DownloadNewPaths(AccountInfo accountInfo, Uri[] added)
        {
            //Nothing to list when no paths were added. This also guards against a null array
            if (added == null || added.Length == 0)
                return;

            var client = new CloudFilesClient(accountInfo);
            foreach (var folderUri in added)
            {
                try
                {
                    var segmentsCount = folderUri.Segments.Length;
                    //Is this an account URL?
                    if (segmentsCount < 3)
                        continue;

                    string account;
                    string container;
                    IList<ObjectInfo> items;
                    //Is this a container or folder URL?
                    if (segmentsCount == 3)
                    {
                        account = folderUri.Segments[1].TrimEnd('/');
                        container = folderUri.Segments[2].TrimEnd('/');
                        //List container
                        items = client.ListObjects(account, container);
                    }
                    else
                    {
                        account = folderUri.Segments[2].TrimEnd('/');
                        container = folderUri.Segments[3].TrimEnd('/');
                        //List folder. The folder path is built from the segments after the container
                        var folder = String.Join("", folderUri.Segments.Splice(4));
                        items = client.ListObjects(account, container, folder);
                    }

                    var actions = CreatesToActions(accountInfo, items);
                    foreach (var action in actions)
                    {
                        NetworkAgent.Post(action);
                    }
                }
                catch (Exception exc)
                {
                    Log.WarnFormat("Listing of new selective path [{0}] failed with \r\n{1}", folderUri, exc);
                }
            }
        }
1076

    
1077
        /// <summary>
        /// Placeholder for handling paths that were removed from the selection.
        /// Currently performs no work
        /// </summary>
        /// <param name="accountInfo">Not used at present</param>
        /// <param name="removed">Not used at present</param>
        private void AbortRemovedPaths(AccountInfo accountInfo, Uri[] removed)
        {
            //Not implemented yet
            /*this.NetworkAgent.*/
        }
1081
    }
1082
}