#region
/* -----------------------------------------------------------------------
*
*
* Copyright 2011-2012 GRNET S.A. All rights reserved.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* 1. Redistributions of source code must retain the above
* copyright notice, this list of conditions and the following
* disclaimer.
*
* 2. Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials
* provided with the distribution.
*
*
* THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
* The views and conclusions contained in the software and
* documentation are those of the authors and should not be
* interpreted as representing official policies, either expressed
* or implied, of GRNET S.A.
*
* -----------------------------------------------------------------------
*/
#endregion
using System.Collections.Concurrent;
using System.ComponentModel.Composition;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Castle.ActiveRecord;
using Pithos.Interfaces;
using Pithos.Network;
using log4net;
namespace Pithos.Core.Agents
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
/// <summary>
/// PollAgent periodically polls the server to detect object changes. The agent retrieves a listing of all
/// objects and compares it with a previously cached version to detect differences.
/// New files are downloaded, missing files are deleted from the local file system and common files are compared
/// to determine the appropriate action
/// </summary>
[Export]
public class PollAgent
{
    private static readonly ILog Log = LogManager.GetLogger("PollAgent");

    [System.ComponentModel.Composition.Import]
    public IStatusKeeper StatusKeeper { get; set; }

    [System.ComponentModel.Composition.Import]
    public IPithosSettings Settings { get; set; }

    [System.ComponentModel.Composition.Import]
    public NetworkAgent NetworkAgent { get; set; }

    public IStatusNotification StatusNotification { get; set; }

    //True until a poll of all accounts completes without an exception.
    //While true, ProcessDeletedFiles marks missing local files as conflicts instead of deleting anything
    private bool _firstPoll = true;

    //The Sync Event signals a manual synchronisation
    private readonly AsyncManualResetEvent _syncEvent = new AsyncManualResetEvent();

    //Local paths mapped to the time they were last reported as changed by the server.
    //Entries are removed when the corresponding local file is deleted
    private readonly ConcurrentDictionary<string, DateTime> _lastSeen = new ConcurrentDictionary<string, DateTime>();

    //The accounts processed by each poll
    private readonly ConcurrentBag<AccountInfo> _accounts = new ConcurrentBag<AccountInfo>();

    //Makes the Contains/Add sequence in AddAccount atomic
    private readonly object _accountsGate = new object();

    //Turns each posted snapshot of remote objects into Created/Changed/Deleted lists
    private readonly AccountsDifferencer _differencer = new AccountsDifferencer();

    //Passed to FilterDirectlyBelow to restrict which remote changes are processed
    private List<Uri> _selectiveUris = new List<Uri>();

    /// <summary>
    /// Start a manual synchronization
    /// </summary>
    /// <remarks>
    /// Signals the sync event. This lets the current wait in WaitForScheduledOrManualPoll end early,
    /// and makes the next poll ignore its "since" timestamp.
    /// </remarks>
    public void SynchNow()
    {
        _syncEvent.Set();
    }

    /// <summary>
    /// Remote files are polled periodically. Any changes are processed
    /// </summary>
    /// <remarks>
    /// Processes all registered accounts, then waits for the polling interval or a manual sync request.
    /// It then schedules itself again as a new task. Rescheduling stops once the NetworkAgent's
    /// cancellation token is cancelled.
    /// </remarks>
    /// <param name="since">Only changes after this time are requested; null requests all objects</param>
    /// <returns>A task that completes when this poll has finished and the next one has been scheduled</returns>
    public async Task PollRemoteFiles(DateTime? since = null)
    {
        Debug.Assert(Thread.CurrentThread.IsBackground, "Polling Ended up in the main thread!");

        UpdateStatus(PithosStatus.Syncing);
        StatusNotification.Notify(new PollNotification());

        using (log4net.ThreadContext.Stacks["Retrieve Remote"].Push("All accounts"))
        {
            //If this poll fails, we will retry with the same since value
            var nextSince = since;
            try
            {
                //Next time we will check for all changes since the current check minus 1 second
                //This is done to ensure there are no discrepancies due to clock differences
                var current = DateTime.Now.AddSeconds(-1);

                var tasks = from accountInfo in _accounts
                            select ProcessAccountFiles(accountInfo, since);

                await TaskEx.WhenAll(tasks.ToList());

                _firstPoll = false;
                //Reschedule the poll with the current timestamp as a "since" value
                nextSince = current;
            }
            catch (Exception ex)
            {
                Log.ErrorFormat("Error while processing accounts\r\n{0}", ex);
                //In case of failure retry with the same "since" value
            }

            UpdateStatus(PithosStatus.InSynch);

            //Wait for the polling interval to pass or the Sync event to be signalled.
            //Errors are logged rather than propagated: this method is started fire-and-forget,
            //so an escaping exception would only become an unobserved task exception
            try
            {
                nextSince = await WaitForScheduledOrManualPoll(nextSince);
            }
            catch (OperationCanceledException)
            {
                Log.Info("Waiting for the next poll was cancelled");
            }
            catch (Exception ex)
            {
                Log.Error("Error while waiting for the next poll", ex);
            }

            //A cancelled token makes the Delay in WaitForScheduledOrManualPoll complete immediately,
            //so rescheduling after cancellation would poll the server in a tight loop
            if (NetworkAgent.CancellationToken.IsCancellationRequested)
            {
                Log.Info("Polling stopped because the network agent was cancelled");
                return;
            }

            //The next poll runs as an independent task and is deliberately not awaited
            TaskEx.Run(() => PollRemoteFiles(nextSince));
        }
    }

    /// <summary>
    /// Wait for the polling period to expire or a manual sync request
    /// </summary>
    /// <remarks>
    /// After either event, also waits for NetworkAgent.ProceedEvent before returning.
    /// If the polling delay was cancelled, the await on WhenAll throws.
    /// </remarks>
    /// <param name="since">The "since" value to use for the next poll</param>
    /// <returns>null if a manual sync was requested (forcing a full listing), otherwise <paramref name="since"/></returns>
    private async Task<DateTime?> WaitForScheduledOrManualPoll(DateTime? since)
    {
        var sync = _syncEvent.WaitAsync();
        var wait = TaskEx.Delay(TimeSpan.FromSeconds(Settings.PollingInterval), NetworkAgent.CancellationToken);
        var signaledTask = await TaskEx.WhenAny(sync, wait);

        //Wait for network processing to finish before polling
        var pauseTask = NetworkAgent.ProceedEvent.WaitAsync();
        await TaskEx.WhenAll(signaledTask, pauseTask);

        //If polling is signalled by SynchNow, ignore the since tag
        if (sync.IsCompleted)
        {
            //TODO: Must convert to AutoReset
            _syncEvent.Reset();
            return null;
        }
        return since;
    }

