//--------------------------------------------------------------------------
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// File: ConcurrentExclusiveInterleave.cs
//
//--------------------------------------------------------------------------
using System.Collections.Generic;
using System.Diagnostics;
namespace System.Threading.Tasks.Schedulers
{
/// <summary>Provides concurrent and exclusive task schedulers that coordinate.</summary>
/// <remarks>
/// Tasks queued to <see cref="ExclusiveTaskScheduler"/> are executed one at a time by the
/// interleave's processing task; tasks queued to <see cref="ConcurrentTaskScheduler"/> are
/// executed through a parallel loop, but only while no exclusive tasks are waiting.
/// </remarks>
[DebuggerDisplay("ConcurrentTasksWaiting={ConcurrentTaskCount}, ExclusiveTasksWaiting={ExclusiveTaskCount}")]
[DebuggerTypeProxy(typeof(ConcurrentExclusiveInterleaveDebugView))]
public sealed class ConcurrentExclusiveInterleave
{
    /// <summary>Provides a debug view for ConcurrentExclusiveInterleave.</summary>
    internal class ConcurrentExclusiveInterleaveDebugView
    {
        /// <summary>The interleave being debugged.</summary>
        private readonly ConcurrentExclusiveInterleave _interleave;

        /// <summary>Initializes the debug view.</summary>
        /// <param name="interleave">The interleave being debugged.</param>
        /// <exception cref="ArgumentNullException"><paramref name="interleave"/> is null.</exception>
        public ConcurrentExclusiveInterleaveDebugView(ConcurrentExclusiveInterleave interleave)
        {
            if (interleave == null) throw new ArgumentNullException("interleave");
            _interleave = interleave;
        }

        /// <summary>Gets the tasks waiting to run exclusively.</summary>
        public IEnumerable<Task> ExclusiveTasksWaiting { get { return _interleave._exclusiveTaskScheduler.Tasks; } }

        /// <summary>Gets the tasks waiting to run concurrently.</summary>
        public IEnumerable<Task> ConcurrentTasksWaiting { get { return _interleave._concurrentTaskScheduler.Tasks; } }

        /// <summary>Gets the interleave's processing task (null when none is active) for debugging purposes.</summary>
        public Task InterleaveTask { get { return _interleave._taskExecuting; } }
    }

    /// <summary>Synchronizes all activity in this type and its generated schedulers.</summary>
    private readonly object _internalLock;

    /// <summary>The parallel options used by the asynchronous task and parallel loops.</summary>
    private readonly ParallelOptions _parallelOptions;

    /// <summary>The scheduler used to queue and execute "reader" tasks that may run concurrently with other readers.</summary>
    private readonly ConcurrentExclusiveTaskScheduler _concurrentTaskScheduler;

    /// <summary>The scheduler used to queue and execute "writer" tasks that must run exclusively while no other tasks for this interleave are running.</summary>
    private readonly ConcurrentExclusiveTaskScheduler _exclusiveTaskScheduler;

    /// <summary>The processing task currently owned by this interleave, or null if no processing is in flight.</summary>
    private Task _taskExecuting;

    /// <summary>Whether the exclusive processing of a task should include all of its children as well.</summary>
    private readonly bool _exclusiveProcessingIncludesChildren;

    /// <summary>Initializes the ConcurrentExclusiveInterleave on <see cref="TaskScheduler.Current"/>.</summary>
    public ConcurrentExclusiveInterleave() :
        this(TaskScheduler.Current, false) { }

    /// <summary>Initializes the ConcurrentExclusiveInterleave on <see cref="TaskScheduler.Current"/>.</summary>
    /// <param name="exclusiveProcessingIncludesChildren">Whether the exclusive processing of a task should include all of its children as well.</param>
    public ConcurrentExclusiveInterleave(bool exclusiveProcessingIncludesChildren) :
        this(TaskScheduler.Current, exclusiveProcessingIncludesChildren) { }

    /// <summary>Initializes the ConcurrentExclusiveInterleave.</summary>
    /// <param name="targetScheduler">The target scheduler on which this interleave should execute.</param>
    public ConcurrentExclusiveInterleave(TaskScheduler targetScheduler) :
        this(targetScheduler, false) { }

    /// <summary>Initializes the ConcurrentExclusiveInterleave.</summary>
    /// <param name="targetScheduler">The target scheduler on which this interleave should execute.</param>
    /// <param name="exclusiveProcessingIncludesChildren">Whether the exclusive processing of a task should include all of its children as well.</param>
    /// <exception cref="ArgumentNullException"><paramref name="targetScheduler"/> is null.</exception>
    public ConcurrentExclusiveInterleave(TaskScheduler targetScheduler, bool exclusiveProcessingIncludesChildren)
    {
        // A scheduler must be provided
        if (targetScheduler == null) throw new ArgumentNullException("targetScheduler");

        // Create the state for this interleave
        _internalLock = new object();
        _exclusiveProcessingIncludesChildren = exclusiveProcessingIncludesChildren;
        _parallelOptions = new ParallelOptions() { TaskScheduler = targetScheduler };
        _concurrentTaskScheduler = new ConcurrentExclusiveTaskScheduler(this, new Queue<Task>(), targetScheduler.MaximumConcurrencyLevel);
        _exclusiveTaskScheduler = new ConcurrentExclusiveTaskScheduler(this, new Queue<Task>(), 1);
    }

