All files
[pithos-ms-client] / trunk / Libraries / ParallelExtensionsExtras / TaskSchedulers / ConcurrentExclusiveInterleave.cs
1 //--------------------------------------------------------------------------
2 // 
3 //  Copyright (c) Microsoft Corporation.  All rights reserved. 
4 // 
5 //  File: ConcurrentExclusiveInterleave.cs
6 //
7 //--------------------------------------------------------------------------
8
9 using System.Collections.Generic;
10 using System.Diagnostics;
11
12 namespace System.Threading.Tasks.Schedulers
13 {
14     /// <summary>Provides concurrent and exclusive task schedulers that coordinate.</summary>
15     [DebuggerDisplay("ConcurrentTasksWaiting={ConcurrentTaskCount}, ExclusiveTasksWaiting={ExclusiveTaskCount}")]
16     [DebuggerTypeProxy(typeof(ConcurrentExclusiveInterleaveDebugView))]
17     public sealed class ConcurrentExclusiveInterleave
18     {
19         /// <summary>Provides a debug view for ConcurrentExclusiveInterleave.</summary>
20         internal class ConcurrentExclusiveInterleaveDebugView
21         {
22             /// <summary>The interleave being debugged.</summary>
23             private ConcurrentExclusiveInterleave _interleave;
24
25             /// <summary>Initializes the debug view.</summary>
26             /// <param name="interleave">The interleave being debugged.</param>
27             public ConcurrentExclusiveInterleaveDebugView(ConcurrentExclusiveInterleave interleave)
28             {
29                 if (interleave == null) throw new ArgumentNullException("interleave");
30                 _interleave = interleave;
31             }
32
33             public IEnumerable<Task> ExclusiveTasksWaiting { get { return _interleave._exclusiveTaskScheduler.Tasks; } }
34             /// <summary>Gets the number of tasks waiting to run concurrently.</summary>
35             public IEnumerable<Task> ConcurrentTasksWaiting { get { return _interleave._concurrentTaskScheduler.Tasks; } }
36             /// <summary>Gets a description of the processing task for debugging purposes.</summary>
37             public Task InterleaveTask { get { return _interleave._taskExecuting; } }
38         }
39
40         /// <summary>Synchronizes all activity in this type and its generated schedulers.</summary>
41         private readonly object _internalLock;
42         /// <summary>The parallel options used by the asynchronous task and parallel loops.</summary>
43         private ParallelOptions _parallelOptions;
44         /// <summary>The scheduler used to queue and execute "reader" tasks that may run concurrently with other readers.</summary>
45         private ConcurrentExclusiveTaskScheduler _concurrentTaskScheduler;
46         /// <summary>The scheduler used to queue and execute "writer" tasks that must run exclusively while no other tasks for this interleave are running.</summary>
47         private ConcurrentExclusiveTaskScheduler _exclusiveTaskScheduler;
48         /// <summary>Whether this interleave has queued its processing task.</summary>
49         private Task _taskExecuting;
50         /// <summary>Whether the exclusive processing of a task should include all of its children as well.</summary>
51         private bool _exclusiveProcessingIncludesChildren;
52
53         /// <summary>Initialies the ConcurrentExclusiveInterleave.</summary>
54         public ConcurrentExclusiveInterleave() : 
55             this(TaskScheduler.Current, false) {}
56
57         /// <summary>Initialies the ConcurrentExclusiveInterleave.</summary>
58         /// <param name="exclusiveProcessingIncludesChildren">Whether the exclusive processing of a task should include all of its children as well.</param>
59         public ConcurrentExclusiveInterleave(bool exclusiveProcessingIncludesChildren) : 
60             this(TaskScheduler.Current, exclusiveProcessingIncludesChildren) {}
61
62         /// <summary>Initialies the ConcurrentExclusiveInterleave.</summary>
63         /// <param name="targetScheduler">The target scheduler on which this interleave should execute.</param>
64         public ConcurrentExclusiveInterleave(TaskScheduler targetScheduler) :
65             this(targetScheduler, false) {}
66
67         /// <summary>Initialies the ConcurrentExclusiveInterleave.</summary>
68         /// <param name="targetScheduler">The target scheduler on which this interleave should execute.</param>
69         /// <param name="exclusiveProcessingIncludesChildren">Whether the exclusive processing of a task should include all of its children as well.</param>
70         public ConcurrentExclusiveInterleave(TaskScheduler targetScheduler, bool exclusiveProcessingIncludesChildren)
71         {
72             // A scheduler must be provided
73             if (targetScheduler == null) throw new ArgumentNullException("targetScheduler");
74
75             // Create the state for this interleave
76             _internalLock = new object();
77             _exclusiveProcessingIncludesChildren = exclusiveProcessingIncludesChildren;
78             _parallelOptions = new ParallelOptions() { TaskScheduler = targetScheduler };
79             _concurrentTaskScheduler = new ConcurrentExclusiveTaskScheduler(this, new Queue<Task>(), targetScheduler.MaximumConcurrencyLevel);
80             _exclusiveTaskScheduler = new ConcurrentExclusiveTaskScheduler(this, new Queue<Task>(), 1);
81         }
82
83         /// <summary>
84         /// Gets a TaskScheduler that can be used to schedule tasks to this interleave
85         /// that may run concurrently with other tasks on this interleave.
86         /// </summary>
87         public TaskScheduler ConcurrentTaskScheduler { get { return _concurrentTaskScheduler; } }
88         /// <summary>
89         /// Gets a TaskScheduler that can be used to schedule tasks to this interleave
90         /// that must run exclusively with regards to other tasks on this interleave.
91         /// </summary>
92         public TaskScheduler ExclusiveTaskScheduler { get { return _exclusiveTaskScheduler; } }
93
94         /// <summary>Gets the number of tasks waiting to run exclusively.</summary>
95         private int ExclusiveTaskCount { get { lock (_internalLock) return _exclusiveTaskScheduler.Tasks.Count; } }
96         /// <summary>Gets the number of tasks waiting to run concurrently.</summary>
97         private int ConcurrentTaskCount { get { lock (_internalLock) return _concurrentTaskScheduler.Tasks.Count; } }
98
99         /// <summary>Notifies the interleave that new work has arrived to be processed.</summary>
100         /// <remarks>Must only be called while holding the lock.</remarks>
101         internal void NotifyOfNewWork()
102         {
103             // If a task is already running, bail.  
104             if (_taskExecuting != null) return;
105
106             // Otherwise, run the processor. Store the task and then start it to ensure that 
107             // the assignment happens before the body of the task runs.
108             _taskExecuting = new Task(ConcurrentExclusiveInterleaveProcessor, CancellationToken.None, TaskCreationOptions.None);
109             _taskExecuting.Start(_parallelOptions.TaskScheduler);
110         }
111
112         /// <summary>The body of the async processor to be run in a Task.  Only one should be running at a time.</summary>
113         /// <remarks>This has been separated out into its own method to improve the Parallel Tasks window experience.</remarks>
114         private void ConcurrentExclusiveInterleaveProcessor()
115         {
116             // Run while there are more tasks to be processed.  We assume that the first time through,
117             // there are tasks.  If they aren't, worst case is we try to process and find none.
118             bool runTasks = true;
119             bool cleanupOnExit = true;
120             while (runTasks)
121             {
122                 try
123                 {
124                     // Process all waiting exclusive tasks
125                     foreach (var task in GetExclusiveTasks())
126                     {
127                         _exclusiveTaskScheduler.ExecuteTask(task);
128
129                         // Just because we executed the task doesn't mean it's "complete",
130                         // if it has child tasks that have not yet completed
131                         // and will complete later asynchronously.  To account for this, 
132                         // if a task isn't yet completed, leave the interleave processor 
133                         // but leave it still in a running state.  When the task completes,
134                         // we'll come back in and keep going.  Note that the children
135                         // must not be scheduled to this interleave, or this will deadlock.
136                         if (_exclusiveProcessingIncludesChildren && !task.IsCompleted)
137                         {
138                             cleanupOnExit = false;
139                             task.ContinueWith(_ => ConcurrentExclusiveInterleaveProcessor(), _parallelOptions.TaskScheduler);
140                             return;
141                         }
142                     }
143
144                     // Process all waiting concurrent tasks *until* any exclusive tasks show up, in which
145                     // case we want to switch over to processing those (by looping around again).
146                     Parallel.ForEach(GetConcurrentTasksUntilExclusiveExists(), _parallelOptions,
147                         ExecuteConcurrentTask);
148                 }
149                 finally
150                 {
151                     if (cleanupOnExit)
152                     {
153                         lock (_internalLock)
154                         {
155                             // If there are no tasks remaining, we're done. If there are, loop around and go again.
156                             if (_concurrentTaskScheduler.Tasks.Count == 0 && _exclusiveTaskScheduler.Tasks.Count == 0)
157                             {
158                                 _taskExecuting = null;
159                                 runTasks = false;
160                             }
161                         }
162                     }
163                 }
164             }
165         }
166
167         /// <summary>Runs a concurrent task.</summary>
168         /// <param name="task">The task to execute.</param>
169         /// <remarks>This has been separated out into its own method to improve the Parallel Tasks window experience.</remarks>
170         private void ExecuteConcurrentTask(Task task) { _concurrentTaskScheduler.ExecuteTask(task); }
171
172         /// <summary>
173         /// Gets an enumerable that yields waiting concurrent tasks one at a time until
174         /// either there are no more concurrent tasks or there are any exclusive tasks.
175         /// </summary>
176         private IEnumerable<Task> GetConcurrentTasksUntilExclusiveExists()
177         {
178             while (true)
179             {
180                 Task foundTask = null;
181                 lock (_internalLock)
182                 {
183                     if (_exclusiveTaskScheduler.Tasks.Count == 0 &&
184                         _concurrentTaskScheduler.Tasks.Count > 0)
185                     {
186                         foundTask = _concurrentTaskScheduler.Tasks.Dequeue();
187                     }
188                 }
189                 if (foundTask != null) yield return foundTask;
190                 else yield break;
191             }
192         }
193
194         /// <summary>
195         /// Gets an enumerable that yields all of the exclusive tasks one at a time.
196         /// </summary>
197         private IEnumerable<Task> GetExclusiveTasks()
198         {
199             while (true)
200             {
201                 Task foundTask = null;
202                 lock (_internalLock)
203                 {
204                     if (_exclusiveTaskScheduler.Tasks.Count > 0) foundTask = _exclusiveTaskScheduler.Tasks.Dequeue();
205                 }
206                 if (foundTask != null) yield return foundTask;
207                 else yield break;
208             }
209         }
210
211         /// <summary>
212         /// A scheduler shim used to queue tasks to the interleave and execute those tasks on request of the interleave.
213         /// </summary>
214         private class ConcurrentExclusiveTaskScheduler : TaskScheduler
215         {
216             /// <summary>The parent interleave.</summary>
217             private readonly ConcurrentExclusiveInterleave _interleave;
218             /// <summary>The maximum concurrency level for the scheduler.</summary>
219             private readonly int _maximumConcurrencyLevel;
220             /// <summary>Whether a Task is currently being processed on this thread.</summary>
221             private ThreadLocal<bool> _processingTaskOnCurrentThread = new ThreadLocal<bool>();
222
223             /// <summary>Initializes the scheduler.</summary>
224             /// <param name="interleave">The parent interleave.</param>
225             /// <param name="tasks">The queue to store queued tasks into.</param>
226             internal ConcurrentExclusiveTaskScheduler(ConcurrentExclusiveInterleave interleave, Queue<Task> tasks, int maximumConcurrencyLevel)
227             {
228                 if (interleave == null) throw new ArgumentNullException("interleave");
229                 if (tasks == null) throw new ArgumentNullException("tasks");
230                 _interleave = interleave;
231                 _maximumConcurrencyLevel = maximumConcurrencyLevel;
232                 Tasks = tasks;
233             }
234
235             /// <summary>Gets the maximum concurrency level this scheduler is able to support.</summary>
236             public override int MaximumConcurrencyLevel { get { return _maximumConcurrencyLevel; } }
237
238             /// <summary>Gets the queue of tasks for this scheduler.</summary>
239             internal Queue<Task> Tasks { get; private set; }
240
241             /// <summary>Queues a task to the scheduler.</summary>
242             /// <param name="task">The task to be queued.</param>
243             protected override void QueueTask(Task task)
244             {
245                 lock (_interleave._internalLock)
246                 {
247                     Tasks.Enqueue(task);
248                     _interleave.NotifyOfNewWork();
249                 }
250             }
251
252             /// <summary>Executes a task on this scheduler.</summary>
253             /// <param name="task">The task to be executed.</param>
254             internal void ExecuteTask(Task task) 
255             {
256                 var processingTaskOnCurrentThread = _processingTaskOnCurrentThread.Value;
257                 if (!processingTaskOnCurrentThread) _processingTaskOnCurrentThread.Value = true;
258                 base.TryExecuteTask(task);
259                 if (!processingTaskOnCurrentThread) _processingTaskOnCurrentThread.Value = false;
260             }
261
262             /// <summary>Tries to execute the task synchronously on this scheduler.</summary>
263             /// <param name="task">The task to execute.</param>
264             /// <param name="taskWasPreviouslyQueued">Whether the task was previously queued to the scheduler.</param>
265             /// <returns>true if the task could be executed; otherwise, false.</returns>
266             protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
267             {
268                 if (_processingTaskOnCurrentThread.Value)
269                 {
270                     var t = new Task<bool>(state => TryExecuteTask((Task)state), task);
271                     t.RunSynchronously(_interleave._parallelOptions.TaskScheduler);
272                     return t.Result;
273                 }
274                 return false;
275             }
276
277             /// <summary>Gets for debugging purposes the tasks scheduled to this scheduler.</summary>
278             /// <returns>An enumerable of the tasks queued.</returns>
279             protected override IEnumerable<Task> GetScheduledTasks() { return Tasks; }
280         }
281     }
282 }