Progess I
authorSofia Papagiannaki <papagian@gmail.com>
Mon, 25 Jun 2012 16:50:25 +0000 (19:50 +0300)
committerSofia Papagiannaki <papagian@gmail.com>
Mon, 25 Jun 2012 16:50:25 +0000 (19:50 +0300)
Refs: #2611

snf-pithos-app/pithos/api/functions.py
snf-pithos-app/pithos/api/util.py
snf-pithos-backend/pithos/backends/base.py
snf-pithos-backend/pithos/backends/lib/sqlite/node.py
snf-pithos-backend/pithos/backends/lib/sqlite/permissions.py
snf-pithos-backend/pithos/backends/lib/sqlite/public.py
snf-pithos-backend/pithos/backends/lib/sqlite/xfeatures.py
snf-pithos-backend/pithos/backends/modular.py

index 9909cd2..65e56d1 100644 (file)
@@ -788,6 +788,7 @@ def object_write(request, v_account, v_container, v_object):
     copy_from = request.META.get('HTTP_X_COPY_FROM')
     move_from = request.META.get('HTTP_X_MOVE_FROM')
     if copy_from or move_from:
+        delimiter = request.GET.get('delimiter')
         content_length = get_content_length(request) # Required by the API.
         
         src_account = request.META.get('HTTP_X_SOURCE_ACCOUNT')
@@ -799,14 +800,14 @@ def object_write(request, v_account, v_container, v_object):
             except ValueError:
                 raise BadRequest('Invalid X-Move-From header')
             version_id = copy_or_move_object(request, src_account, src_container, src_name,
-                                                v_account, v_container, v_object, move=True)
+                                                v_account, v_container, v_object, move=True, delimiter=delimiter)
         else:
             try:
                 src_container, src_name = split_container_object_string(copy_from)
             except ValueError:
                 raise BadRequest('Invalid X-Copy-From header')
             version_id = copy_or_move_object(request, src_account, src_container, src_name,
-                                                v_account, v_container, v_object, move=False)
+                                                v_account, v_container, v_object, move=False, delimiter=delimiter)
         response = HttpResponse(status=201)
         response['X-Object-Version'] = version_id
         return response
@@ -967,8 +968,10 @@ def object_copy(request, v_account, v_container, v_object):
             raise ItemNotFound('Container or object does not exist')
         validate_matching_preconditions(request, meta)
     
+    delimiter = request.GET.get('delimiter')
+    
     version_id = copy_or_move_object(request, v_account, v_container, v_object,
-                                        dest_account, dest_container, dest_name, move=False)
+                                        dest_account, dest_container, dest_name, move=False, delimiter=delimiter)
     response = HttpResponse(status=201)
     response['X-Object-Version'] = version_id
     return response
@@ -1003,8 +1006,10 @@ def object_move(request, v_account, v_container, v_object):
             raise ItemNotFound('Container or object does not exist')
         validate_matching_preconditions(request, meta)
     
+    delimiter = request.GET.get('delimiter')
+    
     version_id = copy_or_move_object(request, v_account, v_container, v_object,
-                                        dest_account, dest_container, dest_name, move=True)
+                                        dest_account, dest_container, dest_name, move=True, delimiter=delimiter)
     response = HttpResponse(status=201)
     response['X-Object-Version'] = version_id
     return response
@@ -1221,9 +1226,11 @@ def object_delete(request, v_account, v_container, v_object):
     #                       badRequest (400)
     
     until = get_int_parameter(request.GET.get('until'))
+    delimiter = request.GET.get('delimiter')
+    
     try:
         request.backend.delete_object(request.user_uniq, v_account, v_container,
-                                        v_object, until)
+                                        v_object, until, delimiter=delimiter)
     except NotAllowedError:
         raise Forbidden('Not allowed')
     except NameError:
index 0da251a..379a8cc 100644 (file)
@@ -320,7 +320,7 @@ def split_container_object_string(s):
         raise ValueError
     return s[:pos], s[(pos + 1):]
 
-def copy_or_move_object(request, src_account, src_container, src_name, dest_account, dest_container, dest_name, move=False):
+def copy_or_move_object(request, src_account, src_container, src_name, dest_account, dest_container, dest_name, move=False, delimiter=None):
     """Copy or move an object."""
     
     if 'ignore_content_type' in request.GET and 'CONTENT_TYPE' in request.META:
@@ -331,11 +331,11 @@ def copy_or_move_object(request, src_account, src_container, src_name, dest_acco
         if move:
             version_id = request.backend.move_object(request.user_uniq, src_account, src_container, src_name,
                                                         dest_account, dest_container, dest_name,
-                                                        content_type, 'pithos', meta, False, permissions)
+                                                        content_type, 'pithos', meta, False, permissions, delimiter)
         else:
             version_id = request.backend.copy_object(request.user_uniq, src_account, src_container, src_name,
                                                         dest_account, dest_container, dest_name,
-                                                        content_type, 'pithos', meta, False, permissions, src_version)
+                                                        content_type, 'pithos', meta, False, permissions, src_version, delimiter)
     except NotAllowedError:
         raise Forbidden('Not allowed')
     except (NameError, IndexError):
