Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / NetworkAgent.cs @ a27aa447

History | View | Annotate | Download (31 kB)

1
using System;
2
using System.Collections.Generic;
3
using System.ComponentModel.Composition;
4
using System.Diagnostics;
5
using System.Diagnostics.Contracts;
6
using System.IO;
7
using System.Linq;
8
using System.Text;
9
using System.Threading;
10
using System.Threading.Tasks;
11
using Pithos.Interfaces;
12
using Pithos.Network;
13

    
14
namespace Pithos.Core.Agents
15
{
16
    [Export]
17
    public class NetworkAgent
18
    {
19
        private Agent<CloudAction> _agent;
20

    
21
        [Import]
22
        public IStatusKeeper StatusKeeper { get; set; }
23
        
24
        public IStatusNotification StatusNotification { get; set; }
25
        [Import]
26
        public ICloudClient CloudClient { get; set; }
27

    
28
        [Import]
29
        public FileAgent FileAgent {get;set;}
30

    
31
        /*
32
        [Import]
33
        public IPithosWorkflow Workflow { get; set; }
34
*/
35

    
36

    
37
        public string PithosContainer { get; set; }
38
        public string TrashContainer { get; private set; }
39

    
40
        public int BlockSize { get; set; }
41
        public string BlockHash { get; set; }
42

    
43

    
44
        public void Start(string pithosContainer, string trashContainer, int blockSize, string blockHash)
45
        {
46
            if (String.IsNullOrWhiteSpace(pithosContainer))
47
                throw new ArgumentNullException("pithosContainer");
48
            if (String.IsNullOrWhiteSpace(trashContainer))
49
                throw new ArgumentNullException("trashContainer");
50
            Contract.EndContractBlock();
51

    
52
            PithosContainer = pithosContainer;
53
            TrashContainer = trashContainer;
54
            BlockSize = blockSize;
55
            BlockHash = blockHash;
56

    
57

    
58
            _agent = Agent<CloudAction>.Start(inbox =>
59
            {
60
                Action loop = null;
61
                loop = () =>
62
                {
63
                    var message = inbox.Receive();
64

    
65
/*
66
                    var process=Process(message);
67
                    inbox.LoopAsync(process, loop);
68
*/
69

    
70
/*
71
                    process1.ContinueWith(t =>
72
                    {
73
                        inbox.DoAsync(loop);
74
                        if (t.IsFaulted)
75
                        {
76
                            var ex = t.Exception.InnerException;
77
                            if (ex is OperationCanceledException)
78
                                inbox.Stop();
79
                        }
80
                    });
81
*/
82
                    //inbox.DoAsync(loop);
83

    
84

    
85
                    var process = message.ContinueWith(t =>
86
                    {
87
                        var action = t.Result;
88
                        
89
                        Process(action);
90
                        inbox.DoAsync(loop);
91
                    });
92

    
93
                    process.ContinueWith(t =>
94
                    {
95
                        inbox.DoAsync(loop);
96
                        if (t.IsFaulted)
97
                        {
98
                            var ex = t.Exception.InnerException;
99
                            if (ex is OperationCanceledException)
100
                                inbox.Stop();
101
                        }
102
                    });
103

    
104

    
105
                };
106
                loop();
107
            });
108
        }
109

    
110

    
111
        public void Post(CloudAction cloudAction)
112
        {
113
            if (cloudAction == null)
114
                throw new ArgumentNullException("cloudAction");
115
            Contract.EndContractBlock();
116
            
117
            //If the action targets a local file, add a treehash calculation
118
            if (cloudAction.LocalFile != null)
119
            {
120
                cloudAction.TopHash = new Lazy<string>(() => Signature.CalculateTreeHashAsync(cloudAction.LocalFile, 
121
                                    BlockSize, BlockHash).Result
122
                                     .TopHash.ToHashString());
123

    
124
            }
125
            _agent.Post(cloudAction);
126
        }
127

    
128
        class ObjectInfoByNameComparer:IEqualityComparer<ObjectInfo>
129
        {
130
            public bool Equals(ObjectInfo x, ObjectInfo y)
131
            {
132
                return x.Name.Equals(y.Name,StringComparison.InvariantCultureIgnoreCase);
133
            }
134

    
135
            public int GetHashCode(ObjectInfo obj)
136
            {
137
                return obj.Name.ToLower().GetHashCode();
138
            }
139
        }
140

    
141
        public Task ProcessRemoteFiles(string accountPath,DateTime? since=null)
142
        {            
143

    
144
            Trace.CorrelationManager.StartLogicalOperation();
145
            Trace.TraceInformation("[LISTENER] Scheduled");
146
            var listObjects = Task.Factory.StartNewDelayed(10000).ContinueWith(t =>
147
                CloudClient.ListObjects(PithosContainer,since));
148

    
149
            DateTime nextSince = DateTime.Now.AddSeconds(-1);
150

    
151
            var enqueueFiles = listObjects.ContinueWith(task =>
152
            {
153
                if (task.IsFaulted)
154
                {
155
                    //ListObjects failed at this point, need to reschedule
156
                    Trace.TraceError("[FAIL] ListObjects in ProcessRemoteFiles with {0}", task.Exception);
157
                    ProcessRemoteFiles(accountPath, since);
158
                    return;
159
                }
160
                Trace.CorrelationManager.StartLogicalOperation("Listener");
161
                Trace.TraceInformation("[LISTENER] Start Processing");
162

    
163
                var remoteObjects = task.Result;
164

    
165
                var remote=from info in remoteObjects
166
                                  let name=info.Name
167
                                  where !name.EndsWith(".ignore",StringComparison.InvariantCultureIgnoreCase) &&
168
                                  !name.StartsWith("fragments/",StringComparison.InvariantCultureIgnoreCase)
169
                                select info;
170

    
171
                var commonObjects = new List<Tuple<ObjectInfo, FileInfo,FileState>>();
172
                var remoteOnly = new List<ObjectInfo>();
173
                
174
                //In order to avoid multiple iterations over the files, we iterate only once
175
                //over the remote files
176
                foreach (var objectInfo in remote)
177
                {
178
                    var relativePath= objectInfo.Name.RelativeUrlToFilePath();// fileInfo.AsRelativeUrlTo(FileAgent.RootPath);
179
                    //and remove any matching objects from the list, adding them to the commonObjects list
180
                    if (FileAgent.Exists(relativePath))
181
                    {
182
                        var localFile = FileAgent.GetFileInfo(relativePath);
183
                        var state=FileState.FindByFilePath(localFile.FullName);                        
184
                        commonObjects.Add(Tuple.Create(objectInfo, localFile,state));
185
                    }
186
                    else
187
                        //If there is no match we add them to the localFiles list
188
                        remoteOnly.Add(objectInfo);
189
                }
190

    
191
                //At the end of the iteration, the *remote* list will contain the files that exist 
192
                //only on the server
193

    
194
                //Remote files should be downloaded
195
                var actionsForRemote = from upFile in remoteOnly
196
                                       select new CloudAction(CloudActionType.DownloadUnconditional,upFile);
197

    
198
                //Common files should be checked on a per-case basis to detect differences, which is newer
199
                var actionsForCommon = from pair in commonObjects
200
                                       let objectInfo = pair.Item1
201
                                       let localFile = pair.Item2
202
                                       let state=pair.Item3
203
                                       select new CloudAction(CloudActionType.MustSynch, 
204
                                           localFile, objectInfo,state);
205
                
206

    
207
                //Collect all the actions
208
                var allActions = actionsForRemote.Union(actionsForCommon);
209

    
210
                //And remove those that are already being processed by the agent
211
                var distinctActions =allActions
212
                        .Except(_agent.GetEnumerable(), new PithosMonitor.LocalFileComparer())
213
                        .ToList();
214

    
215
                //Queue all the actions
216
                foreach (var message in distinctActions)
217
                {
218
                    Post(message);
219
                }
220

    
221

    
222
                if(remoteOnly.Count>0)
223
                    StatusNotification.NotifyChange(String.Format("Processing {0} new files", remoteOnly.Count));
224

    
225
                Trace.TraceInformation("[LISTENER] End Processing");
226
                Trace.CorrelationManager.StopLogicalOperation();
227

    
228
            });
229

    
230
            var loop = enqueueFiles.ContinueWith(t =>
231
            {
232
                if (t.IsFaulted)
233
                {
234
                    Trace.TraceError("[LISTENER] Exception: {0}", t.Exception);
235
                }
236
                else
237
                {
238
                    Trace.TraceInformation("[LISTENER] Finished");
239
                }
240
                ProcessRemoteFiles(accountPath, nextSince);
241
                
242
            });
243
            return loop;
244
        }
245

    
246
        
247
        private Task Process(Task<CloudAction> action)
248
        {
249
            return action.ContinueWith(t=> Process(t.Result));
250
        }
251

    
252

    
253
        private void Process(CloudAction action)
254
        {
255
            if (action==null)
256
                throw new ArgumentNullException("action");
257
            Contract.EndContractBlock();
258

    
259
            Trace.TraceInformation("[ACTION] Start Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name);
260
            var localFile = action.LocalFile;
261
            var cloudFile = action.CloudFile;
262
            var downloadPath = (cloudFile == null) ? String.Empty
263
                                    : Path.Combine(FileAgent.RootPath, cloudFile.Name.RelativeUrlToFilePath());
264
            
265
            try
266
            {
267
                switch (action.Action)
268
                {
269
                    case CloudActionType.UploadUnconditional:
270
                        UploadCloudFile(localFile, action.LocalHash.Value,action.TopHash.Value);
271
                        break;
272
                    case CloudActionType.DownloadUnconditional:
273
                        DownloadCloudFile(PithosContainer, new Uri(cloudFile.Name,UriKind.Relative), downloadPath);
274
                        break;
275
                    case CloudActionType.DeleteCloud:
276
                        DeleteCloudFile(cloudFile.Name);
277
                        break;
278
                    case CloudActionType.RenameCloud:
279
                        RenameCloudFile(action.OldFileName, action.NewPath, action.NewFileName);
280
                        break;
281
                    case CloudActionType.MustSynch:
282
                        if (File.Exists(downloadPath))
283
                        {                            
284
                            var cloudHash = cloudFile.Hash;
285
                            var localHash = action.LocalHash.Value;
286
                            var topHash = action.TopHash.Value;
287
                            //Not enough to compare only the local hashes, also have to compare the tophashes
288
                            if (!cloudHash.Equals(localHash, StringComparison.InvariantCultureIgnoreCase) &&
289
                                !cloudHash.Equals(topHash, StringComparison.InvariantCultureIgnoreCase))
290
                            {
291
                                var lastLocalTime = localFile.LastWriteTime;
292
                                var lastUpTime = cloudFile.Last_Modified;
293
                                if (lastUpTime <= lastLocalTime)
294
                                {
295
                                    //Local change while the app was down or Files in conflict
296
                                    //Maybe need to store version as well, to check who has the latest version
297

    
298
                                    //StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict);
299
                                    UploadCloudFile(localFile, action.LocalHash.Value,action.TopHash.Value);
300
                                }
301
                                else
302
                                {
303
                                    var status = StatusKeeper.GetFileStatus(downloadPath);
304
                                    switch (status)
305
                                    {
306
                                        case FileStatus.Unchanged:
307
                                            //It he cloud file has a later date, it was modified by another user or computer.
308
                                            //If the local file's status is Unchanged, we should go on and download the cloud file
309
                                            DownloadCloudFile(PithosContainer, new Uri(action.CloudFile.Name,UriKind.Relative), downloadPath);
310
                                            break;
311
                                        case FileStatus.Modified:
312
                                            //If the local file is Modified, we may have a conflict. In this case we should mark the file as Conflict
313
                                            //We can't ensure that a file modified online since the last time will appear as Modified, unless we 
314
                                            //index all files before we start listening.
315
                                            StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict);
316
                                            break;
317
                                        case FileStatus.Created:
318
                                            //If the local file is Created, it means that the local and cloud files aren't related yet have the same name
319
                                            //In this case we must mark the file as in conflict
320
                                            //Other cases should never occur. Mark them as Conflict as well but log a warning
321
                                            StatusKeeper.SetFileOverlayStatus(downloadPath,FileOverlayStatus.Conflict);
322
                                            break;
323
                                        default:
324
                                            //If the local file is Created, it means that the local and cloud files aren't related yet have the same name
325
                                            //In this case we must mark the file as in conflict
326
                                            //Other cases should never occur. Mark them as Conflict as well but log a warning
327
                                            StatusKeeper.SetFileOverlayStatus(downloadPath,FileOverlayStatus.Conflict);
328
                                            Trace.TraceWarning("Unexcepted status {0} for file {1}->{2}",status,downloadPath,action.CloudFile.Name);
329
                                            break;
330
                                    }
331
                                }
332
                            }
333
                        }
334
                        else
335
                            DownloadCloudFile(PithosContainer, new Uri(action.CloudFile.Name,UriKind.Relative), downloadPath);
336
                        break;
337
                }
338
                Trace.TraceInformation("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name);
339
            }
340
            catch (OperationCanceledException)
341
            {
342
                throw;
343
            }
344
            catch (Exception exc)
345
            {
346
                Trace.TraceError("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
347
                                action.Action, action.LocalFile, action.CloudFile, exc);
348

    
349
                _agent.Post(action);
350
            }
351

    
352
        }
353

    
354
        
355

    
356
        private void RenameCloudFile(string oldFileName, string newPath, string newFileName)
357
        {
358
            if (String.IsNullOrWhiteSpace(oldFileName))
359
                throw new ArgumentNullException("oldFileName");
360
            if (String.IsNullOrWhiteSpace(oldFileName))
361
                throw new ArgumentNullException("newPath");
362
            if (String.IsNullOrWhiteSpace(oldFileName))
363
                throw new ArgumentNullException("newFileName");
364
            Contract.EndContractBlock();
365
            //The local file is already renamed
366
            this.StatusKeeper.SetFileOverlayStatus(newPath, FileOverlayStatus.Modified);
367

    
368
            CloudClient.MoveObject(PithosContainer, oldFileName, PithosContainer, newFileName);
369

    
370
            this.StatusKeeper.SetFileStatus(newPath, FileStatus.Unchanged);
371
            this.StatusKeeper.SetFileOverlayStatus(newPath, FileOverlayStatus.Normal);
372
            NativeMethods.RaiseChangeNotification(newPath);
373
        }
374

    
375
        private void DeleteCloudFile(string fileName)
376
        {            
377
            if (String.IsNullOrWhiteSpace(fileName))
378
                throw new ArgumentNullException("fileName");
379
            if (Path.IsPathRooted(fileName))
380
                throw new ArgumentException("The fileName should not be rooted","fileName");
381
            Contract.EndContractBlock();
382

    
383
            this.StatusKeeper.SetFileOverlayStatus(fileName, FileOverlayStatus.Modified);
384

    
385
            CloudClient.MoveObject(PithosContainer, fileName, TrashContainer, fileName);
386

    
387
            this.StatusKeeper.ClearFileStatus(fileName);
388
            this.StatusKeeper.RemoveFileOverlayStatus(fileName);
389
        }
390

    
391
        //Download a file.
392
        private void DownloadCloudFile(string container, Uri relativeUrl, string localPath)
393
        {
394
            if (String.IsNullOrWhiteSpace(container))
395
                throw new ArgumentNullException("container");
396
            if (relativeUrl==null)
397
                throw new ArgumentNullException("relativeUrl");
398
            if (String.IsNullOrWhiteSpace(localPath))
399
                throw new ArgumentNullException("localPath");
400
            if (!Path.IsPathRooted(localPath))
401
                throw new ArgumentException("The localPath must be rooted", "localPath");
402
            Contract.EndContractBlock();
403

    
404
            var url = relativeUrl.ToString();
405
            if (url.EndsWith(".ignore",StringComparison.InvariantCultureIgnoreCase))
406
                return;
407

    
408
            //Are we already downloading or uploading the file? 
409
            using (var gate=NetworkGate.Acquire(localPath, NetworkOperation.Downloading))
410
            {
411
                if (gate.Failed)
412
                    return;
413
                //The file's hashmap will be stored in the same location with the extension .hashmap
414
                //var hashPath = Path.Combine(FileAgent.FragmentsPath, relativePath + ".hashmap");
415
                
416
                //Retrieve the hashmap from the server
417
                var getHashMap = CloudClient.GetHashMap(container,url);                
418
                var downloadTask= getHashMap.ContinueWith(t =>
419
                {
420
                    var serverHash=t.Result;
421
                    //If it's a small file
422
                    return serverHash.Hashes.Count == 1 
423
                        //Download it in one go
424
                        ? DownloadEntireFile(container, relativeUrl, localPath) 
425
                        //Otherwise download it block by block
426
                        : DownloadWithBlocks(container, relativeUrl, localPath, serverHash);
427
                });
428

    
429
                
430

    
431
                //Retrieve the object's metadata
432
                var getInfo = downloadTask.ContinueWith(t =>
433
                            CloudClient.GetObjectInfo(container, url));
434
                //And store it
435
                var storeInfo = getInfo.ContinueWith(t =>
436
                            StatusKeeper.StoreInfo(localPath, t.Result));
437

    
438
                storeInfo.Wait();
439
                StatusNotification.NotifyChangedFile(localPath);
440

    
441
            }
442
        }
443

    
444
        //Download a small file with a single GET operation
445
        private Task DownloadEntireFile(string container, Uri relativeUrl, string localPath)
446
        {
447
            if (String.IsNullOrWhiteSpace(container))
448
                throw new ArgumentNullException("container");
449
            if (relativeUrl == null)
450
                throw new ArgumentNullException("relativeUrl");
451
            if (String.IsNullOrWhiteSpace(localPath))
452
                throw new ArgumentNullException("localPath");
453
            if (!Path.IsPathRooted(localPath))
454
                throw new ArgumentException("The localPath must be rooted", "localPath");
455
            Contract.EndContractBlock();
456

    
457
            //Calculate the relative file path for the new file
458
            var relativePath = relativeUrl.RelativeUriToFilePath();
459
            //The file will be stored in a temporary location while downloading with an extension .download
460
            var tempPath = Path.Combine(FileAgent.FragmentsPath, relativePath + ".download");
461
            //Make sure the target folder exists. DownloadFileTask will not create the folder
462
            var directoryPath = Path.GetDirectoryName(tempPath);
463
            if (!Directory.Exists(directoryPath))
464
                Directory.CreateDirectory(directoryPath);
465

    
466
            //Download the object to the temporary location
467
            var getObject = CloudClient.GetObject(container, relativeUrl.ToString(), tempPath).ContinueWith(t =>
468
            {
469
                //And move it to its actual location once downloading is finished
470
                if (File.Exists(localPath))
471
                    File.Replace(tempPath,localPath,null,true);
472
                else
473
                    File.Move(tempPath,localPath);
474
            });
475
            return getObject;
476
        }
477

    
478
        public Task DownloadWithBlocks(string container,Uri relativeUrl, string localPath,TreeHash serverHash)
479
        {
480
            if (String.IsNullOrWhiteSpace(container))
481
                throw new ArgumentNullException("container");
482
            if (relativeUrl == null)
483
                throw new ArgumentNullException("relativeUrl");
484
            if (String.IsNullOrWhiteSpace(localPath))
485
                throw new ArgumentNullException("localPath");
486
            if (!Path.IsPathRooted(localPath))
487
                throw new ArgumentException("The localPath must be rooted", "localPath");
488
            if(serverHash==null)
489
                throw new ArgumentNullException("serverHash");
490
            Contract.EndContractBlock();
491

    
492
            //Calculate the relative file path for the new file
493
            var relativePath = relativeUrl.RelativeUriToFilePath();
494
            //The file will be stored in a temporary location while downloading with an extension .download
495
            var tempPath = Path.Combine(FileAgent.FragmentsPath, relativePath + ".download");
496
            var directoryPath = Path.GetDirectoryName(tempPath);
497
            if (!Directory.Exists(directoryPath))
498
                Directory.CreateDirectory(directoryPath);
499
            
500
            //If the local file exists we should make a copy of it to the
501
            //fragments folder, unless a newer temp copy already exists, which
502
            //means there is an interrupted download
503
            if (ShouldCopy(localPath, tempPath))
504
                File.Copy(localPath, tempPath, true);    
505

    
506
            //Set the size of the file to the size specified in the treehash
507
            //This will also create an empty file if the file doesn't exist            
508
            SetFileSize(tempPath, serverHash.Bytes);
509

    
510
            return Task.Factory.StartNew(() =>
511
            {
512
                //Calculate the temp file's treehash
513
                var treeHash = Signature.CalculateTreeHashAsync(tempPath, this.BlockSize,BlockHash).Result;
514
                
515
                //And compare it with the server's hash
516
                var upHashes = serverHash.GetHashesAsStrings();
517
                var localHashes = treeHash.HashDictionary;
518
                for (int i = 0; i < upHashes.Length; i++)
519
                {
520
                    //For every non-matching hash
521
                    if (!localHashes.ContainsKey(upHashes[i]))
522
                    {
523
                        Trace.TraceInformation("[BLOCK GET] START {0} of {1} for {2}",i,upHashes.Length,localPath);
524
                        var start = i*BlockSize;
525
                        long? end = null;
526
                        if (i < upHashes.Length - 1 )
527
                            end= ((i + 1)*BlockSize) ;
528
                            
529
                        //Get its block
530
                        var blockTask = CloudClient.GetBlock(container, relativeUrl,
531
                                                            start, end);
532

    
533
                        blockTask.ContinueWith(b =>
534
                        {
535
                            //And apply it to the temp file
536
                            var buffer = b.Result;
537
                            var stream =FileAsync.OpenWrite(tempPath);
538
                            stream.Seek(start,SeekOrigin.Begin);
539
                            return stream.WriteAsync(buffer, 0, buffer.Length)
540
                                .ContinueWith(s => stream.Close());
541

    
542
                        }).Unwrap()
543
                        .Wait();
544
                        Trace.TraceInformation("[BLOCK GET] FINISH {0} of {1} for {2}", i, upHashes.Length, localPath);
545
                    }
546

    
547
                }
548
                
549

    
550
                //Replace the existing file with the temp
551
                if (File.Exists(localPath))
552
                    File.Replace(tempPath, localPath, null, true);
553
                else
554
                    File.Move(tempPath, localPath);
555
                Trace.TraceInformation("[BLOCK GET] COMPLETE {0}", localPath);
556
            });
557
        }
558

    
559
        //Change the file's size, possibly truncating or adding to it
560
        private static void SetFileSize(string filePath, long fileSize)
561
        {
562
            if (String.IsNullOrWhiteSpace(filePath))
563
                throw new ArgumentNullException("filePath");
564
            if (!Path.IsPathRooted(filePath))
565
                throw new ArgumentException("The filePath must be rooted", "filePath");
566
            if (fileSize<0)
567
                throw new ArgumentOutOfRangeException("fileSize");
568
            Contract.EndContractBlock();
569

    
570
            using (var stream = File.Open(filePath, FileMode.OpenOrCreate, FileAccess.Write))
571
            {
572
                stream.SetLength(fileSize);
573
            }
574
        }
575

    
576
        //Check whether we should copy the local file to a temp path        
577
        private static bool ShouldCopy(string localPath, string tempPath)
578
        {
579
            //No need to copy if there is no file
580
            if (!File.Exists(localPath))            
581
                return false;
582

    
583
            //If there is no temp file, go ahead and copy
584
            if (!File.Exists(tempPath))                
585
                return true;
586

    
587
            //If there is a temp file and is newer than the actual file, don't copy
588
            var localLastWrite = File.GetLastWriteTime(localPath);
589
            var tempLastWrite = File.GetLastWriteTime(tempPath);
590
            
591
            //This could mean there is an interrupted download in progress
592
            return (tempLastWrite < localLastWrite);
593
        }
594

    
595
        private void UploadCloudFile(FileInfo fileInfo, string hash,string topHash)
596
        {
597
            if (fileInfo==null)
598
                throw new ArgumentNullException("fileInfo");
599
            if (String.IsNullOrWhiteSpace(hash))
600
                throw new ArgumentNullException("hash");
601
            Contract.EndContractBlock();
602

    
603
            if (fileInfo.Extension.Equals("ignore", StringComparison.InvariantCultureIgnoreCase))
604
                return;
605
            
606
            var url = fileInfo.AsRelativeUrlTo(FileAgent.RootPath);
607

    
608
            var fullFileName = fileInfo.FullName;
609
            using(var gate=NetworkGate.Acquire(fullFileName,NetworkOperation.Uploading))
610
            {
611
                //Abort if the file is already being uploaded or downloaded
612
                if (gate.Failed)
613
                    return;
614

    
615

    
616
                //Even if GetObjectInfo times out, we can proceed with the upload            
617
                var info = CloudClient.GetObjectInfo(PithosContainer, url);
618

    
619
                //If the file hashes match, abort the upload
620
                if (hash.Equals(info.Hash, StringComparison.InvariantCultureIgnoreCase) ||
621
                    topHash.Equals(info.Hash, StringComparison.InvariantCultureIgnoreCase))
622
                {
623
                    //but store any metadata changes 
624
                    this.StatusKeeper.StoreInfo(fullFileName, info);
625
                    Trace.TraceInformation("Skip upload of {0}, hashes match", fullFileName);
626
                    return;
627
                }
628

    
629
                //Mark the file as modified while we upload it
630
                var setStatus = Task.Factory.StartNew(() =>
631
                        StatusKeeper.SetFileOverlayStatus(fullFileName, FileOverlayStatus.Modified));
632
                //And then upload it
633

    
634
                //If the file is larger than the block size, try a hashmap PUT
635
                if (fileInfo.Length > BlockSize )
636
                {
637
                    //To upload using a hashmap
638
                    //First, calculate the tree hash
639
                    var treeHash = Signature.CalculateTreeHashAsync(fileInfo.FullName, BlockSize, BlockHash);                    
640
                    
641
                    var putHashMap = setStatus.ContinueWith(t=>
642
                        UploadWithHashMap(fileInfo,url,treeHash));
643
                    
644
                    putHashMap.Wait();
645
                }
646
                else
647
                {
648
                    //Otherwise do a regular PUT
649
                    var put = setStatus.ContinueWith(t =>
650
                        CloudClient.PutObject(PithosContainer,url,fullFileName,hash));
651
                    put.Wait();
652
                }
653
                //If everything succeeds, change the file and overlay status to normal
654
                this.StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal);
655
            }
656
            //Notify the Shell to update the overlays
657
            NativeMethods.RaiseChangeNotification(fullFileName);
658
            StatusNotification.NotifyChangedFile(fullFileName);
659
        }
660

    
661
        public void UploadWithHashMap(FileInfo fileInfo,string url,Task<TreeHash> treeHash)
662
        {
663
            var fullFileName = fileInfo.FullName;
664

    
665
            //Send the hashmap to the server            
666
            var hashPut = CloudClient.PutHashMap(PithosContainer, url, treeHash.Result);
667
            var missingHashes = hashPut.Result;
668
            if (missingHashes.Count == 0)
669
                return;
670

    
671
            var buffer = new byte[BlockSize];                      
672
            foreach (var missingHash in missingHashes)
673
            {
674
                int blockIndex = -1;
675
                try
676
                {
677
                    //Find the proper block
678
                    blockIndex = treeHash.Result.HashDictionary[missingHash];
679
                    var offset = blockIndex*BlockSize;
680

    
681
                    var read = fileInfo.Read(buffer, offset, BlockSize);
682
                    if (read > 0)
683
                    {
684
                        //Copy the actual block data out of the buffer
685
                        var data = new byte[read];
686
                        Buffer.BlockCopy(buffer, 0, data, 0, read);
687

    
688
                        //And POST them
689
                        CloudClient.PostBlock(PithosContainer, data).Wait();
690
                        Trace.TraceInformation("[BLOCK] Block {0} of {1} uploaded", blockIndex,
691
                            fullFileName);
692
                    }
693
                }
694
                catch (Exception exc)
695
                {
696
                    Trace.TraceError("[ERROR] uploading block {0} of {1}\n{2}", blockIndex, fullFileName, exc);
697
                }
698
            }
699

    
700
            UploadWithHashMap(fileInfo, url, treeHash);
701
            
702
        }
703

    
704

    
705
    }
706

    
707
   
708

    
709

    
710
}