Extracted the job queue functionality to JobQueue.cs
authorPanagiotis Kanavos <pkanavos@gmail.com>
Thu, 15 Sep 2011 15:56:50 +0000 (18:56 +0300)
committerPanagiotis Kanavos <pkanavos@gmail.com>
Thu, 15 Sep 2011 15:56:50 +0000 (18:56 +0300)
Added a Retry function to PithosClient.cs
Removed commented code from CloudFilesClient.cs

trunk/Pithos.Core/JobQueue.cs [new file with mode: 0644]
trunk/Pithos.Core/Pithos.Core.csproj
trunk/Pithos.Core/StatusKeeper.cs
trunk/Pithos.Network/CloudFilesClient.cs
trunk/Pithos.Network/PithosClient.cs

diff --git a/trunk/Pithos.Core/JobQueue.cs b/trunk/Pithos.Core/JobQueue.cs
new file mode 100644 (file)
index 0000000..78aa206
--- /dev/null
@@ -0,0 +1,52 @@
+// -----------------------------------------------------------------------
+// <copyright file="JobQueue.cs" company="Microsoft">
+// TODO: Update copyright text.
+// </copyright>
+// -----------------------------------------------------------------------
+
+using System.Collections.Concurrent;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Pithos.Core
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Text;
+
+    /// <summary>
+    /// Runs queued <see cref="Action"/> jobs one at a time on a single background task.
+    /// </summary>
+    /// <remarks>
+    /// An exception thrown by a job is not caught here: it ends the processing loop,
+    /// and any jobs still waiting in the queue are not executed.
+    /// </remarks>
+    public class JobQueue
+    {
+        private readonly BlockingCollection<Action> _statusUpdateQueue = new BlockingCollection<Action>();
+        private CancellationToken _cancellationToken;
+
+        /// <summary>
+        /// Starts the background task that executes the queued jobs.
+        /// </summary>
+        /// <param name="token">Token that stops job processing once it is cancelled.</param>
+        public void Start(CancellationToken token)
+        {
+            _cancellationToken = token;
+
+            // The loop blocks for the lifetime of the queue, so request a dedicated
+            // thread and pin the default scheduler rather than whichever scheduler
+            // happens to be current for the caller.
+            Task.Factory.StartNew(
+                ProcessUpdates,
+                _cancellationToken,
+                TaskCreationOptions.LongRunning,
+                TaskScheduler.Default);
+        }
+
+        /// <summary>
+        /// Executes jobs as they arrive until the queue is completed or the token is cancelled.
+        /// </summary>
+        private void ProcessUpdates()
+        {
+            try
+            {
+                // Handing the token to the enumerator lets a cancellation interrupt the
+                // blocking wait for the next job; without it the loop would only end
+                // after Stop() is called.
+                foreach (var action in _statusUpdateQueue.GetConsumingEnumerable(_cancellationToken))
+                {
+                    action();
+                }
+            }
+            catch (OperationCanceledException)
+            {
+                // Expected when our own token is cancelled. If the token is not
+                // cancelled, the exception came from a job and must not be swallowed.
+                if (!_cancellationToken.IsCancellationRequested)
+                {
+                    throw;
+                }
+            }
+        }
+
+        /// <summary>
+        /// Appends a job to the queue.
+        /// </summary>
+        /// <param name="action">The job to execute; must not be null.</param>
+        /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
+        public void Add(Action action)
+        {
+            // A null job would otherwise be accepted here and only fail later,
+            // inside the processing loop, taking the whole loop down with it.
+            if (action == null)
+            {
+                throw new ArgumentNullException("action");
+            }
+
+            _statusUpdateQueue.Add(action);
+        }
+
+        /// <summary>
+        /// Marks the queue as complete for adding, which lets the processing loop
+        /// finish once the remaining jobs have been consumed.
+        /// </summary>
+        public void Stop()
+        {
+            _statusUpdateQueue.CompleteAdding();
+        }
+    }
+}
index f448517..1f1ea99 100644 (file)
   <ItemGroup>
     <Compile Include="FileState.cs" />
     <Compile Include="IStatusService.cs" />
