Commit 8f3fdcf4 authored by Makis Tsantekidis's avatar Makis Tsantekidis

Merge pull request #31 from efikalti/fixes

Create cluster using kamaki python api
parents bac90107 e883afed
......@@ -59,5 +59,5 @@ docs/_build/
# PyBuilder
target/
ansible/hosts
MANIFEST
old_kamaki
#
# config file for ansible
# https://raw.githubusercontent.com/ansible/ansible/devel/examples/ansible.cfg
#
[defaults]
remote_user = root
hostfile = hosts
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import (absolute_import, division,
print_function, unicode_literals)
import os
"""
This module contains the definitions of returned errors.
"""
# Definitions of return value errors
# All codes are negative so that callers (and shell scripts inspecting the
# exit status) can tell them apart from a successful, zero result.

# Syntax errors in the requested cluster specification
error_syntax_clustersize = -1
error_syntax_cpu_master = -2
error_syntax_ram_master = -3
error_syntax_disk_master = -4
error_syntax_cpu_slave = -5
error_syntax_ram_slave = -6
error_syntax_disk_slave = -7
error_syntax_logging_level = -8
error_syntax_disk_template = -9

# Project quota violations
error_quotas_cyclades_disk = -10
error_quotas_cpu = -11
error_quotas_ram = -12
error_quotas_cluster_size = -13
error_quotas_network = -14

# Lookup / request failures against the ~okeanos services
# NOTE: the numbering has gaps (-21, -26, -27, -33) -- codes retired or
# reserved; do not renumber existing constants.
error_flavor_id = -15
error_image_id = -16
error_syntax_token = -17
error_ready_reroute = -18
error_no_arguments = -19
error_fatal = -20
error_user_quota = -22
error_flavor_list = -23
error_get_list_servers = -24
error_get_list_projects = -25
error_get_network_quota = -28
error_create_network = -29
error_get_ip = -30
error_create_server = -31
error_syntax_auth_token = -32
error_ansible_playbook = -34
error_ssh_client = -35

# Cluster state errors
error_cluster_not_exist = -69
error_cluster_corrupt = -70
error_proj_id = -71
error_multiple_entries = -72
error_project_quota = -73

# Authentication failure
error_authentication = -99
from __future__ import (absolute_import, division,
print_function, unicode_literals)
import logging
import re
import argparse
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
......@@ -9,11 +11,13 @@ from kamaki.clients import astakos, cyclades
from kamaki.clients import ClientError
from kamaki.clients.utils import https
from kamaki.cli.config import Config as KamakiConfig
from kamaki import defaults
from fokia.cluster_error_constants import *
from Crypto.PublicKey import RSA
from base64 import b64encode
# TODO: remove this and actually use ssl cert files
https.patch_ignore_ssl()
import argparse
if not defaults.CACERTS_DEFAULT_PATH:
https.patch_with_certs(CA_CERTS_PATH)
storage_templates = ['drdb', 'ext_vlmc']
......@@ -46,6 +50,23 @@ class Provisioner:
logger.info("Initiating Cyclades client")
self.cyclades = cyclades.CycladesComputeClient(compute_url, auth_token)
# Create the network client
networkURL = self.astakos.get_endpoint_url(
cyclades.CycladesNetworkClient.service_type)
self.network_client = cyclades.CycladesNetworkClient(networkURL, auth_token)
# Constants
self.Bytes_to_GB = 1024*1024*1024
self.Bytes_to_MB = 1024*1024
self.master = None
self.ips = None
self.slaves = None
self.vpn = None
self.subnet = None
self.private_key = None
self.image_id = 'c6f5adce-21ad-4ce3-8591-acfe7eb73c02'
def find_flavor(self, **kwargs):
"""
......@@ -92,34 +113,418 @@ class Provisioner:
logger.info("Retrieving project")
return self.astakos.get_projects(**filter)[0]
def create_vm(self, vm_name=None, image_id=None, ip=None, personality=None, **kwargs):
    """
    Create a single virtual machine.

    :param vm_name: name of the virtual machine to create
    :param image_id: if None the default self.image_id is used; any other
                     value triggers a lookup via find_image(**kwargs)
                     (NOTE(review): the passed id itself is then ignored --
                     confirm this is intended)
    :param ip: floating ip object to attach to the vm (optional)
    :param personality: list of personality (injected file) dicts, or None
    :param kwargs: passed to find_flavor/find_image/find_project_id;
                   must contain 'net_id' (private network to join)
    :return: the cyclades create_server response
    :raises ClientError: if the server could not be created
    """
    flavor_id = self.find_flavor(**kwargs)['id']
    # Pick the image: fall back to the canned default unless the caller
    # requested a lookup by name.
    if image_id is None:
        image_id = self.image_id
    else:
        image_id = self.find_image(**kwargs)['id']
    project_id = self.find_project_id(**kwargs)['id']

    # Networks the vm joins: the floating ip's network first (if any),
    # then the private cluster network.
    networks = []
    if ip is not None:
        networks.append({'uuid': ip['floating_network_id'],
                         'fixed_ip': ip['floating_ip_address']})
    networks.append({'uuid': kwargs['net_id']})

    if personality is None:
        personality = []
    return self.cyclades.create_server(name=vm_name, flavor_id=flavor_id,
                                       image_id=image_id,
                                       project_id=project_id,
                                       networks=networks,
                                       personality=personality)
def create_lambda_cluster(self, vm_name, **kwargs):
    """
    Create a lambda cluster: one master plus kwargs['slaves'] slave vms
    sharing a private network, with an ssh keypair pre-installed.

    :param vm_name: hostname of the master
    :param kwargs: vm specifications: slaves, vcpus_master, vcpus_slave,
                   ram_master, ram_slave (MB), disk_master, disk_slave (GB),
                   cluster_size, ip_request, network_request, project_name
    :return: dictionary with the master and slave node responses if the
             cluster was created (implicitly None if quota check is falsy)
    :raises ClientError: from check_all_resources when a quota is exceeded
    """
    quotas = self.get_quotas()
    # Total demand across the master and all slaves.
    vcpus = kwargs['slaves'] * kwargs['vcpus_slave'] + kwargs['vcpus_master']
    ram = kwargs['slaves'] * kwargs['ram_slave'] + kwargs['ram_master']
    disk = kwargs['slaves'] * kwargs['disk_slave'] + kwargs['disk_master']
    project_id = self.find_project_id(**kwargs)['id']
    response = self.check_all_resources(quotas, cluster_size=kwargs['cluster_size'],
                                        vcpus=vcpus,
                                        ram=ram,
                                        disk=disk,
                                        ip_request=kwargs['ip_request'],
                                        network_request=kwargs['network_request'],
                                        project_name=kwargs['project_name'])
    if response:
        # Generate an in-memory ssh keypair: every node gets the public
        # key, only the master receives the private key so it can reach
        # the slaves.
        key = RSA.generate(2048)
        self.private_key = key.exportKey('PEM')
        pub_key = key.publickey().exportKey('OpenSSH') + ' root'
        # mode 0o600: owner read/write only (was the py2-only literal
        # 0600, which is a syntax error on Python 3).
        public = dict(contents=b64encode(pub_key),
                      path='/root/.ssh/id_rsa.pub',
                      owner='root', group='root', mode=0o600)
        authorized = dict(contents=b64encode(pub_key),
                          path='/root/.ssh/authorized_keys',
                          owner='root', group='root', mode=0o600)
        private = dict(contents=b64encode(self.private_key),
                       path='/root/.ssh/id_rsa',
                       owner='root', group='root', mode=0o600)
        master_personality = [authorized, public, private]
        slave_personality = [authorized]

        # Create the private network of the cluster and its subnet.
        self.vpn = self.create_vpn('lambda-vpn', project_id=project_id)
        vpn_id = self.vpn['id']
        self.create_private_subnet(vpn_id)

        # Reserve the requested floating ips up front.
        self.ips = list()
        for _ in range(kwargs['ip_request']):
            self.ips.append(self.reserve_ip(project_id=project_id))

        # Create the master; it takes the first reserved ip, if any.
        ip = self.ips[0] if self.ips else None
        self.master = self.create_vm(vm_name=vm_name, ip=ip,
                                     net_id=vpn_id,
                                     vcpus=kwargs['vcpus_master'],
                                     ram=kwargs['ram_master'],
                                     disk=kwargs['disk_master'],
                                     personality=master_personality,
                                     **kwargs)

        # Create the slaves; remaining ips are handed out in order, the
        # rest of the slaves get no public ip.
        self.slaves = list()
        for i in range(kwargs['slaves']):
            ip = self.ips[i + 1] if len(self.ips) > i + 1 else None
            slave = self.create_vm(vm_name='lambda-node' + str(i + 1),
                                   ip=ip,
                                   net_id=vpn_id,
                                   vcpus=kwargs['vcpus_slave'],
                                   ram=kwargs['ram_slave'],
                                   disk=kwargs['disk_slave'],
                                   personality=slave_personality,
                                   **kwargs)
            self.slaves.append(slave)

        # Block until every vm has finished building.
        self.cyclades.wait_server(server_id=self.master['id'])
        for slave in self.slaves:
            self.cyclades.wait_server(slave['id'])

        # Cluster dictionary object.
        return {'master': self.master, 'slaves': self.slaves}
def get_cluster_details(self):
    """
    Summarize the cluster created by create_lambda_cluster.

    :return: dictionary with 'nodes' (master + slaves), 'vpn', 'ips'
             and 'subnet' basic details
    """
    nodes = dict()
    nodes['master'] = {'id': self.master['id'],
                       'name': self.master['name'],
                       'adminPass': self.master['adminPass']}
    # Slaves expose no admin password in the summary.
    nodes['slaves'] = [{'id': slave['id'], 'name': slave['name']}
                       for slave in self.slaves]

    details = dict()
    details['nodes'] = nodes
    details['vpn'] = {'id': self.vpn['id'], 'type': self.vpn['type']}
    # Previously self.ips was assigned here and immediately overwritten
    # by the trimmed list below; the dead assignment is removed.
    details['ips'] = [{'floating_network_id': ip['floating_network_id'],
                       'floating_ip_address': ip['floating_ip_address'],
                       'id': ip['id']}
                      for ip in self.ips]
    details['subnet'] = {'id': self.subnet['id'],
                         'cidr': self.subnet['cidr'],
                         'gateway_ip': self.subnet['gateway_ip']}
    return details
def get_private_key(self):
    """
    Accessor for the cluster's ssh key.

    :returns: the PEM-exported private key of the master
    """
    return self.private_key
def create_vpn(self, network_name, project_id):
    """
    Create a virtual private network.

    :param network_name: name of the network
    :param project_id: id of the project that owns the network
    :return: the virtual network object
    :raises ClientError: if the network could not be created

    The original wrapped the call in a try/except that only re-raised and
    was followed by an unreachable `return okeanos_response` (a NameError
    had it ever run); both removed -- behavior is unchanged.
    """
    # network_types[1] selects the private network flavor -- TODO(review)
    # confirm which type this is against the kamaki client.
    return self.network_client.create_network(
        type=self.network_client.network_types[1],
        name=network_name,
        project_id=project_id)
def destroy_vpn(self, id):
    """
    Destroy a virtual private network.

    :param id: id of the network to destroy
    :return: True if successful
    :raises ClientError: if deletion fails

    Removed the pointless re-raise wrapper and the unreachable
    `return okeanos_response` dead code after it.
    """
    self.network_client.delete_network(id)
    return True
def reserve_ip(self, project_id):
    """
    Reserve a floating ip charged to the given project.

    :param project_id: id of the project the ip is charged to
    :return: the floating ip object
    :raises ClientError: if the reservation fails

    Removed the pointless re-raise wrapper and the unreachable
    `return okeanos_response` dead code after it.
    """
    return self.network_client.create_floatingip(project_id=project_id)
def create_private_subnet(self, net_id, cidr='192.168.0.0/24', gateway_ip='192.168.0.1'):
    """
    Create a private subnet and attach it to the given network.

    The created subnet is also stored on self.subnet for later reporting
    by get_cluster_details.

    :param net_id: id of the network
    :param cidr: address range of the subnet
    :param gateway_ip: gateway address inside the cidr
    :return: the id of the subnet
    :raises ClientError: if the subnet could not be created

    Removed the pointless re-raise wrapper and the unreachable
    `return okeanos_response` dead code after it.
    """
    subnet = self.network_client.create_subnet(net_id, cidr,
                                               gateway_ip=gateway_ip,
                                               enable_dhcp=True)
    self.subnet = subnet
    return subnet['id']
def connect_vm(self, vm_id, net_id):
    """
    Connect a vm to a network by creating a port between them.

    :param vm_id: id of the vm
    :param net_id: id of the network
    :return: True if successful
    :raises ClientError: if the port could not be created

    Removed the pointless re-raise wrapper and the unreachable
    `return okeanos_response` dead code after it.
    """
    self.network_client.create_port(network_id=net_id,
                                    device_id=vm_id)
    return True
def attach_authorized_ip(self, ip, vm_id):
    """
    Attach an already-reserved floating ip to a vm.

    :param ip: floating ip object; 'floating_network_id' and
               'floating_ip_address' are read from it
    :param vm_id: id of the vm
    :return: True if successful
    :raises ClientError: if the port could not be created

    Removed the pointless re-raise wrapper and the unreachable
    `return okeanos_response` dead code after it.
    """
    self.network_client.create_port(
        network_id=ip['floating_network_id'],
        device_id=vm_id,
        fixed_ips=[dict(ip_address=ip['floating_ip_address'])])
    return True
def get_quotas(self, **kwargs):
    """
    Fetch the caller's quotas from astakos.

    :param kwargs: accepted for interface uniformity, currently unused
    :return: user quotas object
    """
    return self.astakos.get_quotas()
def get_server_info(self, server_id):
    """
    Fetch the full details of a server from cyclades.

    :param server_id: id of the server
    :return: server details dictionary
    """
    return self.cyclades.get_server_details(server_id=server_id)
def get_server_authorized_ip(self, server_id):
    """
    :param server_id: id of the server
    :returns: the authorized (public) ip of the server if it has one,
              else None
    """
    addresses = self.get_server_info(server_id=server_id)['addresses']
    for interface in addresses.values():
        candidate = interface[0]['addr']
        # Skip the cluster's private 192.168.0.x addresses and any entry
        # containing letters (hostnames / IPv6).
        if '192.168.0' in candidate or re.search('[a-zA-Z]', candidate):
            continue
        return candidate
    return None
def get_server_private_ip(self, server_id):
    """
    :param server_id: id of the server
    :returns: the private ip of the server if it has one, else None
    """
    addresses = self.get_server_info(server_id=server_id)['addresses']
    for interface in addresses.values():
        candidate = interface[0]['addr']
        # The cluster's private subnet is hard-wired to 192.168.0.0/24.
        if '192.168.0' in candidate:
            return candidate
    return None
def check_all_resources(self, quotas, **kwargs):
    """
    Check the project's quota for every requested resource.

    :param quotas: quotas dictionary as returned by astakos get_quotas,
                   keyed by project id
    :param kwargs: cluster_size, vcpus, ram (MB), disk (GB), ip_request,
                   network_request, plus whatever find_project_id needs
    :return: True if every requested resource is available
    :raises ClientError: with the matching error code for the first
                         resource that is out of limit

    Fixes a bug where the disk check read the pending value from
    'cyclades.ram' instead of 'cyclades.disk'; also drops the
    unreachable `return False` statements after each raise.
    """
    project_id = self.find_project_id(**kwargs)['id']

    def available(resource):
        # Remaining project quota: limit minus current usage and pending.
        entry = quotas[project_id][resource]
        return entry['project_limit'] - entry['project_usage'] - entry['project_pending']

    # Virtual machines
    if available('cyclades.vm') < kwargs['cluster_size']:
        raise ClientError('Cyclades VMs out of limit', error_quotas_cluster_size)
    # CPUs
    if available('cyclades.cpu') < kwargs['vcpus']:
        raise ClientError('Cyclades cpu out of limit', error_quotas_cpu)
    # RAM: quota values are in bytes, the request is in MB.
    if available('cyclades.ram') / self.Bytes_to_MB < kwargs['ram']:
        raise ClientError('Cyclades ram out of limit', error_quotas_ram)
    # Disk: quota values are in bytes, the request is in GB.
    if available('cyclades.disk') / self.Bytes_to_GB < kwargs['disk']:
        raise ClientError('Cyclades disk out of limit', error_quotas_cyclades_disk)
    # Authorized IPs: floating ips already reserved but not attached to
    # any instance/port can be reused, so they count as available too.
    available_ips = available('cyclades.floating_ip')
    for fip in self.network_client.list_floatingips():
        if fip['instance_id'] is None and fip['port_id'] is None:
            available_ips += 1
    if available_ips < kwargs['ip_request']:
        raise ClientError('authorized IPs out of limit', error_get_ip)
    # Private networks
    if available('cyclades.network.private') < kwargs['network_request']:
        raise ClientError('Private Network out of limit', error_get_network_quota)
    return True
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Okeanos VM provisioning")
    # NOTE: '--cloud' was previously registered twice (which makes
    # argparse raise a conflict error); only the later definition with
    # the '~okeanos' default is kept.
    parser.add_argument('--cloud', type=str, dest="cloud", default="~okeanos")
    parser.add_argument('--project-name', type=str, dest="project_name",
                        default="lambda.grnet.gr")
    parser.add_argument('--name', type=str, dest='name', default="to mikro debian sto livadi")
    parser.add_argument('--slaves', type=int, dest='slaves', default=1)
    parser.add_argument('--vcpus_master', type=int, dest='vcpus_master', default=4)
    parser.add_argument('--vcpus_slave', type=int, dest='vcpus_slave', default=4)
    parser.add_argument('--ram_master', type=int, dest='ram_master', default=4096)  # in MB
    parser.add_argument('--ram_slave', type=int, dest='ram_slave', default=4096)  # in MB
    parser.add_argument('--disk_master', type=int, dest='disk_master', default=40)  # in GB
    parser.add_argument('--disk_slave', type=int, dest='disk_slave', default=40)  # in GB
    parser.add_argument('--ip_request', type=int, dest='ip_request', default=1)
    parser.add_argument('--network_request', type=int, dest='network_request', default=1)
    parser.add_argument('--image_name', type=str, dest='image_name', default="debian")
    parser.add_argument('--cluster_size', type=int, dest='cluster_size', default=2)
    args = parser.parse_args()

    provisioner = Provisioner(cloud_name=args.cloud)
    # Build the whole cluster from the command-line specification.
    response = provisioner.create_lambda_cluster(vm_name="lambda-master", slaves=args.slaves,
                                                 cluster_size=args.cluster_size,
                                                 vcpus_master=args.vcpus_master,
                                                 vcpus_slave=args.vcpus_slave,
                                                 ram_master=args.ram_master,
                                                 ram_slave=args.ram_slave,
                                                 disk_master=args.disk_master,
                                                 disk_slave=args.disk_slave,
                                                 ip_request=args.ip_request,
                                                 network_request=args.network_request,
                                                 project_name=args.project_name)
    # print(response)
    # print(provisioner.get_cluster_details())
    # print(provisioner.get_private_key())
File mode changed from 100644 to 100755
......@@ -124,6 +124,44 @@ test_projects = [{u'creation_date': u'2015-06-09T09:46:44.327826+00:00',
u'state': u'active',
u'system_project': False}]
# Canned astakos quota response keyed by project id. Disk/ram values are
# in bytes; the *_limit/usage/pending keys without the project_ prefix
# are the per-user figures.
test_quotas = { '6ff62e8e-0ce9-41f7-ad99-13a18ecada5f':
    {'cyclades.disk':
        {'project_limit': 1288490188800, 'project_pending': 0, 'project_usage': 64424509440, 'usage': 0, 'limit': 322122547200, 'pending': 0},
    'cyclades.vm':
        {'project_limit': 60, 'project_pending': 0, 'project_usage': 2, 'usage': 0, 'limit': 15, 'pending': 0},
    'pithos.diskspace':
        {'project_limit': 429496729600, 'project_pending': 0, 'project_usage': 0, 'usage': 0, 'limit': 107374182400, 'pending': 0},
    'cyclades.ram':
        {'project_limit': 128849018880, 'project_pending': 0, 'project_usage': 12884901888, 'usage': 0, 'limit': 32212254720, 'pending': 0},
    'cyclades.cpu':
        {'project_limit': 120, 'project_pending': 0, 'project_usage': 12, 'usage': 0, 'limit': 30, 'pending': 0},
    'cyclades.floating_ip':
        {'project_limit': 10, 'project_pending': 0, 'project_usage': 6, 'usage': 3, 'limit': 4, 'pending': 0},
    'cyclades.network.private':
        {'project_limit': 10, 'project_pending': 0, 'project_usage': 7, 'usage': 0, 'limit': 4, 'pending': 0},
    'astakos.pending_app':
        {'project_limit': 0, 'project_pending': 0, 'project_usage': 0, 'usage': 0, 'limit': 0, 'pending': 0}} }

# Canned floating-ip object (unattached: instance_id/port_id are None).
test_ip = {u'floating_network_id':
    u'2186', u'user_id':
    u'9819231a-e9e2-40f7-93f1-e2e4cb50cc33',
    u'deleted': False, u'tenant_id':
    u'9819231a-e9e2-40f7-93f1-e2e4cb50cc33',
    u'instance_id': None, u'fixed_ip_address': None,
    u'floating_ip_address':
    u'83.212.116.58',
    u'port_id': None,
    u'id': u'684011'}

# Canned cyclades create_server response: a server still in BUILD state.
test_vm = {u'addresses': {}, u'links': [{u'href': u'https://cyclades.okeanos.grnet.gr/compute/v2.0/servers/665007', u'rel': u'self'},
    {u'href': u'https://cyclades.okeanos.grnet.gr/compute/v2.0/servers/665007', u'rel': u'bookmark'}], u'image':
    {u'id': u'0e399015-8723-4c78-8198-75bdf693cdde', u'links': [
    {u'href': u'https://cyclades.okeanos.grnet.gr/compute/v2.0/images/0e399015-8723-4c78-8198-75bdf693cdde', u'rel': u'self'},
    {u'href': u'https://cyclades.okeanos.grnet.gr/compute/v2.0/images/0e399015-8723-4c78-8198-75bdf693cdde', u'rel': u'bookmark'},
    {u'href': u'https://cyclades.okeanos.grnet.gr/image/v1.0/images/0e399015-8723-4c78-8198-75bdf693cdde', u'rel': u'alternate'}]},
    u'suspended': False, u'flavor': {u'id': 3, u'links': [{u'href': u'https://cyclades.okeanos.grnet.gr/compute/v2.0/flavors/3', u'rel': u'self'},
    {u'href': u'https://cyclades.okeanos.grnet.gr/compute/v2.0/flavors/3', u'rel': u'bookmark'}]}, u'id': 665007, u'security_groups': [
    {u'name': u'default'}], u'attachments': [], u'user_id': u'9819231a-e9e2-40f7-93f1-e2e4cb50cc33', u'accessIPv4': u'', u'accessIPv6': u'', u'progress': 0, u'config_drive': u'', u'status': u'BUILD', u'updated': u'2015-07-10T07:13:25.973280+00:00', u'hostId': u'', u'SNF:fqdn': u'snf-665007.vm.okeanos.grnet.gr', u'deleted': False, u'key_name': None, u'name': u'to mikro debian sto livadi', u'adminPass': u'q0WVXWIjc4', u'tenant_id': u'6ff62e8e-0ce9-41f7-ad99-13a18ecada5f', u'created': u'2015-07-10T07:13:24.862714+00:00', u'SNF:task_state': u'BUILDING', u'volumes': [50722], u'diagnostics': [], u'metadata': {u'os': u'debian', u'users': u'root ckaner'}, u'SNF:port_forwarding': {}}
def test_find_flavor():
with mock.patch('fokia.provisioner.astakos'), \
......@@ -140,6 +178,43 @@ def test_find_flavor():
name='tost', image_id=u'0035ac89-a86e-4108-93e8-93e294b74a3d', flavor_id=3,
project_id=u'6ff62e8e-0ce9-41f7-ad99-13a18ecada5f', networks=[], personality=[])
def test_check_all_resources():
    # A request that fits inside the canned test_quotas must pass without
    # raising; the kamaki clients are mocked away.
    with mock.patch('fokia.provisioner.astakos'):
        with mock.patch('fokia.provisioner.KamakiConfig'):
            with mock.patch('fokia.provisioner.cyclades'):
                provisioner = Provisioner("lambda")
                provisioner.astakos.get_projects.return_value = test_projects
                provisioner.astakos.get_quotas.return_value = test_quotas
                provisioner.check_all_resources(
                    test_quotas,
                    project_id=u'6ff62e8e-0ce9-41f7-ad99-13a18ecada5f',
                    slaves=2,
                    cluster_size=3,
                    vcpus=12,
                    ram=4096 * 3,
                    disk=180,
                    ip_request=1,
                    network_request=1)
def test_create_vpn():
    # Fixes the original test, which stubbed the wrong client method
    # (create_network instead of create_floatingip) and called
    # reserve_ip() without its required project_id argument (TypeError).
    with mock.patch('fokia.provisioner.astakos'), \
            mock.patch('fokia.provisioner.KamakiConfig'), \
            mock.patch('fokia.provisioner.cyclades'):
        provisioner = Provisioner("lambda")
        provisioner.network_client.create_floatingip.return_value = test_ip
        ip = provisioner.reserve_ip(project_id=u'6ff62e8e-0ce9-41f7-ad99-13a18ecada5f')
        assert ip == test_ip
def test_create_vm():
    # NOTE(review): this test only constructs a mocked Provisioner and
    # rebinds cyclades.create_server to a canned response dict; it never
    # calls create_vm and asserts nothing -- presumably a placeholder.
    # TODO: exercise provisioner.create_vm and check the request it builds.
    with mock.patch('fokia.provisioner.astakos'), \
        mock.patch('fokia.provisioner.KamakiConfig'), \
        mock.patch('fokia.provisioner.cyclades'):
        provisioner = Provisioner("lambda")
        provisioner.cyclades.create_server = test_vm
def test_connect_vm():
with mock.patch('fokia.provisioner.astakos'), \