Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / AsyncCollection.cs @ dccd340f

History | View | Annotate | Download (2.6 kB)

1
//--------------------------------------------------------------------------
2
// 
3
//  Copyright (c) Microsoft Corporation.  All rights reserved. 
4
// 
5
//  File: AsyncProducerConsumerCollection.cs
6
//
7
//--------------------------------------------------------------------------
8

    
9
using System;
10
using System.Collections.Concurrent;
11
using System.Diagnostics;
12
using System.Threading;
13
using System.Threading.Tasks;
14

    
15
namespace Pithos.Core
16
{
17
    /// <summary>Provides an asynchronous producer/consumer collection.</summary>
18
    [DebuggerDisplay("Count={CurrentCount}")]
19
    public sealed class AsyncCollection<T> 
20
    {
21
        /// <summary>Asynchronous semaphore used to keep track of asynchronous work.</summary>
22
        private AsyncSemaphore _semaphore = new AsyncSemaphore(0);
23
        /// <summary>The data stored in the collection.</summary>
24
        private IProducerConsumerCollection<T> _collection;
25

    
26
        /// <summary>Initializes the asynchronous producer/consumer collection to store data in a first-in-first-out (FIFO) order.</summary>
27
        public AsyncCollection() : this(new ConcurrentQueue<T>()) { }
28

    
29
        /// <summary>Initializes the asynchronous producer/consumer collection.</summary>
30
        /// <param name="collection">The underlying collection to use to store data.</param>
31
        public AsyncCollection(IProducerConsumerCollection<T> collection)
32
        {
33
            if (collection == null) throw new ArgumentNullException("collection");
34
            _collection = collection;
35
        }
36

    
37
        /// <summary>Adds an element to the collection.</summary>
38
        /// <param name="item">The item to be added.</param>
39
        public void Add(T item)
40
        {
41
            if (_collection.TryAdd(item))
42
                Task.Factory.StartNew(s => ((AsyncSemaphore)s).Release(),
43
                    _semaphore, CancellationToken.None, TaskCreationOptions.PreferFairness, TaskScheduler.Default);
44
            else throw new InvalidOperationException("Invalid collection");
45
        }
46

    
47
        /// <summary>Takes an element from the collection asynchronously.</summary>
48
        /// <returns>A Task that represents the element removed from the collection.</returns>
49
        public Task<T> Take()
50
        {
51
            return _semaphore.WaitAsync().ContinueWith(_ =>
52
            {
53
                T result;
54
                if (!_collection.TryTake(out result)) throw new InvalidOperationException("Invalid collection");
55
                return result;
56
            }, TaskContinuationOptions.OnlyOnRanToCompletion);
57
        }
58

    
59
        /// <summary>Gets the number of elements in the collection.</summary>
60
        public int Count { get { return _collection.Count; } }
61

    
62
    }
63
}