+    <Compile Include="JobQueue.cs" />
     <Compile Include="Signature.cs" />
     <Compile Include="StatusKeeper.cs" />
     <Compile Include="IPithosWorkflow.cs" />
index e9888d2..4dc51a6 100644 (file)
@@ -24,7 +24,7 @@ namespace Pithos.Core
         [System.ComponentModel.Composition.Import]
         public IPithosSettings Settings { get; set; }
 
-        private BlockingCollection<Action> _statusUpdateQueue = new BlockingCollection<Action>();        
+        private JobQueue _statusUpdateQueue;
 
         public StatusKeeper()
         {            
@@ -63,20 +63,15 @@ namespace Pithos.Core
 
         public void StartProcessing(CancellationToken token)
         {
-            Task.Factory.StartNew(ProcessUpdates,token);            
+            _statusUpdateQueue.Start(token);
+            
         }
 
-        public void ProcessUpdates()
-        {            
-            foreach (var action in _statusUpdateQueue.GetConsumingEnumerable())
-            {
-                action();
-            }
-        }
+       
 
         public void Stop()
         {
-            _statusUpdateQueue.CompleteAdding();            
+            _statusUpdateQueue.Stop();            
         }
        
 
index fb26abf..ad1eb77 100644 (file)
@@ -8,6 +8,7 @@ using System.Linq;
 using System.Net;
 using System.Security.Cryptography;
 using System.Text;
+using System.Threading.Algorithms;
 using System.Threading.Tasks;
 using Newtonsoft.Json;
 using Pithos.Interfaces;
@@ -78,8 +79,6 @@ namespace Pithos.Network
 
                 authClient.AssertStatusOK("Authentication failed");
 
-                //var keys = authClient.ResponseHeaders.AllKeys.AsQueryable();
-
                 string storageUrl = authClient.GetHeaderValue("X-Storage-Url");
                 if (String.IsNullOrWhiteSpace(storageUrl))
                     throw new InvalidOperationException("Failed to obtain storage url");
@@ -91,47 +90,22 @@ namespace Pithos.Network
                 Token = token;
             }
 
-            /*_retryPolicy = new RetryPolicy { RetryCount = _retries };
-            _retryPolicy.RetryConditions.Add(new TimeoutRetryCondition());*/
-
             _client = new PithosClient{
                 BaseAddress  = StorageUrl.AbsoluteUri,                
                 Timeout=10000,
                 Retries=3};
             if (Proxy!=null)
                 _client.Proxy = new WebProxy(Proxy);
-            //_client.FileProgress += OnFileProgress;
             
             _client.Headers.Add("X-Auth-Token", Token);
-            /*if (UsePithos)
-            {
-                _client.AddHeader("X-Auth-User", UserName);
-                _client.AddHeader("X-Auth-Key",ApiKey);                
-            }*/
 
             Trace.TraceInformation("[AUTHENTICATE] End for {0}", userName);
         }
 
-       /* private void OnFileProgress(object sender, FileProgressEventArgs e)
-        {
-            Trace.TraceInformation("[PROGRESS] {0} {1:p} {2} of {3}",e.FileName,(double)e.BytesWritten/e.TotalBytes, e.BytesWritten,e.TotalBytes);            
-        }*/
 
         public IList<ContainerInfo> ListContainers()
         {                        
-            //Workaround for Hammock quirk: Hammock always
-            //appends a / unless a Path is specified.
-            
-            //Create a request with a complete path
-            //var request = new RestRequest { Path = StorageUrl.ToString(), RetryPolicy = _retryPolicy,Timeout = _shortTimeout };
-            //request.AddParameter("format","json");
-            //Create a client clone
-
-            /*var url = String.Join("/", new[] { _client.Authority, StorageUrl.ToString() });
-            var builder=new UriBuilder(url);
-            builder.Query = "format=json";
-           
-            var client= new PithosClient(_client){Timeout=10};   */         
+                  
             var content=_client.DownloadStringWithRetry("",3);
             _client.Parameters.Clear();
             _client.Parameters.Add("format", "json");
@@ -142,29 +116,6 @@ namespace Pithos.Network
             var infos = JsonConvert.DeserializeObject<IList<ContainerInfo>>(content);
             return infos;
 
-
-/*
-            var client = new RestClient{Proxy=Proxy.ToString()};
-            foreach (var header in _client.GetAllHeaders())
-            {
-                client.AddHeader(header.Name,header.Value);
-            }
-
-            
-
-
-
-            var response = client.Request(request);
-
-            if (response.StatusCode == HttpStatusCode.NoContent)
-                return new List<ContainerInfo>();
-
-            ThrowIfNotStatusOK(response, "List Containers failed");
-
-
-            var infos=JsonConvert.DeserializeObject<IList<ContainerInfo>>(response.Content);
-            
-            return infos;*/
         }
 
         public IList<ObjectInfo> ListObjects(string container)
