#region
/* -----------------------------------------------------------------------
*
*
* Copyright 2011-2012 GRNET S.A. All rights reserved.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* 1. Redistributions of source code must retain the above
* copyright notice, this list of conditions and the following
* disclaimer.
*
* 2. Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials
* provided with the distribution.
*
*
* THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
* The views and conclusions contained in the software and
* documentation are those of the authors and should not be
* interpreted as representing official policies, either expressed
* or implied, of GRNET S.A.
*
* -----------------------------------------------------------------------
*/
#endregion
using System;
using System.Collections.Generic;
using System.ComponentModel.Composition;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.IO;
using System.Net;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Castle.ActiveRecord;
using Pithos.Interfaces;
using Pithos.Network;
using log4net;
namespace Pithos.Core.Agents
{
[Export]
public class NetworkAgent
{
private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
private Agent _agent;
[System.ComponentModel.Composition.Import]
private DeleteAgent DeleteAgent { get; set; }
[System.ComponentModel.Composition.Import]
public IStatusKeeper StatusKeeper { get; set; }
private IStatusNotification _statusNotification;
public IStatusNotification StatusNotification
{
get { return _statusNotification; }
set
{
_statusNotification = value;
DeleteAgent.StatusNotification = value;
Uploader.StatusNotification = value;
Downloader.StatusNotification = value;
}
}
[System.ComponentModel.Composition.Import]
public IPithosSettings Settings { get; set; }
[System.ComponentModel.Composition.Import]
public Uploader Uploader { get; set; }
[System.ComponentModel.Composition.Import]
public Downloader Downloader { get; set; }
//The Proceed signals the poll agent that it can proceed with polling.
//Essentially it stops the poll agent to give priority to the network agent
//Initially the event is signalled because we don't need to pause
private readonly AsyncManualResetEvent _proceedEvent = new AsyncManualResetEvent(true);
public AsyncManualResetEvent ProceedEvent
{
get { return _proceedEvent; }
}
public void Start()
{
if (_agent != null)
return;
if (Log.IsDebugEnabled)
Log.Debug("Starting Network Agent");
_agent = Agent.Start(inbox =>
{
Action loop = null;
loop = () =>
{
DeleteAgent.ProceedEvent.Wait();
var message = inbox.Receive();
var process=message.Then(Process,inbox.CancellationToken);
inbox.LoopAsync(process, loop);
};
loop();
});
}
private async Task Process(CloudAction action)
{
if (action == null)
throw new ArgumentNullException("action");
if (action.AccountInfo==null)
throw new ArgumentException("The action.AccountInfo is empty","action");
Contract.EndContractBlock();
using (ThreadContext.Stacks["Operation"].Push(action.ToString()))
{
var cloudFile = action.CloudFile;
var downloadPath = action.GetDownloadPath();
try
{
StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing,"Processing");
_proceedEvent.Reset();
var accountInfo = action.AccountInfo;
if (action.Action == CloudActionType.DeleteCloud)
{
//Redirect deletes to the delete agent
DeleteAgent.Post((CloudDeleteAction)action);
}
if (DeleteAgent.IsDeletedFile(action))
{
//Clear the status of already deleted files to avoid reprocessing
if (action.LocalFile != null)
StatusKeeper.ClearFileStatus(action.LocalFile.FullName);
}
else
{
switch (action.Action)
{
case CloudActionType.UploadUnconditional:
//Abort if the file was deleted before we reached this point
await Uploader.UploadCloudFile(action);
break;
case CloudActionType.DownloadUnconditional:
await Downloader.DownloadCloudFile(accountInfo, cloudFile, downloadPath);
break;
case CloudActionType.RenameCloud:
var moveAction = (CloudMoveAction)action;
RenameCloudFile(accountInfo, moveAction);
break;
case CloudActionType.RenameLocal:
RenameLocalFile(accountInfo, action);
break;
case CloudActionType.MustSynch:
if (!File.Exists(downloadPath) && !Directory.Exists(downloadPath))
{
await Downloader.DownloadCloudFile(accountInfo, cloudFile, downloadPath);
}
else
{
await SyncFiles(accountInfo, action);
}
break;
}
}
Log.InfoFormat("End Processing {0}:{1}->{2}", action.Action, action.LocalFile,
action.CloudFile.Name);
}
/*
catch (WebException exc)
{
Log.ErrorFormat("[WEB ERROR] {0} : {1} -> {2} due to exception\r\n{3}", action.Action, action.LocalFile, action.CloudFile, exc);
//Actions that resulted in server errors should be retried
var response = exc.Response as HttpWebResponse;
if (response != null && response.StatusCode >= HttpStatusCode.InternalServerError)
{
_agent.Post(action);
Log.WarnFormat("[REQUEUE] {0} : {1} -> {2}", action.Action, action.LocalFile, action.CloudFile);
}
}
*/
catch (OperationCanceledException)
{
throw;
}
catch (DirectoryNotFoundException)
{
Log.ErrorFormat("{0} : {1} -> {2} failed because the directory was not found.\n Rescheduling a delete",
action.Action, action.LocalFile, action.CloudFile);
//Post a delete action for the missing file
Post(new CloudDeleteAction(action));
}
catch (FileNotFoundException)
{
Log.ErrorFormat("{0} : {1} -> {2} failed because the file was not found.\n Rescheduling a delete",
action.Action, action.LocalFile, action.CloudFile);
//Post a delete action for the missing file
Post(new CloudDeleteAction(action));
}
catch (Exception exc)
{
Log.ErrorFormat("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
action.Action, action.LocalFile, action.CloudFile, exc);
_agent.Post(action);
}
finally
{
if (_agent.IsEmpty)
_proceedEvent.Set();
UpdateStatus(PithosStatus.LocalComplete);
}
}
}
private void UpdateStatus(PithosStatus status)
{
StatusNotification.SetPithosStatus(status);
//StatusNotification.Notify(new Notification());
}
private void RenameLocalFile(AccountInfo accountInfo, CloudAction action)
{
if (accountInfo == null)
throw new ArgumentNullException("accountInfo");
if (action == null)
throw new ArgumentNullException("action");
if (action.LocalFile == null)
throw new ArgumentException("The action's local file is not specified", "action");
if (!Path.IsPathRooted(action.LocalFile.FullName))
throw new ArgumentException("The action's local file path must be absolute", "action");
if (action.CloudFile == null)
throw new ArgumentException("The action's cloud file is not specified", "action");
Contract.EndContractBlock();
using (ThreadContext.Stacks["Operation"].Push("RenameLocalFile"))
{
//We assume that the local file already exists, otherwise the poll agent
//would have issued a download request
var currentInfo = action.CloudFile;
var previousInfo = action.CloudFile.Previous;
var fileAgent = FileAgent.GetFileAgent(accountInfo);
var previousRelativepath = previousInfo.RelativeUrlToFilePath(accountInfo.UserName);
var previousFile = fileAgent.GetFileSystemInfo(previousRelativepath);
//In every case we need to move the local file first
MoveLocalFile(accountInfo, previousFile, fileAgent, currentInfo);
}
}
private void MoveLocalFile(AccountInfo accountInfo, FileSystemInfo previousFile, FileAgent fileAgent,
ObjectInfo currentInfo)
{
var currentRelativepath = currentInfo.RelativeUrlToFilePath(accountInfo.UserName);
var newPath = Path.Combine(fileAgent.RootPath, currentRelativepath);
var isFile= (previousFile is FileInfo);
var previousFullPath = isFile?
FileInfoExtensions.GetProperFilePathCapitalization(previousFile.FullName):
FileInfoExtensions.GetProperDirectoryCapitalization(previousFile.FullName);
using (NetworkGate.Acquire(previousFullPath, NetworkOperation.Renaming))
using (NetworkGate.Acquire(newPath,NetworkOperation.Renaming))
using (new SessionScope(FlushAction.Auto))
{
if (isFile)
(previousFile as FileInfo).MoveTo(newPath);
else
{
(previousFile as DirectoryInfo).MoveTo(newPath);
}
var state = StatusKeeper.GetStateByFilePath(previousFullPath);
state.FilePath = newPath;
state.SaveCopy();
StatusKeeper.SetFileState(previousFullPath,FileStatus.Deleted,FileOverlayStatus.Deleted);
}
}
private async Task SyncFiles(AccountInfo accountInfo,CloudAction action)
{
if (accountInfo == null)
throw new ArgumentNullException("accountInfo");
if (action==null)
throw new ArgumentNullException("action");
if (action.LocalFile==null)
throw new ArgumentException("The action's local file is not specified","action");
if (!Path.IsPathRooted(action.LocalFile.FullName))
throw new ArgumentException("The action's local file path must be absolute","action");
if (action.CloudFile== null)
throw new ArgumentException("The action's cloud file is not specified", "action");
Contract.EndContractBlock();
using (ThreadContext.Stacks["Operation"].Push("SyncFiles"))
{
//var localFile = action.LocalFile;
var cloudFile = action.CloudFile;
var downloadPath = action.LocalFile.GetProperCapitalization();
var cloudHash = cloudFile.Hash.ToLower();
var previousCloudHash = cloudFile.PreviousHash.ToLower();
var localHash = action.TreeHash.Value.TopHash.ToHashString();// LocalHash.Value.ToLower();
//var topHash = action.TopHash.Value.ToLower();
//At this point we know that an object has changed on the server and that a local
//file already exists. We need to decide whether the file has only changed on
//the server or there is a conflicting change on the client.
//
//If the hashes match, we are done
if (cloudFile != ObjectInfo.Empty && cloudHash == localHash)
{
Log.InfoFormat("Skipping {0}, hashes match", downloadPath);
return;
}
//The hashes DON'T match. We need to sync
// If the previous tophash matches the local tophash, the file was only changed on the server.
if (localHash == previousCloudHash)
{
await Downloader.DownloadCloudFile(accountInfo, cloudFile, downloadPath);
}
else
{
//If the previous and local hash don't match, there was a local conflict
//that was not uploaded to the server. We have a conflict
ReportConflictForMismatch(downloadPath);
}
}
}
private void ReportConflictForMismatch(string downloadPath)
{
if (String.IsNullOrWhiteSpace(downloadPath))
throw new ArgumentNullException("downloadPath");
Contract.EndContractBlock();
StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict);
UpdateStatus(PithosStatus.HasConflicts);
var message = String.Format("Conflict detected for file {0}", downloadPath);
Log.Warn(message);
StatusNotification.NotifyChange(message, TraceLevel.Warning);
}
public void Post(CloudAction cloudAction)
{
if (cloudAction == null)
throw new ArgumentNullException("cloudAction");
if (cloudAction.AccountInfo==null)
throw new ArgumentException("The CloudAction.AccountInfo is empty","cloudAction");
Contract.EndContractBlock();
DeleteAgent.ProceedEvent.Wait();
/*
//If the action targets a local file, add a treehash calculation
if (!(cloudAction is CloudDeleteAction) && cloudAction.LocalFile as FileInfo != null)
{
var accountInfo = cloudAction.AccountInfo;
var localFile = (FileInfo) cloudAction.LocalFile;
if (localFile.Length > accountInfo.BlockSize)
cloudAction.TopHash =
new Lazy(() => Signature.CalculateTreeHashAsync(localFile,
accountInfo.BlockSize,
accountInfo.BlockHash, Settings.HashingParallelism).Result
.TopHash.ToHashString());
else
{
cloudAction.TopHash = new Lazy(() => cloudAction.LocalHash.Value);
}
}
else
{
//The hash for a directory is the empty string
cloudAction.TopHash = new Lazy(() => String.Empty);
}
*/
if (cloudAction is CloudDeleteAction)
DeleteAgent.Post((CloudDeleteAction)cloudAction);
else
_agent.Post(cloudAction);
}
public IEnumerable GetEnumerable()
{
return _agent.GetEnumerable();
}
public Task GetDeleteAwaiter()
{
return DeleteAgent.ProceedEvent.WaitAsync();
}
public CancellationToken CancellationToken
{
get { return _agent.CancellationToken; }
}
private void RenameCloudFile(AccountInfo accountInfo,CloudMoveAction action)
{
if (accountInfo==null)
throw new ArgumentNullException("accountInfo");
if (action==null)
throw new ArgumentNullException("action");
if (action.CloudFile==null)
throw new ArgumentException("CloudFile","action");
if (action.LocalFile==null)
throw new ArgumentException("LocalFile","action");
if (action.OldLocalFile==null)
throw new ArgumentException("OldLocalFile","action");
if (action.OldCloudFile==null)
throw new ArgumentException("OldCloudFile","action");
Contract.EndContractBlock();
using (ThreadContext.Stacks["Operation"].Push("RenameCloudFile"))
{
var newFilePath = action.LocalFile.FullName;
//How do we handle concurrent renames and deletes/uploads/downloads?
//* A conflicting upload means that a file was renamed before it had a chance to finish uploading
// This should never happen as the network agent executes only one action at a time
//* A conflicting download means that the file was modified on the cloud. While we can go on and complete
// the rename, there may be a problem if the file is downloaded in blocks, as subsequent block requests for the
// same name will fail.
// This should never happen as the network agent executes only one action at a time.
//* A conflicting delete can happen if the rename was followed by a delete action that didn't have the chance
// to remove the rename from the queue.
// We can probably ignore this case. It will result in an error which should be ignored
//The local file is already renamed
StatusKeeper.SetFileOverlayStatus(newFilePath, FileOverlayStatus.Modified);
var account = action.CloudFile.Account ?? accountInfo.UserName;
var container = action.CloudFile.Container;
var client = new CloudFilesClient(accountInfo);
//TODO: What code is returned when the source file doesn't exist?
client.MoveObject(account, container, action.OldCloudFile.Name, container, action.CloudFile.Name);
StatusKeeper.SetFileStatus(newFilePath, FileStatus.Unchanged);
StatusKeeper.SetFileOverlayStatus(newFilePath, FileOverlayStatus.Normal);
NativeMethods.RaiseChangeNotification(newFilePath);
}
}
}
}