Revision fa9cae7e pithos/backends/modular.py
b/pithos/backends/modular.py | ||
---|---|---|
47 | 47 |
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db' |
48 | 48 |
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler' |
49 | 49 |
DEFAULT_BLOCK_PATH = 'data/' |
50 |
DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq' |
|
51 |
DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos' |
|
50 |
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq' |
|
51 |
#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos' |
|
52 |
|
|
53 |
QUEUE_MESSAGE_KEY = '#' |
|
54 |
QUEUE_CLIENT_ID = 2 # Pithos. |
|
52 | 55 |
|
53 | 56 |
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3) |
54 | 57 |
|
... | ... | |
123 | 126 |
|
124 | 127 |
if queue_module and queue_connection: |
125 | 128 |
self.queue_module = load_module(queue_module) |
126 |
params = {'exchange': queue_connection} |
|
129 |
params = {'exchange': queue_connection, |
|
130 |
'message_key': QUEUE_MESSAGE_KEY, |
|
131 |
'client_id': QUEUE_CLIENT_ID} |
|
127 | 132 |
self.queue = self.queue_module.Queue(**params) |
128 | 133 |
else: |
129 | 134 |
class NoQueue: |
... | ... | |
384 | 389 |
for h in hashes: |
385 | 390 |
self.store.map_delete(h) |
386 | 391 |
self.node.node_purge_children(node, until, CLUSTER_DELETED) |
387 |
self.queue.send('#', {'op': 'del objects'})
|
|
392 |
self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
|
|
388 | 393 |
return |
389 | 394 |
|
390 | 395 |
if self._get_statistics(node)[0] > 0: |
... | ... | |
394 | 399 |
self.store.map_delete(h) |
395 | 400 |
self.node.node_purge_children(node, inf, CLUSTER_DELETED) |
396 | 401 |
self.node.node_remove(node) |
397 |
self.queue.send('#', {'op': 'del objects'})
|
|
402 |
self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
|
|
398 | 403 |
|
399 | 404 |
@backend_method |
400 | 405 |
def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None): |
... | ... | |
584 | 589 |
pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), permissions) |
585 | 590 |
self._put_metadata_duplicate(pre_version_id, dest_version_id, domain, meta, replace_meta) |
586 | 591 |
self.store.map_put(hash, map) |
587 |
self.queue.send('#', {'op': 'add object', 'id': dest_version_id})
|
|
592 |
self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
|
|
588 | 593 |
return dest_version_id |
589 | 594 |
|
590 | 595 |
def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False): |
... | ... | |
607 | 612 |
|
608 | 613 |
logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version) |
609 | 614 |
dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version, False) |
610 |
self.queue.send('#', {'op': 'add object', 'id': dest_version_id})
|
|
615 |
self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
|
|
611 | 616 |
return dest_version_id |
612 | 617 |
|
613 | 618 |
@backend_method |
... | ... | |
620 | 625 |
dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, None, True) |
621 | 626 |
if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name): |
622 | 627 |
self._delete_object(user, src_account, src_container, src_name) |
623 |
self.queue.send('#', {'op': 'add object', 'id': dest_version_id})
|
|
628 |
self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
|
|
624 | 629 |
return dest_version_id |
625 | 630 |
|
626 | 631 |
def _delete_object(self, user, account, container, name, until=None): |
... | ... | |
641 | 646 |
props = self._get_version(node) |
642 | 647 |
except NameError: |
643 | 648 |
self.permissions.access_clear(path) |
644 |
self.queue.send('#', {'op': 'del objects'})
|
|
649 |
self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
|
|
645 | 650 |
return |
646 | 651 |
|
647 | 652 |
path, node = self._lookup_object(account, container, name) |
... | ... | |
905 | 910 |
if versioning != 'auto': |
906 | 911 |
hash = self.node.version_remove(version_id) |
907 | 912 |
self.store.map_delete(hash) |
913 |
self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0}) |
|
908 | 914 |
|
909 | 915 |
# Access control functions. |
910 | 916 |
|
Also available in: Unified diff