Revision 3742088d

b/trunk/Pithos.Core/Agents/DeleteAgent.cs
1
// -----------------------------------------------------------------------
2
// <copyright file="DeleteAgent.cs" company="GRNET">
3
// Copyright 2011-2012 GRNET S.A. All rights reserved.
4
// 
5
// Redistribution and use in source and binary forms, with or
6
// without modification, are permitted provided that the following
7
// conditions are met:
8
// 
9
//   1. Redistributions of source code must retain the above
10
//      copyright notice, this list of conditions and the following
11
//      disclaimer.
12
// 
13
//   2. Redistributions in binary form must reproduce the above
14
//      copyright notice, this list of conditions and the following
15
//      disclaimer in the documentation and/or other materials
16
//      provided with the distribution.
17
// 
18
// THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19
// OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25
// USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26
// AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27
// LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28
// ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29
// POSSIBILITY OF SUCH DAMAGE.
30
// 
31
// The views and conclusions contained in the software and
32
// documentation are those of the authors and should not be
33
// interpreted as representing official policies, either expressed
34
// or implied, of GRNET S.A.
35
// </copyright>
36
// -----------------------------------------------------------------------
37

  
38
using System.Collections.Concurrent;
39
using System.ComponentModel.Composition;
40
using System.Diagnostics.Contracts;
41
using System.IO;
42
using System.Net;
43
using System.Threading.Tasks.Dataflow;
44
using Pithos.Interfaces;
45
using Pithos.Network;
46
using System;
47
using log4net;
48

  
49
namespace Pithos.Core.Agents
50
{
51

  
52
    /// <summary>
53
    /// The Delete Agent is used to delete files from the Pithos server with high priority, 
54
    /// blocking the network agent through the PauseEvent until all pending deletions complete
55
    /// </summary>    
56
    [Export]
57
    public class DeleteAgent
58
    {
59

  
60
        [Import]
61
        public IStatusKeeper StatusKeeper { get; set; }
62

  
63
        private static readonly ILog Log = LogManager.GetLogger("DeleteAgent");
64

  
65
        //A separate agent is used to execute delete actions immediatelly;
66
        private readonly ActionBlock<CloudDeleteAction> _deleteAgent;
67

  
68
        //The Pause event stops the network agent to give priority to the deletion agent
69
        //Initially the event is signalled because we don't need to pause
70
        private readonly AsyncManualResetEvent _pauseEvent = new AsyncManualResetEvent(true);
71

  
72
        public AsyncManualResetEvent PauseEvent
73
        {
74
            get { return _pauseEvent; }
75
        }
76

  
77
        //Deleted file names are stored in memory so we can check that a file has already been deleted.
78
        //and avoid sending duplicate delete commands
79
        readonly ConcurrentDictionary<string, DateTime> _deletedFiles = new ConcurrentDictionary<string, DateTime>();
80

  
81
        public DeleteAgent()
82
        {
83
            _deleteAgent = new ActionBlock<CloudDeleteAction>(message => ProcessDelete(message), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
84

  
85
        }
86

  
87
        /// <summary>
88
        /// Processes cloud delete actions
89
        /// </summary>
90
        /// <param name="action">The delete action to execute</param>
91
        /// <returns></returns>
92
        /// <remarks>
93
        /// When a file/folder is deleted locally, we must delete it ASAP from the server and block any download
94
        /// operations that may be in progress.
95
        /// <para>
96
        /// A separate agent is used to process deletes because the main agent may be busy with a long operation.
97
        /// </para>
98
        /// </remarks>
99
        private void ProcessDelete(CloudDeleteAction action)
100
        {
101
            if (action == null)
102
                throw new ArgumentNullException("action");
103
            if (action.AccountInfo == null)
104
                throw new ArgumentException("The action.AccountInfo is empty", "action");
105
            Contract.EndContractBlock();
106

  
107
            var accountInfo = action.AccountInfo;
108

  
109
            using (log4net.ThreadContext.Stacks["NETWORK"].Push("PROCESS"))
110
            {
111
                Log.InfoFormat("[ACTION] Start Processing {0}", action);
112

  
113
                var cloudFile = action.CloudFile;
114

  
115
                try
116
                {
117
                    //Acquire a lock on the deleted file to prevent uploading/downloading operations from the normal
118
                    //agent
119
                    using (NetworkGate.Acquire(action.LocalFile.FullName, NetworkOperation.Deleting))
120
                    {
121

  
122
                        //Add the file URL to the deleted files list
123
                        var key = GetFileKey(action.CloudFile);
124
                        _deletedFiles[key] = DateTime.Now;
125

  
126
                        _pauseEvent.Reset();
127
                        // and then delete the file from the server
128
                        DeleteCloudFile(accountInfo, cloudFile);
129

  
130
                        Log.InfoFormat("[ACTION] End Delete {0}:{1}->{2}", action.Action, action.LocalFile,
131
                                       action.CloudFile.Name);
132
                    }
133
                }
134
                catch (WebException exc)
135
                {
136
                    Log.ErrorFormat("[WEB ERROR] {0} : {1} -> {2} due to exception\r\n{3}", action.Action, action.LocalFile, action.CloudFile, exc);
137
                }
138
                catch (OperationCanceledException)
139
                {
140
                    throw;
141
                }
142
                catch (DirectoryNotFoundException)
143
                {
144
                    Log.ErrorFormat("{0} : {1} -> {2}  failed because the directory was not found.\n Rescheduling a delete",
145
                        action.Action, action.LocalFile, action.CloudFile);
146
                    //Repost a delete action for the missing file
147
                    _deleteAgent.Post(action);
148
                }
149
                catch (FileNotFoundException)
150
                {
151
                    Log.ErrorFormat("{0} : {1} -> {2}  failed because the file was not found.\n Rescheduling a delete",
152
                        action.Action, action.LocalFile, action.CloudFile);
153
                    //Post a delete action for the missing file
154
                    _deleteAgent.Post(action);
155
                }
156
                catch (Exception exc)
157
                {
158
                    Log.ErrorFormat("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
159
                                     action.Action, action.LocalFile, action.CloudFile, exc);
160

  
161
                    _deleteAgent.Post(action);
162
                }
163
                finally
164
                {
165
                    //Set the event when all delete actions are processed
166
                    if (_deleteAgent.InputCount == 0)
167
                        _pauseEvent.Set();
168

  
169
                }
170
            }
171
        }
172

  
173
        //Returns true if an action concerns a file that was deleted
174
        public bool IsDeletedFile(CloudAction action)
175
        {
176
            //Doesn't work for actions targeting shared files
177
            if (action.IsShared)
178
                return false;
179
            var key = GetFileKey(action.CloudFile);
180
            DateTime entryDate;
181
            if (_deletedFiles.TryGetValue(key, out entryDate))
182
            {
183
                //If the delete entry was created after this action, abort the action
184
                if (entryDate > action.Created)
185
                    return true;
186
                //Otherwise, remove the stale entry 
187
                _deletedFiles.TryRemove(key, out entryDate);
188
            }
189
            return false;
190
        }
191

  
192
        public void Post(CloudDeleteAction action)
193
        {
194
            _deleteAgent.Post(action);
195
        }
196

  
197
        private void DeleteCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile)
198
        {
199
            if (accountInfo == null)
200
                throw new ArgumentNullException("accountInfo");
201
            if (cloudFile == null)
202
                throw new ArgumentNullException("cloudFile");
203

  
204
            if (String.IsNullOrWhiteSpace(cloudFile.Container))
205
                throw new ArgumentException("Invalid container", "cloudFile");
206
            Contract.EndContractBlock();
207

  
208
            var fileAgent = GetFileAgent(accountInfo);
209

  
210
            using (ThreadContext.Stacks["DeleteCloudFile"].Push("Delete"))
211
            {
212
                var fileName = cloudFile.RelativeUrlToFilePath(accountInfo.UserName);
213
                var info = fileAgent.GetFileSystemInfo(fileName);
214
                var fullPath = info.FullName.ToLower();
215

  
216
                StatusKeeper.SetFileOverlayStatus(fullPath, FileOverlayStatus.Modified);
217

  
218
                var account = cloudFile.Account ?? accountInfo.UserName;
219
                var container = cloudFile.Container;//?? FolderConstants.PithosContainer;
220

  
221
                var client = new CloudFilesClient(accountInfo);
222
                client.DeleteObject(account, container, cloudFile.Name);
223

  
224
                StatusKeeper.ClearFileStatus(fullPath);
225
            }
226
        }
227

  
228

  
229
        private static string GetFileKey(ObjectInfo info)
230
        {
231
            var key = String.Format("{0}/{1}/{2}", info.Account, info.Container, info.Name);
232
            return key;
233
        }
234

  
235
        private static FileAgent GetFileAgent(AccountInfo accountInfo)
236
        {
237
            return AgentLocator<FileAgent>.Get(accountInfo.AccountPath);
238
        }
239

  
240
    }
241
}
b/trunk/Pithos.Core/Agents/NetworkAgent.cs
60 60
    {
61 61
        private Agent<CloudAction> _agent;
62 62

  
63
        //A separate agent is used to execute delete actions immediatelly;
64
        private ActionBlock<CloudDeleteAction> _deleteAgent;
65

  
66
        //TODO: CHECK
67
        readonly ConcurrentDictionary<string,DateTime> _deletedFiles=new ConcurrentDictionary<string, DateTime>();
68

  
69

  
63
        [System.ComponentModel.Composition.Import]
64
        private DeleteAgent _deleteAgent=new DeleteAgent();
70 65

  
71 66
        [System.ComponentModel.Composition.Import]
72 67
        public IStatusKeeper StatusKeeper { get; set; }
......
84 79
        
85 80
        //The Sync Event signals a manual synchronisation
86 81
        private readonly AsyncManualResetEvent _syncEvent=new AsyncManualResetEvent();
87
        //The Pause event stops the network agent to give priority to the deletion agent
88
        //Initially the event is signalled because we don't need to pause
89
        private readonly AsyncManualResetEvent _pauseEvent = new AsyncManualResetEvent(true);
90 82

  
91 83
        private ConcurrentDictionary<string, DateTime> _lastSeen = new ConcurrentDictionary<string, DateTime>();
92 84

  
......
98 90
                Action loop = null;
99 91
                loop = () =>
100 92
                {
101
                    _pauseEvent.Wait();
93
                    _deleteAgent.PauseEvent.Wait();
102 94
                    var message = inbox.Receive();
103 95
                    var process=message.Then(Process,inbox.CancellationToken);
104 96
                    inbox.LoopAsync(process, loop);
......
106 98
                loop();
107 99
            });
