Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / NetworkAgent.cs @ 268bec7f

History | View | Annotate | Download (21 kB)

1
#region
2
/* -----------------------------------------------------------------------
3
 * <copyright file="NetworkAgent.cs" company="GRNet">
4
 * 
5
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
6
 *
7
 * Redistribution and use in source and binary forms, with or
8
 * without modification, are permitted provided that the following
9
 * conditions are met:
10
 *
11
 *   1. Redistributions of source code must retain the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer.
14
 *
15
 *   2. Redistributions in binary form must reproduce the above
16
 *      copyright notice, this list of conditions and the following
17
 *      disclaimer in the documentation and/or other materials
18
 *      provided with the distribution.
19
 *
20
 *
21
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
22
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
25
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
28
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32
 * POSSIBILITY OF SUCH DAMAGE.
33
 *
34
 * The views and conclusions contained in the software and
35
 * documentation are those of the authors and should not be
36
 * interpreted as representing official policies, either expressed
37
 * or implied, of GRNET S.A.
38
 * </copyright>
39
 * -----------------------------------------------------------------------
40
 */
41
#endregion
42

    
43
using System;
44
using System.Collections.Generic;
45
using System.ComponentModel.Composition;
46
using System.Diagnostics;
47
using System.Diagnostics.Contracts;
48
using System.IO;
49
using System.Net;
50
using System.Reflection;
51
using System.Threading;
52
using System.Threading.Tasks;
53
using Castle.ActiveRecord;
54
using Pithos.Interfaces;
55
using Pithos.Network;
56
using log4net;
57

    
58
namespace Pithos.Core.Agents
59
{
60
    [Export]
61
    public class NetworkAgent
62
    {
63
        private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
64

    
65
        private Agent<CloudAction> _agent;
66

    
67
        [System.ComponentModel.Composition.Import]
68
        private DeleteAgent DeleteAgent { get; set; }
69

    
70
        [System.ComponentModel.Composition.Import]
71
        public IStatusKeeper StatusKeeper { get; set; }
72

    
73
        private IStatusNotification _statusNotification;
74
        public IStatusNotification StatusNotification
75
        {
76
            get { return _statusNotification; }
77
            set
78
            {
79
                _statusNotification = value;
80
                DeleteAgent.StatusNotification = value;
81
                Uploader.StatusNotification = value;
82
                Downloader.StatusNotification = value;
83
            }
84
        }
85

    
86

    
87
        [System.ComponentModel.Composition.Import]
88
        public IPithosSettings Settings { get; set; }
89

    
90
        [System.ComponentModel.Composition.Import]
91
        public Uploader Uploader { get; set; }
92

    
93
        [System.ComponentModel.Composition.Import]
94
        public Downloader Downloader { get; set; }
95

    
96
        //The Proceed signals the poll agent that it can proceed with polling. 
97
        //Essentially it stops the poll agent to give priority to the network agent
98
        //Initially the event is signalled because we don't need to pause
99
        private readonly AsyncManualResetEvent _proceedEvent = new AsyncManualResetEvent(true);
100

    
101
        public AsyncManualResetEvent ProceedEvent
102
        {
103
            get { return _proceedEvent; }
104
        }
105

    
106

    
107
        public void Start()
108
        {
109
            if (_agent != null)
110
                return;
111

    
112
            if (Log.IsDebugEnabled)
113
                Log.Debug("Starting Network Agent");
114

    
115
            _agent = Agent<CloudAction>.Start(inbox =>
116
            {
117
                Action loop = null;
118
                loop = () =>
119
                {
120
                    DeleteAgent.ProceedEvent.Wait();
121
                    var message = inbox.Receive();
122
                    var process=message.Then(Process,inbox.CancellationToken);
123
                    inbox.LoopAsync(process, loop);
124
                };
125
                loop();
126
            });
127

    
128
        }
129

    
130
        private async Task Process(CloudAction action)
131
        {
132
            if (action == null)
133
                throw new ArgumentNullException("action");
134
            if (action.AccountInfo==null)
135
                throw new ArgumentException("The action.AccountInfo is empty","action");
136
            Contract.EndContractBlock();
137

    
138

    
139

    
140

    
141
            using (ThreadContext.Stacks["Operation"].Push(action.ToString()))
142
            {                
143

    
144
                var cloudFile = action.CloudFile;
145
                var downloadPath = action.GetDownloadPath();
146

    
147
                try
148
                {
149
                    StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing,"Processing");
150
                    _proceedEvent.Reset();
151
                    
152
                    var accountInfo = action.AccountInfo;
153

    
154
                    if (action.Action == CloudActionType.DeleteCloud)
155
                    {                        
156
                        //Redirect deletes to the delete agent 
157
                        DeleteAgent.Post((CloudDeleteAction)action);
158
                    }
159
                    if (DeleteAgent.IsDeletedFile(action))
160
                    {
161
                        //Clear the status of already deleted files to avoid reprocessing
162
                        if (action.LocalFile != null)
163
                            StatusKeeper.ClearFileStatus(action.LocalFile.FullName);
164
                    }
165
                    else
166
                    {
167
                        switch (action.Action)
168
                        {
169
                            case CloudActionType.UploadUnconditional:
170
                                //Abort if the file was deleted before we reached this point
171
                                await Uploader.UploadCloudFile(action);
172
                                break;
173
                            case CloudActionType.DownloadUnconditional:
174
                                await Downloader.DownloadCloudFile(accountInfo, cloudFile, downloadPath);
175
                                break;
176
                            case CloudActionType.RenameCloud:
177
                                var moveAction = (CloudMoveAction)action;
178
                                RenameCloudFile(accountInfo, moveAction);
179
                                break;
180
                            case CloudActionType.RenameLocal:
181
                                RenameLocalFile(accountInfo, action);
182
                                break;
183
                            case CloudActionType.MustSynch:
184
                                if (!File.Exists(downloadPath) && !Directory.Exists(downloadPath))
185
                                {
186
                                    await Downloader.DownloadCloudFile(accountInfo, cloudFile, downloadPath);
187
                                }
188
                                else
189
                                {
190
                                    await SyncFiles(accountInfo, action);
191
                                }
192
                                break;
193
                        }
194
                    }
195
                    Log.InfoFormat("End Processing {0}:{1}->{2}", action.Action, action.LocalFile,
196
                                           action.CloudFile.Name);
197
                }
198
/*
199
                catch (WebException exc)
200
                {                    
201
                    Log.ErrorFormat("[WEB ERROR] {0} : {1} -> {2} due to exception\r\n{3}", action.Action, action.LocalFile, action.CloudFile, exc);
202
                    
203
                    
204
                    //Actions that resulted in server errors should be retried                    
205
                    var response = exc.Response as HttpWebResponse;
206
                    if (response != null && response.StatusCode >= HttpStatusCode.InternalServerError)
207
                    {
208
                        _agent.Post(action);
209
                        Log.WarnFormat("[REQUEUE] {0} : {1} -> {2}", action.Action, action.LocalFile, action.CloudFile);
210
                    }
211
                }
212
*/
213
                catch (OperationCanceledException)
214
                {
215
                    throw;
216
                }
217
                catch (DirectoryNotFoundException)
218
                {
219
                    Log.ErrorFormat("{0} : {1} -> {2}  failed because the directory was not found.\n Rescheduling a delete",
220
                        action.Action, action.LocalFile, action.CloudFile);
221
                    //Post a delete action for the missing file
222
                    Post(new CloudDeleteAction(action));
223
                }
224
                catch (FileNotFoundException)
225
                {
226
                    Log.ErrorFormat("{0} : {1} -> {2}  failed because the file was not found.\n Rescheduling a delete",
227
                        action.Action, action.LocalFile, action.CloudFile);
228
                    //Post a delete action for the missing file
229
                    Post(new CloudDeleteAction(action));
230
                }
231
                catch (Exception exc)
232
                {
233
                    Log.ErrorFormat("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
234
                                     action.Action, action.LocalFile, action.CloudFile, exc);
235

    
236
                    _agent.Post(action);
237
                }
238
                finally
239
                {
240
                    if (_agent.IsEmpty)
241
                        _proceedEvent.Set();
242
                    UpdateStatus(PithosStatus.LocalComplete);                                        
243
                }
244
            }
245
        }
246

    
247

    
248
        private void UpdateStatus(PithosStatus status)
249
        {
250
            StatusNotification.SetPithosStatus(status);
251
            //StatusNotification.Notify(new Notification());
252
        }
253

    
254
        private void RenameLocalFile(AccountInfo accountInfo, CloudAction action)
255
        {
256
            if (accountInfo == null)
257
                throw new ArgumentNullException("accountInfo");
258
            if (action == null)
259
                throw new ArgumentNullException("action");
260
            if (action.LocalFile == null)
261
                throw new ArgumentException("The action's local file is not specified", "action");
262
            if (!Path.IsPathRooted(action.LocalFile.FullName))
263
                throw new ArgumentException("The action's local file path must be absolute", "action");
264
            if (action.CloudFile == null)
265
                throw new ArgumentException("The action's cloud file is not specified", "action");
266
            Contract.EndContractBlock();
267
            using (ThreadContext.Stacks["Operation"].Push("RenameLocalFile"))
268
            {
269

    
270
                //We assume that the local file already exists, otherwise the poll agent
271
                //would have issued a download request
272

    
273
                var currentInfo = action.CloudFile;
274
                var previousInfo = action.CloudFile.Previous;
275
                var fileAgent = FileAgent.GetFileAgent(accountInfo);
276

    
277
                var previousRelativepath = previousInfo.RelativeUrlToFilePath(accountInfo.UserName);
278
                var previousFile = fileAgent.GetFileSystemInfo(previousRelativepath);
279

    
280
                //In every case we need to move the local file first
281
                MoveLocalFile(accountInfo, previousFile, fileAgent, currentInfo);
282
            }
283
        }
284

    
285
        private void MoveLocalFile(AccountInfo accountInfo, FileSystemInfo previousFile, FileAgent fileAgent,
286
                                   ObjectInfo currentInfo)
287
        {
288
            var currentRelativepath = currentInfo.RelativeUrlToFilePath(accountInfo.UserName);
289
            var newPath = Path.Combine(fileAgent.RootPath, currentRelativepath);
290

    
291
            var isFile= (previousFile is FileInfo);
292
            var previousFullPath = isFile? 
293
                FileInfoExtensions.GetProperFilePathCapitalization(previousFile.FullName):
294
                FileInfoExtensions.GetProperDirectoryCapitalization(previousFile.FullName);                
295
            
296
            using (NetworkGate.Acquire(previousFullPath, NetworkOperation.Renaming))
297
            using (NetworkGate.Acquire(newPath,NetworkOperation.Renaming)) 
298
            using (new SessionScope(FlushAction.Auto))
299
            {
300
                if (isFile)
301
                    (previousFile as FileInfo).MoveTo(newPath);
302
                else
303
                {
304
                    (previousFile as DirectoryInfo).MoveTo(newPath);
305
                }
306
                var state = StatusKeeper.GetStateByFilePath(previousFullPath);
307
                state.FilePath = newPath;
308
                state.SaveCopy();
309
                StatusKeeper.SetFileState(previousFullPath,FileStatus.Deleted,FileOverlayStatus.Deleted, "Deleted");
310
            }            
311
        }
312

    
313
        private async Task SyncFiles(AccountInfo accountInfo,CloudAction action)
314
        {
315
            if (accountInfo == null)
316
                throw new ArgumentNullException("accountInfo");
317
            if (action==null)
318
                throw new ArgumentNullException("action");
319
            if (action.LocalFile==null)
320
                throw new ArgumentException("The action's local file is not specified","action");
321
            if (!Path.IsPathRooted(action.LocalFile.FullName))
322
                throw new ArgumentException("The action's local file path must be absolute","action");
323
            if (action.CloudFile== null)
324
                throw new ArgumentException("The action's cloud file is not specified", "action");
325
            Contract.EndContractBlock();
326
            using (ThreadContext.Stacks["Operation"].Push("SyncFiles"))
327
            {
328

    
329
                //var localFile = action.LocalFile;
330
                var cloudFile = action.CloudFile;
331
                var downloadPath = action.LocalFile.GetProperCapitalization();
332

    
333
                var cloudHash = cloudFile.Hash.ToLower();
334
                var previousCloudHash = cloudFile.PreviousHash == null?null: cloudFile.PreviousHash.ToLower();
335
                var localHash = action.TreeHash.Value.TopHash.ToHashString();// LocalHash.Value.ToLower();
336
                //var topHash = action.TopHash.Value.ToLower();
337

    
338
                if(cloudFile.IsDirectory && action.LocalFile is DirectoryInfo)
339
                {
340
                    Log.InfoFormat("Skipping folder {0} , exists in server", downloadPath);
341
                    return;
342
                }
343

    
344
                //At this point we know that an object has changed on the server and that a local
345
                //file already exists. We need to decide whether the file has only changed on 
346
                //the server or there is a conflicting change on the client.
347
                //
348

    
349
                //If the hashes match, we are done
350
                if (cloudFile != ObjectInfo.Empty && cloudHash == localHash)
351
                {
352
                    Log.InfoFormat("Skipping {0}, hashes match", downloadPath);
353
                    return;
354
                }
355

    
356
                //The hashes DON'T match. We need to sync
357

    
358
                // If the previous tophash matches the local tophash, the file was only changed on the server. 
359
                if (localHash == previousCloudHash)
360
                {
361
                    await Downloader.DownloadCloudFile(accountInfo, cloudFile, downloadPath);
362
                }
363
                else
364
                {
365
                    //If the previous and local hash don't match, there was a local conflict
366
                    //that was not uploaded to the server. We have a conflict
367
                    ReportConflictForMismatch(downloadPath);
368
                }
369
            }
370
        }
371

    
372
        private void ReportConflictForMismatch(string downloadPath)
373
        {
374
            if (String.IsNullOrWhiteSpace(downloadPath))
375
                throw new ArgumentNullException("downloadPath");
376
            Contract.EndContractBlock();
377

    
378
            StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict);
379
            UpdateStatus(PithosStatus.HasConflicts);
380
            var message = String.Format("Conflict detected for file {0}", downloadPath);
381
            Log.Warn(message);
382
            StatusNotification.NotifyChange(message, TraceLevel.Warning);
383
        }