    /// <summary>
    /// Retrieves the remote changes of one account and turns them into local deletions and queued network actions
    /// </summary>
    /// <remarks>
    /// Lists the account's containers (except "trash") and the shared objects changed since <paramref name="since"/>.
    /// The listing is posted to the differencer. Deleted objects are handed to ProcessDeletedFiles, and the
    /// download/sync actions for changed and created objects are posted to the NetworkAgent.
    /// Failures after the containers are listed are logged and swallowed.
    /// NOTE(review): because failures are swallowed, PollRemoteFiles still advances its "since" value
    /// when this method fails.
    /// </remarks>
    /// <param name="accountInfo">The account to process; its AccountPath must not be empty</param>
    /// <param name="since">Only changes after this time are requested; null requests all objects</param>
    /// <exception cref="ArgumentNullException"><paramref name="accountInfo"/> is null</exception>
    /// <exception cref="ArgumentException">The account's AccountPath is empty</exception>
    public async Task ProcessAccountFiles(AccountInfo accountInfo, DateTime? since = null)
    {
        if (accountInfo == null)
            throw new ArgumentNullException("accountInfo");
        if (String.IsNullOrWhiteSpace(accountInfo.AccountPath))
            throw new ArgumentException("The AccountInfo.AccountPath is empty", "accountInfo");
        Contract.EndContractBlock();

        using (log4net.ThreadContext.Stacks["Retrieve Remote"].Push(accountInfo.UserName))
        {
            await NetworkAgent.GetDeleteAwaiter();

            Log.Info("Scheduled");
            var client = new CloudFilesClient(accountInfo);

            //We don't need to check the trash container.
            //The list is materialized once because it is enumerated twice below
            var containers = client.ListContainers(accountInfo.UserName)
                .Where(c => c.Name != "trash")
                .ToList();

            CreateContainerFolders(accountInfo, containers);

            try
            {
                //Wait for any deletions to finish
                await NetworkAgent.GetDeleteAwaiter();
                //Get the poll time now. We may miss some deletions but it's better to keep a file that was deleted
                //than delete a file that was created while we were executing the poll
                var pollTime = DateTime.Now;

                //Get the list of server objects changed since the last check
                //The name of the container is passed as state in order to create a dictionary of tasks in a subsequent step
                var listObjects = (from container in containers
                                   select Task<IList<ObjectInfo>>.Factory.StartNew(_ =>
                                       client.ListObjects(accountInfo.UserName, container.Name, since), container.Name)).ToList();

                //BUG: Can't detect difference between no changes or no objects
                //ListObjects returns nothing if there are no changes since the last check time (since value)
                //TODO: Must detect the difference between no server objects and no change
                //NOTE: One option is to "mark" all result lists with their container name, or
                //rather the url of the container
                //Another option
                var listShared = Task<IList<ObjectInfo>>.Factory.StartNew(_ =>
                    client.ListSharedObjects(since), "shared");
                listObjects.Add(listShared);
                var listTasks = await Task.Factory.WhenAll(listObjects.ToArray());

                using (log4net.ThreadContext.Stacks["SCHEDULE"].Push("Process Results"))
                {
                    var dict = listTasks.ToDictionary(t => t.AsyncState);

                    //Get all non-trash objects. Remember, the container name is stored in AsyncState
                    var remoteObjects = from objectList in listTasks
                                        where (string)objectList.AsyncState != "trash"
                                        from obj in objectList.Result
                                        select obj;

                    var sharedObjects = dict["shared"].Result;

                    //DON'T process trashed files
                    //If some files are deleted and added again to a folder, they will be deleted
                    //even though they are new.
                    //We would have to check file dates and hashes to ensure that a trashed file
                    //can be deleted safely from the local hard drive.
                    /*
                    //Items with the same name, hash may be both in the container and the trash
                    //Don't delete items that exist in the container
                    var realTrash = from trash in trashObjects
                                    where
                                        !remoteObjects.Any(
                                            info => info.Name == trash.Name && info.Hash == trash.Hash)
                                    select trash;
                    ProcessTrashedFiles(accountInfo, realTrash);
                    */

                    //Skip *.ignore objects and anything inside the cache folder
                    var cleanRemotes = (from info in remoteObjects.Union(sharedObjects)
                                        let name = info.Name
                                        where !name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase) &&
                                              !name.StartsWith(FolderConstants.CacheFolder + "/",
                                                               StringComparison.InvariantCultureIgnoreCase)
                                        select info).ToList();

                    var differencer = _differencer.PostSnapshot(accountInfo, cleanRemotes);

                    ProcessDeletedFiles(accountInfo, differencer.Deleted.FilterDirectlyBelow(SelectiveUris), pollTime);

                    // @@@ NEED To add previous state here as well, To compare with previous hash

                    //Create a list of actions from the remote files
                    var allActions = ChangesToActions(accountInfo, differencer.Changed.FilterDirectlyBelow(SelectiveUris))
                        .Union(
                            CreatesToActions(accountInfo, differencer.Created.FilterDirectlyBelow(SelectiveUris)));

                    //And remove those that are already being processed by the agent
                    var distinctActions = allActions
                        .Except(NetworkAgent.GetEnumerable(), new PithosMonitor.LocalFileComparer())
                        .ToList();

                    //Queue all the actions
                    foreach (var message in distinctActions)
                    {
                        NetworkAgent.Post(message);
                    }

                    Log.Info("[LISTENER] End Processing");
                }
            }
            catch (Exception ex)
            {
                Log.ErrorFormat("[FAIL] ListObjects for {0} in ProcessRemoteFiles with {1}", accountInfo.UserName, ex);
                return;
            }

            Log.Info("[LISTENER] Finished");
        }
    }

    /// <summary>
    /// Deletes local files that are not found in the list of cloud files
    /// </summary>
    /// <remarks>
    /// On the first poll nothing is deleted. Instead, Unchanged local files under the account path whose
    /// relative path matches none of <paramref name="cloudFiles"/> are marked as conflicts.
    /// On later polls, each existing local file for an object in <paramref name="cloudFiles"/> is deleted
    /// (clearing the read-only attribute first) and its state is set to Deleted. A file that cannot be deleted
    /// is logged and skipped, so the remaining files are still processed.
    /// </remarks>
    /// <param name="accountInfo">The account being processed</param>
    /// <param name="cloudFiles">On the first poll, the objects that exist remotely; afterwards, the objects deleted remotely</param>
    /// <param name="pollTime">The time the poll started (currently unused)</param>
    private void ProcessDeletedFiles(AccountInfo accountInfo, IEnumerable<ObjectInfo> cloudFiles, DateTime pollTime)
    {
        if (accountInfo == null)
            throw new ArgumentNullException("accountInfo");
        if (String.IsNullOrWhiteSpace(accountInfo.AccountPath))
            throw new ArgumentException("The AccountInfo.AccountPath is empty", "accountInfo");
        if (cloudFiles == null)
            throw new ArgumentNullException("cloudFiles");
        Contract.EndContractBlock();

        //On the first run
        if (_firstPoll)
        {
            //Only consider files that are not being modified, ie they are in the Unchanged state
            var deleteCandidates = FileState.Queryable.Where(state =>
                state.FilePath.StartsWith(accountInfo.AccountPath)
                && state.FileStatus == FileStatus.Unchanged).ToList();

            //TODO: filesToDelete must take into account the Others container
            var filesToDelete = (from deleteCandidate in deleteCandidates
                                 let localFile = FileInfoExtensions.FromPath(deleteCandidate.FilePath)
                                 let relativeFilePath = localFile.AsRelativeTo(accountInfo.AccountPath)
                                 where
                                     !cloudFiles.Any(r => r.RelativeUrlToFilePath(accountInfo.UserName) == relativeFilePath)
                                 select localFile).ToList();

            //Set the status of missing files to Conflict
            foreach (var item in filesToDelete)
            {
                //Try to acquire a gate on the file, to take into account files that have been dequeued
                //and are being processed
                using (var gate = NetworkGate.Acquire(item.FullName, NetworkOperation.Deleting))
                {
                    if (gate.Failed)
                        continue;
                    StatusKeeper.SetFileState(item.FullName, FileStatus.Conflict, FileOverlayStatus.Deleted);
                }
            }

            //Only report conflicts when some local files are actually missing from the server
            if (filesToDelete.Count > 0)
            {
                UpdateStatus(PithosStatus.HasConflicts);
                StatusNotification.NotifyConflicts(filesToDelete, String.Format("{0} local files are missing from Pithos, possibly because they were deleted", filesToDelete.Count));
                StatusNotification.NotifyForFiles(filesToDelete, String.Format("{0} files were deleted", filesToDelete.Count), TraceLevel.Info);
            }
        }
        else
        {
            var fileAgent = FileAgent.GetFileAgent(accountInfo);
            var deletedFiles = new List<FileSystemInfo>();
            foreach (var objectInfo in cloudFiles)
            {
                var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
                var item = fileAgent.GetFileSystemInfo(relativePath);
                try
                {
                    if (item.Exists)
                    {
                        if ((item.Attributes & FileAttributes.ReadOnly) == FileAttributes.ReadOnly)
                        {
                            item.Attributes = item.Attributes & ~FileAttributes.ReadOnly;
                        }
                        item.Delete();
                        DateTime lastDate;
                        _lastSeen.TryRemove(item.FullName, out lastDate);
                        deletedFiles.Add(item);
                    }
                }
                catch (IOException exc)
                {
                    //e.g. a directory that still contains files; the file still exists, so don't mark it Deleted
                    Log.ErrorFormat("Failed to delete {0}\r\n{1}", item.FullName, exc);
                    continue;
                }
                catch (UnauthorizedAccessException exc)
                {
                    Log.ErrorFormat("Failed to delete {0}\r\n{1}", item.FullName, exc);
                    continue;
                }
                StatusKeeper.SetFileState(item.FullName, FileStatus.Deleted, FileOverlayStatus.Deleted);
            }
            StatusNotification.NotifyForFiles(deletedFiles, String.Format("{0} files were deleted", deletedFiles.Count), TraceLevel.Info);
        }
    }

    /// <summary>
    /// Creates an appropriate action for each server file
    /// </summary>
    /// <remarks>
    /// The argument is validated immediately; the actions themselves are produced lazily on enumeration.
    /// Enumerating the result also updates _lastSeen for each existing local file.
    /// </remarks>
    /// <param name="accountInfo">The account the changes belong to</param>
    /// <param name="changes">The remote objects reported as changed</param>
    /// <returns>
    /// A MustSynch action for each object whose local file exists (except directory objects that already exist
    /// locally as directories), and a download action for each object with no local file
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="changes"/> is null</exception>
    private IEnumerable<CloudAction> ChangesToActions(AccountInfo accountInfo, IEnumerable<ObjectInfo> changes)
    {
        if (changes == null)
            throw new ArgumentNullException("changes");
        Contract.EndContractBlock();

        //The iterator is a separate method so that the check above runs at call time, not on first enumeration
        return ChangesToActionsIterator(accountInfo, changes);
    }

    //Lazily yields the actions described in ChangesToActions
    private IEnumerable<CloudAction> ChangesToActionsIterator(AccountInfo accountInfo, IEnumerable<ObjectInfo> changes)
    {
        var fileAgent = FileAgent.GetFileAgent(accountInfo);

        //In order to avoid multiple iterations over the files, we iterate only once
        //over the remote files
        foreach (var objectInfo in changes)
        {
            var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
            //and remove any matching objects from the list, adding them to the commonObjects list
            if (fileAgent.Exists(relativePath))
            {
                //If a directory object already exists, we don't need to perform any other action
                var localFile = fileAgent.GetFileSystemInfo(relativePath);
                if (objectInfo.Content_Type == @"application/directory" && localFile is DirectoryInfo)
                    continue;
                using (new SessionScope(FlushAction.Never))
                {
                    var state = StatusKeeper.GetStateByFilePath(localFile.FullName);
                    _lastSeen[localFile.FullName] = DateTime.Now;
                    //Common files should be checked on a per-case basis to detect differences, which is newer
                    yield return new CloudAction(accountInfo, CloudActionType.MustSynch,
                                                 localFile, objectInfo, state, accountInfo.BlockSize,
                                                 accountInfo.BlockHash);
                }
            }
            else
            {
                //Remote files should be downloaded
                yield return new CloudDownloadAction(accountInfo, objectInfo);
            }
        }
    }

    /// <summary>
    /// Creates download actions for objects that were created on the server
    /// </summary>
    /// <remarks>
    /// The argument is validated immediately; the actions themselves are produced lazily on enumeration.
    /// If a local file already exists for a created object, no action is yielded. Instead, during enumeration
    /// the file is marked as a conflict, unless it is a directory object that already exists locally as a directory.
    /// </remarks>
    /// <param name="accountInfo">The account the objects belong to</param>
    /// <param name="creates">The remote objects reported as created</param>
    /// <returns>A download action for each created object that has no local file</returns>
    /// <exception cref="ArgumentNullException"><paramref name="creates"/> is null</exception>
    private IEnumerable<CloudAction> CreatesToActions(AccountInfo accountInfo, IEnumerable<ObjectInfo> creates)
    {
        if (creates == null)
            throw new ArgumentNullException("creates");
        Contract.EndContractBlock();

        //The iterator is a separate method so that the check above runs at call time, not on first enumeration
        return CreatesToActionsIterator(accountInfo, creates);
    }

    //Lazily yields the actions described in CreatesToActions
    private IEnumerable<CloudAction> CreatesToActionsIterator(AccountInfo accountInfo, IEnumerable<ObjectInfo> creates)
    {
        var fileAgent = FileAgent.GetFileAgent(accountInfo);

        //In order to avoid multiple iterations over the files, we iterate only once
        //over the remote files
        foreach (var objectInfo in creates)
        {
            var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
            if (fileAgent.Exists(relativePath))
            {
                var localFile = fileAgent.GetFileSystemInfo(relativePath);
                //If a directory object already exists, we don't need to perform any other action
                if (objectInfo.Content_Type == @"application/directory" && localFile is DirectoryInfo)
                    continue;
                //If the object already exists, we probably have a conflict
                StatusKeeper.SetFileState(localFile.FullName, FileStatus.Conflict, FileOverlayStatus.Conflict);
            }
            else
            {
                //Remote files should be downloaded
                yield return new CloudDownloadAction(accountInfo, objectInfo);
            }
        }
    }

    /// <summary>
    /// Deletes the local copies of trashed objects
    /// </summary>
    /// <remarks>
    /// Currently not called: the only call site in ProcessAccountFiles is commented out.
    /// </remarks>
    /// <param name="accountInfo">The account the objects belong to</param>
    /// <param name="trashObjects">The objects found in the trash</param>
    private void ProcessTrashedFiles(AccountInfo accountInfo, IEnumerable<ObjectInfo> trashObjects)
    {
        var fileAgent = FileAgent.GetFileAgent(accountInfo);
        foreach (var trashObject in trashObjects)
        {
            var barePath = trashObject.RelativeUrlToFilePath(accountInfo.UserName);
            //HACK: Assume only the "pithos" container is used. Must find out what happens when
            //deleting a file from a different container
            var relativePath = Path.Combine("pithos", barePath);
            fileAgent.Delete(relativePath);
        }
    }

    /// <summary>
    /// Notify the UI to update the visual status
    /// </summary>
    /// <remarks>
    /// Failures are logged as warnings and never propagated.
    /// </remarks>
    /// <param name="status">The new global status</param>
    private void UpdateStatus(PithosStatus status)
    {
        try
        {
            StatusKeeper.SetPithosStatus(status);
            StatusNotification.Notify(new Notification());
        }
        catch (Exception exc)
        {
            //Failure is not critical, just log it
            Log.Warn("Error while updating status", exc);
        }
    }

    /// <summary>
    /// Creates a local folder under the account path for every container that doesn't have one yet
    /// </summary>
    /// <remarks>
    /// The trash container (FolderConstants.TrashContainer) is skipped.
    /// </remarks>
    /// <param name="accountInfo">The account whose AccountPath is the parent folder</param>
    /// <param name="containers">The account's containers</param>
    private static void CreateContainerFolders(AccountInfo accountInfo, IEnumerable<ContainerInfo> containers)
    {
        var containerPaths = from container in containers
                             let containerPath = Path.Combine(accountInfo.AccountPath, container.Name)
                             where container.Name != FolderConstants.TrashContainer && !Directory.Exists(containerPath)
                             select containerPath;

        foreach (var path in containerPaths)
        {
            Directory.CreateDirectory(path);
        }
    }

    /// <summary>
    /// Replaces the URIs used to filter remote changes during polling
    /// </summary>
    /// <param name="uris">The selected URIs; the array is copied</param>
    /// <exception cref="ArgumentNullException"><paramref name="uris"/> is null</exception>
    public void SetSyncUris(Uri[] uris)
    {
        if (uris == null)
            throw new ArgumentNullException("uris");
        Contract.EndContractBlock();

        SelectiveUris = uris.ToList();
    }

    /// <summary>
    /// The URIs passed to FilterDirectlyBelow when selecting which remote changes to process
    /// </summary>
    protected List<Uri> SelectiveUris
    {
        get { return _selectiveUris; }
        set { _selectiveUris = value; }
    }

    /// <summary>
    /// Registers an account to be processed by each poll. Adding an account that is already registered has no effect
    /// </summary>
    /// <param name="accountInfo">The account to add</param>
    /// <exception cref="ArgumentNullException"><paramref name="accountInfo"/> is null</exception>
    public void AddAccount(AccountInfo accountInfo)
    {
        if (accountInfo == null)
            throw new ArgumentNullException("accountInfo");
        Contract.EndContractBlock();

        //Contains followed by Add is not atomic, even on a ConcurrentBag
        lock (_accountsGate)
        {
            if (!_accounts.Contains(accountInfo))
                _accounts.Add(accountInfo);
        }
    }
}
}