108 100

  
109
            _deleteAgent = new ActionBlock<CloudDeleteAction>(message =>ProcessDelete(message),new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism=4});
110 101
        }
111 102

  
112 103
        private async Task Process(CloudAction action)
......
134 125
                        //Redirect deletes to the delete agent 
135 126
                        _deleteAgent.Post((CloudDeleteAction)action);
136 127
                    }
137
                    if (IsDeletedFile(action))
128
                    if (_deleteAgent.IsDeletedFile(action))
138 129
                    {
139 130
                        //Clear the status of already deleted files to avoid reprocessing
140 131
                        if (action.LocalFile != null)
......
212 203
            StatusNotification.Notify(new Notification());
213 204
        }
214 205

  
215
        /// <summary>
216
        /// Processes cloud delete actions
217
        /// </summary>
218
        /// <param name="action">The delete action to execute</param>
219
        /// <returns></returns>
220
        /// <remarks>
221
        /// When a file/folder is deleted locally, we must delete it ASAP from the server and block any download
222
        /// operations that may be in progress.
223
        /// <para>
224
        /// A separate agent is used to process deletes because the main agent may be busy with a long operation.
225
        /// </para>
226
        /// </remarks>
227
        private void ProcessDelete(CloudDeleteAction action)
228
        {
229
            if (action == null)
230
                throw new ArgumentNullException("action");
231
            if (action.AccountInfo==null)
232
                throw new ArgumentException("The action.AccountInfo is empty","action");
233
            Contract.EndContractBlock();
234

  
235
            var accountInfo = action.AccountInfo;
236

  
237
            using (log4net.ThreadContext.Stacks["NETWORK"].Push("PROCESS"))
238
            {                
239
                Log.InfoFormat("[ACTION] Start Processing {0}", action);
240

  
241
                var cloudFile = action.CloudFile;
242

  
243
                try
244
                {
245
                    //Acquire a lock on the deleted file to prevent uploading/downloading operations from the normal
246
                    //agent
247
                    using (var gate = NetworkGate.Acquire(action.LocalFile.FullName, NetworkOperation.Deleting))
248
                    {
249

  
250
                        //Add the file URL to the deleted files list
251
                        var key = GetFileKey(action.CloudFile);
252
                        _deletedFiles[key] = DateTime.Now;
253

  
254
                        _pauseEvent.Reset();
255
                        // and then delete the file from the server
256
                                DeleteCloudFile(accountInfo, cloudFile);
257

  
258
                        Log.InfoFormat("[ACTION] End Delete {0}:{1}->{2}", action.Action, action.LocalFile,
259
                                       action.CloudFile.Name);
260
                    }
261
                }
262
                catch (WebException exc)
263
                {
264
                    Log.ErrorFormat("[WEB ERROR] {0} : {1} -> {2} due to exception\r\n{3}", action.Action, action.LocalFile, action.CloudFile, exc);
265
                }
266
                catch (OperationCanceledException)
267
                {
268
                    throw;
269
                }
270
                catch (DirectoryNotFoundException)
271
                {
272
                    Log.ErrorFormat("{0} : {1} -> {2}  failed because the directory was not found.\n Rescheduling a delete",
273
                        action.Action, action.LocalFile, action.CloudFile);
274
                    //Repost a delete action for the missing file
275
                    _deleteAgent.Post(action);
276
                }
277
                catch (FileNotFoundException)
278
                {
279
                    Log.ErrorFormat("{0} : {1} -> {2}  failed because the file was not found.\n Rescheduling a delete",
280
                        action.Action, action.LocalFile, action.CloudFile);
281
                    //Post a delete action for the missing file
282
                    _deleteAgent.Post(action);
283
                }
284
                catch (Exception exc)
285
                {
286
                    Log.ErrorFormat("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
287
                                     action.Action, action.LocalFile, action.CloudFile, exc);
288

  
289
                    _deleteAgent.Post(action);
290
                }
291
                finally
292
                {
293
                    //Set the event when all delete actions are processed
294
                    if (_deleteAgent.InputCount == 0)
295
                        _pauseEvent.Set();
296

  
297
                }
298
            }
299
        }
300

  
301
        private static string GetFileKey(ObjectInfo info)
302
        {
303
            var key = String.Format("{0}/{1}/{2}", info.Account, info.Container,info.Name);
304
            return key;
305
        }
306

  
206
        
307 207
        private async Task SyncFiles(AccountInfo accountInfo,CloudAction action)
308 208
        {
309 209
            if (accountInfo == null)
......
398 298
                throw new ArgumentException("The CloudAction.AccountInfo is empty","cloudAction");
399 299
            Contract.EndContractBlock();
400 300

  
401
            _pauseEvent.Wait();
301
            _deleteAgent.PauseEvent.Wait();
402 302

  
403 303
            //If the action targets a local file, add a treehash calculation
404 304
            if (!(cloudAction is CloudDeleteAction) && cloudAction.LocalFile as FileInfo != null)
......
505 405

  
506 406
            using (log4net.ThreadContext.Stacks["Retrieve Remote"].Push(accountInfo.UserName))
507 407
            {
508
                await _pauseEvent.WaitAsync();
408
                await _deleteAgent.PauseEvent.WaitAsync();
509 409

  
510 410
                Log.Info("Scheduled");
511 411
                var client = new CloudFilesClient(accountInfo)
......
520 420

  
521 421
                try
522 422
                {
523
                    await _pauseEvent.WaitAsync();
423
                    await _deleteAgent.PauseEvent.WaitAsync();
524 424
                    //Get the poll time now. We may miss some deletions but it's better to keep a file that was deleted
525 425
                    //than delete a file that was created while we were executing the poll                    
526 426
                    var pollTime = DateTime.Now;
......
824 724
            NativeMethods.RaiseChangeNotification(newFilePath);
825 725
        }
826 726

  
827
        private void DeleteCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile)
828
        {
829
            if (accountInfo == null)
830
                throw new ArgumentNullException("accountInfo");
831
            if (cloudFile==null)
832
                throw new ArgumentNullException("cloudFile");
833

  
834
            if (String.IsNullOrWhiteSpace(cloudFile.Container))
835
                throw new ArgumentException("Invalid container", "cloudFile");
836
            Contract.EndContractBlock();
837
            
838
            var fileAgent = GetFileAgent(accountInfo);
839

  
840
            using ( log4net.ThreadContext.Stacks["DeleteCloudFile"].Push("Delete"))
841
            {
842
                var fileName= cloudFile.RelativeUrlToFilePath(accountInfo.UserName);
843
                var info = fileAgent.GetFileSystemInfo(fileName);                
844
                var fullPath = info.FullName.ToLower();
845

  
846
                StatusKeeper.SetFileOverlayStatus(fullPath, FileOverlayStatus.Modified);
847

  
848
                var account = cloudFile.Account ?? accountInfo.UserName;
849
                var container = cloudFile.Container ;//?? FolderConstants.PithosContainer;
850

  
851
                var client = new CloudFilesClient(accountInfo);
852
                client.DeleteObject(account, container, cloudFile.Name);
853

  
854
                StatusKeeper.ClearFileStatus(fullPath);
855
            }
856
        }
857

  
858 727
        //Download a file.
859 728
        private async Task DownloadCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile , string filePath)
