quotaholder serial handling fix
[pithos] / snf-pithos-backend / pithos / backends / modular.py
index 61aa6de..3fa1368 100644 (file)
@@ -39,6 +39,8 @@ import logging
 import hashlib
 import binascii
 
+from commissioning.clients.quotaholder import QuotaholderHTTP
+
 from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
     AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
 
@@ -107,14 +109,24 @@ def backend_method(func=None, autocommit=1):
 
     def fn(self, *args, **kw):
         self.wrapper.execute()
+        serials = self.serials
+        self.messages = []
         try:
-            self.messages = []
             ret = func(self, *args, **kw)
             for m in self.messages:
                 self.queue.send(*m)
+            if serials:
+                self.quotaholder.accept_commission(
+                            context     =   {},
+                            clientkey   =   'pithos',
+                            serials     =   serials)
             self.wrapper.commit()
             return ret
         except:
+            self.quotaholder.reject_commission(
+                        context     =   {},
+                        clientkey   =   'pithos',
+                        serials     =   serials)
             self.wrapper.rollback()
             raise
     return fn
@@ -129,7 +141,7 @@ class ModularBackend(BaseBackend):
     def __init__(self, db_module=None, db_connection=None,
                  block_module=None, block_path=None, block_umask=None,
                  queue_module=None, queue_hosts=None,
-                 queue_exchange=None):
+                 queue_exchange=None, quotaholder_url=None):
         db_module = db_module or DEFAULT_DB_MODULE
         db_connection = db_connection or DEFAULT_DB_CONNECTION
         block_module = block_module or DEFAULT_BLOCK_MODULE
@@ -138,7 +150,7 @@ class ModularBackend(BaseBackend):
         #queue_module = queue_module or DEFAULT_QUEUE_MODULE
         #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
         #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
-               
+                
         self.hash_algorithm = 'sha256'
         self.block_size = 4 * 1024 * 1024  # 4MB
 
@@ -171,7 +183,7 @@ class ModularBackend(BaseBackend):
         if queue_module and queue_hosts:
             self.queue_module = load_module(queue_module)
             params = {'hosts': queue_hosts,
-                         'exchange': queue_exchange,
+                          'exchange': queue_exchange,
                       'client_id': QUEUE_CLIENT_ID}
             self.queue = self.queue_module.Queue(**params)
         else:
@@ -184,6 +196,10 @@ class ModularBackend(BaseBackend):
 
             self.queue = NoQueue()
 
+        self.quotaholder_url = quotaholder_url
+        self.quotaholder = QuotaholderHTTP(quotaholder_url)
+        self.serials = []
+
     def close(self):
         self.wrapper.close()
         self.queue.close()
@@ -489,8 +505,8 @@ class ModularBackend(BaseBackend):
                 self.store.map_delete(h)
             self.node.node_purge_children(node, until, CLUSTER_DELETED)
             self._report_size_change(user, account, -size,
-                                                        {'action':'container purge', 'path': path,
-                                                         'versions': serials})
+                                     {'action':'container purge', 'path': path,
+                                      'versions': serials})
             return
 
         if not delimiter:
@@ -503,9 +519,9 @@ class ModularBackend(BaseBackend):
             self.node.node_purge_children(node, inf, CLUSTER_DELETED)
             self.node.node_remove(node)
             self._report_size_change(user, account, -size,
-                                                        {'action': 'container delete',
-                                                         'path': path,
-                                                         'versions': serials})
+                                     {'action': 'container delete',
+                                      'path': path,
+                                      'versions': serials})
         else:
                 # remove only contents
             src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
@@ -518,8 +534,8 @@ class ModularBackend(BaseBackend):
                     account, container, src_version_id)
                 if del_size:
                     self._report_size_change(user, account, -del_size,
-                                                                {'action': 'object delete',
-                                                                 'path': path, 'versions': [dest_version_id]})
+                                             {'action': 'object delete',
+                                              'path': path, 'versions': [dest_version_id]})
                 self._report_object_change(
                     user, account, path, details={'action': 'object delete'})
                 paths.append(path)
@@ -810,8 +826,8 @@ class ModularBackend(BaseBackend):
                 # This must be executed in a transaction, so the version is never created if it fails.
                 raise QuotaError
         self._report_size_change(user, account, size_delta,
-                                                        {'action': 'object update', 'path': path,
-                                                         'versions': [dest_version_id]})
+                                 {'action': 'object update', 'path': path,
+                                  'versions': [dest_version_id]})
 
         if permissions is not None:
             self.permissions.access_set(path, permissions)
@@ -942,8 +958,8 @@ class ModularBackend(BaseBackend):
             except NameError:
                 self.permissions.access_clear(path)
             self._report_size_change(user, account, -size,
-                                                       {'action': 'object purge', 'path': path,
-                                                        'versions': serials})
+                                    {'action': 'object purge', 'path': path,
+                                     'versions': serials})
             return
 
         path, node = self._lookup_object(account, container, name)
@@ -951,8 +967,8 @@ class ModularBackend(BaseBackend):
         del_size = self._apply_versioning(account, container, src_version_id)
         if del_size:
             self._report_size_change(user, account, -del_size,
-                                                        {'action': 'object delete', 'path': path,
-                                                         'versions': [dest_version_id]})
+                                     {'action': 'object delete', 'path': path,
+                                      'versions': [dest_version_id]})
         self._report_object_change(
             user, account, path, details={'action': 'object delete'})
         self.permissions.access_clear(path)
@@ -969,9 +985,9 @@ class ModularBackend(BaseBackend):
                     account, container, src_version_id)
                 if del_size:
                     self._report_size_change(user, account, -del_size,
-                                                                {'action': 'object delete',
-                                                                 'path': path,
-                                                                 'versions': [dest_version_id]})
+                                             {'action': 'object delete',
+                                              'path': path,
+                                              'versions': [dest_version_id]})
                 self._report_object_change(
                     user, account, path, details={'action': 'object delete'})
                 paths.append(path)
@@ -1229,22 +1245,32 @@ class ModularBackend(BaseBackend):
         logger.debug(
             "_report_size_change: %s %s %s %s", user, account, size, details)
         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), 
-                                                 account, QUEUE_INSTANCE_ID, 'diskspace',
-                                                 float(size), details))
+                              account, QUEUE_INSTANCE_ID, 'diskspace',
+                              float(size), details))
+
+        serial = self.quotaholder.issue_commission(
+                context     =   {},
+                target      =   account,
+                key         =   '1',
+                clientkey   =   'pithos',
+                ownerkey    =   '',
+                provisions  =   (('pithos+', 'diskspace', size),)
+        )
+        self.serials.append(serial)
 
     def _report_object_change(self, user, account, path, details={}):
         details.update({'user': user})
         logger.debug("_report_object_change: %s %s %s %s", user,
                      account, path, details)
         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
-                                                 account, QUEUE_INSTANCE_ID, 'object', path, details))
+                                                  account, QUEUE_INSTANCE_ID, 'object', path, details))
 
     def _report_sharing_change(self, user, account, path, details={}):
         logger.debug("_report_permissions_change: %s %s %s %s",
                      user, account, path, details)
         details.update({'user': user})
         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
-                                                 account, QUEUE_INSTANCE_ID, 'sharing', path, details))
+                                                  account, QUEUE_INSTANCE_ID, 'sharing', path, details))
 
     # Policy functions.