Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / NetworkAgent.cs @ 7b0a5fec

History | View | Annotate | Download (43 kB)

1
using System;
2
using System.Collections.Generic;
3
using System.ComponentModel;
4
using System.ComponentModel.Composition;
5
using System.Diagnostics;
6
using System.Diagnostics.Contracts;
7
using System.IO;
8
using System.Linq;
9
using System.Net;
10
using System.Text;
11
using System.Threading;
12
using System.Threading.Tasks;
13
using Pithos.Interfaces;
14
using Pithos.Network;
15
using log4net;
16

    
17
namespace Pithos.Core.Agents
18
{
19
    [Export]
20
    public class NetworkAgent
21
    {
22
        private Agent<CloudAction> _agent;
23

    
24
        [Import]
25
        public IStatusKeeper StatusKeeper { get; set; }
26
        
27
        public IStatusNotification StatusNotification { get; set; }
28
/*
29
        [Import]
30
        public FileAgent FileAgent {get;set;}
31
*/
32

    
33
       /* public int BlockSize { get; set; }
34
        public string BlockHash { get; set; }*/
35

    
36
        private static readonly ILog Log = LogManager.GetLogger("NetworkAgent");
37

    
38
        private List<AccountInfo> _accounts=new List<AccountInfo>();
39

    
40
        public void Start(/*int blockSize, string blockHash*/)
41
        {
42
/*
43
            if (blockSize<0)
44
                throw new ArgumentOutOfRangeException("blockSize");
45
            if (String.IsNullOrWhiteSpace(blockHash))
46
                throw new ArgumentOutOfRangeException("blockHash");
47
            Contract.EndContractBlock();
48
*/
49

    
50
/*
51
            BlockSize = blockSize;
52
            BlockHash = blockHash;
53
*/
54

    
55

    
56
            _agent = Agent<CloudAction>.Start(inbox =>
57
            {
58
                Action loop = null;
59
                loop = () =>
60
                {
61
                    var message = inbox.Receive();
62
                    var process=message.Then(Process,inbox.CancellationToken);
63
                    inbox.LoopAsync(process, loop);
64
                };
65
                loop();
66
            });
67
        }
68

    
69
        private Task<object> Process(CloudAction action)
70
        {
71
            if (action == null)
72
                throw new ArgumentNullException("action");
73
            if (action.AccountInfo==null)
74
                throw new ArgumentException("The action.AccountInfo is empty","action");
75
            Contract.EndContractBlock();
76

    
77
            var accountInfo = action.AccountInfo;
78

    
79
            using (log4net.ThreadContext.Stacks["NETWORK"].Push("PROCESS"))
80
            {                
81
                Log.InfoFormat("[ACTION] Start Processing {0}:{1}->{2}", action.Action, action.LocalFile,
82
                               action.CloudFile.Name);
83

    
84
                var localFile = action.LocalFile;
85
                var cloudFile = action.CloudFile;
86
                var downloadPath = action.GetDownloadPath();
87

    
88
                try
89
                {
90

    
91
                    switch (action.Action)
92
                    {
93
                        case CloudActionType.UploadUnconditional:
94
                            UploadCloudFile(action);
95
                            break;
96
                        case CloudActionType.DownloadUnconditional:
97

    
98
                            DownloadCloudFile(accountInfo,  cloudFile,downloadPath);
99
                            break;
100
                        case CloudActionType.DeleteCloud:
101
                            DeleteCloudFile(accountInfo, cloudFile, cloudFile.Name);
102
                            break;
103
                        case CloudActionType.RenameCloud:
104
                            var moveAction = (CloudMoveAction)action;
105
                            RenameCloudFile(accountInfo, cloudFile, moveAction);
106
                            break;
107
                        case CloudActionType.MustSynch:
108

    
109
                            if (!File.Exists(downloadPath))
110
                            {                                
111
                                DownloadCloudFile(accountInfo, cloudFile, downloadPath);
112
                            }
113
                            else
114
                            {
115
                                SyncFiles(accountInfo, action);
116
                            }
117
                            break;
118
                    }
119
                    Log.InfoFormat("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile,
120
                                           action.CloudFile.Name);
121
                }
122
                catch (OperationCanceledException)
123
                {
124
                    throw;
125
                }
126
                catch (FileNotFoundException exc)
127
                {
128
                    Log.ErrorFormat("{0} : {1} -> {2}  failed because the file was not found.\n Rescheduling a delete",
129
                        action.Action, action.LocalFile, action.CloudFile, exc);
130
                    //Post a delete action for the missing file
131
                    Post(new CloudDeleteAction(action));
132
                }
133
                catch (Exception exc)
134
                {
135
                    Log.ErrorFormat("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
136
                                     action.Action, action.LocalFile, action.CloudFile, exc);
137

    
138
                    _agent.Post(action);
139
                }
140
                return CompletedTask<object>.Default;
141
            }
142
        }
143

    
144
        private void SyncFiles(AccountInfo accountInfo,CloudAction action)
145
        {
146
            if (accountInfo == null)
147
                throw new ArgumentNullException("accountInfo");
148
            if (action==null)
149
                throw new ArgumentNullException("action");
150
            if (action.LocalFile==null)
151
                throw new ArgumentException("The action's local file is not specified","action");
152
            if (!Path.IsPathRooted(action.LocalFile.FullName))
153
                throw new ArgumentException("The action's local file path must be absolute","action");
154
            if (action.CloudFile== null)
155
                throw new ArgumentException("The action's cloud file is not specified", "action");
156
            Contract.EndContractBlock();
157

    
158
            var localFile = action.LocalFile;
159
            var cloudFile = action.CloudFile;
160
            var downloadPath=action.LocalFile.FullName.ToLower();
161

    
162
            var cloudHash = cloudFile.Hash.ToLower();
163
            var localHash = action.LocalHash.Value.ToLower();
164
            var topHash = action.TopHash.Value.ToLower();
165

    
166
            //Not enough to compare only the local hashes, also have to compare the tophashes
167
            
168
            //If any of the hashes match, we are done
169
            if ((cloudHash == localHash || cloudHash == topHash))
170
            {
171
                Log.InfoFormat("Skipping {0}, hashes match",downloadPath);
172
                return;
173
            }
174

    
175
            //The hashes DON'T match. We need to sync
176
            var lastLocalTime = localFile.LastWriteTime;
177
            var lastUpTime = cloudFile.Last_Modified;
178
            
179
            //If the local file is newer upload it
180
            if (lastUpTime <= lastLocalTime)
181
            {
182
                //It probably means it was changed while the app was down                        
183
                UploadCloudFile(action);
184
            }
185
            else
186
            {
187
                //It the cloud file has a later date, it was modified by another user or computer.
188
                //We need to check the local file's status                
189
                var status = StatusKeeper.GetFileStatus(downloadPath);
190
                switch (status)
191
                {
192
                    case FileStatus.Unchanged:                        
193
                        //If the local file's status is Unchanged, we can go on and download the newer cloud file
194
                        DownloadCloudFile(accountInfo,cloudFile,downloadPath);
195
                        break;
196
                    case FileStatus.Modified:
197
                        //If the local file is Modified, we may have a conflict. In this case we should mark the file as Conflict
198
                        //We can't ensure that a file modified online since the last time will appear as Modified, unless we 
199
                        //index all files before we start listening.                       
200
                    case FileStatus.Created:
201
                        //If the local file is Created, it means that the local and cloud files aren't related,
202
                        // yet they have the same name.
203

    
204
                        //In both cases we must mark the file as in conflict
205
                        ReportConflict(downloadPath);
206
                        break;
207
                    default:
208
                        //Other cases should never occur. Mark them as Conflict as well but log a warning
209
                        ReportConflict(downloadPath);
210
                        Log.WarnFormat("Unexcepted status {0} for file {1}->{2}", status,
211
                                       downloadPath, action.CloudFile.Name);
212
                        break;
213
                }
214
            }
215
        }
216

    
217
        private void ReportConflict(string downloadPath)
218
        {
219
            if (String.IsNullOrWhiteSpace(downloadPath))
220
                throw new ArgumentNullException("downloadPath");
221
            Contract.EndContractBlock();
222

    
223
            StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict);
224
            var message = String.Format("Conflict detected for file {0}", downloadPath);
225
            Log.Warn(message);
226
            StatusNotification.NotifyChange(message, TraceLevel.Warning);
227
        }
228

    
229
        public void Post(CloudAction cloudAction)
230
        {
231
            if (cloudAction == null)
232
                throw new ArgumentNullException("cloudAction");
233
            if (cloudAction.AccountInfo==null)
234
                throw new ArgumentException("The CloudAction.AccountInfo is empty","cloudAction");
235
            Contract.EndContractBlock();
236
            
237
            //If the action targets a local file, add a treehash calculation
238
            if (cloudAction.LocalFile != null)
239
            {
240
                var accountInfo = cloudAction.AccountInfo;
241
                if (cloudAction.LocalFile.Length>accountInfo.BlockSize)
242
                    cloudAction.TopHash = new Lazy<string>(() => Signature.CalculateTreeHashAsync(cloudAction.LocalFile,
243
                                    accountInfo.BlockSize, accountInfo.BlockHash).Result
244
                                     .TopHash.ToHashString());
245
                else
246
                {
247
                    cloudAction.TopHash=new Lazy<string>(()=> cloudAction.LocalHash.Value);
248
                }
249

    
250
            }
251
            _agent.Post(cloudAction);
252
        }
253

    
254
        class ObjectInfoByNameComparer:IEqualityComparer<ObjectInfo>
255
        {
256
            public bool Equals(ObjectInfo x, ObjectInfo y)
257
            {
258
                return x.Name.Equals(y.Name,StringComparison.InvariantCultureIgnoreCase);
259
            }
260

    
261
            public int GetHashCode(ObjectInfo obj)
262
            {
263
                return obj.Name.ToLower().GetHashCode();
264
            }
265
        }
266

    
267
        
268

    
269
        //Remote files are polled periodically. Any changes are processed
270
        public Task ProcessRemoteFiles(DateTime? since=null)
271
        {
272
            return Task<Task>.Factory.StartNewDelayed(10000, () =>
273
            {
274
                using (log4net.ThreadContext.Stacks["Retrieve Remote"].Push("All accounts"))
275
                {
276
                    //Next time we will check for all changes since the current check minus 1 second
277
                    //This is done to ensure there are no discrepancies due to clock differences
278
                    DateTime nextSince = DateTime.Now.AddSeconds(-1);
279
                    
280
                    var tasks=from accountInfo in _accounts
281
                              select ProcessAccountFiles(accountInfo, since);
282
                    var process=Task.Factory.Iterate(tasks);
283

    
284
                    return process.ContinueWith(t =>
285
                    {
286
                        if (t.IsFaulted)
287
                        {
288
                            Log.Error("Error while processing accounts");
289
                            t.Exception.Handle(exc=>
290
                                                   {
291
                                                       Log.Error("Details:", exc);
292
                                                       return true;
293
                                                   });                            
294
                        }
295
                        ProcessRemoteFiles(nextSince);
296
                    });
297
                }
298
            });            
299
        }
300

    
301
        public Task ProcessAccountFiles(AccountInfo accountInfo,DateTime? since=null)
302
        {   
303
            if (accountInfo==null)
304
                throw new ArgumentNullException("accountInfo");
305
            if (String.IsNullOrWhiteSpace(accountInfo.AccountPath))
306
                throw new ArgumentException("The AccountInfo.AccountPath is empty","accountInfo");
307
            Contract.EndContractBlock();
308

    
309
            using (log4net.ThreadContext.Stacks["Retrieve Remote"].Push(accountInfo.UserName))
310
            {
311
                Log.Info("Scheduled");
312
                var client=new CloudFilesClient(accountInfo);
313

    
314
                var containers = client.ListContainers(accountInfo.UserName);
315
                
316
                CreateContainerFolders(accountInfo, containers);
317

    
318

    
319
                //Get the list of server objects changed since the last check
320
                var listObjects = from container in containers
321
                                  select Task<IList<ObjectInfo>>.Factory.StartNew(_ =>
322
                                        client.ListObjects(accountInfo.UserName, container.Name, since),container.Name);
323

    
324
                var listAll = Task.Factory.WhenAll(listObjects.ToArray());
325
                
326
                
327

    
328
                //Get the list of deleted objects since the last check
329
/*
330
                var listTrash = Task<IList<ObjectInfo>>.Factory.StartNew(() =>
331
                                client.ListObjects(accountInfo.UserName, FolderConstants.TrashContainer, since));
332

    
333
                var listShared = Task<IList<ObjectInfo>>.Factory.StartNew(() =>
334
                                client.ListSharedObjects(since));
335

    
336
                var listAll = Task.Factory.TrackedSequence(
337
                    () => listObjects,
338
                    () => listTrash,
339
                    () => listShared);
340
*/
341

    
342

    
343

    
344
                var enqueueFiles = listAll.ContinueWith(task =>
345
                {
346
                    if (task.IsFaulted)
347
                    {
348
                        //ListObjects failed at this point, need to reschedule
349
                        Log.ErrorFormat("[FAIL] ListObjects for{0} in ProcessRemoteFiles with {0}", accountInfo.UserName,task.Exception);
350
                        return;
351
                    }
352
                    using (log4net.ThreadContext.Stacks["SCHEDULE"].Push("Process Results"))
353
                    {
354
                        var dict=task.Result.ToDictionary(t=> t.AsyncState);
355
                        
356
                        //Get all non-trash objects. Remember, the container name is stored in AsyncState
357
                        var remoteObjects = from objectList in task.Result
358
                                            where (string)objectList.AsyncState != "trash"
359
                                            from obj in objectList.Result
360
                                            select obj;
361
                                                                       
362
                        var trashObjects = dict["trash"].Result;
363
                        //var sharedObjects = ((Task<IList<ObjectInfo>>) task.Result[2]).Result;
364

    
365
                        //Items with the same name, hash may be both in the container and the trash
366
                        //Don't delete items that exist in the container
367
                        var realTrash = from trash in trashObjects
368
                                        where !remoteObjects.Any(info => info.Hash == trash.Hash)
369
                                        select trash;
370
                        ProcessDeletedFiles(accountInfo,realTrash);                        
371

    
372

    
373
                        var remote = from info in remoteObjects//.Union(sharedObjects)
374
                                     let name = info.Name
375
                                     where !name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase) &&
376
                                           !name.StartsWith(FolderConstants.CacheFolder +"/", StringComparison.InvariantCultureIgnoreCase)
377
                                     select info;
378

    
379
                        //Create a list of actions from the remote files
380
                        var allActions = ObjectsToActions(accountInfo,remote);
381
                       
382
                        //And remove those that are already being processed by the agent
383
                        var distinctActions = allActions
384
                            .Except(_agent.GetEnumerable(), new PithosMonitor.LocalFileComparer())
385
                            .ToList();
386

    
387
                        //Queue all the actions
388
                        foreach (var message in distinctActions)
389
                        {
390
                            Post(message);
391
                        }
392

    
393
                        //Report the number of new files
394
                        var remoteCount = distinctActions.Count(action=>
395
                            action.Action==CloudActionType.DownloadUnconditional);
396
/*
397
                        if ( remoteCount > 0)
398
                            StatusNotification.NotifyChange(String.Format("Processing {0} new files", remoteCount));
399
*/
400

    
401
                        Log.Info("[LISTENER] End Processing");                        
402
                    }
403
                });
404

    
405
                var log = enqueueFiles.ContinueWith(t =>
406
                {                    
407
                    if (t.IsFaulted)
408
                    {
409
                        Log.Error("[LISTENER] Exception", t.Exception);
410
                    }
411
                    else
412
                    {
413
                        Log.Info("[LISTENER] Finished");
414
                    }
415
                });
416
                return log;
417
            }
418
        }