860 729
        {
......
1181 1050

  
1182 1051
        }
1183 1052

  
1184
        //Returns true if an action concerns a file that was deleted
1185
        private bool IsDeletedFile(CloudAction action)
1186
        {
1187
            //Doesn't work for actions targeting shared files
1188
            if (action.IsShared)
1189
                return false;
1190
            var key = GetFileKey(action.CloudFile);
1191
            DateTime entryDate;
1192
            if (_deletedFiles.TryGetValue(key, out entryDate))
1193
            {
1194
                //If the delete entry was created after this action, abort the action
1195
                if (entryDate > action.Created)
1196
                    return true;
1197
                //Otherwise, remove the stale entry 
1198
                _deletedFiles.TryRemove(key, out entryDate);
1199
            }
1200
            return false;
1201
        }
1053

  
1202 1054

  
1203 1055
        private bool HandleUploadWebException(CloudAction action, WebException exc)
1204 1056
        {
b/trunk/Pithos.Core/Pithos.Core.csproj
381 381
    <Compile Include="Agents\BlockUpdater.cs" />
382 382
    <Compile Include="Agents\CloudTransferAction.cs" />
383 383
    <Compile Include="Agents\CollectionExtensions.cs" />
384
    <Compile Include="Agents\DeleteAgent.cs" />
384 385
    <Compile Include="Agents\FileAgent.cs" />
385 386
    <Compile Include="Agents\BlockExtensions.cs" />
386 387
    <Compile Include="Agents\FileSystemWatcherAdapter.cs" />

Also available in: Unified diff