using System.Linq;
using System.Net.NetworkInformation;
using System.Security.Cryptography;
+using System.ServiceModel.Description;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
+using Castle.ActiveRecord.Queries;
using Microsoft.WindowsAPICodePack.Net;
using Pithos.Interfaces;
+using System.ServiceModel;
namespace Pithos.Core
{
[Export(typeof(PithosMonitor))]
public class PithosMonitor:IDisposable
{
+ private const string PithosContainer = "pithos";
+ private const string TrashContainer = "trash";
[Import]
public IPithosSettings Settings{get;set;}
[Import]
public ICloudClient CloudListeningClient { get; set; }
+ public string UserName { get; set; }
+ public string ApiKey { get; set; }
+
+ private ServiceHost _statusService { get; set; }
+
private FileSystemWatcher _watcher;
public bool Pause
set
{
if (_watcher!=null)
- _watcher.EnableRaisingEvents = !value;
+ _watcher.EnableRaisingEvents = !value;
+ if (value)
+ {
+ StatusKeeper.SetPithosStatus(PithosStatus.SyncPaused);
+ }
+ else
+ {
+ StatusKeeper.SetPithosStatus(PithosStatus.InSynch);
+ }
}
}
+ public string RootPath { get; set; }
+
CancellationTokenSource _cancellationSource;
}
_cancellationSource = new CancellationTokenSource();
- string path = Settings.PithosPath;
- StartMonitoringFiles(path);
+ var proxyUri = ProxyFromSettings();
+ CloudClient.Proxy = proxyUri;
+ CloudClient.UsePithos = this.UsePithos;
+ EnsurePithosContainers();
+ StatusKeeper.StartProcessing(_cancellationSource.Token);
+ IndexLocalFiles(RootPath);
+ StartMonitoringFiles(RootPath);
+
+ StartStatusService();
StartNetwork();
}
+ private void EnsurePithosContainers()
+ {
+ CloudClient.UsePithos = this.UsePithos;
+ CloudClient.AuthenticationUrl = this.AuthenticationUrl;
+ CloudClient.Authenticate(UserName, ApiKey);
+
+ var pithosContainers = new[] {PithosContainer, TrashContainer};
+ foreach (var container in pithosContainers)
+ {
+ if (!CloudClient.ContainerExists(container))
+ CloudClient.CreateContainer(container);
+ }
+ }
+
+ public string AuthenticationUrl { get; set; }
+
+ private Uri ProxyFromSettings()
+ {
+ if (Settings.UseManualProxy)
+ {
+ var proxyUri = new UriBuilder
+ {
+ Host = Settings.ProxyServer,
+ Port = Settings.ProxyPort
+ };
+ if (Settings.ProxyAuthentication)
+ {
+ proxyUri.UserName = Settings.ProxyUsername;
+ proxyUri.Password = Settings.ProxyPassword;
+ }
+ return proxyUri.Uri;
+ }
+ return null;
+ }
+
+ private void IndexLocalFiles(string path)
+ {
+ Trace.TraceInformation("[START] Inxed Local");
+ try
+ {
+ var files =
+ from filePath in Directory.EnumerateFiles(path, "*", SearchOption.AllDirectories).AsParallel()
+ select filePath;
+ StatusKeeper.StoreUnversionedFiles(files);
+
+ RestartInterruptedFiles();
+ }
+ catch (Exception exc)
+ {
+ Trace.TraceError("[ERROR] Index Local - {0}", exc);
+ }
+ finally
+ {
+ Trace.TraceInformation("[END] Inxed Local");
+ }
+ }
+
+ private void RestartInterruptedFiles()
+ {
+ var interruptedStates = new[] { FileOverlayStatus.Unversioned, FileOverlayStatus.Modified };
+ var filesQuery = from state in FileState.Queryable
+ where interruptedStates.Contains(state.OverlayStatus)
+ select new WorkflowState
+ {
+ Path = state.FilePath.ToLower(),
+ FileName = Path.GetFileName(state.FilePath).ToLower(),
+ Status=state.OverlayStatus==FileOverlayStatus.Unversioned?
+ FileStatus.Created:
+ FileStatus.Modified,
+ TriggeringChange = state.OverlayStatus==FileOverlayStatus.Unversioned?
+ WatcherChangeTypes.Created:
+ WatcherChangeTypes.Changed
+ };
+ _uploadEvents.AddFromEnumerable(filesQuery,false);
+
+ }
+
+ private void StartStatusService()
+ {
+ // Create a ServiceHost for the CalculatorService type and provide the base address.
+ var baseAddress = new Uri("net.pipe://localhost/pithos");
+ _statusService = new ServiceHost(typeof(StatusService), baseAddress);
+
+ var binding = new NetNamedPipeBinding(NetNamedPipeSecurityMode.None);
+
+ _statusService.AddServiceEndpoint(typeof(IStatusService), binding, "net.pipe://localhost/pithos/statuscache");
+ _statusService.AddServiceEndpoint(typeof (ISettingsService), binding, "net.pipe://localhost/pithos/settings");
+
+
+ //// Add a mex endpoint
+ var smb = new ServiceMetadataBehavior
+ {
+ HttpGetEnabled = true,
+ HttpGetUrl = new Uri("http://localhost:30000/pithos/mex")
+ };
+ _statusService.Description.Behaviors.Add(smb);
+
+
+ _statusService.Open();
+ }
+
+ private void StopStatusService()
+ {
+ if (_statusService == null)
+ return;
+
+ if (_statusService.State == CommunicationState.Faulted)
+ _statusService.Abort();
+ else if (_statusService.State != CommunicationState.Closed)
+ _statusService.Close();
+ _statusService = null;
+
+ }
+
+
private void StartNetwork()
{
try
{
- CloudClient.Authenticate(Settings.UserName, Settings.ApiKey);
+ CloudClient.UsePithos = this.UsePithos;
+ CloudClient.AuthenticationUrl = this.AuthenticationUrl;
+ CloudClient.Authenticate(UserName, ApiKey);
- StartListening();
+ StartListening(RootPath);
StartSending();
}
catch (Exception)
}
}
+ public bool UsePithos { get; set; }
+
internal enum CloudActionType
{
Upload=0,
Action = action;
LocalFile = localFile;
CloudFile = cloudFile;
- LocalHash=new Lazy<string>(()=>CalculateHash(LocalFile.FullName),LazyThreadSafetyMode.ExecutionAndPublication);
+ LocalHash=new Lazy<string>(()=>Signature.CalculateHash(LocalFile.FullName),LazyThreadSafetyMode.ExecutionAndPublication);
}
}
}
}
- private BlockingCollection<ListenerAction> _listenerActions=new BlockingCollection<ListenerAction>();
+ private BlockingCollection<ListenerAction> _networkActions=new BlockingCollection<ListenerAction>();
private Timer timer;
- private void StartListening()
+ private void StartListening(string accountPath)
{
- Func<Task> listener = ()=>Task.Factory.StartNew(()=>CloudClient.ListObjects("PITHOS"))
+ ProcessRemoteFiles(accountPath);
+
+ Task.Factory.StartNew(ProcessListenerActions);
+
+ }
+
+ private Task ProcessRemoteFiles(string accountPath)
+ {
+ Trace.TraceInformation("[LISTENER] Scheduled");
+ return Task.Factory.StartNewDelayed(10000)
+ .ContinueWith(t=>CloudClient.ListObjects(PithosContainer))
.ContinueWith(task =>
{
-
- var objects = task.Result;
- if (objects.Count == 0)
+ Trace.TraceInformation("[LISTENER] Start Processing");
+
+ var remoteObjects = task.Result;
+/*
+ if (remoteObjects.Count == 0)
return;
+*/
- var pithosDir = new DirectoryInfo(Settings.PithosPath);
+ var pithosDir = new DirectoryInfo(accountPath);
- var upFiles = from info in objects
- select info.Name;
+ var remoteFiles = from info in remoteObjects
+ select info.Name.ToLower();
var onlyLocal = from localFile in pithosDir.EnumerateFiles()
- where !upFiles.Contains(localFile.Name)
- select new ListenerAction(CloudActionType.UploadUnconditional, localFile,null);
-
-
-
+ where !remoteFiles.Contains(localFile.Name.ToLower())
+ select new ListenerAction(CloudActionType.UploadUnconditional, localFile,ObjectInfo.Empty);
- var localNames =pithosDir.EnumerateFiles().Select(info => info.Name);
- var onlyRemote = from upFile in objects
- where !localNames.Contains(upFile.Name)
- select new ListenerAction(CloudActionType.DownloadUnconditional,null,upFile);
- var existingObjects = from upFile in objects
- join localFile in pithosDir.EnumerateFiles()
- on upFile.Name equals localFile.Name
- select new ListenerAction(CloudActionType.Download, localFile, upFile);
+ var localNames = from info in pithosDir.EnumerateFiles()
+ select info.Name.ToLower();
- var uniques =
- onlyLocal.Union(onlyRemote).Union(existingObjects)
- .Except(_listenerActions,new LocalFileComparer());
+ var onlyRemote = from upFile in remoteObjects
+ where !localNames.Contains(upFile.Name.ToLower())
+ select new ListenerAction(CloudActionType.DownloadUnconditional,new FileInfo(""), upFile);
- _listenerActions.AddFromEnumerable(uniques, false);
-
- }
- );
- Task.Factory.StartNew(() =>
- {
- foreach (var action in _listenerActions.GetConsumingEnumerable())
- {
- var localFile = action.LocalFile;
- var cloudFile = action.CloudFile;
- var downloadPath = (cloudFile==null)? String.Empty:Path.Combine(Settings.PithosPath,cloudFile.Name);
+ var commonObjects = from upFile in remoteObjects
+ join localFile in pithosDir.EnumerateFiles()
+ on upFile.Name.ToLower() equals localFile.Name.ToLower()
+ select new ListenerAction(CloudActionType.Download, localFile, upFile);
- switch(action.Action)
- {
- case CloudActionType.UploadUnconditional:
-
- UploadCloudFile(localFile.Name, localFile.Length, localFile.FullName, action.LocalHash.Value);
- break;
- case CloudActionType.DownloadUnconditional:
- DownloadCloudFile("PITHOS", cloudFile.Name, downloadPath);
- break;
- case CloudActionType.Download:
- if (File.Exists(downloadPath))
- {
- if (cloudFile.Hash != action.LocalHash.Value)
- {
- var lastLocalTime=localFile.LastWriteTime;
- var lastUpTime=cloudFile.Last_Modified;
- if(lastUpTime<=lastLocalTime)
- {
- //Files in conflict
- StatusKeeper.SetFileOverlayStatus(downloadPath,FileOverlayStatus.Conflict);
- }
- else
- DownloadCloudFile("PITHOS", action.CloudFile.Name, downloadPath);
- }
- }
- else
- DownloadCloudFile("PITHOS", action.CloudFile.Name, downloadPath);
- break;
- }
- }
- }
- );
-
- timer = new Timer(o => listener(), null, TimeSpan.Zero, TimeSpan.FromSeconds(10));
-
+ var uniques =
+ onlyLocal.Union(onlyRemote).Union(commonObjects)
+ .Except(_networkActions,new LocalFileComparer());
+
+ _networkActions.AddFromEnumerable(uniques, false);
+
+ Trace.TraceInformation("[LISTENER] End Processing");
+
+ }
+ ).ContinueWith(t=>
+ {
+ if (t.IsFaulted)
+ {
+ Trace.TraceError("[LISTENER] Exception: {0}",t.Exception);
+ }
+ else
+ {
+ Trace.TraceInformation("[LISTENER] Finished");
+ }
+ ProcessRemoteFiles(accountPath);
+ });
}
- private void DownloadCloudFile(string container, string fileName, string localPath)
+ private void ProcessListenerActions()
{
- using (var upstream = CloudClient.GetObject(container, fileName))
- using (var fileStream = File.OpenWrite(localPath))
+ foreach(var action in _networkActions.GetConsumingEnumerable())
{
- upstream.CopyTo(fileStream);
+ Trace.TraceInformation("[ACTION] Start Processing {0}:{1}->{2}",action.Action,action.LocalFile,action.CloudFile.Name);
+ var localFile = action.LocalFile;
+ var cloudFile = action.CloudFile;
+ var downloadPath = (cloudFile == null)? String.Empty
+ : Path.Combine(RootPath,cloudFile.Name);
+ try
+ {
+ switch (action.Action)
+ {
+ case CloudActionType.UploadUnconditional:
+ UploadCloudFile(localFile.Name,localFile.Length,localFile.FullName,action.LocalHash.Value);
+ break;
+ case CloudActionType.DownloadUnconditional:
+ DownloadCloudFile(PithosContainer,cloudFile.Name,downloadPath);
+ break;
+ case CloudActionType.Download:
+ if (File.Exists(downloadPath))
+ {
+ if (cloudFile.Hash !=action.LocalHash.Value)
+ {
+ var lastLocalTime =localFile.LastWriteTime;
+ var lastUpTime =cloudFile.Last_Modified;
+ if (lastUpTime <=lastLocalTime)
+ {
+ //Files in conflict
+ StatusKeeper.SetFileOverlayStatus(downloadPath,FileOverlayStatus.Conflict);
+ }
+ else
+ DownloadCloudFile(PithosContainer,action.CloudFile.Name,downloadPath);
+ }
+ }
+ else
+ DownloadCloudFile(PithosContainer,action.CloudFile.Name,downloadPath);
+ break;
+ }
+ Trace.TraceInformation("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name);
+ }
+ catch (Exception exc)
+ {
+ Trace.TraceError("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
+ action.Action, action.LocalFile,action.CloudFile,exc);
+
+ _networkActions.Add(action);
+ }
}
}
+
+
private void StartMonitoringFiles(string path)
{
_watcher = new FileSystemWatcher(path);
_watcher.Renamed += OnRenameEvent;
_watcher.EnableRaisingEvents = true;
- Task.Factory.StartNew(() =>
- {
- foreach (var state in _fileEvents.GetConsumingEnumerable())
- {
- try
- {
- UpdateFileStatus(state);
- UpdateOverlayStatus(state);
- UpdateFileChecksum(state);
- _uploadEvents.Add(state);
- }
- catch (OperationCanceledException)
- {
- throw;
- }
- catch(Exception ex)
- {}
- }
-
- },_cancellationSource.Token);
+ Task.Factory.StartNew(ProcesFileEvents,_cancellationSource.Token);
+ }
+
+ private void ProcesFileEvents()
+ {
+ foreach (var state in _fileEvents.GetConsumingEnumerable())
+ {
+ try
+ {
+ var networkState=StatusKeeper.GetNetworkState(state.Path);
+ //Skip if the file is already being downloaded or uploaded and
+ //the change is create or modify
+ if (networkState != NetworkState.None &&
+ (
+ state.TriggeringChange==WatcherChangeTypes.Created ||
+ state.TriggeringChange==WatcherChangeTypes.Changed
+ ))
+ continue;
+ UpdateFileStatus(state);
+ UpdateOverlayStatus(state);
+ UpdateFileChecksum(state);
+ _uploadEvents.Add(state);
+ }
+ catch (OperationCanceledException exc)
+ {
+ Trace.TraceError("[ERROR] File Event Processing:\r{0}", exc);
+ throw;
+ }
+ catch (Exception exc)
+ {
+ Trace.TraceError("[ERROR] File Event Processing:\r{0}",exc);
+ }
+ }
}
private void StartSending()
throw;
}
catch(Exception ex)
- {}
+ {
+ Trace.TraceError("[ERROR] Synch for {0}:\r{1}",state.FileName,ex);
+ }
}
},_cancellationSource.Token);
{
if (state.Skip)
return state;
- string path = state.Path;
+ string path = state.Path.ToLower();
string fileName = Path.GetFileName(path);
+ //Bypass deleted files, unless the status is Deleted
+ if (!(File.Exists(path) || state.Status == FileStatus.Deleted))
+ {
+ state.Skip = true;
+ this.StatusKeeper.RemoveFileOverlayStatus(path);
+ return state;
+ }
+
switch(state.Status)
{
case FileStatus.Created:
private void RenameCloudFile(WorkflowState state)
{
- this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Synch);
+ this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);
- CloudClient.MoveObject("PITHOS", state.OldFileName, state.FileName);
+ CloudClient.MoveObject(PithosContainer, state.OldFileName,PithosContainer, state.FileName);
this.StatusKeeper.SetFileStatus(state.Path, FileStatus.Unchanged);
this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Normal);
{
Contract.Requires(!Path.IsPathRooted(fileName));
- this.StatusKeeper.SetFileOverlayStatus(fileName, FileOverlayStatus.Synch);
- CloudClient.DeleteObject("PITHOS", fileName);
+ this.StatusKeeper.SetFileOverlayStatus(fileName, FileOverlayStatus.Modified);
+
+ CloudClient.MoveObject(PithosContainer,fileName,TrashContainer,fileName);
this.StatusKeeper.ClearFileStatus(fileName);
this.StatusKeeper.RemoveFileOverlayStatus(fileName);
}
+ private void DownloadCloudFile(string container, string fileName, string localPath)
+ {
+ var state = StatusKeeper.GetNetworkState(fileName);
+ //Abort if the file is already being uploaded or downloaded
+ if (state != NetworkState.None)
+ return;
+
+ StatusKeeper.SetNetworkState(localPath,NetworkState.Downloading);
+ CloudClient.GetObject(container, fileName, localPath)
+ .ContinueWith(t=>
+ CloudClient.GetObjectInfo(container,fileName))
+ .ContinueWith(t=>
+ StatusKeeper.StoreInfo(fileName,t.Result))
+ .ContinueWith(t=>
+ StatusKeeper.SetNetworkState(localPath,NetworkState.None))
+ .Wait();
+ }
+
private void UploadCloudFile(string fileName, long fileSize, string path,string hash)
{
Contract.Requires(!Path.IsPathRooted(fileName));
- //Even if GetObjectInfo times out, we can proceed with the upload
- var info=CloudClient.GetObjectInfo("PITHOS", fileName);
- if ( hash != info.Hash)
+ var state=StatusKeeper.GetNetworkState(fileName);
+ //Abort if the file is already being uploaded or downloaded
+ if (state != NetworkState.None)
+ return;
+
+ StatusKeeper.SetNetworkState(fileName,NetworkState.Uploading);
+
+ //Even if GetObjectInfo times out, we can proceed with the upload
+ var info = CloudClient.GetObjectInfo(PithosContainer, fileName);
+ Task.Factory.StartNew(() =>
{
- this.StatusKeeper.SetFileOverlayStatus(path, FileOverlayStatus.Synch);
-
- CloudClient.PutObject("PITHOS", fileName, path);
-
- }
- this.StatusKeeper.SetFileStatus(path,FileStatus.Unchanged);
- this.StatusKeeper.SetFileOverlayStatus(path,FileOverlayStatus.Normal);
+ if (hash != info.Hash)
+ {
+ Task.Factory.StartNew(() =>
+ this.StatusKeeper.SetFileOverlayStatus(path, FileOverlayStatus.Modified))
+ .ContinueWith(t =>
+ CloudClient.PutObject(PithosContainer, fileName, path, hash));
+ }
+ else
+ {
+ this.StatusKeeper.StoreInfo(path,info);
+ }
+ })
+ .ContinueWith(t =>
+ this.StatusKeeper.SetFileState(path, FileStatus.Unchanged, FileOverlayStatus.Normal))
+ .ContinueWith(t=>
+ this.StatusKeeper.SetNetworkState(path,NetworkState.None))
+ .Wait();
Workflow.RaiseChangeNotification(path);
}
return state;
string path = state.Path;
- string hash = CalculateHash(path);
+ string hash = Signature.CalculateHash(path);
StatusKeeper.UpdateFileChecksum(path, hash);
return state;
}
- private static string CalculateHash(string path)
- {
- string hash;
- using (var hasher = MD5.Create())
- using (var stream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096, true))
- {
- var hashBytes = hasher.ComputeHash(stream);
- var hashBuilder = new StringBuilder();
- foreach (byte b in hasher.ComputeHash(stream))
- hashBuilder.Append(b.ToString("x2").ToLower());
- hash = hashBuilder.ToString();
-
- }
- return hash;
- }
+
private FileSystemEventArgs CalculateSignature(FileSystemEventArgs arg)
{
if (timer != null)
timer.Dispose();
timer = null;
+ StopStatusService();
}
+
~PithosMonitor()
{
Dispose(false);