Revision a27aa447 trunk/Pithos.Core/Agents/NetworkAgent.cs

b/trunk/Pithos.Core/Agents/NetworkAgent.cs
6 6
using System.IO;
7 7
using System.Linq;
8 8
using System.Text;
9
using System.Threading;
9 10
using System.Threading.Tasks;
10 11
using Pithos.Interfaces;
11 12
using Pithos.Network;
......
33 34
*/
34 35

  
35 36

  
36
        private string _pithosContainer;
37
        private string _trashContainer;
37
        public string PithosContainer { get; set; }
38
        public string TrashContainer { get; private set; }
38 39

  
39
        private int _blockSize;
40
        private string _blockHash;
40
        public int BlockSize { get; set; }
41
        public string BlockHash { get; set; }
41 42

  
42 43

  
43 44
        public void Start(string pithosContainer, string trashContainer, int blockSize, string blockHash)
......
48 49
                throw new ArgumentNullException("trashContainer");
49 50
            Contract.EndContractBlock();
50 51

  
51
            _pithosContainer = pithosContainer;
52
            _trashContainer = trashContainer;
53
            _blockSize = blockSize;
54
            _blockHash = blockHash;
52
            PithosContainer = pithosContainer;
53
            TrashContainer = trashContainer;
54
            BlockSize = blockSize;
55
            BlockHash = blockHash;
55 56

  
56 57

  
57 58
            _agent = Agent<CloudAction>.Start(inbox =>
......
60 61
                loop = () =>
61 62
                {
62 63
                    var message = inbox.Receive();
64

  
65
/*
66
                    var process=Process(message);
67
                    inbox.LoopAsync(process, loop);
68
*/
69

  
70
/*
71
                    process1.ContinueWith(t =>
72
                    {
73
                        inbox.DoAsync(loop);
74
                        if (t.IsFaulted)
75
                        {
76
                            var ex = t.Exception.InnerException;
77
                            if (ex is OperationCanceledException)
78
                                inbox.Stop();
79
                        }
80
                    });
81
*/
82
                    //inbox.DoAsync(loop);
83

  
84

  
63 85
                    var process = message.ContinueWith(t =>
64 86
                    {
65 87
                        var action = t.Result;
......
67 89
                        Process(action);
68 90
                        inbox.DoAsync(loop);
69 91
                    });
92

  
70 93
                    process.ContinueWith(t =>
71 94
                    {
72 95
                        inbox.DoAsync(loop);
......
78 101
                        }
79 102
                    });
80 103

  
104

  
81 105
                };
82 106
                loop();
83 107
            });
......
89 113
            if (cloudAction == null)
90 114
                throw new ArgumentNullException("cloudAction");
91 115
            Contract.EndContractBlock();
116
            
117
            //If the action targets a local file, add a treehash calculation
118
            if (cloudAction.LocalFile != null)
119
            {
120
                cloudAction.TopHash = new Lazy<string>(() => Signature.CalculateTreeHashAsync(cloudAction.LocalFile, 
121
                                    BlockSize, BlockHash).Result
122
                                     .TopHash.ToHashString());
92 123

  
124
            }
93 125
            _agent.Post(cloudAction);
94 126
        }
95 127

  
......
112 144
            Trace.CorrelationManager.StartLogicalOperation();
113 145
            Trace.TraceInformation("[LISTENER] Scheduled");
114 146
            var listObjects = Task.Factory.StartNewDelayed(10000).ContinueWith(t =>
115
                CloudClient.ListObjects(_pithosContainer,since));
147
                CloudClient.ListObjects(PithosContainer,since));
116 148

  
117 149
            DateTime nextSince = DateTime.Now.AddSeconds(-1);
118 150

  
......
181 213
                        .ToList();
182 214

  
183 215
                //Queue all the actions
184
                _agent.AddFromEnumerable(distinctActions);
216
                foreach (var message in distinctActions)
