Removed Dataflow code
[pithos-ms-client] / trunk / Pithos.Network / RestClient.cs
1 // -----------------------------------------------------------------------
2 // <copyright file="RestClient.cs" company="Microsoft">
3 // TODO: Update copyright text.
4 // </copyright>
5 // -----------------------------------------------------------------------
6
7 using System.Collections.Specialized;
8 using System.Diagnostics;
9 using System.Diagnostics.Contracts;
10 using System.IO;
11 using System.Net;
12 using System.Runtime.Serialization;
13 using System.Threading.Tasks;
14 using log4net;
15
16
17 namespace Pithos.Network
18 {
19     using System;
20     using System.Collections.Generic;
21     using System.Linq;
22     using System.Text;
23
24     /// <summary>
25     /// TODO: Update summary.
26     /// </summary>
27     public class RestClient:WebClient
28     {
29         public int Timeout { get; set; }
30
31         public bool TimedOut { get; set; }
32
33         public HttpStatusCode StatusCode { get; private set; }
34
35         public string StatusDescription { get; set; }
36
37         public long? RangeFrom { get; set; }
38         public long? RangeTo { get; set; }
39
40         public int Retries { get; set; }
41
42         private readonly Dictionary<string, string> _parameters=new Dictionary<string, string>();
43         public Dictionary<string, string> Parameters
44         {
45             get
46             {
47                 Contract.Ensures(_parameters!=null);
48                 return _parameters;
49             }            
50         }
51
52         private static readonly ILog Log = LogManager.GetLogger("RestClient");
53
54
55         [ContractInvariantMethod]
56         private void Invariants()
57         {
58             Contract.Invariant(Headers!=null);    
59         }
60
61         public RestClient():base()
62         {
63             
64         }
65
66        
67         public RestClient(RestClient other)
68             : base()
69         {
70             if (other==null)
71                 throw new ArgumentNullException("other");
72             Contract.EndContractBlock();
73
74             CopyHeaders(other);
75             Timeout = other.Timeout;
76             Retries = other.Retries;
77             BaseAddress = other.BaseAddress;             
78
79             foreach (var parameter in other.Parameters)
80             {
81                 Parameters.Add(parameter.Key,parameter.Value);
82             }
83
84             this.Proxy = other.Proxy;
85         }
86
87
88         protected override WebRequest GetWebRequest(Uri address)
89         {
90             TimedOut = false;
91             var webRequest = base.GetWebRequest(address);            
92             var request = (HttpWebRequest)webRequest;
93             request.ServicePoint.ConnectionLimit = 10;
94             if (IfModifiedSince.HasValue)
95                 request.IfModifiedSince = IfModifiedSince.Value;
96             request.AutomaticDecompression = DecompressionMethods.Deflate | DecompressionMethods.GZip;
97             if(Timeout>0)
98                 request.Timeout = Timeout;
99
100             if (RangeFrom.HasValue)
101             {
102                 if (RangeTo.HasValue)
103                     request.AddRange(RangeFrom.Value, RangeTo.Value);
104                 else
105                     request.AddRange(RangeFrom.Value);
106             }
107             return request; 
108         }
109
110         public DateTime? IfModifiedSince { get; set; }
111
112         //Asynchronous version
113         protected override WebResponse GetWebResponse(WebRequest request, IAsyncResult result)
114         {
115             Log.InfoFormat("ASYNC [{0}] {1}",request.Method, request.RequestUri);
116             HttpWebResponse response = null;
117
118             try
119             {
120                 response = (HttpWebResponse)base.GetWebResponse(request, result);
121             }
122             catch (WebException exc)
123             {
124                 if (!TryGetResponse(exc, out response))
125                     throw;
126             }
127
128             StatusCode = response.StatusCode;
129             LastModified = response.LastModified;
130             StatusDescription = response.StatusDescription;
131             return response;
132
133         }
134       
135
136         //Synchronous version
137         protected override WebResponse GetWebResponse(WebRequest request)
138         {
139             HttpWebResponse response = null;
140             try
141             {                                
142                 response = (HttpWebResponse)base.GetWebResponse(request);
143             }
144             catch (WebException exc)
145             {
146                 if (!TryGetResponse(exc, out response))
147                     throw;
148             }
149
150             StatusCode = response.StatusCode;
151             LastModified = response.LastModified;
152             StatusDescription = response.StatusDescription;
153             return response;
154         }
155
156         private bool TryGetResponse(WebException exc, out HttpWebResponse response)
157         {
158             response = null;
159             //Fail on empty response
160             if (exc.Response == null)
161                 return false;
162
163             response = (exc.Response as HttpWebResponse);
164             //Succeed on allowed status codes
165             if (AllowedStatusCodes.Contains(response.StatusCode))
166                 return true;
167
168             //Does the response have any content to log?
169             if (exc.Response.ContentLength > 0)
170             {
171                 var content = LogContent(exc.Response);
172                 Log.ErrorFormat(content);
173             }
174             return false;
175         }
176
177         private readonly List<HttpStatusCode> _allowedStatusCodes=new List<HttpStatusCode>{HttpStatusCode.NotModified};        
178
179         public List<HttpStatusCode> AllowedStatusCodes
180         {
181             get
182             {
183                 return _allowedStatusCodes;
184             }            
185         }
186
187         public DateTime LastModified { get; private set; }
188
189         private static string LogContent(WebResponse webResponse)
190         {
191             if (webResponse == null)
192                 throw new ArgumentNullException("webResponse");
193             Contract.EndContractBlock();
194
195             //The response stream must be copied to avoid affecting other code by disposing of the 
196             //original response stream.
197             var stream = webResponse.GetResponseStream();            
198             using(var memStream=new MemoryStream((int) stream.Length))
199             using (var reader = new StreamReader(memStream))
200             {
201                 stream.CopyTo(memStream);                
202                 string content = reader.ReadToEnd();
203
204                 stream.Seek(0,SeekOrigin.Begin);
205                 return content;
206             }
207         }
208
209         public string DownloadStringWithRetry(string address,int retries=0)
210         {
211             
212             if (address == null)
213                 throw new ArgumentNullException("address");
214
215             var actualAddress = GetActualAddress(address);
216
217             TraceStart("GET",actualAddress);            
218             
219             var actualRetries = (retries == 0) ? Retries : retries;
220
221             var uriString = String.Join("/", BaseAddress.TrimEnd('/'), actualAddress);
222
223             var task = Retry(() =>
224             {                
225                 var content = base.DownloadString(uriString);
226
227                 if (StatusCode == HttpStatusCode.NoContent)
228                     return String.Empty;
229                 return content;
230
231             }, actualRetries);
232
233             var result = task.Result;
234             return result;
235         }
236
237         public void Head(string address,int retries=0)
238         {
239             AllowedStatusCodes.Add(HttpStatusCode.NotFound);
240             RetryWithoutContent(address, retries, "HEAD");
241         }
242
243         public void PutWithRetry(string address, int retries = 0)
244         {
245             RetryWithoutContent(address, retries, "PUT");
246         }
247
248         public void DeleteWithRetry(string address,int retries=0)
249         {
250             RetryWithoutContent(address, retries, "DELETE");
251         }
252
253         public string GetHeaderValue(string headerName,bool optional=false)
254         {
255             if (this.ResponseHeaders==null)
256                 throw new InvalidOperationException("ResponseHeaders are null");
257             Contract.EndContractBlock();
258
259             var values=this.ResponseHeaders.GetValues(headerName);
260             if (values != null)
261                 return values[0];
262
263             if (optional)            
264                 return null;            
265             //A required header was not found
266             throw new WebException(String.Format("The {0}  header is missing", headerName));
267         }
268
269         public void SetNonEmptyHeaderValue(string headerName, string value)
270         {
271             if (String.IsNullOrWhiteSpace(value))
272                 return;
273             Headers.Add(headerName,value);
274         }
275
276         private void RetryWithoutContent(string address, int retries, string method)
277         {
278             if (address == null)
279                 throw new ArgumentNullException("address");
280
281             var actualAddress = GetActualAddress(address);            
282             var actualRetries = (retries == 0) ? Retries : retries;
283
284             var task = Retry(() =>
285             {
286                 var uriString = String.Join("/",BaseAddress ,actualAddress);
287                 var uri = new Uri(uriString);
288                 var request =  GetWebRequest(uri);
289                 request.Method = method;
290                 if (ResponseHeaders!=null)
291                     ResponseHeaders.Clear();
292
293                 TraceStart(method, uriString);
294                 if (method == "PUT")
295                     request.ContentLength = 0;
296
297                 //Have to use try/finally instead of using here, because WebClient needs a valid WebResponse object
298                 //in order to return response headers
299                 var response = (HttpWebResponse)GetWebResponse(request);
300                 try
301                 {
302                     LastModified = response.LastModified;
303                     StatusCode = response.StatusCode;
304                     StatusDescription = response.StatusDescription;
305                 }
306                 finally
307                 {
308                     response.Close();
309                 }
310                 
311
312                 return 0;
313             }, actualRetries);
314
315             try
316             {
317                 task.Wait();
318             }
319             catch (AggregateException ex)
320             {
321                 var exc = ex.InnerException;
322                 if (exc is RetryException)
323                 {
324                     Log.ErrorFormat("[{0}] RETRY FAILED for {1} after {2} retries",method,address,retries);
325                 }
326                 else
327                 {
328                     Log.ErrorFormat("[{0}] FAILED for {1} with \n{2}", method, address, exc);
329                 }
330                 throw exc;
331
332             }
333             catch(Exception ex)
334             {
335                 Log.ErrorFormat("[{0}] FAILED for {1} with \n{2}", method, address, ex);
336                 throw;
337             }
338         }
339         
340         private static void TraceStart(string method, string actualAddress)
341         {
342             Log.InfoFormat("[{0}] {1} {2}", method, DateTime.Now, actualAddress);
343         }
344
345         private string GetActualAddress(string address)
346         {
347             if (Parameters.Count == 0)
348                 return address;
349             var addressBuilder=new StringBuilder(address);            
350
351             bool isFirst = true;
352             foreach (var parameter in Parameters)
353             {
354                 if(isFirst)
355                     addressBuilder.AppendFormat("?{0}={1}", parameter.Key, parameter.Value);
356                 else
357                     addressBuilder.AppendFormat("&{0}={1}", parameter.Key, parameter.Value);
358                 isFirst = false;
359             }
360             return addressBuilder.ToString();
361         }
362
363         public string DownloadStringWithRetry(Uri address,int retries=0)
364         {
365             if (address == null)
366                 throw new ArgumentNullException("address");
367
368             var actualRetries = (retries == 0) ? Retries : retries;            
369             var task = Retry(() =>
370             {
371                 var content = base.DownloadString(address);
372
373                 if (StatusCode == HttpStatusCode.NoContent)
374                     return String.Empty;
375                 return content;
376
377             }, actualRetries);
378
379             var result = task.Result;
380             return result;
381         }
382
383       
384         /// <summary>
385         /// Copies headers from another RestClient
386         /// </summary>
387         /// <param name="source">The RestClient from which the headers are copied</param>
388         public void CopyHeaders(RestClient source)
389         {
390             if (source == null)
391                 throw new ArgumentNullException("source", "source can't be null");
392             Contract.EndContractBlock();
393             //The Headers getter initializes the property, it is never null
394             Contract.Assume(Headers!=null);
395                 
396             CopyHeaders(source.Headers,Headers);
397         }
398         
399         /// <summary>
400         /// Copies headers from one header collection to another
401         /// </summary>
402         /// <param name="source">The source collection from which the headers are copied</param>
403         /// <param name="target">The target collection to which the headers are copied</param>
404         public static void CopyHeaders(WebHeaderCollection source,WebHeaderCollection target)
405         {
406             if (source == null)
407                 throw new ArgumentNullException("source", "source can't be null");
408             if (target == null)
409                 throw new ArgumentNullException("target", "target can't be null");
410             Contract.EndContractBlock();
411
412             for (int i = 0; i < source.Count; i++)
413             {
414                 target.Add(source.GetKey(i), source[i]);
415             }            
416         }
417
418         public void AssertStatusOK(string message)
419         {
420             if (StatusCode >= HttpStatusCode.BadRequest)
421                 throw new WebException(String.Format("{0} with code {1} - {2}", message, StatusCode, StatusDescription));
422         }
423
424
425         private Task<T> Retry<T>(Func<T> original, int retryCount, TaskCompletionSource<T> tcs = null)
426         {
427             if (original==null)
428                 throw new ArgumentNullException("original");
429             Contract.EndContractBlock();
430
431             if (tcs == null)
432                 tcs = new TaskCompletionSource<T>();
433             Task.Factory.StartNew(original).ContinueWith(_original =>
434                 {
435                     if (!_original.IsFaulted)
436                         tcs.SetFromTask(_original);
437                     else 
438                     {
439                         var e = _original.Exception.InnerException;
440                         var we = (e as WebException);
441                         if (we==null)
442                             tcs.SetException(e);
443                         else
444                         {
445                             var statusCode = GetStatusCode(we);
446
447                             //Return null for 404
448                             if (statusCode == HttpStatusCode.NotFound)
449                                 tcs.SetResult(default(T));
450                             //Retry for timeouts and service unavailable
451                             else if (we.Status == WebExceptionStatus.Timeout ||
452                                 (we.Status == WebExceptionStatus.ProtocolError && statusCode == HttpStatusCode.ServiceUnavailable))
453                             {
454                                 TimedOut = true;
455                                 if (retryCount == 0)
456                                 {                                    
457                                     Log.ErrorFormat("[ERROR] Timed out too many times. \n{0}\n",e);
458                                     tcs.SetException(new RetryException("Timed out too many times.", e));                                    
459                                 }
460                                 else
461                                 {
462                                     Log.ErrorFormat(
463                                         "[RETRY] Timed out after {0} ms. Will retry {1} more times\n{2}", Timeout,
464                                         retryCount, e);
465                                     Retry(original, retryCount - 1, tcs);
466                                 }
467                             }
468                             else
469                                 tcs.SetException(e);
470                         }
471                     };
472                 });
473             return tcs.Task;
474         }
475
476         private HttpStatusCode GetStatusCode(WebException we)
477         {
478             if (we==null)
479                 throw new ArgumentNullException("we");
480             var statusCode = HttpStatusCode.RequestTimeout;
481             if (we.Response != null)
482             {
483                 statusCode = ((HttpWebResponse) we.Response).StatusCode;
484                 this.StatusCode = statusCode;
485             }
486             return statusCode;
487         }
488
489         public UriBuilder GetAddressBuilder(string container, string objectName)
490         {
491             var builder = new UriBuilder(String.Join("/", BaseAddress, container, objectName));
492             return builder;
493         }
494
495         public Dictionary<string, string> GetMeta(string metaPrefix)
496         {
497             if (String.IsNullOrWhiteSpace(metaPrefix))
498                 throw new ArgumentNullException("metaPrefix");
499             Contract.EndContractBlock();
500
501             var keys = ResponseHeaders.AllKeys.AsQueryable();
502             var dict = (from key in keys
503                         where key.StartsWith(metaPrefix)
504                         let name = key.Substring(metaPrefix.Length)
505                         select new { Name = name, Value = ResponseHeaders[key] })
506                         .ToDictionary(t => t.Name, t => t.Value);
507             return dict;
508         }
509     }
510
511     public class RetryException:Exception
512     {
513         public RetryException()
514             :base()
515         {
516             
517         }
518
519         public RetryException(string message)
520             :base(message)
521         {
522             
523         }
524
525         public RetryException(string message,Exception innerException)
526             :base(message,innerException)
527         {
528             
529         }
530
531         public RetryException(SerializationInfo info,StreamingContext context)
532             :base(info,context)
533         {
534             
535         }
536     }
537 }