Revision fa9cae7e pithos/backends/modular.py

b/pithos/backends/modular.py
47 47
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
48 48
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
49 49
DEFAULT_BLOCK_PATH = 'data/'
50
DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
51
DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
50
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
51
#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
52

  
53
QUEUE_MESSAGE_KEY = '#'
54
QUEUE_CLIENT_ID = 2 # Pithos.
52 55

  
53 56
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
54 57

  
......
123 126

  
124 127
        if queue_module and queue_connection:
125 128
            self.queue_module = load_module(queue_module)
126
            params = {'exchange': queue_connection}
129
            params = {'exchange': queue_connection,
130
                      'message_key': QUEUE_MESSAGE_KEY,
131
                      'client_id': QUEUE_CLIENT_ID}
127 132
            self.queue = self.queue_module.Queue(**params)
128 133
        else:
129 134
            class NoQueue:
......
384 389
            for h in hashes:
385 390
                self.store.map_delete(h)
386 391
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
387
            self.queue.send('#', {'op': 'del objects'})
392
            self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
388 393
            return
389 394
        
390 395
        if self._get_statistics(node)[0] > 0:
......
394 399
            self.store.map_delete(h)
395 400
        self.node.node_purge_children(node, inf, CLUSTER_DELETED)
396 401
        self.node.node_remove(node)
397
        self.queue.send('#', {'op': 'del objects'})
402
        self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
398 403
    
399 404
    @backend_method
400 405
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
......
584 589
        pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), permissions)
585 590
        self._put_metadata_duplicate(pre_version_id, dest_version_id, domain, meta, replace_meta)
586 591
        self.store.map_put(hash, map)
587
        self.queue.send('#', {'op': 'add object', 'id': dest_version_id})
592
        self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
588 593
        return dest_version_id
589 594
    
590 595
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
......
607 612
        
608 613
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version)
609 614
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version, False)
610
        self.queue.send('#', {'op': 'add object', 'id': dest_version_id})
615
        self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
611 616
        return dest_version_id
612 617
    
613 618
    @backend_method
......
620 625
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, None, True)
621 626
        if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
622 627
            self._delete_object(user, src_account, src_container, src_name)
623
        self.queue.send('#', {'op': 'add object', 'id': dest_version_id})
628
        self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
624 629
        return dest_version_id
625 630
    
626 631
    def _delete_object(self, user, account, container, name, until=None):
......
641 646
                props = self._get_version(node)
642 647
            except NameError:
643 648
                self.permissions.access_clear(path)
644
            self.queue.send('#', {'op': 'del objects'})
649
            self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
645 650
            return
646 651
        
647 652
        path, node = self._lookup_object(account, container, name)
......
905 910
        if versioning != 'auto':
906 911
            hash = self.node.version_remove(version_id)
907 912
            self.store.map_delete(hash)
913
            self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
908 914
    
909 915
    # Access control functions.
910 916
    

Also available in: Unified diff