Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / NetworkAgent.cs @ 6bcdd8e2

History | View | Annotate | Download (39.8 kB)

1
#region
2
/* -----------------------------------------------------------------------
3
 * <copyright file="NetworkAgent.cs" company="GRNet">
4
 * 
5
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
6
 *
7
 * Redistribution and use in source and binary forms, with or
8
 * without modification, are permitted provided that the following
9
 * conditions are met:
10
 *
11
 *   1. Redistributions of source code must retain the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer.
14
 *
15
 *   2. Redistributions in binary form must reproduce the above
16
 *      copyright notice, this list of conditions and the following
17
 *      disclaimer in the documentation and/or other materials
18
 *      provided with the distribution.
19
 *
20
 *
21
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
22
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
25
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
28
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32
 * POSSIBILITY OF SUCH DAMAGE.
33
 *
34
 * The views and conclusions contained in the software and
35
 * documentation are those of the authors and should not be
36
 * interpreted as representing official policies, either expressed
37
 * or implied, of GRNET S.A.
38
 * </copyright>
39
 * -----------------------------------------------------------------------
40
 */
41
#endregion
42

    
43
using System;
44
using System.Collections.Generic;
45
using System.ComponentModel.Composition;
46
using System.Diagnostics;
47
using System.Diagnostics.Contracts;
48
using System.IO;
49
using System.Net;
50
using System.Reflection;
51
using System.Threading;
52
using System.Threading.Tasks;
53
using Castle.ActiveRecord;
54
using Pithos.Interfaces;
55
using Pithos.Network;
56
using log4net;
57

    
58
namespace Pithos.Core.Agents
59
{
60
    [Export]
61
    public class NetworkAgent
62
    {
63
        private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
64

    
65
        private Agent<CloudAction> _agent;
66

    
67
        [System.ComponentModel.Composition.Import]
68
        private DeleteAgent _deleteAgent=new DeleteAgent();
69

    
70
        [System.ComponentModel.Composition.Import]
71
        public IStatusKeeper StatusKeeper { get; set; }
72
        
73
        public IStatusNotification StatusNotification { get; set; }
74

    
75

    
76
        [System.ComponentModel.Composition.Import]
77
        public IPithosSettings Settings { get; set; }
78

    
79
        //The Proceed signals the poll agent that it can proceed with polling. 
80
        //Essentially it stops the poll agent to give priority to the network agent
81
        //Initially the event is signalled because we don't need to pause
82
        private readonly AsyncManualResetEvent _proceedEvent = new AsyncManualResetEvent(true);
83

    
84
        public AsyncManualResetEvent ProceedEvent
85
        {
86
            get { return _proceedEvent; }
87
        }
88

    
89

    
90
        public void Start()
91
        {
92
            _agent = Agent<CloudAction>.Start(inbox =>
93
            {
94
                Action loop = null;
95
                loop = () =>
96
                {
97
                    _deleteAgent.ProceedEvent.Wait();
98
                    var message = inbox.Receive();
99
                    var process=message.Then(Process,inbox.CancellationToken);
100
                    inbox.LoopAsync(process, loop);
101
                };
102
                loop();
103
            });
104

    
105
        }
106

    
107
        private async Task Process(CloudAction action)
108
        {
109
            if (action == null)
110
                throw new ArgumentNullException("action");
111
            if (action.AccountInfo==null)
112
                throw new ArgumentException("The action.AccountInfo is empty","action");
113
            Contract.EndContractBlock();
114

    
115

    
116

    
117

    
118
            using (log4net.ThreadContext.Stacks["Operation"].Push(action.ToString()))
119
            {                
120

    
121
                var cloudFile = action.CloudFile;
122
                var downloadPath = action.GetDownloadPath();
123

    
124
                try
125
                {
126
                    _proceedEvent.Reset();
127
                    //UpdateStatus(PithosStatus.Syncing);
128
                    var accountInfo = action.AccountInfo;
129

    
130
                    if (action.Action == CloudActionType.DeleteCloud)
131
                    {                        
132
                        //Redirect deletes to the delete agent 
133
                        _deleteAgent.Post((CloudDeleteAction)action);
134
                    }
135
                    if (_deleteAgent.IsDeletedFile(action))
136
                    {
137
                        //Clear the status of already deleted files to avoid reprocessing
138
                        if (action.LocalFile != null)
139
                            StatusKeeper.ClearFileStatus(action.LocalFile.FullName);
140
                    }
141
                    else
142
                    {
143
                        switch (action.Action)
144
                        {
145
                            case CloudActionType.UploadUnconditional:
146
                                //Abort if the file was deleted before we reached this point
147
                                await UploadCloudFile(action);
148
                                break;
149
                            case CloudActionType.DownloadUnconditional:
150
                                await DownloadCloudFile(accountInfo, cloudFile, downloadPath);
151
                                break;
152
                            case CloudActionType.RenameCloud:
153
                                var moveAction = (CloudMoveAction)action;
154
                                RenameCloudFile(accountInfo, moveAction);
155
                                break;
156
                            case CloudActionType.RenameLocal:
157
                                RenameLocalFile(accountInfo, action);
158
                                break;
159
                            case CloudActionType.MustSynch:
160
                                if (!File.Exists(downloadPath) && !Directory.Exists(downloadPath))
161
                                {
162
                                    await DownloadCloudFile(accountInfo, cloudFile, downloadPath);
163
                                }
164
                                else
165
                                {
166
                                    await SyncFiles(accountInfo, action);
167
                                }
168
                                break;
169
                        }
170
                    }
171
                    Log.InfoFormat("End Processing {0}:{1}->{2}", action.Action, action.LocalFile,
172
                                           action.CloudFile.Name);
173
                }
174
                catch (WebException exc)
175
                {
176
                    Log.ErrorFormat("[WEB ERROR] {0} : {1} -> {2} due to exception\r\n{3}", action.Action, action.LocalFile, action.CloudFile, exc);
177
                }
178
                catch (OperationCanceledException)
179
                {
180
                    throw;
181
                }
182
                catch (DirectoryNotFoundException)
183
                {
184
                    Log.ErrorFormat("{0} : {1} -> {2}  failed because the directory was not found.\n Rescheduling a delete",
185
                        action.Action, action.LocalFile, action.CloudFile);
186
                    //Post a delete action for the missing file
187
                    Post(new CloudDeleteAction(action));
188
                }
189
                catch (FileNotFoundException)
190
                {
191
                    Log.ErrorFormat("{0} : {1} -> {2}  failed because the file was not found.\n Rescheduling a delete",
192
                        action.Action, action.LocalFile, action.CloudFile);
193
                    //Post a delete action for the missing file
194
                    Post(new CloudDeleteAction(action));
195
                }
196
                catch (Exception exc)
197
                {
198
                    Log.ErrorFormat("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
199
                                     action.Action, action.LocalFile, action.CloudFile, exc);
200

    
201
                    _agent.Post(action);
202
                }
203
                finally
204
                {
205
                    if (_agent.IsEmpty)
206
                        _proceedEvent.Set();
207
                    UpdateStatus(PithosStatus.InSynch);                                        
208
                }
209
            }
210
        }
211

    
212

    
213
        private void UpdateStatus(PithosStatus status)
214
        {
215
            StatusKeeper.SetPithosStatus(status);
216
            StatusNotification.Notify(new Notification());
217
        }
218

    
219
        private void RenameLocalFile(AccountInfo accountInfo, CloudAction action)
220
        {
221
            if (accountInfo == null)
222
                throw new ArgumentNullException("accountInfo");
223
            if (action == null)
224
                throw new ArgumentNullException("action");
225
            if (action.LocalFile == null)
226
                throw new ArgumentException("The action's local file is not specified", "action");
227
            if (!Path.IsPathRooted(action.LocalFile.FullName))
228
                throw new ArgumentException("The action's local file path must be absolute", "action");
229
            if (action.CloudFile == null)
230
                throw new ArgumentException("The action's cloud file is not specified", "action");
231
            Contract.EndContractBlock();
232
            using (ThreadContext.Stacks["Operation"].Push("RenameLocalFile"))
233
            {
234

    
235
                //We assume that the local file already exists, otherwise the poll agent
236
                //would have issued a download request
237

    
238
                var currentInfo = action.CloudFile;
239
                var previousInfo = action.CloudFile.Previous;
240
                var fileAgent = FileAgent.GetFileAgent(accountInfo);
241

    
242
                var previousRelativepath = previousInfo.RelativeUrlToFilePath(accountInfo.UserName);
243
                var previousFile = fileAgent.GetFileSystemInfo(previousRelativepath);
244

    
245
                //In every case we need to move the local file first
246
                MoveLocalFile(accountInfo, previousFile, fileAgent, currentInfo);
247
            }
248
        }
249

    
250
        private void MoveLocalFile(AccountInfo accountInfo, FileSystemInfo previousFile, FileAgent fileAgent,
251
                                   ObjectInfo currentInfo)
252
        {
253
            var currentRelativepath = currentInfo.RelativeUrlToFilePath(accountInfo.UserName);
254
            var newPath = Path.Combine(fileAgent.RootPath, currentRelativepath);
255

    
256
            var isFile= (previousFile is FileInfo);
257
            var previousFullPath = isFile? 
258
                FileInfoExtensions.GetProperFilePathCapitalization(previousFile.FullName):
259
                FileInfoExtensions.GetProperDirectoryCapitalization(previousFile.FullName);                
260
            
261
            using (var gateOld = NetworkGate.Acquire(previousFullPath, NetworkOperation.Renaming))
262
            using (var gateNew = NetworkGate.Acquire(newPath,NetworkOperation.Renaming))
263
            using (new SessionScope(FlushAction.Auto))
264
            {
265
                if (isFile)
266
                    (previousFile as FileInfo).MoveTo(newPath);
267
                else
268
                {
269
                    (previousFile as DirectoryInfo).MoveTo(newPath);
270
                }
271
                var state = StatusKeeper.GetStateByFilePath(previousFullPath);
272
                state.FilePath = newPath;
273
                state.SaveCopy();
274
                StatusKeeper.SetFileState(previousFullPath,FileStatus.Deleted,FileOverlayStatus.Deleted);
275
            }            
276
        }
277

    
278
        private async Task SyncFiles(AccountInfo accountInfo,CloudAction action)
279
        {
280
            if (accountInfo == null)
281
                throw new ArgumentNullException("accountInfo");
282
            if (action==null)
283
                throw new ArgumentNullException("action");
284
            if (action.LocalFile==null)
285
                throw new ArgumentException("The action's local file is not specified","action");
286
            if (!Path.IsPathRooted(action.LocalFile.FullName))
287
                throw new ArgumentException("The action's local file path must be absolute","action");
288
            if (action.CloudFile== null)
289
                throw new ArgumentException("The action's cloud file is not specified", "action");
290
            Contract.EndContractBlock();
291
            using (ThreadContext.Stacks["Operation"].Push("SyncFiles"))
292
            {
293

    
294
                var localFile = action.LocalFile;
295
                var cloudFile = action.CloudFile;
296
                var downloadPath = action.LocalFile.GetProperCapitalization();
297

    
298
                var cloudHash = cloudFile.Hash.ToLower();
299
                var previousCloudHash = cloudFile.PreviousHash.ToLower();
300
                var localHash = action.LocalHash.Value.ToLower();
301
                var topHash = action.TopHash.Value.ToLower();
302

    
303
                //At this point we know that an object has changed on the server and that a local
304
                //file already exists. We need to decide whether the file has only changed on 
305
                //the server or there is a conflicting change on the client.
306
                //
307

    
308
                //Not enough to compare only the local hashes (MD5), also have to compare the tophashes            
309
                //If any of the hashes match, we are done
310
                if ((cloudHash == localHash || cloudHash == topHash))
311
                {
312
                    Log.InfoFormat("Skipping {0}, hashes match", downloadPath);
313
                    return;
314
                }
315

    
316
                //The hashes DON'T match. We need to sync
317

    
318
                // If the previous tophash matches the local tophash, the file was only changed on the server. 
319
                if (localHash == previousCloudHash)
320
                {
321
                    await DownloadCloudFile(accountInfo, cloudFile, downloadPath);
322
                }
323
                else
324
                {
325
                    //If the previous and local hash don't match, there was a local conflict
326
                    //that was not uploaded to the server. We have a conflict
327
                    ReportConflict(downloadPath);
328
                }
329
            }
330
        }
331

    
332
        private void ReportConflict(string downloadPath)
333
        {
334
            if (String.IsNullOrWhiteSpace(downloadPath))
335
                throw new ArgumentNullException("downloadPath");
336
            Contract.EndContractBlock();
337

    
338
            StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict);
339
            UpdateStatus(PithosStatus.HasConflicts);
340
            var message = String.Format("Conflict detected for file {0}", downloadPath);
341
            Log.Warn(message);
342
            StatusNotification.NotifyChange(message, TraceLevel.Warning);
343
        }
344

    
345
        public void Post(CloudAction cloudAction)
346
        {
347
            if (cloudAction == null)
348
                throw new ArgumentNullException("cloudAction");
349
            if (cloudAction.AccountInfo==null)
350
                throw new ArgumentException("The CloudAction.AccountInfo is empty","cloudAction");
351
            Contract.EndContractBlock();
352

    
353
            _deleteAgent.ProceedEvent.Wait();
354

    
355
            //If the action targets a local file, add a treehash calculation
356
            if (!(cloudAction is CloudDeleteAction) && cloudAction.LocalFile as FileInfo != null)
357
            {
358
                var accountInfo = cloudAction.AccountInfo;
359
                var localFile = (FileInfo) cloudAction.LocalFile;
360
                if (localFile.Length > accountInfo.BlockSize)
361
                    cloudAction.TopHash =
362
                        new Lazy<string>(() => Signature.CalculateTreeHashAsync(localFile,
363
                                                                                accountInfo.BlockSize,
364
                                                                                accountInfo.BlockHash, Settings.HashingParallelism).Result
365
                                                    .TopHash.ToHashString());
366
                else
367
                {
368
                    cloudAction.TopHash = new Lazy<string>(() => cloudAction.LocalHash.Value);
369
                }
370
            }
371
            else
372
            {
373
                //The hash for a directory is the empty string
374
                cloudAction.TopHash = new Lazy<string>(() => String.Empty);
375
            }
376
            
377
            if (cloudAction is CloudDeleteAction)
378
                _deleteAgent.Post((CloudDeleteAction)cloudAction);
379
            else
380
                _agent.Post(cloudAction);
381
        }
382
       
383

    
384
        public IEnumerable<CloudAction> GetEnumerable()
385
        {
386
            return _agent.GetEnumerable();
387
        }
388

    
389
        public Task GetDeleteAwaiter()
390
        {
391
            return _deleteAgent.ProceedEvent.WaitAsync();
392
        }
393
        public CancellationToken CancellationToken
394
        {
395
            get { return _agent.CancellationToken; }
396
        }
397

    
398
        private static FileAgent GetFileAgent(AccountInfo accountInfo)
399
        {
400
            return AgentLocator<FileAgent>.Get(accountInfo.AccountPath);
401
        }
402

    
403

    
404

    
405
        private void RenameCloudFile(AccountInfo accountInfo,CloudMoveAction action)
406
        {
407
            if (accountInfo==null)
408
                throw new ArgumentNullException("accountInfo");
409
            if (action==null)
410
                throw new ArgumentNullException("action");
411
            if (action.CloudFile==null)
412
                throw new ArgumentException("CloudFile","action");
413
            if (action.LocalFile==null)
414
                throw new ArgumentException("LocalFile","action");
415
            if (action.OldLocalFile==null)
416
                throw new ArgumentException("OldLocalFile","action");
417
            if (action.OldCloudFile==null)
418
                throw new ArgumentException("OldCloudFile","action");
419
            Contract.EndContractBlock();
420

    
421
            using (ThreadContext.Stacks["Operation"].Push("RenameCloudFile"))
422
            {
423

    
424
                var newFilePath = action.LocalFile.FullName;
425

    
426
                //How do we handle concurrent renames and deletes/uploads/downloads?
427
                //* A conflicting upload means that a file was renamed before it had a chance to finish uploading
428
                //  This should never happen as the network agent executes only one action at a time
429
                //* A conflicting download means that the file was modified on the cloud. While we can go on and complete
430
                //  the rename, there may be a problem if the file is downloaded in blocks, as subsequent block requests for the 
431
                //  same name will fail.
432
                //  This should never happen as the network agent executes only one action at a time.
433
                //* A conflicting delete can happen if the rename was followed by a delete action that didn't have the chance
434
                //  to remove the rename from the queue.
435
                //  We can probably ignore this case. It will result in an error which should be ignored            
436

    
437

    
438
                //The local file is already renamed
439
                StatusKeeper.SetFileOverlayStatus(newFilePath, FileOverlayStatus.Modified);
440

    
441

    
442
                var account = action.CloudFile.Account ?? accountInfo.UserName;
443
                var container = action.CloudFile.Container;
444

    
445
                var client = new CloudFilesClient(accountInfo);
446
                //TODO: What code is returned when the source file doesn't exist?
447
                client.MoveObject(account, container, action.OldCloudFile.Name, container, action.CloudFile.Name);
448

    
449
                StatusKeeper.SetFileStatus(newFilePath, FileStatus.Unchanged);
450
                StatusKeeper.SetFileOverlayStatus(newFilePath, FileOverlayStatus.Normal);
451
                NativeMethods.RaiseChangeNotification(newFilePath);
452
            }
453
        }
454

    
455
        //Download a file.
456
        private async Task DownloadCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile , string filePath)
