Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / NetworkAgent.cs @ 14ecd267

History | View | Annotate | Download (50.1 kB)

1
// -----------------------------------------------------------------------
2
// <copyright file="NetworkAgent.cs" company="GRNET">
3
// Copyright 2011 GRNET S.A. All rights reserved.
4
// 
5
// Redistribution and use in source and binary forms, with or
6
// without modification, are permitted provided that the following
7
// conditions are met:
8
// 
9
//   1. Redistributions of source code must retain the above
10
//      copyright notice, this list of conditions and the following
11
//      disclaimer.
12
// 
13
//   2. Redistributions in binary form must reproduce the above
14
//      copyright notice, this list of conditions and the following
15
//      disclaimer in the documentation and/or other materials
16
//      provided with the distribution.
17
// 
18
// THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19
// OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25
// USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26
// AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27
// LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28
// ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29
// POSSIBILITY OF SUCH DAMAGE.
30
// 
31
// The views and conclusions contained in the software and
32
// documentation are those of the authors and should not be
33
// interpreted as representing official policies, either expressed
34
// or implied, of GRNET S.A.
35
// </copyright>
36
// -----------------------------------------------------------------------
37

    
38
using System;
39
using System.Collections.Concurrent;
40
using System.Collections.Generic;
41
using System.ComponentModel.Composition;
42
using System.Diagnostics;
43
using System.Diagnostics.Contracts;
44
using System.IO;
45
using System.Linq;
46
using System.Net;
47
using System.Threading.Tasks;
48
using Castle.ActiveRecord;
49
using Pithos.Interfaces;
50
using Pithos.Network;
51
using log4net;
52

    
53
namespace Pithos.Core.Agents
54
{
55
    //TODO: Ensure all network operations use exact casing. Pithos is case sensitive
56
    [Export]
57
    public class NetworkAgent
58
    {
59
        private Agent<CloudAction> _agent;
60

    
61
        //A separate agent is used to execute delete actions immediatelly;
62
        private Agent<CloudDeleteAction> _deleteAgent;
63
        readonly ConcurrentDictionary<string,DateTime> _deletedFiles=new ConcurrentDictionary<string, DateTime>();
64

    
65
        [System.ComponentModel.Composition.Import]
66
        public IStatusKeeper StatusKeeper { get; set; }
67
        
68
        public IStatusNotification StatusNotification { get; set; }
69

    
70
        private static readonly ILog Log = LogManager.GetLogger("NetworkAgent");
71

    
72
        private readonly ConcurrentBag<AccountInfo> _accounts = new ConcurrentBag<AccountInfo>();
73

    
74
        public void Start()
75
        {
76

    
77
            _agent = Agent<CloudAction>.Start(inbox =>
78
            {
79
                Action loop = null;
80
                loop = () =>
81
                {
82
                    var message = inbox.Receive();
83
                    var process=message.Then(Process,inbox.CancellationToken);
84
                    inbox.LoopAsync(process, loop);
85
                };
86
                loop();
87
            });
88

    
89
            _deleteAgent = Agent<CloudDeleteAction>.Start(inbox =>
90
            {
91
                Action loop = null;
92
                loop = () =>
93
                            {
94
                                var message = inbox.Receive();
95
                                var process = message.Then(ProcessDelete,inbox.CancellationToken);
96
                                inbox.LoopAsync(process, loop);
97
                            };
98
                loop();
99

    
100
            });
101
        }
102

    
103
        private async Task Process(CloudAction action)
104
        {
105
            if (action == null)
106
                throw new ArgumentNullException("action");
107
            if (action.AccountInfo==null)
108
                throw new ArgumentException("The action.AccountInfo is empty","action");
109
            Contract.EndContractBlock();
110

    
111
            var accountInfo = action.AccountInfo;
112

    
113
            using (log4net.ThreadContext.Stacks["NETWORK"].Push("PROCESS"))
114
            {                
115
                Log.InfoFormat("[ACTION] Start Processing {0}", action);
116

    
117
                var cloudFile = action.CloudFile;
118
                var downloadPath = action.GetDownloadPath();
119

    
120
                try
121
                {
122

    
123
                    if (action.Action == CloudActionType.DeleteCloud)
124
                    {
125
                        //Redirect deletes to the delete agent 
126
                        _deleteAgent.Post((CloudDeleteAction)action);                        
127
                    }
128
                    if (IsDeletedFile(action))
129
                    {
130
                        //Clear the status of already deleted files to avoid reprocessing
131
                        this.StatusKeeper.ClearFileStatus(action.LocalFile.FullName);
132
                    }
133
                    else
134
                    {
135
                        switch (action.Action)
136
                        {
137
                            case CloudActionType.UploadUnconditional:
138
                                //Abort if the file was deleted before we reached this point
139
                                await UploadCloudFile(action);
140
                                break;
141
                            case CloudActionType.DownloadUnconditional:
142
                                await DownloadCloudFile(accountInfo, cloudFile, downloadPath);
143
                                break;
144
                            case CloudActionType.RenameCloud:
145
                                var moveAction = (CloudMoveAction) action;
146
                                RenameCloudFile(accountInfo, moveAction);
147
                                break;
148
                            case CloudActionType.MustSynch:
149
                                if (!File.Exists(downloadPath) && !Directory.Exists(downloadPath))
150
                                {
151
                                    await DownloadCloudFile(accountInfo, cloudFile, downloadPath);
152
                                }
153
                                else
154
                                {
155
                                    await SyncFiles(accountInfo, action);
156
                                }
157
                                break;
158
                        }
159
                    }
160
                    Log.InfoFormat("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile,
161
                                           action.CloudFile.Name);
162
                }
163
                catch (WebException exc)
164
                {
165
                    Log.ErrorFormat("[WEB ERROR] {0} : {1} -> {2} due to exception\r\n{3}", action.Action, action.LocalFile, action.CloudFile, exc);
166
                }
167
                catch (OperationCanceledException)
168
                {
169
                    throw;
170
                }
171
                catch (DirectoryNotFoundException)
172
                {
173
                    Log.ErrorFormat("{0} : {1} -> {2}  failed because the directory was not found.\n Rescheduling a delete",
174
                        action.Action, action.LocalFile, action.CloudFile);
175
                    //Post a delete action for the missing file
176
                    Post(new CloudDeleteAction(action));                    
177
                }
178
                catch (FileNotFoundException)
179
                {
180
                    Log.ErrorFormat("{0} : {1} -> {2}  failed because the file was not found.\n Rescheduling a delete",
181
                        action.Action, action.LocalFile, action.CloudFile);
182
                    //Post a delete action for the missing file
183
                    Post(new CloudDeleteAction(action));
184
                }
185
                catch (Exception exc)
186
                {
187
                    Log.ErrorFormat("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
188
                                     action.Action, action.LocalFile, action.CloudFile, exc);
189

    
190
                    _agent.Post(action);
191
                }                
192
            }
193
        }
194

    
195
        /// <summary>
196
        /// Processes cloud delete actions
197
        /// </summary>
198
        /// <param name="action">The delete action to execute</param>
199
        /// <returns></returns>
200
        /// <remarks>
201
        /// When a file/folder is deleted locally, we must delete it ASAP from the server and block any download
202
        /// operations that may be in progress.
203
        /// <para>
204
        /// A separate agent is used to process deletes because the main agent may be busy with a long operation.
205
        /// </para>
206
        /// </remarks>
207
        private async Task ProcessDelete(CloudDeleteAction action)
208
        {
209
            if (action == null)
210
                throw new ArgumentNullException("action");
211
            if (action.AccountInfo==null)
212
                throw new ArgumentException("The action.AccountInfo is empty","action");
213
            Contract.EndContractBlock();
214

    
215
            var accountInfo = action.AccountInfo;
216

    
217
            using (log4net.ThreadContext.Stacks["NETWORK"].Push("PROCESS"))
218
            {                
219
                Log.InfoFormat("[ACTION] Start Processing {0}", action);
220

    
221
                var cloudFile = action.CloudFile;
222

    
223
                try
224
                {
225
                    //Acquire a lock on the deleted file to prevent uploading/downloading operations from the normal
226
                    //agent
227
                    using (var gate = NetworkGate.Acquire(action.LocalFile.FullName, NetworkOperation.Deleting))
228
                    {
229
                        
230
                        //Add the file URL to the deleted files list
231
                        var key = GetFileKey(action.CloudFile);
232
                        _deletedFiles[key]=DateTime.Now;
233

    
234
                        // and then delete the file from the server
235
                        DeleteCloudFile(accountInfo, cloudFile);
236
                        Log.InfoFormat("[ACTION] End Delete {0}:{1}->{2}", action.Action, action.LocalFile,
237
                                       action.CloudFile.Name);
238
                    }
239
                }
240
                catch (WebException exc)
241
                {
242
                    Log.ErrorFormat("[WEB ERROR] {0} : {1} -> {2} due to exception\r\n{3}", action.Action, action.LocalFile, action.CloudFile, exc);
243
                }
244
                catch (OperationCanceledException)
245
                {
246
                    throw;
247
                }
248
                catch (DirectoryNotFoundException)
249
                {
250
                    Log.ErrorFormat("{0} : {1} -> {2}  failed because the directory was not found.\n Rescheduling a delete",
251
                        action.Action, action.LocalFile, action.CloudFile);
252
                    //Repost a delete action for the missing file
253
                    _deleteAgent.Post(action);                    
254
                }
255
                catch (FileNotFoundException)
256
                {
257
                    Log.ErrorFormat("{0} : {1} -> {2}  failed because the file was not found.\n Rescheduling a delete",
258
                        action.Action, action.LocalFile, action.CloudFile);
259
                    //Post a delete action for the missing file
260
                    _deleteAgent.Post(action);
261
                }
262
                catch (Exception exc)
263
                {
264
                    Log.ErrorFormat("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
265
                                     action.Action, action.LocalFile, action.CloudFile, exc);
266

    
267
                    _deleteAgent.Post(action);
268
                }                
269
            }
270
        }
271

    
272
        private static string GetFileKey(ObjectInfo info)
273
        {
274
            var key = String.Format("{0}/{1}/{2}", info.Account, info.Container,info.Name);
275
            return key;
276
        }
277

    
278
        private async Task SyncFiles(AccountInfo accountInfo,CloudAction action)
279
        {
280
            if (accountInfo == null)
281
                throw new ArgumentNullException("accountInfo");
282
            if (action==null)
283
                throw new ArgumentNullException("action");
284
            if (action.LocalFile==null)
285
                throw new ArgumentException("The action's local file is not specified","action");
286
            if (!Path.IsPathRooted(action.LocalFile.FullName))
287
                throw new ArgumentException("The action's local file path must be absolute","action");
288
            if (action.CloudFile== null)
289
                throw new ArgumentException("The action's cloud file is not specified", "action");
290
            Contract.EndContractBlock();
291

    
292
            var localFile = action.LocalFile;
293
            var cloudFile = action.CloudFile;
294
            var downloadPath=action.LocalFile.GetProperCapitalization();
295

    
296
            var cloudHash = cloudFile.Hash.ToLower();
297
            var localHash = action.LocalHash.Value.ToLower();
298
            var topHash = action.TopHash.Value.ToLower();
299

    
300
            //Not enough to compare only the local hashes, also have to compare the tophashes
301
            
302
            //If any of the hashes match, we are done
303
            if ((cloudHash == localHash || cloudHash == topHash))
304
            {
305
                Log.InfoFormat("Skipping {0}, hashes match",downloadPath);
306
                return;
307
            }
308

    
309
            //The hashes DON'T match. We need to sync
310
            var lastLocalTime = localFile.LastWriteTime;
311
            var lastUpTime = cloudFile.Last_Modified;
312
            
313
            //If the local file is newer upload it
314
            if (lastUpTime <= lastLocalTime)
315
            {
316
                //It probably means it was changed while the app was down                        
317
                UploadCloudFile(action);
318
            }
319
            else
320
            {
321
                //It the cloud file has a later date, it was modified by another user or computer.
322
                //We need to check the local file's status                
323
                var status = StatusKeeper.GetFileStatus(downloadPath);
324
                switch (status)
325
                {
326
                    case FileStatus.Unchanged:                        
327
                        //If the local file's status is Unchanged, we can go on and download the newer cloud file
328
                        await DownloadCloudFile(accountInfo,cloudFile,downloadPath);
329
                        break;
330
                    case FileStatus.Modified:
331
                        //If the local file is Modified, we may have a conflict. In this case we should mark the file as Conflict
332
                        //We can't ensure that a file modified online since the last time will appear as Modified, unless we 
333
                        //index all files before we start listening.                       
334
                    case FileStatus.Created:
335
                        //If the local file is Created, it means that the local and cloud files aren't related,
336
                        // yet they have the same name.
337

    
338
                        //In both cases we must mark the file as in conflict
339
                        ReportConflict(downloadPath);
340
                        break;
341
                    default:
342
                        //Other cases should never occur. Mark them as Conflict as well but log a warning
343
                        ReportConflict(downloadPath);
344
                        Log.WarnFormat("Unexcepted status {0} for file {1}->{2}", status,
345
                                       downloadPath, action.CloudFile.Name);
346
                        break;
347
                }
348
            }
349
        }
350

    
351
        private void ReportConflict(string downloadPath)
352
        {
353
            if (String.IsNullOrWhiteSpace(downloadPath))
354
                throw new ArgumentNullException("downloadPath");
355
            Contract.EndContractBlock();
356

    
357
            StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict);
358
            var message = String.Format("Conflict detected for file {0}", downloadPath);
359
            Log.Warn(message);
360
            StatusNotification.NotifyChange(message, TraceLevel.Warning);
361
        }
362

    
363
        public void Post(CloudAction cloudAction)
364
        {
365
            if (cloudAction == null)
366
                throw new ArgumentNullException("cloudAction");
367
            if (cloudAction.AccountInfo==null)
368
                throw new ArgumentException("The CloudAction.AccountInfo is empty","cloudAction");
369
            Contract.EndContractBlock();
370

    
371
            //If the action targets a local file, add a treehash calculation
372
            if (!(cloudAction is CloudDeleteAction) && cloudAction.LocalFile as FileInfo != null)
373
            {
374
                var accountInfo = cloudAction.AccountInfo;
375
                var localFile = (FileInfo) cloudAction.LocalFile;
376
                if (localFile.Length > accountInfo.BlockSize)
377
                    cloudAction.TopHash =
378
                        new Lazy<string>(() => Signature.CalculateTreeHashAsync(localFile,
379
                                                                                accountInfo.BlockSize,
380
                                                                                accountInfo.BlockHash).Result
381
                                                    .TopHash.ToHashString());
382
                else
383
                {
384
                    cloudAction.TopHash = new Lazy<string>(() => cloudAction.LocalHash.Value);
385
                }
386
            }
387
            else
388
            {
389
                //The hash for a directory is the empty string
390
                cloudAction.TopHash = new Lazy<string>(() => String.Empty);
391
            }
392
            
393
            if (cloudAction is CloudDeleteAction)
394
                _deleteAgent.Post((CloudDeleteAction)cloudAction);
395
            else
396
                _agent.Post(cloudAction);
397
        }
398

    
399
       /* class ObjectInfoByNameComparer:IEqualityComparer<ObjectInfo>
400
        {
401
            public bool Equals(ObjectInfo x, ObjectInfo y)
402
            {
403
                return x.Name.Equals(y.Name,StringComparison.InvariantCultureIgnoreCase);
404
            }
405

    
406
            public int GetHashCode(ObjectInfo obj)
407
            {
408
                return obj.Name.ToLower().GetHashCode();
409
            }
410
        }*/
411

    
412
        
413

    
414
        //Remote files are polled periodically. Any changes are processed
415
        public async Task ProcessRemoteFiles(DateTime? since = null)
416
        {            
417
            await TaskEx.Delay(TimeSpan.FromSeconds(10),_agent.CancellationToken);
418

    
419
            using (log4net.ThreadContext.Stacks["Retrieve Remote"].Push("All accounts"))
420
            {
421

    
422
                try
423
                {
424
                    //Next time we will check for all changes since the current check minus 1 second
425
                    //This is done to ensure there are no discrepancies due to clock differences
426
                    DateTime nextSince = DateTime.Now.AddSeconds(-1);
427

    
428
                    var tasks = from accountInfo in _accounts
429
                                select ProcessAccountFiles(accountInfo, since);
430

    
431
                    await TaskEx.WhenAll(tasks.ToList());
432

    
433
                    ProcessRemoteFiles(nextSince);
434
                }
435
                catch (Exception ex)
436
                {
437
                    Log.ErrorFormat("Error while processing accounts\r\n{0}",ex);
438
                    //In case of failure retry with the same parameter
439
                    ProcessRemoteFiles(since);
440
                }
441
                
442

    
443
            }
444
        }
445

    
446
        public async Task ProcessAccountFiles(AccountInfo accountInfo,DateTime? since=null)
447
        {   
448
            if (accountInfo==null)
449
                throw new ArgumentNullException("accountInfo");
450
            if (String.IsNullOrWhiteSpace(accountInfo.AccountPath))
451
                throw new ArgumentException("The AccountInfo.AccountPath is empty","accountInfo");
452
            Contract.EndContractBlock();
453

    
454
            using (log4net.ThreadContext.Stacks["Retrieve Remote"].Push(accountInfo.UserName))
455
            {
456
                Log.Info("Scheduled");
457
                var client=new CloudFilesClient(accountInfo);
458

    
459
                var containers = client.ListContainers(accountInfo.UserName);
460
                
461
                CreateContainerFolders(accountInfo, containers);
462

    
463
                try
464
                {
465
                    
466
                    //Get the list of server objects changed since the last check
467
                    //The name of the container is passed as state in order to create a dictionary of tasks in a subsequent step
468
                    var listObjects = from container in containers
469
                                      select  Task<IList<ObjectInfo>>.Factory.StartNew(_ =>
470
                                            client.ListObjects(accountInfo.UserName,container.Name, since),container.Name);
471

    
472

    
473
                    var listTasks = await Task.Factory.WhenAll(listObjects.ToArray());
474

    
475
                    using (log4net.ThreadContext.Stacks["SCHEDULE"].Push("Process Results"))
476
                    {
477
                        var dict = listTasks.ToDictionary(t => t.AsyncState);
478

    
479
                        //Get all non-trash objects. Remember, the container name is stored in AsyncState
480
                        var remoteObjects = from objectList in listTasks
481
                                            where (string) objectList.AsyncState != "trash"
482
                                            from obj in objectList.Result
483
                                            select obj;
484

    
485
                        var trashObjects = dict["trash"].Result;
486
                        //var sharedObjects = ((Task<IList<ObjectInfo>>) task.Result[2]).Result;
487

    
488
                        //Items with the same name, hash may be both in the container and the trash
489
                        //Don't delete items that exist in the container
490
                        var realTrash = from trash in trashObjects
491
                                        where
492
                                            !remoteObjects.Any(
493
                                                info => info.Name == trash.Name && info.Hash == trash.Hash)
494
                                        select trash;
495
                        ProcessDeletedFiles(accountInfo, realTrash);
496

    
497

    
498
                        var remote = from info in remoteObjects
499
                                     //.Union(sharedObjects)
500
                                     let name = info.Name
501
                                     where !name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase) &&
502
                                           !name.StartsWith(FolderConstants.CacheFolder + "/",
503
                                                            StringComparison.InvariantCultureIgnoreCase)
504
                                     select info;
505

    
506
                        //Create a list of actions from the remote files
507
                        var allActions = ObjectsToActions(accountInfo, remote);
508

    
509
                        //And remove those that are already being processed by the agent
510
                        var distinctActions = allActions
511
                            .Except(_agent.GetEnumerable(), new PithosMonitor.LocalFileComparer())
512
                            .ToList();
513

    
514
                        //Queue all the actions
515
                        foreach (var message in distinctActions)
516
                        {
517
                            Post(message);
518
                        }
519

    
520
                        Log.Info("[LISTENER] End Processing");
521
                    }
522
                }
523
                catch (Exception ex)
524
                {
525
                    Log.ErrorFormat("[FAIL] ListObjects for{0} in ProcessRemoteFiles with {1}", accountInfo.UserName, ex);
526
                    return;
527
                }
528

    
529
                Log.Info("[LISTENER] Finished");
530

    
531
            }
532
        }
