root / trunk / Pithos.Core / Agents / AsyncCollection.cs @ dccd340f
History | View | Annotate | Download (2.6 kB)
1 | 174bbb6e | Panagiotis Kanavos | //-------------------------------------------------------------------------- |
---|---|---|---|
2 | 174bbb6e | Panagiotis Kanavos | // |
3 | 174bbb6e | Panagiotis Kanavos | // Copyright (c) Microsoft Corporation. All rights reserved. |
4 | 174bbb6e | Panagiotis Kanavos | // |
5 | 174bbb6e | Panagiotis Kanavos | // File: AsyncProducerConsumerCollection.cs |
6 | 174bbb6e | Panagiotis Kanavos | // |
7 | 174bbb6e | Panagiotis Kanavos | //-------------------------------------------------------------------------- |
8 | 174bbb6e | Panagiotis Kanavos | |
9 | 174bbb6e | Panagiotis Kanavos | using System; |
10 | 174bbb6e | Panagiotis Kanavos | using System.Collections.Concurrent; |
11 | 174bbb6e | Panagiotis Kanavos | using System.Diagnostics; |
12 | 174bbb6e | Panagiotis Kanavos | using System.Threading; |
13 | 174bbb6e | Panagiotis Kanavos | using System.Threading.Tasks; |
14 | 174bbb6e | Panagiotis Kanavos | |
15 | 174bbb6e | Panagiotis Kanavos | namespace Pithos.Core |
16 | 174bbb6e | Panagiotis Kanavos | { |
17 | 174bbb6e | Panagiotis Kanavos | /// <summary>Provides an asynchronous producer/consumer collection.</summary> |
18 | 174bbb6e | Panagiotis Kanavos | [DebuggerDisplay("Count={CurrentCount}")] |
19 | 174bbb6e | Panagiotis Kanavos | public sealed class AsyncCollection<T> |
20 | 174bbb6e | Panagiotis Kanavos | { |
21 | 174bbb6e | Panagiotis Kanavos | /// <summary>Asynchronous semaphore used to keep track of asynchronous work.</summary> |
22 | 174bbb6e | Panagiotis Kanavos | private AsyncSemaphore _semaphore = new AsyncSemaphore(0); |
23 | 174bbb6e | Panagiotis Kanavos | /// <summary>The data stored in the collection.</summary> |
24 | 174bbb6e | Panagiotis Kanavos | private IProducerConsumerCollection<T> _collection; |
25 | 174bbb6e | Panagiotis Kanavos | |
26 | 174bbb6e | Panagiotis Kanavos | /// <summary>Initializes the asynchronous producer/consumer collection to store data in a first-in-first-out (FIFO) order.</summary> |
27 | 174bbb6e | Panagiotis Kanavos | public AsyncCollection() : this(new ConcurrentQueue<T>()) { } |
28 | 174bbb6e | Panagiotis Kanavos | |
29 | 174bbb6e | Panagiotis Kanavos | /// <summary>Initializes the asynchronous producer/consumer collection.</summary> |
30 | 174bbb6e | Panagiotis Kanavos | /// <param name="collection">The underlying collection to use to store data.</param> |
31 | 174bbb6e | Panagiotis Kanavos | public AsyncCollection(IProducerConsumerCollection<T> collection) |
32 | 174bbb6e | Panagiotis Kanavos | { |
33 | 174bbb6e | Panagiotis Kanavos | if (collection == null) throw new ArgumentNullException("collection"); |
34 | 174bbb6e | Panagiotis Kanavos | _collection = collection; |
35 | 174bbb6e | Panagiotis Kanavos | } |
36 | 174bbb6e | Panagiotis Kanavos | |
37 | 174bbb6e | Panagiotis Kanavos | /// <summary>Adds an element to the collection.</summary> |
38 | 174bbb6e | Panagiotis Kanavos | /// <param name="item">The item to be added.</param> |
39 | 174bbb6e | Panagiotis Kanavos | public void Add(T item) |
40 | 174bbb6e | Panagiotis Kanavos | { |
41 | 174bbb6e | Panagiotis Kanavos | if (_collection.TryAdd(item)) |
42 | 174bbb6e | Panagiotis Kanavos | Task.Factory.StartNew(s => ((AsyncSemaphore)s).Release(), |
43 | 174bbb6e | Panagiotis Kanavos | _semaphore, CancellationToken.None, TaskCreationOptions.PreferFairness, TaskScheduler.Default); |
44 | 174bbb6e | Panagiotis Kanavos | else throw new InvalidOperationException("Invalid collection"); |
45 | 174bbb6e | Panagiotis Kanavos | } |
46 | 174bbb6e | Panagiotis Kanavos | |
47 | 174bbb6e | Panagiotis Kanavos | /// <summary>Takes an element from the collection asynchronously.</summary> |
48 | 174bbb6e | Panagiotis Kanavos | /// <returns>A Task that represents the element removed from the collection.</returns> |
49 | 174bbb6e | Panagiotis Kanavos | public Task<T> Take() |
50 | 174bbb6e | Panagiotis Kanavos | { |
51 | 174bbb6e | Panagiotis Kanavos | return _semaphore.WaitAsync().ContinueWith(_ => |
52 | 174bbb6e | Panagiotis Kanavos | { |
53 | 174bbb6e | Panagiotis Kanavos | T result; |
54 | 174bbb6e | Panagiotis Kanavos | if (!_collection.TryTake(out result)) throw new InvalidOperationException("Invalid collection"); |
55 | 174bbb6e | Panagiotis Kanavos | return result; |
56 | 174bbb6e | Panagiotis Kanavos | }, TaskContinuationOptions.OnlyOnRanToCompletion); |
57 | 174bbb6e | Panagiotis Kanavos | } |
58 | 174bbb6e | Panagiotis Kanavos | |
59 | 174bbb6e | Panagiotis Kanavos | /// <summary>Gets the number of elements in the collection.</summary> |
60 | 174bbb6e | Panagiotis Kanavos | public int Count { get { return _collection.Count; } } |
61 | 174bbb6e | Panagiotis Kanavos | |
62 | 174bbb6e | Panagiotis Kanavos | } |
63 | 174bbb6e | Panagiotis Kanavos | } |