root / trunk / Pithos.Network / WebExtensions.cs @ dd5f5163
History | View | Annotate | Download (19.1 kB)
1 |
using System; |
---|---|
2 |
using System.Collections.Generic; |
3 |
using System.ComponentModel; |
4 |
using System.Diagnostics.Contracts; |
5 |
using System.Linq; |
6 |
using System.Net.Http; |
7 |
using System.Net.Http.Headers; |
8 |
using System.Reflection; |
9 |
using System.Text; |
10 |
using System.Net; |
11 |
using System.IO; |
12 |
using System.Threading; |
13 |
using System.Threading.Tasks; |
14 |
using Pithos.Interfaces; |
15 |
using log4net; |
16 |
using System.ServiceModel.Channels; |
17 |
|
18 |
namespace Pithos.Network |
19 |
{ |
20 |
public static class WebExtensions |
21 |
{ |
22 |
private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); |
23 |
|
24 |
public static string ReadToEnd(this HttpWebResponse response) |
25 |
{ |
26 |
using (var stream = response.GetResponseStream()) |
27 |
{ |
28 |
if (stream == null) |
29 |
return null; |
30 |
using (var reader = new StreamReader(stream)) |
31 |
{ |
32 |
var body = reader.ReadToEnd(); |
33 |
return body; |
34 |
} |
35 |
} |
36 |
} |
37 |
|
38 |
public static void LogError(this ILog log, HttpWebResponse response) |
39 |
{ |
40 |
if (log.IsDebugEnabled) |
41 |
{ |
42 |
if (response != null) |
43 |
{ |
44 |
var body = response.ReadToEnd(); |
45 |
log.ErrorFormat("Headers:\n{0}\nBody:{1}", response.Headers, body); |
46 |
} |
47 |
} |
48 |
} |
49 |
|
50 |
public static TextReader GetLoggedReader(this Stream stream, ILog log) |
51 |
{ |
52 |
var reader = new StreamReader(stream); |
53 |
if (!log.IsDebugEnabled) |
54 |
return reader; |
55 |
|
56 |
using (reader) |
57 |
{ |
58 |
var body = reader.ReadToEnd(); |
59 |
log.DebugFormat("JSON response: {0}", body); |
60 |
return new StringReader(body); |
61 |
} |
62 |
} |
63 |
|
64 |
|
65 |
public static IEnumerable<T> Range<T>(this IList<T> list, int start, int end) |
66 |
{ |
67 |
Contract.Requires(start >= 0); |
68 |
Contract.Requires(end < list.Count); |
69 |
Contract.Requires(start <= end); |
70 |
|
71 |
if (list == null) |
72 |
yield break; |
73 |
|
74 |
for (var i = 0; i <= end; i++) |
75 |
{ |
76 |
yield return list[i]; |
77 |
} |
78 |
|
79 |
} |
80 |
|
81 |
public static Task<byte[]> UploadDataTaskAsync(this WebClient webClient, Uri address, string method, byte[] data, CancellationToken cancellationToken, IProgress<UploadProgressChangedEventArgs> progress) |
82 |
{ |
83 |
var tcs = new TaskCompletionSource<byte[]>(address); |
84 |
if (cancellationToken.IsCancellationRequested) |
85 |
{ |
86 |
tcs.TrySetCanceled(); |
87 |
} |
88 |
else |
89 |
{ |
90 |
CancellationTokenRegistration ctr = cancellationToken.Register(()=> |
91 |
{ |
92 |
webClient.CancelAsync(); |
93 |
}); |
94 |
UploadDataCompletedEventHandler completedHandler = null; |
95 |
UploadProgressChangedEventHandler progressHandler = null; |
96 |
if (progress != null) |
97 |
progressHandler = (s, e) => PithosEAPCommon.HandleProgress(tcs, e, () => e, progress); |
98 |
completedHandler =(sender, e) =>PithosEAPCommon.HandleCompletion(tcs, true, e,() => e.Result,() => |
99 |
{ |
100 |
ctr.Dispose(); |
101 |
webClient.UploadDataCompleted -= completedHandler; |
102 |
webClient.UploadProgressChanged -= progressHandler; |
103 |
}); |
104 |
webClient.UploadDataCompleted += completedHandler; |
105 |
webClient.UploadProgressChanged += progressHandler; |
106 |
try |
107 |
{ |
108 |
webClient.UploadDataAsync(address, method, data, tcs); |
109 |
if (cancellationToken.IsCancellationRequested) |
110 |
webClient.CancelAsync(); |
111 |
} |
112 |
catch |
113 |
{ |
114 |
webClient.UploadDataCompleted -= completedHandler; |
115 |
webClient.UploadProgressChanged -= progressHandler; |
116 |
throw; |
117 |
} |
118 |
} |
119 |
return tcs.Task; |
120 |
} |
121 |
|
122 |
public static async Task<T> WithRetries<T>(this Func<Task<T>> func, int retries) |
123 |
{ |
124 |
while (retries > 0) |
125 |
{ |
126 |
try |
127 |
{ |
128 |
var result = await func().ConfigureAwait(false); |
129 |
return result; |
130 |
} |
131 |
catch (Exception exc) |
132 |
{ |
133 |
if (--retries == 0) |
134 |
throw new RetryException("Failed too many times", exc); |
135 |
} |
136 |
} |
137 |
throw new RetryException(); |
138 |
} |
139 |
|
140 |
public static async Task<HttpResponseMessage> WithRetriesForWeb(this Func<Task<HttpResponseMessage>> func, int retries) |
141 |
{ |
142 |
var waitTime = TimeSpan.FromSeconds(10); |
143 |
var acceptedCodes = new[] { HttpStatusCode.Moved, HttpStatusCode.MovedPermanently, HttpStatusCode.Found, HttpStatusCode.Redirect,HttpStatusCode.SeeOther, |
144 |
HttpStatusCode.RedirectMethod,HttpStatusCode.NotModified,HttpStatusCode.TemporaryRedirect,HttpStatusCode.RedirectKeepVerb}; |
145 |
while (retries > 0) |
146 |
{ |
147 |
var result = await func().ConfigureAwait(false); |
148 |
if (result.IsSuccessStatusCode || acceptedCodes.Contains(result.StatusCode)) |
149 |
return result; |
150 |
|
151 |
if (--retries == 0) |
152 |
throw new RetryException("Failed too many times"); |
153 |
|
154 |
//Wait for service unavailable |
155 |
if (result.StatusCode == HttpStatusCode.ServiceUnavailable) |
156 |
{ |
157 |
Log.InfoFormat("[UNAVAILABLE] Waiting before retry: {0}",result.ReasonPhrase); |
158 |
await TaskEx.Delay(waitTime).ConfigureAwait(false); |
159 |
//increase the timeout for repeated timeouts |
160 |
if (waitTime<TimeSpan.FromSeconds(10)) |
161 |
waitTime = waitTime.Add(TimeSpan.FromSeconds(10)); |
162 |
} |
163 |
//Throw in all other cases |
164 |
else |
165 |
result.EnsureSuccessStatusCode(); |
166 |
} |
167 |
throw new RetryException(); |
168 |
} |
169 |
|
170 |
|
171 |
public static async Task<string> GetStringAsyncWithRetries(this HttpClient client, Uri requestUri, int retries,DateTime? since=null) |
172 |
{ |
173 |
var request = new HttpRequestMessage(HttpMethod.Get, requestUri); |
174 |
if (since.HasValue) |
175 |
{ |
176 |
request.Headers.IfModifiedSince = since.Value; |
177 |
} |
178 |
//Func<Task<HttpResponseMessage>> call = () => _baseHttpClient.SendAsync(request); |
179 |
using (var response = await client.SendAsyncWithRetries(request,3).ConfigureAwait(false)) |
180 |
{ |
181 |
if (response.StatusCode == HttpStatusCode.NoContent) |
182 |
return String.Empty; |
183 |
|
184 |
var content = await response.Content.ReadAsStringAsync().ConfigureAwait(false); |
185 |
return content; |
186 |
} |
187 |
|
188 |
} |
189 |
|
190 |
public static Task<HttpResponseMessage> HeadAsyncWithRetries(this HttpClient client, Uri requestUri, int retries,bool acceptNotFound=false) |
191 |
{ |
192 |
return client.HeadAsyncWithRetries(requestUri, retries, acceptNotFound,HttpCompletionOption.ResponseContentRead, CancellationToken.None); |
193 |
} |
194 |
|
195 |
public static Task<HttpResponseMessage> HeadAsyncWithRetries(this HttpClient client, Uri requestUri, int retries, bool acceptNotFound,HttpCompletionOption completionOption, CancellationToken cancellationToken) |
196 |
{ |
197 |
return client.SendAsyncWithRetries(new HttpRequestMessage(HttpMethod.Head, requestUri), retries, acceptNotFound,completionOption, cancellationToken); |
198 |
} |
199 |
|
200 |
public static Task<HttpResponseMessage> GetAsyncWithRetries(this HttpClient client, Uri requestUri, int retries) |
201 |
{ |
202 |
return client.GetAsyncWithRetries(requestUri, retries, HttpCompletionOption.ResponseContentRead, CancellationToken.None); |
203 |
} |
204 |
|
205 |
public static Task<HttpResponseMessage> GetAsyncWithRetries(this HttpClient client, Uri requestUri, int retries, HttpCompletionOption completionOption, CancellationToken cancellationToken) |
206 |
{ |
207 |
return client.SendAsyncWithRetries(new HttpRequestMessage(HttpMethod.Get, requestUri), retries, false,completionOption, cancellationToken); |
208 |
} |
209 |
|
210 |
|
211 |
public static Task<HttpResponseMessage> SendAsyncWithRetries(this HttpClient client,HttpRequestMessage message, int retries) |
212 |
{ |
213 |
return client.SendAsyncWithRetries(message, retries,false, HttpCompletionOption.ResponseContentRead, CancellationToken.None); |
214 |
} |
215 |
|
216 |
public static async Task<HttpResponseMessage> SendAsyncWithRetries(this HttpClient client,HttpRequestMessage message, int retries,bool acceptNotFound,HttpCompletionOption completionOption, CancellationToken cancellationToken) |
217 |
{ |
218 |
var waitTime = TimeSpan.FromSeconds(10); |
219 |
var acceptedCodes = new HashSet<HttpStatusCode> { HttpStatusCode.Moved, HttpStatusCode.MovedPermanently, |
220 |
HttpStatusCode.Found, HttpStatusCode.Redirect,HttpStatusCode.SeeOther,HttpStatusCode.RedirectMethod, |
221 |
HttpStatusCode.NotModified,HttpStatusCode.TemporaryRedirect,HttpStatusCode.RedirectKeepVerb, |
222 |
HttpStatusCode.Conflict}; |
223 |
if (acceptNotFound) |
224 |
acceptedCodes.Add(HttpStatusCode.NotFound); |
225 |
|
226 |
while (retries > 0) |
227 |
{ |
228 |
var timedOut = false; |
229 |
if (Log.IsDebugEnabled) |
230 |
Log.DebugFormat("[REQUEST] {0}", message); |
231 |
HttpResponseMessage result=null; |
232 |
|
233 |
|
234 |
var msg = message.Clone(); |
235 |
Exception innerException=null; |
236 |
try |
237 |
{ |
238 |
result = await client.SendAsync(msg, completionOption, cancellationToken).ConfigureAwait(false); |
239 |
if (result.IsSuccessStatusCode || acceptedCodes.Contains(result.StatusCode)) |
240 |
{ |
241 |
if (Log.IsDebugEnabled) |
242 |
Log.DebugFormat("[RESPONSE] [{0}]:[{1}] OK: [{2}]", msg.Method, msg.RequestUri, |
243 |
result.StatusCode); |
244 |
return result; |
245 |
} |
246 |
|
247 |
} |
248 |
catch (HttpRequestException exc) |
249 |
{ |
250 |
//A timeout or other error caused a failure without receiving a response from the server |
251 |
if (Log.IsDebugEnabled) |
252 |
Log.DebugFormat("[RESPONSE] [{0}]:[{1}] HTTP FAIL :\n{2}", msg.Method, msg.RequestUri,exc); |
253 |
innerException = exc; |
254 |
} |
255 |
catch (WebException exc) |
256 |
{ |
257 |
if (exc.Status != WebExceptionStatus.Timeout) |
258 |
throw; |
259 |
timedOut = true; |
260 |
if (Log.IsDebugEnabled) |
261 |
Log.DebugFormat("[RESPONSE] [{0}]:[{1}] FAIL WITH TIMEOUT", msg.Method, msg.RequestUri); |
262 |
innerException = exc; |
263 |
} |
264 |
catch(TaskCanceledException exc) |
265 |
{ |
266 |
//If the task was cancelled due to a timeout, retry it |
267 |
if (!exc.CancellationToken.IsCancellationRequested) |
268 |
{ |
269 |
timedOut = true; |
270 |
if (Log.IsDebugEnabled) |
271 |
Log.DebugFormat("[RESPONSE] [{0}]:[{1}] FAIL WITH TIMEOUT", msg.Method, msg.RequestUri); |
272 |
} |
273 |
else |
274 |
{ |
275 |
throw; |
276 |
} |
277 |
innerException = exc; |
278 |
} |
279 |
catch (Exception exc) |
280 |
{ |
281 |
Log.FatalFormat("Unexpected error while sending:\n{0}\n{1}", msg, exc); |
282 |
throw; |
283 |
} |
284 |
|
285 |
//Report the failure |
286 |
var resultStatus = result.NullSafe(s => s.StatusCode); |
287 |
|
288 |
if (Log.IsDebugEnabled) |
289 |
{ |
290 |
if (timedOut) |
291 |
Log.DebugFormat("[TIMEOUT] [{0}]:[{1}]", msg.Method, msg.RequestUri); |
292 |
else |
293 |
Log.DebugFormat("[RESPONSE] [{0}]:[{1}] FAIL: [{2}]\n{3}", msg.Method, msg.RequestUri, |
294 |
resultStatus, result); |
295 |
} |
296 |
|
297 |
//Handle the failure |
298 |
|
299 |
if (--retries == 0) |
300 |
throw new RetryException("Failed too many times"); |
301 |
|
302 |
//Wait for service unavailable |
303 |
if (resultStatus == HttpStatusCode.ServiceUnavailable || |
304 |
resultStatus == HttpStatusCode.BadGateway || |
305 |
resultStatus == HttpStatusCode.InternalServerError ) |
306 |
{ |
307 |
|
308 |
Log.WarnFormat("[UNAVAILABLE] Waiting before retrying [{0}]:[{1}] due to [{2}]", msg.Method, |
309 |
msg.RequestUri, result.ReasonPhrase); |
310 |
await TaskEx.Delay(waitTime).ConfigureAwait(false); |
311 |
//increase the timeout for repeated timeouts |
312 |
if (waitTime < TimeSpan.FromSeconds(10)) |
313 |
waitTime = waitTime.Add(TimeSpan.FromSeconds(10)); |
314 |
} |
315 |
//Throw in all other cases, unless there was a client-side timeout |
316 |
else if (!timedOut) |
317 |
//Throw if there was no result (never received a reply) |
318 |
if (result == null) |
319 |
throw innerException ?? new RetryException(); |
320 |
else |
321 |
//Otherwise force the exception that corresponds to the response |
322 |
throw new HttpRequestWithStatusException(result.StatusCode,result.ReasonPhrase); |
323 |
} |
324 |
throw new RetryException(); |
325 |
} |
326 |
|
327 |
public static HttpRequestMessage Clone(this HttpRequestMessage message) |
328 |
{ |
329 |
var newMessage = new HttpRequestMessage(message.Method, message.RequestUri); |
330 |
foreach (var header in message.Headers) |
331 |
{ |
332 |
newMessage.Headers.Add(header.Key, header.Value); |
333 |
} |
334 |
newMessage.Content = message.Content.Clone(); |
335 |
|
336 |
foreach (var property in message.Properties) |
337 |
{ |
338 |
newMessage.Properties.Add(property.Key, property.Value); |
339 |
} |
340 |
return newMessage; |
341 |
} |
342 |
|
343 |
public static HttpContent Clone(this HttpContent content) |
344 |
{ |
345 |
if (content == null) |
346 |
return null; |
347 |
if (!(content is FileBlockContent || |
348 |
content is ByteArrayContentWithProgress || |
349 |
content is StringContent || |
350 |
content is ByteArrayContent)) |
351 |
throw new ArgumentException("content"); |
352 |
|
353 |
HttpContent newContent=null; |
354 |
if (content is FileBlockContent) |
355 |
{ |
356 |
newContent = (content as FileBlockContent).Clone(); |
357 |
} |
358 |
else if (content is ByteArrayContentWithProgress) |
359 |
{ |
360 |
newContent=(content as ByteArrayContentWithProgress).Clone(); |
361 |
} |
362 |
else if (content is StringContent) |
363 |
{ |
364 |
var fieldInfo = typeof(ByteArrayContent).GetField("content", |
365 |
BindingFlags.NonPublic | BindingFlags.Instance); |
366 |
var bytes = (byte[])fieldInfo.GetValue(content); |
367 |
var enc=Encoding.GetEncoding(content.Headers.ContentType.CharSet); |
368 |
var stringContent=enc.GetString(bytes); |
369 |
newContent = new StringContent(stringContent); |
370 |
} |
371 |
else if (content is ByteArrayContent) |
372 |
{ |
373 |
var fieldInfo = typeof(ByteArrayContent).GetField("content", |
374 |
BindingFlags.NonPublic | BindingFlags.Instance); |
375 |
var bytes = (byte[])fieldInfo.GetValue(content); |
376 |
newContent = new ByteArrayContent(bytes); |
377 |
} |
378 |
|
379 |
|
380 |
foreach (var header in content.Headers) |
381 |
{ |
382 |
if (!(header.Key == "Content-Type" && newContent.Headers.Contains(header.Key))) |
383 |
newContent.Headers.Add(header.Key, header.Value); |
384 |
} |
385 |
return newContent; |
386 |
} |
387 |
|
388 |
public static string GetFirstValue(this HttpResponseHeaders headers, string name) |
389 |
{ |
390 |
if (headers==null) |
391 |
throw new ArgumentNullException("headers"); |
392 |
if (String.IsNullOrWhiteSpace(name)) |
393 |
throw new ArgumentNullException("name"); |
394 |
Contract.EndContractBlock(); |
395 |
|
396 |
IEnumerable<string> values; |
397 |
if (headers.TryGetValues(name, out values)) |
398 |
{ |
399 |
return values.FirstOrDefault(); |
400 |
} |
401 |
return null; |
402 |
} |
403 |
|
404 |
public static Dictionary<string, string> GetMeta(this HttpResponseHeaders headers,string metaPrefix) |
405 |
{ |
406 |
if (headers == null) |
407 |
throw new ArgumentNullException("headers"); |
408 |
if (String.IsNullOrWhiteSpace(metaPrefix)) |
409 |
throw new ArgumentNullException("metaPrefix"); |
410 |
Contract.EndContractBlock(); |
411 |
|
412 |
var dict = (from header in headers |
413 |
where header.Key.StartsWith(metaPrefix) |
414 |
let name = header.Key.Substring(metaPrefix.Length) |
415 |
select new { Name = name, Value = String.Join(",",header.Value) }) |
416 |
.ToDictionary(t => t.Name, t => t.Value); |
417 |
return dict; |
418 |
} |
419 |
|
420 |
} |
421 |
|
422 |
internal static class PithosEAPCommon |
423 |
{ |
424 |
internal static void HandleProgress<T, E>(TaskCompletionSource<T> tcs, ProgressChangedEventArgs eventArgs, Func<E> getProgress, IProgress<E> callback) |
425 |
{ |
426 |
if (eventArgs.UserState != tcs) |
427 |
return; |
428 |
callback.Report(getProgress()); |
429 |
} |
430 |
|
431 |
internal static void HandleCompletion<T>(TaskCompletionSource<T> tcs, bool requireMatch, AsyncCompletedEventArgs e, Func<T> getResult, Action unregisterHandler) |
432 |
{ |
433 |
if (requireMatch) |
434 |
{ |
435 |
if (e.UserState != tcs) |
436 |
return; |
437 |
} |
438 |
try |
439 |
{ |
440 |
unregisterHandler(); |
441 |
} |
442 |
finally |
443 |
{ |
444 |
if (e.Cancelled) |
445 |
tcs.TrySetCanceled(); |
446 |
else if (e.Error != null) |
447 |
tcs.TrySetException(e.Error); |
448 |
else |
449 |
tcs.TrySetResult(getResult()); |
450 |
} |
451 |
} |
452 |
} |
453 |
|
454 |
} |