All files
[pithos-ms-client] / trunk / Libraries / ParallelExtensionsExtras / TaskSchedulers / StaTaskScheduler.cs
1 //--------------------------------------------------------------------------
2 // 
3 //  Copyright (c) Microsoft Corporation.  All rights reserved. 
4 // 
5 //  File: StaTaskScheduler.cs
6 //
7 //--------------------------------------------------------------------------
8
9 using System.Collections.Concurrent;
10 using System.Collections.Generic;
11 using System.Linq;
12
13 namespace System.Threading.Tasks.Schedulers
14 {
15     /// <summary>Provides a scheduler that uses STA threads.</summary>
16     public sealed class StaTaskScheduler : TaskScheduler, IDisposable
17     {
18         /// <summary>Stores the queued tasks to be executed by our pool of STA threads.</summary>
19         private BlockingCollection<Task> _tasks;
20         /// <summary>The STA threads used by the scheduler.</summary>
21         private readonly List<Thread> _threads;
22
23         /// <summary>Initializes a new instance of the StaTaskScheduler class with the specified concurrency level.</summary>
24         /// <param name="numberOfThreads">The number of threads that should be created and used by this scheduler.</param>
25         public StaTaskScheduler(int numberOfThreads)
26         {
27             // Validate arguments
28             if (numberOfThreads < 1) throw new ArgumentOutOfRangeException("concurrencyLevel");
29
30             // Initialize the tasks collection
31             _tasks = new BlockingCollection<Task>();
32
33             // Create the threads to be used by this scheduler
34             _threads = Enumerable.Range(0, numberOfThreads).Select(i =>
35                        {
36                            var thread = new Thread(() =>
37                            {
38                                // Continually get the next task and try to execute it.
39                                // This will continue until the scheduler is disposed and no more tasks remain.
40                                foreach (var t in _tasks.GetConsumingEnumerable())
41                                {
42                                    TryExecuteTask(t);
43                                }
44                            });
45                            thread.IsBackground = true;
46                            thread.SetApartmentState(ApartmentState.STA);
47                            return thread;
48                        }).ToList();
49
50             // Start all of the threads
51             _threads.ForEach(t => t.Start());
52         }
53
54         /// <summary>Queues a Task to be executed by this scheduler.</summary>
55         /// <param name="task">The task to be executed.</param>
56         protected override void QueueTask(Task task)
57         {
58             // Push it into the blocking collection of tasks
59             _tasks.Add(task);
60         }
61
62         /// <summary>Provides a list of the scheduled tasks for the debugger to consume.</summary>
63         /// <returns>An enumerable of all tasks currently scheduled.</returns>
64         protected override IEnumerable<Task> GetScheduledTasks()
65         {
66             // Serialize the contents of the blocking collection of tasks for the debugger
67             return _tasks.ToArray();
68         }
69
70         /// <summary>Determines whether a Task may be inlined.</summary>
71         /// <param name="task">The task to be executed.</param>
72         /// <param name="taskWasPreviouslyQueued">Whether the task was previously queued.</param>
73         /// <returns>true if the task was successfully inlined; otherwise, false.</returns>
74         protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
75         {
76             // Try to inline if the current thread is STA
77             return
78                 Thread.CurrentThread.GetApartmentState() == ApartmentState.STA &&
79                 TryExecuteTask(task);
80         }
81
82         /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
83         public override int MaximumConcurrencyLevel
84         {
85             get { return _threads.Count; }
86         }
87
88         /// <summary>
89         /// Cleans up the scheduler by indicating that no more tasks will be queued.
90         /// This method blocks until all threads successfully shutdown.
91         /// </summary>
92         public void Dispose()
93         {
94             if (_tasks != null)
95             {
96                 // Indicate that no new tasks will be coming in
97                 _tasks.CompleteAdding();
98
99                 // Wait for all threads to finish processing tasks
100                 foreach (var thread in _threads) thread.Join();
101
102                 // Cleanup
103                 _tasks.Dispose();
104                 _tasks = null;
105             }
106         }
107     }
108 }