457
        {
458
            if (accountInfo == null)
459
                throw new ArgumentNullException("accountInfo");
460
            if (cloudFile == null)
461
                throw new ArgumentNullException("cloudFile");
462
            if (String.IsNullOrWhiteSpace(cloudFile.Account))
463
                throw new ArgumentNullException("cloudFile");
464
            if (String.IsNullOrWhiteSpace(cloudFile.Container))
465
                throw new ArgumentNullException("cloudFile");
466
            if (String.IsNullOrWhiteSpace(filePath))
467
                throw new ArgumentNullException("filePath");
468
            if (!Path.IsPathRooted(filePath))
469
                throw new ArgumentException("The filePath must be rooted", "filePath");
470
            Contract.EndContractBlock();
471

    
472
            using (ThreadContext.Stacks["Operation"].Push("DownloadCloudFile"))
473
            {
474

    
475
                var localPath = Interfaces.FileInfoExtensions.GetProperFilePathCapitalization(filePath);
476
                var relativeUrl = new Uri(cloudFile.Name, UriKind.Relative);
477

    
478
                var url = relativeUrl.ToString();
479
                if (cloudFile.Name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase))
480
                    return;
481

    
482

    
483
                //Are we already downloading or uploading the file? 
484
                using (var gate = NetworkGate.Acquire(localPath, NetworkOperation.Downloading))
485
                {
486
                    if (gate.Failed)
487
                        return;
488

    
489
                    var client = new CloudFilesClient(accountInfo);
490
                    var account = cloudFile.Account;
491
                    var container = cloudFile.Container;
492

    
493
                    if (cloudFile.Content_Type == @"application/directory")
494
                    {
495
                        if (!Directory.Exists(localPath))
496
                            Directory.CreateDirectory(localPath);
497
                    }
498
                    else
499
                    {
500
                        //Retrieve the hashmap from the server
501
                        var serverHash = await client.GetHashMap(account, container, url);
502
                        //If it's a small file
503
                        if (serverHash.Hashes.Count == 1)
504
                            //Download it in one go
505
                            await
506
                                DownloadEntireFileAsync(accountInfo, client, cloudFile, relativeUrl, localPath,
507
                                                        serverHash);
508
                            //Otherwise download it block by block
509
                        else
510
                            await DownloadWithBlocks(accountInfo, client, cloudFile, relativeUrl, localPath, serverHash);
511

    
512
                        if (cloudFile.AllowedTo == "read")
513
                        {
514
                            var attributes = File.GetAttributes(localPath);
515
                            File.SetAttributes(localPath, attributes | FileAttributes.ReadOnly);
516
                        }
517
                    }
518

    
519
                    //Now we can store the object's metadata without worrying about ghost status entries
520
                    StatusKeeper.StoreInfo(localPath, cloudFile);
521

    
522
                }
523
            }
