socket verification for pooled pithos backends
[pithos] / snf-pithos-app / pithos / api / util.py
index 0da251a..f59c5a6 100644 (file)
@@ -56,15 +56,17 @@ from pithos.api.faults import (Fault, NotModified, BadRequest, Unauthorized, For
                                 RangeNotSatisfiable, InternalServerError, NotImplemented)
 from pithos.api.short_url import encode_url
 from pithos.api.settings import (BACKEND_DB_MODULE, BACKEND_DB_CONNECTION,
-                                    BACKEND_BLOCK_MODULE, BACKEND_BLOCK_PATH,
-                                    BACKEND_BLOCK_UMASK,
-                                    BACKEND_QUEUE_MODULE, BACKEND_QUEUE_CONNECTION,
-                                    BACKEND_QUOTA, BACKEND_VERSIONING,
-                                    AUTHENTICATION_URL, AUTHENTICATION_USERS,
-                                    SERVICE_TOKEN, COOKIE_NAME)
+                                 BACKEND_BLOCK_MODULE, BACKEND_BLOCK_PATH,
+                                 BACKEND_BLOCK_UMASK,
+                                 BACKEND_QUEUE_MODULE, BACKEND_QUEUE_HOSTS,
+                                 BACKEND_QUEUE_EXCHANGE,
+                                 PITHOS_QUOTAHOLDER_URL,
+                                 BACKEND_QUOTA, BACKEND_VERSIONING,
+                                 AUTHENTICATION_URL, AUTHENTICATION_USERS,
+                                 SERVICE_TOKEN, COOKIE_NAME)
 
 from pithos.backends import connect_backend
-from pithos.backends.base import NotAllowedError, QuotaError
+from pithos.backends.base import NotAllowedError, QuotaError, ItemNotExists, VersionNotExists
 
 import logging
 import re
@@ -320,7 +322,7 @@ def split_container_object_string(s):
         raise ValueError
     return s[:pos], s[(pos + 1):]
 
-def copy_or_move_object(request, src_account, src_container, src_name, dest_account, dest_container, dest_name, move=False):
+def copy_or_move_object(request, src_account, src_container, src_name, dest_account, dest_container, dest_name, move=False, delimiter=None):
     """Copy or move an object."""
     
     if 'ignore_content_type' in request.GET and 'CONTENT_TYPE' in request.META:
@@ -331,14 +333,14 @@ def copy_or_move_object(request, src_account, src_container, src_name, dest_acco
         if move:
             version_id = request.backend.move_object(request.user_uniq, src_account, src_container, src_name,
                                                         dest_account, dest_container, dest_name,
-                                                        content_type, 'pithos', meta, False, permissions)
+                                                        content_type, 'pithos', meta, False, permissions, delimiter)
         else:
             version_id = request.backend.copy_object(request.user_uniq, src_account, src_container, src_name,
                                                         dest_account, dest_container, dest_name,
-                                                        content_type, 'pithos', meta, False, permissions, src_version)
+                                                        content_type, 'pithos', meta, False, permissions, src_version, delimiter)
     except NotAllowedError:
         raise Forbidden('Not allowed')
-    except (NameError, IndexError):
+    except (ItemNotExists, VersionNotExists):
         raise ItemNotFound('Container or object does not exist')
     except ValueError:
         raise BadRequest('Invalid sharing header')