217
                {
218
                    Post(message);
219
                }
220

  
185 221

  
186 222
                if(remoteOnly.Count>0)
187 223
                    StatusNotification.NotifyChange(String.Format("Processing {0} new files", remoteOnly.Count));
......
208 244
        }
209 245

  
210 246
        
247
        private Task Process(Task<CloudAction> action)
248
        {
249
            return action.ContinueWith(t=> Process(t.Result));
250
        }
211 251

  
212 252

  
213 253
        private void Process(CloudAction action)
......
230 270
                        UploadCloudFile(localFile, action.LocalHash.Value,action.TopHash.Value);
231 271
                        break;
232 272
                    case CloudActionType.DownloadUnconditional:
233
                        DownloadCloudFile(_pithosContainer, new Uri(cloudFile.Name,UriKind.Relative), downloadPath);
273
                        DownloadCloudFile(PithosContainer, new Uri(cloudFile.Name,UriKind.Relative), downloadPath);
234 274
                        break;
235 275
                    case CloudActionType.DeleteCloud:
236 276
                        DeleteCloudFile(cloudFile.Name);
......
266 306
                                        case FileStatus.Unchanged:
267 307
                                            //It he cloud file has a later date, it was modified by another user or computer.
268 308
                                            //If the local file's status is Unchanged, we should go on and download the cloud file
269
                                            DownloadCloudFile(_pithosContainer, new Uri(action.CloudFile.Name,UriKind.Relative), downloadPath);
309
                                            DownloadCloudFile(PithosContainer, new Uri(action.CloudFile.Name,UriKind.Relative), downloadPath);
270 310
                                            break;
271 311
                                        case FileStatus.Modified:
272 312
                                            //If the local file is Modified, we may have a conflict. In this case we should mark the file as Conflict
......
292 332
                            }
293 333
                        }
294 334
                        else
295
                            DownloadCloudFile(_pithosContainer, new Uri(action.CloudFile.Name,UriKind.Relative), downloadPath);
335
                            DownloadCloudFile(PithosContainer, new Uri(action.CloudFile.Name,UriKind.Relative), downloadPath);
296 336
                        break;
297 337
                }
298 338
                Trace.TraceInformation("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name);
......
325 365
            //The local file is already renamed
326 366
            this.StatusKeeper.SetFileOverlayStatus(newPath, FileOverlayStatus.Modified);
327 367

  
328
            CloudClient.MoveObject(_pithosContainer, oldFileName, _pithosContainer, newFileName);
368
            CloudClient.MoveObject(PithosContainer, oldFileName, PithosContainer, newFileName);
329 369

  
330 370
            this.StatusKeeper.SetFileStatus(newPath, FileStatus.Unchanged);
331 371
            this.StatusKeeper.SetFileOverlayStatus(newPath, FileOverlayStatus.Normal);
......
342 382

  
343 383
            this.StatusKeeper.SetFileOverlayStatus(fileName, FileOverlayStatus.Modified);
344 384

  
345
            CloudClient.MoveObject(_pithosContainer, fileName, _trashContainer, fileName);
385
            CloudClient.MoveObject(PithosContainer, fileName, TrashContainer, fileName);
386

  
346 387
            this.StatusKeeper.ClearFileStatus(fileName);
347 388
            this.StatusKeeper.RemoveFileOverlayStatus(fileName);
348 389
        }
......
354 395
                throw new ArgumentNullException("container");
355 396
            if (relativeUrl==null)
356 397
                throw new ArgumentNullException("relativeUrl");
357

  
358 398
            if (String.IsNullOrWhiteSpace(localPath))
359 399
                throw new ArgumentNullException("localPath");
360 400
            if (!Path.IsPathRooted(localPath))