524
        }
525

    
526
        //Download a small file with a single GET operation
527
        private async Task DownloadEntireFileAsync(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath,TreeHash serverHash)
528
        {
529
            if (client == null)
530
                throw new ArgumentNullException("client");
531
            if (cloudFile==null)
532
                throw new ArgumentNullException("cloudFile");
533
            if (relativeUrl == null)
534
                throw new ArgumentNullException("relativeUrl");
535
            if (String.IsNullOrWhiteSpace(filePath))
536
                throw new ArgumentNullException("filePath");
537
            if (!Path.IsPathRooted(filePath))
538
                throw new ArgumentException("The localPath must be rooted", "filePath");
539
            Contract.EndContractBlock();
540

    
541
            var localPath = Pithos.Interfaces.FileInfoExtensions.GetProperFilePathCapitalization(filePath);
542
            //If the file already exists
543
            if (File.Exists(localPath))
544
            {
545
                //First check with MD5 as this is a small file
546
                var localMD5 = Signature.CalculateMD5(localPath);
547
                var cloudHash=serverHash.TopHash.ToHashString();
548
                if (localMD5==cloudHash)
549
                    return;
550
                //Then check with a treehash
551
                var localTreeHash = Signature.CalculateTreeHash(localPath, serverHash.BlockSize, serverHash.BlockHash);
552
                var localHash = localTreeHash.TopHash.ToHashString();
553
                if (localHash==cloudHash)
554
                    return;
555
            }
556
            StatusNotification.Notify(new CloudNotification { Data = cloudFile });
557

    
558
            var fileAgent = GetFileAgent(accountInfo);
559
            //Calculate the relative file path for the new file
560
            var relativePath = relativeUrl.RelativeUriToFilePath();
561
            //The file will be stored in a temporary location while downloading with an extension .download
562
            var tempPath = Path.Combine(fileAgent.CachePath, relativePath + ".download");
563
            //Make sure the target folder exists. DownloadFileTask will not create the folder
564
            var tempFolder = Path.GetDirectoryName(tempPath);
565
            if (!Directory.Exists(tempFolder))
566
                Directory.CreateDirectory(tempFolder);
567

    
568
            //Download the object to the temporary location
569
            await client.GetObject(cloudFile.Account, cloudFile.Container, relativeUrl.ToString(), tempPath);
570

    
571
            //Create the local folder if it doesn't exist (necessary for shared objects)
572
            var localFolder = Path.GetDirectoryName(localPath);
573
            if (!Directory.Exists(localFolder))
574
                Directory.CreateDirectory(localFolder);            
575
            //And move it to its actual location once downloading is finished
576
            if (File.Exists(localPath))
577
                File.Replace(tempPath,localPath,null,true);
578
            else
579
                File.Move(tempPath,localPath);
580
            //Notify listeners that a local file has changed
581
            StatusNotification.NotifyChangedFile(localPath);
582

    
583
                       
584
        }
585

    
586
        //Download a file asynchronously using blocks
587
        public async Task DownloadWithBlocks(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath, TreeHash serverHash)
588
        {
589
            if (client == null)
590
                throw new ArgumentNullException("client");
591
            if (cloudFile == null)
592
                throw new ArgumentNullException("cloudFile");
593
            if (relativeUrl == null)
594
                throw new ArgumentNullException("relativeUrl");
595
            if (String.IsNullOrWhiteSpace(filePath))
596
                throw new ArgumentNullException("filePath");
597
            if (!Path.IsPathRooted(filePath))
598
                throw new ArgumentException("The filePath must be rooted", "filePath");
599
            if (serverHash == null)
600
                throw new ArgumentNullException("serverHash");
601
            Contract.EndContractBlock();
602
            
603
           var fileAgent = GetFileAgent(accountInfo);
604
            var localPath = Interfaces.FileInfoExtensions.GetProperFilePathCapitalization(filePath);
605
            
606
            //Calculate the relative file path for the new file
607
            var relativePath = relativeUrl.RelativeUriToFilePath();
608
            var blockUpdater = new BlockUpdater(fileAgent.CachePath, localPath, relativePath, serverHash);
609

    
610
            
611
                        
612
            //Calculate the file's treehash
613
            var treeHash = await Signature.CalculateTreeHashAsync(localPath, serverHash.BlockSize, serverHash.BlockHash, 2);
614
                
615
            //And compare it with the server's hash
616
            var upHashes = serverHash.GetHashesAsStrings();
617
            var localHashes = treeHash.HashDictionary;
618
            ReportDownloadProgress(Path.GetFileName(localPath),0,upHashes.Length,cloudFile.Bytes);
619
            for (int i = 0; i < upHashes.Length; i++)
620
            {
621
                //For every non-matching hash
622
                var upHash = upHashes[i];
623
                if (!localHashes.ContainsKey(upHash))
624
                {
625
                    StatusNotification.Notify(new CloudNotification { Data = cloudFile });
626

    
627
                    if (blockUpdater.UseOrphan(i, upHash))
628
                    {
629
                        Log.InfoFormat("[BLOCK GET] ORPHAN FOUND for {0} of {1} for {2}", i, upHashes.Length, localPath);
630
                        continue;
631
                    }
632
                    Log.InfoFormat("[BLOCK GET] START {0} of {1} for {2}", i, upHashes.Length, localPath);
633
                    var start = i*serverHash.BlockSize;
634
                    //To download the last block just pass a null for the end of the range
635
                    long? end = null;
636
                    if (i < upHashes.Length - 1 )
637
                        end= ((i + 1)*serverHash.BlockSize) ;
638
                            
639
                    //Download the missing block
640
                    var block = await client.GetBlock(cloudFile.Account, cloudFile.Container, relativeUrl, start, end);
641

    
642
                    //and store it
643
                    blockUpdater.StoreBlock(i, block);
644

    
645

    
646
                    Log.InfoFormat("[BLOCK GET] FINISH {0} of {1} for {2}", i, upHashes.Length, localPath);
647
                }
648
                ReportDownloadProgress(Path.GetFileName(localPath), i, upHashes.Length, cloudFile.Bytes);
649
            }
650

    
651
            //Want to avoid notifications if no changes were made
652
            var hasChanges = blockUpdater.HasBlocks;
653
            blockUpdater.Commit();
654
            
655
            if (hasChanges)
656
                //Notify listeners that a local file has changed
657
                StatusNotification.NotifyChangedFile(localPath);
658

    
659
            Log.InfoFormat("[BLOCK GET] COMPLETE {0}", localPath);            
660
        }
