using System.Text;
using System.Threading;
using System.Threading.Tasks;
+using Castle.ActiveRecord.Queries;
using Microsoft.WindowsAPICodePack.Net;
using Pithos.Interfaces;
using System.ServiceModel;
[Export(typeof(PithosMonitor))]
public class PithosMonitor:IDisposable
{
+ private const string PithosContainer = "pithos";
+ private const string TrashContainer = "trash";
+
[Import]
public IPithosSettings Settings{get;set;}
[Import]
public ICloudClient CloudListeningClient { get; set; }
+ public string UserName { get; set; }
+ public string ApiKey { get; set; }
+
private ServiceHost _statusService { get; set; }
private FileSystemWatcher _watcher;
}
}
+ public string RootPath { get; set; }
+
CancellationTokenSource _cancellationSource;
}
_cancellationSource = new CancellationTokenSource();
- string path = Settings.PithosPath;
var proxyUri = ProxyFromSettings();
CloudClient.Proxy = proxyUri;
- IndexLocalFiles(path);
- StartMonitoringFiles(path);
+ CloudClient.UsePithos = this.UsePithos;
+ EnsurePithosContainers();
+ StatusKeeper.StartProcessing(_cancellationSource.Token);
+ IndexLocalFiles(RootPath);
+ StartMonitoringFiles(RootPath);
StartStatusService();
StartNetwork();
}
+ private void EnsurePithosContainers()
+ {
+ CloudClient.UsePithos = this.UsePithos;
+ CloudClient.AuthenticationUrl = this.AuthenticationUrl;
+ CloudClient.Authenticate(UserName, ApiKey);
+
+ var pithosContainers = new[] {PithosContainer, TrashContainer};
+ foreach (var container in pithosContainers)
+ {
+ if (!CloudClient.ContainerExists(container))
+ CloudClient.CreateContainer(container);
+ }
+ }
+
+ public string AuthenticationUrl { get; set; }
+
private Uri ProxyFromSettings()
{
if (Settings.UseManualProxy)
select filePath;
StatusKeeper.StoreUnversionedFiles(files);
- var newFiles= FileState.FindAllByProperty("OverlayStatus", FileOverlayStatus.Unversioned)
- .Select(state=>state.FilePath);
- foreach (var newFile in newFiles)
- {
- _uploadEvents.Add(new WorkflowState
- {
- Path = newFile,
- FileName = Path.GetFileName(newFile),
- TriggeringChange = WatcherChangeTypes.Created
- });
- }
-
+ RestartInterruptedFiles();
}
catch (Exception exc)
{
}
}
+ private void RestartInterruptedFiles()
+ {
+ var interruptedStates = new[] { FileOverlayStatus.Unversioned, FileOverlayStatus.Modified };
+ var filesQuery = from state in FileState.Queryable
+ where interruptedStates.Contains(state.OverlayStatus)
+ select new WorkflowState
+ {
+ Path = state.FilePath.ToLower(),
+ FileName = Path.GetFileName(state.FilePath).ToLower(),
+ Status=state.OverlayStatus==FileOverlayStatus.Unversioned?
+ FileStatus.Created:
+ FileStatus.Modified,
+ TriggeringChange = state.OverlayStatus==FileOverlayStatus.Unversioned?
+ WatcherChangeTypes.Created:
+ WatcherChangeTypes.Changed
+ };
+ _uploadEvents.AddFromEnumerable(filesQuery,false);
+
+ }
+
private void StartStatusService()
{
// Create a ServiceHost for the CalculatorService type and provide the base address.
try
{
-
- CloudClient.Authenticate(Settings.UserName, Settings.ApiKey);
+ CloudClient.UsePithos = this.UsePithos;
+ CloudClient.AuthenticationUrl = this.AuthenticationUrl;
+ CloudClient.Authenticate(UserName, ApiKey);
- StartListening(Settings.PithosPath);
+ StartListening(RootPath);
StartSending();
}
catch (Exception)
}
}
+ public bool UsePithos { get; set; }
+
internal enum CloudActionType
{
Upload=0,
}
}
- private BlockingCollection<ListenerAction> _listenerActions=new BlockingCollection<ListenerAction>();
+ private BlockingCollection<ListenerAction> _networkActions=new BlockingCollection<ListenerAction>();
private Timer timer;
{
Trace.TraceInformation("[LISTENER] Scheduled");
return Task.Factory.StartNewDelayed(10000)
- .ContinueWith(t=>CloudClient.ListObjects("PITHOS"))
+ .ContinueWith(t=>CloudClient.ListObjects(PithosContainer))
.ContinueWith(task =>
{
Trace.TraceInformation("[LISTENER] Start Processing");
var pithosDir = new DirectoryInfo(accountPath);
var remoteFiles = from info in remoteObjects
- select info.Name;
+ select info.Name.ToLower();
var onlyLocal = from localFile in pithosDir.EnumerateFiles()
- where !remoteFiles.Contains(localFile.Name)
- select new ListenerAction(CloudActionType.UploadUnconditional, localFile,null);
-
-
-
+ where !remoteFiles.Contains(localFile.Name.ToLower())
+ select new ListenerAction(CloudActionType.UploadUnconditional, localFile,ObjectInfo.Empty);
+
+
+
+ var localNames = from info in pithosDir.EnumerateFiles()
+ select info.Name.ToLower();
- var localNames =pithosDir.EnumerateFiles().Select(info => info.Name);
var onlyRemote = from upFile in remoteObjects
- where !localNames.Contains(upFile.Name)
- select new ListenerAction(CloudActionType.DownloadUnconditional,null,upFile);
+ where !localNames.Contains(upFile.Name.ToLower())
+ select new ListenerAction(CloudActionType.DownloadUnconditional,new FileInfo(""), upFile);
var commonObjects = from upFile in remoteObjects
join localFile in pithosDir.EnumerateFiles()
- on upFile.Name equals localFile.Name
+ on upFile.Name.ToLower() equals localFile.Name.ToLower()
select new ListenerAction(CloudActionType.Download, localFile, upFile);
var uniques =
onlyLocal.Union(onlyRemote).Union(commonObjects)
- .Except(_listenerActions,new LocalFileComparer());
+ .Except(_networkActions,new LocalFileComparer());
- _listenerActions.AddFromEnumerable(uniques, false);
+ _networkActions.AddFromEnumerable(uniques, false);
Trace.TraceInformation("[LISTENER] End Processing");
private void ProcessListenerActions()
{
- foreach(var action in _listenerActions.GetConsumingEnumerable())
+ foreach(var action in _networkActions.GetConsumingEnumerable())
{
- Trace.TraceInformation("[ACTION] Start Processing {0}:{1}->{2}",action.Action,action.LocalFile,action.CloudFile);
+ Trace.TraceInformation("[ACTION] Start Processing {0}:{1}->{2}",action.Action,action.LocalFile,action.CloudFile.Name);
var localFile = action.LocalFile;
var cloudFile = action.CloudFile;
var downloadPath = (cloudFile == null)? String.Empty
- : Path.Combine(Settings.PithosPath,cloudFile.Name);
+ : Path.Combine(RootPath,cloudFile.Name);
try
{
switch (action.Action)
{
case CloudActionType.UploadUnconditional:
-
UploadCloudFile(localFile.Name,localFile.Length,localFile.FullName,action.LocalHash.Value);
break;
case CloudActionType.DownloadUnconditional:
- DownloadCloudFile("PITHOS",cloudFile.Name,downloadPath);
+ DownloadCloudFile(PithosContainer,cloudFile.Name,downloadPath);
break;
case CloudActionType.Download:
if (File.Exists(downloadPath))
StatusKeeper.SetFileOverlayStatus(downloadPath,FileOverlayStatus.Conflict);
}
else
- DownloadCloudFile("PITHOS",action.CloudFile.Name,downloadPath);
+ DownloadCloudFile(PithosContainer,action.CloudFile.Name,downloadPath);
}
}
else
- DownloadCloudFile("PITHOS",action.CloudFile.Name,downloadPath);
+ DownloadCloudFile(PithosContainer,action.CloudFile.Name,downloadPath);
break;
}
- Trace.TraceInformation("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile);
+ Trace.TraceInformation("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name);
}
catch (Exception exc)
{
Trace.TraceError("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
- action.Action, action.LocalFile,action.CloudFile,exc);
- Trace.TraceError(exc.ToString());
+ action.Action, action.LocalFile,action.CloudFile,exc);
- _listenerActions.Add(action);
+ _networkActions.Add(action);
}
}
}
- private void DownloadCloudFile(string container, string fileName, string localPath)
- {
- using (var upstream = CloudClient.GetObject(container, fileName))
- using (var fileStream = File.OpenWrite(localPath))
- {
- upstream.CopyTo(fileStream);
- }
- }
+
private void StartMonitoringFiles(string path)
{
_watcher.Renamed += OnRenameEvent;
_watcher.EnableRaisingEvents = true;
- Task.Factory.StartNew(() =>
- {
- foreach (var state in _fileEvents.GetConsumingEnumerable())
- {
- try
- {
- UpdateFileStatus(state);
- UpdateOverlayStatus(state);
- UpdateFileChecksum(state);
- _uploadEvents.Add(state);
- }
- catch (OperationCanceledException)
- {
- throw;
- }
- catch(Exception ex)
- {}
- }
-
- },_cancellationSource.Token);
+ Task.Factory.StartNew(ProcesFileEvents,_cancellationSource.Token);
+ }
+
+ private void ProcesFileEvents()
+ {
+ foreach (var state in _fileEvents.GetConsumingEnumerable())
+ {
+ try
+ {
+ var networkState=StatusKeeper.GetNetworkState(state.Path);
+ //Skip if the file is already being downloaded or uploaded and
+ //the change is create or modify
+ if (networkState != NetworkState.None &&
+ (
+ state.TriggeringChange==WatcherChangeTypes.Created ||
+ state.TriggeringChange==WatcherChangeTypes.Changed
+ ))
+ continue;
+ UpdateFileStatus(state);
+ UpdateOverlayStatus(state);
+ UpdateFileChecksum(state);
+ _uploadEvents.Add(state);
+ }
+ catch (OperationCanceledException exc)
+ {
+ Trace.TraceError("[ERROR] File Event Processing:\r{0}", exc);
+ throw;
+ }
+ catch (Exception exc)
+ {
+ Trace.TraceError("[ERROR] File Event Processing:\r{0}",exc);
+ }
+ }
}
private void StartSending()
throw;
}
catch(Exception ex)
- {}
+ {
+ Trace.TraceError("[ERROR] Synch for {0}:\r{1}",state.FileName,ex);
+ }
}
},_cancellationSource.Token);
{
if (state.Skip)
return state;
- string path = state.Path;
+ string path = state.Path.ToLower();
string fileName = Path.GetFileName(path);
+ //Bypass deleted files, unless the status is Deleted
+ if (!(File.Exists(path) || state.Status == FileStatus.Deleted))
+ {
+ state.Skip = true;
+ this.StatusKeeper.RemoveFileOverlayStatus(path);
+ return state;
+ }
+
switch(state.Status)
{
case FileStatus.Created:
private void RenameCloudFile(WorkflowState state)
{
- this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Synch);
+ this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);
- CloudClient.MoveObject("PITHOS", state.OldFileName, state.FileName);
+ CloudClient.MoveObject(PithosContainer, state.OldFileName,PithosContainer, state.FileName);
this.StatusKeeper.SetFileStatus(state.Path, FileStatus.Unchanged);
this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Normal);
{
Contract.Requires(!Path.IsPathRooted(fileName));
- this.StatusKeeper.SetFileOverlayStatus(fileName, FileOverlayStatus.Synch);
- CloudClient.DeleteObject("PITHOS", fileName);
+ this.StatusKeeper.SetFileOverlayStatus(fileName, FileOverlayStatus.Modified);
+
+ CloudClient.MoveObject(PithosContainer,fileName,TrashContainer,fileName);
this.StatusKeeper.ClearFileStatus(fileName);
this.StatusKeeper.RemoveFileOverlayStatus(fileName);
}
+ private void DownloadCloudFile(string container, string fileName, string localPath)
+ {
+ var state = StatusKeeper.GetNetworkState(fileName);
+ //Abort if the file is already being uploaded or downloaded
+ if (state != NetworkState.None)
+ return;
+
+ StatusKeeper.SetNetworkState(localPath,NetworkState.Downloading);
+ CloudClient.GetObject(container, fileName, localPath)
+ .ContinueWith(t=>
+ CloudClient.GetObjectInfo(container,fileName))
+ .ContinueWith(t=>
+ StatusKeeper.StoreInfo(fileName,t.Result))
+ .ContinueWith(t=>
+ StatusKeeper.SetNetworkState(localPath,NetworkState.None))
+ .Wait();
+ }
+
private void UploadCloudFile(string fileName, long fileSize, string path,string hash)
{
Contract.Requires(!Path.IsPathRooted(fileName));
- //Even if GetObjectInfo times out, we can proceed with the upload
- var info=CloudClient.GetObjectInfo("PITHOS", fileName);
- if ( hash != info.Hash)
- {
- this.StatusKeeper.SetFileOverlayStatus(path, FileOverlayStatus.Synch);
+ var state=StatusKeeper.GetNetworkState(fileName);
+ //Abort if the file is already being uploaded or downloaded
+ if (state != NetworkState.None)
+ return;
- CloudClient.PutObject("PITHOS", fileName, path, hash).Wait();
- }
- this.StatusKeeper.SetFileStatus(path,FileStatus.Unchanged);
- this.StatusKeeper.SetFileOverlayStatus(path,FileOverlayStatus.Normal);
+ StatusKeeper.SetNetworkState(fileName,NetworkState.Uploading);
+
+ //Even if GetObjectInfo times out, we can proceed with the upload
+ var info = CloudClient.GetObjectInfo(PithosContainer, fileName);
+ Task.Factory.StartNew(() =>
+ {
+ if (hash != info.Hash)
+ {
+ Task.Factory.StartNew(() =>
+ this.StatusKeeper.SetFileOverlayStatus(path, FileOverlayStatus.Modified))
+ .ContinueWith(t =>
+ CloudClient.PutObject(PithosContainer, fileName, path, hash));
+ }
+ else
+ {
+ this.StatusKeeper.StoreInfo(path,info);
+ }
+ })
+ .ContinueWith(t =>
+ this.StatusKeeper.SetFileState(path, FileStatus.Unchanged, FileOverlayStatus.Normal))
+ .ContinueWith(t=>
+ this.StatusKeeper.SetNetworkState(path,NetworkState.None))
+ .Wait();
Workflow.RaiseChangeNotification(path);
}