Backend transaction handling.
[pithos] / pithos / backends / simple.py
index b81e8ca..4970052 100644 (file)
@@ -44,6 +44,25 @@ from pithos.lib.hashfiler import Mapper, Blocker
 
 logger = logging.getLogger(__name__)
 
+def backend_method(func=None, autocommit=1):
+    if func is None:
+        def fn(func):
+            return backend_method(func, autocommit)
+        return fn
+
+    if not autocommit:
+        return func
+    def fn(self, *args, **kw):
+        self.con.execute('begin deferred')
+        try:
+            ret = func(self, *args, **kw)
+            self.con.commit()
+            return ret
+        except:
+            self.con.rollback()
+            raise
+    return fn
+
 
 class SimpleBackend(BaseBackend):
     """A simple backend.
@@ -110,6 +129,7 @@ class SimpleBackend(BaseBackend):
                   'namelen': self.blocker.hashlen}
         self.mapper = Mapper(**params)
     
+    @backend_method
     def get_account_meta(self, user, account, until=None):
         """Return a dictionary with the account metadata."""
         
@@ -146,6 +166,7 @@ class SimpleBackend(BaseBackend):
             meta.update({'until_timestamp': tstamp})
         return meta
     
+    @backend_method
     def update_account_meta(self, user, account, meta, replace=False):
         """Update the metadata associated with the account."""
         
@@ -154,6 +175,7 @@ class SimpleBackend(BaseBackend):
             raise NotAllowedError
         self._put_metadata(user, account, meta, replace, False)
     
+    @backend_method
     def get_account_groups(self, user, account):
         """Return a dictionary with the user groups defined for this account."""
         
@@ -162,6 +184,7 @@ class SimpleBackend(BaseBackend):
             raise NotAllowedError
         return self._get_groups(account)
     
+    @backend_method
     def update_account_groups(self, user, account, groups, replace=False):
         """Update the groups associated with the account."""
         
@@ -171,6 +194,7 @@ class SimpleBackend(BaseBackend):
         self._check_groups(groups)
         self._put_groups(account, groups, replace)
     
+    @backend_method
     def put_account(self, user, account):
         """Create a new account with the given name."""
         
@@ -184,8 +208,8 @@ class SimpleBackend(BaseBackend):
         else:
             raise NameError('Account already exists')
         version_id = self._put_version(account, user)
-        self.con.commit()
     
+    @backend_method
     def delete_account(self, user, account):
         """Delete the account with the given name."""
         
@@ -198,8 +222,8 @@ class SimpleBackend(BaseBackend):
         sql = 'delete from versions where name = ?'
         self.con.execute(sql, (account,))
         self._del_groups(account)
-        self.con.commit()
     
+    @backend_method
     def list_containers(self, user, account, marker=None, limit=10000, until=None):
         """Return a list of containers existing under an account."""
         
@@ -219,6 +243,7 @@ class SimpleBackend(BaseBackend):
             return containers[start:start + limit]
         return self._list_objects(account, '', '/', marker, limit, False, [], until)
     
+    @backend_method
     def get_container_meta(self, user, account, container, until=None):
         """Return a dictionary with the container metadata."""
         
@@ -242,6 +267,7 @@ class SimpleBackend(BaseBackend):
             meta.update({'until_timestamp': tstamp})
         return meta
     
+    @backend_method
     def update_container_meta(self, user, account, container, meta, replace=False):
         """Update the metadata associated with the container."""
         
@@ -251,6 +277,7 @@ class SimpleBackend(BaseBackend):
         path, version_id, mtime = self._get_containerinfo(account, container)
         self._put_metadata(user, path, meta, replace, False)
     
+    @backend_method
     def get_container_policy(self, user, account, container):
         """Return a dictionary with the container policy."""
         
@@ -260,6 +287,7 @@ class SimpleBackend(BaseBackend):
         path = self._get_containerinfo(account, container)[0]
         return self._get_policy(path)
     
+    @backend_method
     def update_container_policy(self, user, account, container, policy, replace=False):
         """Update the policy associated with the account."""
         
@@ -275,8 +303,8 @@ class SimpleBackend(BaseBackend):
         for k, v in policy.iteritems():
             sql = 'insert or replace into policy (name, key, value) values (?, ?, ?)'
             self.con.execute(sql, (path, k, v))
-        self.con.commit()
     
+    @backend_method
     def put_container(self, user, account, container, policy=None):
         """Create a new container with the given name."""
         
@@ -299,8 +327,8 @@ class SimpleBackend(BaseBackend):
         for k, v in policy.iteritems():
             sql = 'insert or replace into policy (name, key, value) values (?, ?, ?)'
             self.con.execute(sql, (path, k, v))
-        self.con.commit()
     
+    @backend_method
     def delete_container(self, user, account, container, until=None):
         """Delete/purge the container with the given name."""
         
@@ -316,7 +344,6 @@ class SimpleBackend(BaseBackend):
             c = self.con.execute(sql, (path + '/%', until))
             for v in [x[0] for x in c.fetchall()]:
                 self._del_version(v)
-            self.con.commit()
             return
         
         count = self._get_pathstats(path)[0]
@@ -328,6 +355,7 @@ class SimpleBackend(BaseBackend):
         self.con.execute(sql, (path,))
         self._copy_version(user, account, account, True, False) # New account version (for timestamp update).
     
+    @backend_method
     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None):
         """Return a list of objects existing under a container."""
         
@@ -337,6 +365,7 @@ class SimpleBackend(BaseBackend):
         path, version_id, mtime = self._get_containerinfo(account, container, until)
         return self._list_objects(path, prefix, delimiter, marker, limit, virtual, keys, until)
     
+    @backend_method
     def list_object_meta(self, user, account, container, until=None):
         """Return a list with all the container's object meta keys."""
         
