Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / TaskExtensions.cs @ 7f5882da

History | View | Annotate | Download (7.6 kB)

1
#region
2
/* -----------------------------------------------------------------------
3
 * <copyright file="TaskExtensions.cs" company="GRNet">
4
 * 
5
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
6
 *
7
 * Redistribution and use in source and binary forms, with or
8
 * without modification, are permitted provided that the following
9
 * conditions are met:
10
 *
11
 *   1. Redistributions of source code must retain the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer.
14
 *
15
 *   2. Redistributions in binary form must reproduce the above
16
 *      copyright notice, this list of conditions and the following
17
 *      disclaimer in the documentation and/or other materials
18
 *      provided with the distribution.
19
 *
20
 *
21
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
22
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
25
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
28
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32
 * POSSIBILITY OF SUCH DAMAGE.
33
 *
34
 * The views and conclusions contained in the software and
35
 * documentation are those of the authors and should not be
36
 * interpreted as representing official policies, either expressed
37
 * or implied, of GRNET S.A.
38
 * </copyright>
39
 * -----------------------------------------------------------------------
40
 */
41
#endregion
42
using System;
43
using System.Collections.Concurrent;
44
using System.Collections.Generic;
45
using System.Diagnostics.Contracts;
46
using System.Linq;
47
using System.Text;
48
using System.Threading;
49
using System.Threading.Tasks;
50

    
51
namespace Pithos.Core
52
{
53
    public static class TaskExtensions
54
    {
55
        public static Task<T2> Then<T1, T2>(this Task<T1> first, Func<T1, Task<T2>> next)
56
        {
57
            if (first == null)
58
                throw new ArgumentNullException("first");
59
            if (next == null)
60
                throw new ArgumentNullException("next");
61
            Contract.EndContractBlock();
62
            return Then(first, next, CancellationToken.None);
63
        }
64

    
65
        public static Task Then<T1>(this Task<T1> first, Func<T1, Task> next)
66
        {
67
            if (first == null)
68
                throw new ArgumentNullException("first");
69
            if (next == null)
70
                throw new ArgumentNullException("next");
71
            Contract.EndContractBlock();
72
            return Then(first, next, CancellationToken.None);
73
        }
74

    
75
        public static Task<T2> Then<T1, T2>(this Task<T1> first, Func<T1, Task<T2>> next, CancellationToken cancellationToken)
76
        {
77
            if (first == null) 
78
                throw new ArgumentNullException("first");
79
            if (next == null) 
80
                throw new ArgumentNullException("next");
81
            Contract.EndContractBlock();
82
            Contract.Assume(TaskScheduler.Current!=null);
83

    
84
            var tcs = new TaskCompletionSource<T2>();
85
            first.ContinueWith(delegate
86
            {
87
                if (first.IsFaulted) tcs.TrySetException(first.Exception.InnerExceptions);
88
                else if (first.IsCanceled) tcs.TrySetCanceled();
89
                else
90
                {
91
                    try
92
                    {
93
                        var t = next(first.Result);
94
                        if (t == null) tcs.TrySetCanceled();
95
                        else t.ContinueWith(delegate
96
                        {
97
                            if (t.IsFaulted) tcs.TrySetException(t.Exception.InnerExceptions);
98
                            else if (t.IsCanceled) tcs.TrySetCanceled();
99
                            else tcs.TrySetResult(t.Result);
100
                        }, TaskContinuationOptions.ExecuteSynchronously);
101
                    }
102
                    catch (Exception exc) { tcs.TrySetException(exc); }
103
                }
104
            }, cancellationToken, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Current);
105
            return tcs.Task;
106
        }
107

    
108
        public static Task Then<T1>(this Task<T1> first, Func<T1, Task> next, CancellationToken cancellationToken)
109
        {
110
            if (first == null)
111
                throw new ArgumentNullException("first");
112
            if (next == null)
113
                throw new ArgumentNullException("next");
114
            Contract.EndContractBlock();
115
            Contract.Assume(TaskScheduler.Current != null);
116

    
117
            var tcs = new TaskCompletionSource<object>();
118
            first.ContinueWith(delegate
119
            {
120
                if (first.IsFaulted) tcs.TrySetException(first.Exception.InnerExceptions);
121
                else if (first.IsCanceled) tcs.TrySetCanceled();
122
                else
123
                {
124
                    try
125
                    {
126
                        var t = next(first.Result);
127
                        if (t == null) tcs.TrySetCanceled();
128
                        else t.ContinueWith(delegate
129
                        {
130
                            if (t.IsFaulted) tcs.TrySetException(t.Exception.InnerExceptions);
131
                            else if (t.IsCanceled) tcs.TrySetCanceled();
132
                            else tcs.TrySetResult(null);
133
                        }, TaskContinuationOptions.ExecuteSynchronously);
134
                    }
135
                    catch (Exception exc) { tcs.TrySetException(exc); }
136
                }
137
            }, cancellationToken, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Current);
138
            return tcs.Task;
139
        }
140

    
141

    
142

    
143
        public static void ReportExceptions(this Task task,Action<AggregateException> OnError,Action OnSuccess )
144
        {
145
            if (!task.IsCompleted) throw new InvalidOperationException("The task has not completed.");
146
            if (task.IsFaulted)             
147
                task.Exception.Handle(exc=>
148
                                          {
149
                                              OnError(task.Exception);
150
                                              return true;
151
                                          }); 
152
            else
153
            {
154
                OnSuccess();
155
            }
156
        }
157

    
158
        public static bool TryRemove<T>(this ObservableConcurrentCollection<T> collection,T item) where T:class
159
        {
160
            var found = false;
161
            IProducerConsumerCollection<T> items= collection;
162
            //Store the initial count
163
            var count = items.Count;
164
            for (var i = 0; i < count; i++)
165
            {
166
                T tempItem;
167
                //Take an item
168
                if (!items.TryTake(out tempItem)) 
169
                    return false;
170
                //If it isn't the one we are looking for
171
                if (tempItem != item)
172
                    //put it back
173
                    items.TryAdd(tempItem);
174
                else
175
                    //otherwise skip it and flag succcess
176
                    found = true;
177
            }
178
            return found;
179
        }
180

    
181
        public static bool TryAdd<T>(this ObservableConcurrentCollection<T> collection,T item) where T:class
182
        {
183
            if (collection==null)
184
                throw new ArgumentNullException("collection");
185
            Contract.EndContractBlock();
186

    
187
            if (item == null)
188
                return false;
189

    
190
            IProducerConsumerCollection<T> items= collection;            
191
            return items.TryAdd(item);
192
        }
193

    
194
    }
195
}