Commit 6ec7d6e0 authored by Antony Chazapis

Merge API and backend.

parent af486b4c
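
The change is mechanical across the whole views module: every handler now builds the dummy backend itself and maps the backend's NameError onto the API's itemNotFound fault. A minimal sketch of that pattern, assuming the names used in the diff (the stub fault class and the storage path below are illustrative, not part of the commit):

from os import path
from pithos.backends.dummy import BackEnd

class ItemNotFound(Exception):
    # Stand-in for the API fault that the real module raises as a 404.
    pass

STORAGE_PATH = path.join('.', 'data')   # the views derive this from PROJECT_PATH

def object_meta_sketch(user, container, name):
    be = BackEnd(STORAGE_PATH)
    try:
        return be.get_object_meta(user, container, name)
    except NameError:
        # The dummy backend signals missing accounts/containers/objects
        # with NameError; the API surfaces it as itemNotFound (404).
        raise ItemNotFound()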
......@@ -20,7 +20,6 @@ from os import path
STORAGE_PATH = path.join(PROJECT_PATH, 'data')
from pithos.backends.dummy import BackEnd
from pithos.backends.dummy_debug import *
import logging
......@@ -154,6 +153,7 @@ def container_list(request, v_account):
containers = [be.get_container_meta(request.user, x) for x in containers]
except NameError:
raise ItemNotFound()
# TODO: Format dates.
if request.serialization == 'xml':
data = render_to_string('containers.xml', {'account': request.user, 'containers': containers})
elif request.serialization == 'json':
......@@ -246,8 +246,6 @@ def container_delete(request, v_account, v_container):
raise ItemNotFound()
return HttpResponse(status = 204)
# --- MERGED UP TO HERE ---
@api_method('GET', format_allowed = True)
def object_list(request, v_account, v_container):
# Normal Response Codes: 200, 204
......@@ -259,7 +257,6 @@ def object_list(request, v_account, v_container):
path = request.GET.get('path')
prefix = request.GET.get('prefix')
delimiter = request.GET.get('delimiter')
logging.debug("path: %s", path)
# Path overrides prefix and delimiter.
if path:
......@@ -268,6 +265,8 @@ def object_list(request, v_account, v_container):
# Naming policy.
if prefix and delimiter:
prefix = prefix + delimiter
if not prefix:
prefix = ''
marker = request.GET.get('marker')
limit = request.GET.get('limit')
......@@ -275,19 +274,30 @@ def object_list(request, v_account, v_container):
try:
limit = int(limit)
except ValueError:
limit = None
limit = 10000
objects = list_objects(request.user, v_container, prefix, delimiter, marker, limit)
be = BackEnd(STORAGE_PATH)
try:
objects = be.list_objects(request.user, v_container, prefix, delimiter, marker, limit)
except NameError:
raise ItemNotFound()
# TODO: The cloudfiles python bindings expect 200 if json/xml.
if len(objects) == 0:
return HttpResponse(status = 204)
if request.serialization == 'text':
return HttpResponse('\n'.join(objects), status = 200)
# TODO: Do this with a backend parameter?
try:
objects = [be.get_object_meta(request.user, v_container, x) for x in objects]
except NameError:
raise ItemNotFound()
# TODO: Format dates.
if request.serialization == 'xml':
data = render_to_string('objects.xml', {'container': v_container, 'objects': objects})
elif request.serialization == 'json':
data = json.dumps(objects)
else:
data = '\n'.join(x['name'] for x in objects)
return HttpResponse(data, status = 200)
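
Listings are capped at 10000 names per call (the fallback when limit is missing or unparsable), so larger containers are walked with the marker parameter. A rough client-side sketch of that loop against the backend call above; note that the dummy backend's marker is inclusive (start is the index of the marker itself), so each follow-up page has its echoed marker dropped:

def iterate_objects(be, user, container, prefix='', delimiter=None, page=10000):
    # Sketch only: pages through list_objects using the marker/limit arguments.
    marker = None
    while True:
        names = be.list_objects(user, container, prefix, delimiter, marker, page)
        if marker and names and names[0] == marker:
            names = names[1:]           # drop the marker echoed back by the backend
        if not names:
            break
        for name in names:
            yield name
        marker = names[-1]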
@api_method('HEAD')
......@@ -297,16 +307,20 @@ def object_meta(request, v_account, v_container, v_object):
# itemNotFound (404),
# unauthorized (401),
# badRequest (400)
info = get_object_meta(request.user, v_container, v_object)
be = BackEnd(STORAGE_PATH)
try:
info = be.get_object_meta(request.user, v_container, v_object)
except NameError:
raise ItemNotFound()
response = HttpResponse(status = 204)
response['ETag'] = info['hash']
response['Content-Length'] = info['bytes']
response['Content-Type'] = info['content_type']
response['Last-Modified'] = http_date(info['last_modified'])
for k, v in info['meta'].iteritems():
response['X-Object-Meta-%s' % k.capitalize()] = v
for k in [x for x in info.keys() if x.startswith('X-Object-Meta-')]:
response[k] = info[k]
return response
......@@ -321,8 +335,13 @@ def object_read(request, v_account, v_container, v_object):
# badRequest (400),
# notModified (304)
info = get_object_meta(request.user, v_container, v_object)
be = BackEnd(STORAGE_PATH)
try:
info = be.get_object_meta(request.user, v_container, v_object)
except NameError:
raise ItemNotFound()
# TODO: Check if the cloudfiles python bindings expect hash/content_type/last_modified on range requests.
response = HttpResponse()
response['ETag'] = info['hash']
response['Content-Type'] = info['content_type']
......@@ -332,30 +351,35 @@ def object_read(request, v_account, v_container, v_object):
range = get_range(request)
if range is not None:
offset, length = range
if length:
if offset + length > info['bytes']:
raise RangeNotSatisfiable()
else:
if offset > info['bytes']:
raise RangeNotSatisfiable()
if not length:
length = 0
if offset + length > info['bytes']:
raise RangeNotSatisfiable()
length = -1
response['Content-Length'] = length
response.status_code = 206
else:
offset = 0
length = 0
length = -1
response['Content-Length'] = info['bytes']
response.status_code = 200
# Conditions (according to RFC 2616, these must be evaluated at the end).
# TODO: Check etag/date conditions.
if_match = request.META.get('HTTP_IF_MATCH')
if if_match is not None and if_match != '*':
if info['hash'] not in parse_etags(if_match):
raise PreconditionFailed()
if_none_match = request.META.get('HTTP_IF_NONE_MATCH')
# if if_none_match is not None:
# if if_none_match = '*' or info['hash'] in parse_etags(if_none_match):
# raise NotModified()
if if_none_match is not None:
if if_none_match == '*' or info['hash'] in parse_etags(if_none_match):
raise NotModified()
if_modified_since = request.META.get('HTTP_IF_MODIFIED_SINCE')
if if_modified_since is not None:
......@@ -369,7 +393,11 @@ def object_read(request, v_account, v_container, v_object):
if if_unmodified_since is not None and info['last_modified'] > if_unmodified_since:
raise PreconditionFailed()
response.content = get_object_data(request.user, v_container, v_object, offset, length)
try:
response.content = be.get_object(request.user, v_container, v_object, offset, length)
except NameError:
raise ItemNotFound()
return response
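
Two details worth noting in the GET handler: a length of -1 is handed straight to the backend, where file.read(-1) means "to the end of the object", and the ETag preconditions are evaluated only after the range bookkeeping. A condensed sketch of those precondition checks (parse_etags below is a simplified stand-in for the helper the views import, assumed to split a comma-separated ETag list):

def check_preconditions(meta_headers, current_hash):
    # Sketch of the If-Match / If-None-Match handling above; returns the
    # status the view would signal instead of raising a fault.
    def parse_etags(value):
        # Simplified stand-in for the real helper: split and strip quotes.
        return [tag.strip().strip('"') for tag in value.split(',')]

    if_match = meta_headers.get('HTTP_IF_MATCH')
    if if_match is not None and if_match != '*':
        if current_hash not in parse_etags(if_match):
            return 412                  # preconditionFailed
    if_none_match = meta_headers.get('HTTP_IF_NONE_MATCH')
    if if_none_match is not None:
        if if_none_match == '*' or current_hash in parse_etags(if_none_match):
            return 304                  # notModified
    return 200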
@api_method('PUT')
......@@ -382,6 +410,8 @@ def object_write(request, v_account, v_container, v_object):
# unauthorized (401),
# badRequest (400)
be = BackEnd(STORAGE_PATH)
copy_from = request.META.get('HTTP_X_COPY_FROM')
if copy_from:
parts = copy_from.split('/')
......@@ -390,21 +420,27 @@ def object_write(request, v_account, v_container, v_object):
copy_container = parts[1]
copy_name = '/'.join(parts[2:])
info = get_object_meta(request.user, copy_container, copy_name)
try:
info = be.get_object_meta(request.user, copy_container, copy_name)
except NameError:
raise ItemNotFound()
content_length = request.META.get('CONTENT_LENGTH')
content_type = request.META.get('CONTENT_TYPE')
# TODO: Why is this required? Copy this amount?
if not content_length:
raise LengthRequired()
if content_type:
info['content_type'] = content_type
meta = get_object_meta(request)
for k, v in meta.iteritems():
info['meta'][k] = v
meta = get_meta(request, 'X-Object-Meta-')
info.update(meta)
copy_object(request.user, copy_container, copy_name, v_container, v_object)
update_object_meta(request.user, v_container, v_object, info)
try:
be.copy_object(request.user, copy_container, copy_name, v_container, v_object)
be.update_object_meta(request.user, v_container, v_object, info)
except NameError:
raise ItemNotFound()
response = HttpResponse(status = 201)
else:
......@@ -412,25 +448,29 @@ def object_write(request, v_account, v_container, v_object):
content_type = request.META.get('CONTENT_TYPE')
if not content_length or not content_type:
raise LengthRequired()
info = {'content_type': content_type}
meta = get_meta(request, 'X-Object-Meta-')
info = {'bytes': content_length, 'content_type': content_type, 'meta': meta}
info.update(meta)
data = request.raw_post_data
try:
be.update_object(request.user, v_container, v_object, data)
be.update_object_meta(request.user, v_container, v_object, info)
except NameError:
raise ItemNotFound()
# TODO: Check before update?
info = be.get_object_meta(request.user, v_container, v_object)
etag = request.META.get('HTTP_ETAG')
if etag:
etag = parse_etags(etag)[0] # TODO: Unescape properly.
info['hash'] = etag
data = request.read()
# TODO: Hash function.
# etag = hash(data)
# if info.get('hash') and info['hash'] != etag:
# raise UnprocessableEntity()
update_object_data(request.user, v_container, v_name, info, data)
if etag != info['hash']:
be.delete_object(request.user, v_container, v_object)
raise UnprocessableEntity()
response = HttpResponse(status = 201)
# response['ETag'] = etag
response['ETag'] = info['hash']
return response
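
The write path verifies the ETag only after the object has been stored: update_object stamps the new object with an MD5 hash (see __object_hash in the backend below), and on a mismatch the handler deletes what it just wrote and raises UnprocessableEntity (422). A short sketch of computing the matching ETag on the client side before uploading:

import hashlib

def md5_etag(data):
    # The dummy backend hashes object contents with MD5, so this is the
    # value to send as the ETag header for the payload below.
    return hashlib.md5(data).hexdigest()

payload = b'asdfasdfasdf'               # same test payload as the script further down
print(md5_etag(payload))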
......@@ -452,18 +492,23 @@ def object_copy(request, v_account, v_container, v_object):
dest_container = parts[1]
dest_name = '/'.join(parts[2:])
info = get_object_meta(request.user, v_container, v_object)
be = BackEnd(STORAGE_PATH)
try:
info = be.get_object_meta(request.user, v_container, v_object)
except NameError:
raise ItemNotFound()
content_type = request.META.get('CONTENT_TYPE')
if content_type:
info['content_type'] = content_type
meta = get_meta(request, 'X-Object-Meta-')
for k, v in meta.iteritems():
info['meta'][k] = v
info.update(meta)
copy_object(request.user, v_container, v_object, dest_container, dest_name)
update_object_meta(request.user, dest_container, dest_name, info)
try:
be.copy_object(request.user, v_container, v_object, dest_container, dest_name)
be.update_object_meta(request.user, dest_container, dest_name, info)
except NameError:
raise ItemNotFound()
response = HttpResponse(status = 201)
......@@ -475,9 +520,6 @@ def object_update(request, v_account, v_container, v_object):
# unauthorized (401),
# badRequest (400)
meta = get_meta(request, 'X-Object-Meta-')
update_object_meta(request.user, v_container, v_object, meta)
return HttpResponse(status = 202)
@api_method('DELETE')
......@@ -488,7 +530,11 @@ def object_delete(request, v_account, v_container, v_object):
# unauthorized (401),
# badRequest (400)
delete_object(request.user, v_container, v_object)
be = BackEnd(STORAGE_PATH)
try:
be.delete_object(request.user, v_container, v_object)
except NameError:
raise ItemNotFound()
return HttpResponse(status = 204)
@api_method()
......
......@@ -23,4 +23,39 @@ cont = conn.get_container(container)
print 'Got container %s.' % container
print 'Object count: %s Total bytes: %s' % (cont.object_count, cont.size_used)
objects = cont.list_objects()
print 'Found: %d objects' % len(objects)
for object in objects:
print object
cont.delete_object(object)
object = 'test_file'
obj = cont.create_object(object)
obj.content_type = 'text/plain'
obj.metadata['blah'] = 'aldsjflkajdsflk'
obj.write('asdfasdfasdf')
obj.metadata
print ''
print 'OBJECT'
print 'Name: %s' % obj.name
print 'Content Type: %s' % obj.content_type
print 'Size: %s' % obj.size
print 'Last Modified: %s' % obj.last_modified
print 'Container: %s' % obj.container
print 'Metadata: %s' % obj.metadata
obj = cont.get_object(object)
data = obj.read()
print ''
print 'OBJECT'
print 'Name: %s' % obj.name
print 'Content Type: %s' % obj.content_type
print 'Size: %s' % obj.size
print 'Last Modified: %s' % obj.last_modified
print 'Container: %s' % obj.container
print 'Metadata: %s' % obj.metadata
print 'Data: %s' % data
cont.delete_object(object)
conn.delete_container(container)
......@@ -4,6 +4,7 @@ import json
import logging
import types
import hashlib
import shutil
logger = logging.getLogger(__name__)
formatter = logging.Formatter('[%(levelname)s] %(message)s')
......@@ -33,6 +34,7 @@ class BackEnd:
self.con.commit()
# TODO: Create/delete account?
# TODO: Catch OSError exceptions.
def get_account_meta(self, account):
"""
......@@ -125,89 +127,44 @@ class BackEnd:
if not os.path.exists(fullname):
raise NameError('Account does not exist')
containers = os.listdir(fullname)
start = 0
if marker:
try:
start = containers.index(marker)
except ValueError:
pass
return containers[start:limit]
# def __get_linkinfo(self, path):
# c = self.con.execute('select rowid from objects where name=''?''', (path,))
# row = c.fetchone()
# if row:
# return str(row[0])
# else:
# raise NameError('Requested path not found')
#
# def __put_linkinfo(self, path):
# id = self.con.execute('insert into objects (name) values (?)', (path,)).lastrowid
# self.con.commit()
# return id
def __del_dbpath(self, path):
self.con.execute('delete from metadata where object_id in (select rowid from objects where name = ''?'')', (path,))
self.con.execute('delete from objects where name = ''?''', (path,))
self.con.commit()
return
def __get_metadata(self, path):
c = self.con.execute('select m.name, m.value from metadata m, objects o where o.rowid = m.object_id and o.name = ''?''', (path,))
return dict(c.fetchall())
def __put_metadata(self, path, meta):
c = self.con.execute('select rowid from objects where name=''?''', (path,))
row = c.fetchone()
if row:
link = str(row[0])
else:
link = self.con.execute('insert into objects (name) values (?)', (path,)).lastrowid
for k, v in meta.iteritems():
self.con.execute('insert or replace into metadata (object_id, name, value) values (?, ?, ?)', (link, k, v))
self.con.commit()
return
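
The metadata helpers above imply a small SQLite layout: an objects table whose implicit rowid doubles as the link name on disk, and a metadata table keyed on (object_id, name) so that 'insert or replace' overwrites a single key. The CREATE statements are not part of this hunk, so the following is only an inferred sketch:

import sqlite3

con = sqlite3.connect(':memory:')
# Inferred from the queries above, not the literal schema used by the backend.
con.execute('create table if not exists objects (name text)')
con.execute('create table if not exists metadata ('
            'object_id integer, name text, value text, '
            'primary key (object_id, name))')
con.commit()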
def __object_hash(self, location, block_size = 8192):
md5 = hashlib.md5()
f = open(location, 'r')
while True:
data = f.read(block_size)
if not data:
break
md5.update(data)
f.close()
return md5.hexdigest()
# --- MERGED UP TO HERE ---
if not limit or limit > 10000:
limit = 10000
return containers[start:start + limit]
def list_objects(self, account, container, prefix='', delimiter=None, marker = None, limit = 10000):
def list_objects(self, account, container, prefix = '', delimiter = None, marker = None, limit = 10000):
"""
returns a list of the objects existing under a specific account container
returns a list of objects existing under a container
"""
logger.info("list_objects: %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit)
dir = os.path.join(self.basepath, account, container)
if not os.path.exists(dir):
fullname = os.path.join(self.basepath, account, container)
if not os.path.exists(fullname):
raise NameError('Container does not exist')
p1 = ''.join(['%', prefix, '%'])
p2 = '/'.join([account, container, '%'])
search_str = (prefix and [p1] or [p2])[0]
c = self.con.execute('select * from objects where name like ''?'' order by name', (search_str,))
objects = c.fetchall()
while prefix.startswith('/'):
prefix = prefix[1:]
# TODO: Test this with various prefixes. Does '//' bother it?
prefix = os.path.join(account, container, prefix)
c = self.con.execute('select * from objects where name like ''?'' order by name', (os.path.join(prefix, '%'),))
objects = [x[0][len(prefix):] for x in c.fetchall()]
if delimiter:
pseudo_objects = {}
for x in objects:
pseudo_name = x[0][len(prefix):]
pseudo_name = x
i = pseudo_name.find(delimiter)
if i != -1:
pseudo_name = pseudo_name[:i]
#TODO: Virtual directories.
# TODO: Virtual directories.
pseudo_objects[pseudo_name] = x
objects = pseudo_objects.keys()
start = 0
if marker:
try:
......@@ -216,127 +173,165 @@ class BackEnd:
pass
if not limit or limit > 10000:
limit = 10000
return objects[start:start + limit]
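
A worked example of the delimiter collapse above, with illustrative names: stored objects 'photos/a.jpg', 'photos/b.jpg' and 'readme.txt' listed with delimiter '/' come back as the pseudo-names 'photos' and 'readme.txt', because each name is cut at its first delimiter and duplicates share one dictionary slot:

def collapse(names, delimiter):
    # Mirrors the pseudo_objects loop above.
    pseudo = {}
    for name in names:
        i = name.find(delimiter)
        pseudo[name[:i] if i != -1 else name] = name
    return list(pseudo.keys())

print(collapse(['photos/a.jpg', 'photos/b.jpg', 'readme.txt'], '/'))
# ['photos', 'readme.txt'] (key order is not guaranteed on older Pythons)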
def get_object_meta(self, account, container, name, keys=None):
dir = os.path.join(self.basepath, account, container)
if not os.path.exists(dir):
def get_object_meta(self, account, container, name, keys = None):
"""
returns a dictionary with the object metadata
"""
logger.info("get_object_meta: %s %s %s %s", account, container, name, keys)
fullname = os.path.join(self.basepath, account, container)
if not os.path.exists(fullname):
raise NameError('Container does not exist')
link = self.__get_object_linkinfo(os.path.join(account, container, name))
c = self.con.execute('select name, value from metadata where object_id = ''?''', (link,))
l = c.fetchall()
if keys:
l = [elem for elem in l if elem[0] in keys]
meta = {}
for e in l:
meta[e[0]] = e[1]
link = self.__get_linkinfo(os.path.join(account, container, name))
location = os.path.join(self.basepath, account, container, link)
size = os.path.getsize(location)
mtime = os.path.getmtime(location)
meta = self.__get_metadata(os.path.join(account, container, name))
meta.update({'name': name, 'bytes': size, 'last_modified': mtime})
if 'hash' not in meta:
meta['hash'] = self.__object_hash(location)
if 'content_type' not in meta:
meta['content_type'] = 'application/octet-stream'
return meta
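
So the dictionary handed back to the views mixes stored metadata with filesystem-derived fields; roughly (every value below is illustrative):

{
    'name': 'test_file',
    'bytes': 12,                                  # os.path.getsize on the link target
    'last_modified': 1310000000.0,                # os.path.getmtime, a Unix timestamp
    'hash': '<md5 hex digest of the contents>',   # filled in by __object_hash if absent
    'content_type': 'text/plain',                 # defaults to application/octet-stream
    'X-Object-Meta-Blah': 'some value',           # whatever __put_metadata stored earlier
}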
def get_object_data(self, account, container, name, offset=0, length=-1):
dir = os.path.join(self.basepath, account, container)
if not os.path.exists(dir):
def update_object_meta(self, account, container, name, meta):
"""
updates the metadata associated with the object
"""
logger.info("update_object_meta: %s %s %s %s", account, container, name, meta)
fullname = os.path.join(self.basepath, account, container)
if not os.path.exists(fullname):
raise NameError('Container does not exist')
else:
os.chdir(dir)
location = self.__get_object_linkinfo(os.path.join(account, container, name))
self.__put_metadata(os.path.join(account, container, name), meta)
return
def get_object(self, account, container, name, offset = 0, length = -1):
"""
returns the object data
"""
logger.info("get_object: %s %s %s %s %s", account, container, name, offset, length)
fullname = os.path.join(self.basepath, account, container)
if not os.path.exists(fullname):
raise NameError('Container does not exist')
link = self.__get_linkinfo(os.path.join(account, container, name))
location = os.path.join(self.basepath, account, container, link)
f = open(location, 'r')
if offset:
f.seek(offset)
data = f.read(length)
f.close()
return data
def update_object(self, account, container, name, data):
dir = os.path.join(self.basepath, account, container)
if not os.path.exists(dir):
def update_object(self, account, container, name, data, offset = 0):
"""
creates/updates an object with the specified data
"""
logger.info("put_object: %s %s %s %s %s", account, container, name, data, offset)
fullname = os.path.join(self.basepath, account, container)
if not os.path.exists(fullname):
raise NameError('Container does not exist')
try:
location = self.__get_object_linkinfo(os.path.join(account, container, name))
link = self.__get_linkinfo(os.path.join(account, container, name))
except NameError:
# new object
location = str(self.__save_linkinfo(os.path.join(account, container, name)))
self.__store_data(location, account, container, data)
link = self.__put_linkinfo(os.path.join(account, container, name))
location = os.path.join(self.basepath, account, container, link)
f = open(location, 'w')
if offset:
f.seek(offset)
f.write(data)
f.close()
self.__put_metadata(os.path.join(account, container, name), {'hash': self.__object_hash(location)})
return
def update_object_meta(self, account, container, name, meta):
dir = os.path.join(self.basepath, account, container)
if not os.path.exists(dir):
raise NameError('Container does not exist')
def copy_object(self, account, src_container, src_name, dest_container, dest_name):
"""
copies an object
"""
logger.info("copy_object: %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name)
link = self.__get_linkinfo(os.path.join(account, src_container, src_name))
src_location = os.path.join(self.basepath, account, src_container, link)
dest_fullname = os.path.join(self.basepath, account, dest_container)
if not os.path.exists(dest_fullname):
raise NameError('Destination container does not exist')
try:
location = self.__get_object_linkinfo(os.path.join(account, container, name))
link = self.__get_linkinfo(os.path.join(account, dest_container, dest_name))
except NameError:
# new object
location = str(self.__save_linkinfo(os.path.join(account, container, name)))
self.__store_metadata(location, account, container, meta)
return
def copy_object(self, account, src_container, src_name, dest_container, dest_name, meta):
fullname = os.path.join(self.basepath, account, dest_container)
if not os.path.exists(fullname):
raise NameError('Destination container does not exist')
data = self.get_object_data(account, src_container, src_name)
self.update_object(account, dest_container, dest_name, data)
src_object_meta = self.get_object_meta(account, src_container, src_name)
if (type(src_object_meta) == types.DictType):
distinct_keys = [k for k in src_object_meta.keys() if k not in meta.keys()]
for k in distinct_keys:
meta[k] = src_object_meta[k]
self.update_object_meta(account, dest_container, dest_name, meta)
else: