Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / NetworkAgent.cs @ 5120f3cb

History | View | Annotate | Download (37.6 kB)

1
using System;
2
using System.Collections.Generic;
3
using System.ComponentModel.Composition;
4
using System.Diagnostics;
5
using System.Diagnostics.Contracts;
6
using System.IO;
7
using System.Linq;
8
using System.Net;
9
using System.Text;
10
using System.Threading;
11
using System.Threading.Tasks;
12
using Pithos.Interfaces;
13
using Pithos.Network;
14
using log4net;
15

    
16
namespace Pithos.Core.Agents
17
{
18
    [Export]
19
    public class NetworkAgent
20
    {
21
        private Agent<CloudAction> _agent;
22

    
23
        [Import]
24
        public IStatusKeeper StatusKeeper { get; set; }
25
        
26
        public IStatusNotification StatusNotification { get; set; }
27
        [Import]
28
        public ICloudClient CloudClient { get; set; }
29

    
30
        [Import]
31
        public FileAgent FileAgent {get;set;}
32

    
33
        /*
34
        [Import]
35
        public IPithosWorkflow Workflow { get; set; }
36
*/
37

    
38

    
39
        public string PithosContainer { get; set; }
40
        public string TrashContainer { get; private set; }
41
        public IList<string> Containers { get; private set; }
42

    
43
        public int BlockSize { get; set; }
44
        public string BlockHash { get; set; }
45

    
46
        private static readonly ILog Log = LogManager.GetLogger("NetworkAgent");
47

    
48

    
49
        public void Start(string pithosContainer, string trashContainer, int blockSize, string blockHash)
50
        {
51
            if (String.IsNullOrWhiteSpace(pithosContainer))
52
                throw new ArgumentNullException("pithosContainer");
53
            if (String.IsNullOrWhiteSpace(trashContainer))
54
                throw new ArgumentNullException("trashContainer");
55
            Contract.EndContractBlock();
56

    
57
            PithosContainer = pithosContainer;
58
            TrashContainer = trashContainer;
59
            BlockSize = blockSize;
60
            BlockHash = blockHash;
61

    
62

    
63
            _agent = Agent<CloudAction>.Start(inbox =>
64
            {
65
                Action loop = null;
66
                loop = () =>
67
                {
68
                    var message = inbox.Receive();
69
                    var process=message.Then(Process,inbox.CancellationToken);
70
                    inbox.LoopAsync(process, loop);
71
                };
72
                loop();
73
            });
74
        }
75

    
76
        private Task<object> Process(CloudAction action)
77
        {
78
            if (action == null)
79
                throw new ArgumentNullException("action");
80
            Contract.EndContractBlock();
81

    
82
            using (log4net.ThreadContext.Stacks["NETWORK"].Push("PROCESS"))
83
            {                
84
                Log.InfoFormat("[ACTION] Start Processing {0}:{1}->{2}", action.Action, action.LocalFile,
85
                               action.CloudFile.Name);
86

    
87
                var localFile = action.LocalFile;
88
                var cloudFile = action.CloudFile;
89
                var downloadPath = (cloudFile == null)
90
                                       ? String.Empty
91
                                       : Path.Combine(FileAgent.RootPath, cloudFile.RelativeUrlToFilePath(CloudClient.UserName));
92

    
93
                try
94
                {
95
                    var account = action.CloudFile.Account ?? CloudClient.UserName;
96
                    var container = action.CloudFile.Container ?? PithosContainer;
97

    
98
                    switch (action.Action)
99
                    {
100
                        case CloudActionType.UploadUnconditional:
101
                            UploadCloudFile(account, container, localFile, action.LocalHash.Value, action.TopHash.Value);
102
                            break;
103
                        case CloudActionType.DownloadUnconditional:
104

    
105
                            DownloadCloudFile(account, container, new Uri(cloudFile.Name, UriKind.Relative),
106
                                              downloadPath);
107
                            break;
108
                        case CloudActionType.DeleteCloud:
109
                            DeleteCloudFile(account, container, cloudFile.Name);
110
                            break;
111
                        case CloudActionType.RenameCloud:
112
                            var moveAction = (CloudMoveAction)action;
113
                            RenameCloudFile(account, container, moveAction.OldFileName, moveAction.NewPath,
114
                                            moveAction.NewFileName);
115
                            break;
116
                        case CloudActionType.MustSynch:
117

    
118
                            if (!File.Exists(downloadPath))
119
                            {
120
                                var cloudUri = new Uri(action.CloudFile.Name, UriKind.Relative);
121
                                DownloadCloudFile(account, container, cloudUri, downloadPath);
122
                            }
123
                            else
124
                            {
125
                                SyncFiles(action);
126
                            }
127
                            break;
128
                    }
129
                    Log.InfoFormat("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile,
130
                                           action.CloudFile.Name);
131
                }
132
                catch (OperationCanceledException)
133
                {
134
                    throw;
135
                }
136
                catch (FileNotFoundException exc)
137
                {
138
                    Log.ErrorFormat("{0} : {1} -> {2}  failed because the file was not found.\n Rescheduling a delete",
139
                        action.Action, action.LocalFile, action.CloudFile, exc);
140
                    Post(new CloudDeleteAction(action.CloudFile,action.FileState));
141
                }
142
                catch (Exception exc)
143
                {
144
                    Log.ErrorFormat("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
145
                                     action.Action, action.LocalFile, action.CloudFile, exc);
146

    
147
                    _agent.Post(action);
148
                }
149
                return CompletedTask<object>.Default;
150
            }
151
        }
152

    
153
        private void SyncFiles(CloudAction action)
154
        {
155
            if (action==null)
156
                throw new ArgumentNullException("action");
157
            if (action.LocalFile==null)
158
                throw new ArgumentException("The action's local file is not specified","action");
159
            if (!Path.IsPathRooted(action.LocalFile.FullName))
160
                throw new ArgumentException("The action's local file path must be absolute","action");
161
            if (action.CloudFile== null)
162
                throw new ArgumentException("The action's cloud file is not specified", "action");
163
            Contract.EndContractBlock();
164

    
165
            var localFile = action.LocalFile;
166
            var cloudFile = action.CloudFile;
167
            var downloadPath=action.LocalFile.FullName.ToLower();
168

    
169
            var account = cloudFile.Account;
170
            //Use "pithos" by default if no container is specified
171
            var container = cloudFile.Container ?? PithosContainer;
172

    
173
            var cloudUri = new Uri(cloudFile.Name, UriKind.Relative);
174
            var cloudHash = cloudFile.Hash.ToLower();
175
            var localHash = action.LocalHash.Value.ToLower();
176
            var topHash = action.TopHash.Value.ToLower();
177

    
178
            //Not enough to compare only the local hashes, also have to compare the tophashes
179
            
180
            //If any of the hashes match, we are done
181
            if ((cloudHash == localHash || cloudHash == topHash))
182
            {
183
                Log.InfoFormat("Skipping {0}, hashes match",downloadPath);
184
                return;
185
            }
186

    
187
            //The hashes DON'T match. We need to sync
188
            var lastLocalTime = localFile.LastWriteTime;
189
            var lastUpTime = cloudFile.Last_Modified;
190
            
191
            //If the local file is newer upload it
192
            if (lastUpTime <= lastLocalTime)
193
            {
194
                //It probably means it was changed while the app was down                        
195
                UploadCloudFile(account, container, localFile, action.LocalHash.Value,
196
                                action.TopHash.Value);
197
            }
198
            else
199
            {
200
                //It the cloud file has a later date, it was modified by another user or computer.
201
                //We need to check the local file's status                
202
                var status = StatusKeeper.GetFileStatus(downloadPath);
203
                switch (status)
204
                {
205
                    case FileStatus.Unchanged:                        
206
                        //If the local file's status is Unchanged, we can go on and download the newer cloud file
207
                        DownloadCloudFile(account, container,cloudUri,downloadPath);
208
                        break;
209
                    case FileStatus.Modified:
210
                        //If the local file is Modified, we may have a conflict. In this case we should mark the file as Conflict
211
                        //We can't ensure that a file modified online since the last time will appear as Modified, unless we 
212
                        //index all files before we start listening.                       
213
                    case FileStatus.Created:
214
                        //If the local file is Created, it means that the local and cloud files aren't related,
215
                        // yet they have the same name.
216

    
217
                        //In both cases we must mark the file as in conflict
218
                        ReportConflict(downloadPath);
219
                        break;
220
                    default:
221
                        //Other cases should never occur. Mark them as Conflict as well but log a warning
222
                        ReportConflict(downloadPath);
223
                        Log.WarnFormat("Unexcepted status {0} for file {1}->{2}", status,
224
                                       downloadPath, action.CloudFile.Name);
225
                        break;
226
                }
227
            }
228
        }
229

    
230
        private void ReportConflict(string downloadPath)
231
        {
232
            if (String.IsNullOrWhiteSpace(downloadPath))
233
                throw new ArgumentNullException("downloadPath");
234
            Contract.EndContractBlock();
235

    
236
            StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict);
237
            var message = String.Format("Conflict detected for file {0}", downloadPath);
238
            Log.Warn(message);
239
            StatusNotification.NotifyChange(message, TraceLevel.Warning);
240
        }
241

    
242
/*
243
        private Task<object> Process(CloudMoveAction action)
244
        {
245
            if (action == null)
246
                throw new ArgumentNullException("action");
247
            Contract.EndContractBlock();
248

    
249
            Log.InfoFormat("[ACTION] Start Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name);
250

    
251
            try
252
            {
253
                RenameCloudFile(action.OldFileName, action.NewPath, action.NewFileName);
254
                Log.InfoFormat("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name);
255
            }
256
            catch (OperationCanceledException)
257
            {
258
                throw;
259
            }
260
            catch (Exception exc)
261
            {
262
                Log.ErrorFormat("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
263
                                action.Action, action.OldFileName, action.NewFileName, exc);
264

    
265
                _agent.Post(action);
266
            }
267
            return CompletedTask<object>.Default;
268
        }
269
*/
270

    
271

    
272
        public void Post(CloudAction cloudAction)
273
        {
274
            if (cloudAction == null)
275
                throw new ArgumentNullException("cloudAction");
276
            Contract.EndContractBlock();
277
            
278
            //If the action targets a local file, add a treehash calculation
279
            if (cloudAction.LocalFile != null)
280
            {
281

    
282
                if (cloudAction.LocalFile.Length>BlockSize)
283
                    cloudAction.TopHash = new Lazy<string>(() => Signature.CalculateTreeHashAsync(cloudAction.LocalFile, 
284
                                    BlockSize, BlockHash).Result
285
                                     .TopHash.ToHashString());
286
                else
287
                {
288
                    cloudAction.TopHash=new Lazy<string>(()=> cloudAction.LocalHash.Value);
289
                }
290

    
291
            }
292
            _agent.Post(cloudAction);
293
        }
294

    
295
        class ObjectInfoByNameComparer:IEqualityComparer<ObjectInfo>
296
        {
297
            public bool Equals(ObjectInfo x, ObjectInfo y)
298
            {
299
                return x.Name.Equals(y.Name,StringComparison.InvariantCultureIgnoreCase);
300
            }
301

    
302
            public int GetHashCode(ObjectInfo obj)
303
            {
304
                return obj.Name.ToLower().GetHashCode();
305
            }
306
        }
307

    
308
        //Remote files are polled periodically. Any changes are processed
309
        public Task ProcessRemoteFiles(string accountPath,DateTime? since=null)
310
        {   
311
            if (String.IsNullOrWhiteSpace(accountPath))
312
                throw new ArgumentNullException(accountPath);
313
            Contract.EndContractBlock();
314

    
315
            using (log4net.ThreadContext.Stacks["SCHEDULE"].Push("Retrieve Remote"))
316
            {
317
                Log.Info("[LISTENER] Scheduled");
318

    
319
                //Get the list of server objects changed since the last check
320
                var listObjects = Task<IList<ObjectInfo>>.Factory.StartNewDelayed(10000, () =>
321
                                CloudClient.ListObjects(CloudClient.UserName, PithosContainer, since));
322
                //Get the list of deleted objects since the last check
323
                var listTrash = Task<IList<ObjectInfo>>.Factory.StartNewDelayed(10000, () =>
324
                                CloudClient.ListObjects(CloudClient.UserName, TrashContainer, since));                
325

    
326
                var listShared = Task<IList<ObjectInfo>>.Factory.StartNewDelayed(10000, () =>
327
                                CloudClient.ListSharedObjects(since));
328

    
329
                var listAll = Task.Factory.TrackedSequence(
330
                    () => listObjects,
331
                    () => listTrash,
332
                    () => listShared);
333

    
334
                //Next time we will check for all changes since the current check minus 1 second
335
                //This is done to ensure there are no discrepancies due to clock differences
336
                DateTime nextSince = DateTime.Now.AddSeconds(-1);
337

    
338

    
339
                var enqueueFiles = listAll.ContinueWith(task =>
340
                {
341
                    if (task.IsFaulted)
342
                    {
343
                        //ListObjects failed at this point, need to reschedule
344
                        Log.ErrorFormat("[FAIL] ListObjects in ProcessRemoteFiles with {0}", task.Exception);
345
                        ProcessRemoteFiles(accountPath, since);
346
                        return;
347
                    }
348
                    using (log4net.ThreadContext.Stacks["SCHEDULE"].Push("Process Results"))
349
                    {
350
                        var remoteObjects = ((Task<IList<ObjectInfo>>) task.Result[0]).Result;
351
                        var trashObjects = ((Task<IList<ObjectInfo>>) task.Result[1]).Result;
352
                        var sharedObjects = ((Task<IList<ObjectInfo>>) task.Result[2]).Result;
353

    
354
                        //Items with the same name, hash may be both in the container and the trash
355
                        //Don't delete items that exist in the container
356
                        var realTrash = from trash in trashObjects
357
                                        where !remoteObjects.Any(info => info.Hash == trash.Hash)
358
                                        select trash;
359
                        ProcessDeletedFiles(realTrash);                        
360

    
361

    
362
                        var remote = from info in remoteObjects.Union(sharedObjects)
363
                                     let name = info.Name
364
                                     where !name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase) &&
365
                                           !name.StartsWith("fragments/", StringComparison.InvariantCultureIgnoreCase)
366
                                     select info;
367

    
368
                        //Create a list of actions from the remote files
369
                        var allActions = ObjectsToActions(remote);
370
                       
371
                        //And remove those that are already being processed by the agent
372
                        var distinctActions = allActions
373
                            .Except(_agent.GetEnumerable(), new PithosMonitor.LocalFileComparer())
374
                            .ToList();
375

    
376
                        //Queue all the actions
377
                        foreach (var message in distinctActions)
378
                        {
379
                            Post(message);
380
                        }
381

    
382
                        //Report the number of new files
383
                        var remoteCount = distinctActions.Count(action=>
384
                            action.Action==CloudActionType.DownloadUnconditional);
385
                        if ( remoteCount > 0)
386
                            StatusNotification.NotifyChange(String.Format("Processing {0} new files", remoteCount));
387

    
388
                        Log.Info("[LISTENER] End Processing");                        
389
                    }
390
                });
391

    
392
                var loop = enqueueFiles.ContinueWith(t =>
393
                {                    
394
                    if (t.IsFaulted)
395
                    {
396
                        Log.Error("[LISTENER] Exception", t.Exception);
397
                    }
398
                    else
399
                    {
400
                        Log.Info("[LISTENER] Finished");
401
                    }
402
                    ProcessRemoteFiles(accountPath, nextSince);
403

    
404
                });
405
                return loop;
406
            }
407
        }
