Commit 2f577992 authored by Antony Chazapis

Provide metadata functions for tags and trash support. Restructure backend to work with blocks and hashmaps.

List of major API updates (tested and ready changes are in the docs):
* List object meta keys in container headers.
* Filter object listings based on their meta keys.
* Enforce metadata replacement on POST/COPY/MOVE.
* Add Content-Range to range replies.
* Return ETag in If-None-Match GET replies.
* Handle multi-range GET replies as outlined in RFC 2616.
* Add Content-Disposition to the allowable object headers.
* Implement block-by-block GET and PUT.
* Better range parsing (and multiple ranges).

The new backend is version-ready, but the API does not yet implement versions. The frontend exposes hashmaps through GET - this may change.
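
For illustration only, a minimal sketch of how a client might fetch the hashmap that the frontend now exposes through GET; the JSON shape follows the json.dumps({'bytes': size, 'hashes': hashmap}) call in object_read below, while the URL layout, host, names and the X-Auth-Token header are assumptions, not part of this commit:

import json
import urllib2  # Python 2, matching the codebase

# Assumed endpoint layout and auth header; adjust for the actual deployment.
url = 'https://pithos.example.com/v1/user/container/object?format=json'
request = urllib2.Request(url, headers={'X-Auth-Token': 'token'})
reply = json.loads(urllib2.urlopen(request).read())
print reply['bytes']   # total object size in bytes
print reply['hashes']  # one hash per backend block, in order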

Fixes #559
Fixes #561
Refs #563
parent 539e1583
@@ -30,6 +30,7 @@ import re
import datetime
import calendar
MONTHS = 'jan feb mar apr may jun jul aug sep oct nov dec'.split()
__D = r'(?P<day>\d{2})'
__D2 = r'(?P<day>[ \d]\d)'
__M = r'(?P<mon>\w{3})'
......
import os
import logging
import hashlib
import types
import uuid
from django.http import HttpResponse
from django.template.loader import render_to_string
@@ -10,10 +10,11 @@ from django.utils.http import parse_etags
from pithos.api.faults import (Fault, NotModified, BadRequest, Unauthorized, ItemNotFound, Conflict,
LengthRequired, PreconditionFailed, RangeNotSatisfiable, UnprocessableEntity)
from pithos.api.util import (printable_meta_dict, get_account_meta, put_account_meta,
get_container_meta, put_container_meta, get_object_meta, put_object_meta,
validate_modification_preconditions, copy_or_move_object, get_range,
raw_input_socket, socket_read_iterator, api_method)
from pithos.api.util import (format_meta_key, printable_meta_dict, get_account_meta,
put_account_meta, get_container_meta, put_container_meta, get_object_meta, put_object_meta,
validate_modification_preconditions, validate_matching_preconditions, copy_or_move_object,
get_content_length, get_range, get_content_range, raw_input_socket, socket_read_iterator,
ObjectWrapper, api_method)
from pithos.backends import backend
@@ -106,7 +107,7 @@ def account_update(request, v_account):
# badRequest (400)
meta = get_account_meta(request)
backend.update_account_meta(request.user, meta)
backend.update_account_meta(request.user, meta, replace=True)
return HttpResponse(status=202)
@api_method('GET', format_allowed=True)
@@ -173,6 +174,7 @@ def container_meta(request, v_account, v_container):
try:
meta = backend.get_container_meta(request.user, v_container)
meta['object_meta'] = backend.list_object_meta(request.user, v_container)
except NameError:
raise ItemNotFound('Container does not exist')
@@ -191,13 +193,13 @@ def container_create(request, v_account, v_container):
meta = get_container_meta(request)
try:
backend.create_container(request.user, v_container)
backend.put_container(request.user, v_container)
ret = 201
except NameError:
ret = 202
if len(meta) > 0:
backend.update_container_meta(request.user, v_container, meta)
backend.update_container_meta(request.user, v_container, meta, replace=True)
return HttpResponse(status=ret)
@@ -211,7 +213,7 @@ def container_update(request, v_account, v_container):
meta = get_container_meta(request)
try:
backend.update_container_meta(request.user, v_container, meta)
backend.update_container_meta(request.user, v_container, meta, replace=True)
except NameError:
raise ItemNotFound('Container does not exist')
return HttpResponse(status=202)
@@ -243,6 +245,7 @@ def object_list(request, v_account, v_container):
try:
meta = backend.get_container_meta(request.user, v_container)
meta['object_meta'] = backend.list_object_meta(request.user, v_container)
except NameError:
raise ItemNotFound('Container does not exist')
@@ -279,8 +282,15 @@ def object_list(request, v_account, v_container):
except ValueError:
limit = 10000
keys = request.GET.get('meta')
if keys:
keys = keys.split(',')
keys = [format_meta_key('X-Object-Meta-' + x.strip()) for x in keys if x.strip() != '']
else:
keys = []
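# Illustrative: '?meta=Color,Size' becomes ['X-Object-Meta-Color', 'X-Object-Meta-Size']
# (exact casing depends on format_meta_key) and is passed to the backend to filter the listing.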
try:
objects = backend.list_objects(request.user, v_container, prefix, delimiter, marker, limit, virtual)
objects = backend.list_objects(request.user, v_container, prefix, delimiter, marker, limit, virtual, keys)
except NameError:
raise ItemNotFound('Container does not exist')
@@ -300,7 +310,7 @@ def object_list(request, v_account, v_container):
except NameError:
# Virtual objects/directories.
if virtual and delimiter and x.endswith(delimiter):
object_meta.append({"subdir": x})
object_meta.append({'subdir': x})
continue
object_meta.append(printable_meta_dict(meta))
if request.serialization == 'xml':
@@ -328,7 +338,7 @@ def object_meta(request, v_account, v_container, v_object):
put_object_meta(response, meta)
return response
@api_method('GET')
@api_method('GET', format_allowed=True)
def object_read(request, v_account, v_container, v_object):
# Normal Response Codes: 200, 206
# Error Response Codes: serviceUnavailable (503),
@@ -344,43 +354,61 @@ def object_read(request, v_account, v_container, v_object):
except NameError:
raise ItemNotFound('Object does not exist')
response = HttpResponse()
put_object_meta(response, meta)
# Range handling.
range = get_range(request)
if range is not None:
offset, length = range
if offset < 0:
offset = meta['bytes'] + offset
if offset > meta['bytes'] or (length and offset + length > meta['bytes']):
raise RangeNotSatisfiable('Requested range exceeds object limits')
if not length:
length = -1
response['Content-Length'] = length # Update with the correct length.
response.status_code = 206
else:
offset = 0
length = -1
response.status_code = 200
# Conditions (according to RFC 2616) must be evaluated at the end.
# Evaluate conditions.
validate_modification_preconditions(request, meta)
if_match = request.META.get('HTTP_IF_MATCH')
if if_match is not None and if_match != '*':
if meta['hash'] not in [x.lower() for x in parse_etags(if_match)]:
raise PreconditionFailed('Object Etag does not match')
if_none_match = request.META.get('HTTP_IF_NONE_MATCH')
if if_none_match is not None:
if if_none_match == '*' or meta['hash'] in [x.lower() for x in parse_etags(if_none_match)]:
raise NotModified('Object Etag matches')
try:
validate_matching_preconditions(request, meta)
except NotModified:
response = HttpResponse(status=304)
response['ETag'] = meta['hash']
return response
try:
response.content = backend.get_object(request.user, v_container, v_object, offset, length)
size, hashmap = backend.get_object_hashmap(v_account, v_container, v_object)
except NameError:
raise ItemNotFound('Object does not exist')
# Reply with the hashmap.
if request.serialization != 'text':
if request.serialization == 'xml':
data = render_to_string('hashes.xml', {'object': v_object, 'bytes': size, 'hashes': hashmap})
elif request.serialization == 'json':
data = json.dumps({'bytes': size, 'hashes': hashmap})
response = HttpResponse(data, status=200)
put_object_meta(response, meta)
response['Content-Length'] = len(data)
return response
# Range handling.
ranges = get_range(request, size)
if ranges is None:
ranges = [(0, size)]
ret = 200
else:
check = [True for offset, length in ranges if
length <= 0 or length > size or
offset < 0 or offset >= size or
offset + length > size]
if len(check) > 0:
raise RangeNotSatisfiable('Requested range exceeds object limits')
ret = 206
if ret == 206 and len(ranges) > 1:
boundary = uuid.uuid4().hex
else:
boundary = ''
wrapper = ObjectWrapper(request.user, v_container, v_object, ranges, size, hashmap, boundary)
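# ObjectWrapper (defined in pithos.api.util) presumably streams the requested ranges
# block by block from the hashmap, adding multipart/byteranges framing when a boundary is set.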
response = HttpResponse(wrapper, status=ret)
put_object_meta(response, meta)
if ret == 206:
if len(ranges) == 1:
offset, length = ranges[0]
response['Content-Length'] = length # Update with the correct length.
response['Content-Range'] = 'bytes %d-%d/%d' % (offset, offset + length - 1, size)
else:
del(response['Content-Length'])
response['Content-Type'] = 'multipart/byteranges; boundary=%s' % (boundary,)
return response
@api_method('PUT')
@@ -397,9 +425,7 @@ def object_write(request, v_account, v_container, v_object):
move_from = request.META.get('HTTP_X_MOVE_FROM')
if copy_from or move_from:
# TODO: Why is this required? Copy this amount?
content_length = request.META.get('CONTENT_LENGTH')
if not content_length:
raise LengthRequired('Missing Content-Length header')
content_length = get_content_length(request)
if move_from:
copy_or_move_object(request, move_from, (v_container, v_object), move=True)
@@ -410,15 +436,7 @@ def object_write(request, v_account, v_container, v_object):
meta = get_object_meta(request)
content_length = -1
if request.META.get('HTTP_TRANSFER_ENCODING') != 'chunked':
content_length = request.META.get('CONTENT_LENGTH')
if not content_length:
raise LengthRequired('Missing Content-Length header')
try:
content_length = int(content_length)
if content_length < 0:
raise ValueError
except ValueError:
raise BadRequest('Invalid Content-Length header')
content_length = get_content_length(request)
# Should be BadRequest, but API says otherwise.
if 'Content-Type' not in meta:
raise LengthRequired('Missing Content-Type header')
@@ -426,26 +444,29 @@ def object_write(request, v_account, v_container, v_object):
md5 = hashlib.md5()
if content_length == 0:
try:
backend.update_object(request.user, v_container, v_object, '')
backend.update_object_hashmap(request.user, v_container, v_object, 0, [])
except NameError:
raise ItemNotFound('Container does not exist')
else:
size = 0
hashmap = []
sock = raw_input_socket(request)
offset = 0
for data in socket_read_iterator(sock, content_length):
for data in socket_read_iterator(sock, content_length, backend.block_size):
# TODO: Raise 408 (Request Timeout) if this takes too long.
# TODO: Raise 499 (Client Disconnect) if a length is defined and we stop before getting this much data.
size += len(data)
hashmap.append(backend.put_block(data))
md5.update(data)
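# Block-by-block PUT: each chunk read from the socket is stored with put_block and its
# hash collected; the object is bound to the full hashmap after the loop.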
try:
backend.update_object(request.user, v_container, v_object, data, offset)
except NameError:
raise ItemNotFound('Container does not exist')
offset += len(data)
meta['hash'] = md5.hexdigest().lower()
etag = request.META.get('HTTP_ETAG')
if etag and parse_etags(etag)[0].lower() != meta['hash']:
raise UnprocessableEntity('Object Etag does not match')
raise UnprocessableEntity('Object ETag does not match')
try:
backend.update_object_hashmap(request.user, v_container, v_object, size, hashmap)
except NameError:
raise ItemNotFound('Container does not exist')
try:
backend.update_object_meta(request.user, v_container, v_object, meta)
except NameError:
@@ -492,12 +513,33 @@ def object_update(request, v_account, v_container, v_object):
# badRequest (400)
meta = get_object_meta(request)
if 'Content-Type' in meta:
content_type = meta.get('Content-Type')
if content_type:
del(meta['Content-Type']) # Do not allow changing the Content-Type.
try:
backend.update_object_meta(request.user, v_container, v_object, meta)
except NameError:
raise ItemNotFound('Object does not exist')
prev_meta = None
if len(meta) != 0:
try:
prev_meta = backend.get_object_meta(request.user, v_container, v_object)
except NameError:
raise ItemNotFound('Object does not exist')
# Keep previous values of 'Content-Type' and 'hash'.
for k in ('Content-Type', 'hash'):
if k in prev_meta:
meta[k] = prev_meta[k]
try:
backend.update_object_meta(request.user, v_container, v_object, meta, replace=True)
except NameError:
raise ItemNotFound('Object does not exist')
# Based on: http://code.google.com/p/gears/wiki/ContentRangePostProposal
content_range = request.META.get('HTTP_CONTENT_RANGE')
if not content_range:
return HttpResponse(status=202)
ranges = get_content_range(request)
if not ranges:
return HttpResponse(status=202)
return HttpResponse(status=202)
@api_method('DELETE')
......
{% spaceless %}
<?xml version="1.0" encoding="UTF-8"?>
<object name="{{ object }}" bytes="{{ bytes }}">
{% for hash in hashes %}
<hash>{{ hash }}</hash>
{% endfor %}
</object>
{% endspaceless %}
@@ -5,15 +5,16 @@ from wsgiref.handlers import format_date_time
from django.conf import settings
from django.http import HttpResponse
from django.utils.http import http_date
from django.utils.http import http_date, parse_etags
from pithos.api.compat import parse_http_date_safe
from pithos.api.faults import (Fault, NotModified, BadRequest, ItemNotFound, PreconditionFailed,
ServiceUnavailable)
from pithos.api.faults import (Fault, NotModified, BadRequest, ItemNotFound, LengthRequired,
PreconditionFailed, ServiceUnavailable)
from pithos.backends import backend
import datetime
import logging
import re
logger = logging.getLogger(__name__)
@@ -66,6 +67,7 @@ def put_container_meta(response, meta):
response['Last-Modified'] = http_date(int(meta['modified']))
for k in [x for x in meta.keys() if x.startswith('X-Container-Meta-')]:
response[k.encode('utf-8')] = meta[k].encode('utf-8')
response['X-Container-Object-Meta'] = [x[14:] for x in meta['object_meta'] if x.startswith('X-Object-Meta-')]
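# Announces the meta keys in use by objects in the container, e.g. Color and Size for objects
# carrying X-Object-Meta-Color and X-Object-Meta-Size (illustrative; the exact header value
# depends on how the list is serialized by the response object).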
def get_object_meta(request):
"""Get metadata from an object request"""
@@ -74,6 +76,8 @@ def get_object_meta(request):
meta['Content-Type'] = request.META['CONTENT_TYPE']
if request.META.get('HTTP_CONTENT_ENCODING'):
meta['Content-Encoding'] = request.META['HTTP_CONTENT_ENCODING']
if request.META.get('HTTP_CONTENT_DISPOSITION'):
meta['Content-Disposition'] = request.META['HTTP_CONTENT_DISPOSITION']
if request.META.get('HTTP_X_OBJECT_MANIFEST'):
meta['X-Object-Manifest'] = request.META['HTTP_X_OBJECT_MANIFEST']
return meta
@@ -86,24 +90,42 @@ def put_object_meta(response, meta):
response['Last-Modified'] = http_date(int(meta['modified']))
for k in [x for x in meta.keys() if x.startswith('X-Object-Meta-')]:
response[k.encode('utf-8')] = meta[k].encode('utf-8')
for k in ('Content-Encoding', 'X-Object-Manifest'):
for k in ('Content-Encoding', 'Content-Disposition', 'X-Object-Manifest'):
if k in meta:
response[k] = meta[k]
def validate_modification_preconditions(request, meta):
"""Check that the modified timestamp conforms with the preconditions set"""
if 'modified' not in meta:
return # TODO: Always return?
if_modified_since = request.META.get('HTTP_IF_MODIFIED_SINCE')
if if_modified_since is not None:
if_modified_since = parse_http_date_safe(if_modified_since)
if if_modified_since is not None and 'modified' in meta and int(meta['modified']) <= if_modified_since:
if if_modified_since is not None and int(meta['modified']) <= if_modified_since:
raise NotModified('Object has not been modified')
if_unmodified_since = request.META.get('HTTP_IF_UNMODIFIED_SINCE')
if if_unmodified_since is not None:
if_unmodified_since = parse_http_date_safe(if_unmodified_since)
if if_unmodified_since is not None and 'modified' in meta and int(meta['modified']) > if_unmodified_since:
if if_unmodified_since is not None and int(meta['modified']) > if_unmodified_since:
raise PreconditionFailed('Object has been modified')
def validate_matching_preconditions(request, meta):
"""Check that the ETag conforms with the preconditions set"""
if 'hash' not in meta:
return # TODO: Always return?
if_match = request.META.get('HTTP_IF_MATCH')
if if_match is not None and if_match != '*':
if meta['hash'] not in [x.lower() for x in parse_etags(if_match)]:
raise PreconditionFailed('Object Etag does not match')
if_none_match = request.META.get('HTTP_IF_NONE_MATCH')
if if_none_match is not None:
if if_none_match == '*' or meta['hash'] in [x.lower() for x in parse_etags(if_none_match)]:
raise NotModified('Object Etag matches')
def copy_or_move_object(request, src_path, dest_path, move=False):
"""Copy or move an object"""
if type(src_path) == str:
@@ -114,7 +136,6 @@ def copy_or_move_object(request, src_path, dest_path, move=False):
src_name = '/'.join(parts[2:])
elif type(src_path) == tuple and len(src_path) == 2:
src_container, src_name = src_path
if type(dest_path) == str:
parts = dest_path.split('/')
if len(parts) < 3 or parts[0] != '':
@@ -123,57 +144,114 @@ def copy_or_move_object(request, src_path, dest_path, move=False):
dest_name = '/'.join(parts[2:])
elif type(dest_path) == tuple and len(dest_path) == 2:
dest_container, dest_name = dest_path
meta = get_object_meta(request)
# Keep previous values of 'Content-Type' (if a new one is absent) and 'hash'.
try:
src_meta = backend.get_object_meta(request.user, src_container, src_name)
except NameError:
raise ItemNotFound('Container or object does not exist')
if 'Content-Type' in meta and 'Content-Type' in src_meta:
del(src_meta['Content-Type'])
for k in ('Content-Type', 'hash'):
if k in src_meta:
meta[k] = src_meta[k]
try:
if move:
backend.move_object(request.user, src_container, src_name, dest_container, dest_name, meta)
backend.move_object(request.user, src_container, src_name, dest_container, dest_name, meta, replace_meta=True)
else:
backend.copy_object(request.user, src_container, src_name, dest_container, dest_name, meta)
backend.copy_object(request.user, src_container, src_name, dest_container, dest_name, meta, replace_meta=True)
except NameError:
raise ItemNotFound('Container or object does not exist')
def get_range(request):
def get_content_length(request):
content_length = request.META.get('CONTENT_LENGTH')
if not content_length:
raise LengthRequired('Missing Content-Length header')
try:
content_length = int(content_length)
if content_length < 0:
raise ValueError
except ValueError:
raise BadRequest('Invalid Content-Length header')
return content_length
def get_range(request, size):
"""Parse a Range header from the request
Either returns None, or an (offset, length) tuple.
If no length is defined length is None.
May return a negative offset (offset from the end).
Either returns None, when the header does not exist or should be ignored,
or a list of (offset, length) tuples - should be further checked.
"""
range = request.META.get('HTTP_RANGE', '').replace(' ', '')
if not range.startswith('bytes='):
ranges = request.META.get('HTTP_RANGE', '').replace(' ', '')
if not ranges.startswith('bytes='):
return None
parts = range[6:].split('-')
if len(parts) != 2:
return None
offset, upto = parts
if offset == '' and upto == '':
return None
if offset != '':
try:
offset = int(offset)
except ValueError:
ret = []
for r in (x.strip() for x in ranges[6:].split(',')):
p = re.compile('^(?P<offset>\d*)-(?P<upto>\d*)$')
m = p.match(r)
if not m:
return None
offset = m.group('offset')
upto = m.group('upto')
if offset == '' and upto == '':
return None
if upto != '':
try:
if offset != '':
offset = int(offset)
if upto != '':
upto = int(upto)
except ValueError:
return None
if offset > upto:
return None
ret.append((offset, upto - offset + 1))
else:
ret.append((offset, size - offset))
else:
return (offset, None)
if offset > upto:
return None
return (offset, upto - offset + 1)
length = int(upto)
ret.append((size - length, length))
return ret
def get_content_range(request):
"""Parse a Content-Range header from the request
Either returns None, when the header does not exist or should be ignored,
or an (offset, length, total) tuple - should be further checked, as length and total may be None.
Returns (None, None, None) if the provided range is '*/*'.
"""
ranges = request.META.get('HTTP_CONTENT_RANGE', '')
if not ranges:
return None
p = re.compile('^bytes (?P<offset>\d+)-(?P<upto>\d*)/(?P<total>(\d+|\*))$')
m = p.match(ranges)
if not m:
if ranges == 'bytes */*':
return (None, None, None)
return None
offset = int(m.group('offset'))
upto = m.group('upto')
total = m.group('total')
if upto != '':
upto = int(upto)
else:
try:
offset = -int(upto)
except ValueError:
return None
return (offset, None)
upto = None
if total != '*':
total = int(total)
else:
total = None
if (upto and offset > upto) or \
(total and offset >= total) or \
(total and upto and upto >= total):
return None
if not upto:
length = None
else:
length = upto - offset + 1
return (offset, length, total)
def raw_input_socket(request):
"""Return the socket for reading the rest of the request"""
@@ -190,15 +268,23 @@ def raw_input_socket(request):
MAX_UPLOAD_SIZE = 10 * (1024 * 1024) # 10MB
def socket_read_iterator(sock, length=-1, blocksize=4096):
def socket_read_iterator(sock, length=0, blocksize=4096):
"""Return a maximum of blocksize data read from the socket in each iteration
Read up to 'length'. If no 'length' is defined, will attempt a chunked read.
Read up to 'length'. If 'length' is negative, will attempt a chunked read.
The maximum amount of data read is controlled by MAX_UPLOAD_SIZE.
"""
if length < 0: # Chunked transfers
data = ''
while length < MAX_UPLOAD_SIZE:
chunk_length = sock.readline()
# Get chunk size.
if hasattr(sock, 'readline'):
chunk_length = sock.readline()
else:
chunk_length = ''
while chunk_length[-1:] != '\n':
chunk_length += sock.read(1)
chunk_length = chunk_length.strip()
pos = chunk_length.find(';')
if pos >= 0:
chunk_length = chunk_length[:pos]
@@ -206,14 +292,22 @@ def socket_read_iterator(sock, length=-1, blocksize=4096):
chunk_length = int(chunk_length, 16)
except Exception, e:
raise BadRequest('Bad chunk size') # TODO: Change to something more appropriate.
# Check if done.
if chunk_length == 0:
if len(data) > 0:
yield data
return
# Get the actual data.
while chunk_length > 0:
data = sock.read(min(chunk_length, blocksize))
chunk_length -= len(data)
length += len(data)
yield data
data = sock.read(2) # CRLF
chunk = sock.read(min(chunk_length, blocksize))
chunk_length -= len(chunk)
length += len(chunk)
data += chunk
if len(data) >= blocksize:
ret = data[:blocksize]
data = data[blocksize:]
yield ret
sock.read(2) # CRLF