419

    
420
        private static void CreateContainerFolders(AccountInfo accountInfo, IList<ContainerInfo> containers)
421
        {
422
            var containerPaths = from container in containers
423
                                 let containerPath = Path.Combine(accountInfo.AccountPath, container.Name)
424
                                 where container.Name != FolderConstants.TrashContainer && !Directory.Exists(containerPath)
425
                                 select containerPath;
426

    
427
            foreach (var path in containerPaths)
428
            {
429
                Directory.CreateDirectory(path);
430
            }
431
        }
432

    
433
        //Creates an appropriate action for each server file
434
        private IEnumerable<CloudAction> ObjectsToActions(AccountInfo accountInfo,IEnumerable<ObjectInfo> remote)
435
        {
436
            if (remote==null)
437
                throw new ArgumentNullException();
438
            Contract.EndContractBlock();
439
            var fileAgent = GetFileAgent(accountInfo);
440

    
441
            //In order to avoid multiple iterations over the files, we iterate only once
442
            //over the remote files
443
            foreach (var objectInfo in remote)
444
            {
445
                var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
446
                //and remove any matching objects from the list, adding them to the commonObjects list
447
                
448
                if (fileAgent.Exists(relativePath))
449
                {
450
                    var localFile = fileAgent.GetFileInfo(relativePath);
451
                    var state = FileState.FindByFilePath(localFile.FullName);
452
                    //Common files should be checked on a per-case basis to detect differences, which is newer
453

    
454
                    yield return new CloudAction(accountInfo,CloudActionType.MustSynch,
455
                                                   localFile, objectInfo, state, accountInfo.BlockSize,
456
                                                   accountInfo.BlockHash);
457
                }
458
                else
459
                {
460
                    //If there is no match we add them to the localFiles list
461
                    //but only if the file is not marked for deletion
462
                    var targetFile = Path.Combine(accountInfo.AccountPath, relativePath);
463
                    var fileStatus = StatusKeeper.GetFileStatus(targetFile);
464
                    if (fileStatus != FileStatus.Deleted)
465
                    {
466
                        //Remote files should be downloaded
467
                        yield return new CloudDownloadAction(accountInfo,objectInfo);
468
                    }
469
                }
470
            }            
471
        }
