Revision 3742088d trunk/Pithos.Core/Agents/NetworkAgent.cs
b/trunk/Pithos.Core/Agents/NetworkAgent.cs | ||
---|---|---|
60 | 60 |
{ |
61 | 61 |
private Agent<CloudAction> _agent; |
62 | 62 |
|
63 |
//A separate agent is used to execute delete actions immediatelly; |
|
64 |
private ActionBlock<CloudDeleteAction> _deleteAgent; |
|
65 |
|
|
66 |
//TODO: CHECK |
|
67 |
readonly ConcurrentDictionary<string,DateTime> _deletedFiles=new ConcurrentDictionary<string, DateTime>(); |
|
68 |
|
|
69 |
|
|
63 |
[System.ComponentModel.Composition.Import] |
|
64 |
private DeleteAgent _deleteAgent=new DeleteAgent(); |
|
70 | 65 |
|
71 | 66 |
[System.ComponentModel.Composition.Import] |
72 | 67 |
public IStatusKeeper StatusKeeper { get; set; } |
... | ... | |
84 | 79 |
|
85 | 80 |
//The Sync Event signals a manual synchronisation |
86 | 81 |
private readonly AsyncManualResetEvent _syncEvent=new AsyncManualResetEvent(); |
87 |
//The Pause event stops the network agent to give priority to the deletion agent |
|
88 |
//Initially the event is signalled because we don't need to pause |
|
89 |
private readonly AsyncManualResetEvent _pauseEvent = new AsyncManualResetEvent(true); |
|
90 | 82 |
|
91 | 83 |
private ConcurrentDictionary<string, DateTime> _lastSeen = new ConcurrentDictionary<string, DateTime>(); |
92 | 84 |
|
... | ... | |
98 | 90 |
Action loop = null; |
99 | 91 |
loop = () => |
100 | 92 |
{ |
101 |
_pauseEvent.Wait();
|
|
93 |
_deleteAgent.PauseEvent.Wait();
|
|
102 | 94 |
var message = inbox.Receive(); |
103 | 95 |
var process=message.Then(Process,inbox.CancellationToken); |
104 | 96 |
inbox.LoopAsync(process, loop); |
... | ... | |
106 | 98 |
loop(); |
107 | 99 |
}); |
108 | 100 |
|
109 |
_deleteAgent = new ActionBlock<CloudDeleteAction>(message =>ProcessDelete(message),new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism=4}); |
|
110 | 101 |
} |
111 | 102 |
|
112 | 103 |
private async Task Process(CloudAction action) |
... | ... | |
134 | 125 |
//Redirect deletes to the delete agent |
135 | 126 |
_deleteAgent.Post((CloudDeleteAction)action); |
136 | 127 |
} |
137 |
if (IsDeletedFile(action)) |
|
128 |
if (_deleteAgent.IsDeletedFile(action))
|
|
138 | 129 |
{ |
139 | 130 |
//Clear the status of already deleted files to avoid reprocessing |
140 | 131 |
if (action.LocalFile != null) |
... | ... | |
212 | 203 |
StatusNotification.Notify(new Notification()); |
213 | 204 |
} |
214 | 205 |
|
215 |
/// <summary> |
|
216 |
/// Processes cloud delete actions |
|
217 |
/// </summary> |
|
218 |
/// <param name="action">The delete action to execute</param> |
|
219 |
/// <returns></returns> |
|
220 |
/// <remarks> |
|
221 |
/// When a file/folder is deleted locally, we must delete it ASAP from the server and block any download |
|
222 |
/// operations that may be in progress. |
|
223 |
/// <para> |
|
224 |
/// A separate agent is used to process deletes because the main agent may be busy with a long operation. |
|
225 |
/// </para> |
|
226 |
/// </remarks> |
|
227 |
private void ProcessDelete(CloudDeleteAction action) |
|
228 |
{ |
|
229 |
if (action == null) |
|
230 |
throw new ArgumentNullException("action"); |
|
231 |
if (action.AccountInfo==null) |
|
232 |
throw new ArgumentException("The action.AccountInfo is empty","action"); |
|
233 |
Contract.EndContractBlock(); |
|
234 |
|
|
235 |
var accountInfo = action.AccountInfo; |
|
236 |
|
|
237 |
using (log4net.ThreadContext.Stacks["NETWORK"].Push("PROCESS")) |
|
238 |
{ |
|
239 |
Log.InfoFormat("[ACTION] Start Processing {0}", action); |
|
240 |
|
|
241 |
var cloudFile = action.CloudFile; |
|
242 |
|
|
243 |
try |
|
244 |
{ |
|
245 |
//Acquire a lock on the deleted file to prevent uploading/downloading operations from the normal |
|
246 |
//agent |
|
247 |
using (var gate = NetworkGate.Acquire(action.LocalFile.FullName, NetworkOperation.Deleting)) |
|
248 |
{ |
|
249 |
|
|
250 |
//Add the file URL to the deleted files list |
|
251 |
var key = GetFileKey(action.CloudFile); |
|
252 |
_deletedFiles[key] = DateTime.Now; |
|
253 |
|
|
254 |
_pauseEvent.Reset(); |
|
255 |
// and then delete the file from the server |
|
256 |
DeleteCloudFile(accountInfo, cloudFile); |
|
257 |
|
|
258 |
Log.InfoFormat("[ACTION] End Delete {0}:{1}->{2}", action.Action, action.LocalFile, |
|
259 |
action.CloudFile.Name); |
|
260 |
} |
|
261 |
} |
|
262 |
catch (WebException exc) |
|
263 |
{ |
|
264 |
Log.ErrorFormat("[WEB ERROR] {0} : {1} -> {2} due to exception\r\n{3}", action.Action, action.LocalFile, action.CloudFile, exc); |
|
265 |
} |
|
266 |
catch (OperationCanceledException) |
|
267 |
{ |
|
268 |
throw; |
|
269 |
} |
|
270 |
catch (DirectoryNotFoundException) |
|
271 |
{ |
|
272 |
Log.ErrorFormat("{0} : {1} -> {2} failed because the directory was not found.\n Rescheduling a delete", |
|
273 |
action.Action, action.LocalFile, action.CloudFile); |
|
274 |
//Repost a delete action for the missing file |
|
275 |
_deleteAgent.Post(action); |
|
276 |
} |
|
277 |
catch (FileNotFoundException) |
|
278 |
{ |
|
279 |
Log.ErrorFormat("{0} : {1} -> {2} failed because the file was not found.\n Rescheduling a delete", |
|
280 |
action.Action, action.LocalFile, action.CloudFile); |
|
281 |
//Post a delete action for the missing file |
|
282 |
_deleteAgent.Post(action); |
|
283 |
} |
|
284 |
catch (Exception exc) |
|
285 |
{ |
|
286 |
Log.ErrorFormat("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}", |
|
287 |
action.Action, action.LocalFile, action.CloudFile, exc); |
|
288 |
|
|
289 |
_deleteAgent.Post(action); |
|
290 |
} |
|
291 |
finally |
|
292 |
{ |
|
293 |
//Set the event when all delete actions are processed |
|
294 |
if (_deleteAgent.InputCount == 0) |
|
295 |
_pauseEvent.Set(); |
|
296 |
|
|
297 |
} |
|
298 |
} |
|
299 |
} |
|
300 |
|
|
301 |
private static string GetFileKey(ObjectInfo info) |
|
302 |
{ |
|
303 |
var key = String.Format("{0}/{1}/{2}", info.Account, info.Container,info.Name); |
|
304 |
return key; |
|
305 |
} |
|
306 |
|
|
206 |
|
|
307 | 207 |
private async Task SyncFiles(AccountInfo accountInfo,CloudAction action) |
308 | 208 |
{ |
309 | 209 |
if (accountInfo == null) |
... | ... | |
398 | 298 |
throw new ArgumentException("The CloudAction.AccountInfo is empty","cloudAction"); |
399 | 299 |
Contract.EndContractBlock(); |
400 | 300 |
|
401 |
_pauseEvent.Wait();
|
|
301 |
_deleteAgent.PauseEvent.Wait();
|
|
402 | 302 |
|
403 | 303 |
//If the action targets a local file, add a treehash calculation |
404 | 304 |
if (!(cloudAction is CloudDeleteAction) && cloudAction.LocalFile as FileInfo != null) |
... | ... | |
505 | 405 |
|
506 | 406 |
using (log4net.ThreadContext.Stacks["Retrieve Remote"].Push(accountInfo.UserName)) |
507 | 407 |
{ |
508 |
await _pauseEvent.WaitAsync();
|
|
408 |
await _deleteAgent.PauseEvent.WaitAsync();
|
|
509 | 409 |
|
510 | 410 |
Log.Info("Scheduled"); |
511 | 411 |
var client = new CloudFilesClient(accountInfo) |
... | ... | |
520 | 420 |
|
521 | 421 |
try |
522 | 422 |
{ |
523 |
await _pauseEvent.WaitAsync();
|
|
423 |
await _deleteAgent.PauseEvent.WaitAsync();
|
|
524 | 424 |
//Get the poll time now. We may miss some deletions but it's better to keep a file that was deleted |
525 | 425 |
//than delete a file that was created while we were executing the poll |
526 | 426 |
var pollTime = DateTime.Now; |
... | ... | |
824 | 724 |
NativeMethods.RaiseChangeNotification(newFilePath); |
825 | 725 |
} |
826 | 726 |
|
827 |
private void DeleteCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile) |
|
828 |
{ |
|
829 |
if (accountInfo == null) |
|
830 |
throw new ArgumentNullException("accountInfo"); |
|
831 |
if (cloudFile==null) |
|
832 |
throw new ArgumentNullException("cloudFile"); |
|
833 |
|
|
834 |
if (String.IsNullOrWhiteSpace(cloudFile.Container)) |
|
835 |
throw new ArgumentException("Invalid container", "cloudFile"); |
|
836 |
Contract.EndContractBlock(); |
|
837 |
|
|
838 |
var fileAgent = GetFileAgent(accountInfo); |
|
839 |
|
|
840 |
using ( log4net.ThreadContext.Stacks["DeleteCloudFile"].Push("Delete")) |
|
841 |
{ |
|
842 |
var fileName= cloudFile.RelativeUrlToFilePath(accountInfo.UserName); |
|
843 |
var info = fileAgent.GetFileSystemInfo(fileName); |
|
844 |
var fullPath = info.FullName.ToLower(); |
|
845 |
|
|
846 |
StatusKeeper.SetFileOverlayStatus(fullPath, FileOverlayStatus.Modified); |
|
847 |
|
|
848 |
var account = cloudFile.Account ?? accountInfo.UserName; |
|
849 |
var container = cloudFile.Container ;//?? FolderConstants.PithosContainer; |
|
850 |
|
|
851 |
var client = new CloudFilesClient(accountInfo); |
|
852 |
client.DeleteObject(account, container, cloudFile.Name); |
|
853 |
|
|
854 |
StatusKeeper.ClearFileStatus(fullPath); |
|
855 |
} |
|
856 |
} |
|
857 |
|
|
858 | 727 |
//Download a file. |
859 | 728 |
private async Task DownloadCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile , string filePath) |
860 | 729 |
{ |
... | ... | |
1181 | 1050 |
|
1182 | 1051 |
} |
1183 | 1052 |
|
1184 |
//Returns true if an action concerns a file that was deleted |
|
1185 |
private bool IsDeletedFile(CloudAction action) |
|
1186 |
{ |
|
1187 |
//Doesn't work for actions targeting shared files |
|
1188 |
if (action.IsShared) |
|
1189 |
return false; |
|
1190 |
var key = GetFileKey(action.CloudFile); |
|
1191 |
DateTime entryDate; |
|
1192 |
if (_deletedFiles.TryGetValue(key, out entryDate)) |
|
1193 |
{ |
|
1194 |
//If the delete entry was created after this action, abort the action |
|
1195 |
if (entryDate > action.Created) |
|
1196 |
return true; |
|
1197 |
//Otherwise, remove the stale entry |
|
1198 |
_deletedFiles.TryRemove(key, out entryDate); |
|
1199 |
} |
|
1200 |
return false; |
|
1201 |
} |
|
1053 |
|
|
1202 | 1054 |
|
1203 | 1055 |
private bool HandleUploadWebException(CloudAction action, WebException exc) |
1204 | 1056 |
{ |
Also available in: Unified diff