408

    
409
        //Creates an appropriate action for each server file
410
        private IEnumerable<CloudAction> ObjectsToActions(IEnumerable<ObjectInfo> remote)
411
        {
412
            if (remote==null)
413
                throw new ArgumentNullException();
414
            Contract.EndContractBlock();
415

    
416
            //In order to avoid multiple iterations over the files, we iterate only once
417
            //over the remote files
418
            foreach (var objectInfo in remote)
419
            {
420
                var relativePath = objectInfo.RelativeUrlToFilePath(CloudClient.UserName);
421
                //and remove any matching objects from the list, adding them to the commonObjects list
422
                if (FileAgent.Exists(relativePath))
423
                {
424
                    var localFile = FileAgent.GetFileInfo(relativePath);
425
                    var state = FileState.FindByFilePath(localFile.FullName);
426
                    //Common files should be checked on a per-case basis to detect differences, which is newer
427

    
428
                    yield return new CloudAction(CloudActionType.MustSynch,
429
                                                   localFile, objectInfo, state, BlockSize, BlockHash);
430
                }
431
                else
432
                {
433
                    //If there is no match we add them to the localFiles list
434
                    //but only if the file is not marked for deletion
435
                    var targetFile = Path.Combine(FileAgent.RootPath, relativePath);
436
                    var fileStatus = StatusKeeper.GetFileStatus(targetFile);
437
                    if (fileStatus != FileStatus.Deleted)
438
                    {
439
                        //Remote files should be downloaded
440
                        yield return new CloudDownloadAction(objectInfo);
441
                    }
442
                }
443
            }            
444
        }