533

    
534
        private static void CreateContainerFolders(AccountInfo accountInfo, IEnumerable<ContainerInfo> containers)
535
        {
536
            var containerPaths = from container in containers
537
                                 let containerPath = Path.Combine(accountInfo.AccountPath, container.Name)
538
                                 where container.Name != FolderConstants.TrashContainer && !Directory.Exists(containerPath)
539
                                 select containerPath;
540

    
541
            foreach (var path in containerPaths)
542
            {
543
                Directory.CreateDirectory(path);
544
            }
545
        }
546

    
547
        //Creates an appropriate action for each server file
548
        private IEnumerable<CloudAction> ObjectsToActions(AccountInfo accountInfo,IEnumerable<ObjectInfo> remote)
549
        {
550
            if (remote==null)
551
                throw new ArgumentNullException();
552
            Contract.EndContractBlock();
553
            var fileAgent = GetFileAgent(accountInfo);
554

    
555
            //In order to avoid multiple iterations over the files, we iterate only once
556
            //over the remote files
557
            foreach (var objectInfo in remote)
558
            {
559
                var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
560
                //and remove any matching objects from the list, adding them to the commonObjects list
561
                
562
                if (fileAgent.Exists(relativePath))
563
                {
564
                    //If a directory object already exists, we don't need to perform any other action                    
565
                    var localFile = fileAgent.GetFileSystemInfo(relativePath);
566
                    if (objectInfo.Content_Type == @"application/directory" && localFile is DirectoryInfo)
567
                        continue;
568
                    using (new SessionScope(FlushAction.Never))
569
                    {
570
                        var state =  StatusKeeper.GetStateByFilePath(localFile.FullName);
571
                        //FileState.FindByFilePath(localFile.FullName);
572
                        //Common files should be checked on a per-case basis to detect differences, which is newer
573

    
574
                        yield return new CloudAction(accountInfo, CloudActionType.MustSynch,
575
                                                     localFile, objectInfo, state, accountInfo.BlockSize,
576
                                                     accountInfo.BlockHash);
577
                    }
578
                }
579
                else
580
                {
581
                    //If there is no match we add them to the localFiles list
582
                    //but only if the file is not marked for deletion
583
                    var targetFile = Path.Combine(accountInfo.AccountPath, relativePath);
584
                    var fileStatus = StatusKeeper.GetFileStatus(targetFile);
585
                    if (fileStatus != FileStatus.Deleted)
586
                    {
587
                        //Remote files should be downloaded
588
                        yield return new CloudDownloadAction(accountInfo,objectInfo);
589
                    }
590
                }
591
            }            
592
        }
