Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / NetworkAgent.cs @ c53aa229

History | View | Annotate | Download (40.6 kB)

1
using System;
2
using System.Collections.Generic;
3
using System.ComponentModel.Composition;
4
using System.Diagnostics;
5
using System.Diagnostics.Contracts;
6
using System.IO;
7
using System.Linq;
8
using System.Net;
9
using System.Text;
10
using System.Threading;
11
using System.Threading.Tasks;
12
using Pithos.Interfaces;
13
using Pithos.Network;
14
using log4net;
15

    
16
namespace Pithos.Core.Agents
17
{
18
    [Export]
19
    public class NetworkAgent
20
    {
21
        private Agent<CloudAction> _agent;
22

    
23
        [Import]
24
        public IStatusKeeper StatusKeeper { get; set; }
25
        
26
        public IStatusNotification StatusNotification { get; set; }
27
        [Import]
28
        public FileAgent FileAgent {get;set;}
29

    
30
       /* public int BlockSize { get; set; }
31
        public string BlockHash { get; set; }*/
32

    
33
        private static readonly ILog Log = LogManager.GetLogger("NetworkAgent");
34

    
35
        private List<AccountInfo> _accounts=new List<AccountInfo>();
36

    
37
        public void Start(/*int blockSize, string blockHash*/)
38
        {
39
/*
40
            if (blockSize<0)
41
                throw new ArgumentOutOfRangeException("blockSize");
42
            if (String.IsNullOrWhiteSpace(blockHash))
43
                throw new ArgumentOutOfRangeException("blockHash");
44
            Contract.EndContractBlock();
45
*/
46

    
47
/*
48
            BlockSize = blockSize;
49
            BlockHash = blockHash;
50
*/
51

    
52

    
53
            _agent = Agent<CloudAction>.Start(inbox =>
54
            {
55
                Action loop = null;
56
                loop = () =>
57
                {
58
                    var message = inbox.Receive();
59
                    var process=message.Then(Process,inbox.CancellationToken);
60
                    inbox.LoopAsync(process, loop);
61
                };
62
                loop();
63
            });
64
        }
65

    
66
        private Task<object> Process(CloudAction action)
67
        {
68
            if (action == null)
69
                throw new ArgumentNullException("action");
70
            if (action.AccountInfo==null)
71
                throw new ArgumentException("The action.AccountInfo is empty","action");
72
            Contract.EndContractBlock();
73

    
74
            var accountInfo = action.AccountInfo;
75

    
76
            using (log4net.ThreadContext.Stacks["NETWORK"].Push("PROCESS"))
77
            {                
78
                Log.InfoFormat("[ACTION] Start Processing {0}:{1}->{2}", action.Action, action.LocalFile,
79
                               action.CloudFile.Name);
80

    
81
                var localFile = action.LocalFile;
82
                var cloudFile = action.CloudFile;                
83
                var downloadPath = (cloudFile == null)
84
                                       ? String.Empty
85
                                       : Path.Combine(accountInfo.AccountPath, cloudFile.RelativeUrlToFilePath(accountInfo.UserName));
86

    
87
                try
88
                {
89
                    var account = action.CloudFile.Account ?? accountInfo.UserName;
90
                    var container = action.CloudFile.Container ?? FolderConstants.PithosContainer;
91

    
92
                    switch (action.Action)
93
                    {
94
                        case CloudActionType.UploadUnconditional:
95
                            UploadCloudFile(accountInfo,account, container, localFile, action.LocalHash.Value, action.TopHash.Value);
96
                            break;
97
                        case CloudActionType.DownloadUnconditional:
98

    
99
                            DownloadCloudFile(accountInfo, account, container, new Uri(cloudFile.Name, UriKind.Relative),
100
                                              downloadPath);
101
                            break;
102
                        case CloudActionType.DeleteCloud:
103
                            DeleteCloudFile(accountInfo, account, container, cloudFile.Name);
104
                            break;
105
                        case CloudActionType.RenameCloud:
106
                            var moveAction = (CloudMoveAction)action;
107
                            RenameCloudFile(accountInfo, account, container, moveAction.OldFileName, moveAction.NewPath,
108
                                            moveAction.NewFileName);
109
                            break;
110
                        case CloudActionType.MustSynch:
111

    
112
                            if (!File.Exists(downloadPath))
113
                            {
114
                                var cloudUri = new Uri(action.CloudFile.Name, UriKind.Relative);
115
                                DownloadCloudFile(accountInfo, account, container, cloudUri, downloadPath);
116
                            }
117
                            else
118
                            {
119
                                SyncFiles(accountInfo, action);
120
                            }
121
                            break;
122
                    }
123
                    Log.InfoFormat("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile,
124
                                           action.CloudFile.Name);
125
                }
126
                catch (OperationCanceledException)
127
                {
128
                    throw;
129
                }
130
                catch (FileNotFoundException exc)
131
                {
132
                    Log.ErrorFormat("{0} : {1} -> {2}  failed because the file was not found.\n Rescheduling a delete",
133
                        action.Action, action.LocalFile, action.CloudFile, exc);
134
                    Post(new CloudDeleteAction(accountInfo,action.CloudFile,action.FileState));
135
                }
136
                catch (Exception exc)
137
                {
138
                    Log.ErrorFormat("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
139
                                     action.Action, action.LocalFile, action.CloudFile, exc);
140

    
141
                    _agent.Post(action);
142
                }
143
                return CompletedTask<object>.Default;
144
            }
145
        }
146

    
147
        private void SyncFiles(AccountInfo accountInfo,CloudAction action)
148
        {
149
            if (accountInfo == null)
150
                throw new ArgumentNullException("accountInfo");
151
            if (action==null)
152
                throw new ArgumentNullException("action");
153
            if (action.LocalFile==null)
154
                throw new ArgumentException("The action's local file is not specified","action");
155
            if (!Path.IsPathRooted(action.LocalFile.FullName))
156
                throw new ArgumentException("The action's local file path must be absolute","action");
157
            if (action.CloudFile== null)
158
                throw new ArgumentException("The action's cloud file is not specified", "action");
159
            Contract.EndContractBlock();
160

    
161
            var localFile = action.LocalFile;
162
            var cloudFile = action.CloudFile;
163
            var downloadPath=action.LocalFile.FullName.ToLower();
164

    
165
            var account = cloudFile.Account;
166
            //Use "pithos" by default if no container is specified
167
            var container = cloudFile.Container ?? FolderConstants.PithosContainer;
168

    
169
            var cloudUri = new Uri(cloudFile.Name, UriKind.Relative);
170
            var cloudHash = cloudFile.Hash.ToLower();
171
            var localHash = action.LocalHash.Value.ToLower();
172
            var topHash = action.TopHash.Value.ToLower();
173

    
174
            //Not enough to compare only the local hashes, also have to compare the tophashes
175
            
176
            //If any of the hashes match, we are done
177
            if ((cloudHash == localHash || cloudHash == topHash))
178
            {
179
                Log.InfoFormat("Skipping {0}, hashes match",downloadPath);
180
                return;
181
            }
182

    
183
            //The hashes DON'T match. We need to sync
184
            var lastLocalTime = localFile.LastWriteTime;
185
            var lastUpTime = cloudFile.Last_Modified;
186
            
187
            //If the local file is newer upload it
188
            if (lastUpTime <= lastLocalTime)
189
            {
190
                //It probably means it was changed while the app was down                        
191
                UploadCloudFile(accountInfo,account, container, localFile, action.LocalHash.Value,
192
                                action.TopHash.Value);
193
            }
194
            else
195
            {
196
                //It the cloud file has a later date, it was modified by another user or computer.
197
                //We need to check the local file's status                
198
                var status = StatusKeeper.GetFileStatus(downloadPath);
199
                switch (status)
200
                {
201
                    case FileStatus.Unchanged:                        
202
                        //If the local file's status is Unchanged, we can go on and download the newer cloud file
203
                        DownloadCloudFile(accountInfo,account, container,cloudUri,downloadPath);
204
                        break;
205
                    case FileStatus.Modified:
206
                        //If the local file is Modified, we may have a conflict. In this case we should mark the file as Conflict
207
                        //We can't ensure that a file modified online since the last time will appear as Modified, unless we 
208
                        //index all files before we start listening.                       
209
                    case FileStatus.Created:
210
                        //If the local file is Created, it means that the local and cloud files aren't related,
211
                        // yet they have the same name.
212

    
213
                        //In both cases we must mark the file as in conflict
214
                        ReportConflict(downloadPath);
215
                        break;
216
                    default:
217
                        //Other cases should never occur. Mark them as Conflict as well but log a warning
218
                        ReportConflict(downloadPath);
219
                        Log.WarnFormat("Unexcepted status {0} for file {1}->{2}", status,
220
                                       downloadPath, action.CloudFile.Name);
221
                        break;
222
                }
223
            }
224
        }
225

    
226
        private void ReportConflict(string downloadPath)
227
        {
228
            if (String.IsNullOrWhiteSpace(downloadPath))
229
                throw new ArgumentNullException("downloadPath");
230
            Contract.EndContractBlock();
231

    
232
            StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict);
233
            var message = String.Format("Conflict detected for file {0}", downloadPath);
234
            Log.Warn(message);
235
            StatusNotification.NotifyChange(message, TraceLevel.Warning);
236
        }
237

    
238
/*
239
        private Task<object> Process(CloudMoveAction action)
240
        {
241
            if (action == null)
242
                throw new ArgumentNullException("action");
243
            Contract.EndContractBlock();
244

    
245
            Log.InfoFormat("[ACTION] Start Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name);
246

    
247
            try
248
            {
249
                RenameCloudFile(action.OldFileName, action.NewPath, action.NewFileName);
250
                Log.InfoFormat("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name);
251
            }
252
            catch (OperationCanceledException)
253
            {
254
                throw;
255
            }
256
            catch (Exception exc)
257
            {
258
                Log.ErrorFormat("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
259
                                action.Action, action.OldFileName, action.NewFileName, exc);
260

    
261
                _agent.Post(action);
262
            }
263
            return CompletedTask<object>.Default;
264
        }
265
*/
266

    
267

    
268
        public void Post(CloudAction cloudAction)
269
        {
270
            if (cloudAction == null)
271
                throw new ArgumentNullException("cloudAction");
272
            if (cloudAction.AccountInfo==null)
273
                throw new ArgumentException("The CloudAction.AccountInfo is empty","cloudAction");
274
            Contract.EndContractBlock();
275
            
276
            //If the action targets a local file, add a treehash calculation
277
            if (cloudAction.LocalFile != null)
278
            {
279
                var accountInfo = cloudAction.AccountInfo;
280
                if (cloudAction.LocalFile.Length>accountInfo.BlockSize)
281
                    cloudAction.TopHash = new Lazy<string>(() => Signature.CalculateTreeHashAsync(cloudAction.LocalFile,
282
                                    accountInfo.BlockSize, accountInfo.BlockHash).Result
283
                                     .TopHash.ToHashString());
284
                else
285
                {
286
                    cloudAction.TopHash=new Lazy<string>(()=> cloudAction.LocalHash.Value);
287
                }
288

    
289
            }
290
            _agent.Post(cloudAction);
291
        }
292

    
293
        class ObjectInfoByNameComparer:IEqualityComparer<ObjectInfo>
294
        {
295
            public bool Equals(ObjectInfo x, ObjectInfo y)
296
            {
297
                return x.Name.Equals(y.Name,StringComparison.InvariantCultureIgnoreCase);
298
            }
299

    
300
            public int GetHashCode(ObjectInfo obj)
301
            {
302
                return obj.Name.ToLower().GetHashCode();
303
            }
304
        }
305

    
306
        
307

    
308
        //Remote files are polled periodically. Any changes are processed
309
        public Task ProcessRemoteFiles(DateTime? since=null)
310
        {
311
            return Task<Task>.Factory.StartNewDelayed(10000, () =>
312
            {
313
                using (log4net.ThreadContext.Stacks["Retrieve Remote"].Push("All accounts"))
314
                {
315
                    //Next time we will check for all changes since the current check minus 1 second
316
                    //This is done to ensure there are no discrepancies due to clock differences
317
                    DateTime nextSince = DateTime.Now.AddSeconds(-1);
318
                    
319
                    var tasks=from accountInfo in _accounts
320
                              select ProcessAccountFiles(accountInfo, since);
321
                    var process=Task.Factory.Iterate(tasks);
322

    
323
                    return process.ContinueWith(t=>
324
                        ProcessRemoteFiles(nextSince));
325
                }
326
            });            
327
        }
328

    
329
        public Task ProcessAccountFiles(AccountInfo accountInfo,DateTime? since=null)
330
        {   
331
            if (accountInfo==null)
332
                throw new ArgumentNullException("accountInfo");
333
            if (String.IsNullOrWhiteSpace(accountInfo.AccountPath))
334
                throw new ArgumentException("The AccountInfo.AccountPath is empty","accountInfo");
335
            Contract.EndContractBlock();
336

    
337
            using (log4net.ThreadContext.Stacks["Retrieve Remote"].Push(accountInfo.UserName))
338
            {
339
                Log.Info("Scheduled");
340
                var client=new CloudFilesClient(accountInfo);
341

    
342
                //Get the list of server objects changed since the last check
343
                var listObjects = Task<IList<ObjectInfo>>.Factory.StartNew(() =>
344
                                client.ListObjects(accountInfo.UserName, FolderConstants.PithosContainer, since));
345
                //Get the list of deleted objects since the last check
346
                var listTrash = Task<IList<ObjectInfo>>.Factory.StartNew(() =>
347
                                client.ListObjects(accountInfo.UserName, FolderConstants.TrashContainer, since));
348

    
349
                var listShared = Task<IList<ObjectInfo>>.Factory.StartNew(() =>
350
                                client.ListSharedObjects(since));
351

    
352
                var listAll = Task.Factory.TrackedSequence(
353
                    () => listObjects,
354
                    () => listTrash,
355
                    () => listShared);
356

    
357

    
358

    
359
                var enqueueFiles = listAll.ContinueWith(task =>
360
                {
361
                    if (task.IsFaulted)
362
                    {
363
                        //ListObjects failed at this point, need to reschedule
364
                        Log.ErrorFormat("[FAIL] ListObjects for{0} in ProcessRemoteFiles with {0}", accountInfo.UserName,task.Exception);
365
                        return;
366
                    }
367
                    using (log4net.ThreadContext.Stacks["SCHEDULE"].Push("Process Results"))
368
                    {
369
                        var remoteObjects = ((Task<IList<ObjectInfo>>) task.Result[0]).Result;
370
                        var trashObjects = ((Task<IList<ObjectInfo>>) task.Result[1]).Result;
371
                        var sharedObjects = ((Task<IList<ObjectInfo>>) task.Result[2]).Result;
372

    
373
                        //Items with the same name, hash may be both in the container and the trash
374
                        //Don't delete items that exist in the container
375
                        var realTrash = from trash in trashObjects
376
                                        where !remoteObjects.Any(info => info.Hash == trash.Hash)
377
                                        select trash;
378
                        ProcessDeletedFiles(accountInfo,realTrash);                        
379

    
380

    
381
                        var remote = from info in remoteObjects.Union(sharedObjects)
382
                                     let name = info.Name
383
                                     where !name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase) &&
384
                                           !name.StartsWith("fragments/", StringComparison.InvariantCultureIgnoreCase)
385
                                     select info;
386

    
387
                        //Create a list of actions from the remote files
388
                        var allActions = ObjectsToActions(accountInfo,remote);
389
                       
390
                        //And remove those that are already being processed by the agent
391
                        var distinctActions = allActions
392
                            .Except(_agent.GetEnumerable(), new PithosMonitor.LocalFileComparer())
393
                            .ToList();
394

    
395
                        //Queue all the actions
396
                        foreach (var message in distinctActions)
397
                        {
398
                            Post(message);
399
                        }
400

    
401
                        //Report the number of new files
402
                        var remoteCount = distinctActions.Count(action=>
403
                            action.Action==CloudActionType.DownloadUnconditional);
404
                        if ( remoteCount > 0)
405
                            StatusNotification.NotifyChange(String.Format("Processing {0} new files", remoteCount));
406

    
407
                        Log.Info("[LISTENER] End Processing");                        
408
                    }
409
                });
410

    
411
                var log = enqueueFiles.ContinueWith(t =>
412
                {                    
413
                    if (t.IsFaulted)
414
                    {
415
                        Log.Error("[LISTENER] Exception", t.Exception);
416
                    }
417
                    else
418
                    {
419
                        Log.Info("[LISTENER] Finished");
420
                    }
421
                });
422
                return log;
423
            }
424
        }
425

    
426
        //Creates an appropriate action for each server file
427
        private IEnumerable<CloudAction> ObjectsToActions(AccountInfo accountInfo,IEnumerable<ObjectInfo> remote)
428
        {
429
            if (remote==null)
430
                throw new ArgumentNullException();
431
            Contract.EndContractBlock();
432

    
433
            //In order to avoid multiple iterations over the files, we iterate only once
434
            //over the remote files
435
            foreach (var objectInfo in remote)
436
            {
437
                var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
438
                //and remove any matching objects from the list, adding them to the commonObjects list
439
                if (FileAgent.Exists(relativePath))
440
                {
441
                    var localFile = FileAgent.GetFileInfo(relativePath);
442
                    var state = FileState.FindByFilePath(localFile.FullName);
443
                    //Common files should be checked on a per-case basis to detect differences, which is newer
444

    
445
                    yield return new CloudAction(accountInfo,CloudActionType.MustSynch,
446
                                                   localFile, objectInfo, state, accountInfo.BlockSize,
447
                                                   accountInfo.BlockHash);
448
                }
449
                else
450
                {
451
                    //If there is no match we add them to the localFiles list
452
                    //but only if the file is not marked for deletion
453
                    var targetFile = Path.Combine(accountInfo.AccountPath, relativePath);
454
                    var fileStatus = StatusKeeper.GetFileStatus(targetFile);
455
                    if (fileStatus != FileStatus.Deleted)
456
                    {
457
                        //Remote files should be downloaded
458
                        yield return new CloudDownloadAction(accountInfo,objectInfo);
459
                    }
460
                }
461
            }            
462
        }
463

    
464
        private void ProcessDeletedFiles(AccountInfo accountInfo,IEnumerable<ObjectInfo> trashObjects)
465
        {
466
            foreach (var trashObject in trashObjects)
467
            {
468
                var relativePath = trashObject.RelativeUrlToFilePath(accountInfo.UserName);
469
                //and remove any matching objects from the list, adding them to the commonObjects list
470
                FileAgent.Delete(relativePath);                                
471
            }
472
        }
473

    
474

    
475
        private void RenameCloudFile(AccountInfo accountInfo,string account, string container,string oldFileName, string newPath, string newFileName)
476
        {
477
            if (accountInfo==null)
478
                throw new ArgumentNullException("accountInfo");
479
            if (String.IsNullOrWhiteSpace(account))
480
                throw new ArgumentNullException("account");
481
            if (String.IsNullOrWhiteSpace(container))
482
                throw new ArgumentNullException("container");
483
            if (String.IsNullOrWhiteSpace(oldFileName))
484
                throw new ArgumentNullException("oldFileName");
485
            if (String.IsNullOrWhiteSpace(oldFileName))
486
                throw new ArgumentNullException("newPath");
487
            if (String.IsNullOrWhiteSpace(oldFileName))
488
                throw new ArgumentNullException("newFileName");
489
            Contract.EndContractBlock();
490
            //The local file is already renamed
491
            this.StatusKeeper.SetFileOverlayStatus(newPath, FileOverlayStatus.Modified);
492

    
493
            var client = new CloudFilesClient(accountInfo);
494
            client.MoveObject(account, container, oldFileName, container, newFileName);
495

    
496
            this.StatusKeeper.SetFileStatus(newPath, FileStatus.Unchanged);
497
            this.StatusKeeper.SetFileOverlayStatus(newPath, FileOverlayStatus.Normal);
498
            NativeMethods.RaiseChangeNotification(newPath);
499
        }
