except NameError:
pass
else:
- rename_meta_key(meta, 'hash', 'x_object_hash') # Will be replaced by ETag.
- rename_meta_key(meta, 'ETag', 'hash')
+ rename_meta_key(meta, 'hash', 'x_object_hash') # Will be replaced by checksum.
+ rename_meta_key(meta, 'checksum', 'hash')
rename_meta_key(meta, 'type', 'content_type')
rename_meta_key(meta, 'uuid', 'x_object_uuid')
rename_meta_key(meta, 'modified', 'last_modified')
validate_matching_preconditions(request, meta)
except NotModified:
response = HttpResponse(status=304)
- response['ETag'] = meta['ETag']
+ response['ETag'] = meta['checksum']
return response
response = HttpResponse(status=200)
validate_matching_preconditions(request, meta)
except NotModified:
response = HttpResponse(status=304)
- response['ETag'] = meta['ETag']
+ response['ETag'] = meta['checksum']
return response
sizes = []
hashmap.append(hash.firstChild.data)
except:
raise BadRequest('Invalid data formatting')
+
+ checksum = '' # Do not set to None (will copy previous value).
else:
md5 = hashlib.md5()
size = 0
hashmap.append(request.backend.put_block(data))
md5.update(data)
- meta['ETag'] = md5.hexdigest().lower()
+ checksum = md5.hexdigest().lower()
etag = request.META.get('HTTP_ETAG')
- if etag and parse_etags(etag)[0].lower() != meta['ETag']:
+ if etag and parse_etags(etag)[0].lower() != checksum:
raise UnprocessableEntity('Object ETag does not match')
try:
version_id = request.backend.update_object_hashmap(request.user_uniq,
v_account, v_container, v_object, size, content_type,
- hashmap, 'pithos', meta, True, permissions)
+ hashmap, checksum, 'pithos', meta, True, permissions)
except NotAllowedError:
raise Forbidden('Not allowed')
except IndexError, e:
raise BadRequest('Invalid sharing header')
except QuotaError:
raise RequestEntityTooLarge('Quota exceeded')
- if 'ETag' not in meta:
+ if not checksum:
# Update the MD5 after the hashmap, as there may be missing hashes.
- # TODO: This will create a new version, even if done synchronously...
- etag = hashmap_md5(request, hashmap, size)
- meta.update({'ETag': etag}) # Update ETag.
+ checksum = hashmap_md5(request, hashmap, size)
try:
- version_id = request.backend.update_object_meta(request.user_uniq,
- v_account, v_container, v_object, 'pithos', {'ETag': etag}, False)
+ version_id = request.backend.update_object_checksum(request.user_uniq,
+ v_account, v_container, v_object, version_id, checksum)
except NotAllowedError:
raise Forbidden('Not allowed')
if public is not None:
raise ItemNotFound('Object does not exist')
response = HttpResponse(status=201)
- response['ETag'] = meta['ETag']
+ if checksum:
+ response['ETag'] = checksum
response['X-Object-Version'] = version_id
return response
raise BadRequest('Missing X-Object-Data field')
file = request.FILES['X-Object-Data']
- content_type = file.content_type
- meta = {}
- meta['ETag'] = file.etag
-
+ checksum = file.etag
try:
version_id = request.backend.update_object_hashmap(request.user_uniq,
- v_account, v_container, v_object, file.size, content_type,
- file.hashmap, 'pithos', meta, True)
+ v_account, v_container, v_object, file.size, file.content_type,
+ file.hashmap, checksum, 'pithos', {}, True)
except NotAllowedError:
raise Forbidden('Not allowed')
except NameError:
raise RequestEntityTooLarge('Quota exceeded')
response = HttpResponse(status=201)
- response['ETag'] = meta['ETag']
+ response['ETag'] = checksum
response['X-Object-Version'] = version_id
- response.content = meta['ETag']
+ response.content = checksum
return response
@api_method('COPY', format_allowed=True)
if request.META.get('HTTP_IF_MATCH') or request.META.get('HTTP_IF_NONE_MATCH'):
validate_matching_preconditions(request, prev_meta)
- # If replacing, keep previous value of 'ETag'.
replace = True
if 'update' in request.GET:
replace = False
- if replace:
- if 'ETag' in prev_meta:
- meta['ETag'] = prev_meta['ETag']
# A Content-Type or X-Source-Object header indicates data updates.
src_object = request.META.get('HTTP_X_SOURCE_OBJECT')
if dest_bytes is not None and dest_bytes < size:
size = dest_bytes
hashmap = hashmap[:(int((size - 1) / request.backend.block_size) + 1)]
- meta.update({'ETag': hashmap_md5(request, hashmap, size)}) # Update ETag.
+ checksum = hashmap_md5(request, hashmap, size)
try:
version_id = request.backend.update_object_hashmap(request.user_uniq,
v_account, v_container, v_object, size, prev_meta['type'],
- hashmap, 'pithos', meta, replace, permissions)
+ hashmap, checksum, 'pithos', meta, replace, permissions)
except NotAllowedError:
raise Forbidden('Not allowed')
except NameError:
raise ItemNotFound('Object does not exist')
response = HttpResponse(status=204)
- response['ETag'] = meta['ETag']
+ response['ETag'] = checksum
response['X-Object-Version'] = version_id
return response
return content_type, meta, get_sharing(request), get_public(request)
def put_object_headers(response, meta, restricted=False):
- if 'ETag' in meta:
- response['ETag'] = meta['ETag']
+ response['ETag'] = meta['checksum']
response['Content-Length'] = meta['bytes']
response['Content-Type'] = meta.get('type', 'application/octet-stream')
response['Last-Modified'] = http_date(int(meta['modified']))
for x in objects:
src_meta = request.backend.get_object_meta(request.user_uniq,
v_account, src_container, x[0], 'pithos', x[1])
- if 'ETag' in src_meta:
- etag += src_meta['ETag']
+ etag += src_meta['checksum']
bytes += src_meta['bytes']
except:
# Ignore errors.
meta['bytes'] = bytes
md5 = hashlib.md5()
md5.update(etag)
- meta['ETag'] = md5.hexdigest().lower()
+ meta['checksum'] = md5.hexdigest().lower()
def update_sharing_meta(request, permissions, v_account, v_container, v_object, meta):
if permissions is None:
def validate_matching_preconditions(request, meta):
"""Check that the ETag conforms with the preconditions set."""
- etag = meta.get('ETag', None)
+ etag = meta['checksum']
+ if not etag:
+ etag = None
if_match = request.META.get('HTTP_IF_MATCH')
if if_match is not None:
ranges = [(0, size)]
ret = 200
except ValueError:
- if if_range != meta['ETag']:
+ if if_range != meta['checksum']:
ranges = [(0, size)]
ret = 200
'version_timestamp': The version's modification timestamp
'uuid': A unique identifier that persists data or metadata updates and renames
+
+ 'checksum': The MD5 sum of the object (may be empty)
Raises:
NotAllowedError: Operation not permitted
"""
return 0, []
- def update_object_hashmap(self, user, account, container, name, size, type, hashmap, domain, meta={}, replace_meta=False, permissions=None):
+ def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
"""Create/update an object with the specified size and partial hashes and return the new version.
Parameters:
"""
return ''
+ def update_object_checksum(self, user, account, container, name, version, checksum):
+ """Update an object's checksum."""
+ return
+
def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
"""Copy an object's data and metadata and return the new version.
# or implied, of GRNET S.A.
from dbwrapper import DBWrapper
-from node import Node, ROOTNODE, SERIAL, HASH, SIZE, TYPE, MTIME, MUSER, UUID, CLUSTER, MATCH_PREFIX, MATCH_EXACT
+from node import Node, ROOTNODE, SERIAL, HASH, SIZE, TYPE, MTIME, MUSER, UUID, CHECKSUM, CLUSTER, MATCH_PREFIX, MATCH_EXACT
from permissions import Permissions, READ, WRITE
__all__ = ["DBWrapper",
- "Node", "ROOTNODE", "SERIAL", "HASH", "SIZE", "TYPE", "MTIME", "MUSER", "UUID", "CLUSTER", "MATCH_PREFIX", "MATCH_EXACT",
+ "Node", "ROOTNODE", "SERIAL", "HASH", "SIZE", "TYPE", "MTIME", "MUSER", "UUID", "CHECKSUM", "CLUSTER", "MATCH_PREFIX", "MATCH_EXACT",
"Permissions", "READ", "WRITE"]
ROOTNODE = 0
-( SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CLUSTER ) = range(10)
+( SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM, CLUSTER ) = range(11)
( MATCH_PREFIX, MATCH_EXACT ) = range(2)
'mtime' : 6,
'muser' : 7,
'uuid' : 8,
- 'cluster' : 9
+ 'checksum' : 9,
+ 'cluster' : 10
}
mtime integer,
muser text not null default '',
uuid text not null default '',
+ checksum text not null default '',
cluster integer not null default 0,
foreign key (node)
references nodes(node)
def node_get_versions(self, node, keys=(), propnames=_propnames):
"""Return the properties of all versions at node.
If keys is empty, return all properties in the order
- (serial, node, hash, size, type, source, mtime, muser, uuid, cluster).
+ (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
"""
- q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, cluster "
+ q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
"from versions "
"where node = ?")
self.execute(q, (node,))
parent, path = props
# The latest version.
- q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, cluster "
+ q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
"from versions "
"where serial = (select max(serial) "
"from versions "
mtime = max(mtime, r[2])
return (count, size, mtime)
- def version_create(self, node, hash, size, type, source, muser, uuid, cluster=0):
+ def version_create(self, node, hash, size, type, source, muser, uuid, checksum, cluster=0):
"""Create a new version from the given properties.
Return the (serial, mtime) of the new version.
"""
- q = ("insert into versions (node, hash, size, type, source, mtime, muser, uuid, cluster) "
- "values (?, ?, ?, ?, ?, ?, ?, ?, ?)")
+ q = ("insert into versions (node, hash, size, type, source, mtime, muser, uuid, checksum, cluster) "
+ "values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
mtime = time()
- props = (node, hash, size, type, source, mtime, muser, uuid, cluster)
+ props = (node, hash, size, type, source, mtime, muser, uuid, checksum, cluster)
serial = self.execute(q, props).lastrowid
self.statistics_update_ancestors(node, 1, size, mtime, cluster)
return serial, mtime
def version_lookup(self, node, before=inf, cluster=0):
"""Lookup the current version of the given node.
Return a list with its properties:
- (serial, node, hash, size, type, source, mtime, muser, uuid, cluster)
+ (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster)
or None if the current version is not found in the given cluster.
"""
- q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, cluster "
+ q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
"from versions "
"where serial = (select max(serial) "
"from versions "
"""Return a sequence of values for the properties of
the version specified by serial and the keys, in the order given.
If keys is empty, return all properties in the order
- (serial, node, hash, size, type, source, mtime, muser, uuid, cluster).
+ (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
"""
- q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, cluster "
+ q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
"from versions "
"where serial = ?")
self.execute(q, (serial,))
return r
return [r[propnames[k]] for k in keys if k in propnames]
+ def version_put_property(self, serial, key, value):
+ """Set value for the property of version specified by key."""
+
+ if key not in _propnames:
+ return
+ q = "update versions set %s = ? where serial = ?" % key
+ self.execute(q, (value, serial))
+
def version_recluster(self, serial, cluster):
"""Move the version into another cluster."""
for x in ['READ', 'WRITE']:
setattr(self, x, getattr(self.db_module, x))
self.node = self.db_module.Node(**params)
- for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
+ for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
setattr(self, x, getattr(self.db_module, x))
self.block_module = load_module(block_module)
modified = del_props[self.MTIME]
meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
- meta.update({'name': name, 'bytes': props[self.SIZE], 'type': props[self.TYPE], 'hash':props[self.HASH]})
- meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
- meta.update({'modified': modified, 'modified_by': props[self.MUSER], 'uuid': props[self.UUID]})
+ meta.update({'name': name,
+ 'bytes': props[self.SIZE],
+ 'type': props[self.TYPE],
+ 'hash':props[self.HASH],
+ 'version': props[self.SERIAL],
+ 'version_timestamp': props[self.MTIME],
+ 'modified': modified,
+ 'modified_by': props[self.MUSER],
+ 'uuid': props[self.UUID],
+ 'checksum': props[self.CHECKSUM]})
return meta
@backend_method
hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
- def _update_object_hash(self, user, account, container, name, size, type, hash, permissions, src_node=None, is_copy=False):
+ def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, permissions, src_node=None, is_copy=False):
if permissions is not None and user != account:
raise NotAllowedError
self._can_write(user, account, container, name)
account_path, account_node = self._lookup_account(account, True)
container_path, container_node = self._lookup_container(account, container)
path, node = self._put_object_node(container_path, container_node, name)
- pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, is_copy=is_copy)
+ pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
# Check quota.
del_size = self._apply_versioning(account, container, pre_version_id)
return pre_version_id, dest_version_id
@backend_method
- def update_object_hashmap(self, user, account, container, name, size, type, hashmap, domain, meta={}, replace_meta=False, permissions=None):
+ def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
"""Create/update an object with the specified size and partial hashes."""
- logger.debug("update_object_hashmap: %s %s %s %s %s %s", account, container, name, size, type, hashmap)
+ logger.debug("update_object_hashmap: %s %s %s %s %s %s %s", account, container, name, size, type, hashmap, checksum)
if size == 0: # No such thing as an empty hashmap.
hashmap = [self.put_block('')]
map = HashMap(self.block_size, self.hash_algorithm)
raise ie
hash = map.hash()
- pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), permissions)
+ pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, permissions)
self._put_metadata_duplicate(pre_version_id, dest_version_id, domain, meta, replace_meta)
self.store.map_put(hash, map)
return dest_version_id
+ @backend_method
+ def update_object_checksum(self, user, account, container, name, version, checksum):
+ """Update an object's checksum."""
+
+ logger.debug("update_object_checksum: %s %s %s %s %s", account, container, name, version, checksum)
+ # Update objects with greater version and same hashmap and size (fix metadata updates).
+ self._can_write(user, account, container, name)
+ path, node = self._lookup_object(account, container, name)
+ props = self._get_version(node, version)
+ versions = self.node.node_get_versions(node)
+ for x in versions:
+ if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
+ self.node.version_put_property(x[self.SERIAL], 'checksum', checksum)
+
def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
self._can_read(user, src_account, src_container, src_name)
path, node = self._lookup_object(src_account, src_container, src_name)
size = props[self.SIZE]
is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
- pre_version_id, dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, permissions, src_node=node, is_copy=is_copy)
+ pre_version_id, dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, permissions, src_node=node, is_copy=is_copy)
self._put_metadata_duplicate(src_version_id, dest_version_id, dest_domain, dest_meta, replace_meta)
return dest_version_id
return
path, node = self._lookup_object(account, container, name)
- src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, cluster=CLUSTER_DELETED)
+ src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
del_size = self._apply_versioning(account, container, src_version_id)
if del_size:
self._report_size_change(user, account, -del_size, {'action': 'object delete'})
def _put_path(self, user, parent, path):
node = self.node.node_create(parent, path)
- self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), CLUSTER_NORMAL)
+ self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), '', CLUSTER_NORMAL)
return node
def _lookup_account(self, account, create=True):
raise IndexError('Version does not exist')
return props
- def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, cluster=CLUSTER_NORMAL, is_copy=False):
+ def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
"""Create a new version of the node."""
props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
src_hash = props[self.HASH]
src_size = props[self.SIZE]
src_type = props[self.TYPE]
+ src_checksum = props[self.CHECKSUM]
else:
src_version_id = None
src_hash = None
src_size = 0
src_type = ''
+ src_checksum = ''
if size is None: # Set metadata.
hash = src_hash # This way hash can be set to None (account or container).
size = src_size
if type is None:
type = src_type
+ if checksum is None:
+ checksum = src_checksum
uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
if src_node is None:
if pre_version_id is not None:
self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
- dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, cluster)
+ dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
return pre_version_id, dest_version_id
def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):