593

    
594
        private static FileAgent GetFileAgent(AccountInfo accountInfo)
595
        {
596
            return AgentLocator<FileAgent>.Get(accountInfo.AccountPath);
597
        }
598

    
599
        private void ProcessDeletedFiles(AccountInfo accountInfo,IEnumerable<ObjectInfo> trashObjects)
600
        {
601
            var fileAgent = GetFileAgent(accountInfo);
602
            foreach (var trashObject in trashObjects)
603
            {
604
                var barePath = trashObject.RelativeUrlToFilePath(accountInfo.UserName);
605
                //HACK: Assume only the "pithos" container is used. Must find out what happens when
606
                //deleting a file from a different container
607
                var relativePath = Path.Combine("pithos", barePath);
608
                fileAgent.Delete(relativePath);                                
609
            }
610
        }
611

    
612

    
613
        private void RenameCloudFile(AccountInfo accountInfo,CloudMoveAction action)
614
        {
615
            if (accountInfo==null)
616
                throw new ArgumentNullException("accountInfo");
617
            if (action==null)
618
                throw new ArgumentNullException("action");
619
            if (action.CloudFile==null)
620
                throw new ArgumentException("CloudFile","action");
621
            if (action.LocalFile==null)
622
                throw new ArgumentException("LocalFile","action");
623
            if (action.OldLocalFile==null)
624
                throw new ArgumentException("OldLocalFile","action");
625
            if (action.OldCloudFile==null)
626
                throw new ArgumentException("OldCloudFile","action");
627
            Contract.EndContractBlock();
628
            
629
            
630
            var newFilePath = action.LocalFile.FullName;
631
            
632
            //How do we handle concurrent renames and deletes/uploads/downloads?
633
            //* A conflicting upload means that a file was renamed before it had a chance to finish uploading
634
            //  This should never happen as the network agent executes only one action at a time
635
            //* A conflicting download means that the file was modified on the cloud. While we can go on and complete
636
            //  the rename, there may be a problem if the file is downloaded in blocks, as subsequent block requests for the 
637
            //  same name will fail.
638
            //  This should never happen as the network agent executes only one action at a time.
639
            //* A conflicting delete can happen if the rename was followed by a delete action that didn't have the chance
640
            //  to remove the rename from the queue.
641
            //  We can probably ignore this case. It will result in an error which should be ignored            
642

    
643
            
644
            //The local file is already renamed
645
            StatusKeeper.SetFileOverlayStatus(newFilePath, FileOverlayStatus.Modified);
646

    
647

    
648
            var account = action.CloudFile.Account ?? accountInfo.UserName;
649
            var container = action.CloudFile.Container;
650
            
651
            var client = new CloudFilesClient(accountInfo);
652
            //TODO: What code is returned when the source file doesn't exist?
653
            client.MoveObject(account, container, action.OldCloudFile.Name, container, action.CloudFile.Name);
654

    
655
            StatusKeeper.SetFileStatus(newFilePath, FileStatus.Unchanged);
656
            StatusKeeper.SetFileOverlayStatus(newFilePath, FileOverlayStatus.Normal);
657
            NativeMethods.RaiseChangeNotification(newFilePath);
658
        }