661

    
662

    
663
        private async Task UploadCloudFile(CloudAction action)
664
        {
665
            if (action == null)
666
                throw new ArgumentNullException("action");           
667
            Contract.EndContractBlock();
668
            using(ThreadContext.Stacks["Operation"].Push("UploadCloudFile"))
669
            {
670
                try
671
                {
672
                    var accountInfo = action.AccountInfo;
673

    
674
                    var fileInfo = action.LocalFile;
675

    
676
                    if (fileInfo.Extension.Equals("ignore", StringComparison.InvariantCultureIgnoreCase))
677
                        return;
678

    
679
                    var relativePath = fileInfo.AsRelativeTo(accountInfo.AccountPath);
680
                    if (relativePath.StartsWith(FolderConstants.OthersFolder))
681
                    {
682
                        var parts = relativePath.Split('\\');
683
                        var accountName = parts[1];
684
                        var oldName = accountInfo.UserName;
685
                        var absoluteUri = accountInfo.StorageUri.AbsoluteUri;
686
                        var nameIndex = absoluteUri.IndexOf(oldName, StringComparison.Ordinal);
687
                        var root = absoluteUri.Substring(0, nameIndex);
688

    
689
                        accountInfo = new AccountInfo
690
                                          {
691
                                              UserName = accountName,
692
                                              AccountPath = Path.Combine(accountInfo.AccountPath, parts[0], parts[1]),
693
                                              StorageUri = new Uri(root + accountName),
694
                                              BlockHash = accountInfo.BlockHash,
695
                                              BlockSize = accountInfo.BlockSize,
696
                                              Token = accountInfo.Token
697
                                          };
698
                    }
699

    
700

    
701
                    var fullFileName = fileInfo.GetProperCapitalization();
702
                    using (var gate = NetworkGate.Acquire(fullFileName, NetworkOperation.Uploading))
703
                    {
704
                        //Abort if the file is already being uploaded or downloaded
705
                        if (gate.Failed)
706
                            return;
707

    
708
                        var cloudFile = action.CloudFile;
709
                        var account = cloudFile.Account ?? accountInfo.UserName;
710

    
711
                        var client = new CloudFilesClient(accountInfo);
712
                        //Even if GetObjectInfo times out, we can proceed with the upload            
713
                        var info = client.GetObjectInfo(account, cloudFile.Container, cloudFile.Name);
714

    
715
                        //If this is a read-only file, do not upload changes
716
                        if (info.AllowedTo == "read")
717
                            return;
718

    
719
                        //TODO: Check how a directory hash is calculated -> All dirs seem to have the same hash
720
                        if (fileInfo is DirectoryInfo)
721
                        {
722
                            //If the directory doesn't exist the Hash property will be empty
723
                            if (String.IsNullOrWhiteSpace(info.Hash))
724
                                //Go on and create the directory
725
                                await
726
                                    client.PutObject(account, cloudFile.Container, cloudFile.Name, fullFileName,
727
                                                     String.Empty, "application/directory");
728
                        }
729
                        else
730
                        {
731

    
732
                            var cloudHash = info.Hash.ToLower();
733

    
734
                            var hash = action.LocalHash.Value;
735
                            var topHash = action.TopHash.Value;
736

    
737
                            //If the file hashes match, abort the upload
738
                            if (hash == cloudHash || topHash == cloudHash)
739
                            {
740
                                //but store any metadata changes 
741
                                StatusKeeper.StoreInfo(fullFileName, info);
742
                                Log.InfoFormat("Skip upload of {0}, hashes match", fullFileName);
743
                                return;
744
                            }
745

    
746

    
747
                            //Mark the file as modified while we upload it
748
                            StatusKeeper.SetFileOverlayStatus(fullFileName, FileOverlayStatus.Modified);
749
                            //And then upload it
750

    
751
                            //Upload even small files using the Hashmap. The server may already contain
752
                            //the relevant block
753

    
754
                            //First, calculate the tree hash
755
                            var treeHash = await Signature.CalculateTreeHashAsync(fullFileName, accountInfo.BlockSize,
756
                                                                                  accountInfo.BlockHash, 2);
757

    
758
                            await
759
                                UploadWithHashMap(accountInfo, cloudFile, fileInfo as FileInfo, cloudFile.Name, treeHash);
760
                        }
761
                        //If everything succeeds, change the file and overlay status to normal
762
                        StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal);
763
                    }
764
                    //Notify the Shell to update the overlays
765
                    NativeMethods.RaiseChangeNotification(fullFileName);
766
                    StatusNotification.NotifyChangedFile(fullFileName);
767
                }
768
                catch (AggregateException ex)
769
                {
770
                    var exc = ex.InnerException as WebException;
771
                    if (exc == null)
772
                        throw ex.InnerException;
773
                    if (HandleUploadWebException(action, exc))
774
                        return;
775
                    throw;
776
                }
777
                catch (WebException ex)
778
                {
779
                    if (HandleUploadWebException(action, ex))
780
                        return;
781
                    throw;
782
                }
783
                catch (Exception ex)
784
                {
785
                    Log.Error("Unexpected error while uploading file", ex);
786
                    throw;
787
                }
788
            }
