root / trunk / Libraries / ParallelExtensionsExtras / Extensions / TaskExtrasExtensions.cs @ d17258c2
History | View | Annotate | Download (14.3 kB)
1 |
//-------------------------------------------------------------------------- |
---|---|
2 |
// |
3 |
// Copyright (c) Microsoft Corporation. All rights reserved. |
4 |
// |
5 |
// File: TaskExtensions.cs |
6 |
// |
7 |
//-------------------------------------------------------------------------- |
8 |
|
9 |
using System.Linq; |
10 |
using System.Windows.Threading; |
11 |
|
12 |
namespace System.Threading.Tasks |
13 |
{ |
14 |
/// <summary>Extensions methods for Task.</summary> |
15 |
public static class TaskExtrasExtensions |
16 |
{ |
17 |
#region ContinueWith accepting TaskFactory |
18 |
/// <summary>Creates a continuation task using the specified TaskFactory.</summary> |
19 |
/// <param name="task">The antecedent Task.</param> |
20 |
/// <param name="continuationAction">The continuation action.</param> |
21 |
/// <param name="factory">The TaskFactory.</param> |
22 |
/// <returns>A continuation task.</returns> |
23 |
public static Task ContinueWith( |
24 |
this Task task, Action<Task> continuationAction, TaskFactory factory) |
25 |
{ |
26 |
return task.ContinueWith(continuationAction, factory.CancellationToken, factory.ContinuationOptions, factory.Scheduler); |
27 |
} |
28 |
|
29 |
/// <summary>Creates a continuation task using the specified TaskFactory.</summary> |
30 |
/// <param name="task">The antecedent Task.</param> |
31 |
/// <param name="continuationFunction">The continuation function.</param> |
32 |
/// <param name="factory">The TaskFactory.</param> |
33 |
/// <returns>A continuation task.</returns> |
34 |
public static Task<TResult> ContinueWith<TResult>( |
35 |
this Task task, Func<Task, TResult> continuationFunction, TaskFactory factory) |
36 |
{ |
37 |
return task.ContinueWith(continuationFunction, factory.CancellationToken, factory.ContinuationOptions, factory.Scheduler); |
38 |
} |
39 |
#endregion |
40 |
|
41 |
#region ContinueWith accepting TaskFactory<TResult> |
42 |
/// <summary>Creates a continuation task using the specified TaskFactory.</summary> |
43 |
/// <param name="task">The antecedent Task.</param> |
44 |
/// <param name="continuationAction">The continuation action.</param> |
45 |
/// <param name="factory">The TaskFactory.</param> |
46 |
/// <returns>A continuation task.</returns> |
47 |
public static Task ContinueWith<TResult>( |
48 |
this Task<TResult> task, Action<Task<TResult>> continuationAction, TaskFactory<TResult> factory) |
49 |
{ |
50 |
return task.ContinueWith(continuationAction, factory.CancellationToken, factory.ContinuationOptions, factory.Scheduler); |
51 |
} |
52 |
|
53 |
/// <summary>Creates a continuation task using the specified TaskFactory.</summary> |
54 |
/// <param name="task">The antecedent Task.</param> |
55 |
/// <param name="continuationFunction">The continuation function.</param> |
56 |
/// <param name="factory">The TaskFactory.</param> |
57 |
/// <returns>A continuation task.</returns> |
58 |
public static Task<TNewResult> ContinueWith<TResult, TNewResult>( |
59 |
this Task<TResult> task, Func<Task<TResult>, TNewResult> continuationFunction, TaskFactory<TResult> factory) |
60 |
{ |
61 |
return task.ContinueWith(continuationFunction, factory.CancellationToken, factory.ContinuationOptions, factory.Scheduler); |
62 |
} |
63 |
#endregion |
64 |
|
65 |
#region ToAsync(AsyncCallback, object) |
66 |
/// <summary> |
67 |
/// Creates a Task that represents the completion of another Task, and |
68 |
/// that schedules an AsyncCallback to run upon completion. |
69 |
/// </summary> |
70 |
/// <param name="task">The antecedent Task.</param> |
71 |
/// <param name="callback">The AsyncCallback to run.</param> |
72 |
/// <param name="state">The object state to use with the AsyncCallback.</param> |
73 |
/// <returns>The new task.</returns> |
74 |
public static Task ToAsync(this Task task, AsyncCallback callback, object state) |
75 |
{ |
76 |
if (task == null) throw new ArgumentNullException("task"); |
77 |
|
78 |
var tcs = new TaskCompletionSource<object>(state); |
79 |
task.ContinueWith(_ => |
80 |
{ |
81 |
tcs.SetFromTask(task); |
82 |
if (callback != null) callback(tcs.Task); |
83 |
}); |
84 |
return tcs.Task; |
85 |
} |
86 |
|
87 |
/// <summary> |
88 |
/// Creates a Task that represents the completion of another Task, and |
89 |
/// that schedules an AsyncCallback to run upon completion. |
90 |
/// </summary> |
91 |
/// <param name="task">The antecedent Task.</param> |
92 |
/// <param name="callback">The AsyncCallback to run.</param> |
93 |
/// <param name="state">The object state to use with the AsyncCallback.</param> |
94 |
/// <returns>The new task.</returns> |
95 |
public static Task<TResult> ToAsync<TResult>(this Task<TResult> task, AsyncCallback callback, object state) |
96 |
{ |
97 |
if (task == null) throw new ArgumentNullException("task"); |
98 |
|
99 |
var tcs = new TaskCompletionSource<TResult>(state); |
100 |
task.ContinueWith(_ => |
101 |
{ |
102 |
tcs.SetFromTask(task); |
103 |
if (callback != null) callback(tcs.Task); |
104 |
}); |
105 |
return tcs.Task; |
106 |
} |
107 |
#endregion |
108 |
|
109 |
#region Exception Handling |
110 |
/// <summary>Suppresses default exception handling of a Task that would otherwise reraise the exception on the finalizer thread.</summary> |
111 |
/// <param name="task">The Task to be monitored.</param> |
112 |
/// <returns>The original Task.</returns> |
113 |
public static Task IgnoreExceptions(this Task task) |
114 |
{ |
115 |
task.ContinueWith(t => { var ignored = t.Exception; }, |
116 |
CancellationToken.None, |
117 |
TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnFaulted, |
118 |
TaskScheduler.Default); |
119 |
return task; |
120 |
} |
121 |
|
122 |
/// <summary>Suppresses default exception handling of a Task that would otherwise reraise the exception on the finalizer thread.</summary> |
123 |
/// <param name="task">The Task to be monitored.</param> |
124 |
/// <returns>The original Task.</returns> |
125 |
public static Task<T> IgnoreExceptions<T>(this Task<T> task) |
126 |
{ |
127 |
return (Task<T>)((Task)task).IgnoreExceptions(); |
128 |
} |
129 |
|
130 |
/// <summary>Fails immediately when an exception is encountered.</summary> |
131 |
/// <param name="task">The Task to be monitored.</param> |
132 |
/// <returns>The original Task.</returns> |
133 |
public static Task FailFastOnException(this Task task) |
134 |
{ |
135 |
task.ContinueWith(t => Environment.FailFast("A task faulted.", t.Exception), |
136 |
CancellationToken.None, |
137 |
TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnFaulted, |
138 |
TaskScheduler.Default); |
139 |
return task; |
140 |
} |
141 |
|
142 |
/// <summary>Fails immediately when an exception is encountered.</summary> |
143 |
/// <param name="task">The Task to be monitored.</param> |
144 |
/// <returns>The original Task.</returns> |
145 |
public static Task<T> FailFastOnException<T>(this Task<T> task) |
146 |
{ |
147 |
return (Task<T>)((Task)task).FailFastOnException(); |
148 |
} |
149 |
|
150 |
/// <summary>Propagates any exceptions that occurred on the specified task.</summary> |
151 |
/// <param name="task">The Task whose exceptions are to be propagated.</param> |
152 |
public static void PropagateExceptions(this Task task) |
153 |
{ |
154 |
if (!task.IsCompleted) throw new InvalidOperationException("The task has not completed."); |
155 |
if (task.IsFaulted) task.Wait(); |
156 |
} |
157 |
|
158 |
/// <summary>Propagates any exceptions that occurred on the specified tasks.</summary> |
159 |
/// <param name="task">The Tassk whose exceptions are to be propagated.</param> |
160 |
public static void PropagateExceptions(this Task [] tasks) |
161 |
{ |
162 |
if (tasks == null) throw new ArgumentNullException("tasks"); |
163 |
if (tasks.Any(t => t == null)) throw new ArgumentException("tasks"); |
164 |
if (tasks.Any(t => !t.IsCompleted)) throw new InvalidOperationException("A task has not completed."); |
165 |
Task.WaitAll(tasks); |
166 |
} |
167 |
#endregion |
168 |
|
169 |
#region Observables |
170 |
/// <summary>Creates an IObservable that represents the completion of a Task.</summary> |
171 |
/// <typeparam name="TResult">Specifies the type of data returned by the Task.</typeparam> |
172 |
/// <param name="task">The Task to be represented as an IObservable.</param> |
173 |
/// <returns>An IObservable that represents the completion of the Task.</returns> |
174 |
public static IObservable<TResult> ToObservable<TResult>(this Task<TResult> task) |
175 |
{ |
176 |
if (task == null) throw new ArgumentNullException("task"); |
177 |
return new TaskObservable<TResult> { _task = task }; |
178 |
} |
179 |
|
180 |
/// <summary>An implementation of IObservable that wraps a Task.</summary> |
181 |
/// <typeparam name="TResult">The type of data returned by the task.</typeparam> |
182 |
private class TaskObservable<TResult> : IObservable<TResult> |
183 |
{ |
184 |
internal Task<TResult> _task; |
185 |
|
186 |
public IDisposable Subscribe(IObserver<TResult> observer) |
187 |
{ |
188 |
// Validate arguments |
189 |
if (observer == null) throw new ArgumentNullException("observer"); |
190 |
|
191 |
// Support cancelling the continuation if the observer is unsubscribed |
192 |
var cts = new CancellationTokenSource(); |
193 |
|
194 |
// Create a continuation to pass data along to the observer |
195 |
_task.ContinueWith(t => |
196 |
{ |
197 |
switch (t.Status) |
198 |
{ |
199 |
case TaskStatus.RanToCompletion: |
200 |
observer.OnNext(_task.Result); |
201 |
observer.OnCompleted(); |
202 |
break; |
203 |
|
204 |
case TaskStatus.Faulted: |
205 |
observer.OnError(_task.Exception); |
206 |
break; |
207 |
|
208 |
case TaskStatus.Canceled: |
209 |
observer.OnError(new TaskCanceledException(t)); |
210 |
break; |
211 |
} |
212 |
}, cts.Token); |
213 |
|
214 |
// Support unsubscribe simply by canceling the continuation if it hasn't yet run |
215 |
return new CancelOnDispose { Source = cts }; |
216 |
} |
217 |
} |
218 |
|
219 |
/// <summary>Translate a call to IDisposable.Dispose to a CancellationTokenSource.Cancel.</summary> |
220 |
private class CancelOnDispose : IDisposable |
221 |
{ |
222 |
internal CancellationTokenSource Source; |
223 |
void IDisposable.Dispose() { Source.Cancel(); } |
224 |
} |
225 |
#endregion |
226 |
|
227 |
#region Timeouts |
228 |
/// <summary>Creates a new Task that mirrors the supplied task but that will be canceled after the specified timeout.</summary> |
229 |
/// <typeparam name="TResult">Specifies the type of data contained in the task.</typeparam> |
230 |
/// <param name="task">The task.</param> |
231 |
/// <param name="timeout">The timeout.</param> |
232 |
/// <returns>The new Task that may time out.</returns> |
233 |
public static Task WithTimeout(this Task task, TimeSpan timeout) |
234 |
{ |
235 |
var result = new TaskCompletionSource<object>(task.AsyncState); |
236 |
var timer = new Timer(state => ((TaskCompletionSource<object>)state).TrySetCanceled(), result, timeout, TimeSpan.FromMilliseconds(-1)); |
237 |
task.ContinueWith(t => |
238 |
{ |
239 |
timer.Dispose(); |
240 |
result.TrySetFromTask(t); |
241 |
}, TaskContinuationOptions.ExecuteSynchronously); |
242 |
return result.Task; |
243 |
} |
244 |
|
245 |
/// <summary>Creates a new Task that mirrors the supplied task but that will be canceled after the specified timeout.</summary> |
246 |
/// <typeparam name="TResult">Specifies the type of data contained in the task.</typeparam> |
247 |
/// <param name="task">The task.</param> |
248 |
/// <param name="timeout">The timeout.</param> |
249 |
/// <returns>The new Task that may time out.</returns> |
250 |
public static Task<TResult> WithTimeout<TResult>(this Task<TResult> task, TimeSpan timeout) |
251 |
{ |
252 |
var result = new TaskCompletionSource<TResult>(task.AsyncState); |
253 |
var timer = new Timer(state => ((TaskCompletionSource<TResult>)state).TrySetCanceled(), result, timeout, TimeSpan.FromMilliseconds(-1)); |
254 |
task.ContinueWith(t => |
255 |
{ |
256 |
timer.Dispose(); |
257 |
result.TrySetFromTask(t); |
258 |
}, TaskContinuationOptions.ExecuteSynchronously); |
259 |
return result.Task; |
260 |
} |
261 |
#endregion |
262 |
|
263 |
#region Children |
264 |
/// <summary> |
265 |
/// Ensures that a parent task can't transition into a completed state |
266 |
/// until the specified task has also completed, even if it's not |
267 |
/// already a child task. |
268 |
/// </summary> |
269 |
/// <param name="task">The task to attach to the current task as a child.</param> |
270 |
public static void AttachToParent(this Task task) |
271 |
{ |
272 |
if (task == null) throw new ArgumentNullException("task"); |
273 |
task.ContinueWith(t => t.Wait(), CancellationToken.None, |
274 |
TaskContinuationOptions.AttachedToParent | |
275 |
TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); |
276 |
} |
277 |
#endregion |
278 |
|
279 |
#region Waiting |
280 |
/// <summary>Waits for the task to complete execution, pumping in the meantime.</summary> |
281 |
/// <param name="task">The task for which to wait.</param> |
282 |
/// <remarks>This method is intended for usage with Windows Presentation Foundation.</remarks> |
283 |
public static void WaitWithPumping(this Task task) |
284 |
{ |
285 |
if (task == null) throw new ArgumentNullException("task"); |
286 |
var nestedFrame = new DispatcherFrame(); |
287 |
task.ContinueWith(_ => nestedFrame.Continue = false); |
288 |
Dispatcher.PushFrame(nestedFrame); |
289 |
task.Wait(); |
290 |
} |
291 |
|
292 |
/// <summary>Waits for the task to complete execution, returning the task's final status.</summary> |
293 |
/// <param name="task">The task for which to wait.</param> |
294 |
/// <returns>The completion status of the task.</returns> |
295 |
/// <remarks>Unlike Wait, this method will not throw an exception if the task ends in the Faulted or Canceled state.</remarks> |
296 |
public static TaskStatus WaitForCompletionStatus(this Task task) |
297 |
{ |
298 |
if (task == null) throw new ArgumentNullException("task"); |
299 |
((IAsyncResult)task).AsyncWaitHandle.WaitOne(); |
300 |
return task.Status; |
301 |
} |
302 |
#endregion |
303 |
} |
304 |
} |