472

    
473
        private static FileAgent GetFileAgent(AccountInfo accountInfo)
474
        {
475
            return AgentLocator<FileAgent>.Get(accountInfo.AccountPath);
476
        }
477

    
478
        private void ProcessDeletedFiles(AccountInfo accountInfo,IEnumerable<ObjectInfo> trashObjects)
479
        {
480
            var fileAgent = GetFileAgent(accountInfo);
481
            foreach (var trashObject in trashObjects)
482
            {
483
                var relativePath = trashObject.RelativeUrlToFilePath(accountInfo.UserName);
484
                //and remove any matching objects from the list, adding them to the commonObjects list
485
                fileAgent.Delete(relativePath);                                
486
            }
487
        }
488

    
489

    
490
        private void RenameCloudFile(AccountInfo accountInfo,ObjectInfo cloudFile,CloudMoveAction action)
491
        {
492
            if (accountInfo==null)
493
                throw new ArgumentNullException("accountInfo");
494
            if (cloudFile==null)
495
                throw new ArgumentNullException("cloudFile");
496
            if (action==null)
497
                throw new ArgumentNullException("action");
498
            if (String.IsNullOrWhiteSpace(action.CloudFile.Container))
499
                throw new ArgumentException("Invalid container", "action");
500
            Contract.EndContractBlock();
501
            //The local file is already renamed
502
            this.StatusKeeper.SetFileOverlayStatus(action.NewPath, FileOverlayStatus.Modified);
503

    
504

    
505
            var account = action.CloudFile.Account ?? accountInfo.UserName;
506
            var container = action.CloudFile.Container;// ?? FolderConstants.PithosContainer;
507
            
508
            var client = new CloudFilesClient(accountInfo);
509
            client.MoveObject(account, container, action.OldFileName, container, action.NewFileName);
510

    
511
            this.StatusKeeper.SetFileStatus(action.NewPath, FileStatus.Unchanged);
512
            this.StatusKeeper.SetFileOverlayStatus(action.NewPath, FileOverlayStatus.Normal);
513
            NativeMethods.RaiseChangeNotification(action.NewPath);
514
        }