789
        }
790

    
791

    
792

    
793
        private bool HandleUploadWebException(CloudAction action, WebException exc)
794
        {
795
            var response = exc.Response as HttpWebResponse;
796
            if (response == null)
797
                throw exc;
798
            if (response.StatusCode == HttpStatusCode.Unauthorized)
799
            {
800
                Log.Error("Not allowed to upload file", exc);
801
                var message = String.Format("Not allowed to uplad file {0}", action.LocalFile.FullName);
802
                StatusKeeper.SetFileState(action.LocalFile.FullName, FileStatus.Unchanged, FileOverlayStatus.Normal);
803
                StatusNotification.NotifyChange(message, TraceLevel.Warning);
804
                return true;
805
            }
806
            return false;
807
        }
808

    
809
        public async Task UploadWithHashMap(AccountInfo accountInfo,ObjectInfo cloudFile,FileInfo fileInfo,string url,TreeHash treeHash)
810
        {
811
            if (accountInfo == null)
812
                throw new ArgumentNullException("accountInfo");
813
            if (cloudFile==null)
814
                throw new ArgumentNullException("cloudFile");
815
            if (fileInfo == null)
816
                throw new ArgumentNullException("fileInfo");
817
            if (String.IsNullOrWhiteSpace(url))
818
                throw new ArgumentNullException(url);
819
            if (treeHash==null)
820
                throw new ArgumentNullException("treeHash");
821
            if (String.IsNullOrWhiteSpace(cloudFile.Container) )
822
                throw new ArgumentException("Invalid container","cloudFile");
823
            Contract.EndContractBlock();
824

    
825
            var fullFileName = fileInfo.GetProperCapitalization();
826

    
827
            var account = cloudFile.Account ?? accountInfo.UserName;
828
            var container = cloudFile.Container ;
829

    
830
            var client = new CloudFilesClient(accountInfo);            
831
            //Send the hashmap to the server            
832
            var missingHashes =  await client.PutHashMap(account, container, url, treeHash);
833
            int block = 0;
834
            ReportUploadProgress(fileInfo.Name,block++, missingHashes.Count, fileInfo.Length);
835
            //If the server returns no missing hashes, we are done
836
            while (missingHashes.Count > 0)
837
            {
838

    
839
                var buffer = new byte[accountInfo.BlockSize];
840
                foreach (var missingHash in missingHashes)
841
                {
842
                    //Find the proper block
843
                    var blockIndex = treeHash.HashDictionary[missingHash];
844
                    var offset = blockIndex*accountInfo.BlockSize;
845

    
846
                    var read = fileInfo.Read(buffer, offset, accountInfo.BlockSize);
847

    
848
                    try
849
                    {
850
                        //And upload the block                
851
                        await client.PostBlock(account, container, buffer, 0, read);
852
                        Log.InfoFormat("[BLOCK] Block {0} of {1} uploaded", blockIndex, fullFileName);
853
                    }
854
                    catch (Exception exc)
855
                    {
856
                        Log.Error(String.Format("Uploading block {0} of {1}", blockIndex, fullFileName), exc);
857
                    }
858
                    ReportUploadProgress(fileInfo.Name,block++, missingHashes.Count, fileInfo.Length);
859
                }
860

    
861
                //Repeat until there are no more missing hashes                
862
                missingHashes = await client.PutHashMap(account, container, url, treeHash);
863
            }
864
            ReportUploadProgress(fileInfo.Name, missingHashes.Count, missingHashes.Count, fileInfo.Length);
865
        }
866

    
867
        private void ReportUploadProgress(string fileName,int block, int totalBlocks, long fileSize)
868
        {
869
            StatusNotification.Notify(new ProgressNotification(fileName,"Uploading",block,totalBlocks,fileSize) );            
870
        }
871
        private void ReportDownloadProgress(string fileName,int block, int totalBlocks, long fileSize)
872
        {
873
            StatusNotification.Notify(new ProgressNotification(fileName,"Downloading",block,totalBlocks,fileSize) );            
874
        }
875
    }
876

    
877
   
878

    
879

    
880
}