Commit df79206f authored by Giorgos Verigakis's avatar Giorgos Verigakis
Browse files

Refactored networking

Adds requests dependency.
parent d8d801e0
This diff is collapsed.
......@@ -31,26 +31,122 @@
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
import json
import logging
import requests
from requests.auth import AuthBase
sendlog = logging.getLogger('clients.send')
recvlog = logging.getLogger('clients.recv')
# Add a convenience json property to the responses
def _json(self):
try:
return json.loads(self.content)
except ValueError:
raise ClientError("Invalid JSON reply", self.status_code)
requests.Response.json = property(_json)
# Add a convenience status property to the responses
def _status(self):
return requests.status_codes._codes[self.status_code][0].upper()
requests.Response.status = property(_status)
class XAuthTokenAuth(AuthBase):
def __init__(self, token):
self.token = token
def __call__(self, r):
r.headers['X-Auth-Token'] = self.token
return r
class ClientError(Exception):
def __init__(self, message, status=0, details=''):
self.message = message
self.status = status
self.details = details
def __int__(self):
return int(self.status)
def __str__(self):
r = self.message
if self.status:
r += "\nHTTP Status: %d" % self.status
if self.details:
r += "\nDetails: \n%s" % self.details
class Client(object):
def __init__(self, base_url, token, include=False, verbose=False):
self.base_url = base_url
self.auth = XAuthTokenAuth(token)
self.include = include
self.verbose = verbose
def raise_for_status(self, r):
if 400 <= r.status_code < 500:
message, sep, details = r.text.partition('\n')
elif 500 <= r.status_code < 600:
message = '%d Server Error' % (r.status_code,)
details = r.text
else:
message = '%d Unknown Error' % (r.status_code,)
details = r.text
message = message or "HTTP Error %d" % (r.status_code,)
raise ClientError(message, r.status_code, details)
def request(self, method, path, **kwargs):
raw = kwargs.pop('raw', False)
success = kwargs.pop('success', 200)
if 'json' in kwargs:
data = json.dumps(kwargs.pop('json'))
kwargs['data'] = data
headers = kwargs.setdefault('headers', {})
headers['content-type'] = 'application/json'
url = self.base_url + path
kwargs.setdefault('auth', self.auth)
r = requests.request(method, url, **kwargs)
req = r.request
sendlog.info('%s %s', req.method, req.url)
for key, val in req.headers.items():
sendlog.info('%s: %s', key, val)
sendlog.info('')
if req.data:
sendlog.info('%s', req.data)
recvlog.info('%d %s', r.status_code, r.status)
for key, val in r.headers.items():
recvlog.info('%s: %s', key, val)
recvlog.info('')
if not raw and r.text:
recvlog.debug(r.text)
if success is not None:
# Success can either be an in or a collection
success = (success,) if isinstance(success, int) else success
if r.status_code not in success:
self.raise_for_status(r)
return r
def delete(self, path, **kwargs):
return self.request('delete', path, **kwargs)
def get(self, path, **kwargs):
return self.request('get', path, **kwargs)
def head(self, path, **kwargs):
return self.request('head', path, **kwargs)
def post(self, path, **kwargs):
return self.request('post', path, **kwargs)
def put(self, path, **kwargs):
return self.request('put', path, **kwargs)
from .compute import ComputeClient
from .image import ImageClient
from .storage import StorageClient
from .cyclades import CycladesClient
from .pithos import PithosClient
from .compute import ComputeClient as compute
from .image import ImageClient as image
from .storage import StorageClient as storage
from .cyclades import CycladesClient as cyclades
from .pithos import PithosClient as pithos
......@@ -31,40 +31,33 @@
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
import json
from . import Client, ClientError
from . import ClientError
from .http import HTTPClient
class ComputeClient(HTTPClient):
class ComputeClient(Client):
"""OpenStack Compute API 1.1 client"""
@property
def url(self):
url = self.config.get('compute_url') or self.config.get('url')
if not url:
raise ClientError('No URL was given')
return url
@property
def token(self):
token = self.config.get('compute_token') or self.config.get('token')
if not token:
raise ClientError('No token was given')
return token
def raise_for_status(self, r):
d = r.json
key = d.keys()[0]
val = d[key]
message = '%s: %s' % (key, val.get('message', ''))
details = val.get('details', '')
raise ClientError(message, r.status_code, details)
def list_servers(self, detail=False):
"""List servers, returned detailed output if detailed is True"""
path = '/servers/detail' if detail else '/servers'
reply = self.http_get(path)
return reply['servers']['values']
r = self.get(path, success=200)
return r.json['servers']['values']
def get_server_details(self, server_id):
"""Return detailed output on a server specified by its id"""
path = '/servers/%d' % server_id
reply = self.http_get(path)
return reply['server']
path = '/servers/%s' % (server_id,)
r = self.get(path, success=200)
return r.json['server']
def create_server(self, name, flavor_id, image_id, personality=None):
"""Submit request to create a new server
......@@ -78,13 +71,14 @@ class ComputeClient(HTTPClient):
The call returns a dictionary describing the newly created server.
"""
req = {'name': name, 'flavorRef': flavor_id, 'imageRef': image_id}
req = {'server': {'name': name,
'flavorRef': flavor_id,
'imageRef': image_id}}
if personality:
req['personality'] = personality
body = json.dumps({'server': req})
reply = self.http_post('/servers', body)
return reply['server']
r = self.post('/servers', json=req, success=202)
return r.json['server']
def update_server_name(self, server_id, new_name):
"""Update the name of the server as reported by the API.
......@@ -92,90 +86,92 @@ class ComputeClient(HTTPClient):
This call does not modify the hostname actually used by the server
internally.
"""
path = '/servers/%d' % server_id
body = json.dumps({'server': {'name': new_name}})
self.http_put(path, body)
path = '/servers/%s' % (server_id,)
req = {'server': {'name': new_name}}
self.put(path, json=req, success=204)
def delete_server(self, server_id):
"""Submit a deletion request for a server specified by id"""
path = '/servers/%d' % server_id
self.http_delete(path)
path = '/servers/%s' % (server_id,)
self.delete(path, success=204)
def reboot_server(self, server_id, hard=False):
"""Submit a reboot request for a server specified by id"""
path = '/servers/%d/action' % server_id
type = 'HARD' if hard else 'SOFT'
body = json.dumps({'reboot': {'type': type}})
self.http_post(path, body)
path = '/servers/%s/action' % (server_id,)
type = 'HARD' if hard else 'SOFT'
req = {'reboot': {'type': type}}
self.post(path, json=req, success=202)
def get_server_metadata(self, server_id, key=None):
path = '/servers/%d/meta' % server_id
path = '/servers/%s/meta' % (server_id,)
if key:
path += '/%s' % key
reply = self.http_get(path)
return reply['meta'] if key else reply['metadata']['values']
r = self.get(path, success=200)
return r.json['meta'] if key else r.json['metadata']['values']
def create_server_metadata(self, server_id, key, val):
path = '/servers/%d/meta/%s' % (server_id, key)
body = json.dumps({'meta': {key: val}})
reply = self.http_put(path, body, success=201)
return reply['meta']
req = {'meta': {key: val}}
r = self.put(path, json=req, success=201)
return r.json['meta']
def update_server_metadata(self, server_id, **metadata):
path = '/servers/%d/meta' % server_id
body = json.dumps({'metadata': metadata})
reply = self.http_post(path, body, success=201)
return reply['metadata']
path = '/servers/%d/meta' % (server_id,)
req = {'metadata': metadata}
r = self.post(path, json=req, success=201)
return r.json['metadata']
def delete_server_metadata(self, server_id, key):
path = '/servers/%d/meta/%s' % (server_id, key)
reply = self.http_delete(path)
self.delete(path, success=204)
def list_flavors(self, detail=False):
path = '/flavors/detail' if detail else '/flavors'
reply = self.http_get(path)
return reply['flavors']['values']
r = self.get(path, success=200)
return r.json['flavors']['values']
def get_flavor_details(self, flavor_id):
path = '/flavors/%d' % flavor_id
reply = self.http_get(path)
return reply['flavor']
r = self.get(path, success=200)
return r.json['flavor']
def list_images(self, detail=False):
path = '/images/detail' if detail else '/images'
reply = self.http_get(path)
return reply['images']['values']
r = self.get(path, success=200)
return r.json['images']['values']
def get_image_details(self, image_id):
path = '/images/%s' % image_id
reply = self.http_get(path)
return reply['image']
path = '/images/%s' % (image_id,)
r = self.get(path, success=200)
return r.json['image']
def delete_image(self, image_id):
path = '/images/%s' % image_id
self.http_delete(path)
path = '/images/%s' % (image_id,)
self.delete(path, success=204)
def get_image_metadata(self, image_id, key=None):
path = '/images/%s/meta' % image_id
path = '/images/%s/meta' % (image_id,)
if key:
path += '/%s' % key
reply = self.http_get(path)
return reply['meta'] if key else reply['metadata']['values']
r = self.get(path, success=200)
return r.json['meta'] if key else r.json['metadata']['values']
def create_image_metadata(self, image_id, key, val):
path = '/images/%s/meta/%s' % (image_id, key)
body = json.dumps({'meta': {key: val}})
reply = self.http_put(path, body, success=201)
return reply['meta']
req = {'meta': {key: val}}
r = self.put(path, json=req, success=201)
return r.json['meta']
def update_image_metadata(self, image_id, **metadata):
path = '/images/%s/meta' % image_id
body = json.dumps({'metadata': metadata})
reply = self.http_post(path, body, success=201)
return reply['metadata']
path = '/images/%s/meta' % (image_id,)
req = {'metadata': metadata}
r = self.post(path, json=req, success=201)
return r.json['metadata']
def delete_image_metadata(self, image_id, key):
path = '/images/%s/meta/%s' % (image_id, key)
self.http_delete(path)
self.delete(path, success=204)
......@@ -31,8 +31,6 @@
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
import json
from .compute import ComputeClient
......@@ -41,22 +39,25 @@ class CycladesClient(ComputeClient):
def start_server(self, server_id):
"""Submit a startup request for a server specified by id"""
path = '/servers/%d/action' % server_id
body = json.dumps({'start': {}})
self.http_post(path, body)
path = '/servers/%s/action' % (server_id,)
req = {'start': {}}
self.post(path, json=req, success=202)
def shutdown_server(self, server_id):
"""Submit a shutdown request for a server specified by id"""
path = '/servers/%d/action' % server_id
body = json.dumps({'shutdown': {}})
self.http_post(path, body)
path = '/servers/%s/action' % (server_id,)
req = {'shutdown': {}}
self.post(path, json=req, success=202)
def get_server_console(self, server_id):
"""Get a VNC connection to the console of a server specified by id"""
path = '/servers/%d/action' % server_id
body = json.dumps({'console': {'type': 'vnc'}})
reply = self.http_post(path, body, success=200)
return reply['console']
path = '/servers/%s/action' % (server_id,)
req = {'console': {'type': 'vnc'}}
r = self.post(path, json=req, success=200)
return r.json['console']
def set_firewall_profile(self, server_id, profile):
"""Set the firewall profile for the public interface of a server
......@@ -64,53 +65,56 @@ class CycladesClient(ComputeClient):
The server is specified by id, the profile argument
is one of (ENABLED, DISABLED, PROTECTED).
"""
path = '/servers/%d/action' % server_id
body = json.dumps({'firewallProfile': {'profile': profile}})
self.http_post(path, body)
path = '/servers/%s/action' % (server_id,)
req = {'firewallProfile': {'profile': profile}}
self.post(path, json=req, success=202)
def list_server_addresses(self, server_id, network=None):
path = '/servers/%d/ips' % server_id
path = '/servers/%s/ips' % (server_id,)
if network:
path += '/%s' % network
reply = self.http_get(path)
return [reply['network']] if network else reply['addresses']['values']
r = self.get(path, success=200)
if network:
return [r.json['network']]
else:
return r.json['addresses']['values']
def get_server_stats(self, server_id):
path = '/servers/%d/stats' % server_id
reply = self.http_get(path)
return reply['stats']
path = '/servers/%s/stats' % (server_id,)
r = self.get(path, success=200)
return r.json['stats']
def list_networks(self, detail=False):
path = '/networks/detail' if detail else '/networks'
reply = self.http_get(path)
return reply['networks']['values']
r = self.get(path, success=200)
return r.json['networks']['values']
def create_network(self, name):
body = json.dumps({'network': {'name': name}})
reply = self.http_post('/networks', body)
return reply['network']
req = {'network': {'name': name}}
r = self.post('/networks', json=req, success=202)
return r.json['network']
def get_network_details(self, network_id):
path = '/networks/%s' % network_id
reply = self.http_get(path)
return reply['network']
path = '/networks/%s' % (network_id,)
r = self.get(path, success=200)
return r.json['network']
def update_network_name(self, network_id, new_name):
path = '/networks/%s' % network_id
body = json.dumps({'network': {'name': new_name}})
self.http_put(path, body)
path = '/networks/%s' % (network_id,)
req = {'network': {'name': new_name}}
self.put(path, json=req, success=204)
def delete_network(self, network_id):
path = '/networks/%s' % network_id
self.http_delete(path)
path = '/networks/%s' % (network_id,)
self.delete(path, success=204)
def connect_server(self, server_id, network_id):
path = '/networks/%s/action' % network_id
body = json.dumps({'add': {'serverRef': server_id}})
self.http_post(path, body)
path = '/networks/%s/action' % (network_id,)
req = {'add': {'serverRef': server_id}}
self.post(path, json=req, success=202)
def disconnect_server(self, server_id, network_id):
path = '/networks/%s/action' % network_id
body = json.dumps({'remove': {'serverRef': server_id}})
self.http_post(path, body)
path = '/networks/%s/action' % (network_id,)
req = {'remove': {'serverRef': server_id}}
self.post(path, json=req, success=202)
......@@ -32,59 +32,86 @@
# or implied, of GRNET S.A.
import hashlib
import json
import os
from . import ClientError
from .storage import StorageClient
from ..utils import OrderedDict
from .storage import StorageClient
def pithos_hash(block, blockhash):
h = hashlib.new(blockhash)
h.update(block.rstrip('\x00'))
return h.hexdigest()
class PithosClient(StorageClient):
"""GRNet Pithos API client"""
def put_block(self, data, hash):
path = '/%s/%s?update' % (self.account, self.container)
path = '/%s/%s' % (self.account, self.container)
params = {'update': ''}
headers = {'Content-Type': 'application/octet-stream',
'Content-Length': len(data)}
resp, reply = self.raw_http_cmd('POST', path, data, headers,
success=202)
assert reply.strip() == hash, 'Local hash does not match server'
'Content-Length': str(len(data))}
r = self.post(path, params=params, data=data, headers=headers,
success=202)
assert r.text.strip() == hash, 'Local hash does not match server'
def create_object(self, object, f):
meta = self.get_container_meta()
def create_object(self, object, f, hash_cb=None, upload_cb=None):
"""Create an object by uploading only the missing blocks
hash_cb is a generator function taking the total number of blocks to
be hashed as an argument. Its next() will be called every time a block
is hashed.
upload_cb is a generator function with the same properties that is
called every time a block is uploaded.
"""
self.assert_container()
meta = self.get_container_meta(self.container)
blocksize = int(meta['block-size'])
blockhash = meta['block-hash']
size = 0
file_size = os.fstat(f.fileno()).st_size
nblocks = 1 + (file_size - 1) // blocksize
hashes = OrderedDict()
data = f.read(blocksize)
while data:
bytes = len(data)
h = hashlib.new(blockhash)
h.update(data.rstrip('\x00'))
hash = h.hexdigest()
size = 0
if hash_cb:
hash_gen = hash_cb(nblocks)
hash_gen.next()
for i in range(nblocks):
block = f.read(blocksize)
bytes = len(block)
hash = pithos_hash(block, blockhash)
hashes[hash] = (size, bytes)
size += bytes
data = f.read(blocksize)
if hash_cb:
hash_gen.next()
assert size == file_size
path = '/%s/%s/%s?hashmap&format=json' % (self.account, self.container,
object)
path = '/%s/%s/%s' % (self.account, self.container, object)
params = {'hashmap': '', 'format': 'json'}
hashmap = dict(bytes=size, hashes=hashes.keys())
req = json.dumps(hashmap)
resp, reply = self.raw_http_cmd('PUT', path, req, success=None)
if resp.status not in (201, 409):
raise ClientError('Invalid response from the server')
r = self.put(path, params=params, json=hashmap, success=(201, 409))
if resp.status == 201:
if r.status_code == 201:
return
missing = json.loads(reply)
missing = r.json
if upload_cb:
upload_gen = upload_cb(len(missing))
upload_gen.next()
for hash in missing:
offset, bytes = hashes[hash]
f.seek(offset)
data = f.read(bytes)
self.put_block(data, hash)
if upload_cb: