Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / NetworkAgent.cs @ e81dd1f6

History | View | Annotate | Download (48.7 kB)

1
// -----------------------------------------------------------------------
2
// <copyright file="NetworkAgent.cs" company="GRNET">
3
// Copyright 2011 GRNET S.A. All rights reserved.
4
// 
5
// Redistribution and use in source and binary forms, with or
6
// without modification, are permitted provided that the following
7
// conditions are met:
8
// 
9
//   1. Redistributions of source code must retain the above
10
//      copyright notice, this list of conditions and the following
11
//      disclaimer.
12
// 
13
//   2. Redistributions in binary form must reproduce the above
14
//      copyright notice, this list of conditions and the following
15
//      disclaimer in the documentation and/or other materials
16
//      provided with the distribution.
17
// 
18
// THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19
// OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25
// USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26
// AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27
// LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28
// ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29
// POSSIBILITY OF SUCH DAMAGE.
30
// 
31
// The views and conclusions contained in the software and
32
// documentation are those of the authors and should not be
33
// interpreted as representing official policies, either expressed
34
// or implied, of GRNET S.A.
35
// </copyright>
36
// -----------------------------------------------------------------------
37

    
38
using System;
39
using System.Collections.Concurrent;
40
using System.Collections.Generic;
41
using System.ComponentModel.Composition;
42
using System.Diagnostics;
43
using System.Diagnostics.Contracts;
44
using System.IO;
45
using System.Linq;
46
using System.Net;
47
using System.Threading.Tasks;
48
using Castle.ActiveRecord;
49
using Pithos.Interfaces;
50
using Pithos.Network;
51
using log4net;
52

    
53
namespace Pithos.Core.Agents
54
{
55
    //TODO: Ensure all network operations use exact casing. Pithos is case sensitive
56
    [Export]
57
    public class NetworkAgent
58
    {
59
        private Agent<CloudAction> _agent;
60

    
61
        //A separate agent is used to execute delete actions immediatelly;
62
        private Agent<CloudDeleteAction> _deleteAgent;
63

    
64

    
65
        [System.ComponentModel.Composition.Import]
66
        public IStatusKeeper StatusKeeper { get; set; }
67
        
68
        public IStatusNotification StatusNotification { get; set; }
69

    
70
        private static readonly ILog Log = LogManager.GetLogger("NetworkAgent");
71

    
72
        private readonly ConcurrentBag<AccountInfo> _accounts = new ConcurrentBag<AccountInfo>();
73

    
74
        public void Start()
75
        {
76

    
77
            _agent = Agent<CloudAction>.Start(inbox =>
78
            {
79
                Action loop = null;
80
                loop = () =>
81
                {
82
                    var message = inbox.Receive();
83
                    var process=message.Then(Process,inbox.CancellationToken);
84
                    inbox.LoopAsync(process, loop);
85
                };
86
                loop();
87
            });
88

    
89
            _deleteAgent = Agent<CloudDeleteAction>.Start(inbox =>
90
            {
91
                Action loop = null;
92
                loop = () =>
93
                            {
94
                                var message = inbox.Receive();
95
                                var process = message.Then(ProcessDelete,inbox.CancellationToken);
96
                                inbox.LoopAsync(process, loop);
97
                            };
98
                loop();
99

    
100
            });
101
        }
102

    
103
        private async Task Process(CloudAction action)
104
        {
105
            if (action == null)
106
                throw new ArgumentNullException("action");
107
            if (action.AccountInfo==null)
108
                throw new ArgumentException("The action.AccountInfo is empty","action");
109
            Contract.EndContractBlock();
110

    
111
            var accountInfo = action.AccountInfo;
112

    
113
            using (log4net.ThreadContext.Stacks["NETWORK"].Push("PROCESS"))
114
            {                
115
                Log.InfoFormat("[ACTION] Start Processing {0}", action);
116

    
117
                var cloudFile = action.CloudFile;
118
                var downloadPath = action.GetDownloadPath();
119

    
120
                try
121
                {
122

    
123
                    switch (action.Action)
124
                    {
125
                        case CloudActionType.UploadUnconditional:
126
                            await UploadCloudFile(action);
127
                            break;
128
                        case CloudActionType.DownloadUnconditional:
129

    
130
                            await DownloadCloudFile(accountInfo, cloudFile, downloadPath);
131
                            break;
132
                        case CloudActionType.DeleteCloud:
133
                            //Redirect deletes to the delete agent 
134
                            _deleteAgent.Post((CloudDeleteAction)action);
135
                            break;
136
                        case CloudActionType.RenameCloud:
137
                            var moveAction = (CloudMoveAction)action;
138
                            RenameCloudFile(accountInfo, moveAction);
139
                            break;
140
                        case CloudActionType.MustSynch:
141
                            if (!File.Exists(downloadPath) && !Directory.Exists(downloadPath))
142
                            {
143
                                await DownloadCloudFile(accountInfo, cloudFile, downloadPath);
144
                            }
145
                            else
146
                            {
147
                                await SyncFiles(accountInfo, action);
148
                            }
149
                            break;
150
                    }
151
                    Log.InfoFormat("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile,
152
                                           action.CloudFile.Name);
153
                }
154
                catch (WebException exc)
155
                {
156
                    Log.ErrorFormat("[WEB ERROR] {0} : {1} -> {2} due to exception\r\n{3}", action.Action, action.LocalFile, action.CloudFile, exc);
157
                }
158
                catch (OperationCanceledException)
159
                {
160
                    throw;
161
                }
162
                catch (DirectoryNotFoundException)
163
                {
164
                    Log.ErrorFormat("{0} : {1} -> {2}  failed because the directory was not found.\n Rescheduling a delete",
165
                        action.Action, action.LocalFile, action.CloudFile);
166
                    //Post a delete action for the missing file
167
                    Post(new CloudDeleteAction(action));                    
168
                }
169
                catch (FileNotFoundException)
170
                {
171
                    Log.ErrorFormat("{0} : {1} -> {2}  failed because the file was not found.\n Rescheduling a delete",
172
                        action.Action, action.LocalFile, action.CloudFile);
173
                    //Post a delete action for the missing file
174
                    Post(new CloudDeleteAction(action));
175
                }
176
                catch (Exception exc)
177
                {
178
                    Log.ErrorFormat("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
179
                                     action.Action, action.LocalFile, action.CloudFile, exc);
180

    
181
                    _agent.Post(action);
182
                }                
183
            }
184
        }
185

    
186
        /// <summary>
187
        /// Processes cloud delete actions
188
        /// </summary>
189
        /// <param name="action">The delete action to execute</param>
190
        /// <returns></returns>
191
        /// <remarks>
192
        /// When a file/folder is deleted locally, we must delete it ASAP from the server and block any download
193
        /// operations that may be in progress.
194
        /// <para>
195
        /// A separate agent is used to process deletes because the main agent may be busy with a long operation.
196
        /// </para>
197
        /// </remarks>
198
        private async Task ProcessDelete(CloudDeleteAction action)
199
        {
200
            if (action == null)
201
                throw new ArgumentNullException("action");
202
            if (action.AccountInfo==null)
203
                throw new ArgumentException("The action.AccountInfo is empty","action");
204
            Contract.EndContractBlock();
205

    
206
            var accountInfo = action.AccountInfo;
207

    
208
            using (log4net.ThreadContext.Stacks["NETWORK"].Push("PROCESS"))
209
            {                
210
                Log.InfoFormat("[ACTION] Start Processing {0}", action);
211

    
212
                var cloudFile = action.CloudFile;
213

    
214
                try
215
                {
216
                    //Acquire a lock on the deleted file to prevent uploading/downloading operations from the normal
217
                    //agent
218
                    using (var gate = NetworkGate.Acquire(action.LocalFile.FullName, NetworkOperation.Deleting))
219
                    {
220
                        // Remove any related actions from the normal agent
221
                        _agent.Remove(queuedAction =>
222
                                      queuedAction.CloudFile.Container == action.CloudFile.Container &&
223
                                      queuedAction.CloudFile.Name == action.CloudFile.Name);
224
                        // and then delete the file from the server
225
                        DeleteCloudFile(accountInfo, cloudFile);
226
                        Log.InfoFormat("[ACTION] End Delete {0}:{1}->{2}", action.Action, action.LocalFile,
227
                                       action.CloudFile.Name);
228
                    }
229
                }
230
                catch (WebException exc)
231
                {
232
                    Log.ErrorFormat("[WEB ERROR] {0} : {1} -> {2} due to exception\r\n{3}", action.Action, action.LocalFile, action.CloudFile, exc);
233
                }
234
                catch (OperationCanceledException)
235
                {
236
                    throw;
237
                }
238
                catch (DirectoryNotFoundException)
239
                {
240
                    Log.ErrorFormat("{0} : {1} -> {2}  failed because the directory was not found.\n Rescheduling a delete",
241
                        action.Action, action.LocalFile, action.CloudFile);
242
                    //Repost a delete action for the missing file
243
                    _deleteAgent.Post(action);                    
244
                }
245
                catch (FileNotFoundException)
246
                {
247
                    Log.ErrorFormat("{0} : {1} -> {2}  failed because the file was not found.\n Rescheduling a delete",
248
                        action.Action, action.LocalFile, action.CloudFile);
249
                    //Post a delete action for the missing file
250
                    _deleteAgent.Post(action);
251
                }
252
                catch (Exception exc)
253
                {
254
                    Log.ErrorFormat("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
255
                                     action.Action, action.LocalFile, action.CloudFile, exc);
256

    
257
                    _deleteAgent.Post(action);
258
                }                
259
            }
260
        }
261

    
262
        private async Task SyncFiles(AccountInfo accountInfo,CloudAction action)
263
        {
264
            if (accountInfo == null)
265
                throw new ArgumentNullException("accountInfo");
266
            if (action==null)
267
                throw new ArgumentNullException("action");
268
            if (action.LocalFile==null)
269
                throw new ArgumentException("The action's local file is not specified","action");
270
            if (!Path.IsPathRooted(action.LocalFile.FullName))
271
                throw new ArgumentException("The action's local file path must be absolute","action");
272
            if (action.CloudFile== null)
273
                throw new ArgumentException("The action's cloud file is not specified", "action");
274
            Contract.EndContractBlock();
275

    
276
            var localFile = action.LocalFile;
277
            var cloudFile = action.CloudFile;
278
            var downloadPath=action.LocalFile.GetProperCapitalization();
279

    
280
            var cloudHash = cloudFile.Hash.ToLower();
281
            var localHash = action.LocalHash.Value.ToLower();
282
            var topHash = action.TopHash.Value.ToLower();
283

    
284
            //Not enough to compare only the local hashes, also have to compare the tophashes
285
            
286
            //If any of the hashes match, we are done
287
            if ((cloudHash == localHash || cloudHash == topHash))
288
            {
289
                Log.InfoFormat("Skipping {0}, hashes match",downloadPath);
290
                return;
291
            }
292

    
293
            //The hashes DON'T match. We need to sync
294
            var lastLocalTime = localFile.LastWriteTime;
295
            var lastUpTime = cloudFile.Last_Modified;
296
            
297
            //If the local file is newer upload it
298
            if (lastUpTime <= lastLocalTime)
299
            {
300
                //It probably means it was changed while the app was down                        
301
                UploadCloudFile(action);
302
            }
303
            else
304
            {
305
                //It the cloud file has a later date, it was modified by another user or computer.
306
                //We need to check the local file's status                
307
                var status = StatusKeeper.GetFileStatus(downloadPath);
308
                switch (status)
309
                {
310
                    case FileStatus.Unchanged:                        
311
                        //If the local file's status is Unchanged, we can go on and download the newer cloud file
312
                        await DownloadCloudFile(accountInfo,cloudFile,downloadPath);
313
                        break;
314
                    case FileStatus.Modified:
315
                        //If the local file is Modified, we may have a conflict. In this case we should mark the file as Conflict
316
                        //We can't ensure that a file modified online since the last time will appear as Modified, unless we 
317
                        //index all files before we start listening.                       
318
                    case FileStatus.Created:
319
                        //If the local file is Created, it means that the local and cloud files aren't related,
320
                        // yet they have the same name.
321

    
322
                        //In both cases we must mark the file as in conflict
323
                        ReportConflict(downloadPath);
324
                        break;
325
                    default:
326
                        //Other cases should never occur. Mark them as Conflict as well but log a warning
327
                        ReportConflict(downloadPath);
328
                        Log.WarnFormat("Unexcepted status {0} for file {1}->{2}", status,
329
                                       downloadPath, action.CloudFile.Name);
330
                        break;
331
                }
332
            }
333
        }
334

    
335
        private void ReportConflict(string downloadPath)
336
        {
337
            if (String.IsNullOrWhiteSpace(downloadPath))
338
                throw new ArgumentNullException("downloadPath");
339
            Contract.EndContractBlock();
340

    
341
            StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict);
342
            var message = String.Format("Conflict detected for file {0}", downloadPath);
343
            Log.Warn(message);
344
            StatusNotification.NotifyChange(message, TraceLevel.Warning);
345
        }
346

    
347
        public void Post(CloudAction cloudAction)
348
        {
349
            if (cloudAction == null)
350
                throw new ArgumentNullException("cloudAction");
351
            if (cloudAction.AccountInfo==null)
352
                throw new ArgumentException("The CloudAction.AccountInfo is empty","cloudAction");
353
            Contract.EndContractBlock();
354

    
355
            //If the action targets a local file, add a treehash calculation
356
            if (!(cloudAction is CloudDeleteAction) && cloudAction.LocalFile as FileInfo != null)
357
            {
358
                var accountInfo = cloudAction.AccountInfo;
359
                var localFile = (FileInfo) cloudAction.LocalFile;
360
                if (localFile.Length > accountInfo.BlockSize)
361
                    cloudAction.TopHash =
362
                        new Lazy<string>(() => Signature.CalculateTreeHashAsync(localFile,
363
                                                                                accountInfo.BlockSize,
364
                                                                                accountInfo.BlockHash).Result
365
                                                    .TopHash.ToHashString());
366
                else
367
                {
368
                    cloudAction.TopHash = new Lazy<string>(() => cloudAction.LocalHash.Value);
369
                }
370
            }
371
            else
372
            {
373
                //The hash for a directory is the empty string
374
                cloudAction.TopHash = new Lazy<string>(() => String.Empty);
375
            }
376
            
377
            if (cloudAction is CloudDeleteAction)
378
                _deleteAgent.Post((CloudDeleteAction)cloudAction);
379
            else
380
                _agent.Post(cloudAction);
381
        }
382

    
383
       /* class ObjectInfoByNameComparer:IEqualityComparer<ObjectInfo>
384
        {
385
            public bool Equals(ObjectInfo x, ObjectInfo y)
386
            {
387
                return x.Name.Equals(y.Name,StringComparison.InvariantCultureIgnoreCase);
388
            }
389

    
390
            public int GetHashCode(ObjectInfo obj)
391
            {
392
                return obj.Name.ToLower().GetHashCode();
393
            }
394
        }*/
395

    
396
        
397

    
398
        //Remote files are polled periodically. Any changes are processed
399
        public async Task ProcessRemoteFiles(DateTime? since = null)
400
        {            
401
            await TaskEx.Delay(TimeSpan.FromSeconds(10),_agent.CancellationToken);
402

    
403
            using (log4net.ThreadContext.Stacks["Retrieve Remote"].Push("All accounts"))
404
            {
405

    
406
                try
407
                {
408
                    //Next time we will check for all changes since the current check minus 1 second
409
                    //This is done to ensure there are no discrepancies due to clock differences
410
                    DateTime nextSince = DateTime.Now.AddSeconds(-1);
411

    
412
                    var tasks = from accountInfo in _accounts
413
                                select ProcessAccountFiles(accountInfo, since);
414

    
415
                    await TaskEx.WhenAll(tasks.ToList());
416

    
417
                    ProcessRemoteFiles(nextSince);
418
                }
419
                catch (Exception ex)
420
                {
421
                    Log.ErrorFormat("Error while processing accounts\r\n{0}",ex);
422
                    //In case of failure retry with the same parameter
423
                    ProcessRemoteFiles(since);
424
                }
425
                
426

    
427
            }
428
        }
429

    
430
        public async Task ProcessAccountFiles(AccountInfo accountInfo,DateTime? since=null)
431
        {   
432
            if (accountInfo==null)
433
                throw new ArgumentNullException("accountInfo");
434
            if (String.IsNullOrWhiteSpace(accountInfo.AccountPath))
435
                throw new ArgumentException("The AccountInfo.AccountPath is empty","accountInfo");
436
            Contract.EndContractBlock();
437

    
438
            using (log4net.ThreadContext.Stacks["Retrieve Remote"].Push(accountInfo.UserName))
439
            {
440
                Log.Info("Scheduled");
441
                var client=new CloudFilesClient(accountInfo);
442

    
443
                var containers = client.ListContainers(accountInfo.UserName);
444
                
445
                CreateContainerFolders(accountInfo, containers);
446

    
447
                try
448
                {
449
                    
450
                    //Get the list of server objects changed since the last check
451
                    //The name of the container is passed as state in order to create a dictionary of tasks in a subsequent step
452
                    var listObjects = from container in containers
453
                                      select  Task<IList<ObjectInfo>>.Factory.StartNew(_ =>
454
                                            client.ListObjects(accountInfo.UserName,container.Name, since),container.Name);
455

    
456

    
457
                    var listTasks = await Task.Factory.WhenAll(listObjects.ToArray());
458

    
459
                    using (log4net.ThreadContext.Stacks["SCHEDULE"].Push("Process Results"))
460
                    {
461
                        var dict = listTasks.ToDictionary(t => t.AsyncState);
462

    
463
                        //Get all non-trash objects. Remember, the container name is stored in AsyncState
464
                        var remoteObjects = from objectList in listTasks
465
                                            where (string) objectList.AsyncState != "trash"
466
                                            from obj in objectList.Result
467
                                            select obj;
468

    
469
                        var trashObjects = dict["trash"].Result;
470
                        //var sharedObjects = ((Task<IList<ObjectInfo>>) task.Result[2]).Result;
471

    
472
                        //Items with the same name, hash may be both in the container and the trash
473
                        //Don't delete items that exist in the container
474
                        var realTrash = from trash in trashObjects
475
                                        where
476
                                            !remoteObjects.Any(
477
                                                info => info.Name == trash.Name && info.Hash == trash.Hash)
478
                                        select trash;
479
                        ProcessDeletedFiles(accountInfo, realTrash);
480

    
481

    
482
                        var remote = from info in remoteObjects
483
                                     //.Union(sharedObjects)
484
                                     let name = info.Name
485
                                     where !name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase) &&
486
                                           !name.StartsWith(FolderConstants.CacheFolder + "/",
487
                                                            StringComparison.InvariantCultureIgnoreCase)
488
                                     select info;
489

    
490
                        //Create a list of actions from the remote files
491
                        var allActions = ObjectsToActions(accountInfo, remote);
492

    
493
                        //And remove those that are already being processed by the agent
494
                        var distinctActions = allActions
495
                            .Except(_agent.GetEnumerable(), new PithosMonitor.LocalFileComparer())
496
                            .ToList();
497

    
498
                        //Queue all the actions
499
                        foreach (var message in distinctActions)
500
                        {
501
                            Post(message);
502
                        }
503

    
504
                        Log.Info("[LISTENER] End Processing");
505
                    }
506
                }
507
                catch (Exception ex)
508
                {
509
                    Log.ErrorFormat("[FAIL] ListObjects for{0} in ProcessRemoteFiles with {1}", accountInfo.UserName, ex);
510
                    return;
511
                }
512

    
513
                Log.Info("[LISTENER] Finished");
514

    
515
            }
516
        }