......
370 410
            {
371 411
                if (gate.Failed)
372 412
                    return;
373
                //Calculate the relative file path for the new file
374
                var relativePath = relativeUrl.RelativeUriToFilePath();
375
                //The file will be stored in a temporary location while downloading with an extension .download
376
                var tempPath = Path.Combine(FileAgent.FragmentsPath, relativePath + ".download");
377 413
                //The file's hashmap will be stored in the same location with the extension .hashmap
378
                var hashPath = Path.Combine(FileAgent.FragmentsPath, relativePath + ".hashmap");
414
                //var hashPath = Path.Combine(FileAgent.FragmentsPath, relativePath + ".hashmap");
379 415
                
380 416
                //Retrieve the hashmap from the server
381
                var getHashMap = CloudClient.GetHashMap(container,url);
382
                //And save it
383
                var saveHashMap = getHashMap.ContinueWith(t =>
417
                var getHashMap = CloudClient.GetHashMap(container,url);                
418
                var downloadTask= getHashMap.ContinueWith(t =>
384 419
                {
385
                    var treeHash = t.Result;
386
                    treeHash.Save(hashPath);
420
                    var serverHash=t.Result;
421
                    //If it's a small file
422
                    return serverHash.Hashes.Count == 1 
423
                        //Download it in one go
424
                        ? DownloadEntireFile(container, relativeUrl, localPath) 
425
                        //Otherwise download it block by block
426
                        : DownloadWithBlocks(container, relativeUrl, localPath, serverHash);
387 427
                });
388 428

  
389
                //Make sure the target folder exists. DownloadFileTask will not create the folder
390
                var directoryPath=Path.GetDirectoryName(tempPath);
391
                if (!Directory.Exists(directoryPath))
392
                    Directory.CreateDirectory(directoryPath);
393

  
394
                //Download the object to the temporary location
395
                var getObject = CloudClient.GetObject(container, url, tempPath).ContinueWith(t =>
396
                {
397
                    //And move it to its actual location once downloading is finished
398
                    if (File.Exists(localPath))
399
                        File.Replace(tempPath,localPath,null,true);
400
                    else
401
                        File.Move(tempPath,localPath);
402
                });
403 429
                
430

  
404 431
                //Retrieve the object's metadata
405
                var getInfo = saveHashMap.ContinueWith(t => getObject).ContinueWith(t =>
432
                var getInfo = downloadTask.ContinueWith(t =>
406 433
                            CloudClient.GetObjectInfo(container, url));
407 434
                //And store it
408 435
                var storeInfo = getInfo.ContinueWith(t =>
......
414 441
            }
415 442
        }
416 443

  
444
        //Download a small file with a single GET operation
445
        private Task DownloadEntireFile(string container, Uri relativeUrl, string localPath)
446
        {
447
            if (String.IsNullOrWhiteSpace(container))
448
                throw new ArgumentNullException("container");
449
            if (relativeUrl == null)
450
                throw new ArgumentNullException("relativeUrl");
451
            if (String.IsNullOrWhiteSpace(localPath))
452
                throw new ArgumentNullException("localPath");
453
            if (!Path.IsPathRooted(localPath))
454
                throw new ArgumentException("The localPath must be rooted", "localPath");
455
            Contract.EndContractBlock();
456

  
457
            //Calculate the relative file path for the new file
458
            var relativePath = relativeUrl.RelativeUriToFilePath();
459
            //The file will be stored in a temporary location while downloading with an extension .download
460
            var tempPath = Path.Combine(FileAgent.FragmentsPath, relativePath + ".download");
461
            //Make sure the target folder exists. DownloadFileTask will not create the folder
462
            var directoryPath = Path.GetDirectoryName(tempPath);
463
            if (!Directory.Exists(directoryPath))
464
                Directory.CreateDirectory(directoryPath);
465

  
466
            //Download the object to the temporary location
467
            var getObject = CloudClient.GetObject(container, relativeUrl.ToString(), tempPath).ContinueWith(t =>
468
            {
469
                //And move it to its actual location once downloading is finished
470
                if (File.Exists(localPath))
471
                    File.Replace(tempPath,localPath,null,true);
472
                else
473
                    File.Move(tempPath,localPath);
474
            });
475
            return getObject;
476
        }
477

  
478
        public Task DownloadWithBlocks(string container,Uri relativeUrl, string localPath,TreeHash serverHash)
479
        {
480
            if (String.IsNullOrWhiteSpace(container))
481
                throw new ArgumentNullException("container");
482
            if (relativeUrl == null)
483
                throw new ArgumentNullException("relativeUrl");
484
            if (String.IsNullOrWhiteSpace(localPath))
485
                throw new ArgumentNullException("localPath");
486
            if (!Path.IsPathRooted(localPath))
487
                throw new ArgumentException("The localPath must be rooted", "localPath");
488
            if(serverHash==null)
489
                throw new ArgumentNullException("serverHash");
490
            Contract.EndContractBlock();
491

  
492
            //Calculate the relative file path for the new file
493
            var relativePath = relativeUrl.RelativeUriToFilePath();
494
            //The file will be stored in a temporary location while downloading with an extension .download
495
            var tempPath = Path.Combine(FileAgent.FragmentsPath, relativePath + ".download");
496
            var directoryPath = Path.GetDirectoryName(tempPath);
497
            if (!Directory.Exists(directoryPath))
498
                Directory.CreateDirectory(directoryPath);
499
            
500
            //If the local file exists we should make a copy of it to the
501
            //fragments folder, unless a newer temp copy already exists, which
502
            //means there is an interrupted download
503
            if (ShouldCopy(localPath, tempPath))
504
                File.Copy(localPath, tempPath, true);    
505

  
506
            //Set the size of the file to the size specified in the treehash
507
            //This will also create an empty file if the file doesn't exist            
508
            SetFileSize(tempPath, serverHash.Bytes);
509

  
510
            return Task.Factory.StartNew(() =>
511
            {
512
                //Calculate the temp file's treehash
513
                var treeHash = Signature.CalculateTreeHashAsync(tempPath, this.BlockSize,BlockHash).Result;
514
                
515
                //And compare it with the server's hash
516
                var upHashes = serverHash.GetHashesAsStrings();
517
                var localHashes = treeHash.HashDictionary;
518
                for (int i = 0; i < upHashes.Length; i++)
519
                {
520
                    //For every non-matching hash
521
                    if (!localHashes.ContainsKey(upHashes[i]))
522
                    {
523
                        Trace.TraceInformation("[BLOCK GET] START {0} of {1} for {2}",i,upHashes.Length,localPath);
524
                        var start = i*BlockSize;
525
                        long? end = null;
526
                        if (i < upHashes.Length - 1 )
527
                            end= ((i + 1)*BlockSize) ;
528
                            
529
                        //Get its block
530
                        var blockTask = CloudClient.GetBlock(container, relativeUrl,
531
                                                            start, end);
532

  
533
                        blockTask.ContinueWith(b =>
534
                        {
535
                            //And apply it to the temp file
536
                            var buffer = b.Result;
537
                            var stream =FileAsync.OpenWrite(tempPath);
538
                            stream.Seek(start,SeekOrigin.Begin);
539
                            return stream.WriteAsync(buffer, 0, buffer.Length)
540
                                .ContinueWith(s => stream.Close());
541

  
542
                        }).Unwrap()
543
                        .Wait();
544
                        Trace.TraceInformation("[BLOCK GET] FINISH {0} of {1} for {2}", i, upHashes.Length, localPath);
545
                    }
546

  
547
                }
548
                
549

  
550
                //Replace the existing file with the temp
551
                if (File.Exists(localPath))
552
                    File.Replace(tempPath, localPath, null, true);
553
                else
554
                    File.Move(tempPath, localPath);
555
                Trace.TraceInformation("[BLOCK GET] COMPLETE {0}", localPath);
556
            });
557
        }
558

  
559
        //Change the file's size, possibly truncating or adding to it
560
        private static void SetFileSize(string filePath, long fileSize)
561
        {
562
            if (String.IsNullOrWhiteSpace(filePath))
563
                throw new ArgumentNullException("filePath");
564
            if (!Path.IsPathRooted(filePath))
565
                throw new ArgumentException("The filePath must be rooted", "filePath");
566
            if (fileSize<0)
567
                throw new ArgumentOutOfRangeException("fileSize");
568
            Contract.EndContractBlock();
569

  
570
            using (var stream = File.Open(filePath, FileMode.OpenOrCreate, FileAccess.Write))
571
            {
572
                stream.SetLength(fileSize);
573
            }
574
        }
575

  
576
        //Check whether we should copy the local file to a temp path        
577
        private static bool ShouldCopy(string localPath, string tempPath)
578
        {
579
            //No need to copy if there is no file
580
            if (!File.Exists(localPath))            
581
                return false;
582

  
583
            //If there is no temp file, go ahead and copy
584
            if (!File.Exists(tempPath))                
585
                return true;
586

  
587
            //If there is a temp file and is newer than the actual file, don't copy
588
            var localLastWrite = File.GetLastWriteTime(localPath);
589
            var tempLastWrite = File.GetLastWriteTime(tempPath);
590
            
591
            //This could mean there is an interrupted download in progress
592
            return (tempLastWrite < localLastWrite);
593
        }
594

  
417 595
        private void UploadCloudFile(FileInfo fileInfo, string hash,string topHash)
418 596
        {
419 597
            if (fileInfo==null)
......
436 614

  
437 615

  
438 616
                //Even if GetObjectInfo times out, we can proceed with the upload            
439
                var info = CloudClient.GetObjectInfo(_pithosContainer, url);
617
                var info = CloudClient.GetObjectInfo(PithosContainer, url);
440 618

  
441 619
                //If the file hashes match, abort the upload
442 620
                if (hash.Equals(info.Hash, StringComparison.InvariantCultureIgnoreCase) ||
......
448 626
                    return;
449 627
                }
450 628

  
451
                //If the file was last modified by a hashmap operation, its hash will be a treehash
452
                //We must load the local file's treehash and check it against the object's hash 
453
                //The hash will be stored under the fragments path, in the same relative path as to
454
                //the pithos root path
455
                var relativePath = fileInfo.AsRelativeTo(FileAgent.RootPath);
456
/*
457
                var hashPath = Path.Combine(FileAgent.FragmentsPath, relativePath) + ".hashmap";                
458
                //Load the hash or calculate a new one
459
                var hashFileExists = File.Exists(hashPath);
460
                var treeHash = hashFileExists ?
461
                    TreeHash.LoadTreeHash(hashPath, Guid.NewGuid()).Result
462
                    : Signature.CalculateTreeHash(fileInfo.FullName, _blockSize, _blockHash);
463
                //If this is a new treehash, store it
464
                if (!hashFileExists)
465
                    treeHash.Save(hashPath);
466

  
467
                var topHash = treeHash.TopHash.ToHashString();
468
                if (topHash.Equals(info.Hash, StringComparison.InvariantCultureIgnoreCase))
629
                //Mark the file as modified while we upload it
630
                var setStatus = Task.Factory.StartNew(() =>
631
                        StatusKeeper.SetFileOverlayStatus(fullFileName, FileOverlayStatus.Modified));
632
                //And then upload it
633

  
634
                //If the file is larger than the block size, try a hashmap PUT
635
                if (fileInfo.Length > BlockSize )
469 636
                {
470
                    this.StatusKeeper.StoreInfo(fileInfo.FullName, info);
471
                    Trace.TraceInformation("Skip upload of {0}, treehashes match", fileInfo.FullName);
472
                    return;                    
637
                    //To upload using a hashmap
638
                    //First, calculate the tree hash
639
                    var treeHash = Signature.CalculateTreeHashAsync(fileInfo.FullName, BlockSize, BlockHash);                    
640
                    
641
                    var putHashMap = setStatus.ContinueWith(t=>
642
                        UploadWithHashMap(fileInfo,url,treeHash));
643
                    
644
                    putHashMap.Wait();
473 645
                }
474
*/
475
            
476

  
477

  
478

  
479
                //Does the object exist or not?
480

  
481
                //Calculate or load the file's hashmap                
482

  
483
                /*
484
                 if (fileInfo.Bytes > _blockSize)
485
                 {
486
                  var hashPath = Path.Combine(FileAgent.FragmentsPath, relativePath) + ".hashmap";
487
                
488
                //Load the existing hashPath, if we have one, or calculate a new one
489
                var treeHash = File.Exists(hashPath) ?
490
                    TreeHash.LoadTreeHash(hashPath, Guid.NewGuid()).Result 
491
                    : Signature.CalculateTreeHash(fileInfo.FullName, _blockSize , _blockHash);
492
                                
493
                //Do the Hashmap Put
494
                Task<string> putHashMap=CloudClient.PutHashMap(_pithosContainer,url, treeHash);
495
                putHashMap.Wait();
496
                 }*/                    
497

  
498
            //
499

  
500

  
501
                var putOrUpdate = Task.Factory.StartNew(() =>
646
                else
502 647
                {
503
                    //Mark the file as modified while we upload it
504
                    var setStatus = Task.Factory.StartNew(() =>
505
                        StatusKeeper.SetFileOverlayStatus(fullFileName,FileOverlayStatus.Modified));
506
                    //And then upload it
648
                    //Otherwise do a regular PUT
507 649
                    var put = setStatus.ContinueWith(t =>
508
                        CloudClient.PutObject(_pithosContainer,url,fullFileName, hash));
509
                });
510
                putOrUpdate.Wait();
650
                        CloudClient.PutObject(PithosContainer,url,fullFileName,hash));
651
                    put.Wait();
652
                }
511 653
                //If everything succeeds, change the file and overlay status to normal
512 654
                this.StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal);