445

    
446
        private void ProcessDeletedFiles(IEnumerable<ObjectInfo> trashObjects)
447
        {
448
            foreach (var trashObject in trashObjects)
449
            {
450
                var relativePath = trashObject.RelativeUrlToFilePath(CloudClient.UserName);
451
                //and remove any matching objects from the list, adding them to the commonObjects list
452
                FileAgent.Delete(relativePath);                                
453
            }
454
        }
455

    
456

    
457
        private void RenameCloudFile(string account, string container,string oldFileName, string newPath, string newFileName)
458
        {
459
            if (String.IsNullOrWhiteSpace(account))
460
                throw new ArgumentNullException("account");
461
            if (String.IsNullOrWhiteSpace(container))
462
                throw new ArgumentNullException("container");
463
            if (String.IsNullOrWhiteSpace(oldFileName))
464
                throw new ArgumentNullException("oldFileName");
465
            if (String.IsNullOrWhiteSpace(oldFileName))
466
                throw new ArgumentNullException("newPath");
467
            if (String.IsNullOrWhiteSpace(oldFileName))
468
                throw new ArgumentNullException("newFileName");
469
            Contract.EndContractBlock();
470
            //The local file is already renamed
471
            this.StatusKeeper.SetFileOverlayStatus(newPath, FileOverlayStatus.Modified);
472

    
473
            CloudClient.MoveObject(account, container, oldFileName, container, newFileName);
474

    
475
            this.StatusKeeper.SetFileStatus(newPath, FileStatus.Unchanged);
476
            this.StatusKeeper.SetFileOverlayStatus(newPath, FileOverlayStatus.Normal);
477
            NativeMethods.RaiseChangeNotification(newPath);
478
        }