517

    
518
        private static void CreateContainerFolders(AccountInfo accountInfo, IEnumerable<ContainerInfo> containers)
519
        {
520
            var containerPaths = from container in containers
521
                                 let containerPath = Path.Combine(accountInfo.AccountPath, container.Name)
522
                                 where container.Name != FolderConstants.TrashContainer && !Directory.Exists(containerPath)
523
                                 select containerPath;
524

    
525
            foreach (var path in containerPaths)
526
            {
527
                Directory.CreateDirectory(path);
528
            }
529
        }
530

    
531
        //Creates an appropriate action for each server file
532
        private IEnumerable<CloudAction> ObjectsToActions(AccountInfo accountInfo,IEnumerable<ObjectInfo> remote)
533
        {
534
            if (remote==null)
535
                throw new ArgumentNullException();
536
            Contract.EndContractBlock();
537
            var fileAgent = GetFileAgent(accountInfo);
538

    
539
            //In order to avoid multiple iterations over the files, we iterate only once
540
            //over the remote files
541
            foreach (var objectInfo in remote)
542
            {
543
                var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
544
                //and remove any matching objects from the list, adding them to the commonObjects list
545
                
546
                if (fileAgent.Exists(relativePath))
547
                {
548
                    //If a directory object already exists, we don't need to perform any other action                    
549
                    var localFile = fileAgent.GetFileSystemInfo(relativePath);
550
                    if (objectInfo.Content_Type == @"application/directory" && localFile is DirectoryInfo)
551
                        continue;
552
                    using (new SessionScope(FlushAction.Never))
553
                    {
554
                        var state = FileState.FindByFilePath(localFile.FullName);
555
                        //Common files should be checked on a per-case basis to detect differences, which is newer
556

    
557
                        yield return new CloudAction(accountInfo, CloudActionType.MustSynch,
558
                                                     localFile, objectInfo, state, accountInfo.BlockSize,
559
                                                     accountInfo.BlockHash);
560
                    }
561
                }
562
                else
563
                {
564
                    //If there is no match we add them to the localFiles list
565
                    //but only if the file is not marked for deletion
566
                    var targetFile = Path.Combine(accountInfo.AccountPath, relativePath);
567
                    var fileStatus = StatusKeeper.GetFileStatus(targetFile);
568
                    if (fileStatus != FileStatus.Deleted)
569
                    {
570
                        //Remote files should be downloaded
571
                        yield return new CloudDownloadAction(accountInfo,objectInfo);
572
                    }
573
                }
574
            }            
575
        }
