Modifications to enable Sync Pausing for all operations
[pithos-ms-client] / trunk / Pithos.Core / Agents / AsyncCollection.cs
1 //--------------------------------------------------------------------------
2 // 
3 //  Copyright (c) Microsoft Corporation.  All rights reserved. 
4 // 
5 //  File: AsyncProducerConsumerCollection.cs
6 //
7 //--------------------------------------------------------------------------
8
9 using System;
10 using System.Collections.Concurrent;
11 using System.Diagnostics;
12 using System.Threading;
13 using System.Threading.Tasks;
14
namespace Pithos.Core
{
    /// <summary>
    /// Provides an asynchronous producer/consumer collection: producers call
    /// <see cref="Add"/> and consumers obtain items through the task returned
    /// by <see cref="Take"/>.
    /// </summary>
    /// <typeparam name="T">The type of the items stored in the collection.</typeparam>
    // The debugger display must reference an existing member; the class exposes Count.
    [DebuggerDisplay("Count={Count}")]
    public sealed class AsyncCollection<T>
    {
        /// <summary>
        /// Asynchronous semaphore used to keep track of asynchronous work.
        /// It starts at zero and is released once for every item successfully added.
        /// </summary>
        private readonly AsyncSemaphore _semaphore = new AsyncSemaphore(0);

        /// <summary>The data stored in the collection.</summary>
        private readonly IProducerConsumerCollection<T> _collection;

        /// <summary>
        /// Initializes the asynchronous producer/consumer collection to store data
        /// in a first-in-first-out (FIFO) order.
        /// </summary>
        public AsyncCollection() : this(new ConcurrentQueue<T>()) { }

        /// <summary>Initializes the asynchronous producer/consumer collection.</summary>
        /// <param name="collection">The underlying collection to use to store data.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="collection"/> is <c>null</c>.
        /// </exception>
        public AsyncCollection(IProducerConsumerCollection<T> collection)
        {
            if (collection == null)
            {
                throw new ArgumentNullException("collection");
            }

            _collection = collection;
        }

        /// <summary>Adds an element to the collection.</summary>
        /// <param name="item">The item to be added.</param>
        /// <exception cref="InvalidOperationException">
        /// The underlying collection refused the item.
        /// </exception>
        public void Add(T item)
        {
            if (!_collection.TryAdd(item))
            {
                throw new InvalidOperationException("Invalid collection");
            }

            // The semaphore is released from a task on the default scheduler rather
            // than inline. NOTE(review): presumably this keeps the continuations of
            // pending Take() calls off the producer's thread - confirm against
            // AsyncSemaphore.Release.
            Task.Factory.StartNew(
                s => ((AsyncSemaphore)s).Release(),
                _semaphore,
                CancellationToken.None,
                TaskCreationOptions.PreferFairness,
                TaskScheduler.Default);
        }

        /// <summary>Takes an element from the collection asynchronously.</summary>
        /// <returns>A Task that represents the element removed from the collection.</returns>
        /// <remarks>
        /// The item is removed in a continuation that runs only when the semaphore
        /// wait ran to completion. If the underlying collection has no item at that
        /// point, the continuation throws <see cref="InvalidOperationException"/>,
        /// which is surfaced through the returned task.
        /// </remarks>
        public Task<T> Take()
        {
            return _semaphore.WaitAsync().ContinueWith(_ =>
            {
                T result;
                if (!_collection.TryTake(out result))
                {
                    throw new InvalidOperationException("Invalid collection");
                }

                return result;
            }, TaskContinuationOptions.OnlyOnRanToCompletion);
        }

        /// <summary>Gets the number of elements in the collection.</summary>
        public int Count { get { return _collection.Count; } }
    }
}