index 3cf2251..0d4e198 100644 (file)
@@ -169,7 +169,7 @@ class BaseBackend(object):
         """
         return
     
-    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
+    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
         """Return a list of container names existing under an account.
         
         Parameters:
@@ -179,6 +179,8 @@ class BaseBackend(object):
             
             'shared': Only list containers with permissions set
             
+            'public': Only list containers containing public objects
+            
         
         Raises:
             NotAllowedError: Operation not permitted
@@ -284,7 +286,7 @@ class BaseBackend(object):
         """
         return
     
-    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
+    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
         """Return a list of object (name, version_id) tuples existing under a container.
         
         Parameters:
@@ -312,6 +314,9 @@ class BaseBackend(object):
              
             'size_range': Include objects with byte size in (from, to).
                           Use None to specify unlimited
+            
+            'public': Only list public objects
+             
         
         Raises:
             NotAllowedError: Operation not permitted
@@ -489,7 +494,7 @@ class BaseBackend(object):
         """Update an object's checksum."""
         return
     
-    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
+    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
         """Copy an object's data and metadata and return the new version.
         
         Parameters:
@@ -502,6 +507,8 @@ class BaseBackend(object):
             'permissions': New object permissions
             
             'src_version': Copy from the version provided
+            
+            'delimiter': Copy objects whose path starts with src_name + delimiter
         
         Raises:
             NotAllowedError: Operation not permitted
@@ -516,7 +523,7 @@ class BaseBackend(object):
         """
         return ''
     
-    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None):
+    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
         """Move an object's data and metadata and return the new version.
         
         Parameters:
@@ -527,6 +534,8 @@ class BaseBackend(object):
             'replace_meta': Replace metadata instead of update
             
             'permissions': New object permissions
+            
+            'delimiter': Move objects whose path starts with src_name + delimiter
         
         Raises:
             NotAllowedError: Operation not permitted
@@ -539,9 +548,12 @@ class BaseBackend(object):
         """
         return ''
     
-    def delete_object(self, user, account, container, name, until=None):
+    def delete_object(self, user, account, container, name, until=None, delimiter=None):
         """Delete/purge an object.
         
+        Parameters:
+            'delimiter': Delete objects whose path starting with name + delimiter
+        
         Raises:
             NotAllowedError: Operation not permitted
             
index ab79c52..784296f 100644 (file)
@@ -201,6 +201,16 @@ class Node(DBWorker):
             return r[0]
         return None
     
+    def node_lookup_bulk(self, paths):
+       """Lookup the current nodes for the given paths.
+           Return () if the path is not found.
+        """
+        
+        placeholders = ','.join('?' for path in paths)
+        q = "select path, node from nodes where path in (%s)" % placeholders
+        self.execute(q, paths)
+        return self.fetchall()
+    
     def node_get_properties(self, node):
         """Return the node's (parent, path).
            Return None if the node is not found.
@@ -500,6 +510,24 @@ class Node(DBWorker):
         if props is not None:
             return props
         return None
+
+    def version_lookup_bulk(self, nodes, before=inf, cluster=0):
+        """Lookup the current versions of the given nodes.
+           Return a list with their properties:
+           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
+        """
+        
+        placeholders = ','.join('?' for node in nodes)
+        q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
+             "from versions "
+             "where serial in (select max(serial) "
+                             "from versions "
+                             "where node in (%s) and mtime < ? group by node) "
+             "and cluster = ?" % placeholders)
+        args = nodes
+        args.extend((before, cluster))
+        self.execute(q, args)
+        return self.fetchall()
     
     def version_get_properties(self, serial, keys=(), propnames=_propnames):
         """Return a sequence of values for the properties of
@@ -656,7 +684,6 @@ class Node(DBWorker):
         
         subqlist = []
         args = []
-        print pathq
         for path, match in pathq:
             if match == MATCH_PREFIX:
                 subqlist.append("n.path like ? escape '\\'")
index b29422d..8faed87 100644 (file)
@@ -112,6 +112,12 @@ class Permissions(XFeatures, Groups, Public):
         self.xfeature_destroy(path)
         self.public_unset(path)
     
+    def access_clear_bulk(self, paths):
+        """Revoke access to path (both permissions and public)."""
+        
+        self.xfeature_destroy_bulk(paths)
+        self.public_unset_bulk(paths)
+    
     def access_check(self, path, access, member):
         """Return true if the member has this access to the path."""
         
index 2d6e9cb..e314bfd 100644 (file)
@@ -58,6 +58,11 @@ class Public(DBWorker):
         q = "update public set active = 0 where path = ?"
         self.execute(q, (path,))
     
+    def public_unset_bulk(self, paths):
+        placeholders = ','.join('?' for path in paths)
+        q = "update public set active = 0 where path in (%s)" % placeholders
+        self.execute(q, paths)
+    
     def public_get(self, path):
         q = "select public_id from public where path = ? and active = 1"
         self.execute(q, (path,))
index 7d682d0..de948d0 100644 (file)
@@ -99,6 +99,13 @@ class XFeatures(DBWorker):
         q = "delete from xfeatures where path = ?"
         self.execute(q, (path,))
     
+    def xfeature_destroy_bulk(self, paths):
+        """Destroy features and all their key, value pairs."""
+        
+        placeholders = ','.join('?' for path in paths)
+        q = "delete from xfeatures where path in (%s)" % placeholders
+        self.execute(q, paths)
+    
     def feature_dict(self, feature):
         """Return a dict mapping keys to list of values for feature."""
         
index cfc0339..c34d1e1 100644 (file)
@@ -476,6 +476,17 @@ class ModularBackend(BaseBackend):
         allowed = self._get_formatted_paths(allowed)
         return self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
     
+    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
+        objects = []
+        while True:
+            marker = objects[-1] if objects else None
+            limit = 10000
+            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
+            objects.extend(l)
+            if not l or len(l) < limit:
+                break
+        return objects
+    
     def _list_object_permissions(self, user, account, container, prefix, shared, public):
         allowed = []
         path = '/'.join((account, container, prefix)).rstrip('/')
@@ -724,7 +735,8 @@ class ModularBackend(BaseBackend):
             if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
                 self.node.version_put_property(x[self.SERIAL], 'checksum', checksum)
     
-    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
+    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
+        dest_version_ids = []
         self._can_read(user, src_account, src_container, src_name)
         path, node = self._lookup_object(src_account, src_container, src_name)
         # TODO: Will do another fetch of the properties in duplicate version...
@@ -732,32 +744,49 @@ class ModularBackend(BaseBackend):
         src_version_id = props[self.SERIAL]
         hash = props[self.HASH]
         size = props[self.SIZE]
-        
         is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
-        dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy)
-        return dest_version_id
+        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
+        if is_move:
+               self._delete_object(user, src_account, src_container, src_name)
+        
+        if delimiter:
+            prefix = src_name + delimiter if not src_name.endswith(delimiter) else src_name
+            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
+            paths = [elem[0] for elem in src_names]
+            nodes = [elem[2] for elem in src_names]
+            # TODO: Will do another fetch of the properties in duplicate version...
+            props = self._get_versions(nodes) # Check to see if source exists.
+            
+            for prop, path, node in zip(props, paths, nodes):
+                src_version_id = prop[self.SERIAL]
+                hash = prop[self.HASH]
+                vtype = prop[self.TYPE]
+                dest_prefix = dest_name + delimiter if not dest_name.endswith(delimiter) else dest_name
+                vdest_name = path.replace(prefix, dest_prefix, 1)
+                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
+                if is_move:
+                       self._delete_object(user, src_account, src_container, path)
+        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
     
     @backend_method
-    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
+    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
         """Copy an object's data and metadata."""
         
-        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version)
-        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False)
+        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
+        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
         return dest_version_id
     
     @backend_method
-    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None):
+    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
         """Move an object's data and metadata."""
         
-        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions)
+        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
         if user != src_account:
             raise NotAllowedError
-        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True)
-        if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
-            self._delete_object(user, src_account, src_container, src_name)
+        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
         return dest_version_id
     
-    def _delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
+    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
         if user != account:
             raise NotAllowedError
         
@@ -791,13 +820,28 @@ class ModularBackend(BaseBackend):
             self._report_size_change(user, account, -del_size, {'action': 'object delete'})
         self._report_object_change(user, account, path, details={'action': 'object delete'})
         self.permissions.access_clear(path)
+        
+        if delimiter:
+            prefix = name + delimiter if not name.endswith(delimiter) else name
+            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
+            paths = []
+            for t in src_names:
+               path = '/'.join((account, container, t[0]))
+               node = t[2]
+                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
+                del_size = self._apply_versioning(account, container, src_version_id)
+                if del_size:
+                    self._report_size_change(user, account, -del_size, {'action': 'object delete'})
+                self._report_object_change(user, account, path, details={'action': 'object delete'})
+                paths.append(path)
+            self.permissions.access_clear_bulk(paths)
     
     @backend_method
-    def (self, user, account, container, name, until=None, prefix='', delimiter=None):
+    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
         """Delete/purge an object."""
         
         logger.debug("delete_object: %s %s %s %s %s %s %s", user, account, container, name, until, prefix, delimiter)
-        self._delete_object(user, account, container, name, until)
+        self._delete_object(user, account, container, name, until, delimiter)
     
     @backend_method
     def list_versions(self, user, account, container, name):
@@ -936,6 +980,21 @@ class ModularBackend(BaseBackend):
             if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
                 raise IndexError('Version does not exist')
         return props
+
+    def _get_versions(self, nodes, version=None):
+        if version is None:
+            props = self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
+            if not props:
+                raise NameError('Object does not exist')
+        else:
+            try:
+                version = int(version)
+            except ValueError:
+                raise IndexError('Version does not exist')
+            props = self.node.version_get_properties(version)
+            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
+                raise IndexError('Version does not exist')
+        return props
     
     def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
         """Create a new version of the node."""