515

    
516
        private void DeleteCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile,string fileName)
517
        {
518
            if (accountInfo == null)
519
                throw new ArgumentNullException("accountInfo");
520
            if (cloudFile==null)
521
                throw new ArgumentNullException("cloudFile");
522

    
523
            if (String.IsNullOrWhiteSpace(fileName))
524
                throw new ArgumentNullException("fileName");
525
            if (Path.IsPathRooted(fileName))
526
                throw new ArgumentException("The fileName should not be rooted","fileName");
527
            if (String.IsNullOrWhiteSpace(cloudFile.Container))
528
                throw new ArgumentException("Invalid container", "cloudFile");
529
            Contract.EndContractBlock();
530
            
531
            var fileAgent = GetFileAgent(accountInfo);
532

    
533
            using ( log4net.ThreadContext.Stacks["DeleteCloudFile"].Push("Delete"))
534
            {
535
                var info = fileAgent.GetFileInfo(fileName);
536
                var fullPath = info.FullName.ToLower();
537
                this.StatusKeeper.SetFileOverlayStatus(fullPath, FileOverlayStatus.Modified);
538

    
539
                var account = cloudFile.Account ?? accountInfo.UserName;
540
                var container = cloudFile.Container ;//?? FolderConstants.PithosContainer;
541

    
542
                var client = new CloudFilesClient(accountInfo);
543
                client.DeleteObject(account, container, fileName);
544

    
545
                this.StatusKeeper.ClearFileStatus(fullPath);
546
            }
547
        }
