-using System;
+#region
+/* -----------------------------------------------------------------------
+ * <copyright file="FileAgent.cs" company="GRNet">
+ *
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ * </copyright>
+ * -----------------------------------------------------------------------
+ */
+#endregion
+using System;
using System.Collections.Generic;
-using System.ComponentModel.Composition;
-using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.IO;
using System.Linq;
-using System.Text;
+using System.Reflection;
using System.Threading.Tasks;
using Pithos.Interfaces;
using Pithos.Network;
using log4net;
-using log4net.Core;
namespace Pithos.Core.Agents
{
// [Export]
public class FileAgent
{
- Agent<WorkflowState> _agent;
+ private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+
+ /*
+ Agent<WorkflowState> _agent;
+ */
private FileSystemWatcher _watcher;
+ private FileSystemWatcherAdapter _adapter;
+ private FileEventIdleBatch _eventIdleBatch;
//[Import]
public IStatusKeeper StatusKeeper { get; set; }
+
+ public IStatusNotification StatusNotification { get; set; }
//[Import]
public IPithosWorkflow Workflow { get; set; }
//[Import]
- public WorkflowAgent WorkflowAgent { get; set; }
+ //public WorkflowAgent WorkflowAgent { get; set; }
private AccountInfo AccountInfo { get; set; }
- private string RootPath { get; set; }
+ internal string RootPath { get; set; }
+
+ public TimeSpan IdleTimeout { get; set; }
+
+ public PollAgent PollAgent { get; set; }
+
+ private void ProcessBatchedEvents(Dictionary<string, FileSystemEventArgs[]> fileEvents)
+ {
+ var paths = new HashSet<string>();
+ var events = from evts in fileEvents.Values
+ from evt in evts
+ select evt;
+ foreach (var evt in events)
+ {
+ paths.Add(evt.FullPath);
+ if (evt is MovedEventArgs)
+ {
+ paths.Add((evt as MovedEventArgs).OldFullPath);
+ }
+ }
+
+ PollAgent.SynchNow(paths);
+ }
+
+/*
+ private void ProcessBatchedEvents(Dictionary<string, FileSystemEventArgs[]> fileEvents)
+ {
+ StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing,String.Format("Uploading {0} files",fileEvents.Count));
+ //Start with events that do not originate in one of the ignored folders
+ var initialEvents = from evt in fileEvents
+ where !IgnorePaths(evt.Key)
+ select evt;
- private static readonly ILog Log = LogManager.GetLogger("FileAgent");
+ IEnumerable<KeyValuePair<string, FileSystemEventArgs[]>> cleanEvents;
+
+
+ var selectiveEnabled = Selectives.IsSelectiveEnabled(AccountInfo.AccountKey);
+ //When selective sync is enabled,
+ if (selectiveEnabled)
+ {
+ //Include all selected items
+ var selectedEvents = from evt in initialEvents
+ where Selectives.IsSelected(AccountInfo, evt.Key)
+ select evt;
+ //And all folder creations in the unselected folders
+ var folderCreations = from evt in initialEvents
+ let folderPath=evt.Key
+ //The original folder may not exist due to renames. Just make sure that the path is not a file
+ where !File.Exists(folderPath)
+ //We only want unselected items
+ && !Selectives.IsSelected(AccountInfo, folderPath)
+ //Is there any creation event related to the folder?
+ && evt.Value.Any(arg => arg.ChangeType == WatcherChangeTypes.Created)
+ select evt;
+ cleanEvents = selectedEvents.Union(folderCreations).ToList();
+ }
+ //If selective is disabled, only exclude the shared folders
+ else
+ {
+ cleanEvents = (from evt in initialEvents
+ where !evt.Key.IsSharedTo(AccountInfo)
+ select evt).ToList();
+ }
+
+
+ foreach (var fileEvent in cleanEvents)
+ {
+ //var filePath = fileEvent.Key;
+ var changes = fileEvent.Value;
+
+ var isNotFile = !File.Exists(fileEvent.Key);
+ foreach (var change in changes)
+ {
+ if (change.ChangeType == WatcherChangeTypes.Renamed)
+ {
+ var rename = (MovedEventArgs) change;
+ _agent.Post(new WorkflowState(change)
+ {
+ AccountInfo = AccountInfo,
+ OldPath = rename.OldFullPath,
+ OldFileName = Path.GetFileName(rename.OldName),
+ Path = rename.FullPath,
+ FileName = Path.GetFileName(rename.Name),
+ TriggeringChange = rename.ChangeType
+ });
+ }
+ else
+ {
+ var isCreation = selectiveEnabled && isNotFile && change.ChangeType == WatcherChangeTypes.Created;
+ _agent.Post(new WorkflowState(change)
+ {
+ AccountInfo = AccountInfo,
+ Path = change.FullPath,
+ FileName = Path.GetFileName(change.Name),
+ TriggeringChange = change.ChangeType,
+ IsCreation=isCreation
+ });
+ }
+ }
+ }
+ StatusNotification.SetPithosStatus(PithosStatus.LocalComplete);
+ }
+*/
public void Start(AccountInfo accountInfo,string rootPath)
{
throw new ArgumentNullException("rootPath");
if (!Path.IsPathRooted(rootPath))
throw new ArgumentException("rootPath must be an absolute path","rootPath");
- Contract.EndContractBlock();
+ if (IdleTimeout == null)
+ throw new InvalidOperationException("IdleTimeout must have a valid value");
+ Contract.EndContractBlock();
AccountInfo = accountInfo;
RootPath = rootPath;
- _watcher = new FileSystemWatcher(rootPath);
- _watcher.IncludeSubdirectories = true;
- _watcher.Changed += OnFileEvent;
- _watcher.Created += OnFileEvent;
- _watcher.Deleted += OnFileEvent;
- _watcher.Renamed += OnRenameEvent;
+
+ _eventIdleBatch = new FileEventIdleBatch(PollAgent,(int)IdleTimeout.TotalMilliseconds, ProcessBatchedEvents);
+
+ _watcher = new FileSystemWatcher(rootPath) { IncludeSubdirectories = true, InternalBufferSize = 8 * 4096 };
+ _adapter = new FileSystemWatcherAdapter(_watcher);
+
+ _adapter.Changed += OnFileEvent;
+ _adapter.Created += OnFileEvent;
+ _adapter.Deleted += OnFileEvent;
+ //_adapter.Renamed += OnRenameEvent;
+ _adapter.Moved += OnMoveEvent;
_watcher.EnableRaisingEvents = true;
+/*
+
+
_agent = Agent<WorkflowState>.Start(inbox =>
{
loop = () =>
{
var message = inbox.Receive();
- var process=message.Then(Process,inbox.CancellationToken);
-
+ var process=message.Then(Process,inbox.CancellationToken);
inbox.LoopAsync(process,loop,ex=>
Log.ErrorFormat("[ERROR] File Event Processing:\r{0}", ex));
};
loop();
- });
+ });*/
}
+/*
private Task<object> Process(WorkflowState state)
{
if (state==null)
try
{
+ //StatusKeeper.EnsureFileState(state.Path);
+
UpdateFileStatus(state);
UpdateOverlayStatus(state);
UpdateFileChecksum(state);
_watcher.EnableRaisingEvents = !value;
}
}
+*/
public string CachePath { get; set; }
- private List<string> _selectivePaths = new List<string>();
+ /*private List<string> _selectivePaths = new List<string>();
public List<string> SelectivePaths
{
get { return _selectivePaths; }
set { _selectivePaths = value; }
}
+*/
+ public Selectives Selectives { get; set; }
+/*
public void Post(WorkflowState workflowState)
{
if (workflowState == null)
{
if (_watcher != null)
{
- _watcher.Changed -= OnFileEvent;
- _watcher.Created -= OnFileEvent;
- _watcher.Deleted -= OnFileEvent;
- _watcher.Renamed -= OnRenameEvent;
_watcher.Dispose();
}
_watcher = null;
_agent.Stop();
}
+*/
// Enumerate all files in the Pithos directory except those in the Fragment folder
// and files with a .ignore extension
public IEnumerable<string> EnumerateFiles(string searchPattern="*")
{
var monitoredFiles = from filePath in Directory.EnumerateFileSystemEntries(RootPath, searchPattern, SearchOption.AllDirectories)
where !Ignore(filePath)
+ orderby filePath ascending
select filePath;
return monitoredFiles;
}
var rootDir = new DirectoryInfo(RootPath);
var monitoredFiles = from file in rootDir.EnumerateFiles(searchPattern, SearchOption.AllDirectories)
where !Ignore(file.FullName)
+ orderby file.FullName ascending
select file;
return monitoredFiles;
}
+ public IEnumerable<FileSystemInfo> EnumerateFileSystemInfos(string searchPattern="*")
+ {
+ var rootDir = new DirectoryInfo(RootPath);
+ //Ensure folders appear first, to allow folder processing as soon as possilbe
+ var folders = (from file in rootDir.EnumerateDirectories(searchPattern, SearchOption.AllDirectories)
+ where !Ignore(file.FullName)
+ orderby file.FullName ascending
+ select file).ToList();
+ var files = (from file in rootDir.EnumerateFiles(searchPattern, SearchOption.AllDirectories)
+ where !Ignore(file.FullName)
+ orderby file.Length ascending
+ select file as FileSystemInfo).ToList();
+ var monitoredFiles = folders
+ //Process small files first, leaving expensive large files for last
+ .Concat(files);
+ return monitoredFiles;
+ }
+
public IEnumerable<string> EnumerateFilesAsRelativeUrls(string searchPattern="*")
{
var rootDir = new DirectoryInfo(RootPath);
var monitoredFiles = from file in rootDir.EnumerateFiles(searchPattern, SearchOption.AllDirectories)
where !Ignore(file.FullName)
+ orderby file.FullName ascending
+ select file.AsRelativeUrlTo(RootPath);
+ return monitoredFiles;
+ }
+
+ public IEnumerable<string> EnumerateFilesSystemInfosAsRelativeUrls(string searchPattern="*")
+ {
+ var rootDir = new DirectoryInfo(RootPath);
+ var monitoredFiles = from file in rootDir.EnumerateFileSystemInfos(searchPattern, SearchOption.AllDirectories)
+ where !Ignore(file.FullName)
+ orderby file.FullName ascending
select file.AsRelativeUrlTo(RootPath);
return monitoredFiles;
}
- private bool Ignore(string filePath)
+ public bool Ignore(string filePath)
+ {
+ if (IgnorePaths(filePath)) return true;
+
+
+ //If selective sync is enabled,
+ if (IsUnselectedRootFolder(filePath))
+ return false;
+ //Ignore if selective synchronization is defined,
+ //And the target file is not below any of the selective paths
+ var ignore = !Selectives.IsSelected(AccountInfo, filePath);
+ return ignore;
+ }
+
+ public bool IsUnselectedRootFolder(string filePath)
{
- var pithosPath = Path.Combine(RootPath, "pithos");
- if (pithosPath.Equals(filePath, StringComparison.InvariantCultureIgnoreCase))
+ return Selectives.IsSelectiveEnabled(AccountInfo.AccountKey) //propagate folder events
+ && Directory.Exists(filePath) //from the container root folder only. Note, in the first level below the account root path are the containers
+ && FoundBelowRoot(filePath, RootPath, 2);
+ }
+
+ public bool IgnorePaths(string filePath)
+ {
+//Ignore all first-level directories and files (ie at the container folders level)
+ if (FoundBelowRoot(filePath, RootPath, 1))
return true;
- if (filePath.StartsWith(CachePath))
+
+ //Ignore first-level items under the "others" folder (ie at the accounts folders level).
+ var othersPath = Path.Combine(RootPath, FolderConstants.OthersFolder);
+ if (FoundBelowRoot(filePath, othersPath, 1))
return true;
- if (_ignoreFiles.ContainsKey(filePath.ToLower()))
+
+ //Ignore second-level (container) folders under the "others" folder (ie at the container folders level).
+ if (FoundBelowRoot(filePath, othersPath, 2))
return true;
- return false;
+
+
+ //Ignore anything happening in the cache path
+ if (filePath.StartsWith(CachePath))
+ return true;
+
+ //Finally, ignore events about one of the ignored files
+ return _ignoreFiles.ContainsKey(filePath.ToLower());
}
- //Post a Change message for all events except rename
- void OnFileEvent(object sender, FileSystemEventArgs e)
+/* private static bool FoundInRoot(string filePath, string rootPath)
{
- //Ignore events that affect the cache folder
- var filePath = e.FullPath;
- if (Ignore(filePath))
- return;
- /* if (Directory.Exists(filePath))
- return; */
- _agent.Post(new WorkflowState{AccountInfo=AccountInfo, Path = filePath, FileName = e.Name, TriggeringChange = e.ChangeType });
- }
+ //var rootDirectory = new DirectoryInfo(rootPath);
+
+ //If the paths are equal, return true
+ if (filePath.Equals(rootPath, StringComparison.InvariantCultureIgnoreCase))
+ return true;
+
+ //If the filepath is below the root path
+ if (filePath.StartsWith(rootPath,StringComparison.InvariantCulture))
+ {
+ //Get the relative path
+ var relativePath = filePath.Substring(rootPath.Length + 1);
+ //If the relativePath does NOT contains a path separator, we found a match
+ return (!relativePath.Contains(@"\"));
+ }
+ //If the filepath is not under the root path, return false
+ return false;
+ }*/
+
+ private static bool FoundBelowRoot(string filePath, string rootPath,int level)
+ {
+ //var rootDirectory = new DirectoryInfo(rootPath);
+
+ //If the paths are equal, return true
+ if (filePath.Equals(rootPath, StringComparison.InvariantCultureIgnoreCase))
+ return true;
+
+ //If the filepath is below the root path
+ if (filePath.StartsWith(rootPath,StringComparison.InvariantCulture))
+ {
+ //Get the relative path
+ var relativePath = filePath.Substring(rootPath.Length + 1);
+ //If the relativePath does NOT contains a path separator, we found a match
+ var levels=relativePath.ToCharArray().Count(c=>c=='\\')+1;
+ return levels==level;
+ }
+
+ //If the filepath is not under the root path, return false
+ return false;
+ }
+
+ /*
//Post a Change message for renames containing the old and new names
void OnRenameEvent(object sender, RenamedEventArgs e)
{
TriggeringChange = e.ChangeType
});
}
+ */
+
+ //Post a Change message for all events except rename
+ void OnFileEvent(object sender, FileSystemEventArgs e)
+ {
+ //Ignore events that affect the cache folder
+ var filePath = e.FullPath;
+ if (Ignore(filePath))
+ return;
+ _eventIdleBatch.Post(e);
+ }
+
+ //Post a Change message for moves containing the old and new names
+ void OnMoveEvent(object sender, MovedEventArgs e)
+ {
+ var oldFullPath = e.OldFullPath;
+ var fullPath = e.FullPath;
+
+
+ //If the source path is one of the ignored folders, ignore
+ if (IgnorePaths(oldFullPath))
+ return;
+
+ //TODO: Must prevent move propagation if the source folder is blocked by selective sync
+ //Ignore takes into account Selective Sync
+ if (Ignore(fullPath))
+ return;
+
+ _eventIdleBatch.Post(e);
+ }
private Dictionary<WatcherChangeTypes, FileStatus> _statusDict = new Dictionary<WatcherChangeTypes, FileStatus>
- {
+ {
{WatcherChangeTypes.Created,FileStatus.Created},
{WatcherChangeTypes.Changed,FileStatus.Modified},
{WatcherChangeTypes.Deleted,FileStatus.Deleted},
{WatcherChangeTypes.Renamed,FileStatus.Renamed}
};
- private Dictionary<string,string> _ignoreFiles=new Dictionary<string, string>();
+ private Dictionary<string, string> _ignoreFiles=new Dictionary<string, string>();
private WorkflowState UpdateFileStatus(WorkflowState state)
{
switch (state.Status)
{
case FileStatus.Created:
+ this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified,state.ShortHash).Wait();
+ break;
case FileStatus.Modified:
- this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);
+ this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified, state.ShortHash).Wait();
break;
case FileStatus.Deleted:
//this.StatusAgent.RemoveFileOverlayStatus(state.Path);
break;
case FileStatus.Renamed:
this.StatusKeeper.ClearFileStatus(state.OldPath);
- this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);
+ this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified, state.ShortHash).Wait();
break;
case FileStatus.Unchanged:
- this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Normal);
+ this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Normal, state.ShortHash).Wait();
break;
}
return state;
var info = new FileInfo(path);
- string hash = info.CalculateHash(StatusKeeper.BlockSize,StatusKeeper.BlockHash);
- StatusKeeper.UpdateFileChecksum(path, hash);
- state.Hash = hash;
- return state;
+ using (StatusNotification.GetNotifier("Hashing {0}", "Finished Hashing {0}", info.Name))
+ {
+
+ var shortHash = info.ComputeShortHash(StatusNotification);
+
+ string merkleHash = info.CalculateHash(StatusKeeper.BlockSize, StatusKeeper.BlockHash);
+ StatusKeeper.UpdateFileChecksum(path, shortHash, merkleHash);
+
+ state.Hash = merkleHash;
+ return state;
+ }
}
//Does the file exist in the container's local folder?
return false;
}
+ public static FileAgent GetFileAgent(AccountInfo accountInfo)
+ {
+ return GetFileAgent(accountInfo.AccountPath);
+ }
+
+ public static FileAgent GetFileAgent(string rootPath)
+ {
+ return AgentLocator<FileAgent>.Get(rootPath.ToLower());
+ }
+
+
public FileSystemInfo GetFileSystemInfo(string relativePath)
{
if (String.IsNullOrWhiteSpace(relativePath))
public void Delete(string relativePath)
{
var absolutePath = Path.Combine(RootPath, relativePath).ToLower();
+ if (Log.IsDebugEnabled)
+ Log.DebugFormat("Deleting {0}", absolutePath);
if (File.Exists(absolutePath))
{
try