Statistics
| Branch: | Revision:

root / trunk / Libraries / ParallelExtensionsExtras / Extensions / TaskExtrasExtensions.cs @ d17258c2

History | View | Annotate | Download (14.3 kB)

1
//--------------------------------------------------------------------------
2
// 
3
//  Copyright (c) Microsoft Corporation.  All rights reserved. 
4
// 
5
//  File: TaskExtensions.cs
6
//
7
//--------------------------------------------------------------------------
8

    
9
using System.Linq;
10
using System.Windows.Threading;
11

    
12
namespace System.Threading.Tasks
13
{
14
    /// <summary>Extensions methods for Task.</summary>
15
    public static class TaskExtrasExtensions
16
    {
17
        #region ContinueWith accepting TaskFactory
18
        /// <summary>Creates a continuation task using the specified TaskFactory.</summary>
19
        /// <param name="task">The antecedent Task.</param>
20
        /// <param name="continuationAction">The continuation action.</param>
21
        /// <param name="factory">The TaskFactory.</param>
22
        /// <returns>A continuation task.</returns>
23
        public static Task ContinueWith(
24
            this Task task, Action<Task> continuationAction, TaskFactory factory)
25
        {
26
            return task.ContinueWith(continuationAction, factory.CancellationToken, factory.ContinuationOptions, factory.Scheduler);
27
        }
28

    
29
        /// <summary>Creates a continuation task using the specified TaskFactory.</summary>
30
        /// <param name="task">The antecedent Task.</param>
31
        /// <param name="continuationFunction">The continuation function.</param>
32
        /// <param name="factory">The TaskFactory.</param>
33
        /// <returns>A continuation task.</returns>
34
        public static Task<TResult> ContinueWith<TResult>(
35
            this Task task, Func<Task, TResult> continuationFunction, TaskFactory factory)
36
        {
37
            return task.ContinueWith(continuationFunction, factory.CancellationToken, factory.ContinuationOptions, factory.Scheduler);
38
        }
39
        #endregion
40

    
41
        #region ContinueWith accepting TaskFactory<TResult>
42
        /// <summary>Creates a continuation task using the specified TaskFactory.</summary>
43
        /// <param name="task">The antecedent Task.</param>
44
        /// <param name="continuationAction">The continuation action.</param>
45
        /// <param name="factory">The TaskFactory.</param>
46
        /// <returns>A continuation task.</returns>
47
        public static Task ContinueWith<TResult>(
48
            this Task<TResult> task, Action<Task<TResult>> continuationAction, TaskFactory<TResult> factory)
49
        {
50
            return task.ContinueWith(continuationAction, factory.CancellationToken, factory.ContinuationOptions, factory.Scheduler);
51
        }
52

    
53
        /// <summary>Creates a continuation task using the specified TaskFactory.</summary>
54
        /// <param name="task">The antecedent Task.</param>
55
        /// <param name="continuationFunction">The continuation function.</param>
56
        /// <param name="factory">The TaskFactory.</param>
57
        /// <returns>A continuation task.</returns>
58
        public static Task<TNewResult> ContinueWith<TResult, TNewResult>(
59
            this Task<TResult> task, Func<Task<TResult>, TNewResult> continuationFunction, TaskFactory<TResult> factory)
60
        {
61
            return task.ContinueWith(continuationFunction, factory.CancellationToken, factory.ContinuationOptions, factory.Scheduler);
62
        }
63
        #endregion
64

    
65
        #region ToAsync(AsyncCallback, object)
66
        /// <summary>
67
        /// Creates a Task that represents the completion of another Task, and 
68
        /// that schedules an AsyncCallback to run upon completion.
69
        /// </summary>
70
        /// <param name="task">The antecedent Task.</param>
71
        /// <param name="callback">The AsyncCallback to run.</param>
72
        /// <param name="state">The object state to use with the AsyncCallback.</param>
73
        /// <returns>The new task.</returns>
74
        public static Task ToAsync(this Task task, AsyncCallback callback, object state)
75
        {
76
            if (task == null) throw new ArgumentNullException("task");
77

    
78
            var tcs = new TaskCompletionSource<object>(state);
79
            task.ContinueWith(_ =>
80
            {
81
                tcs.SetFromTask(task);
82
                if (callback != null) callback(tcs.Task);
83
            });
84
            return tcs.Task;
85
        }
86

    
87
        /// <summary>
88
        /// Creates a Task that represents the completion of another Task, and 
89
        /// that schedules an AsyncCallback to run upon completion.
90
        /// </summary>
91
        /// <param name="task">The antecedent Task.</param>
92
        /// <param name="callback">The AsyncCallback to run.</param>
93
        /// <param name="state">The object state to use with the AsyncCallback.</param>
94
        /// <returns>The new task.</returns>
95
        public static Task<TResult> ToAsync<TResult>(this Task<TResult> task, AsyncCallback callback, object state)
96
        {
97
            if (task == null) throw new ArgumentNullException("task");
98

    
99
            var tcs = new TaskCompletionSource<TResult>(state);
100
            task.ContinueWith(_ =>
101
            {
102
                tcs.SetFromTask(task);
103
                if (callback != null) callback(tcs.Task);
104
            });
105
            return tcs.Task;
106
        }
107
        #endregion
108

    
109
        #region Exception Handling
110
        /// <summary>Suppresses default exception handling of a Task that would otherwise reraise the exception on the finalizer thread.</summary>
111
        /// <param name="task">The Task to be monitored.</param>
112
        /// <returns>The original Task.</returns>
113
        public static Task IgnoreExceptions(this Task task)
114
        {
115
            task.ContinueWith(t => { var ignored = t.Exception; }, 
116
                CancellationToken.None,
117
                TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnFaulted, 
118
                TaskScheduler.Default);
119
            return task;
120
        }
121

    
122
        /// <summary>Suppresses default exception handling of a Task that would otherwise reraise the exception on the finalizer thread.</summary>
123
        /// <param name="task">The Task to be monitored.</param>
124
        /// <returns>The original Task.</returns>
125
        public static Task<T> IgnoreExceptions<T>(this Task<T> task)
126
        {
127
            return (Task<T>)((Task)task).IgnoreExceptions();
128
        }
129

    
130
        /// <summary>Fails immediately when an exception is encountered.</summary>
131
        /// <param name="task">The Task to be monitored.</param>
132
        /// <returns>The original Task.</returns>
133
        public static Task FailFastOnException(this Task task)
134
        {
135
            task.ContinueWith(t => Environment.FailFast("A task faulted.", t.Exception),
136
                CancellationToken.None,
137
                TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnFaulted,
138
                TaskScheduler.Default);
139
            return task;
140
        }
141

    
142
        /// <summary>Fails immediately when an exception is encountered.</summary>
143
        /// <param name="task">The Task to be monitored.</param>
144
        /// <returns>The original Task.</returns>
145
        public static Task<T> FailFastOnException<T>(this Task<T> task)
146
        {
147
            return (Task<T>)((Task)task).FailFastOnException();
148
        }
149

    
150
        /// <summary>Propagates any exceptions that occurred on the specified task.</summary>
151
        /// <param name="task">The Task whose exceptions are to be propagated.</param>
152
        public static void PropagateExceptions(this Task task)
153
        {
154
            if (!task.IsCompleted) throw new InvalidOperationException("The task has not completed.");
155
            if (task.IsFaulted) task.Wait();
156
        }
157

    
158
        /// <summary>Propagates any exceptions that occurred on the specified tasks.</summary>
159
        /// <param name="task">The Tassk whose exceptions are to be propagated.</param>
160
        public static void PropagateExceptions(this Task [] tasks)
161
        {
162
            if (tasks == null) throw new ArgumentNullException("tasks");
163
            if (tasks.Any(t => t == null)) throw new ArgumentException("tasks");
164
            if (tasks.Any(t => !t.IsCompleted)) throw new InvalidOperationException("A task has not completed.");
165
            Task.WaitAll(tasks);
166
        }
167
        #endregion
168

    
169
        #region Observables
170
        /// <summary>Creates an IObservable that represents the completion of a Task.</summary>
171
        /// <typeparam name="TResult">Specifies the type of data returned by the Task.</typeparam>
172
        /// <param name="task">The Task to be represented as an IObservable.</param>
173
        /// <returns>An IObservable that represents the completion of the Task.</returns>
174
        public static IObservable<TResult> ToObservable<TResult>(this Task<TResult> task)
175
        {
176
            if (task == null) throw new ArgumentNullException("task");
177
            return new TaskObservable<TResult> { _task = task };
178
        }
179

    
180
        /// <summary>An implementation of IObservable that wraps a Task.</summary>
181
        /// <typeparam name="TResult">The type of data returned by the task.</typeparam>
182
        private class TaskObservable<TResult> : IObservable<TResult>
183
        {
184
            internal Task<TResult> _task;
185

    
186
            public IDisposable Subscribe(IObserver<TResult> observer)
187
            {
188
                // Validate arguments
189
                if (observer == null) throw new ArgumentNullException("observer");
190

    
191
                // Support cancelling the continuation if the observer is unsubscribed
192
                var cts = new CancellationTokenSource();
193

    
194
                // Create a continuation to pass data along to the observer
195
                _task.ContinueWith(t =>
196
                {
197
                    switch (t.Status)
198
                    {
199
                        case TaskStatus.RanToCompletion:
200
                            observer.OnNext(_task.Result);
201
                            observer.OnCompleted();
202
                            break;
203

    
204
                        case TaskStatus.Faulted:
205
                            observer.OnError(_task.Exception);
206
                            break;
207

    
208
                        case TaskStatus.Canceled:
209
                            observer.OnError(new TaskCanceledException(t));
210
                            break;
211
                    }
212
                }, cts.Token);
213

    
214
                // Support unsubscribe simply by canceling the continuation if it hasn't yet run
215
                return new CancelOnDispose { Source = cts };
216
            }
217
        }
218

    
219
        /// <summary>Translate a call to IDisposable.Dispose to a CancellationTokenSource.Cancel.</summary>
220
        private class CancelOnDispose : IDisposable
221
        {
222
            internal CancellationTokenSource Source;
223
            void IDisposable.Dispose() { Source.Cancel(); }
224
        }
225
        #endregion
226

    
227
        #region Timeouts
228
        /// <summary>Creates a new Task that mirrors the supplied task but that will be canceled after the specified timeout.</summary>
229
        /// <typeparam name="TResult">Specifies the type of data contained in the task.</typeparam>
230
        /// <param name="task">The task.</param>
231
        /// <param name="timeout">The timeout.</param>
232
        /// <returns>The new Task that may time out.</returns>
233
        public static Task WithTimeout(this Task task, TimeSpan timeout)
234
        {
235
            var result = new TaskCompletionSource<object>(task.AsyncState);
236
            var timer = new Timer(state => ((TaskCompletionSource<object>)state).TrySetCanceled(), result, timeout, TimeSpan.FromMilliseconds(-1));
237
            task.ContinueWith(t =>
238
            {
239
                timer.Dispose();
240
                result.TrySetFromTask(t);
241
            }, TaskContinuationOptions.ExecuteSynchronously);
242
            return result.Task;
243
        }
244

    
245
        /// <summary>Creates a new Task that mirrors the supplied task but that will be canceled after the specified timeout.</summary>
246
        /// <typeparam name="TResult">Specifies the type of data contained in the task.</typeparam>
247
        /// <param name="task">The task.</param>
248
        /// <param name="timeout">The timeout.</param>
249
        /// <returns>The new Task that may time out.</returns>
250
        public static Task<TResult> WithTimeout<TResult>(this Task<TResult> task, TimeSpan timeout)
251
        {
252
            var result = new TaskCompletionSource<TResult>(task.AsyncState);
253
            var timer = new Timer(state => ((TaskCompletionSource<TResult>)state).TrySetCanceled(), result, timeout, TimeSpan.FromMilliseconds(-1));
254
            task.ContinueWith(t =>
255
            {
256
                timer.Dispose();
257
                result.TrySetFromTask(t);
258
            }, TaskContinuationOptions.ExecuteSynchronously);
259
            return result.Task;
260
        }
261
        #endregion
262

    
263
        #region Children
264
        /// <summary>
265
        /// Ensures that a parent task can't transition into a completed state
266
        /// until the specified task has also completed, even if it's not
267
        /// already a child task.
268
        /// </summary>
269
        /// <param name="task">The task to attach to the current task as a child.</param>
270
        public static void AttachToParent(this Task task)
271
        {
272
            if (task == null) throw new ArgumentNullException("task");
273
            task.ContinueWith(t => t.Wait(), CancellationToken.None,
274
                TaskContinuationOptions.AttachedToParent | 
275
                TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
276
        }
277
        #endregion
278

    
279
        #region Waiting
280
        /// <summary>Waits for the task to complete execution, pumping in the meantime.</summary>
281
        /// <param name="task">The task for which to wait.</param>
282
        /// <remarks>This method is intended for usage with Windows Presentation Foundation.</remarks>
283
        public static void WaitWithPumping(this Task task)
284
        {
285
            if (task == null) throw new ArgumentNullException("task");
286
            var nestedFrame = new DispatcherFrame();
287
            task.ContinueWith(_ => nestedFrame.Continue = false);
288
            Dispatcher.PushFrame(nestedFrame);
289
            task.Wait();
290
        }
291

    
292
        /// <summary>Waits for the task to complete execution, returning the task's final status.</summary>
293
        /// <param name="task">The task for which to wait.</param>
294
        /// <returns>The completion status of the task.</returns>
295
        /// <remarks>Unlike Wait, this method will not throw an exception if the task ends in the Faulted or Canceled state.</remarks>
296
        public static TaskStatus WaitForCompletionStatus(this Task task)
297
        {
298
            if (task == null) throw new ArgumentNullException("task");
299
            ((IAsyncResult)task).AsyncWaitHandle.WaitOne();
300
            return task.Status;
301
        }
302
        #endregion
303
    }
304
}