548

    
549
        //Download a file.
550
        private void DownloadCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile , string localPath)
551
        {
552
            if (accountInfo == null)
553
                throw new ArgumentNullException("accountInfo");
554
            if (cloudFile == null)
555
                throw new ArgumentNullException("cloudFile");
556
            if (String.IsNullOrWhiteSpace(cloudFile.Account))
557
                throw new ArgumentNullException("cloudFile");
558
            if (String.IsNullOrWhiteSpace(cloudFile.Container))
559
                throw new ArgumentNullException("cloudFile");
560
            if (String.IsNullOrWhiteSpace(localPath))
561
                throw new ArgumentNullException("localPath");
562
            if (!Path.IsPathRooted(localPath))
563
                throw new ArgumentException("The localPath must be rooted", "localPath");
564
            Contract.EndContractBlock();
565
            
566
            var download=Task.Factory.Iterate(DownloadIterator(accountInfo,cloudFile, localPath));
567
            download.Wait();
568
        }
569

    
570
        private IEnumerable<Task> DownloadIterator(AccountInfo accountInfo,ObjectInfo cloudFile, string localPath)
571
        {
572
            if (accountInfo == null)
573
                throw new ArgumentNullException("accountInfo");
574
            if (cloudFile == null)
575
                throw new ArgumentNullException("cloudFile");
576
            if (String.IsNullOrWhiteSpace(cloudFile.Account))
577
                throw new ArgumentNullException("cloudFile");
578
            if (String.IsNullOrWhiteSpace(cloudFile.Container))
579
                throw new ArgumentNullException("cloudFile");
580
            if (String.IsNullOrWhiteSpace(localPath))
581
                throw new ArgumentNullException("localPath");
582
            if (!Path.IsPathRooted(localPath))
583
                throw new ArgumentException("The localPath must be rooted", "localPath");
584
            Contract.EndContractBlock();
585

    
586
            Uri relativeUrl = new Uri(cloudFile.Name, UriKind.Relative);
587

    
588
            var url = relativeUrl.ToString();
589
            if (cloudFile.Name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase))
590
                yield break;
591

    
592
            //Are we already downloading or uploading the file? 
593
            using (var gate=NetworkGate.Acquire(localPath, NetworkOperation.Downloading))
594
            {
595
                if (gate.Failed)
596
                    yield break;
597
                //The file's hashmap will be stored in the same location with the extension .hashmap
598
                //var hashPath = Path.Combine(FileAgent.CachePath, relativePath + ".hashmap");
599
                
600
                var client = new CloudFilesClient(accountInfo);
601
                var account = cloudFile.Account;
602
                var container = cloudFile.Container;
603

    
604
                //Retrieve the hashmap from the server
605
                var getHashMap = client.GetHashMap(account, container, url);
606
                yield return getHashMap;
607
                
608
                var serverHash=getHashMap.Result;
609
                //If it's a small file
610
                var downloadTask=(serverHash.Hashes.Count == 1 )
611
                    //Download it in one go
612
                    ? DownloadEntireFile(accountInfo,client, cloudFile, relativeUrl, localPath, serverHash) 
613
                    //Otherwise download it block by block
614
                    : DownloadWithBlocks(accountInfo,client, cloudFile, relativeUrl, localPath, serverHash);
615

    
616
                yield return downloadTask;
617

    
618
                if (cloudFile.AllowedTo == "read")
619
                {
620
                    var attributes=File.GetAttributes(localPath);
621
                    File.SetAttributes(localPath,attributes|FileAttributes.ReadOnly);
622
                }
623
                
624
                //Now we can store the object's metadata without worrying about ghost status entries
625
                StatusKeeper.StoreInfo(localPath, cloudFile);
626
                
627
            }
