Statistics
| Branch: | Revision:

root / trunk / Pithos.Network / WebExtensions.cs @ dd5f5163

History | View | Annotate | Download (19.1 kB)

1
using System;
2
using System.Collections.Generic;
3
using System.ComponentModel;
4
using System.Diagnostics.Contracts;
5
using System.Linq;
6
using System.Net.Http;
7
using System.Net.Http.Headers;
8
using System.Reflection;
9
using System.Text;
10
using System.Net;
11
using System.IO;
12
using System.Threading;
13
using System.Threading.Tasks;
14
using Pithos.Interfaces;
15
using log4net;
16
using System.ServiceModel.Channels;
17

    
18
namespace Pithos.Network
19
{
20
    public static class WebExtensions
21
    {
22
        private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
23

    
24
        public static string ReadToEnd(this HttpWebResponse response)
25
        {
26
            using (var stream = response.GetResponseStream())
27
            {
28
                if (stream == null)
29
                    return null;
30
                using (var reader = new StreamReader(stream))
31
                {
32
                    var body = reader.ReadToEnd();
33
                    return body;
34
                }
35
            }
36
        }
37

    
38
        public static void LogError(this ILog log, HttpWebResponse response)
39
        {
40
            if (log.IsDebugEnabled)
41
            {
42
                if (response != null)
43
                {
44
                    var body = response.ReadToEnd();
45
                    log.ErrorFormat("Headers:\n{0}\nBody:{1}", response.Headers, body);
46
                }
47
            }
48
        }
49

    
50
        public static TextReader GetLoggedReader(this Stream stream, ILog log)
51
        {
52
            var reader = new StreamReader(stream);
53
            if (!log.IsDebugEnabled)
54
                return reader;
55

    
56
            using (reader)
57
            {
58
                var body = reader.ReadToEnd();
59
                log.DebugFormat("JSON response: {0}", body);
60
                return new StringReader(body);
61
            }
62
        }
63

    
64

    
65
        public static IEnumerable<T> Range<T>(this IList<T> list, int start, int end)
66
        {
67
            Contract.Requires(start >= 0);
68
            Contract.Requires(end < list.Count);
69
            Contract.Requires(start <= end);
70

    
71
            if (list == null)
72
                yield break;
73

    
74
            for (var i = 0; i <= end; i++)
75
            {
76
                yield return list[i];
77
            }
78

    
79
        }
80

    
81
        public static Task<byte[]> UploadDataTaskAsync(this WebClient webClient, Uri address, string method, byte[] data, CancellationToken cancellationToken, IProgress<UploadProgressChangedEventArgs> progress)
82
        {
83
            var tcs = new TaskCompletionSource<byte[]>(address);
84
            if (cancellationToken.IsCancellationRequested)
85
            {
86
                tcs.TrySetCanceled();
87
            }
88
            else
89
            {
90
                CancellationTokenRegistration ctr = cancellationToken.Register(()=>
91
                {
92
                    webClient.CancelAsync();
93
                });
94
                UploadDataCompletedEventHandler completedHandler = null;
95
                UploadProgressChangedEventHandler progressHandler = null;
96
                if (progress != null)
97
                    progressHandler = (s, e) => PithosEAPCommon.HandleProgress(tcs, e, () => e, progress);
98
                completedHandler =(sender, e) =>PithosEAPCommon.HandleCompletion(tcs, true, e,() => e.Result,() =>
99
                { 
100
                    ctr.Dispose();
101
                    webClient.UploadDataCompleted -= completedHandler;
102
                    webClient.UploadProgressChanged -= progressHandler;
103
                });
104
                webClient.UploadDataCompleted += completedHandler;
105
                webClient.UploadProgressChanged += progressHandler;
106
                try
107
                {
108
                    webClient.UploadDataAsync(address, method, data, tcs);
109
                    if (cancellationToken.IsCancellationRequested)
110
                        webClient.CancelAsync();
111
                }
112
                catch
113
                {
114
                    webClient.UploadDataCompleted -= completedHandler;
115
                    webClient.UploadProgressChanged -= progressHandler;
116
                    throw;
117
                }
118
            }
119
            return tcs.Task;
120
        }
121

    
122
        public static async Task<T> WithRetries<T>(this Func<Task<T>> func, int retries)
123
        {
124
            while (retries > 0)
125
            {
126
                try
127
                {
128
                    var result = await func().ConfigureAwait(false);
129
                    return result;
130
                }
131
                catch (Exception exc)
132
                {
133
                    if (--retries == 0)
134
                        throw new RetryException("Failed too many times", exc);
135
                }
136
            }
137
            throw new RetryException();
138
        }
139

    
140
        public static async Task<HttpResponseMessage> WithRetriesForWeb(this Func<Task<HttpResponseMessage>> func, int retries)
141
        {
142
            var waitTime = TimeSpan.FromSeconds(10);
143
            var acceptedCodes = new[] { HttpStatusCode.Moved, HttpStatusCode.MovedPermanently, HttpStatusCode.Found, HttpStatusCode.Redirect,HttpStatusCode.SeeOther,
144
                HttpStatusCode.RedirectMethod,HttpStatusCode.NotModified,HttpStatusCode.TemporaryRedirect,HttpStatusCode.RedirectKeepVerb};
145
            while (retries > 0)
146
            {
147
                var result = await func().ConfigureAwait(false);
148
                if (result.IsSuccessStatusCode || acceptedCodes.Contains(result.StatusCode))
149
                    return result;
150
                    
151
                if (--retries == 0)
152
                    throw new RetryException("Failed too many times");
153

    
154
                //Wait for service unavailable
155
                if (result.StatusCode == HttpStatusCode.ServiceUnavailable)
156
                {
157
                    Log.InfoFormat("[UNAVAILABLE] Waiting before retry: {0}",result.ReasonPhrase);
158
                    await TaskEx.Delay(waitTime).ConfigureAwait(false);
159
                    //increase the timeout for repeated timeouts
160
                    if (waitTime<TimeSpan.FromSeconds(10))
161
                        waitTime = waitTime.Add(TimeSpan.FromSeconds(10));
162
                }                
163
                //Throw in all other cases
164
                else 
165
                    result.EnsureSuccessStatusCode();
166
            }
167
            throw new RetryException();
168
        }
169

    
170

    
171
        public static async Task<string> GetStringAsyncWithRetries(this HttpClient client, Uri requestUri, int retries,DateTime? since=null)
172
        {                        
173
            var request = new HttpRequestMessage(HttpMethod.Get, requestUri);            
174
            if (since.HasValue)
175
            {
176
                request.Headers.IfModifiedSince = since.Value;
177
            }
178
            //Func<Task<HttpResponseMessage>> call = () => _baseHttpClient.SendAsync(request);
179
            using (var response = await client.SendAsyncWithRetries(request,3).ConfigureAwait(false))
180
            {
181
                if (response.StatusCode == HttpStatusCode.NoContent)
182
                    return String.Empty;
183

    
184
                var content = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
185
                return content;
186
            }
187

    
188
        }
189

    
190
        public static Task<HttpResponseMessage> HeadAsyncWithRetries(this HttpClient client, Uri requestUri, int retries,bool acceptNotFound=false)
191
        {
192
            return client.HeadAsyncWithRetries(requestUri, retries, acceptNotFound,HttpCompletionOption.ResponseContentRead, CancellationToken.None);
193
        }
194

    
195
        public static Task<HttpResponseMessage> HeadAsyncWithRetries(this HttpClient client, Uri requestUri, int retries, bool acceptNotFound,HttpCompletionOption completionOption, CancellationToken cancellationToken)
196
        {
197
            return client.SendAsyncWithRetries(new HttpRequestMessage(HttpMethod.Head, requestUri), retries, acceptNotFound,completionOption, cancellationToken);
198
        }
199

    
200
        public static Task<HttpResponseMessage> GetAsyncWithRetries(this HttpClient client, Uri requestUri, int retries)
201
        {
202
            return client.GetAsyncWithRetries(requestUri, retries, HttpCompletionOption.ResponseContentRead, CancellationToken.None);
203
        }
204

    
205
        public static Task<HttpResponseMessage> GetAsyncWithRetries(this HttpClient client, Uri requestUri, int retries, HttpCompletionOption completionOption, CancellationToken cancellationToken)
206
        {
207
            return client.SendAsyncWithRetries(new HttpRequestMessage(HttpMethod.Get, requestUri), retries, false,completionOption, cancellationToken);
208
        }
209

    
210

    
211
        public static Task<HttpResponseMessage> SendAsyncWithRetries(this HttpClient client,HttpRequestMessage message, int retries)
212
        {
213
            return client.SendAsyncWithRetries(message, retries,false, HttpCompletionOption.ResponseContentRead, CancellationToken.None);
214
        }
215

    
216
        public static async Task<HttpResponseMessage> SendAsyncWithRetries(this HttpClient client,HttpRequestMessage message, int retries,bool acceptNotFound,HttpCompletionOption completionOption, CancellationToken cancellationToken)
217
        {
218
            var waitTime = TimeSpan.FromSeconds(10);
219
            var acceptedCodes = new HashSet<HttpStatusCode> { HttpStatusCode.Moved, HttpStatusCode.MovedPermanently, 
220
                HttpStatusCode.Found, HttpStatusCode.Redirect,HttpStatusCode.SeeOther,HttpStatusCode.RedirectMethod,
221
                HttpStatusCode.NotModified,HttpStatusCode.TemporaryRedirect,HttpStatusCode.RedirectKeepVerb,
222
                HttpStatusCode.Conflict};
223
            if (acceptNotFound)
224
                acceptedCodes.Add(HttpStatusCode.NotFound);
225
                
226
            while (retries > 0)
227
            {
228
                var timedOut = false;
229
                if (Log.IsDebugEnabled)
230
                    Log.DebugFormat("[REQUEST] {0}", message);
231
                HttpResponseMessage result=null;
232

    
233

    
234
                var msg = message.Clone();
235
                Exception innerException=null;
236
                try
237
                {
238
                    result = await client.SendAsync(msg, completionOption, cancellationToken).ConfigureAwait(false);
239
                    if (result.IsSuccessStatusCode || acceptedCodes.Contains(result.StatusCode))
240
                    {
241
                        if (Log.IsDebugEnabled)
242
                            Log.DebugFormat("[RESPONSE] [{0}]:[{1}] OK: [{2}]", msg.Method, msg.RequestUri,
243
                                            result.StatusCode);
244
                        return result;
245
                    }
246

    
247
                }
248
                catch (HttpRequestException exc)
249
                {
250
                    //A timeout or other error caused a failure without receiving a response from the server
251
                    if (Log.IsDebugEnabled)
252
                        Log.DebugFormat("[RESPONSE] [{0}]:[{1}] HTTP FAIL :\n{2}", msg.Method, msg.RequestUri,exc);
253
                    innerException = exc;
254
                }
255
                catch (WebException exc)
256
                {
257
                    if (exc.Status != WebExceptionStatus.Timeout)
258
                        throw;
259
                    timedOut = true;
260
                    if (Log.IsDebugEnabled)
261
                        Log.DebugFormat("[RESPONSE] [{0}]:[{1}] FAIL WITH TIMEOUT", msg.Method, msg.RequestUri);
262
                    innerException = exc;
263
                }
264
                catch(TaskCanceledException exc)
265
                {
266
                    //If the task was cancelled due to a timeout, retry it
267
                    if (!exc.CancellationToken.IsCancellationRequested)
268
                    {
269
                        timedOut = true;
270
                        if (Log.IsDebugEnabled)
271
                            Log.DebugFormat("[RESPONSE] [{0}]:[{1}] FAIL WITH TIMEOUT", msg.Method, msg.RequestUri);
272
                    }
273
                    else
274
                    {
275
                        throw;
276
                    }
277
                    innerException = exc;
278
                }
279
                catch (Exception exc)
280
                {
281
                    Log.FatalFormat("Unexpected error while sending:\n{0}\n{1}", msg, exc);
282
                    throw;
283
                }
284

    
285
                //Report the failure
286
                var resultStatus = result.NullSafe(s => s.StatusCode);
287

    
288
                if (Log.IsDebugEnabled)
289
                {                    
290
                    if (timedOut)
291
                        Log.DebugFormat("[TIMEOUT] [{0}]:[{1}]", msg.Method, msg.RequestUri);
292
                    else
293
                        Log.DebugFormat("[RESPONSE] [{0}]:[{1}] FAIL: [{2}]\n{3}", msg.Method, msg.RequestUri,
294
                                        resultStatus, result);
295
                }
296

    
297
                //Handle the failure
298

    
299
                if (--retries == 0)
300
                    throw new RetryException("Failed too many times");
301

    
302
                //Wait for service unavailable
303
                if (resultStatus == HttpStatusCode.ServiceUnavailable ||
304
                    resultStatus == HttpStatusCode.BadGateway ||
305
                    resultStatus == HttpStatusCode.InternalServerError )
306
                {
307

    
308
                    Log.WarnFormat("[UNAVAILABLE] Waiting before retrying [{0}]:[{1}] due to [{2}]", msg.Method,
309
                                   msg.RequestUri, result.ReasonPhrase);
310
                    await TaskEx.Delay(waitTime).ConfigureAwait(false);
311
                    //increase the timeout for repeated timeouts
312
                    if (waitTime < TimeSpan.FromSeconds(10))
313
                        waitTime = waitTime.Add(TimeSpan.FromSeconds(10));
314
                }                           
315
                //Throw in all other cases, unless there was a client-side timeout
316
                else if (!timedOut)
317
                    //Throw if there was no result (never received a reply)
318
                    if (result == null)
319
                        throw innerException ?? new RetryException();                        
320
                    else
321
                        //Otherwise force the exception that corresponds to the response
322
                        throw new HttpRequestWithStatusException(result.StatusCode,result.ReasonPhrase);
323
            }
324
            throw new RetryException();
325
        }
326

    
327
        public static HttpRequestMessage Clone(this HttpRequestMessage message)
328
        {
329
            var newMessage = new HttpRequestMessage(message.Method, message.RequestUri);
330
            foreach (var header in message.Headers)
331
            {
332
                newMessage.Headers.Add(header.Key, header.Value);
333
            }
334
            newMessage.Content = message.Content.Clone();
335

    
336
            foreach (var property in message.Properties)
337
            {
338
                newMessage.Properties.Add(property.Key, property.Value);
339
            }
340
            return newMessage;
341
        }
342

    
343
        public static HttpContent Clone(this HttpContent content)
344
        {
345
            if (content == null)
346
                return null;
347
            if (!(content is FileBlockContent || 
348
                content is ByteArrayContentWithProgress || 
349
                content is StringContent ||
350
                content is ByteArrayContent))
351
                throw new ArgumentException("content");
352

    
353
            HttpContent newContent=null;
354
            if (content is FileBlockContent)
355
            {
356
                newContent = (content as FileBlockContent).Clone();
357
            }
358
            else if (content is ByteArrayContentWithProgress)
359
            {
360
                newContent=(content as ByteArrayContentWithProgress).Clone();
361
            }
362
            else if (content is StringContent)
363
            {
364
                var fieldInfo = typeof(ByteArrayContent).GetField("content",
365
                                                                    BindingFlags.NonPublic | BindingFlags.Instance);
366
                var bytes = (byte[])fieldInfo.GetValue(content);
367
                var enc=Encoding.GetEncoding(content.Headers.ContentType.CharSet);
368
                var stringContent=enc.GetString(bytes);
369
                newContent = new StringContent(stringContent);
370
            }
371
            else if (content is ByteArrayContent)
372
            {
373
                var fieldInfo = typeof(ByteArrayContent).GetField("content",
374
                                                                    BindingFlags.NonPublic | BindingFlags.Instance);
375
                var bytes = (byte[])fieldInfo.GetValue(content);
376
                newContent = new ByteArrayContent(bytes);
377
            }
378

    
379

    
380
            foreach (var header in content.Headers)
381
            {
382
                if (!(header.Key == "Content-Type" && newContent.Headers.Contains(header.Key)))
383
                    newContent.Headers.Add(header.Key, header.Value);
384
            }
385
            return newContent;
386
        }
387

    
388
        public static string GetFirstValue(this HttpResponseHeaders headers, string name)
389
        {
390
            if (headers==null)
391
                throw new ArgumentNullException("headers");
392
            if (String.IsNullOrWhiteSpace(name))
393
                throw new ArgumentNullException("name");
394
            Contract.EndContractBlock();
395

    
396
            IEnumerable<string> values;
397
            if (headers.TryGetValues(name, out values))
398
            {
399
                return values.FirstOrDefault();
400
            }
401
            return null;
402
        }
403

    
404
        public static  Dictionary<string, string> GetMeta(this HttpResponseHeaders headers,string metaPrefix)
405
        {
406
            if (headers == null)
407
                throw new ArgumentNullException("headers");
408
            if (String.IsNullOrWhiteSpace(metaPrefix))
409
                throw new ArgumentNullException("metaPrefix");
410
            Contract.EndContractBlock();
411

    
412
            var dict = (from header in headers
413
                        where header.Key.StartsWith(metaPrefix)
414
                        let name = header.Key.Substring(metaPrefix.Length)
415
                        select new { Name = name, Value = String.Join(",",header.Value) })
416
                        .ToDictionary(t => t.Name, t => t.Value);
417
            return dict;
418
        }
419

    
420
    }
421

    
422
    internal static class PithosEAPCommon
423
    {
424
        internal static void HandleProgress<T, E>(TaskCompletionSource<T> tcs, ProgressChangedEventArgs eventArgs, Func<E> getProgress, IProgress<E> callback)
425
        {
426
            if (eventArgs.UserState != tcs)
427
                return;
428
            callback.Report(getProgress());
429
        }
430

    
431
        internal static void HandleCompletion<T>(TaskCompletionSource<T> tcs, bool requireMatch, AsyncCompletedEventArgs e, Func<T> getResult, Action unregisterHandler)
432
        {
433
            if (requireMatch)
434
            {
435
                if (e.UserState != tcs)
436
                    return;
437
            }
438
            try
439
            {
440
                unregisterHandler();
441
            }
442
            finally
443
            {
444
                if (e.Cancelled)
445
                    tcs.TrySetCanceled();
446
                else if (e.Error != null)
447
                    tcs.TrySetException(e.Error);
448
                else
449
                    tcs.TrySetResult(getResult());
450
            }
451
        }
452
    }
453

    
454
}