500

    
501
        private void DeleteCloudFile(AccountInfo accountInfo, string account, string container, string fileName)
502
        {
503
            if (accountInfo == null)
504
                throw new ArgumentNullException("accountInfo");
505
            if (String.IsNullOrWhiteSpace(account))
506
                throw new ArgumentNullException("account");
507
            if (String.IsNullOrWhiteSpace(container))
508
                throw new ArgumentNullException("container");
509
            if (String.IsNullOrWhiteSpace(container))
510
                throw new ArgumentNullException("container");
511

    
512
            if (String.IsNullOrWhiteSpace(fileName))
513
                throw new ArgumentNullException("fileName");
514
            if (Path.IsPathRooted(fileName))
515
                throw new ArgumentException("The fileName should not be rooted","fileName");
516
            Contract.EndContractBlock();
517

    
518
            using ( log4net.ThreadContext.Stacks["DeleteCloudFile"].Push("Delete"))
519
            {
520
                var info = FileAgent.GetFileInfo(fileName);
521
                var fullPath = info.FullName.ToLower();
522
                this.StatusKeeper.SetFileOverlayStatus(fullPath, FileOverlayStatus.Modified);
523

    
524
                var client = new CloudFilesClient(accountInfo);
525
                client.DeleteObject(account, container, fileName);
526

    
527
                this.StatusKeeper.ClearFileStatus(fullPath);
528
            }
529
        }
