From: Stavros Sachtouris Date: Tue, 30 Oct 2012 16:11:17 +0000 (+0200) Subject: Remove request depedancy X-Git-Tag: v0.6~26 X-Git-Url: https://code.grnet.gr/git/kamaki/commitdiff_plain/726fa2a18a0015d773cf57e8895fff6330bf422a Remove request depedancy --- diff --git a/kamaki/clients/connection/pool/__init__.py b/kamaki/clients/connection/pool/__init__.py deleted file mode 100644 index 1b90938..0000000 --- a/kamaki/clients/connection/pool/__init__.py +++ /dev/null @@ -1,135 +0,0 @@ -# Copyright 2011-2012 GRNET S.A. All rights reserved. -# -# Redistribution and use in source and binary forms, with or -# without modification, are permitted provided that the following -# conditions are met: -# -# 1. Redistributions of source code must retain the above -# copyright notice, this list of conditions and the following -# disclaimer. -# -# 2. Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following -# disclaimer in the documentation and/or other materials -# provided with the distribution. -# -# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS -# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED -# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR -# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR -# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF -# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED -# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN -# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -# POSSIBILITY OF SUCH DAMAGE. -# -# The views and conclusions contained in the software and -# documentation are those of the authors and should not be -# interpreted as representing official policies, either expressed -# or implied, of GRNET S.A. - - -"""Classes to support pools of arbitrary objects. - -The :class:`ObjectPool` class in this module abstracts a pool -of arbitrary objects. Subclasses need to define the details regarding -creation, destruction, allocation and release of their specific objects. - -""" - -# This should work under gevent, because gevent monkey patches 'threading' -# if not, we can detect if running under gevent, e.g. using -# if 'gevent' in sys.modules: -# from gevent.coros import Semaphore -# else: -# from threading import Semaphore -from threading import Semaphore, Lock - - -__all__ = ['ObjectPool', 'ObjectPoolError', 'PoolEmptyError'] - - -class ObjectPoolError(Exception): - pass - - -class PoolEmptyError(ObjectPoolError): - pass - - -class ObjectPool(object): - def __init__(self, size=None): - try: - self.size = int(size) - assert size >= 1 - except: - raise ValueError("Invalid size for pool (positive integer " - "required): %r" % (size,)) - - self._semaphore = Semaphore(size) # Pool grows up to size - self._mutex = Lock() # Protect shared _set oject - self._set = set() - - def pool_get(self, blocking=True, timeout=None, create=True): - """Get an object from the pool. - - Get an object from the pool. By default (create=True), create a new - object if the pool has not reached its maximum size yet. If - create == False, the caller is responsible for creating the object and - put()ting it back into the pool when done. - - """ - # timeout argument only supported by gevent and py3k variants - # of Semaphore. acquire() will raise TypeError if timeout - # is specified but not supported by the underlying implementation. - kw = {"blocking": blocking} - if timeout is not None: - kw["timeout"] = timeout - r = self._semaphore.acquire(**kw) - if not r: - raise PoolEmptyError() - with self._mutex: - try: - try: - obj = self._set.pop() - except KeyError: - obj = self._pool_create() if create else None - except: - self._semaphore.release() - raise - # We keep _semaphore locked, put() will release it - return obj - - def pool_put(self, obj): - """Put an object back into the pool. - - Return an object to the pool, for subsequent retrieval - by pool_get() calls. If _pool_cleanup() returns True, - the object has died and is not put back into self._set. - - """ - with self._mutex: - if not self._pool_cleanup(obj): - self._set.add(obj) - self._semaphore.release() - - def _pool_create(self): - """Create a new object to be used with this pool. - - Create a new object to be used with this pool, - should be overriden in subclasses. - - """ - raise NotImplementedError - - def _pool_cleanup(self, obj): - """Cleanup an object before being put back into the pool. - - Cleanup an object before it can be put back into the pull, - ensure it is in a stable, reusable state. - - """ - raise NotImplementedError diff --git a/kamaki/clients/connection/pool/http.py b/kamaki/clients/connection/pool/http.py deleted file mode 100644 index 0e1bea9..0000000 --- a/kamaki/clients/connection/pool/http.py +++ /dev/null @@ -1,128 +0,0 @@ - -from kamaki.clients.connection.pool import ObjectPool - -from httplib import ( - HTTPConnection as http_class, - HTTPSConnection as https_class, - HTTPException, - ResponseNotReady -) - -from urlparse import urlparse -from new import instancemethod - - -_pools = {} -pool_size = 8 - - -USAGE_LIMIT = 25 -RETRY_LIMIT = 100 - - -def init_http_pooling(size): - global pool_size - pool_size = size - - -def put_http_connection(conn): - pool = conn._pool - if pool is None: - return - conn._pool = None - pool.pool_put(conn) - - -class HTTPConnectionPool(ObjectPool): - - _scheme_to_class = { - 'http' : http_class, - 'https' : https_class, - } - - def __init__(self, scheme, netloc, size=None): - ObjectPool.__init__(self, size=size) - - connection_class = self._scheme_to_class.get(scheme, None) - if connection_class is None: - m = 'Unsupported scheme: %s' % (scheme,) - raise ValueError(m) - - self.connection_class = connection_class - self.scheme = scheme - self.netloc = netloc - - def _pool_create(self): - conn = self.connection_class(self.netloc) - conn._use_counter = USAGE_LIMIT - conn._pool = self - conn._real_close = conn.close - conn.close = instancemethod(put_http_connection, - conn, type(conn)) - return conn - - def _pool_cleanup(self, conn): - # every connection can be used a finite number of times - conn._use_counter -= 1 - - # see httplib source for connection states documentation - if conn._use_counter > 0 and conn._HTTPConnection__state == 'Idle': - try: - resp = conn.getresponse() - except ResponseNotReady: - return False - - conn._real_close() - return True - - -def _verify_connection(conn): - try: - conn.request("HEAD", "/") - conn.getresponse().read() - except HTTPException: - # The connection has died. - conn.close() - return False - return True - - -def get_http_connection(netloc=None, scheme='http'): - if netloc is None: - m = "netloc cannot be None" - raise ValueError(m) - # does the pool need to be created? - if netloc not in _pools: - pool = HTTPConnectionPool(scheme, netloc, size=pool_size) - _pools[netloc] = pool - - pool = _pools[netloc] - - conn = None - n = 0 - while conn is None: - conn = pool.pool_get() - conn._pool = pool - if not _verify_connection(conn): - # The connection has died, e.g., Keepalive expired - conn = None - n += 1 - if n > RETRY_LIMIT: - m = ("Could not get live HTTP conn from pool" - "after %d retries." % RETRY_LIMIT) - raise RuntimeError(m) - return conn - -def main(): - #cpool = HTTPConnectionPool('https', 'pithos.okeanos.io/v1', size=8) - headers={'X-Auth-Token':'0TpoyAXqJSPxLdDuZHiLOA=='} - c = get_http_connection('pithos.okeanos.io', 'https') - c.request(method='get', url='https://pithos.okeanos.io/v1/saxtouri@admin.grnet.gr?format=json', - headers=headers) - r = c.getresponse() - print('HEADERS:'+unicode(r.getheaders())) - print('BODY:'+unicode(r.read())) - - -if __name__ == '__main__': - main() \ No newline at end of file diff --git a/kamaki/clients/connection/pool/tests.py b/kamaki/clients/connection/pool/tests.py deleted file mode 100755 index 64ba5ca..0000000 --- a/kamaki/clients/connection/pool/tests.py +++ /dev/null @@ -1,209 +0,0 @@ -#!/usr/bin/env python -# -# -*- coding: utf-8 -*- -# -# Copyright 2011 GRNET S.A. All rights reserved. -# -# Redistribution and use in source and binary forms, with or -# without modification, are permitted provided that the following -# conditions are met: -# -# 1. Redistributions of source code must retain the above -# copyright notice, this list of conditions and the following -# disclaimer. -# -# 2. Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following -# disclaimer in the documentation and/or other materials -# provided with the distribution. -# -# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS -# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED -# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR -# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR -# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF -# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED -# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN -# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -# POSSIBILITY OF SUCH DAMAGE. -# -# The views and conclusions contained in the software and -# documentation are those of the authors and should not be -# interpreted as representing official policies, either expressed -# or implied, of GRNET S.A. -# -# - -"""Unit Tests for the pool classes in synnefo.lib.pool - -Provides unit tests for the code implementing pool -classes in the synnefo.lib.pool module. - -""" - -# Support running under a gevent-monkey-patched environment -# if the "monkey" argument is specified in the command line. -import sys -if "monkey" in sys.argv: - from gevent import monkey - monkey.patch_all() - sys.argv.pop(sys.argv.index("monkey")) - -import sys -import time -import threading - -from . import ObjectPool, PoolEmptyError - -# Use backported unittest functionality if Python < 2.7 -try: - import unittest2 as unittest -except ImportError: - if sys.version_info < (2, 7): - raise Exception("The unittest2 package is required for Python < 2.7") - import unittest - - -class NumbersPool(ObjectPool): - max = 0 - - def _pool_create(self): - n = self.max - self.max += 1 - return n - - def _pool_cleanup(self, obj): - n = int(obj) - if n < 0: - return True - pass - - -class ObjectPoolTestCase(unittest.TestCase): - def test_create_pool_requires_size(self): - """Test __init__() requires valid size argument""" - self.assertRaises(ValueError, ObjectPool) - self.assertRaises(ValueError, ObjectPool, size="size10") - self.assertRaises(ValueError, ObjectPool, size=0) - self.assertRaises(ValueError, ObjectPool, size=-1) - - def test_create_pool(self): - """Test pool creation works""" - pool = ObjectPool(100) - self.assertEqual(pool.size, 100) - - def test_get_not_implemented(self): - """Test pool_get() method not implemented in abstract class""" - pool = ObjectPool(100) - self.assertRaises(NotImplementedError, pool.pool_get) - - def test_put_not_implemented(self): - """Test pool_put() method not implemented in abstract class""" - pool = ObjectPool(100) - self.assertRaises(NotImplementedError, pool.pool_put, None) - - -class NumbersPoolTestCase(unittest.TestCase): - N = 1500 - SEC = 0.5 - maxDiff = None - - def setUp(self): - self.numbers = NumbersPool(self.N) - - def test_initially_empty(self): - """Test pool is empty upon creation""" - self.assertEqual(self.numbers._set, set([])) - - def test_seq_allocate_all(self): - """Test allocation and deallocation of all pool objects""" - n = [] - for _ in xrange(0, self.N): - n.append(self.numbers.pool_get()) - self.assertEqual(n, range(0, self.N)) - for i in n: - self.numbers.pool_put(i) - self.assertEqual(self.numbers._set, set(n)) - - def test_parallel_allocate_all(self): - """Allocate all pool objects in parallel""" - def allocate_one(pool, results, index): - n = pool.pool_get() - results[index] = n - - results = [None] * self.N - threads = [threading.Thread(target=allocate_one, - args=(self.numbers, results, i,)) - for i in xrange(0, self.N)] - - for t in threads: - t.start() - for t in threads: - t.join() - - # This nonblocking pool_get() should fail - self.assertRaises(PoolEmptyError, self.numbers.pool_get, - blocking=False) - self.assertEqual(sorted(results), range(0, self.N)) - - def test_allocate_no_create(self): - """Allocate objects from the pool without creating them""" - for i in xrange(0, self.N): - self.assertIsNone(self.numbers.pool_get(create=False)) - - # This nonblocking pool_get() should fail - self.assertRaises(PoolEmptyError, self.numbers.pool_get, - blocking=False) - - def test_pool_cleanup_returns_failure(self): - """Put a broken object, test a new one is retrieved eventually""" - n = [] - for _ in xrange(0, self.N): - n.append(self.numbers.pool_get()) - self.assertEqual(n, range(0, self.N)) - - del n[-1:] - self.numbers.pool_put(-1) # This is a broken object - self.assertFalse(self.numbers._set) - self.assertEqual(self.numbers.pool_get(), self.N) - - def test_parallel_get_blocks(self): - """Test threads block if no object left in the pool""" - def allocate_one_and_sleep(pool, sec, result, index): - n = pool.pool_get() - time.sleep(sec) - result[index] = n - pool.pool_put(n) - - results = [None] * (2 * self.N + 1) - threads = [threading.Thread(target=allocate_one_and_sleep, - args=(self.numbers, self.SEC, results, i,)) - for i in xrange(0, 2 * self.N + 1)] - - # This should take 3 * SEC seconds - start = time.time() - for t in threads: - t.start() - for t in threads: - t.join() - diff = time.time() - start - self.assertTrue(diff > 3 * self.SEC) - self.assertLess((diff - 3 * self.SEC) / 3 * self.SEC, .5) - - # One number must have been used three times, - # all others must have been used once - freq = {} - for r in results: - freq[r] = freq.get(r, 0) + 1 - self.assertTrue(len([r for r in results if freq[r] == 2]), self. N) - triples = [r for r in freq if freq[r] == 3] - self.assertTrue(len(triples), 1) - self.assertEqual(sorted(results), - sorted(2 * range(0, self.N) + triples)) - - -if __name__ == '__main__': - unittest.main() diff --git a/kamaki/clients/connection/request.py b/kamaki/clients/connection/request.py deleted file mode 100644 index a853795..0000000 --- a/kamaki/clients/connection/request.py +++ /dev/null @@ -1,151 +0,0 @@ -# Copyright 2011-2012 GRNET S.A. All rights reserved. -# -# Redistribution and use in source and binary forms, with or -# without modification, are permitted provided that the following -# conditions are met: -# -# 1. Redistributions of source code must retain the above -# copyright notice, self.list of conditions and the following -# disclaimer. -# -# 2. Redistributions in binary form must reproduce the above -# copyright notice, self.list of conditions and the following -# disclaimer in the documentation and/or other materials -# provided with the distribution. -# -# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS -# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED -# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR -# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR -# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF -# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED -# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN -# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -# POSSIBILITY OF SUCH DAMAGE. -# -# The views and conclusions contained in the software and -# documentation are those of the authors and should not be -# interpreted as representing official policies, either expressed -# or implied, of GRNET S.A. - -import requests -from kamaki.clients.connection import HTTPConnection, HTTPResponse, HTTPConnectionError -from kamaki.clients.connection.pool import ObjectPool -from urlparse import urlparse - -# Add a convenience status property to the responses -def _status(self): - return requests.status_codes._codes[self.status_code][0].upper() -requests.Response.status = property(_status) - -class HTTPRequestsResponse(HTTPResponse): - - def __init__(self, request=None, prefetched=False): - super(HTTPRequestsResponse, self).__init__(request=request, prefetched=prefetched) - if prefetched: - self = request.response - - def _get_response(self): - if self.prefetched: - return - r = self.request.response - try: - self.headers = r.headers - self.status = r.status - self.status_code = r.status_code - self.content = r.content if hasattr(r, 'content') else None - from json import loads - try: - self.json = loads(r.content)#None if self._get_content_only else r.json - except ValueError: - self.json = None - self.text = r.content#None if self._get_content_only else r.text - self.exception = r.exception if hasattr(r, 'exception') else None - except requests.ConnectionError as err: - raise HTTPConnectionError('Connection error', status=651, details=err.message) - except requests.HTTPError as err: - raise HTTPConnectionError('HTTP error', status=652, details=err.message) - except requests.Timeout as err: - raise HTTPConnectionError('Connection Timeout', status=408, details=err.message) - except requests.URLRequired as err: - raise HTTPConnectionError('Invalid URL', status=404, details=err.message) - except requests.RequestException as err: - raise HTTPConnectionError('HTTP Request error', status=700, details=err.message) - self.prefetched=True - - def release(self): - """requests object handles this automatically""" - if hasattr(self, '_pool'): - self._pool.pool_put(self) - -POOL_SIZE=8 -class HTTPRequestsResponsePool(ObjectPool): - def __init__(self, netloc, size=POOL_SIZE): - super(HTTPRequestsResponsePool, self).__init__(size=size) - self.netloc = netloc - - def _pool_cleanup(self, resp): - resp._get_response() - return True - - @classmethod - def key(self, full_url): - p = urlparse(full_url) - return '%s:%s:%s'%(p.scheme,p.netloc, p.port) - - def _pool_create(self): - resp = HTTPRequestsResponse() - resp._pool = self - return resp - -class HTTPRequest(HTTPConnection): - - _pools = {} - - #Avoid certificate verification by default - verify = False - - def _get_response_object(self): - pool_key = HTTPRequestsResponsePool.key(self.url) - try: - respool = self._pools[pool_key] - except KeyError: - self._pools[pool_key] = HTTPRequestsResponsePool(pool_key) - respool = self._pools[pool_key] - return respool.pool_get() - - def perform_request(self, method=None, url=None, params=None, headers=None, data=None): - """perform a request - Example: method='PUT' url='https://my.server:8080/path/to/service' - params={'update':None, 'format':'json'} headers={'X-Auth-Token':'s0m3t0k3n=='} - data='The data body to put to server' - @return an HTTPResponse which is also stored as self.response - """ - if method is not None: - self.method = method - if url is not None: - self.url = url - if params is not None: - self.params = params - if headers is not None: - self.headers = headers - http_headers = {} - for k,v in self.headers.items(): - http_headers[str(k)] = str(v) - - for i,(key, val) in enumerate(self.params.items()): - param_str = ('?' if i == 0 else '&') + unicode(key) - if val is not None: - param_str+= '='+unicode(val) - self.url += param_str - - #use pool before request, so that it will block if pool is full - res = self._get_response_object() - self._response_object = requests.request(str(self.method), - str(self.url), headers=http_headers, data=data, - verify=self.verify, prefetch = False) - res.request = self._response_object.request - return res diff --git a/setup.py b/setup.py index 275df3c..6d22ab6 100755 --- a/setup.py +++ b/setup.py @@ -38,8 +38,7 @@ from sys import version_info import kamaki - -required = ['ansicolors==1.0.2', 'progress==1.0.1', 'requests==0.12.1', 'gevent==0.13.6', 'snf-common>=0.10'] +required = ['ansicolors==1.0.2', 'progress==1.0.1', 'gevent==0.13.6', 'snf-common>=0.10'] if version_info[0:2] < (2, 7): required.extend(['argparse']) @@ -47,7 +46,7 @@ if version_info[0:2] < (2, 7): setup( name='kamaki', version=kamaki.__version__, - description='A command-line tool for managing clouds', + description='A command-line tool for poking clouds', long_description=open('README.rst').read(), url='http://code.grnet.gr/projects/kamaki', license='BSD',