root / trunk / Pithos.Core / Agents / AsyncCollection.cs @ dccd340f
History | View | Annotate | Download (2.6 kB)
1 |
//-------------------------------------------------------------------------- |
---|---|
2 |
// |
3 |
// Copyright (c) Microsoft Corporation. All rights reserved. |
4 |
// |
5 |
// File: AsyncProducerConsumerCollection.cs |
6 |
// |
7 |
//-------------------------------------------------------------------------- |
8 |
|
9 |
using System; |
10 |
using System.Collections.Concurrent; |
11 |
using System.Diagnostics; |
12 |
using System.Threading; |
13 |
using System.Threading.Tasks; |
14 |
|
15 |
namespace Pithos.Core |
16 |
{ |
17 |
/// <summary>Provides an asynchronous producer/consumer collection.</summary> |
18 |
[DebuggerDisplay("Count={CurrentCount}")] |
19 |
public sealed class AsyncCollection<T> |
20 |
{ |
21 |
/// <summary>Asynchronous semaphore used to keep track of asynchronous work.</summary> |
22 |
private AsyncSemaphore _semaphore = new AsyncSemaphore(0); |
23 |
/// <summary>The data stored in the collection.</summary> |
24 |
private IProducerConsumerCollection<T> _collection; |
25 |
|
26 |
/// <summary>Initializes the asynchronous producer/consumer collection to store data in a first-in-first-out (FIFO) order.</summary> |
27 |
public AsyncCollection() : this(new ConcurrentQueue<T>()) { } |
28 |
|
29 |
/// <summary>Initializes the asynchronous producer/consumer collection.</summary> |
30 |
/// <param name="collection">The underlying collection to use to store data.</param> |
31 |
public AsyncCollection(IProducerConsumerCollection<T> collection) |
32 |
{ |
33 |
if (collection == null) throw new ArgumentNullException("collection"); |
34 |
_collection = collection; |
35 |
} |
36 |
|
37 |
/// <summary>Adds an element to the collection.</summary> |
38 |
/// <param name="item">The item to be added.</param> |
39 |
public void Add(T item) |
40 |
{ |
41 |
if (_collection.TryAdd(item)) |
42 |
Task.Factory.StartNew(s => ((AsyncSemaphore)s).Release(), |
43 |
_semaphore, CancellationToken.None, TaskCreationOptions.PreferFairness, TaskScheduler.Default); |
44 |
else throw new InvalidOperationException("Invalid collection"); |
45 |
} |
46 |
|
47 |
/// <summary>Takes an element from the collection asynchronously.</summary> |
48 |
/// <returns>A Task that represents the element removed from the collection.</returns> |
49 |
public Task<T> Take() |
50 |
{ |
51 |
return _semaphore.WaitAsync().ContinueWith(_ => |
52 |
{ |
53 |
T result; |
54 |
if (!_collection.TryTake(out result)) throw new InvalidOperationException("Invalid collection"); |
55 |
return result; |
56 |
}, TaskContinuationOptions.OnlyOnRanToCompletion); |
57 |
} |
58 |
|
59 |
/// <summary>Gets the number of elements in the collection.</summary> |
60 |
public int Count { get { return _collection.Count; } } |
61 |
|
62 |
} |
63 |
} |