#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
-QUEUE_MESSAGE_KEY = '#'
-QUEUE_CLIENT_ID = 2 # Pithos.
+QUEUE_MESSAGE_KEY = 'pithos'  # NOTE(review): presumably the key messages are published under; confirm against the queue module.
+QUEUE_CLIENT_ID = 'pithos'  # String client id; the numeric id 2 it replaces was labelled "Pithos".
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)  # Cluster ids 0, 1 and 2 respectively.
def fn(self, *args, **kw):
self.wrapper.execute()
try:
+ self.messages = []
ret = func(self, *args, **kw)
self.wrapper.commit()
+ for m in self.messages:
+ self.queue.send(*m)
return ret
except:
self.wrapper.rollback()
if permissions is not None:
self.permissions.access_set(path, permissions)
+
+ self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
return dest_version_id
@backend_method
del_size = self._apply_versioning(account, container, src_version_id)
if del_size:
self._report_size_change(user, account, -del_size, {'action': 'object delete'})
+ self._report_object_change(user, account, path, details={'action': 'object delete'})
self.permissions.access_clear(path)
@backend_method
account_node = self._lookup_account(account, True)[1]
total = self._get_statistics(account_node)[1]
details.update({'user': user, 'total': total})
- self.queue.send(account, 'diskspace', size, details)
+ self.messages.append((account, 'diskspace', size, details))
+
+ def _report_object_change(self, user, account, path, details=None):
+     """Queue an 'object' change message for *path*.
+
+     The message is only appended to ``self.messages``; nothing is sent
+     from this method.
+
+     :param user: stored under the 'user' key of the message details.
+     :param account: first element of the queued message tuple.
+     :param path: third element of the queued message tuple.
+     :param details: optional mapping of extra fields. It is copied, so
+         the caller's dict is never modified.
+     """
+     # Work on a fresh dict per call: a ``{}`` default is one dict shared
+     # by every call, so updating it in place would make separately
+     # queued messages alias (and overwrite) each other's details.
+     details = dict(details) if details is not None else {}
+     logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
+     details['user'] = user
+     self.messages.append((account, 'object', path, details))
# Policy functions.