Format billing message.
authorAntony Chazapis <chazapis@gmail.com>
Mon, 30 Jan 2012 10:46:25 +0000 (12:46 +0200)
committerAntony Chazapis <chazapis@gmail.com>
Mon, 30 Jan 2012 10:46:25 +0000 (12:46 +0200)
Refs #1688

pithos/api/util.py
pithos/backends/lib/rabbitmq/queue.py
pithos/backends/modular.py
pithos/lib/queue.py
pithos/settings.d/10-backend.conf

index 5e80dec..96886dc 100644 (file)
@@ -771,7 +771,9 @@ def get_backend():
     backend = connect_backend(db_module=settings.BACKEND_DB_MODULE,
                               db_connection=settings.BACKEND_DB_CONNECTION,
                               block_module=settings.BACKEND_BLOCK_MODULE,
-                              block_path=settings.BACKEND_BLOCK_PATH)
+                              block_path=settings.BACKEND_BLOCK_PATH,
+                              queue_module=settings.BACKEND_QUEUE_MODULE,
+                              queue_connection=settings.BACKEND_QUEUE_CONNECTION)
     backend.default_policy['quota'] = settings.BACKEND_QUOTA
     backend.default_policy['versioning'] = settings.BACKEND_VERSIONING
     return backend
index 0ff7f27..c2ab22a 100644 (file)
 # interpreted as representing official policies, either expressed
 # or implied, of GRNET S.A.
 
-from pithos.lib.queue import exchange_connect, exchange_send
+from pithos.lib.queue import exchange_connect, exchange_send, Receipt
 
 
 class Queue(object):
     """Queue.
-       Required contstructor parameters: exchange.
+       Required constructor parameters: exchange, message_key, client_id.
     """
     
     def __init__(self, **params):
         exchange = params['exchange']
         self.conn = exchange_connect(exchange)
+        self.message_key = params['message_key']
+        self.client_id = params['client_id']
     
-    def send(self, key, value):
-        exchange_send(self.conn, key, value)
-
+    def send(self, user, resource, value, details):
+        body = Receipt(self.client_id, user, resource, value, details).format()
+        exchange_send(self.conn, self.message_key, body)
index ec87b7a..65fda5e 100644 (file)
@@ -47,8 +47,11 @@ DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
 DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
 DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
 DEFAULT_BLOCK_PATH = 'data/'
-DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
-DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
+#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
+#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
+
+QUEUE_MESSAGE_KEY = '#'
+QUEUE_CLIENT_ID = 2 # Pithos.
 
 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
 
@@ -123,7 +126,9 @@ class ModularBackend(BaseBackend):
 
         if queue_module and queue_connection:
             self.queue_module = load_module(queue_module)
-            params = {'exchange': queue_connection}
+            params = {'exchange': queue_connection,
+                      'message_key': QUEUE_MESSAGE_KEY,
+                      'client_id': QUEUE_CLIENT_ID}
             self.queue = self.queue_module.Queue(**params)
         else:
             class NoQueue:
@@ -384,7 +389,7 @@ class ModularBackend(BaseBackend):
             for h in hashes:
                 self.store.map_delete(h)
             self.node.node_purge_children(node, until, CLUSTER_DELETED)
-            self.queue.send('#', {'op': 'del objects'})
+            self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
             return
         
         if self._get_statistics(node)[0] > 0:
@@ -394,7 +399,7 @@ class ModularBackend(BaseBackend):
             self.store.map_delete(h)
         self.node.node_purge_children(node, inf, CLUSTER_DELETED)
         self.node.node_remove(node)
-        self.queue.send('#', {'op': 'del objects'})
+        self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
     
     @backend_method
     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
@@ -584,7 +589,7 @@ class ModularBackend(BaseBackend):
         pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), permissions)
         self._put_metadata_duplicate(pre_version_id, dest_version_id, domain, meta, replace_meta)
         self.store.map_put(hash, map)
-        self.queue.send('#', {'op': 'add object', 'id': dest_version_id})
+        self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
         return dest_version_id
     
     def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
@@ -607,7 +612,7 @@ class ModularBackend(BaseBackend):
         
         logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version)
         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version, False)
-        self.queue.send('#', {'op': 'add object', 'id': dest_version_id})
+        self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
         return dest_version_id
     
     @backend_method
@@ -620,7 +625,7 @@ class ModularBackend(BaseBackend):
         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, None, True)
         if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
             self._delete_object(user, src_account, src_container, src_name)
-        self.queue.send('#', {'op': 'add object', 'id': dest_version_id})
+        self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
         return dest_version_id
     
     def _delete_object(self, user, account, container, name, until=None):
@@ -641,7 +646,7 @@ class ModularBackend(BaseBackend):
                 props = self._get_version(node)
             except NameError:
                 self.permissions.access_clear(path)
-            self.queue.send('#', {'op': 'del objects'})
+            self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
             return
         
         path, node = self._lookup_object(account, container, name)
@@ -905,6 +910,7 @@ class ModularBackend(BaseBackend):
         if versioning != 'auto':
             hash = self.node.version_remove(version_id)
             self.store.map_delete(hash)
+            self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
     
     # Access control functions.
     
index 0cfe830..a1c08b8 100755 (executable)
 
 import pika
 import json
+import uuid
 
 from urlparse import urlparse
+from time import time
 
 
 def exchange_connect(exchange, vhost='/'):
@@ -89,3 +91,18 @@ def queue_callback(conn, queue, cb):
 def queue_start(conn):
     connection, channel, exchange = conn
     channel.start_consuming()
+
+class Receipt(object):
+    def __init__(self, client, user, resource, value, details=None):
+        self.eventVersion = 1
+        self.id = str(uuid.uuid4())
+        self.timestamp = int(time() * 1000)
+        self.clientId = client
+        self.userId = user
+        self.resource = resource
+        self.value = value
+        if details:
+            self.details = details
+    
+    def format(self):
+        return self.__dict__
index 5cdb0d6..aeb6b34 100644 (file)
@@ -14,6 +14,12 @@ BACKEND_DB_CONNECTION = 'sqlite:///' + join(PROJECT_PATH, 'backend.db')
 BACKEND_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
 BACKEND_BLOCK_PATH = join(PROJECT_PATH, 'data/')
 
+# Queue for billing.
+#BACKEND_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
+#BACKEND_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
+BACKEND_QUEUE_MODULE = None
+BACKEND_QUEUE_CONNECTION = None
+
 # Default setting for new accounts.
 BACKEND_QUOTA = 50 * 1024 * 1024 * 1024
 BACKEND_VERSIONING = 'auto'