--- /dev/null
+# Copyright 2012 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+# 1. Redistributions of source code must retain the above
+# copyright notice, this list of conditions and the following
+# disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following
+# disclaimer in the documentation and/or other materials
+# provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from queue import Queue
+
+__all__ = ["Queue"]
+
--- /dev/null
+# Copyright 2012 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+# 1. Redistributions of source code must retain the above
+# copyright notice, this list of conditions and the following
+# disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following
+# disclaimer in the documentation and/or other materials
+# provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from pithos.lib.queue import exchange_connect, exchange_send
+
+
+class Queue(object):
+ """Queue.
+ Required contstructor parameters: exchange.
+ """
+
+ def __init__(self, **params):
+ exchange = params['exchange']
+ self.conn = exchange_connect(exchange)
+
+ def send(self, key, value):
+ exchange_send(self.conn, key, value)
+
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
DEFAULT_BLOCK_PATH = 'data/'
+DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
+DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
Uses modules for SQL functions and storage.
"""
- def __init__(self, db_module=None, db_connection=None, block_module=None, block_path=None):
+ def __init__(self, db_module=None, db_connection=None,
+ block_module=None, block_path=None,
+ queue_module=None, queue_connection=None):
db_module = db_module or DEFAULT_DB_MODULE
db_connection = db_connection or DEFAULT_DB_CONNECTION
block_module = block_module or DEFAULT_BLOCK_MODULE
block_path = block_path or DEFAULT_BLOCK_PATH
+ #queue_module = queue_module or DEFAULT_QUEUE_MODULE
+ #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
self.hash_algorithm = 'sha256'
self.block_size = 4 * 1024 * 1024 # 4MB
self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
- __import__(db_module)
- self.db_module = sys.modules[db_module]
- self.wrapper = self.db_module.DBWrapper(db_connection)
+ def load_module(m):
+ __import__(m)
+ return sys.modules[m]
+ self.db_module = load_module(db_module)
+ self.wrapper = self.db_module.DBWrapper(db_connection)
params = {'wrapper': self.wrapper}
self.permissions = self.db_module.Permissions(**params)
for x in ['READ', 'WRITE']:
for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'UUID', 'CLUSTER']:
setattr(self, x, getattr(self.db_module, x))
- __import__(block_module)
- self.block_module = sys.modules[block_module]
-
+ self.block_module = load_module(block_module)
params = {'path': block_path,
'block_size': self.block_size,
'hash_algorithm': self.hash_algorithm}
self.store = self.block_module.Store(**params)
+
+ if queue_module and queue_connection:
+ self.queue_module = load_module(queue_module)
+ params = {'exchange': queue_connection}
+ self.queue = self.queue_module.Queue(**params)
+ else:
+ class NoQueue:
+ def send(self, key, value):
+ pass
+
+ self.queue = NoQueue()
def close(self):
self.wrapper.close()
for h in hashes:
self.store.map_delete(h)
self.node.node_purge_children(node, until, CLUSTER_DELETED)
+ self.queue.send('#', {'op': 'del objects'})
return
if self._get_statistics(node)[0] > 0:
self.store.map_delete(h)
self.node.node_purge_children(node, inf, CLUSTER_DELETED)
self.node.node_remove(node)
+ self.queue.send('#', {'op': 'del objects'})
@backend_method
def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), permissions)
self._put_metadata_duplicate(pre_version_id, dest_version_id, domain, meta, replace_meta)
self.store.map_put(hash, map)
+ self.queue.send('#', {'op': 'add object', 'id': dest_version_id})
return dest_version_id
def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
"""Copy an object's data and metadata."""
logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version)
- return self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version, False)
+ dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version, False)
+ self.queue.send('#', {'op': 'add object', 'id': dest_version_id})
+ return dest_version_id
@backend_method
def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None):
dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, None, True)
if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
self._delete_object(user, src_account, src_container, src_name)
+ self.queue.send('#', {'op': 'add object', 'id': dest_version_id})
return dest_version_id
def _delete_object(self, user, account, container, name, until=None):
props = self._get_version(node)
except NameError:
self.permissions.access_clear(path)
+ self.queue.send('#', {'op': 'del objects'})
return
path, node = self._lookup_object(account, container, name)