576

    
577
        private static FileAgent GetFileAgent(AccountInfo accountInfo)
578
        {
579
            return AgentLocator<FileAgent>.Get(accountInfo.AccountPath);
580
        }
581

    
582
        private void ProcessDeletedFiles(AccountInfo accountInfo,IEnumerable<ObjectInfo> trashObjects)
583
        {
584
            var fileAgent = GetFileAgent(accountInfo);
585
            foreach (var trashObject in trashObjects)
586
            {
587
                var barePath = trashObject.RelativeUrlToFilePath(accountInfo.UserName);
588
                //HACK: Assume only the "pithos" container is used. Must find out what happens when
589
                //deleting a file from a different container
590
                var relativePath = Path.Combine("pithos", barePath);
591
                fileAgent.Delete(relativePath);                                
592
            }
593
        }
594

    
595

    
596
        private void RenameCloudFile(AccountInfo accountInfo,CloudMoveAction action)
597
        {
598
            if (accountInfo==null)
599
                throw new ArgumentNullException("accountInfo");
600
            if (action==null)
601
                throw new ArgumentNullException("action");
602
            if (action.CloudFile==null)
603
                throw new ArgumentException("CloudFile","action");
604
            if (action.LocalFile==null)
605
                throw new ArgumentException("LocalFile","action");
606
            if (action.OldLocalFile==null)
607
                throw new ArgumentException("OldLocalFile","action");
608
            if (action.OldCloudFile==null)
609
                throw new ArgumentException("OldCloudFile","action");
610
            Contract.EndContractBlock();
611
            
612
            
613
            var newFilePath = action.LocalFile.FullName;
614
            
615
            //How do we handle concurrent renames and deletes/uploads/downloads?
616
            //* A conflicting upload means that a file was renamed before it had a chance to finish uploading
617
            //  This should never happen as the network agent executes only one action at a time
618
            //* A conflicting download means that the file was modified on the cloud. While we can go on and complete
619
            //  the rename, there may be a problem if the file is downloaded in blocks, as subsequent block requests for the 
620
            //  same name will fail.
621
            //  This should never happen as the network agent executes only one action at a time.
622
            //* A conflicting delete can happen if the rename was followed by a delete action that didn't have the chance
623
            //  to remove the rename from the queue.
624
            //  We can probably ignore this case. It will result in an error which should be ignored            
625

    
626
            
627
            //The local file is already renamed
628
            StatusKeeper.SetFileOverlayStatus(newFilePath, FileOverlayStatus.Modified);
629

    
630

    
631
            var account = action.CloudFile.Account ?? accountInfo.UserName;
632
            var container = action.CloudFile.Container;
633
            
634
            var client = new CloudFilesClient(accountInfo);
635
            //TODO: What code is returned when the source file doesn't exist?
636
            client.MoveObject(account, container, action.OldCloudFile.Name, container, action.CloudFile.Name);
637

    
638
            StatusKeeper.SetFileStatus(newFilePath, FileStatus.Unchanged);
639
            StatusKeeper.SetFileOverlayStatus(newFilePath, FileOverlayStatus.Normal);
640
            NativeMethods.RaiseChangeNotification(newFilePath);
641
        }