659

    
660
        private void DeleteCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile)
661
        {
662
            if (accountInfo == null)
663
                throw new ArgumentNullException("accountInfo");
664
            if (cloudFile==null)
665
                throw new ArgumentNullException("cloudFile");
666

    
667
            if (String.IsNullOrWhiteSpace(cloudFile.Container))
668
                throw new ArgumentException("Invalid container", "cloudFile");
669
            Contract.EndContractBlock();
670
            
671
            var fileAgent = GetFileAgent(accountInfo);
672

    
673
            using ( log4net.ThreadContext.Stacks["DeleteCloudFile"].Push("Delete"))
674
            {
675
                var fileName= cloudFile.RelativeUrlToFilePath(accountInfo.UserName);
676
                var info = fileAgent.GetFileSystemInfo(fileName);                
677
                var fullPath = info.FullName.ToLower();
678

    
679
                StatusKeeper.SetFileOverlayStatus(fullPath, FileOverlayStatus.Modified);
680

    
681
                var account = cloudFile.Account ?? accountInfo.UserName;
682
                var container = cloudFile.Container ;//?? FolderConstants.PithosContainer;
683

    
684
                var client = new CloudFilesClient(accountInfo);
685
                client.DeleteObject(account, container, cloudFile.Name);
686

    
687
                StatusKeeper.ClearFileStatus(fullPath);
688
            }
689
        }