@@ -174,17 +125,7 @@ namespace Pithos.Network
 
             Trace.TraceInformation("[START] ListObjects");
 
-            //var request = new RestRequest { Path = container, RetryPolicy = _retryPolicy, Timeout = TimeSpan.FromMinutes(1) };
-            //request.AddParameter("format", "json");
-            //var response = _client.Request(request);
-
-
-/*
-            var url = String.Join("/", new[] { _client.Authority, container });
-            var builder = new UriBuilder(url) {Query = "format=json"};
-
-            var client = new PithosClient(_client) { Timeout = 60000 };
-*/
+            
             _client.Parameters.Clear();
             _client.Parameters.Add("format", "json");
             var content = _client.DownloadStringWithRetry(container, 3);
@@ -206,9 +147,7 @@ namespace Pithos.Network
 
             Trace.TraceInformation("[START] ListObjects");
 
-           /* var request = new RestRequest { Path = container,RetryPolicy = _retryPolicy, Timeout = TimeSpan.FromMinutes(1) };
-            request.AddParameter("format", "json");
-            request.AddParameter("path", folder);*/
+           
             
             _client.Parameters.Clear();
             _client.Parameters.Add("format", "json");
@@ -218,37 +157,13 @@ namespace Pithos.Network
 
             var infos = JsonConvert.DeserializeObject<IList<ObjectInfo>>(content);
 
-           /* var response = _client.Request(request);
-            
-            var infos = InfosFromContent(response);*/
+         
 
             Trace.TraceInformation("[END] ListObjects");
             return infos;
         }
 