@@ -350,6 +379,7 @@ class SimpleBackend(BaseBackend):
         c = self.con.execute(sql, (path + '/%',))
         return [x[0] for x in c.fetchall()]
     
+    @backend_method
     def get_object_meta(self, user, account, container, name, version=None):
         """Return a dictionary with the object metadata."""
         
@@ -367,6 +397,7 @@ class SimpleBackend(BaseBackend):
         meta.update({'modified': modified, 'modified_by': muser})
         return meta
     
+    @backend_method
     def update_object_meta(self, user, account, container, name, meta, replace=False):
         """Update the metadata associated with the object."""
         
@@ -375,6 +406,7 @@ class SimpleBackend(BaseBackend):
         path, version_id, muser, mtime, size = self._get_objectinfo(account, container, name)
         self._put_metadata(user, path, meta, replace)
     
+    @backend_method
     def get_object_permissions(self, user, account, container, name):
         """Return the path from which this object gets its permissions from,\
         along with a dictionary containing the permissions."""
@@ -384,6 +416,7 @@ class SimpleBackend(BaseBackend):
         path = self._get_objectinfo(account, container, name)[0]
         return self._get_permissions(path)
     
+    @backend_method
     def update_object_permissions(self, user, account, container, name, permissions):
         """Update the permissions associated with the object."""
         
@@ -394,6 +427,7 @@ class SimpleBackend(BaseBackend):
         r, w = self._check_permissions(path, permissions)
         self._put_permissions(path, r, w)
     
+    @backend_method
     def get_object_public(self, user, account, container, name):
         """Return the public URL of the object if applicable."""
         
@@ -404,6 +438,7 @@ class SimpleBackend(BaseBackend):
             return '/public/' + path
         return None
     
+    @backend_method
     def update_object_public(self, user, account, container, name, public):
         """Update the public status of the object."""
         
@@ -412,6 +447,7 @@ class SimpleBackend(BaseBackend):
         path = self._get_objectinfo(account, container, name)[0]
         self._put_public(path, public)
     
+    @backend_method
     def get_object_hashmap(self, user, account, container, name, version=None):
         """Return the object's size and a list with partial hashes."""
         
@@ -421,6 +457,7 @@ class SimpleBackend(BaseBackend):
         hashmap = self.mapper.map_retr(version_id)
         return size, [binascii.hexlify(x) for x in hashmap]
     
+    @backend_method
     def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
         """Create/update an object with the specified size and partial hashes."""
         
@@ -446,8 +483,8 @@ class SimpleBackend(BaseBackend):
             self.con.execute(sql, (dest_version_id, k, v))
         if permissions is not None:
             self._put_permissions(path, r, w)
-        self.con.commit()
     
+    @backend_method
     def copy_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
         """Copy an object's data and metadata."""
         
