Some timeout issues
[pithos-ms-client] / trunk / Pithos.Network / WebExtensions.cs
1 using System;\r
2 using System.Collections.Generic;\r
3 using System.ComponentModel;\r
4 using System.Diagnostics.Contracts;\r
5 using System.Linq;\r
6 using System.Net.Http;\r
7 using System.Net.Http.Headers;\r
8 using System.Reflection;\r
9 using System.Text;\r
10 using System.Net;\r
11 using System.IO;\r
12 using System.Threading;\r
13 using System.Threading.Tasks;\r
14 using log4net;\r
15 using System.ServiceModel.Channels;\r
16 \r
17 namespace Pithos.Network\r
18 {\r
19     public static class WebExtensions\r
20     {\r
21         private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);\r
22 \r
23         public static string ReadToEnd(this HttpWebResponse response)\r
24         {\r
25             using (var stream = response.GetResponseStream())\r
26             {\r
27                 if (stream == null)\r
28                     return null;\r
29                 using (var reader = new StreamReader(stream))\r
30                 {\r
31                     var body = reader.ReadToEnd();\r
32                     return body;\r
33                 }\r
34             }\r
35         }\r
36 \r
37         public static void LogError(this ILog log, HttpWebResponse response)\r
38         {\r
39             if (log.IsDebugEnabled)\r
40             {\r
41                 if (response != null)\r
42                 {\r
43                     var body = response.ReadToEnd();\r
44                     log.ErrorFormat("Headers:\n{0}\nBody:{1}", response.Headers, body);\r
45                 }\r
46             }\r
47         }\r
48 \r
49         public static TextReader GetLoggedReader(this Stream stream, ILog log)\r
50         {\r
51             var reader = new StreamReader(stream);\r
52             if (!log.IsDebugEnabled)\r
53                 return reader;\r
54 \r
55             using (reader)\r
56             {\r
57                 var body = reader.ReadToEnd();\r
58                 log.DebugFormat("JSON response: {0}", body);\r
59                 return new StringReader(body);\r
60             }\r
61         }\r
62 \r
63 \r
64         public static IEnumerable<T> Range<T>(this IList<T> list, int start, int end)\r
65         {\r
66             Contract.Requires(start >= 0);\r
67             Contract.Requires(end < list.Count);\r
68             Contract.Requires(start <= end);\r
69 \r
70             if (list == null)\r
71                 yield break;\r
72 \r
73             for (var i = 0; i <= end; i++)\r
74             {\r
75                 yield return list[i];\r
76             }\r
77 \r
78         }\r
79 \r
80         public static Task<byte[]> UploadDataTaskAsync(this WebClient webClient, Uri address, string method, byte[] data, CancellationToken cancellationToken, IProgress<UploadProgressChangedEventArgs> progress)\r
81         {\r
82             var tcs = new TaskCompletionSource<byte[]>(address);\r
83             if (cancellationToken.IsCancellationRequested)\r
84             {\r
85                 tcs.TrySetCanceled();\r
86             }\r
87             else\r
88             {\r
89                 CancellationTokenRegistration ctr = cancellationToken.Register(()=>\r
90                 {\r
91                     webClient.CancelAsync();\r
92                 });\r
93                 UploadDataCompletedEventHandler completedHandler = null;\r
94                 UploadProgressChangedEventHandler progressHandler = null;\r
95                 if (progress != null)\r
96                     progressHandler = (s, e) => PithosEAPCommon.HandleProgress(tcs, e, () => e, progress);\r
97                 completedHandler =(sender, e) =>PithosEAPCommon.HandleCompletion(tcs, true, e,() => e.Result,() =>\r
98                 { \r
99                     ctr.Dispose();\r
100                     webClient.UploadDataCompleted -= completedHandler;\r
101                     webClient.UploadProgressChanged -= progressHandler;\r
102                 });\r
103                 webClient.UploadDataCompleted += completedHandler;\r
104                 webClient.UploadProgressChanged += progressHandler;\r
105                 try\r
106                 {\r
107                     webClient.UploadDataAsync(address, method, data, tcs);\r
108                     if (cancellationToken.IsCancellationRequested)\r
109                         webClient.CancelAsync();\r
110                 }\r
111                 catch\r
112                 {\r
113                     webClient.UploadDataCompleted -= completedHandler;\r
114                     webClient.UploadProgressChanged -= progressHandler;\r
115                     throw;\r
116                 }\r
117             }\r
118             return tcs.Task;\r
119         }\r
120 \r
121         public static async Task<T> WithRetries<T>(this Func<Task<T>> func, int retries)\r
122         {\r
123             while (retries > 0)\r
124             {\r
125                 try\r
126                 {\r
127                     var result = await func().ConfigureAwait(false);\r
128                     return result;\r
129                 }\r
130                 catch (Exception exc)\r
131                 {\r
132                     if (--retries == 0)\r
133                         throw new RetryException("Failed too many times", exc);\r
134                 }\r
135             }\r
136             throw new RetryException();\r
137         }\r
138 \r
139         public static async Task<HttpResponseMessage> WithRetriesForWeb(this Func<Task<HttpResponseMessage>> func, int retries)\r
140         {\r
141             var waitTime = TimeSpan.FromSeconds(10);\r
142             var acceptedCodes = new[] { HttpStatusCode.Moved, HttpStatusCode.MovedPermanently, HttpStatusCode.Found, HttpStatusCode.Redirect,HttpStatusCode.SeeOther,\r
143                 HttpStatusCode.RedirectMethod,HttpStatusCode.NotModified,HttpStatusCode.TemporaryRedirect,HttpStatusCode.RedirectKeepVerb};\r
144             while (retries > 0)\r
145             {\r
146                 var result = await func().ConfigureAwait(false);\r
147                 if (result.IsSuccessStatusCode || acceptedCodes.Contains(result.StatusCode))\r
148                     return result;\r
149                     \r
150                 if (--retries == 0)\r
151                     throw new RetryException("Failed too many times");\r
152 \r
153                 //Wait for service unavailable\r
154                 if (result.StatusCode == HttpStatusCode.ServiceUnavailable)\r
155                 {\r
156                     Log.InfoFormat("[UNAVAILABLE] Waiting before retry: {0}",result.ReasonPhrase);\r
157                     await TaskEx.Delay(waitTime).ConfigureAwait(false);\r
158                     //increase the timeout for repeated timeouts\r
159                     if (waitTime<TimeSpan.FromSeconds(10))\r
160                         waitTime = waitTime.Add(TimeSpan.FromSeconds(10));\r
161                 }                \r
162                 //Throw in all other cases\r
163                 else \r
164                     result.EnsureSuccessStatusCode();\r
165             }\r
166             throw new RetryException();\r
167         }\r
168 \r
169 \r
170         public static async Task<string> GetStringAsyncWithRetries(this HttpClient client, Uri requestUri, int retries,DateTime? since=null)\r
171         {                        \r
172             var request = new HttpRequestMessage(HttpMethod.Get, requestUri);            \r
173             if (since.HasValue)\r
174             {\r
175                 request.Headers.IfModifiedSince = since.Value;\r
176             }\r
177             //Func<Task<HttpResponseMessage>> call = () => _baseHttpClient.SendAsync(request);\r
178             using (var response = await client.SendAsyncWithRetries(request,3).ConfigureAwait(false))\r
179             {\r
180                 if (response.StatusCode == HttpStatusCode.NoContent)\r
181                     return String.Empty;\r
182 \r
183                 var content = await response.Content.ReadAsStringAsync().ConfigureAwait(false);\r
184                 return content;\r
185             }\r
186 \r
187         }\r
188 \r
189         public static Task<HttpResponseMessage> HeadAsyncWithRetries(this HttpClient client, Uri requestUri, int retries,bool acceptNotFound=false)\r
190         {\r
191             return client.HeadAsyncWithRetries(requestUri, retries, acceptNotFound,HttpCompletionOption.ResponseContentRead, CancellationToken.None);\r
192         }\r
193 \r
194         public static Task<HttpResponseMessage> HeadAsyncWithRetries(this HttpClient client, Uri requestUri, int retries, bool acceptNotFound,HttpCompletionOption completionOption, CancellationToken cancellationToken)\r
195         {\r
196             return client.SendAsyncWithRetries(new HttpRequestMessage(HttpMethod.Head, requestUri), retries, acceptNotFound,completionOption, cancellationToken);\r
197         }\r
198 \r
199         public static Task<HttpResponseMessage> GetAsyncWithRetries(this HttpClient client, Uri requestUri, int retries)\r
200         {\r
201             return client.GetAsyncWithRetries(requestUri, retries, HttpCompletionOption.ResponseContentRead, CancellationToken.None);\r
202         }\r
203 \r
204         public static Task<HttpResponseMessage> GetAsyncWithRetries(this HttpClient client, Uri requestUri, int retries, HttpCompletionOption completionOption, CancellationToken cancellationToken)\r
205         {\r
206             return client.SendAsyncWithRetries(new HttpRequestMessage(HttpMethod.Get, requestUri), retries, false,completionOption, cancellationToken);\r
207         }\r
208 \r
209 \r
210         public static Task<HttpResponseMessage> SendAsyncWithRetries(this HttpClient client,HttpRequestMessage message, int retries)\r
211         {\r
212             return client.SendAsyncWithRetries(message, retries,false, HttpCompletionOption.ResponseContentRead, CancellationToken.None);\r
213         }\r
214 \r
215         public static async Task<HttpResponseMessage> SendAsyncWithRetries(this HttpClient client,HttpRequestMessage message, int retries,bool acceptNotFound,HttpCompletionOption completionOption, CancellationToken cancellationToken)\r
216         {\r
217             var waitTime = TimeSpan.FromSeconds(10);\r
218             var acceptedCodes =acceptNotFound\r
219                 ? new[] {HttpStatusCode.NotFound, HttpStatusCode.Moved, HttpStatusCode.MovedPermanently, HttpStatusCode.Found, HttpStatusCode.Redirect,HttpStatusCode.SeeOther,\r
220                 HttpStatusCode.RedirectMethod,HttpStatusCode.NotModified,HttpStatusCode.TemporaryRedirect,HttpStatusCode.RedirectKeepVerb,HttpStatusCode.Conflict}\r
221                 : new[] { HttpStatusCode.Moved, HttpStatusCode.MovedPermanently, HttpStatusCode.Found, HttpStatusCode.Redirect,HttpStatusCode.SeeOther,\r
222                 HttpStatusCode.RedirectMethod,HttpStatusCode.NotModified,HttpStatusCode.TemporaryRedirect,HttpStatusCode.RedirectKeepVerb,HttpStatusCode.Conflict};\r
223             \r
224                 \r
225             while (retries > 0)\r
226             {\r
227                 var timedOut = false;\r
228                 if (Log.IsDebugEnabled)\r
229                     Log.DebugFormat("[REQUEST] {0}", message);\r
230                 HttpResponseMessage result=null;\r
231                 \r
232                 \r
233                 \r
234                 try\r
235                 {\r
236                     result = await client.SendAsync(message, completionOption, cancellationToken).ConfigureAwait(false);\r
237                 }\r
238                 catch (WebException exc)\r
239                 {\r
240                     if (exc.Status != WebExceptionStatus.Timeout)\r
241                         throw;\r
242                     timedOut = true;\r
243                     if (Log.IsDebugEnabled)\r
244                         Log.DebugFormat("[RESPONSE] [{0}]:[{1}] FAIL WITH TIMEOUT", message.Method, message.RequestUri);\r
245                 }\r
246                 catch(TaskCanceledException exc)\r
247                 {\r
248                     //If the task was cancelled due to a timeout, retry it\r
249                     if (!exc.CancellationToken.IsCancellationRequested)\r
250                     {\r
251                         timedOut = true;\r
252                         if (Log.IsDebugEnabled)\r
253                             Log.DebugFormat("[RESPONSE] [{0}]:[{1}] FAIL WITH TIMEOUT", message.Method, message.RequestUri);\r
254                     }\r
255                     else\r
256                     {\r
257                         throw;\r
258                     }\r
259                 }\r
260                 catch (Exception exc)\r
261                 {\r
262                     Log.FatalFormat("Unexpected error while sending:\n{0}\n{1}", message, exc);\r
263                     throw;\r
264                 }\r
265 \r
266                 if (timedOut)\r
267                 {\r
268                     if (--retries == 0)\r
269                         throw new RetryException("Failed too many times");\r
270                     continue;\r
271                 }\r
272 \r
273                 if (result.IsSuccessStatusCode || acceptedCodes.Contains(result.StatusCode))\r
274                 {\r
275                     if (Log.IsDebugEnabled)\r
276                         Log.DebugFormat("[RESPONSE] [{0}]:[{1}] OK: [{2}]", message.Method, message.RequestUri,\r
277                                         result.StatusCode);\r
278                     return result;\r
279                 }\r
280                 //Failed, will have to abort or retry\r
281                 if (Log.IsDebugEnabled)\r
282                     Log.DebugFormat("[RESPONSE] [{0}]:[{1}] FAIL: [{2}]\n{3}", message.Method, message.RequestUri,\r
283                                     result.StatusCode, result);\r
284 \r
285                 if (--retries == 0)\r
286                     throw new RetryException("Failed too many times");\r
287 \r
288                 //Wait for service unavailable\r
289                 if (result.StatusCode == HttpStatusCode.ServiceUnavailable ||\r
290                     result.StatusCode == HttpStatusCode.BadGateway)\r
291                 {\r
292 \r
293                     Log.WarnFormat("[UNAVAILABLE] Waiting before retrying [{0}]:[{1}] due to [{2}]", message.Method,\r
294                                    message.RequestUri, result.ReasonPhrase);\r
295                     await TaskEx.Delay(waitTime).ConfigureAwait(false);\r
296                     //increase the timeout for repeated timeouts\r
297                     if (waitTime < TimeSpan.FromSeconds(10))\r
298                         waitTime = waitTime.Add(TimeSpan.FromSeconds(10));\r
299                 }\r
300                     //Throw in all other cases\r
301                 else\r
302                     result.EnsureSuccessStatusCode();\r
303             }\r
304             throw new RetryException();\r
305         }\r
306 \r
307         public static string GetFirstValue(this HttpResponseHeaders headers, string name)\r
308         {\r
309             if (headers==null)\r
310                 throw new ArgumentNullException("headers");\r
311             if (String.IsNullOrWhiteSpace(name))\r
312                 throw new ArgumentNullException("name");\r
313             Contract.EndContractBlock();\r
314 \r
315             IEnumerable<string> values;\r
316             if (headers.TryGetValues(name, out values))\r
317             {\r
318                 return values.FirstOrDefault();\r
319             }\r
320             return null;\r
321         }\r
322 \r
323         public static  Dictionary<string, string> GetMeta(this HttpResponseHeaders headers,string metaPrefix)\r
324         {\r
325             if (headers == null)\r
326                 throw new ArgumentNullException("headers");\r
327             if (String.IsNullOrWhiteSpace(metaPrefix))\r
328                 throw new ArgumentNullException("metaPrefix");\r
329             Contract.EndContractBlock();\r
330 \r
331             var dict = (from header in headers\r
332                         where header.Key.StartsWith(metaPrefix)\r
333                         let name = header.Key.Substring(metaPrefix.Length)\r
334                         select new { Name = name, Value = String.Join(",",header.Value) })\r
335                         .ToDictionary(t => t.Name, t => t.Value);\r
336             return dict;\r
337         }\r
338 \r
339     }\r
340 \r
341     internal static class PithosEAPCommon\r
342     {\r
343         internal static void HandleProgress<T, E>(TaskCompletionSource<T> tcs, ProgressChangedEventArgs eventArgs, Func<E> getProgress, IProgress<E> callback)\r
344         {\r
345             if (eventArgs.UserState != tcs)\r
346                 return;\r
347             callback.Report(getProgress());\r
348         }\r
349 \r
350         internal static void HandleCompletion<T>(TaskCompletionSource<T> tcs, bool requireMatch, AsyncCompletedEventArgs e, Func<T> getResult, Action unregisterHandler)\r
351         {\r
352             if (requireMatch)\r
353             {\r
354                 if (e.UserState != tcs)\r
355                     return;\r
356             }\r
357             try\r
358             {\r
359                 unregisterHandler();\r
360             }\r
361             finally\r
362             {\r
363                 if (e.Cancelled)\r
364                     tcs.TrySetCanceled();\r
365                 else if (e.Error != null)\r
366                     tcs.TrySetException(e.Error);\r
367                 else\r
368                     tcs.TrySetResult(getResult());\r
369             }\r
370         }\r
371     }\r
372 \r
373 }\r