628
        }
629

    
630
        //Download a small file with a single GET operation
631
        private Task DownloadEntireFile(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string localPath,TreeHash serverHash)
632
        {
633
            if (client == null)
634
                throw new ArgumentNullException("client");
635
            if (cloudFile==null)
636
                throw new ArgumentNullException("cloudFile");
637
            if (relativeUrl == null)
638
                throw new ArgumentNullException("relativeUrl");
639
            if (String.IsNullOrWhiteSpace(localPath))
640
                throw new ArgumentNullException("localPath");
641
            if (!Path.IsPathRooted(localPath))
642
                throw new ArgumentException("The localPath must be rooted", "localPath");
643
            Contract.EndContractBlock();
644

    
645
            //If the file already exists
646
            if (File.Exists(localPath))
647
            {
648
                //First check with MD5 as this is a small file
649
                var localMD5 = Signature.CalculateMD5(localPath);
650
                var cloudHash=serverHash.TopHash.ToHashString();
651
                if (localMD5==cloudHash)
652
                    return CompletedTask.Default;
653
                //Then check with a treehash
654
                var localTreeHash = Signature.CalculateTreeHash(localPath, serverHash.BlockSize, serverHash.BlockHash);
655
                var localHash = localTreeHash.TopHash.ToHashString();
656
                if (localHash==cloudHash)
657
                    return CompletedTask.Default;
658
            }
659

    
660
            var fileAgent = GetFileAgent(accountInfo);
661
            //Calculate the relative file path for the new file
662
            var relativePath = relativeUrl.RelativeUriToFilePath();
663
            //The file will be stored in a temporary location while downloading with an extension .download
664
            var tempPath = Path.Combine(fileAgent.CachePath, relativePath + ".download");
665
            //Make sure the target folder exists. DownloadFileTask will not create the folder
666
            var tempFolder = Path.GetDirectoryName(tempPath);
667
            if (!Directory.Exists(tempFolder))
668
                Directory.CreateDirectory(tempFolder);
669

    
670
            //Download the object to the temporary location
671
            var getObject = client.GetObject(cloudFile.Account, cloudFile.Container, relativeUrl.ToString(), tempPath).ContinueWith(t =>
672
            {
673
                t.PropagateExceptions();
674
                //Create the local folder if it doesn't exist (necessary for shared objects)
675
                var localFolder = Path.GetDirectoryName(localPath);
676
                if (!Directory.Exists(localFolder))
677
                    Directory.CreateDirectory(localFolder);
678
                //And move it to its actual location once downloading is finished
679
                if (File.Exists(localPath))
680
                    File.Replace(tempPath,localPath,null,true);
681
                else
682
                    File.Move(tempPath,localPath);
683
                //Notify listeners that a local file has changed
684
                StatusNotification.NotifyChangedFile(localPath);
685

    
686
            });
687
            return getObject;
688
        }
689

    
690
        //Download a file asynchronously using blocks
691
        public Task DownloadWithBlocks(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string localPath, TreeHash serverHash)
692
        {
693
            if (client == null)
694
                throw new ArgumentNullException("client");
695
            if (cloudFile == null)
696
                throw new ArgumentNullException("cloudFile");
697
            if (relativeUrl == null)
698
                throw new ArgumentNullException("relativeUrl");
699
            if (String.IsNullOrWhiteSpace(localPath))
700
                throw new ArgumentNullException("localPath");
701
            if (!Path.IsPathRooted(localPath))
702
                throw new ArgumentException("The localPath must be rooted", "localPath");
703
            if (serverHash == null)
704
                throw new ArgumentNullException("serverHash");
705
            Contract.EndContractBlock();
706
            
707
            return Task.Factory.Iterate(BlockDownloadIterator(accountInfo,client,cloudFile, relativeUrl, localPath, serverHash));
708
        }
709

    
710
        private IEnumerable<Task> BlockDownloadIterator(AccountInfo accountInfo,CloudFilesClient client,  ObjectInfo cloudFile, Uri relativeUrl, string localPath, TreeHash serverHash)
