//--------------------------------------------------------------------------
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// File: AsyncProducerConsumerCollection.cs
//
//--------------------------------------------------------------------------
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace Pithos.Core
{
/// Provides an asynchronous producer/consumer collection.
[DebuggerDisplay("Count={CurrentCount}")]
public sealed class AsyncCollection
{
/// Asynchronous semaphore used to keep track of asynchronous work.
private AsyncSemaphore _semaphore = new AsyncSemaphore(0);
/// The data stored in the collection.
private IProducerConsumerCollection _collection;
/// Initializes the asynchronous producer/consumer collection to store data in a first-in-first-out (FIFO) order.
public AsyncCollection() : this(new ConcurrentQueue()) { }
/// Initializes the asynchronous producer/consumer collection.
/// The underlying collection to use to store data.
public AsyncCollection(IProducerConsumerCollection collection)
{
if (collection == null) throw new ArgumentNullException("collection");
_collection = collection;
}
/// Adds an element to the collection.
/// The item to be added.
public void Add(T item)
{
if (_collection.TryAdd(item))
Task.Factory.StartNew(s => ((AsyncSemaphore)s).Release(),
_semaphore, CancellationToken.None, TaskCreationOptions.PreferFairness, TaskScheduler.Default);
else throw new InvalidOperationException("Invalid collection");
}
/// Takes an element from the collection asynchronously.
/// A Task that represents the element removed from the collection.
public Task Take()
{
return _semaphore.WaitAsync().ContinueWith(_ =>
{
T result;
if (!_collection.TryTake(out result)) throw new InvalidOperationException("Invalid collection");
return result;
}, TaskContinuationOptions.OnlyOnRanToCompletion);
}
/// Gets the number of elements in the collection.
public int Count { get { return _collection.Count; } }
}
}