import hashlib
import binascii
+from commissioning.clients.quotaholder import QuotaholderHTTP
+
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
def fn(self, *args, **kw):
    """Run the wrapped ``func`` inside a backend transaction.

    ``func`` comes from the enclosing scope.  On success the messages
    collected in ``self.messages`` are sent to the queue, any quotaholder
    commissions recorded in ``self.serials`` are accepted and the
    transaction is committed.  On any exception the pending commissions
    are rejected, the transaction is rolled back and the exception is
    re-raised.

    Returns whatever ``func`` returns.
    """
    self.wrapper.execute()
    # Alias (not a copy): serials appended to self.serials while func runs
    # are visible through this name.
    serials = self.serials
    self.messages = []
    try:
        ret = func(self, *args, **kw)
        for m in self.messages:
            self.queue.send(*m)
        if serials:
            self.quotaholder.accept_commission(
                context={},
                clientkey='pithos',
                serials=serials)
            # Settled: empty the list in place so the same serials are not
            # accepted or rejected again, now or by a later call.
            del serials[:]
        self.wrapper.commit()
        return ret
    except:
        # Detach the unsettled serials first so they cannot leak into a
        # later call even if the rejection itself fails.
        pending = list(serials)
        del serials[:]
        try:
            # Nothing to reject when no commission was issued.
            if pending:
                self.quotaholder.reject_commission(
                    context={},
                    clientkey='pithos',
                    serials=pending)
        finally:
            # The rollback must happen even when the quotaholder call raises.
            self.wrapper.rollback()
        raise
return fn
def __init__(self, db_module=None, db_connection=None,
             block_module=None, block_path=None, block_umask=None,
             queue_module=None, queue_hosts=None,
             queue_exchange=None, quotaholder_url=None):
    """Set up hashing parameters, the message queue and the quotaholder client.

    A real queue is created only when both ``queue_module`` and
    ``queue_hosts`` are given; otherwise a ``NoQueue`` instance is used.
    ``quotaholder_url`` is stored and handed to ``QuotaholderHTTP``.
    """
    # Unspecified database/block settings fall back to the module defaults.
    db_module = db_module or DEFAULT_DB_MODULE
    db_connection = db_connection or DEFAULT_DB_CONNECTION
    block_module = block_module or DEFAULT_BLOCK_MODULE
    #queue_module = queue_module or DEFAULT_QUEUE_MODULE
    #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
    #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE

    self.hash_algorithm = 'sha256'
    self.block_size = 4 * 1024 * 1024  # 4MB

    if not (queue_module and queue_hosts):
        self.queue = NoQueue()
    else:
        self.queue_module = load_module(queue_module)
        self.queue = self.queue_module.Queue(
            hosts=queue_hosts,
            exchange=queue_exchange,
            client_id=QUEUE_CLIENT_ID)

    self.quotaholder_url = quotaholder_url
    self.quotaholder = QuotaholderHTTP(quotaholder_url)
    # Starts empty; commission serials are appended to this list elsewhere.
    self.serials = []
+
def close(self):
    """Close the backend wrapper first, then the message queue."""
    wrapper = self.wrapper
    wrapper.close()
    queue = self.queue
    queue.close()
self.store.map_delete(h)
self.node.node_purge_children(node, until, CLUSTER_DELETED)
self._report_size_change(user, account, -size,
- {'action':'container purge', 'path': path,
- 'versions': ','.join(str(i) for i in serials)})
+ {'action':'container purge', 'path': path,
+ 'versions': serials})
return
if not delimiter:
self.node.node_purge_children(node, inf, CLUSTER_DELETED)
self.node.node_remove(node)
self._report_size_change(user, account, -size,
- {'action': 'container delete',
- 'path': path,
- 'versions': ','.join(str(i) for i in serials)})
+ {'action': 'container delete',
+ 'path': path,
+ 'versions': serials})
else:
# remove only contents
src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
account, container, src_version_id)
if del_size:
self._report_size_change(user, account, -del_size,
- {'action': 'object delete',
- 'path': path,
- 'versions': ','.join([str(dest_version_id)])})
+ {'action': 'object delete',
+ 'path': path, 'versions': [dest_version_id]})
self._report_object_change(
user, account, path, details={'action': 'object delete'})
paths.append(path)
# This must be executed in a transaction, so the version is never created if it fails.
raise QuotaError
self._report_size_change(user, account, size_delta,
- {'action': 'object update', 'path': path,
- 'versions': ','.join([str(dest_version_id)])})
+ {'action': 'object update', 'path': path,
+ 'versions': [dest_version_id]})
if permissions is not None:
self.permissions.access_set(path, permissions)
except NameError:
self.permissions.access_clear(path)
self._report_size_change(user, account, -size,
- {'action': 'object purge', 'path': path,
- 'versions': ','.join(str(i) for i in serials)})
+ {'action': 'object purge', 'path': path,
+ 'versions': serials})
return
path, node = self._lookup_object(account, container, name)
del_size = self._apply_versioning(account, container, src_version_id)
if del_size:
self._report_size_change(user, account, -del_size,
- {'action': 'object delete', 'path': path,
- 'versions': ','.join([str(dest_version_id)])})
+ {'action': 'object delete', 'path': path,
+ 'versions': [dest_version_id]})
self._report_object_change(
user, account, path, details={'action': 'object delete'})
self.permissions.access_clear(path)
account, container, src_version_id)
if del_size:
self._report_size_change(user, account, -del_size,
- {'action': 'object delete',
- 'path': path,
- 'versions': ','.join([str(dest_version_id)])})
+ {'action': 'object delete',
+ 'path': path,
+ 'versions': [dest_version_id]})
self._report_object_change(
user, account, path, details={'action': 'object delete'})
paths.append(path)
logger.debug(
"_report_size_change: %s %s %s %s", user, account, size, details)
self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
- account, QUEUE_INSTANCE_ID, 'diskspace',
- float(size), details))
+ account, QUEUE_INSTANCE_ID, 'diskspace',
+ float(size), details))
+
+ serial = self.quotaholder.issue_commission(
+ context = {},
+ target = account,
+ key = '1',
+ clientkey = 'pithos',
+ ownerkey = '',
+ provisions = (('pithos+', 'diskspace', size),)
+ )
+ self.serials.append(serial)
def _report_object_change(self, user, account, path, details=None):
    """Record an 'object' change message for ``path``.

    The message tuple is appended to ``self.messages``.

    ``details`` is an optional dict of extra information.  It is copied
    before the ``'user'`` entry is added, so neither the caller's dict nor
    a shared default is mutated, and every message gets its own dict.
    """
    details = dict(details) if details else {}
    details['user'] = user
    logger.debug("_report_object_change: %s %s %s %s", user,
                 account, path, details)
    self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
                          account, QUEUE_INSTANCE_ID, 'object', path, details))
def _report_sharing_change(self, user, account, path, details=None):
    """Record a 'sharing' change message for ``path``.

    The message tuple is appended to ``self.messages``.

    ``details`` is an optional dict of extra information.  It is copied
    before the ``'user'`` entry is added, so neither the caller's dict nor
    a shared default is mutated, and every message gets its own dict.
    """
    details = dict(details) if details else {}
    # Logged before 'user' is merged in, as the original did; the message
    # now names this function instead of "_report_permissions_change".
    logger.debug("_report_sharing_change: %s %s %s %s",
                 user, account, path, details)
    details['user'] = user
    self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
                          account, QUEUE_INSTANCE_ID, 'sharing', path, details))
# Policy functions.