513 655
            }
......
516 658
            StatusNotification.NotifyChangedFile(fullFileName);
517 659
        }
518 660

  
661
        public void UploadWithHashMap(FileInfo fileInfo,string url,Task<TreeHash> treeHash)
662
        {
663
            var fullFileName = fileInfo.FullName;
664

  
665
            //Send the hashmap to the server            
666
            var hashPut = CloudClient.PutHashMap(PithosContainer, url, treeHash.Result);
667
            var missingHashes = hashPut.Result;
668
            if (missingHashes.Count == 0)
669
                return;
670

  
671
            var buffer = new byte[BlockSize];                      
672
            foreach (var missingHash in missingHashes)
673
            {
674
                int blockIndex = -1;
675
                try
676
                {
677
                    //Find the proper block
678
                    blockIndex = treeHash.Result.HashDictionary[missingHash];
679
                    var offset = blockIndex*BlockSize;
680

  
681
                    var read = fileInfo.Read(buffer, offset, BlockSize);
682
                    if (read > 0)
683
                    {
684
                        //Copy the actual block data out of the buffer
685
                        var data = new byte[read];
686
                        Buffer.BlockCopy(buffer, 0, data, 0, read);
687

  
688
                        //And POST them
689
                        CloudClient.PostBlock(PithosContainer, data).Wait();
690
                        Trace.TraceInformation("[BLOCK] Block {0} of {1} uploaded", blockIndex,
691
                            fullFileName);
692
                    }
693
                }
694
                catch (Exception exc)
695
                {
696
                    Trace.TraceError("[ERROR] uploading block {0} of {1}\n{2}", blockIndex, fullFileName, exc);
697
                }
698
            }
699

  
700
            UploadWithHashMap(fileInfo, url, treeHash);
701
            
702
        }
703

  
519 704

  
520 705
    }
521 706

  

Also available in: Unified diff