642

    
643
        private void DeleteCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile)
644
        {
645
            if (accountInfo == null)
646
                throw new ArgumentNullException("accountInfo");
647
            if (cloudFile==null)
648
                throw new ArgumentNullException("cloudFile");
649

    
650
            if (String.IsNullOrWhiteSpace(cloudFile.Container))
651
                throw new ArgumentException("Invalid container", "cloudFile");
652
            Contract.EndContractBlock();
653
            
654
            var fileAgent = GetFileAgent(accountInfo);
655

    
656
            using ( log4net.ThreadContext.Stacks["DeleteCloudFile"].Push("Delete"))
657
            {
658
                var fileName= cloudFile.RelativeUrlToFilePath(accountInfo.UserName);
659
                var info = fileAgent.GetFileSystemInfo(fileName);                
660
                var fullPath = info.FullName.ToLower();
661

    
662
                StatusKeeper.SetFileOverlayStatus(fullPath, FileOverlayStatus.Modified);
663

    
664
                var account = cloudFile.Account ?? accountInfo.UserName;
665
                var container = cloudFile.Container ;//?? FolderConstants.PithosContainer;
666

    
667
                var client = new CloudFilesClient(accountInfo);
668
                client.DeleteObject(account, container, cloudFile.Name);
669

    
670
                StatusKeeper.ClearFileStatus(fullPath);
671
            }
672
        }
