// Provenance: reconstructed from the "+" side of the gitweb blobdiff
// X-Git-Url: https://code.grnet.gr/git/pithos-ms-client/blobdiff_plain/ebc37b0dd4943d8490fc646854932bc75e8be425..cfb091032c1ea51d109dccf68322ce2ee63eaab5:/trunk/Pithos.Network/WebExtensions.cs
// (trunk/Pithos.Network/WebExtensions.cs, index 6676d65..a1fb7d8).
//
// NOTE(review): the captured diff had lost every "<...>" span, i.e. all generic type
// arguments and the tail of WithRetriesForWeb. Generic signatures below were restored from
// how the values are used; the tail of WithRetriesForWeb was restored from the parallel code
// in SendAsyncWithRetries. Confirm both against the repository before relying on them.
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Diagnostics.Contracts;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Reflection;
using System.Text;
using System.Net;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using log4net;
using System.ServiceModel.Channels;

namespace Pithos.Network
{
    /// <summary>
    /// Extension methods for reading/logging web responses and for issuing
    /// WebClient/HttpClient calls with cancellation, progress reporting and retries.
    /// </summary>
    public static class WebExtensions
    {
        private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);

        /// <summary>Delay used before the first retry of an "unavailable" response.</summary>
        private static readonly TimeSpan InitialRetryWait = TimeSpan.FromSeconds(10);

        /// <summary>Amount added to the delay after each "unavailable" response.</summary>
        private static readonly TimeSpan RetryWaitStep = TimeSpan.FromSeconds(10);

        /// <summary>Upper bound for the retry delay, so the back-off cannot grow without limit.</summary>
        private static readonly TimeSpan MaxRetryWait = TimeSpan.FromSeconds(60);

        /// <summary>
        /// Non-success status codes that are still returned to the caller instead of being retried.
        /// Moved/MovedPermanently, Found/Redirect, SeeOther/RedirectMethod and
        /// TemporaryRedirect/RedirectKeepVerb are aliases of the same numeric values,
        /// so each value is listed only once.
        /// </summary>
        private static readonly HttpStatusCode[] RedirectCodes =
        {
            HttpStatusCode.Moved,
            HttpStatusCode.Found,
            HttpStatusCode.SeeOther,
            HttpStatusCode.NotModified,
            HttpStatusCode.TemporaryRedirect
        };

        /// <summary>
        /// Reads the whole body of <paramref name="response"/> as text.
        /// </summary>
        /// <returns>The body, or null when the response exposes no stream.</returns>
        public static string ReadToEnd(this HttpWebResponse response)
        {
            using (var stream = response.GetResponseStream())
            {
                if (stream == null)
                    return null;
                using (var reader = new StreamReader(stream))
                {
                    return reader.ReadToEnd();
                }
            }
        }

        /// <summary>
        /// Logs the headers and body of <paramref name="response"/> at Error level,
        /// but only when debug logging is enabled and the response is not null.
        /// </summary>
        /// <remarks>
        /// NOTE(review): the guard is IsDebugEnabled while the message is written with ErrorFormat.
        /// Reading the body consumes the response stream, so the guard may be deliberate; left unchanged.
        /// </remarks>
        public static void LogError(this ILog log, HttpWebResponse response)
        {
            if (!log.IsDebugEnabled || response == null)
                return;

            var body = response.ReadToEnd();
            log.ErrorFormat("Headers:\n{0}\nBody:{1}", response.Headers, body);
        }

        /// <summary>
        /// Wraps <paramref name="stream"/> in a reader. When debug logging is enabled the whole
        /// content is read, logged and handed back through a StringReader; the original reader
        /// (and with it the stream) is disposed in that case.
        /// </summary>
        public static TextReader GetLoggedReader(this Stream stream, ILog log)
        {
            var reader = new StreamReader(stream);
            if (!log.IsDebugEnabled)
                return reader;

            using (reader)
            {
                var body = reader.ReadToEnd();
                log.DebugFormat("JSON response: {0}", body);
                return new StringReader(body);
            }
        }

        /// <summary>
        /// Lazily yields the items of <paramref name="list"/> from index <paramref name="start"/>
        /// up to and including index <paramref name="end"/>.
        /// </summary>
        /// <returns>The selected items; an empty sequence when <paramref name="list"/> is null.</returns>
        public static IEnumerable<T> Range<T>(this IList<T> list, int start, int end)
        {
            Contract.Requires(start >= 0);
            // The null case is handled below, so the contract must not dereference a null list.
            Contract.Requires(list == null || end < list.Count);
            Contract.Requires(start <= end);

            if (list == null)
                yield break;

            // Fixed: the loop used to begin at 0 and ignored 'start'.
            for (var i = start; i <= end; i++)
            {
                yield return list[i];
            }
        }

