Some timeout issues
[pithos-ms-client] / trunk / Pithos.Network / WebExtensions.cs
index 6676d65..a1fb7d8 100644 (file)
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Net;
-using System.IO;
-using log4net;
-
-namespace Pithos.Network
-{
-    public static class WebExtensions
-    {
-        public static string ReadToEnd(this HttpWebResponse response)
-        {
-            using (var stream = response.GetResponseStream())
-            {
-                if (stream == null)
-                    return null;
-                using (var reader = new StreamReader(stream))
-                {
-                    var body = reader.ReadToEnd();
-                    return body;
-                }
-            }
-        }
-    
-        public static void LogError(this ILog log,HttpWebResponse response)
-        {
-            if (log.IsDebugEnabled)
-            {
-                if (response != null)
-                {
-                    var body = response.ReadToEnd();
-                    log.ErrorFormat("Headers:\n{0}\nBody:{1}", response.Headers,body);
-                }
-            }
-        }
-
-        public static TextReader GetLoggedReader(this Stream stream, ILog log)
-        {
-            var reader = new StreamReader(stream);
-            if (!log.IsDebugEnabled)
-                return reader;
-            
-            using (reader)
-            {
-                var body = reader.ReadToEnd();
-                log.DebugFormat("JSON response: {0}", body);
-                return new StringReader(body);
-            }
-        }
-    }
-}
+using System;\r
+using System.Collections.Generic;\r
+using System.ComponentModel;\r
+using System.Diagnostics.Contracts;\r
+using System.Linq;\r
+using System.Net.Http;\r
+using System.Net.Http.Headers;\r
+using System.Reflection;\r
+using System.Text;\r
+using System.Net;\r
+using System.IO;\r
+using System.Threading;\r
+using System.Threading.Tasks;\r
+using log4net;\r
+using System.ServiceModel.Channels;\r
+\r
+namespace Pithos.Network\r
+{\r
+    public static class WebExtensions\r
+    {\r
+        private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);\r
+\r
+        public static string ReadToEnd(this HttpWebResponse response)\r
+        {\r
+            using (var stream = response.GetResponseStream())\r
+            {\r
+                if (stream == null)\r
+                    return null;\r
+                using (var reader = new StreamReader(stream))\r
+                {\r
+                    var body = reader.ReadToEnd();\r
+                    return body;\r
+                }\r
+            }\r
+        }\r
+\r
+        public static void LogError(this ILog log, HttpWebResponse response)\r
+        {\r
+            if (log.IsDebugEnabled)\r
+            {\r
+                if (response != null)\r
+                {\r
+                    var body = response.ReadToEnd();\r
+                    log.ErrorFormat("Headers:\n{0}\nBody:{1}", response.Headers, body);\r
+                }\r
+            }\r
+        }\r
+\r
+        public static TextReader GetLoggedReader(this Stream stream, ILog log)\r
+        {\r
+            var reader = new StreamReader(stream);\r
+            if (!log.IsDebugEnabled)\r
+                return reader;\r
+\r
+            using (reader)\r
+            {\r
+                var body = reader.ReadToEnd();\r
+                log.DebugFormat("JSON response: {0}", body);\r
+                return new StringReader(body);\r
+            }\r
+        }\r
+\r
+\r
+        public static IEnumerable<T> Range<T>(this IList<T> list, int start, int end)\r
+        {\r
+            Contract.Requires(start >= 0);\r
+            Contract.Requires(end < list.Count);\r
+            Contract.Requires(start <= end);\r
+\r
+            if (list == null)\r
+                yield break;\r
+\r
+            for (var i = 0; i <= end; i++)\r
+            {\r
+                yield return list[i];\r
+            }\r
+\r
+        }\r
+\r
+        public static Task<byte[]> UploadDataTaskAsync(this WebClient webClient, Uri address, string method, byte[] data, CancellationToken cancellationToken, IProgress<UploadProgressChangedEventArgs> progress)\r
+        {\r
+            var tcs = new TaskCompletionSource<byte[]>(address);\r
+            if (cancellationToken.IsCancellationRequested)\r
+            {\r
+                tcs.TrySetCanceled();\r
+            }\r
+            else\r
+            {\r
+                CancellationTokenRegistration ctr = cancellationToken.Register(()=>\r
+                {\r
+                    webClient.CancelAsync();\r
+                });\r
+                UploadDataCompletedEventHandler completedHandler = null;\r
+                UploadProgressChangedEventHandler progressHandler = null;\r
+                if (progress != null)\r
+                    progressHandler = (s, e) => PithosEAPCommon.HandleProgress(tcs, e, () => e, progress);\r
+                completedHandler =(sender, e) =>PithosEAPCommon.HandleCompletion(tcs, true, e,() => e.Result,() =>\r
+                { \r
+                    ctr.Dispose();\r
+                    webClient.UploadDataCompleted -= completedHandler;\r
+                    webClient.UploadProgressChanged -= progressHandler;\r
+                });\r
+                webClient.UploadDataCompleted += completedHandler;\r
+                webClient.UploadProgressChanged += progressHandler;\r
+                try\r
+                {\r
+                    webClient.UploadDataAsync(address, method, data, tcs);\r
+                    if (cancellationToken.IsCancellationRequested)\r
+                        webClient.CancelAsync();\r
+                }\r
+                catch\r
+                {\r
+                    webClient.UploadDataCompleted -= completedHandler;\r
+                    webClient.UploadProgressChanged -= progressHandler;\r
+                    throw;\r
+                }\r
+            }\r
+            return tcs.Task;\r
+        }\r
+\r
+        public static async Task<T> WithRetries<T>(this Func<Task<T>> func, int retries)\r
+        {\r
+            while (retries > 0)\r
+            {\r
+                try\r
+                {\r
+                    var result = await func().ConfigureAwait(false);\r
+                    return result;\r
+                }\r
+                catch (Exception exc)\r
+                {\r
+                    if (--retries == 0)\r
+                        throw new RetryException("Failed too many times", exc);\r
+                }\r
+            }\r
+            throw new RetryException();\r
+        }\r
+\r
+        public static async Task<HttpResponseMessage> WithRetriesForWeb(this Func<Task<HttpResponseMessage>> func, int retries)\r
+        {\r
+            var waitTime = TimeSpan.FromSeconds(10);\r
+            var acceptedCodes = new[] { HttpStatusCode.Moved, HttpStatusCode.MovedPermanently, HttpStatusCode.Found, HttpStatusCode.Redirect,HttpStatusCode.SeeOther,\r
+                HttpStatusCode.RedirectMethod,HttpStatusCode.NotModified,HttpStatusCode.TemporaryRedirect,HttpStatusCode.RedirectKeepVerb};\r
+            while (retries > 0)\r
+            {\r
+                var result = await func().ConfigureAwait(false);\r
+                if (result.IsSuccessStatusCode || acceptedCodes.Contains(result.StatusCode))\r
+                    return result;\r
+                    \r
+                if (--retries == 0)\r
+                    throw new RetryException("Failed too many times");\r
+\r
+                //Wait for service unavailable\r
+                if (result.StatusCode == HttpStatusCode.ServiceUnavailable)\r
+                {\r
+                    Log.InfoFormat("[UNAVAILABLE] Waiting before retry: {0}",result.ReasonPhrase);\r
+                    await TaskEx.Delay(waitTime).ConfigureAwait(false);\r
+                    //increase the timeout for repeated timeouts\r
+                    if (waitTime<TimeSpan.FromSeconds(10))\r
+                        waitTime = waitTime.Add(TimeSpan.FromSeconds(10));\r
+                }                \r
+                //Throw in all other cases\r
+                else \r
+                    result.EnsureSuccessStatusCode();\r
+            }\r
+            throw new RetryException();\r
+        }\r
+\r
+\r
+        public static async Task<string> GetStringAsyncWithRetries(this HttpClient client, Uri requestUri, int retries,DateTime? since=null)\r
+        {                        \r
+            var request = new HttpRequestMessage(HttpMethod.Get, requestUri);            \r
+            if (since.HasValue)\r
+            {\r
+                request.Headers.IfModifiedSince = since.Value;\r
+            }\r
+            //Func<Task<HttpResponseMessage>> call = () => _baseHttpClient.SendAsync(request);\r
+            using (var response = await client.SendAsyncWithRetries(request,3).ConfigureAwait(false))\r
+            {\r
+                if (response.StatusCode == HttpStatusCode.NoContent)\r
+                    return String.Empty;\r
+\r
+                var content = await response.Content.ReadAsStringAsync().ConfigureAwait(false);\r
+                return content;\r
+            }\r
+\r
+        }\r
+\r
+        public static Task<HttpResponseMessage> HeadAsyncWithRetries(this HttpClient client, Uri requestUri, int retries,bool acceptNotFound=false)\r
+        {\r
+            return client.HeadAsyncWithRetries(requestUri, retries, acceptNotFound,HttpCompletionOption.ResponseContentRead, CancellationToken.None);\r
+        }\r
+\r
+        public static Task<HttpResponseMessage> HeadAsyncWithRetries(this HttpClient client, Uri requestUri, int retries, bool acceptNotFound,HttpCompletionOption completionOption, CancellationToken cancellationToken)\r
+        {\r
+            return client.SendAsyncWithRetries(new HttpRequestMessage(HttpMethod.Head, requestUri), retries, acceptNotFound,completionOption, cancellationToken);\r
+        }\r
+\r
+        public static Task<HttpResponseMessage> GetAsyncWithRetries(this HttpClient client, Uri requestUri, int retries)\r
+        {\r
+            return client.GetAsyncWithRetries(requestUri, retries, HttpCompletionOption.ResponseContentRead, CancellationToken.None);\r
+        }\r
+\r
+        public static Task<HttpResponseMessage> GetAsyncWithRetries(this HttpClient client, Uri requestUri, int retries, HttpCompletionOption completionOption, CancellationToken cancellationToken)\r
+        {\r
+            return client.SendAsyncWithRetries(new HttpRequestMessage(HttpMethod.Get, requestUri), retries, false,completionOption, cancellationToken);\r
+        }\r
+\r
+\r
+        public static Task<HttpResponseMessage> SendAsyncWithRetries(this HttpClient client,HttpRequestMessage message, int retries)\r
+        {\r
+            return client.SendAsyncWithRetries(message, retries,false, HttpCompletionOption.ResponseContentRead, CancellationToken.None);\r
+        }\r
+\r
+        public static async Task<HttpResponseMessage> SendAsyncWithRetries(this HttpClient client,HttpRequestMessage message, int retries,bool acceptNotFound,HttpCompletionOption completionOption, CancellationToken cancellationToken)\r
+        {\r
+            var waitTime = TimeSpan.FromSeconds(10);\r
+            var acceptedCodes =acceptNotFound\r
+                ? new[] {HttpStatusCode.NotFound, HttpStatusCode.Moved, HttpStatusCode.MovedPermanently, HttpStatusCode.Found, HttpStatusCode.Redirect,HttpStatusCode.SeeOther,\r
+                HttpStatusCode.RedirectMethod,HttpStatusCode.NotModified,HttpStatusCode.TemporaryRedirect,HttpStatusCode.RedirectKeepVerb,HttpStatusCode.Conflict}\r
+                : new[] { HttpStatusCode.Moved, HttpStatusCode.MovedPermanently, HttpStatusCode.Found, HttpStatusCode.Redirect,HttpStatusCode.SeeOther,\r
+                HttpStatusCode.RedirectMethod,HttpStatusCode.NotModified,HttpStatusCode.TemporaryRedirect,HttpStatusCode.RedirectKeepVerb,HttpStatusCode.Conflict};\r
+            \r
+                \r
+            while (retries > 0)\r
+            {\r
+                var timedOut = false;\r
+                if (Log.IsDebugEnabled)\r
+                    Log.DebugFormat("[REQUEST] {0}", message);\r
+                HttpResponseMessage result=null;\r
+                \r
+                \r
+                \r
+                try\r
+                {\r
+                    result = await client.SendAsync(message, completionOption, cancellationToken).ConfigureAwait(false);\r
+                }\r
+                catch (WebException exc)\r
+                {\r
+                    if (exc.Status != WebExceptionStatus.Timeout)\r
+                        throw;\r
+                    timedOut = true;\r
+                    if (Log.IsDebugEnabled)\r
+                        Log.DebugFormat("[RESPONSE] [{0}]:[{1}] FAIL WITH TIMEOUT", message.Method, message.RequestUri);\r
+                }\r
+                catch(TaskCanceledException exc)\r
+                {\r
+                    //If the task was cancelled due to a timeout, retry it\r
+                    if (!exc.CancellationToken.IsCancellationRequested)\r
+                    {\r
+                        timedOut = true;\r
+                        if (Log.IsDebugEnabled)\r
+                            Log.DebugFormat("[RESPONSE] [{0}]:[{1}] FAIL WITH TIMEOUT", message.Method, message.RequestUri);\r
+                    }\r
+                    else\r
+                    {\r
+                        throw;\r
+                    }\r
+                }\r
+                catch (Exception exc)\r
+                {\r
+                    Log.FatalFormat("Unexpected error while sending:\n{0}\n{1}", message, exc);\r
+                    throw;\r
+                }\r
+\r
+                if (timedOut)\r
+                {\r
+                    if (--retries == 0)\r
+                        throw new RetryException("Failed too many times");\r
+                    continue;\r
+                }\r
+\r
+                if (result.IsSuccessStatusCode || acceptedCodes.Contains(result.StatusCode))\r
+                {\r
+                    if (Log.IsDebugEnabled)\r
+                        Log.DebugFormat("[RESPONSE] [{0}]:[{1}] OK: [{2}]", message.Method, message.RequestUri,\r
+                                        result.StatusCode);\r
+                    return result;\r
+                }\r
+                //Failed, will have to abort or retry\r
+                if (Log.IsDebugEnabled)\r
+                    Log.DebugFormat("[RESPONSE] [{0}]:[{1}] FAIL: [{2}]\n{3}", message.Method, message.RequestUri,\r
+                                    result.StatusCode, result);\r
+\r
+                if (--retries == 0)\r
+                    throw new RetryException("Failed too many times");\r
+\r
+                //Wait for service unavailable\r
+                if (result.StatusCode == HttpStatusCode.ServiceUnavailable ||\r
+                    result.StatusCode == HttpStatusCode.BadGateway)\r
+                {\r
+\r
+                    Log.WarnFormat("[UNAVAILABLE] Waiting before retrying [{0}]:[{1}] due to [{2}]", message.Method,\r
+                                   message.RequestUri, result.ReasonPhrase);\r
+                    await TaskEx.Delay(waitTime).ConfigureAwait(false);\r
+                    //increase the timeout for repeated timeouts\r
+                    if (waitTime < TimeSpan.FromSeconds(10))\r
+                        waitTime = waitTime.Add(TimeSpan.FromSeconds(10));\r
+                }\r
+                    //Throw in all other cases\r
+                else\r
+                    result.EnsureSuccessStatusCode();\r
+            }\r
+            throw new RetryException();\r
+        }\r
+\r
+        public static string GetFirstValue(this HttpResponseHeaders headers, string name)\r
+        {\r
+            if (headers==null)\r
+                throw new ArgumentNullException("headers");\r
+            if (String.IsNullOrWhiteSpace(name))\r
+                throw new ArgumentNullException("name");\r
+            Contract.EndContractBlock();\r
+\r
+            IEnumerable<string> values;\r
+            if (headers.TryGetValues(name, out values))\r
+            {\r
+                return values.FirstOrDefault();\r
+            }\r
+            return null;\r
+        }\r
+\r
+        public static  Dictionary<string, string> GetMeta(this HttpResponseHeaders headers,string metaPrefix)\r
+        {\r
+            if (headers == null)\r
+                throw new ArgumentNullException("headers");\r
+            if (String.IsNullOrWhiteSpace(metaPrefix))\r
+                throw new ArgumentNullException("metaPrefix");\r
+            Contract.EndContractBlock();\r
+\r
+            var dict = (from header in headers\r
+                        where header.Key.StartsWith(metaPrefix)\r
+                        let name = header.Key.Substring(metaPrefix.Length)\r
+                        select new { Name = name, Value = String.Join(",",header.Value) })\r
+                        .ToDictionary(t => t.Name, t => t.Value);\r
+            return dict;\r
+        }\r
+\r
+    }\r
+\r
+    internal static class PithosEAPCommon\r
+    {\r
+        internal static void HandleProgress<T, E>(TaskCompletionSource<T> tcs, ProgressChangedEventArgs eventArgs, Func<E> getProgress, IProgress<E> callback)\r
+        {\r
+            if (eventArgs.UserState != tcs)\r
+                return;\r
+            callback.Report(getProgress());\r
+        }\r
+\r
+        internal static void HandleCompletion<T>(TaskCompletionSource<T> tcs, bool requireMatch, AsyncCompletedEventArgs e, Func<T> getResult, Action unregisterHandler)\r
+        {\r
+            if (requireMatch)\r
+            {\r
+                if (e.UserState != tcs)\r
+                    return;\r
+            }\r
+            try\r
+            {\r
+                unregisterHandler();\r
+            }\r
+            finally\r
+            {\r
+                if (e.Cancelled)\r
+                    tcs.TrySetCanceled();\r
+                else if (e.Error != null)\r
+                    tcs.TrySetException(e.Error);\r
+                else\r
+                    tcs.TrySetResult(getResult());\r
+            }\r
+        }\r
+    }\r
+\r
+}\r