Revision 46286f5f
b/pithos/backends/lib/rabbitmq/__init__.py | ||
---|---|---|
1 |
# Copyright 2012 GRNET S.A. All rights reserved. |
|
2 |
# |
|
3 |
# Redistribution and use in source and binary forms, with or |
|
4 |
# without modification, are permitted provided that the following |
|
5 |
# conditions are met: |
|
6 |
# |
|
7 |
# 1. Redistributions of source code must retain the above |
|
8 |
# copyright notice, this list of conditions and the following |
|
9 |
# disclaimer. |
|
10 |
# |
|
11 |
# 2. Redistributions in binary form must reproduce the above |
|
12 |
# copyright notice, this list of conditions and the following |
|
13 |
# disclaimer in the documentation and/or other materials |
|
14 |
# provided with the distribution. |
|
15 |
# |
|
16 |
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
|
17 |
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|
18 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
|
19 |
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
|
20 |
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
|
21 |
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
|
22 |
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
|
23 |
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
|
24 |
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
|
25 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
|
26 |
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
|
27 |
# POSSIBILITY OF SUCH DAMAGE. |
|
28 |
# |
|
29 |
# The views and conclusions contained in the software and |
|
30 |
# documentation are those of the authors and should not be |
|
31 |
# interpreted as representing official policies, either expressed |
|
32 |
# or implied, of GRNET S.A. |
|
33 |
|
|
34 |
from queue import Queue |
|
35 |
|
|
36 |
__all__ = ["Queue"] |
|
37 |
|
b/pithos/backends/lib/rabbitmq/queue.py | ||
---|---|---|
1 |
# Copyright 2012 GRNET S.A. All rights reserved. |
|
2 |
# |
|
3 |
# Redistribution and use in source and binary forms, with or |
|
4 |
# without modification, are permitted provided that the following |
|
5 |
# conditions are met: |
|
6 |
# |
|
7 |
# 1. Redistributions of source code must retain the above |
|
8 |
# copyright notice, this list of conditions and the following |
|
9 |
# disclaimer. |
|
10 |
# |
|
11 |
# 2. Redistributions in binary form must reproduce the above |
|
12 |
# copyright notice, this list of conditions and the following |
|
13 |
# disclaimer in the documentation and/or other materials |
|
14 |
# provided with the distribution. |
|
15 |
# |
|
16 |
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
|
17 |
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|
18 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
|
19 |
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
|
20 |
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
|
21 |
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
|
22 |
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
|
23 |
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
|
24 |
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
|
25 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
|
26 |
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
|
27 |
# POSSIBILITY OF SUCH DAMAGE. |
|
28 |
# |
|
29 |
# The views and conclusions contained in the software and |
|
30 |
# documentation are those of the authors and should not be |
|
31 |
# interpreted as representing official policies, either expressed |
|
32 |
# or implied, of GRNET S.A. |
|
33 |
|
|
34 |
from pithos.lib.queue import exchange_connect, exchange_send |
|
35 |
|
|
36 |
|
|
37 |
class Queue(object): |
|
38 |
"""Queue. |
|
39 |
Required contstructor parameters: exchange. |
|
40 |
""" |
|
41 |
|
|
42 |
def __init__(self, **params): |
|
43 |
exchange = params['exchange'] |
|
44 |
self.conn = exchange_connect(exchange) |
|
45 |
|
|
46 |
def send(self, key, value): |
|
47 |
exchange_send(self.conn, key, value) |
|
48 |
|
b/pithos/backends/modular.py | ||
---|---|---|
47 | 47 |
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db' |
48 | 48 |
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler' |
49 | 49 |
DEFAULT_BLOCK_PATH = 'data/' |
50 |
DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq' |
|
51 |
DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos' |
|
50 | 52 |
|
51 | 53 |
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3) |
52 | 54 |
|
... | ... | |
84 | 86 |
Uses modules for SQL functions and storage. |
85 | 87 |
""" |
86 | 88 |
|
87 |
def __init__(self, db_module=None, db_connection=None, block_module=None, block_path=None): |
|
89 |
def __init__(self, db_module=None, db_connection=None, |
|
90 |
block_module=None, block_path=None, |
|
91 |
queue_module=None, queue_connection=None): |
|
88 | 92 |
db_module = db_module or DEFAULT_DB_MODULE |
89 | 93 |
db_connection = db_connection or DEFAULT_DB_CONNECTION |
90 | 94 |
block_module = block_module or DEFAULT_BLOCK_MODULE |
91 | 95 |
block_path = block_path or DEFAULT_BLOCK_PATH |
96 |
#queue_module = queue_module or DEFAULT_QUEUE_MODULE |
|
97 |
#queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION |
|
92 | 98 |
|
93 | 99 |
self.hash_algorithm = 'sha256' |
94 | 100 |
self.block_size = 4 * 1024 * 1024 # 4MB |
95 | 101 |
|
96 | 102 |
self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING} |
97 | 103 |
|
98 |
__import__(db_module)
|
|
99 |
self.db_module = sys.modules[db_module]
|
|
100 |
self.wrapper = self.db_module.DBWrapper(db_connection)
|
|
104 |
def load_module(m):
|
|
105 |
__import__(m)
|
|
106 |
return sys.modules[m]
|
|
101 | 107 |
|
108 |
self.db_module = load_module(db_module) |
|
109 |
self.wrapper = self.db_module.DBWrapper(db_connection) |
|
102 | 110 |
params = {'wrapper': self.wrapper} |
103 | 111 |
self.permissions = self.db_module.Permissions(**params) |
104 | 112 |
for x in ['READ', 'WRITE']: |
... | ... | |
107 | 115 |
for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'UUID', 'CLUSTER']: |
108 | 116 |
setattr(self, x, getattr(self.db_module, x)) |
109 | 117 |
|
110 |
__import__(block_module) |
|
111 |
self.block_module = sys.modules[block_module] |
|
112 |
|
|
118 |
self.block_module = load_module(block_module) |
|
113 | 119 |
params = {'path': block_path, |
114 | 120 |
'block_size': self.block_size, |
115 | 121 |
'hash_algorithm': self.hash_algorithm} |
116 | 122 |
self.store = self.block_module.Store(**params) |
123 |
|
|
124 |
if queue_module and queue_connection: |
|
125 |
self.queue_module = load_module(queue_module) |
|
126 |
params = {'exchange': queue_connection} |
|
127 |
self.queue = self.queue_module.Queue(**params) |
|
128 |
else: |
|
129 |
class NoQueue: |
|
130 |
def send(self, key, value): |
|
131 |
pass |
|
132 |
|
|
133 |
self.queue = NoQueue() |
|
117 | 134 |
|
118 | 135 |
def close(self): |
119 | 136 |
self.wrapper.close() |
... | ... | |
367 | 384 |
for h in hashes: |
368 | 385 |
self.store.map_delete(h) |
369 | 386 |
self.node.node_purge_children(node, until, CLUSTER_DELETED) |
387 |
self.queue.send('#', {'op': 'del objects'}) |
|
370 | 388 |
return |
371 | 389 |
|
372 | 390 |
if self._get_statistics(node)[0] > 0: |
... | ... | |
376 | 394 |
self.store.map_delete(h) |
377 | 395 |
self.node.node_purge_children(node, inf, CLUSTER_DELETED) |
378 | 396 |
self.node.node_remove(node) |
397 |
self.queue.send('#', {'op': 'del objects'}) |
|
379 | 398 |
|
380 | 399 |
@backend_method |
381 | 400 |
def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None): |
... | ... | |
565 | 584 |
pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), permissions) |
566 | 585 |
self._put_metadata_duplicate(pre_version_id, dest_version_id, domain, meta, replace_meta) |
567 | 586 |
self.store.map_put(hash, map) |
587 |
self.queue.send('#', {'op': 'add object', 'id': dest_version_id}) |
|
568 | 588 |
return dest_version_id |
569 | 589 |
|
570 | 590 |
def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False): |
... | ... | |
586 | 606 |
"""Copy an object's data and metadata.""" |
587 | 607 |
|
588 | 608 |
logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version) |
589 |
return self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version, False) |
|
609 |
dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version, False) |
|
610 |
self.queue.send('#', {'op': 'add object', 'id': dest_version_id}) |
|
611 |
return dest_version_id |
|
590 | 612 |
|
591 | 613 |
@backend_method |
592 | 614 |
def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None): |
... | ... | |
598 | 620 |
dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, None, True) |
599 | 621 |
if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name): |
600 | 622 |
self._delete_object(user, src_account, src_container, src_name) |
623 |
self.queue.send('#', {'op': 'add object', 'id': dest_version_id}) |
|
601 | 624 |
return dest_version_id |
602 | 625 |
|
603 | 626 |
def _delete_object(self, user, account, container, name, until=None): |
... | ... | |
618 | 641 |
props = self._get_version(node) |
619 | 642 |
except NameError: |
620 | 643 |
self.permissions.access_clear(path) |
644 |
self.queue.send('#', {'op': 'del objects'}) |
|
621 | 645 |
return |
622 | 646 |
|
623 | 647 |
path, node = self._lookup_object(account, container, name) |
Also available in: Unified diff