711
        {
712
            if (client == null)
713
                throw new ArgumentNullException("client");
714
            if (cloudFile==null)
715
                throw new ArgumentNullException("cloudFile");
716
            if (relativeUrl == null)
717
                throw new ArgumentNullException("relativeUrl");
718
            if (String.IsNullOrWhiteSpace(localPath))
719
                throw new ArgumentNullException("localPath");
720
            if (!Path.IsPathRooted(localPath))
721
                throw new ArgumentException("The localPath must be rooted", "localPath");
722
            if(serverHash==null)
723
                throw new ArgumentNullException("serverHash");
724
            Contract.EndContractBlock();
725
            
726
            var fileAgent = GetFileAgent(accountInfo);
727
            
728
            //Calculate the relative file path for the new file
729
            var relativePath = relativeUrl.RelativeUriToFilePath();
730
            var blockUpdater = new BlockUpdater(fileAgent.CachePath, localPath, relativePath, serverHash);
731

    
732
            
733
                        
734
            //Calculate the file's treehash
735
            var calcHash = Signature.CalculateTreeHashAsync(localPath, serverHash.BlockSize,serverHash.BlockHash);
736
            yield return calcHash;                        
737
            var treeHash = calcHash.Result;
738
                
739
            //And compare it with the server's hash
740
            var upHashes = serverHash.GetHashesAsStrings();
741
            var localHashes = treeHash.HashDictionary;
742
            for (int i = 0; i < upHashes.Length; i++)
743
            {
744
                //For every non-matching hash
745
                var upHash = upHashes[i];
746
                if (!localHashes.ContainsKey(upHash))
747
                {
748
                    if (blockUpdater.UseOrphan(i, upHash))
749
                    {
750
                        Log.InfoFormat("[BLOCK GET] ORPHAN FOUND for {0} of {1} for {2}", i, upHashes.Length, localPath);
751
                        continue;
752
                    }
753
                    Log.InfoFormat("[BLOCK GET] START {0} of {1} for {2}", i, upHashes.Length, localPath);
754
                    var start = i*serverHash.BlockSize;
755
                    //To download the last block just pass a null for the end of the range
756
                    long? end = null;
757
                    if (i < upHashes.Length - 1 )
758
                        end= ((i + 1)*serverHash.BlockSize) ;
759
                            
760
                    //Download the missing block
761
                    var getBlock = client.GetBlock(cloudFile.Account, cloudFile.Container, relativeUrl, start, end);
762
                    yield return getBlock;
763
                    var block = getBlock.Result;
764

    
765
                    //and store it
766
                    yield return blockUpdater.StoreBlock(i, block);
767

    
768

    
769
                    Log.InfoFormat("[BLOCK GET] FINISH {0} of {1} for {2}", i, upHashes.Length, localPath);
770
                }
771
            }
772

    
773
            //Want to avoid notifications if no changes were made
774
            var hasChanges = blockUpdater.HasBlocks;
775
            blockUpdater.Commit();
776
            
777
            if (hasChanges)
778
                //Notify listeners that a local file has changed
779
                StatusNotification.NotifyChangedFile(localPath);
780

    
781
            Log.InfoFormat("[BLOCK GET] COMPLETE {0}", localPath);            
782
        }
783

    
784

    
785
        private void UploadCloudFile(CloudAction action)
786
        {
787
            if (action == null)
788
                throw new ArgumentNullException("action");           
789
            Contract.EndContractBlock();
790

    
791
            try
792
            {
793
                var upload = Task.Factory.Iterate(UploadIterator(action));
794
                upload.Wait();
795
            }                
796
            catch (AggregateException ex)
797
            {                
798
                var exc = ex.InnerException as WebException;
799
                if (exc==null)
800
                    throw ex.InnerException;
801
                var response = exc.Response as HttpWebResponse;
802
                if (response==null)
803
                    throw exc;
804
                if (response.StatusCode == HttpStatusCode.Unauthorized)
805
                {
806
                    Log.Error("Not allowed to upload file", exc);
807
                    var message = String.Format("Not allowed to uplad file {0}",action.LocalFile.FullName);
808
                    StatusKeeper.SetFileState(action.LocalFile.FullName,FileStatus.Unchanged,FileOverlayStatus.Normal);
809
                    StatusNotification.NotifyChange(message,TraceLevel.Warning);
810
                    return;
811
                }
812
                throw;
813
            }
814
        }
815

    
816
        private IEnumerable<Task> UploadIterator(CloudAction action)