530

    
531
        //Download a file.
532
        private void DownloadCloudFile(AccountInfo accountInfo, string account, string container, Uri relativeUrl, string localPath)
533
        {
534
            if (accountInfo == null)
535
                throw new ArgumentNullException("accountInfo");
536
            if (String.IsNullOrWhiteSpace(account))
537
                throw new ArgumentNullException("account");
538
            if (String.IsNullOrWhiteSpace(container))
539
                throw new ArgumentNullException("container");
540
            if (relativeUrl == null)
541
                throw new ArgumentNullException("relativeUrl");
542
            if (String.IsNullOrWhiteSpace(localPath))
543
                throw new ArgumentNullException("localPath");
544
            if (!Path.IsPathRooted(localPath))
545
                throw new ArgumentException("The localPath must be rooted", "localPath");
546
            Contract.EndContractBlock();
547
            
548
            var download=Task.Factory.Iterate(DownloadIterator(accountInfo,account,container, relativeUrl, localPath));
549
            download.Wait();
550
        }
551

    
552
        private IEnumerable<Task> DownloadIterator(AccountInfo accountInfo, string account, string container, Uri relativeUrl, string localPath)
553
        {
554
            if (accountInfo == null)
555
                throw new ArgumentNullException("accountInfo");
556
            if (String.IsNullOrWhiteSpace(account))
557
                throw new ArgumentNullException("account");
558
            if (String.IsNullOrWhiteSpace(container))
559
                throw new ArgumentNullException("container");
560
            if (relativeUrl==null)
561
                throw new ArgumentNullException("relativeUrl");
562
            if (String.IsNullOrWhiteSpace(localPath))
563
                throw new ArgumentNullException("localPath");
564
            if (!Path.IsPathRooted(localPath))
565
                throw new ArgumentException("The localPath must be rooted", "localPath");
566
            Contract.EndContractBlock();
567

    
568
            var url = relativeUrl.ToString();
569
            if (url.EndsWith(".ignore",StringComparison.InvariantCultureIgnoreCase))
570
                yield break;
571

    
572
            //Are we already downloading or uploading the file? 
573
            using (var gate=NetworkGate.Acquire(localPath, NetworkOperation.Downloading))
574
            {
575
                if (gate.Failed)
576
                    yield break;
577
                //The file's hashmap will be stored in the same location with the extension .hashmap
578
                //var hashPath = Path.Combine(FileAgent.FragmentsPath, relativePath + ".hashmap");
579
                
580
                var client = new CloudFilesClient(accountInfo);
581
                //Retrieve the hashmap from the server
582
                var getHashMap = client.GetHashMap(account, container, url);
583
                yield return getHashMap;
584
                
585
                var serverHash=getHashMap.Result;
586
                //If it's a small file
587
                var downloadTask=(serverHash.Hashes.Count == 1 )
588
                    //Download it in one go
589
                    ? DownloadEntireFile(client, account, container, relativeUrl, localPath) 
590
                    //Otherwise download it block by block
591
                    : DownloadWithBlocks(client, account, container, relativeUrl, localPath, serverHash);
592

    
593
                yield return downloadTask;
594

    
595

    
596
                //Retrieve the object's metadata
597
                var info=client.GetObjectInfo(account, container, url);
598
                //And store it
599
                StatusKeeper.StoreInfo(localPath, info);
600
                
601
                //Notify listeners that a local file has changed
602
                StatusNotification.NotifyChangedFile(localPath);
603

    
604
            }
605
        }
606

    
607
        //Download a small file with a single GET operation
608
        private Task DownloadEntireFile(CloudFilesClient client, string account, string container, Uri relativeUrl, string localPath)
609
        {
610
            if (client == null)
611
                throw new ArgumentNullException("client");
612
            if (String.IsNullOrWhiteSpace(account))
613
                throw new ArgumentNullException("account");
614
            if (String.IsNullOrWhiteSpace(container))
615
                throw new ArgumentNullException("container");
616
            if (relativeUrl == null)
617
                throw new ArgumentNullException("relativeUrl");
618
            if (String.IsNullOrWhiteSpace(localPath))
619
                throw new ArgumentNullException("localPath");
620
            if (!Path.IsPathRooted(localPath))
621
                throw new ArgumentException("The localPath must be rooted", "localPath");
622
            Contract.EndContractBlock();
623

    
624
            //Calculate the relative file path for the new file
625
            var relativePath = relativeUrl.RelativeUriToFilePath();
626
            //The file will be stored in a temporary location while downloading with an extension .download
627
            var tempPath = Path.Combine(FileAgent.FragmentsPath, relativePath + ".download");
628
            //Make sure the target folder exists. DownloadFileTask will not create the folder
629
            var directoryPath = Path.GetDirectoryName(tempPath);
630
            if (!Directory.Exists(directoryPath))
631
                Directory.CreateDirectory(directoryPath);
632

    
633
            //Download the object to the temporary location
634
            var getObject = client.GetObject(account, container, relativeUrl.ToString(), tempPath).ContinueWith(t =>
635
            {
636
                t.PropagateExceptions();
637
                //And move it to its actual location once downloading is finished
638
                if (File.Exists(localPath))
639
                    File.Replace(tempPath,localPath,null,true);
640
                else
641
                    File.Move(tempPath,localPath);
642
            });
643
            return getObject;
644
        }
645

    
646
        //Download a file asynchronously using blocks
647
        public Task DownloadWithBlocks(CloudFilesClient client, string account, string container, Uri relativeUrl, string localPath, TreeHash serverHash)
648
        {
649
            if (client == null)
650
                throw new ArgumentNullException("client");
651
            if (String.IsNullOrWhiteSpace(account))
652
                throw new ArgumentNullException("account");
653
            if (String.IsNullOrWhiteSpace(container))
654
                throw new ArgumentNullException("container");
655
            if (relativeUrl == null)
656
                throw new ArgumentNullException("relativeUrl");
657
            if (String.IsNullOrWhiteSpace(localPath))
658
                throw new ArgumentNullException("localPath");
659
            if (!Path.IsPathRooted(localPath))
660
                throw new ArgumentException("The localPath must be rooted", "localPath");
661
            if (serverHash == null)
662
                throw new ArgumentNullException("serverHash");
663
            Contract.EndContractBlock();
664
            
665
            return Task.Factory.Iterate(BlockDownloadIterator(client,account,container, relativeUrl, localPath, serverHash));
666
        }
667

    
668
        private IEnumerable<Task> BlockDownloadIterator(CloudFilesClient client, string account, string container, Uri relativeUrl, string localPath, TreeHash serverHash)
