#region
/* -----------------------------------------------------------------------
*
*
* Copyright 2011-2012 GRNET S.A. All rights reserved.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* 1. Redistributions of source code must retain the above
* copyright notice, this list of conditions and the following
* disclaimer.
*
* 2. Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials
* provided with the distribution.
*
*
* THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
* The views and conclusions contained in the software and
* documentation are those of the authors and should not be
* interpreted as representing official policies, either expressed
* or implied, of GRNET S.A.
*
* -----------------------------------------------------------------------
*/
#endregion
using System;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using Pithos.Interfaces;
using Pithos.Network;
using log4net;
namespace Pithos.Core.Agents
{
// [Export]
public class FileAgent
{
// log4net logger named after the type that declares this field.
private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
// Message agent created in Start; ProcessBatchedEvents posts one WorkflowState to it per file change.
Agent _agent;
// Watches the root path and its subdirectories; created in Start.
private FileSystemWatcher _watcher;
// Wraps _watcher; Start subscribes to its Changed, Created, Deleted and Moved events.
private FileSystemWatcherAdapter _adapter;
// Dependency assigned through its public setter (the MEF import attribute below is commented out).
//[Import]
public IStatusKeeper StatusKeeper { get; set; }
// Receives the LocalSyncing / LocalComplete status updates issued by ProcessBatchedEvents.
public IStatusNotification StatusNotification { get; set; }
// Dependency assigned through its public setter (the MEF import attribute below is commented out).
//[Import]
public IPithosWorkflow Workflow { get; set; }
// Dependency assigned through its public setter (the MEF import attribute below is commented out).
//[Import]
public WorkflowAgent WorkflowAgent { get; set; }
// Account passed to Start; copied into every WorkflowState posted by ProcessBatchedEvents.
private AccountInfo AccountInfo { get; set; }
// Root folder passed to Start; this is the path handed to the FileSystemWatcher.
internal string RootPath { get; set; }
// Event batcher created in Start with IdleTimeout (in milliseconds) and ProcessBatchedEvents as its callback.
private FileEventIdleBatch _eventIdleBatch;
// Idle interval; Start converts it to whole milliseconds for the FileEventIdleBatch constructor,
// so it has to be set before Start is called to take effect.
public TimeSpan IdleTimeout { get; set; }
/// <summary>
/// Turns a batch of file system events into <c>WorkflowState</c> messages and posts them to the agent.
/// This method is the callback handed to the <c>FileEventIdleBatch</c> created in <c>Start</c>.
/// </summary>
/// <param name="fileEvents">
/// The batched events, keyed by file path; each value is the sequence of changes recorded for that path.
/// </param>
/// <remarks>
/// The status is switched to <c>LocalSyncing</c> before the batch is walked and to
/// <c>LocalComplete</c> once every change has been posted.
/// </remarks>
private void ProcessBatchedEvents(Dictionary fileEvents)
{
    var syncingMessage = String.Format("Uploading {0} files", fileEvents.Count);
    StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing, syncingMessage);

    foreach (var entry in fileEvents)
    {
        // Paths rejected by Ignore produce no workflow messages at all.
        if (Ignore(entry.Key))
        {
            continue;
        }

        foreach (var change in entry.Value)
        {
            WorkflowState state;
            if (change.ChangeType != WatcherChangeTypes.Renamed)
            {
                state = new WorkflowState
                {
                    AccountInfo = AccountInfo,
                    Path = change.FullPath,
                    FileName = Path.GetFileName(change.Name),
                    TriggeringChange = change.ChangeType
                };
            }
            else
            {
                // Renames carry the previous location as well, so both old and new names are recorded.
                var moved = (MovedEventArgs) change;
                state = new WorkflowState
                {
                    AccountInfo = AccountInfo,
                    OldPath = moved.OldFullPath,
                    OldFileName = Path.GetFileName(moved.OldName),
                    Path = moved.FullPath,
                    FileName = Path.GetFileName(moved.Name),
                    TriggeringChange = moved.ChangeType
                };
            }

            _agent.Post(state);
        }
    }

    StatusNotification.SetPithosStatus(PithosStatus.LocalComplete);
}
/// <summary>
/// Starts watching <paramref name="rootPath"/> (including subdirectories) for file system changes
/// and starts the agent that processes the resulting <c>WorkflowState</c> messages.
/// </summary>
/// <param name="accountInfo">Account the watched folder belongs to; attached to every posted message.</param>
/// <param name="rootPath">Absolute path of the folder to watch.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="accountInfo"/> is null, or <paramref name="rootPath"/> is null, empty or whitespace.
/// </exception>
/// <exception cref="ArgumentException"><paramref name="rootPath"/> is not an absolute path.</exception>
/// <exception cref="InvalidOperationException">
/// <c>IdleTimeout</c> is negative or does not fit in an <see cref="Int32"/> number of milliseconds.
/// </exception>
public void Start(AccountInfo accountInfo,string rootPath)
{
    if (accountInfo==null)
        throw new ArgumentNullException("accountInfo");
    if (String.IsNullOrWhiteSpace(rootPath))
        throw new ArgumentNullException("rootPath");
    if (!Path.IsPathRooted(rootPath))
        throw new ArgumentException("rootPath must be an absolute path","rootPath");
    // TimeSpan is a value type, so the former "IdleTimeout == null" test could never be true.
    // Reject values that cannot be converted to the int millisecond count used below instead.
    // NOTE(review): TimeSpan.Zero (the unset default) is still accepted to stay compatible with
    // existing callers - tighten to "<=" if FileEventIdleBatch requires a positive timeout.
    if (IdleTimeout < TimeSpan.Zero || IdleTimeout.TotalMilliseconds > Int32.MaxValue)
        throw new InvalidOperationException("IdleTimeout must have a valid value");
    Contract.EndContractBlock();

    AccountInfo = accountInfo;
    RootPath = rootPath;

    _eventIdleBatch = new FileEventIdleBatch((int)IdleTimeout.TotalMilliseconds, ProcessBatchedEvents);

    // 8*4096 bytes (32 KB) of internal buffer instead of the framework default.
    _watcher = new FileSystemWatcher(rootPath) {IncludeSubdirectories = true,InternalBufferSize=8*4096};
    _adapter = new FileSystemWatcherAdapter(_watcher);

    _adapter.Changed += OnFileEvent;
    _adapter.Created += OnFileEvent;
    _adapter.Deleted += OnFileEvent;
    //_adapter.Renamed += OnRenameEvent;
    _adapter.Moved += OnMoveEvent;

    _agent = Agent.Start(inbox =>
    {
        // Each iteration receives one message, chains Process onto it and passes the loop
        // delegate to LoopAsync together with an error logger (presumably so that the loop
        // is re-entered once processing finishes - confirm against Agent's implementation).
        Action loop = null;
        loop = () =>
        {
            var message = inbox.Receive();
            var process=message.Then(Process,inbox.CancellationToken);
            inbox.LoopAsync(process,loop,ex=>
                Log.ErrorFormat("[ERROR] File Event Processing:\r{0}", ex));
        };
        loop();
    });

    // Enable the watcher only after _agent has been assigned: ProcessBatchedEvents posts to
    // _agent, so events raised earlier could hit a null agent.
    _watcher.EnableRaisingEvents = true;
}
private Task