Backed off in PutBlock. Discovered problems in PollAgent.UpdateHashes. It updates...
[pithos-ms-client] / trunk / Pithos.Network / WebExtensions.cs
1 using System;\r
2 using System.Collections.Generic;\r
3 using System.ComponentModel;\r
4 using System.Diagnostics.Contracts;\r
5 using System.Linq;\r
6 using System.Net.Http;\r
7 using System.Reflection;\r
8 using System.Text;\r
9 using System.Net;\r
10 using System.IO;\r
11 using System.Threading;\r
12 using System.Threading.Tasks;\r
13 using log4net;\r
14 \r
15 namespace Pithos.Network\r
16 {\r
17     public static class WebExtensions\r
18     {\r
19         private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);\r
20 \r
21         public static string ReadToEnd(this HttpWebResponse response)\r
22         {\r
23             using (var stream = response.GetResponseStream())\r
24             {\r
25                 if (stream == null)\r
26                     return null;\r
27                 using (var reader = new StreamReader(stream))\r
28                 {\r
29                     var body = reader.ReadToEnd();\r
30                     return body;\r
31                 }\r
32             }\r
33         }\r
34 \r
35         public static void LogError(this ILog log, HttpWebResponse response)\r
36         {\r
37             if (log.IsDebugEnabled)\r
38             {\r
39                 if (response != null)\r
40                 {\r
41                     var body = response.ReadToEnd();\r
42                     log.ErrorFormat("Headers:\n{0}\nBody:{1}", response.Headers, body);\r
43                 }\r
44             }\r
45         }\r
46 \r
47         public static TextReader GetLoggedReader(this Stream stream, ILog log)\r
48         {\r
49             var reader = new StreamReader(stream);\r
50             if (!log.IsDebugEnabled)\r
51                 return reader;\r
52 \r
53             using (reader)\r
54             {\r
55                 var body = reader.ReadToEnd();\r
56                 log.DebugFormat("JSON response: {0}", body);\r
57                 return new StringReader(body);\r
58             }\r
59         }\r
60 \r
61 \r
62         public static IEnumerable<T> Range<T>(this IList<T> list, int start, int end)\r
63         {\r
64             Contract.Requires(start >= 0);\r
65             Contract.Requires(end < list.Count);\r
66             Contract.Requires(start <= end);\r
67 \r
68             if (list == null)\r
69                 yield break;\r
70 \r
71             for (var i = 0; i <= end; i++)\r
72             {\r
73                 yield return list[i];\r
74             }\r
75 \r
76         }\r
77 \r
78         public static Task<byte[]> UploadDataTaskAsync(this WebClient webClient, Uri address, string method, byte[] data, CancellationToken cancellationToken, IProgress<UploadProgressChangedEventArgs> progress)\r
79         {\r
80             var tcs = new TaskCompletionSource<byte[]>(address);\r
81             if (cancellationToken.IsCancellationRequested)\r
82             {\r
83                 tcs.TrySetCanceled();\r
84             }\r
85             else\r
86             {\r
87                 CancellationTokenRegistration ctr = cancellationToken.Register(()=>\r
88                 {\r
89                     webClient.CancelAsync();\r
90                 });\r
91                 UploadDataCompletedEventHandler completedHandler = null;\r
92                 UploadProgressChangedEventHandler progressHandler = null;\r
93                 if (progress != null)\r
94                     progressHandler = (s, e) => PithosEAPCommon.HandleProgress(tcs, e, () => e, progress);\r
95                 completedHandler =(sender, e) =>PithosEAPCommon.HandleCompletion(tcs, true, e,() => e.Result,() =>\r
96                 { \r
97                     ctr.Dispose();\r
98                     webClient.UploadDataCompleted -= completedHandler;\r
99                     webClient.UploadProgressChanged -= progressHandler;\r
100                 });\r
101                 webClient.UploadDataCompleted += completedHandler;\r
102                 webClient.UploadProgressChanged += progressHandler;\r
103                 try\r
104                 {\r
105                     webClient.UploadDataAsync(address, method, data, tcs);\r
106                     if (cancellationToken.IsCancellationRequested)\r
107                         webClient.CancelAsync();\r
108                 }\r
109                 catch\r
110                 {\r
111                     webClient.UploadDataCompleted -= completedHandler;\r
112                     webClient.UploadProgressChanged -= progressHandler;\r
113                     throw;\r
114                 }\r
115             }\r
116             return tcs.Task;\r
117         }\r
118 \r
119         public static async Task<T> WithRetries<T>(this Func<Task<T>> func, int retries)\r
120         {\r
121             while (retries > 0)\r
122             {\r
123                 try\r
124                 {\r
125                     var result = await func();\r
126                     return result;\r
127                 }\r
128                 catch (Exception exc)\r
129                 {\r
130                     if (--retries == 0)\r
131                         throw new RetryException("Failed too many times", exc);\r
132                 }\r
133             }\r
134             throw new RetryException();\r
135         }\r
136 \r
137         public static async Task<HttpResponseMessage> WithRetriesForWeb(this Func<Task<HttpResponseMessage>> func, int retries)\r
138         {\r
139             var waitTime = TimeSpan.FromSeconds(10);\r
140             var acceptedCodes = new[] { HttpStatusCode.Moved, HttpStatusCode.MovedPermanently, HttpStatusCode.Found, HttpStatusCode.Redirect,HttpStatusCode.SeeOther,\r
141                 HttpStatusCode.RedirectMethod,HttpStatusCode.NotModified,HttpStatusCode.TemporaryRedirect,HttpStatusCode.RedirectKeepVerb};\r
142             while (retries > 0)\r
143             {\r
144                 var result = await func();\r
145                 if (result.IsSuccessStatusCode || acceptedCodes.Contains(result.StatusCode))\r
146                     return result;\r
147                     \r
148                 if (--retries == 0)\r
149                     throw new RetryException("Failed too many times");\r
150 \r
151                 //Wait for service unavailable\r
152                 if (result.StatusCode == HttpStatusCode.ServiceUnavailable)\r
153                 {\r
154                     Log.InfoFormat("[UNAVAILABLE] Waiting before retry: {0}",result.ReasonPhrase);\r
155                     await TaskEx.Delay(waitTime);\r
156                     //increase the timeout for repeated timeouts\r
157                     if (waitTime<TimeSpan.FromSeconds(10))\r
158                         waitTime = waitTime.Add(TimeSpan.FromSeconds(10));\r
159                 }                \r
160                 //Throw in all other cases\r
161                 else \r
162                     result.EnsureSuccessStatusCode();\r
163             }\r
164             throw new RetryException();\r
165         }\r
166 \r
167 \r
168         public static async Task<string> GetStringAsyncWithRetries(this HttpClient client, Uri requestUri, int retries,DateTime? since=null)\r
169         {                        \r
170             var request = new HttpRequestMessage(HttpMethod.Get, requestUri);            \r
171             if (since.HasValue)\r
172             {\r
173                 request.Headers.IfModifiedSince = since.Value;\r
174             }\r
175             //Func<Task<HttpResponseMessage>> call = () => _baseHttpClient.SendAsync(request);\r
176             using (var response = await client.SendAsyncWithRetries(request,3))\r
177             {\r
178                 if (response.StatusCode == HttpStatusCode.NoContent)\r
179                     return String.Empty;\r
180 \r
181                 var content = await response.Content.ReadAsStringAsync();\r
182                 return content;\r
183             }\r
184 \r
185         }\r
186 \r
187         public static Task<HttpResponseMessage> HeadAsyncWithRetries(this HttpClient client, Uri requestUri, int retries)\r
188         {\r
189             return client.HeadAsyncWithRetries(requestUri, retries, HttpCompletionOption.ResponseContentRead, CancellationToken.None);\r
190         }\r
191 \r
192         public static Task<HttpResponseMessage> HeadAsyncWithRetries(this HttpClient client, Uri requestUri, int retries, HttpCompletionOption completionOption, CancellationToken cancellationToken)\r
193         {\r
194             return client.SendAsyncWithRetries(new HttpRequestMessage(HttpMethod.Head, requestUri), retries, completionOption, cancellationToken);\r
195         }\r
196 \r
197         public static Task<HttpResponseMessage> GetAsyncWithRetries(this HttpClient client, Uri requestUri, int retries)\r
198         {\r
199             return client.GetAsyncWithRetries(requestUri, retries, HttpCompletionOption.ResponseContentRead, CancellationToken.None);\r
200         }\r
201 \r
202         public static Task<HttpResponseMessage> GetAsyncWithRetries(this HttpClient client, Uri requestUri, int retries, HttpCompletionOption completionOption, CancellationToken cancellationToken)\r
203         {\r
204             return client.SendAsyncWithRetries(new HttpRequestMessage(HttpMethod.Get, requestUri), retries, completionOption, cancellationToken);\r
205         }\r
206 \r
207 \r
208         public static Task<HttpResponseMessage> SendAsyncWithRetries(this HttpClient client,HttpRequestMessage message, int retries)\r
209         {\r
210             return client.SendAsyncWithRetries(message, retries, HttpCompletionOption.ResponseContentRead, CancellationToken.None);\r
211         }\r
212 \r
213         public static async Task<HttpResponseMessage> SendAsyncWithRetries(this HttpClient client,HttpRequestMessage message, int retries,HttpCompletionOption completionOption, CancellationToken cancellationToken)\r
214         {\r
215             var waitTime = TimeSpan.FromSeconds(10);\r
216             var acceptedCodes = new[] { HttpStatusCode.Moved, HttpStatusCode.MovedPermanently, HttpStatusCode.Found, HttpStatusCode.Redirect,HttpStatusCode.SeeOther,\r
217                 HttpStatusCode.RedirectMethod,HttpStatusCode.NotModified,HttpStatusCode.TemporaryRedirect,HttpStatusCode.RedirectKeepVerb,HttpStatusCode.Conflict};\r
218             while (retries > 0)\r
219             {\r
220                 if (Log.IsDebugEnabled)\r
221                     Log.DebugFormat("[REQUEST] {0}",message);\r
222                 var result = await client.SendAsync(message,completionOption,cancellationToken);\r
223                 if (result.IsSuccessStatusCode || acceptedCodes.Contains(result.StatusCode))\r
224                 {\r
225                     if (Log.IsDebugEnabled)\r
226                         Log.DebugFormat("[RESPONSE] [{0}]:[{1}] OK: [{2}]", message.Method,message.RequestUri, result.StatusCode);\r
227                     return result;\r
228                 }\r
229                 //Failed, will have to abort or retry\r
230                 if (Log.IsDebugEnabled)\r
231                     Log.DebugFormat("[RESPONSE] [{0}]:[{1}] FAIL: [{2}]\n{3}", message.Method,message.RequestUri, result.StatusCode,result);\r
232 \r
233                 if (--retries == 0)\r
234                     throw new RetryException("Failed too many times");\r
235 \r
236                 //Wait for service unavailable\r
237                 if (result.StatusCode == HttpStatusCode.ServiceUnavailable)\r
238                 {\r
239                     \r
240                     Log.WarnFormat("[UNAVAILABLE] Waiting before retrying [{0}]:[{1}] due to [{2}]",message.Method, message.RequestUri,result.ReasonPhrase);                    \r
241                     await TaskEx.Delay(waitTime);\r
242                     //increase the timeout for repeated timeouts\r
243                     if (waitTime<TimeSpan.FromSeconds(10))\r
244                         waitTime = waitTime.Add(TimeSpan.FromSeconds(10));\r
245                 }                \r
246                 //Throw in all other cases\r
247                 else \r
248                     result.EnsureSuccessStatusCode();\r
249             }\r
250             throw new RetryException();\r
251         }\r
252 \r
253     }\r
254 \r
255     internal static class PithosEAPCommon\r
256     {\r
257         internal static void HandleProgress<T, E>(TaskCompletionSource<T> tcs, ProgressChangedEventArgs eventArgs, Func<E> getProgress, IProgress<E> callback)\r
258         {\r
259             if (eventArgs.UserState != tcs)\r
260                 return;\r
261             callback.Report(getProgress());\r
262         }\r
263 \r
264         internal static void HandleCompletion<T>(TaskCompletionSource<T> tcs, bool requireMatch, AsyncCompletedEventArgs e, Func<T> getResult, Action unregisterHandler)\r
265         {\r
266             if (requireMatch)\r
267             {\r
268                 if (e.UserState != tcs)\r
269                     return;\r
270             }\r
271             try\r
272             {\r
273                 unregisterHandler();\r
274             }\r
275             finally\r
276             {\r
277                 if (e.Cancelled)\r
278                     tcs.TrySetCanceled();\r
279                 else if (e.Error != null)\r
280                     tcs.TrySetException(e.Error);\r
281                 else\r
282                     tcs.TrySetResult(getResult());\r
283             }\r
284         }\r
285     }\r
286 \r
287 }\r