public=getattr(self.args, 'public'))
except ClientError as err:
raiseCLIError(err)
+ print 'Upload completed'
@command()
class store_download(_store_container_command):
--- /dev/null
+# Copyright 2012 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+# 1. Redistributions of source code must retain the above
+# copyright notice, self.list of conditions and the following
+# disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above
+# copyright notice, self.list of conditions and the following
+# disclaimer in the documentation and/or other materials
+# provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from urlparse import urlparse
+from .pool.http import get_http_connection
+from . import HTTPConnection, HTTPResponse, HTTPConnectionError
+
+
+from time import sleep
+from httplib import ResponseNotReady
+
+class KamakiHTTPResponse(HTTPResponse):
+
+ def _get_response(self):
+ print('KamakiHTTPResponse:should I get response?')
+ if self.prefetched:
+ print('\tKamakiHTTPResponse: no, I have already done that before')
+ return
+ print('\tKamakiHTTPResponse: yes, pls')
+ r = self.request.getresponse()
+ self.prefetched = True
+ headers = {}
+ for k,v in r.getheaders():
+ headers.update({k:v})
+ self.headers = headers
+ self.content = r.read(r.length)
+ self.status_code = r.status
+ self.status = r.reason
+ print('KamakiHTTPResponse: Niiiiice')
+
+ @property
+ def text(self):
+ _get_response()
+ return self._content
+ @text.setter
+ def test(self, v):
+ pass
+
+ @property
+ def json(self):
+ _get_response()
+ from json import loads
+ try:
+ return loads(self._content)
+ except ValueError as err:
+ HTTPConnectionError('Response not formated in JSON', details=unicode(err), status=702)
+ @json.setter
+ def json(self, v):
+ pass
+
+class KamakiHTTPConnection(HTTPConnection):
+
+ url = None
+ scheme = None
+ netloc = None
+ method = None
+ data = None
+ headers = None
+
+ scheme_ports = {
+ 'http': '80',
+ 'https': '443',
+ }
+
+ def _load_connection_settings(self, url=None, scheme=None, params=None, headers=None, host=None,
+ port=None, method=None):
+ if params is not None:
+ self.params = params
+ if headers is not None:
+ self.headers = headers
+
+ if url is None:
+ url = self.url
+ if host is None or scheme is None:
+ p = urlparse(url)
+ netloc = p.netloc
+ if not netloc:
+ netloc = 'localhost'
+ scheme = p.scheme
+ if not scheme:
+ scheme = 'http'
+ param_str = ''
+ for i,(key, val) in enumerate(self.params.items()):
+ param_str = ('?' if i == 0 else '&') + unicode(key)
+ if val is not None:
+ param_str+= '='+unicode(val)
+ url = p.path + param_str
+ else:
+ host = host
+ port = port if port is not None else self.scheme_ports[scheme]
+ #NOTE: we force host:port as canonical form,
+ # lest we have a cache miss 'host' vs 'host:80'
+ netloc = "%s%s" % (host, port)
+
+ self.netloc = netloc
+ self.url = url #if url in (None, '') or url[0] != '/' else url[1:]
+ self.scheme = scheme
+
+ if method is not None:
+ self.method = method
+
+ def perform_request(self, url=None, params=None, headers=None, method=None, host=None,
+ port=None, data=None):
+ self._load_connection_settings(url=url, params=params, headers=headers, host=host,
+ port=port, method=method)
+ print('---> %s %s %s %s %s'%(self.method, self.scheme, self.netloc, self.url, self.headers))
+ conn = get_http_connection(netloc=self.netloc, scheme=self.scheme)
+ try:
+ conn.request(self.method, self.url, headers=self.headers, body=data)
+ except:
+ conn.close()
+ raise
+ return KamakiHTTPResponse(conn)
--- /dev/null
+# Copyright 2011-2012 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+# 1. Redistributions of source code must retain the above
+# copyright notice, this list of conditions and the following
+# disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following
+# disclaimer in the documentation and/or other materials
+# provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+
+"""Classes to support pools of arbitrary objects.
+
+The :class:`ObjectPool` class in this module abstracts a pool
+of arbitrary objects. Subclasses need to define the details regarding
+creation, destruction, allocation and release of their specific objects.
+
+"""
+
+# This should work under gevent, because gevent monkey patches 'threading'
+# if not, we can detect if running under gevent, e.g. using
+# if 'gevent' in sys.modules:
+# from gevent.coros import Semaphore
+# else:
+# from threading import Semaphore
+from threading import Semaphore, Lock
+
+
+__all__ = ['ObjectPool', 'ObjectPoolError', 'PoolEmptyError']
+
+
+class ObjectPoolError(Exception):
+ pass
+
+
+class PoolEmptyError(ObjectPoolError):
+ pass
+
+
+class ObjectPool(object):
+ def __init__(self, size=None):
+ try:
+ self.size = int(size)
+ assert size >= 1
+ except:
+ raise ValueError("Invalid size for pool (positive integer "
+ "required): %r" % (size,))
+
+ self._semaphore = Semaphore(size) # Pool grows up to size
+ self._mutex = Lock() # Protect shared _set oject
+ self._set = set()
+
+ def pool_get(self, blocking=True, timeout=None, create=True):
+ """Get an object from the pool.
+
+ Get an object from the pool. By default (create=True), create a new
+ object if the pool has not reached its maximum size yet. If
+ create == False, the caller is responsible for creating the object and
+ put()ting it back into the pool when done.
+
+ """
+ # timeout argument only supported by gevent and py3k variants
+ # of Semaphore. acquire() will raise TypeError if timeout
+ # is specified but not supported by the underlying implementation.
+ kw = {"blocking": blocking}
+ if timeout is not None:
+ kw["timeout"] = timeout
+ r = self._semaphore.acquire(**kw)
+ if not r:
+ raise PoolEmptyError()
+ with self._mutex:
+ try:
+ try:
+ obj = self._set.pop()
+ except KeyError:
+ obj = self._pool_create() if create else None
+ except:
+ self._semaphore.release()
+ raise
+ # We keep _semaphore locked, put() will release it
+ return obj
+
+ def pool_put(self, obj):
+ """Put an object back into the pool.
+
+ Return an object to the pool, for subsequent retrieval
+ by pool_get() calls. If _pool_cleanup() returns True,
+ the object has died and is not put back into self._set.
+
+ """
+ with self._mutex:
+ if not self._pool_cleanup(obj):
+ self._set.add(obj)
+ self._semaphore.release()
+
+ def _pool_create(self):
+ """Create a new object to be used with this pool.
+
+ Create a new object to be used with this pool,
+ should be overriden in subclasses.
+
+ """
+ raise NotImplementedError
+
+ def _pool_cleanup(self, obj):
+ """Cleanup an object before being put back into the pool.
+
+ Cleanup an object before it can be put back into the pull,
+ ensure it is in a stable, reusable state.
+
+ """
+ raise NotImplementedError
--- /dev/null
+
+from kamaki.clients.connection.pool import ObjectPool
+
+from httplib import (
+ HTTPConnection as http_class,
+ HTTPSConnection as https_class,
+ HTTPException,
+ ResponseNotReady
+)
+
+from urlparse import urlparse
+from new import instancemethod
+
+
+_pools = {}
+pool_size = 8
+
+
+USAGE_LIMIT = 25
+RETRY_LIMIT = 100
+
+
+def init_http_pooling(size):
+ global pool_size
+ pool_size = size
+
+
+def put_http_connection(conn):
+ pool = conn._pool
+ if pool is None:
+ return
+ conn._pool = None
+ pool.pool_put(conn)
+
+
+class HTTPConnectionPool(ObjectPool):
+
+ _scheme_to_class = {
+ 'http' : http_class,
+ 'https' : https_class,
+ }
+
+ def __init__(self, scheme, netloc, size=None):
+ ObjectPool.__init__(self, size=size)
+
+ connection_class = self._scheme_to_class.get(scheme, None)
+ if connection_class is None:
+ m = 'Unsupported scheme: %s' % (scheme,)
+ raise ValueError(m)
+
+ self.connection_class = connection_class
+ self.scheme = scheme
+ self.netloc = netloc
+
+ def _pool_create(self):
+ conn = self.connection_class(self.netloc)
+ conn._use_counter = USAGE_LIMIT
+ conn._pool = self
+ conn._real_close = conn.close
+ conn.close = instancemethod(put_http_connection,
+ conn, type(conn))
+ return conn
+
+ def _pool_cleanup(self, conn):
+ # every connection can be used a finite number of times
+ conn._use_counter -= 1
+
+ # see httplib source for connection states documentation
+ if conn._use_counter > 0 and conn._HTTPConnection__state == 'Idle':
+ try:
+ resp = conn.getresponse()
+ except ResponseNotReady:
+ return False
+
+ conn._real_close()
+ return True
+
+
+def _verify_connection(conn):
+ try:
+ conn.request("HEAD", "/")
+ conn.getresponse().read()
+ except HTTPException:
+ # The connection has died.
+ conn.close()
+ return False
+ return True
+
+
+def get_http_connection(netloc=None, scheme='http'):
+ if netloc is None:
+ m = "netloc cannot be None"
+ raise ValueError(m)
+ # does the pool need to be created?
+ if netloc not in _pools:
+ pool = HTTPConnectionPool(scheme, netloc, size=pool_size)
+ _pools[netloc] = pool
+
+ pool = _pools[netloc]
+
+ conn = None
+ n = 0
+ while conn is None:
+ conn = pool.pool_get()
+ conn._pool = pool
+ if not _verify_connection(conn):
+ # The connection has died, e.g., Keepalive expired
+ conn = None
+ n += 1
+ if n > RETRY_LIMIT:
+ m = ("Could not get live HTTP conn from pool"
+ "after %d retries." % RETRY_LIMIT)
+ raise RuntimeError(m)
+ return conn
+
+def main():
+ #cpool = HTTPConnectionPool('https', 'pithos.okeanos.io/v1', size=8)
+ c = get_http_connection('pithos.okeanos.io', 'https')
+
+
+if __name__ == '__main__':
+ main()
\ No newline at end of file
--- /dev/null
+#!/usr/bin/env python
+#
+# -*- coding: utf-8 -*-
+#
+# Copyright 2011 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+# 1. Redistributions of source code must retain the above
+# copyright notice, this list of conditions and the following
+# disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following
+# disclaimer in the documentation and/or other materials
+# provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+#
+#
+
+"""Unit Tests for the pool classes in synnefo.lib.pool
+
+Provides unit tests for the code implementing pool
+classes in the synnefo.lib.pool module.
+
+"""
+
+# Support running under a gevent-monkey-patched environment
+# if the "monkey" argument is specified in the command line.
+import sys
+if "monkey" in sys.argv:
+ from gevent import monkey
+ monkey.patch_all()
+ sys.argv.pop(sys.argv.index("monkey"))
+
+import sys
+import time
+import threading
+
+from . import ObjectPool, PoolEmptyError
+
+# Use backported unittest functionality if Python < 2.7
+try:
+ import unittest2 as unittest
+except ImportError:
+ if sys.version_info < (2, 7):
+ raise Exception("The unittest2 package is required for Python < 2.7")
+ import unittest
+
+
+class NumbersPool(ObjectPool):
+ max = 0
+
+ def _pool_create(self):
+ n = self.max
+ self.max += 1
+ return n
+
+ def _pool_cleanup(self, obj):
+ n = int(obj)
+ if n < 0:
+ return True
+ pass
+
+
+class ObjectPoolTestCase(unittest.TestCase):
+ def test_create_pool_requires_size(self):
+ """Test __init__() requires valid size argument"""
+ self.assertRaises(ValueError, ObjectPool)
+ self.assertRaises(ValueError, ObjectPool, size="size10")
+ self.assertRaises(ValueError, ObjectPool, size=0)
+ self.assertRaises(ValueError, ObjectPool, size=-1)
+
+ def test_create_pool(self):
+ """Test pool creation works"""
+ pool = ObjectPool(100)
+ self.assertEqual(pool.size, 100)
+
+ def test_get_not_implemented(self):
+ """Test pool_get() method not implemented in abstract class"""
+ pool = ObjectPool(100)
+ self.assertRaises(NotImplementedError, pool.pool_get)
+
+ def test_put_not_implemented(self):
+ """Test pool_put() method not implemented in abstract class"""
+ pool = ObjectPool(100)
+ self.assertRaises(NotImplementedError, pool.pool_put, None)
+
+
+class NumbersPoolTestCase(unittest.TestCase):
+ N = 1500
+ SEC = 0.5
+ maxDiff = None
+
+ def setUp(self):
+ self.numbers = NumbersPool(self.N)
+
+ def test_initially_empty(self):
+ """Test pool is empty upon creation"""
+ self.assertEqual(self.numbers._set, set([]))
+
+ def test_seq_allocate_all(self):
+ """Test allocation and deallocation of all pool objects"""
+ n = []
+ for _ in xrange(0, self.N):
+ n.append(self.numbers.pool_get())
+ self.assertEqual(n, range(0, self.N))
+ for i in n:
+ self.numbers.pool_put(i)
+ self.assertEqual(self.numbers._set, set(n))
+
+ def test_parallel_allocate_all(self):
+ """Allocate all pool objects in parallel"""
+ def allocate_one(pool, results, index):
+ n = pool.pool_get()
+ results[index] = n
+
+ results = [None] * self.N
+ threads = [threading.Thread(target=allocate_one,
+ args=(self.numbers, results, i,))
+ for i in xrange(0, self.N)]
+
+ for t in threads:
+ t.start()
+ for t in threads:
+ t.join()
+
+ # This nonblocking pool_get() should fail
+ self.assertRaises(PoolEmptyError, self.numbers.pool_get,
+ blocking=False)
+ self.assertEqual(sorted(results), range(0, self.N))
+
+ def test_allocate_no_create(self):
+ """Allocate objects from the pool without creating them"""
+ for i in xrange(0, self.N):
+ self.assertIsNone(self.numbers.pool_get(create=False))
+
+ # This nonblocking pool_get() should fail
+ self.assertRaises(PoolEmptyError, self.numbers.pool_get,
+ blocking=False)
+
+ def test_pool_cleanup_returns_failure(self):
+ """Put a broken object, test a new one is retrieved eventually"""
+ n = []
+ for _ in xrange(0, self.N):
+ n.append(self.numbers.pool_get())
+ self.assertEqual(n, range(0, self.N))
+
+ del n[-1:]
+ self.numbers.pool_put(-1) # This is a broken object
+ self.assertFalse(self.numbers._set)
+ self.assertEqual(self.numbers.pool_get(), self.N)
+
+ def test_parallel_get_blocks(self):
+ """Test threads block if no object left in the pool"""
+ def allocate_one_and_sleep(pool, sec, result, index):
+ n = pool.pool_get()
+ time.sleep(sec)
+ result[index] = n
+ pool.pool_put(n)
+
+ results = [None] * (2 * self.N + 1)
+ threads = [threading.Thread(target=allocate_one_and_sleep,
+ args=(self.numbers, self.SEC, results, i,))
+ for i in xrange(0, 2 * self.N + 1)]
+
+ # This should take 3 * SEC seconds
+ start = time.time()
+ for t in threads:
+ t.start()
+ for t in threads:
+ t.join()
+ diff = time.time() - start
+ self.assertTrue(diff > 3 * self.SEC)
+ self.assertLess((diff - 3 * self.SEC) / 3 * self.SEC, .5)
+
+ # One number must have been used three times,
+ # all others must have been used once
+ freq = {}
+ for r in results:
+ freq[r] = freq.get(r, 0) + 1
+ self.assertTrue(len([r for r in results if freq[r] == 2]), self. N)
+ triples = [r for r in freq if freq[r] == 3]
+ self.assertTrue(len(triples), 1)
+ self.assertEqual(sorted(results),
+ sorted(2 * range(0, self.N) + triples))
+
+
+if __name__ == '__main__':
+ unittest.main()
param_str+= '='+unicode(val)
self.url += param_str
+ #use pool before request, so that it will block if pool is full
+ res = self._get_response_object()
self._response_object = requests.request(self.method, self.url, headers=self.headers, data=data,
verify=self.verify, prefetch = False)
- res = self._get_response_object()
res.request = self._response_object.request
return res