+++ /dev/null
-# Copyright 2011-2012 GRNET S.A. All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or
-# without modification, are permitted provided that the following
-# conditions are met:
-#
-# 1. Redistributions of source code must retain the above
-# copyright notice, this list of conditions and the following
-# disclaimer.
-#
-# 2. Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following
-# disclaimer in the documentation and/or other materials
-# provided with the distribution.
-#
-# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
-# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
-# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
-# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
-# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
-# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
-# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-# POSSIBILITY OF SUCH DAMAGE.
-#
-# The views and conclusions contained in the software and
-# documentation are those of the authors and should not be
-# interpreted as representing official policies, either expressed
-# or implied, of GRNET S.A.
-
-
-"""Classes to support pools of arbitrary objects.
-
-The :class:`ObjectPool` class in this module abstracts a pool
-of arbitrary objects. Subclasses need to define the details regarding
-creation, destruction, allocation and release of their specific objects.
-
-"""
-
-# This should work under gevent, because gevent monkey patches 'threading'
-# if not, we can detect if running under gevent, e.g. using
-# if 'gevent' in sys.modules:
-# from gevent.coros import Semaphore
-# else:
-# from threading import Semaphore
-from threading import Semaphore, Lock
-
-
-__all__ = ['ObjectPool', 'ObjectPoolError', 'PoolEmptyError']
-
-
-class ObjectPoolError(Exception):
- pass
-
-
-class PoolEmptyError(ObjectPoolError):
- pass
-
-
-class ObjectPool(object):
- def __init__(self, size=None):
- try:
- self.size = int(size)
- assert size >= 1
- except:
- raise ValueError("Invalid size for pool (positive integer "
- "required): %r" % (size,))
-
- self._semaphore = Semaphore(size) # Pool grows up to size
- self._mutex = Lock() # Protect shared _set oject
- self._set = set()
-
- def pool_get(self, blocking=True, timeout=None, create=True):
- """Get an object from the pool.
-
- Get an object from the pool. By default (create=True), create a new
- object if the pool has not reached its maximum size yet. If
- create == False, the caller is responsible for creating the object and
- put()ting it back into the pool when done.
-
- """
- # timeout argument only supported by gevent and py3k variants
- # of Semaphore. acquire() will raise TypeError if timeout
- # is specified but not supported by the underlying implementation.
- kw = {"blocking": blocking}
- if timeout is not None:
- kw["timeout"] = timeout
- r = self._semaphore.acquire(**kw)
- if not r:
- raise PoolEmptyError()
- with self._mutex:
- try:
- try:
- obj = self._set.pop()
- except KeyError:
- obj = self._pool_create() if create else None
- except:
- self._semaphore.release()
- raise
- # We keep _semaphore locked, put() will release it
- return obj
-
- def pool_put(self, obj):
- """Put an object back into the pool.
-
- Return an object to the pool, for subsequent retrieval
- by pool_get() calls. If _pool_cleanup() returns True,
- the object has died and is not put back into self._set.
-
- """
- with self._mutex:
- if not self._pool_cleanup(obj):
- self._set.add(obj)
- self._semaphore.release()
-
- def _pool_create(self):
- """Create a new object to be used with this pool.
-
- Create a new object to be used with this pool,
- should be overriden in subclasses.
-
- """
- raise NotImplementedError
-
- def _pool_cleanup(self, obj):
- """Cleanup an object before being put back into the pool.
-
- Cleanup an object before it can be put back into the pull,
- ensure it is in a stable, reusable state.
-
- """
- raise NotImplementedError
+++ /dev/null
-
-from kamaki.clients.connection.pool import ObjectPool
-
-from httplib import (
- HTTPConnection as http_class,
- HTTPSConnection as https_class,
- HTTPException,
- ResponseNotReady
-)
-
-from urlparse import urlparse
-from new import instancemethod
-
-
-_pools = {}
-pool_size = 8
-
-
-USAGE_LIMIT = 25
-RETRY_LIMIT = 100
-
-
-def init_http_pooling(size):
- global pool_size
- pool_size = size
-
-
-def put_http_connection(conn):
- pool = conn._pool
- if pool is None:
- return
- conn._pool = None
- pool.pool_put(conn)
-
-
-class HTTPConnectionPool(ObjectPool):
-
- _scheme_to_class = {
- 'http' : http_class,
- 'https' : https_class,
- }
-
- def __init__(self, scheme, netloc, size=None):
- ObjectPool.__init__(self, size=size)
-
- connection_class = self._scheme_to_class.get(scheme, None)
- if connection_class is None:
- m = 'Unsupported scheme: %s' % (scheme,)
- raise ValueError(m)
-
- self.connection_class = connection_class
- self.scheme = scheme
- self.netloc = netloc
-
- def _pool_create(self):
- conn = self.connection_class(self.netloc)
- conn._use_counter = USAGE_LIMIT
- conn._pool = self
- conn._real_close = conn.close
- conn.close = instancemethod(put_http_connection,
- conn, type(conn))
- return conn
-
- def _pool_cleanup(self, conn):
- # every connection can be used a finite number of times
- conn._use_counter -= 1
-
- # see httplib source for connection states documentation
- if conn._use_counter > 0 and conn._HTTPConnection__state == 'Idle':
- try:
- resp = conn.getresponse()
- except ResponseNotReady:
- return False
-
- conn._real_close()
- return True
-
-
-def _verify_connection(conn):
- try:
- conn.request("HEAD", "/")
- conn.getresponse().read()
- except HTTPException:
- # The connection has died.
- conn.close()
- return False
- return True
-
-
-def get_http_connection(netloc=None, scheme='http'):
- if netloc is None:
- m = "netloc cannot be None"
- raise ValueError(m)
- # does the pool need to be created?
- if netloc not in _pools:
- pool = HTTPConnectionPool(scheme, netloc, size=pool_size)
- _pools[netloc] = pool
-
- pool = _pools[netloc]
-
- conn = None
- n = 0
- while conn is None:
- conn = pool.pool_get()
- conn._pool = pool
- if not _verify_connection(conn):
- # The connection has died, e.g., Keepalive expired
- conn = None
- n += 1
- if n > RETRY_LIMIT:
- m = ("Could not get live HTTP conn from pool"
- "after %d retries." % RETRY_LIMIT)
- raise RuntimeError(m)
- return conn
-
-def main():
- #cpool = HTTPConnectionPool('https', 'pithos.okeanos.io/v1', size=8)
- headers={'X-Auth-Token':'0TpoyAXqJSPxLdDuZHiLOA=='}
- c = get_http_connection('pithos.okeanos.io', 'https')
- c.request(method='get', url='https://pithos.okeanos.io/v1/saxtouri@admin.grnet.gr?format=json',
- headers=headers)
- r = c.getresponse()
- print('HEADERS:'+unicode(r.getheaders()))
- print('BODY:'+unicode(r.read()))
-
-
-if __name__ == '__main__':
- main()
\ No newline at end of file
+++ /dev/null
-#!/usr/bin/env python
-#
-# -*- coding: utf-8 -*-
-#
-# Copyright 2011 GRNET S.A. All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or
-# without modification, are permitted provided that the following
-# conditions are met:
-#
-# 1. Redistributions of source code must retain the above
-# copyright notice, this list of conditions and the following
-# disclaimer.
-#
-# 2. Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following
-# disclaimer in the documentation and/or other materials
-# provided with the distribution.
-#
-# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
-# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
-# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
-# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
-# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
-# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
-# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-# POSSIBILITY OF SUCH DAMAGE.
-#
-# The views and conclusions contained in the software and
-# documentation are those of the authors and should not be
-# interpreted as representing official policies, either expressed
-# or implied, of GRNET S.A.
-#
-#
-
-"""Unit Tests for the pool classes in synnefo.lib.pool
-
-Provides unit tests for the code implementing pool
-classes in the synnefo.lib.pool module.
-
-"""
-
-# Support running under a gevent-monkey-patched environment
-# if the "monkey" argument is specified in the command line.
-import sys
-if "monkey" in sys.argv:
- from gevent import monkey
- monkey.patch_all()
- sys.argv.pop(sys.argv.index("monkey"))
-
-import sys
-import time
-import threading
-
-from . import ObjectPool, PoolEmptyError
-
-# Use backported unittest functionality if Python < 2.7
-try:
- import unittest2 as unittest
-except ImportError:
- if sys.version_info < (2, 7):
- raise Exception("The unittest2 package is required for Python < 2.7")
- import unittest
-
-
-class NumbersPool(ObjectPool):
- max = 0
-
- def _pool_create(self):
- n = self.max
- self.max += 1
- return n
-
- def _pool_cleanup(self, obj):
- n = int(obj)
- if n < 0:
- return True
- pass
-
-
-class ObjectPoolTestCase(unittest.TestCase):
- def test_create_pool_requires_size(self):
- """Test __init__() requires valid size argument"""
- self.assertRaises(ValueError, ObjectPool)
- self.assertRaises(ValueError, ObjectPool, size="size10")
- self.assertRaises(ValueError, ObjectPool, size=0)
- self.assertRaises(ValueError, ObjectPool, size=-1)
-
- def test_create_pool(self):
- """Test pool creation works"""
- pool = ObjectPool(100)
- self.assertEqual(pool.size, 100)
-
- def test_get_not_implemented(self):
- """Test pool_get() method not implemented in abstract class"""
- pool = ObjectPool(100)
- self.assertRaises(NotImplementedError, pool.pool_get)
-
- def test_put_not_implemented(self):
- """Test pool_put() method not implemented in abstract class"""
- pool = ObjectPool(100)
- self.assertRaises(NotImplementedError, pool.pool_put, None)
-
-
-class NumbersPoolTestCase(unittest.TestCase):
- N = 1500
- SEC = 0.5
- maxDiff = None
-
- def setUp(self):
- self.numbers = NumbersPool(self.N)
-
- def test_initially_empty(self):
- """Test pool is empty upon creation"""
- self.assertEqual(self.numbers._set, set([]))
-
- def test_seq_allocate_all(self):
- """Test allocation and deallocation of all pool objects"""
- n = []
- for _ in xrange(0, self.N):
- n.append(self.numbers.pool_get())
- self.assertEqual(n, range(0, self.N))
- for i in n:
- self.numbers.pool_put(i)
- self.assertEqual(self.numbers._set, set(n))
-
- def test_parallel_allocate_all(self):
- """Allocate all pool objects in parallel"""
- def allocate_one(pool, results, index):
- n = pool.pool_get()
- results[index] = n
-
- results = [None] * self.N
- threads = [threading.Thread(target=allocate_one,
- args=(self.numbers, results, i,))
- for i in xrange(0, self.N)]
-
- for t in threads:
- t.start()
- for t in threads:
- t.join()
-
- # This nonblocking pool_get() should fail
- self.assertRaises(PoolEmptyError, self.numbers.pool_get,
- blocking=False)
- self.assertEqual(sorted(results), range(0, self.N))
-
- def test_allocate_no_create(self):
- """Allocate objects from the pool without creating them"""
- for i in xrange(0, self.N):
- self.assertIsNone(self.numbers.pool_get(create=False))
-
- # This nonblocking pool_get() should fail
- self.assertRaises(PoolEmptyError, self.numbers.pool_get,
- blocking=False)
-
- def test_pool_cleanup_returns_failure(self):
- """Put a broken object, test a new one is retrieved eventually"""
- n = []
- for _ in xrange(0, self.N):
- n.append(self.numbers.pool_get())
- self.assertEqual(n, range(0, self.N))
-
- del n[-1:]
- self.numbers.pool_put(-1) # This is a broken object
- self.assertFalse(self.numbers._set)
- self.assertEqual(self.numbers.pool_get(), self.N)
-
- def test_parallel_get_blocks(self):
- """Test threads block if no object left in the pool"""
- def allocate_one_and_sleep(pool, sec, result, index):
- n = pool.pool_get()
- time.sleep(sec)
- result[index] = n
- pool.pool_put(n)
-
- results = [None] * (2 * self.N + 1)
- threads = [threading.Thread(target=allocate_one_and_sleep,
- args=(self.numbers, self.SEC, results, i,))
- for i in xrange(0, 2 * self.N + 1)]
-
- # This should take 3 * SEC seconds
- start = time.time()
- for t in threads:
- t.start()
- for t in threads:
- t.join()
- diff = time.time() - start
- self.assertTrue(diff > 3 * self.SEC)
- self.assertLess((diff - 3 * self.SEC) / 3 * self.SEC, .5)
-
- # One number must have been used three times,
- # all others must have been used once
- freq = {}
- for r in results:
- freq[r] = freq.get(r, 0) + 1
- self.assertTrue(len([r for r in results if freq[r] == 2]), self. N)
- triples = [r for r in freq if freq[r] == 3]
- self.assertTrue(len(triples), 1)
- self.assertEqual(sorted(results),
- sorted(2 * range(0, self.N) + triples))
-
-
-if __name__ == '__main__':
- unittest.main()
+++ /dev/null
-# Copyright 2011-2012 GRNET S.A. All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or
-# without modification, are permitted provided that the following
-# conditions are met:
-#
-# 1. Redistributions of source code must retain the above
-# copyright notice, self.list of conditions and the following
-# disclaimer.
-#
-# 2. Redistributions in binary form must reproduce the above
-# copyright notice, self.list of conditions and the following
-# disclaimer in the documentation and/or other materials
-# provided with the distribution.
-#
-# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
-# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
-# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
-# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
-# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
-# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
-# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-# POSSIBILITY OF SUCH DAMAGE.
-#
-# The views and conclusions contained in the software and
-# documentation are those of the authors and should not be
-# interpreted as representing official policies, either expressed
-# or implied, of GRNET S.A.
-
-import requests
-from kamaki.clients.connection import HTTPConnection, HTTPResponse, HTTPConnectionError
-from kamaki.clients.connection.pool import ObjectPool
-from urlparse import urlparse
-
-# Add a convenience status property to the responses
-def _status(self):
- return requests.status_codes._codes[self.status_code][0].upper()
-requests.Response.status = property(_status)
-
-class HTTPRequestsResponse(HTTPResponse):
-
- def __init__(self, request=None, prefetched=False):
- super(HTTPRequestsResponse, self).__init__(request=request, prefetched=prefetched)
- if prefetched:
- self = request.response
-
- def _get_response(self):
- if self.prefetched:
- return
- r = self.request.response
- try:
- self.headers = r.headers
- self.status = r.status
- self.status_code = r.status_code
- self.content = r.content if hasattr(r, 'content') else None
- from json import loads
- try:
- self.json = loads(r.content)#None if self._get_content_only else r.json
- except ValueError:
- self.json = None
- self.text = r.content#None if self._get_content_only else r.text
- self.exception = r.exception if hasattr(r, 'exception') else None
- except requests.ConnectionError as err:
- raise HTTPConnectionError('Connection error', status=651, details=err.message)
- except requests.HTTPError as err:
- raise HTTPConnectionError('HTTP error', status=652, details=err.message)
- except requests.Timeout as err:
- raise HTTPConnectionError('Connection Timeout', status=408, details=err.message)
- except requests.URLRequired as err:
- raise HTTPConnectionError('Invalid URL', status=404, details=err.message)
- except requests.RequestException as err:
- raise HTTPConnectionError('HTTP Request error', status=700, details=err.message)
- self.prefetched=True
-
- def release(self):
- """requests object handles this automatically"""
- if hasattr(self, '_pool'):
- self._pool.pool_put(self)
-
-POOL_SIZE=8
-class HTTPRequestsResponsePool(ObjectPool):
- def __init__(self, netloc, size=POOL_SIZE):
- super(HTTPRequestsResponsePool, self).__init__(size=size)
- self.netloc = netloc
-
- def _pool_cleanup(self, resp):
- resp._get_response()
- return True
-
- @classmethod
- def key(self, full_url):
- p = urlparse(full_url)
- return '%s:%s:%s'%(p.scheme,p.netloc, p.port)
-
- def _pool_create(self):
- resp = HTTPRequestsResponse()
- resp._pool = self
- return resp
-
-class HTTPRequest(HTTPConnection):
-
- _pools = {}
-
- #Avoid certificate verification by default
- verify = False
-
- def _get_response_object(self):
- pool_key = HTTPRequestsResponsePool.key(self.url)
- try:
- respool = self._pools[pool_key]
- except KeyError:
- self._pools[pool_key] = HTTPRequestsResponsePool(pool_key)
- respool = self._pools[pool_key]
- return respool.pool_get()
-
- def perform_request(self, method=None, url=None, params=None, headers=None, data=None):
- """perform a request
- Example: method='PUT' url='https://my.server:8080/path/to/service'
- params={'update':None, 'format':'json'} headers={'X-Auth-Token':'s0m3t0k3n=='}
- data='The data body to put to server'
- @return an HTTPResponse which is also stored as self.response
- """
- if method is not None:
- self.method = method
- if url is not None:
- self.url = url
- if params is not None:
- self.params = params
- if headers is not None:
- self.headers = headers
- http_headers = {}
- for k,v in self.headers.items():
- http_headers[str(k)] = str(v)
-
- for i,(key, val) in enumerate(self.params.items()):
- param_str = ('?' if i == 0 else '&') + unicode(key)
- if val is not None:
- param_str+= '='+unicode(val)
- self.url += param_str
-
- #use pool before request, so that it will block if pool is full
- res = self._get_response_object()
- self._response_object = requests.request(str(self.method),
- str(self.url), headers=http_headers, data=data,
- verify=self.verify, prefetch = False)
- res.request = self._response_object.request
- return res
import kamaki
-
-required = ['ansicolors==1.0.2', 'progress==1.0.1', 'requests==0.12.1', 'gevent==0.13.6', 'snf-common>=0.10']
+required = ['ansicolors==1.0.2', 'progress==1.0.1', 'gevent==0.13.6', 'snf-common>=0.10']
if version_info[0:2] < (2, 7):
required.extend(['argparse'])
setup(
name='kamaki',
version=kamaki.__version__,
- description='A command-line tool for managing clouds',
+ description='A command-line tool for poking clouds',
long_description=open('README.rst').read(),
url='http://code.grnet.gr/projects/kamaki',
license='BSD',