Commit aacff6b9 authored by Stavros Sachtouris's avatar Stavros Sachtouris
Browse files

Merge branch 'feature-cluster' into develop

parents ac94e1aa 40ddc207
......@@ -111,7 +111,7 @@ Here are the general purpose accessors offered by Config:
* get(section, option): get the *value* of an *option* in the specified
*section* e.g.,
.. code-block:: python__
.. code-block:: python
# Example: get the default cloud (global.default_cloud option)
......
......@@ -196,3 +196,216 @@ You can now log to your remote virtual server as root, without a password. Well
adequate understanding of the remote OS are encouraged to prepare and
inject all kinds of useful files, e.g., **lists of package sources**,
**default user profiles**, **device mount configurations**, etc.
Clusters of virtual servers
---------------------------
A cluster of virtual servers can be created and deleted using special
arguments.
A convention is necessary: all servers belonging to the same cluster will have
names with a common prefix e.g., *cluster1*, *cluster2*, etc. This prefix
acts as the cluster name or the cluster key. Still, users must be careful not to
confuse cluster servers with other servers that coincidentally have the same
prefix (e.g., *cluster_of_stars*).
First, let's create a cluster of 4 servers. Each server will run the image with
id *f1r57-1m4g3-1d* on the hardware specified by the flavor with id *1*. The
prefix of the cluster will be "my cluster "
.. code-block:: console
$ kamaki
[kamaki]: server
[server]: create "my cluster " 1 f1r57-1m4g3-1d --cluster-size=4 --wait
... <omitted for clarity>
adminPass: S0mePassw0rd0n3
created: 2013-06-19T12:34:49.362078+00:00
flavor:
id: 1
id: 322
image:
id: f1r57-1m4g3-1d
name: my cluster 1
[progress bar waiting server to build]
Server 321: status is now ACTIVE
... <omitted for clarity>
adminPass: S0mePassw0rdTwo
created: 2013-06-19T12:34:47.362078+00:00
flavor:
id: 1
id: 321
image:
id: f1r57-1m4g3-1d
name: my cluster 2
[progress bar waiting server to build]
Server 322: status is now ACTIVE
... <omitted for clarity>
adminPass: S0mePassw0rdThree
created: 2013-06-19T12:34:55.362078+00:00
flavor:
id: 1
id: 323
image:
id: f1r57-1m4g3-1d
name: my cluster 3
[progress bar waiting server to build]
Server 323: status is now ACTIVE
... <omitted for clarity>
adminPass: S0mePassw0rdFour
created: 2013-06-19T12:34:59.362078+00:00
flavor:
id: 1
id: 324
image:
id: f1r57-1m4g3-1d
name: my cluster 4
[progress bar waiting server to build]
Server 324: status is now ACTIVE
.. note:: The creation dates are similar but not ordered. This is because the
servers are created asynchronously. To deactivate asynchronous operations
in kamaki, set max_theads to 1
.. code-block:: console
# Deactivate multithreading
[server]: /config set max_theads 1
.. note:: the *- - wait* argument is optional, but if not used, the *create*
call will terminate as long as the servers are spawned, even if they are
not built yet.
.. warning:: The server details (password, etc.) are printed in
**standard output** while the progress bar and notification messages are
printed in **standard error**
Now, let's see our clusters:
.. code-block:: console
[server]: list --name-prefix "my cluster "
321 my cluster 2
322 my cluster 1
323 my cluster 3
324 my cluster 4
For demonstration purposes, let's suppose that the maximum resource limit is
reached if we create 2 more servers. We will attempt to expand "my cluster" by
4 servers, expecting kamaki to raise a quota-related error.
.. code-block:: console
$ kamaki
[kamaki]: server
[server]: create "my cluster " 1 f1r57-1m4g3-1d --cluster-size=4 --wait
Failed to build 4 servers
Found 2 matching servers:
325 my cluster 1
326 my cluster 2
Check if any of these servers should be removed
(413) REQUEST ENTITY TOO LARGE overLimit (Resource Limit Exceeded for your
account.)
| Limit for resource 'Virtual Machine' exceeded for your account.
Available: 0, Requested: 1
The cluster expansion has failed, but 2 of the attempted 4 servers are being
created right now. It's up to the users judgment to destroy them or keep them.
First, we need to list all servers:
.. code-block:: console
[server] list --name-prefix="my cluster "
321 my cluster 2
322 my cluster 1
323 my cluster 3
324 my cluster 4
325 my cluster 1
326 my cluster 2
.. warning:: Kamaki will always create clusters by attaching an increment at
the right of the prefix. The increments always start from 1.
Now, our cluster seems messed up. Let's destroy it and rebuilt it.
.. code-block:: console
[server]: delete "my cluster " --cluster --wait
[progress bar waiting server to be deleted]
Server 321: status is now DELETED
[progress bar waiting server to be deleted]
Server 322: status is now DELETED
[progress bar waiting server to be deleted]
Server 323: status is now DELETED
[progress bar waiting server to be deleted]
Server 324: status is now DELETED
[progress bar waiting server to be deleted]
Server 325: status is now DELETED
[progress bar waiting server to be deleted]
Server 326: status is now DELETED
.. note:: *delete* performs a single deletion if feeded with a server id, but
it performs a mass deletion, based on the name, if called with --cluster
While creating the first cluster, we had to note down all passwords
The passwords for each server are printed on the console while creating them.
It would be far more convenient, though, if we could massively inject an ssh
key into all of them. Let's do that!
.. code-block:: console
[server]: create "my new cluster " 1 f1r57-1m4g3-1d --cluster-size=4 --wait --personality /home/someuser/.ssh/id_rsa.pub,/root/.ssh/authorized_keys,root,root,0777
... <output omitted for clarity>
Now, let's check if the cluster has been created.
.. code-block:: console
[server]: list --name-prefix="my new cluster "
321 my new cluster 1
322 my new cluster 2
323 my new cluster 3
324 my new cluster 4
We now have a cluster of 4 virtual servers and we can ssh in all of them
without a password.
Here is a bash script for creating clusters:
.. code-block:: bash
#!/bin/bash
CL_PREFIX="cluster"
CL_SIZE=4
PUB_KEYS="${HOME}/.ssh/id_rsa.pub"
OUT="cl_servers.txt"
CLOUD=`kamaki config get default_cloud`
FLAVOR_ID=1
IMAGE_ID="f1r57-1m4g3-1d"
echo "Clean up cluster \"${CL_PREFIX}\""
kamaki --cloud=${CLOUD} server delete ${CL_PREFIX} --cluster --wait
echo "Cluster \"${CL_PREFIX}\"" > ${OUT}
echo "Create cluster \"${CL_PREFIX}\" of size ${CL_SIZE}"
kamaki --cloud=${CLOUD} server create ${CL_PREFIX} ${FLAVOR_ID} ${IMAGE_ID}
--cluster-size=${CL_SIZE} --wait
--personality ${PUB_KEYS},/root/.ssh/authorized_keys,root,root >>${OUT}
echo "A list of created servers can be found at ${OUT}"
......@@ -375,23 +375,62 @@ class server_create(_init_cyclades, _optional_json, _server_wait):
arguments = dict(
personality=PersonalityArgument(
(80 * ' ').join(howto_personality), ('-p', '--personality')),
wait=FlagArgument('Wait server to build', ('-w', '--wait'))
wait=FlagArgument('Wait server to build', ('-w', '--wait')),
cluster_size=IntArgument(
'Create a cluster of servers of this size. In this case, the name'
'parameter is the prefix of each server in the cluster (e.g.,'
'srv1, srv2, etc.',
'--cluster-size')
)
@errors.cyclades.cluster_size
def _create_cluster(self, prefix, flavor_id, image_id, size):
servers = [dict(
name='%s%s' % (prefix, i),
flavor_id=flavor_id,
image_id=image_id,
personality=self['personality']) for i in range(1, 1 + size)]
if size == 1:
return [self.client.create_server(**servers[0])]
try:
r = self.client.async_run(self.client.create_server, servers)
return r
except Exception as e:
if size == 1:
raise e
try:
requested_names = [s['name'] for s in servers]
spawned_servers = [dict(
name=s['name'],
id=s['id']) for s in self.client.list_servers() if (
s['name'] in requested_names)]
self.error('Failed to build %s servers' % size)
self.error('Found %s matching servers:' % len(spawned_servers))
self._print(spawned_servers, out=self._err)
self.error('Check if any of these servers should be removed\n')
except Exception as ne:
self.error('Error (%s) while notifying about errors' % ne)
finally:
raise e
@errors.generic.all
@errors.cyclades.connection
@errors.plankton.id
@errors.cyclades.flavor_id
def _run(self, name, flavor_id, image_id):
print 'hey, wha?'
r = self.client.create_server(
name, int(flavor_id), image_id, personality=self['personality'])
usernames = self._uuids2usernames([r['user_id'], r['tenant_id']])
r['user_id'] += ' (%s)' % usernames[r['user_id']]
r['tenant_id'] += ' (%s)' % usernames[r['tenant_id']]
self._print(r, self.print_dict)
if self['wait']:
self._wait(r['id'], r['status'])
for r in self._create_cluster(
name, flavor_id, image_id, size=self['cluster_size'] or 1):
if not r:
self.error('Create %s: server response was %s' % (name, r))
continue
usernames = self._uuids2usernames(
[r['user_id'], r['tenant_id']])
r['user_id'] += ' (%s)' % usernames[r['user_id']]
r['tenant_id'] += ' (%s)' % usernames[r['tenant_id']]
self._print(r, self.print_dict)
if self['wait']:
self._wait(r['id'], r['status'])
self.writeln(' ')
def main(self, name, flavor_id, image_id):
super(self.__class__, self)._run()
......@@ -422,27 +461,41 @@ class server_delete(_init_cyclades, _optional_output_cmd, _server_wait):
"""Delete a virtual server"""
arguments = dict(
wait=FlagArgument('Wait server to be destroyed', ('-w', '--wait'))
wait=FlagArgument('Wait server to be destroyed', ('-w', '--wait')),
cluster=FlagArgument(
'(DANGEROUS) Delete all virtual servers prefixed with the cluster '
'prefix. In that case, the prefix replaces the server id',
'--cluster')
)
def _server_ids(self, server_var):
if self['cluster']:
return [s['id'] for s in self.client.list_servers() if (
s['name'].startswith(server_var))]
@errors.cyclades.server_id
def _check_server_id(self, server_id):
return server_id
return [_check_server_id(self, server_id=server_var), ]
@errors.generic.all
@errors.cyclades.connection
@errors.cyclades.server_id
def _run(self, server_id):
status = 'DELETED'
def _run(self, server_var):
for server_id in self._server_ids(server_var):
if self['wait']:
details = self.client.get_server_details(server_id)
status = details['status']
r = self.client.delete_server(int(server_id))
r = self.client.delete_server(server_id)
self._optional_output(r)
if self['wait']:
self._wait(server_id, status)
def main(self, server_id):
def main(self, server_id_or_cluster_prefix):
super(self.__class__, self)._run()
self._run(server_id=server_id)
self._run(server_id_or_cluster_prefix)
@command(server_cmds)
......
......@@ -190,6 +190,25 @@ class cyclades(object):
raise
return _raise
@classmethod
def cluster_size(this, foo):
def _raise(self, *args, **kwargs):
size = kwargs.get('size', None)
try:
size = int(size)
assert size > 0, 'Cluster size must be a positive integer'
return foo(self, *args, **kwargs)
except ValueError as ve:
msg = 'Invalid cluster size value %s' % size
raiseCLIError(ve, msg, importance=1, details=[
'Cluster size must be a positive integer'])
except AssertionError as ae:
raiseCLIError(
ae, 'Invalid cluster size %s' % size, importance=1)
except ClientError:
raise
return _raise
@classmethod
def network_id(this, foo):
def _raise(self, *args, **kwargs):
......@@ -199,7 +218,7 @@ class cyclades(object):
return foo(self, *args, **kwargs)
except ValueError as ve:
msg = 'Invalid network id %s ' % network_id
details = ['network id must be a positive integer']
details = 'network id must be a positive integer'
raiseCLIError(ve, msg, details=details, importance=1)
except ClientError as ce:
if network_id and ce.status == 404 and (
......@@ -255,7 +274,7 @@ class cyclades(object):
return foo(self, *args, **kwargs)
except ValueError as ve:
msg = 'Invalid flavor id %s ' % flavor_id,
details = 'Flavor id must be a positive integer',
details = 'Flavor id must be a positive integer'
raiseCLIError(ve, msg, details=details, importance=1)
except ClientError as ce:
if flavor_id and ce.status == 404 and (
......@@ -275,7 +294,7 @@ class cyclades(object):
return foo(self, *args, **kwargs)
except ValueError as ve:
msg = 'Invalid virtual server id %s' % server_id,
details = ['id must be a positive integer'],
details = 'Server id must be a positive integer'
raiseCLIError(ve, msg, details=details, importance=1)
except ClientError as ce:
err_msg = ('%s' % ce).lower()
......
......@@ -231,9 +231,7 @@ class _file_container_command(_file_account_command):
'Set container to work with (temporary)', ('-C', '--container'))
def extract_container_and_path(
self,
container_with_path,
path_is_optional=True):
self, container_with_path, path_is_optional=True):
"""Contains all heuristics for deciding what should be used as
container or path. Options are:
* user string of the form container:path
......
......@@ -369,6 +369,41 @@ class Client(Logged):
return []
return threadlist
def async_run(self, method, kwarg_list):
"""Fire threads of operations
:param method: the method to run in each thread
:param kwarg_list: (list of dicts) the arguments to pass in each method
call
:returns: (list) the results of each method call w.r. to the order of
kwarg_list
"""
flying, results = {}, {}
self._init_thread_limit()
for index, kwargs in enumerate(kwarg_list):
self._watch_thread_limit(flying.values())
flying[index] = SilentEvent(method=method, **kwargs)
flying[index].start()
unfinished = {}
for key, thread in flying.items():
if thread.isAlive():
unfinished[key] = thread
elif thread.exception:
raise thread.exception
else:
results[key] = thread.value
flying = unfinished
sendlog.info('- - - wait for threads to finish')
for key, thread in flying.items():
if thread.isAlive():
thread.join()
if thread.exception:
raise thread.exception
results[key] = thread.value
return results.values()
def _raise_for_status(self, r):
log.debug('raise err from [%s] of type[%s]' % (r, type(r)))
status_msg = getattr(r, 'status', None) or ''
......
......@@ -33,7 +33,6 @@
from kamaki.clients import ClientError
from kamaki.clients.compute.rest_api import ComputeRestClient
from kamaki.clients.utils import path4url
class ComputeClient(ComputeRestClient):
......
......@@ -31,11 +31,10 @@
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
from sys import stdout
from time import sleep
from kamaki.clients.cyclades.rest_api import CycladesRestClient
from kamaki.clients import ClientError
from kamaki.clients import ClientError, SilentEvent, sendlog
class CycladesClient(CycladesRestClient):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment