quotaholder serial handling fix
[pithos] / snf-pithos-backend / pithos / backends / modular.py
index 7ee808a..3fa1368 100644 (file)
@@ -39,6 +39,8 @@ import logging
 import hashlib
 import binascii
 
+from commissioning.clients.quotaholder import QuotaholderHTTP
+
 from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
     AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
 
@@ -79,7 +81,8 @@ DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
 DEFAULT_BLOCK_PATH = 'data/'
 DEFAULT_BLOCK_UMASK = 0o022
 #DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
-#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
+#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
+#DEFAULT_QUEUE_EXCHANGE = 'pithos'
 
 QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
 QUEUE_CLIENT_ID = 'pithos'
@@ -106,14 +109,24 @@ def backend_method(func=None, autocommit=1):
 
     def fn(self, *args, **kw):
         self.wrapper.execute()
+        serials = self.serials
+        self.messages = []
         try:
-            self.messages = []
             ret = func(self, *args, **kw)
             for m in self.messages:
                 self.queue.send(*m)
+            if serials:
+                self.quotaholder.accept_commission(
+                            context     =   {},
+                            clientkey   =   'pithos',
+                            serials     =   serials)
             self.wrapper.commit()
             return ret
         except:
+            self.quotaholder.reject_commission(
+                        context     =   {},
+                        clientkey   =   'pithos',
+                        serials     =   serials)
             self.wrapper.rollback()
             raise
     return fn
@@ -127,15 +140,17 @@ class ModularBackend(BaseBackend):
 
     def __init__(self, db_module=None, db_connection=None,
                  block_module=None, block_path=None, block_umask=None,
-                 queue_module=None, queue_connection=None):
+                 queue_module=None, queue_hosts=None,
+                 queue_exchange=None, quotaholder_url=None):
         db_module = db_module or DEFAULT_DB_MODULE
         db_connection = db_connection or DEFAULT_DB_CONNECTION
         block_module = block_module or DEFAULT_BLOCK_MODULE
         block_path = block_path or DEFAULT_BLOCK_PATH
         block_umask = block_umask or DEFAULT_BLOCK_UMASK
         #queue_module = queue_module or DEFAULT_QUEUE_MODULE
-        #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
-
+        #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
+        #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
+                
         self.hash_algorithm = 'sha256'
         self.block_size = 4 * 1024 * 1024  # 4MB
 
@@ -151,7 +166,7 @@ class ModularBackend(BaseBackend):
         params = {'wrapper': self.wrapper}
         self.permissions = self.db_module.Permissions(**params)
         self.config = self.db_module.Config(**params)
-        self.config = self.db_module.QuotaholderSync(**params)
+        self.quotaholder_serials = self.db_module.QuotaholderSerial(**params)
         for x in ['READ', 'WRITE']:
             setattr(self, x, getattr(self.db_module, x))
         self.node = self.db_module.Node(**params)
@@ -165,9 +180,10 @@ class ModularBackend(BaseBackend):
                   'umask': block_umask}
         self.store = self.block_module.Store(**params)
 
-        if queue_module and queue_connection:
+        if queue_module and queue_hosts:
             self.queue_module = load_module(queue_module)
-            params = {'exchange': queue_connection,
+            params = {'hosts': queue_hosts,
+                          'exchange': queue_exchange,
                       'client_id': QUEUE_CLIENT_ID}
             self.queue = self.queue_module.Queue(**params)
         else:
@@ -180,6 +196,10 @@ class ModularBackend(BaseBackend):
 
             self.queue = NoQueue()
 
+        self.quotaholder_url = quotaholder_url
+        self.quotaholder = QuotaholderHTTP(quotaholder_url)
+        self.serials = []
+
     def close(self):
         self.wrapper.close()
         self.queue.close()
@@ -479,26 +499,29 @@ class ModularBackend(BaseBackend):
         path, node = self._lookup_container(account, container)
 
         if until is not None:
-            hashes, size = self.node.node_purge_children(
+            hashes, size, serials = self.node.node_purge_children(
                 node, until, CLUSTER_HISTORY)
             for h in hashes:
                 self.store.map_delete(h)
             self.node.node_purge_children(node, until, CLUSTER_DELETED)
-            self._report_size_change(user, account, -size, {'action':
-                                     'container purge', 'path': path})
+            self._report_size_change(user, account, -size,
+                                     {'action':'container purge', 'path': path,
+                                      'versions': serials})
             return
 
         if not delimiter:
             if self._get_statistics(node)[0] > 0:
                 raise ContainerNotEmpty('Container is not empty')
-            hashes, size = self.node.node_purge_children(
+            hashes, size, serials = self.node.node_purge_children(
                 node, inf, CLUSTER_HISTORY)
             for h in hashes:
                 self.store.map_delete(h)
             self.node.node_purge_children(node, inf, CLUSTER_DELETED)
             self.node.node_remove(node)
-            self._report_size_change(user, account, -size, {'action':
-                                     'container delete', 'path': path})
+            self._report_size_change(user, account, -size,
+                                     {'action': 'container delete',
+                                      'path': path,
+                                      'versions': serials})
         else:
                 # remove only contents
             src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
@@ -510,7 +533,9 @@ class ModularBackend(BaseBackend):
                 del_size = self._apply_versioning(
                     account, container, src_version_id)
                 if del_size:
-                    self._report_size_change(user, account, -del_size, {'action': 'object delete', 'path': path})
+                    self._report_size_change(user, account, -del_size,
+                                             {'action': 'object delete',
+                                              'path': path, 'versions': [dest_version_id]})
                 self._report_object_change(
                     user, account, path, details={'action': 'object delete'})
                 paths.append(path)
@@ -800,8 +825,9 @@ class ModularBackend(BaseBackend):
                (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
                 # This must be executed in a transaction, so the version is never created if it fails.
                 raise QuotaError
-        self._report_size_change(user, account, size_delta, {
-                                 'action': 'object update', 'path': path})
+        self._report_size_change(user, account, size_delta,
+                                 {'action': 'object update', 'path': path,
+                                  'versions': [dest_version_id]})
 
         if permissions is not None:
             self.permissions.access_set(path, permissions)
@@ -915,12 +941,15 @@ class ModularBackend(BaseBackend):
                 return
             hashes = []
             size = 0
-            h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
+            serials = []
+            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL)
             hashes += h
             size += s
-            h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
+            serials += v
+            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY)
             hashes += h
             size += s
+            serials += v
             for h in hashes:
                 self.store.map_delete(h)
             self.node.node_purge(node, until, CLUSTER_DELETED)
@@ -928,16 +957,18 @@ class ModularBackend(BaseBackend):
                 props = self._get_version(node)
             except NameError:
                 self.permissions.access_clear(path)
-            self._report_size_change(user, account, -size, {
-                                     'action': 'object purge', 'path': path})
+            self._report_size_change(user, account, -size,
+                                    {'action': 'object purge', 'path': path,
+                                     'versions': serials})
             return
 
         path, node = self._lookup_object(account, container, name)
         src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
         del_size = self._apply_versioning(account, container, src_version_id)
         if del_size:
-            self._report_size_change(user, account, -del_size, {
-                                     'action': 'object delete', 'path': path})
+            self._report_size_change(user, account, -del_size,
+                                     {'action': 'object delete', 'path': path,
+                                      'versions': [dest_version_id]})
         self._report_object_change(
             user, account, path, details={'action': 'object delete'})
         self.permissions.access_clear(path)
@@ -953,7 +984,10 @@ class ModularBackend(BaseBackend):
                 del_size = self._apply_versioning(
                     account, container, src_version_id)
                 if del_size:
-                    self._report_size_change(user, account, -del_size, {'action': 'object delete', 'path': path})
+                    self._report_size_change(user, account, -del_size,
+                                             {'action': 'object delete',
+                                              'path': path,
+                                              'versions': [dest_version_id]})
                 self._report_object_change(
                     user, account, path, details={'action': 'object delete'})
                 paths.append(path)
@@ -1211,25 +1245,32 @@ class ModularBackend(BaseBackend):
         logger.debug(
             "_report_size_change: %s %s %s %s", user, account, size, details)
         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), 
-                                                 account,
-                                                 QUEUE_INSTANCE_ID,
-                                                 'diskspace',
-                                                 float(size),
-                                                 details))
+                              account, QUEUE_INSTANCE_ID, 'diskspace',
+                              float(size), details))
+
+        serial = self.quotaholder.issue_commission(
+                context     =   {},
+                target      =   account,
+                key         =   '1',
+                clientkey   =   'pithos',
+                ownerkey    =   '',
+                provisions  =   (('pithos+', 'diskspace', size),)
+        )
+        self.serials.append(serial)
 
     def _report_object_change(self, user, account, path, details={}):
         details.update({'user': user})
         logger.debug("_report_object_change: %s %s %s %s", user,
                      account, path, details)
-        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % (
-            'object',), account, QUEUE_INSTANCE_ID, 'object', path, details))
+        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
+                                                  account, QUEUE_INSTANCE_ID, 'object', path, details))
 
     def _report_sharing_change(self, user, account, path, details={}):
         logger.debug("_report_permissions_change: %s %s %s %s",
                      user, account, path, details)
         details.update({'user': user})
-        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account,
-                             QUEUE_INSTANCE_ID, 'sharing', path, details))
+        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
+                                                  account, QUEUE_INSTANCE_ID, 'sharing', path, details))
 
     # Policy functions.