690

    
691
        //Download a file.
692
        private async Task DownloadCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile , string filePath)
693
        {
694
            if (accountInfo == null)
695
                throw new ArgumentNullException("accountInfo");
696
            if (cloudFile == null)
697
                throw new ArgumentNullException("cloudFile");
698
            if (String.IsNullOrWhiteSpace(cloudFile.Account))
699
                throw new ArgumentNullException("cloudFile");
700
            if (String.IsNullOrWhiteSpace(cloudFile.Container))
701
                throw new ArgumentNullException("cloudFile");
702
            if (String.IsNullOrWhiteSpace(filePath))
703
                throw new ArgumentNullException("filePath");
704
            if (!Path.IsPathRooted(filePath))
705
                throw new ArgumentException("The filePath must be rooted", "filePath");
706
            Contract.EndContractBlock();
707

    
708
            var localPath = Interfaces.FileInfoExtensions.GetProperFilePathCapitalization(filePath);
709
            var relativeUrl = new Uri(cloudFile.Name, UriKind.Relative);
710

    
711
            var url = relativeUrl.ToString();
712
            if (cloudFile.Name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase))
713
                return;
714

    
715

    
716
            //Are we already downloading or uploading the file? 
717
            using (var gate=NetworkGate.Acquire(localPath, NetworkOperation.Downloading))
