// -----------------------------------------------------------------------
//
// TODO: Update copyright text.
//
// -----------------------------------------------------------------------
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace Pithos.Core
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
/// <summary>
/// Executes queued actions one at a time on a single background consumer task.
/// </summary>
/// <remarks>
/// Call <see cref="Start"/> to begin consuming, <see cref="Add"/> to enqueue work
/// and <see cref="Stop"/> to signal that no more work will be added.
/// An exception thrown by a queued action is not caught here: it ends the
/// consumer task, and actions queued after it are not executed.
/// </remarks>
public class JobQueue
{
    // The element type is Action: Add() enqueues Action instances and the
    // consumer invokes every item it takes.
    private readonly BlockingCollection<Action> _statusUpdateQueue = new BlockingCollection<Action>();

    private CancellationToken _cancellationToken;

    /// <summary>
    /// Starts the background task that takes actions from the queue and runs them.
    /// </summary>
    /// <param name="token">
    /// Token that stops the consumer. If it is already cancelled the consumer never
    /// starts; if it is cancelled later the consumer stops waiting for new actions.
    /// </param>
    /// <remarks>
    /// Every call starts another consumer task, so this is meant to be called once.
    /// </remarks>
    public void Start(CancellationToken token)
    {
        _cancellationToken = token;

        // The consumer blocks for as long as the queue is open, so ask for a
        // dedicated thread instead of tying up a thread-pool worker, and pin the
        // default scheduler so the loop never runs on an ambient (e.g. UI) scheduler.
        Task.Factory.StartNew(
            ProcessUpdates,
            _cancellationToken,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }

    /// <summary>
    /// Consumer loop: runs each queued action until the queue is completed and
    /// drained, or until the token passed to <see cref="Start"/> is cancelled.
    /// </summary>
    private void ProcessUpdates()
    {
        try
        {
            // Passing the token lets cancellation interrupt the blocking wait;
            // without it the loop could only end through Stop().
            foreach (var action in _statusUpdateQueue.GetConsumingEnumerable(_cancellationToken))
            {
                action();
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation through our own token is a normal shutdown. Anything
            // else (e.g. an action cancelling on its own) is not ours to hide.
            if (!_cancellationToken.IsCancellationRequested)
            {
                throw;
            }
        }
    }

    /// <summary>
    /// Enqueues an action for execution by the consumer task.
    /// </summary>
    /// <param name="action">The action to run; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    /// <exception cref="InvalidOperationException">
    /// Thrown by the underlying collection when <see cref="Stop"/> has already been called.
    /// </exception>
    public void Add(Action action)
    {
        // Reject null here: a null entry would otherwise surface later as a
        // NullReferenceException inside the consumer and end all processing.
        if (action == null)
        {
            throw new ArgumentNullException("action");
        }

        _statusUpdateQueue.Add(action);
    }

    /// <summary>
    /// Marks the queue as complete. Actions that are already queued are still
    /// executed; the consumer loop ends once the queue is empty.
    /// </summary>
    public void Stop()
    {
        _statusUpdateQueue.CompleteAdding();
    }
}
/// <summary>
/// Agent specialization whose message loop treats every received item as a job:
/// it waits for the job, invokes the job's result as an action, and then
/// schedules the next receive. The loop itself is built in <see cref="Create"/>.
/// </summary>
public class JobAgent:Agent
{
/// <summary>
/// Forwards the supplied delegate to the Agent base constructor. The constructor
/// is protected, so outside callers cannot construct a JobAgent directly.
/// </summary>
/// <param name="action">Delegate handed unchanged to the base class.</param>
protected JobAgent(Action> action)
:base(action)
{
}
/// <summary>
/// Calls Start with a self-rescheduling receive loop and returns the result
/// cast to <see cref="JobAgent"/>.
/// </summary>
/// <returns>The object returned by Start, cast to JobAgent.</returns>
/// <remarks>
/// NOTE(review): Start is not defined in this class (presumably inherited from
/// Agent). The cast only succeeds if Start really produces a JobAgent instance
/// at run time; confirm against the Agent implementation.
/// </remarks>
public static JobAgent Create()
{
return (JobAgent)Start(queue =>
{
// Declared before it is assigned so the lambda below can refer to itself.
Action loop = null;
loop = () =>
{
// Ask the queue for the next job; the rest of the iteration runs in a
// continuation attached to that job.
var job = queue.Receive();
job.ContinueWith(t =>
{
// NOTE(review): `t` is presumably the same task as `job` (the continuation's
// antecedent); IgnoreExceptions is defined elsewhere, so its exact effect is
// not visible here.
t.IgnoreExceptions();
// The job's result is itself an action, which is executed here.
// NOTE(review): if reading Result or running the action throws, the DoAsync
// call below is skipped and the loop is never rescheduled; confirm whether
// that is intended.
var action = job.Result;
action();
// Schedule the next iteration through the queue rather than calling loop()
// directly.
queue.DoAsync(loop);
});
};
// Kick off the first iteration.
loop();
});
}
}
}