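Add backend close method; also store object hashmaps by content hash, keep account/container policies on nodes (adding account policies and quota checks), default versioning to manual, and allow copy/move across accounts.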
[pithos] / pithos / backends / modular.py
index 726dcf0..62ecdf6 100644 (file)
 import sys
 import os
 import time
-import sqlite3
 import logging
 import hashlib
 import binascii
 
-from base import NotAllowedError, BaseBackend
+from base import NotAllowedError, QuotaError, BaseBackend
 from lib.hashfiler import Mapper, Blocker
 
 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
@@ -49,6 +48,35 @@ inf = float('inf')
 
 logger = logging.getLogger(__name__)
 
+
+class HashMap(list):
+    
+    def __init__(self, blocksize, blockhash):
+        super(HashMap, self).__init__()
+        self.blocksize = blocksize
+        self.blockhash = blockhash
+    
+    def _hash_raw(self, v):
+        h = hashlib.new(self.blockhash)
+        h.update(v)
+        return h.digest()
+    
+    def hash(self):
+        if len(self) == 0:
+            return self._hash_raw('')
+        if len(self) == 1:
+            return self.__getitem__(0)
+        
+        h = list(self)
+        s = 2
+        while s < len(h):
+            s = s * 2
+        h += [('\x00' * len(h[0]))] * (s - len(h))
+        while len(h) > 1:
+            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
+        return h[0]
+
+
 def backend_method(func=None, autocommit=1):
     if func is None:
         def fn(func):
@@ -79,7 +107,7 @@ class ModularBackend(BaseBackend):
         self.hash_algorithm = 'sha256'
         self.block_size = 4 * 1024 * 1024 # 4MB
         
-        self.default_policy = {'quota': 0, 'versioning': 'auto'}
+        self.default_policy = {'quota': 0, 'versioning': 'manual'}
         
         if path and not os.path.exists(path):
             os.makedirs(path)
@@ -88,6 +116,7 @@ class ModularBackend(BaseBackend):
         
         __import__(mod)
         self.mod = sys.modules[mod]
+        self.db = db
         self.wrapper = self.mod.dbwrapper.DBWrapper(db)
         
         params = {'blocksize': self.block_size,
@@ -103,11 +132,13 @@ class ModularBackend(BaseBackend):
         self.permissions = self.mod.permissions.Permissions(**params)
         for x in ['READ', 'WRITE']:
             setattr(self, x, getattr(self.mod.permissions, x))
-        self.policy = self.mod.policy.Policy(**params)
         self.node = self.mod.node.Node(**params)
-        for x in ['ROOTNODE', 'SERIAL', 'SIZE', 'MTIME', 'MUSER', 'CLUSTER']:
+        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'CLUSTER']:
             setattr(self, x, getattr(self.mod.node, x))
     
+    def close(self):
+        self.wrapper.close()
+    
     @backend_method
     def list_accounts(self, user, marker=None, limit=10000):
         """Return a list of accounts the user can access."""
@@ -160,7 +191,7 @@ class ModularBackend(BaseBackend):
         if user != account:
             raise NotAllowedError
         path, node = self._lookup_account(account, True)
-        self._put_metadata(user, node, meta, replace, False)
+        self._put_metadata(user, node, meta, replace)
     
     @backend_method
     def get_account_groups(self, user, account):
@@ -192,16 +223,42 @@ class ModularBackend(BaseBackend):
                 self.permissions.group_addmany(account, k, v)
     
     @backend_method
-    def put_account(self, user, account):
+    def get_account_policy(self, user, account):
+        """Return a dictionary with the account policy."""
+        
+        logger.debug("get_account_policy: %s", account)
+        if user != account:
+            if account not in self._allowed_accounts(user):
+                raise NotAllowedError
+            return {}
+        path, node = self._lookup_account(account, True)
+        return self._get_policy(node)
+    
+    @backend_method
+    def update_account_policy(self, user, account, policy, replace=False):
+        """Update the policy associated with the account."""
+        
+        logger.debug("update_account_policy: %s %s %s", account, policy, replace)
+        if user != account:
+            raise NotAllowedError
+        path, node = self._lookup_account(account, True)
+        self._check_policy(policy)
+        self._put_policy(node, policy, replace)
+    
+    @backend_method
+    def put_account(self, user, account, policy={}):
         """Create a new account with the given name."""
         
-        logger.debug("put_account: %s", account)
+        logger.debug("put_account: %s %s", account, policy)
         if user != account:
             raise NotAllowedError
         node = self.node.node_lookup(account)
         if node is not None:
             raise NameError('Account already exists')
-        self._put_path(user, self.ROOTNODE, account)
+        if policy:
+            self._check_policy(policy)
+        node = self._put_path(user, self.ROOTNODE, account)
+        self._put_policy(node, policy, True)
     
     @backend_method
     def delete_account(self, user, account):
@@ -273,7 +330,7 @@ class ModularBackend(BaseBackend):
         if user != account:
             raise NotAllowedError
         path, node = self._lookup_container(account, container)
-        self._put_metadata(user, node, meta, replace, False)
+        self._put_metadata(user, node, meta, replace)
     
     @backend_method
     def get_container_policy(self, user, account, container):
@@ -284,26 +341,22 @@ class ModularBackend(BaseBackend):
             if container not in self._allowed_containers(user, account):
                 raise NotAllowedError
             return {}
-        path = self._lookup_container(account, container)[0]
-        return self.policy.policy_get(path)
+        path, node = self._lookup_container(account, container)
+        return self._get_policy(node)
     
     @backend_method
     def update_container_policy(self, user, account, container, policy, replace=False):
-        """Update the policy associated with the account."""
+        """Update the policy associated with the container."""
         
         logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
         if user != account:
             raise NotAllowedError
-        path = self._lookup_container(account, container)[0]
+        path, node = self._lookup_container(account, container)
         self._check_policy(policy)
-        if replace:
-            for k, v in self.default_policy.iteritems():
-                if k not in policy:
-                    policy[k] = v
-        self.policy.policy_set(path, policy)
+        self._put_policy(node, policy, replace)
     
     @backend_method
-    def put_container(self, user, account, container, policy=None):
+    def put_container(self, user, account, container, policy={}):
         """Create a new container with the given name."""
         
         logger.debug("put_container: %s %s %s", account, container, policy)
@@ -318,11 +371,8 @@ class ModularBackend(BaseBackend):
         if policy:
             self._check_policy(policy)
         path = '/'.join((account, container))
-        self._put_path(user, self._lookup_account(account, True)[1], path)
-        for k, v in self.default_policy.iteritems():
-            if k not in policy:
-                policy[k] = v
-        self.policy.policy_set(path, policy)
+        node = self._put_path(user, self._lookup_account(account, True)[1], path)
+        self._put_policy(node, policy, True)
     
     @backend_method
     def delete_container(self, user, account, container, until=None):
@@ -334,20 +384,15 @@ class ModularBackend(BaseBackend):
         path, node = self._lookup_container(account, container)
         
         if until is not None:
-            versions = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
-            for v in versions:
-                self.mapper.map_remv(v)
+            self.node.node_purge_children(node, until, CLUSTER_HISTORY)
             self.node.node_purge_children(node, until, CLUSTER_DELETED)
             return
         
         if self._get_statistics(node)[0] > 0:
             raise IndexError('Container is not empty')
-        versions = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
-        for v in versions:
-            self.mapper.map_remv(v)
+        self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
         self.node.node_purge_children(node, inf, CLUSTER_DELETED)
         self.node.node_remove(node)
-        self.policy.policy_unset(path)
     
     @backend_method
     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None):
@@ -480,28 +525,32 @@ class ModularBackend(BaseBackend):
         self._can_read(user, account, container, name)
         path, node = self._lookup_object(account, container, name)
         props = self._get_version(node, version)
-        hashmap = self.mapper.map_retr(props[self.SERIAL])
+        hashmap = self.mapper.map_retr(binascii.unhexlify(props[self.HASH]))
         return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
     
-    @backend_method
-    def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
-        """Create/update an object with the specified size and partial hashes."""
-        
-        logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
+    def _update_object_hash(self, user, account, container, name, size, hash, meta={}, replace_meta=False, permissions=None):
         if permissions is not None and user != account:
             raise NotAllowedError
         self._can_write(user, account, container, name)
-        missing = self.blocker.block_ping([binascii.unhexlify(x) for x in hashmap])
-        if missing:
-            ie = IndexError()
-            ie.data = [binascii.hexlify(x) for x in missing]
-            raise ie
         if permissions is not None:
             path = '/'.join((account, container, name))
             self._check_permissions(path, permissions)
-        path, node = self._put_object_node(account, container, name)
-        src_version_id, dest_version_id = self._copy_version(user, node, None, node, size)
-        self.mapper.map_stor(dest_version_id, [binascii.unhexlify(x) for x in hashmap])
+        
+        account_path, account_node = self._lookup_account(account, True)
+        container_path, container_node = self._lookup_container(account, container)
+        path, node = self._put_object_node(container_path, container_node, name)
+        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size, hash)
+        
+        # Check quota.
+        size_delta = size # Change with versioning.
+        if size_delta > 0:
+            account_quota = long(self._get_policy(account_node)['quota'])
+            container_quota = long(self._get_policy(container_node)['quota'])
+            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
+               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
+                # This must be executed in a transaction, so the version is never created if it fails.
+                raise QuotaError
+        
         if not replace_meta and src_version_id is not None:
             self.node.attribute_copy(src_version_id, dest_version_id)
         self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
@@ -509,41 +558,60 @@ class ModularBackend(BaseBackend):
             self.permissions.access_set(path, permissions)
         return dest_version_id
     
-    def _copy_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
-        if permissions is not None and user != account:
-            raise NotAllowedError
-        self._can_read(user, account, src_container, src_name)
-        self._can_write(user, account, dest_container, dest_name)
-        src_path, src_node = self._lookup_object(account, src_container, src_name)
-        self._get_version(src_node, src_version)
-        if permissions is not None:
-            dest_path = '/'.join((account, container, name))
-            self._check_permissions(dest_path, permissions)
-        dest_path, dest_node = self._put_object_node(account, dest_container, dest_name)
-        src_version_id, dest_version_id = self._copy_version(user, src_node, src_version, dest_node)
-        if src_version_id is not None:
-            self._copy_data(src_version_id, dest_version_id)
-        if not replace_meta and src_version_id is not None:
+    @backend_method
+    def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
+        """Create/update an object with the specified size and partial hashes."""
+        
+        logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
+        if size == 0: # No such thing as an empty hashmap.
+            hashmap = [self.put_block('')]
+        map = HashMap(self.block_size, self.hash_algorithm)
+        map.extend([binascii.unhexlify(x) for x in hashmap])
+        missing = self.blocker.block_ping(map)
+        if missing:
+            ie = IndexError()
+            ie.data = [binascii.hexlify(x) for x in missing]
+            raise ie
+        
+        hash = map.hash()
+        dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), meta, replace_meta, permissions)
+        self.mapper.map_stor(hash, map)
+        return dest_version_id
+    
+    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
+        self._can_read(user, src_account, src_container, src_name)
+        path, node = self._lookup_object(src_account, src_container, src_name)
+        props = self._get_version(node, src_version)
+        src_version_id = props[self.SERIAL]
+        hash = props[self.HASH]
+        size = props[self.SIZE]
+        
+        if replace_meta:
+            meta = dest_meta
+        else:
+            meta = {}
+        dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, meta, True, permissions)
+        if not replace_meta:
             self.node.attribute_copy(src_version_id, dest_version_id)
-        self.node.attribute_set(dest_version_id, ((k, v) for k, v in dest_meta.iteritems()))
-        if permissions is not None:
-            self.permissions.access_set(dest_path, permissions)
+            self.node.attribute_set(dest_version_id, ((k, v) for k, v in dest_meta.iteritems()))
         return dest_version_id
     
     @backend_method
-    def copy_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
+    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
         """Copy an object's data and metadata."""
         
-        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
-        return self._copy_object(user, account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
+        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
+        return self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
     
     @backend_method
-    def move_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
+    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
         """Move an object's data and metadata."""
         
-        logger.debug("move_object: %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions)
-        dest_version_id = self._copy_object(user, account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
-        self._delete_object(user, account, src_container, src_name)
+        logger.debug("move_object: %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions)
+        if user != src_account:
+            raise NotAllowedError
+        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
+        self._delete_object(user, src_account, src_container, src_name)
         return dest_version_id
     
     def _delete_object(self, user, account, container, name, until=None):
@@ -555,10 +623,8 @@ class ModularBackend(BaseBackend):
             node = self.node.node_lookup(path)
             if node is None:
                 return
-            versions = self.node.node_purge(node, until, CLUSTER_NORMAL)
-            versions += self.node.node_purge(node, until, CLUSTER_HISTORY)
-            for v in versions:
-                self.mapper.map_remv(v)
+            self.node.node_purge(node, until, CLUSTER_NORMAL)
+            self.node.node_purge(node, until, CLUSTER_HISTORY)
             self.node.node_purge_children(node, until, CLUSTER_DELETED)
             try:
                 props = self._get_version(node)
@@ -569,7 +635,7 @@ class ModularBackend(BaseBackend):
             return
         
         path, node = self._lookup_object(account, container, name)
-        self._copy_version(user, node, None, node, 0, CLUSTER_DELETED)
+        src_version_id, dest_version_id = self._put_version_duplicate(user, node, 0, None, CLUSTER_DELETED)
         self.permissions.access_clear(path)
     
     @backend_method
@@ -619,8 +685,7 @@ class ModularBackend(BaseBackend):
     
     # Path functions.
     
-    def _put_object_node(self, account, container, name):
-        path, parent = self._lookup_container(account, container)
+    def _put_object_node(self, path, parent, name):
         path = '/'.join((path, name))
         node = self.node.node_lookup(path)
         if node is None:
@@ -629,7 +694,7 @@ class ModularBackend(BaseBackend):
     
     def _put_path(self, user, parent, path):
         node = self.node.node_create(parent, path)
-        self.node.version_create(node, 0, None, user, CLUSTER_NORMAL)
+        self.node.version_create(node, None, 0, None, user, CLUSTER_NORMAL)
         return node
     
     def _lookup_account(self, account, create=True):
@@ -680,54 +745,42 @@ class ModularBackend(BaseBackend):
             if props is None:
                 raise NameError('Object does not exist')
         else:
+            try:
+                version = int(version)
+            except ValueError:
+                raise IndexError('Version does not exist')
             props = self.node.version_get_properties(version)
             if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
                 raise IndexError('Version does not exist')
         return props
     
-    def _copy_version(self, user, src_node, src_version, dest_node, dest_size=None, dest_cluster=CLUSTER_NORMAL):
+    def _put_version_duplicate(self, user, node, size=None, hash=None, cluster=CLUSTER_NORMAL):
+        """Create a new version of the node."""
         
-        # Get source serial and size.
-        if src_version is not None:
-            src_props = self._get_version(src_node, src_version)
-            src_version_id = src_props[self.SERIAL]
-            size = src_props[self.SIZE]
+        props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
+        if props is not None:
+            src_version_id = props[self.SERIAL]
+            src_hash = props[self.HASH]
+            src_size = props[self.SIZE]
         else:
-            # Latest or create from scratch.
-            try:
-                src_props = self._get_version(src_node)
-                src_version_id = src_props[self.SERIAL]
-                size = src_props[self.SIZE]
-            except NameError:
-                src_version_id = None
-                size = 0
-        if dest_size is not None:
-            size = dest_size
+            src_version_id = None
+            src_hash = None
+            src_size = 0
+        if size is None:
+            hash = src_hash # This way hash can be set to None.
+            size = src_size
         
-        # Move the latest version at destination to CLUSTER_HISTORY and create new.
-        if src_node == dest_node and src_version is None and src_version_id is not None:
+        if src_version_id is not None:
             self.node.version_recluster(src_version_id, CLUSTER_HISTORY)
-        else:
-            dest_props = self.node.version_lookup(dest_node, inf, CLUSTER_NORMAL)
-            if dest_props is not None:
-                self.node.version_recluster(dest_props[self.SERIAL], CLUSTER_HISTORY)
-        dest_version_id, mtime = self.node.version_create(dest_node, size, src_version_id, user, dest_cluster)
-        
+        dest_version_id, mtime = self.node.version_create(node, hash, size, src_version_id, user, cluster)
         return src_version_id, dest_version_id
     
-    def _copy_data(self, src_version, dest_version):
-        hashmap = self.mapper.map_retr(src_version)
-        self.mapper.map_stor(dest_version, hashmap)
-    
-    def _get_metadata(self, version):
-        if version is None:
-            return {}
-        return dict(self.node.attribute_get(version))
-    
-    def _put_metadata(self, user, node, meta, replace=False, copy_data=True):
+    def _put_metadata(self, user, node, meta, replace=False):
         """Create a new version and store metadata."""
         
-        src_version_id, dest_version_id = self._copy_version(user, node, None, node)
+        src_version_id, dest_version_id = self._put_version_duplicate(user, node)
+        
+        # TODO: Merge with other functions that update metadata...
         if not replace:
             if src_version_id is not None:
                 self.node.attribute_copy(src_version_id, dest_version_id)
@@ -735,8 +788,6 @@ class ModularBackend(BaseBackend):
             self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems() if v != ''))
         else:
             self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
-        if copy_data and src_version_id is not None:
-            self._copy_data(src_version_id, dest_version_id)
         return dest_version_id
     
     def _list_limits(self, listing, marker, limit):
@@ -782,6 +833,18 @@ class ModularBackend(BaseBackend):
             else:
                 raise ValueError
     
+    def _put_policy(self, node, policy, replace):
+        """Store policy on node; when replace is set, keys missing from policy take default_policy values."""
+        # Merge into a fresh dict: never mutate the caller's dict or a shared `policy={}` default.
+        merged = self.default_policy.copy() if replace else {}
+        merged.update(policy)
+        self.node.policy_set(node, merged)
+    
+    def _get_policy(self, node):
+        policy = self.default_policy.copy()
+        policy.update(self.node.policy_get(node))
+        return policy
+    
     # Access control functions.
     
     def _check_groups(self, groups):