Initial queue support in the backend.
authorAntony Chazapis <chazapis@gmail.com>
Sat, 28 Jan 2012 10:08:07 +0000 (12:08 +0200)
committerAntony Chazapis <chazapis@gmail.com>
Sat, 28 Jan 2012 10:08:07 +0000 (12:08 +0200)
Refs #1688
Refs #1792

pithos/backends/lib/rabbitmq/__init__.py [new file with mode: 0644]
pithos/backends/lib/rabbitmq/queue.py [new file with mode: 0644]
pithos/backends/modular.py

diff --git a/pithos/backends/lib/rabbitmq/__init__.py b/pithos/backends/lib/rabbitmq/__init__.py
new file mode 100644 (file)
index 0000000..4377a28
--- /dev/null
@@ -0,0 +1,37 @@
+# Copyright 2012 GRNET S.A. All rights reserved.
+# 
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+# 
+#   1. Redistributions of source code must retain the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer.
+# 
+#   2. Redistributions in binary form must reproduce the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer in the documentation and/or other materials
+#      provided with the distribution.
+# 
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+# 
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from queue import Queue
+
+__all__ = ["Queue"]
+
diff --git a/pithos/backends/lib/rabbitmq/queue.py b/pithos/backends/lib/rabbitmq/queue.py
new file mode 100644 (file)
index 0000000..0ff7f27
--- /dev/null
@@ -0,0 +1,48 @@
+# Copyright 2012 GRNET S.A. All rights reserved.
+# 
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+# 
+#   1. Redistributions of source code must retain the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer.
+# 
+#   2. Redistributions in binary form must reproduce the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer in the documentation and/or other materials
+#      provided with the distribution.
+# 
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+# 
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from pithos.lib.queue import exchange_connect, exchange_send
+
+
+class Queue(object):
+    """Queue.
+       Required contstructor parameters: exchange.
+    """
+    
+    def __init__(self, **params):
+        exchange = params['exchange']
+        self.conn = exchange_connect(exchange)
+    
+    def send(self, key, value):
+        exchange_send(self.conn, key, value)
+
index 950c6db..ec87b7a 100644 (file)
@@ -47,6 +47,8 @@ DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
 DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
 DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
 DEFAULT_BLOCK_PATH = 'data/'
+DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
+DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
 
 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
 
@@ -84,21 +86,27 @@ class ModularBackend(BaseBackend):
     Uses modules for SQL functions and storage.
     """
     
-    def __init__(self, db_module=None, db_connection=None, block_module=None, block_path=None):
+    def __init__(self, db_module=None, db_connection=None,
+                 block_module=None, block_path=None,
+                 queue_module=None, queue_connection=None):
         db_module = db_module or DEFAULT_DB_MODULE
         db_connection = db_connection or DEFAULT_DB_CONNECTION
         block_module = block_module or DEFAULT_BLOCK_MODULE
         block_path = block_path or DEFAULT_BLOCK_PATH
+        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
+        #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
         
         self.hash_algorithm = 'sha256'
         self.block_size = 4 * 1024 * 1024 # 4MB
         
         self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
         
-        __import__(db_module)
-        self.db_module = sys.modules[db_module]
-        self.wrapper = self.db_module.DBWrapper(db_connection)
+        def load_module(m):
+            __import__(m)
+            return sys.modules[m]
         
+        self.db_module = load_module(db_module)
+        self.wrapper = self.db_module.DBWrapper(db_connection)
         params = {'wrapper': self.wrapper}
         self.permissions = self.db_module.Permissions(**params)
         for x in ['READ', 'WRITE']:
@@ -107,13 +115,22 @@ class ModularBackend(BaseBackend):
         for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'UUID', 'CLUSTER']:
             setattr(self, x, getattr(self.db_module, x))
         
-        __import__(block_module)
-        self.block_module = sys.modules[block_module]
-        
+        self.block_module = load_module(block_module)
         params = {'path': block_path,
                   'block_size': self.block_size,
                   'hash_algorithm': self.hash_algorithm}
         self.store = self.block_module.Store(**params)
+
+        if queue_module and queue_connection:
+            self.queue_module = load_module(queue_module)
+            params = {'exchange': queue_connection}
+            self.queue = self.queue_module.Queue(**params)
+        else:
+            class NoQueue:
+                def send(self, key, value):
+                    pass
+            
+            self.queue = NoQueue()
     
     def close(self):
         self.wrapper.close()
@@ -367,6 +384,7 @@ class ModularBackend(BaseBackend):
             for h in hashes:
                 self.store.map_delete(h)
             self.node.node_purge_children(node, until, CLUSTER_DELETED)
+            self.queue.send('#', {'op': 'del objects'})
             return
         
         if self._get_statistics(node)[0] > 0:
@@ -376,6 +394,7 @@ class ModularBackend(BaseBackend):
             self.store.map_delete(h)
         self.node.node_purge_children(node, inf, CLUSTER_DELETED)
         self.node.node_remove(node)
+        self.queue.send('#', {'op': 'del objects'})
     
     @backend_method
     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
@@ -565,6 +584,7 @@ class ModularBackend(BaseBackend):
         pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), permissions)
         self._put_metadata_duplicate(pre_version_id, dest_version_id, domain, meta, replace_meta)
         self.store.map_put(hash, map)
+        self.queue.send('#', {'op': 'add object', 'id': dest_version_id})
         return dest_version_id
     
     def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
@@ -586,7 +606,9 @@ class ModularBackend(BaseBackend):
         """Copy an object's data and metadata."""
         
         logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version)
-        return self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version, False)
+        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version, False)
+        self.queue.send('#', {'op': 'add object', 'id': dest_version_id})
+        return dest_version_id
     
     @backend_method
     def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None):
@@ -598,6 +620,7 @@ class ModularBackend(BaseBackend):
         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, None, True)
         if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
             self._delete_object(user, src_account, src_container, src_name)
+        self.queue.send('#', {'op': 'add object', 'id': dest_version_id})
         return dest_version_id
     
     def _delete_object(self, user, account, container, name, until=None):
@@ -618,6 +641,7 @@ class ModularBackend(BaseBackend):
                 props = self._get_version(node)
             except NameError:
                 self.permissions.access_clear(path)
+            self.queue.send('#', {'op': 'del objects'})
             return
         
         path, node = self._lookup_object(account, container, name)