        /// <summary>
        /// Task-based wrapper around WebClient.UploadDataAsync with cancellation and progress support.
        /// </summary>
        /// <param name="webClient">The client that performs the upload.</param>
        /// <param name="address">Target address; also stored as the task's AsyncState.</param>
        /// <param name="method">HTTP method passed to UploadDataAsync.</param>
        /// <param name="data">Payload to upload.</param>
        /// <param name="cancellationToken">Cancelling it calls WebClient.CancelAsync.</param>
        /// <param name="progress">Optional progress sink; may be null.</param>
        /// <returns>A task that completes with the data returned by the upload.</returns>
        public static Task<byte[]> UploadDataTaskAsync(this WebClient webClient, Uri address, string method, byte[] data, CancellationToken cancellationToken, IProgress<UploadProgressChangedEventArgs> progress)
        {
            if (webClient == null)
                throw new ArgumentNullException("webClient");

            var tcs = new TaskCompletionSource<byte[]>(address);
            if (cancellationToken.IsCancellationRequested)
            {
                tcs.TrySetCanceled();
                return tcs.Task;
            }

            var ctr = cancellationToken.Register(() => webClient.CancelAsync());
            UploadDataCompletedEventHandler completedHandler = null;
            UploadProgressChangedEventHandler progressHandler = null;

            // Single clean-up routine shared by the completion path and the failure path.
            // Removing a null handler is a no-op, so this is safe when no progress sink was given.
            Action unregister = () =>
            {
                ctr.Dispose();
                webClient.UploadDataCompleted -= completedHandler;
                webClient.UploadProgressChanged -= progressHandler;
            };

            if (progress != null)
                progressHandler = (s, e) => PithosEAPCommon.HandleProgress(tcs, e, () => e, progress);
            completedHandler = (sender, e) => PithosEAPCommon.HandleCompletion(tcs, true, e, () => e.Result, unregister);

            webClient.UploadDataCompleted += completedHandler;
            webClient.UploadProgressChanged += progressHandler;
            try
            {
                // The TaskCompletionSource is the user token, so the handlers can ignore
                // events raised for other operations on the same WebClient.
                webClient.UploadDataAsync(address, method, data, tcs);
                if (cancellationToken.IsCancellationRequested)
                    webClient.CancelAsync();
            }
            catch
            {
                // Fixed: the cancellation registration used to leak on this path.
                unregister();
                throw;
            }
            return tcs.Task;
        }

        /// <summary>
        /// Invokes <paramref name="func"/> until it completes without throwing,
        /// for at most <paramref name="retries"/> attempts.
        /// </summary>
        /// <exception cref="RetryException">
        /// The last attempt failed (the failure is the inner exception), or
        /// <paramref name="retries"/> was not positive and nothing was attempted.
        /// </exception>
        public static async Task<T> WithRetries<T>(this Func<Task<T>> func, int retries)
        {
            while (retries > 0)
            {
                try
                {
                    return await func().ConfigureAwait(false);
                }
                catch (Exception exc)
                {
                    if (--retries == 0)
                        throw new RetryException("Failed too many times", exc);

                    // Leave a trace of the swallowed failure instead of dropping it silently.
                    if (Log.IsDebugEnabled)
                        Log.DebugFormat("[RETRY] Attempt failed, {0} left: {1}", retries, exc);
                }
            }
            throw new RetryException();
        }

        /// <summary>
        /// Invokes <paramref name="func"/> until it returns a success or redirect-class response,
        /// for at most <paramref name="retries"/> attempts. A 503 response is retried after a
        /// growing delay; any other failure throws through EnsureSuccessStatusCode.
        /// </summary>
        /// <exception cref="RetryException">All attempts were used up, or none were allowed.</exception>
        public static async Task<HttpResponseMessage> WithRetriesForWeb(this Func<Task<HttpResponseMessage>> func, int retries)
        {
            var waitTime = InitialRetryWait;
            while (retries > 0)
            {
                var result = await func().ConfigureAwait(false);
                if (IsAccepted(result, false, false))
                    return result;

                if (--retries == 0)
                {
                    result.Dispose();
                    throw new RetryException("Failed too many times");
                }

                //Wait for service unavailable
                if (result.StatusCode == HttpStatusCode.ServiceUnavailable)
                {
                    Log.InfoFormat("[UNAVAILABLE] Waiting before retry: {0}", result.ReasonPhrase);
                    // The rejected response is not returned to anyone, so release it here.
                    result.Dispose();
                    await TaskEx.Delay(waitTime).ConfigureAwait(false);
                    waitTime = NextWait(waitTime);
                }
                // NOTE(review): everything after the delay was lost in the captured diff;
                // this branch mirrors SendAsyncWithRetries - confirm against the repository.
                else
                    result.EnsureSuccessStatusCode();
            }
            throw new RetryException();
        }