718
            {
719
                if (gate.Failed)
720
                    return;
721
                //The file's hashmap will be stored in the same location with the extension .hashmap
722
                //var hashPath = Path.Combine(FileAgent.CachePath, relativePath + ".hashmap");
723
                
724
                var client = new CloudFilesClient(accountInfo);
725
                var account = cloudFile.Account;
726
                var container = cloudFile.Container;
727

    
728
                if (cloudFile.Content_Type == @"application/directory")
729
                {
730
                    if (!Directory.Exists(localPath))
731
                        Directory.CreateDirectory(localPath);
732
                }
733
                else
734
                {
735
                    //Retrieve the hashmap from the server
736
                    var serverHash = await client.GetHashMap(account, container, url);
737
                    //If it's a small file
738
                    if (serverHash.Hashes.Count == 1)
739
                        //Download it in one go
740
                        await
741
                            DownloadEntireFileAsync(accountInfo, client, cloudFile, relativeUrl, localPath, serverHash);
742
                        //Otherwise download it block by block
743
                    else
744
                        await DownloadWithBlocks(accountInfo, client, cloudFile, relativeUrl, localPath, serverHash);
745

    
746
                    if (cloudFile.AllowedTo == "read")
747
                    {
748
                        var attributes = File.GetAttributes(localPath);
749
                        File.SetAttributes(localPath, attributes | FileAttributes.ReadOnly);                        
750
                    }
751
                }
752

    
753
                //Now we can store the object's metadata without worrying about ghost status entries
754
                StatusKeeper.StoreInfo(localPath, cloudFile);
755
                
756
            }
757
        }
758

    
759
        //Download a small file with a single GET operation
760
        private async Task DownloadEntireFileAsync(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath,TreeHash serverHash)
761
        {
762
            if (client == null)
763
                throw new ArgumentNullException("client");
764
            if (cloudFile==null)
765
                throw new ArgumentNullException("cloudFile");
766
            if (relativeUrl == null)
767
                throw new ArgumentNullException("relativeUrl");
768
            if (String.IsNullOrWhiteSpace(filePath))
769
                throw new ArgumentNullException("filePath");
770
            if (!Path.IsPathRooted(filePath))
771
                throw new ArgumentException("The localPath must be rooted", "filePath");
772
            Contract.EndContractBlock();
773

    
774
            var localPath = Pithos.Interfaces.FileInfoExtensions.GetProperFilePathCapitalization(filePath);
775
            //If the file already exists
776
            if (File.Exists(localPath))
777
            {
778
                //First check with MD5 as this is a small file
779
                var localMD5 = Signature.CalculateMD5(localPath);
780
                var cloudHash=serverHash.TopHash.ToHashString();
781
                if (localMD5==cloudHash)
782
                    return;
783
                //Then check with a treehash
784
                var localTreeHash = Signature.CalculateTreeHash(localPath, serverHash.BlockSize, serverHash.BlockHash);
785
                var localHash = localTreeHash.TopHash.ToHashString();
786
                if (localHash==cloudHash)
787
                    return;
788
            }
789

    
790
            var fileAgent = GetFileAgent(accountInfo);
791
            //Calculate the relative file path for the new file
792
            var relativePath = relativeUrl.RelativeUriToFilePath();
793
            //The file will be stored in a temporary location while downloading with an extension .download
794
            var tempPath = Path.Combine(fileAgent.CachePath, relativePath + ".download");
795
            //Make sure the target folder exists. DownloadFileTask will not create the folder
796
            var tempFolder = Path.GetDirectoryName(tempPath);
797
            if (!Directory.Exists(tempFolder))
798
                Directory.CreateDirectory(tempFolder);
799

    
800
            //Download the object to the temporary location
801
            await client.GetObject(cloudFile.Account, cloudFile.Container, relativeUrl.ToString(), tempPath);
802

    
803
            //Create the local folder if it doesn't exist (necessary for shared objects)
804
            var localFolder = Path.GetDirectoryName(localPath);
805
            if (!Directory.Exists(localFolder))
806
                Directory.CreateDirectory(localFolder);            
807
            //And move it to its actual location once downloading is finished
808
            if (File.Exists(localPath))
809
                File.Replace(tempPath,localPath,null,true);
810
            else
811
                File.Move(tempPath,localPath);
812
            //Notify listeners that a local file has changed
813
            StatusNotification.NotifyChangedFile(localPath);
814

    
815
                       
816
        }
817

    
818
        //Download a file asynchronously using blocks
819
        public async Task DownloadWithBlocks(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath, TreeHash serverHash)