384

    
385
        public void Post(CloudAction cloudAction)
386
        {
387
            if (cloudAction == null)
388
                throw new ArgumentNullException("cloudAction");
389
            if (cloudAction.AccountInfo==null)
390
                throw new ArgumentException("The CloudAction.AccountInfo is empty","cloudAction");
391
            Contract.EndContractBlock();
392

    
393
            DeleteAgent.ProceedEvent.Wait();
394
/*
395

    
396
            //If the action targets a local file, add a treehash calculation
397
            if (!(cloudAction is CloudDeleteAction) && cloudAction.LocalFile as FileInfo != null)
398
            {
399
                var accountInfo = cloudAction.AccountInfo;
400
                var localFile = (FileInfo) cloudAction.LocalFile;
401

    
402
                if (localFile.Length > accountInfo.BlockSize)
403
                    cloudAction.TopHash =
404
                        new Lazy<string>(() => Signature.CalculateTreeHashAsync(localFile,
405
                                                                                accountInfo.BlockSize,
406
                                                                                accountInfo.BlockHash, Settings.HashingParallelism).Result
407
                                                    .TopHash.ToHashString());
408
                else
409
                {
410
                    cloudAction.TopHash = new Lazy<string>(() => cloudAction.LocalHash.Value);
411
                }
412

    
413
            }
414
            else
415
            {
416
                //The hash for a directory is the empty string
417
                cloudAction.TopHash = new Lazy<string>(() => String.Empty);
418
            }
419
*/
420
            
421
            if (cloudAction is CloudDeleteAction)
422
                DeleteAgent.Post((CloudDeleteAction)cloudAction);
423
            else
424
                _agent.Post(cloudAction);
425
        }
426
       
427

    
428
        public IEnumerable<CloudAction> GetEnumerable()
429
        {
430
            return _agent.GetEnumerable();
431
        }
432

    
433
        public Task GetDeleteAwaiter()
434
        {
435
            return DeleteAgent.ProceedEvent.WaitAsync();
436
        }
437
        public CancellationToken CancellationToken
438
        {
439
            get { return _agent.CancellationToken; }
440
        }
441

    
442

    
443

    
444
        private void RenameCloudFile(AccountInfo accountInfo,CloudMoveAction action)
445
        {
446
            if (accountInfo==null)
447
                throw new ArgumentNullException("accountInfo");
448
            if (action==null)
449
                throw new ArgumentNullException("action");
450
            if (action.CloudFile==null)
451
                throw new ArgumentException("CloudFile","action");
452
            if (action.LocalFile==null)
453
                throw new ArgumentException("LocalFile","action");
454
            if (action.OldLocalFile==null)
455
                throw new ArgumentException("OldLocalFile","action");
456
            if (action.OldCloudFile==null)
457
                throw new ArgumentException("OldCloudFile","action");
458
            Contract.EndContractBlock();
459

    
460
            using (ThreadContext.Stacks["Operation"].Push("RenameCloudFile"))
461
            {
462

    
463
                var newFilePath = action.LocalFile.FullName;
464

    
465
                //How do we handle concurrent renames and deletes/uploads/downloads?
466
                //* A conflicting upload means that a file was renamed before it had a chance to finish uploading
467
                //  This should never happen as the network agent executes only one action at a time
468
                //* A conflicting download means that the file was modified on the cloud. While we can go on and complete
469
                //  the rename, there may be a problem if the file is downloaded in blocks, as subsequent block requests for the 
470
                //  same name will fail.
471
                //  This should never happen as the network agent executes only one action at a time.
472
                //* A conflicting delete can happen if the rename was followed by a delete action that didn't have the chance
473
                //  to remove the rename from the queue.
474
                //  We can probably ignore this case. It will result in an error which should be ignored            
475

    
476

    
477
                //The local file is already renamed
478
                StatusKeeper.SetFileOverlayStatus(newFilePath, FileOverlayStatus.Modified);
479

    
480

    
481
                var account = action.CloudFile.Account ?? accountInfo.UserName;
482
                var container = action.CloudFile.Container;
483

    
484
                var client = new CloudFilesClient(accountInfo);
485
                //TODO: What code is returned when the source file doesn't exist?
486
                client.MoveObject(account, container, action.OldCloudFile.Name, container, action.CloudFile.Name);
487

    
488
                StatusKeeper.SetFileStatus(newFilePath, FileStatus.Unchanged);
489
                StatusKeeper.SetFileOverlayStatus(newFilePath, FileOverlayStatus.Normal);
490
                NativeMethods.RaiseChangeNotification(newFilePath);
491
            }
492
        }
493

    
494

    
495

    
496
    }
497

    
498
   
499

    
500

    
501
}