817
        {
818
            if (action == null)
819
                throw new ArgumentNullException("action");            
820
            Contract.EndContractBlock();
821

    
822
            var accountInfo=action.AccountInfo;
823
            
824
            var fileInfo=action.LocalFile;                        
825

    
826
            if (fileInfo.Extension.Equals("ignore", StringComparison.InvariantCultureIgnoreCase))
827
                yield break;
828

    
829
            var relativePath = fileInfo.AsRelativeTo(accountInfo.AccountPath);
830
            if (relativePath.StartsWith(FolderConstants.OthersFolder))
831
            {
832
                var parts=relativePath.Split('\\');
833
                var accountName = parts[1];
834
                var oldName = accountInfo.UserName;
835
                var absoluteUri = accountInfo.StorageUri.AbsoluteUri;
836
                var nameIndex=absoluteUri.IndexOf(oldName);
837
                var root=absoluteUri.Substring(0, nameIndex);
838

    
839
                accountInfo = new AccountInfo
840
                {
841
                    UserName = accountName,
842
                    AccountPath = Path.Combine(accountInfo.AccountPath, parts[0], parts[1]),
843
                    StorageUri = new Uri(root + accountName),
844
                    BlockHash=accountInfo.BlockHash,
845
                    BlockSize=accountInfo.BlockSize,
846
                    Token=accountInfo.Token
847
                };
848
            }
849

    
850

    
851
            var fullFileName = fileInfo.FullName;
852
            using(var gate=NetworkGate.Acquire(fullFileName,NetworkOperation.Uploading))
853
            {
854
                //Abort if the file is already being uploaded or downloaded
855
                if (gate.Failed)
856
                    yield break;
857

    
858
                var cloudFile = action.CloudFile;
859
                var account = cloudFile.Account ?? accountInfo.UserName;
860

    
861
                var client = new CloudFilesClient(accountInfo);
862
                //Even if GetObjectInfo times out, we can proceed with the upload            
863
                var info = client.GetObjectInfo(account, cloudFile.Container, cloudFile.Name);                
864
                var cloudHash = info.Hash.ToLower();
865

    
866
                var hash = action.LocalHash.Value;
867
                var topHash = action.TopHash.Value;
868

    
869
                //If the file hashes match, abort the upload
870
                if (hash == cloudHash  || topHash ==cloudHash)
871
                {
872
                    //but store any metadata changes 
873
                    this.StatusKeeper.StoreInfo(fullFileName, info);
874
                    Log.InfoFormat("Skip upload of {0}, hashes match", fullFileName);
875
                    yield break;
876
                }
877

    
878
                if (info.AllowedTo=="read")
879
                    yield break;
880

    
881
                //Mark the file as modified while we upload it
882
                StatusKeeper.SetFileOverlayStatus(fullFileName, FileOverlayStatus.Modified);
883
                //And then upload it
884

    
885
                //Upload even small files using the Hashmap. The server may already containt
886
                //the relevant folder
887

    
888
                //First, calculate the tree hash
889
                var treeHash = Signature.CalculateTreeHashAsync(fileInfo.FullName, accountInfo.BlockSize,
890
                    accountInfo.BlockHash);
891
                yield return treeHash;
892
                    
893
                yield return Task.Factory.Iterate(UploadWithHashMap(accountInfo,cloudFile,fileInfo,cloudFile.Name,treeHash));
894

    
895
                //If everything succeeds, change the file and overlay status to normal
896
                this.StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal);
897
            }
898
            //Notify the Shell to update the overlays
899
            NativeMethods.RaiseChangeNotification(fullFileName);
900
            StatusNotification.NotifyChangedFile(fullFileName);
901
        }
902

    
903
        public IEnumerable<Task> UploadWithHashMap(AccountInfo accountInfo,ObjectInfo cloudFile,FileInfo fileInfo,string url,Task<TreeHash> treeHash)
904
        {
905
            if (accountInfo == null)
906
                throw new ArgumentNullException("accountInfo");
907
            if (cloudFile==null)
908
                throw new ArgumentNullException("cloudFile");
909
            if (fileInfo == null)
910
                throw new ArgumentNullException("fileInfo");
911
            if (String.IsNullOrWhiteSpace(url))
912
                throw new ArgumentNullException(url);
913
            if (treeHash==null)
914
                throw new ArgumentNullException("treeHash");
915
            if (String.IsNullOrWhiteSpace(cloudFile.Container) )
916
                throw new ArgumentException("Invalid container","cloudFile");
917
            Contract.EndContractBlock();
918

    
919
            var fullFileName = fileInfo.FullName;
920

    
921
            var account = cloudFile.Account ?? accountInfo.UserName;
922
            var container = cloudFile.Container ;//?? FolderConstants.PithosContainer;
923

    
924
            var client = new CloudFilesClient(accountInfo);
925
            //Send the hashmap to the server            
926
            var hashPut = client.PutHashMap(account, container, url, treeHash.Result);
927
            yield return hashPut;
928

    
929
            var missingHashes = hashPut.Result;
930
            //If the server returns no missing hashes, we are done
931
            while (missingHashes.Count > 0)
932
            {
933

    
934
                var buffer = new byte[accountInfo.BlockSize];
935
                foreach (var missingHash in missingHashes)
936
                {
937
                    //Find the proper block
938
                    var blockIndex = treeHash.Result.HashDictionary[missingHash];
939
                    var offset = blockIndex*accountInfo.BlockSize;
940

    
941
                    var read = fileInfo.Read(buffer, offset, accountInfo.BlockSize);
942

    
943
                    //And upload the block                
944
                    var postBlock = client.PostBlock(account, container, buffer, 0, read);
945

    
946
                    //We have to handle possible exceptions in a continuation because
947
                    //*yield return* can't appear inside a try block
948
                    yield return postBlock.ContinueWith(t => 
949
                        t.ReportExceptions(
950
                            exc => Log.ErrorFormat("[ERROR] uploading block {0} of {1}\n{2}", blockIndex, fullFileName, exc),
951
                            ()=>Log.InfoFormat("[BLOCK] Block {0} of {1} uploaded", blockIndex,fullFileName)));
952
                }
953

    
954
                //Repeat until there are no more missing hashes
955
                hashPut = client.PutHashMap(account, container, url, treeHash.Result);
956
                yield return hashPut;
957
                missingHashes = hashPut.Result;
958
            }
959
        }
960

    
961

    
962
        public void AddAccount(AccountInfo accountInfo)
963
        {            
964
            if (!_accounts.Contains(accountInfo))
965
                _accounts.Add(accountInfo);
966
        }
967
    }
968

    
969
   
970

    
971

    
972
}