Commit d8e98fe2 authored by Antony Chazapis's avatar Antony Chazapis

Move ETag as checksum to version.

Refs #1948
parent cfc1f9ac
......@@ -555,8 +555,8 @@ def object_list(request, v_account, v_container):
except NameError:
pass
else:
rename_meta_key(meta, 'hash', 'x_object_hash') # Will be replaced by ETag.
rename_meta_key(meta, 'ETag', 'hash')
rename_meta_key(meta, 'hash', 'x_object_hash') # Will be replaced by checksum.
rename_meta_key(meta, 'checksum', 'hash')
rename_meta_key(meta, 'type', 'content_type')
rename_meta_key(meta, 'uuid', 'x_object_uuid')
rename_meta_key(meta, 'modified', 'last_modified')
......@@ -616,7 +616,7 @@ def object_meta(request, v_account, v_container, v_object):
validate_matching_preconditions(request, meta)
except NotModified:
response = HttpResponse(status=304)
response['ETag'] = meta['ETag']
response['ETag'] = meta['checksum']
return response
response = HttpResponse(status=200)
......@@ -685,7 +685,7 @@ def object_read(request, v_account, v_container, v_object):
validate_matching_preconditions(request, meta)
except NotModified:
response = HttpResponse(status=304)
response['ETag'] = meta['ETag']
response['ETag'] = meta['checksum']
return response
sizes = []
......@@ -836,6 +836,8 @@ def object_write(request, v_account, v_container, v_object):
hashmap.append(hash.firstChild.data)
except:
raise BadRequest('Invalid data formatting')
checksum = '' # Do not set to None (will copy previous value).
else:
md5 = hashlib.md5()
size = 0
......@@ -848,15 +850,15 @@ def object_write(request, v_account, v_container, v_object):
hashmap.append(request.backend.put_block(data))
md5.update(data)
meta['ETag'] = md5.hexdigest().lower()
checksum = md5.hexdigest().lower()
etag = request.META.get('HTTP_ETAG')
if etag and parse_etags(etag)[0].lower() != meta['ETag']:
if etag and parse_etags(etag)[0].lower() != checksum:
raise UnprocessableEntity('Object ETag does not match')
try:
version_id = request.backend.update_object_hashmap(request.user_uniq,
v_account, v_container, v_object, size, content_type,
hashmap, 'pithos', meta, True, permissions)
hashmap, checksum, 'pithos', meta, True, permissions)
except NotAllowedError:
raise Forbidden('Not allowed')
except IndexError, e:
......@@ -867,14 +869,12 @@ def object_write(request, v_account, v_container, v_object):
raise BadRequest('Invalid sharing header')
except QuotaError:
raise RequestEntityTooLarge('Quota exceeded')
if 'ETag' not in meta:
if not checksum:
# Update the MD5 after the hashmap, as there may be missing hashes.
# TODO: This will create a new version, even if done synchronously...
etag = hashmap_md5(request, hashmap, size)
meta.update({'ETag': etag}) # Update ETag.
checksum = hashmap_md5(request, hashmap, size)
try:
version_id = request.backend.update_object_meta(request.user_uniq,
v_account, v_container, v_object, 'pithos', {'ETag': etag}, False)
version_id = request.backend.update_object_checksum(request.user_uniq,
v_account, v_container, v_object, version_id, checksum)
except NotAllowedError:
raise Forbidden('Not allowed')
if public is not None:
......@@ -887,7 +887,8 @@ def object_write(request, v_account, v_container, v_object):
raise ItemNotFound('Object does not exist')
response = HttpResponse(status=201)
response['ETag'] = meta['ETag']
if checksum:
response['ETag'] = checksum
response['X-Object-Version'] = version_id
return response
......@@ -904,14 +905,11 @@ def object_write_form(request, v_account, v_container, v_object):
raise BadRequest('Missing X-Object-Data field')
file = request.FILES['X-Object-Data']
content_type = file.content_type
meta = {}
meta['ETag'] = file.etag
checksum = file.etag
try:
version_id = request.backend.update_object_hashmap(request.user_uniq,
v_account, v_container, v_object, file.size, content_type,
file.hashmap, 'pithos', meta, True)
v_account, v_container, v_object, file.size, file.content_type,
file.hashmap, checksum, 'pithos', {}, True)
except NotAllowedError:
raise Forbidden('Not allowed')
except NameError:
......@@ -920,9 +918,9 @@ def object_write_form(request, v_account, v_container, v_object):
raise RequestEntityTooLarge('Quota exceeded')
response = HttpResponse(status=201)
response['ETag'] = meta['ETag']
response['ETag'] = checksum
response['X-Object-Version'] = version_id
response.content = meta['ETag']
response.content = checksum
return response
@api_method('COPY', format_allowed=True)
......@@ -1021,13 +1019,9 @@ def object_update(request, v_account, v_container, v_object):
if request.META.get('HTTP_IF_MATCH') or request.META.get('HTTP_IF_NONE_MATCH'):
validate_matching_preconditions(request, prev_meta)
# If replacing, keep previous value of 'ETag'.
replace = True
if 'update' in request.GET:
replace = False
if replace:
if 'ETag' in prev_meta:
meta['ETag'] = prev_meta['ETag']
# A Content-Type or X-Source-Object header indicates data updates.
src_object = request.META.get('HTTP_X_SOURCE_OBJECT')
......@@ -1178,11 +1172,11 @@ def object_update(request, v_account, v_container, v_object):
if dest_bytes is not None and dest_bytes < size:
size = dest_bytes
hashmap = hashmap[:(int((size - 1) / request.backend.block_size) + 1)]
meta.update({'ETag': hashmap_md5(request, hashmap, size)}) # Update ETag.
checksum = hashmap_md5(request, hashmap, size)
try:
version_id = request.backend.update_object_hashmap(request.user_uniq,
v_account, v_container, v_object, size, prev_meta['type'],
hashmap, 'pithos', meta, replace, permissions)
hashmap, checksum, 'pithos', meta, replace, permissions)
except NotAllowedError:
raise Forbidden('Not allowed')
except NameError:
......@@ -1201,7 +1195,7 @@ def object_update(request, v_account, v_container, v_object):
raise ItemNotFound('Object does not exist')
response = HttpResponse(status=204)
response['ETag'] = meta['ETag']
response['ETag'] = checksum
response['X-Object-Version'] = version_id
return response
......
......@@ -183,8 +183,7 @@ def get_object_headers(request):
return content_type, meta, get_sharing(request), get_public(request)
def put_object_headers(response, meta, restricted=False):
if 'ETag' in meta:
response['ETag'] = meta['ETag']
response['ETag'] = meta['checksum']
response['Content-Length'] = meta['bytes']
response['Content-Type'] = meta.get('type', 'application/octet-stream')
response['Last-Modified'] = http_date(int(meta['modified']))
......@@ -219,8 +218,7 @@ def update_manifest_meta(request, v_account, meta):
for x in objects:
src_meta = request.backend.get_object_meta(request.user_uniq,
v_account, src_container, x[0], 'pithos', x[1])
if 'ETag' in src_meta:
etag += src_meta['ETag']
etag += src_meta['checksum']
bytes += src_meta['bytes']
except:
# Ignore errors.
......@@ -228,7 +226,7 @@ def update_manifest_meta(request, v_account, meta):
meta['bytes'] = bytes
md5 = hashlib.md5()
md5.update(etag)
meta['ETag'] = md5.hexdigest().lower()
meta['checksum'] = md5.hexdigest().lower()
def update_sharing_meta(request, permissions, v_account, v_container, v_object, meta):
if permissions is None:
......@@ -275,7 +273,9 @@ def validate_modification_preconditions(request, meta):
def validate_matching_preconditions(request, meta):
"""Check that the ETag conforms with the preconditions set."""
etag = meta.get('ETag', None)
etag = meta['checksum']
if not etag:
etag = None
if_match = request.META.get('HTTP_IF_MATCH')
if if_match is not None:
......@@ -713,7 +713,7 @@ def object_data_response(request, sizes, hashmaps, meta, public=False):
ranges = [(0, size)]
ret = 200
except ValueError:
if if_range != meta['ETag']:
if if_range != meta['checksum']:
ranges = [(0, size)]
ret = 200
......
......@@ -341,6 +341,8 @@ class BaseBackend(object):
'version_timestamp': The version's modification timestamp
'uuid': A unique identifier that persists data or metadata updates and renames
'checksum': The MD5 sum of the object (may be empty)
Raises:
NotAllowedError: Operation not permitted
......@@ -435,7 +437,7 @@ class BaseBackend(object):
"""
return 0, []
def update_object_hashmap(self, user, account, container, name, size, type, hashmap, domain, meta={}, replace_meta=False, permissions=None):
def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
"""Create/update an object with the specified size and partial hashes and return the new version.
Parameters:
......@@ -458,6 +460,10 @@ class BaseBackend(object):
"""
return ''
def update_object_checksum(self, user, account, container, name, version, checksum):
"""Update an object's checksum."""
return
def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
"""Copy an object's data and metadata and return the new version.
......
......@@ -32,10 +32,10 @@
# or implied, of GRNET S.A.
from dbwrapper import DBWrapper
from node import Node, ROOTNODE, SERIAL, HASH, SIZE, TYPE, MTIME, MUSER, UUID, CLUSTER, MATCH_PREFIX, MATCH_EXACT
from node import Node, ROOTNODE, SERIAL, HASH, SIZE, TYPE, MTIME, MUSER, UUID, CHECKSUM, CLUSTER, MATCH_PREFIX, MATCH_EXACT
from permissions import Permissions, READ, WRITE
__all__ = ["DBWrapper",
"Node", "ROOTNODE", "SERIAL", "HASH", "SIZE", "TYPE", "MTIME", "MUSER", "UUID", "CLUSTER", "MATCH_PREFIX", "MATCH_EXACT",
"Node", "ROOTNODE", "SERIAL", "HASH", "SIZE", "TYPE", "MTIME", "MUSER", "UUID", "CHECKSUM", "CLUSTER", "MATCH_PREFIX", "MATCH_EXACT",
"Permissions", "READ", "WRITE"]
......@@ -40,7 +40,7 @@ from pithos.lib.filter import parse_filters
ROOTNODE = 0
( SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CLUSTER ) = range(10)
( SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM, CLUSTER ) = range(11)
( MATCH_PREFIX, MATCH_EXACT ) = range(2)
......@@ -92,7 +92,8 @@ _propnames = {
'mtime' : 6,
'muser' : 7,
'uuid' : 8,
'cluster' : 9
'checksum' : 9,
'cluster' : 10
}
......@@ -153,6 +154,7 @@ class Node(DBWorker):
mtime integer,
muser text not null default '',
uuid text not null default '',
checksum text not null default '',
cluster integer not null default 0,
foreign key (node)
references nodes(node)
......@@ -211,10 +213,10 @@ class Node(DBWorker):
def node_get_versions(self, node, keys=(), propnames=_propnames):
"""Return the properties of all versions at node.
If keys is empty, return all properties in the order
(serial, node, hash, size, type, source, mtime, muser, uuid, cluster).
(serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
"""
q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, cluster "
q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
"from versions "
"where node = ?")
self.execute(q, (node,))
......@@ -417,7 +419,7 @@ class Node(DBWorker):
parent, path = props
# The latest version.
q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, cluster "
q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
"from versions "
"where serial = (select max(serial) "
"from versions "
......@@ -467,15 +469,15 @@ class Node(DBWorker):
mtime = max(mtime, r[2])
return (count, size, mtime)
def version_create(self, node, hash, size, type, source, muser, uuid, cluster=0):
def version_create(self, node, hash, size, type, source, muser, uuid, checksum, cluster=0):
"""Create a new version from the given properties.
Return the (serial, mtime) of the new version.
"""
q = ("insert into versions (node, hash, size, type, source, mtime, muser, uuid, cluster) "
"values (?, ?, ?, ?, ?, ?, ?, ?, ?)")
q = ("insert into versions (node, hash, size, type, source, mtime, muser, uuid, checksum, cluster) "
"values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
mtime = time()
props = (node, hash, size, type, source, mtime, muser, uuid, cluster)
props = (node, hash, size, type, source, mtime, muser, uuid, checksum, cluster)
serial = self.execute(q, props).lastrowid
self.statistics_update_ancestors(node, 1, size, mtime, cluster)
return serial, mtime
......@@ -483,11 +485,11 @@ class Node(DBWorker):
def version_lookup(self, node, before=inf, cluster=0):
"""Lookup the current version of the given node.
Return a list with its properties:
(serial, node, hash, size, type, source, mtime, muser, uuid, cluster)
(serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster)
or None if the current version is not found in the given cluster.
"""
q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, cluster "
q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
"from versions "
"where serial = (select max(serial) "
"from versions "
......@@ -503,10 +505,10 @@ class Node(DBWorker):
"""Return a sequence of values for the properties of
the version specified by serial and the keys, in the order given.
If keys is empty, return all properties in the order
(serial, node, hash, size, type, source, mtime, muser, uuid, cluster).
(serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
"""
q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, cluster "
q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
"from versions "
"where serial = ?")
self.execute(q, (serial,))
......@@ -518,6 +520,14 @@ class Node(DBWorker):
return r
return [r[propnames[k]] for k in keys if k in propnames]
def version_put_property(self, serial, key, value):
"""Set value for the property of version specified by key."""
if key not in _propnames:
return
q = "update versions set %s = ? where serial = ?" % key
self.execute(q, (value, serial))
def version_recluster(self, serial, cluster):
"""Move the version into another cluster."""
......
......@@ -115,7 +115,7 @@ class ModularBackend(BaseBackend):
for x in ['READ', 'WRITE']:
setattr(self, x, getattr(self.db_module, x))
self.node = self.db_module.Node(**params)
for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
setattr(self, x, getattr(self.db_module, x))
self.block_module = load_module(block_module)
......@@ -463,9 +463,16 @@ class ModularBackend(BaseBackend):
modified = del_props[self.MTIME]
meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
meta.update({'name': name, 'bytes': props[self.SIZE], 'type': props[self.TYPE], 'hash':props[self.HASH]})
meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
meta.update({'modified': modified, 'modified_by': props[self.MUSER], 'uuid': props[self.UUID]})
meta.update({'name': name,
'bytes': props[self.SIZE],
'type': props[self.TYPE],
'hash':props[self.HASH],
'version': props[self.SERIAL],
'version_timestamp': props[self.MTIME],
'modified': modified,
'modified_by': props[self.MUSER],
'uuid': props[self.UUID],
'checksum': props[self.CHECKSUM]})
return meta
@backend_method
......@@ -544,7 +551,7 @@ class ModularBackend(BaseBackend):
hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
def _update_object_hash(self, user, account, container, name, size, type, hash, permissions, src_node=None, is_copy=False):
def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, permissions, src_node=None, is_copy=False):
if permissions is not None and user != account:
raise NotAllowedError
self._can_write(user, account, container, name)
......@@ -555,7 +562,7 @@ class ModularBackend(BaseBackend):
account_path, account_node = self._lookup_account(account, True)
container_path, container_node = self._lookup_container(account, container)
path, node = self._put_object_node(container_path, container_node, name)
pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, is_copy=is_copy)
pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
# Check quota.
del_size = self._apply_versioning(account, container, pre_version_id)
......@@ -574,10 +581,10 @@ class ModularBackend(BaseBackend):
return pre_version_id, dest_version_id
@backend_method
def update_object_hashmap(self, user, account, container, name, size, type, hashmap, domain, meta={}, replace_meta=False, permissions=None):
def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
"""Create/update an object with the specified size and partial hashes."""
logger.debug("update_object_hashmap: %s %s %s %s %s %s", account, container, name, size, type, hashmap)
logger.debug("update_object_hashmap: %s %s %s %s %s %s %s", account, container, name, size, type, hashmap, checksum)
if size == 0: # No such thing as an empty hashmap.
hashmap = [self.put_block('')]
map = HashMap(self.block_size, self.hash_algorithm)
......@@ -589,11 +596,25 @@ class ModularBackend(BaseBackend):
raise ie
hash = map.hash()
pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), permissions)
pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, permissions)
self._put_metadata_duplicate(pre_version_id, dest_version_id, domain, meta, replace_meta)
self.store.map_put(hash, map)
return dest_version_id
@backend_method
def update_object_checksum(self, user, account, container, name, version, checksum):
"""Update an object's checksum."""
logger.debug("update_object_checksum: %s %s %s %s %s", account, container, name, version, checksum)
# Update objects with greater version and same hashmap and size (fix metadata updates).
self._can_write(user, account, container, name)
path, node = self._lookup_object(account, container, name)
props = self._get_version(node, version)
versions = self.node.node_get_versions(node)
for x in versions:
if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
self.node.version_put_property(x[self.SERIAL], 'checksum', checksum)
def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
self._can_read(user, src_account, src_container, src_name)
path, node = self._lookup_object(src_account, src_container, src_name)
......@@ -604,7 +625,7 @@ class ModularBackend(BaseBackend):
size = props[self.SIZE]
is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
pre_version_id, dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, permissions, src_node=node, is_copy=is_copy)
pre_version_id, dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, checksum=None, permissions, src_node=node, is_copy=is_copy)
self._put_metadata_duplicate(src_version_id, dest_version_id, dest_domain, dest_meta, replace_meta)
return dest_version_id
......@@ -656,7 +677,7 @@ class ModularBackend(BaseBackend):
return
path, node = self._lookup_object(account, container, name)
src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, cluster=CLUSTER_DELETED)
src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
del_size = self._apply_versioning(account, container, src_version_id)
if del_size:
self._report_size_change(user, account, -del_size, {'action': 'object delete'})
......@@ -747,7 +768,7 @@ class ModularBackend(BaseBackend):
def _put_path(self, user, parent, path):
node = self.node.node_create(parent, path)
self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), CLUSTER_NORMAL)
self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), '', CLUSTER_NORMAL)
return node
def _lookup_account(self, account, create=True):
......@@ -807,7 +828,7 @@ class ModularBackend(BaseBackend):
raise IndexError('Version does not exist')
return props
def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, cluster=CLUSTER_NORMAL, is_copy=False):
def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
"""Create a new version of the node."""
props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
......@@ -816,16 +837,20 @@ class ModularBackend(BaseBackend):
src_hash = props[self.HASH]
src_size = props[self.SIZE]
src_type = props[self.TYPE]
src_checksum = props[self.CHECKSUM]
else:
src_version_id = None
src_hash = None
src_size = 0
src_type = ''
src_checksum = ''
if size is None: # Set metadata.
hash = src_hash # This way hash can be set to None (account or container).
size = src_size
if type is None:
type = src_type
if checksum is None:
checksum = src_checksum
uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
if src_node is None:
......@@ -838,7 +863,7 @@ class ModularBackend(BaseBackend):
if pre_version_id is not None:
self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, cluster)
dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
return pre_version_id, dest_version_id
def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment