cleanup pithos backend pools, new pool api support
[pithos] / snf-pithos-app / pithos / api / util.py
index 11642a7..f8233f9 100644 (file)
@@ -49,6 +49,7 @@ from django.core.files.uploadhandler import FileUploadHandler
 from django.core.files.uploadedfile import UploadedFile
 
 from synnefo.lib.parsedate import parse_http_date_safe, parse_http_date
+from synnefo.lib.astakos import get_user
 
 from pithos.api.faults import (Fault, NotModified, BadRequest, Unauthorized, Forbidden, ItemNotFound,
                                 Conflict, LengthRequired, PreconditionFailed, RequestEntityTooLarge,
@@ -56,10 +57,14 @@ from pithos.api.faults import (Fault, NotModified, BadRequest, Unauthorized, For
 from pithos.api.short_url import encode_url
 from pithos.api.settings import (BACKEND_DB_MODULE, BACKEND_DB_CONNECTION,
                                     BACKEND_BLOCK_MODULE, BACKEND_BLOCK_PATH,
+                                    BACKEND_BLOCK_UMASK,
                                     BACKEND_QUEUE_MODULE, BACKEND_QUEUE_CONNECTION,
-                                    BACKEND_QUOTA, BACKEND_VERSIONING)
+                                    BACKEND_QUOTA, BACKEND_VERSIONING,
+                                    AUTHENTICATION_URL, AUTHENTICATION_USERS,
+                                    SERVICE_TOKEN, COOKIE_NAME)
+
 from pithos.backends import connect_backend
-from pithos.backends.base import NotAllowedError, QuotaError
+from pithos.backends.base import NotAllowedError, QuotaError, ItemNotExists, VersionNotExists
 
 import logging
 import re
@@ -315,7 +320,7 @@ def split_container_object_string(s):
         raise ValueError
     return s[:pos], s[(pos + 1):]
 
-def copy_or_move_object(request, src_account, src_container, src_name, dest_account, dest_container, dest_name, move=False):
+def copy_or_move_object(request, src_account, src_container, src_name, dest_account, dest_container, dest_name, move=False, delimiter=None):
     """Copy or move an object."""
     
     if 'ignore_content_type' in request.GET and 'CONTENT_TYPE' in request.META:
@@ -326,14 +331,14 @@ def copy_or_move_object(request, src_account, src_container, src_name, dest_acco
         if move:
             version_id = request.backend.move_object(request.user_uniq, src_account, src_container, src_name,
                                                         dest_account, dest_container, dest_name,
-                                                        content_type, 'pithos', meta, False, permissions)
+                                                        content_type, 'pithos', meta, False, permissions, delimiter)
         else:
             version_id = request.backend.copy_object(request.user_uniq, src_account, src_container, src_name,
                                                         dest_account, dest_container, dest_name,
-                                                        content_type, 'pithos', meta, False, permissions, src_version)
+                                                        content_type, 'pithos', meta, False, permissions, src_version, delimiter)
     except NotAllowedError:
         raise Forbidden('Not allowed')
-    except (NameError, IndexError):
+    except (ItemNotExists, VersionNotExists):
         raise ItemNotFound('Container or object does not exist')
     except ValueError:
         raise BadRequest('Invalid sharing header')
@@ -344,7 +349,7 @@ def copy_or_move_object(request, src_account, src_container, src_name, dest_acco
             request.backend.update_object_public(request.user_uniq, dest_account, dest_container, dest_name, public)
         except NotAllowedError:
             raise Forbidden('Not allowed')
-        except NameError:
+        except ItemNotExists:
             raise ItemNotFound('Object does not exist')
     return version_id
 
@@ -655,13 +660,14 @@ class ObjectWrapper(object):
                 self.block_hash = self.hashmaps[self.file_index][self.block_index]
                 try:
                     self.block = self.backend.get_block(self.block_hash)
-                except NameError:
+                except ItemNotExists:
                     raise ItemNotFound('Block does not exist')
             
             # Get the data from the block.
             bo = self.offset % self.backend.block_size
             bs = self.backend.block_size
-            if self.block_index == len(self.hashmaps[self.file_index]) - 1:
+            if (self.block_index == len(self.hashmaps[self.file_index]) - 1 and
+                self.sizes[self.file_index] % self.backend.block_size):
                 bs = self.sizes[self.file_index] % self.backend.block_size
             bl = min(self.length, bs - bo)
             data = self.block[bo:bo + bl]
@@ -761,14 +767,14 @@ def put_object_block(request, hashmap, data, offset):
         hashmap.append(request.backend.put_block(('\x00' * bo) + data[:bl]))
     return bl # Return ammount of data written.
 
-def hashmap_md5(request, hashmap, size):
+def hashmap_md5(backend, hashmap, size):
     """Produce the MD5 sum from the data in the hashmap."""
     
     # TODO: Search backend for the MD5 of another object with the same hashmap and size...
     md5 = hashlib.md5()
-    bs = request.backend.block_size
+    bs = backend.block_size
     for bi, hash in enumerate(hashmap):
-        data = request.backend.get_block(hash) # Blocks come in padded.
+        data = backend.get_block(hash) # Blocks come in padded.
         if bi == len(hashmap) - 1:
             data = data[:size % bs]
         md5.update(data)
@@ -782,17 +788,63 @@ def simple_list_response(request, l):
     if request.serialization == 'json':
         return json.dumps(l)
 
-def get_backend():
+
+def _get_backend():
     backend = connect_backend(db_module=BACKEND_DB_MODULE,
                               db_connection=BACKEND_DB_CONNECTION,
                               block_module=BACKEND_BLOCK_MODULE,
                               block_path=BACKEND_BLOCK_PATH,
+                              block_umask=BACKEND_BLOCK_UMASK,
                               queue_module=BACKEND_QUEUE_MODULE,
                               queue_connection=BACKEND_QUEUE_CONNECTION)
     backend.default_policy['quota'] = BACKEND_QUOTA
     backend.default_policy['versioning'] = BACKEND_VERSIONING
     return backend
 
+
+def _pooled_backend_close(backend):
+    backend._pool.pool_put(backend)
+
+
+from synnefo.lib.pool import ObjectPool
+from new import instancemethod
+
+USAGE_LIMIT = 500
+POOL_SIZE = 5
+
+class PithosBackendPool(ObjectPool):
+    def _pool_create(self):
+        backend = _get_backend()
+        backend._real_close = backend.close
+        backend.close = instancemethod(_pooled_backend_close, backend,
+                                       type(backend))
+        backend._pool = self
+        backend._use_count = USAGE_LIMIT
+        return backend
+
+    def _pool_verify(self, backend):
+        return 1
+
+    def _pool_cleanup(self, backend):
+        c = backend._use_count - 1
+        if c < 0:
+            backend._real_close()
+            return True
+
+        backend._use_count = c
+        if backend.trans is not None:
+            backend.wrapper.rollback()
+        if backend.messages:
+            backend.messages = []
+        return False
+
+_pithos_backend_pool = PithosBackendPool(size=POOL_SIZE)
+
+
+def get_backend():
+    return _pithos_backend_pool.pool_get()
+
+
 def update_request_headers(request):
     # Handle URL-encoded keys and values.
     meta = dict([(k, v) for k, v in request.META.iteritems() if k.startswith('HTTP_')])
@@ -863,6 +915,7 @@ def request_serialization(request, format_allowed=False):
     
     return 'text'
 
+
 def api_method(http_method=None, format_allowed=False, user_required=True):
     """Decorator function for views that implement an API method."""
     
@@ -872,8 +925,16 @@ def api_method(http_method=None, format_allowed=False, user_required=True):
             try:
                 if http_method and request.method != http_method:
                     raise BadRequest('Method not allowed.')
-                if user_required and getattr(request, 'user', None) is None:
-                    raise Unauthorized('Access denied')
+                
+                if user_required:
+                    token = None
+                    if request.method in ('HEAD', 'GET') and COOKIE_NAME in request.COOKIES:
+                        cookie_value = unquote(request.COOKIES.get(COOKIE_NAME, ''))
+                        if cookie_value and '|' in cookie_value:
+                            token = cookie_value.split('|', 1)[1]
+                    get_user(request, AUTHENTICATION_URL, AUTHENTICATION_USERS, token)
+                    if  getattr(request, 'user', None) is None:
+                        raise Unauthorized('Access denied')
                 
                 # The args variable may contain up to (account, container, object).
                 if len(args) > 1 and len(args[1]) > 256:
@@ -895,7 +956,7 @@ def api_method(http_method=None, format_allowed=False, user_required=True):
                 return render_fault(request, fault)
             except BaseException, e:
                 logger.exception('Unexpected error: %s' % e)
-                fault = InternalServerError('Unexpected error')
+                fault = InternalServerError('Unexpected error: %s' % e)
                 return render_fault(request, fault)
             finally:
                 if getattr(request, 'backend', None) is not None: