# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
-from pithos.lib.queue import exchange_connect, exchange_send
+from pithos.lib.queue import exchange_connect, exchange_send, Receipt
class Queue(object):
"""Queue.
- Required contstructor parameters: exchange.
+ Required constructor parameters: exchange, message_key, client_id.
"""
def __init__(self, **params):
exchange = params['exchange']
self.conn = exchange_connect(exchange)
+ self.message_key = params['message_key']
+ self.client_id = params['client_id']
- def send(self, key, value):
- exchange_send(self.conn, key, value)
-
+ def send(self, user, resource, value, details):
+ body = Receipt(self.client_id, user, resource, value, details).format()
+ exchange_send(self.conn, self.message_key, body)
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
DEFAULT_BLOCK_PATH = 'data/'
-DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
-DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
+#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
+#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
+
+QUEUE_MESSAGE_KEY = '#'
+QUEUE_CLIENT_ID = 2 # Pithos.
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
if queue_module and queue_connection:
self.queue_module = load_module(queue_module)
- params = {'exchange': queue_connection}
+ params = {'exchange': queue_connection,
+ 'message_key': QUEUE_MESSAGE_KEY,
+ 'client_id': QUEUE_CLIENT_ID}
self.queue = self.queue_module.Queue(**params)
else:
class NoQueue:
for h in hashes:
self.store.map_delete(h)
self.node.node_purge_children(node, until, CLUSTER_DELETED)
- self.queue.send('#', {'op': 'del objects'})
+ self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
return
if self._get_statistics(node)[0] > 0:
self.store.map_delete(h)
self.node.node_purge_children(node, inf, CLUSTER_DELETED)
self.node.node_remove(node)
- self.queue.send('#', {'op': 'del objects'})
+ self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
@backend_method
def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), permissions)
self._put_metadata_duplicate(pre_version_id, dest_version_id, domain, meta, replace_meta)
self.store.map_put(hash, map)
- self.queue.send('#', {'op': 'add object', 'id': dest_version_id})
+ self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
return dest_version_id
def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version)
dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version, False)
- self.queue.send('#', {'op': 'add object', 'id': dest_version_id})
+ self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
return dest_version_id
@backend_method
dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, None, True)
if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
self._delete_object(user, src_account, src_container, src_name)
- self.queue.send('#', {'op': 'add object', 'id': dest_version_id})
+ self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
return dest_version_id
def _delete_object(self, user, account, container, name, until=None):
props = self._get_version(node)
except NameError:
self.permissions.access_clear(path)
- self.queue.send('#', {'op': 'del objects'})
+ self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
return
path, node = self._lookup_object(account, container, name)
if versioning != 'auto':
hash = self.node.version_remove(version_id)
self.store.map_delete(hash)
+ self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
# Access control functions.