673

    
674
        //Download a file.
675
        private async Task DownloadCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile , string filePath)
676
        {
677
            if (accountInfo == null)
678
                throw new ArgumentNullException("accountInfo");
679
            if (cloudFile == null)
680
                throw new ArgumentNullException("cloudFile");
681
            if (String.IsNullOrWhiteSpace(cloudFile.Account))
682
                throw new ArgumentNullException("cloudFile");
683
            if (String.IsNullOrWhiteSpace(cloudFile.Container))
684
                throw new ArgumentNullException("cloudFile");
685
            if (String.IsNullOrWhiteSpace(filePath))
686
                throw new ArgumentNullException("filePath");
687
            if (!Path.IsPathRooted(filePath))
688
                throw new ArgumentException("The filePath must be rooted", "filePath");
689
            Contract.EndContractBlock();
690

    
691
            var localPath = Interfaces.FileInfoExtensions.GetProperFilePathCapitalization(filePath);
692
            var relativeUrl = new Uri(cloudFile.Name, UriKind.Relative);
693

    
694
            var url = relativeUrl.ToString();
695
            if (cloudFile.Name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase))
696
                return;
697

    
698
            //Are we already downloading or uploading the file? 
699
            using (var gate=NetworkGate.Acquire(localPath, NetworkOperation.Downloading))
