Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / AsyncCollection.cs @ dccd340f

History | View | Annotate | Download (2.6 kB)

1 174bbb6e Panagiotis Kanavos
//--------------------------------------------------------------------------
2 174bbb6e Panagiotis Kanavos
// 
3 174bbb6e Panagiotis Kanavos
//  Copyright (c) Microsoft Corporation.  All rights reserved. 
4 174bbb6e Panagiotis Kanavos
// 
5 174bbb6e Panagiotis Kanavos
//  File: AsyncProducerConsumerCollection.cs
6 174bbb6e Panagiotis Kanavos
//
7 174bbb6e Panagiotis Kanavos
//--------------------------------------------------------------------------
8 174bbb6e Panagiotis Kanavos
9 174bbb6e Panagiotis Kanavos
using System;
10 174bbb6e Panagiotis Kanavos
using System.Collections.Concurrent;
11 174bbb6e Panagiotis Kanavos
using System.Diagnostics;
12 174bbb6e Panagiotis Kanavos
using System.Threading;
13 174bbb6e Panagiotis Kanavos
using System.Threading.Tasks;
14 174bbb6e Panagiotis Kanavos
15 174bbb6e Panagiotis Kanavos
namespace Pithos.Core
16 174bbb6e Panagiotis Kanavos
{
17 174bbb6e Panagiotis Kanavos
    /// <summary>Provides an asynchronous producer/consumer collection.</summary>
18 174bbb6e Panagiotis Kanavos
    [DebuggerDisplay("Count={CurrentCount}")]
19 174bbb6e Panagiotis Kanavos
    public sealed class AsyncCollection<T> 
20 174bbb6e Panagiotis Kanavos
    {
21 174bbb6e Panagiotis Kanavos
        /// <summary>Asynchronous semaphore used to keep track of asynchronous work.</summary>
22 174bbb6e Panagiotis Kanavos
        private AsyncSemaphore _semaphore = new AsyncSemaphore(0);
23 174bbb6e Panagiotis Kanavos
        /// <summary>The data stored in the collection.</summary>
24 174bbb6e Panagiotis Kanavos
        private IProducerConsumerCollection<T> _collection;
25 174bbb6e Panagiotis Kanavos
26 174bbb6e Panagiotis Kanavos
        /// <summary>Initializes the asynchronous producer/consumer collection to store data in a first-in-first-out (FIFO) order.</summary>
27 174bbb6e Panagiotis Kanavos
        public AsyncCollection() : this(new ConcurrentQueue<T>()) { }
28 174bbb6e Panagiotis Kanavos
29 174bbb6e Panagiotis Kanavos
        /// <summary>Initializes the asynchronous producer/consumer collection.</summary>
30 174bbb6e Panagiotis Kanavos
        /// <param name="collection">The underlying collection to use to store data.</param>
31 174bbb6e Panagiotis Kanavos
        public AsyncCollection(IProducerConsumerCollection<T> collection)
32 174bbb6e Panagiotis Kanavos
        {
33 174bbb6e Panagiotis Kanavos
            if (collection == null) throw new ArgumentNullException("collection");
34 174bbb6e Panagiotis Kanavos
            _collection = collection;
35 174bbb6e Panagiotis Kanavos
        }
36 174bbb6e Panagiotis Kanavos
37 174bbb6e Panagiotis Kanavos
        /// <summary>Adds an element to the collection.</summary>
38 174bbb6e Panagiotis Kanavos
        /// <param name="item">The item to be added.</param>
39 174bbb6e Panagiotis Kanavos
        public void Add(T item)
40 174bbb6e Panagiotis Kanavos
        {
41 174bbb6e Panagiotis Kanavos
            if (_collection.TryAdd(item))
42 174bbb6e Panagiotis Kanavos
                Task.Factory.StartNew(s => ((AsyncSemaphore)s).Release(),
43 174bbb6e Panagiotis Kanavos
                    _semaphore, CancellationToken.None, TaskCreationOptions.PreferFairness, TaskScheduler.Default);
44 174bbb6e Panagiotis Kanavos
            else throw new InvalidOperationException("Invalid collection");
45 174bbb6e Panagiotis Kanavos
        }
46 174bbb6e Panagiotis Kanavos
47 174bbb6e Panagiotis Kanavos
        /// <summary>Takes an element from the collection asynchronously.</summary>
48 174bbb6e Panagiotis Kanavos
        /// <returns>A Task that represents the element removed from the collection.</returns>
49 174bbb6e Panagiotis Kanavos
        public Task<T> Take()
50 174bbb6e Panagiotis Kanavos
        {
51 174bbb6e Panagiotis Kanavos
            return _semaphore.WaitAsync().ContinueWith(_ =>
52 174bbb6e Panagiotis Kanavos
            {
53 174bbb6e Panagiotis Kanavos
                T result;
54 174bbb6e Panagiotis Kanavos
                if (!_collection.TryTake(out result)) throw new InvalidOperationException("Invalid collection");
55 174bbb6e Panagiotis Kanavos
                return result;
56 174bbb6e Panagiotis Kanavos
            }, TaskContinuationOptions.OnlyOnRanToCompletion);
57 174bbb6e Panagiotis Kanavos
        }
58 174bbb6e Panagiotis Kanavos
59 174bbb6e Panagiotis Kanavos
        /// <summary>Gets the number of elements in the collection.</summary>
60 174bbb6e Panagiotis Kanavos
        public int Count { get { return _collection.Count; } }
61 174bbb6e Panagiotis Kanavos
62 174bbb6e Panagiotis Kanavos
    }
63 174bbb6e Panagiotis Kanavos
}