    /// <summary>
    /// Gets a TaskScheduler that can be used to schedule tasks to this interleave
    /// that may run concurrently with other tasks on this interleave.
    /// </summary>
    public TaskScheduler ConcurrentTaskScheduler { get { return _concurrentTaskScheduler; } }

    /// <summary>
    /// Gets a TaskScheduler that can be used to schedule tasks to this interleave
    /// that must run exclusively with regards to other tasks on this interleave.
    /// </summary>
    public TaskScheduler ExclusiveTaskScheduler { get { return _exclusiveTaskScheduler; } }

    /// <summary>Gets the number of tasks waiting to run exclusively.</summary>
    private int ExclusiveTaskCount { get { lock (_internalLock) return _exclusiveTaskScheduler.Tasks.Count; } }

    /// <summary>Gets the number of tasks waiting to run concurrently.</summary>
    private int ConcurrentTaskCount { get { lock (_internalLock) return _concurrentTaskScheduler.Tasks.Count; } }

    /// <summary>Notifies the interleave that new work has arrived to be processed.</summary>
    /// <remarks>Must only be called while holding the lock.</remarks>
    internal void NotifyOfNewWork()
    {
        // If a task is already running, bail.
        if (_taskExecuting != null) return;

        // Otherwise, run the processor. Store the task and then start it to ensure that
        // the assignment happens before the body of the task runs.
        _taskExecuting = new Task(ConcurrentExclusiveInterleaveProcessor, CancellationToken.None, TaskCreationOptions.None);
        _taskExecuting.Start(_parallelOptions.TaskScheduler);
    }

    /// <summary>The body of the async processor to be run in a Task. Only one should be running at a time.</summary>
    /// <remarks>This has been separated out into its own method to improve the Parallel Tasks window experience.</remarks>
    private void ConcurrentExclusiveInterleaveProcessor()
    {
        // Run while there are more tasks to be processed. We assume that the first time through,
        // there are tasks. If there aren't, worst case is we try to process and find none.
        bool runTasks = true;

        // Cleared when processing is handed off to a continuation, so that the finally block
        // below does not release ownership (_taskExecuting) while the hand-off is pending.
        bool cleanupOnExit = true;

        while (runTasks)
        {
            try
            {
                // Process all waiting exclusive tasks
                foreach (var task in GetExclusiveTasks())
                {
                    _exclusiveTaskScheduler.ExecuteTask(task);

                    // Just because we executed the task doesn't mean it's "complete",
                    // if it has child tasks that have not yet completed
                    // and will complete later asynchronously. To account for this,
                    // if a task isn't yet completed, leave the interleave processor
                    // but leave it still in a running state. When the task completes,
                    // we'll come back in and keep going. Note that the children
                    // must not be scheduled to this interleave, or this will deadlock.
                    if (_exclusiveProcessingIncludesChildren && !task.IsCompleted)
                    {
                        cleanupOnExit = false;
                        task.ContinueWith(_ => ConcurrentExclusiveInterleaveProcessor(), _parallelOptions.TaskScheduler);
                        return;
                    }
                }

                // Process all waiting concurrent tasks *until* any exclusive tasks show up, in which
                // case we want to switch over to processing those (by looping around again).
                Parallel.ForEach(GetConcurrentTasksUntilExclusiveExists(), _parallelOptions,
                    ExecuteConcurrentTask);
            }
            finally
            {
                if (cleanupOnExit)
                {
                    lock (_internalLock)
                    {
                        // If there are no tasks remaining, we're done. If there are, loop around and go again.
                        if (_concurrentTaskScheduler.Tasks.Count == 0 && _exclusiveTaskScheduler.Tasks.Count == 0)
                        {
                            _taskExecuting = null;
                            runTasks = false;
                        }
                    }
                }
            }
        }
    }

    /// <summary>Runs a concurrent task.</summary>
    /// <param name="task">The task to execute.</param>
    /// <remarks>This has been separated out into its own method to improve the Parallel Tasks window experience.</remarks>
    private void ExecuteConcurrentTask(Task task) { _concurrentTaskScheduler.ExecuteTask(task); }

    /// <summary>
    /// Gets an enumerable that yields waiting concurrent tasks one at a time until
    /// either there are no more concurrent tasks or there are any exclusive tasks.
    /// </summary>
    /// <returns>A lazily evaluated sequence that dequeues one task per step under the internal lock.</returns>
    private IEnumerable<Task> GetConcurrentTasksUntilExclusiveExists()
    {
        while (true)
        {
            Task foundTask = null;
            lock (_internalLock)
            {
                if (_exclusiveTaskScheduler.Tasks.Count == 0 &&
                    _concurrentTaskScheduler.Tasks.Count > 0)
                {
                    foundTask = _concurrentTaskScheduler.Tasks.Dequeue();
                }
            }

            // Yield outside the lock so the consumer never runs a task while holding it.
            if (foundTask != null) yield return foundTask;
            else yield break;
        }
    }

    /// <summary>
    /// Gets an enumerable that yields all of the exclusive tasks one at a time.
    /// </summary>
    /// <returns>A lazily evaluated sequence that dequeues one task per step under the internal lock.</returns>
    private IEnumerable<Task> GetExclusiveTasks()
    {
        while (true)
        {
            Task foundTask = null;
            lock (_internalLock)
            {
                if (_exclusiveTaskScheduler.Tasks.Count > 0) foundTask = _exclusiveTaskScheduler.Tasks.Dequeue();
            }

            // Yield outside the lock so the consumer never runs a task while holding it.
            if (foundTask != null) yield return foundTask;
            else yield break;
        }
    }

    /// <summary>
    /// A scheduler shim used to queue tasks to the interleave and execute those tasks on request of the interleave.
    /// </summary>
    private class ConcurrentExclusiveTaskScheduler : TaskScheduler
    {
        /// <summary>The parent interleave.</summary>
        private readonly ConcurrentExclusiveInterleave _interleave;

        /// <summary>The maximum concurrency level for the scheduler.</summary>
        private readonly int _maximumConcurrencyLevel;

        /// <summary>Whether a Task is currently being processed on this thread.</summary>
        private readonly ThreadLocal<bool> _processingTaskOnCurrentThread = new ThreadLocal<bool>();

        /// <summary>Initializes the scheduler.</summary>
        /// <param name="interleave">The parent interleave.</param>
        /// <param name="tasks">The queue to store queued tasks into.</param>
        /// <param name="maximumConcurrencyLevel">The value reported by <see cref="MaximumConcurrencyLevel"/>.</param>
        /// <exception cref="ArgumentNullException"><paramref name="interleave"/> or <paramref name="tasks"/> is null.</exception>
        internal ConcurrentExclusiveTaskScheduler(ConcurrentExclusiveInterleave interleave, Queue<Task> tasks, int maximumConcurrencyLevel)
        {
            if (interleave == null) throw new ArgumentNullException("interleave");
            if (tasks == null) throw new ArgumentNullException("tasks");
            _interleave = interleave;
            _maximumConcurrencyLevel = maximumConcurrencyLevel;
            Tasks = tasks;
        }

        /// <summary>Gets the maximum concurrency level this scheduler is able to support.</summary>
        public override int MaximumConcurrencyLevel { get { return _maximumConcurrencyLevel; } }

        /// <summary>Gets the queue of tasks for this scheduler.</summary>
        /// <remarks>The interleave accesses this queue while holding its internal lock.</remarks>
        internal Queue<Task> Tasks { get; private set; }

        /// <summary>Queues a task to the scheduler.</summary>
        /// <param name="task">The task to be queued.</param>
        protected override void QueueTask(Task task)
        {
            lock (_interleave._internalLock)
            {
                Tasks.Enqueue(task);
                _interleave.NotifyOfNewWork();
            }
        }

        /// <summary>Executes a task on this scheduler.</summary>
        /// <param name="task">The task to be executed.</param>
        internal void ExecuteTask(Task task)
        {
            // Only the outermost call on a thread sets and clears the flag, so that
            // nested executions leave it set for the enclosing one.
            bool alreadyProcessing = _processingTaskOnCurrentThread.Value;
            if (!alreadyProcessing) _processingTaskOnCurrentThread.Value = true;
            try
            {
                base.TryExecuteTask(task);
            }
            finally
            {
                // Reset even if TryExecuteTask throws; otherwise this thread would keep
                // reporting that it is inside the interleave and allow inlining later.
                if (!alreadyProcessing) _processingTaskOnCurrentThread.Value = false;
            }
        }

        /// <summary>Tries to execute the task synchronously on this scheduler.</summary>
        /// <param name="task">The task to execute.</param>
        /// <param name="taskWasPreviouslyQueued">Whether the task was previously queued to the scheduler.</param>
        /// <returns>true if the task could be executed; otherwise, false.</returns>
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            // Inlining is only allowed from a thread that is already running one of this
            // scheduler's tasks; the execution is routed through the target scheduler.
            if (_processingTaskOnCurrentThread.Value)
            {
                var t = new Task<bool>(state => TryExecuteTask((Task)state), task);
                t.RunSynchronously(_interleave._parallelOptions.TaskScheduler);
                return t.Result;
            }
            return false;
        }

        /// <summary>Gets for debugging purposes the tasks scheduled to this scheduler.</summary>
        /// <returns>A snapshot of the tasks currently queued.</returns>
        /// <exception cref="NotSupportedException">The internal lock could not be acquired without blocking.</exception>
        protected override IEnumerable<Task> GetScheduledTasks()
        {
            // Never block here: take the lock only if it is immediately available and
            // hand back a copy so the caller does not enumerate the live queue.
            bool lockTaken = false;
            try
            {
                Monitor.TryEnter(_interleave._internalLock, ref lockTaken);
                if (!lockTaken) throw new NotSupportedException();
                return Tasks.ToArray();
            }
            finally
            {
                if (lockTaken) Monitor.Exit(_interleave._internalLock);
            }
        }
    }
}
}