-using System;
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.ComponentModel.Composition;
-using System.Diagnostics;
-using System.Diagnostics.Contracts;
-using System.IO;
-using System.Linq;
-using System.Net.NetworkInformation;
-using System.Security.Cryptography;
-using System.ServiceModel.Description;
-using System.Text;
-using System.Threading;
-using System.Threading.Tasks;
-using Castle.ActiveRecord.Queries;
-using Microsoft.WindowsAPICodePack.Net;
-using Pithos.Interfaces;
-using System.ServiceModel;
-
-namespace Pithos.Core
-{
- [Export(typeof(PithosMonitor))]
- public class PithosMonitor:IDisposable
- {
- private const string PithosContainer = "pithos";
- private const string TrashContainer = "trash";
-
- [Import]
- public IPithosSettings Settings{get;set;}
-
- [Import]
- public IStatusKeeper StatusKeeper { get; set; }
-
- [Import]
- public IPithosWorkflow Workflow { get; set; }
-
- [Import]
- public ICloudClient CloudClient { get; set; }
-
- [Import]
- public ICloudClient CloudListeningClient { get; set; }
-
- public IStatusNotification StatusNotification { get; set; }
-
- public string UserName { get; set; }
- public string ApiKey { get; set; }
-
- private ServiceHost _statusService { get; set; }
-
- private FileSystemWatcher _watcher;
-
- public bool Pause
- {
- get { return _watcher == null || !_watcher.EnableRaisingEvents; }
- set
- {
- if (_watcher!=null)
- _watcher.EnableRaisingEvents = !value;
- if (value)
- {
- StatusKeeper.SetPithosStatus(PithosStatus.SyncPaused);
- StatusNotification.NotifyChange("Paused");
- }
- else
- {
- StatusKeeper.SetPithosStatus(PithosStatus.InSynch);
- StatusNotification.NotifyChange("In Synch");
- }
- }
- }
-
- public string RootPath { get; set; }
-
-
- CancellationTokenSource _cancellationSource;
-
- readonly BlockingCollection<WorkflowState> _fileEvents = new BlockingCollection<WorkflowState>();
- readonly BlockingCollection<WorkflowState> _uploadEvents = new BlockingCollection<WorkflowState>();
-
-
-
- public void Start()
- {
- StatusNotification.NotifyChange("Starting");
- if (_cancellationSource != null)
- {
- if (!_cancellationSource.IsCancellationRequested)
- return;
- }
- _cancellationSource = new CancellationTokenSource();
-
- var proxyUri = ProxyFromSettings();
- CloudClient.Proxy = proxyUri;
- CloudClient.UsePithos = this.UsePithos;
- EnsurePithosContainers();
- StatusKeeper.StartProcessing(_cancellationSource.Token);
- IndexLocalFiles(RootPath);
- StartMonitoringFiles(RootPath);
-
- StartStatusService();
-
- StartNetwork();
- }
-
- private void EnsurePithosContainers()
- {
- CloudClient.UsePithos = this.UsePithos;
- CloudClient.AuthenticationUrl = this.AuthenticationUrl;
- CloudClient.Authenticate(UserName, ApiKey);
-
- var pithosContainers = new[] {PithosContainer, TrashContainer};
- foreach (var container in pithosContainers)
- {
- if (!CloudClient.ContainerExists(container))
- CloudClient.CreateContainer(container);
- }
- }
-
- public string AuthenticationUrl { get; set; }
-
- private Uri ProxyFromSettings()
- {
- if (Settings.UseManualProxy)
- {
- var proxyUri = new UriBuilder
- {
- Host = Settings.ProxyServer,
- Port = Settings.ProxyPort
- };
- if (Settings.ProxyAuthentication)
- {
- proxyUri.UserName = Settings.ProxyUsername;
- proxyUri.Password = Settings.ProxyPassword;
- }
- return proxyUri.Uri;
- }
- return null;
- }
-
- private void IndexLocalFiles(string path)
- {
- StatusNotification.NotifyChange("Indexing Local Files",TraceLevel.Info);
- Trace.TraceInformation("[START] Index Local");
- try
- {
- var files =
- from filePath in Directory.EnumerateFiles(path, "*", SearchOption.AllDirectories).AsParallel()
- select filePath;
- StatusKeeper.StoreUnversionedFiles(files);
-
- RestartInterruptedFiles();
- }
- catch (Exception exc)
- {
- Trace.TraceError("[ERROR] Index Local - {0}", exc);
- }
- finally
- {
- Trace.TraceInformation("[END] Inxed Local");
- }
- }
-
- private void RestartInterruptedFiles()
- {
- StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);
- var interruptedStates = new[] { FileOverlayStatus.Unversioned, FileOverlayStatus.Modified };
- var filesQuery = from state in FileState.Queryable
- where interruptedStates.Contains(state.OverlayStatus)
- select new WorkflowState
- {
- Path = state.FilePath.ToLower(),
- FileName = Path.GetFileName(state.FilePath).ToLower(),
- Status=state.OverlayStatus==FileOverlayStatus.Unversioned?
- FileStatus.Created:
- FileStatus.Modified,
- TriggeringChange = state.OverlayStatus==FileOverlayStatus.Unversioned?
- WatcherChangeTypes.Created:
- WatcherChangeTypes.Changed
- };
- _uploadEvents.AddFromEnumerable(filesQuery,false);
-
- }
-
- private void StartStatusService()
- {
- // Create a ServiceHost for the CalculatorService type and provide the base address.
- var baseAddress = new Uri("net.pipe://localhost/pithos");
- _statusService = new ServiceHost(typeof(StatusService), baseAddress);
-
- var binding = new NetNamedPipeBinding(NetNamedPipeSecurityMode.None);
-
- _statusService.AddServiceEndpoint(typeof(IStatusService), binding, "net.pipe://localhost/pithos/statuscache");
- _statusService.AddServiceEndpoint(typeof (ISettingsService), binding, "net.pipe://localhost/pithos/settings");
-
-
- //// Add a mex endpoint
- var smb = new ServiceMetadataBehavior
- {
- HttpGetEnabled = true,
- HttpGetUrl = new Uri("http://localhost:30000/pithos/mex")
- };
- _statusService.Description.Behaviors.Add(smb);
-
-
- _statusService.Open();
- }
-
- private void StopStatusService()
- {
- if (_statusService == null)
- return;
-
- if (_statusService.State == CommunicationState.Faulted)
- _statusService.Abort();
- else if (_statusService.State != CommunicationState.Closed)
- _statusService.Close();
- _statusService = null;
-
- }
-
-
- private void StartNetwork()
- {
-
- bool connected = NetworkListManager.IsConnectedToInternet;
- //If we are not connected retry later
- if (!connected)
- {
- Task.Factory.StartNewDelayed(10000, StartNetwork);
- return;
- }
-
- try
- {
- CloudClient.UsePithos = this.UsePithos;
- CloudClient.AuthenticationUrl = this.AuthenticationUrl;
- CloudClient.Authenticate(UserName, ApiKey);
-
- StartListening(RootPath);
- StartSending();
- }
- catch (Exception)
- {
- //Faild to authenticate due to network or account error
- //Retry after a while
- Task.Factory.StartNewDelayed(10000, StartNetwork);
- }
- }
-
- public bool UsePithos { get; set; }
-
- internal enum CloudActionType
- {
- Upload=0,
- Download,
- UploadUnconditional,
- DownloadUnconditional,
- DeleteLocal,
- DeleteCloud
- }
-
- internal class ListenerAction
- {
- public CloudActionType Action { get; set; }
- public FileInfo LocalFile { get; set; }
- public ObjectInfo CloudFile { get; set; }
-
- public Lazy<string> LocalHash { get; set; }
-
- public ListenerAction(CloudActionType action, FileInfo localFile, ObjectInfo cloudFile)
- {
- Action = action;
- LocalFile = localFile;
- CloudFile = cloudFile;
- LocalHash=new Lazy<string>(()=>Signature.CalculateHash(LocalFile.FullName),LazyThreadSafetyMode.ExecutionAndPublication);
- }
-
- }
-
- internal class LocalFileComparer:EqualityComparer<ListenerAction>
- {
- public override bool Equals(ListenerAction x, ListenerAction y)
- {
- if (x.Action != y.Action)
- return false;
- if (x.LocalFile != null && y.LocalFile != null && !x.LocalFile.FullName.Equals(y.LocalFile.FullName))
- return false;
- if (x.CloudFile != null && y.CloudFile != null && !x.CloudFile.Hash.Equals(y.CloudFile.Hash))
- return false;
- if (x.CloudFile == null ^ y.CloudFile == null ||
- x.LocalFile == null ^ y.LocalFile == null)
- return false;
- return true;
- }
-
- public override int GetHashCode(ListenerAction obj)
- {
- var hash1 = (obj.LocalFile == null) ? int.MaxValue : obj.LocalFile.FullName.GetHashCode();
- var hash2 = (obj.CloudFile == null) ? int.MaxValue : obj.CloudFile.Hash.GetHashCode();
- var hash3 = obj.Action.GetHashCode();
- return hash1 ^ hash2 & hash3;
- }
- }
-
- private BlockingCollection<ListenerAction> _networkActions=new BlockingCollection<ListenerAction>();
-
- private Timer timer;
-
- private void StartListening(string accountPath)
- {
-
- ProcessRemoteFiles(accountPath);
-
- Task.Factory.StartNew(ProcessListenerActions);
-
- }
-
- private Task ProcessRemoteFiles(string accountPath)
- {
- Trace.TraceInformation("[LISTENER] Scheduled");
- return Task.Factory.StartNewDelayed(10000)
- .ContinueWith(t=>CloudClient.ListObjects(PithosContainer))
- .ContinueWith(task =>
- {
- Trace.TraceInformation("[LISTENER] Start Processing");
-
- var remoteObjects = task.Result;
-/*
- if (remoteObjects.Count == 0)
- return;
-*/
-
- var pithosDir = new DirectoryInfo(accountPath);
-
- var remoteFiles = from info in remoteObjects
- select info.Name.ToLower();
-
- var onlyLocal = from localFile in pithosDir.EnumerateFiles()
- where !remoteFiles.Contains(localFile.Name.ToLower())
- select new ListenerAction(CloudActionType.UploadUnconditional, localFile,ObjectInfo.Empty);
-
-
-
- var localNames = from info in pithosDir.EnumerateFiles()
- select info.Name.ToLower();
-
- var onlyRemote = from upFile in remoteObjects
- where !localNames.Contains(upFile.Name.ToLower())
- select new ListenerAction(CloudActionType.DownloadUnconditional,new FileInfo(""), upFile);
-
-
- var commonObjects = from upFile in remoteObjects
- join localFile in pithosDir.EnumerateFiles()
- on upFile.Name.ToLower() equals localFile.Name.ToLower()
- select new ListenerAction(CloudActionType.Download, localFile, upFile);
-
- var uniques =
- onlyLocal.Union(onlyRemote).Union(commonObjects)
- .Except(_networkActions,new LocalFileComparer());
-
- _networkActions.AddFromEnumerable(uniques, false);
-
- StatusNotification.NotifyChange(String.Format("Processing {0} files", _networkActions.Count));
-
- Trace.TraceInformation("[LISTENER] End Processing");
-
- }
- ).ContinueWith(t=>
- {
- if (t.IsFaulted)
- {
- Trace.TraceError("[LISTENER] Exception: {0}",t.Exception);
- }
- else
- {
- Trace.TraceInformation("[LISTENER] Finished");
- }
- ProcessRemoteFiles(accountPath);
- });
- }
-
- private void ProcessListenerActions()
- {
- foreach(var action in _networkActions.GetConsumingEnumerable())
- {
- Trace.TraceInformation("[ACTION] Start Processing {0}:{1}->{2}",action.Action,action.LocalFile,action.CloudFile.Name);
- var localFile = action.LocalFile;
- var cloudFile = action.CloudFile;
- var downloadPath = (cloudFile == null)? String.Empty
- : Path.Combine(RootPath,cloudFile.Name);
- try
- {
- switch (action.Action)
- {
- case CloudActionType.UploadUnconditional:
- UploadCloudFile(localFile.Name,localFile.Length,localFile.FullName,action.LocalHash.Value);
- break;
- case CloudActionType.DownloadUnconditional:
- DownloadCloudFile(PithosContainer,cloudFile.Name,downloadPath);
- break;
- case CloudActionType.Download:
- if (File.Exists(downloadPath))
- {
- if (cloudFile.Hash !=action.LocalHash.Value)
- {
- var lastLocalTime =localFile.LastWriteTime;
- var lastUpTime =cloudFile.Last_Modified;
- if (lastUpTime <=lastLocalTime)
- {
- //Files in conflict
- StatusKeeper.SetFileOverlayStatus(downloadPath,FileOverlayStatus.Conflict);
- }
- else
- DownloadCloudFile(PithosContainer,action.CloudFile.Name,downloadPath);
- }
- }
- else
- DownloadCloudFile(PithosContainer,action.CloudFile.Name,downloadPath);
- break;
- }
- Trace.TraceInformation("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name);
- }
- catch (Exception exc)
- {
- Trace.TraceError("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
- action.Action, action.LocalFile,action.CloudFile,exc);
-
- _networkActions.Add(action);
- }
- }
- }
-
-
-
- private void StartMonitoringFiles(string path)
- {
- _watcher = new FileSystemWatcher(path);
- _watcher.Changed += OnFileEvent;
- _watcher.Created += OnFileEvent;
- _watcher.Deleted += OnFileEvent;
- _watcher.Renamed += OnRenameEvent;
- _watcher.EnableRaisingEvents = true;
-
- Task.Factory.StartNew(ProcesFileEvents,_cancellationSource.Token);
- }
-
- private void ProcesFileEvents()
- {
- foreach (var state in _fileEvents.GetConsumingEnumerable())
- {
- try
- {
- var networkState=StatusKeeper.GetNetworkState(state.Path);
- //Skip if the file is already being downloaded or uploaded and
- //the change is create or modify
- if (networkState != NetworkState.None &&
- (
- state.TriggeringChange==WatcherChangeTypes.Created ||
- state.TriggeringChange==WatcherChangeTypes.Changed
- ))
- continue;
- UpdateFileStatus(state);
- UpdateOverlayStatus(state);
- UpdateFileChecksum(state);
- _uploadEvents.Add(state);
- }
- catch (OperationCanceledException exc)
- {
- Trace.TraceError("[ERROR] File Event Processing:\r{0}", exc);
- throw;
- }
- catch (Exception exc)
- {
- Trace.TraceError("[ERROR] File Event Processing:\r{0}",exc);
- }
- }
- }
-
- private void StartSending()
- {
- Task.Factory.StartNew(() =>
- {
- foreach (var state in _uploadEvents.GetConsumingEnumerable())
- {
- try
- {
- SynchToCloud(state);
- }
- catch (OperationCanceledException)
- {
- throw;
- }
- catch(Exception ex)
- {
- Trace.TraceError("[ERROR] Synch for {0}:\r{1}",state.FileName,ex);
- }
- }
-
- },_cancellationSource.Token);
- }
-
-
- private WorkflowState SynchToCloud(WorkflowState state)
- {
- if (state.Skip)
- return state;
- string path = state.Path.ToLower();
- string fileName = Path.GetFileName(path);
-
- //Bypass deleted files, unless the status is Deleted
- if (!(File.Exists(path) || state.Status == FileStatus.Deleted))
- {
- state.Skip = true;
- this.StatusKeeper.RemoveFileOverlayStatus(path);
- return state;
- }
-
- switch(state.Status)
- {
- case FileStatus.Created:
- case FileStatus.Modified:
- var info = new FileInfo(path);
- long fileSize = info.Length;
- UploadCloudFile(fileName, fileSize, path,state.Hash);
- break;
- case FileStatus.Deleted:
- DeleteCloudFile(fileName);
- break;
- case FileStatus.Renamed:
- RenameCloudFile(state);
- break;
- }
- return state;
- }
-
- private void RenameCloudFile(WorkflowState state)
- {
- this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);
-
-
-
- CloudClient.MoveObject(PithosContainer, state.OldFileName,PithosContainer, state.FileName);
-
- this.StatusKeeper.SetFileStatus(state.Path, FileStatus.Unchanged);
- this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Normal);
- Workflow.RaiseChangeNotification(state.Path);
- }
-
- private void DeleteCloudFile(string fileName)
- {
- Contract.Requires(!Path.IsPathRooted(fileName));
-
- this.StatusKeeper.SetFileOverlayStatus(fileName, FileOverlayStatus.Modified);
-
- CloudClient.MoveObject(PithosContainer,fileName,TrashContainer,fileName);
- this.StatusKeeper.ClearFileStatus(fileName);
- this.StatusKeeper.RemoveFileOverlayStatus(fileName);
- }
-
- private void DownloadCloudFile(string container, string fileName, string localPath)
- {
- var state = StatusKeeper.GetNetworkState(fileName);
- //Abort if the file is already being uploaded or downloaded
- if (state != NetworkState.None)
- return;
-
- StatusKeeper.SetNetworkState(localPath,NetworkState.Downloading);
- CloudClient.GetObject(container, fileName, localPath)
- .ContinueWith(t=>
- CloudClient.GetObjectInfo(container,fileName))
- .ContinueWith(t=>
- StatusKeeper.StoreInfo(fileName,t.Result))
- .ContinueWith(t=>
- StatusKeeper.SetNetworkState(localPath,NetworkState.None))
- .Wait();
- }
-
- private void UploadCloudFile(string fileName, long fileSize, string path,string hash)
- {
- Contract.Requires(!Path.IsPathRooted(fileName));
- var state=StatusKeeper.GetNetworkState(fileName);
- //Abort if the file is already being uploaded or downloaded
- if (state != NetworkState.None)
- return;
-
- StatusKeeper.SetNetworkState(fileName,NetworkState.Uploading);
-
- //Even if GetObjectInfo times out, we can proceed with the upload
- var info = CloudClient.GetObjectInfo(PithosContainer, fileName);
- Task.Factory.StartNew(() =>
- {
- if (hash != info.Hash)
- {
- Task.Factory.StartNew(() =>
- this.StatusKeeper.SetFileOverlayStatus(path, FileOverlayStatus.Modified))
- .ContinueWith(t =>
- CloudClient.PutObject(PithosContainer, fileName, path, hash));
- }
- else
- {
- this.StatusKeeper.StoreInfo(path,info);
- }
- })
- .ContinueWith(t =>
- this.StatusKeeper.SetFileState(path, FileStatus.Unchanged, FileOverlayStatus.Normal))
- .ContinueWith(t=>
- this.StatusKeeper.SetNetworkState(path,NetworkState.None))
- .Wait();
- Workflow.RaiseChangeNotification(path);
- }
-
- private Dictionary<WatcherChangeTypes, FileStatus> _statusDict = new Dictionary<WatcherChangeTypes, FileStatus>
- {
- {WatcherChangeTypes.Created,FileStatus.Created},
- {WatcherChangeTypes.Changed,FileStatus.Modified},
- {WatcherChangeTypes.Deleted,FileStatus.Deleted},
- {WatcherChangeTypes.Renamed,FileStatus.Renamed}
- };
-
- private WorkflowState UpdateFileStatus(WorkflowState state)
- {
- string path = state.Path;
- FileStatus status = _statusDict[state.TriggeringChange];
- var oldStatus = Workflow.StatusKeeper.GetFileStatus(path);
- if (status == oldStatus)
- {
- state.Status = status;
- state.Skip = true;
- return state;
- }
- if (state.Status == FileStatus.Renamed)
- Workflow.ClearFileStatus(path);
-
- state.Status = Workflow.SetFileStatus(path, status);
- return state;
- }
-
- private WorkflowState UpdateOverlayStatus(WorkflowState state)
- {
- if (state.Skip)
- return state;
-
- switch (state.Status)
- {
- case FileStatus.Created:
- case FileStatus.Modified:
- this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);
- break;
- case FileStatus.Deleted:
- this.StatusKeeper.RemoveFileOverlayStatus(state.Path);
- break;
- case FileStatus.Renamed:
- this.StatusKeeper.RemoveFileOverlayStatus(state.OldPath);
- this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);
- break;
- case FileStatus.Unchanged:
- this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Normal);
- break;
- }
-
- if (state.Status==FileStatus.Deleted)
- Workflow.RaiseChangeNotification(Path.GetDirectoryName(state.Path));
- else
- Workflow.RaiseChangeNotification(state.Path);
- return state;
- }
-
-
- private WorkflowState UpdateFileChecksum(WorkflowState state)
- {
- if (state.Skip)
- return state;
-
- if (state.Status == FileStatus.Deleted)
- return state;
-
- string path = state.Path;
- string hash = Signature.CalculateHash(path);
-
- StatusKeeper.UpdateFileChecksum(path, hash);
-
- state.Hash = hash;
- return state;
- }
-
-
-
- private FileSystemEventArgs CalculateSignature(FileSystemEventArgs arg)
- {
- Debug.WriteLine(String.Format("{0} {1} {2}", arg.ChangeType, arg.Name, arg.FullPath), "INFO");
- return arg;
- }
-
- void OnFileEvent(object sender, FileSystemEventArgs e)
- {
- _fileEvents.Add(new WorkflowState{Path=e.FullPath,FileName = e.Name,TriggeringChange=e.ChangeType});
- }
-
- void OnRenameEvent(object sender, RenamedEventArgs e)
- {
- _fileEvents.Add(new WorkflowState { OldPath=e.OldFullPath,OldFileName=e.OldName,
- Path = e.FullPath, FileName = e.Name, TriggeringChange = e.ChangeType });
- }
-
- public void Stop()
- {
- if (_watcher != null)
- {
- _watcher.Changed -= OnFileEvent;
- _watcher.Created -= OnFileEvent;
- _watcher.Deleted -= OnFileEvent;
- _watcher.Renamed -= OnRenameEvent;
- _watcher.Dispose();
- }
- _watcher = null;
- _fileEvents.CompleteAdding();
- if (timer != null)
- timer.Dispose();
- timer = null;
- StopStatusService();
- }
-
-
- ~PithosMonitor()
- {
- Dispose(false);
- }
-
- public void Dispose()
- {
- Dispose(true);
- GC.SuppressFinalize(this);
- }
-
- protected virtual void Dispose(bool disposing)
- {
- if (disposing)
- {
- Stop();
- }
- }
-
-
- }
-
- public interface IStatusNotification
- {
- void NotifyChange(string status,TraceLevel level=TraceLevel.Info);
- }
-}
+#region\r
+/* -----------------------------------------------------------------------\r
+ * <copyright file="PithosMonitor.cs" company="GRNet">\r
+ * \r
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.\r
+ *\r
+ * Redistribution and use in source and binary forms, with or\r
+ * without modification, are permitted provided that the following\r
+ * conditions are met:\r
+ *\r
+ * 1. Redistributions of source code must retain the above\r
+ * copyright notice, this list of conditions and the following\r
+ * disclaimer.\r
+ *\r
+ * 2. Redistributions in binary form must reproduce the above\r
+ * copyright notice, this list of conditions and the following\r
+ * disclaimer in the documentation and/or other materials\r
+ * provided with the distribution.\r
+ *\r
+ *\r
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS\r
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED\r
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR\r
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR\r
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\r
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\r
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF\r
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED\r
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT\r
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN\r
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE\r
+ * POSSIBILITY OF SUCH DAMAGE.\r
+ *\r
+ * The views and conclusions contained in the software and\r
+ * documentation are those of the authors and should not be\r
+ * interpreted as representing official policies, either expressed\r
+ * or implied, of GRNET S.A.\r
+ * </copyright>\r
+ * -----------------------------------------------------------------------\r
+ */\r
+#endregion\r
+using System;\r
+using System.Collections.Generic;\r
+using System.ComponentModel.Composition;\r
+using System.Diagnostics.Contracts;\r
+using System.IO;\r
+using System.Linq;\r
+using System.Reflection;\r
+using System.Threading;\r
+using System.Threading.Tasks;\r
+using Pithos.Core.Agents;\r
+using Pithos.Interfaces;\r
+using Pithos.Network;\r
+using log4net;\r
+\r
+namespace Pithos.Core\r
+{\r
+ [Export(typeof(PithosMonitor))]\r
+ public class PithosMonitor:IDisposable\r
+ {\r
+ private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);\r
+\r
+ private int _blockSize;\r
+ private string _blockHash;\r
+\r
+ [Import]\r
+ public IPithosSettings Settings{get;set;}\r
+\r
+ // Backing field for the StatusKeeper property.\r
+ private IStatusKeeper _statusKeeper;\r
+\r
+ /// <summary>\r
+ /// Gets or sets the status keeper. Assigning a value also assigns the same\r
+ /// instance to FileAgent.StatusKeeper.\r
+ /// </summary>\r
+ [Import]\r
+ public IStatusKeeper StatusKeeper\r
+ {\r
+     get\r
+     {\r
+         return _statusKeeper;\r
+     }\r
+     set\r
+     {\r
+         _statusKeeper = value;\r
+         // Keep the file agent pointed at the same instance\r
+         FileAgent.StatusKeeper = value;\r
+     }\r
+ }\r
+\r
+\r
+\r
+\r
+ // Backing field for the Workflow property.\r
+ private IPithosWorkflow _workflow;\r
+\r
+ /// <summary>\r
+ /// Gets or sets the workflow. Assigning a value also assigns the same\r
+ /// instance to FileAgent.Workflow.\r
+ /// </summary>\r
+ [Import]\r
+ public IPithosWorkflow Workflow\r
+ {\r
+     get\r
+     {\r
+         return _workflow;\r
+     }\r
+     set\r
+     {\r
+         _workflow = value;\r
+         // Keep the file agent pointed at the same instance\r
+         FileAgent.Workflow = value;\r
+     }\r
+ }\r
+\r
+ public ICloudClient CloudClient { get; set; }\r
+\r
+ public IStatusNotification StatusNotification { get; set; }\r
+\r
+ //[Import]\r
+ public FileAgent FileAgent { get; private set; }\r
+\r
+/*\r
+ private WorkflowAgent _workflowAgent;\r
+\r
+ [Import]\r
+ public WorkflowAgent WorkflowAgent\r
+ {\r
+ get { return _workflowAgent; }\r
+ set\r
+ {\r
+ _workflowAgent = value;\r
+ //FileAgent.WorkflowAgent = value;\r
+ }\r
+ }\r
+*/\r
+ \r
+ [Import]\r
+ public NetworkAgent NetworkAgent { get; set; }\r
+\r
+ // Backing field for the PollAgent property.\r
+ private PollAgent _pollAgent;\r
+\r
+ /// <summary>\r
+ /// Gets or sets the poll agent. Assigning a value also assigns the same\r
+ /// instance to FileAgent.PollAgent.\r
+ /// </summary>\r
+ [Import]\r
+ public PollAgent PollAgent\r
+ {\r
+     get\r
+     {\r
+         return _pollAgent;\r
+     }\r
+     set\r
+     {\r
+         _pollAgent = value;\r
+         // Keep the file agent pointed at the same instance\r
+         FileAgent.PollAgent = value;\r
+     }\r
+ }\r
+\r
+ // Backing field for the Selectives property.\r
+ private Selectives _selectives;\r
+\r
+ /// <summary>\r
+ /// Gets or sets the selective-sync registry. Assigning a value also assigns\r
+ /// the same instance to FileAgent.Selectives.\r
+ /// </summary>\r
+ [Import]\r
+ public Selectives Selectives\r
+ {\r
+     get\r
+     {\r
+         return _selectives;\r
+     }\r
+     set\r
+     {\r
+         _selectives = value;\r
+         // Keep the file agent pointed at the same instance\r
+         FileAgent.Selectives = value;\r
+     }\r
+ }\r
+\r
+ public string UserName { get; set; }\r
+ // Backing field for the ApiKey property.\r
+ private string _apiKey;\r
+\r
+ /// <summary>\r
+ /// Gets or sets the API key. When account info is already available,\r
+ /// the new key is also written to its Token.\r
+ /// </summary>\r
+ public string ApiKey\r
+ {\r
+     get\r
+     {\r
+         return _apiKey;\r
+     }\r
+     set\r
+     {\r
+         _apiKey = value;\r
+         // Nothing to propagate to before the account info exists\r
+         if (_accountInfo == null)\r
+             return;\r
+         _accountInfo.Token = value;\r
+     }\r
+ }\r
+\r
+ // Assigned in Start() from the result of CloudClient.Authenticate(); null until then.\r
+ private AccountInfo _accountInfo;\r
+\r
+ /// <summary>\r
+ /// Gets the account info held by this monitor.\r
+ /// </summary>\r
+ public AccountInfo Account\r
+ {\r
+     get\r
+     {\r
+         return _accountInfo;\r
+     }\r
+ }\r
+\r
+\r
+\r
+\r
+\r
+ public bool Pause { get; set; } \r
+ /*public bool Pause\r
+ {\r
+ get { return FileAgent.Pause; }\r
+ set\r
+ {\r
+ FileAgent.Pause = value;\r
+ }\r
+ }*/\r
+\r
+ // Backing field for the RootPath property; stored lower-cased.\r
+ private string _rootPath;\r
+\r
+ /// <summary>\r
+ /// Gets or sets the root path. A null, empty or whitespace value is stored\r
+ /// as an empty string; any other value is stored lower-cased.\r
+ /// </summary>\r
+ public string RootPath\r
+ {\r
+     get\r
+     {\r
+         return _rootPath;\r
+     }\r
+     set\r
+     {\r
+         if (String.IsNullOrWhiteSpace(value))\r
+             _rootPath = String.Empty;\r
+         else\r
+             _rootPath = value.ToLower();\r
+     }\r
+ }\r
+\r
+\r
+ CancellationTokenSource _cancellationSource;\r
+\r
+ /// <summary>\r
+ /// Creates the monitor together with its own FileAgent, so the property\r
+ /// setters that forward values to the agent always have a target.\r
+ /// </summary>\r
+ public PithosMonitor()\r
+ {\r
+     this.FileAgent = new FileAgent();\r
+ }\r
+ // Set at the very end of Start(); consulted so that repeated calls do not restart a running monitor.\r
+ private bool _started;\r
+\r
+ /// <summary>\r
+ /// Authenticates the account, makes sure the local folders and the remote\r
+ /// containers exist, indexes the existing local files, applies the stored\r
+ /// selective folders and starts the watcher and poll agents.\r
+ /// </summary>\r
+ /// <remarks>\r
+ /// Returns without doing anything when <see cref="RootPath"/> is empty.\r
+ /// When the monitor was already started and its cancellation source has not\r
+ /// been cancelled, only the "Starting" notification is raised.\r
+ /// </remarks>\r
+ /// <exception cref="InvalidOperationException">\r
+ /// ApiKey, UserName or AuthenticationUrl is empty, or the settings hold no\r
+ /// entry for the authenticated account.\r
+ /// </exception>\r
+ public void Start()\r
+ {\r
+     if (String.IsNullOrWhiteSpace(ApiKey))\r
+         throw new InvalidOperationException("The ApiKey is empty");\r
+     if (String.IsNullOrWhiteSpace(UserName))\r
+         throw new InvalidOperationException("The UserName is empty");\r
+     if (String.IsNullOrWhiteSpace(AuthenticationUrl))\r
+         throw new InvalidOperationException("The Authentication url is empty");\r
+     Contract.EndContractBlock();\r
+\r
+     //If the account doesn't have a valid path, don't start monitoring but don't throw either\r
+     if (String.IsNullOrWhiteSpace(RootPath))\r
+         //TODO; Warn user?\r
+         return;\r
+\r
+     StatusNotification.NotifyChange("Starting");\r
+     if (_started)\r
+     {\r
+         //Already running: only a cancelled monitor may be started again\r
+         if (!_cancellationSource.IsCancellationRequested)\r
+             return;\r
+     }\r
+     _cancellationSource = new CancellationTokenSource();\r
+\r
+     //NOTE(review): locking on 'this' is discouraged because outside code can take the same lock.\r
+     //It is kept because other members of this class (not visible here) may rely on it - confirm before replacing.\r
+     lock (this)\r
+     {\r
+         CloudClient = new CloudFilesClient(UserName, ApiKey)\r
+                           {UsePithos = true, AuthenticationUrl = AuthenticationUrl};\r
+         _accountInfo = CloudClient.Authenticate();\r
+     }\r
+     _accountInfo.SiteUri = AuthenticationUrl;\r
+     _accountInfo.AccountPath = RootPath;\r
+\r
+\r
+     var pithosFolder = Path.Combine(RootPath, FolderConstants.PithosContainer);\r
+     if (!Directory.Exists(pithosFolder))\r
+         Directory.CreateDirectory(pithosFolder);\r
+     //Create the cache folder and ensure it is hidden\r
+     CreateHiddenFolder(RootPath, FolderConstants.CacheFolder);\r
+\r
+     var policy = CloudClient.GetAccountPolicies(_accountInfo);\r
+\r
+     StatusNotification.NotifyAccount(policy);\r
+     EnsurePithosContainers();\r
+\r
+     //EnsurePithosContainers has just refreshed the block settings\r
+     StatusKeeper.BlockHash = _blockHash;\r
+     StatusKeeper.BlockSize = _blockSize;\r
+\r
+     StatusKeeper.StartProcessing(_cancellationSource.Token);\r
+     CleanupUnselectedStates();\r
+     IndexLocalFiles();\r
+\r
+     //Find the stored settings of this account. Take(1) keeps the "first match wins"\r
+     //behaviour while allowing a descriptive error when nothing matches\r
+     var matchingSettings = Settings.Accounts\r
+         .Where(s => s.AccountKey == _accountInfo.AccountKey)\r
+         .Take(1)\r
+         .ToArray();\r
+     if (matchingSettings.Length == 0)\r
+         throw new InvalidOperationException(\r
+             String.Format("No settings were found for account [{0}]", UserName));\r
+     var settings = matchingSettings[0];\r
+\r
+     //Extract the URIs from the string collection, keeping only the absolute ones.\r
+     //A missing collection is treated as "no selective folders"\r
+     var selectiveFolders = settings.SelectiveFolders;\r
+     var selectiveUrls = selectiveFolders == null\r
+         ? new Uri[0]\r
+         : selectiveFolders.Cast<string>()\r
+               .Select(url => new Uri(url, UriKind.RelativeOrAbsolute))\r
+               .Where(uri => uri.IsAbsoluteUri)\r
+               .ToArray();\r
+\r
+     SetSelectivePaths(selectiveUrls, null, null);\r
+\r
+     StartWatcherAgent();\r
+\r
+     StartNetworkAgent();\r
+\r
+     _started = true;\r
+ }\r
+\r
+ /// <summary>\r
+ /// Creates the trash and pithos containers when the server reports them as\r
+ /// missing, and records the block size and block hash of each container in\r
+ /// turn, so the values of the container processed last are the ones kept.\r
+ /// </summary>\r
+ private void EnsurePithosContainers()\r
+ {\r
+     //The two default containers, in the order they are checked\r
+     var defaultContainers = new string[]\r
+     {\r
+         FolderConstants.TrashContainer,\r
+         FolderConstants.PithosContainer\r
+     };\r
+\r
+     foreach (var containerName in defaultContainers)\r
+     {\r
+         var containerInfo = CloudClient.GetContainerInfo(UserName, containerName);\r
+         if (containerInfo == ContainerInfo.Empty)\r
+         {\r
+             //Missing on the server: create it and read its info back\r
+             CloudClient.CreateContainer(UserName, containerName);\r
+             containerInfo = CloudClient.GetContainerInfo(UserName, containerName);\r
+         }\r
+\r
+         _blockSize = containerInfo.BlockSize;\r
+         _blockHash = containerInfo.BlockHash;\r
+         _accountInfo.BlockSize = _blockSize;\r
+         _accountInfo.BlockHash = _blockHash;\r
+     }\r
+ }\r
+\r
+ public string AuthenticationUrl { get; set; }\r
+\r
+ /// <summary>\r
+ /// Enumerates every file below <see cref="RootPath"/>, skipping the cache\r
+ /// folder and files with the ".ignore" extension, and hands them to\r
+ /// StatusKeeper.ProcessExistingFiles.\r
+ /// </summary>\r
+ /// <remarks>\r
+ /// Exceptions are logged and not rethrown; the "Indexing Completed"\r
+ /// status is raised whether or not indexing succeeded.\r
+ /// </remarks>\r
+ private void IndexLocalFiles()\r
+ {\r
+     using (ThreadContext.Stacks["Operation"].Push("Indexing local files"))\r
+     {\r
+\r
+         try\r
+         {\r
+             Log.Info("Start local indexing");\r
+             StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing,"Indexing Local Files");\r
+\r
+             var cachePath = Path.Combine(RootPath, FolderConstants.CacheFolder);\r
+             var directory = new DirectoryInfo(RootPath);\r
+             //FileInfo.Extension includes the leading period, so the comparison\r
+             //must be made against ".ignore" - a bare "ignore" can never match\r
+             var files =\r
+                 from file in directory.EnumerateFiles("*", SearchOption.AllDirectories)\r
+                 where !file.FullName.StartsWith(cachePath, StringComparison.InvariantCultureIgnoreCase) &&\r
+                       !file.Extension.Equals(".ignore", StringComparison.InvariantCultureIgnoreCase)\r
+                 select file;\r
+             //The query is lazy; the files are enumerated by the status keeper\r
+             StatusKeeper.ProcessExistingFiles(files);\r
+\r
+         }\r
+         catch (Exception exc)\r
+         {\r
+             Log.Error("[ERROR]", exc);\r
+         }\r
+         finally\r
+         {\r
+             Log.Info("[END]");\r
+         }\r
+         StatusNotification.SetPithosStatus(PithosStatus.LocalComplete,"Indexing Completed");\r
+     }\r
+ }\r
+\r
+ \r
+ \r
+\r
+\r
+ /* private void StartWorkflowAgent()\r
+ {\r
+ WorkflowAgent.StatusNotification = StatusNotification;\r
+\r
+/* //On Vista and up we can check for a network connection\r
+ bool connected=Environment.OSVersion.Version.Major < 6 || NetworkListManager.IsConnectedToInternet;\r
+ //If we are not connected retry later\r
+ if (!connected)\r
+ {\r
+ Task.Factory.StartNewDelayed(10000, StartWorkflowAgent);\r
+ return;\r
+ }#1#\r
+\r
+ try\r
+ {\r
+ WorkflowAgent.Start(); \r
+ }\r
+ catch (Exception)\r
+ {\r
+ //Faild to authenticate due to network or account error\r
+ //Retry after a while\r
+ Task.Factory.StartNewDelayed(10000, StartWorkflowAgent);\r
+ }\r
+ }*/\r
+\r
+\r
+ /// <summary>\r
+ /// Gives the network and poll agents the current status notification sink,\r
+ /// registers this account with the poll agent and triggers a remote poll.\r
+ /// </summary>\r
+ private void StartNetworkAgent()\r
+ {\r
+     //TODO: The Network and Poll agents are not account specific\r
+     //They should be moved outside PithosMonitor\r
+     //(NetworkAgent.Start() is intentionally not called from here)\r
+     NetworkAgent.StatusNotification = StatusNotification;\r
+\r
+     var poller = PollAgent;\r
+     poller.AddAccount(_accountInfo);\r
+     poller.StatusNotification = StatusNotification;\r
+     poller.PollRemoteFiles();\r
+ }\r
+\r
+ /// <summary>\r
+ /// Makes sure a hidden folder named <paramref name="folderName"/> exists\r
+ /// below <paramref name="rootPath"/>. It is used to store partial downloads.\r
+ /// The folder is created when missing; an existing folder that is not\r
+ /// hidden gets the Hidden attribute added.\r
+ /// </summary>\r
+ /// <param name="rootPath">Absolute path of the parent folder</param>\r
+ /// <param name="folderName">Name of the folder to create or hide</param>\r
+ /// <exception cref="ArgumentNullException">rootPath or folderName is null, empty or whitespace</exception>\r
+ /// <exception cref="ArgumentException">rootPath is not an absolute path</exception>\r
+ private static void CreateHiddenFolder(string rootPath, string folderName)\r
+ {\r
+     if (String.IsNullOrWhiteSpace(rootPath))\r
+         throw new ArgumentNullException("rootPath");\r
+     if (!Path.IsPathRooted(rootPath))\r
+         //Pass a real message plus the parameter name, as MoveFileStates does.\r
+         //The single-argument constructor would treat "rootPath" as the message\r
+         throw new ArgumentException("rootPath must be an absolute path","rootPath");\r
+     if (String.IsNullOrWhiteSpace(folderName))\r
+         throw new ArgumentNullException("folderName");\r
+     Contract.EndContractBlock();\r
+\r
+     var folder = Path.Combine(rootPath, folderName);\r
+     if (!Directory.Exists(folder))\r
+     {\r
+         var info = Directory.CreateDirectory(folder);\r
+         info.Attributes |= FileAttributes.Hidden;\r
+\r
+         Log.InfoFormat("Created cache Folder: {0}", folder);\r
+     }\r
+     else\r
+     {\r
+         var info = new DirectoryInfo(folder);\r
+         //Only touch the attributes when the folder is not hidden already\r
+         if ((info.Attributes & FileAttributes.Hidden) == 0)\r
+         {\r
+             info.Attributes |= FileAttributes.Hidden;\r
+             Log.InfoFormat("Reset cache folder to hidden: {0}", folder);\r
+         }\r
+     }\r
+ }\r
+\r
+ \r
+\r
+\r
+ /// <summary>\r
+ /// Registers the file agent under <see cref="RootPath"/>, configures it\r
+ /// with the current settings and collaborators, and starts it for this\r
+ /// account.\r
+ /// </summary>\r
+ private void StartWatcherAgent()\r
+ {\r
+     if (Log.IsDebugEnabled)\r
+     {\r
+         Log.DebugFormat("Start Folder Monitoring [{0}]",RootPath);\r
+     }\r
+\r
+     var agent = FileAgent;\r
+     AgentLocator<FileAgent>.Register(agent,RootPath);\r
+\r
+     //Configure the agent before starting it\r
+     agent.IdleTimeout = Settings.FileIdleTimeout;\r
+     agent.StatusKeeper = StatusKeeper;\r
+     agent.StatusNotification = StatusNotification;\r
+     agent.Workflow = Workflow;\r
+     agent.CachePath = Path.Combine(RootPath, FolderConstants.CacheFolder);\r
+\r
+     agent.Start(_accountInfo, RootPath);\r
+ }\r
+\r
+ public void Stop()\r
+ {\r
+/*\r
+ NOTE: Stop() currently does nothing. The teardown code below is disabled,\r
+ so calling Stop() - directly or through Dispose() - leaves the file agent\r
+ registered and running.\r
+\r
+ AgentLocator<FileAgent>.Remove(RootPath);\r
+\r
+ if (FileAgent!=null)\r
+ FileAgent.Stop();\r
+ FileAgent = null;\r
+*/\r
+ }\r
+\r
+\r
+ /// <summary>\r
+ /// Finalizer. Runs the non-disposing branch of the dispose pattern.\r
+ /// </summary>\r
+ ~PithosMonitor()\r
+ {\r
+     this.Dispose(false);\r
+ }\r
+\r
+ /// <summary>\r
+ /// Disposes the monitor and tells the garbage collector that the\r
+ /// finalizer no longer needs to run.\r
+ /// </summary>\r
+ public void Dispose()\r
+ {\r
+     this.Dispose(true);\r
+     GC.SuppressFinalize(this);\r
+ }\r
+\r
+ /// <summary>\r
+ /// Calls <see cref="Stop"/> when invoked from <see cref="Dispose()"/>;\r
+ /// does nothing when invoked from the finalizer.\r
+ /// </summary>\r
+ /// <param name="disposing">True when called from Dispose(), false when called from the finalizer</param>\r
+ protected virtual void Dispose(bool disposing)\r
+ {\r
+     if (!disposing)\r
+         return;\r
+\r
+     Stop();\r
+ }\r
+\r
+\r
+ /// <summary>\r
+ /// Asks the status keeper to move the stored states from one root to another.\r
+ /// </summary>\r
+ /// <param name="oldPath">Absolute path of the current root</param>\r
+ /// <param name="newPath">Absolute path of the new root</param>\r
+ /// <exception cref="ArgumentNullException">A path is null, empty or whitespace</exception>\r
+ /// <exception cref="ArgumentException">A path is not absolute</exception>\r
+ public void MoveFileStates(string oldPath, string newPath)\r
+ {\r
+     //Validate the old root first, then the new one\r
+     if (String.IsNullOrWhiteSpace(oldPath))\r
+         throw new ArgumentNullException("oldPath");\r
+     if (!Path.IsPathRooted(oldPath))\r
+         throw new ArgumentException("oldPath must be an absolute path","oldPath");\r
+\r
+     if (String.IsNullOrWhiteSpace(newPath))\r
+         throw new ArgumentNullException("newPath");\r
+     if (!Path.IsPathRooted(newPath))\r
+         throw new ArgumentException("newPath must be an absolute path","newPath");\r
+     Contract.EndContractBlock();\r
+\r
+     this.StatusKeeper.ChangeRoots(oldPath, newPath);\r
+ }\r
+\r
+ /// <summary>\r
+ /// When selective synchronization is enabled for the current account, unversions\r
+ /// every stored file state that is not at or below one of the selected paths.\r
+ /// If no selective paths are registered for the account, the folder status of\r
+ /// the account path is cleared instead.\r
+ /// </summary>\r
+ private void CleanupUnselectedStates()\r
+ {\r
+     var accountKey = _accountInfo.AccountKey;\r
+     if (!Selectives.IsSelectiveEnabled(accountKey))\r
+         return;\r
+\r
+     List<string> selectedRoots;\r
+     if (!Selectives.SelectivePaths.TryGetValue(accountKey, out selectedRoots))\r
+     {\r
+         StatusKeeper.ClearFolderStatus(Account.AccountPath);\r
+         return;\r
+     }\r
+\r
+     // Pull all known state paths first, then filter them in memory.\r
+     var knownPaths = FileState.Queryable.Select(state => state.FilePath).ToList();\r
+     var unselected = knownPaths\r
+         .Where(path => !selectedRoots.Any(path.IsAtOrBelow))\r
+         .ToList();\r
+\r
+     UnversionSelectivePaths(unselected);\r
+ }\r
+\r
+ /// <summary>\r
+ /// Stores the selected URIs for the current account and unversions the file\r
+ /// paths that correspond to the removed URIs.\r
+ /// </summary>\r
+ /// <param name="uris">The complete set of selected URIs.</param>\r
+ /// <param name="added">Newly selected URIs. Not used by this method.</param>\r
+ /// <param name="removed">URIs that are no longer selected.</param>\r
+ public void SetSelectivePaths(Uri[] uris,Uri[] added, Uri[] removed)\r
+ {\r
+     Selectives.SetSelectedUris(_accountInfo, uris.ToList());\r
+\r
+     // Removed URIs are translated to local paths before being unversioned.\r
+     UnversionSelectivePaths(UrisToFilePaths(removed));\r
+ }\r
+\r
+ /// <summary>\r
+ /// Marks the given paths as unversioned by passing them to\r
+ /// <c>FileState.UnversionPaths</c>.\r
+ /// </summary>\r
+ /// <param name="removed">Paths to unversion; a null list is ignored.</param>\r
+ private void UnversionSelectivePaths(List<string> removed)\r
+ {\r
+     if (removed != null)\r
+     {\r
+         // Ensure any file state below the deleted folders is removed as well\r
+         FileState.UnversionPaths(removed);\r
+     }\r
+ }\r
+\r
+\r
+ /// <summary>\r
+ /// Returns a list of absolute file paths for a sequence of Uris.\r
+ /// </summary>\r
+ /// <remarks>\r
+ /// Uris that start with the account's storage Uri are mapped below <c>RootPath</c>\r
+ /// with the leading account name trimmed. All other Uris are mapped below the\r
+ /// "others-shared" folder under <c>RootPath</c>. Own paths come first in the result\r
+ /// and duplicates are removed.\r
+ /// </remarks>\r
+ /// <param name="uris">The Uris to convert; null yields an empty list.</param>\r
+ /// <returns>The distinct absolute paths, own paths first.</returns>\r
+ public List<string> UrisToFilePaths(IEnumerable<Uri> uris)\r
+ {\r
+     if (uris == null)\r
+         return new List<string>();\r
+\r
+     // Loop-invariant values are computed once instead of once per Uri.\r
+     var storageUri = _accountInfo.StorageUri;\r
+     var storagePrefix = storageUri.ToString();\r
+     var accountPrefix = _accountInfo.UserName + '\\';\r
+\r
+     var own = new List<string>();\r
+     var others = new List<string>();\r
+\r
+     // Single pass: the input sequence is enumerated exactly once.\r
+     foreach (var uri in uris)\r
+     {\r
+         // Uri text is compared ordinally; culture rules do not apply to Uris.\r
+         var isOwn = uri.ToString().StartsWith(storagePrefix, StringComparison.Ordinal);\r
+         var relativePath = storageUri.MakeRelativeUri(uri).RelativeUriToFilePath();\r
+\r
+         if (isOwn)\r
+         {\r
+             //Trim the account name\r
+             own.Add(Path.Combine(RootPath, relativePath.After(accountPrefix)));\r
+         }\r
+         else\r
+         {\r
+             others.Add(Path.Combine(RootPath, "others-shared", relativePath));\r
+         }\r
+     }\r
+\r
+     return own.Union(others).ToList();\r
+ }\r
+\r
+\r
+ /// <summary>\r
+ /// Retrieves the object info for a local file by translating its path,\r
+ /// relative to <c>RootPath</c>, into an account, container and object url.\r
+ /// </summary>\r
+ /// <remarks>\r
+ /// Paths starting with <c>FolderConstants.OthersFolder</c> are read as\r
+ /// others-folder\account\container\object and are queried through an account info\r
+ /// built for that account. All other paths are read as container\object and are\r
+ /// queried with the current account.\r
+ /// </remarks>\r
+ /// <param name="filePath">Path of the local file.</param>\r
+ /// <returns>The info returned by the cloud client for the object.</returns>\r
+ /// <exception cref="ArgumentNullException">filePath is null, empty or whitespace.</exception>\r
+ public ObjectInfo GetObjectInfo(string filePath)\r
+ {\r
+     if (String.IsNullOrWhiteSpace(filePath))\r
+         throw new ArgumentNullException("filePath");\r
+     Contract.EndContractBlock();\r
+\r
+     var relativePath = new FileInfo(filePath).AsRelativeTo(RootPath);\r
+     var segments = relativePath.Split('\\');\r
+\r
+     var ownAccount = _accountInfo;\r
+     var targetAccount = ownAccount;\r
+     string owner, containerName, objectUrl;\r
+\r
+     if (!relativePath.StartsWith(FolderConstants.OthersFolder))\r
+     {\r
+         owner = UserName;\r
+         containerName = segments[0];\r
+         objectUrl = String.Join("/", segments.Splice(1));\r
+     }\r
+     else\r
+     {\r
+         owner = segments[1];\r
+         containerName = segments[2];\r
+         objectUrl = String.Join("/", segments.Splice(3));\r
+\r
+         // Root URL for the target account: the storage address up to the\r
+         // current user name, followed by the other account's name.\r
+         var currentName = UserName;\r
+         var storageAddress = ownAccount.StorageUri.AbsoluteUri;\r
+         var nameStart = storageAddress.IndexOf(currentName, StringComparison.Ordinal);\r
+         var storageRoot = storageAddress.Substring(0, nameStart);\r
+\r
+         targetAccount = new AccountInfo\r
+         {\r
+             UserName = owner,\r
+             AccountPath = Path.Combine(ownAccount.AccountPath, segments[0], segments[1]),\r
+             StorageUri = new Uri(storageRoot + owner),\r
+             BlockHash = ownAccount.BlockHash,\r
+             BlockSize = ownAccount.BlockSize,\r
+             Token = ownAccount.Token\r
+         };\r
+     }\r
+\r
+     var client = new CloudFilesClient(targetAccount);\r
+     return client.GetObjectInfo(owner, containerName, objectUrl);\r
+ }\r
+ \r
+ /// <summary>\r
+ /// Starts a task that retrieves the info of the container a local file belongs to.\r
+ /// </summary>\r
+ /// <remarks>\r
+ /// The path is made relative to <c>RootPath</c>. Paths starting with\r
+ /// <c>FolderConstants.OthersFolder</c> are read as others-folder\account\container\...\r
+ /// and are queried through an account info built for that account. All other paths\r
+ /// take the container from their first segment and use the current account.\r
+ /// Argument validation and path parsing happen synchronously; only the cloud call\r
+ /// runs inside the returned task.\r
+ /// </remarks>\r
+ /// <param name="filePath">Path of the local file.</param>\r
+ /// <returns>A task producing the container info.</returns>\r
+ /// <exception cref="ArgumentNullException">filePath is null, empty or whitespace.</exception>\r
+ public Task<ContainerInfo> GetContainerInfo(string filePath)\r
+ {\r
+     if (String.IsNullOrWhiteSpace(filePath))\r
+         throw new ArgumentNullException("filePath");\r
+     Contract.EndContractBlock();\r
+\r
+     var relativePath = new FileInfo(filePath).AsRelativeTo(RootPath);\r
+     var segments = relativePath.Split('\\');\r
+\r
+     var ownAccount = _accountInfo;\r
+     var targetAccount = ownAccount;\r
+     string owner, containerName;\r
+\r
+     if (!relativePath.StartsWith(FolderConstants.OthersFolder))\r
+     {\r
+         owner = UserName;\r
+         containerName = segments[0];\r
+     }\r
+     else\r
+     {\r
+         owner = segments[1];\r
+         containerName = segments[2];\r
+\r
+         // Root URL for the target account: the storage address up to the\r
+         // current user name, followed by the other account's name.\r
+         var currentName = UserName;\r
+         var storageAddress = ownAccount.StorageUri.AbsoluteUri;\r
+         var nameStart = storageAddress.IndexOf(currentName, StringComparison.Ordinal);\r
+         var storageRoot = storageAddress.Substring(0, nameStart);\r
+\r
+         targetAccount = new AccountInfo\r
+         {\r
+             UserName = owner,\r
+             AccountPath = Path.Combine(ownAccount.AccountPath, segments[0], segments[1]),\r
+             StorageUri = new Uri(storageRoot + owner),\r
+             BlockHash = ownAccount.BlockHash,\r
+             BlockSize = ownAccount.BlockSize,\r
+             Token = ownAccount.Token\r
+         };\r
+     }\r
+\r
+     return Task.Factory.StartNew(\r
+         () => new CloudFilesClient(targetAccount).GetContainerInfo(owner, containerName));\r
+ }\r
+ }\r
+}\r