Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / IdleBatch.cs @ e0f69809

History | View | Annotate | Download (3.4 kB)

1 79f92570 Panagiotis Kanavos
#region
2 79f92570 Panagiotis Kanavos
/* -----------------------------------------------------------------------
3 79f92570 Panagiotis Kanavos
 * <copyright file="IdleBatch.cs" company="GRNet">
4 79f92570 Panagiotis Kanavos
 * 
5 79f92570 Panagiotis Kanavos
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
6 79f92570 Panagiotis Kanavos
 *
7 79f92570 Panagiotis Kanavos
 * Redistribution and use in source and binary forms, with or
8 79f92570 Panagiotis Kanavos
 * without modification, are permitted provided that the following
9 79f92570 Panagiotis Kanavos
 * conditions are met:
10 79f92570 Panagiotis Kanavos
 *
11 79f92570 Panagiotis Kanavos
 *   1. Redistributions of source code must retain the above
12 79f92570 Panagiotis Kanavos
 *      copyright notice, this list of conditions and the following
13 79f92570 Panagiotis Kanavos
 *      disclaimer.
14 79f92570 Panagiotis Kanavos
 *
15 79f92570 Panagiotis Kanavos
 *   2. Redistributions in binary form must reproduce the above
16 79f92570 Panagiotis Kanavos
 *      copyright notice, this list of conditions and the following
17 79f92570 Panagiotis Kanavos
 *      disclaimer in the documentation and/or other materials
18 79f92570 Panagiotis Kanavos
 *      provided with the distribution.
19 79f92570 Panagiotis Kanavos
 *
20 79f92570 Panagiotis Kanavos
 *
21 79f92570 Panagiotis Kanavos
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
22 79f92570 Panagiotis Kanavos
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23 79f92570 Panagiotis Kanavos
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24 79f92570 Panagiotis Kanavos
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
25 79f92570 Panagiotis Kanavos
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 79f92570 Panagiotis Kanavos
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 79f92570 Panagiotis Kanavos
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
28 79f92570 Panagiotis Kanavos
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 79f92570 Panagiotis Kanavos
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 79f92570 Panagiotis Kanavos
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 79f92570 Panagiotis Kanavos
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 79f92570 Panagiotis Kanavos
 * POSSIBILITY OF SUCH DAMAGE.
33 79f92570 Panagiotis Kanavos
 *
34 79f92570 Panagiotis Kanavos
 * The views and conclusions contained in the software and
35 79f92570 Panagiotis Kanavos
 * documentation are those of the authors and should not be
36 79f92570 Panagiotis Kanavos
 * interpreted as representing official policies, either expressed
37 79f92570 Panagiotis Kanavos
 * or implied, of GRNET S.A.
38 79f92570 Panagiotis Kanavos
 * </copyright>
39 79f92570 Panagiotis Kanavos
 * -----------------------------------------------------------------------
40 79f92570 Panagiotis Kanavos
 */
41 79f92570 Panagiotis Kanavos
#endregion
42 79f92570 Panagiotis Kanavos
43 79f92570 Panagiotis Kanavos
using System.Reflection;
44 79f92570 Panagiotis Kanavos
using System.Threading;
45 79f92570 Panagiotis Kanavos
using System.Threading.Tasks.Dataflow;
46 79f92570 Panagiotis Kanavos
using System;
47 79f92570 Panagiotis Kanavos
using log4net;
48 79f92570 Panagiotis Kanavos
49 79f92570 Panagiotis Kanavos
namespace Pithos.Core.Agents
50 79f92570 Panagiotis Kanavos
{
51 79f92570 Panagiotis Kanavos
52 79f92570 Panagiotis Kanavos
    /// <summary>
53 79f92570 Panagiotis Kanavos
    /// Calls a callback function if an idle timeout expires after the last post
54 79f92570 Panagiotis Kanavos
    /// </summary>
55 79f92570 Panagiotis Kanavos
    public class IdleBatch<T>
56 79f92570 Panagiotis Kanavos
    {
57 79f92570 Panagiotis Kanavos
        private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
58 79f92570 Panagiotis Kanavos
59 79f92570 Panagiotis Kanavos
        readonly Timer _triggerTimer;
60 79f92570 Panagiotis Kanavos
61 79f92570 Panagiotis Kanavos
        private readonly BatchBlock<T> _batchBlock;
62 79f92570 Panagiotis Kanavos
        private readonly TransformBlock<T, T> _timerBlock;
63 79f92570 Panagiotis Kanavos
        private readonly ActionBlock<T[]> _processBlock;
64 79f92570 Panagiotis Kanavos
65 79f92570 Panagiotis Kanavos
        private readonly int _timeOut;
66 79f92570 Panagiotis Kanavos
67 79f92570 Panagiotis Kanavos
        public IdleBatch(int idleTimeout, Action<T[]> action)
68 79f92570 Panagiotis Kanavos
        {
69 79f92570 Panagiotis Kanavos
            _timeOut = idleTimeout;
70 79f92570 Panagiotis Kanavos
            _timerBlock = new TransformBlock<T, T>(t =>
71 79f92570 Panagiotis Kanavos
            {
72 79f92570 Panagiotis Kanavos
                _triggerTimer.Change(_timeOut, Timeout.Infinite);
73 79f92570 Panagiotis Kanavos
                return t;
74 79f92570 Panagiotis Kanavos
            });
75 79f92570 Panagiotis Kanavos
            _batchBlock = new BatchBlock<T>(9999);
76 79f92570 Panagiotis Kanavos
            _processBlock = new ActionBlock<T[]>(action);
77 79f92570 Panagiotis Kanavos
78 79f92570 Panagiotis Kanavos
            _timerBlock.LinkTo(_batchBlock);
79 79f92570 Panagiotis Kanavos
            _batchBlock.LinkTo(_processBlock);
80 79f92570 Panagiotis Kanavos
81 79f92570 Panagiotis Kanavos
            _timerBlock.Completion.ContinueWith(_ => _batchBlock.Complete());
82 79f92570 Panagiotis Kanavos
            _batchBlock.Completion.ContinueWith(_ => _processBlock.Complete());
83 79f92570 Panagiotis Kanavos
84 79f92570 Panagiotis Kanavos
            _triggerTimer = new Timer(_ => _batchBlock.TriggerBatch());
85 79f92570 Panagiotis Kanavos
        }
86 79f92570 Panagiotis Kanavos
87 79f92570 Panagiotis Kanavos
        public void Post(T input)
88 79f92570 Panagiotis Kanavos
        {
89 79f92570 Panagiotis Kanavos
            _timerBlock.Post(input);
90 79f92570 Panagiotis Kanavos
        }
91 79f92570 Panagiotis Kanavos
92 79f92570 Panagiotis Kanavos
        public void ForceTrigger()
93 79f92570 Panagiotis Kanavos
        {
94 79f92570 Panagiotis Kanavos
            _batchBlock.TriggerBatch();
95 79f92570 Panagiotis Kanavos
            _timerBlock.Complete();
96 79f92570 Panagiotis Kanavos
            _processBlock.Completion.Wait();
97 79f92570 Panagiotis Kanavos
        }
98 79f92570 Panagiotis Kanavos
    }
99 79f92570 Panagiotis Kanavos
100 79f92570 Panagiotis Kanavos
}