479

    
480
        private void DeleteCloudFile(string account,string container, string fileName)
481
        {
482
            if (String.IsNullOrWhiteSpace(account))
483
                throw new ArgumentNullException("account");
484
            if (String.IsNullOrWhiteSpace(container))
485
                throw new ArgumentNullException("container");
486
            if (String.IsNullOrWhiteSpace(container))
487
                throw new ArgumentNullException("container");
488

    
489
            if (String.IsNullOrWhiteSpace(fileName))
490
                throw new ArgumentNullException("fileName");
491
            if (Path.IsPathRooted(fileName))
492
                throw new ArgumentException("The fileName should not be rooted","fileName");
493
            Contract.EndContractBlock();
494

    
495
            using ( log4net.ThreadContext.Stacks["DeleteCloudFile"].Push("Delete"))
496
            {
497
                var info = FileAgent.GetFileInfo(fileName);
498
                var path = info.FullName.ToLower();
499
                this.StatusKeeper.SetFileOverlayStatus(path, FileOverlayStatus.Modified);
500

    
501
                CloudClient.DeleteObject(account, container, fileName, TrashContainer);
502

    
503
                this.StatusKeeper.ClearFileStatus(path);
504
            }
505
        }
506

    
507
        //Download a file.
508
        private void DownloadCloudFile(string account,string container, Uri relativeUrl, string localPath)