700
            {
701
                if (gate.Failed)
702
                    return;
703
                //The file's hashmap will be stored in the same location with the extension .hashmap
704
                //var hashPath = Path.Combine(FileAgent.CachePath, relativePath + ".hashmap");
705
                
706
                var client = new CloudFilesClient(accountInfo);
707
                var account = cloudFile.Account;
708
                var container = cloudFile.Container;
709

    
710
                if (cloudFile.Content_Type == @"application/directory")
711
                {
712
                    if (!Directory.Exists(localPath))
713
                        Directory.CreateDirectory(localPath);
714
                }
715
                else
716
                {
717
                    //Retrieve the hashmap from the server
718
                    var serverHash = await client.GetHashMap(account, container, url);
719
                    //If it's a small file
720
                    if (serverHash.Hashes.Count == 1)
721
                        //Download it in one go
722
                        await
723
                            DownloadEntireFileAsync(accountInfo, client, cloudFile, relativeUrl, localPath, serverHash);
724
                        //Otherwise download it block by block
725
                    else
726
                        await DownloadWithBlocks(accountInfo, client, cloudFile, relativeUrl, localPath, serverHash);
727

    
728
                    if (cloudFile.AllowedTo == "read")
729
                    {
730
                        var attributes = File.GetAttributes(localPath);
731
                        File.SetAttributes(localPath, attributes | FileAttributes.ReadOnly);                        
732
                    }
733
                }
734

    
735
                //Now we can store the object's metadata without worrying about ghost status entries
736
                StatusKeeper.StoreInfo(localPath, cloudFile);
737
                
738
            }
739
        }
740

    
741
        //Download a small file with a single GET operation
742
        private async Task DownloadEntireFileAsync(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath,TreeHash serverHash)
743
        {
744
            if (client == null)
745
                throw new ArgumentNullException("client");
746
            if (cloudFile==null)
747
                throw new ArgumentNullException("cloudFile");
748
            if (relativeUrl == null)
749
                throw new ArgumentNullException("relativeUrl");
750
            if (String.IsNullOrWhiteSpace(filePath))
751
                throw new ArgumentNullException("filePath");
752
            if (!Path.IsPathRooted(filePath))
753
                throw new ArgumentException("The localPath must be rooted", "filePath");
754
            Contract.EndContractBlock();
755

    
756
            var localPath = Pithos.Interfaces.FileInfoExtensions.GetProperFilePathCapitalization(filePath);
757
            //If the file already exists
758
            if (File.Exists(localPath))
759
            {
760
                //First check with MD5 as this is a small file
761
                var localMD5 = Signature.CalculateMD5(localPath);
762
                var cloudHash=serverHash.TopHash.ToHashString();
763
                if (localMD5==cloudHash)
764
                    return;
765
                //Then check with a treehash
766
                var localTreeHash = Signature.CalculateTreeHash(localPath, serverHash.BlockSize, serverHash.BlockHash);
767
                var localHash = localTreeHash.TopHash.ToHashString();
768
                if (localHash==cloudHash)
769
                    return;
770
            }
771

    
772
            var fileAgent = GetFileAgent(accountInfo);
773
            //Calculate the relative file path for the new file
774
            var relativePath = relativeUrl.RelativeUriToFilePath();
775
            //The file will be stored in a temporary location while downloading with an extension .download
776
            var tempPath = Path.Combine(fileAgent.CachePath, relativePath + ".download");
777
            //Make sure the target folder exists. DownloadFileTask will not create the folder
778
            var tempFolder = Path.GetDirectoryName(tempPath);
779
            if (!Directory.Exists(tempFolder))
780
                Directory.CreateDirectory(tempFolder);
781

    
782
            //Download the object to the temporary location
783
            await client.GetObject(cloudFile.Account, cloudFile.Container, relativeUrl.ToString(), tempPath);
784

    
785
            //Create the local folder if it doesn't exist (necessary for shared objects)
786
            var localFolder = Path.GetDirectoryName(localPath);
787
            if (!Directory.Exists(localFolder))
788
                Directory.CreateDirectory(localFolder);            
789
            //And move it to its actual location once downloading is finished
790
            if (File.Exists(localPath))
791
                File.Replace(tempPath,localPath,null,true);
792
            else
793
                File.Move(tempPath,localPath);
794
            //Notify listeners that a local file has changed
795
            StatusNotification.NotifyChangedFile(localPath);
796

    
797
                       
798
        }
799

    
800
        //Download a file asynchronously using blocks
801
        public async Task DownloadWithBlocks(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath, TreeHash serverHash)
802
        {
803
            if (client == null)
804
                throw new ArgumentNullException("client");
805
            if (cloudFile == null)
806
                throw new ArgumentNullException("cloudFile");
807
            if (relativeUrl == null)
808
                throw new ArgumentNullException("relativeUrl");
809
            if (String.IsNullOrWhiteSpace(filePath))
810
                throw new ArgumentNullException("filePath");
811
            if (!Path.IsPathRooted(filePath))
812
                throw new ArgumentException("The filePath must be rooted", "filePath");
813
            if (serverHash == null)
814
                throw new ArgumentNullException("serverHash");
815
            Contract.EndContractBlock();
816
            
817
           var fileAgent = GetFileAgent(accountInfo);
818
            var localPath = Interfaces.FileInfoExtensions.GetProperFilePathCapitalization(filePath);
819
            
820
            //Calculate the relative file path for the new file
821
            var relativePath = relativeUrl.RelativeUriToFilePath();
822
            var blockUpdater = new BlockUpdater(fileAgent.CachePath, localPath, relativePath, serverHash);
823

    
824
            
825
                        
826
            //Calculate the file's treehash
827
            var treeHash = await Signature.CalculateTreeHashAsync(localPath, serverHash.BlockSize, serverHash.BlockHash);
828
                
829
            //And compare it with the server's hash
830
            var upHashes = serverHash.GetHashesAsStrings();
831
            var localHashes = treeHash.HashDictionary;
832
            for (int i = 0; i < upHashes.Length; i++)
833
            {
834
                //For every non-matching hash
835
                var upHash = upHashes[i];
836
                if (!localHashes.ContainsKey(upHash))
837
                {
838
                    if (blockUpdater.UseOrphan(i, upHash))
839
                    {
840
                        Log.InfoFormat("[BLOCK GET] ORPHAN FOUND for {0} of {1} for {2}", i, upHashes.Length, localPath);
841
                        continue;
842
                    }
843
                    Log.InfoFormat("[BLOCK GET] START {0} of {1} for {2}", i, upHashes.Length, localPath);
844
                    var start = i*serverHash.BlockSize;
845
                    //To download the last block just pass a null for the end of the range
846
                    long? end = null;
847
                    if (i < upHashes.Length - 1 )
848
                        end= ((i + 1)*serverHash.BlockSize) ;
849
                            
850
                    //Download the missing block
851
                    var block = await client.GetBlock(cloudFile.Account, cloudFile.Container, relativeUrl, start, end);
852

    
853
                    //and store it
854
                    blockUpdater.StoreBlock(i, block);
855

    
856

    
857
                    Log.InfoFormat("[BLOCK GET] FINISH {0} of {1} for {2}", i, upHashes.Length, localPath);
858
                }
859
            }
860

    
861
            //Want to avoid notifications if no changes were made
862
            var hasChanges = blockUpdater.HasBlocks;
863
            blockUpdater.Commit();
864
            
865
            if (hasChanges)
866
                //Notify listeners that a local file has changed
867
                StatusNotification.NotifyChangedFile(localPath);
868

    
869
            Log.InfoFormat("[BLOCK GET] COMPLETE {0}", localPath);            
870
        }
871

    
872

    
873
        private async Task UploadCloudFile(CloudAction action)
874
        {
875
            if (action == null)
876
                throw new ArgumentNullException("action");           
877
            Contract.EndContractBlock();
878

    
879
            try
880
            {
881
                var accountInfo = action.AccountInfo;
882

    
883
                var fileInfo = action.LocalFile;
884

    
885
                if (fileInfo.Extension.Equals("ignore", StringComparison.InvariantCultureIgnoreCase))
886
                    return;
887

    
888
                var relativePath = fileInfo.AsRelativeTo(accountInfo.AccountPath);
889
                if (relativePath.StartsWith(FolderConstants.OthersFolder))
890
                {
891
                    var parts = relativePath.Split('\\');
892
                    var accountName = parts[1];
893
                    var oldName = accountInfo.UserName;
894
                    var absoluteUri = accountInfo.StorageUri.AbsoluteUri;
895
                    var nameIndex = absoluteUri.IndexOf(oldName);
896
                    var root = absoluteUri.Substring(0, nameIndex);
897

    
898
                    accountInfo = new AccountInfo
899
                    {
900
                        UserName = accountName,
901
                        AccountPath = Path.Combine(accountInfo.AccountPath, parts[0], parts[1]),
902
                        StorageUri = new Uri(root + accountName),
903
                        BlockHash = accountInfo.BlockHash,
904
                        BlockSize = accountInfo.BlockSize,
905
                        Token = accountInfo.Token
906
                    };
907
                }
908

    
909

    
910
                var fullFileName = fileInfo.GetProperCapitalization();
911
                using (var gate = NetworkGate.Acquire(fullFileName, NetworkOperation.Uploading))
912
                {
913
                    //Abort if the file is already being uploaded or downloaded
914
                    if (gate.Failed)
915
                        return;
916

    
917
                    var cloudFile = action.CloudFile;
918
                    var account = cloudFile.Account ?? accountInfo.UserName;
919

    
920
                    var client = new CloudFilesClient(accountInfo);                    
921
                    //Even if GetObjectInfo times out, we can proceed with the upload            
922
                    var info = client.GetObjectInfo(account, cloudFile.Container, cloudFile.Name);
923

    
924
                    //If this is a read-only file, do not upload changes
925
                    if (info.AllowedTo == "read")
926
                        return;
927
                    
928
                    //TODO: Check how a directory hash is calculated -> All dirs seem to have the same hash
929
                    if (fileInfo is DirectoryInfo)
930
                    {
931
                        //If the directory doesn't exist the Hash property will be empty
932
                        if (String.IsNullOrWhiteSpace(info.Hash))
933
                            //Go on and create the directory
934
                            client.PutObject(account, cloudFile.Container, cloudFile.Name, fullFileName, String.Empty, "application/directory");
935
                    }
936
                    else
937
                    {
938

    
939
                        var cloudHash = info.Hash.ToLower();
940

    
941
                        var hash = action.LocalHash.Value;
942
                        var topHash = action.TopHash.Value;
943

    
944
                        //If the file hashes match, abort the upload
945
                        if (hash == cloudHash || topHash == cloudHash)
946
                        {
947
                            //but store any metadata changes 
948
                            StatusKeeper.StoreInfo(fullFileName, info);
949
                            Log.InfoFormat("Skip upload of {0}, hashes match", fullFileName);
950
                            return;
951
                        }
952

    
953

    
954
                        //Mark the file as modified while we upload it
955
                        StatusKeeper.SetFileOverlayStatus(fullFileName, FileOverlayStatus.Modified);
956
                        //And then upload it
957

    
958
                        //Upload even small files using the Hashmap. The server may already contain
959
                        //the relevant block
960

    
961
                        //First, calculate the tree hash
962
                        var treeHash = await Signature.CalculateTreeHashAsync(fullFileName, accountInfo.BlockSize,
963
                                                                              accountInfo.BlockHash);
964

    
965
                        await UploadWithHashMap(accountInfo, cloudFile, fileInfo as FileInfo, cloudFile.Name, treeHash);
966
                    }
967
                    //If everything succeeds, change the file and overlay status to normal
968
                    StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal);
969
                }
970
                //Notify the Shell to update the overlays
971
                NativeMethods.RaiseChangeNotification(fullFileName);
972
                StatusNotification.NotifyChangedFile(fullFileName);
973
            }
