#region
/* -----------------------------------------------------------------------
*
*
* Copyright 2011-2012 GRNET S.A. All rights reserved.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* 1. Redistributions of source code must retain the above
* copyright notice, this list of conditions and the following
* disclaimer.
*
* 2. Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials
* provided with the distribution.
*
*
* THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
* The views and conclusions contained in the software and
* documentation are those of the authors and should not be
* interpreted as representing official policies, either expressed
* or implied, of GRNET S.A.
*
* -----------------------------------------------------------------------
*/
#endregion
using System.Reflection;
using System.Threading;
using System.Threading.Tasks.Dataflow;
using System;
using log4net;
namespace Pithos.Core.Agents
{
/// <summary>
/// Collects posted items and hands them to a callback as a single batch once
/// an idle timeout has elapsed since the last post.
/// </summary>
/// <remarks>
/// Pipeline: timer block (rearms the idle timer for every item) -> batch block
/// (buffers items) -> process block (invokes the callback with the batch).
/// </remarks>
/// <typeparam name="T">Type of the items being batched.</typeparam>
public class IdleBatch<T>
{
    private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);

    /// <summary>
    /// Size limit of the batch block: once this many items are buffered the batch
    /// is released without waiting for the idle timeout.
    /// </summary>
    private const int MaxBatchSize = 9999;

    private readonly Timer _triggerTimer;
    private readonly BatchBlock<T> _batchBlock;
    private readonly TransformBlock<T, T> _timerBlock;
    private readonly ActionBlock<T[]> _processBlock;
    private readonly int _timeOut;

    /// <summary>
    /// Builds the batching pipeline.
    /// </summary>
    /// <param name="idleTimeout">
    /// Idle period in milliseconds that must pass after the last post before the
    /// batch is released. <see cref="Timeout.Infinite"/> disables the timer.
    /// </param>
    /// <param name="action">Callback that receives each released batch.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="idleTimeout"/> is less than <see cref="Timeout.Infinite"/>.
    /// </exception>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    public IdleBatch(int idleTimeout, Action<T[]> action)
    {
        // Fail here rather than on the first post, where Timer.Change would
        // throw inside the dataflow block and fault the pipeline unnoticed.
        if (idleTimeout < Timeout.Infinite)
        {
            throw new ArgumentOutOfRangeException("idleTimeout", idleTimeout,
                "The idle timeout must be Timeout.Infinite or a non-negative number of milliseconds.");
        }
        if (action == null)
        {
            throw new ArgumentNullException("action");
        }

        _timeOut = idleTimeout;

        _batchBlock = new BatchBlock<T>(MaxBatchSize);
        _processBlock = new ActionBlock<T[]>(action);

        // The timer is created disarmed; it is first armed when an item is posted.
        _triggerTimer = new Timer(_ => _batchBlock.TriggerBatch());

        _timerBlock = new TransformBlock<T, T>(item =>
        {
            try
            {
                // Every item pushes the deadline back by the full idle period.
                _triggerTimer.Change(_timeOut, Timeout.Infinite);
            }
            catch (ObjectDisposedException)
            {
                // ForceTrigger has already disposed the timer, so there is
                // nothing left to rearm; just let the item flow through.
            }
            return item;
        });

        _timerBlock.LinkTo(_batchBlock);
        _batchBlock.LinkTo(_processBlock);

        // Completing the head of the pipeline completes every later stage in turn.
        _timerBlock.Completion.ContinueWith(_ => _batchBlock.Complete());
        _batchBlock.Completion.ContinueWith(_ => _processBlock.Complete());
    }

    /// <summary>
    /// Adds an item to the current batch and restarts the idle timer.
    /// </summary>
    /// <param name="input">The item to add.</param>
    public void Post(T input)
    {
        // Post returns false when the block declines the item, which happens
        // once the pipeline has been completed; make the loss visible.
        if (!_timerBlock.Post(input))
        {
            Log.Warn("IdleBatch declined a posted item because the pipeline no longer accepts input");
        }
    }

    /// <summary>
    /// Releases the pending batch immediately, completes the pipeline and blocks
    /// until the callback has finished processing everything that was posted.
    /// </summary>
    /// <remarks>
    /// This is a terminal operation: the pipeline is completed and the idle timer
    /// is disposed, so items posted afterwards are declined.
    /// </remarks>
    public void ForceTrigger()
    {
        try
        {
            _batchBlock.TriggerBatch();
            _timerBlock.Complete();
            _processBlock.Completion.Wait();
        }
        finally
        {
            // The pipeline is finished; release the timer's native resources.
            _triggerTimer.Dispose();
        }
    }
}
}