- /*       private static IList<ObjectInfo> InfosFromContent(RestResponse response)
-        {
-            if (response.TimedOut)
-                return new List<ObjectInfo>();
-
-            if (response.StatusCode == 0)
-                return new List<ObjectInfo>();
-
-            if (response.StatusCode == HttpStatusCode.NoContent)
-                return new List<ObjectInfo>();
-
-
-            var statusCode = (int)response.StatusCode;
-            if (statusCode < 200 || statusCode >= 300)
-            {
-                Trace.TraceWarning("ListObjects failed with code {0} - {1}", response.StatusCode, response.StatusDescription);
-                return new List<ObjectInfo>();
-            }
-
-            var infos = JsonConvert.DeserializeObject<IList<ObjectInfo>>(response.Content);
-            return infos;
-        }
-*/
         public bool ContainerExists(string container)
         {
             if (String.IsNullOrWhiteSpace(container))
@@ -256,9 +171,7 @@ namespace Pithos.Network
 
             _client.Parameters.Clear();
             _client.Head(container,3);
-            //var request = new RestRequest { Path = container, Method = WebMethod.Head, RetryPolicy = _retryPolicy,Timeout = _shortTimeout };            
-            //var response = _client.Request(request);
-
+           
             switch (_client.StatusCode)
             {
                 case HttpStatusCode.OK:
@@ -278,11 +191,6 @@ namespace Pithos.Network
             if (String.IsNullOrWhiteSpace(objectName))
                 throw new ArgumentNullException("objectName", "The objectName property can't be empty");
 
-
-/*
-            var request = new RestRequest { Path = container + "/" + objectName, Method = WebMethod.Head,RetryPolicy = _retryPolicy, Timeout = _shortTimeout };
-            var response = _client.Request(request);
-*/
             _client.Parameters.Clear();
             _client.Head(container + "/" + objectName, 3);
 
@@ -306,11 +214,6 @@ namespace Pithos.Network
             if (String.IsNullOrWhiteSpace(objectName))
                 throw new ArgumentNullException("objectName", "The objectName property can't be empty");
 
-
-/*
-            var request = new RestRequest { Path = container + "/" + objectName, Method = WebMethod.Head, RetryPolicy = _retryPolicy,Timeout = _shortTimeout };
-            var response = _client.Request(request);
-*/
             try
             {
                 _client.Parameters.Clear();
@@ -375,12 +278,7 @@ namespace Pithos.Network
                 throw new ArgumentNullException("folder", "The folder property can't be empty");
 
             var folderUrl=String.Format("{0}/{1}",container,folder);
-/*
-            var request = new RestRequest { Path = folderUrl, Method = WebMethod.Put, RetryPolicy = _retryPolicy,Timeout = _shortTimeout };
-            request.AddHeader("Content-Type", @"application/directory");
-            request.AddHeader("Content-Length", "0");
-*/
-            
+   
             _client.Parameters.Clear();
             _client.Headers.Add("Content-Type", @"application/directory");
             _client.Headers.Add("Content-Length", "0");
@@ -396,10 +294,6 @@ namespace Pithos.Network
             if (String.IsNullOrWhiteSpace(container))
                 throw new ArgumentNullException("container", "The container property can't be empty");
 
-/*
-            var request = new RestRequest { Path = container, Method = WebMethod.Head, RetryPolicy = _retryPolicy,Timeout = _shortTimeout };
-            var response = _client.Request(request);
-*/
             _client.Head(container);
             switch (_client.StatusCode)
             {
@@ -423,10 +317,6 @@ namespace Pithos.Network
             if (String.IsNullOrWhiteSpace(container))
                 throw new ArgumentNullException("container", "The container property can't be empty");
 
-/*
-            var request = new RestRequest { Path = container, Method = WebMethod.Put, RetryPolicy = _retryPolicy,Timeout = _shortTimeout };            
-            var response = _client.Request(request);
-*/
             _client.PutWithRetry(container,3);
             var expectedCodes = new[]{HttpStatusCode.Created ,HttpStatusCode.Accepted , HttpStatusCode.OK};
             if (!expectedCodes.Contains(_client.StatusCode))
@@ -438,10 +328,6 @@ namespace Pithos.Network
             if (String.IsNullOrWhiteSpace(container))
                 throw new ArgumentNullException("container", "The container property can't be empty");
 
-/*
-            var request = new RestRequest { Path = container, Method = WebMethod.Delete, RetryPolicy = _retryPolicy,Timeout = _shortTimeout };
-            var response = _client.Request(request);
-*/
             _client.DeleteWithRetry(container,3);
             var expectedCodes = new[] { HttpStatusCode.NotFound, HttpStatusCode.NoContent};
             if (!expectedCodes.Contains(_client.StatusCode))
@@ -463,22 +349,14 @@ namespace Pithos.Network
                 throw new ArgumentNullException("container", "The container property can't be empty");
             if (String.IsNullOrWhiteSpace(objectName))
                 throw new ArgumentNullException("objectName", "The objectName property can't be empty");
-            /*
-           var request = new RestRequest {Path = container + "/" + objectName, Method = WebMethod.Get};
-           
-                       if (DownloadPercentLimit > 0)
-                           request.TaskOptions = new TaskOptions<int> { RateLimitPercent = DownloadPercentLimit };
-           */
+            
             try
             {
                 var url = String.Join("/", _client.BaseAddress, container, objectName);
                 var uri = new Uri(url);
 
                 var client = new PithosClient(_client){Timeout=0};
-               /* if (!String.IsNullOrWhiteSpace(_client.Proxy))
-                    client.Proxy = new WebProxy(_client.Proxy);
-
-                CopyHeaders(_client, client);*/
+               
 
                 Trace.TraceInformation("[GET] START {0}", objectName);
                 client.DownloadProgressChanged += (sender, args) => 
@@ -544,12 +422,6 @@ namespace Pithos.Network
                 client.Headers.Add("Content-Type", "application/octet-stream");
                 client.Headers.Add("ETag", etag);
 
-/*
-                if(!String.IsNullOrWhiteSpace(_client.Proxy))
-                    client.Proxy = new WebProxy(_client.Proxy);
-
-                CopyHeaders(_client, client);
-*/
 
                 Trace.TraceInformation("[PUT] START {0}", objectName);
                 client.UploadProgressChanged += (sender, args) =>
@@ -578,47 +450,6 @@ namespace Pithos.Network
 
         }
        
-/*
-        /// <summary>
-        /// Copies headers from a Hammock RestClient to a WebClient
-        /// </summary>
-        /// <param name="source">The RestClient from which the headers are copied</param>
-        /// <param name="target">The WebClient to which the headers are copied</param>
-        private static void CopyHeaders(RestClient source, WebClient target)
-        {
-            Contract.Requires(source!=null,"source can't be null");
-            Contract.Requires(target != null, "target can't be null");
-            if (source == null)
-                throw new ArgumentNullException("source", "source can't be null");
-            if (source == null)
-                throw new ArgumentNullException("target", "target can't be null");
-
-            foreach (var header in source.GetAllHeaders())
-            {
-                target.Headers.Add(header.Name, header.Value);
-            }
-        }*/
-
-       /* /// <summary>
-        /// Copies headers from a Hammock RestClient to a WebClient
-        /// </summary>
-        /// <param name="source">The RestClient from which the headers are copied</param>
-        /// <param name="target">The WebClient to which the headers are copied</param>
-        private static void CopyHeaders(RestClient source, WebRequest target)
-        {
-            Contract.Requires(source!=null,"source can't be null");
-            Contract.Requires(target != null, "target can't be null");
-            if (source == null)
-                throw new ArgumentNullException("source", "source can't be null");
-            if (source == null)
-                throw new ArgumentNullException("target", "target can't be null");
-
-            foreach (var header in source.GetAllHeaders())
-            {
-                target.Headers.Add(header.Name, header.Value);
-            }
-        }*/
-
         
         private static string CalculateHash(string fileName)
         {
@@ -641,10 +472,6 @@ namespace Pithos.Network
             if (String.IsNullOrWhiteSpace(objectName))
                 throw new ArgumentNullException("objectName", "The objectName property can't be empty");
 
-/*
-            var request = new RestRequest { Path = container + "/" + objectName, Method = WebMethod.Delete, RetryPolicy = _retryPolicy,Timeout = _shortTimeout };
-            var response = _client.Request(request);
-*/
             _client.DeleteWithRetry(container + "/" + objectName,3);
 
             var expectedCodes = new[] { HttpStatusCode.NotFound, HttpStatusCode.NoContent };
@@ -667,13 +494,6 @@ namespace Pithos.Network
             var targetUrl = targetContainer + "/" + newObjectName;
             var sourceUrl = String.Format("/{0}/{1}", sourceContainer, oldObjectName);
 
-/*
-            var request = new RestRequest { Path = targetUrl, Method = WebMethod.Put };
-            request.AddHeader("X-Copy-From",sourceUrl);
-            request.AddPostContent(new byte[]{});
-            var response = _client.Request(request);
-*/
-
             var client = new PithosClient(_client);
             client.Headers.Add("X-Copy-From", sourceUrl);
             client.PutWithRetry(targetUrl,3);
@@ -688,71 +508,11 @@ namespace Pithos.Network
         }
 
       
-
-        /*private string GetHeaderValue(string headerName, WebHeaderCollection headers, IQueryable<string> keys)
-        {
-            if (keys.Any(key => key == headerName))
-                return headers[headerName];
-            else
-                throw new WebException(String.Format("The {0}  header is missing", headerName));
-        }*/
-
-       /* private static void ThrowIfNotStatusOK(RestResponse response, string message)
-        {
-            int status = (int)response.StatusCode;
-            ThrowIfNotStatusOK(status, message);
-        }
-
-        private static void ThrowIfNotStatusOK(HttpWebResponse response, string message)
-        {
-            int status = (int)response.StatusCode;
-            ThrowIfNotStatusOK(status, message);
-        }
-
-        private static void ThrowIfNotStatusOK(int status, string message)
-        {
-            if (status < 200 || status >= 300)
-                throw new WebException(String.Format("{0} with code {1}", message, status));
-        }
-        */
         private static WebException CreateWebException(string operation, HttpStatusCode statusCode)
         {
             return new WebException(String.Format("{0} failed with unexpected status code {1}", operation, statusCode));
         }
 
-       /* public static Func<T> Retry<T>(Func<int,T> original, int retryCount,int timeout)
-        {
-            return () =>
-            {
-                while (true)
-                {
-                    try
-                    {
-                        return original(timeout);
-                    }
-                    catch (WebException e)
-                    {
-                        if (e.Status == WebExceptionStatus.Timeout)
-                        {
-                            if (retryCount == 0)
-                            {
-                                throw;
-                            }
-                            retryCount--;
-                        }
-                        else
-                        {
-                            throw;
-                        }
-                    }
-                    catch (Exception e)
-                    {                                   
-                            throw;                        
-                    }
-                }
-            };
-        } */
-
-
+        
     }
 }
index 15be9de..2942fce 100644 (file)
@@ -44,12 +44,7 @@ namespace Pithos.Network
             
         }
 
-        /*public PithosClient(RestClient restClient)
-            :base()
-        {
-            CopyHeaders(restClient);
-        }*/
-
+       
         public PithosClient(PithosClient other)
             : base()
         {
@@ -103,6 +98,8 @@ namespace Pithos.Network
             var actualAddress = GetActualAddress(address);
 
             var actualRetries = (retries == 0) ? Retries : retries;
+            
+
             var func = Retry(() =>
             {
                 var uriString = String.Join("/", BaseAddress, actualAddress);
@@ -159,14 +156,10 @@ namespace Pithos.Network
                 if (ResponseHeaders!=null)
                     ResponseHeaders.Clear();
 
-
-                //CopyHeaders(this.Headers,request.Headers);
-
                 var response = (HttpWebResponse)GetWebResponse(request);
                 StatusCode = response.StatusCode;
                 StatusDescription = response.StatusDescription;                
                 
-                //CopyHeaders(response.Headers,this.ResponseHeaders);
 
                 return 0;
             }, actualRetries);
@@ -212,27 +205,9 @@ namespace Pithos.Network
             return result;
         }
 
-      /*  /// <summary>
-        /// Copies headers from a Hammock RestClient to a WebClient
-        /// </summary>
-        /// <param name="source">The RestClient from which the headers are copied</param>
-        /// <param name="target">The WebClient to which the headers are copied</param>
-        private void CopyHeaders(RestClient source)
-        {
-            Contract.Requires(source != null, "source can't be null");
-            if (source == null)
-                throw new ArgumentNullException("source", "source can't be null");
-            if (source == null)
-                throw new ArgumentNullException("target", "target can't be null");
-
-            foreach (var header in source.GetAllHeaders())
-            {
-                Headers.Add(header.Name, header.Value);
-            }
-        } 
-        */
+      
         /// <summary>
-        /// Copies headers from a Hammock RestClient to a WebClient
+        /// Copies headers from another PithosClient
         /// </summary>
         /// <param name="source">The PithosClient from which the headers are copied</param>
         public void CopyHeaders(PithosClient source)
@@ -244,10 +219,10 @@ namespace Pithos.Network
         }
         
         /// <summary>
-        /// Copies headers from a Hammock RestClient to a WebClient
+        /// Copies headers from one header collection to another
         /// </summary>
-        /// <param name="source">The RestClient from which the headers are copied</param>
-        /// <param name="target">The WebClient to which the headers are copied</param>
+        /// <param name="source">The source collection from which the headers are copied</param>
+        /// <param name="target">The target collection to which the headers are copied</param>
         public static void CopyHeaders(WebHeaderCollection source,WebHeaderCollection target)
         {
             Contract.Requires(source != null, "source can't be null");