@@ -349,7 +351,7 @@ def copy_or_move_object(request, src_account, src_container, src_name, dest_acco
             request.backend.update_object_public(request.user_uniq, dest_account, dest_container, dest_name, public)
         except NotAllowedError:
             raise Forbidden('Not allowed')
-        except NameError:
+        except ItemNotExists:
             raise ItemNotFound('Object does not exist')
     return version_id
 
@@ -660,7 +662,7 @@ class ObjectWrapper(object):
                 self.block_hash = self.hashmaps[self.file_index][self.block_index]
                 try:
                     self.block = self.backend.get_block(self.block_hash)
-                except NameError:
+                except ItemNotExists:
                     raise ItemNotFound('Block does not exist')
             
             # Get the data from the block.
@@ -788,7 +790,8 @@ def simple_list_response(request, l):
     if request.serialization == 'json':
         return json.dumps(l)
 
-def get_backend():
+
+def _get_backend():
     backend = connect_backend(db_module=BACKEND_DB_MODULE,
                               db_connection=BACKEND_DB_CONNECTION,
                               block_module=BACKEND_BLOCK_MODULE,
@@ -800,6 +803,103 @@ def get_backend():
     backend.default_policy['versioning'] = BACKEND_VERSIONING
     return backend
 
+
+def _pooled_backend_close(backend):
+    """Return `backend` to its pool; installed as the pooled close()."""
+    backend._pool.pool_put(backend)
+
+
+from synnefo.lib.pool import ObjectPool
+from new import instancemethod
+from select import select
+from traceback import print_exc
+
+# Starting value of each pooled backend's _use_count; _pool_cleanup
+# counts it down and really closes the backend once it would go negative.
+USAGE_LIMIT = 500
+# Passed as `size` to the module-level PithosBackendPool below.
+POOL_SIZE = 5
+
+class PithosBackendPool(ObjectPool):
+    """Pool of pithos backends created through _get_backend()."""
+
+    def _pool_create(self):
+        """Build a backend whose close() hands it back to this pool."""
+        backend = _get_backend()
+        # Keep the genuine close() reachable for _pool_cleanup.
+        backend._real_close = backend.close
+        backend.close = instancemethod(_pooled_backend_close, backend,
+                                       type(backend))
+        backend._pool = self
+        backend._use_count = USAGE_LIMIT
+        return backend
+
+    def _pool_verify(self, backend):
+        """Return True if the backend's DB connection still looks usable.
+
+        Returns False (closing the connection where noted below) when the
+        connection is closed, is inside a transaction, its underlying file
+        descriptor polls as readable, or it cannot be probed at all.
+        """
+        wrapper = backend.wrapper
+        conn = wrapper.conn
+        if conn.closed:
+            return False
+
+        if conn.in_transaction():
+            conn.close()
+            return False
+
+        try:
+            fd = conn.connection.connection.fileno()
+            # Zero timeout: poll the descriptor without blocking.  Presumably
+            # an idle connection has nothing to read, so readability is taken
+            # as a sign of trouble -- TODO confirm against the DB driver.
+            r, w, x = select([fd], (), (), 0)
+            if r:
+                conn.close()
+                return False
+        except Exception:
+            # Was a bare except, which also swallowed KeyboardInterrupt and
+            # SystemExit; those now propagate.
+            print_exc()
+            return False
+
+        return True
+
+    def _pool_cleanup(self, backend):
+        """Reset a backend being handed back; really close it when worn out.
+
+        Returns True after calling the backend's real close(), False when the
+        backend was only reset (pending transaction rolled back,
+        backend.messages emptied).
+        """
+        c = backend._use_count - 1
+        if c < 0:
+            backend._real_close()
+            return True
+
+        backend._use_count = c
+        wrapper = backend.wrapper
+        if wrapper.trans is not None:
+            conn = wrapper.conn
+            if conn.closed:
+                # Nothing to roll back on a closed connection; just forget it.
+                wrapper.trans = None
+            else:
+                wrapper.rollback()
+        if backend.messages:
+            backend.messages = []
+        return False
+
+_pithos_backend_pool = PithosBackendPool(size=POOL_SIZE)
+
+
+def get_backend():
+    """Return a backend taken from the module-level pool."""
+    return _pithos_backend_pool.pool_get()
+
+
 def update_request_headers(request):
     # Handle URL-encoded keys and values.
     meta = dict([(k, v) for k, v in request.META.iteritems() if k.startswith('HTTP_')])
@@ -870,6 +970,7 @@ def request_serialization(request, format_allowed=False):
     
     return 'text'
 
+
 def api_method(http_method=None, format_allowed=False, user_required=True):
     """Decorator function for views that implement an API method."""