669
        {
670
            if (client == null)
671
                throw new ArgumentNullException("client");
672
            if (String.IsNullOrWhiteSpace(account))
673
                throw new ArgumentNullException("account");
674
            if (String.IsNullOrWhiteSpace(container))
675
                throw new ArgumentNullException("container");
676
            if (relativeUrl == null)
677
                throw new ArgumentNullException("relativeUrl");
678
            if (String.IsNullOrWhiteSpace(localPath))
679
                throw new ArgumentNullException("localPath");
680
            if (!Path.IsPathRooted(localPath))
681
                throw new ArgumentException("The localPath must be rooted", "localPath");
682
            if(serverHash==null)
683
                throw new ArgumentNullException("serverHash");
684
            Contract.EndContractBlock();
685

    
686
            
687
            //Calculate the relative file path for the new file
688
            var relativePath = relativeUrl.RelativeUriToFilePath();
689
            var blockUpdater = new BlockUpdater(FileAgent.FragmentsPath, localPath, relativePath, serverHash);
690

    
691
            
692
                        
693
            //Calculate the file's treehash
694
            var calcHash = Signature.CalculateTreeHashAsync(localPath, serverHash.BlockSize,serverHash.BlockHash);
695
            yield return calcHash;                        
696
            var treeHash = calcHash.Result;
697
                
698
            //And compare it with the server's hash
699
            var upHashes = serverHash.GetHashesAsStrings();
700
            var localHashes = treeHash.HashDictionary;
701
            for (int i = 0; i < upHashes.Length; i++)
702
            {
703
                //For every non-matching hash
704
                var upHash = upHashes[i];
705
                if (!localHashes.ContainsKey(upHash))
706
                {
707
                    if (blockUpdater.UseOrphan(i, upHash))
708
                    {
709
                        Log.InfoFormat("[BLOCK GET] ORPHAN FOUND for {0} of {1} for {2}", i, upHashes.Length, localPath);
710
                        continue;
711
                    }
712
                    Log.InfoFormat("[BLOCK GET] START {0} of {1} for {2}", i, upHashes.Length, localPath);
713
                    var start = i*serverHash.BlockSize;
714
                    //To download the last block just pass a null for the end of the range
715
                    long? end = null;
716
                    if (i < upHashes.Length - 1 )
717
                        end= ((i + 1)*serverHash.BlockSize) ;
718
                            
719
                    //Download the missing block
720
                    var getBlock = client.GetBlock(account, container, relativeUrl, start, end);
721
                    yield return getBlock;
722
                    var block = getBlock.Result;
723

    
724
                    //and store it
725
                    yield return blockUpdater.StoreBlock(i, block);
726

    
727

    
728
                    Log.InfoFormat("[BLOCK GET] FINISH {0} of {1} for {2}", i, upHashes.Length, localPath);
729
                }
730
            }
731

    
732
            blockUpdater.Commit();
733
            Log.InfoFormat("[BLOCK GET] COMPLETE {0}", localPath);            
734
        }
735

    
736

    
737
        private void UploadCloudFile(AccountInfo accountInfo, string account, string container, FileInfo fileInfo, string hash, string topHash)
738
        {
739
            if (accountInfo == null)
740
                throw new ArgumentNullException("accountInfo");
741
            if (String.IsNullOrWhiteSpace(account))
742
                throw new ArgumentNullException("account");
743
            if (String.IsNullOrWhiteSpace(container))
744
                throw new ArgumentNullException("container");
745
            if (fileInfo == null)
746
                throw new ArgumentNullException("fileInfo");
747
            if (String.IsNullOrWhiteSpace(hash))
748
                throw new ArgumentNullException("hash");
749
            if (topHash == null)
750
                throw new ArgumentNullException("topHash");
751
            Contract.EndContractBlock();
752

    
753
            var upload = Task.Factory.Iterate(UploadIterator(accountInfo,account,container,fileInfo, hash.ToLower(), topHash.ToLower()));
754
            upload.Wait();
755
        }
756

    
757
        private IEnumerable<Task> UploadIterator(AccountInfo accountInfo, string account, string container, FileInfo fileInfo, string hash, string topHash)
