Revision 46286f5f pithos/backends/modular.py

b/pithos/backends/modular.py
47 47
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
48 48
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
49 49
DEFAULT_BLOCK_PATH = 'data/'
50
DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
51
DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
50 52

  
51 53
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
52 54

  
......
84 86
    Uses modules for SQL functions and storage.
85 87
    """
86 88
    
87
    def __init__(self, db_module=None, db_connection=None, block_module=None, block_path=None):
89
    def __init__(self, db_module=None, db_connection=None,
90
                 block_module=None, block_path=None,
91
                 queue_module=None, queue_connection=None):
88 92
        db_module = db_module or DEFAULT_DB_MODULE
89 93
        db_connection = db_connection or DEFAULT_DB_CONNECTION
90 94
        block_module = block_module or DEFAULT_BLOCK_MODULE
91 95
        block_path = block_path or DEFAULT_BLOCK_PATH
96
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
97
        #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
92 98
        
93 99
        self.hash_algorithm = 'sha256'
94 100
        self.block_size = 4 * 1024 * 1024 # 4MB
95 101
        
96 102
        self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
97 103
        
98
        __import__(db_module)
99
        self.db_module = sys.modules[db_module]
100
        self.wrapper = self.db_module.DBWrapper(db_connection)
104
        def load_module(m):
105
            __import__(m)
106
            return sys.modules[m]
101 107
        
108
        self.db_module = load_module(db_module)
109
        self.wrapper = self.db_module.DBWrapper(db_connection)
102 110
        params = {'wrapper': self.wrapper}
103 111
        self.permissions = self.db_module.Permissions(**params)
104 112
        for x in ['READ', 'WRITE']:
......
107 115
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'UUID', 'CLUSTER']:
108 116
            setattr(self, x, getattr(self.db_module, x))
109 117
        
110
        __import__(block_module)
111
        self.block_module = sys.modules[block_module]
112
        
118
        self.block_module = load_module(block_module)
113 119
        params = {'path': block_path,
114 120
                  'block_size': self.block_size,
115 121
                  'hash_algorithm': self.hash_algorithm}
116 122
        self.store = self.block_module.Store(**params)
123

  
124
        if queue_module and queue_connection:
125
            self.queue_module = load_module(queue_module)
126
            params = {'exchange': queue_connection}
127
            self.queue = self.queue_module.Queue(**params)
128
        else:
129
            class NoQueue:
130
                def send(self, key, value):
131
                    pass
132
            
133
            self.queue = NoQueue()
117 134
    
118 135
    def close(self):
119 136
        self.wrapper.close()
......
367 384
            for h in hashes:
368 385
                self.store.map_delete(h)
369 386
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
387
            self.queue.send('#', {'op': 'del objects'})
370 388
            return
371 389
        
372 390
        if self._get_statistics(node)[0] > 0:
......
376 394
            self.store.map_delete(h)
377 395
        self.node.node_purge_children(node, inf, CLUSTER_DELETED)
378 396
        self.node.node_remove(node)
397
        self.queue.send('#', {'op': 'del objects'})
379 398
    
380 399
    @backend_method
381 400
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
......
565 584
        pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), permissions)
566 585
        self._put_metadata_duplicate(pre_version_id, dest_version_id, domain, meta, replace_meta)
567 586
        self.store.map_put(hash, map)
587
        self.queue.send('#', {'op': 'add object', 'id': dest_version_id})
568 588
        return dest_version_id
569 589
    
570 590
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
......
586 606
        """Copy an object's data and metadata."""
587 607
        
588 608
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version)
589
        return self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version, False)
609
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version, False)
610
        self.queue.send('#', {'op': 'add object', 'id': dest_version_id})
611
        return dest_version_id
590 612
    
591 613
    @backend_method
592 614
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None):
......
598 620
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, None, True)
599 621
        if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
600 622
            self._delete_object(user, src_account, src_container, src_name)
623
        self.queue.send('#', {'op': 'add object', 'id': dest_version_id})
601 624
        return dest_version_id
602 625
    
603 626
    def _delete_object(self, user, account, container, name, until=None):
......
618 641
                props = self._get_version(node)
619 642
            except NameError:
620 643
                self.permissions.access_clear(path)
644
            self.queue.send('#', {'op': 'del objects'})
621 645
            return
622 646
        
623 647
        path, node = self._lookup_object(account, container, name)

Also available in: Unified diff