Commit 41aeda06 authored by Chrysostomos Nanakos's avatar Chrysostomos Nanakos
Browse files

pithos: Use Archipelago mapper for storing mapfile

parent 4dbf3d42
......@@ -18,13 +18,13 @@ import ctypes
import ConfigParser
import logging
from context_archipelago import ArchipelagoObject
from archipelago.common import (
Request,
xseg_reply_info,
xseg_reply_map,
xseg_reply_map_scatterlist,
string_at,
XF_ASSUMEV0,
XF_MAPFLAG_READONLY,
)
from pithos.workers import (
......@@ -36,6 +36,7 @@ monkey.patch_Request()
logger = logging.getLogger(__name__)
class ArchipelagoMapper(object):
"""Mapper.
Required constructor parameters: namelen.
......@@ -52,72 +53,18 @@ class ArchipelagoMapper(object):
self.dst_port = int(cfg.getint('mapperd', 'blockerm_port'))
self.mapperd_port = int(cfg.getint('vlmcd', 'mapper_port'))
def _get_rear_map(self, maphash, create=0):
name = hexlify(maphash)
return ArchipelagoObject(name, self.ioctx_pool, self.dst_port, create)
def _check_rear_map(self, maphash):
name = hexlify(maphash)
ioctx = self.ioctx_pool.pool_get()
req = Request.get_info_request(ioctx, self.dst_port, name)
req.submit()
req.wait()
ret = req.success()
req.put()
self.ioctx_pool.pool_put(ioctx)
if ret:
return True
else:
return False
def map_retr(self, maphash, blkoff=0, nr=100000000000000):
def map_retr(self, maphash, size):
"""Return as a list, part of the hashes map of an object
at the given block offset.
By default, return the whole hashes map.
"""
namelen = self.namelen
hashes = ()
ioctx = self.ioctx_pool.pool_get()
req = Request.get_info_request(ioctx, self.dst_port,
hexlify(maphash))
req.submit()
req.wait()
ret = req.success()
if ret:
info = req.get_data(_type=xseg_reply_info)
size = int(info.contents.size)
req.put()
else:
req.put()
self.ioctx_pool.pool_put(ioctx)
raise RuntimeError("Hashmap '%s' doesn't exists" %
hexlify(maphash))
req = Request.get_read_request(ioctx, self.dst_port,
hexlify(maphash), size=size)
req.submit()
req.wait()
ret = req.success()
if ret:
data = string_at(req.get_data(), size)
req.put()
self.ioctx_pool.pool_put(ioctx)
for idx in xrange(0, len(data), namelen):
hashes = hashes + (data[idx:idx + namelen],)
hashes = list(hashes)
else:
req.put()
self.ioctx_pool.pool_put(ioctx)
raise RuntimeError("Hashmap '%s' doesn't exists" %
hexlify(maphash))
return hashes
def map_retr_archipelago(self, maphash, size):
"""Retrieve Archipelago mapfile"""
hashes = []
ioctx = self.ioctx_pool.pool_get()
maphash = maphash.split("archip:")[1]
req = Request.get_mapr_request(ioctx, self.mapperd_port, maphash,
offset=0, size=size)
req = Request.get_mapr_request(ioctx, self.mapperd_port,
hexlify(maphash), offset=0, size=size)
flags = req.get_flags()
flags |= XF_ASSUMEV0
req.set_flags(flags)
req.submit()
req.wait()
ret = req.success()
......@@ -126,27 +73,42 @@ class ArchipelagoMapper(object):
Segsarray = xseg_reply_map_scatterlist * data.contents.cnt
segs = Segsarray.from_address(ctypes.addressof(data.contents.segs))
hashes = [string_at(segs[idx].target, segs[idx].targetlen)
for idx in xrange(len(segs))]
for idx in xrange(len(segs))]
req.put()
else:
req.put()
self.ioctx_pool.pool_put(ioctx)
raise Exception("Could not retrieve Archipelago mapfile.")
req = Request.get_close_request(ioctx, self.mapperd_port, maphash);
req = Request.get_close_request(ioctx, self.mapperd_port,
hexlify(maphash))
req.submit()
req.wait()
ret = req.success();
ret = req.success()
if ret is False:
logger.warning("Could not close map %s" % maphash)
logger.warning("Could not close map %s" % hexlify(maphash))
pass
req.put();
req.put()
self.ioctx_pool.pool_put(ioctx)
return hashes
def map_stor(self, maphash, hashes=(), blkoff=0, create=1):
def map_stor(self, maphash, hashes, size, block_size):
"""Store hashes in the given hashes map."""
namelen = self.namelen
if self._check_rear_map(maphash):
return
with self._get_rear_map(maphash, 1) as rmap:
rmap.sync_write_chunks(namelen, blkoff, hashes, None)
objects = list()
for h in hashes:
objects.append({'name': hexlify(h), 'flags': XF_MAPFLAG_READONLY})
ioctx = self.ioctx_pool.pool_get()
req = Request.get_create_request(ioctx, self.mapperd_port,
hexlify(maphash),
mapflags=XF_MAPFLAG_READONLY,
objects=objects, blocksize=block_size,
size=size)
req.submit()
req.wait()
ret = req.success()
if ret is False:
req.put()
self.ioctx_pool.pool_put(ioctx)
raise IOError("Could not write map %s" % hexlify(maphash))
req.put()
self.ioctx_pool.pool_put(ioctx)
......@@ -25,19 +25,13 @@ class Mapper(object):
def __init__(self, **params):
self.archip_map = ArchipelagoMapper(**params)
def map_retr(self, maphash, blkoff=0, nr=100000000000000):
def map_retr(self, maphash, size):
"""Return as a list, part of the hashes map of an object
at the given block offset.
By default, return the whole hashes map.
"""
return self.archip_map.map_retr(maphash, blkoff, nr)
return self.archip_map.map_retr(maphash, size)
def map_retr_archipelago(self, maphash, size):
"""Return as a list the hashes map of an Archipelago
Volume.
"""
return self.archip_map.map_retr_archipelago(maphash, size)
def map_stor(self, maphash, hashes=(), blkoff=0, create=1):
def map_stor(self, maphash, hashes, blocksize, size):
"""Store hashes in the given hashes map."""
self.archip_map.map_stor(maphash, hashes, blkoff, create)
self.archip_map.map_stor(maphash, hashes, blocksize, size)
......@@ -40,14 +40,11 @@ class Store(object):
}
self.mapper = Mapper(**pm)
def map_get(self, name):
return self.mapper.map_retr(name)
def map_get(self, name, size):
return self.mapper.map_retr(name, size)
def map_get_archipelago(self, name, size):
return self.mapper.map_retr_archipelago(name, size)
def map_put(self, name, map):
self.mapper.map_stor(name, map)
def map_put(self, name, map, size, block_size):
self.mapper.map_stor(name, map, size, block_size)
def map_delete(self, name):
pass
......
......@@ -1163,7 +1163,7 @@ class ModularBackend(BaseBackend):
return props[self.SIZE], [x for x in hashmap]
else:
hashmap = self.store.map_get(self._unhexlify_hash(
props[self.HASH]))
props[self.HASH]), props[self.SIZE])
return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
def _update_object_hash(self, user, account, container, name, size, type,
......@@ -1322,7 +1322,7 @@ class ModularBackend(BaseBackend):
dest_version_id, _ = self._update_object_hash(
user, account, container, name, size, type, hexlified, checksum,
domain, meta, replace_meta, permissions)
self.store.map_put(hash, map)
self.store.map_put(hash, map, size, self.block_size)
return dest_version_id, hexlified
@debug_method
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment