//--------------------------------------------------------------------------
//
//  Copyright (c) Microsoft Corporation.  All rights reserved.
//
//  File: ConcurrentExclusiveInterleave.cs
//
//--------------------------------------------------------------------------

using System.Collections.Generic;
using System.Diagnostics;

namespace System.Threading.Tasks.Schedulers
{
    /// <summary>Provides concurrent and exclusive task schedulers that coordinate.</summary>
    /// <remarks>
    /// Tasks queued to <see cref="ExclusiveTaskScheduler"/> are executed one at a time by the
    /// interleave's processing task; tasks queued to <see cref="ConcurrentTaskScheduler"/> are
    /// executed through a parallel loop that stops handing out work as soon as an exclusive
    /// task is waiting.
    /// </remarks>
    [DebuggerDisplay("ConcurrentTasksWaiting={ConcurrentTaskCount}, ExclusiveTasksWaiting={ExclusiveTaskCount}")]
    [DebuggerTypeProxy(typeof(ConcurrentExclusiveInterleaveDebugView))]
    public sealed class ConcurrentExclusiveInterleave
    {
        /// <summary>Provides a debug view for ConcurrentExclusiveInterleave.</summary>
        internal class ConcurrentExclusiveInterleaveDebugView
        {
            /// <summary>The interleave being debugged.</summary>
            private readonly ConcurrentExclusiveInterleave _interleave;

            /// <summary>Initializes the debug view.</summary>
            /// <param name="interleave">The interleave being debugged.</param>
            /// <exception cref="ArgumentNullException"><paramref name="interleave"/> is null.</exception>
            public ConcurrentExclusiveInterleaveDebugView(ConcurrentExclusiveInterleave interleave)
            {
                if (interleave == null) throw new ArgumentNullException("interleave");
                _interleave = interleave;
            }

            /// <summary>Gets the tasks waiting to run exclusively.</summary>
            public IEnumerable<Task> ExclusiveTasksWaiting
            {
                get { return _interleave._exclusiveTaskScheduler.Tasks; }
            }

            /// <summary>Gets the tasks waiting to run concurrently.</summary>
            public IEnumerable<Task> ConcurrentTasksWaiting
            {
                get { return _interleave._concurrentTaskScheduler.Tasks; }
            }

            /// <summary>Gets the interleave's processing task (null when none is active) for debugging purposes.</summary>
            public Task InterleaveTask
            {
                get { return _interleave._taskExecuting; }
            }
        }

        /// <summary>Synchronizes all activity in this type and its generated schedulers.</summary>
        private readonly object _internalLock;
        /// <summary>The parallel options used by the asynchronous task and parallel loops.</summary>
        private readonly ParallelOptions _parallelOptions;
        /// <summary>The scheduler used to queue and execute "reader" tasks that may run concurrently with other readers.</summary>
        private readonly ConcurrentExclusiveTaskScheduler _concurrentTaskScheduler;
        /// <summary>The scheduler used to queue and execute "writer" tasks that must run exclusively while no other tasks for this interleave are running.</summary>
        private readonly ConcurrentExclusiveTaskScheduler _exclusiveTaskScheduler;
        /// <summary>The processing task that has been started for this interleave, or null if none is active.</summary>
        private Task _taskExecuting;
        /// <summary>Whether the exclusive processing of a task should include all of its children as well.</summary>
        private readonly bool _exclusiveProcessingIncludesChildren;

        /// <summary>Initializes the ConcurrentExclusiveInterleave on the current scheduler.</summary>
        public ConcurrentExclusiveInterleave() :
            this(TaskScheduler.Current, false) { }

        /// <summary>Initializes the ConcurrentExclusiveInterleave on the current scheduler.</summary>
        /// <param name="exclusiveProcessingIncludesChildren">Whether the exclusive processing of a task should include all of its children as well.</param>
        public ConcurrentExclusiveInterleave(bool exclusiveProcessingIncludesChildren) :
            this(TaskScheduler.Current, exclusiveProcessingIncludesChildren) { }

        /// <summary>Initializes the ConcurrentExclusiveInterleave.</summary>
        /// <param name="targetScheduler">The target scheduler on which this interleave should execute.</param>
        /// <exception cref="ArgumentNullException"><paramref name="targetScheduler"/> is null.</exception>
        public ConcurrentExclusiveInterleave(TaskScheduler targetScheduler) :
            this(targetScheduler, false) { }

        /// <summary>Initializes the ConcurrentExclusiveInterleave.</summary>
        /// <param name="targetScheduler">The target scheduler on which this interleave should execute.</param>
        /// <param name="exclusiveProcessingIncludesChildren">Whether the exclusive processing of a task should include all of its children as well.</param>
        /// <exception cref="ArgumentNullException"><paramref name="targetScheduler"/> is null.</exception>
        public ConcurrentExclusiveInterleave(TaskScheduler targetScheduler, bool exclusiveProcessingIncludesChildren)
        {
            // A scheduler must be provided
            if (targetScheduler == null) throw new ArgumentNullException("targetScheduler");

            // Create the state for this interleave
            _internalLock = new object();
            _exclusiveProcessingIncludesChildren = exclusiveProcessingIncludesChildren;
            _parallelOptions = new ParallelOptions() { TaskScheduler = targetScheduler };
            _concurrentTaskScheduler = new ConcurrentExclusiveTaskScheduler(this, new Queue<Task>(), targetScheduler.MaximumConcurrencyLevel);
            _exclusiveTaskScheduler = new ConcurrentExclusiveTaskScheduler(this, new Queue<Task>(), 1);
        }

        /// <summary>
        /// Gets a TaskScheduler that can be used to schedule tasks to this interleave
        /// that may run concurrently with other tasks on this interleave.
        /// </summary>
        public TaskScheduler ConcurrentTaskScheduler { get { return _concurrentTaskScheduler; } }

        /// <summary>
        /// Gets a TaskScheduler that can be used to schedule tasks to this interleave
        /// that must run exclusively with regards to other tasks on this interleave.
        /// </summary>
        public TaskScheduler ExclusiveTaskScheduler { get { return _exclusiveTaskScheduler; } }

        /// <summary>Gets the number of tasks waiting to run exclusively.</summary>
        /// <remarks>Referenced by the DebuggerDisplay attribute on this type.</remarks>
        private int ExclusiveTaskCount { get { lock (_internalLock) return _exclusiveTaskScheduler.Tasks.Count; } }

        /// <summary>Gets the number of tasks waiting to run concurrently.</summary>
        /// <remarks>Referenced by the DebuggerDisplay attribute on this type.</remarks>
        private int ConcurrentTaskCount { get { lock (_internalLock) return _concurrentTaskScheduler.Tasks.Count; } }

        /// <summary>Notifies the interleave that new work has arrived to be processed.</summary>
        /// <remarks>Must only be called while holding the lock.</remarks>
        internal void NotifyOfNewWork()
        {
            // If a task is already running, bail.
            if (_taskExecuting != null) return;

            // Otherwise, run the processor. Store the task and then start it to ensure that
            // the assignment happens before the body of the task runs.
            _taskExecuting = new Task(ConcurrentExclusiveInterleaveProcessor, CancellationToken.None, TaskCreationOptions.None);
            _taskExecuting.Start(_parallelOptions.TaskScheduler);
        }

        /// <summary>The body of the async processor to be run in a Task.  Only one should be running at a time.</summary>
        /// <remarks>This has been separated out into its own method to improve the Parallel Tasks window experience.</remarks>
        private void ConcurrentExclusiveInterleaveProcessor()
        {
            // Run while there are more tasks to be processed.  We assume that the first time through,
            // there are tasks.  If there aren't, worst case is we try to process and find none.
            bool runTasks = true;
            bool cleanupOnExit = true;
            while (runTasks)
            {
                try
                {
                    // Process all waiting exclusive tasks
                    foreach (var task in GetExclusiveTasks())
                    {
                        _exclusiveTaskScheduler.ExecuteTask(task);

                        // Just because we executed the task doesn't mean it's "complete",
                        // if it has child tasks that have not yet completed
                        // and will complete later asynchronously.  To account for this,
                        // if a task isn't yet completed, leave the interleave processor
                        // but leave it still in a running state.  When the task completes,
                        // we'll come back in and keep going.  Note that the children
                        // must not be scheduled to this interleave, or this will deadlock.
                        if (_exclusiveProcessingIncludesChildren && !task.IsCompleted)
                        {
                            // Skip the cleanup in the finally block so _taskExecuting stays set
                            // and no second processor is started while we wait for the children.
                            cleanupOnExit = false;
                            task.ContinueWith(_ => ConcurrentExclusiveInterleaveProcessor(), _parallelOptions.TaskScheduler);
                            return;
                        }
                    }

                    // Process all waiting concurrent tasks *until* any exclusive tasks show up, in which
                    // case we want to switch over to processing those (by looping around again).
                    Parallel.ForEach(GetConcurrentTasksUntilExclusiveExists(), _parallelOptions, ExecuteConcurrentTask);
                }
                finally
                {
                    if (cleanupOnExit)
                    {
                        lock (_internalLock)
                        {
                            // If there are no tasks remaining, we're done. If there are, loop around and go again.
                            if (_concurrentTaskScheduler.Tasks.Count == 0 && _exclusiveTaskScheduler.Tasks.Count == 0)
                            {
                                _taskExecuting = null;
                                runTasks = false;
                            }
                        }
                    }
                }
            }
        }

        /// <summary>Runs a concurrent task.</summary>
        /// <param name="task">The task to execute.</param>
        /// <remarks>This has been separated out into its own method to improve the Parallel Tasks window experience.</remarks>
        private void ExecuteConcurrentTask(Task task) { _concurrentTaskScheduler.ExecuteTask(task); }

        /// <summary>
        /// Gets an enumerable that yields waiting concurrent tasks one at a time until
        /// either there are no more concurrent tasks or there are any exclusive tasks.
        /// </summary>
        /// <returns>A lazily evaluated sequence that dequeues each task under the lock as it is requested.</returns>
        private IEnumerable<Task> GetConcurrentTasksUntilExclusiveExists()
        {
            while (true)
            {
                Task foundTask = null;
                lock (_internalLock)
                {
                    if (_exclusiveTaskScheduler.Tasks.Count == 0 &&
                        _concurrentTaskScheduler.Tasks.Count > 0)
                    {
                        foundTask = _concurrentTaskScheduler.Tasks.Dequeue();
                    }
                }

                // Yield outside the lock so the consumer never runs a task while holding it.
                if (foundTask != null) yield return foundTask;
                else yield break;
            }
        }

        /// <summary>
        /// Gets an enumerable that yields all of the exclusive tasks one at a time.
        /// </summary>
        /// <returns>A lazily evaluated sequence that dequeues each task under the lock as it is requested.</returns>
        private IEnumerable<Task> GetExclusiveTasks()
        {
            while (true)
            {
                Task foundTask = null;
                lock (_internalLock)
                {
                    if (_exclusiveTaskScheduler.Tasks.Count > 0) foundTask = _exclusiveTaskScheduler.Tasks.Dequeue();
                }

                // Yield outside the lock so the consumer never runs a task while holding it.
                if (foundTask != null) yield return foundTask;
                else yield break;
            }
        }

        /// <summary>
        /// A scheduler shim used to queue tasks to the interleave and execute those tasks on request of the interleave.
        /// </summary>
        private class ConcurrentExclusiveTaskScheduler : TaskScheduler
        {
            /// <summary>The parent interleave.</summary>
            private readonly ConcurrentExclusiveInterleave _interleave;
            /// <summary>The maximum concurrency level for the scheduler.</summary>
            private readonly int _maximumConcurrencyLevel;
            /// <summary>Whether a Task is currently being processed on this thread.</summary>
            private readonly ThreadLocal<bool> _processingTaskOnCurrentThread = new ThreadLocal<bool>();

            /// <summary>Initializes the scheduler.</summary>
            /// <param name="interleave">The parent interleave.</param>
            /// <param name="tasks">The queue to store queued tasks into.</param>
            /// <param name="maximumConcurrencyLevel">The value to report from <see cref="MaximumConcurrencyLevel"/>.</param>
            /// <exception cref="ArgumentNullException"><paramref name="interleave"/> or <paramref name="tasks"/> is null.</exception>
            internal ConcurrentExclusiveTaskScheduler(ConcurrentExclusiveInterleave interleave, Queue<Task> tasks, int maximumConcurrencyLevel)
            {
                if (interleave == null) throw new ArgumentNullException("interleave");
                if (tasks == null) throw new ArgumentNullException("tasks");
                _interleave = interleave;
                _maximumConcurrencyLevel = maximumConcurrencyLevel;
                Tasks = tasks;
            }

            /// <summary>Gets the maximum concurrency level this scheduler is able to support.</summary>
            public override int MaximumConcurrencyLevel { get { return _maximumConcurrencyLevel; } }

            /// <summary>Gets the queue of tasks for this scheduler.</summary>
            /// <remarks>The interleave reads and mutates this queue while holding its internal lock.</remarks>
            internal Queue<Task> Tasks { get; private set; }

            /// <summary>Queues a task to the scheduler.</summary>
            /// <param name="task">The task to be queued.</param>
            protected override void QueueTask(Task task)
            {
                lock (_interleave._internalLock)
                {
                    Tasks.Enqueue(task);
                    _interleave.NotifyOfNewWork();
                }
            }

            /// <summary>Executes a task on this scheduler.</summary>
            /// <param name="task">The task to be executed.</param>
            internal void ExecuteTask(Task task)
            {
                // Remember whether an outer ExecuteTask on this thread already set the flag,
                // so that only the outermost call clears it.
                var processingTaskOnCurrentThread = _processingTaskOnCurrentThread.Value;
                if (!processingTaskOnCurrentThread) _processingTaskOnCurrentThread.Value = true;
                try
                {
                    base.TryExecuteTask(task);
                }
                finally
                {
                    // Reset even if TryExecuteTask throws, so the thread is not left permanently
                    // marked as processing (which would make TryExecuteTaskInline allow inlining).
                    if (!processingTaskOnCurrentThread) _processingTaskOnCurrentThread.Value = false;
                }
            }

            /// <summary>Tries to execute the task synchronously on this scheduler.</summary>
            /// <param name="task">The task to execute.</param>
            /// <param name="taskWasPreviouslyQueued">Whether the task was previously queued to the scheduler.</param>
            /// <returns>true if the task could be executed; otherwise, false.</returns>
            protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
            {
                // Inlining is only permitted on a thread that is already executing one of this
                // scheduler's tasks; the work is run synchronously through the target scheduler.
                if (_processingTaskOnCurrentThread.Value)
                {
                    var t = new Task<bool>(state => TryExecuteTask((Task)state), task);
                    t.RunSynchronously(_interleave._parallelOptions.TaskScheduler);
                    return t.Result;
                }
                return false;
            }

            /// <summary>Gets for debugging purposes the tasks scheduled to this scheduler.</summary>
            /// <returns>An enumerable of the tasks queued.</returns>
            protected override IEnumerable<Task> GetScheduledTasks() { return Tasks; }
        }
    }
}