#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
-QUEUE_MESSAGE_KEY = '#'
-QUEUE_CLIENT_ID = 2 # Pithos.
+QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
+QUEUE_CLIENT_ID = 'pithos'
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
def fn(self, *args, **kw):
self.wrapper.execute()
try:
+ self.messages = []
ret = func(self, *args, **kw)
self.wrapper.commit()
+ for m in self.messages:
+ self.queue.send(*m)
return ret
except:
self.wrapper.rollback()
if queue_module and queue_connection:
self.queue_module = load_module(queue_module)
params = {'exchange': queue_connection,
- 'message_key': QUEUE_MESSAGE_KEY,
'client_id': QUEUE_CLIENT_ID}
self.queue = self.queue_module.Queue(**params)
else:
if user != account:
raise NotAllowedError
path, node = self._lookup_container(account, container)
- self._put_metadata(user, node, domain, meta, replace)
+ src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
+ if src_version_id is not None:
+ versioning = self._get_policy(node)['versioning']
+ if versioning != 'auto':
+ self.node.version_remove(src_version_id)
@backend_method
def get_container_policy(self, user, account, container):
hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
- def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, permissions, src_node=None, is_copy=False):
+ def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
if permissions is not None and user != account:
raise NotAllowedError
self._can_write(user, account, container, name)
path, node = self._put_object_node(container_path, container_node, name)
pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
+ # Handle meta.
+ if src_version_id is None:
+ src_version_id = pre_version_id
+ self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace_meta)
+
# Check quota.
del_size = self._apply_versioning(account, container, pre_version_id)
size_delta = size - del_size
if permissions is not None:
self.permissions.access_set(path, permissions)
- return pre_version_id, dest_version_id
+
+ self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
+ return dest_version_id
@backend_method
def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
raise ie
hash = map.hash()
- pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, permissions)
- self._put_metadata_duplicate(pre_version_id, dest_version_id, domain, meta, replace_meta)
+ dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
self.store.map_put(hash, map)
return dest_version_id
size = props[self.SIZE]
is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
- pre_version_id, dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, permissions, src_node=node, is_copy=is_copy)
- self._put_metadata_duplicate(src_version_id, dest_version_id, dest_domain, dest_meta, replace_meta)
+ dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy)
return dest_version_id
@backend_method
del_size = self._apply_versioning(account, container, src_version_id)
if del_size:
self._report_size_change(user, account, -del_size, {'action': 'object delete'})
+ self._report_object_change(user, account, path, details={'action': 'object delete'})
self.permissions.access_clear(path)
@backend_method
account_node = self._lookup_account(account, True)[1]
total = self._get_statistics(account_node)[1]
details.update({'user': user, 'total': total})
- self.queue.send(account, 'diskspace', size, details)
+ self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, 'diskspace', size, details))
+
+ def _report_object_change(self, user, account, path, details={}):
+ logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
+ details.update({'user': user})
+ self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',), account, 'object', path, details))
# Policy functions.
if node is not None:
props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
if props is not None:
- if props[self.TYPE] in ('application/directory', 'application/folder'):
+ if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
formatted.append((p, self.MATCH_EXACT))
return formatted
if node is not None:
props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
if props is not None:
- if props[self.TYPE] in ('application/directory', 'application/folder'):
+ if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
return p
return None