2 using System.Collections.Generic;
\r
3 using System.ComponentModel;
\r
4 using System.Diagnostics.Contracts;
\r
6 using System.Net.Http;
\r
7 using System.Net.Http.Headers;
\r
8 using System.Reflection;
\r
12 using System.Threading;
\r
13 using System.Threading.Tasks;
\r
14 using Pithos.Interfaces;
\r
16 using System.ServiceModel.Channels;
\r
18 namespace Pithos.Network
\r
20 public static class WebExtensions
\r
22 private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
\r
24 public static string ReadToEnd(this HttpWebResponse response)
\r
26 using (var stream = response.GetResponseStream())
\r
30 using (var reader = new StreamReader(stream))
\r
32 var body = reader.ReadToEnd();
\r
38 public static void LogError(this ILog log, HttpWebResponse response)
\r
40 if (log.IsDebugEnabled)
\r
42 if (response != null)
\r
44 var body = response.ReadToEnd();
\r
45 log.ErrorFormat("Headers:\n{0}\nBody:{1}", response.Headers, body);
\r
50 public static TextReader GetLoggedReader(this Stream stream, ILog log)
\r
52 var reader = new StreamReader(stream);
\r
53 if (!log.IsDebugEnabled)
\r
58 var body = reader.ReadToEnd();
\r
59 log.DebugFormat("JSON response: {0}", body);
\r
60 return new StringReader(body);
\r
65 public static IEnumerable<T> Range<T>(this IList<T> list, int start, int end)
\r
67 Contract.Requires(start >= 0);
\r
68 Contract.Requires(end < list.Count);
\r
69 Contract.Requires(start <= end);
\r
74 for (var i = 0; i <= end; i++)
\r
76 yield return list[i];
\r
81 public static Task<byte[]> UploadDataTaskAsync(this WebClient webClient, Uri address, string method, byte[] data, CancellationToken cancellationToken, IProgress<UploadProgressChangedEventArgs> progress)
\r
83 var tcs = new TaskCompletionSource<byte[]>(address);
\r
84 if (cancellationToken.IsCancellationRequested)
\r
86 tcs.TrySetCanceled();
\r
90 CancellationTokenRegistration ctr = cancellationToken.Register(()=>
\r
92 webClient.CancelAsync();
\r
94 UploadDataCompletedEventHandler completedHandler = null;
\r
95 UploadProgressChangedEventHandler progressHandler = null;
\r
96 if (progress != null)
\r
97 progressHandler = (s, e) => PithosEAPCommon.HandleProgress(tcs, e, () => e, progress);
\r
98 completedHandler =(sender, e) =>PithosEAPCommon.HandleCompletion(tcs, true, e,() => e.Result,() =>
\r
101 webClient.UploadDataCompleted -= completedHandler;
\r
102 webClient.UploadProgressChanged -= progressHandler;
\r
104 webClient.UploadDataCompleted += completedHandler;
\r
105 webClient.UploadProgressChanged += progressHandler;
\r
108 webClient.UploadDataAsync(address, method, data, tcs);
\r
109 if (cancellationToken.IsCancellationRequested)
\r
110 webClient.CancelAsync();
\r
114 webClient.UploadDataCompleted -= completedHandler;
\r
115 webClient.UploadProgressChanged -= progressHandler;
\r
122 public static async Task<T> WithRetries<T>(this Func<Task<T>> func, int retries)
\r
124 while (retries > 0)
\r
128 var result = await func().ConfigureAwait(false);
\r
131 catch (Exception exc)
\r
133 if (--retries == 0)
\r
134 throw new RetryException("Failed too many times", exc);
\r
137 throw new RetryException();
\r
140 public static async Task<HttpResponseMessage> WithRetriesForWeb(this Func<Task<HttpResponseMessage>> func, int retries)
\r
142 var waitTime = TimeSpan.FromSeconds(10);
\r
143 var acceptedCodes = new[] { HttpStatusCode.Moved, HttpStatusCode.MovedPermanently, HttpStatusCode.Found, HttpStatusCode.Redirect,HttpStatusCode.SeeOther,
\r
144 HttpStatusCode.RedirectMethod,HttpStatusCode.NotModified,HttpStatusCode.TemporaryRedirect,HttpStatusCode.RedirectKeepVerb};
\r
145 while (retries > 0)
\r
147 var result = await func().ConfigureAwait(false);
\r
148 if (result.IsSuccessStatusCode || acceptedCodes.Contains(result.StatusCode))
\r
151 if (--retries == 0)
\r
152 throw new RetryException("Failed too many times");
\r
154 //Wait for service unavailable
\r
155 if (result.StatusCode == HttpStatusCode.ServiceUnavailable)
\r
157 Log.InfoFormat("[UNAVAILABLE] Waiting before retry: {0}",result.ReasonPhrase);
\r
158 await TaskEx.Delay(waitTime).ConfigureAwait(false);
\r
159 //increase the timeout for repeated timeouts
\r
160 if (waitTime<TimeSpan.FromSeconds(10))
\r
161 waitTime = waitTime.Add(TimeSpan.FromSeconds(10));
\r
163 //Throw in all other cases
\r
165 result.EnsureSuccessStatusCode();
\r
167 throw new RetryException();
\r
171 public static async Task<string> GetStringAsyncWithRetries(this HttpClient client, Uri requestUri, int retries,DateTime? since=null)
\r
173 var request = new HttpRequestMessage(HttpMethod.Get, requestUri);
\r
174 if (since.HasValue)
\r
176 request.Headers.IfModifiedSince = since.Value;
\r
178 //Func<Task<HttpResponseMessage>> call = () => _baseHttpClient.SendAsync(request);
\r
179 using (var response = await client.SendAsyncWithRetries(request,3).ConfigureAwait(false))
\r
181 if (response.StatusCode == HttpStatusCode.NoContent)
\r
182 return String.Empty;
\r
184 var content = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
\r
190 public static Task<HttpResponseMessage> HeadAsyncWithRetries(this HttpClient client, Uri requestUri, int retries,bool acceptNotFound=false)
\r
192 return client.HeadAsyncWithRetries(requestUri, retries, acceptNotFound,HttpCompletionOption.ResponseContentRead, CancellationToken.None);
\r
195 public static Task<HttpResponseMessage> HeadAsyncWithRetries(this HttpClient client, Uri requestUri, int retries, bool acceptNotFound,HttpCompletionOption completionOption, CancellationToken cancellationToken)
\r
197 return client.SendAsyncWithRetries(new HttpRequestMessage(HttpMethod.Head, requestUri), retries, acceptNotFound,completionOption, cancellationToken);
\r
200 public static Task<HttpResponseMessage> GetAsyncWithRetries(this HttpClient client, Uri requestUri, int retries)
\r
202 return client.GetAsyncWithRetries(requestUri, retries, HttpCompletionOption.ResponseContentRead, CancellationToken.None);
\r
205 public static Task<HttpResponseMessage> GetAsyncWithRetries(this HttpClient client, Uri requestUri, int retries, HttpCompletionOption completionOption, CancellationToken cancellationToken)
\r
207 return client.SendAsyncWithRetries(new HttpRequestMessage(HttpMethod.Get, requestUri), retries, false,completionOption, cancellationToken);
\r
211 public static Task<HttpResponseMessage> SendAsyncWithRetries(this HttpClient client,HttpRequestMessage message, int retries)
\r
213 return client.SendAsyncWithRetries(message, retries,false, HttpCompletionOption.ResponseContentRead, CancellationToken.None);
\r
216 public static async Task<HttpResponseMessage> SendAsyncWithRetries(this HttpClient client,HttpRequestMessage message, int retries,bool acceptNotFound,HttpCompletionOption completionOption, CancellationToken cancellationToken)
\r
218 var waitTime = TimeSpan.FromSeconds(10);
\r
219 var acceptedCodes = new HashSet<HttpStatusCode> { HttpStatusCode.Moved, HttpStatusCode.MovedPermanently,
\r
220 HttpStatusCode.Found, HttpStatusCode.Redirect,HttpStatusCode.SeeOther,HttpStatusCode.RedirectMethod,
\r
221 HttpStatusCode.NotModified,HttpStatusCode.TemporaryRedirect,HttpStatusCode.RedirectKeepVerb,
\r
222 HttpStatusCode.Conflict};
\r
223 if (acceptNotFound)
\r
224 acceptedCodes.Add(HttpStatusCode.NotFound);
\r
226 while (retries > 0)
\r
228 var timedOut = false;
\r
229 if (Log.IsDebugEnabled)
\r
230 Log.DebugFormat("[REQUEST] {0}", message);
\r
231 HttpResponseMessage result=null;
\r
234 var msg = message.Clone();
\r
235 Exception innerException=null;
\r
238 result = await client.SendAsync(msg, completionOption, cancellationToken).ConfigureAwait(false);
\r
239 if (result.IsSuccessStatusCode || acceptedCodes.Contains(result.StatusCode))
\r
241 if (Log.IsDebugEnabled)
\r
242 Log.DebugFormat("[RESPONSE] [{0}]:[{1}] OK: [{2}]", msg.Method, msg.RequestUri,
\r
243 result.StatusCode);
\r
248 catch (HttpRequestException exc)
\r
250 //A timeout or other error caused a failure without receiving a response from the server
\r
251 if (Log.IsDebugEnabled)
\r
252 Log.DebugFormat("[RESPONSE] [{0}]:[{1}] HTTP FAIL :\n{2}", msg.Method, msg.RequestUri,exc);
\r
253 innerException = exc;
\r
255 catch (WebException exc)
\r
257 if (exc.Status != WebExceptionStatus.Timeout)
\r
260 if (Log.IsDebugEnabled)
\r
261 Log.DebugFormat("[RESPONSE] [{0}]:[{1}] FAIL WITH TIMEOUT", msg.Method, msg.RequestUri);
\r
262 innerException = exc;
\r
264 catch(TaskCanceledException exc)
\r
266 //If the task was cancelled due to a timeout, retry it
\r
267 if (!exc.CancellationToken.IsCancellationRequested)
\r
270 if (Log.IsDebugEnabled)
\r
271 Log.DebugFormat("[RESPONSE] [{0}]:[{1}] FAIL WITH TIMEOUT", msg.Method, msg.RequestUri);
\r
277 innerException = exc;
\r
279 catch (Exception exc)
\r
281 Log.FatalFormat("Unexpected error while sending:\n{0}\n{1}", msg, exc);
\r
285 //Report the failure
\r
286 var resultStatus = result.NullSafe(s => s.StatusCode);
\r
288 if (Log.IsDebugEnabled)
\r
291 Log.DebugFormat("[TIMEOUT] [{0}]:[{1}]", msg.Method, msg.RequestUri);
\r
293 Log.DebugFormat("[RESPONSE] [{0}]:[{1}] FAIL: [{2}]\n{3}", msg.Method, msg.RequestUri,
\r
294 resultStatus, result);
\r
297 //Handle the failure
\r
299 if (--retries == 0)
\r
300 throw new RetryException("Failed too many times");
\r
302 //Wait for service unavailable
\r
303 if (resultStatus == HttpStatusCode.ServiceUnavailable ||
\r
304 resultStatus == HttpStatusCode.BadGateway ||
\r
305 resultStatus == HttpStatusCode.InternalServerError )
\r
308 Log.WarnFormat("[UNAVAILABLE] Waiting before retrying [{0}]:[{1}] due to [{2}]", msg.Method,
\r
309 msg.RequestUri, result.ReasonPhrase);
\r
310 await TaskEx.Delay(waitTime).ConfigureAwait(false);
\r
311 //increase the timeout for repeated timeouts
\r
312 if (waitTime < TimeSpan.FromSeconds(10))
\r
313 waitTime = waitTime.Add(TimeSpan.FromSeconds(10));
\r
315 //Throw in all other cases, unless there was a client-side timeout
\r
316 else if (!timedOut)
\r
317 //Throw if there was no result (never received a reply)
\r
318 if (result == null)
\r
319 throw innerException ?? new RetryException();
\r
321 //Otherwise force the exception that corresponds to the response
\r
322 throw new HttpRequestWithStatusException(result.StatusCode,result.ReasonPhrase);
\r
324 throw new RetryException();
\r
327 public static HttpRequestMessage Clone(this HttpRequestMessage message)
\r
329 var newMessage = new HttpRequestMessage(message.Method, message.RequestUri);
\r
330 foreach (var header in message.Headers)
\r
332 newMessage.Headers.Add(header.Key, header.Value);
\r
334 newMessage.Content = message.Content.Clone();
\r
336 foreach (var property in message.Properties)
\r
338 newMessage.Properties.Add(property.Key, property.Value);
\r
343 public static HttpContent Clone(this HttpContent content)
\r
345 if (content == null)
\r
347 if (!(content is FileBlockContent ||
\r
348 content is ByteArrayContentWithProgress ||
\r
349 content is StringContent ||
\r
350 content is ByteArrayContent))
\r
351 throw new ArgumentException("content");
\r
353 HttpContent newContent=null;
\r
354 if (content is FileBlockContent)
\r
356 newContent = (content as FileBlockContent).Clone();
\r
358 else if (content is ByteArrayContentWithProgress)
\r
360 newContent=(content as ByteArrayContentWithProgress).Clone();
\r
362 else if (content is StringContent)
\r
364 var fieldInfo = typeof(ByteArrayContent).GetField("content",
\r
365 BindingFlags.NonPublic | BindingFlags.Instance);
\r
366 var bytes = (byte[])fieldInfo.GetValue(content);
\r
367 var enc=Encoding.GetEncoding(content.Headers.ContentType.CharSet);
\r
368 var stringContent=enc.GetString(bytes);
\r
369 newContent = new StringContent(stringContent);
\r
371 else if (content is ByteArrayContent)
\r
373 var fieldInfo = typeof(ByteArrayContent).GetField("content",
\r
374 BindingFlags.NonPublic | BindingFlags.Instance);
\r
375 var bytes = (byte[])fieldInfo.GetValue(content);
\r
376 newContent = new ByteArrayContent(bytes);
\r
380 foreach (var header in content.Headers)
\r
382 if (!(header.Key == "Content-Type" && newContent.Headers.Contains(header.Key)))
\r
383 newContent.Headers.Add(header.Key, header.Value);
\r
388 public static string GetFirstValue(this HttpResponseHeaders headers, string name)
\r
391 throw new ArgumentNullException("headers");
\r
392 if (String.IsNullOrWhiteSpace(name))
\r
393 throw new ArgumentNullException("name");
\r
394 Contract.EndContractBlock();
\r
396 IEnumerable<string> values;
\r
397 if (headers.TryGetValues(name, out values))
\r
399 return values.FirstOrDefault();
\r
404 public static Dictionary<string, string> GetMeta(this HttpResponseHeaders headers,string metaPrefix)
\r
406 if (headers == null)
\r
407 throw new ArgumentNullException("headers");
\r
408 if (String.IsNullOrWhiteSpace(metaPrefix))
\r
409 throw new ArgumentNullException("metaPrefix");
\r
410 Contract.EndContractBlock();
\r
412 var dict = (from header in headers
\r
413 where header.Key.StartsWith(metaPrefix)
\r
414 let name = header.Key.Substring(metaPrefix.Length)
\r
415 select new { Name = name, Value = String.Join(",",header.Value) })
\r
416 .ToDictionary(t => t.Name, t => t.Value);
\r
422 internal static class PithosEAPCommon
\r
424 internal static void HandleProgress<T, E>(TaskCompletionSource<T> tcs, ProgressChangedEventArgs eventArgs, Func<E> getProgress, IProgress<E> callback)
\r
426 if (eventArgs.UserState != tcs)
\r
428 callback.Report(getProgress());
\r
431 internal static void HandleCompletion<T>(TaskCompletionSource<T> tcs, bool requireMatch, AsyncCompletedEventArgs e, Func<T> getResult, Action unregisterHandler)
\r
435 if (e.UserState != tcs)
\r
440 unregisterHandler();
\r
445 tcs.TrySetCanceled();
\r
446 else if (e.Error != null)
\r
447 tcs.TrySetException(e.Error);
\r
449 tcs.TrySetResult(getResult());
\r