2 using System.Collections.Generic;
\r
3 using System.ComponentModel;
\r
4 using System.Diagnostics.Contracts;
\r
6 using System.Net.Http;
\r
7 using System.Reflection;
\r
11 using System.Threading;
\r
12 using System.Threading.Tasks;
\r
15 namespace Pithos.Network
\r
17 public static class WebExtensions
\r
// Logger shared by every method of this class; it is obtained from LogManager for the type that declares this field.
19 private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
\r
/// <summary>
/// Reads the body of <paramref name="response"/> as text by wrapping the response
/// stream in a <see cref="StreamReader"/> and reading it to the end.
/// </summary>
/// <param name="response">The web response whose body is read.</param>
21 public static string ReadToEnd(this HttpWebResponse response)
\r
// The stream and the reader are both opened in using statements
23 using (var stream = response.GetResponseStream())
\r
// NOTE(review): original lines 25-26 are missing from this extract; whatever happens between the two using statements cannot be confirmed from here
27 using (var reader = new StreamReader(stream))
\r
// NOTE(review): presumably this text is what the method returns; the return statement itself is not part of this extract
29 var body = reader.ReadToEnd();
\r
/// <summary>
/// Writes the headers and the complete body of <paramref name="response"/> to
/// <paramref name="log"/> as an error entry.
/// </summary>
/// <remarks>
/// Nothing is written unless debug logging is enabled and a response is supplied.
/// The body is obtained with ReadToEnd, which reads the response stream to its end.
/// </remarks>
/// <param name="log">The logger that receives the entry.</param>
/// <param name="response">The response to describe; may be null.</param>
public static void LogError(this ILog log, HttpWebResponse response)
{
    // Bail out early: either debug output is switched off or there is nothing to describe
    if (!log.IsDebugEnabled || response == null)
        return;

    var responseText = response.ReadToEnd();
    log.ErrorFormat("Headers:\n{0}\nBody:{1}", response.Headers, responseText);
}
\r
/// <summary>
/// Wraps <paramref name="stream"/> in a reader. On the debug path the whole text is read,
/// logged as "JSON response", and handed back through a <see cref="StringReader"/>.
/// </summary>
/// <param name="stream">The stream holding the response text.</param>
/// <param name="log">The logger consulted for its debug level and used for the dump.</param>
47 public static TextReader GetLoggedReader(this Stream stream, ILog log)
\r
49 var reader = new StreamReader(stream);
\r
// NOTE(review): the statement guarded by this check (original line 51) is missing from this extract; presumably the plain reader is returned when debug logging is off - confirm
50 if (!log.IsDebugEnabled)
\r
// Debug path: read everything so the text can be logged, then serve the same text from memory
55 var body = reader.ReadToEnd();
\r
56 log.DebugFormat("JSON response: {0}", body);
\r
57 return new StringReader(body);
\r
/// <summary>
/// Lazily yields the items of <paramref name="list"/> whose indexes lie between
/// <paramref name="start"/> and <paramref name="end"/>, both bounds included.
/// </summary>
/// <typeparam name="T">The element type of the list.</typeparam>
/// <param name="list">The list to slice. A null list yields no items.</param>
/// <param name="start">Index of the first item to return; must not be negative.</param>
/// <param name="end">Index of the last item to return; must be below the list's Count and not below <paramref name="start"/>.</param>
/// <returns>The items at indexes start..end, in list order.</returns>
public static IEnumerable<T> Range<T>(this IList<T> list, int start, int end)
{
    Contract.Requires(start >= 0);
    Contract.Requires(end < list.Count);
    Contract.Requires(start <= end);

    // Contract.Requires calls are conditionally compiled, so a null list is handled explicitly
    if (list == null)
        yield break;

    // Start at 'start', the lower bound named by the contracts; the loop previously always began at index 0
    for (var i = start; i <= end; i++)
    {
        yield return list[i];
    }
}
\r
/// <summary>
/// Starts an upload with <paramref name="webClient"/> through its event-based UploadDataAsync
/// call and ties the completion, progress and cancellation of that call to a
/// <see cref="TaskCompletionSource{TResult}"/>.
/// </summary>
/// <param name="webClient">The client that performs the upload.</param>
/// <param name="address">The target URI; it is also stored as the state object of the completion source.</param>
/// <param name="method">The HTTP method passed on to UploadDataAsync.</param>
/// <param name="data">The bytes to upload.</param>
/// <param name="cancellationToken">When signalled, CancelAsync is called on the client.</param>
/// <param name="progress">Optional receiver of upload progress events; may be null.</param>
78 public static Task<byte[]> UploadDataTaskAsync(this WebClient webClient, Uri address, string method, byte[] data, CancellationToken cancellationToken, IProgress<UploadProgressChangedEventArgs> progress)
\r
80 var tcs = new TaskCompletionSource<byte[]>(address);
\r
// Cancellation requested before anything started: mark the completion source as cancelled
// NOTE(review): the rest of this branch (original line 84) is missing from this extract - presumably an early return
81 if (cancellationToken.IsCancellationRequested)
\r
83 tcs.TrySetCanceled();
\r
// A later cancellation request is forwarded to the WebClient
87 CancellationTokenRegistration ctr = cancellationToken.Register(()=>
\r
89 webClient.CancelAsync();
\r
// Both handlers are declared up front so the lambdas below can refer to them when unsubscribing
91 UploadDataCompletedEventHandler completedHandler = null;
\r
92 UploadProgressChangedEventHandler progressHandler = null;
\r
// Progress is only forwarded when the caller supplied a receiver; otherwise progressHandler stays null
93 if (progress != null)
\r
94 progressHandler = (s, e) => PithosEAPCommon.HandleProgress(tcs, e, () => e, progress);
\r
// On completion HandleCompletion receives e.Result as the result getter and, as its last argument, a callback that detaches both handlers
// NOTE(review): original lines 96-97 inside that callback are missing from this extract
95 completedHandler =(sender, e) =>PithosEAPCommon.HandleCompletion(tcs, true, e,() => e.Result,() =>
\r
98 webClient.UploadDataCompleted -= completedHandler;
\r
99 webClient.UploadProgressChanged -= progressHandler;
\r
101 webClient.UploadDataCompleted += completedHandler;
\r
102 webClient.UploadProgressChanged += progressHandler;
\r
// tcs is passed as the user token; the handlers compare each event's UserState against it
105 webClient.UploadDataAsync(address, method, data, tcs);
\r
// Covers a cancellation request that arrived while the upload was being started
106 if (cancellationToken.IsCancellationRequested)
\r
107 webClient.CancelAsync();
\r
// NOTE(review): these two lines detach the handlers a second time; presumably they belong to a failure path around the start of the upload, but the surrounding lines (original 108-110 and 113-116) are missing from this extract
111 webClient.UploadDataCompleted -= completedHandler;
\r
112 webClient.UploadProgressChanged -= progressHandler;
\r
/// <summary>
/// Awaits <paramref name="func"/> repeatedly until one attempt finishes without throwing.
/// </summary>
/// <typeparam name="T">The type of the value produced by the operation.</typeparam>
/// <param name="func">The asynchronous operation to attempt.</param>
/// <param name="retries">The number of attempts allowed.</param>
/// <returns>The value produced by the first attempt that succeeds.</returns>
/// <exception cref="RetryException">
/// Thrown with the last failure as inner exception once every attempt has failed,
/// or without one when <paramref name="retries"/> is not positive and nothing was attempted.
/// </exception>
public static async Task<T> WithRetries<T>(this Func<Task<T>> func, int retries)
{
    for (var attemptsLeft = retries; attemptsLeft > 0; )
    {
        try
        {
            return await func();
        }
        catch (Exception failure)
        {
            // Use up one attempt; give up once none are left, keeping the last failure as the cause
            attemptsLeft--;
            if (attemptsLeft == 0)
                throw new RetryException("Failed too many times", failure);
        }
    }

    // Only reached when no attempt was allowed in the first place
    throw new RetryException();
}
\r
/// <summary>
/// Invokes <paramref name="func"/> until it produces a response that is either successful or
/// carries one of the accepted redirect / not-modified codes, waiting between attempts while
/// the service answers 503 Service Unavailable.
/// </summary>
/// <param name="func">The operation that issues the request.</param>
/// <param name="retries">The number of attempts allowed.</param>
/// <returns>The first accepted response. The caller owns it and must dispose it.</returns>
/// <exception cref="RetryException">Thrown when the attempts run out, or when <paramref name="retries"/> is not positive.</exception>
/// <exception cref="HttpRequestException">Thrown by EnsureSuccessStatusCode for any failure other than 503.</exception>
public static async Task<HttpResponseMessage> WithRetriesForWeb(this Func<Task<HttpResponseMessage>> func, int retries)
{
    // The wait starts at one step and grows by one step per 503, up to the cap.
    // The growth check used to compare against the starting value itself, so the wait never grew.
    var waitStep = TimeSpan.FromSeconds(10);
    var maxWaitTime = TimeSpan.FromSeconds(60);
    var waitTime = waitStep;
    var acceptedCodes = new[] { HttpStatusCode.Moved, HttpStatusCode.MovedPermanently, HttpStatusCode.Found, HttpStatusCode.Redirect,HttpStatusCode.SeeOther,
                                HttpStatusCode.RedirectMethod,HttpStatusCode.NotModified,HttpStatusCode.TemporaryRedirect,HttpStatusCode.RedirectKeepVerb};
    while (retries > 0)
    {
        var result = await func();
        if (result.IsSuccessStatusCode || acceptedCodes.Contains(result.StatusCode))
            return result;

        if (--retries == 0)
        {
            // The rejected response is never handed to the caller, so release it here
            result.Dispose();
            throw new RetryException("Failed too many times");
        }

        //Wait for service unavailable
        if (result.StatusCode == HttpStatusCode.ServiceUnavailable)
        {
            Log.InfoFormat("[UNAVAILABLE] Waiting before retry: {0}",result.ReasonPhrase);
            // Release the rejected response before waiting instead of holding it across the delay
            result.Dispose();
            await TaskEx.Delay(waitTime);
            //increase the timeout for repeated timeouts
            if (waitTime < maxWaitTime)
                waitTime = waitTime.Add(waitStep);
        }
        //Throw in all other cases
        else
            result.EnsureSuccessStatusCode();
    }
    throw new RetryException();
}
\r
/// <summary>
/// Issues a GET for <paramref name="requestUri"/> through SendAsyncWithRetries and returns
/// the response body as a string.
/// </summary>
/// <param name="client">The client used to send the request.</param>
/// <param name="requestUri">The address to fetch.</param>
/// <param name="retries">The number of attempts allowed, passed on to SendAsyncWithRetries.</param>
/// <param name="since">When set, sent as the If-Modified-Since header of the request.</param>
/// <returns>The body text, or an empty string when the server answers 204 No Content.</returns>
public static async Task<string> GetStringAsyncWithRetries(this HttpClient client, Uri requestUri, int retries,DateTime? since=null)
{
    // The request is only needed until the response has been read, so dispose it with the method
    using (var request = new HttpRequestMessage(HttpMethod.Get, requestUri))
    {
        if (since.HasValue)
        {
            request.Headers.IfModifiedSince = since.Value;
        }

        // Pass the caller's retry count along; a literal 3 used to be passed here, ignoring the parameter
        using (var response = await client.SendAsyncWithRetries(request, retries))
        {
            if (response.StatusCode == HttpStatusCode.NoContent)
                return String.Empty;

            var content = await response.Content.ReadAsStringAsync();
            return content;
        }
    }
}
\r
/// <summary>
/// Issues a HEAD request for <paramref name="requestUri"/> with retries, using the
/// ResponseContentRead completion option and no cancellation.
/// </summary>
/// <param name="client">The client used to send the request.</param>
/// <param name="requestUri">The address to query.</param>
/// <param name="retries">The number of attempts allowed.</param>
/// <returns>The task produced by the five-argument overload.</returns>
public static Task<HttpResponseMessage> HeadAsyncWithRetries(this HttpClient client, Uri requestUri, int retries)
{
    const HttpCompletionOption wholeResponse = HttpCompletionOption.ResponseContentRead;
    var neverCancelled = CancellationToken.None;
    return client.HeadAsyncWithRetries(requestUri, retries, wholeResponse, neverCancelled);
}
\r
/// <summary>
/// Builds a HEAD request for <paramref name="requestUri"/> and sends it through
/// SendAsyncWithRetries.
/// </summary>
/// <param name="client">The client used to send the request.</param>
/// <param name="requestUri">The address to query.</param>
/// <param name="retries">The number of attempts allowed.</param>
/// <param name="completionOption">Passed on unchanged to SendAsyncWithRetries.</param>
/// <param name="cancellationToken">Passed on unchanged to SendAsyncWithRetries.</param>
/// <returns>The task produced by SendAsyncWithRetries.</returns>
public static Task<HttpResponseMessage> HeadAsyncWithRetries(this HttpClient client, Uri requestUri, int retries, HttpCompletionOption completionOption, CancellationToken cancellationToken)
{
    var headRequest = new HttpRequestMessage(HttpMethod.Head, requestUri);
    return client.SendAsyncWithRetries(headRequest, retries, completionOption, cancellationToken);
}
\r
/// <summary>
/// Issues a GET request for <paramref name="requestUri"/> with retries, using the
/// ResponseContentRead completion option and no cancellation.
/// </summary>
/// <param name="client">The client used to send the request.</param>
/// <param name="requestUri">The address to fetch.</param>
/// <param name="retries">The number of attempts allowed.</param>
/// <returns>The task produced by the five-argument overload.</returns>
public static Task<HttpResponseMessage> GetAsyncWithRetries(this HttpClient client, Uri requestUri, int retries)
{
    const HttpCompletionOption wholeResponse = HttpCompletionOption.ResponseContentRead;
    var neverCancelled = CancellationToken.None;
    return client.GetAsyncWithRetries(requestUri, retries, wholeResponse, neverCancelled);
}
\r
/// <summary>
/// Builds a GET request for <paramref name="requestUri"/> and sends it through
/// SendAsyncWithRetries.
/// </summary>
/// <param name="client">The client used to send the request.</param>
/// <param name="requestUri">The address to fetch.</param>
/// <param name="retries">The number of attempts allowed.</param>
/// <param name="completionOption">Passed on unchanged to SendAsyncWithRetries.</param>
/// <param name="cancellationToken">Passed on unchanged to SendAsyncWithRetries.</param>
/// <returns>The task produced by SendAsyncWithRetries.</returns>
public static Task<HttpResponseMessage> GetAsyncWithRetries(this HttpClient client, Uri requestUri, int retries, HttpCompletionOption completionOption, CancellationToken cancellationToken)
{
    var getRequest = new HttpRequestMessage(HttpMethod.Get, requestUri);
    return client.SendAsyncWithRetries(getRequest, retries, completionOption, cancellationToken);
}
\r
/// <summary>
/// Sends <paramref name="message"/> with retries, using the ResponseContentRead completion
/// option and no cancellation.
/// </summary>
/// <param name="client">The client used to send the request.</param>
/// <param name="message">The request to send.</param>
/// <param name="retries">The number of attempts allowed.</param>
/// <returns>The task produced by the five-argument overload.</returns>
public static Task<HttpResponseMessage> SendAsyncWithRetries(this HttpClient client,HttpRequestMessage message, int retries)
{
    const HttpCompletionOption wholeResponse = HttpCompletionOption.ResponseContentRead;
    var neverCancelled = CancellationToken.None;
    return client.SendAsyncWithRetries(message, retries, wholeResponse, neverCancelled);
}
\r
/// <summary>
/// Sends <paramref name="message"/> through <paramref name="client"/> until the response is
/// either successful or carries one of the accepted codes (redirects, 304 Not Modified,
/// 409 Conflict), waiting between attempts while the service answers 503 Service Unavailable.
/// </summary>
/// <param name="client">The client used to send the request.</param>
/// <param name="message">The request to send.</param>
/// <param name="retries">The number of attempts allowed.</param>
/// <param name="completionOption">Passed on unchanged to HttpClient.SendAsync.</param>
/// <param name="cancellationToken">Passed on unchanged to HttpClient.SendAsync.</param>
/// <returns>The first accepted response. The caller owns it and must dispose it.</returns>
/// <exception cref="RetryException">Thrown when the attempts run out, or when <paramref name="retries"/> is not positive.</exception>
/// <exception cref="HttpRequestException">Thrown by EnsureSuccessStatusCode for any failure other than 503.</exception>
public static async Task<HttpResponseMessage> SendAsyncWithRetries(this HttpClient client,HttpRequestMessage message, int retries,HttpCompletionOption completionOption, CancellationToken cancellationToken)
{
    // The wait starts at one step and grows by one step per 503, up to the cap.
    // The growth check used to compare against the starting value itself, so the wait never grew.
    var waitStep = TimeSpan.FromSeconds(10);
    var maxWaitTime = TimeSpan.FromSeconds(60);
    var waitTime = waitStep;
    var acceptedCodes = new[] { HttpStatusCode.Moved, HttpStatusCode.MovedPermanently, HttpStatusCode.Found, HttpStatusCode.Redirect,HttpStatusCode.SeeOther,
                                HttpStatusCode.RedirectMethod,HttpStatusCode.NotModified,HttpStatusCode.TemporaryRedirect,HttpStatusCode.RedirectKeepVerb,HttpStatusCode.Conflict};
    while (retries > 0)
    {
        if (Log.IsDebugEnabled)
            Log.DebugFormat("[REQUEST] {0}",message);
        // NOTE(review): HttpClient.SendAsync is documented to throw InvalidOperationException when a
        // request message that was already sent is sent again - confirm that a second attempt with the
        // same message works with the HttpClient in use, or build a fresh message for each attempt
        var result = await client.SendAsync(message,completionOption,cancellationToken);
        if (result.IsSuccessStatusCode || acceptedCodes.Contains(result.StatusCode))
        {
            if (Log.IsDebugEnabled)
                Log.DebugFormat("[RESPONSE] [{0}]:[{1}] OK: [{2}]", message.Method,message.RequestUri, result.StatusCode);
            return result;
        }
        //Failed, will have to abort or retry
        if (Log.IsDebugEnabled)
            Log.DebugFormat("[RESPONSE] [{0}]:[{1}] FAIL: [{2}]\n{3}", message.Method,message.RequestUri, result.StatusCode,result);

        if (--retries == 0)
        {
            // The rejected response is never handed to the caller, so release it here
            result.Dispose();
            throw new RetryException("Failed too many times");
        }

        //Wait for service unavailable
        if (result.StatusCode == HttpStatusCode.ServiceUnavailable)
        {
            Log.WarnFormat("[UNAVAILABLE] Waiting before retrying [{0}]:[{1}] due to [{2}]",message.Method, message.RequestUri,result.ReasonPhrase);
            // Release the rejected response before waiting instead of holding it across the delay
            result.Dispose();
            await TaskEx.Delay(waitTime);
            //increase the timeout for repeated timeouts
            if (waitTime < maxWaitTime)
                waitTime = waitTime.Add(waitStep);
        }
        //Throw in all other cases
        else
            result.EnsureSuccessStatusCode();
    }
    throw new RetryException();
}
\r
/// <summary>
/// Helpers that forward the progress and completion events of an event-based asynchronous
/// operation to a <see cref="TaskCompletionSource{TResult}"/>.
/// </summary>
internal static class PithosEAPCommon
{
    /// <summary>
    /// Reports progress to <paramref name="callback"/>, but only for events whose UserState
    /// is <paramref name="tcs"/>.
    /// </summary>
    /// <param name="tcs">The completion source identifying the operation of interest.</param>
    /// <param name="eventArgs">The progress event that was raised.</param>
    /// <param name="getProgress">Produces the value to report; not invoked for foreign events.</param>
    /// <param name="callback">The receiver of the progress value.</param>
    internal static void HandleProgress<T, E>(TaskCompletionSource<T> tcs, ProgressChangedEventArgs eventArgs, Func<E> getProgress, IProgress<E> callback)
    {
        // An event belongs to this operation only when it carries tcs as its user state
        var belongsToThisOperation = eventArgs.UserState == tcs;
        if (belongsToThisOperation)
        {
            callback.Report(getProgress());
        }
    }

    /// <summary>
    /// Moves the outcome of a completed operation (cancelled, failed or finished) into
    /// <paramref name="tcs"/> after running <paramref name="unregisterHandler"/>.
    /// </summary>
    /// <param name="tcs">The completion source to settle.</param>
    /// <param name="requireMatch">When true, events whose UserState is not <paramref name="tcs"/> are ignored.</param>
    /// <param name="e">The completion event that was raised.</param>
    /// <param name="getResult">Produces the result; invoked only when the operation neither failed nor was cancelled.</param>
    /// <param name="unregisterHandler">Detaches the event handlers; the outcome is transferred even if this throws.</param>
    internal static void HandleCompletion<T>(TaskCompletionSource<T> tcs, bool requireMatch, AsyncCompletedEventArgs e, Func<T> getResult, Action unregisterHandler)
    {
        // Ignore completions of other operations when the caller asked for matching
        if (requireMatch && e.UserState != tcs)
            return;

        try
        {
            unregisterHandler();
        }
        finally
        {
            if (e.Cancelled)
            {
                tcs.TrySetCanceled();
            }
            else
            {
                var failure = e.Error;
                if (failure == null)
                    tcs.TrySetResult(getResult());
                else
                    tcs.TrySetException(failure);
            }
        }
    }
}
\r