509
        {
510
            if (String.IsNullOrWhiteSpace(account))
511
                throw new ArgumentNullException("account");
512
            if (String.IsNullOrWhiteSpace(container))
513
                throw new ArgumentNullException("container");
514
            if (relativeUrl == null)
515
                throw new ArgumentNullException("relativeUrl");
516
            if (String.IsNullOrWhiteSpace(localPath))
517
                throw new ArgumentNullException("localPath");
518
            if (!Path.IsPathRooted(localPath))
519
                throw new ArgumentException("The localPath must be rooted", "localPath");
520
            Contract.EndContractBlock();
521
            
522
            var download=Task.Factory.Iterate(DownloadIterator(account,container, relativeUrl, localPath));
523
            download.Wait();
524
        }
525

    
526
        private IEnumerable<Task> DownloadIterator(string account,string container, Uri relativeUrl, string localPath)
527
        {
528
            if (String.IsNullOrWhiteSpace(account))
529
                throw new ArgumentNullException("account");
530
            if (String.IsNullOrWhiteSpace(container))
531
                throw new ArgumentNullException("container");
532
            if (relativeUrl==null)
533
                throw new ArgumentNullException("relativeUrl");
534
            if (String.IsNullOrWhiteSpace(localPath))
535
                throw new ArgumentNullException("localPath");
536
            if (!Path.IsPathRooted(localPath))
537
                throw new ArgumentException("The localPath must be rooted", "localPath");
538
            Contract.EndContractBlock();
539

    
540
            var url = relativeUrl.ToString();
541
            if (url.EndsWith(".ignore",StringComparison.InvariantCultureIgnoreCase))
542
                yield break;
543

    
544
            //Are we already downloading or uploading the file? 
545
            using (var gate=NetworkGate.Acquire(localPath, NetworkOperation.Downloading))
546
            {
547
                if (gate.Failed)
548
                    yield break;
549
                //The file's hashmap will be stored in the same location with the extension .hashmap
550
                //var hashPath = Path.Combine(FileAgent.FragmentsPath, relativePath + ".hashmap");
551
                
552
                //Retrieve the hashmap from the server
553
                var getHashMap = CloudClient.GetHashMap(account, container, url);
554
                yield return getHashMap;
555
                
556
                var serverHash=getHashMap.Result;
557
                //If it's a small file
558
                var downloadTask=(serverHash.Hashes.Count == 1 )
559
                    //Download it in one go
560
                    ? DownloadEntireFile(account,container, relativeUrl, localPath) 
561
                    //Otherwise download it block by block
562
                    : DownloadWithBlocks(account,container, relativeUrl, localPath, serverHash);
563

    
564
                yield return downloadTask;
565

    
566

    
567
                //Retrieve the object's metadata
568
                var info=CloudClient.GetObjectInfo(account, container, url);
569
                //And store it
570
                StatusKeeper.StoreInfo(localPath, info);
571
                
572
                //Notify listeners that a local file has changed
573
                StatusNotification.NotifyChangedFile(localPath);
574

    
575
            }
576
        }
