import hashlib
import binascii
+from commissioning.clients.quotaholder import QuotaholderHTTP
+
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
# Fallbacks used by __init__ when block_path / block_umask are not supplied;
# the umask is handed to the block module's Store as its 'umask' parameter.
DEFAULT_BLOCK_PATH = 'data/'
DEFAULT_BLOCK_UMASK = 0o022
# Queue defaults are commented out here, and the matching fallback lines in
# __init__ are commented out too, so queue settings must come from the caller.
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
-#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
+#DEFAULT_QUEUE_HOSTS = '[amqp://guest:guest@localhost:5672]'
+#DEFAULT_QUEUE_EXCHANGE = 'pithos'
# Template for queue message keys; it is filled with the message kind
# ('resource.diskspace', 'object', 'sharing') when a message is queued.
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
# Passed as 'client_id' to the queue module's Queue constructor.
QUEUE_CLIENT_ID = 'pithos'
def fn(self, *args, **kw):
    """Run ``func`` in a transaction and settle its queued side effects.

    ``func`` is the wrapped callable supplied by the enclosing scope.
    While it runs it may append queue messages to ``self.messages`` and
    quotaholder commission serials to ``self.serials``.

    On success the messages are sent, any issued commissions are accepted
    and the transaction is committed; the return value of ``func`` is
    passed through.  On any exception the issued commissions (if any) are
    rejected, the transaction is rolled back and the exception re-raised.
    """
    self.wrapper.execute()
    # Start every call with fresh lists: reusing the previous list would
    # make serials issued by earlier calls be accepted/rejected again.
    serials = []
    self.serials = serials
    self.messages = []
    try:
        ret = func(self, *args, **kw)
        for m in self.messages:
            self.queue.send(*m)
        if serials:
            self.quotaholder.accept_commission(
                context={},
                clientkey='pithos',
                serials=serials)
        self.wrapper.commit()
        return ret
    except:
        # Bare except is deliberate: roll back on *any* failure, then
        # re-raise.  The finally guarantees the rollback even when the
        # quotaholder call itself fails.
        try:
            if serials:
                self.quotaholder.reject_commission(
                    context={},
                    clientkey='pithos',
                    serials=serials)
        finally:
            self.wrapper.rollback()
        raise