@@ -471,8 +508,8 @@ class SimpleBackend(BaseBackend):
             self.con.execute(sql, (dest_version_id, k, v))
         if permissions is not None:
             self._put_permissions(dest_path, r, w)
-        self.con.commit()
     
+    @backend_method
     def move_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
         """Move an object's data and metadata."""
         
@@ -480,6 +517,7 @@ class SimpleBackend(BaseBackend):
         self.copy_object(user, account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
         self.delete_object(user, account, src_container, src_name)
     
+    @backend_method
     def delete_object(self, user, account, container, name, until=None):
         """Delete/purge an object."""
         
@@ -499,13 +537,13 @@ class SimpleBackend(BaseBackend):
                 pass
             else:
                 self._del_sharing(path)
-            self.con.commit()
             return
         
         path = self._get_objectinfo(account, container, name)[0]
         self._put_version(path, user, 0, 1)
         self._del_sharing(path)
     
+    @backend_method
     def list_versions(self, user, account, container, name):
         """Return a list of all (version, version_timestamp) tuples for an object."""
         
@@ -517,6 +555,7 @@ class SimpleBackend(BaseBackend):
         c = self.con.execute(sql, (path,))
         return [(int(x[0]), int(x[1])) for x in c.fetchall()]
     
+    @backend_method(autocommit=0)
     def get_block(self, hash):
         """Return a block's data."""
         
@@ -526,6 +565,7 @@ class SimpleBackend(BaseBackend):
             raise NameError('Block does not exist')
         return blocks[0]
     
+    @backend_method(autocommit=0)
     def put_block(self, data):
         """Create a block and return the hash."""
         
@@ -533,6 +573,7 @@ class SimpleBackend(BaseBackend):
         hashes, absent = self.blocker.block_stor((data,))
         return binascii.hexlify(hashes[0])
     
+    @backend_method(autocommit=0)
     def update_block(self, hash, data, offset=0):
         """Update a known block and return the hash."""
         
@@ -584,7 +625,6 @@ class SimpleBackend(BaseBackend):
         tstamp = int(time.time())
         sql = 'insert into versions (name, user, tstamp, size, hide) values (?, ?, ?, ?, ?)'
         id = self.con.execute(sql, (path, user, tstamp, size, hide)).lastrowid
-        self.con.commit()
         return str(id)
     
     def _copy_version(self, user, src_path, dest_path, copy_meta=True, copy_data=True, src_version=None):
@@ -608,7 +648,6 @@ class SimpleBackend(BaseBackend):
             # TODO: Copy properly.
             hashmap = self.mapper.map_retr(src_version_id)
             self.mapper.map_stor(dest_version_id, hashmap)
-        self.con.commit()
         return src_version_id, dest_version_id
     
     def _get_versioninfo(self, account, container, name, until=None):
@@ -663,7 +702,6 @@ class SimpleBackend(BaseBackend):
             else:
                 sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
                 self.con.execute(sql, (dest_version_id, k, v))
-        self.con.commit()
     
     def _check_policy(self, policy):
         for k in policy.keys():
@@ -762,7 +800,6 @@ class SimpleBackend(BaseBackend):
             if v:
                 sql = 'insert into groups (account, gname, user) values (?, ?, ?)'
                 self.con.executemany(sql, [(account, k, x) for x in v])
-        self.con.commit()
     
     def _del_groups(self, account):
         sql = 'delete from groups where account = ?'
@@ -812,7 +849,6 @@ class SimpleBackend(BaseBackend):
             self.con.executemany(sql, [(path, 'read', x) for x in r])
         if w:
             self.con.executemany(sql, [(path, 'write', x) for x in w])
-        self.con.commit()
     
     def _get_public(self, path):
         sql = 'select name from public where name = ?'
@@ -828,14 +864,12 @@ class SimpleBackend(BaseBackend):
         else:
             sql = 'insert or replace into public (name) values (?)'
         self.con.execute(sql, (path,))
-        self.con.commit()
     
     def _del_sharing(self, path):
         sql = 'delete from permissions where name = ?'
         self.con.execute(sql, (path,))
         sql = 'delete from public where name = ?'
         self.con.execute(sql, (path,))
-        self.con.commit()
     
     def _is_allowed(self, user, account, container, name, op='read'):
         if user == account: