Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / Agent.cs @ cfed7823

History | View | Annotate | Download (4 kB)

1
using System;
2
using System.Collections.Concurrent;
3
using System.Collections.Generic;
4
using System.Diagnostics.Contracts;
5
using System.Linq;
6
using System.Text;
7
using System.Threading;
8
using System.Threading.Tasks;
9

    
10
namespace Pithos.Core
11
{
12
    public class Agent<TMessage> : IDisposable
13
    {        
14
        private readonly BlockingCollection<TMessage> _messages = new BlockingCollection<TMessage>();
15
        private readonly CancellationTokenSource _cancelSource = new CancellationTokenSource();
16
        public CancellationToken CancellationToken;
17

    
18
        private readonly Action<Agent<TMessage>> _process;
19

    
20

    
21
        public Agent(Action<Agent<TMessage>> action)
22
        {
23
            _process = action;
24
            CancellationToken = _cancelSource.Token;
25
        }
26

    
27
        public void Post(TMessage message)
28
        {
29
            _messages.Add(message);
30
        }
31

    
32
        public Task<TMessage> Receive(int timeout = -1)
33
        {
34
            return Task<TMessage>.Factory.StartNew(() =>
35
            {
36
                TMessage item;
37
                if (!_messages.TryTake(out item, timeout, CancellationToken))
38
                    throw new TimeoutException();
39
                return item;
40
            });
41
        }
42

    
43
        public Task<TMessage> TryReceive(int timeout = -1)
44
        {
45
            return Task<TMessage>.Factory.StartNew(() =>
46
            {
47
                TMessage item;
48
                _messages.TryTake(out item, timeout, CancellationToken);
49
                return item;
50
            });
51
        }
52

    
53

    
54

    
55

    
56
        public void Start()
57
        {
58
            Task.Factory.StartNew(() => _process(this), CancellationToken);            
59
        }
60

    
61

    
62

    
63
        public static Agent<TMessage> Start(Action<Agent<TMessage>> action)
64
        {
65
            var agent = new Agent<TMessage>(action);
66
            agent.Start();
67
            return agent;
68
        }
69

    
70
        public void Stop()
71
        {
72
            _messages.CompleteAdding();
73
            _cancelSource.Cancel();
74
        }
75

    
76
        public void DoAsync(Action action)
77
        {
78
            Contract.Requires(action!=null);
79
            Task.Factory.StartNew(action, CancellationToken);
80
        }
81

    
82

    
83
        ~Agent()
84
        {
85
            Dispose(false);
86
        }
87

    
88
        public void Dispose()
89
        {
90
            Dispose(true);
91
            GC.SuppressFinalize(this);
92
        }
93

    
94
        protected void Dispose(bool disposing)
95
        {
96
            if (disposing)
97
            {
98
                Stop();
99
                _messages.Dispose();
100
                _cancelSource.Dispose();
101
            }
102
        }
103

    
104
        public IEnumerable<TMessage> GetEnumerable()
105
        {
106
            return _messages;
107
        }
108

    
109
        public Task LoopAsync(Task process, Action loop,Action<Exception> onError=null)
110
        {
111
            Contract.Requires(process!=null);
112
            Contract.Requires(loop!=null);
113

    
114
            return process.ContinueWith(t =>
115
            {   
116
             
117
                Task.Factory.StartNew(loop, CancellationToken);
118

    
119
                if (t.IsFaulted)
120
                {
121
                    var ex = t.Exception.InnerException;
122
                    if (ex is OperationCanceledException)
123
                        Stop();
124
                    if (onError != null)
125
                        onError(ex);
126
                }
127
            },CancellationToken);
128
        }
129

    
130
        public Task<T> LoopAsync<T>(Task<T> process, Action loop,Action<Exception> onError=null)
131
        {
132
            return process.ContinueWith(t =>
133
            {   
134
                //Spawn the Loop immediatelly
135
                Task.Factory.StartNew(loop,CancellationToken);
136
                //Then process possible exceptions
137
                if (t.IsFaulted)
138
                {
139
                    var ex = t.Exception.InnerException;
140
                    if (ex is OperationCanceledException)
141
                        Stop();
142
                    if (onError != null)
143
                        onError(ex);
144
                }
145
                return default(T);
146
            },CancellationToken);
147
        }
148
    }
149
}