820
        {
821
            if (client == null)
822
                throw new ArgumentNullException("client");
823
            if (cloudFile == null)
824
                throw new ArgumentNullException("cloudFile");
825
            if (relativeUrl == null)
826
                throw new ArgumentNullException("relativeUrl");
827
            if (String.IsNullOrWhiteSpace(filePath))
828
                throw new ArgumentNullException("filePath");
829
            if (!Path.IsPathRooted(filePath))
830
                throw new ArgumentException("The filePath must be rooted", "filePath");
831
            if (serverHash == null)
832
                throw new ArgumentNullException("serverHash");
833
            Contract.EndContractBlock();
834
            
835
           var fileAgent = GetFileAgent(accountInfo);
836
            var localPath = Interfaces.FileInfoExtensions.GetProperFilePathCapitalization(filePath);
837
            
838
            //Calculate the relative file path for the new file
839
            var relativePath = relativeUrl.RelativeUriToFilePath();
840
            var blockUpdater = new BlockUpdater(fileAgent.CachePath, localPath, relativePath, serverHash);
841

    
842
            
843
                        
844
            //Calculate the file's treehash
845
            var treeHash = await Signature.CalculateTreeHashAsync(localPath, serverHash.BlockSize, serverHash.BlockHash);
846
                
847
            //And compare it with the server's hash
848
            var upHashes = serverHash.GetHashesAsStrings();
849
            var localHashes = treeHash.HashDictionary;
850
            for (int i = 0; i < upHashes.Length; i++)
851
            {
852
                //For every non-matching hash
853
                var upHash = upHashes[i];
854
                if (!localHashes.ContainsKey(upHash))
855
                {
856
                    if (blockUpdater.UseOrphan(i, upHash))
857
                    {
858
                        Log.InfoFormat("[BLOCK GET] ORPHAN FOUND for {0} of {1} for {2}", i, upHashes.Length, localPath);
859
                        continue;
860
                    }
861
                    Log.InfoFormat("[BLOCK GET] START {0} of {1} for {2}", i, upHashes.Length, localPath);
862
                    var start = i*serverHash.BlockSize;
863
                    //To download the last block just pass a null for the end of the range
864
                    long? end = null;
865
                    if (i < upHashes.Length - 1 )
866
                        end= ((i + 1)*serverHash.BlockSize) ;
867
                            
868
                    //Download the missing block
869
                    var block = await client.GetBlock(cloudFile.Account, cloudFile.Container, relativeUrl, start, end);
870

    
871
                    //and store it
872
                    blockUpdater.StoreBlock(i, block);
873

    
874

    
875
                    Log.InfoFormat("[BLOCK GET] FINISH {0} of {1} for {2}", i, upHashes.Length, localPath);
876
                }
877
            }
878

    
879
            //Want to avoid notifications if no changes were made
880
            var hasChanges = blockUpdater.HasBlocks;
881
            blockUpdater.Commit();
882
            
883
            if (hasChanges)
884
                //Notify listeners that a local file has changed
885
                StatusNotification.NotifyChangedFile(localPath);
886

    
887
            Log.InfoFormat("[BLOCK GET] COMPLETE {0}", localPath);            
888
        }
889

    
890

    
891
        private async Task UploadCloudFile(CloudAction action)
892
        {
893
            if (action == null)
894
                throw new ArgumentNullException("action");           
895
            Contract.EndContractBlock();
896

    
897
            try
898
            {
899
                var accountInfo = action.AccountInfo;
900

    
901
                var fileInfo = action.LocalFile;
902

    
903
                if (fileInfo.Extension.Equals("ignore", StringComparison.InvariantCultureIgnoreCase))
904
                    return;
905
                
906
                var relativePath = fileInfo.AsRelativeTo(accountInfo.AccountPath);
907
                if (relativePath.StartsWith(FolderConstants.OthersFolder))
908
                {
909
                    var parts = relativePath.Split('\\');
910
                    var accountName = parts[1];
911
                    var oldName = accountInfo.UserName;
912
                    var absoluteUri = accountInfo.StorageUri.AbsoluteUri;
913
                    var nameIndex = absoluteUri.IndexOf(oldName);
914
                    var root = absoluteUri.Substring(0, nameIndex);
915

    
916
                    accountInfo = new AccountInfo
917
                    {
918
                        UserName = accountName,
919
                        AccountPath = Path.Combine(accountInfo.AccountPath, parts[0], parts[1]),
920
                        StorageUri = new Uri(root + accountName),
921
                        BlockHash = accountInfo.BlockHash,
922
                        BlockSize = accountInfo.BlockSize,
923
                        Token = accountInfo.Token
924
                    };
925
                }
926

    
927

    
928
                var fullFileName = fileInfo.GetProperCapitalization();
929
                using (var gate = NetworkGate.Acquire(fullFileName, NetworkOperation.Uploading))
930
                {
931
                    //Abort if the file is already being uploaded or downloaded
932
                    if (gate.Failed)
933
                        return;
934

    
935
                    var cloudFile = action.CloudFile;
936
                    var account = cloudFile.Account ?? accountInfo.UserName;
937

    
938
                    var client = new CloudFilesClient(accountInfo);                    
939
                    //Even if GetObjectInfo times out, we can proceed with the upload            
940
                    var info = client.GetObjectInfo(account, cloudFile.Container, cloudFile.Name);
941

    
942
                    //If this is a read-only file, do not upload changes
943
                    if (info.AllowedTo == "read")
944
                        return;
945
                    
946
                    //TODO: Check how a directory hash is calculated -> All dirs seem to have the same hash
947
                    if (fileInfo is DirectoryInfo)
948
                    {
949
                        //If the directory doesn't exist the Hash property will be empty
950
                        if (String.IsNullOrWhiteSpace(info.Hash))
951
                            //Go on and create the directory
952
                            client.PutObject(account, cloudFile.Container, cloudFile.Name, fullFileName, String.Empty, "application/directory");
953
                    }
954
                    else
955
                    {
956

    
957
                        var cloudHash = info.Hash.ToLower();
958

    
959
                        var hash = action.LocalHash.Value;
960
                        var topHash = action.TopHash.Value;
961

    
962
                        //If the file hashes match, abort the upload
963
                        if (hash == cloudHash || topHash == cloudHash)
964
                        {
965
                            //but store any metadata changes 
966
                            StatusKeeper.StoreInfo(fullFileName, info);
967
                            Log.InfoFormat("Skip upload of {0}, hashes match", fullFileName);
968
                            return;
969
                        }
970

    
971

    
972
                        //Mark the file as modified while we upload it
973
                        StatusKeeper.SetFileOverlayStatus(fullFileName, FileOverlayStatus.Modified);
974
                        //And then upload it
975

    
976
                        //Upload even small files using the Hashmap. The server may already contain
977
                        //the relevant block
978

    
979
                        //First, calculate the tree hash
980
                        var treeHash = await Signature.CalculateTreeHashAsync(fullFileName, accountInfo.BlockSize,
981
                                                                              accountInfo.BlockHash);
982

    
983
                        await UploadWithHashMap(accountInfo, cloudFile, fileInfo as FileInfo, cloudFile.Name, treeHash);
984
                    }
985
                    //If everything succeeds, change the file and overlay status to normal
986
                    StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal);
987
                }
988
                //Notify the Shell to update the overlays
989
                NativeMethods.RaiseChangeNotification(fullFileName);
990
                StatusNotification.NotifyChangedFile(fullFileName);
991
            }
992
            catch (AggregateException ex)
993
            {
994
                var exc = ex.InnerException as WebException;
995
                if (exc == null)
996
                    throw ex.InnerException;
997
                if (HandleUploadWebException(action, exc)) 
998
                    return;
999
                throw;
1000
            }
1001
            catch (WebException ex)
1002
            {
1003
                if (HandleUploadWebException(action, ex))
1004
                    return;
1005
                throw;
1006
            }
1007
            catch (Exception ex)
1008
            {
1009
                Log.Error("Unexpected error while uploading file", ex);
1010
                throw;
1011
            }
1012

    
1013
        }
1014

    
1015
        private bool IsDeletedFile(CloudAction action)
1016
        {            
1017
            var key = GetFileKey(action.CloudFile);
1018
            DateTime entryDate;
1019
            if (_deletedFiles.TryGetValue(key, out entryDate))
1020
            {
1021
                //If the delete entry was created after this action, abort the action
1022
                if (entryDate > action.Created)
1023
                    return true;
1024
                //Otherwise, remove the stale entry 
1025
                _deletedFiles.TryRemove(key, out entryDate);
1026
            }
1027
            return false;
1028
        }
1029

    
1030
        private bool HandleUploadWebException(CloudAction action, WebException exc)
1031
        {
1032
            var response = exc.Response as HttpWebResponse;
1033
            if (response == null)
1034
                throw exc;
1035
            if (response.StatusCode == HttpStatusCode.Unauthorized)
1036
            {
1037
                Log.Error("Not allowed to upload file", exc);
1038
                var message = String.Format("Not allowed to uplad file {0}", action.LocalFile.FullName);
1039
                StatusKeeper.SetFileState(action.LocalFile.FullName, FileStatus.Unchanged, FileOverlayStatus.Normal);
1040
                StatusNotification.NotifyChange(message, TraceLevel.Warning);
1041
                return true;
1042
            }
1043
            return false;
1044
        }
1045

    
1046
        public async Task UploadWithHashMap(AccountInfo accountInfo,ObjectInfo cloudFile,FileInfo fileInfo,string url,TreeHash treeHash)
1047
        {
1048
            if (accountInfo == null)
1049
                throw new ArgumentNullException("accountInfo");
1050
            if (cloudFile==null)
1051
                throw new ArgumentNullException("cloudFile");
1052
            if (fileInfo == null)
1053
                throw new ArgumentNullException("fileInfo");
1054
            if (String.IsNullOrWhiteSpace(url))
1055
                throw new ArgumentNullException(url);
1056
            if (treeHash==null)
1057
                throw new ArgumentNullException("treeHash");
1058
            if (String.IsNullOrWhiteSpace(cloudFile.Container) )
1059
                throw new ArgumentException("Invalid container","cloudFile");
1060
            Contract.EndContractBlock();
1061

    
1062
            var fullFileName = fileInfo.GetProperCapitalization();
1063

    
1064
            var account = cloudFile.Account ?? accountInfo.UserName;
1065
            var container = cloudFile.Container ;
1066

    
1067
            var client = new CloudFilesClient(accountInfo);
1068
            //Send the hashmap to the server            
1069
            var missingHashes =  await client.PutHashMap(account, container, url, treeHash);
1070
            //If the server returns no missing hashes, we are done
1071
            while (missingHashes.Count > 0)
1072
            {
1073

    
1074
                var buffer = new byte[accountInfo.BlockSize];
1075
                foreach (var missingHash in missingHashes)
1076
                {
1077
                    //Find the proper block
1078
                    var blockIndex = treeHash.HashDictionary[missingHash];
1079
                    var offset = blockIndex*accountInfo.BlockSize;
1080

    
1081
                    var read = fileInfo.Read(buffer, offset, accountInfo.BlockSize);
1082

    
1083
                    try
1084
                    {
1085
                        //And upload the block                
1086
                        await client.PostBlock(account, container, buffer, 0, read);
1087
                        Log.InfoFormat("[BLOCK] Block {0} of {1} uploaded", blockIndex, fullFileName);
1088
                    }
1089
                    catch (Exception exc)
1090
                    {
1091
                        Log.ErrorFormat("[ERROR] uploading block {0} of {1}\n{2}", blockIndex, fullFileName, exc);
1092
                    }
1093

    
1094
                }
1095

    
1096
                //Repeat until there are no more missing hashes                
1097
                missingHashes = await client.PutHashMap(account, container, url, treeHash);
1098
            }
1099
        }
1100

    
1101

    
1102
        public void AddAccount(AccountInfo accountInfo)
1103
        {            
1104
            if (!_accounts.Contains(accountInfo))
1105
                _accounts.Add(accountInfo);
1106
        }
1107
    }
1108

    
1109
   
1110

    
1111

    
1112
}