577

    
578
        //Download a small file with a single GET operation
579
        private Task DownloadEntireFile(string account,string container, Uri relativeUrl, string localPath)
580
        {
581
            if (String.IsNullOrWhiteSpace(account))
582
                throw new ArgumentNullException("account");
583
            if (String.IsNullOrWhiteSpace(container))
584
                throw new ArgumentNullException("container");
585
            if (relativeUrl == null)
586
                throw new ArgumentNullException("relativeUrl");
587
            if (String.IsNullOrWhiteSpace(localPath))
588
                throw new ArgumentNullException("localPath");
589
            if (!Path.IsPathRooted(localPath))
590
                throw new ArgumentException("The localPath must be rooted", "localPath");
591
            Contract.EndContractBlock();
592

    
593
            //Calculate the relative file path for the new file
594
            var relativePath = relativeUrl.RelativeUriToFilePath();
595
            //The file will be stored in a temporary location while downloading with an extension .download
596
            var tempPath = Path.Combine(FileAgent.FragmentsPath, relativePath + ".download");
597
            //Make sure the target folder exists. DownloadFileTask will not create the folder
598
            var directoryPath = Path.GetDirectoryName(tempPath);
599
            if (!Directory.Exists(directoryPath))
600
                Directory.CreateDirectory(directoryPath);
601

    
602
            //Download the object to the temporary location
603
            var getObject = CloudClient.GetObject(account, container, relativeUrl.ToString(), tempPath).ContinueWith(t =>
604
            {
605
                t.PropagateExceptions();
606
                //And move it to its actual location once downloading is finished
607
                if (File.Exists(localPath))
608
                    File.Replace(tempPath,localPath,null,true);
609
                else
610
                    File.Move(tempPath,localPath);
611
            });
612
            return getObject;
613
        }
614

    
615
        //Download a file asynchronously using blocks
616
        public Task DownloadWithBlocks(string account,string container, Uri relativeUrl, string localPath, TreeHash serverHash)
617
        {
618
            if (String.IsNullOrWhiteSpace(account))
619
                throw new ArgumentNullException("account");
620
            if (String.IsNullOrWhiteSpace(container))
621
                throw new ArgumentNullException("container");
622
            if (relativeUrl == null)
623
                throw new ArgumentNullException("relativeUrl");
624
            if (String.IsNullOrWhiteSpace(localPath))
625
                throw new ArgumentNullException("localPath");
626
            if (!Path.IsPathRooted(localPath))
627
                throw new ArgumentException("The localPath must be rooted", "localPath");
628
            if (serverHash == null)
629
                throw new ArgumentNullException("serverHash");
630
            Contract.EndContractBlock();
631
            
632
            return Task.Factory.Iterate(BlockDownloadIterator(account,container, relativeUrl, localPath, serverHash));
633
        }
634
        
635
        private IEnumerable<Task> BlockDownloadIterator(string account,string container,Uri relativeUrl, string localPath,TreeHash serverHash)
