Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / JobQueue.cs @ 4147814e

History | View | Annotate | Download (3.6 kB)

1
#region
2
/* -----------------------------------------------------------------------
3
 * <copyright file="JobQueue.cs" company="GRNet">
4
 * 
5
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
6
 *
7
 * Redistribution and use in source and binary forms, with or
8
 * without modification, are permitted provided that the following
9
 * conditions are met:
10
 *
11
 *   1. Redistributions of source code must retain the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer.
14
 *
15
 *   2. Redistributions in binary form must reproduce the above
16
 *      copyright notice, this list of conditions and the following
17
 *      disclaimer in the documentation and/or other materials
18
 *      provided with the distribution.
19
 *
20
 *
21
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
22
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
25
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
28
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32
 * POSSIBILITY OF SUCH DAMAGE.
33
 *
34
 * The views and conclusions contained in the software and
35
 * documentation are those of the authors and should not be
36
 * interpreted as representing official policies, either expressed
37
 * or implied, of GRNET S.A.
38
 * </copyright>
39
 * -----------------------------------------------------------------------
40
 */
41
#endregion
42
using System.Collections.Concurrent;
43
using System.Threading;
44
using System.Threading.Tasks;
45

    
46
namespace Pithos.Core
47
{
48
    using System;
49
    using System.Collections.Generic;
50
    using System.Linq;
51
    using System.Text;
52

    
53
    /// <summary>
54
    /// TODO: Update summary.
55
    /// </summary>
56
    public class JobQueue
57
    {
58
        private readonly BlockingCollection<Action> _statusUpdateQueue = new BlockingCollection<Action>();
59
        private CancellationToken _cancellationToken;
60
        
61

    
62
        public void Start(CancellationToken token)
63
        {
64
            _cancellationToken = token;
65
            Task.Factory.StartNew(ProcessUpdates, _cancellationToken);
66
        }
67

    
68
        private void ProcessUpdates()
69
        {
70
            foreach (var action in _statusUpdateQueue.GetConsumingEnumerable())
71
            {
72
                action();
73
            }
74
        }
75

    
76
        public void Add(Action action)
77
        {
78
            _statusUpdateQueue.Add(action);
79
        }
80

    
81
        public void Stop()
82
        {
83
            _statusUpdateQueue.CompleteAdding();
84
        }
85
       
86
    }   
87

    
88
    public class JobAgent:Agent<Action>
89
    {
90
        protected JobAgent(Action<Agent<Action>>  action)
91
            :base(action)
92
        {
93
            
94
        }
95

    
96
        public static JobAgent Create()
97
        {
98
            return (JobAgent)Start(queue =>
99
            {
100
                Action loop = null;
101
                loop = () =>
102
                {
103
                    var job = queue.Receive();
104
                    job.ContinueWith(t =>
105
                    {
106
                        t.IgnoreExceptions();
107
                        var action = job.Result;
108
                        action();
109
                        queue.DoAsync(loop);
110
                    });
111
                };
112
                loop();
113
            });
114
        }
115

    
116
    }
117
}