when an agent stops (e.g. when a FileAgent stops because an account is removed).
Cleanup of several related files
Fixes #1785
var accountInfo=_accounts.FirstOrDefault(account => account.UserName == accountName);
_accounts.TryRemove(accountInfo);
+ _pollAgent.RemoveAccount(accountInfo);
PithosMonitor monitor;
if (Monitors.TryRemove(accountName, out monitor))
{
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
-using System.Linq;
-using System.Text;
using System.Threading;
+using System.Threading.Async;
using System.Threading.Tasks;
using Pithos.Core.Agents;
public class Agent<TMessage> : IDisposable
{
private readonly ConcurrentQueue<TMessage> _queue;
- private readonly BlockingCollection<TMessage> _messages;
+ private readonly AsyncProducerConsumerCollection<TMessage> _messages;
private readonly CancellationTokenSource _cancelSource = new CancellationTokenSource();
public CancellationToken CancellationToken;
public Agent(Action<Agent<TMessage>> action)
{
_queue=new ConcurrentQueue<TMessage>();
- _messages = new BlockingCollection<TMessage>(_queue);
+ _messages = new AsyncProducerConsumerCollection<TMessage>(_queue);
_process = action;
CancellationToken = _cancelSource.Token;
}
/// <summary>
/// Receives a message asynchronously, optionally with a timeout. Receive throws a TimeoutException if the timeout expires
/// </summary>
- /// <param name="timeout">Optional timeout in milliseconds. If provided, Receive fails with a TimeoutException if no message is available in the specified time</param>
/// <returns>A Task that will return the message asynchronously</returns>
- public Task<TMessage> Receive(int timeout = -1)
+ public Task<TMessage> Receive()
{
- return Task<TMessage>.Factory.StartNew(() =>
- {
- TMessage item;
- if (!_messages.TryTake(out item, timeout, CancellationToken))
- throw new TimeoutException();
- return item;
- });
- }
-
-
- /// <summary>
- /// Receives a message asynchronously, optionally with a timeout. TryReceive returns an empty task if the timeout expires
- /// </summary>
- /// <param name="timeout">Optional timeout in milliseconds. If provided, Receive returns an empty task</param>
- /// <returns>A Task that will return the message asynchronously</returns>
- public Task<TMessage> TryReceive(int timeout = -1)
- {
- return Task<TMessage>.Factory.StartNew(() =>
- {
- TMessage item;
- _messages.TryTake(out item, timeout, CancellationToken);
- return item;
- });
+ return _messages.Take();
}
public void Stop()
{
//Stop the message queue
- _messages.CompleteAdding();
+ //_messages.CompleteAdding();
//And signal the cancellation
_cancelSource.Cancel();
}
public IEnumerable<TMessage> GetEnumerable()
{
- return _messages;
+ return _queue;
}
/// <summary>
public Lazy<string> LocalHash { get; protected set; }
- private Lazy<string> _topHash;
- public Lazy<string> TopHash
- {
- get { return _topHash; }
- set { _topHash = value; }
- }
+ public Lazy<string> TopHash { get; set; }
[ContractInvariantMethod]
public override string ToString()
{
- return String.Format("{0}:{1}->{2}", this.Action, this.LocalFile.FullName, this.CloudFile.Name);
+ return String.Format("{0}:{1}->{2}", Action, LocalFile.FullName, CloudFile.Name);
}
protected static ObjectInfo CreateObjectInfoFor(AccountInfo accountInfo, FileSystemInfo fileInfo)
public override string ToString()
{
- return String.Format("{0}: _ ->{1}", this.Action, this.CloudFile.Name);
+ return String.Format("{0}: _ ->{1}", Action, CloudFile.Name);
}
}
public override string ToString()
{
- return String.Format("{0}:{1}->{2}", this.Action, OldCloudFile.Name, CloudFile.Name);
+ return String.Format("{0}:{1}->{2}", Action, OldCloudFile.Name, CloudFile.Name);
}
}
*/
#endregion
-//TODO: Now there is a UUID tag. This can be used for renames/moves
-
-
using System;
-using System.Collections.Concurrent;
using System.Collections.Generic;
using System.ComponentModel.Composition;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.IO;
-using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
-using System.Threading.Tasks.Dataflow;
using Castle.ActiveRecord;
using Pithos.Interfaces;
using Pithos.Network;
namespace Pithos.Core.Agents
{
- //TODO: Ensure all network operations use exact casing. Pithos is case sensitive
[Export]
public class NetworkAgent
{
private static readonly ILog Log = LogManager.GetLogger("NetworkAgent");
- private readonly ConcurrentBag<AccountInfo> _accounts = new ConcurrentBag<AccountInfo>();
-
[System.ComponentModel.Composition.Import]
public IPithosSettings Settings { get; set; }
{
//Clear the status of already deleted files to avoid reprocessing
if (action.LocalFile != null)
- this.StatusKeeper.ClearFileStatus(action.LocalFile.FullName);
+ StatusKeeper.ClearFileStatus(action.LocalFile.FullName);
}
else
{
var accountName = parts[1];
var oldName = accountInfo.UserName;
var absoluteUri = accountInfo.StorageUri.AbsoluteUri;
- var nameIndex = absoluteUri.IndexOf(oldName);
+ var nameIndex = absoluteUri.IndexOf(oldName, StringComparison.Ordinal);
var root = absoluteUri.Substring(0, nameIndex);
accountInfo = new AccountInfo
}
- public void AddAccount(AccountInfo accountInfo)
- {
- if (!_accounts.Contains(accountInfo))
- _accounts.Add(accountInfo);
- }
}
using System.IO;\r
using System.Threading;\r
using System.Threading.Tasks;\r
-using System.Threading.Tasks.Dataflow;\r
using Castle.ActiveRecord;\r
using Pithos.Interfaces;\r
using Pithos.Network;\r
using System;\r
using System.Collections.Generic;\r
using System.Linq;\r
- using System.Text;\r
\r
/// <summary>\r
/// PollAgent periodically polls the server to detect object changes. The agent retrieves a listing of all\r
//The Sync Event signals a manual synchronisation\r
private readonly AsyncManualResetEvent _syncEvent = new AsyncManualResetEvent();\r
\r
- private ConcurrentDictionary<string, DateTime> _lastSeen = new ConcurrentDictionary<string, DateTime>();\r
- private readonly ConcurrentBag<AccountInfo> _accounts = new ConcurrentBag<AccountInfo>();\r
+ private readonly ConcurrentDictionary<string, DateTime> _lastSeen = new ConcurrentDictionary<string, DateTime>();\r
+ private readonly ConcurrentDictionary<string, AccountInfo> _accounts = new ConcurrentDictionary<string,AccountInfo>();\r
\r
\r
/// <summary>\r
UpdateStatus(PithosStatus.Syncing);\r
StatusNotification.Notify(new PollNotification());\r
\r
- using (log4net.ThreadContext.Stacks["Retrieve Remote"].Push("All accounts"))\r
+ using (ThreadContext.Stacks["Retrieve Remote"].Push("All accounts"))\r
{\r
//If this poll fails, we will retry with the same since value\r
var nextSince = since;\r
//This is done to ensure there are no discrepancies due to clock differences\r
var current = DateTime.Now.AddSeconds(-1);\r
\r
- var tasks = from accountInfo in _accounts\r
+ var tasks = from accountInfo in _accounts.Values\r
select ProcessAccountFiles(accountInfo, since);\r
\r
await TaskEx.WhenAll(tasks.ToList());\r
var client = new CloudFilesClient(accountInfo);\r
\r
//We don't need to check the trash container\r
- var containers = client.ListContainers(accountInfo.UserName).Where(c=>c.Name!="trash");\r
+ var containers = client.ListContainers(accountInfo.UserName)\r
+ .Where(c=>c.Name!="trash")\r
+ .ToList();\r
\r
\r
CreateContainerFolders(accountInfo, containers);\r
await NetworkAgent.GetDeleteAwaiter();\r
//Get the poll time now. We may miss some deletions but it's better to keep a file that was deleted\r
//than delete a file that was created while we were executing the poll \r
- var pollTime = DateTime.Now;\r
\r
//Get the list of server objects changed since the last check\r
//The name of the container is passed as state in order to create a dictionary of tasks in a subsequent step\r
var listObjects = (from container in containers\r
select Task<IList<ObjectInfo>>.Factory.StartNew(_ =>\r
client.ListObjects(accountInfo.UserName, container.Name, since), container.Name)).ToList();\r
- //BUG: Can't detect difference between no changes or no objects\r
- //ListObjects returns nothing if there are no changes since the last check time (since value) \r
- //TODO: Must detect the difference between no server objects and no change\r
-\r
- //NOTE: One option is to "mark" all result lists with their container name, or \r
- //rather the url of the container\r
- //Another option \r
\r
var listShared = Task<IList<ObjectInfo>>.Factory.StartNew(_ => \r
client.ListSharedObjects(since), "shared");\r
\r
var differencer = _differencer.PostSnapshot(accountInfo, cleanRemotes);\r
\r
- ProcessDeletedFiles(accountInfo, differencer.Deleted.FilterDirectlyBelow(SelectiveUris), pollTime);\r
+ ProcessDeletedFiles(accountInfo, differencer.Deleted.FilterDirectlyBelow(SelectiveUris));\r
\r
// @@@ NEED To add previous state here as well, To compare with previous hash\r
\r
}\r
}\r
\r
- AccountsDifferencer _differencer = new AccountsDifferencer();\r
+ readonly AccountsDifferencer _differencer = new AccountsDifferencer();\r
private List<Uri> _selectiveUris=new List<Uri>();\r
\r
/// <summary>\r
/// </summary>\r
/// <param name="accountInfo"></param>\r
/// <param name="cloudFiles"></param>\r
- /// <param name="pollTime"></param>\r
- private void ProcessDeletedFiles(AccountInfo accountInfo, IEnumerable<ObjectInfo> cloudFiles, DateTime pollTime)\r
+ private void ProcessDeletedFiles(AccountInfo accountInfo, IEnumerable<ObjectInfo> cloudFiles)\r
{\r
if (accountInfo == null)\r
throw new ArgumentNullException("accountInfo");\r
}\r
}\r
\r
- private void ProcessTrashedFiles(AccountInfo accountInfo, IEnumerable<ObjectInfo> trashObjects)\r
- {\r
- var fileAgent = FileAgent.GetFileAgent(accountInfo);\r
- foreach (var trashObject in trashObjects)\r
- {\r
- var barePath = trashObject.RelativeUrlToFilePath(accountInfo.UserName);\r
- //HACK: Assume only the "pithos" container is used. Must find out what happens when\r
- //deleting a file from a different container\r
- var relativePath = Path.Combine("pithos", barePath);\r
- fileAgent.Delete(relativePath);\r
- }\r
- }\r
-\r
/// <summary>\r
/// Notify the UI to update the visual status\r
/// </summary>\r
\r
public void AddAccount(AccountInfo accountInfo)\r
{\r
- if (!_accounts.Contains(accountInfo))\r
- _accounts.Add(accountInfo);\r
+ //Avoid adding a duplicate accountInfo\r
+ _accounts.TryAdd(accountInfo.UserName, accountInfo);\r
+ }\r
+\r
+ public void RemoveAccount(AccountInfo accountInfo)\r
+ {\r
+ AccountInfo account;\r
+ _accounts.TryRemove(accountInfo.UserName,out account);\r
}\r
}\r
}\r
private void StartNetworkAgent()
{
- NetworkAgent.AddAccount(_accountInfo);
-
NetworkAgent.StatusNotification = StatusNotification;
NetworkAgent.Start();