636
        {
637
            if (String.IsNullOrWhiteSpace(account))
638
                throw new ArgumentNullException("account");
639
            if (String.IsNullOrWhiteSpace(container))
640
                throw new ArgumentNullException("container");
641
            if (relativeUrl == null)
642
                throw new ArgumentNullException("relativeUrl");
643
            if (String.IsNullOrWhiteSpace(localPath))
644
                throw new ArgumentNullException("localPath");
645
            if (!Path.IsPathRooted(localPath))
646
                throw new ArgumentException("The localPath must be rooted", "localPath");
647
            if(serverHash==null)
648
                throw new ArgumentNullException("serverHash");
649
            Contract.EndContractBlock();
650

    
651
            
652
            //Calculate the relative file path for the new file
653
            var relativePath = relativeUrl.RelativeUriToFilePath();
654
            var blockUpdater = new BlockUpdater(FileAgent.FragmentsPath, localPath, relativePath, serverHash);
655

    
656
            
657
                        
658
            //Calculate the file's treehash
659
            var calcHash = Signature.CalculateTreeHashAsync(localPath, this.BlockSize,BlockHash);
660
            yield return calcHash;                        
661
            var treeHash = calcHash.Result;
662
                
663
            //And compare it with the server's hash
664
            var upHashes = serverHash.GetHashesAsStrings();
665
            var localHashes = treeHash.HashDictionary;
666
            for (int i = 0; i < upHashes.Length; i++)
667
            {
668
                //For every non-matching hash
669
                var upHash = upHashes[i];
670
                if (!localHashes.ContainsKey(upHash))
671
                {
672
                    if (blockUpdater.UseOrphan(i, upHash))
673
                    {
674
                        Log.InfoFormat("[BLOCK GET] ORPHAN FOUND for {0} of {1} for {2}", i, upHashes.Length, localPath);
675
                        continue;
676
                    }
677
                    Log.InfoFormat("[BLOCK GET] START {0} of {1} for {2}", i, upHashes.Length, localPath);
678
                    var start = i*BlockSize;
679
                    //To download the last block just pass a null for the end of the range
680
                    long? end = null;
681
                    if (i < upHashes.Length - 1 )
682
                        end= ((i + 1)*BlockSize) ;
683
                            
684
                    //Download the missing block
685
                    var getBlock = CloudClient.GetBlock(account, container, relativeUrl, start, end);
686
                    yield return getBlock;
687
                    var block = getBlock.Result;
688

    
689
                    //and store it
690
                    yield return blockUpdater.StoreBlock(i, block);
691

    
692

    
693
                    Log.InfoFormat("[BLOCK GET] FINISH {0} of {1} for {2}", i, upHashes.Length, localPath);
694
                }
695
            }
696

    
697
            blockUpdater.Commit();
698
            Log.InfoFormat("[BLOCK GET] COMPLETE {0}", localPath);            
699
        }
700

    
701

    
702
        private void UploadCloudFile(string account,string container,FileInfo fileInfo, string hash,string topHash)
703
        {
704
            if (String.IsNullOrWhiteSpace(account))
705
                throw new ArgumentNullException("account");
706
            if (String.IsNullOrWhiteSpace(container))
707
                throw new ArgumentNullException("container");
708
            if (fileInfo == null)
709
                throw new ArgumentNullException("fileInfo");
710
            if (String.IsNullOrWhiteSpace(hash))
711
                throw new ArgumentNullException("hash");
712
            if (topHash == null)
713
                throw new ArgumentNullException("topHash");
714
            Contract.EndContractBlock();
715

    
716
            var upload = Task.Factory.Iterate(UploadIterator(account,container,fileInfo, hash.ToLower(), topHash.ToLower()));
717
            upload.Wait();
718
        }
719

    
720
        private IEnumerable<Task> UploadIterator(string account,string container,FileInfo fileInfo, string hash,string topHash)
721
        {
722
            if (String.IsNullOrWhiteSpace(account))
723
                throw new ArgumentNullException("account");
724
            if (String.IsNullOrWhiteSpace(container))
725
                throw new ArgumentNullException("container");
726
            if (fileInfo == null)
727
                throw new ArgumentNullException("fileInfo");
728
            if (String.IsNullOrWhiteSpace(hash))
729
                throw new ArgumentNullException("hash");
730
            if (topHash == null)
731
                throw new ArgumentNullException("topHash");
732
            Contract.EndContractBlock();
733

    
734
            if (fileInfo.Extension.Equals("ignore", StringComparison.InvariantCultureIgnoreCase))
735
                yield break;
736
            
737
            var url = fileInfo.AsRelativeUrlTo(FileAgent.RootPath);
738

    
739
            var fullFileName = fileInfo.FullName;
740
            using(var gate=NetworkGate.Acquire(fullFileName,NetworkOperation.Uploading))
741
            {
742
                //Abort if the file is already being uploaded or downloaded
743
                if (gate.Failed)
744
                    yield break; 
745

    
746

    
747
                //Even if GetObjectInfo times out, we can proceed with the upload            
748
                var info = CloudClient.GetObjectInfo(account, container, url);
749
                var cloudHash = info.Hash.ToLower();
750

    
751
                //If the file hashes match, abort the upload
752
                if (hash == cloudHash  || topHash ==cloudHash)
753
                {
754
                    //but store any metadata changes 
755
                    this.StatusKeeper.StoreInfo(fullFileName, info);
756
                    Log.InfoFormat("Skip upload of {0}, hashes match", fullFileName);
757
                    yield break;
758
                }
759

    
760
                //Mark the file as modified while we upload it
761
                StatusKeeper.SetFileOverlayStatus(fullFileName, FileOverlayStatus.Modified);
762
                //And then upload it
763

    
764
                //If the file is larger than the block size, try a hashmap PUT
765
                if (fileInfo.Length > BlockSize )
766
                {
767
                    //To upload using a hashmap
768
                    //First, calculate the tree hash
769
                    var treeHash = Signature.CalculateTreeHashAsync(fileInfo.FullName, BlockSize, BlockHash);
770
                    yield return treeHash;
771
                    
772
                    yield return Task.Factory.Iterate(UploadWithHashMap(account,container,fileInfo,url,treeHash));
773
                                        
774
                }
775
                else
776
                {
777
                    //Otherwise do a regular PUT
778
                    yield return CloudClient.PutObject(account, container, url, fullFileName, hash);                    
779
                }
780
                //If everything succeeds, change the file and overlay status to normal
781
                this.StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal);
782
            }
783
            //Notify the Shell to update the overlays
784
            NativeMethods.RaiseChangeNotification(fullFileName);
785
            StatusNotification.NotifyChangedFile(fullFileName);
786
        }
787

    
788
        public IEnumerable<Task> UploadWithHashMap(string account,string container,FileInfo fileInfo,string url,Task<TreeHash> treeHash)
789
        {
790
            if (String.IsNullOrWhiteSpace(account))
791
                throw new ArgumentNullException("account");
792
            if (String.IsNullOrWhiteSpace(container))
793
                throw new ArgumentNullException("container");
794
            if (fileInfo == null)
795
                throw new ArgumentNullException("fileInfo");
796
            if (String.IsNullOrWhiteSpace(url))
797
                throw new ArgumentNullException(url);
798
            if (treeHash==null)
799
                throw new ArgumentNullException("treeHash");
800
            Contract.EndContractBlock();
801

    
802
            var fullFileName = fileInfo.FullName;
803

    
804
            //Send the hashmap to the server            
805
            var hashPut = CloudClient.PutHashMap(account, container, url, treeHash.Result);
806
            yield return hashPut;
807

    
808
            var missingHashes = hashPut.Result;
809
            //If the server returns no missing hashes, we are done
810
            while (missingHashes.Count > 0)
811
            {
812

    
813
                var buffer = new byte[BlockSize];
814
                foreach (var missingHash in missingHashes)
815
                {
816
                    //Find the proper block
817
                    var blockIndex = treeHash.Result.HashDictionary[missingHash];
818
                    var offset = blockIndex*BlockSize;
819

    
820
                    var read = fileInfo.Read(buffer, offset, BlockSize);
821

    
822
                    //And upload the block                
823
                    var postBlock = CloudClient.PostBlock(account, container, buffer, 0, read);
824

    
825
                    //We have to handle possible exceptions in a continuation because
826
                    //*yield return* can't appear inside a try block
827
                    yield return postBlock.ContinueWith(t => 
828
                        t.ReportExceptions(
829
                            exc => Log.ErrorFormat("[ERROR] uploading block {0} of {1}\n{2}", blockIndex, fullFileName, exc),
830
                            ()=>Log.InfoFormat("[BLOCK] Block {0} of {1} uploaded", blockIndex,fullFileName)));
831
                }
832

    
833
                //Repeat until there are no more missing hashes
834
                hashPut = CloudClient.PutHashMap(account, container, url, treeHash.Result);
835
                yield return hashPut;
836
                missingHashes = hashPut.Result;
837
            }
838
        }
839

    
840

    
841
    }
842

    
843
   
844

    
845

    
846
}