974
            catch (AggregateException ex)
975
            {
976
                var exc = ex.InnerException as WebException;
977
                if (exc == null)
978
                    throw ex.InnerException;
979
                if (HandleUploadWebException(action, exc)) 
980
                    return;
981
                throw;
982
            }
983
            catch (WebException ex)
984
            {
985
                if (HandleUploadWebException(action, ex))
986
                    return;
987
                throw;
988
            }
989
            catch (Exception ex)
990
            {
991
                Log.Error("Unexpected error while uploading file", ex);
992
                throw;
993
            }
994

    
995
        }
996

    
997
        private bool HandleUploadWebException(CloudAction action, WebException exc)
998
        {
999
            var response = exc.Response as HttpWebResponse;
1000
            if (response == null)
1001
                throw exc;
1002
            if (response.StatusCode == HttpStatusCode.Unauthorized)
1003
            {
1004
                Log.Error("Not allowed to upload file", exc);
1005
                var message = String.Format("Not allowed to uplad file {0}", action.LocalFile.FullName);
1006
                StatusKeeper.SetFileState(action.LocalFile.FullName, FileStatus.Unchanged, FileOverlayStatus.Normal);
1007
                StatusNotification.NotifyChange(message, TraceLevel.Warning);
1008
                return true;
1009
            }
1010
            return false;
1011
        }
1012

    
1013
        public async Task UploadWithHashMap(AccountInfo accountInfo,ObjectInfo cloudFile,FileInfo fileInfo,string url,TreeHash treeHash)
1014
        {
1015
            if (accountInfo == null)
1016
                throw new ArgumentNullException("accountInfo");
1017
            if (cloudFile==null)
1018
                throw new ArgumentNullException("cloudFile");
1019
            if (fileInfo == null)
1020
                throw new ArgumentNullException("fileInfo");
1021
            if (String.IsNullOrWhiteSpace(url))
1022
                throw new ArgumentNullException(url);
1023
            if (treeHash==null)
1024
                throw new ArgumentNullException("treeHash");
1025
            if (String.IsNullOrWhiteSpace(cloudFile.Container) )
1026
                throw new ArgumentException("Invalid container","cloudFile");
1027
            Contract.EndContractBlock();
1028

    
1029
            var fullFileName = fileInfo.GetProperCapitalization();
1030

    
1031
            var account = cloudFile.Account ?? accountInfo.UserName;
1032
            var container = cloudFile.Container ;
1033

    
1034
            var client = new CloudFilesClient(accountInfo);
1035
            //Send the hashmap to the server            
1036
            var missingHashes =  await client.PutHashMap(account, container, url, treeHash);
1037
            //If the server returns no missing hashes, we are done
1038
            while (missingHashes.Count > 0)
1039
            {
1040

    
1041
                var buffer = new byte[accountInfo.BlockSize];
1042
                foreach (var missingHash in missingHashes)
1043
                {
1044
                    //Find the proper block
1045
                    var blockIndex = treeHash.HashDictionary[missingHash];
1046
                    var offset = blockIndex*accountInfo.BlockSize;
1047

    
1048
                    var read = fileInfo.Read(buffer, offset, accountInfo.BlockSize);
1049

    
1050
                    try
1051
                    {
1052
                        //And upload the block                
1053
                        await client.PostBlock(account, container, buffer, 0, read);
1054
                        Log.InfoFormat("[BLOCK] Block {0} of {1} uploaded", blockIndex, fullFileName);
1055
                    }
1056
                    catch (Exception exc)
1057
                    {
1058
                        Log.ErrorFormat("[ERROR] uploading block {0} of {1}\n{2}", blockIndex, fullFileName, exc);
1059
                    }
1060

    
1061
                }
1062

    
1063
                //Repeat until there are no more missing hashes                
1064
                missingHashes = await client.PutHashMap(account, container, url, treeHash);
1065
            }
1066
        }
1067

    
1068

    
1069
        public void AddAccount(AccountInfo accountInfo)
1070
        {            
1071
            if (!_accounts.Contains(accountInfo))
1072
                _accounts.Add(accountInfo);
1073
        }
1074
    }
1075

    
1076
   
1077

    
1078

    
1079
}