2 /* -----------------------------------------------------------------------
3 * <copyright file="NetworkAgent.cs" company="GRNet">
5 * Copyright 2011-2012 GRNET S.A. All rights reserved.
7 * Redistribution and use in source and binary forms, with or
8 * without modification, are permitted provided that the following
11 * 1. Redistributions of source code must retain the above
12 * copyright notice, this list of conditions and the following
15 * 2. Redistributions in binary form must reproduce the above
16 * copyright notice, this list of conditions and the following
17 * disclaimer in the documentation and/or other materials
18 * provided with the distribution.
21 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
22 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
25 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
28 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 * POSSIBILITY OF SUCH DAMAGE.
34 * The views and conclusions contained in the software and
35 * documentation are those of the authors and should not be
36 * interpreted as representing official policies, either expressed
37 * or implied, of GRNET S.A.
39 * -----------------------------------------------------------------------
44 using System.Collections.Generic;
45 using System.ComponentModel.Composition;
46 using System.Diagnostics;
47 using System.Diagnostics.Contracts;
51 using System.Reflection;
52 using System.Threading;
53 using System.Threading.Tasks;
54 using Castle.ActiveRecord;
55 using Pithos.Interfaces;
59 namespace Pithos.Core.Agents
62 public class NetworkAgent
// Logger for this class, resolved from the declaring type of the current method.
64 private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
// Agent that receives the posted CloudAction items; assigned from Agent<CloudAction>.Start.
66 private Agent<CloudAction> _agent;
// Imported agent that handles delete actions. Deletes posted to this class are
// redirected to it, and its ProceedEvent is waited on before actions are received or posted.
68 [System.ComponentModel.Composition.Import]
69 private DeleteAgent DeleteAgent { get; set; }
// Imported keeper of per-file state. This class uses it to look up file state and to
// set or clear file status and overlay status.
71 [System.ComponentModel.Composition.Import]
72 public IStatusKeeper StatusKeeper { get; set; }
// Backing field for StatusNotification.
74 private IStatusNotification _statusNotification;
// Notification sink used to report status and changes. The visible assignments store the
// new value and copy it to DeleteAgent, Uploader and Downloader so they all share the same sink.
// NOTE(review): the assignments dereference DeleteAgent, Uploader and Downloader, so this
// presumably has to be set after those imports are satisfied - confirm.
75 public IStatusNotification StatusNotification
77 get { return _statusNotification; }
80 _statusNotification = value;
81 DeleteAgent.StatusNotification = value;
82 Uploader.StatusNotification = value;
83 Downloader.StatusNotification = value;
// Imported application settings. Not referenced in the visible part of this class.
88 [System.ComponentModel.Composition.Import]
89 public IPithosSettings Settings { get; set; }
// Backing field for Uploader.
91 private Uploader _uploader;
// Imported component that performs uploads (Process calls its UploadCloudFile).
93 [System.ComponentModel.Composition.Import]
94 public Uploader Uploader
96 get { return _uploader; }
// Hand this agent's un-pause event to the uploader.
// NOTE(review): the assignment of _uploader itself is not visible in this view.
100 _uploader.UnpauseEvent = _unPauseEvent;
// Backing field for Downloader.
104 private Downloader _downloader;
// Imported component that performs downloads (Process and SyncFiles call its DownloadCloudFile).
106 [System.ComponentModel.Composition.Import]
107 public Downloader Downloader
109 get { return _downloader; }
// Hand this agent's un-pause event to the downloader.
// NOTE(review): the assignment of _downloader itself is not visible in this view.
113 _downloader.UnpauseEvent = _unPauseEvent;
// Imported Selectives instance. Not referenced in the visible part of this class.
117 [System.ComponentModel.Composition.Import]
118 public Selectives Selectives { get; set; }
120 //The proceed event signals the poll agent that it can proceed with polling.
121 //While it is reset the poll agent is held back, giving priority to the network agent.
122 //The event starts out signalled because initially there is no need to pause.
123 private readonly AsyncManualResetEvent _proceedEvent = new AsyncManualResetEvent(true);
// NOTE(review): this field is not referenced in the visible code and is separate from the
// Selectives auto-property - confirm whether it is still needed.
124 private Agents.Selectives _selectives;
// Exposes the proceed event so that other components can wait on it.
127 public AsyncManualResetEvent ProceedEvent
129 get { return _proceedEvent; }
// Event waited on before each message is received and shared with Uploader and Downloader.
// Constructed with true, presumably meaning it starts out signalled (not paused).
132 private readonly AsyncManualResetEvent _unPauseEvent = new AsyncManualResetEvent(true);
// Source of the token handed to the current upload/download. CancelCurrentOperation
// replaces it with a fresh source.
134 private CancellationTokenSource _currentOperationCancellation=new CancellationTokenSource();
// Cancels the upload/download currently in progress by replacing the cancellation source.
// NOTE(review): the statements that follow the exchange (presumably cancelling the old
// source) are not visible in this view.
136 public void CancelCurrentOperation()
138 //What does it mean to cancel the current upload/download?
139 //Obviously, the current operation will be cancelled by throwing
140 //a cancellation exception.
142 //The default behavior is to retry any operations that throw.
143 //Obviously this is not what we want in this situation.
144 //The cancelled operation should NOT be retried.
146 //This can be done by catching the cancellation exception
147 //and avoiding the retry.
150 //A cancellation source cannot be reset, so it has to be replaced with a new one.
151 //The swap is atomic to prevent a case where an operation requests a token from the old source
152 var oldSource = Interlocked.Exchange(ref _currentOperationCancellation, new CancellationTokenSource());
// Fragment of the member that starts the agent; its header is outside the visible lines.
162 if (Log.IsDebugEnabled)
163 Log.Debug("Starting Network Agent");
// Start the agent and keep a reference to it for Post, GetEnumerable and CancellationToken.
165 _agent = Agent<CloudAction>.Start(inbox =>
// Block until the delete agent allows processing and this agent is not paused.
170 DeleteAgent.ProceedEvent.Wait();
171 _unPauseEvent.Wait();
// Receive the next action and chain Process onto it, passing the inbox's cancellation token.
172 var message = inbox.Receive();
173 var process=message.Then(Process,inbox.CancellationToken);
// NOTE(review): `loop` is declared outside the visible lines - presumably the delegate
// that repeats this receive/process cycle; confirm.
174 inbox.LoopAsync(process, loop);
// Processes a single CloudAction: validates it, redirects deletes to the delete agent and
// dispatches uploads, downloads, renames and syncs according to action.Action.
// Failures are logged per exception type; missing files or directories post a delete action.
// Throws ArgumentNullException / ArgumentException for a missing action or AccountInfo.
// NOTE(review): several lines of this method (braces, some conditions, the requeue calls)
// are not visible in this view, so the exact branch structure must be confirmed.
181 private async Task Process(CloudAction action)
184 throw new ArgumentNullException("action");
185 if (action.AccountInfo==null)
186 throw new ArgumentException("The action.AccountInfo is empty","action");
187 Contract.EndContractBlock();
// Push the action's description onto the "Operation" logging context stack.
192 using (ThreadContext.Stacks["Operation"].Push(action.ToString()))
195 var cloudFile = action.CloudFile;
196 var downloadPath = action.GetDownloadPath();
// Report that processing has started and reset the proceed event, which the poll agent waits on.
200 StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing,"Processing");
201 _proceedEvent.Reset();
203 var accountInfo = action.AccountInfo;
205 if (action.Action == CloudActionType.DeleteCloud)
207 //Redirect deletes to the delete agent
208 DeleteAgent.Post((CloudDeleteAction)action);
210 if (DeleteAgent.IsDeletedFile(action))
212 //Clear the status of already deleted files to avoid reprocessing
213 if (action.LocalFile != null)
214 StatusKeeper.ClearFileStatus(action.LocalFile.FullName);
218 switch (action.Action)
220 case CloudActionType.UploadUnconditional:
221 //Abort if the file was deleted before we reached this point
222 var uploadAction = (CloudUploadAction) action;
// Queue uploads for the contents of a newly created folder before uploading the item itself.
223 ProcessChildUploads(uploadAction);
224 await Uploader.UploadCloudFile(uploadAction ,CurrentOperationCancelToken);
226 case CloudActionType.DownloadUnconditional:
227 await Downloader.DownloadCloudFile(accountInfo, cloudFile, downloadPath, CurrentOperationCancelToken);
229 case CloudActionType.RenameCloud:
230 var moveAction = (CloudMoveAction)action;
231 RenameCloudFile(accountInfo, moveAction);
233 case CloudActionType.RenameLocal:
234 RenameLocalFile(accountInfo, action);
236 case CloudActionType.MustSynch:
// Neither a file nor a directory exists at the download path: download it.
237 if (!File.Exists(downloadPath) && !Directory.Exists(downloadPath))
239 await Downloader.DownloadCloudFile(accountInfo, cloudFile, downloadPath, CurrentOperationCancelToken);
// NOTE(review): presumably the else branch (something exists locally) - confirm.
243 await SyncFiles(accountInfo, action);
248 Log.InfoFormat("End Processing {0}:{1}->{2}", action.Action, action.LocalFile,
249 action.CloudFile.Name);
252 catch (WebException exc)
254 Log.ErrorFormat("[WEB ERROR] {0} : {1} -> {2} due to exception\r\n{3}", action.Action, action.LocalFile, action.CloudFile, exc);
257 //Actions that resulted in server errors should be retried
258 var response = exc.Response as HttpWebResponse;
// Status codes from InternalServerError (500) upwards count as server errors.
259 if (response != null && response.StatusCode >= HttpStatusCode.InternalServerError)
// NOTE(review): the call that actually requeues the action is not visible in this view.
262 Log.WarnFormat("[REQUEUE] {0} : {1} -> {2}", action.Action, action.LocalFile, action.CloudFile);
// A cancelled operation is logged as a warning.
266 catch (OperationCanceledException ex)
268 Log.WarnFormat("Cancelling [{0}]",ex);
270 catch (DirectoryNotFoundException)
272 Log.ErrorFormat("{0} : {1} -> {2} failed because the directory was not found.\n Rescheduling a delete",
273 action.Action, action.LocalFile, action.CloudFile);
274 //Post a delete action for the missing file
275 Post(new CloudDeleteAction(action));
277 catch (FileNotFoundException)
279 Log.ErrorFormat("{0} : {1} -> {2} failed because the file was not found.\n Rescheduling a delete",
280 action.Action, action.LocalFile, action.CloudFile);
281 //Post a delete action for the missing file
282 Post(new CloudDeleteAction(action));
// Any other failure is logged with a [REQUEUE] tag.
// NOTE(review): the requeue call itself is not visible in this view.
284 catch (Exception exc)
286 Log.ErrorFormat("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
287 action.Action, action.LocalFile, action.CloudFile, exc);
// Report LocalComplete once the action has been handled.
295 UpdateStatus(PithosStatus.LocalComplete);
// For an upload that creates a directory, builds upload actions for every subdirectory
// and file below it (searched recursively) and hands them to PostUploadAction.
// NOTE(review): the statement after the guard (presumably an early return) and parts of
// the two queries are not visible in this view.
300 private void ProcessChildUploads(CloudUploadAction uploadAction)
// Only creations whose local item is a directory have children to upload.
302 if (!uploadAction.IsCreation || !(uploadAction.LocalFile is DirectoryInfo))
305 var dirInfo = uploadAction.LocalFile as DirectoryInfo;
307 var account = uploadAction.AccountInfo;
308 var folderActions = from info in dirInfo.EnumerateDirectories("*", SearchOption.AllDirectories)
310 new CloudUploadAction(account, info, null, account.BlockSize, account.BlockHash,
312 var fileActions = from info in dirInfo.EnumerateFiles("*", SearchOption.AllDirectories)
314 new CloudUploadAction(account, info, null, account.BlockSize, account.BlockHash,
316 //Post folder actions first, to ensure the selective folders are updated
317 folderActions.ApplyAction(PostUploadAction);
318 fileActions.ApplyAction(PostUploadAction);
// Attaches a file state to an upload action, looked up by the local file's full path.
// NOTE(review): the condition guarding the creation of a new state (presumably "no stored
// state found") and whatever follows the final assignment are not visible in this view.
321 private void PostUploadAction(CloudUploadAction action)
323 var state = StatusKeeper.GetStateByFilePath(action.LocalFile.FullName);
326 //StatusKeeper.SetFileState(action.LocalFile.FullName,FileStatus.Created,FileOverlayStatus.Normal,String.Empty);
// Build a fresh state for the file, marked as Created with a Normal overlay.
327 state = FileState.CreateFor(action.LocalFile);
328 //StatusKeeper.SetFileStatus();
329 state.FileStatus = FileStatus.Created;
330 state.OverlayStatus = FileOverlayStatus.Normal;
332 action.FileState = state;
// Token of the current cancellation source. The field is read on every access, so callers
// that ask after CancelCurrentOperation replaced the source get a token from the new one.
336 private CancellationToken CurrentOperationCancelToken
338 get { return _currentOperationCancellation.Token; }
// Forwards the given status to the status notification sink.
342 private void UpdateStatus(PithosStatus status)
344 StatusNotification.SetPithosStatus(status);
345 //StatusNotification.Notify(new Notification());
// Moves the local item that corresponds to the cloud object's previous info to the
// location that corresponds to its current info.
// Throws ArgumentNullException / ArgumentException when accountInfo, the action, its local
// file (which must be an absolute path) or its cloud file is missing.
// NOTE(review): the null test that precedes the second throw is not visible in this view.
348 private void RenameLocalFile(AccountInfo accountInfo, CloudAction action)
350 if (accountInfo == null)
351 throw new ArgumentNullException("accountInfo");
353 throw new ArgumentNullException("action");
354 if (action.LocalFile == null)
355 throw new ArgumentException("The action's local file is not specified", "action");
356 if (!Path.IsPathRooted(action.LocalFile.FullName))
357 throw new ArgumentException("The action's local file path must be absolute", "action");
358 if (action.CloudFile == null)
359 throw new ArgumentException("The action's cloud file is not specified", "action");
360 Contract.EndContractBlock();
361 using (ThreadContext.Stacks["Operation"].Push("RenameLocalFile"))
364 //We assume that the local file already exists, otherwise the poll agent
365 //would have issued a download request
367 var currentInfo = action.CloudFile;
// NOTE(review): Previous is dereferenced below without a null check - presumably it is
// always set for RenameLocal actions; confirm.
368 var previousInfo = action.CloudFile.Previous;
369 var fileAgent = FileAgent.GetFileAgent(accountInfo);
// Resolve the local item at the path derived from the object's previous info.
371 var previousRelativepath = previousInfo.RelativeUrlToFilePath(accountInfo.UserName);
372 var previousFile = fileAgent.GetFileSystemInfo(previousRelativepath);
374 //In every case we need to move the local file first
375 MoveLocalFile(accountInfo, previousFile, fileAgent, currentInfo);
// Moves previousFile to the path derived from currentInfo under the file agent's root
// and updates the stored file state for the old path.
// NOTE(review): the branch that chooses between the two MoveTo calls is not visible in
// this view; presumably isFile selects the FileInfo move.
379 private void MoveLocalFile(AccountInfo accountInfo, FileSystemInfo previousFile, FileAgent fileAgent,
380 ObjectInfo currentInfo)
382 var currentRelativepath = currentInfo.RelativeUrlToFilePath(accountInfo.UserName);
383 var newPath = Path.Combine(fileAgent.RootPath, currentRelativepath);
// Use the capitalization helper that matches the kind of item (file or directory).
385 var isFile= (previousFile is FileInfo);
386 var previousFullPath = isFile?
387 FileInfoExtensions.GetProperFilePathCapitalization(previousFile.FullName):
388 FileInfoExtensions.GetProperDirectoryCapitalization(previousFile.FullName);
// Acquire a Renaming gate for both the old and the new path and open an ActiveRecord
// session scope for the work below.
390 using (NetworkGate.Acquire(previousFullPath, NetworkOperation.Renaming))
391 using (NetworkGate.Acquire(newPath,NetworkOperation.Renaming))
392 using (new SessionScope(FlushAction.Auto))
395 (previousFile as FileInfo).MoveTo(newPath);
398 (previousFile as DirectoryInfo).MoveTo(newPath);
// Point the stored state at the new path, then mark the old path as deleted.
// NOTE(review): state is dereferenced without a null check, and how the FilePath change
// is persisted is not visible here - confirm.
400 var state = StatusKeeper.GetStateByFilePath(previousFullPath);
401 state.FilePath = newPath;
403 StatusKeeper.SetFileState(previousFullPath,FileStatus.Deleted,FileOverlayStatus.Deleted, "Deleted");
// Reconciles an existing local item with its cloud object by comparing hashes:
// skips when nothing needs doing, downloads when the local hash equals the object's
// previous hash, and otherwise reports a conflict.
// Throws ArgumentNullException / ArgumentException when accountInfo, the action, its local
// file (which must be an absolute path) or its cloud file is missing.
// NOTE(review): the null test before the second throw and the statements that end each
// "skip" branch (presumably returns) are not visible in this view.
407 private async Task SyncFiles(AccountInfo accountInfo,CloudAction action)
409 if (accountInfo == null)
410 throw new ArgumentNullException("accountInfo");
412 throw new ArgumentNullException("action");
413 if (action.LocalFile==null)
414 throw new ArgumentException("The action's local file is not specified","action");
415 if (!Path.IsPathRooted(action.LocalFile.FullName))
416 throw new ArgumentException("The action's local file path must be absolute","action");
417 if (action.CloudFile== null)
418 throw new ArgumentException("The action's cloud file is not specified", "action");
419 Contract.EndContractBlock();
420 using (ThreadContext.Stacks["Operation"].Push("SyncFiles"))
423 //var localFile = action.LocalFile;
424 var cloudFile = action.CloudFile;
425 var downloadPath = action.LocalFile.GetProperCapitalization();
// The cloud hashes are lower-cased before they are compared with the local hash.
// NOTE(review): assumes ToHashString yields a lower-case string - confirm.
427 var cloudHash = cloudFile.Hash.ToLower();
428 var previousCloudHash = cloudFile.PreviousHash == null?null: cloudFile.PreviousHash.ToLower();
429 var localHash = action.TreeHash.Value.TopHash.ToHashString();// LocalHash.Value.ToLower();
430 //var topHash = action.TopHash.Value.ToLower();
// A directory on both sides needs no content sync.
432 if(cloudFile.IsDirectory && action.LocalFile is DirectoryInfo)
434 Log.InfoFormat("Skipping folder {0} , exists in server", downloadPath);
438 //At this point we know that an object has changed on the server and that a local
439 //file already exists. We need to decide whether the file has only changed on
440 //the server or there is a conflicting change on the client.
443 //If the hashes match, we are done
444 if (cloudFile != ObjectInfo.Empty && cloudHash == localHash)
446 Log.InfoFormat("Skipping {0}, hashes match", downloadPath);
450 //If the local and remote files have 0 length their hashes will not match
451 if (!cloudFile.IsDirectory && cloudFile.Bytes==0 && action.LocalFile is FileInfo && (action.LocalFile as FileInfo).Length==0 )
453 Log.InfoFormat("Skipping {0}, files are empty", downloadPath);
457 //The hashes DON'T match. We need to sync
459 // If the previous tophash matches the local tophash, the file was only changed on the server.
460 if (localHash == previousCloudHash)
462 await Downloader.DownloadCloudFile(accountInfo, cloudFile, downloadPath, CurrentOperationCancelToken);
466 //If the previous and local hash don't match, there was a local change
467 //that was not uploaded to the server. We have a conflict
468 ReportConflictForMismatch(downloadPath);
// Records a conflict for the given path: sets the file's state to Conflict, switches the
// overall status to HasConflicts and sends a warning-level change notification.
// Throws ArgumentNullException when downloadPath is null, empty or whitespace.
473 private void ReportConflictForMismatch(string downloadPath)
475 if (String.IsNullOrWhiteSpace(downloadPath))
476 throw new ArgumentNullException("downloadPath");
477 Contract.EndContractBlock();
479 StatusKeeper.SetFileState(downloadPath,FileStatus.Conflict, FileOverlayStatus.Conflict,"File changed at the server");
480 UpdateStatus(PithosStatus.HasConflicts);
481 var message = String.Format("Conflict detected for file {0}", downloadPath);
483 StatusNotification.NotifyChange(message, TraceLevel.Warning);
// Queues an action for processing. Delete actions are handed to the delete agent; the
// action is also posted to this class's own agent.
// Throws ArgumentNullException / ArgumentException for a missing action or AccountInfo.
// NOTE(review): an `else` between the two posts is presumably on a line that is not
// visible in this view - confirm.
486 public void Post(CloudAction cloudAction)
488 if (cloudAction == null)
489 throw new ArgumentNullException("cloudAction");
490 if (cloudAction.AccountInfo==null)
491 throw new ArgumentException("The CloudAction.AccountInfo is empty","cloudAction");
492 Contract.EndContractBlock();
// Blocks the caller until the delete agent's proceed event is signalled.
494 DeleteAgent.ProceedEvent.Wait();
496 if (cloudAction is CloudDeleteAction)
497 DeleteAgent.Post((CloudDeleteAction)cloudAction);
499 _agent.Post(cloudAction);
// Returns the enumerable exposed by the underlying agent.
// NOTE(review): presumably the actions still waiting in the queue - confirm against Agent<T>.
503 public IEnumerable<CloudAction> GetEnumerable()
505 return _agent.GetEnumerable();
// Returns the task from the delete agent's ProceedEvent.WaitAsync, letting callers await
// the delete agent without blocking (presumably it completes once the event is signalled).
508 public Task GetDeleteAwaiter()
510 return DeleteAgent.ProceedEvent.WaitAsync();
// Cancellation token of the underlying agent.
512 public CancellationToken CancellationToken
514 get { return _agent.CancellationToken; }
// Fragment: the enclosing member is not visible in this view. Resets the un-pause event,
// which the agent loop waits on before receiving the next action.
525 _unPauseEvent.Reset();
// Applies a rename on the server: moves the object from the old cloud name to the new
// one inside the same container and updates the renamed file's status and overlay.
// Throws ArgumentNullException / ArgumentException when accountInfo, the action, or any of
// its CloudFile, LocalFile, OldLocalFile or OldCloudFile is missing.
// NOTE(review): the null test before the second throw is not visible in this view.
534 private void RenameCloudFile(AccountInfo accountInfo,CloudMoveAction action)
536 if (accountInfo==null)
537 throw new ArgumentNullException("accountInfo");
539 throw new ArgumentNullException("action");
540 if (action.CloudFile==null)
541 throw new ArgumentException("CloudFile","action");
542 if (action.LocalFile==null)
543 throw new ArgumentException("LocalFile","action");
544 if (action.OldLocalFile==null)
545 throw new ArgumentException("OldLocalFile","action");
546 if (action.OldCloudFile==null)
547 throw new ArgumentException("OldCloudFile","action");
548 Contract.EndContractBlock();
550 using (ThreadContext.Stacks["Operation"].Push("RenameCloudFile"))
553 var newFilePath = action.LocalFile.FullName;
555 //How do we handle concurrent renames and deletes/uploads/downloads?
556 //* A conflicting upload means that a file was renamed before it had a chance to finish uploading
557 // This should never happen as the network agent executes only one action at a time
558 //* A conflicting download means that the file was modified on the cloud. While we can go on and complete
559 // the rename, there may be a problem if the file is downloaded in blocks, as subsequent block requests for the
560 // same name will fail.
561 // This should never happen as the network agent executes only one action at a time.
562 //* A conflicting delete can happen if the rename was followed by a delete action that didn't have the chance
563 // to remove the rename from the queue.
564 // We can probably ignore this case. It will result in an error which should be ignored
567 //The local file is already renamed
// Show the file as Modified while the server-side move runs; Wait() blocks until the call completes.
568 StatusKeeper.SetFileOverlayStatus(newFilePath, FileOverlayStatus.Modified).Wait();
// Fall back to the account's user name when the cloud file carries no account of its own.
571 var account = action.CloudFile.Account ?? accountInfo.UserName;
572 var container = action.CloudFile.Container;
574 var client = new CloudFilesClient(accountInfo);
575 //TODO: What code is returned when the source file doesn't exist?
576 client.MoveObject(account, container, action.OldCloudFile.Name, container, action.CloudFile.Name);
// Mark the renamed file as Unchanged/Normal again and raise a change notification for the new path.
578 StatusKeeper.SetFileStatus(newFilePath, FileStatus.Unchanged);
579 StatusKeeper.SetFileOverlayStatus(newFilePath, FileOverlayStatus.Normal).Wait();
580 NativeMethods.RaiseChangeNotification(newFilePath);