        /// <summary>
        /// GETs <paramref name="requestUri"/> with retries and returns the body as a string.
        /// </summary>
        /// <param name="client">The client used to send the request.</param>
        /// <param name="requestUri">The address to fetch.</param>
        /// <param name="retries">Maximum number of attempts.</param>
        /// <param name="since">When set, sent as the If-Modified-Since header.</param>
        /// <returns>The response body; String.Empty for a 204 No Content response.</returns>
        public static async Task<string> GetStringAsyncWithRetries(this HttpClient client, Uri requestUri, int retries, DateTime? since = null)
        {
            var request = new HttpRequestMessage(HttpMethod.Get, requestUri);
            if (since.HasValue)
            {
                request.Headers.IfModifiedSince = since.Value;
            }

            // Fixed: the attempt count was hard-coded to 3 and 'retries' was ignored.
            using (var response = await client.SendAsyncWithRetries(request, retries).ConfigureAwait(false))
            {
                if (response.StatusCode == HttpStatusCode.NoContent)
                    return String.Empty;

                return await response.Content.ReadAsStringAsync().ConfigureAwait(false);
            }
        }

        /// <summary>Sends a HEAD request with retries, reading the full response and without cancellation.</summary>
        public static Task<HttpResponseMessage> HeadAsyncWithRetries(this HttpClient client, Uri requestUri, int retries, bool acceptNotFound = false)
        {
            return client.HeadAsyncWithRetries(requestUri, retries, acceptNotFound, HttpCompletionOption.ResponseContentRead, CancellationToken.None);
        }

        /// <summary>Sends a HEAD request with retries.</summary>
        public static Task<HttpResponseMessage> HeadAsyncWithRetries(this HttpClient client, Uri requestUri, int retries, bool acceptNotFound, HttpCompletionOption completionOption, CancellationToken cancellationToken)
        {
            return client.SendAsyncWithRetries(new HttpRequestMessage(HttpMethod.Head, requestUri), retries, acceptNotFound, completionOption, cancellationToken);
        }

        /// <summary>Sends a GET request with retries, reading the full response and without cancellation.</summary>
        public static Task<HttpResponseMessage> GetAsyncWithRetries(this HttpClient client, Uri requestUri, int retries)
        {
            return client.GetAsyncWithRetries(requestUri, retries, HttpCompletionOption.ResponseContentRead, CancellationToken.None);
        }

        /// <summary>Sends a GET request with retries; 404 is not accepted.</summary>
        public static Task<HttpResponseMessage> GetAsyncWithRetries(this HttpClient client, Uri requestUri, int retries, HttpCompletionOption completionOption, CancellationToken cancellationToken)
        {
            return client.SendAsyncWithRetries(new HttpRequestMessage(HttpMethod.Get, requestUri), retries, false, completionOption, cancellationToken);
        }

        /// <summary>Sends <paramref name="message"/> with retries; 404 is not accepted and there is no cancellation.</summary>
        public static Task<HttpResponseMessage> SendAsyncWithRetries(this HttpClient client, HttpRequestMessage message, int retries)
        {
            return client.SendAsyncWithRetries(message, retries, false, HttpCompletionOption.ResponseContentRead, CancellationToken.None);
        }

        /// <summary>
        /// Sends <paramref name="message"/>, retrying on timeouts and on 502/503 responses.
        /// </summary>
        /// <param name="client">The client used to send the request.</param>
        /// <param name="message">The request. It is sent as-is on the first attempt.</param>
        /// <param name="retries">Maximum number of attempts.</param>
        /// <param name="acceptNotFound">When true, a 404 response is returned instead of failing.</param>
        /// <param name="completionOption">Passed through to HttpClient.SendAsync.</param>
        /// <param name="cancellationToken">Passed through to HttpClient.SendAsync.</param>
        /// <returns>A response that is successful, redirect-class, 409, or (optionally) 404.</returns>
        /// <exception cref="RetryException">All attempts were used up, or none were allowed.</exception>
        /// <exception cref="HttpRequestException">
        /// Thrown by EnsureSuccessStatusCode for a failure other than 502/503 while attempts remain.
        /// </exception>
        public static async Task<HttpResponseMessage> SendAsyncWithRetries(this HttpClient client, HttpRequestMessage message, int retries, bool acceptNotFound, HttpCompletionOption completionOption, CancellationToken cancellationToken)
        {
            if (client == null)
                throw new ArgumentNullException("client");
            if (message == null)
                throw new ArgumentNullException("message");

            var waitTime = InitialRetryWait;
            var firstAttempt = true;

            while (retries > 0)
            {
                // HttpClient rejects a request instance that was already sent,
                // so every retry needs its own copy of the request.
                var request = firstAttempt ? message : CloneForRetry(message);
                firstAttempt = false;

                var timedOut = false;
                if (Log.IsDebugEnabled)
                    Log.DebugFormat("[REQUEST] {0}", request);
                HttpResponseMessage result = null;

                try
                {
                    result = await client.SendAsync(request, completionOption, cancellationToken).ConfigureAwait(false);
                }
                catch (WebException exc)
                {
                    if (exc.Status != WebExceptionStatus.Timeout)
                        throw;
                    timedOut = true;
                    if (Log.IsDebugEnabled)
                        Log.DebugFormat("[RESPONSE] [{0}]:[{1}] FAIL WITH TIMEOUT", message.Method, message.RequestUri);
                }
                catch (TaskCanceledException exc)
                {
                    // A cancellation the caller asked for must propagate; the caller's own token
                    // is checked as well, so a requested cancel is never mistaken for a timeout.
                    if (cancellationToken.IsCancellationRequested || exc.CancellationToken.IsCancellationRequested)
                        throw;

                    //If the task was cancelled due to a timeout, retry it
                    timedOut = true;
                    if (Log.IsDebugEnabled)
                        Log.DebugFormat("[RESPONSE] [{0}]:[{1}] FAIL WITH TIMEOUT", message.Method, message.RequestUri);
                }
                catch (Exception exc)
                {
                    Log.FatalFormat("Unexpected error while sending:\n{0}\n{1}", message, exc);
                    throw;
                }

                if (timedOut)
                {
                    if (--retries == 0)
                        throw new RetryException("Failed too many times");
                    continue;
                }

                if (IsAccepted(result, true, acceptNotFound))
                {
                    if (Log.IsDebugEnabled)
                        Log.DebugFormat("[RESPONSE] [{0}]:[{1}] OK: [{2}]", message.Method, message.RequestUri,
                                        result.StatusCode);
                    return result;
                }

                //Failed, will have to abort or retry
                if (Log.IsDebugEnabled)
                    Log.DebugFormat("[RESPONSE] [{0}]:[{1}] FAIL: [{2}]\n{3}", message.Method, message.RequestUri,
                                    result.StatusCode, result);

                if (--retries == 0)
                {
                    result.Dispose();
                    throw new RetryException("Failed too many times");
                }

                //Wait for service unavailable
                if (result.StatusCode == HttpStatusCode.ServiceUnavailable ||
                    result.StatusCode == HttpStatusCode.BadGateway)
                {
                    Log.WarnFormat("[UNAVAILABLE] Waiting before retrying [{0}]:[{1}] due to [{2}]", message.Method,
                                   message.RequestUri, result.ReasonPhrase);
                    // The rejected response is not returned to anyone, so release it here.
                    result.Dispose();
                    await TaskEx.Delay(waitTime).ConfigureAwait(false);
                    waitTime = NextWait(waitTime);
                }
                //Throw in all other cases
                else
                    result.EnsureSuccessStatusCode();
            }
            throw new RetryException();
        }

        /// <summary>
        /// Returns the first value of the header called <paramref name="name"/>,
        /// or null when the header is absent.
        /// </summary>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="headers"/> is null, or <paramref name="name"/> is null or white space.
        /// </exception>
        public static string GetFirstValue(this HttpResponseHeaders headers, string name)
        {
            if (headers == null)
                throw new ArgumentNullException("headers");
            if (String.IsNullOrWhiteSpace(name))
                throw new ArgumentNullException("name");
            Contract.EndContractBlock();

            IEnumerable<string> values;
            return headers.TryGetValues(name, out values)
                       ? values.FirstOrDefault()
                       : null;
        }

