//--------------------------------------------------------------------------
//
//  Copyright (c) Microsoft Corporation.  All rights reserved.
//
//  File: AsyncProducerConsumerCollection.cs
//
//--------------------------------------------------------------------------

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace Pithos.Core
{
    /// <summary>Provides an asynchronous producer/consumer collection.</summary>
    /// <typeparam name="T">Specifies the type of the elements stored in the collection.</typeparam>
    /// <remarks>
    /// Each successful <see cref="Add"/> releases the internal semaphore once, and each
    /// <see cref="Take"/> waits on that semaphore once before removing an element.
    /// </remarks>
    [DebuggerDisplay("Count={Count}")] // Must name an existing member; the class exposes Count, not CurrentCount.
    public sealed class AsyncCollection<T>
    {
        /// <summary>Asynchronous semaphore used to keep track of asynchronous work.</summary>
        // Constructed with 0 -- presumably the initial count, so Take() waits until an Add()
        // has released it. NOTE(review): confirm against the AsyncSemaphore definition.
        private readonly AsyncSemaphore _semaphore = new AsyncSemaphore(0);

        /// <summary>The data stored in the collection.</summary>
        private readonly IProducerConsumerCollection<T> _collection;

        /// <summary>
        /// Initializes the asynchronous producer/consumer collection to store data
        /// in a first-in-first-out (FIFO) order.
        /// </summary>
        public AsyncCollection()
            : this(new ConcurrentQueue<T>())
        {
        }

        /// <summary>Initializes the asynchronous producer/consumer collection.</summary>
        /// <param name="collection">The underlying collection to use to store data.</param>
        /// <exception cref="ArgumentNullException"><paramref name="collection"/> is null.</exception>
        public AsyncCollection(IProducerConsumerCollection<T> collection)
        {
            if (collection == null)
            {
                throw new ArgumentNullException("collection");
            }

            _collection = collection;
        }

        /// <summary>Adds an element to the collection.</summary>
        /// <param name="item">The item to be added.</param>
        /// <exception cref="InvalidOperationException">
        /// The underlying collection refused the item (its TryAdd returned false).
        /// </exception>
        public void Add(T item)
        {
            if (!_collection.TryAdd(item))
            {
                throw new InvalidOperationException("Invalid collection");
            }

            // The release is queued as a separate task on the default scheduler instead of
            // being invoked inline. The semaphore is passed as the state object so the
            // lambda does not capture 'this'.
            // NOTE(review): presumably done so that anything triggered by Release() does not
            // run on the producer's thread -- confirm against AsyncSemaphore.
            Task.Factory.StartNew(
                s => ((AsyncSemaphore)s).Release(),
                _semaphore,
                CancellationToken.None,
                TaskCreationOptions.PreferFairness,
                TaskScheduler.Default);
        }

        /// <summary>Takes an element from the collection asynchronously.</summary>
        /// <returns>A Task that represents the element removed from the collection.</returns>
        /// <remarks>
        /// The returned task faults with <see cref="InvalidOperationException"/> if the
        /// semaphore wait completed but no element could be taken from the underlying collection.
        /// </remarks>
        public Task<T> Take()
        {
            // The continuation is restricted to OnlyOnRanToCompletion, so it only runs when
            // the wait task finishes successfully.
            // NOTE(review): no scheduler is passed, so ContinueWith uses TaskScheduler.Current;
            // confirm that is intended for callers running on a custom scheduler.
            return _semaphore.WaitAsync().ContinueWith(_ =>
            {
                T result;
                if (!_collection.TryTake(out result))
                {
                    throw new InvalidOperationException("Invalid collection");
                }

                return result;
            }, TaskContinuationOptions.OnlyOnRanToCompletion);
        }

        /// <summary>Gets the number of elements in the collection.</summary>
        public int Count
        {
            get { return _collection.Count; }
        }
    }
}