#region
/* -----------------------------------------------------------------------
*
*
* Copyright 2011-2012 GRNET S.A. All rights reserved.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* 1. Redistributions of source code must retain the above
* copyright notice, this list of conditions and the following
* disclaimer.
*
* 2. Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials
* provided with the distribution.
*
*
* THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
* The views and conclusions contained in the software and
* documentation are those of the authors and should not be
* interpreted as representing official policies, either expressed
* or implied, of GRNET S.A.
*
* -----------------------------------------------------------------------
*/
#endregion
using System.Collections.Concurrent;
using System.ComponentModel.Composition;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.IO;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Pithos.Interfaces;
using Pithos.Network;
using log4net;
namespace Pithos.Core.Agents
{
using System;
using System.Collections.Generic;
using System.Linq;
/*public class PollRequest
{
public DateTime? Since { get; set; }
public IEnumerable Batch { get; set; }
}*/
///
/// PollAgent periodically polls the server to detect object changes. The agent retrieves a listing of all
/// objects and compares it with a previously cached version to detect differences.
/// New files are downloaded, missing files are deleted from the local file system and common files are compared
/// to determine the appropriate action
///
[Export]
public class PollAgent
{
private static readonly ILog Log = LogManager.GetLogger(typeof(PollAgent));

/// <summary>Store of file states, read and updated while polling (MEF import).</summary>
[Import]
public IStatusKeeper StatusKeeper { get; set; }

/// <summary>Application settings; supplies the polling interval and hashing parallelism (MEF import).</summary>
[Import]
public IPithosSettings Settings { get; set; }

/// <summary>Agent that performs the uploads, downloads and deletions (MEF import).</summary>
[Import]
public NetworkAgent NetworkAgent { get; set; }

/// <summary>Selective-sync configuration (MEF import).</summary>
[Import]
public Selectives Selectives { get; set; }

/// <summary>
/// Receives status and progress notifications. It has no [Import] attribute, so it is
/// presumably assigned by the owner of the agent.
/// </summary>
public IStatusNotification StatusNotification { get; set; }

//Source of the token handed to the current operation; replaced by CancelCurrentOperation
private CancellationTokenSource _currentOperationCancellation = new CancellationTokenSource();
/// <summary>
/// Cancels the operation currently in progress and installs a fresh cancellation source for
/// subsequent operations.
/// </summary>
/// <remarks>
/// A CancellationTokenSource can't be reset, so it is swapped atomically for a new one before the
/// old one is cancelled. Any operation that asks for <see cref="CancellationToken"/> afterwards gets
/// a token from the new source.
/// The cancelled operation observes a cancellation exception. Because the default behavior is to
/// retry operations that throw, the operation must catch that exception and avoid the retry; a
/// cancelled operation should NOT be retried.
/// </remarks>
public void CancelCurrentOperation()
{
    Interlocked.Exchange(ref _currentOperationCancellation, new CancellationTokenSource())
        .Cancel();
}
/// <summary>
/// Pauses or resumes the agent.
/// </summary>
/// <remarks>
/// Setting true resets _unPauseEvent, so code waiting on it (e.g. the polling loop) waits;
/// setting false sets the event again and lets that code continue.
/// </remarks>
public bool Pause
{
    get { return _pause; }
    set
    {
        _pause = value;
        if (_pause)
            _unPauseEvent.Reset();
        else
            _unPauseEvent.Set();
    }
}
/// <summary>
/// Token of the current cancellation source; cancelled by <see cref="CancelCurrentOperation"/>.
/// </summary>
public CancellationToken CancellationToken
{
    get
    {
        var source = _currentOperationCancellation;
        return source.Token;
    }
}
//True until the first round of account processing in PollRemoteFiles completes;
//while true, ProcessAccountFiles also calls StatusKeeper.CleanupOrphanStates
private bool _firstPoll = true;
//The Sync Event signals a manual synchronisation
private readonly AsyncManualResetEvent _syncEvent = new AsyncManualResetEvent();
//Waited on before polling/syncing; reset by Pause=true and set by Pause=false.
//Constructed with true, presumably meaning "initially set" (not paused) — confirm
private readonly AsyncManualResetEvent _unPauseEvent = new AsyncManualResetEvent(true);
//NOTE(review): not referenced in the visible part of this file — confirm it is still used
private readonly ConcurrentDictionary _lastSeen = new ConcurrentDictionary();
//Accounts to poll; PollRemoteFiles iterates its Values
private readonly ConcurrentDictionary _accounts = new ConcurrentDictionary();
//private readonly ActionBlock _pollAction;
//Passed to ListSharedObjects when listing the shared objects of an account
readonly HashSet _knownContainers = new HashSet();
/// <summary>
/// Requests a manual synchronization.
/// </summary>
/// <param name="paths">
/// The local paths to synchronize, or null to process every item.
/// </param>
public void SynchNow(IEnumerable paths=null)
{
    //Queue the batch BEFORE signalling, so the poller that wakes up can find it
    this._batchQueue.Enqueue(paths);
    this._syncEvent.Set();
}
//Pending manual sync requests queued by SynchNow; a null entry means "process everything"
readonly ConcurrentQueue> _batchQueue=new ConcurrentQueue>();
//Local moves recorded by PostMove, keyed by the old full path. PollRemoteFiles swaps in a new
//dictionary with Interlocked.Exchange at the start of each poll, which is why it isn't readonly
ConcurrentDictionary _moves=new ConcurrentDictionary();
/// <summary>
/// Records a local move. The next poll uses the recorded moves (see MergeSources) to match moved
/// items with their previous path and state.
/// </summary>
/// <param name="args">The move event. Moves are keyed by its OldFullPath.</param>
/// <remarks>
/// If a move from the same old path is already recorded, the existing entry is kept.
/// Events without an old path are logged and ignored.
/// </remarks>
public void PostMove(MovedEventArgs args)
{
    if (args == null || args.OldFullPath == null)
    {
        Log.WarnFormat("[MOVE] Ignoring move event without an old path: {0}", args);
        return;
    }
    //ConcurrentDictionary is thread-safe and AddOrUpdate doesn't block, so there is no reason to
    //hop to the thread pool. Recording synchronously keeps back-to-back moves in the order they
    //were posted and avoids losing exceptions in an unobserved fire-and-forget task
    _moves.AddOrUpdate(args.OldFullPath, args, (oldPath, existing) => existing);
}
/// <summary>
/// Polls all accounts for remote changes, processes them and schedules the next poll.
/// </summary>
/// <param name="since">Passed to ProcessAccountFiles as the "changed since" value; null on the first poll or after a manual sync.</param>
/// <remarks>
/// The method always reschedules itself on a new task (see the finally block), even when
/// processing or waiting fails. If processing fails, the same since value is reused.
/// </remarks>
public void PollRemoteFiles(DateTime? since = null)
{
    if (Log.IsDebugEnabled)
        Log.DebugFormat("Polling changes after [{0}]",since);

    Debug.Assert(Thread.CurrentThread.IsBackground, "Polling Ended up in the main thread!");

    using (ThreadContext.Stacks["Retrieve Remote"].Push("All accounts"))
    {
        //If this poll fails, we will retry with the same since value
        var nextSince = since;
        try
        {
            _unPauseEvent.Wait();
            UpdateStatus(PithosStatus.PollSyncing);

            //Merge ALL pending manual sync requests. Consuming only one per poll delayed any
            //further queued request by a whole polling interval. A null batch means "everything"
            var batch = DequeuePendingBatch();

            var moves = Interlocked.Exchange(ref _moves, new ConcurrentDictionary<string, MovedEventArgs>());

            //Each account's slice of the batch is materialized eagerly. A lazy Where over the
            //foreach variable was re-evaluated on every lookup and, with a pre-C# 5 compiler,
            //would have captured the LAST account for every slice
            var tasks = _accounts.Values
                .Select(accountInfo => ProcessAccountFiles(accountInfo, FilterBatchForAccount(batch, accountInfo), moves, since))
                .ToList();

            var nextTimes = TaskEx.WhenAll(tasks).Result;

            _firstPoll = false;
            //Reschedule the poll with the current timestamp as a "since" value
            if (nextTimes.Length > 0)
                nextSince = nextTimes.Min();
            if (Log.IsDebugEnabled)
                Log.DebugFormat("Next Poll for changes since [{0}]",nextSince);
        }
        catch (Exception ex)
        {
            Log.ErrorFormat("Error while processing accounts\r\n{0}", ex);
            //In case of failure retry with the same "since" value
        }

        UpdateStatus(PithosStatus.PollComplete);
        //The multiple try blocks are required because we can't have an await call
        //inside a finally block
        //TODO: Find a more elegant solution for rescheduling in the event of an exception
        try
        {
            //Wait for the polling interval to pass or the Sync event to be signalled
            nextSince = WaitForScheduledOrManualPoll(nextSince).Result;
        }
        finally
        {
            //Ensure polling is scheduled even in case of error
            TaskEx.Run(() => PollRemoteFiles(nextSince));
        }
    }
}

/// <summary>
/// Drains every pending request queued by SynchNow into a single batch.
/// </summary>
/// <returns>
/// null when no request is pending or when any request asked to process everything (null paths);
/// otherwise the union of all requested paths.
/// </returns>
private IEnumerable<string> DequeuePendingBatch()
{
    var paths = new HashSet<string>();
    var hasRequest = false;
    var processEverything = false;
    IEnumerable<string> requested;
    while (_batchQueue.TryDequeue(out requested))
    {
        hasRequest = true;
        if (requested == null)
            processEverything = true;
        else
            paths.UnionWith(requested);
    }
    return (!hasRequest || processEverything) ? null : paths;
}

/// <summary>
/// Returns the batch paths at or below the account's root folder, or null when
/// <paramref name="batch"/> is null (process everything).
/// </summary>
private static List<string> FilterBatchForAccount(IEnumerable<string> batch, AccountInfo account)
{
    if (batch == null)
        return null;
    var accountPath = account.AccountPath;
    return batch.Where(path => path.IsAtOrBelow(accountPath)).ToList();
}
/// <summary>
/// Waits until the polling interval expires or a manual sync is requested. It then waits for the
/// agent to be un-paused and for the network agent's ProceedEvent.
/// </summary>
/// <param name="since">The since value to keep when the wait ended because the interval expired.</param>
/// <returns>
/// null when the manual sync event was signalled (the since tag is ignored), otherwise <paramref name="since"/>.
/// </returns>
private async Task WaitForScheduledOrManualPoll(DateTime? since)
{
    var sync = _syncEvent.WaitAsync();
    var delay = TimeSpan.FromSeconds(Settings.PollingInterval);
    if (Log.IsDebugEnabled)
        Log.DebugFormat("Next Poll at [{0}]", DateTime.Now.Add(delay));
    var wait = TaskEx.Delay(delay);

    var signaledTask = await TaskEx.WhenAny(sync, wait).ConfigureAwait(false);

    //Pausing takes precedence over manual sync or awaiting.
    //Wait asynchronously (as ProcessAccountFiles does) instead of blocking a
    //thread-pool thread for as long as the agent stays paused
    await _unPauseEvent.WaitAsync().ConfigureAwait(false);

    //Wait for network processing to finish before polling
    var pauseTask = NetworkAgent.ProceedEvent.WaitAsync();
    await TaskEx.WhenAll(signaledTask, pauseTask).ConfigureAwait(false);

    //If polling is signalled by SynchNow, ignore the since tag
    if (sync.IsCompleted)
    {
        _syncEvent.Reset();
        return null;
    }
    return since;
}
/// <summary>
/// Lists the remote objects of one account, merges them with the local files and stored states,
/// and synchronizes every resulting item.
/// </summary>
/// <param name="accountInfo">The account to process. Its AccountPath must not be empty.</param>
/// <param name="accountBatch">Local paths to restrict processing to, or null to process every item.</param>
/// <param name="moves">Local moves recorded since the previous poll, keyed by old full path.</param>
/// <param name="since">Passed to the object listing calls as the "changed since" value.</param>
/// <returns>
/// The since value for the next poll. On success this is the value computed by GetLatestDateAfter.
/// If listing or processing fails, it is the original <paramref name="since"/>.
/// </returns>
/// <exception cref="ArgumentNullException">accountInfo is null (surfaced through the returned task).</exception>
/// <exception cref="ArgumentException">accountInfo.AccountPath is empty (surfaced through the returned task).</exception>
public async Task ProcessAccountFiles(AccountInfo accountInfo, IEnumerable accountBatch, ConcurrentDictionary moves, DateTime? since = null)
{
    if (accountInfo == null)
        throw new ArgumentNullException("accountInfo");
    if (String.IsNullOrWhiteSpace(accountInfo.AccountPath))
        throw new ArgumentException("The AccountInfo.AccountPath is empty", "accountInfo");
    Contract.EndContractBlock();

    using (ThreadContext.Stacks["Retrieve Remote"].Push(accountInfo.UserName))
    {
        await NetworkAgent.GetDeleteAwaiter().ConfigureAwait(false);

        Log.Info("Scheduled");
        var client = new CloudFilesClient(accountInfo);

        //We don't need to check the trash container
        var allContainers = await client.ListContainers(accountInfo.UserName).ConfigureAwait(false);
        var containers = allContainers
            .Where(c => c.Name.ToString() != "trash")
            .ToList();

        CreateContainerFolders(accountInfo, containers);

        //If polling succeeds, the next Since time is computed by GetLatestDateAfter from the
        //listed objects. If it fails, the original since value is returned instead (see catch)
        var nextSince = since;

        try
        {
            //Wait for any deletions to finish
            await NetworkAgent.GetDeleteAwaiter().ConfigureAwait(false);
            //Get the poll time now. We may miss some deletions but it's better to keep a file that was deleted
            //than delete a file that was created while we were executing the poll

            //Get the list of server objects changed since the last check
            //The name of the container is passed as state in order to create a dictionary of tasks in a subsequent step
            var listObjects = (from container in containers
                               select Task<IList<ObjectInfo>>.Factory.StartNew(_ =>
                                   client.ListObjects(accountInfo.UserName, container.Name, since), container.Name)).ToList();

            var listShared = Task<IList<ObjectInfo>>.Factory.StartNew(_ =>
                client.ListSharedObjects(_knownContainers, since), "shared");
            listObjects.Add(listShared);
            var listTasks = await Task.Factory.WhenAll(listObjects.ToArray()).ConfigureAwait(false);

            using (ThreadContext.Stacks["SCHEDULE"].Push("Process Results"))
            {
                var dict = listTasks.ToDictionary(t => t.AsyncState);

                //Get all non-trash objects. Remember, the container name is stored in AsyncState
                var remoteObjects = (from objectList in listTasks
                                     where objectList.AsyncState.ToString() != "trash"
                                     from obj in objectList.Result
                                     orderby obj.Bytes ascending
                                     select obj).ToList();

                //Get the latest remote object modification date, only if it is after
                //the original since date
                nextSince = GetLatestDateAfter(nextSince, remoteObjects);

                var sharedObjects = dict["shared"].Result;

                //DON'T process trashed files
                //If some files are deleted and added again to a folder, they will be deleted
                //even though they are new.
                //We would have to check file dates and hashes to ensure that a trashed file
                //can be deleted safely from the local hard drive.

                var cleanRemotes = (from info in remoteObjects.Union(sharedObjects)
                                    let name = info.Name.ToUnescapedString() ?? ""
                                    where !name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase) &&
                                          !name.StartsWith(FolderConstants.CacheFolder + "/",
                                                           StringComparison.InvariantCultureIgnoreCase)
                                    select info).ToList();

                if (_firstPoll)
                    StatusKeeper.CleanupOrphanStates();

                var differencer = _differencer.PostSnapshot(accountInfo, cleanRemotes);
                var currentRemotes = differencer.Current.ToList();
                StatusKeeper.CleanupStaleStates(accountInfo, currentRemotes);

                //May have to wait if the FileAgent has asked for a Pause, due to local changes
                await _unPauseEvent.WaitAsync().ConfigureAwait(false);

                //Get the local files here
                var agent = AgentLocator.Get(accountInfo.AccountPath);
                var files = LoadLocalFileTuples(accountInfo, accountBatch);

                var states = StatusKeeper.GetAllStates();

                var infos = (from remote in currentRemotes
                             let path = remote.RelativeUrlToFilePath(accountInfo.UserName)
                             let info = agent.GetFileSystemInfo(path)
                             select Tuple.Create(info.FullName, remote))
                            .ToList();

                var token = _currentOperationCancellation.Token;

                var tuples = MergeSources(infos, files, states, moves).ToList();

                var processedPaths = new HashSet<string>();
                //Process only the changes in the batch file, if one exists.
                //The batch is materialized once, so each lookup is O(1) instead of
                //re-enumerating the (possibly lazy) batch for every tuple
                var batchPaths = accountBatch == null ? null : new HashSet<string>(accountBatch);
                var stateTuples = batchPaths == null ? tuples : tuples.Where(t => batchPaths.Contains(t.FilePath));
                foreach (var tuple in stateTuples.Where(s => !s.Locked))
                {
                    await _unPauseEvent.WaitAsync().ConfigureAwait(false);

                    await SyncSingleItem(accountInfo, tuple, agent, moves, processedPaths, token).ConfigureAwait(false);
                }

                Log.Info("[LISTENER] End Processing");
            }
        }
        catch (Exception ex)
        {
            Log.ErrorFormat("[FAIL] ListObjects for {0} in ProcessRemoteFiles with {1}", accountInfo.UserName, ex);
            //nextSince may already have been advanced by GetLatestDateAfter before the failure.
            //Returning it would make the next poll skip the changes that failed to process,
            //so fall back to the original since value
            return since;
        }

        Log.Info("[LISTENER] Finished");
        return nextSince;
    }
}
/*
private static void SetMerkleHash(AccountInfo accountInfo, StateTuple tuple)
{
//The Merkle hash for directories is that of an empty buffer
if (tuple.FileInfo is DirectoryInfo)
tuple.C = MERKLE_EMPTY;
else if (tuple.FileState != null && tuple.MD5 == tuple.FileState.ETag)
{
//If there is a state whose MD5 matches, load the merkle hash from the file state
//insteaf of calculating it
tuple.C = tuple.FileState.Checksum;
}
else
{
tuple.Merkle = Signature.CalculateTreeHashAsync((FileInfo)tuple.FileInfo, accountInfo.BlockSize, accountInfo.BlockHash,1,progress);
//tuple.C=tuple.Merkle.TopHash.ToHashString();
}
}
*/
/// <summary>
/// Returns the local file system entries of the account, restricted to the batch paths when a
/// non-empty batch is given.
/// </summary>
/// <param name="accountInfo">The account whose local folder is enumerated.</param>
/// <param name="batch">Full paths to keep, or null/empty to keep everything.</param>
/// <remarks>
/// The result is lazy: the file system is enumerated by the caller, after the logging context
/// pushed here has already been popped.
/// </remarks>
private IEnumerable LoadLocalFileTuples(AccountInfo accountInfo,IEnumerable batch )
{
    using (ThreadContext.Stacks["Account Files Hashing"].Push(accountInfo.UserName))
    {
        //A HashSet gives O(1) lookups (same default string equality as List.Contains);
        //the List made the filter O(files x batch)
        var batchPaths = batch == null ? new HashSet<string>() : new HashSet<string>(batch);
        IEnumerable<FileSystemInfo> localInfos = AgentLocator.Get(accountInfo.AccountPath)
            .EnumerateFileSystemInfos();
        if (batchPaths.Count > 0)
            localInfos = localInfos.Where(fi => batchPaths.Contains(fi.FullName));

        return localInfos;
    }
}
/// <summary>
/// Pauses the agent, waits for the given time and resumes it.
/// </summary>
/// <param name="backoff">How long to stay paused, passed to TaskEx.Delay (milliseconds).</param>
private async Task PauseFor(int backoff)
{
    Pause = true;
    var resumeAfter = TaskEx.Delay(backoff);
    await resumeAfter.ConfigureAwait(false);
    Pause = false;
}
/// <summary>
/// Synchronizes a single item. It compares the local checksum (C), the checksum stored in the
/// item's state (L) and the server checksum (S), then uploads, downloads, moves, deletes or
/// reports a conflict as required.
/// </summary>
/// <param name="processedPaths">Paths already handled in this poll; the item is skipped if its path is already present.</param>
/// <remarks>Any exception is logged and swallowed; the item is retried on the next poll.</remarks>
private async Task SyncSingleItem(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, ConcurrentDictionary moves,HashSet processedPaths, CancellationToken token)
{
    Log.DebugFormat("Sync [{0}] C:[{1}] L:[{2}] S:[{3}]", tuple.FilePath, tuple.C, tuple.L, tuple.S);

    //If the processed paths already contain the current path, exit
    if (!processedPaths.Add(tuple.FilePath))
        return;

    try
    {
        var localFilePath = tuple.FilePath;
        //Don't use the tuple info, it may have been deleted
        var localInfo = FileInfoExtensions.FromPath(localFilePath);

        var isUnselectedRootFolder = agent.IsUnselectedRootFolder(tuple.FilePath);

        //Unselected root folders that have not yet been uploaded should be uploaded and added to the
        //selective folders
        if (!Selectives.IsSelected(accountInfo, localFilePath) &&
            !(isUnselectedRootFolder && tuple.ObjectInfo == null))
            return;

        // Local file unchanged? If both C and L are null, make sure it's because
        //both the file is missing and the state checksum is not missing
        if (tuple.C == tuple.L /*&& (localInfo.Exists || tuple.FileState == null)*/)
        {
            //No local changes
            //Server unchanged?
            if (tuple.S == tuple.L)
            {
                // No server changes
                //Has the file been renamed locally?
                if (!MoveForLocalMove(accountInfo, tuple))
                    //Has the file been renamed on the server?
                    MoveForServerMove(accountInfo, tuple);
            }
            else
            {
                //Different from server
                //Does the server file exist?
                if (tuple.S == null)
                {
                    //Server file doesn't exist
                    //deleteObjectFromLocal()
                    using (
                        StatusNotification.GetNotifier("Deleting local {0}", "Deleted local {0}",
                                                        localInfo.Name))
                    {
                        DeleteLocalFile(agent, localFilePath);
                    }
                }
                else
                {
                    //Server file exists
                    //downloadServerObject() // Result: L = S
                    //If the file has moved on the server, move it locally before downloading
                    using (
                        StatusNotification.GetNotifier("Downloading {0}", "Downloaded {0}",
                                                        localInfo.Name))
                    {
                        var targetPath = MoveForServerMove(accountInfo, tuple);
                        if (targetPath != null)
                        {
                            await DownloadCloudFile(accountInfo, tuple, token, targetPath).ConfigureAwait(false);

                            AddOwnFolderToSelectives(accountInfo, tuple, targetPath);
                        }
                    }
                }
            }
        }
        else
        {
            //Local changes found
            //Server unchanged?
            if (tuple.S == tuple.L)
            {
                //The FileAgent selective sync checks for new root folder files
                if (!agent.Ignore(localFilePath))
                {
                    if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null)
                    {
                        //deleteObjectFromServer()
                        DeleteCloudFile(accountInfo, tuple);
                        //updateRecord( Remove L, S)
                    }
                    else
                    {
                        //uploadLocalObject() // Result: S = C, L = S
                        var progress = new Progress<double>(d =>
                            StatusNotification.Notify(new StatusNotification(String.Format("Merkle Hashing for Upload {0:p} of {1}", d, localInfo.Name))));

                        //Is it an unselected root folder
                        var isCreation = isUnselectedRootFolder ||//or a new folder under a selected parent?
                                (localInfo is DirectoryInfo && Selectives.IsSelected(accountInfo, localInfo) && tuple.FileState == null && tuple.ObjectInfo == null);

                        //Is this a result of a FILE move with no modifications? Then try to move it,
                        //to avoid an expensive hash
                        if (!MoveForLocalMove(accountInfo, tuple))
                        {
                            await UploadLocalFile(accountInfo, tuple, token, isCreation, localInfo, processedPaths, progress).ConfigureAwait(false);
                        }

                        //updateRecord( S = C )
                        //State updated by the uploader

                        if (isCreation)
                        {
                            ProcessChildren(accountInfo, tuple, agent, moves, processedPaths, token);
                        }
                    }
                }
            }
            else
            {
                if (tuple.C == tuple.S)
                {
                    // (Identical Changes) Result: L = S
                    //doNothing()

                    //Don't update anything for nonexistent server files
                    if (tuple.S != null)
                    {
                        //Detect server moves
                        var targetPath = MoveForServerMove(accountInfo, tuple);
                        if (targetPath != null)
                        {
                            Debug.Assert(tuple.Merkle != null);
                            StatusKeeper.StoreInfo(targetPath, tuple.ObjectInfo, tuple.Merkle);

                            AddOwnFolderToSelectives(accountInfo, tuple, targetPath);
                        }
                    }
                    else
                    {
                        //At this point, C==S==NULL and we have a stale state (L)
                        //Log the stale tuple for investigation
                        Log.WarnFormat("Stale tuple detected FilePathPath:[{0}], State:[{1}], LocalFile:[{2}]", tuple.FilePath, tuple.FileState, tuple.FileInfo);

                        //And remove it
                        if (!String.IsNullOrWhiteSpace(tuple.FilePath))
                            StatusKeeper.ClearFileStatus(tuple.FilePath);
                    }
                }
                else
                {
                    if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null)
                    {
                        //deleteObjectFromServer()
                        DeleteCloudFile(accountInfo, tuple);
                        //updateRecord(Remove L, S)
                    }
                    //If both the local and server files are missing, the state is stale
                    else if (!localInfo.Exists && (tuple.S == null || tuple.ObjectInfo == null))
                    {
                        StatusKeeper.ClearFileStatus(localInfo.FullName);
                    }
                    else
                    {
                        ReportConflictForMismatch(localFilePath);
                        //identifyAsConflict() // Manual action required
                    }
                }
            }
        }
    }
    catch (Exception exc)
    {
        //In case of error log and retry with the next poll
        Log.ErrorFormat("[SYNC] Failed for file {0}. Will Retry.\r\n{1}",tuple.FilePath,exc);
    }
}
/// <summary>
/// Deletes a local file or folder through the FileAgent and clears its stored status.
/// </summary>
/// <remarks>
/// The state is first marked Deleted. The deletion runs while the NetworkGate for the path is
/// held with NetworkOperation.Deleting, and the status is cleared afterwards.
/// </remarks>
private void DeleteLocalFile(FileAgent agent, string localFilePath)
{
    StatusKeeper.SetFileState(localFilePath, FileStatus.Deleted, FileOverlayStatus.Deleted, "");

    var gate = NetworkGate.Acquire(localFilePath, NetworkOperation.Deleting);
    using (gate)
    {
        agent.Delete(localFilePath);
    }

    //updateRecord(Remove C, L)
    StatusKeeper.ClearFileStatus(localFilePath);
}
/// <summary>
/// Downloads the tuple's server object to <paramref name="targetPath"/> and records the result.
/// </summary>
/// <remarks>
/// The file is marked Modified before the download. Afterwards its stored checksum is set to the
/// object's ETag plus the hash returned by the downloader, and the object info is stored.
/// </remarks>
private async Task DownloadCloudFile(AccountInfo accountInfo, StateTuple tuple, CancellationToken token, string targetPath)
{
    StatusKeeper.SetFileState(targetPath, FileStatus.Modified, FileOverlayStatus.Modified, "");

    var downloader = NetworkAgent.Downloader;
    var downloadTask = downloader.DownloadCloudFile(accountInfo, tuple.ObjectInfo, targetPath, token);
    var finalHash = await downloadTask.ConfigureAwait(false);

    //updateRecord( L = S )
    var downloaded = tuple.ObjectInfo;
    StatusKeeper.UpdateFileChecksum(targetPath, downloaded.ETag, finalHash);
    StatusKeeper.StoreInfo(targetPath, downloaded, finalHash);
}
/// <summary>
/// Uploads a local file or folder. When <paramref name="isUnselectedRootFolder"/> is set, it also
/// uploads every subdirectory of the folder.
/// </summary>
/// <param name="isUnselectedRootFolder">
/// Passed through to CloudUploadAction. SyncSingleItem passes its "isCreation" value here.
/// </param>
/// <param name="processedPaths">Subdirectories uploaded here are added so this poll skips them later.</param>
/// <param name="progress">Hashing progress reporter handed to the upload actions.</param>
private async Task UploadLocalFile(AccountInfo accountInfo, StateTuple tuple, CancellationToken token,
bool isUnselectedRootFolder, FileSystemInfo localInfo, HashSet processedPaths, Progress progress)
{
    var action = new CloudUploadAction(accountInfo, localInfo, tuple.FileState,
        accountInfo.BlockSize, accountInfo.BlockHash,
        "Poll", isUnselectedRootFolder, token, progress, tuple.Merkle);
    using (StatusNotification.GetNotifier("Uploading {0}", "Uploaded {0}", localInfo.Name))
    {
        await NetworkAgent.Uploader.UploadCloudFile(action, token).ConfigureAwait(false);
    }

    if (!isUnselectedRootFolder)
        return;

    var dirActions = ((DirectoryInfo) localInfo)
        .EnumerateDirectories("*", SearchOption.AllDirectories)
        .Select(dir => new CloudUploadAction(accountInfo, dir, null,
            accountInfo.BlockSize, accountInfo.BlockHash,
            "Poll", true, token, progress))
        .ToList();

    foreach (var dirAction in dirActions)
        processedPaths.Add(dirAction.LocalFile.FullName);

    //ConfigureAwait(false), like every other await in this agent: there is no need to
    //resume on a captured context
    await TaskEx.WhenAll(dirActions.Select(a => NetworkAgent.Uploader.UploadCloudFile(a, token)).ToArray())
        .ConfigureAwait(false);
}
/// <summary>
/// Tries to replicate an unmodified local file rename on the server with a server-side move
/// instead of an upload.
/// </summary>
/// <returns>
/// true if the server object was moved. Returns false for directories, non-moves, modified files,
/// missing hash/server data, double renames (reported as conflicts) and failed moves; the caller
/// then falls back to its normal handling (e.g. an upload).
/// </returns>
private bool MoveForLocalMove(AccountInfo accountInfo, StateTuple tuple)
{
    //Directories are not moved here
    if (tuple.FileInfo is DirectoryInfo)
        return false;
    //Is the previous path missing?
    if (String.IsNullOrWhiteSpace(tuple.OldFullPath))
        return false;
    //Without a local Merkle hash we can't tell whether the content changed, and without a server
    //object there is nothing to move. Both used to throw a NullReferenceException outside the try
    //block, aborting the item's sync on every poll instead of falling back to an upload
    if (tuple.Merkle == null || tuple.ObjectInfo == null)
        return false;
    //Has the file changed locally, in which case it should be uploaded rather than moved?
    if (tuple.OldChecksum != tuple.Merkle.TopHash.ToHashString())
        return false;

    var relativePath = tuple.ObjectInfo.RelativeUrlToFilePath(accountInfo.UserName);
    var serverPath = Path.Combine(accountInfo.AccountPath, relativePath);

    //Has the file been renamed on the server?
    if (!tuple.OldFullPath.Equals(serverPath))
    {
        ReportConflictForDoubleRename(tuple.FilePath);
        return false;
    }

    try
    {
        var client = new CloudFilesClient(accountInfo);
        var objectInfo = CloudAction.CreateObjectInfoFor(accountInfo, tuple.FileInfo);
        var containerPath = Path.Combine(accountInfo.AccountPath, objectInfo.Container.ToUnescapedString());
        //TODO: Simplify these multiple conversions from and to Uris
        var oldName = tuple.OldFullPath.AsRelativeTo(containerPath);
        //Then execute a move instead of an upload
        using (StatusNotification.GetNotifier("Moving {0}", "Moved {0}", tuple.FileInfo.Name))
        {
            client.MoveObject(objectInfo.Account, objectInfo.Container, oldName.ToEscapedUri(),
                objectInfo.Container, objectInfo.Name);
        }
        return true;
    }
    catch (Exception exc)
    {
        Log.ErrorFormat("[MOVE] Failed for [{0}],:\r\n{1}", tuple.FilePath, exc);
        //Return false to force an upload of the file
        return false;
    }
}
/// <summary>
/// Adds a folder seen for the first time to the selective-sync list when its path is selected.
/// Called after a download and after a detected server move.
/// </summary>
/// <param name="targetPath">Local path returned by MoveForServerMove (not used by this method).</param>
/// <remarks>Shared objects are never added.</remarks>
private void AddOwnFolderToSelectives(AccountInfo accountInfo, StateTuple tuple, string targetPath)
{
    var objectInfo = tuple.ObjectInfo;
    if (objectInfo.IsShared == true)
        return;

    var containerPath = Path.Combine(accountInfo.AccountPath, objectInfo.Container.ToUnescapedString());

    //Only a folder without previous state (L == null) that exists locally inside the container
    var isFirstSeenFolder = tuple.L == null
                            && Directory.Exists(tuple.FileInfo.FullName)
                            && tuple.FileInfo.FullName.IsAtOrBelow(containerPath);
    if (!isFirstSeenFolder)
        return;

    var initialPath = Path.Combine(accountInfo.AccountPath,
                                   objectInfo.RelativeUrlToFilePath(accountInfo.UserName));
    //If the new path is under a selected folder, add it to the selectives as well
    if (!Selectives.IsSelected(accountInfo, initialPath))
        return;

    Selectives.AddUri(accountInfo, objectInfo.Uri);
    Selectives.Save(accountInfo);
}
/// <summary>
/// Moves the local file or folder to the path that corresponds to the tuple's server object, when
/// the two differ.
/// </summary>
/// <returns>
/// The local path that corresponds to the server object. Returns null when there is no server
/// object, or when the item was also renamed locally (reported as a double-rename conflict).
/// </returns>
/// <remarks>
/// If the local item no longer exists (e.g. it was moved along with its parent), nothing is moved
/// and the server path is returned; an error is logged if nothing exists there either.
/// </remarks>
private string MoveForServerMove(AccountInfo accountInfo, StateTuple tuple)
{
    var objectInfo = tuple.ObjectInfo;
    if (objectInfo == null)
        return null;

    var serverPath = Path.Combine(accountInfo.AccountPath, objectInfo.RelativeUrlToFilePath(accountInfo.UserName));

    //Same path (compared case-insensitively): nothing to move
    if (String.Equals(tuple.FilePath, serverPath, StringComparison.InvariantCultureIgnoreCase))
        return serverPath;

    //Renamed locally as well?
    var renamedLocally = !String.IsNullOrWhiteSpace(tuple.OldFullPath)
                         && !tuple.OldFullPath.Equals(tuple.FilePath);
    if (renamedLocally)
    {
        ReportConflictForDoubleRename(tuple.FilePath);
        return null;
    }

    var source = tuple.FileInfo;
    source.Refresh();
    if (!source.Exists)
    {
        if (!FileInfoExtensions.FromPath(serverPath).Exists)
            Log.ErrorFormat("No source or target found while trying to move {0} to {1}", source.FullName, serverPath);
        return serverPath;
    }

    using (StatusNotification.GetNotifier("Moving local {0}", "Moved local {0}", Path.GetFileName(tuple.FilePath)))
    using (NetworkGate.Acquire(tuple.FilePath, NetworkOperation.Renaming))
    {
        var sourceFile = source as FileInfo;
        if (sourceFile != null)
        {
            var targetFile = new FileInfo(serverPath);
            if (!targetFile.Directory.Exists)
                targetFile.Directory.Create();
            sourceFile.MoveTo(serverPath);
        }
        else
        {
            var sourceDir = source as DirectoryInfo;
            if (sourceDir != null)
            {
                var targetDir = new DirectoryInfo(serverPath);
                if (!targetDir.Parent.Exists)
                    targetDir.Parent.Create();
                sourceDir.MoveTo(serverPath);
            }
        }
    }

    StatusKeeper.StoreInfo(serverPath, objectInfo);
    return serverPath;
}
/// <summary>
/// Deletes the tuple's server object through the network agent's DeleteAgent and clears the
/// item's stored status. The state is marked Deleted while the deletion runs.
/// </summary>
private void DeleteCloudFile(AccountInfo accountInfo, StateTuple tuple)
{
    var fileName = Path.GetFileName(tuple.FilePath);
    using (StatusNotification.GetNotifier("Deleting server {0}", "Deleted server {0}", fileName))
    {
        var filePath = tuple.FilePath;
        StatusKeeper.SetFileState(filePath, FileStatus.Deleted, FileOverlayStatus.Deleted, "");
        NetworkAgent.DeleteAgent.DeleteCloudFile(accountInfo, tuple.ObjectInfo);
        StatusKeeper.ClearFileStatus(filePath);
    }
}
/// <summary>
/// Synchronizes everything below a newly created folder: all subfolders first, then all files.
/// </summary>
/// <remarks>
/// Subfolder tuples get the empty Merkle checksum as C. File tuples get a tree hash computed with
/// StatusAgent.CalculateTreeHash using the file's stored state.
/// Each item is synchronized with SyncSingleItem, blocking until it completes.
/// NOTE(review): assumes tuple.FileInfo is a DirectoryInfo; otherwise this throws a NullReferenceException.
/// </remarks>
private void ProcessChildren(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, ConcurrentDictionary moves,HashSet processedPaths,CancellationToken token)
{
    var root = tuple.FileInfo as DirectoryInfo;

    var childFolders = root.EnumerateDirectories("*", SearchOption.AllDirectories)
        .Select(folder => new StateTuple(folder) { C = Signature.MERKLE_EMPTY });

    var childFiles = root.EnumerateFiles("*", SearchOption.AllDirectories)
        .Select(file =>
        {
            var state = StatusKeeper.GetStateByFilePath(file.FullName);
            return new StateTuple(file)
            {
                Merkle = StatusAgent.CalculateTreeHash(file, accountInfo, state,
                    Settings.HashingParallelism, token, null)
            };
        });

    //Process folders first, to ensure folders appear on the server as soon as possible
    childFolders.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, moves, processedPaths, token).Wait());
    childFiles.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, moves, processedPaths, token).Wait());
}
/*
* //Use the queue to retry locked file hashing
var fileQueue = new ConcurrentQueue(localInfos);
var results = new List>();
var backoff = 0;
while (fileQueue.Count > 0)
{
FileSystemInfo file;
fileQueue.TryDequeue(out file);
using (ThreadContext.Stacks["File"].Push(file.FullName))
{
try
{
//Replace MD5 here, do the calc while syncing individual files
string hash ;
if (file is DirectoryInfo)
hash = MD5_EMPTY;
else
{
//Wait in case the FileAgent has requested a Pause
await _unPauseEvent.WaitAsync().ConfigureAwait(false);
using (StatusNotification.GetNotifier("Hashing {0}", "", file.Name))
{
hash = ((FileInfo)file).ComputeShortHash(StatusNotification);
backoff = 0;
}
}
results.Add(Tuple.Create(file, hash));
}
catch (IOException exc)
{
Log.WarnFormat("[HASH] File in use, will retry [{0}]", exc);
fileQueue.Enqueue(file);
//If this is the only enqueued file
if (fileQueue.Count != 1) continue;
//Increase delay
if (backoff<60000)
backoff += 10000;
//Pause Polling for the specified time
}
if (backoff>0)
await PauseFor(backoff).ConfigureAwait(false);
}
}
return results;
*/
/// <summary>
/// Merges local file system entries, stored file states and remote objects into one StateTuple per item.
/// </summary>
/// <param name="infos">(local full path, remote object) pairs built from the current remote listing.</param>
/// <param name="files">Local file system entries.</param>
/// <param name="states">Stored file states.</param>
/// <param name="moves">Local moves recorded since the previous poll, keyed by old full path.</param>
/// <returns>
/// The merged tuples, lazily excluding "broken" tuples: those with a state without checksum, a
/// server object, and no existing local file. Broken tuples are marked as conflicts instead.
/// </returns>
/// <remarks>
/// Local entries and states are matched by path, taking the recorded moves into account.
/// Remote objects are matched by object ID first (unless it equals _emptyGuid), then by path.
/// </remarks>
private IEnumerable MergeSources(IEnumerable> infos, IEnumerable files, List states, ConcurrentDictionary moves)
{
var tuplesByPath = new Dictionary();
//Start with one tuple per local file system entry
foreach (var info in files)
{
var tuple = new StateTuple(info);
//Is this the target of a move event?
var moveArg = moves.Values.FirstOrDefault(arg => info.FullName.Equals(arg.FullPath, StringComparison.InvariantCultureIgnoreCase)
|| info.FullName.IsAtOrBelow(arg.FullPath));
if (moveArg != null)
{
//Remember both paths and the checksum stored for the old path, so MoveForLocalMove
//can decide whether a server-side move is enough
tuple.NewFullPath = info.FullName;
var relativePath = info.AsRelativeTo(moveArg.FullPath);
tuple.OldFullPath = Path.Combine(moveArg.OldFullPath, relativePath);
tuple.OldChecksum = states.FirstOrDefault(st => st.FilePath.Equals(tuple.OldFullPath, StringComparison.InvariantCultureIgnoreCase))
.NullSafe(st => st.Checksum);
}
tuplesByPath[tuple.FilePath] = tuple;
}
//For files that have state
foreach (var state in states)
{
StateTuple hashTuple;
if (tuplesByPath.TryGetValue(state.FilePath, out hashTuple))
{
hashTuple.FileState = state;
UpdateHashes(hashTuple);
}
//The state belongs to a file that was moved: attach it to the tuple at the move target
else if (moves.ContainsKey(state.FilePath) && tuplesByPath.TryGetValue(moves[state.FilePath].FullPath, out hashTuple))
{
hashTuple.FileState = state;
UpdateHashes(hashTuple);
}
else
{
//State without a local entry: create a tuple for the (possibly missing) path
var fsInfo = FileInfoExtensions.FromPath(state.FilePath);
hashTuple = new StateTuple {FileInfo = fsInfo, FileState = state};
//Is the source of a moved item?
var moveArg = moves.Values.FirstOrDefault(arg => state.FilePath.Equals(arg.OldFullPath,StringComparison.InvariantCultureIgnoreCase)
|| state.FilePath.IsAtOrBelow(arg.OldFullPath));
if (moveArg != null)
{
var relativePath = state.FilePath.AsRelativeTo(moveArg.OldFullPath);
hashTuple.NewFullPath = Path.Combine(moveArg.FullPath,relativePath);
hashTuple.OldFullPath = state.FilePath;
//Do we have the old MD5?
//hashTuple.OldMD5 = state.LastMD5;
}
tuplesByPath[state.FilePath] = hashTuple;
}
}
//for files that don't have state
foreach (var tuple in tuplesByPath.Values.Where(t => t.FileState == null))
{
UpdateHashes(tuple);
}
//Index tuples by the object ID stored in their state.
//NOTE(review): ToDictionary throws if two states share the same ObjectID
var tuplesByID = tuplesByPath.Values
.Where(tuple => tuple.FileState != null && tuple.FileState.ObjectID!=null)
.ToDictionary(tuple=>tuple.FileState.ObjectID,tuple=>tuple);//new Dictionary();
//Attach each remote object: by object ID first, then by path, otherwise create a new tuple
foreach (var info in infos)
{
StateTuple hashTuple;
var filePath = info.Item1;
var objectInfo = info.Item2;
var objectID = objectInfo.UUID;
if (objectID != _emptyGuid && tuplesByID.TryGetValue(objectID, out hashTuple))
{
hashTuple.ObjectInfo = objectInfo;
}
else if (tuplesByPath.TryGetValue(filePath, out hashTuple))
{
hashTuple.ObjectInfo = objectInfo;
}
else
{
var fsInfo = FileInfoExtensions.FromPath(filePath);
hashTuple= new StateTuple {FileInfo = fsInfo, ObjectInfo = objectInfo};
tuplesByPath[filePath] = hashTuple;
if (objectInfo.UUID!=_emptyGuid)
tuplesByID[objectInfo.UUID] = hashTuple;
}
}
var tuples = tuplesByPath.Values;
//A state without checksum for a server object that is missing locally can't be resolved
//automatically; these are excluded from the result and flagged as conflicts below
var brokenTuples = from tuple in tuples
where tuple.FileState != null && tuple.FileState.Checksum == null
&& tuple.ObjectInfo != null && (tuple.FileInfo==null || !tuple.FileInfo.Exists)
select tuple;
//Lazy: evaluated by the Debug.Assert below and again by the caller
var actualTuples = tuples.Except(brokenTuples);
Debug.Assert(actualTuples.All(t => t.HashesValid()));
foreach (var tuple in brokenTuples)
{
StatusKeeper.SetFileState(tuple.FileState.FilePath,
FileStatus.Conflict, FileOverlayStatus.Conflict, "FileState without checksum encountered for server object missing from disk");
}
return actualTuples;
}
/// <summary>
/// Updates the tuple with the file's hashes, avoiding recalculation when the file appears unchanged.
/// </summary>
/// <param name="hashTuple">The tuple to update; its Merkle property is set.</param>
/// <remarks>
/// The file's size and last write date are compared with the stored state first. If they match
/// and the stored hashes look consistent, the stored hashes are reused; otherwise the Merkle hash
/// is recalculated. Non-file entries (e.g. directories) get TreeHash.Empty.
/// If an IOException occurs, the tuple is marked as Locked.
/// </remarks>
private void UpdateHashes(StateTuple hashTuple)
{
    try
    {
        var state = hashTuple.NullSafe(s => s.FileState);
        var storedHash = state.NullSafe(s => s.Checksum);
        var storedHashes = state.NullSafe(s => s.Hashes);
        var storedDate = state.NullSafe(s => s.LastWriteDate) ?? DateTime.MinValue;
        var storedLength = state.NullSafe(s => s.LastLength);

        var merkle = TreeHash.Empty;

        if (hashTuple.FileInfo is FileInfo)
        {
            var file = (FileInfo)hashTuple.FileInfo.WithProperCapitalization();

            //Attributes unchanged?
            //LastWriteTime is only accurate to the second, so allow one second of drift in EITHER
            //direction. Without Duration(), a negative difference (the file was replaced by an
            //older copy of the same length) always counted as "unchanged" and kept stale hashes
            var unchangedAttributes = (file.LastWriteTime - storedDate).Duration() < TimeSpan.FromSeconds(1)
                && storedLength == file.Length;

            //Attributes appear unchanged but the file length doesn't match the stored hash ?
            var nonEmptyMismatch = unchangedAttributes &&
                (file.Length == 0 ^ storedHash == Signature.MERKLE_EMPTY);

            //Missing hashes for NON-EMPTY hash ?
            var missingHashes = storedHash != Signature.MERKLE_EMPTY &&
                String.IsNullOrWhiteSpace(storedHashes);

            //Recalculate if the attributes changed, if they are unchanged but the stored hash
            //doesn't match the size, or if the hashes are missing for a non-empty top hash.
            //Otherwise load the hashes from the state
            if (!unchangedAttributes || nonEmptyMismatch || missingHashes)
                merkle = RecalculateTreehash(file);
            else
                merkle = TreeHash.Parse(hashTuple.FileState.Hashes);
        }

        //Setting Merkle also updates C
        hashTuple.Merkle = merkle;
    }
    catch (IOException)
    {
        hashTuple.Locked = true;
    }
}
/// <summary>
/// Calculates a file's tree hash from scratch and stores the result through
/// <c>StatusKeeper.UpdateFileHashes</c>.
/// </summary>
/// <param name="file">The file to hash</param>
/// <returns>The freshly calculated tree hash</returns>
/// <remarks>
/// Progress reported by the hashing routine is forwarded to the UI as a "Hashing ..." notification.
/// Hashing honours the agent's <c>CancellationToken</c>.
/// </remarks>
private TreeHash RecalculateTreehash(FileInfo file)
{
    var hashingProgress = new Progress(done =>
    {
        var text = String.Format("Hashing {0} of {1}", done, file.Name);
        StatusNotification.Notify(new StatusNotification(text));
    });

    var blockSize = StatusKeeper.BlockSize;
    var blockHash = StatusKeeper.BlockHash;
    var treeHash = Signature.CalculateTreeHash(file, blockSize, blockHash, CancellationToken, hashingProgress);

    //Persist the new hashes so unchanged files can skip hashing next time
    StatusKeeper.UpdateFileHashes(file.FullName, treeHash);
    return treeHash;
}
/// <summary>
/// Returns the latest Last_Modified date among <paramref name="cloudObjects"/>, but only if it is
/// earlier than <paramref name="threshold"/>; otherwise returns <paramref name="threshold"/>.
/// </summary>
/// <param name="threshold">The current threshold date; null or <see cref="DateTime.MinValue"/> means "no threshold"</param>
/// <param name="cloudObjects">The objects to inspect; may be null or empty</param>
/// <returns>
/// The earlier of the threshold and the objects' latest date. If the objects yield no usable date
/// (none, null, or MinValue) the threshold is returned unchanged; if there is no threshold, the
/// objects' latest date is returned.
/// </returns>
private static DateTime? GetLatestDateBefore(DateTime? threshold, IList cloudObjects)
{
    var hasObjects = cloudObjects != null && cloudObjects.Count > 0;
    DateTime? latest = hasObjects
                           ? cloudObjects.Max(obj => obj.Last_Modified)
                           : (DateTime?)null;

    var noLatest = latest == null || latest == DateTime.MinValue;
    if (noLatest)
        return threshold;

    var noThreshold = threshold == null || threshold == DateTime.MinValue;
    return (noThreshold || threshold > latest) ? latest : threshold;
}
/// <summary>
/// Returns the latest Last_Modified date among <paramref name="cloudObjects"/>, but only if it is
/// later than <paramref name="threshold"/>; otherwise returns <paramref name="threshold"/>.
/// </summary>
/// <param name="threshold">The current threshold date; null or <see cref="DateTime.MinValue"/> means "no threshold"</param>
/// <param name="cloudObjects">The objects to inspect; may be null or empty</param>
/// <returns>
/// The later of the threshold and the objects' latest date. If the objects yield no usable date
/// (none, null, or MinValue) the threshold is returned unchanged; if there is no threshold, the
/// objects' latest date is returned.
/// </returns>
private static DateTime? GetLatestDateAfter(DateTime? threshold, IList cloudObjects)
{
    var hasObjects = cloudObjects != null && cloudObjects.Count > 0;
    DateTime? latest = hasObjects
                           ? cloudObjects.Max(obj => obj.Last_Modified)
                           : (DateTime?)null;

    var noLatest = latest == null || latest == DateTime.MinValue;
    if (noLatest)
        return threshold;

    var noThreshold = threshold == null || threshold == DateTime.MinValue;
    return (noThreshold || threshold < latest) ? latest : threshold;
}
//Per-account snapshot differencers, exposed through Differencers and keyed by AccountKey
readonly AccountsDifferencer _differencer = new AccountsDifferencer();
//NOTE(review): usage not visible in this section of the file
private bool _pause;
//String form of Guid.Empty; an ObjectInfo.UUID equal to it is treated as "no UUID" when matching tuples by ID
private readonly string _emptyGuid = Guid.Empty.ToString();
/// <summary>
/// Marks a local file as conflicted because it changed at the server, switches the overall status
/// to HasConflicts, and reports the conflict to the log and the UI.
/// </summary>
/// <param name="localFilePath">Path of the conflicting local file</param>
/// <exception cref="ArgumentNullException"><paramref name="localFilePath"/> is null, empty or whitespace</exception>
private void ReportConflictForMismatch(string localFilePath)
{
    if (String.IsNullOrWhiteSpace(localFilePath))
        throw new ArgumentNullException("localFilePath");
    Contract.EndContractBlock();

    StatusKeeper.SetFileState(localFilePath, FileStatus.Conflict, FileOverlayStatus.Conflict, "File changed at the server");
    UpdateStatus(PithosStatus.HasConflicts);

    var warning = String.Format("Conflict detected for file {0}", localFilePath);
    Log.Warn(warning);
    StatusNotification.NotifyChange(warning, TraceLevel.Warning);
}
/// <summary>
/// Marks a local file as conflicted because it was renamed both locally and on the server, switches
/// the overall status to HasConflicts, and reports the conflict to the log and the UI.
/// </summary>
/// <param name="localFilePath">Path of the conflicting local file</param>
/// <exception cref="ArgumentNullException"><paramref name="localFilePath"/> is null, empty or whitespace</exception>
private void ReportConflictForDoubleRename(string localFilePath)
{
    if (String.IsNullOrWhiteSpace(localFilePath))
        throw new ArgumentNullException("localFilePath");
    Contract.EndContractBlock();

    StatusKeeper.SetFileState(localFilePath, FileStatus.Conflict, FileOverlayStatus.Conflict, "File renamed both locally and on the server");
    UpdateStatus(PithosStatus.HasConflicts);

    var warning = String.Format("Double rename conflict detected for file {0}", localFilePath);
    Log.Warn(warning);
    StatusNotification.NotifyChange(warning, TraceLevel.Warning);
}
/// <summary>
/// Notify the UI to update the visual status
/// </summary>
/// <param name="status">The status to display</param>
/// <remarks>
/// This is best-effort: any exception raised while updating is logged as a warning and swallowed,
/// because a stale status display is not critical.
/// </remarks>
private void UpdateStatus(PithosStatus status)
{
    try
    {
        StatusNotification.SetPithosStatus(status);
    }
    catch (Exception ex)
    {
        Log.Warn("Error while updating status", ex);
    }
}
/// <summary>
/// Creates a local folder under the account's path for each container that does not have one yet.
/// The trash container never gets a local folder.
/// </summary>
/// <param name="accountInfo">The account whose AccountPath is the root for the folders</param>
/// <param name="containers">The account's containers</param>
private static void CreateContainerFolders(AccountInfo accountInfo, IEnumerable containers)
{
    foreach (var container in containers)
    {
        var folder = Path.Combine(accountInfo.AccountPath, container.Name.ToUnescapedString());
        var isTrash = container.Name.ToString() == FolderConstants.TrashContainer;
        if (isTrash || Directory.Exists(folder))
            continue;

        Directory.CreateDirectory(folder);
    }
}
/// <summary>
/// Registers an account under its AccountKey.
/// </summary>
/// <param name="accountInfo">The account to register</param>
/// <remarks>If an account with the same key is already registered, the existing entry is kept.</remarks>
public void AddAccount(AccountInfo accountInfo)
{
    var key = accountInfo.AccountKey;
    //TryAdd does nothing when the key is already present, which keeps duplicates out
    _accounts.TryAdd(key, accountInfo);
}
/// <summary>
/// Unregisters an account and discards its snapshot differencer.
/// </summary>
/// <param name="accountInfo">The account to remove; matched by its AccountKey</param>
/// <remarks>Keys that are not registered are ignored.</remarks>
public void RemoveAccount(AccountInfo accountInfo)
{
    var key = accountInfo.AccountKey;

    AccountInfo removedAccount;
    _accounts.TryRemove(key, out removedAccount);

    SnapshotDifferencer removedDifferencer;
    _differencer.Differencers.TryRemove(key, out removedDifferencer);
}
}
}