/* -----------------------------------------------------------------------
 * <copyright file="IdleBatch.cs" company="GRNet">
 *
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 *   1. Redistributions of source code must retain the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer.
 *
 *   2. Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials
 *      provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 * The views and conclusions contained in the software and
 * documentation are those of the authors and should not be
 * interpreted as representing official policies, either expressed
 * or implied, of GRNET S.A.
 * </copyright>
 * -----------------------------------------------------------------------
 */
43 using System.Reflection;
44 using System.Threading;
45 using System.Threading.Tasks.Dataflow;
namespace Pithos.Core.Agents
{
    /// <summary>
    /// Collects posted items into a batch and calls a callback with the batch
    /// if an idle timeout expires after the last post.
    /// </summary>
    /// <typeparam name="T">Type of the items being batched.</typeparam>
    public class IdleBatch<T>
    {
        private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);

        /// <summary>
        /// Batch size handed to the <see cref="BatchBlock{T}"/>. Batches are normally
        /// released by the idle timer or by <see cref="ForceTrigger"/>; the block
        /// itself releases a batch once this many items have accumulated.
        /// </summary>
        private const int MaxBatchSize = 9999;

        private readonly Timer _triggerTimer;

        private readonly BatchBlock<T> _batchBlock;
        private readonly TransformBlock<T, T> _timerBlock;
        private readonly ActionBlock<T[]> _processBlock;

        private readonly int _timeOut;

        /// <summary>
        /// Builds the pipeline: timer block -> batch block -> process block.
        /// </summary>
        /// <param name="idleTimeout">
        /// Idle period in milliseconds after the last post before the pending batch is
        /// released. <see cref="Timeout.Infinite"/> disables the automatic release.
        /// </param>
        /// <param name="action">Callback that receives each released batch.</param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="idleTimeout"/> is less than <see cref="Timeout.Infinite"/>.
        /// </exception>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
        public IdleBatch(int idleTimeout, Action<T[]> action)
        {
            // Timer.Change rejects values below Timeout.Infinite. Without this check the
            // failure would only surface inside the transform delegate on the first Post,
            // faulting the block where no caller can see it.
            if (idleTimeout < Timeout.Infinite)
            {
                throw new ArgumentOutOfRangeException("idleTimeout", idleTimeout,
                    "The idle timeout must be Timeout.Infinite or a non-negative number of milliseconds.");
            }
            if (action == null)
            {
                throw new ArgumentNullException("action");
            }

            _timeOut = idleTimeout;

            _batchBlock = new BatchBlock<T>(MaxBatchSize);
            _processBlock = new ActionBlock<T[]>(action);

            // The timer starts disarmed; it is armed by the first post.
            // It is created before the delegate below that uses it.
            _triggerTimer = new Timer(_ => _batchBlock.TriggerBatch());

            _timerBlock = new TransformBlock<T, T>(t =>
            {
                // Every item restarts the idle countdown (one-shot, no period)
                _triggerTimer.Change(_timeOut, Timeout.Infinite);
                return t;
            });

            _timerBlock.LinkTo(_batchBlock);
            _batchBlock.LinkTo(_processBlock);

            // Pass completion down the pipeline, one stage at a time
            _timerBlock.Completion.ContinueWith(_ => _batchBlock.Complete());
            _batchBlock.Completion.ContinueWith(_ => _processBlock.Complete());
        }

        /// <summary>
        /// Adds an item to the pending batch and restarts the idle countdown.
        /// </summary>
        /// <param name="input">The item to add.</param>
        public void Post(T input)
        {
            _timerBlock.Post(input);
        }

        /// <summary>
        /// Releases the pending batch immediately, completes the pipeline and blocks
        /// until the callback has processed everything that was posted.
        /// The pipeline is completed by this call, so the instance is finished afterwards.
        /// </summary>
        /// <exception cref="AggregateException">
        /// Thrown by the wait if the processing block ended in a faulted state.
        /// </exception>
        public void ForceTrigger()
        {
            _batchBlock.TriggerBatch();
            _timerBlock.Complete();
            _processBlock.Completion.Wait();

            // Every stage has completed, so nothing can re-arm the timer any more.
            // Release it; otherwise the runtime's timer queue keeps the timer, and
            // through its callback this whole instance, alive.
            // Deliberately not in a finally block: on a faulted wait the timer block
            // may still be running and must not see a disposed timer.
            _triggerTimer.Dispose();
        }
    }
}