758
        {
759
            if (accountInfo == null)
760
                throw new ArgumentNullException("accountInfo");
761
            if (String.IsNullOrWhiteSpace(account))
762
                throw new ArgumentNullException("account");
763
            if (String.IsNullOrWhiteSpace(container))
764
                throw new ArgumentNullException("container");
765
            if (fileInfo == null)
766
                throw new ArgumentNullException("fileInfo");
767
            if (String.IsNullOrWhiteSpace(hash))
768
                throw new ArgumentNullException("hash");
769
            if (topHash == null)
770
                throw new ArgumentNullException("topHash");
771
            Contract.EndContractBlock();
772

    
773
            if (fileInfo.Extension.Equals("ignore", StringComparison.InvariantCultureIgnoreCase))
774
                yield break;
775
            
776
            var url = fileInfo.AsRelativeUrlTo(accountInfo.AccountPath);
777

    
778
            var fullFileName = fileInfo.FullName;
779
            using(var gate=NetworkGate.Acquire(fullFileName,NetworkOperation.Uploading))
780
            {
781
                //Abort if the file is already being uploaded or downloaded
782
                if (gate.Failed)
783
                    yield break;
784

    
785
                var client = new CloudFilesClient(accountInfo);
786
                //Even if GetObjectInfo times out, we can proceed with the upload            
787
                var info = client.GetObjectInfo(account, container, url);
788
                var cloudHash = info.Hash.ToLower();
789

    
790
                //If the file hashes match, abort the upload
791
                if (hash == cloudHash  || topHash ==cloudHash)
792
                {
793
                    //but store any metadata changes 
794
                    this.StatusKeeper.StoreInfo(fullFileName, info);
795
                    Log.InfoFormat("Skip upload of {0}, hashes match", fullFileName);
796
                    yield break;
797
                }
798

    
799
                //Mark the file as modified while we upload it
800
                StatusKeeper.SetFileOverlayStatus(fullFileName, FileOverlayStatus.Modified);
801
                //And then upload it
802

    
803
                //If the file is larger than the block size, try a hashmap PUT
804
                if (fileInfo.Length > accountInfo.BlockSize )
805
                {
806
                    //To upload using a hashmap
807
                    //First, calculate the tree hash
808
                    var treeHash = Signature.CalculateTreeHashAsync(fileInfo.FullName, accountInfo.BlockSize,
809
                        accountInfo.BlockHash);
810
                    yield return treeHash;
811
                    
812
                    yield return Task.Factory.Iterate(UploadWithHashMap(accountInfo,account,container,fileInfo,url,treeHash));
813
                                        
814
                }
815
                else
816
                {
817
                    //Otherwise do a regular PUT
818
                    yield return client.PutObject(account, container, url, fullFileName, hash);                    
819
                }
820
                //If everything succeeds, change the file and overlay status to normal
821
                this.StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal);
822
            }
823
            //Notify the Shell to update the overlays
824
            NativeMethods.RaiseChangeNotification(fullFileName);
825
            StatusNotification.NotifyChangedFile(fullFileName);
826
        }
827

    
828
        public IEnumerable<Task> UploadWithHashMap(AccountInfo accountInfo,string account,string container,FileInfo fileInfo,string url,Task<TreeHash> treeHash)
829
        {
830
            if (accountInfo == null)
831
                throw new ArgumentNullException("accountInfo");
832
            if (String.IsNullOrWhiteSpace(account))
833
                throw new ArgumentNullException("account");
834
            if (String.IsNullOrWhiteSpace(container))
835
                throw new ArgumentNullException("container");
836
            if (fileInfo == null)
837
                throw new ArgumentNullException("fileInfo");
838
            if (String.IsNullOrWhiteSpace(url))
839
                throw new ArgumentNullException(url);
840
            if (treeHash==null)
841
                throw new ArgumentNullException("treeHash");
842
            Contract.EndContractBlock();
843

    
844
            var fullFileName = fileInfo.FullName;
845

    
846
            var client = new CloudFilesClient(accountInfo);
847
            //Send the hashmap to the server            
848
            var hashPut = client.PutHashMap(account, container, url, treeHash.Result);
849
            yield return hashPut;
850

    
851
            var missingHashes = hashPut.Result;
852
            //If the server returns no missing hashes, we are done
853
            while (missingHashes.Count > 0)
854
            {
855

    
856
                var buffer = new byte[accountInfo.BlockSize];
857
                foreach (var missingHash in missingHashes)
858
                {
859
                    //Find the proper block
860
                    var blockIndex = treeHash.Result.HashDictionary[missingHash];
861
                    var offset = blockIndex*accountInfo.BlockSize;
862

    
863
                    var read = fileInfo.Read(buffer, offset, accountInfo.BlockSize);
864

    
865
                    //And upload the block                
866
                    var postBlock = client.PostBlock(account, container, buffer, 0, read);
867

    
868
                    //We have to handle possible exceptions in a continuation because
869
                    //*yield return* can't appear inside a try block
870
                    yield return postBlock.ContinueWith(t => 
871
                        t.ReportExceptions(
872
                            exc => Log.ErrorFormat("[ERROR] uploading block {0} of {1}\n{2}", blockIndex, fullFileName, exc),
873
                            ()=>Log.InfoFormat("[BLOCK] Block {0} of {1} uploaded", blockIndex,fullFileName)));
874
                }
875

    
876
                //Repeat until there are no more missing hashes
877
                hashPut = client.PutHashMap(account, container, url, treeHash.Result);
878
                yield return hashPut;
879
                missingHashes = hashPut.Result;
880
            }
881
        }
882

    
883

    
884
        public void AddAccount(AccountInfo accountInfo)
885
        {            
886
            if (!_accounts.Contains(accountInfo))
887
                _accounts.Add(accountInfo);
888
        }
889
    }
890

    
891
   
892

    
893

    
894
}