        /// <summary>
        /// Collects the headers whose name starts with <paramref name="metaPrefix"/>.
        /// </summary>
        /// <returns>
        /// A dictionary keyed by the header name with the prefix removed; multiple values
        /// of one header are joined with commas.
        /// </returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="headers"/> is null, or <paramref name="metaPrefix"/> is null or white space.
        /// </exception>
        public static Dictionary<string, string> GetMeta(this HttpResponseHeaders headers, string metaPrefix)
        {
            if (headers == null)
                throw new ArgumentNullException("headers");
            if (String.IsNullOrWhiteSpace(metaPrefix))
                throw new ArgumentNullException("metaPrefix");
            Contract.EndContractBlock();

            // HTTP header names are case-insensitive, so match the prefix without regard to
            // case (and without the culture rules the parameterless StartsWith applies).
            return headers
                .Where(header => header.Key.StartsWith(metaPrefix, StringComparison.OrdinalIgnoreCase))
                .ToDictionary(header => header.Key.Substring(metaPrefix.Length),
                              header => String.Join(",", header.Value));
        }

        /// <summary>
        /// Tells whether <paramref name="response"/> should be handed back to the caller
        /// rather than retried or turned into an exception.
        /// </summary>
        private static bool IsAccepted(HttpResponseMessage response, bool acceptConflict, bool acceptNotFound)
        {
            if (response.IsSuccessStatusCode)
                return true;

            var code = response.StatusCode;
            if (Array.IndexOf(RedirectCodes, code) >= 0)
                return true;
            if (acceptConflict && code == HttpStatusCode.Conflict)
                return true;
            return acceptNotFound && code == HttpStatusCode.NotFound;
        }

        /// <summary>
        /// Computes the delay for the next retry: one step longer, capped at MaxRetryWait.
        /// The previous test (waitTime &lt; 10s) could never be true for a delay that starts
        /// at 10s, so the delay never grew.
        /// </summary>
        private static TimeSpan NextWait(TimeSpan current)
        {
            var next = current.Add(RetryWaitStep);
            return next < MaxRetryWait ? next : MaxRetryWait;
        }

        /// <summary>
        /// Creates a fresh copy of a content-less request for a retry.
        /// A request that carries content is returned unchanged, because the content cannot be
        /// duplicated without buffering it; re-sending such a request behaves as it did before.
        /// </summary>
        private static HttpRequestMessage CloneForRetry(HttpRequestMessage original)
        {
            if (original.Content != null)
                return original;

            var clone = new HttpRequestMessage(original.Method, original.RequestUri)
                            {
                                Version = original.Version
                            };
            foreach (var header in original.Headers)
            {
                clone.Headers.TryAddWithoutValidation(header.Key, header.Value);
            }
            foreach (var property in original.Properties)
            {
                clone.Properties[property.Key] = property.Value;
            }
            return clone;
        }
    }

    /// <summary>
    /// Helpers that bridge event-based asynchronous operations to a TaskCompletionSource.
    /// </summary>
    internal static class PithosEAPCommon
    {
        /// <summary>
        /// Forwards a progress event to <paramref name="callback"/>, but only when the event
        /// belongs to the operation identified by <paramref name="tcs"/> (matched via UserState).
        /// </summary>
        internal static void HandleProgress<T, E>(TaskCompletionSource<T> tcs, ProgressChangedEventArgs eventArgs, Func<E> getProgress, IProgress<E> callback)
        {
            if (eventArgs.UserState != tcs)
                return;
            callback.Report(getProgress());
        }

        /// <summary>
        /// Transfers the outcome of a completed event-based operation to <paramref name="tcs"/>.
        /// </summary>
        /// <param name="tcs">The completion source to resolve.</param>
        /// <param name="requireMatch">When true, events whose UserState is not <paramref name="tcs"/> are ignored.</param>
        /// <param name="e">The completion event arguments.</param>
        /// <param name="getResult">Produces the result; called only when the operation neither failed nor was cancelled.</param>
        /// <param name="unregisterHandler">Detaches the event handlers; runs before the task is resolved.</param>
        internal static void HandleCompletion<T>(TaskCompletionSource<T> tcs, bool requireMatch, AsyncCompletedEventArgs e, Func<T> getResult, Action unregisterHandler)
        {
            if (requireMatch && e.UserState != tcs)
                return;

            try
            {
                unregisterHandler();
            }
            finally
            {
                // Resolve the task even if unregistering threw, so awaiters are never left hanging.
                if (e.Cancelled)
                    tcs.TrySetCanceled();
                else if (e.Error != null)
                    tcs.TrySetException(e.Error);
                else
                    tcs.TrySetResult(getResult());
            }
        }
    }

}