Log.InfoFormat("[RETRIEVE] Listening at {0}", listenerUrl);
listener.Start();
+
+ var startListening = Task.Factory.FromAsync<HttpListenerContext>(listener.BeginGetContext, listener.EndGetContext, null)
+ .WithTimeout(TimeSpan.FromMinutes(5));
- var task = Task.Factory.FromAsync<HttpListenerContext>(listener.BeginGetContext, listener.EndGetContext, null)
- .WithTimeout(TimeSpan.FromMinutes(1))
- .ContinueWith(tc =>
+ var receiveCredentials=startListening.ContinueWith(tc =>
{
try
{
Log.InfoFormat("[RETRIEVE] Open Browser at {0}", retrieveUri);
Process.Start(retrieveUri.ToString());
- return task;
+ return receiveCredentials;
}
private static void Respond(HttpListenerContext context)
using System.Collections.Concurrent;
using System.ComponentModel.Composition;
using System.Diagnostics;
+using System.Diagnostics.Contracts;
using System.IO;
+using System.Net;
using System.Runtime.InteropServices;
using System.ServiceModel;
using System.ServiceModel.Description;
_statusChecker = statusChecker;
_events = events;
Settings = settings;
-
-
-
+
UsageMessage = "Using 15% of 50 GB";
StatusMessage = "In Synch";
if (_monitors.TryGetValue(accountName,out monitor))
{
//If the account is active
- if (account.IsActive)
+ if (account.IsActive)
//Start the monitor. It's OK to start an already started monitor,
//it will just ignore the call
monitor.Start();
- private Task StartMonitor(PithosMonitor monitor)
+ private Task StartMonitor(PithosMonitor monitor,int retries=0)
{
return Task.Factory.StartNew(() =>
{
try
{
Log.InfoFormat("Start Monitoring {0}", monitor.UserName);
+
monitor.Start();
}
+ catch (WebException exc)
+ {
+ if (AbandonRetry(monitor, retries))
+ return;
+
+ if (IsUnauthorized(exc))
+ {
+ var message = String.Format("API Key Expired for {0}. Starting Renewal",monitor.UserName);
+ Log.Error(message,exc);
+ TryAuthorize(monitor,retries);
+ }
+ else
+ {
+ TryLater(monitor, exc,retries);
+ }
+ }
catch (Exception exc)
{
- var message =
- String.Format("An exception occured. Can't start monitoring\nWill retry in 10 seconds");
- Task.Factory.StartNewDelayed(10000, () => StartMonitor(monitor));
- _events.Publish(new Notification {Title = "Error", Message = message, Level = TraceLevel.Error});
- Log.Error(message, exc);
+ if (AbandonRetry(monitor, retries))
+ return;
+
+ TryLater(monitor,exc,retries);
}
}
});
}
+ private bool AbandonRetry(PithosMonitor monitor, int retries)
+ {
+ if (retries > 1)
+ {
+ var message = String.Format("Monitoring of account {0} has failed too many times. Will not retry",
+ monitor.UserName);
+ _events.Publish(new Notification
+ {Title = "Account monitoring failed", Message = message, Level = TraceLevel.Error});
+ return true;
+ }
+ return false;
+ }
+
+
+ private Task TryAuthorize(PithosMonitor monitor,int retries)
+ {
+ _events.Publish(new Notification { Title = "Authorization failed", Message = "Your API Key has probably expired. You will be directed to a page where you can renew it", Level = TraceLevel.Error });
+
+ var authorize= PithosAccount.RetrieveCredentialsAsync(Settings.PithosSite);
+
+ return authorize.ContinueWith(t =>
+ {
+ if (t.IsFaulted)
+ {
+ string message = String.Format("API Key retrieval for {0} failed", monitor.UserName);
+ Log.Error(message,t.Exception.InnerException);
+ _events.Publish(new Notification { Title = "Authorization failed", Message = message, Level = TraceLevel.Error });
+ return;
+ }
+ var credentials = t.Result;
+ var account =Settings.Accounts.FirstOrDefault(act => act.AccountName == credentials.UserName);
+ account.ApiKey = credentials.Password;
+ monitor.ApiKey = credentials.Password;
+ Task.Factory.StartNewDelayed(10000, () => StartMonitor(monitor,retries+1));
+ });
+ }
+
+ private static bool IsUnauthorized(WebException exc)
+ {
+ if (exc==null)
+ throw new ArgumentNullException("exc");
+ Contract.EndContractBlock();
+
+ var response = exc.Response as HttpWebResponse;
+ if (response == null)
+ return false;
+ return (response.StatusCode == HttpStatusCode.Unauthorized);
+ }
+
+ private void TryLater(PithosMonitor monitor, Exception exc,int retries)
+ {
+ var message = String.Format("An exception occured. Can't start monitoring\nWill retry in 10 seconds");
+ Task.Factory.StartNewDelayed(10000, () => StartMonitor(monitor,retries+1));
+ _events.Publish(new Notification
+ {Title = "Error", Message = message, Level = TraceLevel.Error});
+ Log.Error(message, exc);
+ }
+
public void NotifyChange(string status, TraceLevel level=TraceLevel.Info)
{
File.Delete(filePath);
var newHash = client.GetHashMap(null, FolderConstants.PithosContainer, fileName).Result;
- agent.DownloadWithBlocks(client, account, FolderConstants.PithosContainer, new Uri(fileName, UriKind.Relative), filePath, newHash)
+ agent.DownloadWithBlocks(accountInfo, client, account, FolderConstants.PithosContainer, new Uri(fileName, UriKind.Relative), filePath, newHash)
.Wait();
Assert.IsTrue(File.Exists(filePath));
--- /dev/null
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace Pithos.Core.Agents
+{
+ static class AgentLocator<T> where T:class
+ {
+ static ConcurrentDictionary<string, WeakReference> _agents = new ConcurrentDictionary<string, WeakReference>();
+ public static void Register(T agent,string key)
+ {
+ _agents[key] = new WeakReference(agent);
+ }
+
+ public static T Get(string key)
+ {
+ return _agents[key].Target as T;
+ }
+
+ public static bool TryGet(string key, out T value)
+ {
+ WeakReference target;
+ var exists = _agents.TryGetValue(key, out target);
+ value = target.Target as T;
+ return exists;
+ }
+
+ public static void Remove(string key)
+ {
+ WeakReference target;
+ _agents.TryRemove(key, out target);
+ }
+ }
+}
namespace Pithos.Core.Agents
{
- [Export,PartCreationPolicy(CreationPolicy.NonShared)]
+ [Export]
public class FileAgent
{
Agent<WorkflowState> _agent;
public IStatusKeeper StatusKeeper { get; set; }
public IStatusNotification StatusNotification { get; set; }
+/*
[Import]
public FileAgent FileAgent {get;set;}
+*/
/* public int BlockSize { get; set; }
public string BlockHash { get; set; }*/
select ProcessAccountFiles(accountInfo, since);
var process=Task.Factory.Iterate(tasks);
- return process.ContinueWith(t=>
- ProcessRemoteFiles(nextSince));
+ return process.ContinueWith(t =>
+ {
+ if (t.IsFaulted)
+ {
+ Log.Error("Error while processing accounts");
+ t.Exception.Handle(exc=>
+ {
+ Log.Error("Details:", exc);
+ return true;
+ });
+ }
+ ProcessRemoteFiles(nextSince);
+ });
}
});
}
if (remote==null)
throw new ArgumentNullException();
Contract.EndContractBlock();
+ var fileAgent = GetFileAgent(accountInfo);
//In order to avoid multiple iterations over the files, we iterate only once
//over the remote files
{
var relativePath = objectInfo.RelativeUrlToFilePath(accountInfo.UserName);
//and remove any matching objects from the list, adding them to the commonObjects list
- if (FileAgent.Exists(relativePath))
+
+ if (fileAgent.Exists(relativePath))
{
- var localFile = FileAgent.GetFileInfo(relativePath);
+ var localFile = fileAgent.GetFileInfo(relativePath);
var state = FileState.FindByFilePath(localFile.FullName);
//Common files should be checked on a per-case basis to detect differences, which is newer
}
}
+ private static FileAgent GetFileAgent(AccountInfo accountInfo)
+ {
+ return AgentLocator<FileAgent>.Get(accountInfo.AccountPath);
+ }
+
private void ProcessDeletedFiles(AccountInfo accountInfo,IEnumerable<ObjectInfo> trashObjects)
{
+ var fileAgent = GetFileAgent(accountInfo);
foreach (var trashObject in trashObjects)
{
var relativePath = trashObject.RelativeUrlToFilePath(accountInfo.UserName);
//and remove any matching objects from the list, adding them to the commonObjects list
- FileAgent.Delete(relativePath);
+ fileAgent.Delete(relativePath);
}
}
if (Path.IsPathRooted(fileName))
throw new ArgumentException("The fileName should not be rooted","fileName");
Contract.EndContractBlock();
+
+ var fileAgent = GetFileAgent(accountInfo);
using ( log4net.ThreadContext.Stacks["DeleteCloudFile"].Push("Delete"))
{
- var info = FileAgent.GetFileInfo(fileName);
+ var info = fileAgent.GetFileInfo(fileName);
var fullPath = info.FullName.ToLower();
this.StatusKeeper.SetFileOverlayStatus(fullPath, FileOverlayStatus.Modified);
//If it's a small file
var downloadTask=(serverHash.Hashes.Count == 1 )
//Download it in one go
- ? DownloadEntireFile(client, account, container, relativeUrl, localPath)
+ ? DownloadEntireFile(accountInfo,client, account, container, relativeUrl, localPath)
//Otherwise download it block by block
- : DownloadWithBlocks(client, account, container, relativeUrl, localPath, serverHash);
+ : DownloadWithBlocks(accountInfo,client, account, container, relativeUrl, localPath, serverHash);
yield return downloadTask;
}
//Download a small file with a single GET operation
- private Task DownloadEntireFile(CloudFilesClient client, string account, string container, Uri relativeUrl, string localPath)
+ private Task DownloadEntireFile(AccountInfo accountInfo, CloudFilesClient client, string account, string container, Uri relativeUrl, string localPath)
{
if (client == null)
throw new ArgumentNullException("client");
throw new ArgumentException("The localPath must be rooted", "localPath");
Contract.EndContractBlock();
+ var fileAgent = GetFileAgent(accountInfo);
//Calculate the relative file path for the new file
var relativePath = relativeUrl.RelativeUriToFilePath();
//The file will be stored in a temporary location while downloading with an extension .download
- var tempPath = Path.Combine(FileAgent.FragmentsPath, relativePath + ".download");
+ var tempPath = Path.Combine(fileAgent.FragmentsPath, relativePath + ".download");
//Make sure the target folder exists. DownloadFileTask will not create the folder
var directoryPath = Path.GetDirectoryName(tempPath);
if (!Directory.Exists(directoryPath))
}
//Download a file asynchronously using blocks
- public Task DownloadWithBlocks(CloudFilesClient client, string account, string container, Uri relativeUrl, string localPath, TreeHash serverHash)
+ public Task DownloadWithBlocks(AccountInfo accountInfo, CloudFilesClient client, string account, string container, Uri relativeUrl, string localPath, TreeHash serverHash)
{
if (client == null)
throw new ArgumentNullException("client");
throw new ArgumentNullException("serverHash");
Contract.EndContractBlock();
- return Task.Factory.Iterate(BlockDownloadIterator(client,account,container, relativeUrl, localPath, serverHash));
+ return Task.Factory.Iterate(BlockDownloadIterator(accountInfo,client,account,container, relativeUrl, localPath, serverHash));
}
- private IEnumerable<Task> BlockDownloadIterator(CloudFilesClient client, string account, string container, Uri relativeUrl, string localPath, TreeHash serverHash)
+ private IEnumerable<Task> BlockDownloadIterator(AccountInfo accountInfo,CloudFilesClient client, string account, string container, Uri relativeUrl, string localPath, TreeHash serverHash)
{
if (client == null)
throw new ArgumentNullException("client");
if(serverHash==null)
throw new ArgumentNullException("serverHash");
Contract.EndContractBlock();
-
+
+ var fileAgent = GetFileAgent(accountInfo);
//Calculate the relative file path for the new file
var relativePath = relativeUrl.RelativeUriToFilePath();
- var blockUpdater = new BlockUpdater(FileAgent.FragmentsPath, localPath, relativePath, serverHash);
+ var blockUpdater = new BlockUpdater(fileAgent.FragmentsPath, localPath, relativePath, serverHash);
</ItemGroup>
<ItemGroup>
<Compile Include="Agents\Agent.cs" />
+ <Compile Include="Agents\AgentLocator.cs" />
<Compile Include="Agents\BlockUpdater.cs" />
<Compile Include="Agents\CloudTransferAction.cs" />
<Compile Include="Agents\FileAgent.cs" />
public WorkflowAgent WorkflowAgent { get; set; }
[Import]
- public NetworkAgent NetworkAgent { get; set; }
-
+ public NetworkAgent NetworkAgent { get; set; }
public string UserName { get; set; }
public string ApiKey { get; set; }
}
}
- public string RootPath { get; set; }
+ private string _rootPath;
+ public string RootPath
+ {
+ get { return _rootPath; }
+ set
+ {
+ _rootPath = value.ToLower();
+ }
+ }
CancellationTokenSource _cancellationSource;
StatusKeeper.BlockSize = _blockSize;
StatusKeeper.StartProcessing(_cancellationSource.Token);
- IndexLocalFiles(RootPath);
- StartWatcherAgent(RootPath);
+ IndexLocalFiles();
+ StartWatcherAgent();
StartNetworkAgent();
return null;
}
- private void IndexLocalFiles(string path)
+ private void IndexLocalFiles()
{
StatusNotification.NotifyChange("Indexing Local Files",TraceLevel.Info);
using (log4net.ThreadContext.Stacks["Monitor"].Push("Indexing local files"))
try
{
var fragmentsPath = Path.Combine(RootPath, FolderConstants.FragmentsFolder);
- var directory = new DirectoryInfo(path);
+ var directory = new DirectoryInfo(RootPath);
var files =
from file in directory.EnumerateFiles("*", SearchOption.AllDirectories)
where !file.FullName.StartsWith(fragmentsPath, StringComparison.InvariantCultureIgnoreCase) &&
NetworkAgent.AddAccount(_accountInfo);
NetworkAgent.StatusNotification = StatusNotification;
-
-
+
NetworkAgent.Start();
NetworkAgent.ProcessRemoteFiles();
- private void StartWatcherAgent(string path)
+ private void StartWatcherAgent()
{
+ AgentLocator<FileAgent>.Register(FileAgent,RootPath);
+
FileAgent.StatusKeeper = StatusKeeper;
FileAgent.Workflow = Workflow;
FileAgent.FragmentsPath = Path.Combine(RootPath, FolderConstants.FragmentsFolder);
- FileAgent.Start(_accountInfo,path);
+ FileAgent.Start(_accountInfo, RootPath);
}
public void Stop()
- {
+ {
+ AgentLocator<FileAgent>.Remove(RootPath);
+
if (FileAgent!=null)
FileAgent.Stop();
FileAgent = null;
{
Log.ErrorFormat("[{0}] FAILED for {1} with \n{2}", method, address, exc);
}
- throw;
+ throw exc;
}
catch(Exception ex)