-using System;
+// -----------------------------------------------------------------------
+// <copyright file="NetworkAgent.cs" company="GRNET">
+// Copyright 2011 GRNET S.A. All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or
+// without modification, are permitted provided that the following
+// conditions are met:
+//
+// 1. Redistributions of source code must retain the above
+// copyright notice, this list of conditions and the following
+// disclaimer.
+//
+// 2. Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following
+// disclaimer in the documentation and/or other materials
+// provided with the distribution.
+//
+// THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+// OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+// USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+// AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+// LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+// ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+// POSSIBILITY OF SUCH DAMAGE.
+//
+// The views and conclusions contained in the software and
+// documentation are those of the authors and should not be
+// interpreted as representing official policies, either expressed
+// or implied, of GRNET S.A.
+// </copyright>
+// -----------------------------------------------------------------------
+
+using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.ComponentModel.Composition;
using System.IO;
using System.Linq;
using System.Net;
+using System.Threading;
using System.Threading.Tasks;
+using System.Threading.Tasks.Dataflow;
+using Castle.ActiveRecord;
using Pithos.Interfaces;
using Pithos.Network;
using log4net;
namespace Pithos.Core.Agents
{
+ //TODO: Ensure all network operations use exact casing. Pithos is case sensitive
[Export]
public class NetworkAgent
{
private Agent<CloudAction> _agent;
+ //A separate agent is used to execute delete actions immediately
+ private ActionBlock<CloudDeleteAction> _deleteAgent;
+ readonly ConcurrentDictionary<string,DateTime> _deletedFiles=new ConcurrentDictionary<string, DateTime>();
- [Import]
+
+ private readonly ManualResetEventSlim _pauseAgent = new ManualResetEventSlim(true);
+
+ [System.ComponentModel.Composition.Import]
public IStatusKeeper StatusKeeper { get; set; }
public IStatusNotification StatusNotification { get; set; }
Action loop = null;
loop = () =>
{
+ _pauseAgent.Wait();
var message = inbox.Receive();
var process=message.Then(Process,inbox.CancellationToken);
inbox.LoopAsync(process, loop);
};
loop();
- });
+ });
+
+ _deleteAgent = new ActionBlock<CloudDeleteAction>(message =>ProcessDelete(message),new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism=4});
+ /*
+ Action loop = null;
+ loop = () =>
+ {
+ var message = inbox.Receive();
+ var process = message.Then(ProcessDelete,inbox.CancellationToken);
+ inbox.LoopAsync(process, loop);
+ };
+ loop();
+*/
+
}
private async Task Process(CloudAction action)
try
{
- switch (action.Action)
+ if (action.Action == CloudActionType.DeleteCloud)
{
- case CloudActionType.UploadUnconditional:
- await UploadCloudFile(action);
- break;
- case CloudActionType.DownloadUnconditional:
-
- await DownloadCloudFile(accountInfo, cloudFile, downloadPath);
- break;
- case CloudActionType.DeleteCloud:
- DeleteCloudFile(accountInfo, cloudFile);
- break;
- case CloudActionType.RenameCloud:
- var moveAction = (CloudMoveAction)action;
- RenameCloudFile(accountInfo, moveAction);
- break;
- case CloudActionType.MustSynch:
- if (!File.Exists(downloadPath) && !Directory.Exists(downloadPath))
- {
+ //Redirect deletes to the delete agent
+ _deleteAgent.Post((CloudDeleteAction)action);
+ }
+ if (IsDeletedFile(action))
+ {
+ //Clear the status of already deleted files to avoid reprocessing
+ if (action.LocalFile != null)
+ this.StatusKeeper.ClearFileStatus(action.LocalFile.FullName);
+ }
+ else
+ {
+ switch (action.Action)
+ {
+ case CloudActionType.UploadUnconditional:
+ //Abort if the file was deleted before we reached this point
+ await UploadCloudFile(action);
+ break;
+ case CloudActionType.DownloadUnconditional:
await DownloadCloudFile(accountInfo, cloudFile, downloadPath);
- }
- else
- {
- await SyncFiles(accountInfo, action);
- }
- break;
+ break;
+ case CloudActionType.RenameCloud:
+ var moveAction = (CloudMoveAction) action;
+ RenameCloudFile(accountInfo, moveAction);
+ break;
+ case CloudActionType.MustSynch:
+ if (!File.Exists(downloadPath) && !Directory.Exists(downloadPath))
+ {
+ await DownloadCloudFile(accountInfo, cloudFile, downloadPath);
+ }
+ else
+ {
+ await SyncFiles(accountInfo, action);
+ }
+ break;
+ }
}
Log.InfoFormat("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile,
action.CloudFile.Name);
}
}
+ /// <summary>
+ /// Processes a cloud delete action: records the file in the deleted files list, pauses the main agent and calls DeleteCloudFile
+ /// </summary>
+ /// <param name="action">The delete action to execute. Must not be null and must carry a non-null AccountInfo</param>
+ /// <returns>A task for the delete operation. NOTE(review): the body contains no await, so the work presumably completes synchronously - confirm</returns>
+ /// <remarks>
+ /// When a file/folder is deleted locally, we must delete it ASAP from the server and block any download
+ /// operations that may be in progress.
+ /// <para>
+ /// A separate agent is used to process deletes because the main agent may be busy with a long operation.
+ /// </para>
+ /// </remarks>
+ private async Task ProcessDelete(CloudDeleteAction action)
+ {
+ if (action == null)
+ throw new ArgumentNullException("action");
+ if (action.AccountInfo==null)
+ throw new ArgumentException("The action.AccountInfo is empty","action");
+ Contract.EndContractBlock();
+
+ var accountInfo = action.AccountInfo;
+
+ using (log4net.ThreadContext.Stacks["NETWORK"].Push("PROCESS"))
+ {
+ Log.InfoFormat("[ACTION] Start Processing {0}", action);
+
+ var cloudFile = action.CloudFile;
+
+ try
+ {
+ //Acquire a lock on the deleted file to prevent uploading/downloading operations from the normal
+ //agent. NOTE(review): the returned gate is never inspected here - confirm whether a failed acquire should abort the delete
+ using (var gate = NetworkGate.Acquire(action.LocalFile.FullName, NetworkOperation.Deleting))
+ {
+ //NOTE(review): action.LocalFile is dereferenced above without a null check; a failure there lands in the generic handler below and requeues the action - confirm LocalFile is never null for deletes
+ //Record the deletion time under the file's key so that IsDeletedFile can skip older actions for the same file
+ var key = GetFileKey(action.CloudFile);
+ _deletedFiles[key] = DateTime.Now;
+ //Block the main agent loop, which waits on _pauseAgent before receiving its next message
+ _pauseAgent.Reset();
+ // and then delete the file from the server
+ DeleteCloudFile(accountInfo, cloudFile);
+
+ Log.InfoFormat("[ACTION] End Delete {0}:{1}->{2}", action.Action, action.LocalFile,
+ action.CloudFile.Name);
+ }
+ }
+ catch (WebException exc) //Web errors are only logged; the action is not requeued
+ {
+ Log.ErrorFormat("[WEB ERROR] {0} : {1} -> {2} due to exception\r\n{3}", action.Action, action.LocalFile, action.CloudFile, exc);
+ }
+ catch (OperationCanceledException)
+ {
+ throw; //Let cancellation propagate instead of being logged and requeued by the generic handler below
+ }
+ catch (DirectoryNotFoundException)
+ {
+ Log.ErrorFormat("{0} : {1} -> {2} failed because the directory was not found.\n Rescheduling a delete",
+ action.Action, action.LocalFile, action.CloudFile);
+ //Repost a delete action for the missing file
+ _deleteAgent.Post(action);
+ }
+ catch (FileNotFoundException)
+ {
+ Log.ErrorFormat("{0} : {1} -> {2} failed because the file was not found.\n Rescheduling a delete",
+ action.Action, action.LocalFile, action.CloudFile);
+ //Repost a delete action for the missing file
+ _deleteAgent.Post(action);
+ }
+ catch (Exception exc)
+ {
+ Log.ErrorFormat("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
+ action.Action, action.LocalFile, action.CloudFile, exc);
+ //Any other failure is retried by posting the same action again. NOTE(review): there is no retry limit, so a persistent failure would requeue indefinitely - confirm this is intended
+ _deleteAgent.Post(action);
+ }
+ finally
+ {
+ //Resume the main agent once no delete actions remain queued in the delete agent
+ if (_deleteAgent.InputCount == 0)
+ _pauseAgent.Set();
+ //NOTE(review): the delete agent is created with MaxDegreeOfParallelism=4, so another delete may still be executing when InputCount reaches 0 - verify that resuming here is safe
+ }
+ }
+ }
+
+        /// <summary>
+        /// Builds the key under which a cloud object is tracked in the deleted files list
+        /// </summary>
+        /// <param name="info">The cloud object whose Account, Container and Name make up the key</param>
+        /// <returns>A string of the form account/container/name</returns>
+        private static string GetFileKey(ObjectInfo info)
+        {
+            return String.Format("{0}/{1}/{2}", info.Account, info.Container, info.Name);
+        }
+
private async Task SyncFiles(AccountInfo accountInfo,CloudAction action)
{
if (accountInfo == null)
var localFile = action.LocalFile;
var cloudFile = action.CloudFile;
- var downloadPath=action.LocalFile.FullName.ToLower();
+ var downloadPath=action.LocalFile.GetProperCapitalization();
var cloudHash = cloudFile.Hash.ToLower();
var localHash = action.LocalHash.Value.ToLower();
if (cloudAction.AccountInfo==null)
throw new ArgumentException("The CloudAction.AccountInfo is empty","cloudAction");
Contract.EndContractBlock();
-
+
//If the action targets a local file, add a treehash calculation
- if (cloudAction.LocalFile as FileInfo != null)
+ if (!(cloudAction is CloudDeleteAction) && cloudAction.LocalFile as FileInfo != null)
{
var accountInfo = cloudAction.AccountInfo;
var localFile = (FileInfo) cloudAction.LocalFile;
//The hash for a directory is the empty string
cloudAction.TopHash = new Lazy<string>(() => String.Empty);
}
- _agent.Post(cloudAction);
+
+ if (cloudAction is CloudDeleteAction)
+ _deleteAgent.Post((CloudDeleteAction)cloudAction);
+ else
+ _agent.Post(cloudAction);
}
/* class ObjectInfoByNameComparer:IEqualityComparer<ObjectInfo>
using (log4net.ThreadContext.Stacks["Retrieve Remote"].Push(accountInfo.UserName))
{
+ _pauseAgent.Wait();
+
Log.Info("Scheduled");
var client=new CloudFilesClient(accountInfo);
var containers = client.ListContainers(accountInfo.UserName);
-
+
+
CreateContainerFolders(accountInfo, containers);
try
if (fileAgent.Exists(relativePath))
{
//If a directory object already exists, we don't need to perform any other action
- var localFile = fileAgent.GetFileInfo(relativePath);
+ var localFile = fileAgent.GetFileSystemInfo(relativePath);
if (objectInfo.Content_Type == @"application/directory" && localFile is DirectoryInfo)
continue;
- var state = FileState.FindByFilePath(localFile.FullName);
- //Common files should be checked on a per-case basis to detect differences, which is newer
+ using (new SessionScope(FlushAction.Never))
+ {
+ var state = StatusKeeper.GetStateByFilePath(localFile.FullName);
+ //FileState.FindByFilePath(localFile.FullName);
+ //Common files should be checked on a per-case basis to detect differences, which is newer
- yield return new CloudAction(accountInfo,CloudActionType.MustSynch,
- localFile, objectInfo, state, accountInfo.BlockSize,
- accountInfo.BlockHash);
+ yield return new CloudAction(accountInfo, CloudActionType.MustSynch,
+ localFile, objectInfo, state, accountInfo.BlockSize,
+ accountInfo.BlockHash);
+ }
}
else
{
throw new ArgumentException("OldCloudFile","action");
Contract.EndContractBlock();
- var newFilePath = action.LocalFile.FullName;
+
+ var newFilePath = action.LocalFile.FullName;
+
+ //How do we handle concurrent renames and deletes/uploads/downloads?
+ //* A conflicting upload means that a file was renamed before it had a chance to finish uploading
+ // This should never happen as the network agent executes only one action at a time
+ //* A conflicting download means that the file was modified on the cloud. While we can go on and complete
+ // the rename, there may be a problem if the file is downloaded in blocks, as subsequent block requests for the
+ // same name will fail.
+ // This should never happen as the network agent executes only one action at a time.
+ //* A conflicting delete can happen if the rename was followed by a delete action that didn't have the chance
+ // to remove the rename from the queue.
+ // We can probably ignore this case. It will result in an error which should be ignored
+
+
//The local file is already renamed
StatusKeeper.SetFileOverlayStatus(newFilePath, FileOverlayStatus.Modified);
var container = action.CloudFile.Container;
var client = new CloudFilesClient(accountInfo);
+ //TODO: What code is returned when the source file doesn't exist?
client.MoveObject(account, container, action.OldCloudFile.Name, container, action.CloudFile.Name);
StatusKeeper.SetFileStatus(newFilePath, FileStatus.Unchanged);
using ( log4net.ThreadContext.Stacks["DeleteCloudFile"].Push("Delete"))
{
var fileName= cloudFile.RelativeUrlToFilePath(accountInfo.UserName);
- var info = fileAgent.GetFileInfo(fileName);
+ var info = fileAgent.GetFileSystemInfo(fileName);
var fullPath = info.FullName.ToLower();
StatusKeeper.SetFileOverlayStatus(fullPath, FileOverlayStatus.Modified);
}
//Download a file.
- private async Task DownloadCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile , string localPath)
+ private async Task DownloadCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile , string filePath)
{
if (accountInfo == null)
throw new ArgumentNullException("accountInfo");
throw new ArgumentNullException("cloudFile");
if (String.IsNullOrWhiteSpace(cloudFile.Container))
throw new ArgumentNullException("cloudFile");
- if (String.IsNullOrWhiteSpace(localPath))
- throw new ArgumentNullException("localPath");
- if (!Path.IsPathRooted(localPath))
- throw new ArgumentException("The localPath must be rooted", "localPath");
+ if (String.IsNullOrWhiteSpace(filePath))
+ throw new ArgumentNullException("filePath");
+ if (!Path.IsPathRooted(filePath))
+ throw new ArgumentException("The filePath must be rooted", "filePath");
Contract.EndContractBlock();
-
+
+ var localPath = Interfaces.FileInfoExtensions.GetProperFilePathCapitalization(filePath);
var relativeUrl = new Uri(cloudFile.Name, UriKind.Relative);
var url = relativeUrl.ToString();
if (cloudFile.Name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase))
return;
+
//Are we already downloading or uploading the file?
using (var gate=NetworkGate.Acquire(localPath, NetworkOperation.Downloading))
{
}
//Download a small file with a single GET operation
- private async Task DownloadEntireFileAsync(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string localPath,TreeHash serverHash)
+ private async Task DownloadEntireFileAsync(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath,TreeHash serverHash)
{
if (client == null)
throw new ArgumentNullException("client");
throw new ArgumentNullException("cloudFile");
if (relativeUrl == null)
throw new ArgumentNullException("relativeUrl");
- if (String.IsNullOrWhiteSpace(localPath))
- throw new ArgumentNullException("localPath");
- if (!Path.IsPathRooted(localPath))
- throw new ArgumentException("The localPath must be rooted", "localPath");
+ if (String.IsNullOrWhiteSpace(filePath))
+ throw new ArgumentNullException("filePath");
+ if (!Path.IsPathRooted(filePath))
+ throw new ArgumentException("The localPath must be rooted", "filePath");
Contract.EndContractBlock();
+ var localPath = Pithos.Interfaces.FileInfoExtensions.GetProperFilePathCapitalization(filePath);
//If the file already exists
if (File.Exists(localPath))
{
Directory.CreateDirectory(tempFolder);
//Download the object to the temporary location
- await client.GetObject(cloudFile.Account, cloudFile.Container, relativeUrl.ToString(), tempPath).ContinueWith(t =>
- {
- t.PropagateExceptions();
- //Create the local folder if it doesn't exist (necessary for shared objects)
- var localFolder = Path.GetDirectoryName(localPath);
- if (!Directory.Exists(localFolder))
- Directory.CreateDirectory(localFolder);
- //And move it to its actual location once downloading is finished
- if (File.Exists(localPath))
- File.Replace(tempPath,localPath,null,true);
- else
- File.Move(tempPath,localPath);
- //Notify listeners that a local file has changed
- StatusNotification.NotifyChangedFile(localPath);
+ await client.GetObject(cloudFile.Account, cloudFile.Container, relativeUrl.ToString(), tempPath);
- });
+ //Create the local folder if it doesn't exist (necessary for shared objects)
+ var localFolder = Path.GetDirectoryName(localPath);
+ if (!Directory.Exists(localFolder))
+ Directory.CreateDirectory(localFolder);
+ //And move it to its actual location once downloading is finished
+ if (File.Exists(localPath))
+ File.Replace(tempPath,localPath,null,true);
+ else
+ File.Move(tempPath,localPath);
+ //Notify listeners that a local file has changed
+ StatusNotification.NotifyChangedFile(localPath);
+
+
}
//Download a file asynchronously using blocks
- public async Task DownloadWithBlocks(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string localPath, TreeHash serverHash)
+ public async Task DownloadWithBlocks(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath, TreeHash serverHash)
{
if (client == null)
throw new ArgumentNullException("client");
throw new ArgumentNullException("cloudFile");
if (relativeUrl == null)
throw new ArgumentNullException("relativeUrl");
- if (String.IsNullOrWhiteSpace(localPath))
- throw new ArgumentNullException("localPath");
- if (!Path.IsPathRooted(localPath))
- throw new ArgumentException("The localPath must be rooted", "localPath");
+ if (String.IsNullOrWhiteSpace(filePath))
+ throw new ArgumentNullException("filePath");
+ if (!Path.IsPathRooted(filePath))
+ throw new ArgumentException("The filePath must be rooted", "filePath");
if (serverHash == null)
throw new ArgumentNullException("serverHash");
Contract.EndContractBlock();
var fileAgent = GetFileAgent(accountInfo);
+ var localPath = Interfaces.FileInfoExtensions.GetProperFilePathCapitalization(filePath);
//Calculate the relative file path for the new file
var relativePath = relativeUrl.RelativeUriToFilePath();
if (fileInfo.Extension.Equals("ignore", StringComparison.InvariantCultureIgnoreCase))
return;
-
+
var relativePath = fileInfo.AsRelativeTo(accountInfo.AccountPath);
if (relativePath.StartsWith(FolderConstants.OthersFolder))
{
}
- var fullFileName = fileInfo.FullName;
+ var fullFileName = fileInfo.GetProperCapitalization();
using (var gate = NetworkGate.Acquire(fullFileName, NetworkOperation.Uploading))
{
//Abort if the file is already being uploaded or downloaded
//If this is a read-only file, do not upload changes
if (info.AllowedTo == "read")
return;
-
- //WRONG: If this is a directory, there is no hash to check. ????
- //TODO: Check how a directory hash is calculated
+
+ //TODO: Check how a directory hash is calculated -> All dirs seem to have the same hash
if (fileInfo is DirectoryInfo)
{
//If the directory doesn't exist the Hash property will be empty
if (String.IsNullOrWhiteSpace(info.Hash))
//Go on and create the directory
- client.PutObject(account, cloudFile.Container, cloudFile.Name, fileInfo.FullName, String.Empty, "application/directory");
+ await client.PutObject(account, cloudFile.Container, cloudFile.Name, fullFileName, String.Empty, "application/directory");
}
else
{
//the relevant block
//First, calculate the tree hash
- var treeHash = await Signature.CalculateTreeHashAsync(fileInfo.FullName, accountInfo.BlockSize,
+ var treeHash = await Signature.CalculateTreeHashAsync(fullFileName, accountInfo.BlockSize,
accountInfo.BlockHash);
await UploadWithHashMap(accountInfo, cloudFile, fileInfo as FileInfo, cloudFile.Name, treeHash);
}
+        /// <summary>
+        /// Checks whether the file targeted by an action was deleted after the action was created
+        /// </summary>
+        /// <param name="action">The action to check. Its CloudFile is used to build the lookup key</param>
+        /// <returns>
+        /// true if the deleted files list holds an entry for the file that is newer than action.Created,
+        /// false otherwise
+        /// </returns>
+        /// <remarks>
+        /// An entry that is not newer than the action is considered stale and is removed as a side effect.
+        /// </remarks>
+        private bool IsDeletedFile(CloudAction action)
+        {
+            var key = GetFileKey(action.CloudFile);
+            DateTime entryDate;
+            if (!_deletedFiles.TryGetValue(key, out entryDate))
+                return false;
+
+            //If the delete entry was created after this action, abort the action
+            if (entryDate > action.Created)
+                return true;
+
+            //Otherwise, remove the stale entry - but only if it still holds the timestamp we just read.
+            //The delete agent may store a newer timestamp for the same key between the read above and
+            //the removal; removing by key alone would discard that fresh delete marker.
+            //The explicit ICollection<KeyValuePair<,>>.Remove removes the pair only when both key and value match.
+            var staleEntry = new KeyValuePair<string, DateTime>(key, entryDate);
+            ((ICollection<KeyValuePair<string, DateTime>>) _deletedFiles).Remove(staleEntry);
+            return false;
+        }
+
private bool HandleUploadWebException(CloudAction action, WebException exc)
{
var response = exc.Response as HttpWebResponse;
throw new ArgumentException("Invalid container","cloudFile");
Contract.EndContractBlock();
- var fullFileName = fileInfo.FullName;
+ var fullFileName = fileInfo.GetProperCapitalization();
var account = cloudFile.Account ?? accountInfo.UserName;
var container = cloudFile.Container ;