Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / Agent.cs @ a27aa447

History | View | Annotate | Download (3.3 kB)

1
using System;
2
using System.Collections.Concurrent;
3
using System.Collections.Generic;
4
using System.Linq;
5
using System.Text;
6
using System.Threading;
7
using System.Threading.Tasks;
8

    
9
namespace Pithos.Core
10
{
11
    public class Agent<TMessage> : IDisposable
12
    {        
13
        private readonly BlockingCollection<TMessage> _messages = new BlockingCollection<TMessage>();
14
        private readonly CancellationTokenSource _cancelSource = new CancellationTokenSource();
15
        public CancellationToken CancellationToken;
16

    
17
        private readonly Action<Agent<TMessage>> _process;
18

    
19

    
20
        public Agent(Action<Agent<TMessage>> action)
21
        {
22
            _process = action;
23
            CancellationToken = _cancelSource.Token;
24
        }
25

    
26
        public void Post(TMessage message)
27
        {
28
            _messages.Add(message);
29
        }
30

    
31
        public Task<TMessage> Receive(int timeout = -1)
32
        {
33
            return Task<TMessage>.Factory.StartNew(() =>
34
            {
35
                TMessage item;
36
                if (!_messages.TryTake(out item, timeout, CancellationToken))
37
                    throw new TimeoutException();
38
                return item;
39
            });
40
        }
41

    
42
        public Task<TMessage> TryReceive(int timeout = -1)
43
        {
44
            return Task<TMessage>.Factory.StartNew(() =>
45
            {
46
                TMessage item;
47
                _messages.TryTake(out item, timeout, CancellationToken);
48
                return item;
49
            });
50
        }
51

    
52

    
53

    
54

    
55
        public void Start()
56
        {
57
            Task.Factory.StartNew(() => _process(this), CancellationToken);            
58
        }
59

    
60

    
61

    
62
        public static Agent<TMessage> Start(Action<Agent<TMessage>> action)
63
        {
64
            var agent = new Agent<TMessage>(action);
65
            agent.Start();
66
            return agent;
67
        }
68

    
69
        public void Stop()
70
        {
71
            _messages.CompleteAdding();
72
            _cancelSource.Cancel();
73
        }
74

    
75
        public void DoAsync(Action action)
76
        {
77
            Task.Factory.StartNew(action, CancellationToken);
78
        }
79

    
80

    
81
        ~Agent()
82
        {
83
            Dispose(false);
84
        }
85

    
86
        public void Dispose()
87
        {
88
            Dispose(true);
89
            GC.SuppressFinalize(this);
90
        }
91

    
92
        protected void Dispose(bool disposing)
93
        {
94
            if (disposing)
95
            {
96
                Stop();
97
                _messages.Dispose();
98
                _cancelSource.Dispose();
99
            }
100
        }
101

    
102
/*
103
        public void AddFromEnumerable(IEnumerable<TMessage> enumerable)
104
        {
105
            foreach (var message in enumerable)
106
            {
107
                Post(message);
108
            }
109
        }
110
*/
111

    
112
        public IEnumerable<TMessage> GetEnumerable()
113
        {
114
            return _messages;
115
        }
116

    
117
        public Task LoopAsync(Task process, Action loop,Action<Exception> onError=null)
118
        {
119
            return process.ContinueWith(t =>
120
            {   
121
             
122
                Task.Factory.StartNew(loop, CancellationToken);
123

    
124
                if (t.IsFaulted)
125
                {
126
                    var ex = t.Exception.InnerException;
127
                    if (ex is OperationCanceledException)
128
                        Stop();
129
                    if (onError != null)
130
                        onError(ex);
131
                }
132
            });
133

    
134
        }
135
    }
136
}