return fn
def __init__(self, db_module=None, db_connection=None,
block_module=None, block_path=None, block_umask=None,
- queue_module=None, queue_connection=None):
+ queue_module=None, queue_hosts=None,
+ queue_exchange=None, quotaholder_url=None):
# Set up the backend: resolve unset arguments to the module defaults, build
# the db-module helpers and the block store, pick a message queue and
# create the quotaholder client.
# Any falsy argument falls back to its DEFAULT_* constant.
# NOTE(review): because of `or`, an explicit block_umask of 0 is also
# replaced by DEFAULT_BLOCK_UMASK - confirm this is intended.
db_module = db_module or DEFAULT_DB_MODULE
db_connection = db_connection or DEFAULT_DB_CONNECTION
block_module = block_module or DEFAULT_BLOCK_MODULE
block_path = block_path or DEFAULT_BLOCK_PATH
block_umask = block_umask or DEFAULT_BLOCK_UMASK
# The queue fallbacks are commented out, so queue settings stay None unless
# the caller provides them.
#queue_module = queue_module or DEFAULT_QUEUE_MODULE
- #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
-
+ #queue_hosts = queue_hosts or DEFAULT_QUEUE_HOSTS
+ #queue_exchange = queue_exchange or DEFAULT_QUEUE_EXCHANGE
+
self.hash_algorithm = 'sha256'
self.block_size = 4 * 1024 * 1024 # 4MB
# NOTE(review): self.wrapper and self.db_module are assigned in lines that
# are not visible in this excerpt.
params = {'wrapper': self.wrapper}
self.permissions = self.db_module.Permissions(**params)
self.config = self.db_module.Config(**params)
- self.config = self.db_module.QuotaholderSync(**params)
+ self.quotaholder_serials = self.db_module.QuotaholderSerial(**params)
# Expose the db module's READ and WRITE constants as instance attributes.
for x in ['READ', 'WRITE']:
setattr(self, x, getattr(self.db_module, x))
self.node = self.db_module.Node(**params)
# (tail of the block-store params dict; its opening lines are not visible here)
'umask': block_umask}
self.store = self.block_module.Store(**params)
# A queue from queue_module is created only when both the module and the
# hosts are configured; otherwise a NoQueue instance is used.
- if queue_module and queue_connection:
+ if queue_module and queue_hosts:
self.queue_module = load_module(queue_module)
- params = {'exchange': queue_connection,
+ params = {'hosts': queue_hosts,
+ 'exchange': queue_exchange,
'client_id': QUEUE_CLIENT_ID}
self.queue = self.queue_module.Queue(**params)
else:
self.queue = NoQueue()
# Quotaholder client, plus the list that collects the commission serials
# issued while a backend method runs.
# NOTE(review): QuotaholderHTTP is constructed even when quotaholder_url is
# None - confirm the client tolerates that.
+ self.quotaholder_url = quotaholder_url
+ self.quotaholder = QuotaholderHTTP(quotaholder_url)
+ self.serials = []
+
def close(self):
    """Close the database wrapper and the message queue.

    The queue is closed even if closing the wrapper raises, so a failing
    database close cannot leak the queue connection; the wrapper's
    exception still propagates to the caller.
    """
    try:
        self.wrapper.close()
    finally:
        self.queue.close()
path, node = self._lookup_container(account, container)
if until is not None:
- hashes, size = self.node.node_purge_children(
+ hashes, size, serials = self.node.node_purge_children(
node, until, CLUSTER_HISTORY)
for h in hashes:
self.store.map_delete(h)
self.node.node_purge_children(node, until, CLUSTER_DELETED)
- self._report_size_change(user, account, -size, {'action':
- 'container purge', 'path': path})
+ self._report_size_change(user, account, -size,
+ {'action':'container purge', 'path': path,
+ 'versions': serials})
return
if not delimiter:
if self._get_statistics(node)[0] > 0:
raise ContainerNotEmpty('Container is not empty')
- hashes, size = self.node.node_purge_children(
+ hashes, size, serials = self.node.node_purge_children(
node, inf, CLUSTER_HISTORY)
for h in hashes:
self.store.map_delete(h)
self.node.node_purge_children(node, inf, CLUSTER_DELETED)
self.node.node_remove(node)
- self._report_size_change(user, account, -size, {'action':
- 'container delete', 'path': path})
+ self._report_size_change(user, account, -size,
+ {'action': 'container delete',
+ 'path': path,
+ 'versions': serials})
else:
# remove only contents
src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
del_size = self._apply_versioning(
account, container, src_version_id)
if del_size:
- self._report_size_change(user, account, -del_size, {'action': 'object delete', 'path': path})
+ self._report_size_change(user, account, -del_size,
+ {'action': 'object delete',
+ 'path': path, 'versions': [dest_version_id]})
self._report_object_change(
user, account, path, details={'action': 'object delete'})
paths.append(path)
(container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
# This must be executed in a transaction, so the version is never created if it fails.
raise QuotaError
- self._report_size_change(user, account, size_delta, {
- 'action': 'object update', 'path': path})
+ self._report_size_change(user, account, size_delta,
+ {'action': 'object update', 'path': path,
+ 'versions': [dest_version_id]})
if permissions is not None:
self.permissions.access_set(path, permissions)
return
hashes = []
size = 0
- h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
+ serials = []
+ h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL)
hashes += h
size += s
- h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
+ serials += v
+ h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY)
hashes += h
size += s
+ serials += v
for h in hashes:
self.store.map_delete(h)
self.node.node_purge(node, until, CLUSTER_DELETED)
props = self._get_version(node)
except NameError:
self.permissions.access_clear(path)
- self._report_size_change(user, account, -size, {
- 'action': 'object purge', 'path': path})
+ self._report_size_change(user, account, -size,
+ {'action': 'object purge', 'path': path,
+ 'versions': serials})
return
path, node = self._lookup_object(account, container, name)
src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
del_size = self._apply_versioning(account, container, src_version_id)
if del_size:
- self._report_size_change(user, account, -del_size, {
- 'action': 'object delete', 'path': path})
+ self._report_size_change(user, account, -del_size,
+ {'action': 'object delete', 'path': path,
+ 'versions': [dest_version_id]})
self._report_object_change(
user, account, path, details={'action': 'object delete'})
self.permissions.access_clear(path)
del_size = self._apply_versioning(
account, container, src_version_id)
if del_size:
- self._report_size_change(user, account, -del_size, {'action': 'object delete', 'path': path})
+ self._report_size_change(user, account, -del_size,
+ {'action': 'object delete',
+ 'path': path,
+ 'versions': [dest_version_id]})
self._report_object_change(
user, account, path, details={'action': 'object delete'})
paths.append(path)
logger.debug(
"_report_size_change: %s %s %s %s", user, account, size, details)
self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',),
- account,
- QUEUE_INSTANCE_ID,
- 'diskspace',
- float(size),
- details))
+ account, QUEUE_INSTANCE_ID, 'diskspace',
+ float(size), details))
+
+ serial = self.quotaholder.issue_commission(
+ context = {},
+ target = account,
+ key = '1',
+ clientkey = 'pithos',
+ ownerkey = '',
+ provisions = (('pithos+', 'diskspace', size),)
+ )
+ self.serials.append(serial)
def _report_object_change(self, user, account, path, details=None):
    """Queue an 'object' change message for ``path``.

    :param user: user performing the change; stored under ``'user'`` in
        the message details.
    :param account: account the message is reported for.
    :param path: path of the changed object.
    :param details: optional mapping of extra message details.  It is
        copied, never modified, so neither the caller's dict nor a shared
        default is mutated and each queued message owns its details.

    The message is only appended to ``self.messages``; nothing is sent
    from here.
    """
    # Build a fresh dict instead of update()-ing the argument in place.
    details = dict(details or {}, user=user)
    logger.debug("_report_object_change: %s %s %s %s", user,
                 account, path, details)
    self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',),
                          account, QUEUE_INSTANCE_ID, 'object', path,
                          details))
def _report_sharing_change(self, user, account, path, details=None):
    """Queue a 'sharing' change message for ``path``.

    :param user: user performing the change; stored under ``'user'`` in
        the message details.
    :param account: account the message is reported for.
    :param path: path whose sharing changed.
    :param details: optional mapping of extra message details.  It is
        copied, never modified, so neither the caller's dict nor a shared
        default is mutated and each queued message owns its details.

    The message is only appended to ``self.messages``; nothing is sent
    from here.
    """
    supplied = details or {}
    # The details are logged as supplied, before the user is merged in.
    # NOTE(review): the log prefix says "_report_permissions_change" although
    # the method is named _report_sharing_change; kept as is for log greps.
    logger.debug("_report_permissions_change: %s %s %s %s",
                 user, account, path, supplied)
    # Build a fresh dict instead of update()-ing the argument in place.
    details = dict(supplied, user=user)
    self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',),
                          account, QUEUE_INSTANCE_ID, 'sharing', path,
                          details))
# Policy functions.