1 //--------------------------------------------------------------------------
3 // Copyright (c) Microsoft Corporation. All rights reserved.
5 // File: ConcurrentExclusiveInterleave.cs
7 //--------------------------------------------------------------------------
9 using System.Collections.Generic;
10 using System.Diagnostics;
12 namespace System.Threading.Tasks.Schedulers
14 /// <summary>Provides concurrent and exclusive task schedulers that coordinate.</summary>
15 [DebuggerDisplay("ConcurrentTasksWaiting={ConcurrentTaskCount}, ExclusiveTasksWaiting={ExclusiveTaskCount}")]
16 [DebuggerTypeProxy(typeof(ConcurrentExclusiveInterleaveDebugView))]
17 public sealed class ConcurrentExclusiveInterleave
19 /// <summary>Provides a debug view for ConcurrentExclusiveInterleave; it is the type named by the DebuggerTypeProxy attribute on that class.</summary>
20 internal class ConcurrentExclusiveInterleaveDebugView
22 /// <summary>The interleave being debugged.</summary>
23 private ConcurrentExclusiveInterleave _interleave;
25 /// <summary>Initializes the debug view.</summary>
26 /// <param name="interleave">The interleave being debugged.</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="interleave"/> is null.</exception>
27 public ConcurrentExclusiveInterleaveDebugView(ConcurrentExclusiveInterleave interleave)
29 if (interleave == null) throw new ArgumentNullException("interleave");
30 _interleave = interleave;
/// <summary>Gets the tasks waiting to run exclusively (the exclusive scheduler's queue itself, not a copy).</summary>
33 public IEnumerable<Task> ExclusiveTasksWaiting { get { return _interleave._exclusiveTaskScheduler.Tasks; } }
34 /// <summary>Gets the tasks waiting to run concurrently (the concurrent scheduler's queue itself, not a copy).</summary>
35 public IEnumerable<Task> ConcurrentTasksWaiting { get { return _interleave._concurrentTaskScheduler.Tasks; } }
36 /// <summary>Gets the interleave's processing task for debugging purposes; null when the interleave has no processing task stored.</summary>
37 public Task InterleaveTask { get { return _interleave._taskExecuting; } }
40 /// <summary>Synchronizes all activity in this type and its generated schedulers.</summary>
41 private readonly object _internalLock;
42 /// <summary>The parallel options used by the asynchronous task and parallel loops; its TaskScheduler is the target scheduler passed to the constructor.</summary>
43 private ParallelOptions _parallelOptions;
44 /// <summary>The scheduler used to queue and execute "reader" tasks that may run concurrently with other readers.</summary>
45 private ConcurrentExclusiveTaskScheduler _concurrentTaskScheduler;
46 /// <summary>The scheduler used to queue and execute "writer" tasks that must run exclusively while no other tasks for this interleave are running.</summary>
47 private ConcurrentExclusiveTaskScheduler _exclusiveTaskScheduler;
48 /// <summary>The interleave's processing task while one is queued or running; null when there is none, which is what NotifyOfNewWork tests before starting a new one.</summary>
49 private Task _taskExecuting;
50 /// <summary>Whether the exclusive processing of a task should include all of its children as well.</summary>
51 private bool _exclusiveProcessingIncludesChildren;
53 /// <summary>Initializes the ConcurrentExclusiveInterleave to execute on TaskScheduler.Current, with exclusive processing that does not include child tasks.</summary>
54 public ConcurrentExclusiveInterleave() :
55 this(TaskScheduler.Current, false) {}
57 /// <summary>Initializes the ConcurrentExclusiveInterleave to execute on TaskScheduler.Current.</summary>
58 /// <param name="exclusiveProcessingIncludesChildren">Whether the exclusive processing of a task should include all of its children as well.</param>
59 public ConcurrentExclusiveInterleave(bool exclusiveProcessingIncludesChildren) :
60 this(TaskScheduler.Current, exclusiveProcessingIncludesChildren) {}
62 /// <summary>Initializes the ConcurrentExclusiveInterleave with exclusive processing that does not include child tasks.</summary>
63 /// <param name="targetScheduler">The target scheduler on which this interleave should execute.</param>
64 public ConcurrentExclusiveInterleave(TaskScheduler targetScheduler) :
65 this(targetScheduler, false) {}
67 /// <summary>Initializes the ConcurrentExclusiveInterleave.</summary>
68 /// <param name="targetScheduler">The target scheduler on which this interleave should execute.</param>
69 /// <param name="exclusiveProcessingIncludesChildren">Whether the exclusive processing of a task should include all of its children as well.</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="targetScheduler"/> is null.</exception>
70 public ConcurrentExclusiveInterleave(TaskScheduler targetScheduler, bool exclusiveProcessingIncludesChildren)
72 // A scheduler must be provided
73 if (targetScheduler == null) throw new ArgumentNullException("targetScheduler");
75 // Create the state for this interleave
76 _internalLock = new object();
77 _exclusiveProcessingIncludesChildren = exclusiveProcessingIncludesChildren;
78 _parallelOptions = new ParallelOptions() { TaskScheduler = targetScheduler };
// Each scheduler shim gets its own queue. The concurrent one reports the target scheduler's
// maximum concurrency level; the exclusive one reports a maximum concurrency level of 1.
79 _concurrentTaskScheduler = new ConcurrentExclusiveTaskScheduler(this, new Queue<Task>(), targetScheduler.MaximumConcurrencyLevel);
80 _exclusiveTaskScheduler = new ConcurrentExclusiveTaskScheduler(this, new Queue<Task>(), 1);
/// <summary>
84 /// Gets a TaskScheduler that can be used to schedule tasks to this interleave
85 /// that may run concurrently with other tasks on this interleave.
/// </summary>
87 public TaskScheduler ConcurrentTaskScheduler { get { return _concurrentTaskScheduler; } }
/// <summary>
89 /// Gets a TaskScheduler that can be used to schedule tasks to this interleave
90 /// that must run exclusively with regards to other tasks on this interleave.
/// </summary>
92 public TaskScheduler ExclusiveTaskScheduler { get { return _exclusiveTaskScheduler; } }
94 /// <summary>Gets the number of tasks waiting to run exclusively, read from the exclusive scheduler's queue while holding the internal lock.</summary>
/// <remarks>Referenced by name in this class's DebuggerDisplay attribute.</remarks>
95 private int ExclusiveTaskCount { get { lock (_internalLock) return _exclusiveTaskScheduler.Tasks.Count; } }
96 /// <summary>Gets the number of tasks waiting to run concurrently, read from the concurrent scheduler's queue while holding the internal lock.</summary>
/// <remarks>Referenced by name in this class's DebuggerDisplay attribute.</remarks>
97 private int ConcurrentTaskCount { get { lock (_internalLock) return _concurrentTaskScheduler.Tasks.Count; } }
99 /// <summary>Notifies the interleave that new work has arrived to be processed, starting the processing task if none is currently stored.</summary>
100 /// <remarks>Must only be called while holding the lock. The scheduler shim's QueueTask calls this from inside a lock on _internalLock.</remarks>
101 internal void NotifyOfNewWork()
103 // If a task is already running, bail.
104 if (_taskExecuting != null) return;
106 // Otherwise, run the processor. Store the task and then start it to ensure that
107 // the assignment happens before the body of the task runs.
108 _taskExecuting = new Task(ConcurrentExclusiveInterleaveProcessor, CancellationToken.None, TaskCreationOptions.None);
// The processor runs on the target scheduler supplied at construction.
109 _taskExecuting.Start(_parallelOptions.TaskScheduler);
112 /// <summary>The body of the async processor to be run in a Task. Only one should be running at a time.</summary>
113 /// <remarks>This has been separated out into its own method to improve the Parallel Tasks window experience.</remarks>
114 private void ConcurrentExclusiveInterleaveProcessor()
116 // Run while there are more tasks to be processed. We assume that the first time through,
117 // there are tasks. If there aren't, worst case is we try to process and find none.
// NOTE(review): the statements that read runTasks and cleanupOnExit are not shown in this view;
// confirm how the loop terminates and when cleanup runs before changing either flag.
118 bool runTasks = true;
119 bool cleanupOnExit = true;
124 // Process all waiting exclusive tasks, one at a time, on the calling thread
125 foreach (var task in GetExclusiveTasks())
127 _exclusiveTaskScheduler.ExecuteTask(task);
129 // Just because we executed the task doesn't mean it's "complete",
130 // if it has child tasks that have not yet completed
131 // and will complete later asynchronously. To account for this,
132 // if a task isn't yet completed, leave the interleave processor
133 // but leave it still in a running state. When the task completes,
134 // we'll come back in and keep going. Note that the children
135 // must not be scheduled to this interleave, or this will deadlock.
136 if (_exclusiveProcessingIncludesChildren && !task.IsCompleted)
// Skip cleanup so _taskExecuting stays set, then re-enter the processor on the
// target scheduler once the task (and therefore its children) has completed.
138 cleanupOnExit = false;
139 task.ContinueWith(_ => ConcurrentExclusiveInterleaveProcessor(), _parallelOptions.TaskScheduler);
144 // Process all waiting concurrent tasks *until* any exclusive tasks show up, in which
145 // case we want to switch over to processing those (by looping around again).
146 Parallel.ForEach(GetConcurrentTasksUntilExclusiveExists(), _parallelOptions,
147 ExecuteConcurrentTask);
155 // If there are no tasks remaining, we're done. If there are, loop around and go again.
// Clearing _taskExecuting is what lets a later NotifyOfNewWork start a fresh processor.
156 if (_concurrentTaskScheduler.Tasks.Count == 0 && _exclusiveTaskScheduler.Tasks.Count == 0)
158 _taskExecuting = null;
167 /// <summary>Runs a concurrent task by handing it to the concurrent scheduler's ExecuteTask; used as the loop body of the processor's Parallel.ForEach.</summary>
168 /// <param name="task">The task to execute.</param>
169 /// <remarks>This has been separated out into its own method to improve the Parallel Tasks window experience.</remarks>
170 private void ExecuteConcurrentTask(Task task) { _concurrentTaskScheduler.ExecuteTask(task); }
/// <summary>
173 /// Gets an enumerable that yields waiting concurrent tasks one at a time until
174 /// either there are no more concurrent tasks or there are any exclusive tasks.
/// </summary>
/// <returns>A lazily evaluated sequence; each task is dequeued from the concurrent queue just before it is yielded.</returns>
176 private IEnumerable<Task> GetConcurrentTasksUntilExclusiveExists()
180 Task foundTask = null;
// Only take a concurrent task while no exclusive task is queued, so queued
// exclusive work stops further concurrent tasks from being handed out.
183 if (_exclusiveTaskScheduler.Tasks.Count == 0 &&
184 _concurrentTaskScheduler.Tasks.Count > 0)
186 foundTask = _concurrentTaskScheduler.Tasks.Dequeue();
// Nothing is yielded when no task was dequeued.
189 if (foundTask != null) yield return foundTask;
/// <summary>
195 /// Gets an enumerable that yields all of the exclusive tasks one at a time.
/// </summary>
/// <returns>A lazily evaluated sequence; each task is dequeued from the exclusive queue just before it is yielded.</returns>
197 private IEnumerable<Task> GetExclusiveTasks()
201 Task foundTask = null;
// Dequeue only when the queue is non-empty; otherwise foundTask stays null.
204 if (_exclusiveTaskScheduler.Tasks.Count > 0) foundTask = _exclusiveTaskScheduler.Tasks.Dequeue();
206 if (foundTask != null) yield return foundTask;
/// <summary>
212 /// A scheduler shim used to queue tasks to the interleave and execute those tasks on request of the interleave.
/// </summary>
214 private class ConcurrentExclusiveTaskScheduler : TaskScheduler
216 /// <summary>The parent interleave.</summary>
217 private readonly ConcurrentExclusiveInterleave _interleave;
218 /// <summary>The maximum concurrency level for the scheduler.</summary>
219 private readonly int _maximumConcurrencyLevel;
220 /// <summary>Whether a Task is currently being processed on this thread. Set and cleared by ExecuteTask, read by TryExecuteTaskInline.</summary>
221 private ThreadLocal<bool> _processingTaskOnCurrentThread = new ThreadLocal<bool>();
223 /// <summary>Initializes the scheduler.</summary>
224 /// <param name="interleave">The parent interleave.</param>
225 /// <param name="tasks">The queue to store queued tasks into.</param>
/// <param name="maximumConcurrencyLevel">The value this scheduler reports from MaximumConcurrencyLevel.</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="interleave"/> or <paramref name="tasks"/> is null.</exception>
226 internal ConcurrentExclusiveTaskScheduler(ConcurrentExclusiveInterleave interleave, Queue<Task> tasks, int maximumConcurrencyLevel)
228 if (interleave == null) throw new ArgumentNullException("interleave");
229 if (tasks == null) throw new ArgumentNullException("tasks");
230 _interleave = interleave;
231 _maximumConcurrencyLevel = maximumConcurrencyLevel;
235 /// <summary>Gets the maximum concurrency level this scheduler is able to support.</summary>
236 public override int MaximumConcurrencyLevel { get { return _maximumConcurrencyLevel; } }
238 /// <summary>Gets the queue of tasks for this scheduler. The interleave reads and dequeues from it directly.</summary>
239 internal Queue<Task> Tasks { get; private set; }
241 /// <summary>Queues a task to the scheduler.</summary>
242 /// <param name="task">The task to be queued.</param>
243 protected override void QueueTask(Task task)
// Everything here runs under the interleave-wide lock, which NotifyOfNewWork requires.
245 lock (_interleave._internalLock)
248 _interleave.NotifyOfNewWork();
252 /// <summary>Executes a task on this scheduler, marking the current thread as processing a task for the duration of the call.</summary>
253 /// <param name="task">The task to be executed.</param>
254 internal void ExecuteTask(Task task)
// Remember whether the flag was already set so that only the outermost call on
// this thread sets it beforehand and clears it afterwards.
256 var processingTaskOnCurrentThread = _processingTaskOnCurrentThread.Value;
257 if (!processingTaskOnCurrentThread) _processingTaskOnCurrentThread.Value = true;
258 base.TryExecuteTask(task);
259 if (!processingTaskOnCurrentThread) _processingTaskOnCurrentThread.Value = false;
262 /// <summary>Tries to execute the task synchronously on this scheduler.</summary>
263 /// <param name="task">The task to execute.</param>
264 /// <param name="taskWasPreviouslyQueued">Whether the task was previously queued to the scheduler.</param>
265 /// <returns>true if the task could be executed; otherwise, false.</returns>
266 protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
// The inline path below is taken only when this thread is already inside ExecuteTask for this scheduler.
268 if (_processingTaskOnCurrentThread.Value)
// Wrap the execution in a Task<bool> and run that wrapper synchronously on the target scheduler.
// NOTE(review): the return statements are not shown in this view; presumably the wrapper's
// result is returned on this path and false otherwise — confirm against the full file.
270 var t = new Task<bool>(state => TryExecuteTask((Task)state), task);
271 t.RunSynchronously(_interleave._parallelOptions.TaskScheduler);
277 /// <summary>Gets for debugging purposes the tasks scheduled to this scheduler.</summary>
278 /// <returns>An enumerable of the tasks queued.</returns>
279 protected override IEnumerable<Task> GetScheduledTasks() { return Tasks; }